#include "poller_actor.h" #include "interconnect_common.h" #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/core/probes.h> #include <library/cpp/actors/protos/services_common.pb.h> #include <library/cpp/actors/util/funnel_queue.h> #include <util/generic/intrlist.h> #include <util/system/thread.h> #include <util/system/event.h> #include <util/system/pipe.h> #include <variant> namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); namespace { int LastSocketError() { #if defined(_win_) return WSAGetLastError(); #else return errno; #endif } } struct TSocketRecord : TThrRefBase { const TIntrusivePtr<TSharedDescriptor> Socket; const TActorId ReadActorId; const TActorId WriteActorId; std::atomic_uint32_t Flags = 0; TSocketRecord(TEvPollerRegister& ev) : Socket(std::move(ev.Socket)) , ReadActorId(ev.ReadActorId) , WriteActorId(ev.WriteActorId) {} }; template<typename TDerived> class TPollerThreadBase : public ISimpleThread { protected: struct TPollerExitThread {}; // issued then we need to terminate the poller thread struct TPollerWakeup {}; struct TPollerUnregisterSocket { TIntrusivePtr<TSharedDescriptor> Socket; TPollerUnregisterSocket(TIntrusivePtr<TSharedDescriptor> socket) : Socket(std::move(socket)) {} }; using TPollerSyncOperation = std::variant<TPollerExitThread, TPollerWakeup, TPollerUnregisterSocket>; struct TPollerSyncOperationWrapper { TPollerSyncOperation Operation; TManualEvent Event; TPollerSyncOperationWrapper(TPollerSyncOperation&& operation) : Operation(std::move(operation)) {} void Wait() { Event.WaitI(); } void SignalDone() { Event.Signal(); } }; TActorSystem *ActorSystem; TPipeHandle ReadEnd, WriteEnd; // pipe for sync event processor TFunnelQueue<TPollerSyncOperationWrapper*> SyncOperationsQ; // operation queue public: TPollerThreadBase(TActorSystem *actorSystem) : ActorSystem(actorSystem) { // create a pipe for notifications try { TPipeHandle::Pipe(ReadEnd, WriteEnd, CloseOnExec); } catch (const TFileError& err) { Y_FAIL("failed to create pipe"); } // switch the read/write ends to nonblocking mode SetNonBlock(ReadEnd); SetNonBlock(WriteEnd); } void UnregisterSocket(const TIntrusivePtr<TSocketRecord>& record) { ExecuteSyncOperation(TPollerUnregisterSocket(record->Socket)); } protected: void Notify(TSocketRecord *record, bool read, bool write) { auto issue = [&](const TActorId& recipient) { ActorSystem->Send(new IEventHandle(recipient, {}, new TEvPollerReady(record->Socket, read, write))); }; if (read && record->ReadActorId) { issue(record->ReadActorId); if (write && record->WriteActorId && record->WriteActorId != record->ReadActorId) { issue(record->WriteActorId); } } else if (write && record->WriteActorId) { issue(record->WriteActorId); } } void Stop() { // signal poller thread to stop and wait for the thread ExecuteSyncOperation(TPollerExitThread()); ISimpleThread::Join(); } void ExecuteSyncOperation(TPollerSyncOperation&& op) { TPollerSyncOperationWrapper wrapper(std::move(op)); if (SyncOperationsQ.Push(&wrapper)) { // this was the first entry, so we push notification through the pipe for (;;) { char buffer = '\x00'; ssize_t nwritten = WriteEnd.Write(&buffer, sizeof(buffer)); if (nwritten < 0) { const int err = LastSocketError(); if (err == EINTR) { continue; } else { Y_FAIL("WriteEnd.Write() failed with %s", strerror(err)); } } else { Y_VERIFY(nwritten); break; } } } // wait for operation to complete wrapper.Wait(); } bool DrainReadEnd() { size_t totalRead = 0; char buffer[4096]; for (;;) { ssize_t n = ReadEnd.Read(buffer, sizeof(buffer)); if (n < 0) { const int error = LastSocketError(); if (error == EINTR) { continue; } else if (error == EAGAIN || error == EWOULDBLOCK) { break; } else { Y_FAIL("read() failed with %s", strerror(errno)); } } else { Y_VERIFY(n); totalRead += n; } } return totalRead; } bool ProcessSyncOpQueue() { if (DrainReadEnd()) { Y_VERIFY(!SyncOperationsQ.IsEmpty()); do { TPollerSyncOperationWrapper *op = SyncOperationsQ.Top(); if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) { static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket); op->SignalDone(); } else if (std::get_if<TPollerExitThread>(&op->Operation)) { op->SignalDone(); return false; // terminate the thread } else if (std::get_if<TPollerWakeup>(&op->Operation)) { op->SignalDone(); } else { Y_FAIL(); } } while (SyncOperationsQ.Pop()); } return true; } void *ThreadProc() override { SetCurrentThreadName("network poller"); while (ProcessSyncOpQueue()) { static_cast<TDerived&>(*this).ProcessEventsInLoop(); } return nullptr; } }; } // namespace NActors #if defined(_linux_) # include "poller_actor_linux.h" #elif defined(_darwin_) # include "poller_actor_darwin.h" #elif defined(_win_) # include "poller_actor_win.h" #else # error "Unsupported platform" #endif namespace NActors { class TPollerToken::TImpl { std::weak_ptr<TPollerThread> Thread; TIntrusivePtr<TSocketRecord> Record; // valid only when Thread is held locked public: TImpl(std::shared_ptr<TPollerThread> thread, TIntrusivePtr<TSocketRecord> record) : Thread(thread) , Record(std::move(record)) { thread->RegisterSocket(Record); } ~TImpl() { if (auto thread = Thread.lock()) { thread->UnregisterSocket(Record); } } void Request(bool read, bool write) { if (auto thread = Thread.lock()) { thread->Request(Record, read, write); } } const TIntrusivePtr<TSharedDescriptor>& Socket() const { return Record->Socket; } }; class TPollerActor: public TActorBootstrapped<TPollerActor> { // poller thread std::shared_ptr<TPollerThread> PollerThread; public: static constexpr IActor::EActivityType ActorActivityType() { return IActor::INTERCONNECT_POLLER; } void Bootstrap() { PollerThread = std::make_shared<TPollerThread>(TlsActivationContext->ExecutorThread.ActorSystem); Become(&TPollerActor::StateFunc); } STRICT_STFUNC(StateFunc, hFunc(TEvPollerRegister, Handle); cFunc(TEvents::TSystem::Poison, PassAway); ) void Handle(TEvPollerRegister::TPtr& ev) { auto *msg = ev->Get(); auto impl = std::make_unique<TPollerToken::TImpl>(PollerThread, MakeIntrusive<TSocketRecord>(*msg)); auto socket = impl->Socket(); TPollerToken::TPtr token(new TPollerToken(std::move(impl))); if (msg->ReadActorId && msg->WriteActorId && msg->WriteActorId != msg->ReadActorId) { Send(msg->ReadActorId, new TEvPollerRegisterResult(socket, token)); Send(msg->WriteActorId, new TEvPollerRegisterResult(socket, std::move(token))); } else if (msg->ReadActorId) { Send(msg->ReadActorId, new TEvPollerRegisterResult(socket, std::move(token))); } else if (msg->WriteActorId) { Send(msg->WriteActorId, new TEvPollerRegisterResult(socket, std::move(token))); } } }; TPollerToken::TPollerToken(std::unique_ptr<TImpl> impl) : Impl(std::move(impl)) {} TPollerToken::~TPollerToken() {} void TPollerToken::Request(bool read, bool write) { Impl->Request(read, write); } IActor* CreatePollerActor() { return new TPollerActor; } }