#include "test_runtime.h"

#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/callstack.h>
#include <library/cpp/actors/core/executor_pool_basic.h>
#include <library/cpp/actors/core/executor_pool_io.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/scheduler_basic.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/protos/services_common.pb.h>
#include <library/cpp/random_provider/random_provider.h>
#include <library/cpp/actors/interconnect/interconnect.h>
#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
#include <library/cpp/actors/interconnect/interconnect_proxy_wrapper.h>

#include <util/generic/maybe.h>
#include <util/generic/bt_exception.h>
#include <util/random/mersenne.h>
#include <util/string/printf.h>
#include <typeinfo>

// Global tracing switch for the test runtime; exposed through
// TTestActorRuntimeBase::IsVerbose() / SetVerbose().
bool VERBOSE = false;
// When true, PrintEvent() prints events with ToString() instead of ToStringHeader().
const bool PRINT_EVENT_BODY = false;

namespace {

    // Builds the cluster identifier string, which embeds the id of the current process.
    TString MakeClusterId() {
        TStringBuilder clusterId;
        clusterId << "Cluster for process with id: " << getpid();
        return clusterId;
    }
}

namespace NActors {
    // Out-of-class definition of the static member; its value starts at zero.
    ui64 TScheduledEventQueueItem::NextUniqueId = 0;

    // Dumps a one-line description of `ev` to Cerr: recipient mailbox hint, event type,
    // sender and recipient local ids (with the actor names known to `runtime`, if any),
    // followed by the event text, or a BUFFER / EMPTY marker when there is no event object.
    void PrintEvent(TAutoPtr<IEventHandle>& ev, const TTestActorRuntimeBase* runtime) {
        const auto& recipient = ev->GetRecipientRewrite();

        Cerr << "mailbox: " << recipient.Hint()
             << ", type: " << Sprintf("%08x", ev->GetTypeRewrite())
             << ", from " << ev->Sender.LocalId();

        const TString senderName = runtime->GetActorName(ev->Sender);
        if (!senderName.empty()) {
            Cerr << " \"" << senderName << "\"";
        }

        Cerr << ", to " << recipient.LocalId();

        const TString recipientName = runtime->GetActorName(recipient);
        if (!recipientName.empty()) {
            Cerr << " \"" << recipientName << "\"";
        }

        Cerr << ", ";
        if (ev->HasEvent()) {
            // PRINT_EVENT_BODY selects between the full text and the header only.
            const TString description = PRINT_EVENT_BODY ? ev->GetBase()->ToString() : ev->GetBase()->ToStringHeader();
            Cerr << " : " << description;
        } else if (ev->HasBuffer()) {
            Cerr << " : BUFFER";
        } else {
            Cerr << " : EMPTY";
        }

        Cerr << "\n";
    }

    // A fresh node has no clock slots attached yet; TSchedulerThreadStub::Prepare() sets them.
    TTestActorRuntimeBase::TNodeDataBase::TNodeDataBase() {
        ActorSystemMonotonic = nullptr;
        ActorSystemTimestamp = nullptr;
    }

    // Shuts the node down: stops the poller, cleans up the mailbox table, then stops and
    // destroys the actor system and releases the poller.
    void TTestActorRuntimeBase::TNodeDataBase::Stop() {
        if (Poller) {
            Poller->Stop();
        }

        if (MailboxTable) {
            // Cleanup() is retried until it succeeds; too many failed rounds abort the process.
            ui32 round = 0;
            while (!MailboxTable->Cleanup()) {
                Y_VERIFY(round < 10, "cyclic event/actor spawn while trying to shutdown actorsystem stub");
                ++round;
            }
        }

        if (ActorSystem) {
            ActorSystem->Stop();
        }

        ActorSystem.Destroy();
        Poller.Reset();
    }

    // Destroying a node stops it first.
    TTestActorRuntimeBase::TNodeDataBase::~TNodeDataBase() {
        Stop();
    }


    // Actor whose only state function forwards every received event into the runtime's
    // per-mailbox "sent" queue, unless the runtime's event filter rejects the event.
    class TTestActorRuntimeBase::TEdgeActor : public TActor<TEdgeActor> {
    public:
        static constexpr EActivityType ActorActivityType() {
            return TEST_ACTOR_RUNTIME;
        }

        TEdgeActor(TTestActorRuntimeBase* runtime)
            : TActor(&TEdgeActor::StateFunc)
            , Runtime(runtime)
        {
        }

        STFUNC(StateFunc) {
            Y_UNUSED(ctx);
            TGuard<TMutex> guard(Runtime->Mutex);

            // Tracing requires the global flag, a non-quiet dispatch context (if there is one)
            // and a sender whose output is not blocked.
            const bool quiet = Runtime->CurrentDispatchContext && Runtime->CurrentDispatchContext->Options->Quiet;
            const bool senderBlocked = Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end();
            const bool verbose = !quiet && VERBOSE && !senderBlocked;

            if (verbose) {
                Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
                PrintEvent(ev, Runtime);
            }

            // A true result from the filter means the event is dropped.
            if (Runtime->EventFilterFunc(*Runtime, ev)) {
                if (verbose) {
                    Cerr << "Event was dropped\n";
                }
                return;
            }

            const ui32 nodeId = ev->GetRecipientRewrite().NodeId();
            Y_VERIFY(nodeId != 0);
            const ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
            Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
            Runtime->MailboxesHasEvents.Signal();
            if (verbose) {
                Cerr << "Event was added to sent queue\n";
            }
        }

    private:
        TTestActorRuntimeBase* Runtime;
    };

    // Appends a non-null event to the tail of the sent queue.
    // With DEBUG_ORDER_EVENTS defined, the event also gets a send sequence number.
    void TEventMailBox::Send(TAutoPtr<IEventHandle> ev) {
        IEventHandle* const rawEvent = ev.Get();
        Y_VERIFY(rawEvent);
#ifdef DEBUG_ORDER_EVENTS
        const ui64 sendIndex = NextToSend++;
        TrackSent[rawEvent] = sendIndex;
#endif
        Sent.push_back(ev);
    }

    // Removes and returns the event at the head of the sent queue.
    // The queue must not be empty: calling front() on an empty container is undefined
    // behaviour, so the precondition is checked explicitly and violations abort.
    // With DEBUG_ORDER_EVENTS defined, also verifies that tracked events leave the
    // mailbox in the same order they were sent.
    TAutoPtr<IEventHandle> TEventMailBox::Pop() {
        Y_VERIFY(!Sent.empty(), "Pop() called on an empty mailbox");
        TAutoPtr<IEventHandle> result = Sent.front();
        Sent.pop_front();
#ifdef DEBUG_ORDER_EVENTS
        auto it = TrackSent.find(result.Get());
        if (it != TrackSent.end()) {
            Y_VERIFY(ExpectedReceive == it->second);
            TrackSent.erase(result.Get());
            ++ExpectedReceive;
        }
#endif
        return result;
    }

    // True when the sent queue holds no events (scheduled events are not considered).
    bool TEventMailBox::IsEmpty() const {
        const bool noSentEvents = Sent.empty();
        return noSentEvents;
    }

    // Appends every queued event to the end of `evList` and leaves the sent queue empty.
    void TEventMailBox::Capture(TEventsList& evList) {
        for (auto& pending : Sent) {
            evList.push_back(pending);
        }
        Sent.clear();
    }

    // Puts a single event at the head of the sent queue.
    void TEventMailBox::PushFront(TAutoPtr<IEventHandle>& ev) {
        Sent.push_front(ev);
    }

    // Puts the non-null events of `evList` at the head of the sent queue.
    // The list is walked back to front, so the events keep their relative order.
    void TEventMailBox::PushFront(TEventsList& evList) {
        for (size_t remaining = evList.size(); remaining > 0; --remaining) {
            auto& candidate = evList[remaining - 1];
            if (!candidate) {
                continue;
            }
            Sent.push_front(candidate);
        }
    }

    // Inserts every scheduled item into `evList` and leaves the scheduled set empty.
    void TEventMailBox::CaptureScheduled(TScheduledEventsList& evList) {
        for (const auto& scheduledItem : Scheduled) {
            evList.insert(scheduledItem);
        }

        Scheduled.clear();
    }

    // Inserts the items of `evList` that still carry an event into the scheduled set,
    // then clears `evList`.
    void TEventMailBox::PushScheduled(TScheduledEventsList& evList) {
        for (const auto& scheduledItem : evList) {
            if (!scheduledItem.Event) {
                continue;
            }
            Scheduled.insert(scheduledItem);
        }

        evList.clear();
    }

    // The mailbox is active once `currentTime` has reached the InactiveUntil mark.
    bool TEventMailBox::IsActive(const TInstant& currentTime) const {
        return InactiveUntil <= currentTime;
    }

    // Extends the inactivity period up to `deadline`; an earlier deadline never shortens it.
    void TEventMailBox::Freeze(const TInstant& deadline) {
        if (InactiveUntil < deadline) {
            InactiveUntil = deadline;
        }
    }

    // Returns the instant up to which the mailbox is frozen.
    TInstant TEventMailBox::GetInactiveUntil() const {
        const TInstant frozenUntil = InactiveUntil;
        return frozenUntil;
    }

    // Adds one item to the set of scheduled events.
    void TEventMailBox::Schedule(const TScheduledEventQueueItem& item) {
        Scheduled.insert(item);
    }

    // True when no scheduled events are stored in this mailbox.
    bool TEventMailBox::IsScheduledEmpty() const {
        const bool noScheduledEvents = Scheduled.empty();
        return noScheduledEvents;
    }

    // Returns the deadline of the first item in the scheduled set.
    // The set must not be empty: dereferencing begin() of an empty container is
    // undefined behaviour, so the precondition is checked explicitly (see IsScheduledEmpty()).
    TInstant TEventMailBox::GetFirstScheduleDeadline() const {
        Y_VERIFY(!Scheduled.empty(), "GetFirstScheduleDeadline() called with no scheduled events");
        return Scheduled.begin()->Deadline;
    }

    // Number of events currently waiting in the sent queue.
    ui64 TEventMailBox::GetSentEventCount() const {
        const ui64 sentCount = Sent.size();
        return sentCount;
    }

    // ITimeProvider whose notion of "now" is the runtime's current time.
    class TTestActorRuntimeBase::TTimeProvider : public ITimeProvider {
    public:
        TTimeProvider(TTestActorRuntimeBase& runtime)
            : Runtime(runtime)
        {
        }

        // Delegates to TTestActorRuntimeBase::GetCurrentTime().
        TInstant Now() override {
            const TInstant runtimeNow = Runtime.GetCurrentTime();
            return runtimeNow;
        }

    private:
        TTestActorRuntimeBase& Runtime;
    };

    // ISchedulerThread stand-in: every lifecycle hook is a no-op, and Prepare() only records
    // in the node where the actor system keeps its timestamp and monotonic clock values.
    class TTestActorRuntimeBase::TSchedulerThreadStub : public ISchedulerThread {
    public:
        TSchedulerThreadStub(TTestActorRuntimeBase* runtime, TTestActorRuntimeBase::TNodeDataBase* node)
            : Runtime(runtime)
            , Node(node)
        {
            Y_UNUSED(Runtime);
        }

        // Stores both clock pointers in the node; the actor system itself is not used.
        void Prepare(TActorSystem *actorSystem, volatile ui64 *currentTimestamp, volatile ui64 *currentMonotonic) override {
            Y_UNUSED(actorSystem);
            Node->ActorSystemMonotonic = currentMonotonic;
            Node->ActorSystemTimestamp = currentTimestamp;
        }

        // Schedule readers are ignored by the stub.
        void PrepareSchedules(NSchedulerQueue::TReader **readers, ui32 scheduleReadersCount) override {
            Y_UNUSED(scheduleReadersCount);
            Y_UNUSED(readers);
        }

        // Nothing to start.
        void Start() override {
        }

        // Nothing to prepare before stopping.
        void PrepareStop() override {
        }

        // Nothing to stop.
        void Stop() override {
        }

    private:
        TTestActorRuntimeBase* Runtime;
        TTestActorRuntimeBase::TNodeDataBase* Node;
    };

    // IExecutorPool implementation used by the test runtime: sent and scheduled events are
    // placed into the runtime's mailboxes (TEventMailBox) under Runtime->Mutex, actor
    // registration is delegated to the runtime, and the lifecycle hooks do nothing.
    class TTestActorRuntimeBase::TExecutorPoolStub : public IExecutorPool {
    public:
        // runtime   - owning test runtime
        // nodeIndex - zero-based index of the node this pool belongs to
        // node      - node data (provides the mailbox table, actor system and executor thread)
        // poolId    - pool id passed on to IExecutorPool
        TExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId)
            : IExecutorPool(poolId)
            , Runtime(runtime)
            , NodeIndex(nodeIndex)
            , Node(node)
        {
        }

        TTestActorRuntimeBase* GetRuntime() {
            return Runtime;
        }

        // for threads
        // Not supported by the stub: calling it aborts via Y_FAIL().
        ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override {
            Y_UNUSED(wctx);
            Y_UNUSED(revolvingCounter);
            Y_FAIL();
        }

        // Forwards the reclaim request to the node's mailbox table; workerId is ignored.
        void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) override {
            Y_UNUSED(workerId);
            Node->MailboxTable->ReclaimMailbox(mailboxType, hint, revolvingCounter);
        }

        // The three Schedule() overloads convert their deadline to a TInstant and share DoSchedule().
        void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
            DoSchedule(deadline, ev, cookie, workerId);
        }

        // The raw monotonic value is reused as the TInstant value.
        void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
            DoSchedule(TInstant::FromValue(deadline.GetValue()), ev, cookie, workerId);
        }

        // The delay is counted from the runtime time provider's current time.
        void Schedule(TDuration delay, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) override {
            TInstant deadline = Runtime->GetTimeProvider()->Now() + delay;
            DoSchedule(deadline, ev, cookie, workerId);
        }

        // Puts `ev` into the scheduled set of the recipient's mailbox on this pool's node,
        // unless the runtime's scheduled-event filter rejects it. A rejected event is not
        // stored and its cookie (if any) is detached. workerId is ignored.
        void DoSchedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, TWorkerId workerId) {
            Y_UNUSED(workerId);

            TGuard<TMutex> guard(Runtime->Mutex);
            // Trace only when VERBOSE is set, the current dispatch context (if any) is not
            // quiet and the sender's output is not blocked.
            bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
            if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
                verbose = false;
            }

            if (verbose) {
                Cerr << "Got scheduled event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
                PrintEvent(ev, Runtime);
            }

            auto now = Runtime->GetTimeProvider()->Now();
            if (deadline < now) {
                deadline = now; // avoid going backwards in time
            }
            TDuration delay = (deadline - now);

            // In single-system mode the filter is not consulted at all; otherwise a true
            // result from the filter means "drop the event".
            if (Runtime->SingleSysEnv || !Runtime->ScheduledEventFilterFunc(*Runtime, ev, delay, deadline)) {
                ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
                Runtime->GetMailbox(Runtime->FirstNodeId + NodeIndex, mailboxHint).Schedule(TScheduledEventQueueItem(deadline, ev, cookie));
                Runtime->MailboxesHasEvents.Signal();
                if (verbose)
                    Cerr << "Event was added to scheduled queue\n";
            } else {
                if (cookie) {
                    cookie->Detach();
                }
                if (verbose) {
                    Cerr << "Scheduled event for " << ev->GetRecipientRewrite().ToString() << " was dropped\n";
                }
            }
        }

        // for actorsystem
        // Accepts an event from the actor system. Unless the runtime's event filter or the
        // sender's TTestDecorator rejects it, the event is put into the recipient's mailbox
        // queue; log events addressed to the node's logger service are instead delivered to
        // the logger actor synchronously. Always returns true.
        bool Send(TAutoPtr<IEventHandle>& ev) override {
            TGuard<TMutex> guard(Runtime->Mutex);
            bool verbose = (Runtime->CurrentDispatchContext ? !Runtime->CurrentDispatchContext->Options->Quiet : true) && VERBOSE;
            if (Runtime->BlockedOutput.find(ev->Sender) != Runtime->BlockedOutput.end()) {
                verbose = false;
            }

            if (verbose) {
                Cerr << "Got event at " << TInstant::MicroSeconds(Runtime->CurrentTimestamp) << ", ";
                PrintEvent(ev, Runtime);
            }

            if (!Runtime->EventFilterFunc(*Runtime, ev)) {
                ui32 nodeId = ev->GetRecipientRewrite().NodeId();
                Y_VERIFY(nodeId != 0);
                // NOTE(review): the node is looked up by the recipient's node id and is
                // dereferenced without a null check — presumably the id always names an
                // initialized node; confirm.
                TNodeDataBase* node = Runtime->Nodes[nodeId].Get();

                if (!AllowSendFrom(node, ev)) {
                    return true;
                }

                ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
                if (ev->GetTypeRewrite() == ui32(NActors::NLog::EEv::Log)) {
                    const NActors::TActorId loggerActorId = NActors::TActorId(nodeId, "logger");
                    TActorId logger = node->ActorSystem->LookupLocalService(loggerActorId);
                    // NOTE(review): a Log-typed event whose recipient is not the logger
                    // service is neither delivered nor queued by this branch.
                    if (ev->GetRecipientRewrite() == logger) {
                        TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
                        IActor* recipientActor = mailbox->FindActor(ev->GetRecipientRewrite().LocalId());
                        if (recipientActor) {
                            // Call the logger's Receive() directly, with a temporary activation
                            // context installed for the duration of the call and restored afterwards.
                            TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), ev->GetRecipientRewrite());
                            TActivationContext *prevTlsActivationContext = TlsActivationContext;
                            TlsActivationContext = &ctx;
                            recipientActor->Receive(ev, ctx);
                            TlsActivationContext = prevTlsActivationContext;
                            // we expect the logger to never die in tests
                        }
                    }
                } else {
                    Runtime->GetMailbox(nodeId, mailboxHint).Send(ev);
                    Runtime->MailboxesHasEvents.Signal();
                }
                // NOTE(review): this trace line is printed for the log-event branch as well.
                if (verbose)
                    Cerr << "Event was added to sent queue\n";
            } else {
                if (verbose)
                    Cerr << "Event was dropped\n";
            }
            return true;
        }

        // Activations are not used by the stub.
        void ScheduleActivation(ui32 activation) override {
            Y_UNUSED(activation);
        }

        // Activations are not used by the stub.
        void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) override {
            Y_UNUSED(activation);
            Y_UNUSED(revolvingCounter);
        }

        // Both Register() overloads delegate to the runtime, adding this pool's node index and pool id.
        TActorId Register(IActor *actor, TMailboxType::EType mailboxType, ui64 revolvingCounter,
            const TActorId& parentId) override {
            return Runtime->Register(actor, NodeIndex, PoolId, mailboxType, revolvingCounter, parentId);
        }

        TActorId Register(IActor *actor, TMailboxHeader *mailbox, ui32 hint, const TActorId& parentId) override {
            return Runtime->Register(actor, NodeIndex, PoolId, mailbox, hint, parentId);
        }

        // lifecycle stuff
        // All lifecycle hooks below are no-ops; Cleanup() reports success immediately.
        void Prepare(TActorSystem *actorSystem, NSchedulerQueue::TReader **scheduleReaders, ui32 *scheduleSz) override {
            Y_UNUSED(actorSystem);
            Y_UNUSED(scheduleReaders);
            Y_UNUSED(scheduleSz);
        }

        void Start() override {
        }

        void PrepareStop() override {
        }

        void Shutdown() override {
        }

        bool Cleanup() override {
            return true;
        }

        // generic
        // Not supported by the stub: calling it aborts via Y_FAIL().
        TAffinity* Affinity() const override {
            Y_FAIL();
        }

    private:
        TTestActorRuntimeBase* const Runtime;
        const ui32 NodeIndex;
        TTestActorRuntimeBase::TNodeDataBase* const Node;
    };

    // Allocates a TExecutorPoolStub for the given node; the result is a raw pointer that
    // the caller is expected to take ownership of.
    IExecutorPool* TTestActorRuntimeBase::CreateExecutorPoolStub(TTestActorRuntimeBase* runtime, ui32 nodeIndex, TTestActorRuntimeBase::TNodeDataBase* node, ui32 poolId) {
        IExecutorPool* const poolStub = new TExecutorPoolStub(runtime, nodeIndex, node, poolId);
        return poolStub;
    }


    // Node id that the next constructed runtime takes as its FirstNodeId;
    // InitNodes() advances it by the runtime's NodeCount.
    ui32 TTestActorRuntimeBase::NextNodeId = 1;

    // Single-system environment: one node, one data center, no real threads,
    // with the SingleSysEnv flag raised after the delegated construction.
    TTestActorRuntimeBase::TTestActorRuntimeBase(THeSingleSystemEnv)
        : TTestActorRuntimeBase(1, 1, false)
    {
        SingleSysEnv = true;
    }

    // Main constructor; the other constructors delegate to it.
    //   nodeCount       - number of nodes in the runtime
    //   dataCenterCount - number of data centers
    //   useRealThreads  - stored in UseRealThreads; when false, nodes are built on the
    //                     executor pool stub (see InitNodeImpl)
    // Nodes themselves are not created here (see Initialize() / InitNodes()).
    TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads)
        : ScheduledCount(0)
        , ScheduledLimit(100000)
        , MainThreadId(TThread::CurrentThreadId())
        , ClusterUUID(MakeClusterId())
        // Node ids of this runtime start at the current value of the static NextNodeId.
        , FirstNodeId(NextNodeId)
        , NodeCount(nodeCount)
        , DataCenterCount(dataCenterCount)
        , UseRealThreads(useRealThreads)
        , LocalId(0)
        , DispatchCyclesCount(0)
        , DispatchedEventsCount(0)
        , NeedMonitoring(false)
        , RandomProvider(CreateDeterministicRandomProvider(DefaultRandomSeed))
        // The time provider reports the runtime's own current time.
        , TimeProvider(new TTimeProvider(*this))
        , ShouldContinue()
        , CurrentTimestamp(0)
        , DispatchTimeout(DEFAULT_DISPATCH_TIMEOUT)
        , ReschedulingDelay(TDuration::MicroSeconds(0))
        // Default hooks; each can be replaced through the corresponding Set*() method.
        , ObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc)
        , ScheduledEventsSelectorFunc(&CollapsedTimeScheduledEventsSelector)
        , EventFilterFunc(&TTestActorRuntimeBase::DefaultFilterFunc)
        , ScheduledEventFilterFunc(&TTestActorRuntimeBase::NopFilterFunc)
        , RegistrationObserver(&TTestActorRuntimeBase::DefaultRegistrationObserver)
        , CurrentDispatchContext(nullptr)
    {
        SetDispatcherRandomSeed(TInstant::Now(), 0);
        // Paired with DisableActorCallstack() in the destructor.
        EnableActorCallstack();
    }

    // Gives the node its log settings (warning priority, drops disallowed, no throttle delay)
    // and a fresh counters object, then hands over to InitNodeImpl().
    void TTestActorRuntimeBase::InitNode(TNodeDataBase* node, size_t nodeIndex) {
        const TActorId loggerActorId(FirstNodeId + nodeIndex, "logger");

        node->LogSettings = new NLog::TSettings(
            loggerActorId,
            410 /* NKikimrServices::LOGGER */,
            NLog::PRI_WARN,
            NLog::PRI_WARN,
            0);
        node->LogSettings->SetAllowDrop(false);
        node->LogSettings->SetThrottleDelay(TDuration::Zero());

        node->DynamicCounters = new NMonitoring::TDynamicCounters;

        InitNodeImpl(node, nodeIndex);
    }

    // Registers the common service names with the node's log settings, creates the actor
    // system and starts it. Without real threads the node additionally gets a stub executor
    // pool and a mailbox table before the actor system is made, and an executor thread
    // object afterwards.
    void TTestActorRuntimeBase::InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) {
        node->LogSettings->Append(
            NActorsServices::EServiceCommon_MIN,
            NActorsServices::EServiceCommon_MAX,
            NActorsServices::EServiceCommon_Name
        );

        const bool stubbed = !UseRealThreads;

        if (stubbed) {
            node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0));
            node->MailboxTable.Reset(new TMailboxTable());
        }

        node->ActorSystem = MakeActorSystem(nodeIndex, node);

        if (stubbed) {
            node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor"));
        }

        node->ActorSystem->Start();
    }

    // Decides whether `ev` may be sent. The sender is looked up in `node`'s mailbox table;
    // if it is a TTestDecorator, its BeforeSending() gives the answer. In every other case
    // (no such mailbox, no such actor, not a decorator) sending is allowed.
    bool TTestActorRuntimeBase::AllowSendFrom(TNodeDataBase* node, TAutoPtr<IEventHandle>& ev) {
        TMailboxHeader* const senderMailbox = node->MailboxTable->Get(ev->Sender.Hint());
        if (!senderMailbox) {
            return true;
        }

        IActor* const senderActor = senderMailbox->FindActor(ev->Sender.LocalId());
        if (TTestDecorator* const decorator = dynamic_cast<TTestDecorator*>(senderActor)) {
            return decorator->BeforeSending(ev);
        }
        return true;
    }

    // Same as the three-argument constructor with real threads switched off.
    TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount)
        : TTestActorRuntimeBase(nodeCount, dataCenterCount, false)
    {
    }

    // Uses as many data centers as there are nodes.
    TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, bool useRealThreads)
        : TTestActorRuntimeBase(nodeCount, nodeCount, useRealThreads)
    {
    }

    // Destroys all nodes, flushes the diagnostic streams and disables actor callstack
    // tracking that the constructor enabled.
    TTestActorRuntimeBase::~TTestActorRuntimeBase() {
        CleanupNodes();
        // Cerr used to be flushed twice in a row; the second call was redundant.
        // NOTE(review): if one of the two was meant to flush a different stream, add it here.
        Cerr.Flush();
        Clog.Flush();

        DisableActorCallstack();
    }

    // Drops every entry of the node map.
    void TTestActorRuntimeBase::CleanupNodes() {
        Nodes.clear();
    }

    // Reports the useRealThreads value the runtime was constructed with.
    bool TTestActorRuntimeBase::IsRealThreads() const {
        const bool realThreads = UseRealThreads;
        return realThreads;
    }

    // Default observer: ignores its arguments and asks for every event to be processed.
    TTestActorRuntimeBase::EEventAction TTestActorRuntimeBase::DefaultObserverFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
        Y_UNUSED(event);
        Y_UNUSED(runtime);
        return EEventAction::PROCESS;
    }

    // Selector that discards all scheduled events: the list is cleared and nothing is
    // added to `queue`.
    void TTestActorRuntimeBase::DroppingScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
        Y_UNUSED(queue);
        Y_UNUSED(runtime);
        scheduledEvents.clear();
    }

    // Default event filter: returns false for every event, i.e. nothing is dropped
    // (senders treat a true result as "drop").
    bool TTestActorRuntimeBase::DefaultFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
        Y_UNUSED(event);
        Y_UNUSED(runtime);
        return false;
    }

    // Default scheduled-event filter: returns true for every event. The executor pool stub
    // treats a true result as "drop" unless the runtime is in single-system mode.
    bool TTestActorRuntimeBase::NopFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration delay, TInstant& deadline) {
        Y_UNUSED(deadline);
        Y_UNUSED(event);
        Y_UNUSED(delay);
        Y_UNUSED(runtime);
        return true;
    }


    // Default registration observer: an actor registered by a white-listed parent is
    // white-listed too, and the parent link is remembered.
    void TTestActorRuntimeBase::DefaultRegistrationObserver(TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
        const bool parentWhiteListed = runtime.ScheduleWhiteList.find(parentId) != runtime.ScheduleWhiteList.end();
        if (!parentWhiteListed) {
            return;
        }

        runtime.ScheduleWhiteList.insert(actorId);
        runtime.ScheduleWhiteListParent[actorId] = parentId;
    }

    class TScheduledTreeItem {
    public:
        TString Name;
        ui64 Count;
        TVector<TScheduledTreeItem> Children;

        TScheduledTreeItem(const TString& name)
            : Name(name)
            , Count(0)
        {}

        TScheduledTreeItem* GetItem(const TString& name) {
            TScheduledTreeItem* item = nullptr;
            for (TScheduledTreeItem& i : Children) {
                if (i.Name == name) {
                    item = &i;
                    break;
                }
            }
            if (item != nullptr)
                return item;
            Children.emplace_back(name);
            return &Children.back();
        }

        void RecursiveSort() {
            Sort(Children, [](const TScheduledTreeItem& a, const TScheduledTreeItem& b) -> bool { return a.Count > b.Count; });
            for (TScheduledTreeItem& item : Children) {
                item.RecursiveSort();
            }
        }

        void Print(IOutputStream& stream, const TString& prefix) {
            for (auto it = Children.begin(); it != Children.end(); ++it) {
                bool lastChild = (std::next(it) == Children.end());
                TString connectionPrefix = lastChild ? "└─ " : "├─ ";
                TString subChildPrefix = lastChild ? "   " : "│  ";
                stream << prefix << connectionPrefix << it->Name << " (" << it->Count << ")\n";
                it->Print(stream, prefix + subChildPrefix);
            }
        }

        void Print(IOutputStream& stream) {
            stream << Name << " (" << Count << ")\n";
            Print(stream, TString());
        }
    };

    // Default scheduled-events selector. Takes every scheduled event that shares the
    // earliest deadline out of `scheduledEvents`, appends the ones that are still due to
    // `queue`, and then moves the runtime's current time to that deadline.
    // Does nothing when `scheduledEvents` is empty.
    // Throws TSchedulingLimitReachedException once runtime.ScheduledCount exceeds
    // runtime.ScheduledLimit.
    void TTestActorRuntimeBase::CollapsedTimeScheduledEventsSelector(TTestActorRuntimeBase& runtime, TScheduledEventsList& scheduledEvents, TEventsList& queue) {
        if (scheduledEvents.empty())
            return;

        TInstant time = scheduledEvents.begin()->Deadline;
        while (!scheduledEvents.empty() && scheduledEvents.begin()->Deadline == time) {
            // Per (recipient, event type name) counters. The map is function-static, so it
            // keeps accumulating across calls; only the commented-out report below reads it.
            static THashMap<std::pair<TActorId, TString>, ui64> eventTypes;
            auto& item = *scheduledEvents.begin();
            // Use the event's type name when an event object is available, otherwise the hex type id.
            TString name = item.Event->GetBase() ? TypeName(*item.Event->GetBase()) : Sprintf("%08" PRIx32, item.Event->Type);
            eventTypes[std::make_pair(item.Event->Recipient, name)]++;
            runtime.ScheduledCount++;
            if (runtime.ScheduledCount > runtime.ScheduledLimit) {
//                TScheduledTreeItem root("Root");
//                TVector<TString> path;
//                for (const auto& pr : eventTypes) {
//                    path.clear();
//                    path.push_back(runtime.GetActorName(pr.first.first));
//                    for (auto it = runtime.ScheduleWhiteListParent.find(pr.first.first); it != runtime.ScheduleWhiteListParent.end(); it = runtime.ScheduleWhiteListParent.find(it->second)) {
//                        path.insert(path.begin(), runtime.GetActorName(it->second));
//                    }
//                    path.push_back("<" + pr.first.second + ">"); // event name;
//                    ui64 count = pr.second;
//                    TScheduledTreeItem* item = &root;
//                    item->Count += count;
//                    for (TString name : path) {
//                        item = item->GetItem(name);
//                        item->Count += count;
//                    }
//                }
//                root.RecursiveSort();
//                root.Print(Cerr);

                ythrow TSchedulingLimitReachedException(runtime.ScheduledLimit);
            }
            // When the item carries a cookie, the event is queued only if Detach() succeeds;
            // items without a cookie are always queued.
            if (item.Cookie->Get()) {
                if (item.Cookie->Detach()) {
                    queue.push_back(item.Event);
                }
            } else {
                queue.push_back(item.Event);
            }

            scheduledEvents.erase(scheduledEvents.begin());
        }

        runtime.UpdateCurrentTime(time);
    }

    // Installs a new event observer under the runtime mutex and returns the previous one.
    TTestActorRuntimeBase::TEventObserver TTestActorRuntimeBase::SetObserverFunc(TEventObserver observerFunc) {
        TGuard<TMutex> guard(Mutex);
        TEventObserver previous = ObserverFunc;
        ObserverFunc = observerFunc;
        return previous;
    }

    // Installs a new scheduled-events selector under the runtime mutex and returns the previous one.
    TTestActorRuntimeBase::TScheduledEventsSelector TTestActorRuntimeBase::SetScheduledEventsSelectorFunc(TScheduledEventsSelector scheduledEventsSelectorFunc) {
        TGuard<TMutex> guard(Mutex);
        TScheduledEventsSelector previous = ScheduledEventsSelectorFunc;
        ScheduledEventsSelectorFunc = scheduledEventsSelectorFunc;
        return previous;
    }

    // Installs a new event filter under the runtime mutex and returns the previous one.
    TTestActorRuntimeBase::TEventFilter TTestActorRuntimeBase::SetEventFilter(TEventFilter filterFunc) {
        TGuard<TMutex> guard(Mutex);
        TEventFilter previous = EventFilterFunc;
        EventFilterFunc = filterFunc;
        return previous;
    }

    // Installs a new scheduled-event filter under the runtime mutex and returns the previous one.
    TTestActorRuntimeBase::TScheduledEventFilter TTestActorRuntimeBase::SetScheduledEventFilter(TScheduledEventFilter filterFunc) {
        TGuard<TMutex> guard(Mutex);
        TScheduledEventFilter previous = ScheduledEventFilterFunc;
        ScheduledEventFilterFunc = filterFunc;
        return previous;
    }

    // Installs a new registration observer under the runtime mutex and returns the previous one.
    TTestActorRuntimeBase::TRegistrationObserver TTestActorRuntimeBase::SetRegistrationObserverFunc(TRegistrationObserver observerFunc) {
        TGuard<TMutex> guard(Mutex);
        TRegistrationObserver previous = RegistrationObserver;
        RegistrationObserver = observerFunc;
        return previous;
    }

    // Reads the global VERBOSE flag.
    bool TTestActorRuntimeBase::IsVerbose() {
        const bool verboseNow = VERBOSE;
        return verboseNow;
    }

    // Writes the global VERBOSE flag; the flag is file-wide, not per runtime instance.
    void TTestActorRuntimeBase::SetVerbose(bool verbose) {
        VERBOSE = verbose;
    }

    void TTestActorRuntimeBase::AddLocalService(const TActorId& actorId, const TActorSetupCmd& cmd, ui32 nodeIndex) {
        Y_VERIFY(!IsInitialized);
        Y_VERIFY(nodeIndex < NodeCount);
        auto node = Nodes[nodeIndex + FirstNodeId];
        if (!node) {
            node = GetNodeFactory().CreateNode();
            Nodes[nodeIndex + FirstNodeId] = node;
        }

        node->LocalServicesActors[actorId] = cmd.Actor;
        node->LocalServices.push_back(std::make_pair(actorId, cmd));
    }

    void TTestActorRuntimeBase::InitNodes() {
        NextNodeId += NodeCount;
        Y_VERIFY(NodeCount > 0);

        for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
            auto nodeIt = Nodes.emplace(FirstNodeId + nodeIndex, GetNodeFactory().CreateNode()).first;
            TNodeDataBase* node = nodeIt->second.Get();
            InitNode(node, nodeIndex);
        }

    }

    // Creates and initializes all nodes, then marks the runtime as initialized.
    void TTestActorRuntimeBase::Initialize() {
        InitNodes();
        IsInitialized = true;
    }

    // Intentionally empty: no cross-DC setup is performed here.
    void SetupCrossDC() {
    }

    // Replaces the dispatch timeout under the runtime mutex and returns the previous value.
    TDuration TTestActorRuntimeBase::SetDispatchTimeout(TDuration timeout) {
        TGuard<TMutex> guard(Mutex);
        const TDuration previous = DispatchTimeout;
        DispatchTimeout = timeout;
        return previous;
    }

    // Replaces the rescheduling delay under the runtime mutex and returns the previous value.
    TDuration TTestActorRuntimeBase::SetReschedulingDelay(TDuration delay) {
        TGuard<TMutex> guard(Mutex);
        const TDuration previous = ReschedulingDelay;
        ReschedulingDelay = delay;
        return previous;
    }

    // Stores the log backend; only allowed before the runtime is initialized.
    void TTestActorRuntimeBase::SetLogBackend(const TAutoPtr<TLogBackend> logBackend) {
        Y_VERIFY(!IsInitialized);

        TGuard<TMutex> guard(Mutex);
        LogBackend = logBackend;
    }

    // Applies `priority` to `component` in the log settings of every node.
    // Aborts with the explanation text if any node rejects the change.
    void TTestActorRuntimeBase::SetLogPriority(NActors::NLog::EComponent component, NActors::NLog::EPriority priority) {
        TGuard<TMutex> guard(Mutex);
        for (ui32 nodeIndex = 0; nodeIndex < NodeCount; ++nodeIndex) {
            TNodeDataBase* const node = Nodes[FirstNodeId + nodeIndex].Get();
            TString explanation;
            // A non-zero status signals failure.
            auto status = node->LogSettings->SetLevel(priority, component, explanation);
            if (!status) {
                continue;
            }
            Y_FAIL("SetLogPriority failed: %s", explanation.c_str());
        }
    }

    // Returns the runtime's virtual time; must not be called when real threads are in use.
    TInstant TTestActorRuntimeBase::GetCurrentTime() const {
        TGuard<TMutex> guard(Mutex);
        Y_VERIFY(!UseRealThreads);
        const TInstant virtualNow = TInstant::MicroSeconds(CurrentTimestamp);
        return virtualNow;
    }

    // Moves the runtime's virtual time forward to `newTime` and publishes the new value to
    // the clock slots of every node. A time that is not later than the current one is
    // ignored. Must not be called when real threads are in use.
    void TTestActorRuntimeBase::UpdateCurrentTime(TInstant newTime) {
        // Call counter, used only in the verbose trace line.
        static int counter = 0;
        ++counter;
        if (VERBOSE) {
            Cerr << "UpdateCurrentTime(" << counter << "," << newTime << ")\n";
        }

        TGuard<TMutex> guard(Mutex);
        Y_VERIFY(!UseRealThreads);

        const auto requestedMicroSeconds = newTime.MicroSeconds();
        if (!(requestedMicroSeconds > CurrentTimestamp)) {
            return;
        }

        CurrentTimestamp = requestedMicroSeconds;
        for (auto& nodeEntry : Nodes) {
            AtomicStore(nodeEntry.second->ActorSystemTimestamp, CurrentTimestamp);
            AtomicStore(nodeEntry.second->ActorSystemMonotonic, CurrentTimestamp);
        }
    }

    // Moves the virtual time forward by `duration`.
    void TTestActorRuntimeBase::AdvanceCurrentTime(TDuration duration) {
        const TInstant target = GetCurrentTime() + duration;
        UpdateCurrentTime(target);
    }

    // Returns the runtime's time provider; must not be called when real threads are in use.
    TIntrusivePtr<ITimeProvider> TTestActorRuntimeBase::GetTimeProvider() {
        Y_VERIFY(!UseRealThreads);

        return TimeProvider;
    }

    // Translates a zero-based node index into a node id; the index must be below NodeCount.
    ui32 TTestActorRuntimeBase::GetNodeId(ui32 index) const {
        Y_VERIFY(index < NodeCount);
        const ui32 nodeId = FirstNodeId + index;
        return nodeId;
    }

    // Number of nodes this runtime was constructed with.
    ui32 TTestActorRuntimeBase::GetNodeCount() const {
        const ui32 nodeCount = NodeCount;
        return nodeCount;
    }

    // Hands out the next local actor id (pre-incremented, so the first id is 1)
    // under the runtime mutex.
    ui64 TTestActorRuntimeBase::AllocateLocalId() {
        TGuard<TMutex> guard(Mutex);

        const ui64 allocatedId = ++LocalId;
        if (VERBOSE) {
            Cerr << "Allocated id: " << allocatedId << "\n";
        }
        return allocatedId;
    }

    // Chooses the executor pool id for interconnect: pool 4 with real threads under a
    // thread sanitizer build, pool 0 otherwise.
    ui32 TTestActorRuntimeBase::InterconnectPoolId() const {
        // Interconnect coroutines may move across threads;
        // a special single-threaded pool is used to avoid that.
        const bool needDedicatedPool = UseRealThreads && NSan::TSanIsOn();
        return needDedicatedPool ? 4 : 0;
    }

    // Returns the runtime's temporary directory, creating the TTempDir object on first use.
    TString TTestActorRuntimeBase::GetTempDir() {
        if (!TmpDir) {
            TmpDir.Reset(new TTempDir());
        }

        return (*TmpDir)();
    }

    // Registers `actor` on node `nodeIndex` and returns its actor id.
    // With real threads the call is forwarded to the node's executor pool `poolId`.
    // Otherwise the runtime does the registration itself: it allocates and locks a mailbox
    // of `mailboxType`, allocates a local id, attaches the actor, notifies
    // RegistrationObserver, runs DoActorInit() and finally unlocks the mailbox.
    // When `parentId` is empty, CurrentRecipient is used as the parent.
    TActorId TTestActorRuntimeBase::Register(IActor* actor, ui32 nodeIndex, ui32 poolId, TMailboxType::EType mailboxType,
        ui64 revolvingCounter, const TActorId& parentId) {
        Y_VERIFY(nodeIndex < NodeCount);
        TGuard<TMutex> guard(Mutex);
        TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
        if (UseRealThreads) {
            Y_VERIFY(poolId < node->ExecutorPools.size());
            return node->ExecutorPools[poolId]->Register(actor, mailboxType, revolvingCounter, parentId);
        }

        // first step - find good enough mailbox
        ui32 hint = 0;
        TMailboxHeader *mailbox = nullptr;

        {
            // Hint of the most recently rejected mailbox; it is reclaimed one iteration later.
            // NOTE(review): it is 0 on the first reclaim call — presumably ReclaimMailbox()
            // tolerates a zero hint; confirm.
            ui32 hintBackoff = 0;

            // Keep allocating until a mailbox can be locked from the free state.
            while (hint == 0) {
                hint = node->MailboxTable->AllocateMailbox(mailboxType, ++revolvingCounter);
                mailbox = node->MailboxTable->Get(hint);

                if (!mailbox->LockFromFree()) {
                    node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
                    hintBackoff = hint;
                    hint = 0;
                }
            }

            node->MailboxTable->ReclaimMailbox(mailboxType, hintBackoff, ++revolvingCounter);
        }

        const ui64 localActorId = AllocateLocalId();
        if (VERBOSE) {
            Cerr << "Register actor " << TypeName(*actor) << " as " << localActorId << ", mailbox: " << hint << "\n";
        }

        // ok, got mailbox
        mailbox->AttachActor(localActorId, actor);

        // do init
        const TActorId actorId(FirstNodeId + nodeIndex, poolId, localActorId, hint);
        // Remember the actor's type name under its id.
        ActorNames[actorId] = TypeName(*actor);
        RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, actorId);
        DoActorInit(node->ActorSystem.Get(), actor, actorId, parentId ? parentId : CurrentRecipient);

        // Unlock the mailbox using its concrete type.
        // NOTE(review): ExecutorPools[0] is used here regardless of `poolId`.
        switch (mailboxType) {
        case TMailboxType::Simple:
            UnlockFromExecution((TMailboxTable::TSimpleMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
            break;
        case TMailboxType::Revolving:
            UnlockFromExecution((TMailboxTable::TRevolvingMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
            break;
        case TMailboxType::HTSwap:
            UnlockFromExecution((TMailboxTable::THTSwapMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
            break;
        case TMailboxType::ReadAsFilled:
            UnlockFromExecution((TMailboxTable::TReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
            break;
        case TMailboxType::TinyReadAsFilled:
            UnlockFromExecution((TMailboxTable::TTinyReadAsFilledMailbox *)mailbox, node->ExecutorPools[0], false, hint, MaxWorkers, ++revolvingCounter);
            break;
        default:
            Y_FAIL("Unsupported mailbox type");
        }

        return actorId;
    }

    // Registers `actor` in an already chosen `mailbox` (identified by `hint`) on node `nodeIndex`.
    //
    // With real threads the call is delegated to executor pool `poolId`. Otherwise the actor
    // is attached to the mailbox under a newly allocated local id, its type name is stored in
    // ActorNames, and RegistrationObserver and DoActorInit are invoked. When `parentId` is
    // empty, CurrentRecipient is used as parent.
    TActorId TTestActorRuntimeBase::Register(IActor *actor, ui32 nodeIndex, ui32 poolId, TMailboxHeader *mailbox, ui32 hint,
        const TActorId& parentId) {
        Y_VERIFY(nodeIndex < NodeCount);
        TGuard<TMutex> lock(Mutex);
        const ui32 nodeId = FirstNodeId + nodeIndex;
        TNodeDataBase* node = Nodes[nodeId].Get();
        if (UseRealThreads) {
            Y_VERIFY(poolId < node->ExecutorPools.size());
            return node->ExecutorPools[poolId]->Register(actor, mailbox, hint, parentId);
        }

        const ui64 newLocalId = AllocateLocalId();
        if (VERBOSE) {
            Cerr << "Register actor " << TypeName(*actor) << " as " << newLocalId << "\n";
        }

        mailbox->AttachActor(newLocalId, actor);

        const TActorId registeredId(nodeId, poolId, newLocalId, hint);
        ActorNames[registeredId] = TypeName(*actor);
        RegistrationObserver(*this, parentId ? parentId : CurrentRecipient, registeredId);
        DoActorInit(node->ActorSystem.Get(), actor, registeredId, parentId ? parentId : CurrentRecipient);
        return registeredId;
    }

    // Registers `actorId` as the local service `serviceId` on node `nodeIndex` and returns
    // whatever the node's actor system returns from RegisterLocalService.
    // Without real threads the actor is also recorded in the node's LocalServicesActors
    // and ActorToActorId maps before the actor system is called.
    TActorId TTestActorRuntimeBase::RegisterService(const TActorId& serviceId, const TActorId& actorId, ui32 nodeIndex) {
        TGuard<TMutex> lock(Mutex);
        Y_VERIFY(nodeIndex < NodeCount);
        TNodeDataBase* node = Nodes[FirstNodeId + nodeIndex].Get();
        if (UseRealThreads) {
            return node->ActorSystem->RegisterLocalService(serviceId, actorId);
        }

        IActor* serviceActor = FindActor(actorId, node);
        node->LocalServicesActors[serviceId] = serviceActor;
        node->ActorToActorId[serviceActor] = actorId;
        return node->ActorSystem->RegisterLocalService(serviceId, actorId);
    }

    // Registers a new TEdgeActor on node `nodeIndex`, records it in EdgeActors and in
    // EdgeActorByMailbox (keyed by its node id and mailbox hint), and returns its id.
    TActorId TTestActorRuntimeBase::AllocateEdgeActor(ui32 nodeIndex) {
        TGuard<TMutex> lock(Mutex);
        Y_VERIFY(nodeIndex < NodeCount);
        const TActorId edgeId = Register(new TEdgeActor(this), nodeIndex);
        EdgeActors.insert(edgeId);
        const TEventMailboxId edgeMailbox(edgeId.NodeId(), edgeId.Hint());
        EdgeActorByMailbox[edgeMailbox] = edgeId;
        return edgeId;
    }

    // Calls Capture on every mailbox in Mailboxes, accumulating into one list,
    // and returns that list. Runs under Mutex.
    TEventsList TTestActorRuntimeBase::CaptureEvents() {
        TGuard<TMutex> lock(Mutex);
        TEventsList captured;
        for (auto it = Mailboxes.begin(); it != Mailboxes.end(); ++it) {
            it->second->Capture(captured);
        }
        return captured;
    }

    // Calls Capture on the single mailbox addressed by (nodeId, hint) and returns the
    // resulting list. nodeId must lie in [FirstNodeId, FirstNodeId + NodeCount).
    TEventsList TTestActorRuntimeBase::CaptureMailboxEvents(ui32 hint, ui32 nodeId) {
        TGuard<TMutex> lock(Mutex);
        Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
        TEventsList captured;
        TEventMailBox& mbox = GetMailbox(nodeId, hint);
        mbox.Capture(captured);
        return captured;
    }

    // Pushes `ev` to the front of the mailbox selected by the node id and hint of the
    // event's rewritten recipient. The recipient's node id must be non-zero.
    void TTestActorRuntimeBase::PushFront(TAutoPtr<IEventHandle>& ev) {
        TGuard<TMutex> lock(Mutex);
        const ui32 nodeId = ev->GetRecipientRewrite().NodeId();
        Y_VERIFY(nodeId != 0);
        const ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
        TEventMailBox& target = GetMailbox(nodeId, mailboxHint);
        target.PushFront(ev);
    }

    // Walks `events` from last to first and pushes every non-null event to the front of
    // the mailbox of its rewritten recipient; `events` is cleared afterwards.
    void TTestActorRuntimeBase::PushEventsFront(TEventsList& events) {
        TGuard<TMutex> lock(Mutex);
        // Reverse iteration combined with push-front keeps the relative order of the list.
        for (auto rit = events.rbegin(); rit != events.rend(); ++rit) {
            if (!*rit) {
                continue;
            }

            auto& ev = *rit;
            const ui32 nodeId = ev->GetRecipientRewrite().NodeId();
            Y_VERIFY(nodeId != 0);
            const ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
            GetMailbox(nodeId, mailboxHint).PushFront(ev);
        }

        events.clear();
    }

    // Pushes the whole `events` list to the front of the mailbox addressed by
    // (nodeId, hint) and then clears `events`.
    // nodeId must lie in [FirstNodeId, FirstNodeId + NodeCount).
    void TTestActorRuntimeBase::PushMailboxEventsFront(ui32 hint, ui32 nodeId, TEventsList& events) {
        TGuard<TMutex> guard(Mutex);
        Y_VERIFY(nodeId >= FirstNodeId && nodeId < FirstNodeId + NodeCount);
        GetMailbox(nodeId, hint).PushFront(events);
        events.clear();
    }

    // Calls CaptureScheduled on every mailbox in Mailboxes, accumulating into one list,
    // and returns that list. Runs under Mutex.
    TScheduledEventsList TTestActorRuntimeBase::CaptureScheduledEvents() {
        TGuard<TMutex> lock(Mutex);
        TScheduledEventsList scheduled;
        for (auto it = Mailboxes.begin(); it != Mailboxes.end(); ++it) {
            it->second->CaptureScheduled(scheduled);
        }
        return scheduled;
    }

    bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options) {
        return DispatchEvents(options, TInstant::Max());
    }

    bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TDuration simTimeout) {
        return DispatchEvents(options, TInstant::MicroSeconds(CurrentTimestamp) + simTimeout);
    }

    bool TTestActorRuntimeBase::DispatchEvents(const TDispatchOptions& options, TInstant simDeadline) {
        TGuard<TMutex> guard(Mutex);
        return DispatchEventsInternal(options, simDeadline);
    }

    // Core dispatch loop used by DispatchEvents() and WaitForEdgeEvents().
    // Mutex must be locked by caller!
    //
    // Repeatedly pops events from the test mailboxes (all of Mailboxes, or only
    // options.OnlyMailboxes when that list is non-empty), shows each one to ObserverFunc
    // and acts on the returned action, until one of the exit conditions below is met.
    //
    // Returns true when:
    //   - a final event from options.FinalEvents was seen the required number of times,
    //   - a mailbox listed in options.NonEmptyMailboxes was found non-empty,
    //   - options.CustomFinalCondition is set and returned true,
    //   - options.FinalEvents is empty and an active mailbox still holds events.
    // Returns false when CurrentTimestamp has passed simDeadline, or when the set of
    // mailboxes being dispatched becomes empty.
    // Throws TEmptyEventQueueException (with backtrace) once the accumulated wait time
    // reaches DispatchTimeout, and yexception (with backtrace) when DispatchedEventsCount
    // exceeds DispatchedEventsLimit.
    bool TTestActorRuntimeBase::DispatchEventsInternal(const TDispatchOptions& options, TInstant simDeadline) {
        TDispatchContext localContext;
        localContext.Options = &options;
        localContext.PrevContext = nullptr;
        bool verbose = !options.Quiet && VERBOSE;

        // Scope helper: links localContext in front of the current dispatch context chain
        // on construction and restores the previous head on destruction.
        struct TDispatchContextSetter {
            TDispatchContextSetter(TTestActorRuntimeBase& runtime, TDispatchContext& lastContext)
                : Runtime(runtime)
            {
                lastContext.PrevContext = Runtime.CurrentDispatchContext;
                Runtime.CurrentDispatchContext = &lastContext;
            }

            ~TDispatchContextSetter() {
                Runtime.CurrentDispatchContext = Runtime.CurrentDispatchContext->PrevContext;
            }

            TTestActorRuntimeBase& Runtime;
        } DispatchContextSetter(*this, localContext);

        // dispatchTime is a local clock that starts at zero and only advances by the wait
        // delay at the bottom of the outer loop; deadline and the scheduled-events
        // inspection time are expressed on this clock, not on CurrentTimestamp.
        TInstant dispatchTime = TInstant::MicroSeconds(0);
        TInstant deadline = dispatchTime + DispatchTimeout;
        const TDuration scheduledEventsInspectInterval = TDuration::MilliSeconds(10);
        TInstant inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
        if (verbose) {
            Cerr << "Start dispatch at " << TInstant::MicroSeconds(CurrentTimestamp) << ", deadline is " << deadline << "\n";
        }

        // Scope helper: on construction captures the events of every edge actor mailbox
        // into a private Store; on destruction pushes the stored events back to the front
        // of the corresponding runtime mailboxes.
        struct TTempEdgeEventsCaptor {
            TTempEdgeEventsCaptor(TTestActorRuntimeBase& runtime)
                : Runtime(runtime)
                , HasEvents(false)
            {
                for (auto edgeActor : Runtime.EdgeActors) {
                    TEventsList events;
                    Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).Capture(events);
                    auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
                    auto storeIt = Store.find(mboxId);
                    // Each edge mailbox is expected to appear only once.
                    Y_VERIFY(storeIt == Store.end());
                    storeIt = Store.insert(std::make_pair(mboxId, new TEventMailBox)).first;
                    storeIt->second->PushFront(events);
                    if (!events.empty())
                        HasEvents = true;
                }
            }

            ~TTempEdgeEventsCaptor() {
                for (auto edgeActor : Runtime.EdgeActors) {
                    auto mboxId = TEventMailboxId(edgeActor.NodeId(), edgeActor.Hint());
                    auto storeIt = Store.find(mboxId);
                    // Edge actors without an entry in Store are skipped.
                    if (storeIt == Store.end()) {
                        continue;
                    }

                    TEventsList events;
                    storeIt->second->Capture(events);
                    Runtime.GetMailbox(edgeActor.NodeId(), edgeActor.Hint()).PushFront(events);
                }
            }

            TTestActorRuntimeBase& Runtime;
            TEventMailBoxList Store;
            bool HasEvents;
        };

        // Build the restricted mailbox set from options.OnlyMailboxes, creating missing
        // mailboxes in Mailboxes on the way.
        TEventMailBoxList restrictedMailboxes;
        const bool useRestrictedMailboxes = !options.OnlyMailboxes.empty();
        for (auto mailboxId : options.OnlyMailboxes) {
            auto it = Mailboxes.find(mailboxId);
            if (it == Mailboxes.end()) {
                it = Mailboxes.insert(std::make_pair(mailboxId, new TEventMailBox())).first;
            }

            restrictedMailboxes.insert(std::make_pair(mailboxId, it->second));
        }

        // Edge mailbox events are set aside for the duration of the dispatch only when no
        // mailbox restriction is in effect.
        TAutoPtr<TTempEdgeEventsCaptor> tempEdgeEventsCaptor;
        if (!restrictedMailboxes) {
            tempEdgeEventsCaptor.Reset(new TTempEdgeEventsCaptor(*this));
        }

        TEventMailBoxList& currentMailboxes = useRestrictedMailboxes ? restrictedMailboxes : Mailboxes;
        while (!currentMailboxes.empty()) {
            // Inner loop: keep making passes while at least one event was dispatched.
            bool hasProgress = true;
            while (hasProgress) {
                ++DispatchCyclesCount;
                hasProgress = false;

                // Budget for this cycle: the sum of GetSentEventCount() over active mailboxes.
                ui64 eventsToDispatch = 0;
                for (auto mboxIt = currentMailboxes.begin(); mboxIt != currentMailboxes.end(); ++mboxIt) {
                    if (mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
                        eventsToDispatch += mboxIt->second->GetSentEventCount();
                    }
                }
                ui32 eventsDispatched = 0;

                //TODO: count events before each cycle, break after dispatching that much events
                bool isEmpty = false;
                while (!isEmpty && eventsDispatched < eventsToDispatch) {
                    // Pick a starting mailbox using DispatcherRandomProvider and make one
                    // full circular pass over currentMailboxes from there.
                    ui64 mailboxCount = currentMailboxes.size();
                    ui64 startWith = mailboxCount ? DispatcherRandomProvider->GenRand64() % mailboxCount : 0ull;
                    auto startWithMboxIt = currentMailboxes.begin();
                        for (ui64 i = 0; i < startWith; ++i) {
                            ++startWithMboxIt;
                        }
                    auto endWithMboxIt = startWithMboxIt;

                    isEmpty = true;
                    auto mboxIt = startWithMboxIt;
                    // Mailboxes that look fully drained during this pass; re-checked and erased after it.
                    TDeque<TEventMailboxId> suspectedBoxes;
                    while (true) {
                        auto& mbox = *mboxIt;
                        bool isIgnored = true;
                        if (!mbox.second->IsEmpty()) {
                            HandleNonEmptyMailboxesForEachContext(mbox.first);
                            if (mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {

                                // Edge mailboxes: every event is shown to ObserverFunc (with Mutex
                                // released by TInverseGuard) and then put back; nothing is dispatched.
                                bool isEdgeMailbox = false;
                                if (EdgeActorByMailbox.FindPtr(TEventMailboxId(mbox.first.NodeId, mbox.first.Hint))) {
                                    isEdgeMailbox = true;
                                    TEventsList events;
                                    mbox.second->Capture(events);
                                    for (auto& ev : events) {
                                        TInverseGuard<TMutex> inverseGuard(Mutex);
                                        ObserverFunc(*this, ev);
                                    }
                                    mbox.second->PushFront(events);
                                }

                                if (!isEdgeMailbox) {
                                    isEmpty = false;
                                    isIgnored = false;
                                    ++eventsDispatched;
                                    ++DispatchedEventsCount;
                                    if (DispatchedEventsCount > DispatchedEventsLimit) {
                                        ythrow TWithBackTrace<yexception>() << "Dispatched "
                                            << DispatchedEventsLimit << " events, limit reached.";
                                    }

                                    auto ev = mbox.second->Pop();
                                    // Tracing is suppressed for senders listed in BlockedOutput.
                                    if (BlockedOutput.find(ev->Sender) == BlockedOutput.end()) {
                                        //UpdateCurrentTime(TInstant::MicroSeconds(CurrentTimestamp + 10));
                                        if (verbose) {
                                            Cerr << "Process event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
                                            PrintEvent(ev, this);
                                        }
                                    }

                                    hasProgress = true;
                                    // The observer is called with Mutex released and decides the fate of the event.
                                    EEventAction action;
                                    {
                                        TInverseGuard<TMutex> inverseGuard(Mutex);
                                        action = ObserverFunc(*this, ev);
                                    }

                                    switch (action) {
                                        // PROCESS: update final-event statistics and deliver the event.
                                        case EEventAction::PROCESS:
                                            UpdateFinalEventsStatsForEachContext(*ev);
                                            SendInternal(ev.Release(), mbox.first.NodeId - FirstNodeId, false);
                                        break;
                                        case EEventAction::DROP:
                                            // do nothing
                                        break;
                                        // RESCHEDULE: freeze the mailbox until CurrentTimestamp + ReschedulingDelay
                                        // and return the event to its front. Note that this `deadline` shadows the
                                        // dispatch deadline declared above.
                                        case EEventAction::RESCHEDULE: {
                                            TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + ReschedulingDelay;
                                            mbox.second->Freeze(deadline);
                                            mbox.second->PushFront(ev);
                                            break;
                                        }
                                        default:
                                            Y_FAIL("Unknown action");
                                    }
                                }
                            }

                        }
                        Y_VERIFY(mboxIt != currentMailboxes.end());
                        // Only on the outermost dispatch context and without a mailbox restriction:
                        // remember mailboxes that were just dispatched from and are now empty,
                        // have no scheduled events and are active.
                        if (!isIgnored && !CurrentDispatchContext->PrevContext && !restrictedMailboxes &&
                                mboxIt->second->IsEmpty() &&
                                mboxIt->second->IsScheduledEmpty() &&
                                mboxIt->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
                            suspectedBoxes.push_back(mboxIt->first);
                        }
                        // Advance circularly; the pass ends when the starting mailbox is reached again.
                        ++mboxIt;
                        if (mboxIt == currentMailboxes.end()) {
                            mboxIt = currentMailboxes.begin();
                        }
                        Y_VERIFY(endWithMboxIt != currentMailboxes.end());
                        if (mboxIt == endWithMboxIt) {
                            break;
                        }
                    }

                    // Erase the suspected mailboxes that are still empty, unscheduled and active.
                    for (auto id : suspectedBoxes) {
                        auto it = currentMailboxes.find(id);
                        if (it != currentMailboxes.end() && it->second->IsEmpty() && it->second->IsScheduledEmpty() &&
                                it->second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
                            currentMailboxes.erase(it);
                        }
                    }
                }
            }

            // No more progress: evaluate the success conditions.
            if (localContext.FinalEventFound) {
                return true;
            }

            if (!localContext.FoundNonEmptyMailboxes.empty())
                return true;

            if (options.CustomFinalCondition && options.CustomFinalCondition())
                return true;

            // Without explicit final events, an active mailbox that still has events ends the dispatch.
            if (options.FinalEvents.empty()) {
                for (auto& mbox : currentMailboxes) {
                    if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp)))
                        continue;

                    if (!mbox.second->IsEmpty()) {
                        if (verbose) {
                            Cerr << "Dispatch complete with non-empty queue at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
                        }

                        return true;
                    }
                }
            }

            // Simulated-time deadline supplied by the caller.
            if (TInstant::MicroSeconds(CurrentTimestamp) > simDeadline) {
                return false;
            }

            // Local wait-time deadline (DispatchTimeout on the dispatchTime clock).
            if (dispatchTime >= deadline) {
                if (verbose) {
                    Cerr << "Reach deadline at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
                }

                ythrow TWithBackTrace<TEmptyEventQueueException>();
            }

            // Periodically (and only when not Quiet) look at scheduled events.
            if (!options.Quiet && dispatchTime >= inspectScheduledEventsAt) {
                inspectScheduledEventsAt = dispatchTime + scheduledEventsInspectInterval;
                bool isEmpty = true;
                TMaybe<TInstant> nearestMailboxDeadline;
                TVector<TIntrusivePtr<TEventMailBox>> nextScheduleMboxes;
                TMaybe<TInstant> nextScheduleDeadline;
                for (auto& mbox : currentMailboxes) {
                    // Inactive mailboxes only contribute the earliest GetInactiveUntil() value.
                    if (!mbox.second->IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
                        if (!nearestMailboxDeadline.Defined() || *nearestMailboxDeadline.Get() > mbox.second->GetInactiveUntil()) {
                            nearestMailboxDeadline = mbox.second->GetInactiveUntil();
                        }

                        continue;
                    }

                    if (mbox.second->IsScheduledEmpty())
                        continue;

                    // Collect all active mailboxes sharing the earliest first-schedule deadline.
                    auto firstScheduleDeadline = mbox.second->GetFirstScheduleDeadline();
                    if (!nextScheduleDeadline || firstScheduleDeadline < *nextScheduleDeadline) {
                        nextScheduleMboxes.clear();
                        nextScheduleMboxes.emplace_back(mbox.second);
                        nextScheduleDeadline = firstScheduleDeadline;
                    } else if (firstScheduleDeadline == *nextScheduleDeadline) {
                        nextScheduleMboxes.emplace_back(mbox.second);
                    }
                }

                // Let ScheduledEventsSelectorFunc pick events out of the captured scheduled
                // list; what remains is pushed back, what was selected is sent to the mailbox.
                for (const auto& nextScheduleMbox : nextScheduleMboxes) {
                    TEventsList selectedEvents;
                    TScheduledEventsList capturedScheduledEvents;
                    nextScheduleMbox->CaptureScheduled(capturedScheduledEvents);
                    ScheduledEventsSelectorFunc(*this, capturedScheduledEvents, selectedEvents);
                    nextScheduleMbox->PushScheduled(capturedScheduledEvents);
                    for (auto& event : selectedEvents) {
                        if (verbose && (BlockedOutput.find(event->Sender) == BlockedOutput.end())) {
                            Cerr << "Selected scheduled event at " << TInstant::MicroSeconds(CurrentTimestamp) << ", ";
                            PrintEvent(event, this);
                        }

                        nextScheduleMbox->Send(event);
                        isEmpty = false;
                    }
                }

                // Something was selected: restart the wait deadline and dispatch again right away.
                if (!isEmpty) {
                    if (verbose) {
                        Cerr << "Process selected events at " << TInstant::MicroSeconds(CurrentTimestamp) << "\n";
                    }

                    deadline = dispatchTime +  DispatchTimeout;
                    continue;
                }

                // Nothing selected but some mailbox is inactive: move the current time to the
                // earliest point at which an inactive mailbox stops being inactive.
                if (nearestMailboxDeadline.Defined()) {
                    if (verbose) {
                        Cerr << "Forward time to " << *nearestMailboxDeadline.Get() << "\n";
                    }

                    UpdateCurrentTime(*nearestMailboxDeadline.Get());
                    continue;
                }
            }

            // Nothing to do right now: advance the local clock and wait on MailboxesHasEvents
            // (passing Mutex) for up to waitDelay.
            TDuration waitDelay = TDuration::MilliSeconds(10);
            dispatchTime += waitDelay;
            MailboxesHasEvents.WaitT(Mutex, waitDelay);
        }
        return false;
    }

    // Walks the chain of dispatch contexts starting at CurrentDispatchContext and, for each
    // context whose options list `mboxId` in NonEmptyMailboxes, records the id in that
    // context's FoundNonEmptyMailboxes.
    void TTestActorRuntimeBase::HandleNonEmptyMailboxesForEachContext(TEventMailboxId mboxId) {
        for (TDispatchContext* ctx = CurrentDispatchContext; ctx; ctx = ctx->PrevContext) {
            const auto& watched = ctx->Options->NonEmptyMailboxes;
            const bool isWatched = Find(watched.begin(), watched.end(), mboxId) != watched.end();
            if (isWatched) {
                ctx->FoundNonEmptyMailboxes.insert(mboxId);
            }
        }
    }

    // For every dispatch context in the chain, bumps the per-context counter of each final
    // event whose EventCheck accepts `ev`, and sets FinalEventFound on the context once a
    // counter reaches that final event's RequiredCount.
    void TTestActorRuntimeBase::UpdateFinalEventsStatsForEachContext(IEventHandle& ev) {
        for (TDispatchContext* ctx = CurrentDispatchContext; ctx; ctx = ctx->PrevContext) {
            for (const auto& finalEvent : ctx->Options->FinalEvents) {
                if (!finalEvent.EventCheck(ev)) {
                    continue;
                }

                // Counters are keyed by the address of the final event descriptor.
                auto& seen = ctx->FinalEventFrequency[&finalEvent];
                ++seen;
                if (seen >= finalEvent.RequiredCount) {
                    ctx->FinalEventFound = true;
                }
            }
        }
    }

    // Public entry point for sending an event: takes Mutex, verifies the sender node
    // index and hands the event over to SendInternal.
    void TTestActorRuntimeBase::Send(IEventHandle* ev, ui32 senderNodeIndex, bool viaActorSystem) {
        TGuard<TMutex> lock(Mutex);
        Y_VERIFY(senderNodeIndex < NodeCount, "senderNodeIndex# %" PRIu32 " < NodeCount# %" PRIu32,
            senderNodeIndex, NodeCount);

        SendInternal(ev, senderNodeIndex, viaActorSystem);
    }

    // Puts `ev` into the scheduled queue of a mailbox on node `nodeIndex`; the mailbox is
    // chosen by the hint of the event's rewritten recipient and the deadline is
    // CurrentTimestamp (microseconds) plus `duration`.
    void TTestActorRuntimeBase::Schedule(IEventHandle* ev, const TDuration& duration, ui32 nodeIndex) {
        TGuard<TMutex> lock(Mutex);
        Y_VERIFY(nodeIndex < NodeCount);
        const ui32 targetNodeId = FirstNodeId + nodeIndex;
        const ui32 hint = ev->GetRecipientRewrite().Hint();
        const TInstant fireAt = TInstant::MicroSeconds(CurrentTimestamp) + duration;
        TEventMailBox& mbox = GetMailbox(targetNodeId, hint);
        mbox.Schedule(TScheduledEventQueueItem(fireAt, ev, nullptr));
        if (VERBOSE) {
            Cerr << "Event was added to scheduled queue\n";
        }
    }

    void TTestActorRuntimeBase::ClearCounters() {
        TGuard<TMutex> guard(Mutex);
        EvCounters.clear();
    }

    // Returns the counter stored in EvCounters for `evType`, or 0 when there is no entry.
    ui64 TTestActorRuntimeBase::GetCounter(ui32 evType) const {
        TGuard<TMutex> lock(Mutex);
        const auto found = EvCounters.find(evType);
        if (found != EvCounters.end()) {
            return found->second;
        }

        return 0;
    }

    // Looks up `serviceId` in the actor system of node `nodeIndex` via LookupLocalService
    // and returns the result.
    TActorId TTestActorRuntimeBase::GetLocalServiceId(const TActorId& serviceId, ui32 nodeIndex) {
        TGuard<TMutex> lock(Mutex);
        Y_VERIFY(nodeIndex < NodeCount);
        const ui32 nodeId = FirstNodeId + nodeIndex;
        TNodeDataBase* const node = Nodes[nodeId].Get();
        return node->ActorSystem->LookupLocalService(serviceId);
    }

    // Dispatches events until `filter` accepts an event sitting in one of the edge actor
    // mailboxes, or until DispatchEventsInternal reports failure for the deadline
    // CurrentTimestamp + simTimeout.
    //
    // `edgeFilter` restricts the mailboxes that are inspected; every entry must be a known
    // edge actor. When it is empty all EdgeActors are inspected. Inspected events are always
    // pushed back to the front of their mailbox. At most 1000 dispatch rounds are allowed.
    void TTestActorRuntimeBase::WaitForEdgeEvents(TEventFilter filter, const TSet<TActorId>& edgeFilter, TDuration simTimeout) {
        TGuard<TMutex> lock(Mutex);
        for (auto edgeActor : edgeFilter) {
            Y_VERIFY(EdgeActors.contains(edgeActor), "%s is not an edge actor", ToString(edgeActor).data());
        }

        const TSet<TActorId>& inspected = edgeFilter.empty() ? EdgeActors : edgeFilter;
        const TInstant deadline = TInstant::MicroSeconds(CurrentTimestamp) + simTimeout;
        ui32 dispatchCount = 0;
        while (true) {
            for (auto edgeActor : inspected) {
                auto& mbox = GetMailbox(edgeActor.NodeId(), edgeActor.Hint());
                TEventsList events;
                mbox.Capture(events);

                // Stop asking the filter as soon as it accepts one event.
                bool matched = false;
                for (auto it = events.begin(); it != events.end() && !matched; ++it) {
                    if (filter(*this, *it)) {
                        matched = true;
                    }
                }

                mbox.PushFront(events);
                if (matched) {
                    return;
                }
            }

            ++dispatchCount;
            const bool dispatched = DispatchEventsInternal(TDispatchOptions(), deadline);
            if (!dispatched) {
                return; // Timed out; event was not found
            }

            Y_VERIFY(dispatchCount < 1000, "Hard limit to prevent endless loop");
        }
    }

    // Asks the actor system of node `nodeIndexFrom` for its interconnect proxy towards
    // node `nodeIndexTo`. Both indices must be valid and different from each other.
    TActorId TTestActorRuntimeBase::GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo) {
        TGuard<TMutex> lock(Mutex);
        Y_VERIFY(nodeIndexFrom < NodeCount);
        Y_VERIFY(nodeIndexTo < NodeCount);
        Y_VERIFY(nodeIndexFrom != nodeIndexTo);

        const ui32 sourceNodeId = FirstNodeId + nodeIndexFrom;
        const ui32 peerNodeId = FirstNodeId + nodeIndexTo;
        TNodeDataBase* const source = Nodes[sourceNodeId].Get();
        return source->ActorSystem->InterconnectProxy(peerNodeId);
    }

    void TTestActorRuntimeBase::BlockOutputForActor(const TActorId& actorId) {
        TGuard<TMutex> guard(Mutex);
        BlockedOutput.insert(actorId);
    }

    // Derives DispatcherRandomSeed from the number of whole days in `time` (upper 32 bits)
    // XOR-ed with `iteration`, and recreates DispatcherRandomProvider as a deterministic
    // provider seeded with it.
    void TTestActorRuntimeBase::SetDispatcherRandomSeed(TInstant time, ui64 iteration) {
        const ui64 wholeDays = time.Hours() / 24;
        const ui64 dayBits = wholeDays << 32;
        DispatcherRandomSeed = dayBits ^ iteration;
        DispatcherRandomProvider = CreateDeterministicRandomProvider(DispatcherRandomSeed);
    }

    // Finds the actor object behind `actorId` on the node with index `nodeIndex`.
    // Passing Max<ui32>() as `nodeIndex` derives the index from the node id stored in
    // `actorId`, which must then be non-zero. The lookup itself is done by the
    // (actorId, node) overload.
    IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, ui32 nodeIndex) const {
        TGuard<TMutex> lock(Mutex);
        const bool deriveFromActorId = (nodeIndex == Max<ui32>());
        if (deriveFromActorId) {
            Y_VERIFY(actorId.NodeId());
            nodeIndex = actorId.NodeId() - FirstNodeId;
        }

        Y_VERIFY(nodeIndex < NodeCount);
        const auto nodeIt = Nodes.find(FirstNodeId + nodeIndex);
        Y_VERIFY(nodeIt != Nodes.end());
        return FindActor(actorId, nodeIt->second.Get());
    }

    // Adds `actorId` to ScheduleWhiteList when `allow` is true and removes it otherwise;
    // the change is made under Mutex.
    // With VERBOSE set the change is traced to Cerr. Each trace ends with a newline so
    // that it does not run into the next message written to Cerr.
    void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) {
        TGuard<TMutex> guard(Mutex);
        if (allow) {
            if (VERBOSE) {
                Cerr << "Actor " << actorId << " added to schedule whitelist\n";
            }
            ScheduleWhiteList.insert(actorId);
        } else {
            if (VERBOSE) {
                Cerr << "Actor " << actorId << " removed from schedule whitelist\n";
            }
            ScheduleWhiteList.erase(actorId);
        }
    }

    bool TTestActorRuntimeBase::IsScheduleForActorEnabled(const TActorId& actorId) const {
        TGuard<TMutex> guard(Mutex);
        return ScheduleWhiteList.find(actorId) != ScheduleWhiteList.end();
    }

    // Returns the DynamicCounters pointer stored in the node data of node `nodeIndex`.
    TIntrusivePtr<NMonitoring::TDynamicCounters> TTestActorRuntimeBase::GetDynamicCounters(ui32 nodeIndex) {
        TGuard<TMutex> lock(Mutex);
        Y_VERIFY(nodeIndex < NodeCount);
        TNodeDataBase* const node = Nodes[FirstNodeId + nodeIndex].Get();
        return node->DynamicCounters;
    }

    // Sets the NeedMonitoring flag; nothing else is done here.
    // NOTE(review): the flag is presumably consulted later when nodes are initialised —
    // confirm against the initialisation code.
    void TTestActorRuntimeBase::SetupMonitoring() {
        NeedMonitoring = true;
    }

    // Delivers `ev` on behalf of node `nodeIndex`. Does not take Mutex itself; Send()
    // calls it with Mutex held, and the TInverseGuard below is there so that the lock is
    // not held while the recipient actor runs.
    //
    // The event is handed to the node's actor system when `viaActorSystem` is set, real
    // threads are in use, the recipient is a service id, or the recipient lives on another
    // node. Otherwise it is delivered synchronously here:
    //   - AllowSendFrom may veto the delivery;
    //   - if the recipient's test mailbox is not active, the event is pushed to its front;
    //   - otherwise the per-type counter is bumped and the recipient's Receive is called
    //     directly, or, when no such actor exists, the event's non-delivery forward (if
    //     any) is sent through the actor system.
    void TTestActorRuntimeBase::SendInternal(IEventHandle* ev, ui32 nodeIndex, bool viaActorSystem) {
        Y_VERIFY(nodeIndex < NodeCount);
        ui32 nodeId = FirstNodeId + nodeIndex;
        TNodeDataBase* node = Nodes[nodeId].Get();
        // A recipient node id of 0 is treated as "the sending node".
        ui32 targetNode = ev->GetRecipientRewrite().NodeId();
        ui32 targetNodeIndex;
        if (targetNode == 0) {
            targetNodeIndex = nodeIndex;
        } else {
            targetNodeIndex = targetNode - FirstNodeId;
            Y_VERIFY(targetNodeIndex < NodeCount);
        }

        if (viaActorSystem || UseRealThreads || ev->GetRecipientRewrite().IsService() || (targetNodeIndex != nodeIndex)) {
            node->ActorSystem->Send(ev);
            return;
        }

        Y_VERIFY(!ev->GetRecipientRewrite().IsService() && (targetNodeIndex == nodeIndex));
        // From here on the event is owned by evHolder.
        TAutoPtr<IEventHandle> evHolder(ev);

        if (!AllowSendFrom(node, evHolder)) {
            return;
        }

        ui32 mailboxHint = ev->GetRecipientRewrite().Hint();
        TEventMailBox& mbox = GetMailbox(nodeId, mailboxHint);
        // Inactive test mailbox: park the event at its front instead of delivering it.
        if (!mbox.IsActive(TInstant::MicroSeconds(CurrentTimestamp))) {
            mbox.PushFront(evHolder);
            return;
        }

        ui64 recipientLocalId = ev->GetRecipientRewrite().LocalId();
        // Tracing is suppressed for senders listed in BlockedOutput.
        if ((BlockedOutput.find(ev->Sender) == BlockedOutput.end()) && VERBOSE) {
            Cerr << "Send event, ";
            PrintEvent(evHolder, this);
        }

        EvCounters[ev->GetTypeRewrite()]++;

        TMailboxHeader* mailbox = node->MailboxTable->Get(mailboxHint);
        IActor* recipientActor = mailbox->FindActor(recipientLocalId);
        if (recipientActor) {
            // Save actorId by value in order to prevent ctx from being invalidated during another Send call.
            TActorId actorId = ev->GetRecipientRewrite();
            node->ActorToActorId[recipientActor] = ev->GetRecipientRewrite();
            TActorContext ctx(*mailbox, *node->ExecutorThread, GetCycleCountFast(), actorId);
            // Install ctx as the thread's activation context for the duration of Receive and
            // remember the previous one so that nested sends restore it correctly.
            TActivationContext *prevTlsActivationContext = TlsActivationContext;
            TlsActivationContext = &ctx;
            CurrentRecipient = actorId;
            {
                TInverseGuard<TMutex> inverseGuard(Mutex);
#ifdef USE_ACTOR_CALLSTACK
                TCallstack::GetTlsCallstack() = ev->Callstack;
                TCallstack::GetTlsCallstack().SetLinesToSkip();
#endif
                recipientActor->Receive(evHolder, ctx);
                node->ExecutorThread->DropUnregistered();
            }
            CurrentRecipient = TActorId();
            TlsActivationContext = prevTlsActivationContext;
        } else {
            if (VERBOSE) {
                Cerr << "Failed to find actor with local id: " << recipientLocalId << "\n";
            }

            // No such actor: send the non-delivery forward, if the event provides one.
            auto forwardedEv = ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown);
            if (!!forwardedEv) {
                node->ActorSystem->Send(forwardedEv);
            }
        }
    }

    // Resolves actorId to an actor object on the given node: the mailbox is taken
    // from the node's mailbox table by the id's hint, then searched by the local id.
    // Returns whatever the mailbox's FindActor() reports for that local id.
    IActor* TTestActorRuntimeBase::FindActor(const TActorId& actorId, TNodeDataBase* node) const {
        TMailboxHeader* const header = node->MailboxTable->Get(actorId.Hint());
        return header->FindActor(actorId.LocalId());
    }

    // Creates the actor system setup for the node with the given index.
    //
    // The node id is FirstNodeId + nodeIndex. Without real threads a single stub
    // executor pool and a stub scheduler (both bound to this runtime and the node)
    // are installed; with real threads five executor pools (four basic, one IO)
    // and a basic scheduler thread are created. InitActorSystemSetup() is invoked
    // on the result before it is returned.
    THolder<TActorSystemSetup> TTestActorRuntimeBase::MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node) {
        THolder<TActorSystemSetup> result(new TActorSystemSetup);
        result->NodeId = FirstNodeId + nodeIndex;

        if (!UseRealThreads) {
            result->ExecutorsCount = 1;
            result->Scheduler.Reset(new TSchedulerThreadStub(this, node));
            result->Executors.Reset(new TAutoPtr<IExecutorPool>[1]);
            result->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0));
        } else {
            constexpr ui32 realPoolCount = 5;
            result->ExecutorsCount = realPoolCount;
            result->Executors.Reset(new TAutoPtr<IExecutorPool>[realPoolCount]);
            result->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20));
            result->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20));
            result->Executors[2].Reset(new TIOExecutorPool(2, 1));
            result->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20));
            result->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20));
            result->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(512, 100)));
        }

        InitActorSystemSetup(*result);

        return result;
    }

    // Builds the actor system for one node.
    //
    // Steps, in order:
    //  1. create the setup via MakeActorSystemSetup() and record raw pointers to its
    //     executor pools in node->ExecutorPools;
    //  2. copy the node's local services into the setup;
    //  3. prepare the shared interconnect proxy settings (all listed timeouts are set
    //     to TDuration::Max() when real threads are not used);
    //  4. create one interconnect proxy per peer node (TCP proxy or mock, depending on
    //     UseRealInterconnect) and install the proxy wrapper factory;
    //  5. register the poller actor when the real interconnect is used;
    //  6. register a logger actor unless SingleSysEnv is set.
    THolder<TActorSystem> TTestActorRuntimeBase::MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node) {
        auto setup = MakeActorSystemSetup(nodeIndex, node);

        const auto poolCount = setup->ExecutorsCount;
        node->ExecutorPools.resize(poolCount);
        for (ui32 poolIdx = 0; poolIdx < poolCount; ++poolIdx) {
            node->ExecutorPools[poolIdx] = setup->Executors[poolIdx].Get();
        }

        const auto& icCounters = GetCountersForComponent(node->DynamicCounters, "interconnect");

        setup->LocalServices = node->LocalServices;
        setup->Interconnect.ProxyActors.resize(FirstNodeId + NodeCount);
        const TActorId nsActorId = GetNameserviceActorId();

        TIntrusivePtr<TInterconnectProxyCommon> proxyCommon(new TInterconnectProxyCommon);
        proxyCommon->NameserviceId = nsActorId;
        proxyCommon->MonCounters = icCounters;
        proxyCommon->TechnicalSelfHostName = "::1";

        if (!UseRealThreads) {
            auto& icSettings = proxyCommon->Settings;
            icSettings.DeadPeer = TDuration::Max();
            icSettings.CloseOnIdle = TDuration::Max();
            icSettings.PingPeriod = TDuration::Max();
            icSettings.ForceConfirmPeriod = TDuration::Max();
            icSettings.Handshake = TDuration::Max();
        }

        proxyCommon->ClusterUUID = ClusterUUID;
        proxyCommon->AcceptUUID = {ClusterUUID};

        // A node gets a proxy for every other node, never for itself.
        for (ui32 peerIndex = 0; peerIndex < NodeCount; ++peerIndex) {
            if (peerIndex != nodeIndex) {
                const ui32 peerNodeId = FirstNodeId + peerIndex;

                IActor *peerProxy = UseRealInterconnect
                    ? new TInterconnectProxyTCP(peerNodeId, proxyCommon)
                    : InterconnectMock.CreateProxyMock(setup->NodeId, peerNodeId, proxyCommon);

                setup->Interconnect.ProxyActors[peerNodeId] = {peerProxy, TMailboxType::ReadAsFilled, InterconnectPoolId()};
            }
        }

        setup->Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(proxyCommon, InterconnectPoolId(), &InterconnectMock);

        if (UseRealInterconnect) {
            setup->LocalServices.emplace_back(MakePollerActorId(), NActors::TActorSetupCmd(CreatePollerActor(),
                NActors::TMailboxType::Simple, InterconnectPoolId()));
        }

        if (!SingleSysEnv) { // Single system env should do this self
            // NOTE(review): if LogBackend is a TAutoPtr, selecting it here appears to hand
            // its ownership to the local pointer — confirm the behaviour for multi-node setups.
            TAutoPtr<TLogBackend> logBackend = LogBackend ? LogBackend : NActors::CreateStderrBackend();
            NActors::TLoggerActor *logger = new NActors::TLoggerActor(node->LogSettings,
                logBackend, GetCountersForComponent(node->DynamicCounters, "utils"));
            NActors::TActorSetupCmd loggerCmd(logger, NActors::TMailboxType::Simple, node->GetLoggerPoolId());
            std::pair<NActors::TActorId, NActors::TActorSetupCmd> loggerService(node->LogSettings->LoggerActorId, loggerCmd);
            setup->LocalServices.push_back(loggerService);
        }

        return THolder<TActorSystem>(new TActorSystem(setup, node->GetAppData(), node->LogSettings));
    }

    // Returns the actor system of the only node; aborts unless exactly one node exists.
    TActorSystem* TTestActorRuntimeBase::SingleSys() const {
        Y_VERIFY(Nodes.size() == 1, "Works only for single system env");

        const auto onlyNode = Nodes.begin();
        return onlyNode->second->ActorSystem.Get();
    }

    // Returns the actor system of the first node in iteration order of Nodes;
    // aborts when there are no nodes at all.
    TActorSystem* TTestActorRuntimeBase::GetAnyNodeActorSystem() {
        auto firstNode = Nodes.begin();
        if (firstNode == Nodes.end()) {
            Y_FAIL("Don't use this method.");
        }
        return firstNode->second->ActorSystem.Get();
    }

    // Returns the actor system of the node found under GetNodeId(nodeId);
    // aborts when no such node is registered.
    TActorSystem* TTestActorRuntimeBase::GetActorSystem(ui32 nodeId) {
        const auto key = GetNodeId(nodeId);
        auto found = Nodes.find(key);
        Y_VERIFY(found != Nodes.end());
        return found->second->ActorSystem.Get();
    }


    // Returns the event mailbox registered for (nodeId, hint), creating and
    // registering a new empty one when none exists yet. The lookup and the
    // insertion are performed while holding Mutex.
    TEventMailBox& TTestActorRuntimeBase::GetMailbox(ui32 nodeId, ui32 hint) {
        TGuard<TMutex> lock(Mutex);
        const TEventMailboxId key(nodeId, hint);

        auto existing = Mailboxes.find(key);
        if (existing != Mailboxes.end()) {
            return *existing->second;
        }

        auto created = Mailboxes.insert(std::make_pair(key, new TEventMailBox())).first;
        return *created->second;
    }

    // Removes the mailbox entry for (nodeId, hint) from Mailboxes, under Mutex.
    void TTestActorRuntimeBase::ClearMailbox(ui32 nodeId, ui32 hint) {
        TGuard<TMutex> lock(Mutex);
        Mailboxes.erase(TEventMailboxId(nodeId, hint));
    }

    // Returns the name recorded for actorId in ActorNames, or the textual form
    // of the id itself when no name is recorded.
    TString TTestActorRuntimeBase::GetActorName(const TActorId& actorId) const {
        const auto named = ActorNames.find(actorId);
        if (named == ActorNames.end()) {
            return actorId.ToString();
        }
        return named->second;
    }

    // Reference-counted holder of the pending-request queue used by stranding decorators.
    struct TStrandingActorDecoratorContext : public TThrRefBase {
        using TQueueType = TOneOneQueueInplace<IEventHandle*, 32>;

        TStrandingActorDecoratorContext()
            : Queue(new TQueueType)
        {
        }

        // Queue of raw event handles; the TPtrCleanDestructor policy is used for cleanup.
        TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> Queue;
    };

    // Actor that serializes requests addressed to a delegatee actor.
    //
    // Every incoming event is pushed to the queue held in Context. The head of the
    // queue is forwarded to Delegatee with ReplyId as the sender, so responses come
    // back to the helper TReplyActor, which hands them to Reply(). Reply() forwards
    // the response to the sender of the original request and removes the request
    // from the queue once ReplyChecker reports that no more responses are expected.
    //
    // In sync mode (only allowed without real threads) SendHead() drives the runtime
    // dispatch loop itself until the reply has arrived. In non-sync mode the next
    // queued request is forwarded from Reply() after the current one has completed.
    class TStrandingActorDecorator : public TActorBootstrapped<TStrandingActorDecorator> {
    public:
        // Helper actor whose id is used as the sender of forwarded requests;
        // everything it receives is passed to the owner's Reply().
        class TReplyActor : public TActor<TReplyActor> {
        public:
            static constexpr EActivityType ActorActivityType() {
                return TEST_ACTOR_RUNTIME;
            }

            TReplyActor(TStrandingActorDecorator* owner)
                : TActor(&TReplyActor::StateFunc)
                , Owner(owner)
            {
            }

            STFUNC(StateFunc);

        private:
            TStrandingActorDecorator* const Owner;
        };

        static constexpr EActivityType ActorActivityType() {
            return TEST_ACTOR_RUNTIME;
        }

        // delegatee          - actor that actually processes the requests.
        // isSync             - when true, each request is processed to completion inside
        //                      SendHead(); requires a runtime without real threads.
        // additionalActors   - extra actors whose mailboxes are dispatched in sync mode.
        // context            - holder of the request queue.
        // runtime            - test runtime used for dispatching and time updates.
        // createReplyChecker - produces the checker deciding when a request is complete.
        TStrandingActorDecorator(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors,
            TSimpleSharedPtr<TStrandingActorDecoratorContext> context, TTestActorRuntimeBase* runtime,
            TReplyCheckerCreator createReplyChecker)
            : Delegatee(delegatee)
            , IsSync(isSync)
            , AdditionalActors(additionalActors)
            , Context(context)
            , HasReply(false)
            , Runtime(runtime)
            , ReplyChecker(createReplyChecker())
        {
            if (IsSync) {
                Y_VERIFY(!runtime->IsRealThreads());
            }
        }

        // Switches to the working state, registers the reply actor in the same mailbox
        // and fills the dispatch options used by the sync mode: the mailboxes of the
        // delegatee, of the additional actors and of the reply actor are listed in
        // OnlyMailboxes, and the reply actor's mailbox also in NonEmptyMailboxes.
        void Bootstrap(const TActorContext& ctx) {
            Become(&TStrandingActorDecorator::StateFunc);
            ReplyId = ctx.RegisterWithSameMailbox(new TReplyActor(this));
            DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(Delegatee.NodeId(), Delegatee.Hint()));
            for (const auto& actor : AdditionalActors) {
                DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(actor.NodeId(), actor.Hint()));
            }

            DelegateeOptions.OnlyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
            DelegateeOptions.NonEmptyMailboxes.push_back(TEventMailboxId(ReplyId.NodeId(), ReplyId.Hint()));
            DelegateeOptions.Quiet = true;
        }

        // Enqueues the incoming request; forwarding is started only when the queue was
        // empty, otherwise the request waits for the ones ahead of it.
        STFUNC(StateFunc) {
            bool wasEmpty = !Context->Queue->Head();
            Context->Queue->Push(ev.Release());
            if (wasEmpty) {
                SendHead(ctx);
            }
        }

        // Handles a response from the delegatee: forwards it to the sender of the request
        // at the head of the queue and, if the request is complete, removes that request.
        STFUNC(Reply) {
            Y_VERIFY(!HasReply);
            IEventHandle *requestEv = Context->Queue->Head();
            // A response without an outstanding request would otherwise dereference null.
            Y_VERIFY(requestEv, "Reply received without a pending request");
            TActorId originalSender = requestEv->Sender;
            HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get());
            if (HasReply) {
                delete Context->Queue->Pop();
            }
            ctx.ExecutorThread.Send(ev->Forward(originalSender));
            // Move on to the next request only after the current one has completed:
            // while more responses are expected the head is still the in-flight request,
            // whose payload has already been released, so it must not be forwarded again.
            if (!IsSync && HasReply && Context->Queue->Head()) {
                SendHead(ctx);
            }
        }

    private:
        // Forwards the head of the queue to the delegatee.
        // Non-sync mode: sends one request and returns.
        // Sync mode: for every queued request, sends it and dispatches runtime events
        // until the reply arrives or the event queue was reported empty 100 times, then
        // advances the runtime clock by 1000 microseconds.
        void SendHead(const TActorContext& ctx) {
            if (!IsSync) {
                // A new request is in flight: clear the completion flag left by the
                // previous one, otherwise the next Reply() fails its Y_VERIFY(!HasReply).
                HasReply = false;
                ctx.ExecutorThread.Send(GetForwardedEvent().Release());
            } else {
                while (Context->Queue->Head()) {
                    HasReply = false;
                    ctx.ExecutorThread.Send(GetForwardedEvent().Release());
                    int count = 100;
                    while (!HasReply && count > 0) {
                        try {
                            Runtime->DispatchEvents(DelegateeOptions);
                        } catch (TEmptyEventQueueException&) {
                            count--;
                            Cerr << "No reply" << Endl;
                        }
                    }

                    Runtime->UpdateCurrentTime(Runtime->GetCurrentTime() + TDuration::MicroSeconds(1000));
                }
            }
        }

        // Builds the event sent to the delegatee from the request at the head of the
        // queue. The payload (event object or chain buffer) is moved out of the queued
        // handle, which itself stays in the queue; the sender is replaced by ReplyId.
        TAutoPtr<IEventHandle> GetForwardedEvent() {
            IEventHandle* ev = Context->Queue->Head();
            ReplyChecker->OnRequest(ev);
            TAutoPtr<IEventHandle> forwardedEv = ev->HasEvent()
                ? new IEventHandle(Delegatee, ReplyId, ev->ReleaseBase().Release(), ev->Flags, ev->Cookie)
                : new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie);

            return forwardedEv;
        }
    private:
        const TActorId Delegatee;
        const bool IsSync;
        const TVector<TActorId> AdditionalActors;
        TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
        TActorId ReplyId;
        // True once the request currently in flight has received its final response.
        bool HasReply;
        TDispatchOptions DelegateeOptions;
        TTestActorRuntimeBase* Runtime;
        THolder<IReplyChecker> ReplyChecker;
    };

    // Single state of the reply actor: passes every received event, together with
    // the actor context, to the owning decorator's Reply().
    void TStrandingActorDecorator::TReplyActor::StateFunc(STFUNC_SIG) {
        Owner->Reply(ev, ctx);
    }

    // IStrandingDecoratorFactory implementation. All decorators produced by Wrap()
    // are given the same context object, runtime pointer and reply-checker creator
    // that this factory was constructed with.
    class TStrandingDecoratorFactory : public IStrandingDecoratorFactory {
        TSimpleSharedPtr<TStrandingActorDecoratorContext> Context;
        TTestActorRuntimeBase* Runtime;
        TReplyCheckerCreator CreateReplyChecker;

    public:
        TStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
            TReplyCheckerCreator createReplyChecker)
            : Context(new TStrandingActorDecoratorContext())
            , Runtime(runtime)
            , CreateReplyChecker(createReplyChecker)
        {
        }

        // Creates a new decorator actor in front of the given delegatee.
        IActor* Wrap(const TActorId& delegatee, bool isSync, const TVector<TActorId>& additionalActors) override {
            IActor* decorator = new TStrandingActorDecorator(
                delegatee, isSync, additionalActors, Context, Runtime, CreateReplyChecker);
            return decorator;
        }
    };

    // Creates a stranding decorator factory bound to the given runtime and
    // reply-checker creator; the returned pointer carries ownership of it.
    TAutoPtr<IStrandingDecoratorFactory> CreateStrandingDecoratorFactory(TTestActorRuntimeBase* runtime,
            TReplyCheckerCreator createReplyChecker) {
        TAutoPtr<IStrandingDecoratorFactory> factory(new TStrandingDecoratorFactory(runtime, createReplyChecker));
        return factory;
    }

    // Mutable namespace-scope variable holding the default random seed, initially 9999.
    // NOTE(review): its consumers are not visible in this part of the file — presumably
    // it seeds the test runtime's random provider; confirm against the users.
    ui64 DefaultRandomSeed = 9999;
}