
#pragma once

#include <util/datetime/base.h>
#include <util/generic/noncopyable.h>
#include <util/generic/ptr.h>
#include <util/generic/typetraits.h>
#include <util/generic/vector.h>
#include <util/generic/ylimits.h>
#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/guard.h>
#include <util/system/spinlock.h>
#include <util/system/yassert.h>

#include <type_traits>
#include <utility>
namespace NThreading {
    ////////////////////////////////////////////////////////////////////////////////
    // Platform helpers

#if !defined(PLATFORM_CACHE_LINE)
#define PLATFORM_CACHE_LINE 64
#endif

#if !defined(PLATFORM_PAGE_SIZE)
#define PLATFORM_PAGE_SIZE 4 * 1024
#endif

    template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
    struct TPadded: public T {
        char Pad[PadSize - sizeof(T) % PadSize];

        TPadded() {
            static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
            Y_UNUSED(Pad);
        }

        template <typename... Args>
        TPadded(Args&&... args)
            : T(std::forward<Args>(args)...)
        {
            static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
            Y_UNUSED(Pad);
        }
    };
    ////////////////////////////////////////////////////////////////////////////////
    // Type helpers

    namespace NImpl {
        template <typename T>
        struct TPodTypeHelper {
            template <typename TT>
            static void Write(T* ptr, TT&& value) {
                *ptr = value;
            }

            static T Read(T* ptr) {
                return *ptr;
            }

            static void Destroy(T* ptr) {
                Y_UNUSED(ptr);
            }
        };

        template <typename T>
        struct TNonPodTypeHelper {
            template <typename TT>
            static void Write(T* ptr, TT&& value) {
                new (ptr) T(std::forward<TT>(value));
            }

            static T Read(T* ptr) {
                return std::move(*ptr);
            }

            static void Destroy(T* ptr) {
                (void)ptr; /* Make MSVC happy. */
                ptr->~T();
            }
        };

        template <typename T>
        using TTypeHelper = std::conditional_t<
            TTypeTraits<T>::IsPod,
            TPodTypeHelper<T>,
            TNonPodTypeHelper<T>>;
    }
    ////////////////////////////////////////////////////////////////////////////////
    // One producer/one consumer chunked queue.

    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    class TOneOneQueue: private TNonCopyable {
        using TTypeHelper = NImpl::TTypeHelper<T>;

        struct TChunk;

        struct TChunkHeader {
            size_t Count = 0;
            TChunk* Next = nullptr;
        };

        struct TChunk: public TChunkHeader {
            static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T);

            char Entries[MaxCount * sizeof(T)];

            TChunk() {
                Y_UNUSED(Entries); // uninitialized
            }

            ~TChunk() {
                for (size_t i = 0; i < this->Count; ++i) {
                    TTypeHelper::Destroy(GetPtr(i));
                }
            }

            T* GetPtr(size_t i) {
                return (T*)Entries + i;
            }
        };

        struct TWriterState {
            TChunk* Chunk = nullptr;
        };

        struct TReaderState {
            TChunk* Chunk = nullptr;
            size_t Count = 0;
        };

    private:
        TPadded<TWriterState> Writer;
        TPadded<TReaderState> Reader;

    public:
        using TItem = T;

        TOneOneQueue() {
            Writer.Chunk = Reader.Chunk = new TChunk();
        }

        ~TOneOneQueue() {
            DeleteChunks(Reader.Chunk);
        }

        template <typename TT>
        void Enqueue(TT&& value) {
            T* ptr = PrepareWrite();
            Y_ASSERT(ptr);
            TTypeHelper::Write(ptr, std::forward<TT>(value));
            CompleteWrite();
        }

        bool Dequeue(T& value) {
            if (T* ptr = PrepareRead()) {
                value = TTypeHelper::Read(ptr);
                CompleteRead();
                return true;
            }
            return false;
        }

        bool IsEmpty() {
            return !PrepareRead();
        }

    protected:
        T* PrepareWrite() {
            TChunk* chunk = Writer.Chunk;
            Y_ASSERT(chunk && !chunk->Next);

            if (chunk->Count != TChunk::MaxCount) {
                return chunk->GetPtr(chunk->Count);
            }

            chunk = new TChunk();
            AtomicSet(Writer.Chunk->Next, chunk);
            Writer.Chunk = chunk;
            return chunk->GetPtr(0);
        }

        void CompleteWrite() {
            AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1);
        }

        T* PrepareRead() {
            TChunk* chunk = Reader.Chunk;
            Y_ASSERT(chunk);

            for (;;) {
                size_t writerCount = AtomicGet(chunk->Count);
                if (Reader.Count != writerCount) {
                    return chunk->GetPtr(Reader.Count);
                }

                if (writerCount != TChunk::MaxCount) {
                    return nullptr;
                }

                chunk = AtomicGet(chunk->Next);
                if (!chunk) {
                    return nullptr;
                }

                delete Reader.Chunk;
                Reader.Chunk = chunk;
                Reader.Count = 0;
            }
        }

        void CompleteRead() {
            ++Reader.Count;
        }

    private:
        static void DeleteChunks(TChunk* chunk) {
            while (chunk) {
                TChunk* next = chunk->Next;
                delete chunk;
                chunk = next;
            }
        }
    };
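    // Illustrative usage sketch (not part of the library): exactly one producer
    // thread enqueues while exactly one consumer thread drains. Concurrent use
    // by more than one producer or more than one consumer is outside this
    // queue's contract.
    //
    //     TOneOneQueue<int> queue;
    //
    //     // producer thread
    //     queue.Enqueue(42);
    //
    //     // consumer thread
    //     int value = 0;
    //     while (queue.Dequeue(value)) {
    //         /* process value */
    //     }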
    ////////////////////////////////////////////////////////////////////////////////
    // Multiple producers/single consumer partitioned queue.
    // Provides FIFO guarantees for each producer.

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    class TManyOneQueue: private TNonCopyable {
        using TTypeHelper = NImpl::TTypeHelper<T>;

        struct TEntry {
            T Value;
            ui64 Tag;
        };

        struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {
            TAtomic WriteLock = 0;

            using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;
            using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;
            using TOneOneQueue<TEntry, ChunkSize>::PrepareRead;
            using TOneOneQueue<TEntry, ChunkSize>::CompleteRead;
        };

    private:
        union {
            TAtomic WriteTag = 0;
            char Pad[PLATFORM_CACHE_LINE];
        };

        TQueueType Queues[Concurrency];

    public:
        using TItem = T;

        template <typename TT>
        void Enqueue(TT&& value) {
            ui64 tag = NextTag();
            while (!TryEnqueue(std::forward<TT>(value), tag)) {
                SpinLockPause();
            }
        }

        bool Dequeue(T& value) {
            size_t index = 0;
            if (TEntry* entry = PrepareRead(index)) {
                value = TTypeHelper::Read(&entry->Value);
                Queues[index].CompleteRead();
                return true;
            }
            return false;
        }

        bool IsEmpty() {
            for (size_t i = 0; i < Concurrency; ++i) {
                if (!Queues[i].IsEmpty()) {
                    return false;
                }
            }
            return true;
        }

    private:
        ui64 NextTag() {
            // TODO: can we avoid synchronization here? it costs 1.5x performance penalty
            // return GetCycleCount();
            return AtomicIncrement(WriteTag);
        }

        template <typename TT>
        bool TryEnqueue(TT&& value, ui64 tag) {
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[i];
                if (AtomicTryAndTryLock(&queue.WriteLock)) {
                    TEntry* entry = queue.PrepareWrite();
                    Y_ASSERT(entry);

                    TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
                    entry->Tag = tag;
                    queue.CompleteWrite();

                    AtomicUnlock(&queue.WriteLock);
                    return true;
                }
            }
            return false;
        }

        TEntry* PrepareRead(size_t& index) {
            TEntry* entry = nullptr;
            ui64 tag = Max();

            for (size_t i = 0; i < Concurrency; ++i) {
                TEntry* e = Queues[i].PrepareRead();
                if (e && e->Tag < tag) {
                    index = i;
                    entry = e;
                    tag = e->Tag;
                }
            }

            if (entry) {
                // need second pass to catch updates within already scanned range
                size_t candidate = index;
                for (size_t i = 0; i < candidate; ++i) {
                    TEntry* e = Queues[i].PrepareRead();
                    if (e && e->Tag < tag) {
                        index = i;
                        entry = e;
                        tag = e->Tag;
                    }
                }
            }

            return entry;
        }
    };
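    // Illustrative usage sketch (not part of the library): any number of producer
    // threads may call Enqueue concurrently; a single consumer thread drains.
    // Items written by the same producer come out in the order they were enqueued.
    //
    //     TManyOneQueue<int> queue;
    //
    //     // any producer thread
    //     queue.Enqueue(42);
    //
    //     // the single consumer thread
    //     int value = 0;
    //     if (queue.Dequeue(value)) {
    //         /* process value */
    //     }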
    ////////////////////////////////////////////////////////////////////////////////
    // Concurrent many-many queue with strong FIFO guarantees.
    // Writers will not block readers (and vice versa), but will block each other.

    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
    class TManyManyQueue: private TNonCopyable {
    private:
        TPadded<TLock> WriteLock;
        TPadded<TLock> ReadLock;

        TOneOneQueue<T, ChunkSize> Queue;

    public:
        using TItem = T;

        template <typename TT>
        void Enqueue(TT&& value) {
            with_lock (WriteLock) {
                Queue.Enqueue(std::forward<TT>(value));
            }
        }

        bool Dequeue(T& value) {
            with_lock (ReadLock) {
                return Queue.Dequeue(value);
            }
        }

        bool IsEmpty() {
            with_lock (ReadLock) {
                return Queue.IsEmpty();
            }
        }
    };
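    // Illustrative usage sketch (not part of the library): any number of producer
    // and consumer threads; producers serialize on WriteLock, consumers on ReadLock,
    // so global FIFO order is preserved.
    //
    //     TManyManyQueue<int> queue;
    //
    //     // any thread
    //     queue.Enqueue(1);
    //
    //     // any thread
    //     int value = 0;
    //     if (queue.Dequeue(value)) {
    //         /* process value */
    //     }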
    ////////////////////////////////////////////////////////////////////////////////
    // Multiple producers/single consumer partitioned queue.
    // Because of random partitioning, reordering is possible: FIFO is not guaranteed!

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    class TRelaxedManyOneQueue: private TNonCopyable {
        struct TQueueType: public TOneOneQueue<T, ChunkSize> {
            TAtomic WriteLock = 0;
        };

    private:
        union {
            size_t ReadPos = 0;
            char Pad[PLATFORM_CACHE_LINE];
        };

        TQueueType Queues[Concurrency];

    public:
        using TItem = T;

        template <typename TT>
        void Enqueue(TT&& value) {
            while (!TryEnqueue(std::forward<TT>(value))) {
                SpinLockPause();
            }
        }

        bool Dequeue(T& value) {
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[ReadPos++ % Concurrency];
                if (queue.Dequeue(value)) {
                    return true;
                }
            }
            return false;
        }

        bool IsEmpty() {
            for (size_t i = 0; i < Concurrency; ++i) {
                if (!Queues[i].IsEmpty()) {
                    return false;
                }
            }
            return true;
        }

    private:
        template <typename TT>
        bool TryEnqueue(TT&& value) {
            size_t writePos = GetCycleCount();
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[writePos++ % Concurrency];
                if (AtomicTryAndTryLock(&queue.WriteLock)) {
                    queue.Enqueue(std::forward<TT>(value));
                    AtomicUnlock(&queue.WriteLock);
                    return true;
                }
            }
            return false;
        }
    };
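    // Illustrative usage sketch (not part of the library): same interface as
    // TManyOneQueue, but each item lands in a pseudo-randomly chosen partition,
    // so the single consumer may observe items out of enqueue order.
    //
    //     TRelaxedManyOneQueue<int> queue;
    //     queue.Enqueue(1);
    //     queue.Enqueue(2); // may be dequeued before 1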
    ////////////////////////////////////////////////////////////////////////////////
    // Concurrent many-many partitioned queue.
    // Because of random partitioning, reordering is possible: FIFO is not guaranteed!

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    class TRelaxedManyManyQueue: private TNonCopyable {
        struct TQueueType: public TOneOneQueue<T, ChunkSize> {
            union {
                TAtomic WriteLock = 0;
                char Pad1[PLATFORM_CACHE_LINE];
            };
            union {
                TAtomic ReadLock = 0;
                char Pad2[PLATFORM_CACHE_LINE];
            };
        };

    private:
        TQueueType Queues[Concurrency];

    public:
        using TItem = T;

        template <typename TT>
        void Enqueue(TT&& value) {
            while (!TryEnqueue(std::forward<TT>(value))) {
                SpinLockPause();
            }
        }

        bool Dequeue(T& value) {
            size_t readPos = GetCycleCount();
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[readPos++ % Concurrency];
                if (AtomicTryAndTryLock(&queue.ReadLock)) {
                    bool dequeued = queue.Dequeue(value);
                    AtomicUnlock(&queue.ReadLock);
                    if (dequeued) {
                        return true;
                    }
                }
            }
            return false;
        }

        bool IsEmpty() {
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[i];
                if (AtomicTryAndTryLock(&queue.ReadLock)) {
                    bool empty = queue.IsEmpty();
                    AtomicUnlock(&queue.ReadLock);
                    if (!empty) {
                        return false;
                    }
                }
            }
            return true;
        }

    private:
        template <typename TT>
        bool TryEnqueue(TT&& value) {
            size_t writePos = GetCycleCount();
            for (size_t i = 0; i < Concurrency; ++i) {
                TQueueType& queue = Queues[writePos++ % Concurrency];
                if (AtomicTryAndTryLock(&queue.WriteLock)) {
                    queue.Enqueue(std::forward<TT>(value));
                    AtomicUnlock(&queue.WriteLock);
                    return true;
                }
            }
            return false;
        }
    };
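    // Illustrative usage sketch (not part of the library): many producers and many
    // consumers spread across Concurrency partitions; raising Concurrency reduces
    // lock contention at the cost of weaker ordering and more memory.
    //
    //     TRelaxedManyManyQueue<int, /*Concurrency=*/8> queue;
    //     queue.Enqueue(1);     // any thread
    //     int value = 0;
    //     queue.Dequeue(value); // any thread; ordering across items is not guaranteed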
    ////////////////////////////////////////////////////////////////////////////////
    // Simple wrapper to deal with AutoPtrs

    template <typename T, typename TImpl>
    class TAutoQueueBase: private TNonCopyable {
    private:
        TImpl Impl;

    public:
        using TItem = TAutoPtr<T>;

        ~TAutoQueueBase() {
            TAutoPtr<T> value;
            while (Dequeue(value)) {
                // do nothing
            }
        }

        void Enqueue(TAutoPtr<T> value) {
            Impl.Enqueue(value.Get());
            Y_UNUSED(value.Release());
        }

        bool Dequeue(TAutoPtr<T>& value) {
            T* ptr = nullptr;
            if (Impl.Dequeue(ptr)) {
                value.Reset(ptr);
                return true;
            }
            return false;
        }

        bool IsEmpty() {
            return Impl.IsEmpty();
        }
    };

    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>;

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>;

    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
    using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>;

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>;

    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
    using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
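    // Illustrative usage sketch (not part of the library): the TAuto* aliases pass
    // ownership of heap-allocated objects through the queue as raw pointers and
    // hand it back on the consuming side. TMyJob is a hypothetical payload type.
    //
    //     TAutoManyOneQueue<TMyJob> queue;
    //
    //     // producer thread: ownership moves into the queue
    //     queue.Enqueue(TAutoPtr<TMyJob>(new TMyJob()));
    //
    //     // consumer thread: ownership moves back out
    //     TAutoPtr<TMyJob> job;
    //     if (queue.Dequeue(job)) {
    //         /* job owns the object again */
    //     }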
}