scheduler_actor.h 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. #pragma once
  2. #include "local_tasks.h"
  3. #include <library/cpp/messagebus/actor/actor.h>
  4. #include <library/cpp/messagebus/actor/what_thread_does_guard.h>
  5. #include <library/cpp/messagebus/scheduler/scheduler.h>
  6. #include <util/system/mutex.h>
  7. namespace NBus {
  8. namespace NPrivate {
  9. template <typename TThis, typename TTag = NActor::TDefaultTag>
  10. class TScheduleActor {
  11. typedef NActor::TActor<TThis, TTag> TActorForMe;
  12. private:
  13. TScheduler* const Scheduler;
  14. TMutex Mutex;
  15. TInstant ScheduleTime;
  16. public:
  17. TLocalTasks Alarm;
  18. private:
  19. struct TScheduleItemImpl: public IScheduleItem {
  20. TIntrusivePtr<TThis> Thiz;
  21. TScheduleItemImpl(TIntrusivePtr<TThis> thiz, TInstant when)
  22. : IScheduleItem(when)
  23. , Thiz(thiz)
  24. {
  25. }
  26. void Do() override {
  27. {
  28. TWhatThreadDoesAcquireGuard<TMutex> guard(Thiz->Mutex, "scheduler actor: acquiring lock for Do");
  29. if (Thiz->ScheduleTime == TInstant::Max()) {
  30. // was already fired
  31. return;
  32. }
  33. Thiz->ScheduleTime = TInstant::Max();
  34. }
  35. Thiz->Alarm.AddTask();
  36. Thiz->GetActorForMe()->Schedule();
  37. }
  38. };
  39. public:
  40. TScheduleActor(TScheduler* scheduler)
  41. : Scheduler(scheduler)
  42. , ScheduleTime(TInstant::Max())
  43. {
  44. }
  45. /// call Act(TTag) at specified time, unless it is already scheduled at earlier time.
  46. void ScheduleAt(TInstant when) {
  47. TWhatThreadDoesAcquireGuard<TMutex> guard(Mutex, "scheduler: acquiring lock for ScheduleAt");
  48. if (when > ScheduleTime) {
  49. // already scheduled
  50. return;
  51. }
  52. ScheduleTime = when;
  53. Scheduler->Schedule(new TScheduleItemImpl(GetThis(), when));
  54. }
  55. private:
  56. TThis* GetThis() {
  57. return static_cast<TThis*>(this);
  58. }
  59. TActorForMe* GetActorForMe() {
  60. return static_cast<TActorForMe*>(GetThis());
  61. }
  62. };
  63. }
  64. }