本文简要介绍Adf.QueueServer的一些常规应用和协议。
1. 协议描述
本服务支持 WebSocket Json/ WebSocket Binary / HTTP Json 三种通信方式
一般使用 WebSocket 做异步push/pull, 使用HTTP做同步push/pull
若同时使用多种格式JSON/BINARY,需要注意消息体(body)是否可以通过UTF8编码于二进制与字符串之间相互转化,否则可能会出现乱码情况。理论上同一队列应尽量保持使用同一种格式传输。
本服务为FIFO QUEUE,消息将保持顺序PULL,在单线程消费下消息与业务均保持顺序性。在多终端或多线程或多PULL下、消息保持顺序PULl但不保证业务执行的顺序性。
本服务支持命令:
rpush | 队列右侧插入一项 |
lpush | 队列左侧插入一项 |
pull | 从队列中获取最左侧一项 |
delete | 删除已从队列中pull过的项,未被pull过的项不可被删除 |
lcancel | 恢复已pull过的项至队列左侧 |
rcancel | 恢复已pull过的项至队列右侧 |
count | 查询队列长度 |
clear | 清空队列项 |
2. JSON格式
字符编码格式: UTF8
//push request: { "action":"lpush/rpush", "requestid":"1 ~ 32 chr. propose use uuid", "queue": "/order/new", "body":"this is a test message." } //push response: { "action":"lpush/rpush", "requestid":"88bea088837d4743af67a2d49e6d08d1", "queue": "/order/new", "result":"ok or failure message", "messageid": 1452443242 } //delete/lcancel/rcancel/createqueue/deletequeue/count/clear/pull request: { "action":"delete/lcancel/rcancel/createqueue/deletequeue/count/clear/pull", "requestid":"1 ~ 32 chr. propose use uuid", "queue": "/order/new" } //delete/lcancel/rcancel/createqueue/deletequeue response: { "action":"delete/lcancel/rcancel/createqueue/deletequeue", "requestid":"88bea088837d4743af67a2d49e6d08d1", "queue": "/order/new", "result":"ok or failure message" } //count/clear response: { "action":"count/clear", "requestid":"88bea088837d4743af67a2d49e6d08d1", "queue": "/order/new", "result":"ok or failure message", "count":0 } //pull response: { "action":"pull", "requestid":"88bea088837d4743af67a2d49e6d08d1", "queue": "/order/new", "result":"ok or failure message", "body":"message body.", "duplications": 0, "messageid": 1452443242 }
2. 二进制传输格式
Endianness : Big-Endian
Body: UTF8
二进制元素内容与JSON结构一致。
//type: action : byte xx length : uint16 duplications: uint16 count : int32 messageid : uint64 //push request: +---------------------------------------------------------------+ | action (1) +---------------------------------------------------------------+ | id length (2) +---------------------------------------------------------------+ | id (1 ~ 65535) +---------------------------------------------------------------+ | queue length (2) +---------------------------------------------------------------+ | queue (1 ~ 65535) +---------------------------------------------------------------+ | body length (2) +---------------------------------------------------------------+ | body (1 ~ 65535) +---------------------------------------------------------------+ //push response: +---------------------------------------------------------------+ | action (1) +---------------------------------------------------------------+ | id length (2) +---------------------------------------------------------------+ | id (1 ~ 65535) +---------------------------------------------------------------+ | queue length (2) +---------------------------------------------------------------+ | queue (1 ~ 65535) +---------------------------------------------------------------+ | result length (2) +---------------------------------------------------------------+ | result (1 ~ 65535) +---------------------------------------------------------------+ | OK | messageid length (8) +---------------------------------------------------------------+ //delete/lcancel/rcancel/createqueue/deletequeue/count/clear/pull request: +---------------------------------------------------------------+ | action (1) +---------------------------------------------------------------+ | id length (2) +---------------------------------------------------------------+ | id (1 ~ 65535) +---------------------------------------------------------------+ | queue length (2) +---------------------------------------------------------------+ | queue (1 ~ 65535) +---------------------------------------------------------------+ //delete/lcancel/rcancel/createqueue/deletequeue response: +---------------------------------------------------------------+ | action (1) +---------------------------------------------------------------+ | id length (2) +---------------------------------------------------------------+ | id (1 ~ 65535) +---------------------------------------------------------------+ | queue length (2) +---------------------------------------------------------------+ | queue (1 ~ 65535) +---------------------------------------------------------------+ | result length (2) +---------------------------------------------------------------+ | result (1 ~ 65535) +---------------------------------------------------------------+ //count/clear response: +---------------------------------------------------------------+ | action (1) +---------------------------------------------------------------+ | id length (2) +---------------------------------------------------------------+ | id (1 ~ 65535) +---------------------------------------------------------------+ | queue length (2) +---------------------------------------------------------------+ | queue (1 ~ 65535) +---------------------------------------------------------------+ | result length (2) +---------------------------------------------------------------+ | result (1 ~ 65535) +---------------------------------------------------------------+ | OK | count (4) +---------------------------------------------------------------+ //pull response: +---------------------------------------------------------------+ | action (1) +---------------------------------------------------------------+ | id length (2) +---------------------------------------------------------------+ | id (1 ~ 65535) +---------------------------------------------------------------+ | queue length (2) +---------------------------------------------------------------+ | queue (1 ~ 65535) +---------------------------------------------------------------+ | result length (2) +---------------------------------------------------------------+ | result (1 ~ 65535) +---------------------------------------------------------------+ | | body length (2) + -----------------------------------------------------------+ | | body (1 ~ 65535) + OK -----------------------------------------------------------+ | | duplications (2) + -----------------------------------------------------------+ | | messageid (8) +---------------------------------------------------------------+
3. HTTP 请求
字符编码: UTF8
元素与JSON结构一致,但在请求时除rpush/lpush时的body以外,其它所有元素均以Queue String方式进行传递。
返回结果与同JSON结构
以.json后缀的接口,返回JSON
以.bin 后缀的接口,返回二进制数据
PATH | METHOD | REMARKS |
/queue/rpush.json?queue=&requestid= /queue/rpush.bin?queue=&requestid= |
POST | application/octet-stream 内容为消息实体BODY |
/queue/lpush.json?queue=&requestid= /queue/lpush.bin?queue=&requestid= |
POST | application/octet-stream 内容为消息实体BODY |
/queue/pull.json?queue=&requestid= /queue/pull.bin?queue=&requestid= |
GET | 拉取一个消息 |
/queue/delete.json?queue=&requestid= /queue/delete.bin?queue=&requestid= |
GET | 删除一个已拉取消息 |
/queue/lcancel.json?queue=&requestid= /queue/lcancel.bin?queue=&requestid= |
GET | 恢复一个已拉取消息至队列左侧 |
/queue/rcancel.json?queue=&requestid= /queue/rcancel.bin?queue=&requestid= |
GET | 恢复一个已拉取消息至队列右侧 |
/queue/count.json?queue=&requestid= /queue/count.bin?queue=&requestid= |
GET | 获取一个队列元素数 |
/queue/clear.json?queue=&requestid= /queue/clear.bin?queue=&requestid= |
GET | 清空一个队列元素 |
4. Websocket
Binary ws://{host}:{port}/queue/bin
Json ws://{host}:{port}/queue/json
同一个ws连接允许同时多个PULL请求。
5. RequestID 描述
协议中 RequestID 为请求标识,不做为消息标识,该标识由使用者自行生成,可以是1-32位字符,除PULL以外的其它所有命令均允许该体重复。
在PULL时若使用websocket则必需保证同一连接中不重复,常在同一连接中使用递增数做为标识。若使用http则需保证http全局不重复。 不能重复是因为PULL的标识需在要delete/lcancel/rcancel时做为识别标识使用。
6. Action 描述
编码如下
const byte LPUSH = 1; const byte RPUSH = 2; const byte DELETE = 3; const byte PULL = 4; const byte CLEAR = 5; const byte COUNT = 6; const byte LCANCEL = 7; const byte RCANCEL = 8; const byte CREATEQUEUE = 9; const byte DELETEQUEUE = 10; const string OK = "ok";
7. 消息事务模式
本服务原生支持事务模式, 不支持非事务模式的消息消费,所有消息在消费后必需通过delete命令删除;
若终端在处理业务时认为是一个失败的业务消费可以通过lcancel/rcancel将消息退回至队列的左侧或右侧;
面向连接使用方式,若不进行delete操作,则在连接断开后将自动恢复至队列左侧;
消息每被退回后再次PULL一次则duplications将为递增1,通常可以通过该值来判断该消息是否为被初次消费,若不是被初次消费则应检查重复消费所导致的脏数据或重复数据,如:不应产生两次相同的订单或扣款。