http_proxy_outgoing.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. #include "http_proxy.h"
  2. #include "http_proxy_sock_impl.h"
  3. namespace NHttp {
  4. template <typename TSocketImpl>
  5. class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig {
  6. public:
  7. using TBase = NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>;
  8. using TSelf = TOutgoingConnectionActor<TSocketImpl>;
  9. const TActorId Owner;
  10. const TActorId Poller;
  11. SocketAddressType Address;
  12. TActorId RequestOwner;
  13. THttpOutgoingRequestPtr Request;
  14. THttpIncomingResponsePtr Response;
  15. TInstant LastActivity;
  16. TDuration ConnectionTimeout = CONNECTION_TIMEOUT;
  17. NActors::TPollerToken::TPtr PollerToken;
  18. TOutgoingConnectionActor(const TActorId& owner, const TActorId& poller)
  19. : TBase(&TSelf::StateWaiting)
  20. , Owner(owner)
  21. , Poller(poller)
  22. {
  23. }
  24. static constexpr char ActorName[] = "OUT_CONNECTION_ACTOR";
  25. void Die(const NActors::TActorContext& ctx) override {
  26. ctx.Send(Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID));
  27. TSocketImpl::Shutdown(); // to avoid errors when connection already closed
  28. TBase::Die(ctx);
  29. }
  30. TString GetSocketName() {
  31. TStringBuilder builder;
  32. if (TSocketImpl::Socket) {
  33. builder << "(#" << TSocketImpl::GetRawSocket();
  34. if (Address && Address->SockAddr()->sa_family) {
  35. builder << "," << Address;
  36. }
  37. builder << ") ";
  38. }
  39. return builder;
  40. }
  41. void ReplyAndDie(const NActors::TActorContext& ctx) {
  42. LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "-> (" << Response->Status << " " << Response->Message << ")");
  43. ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response));
  44. RequestOwner = TActorId();
  45. THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response));
  46. ctx.Send(Owner, sensors.Release());
  47. LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "connection closed");
  48. Die(ctx);
  49. }
  50. void ReplyErrorAndDie(const NActors::TActorContext& ctx, const TString& error) {
  51. LOG_ERROR_S(ctx, HttpLog, GetSocketName() << "connection closed with error: " << error);
  52. if (RequestOwner) {
  53. ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error));
  54. RequestOwner = TActorId();
  55. THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response));
  56. ctx.Send(Owner, sensors.Release());
  57. Die(ctx);
  58. }
  59. }
  60. protected:
  61. void FailConnection(const NActors::TActorContext& ctx, const TString& error) {
  62. if (Request) {
  63. return ReplyErrorAndDie(ctx, error);
  64. }
  65. return TBase::Become(&TOutgoingConnectionActor::StateFailed);
  66. }
  67. void Connect(const NActors::TActorContext& ctx) {
  68. LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "connecting");
  69. TSocketImpl::Create(Address->SockAddr()->sa_family);
  70. TSocketImpl::SetNonBlock();
  71. TSocketImpl::SetTimeout(ConnectionTimeout);
  72. int res = TSocketImpl::Connect(Address);
  73. RegisterPoller(ctx);
  74. switch (-res) {
  75. case 0:
  76. return OnConnect(ctx);
  77. case EINPROGRESS:
  78. case EAGAIN:
  79. return TBase::Become(&TOutgoingConnectionActor::StateConnecting);
  80. default:
  81. return ReplyErrorAndDie(ctx, strerror(-res));
  82. }
  83. }
  84. void FlushOutput(const NActors::TActorContext& ctx) {
  85. if (Request != nullptr) {
  86. Request->Finish();
  87. while (auto size = Request->Size()) {
  88. bool read = false, write = false;
  89. ssize_t res = TSocketImpl::Send(Request->Data(), size, read, write);
  90. if (res > 0) {
  91. Request->ChopHead(res);
  92. } else if (-res == EINTR) {
  93. continue;
  94. } else if (-res == EAGAIN || -res == EWOULDBLOCK) {
  95. if (PollerToken) {
  96. if (!read && !write) {
  97. write = true;
  98. }
  99. PollerToken->Request(read, write);
  100. }
  101. break;
  102. } else {
  103. if (!res) {
  104. ReplyAndDie(ctx);
  105. } else {
  106. ReplyErrorAndDie(ctx, strerror(-res));
  107. }
  108. break;
  109. }
  110. }
  111. }
  112. }
  113. void PullInput(const NActors::TActorContext& ctx) {
  114. for (;;) {
  115. if (Response == nullptr) {
  116. Response = new THttpIncomingResponse(Request);
  117. }
  118. if (!Response->EnsureEnoughSpaceAvailable()) {
  119. return ReplyErrorAndDie(ctx, "Not enough space in socket buffer");
  120. }
  121. bool read = false, write = false;
  122. ssize_t res = TSocketImpl::Recv(Response->Pos(), Response->Avail(), read, write);
  123. if (res > 0) {
  124. Response->Advance(res);
  125. if (Response->IsDone() && Response->IsReady()) {
  126. return ReplyAndDie(ctx);
  127. }
  128. } else if (-res == EINTR) {
  129. continue;
  130. } else if (-res == EAGAIN || -res == EWOULDBLOCK) {
  131. if (PollerToken) {
  132. if (!read && !write) {
  133. read = true;
  134. }
  135. PollerToken->Request(read, write);
  136. }
  137. return;
  138. } else {
  139. if (!res) {
  140. Response->ConnectionClosed();
  141. }
  142. if (Response->IsDone() && Response->IsReady()) {
  143. return ReplyAndDie(ctx);
  144. }
  145. return ReplyErrorAndDie(ctx, strerror(-res));
  146. }
  147. }
  148. }
  149. void RegisterPoller(const NActors::TActorContext& ctx) {
  150. ctx.Send(Poller, new NActors::TEvPollerRegister(TSocketImpl::Socket, ctx.SelfID, ctx.SelfID));
  151. }
  152. void OnConnect(const NActors::TActorContext& ctx) {
  153. bool read = false, write = false;
  154. if (int res = TSocketImpl::OnConnect(read, write); res != 1) {
  155. if (-res == EAGAIN) {
  156. if (PollerToken) {
  157. PollerToken->Request(read, write);
  158. }
  159. return;
  160. } else {
  161. return ReplyErrorAndDie(ctx, strerror(-res));
  162. }
  163. }
  164. LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "outgoing connection opened");
  165. TBase::Become(&TOutgoingConnectionActor::StateConnected);
  166. LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "<- (" << Request->Method << " " << Request->URL << ")");
  167. ctx.Send(ctx.SelfID, new NActors::TEvPollerReady(nullptr, true, true));
  168. }
  169. static int GetPort(SocketAddressType address) {
  170. switch (address->SockAddr()->sa_family) {
  171. case AF_INET:
  172. return ntohs(reinterpret_cast<sockaddr_in*>(address->SockAddr())->sin_port);
  173. case AF_INET6:
  174. return ntohs(reinterpret_cast<sockaddr_in6*>(address->SockAddr())->sin6_port);
  175. }
  176. return {};
  177. }
  178. static void SetPort(SocketAddressType address, int port) {
  179. switch (address->SockAddr()->sa_family) {
  180. case AF_INET:
  181. reinterpret_cast<sockaddr_in*>(address->SockAddr())->sin_port = htons(port);
  182. break;
  183. case AF_INET6:
  184. reinterpret_cast<sockaddr_in6*>(address->SockAddr())->sin6_port = htons(port);
  185. break;
  186. }
  187. }
  188. void HandleResolving(TEvHttpProxy::TEvResolveHostResponse::TPtr event, const NActors::TActorContext& ctx) {
  189. LastActivity = ctx.Now();
  190. if (!event->Get()->Error.empty()) {
  191. return FailConnection(ctx, event->Get()->Error);
  192. }
  193. Address = event->Get()->Address;
  194. if (GetPort(Address) == 0) {
  195. SetPort(Address, Request->Secure ? 443 : 80);
  196. }
  197. Connect(ctx);
  198. }
  199. void HandleConnecting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) {
  200. LastActivity = ctx.Now();
  201. int res = TSocketImpl::GetError();
  202. if (res == 0) {
  203. OnConnect(ctx);
  204. } else {
  205. FailConnection(ctx, TStringBuilder() << strerror(res));
  206. }
  207. }
  208. void HandleConnecting(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) {
  209. PollerToken = std::move(ev->Get()->PollerToken);
  210. LastActivity = ctx.Now();
  211. int res = TSocketImpl::GetError();
  212. if (res == 0) {
  213. OnConnect(ctx);
  214. } else {
  215. FailConnection(ctx, TStringBuilder() << strerror(res));
  216. }
  217. }
  218. void HandleWaiting(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
  219. LastActivity = ctx.Now();
  220. Request = std::move(event->Get()->Request);
  221. TSocketImpl::SetHost(TString(Request->Host));
  222. LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "resolving " << TSocketImpl::Host);
  223. Request->Timer.Reset();
  224. RequestOwner = event->Sender;
  225. ctx.Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(TSocketImpl::Host));
  226. if (event->Get()->Timeout) {
  227. ConnectionTimeout = event->Get()->Timeout;
  228. }
  229. ctx.Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup());
  230. LastActivity = ctx.Now();
  231. TBase::Become(&TOutgoingConnectionActor::StateResolving);
  232. }
  233. void HandleConnected(NActors::TEvPollerReady::TPtr event, const NActors::TActorContext& ctx) {
  234. LastActivity = ctx.Now();
  235. if (event->Get()->Write && RequestOwner) {
  236. FlushOutput(ctx);
  237. }
  238. if (event->Get()->Read && RequestOwner) {
  239. PullInput(ctx);
  240. }
  241. }
  242. void HandleConnected(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) {
  243. PollerToken = std::move(ev->Get()->PollerToken);
  244. LastActivity = ctx.Now();
  245. PullInput(ctx);
  246. FlushOutput(ctx);
  247. }
  248. void HandleFailed(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
  249. Request = std::move(event->Get()->Request);
  250. RequestOwner = event->Sender;
  251. ReplyErrorAndDie(ctx, "Failed");
  252. }
  253. void HandleTimeout(const NActors::TActorContext& ctx) {
  254. TDuration inactivityTime = ctx.Now() - LastActivity;
  255. if (inactivityTime >= ConnectionTimeout) {
  256. FailConnection(ctx, "Connection timed out");
  257. } else {
  258. ctx.Schedule(Min(ConnectionTimeout - inactivityTime, TDuration::MilliSeconds(100)), new NActors::TEvents::TEvWakeup());
  259. }
  260. }
  261. STFUNC(StateWaiting) {
  262. switch (ev->GetTypeRewrite()) {
  263. HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleWaiting);
  264. CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
  265. }
  266. }
  267. STFUNC(StateResolving) {
  268. switch (ev->GetTypeRewrite()) {
  269. HFunc(TEvHttpProxy::TEvResolveHostResponse, HandleResolving);
  270. CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
  271. }
  272. }
  273. STFUNC(StateConnecting) {
  274. switch (ev->GetTypeRewrite()) {
  275. HFunc(NActors::TEvPollerReady, HandleConnecting);
  276. CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
  277. HFunc(NActors::TEvPollerRegisterResult, HandleConnecting);
  278. }
  279. }
  280. STFUNC(StateConnected) {
  281. switch (ev->GetTypeRewrite()) {
  282. HFunc(NActors::TEvPollerReady, HandleConnected);
  283. CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
  284. HFunc(NActors::TEvPollerRegisterResult, HandleConnected);
  285. }
  286. }
  287. STFUNC(StateFailed) {
  288. switch (ev->GetTypeRewrite()) {
  289. HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleFailed);
  290. }
  291. }
  292. };
  293. NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure, const TActorId& poller) {
  294. if (secure) {
  295. return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, poller);
  296. } else {
  297. return new TOutgoingConnectionActor<TPlainSocketImpl>(owner, poller);
  298. }
  299. }
  300. }