// executor.h (2.3 KB)
  1. #pragma once
  2. #include "ring_buffer_with_spin_lock.h"
  3. #include <util/generic/array_ref.h>
  4. #include <util/generic/vector.h>
  5. #include <util/system/condvar.h>
  6. #include <util/system/event.h>
  7. #include <util/system/mutex.h>
  8. #include <util/system/thread.h>
  9. #include <util/thread/lfqueue.h>
  10. namespace NActor {
  11. namespace NPrivate {
  12. struct TExecutorHistory {
  13. struct THistoryRecord {
  14. ui32 MaxQueueSize;
  15. };
  16. TVector<THistoryRecord> HistoryRecords;
  17. ui64 LastHistoryRecordSecond;
  18. ui64 FirstHistoryRecordSecond() const {
  19. return LastHistoryRecordSecond - HistoryRecords.size() + 1;
  20. }
  21. };
  22. struct TExecutorStatus {
  23. size_t WorkQueueSize = 0;
  24. TExecutorHistory History;
  25. TString Status;
  26. };
  27. }
  // Interface of a unit of work.
  class IWorkItem {
  public:
      virtual ~IWorkItem() = default;

      // Performs the work.
      //
      // Ownership contract: the implementation must release `this`
      // (the work item is not used by the caller after DoWork() returns).
      virtual void DoWork() = 0;
  };
  34. struct TExecutorWorker;
  35. class TExecutor: public TAtomicRefCount<TExecutor> {
  36. friend struct TExecutorWorker;
  37. public:
  38. struct TConfig {
  39. size_t WorkerCount;
  40. const char* Name;
  41. TConfig()
  42. : WorkerCount(1)
  43. , Name()
  44. {
  45. }
  46. };
  47. private:
  48. struct TImpl;
  49. THolder<TImpl> Impl;
  50. const TConfig Config;
  51. TAtomic ExitWorkers;
  52. TVector<TAutoPtr<TExecutorWorker>> WorkerThreads;
  53. TRingBufferWithSpinLock<IWorkItem*> WorkItems;
  54. TMutex WorkMutex;
  55. TCondVar WorkAvailable;
  56. public:
  57. explicit TExecutor(size_t workerCount);
  58. TExecutor(const TConfig& config);
  59. ~TExecutor();
  60. void Stop();
  61. void EnqueueWork(TArrayRef<IWorkItem* const> w);
  62. size_t GetWorkQueueSize() const;
  63. TString GetStatus() const;
  64. TString GetStatusSingleLine() const;
  65. NPrivate::TExecutorStatus GetStatusRecordInternal() const;
  66. bool IsInExecutorThread() const;
  67. private:
  68. void Init();
  69. TAutoPtr<IWorkItem> DequeueWork();
  70. void ProcessWorkQueueHere();
  71. inline void RunWorkItem(TAutoPtr<IWorkItem>);
  72. void RunWorker();
  73. ui32 GetMaxQueueSizeAndClear() const;
  74. };
  75. using TExecutorPtr = TIntrusivePtr<TExecutor>;
  76. }