123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268 |
- #include "service.h"
- #include <library/cpp/coroutine/engine/sockpool.h>
- #include <library/cpp/http/io/stream.h>
- #include <library/cpp/http/fetch/httpheader.h>
- #include <library/cpp/http/fetch/httpfsm.h>
- #include <library/cpp/uri/http_url.h>
- #include <util/generic/buffer.h>
- #include <util/stream/str.h>
- #include <util/stream/buffer.h>
- #include <util/stream/zerocopy.h>
- #include <util/string/vector.h>
- namespace NMonitoring {
- class THttpClient: public IHttpRequest {
- public:
- void ServeRequest(THttpInput& in, IOutputStream& out, const NAddr::IRemoteAddr* remoteAddr, const THandler& Handler) {
- try {
- try {
- RemoteAddr = remoteAddr;
- THttpHeaderParser parser;
- parser.Init(&Header);
- if (parser.Execute(in.FirstLine().data(), in.FirstLine().size()) < 0) {
- out << "HTTP/1.1 400 Bad request\r\nConnection: Close\r\n\r\n";
- return;
- }
- if (Url.Parse(Header.GetUrl().data()) != THttpURL::ParsedOK) {
- out << "HTTP/1.1 400 Invalid url\r\nConnection: Close\r\n\r\n";
- return;
- }
- TString path = GetPath();
- if (!path.StartsWith('/')) {
- out << "HTTP/1.1 400 Bad request\r\nConnection: Close\r\n\r\n";
- return;
- }
- Headers = &in.Headers();
- CgiParams.Scan(Url.Get(THttpURL::FieldQuery));
- } catch (...) {
- out << "HTTP/1.1 500 Internal server error\r\nConnection: Close\r\n\r\n";
- YSYSLOG(TLOG_ERR, "THttpClient: internal error while serving monitoring request: %s", CurrentExceptionMessage().data());
- }
- if (Header.http_method == HTTP_METHOD_POST)
- TransferData(&in, &PostContent);
- Handler(out, *this);
- out.Finish();
- } catch (...) {
- auto msg = CurrentExceptionMessage();
- out << "HTTP/1.1 500 Internal server error\r\nConnection: Close\r\n\r\n" << msg;
- out.Finish();
- YSYSLOG(TLOG_ERR, "THttpClient: error while serving monitoring request: %s", msg.data());
- }
- }
- const char* GetURI() const override {
- return Header.request_uri.c_str();
- }
- const char* GetPath() const override {
- return Url.Get(THttpURL::FieldPath);
- }
- const TCgiParameters& GetParams() const override {
- return CgiParams;
- }
- const TCgiParameters& GetPostParams() const override {
- if (PostParams.empty() && !PostContent.Buffer().Empty())
- const_cast<THttpClient*>(this)->ScanPostParams();
- return PostParams;
- }
- TStringBuf GetPostContent() const override {
- return TStringBuf(PostContent.Buffer().Data(), PostContent.Buffer().Size());
- }
- HTTP_METHOD GetMethod() const override {
- return (HTTP_METHOD)Header.http_method;
- }
- void ScanPostParams() {
- PostParams.Scan(TStringBuf(PostContent.Buffer().data(), PostContent.Buffer().size()));
- }
- const THttpHeaders& GetHeaders() const override {
- if (Headers != nullptr) {
- return *Headers;
- }
- static THttpHeaders defaultHeaders;
- return defaultHeaders;
- }
- TString GetRemoteAddr() const override {
- return RemoteAddr ? NAddr::PrintHostAndPort(*RemoteAddr) : TString();
- }
- private:
- THttpRequestHeader Header;
- const THttpHeaders* Headers = nullptr;
- THttpURL Url;
- TCgiParameters CgiParams;
- TCgiParameters PostParams;
- TBufferOutput PostContent;
- const NAddr::IRemoteAddr* RemoteAddr = nullptr;
- };
- /* TCoHttpServer */
- class TCoHttpServer::TConnection: public THttpClient {
- public:
- TConnection(const TCoHttpServer::TAcceptFull& acc, const TCoHttpServer& parent)
- : Socket(acc.S->Release())
- , RemoteAddr(acc.Remote)
- , Parent(parent)
- {
- }
- void operator()(TCont* c) {
- try {
- THolder<TConnection> me(this);
- TContIO io(Socket, c);
- THttpInput in(&io);
- THttpOutput out(&io, &in);
- // buffer reply so there will be ne context switching
- TStringStream s;
- ServeRequest(in, s, RemoteAddr, Parent.Handler);
- out << s.Str();
- out.Finish();
- } catch (...) {
- YSYSLOG(TLOG_WARNING, "TCoHttpServer::TConnection: error: %s\n", CurrentExceptionMessage().data());
- }
- }
- private:
- TSocketHolder Socket;
- const NAddr::IRemoteAddr* RemoteAddr;
- const TCoHttpServer& Parent;
- };
- TCoHttpServer::TCoHttpServer(TContExecutor& executor, const TString& bindAddr, TIpPort port, THandler handler)
- : Executor(executor)
- , Listener(this, &executor)
- , Handler(std::move(handler))
- , BindAddr(bindAddr)
- , Port(port)
- {
- try {
- Listener.Bind(TIpAddress(bindAddr, port));
- } catch (yexception e) {
- Y_ABORT("TCoHttpServer::TCoHttpServer: couldn't bind to %s:%d\n", bindAddr.data(), port);
- }
- }
- void TCoHttpServer::Start() {
- Listener.Listen();
- }
- void TCoHttpServer::Stop() {
- Listener.Stop();
- }
- void TCoHttpServer::OnAcceptFull(const TAcceptFull& acc) {
- THolder<TConnection> conn(new TConnection(acc, *this));
- Executor.Create(*conn, "client");
- Y_UNUSED(conn.Release());
- }
- void TCoHttpServer::OnError() {
- throw; // just rethrow
- }
- void TCoHttpServer::ProcessRequest(IOutputStream& out, const IHttpRequest& request) {
- try {
- TNetworkAddress addr(BindAddr, Port);
- TSocket sock(addr);
- TSocketOutput sock_out(sock);
- TSocketInput sock_in(sock);
- sock_out << "GET " << request.GetURI() << " HTTP/1.0\r\n\r\n";
- THttpInput http_in(&sock_in);
- try {
- out << "HTTP/1.1 200 Ok\nConnection: Close\n\n";
- TransferData(&http_in, &out);
- } catch (...) {
- YSYSLOG(TLOG_DEBUG, "TCoHttpServer: while getting data from backend: %s", CurrentExceptionMessage().data());
- }
- } catch (const yexception& /*e*/) {
- out << "HTTP/1.1 500 Internal server error\nConnection: Close\n\n";
- YSYSLOG(TLOG_DEBUG, "TCoHttpServer: while getting data from backend: %s", CurrentExceptionMessage().data());
- }
- }
- /* TMtHttpServer */
- class TMtHttpServer::TConnection: public TClientRequest, public THttpClient {
- public:
- TConnection(const TMtHttpServer& parent)
- : Parent(parent)
- {
- }
- bool Reply(void*) override {
- ServeRequest(Input(), Output(), NAddr::GetPeerAddr(Socket()).Get(), Parent.Handler);
- return true;
- }
- private:
- const TMtHttpServer& Parent;
- };
- TMtHttpServer::TMtHttpServer(const TOptions& options, THandler handler, IThreadFactory* pool)
- : THttpServer(this, options, pool)
- , Handler(std::move(handler))
- {
- }
- TMtHttpServer::TMtHttpServer(const TOptions& options, THandler handler, TSimpleSharedPtr<IThreadPool> pool)
- : THttpServer(this, /* mainWorkers = */pool, /* failWorkers = */pool, options)
- , Handler(std::move(handler))
- {
- }
- bool TMtHttpServer::Start() {
- return THttpServer::Start();
- }
- void TMtHttpServer::StartOrThrow() {
- if (!Start()) {
- const auto& opts = THttpServer::Options();
- TNetworkAddress addr = opts.Host
- ? TNetworkAddress(opts.Host, opts.Port)
- : TNetworkAddress(opts.Port);
- ythrow TSystemError(GetErrorCode()) << addr;
- }
- }
- void TMtHttpServer::Stop() {
- THttpServer::Stop();
- }
- TClientRequest* TMtHttpServer::CreateClient() {
- return new TConnection(*this);
- }
- /* TService */
- TMonService::TMonService(TContExecutor& executor, TIpPort internalPort, TIpPort externalPort,
- THandler coHandler, THandler mtHandler)
- : CoServer(executor, "127.0.0.1", internalPort, std::move(coHandler))
- , MtServer(THttpServerOptions(externalPort), std::bind(&TMonService::DispatchRequest, this, std::placeholders::_1, std::placeholders::_2))
- , MtHandler(std::move(mtHandler))
- {
- }
- void TMonService::Start() {
- MtServer.Start();
- CoServer.Start();
- }
- void TMonService::Stop() {
- MtServer.Stop();
- CoServer.Stop();
- }
- void TMonService::DispatchRequest(IOutputStream& out, const IHttpRequest& request) {
- if (strcmp(request.GetPath(), "/") == 0) {
- out << "HTTP/1.1 200 Ok\nConnection: Close\n\n";
- MtHandler(out, request);
- } else
- CoServer.ProcessRequest(out, request);
- }
- }
|