简介
ZMQ(ZERO MQ) 是个类似于 Socket 的一系列接口,用于 node 与 node 间的通信,node 可以是主机或者是进程。
ZMQ 使用 c/c++ 开发的。接口是 c (接口/实现文件 zmq.h/zmq.cpp)。
官网: https://zeromq.org/
与 Socket 的区别
- 一个套接字可以有多个输入和输出连接。普通的 Socket 是端到端的(1:1的关系),而 ZMQ 却是可以N:M 的关系.
- Socket 是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而 ZMQ 屏蔽了这些细节,让你的网络编程更为简单。
- ZMQ提供了多种模式进行消息路由,如请求-应答模式、发布-订阅模式等。这些模式可以用来搭建网络拓扑结构。
- 使用多种协议,inproc(进程内)、ipc(进程间)、tcp、pgm(广播)、epgm;
- 连接是异步的,并由一组消息队列做缓冲,后台线程异步地处理I/O操作。
- ZMQ没有提供类似zmq_accept()的函数,因为当套接字绑定至端点时它就自动开始接受连接了;
- ZMQ会发送整个消息,使用消息帧的机制来传递。如果你发送了10KB大小的消息,你就会收到10KB大小的消息。不强制使用某种消息格式,消息可以是0字节的,或是大到GB级的数据
- ZMQ会负责自动重连,能够智能地处理网络错误,有时它会进行重试,有时会告知你某项操作发生了错误。服务端可以随意地加入或退出网络。
生命周期
主要包含四个部分:
- 创建和销毁套接字:zmq_socket(), zmq_close()
- 配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()
- 为套接字建立连接:zmq_bind(), zmq_connect()
- 发送和接收消息:zmq_send(), zmq_recv()
消息异常处理
- ZMQ有阈值(HWM)的机制,可以避免消息溢出。当队列已满,ZMQ会自动阻塞发送者,或丢弃部分消息,这些行为取决于你所使用的消息模式。
- ZMQ会恰当地处理速度较慢的节点,会根据消息模式使用不同的策略。
- ZMQ中可以根据消息模式建立起一些中间装置(很小巧),可以用来降低网络的复杂程度。
Request-Reply 模式
客户端
- 首先初始化环境, 创建 REQ 套接字;
- 然后在一个循环中发送消息 ---> 接收消息. 如果打乱了这个顺序(如连续发送两次)则会报错
- 退出时关闭 REQ 套接字和释放环境(类的实例会自动释放)
#include <string>
#include <iostream>
#include <zmq.hpp>
int main()
{
// initialize the zmq context with a single IO thread
zmq::context_t context{1};
// construct a REQ (request) socket and connect to interface
zmq::socket_t socket{context, zmq::socket_type::req};
socket.connect("tcp://localhost:5555");
// set up some static data to send
const std::string data{"Hello"};
for (auto request_num = 0; request_num < 10; ++request_num)
{
// send the request message
std::cout << "Sending Hello " << request_num << "..." << std::endl;
socket.send(zmq::buffer(data), zmq::send_flags::none);
// wait for reply from server
zmq::message_t reply{};
socket.recv(reply, zmq::recv_flags::none);
std::cout << "Received " << reply.to_string();
std::cout << " (" << request_num << ")";
std::cout << std::endl;
}
return 0;
}
服务端
- 首先初始化环境,创建 REP 套接字,并绑到端口;
- 然后在一个循环中 zmq_recv 接收接受消息 ---> zmq_send 发送. 如果打乱了这个顺序(如接受和发送顺序反了)则会报错
- 对于没有 client 连接的消息, 退出时不用做任何处理(类的实例会自动释放)
#include <string>
#include <chrono>
#include <thread>
#include <iostream>
#include <zmq.hpp>
int main()
{
using namespace std::chrono_literals;
// initialize the zmq context with a single IO thread
zmq::context_t context{1};
// construct a REP (reply) socket and bind to interface
zmq::socket_t socket{context, zmq::socket_type::rep};
socket.bind("tcp://*:5555");
// prepare some static data for responses
const std::string data{"World"};
for (;;)
{
zmq::message_t request;
// receive a request from client
socket.recv(request, zmq::recv_flags::none);
std::cout << "Received " << request.to_string() << std::endl;
// simulate work
std::this_thread::sleep_for(1s);
// send the reply to the client
socket.send(zmq::buffer(data), zmq::send_flags::none);
}
return 0;
}
Publish-Subscribe 模式
客户端
- 首先初始化环境, 创建 ZMQ_SUB 套接字;
- 然后设置订阅消息,如果不 zmq_setsockopt 设置订阅内容,那将什么消息都收不到;
- 最后在一个循环体中使用 s_recv() 接收消息.
- 退出时关闭 ZMQ_SUB 套接字和释放环境(类的实例会自动释放)
.
/**
* Example of ZeroMQ pub/sub usage for C++11.
*/
#include <zmqpp/zmqpp.hpp>
#include <iostream>
#include <chrono>
using namespace std;
static const string PUBLISHER_ENDPOINT = "tcp://localhost:4242";
int main(int argc, char *argv[]) {
// Create a subscriber socket
zmqpp::context context;
zmqpp::socket_type type = zmqpp::socket_type::subscribe;
zmqpp::socket socket(context, type);
// Subscribe to the default channel
socket.subscribe("");
// Connect to the publisher
cout << "Connecting to " << PUBLISHER_ENDPOINT << "..." << endl;
socket.connect(PUBLISHER_ENDPOINT);
while(true) {
// Receive (blocking call)
zmqpp::message message;
socket.receive(message);
// Read as a string
string text;
message >> text;
unsigned long ms = std::chrono::system_clock::now().time_since_epoch() /
std::chrono::milliseconds(1);
cout << "[RECV] at " << ms << ": \"" << text << "\"" << endl;
}
// Unreachable, but for good measure
socket.disconnect(PUBLISHER_ENDPOINT);
return 0;
}
服务端
- 首先初始化环境, 创建 ZMQ_PUB 套接字, 并绑定端口;
- 然后在一个循环中发布消息.对于没有 client 订阅的消息,则直接被抛弃
- 退出时关闭 ZMQ_PUB 套接字和释放环境(类的实例会自动释放)
关于PUB-SUB套接字,还有一点需要注意:你无法得知SUB是何时开始接收消息的。
就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零.
/**
* Example of ZeroMQ pub/sub usage for C++11.
*/
#include <zmqpp/zmqpp.hpp>
#include <iostream>
#include <chrono>
#include <thread>
using namespace std;
static const string PUBLISH_ENDPOINT = "tcp://*:4242";
int main(int argc, char *argv[]) {
// Create a publisher socket
zmqpp::context context;
zmqpp::socket_type type = zmqpp::socket_type::publish;
zmqpp::socket socket (context, type);
// Open the connection
cout << "Binding to " << PUBLISH_ENDPOINT << "..." << endl;
socket.bind(PUBLISH_ENDPOINT);
// Pause to connect
this_thread::sleep_for(chrono::milliseconds(1000));
while(true) {
// Current time in ms
unsigned long ms = chrono::system_clock::now().time_since_epoch() /
chrono::milliseconds(1);
string text = "Hello at " + to_string(ms);
// Create a message and feed data into it
zmqpp::message message;
message << text;
// Send it off to any subscribers
socket.send(message);
cout << "[SENT] at " << ms << ": " << text << endl;
this_thread::sleep_for(chrono::microseconds(1000));
}
// Unreachable, but for good measure
socket.disconnect(PUBLISH_ENDPOINT);
return 0;
}
push/pull 模式
- ZMQ_PUSH 被多个 ZMQ_PULL 连接时,多个 ZMQ_PULL 之间采用公平队列的方式接收数据
- 用的最多的地方就是分而治之,把多个任务平均分配到多个 ZMQ_PULL 上执行,执行完成后再由一个 ZMQ_PULL 收集执行的结果
- ZMQ_PUSH 的消息会一直阻塞,不会被丢弃
push
- 首先初始化环境; 创建 ZMQ_PUSH 套接字 pusher, 并绑定到端口
- pusher 发送具体任务.
- 退出时使用 zmq_close 关闭 push_socket 和释放环境(类的实例会自动释放)
#include <zmq.hpp>
int main()
{
zmq::context_t ctx;
zmq::socket_t pusher(ctx, zmq::socket_type::push);
pusher.bind("inproc://test");
pusher.send(zmq::str_buffer("Hello, world"), zmq::send_flags::dontwait);
}
pull
- 首先初始化环境; 创建 ZMQ_PULL 套接字 puller,并连接到 pusher;
- 接受来自 ventilator 中 sender 的任务
- 退出时关闭 receiver/sender 两个套接字和释放环境(类的实例会自动释放)
#include <iostream>
#include <zmq_addon.hpp>
int main()
{
zmq::context_t ctx;
zmq::socket_t puller(ctx, zmq::socket_type::pull);
puller.connect(last_endpoint);
std::vector<zmq::message_t> recv_msgs;
const auto ret = zmq::recv_multipart(
puller, std::back_inserter(recv_msgs));
if (!ret)
return 1;
std::cout << "Got " << *ret
<< " messages" << std::endl;
return 0;
}