Browse Source

Add UnifiedAgentWriterFactory
commit_hash:af6dedadd4d7fe292bcb7a8b6de366aff4e630b1

lo-r-d 1 month ago
parent
commit
13b3cd2845

+ 89 - 0
library/cpp/unified_agent_client/stream.cpp

@@ -0,0 +1,89 @@
+#include "stream.h"
+
+namespace NUnifiedAgent {
+    namespace {
+        class TDefaultStreamRecordConverter : public IStreamRecordConverter {
+        public:
+            TDefaultStreamRecordConverter(bool stripTrailingNewLine)
+                : StripTrailingNewLine(stripTrailingNewLine)
+            {
+            }
+
+            TClientMessage Convert(const void* buf, size_t len) const override {
+                TStringBuf str(static_cast<const char*>(buf), len);
+                if (StripTrailingNewLine) {
+                    str.ChopSuffix("\n");
+                }
+                return {
+                    TString(str),
+                    {}
+                };
+            }
+
+        private:
+            const bool StripTrailingNewLine;
+        };
+
+        class TClientSessionAdapter: public IOutputStream {
+        public:
+            explicit TClientSessionAdapter(const TClientSessionPtr& session, THolder<IStreamRecordConverter> recordConverter)
+                : Session(session)
+                , RecordConverter(std::move(recordConverter))
+            {
+            }
+
+            void DoWrite(const void* buf, size_t len) override {
+                Session->Send(RecordConverter->Convert(buf, len));
+            }
+
+            void DoFlush() override {
+            }
+
+        private:
+            TClientSessionPtr Session;
+            THolder<IStreamRecordConverter> RecordConverter;
+        };
+
+        class TSessionHolder {
+        protected:
+            TSessionHolder(const TClientParameters& parameters, const TSessionParameters& sessionParameters)
+                : Client(MakeClient(parameters))
+                , Session(Client->CreateSession(sessionParameters))
+            {
+            }
+
+        protected:
+            TClientPtr Client;
+            TClientSessionPtr Session;
+        };
+
+        class TAgentOutputStream: private TSessionHolder, public TClientSessionAdapter {
+        public:
+            TAgentOutputStream(const TClientParameters& parameters,
+                const TSessionParameters& sessionParameters,
+                THolder<IStreamRecordConverter> recordConverter)
+                : TSessionHolder(parameters, sessionParameters)
+                , TClientSessionAdapter(TSessionHolder::Session, std::move(recordConverter))
+            {
+            }
+
+            ~TAgentOutputStream() override {
+                TSessionHolder::Session->Close();
+            }
+        };
+    }
+
+    THolder<IStreamRecordConverter> MakeDefaultStreamRecordConverter(bool stripTrailingNewLine) {
+        return MakeHolder<TDefaultStreamRecordConverter>(stripTrailingNewLine);
+    }
+
+    THolder<IOutputStream> MakeOutputStream(const TClientParameters& parameters,
+        const TSessionParameters& sessionParameters,
+        THolder<IStreamRecordConverter> recordConverter)
+    {
+        if (!recordConverter) {
+            recordConverter = MakeDefaultStreamRecordConverter();
+        }
+        return MakeHolder<TAgentOutputStream>(parameters, sessionParameters, std::move(recordConverter));
+    }
+}

+ 18 - 0
library/cpp/unified_agent_client/stream.h

@@ -0,0 +1,18 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/client.h>
+
+namespace NUnifiedAgent {
+    class IStreamRecordConverter {
+    public:
+        virtual ~IStreamRecordConverter() = default;
+
+        virtual TClientMessage Convert(const void* buf, size_t len) const = 0;
+    };
+
+    THolder<IStreamRecordConverter> MakeDefaultStreamRecordConverter(bool stripTrailingNewLine = true);
+
+    THolder<IOutputStream> MakeOutputStream(const TClientParameters& parameters,
+        const TSessionParameters& sessionParameters = {},
+        THolder<IStreamRecordConverter> recordConverter = {});
+}

+ 1 - 0
library/cpp/unified_agent_client/ya.make

@@ -16,6 +16,7 @@ SRCS(
     clock.cpp
     duration_counter.cpp
     logger.cpp
+    stream.cpp
     throttling.cpp
     proto_weighing.cpp
     GLOBAL registrar.cpp