#include "acceptor.h" #include "key_value_printer.h" #include "mb_lwtrace.h" #include "network.h" #include #include #include #include LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) using namespace NActor; using namespace NBus; using namespace NBus::NPrivate; TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, const TNetAddr& addr) : TActor(session->Queue->WorkQueue.Get()) , AcceptorId(acceptorId) , Session(session) , GranStatus(session->Config.Secret.StatusFlushPeriod) { SetNonBlock(socket, true); Channel = Session->ReadEventLoop.Register(socket, this); Channel->EnableRead(); Stats.AcceptorId = acceptorId; Stats.Fd = socket; Stats.ListenAddr = addr; SendStatus(TInstant::Now()); } void TAcceptor::Act(TDefaultTag) { EShutdownState state = ShutdownState.State.Get(); if (state == SS_SHUTDOWN_COMPLETE) { return; } TInstant now = TInstant::Now(); if (state == SS_SHUTDOWN_COMMAND) { if (!!Channel) { Channel->Unregister(); Channel.Drop(); Stats.Fd = INVALID_SOCKET; } SendStatus(now); Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats); Stats.ResetIncremental(); ShutdownState.CompleteShutdown(); return; } THolder addr(new TOpaqueAddr()); SOCKET acceptedSocket = accept(Channel->GetSocket(), addr->MutableAddr(), addr->LenPtr()); int acceptErrno = LastSystemError(); if (acceptedSocket == INVALID_SOCKET) { if (LastSystemError() != EWOULDBLOCK) { Stats.LastAcceptErrorErrno = acceptErrno; Stats.LastAcceptErrorInstant = now; ++Stats.AcceptErrorCount; } } else { TSocketHolder s(acceptedSocket); try { SetKeepAlive(s, true); SetNoDelay(s, Session->Config.TcpNoDelay); SetSockOptTcpCork(s, Session->Config.TcpCork); SetCloseOnExec(s, true); SetNonBlock(s, true); if (Session->Config.SocketToS >= 0) { SetSocketToS(s, addr.Get(), Session->Config.SocketToS); } } catch (...) { // It means that connection was reset just now // TODO: do something better goto skipAccept; } { TOnAccept onAccept; onAccept.s = s.Release(); onAccept.addr = TNetAddr(addr.Release()); onAccept.now = now; LWPROBE(Accepted, ToString(onAccept.addr)); Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept); Stats.LastAcceptSuccessInstant = now; ++Stats.AcceptSuccessCount; } skipAccept:; } Channel->EnableRead(); SendStatus(now); } void TAcceptor::SendStatus(TInstant now) { GranStatus.Listen.Update(Stats, now); } void TAcceptor::HandleEvent(SOCKET socket, void* cookie) { Y_UNUSED(socket); Y_UNUSED(cookie); GetActor()->Schedule(); } void TAcceptor::Shutdown() { ShutdownState.ShutdownCommand(); GetActor()->Schedule(); ShutdownState.ShutdownComplete.WaitI(); }