DataFile.cc 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * https://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "DataFile.hh"
  19. #include "Compiler.hh"
  20. #include "Exception.hh"
  21. #include <sstream>
  22. #include <boost/crc.hpp> // for boost::crc_32_type
  23. #include <boost/iostreams/device/file.hpp>
  24. #include <boost/iostreams/filter/gzip.hpp>
  25. #include <boost/iostreams/filter/zlib.hpp>
  26. #include <boost/random/mersenne_twister.hpp>
  27. #ifdef SNAPPY_CODEC_AVAILABLE
  28. #include <snappy.h>
  29. #endif
  30. namespace avro {
  31. using std::copy;
  32. using std::istringstream;
  33. using std::ostringstream;
  34. using std::string;
  35. using std::unique_ptr;
  36. using std::vector;
  37. using std::array;
  38. namespace {
  39. const string AVRO_SCHEMA_KEY("avro.schema");
  40. const string AVRO_CODEC_KEY("avro.codec");
  41. const string AVRO_NULL_CODEC("null");
  42. const string AVRO_DEFLATE_CODEC("deflate");
  43. #ifdef SNAPPY_CODEC_AVAILABLE
  44. const string AVRO_SNAPPY_CODEC = "snappy";
  45. #endif
  46. const size_t minSyncInterval = 32;
  47. const size_t maxSyncInterval = 1u << 30;
  48. boost::iostreams::zlib_params get_zlib_params() {
  49. boost::iostreams::zlib_params ret;
  50. ret.method = boost::iostreams::zlib::deflated;
  51. ret.noheader = true;
  52. return ret;
  53. }
  54. } // namespace
  55. DataFileWriterBase::DataFileWriterBase(const char *filename, const ValidSchema &schema, size_t syncInterval,
  56. Codec codec) : filename_(filename),
  57. schema_(schema),
  58. encoderPtr_(binaryEncoder()),
  59. syncInterval_(syncInterval),
  60. codec_(codec),
  61. stream_(fileOutputStream(filename)),
  62. buffer_(memoryOutputStream()),
  63. sync_(makeSync()),
  64. objectCount_(0),
  65. lastSync_(0) {
  66. init(schema, syncInterval, codec);
  67. }
  68. DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
  69. const ValidSchema &schema, size_t syncInterval, Codec codec) : filename_(),
  70. schema_(schema),
  71. encoderPtr_(binaryEncoder()),
  72. syncInterval_(syncInterval),
  73. codec_(codec),
  74. stream_(std::move(outputStream)),
  75. buffer_(memoryOutputStream()),
  76. sync_(makeSync()),
  77. objectCount_(0),
  78. lastSync_(0) {
  79. init(schema, syncInterval, codec);
  80. }
  81. void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, const Codec &codec) {
  82. if (syncInterval < minSyncInterval || syncInterval > maxSyncInterval) {
  83. throw Exception(
  84. "Invalid sync interval: {}. Should be between {} and {}",
  85. syncInterval, minSyncInterval, maxSyncInterval);
  86. }
  87. setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
  88. if (codec_ == NULL_CODEC) {
  89. setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
  90. } else if (codec_ == DEFLATE_CODEC) {
  91. setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC);
  92. #ifdef SNAPPY_CODEC_AVAILABLE
  93. } else if (codec_ == SNAPPY_CODEC) {
  94. setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC);
  95. #endif
  96. } else {
  97. throw Exception("Unknown codec: {}", int(codec));
  98. }
  99. setMetadata(AVRO_SCHEMA_KEY, schema.toJson(false));
  100. writeHeader();
  101. encoderPtr_->init(*buffer_);
  102. lastSync_ = stream_->byteCount();
  103. }
  104. DataFileWriterBase::~DataFileWriterBase() {
  105. if (stream_) {
  106. try {
  107. close();
  108. } catch (...) {}
  109. }
  110. }
  111. void DataFileWriterBase::close() {
  112. flush();
  113. stream_.reset();
  114. }
  115. void DataFileWriterBase::sync() {
  116. encoderPtr_->flush();
  117. encoderPtr_->init(*stream_);
  118. avro::encode(*encoderPtr_, objectCount_);
  119. if (codec_ == NULL_CODEC) {
  120. int64_t byteCount = buffer_->byteCount();
  121. avro::encode(*encoderPtr_, byteCount);
  122. encoderPtr_->flush();
  123. std::unique_ptr<InputStream> in = memoryInputStream(*buffer_);
  124. copy(*in, *stream_);
  125. } else if (codec_ == DEFLATE_CODEC) {
  126. std::vector<char> buf;
  127. {
  128. boost::iostreams::filtering_ostream os;
  129. os.push(boost::iostreams::zlib_compressor(get_zlib_params()));
  130. os.push(boost::iostreams::back_inserter(buf));
  131. const uint8_t *data;
  132. size_t len;
  133. std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
  134. while (input->next(&data, &len)) {
  135. boost::iostreams::write(os, reinterpret_cast<const char *>(data), len);
  136. }
  137. } // make sure all is flushed
  138. std::unique_ptr<InputStream> in = memoryInputStream(
  139. reinterpret_cast<const uint8_t *>(buf.data()), buf.size());
  140. int64_t byteCount = buf.size();
  141. avro::encode(*encoderPtr_, byteCount);
  142. encoderPtr_->flush();
  143. copy(*in, *stream_);
  144. #ifdef SNAPPY_CODEC_AVAILABLE
  145. } else if (codec_ == SNAPPY_CODEC) {
  146. std::vector<char> temp;
  147. std::string compressed;
  148. boost::crc_32_type crc;
  149. {
  150. boost::iostreams::filtering_ostream os;
  151. os.push(boost::iostreams::back_inserter(temp));
  152. const uint8_t *data;
  153. size_t len;
  154. std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
  155. while (input->next(&data, &len)) {
  156. boost::iostreams::write(os, reinterpret_cast<const char *>(data),
  157. len);
  158. }
  159. } // make sure all is flushed
  160. crc.process_bytes(reinterpret_cast<const char *>(temp.data()),
  161. temp.size());
  162. // For Snappy, add the CRC32 checksum
  163. int32_t checksum = crc();
  164. // Now compress
  165. size_t compressed_size = snappy::Compress(
  166. reinterpret_cast<const char *>(temp.data()), temp.size(),
  167. &compressed);
  168. temp.clear();
  169. {
  170. boost::iostreams::filtering_ostream os;
  171. os.push(boost::iostreams::back_inserter(temp));
  172. boost::iostreams::write(os, compressed.c_str(), compressed_size);
  173. }
  174. temp.push_back(static_cast<char>((checksum >> 24) & 0xFF));
  175. temp.push_back(static_cast<char>((checksum >> 16) & 0xFF));
  176. temp.push_back(static_cast<char>((checksum >> 8) & 0xFF));
  177. temp.push_back(static_cast<char>(checksum & 0xFF));
  178. std::unique_ptr<InputStream> in = memoryInputStream(
  179. reinterpret_cast<const uint8_t *>(temp.data()), temp.size());
  180. int64_t byteCount = temp.size();
  181. avro::encode(*encoderPtr_, byteCount);
  182. encoderPtr_->flush();
  183. copy(*in, *stream_);
  184. #endif
  185. }
  186. encoderPtr_->init(*stream_);
  187. avro::encode(*encoderPtr_, sync_);
  188. encoderPtr_->flush();
  189. lastSync_ = stream_->byteCount();
  190. buffer_ = memoryOutputStream();
  191. encoderPtr_->init(*buffer_);
  192. objectCount_ = 0;
  193. }
  194. void DataFileWriterBase::syncIfNeeded() {
  195. encoderPtr_->flush();
  196. if (buffer_->byteCount() >= syncInterval_) {
  197. sync();
  198. }
  199. }
  200. uint64_t DataFileWriterBase::getCurrentBlockStart() const {
  201. return lastSync_;
  202. }
  203. void DataFileWriterBase::flush() {
  204. sync();
  205. }
  206. DataFileSync DataFileWriterBase::makeSync() {
  207. boost::mt19937 random(static_cast<uint32_t>(time(nullptr)));
  208. DataFileSync sync;
  209. std::generate(sync.begin(), sync.end(), random);
  210. return sync;
  211. }
  // The four bytes that open every data file: 'O', 'b', 'j', 0x01.
  using Magic = std::array<uint8_t, 4>;
  static Magic magic = {{'O', 'b', 'j', '\x01'}};
  214. void DataFileWriterBase::writeHeader() {
  215. encoderPtr_->init(*stream_);
  216. avro::encode(*encoderPtr_, magic);
  217. avro::encode(*encoderPtr_, metadata_);
  218. avro::encode(*encoderPtr_, sync_);
  219. encoderPtr_->flush();
  220. }
  221. void DataFileWriterBase::setMetadata(const string &key, const string &value) {
  222. vector<uint8_t> v(value.size());
  223. copy(value.begin(), value.end(), v.begin());
  224. metadata_[key] = v;
  225. }
  226. DataFileReaderBase::DataFileReaderBase(const char *filename) : filename_(filename), stream_(fileSeekableInputStream(filename)),
  227. decoder_(binaryDecoder()), objectCount_(0), eof_(false),
  228. codec_(NULL_CODEC), blockStart_(-1), blockEnd_(-1) {
  229. readHeader();
  230. }
  231. DataFileReaderBase::DataFileReaderBase(std::unique_ptr<InputStream> inputStream) : stream_(std::move(inputStream)),
  232. decoder_(binaryDecoder()), objectCount_(0), eof_(false), codec_(NULL_CODEC) {
  233. readHeader();
  234. }
  235. void DataFileReaderBase::init() {
  236. readerSchema_ = dataSchema_;
  237. dataDecoder_ = binaryDecoder();
  238. readDataBlock();
  239. }
  240. void DataFileReaderBase::init(const ValidSchema &readerSchema) {
  241. readerSchema_ = readerSchema;
  242. dataDecoder_ = (readerSchema_.toJson(true) != dataSchema_.toJson(true)) ? resolvingDecoder(dataSchema_, readerSchema_, binaryDecoder()) : binaryDecoder();
  243. readDataBlock();
  244. }
  245. static void drain(InputStream &in) {
  246. const uint8_t *p = nullptr;
  247. size_t n = 0;
  248. while (in.next(&p, &n))
  249. ;
  250. }
  /**
   * Maps a value in 0..15 to its lowercase hexadecimal digit.
   */
  char hex(unsigned int x) {
      if (x < 10) {
          return static_cast<char>(x + '0');
      }
      return static_cast<char>(x + ('a' - 10));
  }
  254. std::ostream &operator<<(std::ostream &os, const DataFileSync &s) {
  255. for (uint8_t i : s) {
  256. os << hex(i / 16) << hex(i % 16) << ' ';
  257. }
  258. os << std::endl;
  259. return os;
  260. }
  261. bool DataFileReaderBase::hasMore() {
  262. for (;;) {
  263. if (eof_) {
  264. return false;
  265. } else if (objectCount_ != 0) {
  266. return true;
  267. }
  268. dataDecoder_->init(*dataStream_);
  269. drain(*dataStream_);
  270. DataFileSync s;
  271. decoder_->init(*stream_);
  272. avro::decode(*decoder_, s);
  273. if (s != sync_) {
  274. throw Exception("Sync mismatch");
  275. }
  276. readDataBlock();
  277. }
  278. }
  279. class BoundedInputStream : public InputStream {
  280. InputStream &in_;
  281. size_t limit_;
  282. bool next(const uint8_t **data, size_t *len) final {
  283. if (limit_ != 0 && in_.next(data, len)) {
  284. if (*len > limit_) {
  285. in_.backup(*len - limit_);
  286. *len = limit_;
  287. }
  288. limit_ -= *len;
  289. return true;
  290. }
  291. return false;
  292. }
  293. void backup(size_t len) final {
  294. in_.backup(len);
  295. limit_ += len;
  296. }
  297. void skip(size_t len) final {
  298. if (len > limit_) {
  299. len = limit_;
  300. }
  301. in_.skip(len);
  302. limit_ -= len;
  303. }
  304. size_t byteCount() const final {
  305. return in_.byteCount();
  306. }
  307. public:
  308. BoundedInputStream(InputStream &in, size_t limit) : in_(in), limit_(limit) {}
  309. };
  310. unique_ptr<InputStream> boundedInputStream(InputStream &in, size_t limit) {
  311. return unique_ptr<InputStream>(new BoundedInputStream(in, limit));
  312. }
  313. void DataFileReaderBase::readDataBlock() {
  314. decoder_->init(*stream_);
  315. blockStart_ = stream_->byteCount();
  316. const uint8_t *p = nullptr;
  317. size_t n = 0;
  318. if (!stream_->next(&p, &n)) {
  319. eof_ = true;
  320. return;
  321. }
  322. stream_->backup(n);
  323. avro::decode(*decoder_, objectCount_);
  324. int64_t byteCount;
  325. avro::decode(*decoder_, byteCount);
  326. decoder_->init(*stream_);
  327. blockEnd_ = stream_->byteCount() + byteCount;
  328. unique_ptr<InputStream> st = boundedInputStream(*stream_, static_cast<size_t>(byteCount));
  329. if (codec_ == NULL_CODEC) {
  330. dataDecoder_->init(*st);
  331. dataStream_ = std::move(st);
  332. #ifdef SNAPPY_CODEC_AVAILABLE
  333. } else if (codec_ == SNAPPY_CODEC) {
  334. boost::crc_32_type crc;
  335. uint32_t checksum = 0;
  336. compressed_.clear();
  337. uncompressed.clear();
  338. const uint8_t *data;
  339. size_t len;
  340. while (st->next(&data, &len)) {
  341. compressed_.insert(compressed_.end(), data, data + len);
  342. }
  343. len = compressed_.size();
  344. if (len < 4)
  345. throw Exception("Cannot read compressed data, expected at least 4 bytes, got " + std::to_string(len));
  346. int b1 = compressed_[len - 4] & 0xFF;
  347. int b2 = compressed_[len - 3] & 0xFF;
  348. int b3 = compressed_[len - 2] & 0xFF;
  349. int b4 = compressed_[len - 1] & 0xFF;
  350. checksum = (b1 << 24) + (b2 << 16) + (b3 << 8) + (b4);
  351. if (!snappy::Uncompress(reinterpret_cast<const char *>(compressed_.data()),
  352. len - 4, &uncompressed)) {
  353. throw Exception(
  354. "Snappy Compression reported an error when decompressing");
  355. }
  356. crc.process_bytes(uncompressed.c_str(), uncompressed.size());
  357. uint32_t c = crc();
  358. if (checksum != c) {
  359. throw Exception(
  360. "Checksum did not match for Snappy compression: Expected: {}, computed: {}",
  361. checksum, c);
  362. }
  363. os_.reset(new boost::iostreams::filtering_istream());
  364. os_->push(
  365. boost::iostreams::basic_array_source<char>(uncompressed.c_str(),
  366. uncompressed.size()));
  367. std::unique_ptr<InputStream> in = istreamInputStream(*os_);
  368. dataDecoder_->init(*in);
  369. dataStream_ = std::move(in);
  370. #endif
  371. } else {
  372. compressed_.clear();
  373. const uint8_t *data;
  374. size_t len;
  375. while (st->next(&data, &len)) {
  376. compressed_.insert(compressed_.end(), data, data + len);
  377. }
  378. os_.reset(new boost::iostreams::filtering_istream());
  379. os_->push(boost::iostreams::zlib_decompressor(get_zlib_params()));
  380. os_->push(boost::iostreams::basic_array_source<char>(
  381. compressed_.data(), compressed_.size()));
  382. std::unique_ptr<InputStream> in = nonSeekableIstreamInputStream(*os_);
  383. dataDecoder_->init(*in);
  384. dataStream_ = std::move(in);
  385. }
  386. }
  387. void DataFileReaderBase::close() {
  388. }
  /**
   * Converts a byte vector to a string holding the same bytes.
   */
  static std::string toString(const std::vector<uint8_t> &v) {
      return std::string(v.begin(), v.end());
  }
  395. static ValidSchema makeSchema(const vector<uint8_t> &v) {
  396. istringstream iss(toString(v));
  397. ValidSchema vs;
  398. compileJsonSchema(iss, vs);
  399. return vs;
  400. }
  401. void DataFileReaderBase::readHeader() {
  402. decoder_->init(*stream_);
  403. Magic m;
  404. avro::decode(*decoder_, m);
  405. if (magic != m) {
  406. throw Exception("Invalid data file. Magic does not match: "
  407. + filename_);
  408. }
  409. avro::decode(*decoder_, metadata_);
  410. Metadata::const_iterator it = metadata_.find(AVRO_SCHEMA_KEY);
  411. if (it == metadata_.end()) {
  412. throw Exception("No schema in metadata");
  413. }
  414. dataSchema_ = makeSchema(it->second);
  415. if (!readerSchema_.root()) {
  416. readerSchema_ = dataSchema();
  417. }
  418. it = metadata_.find(AVRO_CODEC_KEY);
  419. if (it != metadata_.end() && toString(it->second) == AVRO_DEFLATE_CODEC) {
  420. codec_ = DEFLATE_CODEC;
  421. #ifdef SNAPPY_CODEC_AVAILABLE
  422. } else if (it != metadata_.end()
  423. && toString(it->second) == AVRO_SNAPPY_CODEC) {
  424. codec_ = SNAPPY_CODEC;
  425. #endif
  426. } else {
  427. codec_ = NULL_CODEC;
  428. if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) {
  429. throw Exception("Unknown codec in data file: " + toString(it->second));
  430. }
  431. }
  432. avro::decode(*decoder_, sync_);
  433. decoder_->init(*stream_);
  434. blockStart_ = stream_->byteCount();
  435. }
  436. void DataFileReaderBase::doSeek(int64_t position) {
  437. if (auto *ss = dynamic_cast<SeekableInputStream *>(stream_.get())) {
  438. if (!eof_) {
  439. dataDecoder_->init(*dataStream_);
  440. drain(*dataStream_);
  441. }
  442. decoder_->init(*stream_);
  443. ss->seek(position);
  444. eof_ = false;
  445. } else {
  446. throw Exception("seek not supported on non-SeekableInputStream");
  447. }
  448. }
  449. void DataFileReaderBase::seek(int64_t position) {
  450. doSeek(position);
  451. readDataBlock();
  452. }
  453. void DataFileReaderBase::sync(int64_t position) {
  454. doSeek(position);
  455. DataFileSync sync_buffer;
  456. const uint8_t *p = nullptr;
  457. size_t n = 0;
  458. size_t i = 0;
  459. while (i < SyncSize) {
  460. if (n == 0 && !stream_->next(&p, &n)) {
  461. eof_ = true;
  462. return;
  463. }
  464. size_t len = std::min(SyncSize - i, n);
  465. memcpy(&sync_buffer[i], p, len);
  466. p += len;
  467. n -= len;
  468. i += len;
  469. }
  470. for (;;) {
  471. size_t j = 0;
  472. for (; j < SyncSize; ++j) {
  473. if (sync_[j] != sync_buffer[(i + j) % SyncSize]) {
  474. break;
  475. }
  476. }
  477. if (j == SyncSize) {
  478. // Found the sync marker!
  479. break;
  480. }
  481. if (n == 0 && !stream_->next(&p, &n)) {
  482. eof_ = true;
  483. return;
  484. }
  485. sync_buffer[i++ % SyncSize] = *p++;
  486. --n;
  487. }
  488. stream_->backup(n);
  489. readDataBlock();
  490. }
  491. bool DataFileReaderBase::pastSync(int64_t position) {
  492. return !hasMore() || blockStart_ >= position + SyncSize;
  493. }
  494. int64_t DataFileReaderBase::previousSync() const {
  495. return blockStart_;
  496. }
  497. } // namespace avro