// interconnect_tcp_server.cpp
  1. #include "interconnect_tcp_server.h"
  2. #include "interconnect_handshake.h"
  3. #include <library/cpp/actors/core/log.h>
  4. #include <library/cpp/actors/protos/services_common.pb.h>
  5. #include "interconnect_common.h"
  6. namespace NActors {
  7. TInterconnectListenerTCP::TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket)
  8. : TActor(&TThis::Initial)
  9. , TInterconnectLoggingBase(Sprintf("ICListener: %s", SelfId().ToString().data()))
  10. , Address(address.c_str(), port)
  11. , Listener(
  12. socket
  13. ? new NInterconnect::TStreamSocket(*socket)
  14. : nullptr)
  15. , ExternalSocket(!!Listener)
  16. , ProxyCommonCtx(std::move(common))
  17. {
  18. if (ExternalSocket) {
  19. SetNonBlock(*Listener);
  20. }
  21. }
  22. TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) {
  23. return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0);
  24. }
  25. void TInterconnectListenerTCP::Die(const TActorContext& ctx) {
  26. LOG_DEBUG_IC("ICL08", "Dying");
  27. TActor::Die(ctx);
  28. }
  29. int TInterconnectListenerTCP::Bind() {
  30. NInterconnect::TAddress addr = Address;
  31. if (ProxyCommonCtx->Settings.BindOnAllAddresses) {
  32. switch (addr.GetFamily()) {
  33. case AF_INET: {
  34. auto *sa = reinterpret_cast<sockaddr_in*>(addr.SockAddr());
  35. sa->sin_addr = {INADDR_ANY};
  36. break;
  37. }
  38. case AF_INET6: {
  39. auto *sa = reinterpret_cast<sockaddr_in6*>(addr.SockAddr());
  40. sa->sin6_addr = in6addr_any;
  41. break;
  42. }
  43. default:
  44. Y_FAIL("Unsupported address family");
  45. }
  46. }
  47. Listener = NInterconnect::TStreamSocket::Make(addr.GetFamily());
  48. if (*Listener == -1) {
  49. return errno;
  50. }
  51. SetNonBlock(*Listener);
  52. Listener->SetSendBufferSize(ProxyCommonCtx->Settings.GetSendBufferSize()); // TODO(alexvru): WTF?
  53. SetSockOpt(*Listener, SOL_SOCKET, SO_REUSEADDR, 1);
  54. if (const auto e = -Listener->Bind(addr)) {
  55. return e;
  56. } else if (const auto e = -Listener->Listen(SOMAXCONN)) {
  57. return e;
  58. } else {
  59. return 0;
  60. }
  61. }
  62. void TInterconnectListenerTCP::Bootstrap(const TActorContext& ctx) {
  63. if (!Listener) {
  64. if (const int err = Bind()) {
  65. LOG_ERROR_IC("ICL01", "Bind failed: %s (%s)", strerror(err), Address.ToString().data());
  66. Listener.Reset();
  67. Become(&TThis::Initial, TDuration::Seconds(1), new TEvents::TEvBootstrap);
  68. return;
  69. }
  70. }
  71. if (const auto& callback = ProxyCommonCtx->InitWhiteboard) {
  72. callback(Address.GetPort(), TlsActivationContext->ExecutorThread.ActorSystem);
  73. }
  74. const bool success = ctx.Send(MakePollerActorId(), new TEvPollerRegister(Listener, SelfId(), {}));
  75. Y_VERIFY(success);
  76. Become(&TThis::Listen);
  77. }
  78. void TInterconnectListenerTCP::Handle(TEvPollerRegisterResult::TPtr ev, const TActorContext& ctx) {
  79. PollerToken = std::move(ev->Get()->PollerToken);
  80. Process(ctx);
  81. }
  82. void TInterconnectListenerTCP::Process(const TActorContext& ctx) {
  83. for (;;) {
  84. NInterconnect::TAddress address;
  85. const int r = Listener->Accept(address);
  86. if (r >= 0) {
  87. LOG_DEBUG_IC("ICL04", "Accepted from: %s", address.ToString().data());
  88. auto socket = MakeIntrusive<NInterconnect::TStreamSocket>(static_cast<SOCKET>(r));
  89. ctx.Register(CreateIncomingHandshakeActor(ProxyCommonCtx, std::move(socket)));
  90. continue;
  91. } else if (-r != EAGAIN && -r != EWOULDBLOCK) {
  92. Y_VERIFY(-r != ENFILE && -r != EMFILE && !ExternalSocket);
  93. LOG_ERROR_IC("ICL06", "Listen failed: %s (%s)", strerror(-r), Address.ToString().data());
  94. Listener.Reset();
  95. PollerToken.Reset();
  96. Become(&TThis::Initial, TDuration::Seconds(1), new TEvents::TEvBootstrap);
  97. } else if (PollerToken) {
  98. PollerToken->Request(true, false);
  99. }
  100. break;
  101. }
  102. }
  103. }