123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- #pragma once
- #include "executor.h"
- #include "tasks.h"
- #include "what_thread_does.h"
- #include <util/system/yassert.h>
- namespace NActor {
- class IActor: protected IWorkItem {
- public:
- // TODO: make private
- TTasks Tasks;
- public:
- virtual void ScheduleHereV() = 0;
- virtual void ScheduleV() = 0;
- virtual void ScheduleHereAtMostOnceV() = 0;
- // TODO: make private
- virtual void RefV() = 0;
- virtual void UnRefV() = 0;
- // mute warnings
- ~IActor() override {
- }
- };
/**
 * Empty tag type used as the default for TActor's TTag template parameter.
 *
 * TActor passes a default-constructed tag object to Act(), so an actor that
 * does not specify a tag implements Act(TDefaultTag).
 */
struct TDefaultTag {
};
- template <typename TThis, typename TTag = TDefaultTag>
- class TActor: public IActor {
- private:
- TExecutor* const Executor;
- public:
- TActor(TExecutor* executor)
- : Executor(executor)
- {
- }
- void AddTaskFromActorLoop() {
- bool schedule = Tasks.AddTask();
- // TODO: check thread id
- Y_ASSERT(!schedule);
- }
- /**
- * Schedule actor.
- *
- * If actor is sleeping, then actor will be executed right now.
- * If actor is executing right now, it will be executed one more time.
- * If this method is called multiple time, actor will be re-executed no more than one more time.
- */
- void Schedule() {
- if (Tasks.AddTask()) {
- EnqueueWork();
- }
- }
- /**
- * Schedule actor, execute it in current thread.
- *
- * If actor is running, continue executing where it is executing.
- * If actor is sleeping, execute it in current thread.
- *
- * Operation is useful for tasks that are likely to complete quickly.
- */
- void ScheduleHere() {
- if (Tasks.AddTask()) {
- Loop();
- }
- }
- /**
- * Schedule actor, execute in current thread no more than once.
- *
- * If actor is running, continue executing where it is executing.
- * If actor is sleeping, execute one iteration here, and if actor got new tasks,
- * reschedule it in worker pool.
- */
- void ScheduleHereAtMostOnce() {
- if (Tasks.AddTask()) {
- bool fetched = Tasks.FetchTask();
- Y_ABORT_UNLESS(fetched, "happens");
- DoAct();
- // if someone added more tasks, schedule them
- if (Tasks.FetchTask()) {
- bool added = Tasks.AddTask();
- Y_ABORT_UNLESS(!added, "happens");
- EnqueueWork();
- }
- }
- }
- void ScheduleHereV() override {
- ScheduleHere();
- }
- void ScheduleV() override {
- Schedule();
- }
- void ScheduleHereAtMostOnceV() override {
- ScheduleHereAtMostOnce();
- }
- void RefV() override {
- GetThis()->Ref();
- }
- void UnRefV() override {
- GetThis()->UnRef();
- }
- private:
- TThis* GetThis() {
- return static_cast<TThis*>(this);
- }
- void EnqueueWork() {
- GetThis()->Ref();
- Executor->EnqueueWork({this});
- }
- void DoAct() {
- WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
- GetThis()->Act(TTag());
- }
- void Loop() {
- // TODO: limit number of iterations
- while (Tasks.FetchTask()) {
- DoAct();
- }
- }
- void DoWork() override {
- Y_ASSERT(GetThis()->RefCount() >= 1);
- Loop();
- GetThis()->UnRef();
- }
- };
- }
|