123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385 |
- #include "impl.h"
- #include "stack/stack_allocator.h"
- #include "stack/stack_guards.h"
- #include <util/generic/scope.h>
- #include <util/thread/singleton.h>
- #include <util/stream/format.h>
- #include <util/stream/output.h>
- #include <util/system/yassert.h>
- TCont::TJoinWait::TJoinWait(TCont& c) noexcept
- : Cont_(c)
- {}
- void TCont::TJoinWait::Wake() noexcept {
- Cont_.ReSchedule();
- }
- TCont::TCont(NCoro::NStack::IAllocator& allocator,
- uint32_t stackSize,
- TContExecutor& executor,
- NCoro::TTrampoline::TFunc func,
- const char* name) noexcept
- : Executor_(executor)
- , Name_(name)
- , Trampoline_(
- allocator,
- stackSize,
- std::move(func),
- this
- )
- {}
- void TCont::PrintMe(IOutputStream& out) const noexcept {
- out << "cont("
- << "name = " << Name_ << ", "
- << "addr = " << Hex((size_t)this)
- << ")";
- }
- bool TCont::Join(TCont* c, TInstant deadLine, std::function<void(TJoinWait&, TCont*)> forceStop) noexcept {
- TJoinWait ev(*this);
- c->Waiters_.PushBack(&ev);
- do {
- if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) {
- if (!ev.Empty()) {
- if (forceStop) {
- forceStop(ev, c);
- } else {
- c->Cancel();
- }
- do {
- Switch();
- } while (!ev.Empty());
- }
- return false;
- }
- } while (!ev.Empty());
- return true;
- }
- int TCont::SleepD(TInstant deadline) noexcept {
- TTimerEvent event(this, deadline);
- return ExecuteEvent(&event);
- }
- void TCont::Switch() noexcept {
- Executor()->RunScheduler();
- }
- void TCont::Yield() noexcept {
- if (SleepD(TInstant::Zero())) {
- ReScheduleAndSwitch();
- }
- }
- void TCont::ReScheduleAndSwitch() noexcept {
- ReSchedule();
- Switch();
- }
- void TCont::Terminate() {
- while (!Waiters_.Empty()) {
- Waiters_.PopFront()->Wake();
- }
- Executor()->Exit(this);
- }
- bool TCont::IAmRunning() const noexcept {
- return this == Executor()->Running();
- }
- void TCont::Cancel() noexcept {
- if (Cancelled()) {
- return;
- }
- Cancelled_ = true;
- if (!IAmRunning()) {
- ReSchedule();
- }
- }
- void TCont::Cancel(THolder<std::exception> exception) noexcept {
- if (!Cancelled()) {
- SetException(std::move(exception));
- Cancel();
- }
- }
- void TCont::ReSchedule() noexcept {
- if (Cancelled()) {
- // Legacy code may expect a Cancelled coroutine to be scheduled without delay.
- Executor()->ScheduleExecutionNow(this);
- } else {
- Executor()->ScheduleExecution(this);
- }
- }
- TContExecutor::TContExecutor(
- uint32_t defaultStackSize,
- THolder<IPollerFace> poller,
- NCoro::IScheduleCallback* scheduleCallback,
- NCoro::IEnterPollerCallback* enterPollerCallback,
- NCoro::NStack::EGuard defaultGuard,
- TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings,
- NCoro::ITime* time
- )
- : ScheduleCallback_(scheduleCallback)
- , EnterPollerCallback_(enterPollerCallback)
- , DefaultStackSize_(defaultStackSize)
- , Poller_(std::move(poller))
- , Time_(time)
- {
- StackAllocator_ = NCoro::NStack::GetAllocator(poolSettings, defaultGuard);
- }
- TContExecutor::~TContExecutor() {
- Y_ABORT_UNLESS(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_);
- }
- void TContExecutor::Execute() noexcept {
- auto nop = [](void*){};
- Execute(nop);
- }
- void TContExecutor::Execute(TContFunc func, void* arg) noexcept {
- CreateOwned([=](TCont* cont) {
- func(cont, arg);
- }, "sys_main");
- RunScheduler();
- }
- void TContExecutor::WaitForIO() {
- while (Ready_.Empty() && !WaitQueue_.Empty()) {
- const auto now = Now();
- // Waking a coroutine puts it into ReadyNext_ list
- const auto next = WaitQueue_.WakeTimedout(now);
- if (!UserEvents_.Empty()) {
- TIntrusiveList<IUserEvent> userEvents;
- userEvents.Swap(UserEvents_);
- do {
- userEvents.PopFront()->Execute();
- } while (!userEvents.Empty());
- }
- // Polling will return as soon as there is an event to process or a timeout.
- // If there are woken coroutines we do not want to sleep in the poller
- // yet still we want to check for new io
- // to prevent ourselves from locking out of io by constantly waking coroutines.
- if (ReadyNext_.Empty()) {
- if (EnterPollerCallback_) {
- EnterPollerCallback_->OnEnterPoller();
- }
- Poll(next);
- if (EnterPollerCallback_) {
- EnterPollerCallback_->OnExitPoller();
- }
- } else if (LastPoll_ + TDuration::MilliSeconds(5) < now) {
- if (EnterPollerCallback_) {
- EnterPollerCallback_->OnEnterPoller();
- }
- Poll(now);
- if (EnterPollerCallback_) {
- EnterPollerCallback_->OnExitPoller();
- }
- }
- Ready_.Append(ReadyNext_);
- }
- }
- void TContExecutor::Poll(TInstant deadline) {
- Poller_.Wait(PollerEvents_, deadline);
- LastPoll_ = Now();
- // Waking a coroutine puts it into ReadyNext_ list
- for (auto event : PollerEvents_) {
- auto* lst = (NCoro::TPollEventList*)event.Data;
- const int status = event.Status;
- if (status) {
- for (auto it = lst->Begin(); it != lst->End();) {
- (it++)->OnPollEvent(status);
- }
- } else {
- const ui16 filter = event.Filter;
- for (auto it = lst->Begin(); it != lst->End();) {
- if (it->What() & filter) {
- (it++)->OnPollEvent(0);
- } else {
- ++it;
- }
- }
- }
- }
- }
- void TContExecutor::Abort() noexcept {
- WaitQueue_.Abort();
- auto visitor = [](TCont* c) {
- c->Cancel();
- };
- Ready_.ForEach(visitor);
- ReadyNext_.ForEach(visitor);
- }
- TCont* TContExecutor::Create(
- TContFunc func,
- void* arg,
- const char* name,
- TMaybe<ui32> customStackSize
- ) noexcept {
- return CreateOwned([=](TCont* cont) {
- func(cont, arg);
- }, name, customStackSize);
- }
- TCont* TContExecutor::CreateOwned(
- NCoro::TTrampoline::TFunc func,
- const char* name,
- TMaybe<ui32> customStackSize
- ) noexcept {
- Allocated_ += 1;
- if (!customStackSize) {
- customStackSize = DefaultStackSize_;
- }
- auto* cont = new TCont(*StackAllocator_, *customStackSize, *this, std::move(func), name);
- ScheduleExecution(cont);
- return cont;
- }
- NCoro::NStack::TAllocatorStats TContExecutor::GetAllocatorStats() const noexcept {
- return StackAllocator_->GetStackStats();
- }
- void TContExecutor::Release(TCont* cont) noexcept {
- delete cont;
- Allocated_ -= 1;
- }
- void TContExecutor::ScheduleToDelete(TCont* cont) noexcept {
- ToDelete_.PushBack(cont);
- }
- void TContExecutor::ScheduleExecution(TCont* cont) noexcept {
- cont->Scheduled_ = true;
- ReadyNext_.PushBack(cont);
- }
- void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept {
- cont->Scheduled_ = true;
- Ready_.PushBack(cont);
- }
- namespace {
- inline TContExecutor*& ThisThreadExecutor() {
- struct TThisThreadExecutorHolder {
- TContExecutor* Executor = nullptr;
- };
- return FastTlsSingletonWithPriority<TThisThreadExecutorHolder, 0>()->Executor;
- }
- }
- void TContExecutor::DeleteScheduled() noexcept {
- ToDelete_.ForEach([this](TCont* c) {
- Release(c);
- });
- }
- TCont* RunningCont() {
- TContExecutor* thisThreadExecutor = ThisThreadExecutor();
- return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr;
- }
- void TContExecutor::RunScheduler() noexcept {
- try {
- TContExecutor* const prev = ThisThreadExecutor();
- ThisThreadExecutor() = this;
- TCont* caller = Current_;
- TExceptionSafeContext* context = caller ? caller->Trampoline_.Context() : &SchedContext_;
- Y_DEFER {
- ThisThreadExecutor() = prev;
- };
- while (true) {
- if (ScheduleCallback_ && Current_) {
- ScheduleCallback_->OnUnschedule(*this);
- }
- WaitForIO();
- DeleteScheduled();
- Ready_.Append(ReadyNext_);
- if (Ready_.Empty()) {
- Current_ = nullptr;
- if (caller) {
- context->SwitchTo(&SchedContext_);
- }
- break;
- }
- TCont* cont = Ready_.PopFront();
- if (ScheduleCallback_) {
- ScheduleCallback_->OnSchedule(*this, *cont);
- }
- Current_ = cont;
- cont->Scheduled_ = false;
- if (cont == caller) {
- break;
- }
- context->SwitchTo(cont->Trampoline_.Context());
- if (Paused_) {
- Paused_ = false;
- Current_ = nullptr;
- break;
- }
- if (caller) {
- break;
- }
- }
- } catch (...) {
- TBackTrace::FromCurrentException().PrintTo(Cerr);
- Y_ABORT("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str());
- }
- }
- void TContExecutor::Pause() {
- if (auto cont = Running()) {
- Paused_ = true;
- ScheduleExecutionNow(cont);
- cont->SwitchTo(&SchedContext_);
- }
- }
- void TContExecutor::Exit(TCont* cont) noexcept {
- ScheduleToDelete(cont);
- cont->SwitchTo(&SchedContext_);
- Y_ABORT("can not return from exit");
- }
- TInstant TContExecutor::Now() {
- return Y_LIKELY(Time_ == nullptr) ? TInstant::Now() : Time_->Now();
- }
- template <>
- void Out<TCont>(IOutputStream& out, const TCont& c) {
- c.PrintMe(out);
- }
|