mpsc_intrusive_unordered.cpp 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. #include "mpsc_intrusive_unordered.h"
  2. #include <library/cpp/deprecated/atomic/atomic.h>
  3. namespace NThreading {
  4. void TMPSCIntrusiveUnordered::Push(TIntrusiveNode* node) noexcept {
  5. auto head = AtomicGet(HeadForCaS);
  6. for (ui32 i = NUMBER_OF_TRIES_FOR_CAS; i-- > 0;) {
  7. // no ABA here, because Next is exactly head
  8. // it does not matter how many travels head was made/
  9. node->Next = head;
  10. auto prev = AtomicGetAndCas(&HeadForCaS, node, head);
  11. if (head == prev) {
  12. return;
  13. }
  14. head = prev;
  15. }
  16. // boring of trying to do cas, let's just swap
  17. // no need for atomic here, because the next is atomic swap
  18. node->Next = 0;
  19. head = AtomicSwap(&HeadForSwap, node);
  20. if (head != nullptr) {
  21. AtomicSet(node->Next, head);
  22. } else {
  23. // consumer must know if no other thread may access the memory,
  24. // setting Next to node is a way to notify consumer
  25. AtomicSet(node->Next, node);
  26. }
  27. }
  28. TIntrusiveNode* TMPSCIntrusiveUnordered::PopMany() noexcept {
  29. if (NotReadyChain == nullptr) {
  30. auto head = AtomicSwap(&HeadForSwap, nullptr);
  31. NotReadyChain = head;
  32. }
  33. if (NotReadyChain != nullptr) {
  34. auto next = AtomicGet(NotReadyChain->Next);
  35. if (next != nullptr) {
  36. auto ready = NotReadyChain;
  37. TIntrusiveNode* cut;
  38. do {
  39. cut = NotReadyChain;
  40. NotReadyChain = next;
  41. next = AtomicGet(NotReadyChain->Next);
  42. if (next == NotReadyChain) {
  43. cut = NotReadyChain;
  44. NotReadyChain = nullptr;
  45. break;
  46. }
  47. } while (next != nullptr);
  48. cut->Next = nullptr;
  49. return ready;
  50. }
  51. }
  52. if (AtomicGet(HeadForCaS) != nullptr) {
  53. return AtomicSwap(&HeadForCaS, nullptr);
  54. }
  55. return nullptr;
  56. }
  57. TIntrusiveNode* TMPSCIntrusiveUnordered::Pop() noexcept {
  58. if (PopOneQueue != nullptr) {
  59. auto head = PopOneQueue;
  60. PopOneQueue = PopOneQueue->Next;
  61. return head;
  62. }
  63. PopOneQueue = PopMany();
  64. if (PopOneQueue != nullptr) {
  65. auto head = PopOneQueue;
  66. PopOneQueue = PopOneQueue->Next;
  67. return head;
  68. }
  69. return nullptr;
  70. }
  71. }