#include "test_runtime.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include bool VERBOSE = false; const bool PRINT_EVENT_BODY = false; namespace { TString MakeClusterId() { pid_t pid = getpid(); TStringBuilder uuid; uuid << "Cluster for process with id: " << pid; return uuid; } } namespace NActors { ui64 TScheduledEventQueueItem::NextUniqueId = 0; void PrintEvent(TAutoPtr& ev, const TTestActorRuntimeBase* runtime) { Cerr << "mailbox: " << ev->GetRecipientRewrite().Hint() << ", type: " << Sprintf("%08x", ev->GetTypeRewrite()) << ", from " << ev->Sender.LocalId(); TString name = runtime->GetActorName(ev->Sender); if (!name.empty()) Cerr << " \"" << name << "\""; Cerr << ", to " << ev->GetRecipientRewrite().LocalId(); name = runtime->GetActorName(ev->GetRecipientRewrite()); if (!name.empty()) Cerr << " \"" << name << "\""; Cerr << ", "; if (ev->HasEvent()) Cerr << " : " << (PRINT_EVENT_BODY ? ev->GetBase()->ToString() : ev->GetBase()->ToStringHeader()); else if (ev->HasBuffer()) Cerr << " : BUFFER"; else Cerr << " : EMPTY"; Cerr << "\n"; } TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() { ActorSystemTimestamp = nullptr; ActorSystemMonotonic = nullptr; } void TTestActorRuntimeBase::TNodeDataBase::Stop() { if (Poller) Poller->Stop(); if (MailboxTable) { for (ui32 round = 0; !MailboxTable->Cleanup(); ++round) Y_VERIFY(round < 10, "cyclic event/actor spawn while trying to shutdown actorsystem stub"); } if (ActorSystem) ActorSystem->Stop(); ActorSystem.Destroy(); Poller.Reset(); } TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() { Stop(); } class TTestActorRuntimeBase::TEdgeActor : public TActor { public: static constexpr EActivityType ActorActivityType() { return TEST_ACTOR_RUNTIME; } TEdgeActor(TTestActorRuntimeBase* runtime) : TActor(&TEdgeActor::StateFunc) , Runtime(runtime) { } STFUNC(StateFunc) { Y_UNUSED(ctx); TGuard guard(Runtime->Mutex); bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { verbose = false; } if (verbose) { Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", "; PrintEvent(ev, Runtime); } if (!Runtime->EventFilterFunc(*Runtime, ev)) { ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); Runtime->GetMailbox(nodeId, mailboxHint).Send(ev); Runtime->MailboxesHasEvents.Signal(); if (verbose) Cerr << "Event was added to sent queue\n"; } else { if (verbose) Cerr << "Event was dropped\n"; } } private: TTestActorRuntimeBase* Runtime; }; void TEventMailBox::Send(TAutoPtr ev) { IEventHandle* ptr = ev.Get(); Y_VERIFY(ptr); #ifdef DEBUG_ORDER_EVENTS ui64 counter = NextToSend++; TrackSent[ptr] = counter; #endif Sent.push_back(ev); } TAutoPtr TEventMailBox::Pop() { TAutoPtr result = Sent.front(); Sent.pop_front(); #ifdef DEBUG_ORDER_EVENTS auto it = TrackSent.find(result.Get()); if (it != TrackSent.end()) { Y_VERIFY(ExpectedReceive == it->second); TrackSent.erase(result.Get()); ++ExpectedReceive; } #endif return result; } bool TEventMailBox::IsEmpty() const { return Sent.empty(); } void TEventMailBox::Capture(TEventsList& evList) { evList.insert(evList.end(), Sent.begin(), Sent.end()); Sent.clear(); } void TEventMailBox::PushFront(TAutoPtr& ev) { Sent.push_front(ev); } void TEventMailBox::PushFront(TEventsList& evList) { for (auto rit = evList.rbegin(); rit != evList.rend(); ++rit) { if (*rit) { Sent.push_front(*rit); } } } void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) { for (auto it = Scheduled.begin(); it != Scheduled.end(); ++it) { evList.insert(*it); } Scheduled.clear(); } void TEventMailBox::PushScheduled(TScheduledEventsList& evList) { for (auto it = evList.begin(); it != evList.end(); ++it) { if (it->Event) { Scheduled.insert(*it); } } evList.clear(); } bool TEventMailBox::IsActive(const TInstant& currentTime) const { return currentTime >= InactiveUntil; } void TEventMailBox::Freeze(const TInstant& deadline) { if (deadline > InactiveUntil) InactiveUntil = deadline; } TInstant TEventMailBox::GetInactiveUntil() const { return InactiveUntil; } void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) { Scheduled.insert(item); } bool TEventMailBox::IsScheduledEmpty() const { return Scheduled.empty(); } TInstant TEventMailBox::GetFirstScheduleDeadline() const { return Scheduled.begin()->Deadline; } ui64 TEventMailBox::GetSentEventCount() const { return Sent.size(); } class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider { public: TTimeProvider(TTestActorRuntimeBase& runtime) : Runtime(runtime) { } TInstant Now() override { return Runtime.GetCurrentTime(); } private: TTestActorRuntimeBase& Runtime; }; class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread { public: TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node) : Runtime(runtime) , Node(node) { Y_UNUSED(Runtime); } void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override { Y_UNUSED(actorSystem); Node->ActorSystemTimestamp = currentTimestamp; Node->ActorSystemMonotonic = currentMonotonic; } void PrepareSchedules(NSchedulerQueue::TReader **readers, ui32 scheduleReadersCount) override { Y_UNUSED(readers); Y_UNUSED(scheduleReadersCount); } void Start() override { } void PrepareStop() override { } void Stop() override { } private: TTestActorRuntimeBase* Runtime; TTestActorRuntimeBase::TNodeDataBase* Node; }; class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool { public: TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) : IExecutorPool(poolId) , Runtime(runtime) , NodeIndex(nodeIndex) , Node(node) { } TTestActorRuntimeBase* GetRuntime() { return Runtime; } // for threads ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override { Y_UNUSED(wctx); Y_UNUSED(revolvingCounter); Y_FAIL(); } void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) override { Y_UNUSED(workerId); Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter); } void Schedule(TInstant deadline, TAutoPtr ev, ISchedulerCookie *cookie, TWorkerId workerId) override { DoSchedule(deadline, ev, cookie, workerId); } void Schedule(TMonotonic deadline, TAutoPtr ev, ISchedulerCookie *cookie, TWorkerId workerId) override { DoSchedule(TInstant::FromValue(deadline.GetValue()), ev, cookie, workerId); } void Schedule(TDuration delay, TAutoPtr ev, ISchedulerCookie *cookie, TWorkerId workerId) override { TInstant deadline = Runtime->GetTimeProvider()->Now() + delay; DoSchedule(deadline, ev, cookie, workerId); } void DoSchedule(TInstant deadline, TAutoPtr ev, ISchedulerCookie *cookie, TWorkerId workerId) { Y_UNUSED(workerId); TGuard guard(Runtime->Mutex); bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { verbose = false; } if (verbose) { Cerr << "Got scheduled event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", "; PrintEvent(ev, Runtime); } auto now = Runtime->GetTimeProvider()->Now(); if (deadline < now) { deadline = now; // avoid going backwards in time } TDuration delay = (deadline - now); if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) { ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie)); Runtime->MailboxesHasEvents.Signal(); if (verbose) Cerr << "Event was added to scheduled queue\n"; } else { if (cookie) { cookie->Detach(); } if (verbose) { Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n"; } } } // for actorsystem bool Send(TAutoPtr& ev) override { TGuard guard(Runtime->Mutex); bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE; if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) { verbose = false; } if (verbose) { Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", "; PrintEvent(ev, Runtime); } if (!Runtime->EventFilterFunc(*Runtime, ev)) { ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); TNodeDataBase* node = Runtime->Nodes[nodeId].Get(); if (!AllowSendFrom(node, ev)) { return true; } ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) { const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger"); TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId); if (ev->GetRecipientRewrite() == logger) { TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId()); if (recipientActor) { TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite()); TActivationContext *prevTlsActivationContext = TlsActivationContext; TlsActivationContext = &ctx; recipientActor->Receive(ev, ctx); TlsActivationContext = prevTlsActivationContext; // we expect the logger to never die in tests } } } else { Runtime->GetMailbox(nodeId, mailboxHint).Send(ev); Runtime->MailboxesHasEvents.Signal(); } if (verbose) Cerr << "Event was added to sent queue\n"; } else { if (verbose) Cerr << "Event was dropped\n"; } return true; } void ScheduleActivation(ui32 activation) override { Y_UNUSED(activation); } void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) override { Y_UNUSED(activation); Y_UNUSED(revolvingCounter); } TActorId Register(IActor *actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) override { return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId); } TActorId Register(IActor *actor, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentId) override { return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId); } // lifecycle stuff void Prepare(TActorSystem *actorSystem, NSchedulerQueue::TReader **scheduleReaders, ui32 *scheduleSz) override { Y_UNUSED(actorSystem); Y_UNUSED(scheduleReaders); Y_UNUSED(scheduleSz); } void Start() override { } void PrepareStop() override { } void Shutdown() override { } bool Cleanup() override { return true; } // generic TAffinity* Affinity() const override { Y_FAIL(); } private: TTestActorRuntimeBase* const Runtime; const ui32 NodeIndex; TTestActorRuntimeBase::TNodeDataBase* const Node; }; IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) { return new TExecutorPoolStub{runtime, nodeIndex, node, poolId}; } ui32 TTestActorRuntimeBase::NextNodeId = 1; TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv) : TTestActorRuntimeBase(1, 1, false) { SingleSysEnv = true; } TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads) : ScheduledCount(0) , ScheduledLimit(100000) , MainThreadId(TThread::CurrentThreadId()) , ClusterUUID(MakeClusterId()) , FirstNodeId(NextNodeId) , NodeCount(nodeCount) , DataCenterCount(dataCenterCount) , UseRealThreads(useRealThreads) , LocalId(0) , DispatchCyclesCount(0) , DispatchedEventsCount(0) , NeedMonitoring(false) , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed)) , TimeProvider(new TTimeProvider(*this)) , ShouldContinue() , CurrentTimestamp(0) , DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT) , ReschedulingDelay(TDuration::MicroSeconds(0)) , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc) , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector) , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc) , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc) , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver) , CurrentDispatchContext(nullptr) { SetDispatcherRandomSeed(TInstant::Now(), 0); EnableActorCallstack(); } void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) { const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger"); node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */, NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0); node->LogSettings->SetAllowDrop(false); node->LogSettings->SetThrottleDelay(TDuration::Zero()); node->DynamicCounters = new NMonitoring::TDynamicCounters; InitNodeImpl(node, nodeIndex); } void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) { node->LogSettings->Append( NActorsServices::EServiceCommon_MIN, NActorsServices::EServiceCommon_MAX, NActorsServices::EServiceCommon_Name ); if (!UseRealThreads) { node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0)); node->MailboxTable.Reset(new TMailboxTable()); node->ActorSystem = MakeActorSystem(nodeIndex, node); node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor")); } else { node->ActorSystem = MakeActorSystem(nodeIndex, node); } node->ActorSystem->Start(); } bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr& ev) { ui64 senderLocalId = ev->Sender.LocalId(); ui64 senderMailboxHint = ev->Sender.Hint(); TMailboxHeader* senderMailbox = node->MailboxTable->Get(senderMailboxHint); if (senderMailbox) { IActor* senderActor = senderMailbox->FindActor(senderLocalId); TTestDecorator *decorator = dynamic_cast(senderActor); return !decorator || decorator->BeforeSending(ev); } return true; } TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount) : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) { } TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads) : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads) { } TTestActorRuntimeBase::~TTestActorRuntimeBase() { CleanupNodes(); Cerr.Flush(); Cerr.Flush(); Clog.Flush(); DisableActorCallstack(); } void TTestActorRuntimeBase::CleanupNodes() { Nodes.clear(); } bool TTestActorRuntimeBase::IsRealThreads() const { return UseRealThreads; } TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr& event) { Y_UNUSED(runtime); Y_UNUSED(event); return EEventAction::PROCESS; } void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { Y_UNUSED(runtime); Y_UNUSED(queue); scheduledEvents.clear(); } bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr& event) { Y_UNUSED(runtime); Y_UNUSED(event); return false; } bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr& event, TDuration delay, TInstant& deadline) { Y_UNUSED(runtime); Y_UNUSED(delay); Y_UNUSED(event); Y_UNUSED(deadline); return true; } void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) { runtime.ScheduleWhiteList.insert(actorId); runtime.ScheduleWhiteListParent[actorId] = parentId; } } class TScheduledTreeItem { public: TString Name; ui64 Count; TVector Children; TScheduledTreeItem(const TString& name) : Name(name) , Count(0) {} TScheduledTreeItem* GetItem(const TString& name) { TScheduledTreeItem* item = nullptr; for (TScheduledTreeItem& i : Children) { if (i.Name == name) { item = &i; break; } } if (item != nullptr) return item; Children.emplace_back(name); return &Children.back(); } void RecursiveSort() { Sort(Children, [](const TScheduledTreeItem& a, const TScheduledTreeItem& b) -> bool { return a.Count > b.Count; }); for (TScheduledTreeItem& item : Children) { item.RecursiveSort(); } } void Print(IOutputStream& stream, const TString& prefix) { for (auto it = Children.begin(); it != Children.end(); ++it) { bool lastChild = (std::next(it) == Children.end()); TString connectionPrefix = lastChild ? "└─ " : "├─ "; TString subChildPrefix = lastChild ? " " : "│ "; stream << prefix << connectionPrefix << it->Name << " (" << it->Count << ")\n"; it->Print(stream, prefix + subChildPrefix); } } void Print(IOutputStream& stream) { stream << Name << " (" << Count << ")\n"; Print(stream, TString()); } }; void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) { if (scheduledEvents.empty()) return; TInstant time = scheduledEvents.begin()->Deadline; while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) { static THashMap, ui64> eventTypes; auto& item = *scheduledEvents.begin(); TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type); eventTypes[std::make_pair(item.Event->Recipient, name)]++; runtime.ScheduledCount++; if (runtime.ScheduledCount > runtime.ScheduledLimit) { // TScheduledTreeItem root("Root"); // TVector path; // for (const auto& pr : eventTypes) { // path.clear(); // path.push_back(runtime.GetActorName(pr.first.first)); // for (auto it = runtime.ScheduleWhiteListParent.find(pr.first.first); it != runtime.ScheduleWhiteListParent.end(); it = runtime.ScheduleWhiteListParent.find(it->second)) { // path.insert(path.begin(), runtime.GetActorName(it->second)); // } // path.push_back("<" + pr.first.second + ">"); // event name; // ui64 count = pr.second; // TScheduledTreeItem* item = &root; // item->Count += count; // for (TString name : path) { // item = item->GetItem(name); // item->Count += count; // } // } // root.RecursiveSort(); // root.Print(Cerr); ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit); } if (item.Cookie->Get()) { if (item.Cookie->Detach()) { queue.push_back(item.Event); } } else { queue.push_back(item.Event); } scheduledEvents.erase(scheduledEvents.begin()); } runtime.UpdateCurrentTime(time); } TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) { TGuard guard(Mutex); auto result = ObserverFunc; ObserverFunc = observerFunc; return result; } TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) { TGuard guard(Mutex); auto result = ScheduledEventsSelectorFunc; ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc; return result; } TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) { TGuard guard(Mutex); auto result = EventFilterFunc; EventFilterFunc = filterFunc; return result; } TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) { TGuard guard(Mutex); auto result = ScheduledEventFilterFunc; ScheduledEventFilterFunc = filterFunc; return result; } TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) { TGuard guard(Mutex); auto result = RegistrationObserver; RegistrationObserver = observerFunc; return result; } bool TTestActorRuntimeBase::IsVerbose() { return VERBOSE; } void TTestActorRuntimeBase::SetVerbose(bool verbose) { VERBOSE = verbose; } void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) { Y_VERIFY(!IsInitialized); Y_VERIFY(nodeIndex < NodeCount); auto node = Nodes[nodeIndex + FirstNodeId]; if (!node) { node = GetNodeFactory().CreateNode(); Nodes[nodeIndex + FirstNodeId] = node; } node->LocalServicesActors[actorId] = cmd.Actor; node->LocalServices.push_back(std::make_pair(actorId, cmd)); } void TTestActorRuntimeBase::InitNodes() { NextNodeId += NodeCount; Y_VERIFY(NodeCount > 0); for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first; TNodeDataBase* node = nodeIt->second.Get(); InitNode(node, nodeIndex); } } void TTestActorRuntimeBase::Initialize() { InitNodes(); IsInitialized = true; } void SetupCrossDC() { } TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) { TGuard guard(Mutex); TDuration oldTimeout = DispatchTimeout; DispatchTimeout = timeout; return oldTimeout; } TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) { TGuard guard(Mutex); TDuration oldDelay = ReschedulingDelay; ReschedulingDelay = delay; return oldDelay; } void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr logBackend) { Y_VERIFY(!IsInitialized); TGuard guard(Mutex); LogBackend = logBackend; } void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) { TGuard guard(Mutex); for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) { TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); TString explanation; auto status = node->LogSettings->SetLevel(priority, component, explanation); if (status) { Y_FAIL("SetLogPriority failed: %s", explanation.c_str()); } } } TInstant TTestActorRuntimeBase::GetCurrentTime() const { TGuard guard(Mutex); Y_VERIFY(!UseRealThreads); return TInstant::MicroSeconds(CurrentTimestamp); } void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) { static int counter = 0; ++counter; if (VERBOSE) { Cerr << "UpdateCurrentTime(" << counter << "," << newTime << ")\n"; } TGuard guard(Mutex); Y_VERIFY(!UseRealThreads); if (newTime.MicroSeconds() > CurrentTimestamp) { CurrentTimestamp = newTime.MicroSeconds(); for (auto& kv : Nodes) { AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp); AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp); } } } void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) { UpdateCurrentTime(GetCurrentTime() + duration); } TIntrusivePtr TTestActorRuntimeBase::GetTimeProvider() { Y_VERIFY(!UseRealThreads); return TimeProvider; } ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const { Y_VERIFY(index < NodeCount); return FirstNodeId + index; } ui32 TTestActorRuntimeBase::GetNodeCount() const { return NodeCount; } ui64 TTestActorRuntimeBase::AllocateLocalId() { TGuard guard(Mutex); ui64 nextId = ++LocalId; if (VERBOSE) { Cerr << "Allocated id: " << nextId << "\n"; } return nextId; } ui32 TTestActorRuntimeBase::InterconnectPoolId() const { if (UseRealThreads && NSan::TSanIsOn()) { // Interconnect coroutines may move across threads // Use a special single-threaded pool to avoid that return 4; } return 0; } TString TTestActorRuntimeBase::GetTempDir() { if (!TmpDir) TmpDir.Reset(new TTempDir()); return (*TmpDir)(); } TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); TGuard guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (UseRealThreads) { Y_VERIFY(poolId < node->ExecutorPools.size()); return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId); } // first step - find good enough mailbox ui32 hint = 0; TMailboxHeader *mailbox = nullptr; { ui32 hintBackoff = 0; while (hint == 0) { hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter); mailbox = node->MailboxTable->Get(hint); if (!mailbox->LockFromFree()) { node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter); hintBackoff = hint; hint = 0; } } node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter); } const ui64 localActorId = AllocateLocalId(); if (VERBOSE) { Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << ", mailbox: " << hint << "\n"; } // ok, got mailbox mailbox->AttachActor(localActorId, actor); // do init const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); ActorNames[actorId] = TypeName(*actor); RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); switch (mailboxType) { case TMailboxType::Simple: UnlockFromExecution((TMailboxTable::TSimpleMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); break; case TMailboxType::Revolving: UnlockFromExecution((TMailboxTable::TRevolvingMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); break; case TMailboxType::HTSwap: UnlockFromExecution((TMailboxTable::THTSwapMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); break; case TMailboxType::ReadAsFilled: UnlockFromExecution((TMailboxTable::TReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); break; case TMailboxType::TinyReadAsFilled: UnlockFromExecution((TMailboxTable::TTinyReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter); break; default: Y_FAIL("Unsupported mailbox type"); } return actorId; } TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentId) { Y_VERIFY(nodeIndex < NodeCount); TGuard guard(Mutex); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (UseRealThreads) { Y_VERIFY(poolId < node->ExecutorPools.size()); return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId); } const ui64 localActorId = AllocateLocalId(); if (VERBOSE) { Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << "\n"; } mailbox->AttachActor(localActorId, actor); const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint); ActorNames[actorId] = TypeName(*actor); RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId); DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient); return actorId; } TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) { TGuard guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); if (!UseRealThreads) { IActor* actor = FindActor(actorId, node); node->LocalServicesActors[serviceId] = actor; node->ActorToActorId[actor] = actorId; } return node->ActorSystem->RegisterLocalService(serviceId, actorId); } TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) { TGuard guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex); EdgeActors.insert(edgeActor); EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor; return edgeActor; } TEventsList TTestActorRuntimeBase::CaptureEvents() { TGuard guard(Mutex); TEventsList result; for (auto& mbox : Mailboxes) { mbox.second->Capture(result); } return result; } TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) { TGuard guard(Mutex); Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount); TEventsList result; GetMailbox(nodeId, hint).Capture(result); return result; } void TTestActorRuntimeBase::PushFront(TAutoPtr& ev) { TGuard guard(Mutex); ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev); } void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) { TGuard guard(Mutex); for (auto rit = events.rbegin(); rit != events.rend(); ++rit) { if (*rit) { auto& ev = *rit; ui32 nodeId = ev->GetRecipientRewrite().NodeId(); Y_VERIFY(nodeId != 0); GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev); } } events.clear(); } void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) { TGuard guard(Mutex); Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount); TEventsList result; GetMailbox(nodeId, hint).PushFront(events); events.clear(); } TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() { TGuard guard(Mutex); TScheduledEventsList result; for (auto& mbox : Mailboxes) { mbox.second->CaptureScheduled(result); } return result; } bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) { return DispatchEvents(options, TInstant::Max()); } bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) { return DispatchEvents(options, TInstant::MicroSeconds(CurrentTimestamp) + simTimeout); } bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) { TGuard guard(Mutex); return DispatchEventsInternal(options, simDeadline); } // Mutex must be locked by caller! bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) { TDispatchContext localContext; localContext.Options = &options; localContext.PrevContext = nullptr; bool verbose = !options.Quiet && VERBOSE; struct TDispatchContextSetter { TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext) : Runtime(runtime) { lastContext.PrevContext = Runtime.CurrentDispatchContext; Runtime.CurrentDispatchContext = &lastContext; } ~TDispatchContextSetter() { Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext; } TTestActorRuntimeBase& Runtime; } DispatchContextSetter(*this, localContext); TInstant dispatchTime = TInstant::MicroSeconds(0); TInstant deadline = dispatchTime + DispatchTimeout; const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10); TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval; if (verbose) { Cerr << "Start dispatch at " << TInstant::MicroSeconds(CurrentTimestamp) << ", deadline is " << deadline << "\n"; } struct TTempEdgeEventsCaptor { TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime) : Runtime(runtime) , HasEvents(false) { for (auto edgeActor : Runtime.EdgeActors) { TEventsList events; Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events); auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint()); auto storeIt = Store.find(mboxId); Y_VERIFY(storeIt == Store.end()); storeIt = Store.insert(std::make_pair(mboxId, new TEventMailBox)).first; storeIt->second->PushFront(events); if (!events.empty()) HasEvents = true; } } ~TTempEdgeEventsCaptor() { for (auto edgeActor : Runtime.EdgeActors) { auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint()); auto storeIt = Store.find(mboxId); if (storeIt == Store.end()) { continue; } TEventsList events; storeIt->second->Capture(events); Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events); } } TTestActorRuntimeBase& Runtime; TEventMailBoxList Store; bool HasEvents; }; TEventMailBoxList restrictedMailboxes; const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty(); for (auto mailboxId : options.OnlyMailboxes) { auto it = Mailboxes.find(mailboxId); if (it == Mailboxes.end()) { it = Mailboxes.insert(std::make_pair(mailboxId, new TEventMailBox())).first; } restrictedMailboxes.insert(std::make_pair(mailboxId, it->second)); } TAutoPtr tempEdgeEventsCaptor; if (!restrictedMailboxes) { tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this)); } TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes; while (!currentMailboxes.empty()) { bool hasProgress = true; while (hasProgress) { ++DispatchCyclesCount; hasProgress = false; ui64 eventsToDispatch = 0; for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) { if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { eventsToDispatch += mboxIt->second->GetSentEventCount(); } } ui32 eventsDispatched = 0; //TODO: count events before each cycle, break after dispatching that much events bool isEmpty = false; while (!isEmpty && eventsDispatched < eventsToDispatch) { ui64 mailboxCount = currentMailboxes.size(); ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull; auto startWithMboxIt = currentMailboxes.begin(); for (ui64 i = 0; i < startWith; ++i) { ++startWithMboxIt; } auto endWithMboxIt = startWithMboxIt; isEmpty = true; auto mboxIt = startWithMboxIt; TDeque suspectedBoxes; while (true) { auto& mbox = *mboxIt; bool isIgnored = true; if (!mbox.second->IsEmpty()) { HandleNonEmptyMailboxesForEachContext(mbox.first); if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { bool isEdgeMailbox = false; if (EdgeActorByMailbox.FindPtr(TEventMailboxId(mbox.first.NodeId, mbox.first.Hint))) { isEdgeMailbox = true; TEventsList events; mbox.second->Capture(events); for (auto& ev : events) { TInverseGuard inverseGuard(Mutex); ObserverFunc(*this, ev); } mbox.second->PushFront(events); } if (!isEdgeMailbox) { isEmpty = false; isIgnored = false; ++eventsDispatched; ++DispatchedEventsCount; if (DispatchedEventsCount > DispatchedEventsLimit) { ythrow TWithBackTrace() << "Dispatched " << DispatchedEventsLimit << " events, limit reached."; } auto ev = mbox.second->Pop(); if (BlockedOutput.find(ev->Sender) == BlockedOutput.end()) { //UpdateCurrentTime(TInstant::MicroSeconds(CurrentTimestamp + 10)); if (verbose) { Cerr << "Process event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", "; PrintEvent(ev, this); } } hasProgress = true; EEventAction action; { TInverseGuard inverseGuard(Mutex); action = ObserverFunc(*this, ev); } switch (action) { case EEventAction::PROCESS: UpdateFinalEventsStatsForEachContext(*ev); SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false); break; case EEventAction::DROP: // do nothing break; case EEventAction::RESCHEDULE: { TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay; mbox.second->Freeze(deadline); mbox.second->PushFront(ev); break; } default: Y_FAIL("Unknown action"); } } } } Y_VERIFY(mboxIt != currentMailboxes.end()); if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes && mboxIt->second->IsEmpty() && mboxIt->second->IsScheduledEmpty() && mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { suspectedBoxes.push_back(mboxIt->first); } ++mboxIt; if (mboxIt == currentMailboxes.end()) { mboxIt = currentMailboxes.begin(); } Y_VERIFY(endWithMboxIt != currentMailboxes.end()); if (mboxIt == endWithMboxIt) { break; } } for (auto id : suspectedBoxes) { auto it = currentMailboxes.find(id); if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() && it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { currentMailboxes.erase(it); } } } } if (localContext.FinalEventFound) { return true; } if (!localContext.FoundNonEmptyMailboxes.empty()) return true; if (options.CustomFinalCondition && options.CustomFinalCondition()) return true; if (options.FinalEvents.empty()) { for (auto& mbox : currentMailboxes) { if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) continue; if (!mbox.second->IsEmpty()) { if (verbose) { Cerr << "Dispatch complete with non-empty queue at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n"; } return true; } } } if (TInstant::MicroSeconds(CurrentTimestamp) > simDeadline) { return false; } if (dispatchTime >= deadline) { if (verbose) { Cerr << "Reach deadline at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n"; } ythrow TWithBackTrace(); } if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) { inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval; bool isEmpty = true; TMaybe nearestMailboxDeadline; TVector> nextScheduleMboxes; TMaybe nextScheduleDeadline; for (auto& mbox : currentMailboxes) { if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) { nearestMailboxDeadline = mbox.second->GetInactiveUntil(); } continue; } if (mbox.second->IsScheduledEmpty()) continue; auto firstScheduleDeadline = mbox.second->GetFirstScheduleDeadline(); if (!nextScheduleDeadline || firstScheduleDeadline < *nextScheduleDeadline) { nextScheduleMboxes.clear(); nextScheduleMboxes.emplace_back(mbox.second); nextScheduleDeadline = firstScheduleDeadline; } else if (firstScheduleDeadline == *nextScheduleDeadline) { nextScheduleMboxes.emplace_back(mbox.second); } } for (const auto& nextScheduleMbox : nextScheduleMboxes) { TEventsList selectedEvents; TScheduledEventsList capturedScheduledEvents; nextScheduleMbox->CaptureScheduled(capturedScheduledEvents); ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents); nextScheduleMbox->PushScheduled(capturedScheduledEvents); for (auto& event : selectedEvents) { if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) { Cerr << "Selected scheduled event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", "; PrintEvent(event, this); } nextScheduleMbox->Send(event); isEmpty = false; } } if (!isEmpty) { if (verbose) { Cerr << "Process selected events at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n"; } deadline = dispatchTime + DispatchTimeout; continue; } if (nearestMailboxDeadline.Defined()) { if (verbose) { Cerr << "Forward time to " << *nearestMailboxDeadline.Get() << "\n"; } UpdateCurrentTime(*nearestMailboxDeadline.Get()); continue; } } TDuration waitDelay = TDuration::MilliSeconds(10); dispatchTime += waitDelay; MailboxesHasEvents.WaitT(Mutex, waitDelay); } return false; } void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) { TDispatchContext* context = CurrentDispatchContext; while (context) { const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes; if (Find(nonEmptyMailboxes.begin(), nonEmptyMailboxes.end(), mboxId) != nonEmptyMailboxes.end()) { context->FoundNonEmptyMailboxes.insert(mboxId); } context = context->PrevContext; } } void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) { TDispatchContext* context = CurrentDispatchContext; while (context) { for (const auto& finalEvent : context->Options->FinalEvents) { if (finalEvent.EventCheck(ev)) { auto& freq = context->FinalEventFrequency[&finalEvent]; if (++freq >= finalEvent.RequiredCount) { context->FinalEventFound = true; } } } context = context->PrevContext; } } void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) { TGuard guard(Mutex); Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32, senderNodeIndex, NodeCount); SendInternal(ev, senderNodeIndex, viaActorSystem); } void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) { TGuard guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); ui32 nodeId = FirstNodeId + nodeIndex; ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + duration; GetMailbox(nodeId, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, nullptr)); if (VERBOSE) Cerr << "Event was added to scheduled queue\n"; } void TTestActorRuntimeBase::ClearCounters() { TGuard guard(Mutex); EvCounters.clear(); } ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const { TGuard guard(Mutex); auto it = EvCounters.find(evType); if (it == EvCounters.end()) return 0; return it->second; } TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) { TGuard guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get(); return node->ActorSystem->LookupLocalService(serviceId); } void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet& edgeFilter, TDuration simTimeout) { TGuard guard(Mutex); ui32 dispatchCount = 0; if (!edgeFilter.empty()) { for (auto edgeActor : edgeFilter) { Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data()); } } const TSet& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter; TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout; for (;;) { for (auto edgeActor : edgeActors) { TEventsList events; auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint()); bool foundEvent = false; mbox.Capture(events); for (auto& ev : events) { if (filter(*this, ev)) { foundEvent = true; break; } } mbox.PushFront(events); if (foundEvent) return; } ++dispatchCount; { if (!DispatchEventsInternal(TDispatchOptions(), deadline)) { return; // Timed out; event was not found } } Y_VERIFY(dispatchCount < 1000, "Hard limit to prevent endless loop"); } } TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) { TGuard guard(Mutex); Y_VERIFY(nodeIndexFrom < NodeCount); Y_VERIFY(nodeIndexTo < NodeCount); Y_VERIFY(nodeIndexFrom != nodeIndexTo); TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get(); return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo); } void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) { TGuard guard(Mutex); BlockedOutput.insert(actorId); } void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) { ui64 days = (time.Hours() / 24); DispatcherRandomSeed = (days << 32) ^ iteration; DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed); } IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const { TGuard guard(Mutex); if (nodeIndex == Max()) { Y_VERIFY(actorId.NodeId()); nodeIndex = actorId.NodeId() - FirstNodeId; } Y_VERIFY(nodeIndex < NodeCount); auto nodeIt = Nodes.find(FirstNodeId + nodeIndex); Y_VERIFY(nodeIt != Nodes.end()); TNodeDataBase* node = nodeIt->second.Get(); return FindActor(actorId, node); } void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) { TGuard guard(Mutex); if (allow) { if (VERBOSE) { Cerr << "Actor " << actorId << " added to schedule whitelist"; } ScheduleWhiteList.insert(actorId); } else { if (VERBOSE) { Cerr << "Actor " << actorId << " removed from schedule whitelist"; } ScheduleWhiteList.erase(actorId); } } bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const { TGuard guard(Mutex); return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end(); } TIntrusivePtr TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) { TGuard guard(Mutex); Y_VERIFY(nodeIndex < NodeCount); ui32 nodeId = FirstNodeId + nodeIndex; TNodeDataBase* node = Nodes[nodeId].Get(); return node->DynamicCounters; } void TTestActorRuntimeBase::SetupMonitoring() { NeedMonitoring = true; } void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) { Y_VERIFY(nodeIndex < NodeCount); ui32 nodeId = FirstNodeId + nodeIndex; TNodeDataBase* node = Nodes[nodeId].Get(); ui32 targetNode = ev->GetRecipientRewrite().NodeId(); ui32 targetNodeIndex; if (targetNode == 0) { targetNodeIndex = nodeIndex; } else { targetNodeIndex = targetNode - FirstNodeId; Y_VERIFY(targetNodeIndex < NodeCount); } if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) { node->ActorSystem->Send(ev); return; } Y_VERIFY(!ev->GetRecipientRewrite().IsService() && (targetNodeIndex == nodeIndex)); TAutoPtr evHolder(ev); if (!AllowSendFrom(node, evHolder)) { return; } ui32 mailboxHint = ev->GetRecipientRewrite().Hint(); TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint); if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) { mbox.PushFront(evHolder); return; } ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId(); if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) { Cerr << "Send event, "; PrintEvent(evHolder, this); } EvCounters[ev->GetTypeRewrite()]++; TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); IActor* recipientActor = mailbox->FindActor(recipientLocalId); if (recipientActor) { // Save actorId by value in order to prevent ctx from being invalidated during another Send call. TActorId actorId = ev->GetRecipientRewrite(); node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite(); TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId); TActivationContext *prevTlsActivationContext = TlsActivationContext; TlsActivationContext = &ctx; CurrentRecipient = actorId; { TInverseGuard inverseGuard(Mutex); #ifdef USE_ACTOR_CALLSTACK TCallstack::GetTlsCallstack() = ev->Callstack; TCallstack::GetTlsCallstack().SetLinesToSkip(); #endif recipientActor->Receive(evHolder, ctx); node->ExecutorThread->DropUnregistered(); } CurrentRecipient = TActorId(); TlsActivationContext = prevTlsActivationContext; } else { if (VERBOSE) { Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n"; } auto forwardedEv = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown); if (!!forwardedEv) { node->ActorSystem->Send(forwardedEv); } } } IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const { ui32 mailboxHint = actorId.Hint(); ui64 localId = actorId.LocalId(); TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint); IActor* actor = mailbox->FindActor(localId); return actor; } THolder TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) { THolder setup(new TActorSystemSetup); setup->NodeId = FirstNodeId + nodeIndex; if (UseRealThreads) { setup->ExecutorsCount = 5; setup->Executors.Reset(new TAutoPtr[5]); setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20)); setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20)); setup->Executors[2].Reset(new TIOExecutorPool(2, 1)); setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20)); setup->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20)); setup->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(512, 100))); } else { setup->ExecutorsCount = 1; setup->Scheduler.Reset(new TSchedulerThreadStub(this, node)); setup->Executors.Reset(new TAutoPtr[1]); setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0)); } InitActorSystemSetup(*setup); return setup; } THolder TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) { auto setup = MakeActorSystemSetup(nodeIndex, node); node->ExecutorPools.resize(setup->ExecutorsCount); for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { node->ExecutorPools[i] = setup->Executors[i].Get(); } const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect"); setup->LocalServices = node->LocalServices; setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount); const TActorId nameserviceId = GetNameserviceActorId(); TIntrusivePtr common; common.Reset(new TInterconnectProxyCommon); common->NameserviceId = nameserviceId; common->MonCounters = interconnectCounters; common->TechnicalSelfHostName = "::1"; if (!UseRealThreads) { common->Settings.DeadPeer = TDuration::Max(); common->Settings.CloseOnIdle = TDuration::Max(); common->Settings.PingPeriod = TDuration::Max(); common->Settings.ForceConfirmPeriod = TDuration::Max(); common->Settings.Handshake = TDuration::Max(); } common->ClusterUUID = ClusterUUID; common->AcceptUUID = {ClusterUUID}; for (ui32 proxyNodeIndex = 0; proxyNodeIndex < NodeCount; ++proxyNodeIndex) { if (proxyNodeIndex == nodeIndex) continue; const ui32 peerNodeId = FirstNodeId + proxyNodeIndex; IActor *proxyActor = UseRealInterconnect ? new TInterconnectProxyTCP(peerNodeId, common) : InterconnectMock.CreateProxyMock(setup->NodeId, peerNodeId, common); setup->Interconnect.ProxyActors[peerNodeId] = {proxyActor, TMailboxType::ReadAsFilled, InterconnectPoolId()}; } setup->Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, InterconnectPoolId(), &InterconnectMock); if (UseRealInterconnect) { setup->LocalServices.emplace_back(MakePollerActorId(), NActors::TActorSetupCmd(CreatePollerActor(), NActors::TMailboxType::Simple, InterconnectPoolId())); } if (!SingleSysEnv) { // Single system env should do this self TAutoPtr logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend(); NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings, logBackend, GetCountersForComponent(node->DynamicCounters, "utils")); NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId()); std::pair loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd); setup->LocalServices.push_back(loggerActorPair); } return THolder(new TActorSystem(setup, node->GetAppData(), node->LogSettings)); } TActorSystem* TTestActorRuntimeBase::SingleSys() const { Y_VERIFY(Nodes.size() == 1, "Works only for single system env"); return Nodes.begin()->second->ActorSystem.Get(); } TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() { for (auto& x : Nodes) { return x.second->ActorSystem.Get(); } Y_FAIL("Don't use this method."); } TActorSystem* TTestActorRuntimeBase::GetActorSystem(ui32 nodeId) { auto it = Nodes.find(GetNodeId(nodeId)); Y_VERIFY(it != Nodes.end()); return it->second->ActorSystem.Get(); } TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) { TGuard guard(Mutex); auto mboxId = TEventMailboxId(nodeId, hint); auto it = Mailboxes.find(mboxId); if (it == Mailboxes.end()) { it = Mailboxes.insert(std::make_pair(mboxId, new TEventMailBox())).first; } return *it->second; } void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) { TGuard guard(Mutex); auto mboxId = TEventMailboxId(nodeId, hint); Mailboxes.erase(mboxId); } TString TTestActorRuntimeBase::GetActorName(const TActorId& actorId) const { auto it = ActorNames.find(actorId); if (it != ActorNames.end()) return it->second; return actorId.ToString(); } struct TStrandingActorDecoratorContext : public TThrRefBase { TStrandingActorDecoratorContext() : Queue(new TQueueType) { } typedef TOneOneQueueInplace TQueueType; TAutoPtr Queue; }; class TStrandingActorDecorator : public TActorBootstrapped { public: class TReplyActor : public TActor { public: static constexpr EActivityType ActorActivityType() { return TEST_ACTOR_RUNTIME; } TReplyActor(TStrandingActorDecorator* owner) : TActor(&TReplyActor::StateFunc) , Owner(owner) { } STFUNC(StateFunc); private: TStrandingActorDecorator* const Owner; }; static constexpr EActivityType ActorActivityType() { return TEST_ACTOR_RUNTIME; } TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector& additionalActors, TSimpleSharedPtr context, TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker) : Delegatee(delegatee) , IsSync(isSync) , AdditionalActors(additionalActors) , Context(context) , HasReply(false) , Runtime(runtime) , ReplyChecker(createReplyChecker()) { if (IsSync) { Y_VERIFY(!runtime->IsRealThreads()); } } void Bootstrap(const TActorContext& ctx) { Become(&TStrandingActorDecorator::StateFunc); ReplyId = ctx.RegisterWithSameMailbox(new TReplyActor(this)); DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint())); for (const auto& actor : AdditionalActors) { DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint())); } DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint())); DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint())); DelegateeOptions.Quiet = true; } STFUNC(StateFunc) { bool wasEmpty = !Context->Queue->Head(); Context->Queue->Push(ev.Release()); if (wasEmpty) { SendHead(ctx); } } STFUNC(Reply) { Y_VERIFY(!HasReply); IEventHandle *requestEv = Context->Queue->Head(); TActorId originalSender = requestEv->Sender; HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get()); if (HasReply) { delete Context->Queue->Pop(); } ctx.ExecutorThread.Send(ev->Forward(originalSender)); if (!IsSync && Context->Queue->Head()) { SendHead(ctx); } } private: void SendHead(const TActorContext& ctx) { if (!IsSync) { ctx.ExecutorThread.Send(GetForwardedEvent().Release()); } else { while (Context->Queue->Head()) { HasReply = false; ctx.ExecutorThread.Send(GetForwardedEvent().Release()); int count = 100; while (!HasReply && count > 0) { try { Runtime->DispatchEvents(DelegateeOptions); } catch (TEmptyEventQueueException&) { count--; Cerr << "No reply" << Endl; } } Runtime->UpdateCurrentTime(Runtime->GetCurrentTime() + TDuration::MicroSeconds(1000)); } } } TAutoPtr GetForwardedEvent() { IEventHandle* ev = Context->Queue->Head(); ReplyChecker->OnRequest(ev); TAutoPtr forwardedEv = ev->HasEvent() ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie) : new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie); return forwardedEv; } private: const TActorId Delegatee; const bool IsSync; const TVector AdditionalActors; TSimpleSharedPtr Context; TActorId ReplyId; bool HasReply; TDispatchOptions DelegateeOptions; TTestActorRuntimeBase* Runtime; THolder ReplyChecker; }; void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) { Owner->Reply(ev, ctx); } class TStrandingDecoratorFactory : public IStrandingDecoratorFactory { public: TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker) : Context(new TStrandingActorDecoratorContext()) , Runtime(runtime) , CreateReplyChecker(createReplyChecker) { } IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector& additionalActors) override { return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime, CreateReplyChecker); } private: TSimpleSharedPtr Context; TTestActorRuntimeBase* Runtime; TReplyCheckerCreator CreateReplyChecker; }; TAutoPtr CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime, TReplyCheckerCreator createReplyChecker) { return TAutoPtr(new TStrandingDecoratorFactory(runtime, createReplyChecker)); } ui64 DefaultRandomSeed = 9999; }