Просмотр исходного кода

add async monitoring http KIKIMR-14742

ref:c51d608f0ae78f08597b88f837491da33a953ef6
Alexey Efimov 2 лет назад
Родитель
Сommit
6faf680f58

+ 55 - 6
library/cpp/actors/http/http.cpp

@@ -62,6 +62,14 @@ void THttpRequest::Clear() {
     new (this) THttpRequest(); // reset all fields
 }
 
+template <>
+bool THttpParser<THttpRequest, TSocketBuffer>::HaveBody() const {
+    if (!Body.empty()) {
+        return true;
+    }
+    return (!ContentType.empty() || !ContentLength.empty() || !TransferEncoding.empty());
+}
+
 template <>
 void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) {
     TStringBuf data(Pos(), len);
@@ -98,7 +106,6 @@ void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) {
             case EParseStage::Header: {
                 if (ProcessData(Header, data, "\r\n", MaxHeaderSize)) {
                     if (Header.empty()) {
-                        Headers = TStringBuf(Headers.data(), data.begin() - Headers.begin());
                         if (HaveBody()) {
                             Stage = EParseStage::Body;
                         } else {
@@ -107,6 +114,7 @@ void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) {
                     } else {
                         ProcessHeader(Header);
                     }
+                    Headers = TStringBuf(Headers.data(), data.data() - Headers.data());
                 }
                 break;
             }
@@ -116,8 +124,13 @@ void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) {
                         Body = Content;
                         Stage = EParseStage::Done;
                     }
-                } else if (TransferEncoding == "chunked") {
+                } else if (TEqNoCase()(TransferEncoding, "chunked")) {
                     Stage = EParseStage::ChunkLength;
+                } else if (TotalSize.has_value()) {
+                    if (ProcessData(Content, data, GetBodySizeFromTotalSize())) {
+                        Body = Content;
+                        Stage = EParseStage::Done;
+                    }
                 } else {
                     // Invalid body encoding
                     Stage = EParseStage::Error;
@@ -188,6 +201,15 @@ THttpParser<THttpRequest, TSocketBuffer>::EParseStage THttpParser<THttpRequest,
     return EParseStage::Method;
 }
 
+template <>
+bool THttpParser<THttpResponse, TSocketBuffer>::HaveBody() const {
+    if (!Body.empty()) {
+        return true;
+    }
+    return (!Status.starts_with("1") && Status != "204" && Status != "304")
+        && (!ContentType.empty() || !ContentLength.empty() || !TransferEncoding.empty());
+}
+
 template <>
 THttpParser<THttpResponse, TSocketBuffer>::EParseStage THttpParser<THttpResponse, TSocketBuffer>::GetInitialStage() {
     return EParseStage::Protocol;
@@ -237,6 +259,8 @@ void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) {
                     if (Header.empty()) {
                         if (HaveBody() && (ContentLength.empty() || ContentLength != "0")) {
                             Stage = EParseStage::Body;
+                        } else if (TotalSize.has_value() && !data.empty()) {
+                            Stage = EParseStage::Body;
                         } else {
                             Stage = EParseStage::Done;
                         }
@@ -252,8 +276,13 @@ void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) {
                     if (ProcessData(Body, data, FromString(ContentLength))) {
                         Stage = EParseStage::Done;
                     }
-                } else if (TransferEncoding == "chunked") {
+                } else if (TEqNoCase()(TransferEncoding, "chunked")) {
                     Stage = EParseStage::ChunkLength;
+                } else if (TotalSize.has_value()) {
+                    if (ProcessData(Content, data, GetBodySizeFromTotalSize())) {
+                        Body = Content;
+                        Stage = EParseStage::Done;
+                    }
                 } else {
                     // Invalid body encoding
                     Stage = EParseStage::Error;
@@ -333,9 +362,19 @@ void THttpParser<THttpResponse, TSocketBuffer>::ConnectionClosed() {
 }
 
 THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseString(TStringBuf data) {
+    THttpParser<THttpResponse, TSocketBuffer> parser(data);
+    THeadersBuilder headers(parser.Headers);
+    if (!WorkerName.empty()) {
+        headers.Set("X-Worker-Name", WorkerName);
+    }
     THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this);
-    response->Append(data);
-    response->Reparse();
+    response->InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message);
+    response->Set(headers);
+    if (parser.HaveBody()) {
+        response->SetBody(parser.Body);
+    } else {
+        response->Set<&THttpResponse::ContentLength>("0");
+    }
     return response;
 }
 
@@ -601,11 +640,17 @@ void TCookiesBuilder::Set(TStringBuf name, TStringBuf data) {
 }
 
 THeaders::THeaders(TStringBuf headers) {
+    Parse(headers);
+}
+
+size_t THeaders::Parse(TStringBuf headers) {
+    auto start = headers.begin();
     for (TStringBuf param = headers.NextTok("\r\n"); !param.empty(); param = headers.NextTok("\r\n")) {
         TStringBuf name = param.NextTok(":");
         param.SkipPrefix(" ");
         Headers[name] = param;
     }
+    return headers.begin() - start;
 }
 
 TStringBuf THeaders::operator [](TStringBuf name) const {
@@ -636,7 +681,11 @@ TString THeaders::Render() const {
 }
 
 THeadersBuilder::THeadersBuilder()
-    :THeaders(TStringBuf())
+    : THeaders(TStringBuf())
+{}
+
+THeadersBuilder::THeadersBuilder(TStringBuf headers)
+    : THeaders(headers)
 {}
 
 THeadersBuilder::THeadersBuilder(const THeadersBuilder& builder) {

+ 48 - 7
library/cpp/actors/http/http.h

@@ -40,6 +40,17 @@ struct TLessNoCase {
     }
 };
 
+struct TEqNoCase {
+    bool operator()(TStringBuf l, TStringBuf r) const {
+        auto ll = l.length();
+        auto rl = r.length();
+        if (ll != rl) {
+            return false;
+        }
+        return strnicmp(l.data(), r.data(), ll) == 0;
+    }
+};
+
 struct TUrlParameters {
     THashMap<TStringBuf, TStringBuf> Parameters;
 
@@ -77,6 +88,7 @@ struct THeaders {
     TStringBuf operator [](TStringBuf name) const;
     bool Has(TStringBuf name) const;
     TStringBuf Get(TStringBuf name) const; // raw
+    size_t Parse(TStringBuf headers);
     TString Render() const;
 };
 
@@ -84,6 +96,7 @@ struct THeadersBuilder : THeaders {
     TDeque<std::pair<TString, TString>> Data;
 
     THeadersBuilder();
+    THeadersBuilder(TStringBuf headers);
     THeadersBuilder(const THeadersBuilder& builder);
     void Set(TStringBuf name, TStringBuf data);
 };
@@ -188,6 +201,7 @@ public:
     size_t ChunkLength = 0;
     size_t ContentSize = 0;
     TString Content;
+    std::optional<size_t> TotalSize;
 
     THttpParser(const THttpParser& src)
         : HeaderType(src)
@@ -285,6 +299,10 @@ public:
     void Advance(size_t len);
     void ConnectionClosed();
 
+    size_t GetBodySizeFromTotalSize() const {
+        return TotalSize.value() - (HeaderType::Headers.end() - BufferType::Data());
+    }
+
     void Clear() {
         BufferType::Clear();
         HeaderType::Clear();
@@ -333,9 +351,7 @@ public:
         return IsReady() || IsError();
     }
 
-    bool HaveBody() const {
-        return !HeaderType::ContentType.empty() || !HeaderType::ContentLength.empty() || !HeaderType::TransferEncoding.empty();
-    }
+    bool HaveBody() const;
 
     bool EnsureEnoughSpaceAvailable(size_t need = BufferType::BUFFER_MIN_STEP) {
         bool result = BufferType::EnsureEnoughSpaceAvailable(need);
@@ -395,6 +411,16 @@ public:
         : Stage(GetInitialStage())
         , LastSuccessStage(Stage)
     {}
+
+    THttpParser(TStringBuf data)
+        : Stage(GetInitialStage())
+        , LastSuccessStage(Stage)
+    {
+        BufferType::Assign(data.data(), data.size());
+        BufferType::Clear(); // reset position to 0
+        TotalSize = data.size();
+        Advance(data.size());
+    }
 };
 
 template <typename HeaderType, typename BufferType>
@@ -440,14 +466,21 @@ public:
         Y_VERIFY_DEBUG(Stage == ERenderStage::Header);
         Append(name);
         Append(": ");
+        auto data = BufferType::Pos();
         Append(value);
+        auto cit = HeaderType::HeadersLocation.find(name);
+        if (cit != HeaderType::HeadersLocation.end()) {
+            (this->*cit->second) = TStringBuf(data, BufferType::Pos());
+        }
         Append("\r\n");
         HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data());
     }
 
     void Set(const THeaders& headers) {
         Y_VERIFY_DEBUG(Stage == ERenderStage::Header);
-        Append(headers.Render());
+        for (const auto& [name, value] : headers.Headers) {
+            Set(name, value);
+        }
         HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data());
     }
 
@@ -497,6 +530,10 @@ public:
         Stage = ERenderStage::Done;
     }
 
+    void FinishBody() {
+        Stage = ERenderStage::Done;
+    }
+
     bool IsDone() const {
         return Stage == ERenderStage::Done;
     }
@@ -505,6 +542,10 @@ public:
         switch (Stage) {
         case ERenderStage::Header:
             FinishHeader();
+            FinishBody();
+            break;
+        case ERenderStage::Body:
+            FinishBody();
             break;
         default:
             break;
@@ -599,7 +640,7 @@ public:
         if (Connection.empty()) {
             return Version == "1.0";
         } else {
-            return Connection == "close";
+            return TEqNoCase()(Connection, "close");
         }
     }
 
@@ -679,14 +720,14 @@ public:
 
     bool IsConnectionClose() const {
         if (!Connection.empty()) {
-            return Connection == "close";
+            return TEqNoCase()(Connection, "close");
         } else {
             return Request->IsConnectionClose();
         }
     }
 
     bool IsNeedBody() const {
-        return Status != "204";
+        return GetRequest()->Method != "HEAD" && Status != "204";
     }
 
     THttpIncomingRequestPtr GetRequest() const {

+ 15 - 7
library/cpp/actors/http/http_proxy.cpp

@@ -68,13 +68,14 @@ protected:
                 return;
             } else {
                 if (url.EndsWith('/')) {
-                    url.Trunc(url.size() - 1);
-                }
-                size_t pos = url.rfind('/');
-                if (pos == TStringBuf::npos) {
-                    break;
+                    url.Chop(1);
                 } else {
-                    url = url.substr(0, pos + 1);
+                    size_t pos = url.rfind('/');
+                    if (pos == TStringBuf::npos) {
+                        break;
+                    } else {
+                        url = url.substr(0, pos + 1);
+                    }
                 }
             }
         }
@@ -117,7 +118,8 @@ protected:
         Connections.erase(event->Get()->ConnectionID);
     }
 
-    void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext&) {
+    void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
+        LOG_DEBUG_S(ctx, HttpLog, "Register handler " << event->Get()->Path << " to " << event->Get()->Handler);
         Handlers[event->Get()->Path] = event->Get()->Handler;
     }
 
@@ -207,6 +209,12 @@ protected:
     }
 
     void Handle(NActors::TEvents::TEvPoison::TPtr, const NActors::TActorContext&) {
+        for (const TActorId& acceptor : Acceptors) {
+            Send(acceptor, new NActors::TEvents::TEvPoisonPill());
+        }
+        for (const TActorId& connection : Connections) {
+            Send(connection, new NActors::TEvents::TEvPoisonPill());
+        }
         PassAway();
     }
 

+ 15 - 2
library/cpp/actors/http/http_ut.cpp

@@ -50,7 +50,20 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
         UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n");
     }
 
-    Y_UNIT_TEST(BasicParsingChunkedBody) {
+    Y_UNIT_TEST(BasicParsingChunkedBodyRequest) {
+        NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest();
+        EatWholeString(request, "POST /Url HTTP/1.1\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n");
+        UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done);
+        UNIT_ASSERT_EQUAL(request->Method, "POST");
+        UNIT_ASSERT_EQUAL(request->URL, "/Url");
+        UNIT_ASSERT_EQUAL(request->Connection, "close");
+        UNIT_ASSERT_EQUAL(request->Protocol, "HTTP");
+        UNIT_ASSERT_EQUAL(request->Version, "1.1");
+        UNIT_ASSERT_EQUAL(request->TransferEncoding, "chunked");
+        UNIT_ASSERT_EQUAL(request->Body, "this is test.");
+    }
+
+    Y_UNIT_TEST(BasicParsingChunkedBodyResponse) {
         NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest();
         NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request);
         EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n");
@@ -83,7 +96,7 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
         UNIT_ASSERT_EQUAL(response->Body, "this\r\n is test.");
     }
 
-    Y_UNIT_TEST(CreateRepsonseWithCompressedBody) {
+    Y_UNIT_TEST(CreateResponseWithCompressedBody) {
         NHttp::THttpIncomingRequestPtr request = nullptr;
         NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(request, "HTTP", "1.1", "200", "OK");
         response->Set<&NHttp::THttpResponse::ContentEncoding>("gzip");

+ 3 - 3
library/cpp/lwtrace/mon/mon_lwtrace.cpp

@@ -4671,12 +4671,12 @@ private:
     }
 };
 
-void RegisterPages(NMonitoring::TMonService2* mon, bool allowUnsafe) {
+void RegisterPages(NMonitoring::TIndexMonPage* index, bool allowUnsafe) {
     THolder<NLwTraceMonPage::TLWTraceMonPage> p = MakeHolder<NLwTraceMonPage::TLWTraceMonPage>(allowUnsafe);
-    mon->Register(p.Release());
+    index->Register(p.Release());
 
 #define WWW_STATIC_FILE(file, type) \
-        mon->Register(new TResourceMonPage(file, file, NMonitoring::TResourceMonPage::type));
+        index->Register(new TResourceMonPage(file, file, NMonitoring::TResourceMonPage::type));
     WWW_STATIC_FILE("lwtrace/mon/static/common.css", CSS);
     WWW_STATIC_FILE("lwtrace/mon/static/common.js", JAVASCRIPT);
     WWW_STATIC_FILE("lwtrace/mon/static/css/bootstrap.min.css", CSS);

+ 1 - 1
library/cpp/lwtrace/mon/mon_lwtrace.h

@@ -19,7 +19,7 @@ public:
     void Output(TStringStream& ss);
 };
 
-void RegisterPages(NMonitoring::TMonService2* mon, bool allowUnsafe = false);
+void RegisterPages(NMonitoring::TIndexMonPage* index, bool allowUnsafe = false);
 NLWTrace::TProbeRegistry& ProbeRegistry(); // This is not safe to use this function before main()
 NLWTrace::TManager& TraceManager(bool allowUnsafe = false);
 TDashboardRegistry& DashboardRegistry();

+ 4 - 0
library/cpp/monlib/service/monservice.h

@@ -68,6 +68,10 @@ namespace NMonitoring {
         IMonPage* FindPage(const TString& relativePath);
         TIndexMonPage* FindIndexPage(const TString& relativePath);
         void SortPages();
+
+        TIndexMonPage* GetRoot() {
+            return IndexMonPage.Get();
+        }
     };
 
 }

+ 2 - 1
ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_run.cpp

@@ -4,6 +4,7 @@
 #include "blobstorage_pdisk_ut_base_test.h"
 
 #include <ydb/core/base/appdata.h>
+#include <ydb/core/mon/sync_http_mon.h>
 #include <ydb/core/blobstorage/crypto/default.h>
 #include <ydb/library/pdisk_io/aio.h>
 
@@ -138,7 +139,7 @@ void Run(TVector<IActor*> tests, TTestRunConfig runCfg) {
 
         if (IsMonitoringEnabled) {
             // Monitoring startup
-            monitoring.Reset(new NActors::TMon({
+            monitoring.Reset(new NActors::TSyncHttpMon({
                 .Port = pm.GetPort(8081),
                 .Title = "TestYard monitoring"
             }));

+ 2 - 2
ydb/core/blobstorage/ut_vdisk/lib/astest.h

@@ -6,7 +6,7 @@
 #include <library/cpp/actors/core/executor_pool_basic.h>
 #include <library/cpp/actors/core/executor_pool_io.h>
 #include <library/cpp/actors/core/scheduler_basic.h>
-#include <ydb/core/mon/mon.h>
+#include <ydb/core/mon/sync_http_mon.h>
 #include <library/cpp/actors/interconnect/interconnect.h>
 #include <library/cpp/actors/protos/services_common.pb.h>
 #include <ydb/core/base/appdata.h>
@@ -106,7 +106,7 @@ inline void TTestWithActorSystem::Run(NActors::IActor *testActor) {
     if (!MonPort) {
         MonPort = pm.GetPort(MonPort);
     }
-    Monitoring.reset(new NActors::TMon({
+    Monitoring.reset(new NActors::TSyncHttpMon({
         .Port = MonPort,
         .Title = "at"
     }));

+ 3 - 1
ydb/core/blobstorage/ut_vdisk/lib/prepare.cpp

@@ -6,6 +6,8 @@
 #include <ydb/core/blobstorage/vdisk/vdisk_actor.h>
 #include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h>
 
+#include <ydb/core/mon/sync_http_mon.h>
+
 #include <ydb/core/scheme/scheme_type_registry.h>
 
 #include <library/cpp/actors/core/executor_pool_basic.h>
@@ -359,7 +361,7 @@ void TConfiguration::Prepare(IVDiskSetup *vdiskSetup, bool newPDisks, bool runRe
     //////////////////////////////////////////////////////////////////////////////
 
     ///////////////////////// MONITORING SETTINGS /////////////////////////////////
-    Monitoring.reset(new NActors::TMon({
+    Monitoring.reset(new NActors::TSyncHttpMon({
         .Port = 8088,
         .Title = "at"
     }));

Некоторые файлы не были показаны из-за большого количества измененных файлов