#include "rain_check.h"

// NOTE(review): the targets of the three includes below and the template
// arguments of TAutoPtr<>, TActor<> and TTempTlsVector<> further down were
// reconstructed from how the names are used in this file (TTempTlsVector,
// Y_POD_STATIC_THREAD, TypeName, Pending holding TSubtaskCompletion*,
// GetImplBase() yielding ITaskBase*). Confirm them against rain_check.h and
// the repository layout.
#include <library/cpp/messagebus/actor/temp_tls_vector.h>

#include <util/system/type_name.h>
#include <util/system/tls.h>

using namespace NRainCheck;
using namespace NRainCheck::NPrivate;

using namespace NActor;

namespace {
    // Task runner whose Act() is currently executing on this thread, or
    // nullptr when no task is running here. Written only by
    // TRunningInThisThreadGuard.
    Y_POD_STATIC_THREAD(TTaskRunnerBase*)
    ThreadCurrentTask;
}

// Listener that ignores completion notifications.
void TNopSubtaskListener::SetDone() {
}

TNopSubtaskListener TNopSubtaskListener::Instance;

// Creates a runner for `impl`, attached to the executor supplied by `env`.
// `parentTask` is notified through SetDone() once the task has finished
// (see Act()).
TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl)
    : TActor<TTaskRunnerBase>(env->GetExecutor())
    , Impl(impl)
    , ParentTask(parentTask)
    //, HoldsSelfReference(false)
    , Done(false)
    , SetDoneCalled(false)
{
}

// A runner must not be destroyed before its task has finished.
TTaskRunnerBase::~TTaskRunnerBase() {
    Y_ASSERT(Done);
}

namespace {
    // Scope guard that publishes `task` in ThreadCurrentTask for the duration
    // of the scope and clears it on exit. Asserts that no other task is
    // registered on entry, i.e. task execution does not nest on one thread.
    struct TRunningInThisThreadGuard {
        TTaskRunnerBase* const Task;

        explicit TRunningInThisThreadGuard(TTaskRunnerBase* task)
            : Task(task)
        {
            Y_ASSERT(!ThreadCurrentTask);
            ThreadCurrentTask = task;
        }

        ~TRunningInThisThreadGuard() {
            Y_ASSERT(ThreadCurrentTask == Task);
            ThreadCurrentTask = nullptr;
        }
    };
}

// Actor entry point: drives the task forward.
//
// Each iteration:
//   1. takes the current Pending list and fires the completion callback of
//      every subtask that is complete; incomplete ones are put back;
//   2. returns if anything is still pending (a subtask's SetDone() calls
//      ScheduleV() on this runner, which presumably triggers Act() again);
//   3. otherwise calls ReplyReceived() to run the next step of the task;
//      a false result marks the task as Done;
//   4. once Done and nothing is pending, notifies ParentTask exactly once.
void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) {
    Y_ASSERT(RefCount() > 0);

    TRunningInThisThreadGuard g(this);

    //RetainRef();

    for (;;) {
        TTempTlsVector<TSubtaskCompletion*> temp;

        // Work on a snapshot: callbacks fired below may append to Pending.
        temp.GetVector()->swap(Pending);

        for (auto& pending : *temp.GetVector()) {
            if (pending->IsComplete()) {
                pending->FireCompletionCallback(GetImplBase());
            } else {
                // Not finished yet: keep waiting for it.
                Pending.push_back(pending);
            }
        }

        if (!Pending.empty()) {
            return;
        }

        if (!Done) {
            // The step just executed may have started new subtasks, so loop
            // once more to re-examine Pending before reporting completion.
            Done = !ReplyReceived();
        } else {
            if (Pending.empty()) {
                if (!SetDoneCalled) {
                    ParentTask->SetDone();
                    SetDoneCalled = true;
                }
                //ReleaseRef();
                return;
            }
        }
    }
}

// True when this runner's Act() is executing on the calling thread.
bool TTaskRunnerBase::IsRunningInThisThread() const {
    return ThreadCurrentTask == this;
}

// A completion may only be destroyed while no subtask is in flight.
TSubtaskCompletion::~TSubtaskCompletion() {
    ESubtaskState state = State.Get();
    Y_ASSERT(state == CREATED || state == DONE || state == CANCELED);
}

// Invokes the registered completion function (if any) on `task`, passing this
// completion as the argument. Must only be called when IsComplete() is true.
void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) {
    Y_ASSERT(IsComplete());

    if (!!CompletionFunc) {
        TSubtaskCompletionFunc temp = CompletionFunc;
        // completion func must be reset before calling it,
        // because function may set it back
        CompletionFunc = TSubtaskCompletionFunc();
        (task->*(temp.Func))(this);
    }
}

// Requests cancellation. State transitions performed:
//   CREATED -> CANCELED
//   RUNNING -> CANCEL_REQUESTED
//   DONE    -> CANCELED
// CANCEL_REQUESTED and CANCELED are left untouched. The loop retries when a
// compare-and-set fails because the state changed between the read and the
// update.
void NRainCheck::TSubtaskCompletion::Cancel() {
    for (;;) {
        ESubtaskState state = State.Get();
        if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) {
            return;
        }
        if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) {
            return;
        }
        if (state == DONE && State.CompareAndSet(DONE, CANCELED)) {
            return;
        }
        if (state == CANCEL_REQUESTED || state == CANCELED) {
            return;
        }
    }
}

// Marks the subtask as started on behalf of `parent`: registers this
// completion in the parent's Pending list, takes a reference on the parent
// (released in SetDone()) and moves the state to RUNNING.
// Aborts if the current state is neither CREATED nor DONE.
void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) {
    Y_ASSERT(!TaskRunner);
    Y_ASSERT(!!parent);

    TaskRunner = parent;

    parent->Pending.push_back(this);

    parent->RefV();

    for (;;) {
        ESubtaskState current = State.Get();
        if (current != CREATED && current != DONE) {
            Y_ABORT("current state should be CREATED or DONE: %s", ToCString(current));
        }
        if (State.CompareAndSet(current, RUNNING)) {
            return;
        }
    }
}

// Marks the subtask as finished (RUNNING -> DONE, or
// CANCEL_REQUESTED -> CANCELED), then schedules the owning runner and drops
// the reference taken in SetRunning(). Aborts on any other state.
void TSubtaskCompletion::SetDone() {
    Y_ASSERT(!!TaskRunner);

    // Detach from the runner before publishing the new state; the local copy
    // is what gets scheduled and unreferenced below.
    TTaskRunnerBase* temp = TaskRunner;
    TaskRunner = nullptr;

    for (;;) {
        ESubtaskState state = State.Get();
        if (state == RUNNING) {
            if (State.CompareAndSet(RUNNING, DONE)) {
                break;
            }
        } else if (state == CANCEL_REQUESTED) {
            if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) {
                break;
            }
        } else {
            Y_ABORT("cannot SetDone: unknown state: %s", ToCString(state));
        }
    }

    temp->ScheduleV();
    temp->UnRefV();
}

// Disabled self-reference bookkeeping; kept for reference only.
#if 0
void NRainCheck::TTaskRunnerBase::RetainRef()
{
    if (HoldsSelfReference) {
        return;
    }
    HoldsSelfReference = true;
    Ref();
}

void NRainCheck::TTaskRunnerBase::ReleaseRef()
{
    if (!HoldsSelfReference) {
        return;
    }
    HoldsSelfReference = false;
    DecRef();
}
#endif

// Asserts that the caller is executing inside this runner's Act().
void TTaskRunnerBase::AssertInThisThread() const {
    Y_ASSERT(IsRunningInThisThread());
}

// Returns the runner executing on the calling thread; aborts if there is none.
TTaskRunnerBase* TTaskRunnerBase::CurrentTask() {
    Y_ABORT_UNLESS(!!ThreadCurrentTask);
    return ThreadCurrentTask;
}

// Returns the task implementation of the runner executing on this thread.
ITaskBase* TTaskRunnerBase::CurrentTaskImpl() {
    return CurrentTask()->GetImplBase();
}

// One-line status: the type name of the task implementation.
TString TTaskRunnerBase::GetStatusSingleLine() {
    return TypeName(*Impl);
}

// True when the calling thread is currently executing some task's Act().
bool NRainCheck::AreWeInsideTask() {
    return ThreadCurrentTask != nullptr;
}