http_proxy_acceptor.cpp 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. #include <util/network/sock.h>
  2. #include "http_proxy.h"
  3. #include "http_proxy_ssl.h"
  4. namespace NHttp {
  5. class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfig {
  6. public:
  7. using TBase = NActors::TActor<TAcceptorActor>;
  8. const TActorId Owner;
  9. const TActorId Poller;
  10. TIntrusivePtr<TSocketDescriptor> Socket;
  11. NActors::TPollerToken::TPtr PollerToken;
  12. THashSet<TActorId> Connections;
  13. TDeque<THttpIncomingRequestPtr> RecycledRequests;
  14. std::shared_ptr<TPrivateEndpointInfo> Endpoint;
  15. TAcceptorActor(const TActorId& owner, const TActorId& poller)
  16. : NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit)
  17. , Owner(owner)
  18. , Poller(poller)
  19. {
  20. }
  21. static constexpr char ActorName[] = "HTTP_ACCEPTOR_ACTOR";
  22. protected:
  23. STFUNC(StateListening) {
  24. switch (ev->GetTypeRewrite()) {
  25. HFunc(NActors::TEvPollerRegisterResult, Handle);
  26. HFunc(NActors::TEvPollerReady, Handle);
  27. HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle);
  28. HFunc(TEvHttpProxy::TEvReportSensors, Handle);
  29. }
  30. }
  31. STFUNC(StateInit) {
  32. switch (ev->GetTypeRewrite()) {
  33. HFunc(TEvHttpProxy::TEvAddListeningPort, HandleInit);
  34. }
  35. }
  36. void HandleInit(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
  37. TString address = event->Get()->Address;
  38. ui16 port = event->Get()->Port;
  39. Socket = new TSocketDescriptor(SocketType::GuessAddressFamily(address));
  40. // for unit tests :(
  41. SetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEADDR, (int)true);
  42. #ifdef SO_REUSEPORT
  43. SetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEPORT, (int)true);
  44. #endif
  45. SocketAddressType bindAddress(Socket->Socket.MakeAddress(address, port));
  46. Endpoint = std::make_shared<TPrivateEndpointInfo>(event->Get()->CompressContentTypes);
  47. Endpoint->Owner = ctx.SelfID;
  48. Endpoint->Proxy = Owner;
  49. Endpoint->WorkerName = event->Get()->WorkerName;
  50. Endpoint->Secure = event->Get()->Secure;
  51. int err = 0;
  52. if (Endpoint->Secure) {
  53. if (!event->Get()->SslCertificatePem.empty()) {
  54. Endpoint->SecureContext = TSslHelpers::CreateServerContext(event->Get()->SslCertificatePem);
  55. } else {
  56. Endpoint->SecureContext = TSslHelpers::CreateServerContext(event->Get()->CertificateFile, event->Get()->PrivateKeyFile);
  57. }
  58. if (Endpoint->SecureContext == nullptr) {
  59. err = -1;
  60. LOG_WARN_S(ctx, HttpLog, "Failed to construct server security context");
  61. }
  62. }
  63. if (err == 0) {
  64. err = Socket->Socket.Bind(bindAddress.get());
  65. if (err != 0) {
  66. LOG_WARN_S(
  67. ctx,
  68. HttpLog,
  69. "Failed to bind " << bindAddress->ToString()
  70. << ", code: " << err);
  71. }
  72. }
  73. TStringBuf schema = Endpoint->Secure ? "https://" : "http://";
  74. if (err == 0) {
  75. err = Socket->Socket.Listen(LISTEN_QUEUE);
  76. if (err == 0) {
  77. LOG_INFO_S(ctx, HttpLog, "Listening on " << schema << bindAddress->ToString());
  78. SetNonBlock(Socket->Socket);
  79. ctx.Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId()));
  80. TBase::Become(&TAcceptorActor::StateListening);
  81. ctx.Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress, Endpoint), 0, event->Cookie);
  82. return;
  83. } else {
  84. LOG_WARN_S(
  85. ctx,
  86. HttpLog,
  87. "Failed to listen on " << schema << bindAddress->ToString()
  88. << ", code: " << err);
  89. }
  90. }
  91. LOG_WARN_S(ctx, HttpLog, "Failed to init - retrying...");
  92. ctx.ExecutorThread.Schedule(TDuration::Seconds(1), event.Release());
  93. }
  94. void Die(const NActors::TActorContext& ctx) override {
  95. ctx.Send(Owner, new TEvHttpProxy::TEvHttpAcceptorClosed(ctx.SelfID));
  96. for (const NActors::TActorId& connection : Connections) {
  97. ctx.Send(connection, new NActors::TEvents::TEvPoisonPill());
  98. }
  99. }
  100. void Handle(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& /*ctx*/) {
  101. PollerToken = std::move(ev->Get()->PollerToken);
  102. PollerToken->Request(true, false); // request read polling
  103. }
  104. void Handle(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) {
  105. for (;;) {
  106. SocketAddressType addr;
  107. std::optional<SocketType> s = Socket->Socket.Accept(addr);
  108. if (!s) {
  109. break;
  110. }
  111. TIntrusivePtr<TSocketDescriptor> socket = new TSocketDescriptor(std::move(s).value());
  112. NActors::IActor* connectionSocket = nullptr;
  113. if (RecycledRequests.empty()) {
  114. connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr);
  115. } else {
  116. connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr, std::move(RecycledRequests.front()));
  117. RecycledRequests.pop_front();
  118. }
  119. NActors::TActorId connectionId = ctx.Register(connectionSocket);
  120. ctx.Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId));
  121. Connections.emplace(connectionId);
  122. }
  123. int err = errno;
  124. if (err == EAGAIN || err == EWOULDBLOCK) { // request poller for further connection polling
  125. Y_VERIFY(PollerToken);
  126. PollerToken->Request(true, false);
  127. }
  128. }
  129. void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) {
  130. Connections.erase(event->Get()->ConnectionID);
  131. for (auto& req : event->Get()->RecycledRequests) {
  132. req->Clear();
  133. RecycledRequests.push_back(std::move(req));
  134. }
  135. }
  136. void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext& ctx) {
  137. ctx.Send(event->Forward(Owner));
  138. }
  139. };
  140. NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller) {
  141. return new TAcceptorActor(owner, poller);
  142. }
  143. }