impl.cpp 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. #include "impl.h"
  2. #include "stack/stack_allocator.h"
  3. #include "stack/stack_guards.h"
  4. #include <util/generic/scope.h>
  5. #include <util/thread/singleton.h>
  6. #include <util/stream/format.h>
  7. #include <util/stream/output.h>
  8. #include <util/system/yassert.h>
  9. TCont::TJoinWait::TJoinWait(TCont& c) noexcept
  10. : Cont_(c)
  11. {}
  12. void TCont::TJoinWait::Wake() noexcept {
  13. Cont_.ReSchedule();
  14. }
  15. TCont::TCont(NCoro::NStack::IAllocator& allocator,
  16. uint32_t stackSize,
  17. TContExecutor& executor,
  18. NCoro::TTrampoline::TFunc func,
  19. const char* name) noexcept
  20. : Executor_(executor)
  21. , Name_(name)
  22. , Trampoline_(
  23. allocator,
  24. stackSize,
  25. std::move(func),
  26. this
  27. )
  28. {}
  29. void TCont::PrintMe(IOutputStream& out) const noexcept {
  30. out << "cont("
  31. << "name = " << Name_ << ", "
  32. << "addr = " << Hex((size_t)this)
  33. << ")";
  34. }
  35. bool TCont::Join(TCont* c, TInstant deadLine, std::function<void(TJoinWait&, TCont*)> forceStop) noexcept {
  36. TJoinWait ev(*this);
  37. c->Waiters_.PushBack(&ev);
  38. do {
  39. if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) {
  40. if (!ev.Empty()) {
  41. if (forceStop) {
  42. forceStop(ev, c);
  43. } else {
  44. c->Cancel();
  45. }
  46. do {
  47. Switch();
  48. } while (!ev.Empty());
  49. }
  50. return false;
  51. }
  52. } while (!ev.Empty());
  53. return true;
  54. }
  55. int TCont::SleepD(TInstant deadline) noexcept {
  56. TTimerEvent event(this, deadline);
  57. return ExecuteEvent(&event);
  58. }
  59. void TCont::Switch() noexcept {
  60. Executor()->RunScheduler();
  61. }
  62. void TCont::Yield() noexcept {
  63. if (SleepD(TInstant::Zero())) {
  64. ReScheduleAndSwitch();
  65. }
  66. }
  67. void TCont::ReScheduleAndSwitch() noexcept {
  68. ReSchedule();
  69. Switch();
  70. }
  71. void TCont::Terminate() {
  72. while (!Waiters_.Empty()) {
  73. Waiters_.PopFront()->Wake();
  74. }
  75. Executor()->Exit(this);
  76. }
  77. bool TCont::IAmRunning() const noexcept {
  78. return this == Executor()->Running();
  79. }
  80. void TCont::Cancel() noexcept {
  81. if (Cancelled()) {
  82. return;
  83. }
  84. Cancelled_ = true;
  85. if (!IAmRunning()) {
  86. ReSchedule();
  87. }
  88. }
  89. void TCont::Cancel(THolder<std::exception> exception) noexcept {
  90. if (!Cancelled()) {
  91. SetException(std::move(exception));
  92. Cancel();
  93. }
  94. }
  95. void TCont::ReSchedule() noexcept {
  96. if (Cancelled()) {
  97. // Legacy code may expect a Cancelled coroutine to be scheduled without delay.
  98. Executor()->ScheduleExecutionNow(this);
  99. } else {
  100. Executor()->ScheduleExecution(this);
  101. }
  102. }
  103. TContExecutor::TContExecutor(
  104. uint32_t defaultStackSize,
  105. THolder<IPollerFace> poller,
  106. NCoro::IScheduleCallback* scheduleCallback,
  107. NCoro::IEnterPollerCallback* enterPollerCallback,
  108. NCoro::NStack::EGuard defaultGuard,
  109. TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings,
  110. NCoro::ITime* time
  111. )
  112. : ScheduleCallback_(scheduleCallback)
  113. , EnterPollerCallback_(enterPollerCallback)
  114. , DefaultStackSize_(defaultStackSize)
  115. , Poller_(std::move(poller))
  116. , Time_(time)
  117. {
  118. StackAllocator_ = NCoro::NStack::GetAllocator(poolSettings, defaultGuard);
  119. }
  120. TContExecutor::~TContExecutor() {
  121. Y_ABORT_UNLESS(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_);
  122. }
  123. void TContExecutor::Execute() noexcept {
  124. auto nop = [](void*){};
  125. Execute(nop);
  126. }
  127. void TContExecutor::Execute(TContFunc func, void* arg) noexcept {
  128. CreateOwned([=](TCont* cont) {
  129. func(cont, arg);
  130. }, "sys_main");
  131. RunScheduler();
  132. }
  133. void TContExecutor::WaitForIO() {
  134. while (Ready_.Empty() && !WaitQueue_.Empty()) {
  135. const auto now = Now();
  136. // Waking a coroutine puts it into ReadyNext_ list
  137. const auto next = WaitQueue_.WakeTimedout(now);
  138. if (!UserEvents_.Empty()) {
  139. TIntrusiveList<IUserEvent> userEvents;
  140. userEvents.Swap(UserEvents_);
  141. do {
  142. userEvents.PopFront()->Execute();
  143. } while (!userEvents.Empty());
  144. }
  145. // Polling will return as soon as there is an event to process or a timeout.
  146. // If there are woken coroutines we do not want to sleep in the poller
  147. // yet still we want to check for new io
  148. // to prevent ourselves from locking out of io by constantly waking coroutines.
  149. if (ReadyNext_.Empty()) {
  150. if (EnterPollerCallback_) {
  151. EnterPollerCallback_->OnEnterPoller();
  152. }
  153. Poll(next);
  154. if (EnterPollerCallback_) {
  155. EnterPollerCallback_->OnExitPoller();
  156. }
  157. } else if (LastPoll_ + TDuration::MilliSeconds(5) < now) {
  158. if (EnterPollerCallback_) {
  159. EnterPollerCallback_->OnEnterPoller();
  160. }
  161. Poll(now);
  162. if (EnterPollerCallback_) {
  163. EnterPollerCallback_->OnExitPoller();
  164. }
  165. }
  166. Ready_.Append(ReadyNext_);
  167. }
  168. }
  169. void TContExecutor::Poll(TInstant deadline) {
  170. Poller_.Wait(PollerEvents_, deadline);
  171. LastPoll_ = Now();
  172. // Waking a coroutine puts it into ReadyNext_ list
  173. for (auto event : PollerEvents_) {
  174. auto* lst = (NCoro::TPollEventList*)event.Data;
  175. const int status = event.Status;
  176. if (status) {
  177. for (auto it = lst->Begin(); it != lst->End();) {
  178. (it++)->OnPollEvent(status);
  179. }
  180. } else {
  181. const ui16 filter = event.Filter;
  182. for (auto it = lst->Begin(); it != lst->End();) {
  183. if (it->What() & filter) {
  184. (it++)->OnPollEvent(0);
  185. } else {
  186. ++it;
  187. }
  188. }
  189. }
  190. }
  191. }
  192. void TContExecutor::Abort() noexcept {
  193. WaitQueue_.Abort();
  194. auto visitor = [](TCont* c) {
  195. c->Cancel();
  196. };
  197. Ready_.ForEach(visitor);
  198. ReadyNext_.ForEach(visitor);
  199. }
  200. TCont* TContExecutor::Create(
  201. TContFunc func,
  202. void* arg,
  203. const char* name,
  204. TMaybe<ui32> customStackSize
  205. ) noexcept {
  206. return CreateOwned([=](TCont* cont) {
  207. func(cont, arg);
  208. }, name, customStackSize);
  209. }
  210. TCont* TContExecutor::CreateOwned(
  211. NCoro::TTrampoline::TFunc func,
  212. const char* name,
  213. TMaybe<ui32> customStackSize
  214. ) noexcept {
  215. Allocated_ += 1;
  216. if (!customStackSize) {
  217. customStackSize = DefaultStackSize_;
  218. }
  219. auto* cont = new TCont(*StackAllocator_, *customStackSize, *this, std::move(func), name);
  220. ScheduleExecution(cont);
  221. return cont;
  222. }
  223. NCoro::NStack::TAllocatorStats TContExecutor::GetAllocatorStats() const noexcept {
  224. return StackAllocator_->GetStackStats();
  225. }
  226. void TContExecutor::Release(TCont* cont) noexcept {
  227. delete cont;
  228. Allocated_ -= 1;
  229. }
  230. void TContExecutor::ScheduleToDelete(TCont* cont) noexcept {
  231. ToDelete_.PushBack(cont);
  232. }
  233. void TContExecutor::ScheduleExecution(TCont* cont) noexcept {
  234. cont->Scheduled_ = true;
  235. ReadyNext_.PushBack(cont);
  236. }
  237. void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept {
  238. cont->Scheduled_ = true;
  239. Ready_.PushBack(cont);
  240. }
  241. namespace {
  242. inline TContExecutor*& ThisThreadExecutor() {
  243. struct TThisThreadExecutorHolder {
  244. TContExecutor* Executor = nullptr;
  245. };
  246. return FastTlsSingletonWithPriority<TThisThreadExecutorHolder, 0>()->Executor;
  247. }
  248. }
  249. void TContExecutor::DeleteScheduled() noexcept {
  250. ToDelete_.ForEach([this](TCont* c) {
  251. Release(c);
  252. });
  253. }
  254. TCont* RunningCont() {
  255. TContExecutor* thisThreadExecutor = ThisThreadExecutor();
  256. return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr;
  257. }
  258. void TContExecutor::RunScheduler() noexcept {
  259. try {
  260. TContExecutor* const prev = ThisThreadExecutor();
  261. ThisThreadExecutor() = this;
  262. TCont* caller = Current_;
  263. TExceptionSafeContext* context = caller ? caller->Trampoline_.Context() : &SchedContext_;
  264. Y_DEFER {
  265. ThisThreadExecutor() = prev;
  266. };
  267. while (true) {
  268. if (ScheduleCallback_ && Current_) {
  269. ScheduleCallback_->OnUnschedule(*this);
  270. }
  271. WaitForIO();
  272. DeleteScheduled();
  273. Ready_.Append(ReadyNext_);
  274. if (Ready_.Empty()) {
  275. Current_ = nullptr;
  276. if (caller) {
  277. context->SwitchTo(&SchedContext_);
  278. }
  279. break;
  280. }
  281. TCont* cont = Ready_.PopFront();
  282. if (ScheduleCallback_) {
  283. ScheduleCallback_->OnSchedule(*this, *cont);
  284. }
  285. Current_ = cont;
  286. cont->Scheduled_ = false;
  287. if (cont == caller) {
  288. break;
  289. }
  290. context->SwitchTo(cont->Trampoline_.Context());
  291. if (Paused_) {
  292. Paused_ = false;
  293. Current_ = nullptr;
  294. break;
  295. }
  296. if (caller) {
  297. break;
  298. }
  299. }
  300. } catch (...) {
  301. TBackTrace::FromCurrentException().PrintTo(Cerr);
  302. Y_ABORT("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str());
  303. }
  304. }
  305. void TContExecutor::Pause() {
  306. if (auto cont = Running()) {
  307. Paused_ = true;
  308. ScheduleExecutionNow(cont);
  309. cont->SwitchTo(&SchedContext_);
  310. }
  311. }
  312. void TContExecutor::Exit(TCont* cont) noexcept {
  313. ScheduleToDelete(cont);
  314. cont->SwitchTo(&SchedContext_);
  315. Y_ABORT("can not return from exit");
  316. }
  317. TInstant TContExecutor::Now() {
  318. return Y_LIKELY(Time_ == nullptr) ? TInstant::Now() : Time_->Now();
  319. }
  320. template <>
  321. void Out<TCont>(IOutputStream& out, const TCont& c) {
  322. c.PrintMe(out);
  323. }