Stream.hh 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * https://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef avro_Stream_hh__
  19. #define avro_Stream_hh__
  #include <cstdint>
  #include <cstring>
  #include <iosfwd>
  #include <memory>
  #include <vector>

  #include "boost/utility.hpp"

  #include "Config.hh"
  #include "Exception.hh"
  26. namespace avro {
  27. /**
  28. * A no-copy input stream.
  29. */
  30. class AVRO_DECL InputStream : boost::noncopyable {
  31. protected:
  32. /**
  33. * An empty constructor.
  34. */
  35. InputStream() = default;
  36. public:
  37. /**
  38. * Destructor.
  39. */
  40. virtual ~InputStream() = default;
  41. /**
  42. * Returns some of available data.
  43. *
  44. * Returns true if some data is available, false if no more data is
  45. * available or an error has occurred.
  46. */
  47. virtual bool next(const uint8_t **data, size_t *len) = 0;
  48. /**
  49. * "Returns" back some of the data to the stream. The returned
  50. * data must be less than what was obtained in the last call to
  51. * next().
  52. */
  53. virtual void backup(size_t len) = 0;
  54. /**
  55. * Skips number of bytes specified by len.
  56. */
  57. virtual void skip(size_t len) = 0;
  58. /**
  59. * Returns the number of bytes read from this stream so far.
  60. * All the bytes made available through next are considered
  61. * to be used unless, returned back using backup.
  62. */
  63. virtual size_t byteCount() const = 0;
  64. };
  65. typedef std::unique_ptr<InputStream> InputStreamPtr;
  66. /**
  67. * An InputStream which also supports seeking to a specific offset.
  68. */
  69. class AVRO_DECL SeekableInputStream : public InputStream {
  70. protected:
  71. /**
  72. * An empty constructor.
  73. */
  74. SeekableInputStream() = default;
  75. public:
  76. /**
  77. * Destructor.
  78. */
  79. ~SeekableInputStream() override = default;
  80. /**
  81. * Seek to a specific position in the stream. This may invalidate pointers
  82. * returned from next(). This will also reset byteCount() to the given
  83. * position.
  84. */
  85. virtual void seek(int64_t position) = 0;
  86. };
  87. typedef std::unique_ptr<SeekableInputStream> SeekableInputStreamPtr;
  88. /**
  89. * A no-copy output stream.
  90. */
  91. class AVRO_DECL OutputStream : boost::noncopyable {
  92. protected:
  93. /**
  94. * An empty constructor.
  95. */
  96. OutputStream() = default;
  97. public:
  98. /**
  99. * Destructor.
  100. */
  101. virtual ~OutputStream() = default;
  102. /**
  103. * Returns a buffer that can be written into.
  104. * On successful return, data has the pointer to the buffer
  105. * and len has the number of bytes available at data.
  106. */
  107. virtual bool next(uint8_t **data, size_t *len) = 0;
  108. /**
  109. * "Returns" back to the stream some of the buffer obtained
  110. * from in the last call to next().
  111. */
  112. virtual void backup(size_t len) = 0;
  113. /**
  114. * Number of bytes written so far into this stream. The whole buffer
  115. * returned by next() is assumed to be written unless some of
  116. * it was returned using backup().
  117. */
  118. virtual uint64_t byteCount() const = 0;
  119. /**
  120. * Flushes any data remaining in the buffer to the stream's underlying
  121. * store, if any.
  122. */
  123. virtual void flush() = 0;
  124. };
  125. typedef std::unique_ptr<OutputStream> OutputStreamPtr;
  126. /**
  127. * Returns a new OutputStream, which grows in memory chunks of specified size.
  128. */
  129. AVRO_DECL OutputStreamPtr memoryOutputStream(size_t chunkSize = 4 * 1024);
  130. /**
  131. * Returns a new InputStream, with the data from the given byte array.
  132. * It does not copy the data, the byte array should remain valid
  133. * until the InputStream is used.
  134. */
  135. AVRO_DECL InputStreamPtr memoryInputStream(const uint8_t *data, size_t len);
  136. /**
  137. * Returns a new InputStream with the contents written into an
  138. * OutputStream. The output stream must have been returned by
  139. * an earlier call to memoryOutputStream(). The contents for the new
  140. * InputStream are the snapshot of the output stream. One can construct
  141. * any number of memory input stream from a single memory output stream.
  142. */
  143. AVRO_DECL InputStreamPtr memoryInputStream(const OutputStream &source);
  144. /**
  145. * Returns the contents written so far into the output stream, which should
  146. * be a memory output stream. That is it must have been returned by a previous
  147. * call to memoryOutputStream().
  148. */
  149. AVRO_DECL std::shared_ptr<std::vector<uint8_t>> snapshot(const OutputStream &source);
  150. /**
  151. * Returns a new OutputStream whose contents would be stored in a file.
  152. * Data is written in chunks of given buffer size.
  153. *
  154. * If there is a file with the given name, it is truncated and overwritten.
  155. * If there is no file with the given name, it is created.
  156. */
  157. AVRO_DECL OutputStreamPtr fileOutputStream(const char *filename,
  158. size_t bufferSize = 8 * 1024);
  159. /**
  160. * Returns a new InputStream whose contents come from the given file.
  161. * Data is read in chunks of given buffer size.
  162. */
  163. AVRO_DECL InputStreamPtr fileInputStream(
  164. const char *filename, size_t bufferSize = 8 * 1024);
  165. AVRO_DECL SeekableInputStreamPtr fileSeekableInputStream(
  166. const char *filename, size_t bufferSize = 8 * 1024);
  167. /**
  168. * Returns a new OutputStream whose contents will be sent to the given
  169. * std::ostream. The std::ostream object should outlive the returned
  170. * OutputStream.
  171. */
  172. AVRO_DECL OutputStreamPtr ostreamOutputStream(std::ostream &os,
  173. size_t bufferSize = 8 * 1024);
  174. /**
  175. * Returns a new InputStream whose contents come from the given
  176. * std::istream. The std::istream object should outlive the returned
  177. * InputStream.
  178. */
  179. AVRO_DECL InputStreamPtr istreamInputStream(
  180. std::istream &in, size_t bufferSize = 8 * 1024);
  181. /**
  182. * Returns a new InputStream whose contents come from the given
  183. * std::istream. Use this instead of istreamInputStream if
  184. * the istream does not support seekg (e.g. compressed streams).
  185. * The returned InputStream would read off bytes instead of seeking.
  186. * Of, course it has a performance penalty when reading instead of seeking;
  187. * So, use this only when seekg does not work.
  188. * The std::istream object should outlive the returned
  189. * InputStream.
  190. */
  191. AVRO_DECL InputStreamPtr nonSeekableIstreamInputStream(
  192. std::istream &is, size_t bufferSize = 8 * 1024);
  193. /** A convenience class for reading from an InputStream */
  194. struct StreamReader {
  195. /**
  196. * The underlying input stream.
  197. */
  198. InputStream *in_;
  199. /**
  200. * The next location to read from.
  201. */
  202. const uint8_t *next_;
  203. /**
  204. * One past the last valid location.
  205. */
  206. const uint8_t *end_;
  207. /**
  208. * Constructs an empty reader.
  209. */
  210. StreamReader() : in_(nullptr), next_(nullptr), end_(nullptr) {}
  211. /**
  212. * Constructs a reader with the given underlying stream.
  213. */
  214. explicit StreamReader(InputStream &in) : in_(nullptr), next_(nullptr), end_(nullptr) { reset(in); }
  215. /**
  216. * Replaces the current input stream with the given one after backing up
  217. * the original one if required.
  218. */
  219. void reset(InputStream &is) {
  220. if (in_ != nullptr && end_ != next_) {
  221. in_->backup(end_ - next_);
  222. }
  223. in_ = &is;
  224. next_ = end_ = nullptr;
  225. }
  226. /**
  227. * Read just one byte from the underlying stream. If there are no
  228. * more data, throws an exception.
  229. */
  230. uint8_t read() {
  231. if (next_ == end_) {
  232. more();
  233. }
  234. return *next_++;
  235. }
  236. /**
  237. * Reads the given number of bytes from the underlying stream.
  238. * If there are not that many bytes, throws an exception.
  239. */
  240. void readBytes(uint8_t *b, size_t n) {
  241. while (n > 0) {
  242. if (next_ == end_) {
  243. more();
  244. }
  245. size_t q = end_ - next_;
  246. if (q > n) {
  247. q = n;
  248. }
  249. ::memcpy(b, next_, q);
  250. next_ += q;
  251. b += q;
  252. n -= q;
  253. }
  254. }
  255. /**
  256. * Skips the given number of bytes. Of there are not so that many
  257. * bytes, throws an exception.
  258. */
  259. void skipBytes(size_t n) {
  260. if (n > static_cast<size_t>(end_ - next_)) {
  261. n -= end_ - next_;
  262. next_ = end_;
  263. in_->skip(n);
  264. } else {
  265. next_ += n;
  266. }
  267. }
  268. /**
  269. * Get as many byes from the underlying stream as possible in a single
  270. * chunk.
  271. * \return true if some data could be obtained. False is no more
  272. * data is available on the stream.
  273. */
  274. bool fill() {
  275. size_t n = 0;
  276. while (in_->next(&next_, &n)) {
  277. if (n != 0) {
  278. end_ = next_ + n;
  279. return true;
  280. }
  281. }
  282. return false;
  283. }
  284. /**
  285. * Tries to get more data and if it cannot, throws an exception.
  286. */
  287. void more() {
  288. if (!fill()) {
  289. throw Exception("EOF reached");
  290. }
  291. }
  292. /**
  293. * Returns true if and only if the end of stream is not reached.
  294. */
  295. bool hasMore() {
  296. return next_ != end_ || fill();
  297. }
  298. /**
  299. * Returns unused bytes back to the underlying stream.
  300. * If unRead is true the last byte read is also pushed back.
  301. */
  302. void drain(bool unRead) {
  303. if (unRead) {
  304. --next_;
  305. }
  306. in_->backup(end_ - next_);
  307. end_ = next_;
  308. }
  309. };
  310. /**
  311. * A convenience class to write data into an OutputStream.
  312. */
  313. struct StreamWriter {
  314. /**
  315. * The underlying output stream for this writer.
  316. */
  317. OutputStream *out_;
  318. /**
  319. * The next location to write to.
  320. */
  321. uint8_t *next_;
  322. /**
  323. * One past the last location one can write to.
  324. */
  325. uint8_t *end_;
  326. /**
  327. * Constructs a writer with no underlying stream.
  328. */
  329. StreamWriter() : out_(nullptr), next_(nullptr), end_(nullptr) {}
  330. /**
  331. * Constructs a new writer with the given underlying stream.
  332. */
  333. explicit StreamWriter(OutputStream &out) : out_(nullptr), next_(nullptr), end_(nullptr) { reset(out); }
  334. /**
  335. * Replaces the current underlying stream with a new one.
  336. * If required, it backs up unused bytes in the previous stream.
  337. */
  338. void reset(OutputStream &os) {
  339. if (out_ != nullptr && end_ != next_) {
  340. out_->backup(end_ - next_);
  341. }
  342. out_ = &os;
  343. next_ = end_;
  344. }
  345. /**
  346. * Writes a single byte.
  347. */
  348. void write(uint8_t c) {
  349. if (next_ == end_) {
  350. more();
  351. }
  352. *next_++ = c;
  353. }
  354. /**
  355. * Writes the specified number of bytes starting at \p b.
  356. */
  357. void writeBytes(const uint8_t *b, size_t n) {
  358. while (n > 0) {
  359. if (next_ == end_) {
  360. more();
  361. }
  362. size_t q = end_ - next_;
  363. if (q > n) {
  364. q = n;
  365. }
  366. ::memcpy(next_, b, q);
  367. next_ += q;
  368. b += q;
  369. n -= q;
  370. }
  371. }
  372. /**
  373. * backs up upto the currently written data and flushes the
  374. * underlying stream.
  375. */
  376. void flush() {
  377. if (next_ != end_) {
  378. out_->backup(end_ - next_);
  379. next_ = end_;
  380. }
  381. out_->flush();
  382. }
  383. /**
  384. * Return the number of bytes written so far. For a meaningful
  385. * result, call this after a flush().
  386. */
  387. int64_t byteCount() const {
  388. return out_->byteCount();
  389. }
  390. /**
  391. * Gets more space to write to. Throws an exception it cannot.
  392. */
  393. void more() {
  394. size_t n = 0;
  395. while (out_->next(&next_, &n)) {
  396. if (n != 0) {
  397. end_ = next_ + n;
  398. return;
  399. }
  400. }
  401. throw Exception("EOF reached");
  402. }
  403. };
  404. /**
  405. * A convenience function to copy all the contents of an input stream into
  406. * an output stream.
  407. */
  408. inline void copy(InputStream &in, OutputStream &out) {
  409. const uint8_t *p = nullptr;
  410. size_t n = 0;
  411. StreamWriter w(out);
  412. while (in.next(&p, &n)) {
  413. w.writeBytes(p, n);
  414. }
  415. w.flush();
  416. }
  417. } // namespace avro
  418. #endif