ZeroMQ的独占模式学习
Linux开发架构之路 2024-11-22

一、通信模式介绍

引用官方的说法: “ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接字之上的一 层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。” ---来自百度

ZeroMQ已经支持多种通信模式:

  • Request-reply pattern(请求-回复模式)

  • Publish-subscribe pattern(发布-订阅模式)

  • Pipeline pattern(管道模式)

  • Exclusive pair pattern (独占对等模式)

1-N、N-1 和 N-M消息传递模式

1-N 模式

本质:一个发送者(源)将消息广播或分发给多个接收者。发送者主动推送消息,而接收者被动接收消息。这种模式常用于广播和消息分发。

举例

Pub-Sub(发布-订阅)模式:一个发布者发布消息,多个订阅者接收这些消息。

场景:新闻广播系统

  • 发布者:新闻机构

  • 订阅者:各种新闻应用和用户

N-1 模式

本质:多个发送者将消息发送到一个接收者。接收者主动从多个发送者处拉取消息,而发送者被动推送消息。这种模式适用于任务收集或负载均衡。

举例

Pipeline(Push-Pull)模式:多个任务生成者(Push)将任务推送到一个任务处理器(Pull)。

场景:日志收集系统

  • 任务生成者:各个服务器的日志生成模块

  • 任务处理器:日志分析或存储系统

N-M 模式

本质:多个发送者将消息发送到多个接收者。每个发送者可以向多个接收者发送消息,多个接收者可以从多个发送者处接收消息。这种模式适用于复杂的广播、聚合或分布式任务处理。

举例

Pub-Sub(发布-订阅):多个发布者广播消息到多个订阅者。每个发布者可以广播不同的主题,订阅者可以订阅多个主题。

场景:实时数据流系统

  • 发布者:数据源(传感器、日志系统)

  • 订阅者:多个应用程序(监控仪表盘、分析工具)

1. Request-reply pattern(请求-回复模式)

REQ套接字

REQ套接字用于客户端向服务发送请求并接收回复。该套接字类型仅允许发送和随后的接收调用交替进行。REQ套接字可以连接到任意数量的REP或ROUTER套接字。每个发送的请求会在所有连接的服务之间进行轮询,每个接收到的回复与最后发出的请求匹配。它适用于简单的请求-回复模型,其中对失效对等方的可靠性不是问题。

如果没有服务可用,则套接字上的任何发送操作将阻塞,直到至少有一个服务可用。REQ套接字不会丢弃任何消息。

特性总结:

  • 兼容的对等套接字:REP, ROUTER

  • 方向:双向

  • 发送/接收模式:发送,接收,发送,接收,…

  • 出站路由策略:轮询

  • 入站路由策略:最后一个对等方

  • 静默状态下的动作:阻塞

REP套接字

REP套接字用于服务从客户端接收请求并发送回复。该套接字类型仅允许接收和随后的发送调用交替进行。每个接收到的请求从所有客户端中公平排队,每个发送的回复路由到发出最后请求的客户端。如果原始请求者不存在,则回复会被静默丢弃。

特性总结:

  • 兼容的对等套接字:REQ, DEALER

  • 方向:双向

  • 发送/接收模式:接收,发送,接收,发送,…

  • 出站路由策略:公平轮询

  • 入站路由策略:最后一个对等方

DEALER套接字

DEALER套接字类型与一组匿名对等方通信,使用轮询算法发送和接收消息。它是可靠的,因为它不会丢弃消息。DEALER作为REQ的异步替代,适用于与REP或ROUTER服务器通信的客户端。DEALER接收到的消息从所有连接的对等方中公平排队。

当DEALER套接字由于达到所有对等方的高水位标记而进入静默状态,或者如果根本没有对等方,则套接字上的任何发送操作将阻塞,直到静默状态结束或至少有一个对等方可用;消息不会被丢弃。

当DEALER套接字连接到REP套接字时,发送的消息必须包含一个空帧作为消息的第一部分(分隔符),后跟一个或多个正文部分。

特性总结:

  • 兼容的对等套接字:ROUTER, REP, DEALER

  • 方向:双向

  • 发送/接收模式:不受限制

  • 出站路由策略:轮询

  • 入站路由策略:公平排队

  • 静默状态下的动作:阻塞

ROUTER套接字

ROUTER套接字类型与一组对等方通信,使用显式地址,以便每个传出的消息发送到特定的对等连接。ROUTER作为REP的异步替代,通常用于与DEALER客户端通信的服务器。

当接收消息时,ROUTER套接字将在消息前添加一个包含原始对等方路由ID的消息部分,然后传递给应用程序。接收到的消息从所有连接的对等方中公平排队。当发送消息时,ROUTER套接字将移除消息的第一部分,并使用它来确定消息应路由到的对等方的路由ID。如果对等方不再存在,或者从未存在,则消息将被静默丢弃。

当ROUTER套接字由于达到所有对等方的高水位标记而进入静默状态时,发送到套接字的任何消息将被丢弃,直到静默状态结束。同样,任何路由到已达到其单个高水位标记的对等方的消息也将被丢弃。

当REQ套接字连接到ROUTER套接字时,除了原始对等方的路由ID,每个接收到的消息应包含一个空分隔符消息部分。因此,应用程序看到的每个接收消息的整个结构变为:一个或多个路由ID部分,分隔符部分,一个或多个正文部分。当发送回复给REQ套接字时,应用程序必须包含分隔符部分。

特性总结:

  • 兼容的对等套接字:DEALER, REQ, ROUTER

  • 方向:双向

  • 发送/接收模式:不受限制

  • 出站路由策略:见正文

  • 入站路由策略:公平排队

  • 静默状态下的动作:丢弃(见正文)

特性详细解释

REQ套接字

1.兼容的对等套接字

REP (Reply)ROUTER套接字与 REQ 套接字兼容。也就是说,REQ 套接字可以与 REP 套接字和 ROUTER 套接字进行通信。

  • REP:这是一个典型的服务端套接字,负责接收请求并发送响应。

  • ROUTER:这是一个更复杂的服务端套接字,能够处理多个客户端,并可以进行更复杂的路由操作。

2.方向

  • 双向:虽然 REQ 套接字只处理请求和响应的交替操作,但它的通信方向是双向的。即,客户端发送请求,服务端返回响应,整个过程是双向交互的。

3.发送/接收模式

  • 发送,接收,发送,接收:REQ 套接字的工作模式是交替的。每次发送请求后,必须等待响应才能发送下一个请求。也就是说,发送和接收操作是严格交替进行的,这保证了每个请求都有一个响应对应。

4.出站路由策略

  • 轮询:REQ 套接字会在所有连接的服务端之间轮询发送请求。假设客户端连接到多个服务端(如 REP 套接字),REQ 套接字会按照轮询的方式将请求发送给各个服务端。这有助于实现负载均衡。

5.入站路由策略

  • 最后一个对等方:每个发送的请求的响应必须来自于最后一个接收到请求的对等方。这意味着每个请求和其对应的响应是匹配的,并且每个请求只会从一个特定的服务端得到回应。

6.静默状态下的动作

  • 阻塞:如果 REQ 套接字没有任何服务端可用时,发送操作会阻塞。也就是说,客户端会等待,直到至少有一个服务端可以接受请求。在没有服务端的情况下,请求不会丢失,而是会被阻塞,等待服务端的出现。

2. Publish-subscribe pattern(发布-订阅模式)

发布-订阅模式用于将数据从单个发布者以扇出方式分发到多个订阅者。

发布-订阅模式由RFC 29/PUBSUB正式定义。

ZeroMQ通过四种套接字类型支持发布/订阅:

  • PUB 套接字类型

  • XPUB 套接字类型

  • SUB 套接字类型

  • XSUB 套接字类型

PUB 套接字

PUB套接字用于发布者分发数据。发送的消息以扇出方式分发给所有连接的对等方。此套接字类型不能接收任何消息。

当PUB套接字由于达到订阅者的高水位标记而进入静默状态时,将丢弃发送给该订阅者的消息,直到静默状态结束。发送功能永远不会阻塞此套接字类型。

特性总结:

  • 兼容的对等套接字:SUB, XSUB

  • 方向:单向

  • 发送/接收模式:仅发送

  • 入站路由策略:不适用

  • 出站路由策略:扇出

  • 静默状态下的动作:丢弃

SUB 套接字

SUB套接字用于订阅者订阅发布者分发的数据。最初SUB套接字不订阅任何消息。此套接字类型没有实现发送功能。

特性总结:

  • 兼容的对等套接字:PUB, XPUB

  • 方向:单向

  • 发送/接收模式:仅接收

  • 入站路由策略:公平排队

  • 出站路由策略:不适用

XPUB 套接字

与PUB套接字相同,不同之处在于可以以传入消息的形式接收来自对等方的订阅。订阅消息是一个字节1(表示订阅)或字节0(表示取消订阅),后跟订阅体。没有订阅/取消订阅前缀的消息也会被接收,但对订阅状态没有影响。

特性总结:

  • 兼容的对等套接字:ZMQ_SUB, ZMQ_XSUB

  • 方向:单向

  • 发送/接收模式:发送消息,接收订阅

  • 入站路由策略:不适用

  • 出站路由策略:扇出

  • 静默状态下的动作:丢弃

XSUB 套接字

与SUB套接字相同,不同之处在于通过向套接字发送订阅消息来订阅。订阅消息是一个字节1(表示订阅)或字节0(表示取消订阅),后跟订阅体。没有订阅/取消订阅前缀的消息也可以发送,但对订阅状态没有影响。

特性总结:

  • 兼容的对等套接字:ZMQ_PUB, ZMQ_XPUB

  • 方向:单向

  • 发送/接收模式:接收消息,发送订阅

  • 入站路由策略:公平排队

  • 出站路由策略:不适用

  • 静默状态下的动作:丢弃

3. Pipeline pattern(管道模式)

管道模式用于任务分发,通常在多阶段流水线中,其中一个或少数节点将工作推送给许多工作节点,然后它们再将结果推送给一个或少数收集节点。这个模式主要是可靠的,只要节点不意外断开连接,消息就不会被丢弃。它具有可扩展性,节点可以随时加入。

管道模式由RFC 30/PIPELINE正式定义。

ZeroMQ通过两种套接字类型支持管道模式:

  • PUSH 套接字类型

  • PULL 套接字类型

PUSH 套接字

PUSH套接字类型与一组匿名的PULL对等方通信,使用轮询算法发送消息。此套接字类型没有实现接收操作。

当PUSH套接字由于达到所有下游节点的高水位标记而进入静默状态,或者根本没有下游节点时,套接字上的任何发送操作将阻塞,直到静默状态结束或至少有一个下游节点可用进行发送;消息不会被丢弃。

特性总结:

  • 兼容的对等套接字:PULL

  • 方向:单向

  • 发送/接收模式:仅发送

  • 入站路由策略:不适用

  • 出站路由策略:轮询

  • 静默状态下的动作:阻塞

PULL 套接字

PULL套接字类型与一组匿名的PUSH对等方通信,使用公平排队算法接收消息。

此套接字类型没有实现发送操作。

特性总结:

  • 兼容的对等套接字:PUSH

  • 方向:单向

  • 发送/接收模式:仅接收

  • 入站路由策略:公平排队

  • 出站路由策略:不适用

  • 静默状态下的动作:阻塞

小结:在管道模式中,PUSH套接字负责将任务分发给多个PULL套接字,PULL套接字则负责接收这些任务。该模式通过轮询和公平排队算法确保任务的有效分配和处理,并且具备高可靠性和可扩展性,是实现任务分发和工作负载均衡的有效方式。

4. Exclusive pair pattern (独占 PAIR 模式)

独占对等模式

PAIR套接字不是通用套接字,而是用于特定用例,其中两个对等方在架构上是稳定的。这通常将PAIR限制在单个进程内,用于线程间通信。

独占对等模式由RFC 31/EXPAIR正式定义。

PAIR 套接字

PAIR套接字类型只能与一个对等方建立连接。消息在PAIR套接字上发送时不会进行路由或过滤。

当PAIR套接字由于达到连接对等方的高水位标记而进入静默状态,或者没有对等方连接时,套接字上的任何发送操作将阻塞,直到对等方变得可用进行发送;消息不会被丢弃。

尽管PAIR套接字可以通过除inproc之外的其他传输方式使用,但由于它们无法自动重连,并且在存在任何先前连接(包括处于关闭状态的连接)时,新入站连接将被终止,因此在大多数情况下,它们不适用于TCP。

小结:PAIR套接字专用于架构上稳定的两个对等方之间的通信,通常用于单个进程内的线程间通信。它只能与一个对等方连接,并且不支持消息路由或过滤。当达到高水位标记时,发送操作会阻塞,直到连接恢复。由于缺乏自动重连功能和处理新连接的限制,PAIR套接字不适合用于TCP连接。

二、请求-响应模式实现

请求响应模式是通信中最简单和基础的模式,ZeroMQ同样支持这个模式。

请求响应基础

请求-响应模式是计算机科学和网络通信中一种常见的通信模式。这个模式通常涉及两个主要角色:客户端和服务器。

基本概念

  1. 客户端: 向服务器发送请求的实体。它可以是浏览器、应用程序或任何其他发起通信的设备或程序。

  2. 服务器: 接收客户端请求并返回响应的实体。它通常是一个提供服务、资源或数据的程序或设备。

工作流程

  1. 客户端发起请求: 客户端构造一个请求消息,通常包含请求的类型(如 GET、POST)、请求的资源(如网页、API端点)、以及可能的附加数据(如表单数据)。

  2. 服务器处理请求: 服务器接收到请求后,解析请求内容,根据请求的类型和资源进行处理。处理可能包括访问数据库、执行计算或调用其他服务。

  3. 服务器返回响应: 处理完成后,服务器生成一个响应消息,通常包含状态码(如200表示成功、404表示资源未找到)、响应的内容(如网页内容、数据结果)以及其他信息(如响应时间、服务器信息)。

  4. 客户端接收响应: 客户端接收到响应后,解析响应内容并根据需要展示或处理这些数据。

典型应用

  • 网页浏览: 当你在浏览器中输入网址并按下回车时,浏览器(客户端)会向服务器发出一个请求,服务器会返回网页内容作为响应。

  • API调用: 在应用程序中调用API时,客户端发送请求(如获取数据、提交表单),服务器处理请求并返回结果。

请求-响应模式的特点

  • 同步通信: 通常情况下,请求-响应模式是同步的,即客户端发送请求后会等待服务器响应完成后才继续执行后续操作。

  • 单向通信: 这种模式是一种单向通信,客户端请求数据,服务器响应数据,但服务器不会主动向客户端发送消息(除非使用长轮询或WebSocket等技术)。

  • 简单直观: 由于其简单的结构和流程,很多网络协议和应用程序设计都是基于这种模式的。

应用实例

  • HTTP/HTTPS: 用于Web浏览和API交互的协议,客户端(浏览器或应用)发送HTTP请求,服务器返回HTTP响应。

  • REST API: 基于HTTP协议的API风格,允许客户端通过标准的HTTP请求(如GET、POST、PUT、DELETE)与服务器进行交互。

优点

  • 易于理解和实现: 请求-响应模式简洁明了,易于理解和实现。

  • 兼容性强: 许多网络协议和技术都基于这种模式,因此具有良好的兼容性。

缺点

  • 延迟问题: 在网络不稳定的情况下,请求和响应的延迟可能影响用户体验。

  • 同步阻塞: 客户端通常需要等待响应完成才能继续执行,可能导致性能瓶颈。

总的来说,请求-响应模式是现代计算和通信中的基础构建块,为各种网络应用和服务提供了一个标准化的通信方式。

ZEROMQ  C语言

在 C 语言中使用 ZeroMQ 实现请求-回复模式(Request-Reply Pattern)涉及创建一个请求端和一个回复端,通过 ZeroMQ 套接字进行通信。

服务器端代码(Reply Server)

下面是使用 C 语言编写的 ZeroMQ 请求-回复模式的示例代码。我们将分别实现一个服务器端(Reply Server)和一个客户端(Request Client)。

#include #include #include #include  int main() { // 初始化 ZeroMQ 上下文 void *context = zmq_ctx_new();  // 创建 REP (reply) 套接字 void *responder = zmq_socket(context, ZMQ_REP);  // 将套接字绑定到端口 zmq_bind(responder, "tcp://*:5555");  while (1) { // 接收请求 char buffer[256]; zmq_recv(responder, buffer, 255, 0); printf("Received request: %s\n", buffer);  // 发送回复 const char *reply = "World"; zmq_send(responder, reply, strlen(reply), 0); }  // 清理资源 zmq_close(responder); zmq_ctx_destroy(context);  return 0;}

客户端代码(Request Client)

#include #include #include  int main() { // 初始化 ZeroMQ 上下文 void *context = zmq_ctx_new();  // 创建 REQ (request) 套接字 void *requester = zmq_socket(context, ZMQ_REQ);  // 连接到服务器 zmq_connect(requester, "tcp://localhost:5555");  // 发送请求 const char *request = "Hello"; zmq_send(requester, request, strlen(request), 0);  // 接收回复 char buffer[256]; zmq_recv(requester, buffer, 255, 0); buffer[255] = '\0'; // 确保字符串以 null 结尾 printf("Received reply: %s\n", buffer);  // 清理资源 zmq_close(requester); zmq_ctx_destroy(context);  return 0;}

编译代码

编译上述代码时,需要链接 ZeroMQ 库。下面是使用 gcc 编译器的示例命令:

gcc -o server server.c -lzmqgcc -o client client.c -lzmq

详细说明

ZeroMQ Context: zmq_ctx_new() 用于创建 ZeroMQ 上下文,管理所有的套接字和连接。每个应用程序应该有一个上下文对象。

Sockets: 使用 zmq_socket() 创建套接字。请求-回复模式中,客户端使用 ZMQ_REQ 套接字,服务器使用 ZMQ_REP 套接字。

Bind 和 Connect:

  • zmq_bind() 绑定服务器端套接字到指定的地址和端口。

  • zmq_connect() 连接客户端套接字到服务器端的地址和端口。

消息传递:

  • 使用 zmq_send() 发送消息。

  • 使用 zmq_recv() 接收消息。

资源清理:

  • zmq_close() 关闭套接字。

  • zmq_ctx_destroy() 销毁上下文,释放相关资源。

ZEROMQ  C++

安装 ZeroMQ C++ Bindings (cppzmq)

ZeroMQ 的 C++ 绑定库 cppzmq 提供了对 ZeroMQ 的 C++ 封装。你可以通过包管理工具或者从 GitHub 下载源代码来安装它。

使用 vcpkg

vcpkg install cppzmq

从 GitHub 安装

git clone https://github.com/zeromq/cppzmq.git

编写代码

下面是一个简单的示例,展示了如何在 C++ 中使用 ZeroMQ 实现请求-回复模式。我们将分别编写一个服务器(Reply Server)和一个客户端(Request Client)。

服务器端代码(Reply Server)

#include #include #include  int main() { // Initialize ZeroMQ context zmq::context_t context(1);  // Create a REP (reply) socket zmq::socket_t socket(context, ZMQ_REP);  // Bind the socket to an endpoint (address:port) socket.bind("tcp://*:5555");  while (true) { // Receive a request zmq::message_t request; socket.recv(request); std::string request_str(static_cast<char*>(request.data()), request.size()); std::cout << "Received request: " << request_str << std::endl;  // Send a reply std::string reply_str = "World"; zmq::message_t reply(reply_str.size()); memcpy(reply.data(), reply_str.data(), reply_str.size()); socket.send(reply); }  return 0;}

客户端代码(Request Client)

#include #include #include  int main() { // Initialize ZeroMQ context zmq::context_t context(1);  // Create a REQ (request) socket zmq::socket_t socket(context, ZMQ_REQ);  // Connect to the server (address:port) socket.connect("tcp://localhost:5555");  // Send a request std::string request_str = "Hello"; zmq::message_t request(request_str.size()); memcpy(request.data(), request_str.data(), request_str.size()); socket.send(request);  // Receive a reply zmq::message_t reply; socket.recv(reply); std::string reply_str(static_cast<char*>(reply.data()), reply.size()); std::cout << "Received reply: " << reply_str << std::endl;  return 0;}

解释

ZeroMQ Context: 这是 ZeroMQ 的基础对象,负责管理所有的套接字和连接。每个线程应该有一个唯一的上下文对象。

Sockets: ZeroMQ 的套接字对象用于发送和接收消息。在请求-回复模式中,客户端使用 ZMQ_REQ 套接字,服务器使用 ZMQ_REP 套接字。

Bind 和 Connect: 服务器使用 bind 将套接字绑定到一个特定的地址和端口,客户端使用 connect 连接到该地址和端口。

消息传递: 使用 sendrecv 方法来传递消息。消息是通过 zmq::message_t 对象来表示的。

小结

使用 ZeroMQ 实现请求-响应模式可以带来显著的性能提升和灵活性。它不仅支持高性能的消息传递,还提供了丰富的特性,如自动重连、负载均衡、多语言支持等,使得它成为构建高性能、可靠的分布式系统和微服务架构的理想选择。

三、发布-订阅模式实现

发布订阅模式是典型的异步模式,通过ZeroMq来看看他的原理与实现。

简称 Pub-Sub,是一种消息传递模式。

允许发送者(发布者)和接收者(订阅者)之间解耦。

它广泛应用于消息队列、事件驱动系统和实时通知等场景。

基本原理

参与者

  • 发布者(Publisher):发送消息的实体。

  • 订阅者(Subscriber):接收消息的实体。

  • 消息代理(Message Broker):中介实体,负责接收发布者的消息并分发给相应的订阅者。

主题(Topic)

  • 消息根据主题分类,订阅者订阅一个或多个主题,发布者将消息发布到特定主题。

消息传递

  • 发布者将消息发送到消息代理,并指定消息的主题。

  • 消息代理根据主题将消息分发给所有订阅了该主题的订阅者。

工作流程

订阅(Subscribe)

  • 订阅者向消息代理注册自己对某个主题的兴趣。

  • 订阅者可以订阅多个主题。

发布(Publish)

  • 发布者向消息代理发送消息,并指定消息的主题。

分发(Distribute)

  • 消息代理接收到消息后,根据主题查找所有订阅了该主题的订阅者。

  • 消息代理将消息分发给所有符合条件的订阅者。

举例说明

假设有一个天气预报系统:

  • 发布者:天气预报服务

  • 订阅者:用户手机应用、网页应用等

  • 主题:不同城市的天气(如“北京天气”、“上海天气”)

  1. 用户 A 通过手机应用订阅了“北京天气”主题。

  2. 用户 B 通过网页应用订阅了“上海天气”主题。

  3. 天气预报服务发布了一条“北京天气”的消息到消息代理。

  4. 消息代理接收到消息后,将其分发给所有订阅了“北京天气”主题的用户应用(如用户 A 的手机应用)。

优点

  • 解耦:发布者和订阅者不直接交互,彼此独立。

  • 扩展性:可以方便地增加或减少订阅者,不影响发布者。

  • 灵活性:可以动态改变订阅关系,适应不同需求。

缺点

  • 复杂性:需要维护消息代理和订阅关系,增加系统复杂性。

  • 可靠性:消息传递的可靠性需要额外保障,如消息丢失和重复的问题。

C代码实现天气预报系统

#include #include #include #include #include #include  const char* cities[] = { "北京", "上海", "广州", "深圳", "杭州", "成都", "武汉", "重庆", "南京", "天津"}; const char* weather_conditions[] = { "晴", "多云", "阴", "小雨", "中雨", "大雨", "雷阵雨", "雪", "雾", "霾"}; // 随机生成天气信息void generate_weather_update(char* buffer, size_t buffer_size, const char* city) { int temp = rand() % 35; // 随机温度 const char* condition = weather_conditions[rand() % 10]; // 随机天气情况 snprintf(buffer, buffer_size, "%s天气 %s %d°C", city, condition, temp);} // 发布天气更新消息DWORD WINAPI publish_weather_update(LPVOID arg) { void* context = arg; const char* address = "tcp://*:5556";  // 创建发布者套接字 void* publisher = zmq_socket(context, ZMQ_PUB); zmq_bind(publisher, address);  srand((unsigned)time(NULL));  while (1) { // 随机选择城市并生成天气更新 for (int i = 0; i < 10; ++i) { char update[256]; generate_weather_update(update, sizeof(update), cities[i]); zmq_send(publisher, update, strlen(update), 0); printf("发布消息: %s\n", update); }  // 模拟发布间隔 Sleep(10000); // Windows 中使用 Sleep }  // 关闭套接字 zmq_close(publisher); return 0;} // 订阅天气更新消息DWORD WINAPI subscribe_weather_updates(LPVOID arg) { void* context = zmq_ctx_new(); const char* city = (char*)arg; const char* address = "tcp://localhost:5556";  // 创建订阅者套接字 void* subscriber = zmq_socket(context, ZMQ_SUB); zmq_connect(subscriber, address); zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, city, strlen(city));  while (1) { // 接收消息 char update[256]; int size = zmq_recv(subscriber, update, 255, 0); if (size != -1) { update[size] = '\0'; printf("接收消息 (%s): %s\n", city, update); } }  // 关闭套接字 zmq_close(subscriber); zmq_ctx_destroy(context); return 0;} int main() { void* context = zmq_ctx_new();  // 创建发布者线程 HANDLE publisher_thread = CreateThread(NULL, 0, publish_weather_update, context, 0, NULL);  // 创建10个订阅者线程 HANDLE subscriber_threads[10]; for (int i = 0; i < 10; ++i) { subscriber_threads[i] = CreateThread(NULL, 0, subscribe_weather_updates, (LPVOID)cities[i], 0, NULL); }  // 等待线程完成 WaitForSingleObject(publisher_thread, INFINITE); for (int i = 0; i < 10; ++i) { WaitForSingleObject(subscriber_threads[i], INFINITE); CloseHandle(subscriber_threads[i]); }  // 关闭线程句柄 CloseHandle(publisher_thread);  zmq_ctx_destroy(context); return 0;}

说明

  1. 发布者线程publish_weather_update 函数在一个独立线程中运行,发布10个城市的天气预报消息,天气信息随机变化。

  2. 订阅者线程subscribe_weather_updates 函数在10个独立线程中运行,每个线程订阅一个特定城市的天气预报消息。

  3. 主程序:在主程序中创建并启动发布者和10个订阅者线程,然后等待它们完成。

C++代码

#include #include #include #include #include #include #include #include  const std::vector<std::string> cities = { "北京", "上海", "广州", "深圳", "杭州", "成都", "武汉", "重庆", "南京", "天津"}; const std::vector<std::string> weather_conditions = { "晴", "多云", "阴", "小雨", "中雨", "大雨", "雷阵雨", "雪", "雾", "霾"}; // 随机生成天气信息std::string generate_weather_update(const std::string& city) { int temp = rand() % 35; // 随机温度 const std::string& condition = weather_conditions[rand() % weather_conditions.size()]; // 随机天气情况 return city + "天气 " + condition + " " + std::to_string(temp) + "°C";} // 发布天气更新消息void publish_weather_update(zmq::context_t& context) { zmq::socket_t publisher(context, ZMQ_PUB); publisher.bind("tcp://*:5556");  srand(static_cast<unsigned>(time(nullptr)));  while (true) { for (const auto& city : cities) { std::string update = generate_weather_update(city); zmq::message_t message(update.begin(), update.end()); publisher.send(message, zmq::send_flags::none); std::cout << "发布消息: " << update << std::endl; } std::this_thread::sleep_for(std::chrono::seconds(1)); }} // 订阅天气更新消息void subscribe_weather_updates(const std::string& city) { zmq::context_t context(1); zmq::socket_t subscriber(context, ZMQ_SUB); subscriber.connect("tcp://localhost:5556"); subscriber.set(zmq::sockopt::subscribe, city);  while (true) { zmq::message_t message; subscriber.recv(message, zmq::recv_flags::none); std::string update(static_cast<char*>(message.data()), message.size()); std::cout << "接收消息 (" << city << "): " << update << std::endl; }} int main() { zmq::context_t context(1);  // 创建发布者线程 std::thread publisher_thread(publish_weather_update, std::ref(context));  // 创建10个订阅者线程 std::vector<std::thread> subscriber_threads; for (const auto& city : cities) { subscriber_threads.emplace_back(subscribe_weather_updates, city); }  // 等待线程完成 publisher_thread.join(); for (auto& thread : subscriber_threads) { thread.join(); }  return 0;}

其他场景举例

1. 金融市场数据分发

应用场景:

  • 股票市场:股票价格、交易量等市场数据的实时分发。

  • 外汇市场:汇率变动、交易量等数据的实时更新。

详细举例:

  • BloombergReuters 等金融信息服务商使用发布-订阅模式来分发实时的市场数据给订阅者,这些订阅者可能是金融机构、投资者等。数据包括股票报价、外汇汇率、商品价格等。

  • 交易平台:交易所使用发布-订阅系统向交易参与者推送交易数据和市场信息。

2. 日志和监控系统

应用场景:

  • 服务器日志:实时收集和分析分布式系统的日志数据。

  • 系统监控:监控系统健康状态、资源使用等。

详细举例:

  • Elasticsearch, Logstash, Kibana (ELK) Stack:使用发布-订阅模式在分布式系统中收集和分析日志数据。Logstash 作为数据收集器,可以接收来自不同源的日志并发布到 Elasticsearch,Kibana 订阅这些日志并进行可视化展示。

  • Prometheus 和 Grafana:Prometheus 收集系统和应用的监控数据,Grafana 订阅这些数据并生成实时的监控仪表板。

3. 社交媒体和消息通知

应用场景:

  • 消息推送:向用户实时推送消息、通知。

  • 活动流:发布和订阅用户活动、帖子、评论等。

详细举例:

  • FacebookTwitter 等社交媒体平台使用发布-订阅模式来处理用户的状态更新、评论、点赞等事件。用户可以订阅朋友或关注的人的动态。

  • 即时通讯应用:如 SlackWhatsApp,用户接收消息的过程是通过发布-订阅模式实现的,确保消息的即时性和可靠性。

4. 物联网(IoT)

应用场景:

  • 设备状态监控:实时监控和控制物联网设备。

  • 传感器数据收集:从传感器收集数据并实时处理。

详细举例:

  • 智能家居:智能家居系统中的传感器(如温度、湿度传感器)使用发布-订阅模式将数据发送到中央控制系统,用户可以通过应用程序订阅这些数据并进行监控和控制。

  • 工业物联网:在工业环境中,机器和设备通过发布-订阅模式报告运行状态和故障信息,管理系统订阅这些信息进行实时监控和预警。

5. 分布式系统和微服务架构

应用场景:

  • 服务通信:微服务之间的通信和协调。

  • 事件驱动架构:基于事件的系统设计。

详细举例:

  • Apache Kafka:广泛用于构建分布式系统和微服务架构中的事件流处理。不同的微服务可以发布和订阅事件,确保系统的松耦合和高扩展性。

  • Amazon Web Services (AWS) SNS:Simple Notification Service (SNS) 是一个托管的发布-订阅服务,用于将消息从一个应用程序发送到多个订阅者,常用于触发基于事件的处理流程。

6. 在线游戏

应用场景:

  • 游戏状态更新:实时同步游戏状态和玩家操作。

  • 聊天系统:游戏内聊天消息的实时分发。

详细举例:

  • 多人在线游戏(MMO):如《魔兽世界》,使用发布-订阅模式在服务器和客户端之间同步游戏状态和玩家操作。游戏服务器发布玩家动作和状态,其他玩家的客户端订阅这些消息以保持同步。

  • 在线棋牌类游戏:如《炉石传说》,游戏服务器发布牌局状态和玩家操作,玩家客户端订阅这些消息以实时更新游戏界面。

7. 新闻和内容分发

应用场景:

  • 新闻推送:实时向用户推送新闻和内容更新。

  • 订阅服务:用户订阅特定主题或作者的内容更新。

详细举例:

  • RSS Feeds:使用发布-订阅模式向用户提供新闻和博客更新。用户订阅感兴趣的内容源,新的内容发布时会自动推送到用户的阅读器。

  • 内容聚合平台:如 FlipboardFeedly,用户订阅不同的内容源,平台通过发布-订阅模式实时获取和推送内容更新。

这些实际应用展示了发布-订阅模式在各种领域中的广泛应用,利用这一模式可以实现系统的解耦、扩展和实时性。

四、管道模式实现

ZeroMQ 的管道模式通过简单而高效的PUSHPULL套接字实现了负载均衡和任务分发,适用于分布式任务处理和数据处理流水线等场景。其自动负载均衡和高可扩展性使得它成为构建高效、可扩展分布式系统的理想选择。

基础原理介绍

ZeroMQ 的管道模式是一种特殊的消息传递模式,通常用于将任务或消息从一个生产者传递到多个消费者。其主要特性是:

  • 消息一对多传递:单个生产者可以将消息发送到多个消费者。

  • 负载均衡:消息在消费者之间进行负载均衡,以确保每个消费者都能接收到均衡的消息量。

  • 下游消费者处理:消费者处理消息,并且可以将处理后的消息传递给下游消费者,形成消息处理链。

在 ZeroMQ 中,管道模式由PUSHPULL套接字类型实现:

  • PUSH 套接字:生产者使用 PUSH 套接字发送消息。消息按照负载均衡的方式发送到连接的 PULL 套接字。

  • PULL 套接字:消费者使用 PULL 套接字接收消息。每个 PULL 套接字接收到的消息数量大致相等。

内部原理

PUSH 套接字

  • 特性:PUSH 套接字负责将消息发送给一个或多个连接的 PULL 套接字。

  • 消息分发:当有多个 PULL 套接字连接到一个 PUSH 套接字时,PUSH 套接字会按照循环(round-robin)的方式将消息分发给每个 PULL 套接字。这意味着每个消息都会发送给下一个连接的 PULL 套接字,直到轮回到第一个 PULL 套接字。

PULL 套接字

  • 特性PULL 套接字负责接收从 PUSH 套接字发送来的消息。

  • 消息接收:每个 PULL 套接字只会接收发送给它的消息,不会与其他 PULL 套接字共享消息。

实现负载均衡的方法

1.循环分发(Round-Robin Dispatching)

  • PUSH 套接字在有多个 PULL 套接字连接时,按照循环分发的方式将消息逐个发送给每个 PULL 套接字。

  • 这种方式确保了所有连接的 PULL 套接字能够接收到大致相同数量的消息,从而实现了负载均衡。

2.消息队列

  • 每个 PULL 套接字维护一个接收队列,PUSH 套接字发送的消息会被分发到这些接收队列中。

  • PULL 套接字从其接收队列中取出消息进行处理,避免了消息丢失或重复处理的情况。

Round-Robin 原理

Round-robin 是一种简单而常用的调度算法,广泛应用于任务调度、资源分配和网络通信中。在 round-robin 调度中,资源(如 CPU 时间、网络带宽或消息)按顺序分配给各个请求者,循环地进行,确保每个请求者都能公平地获得资源。

Round-Robin 在 ZeroMQ 的应用

在 ZeroMQ 中,PUSH 套接字使用 round-robin 算法将消息分发给多个连接的 PULL 套接字。具体工作原理如下:

1.初始化

  • 当一个 PUSH 套接字被创建并绑定到一个端口时,它开始监听来自 PULL 套接字的连接。

  • PULL 套接字连接到 PUSH 套接字时,PUSH 套接字维护一个内部队列,记录所有连接的 PULL 套接字。

2.消息分发

  • 每当 PUSH 套接字发送一条消息时,它会按照 round-robin 的顺序选择下一个 PULL 套接字,将消息发送给它。

  • 这种分发方式确保每个连接的 PULL 套接字接收到的消息数量大致相同。

3.循环

  • 当所有的 PULL 套接字都接收过一次消息后,PUSH 套接字重新开始,从第一个 PULL 套接字继续发送消息。

Round-Robin 调度算法的优点

  1. 公平性:Round-robin 算法确保每个连接的 PULL 套接字都能公平地接收到消息,没有任何一个 PULL 套接字会被饿死。

  2. 简单性:该算法非常简单,不需要复杂的计算和维护,只需要一个循环计数器即可实现。

  3. 负载均衡:由于消息均匀地分布到各个 PULL 套接字,系统能够实现负载均衡,防止某个 PULL 套接字过载。

Round-robin算法:

#include  #define WORKER_COUNT 5#define TASK_COUNT 20 int main() { int workers[WORKER_COUNT] = {0}; // 存储每个工作者处理的任务数量 int tasks[TASK_COUNT];  // 初始化任务 for (int i = 0; i < TASK_COUNT; i++) { tasks[i] = i + 1; // 每个任务用一个整数表示 }  // Round-Robin 分配任务 for (int i = 0; i < TASK_COUNT; i++) { int worker_id = i % WORKER_COUNT; // 使用模运算实现循环分配 workers[worker_id]++; printf("任务 %d 分配给工作者 %d\n", tasks[i], worker_id); }  // 输出每个工作者处理的任务数量 for (int i = 0; i < WORKER_COUNT; i++) { printf("工作者 %d 处理了 %d 个任务\n", i, workers[i]); }  return 0;}

主要用途

管道模式在以下场景中非常有用:

  1. 任务分发:将任务从生产者分发到多个工作者进行处理。例如,分布式计算中的任务调度。

  2. 负载均衡:在多个工作者之间平衡负载,确保每个工作者处理的任务量大致相同。

  3. 数据处理流水线:将数据流从一个阶段传递到下一个阶段,每个阶段由不同的消费者处理。

优势

  1. 简单且高效:管道模式通过简单的 PUSHPULL 套接字实现高效的消息传递和负载均衡。

  2. 自动负载均衡:消息在多个消费者之间自动均衡分配,无需手动干预。

  3. 高可扩展性:可以轻松增加或减少消费者,调整系统的处理能力。

  4. 灵活性:可以形成复杂的消息处理流水线,适用于多种分布式任务处理场景。

实际场景

1.ELK Stack 中的 Logstash

  • 用途:集中收集和处理分布式系统的日志。

  • 工作原理:Logstash 从不同来源接收日志数据,处理后推送到 Elasticsearch。通过管道模式,多个 Logstash 实例可以同时处理日志数据,并将处理后的数据均衡分发到 Elasticsearch 节点。

2.视频处理流水线

  • 用途:在视频直播或视频编辑场景中,视频帧需要经过多个处理阶段,如解码、特效处理、编码等。

  • 工作原理:视频帧通过管道模式从一个处理单元推送到下一个处理单元。每个处理单元可以并行处理不同的视频帧,最终生成处理后的视频流。

3.分布式交易系统

  • 用途:在金融市场中,实现高频交易和低延迟的数据传输。

  • 工作原理:交易指令和市场数据通过管道模式在交易系统的各个节点间传递,确保数据的快速分发和处理。每个交易节点可以并行处理接收到的数据,优化交易性能和响应速度。

与发布订阅的区别

发布-订阅模式

基础原理

  • 参与者:发布者(Publisher)和订阅者(Subscriber)。

  • 主题:消息通过主题(Topic)进行分类,订阅者可以选择感兴趣的主题。

  • 消息分发:发布者将消息发布到特定主题,消息代理将消息分发给所有订阅了该主题的订阅者。

主要特点

  • 多对多:一个发布者可以有多个订阅者,多个发布者也可以有多个订阅者。

  • 去耦合:发布者和订阅者彼此之间是解耦的,通过消息代理进行通信。

  • 灵活的订阅机制:订阅者可以动态订阅或取消订阅主题。

应用场景

  • 通知系统:如新闻推送、股票行情更新。

  • 事件驱动架构:系统组件之间通过事件进行松耦合通信。

示例

  • 消息队列系统:如 Apache Kafka、RabbitMQ 等。

  • 实时通知系统:如 Slack、Twitter 等。

管道模式

基础原理

  • 参与者:推送者(PUSH)和拉取者(PULL)。

  • 消息流动:消息从推送者流向拉取者。

  • 负载均衡:消息在拉取者之间进行负载均衡,确保每个拉取者接收到的消息量大致相同。

主要特点

  • 一对多:一个推送者可以有多个拉取者,但一个消息只能被一个拉取者处理。

  • 负载均衡:自动将消息均衡地分发给多个拉取者。

  • 顺序处理:消息按照推送的顺序被处理,适用于流水线处理。

应用场景

  • 任务分发系统:将任务分发给多个工作者进行处理。

  • 流水线处理:如视频处理、数据处理流水线。

示例

  • 分布式任务调度系统:如高性能计算任务调度。

  • 日志处理系统:如 ELK Stack 中的 Logstash。

主要区别

1.消息分发机制

  • 发布-订阅模式:一个消息可以被多个订阅者接收。消息通过主题进行分类,订阅者可以根据兴趣选择订阅特定主题。

  • 管道模式:一个消息只能被一个拉取者接收。消息从推送者传递到拉取者,并在拉取者之间进行负载均衡。

2.使用场景

  • 发布-订阅模式:适用于通知系统、事件驱动架构等需要广播消息的场景。

  • 管道模式:适用于任务分发、流水线处理等需要消息均衡处理的场景。

3.参与者关系

  • 发布-订阅模式:发布者和订阅者之间没有直接联系,通过消息代理实现解耦。

  • 管道模式:推送者和拉取者之间有直接联系,消息直接从推送者传递到拉取者。

为什么会觉得相似

两者都是消息传递模式,解决的是分布式系统中的通信问题。它们的相似之处在于:

  • 都是基于消息的异步通信:两者都通过消息实现参与者之间的异步通信,解耦发送方和接收方。

  • 都可以实现多对多的消息传递:尽管实现方式不同,发布-订阅模式通过主题,管道模式通过多个拉取者,都可以实现消息的多对多传递。

然而,它们在消息分发的具体机制和应用场景上有显著区别,选择哪种模式取决于具体的应用需求。

任务发放C代码

#include #include #include #include #include #include  #define WORKER_COUNT 20#define TASK_COUNT 1000 // 全局变量,存储每个消费者处理的任务数量int task_count[WORKER_COUNT] = { 0 };HANDLE mutex;volatile int keep_running = 1; // 用于控制任务处理者线程的运行 // 任务发布者(生产者)函数DWORD WINAPI producer_thread(LPVOID arg) { void* context = zmq_ctx_new(); void* producer = zmq_socket(context, ZMQ_PUSH); zmq_bind(producer, "tcp://*:5557");  srand((unsigned int)time(NULL));  for (int i = 0; i < TASK_COUNT; ++i) { char message[10]; int workload = rand() % 100; // 随机生成任务负载 snprintf(message, sizeof(message), "%d", workload); zmq_send(producer, message, strlen(message), 0); printf("发送任务: %s\n", message); Sleep(10); // 模拟任务生成间隔 }  zmq_close(producer); zmq_ctx_destroy(context); keep_running = 0; // 停止任务处理者线程 return 0;} // 任务处理者(消费者)函数DWORD WINAPI consumer_thread(LPVOID arg) { int worker_id = (int)arg; void* context = zmq_ctx_new(); void* consumer = zmq_socket(context, ZMQ_PULL); zmq_connect(consumer, "tcp://localhost:5557");  while (keep_running || zmq_recv(consumer, NULL, 0, ZMQ_DONTWAIT) != -1) { char message[10]; int size = zmq_recv(consumer, message, sizeof(message) - 1, ZMQ_DONTWAIT); if (size != -1) { message[size] = '\0'; int workload = atoi(message); printf("Worker %d 接收任务: %s\n", worker_id, message); Sleep(workload); // 模拟处理任务  // 更新任务计数器 WaitForSingleObject(mutex, INFINITE); task_count[worker_id]++; ReleaseMutex(mutex); } }  zmq_close(consumer); zmq_ctx_destroy(context); return 0;} int main() { mutex = CreateMutex(NULL, FALSE, NULL);  // 创建任务发布者线程 HANDLE producer = CreateThread(NULL, 0, producer_thread, NULL, 0, NULL);  // 创建 5 个任务处理者线程 HANDLE consumers[WORKER_COUNT]; for (int i = 0; i < WORKER_COUNT; ++i) { consumers[i] = CreateThread(NULL, 0, consumer_thread, (LPVOID)i, 0, NULL); }  // 等待任务发布者线程完成 WaitForSingleObject(producer, INFINITE); CloseHandle(producer);  // 等待所有任务处理者线程完成 WaitForMultipleObjects(WORKER_COUNT, consumers, TRUE, INFINITE); for (int i = 0; i < WORKER_COUNT; ++i) { CloseHandle(consumers[i]); }   printf("#########################################################################"); // 输出每个处理者处理的任务个数 for (int i = 0; i < WORKER_COUNT; ++i) { printf("Worker %d 处理了 %d 个任务\n", i, task_count[i]); }  CloseHandle(mutex);  return 0;}

代码说明

1.全局变量

  • task_count[WORKER_COUNT]:用于存储每个消费者处理的任务数量。

  • mutex:用于保护对 task_count 数组的访问,确保线程安全。

  • volatile int keep_running:用于控制任务处理者线程的运行。

2.任务发布者线程

  • 创建一个 ZeroMQ 上下文和一个 PUSH 套接字。

  • 绑定 PUSH 套接字到 TCP 端口 5557。

  • 生成 1000个随机任务,并通过 PUSH 套接字发送这些任务。

  • 在完成任务发送后,设置 keep_running 为 0,通知任务处理者线程可以停止。

3.任务处理者线程

  • 创建一个 ZeroMQ 上下文和一个 PULL 套接字。

  • 连接 PULL 套接字到任务发布者的地址(TCP 端口 5557)。

  • 进入无限循环,接收任务消息并模拟处理这些任务。

  • 处理完任务后更新任务计数器,并打印当前处理者处理的任务总数。

  • 当 keep_running 为 0 并且没有待处理的消息时,退出循环。

4.主程序

  • 创建一个任务发布者线程。

  • 创建 5 个任务处理者线程。

  • 等待任务发布者线程完成后关闭其句柄。

  • 等待所有任务处理者线程完成后关闭各自的句柄。

  • 输出每个处理者处理的任务个数。

该程序将启动一个任务发布者线程和 100个任务处理者线程,发布者每隔 10 毫秒发送一个任务,任务处理者实时接收并处理这些任务。每个任务处理者线程独立运行,并且可以同时处理不同的任务,从而实现了任务的并行处理和负载均衡。程序结束时,将输出每个处理者处理的任务个数,从而可以清晰地展示负载均衡的效果。

任务发放C++

#include #include #include #include #include #include #include #include #include  #define WORKER_COUNT 20#define TASK_COUNT 1000 // 全局变量,存储每个消费者处理的任务数量std::vector<int> task_count(WORKER_COUNT, 0);std::mutex mtx;std::atomic<bool> keep_running(true); // 用于控制任务处理者线程的运行 // 任务发布者(生产者)函数void producer_thread() { zmq::context_t context(1); zmq::socket_t producer(context, ZMQ_PUSH); producer.bind("tcp://*:5557");  srand(static_cast<unsigned int>(time(NULL)));  for (int i = 0; i < TASK_COUNT; ++i) { int workload = rand() % 100; // 随机生成任务负载 std::string message = std::to_string(workload); zmq::message_t zmq_message(message.data(), message.size()); producer.send(zmq_message, zmq::send_flags::none); std::cout << "发送任务: " << message << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 模拟任务生成间隔 }  keep_running = false; // 停止任务处理者线程} // 任务处理者(消费者)函数void consumer_thread(int worker_id) { zmq::context_t context(1); zmq::socket_t consumer(context, ZMQ_PULL); consumer.connect("tcp://localhost:5557");  while (keep_running) { zmq::message_t message; zmq::recv_result_t result = consumer.recv(message, zmq::recv_flags::dontwait); if (result) { std::string message_str(static_cast<char*>(message.data()), message.size()); int workload = std::stoi(message_str); std::cout << "Worker " << worker_id << " 接收任务: " << message_str << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(workload)); // 模拟处理任务  // 更新任务计数器 { std::lock_guard<std::mutex> lock(mtx); task_count[worker_id]++; } } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 短暂休眠以避免空转 } }  // 当 keep_running 为 false 后,确保处理完所有剩余消息 while (true) { zmq::message_t message; zmq::recv_result_t result = consumer.recv(message, zmq::recv_flags::dontwait); if (!result) { break; // 无消息可接收时退出 } std::string message_str(static_cast<char*>(message.data()), message.size()); int workload = std::stoi(message_str); std::cout << "Worker " << worker_id << " 接收任务: " << message_str << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(workload)); // 模拟处理任务  // 更新任务计数器 { std::lock_guard<std::mutex> lock(mtx); task_count[worker_id]++; } }} int main() { // 创建任务发布者线程 std::thread producer(producer_thread);  // 创建 20 个任务处理者线程 std::vector<std::thread> consumers; for (int i = 0; i < WORKER_COUNT; ++i) { consumers.emplace_back(consumer_thread, i); }  // 等待任务发布者线程完成 producer.join();  // 等待所有任务处理者线程完成 for (auto& consumer : consumers) { consumer.join(); }  std::cout << "#########################################################################" << std::endl; // 输出每个处理者处理的任务个数 for (int i = 0; i < WORKER_COUNT; ++i) { std::cout << "Worker " << i << " 处理了 " << task_count[i] << " 个任务" << std::endl; }  return 0;}

五、独占模式实现

独占PAIR模式是一种设计方法,旨在确保在并发环境中,两个线程或资源对在同一时间内独占地操作共享资源,从而避免资源竞争、数据不一致、死锁等问题。这种模式在高并发环境中显得尤为重要,特别是在需要高可靠性和稳定性的系统中,如金融交易系统、物流管理系统、在线购物平台等。

首先,本节通过一个使用Mutex实现的独占PAIR模式的示例,展示了如何通过互斥锁和条件变量来确保两个线程能够协同工作,共同对共享资源进行操作。这个示例简单而直观地解释了独占PAIR模式的基本概念和工作机制。

接下来,深入探讨了ZeroMQ在进程内的独占模式实现,特别是通过PAIR套接字模式,展示了如何在同一进程的不同线程之间建立高效、安全的双向通信通道。ZeroMQ的inproc传输方式为线程间的消息传递提供了一个高效、线程安全的解决方案,避免了传统线程同步机制的复杂性。

在实际应用方面,列举了多个场景,如金融交易系统中的交易撮合、物流系统中的任务分配、在线购物平台的订单处理、社交媒体平台的消息传递,以及微服务架构中的分布式锁管理。这些场景中的每一个都强调了独占PAIR模式的重要性,特别是在保证数据一致性和系统稳定性方面。

随后,提供了一个基于ZeroMQ的金融交易撮合系统的示例,通过C语言和C++语言的实现展示了如何利用ZeroMQ的消息队列和独占模式确保交易撮合的准确性和独占性。该示例不仅展示了基本的撮合操作,还引入了并发撮合、持久化订单数据和复杂的撮合逻辑(如部分成交和优先级撮合)等高级功能,进一步提高了系统的可扩展性和可靠性。

最后,总结了ZeroMQ独占模式的价值,特别是在高并发环境中的应用。通过限制资源的并发访问,ZeroMQ独占模式有效地避免了并发编程中的常见问题,如数据竞争和死锁,保证了系统在极端情况下的稳定性和一致性。ZeroMQ提供了多种套接字模式和线程同步机制,支持独占模式的实现,使得它在复杂的分布式系统中具有广泛的应用前景。

本节有力地展示了ZeroMQ在并发编程中的强大功能,并为如何在实际项目中应用独占PAIR模式提供了详细的指导。

基础介绍

Exclusive Pair Pattern,中文翻译为“独占PAIR模式”,是一种设计模式或技术,用于在并发编程或资源管理中确保两个资源、线程或实体在同一时间段内只能由唯一的一对(PAIR)对象使用或操作。这种模式通常用于避免资源竞争、死锁或不一致的数据状态。

关键概念:

  1. 独占性(Exclusivity): 在独占PAIR模式中,一个资源或对象在某个时刻只能被唯一的一对对象(PAIR)访问或操作,防止其他对象同时访问。

  2. PAIR: 这里的PAIR指的是成对的两个对象或线程,它们一起对某个资源或任务进行操作,必须协同工作。在给定时间内,只有这一对对象可以对资源进行操作。

  3. 并发安全: 该模式旨在确保系统在并发情况下的安全性和稳定性,防止资源争用或数据不一致。

示例:使用Mutex实现独占PAIR模式

这个例子模拟了两个线程(Thread A和Thread B)成对地访问和操作共享资源。

#include #include #include  // 共享资源int shared_resource = 0; // 互斥锁,用于保护共享资源pthread_mutex_t resource_mutex; // 条件变量,用于实现PAIR的协调pthread_cond_t pair_condition;int pair_ready = 0; // 操作共享资源的函数void* thread_function(void* arg) { int thread_id = *(int*)arg;  // 加锁,确保独占访问资源 pthread_mutex_lock(&resource_mutex);  // 等待另一个线程准备好(模拟PAIR模式) while (pair_ready == 0) { printf("Thread %d is waiting for its pair...\n", thread_id); pthread_cond_wait(&pair_condition, &resource_mutex); }  // 执行资源操作 printf("Thread %d is working with its pair...\n", thread_id); shared_resource += thread_id; // 模拟操作 printf("Thread %d updated shared resource to %d\n", thread_id, shared_resource);  // 解除PAIR状态 pair_ready = 0; pthread_cond_signal(&pair_condition);  // 解锁 pthread_mutex_unlock(&resource_mutex);  return NULL;} int main() { pthread_t thread_a, thread_b; int thread_id_a = 1; int thread_id_b = 2;  // 初始化互斥锁和条件变量 pthread_mutex_init(&resource_mutex, NULL); pthread_cond_init(&pair_condition, NULL);  // 创建线程 pthread_create(&thread_a, NULL, thread_function, &thread_id_a); pthread_create(&thread_b, NULL, thread_function, &thread_id_b);  // 模拟PAIR的准备状态 pthread_mutex_lock(&resource_mutex); pair_ready = 1; pthread_cond_broadcast(&pair_condition); pthread_mutex_unlock(&resource_mutex);  // 等待线程完成 pthread_join(thread_a, NULL); pthread_join(thread_b, NULL);  // 销毁互斥锁和条件变量 pthread_mutex_destroy(&resource_mutex); pthread_cond_destroy(&pair_condition);  return 0;}

代码解析

1.共享资源(shared_resource):这是线程操作的资源,在该例子中,它只是一个简单的整数。

2.互斥锁(pthread_mutex_t):用于保护共享资源,确保只有一对线程能够同时访问和操作资源。

3.条件变量(pthread_cond_t):用于线程之间的协调,确保在某个线程准备好之前,另一个线程必须等待。这模拟了PAIR之间的协同工作。

4.线程函数(thread_function)

  • 线程首先锁定互斥锁,确保对共享资源的独占访问。

  • 线程会检查PAIR是否准备好,如果没有准备好,它将等待(pthread_cond_wait),直到PAIR准备完毕。

  • 一旦PAIR准备好,线程对共享资源进行操作。

  • 操作完成后,解除PAIR状态,允许下一个PAIR操作资源。

5.主线程

  • 主线程创建了两个子线程(Thread A和Thread B)。

  • 主线程通过设置pair_ready为1并使用pthread_cond_broadcast,模拟了PAIR的准备状态。

  • 主线程等待子线程完成操作,然后清理互斥锁和条件变量。

ZeroMq 进程内独占

#include       // 包含 ZeroMQ 库头文件,用于消息队列通信#include   // 包含 Windows API 头文件,用于线程管理#include     // 包含标准输入输出库头文件#include    // 包含字符串操作库头文件#include    // 包含 locale.h 头文件,用于设置区域语言环境 // 线程 1 的函数DWORD WINAPI thread1(LPVOID context) { void* socket = zmq_socket(context, ZMQ_PAIR); // 创建一个 PAIR 套接字,用于双向通信 zmq_bind(socket, "inproc://pair1"); // 绑定套接字到 "inproc://pair1" 地址  char buffer[256]; // 用于接收消息的缓冲区 while (1) { memset(buffer, 0, sizeof(buffer)); // 清理缓冲区 zmq_recv(socket, buffer, 256, 0); // 接收来自线程 2 的消息 printf("Thread 1 received: %s\n", buffer); // 打印接收到的消息 Sleep(1000); // 等待 1 秒,Windows 下使用 Sleep 函数,单位是毫秒  const char* msg = "Message from Thread 1"; zmq_send(socket, msg, strlen(msg) + 1, 0); // 发送消息给线程 2,+1 包括结束符 '\0' }  zmq_close(socket); // 关闭套接字(这段代码通常不会被执行到,因为 while 循环是无限的) return 0;} // 线程 2 的函数DWORD WINAPI thread2(LPVOID context) { void* socket = zmq_socket(context, ZMQ_PAIR); // 创建一个 PAIR 套接字,用于双向通信 zmq_connect(socket, "inproc://pair1"); // 连接到 "inproc://pair1" 地址,链接线程 1 的套接字  char buffer[256]; // 用于接收消息的缓冲区 while (1) { const char* msg = "Message from Thread 2"; zmq_send(socket, msg, strlen(msg) + 1, 0); // 发送消息给线程 1,+1 包括结束符 '\0'  memset(buffer, 0, sizeof(buffer)); // 清理缓冲区 zmq_recv(socket, buffer, 256, 0); // 接收来自线程 1 的消息 printf("Thread 2 received: %s\n", buffer); // 打印接收到的消息 Sleep(1000); // 等待 1 秒,Windows 下使用 Sleep 函数,单位是毫秒 }  zmq_close(socket); // 关闭套接字(这段代码通常不会被执行到,因为 while 循环是无限的) return 0;} int main() { setlocale(LC_ALL, ""); // 设置区域语言环境  void* context = zmq_ctx_new(); // 创建一个新的 ZeroMQ 上下文,用于管理套接字和它们之间的通信  HANDLE t1, t2; // 定义两个线程句柄变量  t1 = CreateThread(NULL, 0, thread1, context, 0, NULL); // 创建线程 1,并启动执行 thread1 函数 t2 = CreateThread(NULL, 0, thread2, context, 0, NULL); // 创建线程 2,并启动执行 thread2 函数  WaitForSingleObject(t1, INFINITE); // 等待线程 1 结束(在这个例子中,线程实际上不会结束) WaitForSingleObject(t2, INFINITE); // 等待线程 2 结束(在这个例子中,线程实际上不会结束)  zmq_ctx_destroy(context); // 销毁 ZeroMQ 上下文,清理资源 return 0;}

代码解析

1.头文件包含

  • zmq.h: 包含 ZeroMQ 库的函数和数据结构声明,用于跨线程或跨进程的消息队列通信。

  • windows.h: 包含 Windows API 函数的声明,用于线程管理、睡眠等操作。

  • stdio.h: 提供标准输入输出功能,如 printf

  • string.h: 提供字符串操作功能,如 strlenmemcpy

  • unistd.h: 用于包含 POSIX 标准库函数(如 sleep),但是在 Windows 下可以替换为 Sleep 函数。

2.线程 1 和 线程 2 函数

  • 两个线程分别创建一个 PAIR 套接字,线程 1 绑定到 inproc://pair1 地址,线程 2 连接到 inproc://pair1

  • PAIR 套接字允许线程 1 和线程 2 之间进行双向通信。

  • 在无限循环中,两个线程轮流发送和接收消息,并将接收到的消息打印到控制台。

3.主程序

  • 创建一个 ZeroMQ 上下文,用于管理套接字和它们之间的通信。

  • 使用 Windows API CreateThread 创建两个线程,每个线程分别执行 thread1 和 thread2 函数。

  • 使用 WaitForSingleObject 函数等待两个线程的完成。在本例中,由于线程处于无限循环中,程序实际上会一直运行。

inproc://pair1 的解释

inproc://pair1 是什么

  • inproc 是 ZeroMQ 的一种传输方式,用于在同一进程内的线程之间通信。inproc://pair1 是一个地址标识符,pair1 是这个地址的标记(可以任意命名)。

  • 在 zmq_bind 函数中,线程 1 的 PAIR 套接字绑定到这个地址,使它成为服务器端。zmq_connect 函数则使线程 2 的 PAIR 套接字连接到这个地址,成为客户端。

能干什么

  • inproc://pair1 允许两个线程在同一个进程内通过 ZeroMQ 实现快速、高效的消息通信。这种通信是线程安全的,并且比使用其他跨进程传输协议(如 TCP 或 IPC)更加高效。

  • 使用 inproc 可以避免线程间使用共享内存或复杂的同步机制(如信号量、互斥锁等)进行通信。

在本例中,inproc://pair1用于在线程 1 和线程 2 之间建立可靠的双向通信,利用 PAIR 套接字确保每条消息都被正确接收和发送。这使得线程可以方便地互相通信,而不需要管理复杂的同步和资源共享问题。

C++ 循环发消息

#include #include #include #include  void thread_function(int id, zmq::context_t& context) { zmq::socket_t socket(context, ZMQ_PAIR);  // 每个线程绑定到唯一的地址 std::string address = "inproc://pair" + std::to_string(id); socket.bind(address);  // 等待所有线程连接后开始通信 std::this_thread::sleep_for(std::chrono::seconds(1));  for (int i = 0; i < 10; ++i) { // 接收来自其他线程的消息 zmq::message_t request; socket.recv(request, zmq::recv_flags::none); std::string recv_msg(static_cast<char*>(request.data()), request.size()); std::cout << "Thread " << id << " received: " << recv_msg << std::endl;  // 发送消息给下一个线程 std::string next_address = "inproc://pair" + std::to_string((id + 1) % 10); zmq::socket_t next_socket(context, ZMQ_PAIR); next_socket.connect(next_address); std::string msg = "Hello from thread " + std::to_string(id); zmq::message_t reply(msg.size()); memcpy(reply.data(), msg.c_str(), msg.size()); next_socket.send(reply, zmq::send_flags::none); }} int main() { zmq::context_t context(1); std::vector<std::thread> threads;  // 创建并启动 10 个线程 for (int i = 0; i < 10; ++i) { threads.push_back(std::thread(thread_function, i, std::ref(context))); }  // 等待所有线程结束 for (auto& t : threads) { t.join(); }  return 0;}

程序解释

1.头文件包含

  • iostream: 用于标准输入输出流操作。

  • thread: 用于创建和管理多线程。

  • vector: 用于存储和管理线程对象的动态数组。

  • zmq.hpp: ZeroMQ C++ API 的头文件,用于消息传递和套接字管理。

2.thread_function函数

  • 每个线程都创建了一个 ZMQ_PAIR 类型的套接字,并绑定到一个唯一的 inproc://pair 地址。

  • 线程在开始通信前等待 1 秒,以确保所有线程都已经绑定到它们的地址。

  • 每个线程循环 10 次,在每次循环中执行以下步骤:1.接收来自其他线程的消息,并将消息内容打印到控制台。2.生成一个要发送给下一个线程的消息,并将其发送到下一个线程的地址。3.下一个线程的地址通过 id 计算得出,确保消息按顺序传递到下一个线程。

3.main函数

  • 创建一个 zmq::context_t 对象来管理 ZeroMQ 的套接字和通信。

  • 使用 std::thread 创建并启动 10 个线程,每个线程调用 thread_function,并传递它们的线程 ID 和上下文对象的引用。

  • 主线程使用 join 等待所有子线程执行完毕,确保所有线程的生命周期在程序结束前都得到正确的管理。

程序能用在哪些业务上?

线程间通信:该程序展示了如何在同一进程内的多个线程之间使用 ZeroMQ 实现消息传递。类似的模式可以应用于任何需要线程间通信的场景,例如多线程计算任务的协调、负载平衡等。

消息路由:在一些业务场景中,多个线程或模块需要按照某种逻辑顺序传递消息,例如流水线处理、任务分配等。这种环状的通信模式可以确保消息在各个节点之间按照特定顺序流转。

分布式计算:尽管此示例仅在单进程内工作,但类似的模式可以扩展到跨进程甚至跨机器的场景。例如,在分布式系统中,多个节点之间的消息传递和协作可以使用类似的机制进行管理。

事件驱动系统:在事件驱动系统中,不同的事件处理模块可能需要依次处理消息。通过这种线程间的消息传递机制,可以确保事件按照指定的顺序被处理和传递。

应用场景

独占PAIR模式在以下场景中特别有用:

线程同步: 在多线程环境中,多个线程可能需要成对地对共享资源进行操作。例如,读写锁(read-write lock)可以看作是独占PAIR模式的一种应用,只有读锁和写锁之间能够成对操作共享资源。

数据库事务: 当两个数据库连接(PAIR)需要并行工作以完成某个复杂的事务时,独占PAIR模式可以确保在事务完成之前,其他连接无法访问相关的数据或资源,从而保持数据的一致性。

双向通信协议: 在一些双向通信协议中,发送方和接收方作为PAIR进行操作,独占资源,如信号通道或数据包,在操作完成前,其他通信不能干涉。

设备驱动程序: 在硬件设备的驱动程序中,控制信号和数据传输通常成对进行操作,独占设备的访问权,以确保数据传输的完整性。

分布式系统: 在分布式系统中,不同节点之间可能需要成对操作某个共享的资源,独占PAIR模式确保在某对操作完成之前,其他节点无法进行冲突性的操作。

具体场景

1. 金融交易系统中的交易撮合

应用场景: 在金融交易系统中,买方和卖方的订单需要进行撮合。每一笔交易撮合都需要独占地访问交易引擎的某个部分资源,以确保交易的正确性和一致性。

具体应用:

  • 当一个买方订单和卖方订单撮合成功时,这两个订单的状态需要同时更新为“已完成”。

  • 在订单撮合过程中,独占PAIR模式确保只有这对买卖订单在这一时刻可以操作这个交易引擎的资源,防止其他订单干扰。

  • 这种模式避免了在高频交易情况下,由于并发导致的订单状态不一致或重复撮合问题。

2. 物流系统中的运输任务分配

应用场景: 在物流系统中,运输任务的分配需要同时考虑配送任务和可用的运输资源(如车辆、司机)。为了避免同一辆车被分配到多个不同的运输任务,系统需要确保任务和资源的独占性匹配。

具体应用:

  • 当一个运输任务(如从仓库A到地点B的货物运输)需要分配给某辆车时,系统会使用独占PAIR模式,将该任务与该车的资源配对。

  • 该车辆和任务配对后,系统锁定这一对资源,防止其他任务占用这辆车。

  • 任务完成后,车辆资源释放,再进行下一次任务分配。

  • 这样避免了由于资源竞争导致的任务分配混乱或资源浪费。

3. 在线购物平台的订单处理

应用场景: 在在线购物平台中,多个用户可能会同时购买同一件商品。在库存有限的情况下,系统需要保证每次下单操作能独占处理对应的库存数据,防止出现超卖的情况。

具体应用:

  • 每当一个用户发起购买请求时,系统会为用户订单和库存数据配对,确保这对资源(订单和库存)在处理过程中不被其他订单干扰。

  • 在确认支付和扣减库存的操作中,独占PAIR模式确保其他订单不能同时操作相同的库存数据,防止多名用户同时购买超出库存量的商品。

  • 一旦订单处理完成,库存数据的锁定状态解除,其他用户的订单可以继续操作。

  • 这种方式有效避免了超卖问题,确保用户体验和库存管理的准确性。

4. 社交媒体平台的消息传递

应用场景: 在社交媒体平台上,用户之间的私密消息需要确保传输过程的私密性和准确性,特别是当消息可能同时发送给多名接收者时。

具体应用:

  • 当用户A向用户B发送消息时,系统会确保这条消息和用户B的接收信箱形成独占的配对关系。

  • 在这个过程中,系统确保用户B的信箱不会同时接受来自其他来源的同一消息,防止重复发送或接收错误。

  • 一旦消息传输成功,配对关系解除,用户B可以接收其他新的消息。

  • 独占PAIR模式确保了消息传递的完整性和准确性,避免了多线程环境下的消息混乱或丢失。

5. 分布式锁在微服务中的使用

应用场景:在微服务架构中,多个服务实例可能同时尝试操作同一资源,如更新数据库中的某一条记录。为了防止并发更新导致的数据不一致问题,分布式锁通常被用来控制资源的独占访问。

金融交易系统中的交易撮合 ZEROMQ C语言

这个示例将展示如何使用ZeroMQ消息队列进行买卖订单的撮合,同时确保交易撮合的独占性。

这个示例包含两个主要部分:

  1. 订单生成器:生成买卖订单,并通过ZeroMQ推送到交易撮合引擎。

  2. 交易撮合引擎:接收订单并进行撮合处理,确保每一对订单的独占性。

#include #include #include #include #include #include  // 定义订单结构体,包含订单ID、买卖方向、数量和价格typedef struct { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price;} Order; // 定义互斥锁,用于确保撮合操作的独占性HANDLE match_mutex; // 订单撮合函数,用于撮合买单和卖单void match_orders(Order* buy_order, Order* sell_order) { // 检查买单和卖单是否匹配 if (buy_order->price >= sell_order->price && buy_order->quantity == sell_order->quantity) { // 如果匹配成功,打印匹配信息 printf("Matched Order ID: %d (Buy) with Order ID: %d (Sell) - Quantity: %d at Price: %.2f\n", buy_order->order_id, sell_order->order_id, buy_order->quantity, buy_order->price); } else { // 如果不匹配,打印未匹配的信息 printf("Order ID: %d (Buy) and Order ID: %d (Sell) not matched.\n", buy_order->order_id, sell_order->order_id); }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎DWORD WINAPI order_generator(LPVOID context) { // 创建ZeroMQ PUSH套接字用于发送订单 void* socket = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket, "tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字  int order_id = 1; srand((unsigned int)time(NULL)); // 初始化随机数种子 while (1) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = (float)(rand() % 1000) / 10.0; // 随机生成价格(0到99.9之间)  // 发送订单到撮合引擎 zmq_send(socket, &order, sizeof(Order), 0); printf("Generated Order ID: %d, Side: %c, Quantity: %d, Price: %.2f\n", order.order_id, order.side, order.quantity, order.price);  Sleep(100); // 模拟订单生成频率,休眠100毫秒 }  zmq_close(socket); // 关闭套接字 return 0;} // 订单撮合线程函数,接收订单并进行撮合DWORD WINAPI order_matcher(LPVOID context) { // 创建ZeroMQ PULL套接字用于接收订单 void* socket = zmq_socket(context, ZMQ_PULL); zmq_bind(socket, "tcp://*:5555"); // 绑定到端口5555,等待订单到达  Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 int has_buy_order = 0; // 标记是否有待撮合的买单  while (1) { Order order; // 从ZeroMQ套接字接收订单 zmq_recv(socket, &order, sizeof(Order), 0);  // 获取互斥锁,确保撮合操作的独占性 WaitForSingleObject(match_mutex, INFINITE);  if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = 1; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; match_orders(&buy_order, &sell_order); has_buy_order = 0; // 重置标记,表示买单已撮合 } else { // 如果订单无法撮合,打印提示信息 printf("Order %d queued for future matching.\n", order.order_id); }  // 释放互斥锁,允许其他线程进行撮合操作 ReleaseMutex(match_mutex); }  zmq_close(socket); // 关闭套接字 return 0;} // 主程序入口int main() { // 创建ZeroMQ上下文 void* context = zmq_ctx_new();  // 创建互斥锁,用于保护撮合操作的独占性 match_mutex = CreateMutex(NULL, FALSE, NULL);  // 定义线程句柄 HANDLE generator_thread, matcher_thread;  // 创建订单生成器线程 generator_thread = CreateThread(NULL, 0, order_generator, context, 0, NULL);  // 创建订单撮合线程 matcher_thread = CreateThread(NULL, 0, order_matcher, context, 0, NULL);  // 等待线程执行完毕 WaitForSingleObject(generator_thread, INFINITE); WaitForSingleObject(matcher_thread, INFINITE);  // 关闭线程句柄 CloseHandle(generator_thread); CloseHandle(matcher_thread);  // 关闭互斥锁 CloseHandle(match_mutex);  // 销毁ZeroMQ上下文 zmq_ctx_destroy(context); return 0;}

代码解析

1.订单生成器(order_generator)

  • 使用ZeroMQ PUSH套接字将生成的订单推送到交易撮合引擎。

  • 每个订单都包含一个唯一的订单ID、买卖方向、数量和价格。

  • 每隔一定时间生成一个新的订单,模拟真实的交易环境。

2.交易撮合引擎(order_matcher)

  • 使用ZeroMQ PULL套接字接收订单。

  • 通过互斥锁(pthread_mutex_t)实现独占PAIR模式,确保每次撮合操作的并发安全性。

  • 如果当前有买单并收到卖单,尝试撮合。如果买单价格大于或等于卖单价格且数量相等,则匹配成功。

  • 匹配后,释放互斥锁,允许下一个订单对继续撮合。

3.主程序

  • 创建并启动订单生成器和交易撮合引擎线程。

  • 主线程等待子线程完成。

扩展与优化

  • 并发撮合:可以使用多个撮合引擎线程,以处理更高并发的订单流。

  • 持久化订单数据:在实际系统中,订单信息和撮合结果通常需要持久化到数据库。

  • 复杂撮合逻辑:实际的交易撮合可能包括部分成交、不同优先级的订单撮合等,可以根据需求进一步扩展。

在现有的基础上,可以进行以下扩展与优化,以实现更高效、更复杂的交易撮合系统。我们将:

  1. 并发撮合:增加多个撮合引擎线程,以处理更高并发的订单流。

  2. 持久化订单数据:模拟将订单信息和撮合结果持久化到数据库。

  3. 复杂撮合逻辑:引入部分成交和不同优先级的订单撮合逻辑。

#include #include #include #include #include #include  // 定义订单结构体,包含订单ID、买卖方向、数量、价格和优先级typedef struct { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price; int priority; // 优先级(数值越高,优先级越高)} Order; // 定义互斥锁,用于确保撮合操作的独占性HANDLE match_mutex; // 订单撮合函数,用于撮合买单和卖单,支持部分成交和优先级撮合void match_orders(Order* buy_order, Order* sell_order) { // 检查买单和卖单是否匹配 if (buy_order->price >= sell_order->price) { int matched_quantity = buy_order->quantity < sell_order->quantity ? buy_order->quantity : sell_order->quantity; printf("Matched Order ID: %d (Buy) with Order ID: %d (Sell) - Quantity: %d at Price: %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price);  // 模拟持久化撮合结果 printf("Persisting Match: Buy Order %d, Sell Order %d, Quantity %d, Price %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price);  // 更新剩余数量 buy_order->quantity -= matched_quantity; sell_order->quantity -= matched_quantity;  // 检查是否有剩余订单需要处理 if (buy_order->quantity > 0) { printf("Remaining Buy Order ID: %d, Quantity: %d\n", buy_order->order_id, buy_order->quantity); } if (sell_order->quantity > 0) { printf("Remaining Sell Order ID: %d, Quantity: %d\n", sell_order->order_id, sell_order->quantity); } } else { printf("Order ID: %d (Buy) and Order ID: %d (Sell) not matched due to price.\n", buy_order->order_id, sell_order->order_id); }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎DWORD WINAPI order_generator(LPVOID context) { // 创建ZeroMQ PUSH套接字用于发送订单 void* socket = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket, "tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字  int order_id = 1; srand((unsigned int)time(NULL)); // 初始化随机数种子 while (1) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = (float)(rand() % 1000) / 10.0; // 随机生成价格(0到99.9之间) order.priority = rand() % 10 + 1; // 随机生成优先级(1到10之间)  // 发送订单到撮合引擎 zmq_send(socket, &order, sizeof(Order), 0); printf("Generated Order ID: %d, Side: %c, Quantity: %d, Price: %.2f, Priority: %d\n", order.order_id, order.side, order.quantity, order.price, order.priority);  Sleep(100); // 模拟订单生成频率,休眠100毫秒 }  zmq_close(socket); // 关闭套接字 return 0;} // 订单撮合线程函数,接收订单并进行撮合DWORD WINAPI order_matcher(LPVOID context) { // 创建ZeroMQ PULL套接字用于接收订单 void* socket = zmq_socket(context, ZMQ_PULL); zmq_bind(socket, "tcp://*:5555"); // 绑定到端口5555,等待订单到达  Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 int has_buy_order = 0; // 标记是否有待撮合的买单  while (1) { Order order; // 从ZeroMQ套接字接收订单 zmq_recv(socket, &order, sizeof(Order), 0);  // 获取互斥锁,确保撮合操作的独占性 WaitForSingleObject(match_mutex, INFINITE);  if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = 1; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; // 进行优先级比较,如果新订单优先级较高,则放弃当前订单,保存新订单 if (sell_order.priority > buy_order.priority) { printf("New Sell Order ID: %d has higher priority than Buy Order ID: %d\n", sell_order.order_id, buy_order.order_id); has_buy_order = 0; // 放弃当前订单,等待新撮合 } else { match_orders(&buy_order, &sell_order); has_buy_order = (buy_order.quantity > 0) ? 1 : 0; // 检查买单是否还有剩余 } } else { printf("Order %d queued for future matching.\n", order.order_id); }  // 释放互斥锁,允许其他线程进行撮合操作 ReleaseMutex(match_mutex); }  zmq_close(socket); // 关闭套接字 return 0;} int main() { // 创建ZeroMQ上下文 void* context = zmq_ctx_new();  // 创建互斥锁,用于保护撮合操作的独占性 match_mutex = CreateMutex(NULL, FALSE, NULL);  // 定义线程句柄 HANDLE generator_thread, matcher_thread[4];  // 创建订单生成器线程 generator_thread = CreateThread(NULL, 0, order_generator, context, 0, NULL);  // 创建多个订单撮合线程,用于并发处理 for (int i = 0; i < 4; i++) { matcher_thread[i] = CreateThread(NULL, 0, order_matcher, context, 0, NULL); }  // 等待订单生成器线程执行完毕 WaitForSingleObject(generator_thread, INFINITE); // 等待所有撮合线程执行完毕 for (int i = 0; i < 4; i++) { WaitForSingleObject(matcher_thread[i], INFINITE); }  // 关闭线程句柄 CloseHandle(generator_thread); for (int i = 0; i < 4; i++) { CloseHandle(matcher_thread[i]); }  // 关闭互斥锁 CloseHandle(match_mutex);  // 销毁ZeroMQ上下文 zmq_ctx_destroy(context); return 0;}

扩展与优化解释

1.并发撮合

  • 增加了4个并发的撮合线程,以提高系统的吞吐量和处理能力。

  • 每个撮合线程独立处理接收到的订单,并通过互斥锁保证撮合操作的独占性。

2.持久化订单数据

  • 在订单撮合完成后,系统会模拟将撮合结果“持久化”到数据库。这一部分目前通过打印日志来模拟。

  • 实际系统中,可以将这些数据写入数据库(例如使用SQLite、MySQL等)。

3.复杂撮合逻辑

  • 引入了部分成交:如果买单和卖单的数量不一致,系统会撮合其中较小的部分,并保留未撮合的部分。

  • 引入了优先级撮合:每个订单都有一个优先级(1到10之间)。当有新订单到达时,,如果其优先级高于当前待撮合订单,会优先处理高优先级订单。

#include #include #include #include #include #include  // 定义订单结构体,包含订单ID、买卖方向、数量、价格和优先级typedef struct { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price; int priority; // 优先级(数值越高,优先级越高)} Order; // 定义互斥锁,用于确保撮合操作的独占性HANDLE match_mutex; // 订单撮合函数,用于撮合买单和卖单,支持部分成交和优先级撮合void match_orders(Order* buy_order, Order* sell_order) { // 检查买单和卖单是否匹配 if (buy_order->price >= sell_order->price) { int matched_quantity = buy_order->quantity < sell_order->quantity ? buy_order->quantity : sell_order->quantity; printf("Matched Order ID: %d (Buy) with Order ID: %d (Sell) - Quantity: %d at Price: %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price);  // 模拟持久化撮合结果 printf("Persisting Match: Buy Order %d, Sell Order %d, Quantity %d, Price %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price);  // 更新剩余数量 buy_order->quantity -= matched_quantity; sell_order->quantity -= matched_quantity;  // 检查是否有剩余订单需要处理 if (buy_order->quantity > 0) { printf("Remaining Buy Order ID: %d, Quantity: %d\n", buy_order->order_id, buy_order->quantity); } if (sell_order->quantity > 0) { printf("Remaining Sell Order ID: %d, Quantity: %d\n", sell_order->order_id, sell_order->quantity); } } else { printf("Order ID: %d (Buy) and Order ID: %d (Sell) not matched due to price.\n", buy_order->order_id, sell_order->order_id); }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎DWORD WINAPI order_generator(LPVOID context) { // 创建ZeroMQ PUSH套接字用于发送订单 void* socket = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket, "tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字  int order_id = 1; srand((unsigned int)time(NULL)); // 初始化随机数种子 while (1) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = (float)(rand() % 1000) / 10.0; // 随机生成价格(0到99.9之间) order.priority = rand() % 10 + 1; // 随机生成优先级(1到10之间)  // 发送订单到撮合引擎 zmq_send(socket, &order, sizeof(Order), 0); printf("Generated Order ID: %d, Side: %c, Quantity: %d, Price: %.2f, Priority: %d\n", order.order_id, order.side, order.quantity, order.price, order.priority);  Sleep(100); // 模拟订单生成频率,休眠100毫秒 }  zmq_close(socket); // 关闭套接字 return 0;} // 订单撮合线程函数,接收订单并进行撮合DWORD WINAPI order_matcher(LPVOID context) { // 创建ZeroMQ PULL套接字用于接收订单 void* socket = zmq_socket(context, ZMQ_PULL); zmq_bind(socket, "tcp://*:5555"); // 绑定到端口5555,等待订单到达  Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 int has_buy_order = 0; // 标记是否有待撮合的买单  while (1) { Order order; // 从ZeroMQ套接字接收订单 zmq_recv(socket, &order, sizeof(Order), 0);  // 获取互斥锁,确保撮合操作的独占性 WaitForSingleObject(match_mutex, INFINITE);  if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = 1; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; // 进行优先级比较,如果新订单优先级较高,则放弃当前订单,保存新订单 if (sell_order.priority > buy_order.priority) { printf("New Sell Order ID: %d has higher priority than Buy Order ID: %d\n", sell_order.order_id, buy_order.order_id); has_buy_order = 0; // 放弃当前订单,等待新撮合 } else { match_orders(&buy_order, &sell_order); has_buy_order = (buy_order.quantity > 0) ? 1 : 0; // 检查买单是否还有剩余 } } else { printf("Order %d queued for future matching.\n", order.order_id); }  // 释放互斥锁,允许其他线程进行撮合操作 ReleaseMutex(match_mutex); }  zmq_close(socket); // 关闭套接字 return 0;} int main() { // 创建ZeroMQ上下文 void* context = zmq_ctx_new();  // 创建互斥锁,用于保护撮合操作的独占性 match_mutex = CreateMutex(NULL, FALSE, NULL);  // 定义线程句柄 HANDLE generator_thread, matcher_thread[4];  // 创建订单生成器线程 generator_thread = CreateThread(NULL, 0, order_generator, context, 0, NULL);  // 创建多个订单撮合线程,用于并发处理 for (int i = 0; i < 4; i++) { matcher_thread[i] = CreateThread(NULL, 0, order_matcher, context, 0, NULL); }  // 等待订单生成器线程执行完毕 WaitForSingleObject(generator_thread, INFINITE); // 等待所有撮合线程执行完毕 for (int i = 0; i < 4; i++) { WaitForSingleObject(matcher_thread[i], INFINITE); }  // 关闭线程句柄 CloseHandle(generator_thread); for (int i = 0; i < 4; i++) { CloseHandle(matcher_thread[i]); }  // 关闭互斥锁 CloseHandle(match_mutex);  // 销毁ZeroMQ上下文 zmq_ctx_destroy(context); return 0;}

ZEROMQ C++语言

#include  #include #include #include #include #include #include #include #include  // 定义订单结构体,包含订单ID、买卖方向、数量、价格和优先级struct Order { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price; int priority; // 优先级(数值越高,优先级越高)}; // 全局互斥锁,用于确保撮合操作的独占性std::mutex match_mutex; // 订单撮合函数,用于撮合买单和卖单,支持部分成交和优先级撮合void match_orders(Order& buy_order, Order& sell_order) { // 检查买单和卖单是否匹配 if (buy_order.price >= sell_order.price) { int matched_quantity = std::min(buy_order.quantity, sell_order.quantity); std::cout << "Matched Order ID: " << buy_order.order_id << " (Buy) with Order ID: " << sell_order.order_id << " (Sell) - Quantity: " << matched_quantity << " at Price: " << buy_order.price << std::endl;  // 模拟持久化撮合结果 std::cout << "Persisting Match: Buy Order " << buy_order.order_id << ", Sell Order " << sell_order.order_id << ", Quantity " << matched_quantity << ", Price " << buy_order.price << std::endl;  // 更新剩余数量 buy_order.quantity -= matched_quantity; sell_order.quantity -= matched_quantity;  // 检查是否有剩余订单需要处理 if (buy_order.quantity > 0) { std::cout << "Remaining Buy Order ID: " << buy_order.order_id << ", Quantity: " << buy_order.quantity << std::endl; } if (sell_order.quantity > 0) { std::cout << "Remaining Sell Order ID: " << sell_order.order_id << ", Quantity: " << sell_order.quantity << std::endl; } } else { std::cout << "Order ID: " << buy_order.order_id << " (Buy) and Order ID: " << sell_order.order_id << " (Sell) not matched due to price." << std::endl; }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎void order_generator(zmq::context_t* context) { try { // 创建ZeroMQ PUSH套接字用于发送订单 zmq::socket_t socket(*context, zmq::socket_type::push); socket.connect("tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字  int order_id = 1; srand(static_cast<unsigned int>(time(NULL))); // 初始化随机数种子 while (true) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = static_cast<float>(rand() % 1000) / 10.0f; // 随机生成价格(0到99.9之间) order.priority = rand() % 10 + 1; // 随机生成优先级(1到10之间)  // 发送订单到撮合引擎 socket.send(zmq::buffer(&order, sizeof(Order)), zmq::send_flags::none); std::cout << "Generated Order ID: " << order.order_id << ", Side: " << order.side << ", Quantity: " << order.quantity << ", Price: " << order.price << ", Priority: " << order.priority << std::endl;  Sleep(100); // 模拟订单生成频率,休眠100毫秒 } } catch (const zmq::error_t& e) { std::cerr << "ZMQ Error in order_generator: " << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << "Exception in order_generator: " << e.what() << std::endl; } catch (...) { std::cerr << "Unknown error in order_generator" << std::endl; }} // 订单撮合线程函数,接收订单并进行撮合void order_matcher(zmq::context_t* context) { try { // 创建ZeroMQ PULL套接字用于接收订单 zmq::socket_t socket(*context, zmq::socket_type::pull); socket.bind("tcp://*:5555"); // 绑定到端口5555,等待订单到达  Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 bool has_buy_order = false; // 标记是否有待撮合的买单  while (true) { Order order; // 从ZeroMQ套接字接收订单 socket.recv(zmq::buffer(&order, sizeof(Order)), zmq::recv_flags::none);  // 锁定互斥锁,确保撮合操作的独占性 std::lock_guard<std::mutex> lock(match_mutex);  if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = true; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; // 进行优先级比较,如果新订单优先级较高,则放弃当前订单,保存新订单 if (sell_order.priority > buy_order.priority) { std::cout << "New Sell Order ID: " << sell_order.order_id << " has higher priority than Buy Order ID: " << buy_order.order_id << std::endl; has_buy_order = false; // 放弃当前订单,等待新撮合 } else { match_orders(buy_order, sell_order); has_buy_order = (buy_order.quantity > 0); // 检查买单是否还有剩余 } } else { std::cout << "Order " << order.order_id << " queued for future matching." << std::endl; } } } catch (const zmq::error_t& e) { std::cerr << "ZMQ Error in order_matcher: " << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << "Exception in order_matcher: " << e.what() << std::endl; } catch (...) { std::cerr << "Unknown error in order_matcher" << std::endl; }} int main() { try { // 创建ZeroMQ上下文 zmq::context_t context(1);  // 定义线程容器 std::vector<std::thread> matcher_threads;  // 创建订单生成器线程 std::thread generator_thread(order_generator, &context);  // 创建多个订单撮合线程,用于并发处理 for (int i = 0; i < 4; ++i) { matcher_threads.emplace_back(order_matcher, &context); }  // 等待订单生成器线程执行完毕 generator_thread.join(); // 等待所有撮合线程执行完毕 for (auto& thread : matcher_threads) { thread.join(); }  } catch (const std::exception& e) { std::cerr << "Exception in main: " << e.what() << std::endl; } catch (...) { std::cerr << "Unknown error in main" << std::endl; }  return 0;}

主要改动与解释

1.使用C++特性

  • #include代替了 #include ,使用 std::cout 和 std::endl 进行输出。

  • 使用了 std::thread 来代替 Windows 的 CreateThread。

  • 使用了 std::vector容器来管理多个撮合线程,避免手动管理线程句柄。

2.ZeroMQ C++ API

  • C++代码使用了 zmq.hpp,它是ZeroMQ的C++绑定库,比纯C语言API更具可读性和便利性。

  • zmq::context_tzmq::socket_t 代替了C语言中的 zmq_ctx_new()zmq_socket()

3.线程管理

  • 使用 std::thread 来创建并管理线程,thread.join() 替代了 WaitForSingleObject

  • std::vector 容器被用来存储多个撮合线程,这样代码更加整洁和易于扩展。

4.使用标准库

  • 使用 std::min 来替代手动的 if-else 判断更简洁地获取最小值。

小结:ZeroMQ的独占模式(Exclusive Pair Pattern)是一种设计模式,旨在确保两个通信对端(PAIR)在同一时间段内独占地访问共享资源,防止其他对端干扰。它通过限制并发访问,确保资源的安全性和一致性。在高并发场景中,独占模式有助于避免数据竞争、死锁和不一致性问题。ZeroMQ通过多种套接字模式(如PAIR、REQ/REP)和线程同步机制,支持独占模式的实现,保证消息在严格的单对单通信通道中传递,从而提高系统的稳定性和可靠性。


声明: 本文转载自其它媒体或授权刊载,目的在于信息传递,并不代表本站赞同其观点和对其真实性负责,如有新闻稿件和图片作品的内容、版权以及其它问题的,请联系我们及时删除。(联系我们,邮箱:evan.li@aspencore.com )
0
评论
  • 相关技术文库
  • C语言
  • 编程
  • 软件开发
  • 程序
下载排行榜
更多
评测报告
更多
广告