scheduler_basic.h 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. #pragma once
  2. #include "actorsystem.h"
  3. #include "monotonic.h"
  4. #include "scheduler_queue.h"
  5. #include <library/cpp/actors/util/queue_chunk.h>
  6. #include <library/cpp/threading/future/legacy_future.h>
  7. #include <util/generic/hash.h>
  8. #include <util/generic/map.h>
  9. namespace NActors {
  10. class TBasicSchedulerThread: public ISchedulerThread {
  11. // TODO: replace with NUMA-local threads and per-thread schedules
  12. const TSchedulerConfig Config;
  13. struct TMonCounters;
  14. const THolder<TMonCounters> MonCounters;
  15. TActorSystem* ActorSystem;
  16. volatile ui64* CurrentTimestamp;
  17. volatile ui64* CurrentMonotonic;
  18. ui32 TotalReaders;
  19. TArrayHolder<NSchedulerQueue::TReader*> Readers;
  20. volatile bool StopFlag;
  21. typedef TMap<ui64, TAutoPtr<NSchedulerQueue::TQueueType>> TMomentMap; // intrasecond queues
  22. typedef THashMap<ui64, TAutoPtr<TMomentMap>> TScheduleMap; // over-second schedule
  23. TScheduleMap ScheduleMap;
  24. THolder<NThreading::TLegacyFuture<void, false>> MainCycle;
  25. static const ui64 IntrasecondThreshold = 1048576; // ~second
  26. void CycleFunc();
  27. public:
  28. TBasicSchedulerThread(const TSchedulerConfig& config = TSchedulerConfig());
  29. ~TBasicSchedulerThread();
  30. void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) override;
  31. void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) override;
  32. void PrepareStart() override;
  33. void Start() override;
  34. void PrepareStop() override;
  35. void Stop() override;
  36. };
  37. class TMockSchedulerThread: public ISchedulerThread {
  38. public:
  39. virtual ~TMockSchedulerThread() override {
  40. }
  41. void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) override {
  42. Y_UNUSED(actorSystem);
  43. *currentTimestamp = TInstant::Now().MicroSeconds();
  44. *currentMonotonic = GetMonotonicMicroSeconds();
  45. }
  46. void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) override {
  47. Y_UNUSED(readers);
  48. Y_UNUSED(scheduleReadersCount);
  49. }
  50. void Start() override {
  51. }
  52. void PrepareStop() override {
  53. }
  54. void Stop() override {
  55. }
  56. };
  // Declares a factory that yields an ISchedulerThread for the given config;
  // the definition is not in this header.
  // NOTE(review): returns a raw pointer -- presumably the caller takes
  // ownership; confirm against the definition and call sites.
  57. ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& cfg);
  58. }