mpsc_htswap.h 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. #pragma once
  /*
  http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
  Simple semi-wait-free queue. Many producers - one consumer.
  Tracking of allocated memory is not required.
  No CAS. Only atomic swap (exchange) operations.
  WARNING: a sleeping producer can stall the consumer's progress.
  WARNING: there is no wait & notify mechanism for the consumer;
  the consumer receives nullptr if the queue is empty.
  WARNING: the algorithm itself is lock-free,
  but producers and the consumer can still be blocked by the memory allocator.
  Reference design: rtmapreduce/libs/threading/lfqueue.h
  */
  14. #include <util/generic/noncopyable.h>
  15. #include <util/system/types.h>
  16. #include <library/cpp/deprecated/atomic/atomic.h>
  17. #include "tune.h"
  18. namespace NThreading {
  19. namespace NHTSwapPrivate {
  /**
   * A single list node of the queue.
   *
   * The node itself declares no data members: the `Next` link and the stored
   * `Item` are inherited from `TTuneup::TNodeLayout`, and `TTuneup::TNodeBase`
   * is mixed in as an additional base class.
   *
   * @tparam T        type of the stored item; it is default-constructed by the
   *                  layout first and then assigned in the constructor body
   * @tparam TTuneup  policy type providing `TNodeBase` and `TNodeLayout`
   */
  template <class T, class TTuneup>
  struct TNode
      : public TTuneup::TNodeBase,
        public TTuneup::template TNodeLayout<TNode<T, TTuneup>, T> {
      /** Creates an unlinked node that holds a copy of `item`. */
      TNode(const T& item) {
          this->Item = item;
          this->Next = nullptr;
      }

      /** Creates an unlinked node that takes over the contents of `item`. */
      TNode(T&& item) {
          this->Item = std::move(item);
          this->Next = nullptr;
      }
  };
  33. struct TDefaultTuneup {
  34. struct TNodeBase: private TNonCopyable {
  35. };
  36. template <typename TNode, typename T>
  37. struct TNodeLayout {
  38. TNode* Next;
  39. T Item;
  40. };
  41. template <typename TNode>
  42. struct TQueueLayout {
  43. TNode* Head;
  44. TNode* Tail;
  45. };
  46. };
  47. template <typename T, typename TTuneup>
  48. class THTSwapQueueImpl
  49. : protected TTuneup::template TQueueLayout<TNode<T, TTuneup>> {
  50. protected:
  51. using TTunedNode = TNode<T, TTuneup>;
  52. public:
  53. using TItem = T;
  54. THTSwapQueueImpl() {
  55. this->Head = new TTunedNode(T());
  56. this->Tail = this->Head;
  57. }
  58. ~THTSwapQueueImpl() {
  59. TTunedNode* node = this->Head;
  60. while (node != nullptr) {
  61. TTunedNode* next = node->Next;
  62. delete node;
  63. node = next;
  64. }
  65. }
  66. template <typename TT>
  67. void Push(TT&& item) {
  68. Enqueue(new TTunedNode(std::forward<TT>(item)));
  69. }
  70. T Peek() {
  71. TTunedNode* next = AtomicGet(this->Head->Next);
  72. if (next == nullptr) {
  73. return T();
  74. }
  75. return next->Item;
  76. }
  77. void Enqueue(TTunedNode* node) {
  78. // our goal is to avoid expensive CAS here,
  79. // but now consumer will be blocked until new tail linked.
  80. // fortunately 'window of inconsistency' is extremely small.
  81. TTunedNode* prev = AtomicSwap(&this->Tail, node);
  82. AtomicSet(prev->Next, node);
  83. }
  84. T Pop() {
  85. TTunedNode* next = AtomicGet(this->Head->Next);
  86. if (next == nullptr) {
  87. return nullptr;
  88. }
  89. auto item = std::move(next->Item);
  90. std::swap(this->Head, next); // no need atomic here
  91. delete next;
  92. return item;
  93. }
  94. bool IsEmpty() const {
  95. TTunedNode* next = AtomicGet(this->Head->Next);
  96. return (next == nullptr);
  97. }
  98. };
  99. }
  // Tune parameters for THTSwapQueue. DeclareTuneTypeParam comes from "tune.h"
  // (not visible here); presumably each line declares a wrapper (first argument)
  // that lets a caller override the same-named member type (second argument)
  // of TDefaultTuneup via THTSwapQueue's TParams - TODO confirm against tune.h.
  100. DeclareTuneTypeParam(THTSwapNodeBase, TNodeBase);
  101. DeclareTuneTypeParam(THTSwapNodeLayout, TNodeLayout);
  102. DeclareTuneTypeParam(THTSwapQueueLayout, TQueueLayout);
  103. template <typename T = void*, typename... TParams>
  104. class THTSwapQueue
  105. : public NHTSwapPrivate::THTSwapQueueImpl<T,
  106. TTune<NHTSwapPrivate::TDefaultTuneup, TParams...>> {
  107. };
  108. }