table_builder.cc 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include "leveldb/table_builder.h"
  5. #include <cassert>
  6. #include "leveldb/comparator.h"
  7. #include "leveldb/env.h"
  8. #include "leveldb/filter_policy.h"
  9. #include "leveldb/options.h"
  10. #include "table/block_builder.h"
  11. #include "table/filter_block.h"
  12. #include "table/format.h"
  13. #include "util/coding.h"
  14. #include "util/crc32c.h"
  15. namespace leveldb {
  16. struct TableBuilder::Rep {
  17. Rep(const Options& opt, WritableFile* f)
  18. : options(opt),
  19. index_block_options(opt),
  20. file(f),
  21. offset(0),
  22. data_block(&options),
  23. index_block(&index_block_options),
  24. num_entries(0),
  25. closed(false),
  26. filter_block(opt.filter_policy == nullptr
  27. ? nullptr
  28. : new FilterBlockBuilder(opt.filter_policy)),
  29. pending_index_entry(false) {
  30. index_block_options.block_restart_interval = 1;
  31. }
  32. Options options;
  33. Options index_block_options;
  34. WritableFile* file;
  35. uint64_t offset;
  36. Status status;
  37. BlockBuilder data_block;
  38. BlockBuilder index_block;
  39. std::string last_key;
  40. int64_t num_entries;
  41. bool closed; // Either Finish() or Abandon() has been called.
  42. FilterBlockBuilder* filter_block;
  43. // We do not emit the index entry for a block until we have seen the
  44. // first key for the next data block. This allows us to use shorter
  45. // keys in the index block. For example, consider a block boundary
  46. // between the keys "the quick brown fox" and "the who". We can use
  47. // "the r" as the key for the index block entry since it is >= all
  48. // entries in the first block and < all entries in subsequent
  49. // blocks.
  50. //
  51. // Invariant: r->pending_index_entry is true only if data_block is empty.
  52. bool pending_index_entry;
  53. BlockHandle pending_handle; // Handle to add to index block
  54. std::string compressed_output;
  55. };
  56. TableBuilder::TableBuilder(const Options& options, WritableFile* file)
  57. : rep_(new Rep(options, file)) {
  58. if (rep_->filter_block != nullptr) {
  59. rep_->filter_block->StartBlock(0);
  60. }
  61. }
  62. TableBuilder::~TableBuilder() {
  63. assert(rep_->closed); // Catch errors where caller forgot to call Finish()
  64. delete rep_->filter_block;
  65. delete rep_;
  66. }
  67. Status TableBuilder::ChangeOptions(const Options& options) {
  68. // Note: if more fields are added to Options, update
  69. // this function to catch changes that should not be allowed to
  70. // change in the middle of building a Table.
  71. if (options.comparator != rep_->options.comparator) {
  72. return Status::InvalidArgument("changing comparator while building table");
  73. }
  74. // Note that any live BlockBuilders point to rep_->options and therefore
  75. // will automatically pick up the updated options.
  76. rep_->options = options;
  77. rep_->index_block_options = options;
  78. rep_->index_block_options.block_restart_interval = 1;
  79. return Status::OK();
  80. }
  81. void TableBuilder::Add(const Slice& key, const Slice& value) {
  82. Rep* r = rep_;
  83. assert(!r->closed);
  84. if (!ok()) return;
  85. if (r->num_entries > 0) {
  86. assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
  87. }
  88. if (r->pending_index_entry) {
  89. assert(r->data_block.empty());
  90. r->options.comparator->FindShortestSeparator(&r->last_key, key);
  91. std::string handle_encoding;
  92. r->pending_handle.EncodeTo(&handle_encoding);
  93. r->index_block.Add(r->last_key, Slice(handle_encoding));
  94. r->pending_index_entry = false;
  95. }
  96. if (r->filter_block != nullptr) {
  97. r->filter_block->AddKey(key);
  98. }
  99. r->last_key.assign(key.data(), key.size());
  100. r->num_entries++;
  101. r->data_block.Add(key, value);
  102. const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
  103. if (estimated_block_size >= r->options.block_size) {
  104. Flush();
  105. }
  106. }
  107. void TableBuilder::Flush() {
  108. Rep* r = rep_;
  109. assert(!r->closed);
  110. if (!ok()) return;
  111. if (r->data_block.empty()) return;
  112. assert(!r->pending_index_entry);
  113. WriteBlock(&r->data_block, &r->pending_handle);
  114. if (ok()) {
  115. r->pending_index_entry = true;
  116. r->status = r->file->Flush();
  117. }
  118. if (r->filter_block != nullptr) {
  119. r->filter_block->StartBlock(r->offset);
  120. }
  121. }
  122. void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
  123. // File format contains a sequence of blocks where each block has:
  124. // block_data: uint8[n]
  125. // type: uint8
  126. // crc: uint32
  127. assert(ok());
  128. Rep* r = rep_;
  129. Slice raw = block->Finish();
  130. Slice block_contents;
  131. CompressionType type = r->options.compression;
  132. // TODO(postrelease): Support more compression options: zlib?
  133. switch (type) {
  134. case kNoCompression:
  135. block_contents = raw;
  136. break;
  137. case kSnappyCompression: {
  138. std::string* compressed = &r->compressed_output;
  139. if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
  140. compressed->size() < raw.size() - (raw.size() / 8u)) {
  141. block_contents = *compressed;
  142. } else {
  143. // Snappy not supported, or compressed less than 12.5%, so just
  144. // store uncompressed form
  145. block_contents = raw;
  146. type = kNoCompression;
  147. }
  148. break;
  149. }
  150. case kZstdCompression: {
  151. std::string* compressed = &r->compressed_output;
  152. if (port::Zstd_Compress(r->options.zstd_compression_level, raw.data(),
  153. raw.size(), compressed) &&
  154. compressed->size() < raw.size() - (raw.size() / 8u)) {
  155. block_contents = *compressed;
  156. } else {
  157. // Zstd not supported, or compressed less than 12.5%, so just
  158. // store uncompressed form
  159. block_contents = raw;
  160. type = kNoCompression;
  161. }
  162. break;
  163. }
  164. }
  165. WriteRawBlock(block_contents, type, handle);
  166. r->compressed_output.clear();
  167. block->Reset();
  168. }
  169. void TableBuilder::WriteRawBlock(const Slice& block_contents,
  170. CompressionType type, BlockHandle* handle) {
  171. Rep* r = rep_;
  172. handle->set_offset(r->offset);
  173. handle->set_size(block_contents.size());
  174. r->status = r->file->Append(block_contents);
  175. if (r->status.ok()) {
  176. char trailer[kBlockTrailerSize];
  177. trailer[0] = type;
  178. uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
  179. crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
  180. EncodeFixed32(trailer + 1, crc32c::Mask(crc));
  181. r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
  182. if (r->status.ok()) {
  183. r->offset += block_contents.size() + kBlockTrailerSize;
  184. }
  185. }
  186. }
  187. Status TableBuilder::status() const { return rep_->status; }
  188. Status TableBuilder::Finish() {
  189. Rep* r = rep_;
  190. Flush();
  191. assert(!r->closed);
  192. r->closed = true;
  193. BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
  194. // Write filter block
  195. if (ok() && r->filter_block != nullptr) {
  196. WriteRawBlock(r->filter_block->Finish(), kNoCompression,
  197. &filter_block_handle);
  198. }
  199. // Write metaindex block
  200. if (ok()) {
  201. BlockBuilder meta_index_block(&r->options);
  202. if (r->filter_block != nullptr) {
  203. // Add mapping from "filter.Name" to location of filter data
  204. std::string key = "filter.";
  205. key.append(r->options.filter_policy->Name());
  206. std::string handle_encoding;
  207. filter_block_handle.EncodeTo(&handle_encoding);
  208. meta_index_block.Add(key, handle_encoding);
  209. }
  210. // TODO(postrelease): Add stats and other meta blocks
  211. WriteBlock(&meta_index_block, &metaindex_block_handle);
  212. }
  213. // Write index block
  214. if (ok()) {
  215. if (r->pending_index_entry) {
  216. r->options.comparator->FindShortSuccessor(&r->last_key);
  217. std::string handle_encoding;
  218. r->pending_handle.EncodeTo(&handle_encoding);
  219. r->index_block.Add(r->last_key, Slice(handle_encoding));
  220. r->pending_index_entry = false;
  221. }
  222. WriteBlock(&r->index_block, &index_block_handle);
  223. }
  224. // Write footer
  225. if (ok()) {
  226. Footer footer;
  227. footer.set_metaindex_handle(metaindex_block_handle);
  228. footer.set_index_handle(index_block_handle);
  229. std::string footer_encoding;
  230. footer.EncodeTo(&footer_encoding);
  231. r->status = r->file->Append(footer_encoding);
  232. if (r->status.ok()) {
  233. r->offset += footer_encoding.size();
  234. }
  235. }
  236. return r->status;
  237. }
  238. void TableBuilder::Abandon() {
  239. Rep* r = rep_;
  240. assert(!r->closed);
  241. r->closed = true;
  242. }
  243. uint64_t TableBuilder::NumEntries() const { return rep_->num_entries; }
  244. uint64_t TableBuilder::FileSize() const { return rep_->offset; }
  245. } // namespace leveldb