http_proxy.cpp 14 KB


  1. #include <library/cpp/actors/core/events.h>
  2. #include <library/cpp/monlib/metrics/metric_registry.h>
  3. #include "http_proxy.h"
  4. namespace NHttp {
  5. class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpConfig {
  6. public:
  7. IActor* AddListeningPort(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
  8. IActor* listeningSocket = CreateHttpAcceptorActor(ctx.SelfID, Poller);
  9. TActorId acceptorId = ctx.Register(listeningSocket);
  10. ctx.Send(event->Forward(acceptorId));
  11. Acceptors.emplace_back(acceptorId);
  12. return listeningSocket;
  13. }
  14. IActor* AddOutgoingConnection(bool secure, const NActors::TActorContext& ctx) {
  15. IActor* connectionSocket = CreateOutgoingConnectionActor(ctx.SelfID, secure, Poller);
  16. TActorId connectionId = ctx.Register(connectionSocket);
  17. Connections.emplace(connectionId);
  18. return connectionSocket;
  19. }
  20. void Bootstrap(const NActors::TActorContext& ctx) {
  21. Poller = ctx.Register(NActors::CreatePollerActor());
  22. Become(&THttpProxy::StateWork);
  23. }
  24. THttpProxy(std::weak_ptr<NMonitoring::TMetricRegistry> registry)
  25. : Registry(std::move(registry))
  26. {}
  27. static constexpr char ActorName[] = "HTTP_PROXY_ACTOR";
  28. protected:
  29. STFUNC(StateWork) {
  30. switch (ev->GetTypeRewrite()) {
  31. HFunc(TEvHttpProxy::TEvAddListeningPort, Handle);
  32. HFunc(TEvHttpProxy::TEvRegisterHandler, Handle);
  33. HFunc(TEvHttpProxy::TEvHttpIncomingRequest, Handle);
  34. HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
  35. HFunc(TEvHttpProxy::TEvHttpIncomingResponse, Handle);
  36. HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
  37. HFunc(TEvHttpProxy::TEvHttpAcceptorClosed, Handle);
  38. HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle);
  39. HFunc(TEvHttpProxy::TEvResolveHostRequest, Handle);
  40. HFunc(TEvHttpProxy::TEvReportSensors, Handle);
  41. HFunc(NActors::TEvents::TEvPoison, Handle);
  42. }
  43. }
  44. void PassAway() override {
  45. Send(Poller, new NActors::TEvents::TEvPoisonPill());
  46. for (const NActors::TActorId& connection : Connections) {
  47. Send(connection, new NActors::TEvents::TEvPoisonPill());
  48. }
  49. for (const NActors::TActorId& acceptor : Acceptors) {
  50. Send(acceptor, new NActors::TEvents::TEvPoisonPill());
  51. }
  52. NActors::TActorBootstrapped<THttpProxy>::PassAway();
  53. }
  54. void Handle(TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
  55. TStringBuf url = event->Get()->Request->URL.Before('?');
  56. THashMap<TString, TActorId>::iterator it;
  57. while (!url.empty()) {
  58. it = Handlers.find(url);
  59. if (it != Handlers.end()) {
  60. ctx.Send(event->Forward(it->second));
  61. return;
  62. } else {
  63. if (url.EndsWith('/')) {
  64. url.Chop(1);
  65. } else {
  66. size_t pos = url.rfind('/');
  67. if (pos == TStringBuf::npos) {
  68. break;
  69. } else {
  70. url = url.substr(0, pos + 1);
  71. }
  72. }
  73. }
  74. }
  75. ctx.Send(event->Sender, new TEvHttpProxy::TEvHttpOutgoingResponse(event->Get()->Request->CreateResponseNotFound()));
  76. }
  77. void Handle(TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
  78. Y_UNUSED(event);
  79. Y_UNUSED(ctx);
  80. Y_FAIL("This event shouldn't be there, it should go to the http connection owner directly");
  81. }
  82. void Handle(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
  83. Y_UNUSED(event);
  84. Y_UNUSED(ctx);
  85. Y_FAIL("This event shouldn't be there, it should go to the http connection directly");
  86. }
  87. void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
  88. bool secure(event->Get()->Request->Secure);
  89. NActors::IActor* actor = AddOutgoingConnection(secure, ctx);
  90. ctx.Send(event->Forward(actor->SelfId()));
  91. }
  92. void Handle(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
  93. AddListeningPort(event, ctx);
  94. }
  95. void Handle(TEvHttpProxy::TEvHttpAcceptorClosed::TPtr event, const NActors::TActorContext&) {
  96. for (auto it = Acceptors.begin(); it != Acceptors.end(); ++it) {
  97. if (*it == event->Get()->ConnectionID) {
  98. Acceptors.erase(it);
  99. break;
  100. }
  101. }
  102. }
  103. void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) {
  104. Connections.erase(event->Get()->ConnectionID);
  105. }
  106. void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
  107. LOG_TRACE_S(ctx, HttpLog, "Register handler " << event->Get()->Path << " to " << event->Get()->Handler);
  108. Handlers[event->Get()->Path] = event->Get()->Handler;
  109. }
  110. void Handle(TEvHttpProxy::TEvResolveHostRequest::TPtr event, const NActors::TActorContext& ctx) {
  111. const TString& host(event->Get()->Host);
  112. auto it = Hosts.find(host);
  113. if (it == Hosts.end() || it->second.DeadlineTime > ctx.Now()) {
  114. TString addressPart;
  115. TIpPort portPart = 0;
  116. CrackAddress(host, addressPart, portPart);
  117. if (IsIPv6(addressPart)) {
  118. if (it == Hosts.end()) {
  119. it = Hosts.emplace(host, THostEntry()).first;
  120. }
  121. it->second.Address = std::make_shared<TSockAddrInet6>(addressPart.data(), portPart);
  122. it->second.DeadlineTime = ctx.Now() + HostsTimeToLive;
  123. } else if (IsIPv4(addressPart)) {
  124. if (it == Hosts.end()) {
  125. it = Hosts.emplace(host, THostEntry()).first;
  126. }
  127. it->second.Address = std::make_shared<TSockAddrInet>(addressPart.data(), portPart);
  128. it->second.DeadlineTime = ctx.Now() + HostsTimeToLive;
  129. } else {
  130. // TODO(xenoxeno): move to another, possible blocking actor
  131. try {
  132. const NDns::TResolvedHost* result = NDns::CachedResolve(NDns::TResolveInfo(addressPart, portPart));
  133. if (result != nullptr) {
  134. auto pAddr = result->Addr.Begin();
  135. while (pAddr != result->Addr.End() && pAddr->ai_family != AF_INET && pAddr->ai_family != AF_INET6) {
  136. ++pAddr;
  137. }
  138. if (pAddr == result->Addr.End()) {
  139. ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Invalid address family resolved"));
  140. return;
  141. }
  142. THttpConfig::SocketAddressType address;
  143. switch (pAddr->ai_family) {
  144. case AF_INET:
  145. address = std::make_shared<TSockAddrInet>();
  146. break;
  147. case AF_INET6:
  148. address = std::make_shared<TSockAddrInet6>();
  149. break;
  150. }
  151. if (address) {
  152. memcpy(address->SockAddr(), pAddr->ai_addr, pAddr->ai_addrlen);
  153. LOG_DEBUG_S(ctx, HttpLog, "Host " << host << " resolved to " << address->ToString());
  154. if (it == Hosts.end()) {
  155. it = Hosts.emplace(host, THostEntry()).first;
  156. }
  157. it->second.Address = address;
  158. it->second.DeadlineTime = ctx.Now() + HostsTimeToLive;
  159. }
  160. } else {
  161. ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Error resolving host"));
  162. return;
  163. }
  164. }
  165. catch (const yexception& e) {
  166. ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(e.what()));
  167. return;
  168. }
  169. }
  170. }
  171. ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(it->first, it->second.Address));
  172. }
  173. void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext&) {
  174. const TEvHttpProxy::TEvReportSensors& sensors(*event->Get());
  175. const static TString urlNotFound = "not-found";
  176. const TString& url = (sensors.Status == "404" ? urlNotFound : sensors.Url);
  177. std::shared_ptr<NMonitoring::TMetricRegistry> registry = Registry.lock();
  178. if (registry) {
  179. registry->Rate(
  180. {
  181. {"sensor", "count"},
  182. {"direction", sensors.Direction},
  183. {"peer", sensors.Host},
  184. {"url", url},
  185. {"status", sensors.Status}
  186. })->Inc();
  187. registry->HistogramRate(
  188. {
  189. {"sensor", "time_us"},
  190. {"direction", sensors.Direction},
  191. {"peer", sensors.Host},
  192. {"url", url},
  193. {"status", sensors.Status}
  194. },
  195. NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MicroSeconds());
  196. registry->HistogramRate(
  197. {
  198. {"sensor", "time_ms"},
  199. {"direction", sensors.Direction},
  200. {"peer", sensors.Host},
  201. {"url", url},
  202. {"status", sensors.Status}
  203. },
  204. NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MilliSeconds());
  205. }
  206. }
  207. void Handle(NActors::TEvents::TEvPoison::TPtr, const NActors::TActorContext&) {
  208. for (const TActorId& acceptor : Acceptors) {
  209. Send(acceptor, new NActors::TEvents::TEvPoisonPill());
  210. }
  211. for (const TActorId& connection : Connections) {
  212. Send(connection, new NActors::TEvents::TEvPoisonPill());
  213. }
  214. PassAway();
  215. }
  216. NActors::TActorId Poller;
  217. TVector<TActorId> Acceptors;
  218. struct THostEntry {
  219. THttpConfig::SocketAddressType Address;
  220. TInstant DeadlineTime;
  221. };
  222. static constexpr TDuration HostsTimeToLive = TDuration::Seconds(60);
  223. THashMap<TString, THostEntry> Hosts;
  224. THashMap<TString, TActorId> Handlers;
  225. THashSet<TActorId> Connections; // outgoing
  226. std::weak_ptr<NMonitoring::TMetricRegistry> Registry;
  227. };
  228. TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response) {
  229. return new TEvHttpProxy::TEvReportSensors(
  230. "out",
  231. request->Host,
  232. request->URL.Before('?'),
  233. response ? response->Status : "504",
  234. TDuration::Seconds(std::abs(request->Timer.Passed()))
  235. );
  236. }
  237. TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response) {
  238. const auto& sensors = response->Sensors;
  239. if (sensors) {
  240. return new TEvHttpProxy::TEvReportSensors(*sensors);
  241. }
  242. return new TEvHttpProxy::TEvReportSensors(
  243. "in",
  244. request->Host,
  245. request->URL.Before('?'),
  246. response->Status,
  247. TDuration::Seconds(std::abs(request->Timer.Passed()))
  248. );
  249. }
  250. NActors::IActor* CreateHttpProxy(std::weak_ptr<NMonitoring::TMetricRegistry> registry) {
  251. return new THttpProxy(std::move(registry));
  252. }
  253. bool IsIPv6(const TString& host) {
  254. if (host.find_first_not_of(":0123456789abcdef") != TString::npos) {
  255. return false;
  256. }
  257. if (std::count(host.begin(), host.end(), ':') < 2) {
  258. return false;
  259. }
  260. return true;
  261. }
  262. bool IsIPv4(const TString& host) {
  263. if (host.find_first_not_of(".0123456789") != TString::npos) {
  264. return false;
  265. }
  266. if (std::count(host.begin(), host.end(), '.') != 3) {
  267. return false;
  268. }
  269. return true;
  270. }
  271. bool CrackURL(TStringBuf url, TStringBuf& scheme, TStringBuf& host, TStringBuf& uri) {
  272. url.TrySplit("://", scheme, url);
  273. auto pos = url.find('/');
  274. if (pos == TStringBuf::npos) {
  275. host = url;
  276. } else {
  277. host = url.substr(0, pos);
  278. uri = url.substr(pos);
  279. }
  280. return true;
  281. }
  282. void CrackAddress(const TString& address, TString& hostname, TIpPort& port) {
  283. size_t first_colon_pos = address.find(':');
  284. if (first_colon_pos != TString::npos) {
  285. size_t last_colon_pos = address.rfind(':');
  286. if (last_colon_pos == first_colon_pos) {
  287. // only one colon, simple case
  288. port = FromStringWithDefault<TIpPort>(address.substr(first_colon_pos + 1), 0);
  289. hostname = address.substr(0, first_colon_pos);
  290. } else {
  291. // ipv6?
  292. size_t closing_bracket_pos = address.rfind(']');
  293. if (closing_bracket_pos == TString::npos || closing_bracket_pos > last_colon_pos) {
  294. // whole address is ipv6 host
  295. hostname = address;
  296. } else {
  297. port = FromStringWithDefault<TIpPort>(address.substr(last_colon_pos + 1), 0);
  298. hostname = address.substr(0, last_colon_pos);
  299. }
  300. if (hostname.StartsWith('[') && hostname.EndsWith(']')) {
  301. hostname = hostname.substr(1, hostname.size() - 2);
  302. }
  303. }
  304. } else {
  305. hostname = address;
  306. }
  307. }
  308. void TrimBegin(TStringBuf& target, char delim) {
  309. while (!target.empty() && *target.begin() == delim) {
  310. target.Skip(1);
  311. }
  312. }
  313. void TrimEnd(TStringBuf& target, char delim) {
  314. while (!target.empty() && target.back() == delim) {
  315. target.Trunc(target.size() - 1);
  316. }
  317. }
  318. void Trim(TStringBuf& target, char delim) {
  319. TrimBegin(target, delim);
  320. TrimEnd(target, delim);
  321. }
  322. void TrimEnd(TString& target, char delim) {
  323. while (!target.empty() && target.back() == delim) {
  324. target.resize(target.size() - 1);
  325. }
  326. }
  327. }