DataFile.cc 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * https://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  #include "DataFile.hh"
  #include "Compiler.hh"
  #include "Exception.hh"

  #include <algorithm>
  #include <ctime>
  #include <random>
  #include <sstream>

  #include <boost/crc.hpp> // for boost::crc_32_type
  #include <boost/iostreams/device/file.hpp>
  #include <boost/iostreams/filter/gzip.hpp>
  #include <boost/iostreams/filter/zlib.hpp>
  #include <boost/random/mersenne_twister.hpp>

  #ifdef SNAPPY_CODEC_AVAILABLE
  #include <snappy.h>
  #endif
  30. namespace avro {
  31. using std::copy;
  32. using std::istringstream;
  33. using std::ostringstream;
  34. using std::string;
  35. using std::unique_ptr;
  36. using std::vector;
  37. using std::array;
  38. namespace {
  39. const string AVRO_SCHEMA_KEY("avro.schema");
  40. const string AVRO_CODEC_KEY("avro.codec");
  41. const string AVRO_NULL_CODEC("null");
  42. const string AVRO_DEFLATE_CODEC("deflate");
  43. #ifdef SNAPPY_CODEC_AVAILABLE
  44. const string AVRO_SNAPPY_CODEC = "snappy";
  45. #endif
  46. const size_t minSyncInterval = 32;
  47. const size_t maxSyncInterval = 1u << 30;
  48. boost::iostreams::zlib_params get_zlib_params() {
  49. boost::iostreams::zlib_params ret;
  50. ret.method = boost::iostreams::zlib::deflated;
  51. ret.noheader = true;
  52. return ret;
  53. }
  54. } // namespace
  55. DataFileWriterBase::DataFileWriterBase(const char *filename, const ValidSchema &schema, size_t syncInterval,
  56. Codec codec) : filename_(filename),
  57. schema_(schema),
  58. encoderPtr_(binaryEncoder()),
  59. syncInterval_(syncInterval),
  60. codec_(codec),
  61. stream_(fileOutputStream(filename)),
  62. buffer_(memoryOutputStream()),
  63. sync_(makeSync()),
  64. objectCount_(0),
  65. lastSync_(0) {
  66. init(schema, syncInterval, codec);
  67. }
  68. DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
  69. const ValidSchema &schema, size_t syncInterval, Codec codec) : filename_(),
  70. schema_(schema),
  71. encoderPtr_(binaryEncoder()),
  72. syncInterval_(syncInterval),
  73. codec_(codec),
  74. stream_(std::move(outputStream)),
  75. buffer_(memoryOutputStream()),
  76. sync_(makeSync()),
  77. objectCount_(0),
  78. lastSync_(0) {
  79. init(schema, syncInterval, codec);
  80. }
  81. void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, const Codec &codec) {
  82. if (syncInterval < minSyncInterval || syncInterval > maxSyncInterval) {
  83. throw Exception(boost::format("Invalid sync interval: %1%. "
  84. "Should be between %2% and %3%")
  85. % syncInterval % minSyncInterval % maxSyncInterval);
  86. }
  87. setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
  88. if (codec_ == NULL_CODEC) {
  89. setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
  90. } else if (codec_ == DEFLATE_CODEC) {
  91. setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC);
  92. #ifdef SNAPPY_CODEC_AVAILABLE
  93. } else if (codec_ == SNAPPY_CODEC) {
  94. setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC);
  95. #endif
  96. } else {
  97. throw Exception(boost::format("Unknown codec: %1%") % codec);
  98. }
  99. setMetadata(AVRO_SCHEMA_KEY, schema.toJson(false));
  100. writeHeader();
  101. encoderPtr_->init(*buffer_);
  102. lastSync_ = stream_->byteCount();
  103. }
  104. DataFileWriterBase::~DataFileWriterBase() {
  105. if (stream_) {
  106. close();
  107. }
  108. }
  109. void DataFileWriterBase::close() {
  110. flush();
  111. stream_.reset();
  112. }
  113. void DataFileWriterBase::sync() {
  114. encoderPtr_->flush();
  115. encoderPtr_->init(*stream_);
  116. avro::encode(*encoderPtr_, objectCount_);
  117. if (codec_ == NULL_CODEC) {
  118. int64_t byteCount = buffer_->byteCount();
  119. avro::encode(*encoderPtr_, byteCount);
  120. encoderPtr_->flush();
  121. std::unique_ptr<InputStream> in = memoryInputStream(*buffer_);
  122. copy(*in, *stream_);
  123. } else if (codec_ == DEFLATE_CODEC) {
  124. std::vector<char> buf;
  125. {
  126. boost::iostreams::filtering_ostream os;
  127. os.push(boost::iostreams::zlib_compressor(get_zlib_params()));
  128. os.push(boost::iostreams::back_inserter(buf));
  129. const uint8_t *data;
  130. size_t len;
  131. std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
  132. while (input->next(&data, &len)) {
  133. boost::iostreams::write(os, reinterpret_cast<const char *>(data), len);
  134. }
  135. } // make sure all is flushed
  136. std::unique_ptr<InputStream> in = memoryInputStream(
  137. reinterpret_cast<const uint8_t *>(buf.data()), buf.size());
  138. int64_t byteCount = buf.size();
  139. avro::encode(*encoderPtr_, byteCount);
  140. encoderPtr_->flush();
  141. copy(*in, *stream_);
  142. #ifdef SNAPPY_CODEC_AVAILABLE
  143. } else if (codec_ == SNAPPY_CODEC) {
  144. std::vector<char> temp;
  145. std::string compressed;
  146. boost::crc_32_type crc;
  147. {
  148. boost::iostreams::filtering_ostream os;
  149. os.push(boost::iostreams::back_inserter(temp));
  150. const uint8_t *data;
  151. size_t len;
  152. std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
  153. while (input->next(&data, &len)) {
  154. boost::iostreams::write(os, reinterpret_cast<const char *>(data),
  155. len);
  156. }
  157. } // make sure all is flushed
  158. crc.process_bytes(reinterpret_cast<const char *>(temp.data()),
  159. temp.size());
  160. // For Snappy, add the CRC32 checksum
  161. int32_t checksum = crc();
  162. // Now compress
  163. size_t compressed_size = snappy::Compress(
  164. reinterpret_cast<const char *>(temp.data()), temp.size(),
  165. &compressed);
  166. temp.clear();
  167. {
  168. boost::iostreams::filtering_ostream os;
  169. os.push(boost::iostreams::back_inserter(temp));
  170. boost::iostreams::write(os, compressed.c_str(), compressed_size);
  171. }
  172. temp.push_back((checksum >> 24) & 0xFF);
  173. temp.push_back((checksum >> 16) & 0xFF);
  174. temp.push_back((checksum >> 8) & 0xFF);
  175. temp.push_back(checksum & 0xFF);
  176. std::unique_ptr<InputStream> in = memoryInputStream(
  177. reinterpret_cast<const uint8_t *>(temp.data()), temp.size());
  178. int64_t byteCount = temp.size();
  179. avro::encode(*encoderPtr_, byteCount);
  180. encoderPtr_->flush();
  181. copy(*in, *stream_);
  182. #endif
  183. }
  184. encoderPtr_->init(*stream_);
  185. avro::encode(*encoderPtr_, sync_);
  186. encoderPtr_->flush();
  187. lastSync_ = stream_->byteCount();
  188. buffer_ = memoryOutputStream();
  189. encoderPtr_->init(*buffer_);
  190. objectCount_ = 0;
  191. }
  192. void DataFileWriterBase::syncIfNeeded() {
  193. encoderPtr_->flush();
  194. if (buffer_->byteCount() >= syncInterval_) {
  195. sync();
  196. }
  197. }
  198. uint64_t DataFileWriterBase::getCurrentBlockStart() const {
  199. return lastSync_;
  200. }
  201. void DataFileWriterBase::flush() {
  202. sync();
  203. }
  204. DataFileSync DataFileWriterBase::makeSync() {
  205. boost::mt19937 random(static_cast<uint32_t>(time(nullptr)));
  206. DataFileSync sync;
  207. std::generate(sync.begin(), sync.end(), random);
  208. return sync;
  209. }
  210. typedef array<uint8_t, 4> Magic;
  211. static Magic magic = {{'O', 'b', 'j', '\x01'}};
  212. void DataFileWriterBase::writeHeader() {
  213. encoderPtr_->init(*stream_);
  214. avro::encode(*encoderPtr_, magic);
  215. avro::encode(*encoderPtr_, metadata_);
  216. avro::encode(*encoderPtr_, sync_);
  217. encoderPtr_->flush();
  218. }
  219. void DataFileWriterBase::setMetadata(const string &key, const string &value) {
  220. vector<uint8_t> v(value.size());
  221. copy(value.begin(), value.end(), v.begin());
  222. metadata_[key] = v;
  223. }
  224. DataFileReaderBase::DataFileReaderBase(const char *filename) : filename_(filename), stream_(fileSeekableInputStream(filename)),
  225. decoder_(binaryDecoder()), objectCount_(0), eof_(false),
  226. codec_(NULL_CODEC), blockStart_(-1), blockEnd_(-1) {
  227. readHeader();
  228. }
  229. DataFileReaderBase::DataFileReaderBase(std::unique_ptr<InputStream> inputStream) : stream_(std::move(inputStream)),
  230. decoder_(binaryDecoder()), objectCount_(0), eof_(false), codec_(NULL_CODEC) {
  231. readHeader();
  232. }
  233. void DataFileReaderBase::init() {
  234. readerSchema_ = dataSchema_;
  235. dataDecoder_ = binaryDecoder();
  236. readDataBlock();
  237. }
  238. void DataFileReaderBase::init(const ValidSchema &readerSchema) {
  239. readerSchema_ = readerSchema;
  240. dataDecoder_ = (readerSchema_.toJson(true) != dataSchema_.toJson(true)) ? resolvingDecoder(dataSchema_, readerSchema_, binaryDecoder()) : binaryDecoder();
  241. readDataBlock();
  242. }
  243. static void drain(InputStream &in) {
  244. const uint8_t *p = nullptr;
  245. size_t n = 0;
  246. while (in.next(&p, &n))
  247. ;
  248. }
  /// Maps a nibble value (0-15) to its lowercase hexadecimal digit.
  char hex(unsigned int x) {
      if (x < 10) {
          return static_cast<char>(x + '0');
      }
      return static_cast<char>(x - 10 + 'a');
  }
  252. std::ostream &operator<<(std::ostream &os, const DataFileSync &s) {
  253. for (uint8_t i : s) {
  254. os << hex(i / 16) << hex(i % 16) << ' ';
  255. }
  256. os << std::endl;
  257. return os;
  258. }
  259. bool DataFileReaderBase::hasMore() {
  260. for (;;) {
  261. if (eof_) {
  262. return false;
  263. } else if (objectCount_ != 0) {
  264. return true;
  265. }
  266. dataDecoder_->init(*dataStream_);
  267. drain(*dataStream_);
  268. DataFileSync s;
  269. decoder_->init(*stream_);
  270. avro::decode(*decoder_, s);
  271. if (s != sync_) {
  272. throw Exception("Sync mismatch");
  273. }
  274. readDataBlock();
  275. }
  276. }
  277. class BoundedInputStream : public InputStream {
  278. InputStream &in_;
  279. size_t limit_;
  280. bool next(const uint8_t **data, size_t *len) final {
  281. if (limit_ != 0 && in_.next(data, len)) {
  282. if (*len > limit_) {
  283. in_.backup(*len - limit_);
  284. *len = limit_;
  285. }
  286. limit_ -= *len;
  287. return true;
  288. }
  289. return false;
  290. }
  291. void backup(size_t len) final {
  292. in_.backup(len);
  293. limit_ += len;
  294. }
  295. void skip(size_t len) final {
  296. if (len > limit_) {
  297. len = limit_;
  298. }
  299. in_.skip(len);
  300. limit_ -= len;
  301. }
  302. size_t byteCount() const final {
  303. return in_.byteCount();
  304. }
  305. public:
  306. BoundedInputStream(InputStream &in, size_t limit) : in_(in), limit_(limit) {}
  307. };
  308. unique_ptr<InputStream> boundedInputStream(InputStream &in, size_t limit) {
  309. return unique_ptr<InputStream>(new BoundedInputStream(in, limit));
  310. }
  311. void DataFileReaderBase::readDataBlock() {
  312. decoder_->init(*stream_);
  313. blockStart_ = stream_->byteCount();
  314. const uint8_t *p = nullptr;
  315. size_t n = 0;
  316. if (!stream_->next(&p, &n)) {
  317. eof_ = true;
  318. return;
  319. }
  320. stream_->backup(n);
  321. avro::decode(*decoder_, objectCount_);
  322. int64_t byteCount;
  323. avro::decode(*decoder_, byteCount);
  324. decoder_->init(*stream_);
  325. blockEnd_ = stream_->byteCount() + byteCount;
  326. unique_ptr<InputStream> st = boundedInputStream(*stream_, static_cast<size_t>(byteCount));
  327. if (codec_ == NULL_CODEC) {
  328. dataDecoder_->init(*st);
  329. dataStream_ = std::move(st);
  330. #ifdef SNAPPY_CODEC_AVAILABLE
  331. } else if (codec_ == SNAPPY_CODEC) {
  332. boost::crc_32_type crc;
  333. uint32_t checksum = 0;
  334. compressed_.clear();
  335. uncompressed.clear();
  336. const uint8_t *data;
  337. size_t len;
  338. while (st->next(&data, &len)) {
  339. compressed_.insert(compressed_.end(), data, data + len);
  340. }
  341. len = compressed_.size();
  342. if (len < 4)
  343. throw Exception("Cannot read compressed data, expected at least 4 bytes, got " + std::to_string(len));
  344. int b1 = compressed_[len - 4] & 0xFF;
  345. int b2 = compressed_[len - 3] & 0xFF;
  346. int b3 = compressed_[len - 2] & 0xFF;
  347. int b4 = compressed_[len - 1] & 0xFF;
  348. checksum = (b1 << 24) + (b2 << 16) + (b3 << 8) + (b4);
  349. if (!snappy::Uncompress(reinterpret_cast<const char *>(compressed_.data()),
  350. len - 4, &uncompressed)) {
  351. throw Exception(
  352. "Snappy Compression reported an error when decompressing");
  353. }
  354. crc.process_bytes(uncompressed.c_str(), uncompressed.size());
  355. uint32_t c = crc();
  356. if (checksum != c) {
  357. throw Exception(
  358. boost::format("Checksum did not match for Snappy compression: Expected: %1%, computed: %2%") % checksum
  359. % c);
  360. }
  361. os_.reset(new boost::iostreams::filtering_istream());
  362. os_->push(
  363. boost::iostreams::basic_array_source<char>(uncompressed.c_str(),
  364. uncompressed.size()));
  365. std::unique_ptr<InputStream> in = istreamInputStream(*os_);
  366. dataDecoder_->init(*in);
  367. dataStream_ = std::move(in);
  368. #endif
  369. } else {
  370. compressed_.clear();
  371. const uint8_t *data;
  372. size_t len;
  373. while (st->next(&data, &len)) {
  374. compressed_.insert(compressed_.end(), data, data + len);
  375. }
  376. os_.reset(new boost::iostreams::filtering_istream());
  377. os_->push(boost::iostreams::zlib_decompressor(get_zlib_params()));
  378. os_->push(boost::iostreams::basic_array_source<char>(
  379. compressed_.data(), compressed_.size()));
  380. std::unique_ptr<InputStream> in = nonSeekableIstreamInputStream(*os_);
  381. dataDecoder_->init(*in);
  382. dataStream_ = std::move(in);
  383. }
  384. }
  385. void DataFileReaderBase::close() {
  386. }
  /// Copies raw metadata bytes into a std::string, byte for byte.
  static string toString(const vector<uint8_t> &v) {
      return string(v.begin(), v.end());
  }
  393. static ValidSchema makeSchema(const vector<uint8_t> &v) {
  394. istringstream iss(toString(v));
  395. ValidSchema vs;
  396. compileJsonSchema(iss, vs);
  397. return ValidSchema(vs);
  398. }
  399. void DataFileReaderBase::readHeader() {
  400. decoder_->init(*stream_);
  401. Magic m;
  402. avro::decode(*decoder_, m);
  403. if (magic != m) {
  404. throw Exception("Invalid data file. Magic does not match: "
  405. + filename_);
  406. }
  407. avro::decode(*decoder_, metadata_);
  408. Metadata::const_iterator it = metadata_.find(AVRO_SCHEMA_KEY);
  409. if (it == metadata_.end()) {
  410. throw Exception("No schema in metadata");
  411. }
  412. dataSchema_ = makeSchema(it->second);
  413. if (!readerSchema_.root()) {
  414. readerSchema_ = dataSchema();
  415. }
  416. it = metadata_.find(AVRO_CODEC_KEY);
  417. if (it != metadata_.end() && toString(it->second) == AVRO_DEFLATE_CODEC) {
  418. codec_ = DEFLATE_CODEC;
  419. #ifdef SNAPPY_CODEC_AVAILABLE
  420. } else if (it != metadata_.end()
  421. && toString(it->second) == AVRO_SNAPPY_CODEC) {
  422. codec_ = SNAPPY_CODEC;
  423. #endif
  424. } else {
  425. codec_ = NULL_CODEC;
  426. if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) {
  427. throw Exception("Unknown codec in data file: " + toString(it->second));
  428. }
  429. }
  430. avro::decode(*decoder_, sync_);
  431. decoder_->init(*stream_);
  432. blockStart_ = stream_->byteCount();
  433. }
  434. void DataFileReaderBase::doSeek(int64_t position) {
  435. if (auto *ss = dynamic_cast<SeekableInputStream *>(stream_.get())) {
  436. if (!eof_) {
  437. dataDecoder_->init(*dataStream_);
  438. drain(*dataStream_);
  439. }
  440. decoder_->init(*stream_);
  441. ss->seek(position);
  442. eof_ = false;
  443. } else {
  444. throw Exception("seek not supported on non-SeekableInputStream");
  445. }
  446. }
  447. void DataFileReaderBase::seek(int64_t position) {
  448. doSeek(position);
  449. readDataBlock();
  450. }
  451. void DataFileReaderBase::sync(int64_t position) {
  452. doSeek(position);
  453. DataFileSync sync_buffer;
  454. const uint8_t *p = nullptr;
  455. size_t n = 0;
  456. size_t i = 0;
  457. while (i < SyncSize) {
  458. if (n == 0 && !stream_->next(&p, &n)) {
  459. eof_ = true;
  460. return;
  461. }
  462. int len =
  463. std::min(static_cast<size_t>(SyncSize - i), n);
  464. memcpy(&sync_buffer[i], p, len);
  465. p += len;
  466. n -= len;
  467. i += len;
  468. }
  469. for (;;) {
  470. size_t j = 0;
  471. for (; j < SyncSize; ++j) {
  472. if (sync_[j] != sync_buffer[(i + j) % SyncSize]) {
  473. break;
  474. }
  475. }
  476. if (j == SyncSize) {
  477. // Found the sync marker!
  478. break;
  479. }
  480. if (n == 0 && !stream_->next(&p, &n)) {
  481. eof_ = true;
  482. return;
  483. }
  484. sync_buffer[i++ % SyncSize] = *p++;
  485. --n;
  486. }
  487. stream_->backup(n);
  488. readDataBlock();
  489. }
  490. bool DataFileReaderBase::pastSync(int64_t position) {
  491. return !hasMore() || blockStart_ >= position + SyncSize;
  492. }
  493. int64_t DataFileReaderBase::previousSync() const {
  494. return blockStart_;
  495. }
  496. } // namespace avro