track.h 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. #pragma once
  2. #include "spawn.h"
  3. #include "task.h"
  4. #include <library/cpp/messagebus/async_result.h>
  5. #include <library/cpp/messagebus/actor/queue_in_actor.h>
  6. #include <library/cpp/messagebus/misc/atomic_box.h>
  7. #include <util/generic/intrlist.h>
  8. #include <util/system/event.h>
  9. namespace NRainCheck {
  10. class TTaskTracker;
  11. namespace NPrivate {
  12. struct ITaskFactory {
  13. virtual TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener*) = 0;
  14. virtual ~ITaskFactory() {
  15. }
  16. };
  17. struct TTaskTrackerReceipt: public ISubtaskListener, public TIntrusiveListItem<TTaskTrackerReceipt> {
  18. TTaskTracker* const TaskTracker;
  19. TIntrusivePtr<TTaskRunnerBase> Task;
  20. TTaskTrackerReceipt(TTaskTracker* taskTracker)
  21. : TaskTracker(taskTracker)
  22. {
  23. }
  24. void SetDone() override;
  25. TString GetStatusSingleLine();
  26. };
  27. struct TTaskTrackerStatus {
  28. ui32 Size;
  29. };
  30. }
  31. class TTaskTracker
  32. : public TAtomicRefCount<TTaskTracker>,
  33. public NActor::TActor<TTaskTracker>,
  34. public NActor::TQueueInActor<TTaskTracker, NPrivate::ITaskFactory*>,
  35. public NActor::TQueueInActor<TTaskTracker, NPrivate::TTaskTrackerReceipt*>,
  36. public NActor::TQueueInActor<TTaskTracker, TAsyncResult<NPrivate::TTaskTrackerStatus>*> {
  37. friend struct NPrivate::TTaskTrackerReceipt;
  38. private:
  39. TAtomicBox<bool> ShutdownFlag;
  40. TSystemEvent ShutdownEvent;
  41. TIntrusiveList<NPrivate::TTaskTrackerReceipt> Tasks;
  42. template <typename TItem>
  43. NActor::TQueueInActor<TTaskTracker, TItem>* GetQueue() {
  44. return this;
  45. }
  46. public:
  47. TTaskTracker(NActor::TExecutor* executor);
  48. ~TTaskTracker() override;
  49. void Shutdown();
  50. void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::ITaskFactory*);
  51. void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, NPrivate::TTaskTrackerReceipt*);
  52. void ProcessItem(NActor::TDefaultTag, NActor::TDefaultTag, TAsyncResult<NPrivate::TTaskTrackerStatus>*);
  53. void Act(NActor::TDefaultTag);
  54. template <typename TTask, typename TEnv, typename TParam>
  55. void Spawn(TEnv* env, TParam param) {
  56. struct TTaskFactory: public NPrivate::ITaskFactory {
  57. TEnv* const Env;
  58. TParam Param;
  59. TTaskFactory(TEnv* env, TParam param)
  60. : Env(env)
  61. , Param(param)
  62. {
  63. }
  64. TIntrusivePtr<TTaskRunnerBase> NewTask(ISubtaskListener* subtaskListener) override {
  65. return NRainCheck::SpawnTask<TTask>(Env, Param, subtaskListener).Get();
  66. }
  67. };
  68. GetQueue<NPrivate::ITaskFactory*>()->EnqueueAndSchedule(new TTaskFactory(env, param));
  69. }
  70. ui32 Size();
  71. };
  72. }