bounded_queue.h 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. #pragma once
  2. #include <util/generic/yexception.h>
  3. //https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
  4. namespace NThreading {
  5. template<typename T>
  6. class TBoundedQueue {
  7. public:
  8. explicit TBoundedQueue(size_t size)
  9. : Buffer_(new TCell[size])
  10. , Mask_(size - 1)
  11. {
  12. Y_ENSURE(size >= 2 && (size & (size - 1)) == 0);
  13. for (size_t i = 0; i < size; ++i) {
  14. Buffer_[i].Sequence.store(i, std::memory_order_relaxed);
  15. }
  16. }
  17. template <typename T_>
  18. [[nodiscard]] bool Enqueue(T_&& data) noexcept {
  19. TCell* cell;
  20. size_t pos = EnqueuePos_.load(std::memory_order_relaxed);
  21. for (;;) {
  22. cell = &Buffer_[pos & Mask_];
  23. size_t seq = cell->Sequence.load(std::memory_order_acquire);
  24. intptr_t dif = (intptr_t)seq - (intptr_t)pos;
  25. if (dif == 0) {
  26. if (EnqueuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
  27. break;
  28. }
  29. } else if (dif < 0) {
  30. return false;
  31. } else {
  32. pos = EnqueuePos_.load(std::memory_order_relaxed);
  33. }
  34. }
  35. static_assert(noexcept(cell->Data = std::forward<T_>(data)));
  36. cell->Data = std::forward<T_>(data);
  37. cell->Sequence.store(pos + 1, std::memory_order_release);
  38. return true;
  39. }
  40. [[nodiscard]] bool Dequeue(T& data) noexcept {
  41. TCell* cell;
  42. size_t pos = DequeuePos_.load(std::memory_order_relaxed);
  43. for (;;) {
  44. cell = &Buffer_[pos & Mask_];
  45. size_t seq = cell->Sequence.load(std::memory_order_acquire);
  46. intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
  47. if (dif == 0) {
  48. if (DequeuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
  49. break;
  50. }
  51. } else if (dif < 0) {
  52. return false;
  53. } else {
  54. pos = DequeuePos_.load(std::memory_order_relaxed);
  55. }
  56. }
  57. static_assert(noexcept(data = std::move(cell->Data)));
  58. data = std::move(cell->Data);
  59. cell->Sequence.store(pos + Mask_ + 1, std::memory_order_release);
  60. return true;
  61. }
  62. private:
  63. struct TCell {
  64. std::atomic<size_t> Sequence;
  65. T Data;
  66. };
  67. std::unique_ptr<TCell[]> Buffer_;
  68. const size_t Mask_;
  69. alignas(64) std::atomic<size_t> EnqueuePos_ = 0;
  70. alignas(64) std::atomic<size_t> DequeuePos_ = 0;
  71. };
  72. }