write_batch.cc 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. //
  5. // WriteBatch::rep_ :=
  6. // sequence: fixed64
  7. // count: fixed32
  8. // data: record[count]
  9. // record :=
  10. // kTypeValue varstring varstring |
  11. // kTypeDeletion varstring
  12. // varstring :=
  13. // len: varint32
  14. // data: uint8[len]
  15. #include "leveldb/write_batch.h"
  16. #include "db/dbformat.h"
  17. #include "db/memtable.h"
  18. #include "db/write_batch_internal.h"
  19. #include "leveldb/db.h"
  20. #include "util/coding.h"
  21. namespace leveldb {
  // Every WriteBatch representation begins with a fixed-size header: an 8-byte
  // sequence number immediately followed by a 4-byte record count.
  static constexpr size_t kHeader = 8 + 4;
  24. WriteBatch::WriteBatch() { Clear(); }
  25. WriteBatch::~WriteBatch() = default;
  26. WriteBatch::Handler::~Handler() = default;
  27. void WriteBatch::Clear() {
  28. rep_.clear();
  29. rep_.resize(kHeader);
  30. }
  31. size_t WriteBatch::ApproximateSize() const { return rep_.size(); }
  32. Status WriteBatch::Iterate(Handler* handler) const {
  33. Slice input(rep_);
  34. if (input.size() < kHeader) {
  35. return Status::Corruption("malformed WriteBatch (too small)");
  36. }
  37. input.remove_prefix(kHeader);
  38. Slice key, value;
  39. int found = 0;
  40. while (!input.empty()) {
  41. found++;
  42. char tag = input[0];
  43. input.remove_prefix(1);
  44. switch (tag) {
  45. case kTypeValue:
  46. if (GetLengthPrefixedSlice(&input, &key) &&
  47. GetLengthPrefixedSlice(&input, &value)) {
  48. handler->Put(key, value);
  49. } else {
  50. return Status::Corruption("bad WriteBatch Put");
  51. }
  52. break;
  53. case kTypeDeletion:
  54. if (GetLengthPrefixedSlice(&input, &key)) {
  55. handler->Delete(key);
  56. } else {
  57. return Status::Corruption("bad WriteBatch Delete");
  58. }
  59. break;
  60. default:
  61. return Status::Corruption("unknown WriteBatch tag");
  62. }
  63. }
  64. if (found != WriteBatchInternal::Count(this)) {
  65. return Status::Corruption("WriteBatch has wrong count");
  66. } else {
  67. return Status::OK();
  68. }
  69. }
  70. int WriteBatchInternal::Count(const WriteBatch* b) {
  71. return DecodeFixed32(b->rep_.data() + 8);
  72. }
  73. void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  74. EncodeFixed32(&b->rep_[8], n);
  75. }
  76. SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  77. return SequenceNumber(DecodeFixed64(b->rep_.data()));
  78. }
  79. void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  80. EncodeFixed64(&b->rep_[0], seq);
  81. }
  82. void WriteBatch::Put(const Slice& key, const Slice& value) {
  83. WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  84. rep_.push_back(static_cast<char>(kTypeValue));
  85. PutLengthPrefixedSlice(&rep_, key);
  86. PutLengthPrefixedSlice(&rep_, value);
  87. }
  88. void WriteBatch::Delete(const Slice& key) {
  89. WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  90. rep_.push_back(static_cast<char>(kTypeDeletion));
  91. PutLengthPrefixedSlice(&rep_, key);
  92. }
  93. void WriteBatch::Append(const WriteBatch& source) {
  94. WriteBatchInternal::Append(this, &source);
  95. }
  96. namespace {
  97. class MemTableInserter : public WriteBatch::Handler {
  98. public:
  99. SequenceNumber sequence_;
  100. MemTable* mem_;
  101. void Put(const Slice& key, const Slice& value) override {
  102. mem_->Add(sequence_, kTypeValue, key, value);
  103. sequence_++;
  104. }
  105. void Delete(const Slice& key) override {
  106. mem_->Add(sequence_, kTypeDeletion, key, Slice());
  107. sequence_++;
  108. }
  109. };
  110. } // namespace
  111. Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
  112. MemTableInserter inserter;
  113. inserter.sequence_ = WriteBatchInternal::Sequence(b);
  114. inserter.mem_ = memtable;
  115. return b->Iterate(&inserter);
  116. }
  117. void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  118. assert(contents.size() >= kHeader);
  119. b->rep_.assign(contents.data(), contents.size());
  120. }
  121. void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
  122. SetCount(dst, Count(dst) + Count(src));
  123. assert(src->rep_.size() >= kHeader);
  124. dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
  125. }
  126. } // namespace leveldb