123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- #pragma once
- #include "scheduler_cookie.h"
- #include <library/cpp/actors/util/queue_chunk.h>
- #include <library/cpp/actors/core/event.h>
- namespace NActors {
- class IEventHandle;
- class ISchedulerCookie;
- namespace NSchedulerQueue {
- struct TEntry {
- ui64 InstantMicroseconds;
- IEventHandle* Ev;
- ISchedulerCookie* Cookie;
- };
- struct TChunk : TQueueChunkDerived<TEntry, 512, TChunk> {};
- class TReader;
- class TWriter;
- class TWriterWithPadding;
- class TReader : ::TNonCopyable {
- TChunk* ReadFrom;
- ui32 ReadPosition;
- friend class TWriter;
- public:
- TReader()
- : ReadFrom(new TChunk())
- , ReadPosition(0)
- {
- }
- ~TReader() {
- while (TEntry* x = Pop()) {
- if (x->Cookie)
- x->Cookie->Detach();
- delete x->Ev;
- }
- delete ReadFrom;
- }
- TEntry* Pop() {
- TChunk* head = ReadFrom;
- if (ReadPosition != TChunk::EntriesCount) {
- if (AtomicLoad(&head->Entries[ReadPosition].InstantMicroseconds) != 0)
- return const_cast<TEntry*>(&head->Entries[ReadPosition++]);
- else
- return nullptr;
- } else if (TChunk* next = AtomicLoad(&head->Next)) {
- ReadFrom = next;
- delete head;
- ReadPosition = 0;
- return Pop();
- }
- return nullptr;
- }
- };
- class TWriter : ::TNonCopyable {
- TChunk* WriteTo;
- ui32 WritePosition;
- public:
- TWriter()
- : WriteTo(nullptr)
- , WritePosition(0)
- {
- }
- void Init(const TReader& reader) {
- WriteTo = reader.ReadFrom;
- WritePosition = 0;
- }
- void Push(ui64 instantMicrosends, IEventHandle* ev, ISchedulerCookie* cookie) {
- if (Y_UNLIKELY(instantMicrosends == 0)) {
- // Protect against Pop() getting stuck forever
- instantMicrosends = 1;
- }
- if (WritePosition != TChunk::EntriesCount) {
- volatile TEntry& entry = WriteTo->Entries[WritePosition];
- entry.Cookie = cookie;
- entry.Ev = ev;
- AtomicStore(&entry.InstantMicroseconds, instantMicrosends);
- ++WritePosition;
- } else {
- TChunk* next = new TChunk();
- volatile TEntry& entry = next->Entries[0];
- entry.Cookie = cookie;
- entry.Ev = ev;
- entry.InstantMicroseconds = instantMicrosends;
- AtomicStore(&WriteTo->Next, next);
- WriteTo = next;
- WritePosition = 1;
- }
- }
- };
- class TWriterWithPadding: public TWriter {
- private:
- ui8 CacheLinePadding[64 - sizeof(TWriter)];
- void UnusedCacheLinePadding() {
- Y_UNUSED(CacheLinePadding);
- }
- };
- struct TQueueType {
- TReader Reader;
- TWriter Writer;
- TQueueType() {
- Writer.Init(Reader);
- }
- };
- }
- }
|