mpmc_unordered_ring.cpp 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. #include "mpmc_unordered_ring.h"
  2. namespace NThreading {
  3. TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) {
  4. Y_ABORT_UNLESS(size > 0);
  5. RingSize = size;
  6. RingBuffer.Reset(new void*[size]);
  7. memset(&RingBuffer[0], 0, sizeof(void*) * size);
  8. }
  9. bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept {
  10. if (retryCount == 0) {
  11. StubbornPush(msg);
  12. return true;
  13. }
  14. for (ui16 itry = retryCount; itry-- > 0;) {
  15. if (WeakPush(msg)) {
  16. return true;
  17. }
  18. }
  19. return false;
  20. }
  21. bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept {
  22. auto pawl = AtomicIncrement(WritePawl);
  23. if (pawl - AtomicGet(ReadFront) >= RingSize) {
  24. // Queue is full
  25. AtomicDecrement(WritePawl);
  26. return false;
  27. }
  28. auto writeSlot = AtomicGetAndIncrement(WriteFront);
  29. if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) {
  30. return true;
  31. }
  32. // slot is occupied for some reason, retry
  33. return false;
  34. }
  35. void* TMPMCUnorderedRing::Pop() noexcept {
  36. ui64 readSlot;
  37. for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) {
  38. auto pawl = AtomicIncrement(ReadPawl);
  39. if (pawl > AtomicGet(WriteFront)) {
  40. // Queue is empty
  41. AtomicDecrement(ReadPawl);
  42. return nullptr;
  43. }
  44. readSlot = AtomicGetAndIncrement(ReadFront);
  45. auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr);
  46. if (msg != nullptr) {
  47. return msg;
  48. }
  49. }
  50. /* got no message in the slot, let's try to rollback readfront */
  51. AtomicCas(&ReadFront, readSlot - 1, readSlot);
  52. return nullptr;
  53. }
  54. void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept {
  55. for (; *last < RingSize;) {
  56. auto msg = AtomicSwap(&RingBuffer[*last], nullptr);
  57. ++*last;
  58. if (msg != nullptr) {
  59. return msg;
  60. }
  61. }
  62. return nullptr;
  63. }
  64. }