123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902 |
- #include "test_runtime.h"
- #include <library/cpp/actors/core/actor_bootstrapped.h>
- #include <library/cpp/actors/core/callstack.h>
- #include <library/cpp/actors/core/executor_pool_basic.h>
- #include <library/cpp/actors/core/executor_pool_io.h>
- #include <library/cpp/actors/core/log.h>
- #include <library/cpp/actors/core/scheduler_basic.h>
- #include <library/cpp/actors/util/datetime.h>
- #include <library/cpp/actors/protos/services_common.pb.h>
- #include <library/cpp/random_provider/random_provider.h>
- #include <library/cpp/actors/interconnect/interconnect.h>
- #include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
- #include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h>
- #include <util/generic/maybe.h>
- #include <util/generic/bt_exception.h>
- #include <util/random/mersenne.h>
- #include <util/string/printf.h>
- #include <typeinfo>
- bool VERBOSE = false;
- const bool PRINT_EVENT_BODY = false;
- namespace {
- TString MakeClusterId() {
- pid_t pid = getpid();
- TStringBuilder uuid;
- uuid << "Cluster for process with id: " << pid;
- return uuid;
- }
- }
- namespace NActors {
- ui64 TScheduledEventQueueItem::NextUniqueId = 0;
- void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) {
- Cerr << "mailbox: " << ev->GetRecipientRewrite().Hint() << ", type: " << Sprintf("%08x", ev->GetTypeRewrite())
- << ", from " << ev->Sender.LocalId();
- TString name = runtime->GetActorName(ev->Sender);
- if (!name.empty())
- Cerr << " \"" << name << "\"";
- Cerr << ", to " << ev->GetRecipientRewrite().LocalId();
- name = runtime->GetActorName(ev->GetRecipientRewrite());
- if (!name.empty())
- Cerr << " \"" << name << "\"";
- Cerr << ", ";
- if (ev->HasEvent())
- Cerr << " : " << (PRINT_EVENT_BODY ? ev->GetBase()->ToString() : ev->GetBase()->ToStringHeader());
- else if (ev->HasBuffer())
- Cerr << " : BUFFER";
- else
- Cerr << " : EMPTY";
- Cerr << "\n";
- }
- TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() {
- ActorSystemTimestamp = nullptr;
- ActorSystemMonotonic = nullptr;
- }
- void TTestActorRuntimeBase::TNodeDataBase::Stop() {
- if (Poller)
- Poller->Stop();
- if (MailboxTable) {
- for (ui32 round = 0; !MailboxTable->Cleanup(); ++round)
- Y_VERIFY(round < 10, "cyclic event/actor spawn while trying to shutdown actorsystem stub");
- }
- if (ActorSystem)
- ActorSystem->Stop();
- ActorSystem.Destroy();
- Poller.Reset();
- }
- TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() {
- Stop();
- }
- class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> {
- public:
- static constexpr EActivityType ActorActivityType() {
- return TEST_ACTOR_RUNTIME;
- }
- TEdgeActor(TTestActorRuntimeBase* runtime)
- : TActor(&TEdgeActor::StateFunc)
- , Runtime(runtime)
- {
- }
- STFUNC(StateFunc) {
- Y_UNUSED(ctx);
- TGuard<TMutex> guard(Runtime->Mutex);
- bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
- if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
- verbose = false;
- }
- if (verbose) {
- Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
- PrintEvent(ev, Runtime);
- }
- if (!Runtime->EventFilterFunc(*Runtime, ev)) {
- ui32 nodeId = ev->GetRecipientRewrite().NodeId();
- Y_VERIFY(nodeId != 0);
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
- Runtime->MailboxesHasEvents.Signal();
- if (verbose)
- Cerr << "Event was added to sent queue\n";
- }
- else {
- if (verbose)
- Cerr << "Event was dropped\n";
- }
- }
- private:
- TTestActorRuntimeBase* Runtime;
- };
- void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) {
- IEventHandle* ptr = ev.Get();
- Y_VERIFY(ptr);
- #ifdef DEBUG_ORDER_EVENTS
- ui64 counter = NextToSend++;
- TrackSent[ptr] = counter;
- #endif
- Sent.push_back(ev);
- }
- TAutoPtr<IEventHandle> TEventMailBox::Pop() {
- TAutoPtr<IEventHandle> result = Sent.front();
- Sent.pop_front();
- #ifdef DEBUG_ORDER_EVENTS
- auto it = TrackSent.find(result.Get());
- if (it != TrackSent.end()) {
- Y_VERIFY(ExpectedReceive == it->second);
- TrackSent.erase(result.Get());
- ++ExpectedReceive;
- }
- #endif
- return result;
- }
- bool TEventMailBox::IsEmpty() const {
- return Sent.empty();
- }
- void TEventMailBox::Capture(TEventsList& evList) {
- evList.insert(evList.end(), Sent.begin(), Sent.end());
- Sent.clear();
- }
- void TEventMailBox::PushFront(TAutoPtr<IEventHandle>& ev) {
- Sent.push_front(ev);
- }
- void TEventMailBox::PushFront(TEventsList& evList) {
- for (auto rit = evList.rbegin(); rit != evList.rend(); ++rit) {
- if (*rit) {
- Sent.push_front(*rit);
- }
- }
- }
- void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) {
- for (auto it = Scheduled.begin(); it != Scheduled.end(); ++it) {
- evList.insert(*it);
- }
- Scheduled.clear();
- }
- void TEventMailBox::PushScheduled(TScheduledEventsList& evList) {
- for (auto it = evList.begin(); it != evList.end(); ++it) {
- if (it->Event) {
- Scheduled.insert(*it);
- }
- }
- evList.clear();
- }
- bool TEventMailBox::IsActive(const TInstant& currentTime) const {
- return currentTime >= InactiveUntil;
- }
- void TEventMailBox::Freeze(const TInstant& deadline) {
- if (deadline > InactiveUntil)
- InactiveUntil = deadline;
- }
- TInstant TEventMailBox::GetInactiveUntil() const {
- return InactiveUntil;
- }
- void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) {
- Scheduled.insert(item);
- }
- bool TEventMailBox::IsScheduledEmpty() const {
- return Scheduled.empty();
- }
- TInstant TEventMailBox::GetFirstScheduleDeadline() const {
- return Scheduled.begin()->Deadline;
- }
- ui64 TEventMailBox::GetSentEventCount() const {
- return Sent.size();
- }
- class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider {
- public:
- TTimeProvider(TTestActorRuntimeBase& runtime)
- : Runtime(runtime)
- {
- }
- TInstant Now() override {
- return Runtime.GetCurrentTime();
- }
- private:
- TTestActorRuntimeBase& Runtime;
- };
- class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread {
- public:
- TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node)
- : Runtime(runtime)
- , Node(node)
- {
- Y_UNUSED(Runtime);
- }
- void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override {
- Y_UNUSED(actorSystem);
- Node->ActorSystemTimestamp = currentTimestamp;
- Node->ActorSystemMonotonic = currentMonotonic;
- }
- void PrepareSchedules(NSchedulerQueue::TReader **readers, ui32 scheduleReadersCount) override {
- Y_UNUSED(readers);
- Y_UNUSED(scheduleReadersCount);
- }
- void Start() override {
- }
- void PrepareStop() override {
- }
- void Stop() override {
- }
- private:
- TTestActorRuntimeBase* Runtime;
- TTestActorRuntimeBase::TNodeDataBase* Node;
- };
- class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool {
- public:
- TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId)
- : IExecutorPool(poolId)
- , Runtime(runtime)
- , NodeIndex(nodeIndex)
- , Node(node)
- {
- }
- TTestActorRuntimeBase* GetRuntime() {
- return Runtime;
- }
- // for threads
- ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override {
- Y_UNUSED(wctx);
- Y_UNUSED(revolvingCounter);
- Y_FAIL();
- }
- void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) override {
- Y_UNUSED(workerId);
- Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter);
- }
- void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
- DoSchedule(deadline, ev, cookie, workerId);
- }
- void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
- DoSchedule(TInstant::FromValue(deadline.GetValue()), ev, cookie, workerId);
- }
- void Schedule(TDuration delay, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
- TInstant deadline = Runtime->GetTimeProvider()->Now() + delay;
- DoSchedule(deadline, ev, cookie, workerId);
- }
- void DoSchedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) {
- Y_UNUSED(workerId);
- TGuard<TMutex> guard(Runtime->Mutex);
- bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
- if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
- verbose = false;
- }
- if (verbose) {
- Cerr << "Got scheduled event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
- PrintEvent(ev, Runtime);
- }
- auto now = Runtime->GetTimeProvider()->Now();
- if (deadline < now) {
- deadline = now; // avoid going backwards in time
- }
- TDuration delay = (deadline - now);
- if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) {
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie));
- Runtime->MailboxesHasEvents.Signal();
- if (verbose)
- Cerr << "Event was added to scheduled queue\n";
- } else {
- if (cookie) {
- cookie->Detach();
- }
- if (verbose) {
- Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n";
- }
- }
- }
- // for actorsystem
- bool Send(TAutoPtr<IEventHandle>& ev) override {
- TGuard<TMutex> guard(Runtime->Mutex);
- bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
- if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
- verbose = false;
- }
- if (verbose) {
- Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
- PrintEvent(ev, Runtime);
- }
- if (!Runtime->EventFilterFunc(*Runtime, ev)) {
- ui32 nodeId = ev->GetRecipientRewrite().NodeId();
- Y_VERIFY(nodeId != 0);
- TNodeDataBase* node = Runtime->Nodes[nodeId].Get();
- if (!AllowSendFrom(node, ev)) {
- return true;
- }
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) {
- const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger");
- TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId);
- if (ev->GetRecipientRewrite() == logger) {
- TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
- IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId());
- if (recipientActor) {
- TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite());
- TActivationContext *prevTlsActivationContext = TlsActivationContext;
- TlsActivationContext = &ctx;
- recipientActor->Receive(ev, ctx);
- TlsActivationContext = prevTlsActivationContext;
- // we expect the logger to never die in tests
- }
- }
- } else {
- Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
- Runtime->MailboxesHasEvents.Signal();
- }
- if (verbose)
- Cerr << "Event was added to sent queue\n";
- } else {
- if (verbose)
- Cerr << "Event was dropped\n";
- }
- return true;
- }
- void ScheduleActivation(ui32 activation) override {
- Y_UNUSED(activation);
- }
- void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) override {
- Y_UNUSED(activation);
- Y_UNUSED(revolvingCounter);
- }
- TActorId Register(IActor *actor, TMailboxType::EType mailboxType, ui64 revolvingCounter,
- const TActorId& parentId) override {
- return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId);
- }
- TActorId Register(IActor *actor, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentId) override {
- return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId);
- }
- // lifecycle stuff
- void Prepare(TActorSystem *actorSystem, NSchedulerQueue::TReader **scheduleReaders, ui32 *scheduleSz) override {
- Y_UNUSED(actorSystem);
- Y_UNUSED(scheduleReaders);
- Y_UNUSED(scheduleSz);
- }
- void Start() override {
- }
- void PrepareStop() override {
- }
- void Shutdown() override {
- }
- bool Cleanup() override {
- return true;
- }
- // generic
- TAffinity* Affinity() const override {
- Y_FAIL();
- }
- private:
- TTestActorRuntimeBase* const Runtime;
- const ui32 NodeIndex;
- TTestActorRuntimeBase::TNodeDataBase* const Node;
- };
- IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) {
- return new TExecutorPoolStub{runtime, nodeIndex, node, poolId};
- }
- ui32 TTestActorRuntimeBase::NextNodeId = 1;
- TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv)
- : TTestActorRuntimeBase(1, 1, false)
- {
- SingleSysEnv = true;
- }
- TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads)
- : ScheduledCount(0)
- , ScheduledLimit(100000)
- , MainThreadId(TThread::CurrentThreadId())
- , ClusterUUID(MakeClusterId())
- , FirstNodeId(NextNodeId)
- , NodeCount(nodeCount)
- , DataCenterCount(dataCenterCount)
- , UseRealThreads(useRealThreads)
- , LocalId(0)
- , DispatchCyclesCount(0)
- , DispatchedEventsCount(0)
- , NeedMonitoring(false)
- , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed))
- , TimeProvider(new TTimeProvider(*this))
- , ShouldContinue()
- , CurrentTimestamp(0)
- , DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT)
- , ReschedulingDelay(TDuration::MicroSeconds(0))
- , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc)
- , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector)
- , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc)
- , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc)
- , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver)
- , CurrentDispatchContext(nullptr)
- {
- SetDispatcherRandomSeed(TInstant::Now(), 0);
- EnableActorCallstack();
- }
- void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) {
- const NActors::TActorId loggerActorId = NActors::TActorId(FirstNodeId + nodeIndex, "logger");
- node->LogSettings = new NActors::NLog::TSettings(loggerActorId, 410 /* NKikimrServices::LOGGER */,
- NActors::NLog::PRI_WARN, NActors::NLog::PRI_WARN, 0);
- node->LogSettings->SetAllowDrop(false);
- node->LogSettings->SetThrottleDelay(TDuration::Zero());
- node->DynamicCounters = new NMonitoring::TDynamicCounters;
- InitNodeImpl(node, nodeIndex);
- }
- void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) {
- node->LogSettings->Append(
- NActorsServices::EServiceCommon_MIN,
- NActorsServices::EServiceCommon_MAX,
- NActorsServices::EServiceCommon_Name
- );
- if (!UseRealThreads) {
- node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0));
- node->MailboxTable.Reset(new TMailboxTable());
- node->ActorSystem = MakeActorSystem(nodeIndex, node);
- node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor"));
- } else {
- node->ActorSystem = MakeActorSystem(nodeIndex, node);
- }
- node->ActorSystem->Start();
- }
- bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev) {
- ui64 senderLocalId = ev->Sender.LocalId();
- ui64 senderMailboxHint = ev->Sender.Hint();
- TMailboxHeader* senderMailbox = node->MailboxTable->Get(senderMailboxHint);
- if (senderMailbox) {
- IActor* senderActor = senderMailbox->FindActor(senderLocalId);
- TTestDecorator *decorator = dynamic_cast<TTestDecorator*>(senderActor);
- return !decorator || decorator->BeforeSending(ev);
- }
- return true;
- }
- TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount)
- : TTestActorRuntimeBase(nodeCount, dataCenterCount, false) {
- }
- TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads)
- : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads) {
- }
- TTestActorRuntimeBase::~TTestActorRuntimeBase() {
- CleanupNodes();
- Cerr.Flush();
- Cerr.Flush();
- Clog.Flush();
- DisableActorCallstack();
- }
- void TTestActorRuntimeBase::CleanupNodes() {
- Nodes.clear();
- }
- bool TTestActorRuntimeBase::IsRealThreads() const {
- return UseRealThreads;
- }
- TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
- Y_UNUSED(runtime);
- Y_UNUSED(event);
- return EEventAction::PROCESS;
- }
- void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
- Y_UNUSED(runtime);
- Y_UNUSED(queue);
- scheduledEvents.clear();
- }
- bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
- Y_UNUSED(runtime);
- Y_UNUSED(event);
- return false;
- }
- bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) {
- Y_UNUSED(runtime);
- Y_UNUSED(delay);
- Y_UNUSED(event);
- Y_UNUSED(deadline);
- return true;
- }
- void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
- if (runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end()) {
- runtime.ScheduleWhiteList.insert(actorId);
- runtime.ScheduleWhiteListParent[actorId] = parentId;
- }
- }
- class TScheduledTreeItem {
- public:
- TString Name;
- ui64 Count;
- TVector<TScheduledTreeItem> Children;
- TScheduledTreeItem(const TString& name)
- : Name(name)
- , Count(0)
- {}
- TScheduledTreeItem* GetItem(const TString& name) {
- TScheduledTreeItem* item = nullptr;
- for (TScheduledTreeItem& i : Children) {
- if (i.Name == name) {
- item = &i;
- break;
- }
- }
- if (item != nullptr)
- return item;
- Children.emplace_back(name);
- return &Children.back();
- }
- void RecursiveSort() {
- Sort(Children, [](const TScheduledTreeItem& a, const TScheduledTreeItem& b) -> bool { return a.Count > b.Count; });
- for (TScheduledTreeItem& item : Children) {
- item.RecursiveSort();
- }
- }
- void Print(IOutputStream& stream, const TString& prefix) {
- for (auto it = Children.begin(); it != Children.end(); ++it) {
- bool lastChild = (std::next(it) == Children.end());
- TString connectionPrefix = lastChild ? "└─ " : "├─ ";
- TString subChildPrefix = lastChild ? " " : "│ ";
- stream << prefix << connectionPrefix << it->Name << " (" << it->Count << ")\n";
- it->Print(stream, prefix + subChildPrefix);
- }
- }
- void Print(IOutputStream& stream) {
- stream << Name << " (" << Count << ")\n";
- Print(stream, TString());
- }
- };
- void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
- if (scheduledEvents.empty())
- return;
- TInstant time = scheduledEvents.begin()->Deadline;
- while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) {
- static THashMap<std::pair<TActorId, TString>, ui64> eventTypes;
- auto& item = *scheduledEvents.begin();
- TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type);
- eventTypes[std::make_pair(item.Event->Recipient, name)]++;
- runtime.ScheduledCount++;
- if (runtime.ScheduledCount > runtime.ScheduledLimit) {
- // TScheduledTreeItem root("Root");
- // TVector<TString> path;
- // for (const auto& pr : eventTypes) {
- // path.clear();
- // path.push_back(runtime.GetActorName(pr.first.first));
- // for (auto it = runtime.ScheduleWhiteListParent.find(pr.first.first); it != runtime.ScheduleWhiteListParent.end(); it = runtime.ScheduleWhiteListParent.find(it->second)) {
- // path.insert(path.begin(), runtime.GetActorName(it->second));
- // }
- // path.push_back("<" + pr.first.second + ">"); // event name;
- // ui64 count = pr.second;
- // TScheduledTreeItem* item = &root;
- // item->Count += count;
- // for (TString name : path) {
- // item = item->GetItem(name);
- // item->Count += count;
- // }
- // }
- // root.RecursiveSort();
- // root.Print(Cerr);
- ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit);
- }
- if (item.Cookie->Get()) {
- if (item.Cookie->Detach()) {
- queue.push_back(item.Event);
- }
- } else {
- queue.push_back(item.Event);
- }
- scheduledEvents.erase(scheduledEvents.begin());
- }
- runtime.UpdateCurrentTime(time);
- }
- TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) {
- TGuard<TMutex> guard(Mutex);
- auto result = ObserverFunc;
- ObserverFunc = observerFunc;
- return result;
- }
- TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) {
- TGuard<TMutex> guard(Mutex);
- auto result = ScheduledEventsSelectorFunc;
- ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc;
- return result;
- }
- TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) {
- TGuard<TMutex> guard(Mutex);
- auto result = EventFilterFunc;
- EventFilterFunc = filterFunc;
- return result;
- }
- TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) {
- TGuard<TMutex> guard(Mutex);
- auto result = ScheduledEventFilterFunc;
- ScheduledEventFilterFunc = filterFunc;
- return result;
- }
- TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) {
- TGuard<TMutex> guard(Mutex);
- auto result = RegistrationObserver;
- RegistrationObserver = observerFunc;
- return result;
- }
- bool TTestActorRuntimeBase::IsVerbose() {
- return VERBOSE;
- }
- void TTestActorRuntimeBase::SetVerbose(bool verbose) {
- VERBOSE = verbose;
- }
- void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) {
- Y_VERIFY(!IsInitialized);
- Y_VERIFY(nodeIndex < NodeCount);
- auto node = Nodes[nodeIndex + FirstNodeId];
- if (!node) {
- node = GetNodeFactory().CreateNode();
- Nodes[nodeIndex + FirstNodeId] = node;
- }
- node->LocalServicesActors[actorId] = cmd.Actor;
- node->LocalServices.push_back(std::make_pair(actorId, cmd));
- }
- void TTestActorRuntimeBase::InitNodes() {
- NextNodeId += NodeCount;
- Y_VERIFY(NodeCount > 0);
- for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
- auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first;
- TNodeDataBase* node = nodeIt->second.Get();
- InitNode(node, nodeIndex);
- }
- }
- void TTestActorRuntimeBase::Initialize() {
- InitNodes();
- IsInitialized = true;
- }
- void SetupCrossDC() {
- }
- TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) {
- TGuard<TMutex> guard(Mutex);
- TDuration oldTimeout = DispatchTimeout;
- DispatchTimeout = timeout;
- return oldTimeout;
- }
- TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) {
- TGuard<TMutex> guard(Mutex);
- TDuration oldDelay = ReschedulingDelay;
- ReschedulingDelay = delay;
- return oldDelay;
- }
- void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) {
- Y_VERIFY(!IsInitialized);
- TGuard<TMutex> guard(Mutex);
- LogBackend = logBackend;
- }
- void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) {
- TGuard<TMutex> guard(Mutex);
- for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
- TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
- TString explanation;
- auto status = node->LogSettings->SetLevel(priority, component, explanation);
- if (status) {
- Y_FAIL("SetLogPriority failed: %s", explanation.c_str());
- }
- }
- }
- TInstant TTestActorRuntimeBase::GetCurrentTime() const {
- TGuard<TMutex> guard(Mutex);
- Y_VERIFY(!UseRealThreads);
- return TInstant::MicroSeconds(CurrentTimestamp);
- }
- void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) {
- static int counter = 0;
- ++counter;
- if (VERBOSE) {
- Cerr << "UpdateCurrentTime(" << counter << "," << newTime << ")\n";
- }
- TGuard<TMutex> guard(Mutex);
- Y_VERIFY(!UseRealThreads);
- if (newTime.MicroSeconds() > CurrentTimestamp) {
- CurrentTimestamp = newTime.MicroSeconds();
- for (auto& kv : Nodes) {
- AtomicStore(kv.second->ActorSystemTimestamp, CurrentTimestamp);
- AtomicStore(kv.second->ActorSystemMonotonic, CurrentTimestamp);
- }
- }
- }
- void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) {
- UpdateCurrentTime(GetCurrentTime() + duration);
- }
- TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() {
- Y_VERIFY(!UseRealThreads);
- return TimeProvider;
- }
- ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const {
- Y_VERIFY(index < NodeCount);
- return FirstNodeId + index;
- }
- ui32 TTestActorRuntimeBase::GetNodeCount() const {
- return NodeCount;
- }
- ui64 TTestActorRuntimeBase::AllocateLocalId() {
- TGuard<TMutex> guard(Mutex);
- ui64 nextId = ++LocalId;
- if (VERBOSE) {
- Cerr << "Allocated id: " << nextId << "\n";
- }
- return nextId;
- }
- ui32 TTestActorRuntimeBase::InterconnectPoolId() const {
- if (UseRealThreads && NSan::TSanIsOn()) {
- // Interconnect coroutines may move across threads
- // Use a special single-threaded pool to avoid that
- return 4;
- }
- return 0;
- }
- TString TTestActorRuntimeBase::GetTempDir() {
- if (!TmpDir)
- TmpDir.Reset(new TTempDir());
- return (*TmpDir)();
- }
- TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType,
- ui64 revolvingCounter, const TActorId& parentId) {
- Y_VERIFY(nodeIndex < NodeCount);
- TGuard<TMutex> guard(Mutex);
- TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
- if (UseRealThreads) {
- Y_VERIFY(poolId < node->ExecutorPools.size());
- return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId);
- }
- // first step - find good enough mailbox
- ui32 hint = 0;
- TMailboxHeader *mailbox = nullptr;
- {
- ui32 hintBackoff = 0;
- while (hint == 0) {
- hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter);
- mailbox = node->MailboxTable->Get(hint);
- if (!mailbox->LockFromFree()) {
- node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
- hintBackoff = hint;
- hint = 0;
- }
- }
- node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
- }
- const ui64 localActorId = AllocateLocalId();
- if (VERBOSE) {
- Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << ", mailbox: " << hint << "\n";
- }
- // ok, got mailbox
- mailbox->AttachActor(localActorId, actor);
- // do init
- const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
- ActorNames[actorId] = TypeName(*actor);
- RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
- DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
- switch (mailboxType) {
- case TMailboxType::Simple:
- UnlockFromExecution((TMailboxTable::TSimpleMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
- break;
- case TMailboxType::Revolving:
- UnlockFromExecution((TMailboxTable::TRevolvingMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
- break;
- case TMailboxType::HTSwap:
- UnlockFromExecution((TMailboxTable::THTSwapMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
- break;
- case TMailboxType::ReadAsFilled:
- UnlockFromExecution((TMailboxTable::TReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
- break;
- case TMailboxType::TinyReadAsFilled:
- UnlockFromExecution((TMailboxTable::TTinyReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
- break;
- default:
- Y_FAIL("Unsupported mailbox type");
- }
- return actorId;
- }
- TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
- const TActorId& parentId) {
- Y_VERIFY(nodeIndex < NodeCount);
- TGuard<TMutex> guard(Mutex);
- TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
- if (UseRealThreads) {
- Y_VERIFY(poolId < node->ExecutorPools.size());
- return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId);
- }
- const ui64 localActorId = AllocateLocalId();
- if (VERBOSE) {
- Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << "\n";
- }
- mailbox->AttachActor(localActorId, actor);
- const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
- ActorNames[actorId] = TypeName(*actor);
- RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
- DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);
- return actorId;
- }
- TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
- Y_VERIFY(nodeIndex < NodeCount);
- TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
- if (!UseRealThreads) {
- IActor* actor = FindActor(actorId, node);
- node->LocalServicesActors[serviceId] = actor;
- node->ActorToActorId[actor] = actorId;
- }
- return node->ActorSystem->RegisterLocalService(serviceId, actorId);
- }
- TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
- Y_VERIFY(nodeIndex < NodeCount);
- TActorId edgeActor = Register(new TEdgeActor(this), nodeIndex);
- EdgeActors.insert(edgeActor);
- EdgeActorByMailbox[TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint())] = edgeActor;
- return edgeActor;
- }
- TEventsList TTestActorRuntimeBase::CaptureEvents() {
- TGuard<TMutex> guard(Mutex);
- TEventsList result;
- for (auto& mbox : Mailboxes) {
- mbox.second->Capture(result);
- }
- return result;
- }
- TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) {
- TGuard<TMutex> guard(Mutex);
- Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
- TEventsList result;
- GetMailbox(nodeId, hint).Capture(result);
- return result;
- }
- void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) {
- TGuard<TMutex> guard(Mutex);
- ui32 nodeId = ev->GetRecipientRewrite().NodeId();
- Y_VERIFY(nodeId != 0);
- GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
- }
- void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) {
- TGuard<TMutex> guard(Mutex);
- for (auto rit = events.rbegin(); rit != events.rend(); ++rit) {
- if (*rit) {
- auto& ev = *rit;
- ui32 nodeId = ev->GetRecipientRewrite().NodeId();
- Y_VERIFY(nodeId != 0);
- GetMailbox(nodeId, ev->GetRecipientRewrite().Hint()).PushFront(ev);
- }
- }
- events.clear();
- }
- void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) {
- TGuard<TMutex> guard(Mutex);
- Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
- TEventsList result;
- GetMailbox(nodeId, hint).PushFront(events);
- events.clear();
- }
- TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() {
- TGuard<TMutex> guard(Mutex);
- TScheduledEventsList result;
- for (auto& mbox : Mailboxes) {
- mbox.second->CaptureScheduled(result);
- }
- return result;
- }
- bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) {
- return DispatchEvents(options, TInstant::Max());
- }
- bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) {
- return DispatchEvents(options, TInstant::MicroSeconds(CurrentTimestamp) + simTimeout);
- }
- bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) {
- TGuard<TMutex> guard(Mutex);
- return DispatchEventsInternal(options, simDeadline);
- }
- // Mutex must be locked by caller!
- bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) {
- TDispatchContext localContext;
- localContext.Options = &options;
- localContext.PrevContext = nullptr;
- bool verbose = !options.Quiet && VERBOSE;
- struct TDispatchContextSetter {
- TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext)
- : Runtime(runtime)
- {
- lastContext.PrevContext = Runtime.CurrentDispatchContext;
- Runtime.CurrentDispatchContext = &lastContext;
- }
- ~TDispatchContextSetter() {
- Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext;
- }
- TTestActorRuntimeBase& Runtime;
- } DispatchContextSetter(*this, localContext);
- TInstant dispatchTime = TInstant::MicroSeconds(0);
- TInstant deadline = dispatchTime + DispatchTimeout;
- const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10);
- TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
- if (verbose) {
- Cerr << "Start dispatch at " << TInstant::MicroSeconds(CurrentTimestamp) << ", deadline is " << deadline << "\n";
- }
- struct TTempEdgeEventsCaptor {
- TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime)
- : Runtime(runtime)
- , HasEvents(false)
- {
- for (auto edgeActor : Runtime.EdgeActors) {
- TEventsList events;
- Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events);
- auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
- auto storeIt = Store.find(mboxId);
- Y_VERIFY(storeIt == Store.end());
- storeIt = Store.insert(std::make_pair(mboxId, new TEventMailBox)).first;
- storeIt->second->PushFront(events);
- if (!events.empty())
- HasEvents = true;
- }
- }
- ~TTempEdgeEventsCaptor() {
- for (auto edgeActor : Runtime.EdgeActors) {
- auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
- auto storeIt = Store.find(mboxId);
- if (storeIt == Store.end()) {
- continue;
- }
- TEventsList events;
- storeIt->second->Capture(events);
- Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events);
- }
- }
- TTestActorRuntimeBase& Runtime;
- TEventMailBoxList Store;
- bool HasEvents;
- };
- TEventMailBoxList restrictedMailboxes;
- const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty();
- for (auto mailboxId : options.OnlyMailboxes) {
- auto it = Mailboxes.find(mailboxId);
- if (it == Mailboxes.end()) {
- it = Mailboxes.insert(std::make_pair(mailboxId, new TEventMailBox())).first;
- }
- restrictedMailboxes.insert(std::make_pair(mailboxId, it->second));
- }
- TAutoPtr<TTempEdgeEventsCaptor> tempEdgeEventsCaptor;
- if (!restrictedMailboxes) {
- tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this));
- }
- TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes;
- while (!currentMailboxes.empty()) {
- bool hasProgress = true;
- while (hasProgress) {
- ++DispatchCyclesCount;
- hasProgress = false;
- ui64 eventsToDispatch = 0;
- for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) {
- if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- eventsToDispatch += mboxIt->second->GetSentEventCount();
- }
- }
- ui32 eventsDispatched = 0;
- //TODO: count events before each cycle, break after dispatching that much events
- bool isEmpty = false;
- while (!isEmpty && eventsDispatched < eventsToDispatch) {
- ui64 mailboxCount = currentMailboxes.size();
- ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull;
- auto startWithMboxIt = currentMailboxes.begin();
- for (ui64 i = 0; i < startWith; ++i) {
- ++startWithMboxIt;
- }
- auto endWithMboxIt = startWithMboxIt;
- isEmpty = true;
- auto mboxIt = startWithMboxIt;
- TDeque<TEventMailboxId> suspectedBoxes;
- while (true) {
- auto& mbox = *mboxIt;
- bool isIgnored = true;
- if (!mbox.second->IsEmpty()) {
- HandleNonEmptyMailboxesForEachContext(mbox.first);
- if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- bool isEdgeMailbox = false;
- if (EdgeActorByMailbox.FindPtr(TEventMailboxId(mbox.first.NodeId, mbox.first.Hint))) {
- isEdgeMailbox = true;
- TEventsList events;
- mbox.second->Capture(events);
- for (auto& ev : events) {
- TInverseGuard<TMutex> inverseGuard(Mutex);
- ObserverFunc(*this, ev);
- }
- mbox.second->PushFront(events);
- }
- if (!isEdgeMailbox) {
- isEmpty = false;
- isIgnored = false;
- ++eventsDispatched;
- ++DispatchedEventsCount;
- if (DispatchedEventsCount > DispatchedEventsLimit) {
- ythrow TWithBackTrace<yexception>() << "Dispatched "
- << DispatchedEventsLimit << " events, limit reached.";
- }
- auto ev = mbox.second->Pop();
- if (BlockedOutput.find(ev->Sender) == BlockedOutput.end()) {
- //UpdateCurrentTime(TInstant::MicroSeconds(CurrentTimestamp + 10));
- if (verbose) {
- Cerr << "Process event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
- PrintEvent(ev, this);
- }
- }
- hasProgress = true;
- EEventAction action;
- {
- TInverseGuard<TMutex> inverseGuard(Mutex);
- action = ObserverFunc(*this, ev);
- }
- switch (action) {
- case EEventAction::PROCESS:
- UpdateFinalEventsStatsForEachContext(*ev);
- SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false);
- break;
- case EEventAction::DROP:
- // do nothing
- break;
- case EEventAction::RESCHEDULE: {
- TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay;
- mbox.second->Freeze(deadline);
- mbox.second->PushFront(ev);
- break;
- }
- default:
- Y_FAIL("Unknown action");
- }
- }
- }
- }
- Y_VERIFY(mboxIt != currentMailboxes.end());
- if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes &&
- mboxIt->second->IsEmpty() &&
- mboxIt->second->IsScheduledEmpty() &&
- mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- suspectedBoxes.push_back(mboxIt->first);
- }
- ++mboxIt;
- if (mboxIt == currentMailboxes.end()) {
- mboxIt = currentMailboxes.begin();
- }
- Y_VERIFY(endWithMboxIt != currentMailboxes.end());
- if (mboxIt == endWithMboxIt) {
- break;
- }
- }
- for (auto id : suspectedBoxes) {
- auto it = currentMailboxes.find(id);
- if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() &&
- it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- currentMailboxes.erase(it);
- }
- }
- }
- }
- if (localContext.FinalEventFound) {
- return true;
- }
- if (!localContext.FoundNonEmptyMailboxes.empty())
- return true;
- if (options.CustomFinalCondition && options.CustomFinalCondition())
- return true;
- if (options.FinalEvents.empty()) {
- for (auto& mbox : currentMailboxes) {
- if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp)))
- continue;
- if (!mbox.second->IsEmpty()) {
- if (verbose) {
- Cerr << "Dispatch complete with non-empty queue at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
- }
- return true;
- }
- }
- }
- if (TInstant::MicroSeconds(CurrentTimestamp) > simDeadline) {
- return false;
- }
- if (dispatchTime >= deadline) {
- if (verbose) {
- Cerr << "Reach deadline at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
- }
- ythrow TWithBackTrace<TEmptyEventQueueException>();
- }
- if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) {
- inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
- bool isEmpty = true;
- TMaybe<TInstant> nearestMailboxDeadline;
- TVector<TIntrusivePtr<TEventMailBox>> nextScheduleMboxes;
- TMaybe<TInstant> nextScheduleDeadline;
- for (auto& mbox : currentMailboxes) {
- if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) {
- nearestMailboxDeadline = mbox.second->GetInactiveUntil();
- }
- continue;
- }
- if (mbox.second->IsScheduledEmpty())
- continue;
- auto firstScheduleDeadline = mbox.second->GetFirstScheduleDeadline();
- if (!nextScheduleDeadline || firstScheduleDeadline < *nextScheduleDeadline) {
- nextScheduleMboxes.clear();
- nextScheduleMboxes.emplace_back(mbox.second);
- nextScheduleDeadline = firstScheduleDeadline;
- } else if (firstScheduleDeadline == *nextScheduleDeadline) {
- nextScheduleMboxes.emplace_back(mbox.second);
- }
- }
- for (const auto& nextScheduleMbox : nextScheduleMboxes) {
- TEventsList selectedEvents;
- TScheduledEventsList capturedScheduledEvents;
- nextScheduleMbox->CaptureScheduled(capturedScheduledEvents);
- ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents);
- nextScheduleMbox->PushScheduled(capturedScheduledEvents);
- for (auto& event : selectedEvents) {
- if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) {
- Cerr << "Selected scheduled event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
- PrintEvent(event, this);
- }
- nextScheduleMbox->Send(event);
- isEmpty = false;
- }
- }
- if (!isEmpty) {
- if (verbose) {
- Cerr << "Process selected events at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
- }
- deadline = dispatchTime + DispatchTimeout;
- continue;
- }
- if (nearestMailboxDeadline.Defined()) {
- if (verbose) {
- Cerr << "Forward time to " << *nearestMailboxDeadline.Get() << "\n";
- }
- UpdateCurrentTime(*nearestMailboxDeadline.Get());
- continue;
- }
- }
- TDuration waitDelay = TDuration::MilliSeconds(10);
- dispatchTime += waitDelay;
- MailboxesHasEvents.WaitT(Mutex, waitDelay);
- }
- return false;
- }
- void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) {
- TDispatchContext* context = CurrentDispatchContext;
- while (context) {
- const auto& nonEmptyMailboxes = context->Options->NonEmptyMailboxes;
- if (Find(nonEmptyMailboxes.begin(), nonEmptyMailboxes.end(), mboxId) != nonEmptyMailboxes.end()) {
- context->FoundNonEmptyMailboxes.insert(mboxId);
- }
- context = context->PrevContext;
- }
- }
- void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) {
- TDispatchContext* context = CurrentDispatchContext;
- while (context) {
- for (const auto& finalEvent : context->Options->FinalEvents) {
- if (finalEvent.EventCheck(ev)) {
- auto& freq = context->FinalEventFrequency[&finalEvent];
- if (++freq >= finalEvent.RequiredCount) {
- context->FinalEventFound = true;
- }
- }
- }
- context = context->PrevContext;
- }
- }
- void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) {
- TGuard<TMutex> guard(Mutex);
- Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32,
- senderNodeIndex, NodeCount);
- SendInternal(ev, senderNodeIndex, viaActorSystem);
- }
- void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
- Y_VERIFY(nodeIndex < NodeCount);
- ui32 nodeId = FirstNodeId + nodeIndex;
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + duration;
- GetMailbox(nodeId, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, nullptr));
- if (VERBOSE)
- Cerr << "Event was added to scheduled queue\n";
- }
- void TTestActorRuntimeBase::ClearCounters() {
- TGuard<TMutex> guard(Mutex);
- EvCounters.clear();
- }
- ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const {
- TGuard<TMutex> guard(Mutex);
- auto it = EvCounters.find(evType);
- if (it == EvCounters.end())
- return 0;
- return it->second;
- }
- TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
- Y_VERIFY(nodeIndex < NodeCount);
- TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
- return node->ActorSystem->LookupLocalService(serviceId);
- }
- void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) {
- TGuard<TMutex> guard(Mutex);
- ui32 dispatchCount = 0;
- if (!edgeFilter.empty()) {
- for (auto edgeActor : edgeFilter) {
- Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data());
- }
- }
- const TSet<TActorId>& edgeActors = edgeFilter.empty() ? EdgeActors : edgeFilter;
- TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout;
- for (;;) {
- for (auto edgeActor : edgeActors) {
- TEventsList events;
- auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint());
- bool foundEvent = false;
- mbox.Capture(events);
- for (auto& ev : events) {
- if (filter(*this, ev)) {
- foundEvent = true;
- break;
- }
- }
- mbox.PushFront(events);
- if (foundEvent)
- return;
- }
- ++dispatchCount;
- {
- if (!DispatchEventsInternal(TDispatchOptions(), deadline)) {
- return; // Timed out; event was not found
- }
- }
- Y_VERIFY(dispatchCount < 1000, "Hard limit to prevent endless loop");
- }
- }
- TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) {
- TGuard<TMutex> guard(Mutex);
- Y_VERIFY(nodeIndexFrom < NodeCount);
- Y_VERIFY(nodeIndexTo < NodeCount);
- Y_VERIFY(nodeIndexFrom != nodeIndexTo);
- TNodeDataBase* node = Nodes[FirstNodeId + nodeIndexFrom].Get();
- return node->ActorSystem->InterconnectProxy(FirstNodeId + nodeIndexTo);
- }
- void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) {
- TGuard<TMutex> guard(Mutex);
- BlockedOutput.insert(actorId);
- }
- void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) {
- ui64 days = (time.Hours() / 24);
- DispatcherRandomSeed = (days << 32) ^ iteration;
- DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed);
- }
- IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const {
- TGuard<TMutex> guard(Mutex);
- if (nodeIndex == Max<ui32>()) {
- Y_VERIFY(actorId.NodeId());
- nodeIndex = actorId.NodeId() - FirstNodeId;
- }
- Y_VERIFY(nodeIndex < NodeCount);
- auto nodeIt = Nodes.find(FirstNodeId + nodeIndex);
- Y_VERIFY(nodeIt != Nodes.end());
- TNodeDataBase* node = nodeIt->second.Get();
- return FindActor(actorId, node);
- }
- void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) {
- TGuard<TMutex> guard(Mutex);
- if (allow) {
- if (VERBOSE) {
- Cerr << "Actor " << actorId << " added to schedule whitelist";
- }
- ScheduleWhiteList.insert(actorId);
- } else {
- if (VERBOSE) {
- Cerr << "Actor " << actorId << " removed from schedule whitelist";
- }
- ScheduleWhiteList.erase(actorId);
- }
- }
- bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const {
- TGuard<TMutex> guard(Mutex);
- return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end();
- }
- TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) {
- TGuard<TMutex> guard(Mutex);
- Y_VERIFY(nodeIndex < NodeCount);
- ui32 nodeId = FirstNodeId + nodeIndex;
- TNodeDataBase* node = Nodes[nodeId].Get();
- return node->DynamicCounters;
- }
- void TTestActorRuntimeBase::SetupMonitoring() {
- NeedMonitoring = true;
- }
- void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) {
- Y_VERIFY(nodeIndex < NodeCount);
- ui32 nodeId = FirstNodeId + nodeIndex;
- TNodeDataBase* node = Nodes[nodeId].Get();
- ui32 targetNode = ev->GetRecipientRewrite().NodeId();
- ui32 targetNodeIndex;
- if (targetNode == 0) {
- targetNodeIndex = nodeIndex;
- } else {
- targetNodeIndex = targetNode - FirstNodeId;
- Y_VERIFY(targetNodeIndex < NodeCount);
- }
- if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) {
- node->ActorSystem->Send(ev);
- return;
- }
- Y_VERIFY(!ev->GetRecipientRewrite().IsService() && (targetNodeIndex == nodeIndex));
- TAutoPtr<IEventHandle> evHolder(ev);
- if (!AllowSendFrom(node, evHolder)) {
- return;
- }
- ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
- TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint);
- if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
- mbox.PushFront(evHolder);
- return;
- }
- ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId();
- if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) {
- Cerr << "Send event, ";
- PrintEvent(evHolder, this);
- }
- EvCounters[ev->GetTypeRewrite()]++;
- TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
- IActor* recipientActor = mailbox->FindActor(recipientLocalId);
- if (recipientActor) {
- // Save actorId by value in order to prevent ctx from being invalidated during another Send call.
- TActorId actorId = ev->GetRecipientRewrite();
- node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite();
- TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId);
- TActivationContext *prevTlsActivationContext = TlsActivationContext;
- TlsActivationContext = &ctx;
- CurrentRecipient = actorId;
- {
- TInverseGuard<TMutex> inverseGuard(Mutex);
- #ifdef USE_ACTOR_CALLSTACK
- TCallstack::GetTlsCallstack() = ev->Callstack;
- TCallstack::GetTlsCallstack().SetLinesToSkip();
- #endif
- recipientActor->Receive(evHolder, ctx);
- node->ExecutorThread->DropUnregistered();
- }
- CurrentRecipient = TActorId();
- TlsActivationContext = prevTlsActivationContext;
- } else {
- if (VERBOSE) {
- Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n";
- }
- auto forwardedEv = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown);
- if (!!forwardedEv) {
- node->ActorSystem->Send(forwardedEv);
- }
- }
- }
- IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const {
- ui32 mailboxHint = actorId.Hint();
- ui64 localId = actorId.LocalId();
- TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
- IActor* actor = mailbox->FindActor(localId);
- return actor;
- }
- THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) {
- THolder<TActorSystemSetup> setup(new TActorSystemSetup);
- setup->NodeId = FirstNodeId + nodeIndex;
- if (UseRealThreads) {
- setup->ExecutorsCount = 5;
- setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]);
- setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20));
- setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20));
- setup->Executors[2].Reset(new TIOExecutorPool(2, 1));
- setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20));
- setup->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20));
- setup->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(512, 100)));
- } else {
- setup->ExecutorsCount = 1;
- setup->Scheduler.Reset(new TSchedulerThreadStub(this, node));
- setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]);
- setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0));
- }
- InitActorSystemSetup(*setup);
- return setup;
- }
- THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) {
- auto setup = MakeActorSystemSetup(nodeIndex, node);
- node->ExecutorPools.resize(setup->ExecutorsCount);
- for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
- node->ExecutorPools[i] = setup->Executors[i].Get();
- }
- const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect");
- setup->LocalServices = node->LocalServices;
- setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount);
- const TActorId nameserviceId = GetNameserviceActorId();
- TIntrusivePtr<TInterconnectProxyCommon> common;
- common.Reset(new TInterconnectProxyCommon);
- common->NameserviceId = nameserviceId;
- common->MonCounters = interconnectCounters;
- common->TechnicalSelfHostName = "::1";
- if (!UseRealThreads) {
- common->Settings.DeadPeer = TDuration::Max();
- common->Settings.CloseOnIdle = TDuration::Max();
- common->Settings.PingPeriod = TDuration::Max();
- common->Settings.ForceConfirmPeriod = TDuration::Max();
- common->Settings.Handshake = TDuration::Max();
- }
- common->ClusterUUID = ClusterUUID;
- common->AcceptUUID = {ClusterUUID};
- for (ui32 proxyNodeIndex = 0; proxyNodeIndex < NodeCount; ++proxyNodeIndex) {
- if (proxyNodeIndex == nodeIndex)
- continue;
- const ui32 peerNodeId = FirstNodeId + proxyNodeIndex;
- IActor *proxyActor = UseRealInterconnect
- ? new TInterconnectProxyTCP(peerNodeId, common)
- : InterconnectMock.CreateProxyMock(setup->NodeId, peerNodeId, common);
- setup->Interconnect.ProxyActors[peerNodeId] = {proxyActor, TMailboxType::ReadAsFilled, InterconnectPoolId()};
- }
- setup->Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, InterconnectPoolId(), &InterconnectMock);
- if (UseRealInterconnect) {
- setup->LocalServices.emplace_back(MakePollerActorId(), NActors::TActorSetupCmd(CreatePollerActor(),
- NActors::TMailboxType::Simple, InterconnectPoolId()));
- }
- if (!SingleSysEnv) { // Single system env should do this self
- TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend();
- NActors::TLoggerActor *loggerActor = new NActors::TLoggerActor(node->LogSettings,
- logBackend, GetCountersForComponent(node->DynamicCounters, "utils"));
- NActors::TActorSetupCmd loggerActorCmd(loggerActor, NActors::TMailboxType::Simple, node->GetLoggerPoolId());
- std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerActorPair(node->LogSettings->LoggerActorId, loggerActorCmd);
- setup->LocalServices.push_back(loggerActorPair);
- }
- return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings));
- }
- TActorSystem* TTestActorRuntimeBase::SingleSys() const {
- Y_VERIFY(Nodes.size() == 1, "Works only for single system env");
- return Nodes.begin()->second->ActorSystem.Get();
- }
- TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() {
- for (auto& x : Nodes) {
- return x.second->ActorSystem.Get();
- }
- Y_FAIL("Don't use this method.");
- }
- TActorSystem* TTestActorRuntimeBase::GetActorSystem(ui32 nodeId) {
- auto it = Nodes.find(GetNodeId(nodeId));
- Y_VERIFY(it != Nodes.end());
- return it->second->ActorSystem.Get();
- }
- TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) {
- TGuard<TMutex> guard(Mutex);
- auto mboxId = TEventMailboxId(nodeId, hint);
- auto it = Mailboxes.find(mboxId);
- if (it == Mailboxes.end()) {
- it = Mailboxes.insert(std::make_pair(mboxId, new TEventMailBox())).first;
- }
- return *it->second;
- }
- void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) {
- TGuard<TMutex> guard(Mutex);
- auto mboxId = TEventMailboxId(nodeId, hint);
- Mailboxes.erase(mboxId);
- }
- TString TTestActorRuntimeBase::GetActorName(const TActorId& actorId) const {
- auto it = ActorNames.find(actorId);
- if (it != ActorNames.end())
- return it->second;
- return actorId.ToString();
- }
- struct TStrandingActorDecoratorContext : public TThrRefBase {
- TStrandingActorDecoratorContext()
- : Queue(new TQueueType)
- {
- }
- typedef TOneOneQueueInplace<IEventHandle*, 32> TQueueType;
- TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> Queue;
- };
- class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> {
- public:
- class TReplyActor : public TActor<TReplyActor> {
- public:
- static constexpr EActivityType ActorActivityType() {
- return TEST_ACTOR_RUNTIME;
- }
- TReplyActor(TStrandingActorDecorator* owner)
- : TActor(&TReplyActor::StateFunc)
- , Owner(owner)
- {
- }
- STFUNC(StateFunc);
- private:
- TStrandingActorDecorator* const Owner;
- };
- static constexpr EActivityType ActorActivityType() {
- return TEST_ACTOR_RUNTIME;
- }
- TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors,
- TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime,
- TReplyCheckerCreator createReplyChecker)
- : Delegatee(delegatee)
- , IsSync(isSync)
- , AdditionalActors(additionalActors)
- , Context(context)
- , HasReply(false)
- , Runtime(runtime)
- , ReplyChecker(createReplyChecker())
- {
- if (IsSync) {
- Y_VERIFY(!runtime->IsRealThreads());
- }
- }
- void Bootstrap(const TActorContext& ctx) {
- Become(&TStrandingActorDecorator::StateFunc);
- ReplyId = ctx.RegisterWithSameMailbox(new TReplyActor(this));
- DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint()));
- for (const auto& actor : AdditionalActors) {
- DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint()));
- }
- DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
- DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
- DelegateeOptions.Quiet = true;
- }
- STFUNC(StateFunc) {
- bool wasEmpty = !Context->Queue->Head();
- Context->Queue->Push(ev.Release());
- if (wasEmpty) {
- SendHead(ctx);
- }
- }
- STFUNC(Reply) {
- Y_VERIFY(!HasReply);
- IEventHandle *requestEv = Context->Queue->Head();
- TActorId originalSender = requestEv->Sender;
- HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get());
- if (HasReply) {
- delete Context->Queue->Pop();
- }
- ctx.ExecutorThread.Send(ev->Forward(originalSender));
- if (!IsSync && Context->Queue->Head()) {
- SendHead(ctx);
- }
- }
- private:
- void SendHead(const TActorContext& ctx) {
- if (!IsSync) {
- ctx.ExecutorThread.Send(GetForwardedEvent().Release());
- } else {
- while (Context->Queue->Head()) {
- HasReply = false;
- ctx.ExecutorThread.Send(GetForwardedEvent().Release());
- int count = 100;
- while (!HasReply && count > 0) {
- try {
- Runtime->DispatchEvents(DelegateeOptions);
- } catch (TEmptyEventQueueException&) {
- count--;
- Cerr << "No reply" << Endl;
- }
- }
- Runtime->UpdateCurrentTime(Runtime->GetCurrentTime() + TDuration::MicroSeconds(1000));
- }
- }
- }
- TAutoPtr<IEventHandle> GetForwardedEvent() {
- IEventHandle* ev = Context->Queue->Head();
- ReplyChecker->OnRequest(ev);
- TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent()
- ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie)
- : new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie);
- return forwardedEv;
- }
- private:
- const TActorId Delegatee;
- const bool IsSync;
- const TVector<TActorId> AdditionalActors;
- TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
- TActorId ReplyId;
- bool HasReply;
- TDispatchOptions DelegateeOptions;
- TTestActorRuntimeBase* Runtime;
- THolder<IReplyChecker> ReplyChecker;
- };
- void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) {
- Owner->Reply(ev, ctx);
- }
- class TStrandingDecoratorFactory : public IStrandingDecoratorFactory {
- public:
- TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
- TReplyCheckerCreator createReplyChecker)
- : Context(new TStrandingActorDecoratorContext())
- , Runtime(runtime)
- , CreateReplyChecker(createReplyChecker)
- {
- }
- IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override {
- return new TStrandingActorDecorator(delegatee, isSync, additionalActors, Context, Runtime,
- CreateReplyChecker);
- }
- private:
- TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
- TTestActorRuntimeBase* Runtime;
- TReplyCheckerCreator CreateReplyChecker;
- };
- TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
- TReplyCheckerCreator createReplyChecker) {
- return TAutoPtr<IStrandingDecoratorFactory>(new TStrandingDecoratorFactory(runtime, createReplyChecker));
- }
- ui64 DefaultRandomSeed = 9999;
- }
|