123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364 |
- #include <library/cpp/actors/core/events.h>
- #include <library/cpp/monlib/metrics/metric_registry.h>
- #include "http_proxy.h"
- namespace NHttp {
- class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpConfig {
- public:
- IActor* AddListeningPort(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
- IActor* listeningSocket = CreateHttpAcceptorActor(ctx.SelfID, Poller);
- TActorId acceptorId = ctx.Register(listeningSocket);
- ctx.Send(event->Forward(acceptorId));
- Acceptors.emplace_back(acceptorId);
- return listeningSocket;
- }
- IActor* AddOutgoingConnection(bool secure, const NActors::TActorContext& ctx) {
- IActor* connectionSocket = CreateOutgoingConnectionActor(ctx.SelfID, secure, Poller);
- TActorId connectionId = ctx.Register(connectionSocket);
- Connections.emplace(connectionId);
- return connectionSocket;
- }
- void Bootstrap(const NActors::TActorContext& ctx) {
- Poller = ctx.Register(NActors::CreatePollerActor());
- Become(&THttpProxy::StateWork);
- }
- THttpProxy(std::weak_ptr<NMonitoring::TMetricRegistry> registry)
- : Registry(std::move(registry))
- {}
- static constexpr char ActorName[] = "HTTP_PROXY_ACTOR";
- protected:
- STFUNC(StateWork) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvHttpProxy::TEvAddListeningPort, Handle);
- HFunc(TEvHttpProxy::TEvRegisterHandler, Handle);
- HFunc(TEvHttpProxy::TEvHttpIncomingRequest, Handle);
- HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
- HFunc(TEvHttpProxy::TEvHttpIncomingResponse, Handle);
- HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
- HFunc(TEvHttpProxy::TEvHttpAcceptorClosed, Handle);
- HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle);
- HFunc(TEvHttpProxy::TEvResolveHostRequest, Handle);
- HFunc(TEvHttpProxy::TEvReportSensors, Handle);
- HFunc(NActors::TEvents::TEvPoison, Handle);
- }
- }
- void PassAway() override {
- Send(Poller, new NActors::TEvents::TEvPoisonPill());
- for (const NActors::TActorId& connection : Connections) {
- Send(connection, new NActors::TEvents::TEvPoisonPill());
- }
- for (const NActors::TActorId& acceptor : Acceptors) {
- Send(acceptor, new NActors::TEvents::TEvPoisonPill());
- }
- NActors::TActorBootstrapped<THttpProxy>::PassAway();
- }
- void Handle(TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
- TStringBuf url = event->Get()->Request->URL.Before('?');
- THashMap<TString, TActorId>::iterator it;
- while (!url.empty()) {
- it = Handlers.find(url);
- if (it != Handlers.end()) {
- ctx.Send(event->Forward(it->second));
- return;
- } else {
- if (url.EndsWith('/')) {
- url.Chop(1);
- } else {
- size_t pos = url.rfind('/');
- if (pos == TStringBuf::npos) {
- break;
- } else {
- url = url.substr(0, pos + 1);
- }
- }
- }
- }
- ctx.Send(event->Sender, new TEvHttpProxy::TEvHttpOutgoingResponse(event->Get()->Request->CreateResponseNotFound()));
- }
- void Handle(TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
- Y_UNUSED(event);
- Y_UNUSED(ctx);
- Y_FAIL("This event shouldn't be there, it should go to the http connection owner directly");
- }
- void Handle(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
- Y_UNUSED(event);
- Y_UNUSED(ctx);
- Y_FAIL("This event shouldn't be there, it should go to the http connection directly");
- }
- void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
- bool secure(event->Get()->Request->Secure);
- NActors::IActor* actor = AddOutgoingConnection(secure, ctx);
- ctx.Send(event->Forward(actor->SelfId()));
- }
- void Handle(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
- AddListeningPort(event, ctx);
- }
- void Handle(TEvHttpProxy::TEvHttpAcceptorClosed::TPtr event, const NActors::TActorContext&) {
- for (auto it = Acceptors.begin(); it != Acceptors.end(); ++it) {
- if (*it == event->Get()->ConnectionID) {
- Acceptors.erase(it);
- break;
- }
- }
- }
- void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) {
- Connections.erase(event->Get()->ConnectionID);
- }
- void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
- LOG_TRACE_S(ctx, HttpLog, "Register handler " << event->Get()->Path << " to " << event->Get()->Handler);
- Handlers[event->Get()->Path] = event->Get()->Handler;
- }
- void Handle(TEvHttpProxy::TEvResolveHostRequest::TPtr event, const NActors::TActorContext& ctx) {
- const TString& host(event->Get()->Host);
- auto it = Hosts.find(host);
- if (it == Hosts.end() || it->second.DeadlineTime > ctx.Now()) {
- TString addressPart;
- TIpPort portPart = 0;
- CrackAddress(host, addressPart, portPart);
- if (IsIPv6(addressPart)) {
- if (it == Hosts.end()) {
- it = Hosts.emplace(host, THostEntry()).first;
- }
- it->second.Address = std::make_shared<TSockAddrInet6>(addressPart.data(), portPart);
- it->second.DeadlineTime = ctx.Now() + HostsTimeToLive;
- } else if (IsIPv4(addressPart)) {
- if (it == Hosts.end()) {
- it = Hosts.emplace(host, THostEntry()).first;
- }
- it->second.Address = std::make_shared<TSockAddrInet>(addressPart.data(), portPart);
- it->second.DeadlineTime = ctx.Now() + HostsTimeToLive;
- } else {
- // TODO(xenoxeno): move to another, possible blocking actor
- try {
- const NDns::TResolvedHost* result = NDns::CachedResolve(NDns::TResolveInfo(addressPart, portPart));
- if (result != nullptr) {
- auto pAddr = result->Addr.Begin();
- while (pAddr != result->Addr.End() && pAddr->ai_family != AF_INET && pAddr->ai_family != AF_INET6) {
- ++pAddr;
- }
- if (pAddr == result->Addr.End()) {
- ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Invalid address family resolved"));
- return;
- }
- THttpConfig::SocketAddressType address;
- switch (pAddr->ai_family) {
- case AF_INET:
- address = std::make_shared<TSockAddrInet>();
- break;
- case AF_INET6:
- address = std::make_shared<TSockAddrInet6>();
- break;
- }
- if (address) {
- memcpy(address->SockAddr(), pAddr->ai_addr, pAddr->ai_addrlen);
- LOG_DEBUG_S(ctx, HttpLog, "Host " << host << " resolved to " << address->ToString());
- if (it == Hosts.end()) {
- it = Hosts.emplace(host, THostEntry()).first;
- }
- it->second.Address = address;
- it->second.DeadlineTime = ctx.Now() + HostsTimeToLive;
- }
- } else {
- ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Error resolving host"));
- return;
- }
- }
- catch (const yexception& e) {
- ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(e.what()));
- return;
- }
- }
- }
- ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(it->first, it->second.Address));
- }
- void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext&) {
- const TEvHttpProxy::TEvReportSensors& sensors(*event->Get());
- const static TString urlNotFound = "not-found";
- const TString& url = (sensors.Status == "404" ? urlNotFound : sensors.Url);
- std::shared_ptr<NMonitoring::TMetricRegistry> registry = Registry.lock();
- if (registry) {
- registry->Rate(
- {
- {"sensor", "count"},
- {"direction", sensors.Direction},
- {"peer", sensors.Host},
- {"url", url},
- {"status", sensors.Status}
- })->Inc();
- registry->HistogramRate(
- {
- {"sensor", "time_us"},
- {"direction", sensors.Direction},
- {"peer", sensors.Host},
- {"url", url},
- {"status", sensors.Status}
- },
- NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MicroSeconds());
- registry->HistogramRate(
- {
- {"sensor", "time_ms"},
- {"direction", sensors.Direction},
- {"peer", sensors.Host},
- {"url", url},
- {"status", sensors.Status}
- },
- NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MilliSeconds());
- }
- }
- void Handle(NActors::TEvents::TEvPoison::TPtr, const NActors::TActorContext&) {
- for (const TActorId& acceptor : Acceptors) {
- Send(acceptor, new NActors::TEvents::TEvPoisonPill());
- }
- for (const TActorId& connection : Connections) {
- Send(connection, new NActors::TEvents::TEvPoisonPill());
- }
- PassAway();
- }
- NActors::TActorId Poller;
- TVector<TActorId> Acceptors;
- struct THostEntry {
- THttpConfig::SocketAddressType Address;
- TInstant DeadlineTime;
- };
- static constexpr TDuration HostsTimeToLive = TDuration::Seconds(60);
- THashMap<TString, THostEntry> Hosts;
- THashMap<TString, TActorId> Handlers;
- THashSet<TActorId> Connections; // outgoing
- std::weak_ptr<NMonitoring::TMetricRegistry> Registry;
- };
- TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response) {
- return new TEvHttpProxy::TEvReportSensors(
- "out",
- request->Host,
- request->URL.Before('?'),
- response ? response->Status : "504",
- TDuration::Seconds(std::abs(request->Timer.Passed()))
- );
- }
- TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response) {
- const auto& sensors = response->Sensors;
- if (sensors) {
- return new TEvHttpProxy::TEvReportSensors(*sensors);
- }
- return new TEvHttpProxy::TEvReportSensors(
- "in",
- request->Host,
- request->URL.Before('?'),
- response->Status,
- TDuration::Seconds(std::abs(request->Timer.Passed()))
- );
- }
- NActors::IActor* CreateHttpProxy(std::weak_ptr<NMonitoring::TMetricRegistry> registry) {
- return new THttpProxy(std::move(registry));
- }
- bool IsIPv6(const TString& host) {
- if (host.find_first_not_of(":0123456789abcdef") != TString::npos) {
- return false;
- }
- if (std::count(host.begin(), host.end(), ':') < 2) {
- return false;
- }
- return true;
- }
- bool IsIPv4(const TString& host) {
- if (host.find_first_not_of(".0123456789") != TString::npos) {
- return false;
- }
- if (std::count(host.begin(), host.end(), '.') != 3) {
- return false;
- }
- return true;
- }
- bool CrackURL(TStringBuf url, TStringBuf& scheme, TStringBuf& host, TStringBuf& uri) {
- url.TrySplit("://", scheme, url);
- auto pos = url.find('/');
- if (pos == TStringBuf::npos) {
- host = url;
- } else {
- host = url.substr(0, pos);
- uri = url.substr(pos);
- }
- return true;
- }
- void CrackAddress(const TString& address, TString& hostname, TIpPort& port) {
- size_t first_colon_pos = address.find(':');
- if (first_colon_pos != TString::npos) {
- size_t last_colon_pos = address.rfind(':');
- if (last_colon_pos == first_colon_pos) {
- // only one colon, simple case
- port = FromStringWithDefault<TIpPort>(address.substr(first_colon_pos + 1), 0);
- hostname = address.substr(0, first_colon_pos);
- } else {
- // ipv6?
- size_t closing_bracket_pos = address.rfind(']');
- if (closing_bracket_pos == TString::npos || closing_bracket_pos > last_colon_pos) {
- // whole address is ipv6 host
- hostname = address;
- } else {
- port = FromStringWithDefault<TIpPort>(address.substr(last_colon_pos + 1), 0);
- hostname = address.substr(0, last_colon_pos);
- }
- if (hostname.StartsWith('[') && hostname.EndsWith(']')) {
- hostname = hostname.substr(1, hostname.size() - 2);
- }
- }
- } else {
- hostname = address;
- }
- }
- void TrimBegin(TStringBuf& target, char delim) {
- while (!target.empty() && *target.begin() == delim) {
- target.Skip(1);
- }
- }
- void TrimEnd(TStringBuf& target, char delim) {
- while (!target.empty() && target.back() == delim) {
- target.Trunc(target.size() - 1);
- }
- }
- void Trim(TStringBuf& target, char delim) {
- TrimBegin(target, delim);
- TrimEnd(target, delim);
- }
- void TrimEnd(TString& target, char delim) {
- while (!target.empty() && target.back() == delim) {
- target.resize(target.size() - 1);
- }
- }
- }
|