buffered.cpp 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. #include "mem.h"
  2. #include "buffered.h"
  3. #include <util/memory/addstorage.h>
  4. #include <util/generic/yexception.h>
  5. #include <util/generic/buffer.h>
  6. class TBufferedInput::TImpl: public TAdditionalStorage<TImpl> {
  7. public:
  8. inline TImpl(IInputStream* slave)
  9. : Slave_(slave)
  10. , MemInput_(nullptr, 0)
  11. {
  12. }
  13. inline ~TImpl() = default;
  14. inline size_t Next(const void** ptr, size_t len) {
  15. if (MemInput_.Exhausted()) {
  16. MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen()));
  17. }
  18. return MemInput_.Next(ptr, len);
  19. }
  20. inline size_t Read(void* buf, size_t len) {
  21. if (MemInput_.Exhausted()) {
  22. if (len > BufLen() / 2) {
  23. return Slave_->Read(buf, len);
  24. }
  25. MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen()));
  26. }
  27. return MemInput_.Read(buf, len);
  28. }
  29. inline size_t Skip(size_t len) {
  30. size_t totalSkipped = 0;
  31. while (len) {
  32. const size_t skipped = DoSkip(len);
  33. if (skipped == 0) {
  34. break;
  35. }
  36. totalSkipped += skipped;
  37. len -= skipped;
  38. }
  39. return totalSkipped;
  40. }
  41. inline size_t DoSkip(size_t len) {
  42. if (MemInput_.Exhausted()) {
  43. if (len > BufLen() / 2) {
  44. return Slave_->Skip(len);
  45. }
  46. MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen()));
  47. }
  48. return MemInput_.Skip(len);
  49. }
  50. inline size_t ReadTo(TString& st, char to) {
  51. st.clear();
  52. TString s_tmp;
  53. size_t ret = 0;
  54. while (true) {
  55. if (MemInput_.Exhausted()) {
  56. const size_t bytesRead = Slave_->Read(Buf(), BufLen());
  57. if (!bytesRead) {
  58. break;
  59. }
  60. MemInput_.Reset(Buf(), bytesRead);
  61. }
  62. const size_t a_len(MemInput_.Avail());
  63. size_t s_len = 0;
  64. if (st.empty()) {
  65. ret += MemInput_.ReadTo(st, to);
  66. s_len = st.length();
  67. } else {
  68. ret += MemInput_.ReadTo(s_tmp, to);
  69. s_len = s_tmp.length();
  70. st.append(s_tmp);
  71. }
  72. if (s_len != a_len) {
  73. break;
  74. }
  75. }
  76. return ret;
  77. }
  78. inline void Reset(IInputStream* slave) {
  79. Slave_ = slave;
  80. }
  81. private:
  82. inline size_t BufLen() const noexcept {
  83. return AdditionalDataLength();
  84. }
  85. inline void* Buf() const noexcept {
  86. return AdditionalData();
  87. }
  88. private:
  89. IInputStream* Slave_;
  90. TMemoryInput MemInput_;
  91. };
  92. TBufferedInput::TBufferedInput(IInputStream* slave, size_t buflen)
  93. : Impl_(new (buflen) TImpl(slave))
  94. {
  95. }
  96. TBufferedInput::TBufferedInput(TBufferedInput&&) noexcept = default;
  97. TBufferedInput& TBufferedInput::operator=(TBufferedInput&&) noexcept = default;
  98. TBufferedInput::~TBufferedInput() = default;
  99. size_t TBufferedInput::DoRead(void* buf, size_t len) {
  100. return Impl_->Read(buf, len);
  101. }
  102. size_t TBufferedInput::DoSkip(size_t len) {
  103. return Impl_->Skip(len);
  104. }
  105. size_t TBufferedInput::DoNext(const void** ptr, size_t len) {
  106. return Impl_->Next(ptr, len);
  107. }
  108. size_t TBufferedInput::DoReadTo(TString& st, char ch) {
  109. return Impl_->ReadTo(st, ch);
  110. }
  111. void TBufferedInput::Reset(IInputStream* slave) {
  112. Impl_->Reset(slave);
  113. }
  114. class TBufferedOutputBase::TImpl {
  115. public:
  116. inline TImpl(IOutputStream* slave)
  117. : Slave_(slave)
  118. , MemOut_(nullptr, 0)
  119. , PropagateFlush_(false)
  120. , PropagateFinish_(false)
  121. {
  122. }
  123. virtual ~TImpl() = default;
  124. inline void Reset() {
  125. MemOut_.Reset(Buf(), Len());
  126. }
  127. inline size_t Next(void** ptr) {
  128. if (MemOut_.Avail() == 0) {
  129. Slave_->Write(Buf(), Stored());
  130. OnBufferExhausted();
  131. Reset();
  132. }
  133. return MemOut_.Next(ptr);
  134. }
  135. inline void Undo(size_t len) {
  136. Y_ABORT_UNLESS(len <= Stored(), "trying to undo more bytes than actually written");
  137. MemOut_.Undo(len);
  138. }
  139. inline void Write(const void* buf, size_t len) {
  140. if (len <= MemOut_.Avail()) {
  141. /*
  142. * fast path
  143. */
  144. MemOut_.Write(buf, len);
  145. } else {
  146. const size_t stored = Stored();
  147. const size_t full_len = stored + len;
  148. const size_t good_len = DownToBufferGranularity(full_len);
  149. const size_t write_from_buf = good_len - stored;
  150. using TPart = IOutputStream::TPart;
  151. alignas(TPart) char data[2 * sizeof(TPart)];
  152. TPart* parts = reinterpret_cast<TPart*>(data);
  153. TPart* end = parts;
  154. if (stored) {
  155. new (end++) TPart(Buf(), stored);
  156. }
  157. if (write_from_buf) {
  158. new (end++) TPart(buf, write_from_buf);
  159. }
  160. Slave_->Write(parts, end - parts);
  161. // grow buffer only on full flushes
  162. OnBufferExhausted();
  163. Reset();
  164. if (write_from_buf < len) {
  165. MemOut_.Write((const char*)buf + write_from_buf, len - write_from_buf);
  166. }
  167. }
  168. }
  169. inline void Write(char c) {
  170. if (Y_UNLIKELY(MemOut_.Avail() == 0)) {
  171. Slave_->Write(Buf(), Stored());
  172. OnBufferExhausted();
  173. Reset();
  174. }
  175. MemOut_.Write(c);
  176. }
  177. inline void SetFlushPropagateMode(bool mode) noexcept {
  178. PropagateFlush_ = mode;
  179. }
  180. inline void SetFinishPropagateMode(bool mode) noexcept {
  181. PropagateFinish_ = mode;
  182. }
  183. inline void Flush() {
  184. {
  185. Slave_->Write(Buf(), Stored());
  186. Reset();
  187. }
  188. if (PropagateFlush_) {
  189. Slave_->Flush();
  190. }
  191. }
  192. inline void Finish() {
  193. try {
  194. Flush();
  195. } catch (...) {
  196. try {
  197. DoFinish();
  198. } catch (...) {
  199. // ¯\_(ツ)_/¯
  200. }
  201. throw;
  202. }
  203. DoFinish();
  204. }
  205. private:
  206. inline void DoFinish() {
  207. if (PropagateFinish_) {
  208. Slave_->Finish();
  209. }
  210. }
  211. inline size_t Stored() const noexcept {
  212. return Len() - MemOut_.Avail();
  213. }
  214. inline size_t DownToBufferGranularity(size_t l) const noexcept {
  215. return l - (l % Len());
  216. }
  217. virtual void OnBufferExhausted() = 0;
  218. virtual void* Buf() const noexcept = 0;
  219. virtual size_t Len() const noexcept = 0;
  220. private:
  221. IOutputStream* Slave_;
  222. TMemoryOutput MemOut_;
  223. bool PropagateFlush_;
  224. bool PropagateFinish_;
  225. };
  226. namespace {
  227. struct TSimpleImpl: public TBufferedOutputBase::TImpl, public TAdditionalStorage<TSimpleImpl> {
  228. inline TSimpleImpl(IOutputStream* slave)
  229. : TBufferedOutputBase::TImpl(slave)
  230. {
  231. Reset();
  232. }
  233. ~TSimpleImpl() override = default;
  234. void OnBufferExhausted() final {
  235. }
  236. void* Buf() const noexcept override {
  237. return AdditionalData();
  238. }
  239. size_t Len() const noexcept override {
  240. return AdditionalDataLength();
  241. }
  242. };
  243. struct TAdaptiveImpl: public TBufferedOutputBase::TImpl {
  244. enum {
  245. Step = 4096
  246. };
  247. inline TAdaptiveImpl(IOutputStream* slave)
  248. : TBufferedOutputBase::TImpl(slave)
  249. , N_(0)
  250. {
  251. B_.Reserve(Step);
  252. Reset();
  253. }
  254. ~TAdaptiveImpl() override = default;
  255. void OnBufferExhausted() final {
  256. const size_t c = ((size_t)Step) << Min<size_t>(++N_ / 32, 10);
  257. if (c > B_.Capacity()) {
  258. TBuffer(c).Swap(B_);
  259. }
  260. }
  261. void* Buf() const noexcept override {
  262. return (void*)B_.Data();
  263. }
  264. size_t Len() const noexcept override {
  265. return B_.Capacity();
  266. }
  267. TBuffer B_;
  268. ui64 N_;
  269. };
  270. } // namespace
  271. TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave)
  272. : Impl_(new TAdaptiveImpl(slave))
  273. {
  274. }
  275. TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave, size_t buflen)
  276. : Impl_(new (buflen) TSimpleImpl(slave))
  277. {
  278. }
  279. TBufferedOutputBase::TBufferedOutputBase(TBufferedOutputBase&&) noexcept = default;
  280. TBufferedOutputBase& TBufferedOutputBase::operator=(TBufferedOutputBase&&) noexcept = default;
  281. TBufferedOutputBase::~TBufferedOutputBase() {
  282. try {
  283. Finish();
  284. } catch (...) {
  285. // ¯\_(ツ)_/¯
  286. }
  287. }
  288. size_t TBufferedOutputBase::DoNext(void** ptr) {
  289. Y_ENSURE(Impl_.Get(), "cannot call next in finished stream");
  290. return Impl_->Next(ptr);
  291. }
  292. void TBufferedOutputBase::DoUndo(size_t len) {
  293. Y_ENSURE(Impl_.Get(), "cannot call undo in finished stream");
  294. Impl_->Undo(len);
  295. }
  296. void TBufferedOutputBase::DoWrite(const void* data, size_t len) {
  297. Y_ENSURE(Impl_.Get(), "cannot write to finished stream");
  298. Impl_->Write(data, len);
  299. }
  300. void TBufferedOutputBase::DoWriteC(char c) {
  301. Y_ENSURE(Impl_.Get(), "cannot write to finished stream");
  302. Impl_->Write(c);
  303. }
  304. void TBufferedOutputBase::DoFlush() {
  305. if (Impl_.Get()) {
  306. Impl_->Flush();
  307. }
  308. }
  309. void TBufferedOutputBase::DoFinish() {
  310. THolder<TImpl> impl(Impl_.Release());
  311. if (impl) {
  312. impl->Finish();
  313. }
  314. }
  315. void TBufferedOutputBase::SetFlushPropagateMode(bool propagate) noexcept {
  316. if (Impl_.Get()) {
  317. Impl_->SetFlushPropagateMode(propagate);
  318. }
  319. }
  320. void TBufferedOutputBase::SetFinishPropagateMode(bool propagate) noexcept {
  321. if (Impl_.Get()) {
  322. Impl_->SetFinishPropagateMode(propagate);
  323. }
  324. }
  325. TBufferedOutput::TBufferedOutput(IOutputStream* slave, size_t buflen)
  326. : TBufferedOutputBase(slave, buflen)
  327. {
  328. }
  329. TBufferedOutput::~TBufferedOutput() = default;
  330. TAdaptiveBufferedOutput::TAdaptiveBufferedOutput(IOutputStream* slave)
  331. : TBufferedOutputBase(slave)
  332. {
  333. }
  334. TAdaptiveBufferedOutput::~TAdaptiveBufferedOutput() = default;