scheduler_queue.h 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. #pragma once
  2. #include "scheduler_cookie.h"
  3. #include <library/cpp/actors/util/queue_chunk.h>
  4. #include <library/cpp/actors/core/event.h>
  5. namespace NActors {
  6. class IEventHandle;
  7. class ISchedulerCookie;
  8. namespace NSchedulerQueue {
  9. struct TEntry {
  10. ui64 InstantMicroseconds;
  11. IEventHandle* Ev;
  12. ISchedulerCookie* Cookie;
  13. };
  14. struct TChunk : TQueueChunkDerived<TEntry, 512, TChunk> {};
  15. class TReader;
  16. class TWriter;
  17. class TWriterWithPadding;
  18. class TReader : ::TNonCopyable {
  19. TChunk* ReadFrom;
  20. ui32 ReadPosition;
  21. friend class TWriter;
  22. public:
  23. TReader()
  24. : ReadFrom(new TChunk())
  25. , ReadPosition(0)
  26. {
  27. }
  28. ~TReader() {
  29. while (TEntry* x = Pop()) {
  30. if (x->Cookie)
  31. x->Cookie->Detach();
  32. delete x->Ev;
  33. }
  34. delete ReadFrom;
  35. }
  36. TEntry* Pop() {
  37. TChunk* head = ReadFrom;
  38. if (ReadPosition != TChunk::EntriesCount) {
  39. if (AtomicLoad(&head->Entries[ReadPosition].InstantMicroseconds) != 0)
  40. return const_cast<TEntry*>(&head->Entries[ReadPosition++]);
  41. else
  42. return nullptr;
  43. } else if (TChunk* next = AtomicLoad(&head->Next)) {
  44. ReadFrom = next;
  45. delete head;
  46. ReadPosition = 0;
  47. return Pop();
  48. }
  49. return nullptr;
  50. }
  51. };
  52. class TWriter : ::TNonCopyable {
  53. TChunk* WriteTo;
  54. ui32 WritePosition;
  55. public:
  56. TWriter()
  57. : WriteTo(nullptr)
  58. , WritePosition(0)
  59. {
  60. }
  61. void Init(const TReader& reader) {
  62. WriteTo = reader.ReadFrom;
  63. WritePosition = 0;
  64. }
  65. void Push(ui64 instantMicrosends, IEventHandle* ev, ISchedulerCookie* cookie) {
  66. if (Y_UNLIKELY(instantMicrosends == 0)) {
  67. // Protect against Pop() getting stuck forever
  68. instantMicrosends = 1;
  69. }
  70. if (WritePosition != TChunk::EntriesCount) {
  71. volatile TEntry& entry = WriteTo->Entries[WritePosition];
  72. entry.Cookie = cookie;
  73. entry.Ev = ev;
  74. AtomicStore(&entry.InstantMicroseconds, instantMicrosends);
  75. ++WritePosition;
  76. } else {
  77. TChunk* next = new TChunk();
  78. volatile TEntry& entry = next->Entries[0];
  79. entry.Cookie = cookie;
  80. entry.Ev = ev;
  81. entry.InstantMicroseconds = instantMicrosends;
  82. AtomicStore(&WriteTo->Next, next);
  83. WriteTo = next;
  84. WritePosition = 1;
  85. }
  86. }
  87. };
  88. class TWriterWithPadding: public TWriter {
  89. private:
  90. ui8 CacheLinePadding[64 - sizeof(TWriter)];
  91. void UnusedCacheLinePadding() {
  92. Y_UNUSED(CacheLinePadding);
  93. }
  94. };
  95. struct TQueueType {
  96. TReader Reader;
  97. TWriter Writer;
  98. TQueueType() {
  99. Writer.Init(Reader);
  100. }
  101. };
  102. }
  103. }