http_proxy_outgoing.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. #include "http_proxy.h"
  2. #include "http_proxy_sock_impl.h"
  3. namespace NHttp {
  4. template <typename TSocketImpl>
  5. class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig {
  6. public:
  7. using TBase = NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>;
  8. using TSelf = TOutgoingConnectionActor<TSocketImpl>;
  9. const TActorId Owner;
  10. const TActorId Poller;
  11. SocketAddressType Address;
  12. TString Host;
  13. TActorId RequestOwner;
  14. THttpOutgoingRequestPtr Request;
  15. THttpIncomingResponsePtr Response;
  16. TInstant LastActivity;
  17. TDuration ConnectionTimeout = CONNECTION_TIMEOUT;
  18. NActors::TPollerToken::TPtr PollerToken;
  19. TOutgoingConnectionActor(const TActorId& owner, const TString& host, const TActorId& poller)
  20. : TBase(&TSelf::StateWaiting)
  21. , Owner(owner)
  22. , Poller(poller)
  23. , Host(host)
  24. {
  25. TSocketImpl::SetNonBlock();
  26. TSocketImpl::SetTimeout(SOCKET_TIMEOUT);
  27. }
  28. void Die(const NActors::TActorContext& ctx) override {
  29. ctx.Send(Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID));
  30. TSocketImpl::Shutdown(); // to avoid errors when connection already closed
  31. TBase::Die(ctx);
  32. }
  33. void ReplyAndDie(const NActors::TActorContext& ctx) {
  34. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << Response->Status << " " << Response->Message << ")");
  35. ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response));
  36. RequestOwner = TActorId();
  37. THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response));
  38. ctx.Send(Owner, sensors.Release());
  39. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed");
  40. Die(ctx);
  41. }
  42. void ReplyErrorAndDie(const NActors::TActorContext& ctx, const TString& error) {
  43. LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed with error: " << error);
  44. if (RequestOwner) {
  45. ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error));
  46. RequestOwner = TActorId();
  47. THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response));
  48. ctx.Send(Owner, sensors.Release());
  49. Die(ctx);
  50. }
  51. }
  52. protected:
  53. void FailConnection(const NActors::TActorContext& ctx, const TString& error) {
  54. if (Request) {
  55. return ReplyErrorAndDie(ctx, error);
  56. }
  57. return TBase::Become(&TOutgoingConnectionActor::StateFailed);
  58. }
  59. void Connect(const NActors::TActorContext& ctx) {
  60. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connecting");
  61. int res = TSocketImpl::Connect(Address);
  62. RegisterPoller(ctx);
  63. switch (-res) {
  64. case 0:
  65. return OnConnect(ctx);
  66. case EINPROGRESS:
  67. case EAGAIN:
  68. return TBase::Become(&TOutgoingConnectionActor::StateConnecting);
  69. default:
  70. return ReplyErrorAndDie(ctx, strerror(-res));
  71. }
  72. }
  73. void FlushOutput(const NActors::TActorContext& ctx) {
  74. if (Request != nullptr) {
  75. Request->Finish();
  76. while (auto size = Request->Size()) {
  77. bool read = false, write = false;
  78. ssize_t res = TSocketImpl::Send(Request->Data(), size, read, write);
  79. if (res > 0) {
  80. Request->ChopHead(res);
  81. } else if (-res == EINTR) {
  82. continue;
  83. } else if (-res == EAGAIN || -res == EWOULDBLOCK) {
  84. if (PollerToken) {
  85. if (!read && !write) {
  86. write = true;
  87. }
  88. PollerToken->Request(read, write);
  89. }
  90. break;
  91. } else {
  92. if (!res) {
  93. ReplyAndDie(ctx);
  94. } else {
  95. ReplyErrorAndDie(ctx, strerror(-res));
  96. }
  97. break;
  98. }
  99. }
  100. }
  101. }
  102. void PullInput(const NActors::TActorContext& ctx) {
  103. for (;;) {
  104. if (Response == nullptr) {
  105. Response = new THttpIncomingResponse(Request);
  106. }
  107. if (!Response->EnsureEnoughSpaceAvailable()) {
  108. return ReplyErrorAndDie(ctx, "Not enough space in socket buffer");
  109. }
  110. bool read = false, write = false;
  111. ssize_t res = TSocketImpl::Recv(Response->Pos(), Response->Avail(), read, write);
  112. if (res > 0) {
  113. Response->Advance(res);
  114. if (Response->IsDone() && Response->IsReady()) {
  115. return ReplyAndDie(ctx);
  116. }
  117. } else if (-res == EINTR) {
  118. continue;
  119. } else if (-res == EAGAIN || -res == EWOULDBLOCK) {
  120. if (PollerToken) {
  121. if (!read && !write) {
  122. read = true;
  123. }
  124. PollerToken->Request(read, write);
  125. }
  126. return;
  127. } else {
  128. if (!res) {
  129. Response->ConnectionClosed();
  130. }
  131. if (Response->IsDone() && Response->IsReady()) {
  132. return ReplyAndDie(ctx);
  133. }
  134. return ReplyErrorAndDie(ctx, strerror(-res));
  135. }
  136. }
  137. }
  138. void RegisterPoller(const NActors::TActorContext& ctx) {
  139. ctx.Send(Poller, new NActors::TEvPollerRegister(TSocketImpl::Socket, ctx.SelfID, ctx.SelfID));
  140. }
  141. void OnConnect(const NActors::TActorContext& ctx) {
  142. bool read = false, write = false;
  143. if (int res = TSocketImpl::OnConnect(read, write); res != 1) {
  144. if (-res == EAGAIN) {
  145. if (PollerToken) {
  146. PollerToken->Request(read, write);
  147. }
  148. return;
  149. } else {
  150. return ReplyErrorAndDie(ctx, strerror(-res));
  151. }
  152. }
  153. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") outgoing connection opened");
  154. TBase::Become(&TOutgoingConnectionActor::StateConnected);
  155. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << Request->Method << " " << Request->URL << ")");
  156. ctx.Send(ctx.SelfID, new NActors::TEvPollerReady(nullptr, true, true));
  157. }
  158. void HandleResolving(TEvHttpProxy::TEvResolveHostResponse::TPtr event, const NActors::TActorContext& ctx) {
  159. LastActivity = ctx.Now();
  160. if (!event->Get()->Error.empty()) {
  161. return FailConnection(ctx, event->Get()->Error);
  162. }
  163. Address = event->Get()->Address;
  164. if (Address.GetPort() == 0) {
  165. Address.SetPort(Request->Secure ? 443 : 80);
  166. }
  167. Connect(ctx);
  168. }
  169. void HandleConnecting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) {
  170. LastActivity = ctx.Now();
  171. int res = TSocketImpl::GetError();
  172. if (res == 0) {
  173. OnConnect(ctx);
  174. } else {
  175. FailConnection(ctx, TStringBuilder() << strerror(res));
  176. }
  177. }
  178. void HandleConnecting(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) {
  179. PollerToken = std::move(ev->Get()->PollerToken);
  180. LastActivity = ctx.Now();
  181. int res = TSocketImpl::GetError();
  182. if (res == 0) {
  183. OnConnect(ctx);
  184. } else {
  185. FailConnection(ctx, TStringBuilder() << strerror(res));
  186. }
  187. }
  188. void HandleWaiting(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
  189. LastActivity = ctx.Now();
  190. Request = std::move(event->Get()->Request);
  191. Host = Request->Host;
  192. LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << ") resolving " << Host);
  193. Request->Timer.Reset();
  194. RequestOwner = event->Sender;
  195. ctx.Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(Host));
  196. if (event->Get()->Timeout) {
  197. ConnectionTimeout = event->Get()->Timeout;
  198. TSocketImpl::SetTimeout(ConnectionTimeout);
  199. }
  200. ctx.Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup());
  201. LastActivity = ctx.Now();
  202. TBase::Become(&TOutgoingConnectionActor::StateResolving);
  203. }
  204. void HandleConnected(NActors::TEvPollerReady::TPtr event, const NActors::TActorContext& ctx) {
  205. LastActivity = ctx.Now();
  206. if (event->Get()->Read) {
  207. PullInput(ctx);
  208. }
  209. if (event->Get()->Write) {
  210. FlushOutput(ctx);
  211. }
  212. }
  213. void HandleConnected(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) {
  214. PollerToken = std::move(ev->Get()->PollerToken);
  215. LastActivity = ctx.Now();
  216. PullInput(ctx);
  217. FlushOutput(ctx);
  218. }
  219. void HandleFailed(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
  220. Request = std::move(event->Get()->Request);
  221. RequestOwner = event->Sender;
  222. ReplyErrorAndDie(ctx, "Failed");
  223. }
  224. void HandleTimeout(const NActors::TActorContext& ctx) {
  225. TDuration inactivityTime = ctx.Now() - LastActivity;
  226. if (inactivityTime >= ConnectionTimeout) {
  227. FailConnection(ctx, "Connection timed out");
  228. } else {
  229. ctx.Schedule(Min(ConnectionTimeout - inactivityTime, TDuration::MilliSeconds(100)), new NActors::TEvents::TEvWakeup());
  230. }
  231. }
  232. STFUNC(StateWaiting) {
  233. switch (ev->GetTypeRewrite()) {
  234. HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleWaiting);
  235. CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
  236. }
  237. }
  238. STFUNC(StateResolving) {
  239. switch (ev->GetTypeRewrite()) {
  240. HFunc(TEvHttpProxy::TEvResolveHostResponse, HandleResolving);
  241. CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
  242. }
  243. }
  244. STFUNC(StateConnecting) {
  245. switch (ev->GetTypeRewrite()) {
  246. HFunc(NActors::TEvPollerReady, HandleConnecting);
  247. CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
  248. HFunc(NActors::TEvPollerRegisterResult, HandleConnecting);
  249. }
  250. }
  251. STFUNC(StateConnected) {
  252. switch (ev->GetTypeRewrite()) {
  253. HFunc(NActors::TEvPollerReady, HandleConnected);
  254. CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
  255. HFunc(NActors::TEvPollerRegisterResult, HandleConnected);
  256. }
  257. }
  258. STFUNC(StateFailed) {
  259. switch (ev->GetTypeRewrite()) {
  260. HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleFailed);
  261. }
  262. }
  263. };
  264. NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller) {
  265. if (secure) {
  266. return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, host, poller);
  267. } else {
  268. return new TOutgoingConnectionActor<TPlainSocketImpl>(owner, host, poller);
  269. }
  270. }
  271. }