近期调研和使用 zeromq 与 cppzmq 的一些问题

2023-04-27,,

关于message

消息分片

消息分片的发送

消息分片允许将多个消息封装成一条消息。在发送自定义协议数据时,我们经常需要在消息前“填充”一个包头。如下代码,在发送的时候加上 zmq::send_flags::sndmore 标识(对应 zeromq ZMQ_SNDMORE),表示后面还有消息。这样 zeromq 会将 ZMQ_SNDMORE 的消息和最后一段消息拼装成一条完整的消息发送。

int SendData(char* pMsg, int iMsgLen)
{
tagMsgHead stHead;
bzero(&stHead, sizeof(stHead));
...
stHead.Len = iMsgLen;
stHead.Crc = 0; try
{
m_socket.send(zmq::const_buffer((const void*)(&stHead), sizeof(stHead)),zmq::send_flags::dontwait|zmq::send_flags::sndmore);
m_socket.send(zmq::const_buffer((const void*)(pMsg), static_cast<size_t>(iMsgLen)), zmq::send_flags::dontwait);
}
catch (...)
{
....
} return 0;
}

消息分片的接收

需要注意的是,如果发送使用了 ZMQ_SNDMORE 分片,那么在接收时也需要分多次 recv 接收数据(这点比较麻烦)。开始的时候以为 recv 接收的是一个完成的包,后面才知道 recv 接收的其实是“帧”数据,多个“帧”拼装成一个消息。具体接收方法如下:

Buffer buffer;
while (1) {
// 接收消息
zmq_msg_t identify;
zmq_msg_t message;
zmq_msg_init(&identify);
zmq_msg_init(&message);
zmq_recvmsg(socket, &identify, 0);
zmq_recvmsg(socket, &message, 0); buffer.Append(zmq_msg_data(&message), zmq_msg_size(&message));
// 检查是否还有更多消息可读
while(zmq_msg_more(&message)) {
zmq_recvmsg(socket, &message, 0);
buffer.Append(zmq_msg_data(&message), zmq_msg_size(&message));
}
zmq_msg_close(&identify);
zmq_msg_close(&message);
}

使用 cppzmq 的话,代码如下:

Buffer buffer;
while (1) {
// 接收消息
zmq::message_t identity;
zmq::message_t message;
socket.recv(identity, zmq::recv_flags::none);
socket.recv(message, zmq::recv_flags::none); buffer.Append(message.data(), message.size());
// 检查是否还有更多消息可读
while(message.more()) {
socket.recv(message, zmq::recv_flags::none);
buffer.Append(message.data(), message.size());
}
}

使用 ZMQ_SNDMORE 后接收也需要分片接收,这个确实是比较麻烦的地方。个人觉得如果改成一次接收会更好,因为这样更符合使用的“直觉”。

后面会不断更新这部分,有新的问题会加进来。

近期调研和使用 zeromq 与 cppzmq 的一些问题的相关教程结束。

《近期调研和使用 zeromq 与 cppzmq 的一些问题.doc》

下载本文的Word格式文档,以方便收藏与打印。