rq.cpp 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. #include "rq.h"
  2. #include "lfqueue.h"
  3. #include <library/cpp/threading/atomic/bool.h>
  4. #include <util/system/tls.h>
  5. #include <util/system/pipe.h>
  6. #include <util/system/event.h>
  7. #include <util/system/mutex.h>
  8. #include <util/system/condvar.h>
  9. #include <util/system/guard.h>
  10. #include <util/network/socket.h>
  11. #include <util/generic/deque.h>
  12. using namespace NNeh;
  13. namespace {
  14. class TBaseLockFreeRequestQueue: public IRequestQueue {
  15. public:
  16. void Clear() override {
  17. IRequestRef req;
  18. while (Q_.Dequeue(&req)) {
  19. }
  20. }
  21. protected:
  22. NNeh::TAutoLockFreeQueue<IRequest> Q_;
  23. };
  24. class TFdRequestQueue: public TBaseLockFreeRequestQueue {
  25. public:
  26. inline TFdRequestQueue() {
  27. TPipeHandle::Pipe(R_, W_);
  28. SetNonBlock(W_);
  29. }
  30. void Schedule(IRequestRef req) override {
  31. Q_.Enqueue(req);
  32. char ch = 42;
  33. W_.Write(&ch, 1);
  34. }
  35. IRequestRef Next() override {
  36. IRequestRef ret;
  37. #if 0
  38. for (size_t i = 0; i < 20; ++i) {
  39. if (Q_.Dequeue(&ret)) {
  40. return ret;
  41. }
  42. //asm volatile ("pause;");
  43. }
  44. #endif
  45. while (!Q_.Dequeue(&ret)) {
  46. char ch;
  47. R_.Read(&ch, 1);
  48. }
  49. return ret;
  50. }
  51. private:
  52. TPipeHandle R_;
  53. TPipeHandle W_;
  54. };
  55. struct TNehFdEvent {
  56. inline TNehFdEvent() {
  57. TPipeHandle::Pipe(R, W);
  58. SetNonBlock(W);
  59. }
  60. inline void Signal() noexcept {
  61. char ch = 21;
  62. W.Write(&ch, 1);
  63. }
  64. inline void Wait() noexcept {
  65. char buf[128];
  66. R.Read(buf, sizeof(buf));
  67. }
  68. TPipeHandle R;
  69. TPipeHandle W;
  70. };
  71. template <class TEvent>
  72. class TEventRequestQueue: public TBaseLockFreeRequestQueue {
  73. public:
  74. void Schedule(IRequestRef req) override {
  75. Q_.Enqueue(req);
  76. E_.Signal();
  77. }
  78. IRequestRef Next() override {
  79. IRequestRef ret;
  80. while (!Q_.Dequeue(&ret)) {
  81. E_.Wait();
  82. }
  83. E_.Signal();
  84. return ret;
  85. }
  86. private:
  87. TEvent E_;
  88. };
  89. template <class TEvent>
  90. class TLazyEventRequestQueue: public TBaseLockFreeRequestQueue {
  91. public:
  92. void Schedule(IRequestRef req) override {
  93. Q_.Enqueue(req);
  94. if (C_.Val()) {
  95. E_.Signal();
  96. }
  97. }
  98. IRequestRef Next() override {
  99. IRequestRef ret;
  100. C_.Inc();
  101. while (!Q_.Dequeue(&ret)) {
  102. E_.Wait();
  103. }
  104. C_.Dec();
  105. if (Q_.Size() && C_.Val()) {
  106. E_.Signal();
  107. }
  108. return ret;
  109. }
  110. private:
  111. TEvent E_;
  112. TAtomicCounter C_;
  113. };
  114. class TCondVarRequestQueue: public IRequestQueue {
  115. public:
  116. void Clear() override {
  117. TGuard<TMutex> g(M_);
  118. Q_.clear();
  119. }
  120. void Schedule(IRequestRef req) override {
  121. {
  122. TGuard<TMutex> g(M_);
  123. Q_.push_back(req);
  124. }
  125. C_.Signal();
  126. }
  127. IRequestRef Next() override {
  128. TGuard<TMutex> g(M_);
  129. while (Q_.empty()) {
  130. C_.Wait(M_);
  131. }
  132. IRequestRef ret = *Q_.begin();
  133. Q_.pop_front();
  134. return ret;
  135. }
  136. private:
  137. TDeque<IRequestRef> Q_;
  138. TMutex M_;
  139. TCondVar C_;
  140. };
  141. class TBusyRequestQueue: public TBaseLockFreeRequestQueue {
  142. public:
  143. void Schedule(IRequestRef req) override {
  144. Q_.Enqueue(req);
  145. }
  146. IRequestRef Next() override {
  147. IRequestRef ret;
  148. while (!Q_.Dequeue(&ret)) {
  149. }
  150. return ret;
  151. }
  152. };
  153. class TSleepRequestQueue: public TBaseLockFreeRequestQueue {
  154. public:
  155. void Schedule(IRequestRef req) override {
  156. Q_.Enqueue(req);
  157. }
  158. IRequestRef Next() override {
  159. IRequestRef ret;
  160. while (!Q_.Dequeue(&ret)) {
  161. usleep(1);
  162. }
  163. return ret;
  164. }
  165. };
  166. struct TStupidEvent {
  167. inline TStupidEvent()
  168. : InWait(false)
  169. {
  170. }
  171. inline bool Signal() noexcept {
  172. const bool ret = InWait;
  173. Ev.Signal();
  174. return ret;
  175. }
  176. inline void Wait() noexcept {
  177. InWait = true;
  178. Ev.Wait();
  179. InWait = false;
  180. }
  181. TAutoEvent Ev;
  182. NAtomic::TBool InWait;
  183. };
  184. template <class TEvent>
  185. class TLFRequestQueue: public TBaseLockFreeRequestQueue {
  186. struct TLocalQueue: public TEvent {
  187. };
  188. public:
  189. void Schedule(IRequestRef req) override {
  190. Q_.Enqueue(req);
  191. for (TLocalQueue* lq = 0; FQ_.Dequeue(&lq) && !lq->Signal();) {
  192. }
  193. }
  194. IRequestRef Next() override {
  195. while (true) {
  196. IRequestRef ret;
  197. if (Q_.Dequeue(&ret)) {
  198. return ret;
  199. }
  200. TLocalQueue* lq = LocalQueue();
  201. FQ_.Enqueue(lq);
  202. if (Q_.Dequeue(&ret)) {
  203. TLocalQueue* besttry;
  204. if (FQ_.Dequeue(&besttry)) {
  205. if (besttry == lq) {
  206. //huraay, get rid of spurious wakeup
  207. } else {
  208. FQ_.Enqueue(besttry);
  209. }
  210. }
  211. return ret;
  212. }
  213. lq->Wait();
  214. }
  215. }
  216. private:
  217. static inline TLocalQueue* LocalQueue() noexcept {
  218. Y_POD_STATIC_THREAD(TLocalQueue*)
  219. lq((TLocalQueue*)0);
  220. if (!lq) {
  221. Y_STATIC_THREAD(TLocalQueue)
  222. slq;
  223. lq = &(TLocalQueue&)slq;
  224. }
  225. return lq;
  226. }
  227. private:
  228. TLockFreeStack<TLocalQueue*> FQ_;
  229. };
  230. }
  231. IRequestQueueRef NNeh::CreateRequestQueue() {
  232. //return new TCondVarRequestQueue();
  233. //return new TSleepRequestQueue();
  234. //return new TBusyRequestQueue();
  235. //return new TLFRequestQueue<TStupidEvent>();
  236. #if defined(_freebsd_)
  237. return new TFdRequestQueue();
  238. #endif
  239. //return new TFdRequestQueue();
  240. return new TLazyEventRequestQueue<TAutoEvent>();
  241. //return new TEventRequestQueue<TAutoEvent>();
  242. //return new TEventRequestQueue<TNehFdEvent>();
  243. }