task.cpp 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. #include "rain_check.h"
  2. #include <library/cpp/messagebus/actor/temp_tls_vector.h>
  3. #include <util/system/type_name.h>
  4. #include <util/system/tls.h>
  5. using namespace NRainCheck;
  6. using namespace NRainCheck::NPrivate;
  7. using namespace NActor;
  8. namespace {
  9. Y_POD_STATIC_THREAD(TTaskRunnerBase*)
  10. ThreadCurrentTask;
  11. }
  12. void TNopSubtaskListener::SetDone() {
  13. }
  14. TNopSubtaskListener TNopSubtaskListener::Instance;
  15. TTaskRunnerBase::TTaskRunnerBase(IEnv* env, ISubtaskListener* parentTask, TAutoPtr<ITaskBase> impl)
  16. : TActor<TTaskRunnerBase>(env->GetExecutor())
  17. , Impl(impl)
  18. , ParentTask(parentTask)
  19. //, HoldsSelfReference(false)
  20. , Done(false)
  21. , SetDoneCalled(false)
  22. {
  23. }
  24. TTaskRunnerBase::~TTaskRunnerBase() {
  25. Y_ASSERT(Done);
  26. }
  27. namespace {
  28. struct TRunningInThisThreadGuard {
  29. TTaskRunnerBase* const Task;
  30. TRunningInThisThreadGuard(TTaskRunnerBase* task)
  31. : Task(task)
  32. {
  33. Y_ASSERT(!ThreadCurrentTask);
  34. ThreadCurrentTask = task;
  35. }
  36. ~TRunningInThisThreadGuard() {
  37. Y_ASSERT(ThreadCurrentTask == Task);
  38. ThreadCurrentTask = nullptr;
  39. }
  40. };
  41. }
  42. void NRainCheck::TTaskRunnerBase::Act(NActor::TDefaultTag) {
  43. Y_ASSERT(RefCount() > 0);
  44. TRunningInThisThreadGuard g(this);
  45. //RetainRef();
  46. for (;;) {
  47. TTempTlsVector<TSubtaskCompletion*> temp;
  48. temp.GetVector()->swap(Pending);
  49. for (auto& pending : *temp.GetVector()) {
  50. if (pending->IsComplete()) {
  51. pending->FireCompletionCallback(GetImplBase());
  52. } else {
  53. Pending.push_back(pending);
  54. }
  55. }
  56. if (!Pending.empty()) {
  57. return;
  58. }
  59. if (!Done) {
  60. Done = !ReplyReceived();
  61. } else {
  62. if (Pending.empty()) {
  63. if (!SetDoneCalled) {
  64. ParentTask->SetDone();
  65. SetDoneCalled = true;
  66. }
  67. //ReleaseRef();
  68. return;
  69. }
  70. }
  71. }
  72. }
  73. bool TTaskRunnerBase::IsRunningInThisThread() const {
  74. return ThreadCurrentTask == this;
  75. }
  76. TSubtaskCompletion::~TSubtaskCompletion() {
  77. ESubtaskState state = State.Get();
  78. Y_ASSERT(state == CREATED || state == DONE || state == CANCELED);
  79. }
  80. void TSubtaskCompletion::FireCompletionCallback(ITaskBase* task) {
  81. Y_ASSERT(IsComplete());
  82. if (!!CompletionFunc) {
  83. TSubtaskCompletionFunc temp = CompletionFunc;
  84. // completion func must be reset before calling it,
  85. // because function may set it back
  86. CompletionFunc = TSubtaskCompletionFunc();
  87. (task->*(temp.Func))(this);
  88. }
  89. }
  90. void NRainCheck::TSubtaskCompletion::Cancel() {
  91. for (;;) {
  92. ESubtaskState state = State.Get();
  93. if (state == CREATED && State.CompareAndSet(CREATED, CANCELED)) {
  94. return;
  95. }
  96. if (state == RUNNING && State.CompareAndSet(RUNNING, CANCEL_REQUESTED)) {
  97. return;
  98. }
  99. if (state == DONE && State.CompareAndSet(DONE, CANCELED)) {
  100. return;
  101. }
  102. if (state == CANCEL_REQUESTED || state == CANCELED) {
  103. return;
  104. }
  105. }
  106. }
  107. void TSubtaskCompletion::SetRunning(TTaskRunnerBase* parent) {
  108. Y_ASSERT(!TaskRunner);
  109. Y_ASSERT(!!parent);
  110. TaskRunner = parent;
  111. parent->Pending.push_back(this);
  112. parent->RefV();
  113. for (;;) {
  114. ESubtaskState current = State.Get();
  115. if (current != CREATED && current != DONE) {
  116. Y_ABORT("current state should be CREATED or DONE: %s", ToCString(current));
  117. }
  118. if (State.CompareAndSet(current, RUNNING)) {
  119. return;
  120. }
  121. }
  122. }
  123. void TSubtaskCompletion::SetDone() {
  124. Y_ASSERT(!!TaskRunner);
  125. TTaskRunnerBase* temp = TaskRunner;
  126. TaskRunner = nullptr;
  127. for (;;) {
  128. ESubtaskState state = State.Get();
  129. if (state == RUNNING) {
  130. if (State.CompareAndSet(RUNNING, DONE)) {
  131. break;
  132. }
  133. } else if (state == CANCEL_REQUESTED) {
  134. if (State.CompareAndSet(CANCEL_REQUESTED, CANCELED)) {
  135. break;
  136. }
  137. } else {
  138. Y_ABORT("cannot SetDone: unknown state: %s", ToCString(state));
  139. }
  140. }
  141. temp->ScheduleV();
  142. temp->UnRefV();
  143. }
  144. #if 0
  145. void NRainCheck::TTaskRunnerBase::RetainRef()
  146. {
  147. if (HoldsSelfReference) {
  148. return;
  149. }
  150. HoldsSelfReference = true;
  151. Ref();
  152. }
  153. void NRainCheck::TTaskRunnerBase::ReleaseRef()
  154. {
  155. if (!HoldsSelfReference) {
  156. return;
  157. }
  158. HoldsSelfReference = false;
  159. DecRef();
  160. }
  161. #endif
  162. void TTaskRunnerBase::AssertInThisThread() const {
  163. Y_ASSERT(IsRunningInThisThread());
  164. }
  165. TTaskRunnerBase* TTaskRunnerBase::CurrentTask() {
  166. Y_ABORT_UNLESS(!!ThreadCurrentTask);
  167. return ThreadCurrentTask;
  168. }
  169. ITaskBase* TTaskRunnerBase::CurrentTaskImpl() {
  170. return CurrentTask()->GetImplBase();
  171. }
  172. TString TTaskRunnerBase::GetStatusSingleLine() {
  173. return TypeName(*Impl);
  174. }
  175. bool NRainCheck::AreWeInsideTask() {
  176. return ThreadCurrentTask != nullptr;
  177. }