stream.cpp 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. #include "stream.h"
  2. namespace NUnifiedAgent {
  3. namespace {
  4. class TDefaultStreamRecordConverter : public IStreamRecordConverter {
  5. public:
  6. TDefaultStreamRecordConverter(bool stripTrailingNewLine)
  7. : StripTrailingNewLine(stripTrailingNewLine)
  8. {
  9. }
  10. TClientMessage Convert(const void* buf, size_t len) const override {
  11. TStringBuf str(static_cast<const char*>(buf), len);
  12. if (StripTrailingNewLine) {
  13. str.ChopSuffix("\n");
  14. }
  15. return {
  16. TString(str),
  17. {}
  18. };
  19. }
  20. private:
  21. const bool StripTrailingNewLine;
  22. };
  23. class TClientSessionAdapter: public IOutputStream {
  24. public:
  25. explicit TClientSessionAdapter(const TClientSessionPtr& session, THolder<IStreamRecordConverter> recordConverter)
  26. : Session(session)
  27. , RecordConverter(std::move(recordConverter))
  28. {
  29. }
  30. void DoWrite(const void* buf, size_t len) override {
  31. Session->Send(RecordConverter->Convert(buf, len));
  32. }
  33. void DoFlush() override {
  34. }
  35. private:
  36. TClientSessionPtr Session;
  37. THolder<IStreamRecordConverter> RecordConverter;
  38. };
  39. class TSessionHolder {
  40. protected:
  41. TSessionHolder(const TClientParameters& parameters, const TSessionParameters& sessionParameters)
  42. : Client(MakeClient(parameters))
  43. , Session(Client->CreateSession(sessionParameters))
  44. {
  45. }
  46. protected:
  47. TClientPtr Client;
  48. TClientSessionPtr Session;
  49. };
  50. class TAgentOutputStream: private TSessionHolder, public TClientSessionAdapter {
  51. public:
  52. TAgentOutputStream(const TClientParameters& parameters,
  53. const TSessionParameters& sessionParameters,
  54. THolder<IStreamRecordConverter> recordConverter)
  55. : TSessionHolder(parameters, sessionParameters)
  56. , TClientSessionAdapter(TSessionHolder::Session, std::move(recordConverter))
  57. {
  58. }
  59. ~TAgentOutputStream() override {
  60. TSessionHolder::Session->Close();
  61. }
  62. };
  63. }
  64. THolder<IStreamRecordConverter> MakeDefaultStreamRecordConverter(bool stripTrailingNewLine) {
  65. return MakeHolder<TDefaultStreamRecordConverter>(stripTrailingNewLine);
  66. }
  67. THolder<IOutputStream> MakeOutputStream(const TClientParameters& parameters,
  68. const TSessionParameters& sessionParameters,
  69. THolder<IStreamRecordConverter> recordConverter)
  70. {
  71. if (!recordConverter) {
  72. recordConverter = MakeDefaultStreamRecordConverter();
  73. }
  74. return MakeHolder<TAgentOutputStream>(parameters, sessionParameters, std::move(recordConverter));
  75. }
  76. }