Implementing a Lock-Free Stack with the Smart Pointer std::shared_ptr
Linux开发架构之路 2024-11-21

Lock-free stack

A lock-free data structure is one that threads can access concurrently without error. For example, a lock-free stack allows one thread to push data while another thread pops data. Moreover, if the scheduler suspends one of the accessing threads midway, the remaining threads must still be able to complete their own work without waiting for the suspended thread.

A major difficulty with a lock-free stack is how to allocate and, above all, release node memory correctly without taking a lock, while avoiding both logic errors and crashes.

1. Implementation using the smart pointer std::shared_ptr

The most straightforward idea is to manage the nodes with smart pointers. In fact, if std::atomic_is_lock_free(&some_shared_ptr) returns true on your platform, all the memory-reclamation problems disappear at once (in my tests on both x86 and ARM it returned false). Example code (file named lock_free_stack.h):

#pragma once

#include <atomic>
#include <memory>

template <typename T>
class LockFreeStack {
 public:
  LockFreeStack() : head_(nullptr) {}
  ~LockFreeStack() {
    while (Pop()) {
      // Do nothing and wait until all elements are popped.
    }
  }
  LockFreeStack(const LockFreeStack& other) = delete;
  LockFreeStack& operator=(const LockFreeStack& other) = delete;

  bool IsEmpty() const { return std::atomic_load(&head_) == nullptr; }

  void Push(const T& data) {
    const auto new_node = std::make_shared<Node>(data);
    new_node->next = std::atomic_load(&head_);
    // If new_node->next is the same as head_, update head_ to new_node and
    // return true.
    // If new_node->next and head_ are not equal, update new_node->next to
    // head_ and return false.
    while (!std::atomic_compare_exchange_weak(&head_, &new_node->next,
                                              new_node)) {
      // Do nothing and wait until head_ is updated to new_node.
    }
  }

  std::shared_ptr<T> Pop() {
    std::shared_ptr<Node> old_head = std::atomic_load(&head_);
    // If old_head is not a null pointer and it is the same as head_, update
    // head_ to old_head->next and return true.
    // If old_head is not a null pointer and it is not equal to head_, update
    // old_head to head_ and return false.
    while (old_head != nullptr &&
           !std::atomic_compare_exchange_weak(
               &head_, &old_head, std::atomic_load(&old_head->next))) {
      // Do nothing and wait until head_ is updated to old_head->next.
    }

    if (old_head != nullptr) {
      std::atomic_store(&old_head->next, std::shared_ptr<Node>());
      return old_head->data;
    }

    return std::shared_ptr<T>();
  }

 private:
  struct Node {
    // std::make_shared does not throw an exception.
    Node(const T& input_data)
        : data(std::make_shared<T>(input_data)), next(nullptr) {}

    std::shared_ptr<T> data;
    std::shared_ptr<Node> next;
  };

  std::shared_ptr<Node> head_;
};

The code above relies on std::shared_ptr<> to handle dynamic allocation and reclamation of node memory, since it has a built-in reference-counting mechanism. Unfortunately, although std::shared_ptr<> can be used with atomic operations, those operations are not lock-free on most platforms: the C++ standard library implements them with an internal lock, which imposes a heavy performance penalty and cannot satisfy highly concurrent access.

If the compiler supports the C++20 standard, std::atomic<std::shared_ptr<T>> lets the user manipulate a std::shared_ptr atomically, i.e., it guarantees atomicity while still handling the reference count correctly. As with other atomic types, however, whether the implementation is lock-free is unspecified. Example code for a stack based on std::atomic<std::shared_ptr<T>> (lock-free on the surface; whether it actually is depends on whether is_lock_free() returns true), file named lock_free_stack.h:

#pragma once

#include <atomic>
#include <memory>

template <typename T>
class LockFreeStack {
 public:
  LockFreeStack() : head_(nullptr) {}
  ~LockFreeStack() {
    while (Pop()) {
      // Do nothing and wait until all elements are popped.
    }
  }
  LockFreeStack(const LockFreeStack& other) = delete;
  LockFreeStack& operator=(const LockFreeStack& other) = delete;

  bool IsEmpty() const { return head_.load() == nullptr; }

  void Push(const T& data) {
    const auto new_node = std::make_shared<Node>(data);
    std::shared_ptr<Node> old_head = head_.load();
    new_node->next = old_head;
    // If old_head is the same as head_, update head_ to new_node and return
    // true. If old_head and head_ are not equal, update old_head to head_
    // and return false.
    while (!head_.compare_exchange_weak(old_head, new_node)) {
      new_node->next = old_head;
    }
  }

  std::shared_ptr<T> Pop() {
    std::shared_ptr<Node> old_head = head_.load();
    // If old_head is not a null pointer and it is the same as head_, update
    // head_ to old_head->next and return true.
    // If old_head is not a null pointer and it is not equal to head_, update
    // old_head to head_ and return false.
    while (old_head != nullptr &&
           !head_.compare_exchange_weak(old_head, old_head->next.load())) {
      // Do nothing and wait until head_ is updated to old_head->next.
    }

    if (old_head != nullptr) {
      old_head->next = std::shared_ptr<Node>();
      return old_head->data;
    }

    return std::shared_ptr<T>();
  }

 private:
  struct Node {
    // std::make_shared does not throw an exception.
    Node(const T& input_data)
        : data(std::make_shared<T>(input_data)), next(nullptr) {}

    std::shared_ptr<T> data;
    std::atomic<std::shared_ptr<Node>> next;
  };

  // Compilation error with pre-C++20 standard libraries:
  //   /usr/include/c++/9/atomic:191:21: error: static assertion failed:
  //   std::atomic requires a trivially copyable type
  //   static_assert(__is_trivially_copyable(_Tp), ...
  std::atomic<std::shared_ptr<Node>> head_;
};

My compiler currently only supports C++17, so the code above produces the following compilation error:

In file included from /home/zhiguohe/code/excercise/lock_freee/lock_free_stack_with_shared_ptr_cpp/lock_free_stack_with_shared_ptr.h:3,
                 from /home/zhiguohe/code/excercise/lock_freee/lock_free_stack_with_shared_ptr_cpp/lock_free_stack_with_shared_ptr.cpp:1:
/usr/include/c++/9/atomic: In instantiation of ‘struct std::atomic<std::shared_ptr<LockFreeStack<int>::Node> >’:
/home/zhiguohe/code/excercise/lock_freee/lock_free_stack_with_shared_ptr_cpp/lock_free_stack_with_shared_ptr.h:61:38:   required from ‘class LockFreeStack<int>’
/home/zhiguohe/code/excercise/lock_freee/lock_free_stack_with_shared_ptr_cpp/lock_free_stack_with_shared_ptr.cpp:16:22:   required from here
/usr/include/c++/9/atomic:191:21: error: static assertion failed: std::atomic requires a trivially copyable type
  191 |       static_assert(__is_trivially_copyable(_Tp),
      |                     ^~~~~~~~~~~~~~~~~~~~~~~~~~~~
make[2]: *** [CMakeFiles/lock_free_stack_with_shared_ptr_cpp.dir/build.make:63: CMakeFiles/lock_free_stack_with_shared_ptr_cpp.dir/lock_free_stack_with_shared_ptr.cpp.o] Error 1
make[1]: *** [CMakeFiles/Makefile2:644: CMakeFiles/lock_free_stack_with_shared_ptr_cpp.dir/all] Error 2
make: *** [Makefile:117: all] Error 2

2. Manual memory management: a simple counter to detect threads inside Pop

2.1 Without relaxed memory ordering

If the compiler does not support C++20, we must manage node allocation and reclamation by hand. One simple approach is to track whether any thread is currently inside Pop. If not, all popped nodes can be deleted immediately; otherwise the popped nodes are appended to a to-be-deleted list to_be_deleted_, which is freed once no thread is inside Pop anymore. The implementation of this idea follows (file named lock_free_stack.h; the example comes from C++ Concurrency in Action, 2nd ed., 2019, with its bugs fixed):

#pragma once

#include <atomic>
#include <memory>

template <typename T>
class LockFreeStack {
 public:
  LockFreeStack()
      : head_(nullptr), to_be_deleted_(nullptr), threads_in_pop_(0) {}
  ~LockFreeStack() {
    while (Pop()) {
      // Do nothing and wait until all elements are popped.
    }
  }
  LockFreeStack(const LockFreeStack& other) = delete;
  LockFreeStack& operator=(const LockFreeStack& other) = delete;

  bool IsEmpty() const { return head_.load() == nullptr; }

  void Push(const T& data) {
    Node* new_node = new Node(data);
    new_node->next = head_.load();
    // If new_node->next is the same as head_, update head_ to new_node and
    // return true.
    // If new_node->next and head_ are not equal, update new_node->next to
    // head_ and return false.
    while (!head_.compare_exchange_weak(new_node->next, new_node)) {
      // Do nothing and wait until head_ is updated to new_node.
    }
  }

  std::shared_ptr<T> Pop() {
    Node* old_head = head_.load();
    // If old_head is not a null pointer and it is the same as head_, update
    // head_ to old_head->next and return true.
    // If old_head is not a null pointer and it is not equal to head_, update
    // old_head to head_ and return false.
    while (old_head != nullptr &&
           !head_.compare_exchange_weak(old_head, old_head->next)) {
      // Do nothing and wait until head_ is updated to old_head->next.
    }

    std::shared_ptr<T> res;
    if (old_head != nullptr) {
      ++threads_in_pop_;
      res.swap(old_head->data);
      // Reclaim deleted nodes.
      TryReclaim(old_head);
    }

    return res;
  }

 private:
  // If the struct definition of Node is placed in the private data member
  // field where 'head_' is defined, the following compilation error occurs:
  //
  //   error: 'Node' has not been declared ...
  //
  // It appears to be a compiler bug. The struct definition of Node is put in
  // front of the private member function `DeleteNodes` to eliminate it.
  struct Node {
    // std::make_shared does not throw an exception.
    Node(const T& input_data)
        : data(std::make_shared<T>(input_data)), next(nullptr) {}

    std::shared_ptr<T> data;
    Node* next;
  };

 private:
  static void DeleteNodes(Node* nodes) {
    while (nodes != nullptr) {
      Node* next = nodes->next;
      delete nodes;
      nodes = next;
    }
  }

  void ChainPendingNodes(Node* first, Node* last) {
    last->next = to_be_deleted_;
    // If last->next is the same as to_be_deleted_, update to_be_deleted_ to
    // first and return true.
    // If last->next and to_be_deleted_ are not equal, update last->next to
    // to_be_deleted_ and return false.
    while (!to_be_deleted_.compare_exchange_weak(last->next, first)) {
      // Do nothing and wait until to_be_deleted_ is updated to first.
    }
  }

  void ChainPendingNodes(Node* nodes) {
    Node* last = nodes;
    while (Node* next = last->next) {
      last = next;
    }

    ChainPendingNodes(nodes, last);
  }

  void ChainPendingNode(Node* n) { ChainPendingNodes(n, n); }

  void TryReclaim(Node* old_head) {
    if (old_head == nullptr) {
      return;
    }

    if (threads_in_pop_ == 1) {
      Node* nodes_to_delete = to_be_deleted_.exchange(nullptr);

      if (!--threads_in_pop_) {
        DeleteNodes(nodes_to_delete);
      } else if (nodes_to_delete) {
        ChainPendingNodes(nodes_to_delete);
      }

      delete old_head;
    } else {
      ChainPendingNode(old_head);
      --threads_in_pop_;
    }
  }

 private:
  std::atomic<Node*> head_;
  std::atomic<Node*> to_be_deleted_;
  std::atomic<unsigned> threads_in_pop_;
};

The code above reclaims node memory by counting. When the stack is under light load, this works fine. However, deleting nodes is time-consuming, and we would like the other threads to make as few modifications to the list as possible. Between the moment threads_in_pop_ is first observed to be 1 and the attempt to delete the nodes, a long time may pass, which gives other threads the opportunity to call Pop(), make threads_in_pop_ nonzero, and thereby block the deletion. Under heavy load, since other threads keep calling Pop() after initialization, the to-be-deleted list to_be_deleted_ can grow without bound, and memory effectively leaks again. An alternative is to establish that no thread is accessing a given node, at which point that node can be reclaimed; the simplest such mechanisms are hazard pointers and reference counting, which the following sections cover.

2.2 With relaxed memory ordering

None of the atomic operations in the implementation above specify a memory order, so they all use the default std::memory_order_seq_cst (sequential consistency). std::memory_order_seq_cst is much simpler to reason about than the other orderings: under it, all operations (atomic and non-atomic) appear to execute in program order, which matches ordinary intuition, but it consumes more system resources. Any lock-free data structure should start out with std::memory_order_seq_cst; only once the basic operations work correctly should relaxing the ordering be considered. Relaxed orderings are hard to get right on every platform, so unless performance genuinely becomes the bottleneck there is no need to relax them; if you are after peak performance, some relaxation is required. In practice, memory ordering has a significant performance impact mainly on ARM and embedded platforms; on x86 the effect is almost nil, since the code x86 compilers emit behaves close to std::memory_order_seq_cst anyway. The basic rules for relaxing are: if an atomic operation needs no synchronization with other operations, use std::memory_order_relaxed; stores generally use std::memory_order_release, and loads generally use std::memory_order_acquire. Relaxed code needs rigorous testing, especially on embedded platforms. If in doubt, and the performance bottleneck is not severe, it is advisable to omit the memory-order parameter entirely, i.e., use the default std::memory_order_seq_cst. A reference implementation with relaxed ordering follows (file named lock_free_stack.h); it is not guaranteed to be correct on all platforms:

#pragma once

#include <atomic>
#include <memory>

template <typename T>
class LockFreeStack {
 public:
  LockFreeStack()
      : head_(nullptr), to_be_deleted_(nullptr), threads_in_pop_(0) {}
  ~LockFreeStack() {
    while (Pop()) {
      // Do nothing and wait until all elements are popped.
    }
  }
  LockFreeStack(const LockFreeStack& other) = delete;
  LockFreeStack& operator=(const LockFreeStack& other) = delete;

  bool IsEmpty() const {
    return head_.load(std::memory_order_relaxed) == nullptr;
  }

  void Push(const T& data) {
    Node* new_node = new Node(data);
    new_node->next = head_.load(std::memory_order_relaxed);
    // If new_node->next is the same as head_, update head_ to new_node and
    // return true.
    // If new_node->next and head_ are not equal, update new_node->next to
    // head_ and return false.
    while (!head_.compare_exchange_weak(new_node->next, new_node,
                                        std::memory_order_release,
                                        std::memory_order_relaxed)) {
      // Do nothing and wait until head_ is updated to new_node.
    }
  }

  std::shared_ptr<T> Pop() {
    Node* old_head = head_.load(std::memory_order_relaxed);
    // If old_head is not a null pointer and it is the same as head_, update
    // head_ to old_head->next and return true.
    // If old_head is not a null pointer and it is not equal to head_, update
    // old_head to head_ and return false.
    while (old_head != nullptr &&
           !head_.compare_exchange_weak(old_head, old_head->next,
                                        std::memory_order_acquire,
                                        std::memory_order_relaxed)) {
      // Do nothing and wait until head_ is updated to old_head->next.
    }

    std::shared_ptr<T> res;
    if (old_head != nullptr) {
      threads_in_pop_.fetch_add(1, std::memory_order_relaxed);
      res.swap(old_head->data);
      // Reclaim deleted nodes.
      TryReclaim(old_head);
    }

    return res;
  }

 private:
  // If the struct definition of Node is placed in the private data member
  // field where 'head_' is defined, the following compilation error occurs:
  //
  //   error: 'Node' has not been declared ...
  //
  // It appears to be a compiler bug. The struct definition of Node is put in
  // front of the private member function `DeleteNodes` to eliminate it.
  struct Node {
    // std::make_shared does not throw an exception.
    Node(const T& input_data)
        : data(std::make_shared<T>(input_data)), next(nullptr) {}

    std::shared_ptr<T> data;
    Node* next;
  };

 private:
  static void DeleteNodes(Node* nodes) {
    while (nodes != nullptr) {
      Node* next = nodes->next;
      delete nodes;
      nodes = next;
    }
  }

  void ChainPendingNodes(Node* first, Node* last) {
    last->next = to_be_deleted_.load(std::memory_order_relaxed);
    // If last->next is the same as to_be_deleted_, update to_be_deleted_ to
    // first and return true.
    // If last->next and to_be_deleted_ are not equal, update last->next to
    // to_be_deleted_ and return false.
    while (!to_be_deleted_.compare_exchange_weak(last->next, first,
                                                 std::memory_order_release,
                                                 std::memory_order_relaxed)) {
      // Do nothing and wait until to_be_deleted_ is updated to first.
    }
  }

  void ChainPendingNodes(Node* nodes) {
    Node* last = nodes;
    while (Node* next = last->next) {
      last = next;
    }

    ChainPendingNodes(nodes, last);
  }

  void ChainPendingNode(Node* n) { ChainPendingNodes(n, n); }

  void TryReclaim(Node* old_head) {
    if (old_head == nullptr) {
      return;
    }

    if (threads_in_pop_ == 1) {
      Node* nodes_to_delete =
          to_be_deleted_.exchange(nullptr, std::memory_order_relaxed);

      if (!--threads_in_pop_) {
        DeleteNodes(nodes_to_delete);
      } else if (nodes_to_delete) {
        ChainPendingNodes(nodes_to_delete);
      }

      delete old_head;
    } else {
      ChainPendingNode(old_head);
      threads_in_pop_.fetch_sub(1, std::memory_order_relaxed);
    }
  }

 private:
  std::atomic<Node*> head_;
  std::atomic<Node*> to_be_deleted_;
  std::atomic<unsigned> threads_in_pop_;
};

3. Manual memory management: hazard pointers to mark objects being accessed

Hazard pointers are so named because deleting a node may put other threads that still reference it in a hazardous state: if another thread holds a pointer to the deleted node and dereferences it, the behavior is undefined. The basic idea is that when a thread is about to access an object that another thread may want to delete, it first sets a hazard pointer to that object, announcing to the other threads that using (deleting) this pointer would be dangerous. When the object is no longer needed, the hazard pointer is cleared. When a thread wants to delete an object, it must check whether any other thread in the system holds a hazard pointer to it. If there is no such hazard pointer, the object can be deleted safely; otherwise the deletion must wait until the hazard pointers are gone, so threads periodically re-check whether the objects awaiting deletion can now be deleted safely. The implementation of this idea follows (file named lock_free_stack.h; the example comes from C++ Concurrency in Action, 2nd ed., 2019, with its bugs fixed):

#pragma once

#include <atomic>
#include <memory>
#include <stdexcept>
#include <thread>

template <typename T>
class LockFreeStack {
 public:
  LockFreeStack() : head_(nullptr), nodes_to_reclaim_(nullptr) {}
  ~LockFreeStack() {
    while (Pop()) {
      // Do nothing and wait until all elements are popped.
    }
  }
  LockFreeStack(const LockFreeStack& other) = delete;
  LockFreeStack& operator=(const LockFreeStack& other) = delete;

  bool IsEmpty() const { return head_.load() == nullptr; }

  void Push(const T& data) {
    Node* new_node = new Node(data);
    new_node->next = head_.load();
    // If new_node->next is the same as head_, update head_ to new_node and
    // return true.
    // If new_node->next and head_ are not equal, update new_node->next to
    // head_ and return false.
    while (!head_.compare_exchange_weak(new_node->next, new_node)) {
      // Do nothing and wait until head_ is updated to new_node.
    }
  }

  std::shared_ptr<T> Pop() {
    std::atomic<void*>& hp = GetHazardPointerForCurrentThread();
    Node* old_head = head_.load();

    do {
      // Loop until the hazard pointer is set to the current head.
      Node* temp = nullptr;
      do {
        temp = old_head;
        hp.store(old_head);
        old_head = head_.load();
      } while (old_head != temp);
    } while (old_head != nullptr &&
             !head_.compare_exchange_strong(old_head, old_head->next));

    // Clear the hazard pointer once the node has been claimed.
    hp.store(nullptr);

    std::shared_ptr<T> res;

    if (old_head != nullptr) {
      res.swap(old_head->data);

      // Check for hazard pointers referencing the node before deleting it.
      if (IsOutstandingHazardPointerForNode(old_head)) {
        ReClaimLater(old_head);
      } else {
        delete old_head;
      }

      DeleteNodesWithNoHazards();
    }

    return res;
  }

 private:
  // If the struct definition of Node is placed in the private data member
  // field where 'head_' is defined, the following compilation error occurs:
  //
  //   error: 'Node' has not been declared ...
  //
  // It appears to be a compiler bug. The struct definition of Node is put in
  // front of the private member function `DeleteNodes` to eliminate it.
  struct Node {
    // std::make_shared does not throw an exception.
    Node(const T& input_data)
        : data(std::make_shared<T>(input_data)), next(nullptr) {}

    std::shared_ptr<T> data;
    Node* next;
  };

  struct HazardPointer {
    std::atomic<std::thread::id> id;
    std::atomic<void*> pointer;
  };

  class HazardPointerOwner {
   public:
    HazardPointerOwner(const HazardPointerOwner& other) = delete;
    HazardPointerOwner operator=(const HazardPointerOwner& other) = delete;
    HazardPointerOwner() : hp_(nullptr) {
      for (unsigned i = 0; i < kMaxHazardPointerNum; ++i) {
        std::thread::id old_id;
        // Claim an unowned slot by installing this thread's id.
        if (hazard_pointers_[i].id.compare_exchange_strong(
                old_id, std::this_thread::get_id())) {
          hp_ = &hazard_pointers_[i];
          break;
        }
      }

      if (hp_ == nullptr) {
        throw std::runtime_error("No hazard pointers available.");
      }
    }

    ~HazardPointerOwner() {
      hp_->pointer.store(nullptr);
      hp_->id.store(std::thread::id());
    }

    std::atomic<void*>& GetPointer() { return hp_->pointer; }

   private:
    HazardPointer* hp_;
  };

  template <typename DT>
  struct DataToReclaim {
    DataToReclaim(DT* p) : data(p), next(nullptr) {}
    ~DataToReclaim() { delete data; }

    DT* data;
    DataToReclaim* next;
  };

 private:
  static std::atomic<void*>& GetHazardPointerForCurrentThread() {
    static thread_local HazardPointerOwner hazard_owner;
    return hazard_owner.GetPointer();
  }

  template <typename DT>
  void ReClaimLater(DT* data) {
    AddToReclaimList(new DataToReclaim<DT>(data));
  }

  bool IsOutstandingHazardPointerForNode(void* p) {
    for (unsigned i = 0; i < kMaxHazardPointerNum; ++i) {
      if (hazard_pointers_[i].pointer.load() == p) {
        return true;
      }
    }

    return false;
  }

  void AddToReclaimList(DataToReclaim<Node>* node) {
    if (node == nullptr) {
      return;
    }

    node->next = nodes_to_reclaim_.load();
    while (!nodes_to_reclaim_.compare_exchange_weak(node->next, node)) {
      // Do nothing.
    }
  }

  void DeleteNodesWithNoHazards() {
    DataToReclaim<Node>* current = nodes_to_reclaim_.exchange(nullptr);
    while (current) {
      DataToReclaim<Node>* next = current->next;
      if (!IsOutstandingHazardPointerForNode(current->data)) {
        delete current;
      } else {
        AddToReclaimList(current);
      }
      current = next;
    }
  }

 private:
  static constexpr unsigned kMaxHazardPointerNum = 200;
  static HazardPointer hazard_pointers_[kMaxHazardPointerNum];

  std::atomic<DataToReclaim<Node>*> nodes_to_reclaim_;
  std::atomic<Node*> head_;
};

// Static member array initialization. The syntax is ugly.
template <typename T>
typename LockFreeStack<T>::HazardPointer
    LockFreeStack<T>::hazard_pointers_[kMaxHazardPointerNum];

Note that our earlier compare-and-exchange loops used compare_exchange_weak, whereas the following code uses compare_exchange_strong:

do {
  Node* temp = nullptr;
  do {
    temp = old_head;
    hp.store(old_head);
    old_head = head_.load();
  } while (old_head != temp);
} while (old_head != nullptr &&
         !head_.compare_exchange_strong(old_head, old_head->next));

The advantage of compare_exchange_weak is that each compare-and-exchange attempt is cheaper, but it can fail spuriously (for example, due to OS scheduling); compare_exchange_strong never fails spuriously, but each attempt costs more. The rule for choosing between them: if the while-loop body is empty or very cheap, use compare_exchange_weak; conversely, if the loop body is expensive, use compare_exchange_strong. In the code above, the outer while loop wraps an inner while loop containing three statements, which is relatively expensive; the cost of spurious failures under compare_exchange_weak would exceed the extra cost of compare_exchange_strong's comparison, so compare_exchange_strong is chosen. If the trade-off is still hard to judge, benchmark both versions repeatedly with large workloads and pick the one that actually consumes fewer resources.

The following code:

template <typename T>
typename LockFreeStack<T>::HazardPointer
    LockFreeStack<T>::hazard_pointers_[kMaxHazardPointerNum];

defines the static member array hazard_pointers_[kMaxHazardPointerNum], i.e., what is usually called static member array initialization. The syntax is ugly, but it is the only way to write it. Reclaiming memory with hazard pointers is simple and does safely reclaim deleted nodes, but it adds considerable overhead. Scanning the hazard pointer array means checking kMaxHazardPointerNum atomic variables, and the check is repeated on every Pop() call. Atomic operations are expensive, so Pop() becomes the performance bottleneck: it must scan not only the hazard pointer array but also every node on the reclaim list, and with kMaxHazardPointerNum nodes on that list, each one has to be checked against kMaxHazardPointerNum stored hazard pointers.

4. Manual memory management: reference counts to detect unreferenced nodes

Another way to decide whether a popped node can be deleted is to check whether any thread is still accessing it: delete it if not, otherwise wait. This matches the reference-counting idea behind smart pointers. Concretely, each node carries two reference counts, an internal count and an external count; their sum is the total number of references to the node. The external count is bundled together with the node pointer, and is incremented each time the node pointer is read by a thread. When a thread finishes accessing the node, it decrements the internal count. Once the bundle (node pointer plus external count) is no longer accessible to other threads, the internal count is increased by external count minus 2 and the external count is discarded. When the internal count reaches zero, the node is not being accessed by any other thread and can safely be deleted. The implementation follows (file named lock_free_stack.h):

4.1 Without relaxed memory ordering

#pragma once

#include <atomic>
#include <cstdint>
#include <memory>

template <typename T>
class LockFreeStack {
 public:
  LockFreeStack() : head_(CountedNodePtr()) {}
  ~LockFreeStack() {
    while (Pop()) {
      // Do nothing and wait until all elements are popped.
    }
  }

  // Copy constructors and copy assignments are prohibited.
  LockFreeStack(const LockFreeStack& other) = delete;
  LockFreeStack& operator=(const LockFreeStack& other) = delete;

  bool IsEmpty() const { return head_.load().ptr == 0; }

  bool IsLockFree() const {
    return std::atomic<CountedNodePtr>::is_always_lock_free;
  }

  void Push(const T& data) {
    // Construct a `CountedNodePtr` object that refers to a freshly allocated
    // node with associated data, and set the next value of the node to the
    // current value of head_.
    CountedNodePtr new_node;
    new_node.ptr = reinterpret_cast<uint64_t>(new Node(data));
    new_node.external_count = 1;
    reinterpret_cast<Node*>(new_node.ptr)->next = head_.load();

    // Use compare_exchange_weak() to set head_ to new_node. The counts are
    // set up so the internal_count is zero and the external_count is one.
    // Because this is a new node, there is currently only one external
    // reference to it (the head_ pointer itself).
    //
    // If new_node.ptr->next is the same as head_, update head_ to new_node
    // and return true. If new_node.ptr->next and head_ are not equal, update
    // new_node.ptr->next to head_ and return false.
    while (!head_.compare_exchange_weak(
        reinterpret_cast<Node*>(new_node.ptr)->next, new_node)) {
      // Do nothing and wait until head_ is updated to new_node.
    }
  }

  std::shared_ptr<T> Pop() {
    CountedNodePtr old_head = head_.load();

    while (true) {
      // Once we've loaded head_, we must first increase the external
      // reference count of the head node to indicate that we are referencing
      // it and to ensure it is safe to dereference. If we dereferenced the
      // pointer before incrementing the count, another thread could free the
      // node before we access it, leaving us with a dangling pointer. This
      // is the primary reason for the split reference count: incrementing
      // the external count ensures the pointer remains valid for the
      // duration of our access.
      IncreaseHeadCount(&old_head);

      // Once the count has been increased, we can safely dereference the
      // `ptr` field of the value loaded from head_ to access the pointed-to
      // node.
      Node* ptr = reinterpret_cast<Node*>(old_head.ptr);

      // A null pointer means the end of the list: no more entries.
      if (ptr == nullptr) {
        return std::shared_ptr<T>();
      }

      // Otherwise, try to remove the node with a compare_exchange_strong()
      // call on head_.
      if (head_.compare_exchange_strong(old_head, ptr->next)) {
        // On success, we have taken ownership of the node and can swap out
        // the data in preparation for returning it. This ensures the data is
        // not kept alive just because other threads accessing the stack
        // still happen to hold pointers to its node.
        std::shared_ptr<T> res;
        res.swap(ptr->data);

        // The value we add is two less than the external count: we removed
        // the node from the list (one off the count for that), and this
        // thread no longer accesses it (another one off).
        const int increased_count = old_head.external_count - 2;

        // Merge the external count into the internal count with an atomic
        // fetch_add. If the combined count is now zero, the previous value
        // (what fetch_add returns) was the negative of what we added, and we
        // can delete the node.
        if (ptr->internal_count.fetch_add(increased_count) ==
            -increased_count) {
          delete ptr;
        }

        // Whether or not we deleted the node, we are done: return the data.
        return res;

        // If the compare/exchange fails, another thread removed our node
        // before we did, or another thread pushed a new node. Either way, we
        // must start again with the fresh head value returned by the
        // compare/exchange call. But first, decrease the reference count on
        // the node we tried to remove: this thread will not access it
        // anymore. If we are the last thread holding a reference (because
        // another thread removed it from the stack), the internal count will
        // be 1, so subtracting 1 sets it to zero and we can delete the node
        // here before looping.
      } else if (ptr->internal_count.fetch_add(-1) == 1) {
        delete ptr;
      }
    }
  }

 private:
  // Forward declaration.
  struct Node;

  struct CountedNodePtr {
    CountedNodePtr() : external_count(0), ptr(0) {}

    // The platform has spare bits in a pointer (for example, the address
    // space is only 48 bits but a pointer is 64 bits), so we can store the
    // count inside the spare bits to fit everything into a single machine
    // word.
    uint16_t external_count : 16;
    uint64_t ptr : 48;
  };

  struct Node {
    // std::make_shared does not throw an exception.
    explicit Node(const T& input_data)
        : data(std::make_shared<T>(input_data)), internal_count(0) {}

    std::shared_ptr<T> data;
    std::atomic<int> internal_count;
    CountedNodePtr next;
  };

 private:
  void IncreaseHeadCount(CountedNodePtr* old_counter) {
    CountedNodePtr new_counter;
    // The increment is done with a compare_exchange_strong() loop, which
    // compares and sets the whole structure to ensure the pointer has not
    // been changed by another thread in the meantime.
    do {
      new_counter = *old_counter;
      ++new_counter.external_count;
    } while (!head_.compare_exchange_strong(*old_counter, new_counter));

    old_counter->external_count = new_counter.external_count;
  }

 private:
  std::atomic<CountedNodePtr> head_;
};

A detail worth calling out in the code above: the counted node pointer struct CountedNodePtr uses bit fields:

struct CountedNodePtr {
  CountedNodePtr() : external_count(0), ptr(0) {}

  // The platform has spare bits in a pointer (for example, the address
  // space is only 48 bits but a pointer is 64 bits), so we can store the
  // count inside the spare bits to fit everything into a single machine
  // word.
  uint16_t external_count : 16;
  uint64_t ptr : 48;
};

The real type of ptr is Node*, yet here it is declared as a 48-bit field of the unsigned integer type uint64_t. Why? Mainstream operating systems and compilers support lock-free atomic operations only on types of at most 8 bytes; that is, std::atomic<CountedNodePtr>::is_lock_free() returns true only when sizeof(CountedNodePtr) <= 8. CountedNodePtr must therefore be kept within 8 bytes, which is where bit fields come in. On mainstream platforms a pointer occupies no more than 48 significant bits (if your platform uses more, the bit field widths must be redesigned; consult the platform documentation), so external_count is given 16 bits (supporting up to 65535) and ptr 48 bits, 64 bits (8 bytes) in total. With this layout, std::atomic<CountedNodePtr>::is_lock_free() returns true on mainstream platforms, giving a genuinely lock-free atomic variable. To accommodate this change, reinterpret_cast<Node*>(new_node.ptr) is used to convert ptr from uint64_t to Node*, and reinterpret_cast<uint64_t>(new Node(data)) converts the pointer from Node* to uint64_t so it can be stored in ptr. Note: external_count is only ever incremented, never decremented; when no thread is accessing the node, external_count is simply discarded. Conversely, internal_count is only ever decremented, never incremented, apart from the merge that adds external_count - 2 to it.

4.2 With relaxed memory ordering

Before changing any memory orderings, examine the dependencies between operations and then determine the weakest ordering that preserves them. The easiest way to do this is to reason from the threads' point of view; the simplest scenario is one thread pushing a data item onto the stack and another thread later popping it. Three pieces of data are involved:

  1. head_, the CountedNodePtr through which the data is transferred.

  2. The Node that head_ refers to.

  3. The data item the node points to.

The thread executing Push() first constructs the data item and then sets head_. The thread executing Pop() first loads head_, then performs the compare/exchange and increments the reference count, then reads the corresponding Node and obtains the value of next. next is a non-atomic object, so to read it safely there must be a happens-before relationship between the store (in the pushing thread) and the load (in the popping thread). Since the only atomic store in Push() is the compare_exchange_weak(), it is what must establish the happens-before relationship between the two threads, so the compare_exchange_weak() must use std::memory_order_release or stronger. When compare_exchange_weak() fails, however, nothing has changed and the loop simply retries, so std::memory_order_relaxed suffices for the failure case.

What about Pop()? To establish the happens-before relationship, the access to next must be preceded by an operation with std::memory_order_acquire or stronger. The compare_exchange_strong() in IncreaseHeadCount() reads the old value from which next will later be obtained, so its success case needs std::memory_order_acquire. As with Push(), a failed exchange just continues the loop, so the failure case can use std::memory_order_relaxed.

When the compare_exchange_strong() succeeds, the value including ptr is stored into old_head. The store in Push() is a release operation and this compare_exchange_strong() is an acquire operation, so the store synchronizes-with the load and the happens-before relationship is established. Therefore the store of ptr in Push() happens-before the access to ptr->next in Pop(), and the operation is safe.

The initial head_.load() does not affect this analysis, so it can use std::memory_order_relaxed.

Next, compare_exchange_strong() sets head_ to old_head.ptr->next. Does anything more need to be done here to guarantee data integrity for this thread? On success we access ptr->data, so the store of ptr->data in the Push() thread must happen-before the load. The acquire operation in IncreaseHeadCount() already guarantees synchronization with the store and the compare/exchange in the Push() thread: storing the data in Push() happens-before storing head_, and calling IncreaseHeadCount() happens-before loading ptr->data. Consequently, even if this compare/exchange in Pop() uses std::memory_order_relaxed, everything still works. The only other place ptr->data is modified is the swap(), and no other thread can operate on the same node at that point (which is exactly what the compare/exchange guarantees).

When compare_exchange_strong() fails, old_head is refreshed with the current value and the loop continues. Since we already established that std::memory_order_acquire in IncreaseHeadCount() is sufficient, std::memory_order_relaxed works for the failure case here as well.

What about the other threads? Do they require stricter orderings to stay safe? No. head_ is only ever modified by compare/exchange operations, and as read-modify-write operations the compare/exchange in Push() forms part of a release sequence. Therefore, even with many threads modifying head_ at the same time, the compare_exchange_weak() in Push() still synchronizes with the compare_exchange_strong() in IncreaseHeadCount() that reads the stored value.

That leaves the fetch_add() operations that adjust the reference counts. The thread that returns the data item can proceed, knowing that no other thread can modify the node's data. However, any thread that does not succeed knows that another thread did modify the data (the swap() extracted the data item). To avoid a data race, the swap() must happen-before the delete. The simple fix would be to use std::memory_order_release on the fetch_add() in the "successful return" branch and std::memory_order_acquire on the fetch_add() in the "loop again" branch, but that is overkill: only one thread performs the delete (the thread that drives the count to zero), so only that thread needs the acquire. Because fetch_add() is a read-modify-write operation and forms part of the release sequence, the acquire can instead be done with an extra load(): when the "loop again" branch drives the count to zero, it can reload the count with std::memory_order_acquire to establish the required synchronizes-with relationship, while the fetch_add() itself uses std::memory_order_relaxed.

The complete relaxed-ordering code follows (file named lock_free_stack.h); it is not guaranteed to be correct on all platforms:

#pragma once

#include <atomic>
#include <cstdint>
#include <memory>

template <typename T>
class LockFreeStack {
 public:
  LockFreeStack() : head_(CountedNodePtr()) {}
  ~LockFreeStack() {
    while (Pop()) {
      // Do nothing and wait until all elements are popped.
    }
  }

  // Copy constructors and copy assignments are prohibited.
  LockFreeStack(const LockFreeStack& other) = delete;
  LockFreeStack& operator=(const LockFreeStack& other) = delete;

  bool IsEmpty() const {
    return head_.load(std::memory_order_relaxed).ptr == 0;
  }

  bool IsLockFree() const {
    return std::atomic<CountedNodePtr>::is_always_lock_free;
  }

  void Push(const T& data) {
    // Construct a `CountedNodePtr` object that refers to a freshly allocated
    // node with associated data, and set the next value of the node to the
    // current value of head_.
    CountedNodePtr new_node;
    new_node.ptr = reinterpret_cast<uint64_t>(new Node(data));
    new_node.external_count = 1;
    reinterpret_cast<Node*>(new_node.ptr)->next =
        head_.load(std::memory_order_relaxed);

    // The counts are set up so the internal_count is zero and the
    // external_count is one: for a new node, the only external reference is
    // head_ itself. The release ordering on success publishes the node's
    // contents to the popping thread.
    while (!head_.compare_exchange_weak(
        reinterpret_cast<Node*>(new_node.ptr)->next, new_node,
        std::memory_order_release, std::memory_order_relaxed)) {
      // Do nothing and wait until head_ is updated to new_node.
    }
  }

  std::shared_ptr<T> Pop() {
    CountedNodePtr old_head = head_.load(std::memory_order_relaxed);

    while (true) {
      // First increase the external reference count of the head node to
      // indicate that we are referencing it and to ensure it is safe to
      // dereference. If we dereferenced before incrementing, another thread
      // could free the node first, leaving us with a dangling pointer. This
      // is the primary reason for the split reference count.
      IncreaseHeadCount(&old_head);

      // Now we can safely dereference the `ptr` field of the value loaded
      // from head_ to access the pointed-to node.
      Node* ptr = reinterpret_cast<Node*>(old_head.ptr);

      // A null pointer means the end of the list: no more entries.
      if (ptr == nullptr) {
        return std::shared_ptr<T>();
      }

      // Otherwise, try to remove the node with compare_exchange_strong().
      if (head_.compare_exchange_strong(old_head, ptr->next,
                                        std::memory_order_relaxed,
                                        std::memory_order_relaxed)) {
        // On success, we have taken ownership of the node and can swap out
        // the data in preparation for returning it.
        std::shared_ptr<T> res;
        res.swap(ptr->data);

        // The value we add is two less than the external count: we removed
        // the node from the list, and this thread no longer accesses it.
        const int increased_count = old_head.external_count - 2;

        // Merge the external count into the internal count. The release
        // ordering makes the swap() above visible to whichever thread ends
        // up deleting the node.
        if (ptr->internal_count.fetch_add(increased_count,
                                          std::memory_order_release) ==
            -increased_count) {
          delete ptr;
        }

        // Whether or not we deleted the node, we are done: return the data.
        return res;

        // If the compare/exchange fails, another thread removed our node
        // first, or pushed a new node; start again with the fresh head
        // value. But first decrease the reference count on the node we were
        // trying to remove. If we are the last thread holding a reference,
        // subtracting 1 drives the internal count to zero and we delete the
        // node here before looping.
      } else if (ptr->internal_count.fetch_add(
                     -1, std::memory_order_relaxed) == 1) {
        // The acquire load joins the release sequence on internal_count so
        // that the delete is ordered after the winning thread's swap().
        ptr->internal_count.load(std::memory_order_acquire);
        delete ptr;
      }
    }
  }

 private:
  // Forward declaration.
  struct Node;

  struct CountedNodePtr {
    CountedNodePtr() : external_count(0), ptr(0) {}

    // The platform has spare bits in a pointer (for example, the address
    // space is only 48 bits but a pointer is 64 bits), so we can store the
    // count inside the spare bits to fit everything into a single machine
    // word.
    uint16_t external_count : 16;
    uint64_t ptr : 48;
  };

  struct Node {
    // std::make_shared does not throw an exception.
    explicit Node(const T& input_data)
        : data(std::make_shared<T>(input_data)), internal_count(0) {}

    std::shared_ptr<T> data;
    std::atomic<int> internal_count;
    CountedNodePtr next;
  };

 private:
  void IncreaseHeadCount(CountedNodePtr* old_counter) {
    CountedNodePtr new_counter;
    // Compare and set the whole structure to ensure the pointer has not been
    // changed by another thread in the meantime. The acquire ordering on
    // success makes the pushing thread's writes to the node visible here.
    do {
      new_counter = *old_counter;
      ++new_counter.external_count;
    } while (!head_.compare_exchange_strong(*old_counter, new_counter,
                                            std::memory_order_acquire,
                                            std::memory_order_relaxed));

    old_counter->external_count = new_counter.external_count;
  }

 private:
  std::atomic<CountedNodePtr> head_;
};

5. Test Code

The following simple test program checks that the lock-free stack works correctly (file name: lock_free_stack.cpp):

#include "lock_free_stack.h"

#include <algorithm>
#include <iostream>
#include <random>
#include <thread>
#include <vector>

namespace {
constexpr size_t kElementNum = 10;
constexpr size_t kThreadNum = 200;
constexpr size_t kLargeThreadNum = 2000;
}  // namespace

int main() {
  LockFreeStack<int> stack;

  // Case 1: single-thread test.
  for (size_t i = 0; i < kElementNum; ++i) {
    std::cout << "The data " << i << " is pushed in the stack.\n";
    stack.Push(i);
  }
  std::cout << "stack.IsEmpty() == " << std::boolalpha << stack.IsEmpty()
            << std::endl;
  while (auto data = stack.Pop()) {
    std::cout << "Current data is : " << *data << '\n';
  }

  // Case 2: multi-thread test. Producers and consumers are evenly
  // distributed.
  std::vector<std::thread> producers1;
  std::vector<std::thread> producers2;
  std::vector<std::thread> consumers1;
  std::vector<std::thread> consumers2;
  for (size_t i = 0; i < kThreadNum; ++i) {
    producers1.emplace_back(&LockFreeStack<int>::Push, &stack, i * 10);
    producers2.emplace_back(&LockFreeStack<int>::Push, &stack, i * 20);
    consumers1.emplace_back(&LockFreeStack<int>::Pop, &stack);
    consumers2.emplace_back(&LockFreeStack<int>::Pop, &stack);
  }
  for (size_t i = 0; i < kThreadNum; ++i) {
    producers1[i].join();
    consumers1[i].join();
    producers2[i].join();
    consumers2[i].join();
  }
  producers1.clear();
  producers1.shrink_to_fit();
  producers2.clear();
  producers2.shrink_to_fit();
  consumers1.clear();
  consumers1.shrink_to_fit();
  consumers2.clear();
  consumers2.shrink_to_fit();

  // Case 3: multi-thread test. Producers and consumers are randomly
  // distributed.
  std::vector<std::thread> producers3;
  std::vector<std::thread> consumers3;
  for (size_t i = 0; i < kLargeThreadNum; ++i) {
    producers3.emplace_back(&LockFreeStack<int>::Push, &stack, i * 30);
    consumers3.emplace_back(&LockFreeStack<int>::Pop, &stack);
  }
  std::vector<int> random_numbers(kLargeThreadNum);
  std::mt19937 gen(std::random_device{}());
  std::uniform_int_distribution<int> dis(0, 100000);
  auto rand_num_generator = [&gen, &dis]() mutable { return dis(gen); };
  std::generate(random_numbers.begin(), random_numbers.end(),
                rand_num_generator);
  for (size_t i = 0; i < kLargeThreadNum; ++i) {
    if (random_numbers[i] % 2) {
      producers3[i].join();
      consumers3[i].join();
    } else {
      consumers3[i].join();
      producers3[i].join();
    }
  }
  producers3.clear();
  producers3.shrink_to_fit();
  consumers3.clear();
  consumers3.shrink_to_fit();

  return 0;
}

The CMake build configuration file CMakeLists.txt:

cmake_minimum_required(VERSION 3.0.0)
project(lock_free_stack VERSION 0.1.0)

set(CMAKE_CXX_STANDARD 17)
# If the debug option is not given, the program will not have debugging
# information.
set(CMAKE_BUILD_TYPE "Debug")

add_executable(${PROJECT_NAME} ${PROJECT_NAME}.cpp)

find_package(Threads REQUIRED)
# libatomic should be linked to the program. Otherwise, the following link
# errors occur:
#   /usr/include/c++/9/atomic:254: undefined reference to `__atomic_load_16'
#   /usr/include/c++/9/atomic:292: undefined reference to `__atomic_compare_exchange_16'
# target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT} atomic)
target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT})

include(CTest)
enable_testing()

set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})
include(CPack)

The configuration above shows where to link against the atomic library. The reference-counted structure CountedNodePtr originally contained two full-width data members (the first version did not use bit fields: `int external_count; Node* ptr;`), which occupy 16 bytes on a 64-bit platform. A 16-byte structure requires linking against libatomic; otherwise the following link errors occur. The bit-field version fits in 8 bytes and no longer needs libatomic:

/usr/include/c++/9/atomic:254: undefined reference to `__atomic_load_16'
/usr/include/c++/9/atomic:292: undefined reference to `__atomic_compare_exchange_16'

The VSCode debug launch configuration file .vscode/launch.json:

{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "cpp_gdb_launch",
      "type": "cppdbg",
      "request": "launch",
      "program": "${workspaceFolder}/build/${workspaceFolderBasename}",
      "args": [],
      "stopAtEntry": false,
      "cwd": "${fileDirname}",
      "environment": [],
      "externalConsole": false,
      "MIMode": "gdb",
      "setupCommands": [
        {
          "description": "Enable neat printing for gdb",
          "text": "-enable-pretty-printing",
          "ignoreFailures": true
        }
      ],
      // "preLaunchTask": "cpp_build_task",
      "miDebuggerPath": "/usr/bin/gdb"
    }
  ]
}

Build with CMake:

cd lock_free_stack
# Only needed the first time
mkdir build
cd build
cmake .. && make

The output is as follows:

./lock_free_stack
The data 0 is pushed in the stack.
The data 1 is pushed in the stack.
The data 2 is pushed in the stack.
The data 3 is pushed in the stack.
The data 4 is pushed in the stack.
The data 5 is pushed in the stack.
The data 6 is pushed in the stack.
The data 7 is pushed in the stack.
The data 8 is pushed in the stack.
The data 9 is pushed in the stack.
stack.IsEmpty() == false
Current data is : 9
Current data is : 8
Current data is : 7
Current data is : 6
Current data is : 5
Current data is : 4
Current data is : 3
Current data is : 2
Current data is : 1
Current data is : 0

The VSCode debugging UI looks like this:


Lock-Free Queue

The challenges for a queue differ somewhat from those for a stack, because Push() and Pop() operate on different ends of the queue, so the synchronization requirements differ. We must guarantee that a modification at one end is performed correctly and is visible at the other end. The queue therefore needs two Node pointers, head_ and tail_. Both are atomic variables, so multiple threads can access them concurrently without locks.

In our implementation, the queue is considered empty when head_ and tail_ point to the same node (called the dummy node).

Let us first analyze the single-producer/single-consumer case.

1. Lock-Free Queue Under the Single-Producer/Single-Consumer Model

The single-producer/single-consumer model means that, at any given moment, at most one thread calls Push() and at most one thread calls Pop(). The code for this case (file name lock_free_queue.h) is as follows:

#pragma once

#include <atomic>
#include <memory>

template <typename T>
class LockFreeQueue {
 public:
  LockFreeQueue() : head_(new Node), tail_(head_.load()) {}
  ~LockFreeQueue() {
    while (Node* old_head = head_.load()) {
      head_.store(old_head->next);
      delete old_head;
    }
  }

  LockFreeQueue(const LockFreeQueue& other) = delete;
  LockFreeQueue& operator=(const LockFreeQueue& other) = delete;

  bool IsEmpty() const { return head_.load() == tail_.load(); }

  void Push(const T& data) {
    auto new_data = std::make_shared<T>(data);
    Node* p = new Node;             // 3
    Node* old_tail = tail_.load();  // 4
    old_tail->data.swap(new_data);  // 5
    old_tail->next = p;             // 6
    tail_.store(p);                 // 7
  }

  std::shared_ptr<T> Pop() {
    Node* old_head = PopHead();
    if (old_head == nullptr) {
      return std::shared_ptr<T>();
    }

    const std::shared_ptr<T> res(old_head->data);  // 2
    delete old_head;
    return res;
  }

 private:
  // Note: Node must be declared before the data members head_ and tail_
  // whose types use it; otherwise a "'Node' has not been declared" error
  // occurs. A forward declaration would also work.
  struct Node {
    Node() : data(nullptr), next(nullptr) {}

    std::shared_ptr<T> data;
    Node* next;
  };

  Node* PopHead() {
    Node* old_head = head_.load();
    if (old_head == tail_.load()) {  // 1
      return nullptr;
    }
    head_.store(old_head->next);
    return old_head;
  }

  std::atomic<Node*> head_;
  std::atomic<Node*> tail_;
};

At first glance this implementation looks fine, and when only one thread calls Push() and only one thread calls Pop(), it is indeed flawless. The happens-before relationship between Push() and Pop() is crucial here, since it determines whether data can be safely retrieved from the queue. The store to tail_ at ⑦ (corresponding to comment // 7 in the code above; likewise below) synchronizes-with the load of tail_ at ①; the store of the previous node's data pointer at ⑤ happens-before the store to tail_; and the load of tail_ happens-before the load of the data pointer at ②. Therefore the store to data happens-before the load, and everything works. This is a perfect single-producer, single-consumer (SPSC) queue.

The problems appear when multiple threads call Push() and Pop() concurrently. Consider Push() first: if two threads call Push() concurrently, both allocate a new node as the dummy node ③ and both read the same tail_ value ④, so both modify the same node, setting its data and next pointers ⑤⑥ simultaneously — an obvious data race!

PopHead() has a similar problem. If two threads call it concurrently, both read the same head_ and both then modify it via the next pointer. Both threads end up holding the same node — a disaster! Not only must we ensure that only one Pop() thread can access a given item, we must also ensure that other threads can safely access next when they read head_. This is exactly the same problem we met in the lock-free stack's Pop().

Suppose the Pop() problem is solved; what about Push()? The trouble is that, to obtain the happens-before relationship between Push() and Pop(), the tail_ pointer must be updated before the dummy node's data item is set. Concurrent calls to Push() then race, because every thread reads the very same tail_.

Note

Happens-before and synchronizes-with are the two key relationships for synchronizing memory between threads via atomic variables.

Happens-before

Regardless of threads, evaluation A happens-before evaluation B if any of the following is true: 1) A is sequenced-before B; 2) A inter-thread happens before B. The implementation is required to ensure that the happens-before relation is acyclic, by introducing additional synchronization if necessary (it can only be necessary if a consume operation is involved). If one evaluation modifies a memory location, and the other reads or modifies the same memory location, and if at least one of the evaluations is not an atomic operation, the behavior of the program is undefined (the program has a data race) unless there exists a happens-before relationship between these two evaluations.


Synchronizes-with

If an atomic store in thread A is a release operation, an atomic load in thread B from the same variable is an acquire operation, and the load in thread B reads a value written by the store in thread A, then the store in thread A synchronizes-with the load in thread B. Also, some library calls may be defined to synchronize-with other library calls on other threads.


2. Lock-Free Queue Under the Multi-Producer/Multi-Consumer Model

2.1 Without Relaxed Memory Ordering

To resolve the data races caused by concurrent access, we can make the data pointer inside Node atomic and set it with a compare/exchange operation. If the compare/exchange succeeds, this thread owns tail_ and can safely set its next pointer, i.e. update tail_. A failed compare/exchange means another thread stored the data first, so we reload tail_ and loop. If atomic operations on std::shared_ptr<> were lock-free we would be nearly done; however, on most current platforms they are not, so we need an alternative: let Pop() return std::unique_ptr<> and store the data as a plain pointer in the queue. The queue then stores std::atomic<T*>, and calls to compare_exchange_strong() become essential. We use a reference-counting scheme similar to the lock-free stack's to handle concurrent access to Pop() and Push(). Concretely, each node carries two reference counts: an internal count and an external count. Their sum is the total number of references to the node. The external count is bundled with the node pointer, and is incremented each time a thread reads the pointer. When a thread finishes accessing the node, it decrements the internal count. Once a (node pointer, external count) pair is no longer accessed by any thread, the internal count is merged with external count − 2 and the external count is discarded. When the internal count reaches zero, no thread references the node and it can safely be deleted. Unlike the stack, the queue has the two pointers head_ and tail_, so each node needs two external reference counters: a std::atomic<NodeCounter> counter replaces the stack's std::atomic<int> internal_count (the definition of NodeCounter is explained later). Here is the sample code (file name lock_free_queue.h, adapted from C++ Concurrency in Action, 2nd ed., 2019, with its bugs fixed):

#pragma once

#include <atomic>
#include <cstdint>
#include <memory>

template <typename T>
class LockFreeQueue {
 public:
  LockFreeQueue() : head_(CountedNodePtr(new Node, 1)), tail_(head_.load()) {}
  ~LockFreeQueue();

  // Copy construct and assignment, move construct and assignment are
  // prohibited.
  LockFreeQueue(const LockFreeQueue& other) = delete;
  LockFreeQueue& operator=(const LockFreeQueue& other) = delete;
  LockFreeQueue(LockFreeQueue&& other) = delete;
  LockFreeQueue& operator=(LockFreeQueue&& other) = delete;

  bool IsEmpty() const { return head_.load().ptr == tail_.load().ptr; }
  bool IsLockFree() const {
    return std::atomic<CountedNodePtr>::is_always_lock_free;
  }

  void Push(const T& data);
  std::unique_ptr<T> Pop();

 private:
  // Forward class declaration
  struct Node;

  struct CountedNodePtr {
    explicit CountedNodePtr(Node* input_ptr = nullptr,
                            uint16_t input_external_count = 0)
        : ptr(reinterpret_cast<uint64_t>(input_ptr)),
          external_count(input_external_count) {}

    // The platform has spare bits in a pointer (for example, because the
    // address space is only 48 bits but a pointer is 64 bits), so we can
    // store the count inside the spare bits of the pointer to fit it all
    // back in a single machine word. Keeping the structure within a machine
    // word makes it more likely that the atomic operations can be lock-free
    // on many platforms.
    uint64_t ptr : 48;
    uint16_t external_count : 16;
  };

  struct NodeCounter {
    NodeCounter() : internal_count(0), external_counters(0) {}
    NodeCounter(const uint32_t input_internal_count,
                const uint8_t input_external_counters)
        : internal_count(input_internal_count),
          external_counters(input_external_counters) {}

    // external_counters occupies only 2 bits; we need at most two such
    // counters. By using a bit field for it and specifying internal_count as
    // a 30-bit value, we keep the total counter size to 32 bits. This gives
    // plenty of scope for large internal counts while ensuring the whole
    // structure fits inside a machine word on 32-bit and 64-bit machines.
    // It's important to update these counts together as a single entity to
    // avoid race conditions. Keeping the structure within a machine word
    // makes it more likely that the atomic operations can be lock-free on
    // many platforms.
    uint32_t internal_count : 30;
    uint8_t external_counters : 2;
  };

  struct Node {
    // There are only two counters referencing a Node (counter and next), so
    // the initial value of external_counters is 2.
    Node()
        : data(nullptr), counter(NodeCounter(0, 2)), next(CountedNodePtr()) {}
    ~Node();
    void ReleaseRef();

    std::atomic<T*> data;
    std::atomic<NodeCounter> counter;
    std::atomic<CountedNodePtr> next;
  };

 private:
  static void IncreaseExternalCount(std::atomic<CountedNodePtr>* atomic_node,
                                    CountedNodePtr* old_node);
  static void FreeExternalCounter(CountedNodePtr* old_node);
  void SetNewTail(const CountedNodePtr& new_tail, CountedNodePtr* old_tail);

 private:
  std::atomic<CountedNodePtr> head_;
  std::atomic<CountedNodePtr> tail_;
};

template <typename T>
LockFreeQueue<T>::Node::~Node() {
  if (data.load() != nullptr) {
    T* old_data = data.exchange(nullptr);
    if (old_data != nullptr) {
      delete old_data;
    }
  }
}

template <typename T>
void LockFreeQueue<T>::Node::ReleaseRef() {
  NodeCounter old_node = counter.load();
  NodeCounter new_node;

  // The whole count structure has to be updated atomically, even though we
  // only want to modify the internal_count field. This therefore requires a
  // compare/exchange loop.
  do {
    new_node = old_node;
    if (new_node.internal_count > 0) {
      --new_node.internal_count;
    }
  } while (!counter.compare_exchange_strong(old_node, new_node));

  // Once we've decremented internal_count, if both the internal and external
  // counts are now zero, this is the last reference, so we can delete the
  // node safely.
  if (!new_node.internal_count && !new_node.external_counters) {
    delete this;
  }
}

template <typename T>
LockFreeQueue<T>::~LockFreeQueue() {
  while (Pop()) {
    // Do nothing
  }

  // Delete the last dummy node.
  if (head_.load().ptr == tail_.load().ptr) {
    CountedNodePtr old_head = head_.load();
    CountedNodePtr empty_head;
    auto* dummy_ptr = reinterpret_cast<Node*>(old_head.ptr);
    if (head_.compare_exchange_strong(old_head, empty_head)) {
      delete dummy_ptr;
    }
  }
}

template <typename T>
void LockFreeQueue<T>::Push(const T& data) {
  auto new_data = std::make_unique<T>(data);
  CountedNodePtr new_next(new Node, 1);
  CountedNodePtr old_tail = tail_.load();

  while (true) {
    IncreaseExternalCount(&tail_, &old_tail);

    T* old_data = nullptr;
    // We use compare_exchange_strong() to avoid looping. If the exchange
    // fails, we know another thread has already set the next pointer, so we
    // don't need the new node we allocated at the beginning and can delete
    // it. We also want to use the next value that the other thread set for
    // updating tail.
    if (reinterpret_cast<Node*>(old_tail.ptr)
            ->data.compare_exchange_strong(old_data, new_data.get())) {
      CountedNodePtr old_next;
      if (!reinterpret_cast<Node*>(old_tail.ptr)
               ->next.compare_exchange_strong(old_next, new_next)) {
        delete reinterpret_cast<Node*>(new_next.ptr);
        new_next = old_next;
      }
      SetNewTail(new_next, &old_tail);

      // Release the ownership of the managed object so that the data will
      // not be deleted when the unique_ptr goes out of scope.
      new_data.release();
      break;
    } else {
      // If the thread calling Push() failed to set the data pointer this
      // time through the loop, it can help the successful thread complete
      // the update. First, try to update the next pointer to the new node
      // allocated on this thread. If this succeeds, we want to use the node
      // we allocated as the new tail node, and we need to allocate another
      // new node in anticipation of managing to push an item on the queue.
      // We can then try to set the tail node by calling SetNewTail before
      // looping around again.
      CountedNodePtr old_next;
      if (reinterpret_cast<Node*>(old_tail.ptr)
              ->next.compare_exchange_strong(old_next, new_next)) {
        old_next = new_next;
        new_next.ptr = reinterpret_cast<uint64_t>(new Node);
      }
      SetNewTail(old_next, &old_tail);
    }
  }
}

template <typename T>
std::unique_ptr<T> LockFreeQueue<T>::Pop() {
  // We prime the pump by loading the old_head value before we enter the
  // loop, and before we increase the external count on the loaded value.
  CountedNodePtr old_head = head_.load();
  while (true) {
    IncreaseExternalCount(&head_, &old_head);

    Node* ptr = reinterpret_cast<Node*>(old_head.ptr);
    if (ptr == nullptr) {
      return std::unique_ptr<T>();
    }
    // If the head node is the same as the tail node, we can release the
    // reference and return a null pointer because there's no data in the
    // queue.
    if (ptr == reinterpret_cast<Node*>(tail_.load().ptr)) {
      ptr->ReleaseRef();
      return std::unique_ptr<T>();
    }

    // If there is data, we want to try to claim it, and we do this with the
    // call to compare_exchange_strong(). It compares the external count and
    // pointer as a single entity; if either changes, we need to loop again,
    // after releasing the reference.
    CountedNodePtr next = ptr->next.load();
    if (head_.compare_exchange_strong(old_head, next)) {
      // If the exchange succeeded, we've claimed the data in the node as
      // ours, so we can return it to the caller after we've released the
      // external counter to the popped node.
      T* res = ptr->data.exchange(nullptr);
      FreeExternalCounter(&old_head);
      return std::unique_ptr<T>(res);
    }

    // Once both the external reference counts have been freed and the
    // internal count has dropped to zero, the node itself can be deleted.
    ptr->ReleaseRef();
  }
}

template <typename T>
void LockFreeQueue<T>::IncreaseExternalCount(
    std::atomic<CountedNodePtr>* atomic_node, CountedNodePtr* old_node) {
  CountedNodePtr new_node;

  // If `*old_node` is equal to `*atomic_node`, no other thread has changed
  // `*atomic_node`, so update `*atomic_node` to `new_node`. In fact
  // `*atomic_node` still refers to the original node; only its
  // `external_count` is increased by 1. If `*old_node` is not equal to
  // `*atomic_node`, another thread has changed `*atomic_node`: update
  // `*old_node` to `*atomic_node` and keep looping until no thread is
  // changing `*atomic_node`.
  do {
    new_node = *old_node;
    ++new_node.external_count;
  } while (!atomic_node->compare_exchange_strong(*old_node, new_node));

  old_node->external_count = new_node.external_count;
}

template <typename T>
void LockFreeQueue<T>::FreeExternalCounter(CountedNodePtr* old_node) {
  Node* ptr = reinterpret_cast<Node*>(old_node->ptr);
  // It's important to note that the value we add is two less than the
  // external count. We've removed the node from the list, so we drop one off
  // the count for that, and we're no longer accessing the node from this
  // thread, so we drop another off the count for that.
  const int increased_count = old_node->external_count - 2;
  NodeCounter old_counter = ptr->counter.load();
  NodeCounter new_counter;

  // Update the two counters using a single compare_exchange_strong() on the
  // whole count structure, as we did when decreasing the internal_count in
  // ReleaseRef(). This has to be done as a single action (which therefore
  // requires the compare/exchange loop) to avoid a race condition. If they
  // were updated separately, two threads might both think they are the last
  // one and both delete the node, resulting in undefined behavior.
  do {
    new_counter = old_counter;
    if (new_counter.external_counters > 0) {
      --new_counter.external_counters;
    }

    int temp_count = new_counter.internal_count + increased_count;
    // `internal_count` is a non-negative number; assigning a negative number
    // to it would give an undefined result.
    new_counter.internal_count = (temp_count > 0 ? temp_count : 0);
  } while (!ptr->counter.compare_exchange_strong(old_counter, new_counter));

  // If both the values are now zero, there are no more references to the
  // node, so it can be safely deleted.
  if (!new_counter.internal_count && !new_counter.external_counters) {
    delete ptr;
  }
}

template <typename T>
void LockFreeQueue<T>::SetNewTail(const CountedNodePtr& new_tail,
                                  CountedNodePtr* old_tail) {
  // Use a compare_exchange_weak() loop to update the tail, because if other
  // threads are trying to Push() a new node, the external_count part may
  // have changed, and we don't want to lose it.
  Node* current_tail_ptr = reinterpret_cast<Node*>(old_tail->ptr);
  // Due to operating-system scheduling, compare_exchange_weak() may fail
  // spuriously: the failure is not caused by another thread but by the CPU
  // being scheduled to execute other instructions. We therefore compare
  // `old_tail->ptr` with `current_tail_ptr`: if they are equal, the failure
  // was spurious and we continue the loop; if they differ, another thread
  // got in first and we stop looping.
  while (!tail_.compare_exchange_weak(*old_tail, new_tail) &&
         reinterpret_cast<Node*>(old_tail->ptr) == current_tail_ptr) {
    // Do nothing
  }

  // We also need to take care not to replace the value if another thread has
  // successfully changed it already; otherwise, we may end up with loops in
  // the queue, which would be a rather bad idea. Consequently, we need to
  // ensure that the ptr part of the loaded value is the same if the
  // compare/exchange fails. If the ptr is the same once the loop has exited,
  // then we must have successfully set the tail, so we need to free the old
  // external counter. If the ptr value is different, another thread will
  // have freed the counter, so we need to release the single reference held
  // by this thread.
  if (reinterpret_cast<Node*>(old_tail->ptr) == current_tail_ptr) {
    FreeExternalCounter(old_tail);
  } else {
    current_tail_ptr->ReleaseRef();
  }
}

The class diagram of the lock-free queue is as follows:


The flow chart of the Push operation:


The flow chart of the Pop operation:


In the code above, the definitions of non-inline functions are placed outside the class to keep the class body cleaner, as in the following example:

template <typename T>
void LockFreeQueue<T>::Node::ReleaseRef() {
  // ...
}

In the Push function, the else branch acts as a load balancer: if the current thread fails to update old_tail.ptr->data of the original tail node, it helps the successful thread update the tail node's old_tail.ptr->next and set the new tail pointer correctly. This keeps threads that lost the race from spinning fruitlessly, which would hurt efficiency. In fact, even simplifying this branch does not break correctness; it only degrades performance under high CPU load (as shown below):

template <typename T>
void LockFreeQueue<T>::Push(const T& data) {
  auto new_data = std::make_unique<T>(data);
  CountedNodePtr new_next(new Node);
  new_next.external_count = 1;
  CountedNodePtr old_tail = tail_.load();

  while (true) {
    IncreaseExternalCount(&tail_, &old_tail);

    T* old_data = nullptr;

    if (reinterpret_cast<Node*>(old_tail.ptr)
            ->data.compare_exchange_strong(old_data, new_data.get())) {
      // ...
    } else {
      CountedNodePtr old_next =
          reinterpret_cast<Node*>(old_tail.ptr)->next.load();
      if (reinterpret_cast<Node*>(old_tail.ptr)
              ->next.compare_exchange_strong(old_next, new_next)) {
        old_next = new_next;
        new_next.ptr = reinterpret_cast<uint64_t>(new Node);
      }
      SetNewTail(old_next, &old_tail);
    }
  }
}

It is worth pointing out that the reference-counted node pointer structure CountedNodePtr uses bit fields:

struct CountedNodePtr {
  explicit CountedNodePtr(Node* input_ptr = nullptr)
      : ptr(reinterpret_cast<uint64_t>(input_ptr)), external_count(0) {}

  uint64_t ptr : 48;
  uint16_t external_count : 16;
};

The real type of ptr is Node*, yet it is declared here as a 48-bit unsigned integer field of type uint64_t. Why? Mainstream operating systems and compilers support lock-free operations only on data types of at most 8 bytes: std::atomic<CountedNodePtr>::is_lock_free() returns true only when sizeof(CountedNodePtr) <= 8. We must therefore keep CountedNodePtr within 8 bytes, and bit fields achieve this. On mainstream platforms a pointer occupies at most 48 bits of address space (if your platform exceeds this, the bit-field sizes must be redesigned; consult the operating system's manual), so external_count is given 16 bits (up to 65535) and ptr 48 bits, 64 bits (8 bytes) in total. std::atomic<CountedNodePtr>::is_lock_free() then returns true on mainstream platforms: a genuinely lock-free atomic variable. To accommodate this change, reinterpret_cast<Node*>(new_node.ptr) must be used to convert ptr from uint64_t to Node*, and reinterpret_cast<uint64_t>(new Node(data)) to convert a Node* to uint64_t so that it can be stored in ptr. Note: external_count is only ever incremented, never decremented; when no thread accesses the node any longer, external_count is simply discarded.

Similarly, the node counter structure NodeCounter also uses bit fields:

struct NodeCounter {
  NodeCounter() : internal_count(0), external_counters(0) {}
  NodeCounter(const uint32_t input_internal_count,
              const uint8_t input_external_counters)
      : internal_count(input_internal_count),
        external_counters(input_external_counters) {}

  uint32_t internal_count : 30;
  uint8_t external_counters : 2;
};

The reason is again to make std::atomic<NodeCounter> a genuinely lock-free atomic variable. In this structure, external_counters occupies only 2 bits, which can hold at most 3. Since the queue has the two pointers head_ and tail_, at most two external counters reference a node, so a maximum value of 2 suffices and 2 bits are enough. internal_count gets 30 bits (up to 1073741823), for a total of 32 bits (4 bytes). std::atomic<NodeCounter>::is_lock_free() then returns true on mainstream platforms: a genuinely lock-free atomic variable. When internal_count and external_counters are both zero, no thread is using the node and it can safely be deleted to reclaim its memory. Note: internal_count is only ever decremented, never incremented, except when the merged external contribution external_count - 2 is added into it.

2.2 Relaxed Memory Ordering

Based on an analysis of the two most important memory-ordering relations, happens-before and synchronizes-with, a version with relaxed memory ordering is given below. Correctness on all platforms is not guaranteed:

#pragma once #include#include templateclass LockFreeQueue { public: LockFreeQueue() : head_(CountedNodePtr(new Node, 1)), tail_(head_.load(std::memory_order_relaxed)) {} ~LockFreeQueue();  // Copy construct and assignment, move construct and assignment are // prohibited. LockFreeQueue(const LockFreeQueue& other) = delete; LockFreeQueue& operator=(const LockFreeQueue& other) = delete; LockFreeQueue(LockFreeQueue&& other) = delete; LockFreeQueue& operator=(LockFreeQueue&& other) = delete;  bool IsEmpty() const { return head_.load(std::memory_order_relaxed).ptr == tail_.load(std::memory_order_relaxed).ptr; } bool IsLockFree() const { return std::atomic::is_always_lock_free; }  void Push(const T& data); std::unique_ptrPop();  private: // Forward class declaration struct Node;  struct CountedNodePtr { explicit CountedNodePtr(Node* input_ptr = nullptr, uint16_t input_external_count = 0) : ptr(reinterpret_cast(input_ptr)), external_count(input_external_count) {}  // We know that the platform has spare bits in a pointer (for example, // because the address space is only 48 bits but a pointer is 64 bits), we // can store the count inside the spare bits of the pointer to fit it all // back in a single machine word. Keeping the structure within a machine // word makes it more likely that the atomic operations can be lock-free on // many platforms. uint64_t ptr : 48; uint16_t external_count : 16; };  struct NodeCounter { NodeCounter() : internal_count(0), external_counters(0) {} NodeCounter(const uint32_t input_internal_count, const uint8_t input_external_counters) : internal_count(input_internal_count), external_counters(input_external_counters) {}  // external_counters occupies only 2 bits, where the maximum value stored // is 3. Note that we need only 2 bits for the external_counters because // there are at most two such counters. By using a bit field for this and // specifying internal_count as a 30-bit value, we keep the total counter // size to 32 bits. 
This gives us plenty of scope for large internal count // values while ensuring that the whole structure fits inside a machine word // on 32-bit and 64-bit machines. It’s important to update these counts // together as a single entity in order to avoid race conditions. Keeping // the structure within a machine word makes it more likely that the atomic // operations can be lock-free on many platforms. uint32_t internal_count : 30; uint8_t external_counters : 2; };  struct Node { // There are only two counters in Node (counter and next), so the initial // value of external_counters is 2. Node() : data(nullptr), counter(NodeCounter(0, 2)), next(CountedNodePtr()) {} ~Node(); void ReleaseRef();  std::atomic data; std::atomiccounter; std::atomicnext; };  private: static void IncreaseExternalCount(std::atomic* atomic_node, CountedNodePtr* old_node); static void FreeExternalCounter(CountedNodePtr* old_node); void SetNewTail(const CountedNodePtr& new_tail, CountedNodePtr* old_tail);  private: std::atomichead_; std::atomictail_;}; templateLockFreeQueue::Node::~Node() { if (data.load(std::memory_order_relaxed) != nullptr) { T* old_data = data.exchange(nullptr, std::memory_order_relaxed); if (old_data != nullptr) { delete old_data; } }} templatevoid LockFreeQueue::Node::ReleaseRef() { NodeCounter old_node = counter.load(std::memory_order_relaxed); NodeCounter new_node;  // the whole count structure has to be updated atomically, even though we // only want to modify the internal_count field. This therefore requires a // compare/exchange loop. do { new_node = old_node; if (new_node.internal_count > 0) { --new_node.internal_count; } } while (!counter.compare_exchange_strong(old_node, new_node, std::memory_order_acquire, std::memory_order_relaxed));  // Once we’ve decremented internal_count, if both the internal and // external counts are now zero, this is the last reference, so we can // delete the node safely. 
if (!new_node.internal_count && !new_node.external_counters) { delete this; }} templateLockFreeQueue::~LockFreeQueue() { while (Pop()) { // Do nothing }  // Delete the last dummy node. if (head_.load(std::memory_order_relaxed).ptr == tail_.load(std::memory_order_relaxed).ptr) { CountedNodePtr old_head = head_.load(std::memory_order_relaxed); CountedNodePtr empty_head; auto* dumpy_ptr = reinterpret_cast(old_head.ptr); if (head_.compare_exchange_strong(old_head, empty_head, std::memory_order_acq_rel, std::memory_order_relaxed)) { delete dumpy_ptr; } }} templatevoid LockFreeQueue::Push(const T& data) { auto new_data = std::make_unique(data); CountedNodePtr new_next(new Node, 1); CountedNodePtr old_tail = tail_.load(std::memory_order_relaxed);  while (true) { IncreaseExternalCount(&tail_, &old_tail);  T* old_data = nullptr; // We use compare_exchange_strong() to avoid looping. If the exchange // fails, we know that another thread has already set the next pointer, so // we don’t need the new node we allocated at the beginning, and we can // delete it. We also want to use the next value that the other thread set // for updating tail. if (reinterpret_cast(old_tail.ptr) ->data.compare_exchange_strong(old_data, new_data.get(), std::memory_order_release, std::memory_order_relaxed)) { CountedNodePtr old_next; if (!reinterpret_cast(old_tail.ptr) ->next.compare_exchange_strong(old_next, new_next, std::memory_order_acq_rel, std::memory_order_relaxed)) { delete reinterpret_cast(new_next.ptr); new_next = old_next; } SetNewTail(new_next, &old_tail);  // Release the ownership of the managed object so that the data will not // be deleted beyond the scope the unique_ptr. new_data.release(); break; } else { // If the thread calling Push() failed to set the data pointer this time // through the loop, it can help the successful thread to complete the // update. First off, we try to update the next pointer to the new node // allocated on this thread. 
      // If this succeeds, we want to use the node we allocated as the new
      // tail node, and we need to allocate another new node in anticipation
      // of managing to push an item on the queue. We can then try to set
      // the tail node by calling SetNewTail before looping around again.
      CountedNodePtr old_next;
      if (reinterpret_cast<Node*>(old_tail.ptr)
              ->next.compare_exchange_strong(old_next, new_next,
                                             std::memory_order_acq_rel,
                                             std::memory_order_relaxed)) {
        old_next = new_next;
        new_next.ptr = reinterpret_cast<uint64_t>(new Node);
      }
      SetNewTail(old_next, &old_tail);
    }
  }
}

template <typename T>
std::unique_ptr<T> LockFreeQueue<T>::Pop() {
  // We prime the pump by loading the old_head value before we enter the
  // loop, and before we increase the external count on the loaded value.
  CountedNodePtr old_head = head_.load(std::memory_order_relaxed);
  while (true) {
    IncreaseExternalCount(&head_, &old_head);

    Node* ptr = reinterpret_cast<Node*>(old_head.ptr);
    if (ptr == nullptr) {
      return std::unique_ptr<T>();
    }
    // If the head node is the same as the tail node, we can release the
    // reference and return a null pointer because there's no data in the
    // queue.
    if (ptr ==
        reinterpret_cast<Node*>(tail_.load(std::memory_order_acquire).ptr)) {
      ptr->ReleaseRef();
      return std::unique_ptr<T>();
    }

    // If there is data, we want to try to claim it, and we do this with the
    // call to compare_exchange_strong(). It compares the external count and
    // pointer as a single entity; if either changes, we need to loop again,
    // after releasing the reference.
    CountedNodePtr next = ptr->next.load(std::memory_order_relaxed);
    if (head_.compare_exchange_strong(old_head, next,
                                      std::memory_order_acquire,
                                      std::memory_order_relaxed)) {
      // If the exchange succeeded, we've claimed the data in the node as
      // ours, so we can return that to the caller after we've released the
      // external counter to the popped node.
      T* res = ptr->data.exchange(nullptr, std::memory_order_acquire);
      FreeExternalCounter(&old_head);
      return std::unique_ptr<T>(res);
    }

    // Once both the external reference counts have been freed and the
    // internal count has dropped to zero, the node itself can be deleted.
    ptr->ReleaseRef();
  }
}

template <typename T>
void LockFreeQueue<T>::IncreaseExternalCount(
    std::atomic<CountedNodePtr>* atomic_node, CountedNodePtr* old_node) {
  CountedNodePtr new_node;

  // If `*old_node` is equal to `*atomic_node`, no other thread has changed
  // `*atomic_node`, so update `*atomic_node` to `new_node`. In fact
  // `*atomic_node` is still the original node; only its `external_count`
  // has been increased by 1. If `*old_node` is not equal to `*atomic_node`,
  // another thread has changed `*atomic_node`, so update `*old_node` to
  // `*atomic_node` and keep looping until no thread is changing
  // `*atomic_node`.
  do {
    new_node = *old_node;
    ++new_node.external_count;
  } while (!atomic_node->compare_exchange_strong(*old_node, new_node,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed));

  old_node->external_count = new_node.external_count;
}

template <typename T>
void LockFreeQueue<T>::FreeExternalCounter(CountedNodePtr* old_node) {
  Node* ptr = reinterpret_cast<Node*>(old_node->ptr);
  // It's important to note that the value we add is two less than the
  // external count. We've removed the node from the list, so we drop one
  // off the count for that, and we're no longer accessing the node from
  // this thread, so we drop another off the count for that.
  const int increased_count = old_node->external_count - 2;
  NodeCounter old_counter = ptr->counter.load(std::memory_order_relaxed);
  NodeCounter new_counter;

  // Update two counters using a single compare_exchange_strong() on the
  // whole count structure, as we did when decreasing the internal_count in
  // ReleaseRef(). This has to be done as a single action (which therefore
  // requires the compare/exchange loop) to avoid a race condition.
  // If they're updated separately, two threads may both think they are the
  // last one and both delete the node, resulting in undefined behavior.
  do {
    new_counter = old_counter;
    if (new_counter.external_counters > 0) {
      --new_counter.external_counters;
    }

    int temp_count = new_counter.internal_count + increased_count;
    // `internal_count` is a non-negative number, and if a negative number
    // is assigned to it, the result is undefined.
    new_counter.internal_count = (temp_count > 0 ? temp_count : 0);
  } while (!ptr->counter.compare_exchange_strong(old_counter, new_counter,
                                                 std::memory_order_release,
                                                 std::memory_order_relaxed));

  // If both the values are now zero, there are no more references to the
  // node, so it can be safely deleted.
  if (!new_counter.internal_count && !new_counter.external_counters) {
    delete ptr;
  }
}

template <typename T>
void LockFreeQueue<T>::SetNewTail(const CountedNodePtr& new_tail,
                                  CountedNodePtr* old_tail) {
  // Use a compare_exchange_weak() loop to update the tail, because if
  // other threads are trying to Push() a new node, the external_count part
  // may have changed, and we don't want to lose it.
  Node* current_tail_ptr = reinterpret_cast<Node*>(old_tail->ptr);
  // Due to the scheduling of the operating system, compare_exchange_weak()
  // may fail spuriously, that is, fail even though no other thread has
  // changed the value. At this point we compare `old_tail->ptr` with
  // `current_tail_ptr`. If they are equal, the failure was spurious and we
  // continue the loop; if they differ, another thread got in first and we
  // stop the loop.
  while (!tail_.compare_exchange_weak(*old_tail, new_tail,
                                      std::memory_order_release,
                                      std::memory_order_relaxed) &&
         reinterpret_cast<Node*>(old_tail->ptr) == current_tail_ptr) {
    // Do nothing
  }

  // We also need to take care that we don't replace the value if another
  // thread has successfully changed it already; otherwise, we may end up
  // with loops in the queue, which would be a rather bad idea.
  // Consequently, we need to ensure that the ptr part of the loaded value
  // is the same if the compare/exchange fails. If the ptr is the same once
  // the loop has exited, then we must have successfully set the tail, so
  // we need to free the old external counter. If the ptr value is
  // different, then another thread will have freed the counter, so we need
  // to release the single reference held by this thread.
  if (reinterpret_cast<Node*>(old_tail->ptr) == current_tail_ptr) {
    FreeExternalCounter(old_tail);
  } else {
    current_tail_ptr->ReleaseRef();
  }
}

The following figure gives one reference illustration for analyzing the memory ordering:


III. Test Code

Below is a simple test program that verifies the lock-free queue works correctly (file name: lock_free_queue.cpp):

#include "lock_free_queue.h"

#include <algorithm>
#include <iostream>
#include <random>
#include <thread>
#include <vector>

namespace {
constexpr size_t kElementNum = 10;
constexpr size_t kThreadNum = 200;
constexpr size_t kLargeThreadNum = 2000;
}  // namespace

int main() {
  LockFreeQueue<int> queue;

  // Case 1: Single-thread test
  for (size_t i = 0; i < kElementNum; ++i) {
    std::cout << "The data " << i << " is pushed in the queue.\n";
    queue.Push(i);
  }
  std::cout << "queue.IsEmpty() == " << std::boolalpha << queue.IsEmpty()
            << std::endl;
  while (auto data = queue.Pop()) {
    std::cout << "Current data is : " << *data << '\n';
  }

  // Case 2: Multi-thread test. Producers and consumers are evenly
  // distributed.
  std::vector<std::thread> producers1;
  std::vector<std::thread> producers2;
  std::vector<std::thread> consumers1;
  std::vector<std::thread> consumers2;
  for (size_t i = 0; i < kThreadNum; ++i) {
    producers1.emplace_back(&LockFreeQueue<int>::Push, &queue, i * 10);
    producers2.emplace_back(&LockFreeQueue<int>::Push, &queue, i * 20);
    consumers1.emplace_back(&LockFreeQueue<int>::Pop, &queue);
    consumers2.emplace_back(&LockFreeQueue<int>::Pop, &queue);
  }
  for (size_t i = 0; i < kThreadNum; ++i) {
    producers1[i].join();
    consumers1[i].join();
    producers2[i].join();
    consumers2[i].join();
  }
  producers1.clear();
  producers1.shrink_to_fit();
  producers2.clear();
  producers2.shrink_to_fit();
  consumers1.clear();
  consumers1.shrink_to_fit();
  consumers2.clear();
  consumers2.shrink_to_fit();

  // Case 3: Multi-thread test. Producers and consumers are randomly
  // distributed.
  std::vector<std::thread> producers3;
  std::vector<std::thread> consumers3;
  for (size_t i = 0; i < kLargeThreadNum; ++i) {
    producers3.emplace_back(&LockFreeQueue<int>::Push, &queue, i * 30);
    consumers3.emplace_back(&LockFreeQueue<int>::Pop, &queue);
  }
  std::vector<int> random_numbers(kLargeThreadNum);
  std::mt19937 gen(std::random_device{}());
  std::uniform_int_distribution<int> dis(0, 100000);
  auto rand_num_generator = [&gen, &dis]() mutable { return dis(gen); };
  std::generate(random_numbers.begin(), random_numbers.end(),
                rand_num_generator);
  for (size_t i = 0; i < kLargeThreadNum; ++i) {
    if (random_numbers[i] % 2) {
      producers3[i].join();
      consumers3[i].join();
    } else {
      consumers3[i].join();
      producers3[i].join();
    }
  }
  producers3.clear();
  producers3.shrink_to_fit();
  consumers3.clear();
  consumers3.shrink_to_fit();

  return 0;
}

The CMake build configuration file CMakeLists.txt:

cmake_minimum_required(VERSION 3.0.0)
project(lock_free_queue VERSION 0.1.0)

set(CMAKE_CXX_STANDARD 17)

# If the debug option is not given, the program will not have debugging
# information.
SET(CMAKE_BUILD_TYPE "Debug")

# Check for memory leaks
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=leak -fsanitize=address")

add_executable(${PROJECT_NAME} ${PROJECT_NAME}.cpp)

find_package(Threads REQUIRED)
# libatomic should be linked to the program.
# Otherwise, the following link errors occurred:
# /usr/include/c++/9/atomic:254: undefined reference to `__atomic_load_16'
# /usr/include/c++/9/atomic:292: undefined reference to `__atomic_compare_exchange_16'
# target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT} atomic)
target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT})

include(CTest)
enable_testing()

set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})
include(CPack)

In the configuration above, `SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=leak -fsanitize=address")` enables detection of memory leaks in the program. A link against the atomic library was also added at one point: the reference-counting struct CountedNodePtr originally contained two data members, `int external_count; Node* ptr;`, which together occupy 16 bytes, and atomic operations on a 16-byte structure require linking libatomic, otherwise the following link errors occur. (Note: the initial implementation did not use bit fields and needed libatomic; the new version uses bit fields and no longer needs it.)

/usr/include/c++/9/atomic:254: undefined reference to `__atomic_load_16'
/usr/include/c++/9/atomic:292: undefined reference to `__atomic_compare_exchange_16'

The VSCode debug launch configuration file .vscode/launch.json:

{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "cpp_gdb_launch",
      "type": "cppdbg",
      "request": "launch",
      "program": "${workspaceFolder}/build/${workspaceFolderBasename}",
      "args": [],
      "stopAtEntry": false,
      "cwd": "${fileDirname}",
      "environment": [],
      "externalConsole": false,
      "MIMode": "gdb",
      "setupCommands": [
        {
          "description": "Enable neat printing for gdb",
          "text": "-enable-pretty-printing",
          "ignoreFailures": true
        }
      ],
      // "preLaunchTask": "cpp_build_task",
      "miDebuggerPath": "/usr/bin/gdb"
    }
  ]
}

Build commands using CMake:

cd lock_free_queue
# Run only once
mkdir build
cd build
cmake .. && make

The output is as follows:

./lock_free_queue
The data 0 is pushed in the queue.
The data 1 is pushed in the queue.
The data 2 is pushed in the queue.
The data 3 is pushed in the queue.
The data 4 is pushed in the queue.
The data 5 is pushed in the queue.
The data 6 is pushed in the queue.
The data 7 is pushed in the queue.
The data 8 is pushed in the queue.
The data 9 is pushed in the queue.
queue.IsEmpty() == false
Current data is : 0
Current data is : 1
Current data is : 2
Current data is : 3
Current data is : 4
Current data is : 5
Current data is : 6
Current data is : 7
Current data is : 8
Current data is : 9

The VSCode debugging interface is shown below:


