mux_event.h 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. #pragma once
  2. #include <iterator>
  3. #include <util/datetime/base.h>
  4. #include <library/cpp/deprecated/atomic/atomic.h>
  5. #include <util/system/defaults.h>
  6. #include <util/system/event.h>
  7. #include <util/system/guard.h>
  8. #include <util/system/mutex.h>
  9. #include <util/generic/list.h>
  10. #include <util/generic/vector.h>
  11. #include <util/generic/noncopyable.h>
  12. class TMuxEvent: public TNonCopyable {
  13. friend inline int WaitForAnyEvent(TMuxEvent** array, const int size, TDuration timeout);
  14. public:
  15. enum ResetMode {
  16. rManual,
  17. // TODO: rAuto is not supported yet
  18. };
  19. TMuxEvent(ResetMode rmode = rManual) {
  20. Y_UNUSED(rmode);
  21. }
  22. ~TMuxEvent() {
  23. Y_ABORT_UNLESS(WaitList.empty(), "");
  24. }
  25. // TODO: potentially unsafe, but currently I can't add "virtual" to TSystemEvent methods
  26. operator TSystemEvent&() {
  27. return MyEvent;
  28. }
  29. operator const TSystemEvent&() const {
  30. return MyEvent;
  31. }
  32. bool WaitD(TInstant deadLine) noexcept {
  33. return MyEvent.WaitD(deadLine);
  34. }
  35. // for rManual it's OK to ignore WaitList
  36. void Reset() noexcept {
  37. TGuard<TMutex> lock(WaitListLock);
  38. MyEvent.Reset(); // TODO: do we actually need to be locked here?
  39. }
  40. void Signal() noexcept {
  41. TGuard<TMutex> lock(WaitListLock);
  42. for (auto& i : WaitList) {
  43. i->Signal();
  44. }
  45. MyEvent.Signal(); // TODO: do we actually need to be locked here?
  46. }
  47. // same as in TSystemEvent
  48. inline bool WaitT(TDuration timeOut) noexcept {
  49. return WaitD(timeOut.ToDeadLine());
  50. }
  51. inline void WaitI() noexcept {
  52. WaitD(TInstant::Max());
  53. }
  54. inline bool Wait(ui32 timer) noexcept {
  55. return WaitT(TDuration::MilliSeconds(timer));
  56. }
  57. inline bool Wait() noexcept {
  58. WaitI();
  59. return true;
  60. }
  61. private:
  62. TSystemEvent MyEvent;
  63. TMutex WaitListLock;
  64. TList<TSystemEvent*> WaitList;
  65. };
  66. ///////////////////////////////////////////////////////////////////////////////
  67. inline int WaitForAnyEvent(TMuxEvent** array, const int size, const TDuration timeout = TDuration::Max()) {
  68. TVector<TList<TSystemEvent*>::iterator> listIters;
  69. listIters.reserve(size);
  70. int result = -1;
  71. TSystemEvent e;
  72. for (int i = 0; i != size; ++i) {
  73. TMuxEvent& me = *array[i];
  74. TGuard<TMutex> lock(me.WaitListLock);
  75. if (me.MyEvent.Wait(0)) {
  76. result = i;
  77. break;
  78. }
  79. listIters.push_back(me.WaitList.insert(me.WaitList.end(), &e));
  80. }
  81. const bool timedOut = result == -1 && !e.WaitT(timeout);
  82. for (int i = 0; i != size; ++i) {
  83. TMuxEvent& me = *array[i];
  84. TGuard<TMutex> lock(me.WaitListLock);
  85. if (i < listIters.ysize()) {
  86. me.WaitList.erase(listIters[i]);
  87. }
  88. if (!timedOut && result == -1 && me.MyEvent.Wait(0)) { // always returns first signalled event
  89. result = i;
  90. }
  91. }
  92. Y_ASSERT(timedOut == (result == -1));
  93. return result;
  94. }
  95. ///////////////////////////////////////////////////////////////////////////////
  96. // TODO: rewrite via template<class TIter...>
  97. inline int WaitForAnyEvent(TMuxEvent& e0, const TDuration timeout = TDuration::Max()) {
  98. TMuxEvent* array[] = {&e0};
  99. return WaitForAnyEvent(array, Y_ARRAY_SIZE(array), timeout);
  100. }
  101. inline int WaitForAnyEvent(TMuxEvent& e0, TMuxEvent& e1, const TDuration timeout = TDuration::Max()) {
  102. TMuxEvent* array[] = {&e0, &e1};
  103. return WaitForAnyEvent(array, Y_ARRAY_SIZE(array), timeout);
  104. }
  105. inline int WaitForAnyEvent(TMuxEvent& e0, TMuxEvent& e1, TMuxEvent& e2, const TDuration timeout = TDuration::Max()) {
  106. TMuxEvent* array[] = {&e0, &e1, &e2};
  107. return WaitForAnyEvent(array, Y_ARRAY_SIZE(array), timeout);
  108. }