executor.cpp 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. #include "executor.h"
  2. #include "thread_extra.h"
  3. #include "what_thread_does.h"
  4. #include "what_thread_does_guard.h"
  5. #include <util/generic/utility.h>
  6. #include <util/random/random.h>
  7. #include <util/stream/str.h>
  8. #include <util/system/tls.h>
  9. #include <util/system/yassert.h>
  10. #include <array>
  11. using namespace NActor;
  12. using namespace NActor::NPrivate;
  13. namespace {
  14. struct THistoryInternal {
  15. struct TRecord {
  16. TAtomic MaxQueueSize;
  17. TRecord()
  18. : MaxQueueSize()
  19. {
  20. }
  21. TExecutorHistory::THistoryRecord Capture() {
  22. TExecutorHistory::THistoryRecord r;
  23. r.MaxQueueSize = AtomicGet(MaxQueueSize);
  24. return r;
  25. }
  26. };
  27. ui64 Start;
  28. ui64 LastTime;
  29. std::array<TRecord, 3600> Records;
  30. THistoryInternal() {
  31. Start = TInstant::Now().Seconds();
  32. LastTime = Start - 1;
  33. }
  34. TRecord& GetRecordForTime(ui64 time) {
  35. return Records[time % Records.size()];
  36. }
  37. TRecord& GetNowRecord(ui64 now) {
  38. for (ui64 t = LastTime + 1; t <= now; ++t) {
  39. GetRecordForTime(t) = TRecord();
  40. }
  41. LastTime = now;
  42. return GetRecordForTime(now);
  43. }
  44. TExecutorHistory Capture() {
  45. TExecutorHistory history;
  46. ui64 now = TInstant::Now().Seconds();
  47. ui64 lastHistoryRecord = now - 1;
  48. ui32 historySize = Min<ui32>(lastHistoryRecord - Start, Records.size() - 1);
  49. history.HistoryRecords.resize(historySize);
  50. for (ui32 i = 0; i < historySize; ++i) {
  51. history.HistoryRecords[i] = GetRecordForTime(lastHistoryRecord - historySize + i).Capture();
  52. }
  53. history.LastHistoryRecordSecond = lastHistoryRecord;
  54. return history;
  55. }
  56. };
  57. }
  58. Y_POD_STATIC_THREAD(TExecutor*)
  59. ThreadCurrentExecutor;
  60. static const char* NoLocation = "nowhere";
  61. struct TExecutorWorkerThreadLocalData {
  62. ui32 MaxQueueSize;
  63. };
  64. static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData;
  65. Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData)
  66. WorkerThreadLocalData;
  67. namespace NActor {
  68. struct TExecutorWorker {
  69. TExecutor* const Executor;
  70. TThread Thread;
  71. const char** WhatThreadDoesLocation;
  72. TExecutorWorkerThreadLocalData* ThreadLocalData;
  73. TExecutorWorker(TExecutor* executor)
  74. : Executor(executor)
  75. , Thread(RunThreadProc, this)
  76. , WhatThreadDoesLocation(&NoLocation)
  77. , ThreadLocalData(&::WorkerNoThreadLocalData)
  78. {
  79. Thread.Start();
  80. }
  81. void Run() {
  82. WhatThreadDoesLocation = ::WhatThreadDoesLocation();
  83. AtomicSet(ThreadLocalData, &::WorkerThreadLocalData);
  84. WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
  85. Executor->RunWorker();
  86. }
  87. static void* RunThreadProc(void* thiz0) {
  88. TExecutorWorker* thiz = (TExecutorWorker*)thiz0;
  89. thiz->Run();
  90. return nullptr;
  91. }
  92. };
  93. struct TExecutor::TImpl {
  94. TExecutor* const Executor;
  95. THistoryInternal History;
  96. TSystemEvent HelperStopSignal;
  97. TThread HelperThread;
  98. TImpl(TExecutor* executor)
  99. : Executor(executor)
  100. , HelperThread(HelperThreadProc, this)
  101. {
  102. }
  103. void RunHelper() {
  104. ui64 nowSeconds = TInstant::Now().Seconds();
  105. for (;;) {
  106. TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000));
  107. if (HelperStopSignal.WaitD(nextStop)) {
  108. return;
  109. }
  110. nowSeconds = nextStop.Seconds();
  111. THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds);
  112. ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear();
  113. if (maxQueueSize > record.MaxQueueSize) {
  114. AtomicSet(record.MaxQueueSize, maxQueueSize);
  115. }
  116. }
  117. }
  118. static void* HelperThreadProc(void* impl0) {
  119. TImpl* impl = (TImpl*)impl0;
  120. impl->RunHelper();
  121. return nullptr;
  122. }
  123. };
  124. }
  125. static TExecutor::TConfig MakeConfig(unsigned workerCount) {
  126. TExecutor::TConfig config;
  127. config.WorkerCount = workerCount;
  128. return config;
  129. }
  130. TExecutor::TExecutor(size_t workerCount)
  131. : Config(MakeConfig(workerCount))
  132. {
  133. Init();
  134. }
  135. TExecutor::TExecutor(const TExecutor::TConfig& config)
  136. : Config(config)
  137. {
  138. Init();
  139. }
  140. void TExecutor::Init() {
  141. Impl.Reset(new TImpl(this));
  142. AtomicSet(ExitWorkers, 0);
  143. Y_ABORT_UNLESS(Config.WorkerCount > 0);
  144. for (size_t i = 0; i < Config.WorkerCount; i++) {
  145. WorkerThreads.push_back(new TExecutorWorker(this));
  146. }
  147. Impl->HelperThread.Start();
  148. }
  149. TExecutor::~TExecutor() {
  150. Stop();
  151. }
  152. void TExecutor::Stop() {
  153. AtomicSet(ExitWorkers, 1);
  154. Impl->HelperStopSignal.Signal();
  155. Impl->HelperThread.Join();
  156. {
  157. TWhatThreadDoesAcquireGuard<TMutex> guard(WorkMutex, "executor: acquiring lock for Stop");
  158. WorkAvailable.BroadCast();
  159. }
  160. for (size_t i = 0; i < WorkerThreads.size(); i++) {
  161. WorkerThreads[i]->Thread.Join();
  162. }
  163. // TODO: make queue empty at this point
  164. ProcessWorkQueueHere();
  165. }
  166. void TExecutor::EnqueueWork(TArrayRef<IWorkItem* const> wis) {
  167. if (wis.empty())
  168. return;
  169. if (Y_UNLIKELY(AtomicGet(ExitWorkers) != 0)) {
  170. Y_ABORT_UNLESS(WorkItems.Empty(), "executor %s: cannot add tasks after queue shutdown", Config.Name);
  171. }
  172. TWhatThreadDoesPushPop pp("executor: EnqueueWork");
  173. WorkItems.PushAll(wis);
  174. {
  175. if (wis.size() == 1) {
  176. TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork");
  177. WorkAvailable.Signal();
  178. } else {
  179. TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork");
  180. WorkAvailable.BroadCast();
  181. }
  182. }
  183. }
  184. size_t TExecutor::GetWorkQueueSize() const {
  185. return WorkItems.Size();
  186. }
  187. using namespace NTSAN;
  188. ui32 TExecutor::GetMaxQueueSizeAndClear() const {
  189. ui32 max = 0;
  190. for (unsigned i = 0; i < WorkerThreads.size(); ++i) {
  191. TExecutorWorkerThreadLocalData* wtls = AtomicGet(WorkerThreads[i]->ThreadLocalData);
  192. max = Max<ui32>(max, RelaxedLoad(&wtls->MaxQueueSize));
  193. RelaxedStore<ui32>(&wtls->MaxQueueSize, 0);
  194. }
  195. return max;
  196. }
  197. TString TExecutor::GetStatus() const {
  198. return GetStatusRecordInternal().Status;
  199. }
  200. TString TExecutor::GetStatusSingleLine() const {
  201. TStringStream ss;
  202. ss << "work items: " << GetWorkQueueSize();
  203. return ss.Str();
  204. }
  205. TExecutorStatus TExecutor::GetStatusRecordInternal() const {
  206. TExecutorStatus r;
  207. r.WorkQueueSize = GetWorkQueueSize();
  208. {
  209. TStringStream ss;
  210. ss << "work items: " << GetWorkQueueSize() << "\n";
  211. ss << "workers:\n";
  212. for (unsigned i = 0; i < WorkerThreads.size(); ++i) {
  213. ss << "-- " << AtomicGet(*AtomicGet(WorkerThreads[i]->WhatThreadDoesLocation)) << "\n";
  214. }
  215. r.Status = ss.Str();
  216. }
  217. r.History = Impl->History.Capture();
  218. return r;
  219. }
  220. bool TExecutor::IsInExecutorThread() const {
  221. return ThreadCurrentExecutor == this;
  222. }
  223. TAutoPtr<IWorkItem> TExecutor::DequeueWork() {
  224. IWorkItem* wi = reinterpret_cast<IWorkItem*>(1);
  225. size_t queueSize = Max<size_t>();
  226. if (!WorkItems.TryPop(&wi, &queueSize)) {
  227. TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for DequeueWork");
  228. while (!WorkItems.TryPop(&wi, &queueSize)) {
  229. if (AtomicGet(ExitWorkers) != 0)
  230. return nullptr;
  231. TWhatThreadDoesPushPop pp("waiting for work on condvar");
  232. WorkAvailable.Wait(WorkMutex);
  233. }
  234. }
  235. auto& wtls = TlsRef(WorkerThreadLocalData);
  236. if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) {
  237. RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize);
  238. }
  239. return wi;
  240. }
  241. void TExecutor::RunWorkItem(TAutoPtr<IWorkItem> wi) {
  242. WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
  243. wi.Release()->DoWork();
  244. }
  245. void TExecutor::ProcessWorkQueueHere() {
  246. IWorkItem* wi;
  247. while (WorkItems.TryPop(&wi)) {
  248. RunWorkItem(wi);
  249. }
  250. }
  251. void TExecutor::RunWorker() {
  252. Y_ABORT_UNLESS(!ThreadCurrentExecutor, "state check");
  253. ThreadCurrentExecutor = this;
  254. SetCurrentThreadName("wrkr");
  255. for (;;) {
  256. TAutoPtr<IWorkItem> wi = DequeueWork();
  257. if (!wi) {
  258. break;
  259. }
  260. // Note for messagebus users: make sure program crashes
  261. // on uncaught exception in thread, otherewise messagebus may just hang on error.
  262. RunWorkItem(wi);
  263. }
  264. ThreadCurrentExecutor = (TExecutor*)nullptr;
  265. }