#pragma once #include "ring_buffer_with_spin_lock.h" #include #include #include #include #include #include #include namespace NActor { namespace NPrivate { struct TExecutorHistory { struct THistoryRecord { ui32 MaxQueueSize; }; TVector HistoryRecords; ui64 LastHistoryRecordSecond; ui64 FirstHistoryRecordSecond() const { return LastHistoryRecordSecond - HistoryRecords.size() + 1; } }; struct TExecutorStatus { size_t WorkQueueSize = 0; TExecutorHistory History; TString Status; }; } class IWorkItem { public: virtual ~IWorkItem() { } virtual void DoWork(/* must release this */) = 0; }; struct TExecutorWorker; class TExecutor: public TAtomicRefCount { friend struct TExecutorWorker; public: struct TConfig { size_t WorkerCount; const char* Name; TConfig() : WorkerCount(1) , Name() { } }; private: struct TImpl; THolder Impl; const TConfig Config; TAtomic ExitWorkers; TVector> WorkerThreads; TRingBufferWithSpinLock WorkItems; TMutex WorkMutex; TCondVar WorkAvailable; public: explicit TExecutor(size_t workerCount); TExecutor(const TConfig& config); ~TExecutor(); void Stop(); void EnqueueWork(TArrayRef w); size_t GetWorkQueueSize() const; TString GetStatus() const; TString GetStatusSingleLine() const; NPrivate::TExecutorStatus GetStatusRecordInternal() const; bool IsInExecutorThread() const; private: void Init(); TAutoPtr DequeueWork(); void ProcessWorkQueueHere(); inline void RunWorkItem(TAutoPtr); void RunWorker(); ui32 GetMaxQueueSizeAndClear() const; }; using TExecutorPtr = TIntrusivePtr; }