1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- #include "stream.h"
- namespace NUnifiedAgent {
- namespace {
- class TDefaultStreamRecordConverter : public IStreamRecordConverter {
- public:
- TDefaultStreamRecordConverter(bool stripTrailingNewLine)
- : StripTrailingNewLine(stripTrailingNewLine)
- {
- }
- TClientMessage Convert(const void* buf, size_t len) const override {
- TStringBuf str(static_cast<const char*>(buf), len);
- if (StripTrailingNewLine) {
- str.ChopSuffix("\n");
- }
- return {
- TString(str),
- {}
- };
- }
- private:
- const bool StripTrailingNewLine;
- };
- class TClientSessionAdapter: public IOutputStream {
- public:
- explicit TClientSessionAdapter(const TClientSessionPtr& session, THolder<IStreamRecordConverter> recordConverter)
- : Session(session)
- , RecordConverter(std::move(recordConverter))
- {
- }
- void DoWrite(const void* buf, size_t len) override {
- Session->Send(RecordConverter->Convert(buf, len));
- }
- void DoFlush() override {
- }
- private:
- TClientSessionPtr Session;
- THolder<IStreamRecordConverter> RecordConverter;
- };
- class TSessionHolder {
- protected:
- TSessionHolder(const TClientParameters& parameters, const TSessionParameters& sessionParameters)
- : Client(MakeClient(parameters))
- , Session(Client->CreateSession(sessionParameters))
- {
- }
- protected:
- TClientPtr Client;
- TClientSessionPtr Session;
- };
- class TAgentOutputStream: private TSessionHolder, public TClientSessionAdapter {
- public:
- TAgentOutputStream(const TClientParameters& parameters,
- const TSessionParameters& sessionParameters,
- THolder<IStreamRecordConverter> recordConverter)
- : TSessionHolder(parameters, sessionParameters)
- , TClientSessionAdapter(TSessionHolder::Session, std::move(recordConverter))
- {
- }
- ~TAgentOutputStream() override {
- TSessionHolder::Session->Close();
- }
- };
- }
- THolder<IStreamRecordConverter> MakeDefaultStreamRecordConverter(bool stripTrailingNewLine) {
- return MakeHolder<TDefaultStreamRecordConverter>(stripTrailingNewLine);
- }
- THolder<IOutputStream> MakeOutputStream(const TClientParameters& parameters,
- const TSessionParameters& sessionParameters,
- THolder<IStreamRecordConverter> recordConverter)
- {
- if (!recordConverter) {
- recordConverter = MakeDefaultStreamRecordConverter();
- }
- return MakeHolder<TAgentOutputStream>(parameters, sessionParameters, std::move(recordConverter));
- }
- }
|