buffered.h 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. #pragma once
  2. #include "zerocopy.h"
  3. #include "zerocopy_output.h"
  4. #include <utility>
  5. #include <util/generic/ptr.h>
  6. #include <util/generic/typetraits.h>
  7. #include <util/generic/store_policy.h>
  8. /**
  9. * @addtogroup Streams_Buffered
  10. * @{
  11. */
  12. /**
  13. * Input stream that wraps the given stream and adds a buffer on top of it,
  14. * thus making sure that data is read from the underlying stream in big chunks.
  15. *
  16. * Note that it does not claim ownership of the underlying stream, so it's up
  17. * to the user to free it.
  18. */
  19. class TBufferedInput: public IZeroCopyInput {
  20. public:
  21. TBufferedInput(IInputStream* slave, size_t buflen = 8192);
  22. TBufferedInput(TBufferedInput&&) noexcept;
  23. TBufferedInput& operator=(TBufferedInput&&) noexcept;
  24. ~TBufferedInput() override;
  25. /**
  26. * Switches the underlying stream to the one provided. Does not clear the
  27. * data that was already buffered.
  28. *
  29. * @param slave New underlying stream.
  30. */
  31. void Reset(IInputStream* slave);
  32. protected:
  33. size_t DoRead(void* buf, size_t len) override;
  34. size_t DoReadTo(TString& st, char ch) override;
  35. size_t DoSkip(size_t len) override;
  36. size_t DoNext(const void** ptr, size_t len) override;
  37. private:
  38. class TImpl;
  39. THolder<TImpl> Impl_;
  40. };
  41. /**
  42. * Output stream that wraps the given stream and adds a buffer on top of it,
  43. * thus making sure that data is written to the underlying stream in big chunks.
  44. *
  45. * Note that by default this stream does not propagate `Flush` and `Finish`
  46. * calls to the underlying stream, instead simply flushing out the buffer.
  47. * You can change this behavior by using propagation mode setters.
  48. *
  49. * Also note that this stream does not claim ownership of the underlying stream,
  50. * so it's up to the user to free it.
  51. */
  52. class TBufferedOutputBase: public IZeroCopyOutput {
  53. public:
  54. /**
  55. * Constructs a buffered stream that dynamically adjusts the size of the
  56. * buffer. This works best when the amount of data that will be passed
  57. * through this stream is not known and can range in size from several
  58. * kilobytes to several gigabytes.
  59. *
  60. * @param slave Underlying stream.
  61. */
  62. TBufferedOutputBase(IOutputStream* slave);
  63. /**
  64. * Constructs a buffered stream with the given size of the buffer.
  65. *
  66. * @param slave Underlying stream.
  67. * @param buflen Size of the buffer.
  68. */
  69. TBufferedOutputBase(IOutputStream* slave, size_t buflen);
  70. TBufferedOutputBase(TBufferedOutputBase&&) noexcept;
  71. TBufferedOutputBase& operator=(TBufferedOutputBase&&) noexcept;
  72. ~TBufferedOutputBase() override;
  73. /**
  74. * @param propagate Whether `Flush` and `Finish` calls should
  75. * be propagated to the underlying stream.
  76. * By default they are not.
  77. */
  78. inline void SetPropagateMode(bool propagate) noexcept {
  79. SetFlushPropagateMode(propagate);
  80. SetFinishPropagateMode(propagate);
  81. }
  82. /**
  83. * @param propagate Whether `Flush` calls should be propagated
  84. * to the underlying stream. By default they
  85. * are not.
  86. */
  87. void SetFlushPropagateMode(bool propagate) noexcept;
  88. /**
  89. * @param propagate Whether `Finish` calls should be propagated
  90. * to the underlying stream. By default they
  91. * are not.
  92. */
  93. void SetFinishPropagateMode(bool propagate) noexcept;
  94. class TImpl;
  95. protected:
  96. size_t DoNext(void** ptr) override;
  97. void DoUndo(size_t len) override;
  98. void DoWrite(const void* data, size_t len) override;
  99. void DoWriteC(char c) override;
  100. void DoFlush() override;
  101. void DoFinish() override;
  102. private:
  103. THolder<TImpl> Impl_;
  104. };
  105. /**
  106. * Buffered output stream with a fixed-size buffer.
  107. *
  108. * @see TBufferedOutputBase
  109. */
  110. class TBufferedOutput: public TBufferedOutputBase {
  111. public:
  112. TBufferedOutput(IOutputStream* slave, size_t buflen = 8192);
  113. ~TBufferedOutput() override;
  114. TBufferedOutput(TBufferedOutput&&) noexcept = default;
  115. TBufferedOutput& operator=(TBufferedOutput&&) noexcept = default;
  116. };
  117. /**
  118. * Buffered output stream that dynamically adjusts the size of the buffer based
  119. * on the amount of data that's passed through it.
  120. *
  121. * @see TBufferedOutputBase
  122. */
  123. class TAdaptiveBufferedOutput: public TBufferedOutputBase {
  124. public:
  125. TAdaptiveBufferedOutput(IOutputStream* slave);
  126. ~TAdaptiveBufferedOutput() override;
  127. TAdaptiveBufferedOutput(TAdaptiveBufferedOutput&&) noexcept = default;
  128. TAdaptiveBufferedOutput& operator=(TAdaptiveBufferedOutput&&) noexcept = default;
  129. };
  130. namespace NPrivate {
  131. struct TMyBufferedOutput: public TBufferedOutput {
  132. inline TMyBufferedOutput(IOutputStream* slave, size_t buflen)
  133. : TBufferedOutput(slave, buflen)
  134. {
  135. SetFinishPropagateMode(true);
  136. }
  137. };
  138. template <class T>
  139. struct TBufferedStreamFor {
  140. using TResult = std::conditional_t<std::is_base_of<IInputStream, T>::value, TBufferedInput, TMyBufferedOutput>;
  141. };
  142. }
  143. /**
  144. * A mixin class that turns unbuffered stream into a buffered one.
  145. *
  146. * Note that using this mixin with a stream that is already buffered won't
  147. * result in double buffering, e.g. `TBuffered<TBuffered<TUnbufferedFileInput>>` and
  148. * `TBuffered<TUnbufferedFileInput>` are basically the same types.
  149. *
  150. * Example usage:
  151. * @code
  152. * TBuffered<TUnbufferedFileInput> file_input(1024, "/path/to/file");
  153. * TBuffered<TUnbufferedFileOutput> file_output(1024, "/path/to/file");
  154. * @endcode
  155. * Here 1024 is the size of the buffer.
  156. */
  157. template <class TSlave>
  158. class TBuffered: private TEmbedPolicy<TSlave>, public ::NPrivate::TBufferedStreamFor<TSlave>::TResult {
  159. using TSlaveBase = TEmbedPolicy<TSlave>;
  160. using TBufferedBase = typename ::NPrivate::TBufferedStreamFor<TSlave>::TResult;
  161. public:
  162. template <typename... Args>
  163. inline TBuffered(size_t b, Args&&... args)
  164. : TSlaveBase(std::forward<Args>(args)...)
  165. , TBufferedBase(TSlaveBase::Ptr(), b)
  166. {
  167. }
  168. inline TSlave& Slave() noexcept {
  169. return *this->Ptr();
  170. }
  171. TBuffered(const TBuffered&) = delete;
  172. TBuffered& operator=(const TBuffered&) = delete;
  173. TBuffered(TBuffered&&) = delete;
  174. TBuffered& operator=(TBuffered&&) = delete;
  175. };
  176. /**
  177. * A mixin class that turns unbuffered stream into an adaptively buffered one.
  178. * Created stream differs from the one created via `TBuffered` template in that
  179. * it dynamically adjusts the size of the buffer based on the amount of data
  180. * that's passed through it.
  181. *
  182. * Example usage:
  183. * @code
  184. * TAdaptivelyBuffered<TUnbufferedFileOutput> file_output("/path/to/file");
  185. * @endcode
  186. */
  187. template <class TSlave>
  188. class TAdaptivelyBuffered: private TEmbedPolicy<TSlave>, public TAdaptiveBufferedOutput {
  189. using TSlaveBase = TEmbedPolicy<TSlave>;
  190. public:
  191. template <typename... Args>
  192. inline TAdaptivelyBuffered(Args&&... args)
  193. : TSlaveBase(std::forward<Args>(args)...)
  194. , TAdaptiveBufferedOutput(TSlaveBase::Ptr())
  195. {
  196. }
  197. TAdaptivelyBuffered(const TAdaptivelyBuffered&) = delete;
  198. TAdaptivelyBuffered& operator=(const TAdaptivelyBuffered&) = delete;
  199. TAdaptivelyBuffered(TAdaptivelyBuffered&& other) = delete;
  200. TAdaptivelyBuffered& operator=(TAdaptivelyBuffered&& other) = delete;
  201. };
  202. /** @} */