task_scheduler.h 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. #pragma once
  2. #include <library/cpp/deprecated/atomic/atomic.h>
  3. #include <util/generic/vector.h>
  4. #include <util/generic/ptr.h>
  5. #include <util/generic/map.h>
  6. #include <util/datetime/base.h>
  7. #include <util/system/condvar.h>
  8. #include <util/system/mutex.h>
  9. class TTaskScheduler {
  10. public:
  11. class ITask;
  12. using ITaskRef = TIntrusivePtr<ITask>;
  13. class IRepeatedTask;
  14. using IRepeatedTaskRef = TIntrusivePtr<IRepeatedTask>;
  15. public:
  16. explicit TTaskScheduler(size_t threadCount = 1, size_t maxTaskCount = Max<size_t>());
  17. ~TTaskScheduler();
  18. void Start();
  19. void Stop();
  20. bool Add(ITaskRef task, TInstant expire);
  21. bool Add(IRepeatedTaskRef task, TDuration period);
  22. size_t GetTaskCount() const;
  23. private:
  24. class TWorkerThread;
  25. struct TTaskHolder {
  26. explicit TTaskHolder(ITaskRef& task)
  27. : Task(task)
  28. {
  29. }
  30. public:
  31. ITaskRef Task;
  32. TWorkerThread* WaitingWorker = nullptr;
  33. };
  34. using TQueueType = TMultiMap<TInstant, TTaskHolder>;
  35. using TQueueIterator = TQueueType::iterator;
  36. private:
  37. void ChangeDebugState(TWorkerThread* thread, const TString& state);
  38. void ChooseFromQueue(TQueueIterator& toWait);
  39. bool Wait(TWorkerThread* thread, TQueueIterator& toWait);
  40. void WorkerFunc(TWorkerThread* thread);
  41. private:
  42. bool IsStopped_ = false;
  43. TAtomic TaskCounter_ = 0;
  44. TQueueType Queue_;
  45. TCondVar CondVar_;
  46. TMutex Lock_;
  47. TVector<TAutoPtr<TWorkerThread>> Workers_;
  48. const size_t MaxTaskCount_;
  49. };
  50. class TTaskScheduler::ITask
  51. : public TAtomicRefCount<ITask>
  52. {
  53. public:
  54. virtual ~ITask();
  55. virtual TInstant Process() {//returns time to repeat this task
  56. return TInstant::Max();
  57. }
  58. };
  59. class TTaskScheduler::IRepeatedTask
  60. : public TAtomicRefCount<IRepeatedTask>
  61. {
  62. public:
  63. virtual ~IRepeatedTask();
  64. virtual bool Process() {//returns if to repeat task again
  65. return false;
  66. }
  67. };