• ZeroMQ的独占模式学习

    一、通信模式介绍 引用官方的说法: “ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接字之上的一 层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。” ---来自百度 ZeroMQ已经支持多种通信模式: Request-reply pattern(请求-回复模式) Publish-subscribe pattern(发布-订阅模式) Pipeline pattern(管道模式) Exclusive pair pattern (独占对等模式) 1-N、N-1 和 N-M消息传递模式 1-N 模式 本质:一个发送者(源)将消息广播或分发给多个接收者。发送者主动推送消息,而接收者被动接收消息。这种模式常用于广播和消息分发。 举例: Pub-Sub(发布-订阅)模式:一个发布者发布消息,多个订阅者接收这些消息。 场景:新闻广播系统 发布者:新闻机构 订阅者:各种新闻应用和用户 N-1 模式 本质:多个发送者将消息发送到一个接收者。接收者主动从多个发送者处拉取消息,而发送者被动推送消息。这种模式适用于任务收集或负载均衡。 举例: Pipeline(Push-Pull)模式:多个任务生成者(Push)将任务推送到一个任务处理器(Pull)。 场景:日志收集系统 任务生成者:各个服务器的日志生成模块 任务处理器:日志分析或存储系统 N-M 模式 本质:多个发送者将消息发送到多个接收者。每个发送者可以向多个接收者发送消息,多个接收者可以从多个发送者处接收消息。这种模式适用于复杂的广播、聚合或分布式任务处理。 举例: Pub-Sub(发布-订阅):多个发布者广播消息到多个订阅者。每个发布者可以广播不同的主题,订阅者可以订阅多个主题。 场景:实时数据流系统 发布者:数据源(传感器、日志系统) 订阅者:多个应用程序(监控仪表盘、分析工具) 1. Request-reply pattern(请求-回复模式) REQ套接字 REQ套接字用于客户端向服务发送请求并接收回复。该套接字类型仅允许发送和随后的接收调用交替进行。REQ套接字可以连接到任意数量的REP或ROUTER套接字。每个发送的请求会在所有连接的服务之间进行轮询,每个接收到的回复与最后发出的请求匹配。它适用于简单的请求-回复模型,其中对失效对等方的可靠性不是问题。 如果没有服务可用,则套接字上的任何发送操作将阻塞,直到至少有一个服务可用。REQ套接字不会丢弃任何消息。 特性总结: 兼容的对等套接字:REP, ROUTER 方向:双向 发送/接收模式:发送,接收,发送,接收,… 出站路由策略:轮询 入站路由策略:最后一个对等方 静默状态下的动作:阻塞 REP套接字 REP套接字用于服务从客户端接收请求并发送回复。该套接字类型仅允许接收和随后的发送调用交替进行。每个接收到的请求从所有客户端中公平排队,每个发送的回复路由到发出最后请求的客户端。如果原始请求者不存在,则回复会被静默丢弃。 特性总结: 兼容的对等套接字:REQ, DEALER 方向:双向 发送/接收模式:接收,发送,接收,发送,… 出站路由策略:公平轮询 入站路由策略:最后一个对等方 DEALER套接字 DEALER套接字类型与一组匿名对等方通信,使用轮询算法发送和接收消息。它是可靠的,因为它不会丢弃消息。DEALER作为REQ的异步替代,适用于与REP或ROUTER服务器通信的客户端。DEALER接收到的消息从所有连接的对等方中公平排队。 当DEALER套接字由于达到所有对等方的高水位标记而进入静默状态,或者如果根本没有对等方,则套接字上的任何发送操作将阻塞,直到静默状态结束或至少有一个对等方可用;消息不会被丢弃。 当DEALER套接字连接到REP套接字时,发送的消息必须包含一个空帧作为消息的第一部分(分隔符),后跟一个或多个正文部分。 特性总结: 兼容的对等套接字:ROUTER, REP, DEALER 方向:双向 发送/接收模式:不受限制 出站路由策略:轮询 入站路由策略:公平排队 静默状态下的动作:阻塞 ROUTER套接字 ROUTER套接字类型与一组对等方通信,使用显式地址,以便每个传出的消息发送到特定的对等连接。ROUTER作为REP的异步替代,通常用于与DEALER客户端通信的服务器。 当接收消息时,ROUTER套接字将在消息前添加一个包含原始对等方路由ID的消息部分,然后传递给应用程序。接收到的消息从所有连接的对等方中公平排队。当发送消息时,ROUTER套接字将移除消息的第一部分,并使用它来确定消息应路由到的对等方的路由ID。如果对等方不再存在,或者从未存在,则消息将被静默丢弃。 当ROUTER套接字由于达到所有对等方的高水位标记而进入静默状态时,发送到套接字的任何消息将被丢弃,直到静默状态结束。同样,任何路由到已达到其单个高水位标记的对等方的消息也将被丢弃。 当REQ套接字连接到ROUTER套接字时,除了原始对等方的路由ID,每个接收到的消息应包含一个空分隔符消息部分。因此,应用程序看到的每个接收消息的整个结构变为:一个或多个路由ID部分,分隔符部分,一个或多个正文部分。当发送回复给REQ套接字时,应用程序必须包含分隔符部分。 特性总结: 兼容的对等套接字:DEALER, REQ, ROUTER 方向:双向 发送/接收模式:不受限制 出站路由策略:见正文 入站路由策略:公平排队 静默状态下的动作:丢弃(见正文) 特性详细解释 REQ套接字 1.兼容的对等套接字: REP (Reply)和ROUTER套接字与 REQ 套接字兼容。也就是说,REQ 套接字可以与 REP 套接字和 ROUTER 套接字进行通信。 REP:这是一个典型的服务端套接字,负责接收请求并发送响应。 ROUTER:这是一个更复杂的服务端套接字,能够处理多个客户端,并可以进行更复杂的路由操作。 2.方向: 双向:虽然 REQ 套接字只处理请求和响应的交替操作,但它的通信方向是双向的。即,客户端发送请求,服务端返回响应,整个过程是双向交互的。 3.发送/接收模式: 发送,接收,发送,接收:REQ 套接字的工作模式是交替的。每次发送请求后,必须等待响应才能发送下一个请求。也就是说,发送和接收操作是严格交替进行的,这保证了每个请求都有一个响应对应。 4.出站路由策略: 轮询:REQ 套接字会在所有连接的服务端之间轮询发送请求。假设客户端连接到多个服务端(如 REP 套接字),REQ 套接字会按照轮询的方式将请求发送给各个服务端。这有助于实现负载均衡。 5.入站路由策略: 最后一个对等方:每个发送的请求的响应必须来自于最后一个接收到请求的对等方。这意味着每个请求和其对应的响应是匹配的,并且每个请求只会从一个特定的服务端得到回应。 6.静默状态下的动作: 阻塞:如果 REQ 套接字没有任何服务端可用时,发送操作会阻塞。也就是说,客户端会等待,直到至少有一个服务端可以接受请求。在没有服务端的情况下,请求不会丢失,而是会被阻塞,等待服务端的出现。 2. Publish-subscribe pattern(发布-订阅模式) 发布-订阅模式用于将数据从单个发布者以扇出方式分发到多个订阅者。 发布-订阅模式由RFC 29/PUBSUB正式定义。 ZeroMQ通过四种套接字类型支持发布/订阅: PUB 套接字类型 XPUB 套接字类型 SUB 套接字类型 XSUB 套接字类型 PUB 套接字 PUB套接字用于发布者分发数据。发送的消息以扇出方式分发给所有连接的对等方。此套接字类型不能接收任何消息。 当PUB套接字由于达到订阅者的高水位标记而进入静默状态时,将丢弃发送给该订阅者的消息,直到静默状态结束。发送功能永远不会阻塞此套接字类型。 特性总结: 兼容的对等套接字:SUB, XSUB 方向:单向 发送/接收模式:仅发送 入站路由策略:不适用 出站路由策略:扇出 静默状态下的动作:丢弃 SUB 套接字 SUB套接字用于订阅者订阅发布者分发的数据。最初SUB套接字不订阅任何消息。此套接字类型没有实现发送功能。 特性总结: 兼容的对等套接字:PUB, XPUB 方向:单向 发送/接收模式:仅接收 入站路由策略:公平排队 出站路由策略:不适用 XPUB 套接字 与PUB套接字相同,不同之处在于可以以传入消息的形式接收来自对等方的订阅。订阅消息是一个字节1(表示订阅)或字节0(表示取消订阅),后跟订阅体。没有订阅/取消订阅前缀的消息也会被接收,但对订阅状态没有影响。 特性总结: 兼容的对等套接字:ZMQ_SUB, ZMQ_XSUB 方向:单向 发送/接收模式:发送消息,接收订阅 入站路由策略:不适用 出站路由策略:扇出 静默状态下的动作:丢弃 XSUB 套接字 与SUB套接字相同,不同之处在于通过向套接字发送订阅消息来订阅。订阅消息是一个字节1(表示订阅)或字节0(表示取消订阅),后跟订阅体。没有订阅/取消订阅前缀的消息也可以发送,但对订阅状态没有影响。 特性总结: 兼容的对等套接字:ZMQ_PUB, ZMQ_XPUB 方向:单向 发送/接收模式:接收消息,发送订阅 入站路由策略:公平排队 出站路由策略:不适用 静默状态下的动作:丢弃 3. Pipeline pattern(管道模式) 管道模式用于任务分发,通常在多阶段流水线中,其中一个或少数节点将工作推送给许多工作节点,然后它们再将结果推送给一个或少数收集节点。这个模式主要是可靠的,只要节点不意外断开连接,消息就不会被丢弃。它具有可扩展性,节点可以随时加入。 管道模式由RFC 30/PIPELINE正式定义。 ZeroMQ通过两种套接字类型支持管道模式: PUSH 套接字类型 PULL 套接字类型 PUSH 套接字 PUSH套接字类型与一组匿名的PULL对等方通信,使用轮询算法发送消息。此套接字类型没有实现接收操作。 当PUSH套接字由于达到所有下游节点的高水位标记而进入静默状态,或者根本没有下游节点时,套接字上的任何发送操作将阻塞,直到静默状态结束或至少有一个下游节点可用进行发送;消息不会被丢弃。 特性总结: 兼容的对等套接字:PULL 方向:单向 发送/接收模式:仅发送 入站路由策略:不适用 出站路由策略:轮询 静默状态下的动作:阻塞 PULL 套接字 PULL套接字类型与一组匿名的PUSH对等方通信,使用公平排队算法接收消息。 此套接字类型没有实现发送操作。 特性总结: 兼容的对等套接字:PUSH 方向:单向 发送/接收模式:仅接收 入站路由策略:公平排队 出站路由策略:不适用 静默状态下的动作:阻塞 小结:在管道模式中,PUSH套接字负责将任务分发给多个PULL套接字,PULL套接字则负责接收这些任务。该模式通过轮询和公平排队算法确保任务的有效分配和处理,并且具备高可靠性和可扩展性,是实现任务分发和工作负载均衡的有效方式。 4. Exclusive pair pattern (独占 PAIR 模式) 独占对等模式 PAIR套接字不是通用套接字,而是用于特定用例,其中两个对等方在架构上是稳定的。这通常将PAIR限制在单个进程内,用于线程间通信。 独占对等模式由RFC 31/EXPAIR正式定义。 PAIR 套接字 PAIR套接字类型只能与一个对等方建立连接。消息在PAIR套接字上发送时不会进行路由或过滤。 当PAIR套接字由于达到连接对等方的高水位标记而进入静默状态,或者没有对等方连接时,套接字上的任何发送操作将阻塞,直到对等方变得可用进行发送;消息不会被丢弃。 尽管PAIR套接字可以通过除inproc之外的其他传输方式使用,但由于它们无法自动重连,并且在存在任何先前连接(包括处于关闭状态的连接)时,新入站连接将被终止,因此在大多数情况下,它们不适用于TCP。 小结:PAIR套接字专用于架构上稳定的两个对等方之间的通信,通常用于单个进程内的线程间通信。它只能与一个对等方连接,并且不支持消息路由或过滤。当达到高水位标记时,发送操作会阻塞,直到连接恢复。由于缺乏自动重连功能和处理新连接的限制,PAIR套接字不适合用于TCP连接。 二、请求-响应模式实现 请求响应模式是通信中最简单和基础的模式,ZeroMQ同样支持这个模式。 请求响应基础 请求-响应模式是计算机科学和网络通信中一种常见的通信模式。这个模式通常涉及两个主要角色:客户端和服务器。 基本概念 客户端: 向服务器发送请求的实体。它可以是浏览器、应用程序或任何其他发起通信的设备或程序。 服务器: 接收客户端请求并返回响应的实体。它通常是一个提供服务、资源或数据的程序或设备。 工作流程 客户端发起请求: 客户端构造一个请求消息,通常包含请求的类型(如 GET、POST)、请求的资源(如网页、API端点)、以及可能的附加数据(如表单数据)。 服务器处理请求: 服务器接收到请求后,解析请求内容,根据请求的类型和资源进行处理。处理可能包括访问数据库、执行计算或调用其他服务。 服务器返回响应: 处理完成后,服务器生成一个响应消息,通常包含状态码(如200表示成功、404表示资源未找到)、响应的内容(如网页内容、数据结果)以及其他信息(如响应时间、服务器信息)。 客户端接收响应: 客户端接收到响应后,解析响应内容并根据需要展示或处理这些数据。 典型应用 网页浏览: 当你在浏览器中输入网址并按下回车时,浏览器(客户端)会向服务器发出一个请求,服务器会返回网页内容作为响应。 API调用: 在应用程序中调用API时,客户端发送请求(如获取数据、提交表单),服务器处理请求并返回结果。 请求-响应模式的特点 同步通信: 通常情况下,请求-响应模式是同步的,即客户端发送请求后会等待服务器响应完成后才继续执行后续操作。 单向通信: 这种模式是一种单向通信,客户端请求数据,服务器响应数据,但服务器不会主动向客户端发送消息(除非使用长轮询或WebSocket等技术)。 简单直观: 由于其简单的结构和流程,很多网络协议和应用程序设计都是基于这种模式的。 应用实例 HTTP/HTTPS: 用于Web浏览和API交互的协议,客户端(浏览器或应用)发送HTTP请求,服务器返回HTTP响应。 REST API: 基于HTTP协议的API风格,允许客户端通过标准的HTTP请求(如GET、POST、PUT、DELETE)与服务器进行交互。 优点 易于理解和实现: 请求-响应模式简洁明了,易于理解和实现。 兼容性强: 许多网络协议和技术都基于这种模式,因此具有良好的兼容性。 缺点 延迟问题: 在网络不稳定的情况下,请求和响应的延迟可能影响用户体验。 同步阻塞: 客户端通常需要等待响应完成才能继续执行,可能导致性能瓶颈。 总的来说,请求-响应模式是现代计算和通信中的基础构建块,为各种网络应用和服务提供了一个标准化的通信方式。 ZEROMQ  C语言 在 C 语言中使用 ZeroMQ 实现请求-回复模式(Request-Reply Pattern)涉及创建一个请求端和一个回复端,通过 ZeroMQ 套接字进行通信。 服务器端代码(Reply Server) 下面是使用 C 语言编写的 ZeroMQ 请求-回复模式的示例代码。我们将分别实现一个服务器端(Reply Server)和一个客户端(Request Client)。 #include #include #include #include int main() { // 初始化 ZeroMQ 上下文 void *context = zmq_ctx_new(); // 创建 REP (reply) 套接字 void *responder = zmq_socket(context, ZMQ_REP); // 将套接字绑定到端口 zmq_bind(responder, "tcp://*:5555"); while (1) { // 接收请求 char buffer[256]; zmq_recv(responder, buffer, 255, 0); printf("Received request: %s\n", buffer); // 发送回复 const char *reply = "World"; zmq_send(responder, reply, strlen(reply), 0); } // 清理资源 zmq_close(responder); zmq_ctx_destroy(context); return 0;} 客户端代码(Request Client) #include #include #include int main() { // 初始化 ZeroMQ 上下文 void *context = zmq_ctx_new(); // 创建 REQ (request) 套接字 void *requester = zmq_socket(context, ZMQ_REQ); // 连接到服务器 zmq_connect(requester, "tcp://localhost:5555"); // 发送请求 const char *request = "Hello"; zmq_send(requester, request, strlen(request), 0); // 接收回复 char buffer[256]; zmq_recv(requester, buffer, 255, 0); buffer[255] = '\0'; // 确保字符串以 null 结尾 printf("Received reply: %s\n", buffer); // 清理资源 zmq_close(requester); zmq_ctx_destroy(context); return 0;} 编译代码 编译上述代码时,需要链接 ZeroMQ 库。下面是使用 gcc 编译器的示例命令: gcc -o server server.c -lzmqgcc -o client client.c -lzmq 详细说明 ZeroMQ Context: zmq_ctx_new() 用于创建 ZeroMQ 上下文,管理所有的套接字和连接。每个应用程序应该有一个上下文对象。 Sockets: 使用 zmq_socket() 创建套接字。请求-回复模式中,客户端使用 ZMQ_REQ 套接字,服务器使用 ZMQ_REP 套接字。 Bind 和 Connect: zmq_bind() 绑定服务器端套接字到指定的地址和端口。 zmq_connect() 连接客户端套接字到服务器端的地址和端口。 消息传递: 使用 zmq_send() 发送消息。 使用 zmq_recv() 接收消息。 资源清理: zmq_close() 关闭套接字。 zmq_ctx_destroy() 销毁上下文,释放相关资源。 ZEROMQ  C++ 安装 ZeroMQ C++ Bindings (cppzmq) ZeroMQ 的 C++ 绑定库 cppzmq 提供了对 ZeroMQ 的 C++ 封装。你可以通过包管理工具或者从 GitHub 下载源代码来安装它。 使用 vcpkg vcpkg install cppzmq 从 GitHub 安装 git clone https://github.com/zeromq/cppzmq.git 编写代码 下面是一个简单的示例,展示了如何在 C++ 中使用 ZeroMQ 实现请求-回复模式。我们将分别编写一个服务器(Reply Server)和一个客户端(Request Client)。 服务器端代码(Reply Server) #include #include #include int main() { // Initialize ZeroMQ context zmq::context_t context(1); // Create a REP (reply) socket zmq::socket_t socket(context, ZMQ_REP); // Bind the socket to an endpoint (address:port) socket.bind("tcp://*:5555"); while (true) { // Receive a request zmq::message_t request; socket.recv(request); std::string request_str(static_cast<char*>(request.data()), request.size()); std::cout << "Received request: " << request_str << std::endl; // Send a reply std::string reply_str = "World"; zmq::message_t reply(reply_str.size()); memcpy(reply.data(), reply_str.data(), reply_str.size()); socket.send(reply); } return 0;} 客户端代码(Request Client) #include #include #include int main() { // Initialize ZeroMQ context zmq::context_t context(1); // Create a REQ (request) socket zmq::socket_t socket(context, ZMQ_REQ); // Connect to the server (address:port) socket.connect("tcp://localhost:5555"); // Send a request std::string request_str = "Hello"; zmq::message_t request(request_str.size()); memcpy(request.data(), request_str.data(), request_str.size()); socket.send(request); // Receive a reply zmq::message_t reply; socket.recv(reply); std::string reply_str(static_cast<char*>(reply.data()), reply.size()); std::cout << "Received reply: " << reply_str << std::endl; return 0;} 解释 ZeroMQ Context: 这是 ZeroMQ 的基础对象,负责管理所有的套接字和连接。每个线程应该有一个唯一的上下文对象。 Sockets: ZeroMQ 的套接字对象用于发送和接收消息。在请求-回复模式中,客户端使用 ZMQ_REQ 套接字,服务器使用 ZMQ_REP 套接字。 Bind 和 Connect: 服务器使用 bind 将套接字绑定到一个特定的地址和端口,客户端使用 connect 连接到该地址和端口。 消息传递: 使用 send 和 recv 方法来传递消息。消息是通过 zmq::message_t 对象来表示的。 小结 使用 ZeroMQ 实现请求-响应模式可以带来显著的性能提升和灵活性。它不仅支持高性能的消息传递,还提供了丰富的特性,如自动重连、负载均衡、多语言支持等,使得它成为构建高性能、可靠的分布式系统和微服务架构的理想选择。 三、发布-订阅模式实现 发布订阅模式是典型的异步模式,通过ZeroMq来看看他的原理与实现。 简称 Pub-Sub,是一种消息传递模式。 允许发送者(发布者)和接收者(订阅者)之间解耦。 它广泛应用于消息队列、事件驱动系统和实时通知等场景。 基本原理 参与者: 发布者(Publisher):发送消息的实体。 订阅者(Subscriber):接收消息的实体。 消息代理(Message Broker):中介实体,负责接收发布者的消息并分发给相应的订阅者。 主题(Topic): 消息根据主题分类,订阅者订阅一个或多个主题,发布者将消息发布到特定主题。 消息传递: 发布者将消息发送到消息代理,并指定消息的主题。 消息代理根据主题将消息分发给所有订阅了该主题的订阅者。 工作流程 订阅(Subscribe): 订阅者向消息代理注册自己对某个主题的兴趣。 订阅者可以订阅多个主题。 发布(Publish): 发布者向消息代理发送消息,并指定消息的主题。 分发(Distribute): 消息代理接收到消息后,根据主题查找所有订阅了该主题的订阅者。 消息代理将消息分发给所有符合条件的订阅者。 举例说明 假设有一个天气预报系统: 发布者:天气预报服务 订阅者:用户手机应用、网页应用等 主题:不同城市的天气(如“北京天气”、“上海天气”) 用户 A 通过手机应用订阅了“北京天气”主题。 用户 B 通过网页应用订阅了“上海天气”主题。 天气预报服务发布了一条“北京天气”的消息到消息代理。 消息代理接收到消息后,将其分发给所有订阅了“北京天气”主题的用户应用(如用户 A 的手机应用)。 优点 解耦:发布者和订阅者不直接交互,彼此独立。 扩展性:可以方便地增加或减少订阅者,不影响发布者。 灵活性:可以动态改变订阅关系,适应不同需求。 缺点 复杂性:需要维护消息代理和订阅关系,增加系统复杂性。 可靠性:消息传递的可靠性需要额外保障,如消息丢失和重复的问题。 C代码实现天气预报系统 #include #include #include #include #include #include const char* cities[] = { "北京", "上海", "广州", "深圳", "杭州", "成都", "武汉", "重庆", "南京", "天津"}; const char* weather_conditions[] = { "晴", "多云", "阴", "小雨", "中雨", "大雨", "雷阵雨", "雪", "雾", "霾"}; // 随机生成天气信息void generate_weather_update(char* buffer, size_t buffer_size, const char* city) { int temp = rand() % 35; // 随机温度 const char* condition = weather_conditions[rand() % 10]; // 随机天气情况 snprintf(buffer, buffer_size, "%s天气 %s %d°C", city, condition, temp);} // 发布天气更新消息DWORD WINAPI publish_weather_update(LPVOID arg) { void* context = arg; const char* address = "tcp://*:5556"; // 创建发布者套接字 void* publisher = zmq_socket(context, ZMQ_PUB); zmq_bind(publisher, address); srand((unsigned)time(NULL)); while (1) { // 随机选择城市并生成天气更新 for (int i = 0; i < 10; ++i) { char update[256]; generate_weather_update(update, sizeof(update), cities[i]); zmq_send(publisher, update, strlen(update), 0); printf("发布消息: %s\n", update); } // 模拟发布间隔 Sleep(10000); // Windows 中使用 Sleep } // 关闭套接字 zmq_close(publisher); return 0;} // 订阅天气更新消息DWORD WINAPI subscribe_weather_updates(LPVOID arg) { void* context = zmq_ctx_new(); const char* city = (char*)arg; const char* address = "tcp://localhost:5556"; // 创建订阅者套接字 void* subscriber = zmq_socket(context, ZMQ_SUB); zmq_connect(subscriber, address); zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, city, strlen(city)); while (1) { // 接收消息 char update[256]; int size = zmq_recv(subscriber, update, 255, 0); if (size != -1) { update[size] = '\0'; printf("接收消息 (%s): %s\n", city, update); } } // 关闭套接字 zmq_close(subscriber); zmq_ctx_destroy(context); return 0;} int main() { void* context = zmq_ctx_new(); // 创建发布者线程 HANDLE publisher_thread = CreateThread(NULL, 0, publish_weather_update, context, 0, NULL); // 创建10个订阅者线程 HANDLE subscriber_threads[10]; for (int i = 0; i < 10; ++i) { subscriber_threads[i] = CreateThread(NULL, 0, subscribe_weather_updates, (LPVOID)cities[i], 0, NULL); } // 等待线程完成 WaitForSingleObject(publisher_thread, INFINITE); for (int i = 0; i < 10; ++i) { WaitForSingleObject(subscriber_threads[i], INFINITE); CloseHandle(subscriber_threads[i]); } // 关闭线程句柄 CloseHandle(publisher_thread); zmq_ctx_destroy(context); return 0;} 说明 发布者线程:publish_weather_update 函数在一个独立线程中运行,发布10个城市的天气预报消息,天气信息随机变化。 订阅者线程:subscribe_weather_updates 函数在10个独立线程中运行,每个线程订阅一个特定城市的天气预报消息。 主程序:在主程序中创建并启动发布者和10个订阅者线程,然后等待它们完成。 C++代码 #include #include #include #include #include #include #include #include const std::vector<std::string> cities = { "北京", "上海", "广州", "深圳", "杭州", "成都", "武汉", "重庆", "南京", "天津"}; const std::vector<std::string> weather_conditions = { "晴", "多云", "阴", "小雨", "中雨", "大雨", "雷阵雨", "雪", "雾", "霾"}; // 随机生成天气信息std::string generate_weather_update(const std::string& city) { int temp = rand() % 35; // 随机温度 const std::string& condition = weather_conditions[rand() % weather_conditions.size()]; // 随机天气情况 return city + "天气 " + condition + " " + std::to_string(temp) + "°C";} // 发布天气更新消息void publish_weather_update(zmq::context_t& context) { zmq::socket_t publisher(context, ZMQ_PUB); publisher.bind("tcp://*:5556"); srand(static_cast<unsigned>(time(nullptr))); while (true) { for (const auto& city : cities) { std::string update = generate_weather_update(city); zmq::message_t message(update.begin(), update.end()); publisher.send(message, zmq::send_flags::none); std::cout << "发布消息: " << update << std::endl; } std::this_thread::sleep_for(std::chrono::seconds(1)); }} // 订阅天气更新消息void subscribe_weather_updates(const std::string& city) { zmq::context_t context(1); zmq::socket_t subscriber(context, ZMQ_SUB); subscriber.connect("tcp://localhost:5556"); subscriber.set(zmq::sockopt::subscribe, city); while (true) { zmq::message_t message; subscriber.recv(message, zmq::recv_flags::none); std::string update(static_cast<char*>(message.data()), message.size()); std::cout << "接收消息 (" << city << "): " << update << std::endl; }} int main() { zmq::context_t context(1); // 创建发布者线程 std::thread publisher_thread(publish_weather_update, std::ref(context)); // 创建10个订阅者线程 std::vector<std::thread> subscriber_threads; for (const auto& city : cities) { subscriber_threads.emplace_back(subscribe_weather_updates, city); } // 等待线程完成 publisher_thread.join(); for (auto& thread : subscriber_threads) { thread.join(); } return 0;} 其他场景举例 1. 金融市场数据分发 应用场景: 股票市场:股票价格、交易量等市场数据的实时分发。 外汇市场:汇率变动、交易量等数据的实时更新。 详细举例: Bloomberg 和 Reuters 等金融信息服务商使用发布-订阅模式来分发实时的市场数据给订阅者,这些订阅者可能是金融机构、投资者等。数据包括股票报价、外汇汇率、商品价格等。 交易平台:交易所使用发布-订阅系统向交易参与者推送交易数据和市场信息。 2. 日志和监控系统 应用场景: 服务器日志:实时收集和分析分布式系统的日志数据。 系统监控:监控系统健康状态、资源使用等。 详细举例: Elasticsearch, Logstash, Kibana (ELK) Stack:使用发布-订阅模式在分布式系统中收集和分析日志数据。Logstash 作为数据收集器,可以接收来自不同源的日志并发布到 Elasticsearch,Kibana 订阅这些日志并进行可视化展示。 Prometheus 和 Grafana:Prometheus 收集系统和应用的监控数据,Grafana 订阅这些数据并生成实时的监控仪表板。 3. 社交媒体和消息通知 应用场景: 消息推送:向用户实时推送消息、通知。 活动流:发布和订阅用户活动、帖子、评论等。 详细举例: Facebook 和 Twitter 等社交媒体平台使用发布-订阅模式来处理用户的状态更新、评论、点赞等事件。用户可以订阅朋友或关注的人的动态。 即时通讯应用:如 Slack 和 WhatsApp,用户接收消息的过程是通过发布-订阅模式实现的,确保消息的即时性和可靠性。 4. 物联网(IoT) 应用场景: 设备状态监控:实时监控和控制物联网设备。 传感器数据收集:从传感器收集数据并实时处理。 详细举例: 智能家居:智能家居系统中的传感器(如温度、湿度传感器)使用发布-订阅模式将数据发送到中央控制系统,用户可以通过应用程序订阅这些数据并进行监控和控制。 工业物联网:在工业环境中,机器和设备通过发布-订阅模式报告运行状态和故障信息,管理系统订阅这些信息进行实时监控和预警。 5. 分布式系统和微服务架构 应用场景: 服务通信:微服务之间的通信和协调。 事件驱动架构:基于事件的系统设计。 详细举例: Apache Kafka:广泛用于构建分布式系统和微服务架构中的事件流处理。不同的微服务可以发布和订阅事件,确保系统的松耦合和高扩展性。 Amazon Web Services (AWS) SNS:Simple Notification Service (SNS) 是一个托管的发布-订阅服务,用于将消息从一个应用程序发送到多个订阅者,常用于触发基于事件的处理流程。 6. 在线游戏 应用场景: 游戏状态更新:实时同步游戏状态和玩家操作。 聊天系统:游戏内聊天消息的实时分发。 详细举例: 多人在线游戏(MMO):如《魔兽世界》,使用发布-订阅模式在服务器和客户端之间同步游戏状态和玩家操作。游戏服务器发布玩家动作和状态,其他玩家的客户端订阅这些消息以保持同步。 在线棋牌类游戏:如《炉石传说》,游戏服务器发布牌局状态和玩家操作,玩家客户端订阅这些消息以实时更新游戏界面。 7. 新闻和内容分发 应用场景: 新闻推送:实时向用户推送新闻和内容更新。 订阅服务:用户订阅特定主题或作者的内容更新。 详细举例: RSS Feeds:使用发布-订阅模式向用户提供新闻和博客更新。用户订阅感兴趣的内容源,新的内容发布时会自动推送到用户的阅读器。 内容聚合平台:如 Flipboard 和 Feedly,用户订阅不同的内容源,平台通过发布-订阅模式实时获取和推送内容更新。 这些实际应用展示了发布-订阅模式在各种领域中的广泛应用,利用这一模式可以实现系统的解耦、扩展和实时性。 四、管道模式实现 ZeroMQ 的管道模式通过简单而高效的PUSH和PULL套接字实现了负载均衡和任务分发,适用于分布式任务处理和数据处理流水线等场景。其自动负载均衡和高可扩展性使得它成为构建高效、可扩展分布式系统的理想选择。 基础原理介绍 ZeroMQ 的管道模式是一种特殊的消息传递模式,通常用于将任务或消息从一个生产者传递到多个消费者。其主要特性是: 消息一对多传递:单个生产者可以将消息发送到多个消费者。 负载均衡:消息在消费者之间进行负载均衡,以确保每个消费者都能接收到均衡的消息量。 下游消费者处理:消费者处理消息,并且可以将处理后的消息传递给下游消费者,形成消息处理链。 在 ZeroMQ 中,管道模式由PUSH和PULL套接字类型实现: PUSH 套接字:生产者使用 PUSH 套接字发送消息。消息按照负载均衡的方式发送到连接的 PULL 套接字。 PULL 套接字:消费者使用 PULL 套接字接收消息。每个 PULL 套接字接收到的消息数量大致相等。 内部原理 PUSH 套接字 特性:PUSH 套接字负责将消息发送给一个或多个连接的 PULL 套接字。 消息分发:当有多个 PULL 套接字连接到一个 PUSH 套接字时,PUSH 套接字会按照循环(round-robin)的方式将消息分发给每个 PULL 套接字。这意味着每个消息都会发送给下一个连接的 PULL 套接字,直到轮回到第一个 PULL 套接字。 PULL 套接字 特性:PULL 套接字负责接收从 PUSH 套接字发送来的消息。 消息接收:每个 PULL 套接字只会接收发送给它的消息,不会与其他 PULL 套接字共享消息。 实现负载均衡的方法 1.循环分发(Round-Robin Dispatching): PUSH 套接字在有多个 PULL 套接字连接时,按照循环分发的方式将消息逐个发送给每个 PULL 套接字。 这种方式确保了所有连接的 PULL 套接字能够接收到大致相同数量的消息,从而实现了负载均衡。 2.消息队列: 每个 PULL 套接字维护一个接收队列,PUSH 套接字发送的消息会被分发到这些接收队列中。 PULL 套接字从其接收队列中取出消息进行处理,避免了消息丢失或重复处理的情况。 Round-Robin 原理 Round-robin 是一种简单而常用的调度算法,广泛应用于任务调度、资源分配和网络通信中。在 round-robin 调度中,资源(如 CPU 时间、网络带宽或消息)按顺序分配给各个请求者,循环地进行,确保每个请求者都能公平地获得资源。 Round-Robin 在 ZeroMQ 的应用 在 ZeroMQ 中,PUSH 套接字使用 round-robin 算法将消息分发给多个连接的 PULL 套接字。具体工作原理如下: 1.初始化: 当一个 PUSH 套接字被创建并绑定到一个端口时,它开始监听来自 PULL 套接字的连接。 当 PULL 套接字连接到 PUSH 套接字时,PUSH 套接字维护一个内部队列,记录所有连接的 PULL 套接字。 2.消息分发: 每当 PUSH 套接字发送一条消息时,它会按照 round-robin 的顺序选择下一个 PULL 套接字,将消息发送给它。 这种分发方式确保每个连接的 PULL 套接字接收到的消息数量大致相同。 3.循环: 当所有的 PULL 套接字都接收过一次消息后,PUSH 套接字重新开始,从第一个 PULL 套接字继续发送消息。 Round-Robin 调度算法的优点 公平性:Round-robin 算法确保每个连接的 PULL 套接字都能公平地接收到消息,没有任何一个 PULL 套接字会被饿死。 简单性:该算法非常简单,不需要复杂的计算和维护,只需要一个循环计数器即可实现。 负载均衡:由于消息均匀地分布到各个 PULL 套接字,系统能够实现负载均衡,防止某个 PULL 套接字过载。 Round-robin算法: #include #define WORKER_COUNT 5#define TASK_COUNT 20 int main() { int workers[WORKER_COUNT] = {0}; // 存储每个工作者处理的任务数量 int tasks[TASK_COUNT]; // 初始化任务 for (int i = 0; i < TASK_COUNT; i++) { tasks[i] = i + 1; // 每个任务用一个整数表示 } // Round-Robin 分配任务 for (int i = 0; i < TASK_COUNT; i++) { int worker_id = i % WORKER_COUNT; // 使用模运算实现循环分配 workers[worker_id]++; printf("任务 %d 分配给工作者 %d\n", tasks[i], worker_id); } // 输出每个工作者处理的任务数量 for (int i = 0; i < WORKER_COUNT; i++) { printf("工作者 %d 处理了 %d 个任务\n", i, workers[i]); } return 0;} 主要用途 管道模式在以下场景中非常有用: 任务分发:将任务从生产者分发到多个工作者进行处理。例如,分布式计算中的任务调度。 负载均衡:在多个工作者之间平衡负载,确保每个工作者处理的任务量大致相同。 数据处理流水线:将数据流从一个阶段传递到下一个阶段,每个阶段由不同的消费者处理。 优势 简单且高效:管道模式通过简单的 PUSH 和 PULL 套接字实现高效的消息传递和负载均衡。 自动负载均衡:消息在多个消费者之间自动均衡分配,无需手动干预。 高可扩展性:可以轻松增加或减少消费者,调整系统的处理能力。 灵活性:可以形成复杂的消息处理流水线,适用于多种分布式任务处理场景。 实际场景 1.ELK Stack 中的 Logstash: 用途:集中收集和处理分布式系统的日志。 工作原理:Logstash 从不同来源接收日志数据,处理后推送到 Elasticsearch。通过管道模式,多个 Logstash 实例可以同时处理日志数据,并将处理后的数据均衡分发到 Elasticsearch 节点。 2.视频处理流水线: 用途:在视频直播或视频编辑场景中,视频帧需要经过多个处理阶段,如解码、特效处理、编码等。 工作原理:视频帧通过管道模式从一个处理单元推送到下一个处理单元。每个处理单元可以并行处理不同的视频帧,最终生成处理后的视频流。 3.分布式交易系统: 用途:在金融市场中,实现高频交易和低延迟的数据传输。 工作原理:交易指令和市场数据通过管道模式在交易系统的各个节点间传递,确保数据的快速分发和处理。每个交易节点可以并行处理接收到的数据,优化交易性能和响应速度。 与发布订阅的区别 发布-订阅模式 基础原理 参与者:发布者(Publisher)和订阅者(Subscriber)。 主题:消息通过主题(Topic)进行分类,订阅者可以选择感兴趣的主题。 消息分发:发布者将消息发布到特定主题,消息代理将消息分发给所有订阅了该主题的订阅者。 主要特点 多对多:一个发布者可以有多个订阅者,多个发布者也可以有多个订阅者。 去耦合:发布者和订阅者彼此之间是解耦的,通过消息代理进行通信。 灵活的订阅机制:订阅者可以动态订阅或取消订阅主题。 应用场景 通知系统:如新闻推送、股票行情更新。 事件驱动架构:系统组件之间通过事件进行松耦合通信。 示例 消息队列系统:如 Apache Kafka、RabbitMQ 等。 实时通知系统:如 Slack、Twitter 等。 管道模式 基础原理 参与者:推送者(PUSH)和拉取者(PULL)。 消息流动:消息从推送者流向拉取者。 负载均衡:消息在拉取者之间进行负载均衡,确保每个拉取者接收到的消息量大致相同。 主要特点 一对多:一个推送者可以有多个拉取者,但一个消息只能被一个拉取者处理。 负载均衡:自动将消息均衡地分发给多个拉取者。 顺序处理:消息按照推送的顺序被处理,适用于流水线处理。 应用场景 任务分发系统:将任务分发给多个工作者进行处理。 流水线处理:如视频处理、数据处理流水线。 示例 分布式任务调度系统:如高性能计算任务调度。 日志处理系统:如 ELK Stack 中的 Logstash。 主要区别 1.消息分发机制: 发布-订阅模式:一个消息可以被多个订阅者接收。消息通过主题进行分类,订阅者可以根据兴趣选择订阅特定主题。 管道模式:一个消息只能被一个拉取者接收。消息从推送者传递到拉取者,并在拉取者之间进行负载均衡。 2.使用场景: 发布-订阅模式:适用于通知系统、事件驱动架构等需要广播消息的场景。 管道模式:适用于任务分发、流水线处理等需要消息均衡处理的场景。 3.参与者关系: 发布-订阅模式:发布者和订阅者之间没有直接联系,通过消息代理实现解耦。 管道模式:推送者和拉取者之间有直接联系,消息直接从推送者传递到拉取者。 为什么会觉得相似 两者都是消息传递模式,解决的是分布式系统中的通信问题。它们的相似之处在于: 都是基于消息的异步通信:两者都通过消息实现参与者之间的异步通信,解耦发送方和接收方。 都可以实现多对多的消息传递:尽管实现方式不同,发布-订阅模式通过主题,管道模式通过多个拉取者,都可以实现消息的多对多传递。 然而,它们在消息分发的具体机制和应用场景上有显著区别,选择哪种模式取决于具体的应用需求。 任务发放C代码 #include #include #include #include #include #include #define WORKER_COUNT 20#define TASK_COUNT 1000 // 全局变量,存储每个消费者处理的任务数量int task_count[WORKER_COUNT] = { 0 };HANDLE mutex;volatile int keep_running = 1; // 用于控制任务处理者线程的运行 // 任务发布者(生产者)函数DWORD WINAPI producer_thread(LPVOID arg) { void* context = zmq_ctx_new(); void* producer = zmq_socket(context, ZMQ_PUSH); zmq_bind(producer, "tcp://*:5557"); srand((unsigned int)time(NULL)); for (int i = 0; i < TASK_COUNT; ++i) { char message[10]; int workload = rand() % 100; // 随机生成任务负载 snprintf(message, sizeof(message), "%d", workload); zmq_send(producer, message, strlen(message), 0); printf("发送任务: %s\n", message); Sleep(10); // 模拟任务生成间隔 } zmq_close(producer); zmq_ctx_destroy(context); keep_running = 0; // 停止任务处理者线程 return 0;} // 任务处理者(消费者)函数DWORD WINAPI consumer_thread(LPVOID arg) { int worker_id = (int)arg; void* context = zmq_ctx_new(); void* consumer = zmq_socket(context, ZMQ_PULL); zmq_connect(consumer, "tcp://localhost:5557"); while (keep_running || zmq_recv(consumer, NULL, 0, ZMQ_DONTWAIT) != -1) { char message[10]; int size = zmq_recv(consumer, message, sizeof(message) - 1, ZMQ_DONTWAIT); if (size != -1) { message[size] = '\0'; int workload = atoi(message); printf("Worker %d 接收任务: %s\n", worker_id, message); Sleep(workload); // 模拟处理任务 // 更新任务计数器 WaitForSingleObject(mutex, INFINITE); task_count[worker_id]++; ReleaseMutex(mutex); } } zmq_close(consumer); zmq_ctx_destroy(context); return 0;} int main() { mutex = CreateMutex(NULL, FALSE, NULL); // 创建任务发布者线程 HANDLE producer = CreateThread(NULL, 0, producer_thread, NULL, 0, NULL); // 创建 5 个任务处理者线程 HANDLE consumers[WORKER_COUNT]; for (int i = 0; i < WORKER_COUNT; ++i) { consumers[i] = CreateThread(NULL, 0, consumer_thread, (LPVOID)i, 0, NULL); } // 等待任务发布者线程完成 WaitForSingleObject(producer, INFINITE); CloseHandle(producer); // 等待所有任务处理者线程完成 WaitForMultipleObjects(WORKER_COUNT, consumers, TRUE, INFINITE); for (int i = 0; i < WORKER_COUNT; ++i) { CloseHandle(consumers[i]); } printf("#########################################################################"); // 输出每个处理者处理的任务个数 for (int i = 0; i < WORKER_COUNT; ++i) { printf("Worker %d 处理了 %d 个任务\n", i, task_count[i]); } CloseHandle(mutex); return 0;} 代码说明 1.全局变量: task_count[WORKER_COUNT]:用于存储每个消费者处理的任务数量。 mutex:用于保护对 task_count 数组的访问,确保线程安全。 volatile int keep_running:用于控制任务处理者线程的运行。 2.任务发布者线程: 创建一个 ZeroMQ 上下文和一个 PUSH 套接字。 绑定 PUSH 套接字到 TCP 端口 5557。 生成 1000个随机任务,并通过 PUSH 套接字发送这些任务。 在完成任务发送后,设置 keep_running 为 0,通知任务处理者线程可以停止。 3.任务处理者线程: 创建一个 ZeroMQ 上下文和一个 PULL 套接字。 连接 PULL 套接字到任务发布者的地址(TCP 端口 5557)。 进入无限循环,接收任务消息并模拟处理这些任务。 处理完任务后更新任务计数器,并打印当前处理者处理的任务总数。 当 keep_running 为 0 并且没有待处理的消息时,退出循环。 4.主程序: 创建一个任务发布者线程。 创建 5 个任务处理者线程。 等待任务发布者线程完成后关闭其句柄。 等待所有任务处理者线程完成后关闭各自的句柄。 输出每个处理者处理的任务个数。 该程序将启动一个任务发布者线程和 100个任务处理者线程,发布者每隔 10 毫秒发送一个任务,任务处理者实时接收并处理这些任务。每个任务处理者线程独立运行,并且可以同时处理不同的任务,从而实现了任务的并行处理和负载均衡。程序结束时,将输出每个处理者处理的任务个数,从而可以清晰地展示负载均衡的效果。 任务发放C++ #include #include #include #include #include #include #include #include #include #define WORKER_COUNT 20#define TASK_COUNT 1000 // 全局变量,存储每个消费者处理的任务数量std::vector<int> task_count(WORKER_COUNT, 0);std::mutex mtx;std::atomic<bool> keep_running(true); // 用于控制任务处理者线程的运行 // 任务发布者(生产者)函数void producer_thread() { zmq::context_t context(1); zmq::socket_t producer(context, ZMQ_PUSH); producer.bind("tcp://*:5557"); srand(static_cast<unsigned int>(time(NULL))); for (int i = 0; i < TASK_COUNT; ++i) { int workload = rand() % 100; // 随机生成任务负载 std::string message = std::to_string(workload); zmq::message_t zmq_message(message.data(), message.size()); producer.send(zmq_message, zmq::send_flags::none); std::cout << "发送任务: " << message << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 模拟任务生成间隔 } keep_running = false; // 停止任务处理者线程} // 任务处理者(消费者)函数void consumer_thread(int worker_id) { zmq::context_t context(1); zmq::socket_t consumer(context, ZMQ_PULL); consumer.connect("tcp://localhost:5557"); while (keep_running) { zmq::message_t message; zmq::recv_result_t result = consumer.recv(message, zmq::recv_flags::dontwait); if (result) { std::string message_str(static_cast<char*>(message.data()), message.size()); int workload = std::stoi(message_str); std::cout << "Worker " << worker_id << " 接收任务: " << message_str << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(workload)); // 模拟处理任务 // 更新任务计数器 { std::lock_guard<std::mutex> lock(mtx); task_count[worker_id]++; } } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 短暂休眠以避免空转 } } // 当 keep_running 为 false 后,确保处理完所有剩余消息 while (true) { zmq::message_t message; zmq::recv_result_t result = consumer.recv(message, zmq::recv_flags::dontwait); if (!result) { break; // 无消息可接收时退出 } std::string message_str(static_cast<char*>(message.data()), message.size()); int workload = std::stoi(message_str); std::cout << "Worker " << worker_id << " 接收任务: " << message_str << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(workload)); // 模拟处理任务 // 更新任务计数器 { std::lock_guard<std::mutex> lock(mtx); task_count[worker_id]++; } }} int main() { // 创建任务发布者线程 std::thread producer(producer_thread); // 创建 20 个任务处理者线程 std::vector<std::thread> consumers; for (int i = 0; i < WORKER_COUNT; ++i) { consumers.emplace_back(consumer_thread, i); } // 等待任务发布者线程完成 producer.join(); // 等待所有任务处理者线程完成 for (auto& consumer : consumers) { consumer.join(); } std::cout << "#########################################################################" << std::endl; // 输出每个处理者处理的任务个数 for (int i = 0; i < WORKER_COUNT; ++i) { std::cout << "Worker " << i << " 处理了 " << task_count[i] << " 个任务" << std::endl; } return 0;} 五、独占模式实现 独占PAIR模式是一种设计方法,旨在确保在并发环境中,两个线程或资源对在同一时间内独占地操作共享资源,从而避免资源竞争、数据不一致、死锁等问题。这种模式在高并发环境中显得尤为重要,特别是在需要高可靠性和稳定性的系统中,如金融交易系统、物流管理系统、在线购物平台等。 首先,本节通过一个使用Mutex实现的独占PAIR模式的示例,展示了如何通过互斥锁和条件变量来确保两个线程能够协同工作,共同对共享资源进行操作。这个示例简单而直观地解释了独占PAIR模式的基本概念和工作机制。 接下来,深入探讨了ZeroMQ在进程内的独占模式实现,特别是通过PAIR套接字模式,展示了如何在同一进程的不同线程之间建立高效、安全的双向通信通道。ZeroMQ的inproc传输方式为线程间的消息传递提供了一个高效、线程安全的解决方案,避免了传统线程同步机制的复杂性。 在实际应用方面,列举了多个场景,如金融交易系统中的交易撮合、物流系统中的任务分配、在线购物平台的订单处理、社交媒体平台的消息传递,以及微服务架构中的分布式锁管理。这些场景中的每一个都强调了独占PAIR模式的重要性,特别是在保证数据一致性和系统稳定性方面。 随后,提供了一个基于ZeroMQ的金融交易撮合系统的示例,通过C语言和C++语言的实现展示了如何利用ZeroMQ的消息队列和独占模式确保交易撮合的准确性和独占性。该示例不仅展示了基本的撮合操作,还引入了并发撮合、持久化订单数据和复杂的撮合逻辑(如部分成交和优先级撮合)等高级功能,进一步提高了系统的可扩展性和可靠性。 最后,总结了ZeroMQ独占模式的价值,特别是在高并发环境中的应用。通过限制资源的并发访问,ZeroMQ独占模式有效地避免了并发编程中的常见问题,如数据竞争和死锁,保证了系统在极端情况下的稳定性和一致性。ZeroMQ提供了多种套接字模式和线程同步机制,支持独占模式的实现,使得它在复杂的分布式系统中具有广泛的应用前景。 本节有力地展示了ZeroMQ在并发编程中的强大功能,并为如何在实际项目中应用独占PAIR模式提供了详细的指导。 基础介绍 Exclusive Pair Pattern,中文翻译为“独占PAIR模式”,是一种设计模式或技术,用于在并发编程或资源管理中确保两个资源、线程或实体在同一时间段内只能由唯一的一对(PAIR)对象使用或操作。这种模式通常用于避免资源竞争、死锁或不一致的数据状态。 关键概念: 独占性(Exclusivity): 在独占PAIR模式中,一个资源或对象在某个时刻只能被唯一的一对对象(PAIR)访问或操作,防止其他对象同时访问。 PAIR: 这里的PAIR指的是成对的两个对象或线程,它们一起对某个资源或任务进行操作,必须协同工作。在给定时间内,只有这一对对象可以对资源进行操作。 并发安全: 该模式旨在确保系统在并发情况下的安全性和稳定性,防止资源争用或数据不一致。 示例:使用Mutex实现独占PAIR模式 这个例子模拟了两个线程(Thread A和Thread B)成对地访问和操作共享资源。 #include #include #include // 共享资源int shared_resource = 0; // 互斥锁,用于保护共享资源pthread_mutex_t resource_mutex; // 条件变量,用于实现PAIR的协调pthread_cond_t pair_condition;int pair_ready = 0; // 操作共享资源的函数void* thread_function(void* arg) { int thread_id = *(int*)arg; // 加锁,确保独占访问资源 pthread_mutex_lock(&resource_mutex); // 等待另一个线程准备好(模拟PAIR模式) while (pair_ready == 0) { printf("Thread %d is waiting for its pair...\n", thread_id); pthread_cond_wait(&pair_condition, &resource_mutex); } // 执行资源操作 printf("Thread %d is working with its pair...\n", thread_id); shared_resource += thread_id; // 模拟操作 printf("Thread %d updated shared resource to %d\n", thread_id, shared_resource); // 解除PAIR状态 pair_ready = 0; pthread_cond_signal(&pair_condition); // 解锁 pthread_mutex_unlock(&resource_mutex); return NULL;} int main() { pthread_t thread_a, thread_b; int thread_id_a = 1; int thread_id_b = 2; // 初始化互斥锁和条件变量 pthread_mutex_init(&resource_mutex, NULL); pthread_cond_init(&pair_condition, NULL); // 创建线程 pthread_create(&thread_a, NULL, thread_function, &thread_id_a); pthread_create(&thread_b, NULL, thread_function, &thread_id_b); // 模拟PAIR的准备状态 pthread_mutex_lock(&resource_mutex); pair_ready = 1; pthread_cond_broadcast(&pair_condition); pthread_mutex_unlock(&resource_mutex); // 等待线程完成 pthread_join(thread_a, NULL); pthread_join(thread_b, NULL); // 销毁互斥锁和条件变量 pthread_mutex_destroy(&resource_mutex); pthread_cond_destroy(&pair_condition); return 0;} 代码解析 1.共享资源(shared_resource):这是线程操作的资源,在该例子中,它只是一个简单的整数。 2.互斥锁(pthread_mutex_t):用于保护共享资源,确保只有一对线程能够同时访问和操作资源。 3.条件变量(pthread_cond_t):用于线程之间的协调,确保在某个线程准备好之前,另一个线程必须等待。这模拟了PAIR之间的协同工作。 4.线程函数(thread_function): 线程首先锁定互斥锁,确保对共享资源的独占访问。 线程会检查PAIR是否准备好,如果没有准备好,它将等待(pthread_cond_wait),直到PAIR准备完毕。 一旦PAIR准备好,线程对共享资源进行操作。 操作完成后,解除PAIR状态,允许下一个PAIR操作资源。 5.主线程: 主线程创建了两个子线程(Thread A和Thread B)。 主线程通过设置pair_ready为1并使用pthread_cond_broadcast,模拟了PAIR的准备状态。 主线程等待子线程完成操作,然后清理互斥锁和条件变量。 ZeroMq 进程内独占 #include // 包含 ZeroMQ 库头文件,用于消息队列通信#include // 包含 Windows API 头文件,用于线程管理#include // 包含标准输入输出库头文件#include // 包含字符串操作库头文件#include // 包含 locale.h 头文件,用于设置区域语言环境 // 线程 1 的函数DWORD WINAPI thread1(LPVOID context) { void* socket = zmq_socket(context, ZMQ_PAIR); // 创建一个 PAIR 套接字,用于双向通信 zmq_bind(socket, "inproc://pair1"); // 绑定套接字到 "inproc://pair1" 地址 char buffer[256]; // 用于接收消息的缓冲区 while (1) { memset(buffer, 0, sizeof(buffer)); // 清理缓冲区 zmq_recv(socket, buffer, 256, 0); // 接收来自线程 2 的消息 printf("Thread 1 received: %s\n", buffer); // 打印接收到的消息 Sleep(1000); // 等待 1 秒,Windows 下使用 Sleep 函数,单位是毫秒 const char* msg = "Message from Thread 1"; zmq_send(socket, msg, strlen(msg) + 1, 0); // 发送消息给线程 2,+1 包括结束符 '\0' } zmq_close(socket); // 关闭套接字(这段代码通常不会被执行到,因为 while 循环是无限的) return 0;} // 线程 2 的函数DWORD WINAPI thread2(LPVOID context) { void* socket = zmq_socket(context, ZMQ_PAIR); // 创建一个 PAIR 套接字,用于双向通信 zmq_connect(socket, "inproc://pair1"); // 连接到 "inproc://pair1" 地址,链接线程 1 的套接字 char buffer[256]; // 用于接收消息的缓冲区 while (1) { const char* msg = "Message from Thread 2"; zmq_send(socket, msg, strlen(msg) + 1, 0); // 发送消息给线程 1,+1 包括结束符 '\0' memset(buffer, 0, sizeof(buffer)); // 清理缓冲区 zmq_recv(socket, buffer, 256, 0); // 接收来自线程 1 的消息 printf("Thread 2 received: %s\n", buffer); // 打印接收到的消息 Sleep(1000); // 等待 1 秒,Windows 下使用 Sleep 函数,单位是毫秒 } zmq_close(socket); // 关闭套接字(这段代码通常不会被执行到,因为 while 循环是无限的) return 0;} int main() { setlocale(LC_ALL, ""); // 设置区域语言环境 void* context = zmq_ctx_new(); // 创建一个新的 ZeroMQ 上下文,用于管理套接字和它们之间的通信 HANDLE t1, t2; // 定义两个线程句柄变量 t1 = CreateThread(NULL, 0, thread1, context, 0, NULL); // 创建线程 1,并启动执行 thread1 函数 t2 = CreateThread(NULL, 0, thread2, context, 0, NULL); // 创建线程 2,并启动执行 thread2 函数 WaitForSingleObject(t1, INFINITE); // 等待线程 1 结束(在这个例子中,线程实际上不会结束) WaitForSingleObject(t2, INFINITE); // 等待线程 2 结束(在这个例子中,线程实际上不会结束) zmq_ctx_destroy(context); // 销毁 ZeroMQ 上下文,清理资源 return 0;} 代码解析 1.头文件包含: zmq.h: 包含 ZeroMQ 库的函数和数据结构声明,用于跨线程或跨进程的消息队列通信。 windows.h: 包含 Windows API 函数的声明,用于线程管理、睡眠等操作。 stdio.h: 提供标准输入输出功能,如 printf。 string.h: 提供字符串操作功能,如 strlen 和 memcpy。 unistd.h: 用于包含 POSIX 标准库函数(如 sleep),但是在 Windows 下可以替换为 Sleep 函数。 2.线程 1 和 线程 2 函数: 两个线程分别创建一个 PAIR 套接字,线程 1 绑定到 inproc://pair1 地址,线程 2 连接到 inproc://pair1。 PAIR 套接字允许线程 1 和线程 2 之间进行双向通信。 在无限循环中,两个线程轮流发送和接收消息,并将接收到的消息打印到控制台。 3.主程序: 创建一个 ZeroMQ 上下文,用于管理套接字和它们之间的通信。 使用 Windows API CreateThread 创建两个线程,每个线程分别执行 thread1 和 thread2 函数。 使用 WaitForSingleObject 函数等待两个线程的完成。在本例中,由于线程处于无限循环中,程序实际上会一直运行。 inproc://pair1 的解释 inproc://pair1 是什么: inproc 是 ZeroMQ 的一种传输方式,用于在同一进程内的线程之间通信。inproc://pair1 是一个地址标识符,pair1 是这个地址的标记(可以任意命名)。 在 zmq_bind 函数中,线程 1 的 PAIR 套接字绑定到这个地址,使它成为服务器端。zmq_connect 函数则使线程 2 的 PAIR 套接字连接到这个地址,成为客户端。 能干什么: inproc://pair1 允许两个线程在同一个进程内通过 ZeroMQ 实现快速、高效的消息通信。这种通信是线程安全的,并且比使用其他跨进程传输协议(如 TCP 或 IPC)更加高效。 使用 inproc 可以避免线程间使用共享内存或复杂的同步机制(如信号量、互斥锁等)进行通信。 在本例中,inproc://pair1用于在线程 1 和线程 2 之间建立可靠的双向通信,利用 PAIR 套接字确保每条消息都被正确接收和发送。这使得线程可以方便地互相通信,而不需要管理复杂的同步和资源共享问题。 C++ 循环发消息 #include #include #include #include void thread_function(int id, zmq::context_t& context) { zmq::socket_t socket(context, ZMQ_PAIR); // 每个线程绑定到唯一的地址 std::string address = "inproc://pair" + std::to_string(id); socket.bind(address); // 等待所有线程连接后开始通信 std::this_thread::sleep_for(std::chrono::seconds(1)); for (int i = 0; i < 10; ++i) { // 接收来自其他线程的消息 zmq::message_t request; socket.recv(request, zmq::recv_flags::none); std::string recv_msg(static_cast<char*>(request.data()), request.size()); std::cout << "Thread " << id << " received: " << recv_msg << std::endl; // 发送消息给下一个线程 std::string next_address = "inproc://pair" + std::to_string((id + 1) % 10); zmq::socket_t next_socket(context, ZMQ_PAIR); next_socket.connect(next_address); std::string msg = "Hello from thread " + std::to_string(id); zmq::message_t reply(msg.size()); memcpy(reply.data(), msg.c_str(), msg.size()); next_socket.send(reply, zmq::send_flags::none); }} int main() { zmq::context_t context(1); std::vector<std::thread> threads; // 创建并启动 10 个线程 for (int i = 0; i < 10; ++i) { threads.push_back(std::thread(thread_function, i, std::ref(context))); } // 等待所有线程结束 for (auto& t : threads) { t.join(); } return 0;} 程序解释 1.头文件包含: iostream: 用于标准输入输出流操作。 thread: 用于创建和管理多线程。 vector: 用于存储和管理线程对象的动态数组。 zmq.hpp: ZeroMQ C++ API 的头文件,用于消息传递和套接字管理。 2.thread_function函数: 每个线程都创建了一个 ZMQ_PAIR 类型的套接字,并绑定到一个唯一的 inproc://pair 地址。 线程在开始通信前等待 1 秒,以确保所有线程都已经绑定到它们的地址。 每个线程循环 10 次,在每次循环中执行以下步骤:1.接收来自其他线程的消息,并将消息内容打印到控制台。2.生成一个要发送给下一个线程的消息,并将其发送到下一个线程的地址。3.下一个线程的地址通过 id 计算得出,确保消息按顺序传递到下一个线程。 3.main函数: 创建一个 zmq::context_t 对象来管理 ZeroMQ 的套接字和通信。 使用 std::thread 创建并启动 10 个线程,每个线程调用 thread_function,并传递它们的线程 ID 和上下文对象的引用。 主线程使用 join 等待所有子线程执行完毕,确保所有线程的生命周期在程序结束前都得到正确的管理。 程序能用在哪些业务上? 线程间通信:该程序展示了如何在同一进程内的多个线程之间使用 ZeroMQ 实现消息传递。类似的模式可以应用于任何需要线程间通信的场景,例如多线程计算任务的协调、负载平衡等。 消息路由:在一些业务场景中,多个线程或模块需要按照某种逻辑顺序传递消息,例如流水线处理、任务分配等。这种环状的通信模式可以确保消息在各个节点之间按照特定顺序流转。 分布式计算:尽管此示例仅在单进程内工作,但类似的模式可以扩展到跨进程甚至跨机器的场景。例如,在分布式系统中,多个节点之间的消息传递和协作可以使用类似的机制进行管理。 事件驱动系统:在事件驱动系统中,不同的事件处理模块可能需要依次处理消息。通过这种线程间的消息传递机制,可以确保事件按照指定的顺序被处理和传递。 应用场景 独占PAIR模式在以下场景中特别有用: 线程同步: 在多线程环境中,多个线程可能需要成对地对共享资源进行操作。例如,读写锁(read-write lock)可以看作是独占PAIR模式的一种应用,只有读锁和写锁之间能够成对操作共享资源。 数据库事务: 当两个数据库连接(PAIR)需要并行工作以完成某个复杂的事务时,独占PAIR模式可以确保在事务完成之前,其他连接无法访问相关的数据或资源,从而保持数据的一致性。 双向通信协议: 在一些双向通信协议中,发送方和接收方作为PAIR进行操作,独占资源,如信号通道或数据包,在操作完成前,其他通信不能干涉。 设备驱动程序: 在硬件设备的驱动程序中,控制信号和数据传输通常成对进行操作,独占设备的访问权,以确保数据传输的完整性。 分布式系统: 在分布式系统中,不同节点之间可能需要成对操作某个共享的资源,独占PAIR模式确保在某对操作完成之前,其他节点无法进行冲突性的操作。 具体场景 1. 金融交易系统中的交易撮合 应用场景: 在金融交易系统中,买方和卖方的订单需要进行撮合。每一笔交易撮合都需要独占地访问交易引擎的某个部分资源,以确保交易的正确性和一致性。 具体应用: 当一个买方订单和卖方订单撮合成功时,这两个订单的状态需要同时更新为“已完成”。 在订单撮合过程中,独占PAIR模式确保只有这对买卖订单在这一时刻可以操作这个交易引擎的资源,防止其他订单干扰。 这种模式避免了在高频交易情况下,由于并发导致的订单状态不一致或重复撮合问题。 2. 物流系统中的运输任务分配 应用场景: 在物流系统中,运输任务的分配需要同时考虑配送任务和可用的运输资源(如车辆、司机)。为了避免同一辆车被分配到多个不同的运输任务,系统需要确保任务和资源的独占性匹配。 具体应用: 当一个运输任务(如从仓库A到地点B的货物运输)需要分配给某辆车时,系统会使用独占PAIR模式,将该任务与该车的资源配对。 该车辆和任务配对后,系统锁定这一对资源,防止其他任务占用这辆车。 任务完成后,车辆资源释放,再进行下一次任务分配。 这样避免了由于资源竞争导致的任务分配混乱或资源浪费。 3. 在线购物平台的订单处理 应用场景: 在在线购物平台中,多个用户可能会同时购买同一件商品。在库存有限的情况下,系统需要保证每次下单操作能独占处理对应的库存数据,防止出现超卖的情况。 具体应用: 每当一个用户发起购买请求时,系统会为用户订单和库存数据配对,确保这对资源(订单和库存)在处理过程中不被其他订单干扰。 在确认支付和扣减库存的操作中,独占PAIR模式确保其他订单不能同时操作相同的库存数据,防止多名用户同时购买超出库存量的商品。 一旦订单处理完成,库存数据的锁定状态解除,其他用户的订单可以继续操作。 这种方式有效避免了超卖问题,确保用户体验和库存管理的准确性。 4. 社交媒体平台的消息传递 应用场景: 在社交媒体平台上,用户之间的私密消息需要确保传输过程的私密性和准确性,特别是当消息可能同时发送给多名接收者时。 具体应用: 当用户A向用户B发送消息时,系统会确保这条消息和用户B的接收信箱形成独占的配对关系。 在这个过程中,系统确保用户B的信箱不会同时接受来自其他来源的同一消息,防止重复发送或接收错误。 一旦消息传输成功,配对关系解除,用户B可以接收其他新的消息。 独占PAIR模式确保了消息传递的完整性和准确性,避免了多线程环境下的消息混乱或丢失。 5. 分布式锁在微服务中的使用 应用场景:在微服务架构中,多个服务实例可能同时尝试操作同一资源,如更新数据库中的某一条记录。为了防止并发更新导致的数据不一致问题,分布式锁通常被用来控制资源的独占访问。 金融交易系统中的交易撮合 ZEROMQ C语言 这个示例将展示如何使用ZeroMQ消息队列进行买卖订单的撮合,同时确保交易撮合的独占性。 这个示例包含两个主要部分: 订单生成器:生成买卖订单,并通过ZeroMQ推送到交易撮合引擎。 交易撮合引擎:接收订单并进行撮合处理,确保每一对订单的独占性。 #include #include #include #include #include #include // 定义订单结构体,包含订单ID、买卖方向、数量和价格typedef struct { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price;} Order; // 定义互斥锁,用于确保撮合操作的独占性HANDLE match_mutex; // 订单撮合函数,用于撮合买单和卖单void match_orders(Order* buy_order, Order* sell_order) { // 检查买单和卖单是否匹配 if (buy_order->price >= sell_order->price && buy_order->quantity == sell_order->quantity) { // 如果匹配成功,打印匹配信息 printf("Matched Order ID: %d (Buy) with Order ID: %d (Sell) - Quantity: %d at Price: %.2f\n", buy_order->order_id, sell_order->order_id, buy_order->quantity, buy_order->price); } else { // 如果不匹配,打印未匹配的信息 printf("Order ID: %d (Buy) and Order ID: %d (Sell) not matched.\n", buy_order->order_id, sell_order->order_id); }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎DWORD WINAPI order_generator(LPVOID context) { // 创建ZeroMQ PUSH套接字用于发送订单 void* socket = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket, "tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字 int order_id = 1; srand((unsigned int)time(NULL)); // 初始化随机数种子 while (1) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = (float)(rand() % 1000) / 10.0; // 随机生成价格(0到99.9之间) // 发送订单到撮合引擎 zmq_send(socket, &order, sizeof(Order), 0); printf("Generated Order ID: %d, Side: %c, Quantity: %d, Price: %.2f\n", order.order_id, order.side, order.quantity, order.price); Sleep(100); // 模拟订单生成频率,休眠100毫秒 } zmq_close(socket); // 关闭套接字 return 0;} // 订单撮合线程函数,接收订单并进行撮合DWORD WINAPI order_matcher(LPVOID context) { // 创建ZeroMQ PULL套接字用于接收订单 void* socket = zmq_socket(context, ZMQ_PULL); zmq_bind(socket, "tcp://*:5555"); // 绑定到端口5555,等待订单到达 Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 int has_buy_order = 0; // 标记是否有待撮合的买单 while (1) { Order order; // 从ZeroMQ套接字接收订单 zmq_recv(socket, &order, sizeof(Order), 0); // 获取互斥锁,确保撮合操作的独占性 WaitForSingleObject(match_mutex, INFINITE); if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = 1; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; match_orders(&buy_order, &sell_order); has_buy_order = 0; // 重置标记,表示买单已撮合 } else { // 如果订单无法撮合,打印提示信息 printf("Order %d queued for future matching.\n", order.order_id); } // 释放互斥锁,允许其他线程进行撮合操作 ReleaseMutex(match_mutex); } zmq_close(socket); // 关闭套接字 return 0;} // 主程序入口int main() { // 创建ZeroMQ上下文 void* context = zmq_ctx_new(); // 创建互斥锁,用于保护撮合操作的独占性 match_mutex = CreateMutex(NULL, FALSE, NULL); // 定义线程句柄 HANDLE generator_thread, matcher_thread; // 创建订单生成器线程 generator_thread = CreateThread(NULL, 0, order_generator, context, 0, NULL); // 创建订单撮合线程 matcher_thread = CreateThread(NULL, 0, order_matcher, context, 0, NULL); // 等待线程执行完毕 WaitForSingleObject(generator_thread, INFINITE); WaitForSingleObject(matcher_thread, INFINITE); // 关闭线程句柄 CloseHandle(generator_thread); CloseHandle(matcher_thread); // 关闭互斥锁 CloseHandle(match_mutex); // 销毁ZeroMQ上下文 zmq_ctx_destroy(context); return 0;} 代码解析 1.订单生成器(order_generator): 使用ZeroMQ PUSH套接字将生成的订单推送到交易撮合引擎。 每个订单都包含一个唯一的订单ID、买卖方向、数量和价格。 每隔一定时间生成一个新的订单,模拟真实的交易环境。 2.交易撮合引擎(order_matcher): 使用ZeroMQ PULL套接字接收订单。 通过互斥锁(pthread_mutex_t)实现独占PAIR模式,确保每次撮合操作的并发安全性。 如果当前有买单并收到卖单,尝试撮合。如果买单价格大于或等于卖单价格且数量相等,则匹配成功。 匹配后,释放互斥锁,允许下一个订单对继续撮合。 3.主程序: 创建并启动订单生成器和交易撮合引擎线程。 主线程等待子线程完成。 扩展与优化 并发撮合:可以使用多个撮合引擎线程,以处理更高并发的订单流。 持久化订单数据:在实际系统中,订单信息和撮合结果通常需要持久化到数据库。 复杂撮合逻辑:实际的交易撮合可能包括部分成交、不同优先级的订单撮合等,可以根据需求进一步扩展。 在现有的基础上,可以进行以下扩展与优化,以实现更高效、更复杂的交易撮合系统。我们将: 并发撮合:增加多个撮合引擎线程,以处理更高并发的订单流。 持久化订单数据:模拟将订单信息和撮合结果持久化到数据库。 复杂撮合逻辑:引入部分成交和不同优先级的订单撮合逻辑。 #include #include #include #include #include #include // 定义订单结构体,包含订单ID、买卖方向、数量、价格和优先级typedef struct { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price; int priority; // 优先级(数值越高,优先级越高)} Order; // 定义互斥锁,用于确保撮合操作的独占性HANDLE match_mutex; // 订单撮合函数,用于撮合买单和卖单,支持部分成交和优先级撮合void match_orders(Order* buy_order, Order* sell_order) { // 检查买单和卖单是否匹配 if (buy_order->price >= sell_order->price) { int matched_quantity = buy_order->quantity < sell_order->quantity ? buy_order->quantity : sell_order->quantity; printf("Matched Order ID: %d (Buy) with Order ID: %d (Sell) - Quantity: %d at Price: %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price); // 模拟持久化撮合结果 printf("Persisting Match: Buy Order %d, Sell Order %d, Quantity %d, Price %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price); // 更新剩余数量 buy_order->quantity -= matched_quantity; sell_order->quantity -= matched_quantity; // 检查是否有剩余订单需要处理 if (buy_order->quantity > 0) { printf("Remaining Buy Order ID: %d, Quantity: %d\n", buy_order->order_id, buy_order->quantity); } if (sell_order->quantity > 0) { printf("Remaining Sell Order ID: %d, Quantity: %d\n", sell_order->order_id, sell_order->quantity); } } else { printf("Order ID: %d (Buy) and Order ID: %d (Sell) not matched due to price.\n", buy_order->order_id, sell_order->order_id); }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎DWORD WINAPI order_generator(LPVOID context) { // 创建ZeroMQ PUSH套接字用于发送订单 void* socket = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket, "tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字 int order_id = 1; srand((unsigned int)time(NULL)); // 初始化随机数种子 while (1) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = (float)(rand() % 1000) / 10.0; // 随机生成价格(0到99.9之间) order.priority = rand() % 10 + 1; // 随机生成优先级(1到10之间) // 发送订单到撮合引擎 zmq_send(socket, &order, sizeof(Order), 0); printf("Generated Order ID: %d, Side: %c, Quantity: %d, Price: %.2f, Priority: %d\n", order.order_id, order.side, order.quantity, order.price, order.priority); Sleep(100); // 模拟订单生成频率,休眠100毫秒 } zmq_close(socket); // 关闭套接字 return 0;} // 订单撮合线程函数,接收订单并进行撮合DWORD WINAPI order_matcher(LPVOID context) { // 创建ZeroMQ PULL套接字用于接收订单 void* socket = zmq_socket(context, ZMQ_PULL); zmq_bind(socket, "tcp://*:5555"); // 绑定到端口5555,等待订单到达 Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 int has_buy_order = 0; // 标记是否有待撮合的买单 while (1) { Order order; // 从ZeroMQ套接字接收订单 zmq_recv(socket, &order, sizeof(Order), 0); // 获取互斥锁,确保撮合操作的独占性 WaitForSingleObject(match_mutex, INFINITE); if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = 1; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; // 进行优先级比较,如果新订单优先级较高,则放弃当前订单,保存新订单 if (sell_order.priority > buy_order.priority) { printf("New Sell Order ID: %d has higher priority than Buy Order ID: %d\n", sell_order.order_id, buy_order.order_id); has_buy_order = 0; // 放弃当前订单,等待新撮合 } else { match_orders(&buy_order, &sell_order); has_buy_order = (buy_order.quantity > 0) ? 1 : 0; // 检查买单是否还有剩余 } } else { printf("Order %d queued for future matching.\n", order.order_id); } // 释放互斥锁,允许其他线程进行撮合操作 ReleaseMutex(match_mutex); } zmq_close(socket); // 关闭套接字 return 0;} int main() { // 创建ZeroMQ上下文 void* context = zmq_ctx_new(); // 创建互斥锁,用于保护撮合操作的独占性 match_mutex = CreateMutex(NULL, FALSE, NULL); // 定义线程句柄 HANDLE generator_thread, matcher_thread[4]; // 创建订单生成器线程 generator_thread = CreateThread(NULL, 0, order_generator, context, 0, NULL); // 创建多个订单撮合线程,用于并发处理 for (int i = 0; i < 4; i++) { matcher_thread[i] = CreateThread(NULL, 0, order_matcher, context, 0, NULL); } // 等待订单生成器线程执行完毕 WaitForSingleObject(generator_thread, INFINITE); // 等待所有撮合线程执行完毕 for (int i = 0; i < 4; i++) { WaitForSingleObject(matcher_thread[i], INFINITE); } // 关闭线程句柄 CloseHandle(generator_thread); for (int i = 0; i < 4; i++) { CloseHandle(matcher_thread[i]); } // 关闭互斥锁 CloseHandle(match_mutex); // 销毁ZeroMQ上下文 zmq_ctx_destroy(context); return 0;} 扩展与优化解释 1.并发撮合: 增加了4个并发的撮合线程,以提高系统的吞吐量和处理能力。 每个撮合线程独立处理接收到的订单,并通过互斥锁保证撮合操作的独占性。 2.持久化订单数据: 在订单撮合完成后,系统会模拟将撮合结果“持久化”到数据库。这一部分目前通过打印日志来模拟。 实际系统中,可以将这些数据写入数据库(例如使用SQLite、MySQL等)。 3.复杂撮合逻辑: 引入了部分成交:如果买单和卖单的数量不一致,系统会撮合其中较小的部分,并保留未撮合的部分。 引入了优先级撮合:每个订单都有一个优先级(1到10之间)。当有新订单到达时,,如果其优先级高于当前待撮合订单,会优先处理高优先级订单。 #include #include #include #include #include #include // 定义订单结构体,包含订单ID、买卖方向、数量、价格和优先级typedef struct { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price; int priority; // 优先级(数值越高,优先级越高)} Order; // 定义互斥锁,用于确保撮合操作的独占性HANDLE match_mutex; // 订单撮合函数,用于撮合买单和卖单,支持部分成交和优先级撮合void match_orders(Order* buy_order, Order* sell_order) { // 检查买单和卖单是否匹配 if (buy_order->price >= sell_order->price) { int matched_quantity = buy_order->quantity < sell_order->quantity ? buy_order->quantity : sell_order->quantity; printf("Matched Order ID: %d (Buy) with Order ID: %d (Sell) - Quantity: %d at Price: %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price); // 模拟持久化撮合结果 printf("Persisting Match: Buy Order %d, Sell Order %d, Quantity %d, Price %.2f\n", buy_order->order_id, sell_order->order_id, matched_quantity, buy_order->price); // 更新剩余数量 buy_order->quantity -= matched_quantity; sell_order->quantity -= matched_quantity; // 检查是否有剩余订单需要处理 if (buy_order->quantity > 0) { printf("Remaining Buy Order ID: %d, Quantity: %d\n", buy_order->order_id, buy_order->quantity); } if (sell_order->quantity > 0) { printf("Remaining Sell Order ID: %d, Quantity: %d\n", sell_order->order_id, sell_order->quantity); } } else { printf("Order ID: %d (Buy) and Order ID: %d (Sell) not matched due to price.\n", buy_order->order_id, sell_order->order_id); }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎DWORD WINAPI order_generator(LPVOID context) { // 创建ZeroMQ PUSH套接字用于发送订单 void* socket = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket, "tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字 int order_id = 1; srand((unsigned int)time(NULL)); // 初始化随机数种子 while (1) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = (float)(rand() % 1000) / 10.0; // 随机生成价格(0到99.9之间) order.priority = rand() % 10 + 1; // 随机生成优先级(1到10之间) // 发送订单到撮合引擎 zmq_send(socket, &order, sizeof(Order), 0); printf("Generated Order ID: %d, Side: %c, Quantity: %d, Price: %.2f, Priority: %d\n", order.order_id, order.side, order.quantity, order.price, order.priority); Sleep(100); // 模拟订单生成频率,休眠100毫秒 } zmq_close(socket); // 关闭套接字 return 0;} // 订单撮合线程函数,接收订单并进行撮合DWORD WINAPI order_matcher(LPVOID context) { // 创建ZeroMQ PULL套接字用于接收订单 void* socket = zmq_socket(context, ZMQ_PULL); zmq_bind(socket, "tcp://*:5555"); // 绑定到端口5555,等待订单到达 Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 int has_buy_order = 0; // 标记是否有待撮合的买单 while (1) { Order order; // 从ZeroMQ套接字接收订单 zmq_recv(socket, &order, sizeof(Order), 0); // 获取互斥锁,确保撮合操作的独占性 WaitForSingleObject(match_mutex, INFINITE); if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = 1; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; // 进行优先级比较,如果新订单优先级较高,则放弃当前订单,保存新订单 if (sell_order.priority > buy_order.priority) { printf("New Sell Order ID: %d has higher priority than Buy Order ID: %d\n", sell_order.order_id, buy_order.order_id); has_buy_order = 0; // 放弃当前订单,等待新撮合 } else { match_orders(&buy_order, &sell_order); has_buy_order = (buy_order.quantity > 0) ? 1 : 0; // 检查买单是否还有剩余 } } else { printf("Order %d queued for future matching.\n", order.order_id); } // 释放互斥锁,允许其他线程进行撮合操作 ReleaseMutex(match_mutex); } zmq_close(socket); // 关闭套接字 return 0;} int main() { // 创建ZeroMQ上下文 void* context = zmq_ctx_new(); // 创建互斥锁,用于保护撮合操作的独占性 match_mutex = CreateMutex(NULL, FALSE, NULL); // 定义线程句柄 HANDLE generator_thread, matcher_thread[4]; // 创建订单生成器线程 generator_thread = CreateThread(NULL, 0, order_generator, context, 0, NULL); // 创建多个订单撮合线程,用于并发处理 for (int i = 0; i < 4; i++) { matcher_thread[i] = CreateThread(NULL, 0, order_matcher, context, 0, NULL); } // 等待订单生成器线程执行完毕 WaitForSingleObject(generator_thread, INFINITE); // 等待所有撮合线程执行完毕 for (int i = 0; i < 4; i++) { WaitForSingleObject(matcher_thread[i], INFINITE); } // 关闭线程句柄 CloseHandle(generator_thread); for (int i = 0; i < 4; i++) { CloseHandle(matcher_thread[i]); } // 关闭互斥锁 CloseHandle(match_mutex); // 销毁ZeroMQ上下文 zmq_ctx_destroy(context); return 0;} ZEROMQ C++语言 #include #include #include #include #include #include #include #include #include // 定义订单结构体,包含订单ID、买卖方向、数量、价格和优先级struct Order { int order_id; char side; // 'B' 代表买单(Buy),'S' 代表卖单(Sell) int quantity; float price; int priority; // 优先级(数值越高,优先级越高)}; // 全局互斥锁,用于确保撮合操作的独占性std::mutex match_mutex; // 订单撮合函数,用于撮合买单和卖单,支持部分成交和优先级撮合void match_orders(Order& buy_order, Order& sell_order) { // 检查买单和卖单是否匹配 if (buy_order.price >= sell_order.price) { int matched_quantity = std::min(buy_order.quantity, sell_order.quantity); std::cout << "Matched Order ID: " << buy_order.order_id << " (Buy) with Order ID: " << sell_order.order_id << " (Sell) - Quantity: " << matched_quantity << " at Price: " << buy_order.price << std::endl; // 模拟持久化撮合结果 std::cout << "Persisting Match: Buy Order " << buy_order.order_id << ", Sell Order " << sell_order.order_id << ", Quantity " << matched_quantity << ", Price " << buy_order.price << std::endl; // 更新剩余数量 buy_order.quantity -= matched_quantity; sell_order.quantity -= matched_quantity; // 检查是否有剩余订单需要处理 if (buy_order.quantity > 0) { std::cout << "Remaining Buy Order ID: " << buy_order.order_id << ", Quantity: " << buy_order.quantity << std::endl; } if (sell_order.quantity > 0) { std::cout << "Remaining Sell Order ID: " << sell_order.order_id << ", Quantity: " << sell_order.quantity << std::endl; } } else { std::cout << "Order ID: " << buy_order.order_id << " (Buy) and Order ID: " << sell_order.order_id << " (Sell) not matched due to price." << std::endl; }} // 订单生成器线程函数,模拟订单的生成并发送到撮合引擎void order_generator(zmq::context_t* context) { try { // 创建ZeroMQ PUSH套接字用于发送订单 zmq::socket_t socket(*context, zmq::socket_type::push); socket.connect("tcp://localhost:5555"); // 连接到撮合引擎的PULL套接字 int order_id = 1; srand(static_cast<unsigned int>(time(NULL))); // 初始化随机数种子 while (true) { // 生成随机订单 Order order; order.order_id = order_id++; order.side = (order_id % 2 == 0) ? 'B' : 'S'; // 偶数ID为买单,奇数ID为卖单 order.quantity = rand() % 100 + 1; // 随机生成数量(1到100之间) order.price = static_cast<float>(rand() % 1000) / 10.0f; // 随机生成价格(0到99.9之间) order.priority = rand() % 10 + 1; // 随机生成优先级(1到10之间) // 发送订单到撮合引擎 socket.send(zmq::buffer(&order, sizeof(Order)), zmq::send_flags::none); std::cout << "Generated Order ID: " << order.order_id << ", Side: " << order.side << ", Quantity: " << order.quantity << ", Price: " << order.price << ", Priority: " << order.priority << std::endl; Sleep(100); // 模拟订单生成频率,休眠100毫秒 } } catch (const zmq::error_t& e) { std::cerr << "ZMQ Error in order_generator: " << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << "Exception in order_generator: " << e.what() << std::endl; } catch (...) { std::cerr << "Unknown error in order_generator" << std::endl; }} // 订单撮合线程函数,接收订单并进行撮合void order_matcher(zmq::context_t* context) { try { // 创建ZeroMQ PULL套接字用于接收订单 zmq::socket_t socket(*context, zmq::socket_type::pull); socket.bind("tcp://*:5555"); // 绑定到端口5555,等待订单到达 Order buy_order, sell_order; // 用于保存待撮合的买单和卖单 bool has_buy_order = false; // 标记是否有待撮合的买单 while (true) { Order order; // 从ZeroMQ套接字接收订单 socket.recv(zmq::buffer(&order, sizeof(Order)), zmq::recv_flags::none); // 锁定互斥锁,确保撮合操作的独占性 std::lock_guard<std::mutex> lock(match_mutex); if (order.side == 'B' && !has_buy_order) { // 如果收到的是买单且当前没有待撮合的买单,将其保存 buy_order = order; has_buy_order = true; // 设置标记,表示有待撮合的买单 } else if (order.side == 'S' && has_buy_order) { // 如果收到的是卖单且已有待撮合的买单,尝试撮合 sell_order = order; // 进行优先级比较,如果新订单优先级较高,则放弃当前订单,保存新订单 if (sell_order.priority > buy_order.priority) { std::cout << "New Sell Order ID: " << sell_order.order_id << " has higher priority than Buy Order ID: " << buy_order.order_id << std::endl; has_buy_order = false; // 放弃当前订单,等待新撮合 } else { match_orders(buy_order, sell_order); has_buy_order = (buy_order.quantity > 0); // 检查买单是否还有剩余 } } else { std::cout << "Order " << order.order_id << " queued for future matching." << std::endl; } } } catch (const zmq::error_t& e) { std::cerr << "ZMQ Error in order_matcher: " << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << "Exception in order_matcher: " << e.what() << std::endl; } catch (...) { std::cerr << "Unknown error in order_matcher" << std::endl; }} int main() { try { // 创建ZeroMQ上下文 zmq::context_t context(1); // 定义线程容器 std::vector<std::thread> matcher_threads; // 创建订单生成器线程 std::thread generator_thread(order_generator, &context); // 创建多个订单撮合线程,用于并发处理 for (int i = 0; i < 4; ++i) { matcher_threads.emplace_back(order_matcher, &context); } // 等待订单生成器线程执行完毕 generator_thread.join(); // 等待所有撮合线程执行完毕 for (auto& thread : matcher_threads) { thread.join(); } } catch (const std::exception& e) { std::cerr << "Exception in main: " << e.what() << std::endl; } catch (...) { std::cerr << "Unknown error in main" << std::endl; } return 0;} 主要改动与解释 1.使用C++特性: #include代替了 #include ,使用 std::cout 和 std::endl 进行输出。 使用了 std::thread 来代替 Windows 的 CreateThread。 使用了 std::vector容器来管理多个撮合线程,避免手动管理线程句柄。 2.ZeroMQ C++ API: C++代码使用了 zmq.hpp,它是ZeroMQ的C++绑定库,比纯C语言API更具可读性和便利性。 zmq::context_t 和 zmq::socket_t 代替了C语言中的 zmq_ctx_new() 和 zmq_socket()。 3.线程管理: 使用 std::thread 来创建并管理线程,thread.join() 替代了 WaitForSingleObject。 std::vector 容器被用来存储多个撮合线程,这样代码更加整洁和易于扩展。 4.使用标准库: 使用 std::min 来替代手动的 if-else 判断更简洁地获取最小值。 小结:ZeroMQ的独占模式(Exclusive Pair Pattern)是一种设计模式,旨在确保两个通信对端(PAIR)在同一时间段内独占地访问共享资源,防止其他对端干扰。它通过限制并发访问,确保资源的安全性和一致性。在高并发场景中,独占模式有助于避免数据竞争、死锁和不一致性问题。ZeroMQ通过多种套接字模式(如PAIR、REQ/REP)和线程同步机制,支持独占模式的实现,保证消息在严格的单对单通信通道中传递,从而提高系统的稳定性和可靠性。

    11-22 90浏览
  • HAL_Delay(1000)是延时1秒吗

    我们在使用STM32F系列芯片编写程序中,经常会使用HAL库开发,它把所有我们关心的硬件都封装成了API,调调API就可以,使用方便。 比如HAL_Delay,经常用来延时一段时间。 HAL_Delay有几个注意事项,不知道你平时开发中有没有注意到。 HAL_Delay(1000)是延时1秒吗 假设我们配置的tick为1ms,那么HAL_Delay(1000)是延时1000ms吗? 其实不是延时1000ms。 实际延时是1001ms。 我们看下代码 typedef enum{ HAL_TICK_FREQ_10HZ = 100U, HAL_TICK_FREQ_100HZ = 10U, HAL_TICK_FREQ_1KHZ = 1U, HAL_TICK_FREQ_DEFAULT = HAL_TICK_FREQ_1KHZ} HAL_TickFreqTypeDef; HAL_TickFreqTypeDef uwTickFreq = HAL_TICK_FREQ_DEFAULT; /* 1KHz */ #define HAL_MAX_DELAY 0xFFFFFFFFU __weak void HAL_Delay(uint32_t Delay){ uint32_t tickstart = HAL_GetTick(); uint32_t wait = Delay; /* Add a freq to guarantee minimum wait */ if (wait < HAL_MAX_DELAY) { wait += (uint32_t)(uwTickFreq); } while ((HAL_GetTick() - tickstart) < wait) { }} 注意第21行,wait =Delay+uwTickFreq 这个条件绝大多数情况下是成立的,也就是这里多加了一个1。 因此我们传入1000,实际延时是1001。 HAL_GetTick() - tickstart) < wait会溢出吗 不会 它执行的是差模运算 假设 tickstart 是 0xFFFFFF00,然后系统继续运行了一段时间,HAL_GetTick() 达到 0x00000050,此时它发生了溢出,变为 0x00000050。 此时差值应该是: 0xFFFFFFFF-0xFFFFFF00+0x00000050+1=0x150 差模运算也是0x150: 0x00000050-0xFFFFFF00=0x150 可以在中断中调用HAL_GetTick吗 HAL_GetTick()是一种阻塞式延时,不建议在其他中断函数中使用。 中断处理一般都是要求快进快出,不能阻塞太久,这是原因之一。 HAL_GetTick()使用的systick中断通常是优先级最低的一类中断,一般是给RTOS调度做时基使用的。 如果在其他中断服务函数中调用这个函数的话,就会导致中断函数一直在等待Delay中的死循环结束。 但是由于Delay的优先级不够,因此出现了一种高优先级任务等待低优先级任务的情况。这种现象被称作优先级翻转。这是原因之二。 如果实在需要调用,建议用其他硬件timer作为定时器的时基,并提高其中断优先级 好了,今天的笔记就分享到这,谢谢大家!

    11-22 47浏览
  • 如何进行linux内核调试

    内核开发比用户空间开发更难的一个因素就是内核调试艰难。内核错误往往会导致系统宕机,很难保留出错时的现场。调试内核的关键在于你的对内核的深刻理解。 一  调试前的准备 在调试一个bug之前,我们所要做的准备工作有: 有一个被确认的bug。 包含这个bug的内核版本号,需要分析出这个bug在哪一个版本被引入,这个对于解决问题有极大的帮助。可以采用二分查找法来逐步锁定bug引入版本号。 对内核代码理解越深刻越好,同时还需要一点点运气。 该bug可以复现。如果能够找到复现规律,那么离找到问题的原因就不远了。 最小化系统。把可能产生bug的因素逐一排除掉。 二  内核中的bug 内核中的bug也是多种多样的。它们的产生有无数的原因,同时表象也变化多端。从隐藏在源代码中的错误到展现在目击者面前的bug,其发作往往是一系列连锁反应的事件才可能出发的。虽然内核调试有一定的困难,但是通过你的努力和理解,说不定你会喜欢上这样的挑战。 三  内核调试配置选项 学习编写驱动程序要构建安装自己的内核(标准主线内核)。最重要的原因之一是:内核开发者已经建立了多项用于调试的功能。但是由于这些功能会造成额外的输出,并导致能下降,因此发行版厂商通常会禁止发行版内核中的调试功能。 1  内核配置 为了实现内核调试,在内核配置上增加了几项: Kernel hacking ---> [*] Magic SysRq key [*] Kernel debugging [*] Debug slab memory allocations [*] Spinlock and rw-lock debugging: basic checks [*] Spinlock debugging: sleep-inside-spinlock checking [*] Compile the kernel with debug info Device Drivers ---> Generic Driver Options ---> [*] Driver Core verbose debug messages General setup ---> [*] Configure standard kernel features (for small systems) ---> [*] Load all symbols for debugging/ksymoops 启用选项例如: slab layer debugging(slab层调试选项) high-memory debugging(高端内存调试选项) I/O mapping debugging(I/O映射调试选项) spin-lock debugging(自旋锁调试选项) stack-overflow checking(栈溢出检查选项) sleep-inside-spinlock checking(自旋锁内睡眠选项) 2  调试原子操作 从内核2.5开发,为了检查各类由原子操作引发的问题,内核提供了极佳的工具。 内核提供了一个原子操作计数器,它可以配置成,一旦在原子操作过程中,进城进入睡眠或者做了一些可能引起睡眠的操作,就打印警告信息并提供追踪线索。 所以,包括在使用锁的时候调用schedule(),正使用锁的时候以阻塞方式请求分配内存等,各种潜在的bug都能够被探测到。 下面这些选项可以最大限度地利用该特性: CONFIG_PREEMPT = y CONFIG_DEBUG_KERNEL = y CONFIG_KLLSYMS = y CONFIG_SPINLOCK_SLEEP = y 四  引发bug并打印信息 1  BUG()和BUG_ON() 一些内核调用可以用来方便标记bug,提供断言并输出信息。最常用的两个是BUG()和BUG_ON()。 定义在中: #ifndef HAVE_ARCH_BUG #define BUG() do { printk("BUG: failure at %s:%d/%s()! ", __FILE__, __LINE__, __FUNCTION__); panic("BUG!"); /* 引发更严重的错误,不但打印错误消息,而且整个系统业会挂起 */ } while (0) #endif #ifndef HAVE_ARCH_BUG_ON #define BUG_ON(condition) do { if (unlikely(condition)) BUG(); } while(0) #endif 当调用这两个宏的时候,它们会引发OOPS,导致栈的回溯和错误消息的打印。 ※ 可以把这两个调用当作断言使用,如:BUG_ON(bad_thing); 2.WARN(x) 和 WARN_ON(x) 而WARN_ON则是调用dump_stack,打印堆栈信息,不会OOPS 定义在中: #ifndef __WARN_TAINT#ifndef __ASSEMBLY__extern void warn_slowpath_fmt(const char *file, const int line, const char *fmt, ...) __attribute__((format(printf, 3, 4)));extern void warn_slowpath_fmt_taint(const char *file, const int line, unsigned taint, const char *fmt, ...) __attribute__((format(printf, 4, 5)));extern void warn_slowpath_null(const char *file, const int line);#define WANT_WARN_ON_SLOWPATH#endif#define __WARN() warn_slowpath_null(__FILE__, __LINE__)#define __WARN_printf(arg...) warn_slowpath_fmt(__FILE__, __LINE__, arg)#define __WARN_printf_taint(taint, arg...) \ warn_slowpath_fmt_taint(__FILE__, __LINE__, taint, arg)#else#define __WARN() __WARN_TAINT(TAINT_WARN)#define __WARN_printf(arg...) do { printk(arg); __WARN(); } while (0)#define __WARN_printf_taint(taint, arg...) \ do { printk(arg); __WARN_TAINT(taint); } while (0)#endif #ifndef WARN_ON#define WARN_ON(condition) ({ \ int __ret_warn_on = !!(condition); \ if (unlikely(__ret_warn_on)) \ __WARN(); \ unlikely(__ret_warn_on); \})#endif #ifndef WARN#define WARN(condition, format...) ({ \ int __ret_warn_on = !!(condition); \ if (unlikely(__ret_warn_on)) \ __WARN_printf(format); \ unlikely(__ret_warn_on); \})#endif 3 dump_stack() 有些时候,只需要在终端上打印一下栈的回溯信息来帮助你调试。这时可以使用dump_stack()。这个函数只在终端上打印寄存器上下文和函数的跟踪线索。 if (!debug_check) { printk(KERN_DEBUG “provide some information…/n”); dump_stack(); } 五  printk() 内核提供的格式化打印函数。 1  printk函数的健壮性 健壮性是printk最容易被接受的一个特质,几乎在任何地方,任何时候内核都可以调用它(中断上下文、进程上下文、持有锁时、多处理器处理时等)。 2  printk函数脆弱之处 在系统启动过程中,终端初始化之前,在某些地方是不能调用的。如果真的需要调试系统启动过程最开始的地方,有以下方法可以使用: 使用串口调试,将调试信息输出到其他终端设备。 使用early_printk(),该函数在系统启动初期就有打印能力。但它只支持部分硬件体系。 3  LOG等级 printk和printf一个主要的区别就是前者可以指定一个LOG等级。内核根据这个等级来判断是否在终端上打印消息。内核把比指定等级高的所有消息显示在终端。 可以使用下面的方式指定一个LOG级别: printk(KERN_CRIT “Hello, world!\n”); 注意,第一个参数并不一个真正的参数,因为其中没有用于分隔级别(KERN_CRIT)和格式字符的逗号(,)。KERN_CRIT本身只是一个普通的字符串(事实上,它表示的是字符串 "";表 1 列出了完整的日志级别清单)。作为预处理程序的一部分,C 会自动地使用一个名为 字符串串联 的功能将这两个字符串组合在一起。组合的结果是将日志级别和用户指定的格式字符串包含在一个字符串中。 内核使用这个指定LOG级别与当前终端LOG等级console_loglevel来决定是不是向终端打印。 下面是可使用的LOG等级: #define KERN_EMERG "" /* system is unusable */#define KERN_ALERT "" /* action must be taken immediately */ #define KERN_CRIT "" /* critical conditions */#define KERN_ERR "" /* error conditions */#define KERN_WARNING "" /* warning conditions */#define KERN_NOTICE "" /* normal but significant condition */#define KERN_INFO "" /* informational */#define KERN_DEBUG "" /* debug-level messages */#define KERN_DEFAULT "" /* Use the default kernel loglevel */ 注意,如果调用者未将日志级别提供给 printk,那么系统就会使用默认值KERN_WARNING ""(表示只有KERN_WARNING 级别以上的日志消息会被记录)。由于默认值存在变化,所以在使用时最好指定LOG级别。有LOG级别的一个好处就是我们可以选择性的输出LOG。比如平时我们只需要打印KERN_WARNING级别以上的关键性LOG,但是调试的时候,我们可以选择打印KERN_DEBUG等以上的详细LOG。而这些都不需要我们修改代码,只需要通过命令修改默认日志输出级别: mtj@ubuntu :~$ cat /proc/sys/kernel/printk4 4 1 7mtj@ubuntu :~$ cat /proc/sys/kernel/printk_delay0mtj@ubuntu :~$ cat /proc/sys/kernel/printk_ratelimit5mtj@ubuntu :~$ cat /proc/sys/kernel/printk_ratelimit_burst10 第一项定义了 printk API 当前使用的日志级别。这些日志级别表示了控制台的日志级别、默认消息日志级别、最小控制台日志级别和默认控制台日志级别。printk_delay 值表示的是 printk 消息之间的延迟毫秒数(用于提高某些场景的可读性)。注意,这里它的值为 0,而它是不可以通过 /proc 设置的。printk_ratelimit 定义了消息之间允许的最小时间间隔(当前定义为每 5 秒内的某个内核消息数)。消息数量是由 printk_ratelimit_burst 定义的(当前定义为 10)。如果您拥有一个非正式内核而又使用有带宽限制的控制台设备(如通过串口), 那么这非常有用。注意,在内核中,速度限制是由调用者控制的,而不是在printk 中实现的。如果一个 printk 用户要求进行速度限制,那么该用户就需要调用printk_ratelimit 函数。 4  记录缓冲区 内核消息都被保存在一个LOG_BUF_LEN大小的环形队列中。 关于LOG_BUF_LEN定义: #define __LOG_BUF_LEN (1 << CONFIG_LOG_BUF_SHIFT) ※ 变量CONFIG_LOG_BUF_SHIFT在内核编译时由配置文件定义,对于i386平台,其值定义如下(在linux26/arch/i386/defconfig中): CONFIG_LOG_BUF_SHIFT=18 记录缓冲区操作: ① 消息被读出到用户空间时,此消息就会从环形队列中删除。 ② 当消息缓冲区满时,如果再有printk()调用时,新消息将覆盖队列中的老消息。 ③ 在读写环形队列时,同步问题很容易得到解决。 ※ 这个纪录缓冲区之所以称为环形,是因为它的读写都是按照环形队列的方式进行操作的。 5  syslogd/klogd 在标准的Linux系统上,用户空间的守护进程klogd从纪录缓冲区中获取内核消息,再通过syslogd守护进程把这些消息保存在系统日志文件中。klogd进程既可以从/proc/kmsg文件中,也可以通过syslog()系统调用读取这些消息。默认情况下,它选择读取/proc方式实现。klogd守护进程在消息缓冲区有新的消息之前,一直处于阻塞状态。一旦有新的内核消息,klogd被唤醒,读出内核消息并进行处理。默认情况下,处理例程就是把内核消息传给syslogd守护进程。syslogd守护进程一般把接收到的消息写入/var/log/messages文件中。不过,还是可以通过/etc/syslog.conf文件来进行配置,可以选择其他的输出文件。 6  dmesg dmesg 命令也可用于打印和控制内核环缓冲区。这个命令使用 klogctl 系统调用来读取内核环缓冲区,并将它转发到标准输出(stdout)。这个命令也可以用来清除内核环缓冲区(使用 -c 选项),设置控制台日志级别(-n 选项),以及定义用于读取内核日志消息的缓冲区大小(-s 选项)。注意,如果没有指定缓冲区大小,那么 dmesg 会使用 klogctl 的SYSLOG_ACTION_SIZE_BUFFER 操作确定缓冲区大小。 7 注意 a) 虽然printk很健壮,但是看了源码你就知道,这个函数的效率很低:做字符拷贝时一次只拷贝一个字节,且去调用console输出可能还产生中断。所以如果你的驱动在功能调试完成以后做性能测试或者发布的时候千万记得尽量减少printk输出,做到仅在出错时输出少量信息。否则往console输出无用信息影响性能。 b) printk的临时缓存printk_buf只有1K,所有一次printk函数只能记录<1K的信息到log buffer,并且printk使用的“ringbuffer”. 8 内核printk和日志系统的总体结构 9  动态调试 动态调试是通过动态的开启和禁止某些内核代码来获取额外的内核信息。 首先内核选项CONFIG_DYNAMIC_DEBUG应该被设置。所有通过pr_debug()/dev_debug()打印的信息都可以动态的显示或不显示。 可以通过简单的查询语句来筛选需要显示的信息。 -源文件名 -函数名 -行号(包括指定范围的行号) -模块名 -格式化字符串 将要打印信息的格式写入/dynamic_debug/control中。 nullarbor:~ # echo 'file svcsock.c line 1603 +p' >/dynamic_debug/control 六  内存调试工具 1  MEMWATCH MEMWATCH 由 Johan Lindh 编写,是一个开放源代码 C 语言内存错误检测工具,您可以自己下载它。只要在代码中添加一个头文件并在 gcc 语句中定义了 MEMWATCH 之后,您就可以跟踪程序中的内存泄漏和错误了。MEMWATCH 支持ANSIC,它提供结果日志纪录,能检测双重释放(double-free)、错误释放(erroneous free)、没有释放的内存(unfreedmemory)、溢出和下溢等等。 内存样本(test1.c) #include #include #include "memwatch.h"int main(void){ char *ptr1; char *ptr2; ptr1 = malloc(512); ptr2 = malloc(512); ptr2 = ptr1; free(ptr2); free(ptr1);} 内存样本(test1.c)中的代码将分配两个 512 字节的内存块,然后指向第一个内存块的指针被设定为指向第二个内存块。结果,第二个内存块的地址丢失,从而产生了内存泄漏。 现在我们编译内存样本(test1.c) 的 memwatch.c。下面是一个 makefile 示例: test1 gcc -DMEMWATCH -DMW_STDIO test1.c memwatchc -o test1 当您运行 test1 程序后,它会生成一个关于泄漏的内存的报告。下面展示了示例 memwatch.log 输出文件。 test1 memwatch.log 文件 MEMWATCH 2.67 Copyright (C) 1992-1999 Johan Lindh...double-free: <4> test1.c(15), 0x80517b4 was freed from test1.c(14)...unfreed: <2> test1.c(11), 512 bytes at 0x80519e4{FE FE FE FE FE FE FE FE FE FE FE FE ..............}Memory usage statistics (global): N)umber of allocations made: 2 L)argest memory usage : 1024 T)otal of all alloc() calls: 1024 U)nfreed bytes totals : 512 MEMWATCH 为您显示真正导致问题的行。如果您释放一个已经释放过的指针,它会告诉您。对于没有释放的内存也一样。日志结尾部分显示统计信息,包括泄漏了多少内存,使用了多少内存,以及总共分配了多少内存。 2  YAMD YAMD 软件包由 Nate Eldredge 编写,可以查找 C 和 C++ 中动态的、与内存分配有关的问题。在撰写本文时,YAMD 的最新版本为 0.32。请下载 yamd-0.32.tar.gz。执行 make 命令来构建程序;然后执行 make install 命令安装程序并设置工具。 一旦您下载了 YAMD 之后,请在 test1.c 上使用它。请删除 #include memwatch.h 并对 makefile 进行如下小小的修改: 使用 YAMD 的 test1 gcc -g test1.c -o test1 展示了来自 test1 上的 YAMD 的输出,使用 YAMD 的 test1 输出 YAMD version 0.32Executable: /usr/src/test/yamd-0.32/test1...INFO: Normal allocation of this blockAddress 0x40025e00, size 512...INFO: Normal allocation of this blockAddress 0x40028e00, size 512...INFO: Normal deallocation of this blockAddress 0x40025e00, size 512...ERROR: Multiple freeing Atfree of pointer already freedAddress 0x40025e00, size 512...WARNING: Memory leakAddress 0x40028e00, size 512WARNING: Total memory leaks:1 unfreed allocations totaling 512 bytes*** Finished at Tue ... 10:07:15 2002Allocated a grand total of 1024 bytes 2 allocationsAverage of 512 bytes per allocationMax bytes allocated at one time: 102424 K alloced internally / 12 K mapped now / 8 K maxVirtual program size is 1416 KEnd. YAMD 显示我们已经释放了内存,而且存在内存泄漏。让我们在另一个样本程序上试试 YAMD。 内存代码(test2.c) #include #include int main(void){ char *ptr1; char *ptr2; char *chptr; int i = 1; ptr1 = malloc(512); ptr2 = malloc(512); chptr = (char *)malloc(512); for (i; i <= 512; i++) { chptr[i] = 'S'; } ptr2 = ptr1; free(ptr2); free(ptr1); free(chptr);} 您可以使用下面的命令来启动 YAMD: ./run-yamd /usr/src/test/test2/test2 显示了在样本程序 test2 上使用 YAMD 得到的输出。YAMD 告诉我们在 for 循环中有“越界(out-of-bounds)”的情况,使用 YAMD 的 test2 输出 Running /usr/src/test/test2/test2Temp output to /tmp/yamd-out.1243*********./run-yamd: line 101: 1248 Segmentation fault (core dumped)YAMD version 0.32Starting run: /usr/src/test/test2/test2Executable: /usr/src/test/test2/test2Virtual program size is 1380 K...INFO: Normal allocation of this blockAddress 0x40025e00, size 512...INFO: Normal allocation of this blockAddress 0x40028e00, size 512...INFO: Normal allocation of this blockAddress 0x4002be00, size 512ERROR: Crash...Tried to write address 0x4002c000Seems to be part of this block:Address 0x4002be00, size 512...Address in question is at offset 512 (out of bounds)Will dump core after checking heap.Done. MEMWATCH 和 YAMD 都是很有用的调试工具,它们的使用方法有所不同。对于 MEMWATCH,您需要添加包含文件memwatch.h 并打开两个编译时间标记。对于链接(link)语句,YAMD 只需要 -g 选项。 3  Electric Fence 多数 Linux 分发版包含一个 Electric Fence 包,不过您也可以选择下载它。Electric Fence 是一个由 Bruce Perens 编写的malloc()调试库。它就在您分配内存后分配受保护的内存。如果存在 fencepost 错误(超过数组末尾运行),程序就会产生保护错误,并立即结束。通过结合 Electric Fence 和 gdb,您可以精确地跟踪到哪一行试图访问受保护内存。ElectricFence 的另一个功能就是能够检测内存泄漏。 七  strace strace 命令是一种强大的工具,它能够显示所有由用户空间程序发出的系统调用。strace 显示这些调用的参数并返回符号形式的值。strace 从内核接收信息,而且不需要以任何特殊的方式来构建内核。将跟踪信息发送到应用程序及内核开发者都很有用。在下面代码中,分区的一种格式有错误,显示了 strace 的开头部分,内容是关于调出创建文件系统操作(mkfs )的。strace 确定哪个调用导致问题出现。 mkfs 上 strace 的开头部分 execve("/sbin/mkfs.jfs", ["mkfs.jfs", "-f", "/dev/test1"], &...open("/dev/test1", O_RDWR|O_LARGEFILE) = 4stat64("/dev/test1", {st_mode=&, st_rdev=makedev(63, 255), ...}) = 0ioctl(4, 0x40041271, 0xbfffe128) = -1 EINVAL (Invalid argument)write(2, "mkfs.jfs: warning - cannot setb" ..., 98mkfs.jfs: warning -cannot set blocksize on block device /dev/test1: Invalid argument ) = 98stat64("/dev/test1", {st_mode=&, st_rdev=makedev(63, 255), ...}) = 0open("/dev/test1", O_RDONLY|O_LARGEFILE) = 5ioctl(5, 0x80041272, 0xbfffe124) = -1 EINVAL (Invalid argument)write(2, "mkfs.jfs: can\'t determine device"..., ..._exit(1) = ? 显示 ioctl 调用导致用来格式化分区的 mkfs 程序失败。ioctl BLKGETSIZE64 失败。( BLKGET-SIZE64 在调用 ioctl的源代码中定义。) BLKGETSIZE64 ioctl 将被添加到 Linux 中所有的设备,而在这里,逻辑卷管理器还不支持它。因此,如果BLKGETSIZE64 ioctl 调用失败,mkfs 代码将改为调用较早的 ioctl 调用;这使得 mkfs 适用于逻辑卷管理器。 八  OOPS OOPS(也称 Panic)消息包含系统错误的细节,如 CPU 寄存器的内容等。是内核告知用户有不幸发生的最常用的方式。 内核只能发布OOPS,这个过程包括向终端上输出错误消息,输出寄存器保存的信息,并输出可供跟踪的回溯线索。通常,发送完OOPS之后,内核会处于一种不稳定的状态。 OOPS的产生有很多可能原因,其中包括内存访问越界或非法的指令等。 ※ 作为内核的开发者,必定将会经常处理OOPS。 ※ OOPS中包含的重要信息,对所有体系结构的机器都是完全相同的:寄存器上下文和回溯线索(回溯线索显示了导致错误发生的函数调用链)。 1  ksymoops 在 Linux 中,调试系统崩溃的传统方法是分析在发生崩溃时发送到系统控制台的 Oops 消息。一旦您掌握了细节,就可以将消息发送到 ksymoops 实用程序,它将试图将代码转换为指令并将堆栈值映射到内核符号。 ※ 如:回溯线索中的地址,会通过ksymoops转化成名称可见的函数名。 ksymoops需要几项内容:Oops 消息输出、来自正在运行的内核的 System.map 文件,还有 /proc/ksyms、vmlinux和/proc/modules。 关于如何使用 ksymoops,内核源代码 /usr/src/linux/Documentation/oops-tracing.txt 中或 ksymoops 手册页上有完整的说明可以参考。Ksymoops 反汇编代码部分,指出发生错误的指令,并显示一个跟踪部分表明代码如何被调用。 首先,将 Oops 消息保存在一个文件中以便通过 ksymoops 实用程序运行它。下面显示了由安装 JFS 文件系统的 mount命令创建的 Oops 消息。 ksymoops 处理后的 Oops 消息 ksymoops 2.4.0 on i686 2.4.17. Options used... 15:59:37 sfb1 kernel: Unable to handle kernel NULL pointer dereference atvirtual address 0000000... 15:59:37 sfb1 kernel: c01588fc... 15:59:37 sfb1 kernel: *pde = 0000000... 15:59:37 sfb1 kernel: Oops: 0000... 15:59:37 sfb1 kernel: CPU: 0... 15:59:37 sfb1 kernel: EIP: 0010:[jfs_mount+60/704]... 15:59:37 sfb1 kernel: Call Trace: [jfs_read_super+287/688] [get_sb_bdev+563/736] [do_kern_mount+189/336] [do_add_mount+35/208][do_page_fault+0/1264]... 15:59:37 sfb1 kernel: Call Trace: []...... 15:59:37 sfb1 kernel: [

    11-22 52浏览
  • 内核同步缘起何处?

    内核同步缘起何处? 提到内核同步,这还要从操作系统的发展说起。操作系统在进程未出现之前,只是单任务在单处理器cpu上运行,只是系统资源利用率低,并不存在进程同步的问题。后来,随着操作系统的发展,多进程多任务的出现,系统资源利用率大幅度提升,但面临的问题就是进程之间抢夺资源,导致系统紊乱。因此,进程们需要通过进程通信一起坐下来聊一聊了进程同步的问题了,在linux系统中内核同步由此诞生。 实际上,内核同步的问题还是相对较复杂的,有人说,既然同步问题那么复杂,我们为什么还要去解决同步问题,简简单单不要并发不就好了吗?凡事都有两面性,我们要想获得更短的等待时间,就必须要去处理复杂的同步问题,而并发给我们带来的好处已经足够吸引我们去处理很复杂的同步问题。先提两个概念: 临界资源: 各进程采取互斥的方式,实现共享的资源称作临界资源。属于临界资源的硬件有打印机、磁带机等,软件有消息缓冲队列、变量、数组、缓冲区等。 诸进程间应采取互斥方式,实现对这种资源的共享。 临界区: 每个进程中访问临界资源的那段代码称为临界区。显然,若能保证诸进程互斥地进入自己的临界区,便可实现诸进程对临界资源的互斥访问。为此,每个进程在进入临界区之前,应先对欲访问的临界资源进行检查,看它是否正被访问。如果此刻该临界资源未被访问,进程便可进入临界区对该资源进行访问,并设置它正被访问的标志;如果此刻该临界资源正被某进程访问,则本进程不能进入临界区。 说到此处,内核同步实际上就是进程间通过一系列同步机制,并发执行程序,不但提高了资源利用率和系统吞吐量,而且进程之间不会随意抢占资源造成系统紊乱。 为了防止并发程序对我们的数据造成破坏,我们可以通过锁来保护数据,同时还要避免死锁。这里给出一些简单的规则来避免死锁的发生: 注意加锁的顺序 防止发生饥饿 不要重复请求同一个锁 设计锁力求简单 我们知道了可以用锁来保护我们的数据,但我们更需要知道,哪些数据容易被竞争,需要被保护,这就要求我们能够辨认出需要共享的数据和相应的临界区。实际上,我们需要在编写代码之前就设计好锁,所以我们需要知道内核中造成并发的原因,以便更好的识别出需要保护的数据和临界区。内核中造成并发的原因: 中断 内核抢占 睡眠 对称多处理 为了避免并发,防止竞争,内核提供了一些方法来实现对内核共享数据的保护。下面将介绍内核中的原子操作、自旋锁和信号量等同步措施。 1、内存屏障(memory-barrier) 内存屏障的作用是强制对内存的访问顺序进行排序,保证多线程或多核处理器下的内存访问的一致性和可见性。通过插入内存屏障,可以防止编译器对代码进行过度优化,也可以解决CPU乱序执行引起的问题,确保程序的执行顺序符合预期。 Linux内核提供了多种内存屏障,包括通用的内存屏障、数据依赖屏障、写屏障、读屏障、释放操作和获取操作等。 Linux内核中的内存屏障源码主要位于include/linux/compiler.h和arch/*/include/asm/barrier.h中。 include/linux/compiler.h中定义了一系列内存屏障相关的宏定义和函数,如: #define barrier() __asm__ __volatile__("": : :"memory")#define smp_mb() barrier()#define smp_rmb() barrier()#define smp_wmb() barrier()#define smp_read_barrier_depends() barrier()#define smp_store_mb(var, value) do { smp_wmb(); WRITE_ONCE(var, value); } while (0)#define smp_load_acquire(var) ({ typeof(var) ____p1 = ACCESS_ONCE(var); smp_rmb(); ____p1; })#define smp_cond_load_acquire(p, c) ({ typeof(*p) ____p1; ____p1 = ACCESS_ONCE(*p); smp_rmb(); unlikely(c) ? NULL : &____p1; }) 其中,barrier()是一个内存屏障宏定义,用于实现完整的内存屏障操作;smp_mb()、smp_rmb()、smp_wmb()分别是读屏障、写屏障、读写屏障的宏定义;smp_read_barrier_depends()是一个读屏障函数,用于增加依赖关系,确保之前的读取操作在之后的读取操作之前完成;smp_store_mb()和smp_load_acquire()分别是带屏障的存储和加载函数,用于确保存储和加载操作的顺序和一致性。 内存屏障原语 smp_mb():全局内存屏障,用于确保对共享变量的所有读写操作都已完成,并且按照正确顺序进行。 smp_rmb():读屏障,用于确保对共享变量的读取不会发生在该读取之前的其他内存访问完成之前。 smp_wmb():写屏障,用于确保对共享变量的写入不会发生在该写入之后的其他内存访问开始之前。 rmb():读内存屏障,类似于smp_rmb(),但更强制。它提供了一个更明确和可靠的方式来防止乱序执行。 wmb():写内存屏障,类似于smp_wmb(),但更强制。它提供了一个更明确和可靠的方式来防止乱序执行。 read_barrier_depends():依赖性读栅栏,用于指示编译器不应重排与此函数相关的代码顺序。 这些内存屏障原语主要用于指示编译器和处理器不要对其前后的指令进行优化重排,以确保内存操作按照程序员预期的顺序执行。 读内存屏障(如rmb())只针对后续的读取操作有效,它告诉编译器和处理器在读取之前先确保前面的所有写入操作已经完成。类似地,写内存屏障(如wmb())只针对前面的写入操作有效,它告诉编译器和处理器在写入之后再开始后续的其他内存访问。 smp_xxx()系列的内存屏障函数主要用于多核系统中,在竞态条件下保证数据同步、一致性和正确顺序执行。在单核系统中,这些函数没有实际效果。 而read_barrier_depends()函数是一个特殊情况,在某些架构上使用它可以避免编译器对代码进行过度重排。 在x86系统上,如果支持lfence汇编指令,则rmb()(读内存屏障)的实现可能会使用lfence指令。lfence指令是一种序列化指令,它会阻止该指令之前和之后的加载操作重排,并确保所有之前的加载操作已经完成。 具体而言,在x86架构上,rmb()可能被实现为: #define rmb() asm volatile("lfence" ::: "memory") 这个宏定义使用了GCC内联汇编语法,将lfence指令嵌入到代码中,通过volatile关键字告诉编译器不要对此进行优化,并且使用了"clobber memory"约束来通知编译器该指令可能会影响内存。 如果在x86系统上不支持lfence汇编指令,那么rmb()(读内存屏障)的实现可能会使用其他可用的序列化指令来达到相同的效果。一种常见的替代方案是使用mfence指令,它可以确保所有之前的内存加载操作已经完成。 在这种情况下,rmb()的实现可能如下: #define rmb() asm volatile("mfence" ::: "memory") 同样,这个宏定义使用了GCC内联汇编语法,将mfence指令嵌入到代码中,并通过"clobber memory"约束通知编译器该指令可能会影响内存。 内存一致性模型 内存一致性模型是指多个处理器或多个线程之间对共享内存的读写操作所满足的一组规则和保证。 在并发编程中,由于多个处理器或线程可以同时访问共享内存,存在着数据竞争和可见性问题。为了解决这些问题,需要定义一种内存一致性模型,以规定对共享内存的读写操作应该如何进行、如何保证读写操作的正确顺序和可见性。 常见的内存一致性模型包括: 顺序一致性(Sequential Consistency):在顺序一致性模型下,所有的读写操作对其他处理器或线程都是按照程序顺序可见的,即每个操作都会立即对所有处理器或线程可见,并且所有处理器或线程看到的操作顺序是一致的。这种模型简单直观,但在实践中由于需要强制同步和屏障,会导致性能开销较大。 弱一致性模型(Weak Consistency Models):弱一致性模型允许对共享变量的读写操作可能出现乱序或重排序,但是要求满足一定的一致性条件。常见的弱一致性模型有Release-Acquire模型、Total Store Order(TSO)模型和Partial Store Order(PSO)模型等。这些模型通过使用特定的内存屏障指令或同步原语,对读写操作进行排序和同步,以实现一定程度的一致性保证。 松散一致性模型(Relaxed Consistency Models):松散一致性模型放宽了对共享变量读写操作顺序的限制,允许更多的重排序和乱序访问。常见的松散一致性模型有Release Consistency模型、Entry Consistency模型和Processor Consistency模型等。这些模型通过定义不同的一致性保证级别和同步原语,允许更灵活的访问和重排序,从而提高系统的并发性能。 这些序列关系的正确性和顺序一致性是通过硬件层面的内存屏障指令、缓存一致性协议和处理器乱序执行的机制来实现的。 **顺序一致性(Sequential Consistency)**模型是指内核对于多个线程或进程之间的操作顺序保持与程序代码中指定的顺序一致。简单来说,它要求内核按照程序中编写的顺序执行操作,并且对所有线程和进程都呈现出一个统一的全局视图。 在这个模型下,每个线程或进程看到的操作执行顺序必须符合以下两个条件: 串行语义:每个线程或进程内部的操作必须按照原始程序中的指定顺序执行,不会发生重排序。 全局排序:所有线程和进程之间的操作必须按照某种确定的全局排序进行。 **弱一致性(Weak Consistency)**模型是指在多个线程或进程之间,对于操作执行顺序的保证比顺序一致性模型更为宽松。在这种模型下,程序无法依赖于全局的、确定性的操作执行顺序。 弱一致性模型允许发生以下情况: 重排序:线程或进程内部的操作可以被重新排序,只要最终结果对外表现一致即可。 缓存不一致:不同线程或进程之间的缓存数据可能不及时更新,导致读取到过期数据。 写入写入冲突:当两个线程同时写入相同地址时,写入结果的先后顺序可能会出现变化。 **松散一致性模型(Relaxed Consistency Models)**是与多处理器系统相关的概念。它涉及到多个处理器或核心共享内存时的数据一致性问题。 在传统的严格一致性模型下,对于所有处理器/核心来说,读写操作都必须按照全局总序列进行排序。这会带来很大的开销和限制,并可能导致性能下降。 而松散一致性模型则允许部分乱序执行和缓存一致性协议的使用,以提高并行度和性能。它引入了几种松散的一致性模型,包括: 总线事务内存:TM允许并发线程之间通过事务进行原子操作,并尽可能避免锁竞争。这样可以提高并行度和响应速度。 松散记忆顺序:此模型允许处理器乱序执行指令,并通过内存屏障等机制确保特定操作的有序性。 内存同步原语:Linux内核提供了多种同步原语(如原子操作、自旋锁、信号量等),用于控制共享数据的访问顺序和正确同步。 对于读取(Load)和写入(Store)指令,一共有四种组合: Load-Load(LL):两个读取指令之间的顺序关系。LL序列表示第一个读取操作必须在第二个读取操作之前完成才能保证顺序一致性。 Load-Store(LS):一个读取指令和一个写入指令之间的顺序关系。LS序列表示读取操作必须在写入操作之前完成才能保证顺序一致性。 Store-Load(SL):一个写入指令和一个读取指令之间的顺序关系。SL序列表示写入操作必须在读取操作之前完成才能保证顺序一致性。 Store-Store(SS):两个写入指令之间的顺序关系。SS序列表示第一个写入操作必须在第二个写入操作之前完成才能保证顺序一致性。 需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享 内存屏障的使用规则 常见的内存屏障使用场景和相应的规则: 在读写共享变量时,应该使用对应的读屏障和写屏障来确保读写操作的顺序和一致性。例如,在写入共享变量后,应该使用写屏障(smp_wmb())来确保写入操作已经完成,然后在读取共享变量前,应该使用读屏障(smp_rmb())来确保读取操作是在之前的写入操作之后进行的。 在多个 CPU 访问同一个共享变量时,应该使用内存屏障来保证正确性。例如,在使用自旋锁等同步机制时,应该在获取锁和释放锁的操作之前加上读写屏障(smp_mb())来确保操作的顺序和一致性。 在编写高并发性能代码时,可以使用内存屏障来优化代码的性能。例如,在使用无锁算法时,可以使用适当的内存屏障来减少 CPU 的缓存一致性花费,从而提高性能。 在使用内存屏障时,应该遵循先使用再优化的原则,不应该过度依赖内存屏障来解决并发问题。例如,在使用锁时,应该尽量减少锁的使用次数和持有时间,以避免竞争和饥饿等问题。 在选择内存屏障时,应该考虑所在的 CPU 架构和硬件平台,以选择最适合的屏障类型和实现方式。例如,在 x86 架构下,应该使用 lfence、sfence 和 mfence 指令来实现读屏障、写屏障和读写屏障。 内核中隐式的内存屏障 在 Linux 内核中,存在一些隐式的内存屏障,它们无需显式地在代码中添加,而是由编译器或硬件自动插入。这些隐式的内存屏障可以确保代码在不同的优化级别下保持正确性和一致性,同时提高代码的性能。 以下是一些常见的隐式内存屏障: 编译器层面的内存屏障:编译器会根据代码的语义和优化级别自动插入适当的内存屏障。例如,编译器会在函数调用前后插入内存屏障来确保函数的参数和返回值在内存中的正确性。 硬件层面的内存屏障:现代处理器中的指令重排和缓存一致性机制会自动插入内存屏障。例如,处理器会根据内存访问模式自动插入读写屏障,以确保读写操作的顺序和一致性。 内核层面的内存屏障:Linux 内核中的同步原语(如自旋锁、互斥锁)会在关键代码段中插入内存屏障,以确保多个 CPU 访问共享变量的顺序和一致性。 Linux内核有很多锁结构,这些锁结构都隐含一定的屏障操作,以确保内存的一致性和顺序性。 spin locks(自旋锁):在LOCK操作中会使用一个类似于原子交换的操作,确保只有一个线程能够获得锁。UNLOCK操作包括适当的内存屏障以保证对共享数据的修改可见。 R/W spin locks(读写自旋锁):用于实现读写并发控制。与普通自旋锁类似,在LOCK和UNLOCK操作中都包含适当的内存屏障。 mutexes(互斥锁):在LOCK操作中可能会使用比较和交换等原子指令,并且在UNLOCK操作中也会包含适当的屏障来保证同步和顺序性。 semaphores(信号量):在LOCK和UNLOCK操作中均包含适当的内存屏障来确保同步和顺序性。 R/W semaphores(读写信号量):与普通信号量类似,在LOCK和UNLOCK操作中都会有相应的内存屏障。 RCU(Read-Copy Update,读拷贝更新):RCU机制不直接使用传统意义上的锁,但它涉及到对共享数据进行访问、更新或者删除时需要进行同步操作,确保数据的一致性和顺序性。 优化屏障 在 Linux 中,"优化屏障"是通过barrier()宏定义来实现的。这个宏定义确保编译器不会对其前后的代码进行过度的优化,以保证特定的内存访问顺序和可见性。 在 GCC 编译器中,"优化屏障"的定义可以在linux-5.6.18\include\linux\compiler-gcc.h源码文件中找到。这个头文件通常包含一些与编译器相关的宏定义和内联汇编指令,用于处理一些特殊情况下需要控制代码生成和优化行为的需求。 具体而言,barrier()宏定义使用了GCC内置函数__asm__ volatile("": : :"memory")来作为一个空的内联汇编语句,并加上了"memory"约束来告知编译器,在此处需要添加一个内存屏障。 内存屏障指令 在ARM架构中,有三条内存屏障指令: DMB (Data Memory Barrier):用于确保在指令执行之前的所有数据访问操作都已完成,并且写回到内存中。 DSB (Data Synchronization Barrier):用于确保在指令执行之前的所有数据访问操作都已完成,并且其结果可见。 ISB (Instruction Synchronization Barrier):用于确保在指令执行之前的所有指令已被处理器执行完毕,并清空处理器流水线。 2、原子操作 原子操作,指的是一段不可打断的执行。也就是你不可分割的操作。 在 Linux 上提供了原子操作的结构,atomic_t : typedef struct { int counter;} atomic_t; 把整型原子操作定义为结构体,让原子函数只接收atomic_t类型的参数进而确保原子操作只与这种特殊类型数据一起使用,同时也保证了该类型的数据不会被传递给非原子函数。 初始化并定义一个原子变量: atomic_t v = ATOMIC_INIT(0); 基本调用 Linux 为原子操作提供了基本的操作宏函数: atomic_inc(v); // 原子变量自增1atomic_dec(v); // 原子变量自减1atomic_read(v) // 读取一个原子量atomic_add(int i, atomic_t *v) // 原子量增加 iatomic_sub(int i, atomic_t *v) // 原子量减少 i 这里定义的都是通用入口,真正的操作和处理器架构体系相关,这里分析 ARM 架构体系的实现: static inline void atomic_inc(atomic_t *v){ atomic_add_return(1, v);} 具体实现 对于 add 来说: #define atomic_inc_return(v) atomic_add_return(1, (v))static inline int atomic_add_return(int i, atomic_t *v){ unsigned long tmp; int result; smp_mb(); __asm__ __volatile__("@ atomic_add_return\n" "1: ldrex %0, [%3]\n" /*【1】独占方式加载v->counter到result*/ " add %0, %0, %4\n" /*【2】result加一*/ " strex %1, %0, [%3]\n" /*【3】独占方式将result值写回v->counter*/ " teq %1, #0\n" /*【4】判断strex更新内存是否成*/ " bne 1b" /*【5】不成功跳转到1:*/ : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter) /*输出部*/ : "r" (&v->counter), "Ir" (i) /*输入部*/ : "cc"); /*损坏部*/ smp_mb(); return result;} 所以,这里我们需要着重分析两条汇编: LDREX 和 STREX LDREX和STREX指令,是将单纯的更新内存的原子操作分成了两个独立的步骤。 1)LDREX 用来读取内存中的值,并标记对该段内存的独占访问: LDREX Rx, [Ry] 上面的指令意味着,读取寄存器Ry指向的4字节内存值,将其保存到Rx寄存器中,同时标记对Ry指向内存区域的独占访问。如果执行LDREX指令的时候发现已经被标记为独占访问了,并不会对指令的执行产生影响。 2)STREX 在更新内存数值时,会检查该段内存是否已经被标记为独占访问,并以此来决定是否更新内存中的值: STREX Rx, Ry, [Rz] 如果执行这条指令的时候发现已经被标记为独占访问了,则将寄存器Ry中的值更新到寄存器Rz指向的内存,并将寄存器Rx设置成0。指令执行成功后,会将独占访问标记位清除。 而如果执行这条指令的时候发现没有设置独占标记,则不会更新内存,且将寄存器Rx的值设置成1。 一旦某条STREX指令执行成功后,以后再对同一段内存尝试使用STREX指令更新的时候,会发现独占标记已经被清空了,就不能再更新了,从而实现独占访问的机制。 在ARM系统中,内存有两种不同且对立的属性,即共享(Shareable)和非共享(Non-shareable)。共享意味着该段内存可以被系统中不同处理器访问到,这些处理器可以是同构的也可以是异构的。而非共享,则相反,意味着该段内存只能被系统中的一个处理器所访问到,对别的处理器来说不可见。 为了实现独占访问,ARM系统中还特别提供了所谓独占监视器(Exclusive Monitor)的东西,一共有两种类型的独占监视器。每一个处理器内部都有一个本地监视器(Local Monitor),且在整个系统范围内还有一个全局监视器(GlobalMonitor)。 如果要对非共享内存区中的值进行独占访问,只需要涉及本处理器内部的本地监视器就可以了;而如果要对共享内存区中的内存进行独占访问,除了要涉及到本处理器内部的本地监视器外,由于该内存区域可以被系统中所有处理器访问到,因此还必须要由全局监视器来协调。 对于本地监视器来说,它只标记了本处理器对某段内存的独占访问,在调用LDREX指令时设置独占访问标志,在调用STREX指令时清除独占访问标志。 而对于全局监视器来说,它可以标记每个处理器对某段内存的独占访问。也就是说,当一个处理器调用LDREX访问某段共享内存时,全局监视器只会设置针对该处理器的独占访问标记,不会影响到其它的处理器。当在以下两种情况下,会清除某个处理器的独占访问标记: 1)当该处理器调用LDREX指令,申请独占访问另一段内存时; 2)当别的处理器成功更新了该段独占访问内存值时。 对于第二种情况,也就是说,当独占内存访问内存的值在任何情况下,被任何一个处理器更改过之后,所有申请独占该段内存的处理器的独占标记都会被清空。 另外,更新内存的操作不一定非要是STREX指令,任何其它存储指令都可以。但如果不是STREX的话,则没法保证独占访问性。 现在的处理器基本上都是多核的,一个芯片上集成了多个处理器。而且对于一般的操作系统,系统内存基本上都被设置上了共享属性,也就是说对系统中所有处理器可见。因此,我们这里主要分析多核系统中对共享内存的独占访问的情况。 为了更加清楚的说明,我们可以举一个例子。假设系统中有两个处理器内核,而一个程序由三个线程组成,其中两个线程被分配到了第一个处理器上,另外一个线程被分配到了第二个处理器上。且他们的执行序列如下: 大致经历的步骤如下: 1)CPU2上的线程3最早执行LDREX,锁定某段共享内存区域。它会相应更新本地监视器和全局监视器。 2)然后,CPU1上的线程1执行LDREX,它也会更新本地监视器和全局监视器。这时在全局监视器上,CPU1和CPU2都对该段内存做了独占标记。 3)接着,CPU1上的线程2执行LDREX指令,它会发现本处理器的本地监视器对该段内存有了独占标记,同时全局监视器上CPU1也对该段内存做了独占标记,但这并不会影响这条指令的操作。 4)再下来,CPU1上的线程1最先执行了STREX指令,尝试更新该段内存的值。它会发现本地监视器对该段内存是有独占标记的,而全局监视器上CPU1也有该段内存的独占标记,则更新内存值成功。同时,清除本地监视器对该段内存的独占标记,还有全局监视器所有处理器对该段内存的独占标记。 5)下面,CPU2上的线程3执行STREX指令,也想更新该段内存值。它会发现本地监视器拥有对该段内存的独占标记,但是在全局监视器上CPU1没有了该段内存的独占标记(前面一步清空了),则更新不成功。 6)最后,CPU1上的线程2执行STREX指令,试着更新该段内存值。它会发现本地监视器已经没有了对该段内存的独占标记(第4步清除了),则直接更新失败,不需要再查全局监视器了。 所以,可以看出来,这套机制的精髓就是,无论有多少个处理器,有多少个地方会申请对同一个内存段进行操作,保证只有最早的更新可以成功,这之后的更新都会失败。失败了就证明对该段内存有访问冲突了。实际的使用中,可以重新用LDREX读取该段内存中保存的最新值,再处理一次,再尝试保存,直到成功为止。 还有一点需要说明,LDREX和STREX是对内存中的一个字(Word,32 bit)进行独占访问的指令。如果想独占访问的内存区域不是一个字,还有其它的指令: 1)LDREXB和STREXB:对内存中的一个字节(Byte,8 bit)进行独占访问; 2)LDREXH和STREXH:中的一个半字(Half Word,16 bit)进行独占访问; 3)LDREXD和STREXD:中的一个双字(Double Word,64 bit)进行独占访问。 它们必须配对使用,不能混用。 3、自旋锁 内核当发生访问资源冲突的时候,可以有两种锁的解决方案选择: 一个是原地等待 一个是挂起当前进程,调度其他进程执行(睡眠) Spinlock 是内核中提供的一种比较常见的锁机制,自旋锁是“原地等待”的方式解决资源冲突的,即,一个线程获取了一个自旋锁后,另外一个线程期望获取该自旋锁,获取不到,只能够原地“打转”(忙等待)。由于自旋锁的这个忙等待的特性,注定了它使用场景上的限制 ——自旋锁不应该被长时间的持有(消耗 CPU 资源)。 自旋锁的使用 在linux kernel的实现中,经常会遇到这样的场景:共享数据被中断上下文和进程上下文访问,该如何保护呢?如果只有进程上下文的访问,那么可以考虑使用semaphore或者mutex的锁机制,但是现在“中断上下文”也参和进来,那些可以导致睡眠的lock就不能使用了,这时候,可以考虑使用spin lock。 这里为什么把中断上下文加引号呢?因为在中断上下文,是不允许睡眠的,所以,这里需要的是一个不会导致睡眠的锁——spinlock。 换言之,中断上下文要用锁,首选 spinlock。 使用自旋锁,有两种方式定义一个锁: 动态的: spinlock_t lock;spin_lock_init (&lock); 静态的: DEFINE_SPINLOCK(lock); 自旋锁的死锁和解决 自旋锁不可递归,自己等待自己已经获取的锁,会导致死锁。 自旋锁可以在中断上下文中使用,但是试想一个场景:一个线程获取了一个锁,但是被中断处理程序打断,中断处理程序也获取了这个锁(但是之前已经被锁住了,无法获取到,只能自旋),中断无法退出,导致线程中后面释放锁的代码无法被执行,导致死锁。(如果确认中断中不会访问和线程中同一个锁,其实无所谓) 一、考虑下面的场景(内核抢占场景): (1)进程A在某个系统调用过程中访问了共享资源 R (2)进程B在某个系统调用过程中也访问了共享资源 R 会不会造成冲突呢?假设在A访问共享资源R的过程中发生了中断,中断唤醒了沉睡中的,优先级更高的B,在中断返回现场的时候,发生进程切换,B启动执行,并通过系统调用访问了R,如果没有锁保护,则会出现两个thread进入临界区,导致程序执行不正确。OK,我们加上spin lock看看如何:A在进入临界区之前获取了spin lock,同样的,在A访问共享资源R的过程中发生了中断,中断唤醒了沉睡中的,优先级更高的B,B在访问临界区之前仍然会试图获取spin lock,这时候由于A进程持有spin lock而导致B进程进入了永久的spin……怎么破?linux的kernel很简单,在A进程获取spin lock的时候,禁止本CPU上的抢占(上面的永久spin的场合仅仅在本CPU的进程抢占本CPU的当前进程这样的场景中发生)。如果A和B运行在不同的CPU上,那么情况会简单一些:A进程虽然持有spin lock而导致B进程进入spin状态,不过由于运行在不同的CPU上,A进程会持续执行并会很快释放spin lock,解除B进程的spin状态 二、再考虑下面的场景(中断上下文场景): (1)运行在CPU0上的进程A在某个系统调用过程中访问了共享资源 R (2)运行在CPU1上的进程B在某个系统调用过程中也访问了共享资源 R (3)外设P的中断handler中也会访问共享资源 R 在这样的场景下,使用spin lock可以保护访问共享资源R的临界区吗?我们假设CPU0上的进程A持有spin lock进入临界区,这时候,外设P发生了中断事件,并且调度到了CPU1上执行,看起来没有什么问题,执行在CPU1上的handler会稍微等待一会CPU0上的进程A,等它立刻临界区就会释放spin lock的,但是,如果外设P的中断事件被调度到了CPU0上执行会怎么样?CPU0上的进程A在持有spin lock的状态下被中断上下文抢占,而抢占它的CPU0上的handler在进入临界区之前仍然会试图获取spin lock,悲剧发生了,CPU0上的P外设的中断handler永远的进入spin状态,这时候,CPU1上的进程B也不可避免在试图持有spin lock的时候失败而导致进入spin状态。为了解决这样的问题,linux kernel采用了这样的办法:如果涉及到中断上下文的访问,spin lock需要和禁止本 CPU 上的中断联合使用。 三、再考虑下面的场景(底半部场景) linux kernel中提供了丰富的bottom half的机制,虽然同属中断上下文,不过还是稍有不同。我们可以把上面的场景简单修改一下:外设P不是中断handler中访问共享资源R,而是在的bottom half中访问。使用spin lock+禁止本地中断当然是可以达到保护共享资源的效果,但是使用牛刀来杀鸡似乎有点小题大做,这时候disable bottom half就OK了 四、中断上下文之间的竞争 同一种中断handler之间在uni core和multi core上都不会并行执行,这是linux kernel的特性。 如果不同中断handler需要使用spin lock保护共享资源,对于新的内核(不区分fast handler和slow handler),所有handler都是关闭中断的,因此使用spin lock不需要关闭中断的配合。 bottom half又分成softirq和tasklet,同一种softirq会在不同的CPU上并发执行,因此如果某个驱动中的softirq的handler中会访问某个全局变量,对该全局变量是需要使用spin lock保护的,不用配合disable CPU中断或者bottom half。 tasklet更简单,因为同一种tasklet不会多个CPU上并发。 自旋锁的实现 1、文件整理 和体系结构无关的代码如下: (1) include/linux/spinlock_types.h 这个头文件定义了通用spin lock的基本的数据结构(例如spinlock_t)和如何初始化的接口(DEFINE_SPINLOCK)。这里的“通用”是指不论SMP还是UP都通用的那些定义。 (2)include/linux/spinlock_types_up.h 这个头文件不应该直接include,在include/linux/spinlock_types.h文件会根据系统的配置(是否SMP)include相关的头文件,如果UP则会include该头文件。这个头文定义UP系统中和spin lock的基本的数据结构和如何初始化的接口。当然,对于non-debug版本而言,大部分struct都是empty的。 (3)include/linux/spinlock.h 这个头文件定义了通用spin lock的接口函数声明,例如spin_lock、spin_unlock等,使用spin lock模块接口API的驱动模块或者其他内核模块都需要include这个头文件。 (4)include/linux/spinlock_up.h 这个头文件不应该直接include,在include/linux/spinlock.h文件会根据系统的配置(是否SMP)include相关的头文件。这个头文件是debug版本的spin lock需要的。 (5)include/linux/spinlock_api_up.h 同上,只不过这个头文件是non-debug版本的spin lock需要的 (6)linux/spinlock_api_smp.h SMP上的spin lock模块的接口声明 (7)kernel/locking/spinlock.c SMP上的spin lock实现。 对UP和SMP上spin lock头文件进行整理: UP需要的头文件 SMP需要的头文件 linux/spinlock_type_up.h: linux/spinlock_types.h: linux/spinlock_up.h: linux/spinlock_api_up.h: linux/spinlock.h asm/spinlock_types.h linux/spinlock_types.h: asm/spinlock.h linux/spinlock_api_smp.h: linux/spinlock.h 2、数据结构 首先定义一个 spinlock_t 的数据类型,其本质上是一个整数值(对该数值的操作需要保证原子性),该数值表示spin lock是否可用。初始化的时候被设定为1。当thread想要持有锁的时候调用spin_lock函数,该函数将spin lock那个整数值减去1,然后进行判断,如果等于0,表示可以获取spin lock,如果是负数,则说明其他thread的持有该锁,本thread需要spin。 内核中的spinlock_t的数据类型定义如下: typedef struct spinlock { struct raw_spinlock rlock; } spinlock_t; typedef struct raw_spinlock { arch_spinlock_t raw_lock; } raw_spinlock_t; 通用(适用于各种arch)的spin lock使用spinlock_t这样的type name,各种arch定义自己的struct raw_spinlock。听起来不错的主意和命名方式,直到linux realtime tree(PREEMPT_RT)提出对spinlock的挑战。real time linux是一个试图将linux kernel增加硬实时性能的一个分支(你知道的,linux kernel mainline只是支持soft realtime),多年来,很多来自realtime branch的特性被merge到了mainline上,例如:高精度timer、中断线程化等等。realtime tree希望可以对现存的spinlock进行分类:一种是在realtime kernel中可以睡眠的spinlock,另外一种就是在任何情况下都不可以睡眠的spinlock。分类很清楚但是如何起名字?起名字绝对是个技术活,起得好了事半功倍,可维护性好,什么文档啊、注释啊都素那浮云,阅读代码就是享受,如沐春风。起得不好,注定被后人唾弃,或者拖出来吊打(这让我想起给我儿子起名字的那段不堪回首的岁月……)。最终,spin lock的命名规范定义如下: (1)spinlock,在rt linux(配置了PREEMPT_RT)的时候可能会被抢占(实际底层可能是使用支持PI(优先级翻转)的mutext)。 (2)raw_spinlock,即便是配置了PREEMPT_RT也要顽强的spin (3)arch_spinlock,spin lock是和architecture相关的,arch_spinlock是architecture相关的实现 对于UP平台,所有的arch_spinlock_t都是一样的,定义如下: typedef struct { } arch_spinlock_t; 什么都没有,一切都是空啊。当然,这也符合前面的分析,对于UP,即便是打开的preempt选项,所谓的spin lock也不过就是disable preempt而已,不需定义什么spin lock的变量。 对于SMP平台,这和arch相关,我们在下面描述。 在具体的实现面,我们不可能把每一个接口函数的代码都呈现出来,我们选择最基础的spin_lock为例子,其他的读者可以自己阅读代码来理解。 spin_lock的代码如下: static inline void spin_lock(spinlock_t *lock) { raw_spin_lock(&lock->rlock); } 当然,在linux mainline代码中,spin_lock和raw_spin_lock是一样的,在这里重点看看raw_spin_lock,代码如下: #define raw_spin_lock(lock) _raw_spin_lock(lock) UP中的实现: #define _raw_spin_lock(lock) __LOCK(lock) #define __LOCK(lock) \ do { preempt_disable(); ___LOCK(lock); } while (0) SMP的实现: void __lockfunc _raw_spin_lock(raw_spinlock_t *lock) { __raw_spin_lock(lock); } static inline void __raw_spin_lock(raw_spinlock_t *lock) { preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock); } UP中很简单,本质上就是一个preempt_disable而已,SMP中稍显复杂,preempt_disable当然也是必须的,spin_acquire可以略过,这是和运行时检查锁的有效性有关的,如果没有定义CONFIG_LOCKDEP其实就是空函数。如果没有定义CONFIG_LOCK_STAT(和锁的统计信息相关),LOCK_CONTENDED就是调用 do_raw_spin_lock 而已,如果没有定义CONFIG_DEBUG_SPINLOCK,它的代码如下: static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock) { __acquire(lock); arch_spin_lock(&lock->raw_lock); } __acquire和静态代码检查相关,忽略之,最终实际的获取spin lock还是要靠arch相关的代码实现。 针对 ARM 平台的 arch_spin_lock 代码位于arch/arm/include/asm/spinlock.h和spinlock_type.h,和通用代码类似,spinlock_type.h定义ARM相关的spin lock定义以及初始化相关的宏;spinlock.h中包括了各种具体的实现。 1. 回到2.6.23版本的内核中 和arm平台相关spin lock数据结构的定义如下(那时候还是使用raw_spinlock_t而不是arch_spinlock_t): typedef struct { volatile unsigned int lock; } raw_spinlock_t; 一个整数就OK了,0表示unlocked,1表示locked。配套的API包括__raw_spin_lock和__raw_spin_unlock。__raw_spin_lock会持续判断lock的值是否等于0,如果不等于0(locked)那么其他thread已经持有该锁,本thread就不断的spin,判断lock的数值,一直等到该值等于0为止,一旦探测到lock等于0,那么就设定该值为1,表示本thread持有该锁了,当然,这些操作要保证原子性,细节和exclusive版本的ldr和str(即ldrex和strexeq)相关,这里略过。立刻临界区后,持锁thread会调用__raw_spin_unlock函数是否spin lock,其实就是把0这个数值赋给lock。 这个版本的spin lock的实现当然可以实现功能,而且在没有冲突的时候表现出不错的性能,不过存在一个问题:不公平。也就是所有的thread都是在无序的争抢spin lock,谁先抢到谁先得,不管thread等了很久还是刚刚开始spin。在冲突比较少的情况下,不公平不会体现的特别明显,然而,随着硬件的发展,多核处理器的数目越来越多,多核之间的冲突越来越剧烈,无序竞争的spinlock带来的performance issue终于浮现出来,根据Nick Piggin的描述: On an 8 core (2 socket) Opteron, spinlock unfairness is extremely noticable, with a userspace test having a difference of up to 2x runtime per thread, and some threads are starved or "unfairly" granted the lock up to 1 000 000 (!) times. 多么的不公平,有些可怜的thread需要饥饿的等待1000000次。本质上无序竞争从概率论的角度看应该是均匀分布的,不过由于硬件特性导致这么严重的不公平,我们来看一看硬件block: lock本质上是保存在main memory中的,由于cache的存在,当然不需要每次都有访问main memory。在多核架构下,每个CPU都有自己的L1 cache,保存了lock的数据。假设CPU0获取了spin lock,那么执行完临界区,在释放锁的时候会调用smp_mb invalide其他忙等待的CPU的L1 cache,这样后果就是释放spin lock的那个cpu可以更快的访问L1cache,操作lock数据,从而大大增加的下一次获取该spin lock的机会。 2、回到现在:arch_spinlock_t ARM平台中的arch_spinlock_t定义如下(little endian): typedef struct { union { u32 slock; struct __raw_tickets { u16 owner; u16 next; } tickets; }; } arch_spinlock_t; 本来以为一个简单的整数类型的变量就搞定的spin lock看起来没有那么简单,要理解这个数据结构,需要了解一些ticket-based spin lock的概念。如果你有机会去九毛九去排队吃饭(声明:不是九毛九的饭托,仅仅是喜欢面食而常去吃而已)就会理解ticket-based spin lock。大概是因为便宜,每次去九毛九总是无法长驱直入,门口的笑容可掬的靓女会给一个ticket,上面写着15号,同时会告诉你,当前状态是10号已经入席,11号在等待。 回到arch_spinlock_t,这里的owner就是当前已经入席的那个号码,next记录的是下一个要分发的号码。下面的描述使用普通的计算机语言和在九毛九就餐(假设九毛九只有一张餐桌)的例子来进行描述,估计可以让吃货更有兴趣阅读下去。最开始的时候,slock被赋值为0,也就是说owner和next都是0,owner和next相等,表示unlocked。当第一个个thread调用spin_lock来申请lock(第一个人就餐)的时候,owner和next相等,表示unlocked,这时候该thread持有该spin lock(可以拥有九毛九的唯一的那个餐桌),并且执行next++,也就是将next设定为1(再来人就分配1这个号码让他等待就餐)。也许该thread执行很快(吃饭吃的快),没有其他thread来竞争就调用spin_unlock了(无人等待就餐,生意惨淡啊),这时候执行owner++,也就是将owner设定为1(表示当前持有1这个号码牌的人可以就餐)。姗姗来迟的1号获得了直接就餐的机会,next++之后等于2。1号这个家伙吃饭巨慢,这是不文明现象(thread不能持有spin lock太久),但是存在。又来一个人就餐,分配当前next值的号码2,当然也会执行next++,以便下一个人或者3的号码牌。持续来人就会分配3、4、5、6这些号码牌,next值不断的增加,但是owner岿然不动,直到欠扁的1号吃饭完毕(调用spin_unlock),释放饭桌这个唯一资源,owner++之后等于2,表示持有2那个号码牌的人可以进入就餐了。 3、ARM 结构体系 arch_spin_lock 接口实现 3.1 加锁 同样的,这里也只是选择一个典型的API来分析,其他的大家可以自行学习。我们选择的是 arch_spin_lock,其ARM32的代码如下: static inline void arch_spin_lock(arch_spinlock_t *lock) { unsigned long tmp; u32 newval; arch_spinlock_t lockval; prefetchw(&lock->slock);------------------------(0) __asm__ __volatile__( "1: ldrex %0, [%3]\n"-------------------------(1) " add %1, %0, %4\n" --------------------------(2)" strex %2, %1, [%3]\n"------------------------(3) " teq %2, #0\n"----------------------------(4) " bne 1b" : "=&r" (lockval), "=&r" (newval), "=&r" (tmp) : "r" (&lock->slock), "I" (1 << TICKET_SHIFT) : "cc"); while (lockval.tickets.next != lockval.tickets.owner) {-------(5) wfe();--------------------------------(6) lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);----(7) } smp_mb();---------------------------------(8) } (0)和preloading cache相关的操作,主要是为了性能考虑 (1)lockval = lock->slock (如果lock->slock没有被其他处理器独占,则标记当前执行处理器对lock->slock地址的独占访问;否则不影响) (2)newval = lockval + (1 << TICKET_SHIFT) (3)strex tmp, newval, [&lock->slock] (如果当前执行处理器没有独占lock->slock地址的访问,不进行存储,返回1给temp;如果当前处理器已经独占lock->slock内存访问,则对内存进行写,返回0给temp,清除独占标记) lock->tickets.next = lock->tickets.next + 1 (4)检查是否写入成功 lockval.tickets.next (5)初始化时lock->tickets.owner、lock->tickets.next都为0,假设第一次执行arch_spin_lock,lockval = *lock,lock->tickets.next++,lockval.tickets.next 等于 lockval.tickets.owner,获取到自旋锁;自旋锁未释放,第二次执行的时候,lock->tickets.owner = 0, lock->tickets.next = 1,拷贝到lockval后,lockval.tickets.next != lockval.tickets.owner,会执行wfe等待被自旋锁释放被唤醒,自旋锁释放时会执行 lock->tickets.owner++,lockval.tickets.owner重新赋值 (6)暂时中断挂起执行。如果当前spin lock的状态是locked,那么调用wfe进入等待状态。 (7)其他的CPU唤醒了本cpu的执行,说明owner发生了变化,该新的own赋给lockval,然后继续判断spin lock的状态,也就是回到step 5。 (8)memory barrier的操作,具体的操作后面描述。 3.1 释放锁 static inline void arch_spin_unlock(arch_spinlock_t *lock){ smp_mb(); lock->tickets.owner++; ---------------------- (0) dsb_sev(); ---------------------------------- (1)} (0)lock->tickets.owner增加1,下一个被唤醒的处理器会检查该值是否与自己的lockval.tickets.next相等,lock->tickets.owner代表可以获取的自旋锁的处理器,lock->tickets.next你一个可以获取的自旋锁的owner;处理器获取自旋锁时,会先读取lock->tickets.next用于与lock->tickets.owner比较并且对lock->tickets.next加1,下一个处理器获取到的lock->tickets.next就与当前处理器不一致了,两个处理器都与lock->tickets.owner比较,肯定只有一个处理器会相等,自旋锁释放时时对lock->tickets.owner加1计算,因此,先申请自旋锁多处理器lock->tickets.next值更新,自然先获取到自旋锁 (1)执行sev指令,唤醒wfe等待的处理器 自旋锁的变体 接口API的类型 spinlock中的定义 raw_spinlock的定义 定义spin lock并初始化 DEFINE_SPINLOCK DEFINE_RAW_SPINLOCK 动态初始化spin lock spin_lock_init raw_spin_lock_init 获取指定的spin lock spin_lock raw_spin_lock 获取指定的spin lock同时disable本CPU中断 spin_lock_irq raw_spin_lock_irq 保存本CPU当前的irq状态,disable本CPU中断并获取指定的spin lock spin_lock_irqsave raw_spin_lock_irqsave 获取指定的spin lock同时disable本CPU的bottom half spin_lock_bh raw_spin_lock_bh 释放指定的spin lock spin_unlock raw_spin_unlock 释放指定的spin lock同时enable本CPU中断 spin_unlock_irq raw_spin_unock_irq 释放指定的spin lock同时恢复本CPU的中断状态 spin_unlock_irqstore raw_spin_unlock_irqstore 获取指定的spin lock同时enable本CPU的bottom half spin_unlock_bh raw_spin_unlock_bh 尝试去获取spin lock,如果失败,不会spin,而是返回非零值 spin_trylock raw_spin_trylock 判断spin lock是否是locked,如果其他的thread已经获取了该lock,那么返回非零值,否则返回0 spin_is_locked raw_spin_is_locked static inline unsigned long __raw_spin_lock_irqsave(raw_spinlock_t *lock){ unsigned long flags; local_irq_save(flags); preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); /* * On lockdep we dont want the hand-coded irq-enable of * do_raw_spin_lock_flags() code, because lockdep assumes * that interrupts are not re-enabled during lock-acquire: */#ifdef CONFIG_LOCKDEP LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);#else do_raw_spin_lock_flags(lock, &flags);#endif return flags;} static inline void __raw_spin_lock_irq(raw_spinlock_t *lock){ local_irq_disable(); preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);} static inline void __raw_spin_lock_bh(raw_spinlock_t *lock){ local_bh_disable(); preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);} static inline void __raw_spin_lock(raw_spinlock_t *lock){ preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);} #endif /* CONFIG_PREEMPT */ static inline void __raw_spin_unlock(raw_spinlock_t *lock){ spin_release(&lock->dep_map, 1, _RET_IP_); do_raw_spin_unlock(lock); preempt_enable();} static inline void __raw_spin_unlock_irqrestore(raw_spinlock_t *lock, unsigned long flags){ spin_release(&lock->dep_map, 1, _RET_IP_); do_raw_spin_unlock(lock); local_irq_restore(flags); preempt_enable();} static inline void __raw_spin_unlock_irq(raw_spinlock_t *lock){ spin_release(&lock->dep_map, 1, _RET_IP_); do_raw_spin_unlock(lock); local_irq_enable(); preempt_enable();} static inline void __raw_spin_unlock_bh(raw_spinlock_t *lock){ spin_release(&lock->dep_map, 1, _RET_IP_); do_raw_spin_unlock(lock); preempt_enable_no_resched(); local_bh_enable_ip((unsigned long)__builtin_return_address(0));} static inline int __raw_spin_trylock_bh(raw_spinlock_t *lock){ local_bh_disable(); preempt_disable(); if (do_raw_spin_trylock(lock)) { spin_acquire(&lock->dep_map, 0, 1, _RET_IP_); return 1; } preempt_enable_no_resched(); local_bh_enable_ip((unsigned long)__builtin_return_address(0)); return 0;} 小结 spin_lock 的时候,禁止内核抢占 如果涉及到中断上下文的访问,spin lock需要和禁止本 CPU 上的中断联合使用(spin_lock_irqsave / spin_unlock_irqstore) 涉及 half bottom 使用:spin_lock_bh / spin_unlock_bh 4、读-写自旋锁(rwlock) 试想这样一种场景:一个内核链表元素,很多进程(或者线程)都会对其进行读写,但是使用 spinlock 的话,多个读之间无法并发,只能被 spin,为了提高系统的整体性能,内核定义了一种锁: 1. 允许多个处理器进程(或者线程或者中断上下文)并发的进行读操作(SMP 上),这样是安全的,并且提高了 SMP 系统的性能。 2. 在写的时候,保证临界区的完全互斥 所以,当某种内核数据结构被分为:读-写,或者生产-消费,这种类型的时候,类似这种 读-写自旋锁就起到了作用。对读者是共享的,对写者完全互斥。 读/写自旋锁是在保护SMP体系下的共享数据结构而引入的,它的引入是为了增加内核的并发能力。只要内核控制路径没有对数据结构进行修改,读/写自旋锁就允许多个内核控制路径同时读同一数据结构。如果一个内核控制路径想对这个结构进行写操作,那么它必须首先获取读/写锁的写锁,写锁授权独占访问这个资源。这样设计的目的,即允许对数据结构并发读可以提高系统性能。 加锁的逻辑: (1)假设临界区内没有任何的thread,这时候任何read thread或者write thread可以进入,但是只能是其一。 (2)假设临界区内有一个read thread,这时候新来的read thread可以任意进入,但是write thread不可以进入 (3)假设临界区内有一个write thread,这时候任何的read thread或者write thread都不可以进入 (4)假设临界区内有一个或者多个read thread,write thread当然不可以进入临界区,但是该write thread也无法阻止后续read thread的进入,他要一直等到临界区一个read thread也没有的时候,才可以进入。 解锁的逻辑: (1)在write thread离开临界区的时候,由于write thread是排他的,因此临界区有且只有一个write thread,这时候,如果write thread执行unlock操作,释放掉锁,那些处于spin的各个thread(read或者write)可以竞争上岗。 (2)在read thread离开临界区的时候,需要根据情况来决定是否让其他处于spin的write thread们参与竞争。如果临界区仍然有read thread,那么write thread还是需要spin(注意:这时候read thread可以进入临界区,听起来也是不公平的)直到所有的read thread释放锁(离开临界区),这时候write thread们可以参与到临界区的竞争中,如果获取到锁,那么该write thread可以进入。 读-写自旋锁的使用 与 spinlock 的使用方式几乎一致,读-写自旋锁初始化方式也分为两种: 动态的: rwlock_t rw_lock;rwlock_init (&rw_lock); 静态的: DEFINE_RWLOCK(rwlock); 初始化完成后就可以使用读-写自旋锁了,内核提供了一组 APIs 来操作读写自旋锁,最简单的比如: 读临界区: rwlock_t rw_lock;rwlock_init (&rw_lock); read_lock(rw_lock);------------- 读临界区 -------------read_unlock(rw_lock); 写临界区: rwlock_t rw_lock;rwlock_init (&rw_lock); write_lock(rw_lock);------------- 写临界区 -------------write_unlock(rw_lock); 注意:读锁和写锁会位于完全分开的代码中,若是: read_lock(lock);write_lock(lock); 这样会导致死锁,因为读写锁的本质还是自旋锁。写锁不断的等待读锁的释放,导致死锁。如果读-写不能清晰的分开的话,使用一般的自旋锁,就别使用读写锁了。 注意:由于读写自旋锁的这种特性(允许多个读者),使得即便是递归的获取同一个读锁也是允许的。更比如,在中断服务程序中,如果确定对数据只有读操作的话(没有写操作),那么甚至可以使用 read_lock 而不是 read_lock_irqsave,但是对于写来说,还是需要调用 write_lock_irqsave 来保证不被中断打断,否则如果在中断中去获取了锁,就会导致死锁。 读-写锁内核 APIs 与 spinlock 一样,Read/Write spinlock 有如下的 APIs: 接口API描述 Read/Write Spinlock API 定义rw spin lock并初始化 DEFINE_RWLOCK 动态初始化rw spin lock rwlock_init 获取指定的rw spin lock read_lock write_lock 获取指定的rw spin lock同时disable本CPU中断 read_lock_irq write_lock_irq 保存本CPU当前的irq状态,disable本CPU中断并获取指定的rw spin lock read_lock_irqsave write_lock_irqsave 获取指定的rw spin lock同时disable本CPU的bottom half read_lock_bh write_lock_bh 释放指定的spin lock read_unlock write_unlock 释放指定的rw spin lock同时enable本CPU中断 read_unlock_irq write_unlock_irq 释放指定的rw spin lock同时恢复本CPU的中断状态 read_unlock_irqrestore write_unlock_irqrestore 获取指定的rw spin lock同时enable本CPU的bottom half read_unlock_bh write_unlock_bh 尝试去获取rw spin lock,如果失败,不会spin,而是返回非零值 read_trylock write_trylock 读-写锁内核实现 说明:使用读写内核锁需要包含的头文件和 spinlock 一样,只需要包含:include/linux/spinlock.h 就可以了 这里仅看 和体系架构相关的部分,在 ARM 体系架构上: arch_rwlock_t 的定义: typedef struct { u32 lock; } arch_rwlock_t; 看看arch_write_lock的实现: static inline void arch_write_lock(arch_rwlock_t *rw){ unsigned long tmp; prefetchw(&rw->lock);------------------------(0) __asm__ __volatile__("1: ldrex %0, [%1]\n"--------------------------(1)" teq %0, #0\n"--------------------------------(2) WFE("ne")------------------------------------(3)" strexeq %0, %2, [%1]\n"----------------------(4)" teq %0, #0\n"--------------------------------(5)" bne 1b"--------------------------------------(6) : "=&r" (tmp) : "r" (&rw->lock), "r" (0x80000000) : "cc"); smp_mb();------------------------------------(7)} (0) : 先通知 hw 进行preloading cache (1): 标记独占,获取 rw->lock 的值并保存在 tmp 中 (2) : 判断 tmp 是否等于 0 (3) : 如果 tmp 不等于0,那么说明有read 或者write的thread持有锁,那么还是静静的等待吧。其他thread会在unlock的时候Send Event来唤醒该CPU的 (4) : 如果 tmp 等于0,将 0x80000000 这个值赋给 rw->lock (5) : 是否 str 成功,如果有其他 thread 在上面的过程插入进来就会失败 (6) : 如果不成功,那么需要重新来过跳转到标号为 1 的地方,即开始的地方,否则持有锁,进入临界区 (7) : 内存屏障,保证执行顺序 arch_write_unlock 的实现: static inline void arch_write_unlock(arch_rwlock_t *rw) { smp_mb(); ---------------------------(0) __asm__ __volatile__( "str %1, [%0]\n" -----------------(1) : : "r" (&rw->lock), "r" (0) : "cc"); dsb_sev(); --------------------------(2)} (0) : 内存屏障 (1) : rw->lock 赋值为 0 (2) :唤醒处于 WFE 的 thread arch_read_lock 的实现: static inline void arch_read_lock(arch_rwlock_t *rw){ unsigned long tmp, tmp2; prefetchw(&rw->lock); __asm__ __volatile__("1: ldrex %0, [%2]\n" ----------- (0)" adds %0, %0, #1\n" --------- (1)" strexpl %1, %0, [%2]\n" ------- (2) WFE("mi") --------------------- (3)" rsbpls %0, %1, #0\n" --------- (4)" bmi 1b" ----------------------- (5) : "=&r" (tmp), "=&r" (tmp2) : "r" (&rw->lock) : "cc"); smp_mb();} (0) : 标记独占,获取 rw->lock 的值并保存在 tmp 中 (1) : tmp = tmp + 1 (2) : 如果 tmp 结果非负值,那么就执行该指令,将 tmp 值存入rw->lock (3) : 如果 tmp 是负值,说明有 write thread,那么就进入 wait for event 状态 (4) : 判断strexpl指令是否成功执行 (5) : 如果不成功,那么需要重新来过,否则持有锁,进入临界区 arch_read_unlock 的实现: static inline void arch_read_unlock(arch_rwlock_t *rw){ unsigned long tmp, tmp2; smp_mb(); prefetchw(&rw->lock); __asm__ __volatile__("1: ldrex %0, [%2]\n" -----------(0)" sub %0, %0, #1\n" -------------(1)" strex %1, %0, [%2]\n" -------(2)" teq %1, #0\n" -----------------(3)" bne 1b" -----------------------(4) : "=&r" (tmp), "=&r" (tmp2) : "r" (&rw->lock) : "cc"); if (tmp == 0) dsb_sev(); ----------------(5)} (0) : 标记独占,获取 rw->lock 的值并保存在 tmp 中 (1) : read 退出临界区,所以,tmp = tmp + 1 (2) : 将tmp值存入 rw->lock 中 (3) :是否str成功,如果有其他thread在上面的过程插入进来就会失败 (4) : 如果不成功,那么需要重新来过,否则离开临界区 (5) : 如果read thread已经等于0,说明是最后一个离开临界区的 Reader,那么调用 sev 去唤醒 WF E的 CPU Core(配合 Writer 线程) 所以看起来,读-写锁使用了一个 32bits 的数来存储当前的状态,最高位代表着是否有写线程占用了锁,而低 31 位代表可以同时并发的读的数量,看起来现在至少是绰绰有余了。 小结 读-写锁自旋锁本质上还是属于自旋锁。只不过允许了并发的读操作,对于写和写,写和读之间,都需要互斥自旋等待。 注意读-写锁自旋锁的使用,避免死锁。 合理运用内核提供的 APIs (诸如:write_lock_irqsave 等)。 5、顺序锁(seqlock) 上面讲到的读-写自旋锁,更加偏向于读者。内核提供了更加偏向于写者的锁 —— seqlock 这种锁提供了一种简单的读写共享的机制,他的设计偏向于写者,无论是什么情况(没有多个写者竞争的情况),写者都有直接写入的权利(霸道),而读者呢?这里提供了一个序列值,当写者进入的时候,这个序列值会加 1,而读者去在读出数值的前后分别来check这个值,便知道是否在读的过程中(奇数还偶数),被写者“篡改”过数据,如果有的话,则再次 spin 的去读,一直到数据被完全的篡改完毕。 顺序锁临界区只允许一个writer thread进入(在多个写者之间是互斥的),临界区只允许一个writer thread进入,在没有writer thread的情况下,reader thread可以随意进入,也就是说reader不会阻挡reader。在临界区只有有reader thread的情况下,writer thread可以立刻执行,不会等待 Writer thread的操作: 对于writer thread,获取seqlock操作如下: (1)获取锁(例如spin lock),该锁确保临界区只有一个writer进入。 (2)sequence counter加一 释放seqlock操作如下: (1)释放锁,允许其他writer thread进入临界区。 (2)sequence counter加一(注意:不是减一哦,sequence counter是一个不断累加的counter) 由上面的操作可知,如果临界区没有任何的writer thread,那么sequence counter是偶数(sequence counter初始化为0),如果临界区有一个writer thread(当然,也只能有一个),那么sequence counter是奇数。 Reader thread的操作如下: (1)获取sequence counter的值,如果是偶数,可以进入临界区,如果是奇数,那么等待writer离开临界区(sequence counter变成偶数)。进入临界区时候的sequence counter的值我们称之old sequence counter。 (2)进入临界区,读取数据 (3)获取sequence counter的值,如果等于old sequence counter,说明一切OK,否则回到step(1) 适用场景: 一般而言,seqlock适用于: (1)read操作比较频繁 (2)write操作较少,但是性能要求高,不希望被reader thread阻挡(之所以要求write操作较少主要是考虑read side的性能) (3)数据类型比较简单,但是数据的访问又无法利用原子操作来保护。我们举一个简单的例子来描述:假设需要保护的数据是一个链表,header--->A node--->B node--->C node--->null。reader thread遍历链表的过程中,将B node的指针赋给了临时变量x,这时候,中断发生了,reader thread被preempt(注意,对于seqlock,reader并没有禁止抢占)。这样在其他cpu上执行的writer thread有充足的时间释放B node的memory(注意:reader thread中的临时变量x还指向这段内存)。当read thread恢复执行,并通过x这个指针进行内存访问(例如试图通过next找到C node),悲剧发生了…… 顺序锁的使用 定义 定义一个顺序锁有两种方式: seqlock_t seqlockseqlock_init(&seqlock) DEFINE_SEQLOCK(seqlock) 写临界区: write_seqlock(&seqlock);/* -------- 写临界区 ---------*/write_sequnlock(&seqlock); 读临界区: unsigned long seq; do { seq = read_seqbegin(&seqlock); /* ---------- 这里读临界区数据 ----------*/} while (read_seqretry(&seqlock, seq)); 例子:在 kernel 中,jiffies_64 保存了从系统启动以来的 tick 数目,对该数据的访问(以及其他jiffies相关数据)需要持有jiffies_lock 这个 seq lock: 读当前的 tick : u64 get_jiffies_64(void) { do { seq = read_seqbegin(&jiffies_lock); ret = jiffies_64; } while (read_seqretry(&jiffies_lock, seq)); } 内核更新当前的 tick : static void tick_do_update_jiffies64(ktime_t now) { write_seqlock(&jiffies_lock); /* 临界区会修改jiffies_64等相关变量 */ write_sequnlock(&jiffies_lock); } 顺序锁的实现 1. seqlock_t 结构: typedef struct { struct seqcount seqcount; spinlock_t lock;} seqlock_t; 2. write_seqlock/write_sequnlock static inline void write_seqlock(seqlock_t *sl) { spin_lock(&sl->lock); sl->sequence++; smp_wmb(); } static inline void write_sequnlock(seqlock_t *sl){ smp_wmb(); s->sequence++; spin_unlock(&sl->lock);} 可以看到 seqlock 其实也是基于 spinlock 的。smp_wmb 是写内存屏障,由于seq lock 是基于 sequence counter 的,所以必须保证这个操作。 3. read_seqbegin: static inline unsigned read_seqbegin(const seqlock_t *sl) { unsigned ret; repeat: ret = ACCESS_ONCE(sl->sequence); ---进入临界区之前,先要获取sequenc counter的快照 if (unlikely(ret & 1)) { -----如果是奇数,说明有writer thread cpu_relax(); goto repeat; ----如果有writer,那么先不要进入临界区,不断的polling sequenc counter } smp_rmb(); ---确保sequenc counter和临界区的内存访问顺序 return ret; } 如果有writer thread,read_seqbegin函数中会有一个不断polling sequenc counter,直到其变成偶数的过程,在这个过程中,如果不加以控制,那么整体系统的性能会有损失(这里的性能指的是功耗和速度)。因此,在polling过程中,有一个cpu_relax的调用,对于ARM64,其代码是: static inline void cpu_relax(void) { asm volatile("yield" ::: "memory"); } yield指令用来告知硬件系统,本cpu上执行的指令是polling操作,没有那么急迫,如果有任何的资源冲突,本cpu可以让出控制权。 4. read_seqretry static inline unsigned read_seqretry(const seqlock_t *sl, unsigned start) { smp_rmb();---确保sequenc counter和临界区的内存访问顺序 return unlikely(sl->sequence != start); } start参数就是进入临界区时候的sequenc counter的快照,比对当前退出临界区的sequenc counter,如果相等,说明没有writer进入打搅reader thread,那么可以愉快的离开临界区。 还有一个比较有意思的逻辑问题:read_seqbegin为何要进行奇偶判断?把一切都推到read_seqretry中进行判断不可以吗?也就是说,为何read_seqbegin要等到没有writer thread的情况下才进入临界区?其实有writer thread也可以进入,反正在read_seqretry中可以进行奇偶以及相等判断,从而保证逻辑的正确性。当然,这样想也是对的,不过在performance上有欠缺,reader在检测到有writer thread在临界区后,仍然放reader thread进入,可能会导致writer thread的一些额外的开销(cache miss),因此,最好的方法是在read_seqbegin中拦截。 6、信号量(semaphore) Linux Kernel 除了提供了自旋锁,还提供了睡眠锁,信号量就是一种睡眠锁。信号量的特点是,如果一个任务试图获取一个已经被占用的信号量,他会被推入等待队列,让其进入睡眠。此刻处理器重获自由,去执行其他的代码。当持有的信号量被释放,处于等待队列的任务将被唤醒,并获取到该信号量。 从信号量的睡眠特性得出一些结论: 由于竞争信号量的时候,未能拿到信号的进程会进入睡眠,所以信号量可以适用于长时间持有。 而且信号量不适合短时间的持有,因为会导致睡眠的原因,维护队列,唤醒,等各种开销,在短时间的锁定某对象,反而比忙等锁的效率低。 由于睡眠的特性,只能在进程上下文进行调用,无法再中断上下文中使用信号量。 一个进程可以在持有信号量的情况下去睡眠(可能并不需要,这里只是假如),另外的进程尝试获取该信号量时候,不会死锁。 期望去占用一个信号量的同时,不允许持有自旋锁,因为企图去获取信号量的时候,可能导致睡眠,而自旋锁不允许睡眠。 在有一些特定的场景,自旋锁和信号量没得选,比如中断上下文,只能用自旋锁,比如需要要和用户空间做同步的时候,代码需要睡眠,信号量是唯一选择。如果有的地方,既可以选择信号量,又可以选自旋锁,则需要根据持有锁的时间长短来进行选择。理想情况下是,越短的时间持有,选择自旋锁,长时间的适合信号量。与此同时,信号量不会关闭调度,他不会对调度造成影响。 信号量允许多个锁持有者,而自旋锁在一个时刻,最多允许一个任务持有。信号量同时允许的持有者数量可以在声明信号的时候指定。绝大多数情况下,信号量允许一个锁的持有者,这种类型的信号量称之为二值信号量,也就是互斥信号量。 一个任务要想访问共享资源,首先必须得到信号量,获取信号量的操作将把信号量的值减1,若当前信号量的值为负数,表明无法获得信号量,该任务必须挂起在该信号量的等待队列等待该信号量可用;若当前信号量的值为非负数,表示可以获得信号量,因而可以立刻访问被该信号量保护的共享资源。 当任务访问完被信号量保护的共享资源后,必须释放信号量,释放信号量通过把信号量的值加1实现,如果信号量的值为非正数,表明有任务等待当前信号量,因此它也唤醒所有等待该信号量的任务。 信号量的操作 信号量相关的东西放置到了: include/linux/semaphore.h 文件 初始化一个信号量有两种方式: struct semaphore sem; sema_init(&sem, val); DEFINE_SEMAPHORE(sem) 内核针对信号量提供了一组操作接口: 函数定义 功能说明 sema_init(struct semaphore *sem, int val) 初始化信号量,将信号量计数器值设置val。 down(struct semaphore *sem) 获取信号量,不建议使用此函数,因为是 UNINTERRUPTABLE 的睡眠。 down_interruptible(struct semaphore *sem) 可被中断地获取信号量,如果睡眠被信号中断,返回错误-EINTR。 down_killable (struct semaphore *sem) 可被杀死地获取信号量。如果睡眠被致命信号中断,返回错误-EINTR。 down_trylock(struct semaphore *sem) 尝试原子地获取信号量,如果成功获取,返回0,不能获取,返回1。 down_timeout(struct semaphore *sem, long jiffies) 在指定的时间jiffies内获取信号量,若超时未获取,返回错误-ETIME。 up(struct semaphore *sem) 释放信号量sem。 注意:down_interruptible 接口,在获取不到信号量的时候,该任务会进入 INTERRUPTABLE 的睡眠,但是 down() 接口会导致进入 UNINTERRUPTABLE 的睡眠,down 用的较少。 信号量的实现 1. 信号量的结构: struct semaphore { raw_spinlock_t lock; unsigned int count; struct list_head wait_list;}; 信号量用结构semaphore描述,它在自旋锁的基础上改进而成,它包括一个自旋锁、信号量计数器和一个等待队列。用户程序只能调用信号量API函数,而不能直接访问信号量结构。 2. 初始化函数sema_init #define __SEMAPHORE_INITIALIZER(name, n) \{ \ .lock = __RAW_SPIN_LOCK_UNLOCKED((name).lock), \ .count = n, \ .wait_list = LIST_HEAD_INIT((name).wait_list), \} static inline void sema_init(struct semaphore *sem, int val){ static struct lock_class_key __key; *sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val); lockdep_init_map(&sem->lock.dep_map, "semaphore->lock", &__key, 0);} 初始化了信号量中的 spinlock 结构,count 计数器和初始化链表。 3. 可中断获取信号量函数down_interruptible static noinline int __sched __down_interruptible(struct semaphore *sem){ return __down_common(sem, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);} int down_interruptible(struct semaphore *sem){ unsigned long flags; int result = 0; raw_spin_lock_irqsave(&sem->lock, flags); if (likely(sem->count > 0)) sem->count--; else result = __down_interruptible(sem); raw_spin_unlock_irqrestore(&sem->lock, flags); return result;} down_interruptible进入后,获取信号量获取成功,进入临界区,否则进入 __down_interruptible->__down_common static inline int __sched __down_common(struct semaphore *sem, long state, long timeout){ struct semaphore_waiter waiter; list_add_tail(&waiter.list, &sem->wait_list); waiter.task = current; waiter.up = false; for (;;) { if (signal_pending_state(state, current)) goto interrupted; if (unlikely(timeout <= 0)) goto timed_out; __set_current_state(state); raw_spin_unlock_irq(&sem->lock); timeout = schedule_timeout(timeout); raw_spin_lock_irq(&sem->lock); if (waiter.up) return 0; } timed_out: list_del(&waiter.list); return -ETIME; interrupted: list_del(&waiter.list); return -EINTR;} 加入到等待队列,将状态设置成为 TASK_INTERRUPTIBLE , 并设置了调度的 Timeout : MAX_SCHEDULE_TIMEOUT 在调用了 schedule_timeout,使得进程进入了睡眠状态。 4. 释放信号量函数 up void up(struct semaphore *sem){ unsigned long flags; raw_spin_lock_irqsave(&sem->lock, flags); if (likely(list_empty(&sem->wait_list))) sem->count++; else __up(sem); raw_spin_unlock_irqrestore(&sem->lock, flags);} 如果等待队列为空,即,没有睡眠的进程期望获取这个信号量,则直接 count++,否则调用 __up: static noinline void __sched __up(struct semaphore *sem){ struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list); list_del(&waiter->list); waiter->up = true; wake_up_process(waiter->task);} 取出队列中的元素,进行唤醒操作。 7、互斥体(mutex) 互斥体是一种睡眠锁,他是一种简单的睡眠锁,其行为和 count 为 1 的信号量类似。 互斥体简洁高效,但是相比信号量,有更多的限制,因此对于互斥体的使用条件更加严格: 任何时刻,只有一个指定的任务允许持有 mutex,也就是说,mutex 的计数永远是 1; 给 mutex 上锁这,必须负责给他解锁,也就是不允许在一个上下文中上锁,在另外一个上下文中解锁。这个限制注定了 mutex 无法承担内核和用户空间同步的复杂场景。常用的方式是在一个上下文中进行上锁/解锁。 递归的调用上锁和解锁是不允许的。也就是说,不能递归的去持有同一个锁,也不能够递归的解开一个已经解开的锁。 当持有 mutex 的进程,不允许退出 mutex 不允许在中断上下文和软中断上下文中使用过,即便是mutex_trylock 也不行 mutex 只能使用内核提供的 APIs操作,不允许拷贝,手动初始化和重复初始化 信号量和互斥体 他们两者很相似,除非是 mutex 的限制妨碍到逻辑,否则这两者之间,首选 mutex 自旋锁和互斥体 多数情况,很好区分。中断中只能考虑自旋锁,任务睡眠使用互斥体。如果都可以的的情况下,低开销或者短时间的锁,选择自旋锁,长期加锁的话,使用互斥体。 互斥体的使用 函数定义 功能说明 mutex_lock(struct mutex *lock) 加锁,如果不可用,则睡眠(UNINTERRUPTIBLE) mutex_lock_interruptible(struct mutex *lock); 加锁,如果不可用,则睡眠(TASK_INTERRUPTIBLE) mutex_unlock(struct mutex *lock) 解锁 mutex_trylock(struct mutex *lock) 试图获取指定的 mutex,或得到返回1,否则返回 0 mutex_is_locked(struct mutex *lock) 如果 mutex 被占用返回1,否则返回 0 互斥体的实现 互斥体的定义在:include/linux/mutex.h 1. mutex 的结构 struct mutex { atomic_long_t owner; spinlock_t wait_lock;#ifdef CONFIG_MUTEX_SPIN_ON_OWNER struct optimistic_spin_queue osq; /* Spinner MCS lock */#endif struct list_head wait_list;#ifdef CONFIG_DEBUG_MUTEXES void *magic;#endif#ifdef CONFIG_DEBUG_LOCK_ALLOC struct lockdep_map dep_map;#endif}; 2. mutex 初始化 void__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key){ atomic_long_set(&lock->owner, 0); spin_lock_init(&lock->wait_lock); INIT_LIST_HEAD(&lock->wait_list);#ifdef CONFIG_MUTEX_SPIN_ON_OWNER osq_lock_init(&lock->osq);#endif debug_mutex_init(lock, name, key);}EXPORT_SYMBOL(__mutex_init); 3. mutex 加锁 void __sched mutex_lock(struct mutex *lock){ might_sleep(); if (!__mutex_trylock_fast(lock)) __mutex_lock_slowpath(lock);} 首先check是否能够获得锁,否则调用到 __mutex_lock_slowpath: static noinline void __sched__mutex_lock_slowpath(struct mutex *lock){ __mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);} static int __sched__mutex_lock(struct mutex *lock, long state, unsigned int subclass, struct lockdep_map *nest_lock, unsigned long ip){ return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);} 所以调用到了 __mutex_lock_common 函数: static __always_inline int __sched__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass, struct lockdep_map *nest_lock, unsigned long ip, struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx){ struct mutex_waiter waiter; bool first = false; struct ww_mutex *ww; int ret; might_sleep(); ww = container_of(lock, struct ww_mutex, base); if (use_ww_ctx && ww_ctx) { if (unlikely(ww_ctx == READ_ONCE(ww->ctx))) return -EALREADY; /* * Reset the wounded flag after a kill. No other process can * race and wound us here since they can't have a valid owner * pointer if we don't have any locks held. */ if (ww_ctx->acquired == 0) ww_ctx->wounded = 0; } preempt_disable(); mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip); if (__mutex_trylock(lock) || mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) { /* got the lock, yay! */ lock_acquired(&lock->dep_map, ip); if (use_ww_ctx && ww_ctx) ww_mutex_set_context_fastpath(ww, ww_ctx); preempt_enable(); return 0; } spin_lock(&lock->wait_lock); /* * After waiting to acquire the wait_lock, try again. */ if (__mutex_trylock(lock)) { if (use_ww_ctx && ww_ctx) __ww_mutex_check_waiters(lock, ww_ctx); goto skip_wait; } debug_mutex_lock_common(lock, &waiter); lock_contended(&lock->dep_map, ip); if (!use_ww_ctx) { /* add waiting tasks to the end of the waitqueue (FIFO): */ __mutex_add_waiter(lock, &waiter, &lock->wait_list); #ifdef CONFIG_DEBUG_MUTEXES waiter.ww_ctx = MUTEX_POISON_WW_CTX;#endif } else { /* * Add in stamp order, waking up waiters that must kill * themselves. */ ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx); if (ret) goto err_early_kill; waiter.ww_ctx = ww_ctx; } waiter.task = current; set_current_state(state); for (;;) { /* * Once we hold wait_lock, we're serialized against * mutex_unlock() handing the lock off to us, do a trylock * before testing the error conditions to make sure we pick up * the handoff. */ if (__mutex_trylock(lock)) goto acquired; /* * Check for signals and kill conditions while holding * wait_lock. This ensures the lock cancellation is ordered * against mutex_unlock() and wake-ups do not go missing. */ if (unlikely(signal_pending_state(state, current))) { ret = -EINTR; goto err; } if (use_ww_ctx && ww_ctx) { ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx); if (ret) goto err; } spin_unlock(&lock->wait_lock); schedule_preempt_disabled(); /* * ww_mutex needs to always recheck its position since its waiter * list is not FIFO ordered. */ if ((use_ww_ctx && ww_ctx) || !first) { first = __mutex_waiter_is_first(lock, &waiter); if (first) __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF); } set_current_state(state); /* * Here we order against unlock; we must either see it change * state back to RUNNING and fall through the next schedule(), * or we must see its unlock and acquire. */ if (__mutex_trylock(lock) || (first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter))) break; spin_lock(&lock->wait_lock); } spin_lock(&lock->wait_lock);acquired: __set_current_state(TASK_RUNNING); if (use_ww_ctx && ww_ctx) { /* * Wound-Wait; we stole the lock (!first_waiter), check the * waiters as anyone might want to wound us. */ if (!ww_ctx->is_wait_die && !__mutex_waiter_is_first(lock, &waiter)) __ww_mutex_check_waiters(lock, ww_ctx); } mutex_remove_waiter(lock, &waiter, current); if (likely(list_empty(&lock->wait_list))) __mutex_clear_flag(lock, MUTEX_FLAGS); debug_mutex_free_waiter(&waiter); skip_wait: /* got the lock - cleanup and rejoice! */ lock_acquired(&lock->dep_map, ip); if (use_ww_ctx && ww_ctx) ww_mutex_lock_acquired(ww, ww_ctx); spin_unlock(&lock->wait_lock); preempt_enable(); return 0; err: __set_current_state(TASK_RUNNING); mutex_remove_waiter(lock, &waiter, current);err_early_kill: spin_unlock(&lock->wait_lock); debug_mutex_free_waiter(&waiter); mutex_release(&lock->dep_map, 1, ip); preempt_enable(); return ret;} 进入等待队列。 4. mutex 解锁 void __sched mutex_unlock(struct mutex *lock){#ifndef CONFIG_DEBUG_LOCK_ALLOC if (__mutex_unlock_fast(lock)) return;#endif __mutex_unlock_slowpath(lock, _RET_IP_);}EXPORT_SYMBOL(mutex_unlock); 调用到 __mutex_unlock_slowpath : static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip){ struct task_struct *next = NULL; DEFINE_WAKE_Q(wake_q); unsigned long owner; mutex_release(&lock->dep_map, 1, ip); /* * Release the lock before (potentially) taking the spinlock such that * other contenders can get on with things ASAP. * * Except when HANDOFF, in that case we must not clear the owner field, * but instead set it to the top waiter. */ owner = atomic_long_read(&lock->owner); for (;;) { unsigned long old; #ifdef CONFIG_DEBUG_MUTEXES DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current); DEBUG_LOCKS_WARN_ON(owner & MUTEX_FLAG_PICKUP);#endif if (owner & MUTEX_FLAG_HANDOFF) break; old = atomic_long_cmpxchg_release(&lock->owner, owner, __owner_flags(owner)); if (old == owner) { if (owner & MUTEX_FLAG_WAITERS) break; return; } owner = old; } spin_lock(&lock->wait_lock); debug_mutex_unlock(lock); if (!list_empty(&lock->wait_list)) { /* get the first entry from the wait-list: */ struct mutex_waiter *waiter = list_first_entry(&lock->wait_list, struct mutex_waiter, list); next = waiter->task; debug_mutex_wake_waiter(lock, waiter); wake_q_add(&wake_q, next); } if (owner & MUTEX_FLAG_HANDOFF) __mutex_handoff(lock, next); spin_unlock(&lock->wait_lock); wake_up_q(&wake_q);} 做唤醒操作。 8、RCU机制 RCU 的全称是(Read-Copy-Update),意在读写-复制-更新,在Linux提供的所有内核互斥的设施当中属于一种免锁机制。在之前讨论过的读写自旋锁(rwlock)、顺序锁(seqlock)一样,RCU 的适用模型也是读写共存的系统。 读写自旋锁:读者和写者互斥,读者和读者共存,写者和写者互斥。(偏向读者) 顺序锁:写者和写者互斥,写者直接打断读者(偏向写者) 上述两种都是基于 spinlock 的一种用于特定场景的锁机制。 RCU 与他们不同,它的读取和写入操作无需考虑两者之间的互斥问题。 之前的锁分析中,可以知道,加锁、解锁都涉及内存操作,同时伴有内存屏障引入,这些都会导致锁操作的系统开销变大,在此基础之上, 内核在Kernel的 2.5 版本引入了 RCU 的免锁互斥访问机制。 什么叫免锁机制呢?对于被RCU保护的共享数据结构,读者不需要获得任何锁就可以访问它,但写者在访问它时首先拷贝一个副本,然后对副本进行修改,最后使用一个回调(callback)机制在适当的时机把指向原来数据的指针重新指向新的被修改的数据。这个时机就是所有引用该数据的CPU都退出对共享数据的操作。 因此RCU实际上是一种改进的 rwlock,读者几乎没有什么同步开销,它不需要锁,不使用原子指令。不需要锁也使得使用更容易,因为死锁问题就不需要考虑了。写者的同步开销比较大,它需要延迟数据结构的释放,复制被修改的数据结构,它也必须使用某种锁机制同步并行的其它写者的修改操作。读者必须提供一个信号给写者以便写者能够确定数据可以被安全地释放或修改的时机。有一个专门的垃圾收集器来探测读者的信号,一旦所有的读者都已经发送信号告知它们都不在使用被RCU保护的数据结构,垃圾收集器就调用回调函数完成最后的数据释放或修改操作。 RCU与rwlock的不同之处是:它既允许多个读者同时访问被保护的数据,又允许多个读者和多个写者同时访问被保护的数据(注意:是否可以有多个写者并行访问取决于写者之间使用的同步机制),读者没有任何同步开销,而写者的同步开销则取决于使用的写者间同步机制。但RCU不能替代rwlock,因为如果写比较多时,对读者的性能提高不能弥补写者导致的损失。 读者在访问被RCU保护的共享数据期间不能被阻塞,这是RCU机制得以实现的一个基本前提,也就说当读者在引用被RCU保护的共享数据期间,读者所在的CPU不能发生上下文切换,spinlock和rwlock都需要这样的前提。写者在访问被RCU保护的共享数据时不需要和读者竞争任何锁,只有在有多于一个写者的情况下需要获得某种锁以与其他写者同步。写者修改数据前首先拷贝一个被修改元素的副本,然后在副本上进行修改,修改完毕后它向垃圾回收器注册一个回调函数以便在适当的时机执行真正的修改操作。等待适当时机的这一时期称为宽限期 grace period,而CPU发生了上下文切换称为经历一个quiescent state,grace period就是所有CPU都经历一次quiescent state所需要的等待的时间(读的时候禁止了内核抢占,也就是上下文切换,如果在某个 CPU 上发生了进程切换,那么所有对老指针的引用都会结束之后)。垃圾收集器就是在grace period之后调用写者注册的回调函数(call_rcu 函数注册回调)来完成真正的数据修改或数据释放操作的。 总的来说,RCU的行为方式: 1、随时可以拿到读锁,即对临界区的读操作随时都可以得到满足 2、某一时刻只能有一个人拿到写锁,多个写锁需要互斥,写的动作包括 拷贝--修改--宽限窗口到期后删除原值 3、临界区的原始值为m1,如会有人拿到写锁修改了临界区为m2,则在写锁修改临界区之后拿到的读锁获取的临界区的值为m2,之前获取的为m1,这通过原子操作保证 对比发现RCU读操作随时都会得到满足,但写锁之后的写操作所耗费的系统资源就相对比较多了,并且只有在宽限期之后删除原资源。 针对对象 RCU 保护的对象是指针。这一点尤其重要.因为指针赋值是一条单指令.也就是说是一个原子操作.因它更改指针指向没必要考虑它的同步.只需要考虑cache的影响。 内核中所有关于 RCU 的操作都应该使用内核提供的 RCU 的 APIs 函数完成,这些 APIs 主要集中在指针和链表的操作。 使用场景 RCU 使用在读者多而写者少的情况.RCU和读写锁相似.但RCU的读者占锁没有任何的系统开销.写者与写写者之间必须要保持同步,且写者必须要等它之前的读者全部都退出之后才能释放之前的资源 读者是可以嵌套的.也就是说rcu_read_lock()可以嵌套调用 从 RCU 的特性可知,RCU 的读取性能的提升是在增加写入者负担的前提下完成的,因此在一个读者与写者共存的系统中,按照设计者的说法,如果写入者的操作比例在 10% 以上,那么久应该考虑其他的互斥方法,反正,使用 RCU 的话,能够获取较好的性能。 使用限制 读者在访问被RCU保护的共享数据期间不能被阻塞。 在读的时候,会屏蔽掉内核抢占。 RCU 的实现原理 在RCU的实现过程中,我们主要解决以下问题: 1,在读取过程中,另外一个线程删除了一个节点。删除线程可以把这个节点从链表中移除,但它不能直接销毁这个节点,必须等到所有的读取线程读取完成以后,才进行销毁操作。RCU中把这个过程称为宽限期(Grace period)。 2,在读取过程中,另外一个线程插入了一个新节点,而读线程读到了这个节点,那么需要保证读到的这个节点是完整的。这里涉及到了发布-订阅机制(Publish-Subscribe Mechanism)。 3, 保证读取链表的完整性。新增或者删除一个节点,不至于导致遍历一个链表从中间断开。但是RCU并不保证一定能读到新增的节点或者不读到要被删除的节点。 1. 宽限期 通过例子,方便理解这个内容。以下例子修改于Paul的文章: struct foo { int a; char b; long c; }; DEFINE_SPINLOCK(foo_mutex); struct foo *gbl_foo; void foo_read (void){ foo *fp = gbl_foo; if ( fp != NULL ) dosomething(fp->a, fp->b , fp->c );} void foo_update( foo* new_fp ){ spin_lock(&foo_mutex); foo *old_fp = gbl_foo; gbl_foo = new_fp; spin_unlock(&foo_mutex); kfee(old_fp);} 如上的程序,是针对于全局变量gbl_foo的操作。假设以下场景。有两个线程同时运行foo_ read和foo_update的时候,当foo_ read执行完赋值操作后,线程发生切换;此时另一个线程开始执行foo_update并执行完成。当foo_ read运行的进程切换回来后,运行dosomething的时候,fp 已经被删除,这将对系统造成危害。为了防止此类事件的发生,RCU里增加了一个新的概念叫宽限期(Grace period)。如下图所示: 图中每行代表一个线程,最下面的一行是删除线程,当它执行完删除操作后,线程进入了宽限期。宽限期的意义是,在一个删除动作发生后,它必须等待所有在宽限期开始前已经开始的读线程结束,才可以进行销毁操作。这样做的原因是这些线程有可能读到了要删除的元素。图中的宽限期必须等待1和2结束;而读线程5在宽限期开始前已经结束,不需要考虑;而3,4,6也不需要考虑,因为在宽限期结束后开始后的线程不可能读到已删除的元素。为此RCU机制提供了相应的API来实现这个功能。 用 RCU: void foo_read(void){ rcu_read_lock(); foo *fp = gbl_foo; if ( fp != NULL ) dosomething(fp->a,fp->b,fp->c); rcu_read_unlock();} void foo_update( foo* new_fp ){ spin_lock(&foo_mutex); foo *old_fp = gbl_foo; gbl_foo = new_fp; spin_unlock(&foo_mutex); synchronize_rcu(); kfee(old_fp);} 其中 foo_ read 中增加了 rcu_read_lock 和 rcu_read_unlock,这两个函数用来标记一个RCU读过程的开始和结束。其实作用就是帮助检测宽限期是否结束。foo_update 增加了一个函数 synchronize_rcu(),调用该函数意味着一个宽限期的开始,而直到宽限期结束,该函数才会返回。我们再对比着图看一看,线程1和2,在 synchronize_rcu 之前可能得到了旧的 gbl_foo,也就是 foo_update 中的 old_fp,如果不等它们运行结束,就调用 kfee(old_fp),极有可能造成系统崩溃。而3,4,6在synchronize_rcu 之后运行,此时它们已经不可能得到 old_fp,此次的kfee将不对它们产生影响。 宽限期是RCU实现中最复杂的部分,原因是在提高读数据性能的同时,删除数据的性能也不能太差。 2. 订阅——发布机制 当前使用的编译器大多会对代码做一定程度的优化,CPU也会对执行指令做一些优化调整,目的是提高代码的执行效率,但这样的优化,有时候会带来不期望的结果。如例: void foo_update( foo* new_fp ){ spin_lock(&foo_mutex); foo *old_fp = gbl_foo; new_fp->a = 1; new_fp->b = ‘b’; new_fp->c = 100; gbl_foo = new_fp; spin_unlock(&foo_mutex); synchronize_rcu(); kfee(old_fp);} 这段代码中,我们期望的是6,7,8行的代码在第10行代码之前执行。但优化后的代码并不对执行顺序做出保证。在这种情形下,一个读线程很可能读到 new_fp,但new_fp的成员赋值还没执行完成。当读线程执行dosomething(fp->a, fp->b , fp->c ) 的 时候,就有不确定的参数传入到dosomething,极有可能造成不期望的结果,甚至程序崩溃。可以通过优化屏障来解决该问题,RCU机制对优化屏障做了包装,提供了专用的API来解决该问题。这时候,第十行不再是直接的指针赋值,而应该改为 : rcu_assign_pointer(gbl_foo,new_fp); <include/linux/rcupdate.h> 中 rcu_assign_pointer的实现比较简单,如下: #define rcu_assign_pointer(p, v) \ __rcu_assign_pointer((p), (v), __rcu) #define __rcu_assign_pointer(p, v, space) \ do { \ smp_wmb(); \ (p) = (typeof(*v) __force space *)(v); \ } while (0) 我们可以看到它的实现只是在赋值之前加了优化屏障 smp_wmb来确保代码的执行顺序。另外就是宏中用到的__rcu,只是作为编译过程的检测条件来使用的。 <include/linux/rcupdate.h> #define rcu_dereference(p) rcu_dereference_check(p, 0) #define rcu_dereference_check(p, c) \ __rcu_dereference_check((p), rcu_read_lock_held() || (c), __rcu) #define __rcu_dereference_check(p, c, space) \ ({ \ typeof(*p) *_________p1 = (typeof(*p)*__force )ACCESS_ONCE(p); \ rcu_lockdep_assert(c, "suspicious rcu_dereference_check()" \ " usage"); \ rcu_dereference_sparse(p, space); \ smp_read_barrier_depends(); \ ((typeof(*p) __force __kernel *)(_________p1)); \ }) static inline int rcu_read_lock_held(void){ if (!debug_lockdep_rcu_enabled()) return 1; if (rcu_is_cpu_idle()) return 0; if (!rcu_lockdep_current_cpu_online()) return 0; return lock_is_held(&rcu_lock_map);} 这段代码中加入了调试信息,去除调试信息,可以是以下的形式(其实这也是旧版本中的代码): #define rcu_dereference(p) ({ \ typeof(p) _________p1 = p; \ smp_read_barrier_depends(); \ (_________p1); \ }) 在赋值后加入优化屏障smp_read_barrier_depends()。 我们之前的第四行代码改为 foo *fp = rcu_dereference(gbl_foo);,就可以防止上述问题。 3. 数据读取的完整性 还是通过例子来说明这个问题: 如图我们在原list中加入一个节点new到A之前,所要做的第一步是将new的指针指向A节点,第二步才是将Head的指针指向new。这样做的目的是当插入操作完成第一步的时候,对于链表的读取并不产生影响,而执行完第二步的时候,读线程如果读到new节点,也可以继续遍历链表。如果把这个过程反过来,第一步head指向new,而这时一个线程读到new,由于new的指针指向的是Null,这样将导致读线程无法读取到A,B等后续节点。从以上过程中,可以看出RCU并不保证读线程读取到new节点。如果该节点对程序产生影响,那么就需要外部调用做相应的调整。如在文件系统中,通过RCU定位后,如果查找不到相应节点,就会进行其它形式的查找,相关内容等分析到文件系统的时候再进行叙述。 我们再看一下删除一个节点的例子: 如图我们希望删除B,这时候要做的就是将A的指针指向C,保持B的指针,然后删除程序将进入宽限期检测。由于B的内容并没有变更,读到B的线程仍然可以继续读取B的后续节点。B不能立即销毁,它必须等待宽限期结束后,才能进行相应销毁操作。由于A的节点已经指向了C,当宽限期开始之后所有的后续读操作通过A找到的是C,而B已经隐藏了,后续的读线程都不会读到它。这样就确保宽限期过后,删除B并不对系统造成影响。 如何使用 RCU 除了上一节使用 RCU 的例子,这一节在贴一个使用 RCU 的例子: // 假设 struct shared_data 是一个在读者和写者之间共享的数据结构struct shared_data { int a; int b; struct rcu_head rcu;}; Reader Code : // 读者的代码。// 读者调用 rcu_read_lock 和 rcu_read_unlock 来构建并访问临界区// 所有对指向被保护资源指针的引用都应该只在临界区出现,而且临界区代码不能睡眠static void demo_reader(struct shared_data *ptr){ struct shared_data *p = NULL; rcu_read_lock(); // call rcu_dereference to get the ptr pointer p = rcu_dereference(ptr); if (p) do_somethings(p); rcu_read_unlock();} Writer Code : // 写入侧的代码// 写入者提供的回调函数,用于释放老的数据指针static void demo_del_oldptr(struct rcu_head *rh){ struct shared_data *p = container_of(rh, struct rcu_head, rcu); kfree(p);} static void demo_writer(struct shared_data *ptr){ struct shared_data *new_ptr = kmalloc(...); .... new_ptr->a = 10; new_ptr->b = 20; // 用新指针更新老指针 rcu_assign_pointer(ptr, new_ptr); // 调用 call_rcu 让内核在确保所有对老指针 ptr 的引用都解锁后,回调到 demo_del_oldptr 释放老指针 call_rcu(ptr->rcu, demo_del_oldptr);} 在上面的例子,写者调用 rcu_assign_pointer 更新老指针后,使用 call_rcu 接口,向系统注册了一会回调函数,系统在确定没有对老指针引用之后,调用这个函数。另一个类似的函数是上一节遇到的 synchronize_rcu 调用,这个函数可能会阻塞,因为他要等待所有对老指针的引用全部结束才返回,函数返回的时候意味着系统所有对老指针的引用都消失,此时在释放老指针才是安全的。如果在中断上下文执行写入者的操作,那么就应该使用 call_rcu ,不能使用 synchronize_rcu。 对于这 call_rcu 和 synchronize_rcu 的分析如下: 在释放老指针方面,Linux内核提供两种方法供使用者使用,一个是调用call_rcu,另一个是调用synchronize_rcu。前者是一种异步 方式,call_rcu会将释放老指针的回调函数放入一个结点中,然后将该结点加入到当前正在运行call_rcu的处理器的本地链表中,在时钟中断的 softirq部分(RCU_SOFTIRQ), rcu软中断处理函数rcu_process_callbacks会检查当前处理器是否经历了一个休眠期(quiescent,此处涉及内核进程调度等方面的内容),rcu的内核代码实现在确定系统中所有的处理器都经历过了一个休眠期之后(意味着所有处理器上都发生了一次进程切换,因此老指针此时可以被安全释放掉了),将调用call_rcu提供的回调函数。 synchronize_rcu的实现则利用了等待队列,在它的实现过程中也会向call_rcu那样向当前处理器的本地链表中加入一个结点,与 call_rcu不同之处在于该结点中的回调函数是wakeme_after_rcu,然后synchronize_rcu将在一个等待队列中睡眠,直到系统中所有处理器都发生了一次进程切换,因而wakeme_after_rcu被rcu_process_callbacks所调用以唤醒睡眠的 synchronize_rcu,被唤醒之后,synchronize_rcu知道它现在可以释放老指针了。 所以我们看到,call_rcu返回后其注册的回调函数可能还没被调用,因而也就意味着老指针还未被释放,而synchronize_rcu返回后老指针肯定被释放了。所以,是调用call_rcu还是synchronize_rcu,要视特定需求与当前上下文而定,比如中断处理的上下文肯定不能使用 synchronize_rcu函数了。 基本RCU操作 APIs 对于reader,RCU的操作包括: (1)rcu_read_lock:用来标识RCU read side临界区的开始。 (2)rcu_dereference:该接口用来获取RCU protected pointer。reader要访问RCU保护的共享数据,当然要获取RCU protected pointer,然后通过该指针进行dereference的操作。 (3)rcu_read_unlock:用来标识reader离开RCU read side临界区 对于writer,RCU的操作包括: (1)rcu_assign_pointer:该接口被writer用来进行removal的操作,在witer完成新版本数据分配和更新之后,调用这个接口可以让RCU protected pointer指向RCU protected data。 (2)synchronize_rcu:writer端的操作可以是同步的,也就是说,完成更新操作之后,可以调用该接口函数等待所有在旧版本数据上的reader线程离开临界区,一旦从该函数返回,说明旧的共享数据没有任何引用了,可以直接进行reclaimation的操作。 (3)call_rcu:当然,某些情况下(例如在softirq context中),writer无法阻塞,这时候可以调用call_rcu接口函数,该函数仅仅是注册了callback就直接返回了,在适当的时机会调用callback函数,完成reclaimation的操作。这样的场景其实是分开removal和reclaimation的操作在两个不同的线程中:updater和reclaimer。 RCU 的链表操作: 在 Linux kernel 中还专门提供了一个头文件(include/linux/rculist.h),提供了利用 RCU 机制对链表进行增删查改操作的接口。 (1) list_add_rcu :该函数把链表项new插入到RCU保护的链表head的开头。使用内存栅保证了在引用这个新插入的链表项之前,新链表项的链接指针的修改对所有读者是可见的。 static inline void list_add_rcu(struct list_head *new, struct list_head *head) (2) list_add_tail_rcu:该函数类似于list_add_rcu,它将把新的链表项new添加到被RCU保护的链表的末尾。 static inline void list_add_tail_rcu(struct list_head *new, struct list_head *head) (3) list_del_rcu:该函数从RCU保护的链表中移走指定的链表项entry,并且把entry的prev指针设置为LIST_POISON2,但是并没有把entry的next指针设置为LIST_POISON1,因为该指针可能仍然在被读者用于便利该链表。 static inline void list_del_rcu(struct list_head *entry) (4) list_replace_rcu:该函数是RCU新添加的函数,并不存在非RCU版本。它使用新的链表项new取代旧的链表项old,内存栅保证在引用新的链表项之前,它的链接指针的修正对所有读者可见 static inline void list_replace_rcu(struct list_head *old, struct list_head *new) (5) list_for_each_entry_rcu:该宏用于遍历由RCU保护的链表head #define list_for_each_entry_rcu(pos, head, member) \ for (pos = list_entry_rcu((head)->next, typeof(*pos), member); \ &pos->member != (head); \ pos = list_entry_rcu(pos->member.next, typeof(*pos), member))

    11-21 65浏览
  • 硬件工程师也需要牢记的10大软件技巧

    嵌入式系统设计不仅需要了解硬件,还需了解软件是如何影响硬件并与硬件进行交互的。设计硬件所需的范式可能与设计软

    09-13 230浏览
  • 为什么MySQL数据库要用b+树?

    一、数据结构 数据结构大致可以分为两种 —— 线性结构 和 非线性结构。 1. 线性结构 线性结构是数据结构中的一种基本结构,它的特点是数据元素之间存在一对一的前后关系。线性结构中的数据元素排列成一个线性序列,可以用顺序存储或链式存储来实现。 常见的线性结构有以下几种: 数组:连续存储的一组相同类型的元素,可以通过索引直接访问。 链表:由节点组成的集合,每个节点包含数据和指向下一个节点的指针。 栈:具有先进后出(LIFO)特性的线性表,只能在栈顶进行插入和删除操作。 队列:具有先进先出(FIFO)特性的线性表,只能在队尾插入,在队头删除。 哈希表:使用哈希函数将键映射到存储位置,并通过解决冲突处理碰撞问题。 2. 非线性结构 非线性结构是数据结构中的另一种类型,与线性结构不同,它的数据元素之间并不是简单的前后关系。 常见的非线性结构有以下几种: 树:由节点和边组成的层次结构,每个节点可以有多个子节点。 图:由节点和边组成的集合,节点之间可以存在多种关系,如有向图和无向图。 堆:一种特殊的树形结构,通常用于实现优先队列,具有父节点小于(或大于)子节点的特性。 散列表:基于哈希函数将键映射到存储位置,并通过解决冲突处理碰撞问题。 图表:由多个表格连接而成,用于表示复杂的关系和数据依赖。 还有例如跳表之类的其他的数据结构,也都是从基础数据结构演化出来的,用来解决指定的场景问题。 二、索引的数据结构 我们先把记忆中的 Mysql的索引是使用B+树做的,因为B+树有 xxx 的优点 抹去,没有人在开发的时候就能直接想到完美的解决方案,所以我们也来推导一下索引的数据结构。 1. 索引的作用 索引是用来做什么的? 索引在数据库和数据结构中起到了重要的作用,它能够提高数据的查询效率和访问速度。以下是索引的几个主要作用: 加快数据检索:索引可以按照特定的规则对数据进行排序和组织,从而加快数据的查找速度。通过使用合适的索引,可以避免全表扫描,减少查询所需的时间复杂度。 提高查询性能:当数据库中有大量记录时,使用索引可以显著提升查询性能。索引将数据按照特定列进行排序和分组,使得数据库系统只需搜索一部分数据而不是全部数据。 优化排序和分组操作:索引可以帮助数据库系统快速地进行排序和分组操作。如果在执行SQL语句时需要对结果进行排序或者分组,使用合适的索引可以节省大量计算时间。 约束唯一性:通过在某些字段上创建唯一索引,可以保证该字段值的唯一性,防止重复插入相同值的情况发生。 改善连接操作:当多个表之间需要进行连接操作时,在关联列上创建合适的索引能够极大地提高连接操作的效率。 尽管索引能够提高查询效率,但也会增加存储空间和更新数据的成本。 索引存储在哪里? 索引存储在数据库管理系统的内存和磁盘中。具体来说,有以下几种常见的索引存储方式: 内存中的索引:为了提高查询效率,数据库管理系统通常会将常用的索引数据加载到内存中进行操作。这样可以避免频繁的磁盘访问,加快查询速度。 磁盘上的索引:当内存无法容纳全部索引数据时,数据库管理系统会将部分或全部索引数据存储在磁盘上。通常使用B+树等数据结构来组织和管理索引数据,以支持高效地查找、插入和删除操作。 辅助文件:一些数据库系统会将较大或者不常用的索引存储在单独的文件中,而不是放在主要的数据文件中。这样可以降低主要数据文件的大小,减少IO开销,并且更灵活地管理索引。 我们都说数据持久化数据持久化,其实就是把内存里的数据转移到硬盘上,这样即便是设备断电了,数据也不会受到影响。但是有利必有弊,数据存储在硬盘上带来的后果就是读取的速度变慢。又是变慢,能变多慢呢?内存是纳秒级的处理速度,硬盘是毫秒级的处理速度,二者相差百万倍,这就是速度的差异。所以我们实际使用索引的时候,会把索引从硬盘中读到内存里,然后通过内存里的索引,从硬盘中找到数据。 但是这样优化了又如何呢,只要需要读硬盘,那就会消耗时间,硬盘IO越多,时间消耗越多。 除此之外,我们使用索引不只是为了能够迅速找到某一个数据,而是能够迅速找到某一个范围区间的数据,能够动态的执行有关数据的操作。 那么在上述的描述下,索引能够使用的数据结构就有这么几个 —— 哈希表、跳表、树。 2. 哈希表 哈希表:也叫做散列表。是根据关键字和值(Key-Value)直接进行访问的数据结构。也就是说,它通过关键字 key 和一个映射函数 Hash(key) 计算出对应的值 value,然后把键值对映射到表中一个位置来访问记录,以加快查找的速度。这个映射函数叫做哈希函数(散列函数),用于存放记录的数组叫做 哈希表(散列表)。哈希表的关键思想是使用哈希函数,将键 key 和值 value 映射到对应表的某个区块中。可以将算法思想分为两个部分: 向哈希表中插入一个关键字:哈希函数决定该关键字的对应值应该存放到表中的哪个区块,并将对应值存放到该区块中 在哈希表中搜索一个关键字:使用相同的哈希函数从哈希表中查找对应的区块,并在特定的区块搜索该关键字对应的值 哈希表的原理示例图如下所示: 哈希表的精确查询时间复杂度是 O(1) ,为什么呢?因为计算目标 key 的 hash 值,然后直接对应到数组的下标,这个过程大大的减少了查询所需要的时间。在产生了 hash 碰撞的时候,也会使用链表和红黑树的方式加快碰撞情况下,查询目标值的速度。 这样的话,我们索引的数据结构完全可以采用哈希表的形式来做,效率非常高。但是为什么不这么做呢? 如果使用哈希表来当作索引的数据结构,在进行范围查询的时候需要全部扫描,这是一笔不菲的代价。 3. 跳表 跳表(Skip List)是一种用于实现有序数据结构的数据结构,它在链表的基础上通过添加多级索引来加速搜索操作。 跳表由William Pugh在1989年提出,其设计灵感来自于平衡二叉树。相比于传统的链表,在查找元素时,跳表可以通过使用多级索引进行快速定位,并且具备较高的插入和删除效率。 跳表中的每个节点包含一个值和若干个指向下一层节点的指针。最底层是原始链表,每个节点按照值从小到大排列;而上方的各级索引则以不同步长稀疏地链接部分节点。这样,在搜索时可以先沿着最顶层索引开始查找,逐渐向下层细化范围,直到找到目标节点或者确定目标不存在。 跳表的时间复杂度为 O(log n),其中 n 是元素数量。它相对简单、易于实现,并且支持高效的插入、删除和搜索操作。因此,在某些场景下,跳表可以作为替代平衡二叉树等数据结构的选择。 如图所示,跳表就是在链表的基础上加了索引层,这样就能够实现区间查询的效果。比如我们要查找key = 5,那就先遍历索引层,遍历到3,然后发现下一个索引是6,那么直接从索引层的3往下进入链表,在往后走2步就到key = 5了。 如果数据量非常非常大呢?(图方便这里用 excel 绘制,美观上会差一些) 这样是不是就能发现跳表的好处了,用多个索引划分链表,从高级索引定位到更低级的索引,直到定位到链表中。效率看起来也很高。 但是这还是存在一个问题,我读取完三级索引到内存,然后我还要硬盘IO去读取二级索引,然后还要读取一级索引。还是在硬盘的IO上费了太多的操作。跳表的数据越多,索引层越高,读取索引带来的硬盘IO次数越多,性能降低,这又违背了一开始使用索引的理念。 4. 树 树结构的特性决定了遍历数据本身就支持按区间查询。再加上树是非线性结构的优势相比于线性结构的数组,不必像数组的数据是连续存放的。那么当树结构在插入新数据时就不用像数组插入数据前时,需要将数据所在往后的所有数据节点都得往后挪动的开销。所以树结构更适合插入更新等动态操作的数据结构。 需要C/C++ Linux服务器架构师学习资料加qun579733396获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享 三、树 实现索引使用的数据结构看来是要使用树结构了,常用的树都有哪些呢?二叉树、二叉查找树、平衡二叉查找树、红黑树、B树。 二叉树 二叉树是n(n>=0)个结点的有限集合。当n=0时为空树,当n不为0时,二叉树有以下特点:1.每个结点的度不超过2(可以理解为二孩政策下的结点最多只能有两个孩子); 每个结点的左子树和右子树顺序不能颠倒,所以二叉树是有序树。 特殊二叉树 满二叉树:每一层结点数都达到最大,那么它就是满二叉树。如第1层最多有2 ^0个结点,第2层最多有 2 ^1个结点,则第k层最多有2 ^(k-1)个结点,假设这棵满二叉树有k层,那么它总共有2 ^0+2 ^1+……+2 ^(k-1) = 2 ^k-1个结点。 完全二叉树:深度为k,有n个结点的二叉树当且仅当其每一个结点都与深度为k的满二叉树中编号从1至n的结点一一对应时,称为完全二叉树。(简介版:完全二叉树的前k-1层是满二叉树,最后一层从左到右依次连续) 二叉查找树 二叉查找树可以理解为融合了二分查找的二叉树。二分查找大家都熟悉吧,时间复杂度 O(logN) ,比直接遍历的线性查找快的多,但是需要数组是有序的。 所以说,二叉查找树不同于普通的二叉树,二叉查找树是将小于根节点的元素放在左子树,将大于根节点的元素放在右子树。其实就是从某种含义了实现了二分查找的先决条件 —— 数值有序。 但是二叉树是存在弊端的,如果我们每次都插入一个更小的数或者更大的数,那么二叉树就会在一个方向上无限延长,退化成了链表。那链表的时间复杂度是 O(N),而且又加大了硬盘的IO操作。所以这种结构还是不太行。 平衡二叉查找树 面说一直放更小或者更大的数,让他不断延长,变成一个极端的“很高很瘦”的数,那么用平衡二叉查找树就能解决这个问题。 平衡二叉查找树的关键是 平衡 ,指的是每个节点的左右子树高度差不能超过 1。这样左右子树都能平衡,时间复杂度为 O(logN) 。 无论是二叉树还是二叉查找树还是平衡二叉查找树还是红黑树,他最终都存在一个问题 —— 每个节点只能有 2 个子树。这意味着只要数据量足够大,它总会变成一个深度非常大的树。深度越大,硬盘IO次数越多,性能效率越低,这又双叒叕与索引的初衷背道而驰。 红黑树 与AVL树相比,红黑树并不追求严格的平衡,而是大致的平衡:只是确保从根到叶子的最长的可能路径不多于最短的可能路径的两倍长。从实现来看,红黑树最大的特点是每个节点都属于两种颜色(红色或黑色)之一,且节点颜色的划分需要满足特定的规则。 与AVL树相比,红黑树的查询效率会有所下降,这是因为树的平衡性变差,高度更高。但红黑树的删除效率大大提高了,因为红黑树同时引入了颜色,当插入或删除数据时,只需要进行O(1)次数的旋转以及变色就能保证基本的平衡,不需要像AVL树进行O(lgn)次数的旋转。总的来说,红黑树的统计性能高于AVL。 因此,在实际应用中,AVL树的使用相对较少,而红黑树的使用非常广泛。 对于数据在内存中的情况(如上述的TreeMap和HashMap),红黑树的表现是非常优异的。但是对于数据在磁盘等辅助存储设备中的情况(如MySQL等数据库),红黑树并不擅长,因为红黑树长得还是太高了。当数据在磁盘中时,磁盘IO会成为最大的性能瓶颈,设计的目标应该是尽量减少IO次数;而树的高度越高,增删改查所需要的IO次数也越多,会严重影响性能。 B树 新的数据结构的产生肯定是为了解决之前繁琐的问题。在树的深度不断变大的情况下,B树就应运而生了。 B树的出现解决了树高度的问题,从名字上也能看出来,它不叫B二叉树而是直接叫B树,因为它摆脱了二叉这个概念。它不再限制一个父节点中只能有两个子节点,而是允许拥有 M 个子节点(M > 2)。不仅如此,B树的一个节点可以存储多个元素,相比较于前面的那些二叉树数据结构又将整体的树高度降低了。那么B树实际上就是多叉树。 图中每一个节点叫做 页,是Mysql数据读取的基本单位,也就是上面的磁盘块。其中的 P 是指向子节点的指针。 当数据量足够大的时候,使用平衡二叉查找树则会不断纵向扩展子节点,让整个树变得更高。而B树可以横向扩展子节点,变得更胖,但是树的高度不高,硬盘IO的次数更少。 综上所述,B树已经非常适合用来给Mysql做索引的数据结构了。那么为什么还要去使用B+树呢?实际上B树存在一个缺点,虽然B树实现了区间查找,但是B树的去检查找是基于中序遍历来做的,中序遍历的算法题大家应该都做过,需要来回切换父子节点,切换父子节点在这里就意味着硬盘不断的IO操作,这显然也是不好的。 B+树 B+树其实就是B树的升级版。MySQL 中innoDB引擎中的索引底层数据结构采用的正是B+树。 B+树相对于树做了这些方面的改动:B+树中的非叶子节点只作为索引,不存储数据。转而由叶子节点存放整棵树的所有数据。叶子节点之间再构成一个从小到大的有序的链表并互相指向相邻的叶子节点,也就是在叶子节点之间形成了有序的双向链表。 画图画的不是很清楚,忘记展示双向链表的特点,最下面的箭头指的是相邻的两个叶子是双向的,所有的叶子节点构成双向链表。 再来看B+树的插入和删除,B+树做了大量冗余节点,从上面可以发现父节点的所有元素都会在子节点中出现,这样当删除一个节点时,可以直接从叶子节点中删除,这样效率更快。 相邻的两个叶子是双向的,所有的叶子节点构成双向链表。 再来看B+树的插入和删除,B+树做了大量冗余节点,从上面可以发现父节点的所有元素都会在子节点中出现,这样当删除一个节点时,可以直接从叶子节点中删除,这样效率更快。 B树相比于B+树,B树没有冗余节点,删除节点时会发生复杂的树变形,而B+树有冗余节点,不会涉及到复杂的树变形。而且B+树的插入也是如此,最多只涉及树的一条分支路径。B+树也不用更多复杂算法,可以类似红黑树的旋转去自动平衡。 总结: mysql选择B+树的原因在于其独特的优势: 良好的平衡性:B+树是一种自平衡的树结构,不论是在插入、删除还是查询操作中,它都能保持相对较好的平衡状态。这使得B+树能够快速定位到目标数据,提高查询效率。 顺序访问性:B+树的所有叶子节点是按照索引键的顺序排序的。这使得范围查询和顺序访问非常高效,因为相邻的数据通常在物理上也是相邻存储的,可以利用磁盘预读提高IO效率。 存储效率:B+树在内存中的节点大小通常比其他树结构更大,这样可以减少磁盘I/O操作的次数。同时,B+树的非叶子节点只存储索引列的值,而不包含实际数据,这进一步减小了索引的尺寸。 支持高并发:B+树的特性使得它能够支持高并发的读写操作。通过使用合适的锁或事务隔离级别,多个并发查询和更新操作可以同时进行而不会出现严重的阻塞或冲突。 易于扩展和维护:B+树的结构相对简单,可以较容易地进行扩展和维护。当插入或删除数据时,B+树只需要调整路径上的少数节点,而不需要整颗树的重构。这样能够有效降低维护成本,并保证索引的高性能。

    09-13 151浏览
  • 基于MSP430的空间定向测试仪设计要点

    0 引言 空间定向测试仪是一种应用非常广泛的电子测量仪器,尤其是伴随着微电子技术的发展,空间定向测试仪在车辆、舰船、飞行器等导航领域中的应用日趋成熟。本文所研究的空间定向测试技术主要是以MSP430单片机为基...

    09-04 96浏览
  • 时间继电器在自动化控制中的作用是什么?

    继电器是一种电控制器件,继电器通常用于自动化的控制回路当中。为增进大家对继电器的认识,本文将对时间继电器、时间继电器的使用注意事项予以介绍。如果你对继电器或是对本文内容具有兴趣,不妨和小编一起继续往...

    08-26 129浏览
  • C语言内存泄漏问题原理

    ❝**摘要:**通过介绍内存泄漏问题原理及检视方法,希望后续能够从编码检视环节就杜绝内存泄漏导致的网上问题发

    08-21 140浏览
  • 了解C语言跨平台开发库TBOX吗?介绍、特性一次说清楚

    1 TBOX简介 TBOX针对各个平台,封装了统一的接口,简化了通用开发过程中常用的操作,使你在开发过程中,更加关注实际应用的开发,而不是把时间浪费在琐碎的接口兼容性上面,并且充分利用了各个平台突出的一些特性进行优化。这个项目的目的,是为了高效使C开发更加简单。目前支持的平台有:Windows, Macosx, Linux, Android, iOS, *BSD等等。通过xmake支持多种编译模式: 发布:正式版编译,禁止调试信息、断言,各种检测机制,启用编译器优化 Debug:调试模式,默认实现详细调试信息、断言、内存越界检测、内存泄漏、锁竞争分析等检测机制 Small:最小化编译,取消默认所有扩展模块,实现编译器最小化优化 Micro:针对嵌入式平台,只需编译tbox微内核,仅提供最基础的跨平台接口,生成库仅64K左右(内置轻量libc接口实现) 2 特性 流庫 针对http、file、socket、data等流数据,实现统一接口进行读写,并且支持:阻塞、非阻塞、异步清晰读写模式。支持中间增加多层filter流进行流过滤,实现边读取,内部边进行解压、编码转换、加密等操作,极大地减少了内存使用。主要提供以下模块: Stream:通用非阻塞流,用于一般的单独io处理,同时支持协程以实现异步传输。 传输:流传输器,维护两路流的传输。 static_stream:针对静态数据缓冲区优化的静态流,用于轻量快速的数据解析。 协程库 快速高效的协程切换支持 提供跨平台支持,核心切换算法参考boost,并且由此进行重写和优化,目前支持架构:x86、x86_64、arm、arm64、mips32 提供渠道协程间数据通信支持,基于生产、消费者模型 提供信号量、协程锁支持 socket、stream都模块默认支持协程,并且可以在线程和协程间进行无缝切换 提供http、file等基于协程的简单服务器实例,只需几个行代码,就可以从socket开始写个高性能io服务器,代码逻辑比异步回调模式更清晰 同时提供stackfull、stackless两种协程模式支持,stackless协程更轻量(每个协程只占用几十个字节),切换更快捷(会占用部分射击性) 支持epoll、kqueue、poll、select和IOCP 在协程和poller中支持同时等待和调度socket,pipe io和process 資料 统一并简化数据库操作接口,配备各种数据源,通过统一的url来自动连接打开支持的数据库,数据枚举的采用迭代器模型。 目前支持sqlite3以及mysql两种关系型数据库,也可以自定义扩展使用其他关系型数据库。 xml库 针对xml提供DOM和SAX两种解析模式,SAX方式采用外部迭代模式,灵活和性能更高,并且可以选择路径指定,进行解析。 解析过程基于流,所以是高度流化的,可以实现边下载、边解压、完全边转码、边解析一条龙服务,利用较低的内存也可以解析大规模数据。 提供 xml writer 支持对 xml 生成 内存库 参考linux内核内存池管理机制的实现,并对其进行了各种改造和优化,所实现的TBOX突出了整套内存池管理架构。 调试模式下,可以轻松检测并定位内存丢失、内存越界溢出、内存重叠覆盖等常见内存问题,对磁盘整体内存的使用情况进行了统计和简要分析。 针对大块数据、小块数据、字符串数据进行了充分的利用,避免了大量外部碎片和内部碎片的产生。分配操作进行了各种优化,96%的情况下,效率都在O(1 )。 容器库 提供哈希、链表、队列、队列、堆栈、最小最大堆等常用容器。 支持各种常用的成员类型,在原有的容器期初上,其成员类型还可以自定义完全扩展。 所有容器都支持迭代器操作。 大多数容器都可以支持基于流的序列化和反序列化操作。 算法库 提供各种排序算法:冒泡排序、堆排序、快速排序、插入排序。 提供各种求解算法:线性遍历、二分法搜索。 提供各种遍历、删除、统计算法。 以迭代器为接口,但是实现算法和容器的分离,类似stl,c实现的,更加轻量。 网络库 实现http客户端功能 实现cookies 实现dns解析与缓存 实现ssl(支持openssl、polarssl、mbedtls) 支持ipv4、ipv6 支持通过协程实现异步模式 数学运算库 提供各种精度的定点攻击支持 提供随机数生成器 libc库 libc的一个轻量级实现,跨平台,并且针对不同的架构完全进行了优化。 支持大部分字符串、宽字符串操作。 扩展字符串、宽字符串的各种大小写不敏感操作接口 扩展memset_u16、memset_u32等接口,并对其进行高度优化,尤其适合图形渲染程序 libm库 libm部分接口的一个轻量级实现,以及对常用系统接口的封装。(目前只实现了部分,之后有时间会完全实现掉) 扩展部分常用接口,增加对sqrt、log2等常用函数的整数版本计算,进行高度优化,不涉及浮点侵犯,适合嵌入式环境使用。 对象库 轻量级类apple的CoreFoundation库,支持对象、字典、数组、字符串、数字、日期、数据等常用对象,并且可以方便扩展自定义对象的序列化。 对xml、json、binary以及apple的plist(xplist/bplist)格式序列化和反序列化支持。并且实现了自有的二进制序列化格式,针对明文进行了简单的加密,在不影响性能的前提下,序列化后的大小比bplist节省30%。 平台库 提供文件、目录、socket、线程、时间等常用系统接口 提供atomic、atomic64接口 提供高精度、低精度 提供高性能的线程池操作 提供事件、互斥量、信号量、自旋锁等事件、互斥、信号量、自旋锁操作 提供获取函数堆栈信息的接口,方便调试和错误定位 提供跨平台动态库加载接口(如果系统支持的话) 提供io轮询器,针对epoll、poll、select、kqueue进行跨平台封装 提供跨平台上下文切换接口,主要用于协程实现,切换效率非常高 压缩库 支持zlib/zlibraw/gzip的压缩与解压(需要第三方zlib库支持)。 字符编码库 支持utf8、utf16、gbk、gb2312、uc2、uc4之间的相互转码,并且支持大小端格式。 实用工具库 实现base64/32解码 实现crc32、adler32、md5、sha1等常用哈希算法 实现日志输出、断言等辅助调试工具 实现url编 实现位操作相关接口,支持各种数据格式的解析,可以对8bits、16bits、32bits、64bits、float、double以及任意bits的字段进行解析操作,并且同时支持大端、小端和本地端模式,并针对部分操作进行了优化,像static_stream、stream都有相关接口对其进行了封装,方便在流上进行快速数据解析。 实现了swap16、swap32、swap64等位交换操作,并针对各个平台进行了优化。 实现一些高级的位处理接口,例如:位0的快速统计、前导0和前导1的快速位计数、后导01的位计数 实现单例模块,可以对静态对象、实例对象进行快速的单例封装,实现全局线程安全 实现选项模块,快速对命令行参数进行解析,提供方便的命令行选项创建和解析操作,对于编写终端程序还是很有帮助的 正當理責库 支持匹配和替换操作 支持全局、多行、大小写不敏感等模式 使用pcre, pcre2和posix正则库 3一些使用tbox项目: 格盒 vm86 制作 伊特拉斯 更多项目 4 编译 请先安装: xmake #默认直接编译当前主机平台$ cd./tbox$ xmake #编译mingw平台$ cd./tbox$ xmake f -p mingw --sdk=/home/mingwsdk$ xmake #编译iphoneos平台$ cd./tbox$ xmake f -p iphoneos$ xmake #编译android平台$ cd./tbox$ xmake f -p android --ndk=xxxxx$ xmake #交叉编译$ cd./tbox$ xmake f -p linux --sdk=/home/sdk #--bin=/home/sdk/bin$ xmake 5 个例子 #包括 “tbox/tbox.h” int main (int argc,char ** argv){ // 初始化 tbox 如果(!tb_init(tb_null,tb_null))返回 0; // 痕迹tb_trace_i( "你好 tbox" ); // 初始化向量 tb_vector_ref_t 向量= tb_vector_init( 0 , tb_element_str(tb_true)); 如果(向量){ // 插入项目tb_vector_insert_tail(向量, “你好” );tb_vector_insert_tail(向量,"tbox" ); // 转储所有项目tb_for_all ( tb_char_t const *, cstr,向量){ // 痕迹tb_trace_i( “%s”,cstr);} // 退出向量tb_vector_exit(向量);} // 初始化流 tb_stream_ref_t流 = tb_stream_init_from_url( "http://www.xxx.com/file.txt" ); 如果(流){ // 打开流 如果(tb_stream_open(流)){ // 读取行 tb_long_t大小 = 0 ; tb_char_t行[TB_STREAM_BLOCK_MAXN]; 当((size = tb_stream_bread_line(stream,line,sizeof(line)))> = 0){ // 痕迹tb_trace_i( "行:%s",行);}} // 退出流tb_stream_exit(流);} // 等待tb_getchar(); // 退出 tbox退出(); 返回 0;} 点击阅读原文,了解更多。https://gitee.com/tboox/tbox

    08-21 198浏览
正在努力加载更多...
广告