123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558 |
- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- #include "gtest/gtest.h"
- #include "db/log_reader.h"
- #include "db/log_writer.h"
- #include "leveldb/env.h"
- #include "util/coding.h"
- #include "util/crc32c.h"
- #include "util/random.h"
- namespace leveldb {
- namespace log {
// Returns a string of exactly `n` bytes made by repeating `partial_string`,
// truncating the final repetition as needed.
//
// An empty `partial_string` offers nothing to repeat; in that case the result
// is `n` NUL bytes (the fill used by std::string::resize) instead of looping
// forever.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  if (!partial_string.empty()) {
    // The loop overshoots `n` by less than one repetition before truncation.
    result.reserve(n + partial_string.size());
    while (result.size() < n) {
      result.append(partial_string);
    }
  }
  result.resize(n);
  return result;
}
// Renders `n` in decimal followed by a trailing '.', e.g. 42 -> "42.".
static std::string NumberString(int n) {
  std::string text = std::to_string(n);
  text.push_back('.');
  return text;
}
- // Return a skewed potentially long string
- static std::string RandomSkewedString(int i, Random* rnd) {
- return BigString(NumberString(i), rnd->Skewed(17));
- }
- class LogTest : public testing::Test {
- public:
- LogTest()
- : reading_(false),
- writer_(new Writer(&dest_)),
- reader_(new Reader(&source_, &report_, true /*checksum*/,
- 0 /*initial_offset*/)) {}
- ~LogTest() {
- delete writer_;
- delete reader_;
- }
- void ReopenForAppend() {
- delete writer_;
- writer_ = new Writer(&dest_, dest_.contents_.size());
- }
- void Write(const std::string& msg) {
- ASSERT_TRUE(!reading_) << "Write() after starting to read";
- writer_->AddRecord(Slice(msg));
- }
- size_t WrittenBytes() const { return dest_.contents_.size(); }
- std::string Read() {
- if (!reading_) {
- reading_ = true;
- source_.contents_ = Slice(dest_.contents_);
- }
- std::string scratch;
- Slice record;
- if (reader_->ReadRecord(&record, &scratch)) {
- return record.ToString();
- } else {
- return "EOF";
- }
- }
- void IncrementByte(int offset, int delta) {
- dest_.contents_[offset] += delta;
- }
- void SetByte(int offset, char new_byte) {
- dest_.contents_[offset] = new_byte;
- }
- void ShrinkSize(int bytes) {
- dest_.contents_.resize(dest_.contents_.size() - bytes);
- }
- void FixChecksum(int header_offset, int len) {
- // Compute crc of type/len/data
- uint32_t crc = crc32c::Value(&dest_.contents_[header_offset + 6], 1 + len);
- crc = crc32c::Mask(crc);
- EncodeFixed32(&dest_.contents_[header_offset], crc);
- }
- void ForceError() { source_.force_error_ = true; }
- size_t DroppedBytes() const { return report_.dropped_bytes_; }
- std::string ReportMessage() const { return report_.message_; }
- // Returns OK iff recorded error message contains "msg"
- std::string MatchError(const std::string& msg) const {
- if (report_.message_.find(msg) == std::string::npos) {
- return report_.message_;
- } else {
- return "OK";
- }
- }
- void WriteInitialOffsetLog() {
- for (int i = 0; i < num_initial_offset_records_; i++) {
- std::string record(initial_offset_record_sizes_[i],
- static_cast<char>('a' + i));
- Write(record);
- }
- }
- void StartReadingAt(uint64_t initial_offset) {
- delete reader_;
- reader_ = new Reader(&source_, &report_, true /*checksum*/, initial_offset);
- }
- void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
- WriteInitialOffsetLog();
- reading_ = true;
- source_.contents_ = Slice(dest_.contents_);
- Reader* offset_reader = new Reader(&source_, &report_, true /*checksum*/,
- WrittenBytes() + offset_past_end);
- Slice record;
- std::string scratch;
- ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
- delete offset_reader;
- }
- void CheckInitialOffsetRecord(uint64_t initial_offset,
- int expected_record_offset) {
- WriteInitialOffsetLog();
- reading_ = true;
- source_.contents_ = Slice(dest_.contents_);
- Reader* offset_reader =
- new Reader(&source_, &report_, true /*checksum*/, initial_offset);
- // Read all records from expected_record_offset through the last one.
- ASSERT_LT(expected_record_offset, num_initial_offset_records_);
- for (; expected_record_offset < num_initial_offset_records_;
- ++expected_record_offset) {
- Slice record;
- std::string scratch;
- ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
- ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
- record.size());
- ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
- offset_reader->LastRecordOffset());
- ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
- }
- delete offset_reader;
- }
- private:
- class StringDest : public WritableFile {
- public:
- Status Close() override { return Status::OK(); }
- Status Flush() override { return Status::OK(); }
- Status Sync() override { return Status::OK(); }
- Status Append(const Slice& slice) override {
- contents_.append(slice.data(), slice.size());
- return Status::OK();
- }
- std::string contents_;
- };
- class StringSource : public SequentialFile {
- public:
- StringSource() : force_error_(false), returned_partial_(false) {}
- Status Read(size_t n, Slice* result, char* scratch) override {
- EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
- if (force_error_) {
- force_error_ = false;
- returned_partial_ = true;
- return Status::Corruption("read error");
- }
- if (contents_.size() < n) {
- n = contents_.size();
- returned_partial_ = true;
- }
- *result = Slice(contents_.data(), n);
- contents_.remove_prefix(n);
- return Status::OK();
- }
- Status Skip(uint64_t n) override {
- if (n > contents_.size()) {
- contents_.clear();
- return Status::NotFound("in-memory file skipped past end");
- }
- contents_.remove_prefix(n);
- return Status::OK();
- }
- Slice contents_;
- bool force_error_;
- bool returned_partial_;
- };
- class ReportCollector : public Reader::Reporter {
- public:
- ReportCollector() : dropped_bytes_(0) {}
- void Corruption(size_t bytes, const Status& status) override {
- dropped_bytes_ += bytes;
- message_.append(status.ToString());
- }
- size_t dropped_bytes_;
- std::string message_;
- };
- // Record metadata for testing initial offset functionality
- static size_t initial_offset_record_sizes_[];
- static uint64_t initial_offset_last_record_offsets_[];
- static int num_initial_offset_records_;
- StringDest dest_;
- StringSource source_;
- ReportCollector report_;
- bool reading_;
- Writer* writer_;
- Reader* reader_;
- };
- size_t LogTest::initial_offset_record_sizes_[] = {
- 10000, // Two sizable records in first block
- 10000,
- 2 * log::kBlockSize - 1000, // Span three blocks
- 1,
- 13716, // Consume all but two bytes of block 3.
- log::kBlockSize - kHeaderSize, // Consume the entirety of block 4.
- };
- uint64_t LogTest::initial_offset_last_record_offsets_[] = {
- 0,
- kHeaderSize + 10000,
- 2 * (kHeaderSize + 10000),
- 2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
- 2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize +
- kHeaderSize + 1,
- 3 * log::kBlockSize,
- };
- // LogTest::initial_offset_last_record_offsets_ must be defined before this.
- int LogTest::num_initial_offset_records_ =
- sizeof(LogTest::initial_offset_last_record_offsets_) / sizeof(uint64_t);
- TEST_F(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
- TEST_F(LogTest, ReadWrite) {
- Write("foo");
- Write("bar");
- Write("");
- Write("xxxx");
- ASSERT_EQ("foo", Read());
- ASSERT_EQ("bar", Read());
- ASSERT_EQ("", Read());
- ASSERT_EQ("xxxx", Read());
- ASSERT_EQ("EOF", Read());
- ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
- }
- TEST_F(LogTest, ManyBlocks) {
- for (int i = 0; i < 100000; i++) {
- Write(NumberString(i));
- }
- for (int i = 0; i < 100000; i++) {
- ASSERT_EQ(NumberString(i), Read());
- }
- ASSERT_EQ("EOF", Read());
- }
- TEST_F(LogTest, Fragmentation) {
- Write("small");
- Write(BigString("medium", 50000));
- Write(BigString("large", 100000));
- ASSERT_EQ("small", Read());
- ASSERT_EQ(BigString("medium", 50000), Read());
- ASSERT_EQ(BigString("large", 100000), Read());
- ASSERT_EQ("EOF", Read());
- }
- TEST_F(LogTest, MarginalTrailer) {
- // Make a trailer that is exactly the same length as an empty record.
- const int n = kBlockSize - 2 * kHeaderSize;
- Write(BigString("foo", n));
- ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
- Write("");
- Write("bar");
- ASSERT_EQ(BigString("foo", n), Read());
- ASSERT_EQ("", Read());
- ASSERT_EQ("bar", Read());
- ASSERT_EQ("EOF", Read());
- }
- TEST_F(LogTest, MarginalTrailer2) {
- // Make a trailer that is exactly the same length as an empty record.
- const int n = kBlockSize - 2 * kHeaderSize;
- Write(BigString("foo", n));
- ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
- Write("bar");
- ASSERT_EQ(BigString("foo", n), Read());
- ASSERT_EQ("bar", Read());
- ASSERT_EQ("EOF", Read());
- ASSERT_EQ(0, DroppedBytes());
- ASSERT_EQ("", ReportMessage());
- }
- TEST_F(LogTest, ShortTrailer) {
- const int n = kBlockSize - 2 * kHeaderSize + 4;
- Write(BigString("foo", n));
- ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
- Write("");
- Write("bar");
- ASSERT_EQ(BigString("foo", n), Read());
- ASSERT_EQ("", Read());
- ASSERT_EQ("bar", Read());
- ASSERT_EQ("EOF", Read());
- }
- TEST_F(LogTest, AlignedEof) {
- const int n = kBlockSize - 2 * kHeaderSize + 4;
- Write(BigString("foo", n));
- ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
- ASSERT_EQ(BigString("foo", n), Read());
- ASSERT_EQ("EOF", Read());
- }
- TEST_F(LogTest, OpenForAppend) {
- Write("hello");
- ReopenForAppend();
- Write("world");
- ASSERT_EQ("hello", Read());
- ASSERT_EQ("world", Read());
- ASSERT_EQ("EOF", Read());
- }
- TEST_F(LogTest, RandomRead) {
- const int N = 500;
- Random write_rnd(301);
- for (int i = 0; i < N; i++) {
- Write(RandomSkewedString(i, &write_rnd));
- }
- Random read_rnd(301);
- for (int i = 0; i < N; i++) {
- ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
- }
- ASSERT_EQ("EOF", Read());
- }
- // Tests of all the error paths in log_reader.cc follow:
- TEST_F(LogTest, ReadError) {
- Write("foo");
- ForceError();
- ASSERT_EQ("EOF", Read());
- ASSERT_EQ(kBlockSize, DroppedBytes());
- ASSERT_EQ("OK", MatchError("read error"));
- }
- TEST_F(LogTest, BadRecordType) {
- Write("foo");
- // Type is stored in header[6]
- IncrementByte(6, 100);
- FixChecksum(0, 3);
- ASSERT_EQ("EOF", Read());
- ASSERT_EQ(3, DroppedBytes());
- ASSERT_EQ("OK", MatchError("unknown record type"));
- }
- TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) {
- Write("foo");
- ShrinkSize(4); // Drop all payload as well as a header byte
- ASSERT_EQ("EOF", Read());
- // Truncated last record is ignored, not treated as an error.
- ASSERT_EQ(0, DroppedBytes());
- ASSERT_EQ("", ReportMessage());
- }
- TEST_F(LogTest, BadLength) {
- const int kPayloadSize = kBlockSize - kHeaderSize;
- Write(BigString("bar", kPayloadSize));
- Write("foo");
- // Least significant size byte is stored in header[4].
- IncrementByte(4, 1);
- ASSERT_EQ("foo", Read());
- ASSERT_EQ(kBlockSize, DroppedBytes());
- ASSERT_EQ("OK", MatchError("bad record length"));
- }
- TEST_F(LogTest, BadLengthAtEndIsIgnored) {
- Write("foo");
- ShrinkSize(1);
- ASSERT_EQ("EOF", Read());
- ASSERT_EQ(0, DroppedBytes());
- ASSERT_EQ("", ReportMessage());
- }
- TEST_F(LogTest, ChecksumMismatch) {
- Write("foo");
- IncrementByte(0, 10);
- ASSERT_EQ("EOF", Read());
- ASSERT_EQ(10, DroppedBytes());
- ASSERT_EQ("OK", MatchError("checksum mismatch"));
- }
- TEST_F(LogTest, UnexpectedMiddleType) {
- Write("foo");
- SetByte(6, kMiddleType);
- FixChecksum(0, 3);
- ASSERT_EQ("EOF", Read());
- ASSERT_EQ(3, DroppedBytes());
- ASSERT_EQ("OK", MatchError("missing start"));
- }
- TEST_F(LogTest, UnexpectedLastType) {
- Write("foo");
- SetByte(6, kLastType);
- FixChecksum(0, 3);
- ASSERT_EQ("EOF", Read());
- ASSERT_EQ(3, DroppedBytes());
- ASSERT_EQ("OK", MatchError("missing start"));
- }
- TEST_F(LogTest, UnexpectedFullType) {
- Write("foo");
- Write("bar");
- SetByte(6, kFirstType);
- FixChecksum(0, 3);
- ASSERT_EQ("bar", Read());
- ASSERT_EQ("EOF", Read());
- ASSERT_EQ(3, DroppedBytes());
- ASSERT_EQ("OK", MatchError("partial record without end"));
- }
- TEST_F(LogTest, UnexpectedFirstType) {
- Write("foo");
- Write(BigString("bar", 100000));
- SetByte(6, kFirstType);
- FixChecksum(0, 3);
- ASSERT_EQ(BigString("bar", 100000), Read());
- ASSERT_EQ("EOF", Read());
- ASSERT_EQ(3, DroppedBytes());
- ASSERT_EQ("OK", MatchError("partial record without end"));
- }
- TEST_F(LogTest, MissingLastIsIgnored) {
- Write(BigString("bar", kBlockSize));
- // Remove the LAST block, including header.
- ShrinkSize(14);
- ASSERT_EQ("EOF", Read());
- ASSERT_EQ("", ReportMessage());
- ASSERT_EQ(0, DroppedBytes());
- }
- TEST_F(LogTest, PartialLastIsIgnored) {
- Write(BigString("bar", kBlockSize));
- // Cause a bad record length in the LAST block.
- ShrinkSize(1);
- ASSERT_EQ("EOF", Read());
- ASSERT_EQ("", ReportMessage());
- ASSERT_EQ(0, DroppedBytes());
- }
- TEST_F(LogTest, SkipIntoMultiRecord) {
- // Consider a fragmented record:
- // first(R1), middle(R1), last(R1), first(R2)
- // If initial_offset points to a record after first(R1) but before first(R2)
- // incomplete fragment errors are not actual errors, and must be suppressed
- // until a new first or full record is encountered.
- Write(BigString("foo", 3 * kBlockSize));
- Write("correct");
- StartReadingAt(kBlockSize);
- ASSERT_EQ("correct", Read());
- ASSERT_EQ("", ReportMessage());
- ASSERT_EQ(0, DroppedBytes());
- ASSERT_EQ("EOF", Read());
- }
- TEST_F(LogTest, ErrorJoinsRecords) {
- // Consider two fragmented records:
- // first(R1) last(R1) first(R2) last(R2)
- // where the middle two fragments disappear. We do not want
- // first(R1),last(R2) to get joined and returned as a valid record.
- // Write records that span two blocks
- Write(BigString("foo", kBlockSize));
- Write(BigString("bar", kBlockSize));
- Write("correct");
- // Wipe the middle block
- for (int offset = kBlockSize; offset < 2 * kBlockSize; offset++) {
- SetByte(offset, 'x');
- }
- ASSERT_EQ("correct", Read());
- ASSERT_EQ("EOF", Read());
- const size_t dropped = DroppedBytes();
- ASSERT_LE(dropped, 2 * kBlockSize + 100);
- ASSERT_GE(dropped, 2 * kBlockSize);
- }
- TEST_F(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }
- TEST_F(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); }
- TEST_F(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); }
- TEST_F(LogTest, ReadSecondStart) { CheckInitialOffsetRecord(10007, 1); }
- TEST_F(LogTest, ReadThirdOneOff) { CheckInitialOffsetRecord(10008, 2); }
- TEST_F(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); }
- TEST_F(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); }
- TEST_F(LogTest, ReadFourthFirstBlockTrailer) {
- CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
- }
- TEST_F(LogTest, ReadFourthMiddleBlock) {
- CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
- }
- TEST_F(LogTest, ReadFourthLastBlock) {
- CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
- }
- TEST_F(LogTest, ReadFourthStart) {
- CheckInitialOffsetRecord(
- 2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
- 3);
- }
- TEST_F(LogTest, ReadInitialOffsetIntoBlockPadding) {
- CheckInitialOffsetRecord(3 * log::kBlockSize - 3, 5);
- }
- TEST_F(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }
- TEST_F(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); }
- } // namespace log
- } // namespace leveldb
|