• ZeroMQ的独占模式学习

    一、通信模式介绍 引用官方的说法: “ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接字之上的一 层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。” ---来自百度 ZeroMQ已经支持多种通信模式: Request-reply pattern(请求-回复模式) Publish-subscribe pattern(发布-订阅模式) Pipeline pattern(管道模式) Exclusive pair pattern (独占对等模式) 1-N、N-1 和 N-M消息传递模式 1-N 模式 本质:一个发送者(源)将消息广播或分发给多个接收者。发送者主动推送消息,而接收者被动接收消息。这种模式常用于广播和消息分发。 举例: Pub-Sub(发布-订阅)模式:一个发布者发布消息,多个订阅者接收这些消息。 场景:新闻广播系统 发布者:新闻机构 订阅者:各种新闻应用和用户 N-1 模式 本质:多个发送者将消息发送到一个接收者。接收者主动从多个发送者处拉取消息,而发送者被动推送消息。这种模式适用于任务收集或负载均衡。 举例: Pipeline(Push-Pull)模式:多个任务生成者(Push)将任务推送到一个任务处理器(Pull)。 场景:日志收集系统 任务生成者:各个服务器的日志生成模块 任务处理器:日志分析或存储系统 N-M 模式 本质:多个发送者将消息发送到多个接收者。每个发送者可以向多个接收者发送消息,多个接收者可以从多个发送者处接收消息。这种模式适用于复杂的广播、聚合或分布式任务处理。 举例: Pub-Sub(发布-订阅):多个发布者广播消息到多个订阅者。每个发布者可以广播不同的主题,订阅者可以订阅多个主题。 场景:实时数据流系统 发布者:数据源(传感器、日志系统) 订阅者:多个应用程序(监控仪表盘、分析工具) 1. Request-reply pattern(请求-回复模式) REQ套接字 REQ套接字用于客户端向服务发送请求并接收回复。该套接字类型仅允许发送和随后的接收调用交替进行。REQ套接字可以连接到任意数量的REP或ROUTER套接字。每个发送的请求会在所有连接的服务之间进行轮询,每个接收到的回复与最后发出的请求匹配。它适用于简单的请求-回复模型,其中对失效对等方的可靠性不是问题。 如果没有服务可用,则套接字上的任何发送操作将阻塞,直到至少有一个服务可用。REQ套接字不会丢弃任何消息。 特性总结: 兼容的对等套接字:REP, ROUTER 方向:双向 发送/接收模式:发送,接收,发送,接收,… 出站路由策略:轮询 入站路由策略:最后一个对等方 静默状态下的动作:阻塞 REP套接字 REP套接字用于服务从客户端接收请求并发送回复。该套接字类型仅允许接收和随后的发送调用交替进行。每个接收到的请求从所有客户端中公平排队,每个发送的回复路由到发出最后请求的客户端。如果原始请求者不存在,则回复会被静默丢弃。 特性总结: 兼容的对等套接字:REQ, DEALER 方向:双向 发送/接收模式:接收,发送,接收,发送,… 出站路由策略:公平轮询 入站路由策略:最后一个对等方 DEALER套接字 DEALER套接字类型与一组匿名对等方通信,使用轮询算法发送和接收消息。它是可靠的,因为它不会丢弃消息。DEALER作为REQ的异步替代,适用于与REP或ROUTER服务器通信的客户端。DEALER接收到的消息从所有连接的对等方中公平排队。 当DEALER套接字由于达到所有对等方的高水位标记而进入静默状态,或者如果根本没有对等方,则套接字上的任何发送操作将阻塞,直到静默状态结束或至少有一个对等方可用;消息不会被丢弃。 当DEALER套接字连接到REP套接字时,发送的消息必须包含一个空帧作为消息的第一部分(分隔符),后跟一个或多个正文部分。 特性总结: 兼容的对等套接字:ROUTER, REP, DEALER 方向:双向 发送/接收模式:不受限制 出站路由策略:轮询 入站路由策略:公平排队 静默状态下的动作:阻塞 ROUTER套接字 ROUTER套接字类型与一组对等方通信,使用显式地址,以便每个传出的消息发送到特定的对等连接。ROUTER作为REP的异步替代,通常用于与DEALER客户端通信的服务器。 当接收消息时,ROUTER套接字将在消息前添加一个包含原始对等方路由ID的消息部分,然后传递给应用程序。接收到的消息从所有连接的对等方中公平排队。当发送消息时,ROUTER套接字将移除消息的第一部分,并使用它来确定消息应路由到的对等方的路由ID。如果对等方不再存在,或者从未存在,则消息将被静默丢弃。 当ROUTER套接字由于达到所有对等方的高水位标记而进入静默状态时,发送到套接字的任何消息将被丢弃,直到静默状态结束。同样,任何路由到已达到其单个高水位标记的对等方的消息也将被丢弃。 当REQ套接字连接到ROUTER套接字时,除了原始对等方的路由ID,每个接收到的消息应包含一个空分隔符消息部分。因此,应用程序看到的每个接收消息的整个结构变为:一个或多个路由ID部分,分隔符部分,一个或多个正文部分。当发送回复给REQ套接字时,应用程序必须包含分隔符部分。 特性总结: 兼容的对等套接字:DEALER, REQ, ROUTER 方向:双向 发送/接收模式:不受限制 出站路由策略:见正文 入站路由策略:公平排队 静默状态下的动作:丢弃(见正文) 特性详细解释 REQ套接字 1.兼容的对等套接字: REP (Reply)和ROUTER套接字与 REQ 套接字兼容。也就是说,REQ 套接字可以与 REP 套接字和 ROUTER 套接字进行通信。 REP:这是一个典型的服务端套接字,负责接收请求并发送响应。 ROUTER:这是一个更复杂的服务端套接字,能够处理多个客户端,并可以进行更复杂的路由操作。 2.方向: 双向:虽然 REQ 套接字只处理请求和响应的交替操作,但它的通信方向是双向的。即,客户端发送请求,服务端返回响应,整个过程是双向交互的。 3.发送/接收模式: 发送,接收,发送,接收:REQ 套接字的工作模式是交替的。每次发送请求后,必须等待响应才能发送下一个请求。也就是说,发送和接收操作是严格交替进行的,这保证了每个请求都有一个响应对应。 4.出站路由策略: 轮询:REQ 套接字会在所有连接的服务端之间轮询发送请求。假设客户端连接到多个服务端(如 REP 套接字),REQ 套接字会按照轮询的方式将请求发送给各个服务端。这有助于实现负载均衡。 5.入站路由策略: 最后一个对等方:每个发送的请求的响应必须来自于最后一个接收到请求的对等方。这意味着每个请求和其对应的响应是匹配的,并且每个请求只会从一个特定的服务端得到回应。 6.静默状态下的动作: 阻塞:如果 REQ 套接字没有任何服务端可用时,发送操作会阻塞。也就是说,客户端会等待,直到至少有一个服务端可以接受请求。在没有服务端的情况下,请求不会丢失,而是会被阻塞,等待服务端的出现。 2. Publish-subscribe pattern(发布-订阅模式) 发布-订阅模式用于将数据从单个发布者以扇出方式分发到多个订阅者。 发布-订阅模式由RFC 29/PUBSUB正式定义。 ZeroMQ通过四种套接字类型支持发布/订阅: PUB 套接字类型 XPUB 套接字类型 SUB 套接字类型 XSUB 套接字类型 PUB 套接字 PUB套接字用于发布者分发数据。发送的消息以扇出方式分发给所有连接的对等方。此套接字类型不能接收任何消息。 当PUB套接字由于达到订阅者的高水位标记而进入静默状态时,将丢弃发送给该订阅者的消息,直到静默状态结束。发送功能永远不会阻塞此套接字类型。 特性总结: 兼容的对等套接字:SUB, XSUB 方向:单向 发送/接收模式:仅发送 入站路由策略:不适用 出站路由策略:扇出 静默状态下的动作:丢弃 SUB 套接字 SUB套接字用于订阅者订阅发布者分发的数据。最初SUB套接字不订阅任何消息。此套接字类型没有实现发送功能。 特性总结: 兼容的对等套接字:PUB, XPUB 方向:单向 发送/接收模式:仅接收 入站路由策略:公平排队 出站路由策略:不适用 XPUB 套接字 与PUB套接字相同,不同之处在于可以以传入消息的形式接收来自对等方的订阅。订阅消息是一个字节1(表示订阅)或字节0(表示取消订阅),后跟订阅体。没有订阅/取消订阅前缀的消息也会被接收,但对订阅状态没有影响。 特性总结: 兼容的对等套接字:ZMQ_SUB, ZMQ_XSUB 方向:单向 发送/接收模式:发送消息,接收订阅 入站路由策略:不适用 出站路由策略:扇出 静默状态下的动作:丢弃 XSUB 套接字 与SUB套接字相同,不同之处在于通过向套接字发送订阅消息来订阅。订阅消息是一个字节1(表示订阅)或字节0(表示取消订阅),后跟订阅体。没有订阅/取消订阅前缀的消息也可以发送,但对订阅状态没有影响。 特性总结: 兼容的对等套接字:ZMQ_PUB, ZMQ_XPUB 方向:单向 发送/接收模式:接收消息,发送订阅 入站路由策略:公平排队 出站路由策略:不适用 静默状态下的动作:丢弃 3. Pipeline pattern(管道模式) 管道模式用于任务分发,通常在多阶段流水线中,其中一个或少数节点将工作推送给许多工作节点,然后它们再将结果推送给一个或少数收集节点。这个模式主要是可靠的,只要节点不意外断开连接,消息就不会被丢弃。它具有可扩展性,节点可以随时加入。 管道模式由RFC 30/PIPELINE正式定义。 ZeroMQ通过两种套接字类型支持管道模式: PUSH 套接字类型 PULL 套接字类型 PUSH 套接字 PUSH套接字类型与一组匿名的PULL对等方通信,使用轮询算法发送消息。此套接字类型没有实现接收操作。 当PUSH套接字由于达到所有下游节点的高水位标记而进入静默状态,或者根本没有下游节点时,套接字上的任何发送操作将阻塞,直到静默状态结束或至少有一个下游节点可用进行发送;消息不会被丢弃。 特性总结: 兼容的对等套接字:PULL 方向:单向 发送/接收模式:仅发送 入站路由策略:不适用 出站路由策略:轮询 静默状态下的动作:阻塞 PULL 套接字 PULL套接字类型与一组匿名的PUSH对等方通信,使用公平排队算法接收消息。 此套接字类型没有实现发送操作。 特性总结: 兼容的对等套接字:PUSH 方向:单向 发送/接收模式:仅接收 入站路由策略:公平排队 出站路由策略:不适用 静默状态下的动作:阻塞 小结:在管道模式中,PUSH套接字负责将任务分发给多个PULL套接字,PULL套接字则负责接收这些任务。该模式通过轮询和公平排队算法确保任务的有效分配和处理,并且具备高可靠性和可扩展性,是实现任务分发和工作负载均衡的有效方式。 4. Exclusive pair pattern (独占 PAIR 模式) 独占对等模式 PAIR套接字不是通用套接字,而是用于特定用例,其中两个对等方在架构上是稳定的。这通常将PAIR限制在单个进程内,用于线程间通信。 独占对等模式由RFC 31/EXPAIR正式定义。 PAIR 套接字 PAIR套接字类型只能与一个对等方建立连接。消息在PAIR套接字上发送时不会进行路由或过滤。 当PAIR套接字由于达到连接对等方的高水位标记而进入静默状态,或者没有对等方连接时,套接字上的任何发送操作将阻塞,直到对等方变得可用进行发送;消息不会被丢弃。 尽管PAIR套接字可以通过除inproc之外的其他传输方式使用,但由于它们无法自动重连,并且在存在任何先前连接(包括处于关闭状态的连接)时,新入站连接将被终止,因此在大多数情况下,它们不适用于TCP。 小结:PAIR套接字专用于架构上稳定的两个对等方之间的通信,通常用于单个进程内的线程间通信。它只能与一个对等方连接,并且不支持消息路由或过滤。当达到高水位标记时,发送操作会阻塞,直到连接恢复。由于缺乏自动重连功能和处理新连接的限制,PAIR套接字不适合用于TCP连接。 二、请求-响应模式实现 请求响应模式是通信中最简单和基础的模式,ZeroMQ同样支持这个模式。 请求响应基础 请求-响应模式是计算机科学和网络通信中一种常见的通信模式。这个模式通常涉及两个主要角色:客户端和服务器。 基本概念 客户端: 向服务器发送请求的实体。它可以是浏览器、应用程序或任何其他发起通信的设备或程序。 服务器: 接收客户端请求并返回响应的实体。它通常是一个提供服务、资源或数据的程序或设备。 工作流程 客户端发起请求: 客户端构造一个请求消息,通常包含请求的类型(如 GET、POST)、请求的资源(如网页、API端点)、以及可能的附加数据(如表单数据)。 服务器处理请求: 服务器接收到请求后,解析请求内容,根据请求的类型和资源进行处理。处理可能包括访问数据库、执行计算或调用其他服务。 服务器返回响应: 处理完成后,服务器生成一个响应消息,通常包含状态码(如200表示成功、404表示资源未找到)、响应的内容(如网页内容、数据结果)以及其他信息(如响应时间、服务器信息)。 客户端接收响应: 客户端接收到响应后,解析响应内容并根据需要展示或处理这些数据。 典型应用 网页浏览: 当你在浏览器中输入网址并按下回车时,浏览器(客户端)会向服务器发出一个请求,服务器会返回网页内容作为响应。 API调用: 在应用程序中调用API时,客户端发送请求(如获取数据、提交表单),服务器处理请求并返回结果。 请求-响应模式的特点 同步通信: 通常情况下,请求-响应模式是同步的,即客户端发送请求后会等待服务器响应完成后才继续执行后续操作。 单向通信: 这种模式是一种单向通信,客户端请求数据,服务器响应数据,但服务器不会主动向客户端发送消息(除非使用长轮询或WebSocket等技术)。 简单直观: 由于其简单的结构和流程,很多网络协议和应用程序设计都是基于这种模式的。 应用实例 HTTP/HTTPS: 用于Web浏览和API交互的协议,客户端(浏览器或应用)发送HTTP请求,服务器返回HTTP响应。 REST API: 基于HTTP协议的API风格,允许客户端通过标准的HTTP请求(如GET、POST、PUT、DELETE)与服务器进行交互。 优点 易于理解和实现: 请求-响应模式简洁明了,易于理解和实现。 兼容性强: 许多网络协议和技术都基于这种模式,因此具有良好的兼容性。 缺点 延迟问题: 在网络不稳定的情况下,请求和响应的延迟可能影响用户体验。 同步阻塞: 客户端通常需要等待响应完成才能继续执行,可能导致性能瓶颈。 总的来说,请求-响应模式是现代计算和通信中的基础构建块,为各种网络应用和服务提供了一个标准化的通信方式。 ZEROMQ  C语言 在 C 语言中使用 ZeroMQ 实现请求-回复模式(Request-Reply Pattern)涉及创建一个请求端和一个回复端,通过 ZeroMQ 套接字进行通信。 服务器端代码(Reply Server) 下面是使用 C 语言编写的 ZeroMQ 请求-回复模式的示例代码。我们将分别实现一个服务器端(Reply Server)和一个客户端(Request Client)。 #include #include #include #include int main() { // 初始化 ZeroMQ 上下文 void *context = zmq_ctx_new(); // 创建 REP (reply) 套接字 void *responder = zmq_socket(context, ZMQ_REP); // 将套接字绑定到端口 zmq_bind(responder, "tcp://*:5555"); while (1) { // 接收请求 char buffer[256]; zmq_recv(responder, buffer, 255, 0); printf("Received request: %s\n", buffer); // 发送回复 const char *reply = "World"; zmq_send(responder, reply, strlen(reply), 0); } // 清理资源 zmq_close(responder); zmq_ctx_destroy(context); return 0;} 客户端代码(Request Client) #include #include #include int main() { // 初始化 ZeroMQ 上下文 void *context = zmq_ctx_new(); // 创建 REQ (request) 套接字 void *requester = zmq_socket(context, ZMQ_REQ); // 连接到服务器 zmq_connect(requester, "tcp://localhost:5555"); // 发送请求 const char *request = "Hello"; zmq_send(requester, request, strlen(request), 0); // 接收回复 char buffer[256]; zmq_recv(requester, buffer, 255, 0); buffer[255] = '\0'; // 确保字符串以 null 结尾 printf("Received reply: %s\n", buffer); // 清理资源 zmq_close(requester); zmq_ctx_destroy(context); return 0;} 编译代码 编译上述代码时,需要链接 ZeroMQ 库。下面是使用 gcc 编译器的示例命令: gcc -o server server.c -lzmqgcc -o client client.c -lzmq 详细说明 ZeroMQ Context: zmq_ctx_new() 用于创建 ZeroMQ 上下文,管理所有的套接字和连接。每个应用程序应该有一个上下文对象。 Sockets: 使用 zmq_socket() 创建套接字。请求-回复模式中,客户端使用 ZMQ_REQ 套接字,服务器使用 ZMQ_REP 套接字。 Bind 和 Connect: zmq_bind() 绑定服务器端套接字到指定的地址和端口。 zmq_connect() 连接客户端套接字到服务器端的地址和端口。 消息传递: 使用 zmq_send() 发送消息。 使用 zmq_recv() 接收消息。 资源清理: zmq_close() 关闭套接字。 zmq_ctx_destroy() 销毁上下文,释放相关资源。 ZEROMQ  C++ 安装 ZeroMQ C++ Bindings (cppzmq) ZeroMQ 的 C++ 绑定库 cppzmq 提供了对 ZeroMQ 的 C++ 封装。你可以通过包管理工具或者从 GitHub 下载源代码来安装它。 使用 vcpkg vcpkg install cppzmq 从 GitHub 安装 git clone https://github.com/zeromq/cppzmq.git 编写代码 下面是一个简单的示例,展示了如何在 C++ 中使用 ZeroMQ 实现请求-回复模式。我们将分别编写一个服务器(Reply Server)和一个客户端(Request Client)。 服务器端代码(Reply Server) #include #include #include int main() { // Initialize ZeroMQ context zmq::context_t context(1); // Create a REP (reply) socket zmq::socket_t socket(context, ZMQ_REP); // Bind the socket to an endpoint (address:port) socket.bind("tcp://*:5555"); while (true) { // Receive a request zmq::message_t request; socket.recv(request); std::string request_str(static_cast<char*>(request.data()), request.size()); std::cout << "Received request: " << request_str << std::endl; // Send a reply std::string reply_str = "World"; zmq::message_t reply(reply_str.size()); memcpy(reply.data(), reply_str.data(), reply_str.size()); socket.send(reply); } return 0;} 客户端代码(Request Client) #include #include #include int main() { // Initialize ZeroMQ context zmq::context_t context(1); // Create a REQ (request) socket zmq::socket_t socket(context, ZMQ_REQ); // Connect to the server (address:port) socket.connect("tcp://localhost:5555"); // Send a request std::string request_str = "Hello"; zmq::message_t request(request_str.size()); memcpy(request.data(), request_str.data(), request_str.size()); socket.send(request); // Receive a reply zmq::message_t reply; socket.recv(reply); std::string reply_str(static_cast<char*>(reply.data()), reply.size()); std::cout << "Received reply: " << reply_str << std::endl; return 0;} 解释 ZeroMQ Context: 这是 ZeroMQ 的基础对象,负责管理所有的套接字和连接。每个线程应该有一个唯一的上下文对象。 Sockets: ZeroMQ 的套接字对象用于发送和接收消息。在请求-回复模式中,客户端使用 ZMQ_REQ 套接字,服务器使用 ZMQ_REP 套接字。 Bind 和 Connect: 服务器使用 bind 将套接字绑定到一个特定的地址和端口,客户端使用 connect 连接到该地址和端口。 消息传递: 使用 send 和 recv 方法来传递消息。消息是通过 zmq::message_t 对象来表示的。 小结 使用 ZeroMQ 实现请求-响应模式可以带来显著的性能提升和灵活性。它不仅支持高性能的消息传递,还提供了丰富的特性,如自动重连、负载均衡、多语言支持等,使得它成为构建高性能、可靠的分布式系统和微服务架构的理想选择。 三、发布-订阅模式实现 发布订阅模式是典型的异步模式,通过ZeroMq来看看他的原理与实现。 简称 Pub-Sub,是一种消息传递模式。 允许发送者(发布者)和接收者(订阅者)之间解耦。 它广泛应用于消息队列、事件驱动系统和实时通知等场景。 基本原理 参与者: 发布者(Publisher):发送消息的实体。 订阅者(Subscriber):接收消息的实体。 消息代理(Message Broker):中介实体,负责接收发布者的消息并分发给相应的订阅者。 主题(Topic): 消息根据主题分类,订阅者订阅一个或多个主题,发布者将消息发布到特定主题。 消息传递: 发布者将消息发送到消息代理,并指定消息的主题。 消息代理根据主题将消息分发给所有订阅了该主题的订阅者。 工作流程 订阅(Subscribe): 订阅者向消息代理注册自己对某个主题的兴趣。 订阅者可以订阅多个主题。 发布(Publish): 发布者向消息代理发送消息,并指定消息的主题。 分发(Distribute): 消息代理接收到消息后,根据主题查找所有订阅了该主题的订阅者。 消息代理将消息分发给所有符合条件的订阅者。 举例说明 假设有一个天气预报系统: 发布者:天气预报服务 订阅者:用户手机应用、网页应用等 主题:不同城市的天气(如“北京天气”、“上海天气”) 用户 A 通过手机应用订阅了“北京天气”主题。 用户 B 通过网页应用订阅了“上海天气”主题。 天气预报服务发布了一条“北京天气”的消息到消息代理。 消息代理接收到消息后,将其分发给所有订阅了“北京天气”主题的用户应用(如用户 A 的手机应用)。 优点 解耦:发布者和订阅者不直接交互,彼此独立。 扩展性:可以方便地增加或减少订阅者,不影响发布者。 灵活性:可以动态改变订阅关系,适应不同需求。 缺点 复杂性:需要维护消息代理和订阅关系,增加系统复杂性。 可靠性:消息传递的可靠性需要额外保障,如消息丢失和重复的问题。 C代码实现天气预报系统 #include #include #include #include #include #include const char* cities[] = { "北京", "上海", "广州", "深圳", "杭州", "成都", "武汉", "重庆", "南京", "天津"}; const char* weather_conditions[] = { "晴", "多云", "阴", "小雨", "中雨", "大雨", "雷阵雨", "雪", "雾", "霾"}; // 随机生成天气信息void generate_weather_update(char* buffer, size_t buffer_size, const char* city) { int temp = rand() % 35; // 随机温度 const char* condition = weather_conditions[rand() % 10]; // 随机天气情况 snprintf(buffer, buffer_size, "%s天气 %s %d°C", city, condition, temp);} // 发布天气更新消息DWORD WINAPI publish_weather_update(LPVOID arg) { void* context = arg; const char* address = "tcp://*:5556"; // 创建发布者套接字 void* publisher = zmq_socket(context, ZMQ_PUB); zmq_bind(publisher, address); srand((unsigned)time(NULL)); while (1) { // 随机选择城市并生成天气更新 for (int i = 0; i < 10; ++i) { char update[256]; generate_weather_update(update, sizeof(update), cities[i]); zmq_send(publisher, update, strlen(update), 0); printf("发布消息: %s\n", update); } // 模拟发布间隔 Sleep(10000); // Windows 中使用 Sleep } // 关闭套接字 zmq_close(publisher); return 0;} // 订阅天气更新消息DWORD WINAPI subscribe_weather_updates(LPVOID arg) { void* context = zmq_ctx_new(); const char* city = (char*)arg; const char* address = "tcp://localhost:5556"; // 创建订阅者套接字 void* subscriber = zmq_socket(context, ZMQ_SUB); zmq_connect(subscriber, address); zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, city, strlen(city)); while (1) { // 接收消息 char update[256]; int size = zmq_recv(subscriber, update, 255, 0); if (size != -1) { update[size] = '\0'; printf("接收消息 (%s): %s\n", city, update); } } // 关闭套接字 zmq_close(subscriber); zmq_ctx_destroy(context); return 0;} int main() { void* context = zmq_ctx_new(); // 创建发布者线程 HANDLE publisher_thread = CreateThread(NULL, 0, publish_weather_update, context, 0, NULL); // 创建10个订阅者线程 HANDLE subscriber_threads[10]; for (int i = 0; i < 10; ++i) { subscriber_threads[i] = CreateThread(NULL, 0, subscribe_weather_updates, (LPVOID)cities[i], 0, NULL); } // 等待线程完成 WaitForSingleObject(publisher_thread, INFINITE); for (int i = 0; i < 10; ++i) { WaitForSingleObject(subscriber_threads[i], INFINITE); CloseHandle(subscriber_threads[i]); } // 关闭线程句柄 CloseHandle(publisher_thread); zmq_ctx_destroy(context); return 0;} 说明 发布者线程:publish_weather_update 函数在一个独立线程中运行,发布10个城市的天气预报消息,天气信息随机变化。 订阅者线程:subscribe_weather_updates 函数在10个独立线程中运行,每个线程订阅一个特定城市的天气预报消息。 主程序:在主程序中创建并启动发布者和10个订阅者线程,然后等待它们完成。 C++代码 #include #include #include #include #include #include #include #include const std::vector<std::string> cities = { "北京", "上海", "广州", "深圳", "杭州", "成都", "武汉", "重庆", "南京", "天津"}; const std::vector<std::string> weather_conditions = { "晴", "多云", "阴", "小雨", "中雨", "大雨", "雷阵雨", "雪", "雾", "霾"}; // 随机生成天气信息std::string generate_weather_update(const std::string& city) { int temp = rand() % 35; // 随机温度 const std::string& condition = weather_conditions[rand() % weather_conditions.size()]; // 随机天气情况 return city + "天气 " + condition + " " + std::to_string(temp) + "°C";} // 发布天气更新消息void publish_weather_update(zmq::context_t& context) { zmq::socket_t publisher(context, ZMQ_PUB); publisher.bind("tcp://*:5556"); srand(static_cast<unsigned>(time(nullptr))); while (true) { for (const auto& city : cities) { std::string update = generate_weather_update(city); zmq::message_t message(update.begin(), update.end()); publisher.send(message, zmq::send_flags::none); std::cout << "发布消息: " << update << std::endl; } std::this_thread::sleep_for(std::chrono::seconds(1)); }} // 订阅天气更新消息void subscribe_weather_updates(const std::string& city) { zmq::context_t context(1); zmq::socket_t subscriber(context, ZMQ_SUB); subscriber.connect("tcp://localhost:5556"); subscriber.set(zmq::sockopt::subscribe, city); while (true) { zmq::message_t message; subscriber.recv(message, zmq::recv_flags::none); std::string update(static_cast<char*>(message.data()), message.size()); std::cout << "接收消息 (" << city << "): " << update << std::endl; }} int main() { zmq::context_t context(1); // 创建发布者线程 std::thread publisher_thread(publish_weather_update, std::ref(context)); // 创建10个订阅者线程 std::vector<std::thread> subscriber_threads; for (const auto& city : cities) { subscriber_threads.emplace_back(subscribe_weather_updates, city); } // 等待线程完成 publisher_thread.join(); for (auto& thread : subscriber_threads) { thread.join(); } return 0;} 其他场景举例 1. 金融市场数据分发 应用场景: 股票市场:股票价格、交易量等市场数据的实时分发。 外汇市场:汇率变动、交易量等数据的实时更新。 详细举例: Bloomberg 和 Reuters 等金融信息服务商使用发布-订阅模式来分发实时的市场数据给订阅者,这些订阅者可能是金融机构、投资者等。数据包括股票报价、外汇汇率、商品价格等。 交易平台:交易所使用发布-订阅系统向交易参与者推送交易数据和市场信息。 2. 日志和监控系统 应用场景: 服务器日志:实时收集和分析分布式系统的日志数据。 系统监控:监控系统健康状态、资源使用等。 详细举例: Elasticsearch, Logstash, Kibana (ELK) Stack:使用发布-订阅模式在分布式系统中收集和分析日志数据。Logstash 作为数据收集器,可以接收来自不同源的日志并发布到 Elasticsearch,Kibana 订阅这些日志并进行可视化展示。 Prometheus 和 Grafana:Prometheus 收集系统和应用的监控数据,Grafana 订阅这些数据并生成实时的监控仪表板。 3. 社交媒体和消息通知 应用场景: 消息推送:向用户实时推送消息、通知。 活动流:发布和订阅用户活动、帖子、评论等。 详细举例: Facebook 和 Twitter 等社交媒体平台使用发布-订阅模式来处理用户的状态更新、评论、点赞等事件。用户可以订阅朋友或关注的人的动态。 即时通讯应用:如 Slack 和 WhatsApp,用户接收消息的过程是通过发布-订阅模式实现的,确保消息的即时性和可靠性。 4. 物联网(IoT) 应用场景: 设备状态监控:实时监控和控制物联网设备。 传感器数据收集:从传感器收集数据并实时处理。 详细举例: 智能家居:智能家居系统中的传感器(如温度、湿度传感器)使用发布-订阅模式将数据发送到中央控制系统,用户可以通过应用程序订阅这些数据并进行监控和控制。 工业物联网:在工业环境中,机器和设备通过发布-订阅模式报告运行状态和故障信息,管理系统订阅这些信息进行实时监控和预警。 5. 分布式系统和微服务架构 应用场景: 服务通信:微服务之间的通信和协调。 事件驱动架构:基于事件的系统设计。 详细举例: Apache Kafka:广泛用于构建分布式系统和微服务架构中的事件流处理。不同的微服务可以发布和订阅事件,确保系统的松耦合和高扩展性。 Amazon Web Services (AWS) SNS:Simple Notification Service (SNS) 是一个托管的发布-订阅服务,用于将消息从一个应用程序发送到多个订阅者,常用于触发基于事件的处理流程。 6. 在线游戏 应用场景: 游戏状态更新:实时同步游戏状态和玩家操作。 聊天系统:游戏内聊天消息的实时分发。 详细举例: 多人在线游戏(MMO):如《魔兽世界》,使用发布-订阅模式在服务器和客户端之间同步游戏状态和玩家操作。游戏服务器发布玩家动作和状态,其他玩家的客户端订阅这些消息以保持同步。 在线棋牌类游戏:如《炉石传说》,游戏服务器发布牌局状态和玩家操作,玩家客户端订阅这些消息以实时更新游戏界面。 7. 新闻和内容分发 应用场景: 新闻推送:实时向用户推送新闻和内容更新。 订阅服务:用户订阅特定主题或作者的内容更新。 详细举例: RSS Feeds:使用发布-订阅模式向用户提供新闻和博客更新。用户订阅感兴趣的内容源,新的内容发布时会自动推送到用户的阅读器。 内容聚合平台:如 Flipboard 和 Feedly,用户订阅不同的内容源,平台通过发布-订阅模式实时获取和推送内容更新。 这些实际应用展示了发布-订阅模式在各种领域中的广泛应用,利用这一模式可以实现系统的解耦、扩展和实时性。 四、管道模式实现 ZeroMQ 的管道模式通过简单而高效的PUSH和PULL套接字实现了负载均衡和任务分发,适用于分布式任务处理和数据处理流水线等场景。其自动负载均衡和高可扩展性使得它成为构建高效、可扩展分布式系统的理想选择。 基础原理介绍 ZeroMQ 的管道模式是一种特殊的消息传递模式,通常用于将任务或消息从一个生产者传递到多个消费者。其主要特性是: 消息一对多传递:单个生产者可以将消息发送到多个消费者。 负载均衡:消息在消费者之间进行负载均衡,以确保每个消费者都能接收到均衡的消息量。 下游消费者处理:消费者处理消息,并且可以将处理后的消息传递给下游消费者,形成消息处理链。 在 ZeroMQ 中,管道模式由PUSH和PULL套接字类型实现: PUSH 套接字:生产者使用 PUSH 套接字发送消息。消息按照负载均衡的方式发送到连接的 PULL 套接字。 PULL 套接字:消费者使用 PULL 套接字接收消息。每个 PULL 套接字接收到的消息数量大致相等。 内部原理 PUSH 套接字 特性:PUSH 套接字负责将消息发送给一个或多个连接的 PULL 套接字。 消息分发:当有多个 PULL 套接字连接到一个 PUSH 套接字时,PUSH 套接字会按照循环(round-robin)的方式将消息分发给每个 PULL 套接字。这意味着每个消息都会发送给下一个连接的 PULL 套接字,直到轮回到第一个 PULL 套接字。 PULL 套接字 特性:PULL 套接字负责接收从 PUSH 套接字发送来的消息。 消息接收:每个 PULL 套接字只会接收发送给它的消息,不会与其他 PULL 套接字共享消息。 实现负载均衡的方法 1.循环分发(Round-Robin Dispatching): PUSH 套接字在有多个 PULL 套接字连接时,按照循环分发的方式将消息逐个发送给每个 PULL 套接字。 这种方式确保了所有连接的 PULL 套接字能够接收到大致相同数量的消息,从而实现了负载均衡。 2.消息队列: 每个 PULL 套接字维护一个接收队列,PUSH 套接字发送的消息会被分发到这些接收队列中。 PULL 套接字从其接收队列中取出消息进行处理,避免了消息丢失或重复处理的情况。 Round-Robin 原理 Round-robin 是一种简单而常用的调度算法,广泛应用于任务调度、资源分配和网络通信中。在 round-robin 调度中,资源(如 CPU 时间、网络带宽或消息)按顺序分配给各个请求者,循环地进行,确保每个请求者都能公平地获得资源。 Round-Robin 在 ZeroMQ 的应用 在 ZeroMQ 中,PUSH 套接字使用 round-robin 算法将消息分发给多个连接的 PULL 套接字。具体工作原理如下: 1.初始化: 当一个 PUSH 套接字被创建并绑定到一个端口时,它开始监听来自 PULL 套接字的连接。 当 PULL 套接字连接到 PUSH 套接字时,PUSH 套接字维护一个内部队列,记录所有连接的 PULL 套接字。 2.消息分发: 每当 PUSH 套接字发送一条消息时,它会按照 round-robin 的顺序选择下一个 PULL 套接字,将消息发送给它。 这种分发方式确保每个连接的 PULL 套接字接收到的消息数量大致相同。 3.循环: 当所有的 PULL 套接字都接收过一次消息后,PUSH 套接字重新开始,从第一个 PULL 套接字继续发送消息。 Round-Robin 调度算法的优点 公平性:Round-robin 算法确保每个连接的 PULL 套接字都能公平地接收到消息,没有任何一个 PULL 套接字会被饿死。 简单性:该算法非常简单,不需要复杂的计算和维护,只需要一个循环计数器即可实现。 负载均衡:由于消息均匀地分布到各个 PULL 套接字,系统能够实现负载均衡,防止某个 PULL 套接字过载。 Round-robin算法: #include #define WORKER_COUNT 5#define TASK_COUNT 20 int main() { int workers[WORKER_COUNT] = {0}; // 存储每个工作者处理的任务数量 int tasks[TASK_COUNT]; // 初始化任务 for (int i = 0; i < TASK_COUNT; i++) { tasks[i] = i + 1; // 每个任务用一个整数表示 } // Round-Robin 分配任务 for (int i = 0; i < TASK_COUNT; i++) { int worker_id = i % WORKER_COUNT; // 使用模运算实现循环分配 workers[worker_id]++; printf("任务 %d 分配给工作者 %d\n", tasks[i], worker_id); } // 输出每个工作者处理的任务数量 for (int i = 0; i < WORKER_COUNT; i++) { printf("工作者 %d 处理了 %d 个任务\n", i, workers[i]); } return 0;} 主要用途 管道模式在以下场景中非常有用: 任务分发:将任务从生产者分发到多个工作者进行处理。例如,分布式计算中的任务调度。 负载均衡:在多个工作者之间平衡负载,确保每个工作者处理的任务量大致相同。 数据处理流水线:将数据流从一个阶段传递到下一个阶段,每个阶段由不同的消费者处理。 优势 简单且高效:管道模式通过简单的 PUSH 和 PULL 套接字实现高效的消息传递和负载均衡。 自动负载均衡:消息在多个消费者之间自动均衡分配,无需手动干预。 高可扩展性:可以轻松增加或减少消费者,调整系统的处理能力。 灵活性:可以形成复杂的消息处理流水线,适用于多种分布式任务处理场景。 实际场景 1.ELK Stack 中的 Logstash: 用途:集中收集和处理分布式系统的日志。 工作原理:Logstash 从不同来源接收日志数据,处理后推送到 Elasticsearch。通过管道模式,多个 Logstash 实例可以同时处理日志数据,并将处理后的数据均衡分发到 Elasticsearch 节点。 2.视频处理流水线: 用途:在视频直播或视频编辑场景中,视频帧需要经过多个处理阶段,如解码、特效处理、编码等。 工作原理:视频帧通过管道模式从一个处理单元推送到下一个处理单元。每个处理单元可以并行处理不同的视频帧,最终生成处理后的视频流。 3.分布式交易系统: 用途:在金融市场中,实现高频交易和低延迟的数据传输。 工作原理:交易指令和市场数据通过管道模式在交易系统的各个节点间传递,确保数据的快速分发和处理。每个交易节点可以并行处理接收到的数据,优化交易性能和响应速度。 与发布订阅的区别 发布-订阅模式 基础原理 参与者:发布者(Publisher)和订阅者(Subscriber)。 主题:消息通过主题(Topic)进行分类,订阅者可以选择感兴趣的主题。 消息分发:发布者将消息发布到特定主题,消息代理将消息分发给所有订阅了该主题的订阅者。 主要特点 多对多:一个发布者可以有多个订阅者,多个发布者也可以有多个订阅者。 去耦合:发布者和订阅者彼此之间是解耦的,通过消息代理进行通信。 灵活的订阅机制:订阅者可以动态订阅或取消订阅主题。 应用场景 通知系统:如新闻推送、股票行情更新。 事件驱动架构:系统组件之间通过事件进行松耦合通信。 示例 消息队列系统:如 Apache Kafka、RabbitMQ 等。 实时通知系统:如 Slack、Twitter 等。 管道模式 基础原理 参与者:推送者(PUSH)和拉取者(PULL)。 消息流动:消息从推送者流向拉取者。 负载均衡:消息在拉取者之间进行负载均衡,确保每个拉取者接收到的消息量大致相同。 主要特点 一对多:一个推送者可以有多个拉取者,但一个消息只能被一个拉取者处理。 负载均衡:自动将消息均衡地分发给多个拉取者。 顺序处理:消息按照推送的顺序被处理,适用于流水线处理。 应用场景 任务分发系统:将任务分发给多个工作者进行处理。 流水线处理:如视频处理、数据处理流水线。 示例 分布式任务调度系统:如高性能计算任务调度。 日志处理系统:如 ELK Stack 中的 Logstash。 主要区别 1.消息分发机制: 发布-订阅模式:一个消息可以被多个订阅者接收。消息通过主题进行分类,订阅者可以根据兴趣选择订阅特定主题。 管道模式:一个消息只能被一个拉取者接收。消息从推送者传递到拉取者,并在拉取者之间进行负载均衡。 2.使用场景: 发布-订阅模式:适用于通知系统、事件驱动架构等需要广播消息的场景。 管道模式:适用于任务分发、流水线处理等需要消息均衡处理的场景。 3.参与者关系: 发布-订阅模式:发布者和订阅者之间没有直接联系,通过消息代理实现解耦。 管道模式:推送者和拉取者之间有直接联系,消息直接从推送者传递到拉取者。 为什么会觉得相似 两者都是消息传递模式,解决的是分布式系统中的通信问题。它们的相似之处在于: 都是基于消息的异步通信:两者都通过消息实现参与者之间的异步通信,解耦发送方和接收方。 都可以实现多对多的消息传递:尽管实现方式不同,发布-订阅模式通过主题,管道模式通过多个拉取者,都可以实现消息的多对多传递。 然而,它们在消息分发的具体机制和应用场景上有显著区别,选择哪种模式取决于具体的应用需求。 任务发放C代码 #include #include #include #include #include #include #define WORKER_COUNT 20#define TASK_COUNT 1000 // 全局变量,存储每个消费者处理的任务数量int task_count[WORKER_COUNT] = { 0 };HANDLE mutex;volatile int keep_running = 1; // 用于控制任务处理者线程的运行 // 任务发布者(生产者)函数DWORD WINAPI producer_thread(LPVOID arg) { void* context = zmq_ctx_new(); void* producer = zmq_socket(context, ZMQ_PUSH); zmq_bind(producer, "tcp://*:5557"); srand((unsigned int)time(NULL)); for (int i = 0; i < TASK_COUNT; ++i) { char message[10]; int workload = rand() % 100; // 随机生成任务负载 snprintf(message, sizeof(message), "%d", workload); zmq_send(producer, message, strlen(message), 0); printf("发送任务: %s\n", message); Sleep(10); // 模拟任务生成间隔 } zmq_close(producer); zmq_ctx_destroy(context); keep_running = 0; // 停止任务处理者线程 return 0;} // 任务处理者(消费者)函数DWORD WINAPI consumer_thread(LPVOID arg) { int worker_id = (int)arg; void* context = zmq_ctx_new(); void* consumer = zmq_socket(context, ZMQ_PULL); zmq_connect(consumer, "tcp://localhost:5557"); while (keep_running || zmq_recv(consumer, NULL, 0, ZMQ_DONTWAIT) != -1) { char message[10]; int size = zmq_recv(consumer, message, sizeof(message) - 1, ZMQ_DONTWAIT); if (size != -1) { message[size] = '\0'; int workload = atoi(message); printf("Worker %d 接收任务: %s\n", worker_id, message); Sleep(workload); // 模拟处理任务 // 更新任务计数器 WaitForSingleObject(mutex, INFINITE); task_count[worker_id]++; ReleaseMutex(mutex); } } zmq_close(consumer); zmq_ctx_destroy(context); return 0;} int main() { mutex = CreateMutex(NULL, FALSE, NULL); // 创建任务发布者线程 HANDLE producer = CreateThread(NULL, 0, producer_thread, NULL, 0, NULL); // 创建 5 个任务处理者线程 HANDLE consumers[WORKER_COUNT]; for (int i = 0; i < WORKER_COUNT; ++i) { consumers[i] = CreateThread(NULL, 0, consumer_thread, (LPVOID)i, 0, NULL); } // 等待任务发布者线程完成 WaitForSingleObject(producer, INFINITE); CloseHandle(producer); // 等待所有任务处理者线程完成 WaitForMultipleObjects(WORKER_COUNT, consumers, TRUE, INFINITE); for (int i = 0; i < WORKER_COUNT; ++i) { CloseHandle(consumers[i]); } printf("#########################################################################"); // 输出每个处理者处理的任务个数 for (int i = 0; i < WORKER_COUNT; ++i) { printf("Worker %d 处理了 %d 个任务\n", i, task_count[i]); } CloseHandle(mutex); return 0;} 代码说明 1.全局变量: task_count[WORKER_COUNT]:用于存储每个消费者处理的任务数量。 mutex:用于保护对 task_count 数组的访问,确保线程安全。 volatile int keep_running:用于控制任务处理者线程的运行。 2.任务发布者线程: 创建一个 ZeroMQ 上下文和一个 PUSH 套接字。 绑定 PUSH 套接字到 TCP 端口 5557。 生成 1000个随机任务,并通过 PUSH 套接字发送这些任务。 在完成任务发送后,设置 keep_running 为 0,通知任务处理者线程可以停止。 3.任务处理者线程: 创建一个 ZeroMQ 上下文和一个 PULL 套接字。 连接 PULL 套接字到任务发布者的地址(TCP 端口 5557)。 进入无限循环,接收任务消息并模拟处理这些任务。 处理完任务后更新任务计数器,并打印当前处理者处理的任务总数。 当 keep_running 为 0 并且没有待处理的消息时,退出循环。 4.主程序: 创建一个任务发布者线程。 创建 5 个任务处理者线程。 等待任务发布者线程完成后关闭其句柄。 等待所有任务处理者线程完成后关闭各自的句柄。 输出每个处理者处理的任务个数。 该程序将启动一个任务发布者线程和 100个任务处理者线程,发布者每隔 10 毫秒发送一个任务,任务处理者实时接收并处理这些任务。每个任务处理者线程独立运行,并且可以同时处理不同的任务,从而实现了任务的并行处理和负载均衡。程序结束时,将输出每个处理者处理的任务个数,从而可以清晰地展示负载均衡的效果。 任务发放C++ #include #include #include #include #include #include #include #include #include #define WORKER_COUNT 20#define TASK_COUNT 1000 // 全局变量,存储每个消费者处理的任务数量std::vector<int> task_count(WORKER_COUNT, 0);std::mutex mtx;std::atomic<bool> keep_running(true); // 用于控制任务处理者线程的运行 // 任务发布者(生产者)函数void producer_thread() { zmq::context_t context(1); zmq::socket_t producer(context, ZMQ_PUSH); producer.bind("tcp://*:5557"); srand(static_cast<unsigned int>(time(NULL))); for (int i = 0; i < TASK_COUNT; ++i) { int workload = rand() % 100; // 随机生成任务负载 std::string message = std::to_string(workload); zmq::message_t zmq_message(message.data(), message.size()); producer.send(zmq_message, zmq::send_flags::none); std::cout << "发送任务: " << message << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 模拟任务生成间隔 } keep_running = false; // 停止任务处理者线程} // 任务处理者(消费者)函数void consumer_thread(int worker_id) { zmq::context_t context(1); zmq::socket_t consumer(context, ZMQ_PULL); consumer.connect("tcp://localhost:5557"); while (keep_running) { zmq::message_t message; zmq::recv_result_t result = consumer.recv(message, zmq::recv_flags::dontwait); if (result) { std::string message_str(static_cast<char*>(message.data()), message.size()); int workload = std::stoi(message_str); std::cout << "Worker " << worker_id << " 接收任务: " << message_str << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(workload)); // 模拟处理任务 // 更新任务计数器 { std::lock_guard<std::mutex> lock(mtx); task_count[worker_id]++; } } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 短暂休眠以避免空转 } } // 当 keep_running 为 false 后,确保处理完所有剩余消息 while (true) { zmq::message_t message; zmq::recv_result_t result = consumer.recv(message, zmq::recv_flags::dontwait); if (!result) { break; // 无消息可接收时退出 } std::string message_str(static_cast<char*>(message.data()), message.size()); int workload = std::stoi(message_str); std::cout << "Worker " << worker_id << " 接收任务: " << message_str << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(workload)); // 模拟处理任务 // 更新任务计数器 { std::lock_guard<std::mutex> lock(mtx); task_count[worker_id]++; } }} int main() { // 创建任务发布者线程 std::thread producer(producer_thread); // 创建 20 个任务处理者线程 std::vector<std::thread> consumers; for (int i = 0; i < WORKER_COUNT; ++i) { consumers.emplace_back(consumer_thread, i); } // 等待任务发布者线程完成 producer.join(); // 等待所有任务处理者线程完成 for (auto& consumer : consumers) { consumer.join(); } std::cout << "#########################################################################" << std::endl; // 输出每个处理者处理的任务个数 for (int i = 0; i < WORKER_COUNT; ++i) { std::cout << "Worker " << i << " 处理了 " << task_count[i] << " 个任务" << std::endl; } return 0;} 五、独占模式实现 独占PAIR模式是一种设计方法,旨在确保在并发环境中,两个线程或资源对在同一时间内独占地操作共享资源,从而避免资源竞争、数据不一致、死锁等问题。这种模式在高并发环境中显得尤为重要,特别是在需要高可靠性和稳定性的系统中,如金融交易系统、物流管理系统、在线购物平台等。 首先,本节通过一个使用Mutex实现的独占PAIR模式的示例,展示了如何通过互斥锁和条件变量来确保两个线程能够协同工作,共同对共享资源进行操作。这个示例简单而直观地解释了独占PAIR模式的基本概念和工作机制。 接下来,深入探讨了ZeroMQ在进程内的独占模式实现,特别是通过PAIR套接字模式,展示了如何在同一进程的不同线程之间建立高效、安全的双向通信通道。ZeroMQ的inproc传输方式为线程间的消息传递提供了一个高效、线程安全的解决方案,避免了传统线程同步机制的复杂性。 在实际应用方面,列举了多个场景,如金融交易系统中的交易撮合、物流系统中的任务分配、在线购物平台的订单处理、社交媒体平台的消息传递,以及微服务架构中的分布式锁管理。这些场景中的每一个都强调了独占PAIR模式的重要性,特别是在保证数据一致性和系统稳定性方面。 随后,提供了一个基于ZeroMQ的金融交易撮合系统的示例,通过C语言和C++语言的实现展示了如何利用ZeroMQ的消息队列和独占模式确保交易撮合的准确性和独占性。该示例不仅展示了基本的撮合操作,还引入了并发撮合、持久化订单数据和复杂的撮合逻辑(如部分成交和优先级撮合)等高级功能,进一步提高了系统的可扩展性和可靠性。 最后,总结了ZeroMQ独占模式的价值,特别是在高并发环境中的应用。通过限制资源的并发访问,ZeroMQ独占模式有效地避免了并发编程中的常见问题,如数据竞争和死锁,保证了系统在极端情况下的稳定性和一致性。ZeroMQ提供了多种套接字模式和线程同步机制,支持独占模式的实现,使得它在复杂的分布式系统中具有广泛的应用前景。 本节有力地展示了ZeroMQ在并发编程中的强大功能,并为如何在实际项目中应用独占PAIR模式提供了详细的指导。 基础介绍 Exclusive Pair Pattern,中文翻译为“独占PAIR模式”,是一种设计模式或技术,用于在并发编程或资源管理中确保两个资源、线程或实体在同一时间段内只能由唯一的一对(PAIR)对象使用或操作。这种模式通常用于避免资源竞争、死锁或不一致的数据状态。 关键概念: 独占性(Exclusivity): 在独占PAIR模式中,一个资源或对象在某个时刻只能被唯一的一对对象(PAIR)访问或操作,防止其他对象同时访问。 PAIR: 这里的PAIR指的是成对的两个对象或线程,它们一起对某个资源或任务进行操作,必须协同工作。在给定时间内,只有这一对对象可以对资源进行操作。 并发安全: 该模式旨在确保系统在并发情况下的安全性和稳定性,防止资源争用或数据不一致。 示例:使用Mutex实现独占PAIR模式 这个例子模拟了两个线程(Thread A和Thread B)成对地访问和操作共享资源。 #include #include #include // 共享资源int shared_resource = 0; // 互斥锁,用于保护共享资源pthread_mutex_t resource_mutex; // 条件变量,用于实现PAIR的协调pthread_cond_t pair_condition;int pair_ready = 0; // 操作共享资源的函数void* thread_function(void* arg) { int thread_id = *(int*)arg; // 加锁,确保独占访问资源 pthread_mutex_lock(&resource_mutex); // 等待另一个线程准备好(模拟PAIR模式) while (pair_ready == 0) { printf("Thread %d is waiting for its pair...\n", thread_id); pthread_cond_wait(&pair_condition, &resource_mutex); } // 执行资源操作 printf("Thread %d is working with its pair...\n", thread_id); shared_resource += thread_id; // 模拟操作 printf("Thread %d updated shared resource to %d\n", thread_id, shared_resource); // 解除PAIR状态 pair_ready = 0; pthread_cond_signal(&pair_condition); // 解锁 pthread_mutex_unlock(&resource_mutex); return NULL;} int main() { pthread_t thread_a, thread_b; int thread_id_a = 1; int thread_id_b = 2; // 初始化互斥锁和条件变量 pthread_mutex_init(&resource_mutex, NULL); pthread_cond_init(&pair_condition, NULL); // 创建线程 pthread_create(&thread_a, NULL, thread_function, &thread_id_a); pthread_create(&thread_b, NULL, thread_function, &thread_id_b); // 模拟PAIR的准备状态 pthread_mutex_lock(&resource_mutex); pair_ready = 1; pthread_cond_broadcast(&pair_condition); pthread_mutex_unlock(&resource_mutex); // 等待线程完成 pthread_join(thread_a, NULL); pthread_join(thread_b, NULL); // 销毁互斥锁和条件变量 pthread_mutex_destroy(&resource_mutex); pthread_cond_destroy(&pair_condition); return 0;} 代码解析 1.共享资源(shared_resource):这是线程操作的资源,在该例子中,它只是一个简单的整数。 2.互斥锁(pthread_mutex_t):用于保护共享资源,确保只有一对线程能够同时访问和操作资源。 3.条件变量(pthread_cond_t):用于线程之间的协调,确保在某个线程准备好之前,另一个线程必须等待。这模拟了PAIR之间的协同工作。 4.线程函数(thread_function): 线程首先锁定互斥锁,确保对共享资源的独占访问。 线程会检查PAIR是否准备好,如果没有准备好,它将等待(pthread_cond_wait),直到PAIR准备完毕。 一旦PAIR准备好,线程对共享资源进行操作。 操作完成后,解除PAIR状态,允许下一个PAIR操作资源。 5.主线程: 主线程创建了两个子线程(Thread A和Thread B)。 主线程通过设置pair_ready为1并使用pthread_cond_broadcast,模拟了PAIR的准备状态。 主线程等待子线程完成操作,然后清理互斥锁和条件变量。 ZeroMq 进程内独占 #include // 包含 ZeroMQ 库头文件,用于消息队列通信#include // 包含 Windows API 头文件,用于线程管理#include // 包含标准输入输出库头文件#include // 包含字符串操作库头文件#include // 包含 locale.h 头文件,用于设置区域语言环境 // 线程 1 的函数DWORD WINAPI thread1(LPVOID context) { void* socket = zmq_socket(context, ZMQ_PAIR); // 创建一个 PAIR 套接字,用于双向通信 zmq_bind(socket, "inproc://pair1"); // 绑定套接字到 "inproc://pair1" 地址 char buffer[256]; // 用于接收消息的缓冲区 while (1) { memset(buffer, 0, sizeof(buffer)); // 清理缓冲区 zmq_recv(socket, buffer, 256, 0); // 接收来自线程 2 的消息 printf("Thread 1 received: %s\n", buffer); // 打印接收到的消息 Sleep(1000); // 等待 1 秒,Windows 下使用 Sleep 函数,单位是毫秒 const char* msg = "Message from Thread 1"; zmq_send(socket, msg, strlen(msg) + 1, 0); // 发送消息给线程 2,+1 包括结束符 '\0' } zmq_close(socket); // 关闭套接字(这段代码通常不会被执行到,因为 while 循环是无限的) return 0;} // 线程 2 的函数DWORD WINAPI thread2(LPVOID context) { void* socket = zmq_socket(context, ZMQ_PAIR); // 创建一个 PAIR 套接字,用于双向通信 zmq_connect(socket, "inproc://pair1"); // 连接到 "inproc://pair1" 地址,链接线程 1 的套接字 char buffer[256]; // 用于接收消息的缓冲区 while (1) { const char* msg = "Message from Thread 2"; zmq_send(socket, msg, strlen(msg) + 1, 0); // 发送消息给线程 1,+1 包括结束符 '\0' memset(buffer, 0, sizeof(buffer)); // 清理缓冲区 zmq_recv(socket, buffer, 256, 0); // 接收来自线程 1 的消息 printf("Thread 2 received: %s\n", buffer); // 打印接收到的消息 Sleep(1000); // 等待 1 秒,Windows 下使用 Sleep 函数,单位是毫秒 } zmq_close(socket); // 关闭套接字(这段代码通常不会被执行到,因为 while 循环是无限的) return 0;} int main() { setlocale(LC_ALL, ""); // 设置区域语言环境 void* context = zmq_ctx_new(); // 创建一个新的 ZeroMQ 上下文,用于管理套接字和它们之间的通信 HANDLE t1, t2; // 定义两个线程句柄变量 t1 = CreateThread(NULL, 0, thread1, context, 0, NULL); // 创建线程 1,并启动执行 thread1 函数 t2 = CreateThread(NULL, 0, thread2, context, 0, NULL); // 创建线程 2,并启动执行 thread2 函数 WaitForSingleObject(t1, INFINITE); // 等待线程 1 结束(在这个例子中,线程实际上不会结束) WaitForSingleObject(t2, INFINITE); // 等待线程 2 结束(在这个例子中,线程实际上不会结束) zmq_ctx_destroy(context); // 销毁 ZeroMQ 上下文,清理资源 return 0;} 代码解析 1.头文件包含: zmq.h: 包含 ZeroMQ 库的函数和数据结构声明,用于跨线程或跨进程的消息队列通信。 windows.h: 包含 Windows API 函数的声明,用于线程管理、睡眠等操作。 stdio.h: 提供标准输入输出功能,如 printf。 string.h: 提供字符串操作功能,如 strlen 和 memcpy。 unistd.h: 用于包含 POSIX 标准库函数(如 sleep),但是在 Windows 下可以替换为 Sleep 函数。 2.线程 1 和 线程 2 函数: 两个线程分别创建一个 PAIR 套接字,线程 1 绑定到 inproc://pair1 地址,线程 2 连接到 inproc://pair1。 PAIR 套接字允许线程 1 和线程 2 之间进行双向通信。 在无限循环中,两个线程轮流发送和接收消息,并将接收到的消息打印到控制台。 3.主程序: 创建一个 ZeroMQ 上下文,用于管理套接字和它们之间的通信。 使用 Windows API CreateThread 创建两个线程,每个线程分别执行 thread1 和 thread2 函数。 使用 WaitForSingleObject 函数等待两个线程的完成。在本例中,由于线程处于无限循环中,程序实际上会一直运行。 inproc://pair1 的解释 inproc://pair1 是什么: inproc 是 ZeroMQ 的一种传输方式,用于在同一进程内的线程之间通信。inproc://pair1 是一个地址标识符,pair1 是这个地址的标记(可以任意命名)。 在 zmq_bind 函数中,线程 1 的 PAIR 套接字绑定到这个地址,使它成为服务器端。zmq_connect 函数则使线程 2 的 PAIR 套接字连接到这个地址,成为客户端。 能干什么: inproc://pair1 允许两个线程在同一个进程内通过 ZeroMQ 实现快速、高效的消息通信。这种通信是线程安全的,并且比使用其他跨进程传输协议(如 TCP 或 IPC)更加高效。 使用 inproc 可以避免线程间使用共享内存或复杂的同步机制(如信号量、互斥锁等)进行通信。 在本例中,inproc://pair1用于在线程 1 和线程 2 之间建立可靠的双向通信,利用 PAIR 套接字确保每条消息都被正确接收和发送。这使得线程可以方便地互相通信,而不需要管理复杂的同步和资源共享问题。 C++ 循环发消息 #include #include #include #include void thread_function(int id, zmq::context_t& context) { zmq::socket_t socket(context, ZMQ_PAIR); // 每个线程绑定到唯一的地址 std::string address = "inproc://pair" + std::to_string(id); socket.bind(address); // 等待所有线程连接后开始通信 std::this_thread::sleep_for(std::chrono::seconds(1)); for (int i = 0; i < 10; ++i) { // 接收来自其他线程的消息 zmq::message_t request; socket.recv(request, zmq::recv_flags::none); std::string recv_msg(static_cast<char*>(request.data()), request.size()); std::cout << "Thread " << id << " received: " << recv_msg << std::endl; // 发送消息给下一个线程 std::string next_address = "inproc://pair" + std::to_string((id + 1) % 10); zmq::socket_t next_socket(context, ZMQ_PAIR); next_socket.connect(next_address); std::string msg = "Hello from thread " + std::to_string(id); zmq::message_t reply(msg.size()); memcpy(reply.data(), msg.c_str(), msg.size()); next_socket.send(reply, zmq::send_flags::none); }} int main() { zmq::context_t context(1); std::vector<std::thread> threads; // 创建并启动 10 个线程 for (int i = 0; i < 10; ++i) { threads.push_back(std::thread(thread_function, i, std::ref(context))); } // 等待所有线程结束 for (auto& t : threads) { t.join(); } return 0;} 程序解释 1.头文件包含: iostream: 用于标准输入输出流操作。 thread: 用于创建和管理多线程。 vector: 用于存储和管理线程对象的动态数组。 zmq.hpp: ZeroMQ C++ API 的头文件,用于消息传递和套接字管理。 2.thread_function函数: 每个线程都创建了一个 ZMQ_PAIR 类型的套接字,并绑定到一个唯一的 inproc://pair 地址。 线程在开始通信前等待 1 秒,以确保所有线程都已经绑定到它们的地址。 每个线程循环 10 次,在每次循环中执行以下步骤:1.接收来自其他线程的消息,并将消息内容打印到控制台。2.生成一个要发送给下一个线程的消息,并将其发送到下一个线程的地址。3.下一个线程的地址通过 id 计算得出,确保消息按顺序传递到下一个线程。 3.main函数: 创建一个 zmq::context_t 对象来管理 ZeroMQ 的套接字和通信。 使用 std::thread 创建并启动 10 个线程,每个线程调用 thread_function,并传递它们的线程 ID 和上下文对象的引用。 主线程使用 join 等待所有子线程执行完毕,确保所有线程的生命周期在程序结束前都得到正确的管理。 程序能用在哪些业务上? 线程间通信:该程序展示了如何在同一进程内的多个线程之间使用 ZeroMQ 实现消息传递。类似的模式可以应用于任何需要线程间通信的场景,例如多线程计算任务的协调、负载平衡等。 消息路由:在一些业务场景中,多个线程或模块需要按照某种逻辑顺序传递消息,例如流水线处理、任务分配等。这种环状的通信模式可以确保消息在各个节点之间按照特定顺序流转。 分布式计算:尽管此示例仅在单进程内工作,但类似的模式可以扩展到跨进程甚至跨机器的场景。例如,在分布式系统中,多个节点之间的消息传递和协作可以使用类似的机制进行管理。 事件驱动系统:在事件驱动系统中,不同的事件处理模块可能需要依次处理消息。通过这种线程间的消息传递机制,可以确保事件按照指定的顺序被处理和传递。 应用场景 独占PAIR模式在以下场景中特别有用: 线程同步: 在多线程环境中,多个线程可能需要成对地对共享资源进行操作。例如,读写锁(read-write lock)可以看作是独占PAIR模式的一种应用,只有读锁和写锁之间能够成对操作共享资源。 数据库事务: 当两个数据库连接(PAIR)需要并行工作以完成某个复杂的事务时,独占PAIR模式可以确保在事务完成之前,其他连接无法访问相关的数据或资源,从而保持数据的一致性。 双向通信协议: 在一些双向通信协议中,发送方和接收方作为PAIR进行操作,独占资源,如信号通道或数据包,在操作完成前,其他通信不能干涉。 设备驱动程序: 在硬件设备的驱动程序中,控制信号和数据传输通常成对进行操作,独占设备的访问权,以确保数据传输的完整性。 分布式系统: 在分布式系统中,不同节点之间可能需要成对操作某个共享的资源,独占PAIR模式确保在某对操作完成之前,其他节点无法进行冲突性的操作。 具体场景 1. 金融交易系统中的交易撮合 应用场景: 在金融交易系统中,买方和卖方的订单需要进行撮合。每一笔交易撮合都需要独占地访问交易引擎的某个部分资源,以确保交易的正确性和一致性。 具体应用: 当一个买方订单和卖方订单撮合成功时,这两个订单的状态需要同时更新为“已完成”。 在订单撮合过程中,独占PAIR模式确保只有这对买卖订单在这一时刻可以操作这个交易引擎的资源,防止其他订单干扰。 这种模式避免了在高频交易情况下,由于并发导致的订单状态不一致或重复撮合问题。 2. 物流系统中的运输任务分配 应用场景: 在物流系统中,运输任务的分配需要同时考虑配送任务和可用的运输资源(如车辆、司机)。为了避免同一辆车被分配到多个不同的运输任务,系统需要确保任务和资源的独占性匹配。 具体应用: 当一个运输任务(如从仓库A到地点B的货物运输)需要分配给某辆车时,系统会使用独占PAIR模式,将该任务与该车的资源配对。 该车辆和任务配对后,系统锁定这一对资源,防止其他任务占用这辆车。 任务完成后,车辆资源释放,再进行下一次任务分配。 这样避免了由于资源竞争导致的任务分配混乱或资源浪费。 3. 在线购物平台的订单处理 应用场景: 在在线购物平台中,多个用户可能会同时购买同一件商品。在库存有限的情况下,系统需要保证每次下单操作能独占处理对应的库存数据,防止出现超卖的情况。 具体应用: 每当一个用户发起购买请求时,系统会为用户订单和库存数据配对,确保这对资源(订单和库存)在处理过程中不被其他订单干扰。 在确认支付和扣减库存的操作中,独占PAIR模式确保其他订单不能同时操作相同的库存数据,防止多名用户同时购买超出库存量的商品。 一旦订单处理完成,库存数据的锁定状态解除,其他用户的订单可以继续操作。 这种方式有效避免了超卖问题,确保用户体验和库存管理的准确性。 4. 社交媒体平台的消息传递 应用场景: 在社交媒体平台上,用户之间的私密消息需要确保传输过程的私密性和准确性,特别是当消息可能同时发送给多名接收者时。 具体应用: 当用户A向用户B发送消息时,系统会确保这条消息和用户B的接收信箱形成独占的配对关系。 在这个过程中,系统确保用户B的信箱不会同时接受来自其他来源的同一消息,防止重复发送或接收错误。 一旦消息传输成功,配对关系解除,用户B可以接收其他新的消息。 独占PAIR模式确保了消息传递的完整性和准确性,避免了多线程环境下的消息混乱或丢失。 5. 分布式锁在微服务中的使用 应用场景:在微服务架构中,多个服务实例可能同时尝试操作同一资源,如更新数据库中的某一条记录。为了防止并发更新导致的数据不一致问题,分布式锁通常被用来控制资源的独占访问。 金融交易系统中的交易撮合 ZEROMQ C语言 这个示例将展示如何使用ZeroMQ消息队列进行买卖订单的撮合,同时确保交易撮合的独占性。 这个示例包含两个主要部分: 订单生成器:生成买卖订单,并通过ZeroMQ推送到交易撮合引擎。 交易撮合引擎:接收订单并进行撮合处理,确保每一对订单的独占性。 #include #include #include #include #include #include // 定义订单结构体,包含订单ID、买卖方向、数量和价格typedef struct { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price;} Order; // 定义互斥锁,用于确保撮合操作的独占性HANDLE match_mutex; // 订单撮合函数,用于撮合买单和卖单void match_orders(Order* buy_order, Order* sell_order) { // 检查买单和卖单是否匹配 if (buy_order->price >= sell_order->price && buy_order->quantity == sell_order->quantity) { // 如果匹配成功,打印匹配信息 printf("Matched Order ID: %d (Buy) with Order ID: %d (Sell) - Quantity: %d at Price: %.2f\n", buy_order->order_id, sell_order->order_id, buy_order->quantity, buy_order->price); } else { // 如果不匹配,打印未匹配的信息 printf("Order ID: %d (Buy) and Order ID: %d (Sell) not matched.\n", buy_order->order_id, sell_order->order_id); }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎DWORD WINAPI order_generator(LPVOID context) { // 创建ZeroMQ PUSH套接字用于发送订单 void* socket = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket, "tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字 int order_id = 1; srand((unsigned int)time(NULL)); // 初始化随机数种子 while (1) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = (float)(rand() % 1000) / 10.0; // 随机生成价格(0到99.9之间) // 发送订单到撮合引擎 zmq_send(socket, &order, sizeof(Order), 0); printf("Generated Order ID: %d, Side: %c, Quantity: %d, Price: %.2f\n", order.order_id, order.side, order.quantity, order.price); Sleep(100); // 模拟订单生成频率,休眠100毫秒 } zmq_close(socket); // 关闭套接字 return 0;} // 订单撮合线程函数,接收订单并进行撮合DWORD WINAPI order_matcher(LPVOID context) { // 创建ZeroMQ PULL套接字用于接收订单 void* socket = zmq_socket(context, ZMQ_PULL); zmq_bind(socket, "tcp://*:5555"); // 绑定到端口5555,等待订单到达 Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 int has_buy_order = 0; // 标记是否有待撮合的买单 while (1) { Order order; // 从ZeroMQ套接字接收订单 zmq_recv(socket, &order, sizeof(Order), 0); // 获取互斥锁,确保撮合操作的独占性 WaitForSingleObject(match_mutex, INFINITE); if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = 1; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; match_orders(&buy_order, &sell_order); has_buy_order = 0; // 重置标记,表示买单已撮合 } else { // 如果订单无法撮合,打印提示信息 printf("Order %d queued for future matching.\n", order.order_id); } // 释放互斥锁,允许其他线程进行撮合操作 ReleaseMutex(match_mutex); } zmq_close(socket); // 关闭套接字 return 0;} // 主程序入口int main() { // 创建ZeroMQ上下文 void* context = zmq_ctx_new(); // 创建互斥锁,用于保护撮合操作的独占性 match_mutex = CreateMutex(NULL, FALSE, NULL); // 定义线程句柄 HANDLE generator_thread, matcher_thread; // 创建订单生成器线程 generator_thread = CreateThread(NULL, 0, order_generator, context, 0, NULL); // 创建订单撮合线程 matcher_thread = CreateThread(NULL, 0, order_matcher, context, 0, NULL); // 等待线程执行完毕 WaitForSingleObject(generator_thread, INFINITE); WaitForSingleObject(matcher_thread, INFINITE); // 关闭线程句柄 CloseHandle(generator_thread); CloseHandle(matcher_thread); // 关闭互斥锁 CloseHandle(match_mutex); // 销毁ZeroMQ上下文 zmq_ctx_destroy(context); return 0;} 代码解析 1.订单生成器(order_generator): 使用ZeroMQ PUSH套接字将生成的订单推送到交易撮合引擎。 每个订单都包含一个唯一的订单ID、买卖方向、数量和价格。 每隔一定时间生成一个新的订单,模拟真实的交易环境。 2.交易撮合引擎(order_matcher): 使用ZeroMQ PULL套接字接收订单。 通过互斥锁(pthread_mutex_t)实现独占PAIR模式,确保每次撮合操作的并发安全性。 如果当前有买单并收到卖单,尝试撮合。如果买单价格大于或等于卖单价格且数量相等,则匹配成功。 匹配后,释放互斥锁,允许下一个订单对继续撮合。 3.主程序: 创建并启动订单生成器和交易撮合引擎线程。 主线程等待子线程完成。 扩展与优化 并发撮合:可以使用多个撮合引擎线程,以处理更高并发的订单流。 持久化订单数据:在实际系统中,订单信息和撮合结果通常需要持久化到数据库。 复杂撮合逻辑:实际的交易撮合可能包括部分成交、不同优先级的订单撮合等,可以根据需求进一步扩展。 在现有的基础上,可以进行以下扩展与优化,以实现更高效、更复杂的交易撮合系统。我们将: 并发撮合:增加多个撮合引擎线程,以处理更高并发的订单流。 持久化订单数据:模拟将订单信息和撮合结果持久化到数据库。 复杂撮合逻辑:引入部分成交和不同优先级的订单撮合逻辑。 #include #include #include #include #include #include // 定义订单结构体,包含订单ID、买卖方向、数量、价格和优先级typedef struct { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price; int priority; // 优先级(数值越高,优先级越高)} Order; // 定义互斥锁,用于确保撮合操作的独占性HANDLE match_mutex; // 订单撮合函数,用于撮合买单和卖单,支持部分成交和优先级撮合void match_orders(Order* buy_order, Order* sell_order) { // 检查买单和卖单是否匹配 if (buy_order->price >= sell_order->price) { int matched_quantity = buy_order->quantity < sell_order->quantity ? buy_order->quantity : sell_order->quantity; printf("Matched Order ID: %d (Buy) with Order ID: %d (Sell) - Quantity: %d at Price: %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price); // 模拟持久化撮合结果 printf("Persisting Match: Buy Order %d, Sell Order %d, Quantity %d, Price %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price); // 更新剩余数量 buy_order->quantity -= matched_quantity; sell_order->quantity -= matched_quantity; // 检查是否有剩余订单需要处理 if (buy_order->quantity > 0) { printf("Remaining Buy Order ID: %d, Quantity: %d\n", buy_order->order_id, buy_order->quantity); } if (sell_order->quantity > 0) { printf("Remaining Sell Order ID: %d, Quantity: %d\n", sell_order->order_id, sell_order->quantity); } } else { printf("Order ID: %d (Buy) and Order ID: %d (Sell) not matched due to price.\n", buy_order->order_id, sell_order->order_id); }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎DWORD WINAPI order_generator(LPVOID context) { // 创建ZeroMQ PUSH套接字用于发送订单 void* socket = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket, "tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字 int order_id = 1; srand((unsigned int)time(NULL)); // 初始化随机数种子 while (1) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = (float)(rand() % 1000) / 10.0; // 随机生成价格(0到99.9之间) order.priority = rand() % 10 + 1; // 随机生成优先级(1到10之间) // 发送订单到撮合引擎 zmq_send(socket, &order, sizeof(Order), 0); printf("Generated Order ID: %d, Side: %c, Quantity: %d, Price: %.2f, Priority: %d\n", order.order_id, order.side, order.quantity, order.price, order.priority); Sleep(100); // 模拟订单生成频率,休眠100毫秒 } zmq_close(socket); // 关闭套接字 return 0;} // 订单撮合线程函数,接收订单并进行撮合DWORD WINAPI order_matcher(LPVOID context) { // 创建ZeroMQ PULL套接字用于接收订单 void* socket = zmq_socket(context, ZMQ_PULL); zmq_bind(socket, "tcp://*:5555"); // 绑定到端口5555,等待订单到达 Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 int has_buy_order = 0; // 标记是否有待撮合的买单 while (1) { Order order; // 从ZeroMQ套接字接收订单 zmq_recv(socket, &order, sizeof(Order), 0); // 获取互斥锁,确保撮合操作的独占性 WaitForSingleObject(match_mutex, INFINITE); if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = 1; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; // 进行优先级比较,如果新订单优先级较高,则放弃当前订单,保存新订单 if (sell_order.priority > buy_order.priority) { printf("New Sell Order ID: %d has higher priority than Buy Order ID: %d\n", sell_order.order_id, buy_order.order_id); has_buy_order = 0; // 放弃当前订单,等待新撮合 } else { match_orders(&buy_order, &sell_order); has_buy_order = (buy_order.quantity > 0) ? 1 : 0; // 检查买单是否还有剩余 } } else { printf("Order %d queued for future matching.\n", order.order_id); } // 释放互斥锁,允许其他线程进行撮合操作 ReleaseMutex(match_mutex); } zmq_close(socket); // 关闭套接字 return 0;} int main() { // 创建ZeroMQ上下文 void* context = zmq_ctx_new(); // 创建互斥锁,用于保护撮合操作的独占性 match_mutex = CreateMutex(NULL, FALSE, NULL); // 定义线程句柄 HANDLE generator_thread, matcher_thread[4]; // 创建订单生成器线程 generator_thread = CreateThread(NULL, 0, order_generator, context, 0, NULL); // 创建多个订单撮合线程,用于并发处理 for (int i = 0; i < 4; i++) { matcher_thread[i] = CreateThread(NULL, 0, order_matcher, context, 0, NULL); } // 等待订单生成器线程执行完毕 WaitForSingleObject(generator_thread, INFINITE); // 等待所有撮合线程执行完毕 for (int i = 0; i < 4; i++) { WaitForSingleObject(matcher_thread[i], INFINITE); } // 关闭线程句柄 CloseHandle(generator_thread); for (int i = 0; i < 4; i++) { CloseHandle(matcher_thread[i]); } // 关闭互斥锁 CloseHandle(match_mutex); // 销毁ZeroMQ上下文 zmq_ctx_destroy(context); return 0;} 扩展与优化解释 1.并发撮合: 增加了4个并发的撮合线程,以提高系统的吞吐量和处理能力。 每个撮合线程独立处理接收到的订单,并通过互斥锁保证撮合操作的独占性。 2.持久化订单数据: 在订单撮合完成后,系统会模拟将撮合结果“持久化”到数据库。这一部分目前通过打印日志来模拟。 实际系统中,可以将这些数据写入数据库(例如使用SQLite、MySQL等)。 3.复杂撮合逻辑: 引入了部分成交:如果买单和卖单的数量不一致,系统会撮合其中较小的部分,并保留未撮合的部分。 引入了优先级撮合:每个订单都有一个优先级(1到10之间)。当有新订单到达时,,如果其优先级高于当前待撮合订单,会优先处理高优先级订单。 #include #include #include #include #include #include // 定义订单结构体,包含订单ID、买卖方向、数量、价格和优先级typedef struct { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price; int priority; // 优先级(数值越高,优先级越高)} Order; // 定义互斥锁,用于确保撮合操作的独占性HANDLE match_mutex; // 订单撮合函数,用于撮合买单和卖单,支持部分成交和优先级撮合void match_orders(Order* buy_order, Order* sell_order) { // 检查买单和卖单是否匹配 if (buy_order->price >= sell_order->price) { int matched_quantity = buy_order->quantity < sell_order->quantity ? buy_order->quantity : sell_order->quantity; printf("Matched Order ID: %d (Buy) with Order ID: %d (Sell) - Quantity: %d at Price: %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price); // 模拟持久化撮合结果 printf("Persisting Match: Buy Order %d, Sell Order %d, Quantity %d, Price %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price); // 更新剩余数量 buy_order->quantity -= matched_quantity; sell_order->quantity -= matched_quantity; // 检查是否有剩余订单需要处理 if (buy_order->quantity > 0) { printf("Remaining Buy Order ID: %d, Quantity: %d\n", buy_order->order_id, buy_order->quantity); } if (sell_order->quantity > 0) { printf("Remaining Sell Order ID: %d, Quantity: %d\n", sell_order->order_id, sell_order->quantity); } } else { printf("Order ID: %d (Buy) and Order ID: %d (Sell) not matched due to price.\n", buy_order->order_id, sell_order->order_id); }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎DWORD WINAPI order_generator(LPVOID context) { // 创建ZeroMQ PUSH套接字用于发送订单 void* socket = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket, "tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字 int order_id = 1; srand((unsigned int)time(NULL)); // 初始化随机数种子 while (1) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = (float)(rand() % 1000) / 10.0; // 随机生成价格(0到99.9之间) order.priority = rand() % 10 + 1; // 随机生成优先级(1到10之间) // 发送订单到撮合引擎 zmq_send(socket, &order, sizeof(Order), 0); printf("Generated Order ID: %d, Side: %c, Quantity: %d, Price: %.2f, Priority: %d\n", order.order_id, order.side, order.quantity, order.price, order.priority); Sleep(100); // 模拟订单生成频率,休眠100毫秒 } zmq_close(socket); // 关闭套接字 return 0;} // 订单撮合线程函数,接收订单并进行撮合DWORD WINAPI order_matcher(LPVOID context) { // 创建ZeroMQ PULL套接字用于接收订单 void* socket = zmq_socket(context, ZMQ_PULL); zmq_bind(socket, "tcp://*:5555"); // 绑定到端口5555,等待订单到达 Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 int has_buy_order = 0; // 标记是否有待撮合的买单 while (1) { Order order; // 从ZeroMQ套接字接收订单 zmq_recv(socket, &order, sizeof(Order), 0); // 获取互斥锁,确保撮合操作的独占性 WaitForSingleObject(match_mutex, INFINITE); if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = 1; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; // 进行优先级比较,如果新订单优先级较高,则放弃当前订单,保存新订单 if (sell_order.priority > buy_order.priority) { printf("New Sell Order ID: %d has higher priority than Buy Order ID: %d\n", sell_order.order_id, buy_order.order_id); has_buy_order = 0; // 放弃当前订单,等待新撮合 } else { match_orders(&buy_order, &sell_order); has_buy_order = (buy_order.quantity > 0) ? 1 : 0; // 检查买单是否还有剩余 } } else { printf("Order %d queued for future matching.\n", order.order_id); } // 释放互斥锁,允许其他线程进行撮合操作 ReleaseMutex(match_mutex); } zmq_close(socket); // 关闭套接字 return 0;} int main() { // 创建ZeroMQ上下文 void* context = zmq_ctx_new(); // 创建互斥锁,用于保护撮合操作的独占性 match_mutex = CreateMutex(NULL, FALSE, NULL); // 定义线程句柄 HANDLE generator_thread, matcher_thread[4]; // 创建订单生成器线程 generator_thread = CreateThread(NULL, 0, order_generator, context, 0, NULL); // 创建多个订单撮合线程,用于并发处理 for (int i = 0; i < 4; i++) { matcher_thread[i] = CreateThread(NULL, 0, order_matcher, context, 0, NULL); } // 等待订单生成器线程执行完毕 WaitForSingleObject(generator_thread, INFINITE); // 等待所有撮合线程执行完毕 for (int i = 0; i < 4; i++) { WaitForSingleObject(matcher_thread[i], INFINITE); } // 关闭线程句柄 CloseHandle(generator_thread); for (int i = 0; i < 4; i++) { CloseHandle(matcher_thread[i]); } // 关闭互斥锁 CloseHandle(match_mutex); // 销毁ZeroMQ上下文 zmq_ctx_destroy(context); return 0;} ZEROMQ C++语言 #include #include #include #include #include #include #include #include #include // 定义订单结构体,包含订单ID、买卖方向、数量、价格和优先级struct Order { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price; int priority; // 优先级(数值越高,优先级越高)}; // 全局互斥锁,用于确保撮合操作的独占性std::mutex match_mutex; // 订单撮合函数,用于撮合买单和卖单,支持部分成交和优先级撮合void match_orders(Order& buy_order, Order& sell_order) { // 检查买单和卖单是否匹配 if (buy_order.price >= sell_order.price) { int matched_quantity = std::min(buy_order.quantity, sell_order.quantity); std::cout << "Matched Order ID: " << buy_order.order_id << " (Buy) with Order ID: " << sell_order.order_id << " (Sell) - Quantity: " << matched_quantity << " at Price: " << buy_order.price << std::endl; // 模拟持久化撮合结果 std::cout << "Persisting Match: Buy Order " << buy_order.order_id << ", Sell Order " << sell_order.order_id << ", Quantity " << matched_quantity << ", Price " << buy_order.price << std::endl; // 更新剩余数量 buy_order.quantity -= matched_quantity; sell_order.quantity -= matched_quantity; // 检查是否有剩余订单需要处理 if (buy_order.quantity > 0) { std::cout << "Remaining Buy Order ID: " << buy_order.order_id << ", Quantity: " << buy_order.quantity << std::endl; } if (sell_order.quantity > 0) { std::cout << "Remaining Sell Order ID: " << sell_order.order_id << ", Quantity: " << sell_order.quantity << std::endl; } } else { std::cout << "Order ID: " << buy_order.order_id << " (Buy) and Order ID: " << sell_order.order_id << " (Sell) not matched due to price." << std::endl; }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎void order_generator(zmq::context_t* context) { try { // 创建ZeroMQ PUSH套接字用于发送订单 zmq::socket_t socket(*context, zmq::socket_type::push); socket.connect("tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字 int order_id = 1; srand(static_cast<unsigned int>(time(NULL))); // 初始化随机数种子 while (true) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = static_cast<float>(rand() % 1000) / 10.0f; // 随机生成价格(0到99.9之间) order.priority = rand() % 10 + 1; // 随机生成优先级(1到10之间) // 发送订单到撮合引擎 socket.send(zmq::buffer(&order, sizeof(Order)), zmq::send_flags::none); std::cout << "Generated Order ID: " << order.order_id << ", Side: " << order.side << ", Quantity: " << order.quantity << ", Price: " << order.price << ", Priority: " << order.priority << std::endl; Sleep(100); // 模拟订单生成频率,休眠100毫秒 } } catch (const zmq::error_t& e) { std::cerr << "ZMQ Error in order_generator: " << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << "Exception in order_generator: " << e.what() << std::endl; } catch (...) { std::cerr << "Unknown error in order_generator" << std::endl; }} // 订单撮合线程函数,接收订单并进行撮合void order_matcher(zmq::context_t* context) { try { // 创建ZeroMQ PULL套接字用于接收订单 zmq::socket_t socket(*context, zmq::socket_type::pull); socket.bind("tcp://*:5555"); // 绑定到端口5555,等待订单到达 Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 bool has_buy_order = false; // 标记是否有待撮合的买单 while (true) { Order order; // 从ZeroMQ套接字接收订单 socket.recv(zmq::buffer(&order, sizeof(Order)), zmq::recv_flags::none); // 锁定互斥锁,确保撮合操作的独占性 std::lock_guard<std::mutex> lock(match_mutex); if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = true; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; // 进行优先级比较,如果新订单优先级较高,则放弃当前订单,保存新订单 if (sell_order.priority > buy_order.priority) { std::cout << "New Sell Order ID: " << sell_order.order_id << " has higher priority than Buy Order ID: " << buy_order.order_id << std::endl; has_buy_order = false; // 放弃当前订单,等待新撮合 } else { match_orders(buy_order, sell_order); has_buy_order = (buy_order.quantity > 0); // 检查买单是否还有剩余 } } else { std::cout << "Order " << order.order_id << " queued for future matching." << std::endl; } } } catch (const zmq::error_t& e) { std::cerr << "ZMQ Error in order_matcher: " << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << "Exception in order_matcher: " << e.what() << std::endl; } catch (...) { std::cerr << "Unknown error in order_matcher" << std::endl; }} int main() { try { // 创建ZeroMQ上下文 zmq::context_t context(1); // 定义线程容器 std::vector<std::thread> matcher_threads; // 创建订单生成器线程 std::thread generator_thread(order_generator, &context); // 创建多个订单撮合线程,用于并发处理 for (int i = 0; i < 4; ++i) { matcher_threads.emplace_back(order_matcher, &context); } // 等待订单生成器线程执行完毕 generator_thread.join(); // 等待所有撮合线程执行完毕 for (auto& thread : matcher_threads) { thread.join(); } } catch (const std::exception& e) { std::cerr << "Exception in main: " << e.what() << std::endl; } catch (...) { std::cerr << "Unknown error in main" << std::endl; } return 0;} 主要改动与解释 1.使用C++特性: #include代替了 #include ,使用 std::cout 和 std::endl 进行输出。 使用了 std::thread 来代替 Windows 的 CreateThread。 使用了 std::vector容器来管理多个撮合线程,避免手动管理线程句柄。 2.ZeroMQ C++ API: C++代码使用了 zmq.hpp,它是ZeroMQ的C++绑定库,比纯C语言API更具可读性和便利性。 zmq::context_t 和 zmq::socket_t 代替了C语言中的 zmq_ctx_new() 和 zmq_socket()。 3.线程管理: 使用 std::thread 来创建并管理线程,thread.join() 替代了 WaitForSingleObject。 std::vector 容器被用来存储多个撮合线程,这样代码更加整洁和易于扩展。 4.使用标准库: 使用 std::min 来替代手动的 if-else 判断更简洁地获取最小值。 小结:ZeroMQ的独占模式(Exclusive Pair Pattern)是一种设计模式,旨在确保两个通信对端(PAIR)在同一时间段内独占地访问共享资源,防止其他对端干扰。它通过限制并发访问,确保资源的安全性和一致性。在高并发场景中,独占模式有助于避免数据竞争、死锁和不一致性问题。ZeroMQ通过多种套接字模式(如PAIR、REQ/REP)和线程同步机制,支持独占模式的实现,保证消息在严格的单对单通信通道中传递,从而提高系统的稳定性和可靠性。

    11-22 90浏览
  • 为什么MySQL数据库要用b+树?

    一、数据结构 数据结构大致可以分为两种 —— 线性结构 和 非线性结构。 1. 线性结构 线性结构是数据结构中的一种基本结构,它的特点是数据元素之间存在一对一的前后关系。线性结构中的数据元素排列成一个线性序列,可以用顺序存储或链式存储来实现。 常见的线性结构有以下几种: 数组:连续存储的一组相同类型的元素,可以通过索引直接访问。 链表:由节点组成的集合,每个节点包含数据和指向下一个节点的指针。 栈:具有先进后出(LIFO)特性的线性表,只能在栈顶进行插入和删除操作。 队列:具有先进先出(FIFO)特性的线性表,只能在队尾插入,在队头删除。 哈希表:使用哈希函数将键映射到存储位置,并通过解决冲突处理碰撞问题。 2. 非线性结构 非线性结构是数据结构中的另一种类型,与线性结构不同,它的数据元素之间并不是简单的前后关系。 常见的非线性结构有以下几种: 树:由节点和边组成的层次结构,每个节点可以有多个子节点。 图:由节点和边组成的集合,节点之间可以存在多种关系,如有向图和无向图。 堆:一种特殊的树形结构,通常用于实现优先队列,具有父节点小于(或大于)子节点的特性。 散列表:基于哈希函数将键映射到存储位置,并通过解决冲突处理碰撞问题。 图表:由多个表格连接而成,用于表示复杂的关系和数据依赖。 还有例如跳表之类的其他的数据结构,也都是从基础数据结构演化出来的,用来解决指定的场景问题。 二、索引的数据结构 我们先把记忆中的 Mysql的索引是使用B+树做的,因为B+树有 xxx 的优点 抹去,没有人在开发的时候就能直接想到完美的解决方案,所以我们也来推导一下索引的数据结构。 1. 索引的作用 索引是用来做什么的? 索引在数据库和数据结构中起到了重要的作用,它能够提高数据的查询效率和访问速度。以下是索引的几个主要作用: 加快数据检索:索引可以按照特定的规则对数据进行排序和组织,从而加快数据的查找速度。通过使用合适的索引,可以避免全表扫描,减少查询所需的时间复杂度。 提高查询性能:当数据库中有大量记录时,使用索引可以显著提升查询性能。索引将数据按照特定列进行排序和分组,使得数据库系统只需搜索一部分数据而不是全部数据。 优化排序和分组操作:索引可以帮助数据库系统快速地进行排序和分组操作。如果在执行SQL语句时需要对结果进行排序或者分组,使用合适的索引可以节省大量计算时间。 约束唯一性:通过在某些字段上创建唯一索引,可以保证该字段值的唯一性,防止重复插入相同值的情况发生。 改善连接操作:当多个表之间需要进行连接操作时,在关联列上创建合适的索引能够极大地提高连接操作的效率。 尽管索引能够提高查询效率,但也会增加存储空间和更新数据的成本。 索引存储在哪里? 索引存储在数据库管理系统的内存和磁盘中。具体来说,有以下几种常见的索引存储方式: 内存中的索引:为了提高查询效率,数据库管理系统通常会将常用的索引数据加载到内存中进行操作。这样可以避免频繁的磁盘访问,加快查询速度。 磁盘上的索引:当内存无法容纳全部索引数据时,数据库管理系统会将部分或全部索引数据存储在磁盘上。通常使用B+树等数据结构来组织和管理索引数据,以支持高效地查找、插入和删除操作。 辅助文件:一些数据库系统会将较大或者不常用的索引存储在单独的文件中,而不是放在主要的数据文件中。这样可以降低主要数据文件的大小,减少IO开销,并且更灵活地管理索引。 我们都说数据持久化数据持久化,其实就是把内存里的数据转移到硬盘上,这样即便是设备断电了,数据也不会受到影响。但是有利必有弊,数据存储在硬盘上带来的后果就是读取的速度变慢。又是变慢,能变多慢呢?内存是纳秒级的处理速度,硬盘是毫秒级的处理速度,二者相差百万倍,这就是速度的差异。所以我们实际使用索引的时候,会把索引从硬盘中读到内存里,然后通过内存里的索引,从硬盘中找到数据。 但是这样优化了又如何呢,只要需要读硬盘,那就会消耗时间,硬盘IO越多,时间消耗越多。 除此之外,我们使用索引不只是为了能够迅速找到某一个数据,而是能够迅速找到某一个范围区间的数据,能够动态的执行有关数据的操作。 那么在上述的描述下,索引能够使用的数据结构就有这么几个 —— 哈希表、跳表、树。 2. 哈希表 哈希表:也叫做散列表。是根据关键字和值(Key-Value)直接进行访问的数据结构。也就是说,它通过关键字 key 和一个映射函数 Hash(key) 计算出对应的值 value,然后把键值对映射到表中一个位置来访问记录,以加快查找的速度。这个映射函数叫做哈希函数(散列函数),用于存放记录的数组叫做 哈希表(散列表)。哈希表的关键思想是使用哈希函数,将键 key 和值 value 映射到对应表的某个区块中。可以将算法思想分为两个部分: 向哈希表中插入一个关键字:哈希函数决定该关键字的对应值应该存放到表中的哪个区块,并将对应值存放到该区块中 在哈希表中搜索一个关键字:使用相同的哈希函数从哈希表中查找对应的区块,并在特定的区块搜索该关键字对应的值 哈希表的原理示例图如下所示: 哈希表的精确查询时间复杂度是 O(1) ,为什么呢?因为计算目标 key 的 hash 值,然后直接对应到数组的下标,这个过程大大的减少了查询所需要的时间。在产生了 hash 碰撞的时候,也会使用链表和红黑树的方式加快碰撞情况下,查询目标值的速度。 这样的话,我们索引的数据结构完全可以采用哈希表的形式来做,效率非常高。但是为什么不这么做呢? 如果使用哈希表来当作索引的数据结构,在进行范围查询的时候需要全部扫描,这是一笔不菲的代价。 3. 跳表 跳表(Skip List)是一种用于实现有序数据结构的数据结构,它在链表的基础上通过添加多级索引来加速搜索操作。 跳表由William Pugh在1989年提出,其设计灵感来自于平衡二叉树。相比于传统的链表,在查找元素时,跳表可以通过使用多级索引进行快速定位,并且具备较高的插入和删除效率。 跳表中的每个节点包含一个值和若干个指向下一层节点的指针。最底层是原始链表,每个节点按照值从小到大排列;而上方的各级索引则以不同步长稀疏地链接部分节点。这样,在搜索时可以先沿着最顶层索引开始查找,逐渐向下层细化范围,直到找到目标节点或者确定目标不存在。 跳表的时间复杂度为 O(log n),其中 n 是元素数量。它相对简单、易于实现,并且支持高效的插入、删除和搜索操作。因此,在某些场景下,跳表可以作为替代平衡二叉树等数据结构的选择。 如图所示,跳表就是在链表的基础上加了索引层,这样就能够实现区间查询的效果。比如我们要查找key = 5,那就先遍历索引层,遍历到3,然后发现下一个索引是6,那么直接从索引层的3往下进入链表,在往后走2步就到key = 5了。 如果数据量非常非常大呢?(图方便这里用 excel 绘制,美观上会差一些) 这样是不是就能发现跳表的好处了,用多个索引划分链表,从高级索引定位到更低级的索引,直到定位到链表中。效率看起来也很高。 但是这还是存在一个问题,我读取完三级索引到内存,然后我还要硬盘IO去读取二级索引,然后还要读取一级索引。还是在硬盘的IO上费了太多的操作。跳表的数据越多,索引层越高,读取索引带来的硬盘IO次数越多,性能降低,这又违背了一开始使用索引的理念。 4. 树 树结构的特性决定了遍历数据本身就支持按区间查询。再加上树是非线性结构的优势相比于线性结构的数组,不必像数组的数据是连续存放的。那么当树结构在插入新数据时就不用像数组插入数据前时,需要将数据所在往后的所有数据节点都得往后挪动的开销。所以树结构更适合插入更新等动态操作的数据结构。 需要C/C++ Linux服务器架构师学习资料加qun579733396获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享 三、树 实现索引使用的数据结构看来是要使用树结构了,常用的树都有哪些呢?二叉树、二叉查找树、平衡二叉查找树、红黑树、B树。 二叉树 二叉树是n(n>=0)个结点的有限集合。当n=0时为空树,当n不为0时,二叉树有以下特点:1.每个结点的度不超过2(可以理解为二孩政策下的结点最多只能有两个孩子); 每个结点的左子树和右子树顺序不能颠倒,所以二叉树是有序树。 特殊二叉树 满二叉树:每一层结点数都达到最大,那么它就是满二叉树。如第1层最多有2 ^0个结点,第2层最多有 2 ^1个结点,则第k层最多有2 ^(k-1)个结点,假设这棵满二叉树有k层,那么它总共有2 ^0+2 ^1+……+2 ^(k-1) = 2 ^k-1个结点。 完全二叉树:深度为k,有n个结点的二叉树当且仅当其每一个结点都与深度为k的满二叉树中编号从1至n的结点一一对应时,称为完全二叉树。(简介版:完全二叉树的前k-1层是满二叉树,最后一层从左到右依次连续) 二叉查找树 二叉查找树可以理解为融合了二分查找的二叉树。二分查找大家都熟悉吧,时间复杂度 O(logN) ,比直接遍历的线性查找快的多,但是需要数组是有序的。 所以说,二叉查找树不同于普通的二叉树,二叉查找树是将小于根节点的元素放在左子树,将大于根节点的元素放在右子树。其实就是从某种含义了实现了二分查找的先决条件 —— 数值有序。 但是二叉树是存在弊端的,如果我们每次都插入一个更小的数或者更大的数,那么二叉树就会在一个方向上无限延长,退化成了链表。那链表的时间复杂度是 O(N),而且又加大了硬盘的IO操作。所以这种结构还是不太行。 平衡二叉查找树 面说一直放更小或者更大的数,让他不断延长,变成一个极端的“很高很瘦”的数,那么用平衡二叉查找树就能解决这个问题。 平衡二叉查找树的关键是 平衡 ,指的是每个节点的左右子树高度差不能超过 1。这样左右子树都能平衡,时间复杂度为 O(logN) 。 无论是二叉树还是二叉查找树还是平衡二叉查找树还是红黑树,他最终都存在一个问题 —— 每个节点只能有 2 个子树。这意味着只要数据量足够大,它总会变成一个深度非常大的树。深度越大,硬盘IO次数越多,性能效率越低,这又双叒叕与索引的初衷背道而驰。 红黑树 与AVL树相比,红黑树并不追求严格的平衡,而是大致的平衡:只是确保从根到叶子的最长的可能路径不多于最短的可能路径的两倍长。从实现来看,红黑树最大的特点是每个节点都属于两种颜色(红色或黑色)之一,且节点颜色的划分需要满足特定的规则。 与AVL树相比,红黑树的查询效率会有所下降,这是因为树的平衡性变差,高度更高。但红黑树的删除效率大大提高了,因为红黑树同时引入了颜色,当插入或删除数据时,只需要进行O(1)次数的旋转以及变色就能保证基本的平衡,不需要像AVL树进行O(lgn)次数的旋转。总的来说,红黑树的统计性能高于AVL。 因此,在实际应用中,AVL树的使用相对较少,而红黑树的使用非常广泛。 对于数据在内存中的情况(如上述的TreeMap和HashMap),红黑树的表现是非常优异的。但是对于数据在磁盘等辅助存储设备中的情况(如MySQL等数据库),红黑树并不擅长,因为红黑树长得还是太高了。当数据在磁盘中时,磁盘IO会成为最大的性能瓶颈,设计的目标应该是尽量减少IO次数;而树的高度越高,增删改查所需要的IO次数也越多,会严重影响性能。 B树 新的数据结构的产生肯定是为了解决之前繁琐的问题。在树的深度不断变大的情况下,B树就应运而生了。 B树的出现解决了树高度的问题,从名字上也能看出来,它不叫B二叉树而是直接叫B树,因为它摆脱了二叉这个概念。它不再限制一个父节点中只能有两个子节点,而是允许拥有 M 个子节点(M > 2)。不仅如此,B树的一个节点可以存储多个元素,相比较于前面的那些二叉树数据结构又将整体的树高度降低了。那么B树实际上就是多叉树。 图中每一个节点叫做 页,是Mysql数据读取的基本单位,也就是上面的磁盘块。其中的 P 是指向子节点的指针。 当数据量足够大的时候,使用平衡二叉查找树则会不断纵向扩展子节点,让整个树变得更高。而B树可以横向扩展子节点,变得更胖,但是树的高度不高,硬盘IO的次数更少。 综上所述,B树已经非常适合用来给Mysql做索引的数据结构了。那么为什么还要去使用B+树呢?实际上B树存在一个缺点,虽然B树实现了区间查找,但是B树的去检查找是基于中序遍历来做的,中序遍历的算法题大家应该都做过,需要来回切换父子节点,切换父子节点在这里就意味着硬盘不断的IO操作,这显然也是不好的。 B+树 B+树其实就是B树的升级版。MySQL 中innoDB引擎中的索引底层数据结构采用的正是B+树。 B+树相对于树做了这些方面的改动:B+树中的非叶子节点只作为索引,不存储数据。转而由叶子节点存放整棵树的所有数据。叶子节点之间再构成一个从小到大的有序的链表并互相指向相邻的叶子节点,也就是在叶子节点之间形成了有序的双向链表。 画图画的不是很清楚,忘记展示双向链表的特点,最下面的箭头指的是相邻的两个叶子是双向的,所有的叶子节点构成双向链表。 再来看B+树的插入和删除,B+树做了大量冗余节点,从上面可以发现父节点的所有元素都会在子节点中出现,这样当删除一个节点时,可以直接从叶子节点中删除,这样效率更快。 相邻的两个叶子是双向的,所有的叶子节点构成双向链表。 再来看B+树的插入和删除,B+树做了大量冗余节点,从上面可以发现父节点的所有元素都会在子节点中出现,这样当删除一个节点时,可以直接从叶子节点中删除,这样效率更快。 B树相比于B+树,B树没有冗余节点,删除节点时会发生复杂的树变形,而B+树有冗余节点,不会涉及到复杂的树变形。而且B+树的插入也是如此,最多只涉及树的一条分支路径。B+树也不用更多复杂算法,可以类似红黑树的旋转去自动平衡。 总结: mysql选择B+树的原因在于其独特的优势: 良好的平衡性:B+树是一种自平衡的树结构,不论是在插入、删除还是查询操作中,它都能保持相对较好的平衡状态。这使得B+树能够快速定位到目标数据,提高查询效率。 顺序访问性:B+树的所有叶子节点是按照索引键的顺序排序的。这使得范围查询和顺序访问非常高效,因为相邻的数据通常在物理上也是相邻存储的,可以利用磁盘预读提高IO效率。 存储效率:B+树在内存中的节点大小通常比其他树结构更大,这样可以减少磁盘I/O操作的次数。同时,B+树的非叶子节点只存储索引列的值,而不包含实际数据,这进一步减小了索引的尺寸。 支持高并发:B+树的特性使得它能够支持高并发的读写操作。通过使用合适的锁或事务隔离级别,多个并发查询和更新操作可以同时进行而不会出现严重的阻塞或冲突。 易于扩展和维护:B+树的结构相对简单,可以较容易地进行扩展和维护。当插入或删除数据时,B+树只需要调整路径上的少数节点,而不需要整颗树的重构。这样能够有效降低维护成本,并保证索引的高性能。

    09-13 151浏览
  • 基于MSP430的空间定向测试仪设计要点

    0 引言 空间定向测试仪是一种应用非常广泛的电子测量仪器,尤其是伴随着微电子技术的发展,空间定向测试仪在车辆、舰船、飞行器等导航领域中的应用日趋成熟。本文所研究的空间定向测试技术主要是以MSP430单片机为基...

    09-04 96浏览
  • 如何提高计算机的处理器管理效率

    所谓软件是指为方便使用计算机和提高使用效率而组织的程序以及用于开发、使用和维护的有关文档。软件系统可分为系统软件和应用软件两大类。 一、系统软件系统软件System software,由一组控制计算机系统并管理其资...

    08-22 159浏览
  • 了解C语言跨平台开发库TBOX吗?介绍、特性一次说清楚

    1 TBOX简介 TBOX针对各个平台,封装了统一的接口,简化了通用开发过程中常用的操作,使你在开发过程中,更加关注实际应用的开发,而不是把时间浪费在琐碎的接口兼容性上面,并且充分利用了各个平台突出的一些特性进行优化。这个项目的目的,是为了高效使C开发更加简单。目前支持的平台有:Windows, Macosx, Linux, Android, iOS, *BSD等等。通过xmake支持多种编译模式: 发布:正式版编译,禁止调试信息、断言,各种检测机制,启用编译器优化 Debug:调试模式,默认实现详细调试信息、断言、内存越界检测、内存泄漏、锁竞争分析等检测机制 Small:最小化编译,取消默认所有扩展模块,实现编译器最小化优化 Micro:针对嵌入式平台,只需编译tbox微内核,仅提供最基础的跨平台接口,生成库仅64K左右(内置轻量libc接口实现) 2 特性 流庫 针对http、file、socket、data等流数据,实现统一接口进行读写,并且支持:阻塞、非阻塞、异步清晰读写模式。支持中间增加多层filter流进行流过滤,实现边读取,内部边进行解压、编码转换、加密等操作,极大地减少了内存使用。主要提供以下模块: Stream:通用非阻塞流,用于一般的单独io处理,同时支持协程以实现异步传输。 传输:流传输器,维护两路流的传输。 static_stream:针对静态数据缓冲区优化的静态流,用于轻量快速的数据解析。 协程库 快速高效的协程切换支持 提供跨平台支持,核心切换算法参考boost,并且由此进行重写和优化,目前支持架构:x86、x86_64、arm、arm64、mips32 提供渠道协程间数据通信支持,基于生产、消费者模型 提供信号量、协程锁支持 socket、stream都模块默认支持协程,并且可以在线程和协程间进行无缝切换 提供http、file等基于协程的简单服务器实例,只需几个行代码,就可以从socket开始写个高性能io服务器,代码逻辑比异步回调模式更清晰 同时提供stackfull、stackless两种协程模式支持,stackless协程更轻量(每个协程只占用几十个字节),切换更快捷(会占用部分射击性) 支持epoll、kqueue、poll、select和IOCP 在协程和poller中支持同时等待和调度socket,pipe io和process 資料 统一并简化数据库操作接口,配备各种数据源,通过统一的url来自动连接打开支持的数据库,数据枚举的采用迭代器模型。 目前支持sqlite3以及mysql两种关系型数据库,也可以自定义扩展使用其他关系型数据库。 xml库 针对xml提供DOM和SAX两种解析模式,SAX方式采用外部迭代模式,灵活和性能更高,并且可以选择路径指定,进行解析。 解析过程基于流,所以是高度流化的,可以实现边下载、边解压、完全边转码、边解析一条龙服务,利用较低的内存也可以解析大规模数据。 提供 xml writer 支持对 xml 生成 内存库 参考linux内核内存池管理机制的实现,并对其进行了各种改造和优化,所实现的TBOX突出了整套内存池管理架构。 调试模式下,可以轻松检测并定位内存丢失、内存越界溢出、内存重叠覆盖等常见内存问题,对磁盘整体内存的使用情况进行了统计和简要分析。 针对大块数据、小块数据、字符串数据进行了充分的利用,避免了大量外部碎片和内部碎片的产生。分配操作进行了各种优化,96%的情况下,效率都在O(1 )。 容器库 提供哈希、链表、队列、队列、堆栈、最小最大堆等常用容器。 支持各种常用的成员类型,在原有的容器期初上,其成员类型还可以自定义完全扩展。 所有容器都支持迭代器操作。 大多数容器都可以支持基于流的序列化和反序列化操作。 算法库 提供各种排序算法:冒泡排序、堆排序、快速排序、插入排序。 提供各种求解算法:线性遍历、二分法搜索。 提供各种遍历、删除、统计算法。 以迭代器为接口,但是实现算法和容器的分离,类似stl,c实现的,更加轻量。 网络库 实现http客户端功能 实现cookies 实现dns解析与缓存 实现ssl(支持openssl、polarssl、mbedtls) 支持ipv4、ipv6 支持通过协程实现异步模式 数学运算库 提供各种精度的定点攻击支持 提供随机数生成器 libc库 libc的一个轻量级实现,跨平台,并且针对不同的架构完全进行了优化。 支持大部分字符串、宽字符串操作。 扩展字符串、宽字符串的各种大小写不敏感操作接口 扩展memset_u16、memset_u32等接口,并对其进行高度优化,尤其适合图形渲染程序 libm库 libm部分接口的一个轻量级实现,以及对常用系统接口的封装。(目前只实现了部分,之后有时间会完全实现掉) 扩展部分常用接口,增加对sqrt、log2等常用函数的整数版本计算,进行高度优化,不涉及浮点侵犯,适合嵌入式环境使用。 对象库 轻量级类apple的CoreFoundation库,支持对象、字典、数组、字符串、数字、日期、数据等常用对象,并且可以方便扩展自定义对象的序列化。 对xml、json、binary以及apple的plist(xplist/bplist)格式序列化和反序列化支持。并且实现了自有的二进制序列化格式,针对明文进行了简单的加密,在不影响性能的前提下,序列化后的大小比bplist节省30%。 平台库 提供文件、目录、socket、线程、时间等常用系统接口 提供atomic、atomic64接口 提供高精度、低精度 提供高性能的线程池操作 提供事件、互斥量、信号量、自旋锁等事件、互斥、信号量、自旋锁操作 提供获取函数堆栈信息的接口,方便调试和错误定位 提供跨平台动态库加载接口(如果系统支持的话) 提供io轮询器,针对epoll、poll、select、kqueue进行跨平台封装 提供跨平台上下文切换接口,主要用于协程实现,切换效率非常高 压缩库 支持zlib/zlibraw/gzip的压缩与解压(需要第三方zlib库支持)。 字符编码库 支持utf8、utf16、gbk、gb2312、uc2、uc4之间的相互转码,并且支持大小端格式。 实用工具库 实现base64/32解码 实现crc32、adler32、md5、sha1等常用哈希算法 实现日志输出、断言等辅助调试工具 实现url编 实现位操作相关接口,支持各种数据格式的解析,可以对8bits、16bits、32bits、64bits、float、double以及任意bits的字段进行解析操作,并且同时支持大端、小端和本地端模式,并针对部分操作进行了优化,像static_stream、stream都有相关接口对其进行了封装,方便在流上进行快速数据解析。 实现了swap16、swap32、swap64等位交换操作,并针对各个平台进行了优化。 实现一些高级的位处理接口,例如:位0的快速统计、前导0和前导1的快速位计数、后导01的位计数 实现单例模块,可以对静态对象、实例对象进行快速的单例封装,实现全局线程安全 实现选项模块,快速对命令行参数进行解析,提供方便的命令行选项创建和解析操作,对于编写终端程序还是很有帮助的 正當理責库 支持匹配和替换操作 支持全局、多行、大小写不敏感等模式 使用pcre, pcre2和posix正则库 3一些使用tbox项目: 格盒 vm86 制作 伊特拉斯 更多项目 4 编译 请先安装: xmake #默认直接编译当前主机平台$ cd./tbox$ xmake #编译mingw平台$ cd./tbox$ xmake f -p mingw --sdk=/home/mingwsdk$ xmake #编译iphoneos平台$ cd./tbox$ xmake f -p iphoneos$ xmake #编译android平台$ cd./tbox$ xmake f -p android --ndk=xxxxx$ xmake #交叉编译$ cd./tbox$ xmake f -p linux --sdk=/home/sdk #--bin=/home/sdk/bin$ xmake 5 个例子 #包括 “tbox/tbox.h” int main (int argc,char ** argv){ // 初始化 tbox 如果(!tb_init(tb_null,tb_null))返回 0; // 痕迹tb_trace_i( "你好 tbox" ); // 初始化向量 tb_vector_ref_t 向量= tb_vector_init( 0 , tb_element_str(tb_true)); 如果(向量){ // 插入项目tb_vector_insert_tail(向量, “你好” );tb_vector_insert_tail(向量,"tbox" ); // 转储所有项目tb_for_all ( tb_char_t const *, cstr,向量){ // 痕迹tb_trace_i( “%s”,cstr);} // 退出向量tb_vector_exit(向量);} // 初始化流 tb_stream_ref_t流 = tb_stream_init_from_url( "http://www.xxx.com/file.txt" ); 如果(流){ // 打开流 如果(tb_stream_open(流)){ // 读取行 tb_long_t大小 = 0 ; tb_char_t行[TB_STREAM_BLOCK_MAXN]; 当((size = tb_stream_bread_line(stream,line,sizeof(line)))> = 0){ // 痕迹tb_trace_i( "行:%s",行);}} // 退出流tb_stream_exit(流);} // 等待tb_getchar(); // 退出 tbox退出(); 返回 0;} 点击阅读原文,了解更多。https://gitee.com/tboox/tbox

    08-21 198浏览
  • STM32三种开发方式

    前言   相比较早几年使用标准库开发来讲,最近几年HAL库的使用是越来越多,那么我们开发应当使用哪一种呢,本文着重介绍常用的几种开发方式及相互之间的区别,白猫也好、黑猫也好,抓到耗子就是好猫。 STM32三种开发方式   通常新手在入门STM32的时候,首先都要先选择一种要用的开发方式,不同的开发方式会导致你编程的架构是完全不一样的。一般大多数都会选用标准库和HAL库,而极少部分人会通过直接配置寄存器进行开发。   网上关于标准库、HAL库的描述相信是数不胜数。可是一个对于很多刚入门的朋友还是没法很直观的去真正了解这些不同开发发方式彼此之间的区别,所以笔者想以一种非常直白的方式,用自己的理解去将这些东西表述出来,如果有描述的不对的地方或者是不同意见的也可以大家提出。 1、直接配置寄存器   不少先学了51的朋友可能会知道,会有一小部分人或是教程是通过汇编语言直接操作寄存器实现功能的,这种方法到了STM32就变得不太容易行得通了,因为STM32的寄存器数量是51单片机的十数倍,如此多的寄存器根本无法全部记忆,开发时需要经常的翻查芯片的数据手册,此时直接操作寄存器就变得非常的费力了。但还是会有很小一部分人,喜欢去直接操作寄存器,因为这样更接近原理,知其然也知其所以然。 2、标准库   上面也提到了,STM32有非常多的寄存器,而导致了开发困难,所以为此ST公司就为每款芯片都编写了一份库文件,也就是工程文件里stm32F1xx…之类的。在这些 .c .h文件中,包括一些常用量的宏定义,把一些外设也通过结构体变量封装起来,如GPIO口时钟等。所以我们只需要配置结构体变量成员就可以修改外设的配置寄存器,从而选择不同的功能。也是目前最多人使用的方式,也是学习STM32接触最多的一种开发方式,我也就不多阐述了。 3、HAL库   HAL库是ST公司目前主力推的开发方式,全称就是Hardware Abstraction Layer(抽象印象层)。库如其名,很抽象,一眼看上去不太容易知道他的作用是什么。   它的出现比标准库要晚,但其实和标准库一样,都是为了节省程序开发的时期,而且HAL库尤其的有效,如果说标准库把实现功能需要配置的寄存器集成了,那么HAL库的一些函数甚至可以做到某些特定功能的集成。也就是说,同样的功能,标准库可能要用几句话,HAL库只需用一句话就够了。   并且HAL库也很好的解决了程序移植的问题,不同型号的stm32芯片它的标准库是不一样的,例如在F4上开发的程序移植到F3上是不能通用的,而使用HAL库,只要使用的是相通的外设,程序基本可以完全复制粘贴,注意是相通外设,意思也就是不能无中生有,例如F7比F3要多几个定时器,不能明明没有这个定时器却非要配置,但其实这种情况不多,绝大多数都可以直接复制粘贴。是而且使用ST公司研发的STMcube软件,可以通过图形化的配置功能,直接生成整个使用HAL库的工程文件,可以说是方便至极,但是方便的同时也造成了它执行效率的低下,在各种论坛帖子真的是被吐槽的数不胜数。 STM32 HAL库与标准库的区别 1、句柄   句柄(handle),有多种意义,其中第一种是指程序设计,第二种是指Windows编程。现在大部分都是指程序设计/程序开发这类。 第一种解释:句柄是一种特殊的智能指针 。当一个应用程序要引用其他系统(如数据库、操作系统)所管理的内存块或对象时,就要使用句柄。 第二种解释:整个Windows编程的基础。一个句柄是指使用的一个唯一的整数值,即一个4字节(64位程序中为8字节)长的数值,来标识应用程序中的不同对象和同类中的不同的实例,诸如,一个窗口,按钮,图标,滚动条,输出设备,控件或者文件等。应用程序能够通过句柄访问相应的对象的信息,但是句柄不是指针,程序不能利用句柄来直接阅读文件中的信息。如果句柄不在I/O文件中,它是毫无用处的。句柄是Windows用来标志应用程序中建立的或是使用的唯一整数,Windows大量使用了句柄来标识对象。 STM32的标准库中,句柄是一种特殊的指针,通常指向结构体!   在STM32的标准库中,假设我们要初始化一个外设(这里以USART为例),我们首先要初始化他们的各个寄存器。在标准库中,这些操作都是利用固件库结构体变量+固件库Init函数实现的: USART_InitTypeDef USART_InitStructure; USART_InitStructure.USART_BaudRate = bound;//串口波特率USART_InitStructure.USART_WordLength = USART_WordLength_8b;//字长为8位数据格式USART_InitStructure.USART_StopBits = USART_StopBits_1;//一个停止位USART_InitStructure.USART_Parity = USART_Parity_No;//无奇偶校验位USART_InitStructure.USART_HardwareFlowControl = USART_HardwareFlowControl_None;//无硬件数据流控制USART_InitStructure.USART_Mode = USART_Mode_Rx | USART_Mode_Tx; //收发模式 USART_Init(USART3, &USART_InitStructure); //初始化串口1 可以看到,要初始化一个串口,需要: 1、对六个位置进行赋值 2、然后引用Init函数   USART_InitStructure并不是一个全局结构体变量,而是只在函数内部的局部变量,初始化完成之后,USART_InitStructure就失去了作用。而在HAL库中,同样是USART初始化结构体变量,我们要定义为全局变量。 UART_HandleTypeDef UART1_Handler; 结构体成员 typedef struct{ USART_TypeDef *Instance; /*!< UART registers base address */ UART_InitTypeDef Init; /*!< UART communication parameters */uint8_t *pTxBuffPtr; /*!< Pointer to UART Tx transfer Buffer */uint16_t TxXferSize; /*!< UART Tx Transfer size */uint16_t TxXferCount; /*!< UART Tx Transfer Counter */uint8_t *pRxBuffPtr; /*!< Pointer to UART Rx transfer Buffer */uint16_t RxXferSize; /*!< UART Rx Transfer size */uint16_t RxXferCount; /*!< UART Rx Transfer Counter */ DMA_HandleTypeDef *hdmatx; /*!< UART Tx DMA Handle parameters */ DMA_HandleTypeDef *hdmarx; /*!< UART Rx DMA Handle parameters */ HAL_LockTypeDef Lock; /*!< Locking object */ __IO HAL_UART_StateTypeDef State; /*!< UART communication state */ __IO uint32_t ErrorCode; /*!< UART Error code */}UART_HandleTypeDef; 我们发现,与标准库不同的是,该成员不仅: 1、包含了之前标准库就有的六个成员(波特率,数据格式等), 2、还包含过采样、(发送或接收的)数据缓存、数据指针、串口 DMA 相关的变量、各种标志位等等要在整个项目流程中都要设置的各个成员。 该 UART1_Handler就被称为串口的句柄,它被贯穿整个USART收发的流程,比如开启中断: HAL_UART_Receive_IT(&UART1_Handler, (u8 *)aRxBuffer, RXBUFFERSIZE); 比如后面要讲到的MSP与Callback回调函数: void HAL_UART_MspInit(UART_HandleTypeDef *huart);void HAL_UART_RxCpltCallback(UART_HandleTypeDef *huart); 在这些函数中,只需要调用初始化时定义的句柄UART1_Handler就好。 2、MSP函数 MSP: MCU Specific Package 单片机的具体方案 MSP是指和MCU相关的初始化,引用一下正点原子的解释,个人觉得说的很明白:   我们要初始化一个串口,首先要设置和 MCU 无关的东西,例如波特率,奇偶校验,停止位等,这些参数设置和 MCU 没有任何关系,可以使用 STM32F1,也可以是 STM32F2/F3/F4/F7上的串口。而一个串口设备它需要一个 MCU 来承载,例如用 STM32F4 来做承载,PA9 做为发送,PA10 做为接收,MSP 就是要初始化 STM32F4 的 PA9,PA10,配置这两个引脚。所以 HAL驱动方式的初始化流程就是: HAL_USART_Init()—>HAL_USART_MspInit() ,先初始化与 MCU无关的串口协议,再初始化与 MCU 相关的串口引脚。 在 STM32 的 HAL 驱动中HAL_PPP_MspInit()作为回调,被 HAL_PPP_Init()函数所调用。当我们需要移植程序到 STM32F1平台的时候,我们只需要修改 HAL_PPP_MspInit 函数内容而不需要修改 HAL_PPP_Init 入口参数内容。   在HAL库中,几乎每初始化一个外设就需要设置该外设与单片机之间的联系,比如IO口,是否复用等等,可见,HAL库相对于标准库多了MSP函数之后,移植性非常强,但与此同时却增加了代码量和代码的嵌套层级。可以说各有利弊。 同样,MSP函数又可以配合句柄,达到非常强的移植性: void HAL_UART_MspInit(UART_HandleTypeDef *huart); 3、Callback函数   类似于MSP函数,个人认为Callback函数主要帮助用户应用层的代码编写。   还是以USART为例,在标准库中,串口中断了以后,我们要先在中断中判断是否是接收中断,然后读出数据,顺便清除中断标志位,然后再是对数据的处理,这样如果我们在一个中断函数中写这么多代码,就会显得很混乱: void USART3_IRQHandler(void) //串口1中断服务程序{ u8 Res;if(USART_GetITStatus(USART3, USART_IT_RXNE) != RESET) //接收中断(接收到的数据必须是0x0d 0x0a结尾) { Res =USART_ReceiveData(USART3); //读取接收到的数据/*数据处理区*/ } } } 而在HAL库中,进入串口中断后,直接由HAL库中断函数进行托管: void USART1_IRQHandler(void) { HAL_UART_IRQHandler(&UART1_Handler); //调用HAL库中断处理公用函数 /***************省略无关代码****************/ }   HAL_UART_IRQHandler这个函数完成了判断是哪个中断(接收?发送?或者其他?),然后读出数据,保存至缓存区,顺便清除中断标志位等等操作。   比如我提前设置了,串口每接收五个字节,我就要对这五个字节进行处理。在一开始我定义了一个串口接收缓存区: /*HAL库使用的串口接收缓冲,处理逻辑由HAL库控制,接收完这个数组就会调用HAL_UART_RxCpltCallback进行处理这个数组*//*RXBUFFERSIZE=5*/u8 aRxBuffer[RXBUFFERSIZE]; 在初始化中,我在句柄里设置好了缓存区的地址,缓存大小(五个字节) /*该代码在HAL_UART_Receive_IT函数中,初始化时会引用*/ huart->pRxBuffPtr = pData;//aRxBuffer huart->RxXferSize = Size;//RXBUFFERSIZE huart->RxXferCount = Size;//RXBUFFERSIZE   则在接收数据中,每接收完五个字节,HAL_UART_IRQHandler才会执行一次Callback函数: void HAL_UART_RxCpltCallback(UART_HandleTypeDef *huart);   在这个Callback回调函数中,我们只需要对这接收到的五个字节(保存在aRxBuffer[]中)进行处理就好了,完全不用再去手动清除标志位等操作。   所以说Callback函数是一个应用层代码的函数,我们在一开始只设置句柄里面的各个参数,然后就等着HAL库把自己安排好的代码送到手中就可以了~   综上,就是HAL库的三个与标准库不同的地方之个人见解。个人觉得从这三个小点就可以看出HAL库的可移植性之强大,并且用户可以完全不去理会底层各个寄存器的操作,代码也更有逻辑性。但与此带来的是复杂的代码量,极慢的编译速度,略微低下的效率。看怎么取舍了。 STM32 HAL库结构   说到STM32的HAL库,就不得不提STM32CubeMX,其作为一个可视化的配置工具,对于开发者来说,确实大大节省了开发时间。相关推荐:STM32CubeMX安装教程。STM32CubeMX就是以HAL库为基础的,且目前仅支持HAL库及LL库!首先看一下,官方给出的HAL库的包含结构: 1、stm32f4xx.h主要包含STM32同系列芯片的不同具体型号的定义,是否使用HAL库等的定义,接着,其会根据定义的芯片信号包含具体的芯片型号的头文件: #if defined(STM32F405xx)#include "stm32f405xx.h"#elif defined(STM32F415xx)#include "stm32f415xx.h"#elif defined(STM32F407xx)#include "stm32f407xx.h"#elif defined(STM32F417xx)#include "stm32f417xx.h"#else#error "Please select first the target STM32F4xx device used in your application (in stm32f2xx.h file)"#endif 紧接着,其会包含stm32f4xx_hal.h。 2、stm32f4xx_hal.h:stm32f4xx_hal.c/h 主要实现HAL库的初始化、系统滴答相关函数、及CPU的调试模式配置 3、stm32f4xx_hal_conf.h :该文件是一个用户级别的配置文件,用来实现对HAL库的裁剪,其位于用户文件目录,不要放在库目录中。 接下来对于HAL库的源码文件进行一下说明,HAL库文件名均以stm32f4xx_hal开头,后面加上_外设或者模块名(如:stm32f4xx_hal_adc.c): 4、库文件:stm32f4xx_hal_ppp.c/.h // 主要的外设或者模块的驱动源文件,包含了该外设的通用API stm32f4xx_hal_ppp_ex.c/.h // 外围设备或模块驱动程序的扩展文件。这组文件中包含特定型号或者系列的芯片的特殊API。以及如果该特定的芯片内部有不同的实现方式,则该文件中的特殊API将覆盖_ppp中的通用API。 stm32f4xx_hal.c/.h // 此文件用于HAL初始化,并且包含DBGMCU、重映射和基于systick的时间延迟等相关的API 5、其他库文件 用户级别文件: stm32f4xx_hal_msp_template.c // 只有.c没有.h。它包含用户应用程序中使用的外设的MSP初始化和反初始化(主程序和回调函数)。使用者复制到自己目录下使用模板。 stm32f4xx_hal_conf_template.h // 用户级别的库配置文件模板。使用者复制到自己目录下使用 system_stm32f4xx.c // 此文件主要包含SystemInit()函数,该函数在刚复位及跳到main之前的启动过程中被调用。它不在启动时配置系统时钟(与标准库相反)。时钟的配置在用户文件中使用HAL API来完成。startup_stm32f4xx.s // 芯片启动文件,主要包含堆栈定义,终端向量表等 stm32f4xx_it.c/.h // 中断处理函数的相关实现 6 main.c/.h // 根据HAL库的命名规则,其API可以分为以下三大类: 初始化/反初始化函数: HAL_PPP_Init(), HAL_PPP_DeInit() IO 操作函数: HAL_PPP_Read(),HAL_PPP_Write(),HAL_PPP_Transmit(), HAL_PPP_Receive() 控制函数: HAL_PPP_Set (), HAL_PPP_Get (). 状态和错误: ** HAL_PPP_GetState (), HAL_PPP_GetError (). 注意:   目前LL库是和HAL库捆绑发布的,所以在HAL库源码中,还有一些名为 stm32f2xx_ll_ppp的源码文件,这些文件就是新增的LL库文件。使用CubeMX生产项目时,可以选择LL库。   HAL库最大的特点就是对底层进行了抽象。在此结构下,用户代码的处理主要分为三部分: 处理外设句柄(实现用户功能) 处理MSP 处理各种回调函数 相关知识如下: 1、外设句柄定义   用户代码的第一大部分:对于外设句柄的处理。HAL库在结构上,对每个外设抽象成了一个称为ppp_HandleTypeDef的结构体,其中ppp就是每个外设的名字。*所有的函数都是工作在ppp_HandleTypeDef指针之下。 多实例支持:每个外设/模块实例都有自己的句柄。因此,实例资源是独立的 下面,以ADC为例 外围进程相互通信:该句柄用于管理进程例程之间的共享数据资源。 /** * @brief ADC handle Structure definition */typedef struct{ ADC_TypeDef *Instance; /*!< Register base address */ ADC_InitTypeDef Init; /*!< ADC required parameters */ __IO uint32_t NbrOfCurrentConversionRank; /*!< ADC number of current conversion rank */ DMA_HandleTypeDef *DMA_Handle; /*!< Pointer DMA Handler */ HAL_LockTypeDef Lock; /*!< ADC locking object */ __IO uint32_t State; /*!< ADC communication state */ __IO uint32_t ErrorCode; /*!< ADC Error code */}ADC_HandleTypeDef;   从上面的定义可以看出,ADC_HandleTypeDef中包含了ADC可能出现的所有定义,对于用户想要使用ADC只要定义一个ADC_HandleTypeDef的变量,给每个变量赋好值,对应的外设就抽象完了。接下来就是具体使用了。    当然,对于那些共享型外设或者说系统外设来说,他们不需要进行以上这样的抽象,这些部分与原来的标准外设库函数基本一样。例如以下外设: GPIO SYSTICK NVIC RCC FLASH   以GPIO为例,对于HAL_GPIO_Init() 函数,其只需要GPIO 地址以及其初始化参数即可。 2、 三种编程方式 HAL库对所有的函数模型也进行了统一。在HAL库中,支持三种编程模式:轮询模式、中断模式、DMA模式(如果外设支持)。其分别对应如下三种类型的函数(以ADC为例): HAL_StatusTypeDef HAL_ADC_Start(ADC_HandleTypeDef* hadc); HAL_StatusTypeDef HAL_ADC_Stop(ADC_HandleTypeDef* hadc); HAL_StatusTypeDef HAL_ADC_Start_IT(ADC_HandleTypeDef* hadc);HAL_StatusTypeDef HAL_ADC_Stop_IT(ADC_HandleTypeDef* hadc); HAL_StatusTypeDef HAL_ADC_Start_DMA(ADC_HandleTypeDef* hadc, uint32_t* pData, uint32_t Length);HAL_StatusTypeDef HAL_ADC_Stop_DMA(ADC_HandleTypeDef* hadc);    其中,带_IT的表示工作在中断模式下;带_DMA的工作在DMA模式下(注意:DMA模式下也是开中断的);什么都没带的就是轮询模式(没有开启中断的)。至于使用者使用何种方式,就看自己的选择了。   此外,新的HAL库架构下统一采用宏的形式对各种中断等进行配置(原来标准外设库一般都是各种函数)。针对每种外设主要由以下宏: __HAL_PPP_ENABLE_IT(HANDLE, INTERRUPT):使能一个指定的外设中断__HAL_PPP_DISABLE_IT(HANDLE, INTERRUPT):失能一个指定的外设中断__HAL_PPP_GET_IT (HANDLE, __ INTERRUPT __):获得一个指定的外设中断状态__HAL_PPP_CLEAR_IT (HANDLE, __ INTERRUPT __):清除一个指定的外设的中断状态__HAL_PPP_GET_FLAG (HANDLE, FLAG):获取一个指定的外设的标志状态__HAL_PPP_CLEAR_FLAG (HANDLE, FLAG):清除一个指定的外设的标志状态__HAL_PPP_ENABLE(HANDLE) :使能外设__HAL_PPP_DISABLE(HANDLE) :失能外设__HAL_PPP_XXXX (HANDLE, PARAM) :指定外设的宏定义_HAL_PPP_GET IT_SOURCE (HANDLE, __ INTERRUPT __):检查中断源 3、 三大回调函数   在HAL库的源码中,到处可见一些以__weak开头的函数,而且这些函数,有些已经被实现了,比如: __weak HAL_StatusTypeDef HAL_InitTick(uint32_t TickPriority){/*Configure the SysTick to have interrupt in 1ms time basis*/ HAL_SYSTICK_Config(SystemCoreClock/1000U);/*Configure the SysTick IRQ priority */ HAL_NVIC_SetPriority(SysTick_IRQn, TickPriority ,0U);/* Return function status */return HAL_OK;} 有些则没有被实现,例如: __weak void HAL_SPI_TxCpltCallback(SPI_HandleTypeDef *hspi){/* Prevent unused argument(s) compilation warning */ UNUSED(hspi);/* NOTE : This function should not be modified, when the callback is needed,the HAL_SPI_TxCpltCallback should be implemented in the user file */}    所有带有__weak关键字的函数表示,就可以由用户自己来实现。如果出现了同名函数,且不带__weak关键字,那么连接器就会采用外部实现的同名函数。   通常来说,HAL库负责整个处理和MCU外设的处理逻辑,并将必要部分以回调函数的形式给出到用户,用户只需要在对应的回调函数中做修改即可。HAL库包含如下三种用户级别回调函数(PPP为外设名): 1、外设系统级初始化/解除初始化回调函数(用户代码的第二大部分:对于MSP的处理): HAL_PPP_MspInit()和 HAL_PPP_MspDeInit** 例如: __weak void HAL_SPI_MspInit(SPI_HandleTypeDef *hspi)。 在HAL_PPP_Init() 函数中被调用,用来初始化底层相关的设备(GPIOs, clock, DMA, interrupt) 2、处理完成回调函数:HAL_PPP_ProcessCpltCallback*(Process指具体某种处理,如UART的Tx), 例如: __weak void HAL_SPI_RxCpltCallback(SPI_HandleTypeDef *hspi) 当外设或者DMA工作完成后时,触发中断,该回调函数会在外设中断处理函数或者DMA的中断处理函数中被调用错误处理回调函数: HAL_PPP_ErrorCallback 例如: __weak void HAL_SPI_ErrorCallback(SPI_HandleTypeDef hspi)* 3、当外设或者DMA出现错误时,触发终端,该回调函数会在外设中断处理函数或者DMA的中断处理函数中被调用 错误处理回调函数: HAL_PPP_ErrorCallback 例如: __weak void HAL_SPI_ErrorCallback(SPI_HandleTypeDef hspi)*   当外设或者DMA出现错误时,触发终端,该回调函数会在外设中断处理函数或者DMA的中断处理函数中被调用。   绝大多数用户代码均在以上三大回调函数中实现。   HAL库结构中,在每次初始化前(尤其是在多次调用初始化前),先调用对应的反初始化(DeInit)函数是非常有必要的。 某些外设多次初始化时不调用返回会导致初始化失败。完成回调函数有多中,例如串口的完成回调函数有 HAL_UART_TxCpltCallbackHAL_UART_TxHalfCpltCallback   (用户代码的第三大部分:对于上面第二点和第三点的各种回调函数的处理)在实际使用中,发现HAL仍有不少问题,例如在使用USB时,其库配置存在问题。 HAL库移植使用 基本步骤: 1、复制stm32f2xx_hal_msp_template.c,参照该模板,依次实现用到的外设的HAL_PPP_MspInit()和 HAL_PPP_MspDeInit。 2、复制stm32f2xx_hal_conf_template.h,用户可以在此文件中自由裁剪,配置HAL库。 3、在使用HAL库时,必须先调用函数:HAL_StatusTypeDef HAL_Init(void)(该函数在stm32f2xx_hal.c中定义,也就意味着第一点中,必须首先实现HAL_MspInit(void)和HAL_MspDeInit(void)) 4、HAL库与STD库不同,HAL库使用RCC中的函数来配置系统时钟,用户需要单独写时钟配置函数(STD库默认在system_stm32f2xx.c中) 5、关于中断,HAL提供了中断处理函数,只需要调用HAL提供的中断处理函数。用户自己的代码,不建议先写到中断中,而应该写到HAL提供的回调函数中。 6、对于每一个外设,HAL都提供了回调函数,回调函数用来实现用户自己的代码。整个调用结构由HAL库自己完成。 例如: Uart中,HAL提供了 void HAL_UART_IRQHandler(UART_HandleTypeDef *huart); 函数,用户只需要触发中断后,用户只需要调用该函数即可,同时,自己的代码写在对应的回调函数中即可!如下: void HAL_UART_TxCpltCallback(UART_HandleTypeDef *huart);void HAL_UART_TxHalfCpltCallback(UART_HandleTypeDef *huart);void HAL_UART_RxCpltCallback(UART_HandleTypeDef *huart);void HAL_UART_RxHalfCpltCallback(UART_HandleTypeDef *huart);void HAL_UART_ErrorCallback(UART_HandleTypeDef *huart); 使用了哪种就用哪个回调函数即可! 基本结构 综上所述,使用HAL库编写程序(针对某个外设)的基本结构(以串口为例)如下: 1、 配置外设句柄 例如,建立UartConfig.c,在其中定义串口句柄 UART_HandleTypeDef huart;接着使用初始化句柄(HAL_StatusTypeDef HAL_UART_Init(UART_HandleTypeDef huart)) 2、编写Msp 例如,建立UartMsp.c,在其中实现void HAL_UART_MspInit(UART_HandleTypeDef huart) 和 void HAL_UART_MspDeInit(UART_HandleTypeDef* huart) 3、实现对应的回调函数 例如,建立UartCallBack.c,在其中实现上文所说明的三大回调函数中的完成回调函数和错误回调函数 参考文档及网文链接 ST - Description of STM32F4 HAL and LL drivers.pdf ST - en.stm32_embedded_software_offering.pdf 作者:ZCShoucsdn 来源:CSDN 原文:https://blog.csdn.net/zcshoucsdn/article/details/55213616 作者:ZCShoucsdn 来源:CSDN 原文:https://blog.csdn.net/zcshoucsdn/article/details/55213616 作者:Error_4O4 来源:CSDN 原文:https://blog.csdn.net/weixin_43186792/article/details/88759321 作者:csdnpapa 来源:CSDN 原文:https://blog.csdn.net/csdnpapa/article/details/79309937

    08-08 149浏览
  • 如何学习嵌入式系统开发?

    你知道吗

    08-07 111浏览
  • 常用的GDB的调试方法和技巧

    本文分为两个大模块,第一部分记录下本人常用到的GDB的调试方法和技巧,第二部分则尝试分析GDB调试的底层原理。

    07-30 211浏览
  • 数据库管理:有效组织、维护和控制数据

    数据库是存放数据的仓库。它的存储空间很大,可以存放百万条、千万条、上亿条数据。但是数据库并不是随意地将数据进行存放,是有一定的规则的,否则查询的效率会很低。当今世界是一个充满着数据的互联网世界,充斥着...

    07-23 163浏览
  • 管理信息系统的标准和要求

    管理信息系统(Management Information System,简称MIS)是一个以人为主导,利用计算机硬件、软件、网络通信设备以及其他办公设备,进行信息的收集、传输、加工、储存、更新、拓展和维护的系统。 管理信息系统(...

    07-23 172浏览
正在努力加载更多...
广告