// scheduler.cpp
  1. #include "scheduler.h"
  2. #include <util/datetime/base.h>
  3. #include <util/generic/algorithm.h>
  4. #include <util/generic/yexception.h>
  5. //#include "dummy_debugger.h"
  6. using namespace NBus;
  7. using namespace NBus::NPrivate;
  8. class TScheduleDeadlineCompare {
  9. public:
  10. bool operator()(const IScheduleItemAutoPtr& i1, const IScheduleItemAutoPtr& i2) const noexcept {
  11. return i1->GetScheduleTime() > i2->GetScheduleTime();
  12. }
  13. };
  14. TScheduler::TScheduler()
  15. : StopThread(false)
  16. , Thread([&] { this->SchedulerThread(); })
  17. {
  18. }
// Destroying a scheduler whose StopThread flag is not set is treated as a
// programming error: the flag is only set by Stop(), so Stop() must be
// called before destruction.
TScheduler::~TScheduler() {
    Y_ABORT_UNLESS(StopThread, "state check");
}
  22. size_t TScheduler::Size() const {
  23. TGuard<TLock> guard(Lock);
  24. return Items.size() + (!!NextItem ? 1 : 0);
  25. }
  26. void TScheduler::Stop() {
  27. {
  28. TGuard<TLock> guard(Lock);
  29. Y_ABORT_UNLESS(!StopThread, "Scheduler already stopped");
  30. StopThread = true;
  31. CondVar.Signal();
  32. }
  33. Thread.Get();
  34. if (!!NextItem) {
  35. NextItem.Destroy();
  36. }
  37. for (auto& item : Items) {
  38. item.Destroy();
  39. }
  40. }
  41. void TScheduler::Schedule(TAutoPtr<IScheduleItem> i) {
  42. TGuard<TLock> lock(Lock);
  43. if (StopThread)
  44. return;
  45. if (!!NextItem) {
  46. if (i->GetScheduleTime() < NextItem->GetScheduleTime()) {
  47. DoSwap(i, NextItem);
  48. }
  49. }
  50. Items.push_back(i);
  51. PushHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());
  52. FillNextItem();
  53. CondVar.Signal();
  54. }
  55. void TScheduler::FillNextItem() {
  56. if (!NextItem && !Items.empty()) {
  57. PopHeap(Items.begin(), Items.end(), TScheduleDeadlineCompare());
  58. NextItem = Items.back();
  59. Items.erase(Items.end() - 1);
  60. }
  61. }
  62. void TScheduler::SchedulerThread() {
  63. for (;;) {
  64. IScheduleItemAutoPtr current;
  65. {
  66. TGuard<TLock> guard(Lock);
  67. if (StopThread) {
  68. break;
  69. }
  70. if (!!NextItem) {
  71. CondVar.WaitD(Lock, NextItem->GetScheduleTime());
  72. } else {
  73. CondVar.WaitI(Lock);
  74. }
  75. if (StopThread) {
  76. break;
  77. }
  78. // signal comes if either scheduler is to be stopped of there's work to do
  79. Y_ABORT_UNLESS(!!NextItem, "state check");
  80. if (TInstant::Now() < NextItem->GetScheduleTime()) {
  81. // NextItem is updated since WaitD
  82. continue;
  83. }
  84. current = NextItem.Release();
  85. }
  86. current->Do();
  87. current.Destroy();
  88. {
  89. TGuard<TLock> guard(Lock);
  90. FillNextItem();
  91. }
  92. }
  93. }