DataFile.hh 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  18. #ifndef avro_DataFile_hh__
  19. #define avro_DataFile_hh__
  20. #include "Config.hh"
  21. #include "Encoder.hh"
  22. #include "Specific.hh"
  23. #include "Stream.hh"
  24. #include "ValidSchema.hh"
  25. #include "buffer/Buffer.hh"
  26. #include <map>
  27. #include <string>
  28. #include <vector>
  29. #include "array"
  30. #include "boost/utility.hpp"
  31. #include <boost/iostreams/filtering_stream.hpp>
  32. namespace avro {
  33. /** Specify type of compression to use when writing data files. */
  34. enum Codec {
  35. NULL_CODEC,
  36. DEFLATE_CODEC,
  37. #ifdef SNAPPY_CODEC_AVAILABLE
  38. SNAPPY_CODEC
  39. #endif
  40. };
  41. const int SyncSize = 16;
  42. /**
  43. * The sync value.
  44. */
  45. typedef std::array<uint8_t, SyncSize> DataFileSync;
  46. /**
  47. * Type-independent portion of DataFileWriter.
  48. * At any given point in time, at most one file can be written using
  49. * this object.
  50. */
  51. class AVRO_DECL DataFileWriterBase : boost::noncopyable {
  52. const std::string filename_;
  53. const ValidSchema schema_;
  54. const EncoderPtr encoderPtr_;
  55. const size_t syncInterval_;
  56. Codec codec_;
  57. std::unique_ptr<OutputStream> stream_;
  58. std::unique_ptr<OutputStream> buffer_;
  59. const DataFileSync sync_;
  60. int64_t objectCount_;
  61. typedef std::map<std::string, std::vector<uint8_t>> Metadata;
  62. Metadata metadata_;
  63. int64_t lastSync_;
  64. static std::unique_ptr<OutputStream> makeStream(const char *filename);
  65. static DataFileSync makeSync();
  66. void writeHeader();
  67. void setMetadata(const std::string &key, const std::string &value);
  68. /**
  69. * Generates a sync marker in the file.
  70. */
  71. void sync();
  72. /**
  73. * Shared constructor portion since we aren't using C++11
  74. */
  75. void init(const ValidSchema &schema, size_t syncInterval, const Codec &codec);
  76. public:
  77. /**
  78. * Returns the current encoder for this writer.
  79. */
  80. Encoder &encoder() const { return *encoderPtr_; }
  81. /**
  82. * Returns true if the buffer has sufficient data for a sync to be
  83. * inserted.
  84. */
  85. void syncIfNeeded();
  86. /**
  87. * Returns the byte offset (within the current file) of the start of the current block being written.
  88. */
  89. uint64_t getCurrentBlockStart() const;
  90. /**
  91. * Increments the object count.
  92. */
  93. void incr() {
  94. ++objectCount_;
  95. }
  96. /**
  97. * Constructs a data file writer with the given sync interval and name.
  98. */
  99. DataFileWriterBase(const char *filename, const ValidSchema &schema,
  100. size_t syncInterval, Codec codec = NULL_CODEC);
  101. DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
  102. const ValidSchema &schema, size_t syncInterval, Codec codec);
  103. ~DataFileWriterBase();
  104. /**
  105. * Closes the current file. Once closed this datafile object cannot be
  106. * used for writing any more.
  107. */
  108. void close();
  109. /**
  110. * Returns the schema for this data file.
  111. */
  112. const ValidSchema &schema() const { return schema_; }
  113. /**
  114. * Flushes any unwritten data into the file.
  115. */
  116. void flush();
  117. };
  118. /**
  119. * An Avro datafile that can store objects of type T.
  120. */
  121. template<typename T>
  122. class DataFileWriter : boost::noncopyable {
  123. std::unique_ptr<DataFileWriterBase> base_;
  124. public:
  125. /**
  126. * Constructs a new data file.
  127. */
  128. DataFileWriter(const char *filename, const ValidSchema &schema,
  129. size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) {}
  130. DataFileWriter(std::unique_ptr<OutputStream> outputStream, const ValidSchema &schema,
  131. size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : base_(new DataFileWriterBase(std::move(outputStream), schema, syncInterval, codec)) {}
  132. /**
  133. * Writes the given piece of data into the file.
  134. */
  135. void write(const T &datum) {
  136. base_->syncIfNeeded();
  137. avro::encode(base_->encoder(), datum);
  138. base_->incr();
  139. }
  140. /**
  141. * Returns the byte offset (within the current file) of the start of the current block being written.
  142. */
  143. uint64_t getCurrentBlockStart() { return base_->getCurrentBlockStart(); }
  144. /**
  145. * Closes the current file. Once closed this datafile object cannot be
  146. * used for writing any more.
  147. */
  148. void close() { base_->close(); }
  149. /**
  150. * Returns the schema for this data file.
  151. */
  152. const ValidSchema &schema() const { return base_->schema(); }
  153. /**
  154. * Flushes any unwritten data into the file.
  155. */
  156. void flush() { base_->flush(); }
  157. };
  158. /**
  159. * The type independent portion of reader.
  160. */
  161. class AVRO_DECL DataFileReaderBase : boost::noncopyable {
  162. const std::string filename_;
  163. const std::unique_ptr<InputStream> stream_;
  164. const DecoderPtr decoder_;
  165. int64_t objectCount_;
  166. bool eof_;
  167. Codec codec_;
  168. int64_t blockStart_{};
  169. int64_t blockEnd_{};
  170. ValidSchema readerSchema_;
  171. ValidSchema dataSchema_;
  172. DecoderPtr dataDecoder_;
  173. std::unique_ptr<InputStream> dataStream_;
  174. typedef std::map<std::string, std::vector<uint8_t>> Metadata;
  175. Metadata metadata_;
  176. DataFileSync sync_{};
  177. // for compressed buffer
  178. std::unique_ptr<boost::iostreams::filtering_istream> os_;
  179. std::vector<char> compressed_;
  180. std::string uncompressed;
  181. void readHeader();
  182. void readDataBlock();
  183. void doSeek(int64_t position);
  184. public:
  185. /**
  186. * Returns the current decoder for this reader.
  187. */
  188. Decoder &decoder() { return *dataDecoder_; }
  189. /**
  190. * Returns true if and only if there is more to read.
  191. */
  192. bool hasMore();
  193. /**
  194. * Decrements the number of objects yet to read.
  195. */
  196. void decr() { --objectCount_; }
  197. /**
  198. * Constructs the reader for the given file and the reader is
  199. * expected to use the schema that is used with data.
  200. * This function should be called exactly once after constructing
  201. * the DataFileReaderBase object.
  202. */
  203. explicit DataFileReaderBase(const char *filename);
  204. explicit DataFileReaderBase(std::unique_ptr<InputStream> inputStream);
  205. /**
  206. * Initializes the reader so that the reader and writer schemas
  207. * are the same.
  208. */
  209. void init();
  210. /**
  211. * Initializes the reader to read objects according to the given
  212. * schema. This gives an opportunity for the reader to see the schema
  213. * in the data file before deciding the right schema to use for reading.
  214. * This must be called exactly once after constructing the
  215. * DataFileReaderBase object.
  216. */
  217. void init(const ValidSchema &readerSchema);
  218. /**
  219. * Returns the schema for this object.
  220. */
  221. const ValidSchema &readerSchema() { return readerSchema_; }
  222. /**
  223. * Returns the schema stored with the data file.
  224. */
  225. const ValidSchema &dataSchema() { return dataSchema_; }
  226. /**
  227. * Closes the reader. No further operation is possible on this reader.
  228. */
  229. void close();
  230. /**
  231. * Move to a specific, known synchronization point, for example one returned
  232. * from tell() after sync().
  233. */
  234. void seek(int64_t position);
  235. /**
  236. * Move to the next synchronization point after a position. To process a
  237. * range of file entries, call this with the starting position, then check
  238. * pastSync() with the end point before each use of decoder().
  239. */
  240. void sync(int64_t position);
  241. /**
  242. * Return true if past the next synchronization point after a position.
  243. */
  244. bool pastSync(int64_t position);
  245. /**
  246. * Return the last synchronization point before our current position.
  247. */
  248. int64_t previousSync() const;
  249. };
  250. /**
  251. * Reads the contents of data file one after another.
  252. */
  253. template<typename T>
  254. class DataFileReader : boost::noncopyable {
  255. std::unique_ptr<DataFileReaderBase> base_;
  256. public:
  257. /**
  258. * Constructs the reader for the given file and the reader is
  259. * expected to use the given schema.
  260. */
  261. DataFileReader(const char *filename, const ValidSchema &readerSchema) : base_(new DataFileReaderBase(filename)) {
  262. base_->init(readerSchema);
  263. }
  264. DataFileReader(std::unique_ptr<InputStream> inputStream, const ValidSchema &readerSchema) : base_(new DataFileReaderBase(std::move(inputStream))) {
  265. base_->init(readerSchema);
  266. }
  267. /**
  268. * Constructs the reader for the given file and the reader is
  269. * expected to use the schema that is used with data.
  270. */
  271. explicit DataFileReader(const char *filename) : base_(new DataFileReaderBase(filename)) {
  272. base_->init();
  273. }
  274. explicit DataFileReader(std::unique_ptr<InputStream> inputStream) : base_(new DataFileReaderBase(std::move(inputStream))) {
  275. base_->init();
  276. }
  277. /**
  278. * Constructs a reader using the reader base. This form of constructor
  279. * allows the user to examine the schema of a given file and then
  280. * decide to use the right type of data to be deserialize. Without this
  281. * the user must know the type of data for the template _before_
  282. * he knows the schema within the file.
  283. * The schema present in the data file will be used for reading
  284. * from this reader.
  285. */
  286. explicit DataFileReader(std::unique_ptr<DataFileReaderBase> base) : base_(std::move(base)) {
  287. base_->init();
  288. }
  289. /**
  290. * Constructs a reader using the reader base. This form of constructor
  291. * allows the user to examine the schema of a given file and then
  292. * decide to use the right type of data to be deserialize. Without this
  293. * the user must know the type of data for the template _before_
  294. * he knows the schema within the file.
  295. * The argument readerSchema will be used for reading
  296. * from this reader.
  297. */
  298. DataFileReader(std::unique_ptr<DataFileReaderBase> base,
  299. const ValidSchema &readerSchema) : base_(std::move(base)) {
  300. base_->init(readerSchema);
  301. }
  302. /**
  303. * Reads the next entry from the data file.
  304. * \return true if an object has been successfully read into \p datum and
  305. * false if there are no more entries in the file.
  306. */
  307. bool read(T &datum) {
  308. if (base_->hasMore()) {
  309. base_->decr();
  310. avro::decode(base_->decoder(), datum);
  311. return true;
  312. }
  313. return false;
  314. }
  315. /**
  316. * Returns the schema for this object.
  317. */
  318. const ValidSchema &readerSchema() { return base_->readerSchema(); }
  319. /**
  320. * Returns the schema stored with the data file.
  321. */
  322. const ValidSchema &dataSchema() { return base_->dataSchema(); }
  323. /**
  324. * Closes the reader. No further operation is possible on this reader.
  325. */
  326. void close() { return base_->close(); }
  327. /**
  328. * Move to a specific, known synchronization point, for example one returned
  329. * from previousSync().
  330. */
  331. void seek(int64_t position) { base_->seek(position); }
  332. /**
  333. * Move to the next synchronization point after a position. To process a
  334. * range of file entries, call this with the starting position, then check
  335. * pastSync() with the end point before each call to read().
  336. */
  337. void sync(int64_t position) { base_->sync(position); }
  338. /**
  339. * Return true if past the next synchronization point after a position.
  340. */
  341. bool pastSync(int64_t position) { return base_->pastSync(position); }
  342. /**
  343. * Return the last synchronization point before our current position.
  344. */
  345. int64_t previousSync() { return base_->previousSync(); }
  346. };
  347. } // namespace avro
  348. #endif