acceptor.cpp 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. #include "acceptor.h"
  2. #include "key_value_printer.h"
  3. #include "mb_lwtrace.h"
  4. #include "network.h"
  5. #include <util/network/init.h>
  6. #include <util/system/defaults.h>
  7. #include <util/system/error.h>
  8. #include <util/system/yassert.h>
  9. LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
  10. using namespace NActor;
  11. using namespace NBus;
  12. using namespace NBus::NPrivate;
  13. TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, const TNetAddr& addr)
  14. : TActor<TAcceptor>(session->Queue->WorkQueue.Get())
  15. , AcceptorId(acceptorId)
  16. , Session(session)
  17. , GranStatus(session->Config.Secret.StatusFlushPeriod)
  18. {
  19. SetNonBlock(socket, true);
  20. Channel = Session->ReadEventLoop.Register(socket, this);
  21. Channel->EnableRead();
  22. Stats.AcceptorId = acceptorId;
  23. Stats.Fd = socket;
  24. Stats.ListenAddr = addr;
  25. SendStatus(TInstant::Now());
  26. }
  27. void TAcceptor::Act(TDefaultTag) {
  28. EShutdownState state = ShutdownState.State.Get();
  29. if (state == SS_SHUTDOWN_COMPLETE) {
  30. return;
  31. }
  32. TInstant now = TInstant::Now();
  33. if (state == SS_SHUTDOWN_COMMAND) {
  34. if (!!Channel) {
  35. Channel->Unregister();
  36. Channel.Drop();
  37. Stats.Fd = INVALID_SOCKET;
  38. }
  39. SendStatus(now);
  40. Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats);
  41. Stats.ResetIncremental();
  42. ShutdownState.CompleteShutdown();
  43. return;
  44. }
  45. THolder<TOpaqueAddr> addr(new TOpaqueAddr());
  46. SOCKET acceptedSocket = accept(Channel->GetSocket(), addr->MutableAddr(), addr->LenPtr());
  47. int acceptErrno = LastSystemError();
  48. if (acceptedSocket == INVALID_SOCKET) {
  49. if (LastSystemError() != EWOULDBLOCK) {
  50. Stats.LastAcceptErrorErrno = acceptErrno;
  51. Stats.LastAcceptErrorInstant = now;
  52. ++Stats.AcceptErrorCount;
  53. }
  54. } else {
  55. TSocketHolder s(acceptedSocket);
  56. try {
  57. SetKeepAlive(s, true);
  58. SetNoDelay(s, Session->Config.TcpNoDelay);
  59. SetSockOptTcpCork(s, Session->Config.TcpCork);
  60. SetCloseOnExec(s, true);
  61. SetNonBlock(s, true);
  62. if (Session->Config.SocketToS >= 0) {
  63. SetSocketToS(s, addr.Get(), Session->Config.SocketToS);
  64. }
  65. } catch (...) {
  66. // It means that connection was reset just now
  67. // TODO: do something better
  68. goto skipAccept;
  69. }
  70. {
  71. TOnAccept onAccept;
  72. onAccept.s = s.Release();
  73. onAccept.addr = TNetAddr(addr.Release());
  74. onAccept.now = now;
  75. LWPROBE(Accepted, ToString(onAccept.addr));
  76. Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept);
  77. Stats.LastAcceptSuccessInstant = now;
  78. ++Stats.AcceptSuccessCount;
  79. }
  80. skipAccept:;
  81. }
  82. Channel->EnableRead();
  83. SendStatus(now);
  84. }
  85. void TAcceptor::SendStatus(TInstant now) {
  86. GranStatus.Listen.Update(Stats, now);
  87. }
  88. void TAcceptor::HandleEvent(SOCKET socket, void* cookie) {
  89. Y_UNUSED(socket);
  90. Y_UNUSED(cookie);
  91. GetActor()->Schedule();
  92. }
  93. void TAcceptor::Shutdown() {
  94. ShutdownState.ShutdownCommand();
  95. GetActor()->Schedule();
  96. ShutdownState.ShutdownComplete.WaitI();
  97. }