// scheduler_queue.h (3.4 KB)
  1. #pragma once
  2. #include <library/cpp/actors/util/queue_chunk.h>
  3. namespace NActors {
  4. class IEventHandle;
  5. class ISchedulerCookie;
  6. namespace NSchedulerQueue {
  7. struct TEntry {
  8. ui64 InstantMicroseconds;
  9. IEventHandle* Ev;
  10. ISchedulerCookie* Cookie;
  11. };
  12. struct TChunk : TQueueChunkDerived<TEntry, 512, TChunk> {};
  // Forward declarations of the queue endpoints defined below.
  class TReader;
  class TWriter;
  class TWriterWithPadding;
  16. class TReader : ::TNonCopyable {
  17. TChunk* ReadFrom;
  18. ui32 ReadPosition;
  19. friend class TWriter;
  20. public:
  21. TReader()
  22. : ReadFrom(new TChunk())
  23. , ReadPosition(0)
  24. {
  25. }
  26. ~TReader() {
  27. while (TEntry* x = Pop()) {
  28. if (x->Cookie)
  29. x->Cookie->Detach();
  30. delete x->Ev;
  31. }
  32. delete ReadFrom;
  33. }
  34. TEntry* Pop() {
  35. TChunk* head = ReadFrom;
  36. if (ReadPosition != TChunk::EntriesCount) {
  37. if (AtomicLoad(&head->Entries[ReadPosition].InstantMicroseconds) != 0)
  38. return const_cast<TEntry*>(&head->Entries[ReadPosition++]);
  39. else
  40. return nullptr;
  41. } else if (TChunk* next = AtomicLoad(&head->Next)) {
  42. ReadFrom = next;
  43. delete head;
  44. ReadPosition = 0;
  45. return Pop();
  46. }
  47. return nullptr;
  48. }
  49. };
  50. class TWriter : ::TNonCopyable {
  51. TChunk* WriteTo;
  52. ui32 WritePosition;
  53. public:
  54. TWriter()
  55. : WriteTo(nullptr)
  56. , WritePosition(0)
  57. {
  58. }
  59. void Init(const TReader& reader) {
  60. WriteTo = reader.ReadFrom;
  61. WritePosition = 0;
  62. }
  63. void Push(ui64 instantMicrosends, IEventHandle* ev, ISchedulerCookie* cookie) {
  64. if (Y_UNLIKELY(instantMicrosends == 0)) {
  65. // Protect against Pop() getting stuck forever
  66. instantMicrosends = 1;
  67. }
  68. if (WritePosition != TChunk::EntriesCount) {
  69. volatile TEntry& entry = WriteTo->Entries[WritePosition];
  70. entry.Cookie = cookie;
  71. entry.Ev = ev;
  72. AtomicStore(&entry.InstantMicroseconds, instantMicrosends);
  73. ++WritePosition;
  74. } else {
  75. TChunk* next = new TChunk();
  76. volatile TEntry& entry = next->Entries[0];
  77. entry.Cookie = cookie;
  78. entry.Ev = ev;
  79. entry.InstantMicroseconds = instantMicrosends;
  80. AtomicStore(&WriteTo->Next, next);
  81. WriteTo = next;
  82. WritePosition = 1;
  83. }
  84. }
  85. };
  86. class TWriterWithPadding: public TWriter {
  87. private:
  88. ui8 CacheLinePadding[64 - sizeof(TWriter)];
  89. void UnusedCacheLinePadding() {
  90. Y_UNUSED(CacheLinePadding);
  91. }
  92. };
  93. struct TQueueType {
  94. TReader Reader;
  95. TWriter Writer;
  96. TQueueType() {
  97. Writer.Init(Reader);
  98. }
  99. };
  100. }
  101. }