123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- #include "acceptor.h"
- #include "key_value_printer.h"
- #include "mb_lwtrace.h"
- #include "network.h"
- #include <util/network/init.h>
- #include <util/system/defaults.h>
- #include <util/system/error.h>
- #include <util/system/yassert.h>
- LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
- using namespace NActor;
- using namespace NBus;
- using namespace NBus::NPrivate;
- TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, const TNetAddr& addr)
- : TActor<TAcceptor>(session->Queue->WorkQueue.Get())
- , AcceptorId(acceptorId)
- , Session(session)
- , GranStatus(session->Config.Secret.StatusFlushPeriod)
- {
- SetNonBlock(socket, true);
- Channel = Session->ReadEventLoop.Register(socket, this);
- Channel->EnableRead();
- Stats.AcceptorId = acceptorId;
- Stats.Fd = socket;
- Stats.ListenAddr = addr;
- SendStatus(TInstant::Now());
- }
- void TAcceptor::Act(TDefaultTag) {
- EShutdownState state = ShutdownState.State.Get();
- if (state == SS_SHUTDOWN_COMPLETE) {
- return;
- }
- TInstant now = TInstant::Now();
- if (state == SS_SHUTDOWN_COMMAND) {
- if (!!Channel) {
- Channel->Unregister();
- Channel.Drop();
- Stats.Fd = INVALID_SOCKET;
- }
- SendStatus(now);
- Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats);
- Stats.ResetIncremental();
- ShutdownState.CompleteShutdown();
- return;
- }
- THolder<TOpaqueAddr> addr(new TOpaqueAddr());
- SOCKET acceptedSocket = accept(Channel->GetSocket(), addr->MutableAddr(), addr->LenPtr());
- int acceptErrno = LastSystemError();
- if (acceptedSocket == INVALID_SOCKET) {
- if (LastSystemError() != EWOULDBLOCK) {
- Stats.LastAcceptErrorErrno = acceptErrno;
- Stats.LastAcceptErrorInstant = now;
- ++Stats.AcceptErrorCount;
- }
- } else {
- TSocketHolder s(acceptedSocket);
- try {
- SetKeepAlive(s, true);
- SetNoDelay(s, Session->Config.TcpNoDelay);
- SetSockOptTcpCork(s, Session->Config.TcpCork);
- SetCloseOnExec(s, true);
- SetNonBlock(s, true);
- if (Session->Config.SocketToS >= 0) {
- SetSocketToS(s, addr.Get(), Session->Config.SocketToS);
- }
- } catch (...) {
- // It means that connection was reset just now
- // TODO: do something better
- goto skipAccept;
- }
- {
- TOnAccept onAccept;
- onAccept.s = s.Release();
- onAccept.addr = TNetAddr(addr.Release());
- onAccept.now = now;
- LWPROBE(Accepted, ToString(onAccept.addr));
- Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept);
- Stats.LastAcceptSuccessInstant = now;
- ++Stats.AcceptSuccessCount;
- }
- skipAccept:;
- }
- Channel->EnableRead();
- SendStatus(now);
- }
- void TAcceptor::SendStatus(TInstant now) {
- GranStatus.Listen.Update(Stats, now);
- }
- void TAcceptor::HandleEvent(SOCKET socket, void* cookie) {
- Y_UNUSED(socket);
- Y_UNUSED(cookie);
- GetActor()->Schedule();
- }
- void TAcceptor::Shutdown() {
- ShutdownState.ShutdownCommand();
- GetActor()->Schedule();
- ShutdownState.ShutdownComplete.WaitI();
- }
|