log_test.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591
  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include "db/log_reader.h"
  5. #include "db/log_writer.h"
  6. #include "leveldb/env.h"
  7. #include "util/coding.h"
  8. #include "util/crc32c.h"
  9. #include "util/random.h"
  10. #include "util/testharness.h"
  11. namespace leveldb {
  12. namespace log {
  // Build a string of exactly n bytes by repeating partial_string and
  // truncating the final repetition.
  //
  // An empty partial_string can never grow the result, so instead of spinning
  // forever the function returns n NUL bytes in that case.
  static std::string BigString(const std::string& partial_string, size_t n) {
    if (partial_string.empty()) {
      return std::string(n, '\0');
    }
    std::string result;
    // The loop overshoots n by at most one copy of partial_string; reserving
    // that much up front avoids repeated reallocation.
    result.reserve(n + partial_string.size());
    while (result.size() < n) {
      result.append(partial_string);
    }
    result.resize(n);
    return result;
  }
  // Render n in decimal with a trailing '.', e.g. 7 -> "7.".
  static std::string NumberString(int n) {
    char text[50];
    snprintf(text, sizeof(text), "%d.", n);
    std::string formatted(text);
    return formatted;
  }
  29. // Return a skewed potentially long string
  30. static std::string RandomSkewedString(int i, Random* rnd) {
  31. return BigString(NumberString(i), rnd->Skewed(17));
  32. }
  33. class LogTest {
  34. private:
  35. class StringDest : public WritableFile {
  36. public:
  37. std::string contents_;
  38. virtual Status Close() { return Status::OK(); }
  39. virtual Status Flush() { return Status::OK(); }
  40. virtual Status Sync() { return Status::OK(); }
  41. virtual Status Append(const Slice& slice) {
  42. contents_.append(slice.data(), slice.size());
  43. return Status::OK();
  44. }
  45. };
  46. class StringSource : public SequentialFile {
  47. public:
  48. Slice contents_;
  49. bool force_error_;
  50. bool returned_partial_;
  51. StringSource() : force_error_(false), returned_partial_(false) { }
  52. virtual Status Read(size_t n, Slice* result, char* scratch) {
  53. ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
  54. if (force_error_) {
  55. force_error_ = false;
  56. returned_partial_ = true;
  57. return Status::Corruption("read error");
  58. }
  59. if (contents_.size() < n) {
  60. n = contents_.size();
  61. returned_partial_ = true;
  62. }
  63. *result = Slice(contents_.data(), n);
  64. contents_.remove_prefix(n);
  65. return Status::OK();
  66. }
  67. virtual Status Skip(uint64_t n) {
  68. if (n > contents_.size()) {
  69. contents_.clear();
  70. return Status::NotFound("in-memory file skipped past end");
  71. }
  72. contents_.remove_prefix(n);
  73. return Status::OK();
  74. }
  75. };
  76. class ReportCollector : public Reader::Reporter {
  77. public:
  78. size_t dropped_bytes_;
  79. std::string message_;
  80. ReportCollector() : dropped_bytes_(0) { }
  81. virtual void Corruption(size_t bytes, const Status& status) {
  82. dropped_bytes_ += bytes;
  83. message_.append(status.ToString());
  84. }
  85. };
  86. StringDest dest_;
  87. StringSource source_;
  88. ReportCollector report_;
  89. bool reading_;
  90. Writer* writer_;
  91. Reader* reader_;
  92. // Record metadata for testing initial offset functionality
  93. static size_t initial_offset_record_sizes_[];
  94. static uint64_t initial_offset_last_record_offsets_[];
  95. static int num_initial_offset_records_;
  96. public:
  97. LogTest() : reading_(false),
  98. writer_(new Writer(&dest_)),
  99. reader_(new Reader(&source_, &report_, true/*checksum*/,
  100. 0/*initial_offset*/)) {
  101. }
  102. ~LogTest() {
  103. delete writer_;
  104. delete reader_;
  105. }
  106. void ReopenForAppend() {
  107. delete writer_;
  108. writer_ = new Writer(&dest_, dest_.contents_.size());
  109. }
  110. void Write(const std::string& msg) {
  111. ASSERT_TRUE(!reading_) << "Write() after starting to read";
  112. writer_->AddRecord(Slice(msg));
  113. }
  114. size_t WrittenBytes() const {
  115. return dest_.contents_.size();
  116. }
  117. std::string Read() {
  118. if (!reading_) {
  119. reading_ = true;
  120. source_.contents_ = Slice(dest_.contents_);
  121. }
  122. std::string scratch;
  123. Slice record;
  124. if (reader_->ReadRecord(&record, &scratch)) {
  125. return record.ToString();
  126. } else {
  127. return "EOF";
  128. }
  129. }
  130. void IncrementByte(int offset, int delta) {
  131. dest_.contents_[offset] += delta;
  132. }
  133. void SetByte(int offset, char new_byte) {
  134. dest_.contents_[offset] = new_byte;
  135. }
  136. void ShrinkSize(int bytes) {
  137. dest_.contents_.resize(dest_.contents_.size() - bytes);
  138. }
  139. void FixChecksum(int header_offset, int len) {
  140. // Compute crc of type/len/data
  141. uint32_t crc = crc32c::Value(&dest_.contents_[header_offset+6], 1 + len);
  142. crc = crc32c::Mask(crc);
  143. EncodeFixed32(&dest_.contents_[header_offset], crc);
  144. }
  145. void ForceError() {
  146. source_.force_error_ = true;
  147. }
  148. size_t DroppedBytes() const {
  149. return report_.dropped_bytes_;
  150. }
  151. std::string ReportMessage() const {
  152. return report_.message_;
  153. }
  154. // Returns OK iff recorded error message contains "msg"
  155. std::string MatchError(const std::string& msg) const {
  156. if (report_.message_.find(msg) == std::string::npos) {
  157. return report_.message_;
  158. } else {
  159. return "OK";
  160. }
  161. }
  162. void WriteInitialOffsetLog() {
  163. for (int i = 0; i < num_initial_offset_records_; i++) {
  164. std::string record(initial_offset_record_sizes_[i],
  165. static_cast<char>('a' + i));
  166. Write(record);
  167. }
  168. }
  169. void StartReadingAt(uint64_t initial_offset) {
  170. delete reader_;
  171. reader_ = new Reader(&source_, &report_, true/*checksum*/, initial_offset);
  172. }
  173. void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
  174. WriteInitialOffsetLog();
  175. reading_ = true;
  176. source_.contents_ = Slice(dest_.contents_);
  177. Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
  178. WrittenBytes() + offset_past_end);
  179. Slice record;
  180. std::string scratch;
  181. ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
  182. delete offset_reader;
  183. }
  184. void CheckInitialOffsetRecord(uint64_t initial_offset,
  185. int expected_record_offset) {
  186. WriteInitialOffsetLog();
  187. reading_ = true;
  188. source_.contents_ = Slice(dest_.contents_);
  189. Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
  190. initial_offset);
  191. // Read all records from expected_record_offset through the last one.
  192. ASSERT_LT(expected_record_offset, num_initial_offset_records_);
  193. for (; expected_record_offset < num_initial_offset_records_;
  194. ++expected_record_offset) {
  195. Slice record;
  196. std::string scratch;
  197. ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
  198. ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
  199. record.size());
  200. ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
  201. offset_reader->LastRecordOffset());
  202. ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
  203. }
  204. delete offset_reader;
  205. }
  206. };
  207. size_t LogTest::initial_offset_record_sizes_[] =
  208. {10000, // Two sizable records in first block
  209. 10000,
  210. 2 * log::kBlockSize - 1000, // Span three blocks
  211. 1,
  212. 13716, // Consume all but two bytes of block 3.
  213. log::kBlockSize - kHeaderSize, // Consume the entirety of block 4.
  214. };
  215. uint64_t LogTest::initial_offset_last_record_offsets_[] =
  216. {0,
  217. kHeaderSize + 10000,
  218. 2 * (kHeaderSize + 10000),
  219. 2 * (kHeaderSize + 10000) +
  220. (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
  221. 2 * (kHeaderSize + 10000) +
  222. (2 * log::kBlockSize - 1000) + 3 * kHeaderSize
  223. + kHeaderSize + 1,
  224. 3 * log::kBlockSize,
  225. };
  226. // LogTest::initial_offset_last_record_offsets_ must be defined before this.
  227. int LogTest::num_initial_offset_records_ =
  228. sizeof(LogTest::initial_offset_last_record_offsets_)/sizeof(uint64_t);
  229. TEST(LogTest, Empty) {
  230. ASSERT_EQ("EOF", Read());
  231. }
  232. TEST(LogTest, ReadWrite) {
  233. Write("foo");
  234. Write("bar");
  235. Write("");
  236. Write("xxxx");
  237. ASSERT_EQ("foo", Read());
  238. ASSERT_EQ("bar", Read());
  239. ASSERT_EQ("", Read());
  240. ASSERT_EQ("xxxx", Read());
  241. ASSERT_EQ("EOF", Read());
  242. ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
  243. }
  244. TEST(LogTest, ManyBlocks) {
  245. for (int i = 0; i < 100000; i++) {
  246. Write(NumberString(i));
  247. }
  248. for (int i = 0; i < 100000; i++) {
  249. ASSERT_EQ(NumberString(i), Read());
  250. }
  251. ASSERT_EQ("EOF", Read());
  252. }
  253. TEST(LogTest, Fragmentation) {
  254. Write("small");
  255. Write(BigString("medium", 50000));
  256. Write(BigString("large", 100000));
  257. ASSERT_EQ("small", Read());
  258. ASSERT_EQ(BigString("medium", 50000), Read());
  259. ASSERT_EQ(BigString("large", 100000), Read());
  260. ASSERT_EQ("EOF", Read());
  261. }
  262. TEST(LogTest, MarginalTrailer) {
  263. // Make a trailer that is exactly the same length as an empty record.
  264. const int n = kBlockSize - 2*kHeaderSize;
  265. Write(BigString("foo", n));
  266. ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
  267. Write("");
  268. Write("bar");
  269. ASSERT_EQ(BigString("foo", n), Read());
  270. ASSERT_EQ("", Read());
  271. ASSERT_EQ("bar", Read());
  272. ASSERT_EQ("EOF", Read());
  273. }
  274. TEST(LogTest, MarginalTrailer2) {
  275. // Make a trailer that is exactly the same length as an empty record.
  276. const int n = kBlockSize - 2*kHeaderSize;
  277. Write(BigString("foo", n));
  278. ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
  279. Write("bar");
  280. ASSERT_EQ(BigString("foo", n), Read());
  281. ASSERT_EQ("bar", Read());
  282. ASSERT_EQ("EOF", Read());
  283. ASSERT_EQ(0, DroppedBytes());
  284. ASSERT_EQ("", ReportMessage());
  285. }
  286. TEST(LogTest, ShortTrailer) {
  287. const int n = kBlockSize - 2*kHeaderSize + 4;
  288. Write(BigString("foo", n));
  289. ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
  290. Write("");
  291. Write("bar");
  292. ASSERT_EQ(BigString("foo", n), Read());
  293. ASSERT_EQ("", Read());
  294. ASSERT_EQ("bar", Read());
  295. ASSERT_EQ("EOF", Read());
  296. }
  297. TEST(LogTest, AlignedEof) {
  298. const int n = kBlockSize - 2*kHeaderSize + 4;
  299. Write(BigString("foo", n));
  300. ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
  301. ASSERT_EQ(BigString("foo", n), Read());
  302. ASSERT_EQ("EOF", Read());
  303. }
  304. TEST(LogTest, OpenForAppend) {
  305. Write("hello");
  306. ReopenForAppend();
  307. Write("world");
  308. ASSERT_EQ("hello", Read());
  309. ASSERT_EQ("world", Read());
  310. ASSERT_EQ("EOF", Read());
  311. }
  312. TEST(LogTest, RandomRead) {
  313. const int N = 500;
  314. Random write_rnd(301);
  315. for (int i = 0; i < N; i++) {
  316. Write(RandomSkewedString(i, &write_rnd));
  317. }
  318. Random read_rnd(301);
  319. for (int i = 0; i < N; i++) {
  320. ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
  321. }
  322. ASSERT_EQ("EOF", Read());
  323. }
  324. // Tests of all the error paths in log_reader.cc follow:
  325. TEST(LogTest, ReadError) {
  326. Write("foo");
  327. ForceError();
  328. ASSERT_EQ("EOF", Read());
  329. ASSERT_EQ(kBlockSize, DroppedBytes());
  330. ASSERT_EQ("OK", MatchError("read error"));
  331. }
  332. TEST(LogTest, BadRecordType) {
  333. Write("foo");
  334. // Type is stored in header[6]
  335. IncrementByte(6, 100);
  336. FixChecksum(0, 3);
  337. ASSERT_EQ("EOF", Read());
  338. ASSERT_EQ(3, DroppedBytes());
  339. ASSERT_EQ("OK", MatchError("unknown record type"));
  340. }
  341. TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
  342. Write("foo");
  343. ShrinkSize(4); // Drop all payload as well as a header byte
  344. ASSERT_EQ("EOF", Read());
  345. // Truncated last record is ignored, not treated as an error.
  346. ASSERT_EQ(0, DroppedBytes());
  347. ASSERT_EQ("", ReportMessage());
  348. }
  349. TEST(LogTest, BadLength) {
  350. const int kPayloadSize = kBlockSize - kHeaderSize;
  351. Write(BigString("bar", kPayloadSize));
  352. Write("foo");
  353. // Least significant size byte is stored in header[4].
  354. IncrementByte(4, 1);
  355. ASSERT_EQ("foo", Read());
  356. ASSERT_EQ(kBlockSize, DroppedBytes());
  357. ASSERT_EQ("OK", MatchError("bad record length"));
  358. }
  359. TEST(LogTest, BadLengthAtEndIsIgnored) {
  360. Write("foo");
  361. ShrinkSize(1);
  362. ASSERT_EQ("EOF", Read());
  363. ASSERT_EQ(0, DroppedBytes());
  364. ASSERT_EQ("", ReportMessage());
  365. }
  366. TEST(LogTest, ChecksumMismatch) {
  367. Write("foo");
  368. IncrementByte(0, 10);
  369. ASSERT_EQ("EOF", Read());
  370. ASSERT_EQ(10, DroppedBytes());
  371. ASSERT_EQ("OK", MatchError("checksum mismatch"));
  372. }
  373. TEST(LogTest, UnexpectedMiddleType) {
  374. Write("foo");
  375. SetByte(6, kMiddleType);
  376. FixChecksum(0, 3);
  377. ASSERT_EQ("EOF", Read());
  378. ASSERT_EQ(3, DroppedBytes());
  379. ASSERT_EQ("OK", MatchError("missing start"));
  380. }
  381. TEST(LogTest, UnexpectedLastType) {
  382. Write("foo");
  383. SetByte(6, kLastType);
  384. FixChecksum(0, 3);
  385. ASSERT_EQ("EOF", Read());
  386. ASSERT_EQ(3, DroppedBytes());
  387. ASSERT_EQ("OK", MatchError("missing start"));
  388. }
  389. TEST(LogTest, UnexpectedFullType) {
  390. Write("foo");
  391. Write("bar");
  392. SetByte(6, kFirstType);
  393. FixChecksum(0, 3);
  394. ASSERT_EQ("bar", Read());
  395. ASSERT_EQ("EOF", Read());
  396. ASSERT_EQ(3, DroppedBytes());
  397. ASSERT_EQ("OK", MatchError("partial record without end"));
  398. }
  399. TEST(LogTest, UnexpectedFirstType) {
  400. Write("foo");
  401. Write(BigString("bar", 100000));
  402. SetByte(6, kFirstType);
  403. FixChecksum(0, 3);
  404. ASSERT_EQ(BigString("bar", 100000), Read());
  405. ASSERT_EQ("EOF", Read());
  406. ASSERT_EQ(3, DroppedBytes());
  407. ASSERT_EQ("OK", MatchError("partial record without end"));
  408. }
  409. TEST(LogTest, MissingLastIsIgnored) {
  410. Write(BigString("bar", kBlockSize));
  411. // Remove the LAST block, including header.
  412. ShrinkSize(14);
  413. ASSERT_EQ("EOF", Read());
  414. ASSERT_EQ("", ReportMessage());
  415. ASSERT_EQ(0, DroppedBytes());
  416. }
  417. TEST(LogTest, PartialLastIsIgnored) {
  418. Write(BigString("bar", kBlockSize));
  419. // Cause a bad record length in the LAST block.
  420. ShrinkSize(1);
  421. ASSERT_EQ("EOF", Read());
  422. ASSERT_EQ("", ReportMessage());
  423. ASSERT_EQ(0, DroppedBytes());
  424. }
  425. TEST(LogTest, SkipIntoMultiRecord) {
  426. // Consider a fragmented record:
  427. // first(R1), middle(R1), last(R1), first(R2)
  428. // If initial_offset points to a record after first(R1) but before first(R2)
  429. // incomplete fragment errors are not actual errors, and must be suppressed
  430. // until a new first or full record is encountered.
  431. Write(BigString("foo", 3*kBlockSize));
  432. Write("correct");
  433. StartReadingAt(kBlockSize);
  434. ASSERT_EQ("correct", Read());
  435. ASSERT_EQ("", ReportMessage());
  436. ASSERT_EQ(0, DroppedBytes());
  437. ASSERT_EQ("EOF", Read());
  438. }
  439. TEST(LogTest, ErrorJoinsRecords) {
  440. // Consider two fragmented records:
  441. // first(R1) last(R1) first(R2) last(R2)
  442. // where the middle two fragments disappear. We do not want
  443. // first(R1),last(R2) to get joined and returned as a valid record.
  444. // Write records that span two blocks
  445. Write(BigString("foo", kBlockSize));
  446. Write(BigString("bar", kBlockSize));
  447. Write("correct");
  448. // Wipe the middle block
  449. for (int offset = kBlockSize; offset < 2*kBlockSize; offset++) {
  450. SetByte(offset, 'x');
  451. }
  452. ASSERT_EQ("correct", Read());
  453. ASSERT_EQ("EOF", Read());
  454. const size_t dropped = DroppedBytes();
  455. ASSERT_LE(dropped, 2*kBlockSize + 100);
  456. ASSERT_GE(dropped, 2*kBlockSize);
  457. }
  458. TEST(LogTest, ReadStart) {
  459. CheckInitialOffsetRecord(0, 0);
  460. }
  461. TEST(LogTest, ReadSecondOneOff) {
  462. CheckInitialOffsetRecord(1, 1);
  463. }
  464. TEST(LogTest, ReadSecondTenThousand) {
  465. CheckInitialOffsetRecord(10000, 1);
  466. }
  467. TEST(LogTest, ReadSecondStart) {
  468. CheckInitialOffsetRecord(10007, 1);
  469. }
  470. TEST(LogTest, ReadThirdOneOff) {
  471. CheckInitialOffsetRecord(10008, 2);
  472. }
  473. TEST(LogTest, ReadThirdStart) {
  474. CheckInitialOffsetRecord(20014, 2);
  475. }
  476. TEST(LogTest, ReadFourthOneOff) {
  477. CheckInitialOffsetRecord(20015, 3);
  478. }
  479. TEST(LogTest, ReadFourthFirstBlockTrailer) {
  480. CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
  481. }
  482. TEST(LogTest, ReadFourthMiddleBlock) {
  483. CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
  484. }
  485. TEST(LogTest, ReadFourthLastBlock) {
  486. CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
  487. }
  488. TEST(LogTest, ReadFourthStart) {
  489. CheckInitialOffsetRecord(
  490. 2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
  491. 3);
  492. }
  493. TEST(LogTest, ReadInitialOffsetIntoBlockPadding) {
  494. CheckInitialOffsetRecord(3 * log::kBlockSize - 3, 5);
  495. }
  496. TEST(LogTest, ReadEnd) {
  497. CheckOffsetPastEndReturnsNoRecords(0);
  498. }
  499. TEST(LogTest, ReadPastEnd) {
  500. CheckOffsetPastEndReturnsNoRecords(5);
  501. }
  502. } // namespace log
  503. } // namespace leveldb
  504. int main(int argc, char** argv) {
  505. return leveldb::test::RunAllTests();
  506. }