wfmo.h 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. #pragma once
  2. #include "lfqueue.h"
  3. #include <library/cpp/threading/atomic/bool.h>
  4. #include <util/generic/vector.h>
  5. #include <util/generic/scope.h>
  6. #include <library/cpp/deprecated/atomic/atomic.h>
  7. #include <library/cpp/deprecated/atomic/atomic_ops.h>
  8. #include <util/system/event.h>
  9. #include <util/system/spinlock.h>
  10. namespace NNeh {
  11. template <class T>
  12. class TBlockedQueue: public TLockFreeQueue<T>, public TSystemEvent {
  13. public:
  14. inline TBlockedQueue() noexcept
  15. : TSystemEvent(TSystemEvent::rAuto)
  16. {
  17. }
  18. inline void Notify(T t) noexcept {
  19. this->Enqueue(t);
  20. Signal();
  21. }
  22. };
  23. class TWaitQueue: public TThrRefBase {
  24. public:
  25. class TWaitHandle {
  26. public:
  27. ~TWaitHandle() {
  28. SwapWaitQueue(nullptr);
  29. }
  30. void Signal() noexcept {
  31. Signalled_ = true;
  32. if (TIntrusivePtr<TWaitQueue> q = SwapWaitQueue(nullptr)) {
  33. q->Notify(this);
  34. }
  35. }
  36. void Register(TIntrusivePtr<TWaitQueue>& waitQueue) noexcept {
  37. if (Signalled_) {
  38. waitQueue->Notify(this);
  39. SwapWaitQueue(nullptr);
  40. return;
  41. }
  42. waitQueue->Ref();
  43. SwapWaitQueue(waitQueue.Get());
  44. if (Signalled_) {
  45. if (TIntrusivePtr<TWaitQueue> q = SwapWaitQueue(nullptr)) {
  46. q->Notify(this);
  47. }
  48. }
  49. }
  50. bool Signalled() const {
  51. return Signalled_;
  52. }
  53. void ResetState() {
  54. Signalled_ = false;
  55. SwapWaitQueue(nullptr);
  56. }
  57. private:
  58. TIntrusivePtr<TWaitQueue> SwapWaitQueue(TWaitQueue* newQueue) noexcept {
  59. return TIntrusivePtr<TWaitQueue>(AtomicSwap(&WaitQueue_, newQueue), TIntrusivePtr<TWaitQueue>::TNoIncrement());
  60. }
  61. private:
  62. NAtomic::TBool Signalled_ = false;
  63. TWaitQueue* WaitQueue_ = nullptr;
  64. };
  65. inline bool Wait(const TInstant& deadLine) noexcept {
  66. return Q_.WaitD(deadLine);
  67. }
  68. inline void Notify(TWaitHandle* wq) noexcept {
  69. Q_.Notify(wq);
  70. }
  71. inline bool Dequeue(TWaitHandle** wq) noexcept {
  72. return Q_.Dequeue(wq);
  73. }
  74. private:
  75. TBlockedQueue<TWaitHandle*> Q_;
  76. };
  77. typedef TWaitQueue::TWaitHandle TWaitHandle;
  78. template <class T>
  79. static inline void WaitForMultipleObj(TWaitQueue& hndl, const TInstant& deadLine, T& func) {
  80. do {
  81. TWaitHandle* ret = nullptr;
  82. if (hndl.Dequeue(&ret)) {
  83. do {
  84. func(ret);
  85. } while (hndl.Dequeue(&ret));
  86. return;
  87. }
  88. } while (hndl.Wait(deadLine));
  89. }
  90. struct TSignalled {
  91. inline TSignalled()
  92. : Signalled(false)
  93. {
  94. }
  95. inline void operator()(const TWaitHandle*) noexcept {
  96. Signalled = true;
  97. }
  98. bool Signalled;
  99. };
  100. static inline bool WaitForOne(TWaitHandle& wh, const TInstant& deadLine) {
  101. TSignalled func;
  102. auto hndl = MakeIntrusive<TWaitQueue>();
  103. wh.Register(hndl);
  104. WaitForMultipleObj(*hndl, deadLine, func);
  105. return func.Signalled;
  106. }
  107. }