123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- #include "executor.h"
- #include "thread_extra.h"
- #include "what_thread_does.h"
- #include "what_thread_does_guard.h"
- #include <util/generic/utility.h>
- #include <util/random/random.h>
- #include <util/stream/str.h>
- #include <util/system/tls.h>
- #include <util/system/yassert.h>
- #include <array>
- using namespace NActor;
- using namespace NActor::NPrivate;
- namespace {
- struct THistoryInternal {
- struct TRecord {
- TAtomic MaxQueueSize;
- TRecord()
- : MaxQueueSize()
- {
- }
- TExecutorHistory::THistoryRecord Capture() {
- TExecutorHistory::THistoryRecord r;
- r.MaxQueueSize = AtomicGet(MaxQueueSize);
- return r;
- }
- };
- ui64 Start;
- ui64 LastTime;
- std::array<TRecord, 3600> Records;
- THistoryInternal() {
- Start = TInstant::Now().Seconds();
- LastTime = Start - 1;
- }
- TRecord& GetRecordForTime(ui64 time) {
- return Records[time % Records.size()];
- }
- TRecord& GetNowRecord(ui64 now) {
- for (ui64 t = LastTime + 1; t <= now; ++t) {
- GetRecordForTime(t) = TRecord();
- }
- LastTime = now;
- return GetRecordForTime(now);
- }
- TExecutorHistory Capture() {
- TExecutorHistory history;
- ui64 now = TInstant::Now().Seconds();
- ui64 lastHistoryRecord = now - 1;
- ui32 historySize = Min<ui32>(lastHistoryRecord - Start, Records.size() - 1);
- history.HistoryRecords.resize(historySize);
- for (ui32 i = 0; i < historySize; ++i) {
- history.HistoryRecords[i] = GetRecordForTime(lastHistoryRecord - historySize + i).Capture();
- }
- history.LastHistoryRecordSecond = lastHistoryRecord;
- return history;
- }
- };
- }
- Y_POD_STATIC_THREAD(TExecutor*)
- ThreadCurrentExecutor;
- static const char* NoLocation = "nowhere";
- struct TExecutorWorkerThreadLocalData {
- ui32 MaxQueueSize;
- };
- static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData;
- Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData)
- WorkerThreadLocalData;
- namespace NActor {
- struct TExecutorWorker {
- TExecutor* const Executor;
- TThread Thread;
- const char** WhatThreadDoesLocation;
- TExecutorWorkerThreadLocalData* ThreadLocalData;
- TExecutorWorker(TExecutor* executor)
- : Executor(executor)
- , Thread(RunThreadProc, this)
- , WhatThreadDoesLocation(&NoLocation)
- , ThreadLocalData(&::WorkerNoThreadLocalData)
- {
- Thread.Start();
- }
- void Run() {
- WhatThreadDoesLocation = ::WhatThreadDoesLocation();
- AtomicSet(ThreadLocalData, &::WorkerThreadLocalData);
- WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
- Executor->RunWorker();
- }
- static void* RunThreadProc(void* thiz0) {
- TExecutorWorker* thiz = (TExecutorWorker*)thiz0;
- thiz->Run();
- return nullptr;
- }
- };
- struct TExecutor::TImpl {
- TExecutor* const Executor;
- THistoryInternal History;
- TSystemEvent HelperStopSignal;
- TThread HelperThread;
- TImpl(TExecutor* executor)
- : Executor(executor)
- , HelperThread(HelperThreadProc, this)
- {
- }
- void RunHelper() {
- ui64 nowSeconds = TInstant::Now().Seconds();
- for (;;) {
- TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000));
- if (HelperStopSignal.WaitD(nextStop)) {
- return;
- }
- nowSeconds = nextStop.Seconds();
- THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds);
- ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear();
- if (maxQueueSize > record.MaxQueueSize) {
- AtomicSet(record.MaxQueueSize, maxQueueSize);
- }
- }
- }
- static void* HelperThreadProc(void* impl0) {
- TImpl* impl = (TImpl*)impl0;
- impl->RunHelper();
- return nullptr;
- }
- };
- }
- static TExecutor::TConfig MakeConfig(unsigned workerCount) {
- TExecutor::TConfig config;
- config.WorkerCount = workerCount;
- return config;
- }
- TExecutor::TExecutor(size_t workerCount)
- : Config(MakeConfig(workerCount))
- {
- Init();
- }
- TExecutor::TExecutor(const TExecutor::TConfig& config)
- : Config(config)
- {
- Init();
- }
- void TExecutor::Init() {
- Impl.Reset(new TImpl(this));
- AtomicSet(ExitWorkers, 0);
- Y_ABORT_UNLESS(Config.WorkerCount > 0);
- for (size_t i = 0; i < Config.WorkerCount; i++) {
- WorkerThreads.push_back(new TExecutorWorker(this));
- }
- Impl->HelperThread.Start();
- }
- TExecutor::~TExecutor() {
- Stop();
- }
- void TExecutor::Stop() {
- AtomicSet(ExitWorkers, 1);
- Impl->HelperStopSignal.Signal();
- Impl->HelperThread.Join();
- {
- TWhatThreadDoesAcquireGuard<TMutex> guard(WorkMutex, "executor: acquiring lock for Stop");
- WorkAvailable.BroadCast();
- }
- for (size_t i = 0; i < WorkerThreads.size(); i++) {
- WorkerThreads[i]->Thread.Join();
- }
- // TODO: make queue empty at this point
- ProcessWorkQueueHere();
- }
- void TExecutor::EnqueueWork(TArrayRef<IWorkItem* const> wis) {
- if (wis.empty())
- return;
- if (Y_UNLIKELY(AtomicGet(ExitWorkers) != 0)) {
- Y_ABORT_UNLESS(WorkItems.Empty(), "executor %s: cannot add tasks after queue shutdown", Config.Name);
- }
- TWhatThreadDoesPushPop pp("executor: EnqueueWork");
- WorkItems.PushAll(wis);
- {
- if (wis.size() == 1) {
- TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork");
- WorkAvailable.Signal();
- } else {
- TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork");
- WorkAvailable.BroadCast();
- }
- }
- }
- size_t TExecutor::GetWorkQueueSize() const {
- return WorkItems.Size();
- }
- using namespace NTSAN;
- ui32 TExecutor::GetMaxQueueSizeAndClear() const {
- ui32 max = 0;
- for (unsigned i = 0; i < WorkerThreads.size(); ++i) {
- TExecutorWorkerThreadLocalData* wtls = AtomicGet(WorkerThreads[i]->ThreadLocalData);
- max = Max<ui32>(max, RelaxedLoad(&wtls->MaxQueueSize));
- RelaxedStore<ui32>(&wtls->MaxQueueSize, 0);
- }
- return max;
- }
- TString TExecutor::GetStatus() const {
- return GetStatusRecordInternal().Status;
- }
- TString TExecutor::GetStatusSingleLine() const {
- TStringStream ss;
- ss << "work items: " << GetWorkQueueSize();
- return ss.Str();
- }
- TExecutorStatus TExecutor::GetStatusRecordInternal() const {
- TExecutorStatus r;
- r.WorkQueueSize = GetWorkQueueSize();
- {
- TStringStream ss;
- ss << "work items: " << GetWorkQueueSize() << "\n";
- ss << "workers:\n";
- for (unsigned i = 0; i < WorkerThreads.size(); ++i) {
- ss << "-- " << AtomicGet(*AtomicGet(WorkerThreads[i]->WhatThreadDoesLocation)) << "\n";
- }
- r.Status = ss.Str();
- }
- r.History = Impl->History.Capture();
- return r;
- }
- bool TExecutor::IsInExecutorThread() const {
- return ThreadCurrentExecutor == this;
- }
- TAutoPtr<IWorkItem> TExecutor::DequeueWork() {
- IWorkItem* wi = reinterpret_cast<IWorkItem*>(1);
- size_t queueSize = Max<size_t>();
- if (!WorkItems.TryPop(&wi, &queueSize)) {
- TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for DequeueWork");
- while (!WorkItems.TryPop(&wi, &queueSize)) {
- if (AtomicGet(ExitWorkers) != 0)
- return nullptr;
- TWhatThreadDoesPushPop pp("waiting for work on condvar");
- WorkAvailable.Wait(WorkMutex);
- }
- }
- auto& wtls = TlsRef(WorkerThreadLocalData);
- if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) {
- RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize);
- }
- return wi;
- }
- void TExecutor::RunWorkItem(TAutoPtr<IWorkItem> wi) {
- WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
- wi.Release()->DoWork();
- }
- void TExecutor::ProcessWorkQueueHere() {
- IWorkItem* wi;
- while (WorkItems.TryPop(&wi)) {
- RunWorkItem(wi);
- }
- }
- void TExecutor::RunWorker() {
- Y_ABORT_UNLESS(!ThreadCurrentExecutor, "state check");
- ThreadCurrentExecutor = this;
- SetCurrentThreadName("wrkr");
- for (;;) {
- TAutoPtr<IWorkItem> wi = DequeueWork();
- if (!wi) {
- break;
- }
- // Note for messagebus users: make sure program crashes
- // on uncaught exception in thread, otherewise messagebus may just hang on error.
- RunWorkItem(wi);
- }
- ThreadCurrentExecutor = (TExecutor*)nullptr;
- }
|