impl.cpp 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. #include "impl.h"
  2. #include "stack/stack_allocator.h"
  3. #include "stack/stack_guards.h"
  4. #include <util/generic/scope.h>
  5. #include <util/thread/singleton.h>
  6. #include <util/stream/format.h>
  7. #include <util/stream/output.h>
  8. #include <util/system/yassert.h>
  9. TCont::TJoinWait::TJoinWait(TCont& c) noexcept
  10. : Cont_(c)
  11. {}
  12. void TCont::TJoinWait::Wake() noexcept {
  13. Cont_.ReSchedule();
  14. }
  15. TCont::TCont(NCoro::NStack::IAllocator& allocator,
  16. uint32_t stackSize,
  17. TContExecutor& executor,
  18. NCoro::TTrampoline::TFunc func,
  19. const char* name) noexcept
  20. : Executor_(executor)
  21. , Name_(name)
  22. , Trampoline_(
  23. allocator,
  24. stackSize,
  25. std::move(func),
  26. this
  27. )
  28. {}
  29. void TCont::PrintMe(IOutputStream& out) const noexcept {
  30. out << "cont("
  31. << "name = " << Name_ << ", "
  32. << "addr = " << Hex((size_t)this)
  33. << ")";
  34. }
  35. bool TCont::Join(TCont* c, TInstant deadLine) noexcept {
  36. TJoinWait ev(*this);
  37. c->Waiters_.PushBack(&ev);
  38. do {
  39. if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) {
  40. if (!ev.Empty()) {
  41. c->Cancel();
  42. do {
  43. Switch();
  44. } while (!ev.Empty());
  45. }
  46. return false;
  47. }
  48. } while (!ev.Empty());
  49. return true;
  50. }
  51. int TCont::SleepD(TInstant deadline) noexcept {
  52. TTimerEvent event(this, deadline);
  53. return ExecuteEvent(&event);
  54. }
  55. void TCont::Switch() noexcept {
  56. Executor()->RunScheduler();
  57. }
  58. void TCont::Yield() noexcept {
  59. if (SleepD(TInstant::Zero())) {
  60. ReScheduleAndSwitch();
  61. }
  62. }
  63. void TCont::ReScheduleAndSwitch() noexcept {
  64. ReSchedule();
  65. Switch();
  66. }
  67. void TCont::Terminate() {
  68. while (!Waiters_.Empty()) {
  69. Waiters_.PopFront()->Wake();
  70. }
  71. Executor()->Exit(this);
  72. }
  73. bool TCont::IAmRunning() const noexcept {
  74. return this == Executor()->Running();
  75. }
  76. void TCont::Cancel() noexcept {
  77. if (Cancelled()) {
  78. return;
  79. }
  80. Cancelled_ = true;
  81. if (!IAmRunning()) {
  82. ReSchedule();
  83. }
  84. }
  85. void TCont::ReSchedule() noexcept {
  86. if (Cancelled()) {
  87. // Legacy code may expect a Cancelled coroutine to be scheduled without delay.
  88. Executor()->ScheduleExecutionNow(this);
  89. } else {
  90. Executor()->ScheduleExecution(this);
  91. }
  92. }
  93. TContExecutor::TContExecutor(
  94. uint32_t defaultStackSize,
  95. THolder<IPollerFace> poller,
  96. NCoro::IScheduleCallback* scheduleCallback,
  97. NCoro::IEnterPollerCallback* enterPollerCallback,
  98. NCoro::NStack::EGuard defaultGuard,
  99. TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings,
  100. NCoro::ITime* time
  101. )
  102. : ScheduleCallback_(scheduleCallback)
  103. , EnterPollerCallback_(enterPollerCallback)
  104. , DefaultStackSize_(defaultStackSize)
  105. , Poller_(std::move(poller))
  106. , Time_(time)
  107. {
  108. StackAllocator_ = NCoro::NStack::GetAllocator(poolSettings, defaultGuard);
  109. }
  110. TContExecutor::~TContExecutor() {
  111. Y_VERIFY(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_);
  112. }
  113. void TContExecutor::Execute() noexcept {
  114. auto nop = [](void*){};
  115. Execute(nop);
  116. }
  117. void TContExecutor::Execute(TContFunc func, void* arg) noexcept {
  118. CreateOwned([=](TCont* cont) {
  119. func(cont, arg);
  120. }, "sys_main");
  121. RunScheduler();
  122. }
  123. void TContExecutor::WaitForIO() {
  124. while (Ready_.Empty() && !WaitQueue_.Empty()) {
  125. const auto now = Now();
  126. // Waking a coroutine puts it into ReadyNext_ list
  127. const auto next = WaitQueue_.WakeTimedout(now);
  128. if (!UserEvents_.Empty()) {
  129. TIntrusiveList<IUserEvent> userEvents;
  130. userEvents.Swap(UserEvents_);
  131. do {
  132. userEvents.PopFront()->Execute();
  133. } while (!userEvents.Empty());
  134. }
  135. // Polling will return as soon as there is an event to process or a timeout.
  136. // If there are woken coroutines we do not want to sleep in the poller
  137. // yet still we want to check for new io
  138. // to prevent ourselves from locking out of io by constantly waking coroutines.
  139. if (ReadyNext_.Empty()) {
  140. if (EnterPollerCallback_) {
  141. EnterPollerCallback_->OnEnterPoller();
  142. }
  143. Poll(next);
  144. if (EnterPollerCallback_) {
  145. EnterPollerCallback_->OnExitPoller();
  146. }
  147. } else if (LastPoll_ + TDuration::MilliSeconds(5) < now) {
  148. if (EnterPollerCallback_) {
  149. EnterPollerCallback_->OnEnterPoller();
  150. }
  151. Poll(now);
  152. if (EnterPollerCallback_) {
  153. EnterPollerCallback_->OnExitPoller();
  154. }
  155. }
  156. Ready_.Append(ReadyNext_);
  157. }
  158. }
  159. void TContExecutor::Poll(TInstant deadline) {
  160. Poller_.Wait(PollerEvents_, deadline);
  161. LastPoll_ = Now();
  162. // Waking a coroutine puts it into ReadyNext_ list
  163. for (auto event : PollerEvents_) {
  164. auto* lst = (NCoro::TPollEventList*)event.Data;
  165. const int status = event.Status;
  166. if (status) {
  167. for (auto it = lst->Begin(); it != lst->End();) {
  168. (it++)->OnPollEvent(status);
  169. }
  170. } else {
  171. const ui16 filter = event.Filter;
  172. for (auto it = lst->Begin(); it != lst->End();) {
  173. if (it->What() & filter) {
  174. (it++)->OnPollEvent(0);
  175. } else {
  176. ++it;
  177. }
  178. }
  179. }
  180. }
  181. }
  182. void TContExecutor::Abort() noexcept {
  183. WaitQueue_.Abort();
  184. auto visitor = [](TCont* c) {
  185. c->Cancel();
  186. };
  187. Ready_.ForEach(visitor);
  188. ReadyNext_.ForEach(visitor);
  189. }
  190. TCont* TContExecutor::Create(
  191. TContFunc func,
  192. void* arg,
  193. const char* name,
  194. TMaybe<ui32> customStackSize
  195. ) noexcept {
  196. return CreateOwned([=](TCont* cont) {
  197. func(cont, arg);
  198. }, name, customStackSize);
  199. }
  200. TCont* TContExecutor::CreateOwned(
  201. NCoro::TTrampoline::TFunc func,
  202. const char* name,
  203. TMaybe<ui32> customStackSize
  204. ) noexcept {
  205. Allocated_ += 1;
  206. if (!customStackSize) {
  207. customStackSize = DefaultStackSize_;
  208. }
  209. auto* cont = new TCont(*StackAllocator_, *customStackSize, *this, std::move(func), name);
  210. ScheduleExecution(cont);
  211. return cont;
  212. }
  213. NCoro::NStack::TAllocatorStats TContExecutor::GetAllocatorStats() const noexcept {
  214. return StackAllocator_->GetStackStats();
  215. }
  216. void TContExecutor::Release(TCont* cont) noexcept {
  217. delete cont;
  218. Allocated_ -= 1;
  219. }
  220. void TContExecutor::ScheduleToDelete(TCont* cont) noexcept {
  221. ToDelete_.PushBack(cont);
  222. }
  223. void TContExecutor::ScheduleExecution(TCont* cont) noexcept {
  224. cont->Scheduled_ = true;
  225. ReadyNext_.PushBack(cont);
  226. }
  227. void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept {
  228. cont->Scheduled_ = true;
  229. Ready_.PushBack(cont);
  230. }
  231. namespace {
  232. inline TContExecutor*& ThisThreadExecutor() {
  233. struct TThisThreadExecutorHolder {
  234. TContExecutor* Executor = nullptr;
  235. };
  236. return FastTlsSingletonWithPriority<TThisThreadExecutorHolder, 0>()->Executor;
  237. }
  238. }
  239. void TContExecutor::DeleteScheduled() noexcept {
  240. ToDelete_.ForEach([this](TCont* c) {
  241. Release(c);
  242. });
  243. }
  244. TCont* RunningCont() {
  245. TContExecutor* thisThreadExecutor = ThisThreadExecutor();
  246. return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr;
  247. }
  248. void TContExecutor::RunScheduler() noexcept {
  249. try {
  250. TContExecutor* const prev = ThisThreadExecutor();
  251. ThisThreadExecutor() = this;
  252. TCont* caller = Current_;
  253. TExceptionSafeContext* context = caller ? caller->Trampoline_.Context() : &SchedContext_;
  254. Y_DEFER {
  255. ThisThreadExecutor() = prev;
  256. };
  257. while (true) {
  258. if (ScheduleCallback_ && Current_) {
  259. ScheduleCallback_->OnUnschedule(*this);
  260. }
  261. WaitForIO();
  262. DeleteScheduled();
  263. Ready_.Append(ReadyNext_);
  264. if (Ready_.Empty()) {
  265. Current_ = nullptr;
  266. if (caller) {
  267. context->SwitchTo(&SchedContext_);
  268. }
  269. break;
  270. }
  271. TCont* cont = Ready_.PopFront();
  272. if (ScheduleCallback_) {
  273. ScheduleCallback_->OnSchedule(*this, *cont);
  274. }
  275. Current_ = cont;
  276. cont->Scheduled_ = false;
  277. if (cont == caller) {
  278. break;
  279. }
  280. context->SwitchTo(cont->Trampoline_.Context());
  281. if (Paused_) {
  282. Paused_ = false;
  283. Current_ = nullptr;
  284. break;
  285. }
  286. if (caller) {
  287. break;
  288. }
  289. }
  290. } catch (...) {
  291. TBackTrace::FromCurrentException().PrintTo(Cerr);
  292. Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str());
  293. }
  294. }
  295. void TContExecutor::Pause() {
  296. if (auto cont = Running()) {
  297. Paused_ = true;
  298. ScheduleExecutionNow(cont);
  299. cont->SwitchTo(&SchedContext_);
  300. }
  301. }
  302. void TContExecutor::Exit(TCont* cont) noexcept {
  303. ScheduleToDelete(cont);
  304. cont->SwitchTo(&SchedContext_);
  305. Y_FAIL("can not return from exit");
  306. }
  307. TInstant TContExecutor::Now() {
  308. return Y_LIKELY(Time_ == nullptr) ? TInstant::Now() : Time_->Now();
  309. }
  310. template <>
  311. void Out<TCont>(IOutputStream& out, const TCont& c) {
  312. c.PrintMe(out);
  313. }