http_proxy_incoming.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. #include "http_proxy.h"
  2. #include "http_proxy_sock_impl.h"
  3. namespace NHttp {
  4. using namespace NActors;
  5. template <typename TSocketImpl>
  6. class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig {
  7. public:
  8. using TBase = TActor<TIncomingConnectionActor<TSocketImpl>>;
  9. static constexpr bool RecycleRequests = true;
  10. const TEndpointInfo& Endpoint;
  11. SocketAddressType Address;
  12. TList<THttpIncomingRequestPtr> Requests;
  13. THashMap<THttpIncomingRequestPtr, THttpOutgoingResponsePtr> Responses;
  14. THttpIncomingRequestPtr CurrentRequest;
  15. THttpOutgoingResponsePtr CurrentResponse;
  16. TDeque<THttpIncomingRequestPtr> RecycledRequests;
  17. THPTimer InactivityTimer;
  18. static constexpr TDuration InactivityTimeout = TDuration::Minutes(2);
  19. TEvPollerReady* InactivityEvent = nullptr;
  20. TPollerToken::TPtr PollerToken;
  21. TIncomingConnectionActor(
  22. const TEndpointInfo& endpoint,
  23. TIntrusivePtr<TSocketDescriptor> socket,
  24. SocketAddressType address,
  25. THttpIncomingRequestPtr recycledRequest = nullptr)
  26. : TBase(&TIncomingConnectionActor::StateAccepting)
  27. , TSocketImpl(std::move(socket))
  28. , Endpoint(endpoint)
  29. , Address(address)
  30. {
  31. if (recycledRequest != nullptr) {
  32. RecycledRequests.emplace_back(std::move(recycledRequest));
  33. }
  34. TSocketImpl::SetNonBlock();
  35. }
  36. void CleanupRequest(THttpIncomingRequestPtr& request) {
  37. if (RecycleRequests) {
  38. request->Clear();
  39. RecycledRequests.push_back(std::move(request));
  40. } else {
  41. request = nullptr;
  42. }
  43. }
  44. void CleanupResponse(THttpOutgoingResponsePtr& response) {
  45. CleanupRequest(response->Request);
  46. // TODO: maybe recycle too?
  47. response = nullptr;
  48. }
  49. TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override {
  50. return new IEventHandle(self, parent, new TEvents::TEvBootstrap());
  51. }
  52. void Die(const TActorContext& ctx) override {
  53. ctx.Send(Endpoint.Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests)));
  54. TSocketImpl::Shutdown();
  55. TBase::Die(ctx);
  56. }
  57. protected:
  58. void Bootstrap(const TActorContext& ctx) {
  59. InactivityTimer.Reset();
  60. ctx.Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false));
  61. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") incoming connection opened");
  62. OnAccept(ctx);
  63. }
  64. void OnAccept(const NActors::TActorContext& ctx) {
  65. int res;
  66. bool read = false, write = false;
  67. if ((res = TSocketImpl::OnAccept(Endpoint, read, write)) != 1) {
  68. if (-res == EAGAIN) {
  69. if (PollerToken) {
  70. PollerToken->Request(read, write);
  71. }
  72. return; // wait for further notifications
  73. } else {
  74. LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Accept: " << strerror(-res));
  75. return Die(ctx);
  76. }
  77. }
  78. TBase::Become(&TIncomingConnectionActor::StateConnected);
  79. ctx.Send(ctx.SelfID, new TEvPollerReady(nullptr, true, true));
  80. }
  81. void HandleAccepting(TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) {
  82. PollerToken = std::move(ev->Get()->PollerToken);
  83. OnAccept(ctx);
  84. }
  85. void HandleAccepting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) {
  86. OnAccept(ctx);
  87. }
  88. void HandleConnected(TEvPollerReady::TPtr event, const TActorContext& ctx) {
  89. if (event->Get()->Read) {
  90. for (;;) {
  91. if (CurrentRequest == nullptr) {
  92. if (RecycleRequests && !RecycledRequests.empty()) {
  93. CurrentRequest = std::move(RecycledRequests.front());
  94. RecycledRequests.pop_front();
  95. } else {
  96. CurrentRequest = new THttpIncomingRequest();
  97. }
  98. CurrentRequest->Address = Address;
  99. CurrentRequest->WorkerName = Endpoint.WorkerName;
  100. CurrentRequest->Secure = Endpoint.Secure;
  101. }
  102. if (!CurrentRequest->EnsureEnoughSpaceAvailable()) {
  103. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - not enough space available");
  104. return Die(ctx);
  105. }
  106. ssize_t need = CurrentRequest->Avail();
  107. bool read = false, write = false;
  108. ssize_t res = TSocketImpl::Recv(CurrentRequest->Pos(), need, read, write);
  109. if (res > 0) {
  110. InactivityTimer.Reset();
  111. CurrentRequest->Advance(res);
  112. if (CurrentRequest->IsDone()) {
  113. Requests.emplace_back(CurrentRequest);
  114. CurrentRequest->Timer.Reset();
  115. if (CurrentRequest->IsReady()) {
  116. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")");
  117. ctx.Send(Endpoint.Proxy, new TEvHttpProxy::TEvHttpIncomingRequest(CurrentRequest));
  118. CurrentRequest = nullptr;
  119. } else if (CurrentRequest->IsError()) {
  120. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -! (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")");
  121. bool success = Respond(CurrentRequest->CreateResponseBadRequest(), ctx);
  122. if (!success) {
  123. return;
  124. }
  125. CurrentRequest = nullptr;
  126. }
  127. }
  128. } else if (-res == EAGAIN || -res == EWOULDBLOCK) {
  129. if (PollerToken) {
  130. if (!read && !write) {
  131. read = true;
  132. }
  133. PollerToken->Request(read, write);
  134. }
  135. break;
  136. } else if (-res == EINTR) {
  137. continue;
  138. } else if (!res) {
  139. // connection closed
  140. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed");
  141. return Die(ctx);
  142. } else {
  143. LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Receive: " << strerror(-res));
  144. return Die(ctx);
  145. }
  146. }
  147. if (event->Get() == InactivityEvent) {
  148. const TDuration passed = TDuration::Seconds(std::abs(InactivityTimer.Passed()));
  149. if (passed >= InactivityTimeout) {
  150. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed by inactivity timeout");
  151. return Die(ctx); // timeout
  152. } else {
  153. ctx.Schedule(InactivityTimeout - passed, InactivityEvent = new TEvPollerReady(nullptr, false, false));
  154. }
  155. }
  156. }
  157. if (event->Get()->Write) {
  158. FlushOutput(ctx);
  159. }
  160. }
  161. void HandleConnected(TEvPollerRegisterResult::TPtr ev, const TActorContext& /*ctx*/) {
  162. PollerToken = std::move(ev->Get()->PollerToken);
  163. PollerToken->Request(true, true);
  164. }
  165. void HandleConnected(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const TActorContext& ctx) {
  166. Respond(event->Get()->Response, ctx);
  167. }
  168. bool Respond(THttpOutgoingResponsePtr response, const TActorContext& ctx) {
  169. THttpIncomingRequestPtr request = response->GetRequest();
  170. response->Finish();
  171. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << response->Status << " " << response->Message << ")");
  172. if (response->Status != "200" && response->Status != "404") {
  173. static constexpr size_t MAX_LOGGED_SIZE = 1024;
  174. LOG_DEBUG_S(ctx, HttpLog,
  175. "(#"
  176. << TSocketImpl::GetRawSocket()
  177. << ","
  178. << Address
  179. << ") Request: "
  180. << request->GetObfuscatedData().substr(0, MAX_LOGGED_SIZE));
  181. LOG_DEBUG_S(ctx, HttpLog,
  182. "(#"
  183. << TSocketImpl::GetRawSocket()
  184. << ","
  185. << Address
  186. << ") Response: "
  187. << TString(response->GetRawData()).substr(0, MAX_LOGGED_SIZE));
  188. }
  189. THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildIncomingRequestSensors(request, response));
  190. ctx.Send(Endpoint.Owner, sensors.Release());
  191. if (request == Requests.front() && CurrentResponse == nullptr) {
  192. CurrentResponse = response;
  193. return FlushOutput(ctx);
  194. } else {
  195. // we are ahead of our pipeline
  196. Responses.emplace(request, response);
  197. return true;
  198. }
  199. }
  200. bool FlushOutput(const TActorContext& ctx) {
  201. while (CurrentResponse != nullptr) {
  202. size_t size = CurrentResponse->Size();
  203. if (size == 0) {
  204. Y_VERIFY(Requests.front() == CurrentResponse->GetRequest());
  205. bool close = CurrentResponse->IsConnectionClose();
  206. Requests.pop_front();
  207. CleanupResponse(CurrentResponse);
  208. if (!Requests.empty()) {
  209. auto it = Responses.find(Requests.front());
  210. if (it != Responses.end()) {
  211. CurrentResponse = it->second;
  212. Responses.erase(it);
  213. continue;
  214. } else {
  215. LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - FlushOutput request not found");
  216. Die(ctx);
  217. return false;
  218. }
  219. } else {
  220. if (close) {
  221. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed");
  222. Die(ctx);
  223. return false;
  224. } else {
  225. continue;
  226. }
  227. }
  228. }
  229. bool read = false, write = false;
  230. ssize_t res = TSocketImpl::Send(CurrentResponse->Data(), size, read, write);
  231. if (res > 0) {
  232. CurrentResponse->ChopHead(res);
  233. } else if (-res == EINTR) {
  234. continue;
  235. } else if (-res == EAGAIN || -res == EWOULDBLOCK) {
  236. if (PollerToken) {
  237. if (!read && !write) {
  238. write = true;
  239. }
  240. PollerToken->Request(read, write);
  241. }
  242. break;
  243. } else {
  244. CleanupResponse(CurrentResponse);
  245. LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in FlushOutput: " << strerror(-res));
  246. Die(ctx);
  247. return false;
  248. }
  249. }
  250. return true;
  251. }
  252. STFUNC(StateAccepting) {
  253. switch (ev->GetTypeRewrite()) {
  254. CFunc(TEvents::TEvBootstrap::EventType, Bootstrap);
  255. HFunc(TEvPollerReady, HandleAccepting);
  256. HFunc(TEvPollerRegisterResult, HandleAccepting);
  257. }
  258. }
  259. STFUNC(StateConnected) {
  260. switch (ev->GetTypeRewrite()) {
  261. HFunc(TEvPollerReady, HandleConnected);
  262. HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, HandleConnected);
  263. HFunc(TEvPollerRegisterResult, HandleConnected);
  264. }
  265. }
  266. };
  267. IActor* CreateIncomingConnectionActor(
  268. const TEndpointInfo& endpoint,
  269. TIntrusivePtr<TSocketDescriptor> socket,
  270. THttpConfig::SocketAddressType address,
  271. THttpIncomingRequestPtr recycledRequest) {
  272. if (endpoint.Secure) {
  273. return new TIncomingConnectionActor<TSecureSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest));
  274. } else {
  275. return new TIncomingConnectionActor<TPlainSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest));
  276. }
  277. }
  278. }