write_batch.cc 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. //
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring |
//    kTypeDeletion varstring
// varstring :=
//    len: varint32
//    data: uint8[len]
  15. #include "leveldb/write_batch.h"
  16. #include "leveldb/db.h"
  17. #include "db/dbformat.h"
  18. #include "db/memtable.h"
  19. #include "db/write_batch_internal.h"
  20. #include "util/coding.h"
  21. namespace leveldb {
  // Size in bytes of the fixed WriteBatch header: an 8-byte sequence number
  // followed by a 4-byte record count.
  static const size_t kHeader = 8 + 4;
  24. WriteBatch::WriteBatch() {
  25. Clear();
  26. }
  27. WriteBatch::~WriteBatch() { }
  28. WriteBatch::Handler::~Handler() { }
  29. void WriteBatch::Clear() {
  30. rep_.clear();
  31. rep_.resize(kHeader);
  32. }
  33. size_t WriteBatch::ApproximateSize() const {
  34. return rep_.size();
  35. }
  36. Status WriteBatch::Iterate(Handler* handler) const {
  37. Slice input(rep_);
  38. if (input.size() < kHeader) {
  39. return Status::Corruption("malformed WriteBatch (too small)");
  40. }
  41. input.remove_prefix(kHeader);
  42. Slice key, value;
  43. int found = 0;
  44. while (!input.empty()) {
  45. found++;
  46. char tag = input[0];
  47. input.remove_prefix(1);
  48. switch (tag) {
  49. case kTypeValue:
  50. if (GetLengthPrefixedSlice(&input, &key) &&
  51. GetLengthPrefixedSlice(&input, &value)) {
  52. handler->Put(key, value);
  53. } else {
  54. return Status::Corruption("bad WriteBatch Put");
  55. }
  56. break;
  57. case kTypeDeletion:
  58. if (GetLengthPrefixedSlice(&input, &key)) {
  59. handler->Delete(key);
  60. } else {
  61. return Status::Corruption("bad WriteBatch Delete");
  62. }
  63. break;
  64. default:
  65. return Status::Corruption("unknown WriteBatch tag");
  66. }
  67. }
  68. if (found != WriteBatchInternal::Count(this)) {
  69. return Status::Corruption("WriteBatch has wrong count");
  70. } else {
  71. return Status::OK();
  72. }
  73. }
  74. int WriteBatchInternal::Count(const WriteBatch* b) {
  75. return DecodeFixed32(b->rep_.data() + 8);
  76. }
  77. void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  78. EncodeFixed32(&b->rep_[8], n);
  79. }
  80. SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  81. return SequenceNumber(DecodeFixed64(b->rep_.data()));
  82. }
  83. void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  84. EncodeFixed64(&b->rep_[0], seq);
  85. }
  86. void WriteBatch::Put(const Slice& key, const Slice& value) {
  87. WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  88. rep_.push_back(static_cast<char>(kTypeValue));
  89. PutLengthPrefixedSlice(&rep_, key);
  90. PutLengthPrefixedSlice(&rep_, value);
  91. }
  92. void WriteBatch::Delete(const Slice& key) {
  93. WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  94. rep_.push_back(static_cast<char>(kTypeDeletion));
  95. PutLengthPrefixedSlice(&rep_, key);
  96. }
  97. void WriteBatch::Append(const WriteBatch &source) {
  98. WriteBatchInternal::Append(this, &source);
  99. }
  100. namespace {
  101. class MemTableInserter : public WriteBatch::Handler {
  102. public:
  103. SequenceNumber sequence_;
  104. MemTable* mem_;
  105. virtual void Put(const Slice& key, const Slice& value) {
  106. mem_->Add(sequence_, kTypeValue, key, value);
  107. sequence_++;
  108. }
  109. virtual void Delete(const Slice& key) {
  110. mem_->Add(sequence_, kTypeDeletion, key, Slice());
  111. sequence_++;
  112. }
  113. };
  114. } // namespace
  115. Status WriteBatchInternal::InsertInto(const WriteBatch* b,
  116. MemTable* memtable) {
  117. MemTableInserter inserter;
  118. inserter.sequence_ = WriteBatchInternal::Sequence(b);
  119. inserter.mem_ = memtable;
  120. return b->Iterate(&inserter);
  121. }
  122. void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  123. assert(contents.size() >= kHeader);
  124. b->rep_.assign(contents.data(), contents.size());
  125. }
  126. void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
  127. SetCount(dst, Count(dst) + Count(src));
  128. assert(src->rep_.size() >= kHeader);
  129. dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
  130. }
  131. } // namespace leveldb