123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- #pragma once
- #include "ring_buffer_with_spin_lock.h"
- #include <util/generic/array_ref.h>
- #include <util/generic/vector.h>
- #include <util/system/condvar.h>
- #include <util/system/event.h>
- #include <util/system/mutex.h>
- #include <util/system/thread.h>
- #include <util/thread/lfqueue.h>
- namespace NActor {
- namespace NPrivate {
- struct TExecutorHistory {
- struct THistoryRecord {
- ui32 MaxQueueSize;
- };
- TVector<THistoryRecord> HistoryRecords;
- ui64 LastHistoryRecordSecond;
- ui64 FirstHistoryRecordSecond() const {
- return LastHistoryRecordSecond - HistoryRecords.size() + 1;
- }
- };
- struct TExecutorStatus {
- size_t WorkQueueSize = 0;
- TExecutorHistory History;
- TString Status;
- };
- }
- class IWorkItem {
- public:
- virtual ~IWorkItem() {
- }
- virtual void DoWork(/* must release this */) = 0;
- };
- struct TExecutorWorker;
- class TExecutor: public TAtomicRefCount<TExecutor> {
- friend struct TExecutorWorker;
- public:
- struct TConfig {
- size_t WorkerCount;
- const char* Name;
- TConfig()
- : WorkerCount(1)
- , Name()
- {
- }
- };
- private:
- struct TImpl;
- THolder<TImpl> Impl;
- const TConfig Config;
- TAtomic ExitWorkers;
- TVector<TAutoPtr<TExecutorWorker>> WorkerThreads;
- TRingBufferWithSpinLock<IWorkItem*> WorkItems;
- TMutex WorkMutex;
- TCondVar WorkAvailable;
- public:
- explicit TExecutor(size_t workerCount);
- TExecutor(const TConfig& config);
- ~TExecutor();
- void Stop();
- void EnqueueWork(TArrayRef<IWorkItem* const> w);
- size_t GetWorkQueueSize() const;
- TString GetStatus() const;
- TString GetStatusSingleLine() const;
- NPrivate::TExecutorStatus GetStatusRecordInternal() const;
- bool IsInExecutorThread() const;
- private:
- void Init();
- TAutoPtr<IWorkItem> DequeueWork();
- void ProcessWorkQueueHere();
- inline void RunWorkItem(TAutoPtr<IWorkItem>);
- void RunWorker();
- ui32 GetMaxQueueSizeAndClear() const;
- };
- using TExecutorPtr = TIntrusivePtr<TExecutor>;
- }
|