// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl.h"

#include <stdint.h>
#include <stdio.h>

#include <algorithm>
#include <atomic>
#include <set>
#include <string>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/mutexlock.h"

namespace leveldb {

const int kNumNonTableCacheFiles = 10;

// Information kept for every waiting writer
struct DBImpl::Writer {
  Status status;
  WriteBatch* batch;
  bool sync;
  bool done;
  port::CondVar cv;

  explicit Writer(port::Mutex* mu) : cv(mu) { }
};

struct DBImpl::CompactionState {
  Compaction* const compaction;

  // Sequence numbers < smallest_snapshot are not significant since we
  // will never have to service a snapshot below smallest_snapshot.
  // Therefore if we have seen a sequence number S <= smallest_snapshot,
  // we can drop all entries for the same key with sequence numbers < S.
  SequenceNumber smallest_snapshot;

  // Files produced by compaction
  struct Output {
    uint64_t number;
    uint64_t file_size;
    InternalKey smallest, largest;
  };
  std::vector<Output> outputs;

  // State kept for output being generated
  WritableFile* outfile;
  TableBuilder* builder;

  uint64_t total_bytes;

  Output* current_output() { return &outputs[outputs.size()-1]; }

  explicit CompactionState(Compaction* c)
      : compaction(c),
        outfile(nullptr),
        builder(nullptr),
        total_bytes(0) {
  }
};

// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}
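
// Produce a sanitized copy of "src": swap in the internal comparator and
// filter-policy wrappers, clamp size-related options to supported ranges,
// open an info log in the database directory if the caller did not supply
// one, and fall back to a default 8 MB block cache.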
Options SanitizeOptions(const std::string& dbname,
                        const InternalKeyComparator* icmp,
                        const InternalFilterPolicy* ipolicy,
                        const Options& src) {
  Options result = src;
  result.comparator = icmp;
  result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
  ClipToRange(&result.max_open_files,    64 + kNumNonTableCacheFiles, 50000);
  ClipToRange(&result.write_buffer_size, 64<<10,                      1<<30);
  ClipToRange(&result.max_file_size,     1<<20,                       1<<30);
  ClipToRange(&result.block_size,        1<<10,                       4<<20);
  if (result.info_log == nullptr) {
    // Open a log file in the same directory as the db
    src.env->CreateDir(dbname);  // In case it does not exist
    src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
    Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = nullptr;
    }
  }
  if (result.block_cache == nullptr) {
    result.block_cache = NewLRUCache(8 << 20);
  }
  return result;
}

static int TableCacheSize(const Options& sanitized_options) {
  // Reserve ten files or so for other uses and give the rest to TableCache.
  return sanitized_options.max_open_files - kNumNonTableCacheFiles;
}

DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
    : env_(raw_options.env),
      internal_comparator_(raw_options.comparator),
      internal_filter_policy_(raw_options.filter_policy),
      options_(SanitizeOptions(dbname, &internal_comparator_,
                               &internal_filter_policy_, raw_options)),
      owns_info_log_(options_.info_log != raw_options.info_log),
      owns_cache_(options_.block_cache != raw_options.block_cache),
      dbname_(dbname),
      table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
      db_lock_(nullptr),
      shutting_down_(false),
      background_work_finished_signal_(&mutex_),
      mem_(nullptr),
      imm_(nullptr),
      has_imm_(false),
      logfile_(nullptr),
      logfile_number_(0),
      log_(nullptr),
      seed_(0),
      tmp_batch_(new WriteBatch),
      background_compaction_scheduled_(false),
      manual_compaction_(nullptr),
      versions_(new VersionSet(dbname_, &options_, table_cache_,
                               &internal_comparator_)) {}

DBImpl::~DBImpl() {
  // Wait for background work to finish.
  mutex_.Lock();
  shutting_down_.store(true, std::memory_order_release);
  while (background_compaction_scheduled_) {
    background_work_finished_signal_.Wait();
  }
  mutex_.Unlock();

  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  delete versions_;
  if (mem_ != nullptr) mem_->Unref();
  if (imm_ != nullptr) imm_->Unref();
  delete tmp_batch_;
  delete log_;
  delete logfile_;
  delete table_cache_;

  if (owns_info_log_) {
    delete options_.info_log;
  }
  if (owns_cache_) {
    delete options_.block_cache;
  }
}
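
// Bootstrap a brand-new database: write descriptor file MANIFEST-000001
// containing an initial VersionEdit (comparator name, log number 0, next
// file number 2, last sequence 0), then point CURRENT at it. The manifest
// is removed again if any step fails.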
Status DBImpl::NewDB() {
  VersionEdit new_db;
  new_db.SetComparatorName(user_comparator()->Name());
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);
  new_db.SetLastSequence(0);

  const std::string manifest = DescriptorFileName(dbname_, 1);
  WritableFile* file;
  Status s = env_->NewWritableFile(manifest, &file);
  if (!s.ok()) {
    return s;
  }
  {
    log::Writer log(file);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) {
      s = file->Close();
    }
  }
  delete file;
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(env_, dbname_, 1);
  } else {
    env_->DeleteFile(manifest);
  }
  return s;
}

void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || options_.paranoid_checks) {
    // No change needed
  } else {
    Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
    *s = Status::OK();
  }
}
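
// Delete any files in the database directory that are no longer needed:
// log files older than the current one, superseded manifests, and table
// or temp files that are neither live in some version nor registered in
// pending_outputs_. Must be called with mutex_ held.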
void DBImpl::DeleteObsoleteFiles() {
  mutex_.AssertHeld();

  if (!bg_error_.ok()) {
    // After a background error, we don't know whether a new version may
    // or may not have been committed, so we cannot safely garbage collect.
    return;
  }

  // Make a set of all of the live files
  std::set<uint64_t> live = pending_outputs_;
  versions_->AddLiveFiles(&live);

  std::vector<std::string> filenames;
  env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
  uint64_t number;
  FileType type;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &type)) {
      bool keep = true;
      switch (type) {
        case kLogFile:
          keep = ((number >= versions_->LogNumber()) ||
                  (number == versions_->PrevLogNumber()));
          break;
        case kDescriptorFile:
          // Keep my manifest file, and any newer incarnations'
          // (in case there is a race that allows other incarnations)
          keep = (number >= versions_->ManifestFileNumber());
          break;
        case kTableFile:
          keep = (live.find(number) != live.end());
          break;
        case kTempFile:
          // Any temp files that are currently being written to must
          // be recorded in pending_outputs_, which is inserted into "live"
          keep = (live.find(number) != live.end());
          break;
        case kCurrentFile:
        case kDBLockFile:
        case kInfoLogFile:
          keep = true;
          break;
      }

      if (!keep) {
        if (type == kTableFile) {
          table_cache_->Evict(number);
        }
        Log(options_.info_log, "Delete type=%d #%lld\n",
            static_cast<int>(type),
            static_cast<unsigned long long>(number));
        env_->DeleteFile(dbname_ + "/" + filenames[i]);
      }
    }
  }
}
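
// Recover the database state on open: create and lock the directory,
// create a fresh database if allowed, load the version state from the
// current manifest, then replay any log files newer than the ones the
// manifest knows about, tracking the largest sequence number seen.
// Must be called with mutex_ held.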
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
  mutex_.AssertHeld();

  // Ignore error from CreateDir since the creation of the DB is
  // committed only when the descriptor is created, and this directory
  // may already exist from a previous failed creation attempt.
  env_->CreateDir(dbname_);
  assert(db_lock_ == nullptr);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(
          dbname_, "exists (error_if_exists is true)");
    }
  }

  s = versions_->Recover(save_manifest);
  if (!s.ok()) {
    return s;
  }
  SequenceNumber max_sequence(0);

  // Recover from all newer log files than the ones named in the
  // descriptor (new log files may have been added by the previous
  // incarnation without registering them in the descriptor).
  //
  // Note that PrevLogNumber() is no longer used, but we pay
  // attention to it in case we are recovering a database
  // produced by an older version of leveldb.
  const uint64_t min_log = versions_->LogNumber();
  const uint64_t prev_log = versions_->PrevLogNumber();
  std::vector<std::string> filenames;
  s = env_->GetChildren(dbname_, &filenames);
  if (!s.ok()) {
    return s;
  }
  std::set<uint64_t> expected;
  versions_->AddLiveFiles(&expected);
  uint64_t number;
  FileType type;
  std::vector<uint64_t> logs;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &type)) {
      expected.erase(number);
      if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
        logs.push_back(number);
    }
  }
  if (!expected.empty()) {
    char buf[50];
    snprintf(buf, sizeof(buf), "%d missing files; e.g.",
             static_cast<int>(expected.size()));
    return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
  }

  // Recover in the order in which the logs were generated
  std::sort(logs.begin(), logs.end());
  for (size_t i = 0; i < logs.size(); i++) {
    s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
                       &max_sequence);
    if (!s.ok()) {
      return s;
    }

    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number. So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(logs[i]);
  }

  if (versions_->LastSequence() < max_sequence) {
    versions_->SetLastSequence(max_sequence);
  }

  return Status::OK();
}

Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
                              bool* save_manifest, VersionEdit* edit,
                              SequenceNumber* max_sequence) {
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // null if options_.paranoid_checks==false
    virtual void Corruption(size_t bytes, const Status& s) {
      Log(info_log, "%s%s: dropping %d bytes; %s",
          (this->status == nullptr ? "(ignoring error) " : ""),
          fname, static_cast<int>(bytes), s.ToString().c_str());
      if (this->status != nullptr && this->status->ok()) *this->status = s;
    }
  };

  mutex_.AssertHeld();

  // Open the log file
  std::string fname = LogFileName(dbname_, log_number);
  SequentialFile* file;
  Status status = env_->NewSequentialFile(fname, &file);
  if (!status.ok()) {
    MaybeIgnoreError(&status);
    return status;
  }

  // Create the log reader.
  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log;
  reporter.fname = fname.c_str();
  reporter.status = (options_.paranoid_checks ? &status : nullptr);
  // We intentionally make log::Reader do checksumming even if
  // paranoid_checks==false so that corruptions cause entire commits
  // to be skipped instead of propagating bad information (like overly
  // large sequence numbers).
  log::Reader reader(file, &reporter, true/*checksum*/,
                     0/*initial_offset*/);
  Log(options_.info_log, "Recovering log #%llu",
      (unsigned long long) log_number);

  // Read all the records and add to a memtable
  std::string scratch;
  Slice record;
  WriteBatch batch;
  int compactions = 0;
  MemTable* mem = nullptr;
  while (reader.ReadRecord(&record, &scratch) &&
         status.ok()) {
    if (record.size() < 12) {
      reporter.Corruption(
          record.size(), Status::Corruption("log record too small"));
      continue;
    }
    WriteBatchInternal::SetContents(&batch, record);

    if (mem == nullptr) {
      mem = new MemTable(internal_comparator_);
      mem->Ref();
    }
    status = WriteBatchInternal::InsertInto(&batch, mem);
    MaybeIgnoreError(&status);
    if (!status.ok()) {
      break;
    }
    const SequenceNumber last_seq =
        WriteBatchInternal::Sequence(&batch) +
        WriteBatchInternal::Count(&batch) - 1;
    if (last_seq > *max_sequence) {
      *max_sequence = last_seq;
    }

    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      compactions++;
      *save_manifest = true;
      status = WriteLevel0Table(mem, edit, nullptr);
      mem->Unref();
      mem = nullptr;
      if (!status.ok()) {
        // Reflect errors immediately so that conditions like full
        // file-systems cause the DB::Open() to fail.
        break;
      }
    }
  }

  delete file;

  // See if we should keep reusing the last log file.
  if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
    assert(logfile_ == nullptr);
    assert(log_ == nullptr);
    assert(mem_ == nullptr);
    uint64_t lfile_size;
    if (env_->GetFileSize(fname, &lfile_size).ok() &&
        env_->NewAppendableFile(fname, &logfile_).ok()) {
      Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
      log_ = new log::Writer(logfile_, lfile_size);
      logfile_number_ = log_number;
      if (mem != nullptr) {
        mem_ = mem;
        mem = nullptr;
      } else {
        // mem can be nullptr if lognum exists but was empty.
        mem_ = new MemTable(internal_comparator_);
        mem_->Ref();
      }
    }
  }

  if (mem != nullptr) {
    // mem did not get reused; compact it.
    if (status.ok()) {
      *save_manifest = true;
      status = WriteLevel0Table(mem, edit, nullptr);
    }
    mem->Unref();
  }

  return status;
}
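
// Convert the contents of "mem" into an sstable. The mutex is released
// while the table is built so foreground work can proceed; in the
// meantime the new file number is protected from garbage collection via
// pending_outputs_. When "base" is non-null, the output may be placed at
// a higher level if it does not overlap the key ranges there.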
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
                                Version* base) {
  mutex_.AssertHeld();
  const uint64_t start_micros = env_->NowMicros();
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();
  pending_outputs_.insert(meta.number);
  Iterator* iter = mem->NewIterator();
  Log(options_.info_log, "Level-0 table #%llu: started",
      (unsigned long long) meta.number);

  Status s;
  {
    mutex_.Unlock();
    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
    mutex_.Lock();
  }

  Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
      (unsigned long long) meta.number,
      (unsigned long long) meta.file_size,
      s.ToString().c_str());
  delete iter;
  pending_outputs_.erase(meta.number);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.file_size > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    if (base != nullptr) {
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    }
    edit->AddFile(level, meta.number, meta.file_size,
                  meta.smallest, meta.largest);
  }

  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.file_size;
  stats_[level].Add(stats);
  return s;
}

void DBImpl::CompactMemTable() {
  mutex_.AssertHeld();
  assert(imm_ != nullptr);

  // Save the contents of the memtable as a new Table
  VersionEdit edit;
  Version* base = versions_->current();
  base->Ref();
  Status s = WriteLevel0Table(imm_, &edit, base);
  base->Unref();

  if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
    s = Status::IOError("Deleting DB during memtable compaction");
  }

  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    s = versions_->LogAndApply(&edit, &mutex_);
  }

  if (s.ok()) {
    // Commit to the new state
    imm_->Unref();
    imm_ = nullptr;
    has_imm_.store(false, std::memory_order_release);
    DeleteObsoleteFiles();
  } else {
    RecordBackgroundError(s);
  }
}

void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
  int max_level_with_files = 1;
  {
    MutexLock l(&mutex_);
    Version* base = versions_->current();
    for (int level = 1; level < config::kNumLevels; level++) {
      if (base->OverlapInLevel(level, begin, end)) {
        max_level_with_files = level;
      }
    }
  }
  TEST_CompactMemTable();  // TODO(sanjay): Skip if memtable does not overlap
  for (int level = 0; level < max_level_with_files; level++) {
    TEST_CompactRange(level, begin, end);
  }
}

void DBImpl::TEST_CompactRange(int level, const Slice* begin,
                               const Slice* end) {
  assert(level >= 0);
  assert(level + 1 < config::kNumLevels);

  InternalKey begin_storage, end_storage;

  ManualCompaction manual;
  manual.level = level;
  manual.done = false;
  if (begin == nullptr) {
    manual.begin = nullptr;
  } else {
    begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
    manual.begin = &begin_storage;
  }
  if (end == nullptr) {
    manual.end = nullptr;
  } else {
    end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
    manual.end = &end_storage;
  }

  MutexLock l(&mutex_);
  while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
         bg_error_.ok()) {
    if (manual_compaction_ == nullptr) {  // Idle
      manual_compaction_ = &manual;
      MaybeScheduleCompaction();
    } else {  // Running either my compaction or another compaction.
      background_work_finished_signal_.Wait();
    }
  }
  if (manual_compaction_ == &manual) {
    // Cancel my manual compaction since we aborted early for some reason.
    manual_compaction_ = nullptr;
  }
}

Status DBImpl::TEST_CompactMemTable() {
  // nullptr batch means just wait for earlier writes to be done
  Status s = Write(WriteOptions(), nullptr);
  if (s.ok()) {
    // Wait until the compaction completes
    MutexLock l(&mutex_);
    while (imm_ != nullptr && bg_error_.ok()) {
      background_work_finished_signal_.Wait();
    }
    if (imm_ != nullptr) {
      s = bg_error_;
    }
  }
  return s;
}

void DBImpl::RecordBackgroundError(const Status& s) {
  mutex_.AssertHeld();
  if (bg_error_.ok()) {
    bg_error_ = s;
    background_work_finished_signal_.SignalAll();
  }
}

void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (background_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == nullptr &&
             manual_compaction_ == nullptr &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    background_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}

void DBImpl::BGWork(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}

void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(background_compaction_scheduled_);
  if (shutting_down_.load(std::memory_order_acquire)) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackgroundCompaction();
  }

  background_compaction_scheduled_ = false;

  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  background_work_finished_signal_.SignalAll();
}

void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  if (imm_ != nullptr) {
    CompactMemTable();
    return;
  }

  Compaction* c;
  bool is_manual = (manual_compaction_ != nullptr);
  InternalKey manual_end;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    c = versions_->CompactRange(m->level, m->begin, m->end);
    m->done = (c == nullptr);
    if (c != nullptr) {
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
    Log(options_.info_log,
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
        m->level,
        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        (m->done ? "(end)" : manual_end.DebugString().c_str()));
  } else {
    c = versions_->PickCompaction();
  }

  Status status;
  if (c == nullptr) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    VersionSet::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number),
        c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(),
        versions_->LevelSummary(&tmp));
  } else {
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    c->ReleaseInputs();
    DeleteObsoleteFiles();
  }
  delete c;

  if (status.ok()) {
    // Done
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(options_.info_log,
        "Compaction error: %s", status.ToString().c_str());
  }

  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range. Update *m
      // to the range that is left to be compacted.
      m->tmp_storage = manual_end;
      m->begin = &m->tmp_storage;
    }
    manual_compaction_ = nullptr;
  }
}

void DBImpl::CleanupCompaction(CompactionState* compact) {
  mutex_.AssertHeld();
  if (compact->builder != nullptr) {
    // May happen if we get a shutdown call in the middle of compaction
    compact->builder->Abandon();
    delete compact->builder;
  } else {
    assert(compact->outfile == nullptr);
  }
  delete compact->outfile;
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    pending_outputs_.erase(out.number);
  }
  delete compact;
}

Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
  assert(compact != nullptr);
  assert(compact->builder == nullptr);
  uint64_t file_number;
  {
    mutex_.Lock();
    file_number = versions_->NewFileNumber();
    pending_outputs_.insert(file_number);
    CompactionState::Output out;
    out.number = file_number;
    out.smallest.Clear();
    out.largest.Clear();
    compact->outputs.push_back(out);
    mutex_.Unlock();
  }

  // Make the output file
  std::string fname = TableFileName(dbname_, file_number);
  Status s = env_->NewWritableFile(fname, &compact->outfile);
  if (s.ok()) {
    compact->builder = new TableBuilder(options_, compact->outfile);
  }
  return s;
}

Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
                                          Iterator* input) {
  assert(compact != nullptr);
  assert(compact->outfile != nullptr);
  assert(compact->builder != nullptr);

  const uint64_t output_number = compact->current_output()->number;
  assert(output_number != 0);

  // Check for iterator errors
  Status s = input->status();
  const uint64_t current_entries = compact->builder->NumEntries();
  if (s.ok()) {
    s = compact->builder->Finish();
  } else {
    compact->builder->Abandon();
  }
  const uint64_t current_bytes = compact->builder->FileSize();
  compact->current_output()->file_size = current_bytes;
  compact->total_bytes += current_bytes;
  delete compact->builder;
  compact->builder = nullptr;

  // Finish and check for file errors
  if (s.ok()) {
    s = compact->outfile->Sync();
  }
  if (s.ok()) {
    s = compact->outfile->Close();
  }
  delete compact->outfile;
  compact->outfile = nullptr;

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    Iterator* iter = table_cache_->NewIterator(ReadOptions(),
                                               output_number,
                                               current_bytes);
    s = iter->status();
    delete iter;
    if (s.ok()) {
      Log(options_.info_log,
          "Generated table #%llu@%d: %lld keys, %lld bytes",
          (unsigned long long) output_number,
          compact->compaction->level(),
          (unsigned long long) current_entries,
          (unsigned long long) current_bytes);
    }
  }
  return s;
}

Status DBImpl::InstallCompactionResults(CompactionState* compact) {
  mutex_.AssertHeld();
  Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
      compact->compaction->num_input_files(0),
      compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1,
      static_cast<long long>(compact->total_bytes));

  // Add compaction outputs
  compact->compaction->AddInputDeletions(compact->compaction->edit());
  const int level = compact->compaction->level();
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    compact->compaction->edit()->AddFile(
        level + 1,
        out.number, out.file_size, out.smallest, out.largest);
  }
  return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
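
// Merge the compaction inputs into a fresh set of output tables. The
// mutex is dropped for the duration of the merge. An entry is dropped
// when it is shadowed by a newer entry for the same user key that is
// already visible to every live snapshot, or when it is a deletion
// marker, old enough to be invisible to all snapshots, for a key with no
// data in higher levels. The loop pauses to compact an immutable
// memtable whenever one appears.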
Status DBImpl::DoCompactionWork(CompactionState* compact) {
  const uint64_t start_micros = env_->NowMicros();
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions

  Log(options_.info_log, "Compacting %d@%d + %d@%d files",
      compact->compaction->num_input_files(0),
      compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1);

  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
  assert(compact->builder == nullptr);
  assert(compact->outfile == nullptr);
  if (snapshots_.empty()) {
    compact->smallest_snapshot = versions_->LastSequence();
  } else {
    compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
  }

  // Release mutex while we're actually doing the compaction work
  mutex_.Unlock();

  Iterator* input = versions_->MakeInputIterator(compact->compaction);
  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  for (; input->Valid() && !shutting_down_.load(std::memory_order_acquire); ) {
    // Prioritize immutable compaction work
    if (has_imm_.load(std::memory_order_relaxed)) {
      const uint64_t imm_start = env_->NowMicros();
      mutex_.Lock();
      if (imm_ != nullptr) {
        CompactMemTable();
        // Wake up MakeRoomForWrite() if necessary.
        background_work_finished_signal_.SignalAll();
      }
      mutex_.Unlock();
      imm_micros += (env_->NowMicros() - imm_start);
    }

    Slice key = input->key();
    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != nullptr) {
      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          user_comparator()->Compare(ikey.user_key,
                                     Slice(current_user_key)) != 0) {
        // First occurrence of this user key
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
      }

      if (last_sequence_for_key <= compact->smallest_snapshot) {
        // Hidden by a newer entry for same user key
        drop = true;    // (A)
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= compact->smallest_snapshot &&
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
      }

      last_sequence_for_key = ikey.sequence;
    }
#if 0
    Log(options_.info_log,
        "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
        "%d smallest_snapshot: %d",
        ikey.user_key.ToString().c_str(),
        (int)ikey.sequence, ikey.type, kTypeValue, drop,
        compact->compaction->IsBaseLevelForKey(ikey.user_key),
        (int)last_sequence_for_key, (int)compact->smallest_snapshot);
#endif

    if (!drop) {
      // Open output file if necessary
      if (compact->builder == nullptr) {
        status = OpenCompactionOutputFile(compact);
        if (!status.ok()) {
          break;
        }
      }
      if (compact->builder->NumEntries() == 0) {
        compact->current_output()->smallest.DecodeFrom(key);
      }
      compact->current_output()->largest.DecodeFrom(key);
      compact->builder->Add(key, input->value());

      // Close output file if it is big enough
      if (compact->builder->FileSize() >=
          compact->compaction->MaxOutputFileSize()) {
        status = FinishCompactionOutputFile(compact, input);
        if (!status.ok()) {
          break;
        }
      }
    }

    input->Next();
  }

  if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
    status = Status::IOError("Deleting DB during compaction");
  }
  if (status.ok() && compact->builder != nullptr) {
    status = FinishCompactionOutputFile(compact, input);
  }
  if (status.ok()) {
    status = input->status();
  }
  delete input;
  input = nullptr;

  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
  for (int which = 0; which < 2; which++) {
    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
      stats.bytes_read += compact->compaction->input(which, i)->file_size;
    }
  }
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    stats.bytes_written += compact->outputs[i].file_size;
  }

  mutex_.Lock();
  stats_[compact->compaction->level() + 1].Add(stats);

  if (status.ok()) {
    status = InstallCompactionResults(compact);
  }
  if (!status.ok()) {
    RecordBackgroundError(status);
  }
  VersionSet::LevelSummaryStorage tmp;
  Log(options_.info_log,
      "compacted to: %s", versions_->LevelSummary(&tmp));
  return status;
}

namespace {

struct IterState {
  port::Mutex* const mu;
  Version* const version GUARDED_BY(mu);
  MemTable* const mem GUARDED_BY(mu);
  MemTable* const imm GUARDED_BY(mu);

  IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
      : mu(mutex), version(version), mem(mem), imm(imm) { }
};

static void CleanupIteratorState(void* arg1, void* arg2) {
  IterState* state = reinterpret_cast<IterState*>(arg1);
  state->mu->Lock();
  state->mem->Unref();
  if (state->imm != nullptr) state->imm->Unref();
  state->version->Unref();
  state->mu->Unlock();
  delete state;
}

}  // anonymous namespace
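
// Build a merging iterator over the current memtable, the immutable
// memtable (if any), and every sstable in the current version. Each
// source is ref'd here and unref'd by CleanupIteratorState when the
// returned iterator is destroyed.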
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                      SequenceNumber* latest_snapshot,
                                      uint32_t* seed) {
  mutex_.Lock();
  *latest_snapshot = versions_->LastSequence();

  // Collect together all needed child iterators
  std::vector<Iterator*> list;
  list.push_back(mem_->NewIterator());
  mem_->Ref();
  if (imm_ != nullptr) {
    list.push_back(imm_->NewIterator());
    imm_->Ref();
  }
  versions_->current()->AddIterators(options, &list);
  Iterator* internal_iter =
      NewMergingIterator(&internal_comparator_, &list[0], list.size());
  versions_->current()->Ref();

  IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

  *seed = ++seed_;
  mutex_.Unlock();
  return internal_iter;
}

Iterator* DBImpl::TEST_NewInternalIterator() {
  SequenceNumber ignored;
  uint32_t ignored_seed;
  return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
}

int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
  MutexLock l(&mutex_);
  return versions_->MaxNextLevelOverlappingBytes();
}

Status DBImpl::Get(const ReadOptions& options,
                   const Slice& key,
                   std::string* value) {
  Status s;
  MutexLock l(&mutex_);
  SequenceNumber snapshot;
  if (options.snapshot != nullptr) {
    snapshot =
        static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
  } else {
    snapshot = versions_->LastSequence();
  }

  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  mem->Ref();
  if (imm != nullptr) imm->Ref();
  current->Ref();

  bool have_stat_update = false;
  Version::GetStats stats;

  // Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {
      // Done
    } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
      // Done
    } else {
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }

  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  mem->Unref();
  if (imm != nullptr) imm->Unref();
  current->Unref();
  return s;
}

Iterator* DBImpl::NewIterator(const ReadOptions& options) {
  SequenceNumber latest_snapshot;
  uint32_t seed;
  Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
  return NewDBIterator(
      this, user_comparator(), iter,
      (options.snapshot != nullptr
       ? static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number()
       : latest_snapshot),
      seed);
}

void DBImpl::RecordReadSample(Slice key) {
  MutexLock l(&mutex_);
  if (versions_->current()->RecordReadSample(key)) {
    MaybeScheduleCompaction();
  }
}

const Snapshot* DBImpl::GetSnapshot() {
  MutexLock l(&mutex_);
  return snapshots_.New(versions_->LastSequence());
}

void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
  MutexLock l(&mutex_);
  snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
}

// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}

Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
  return DB::Delete(options, key);
}
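
// Group-commit write path. Each caller enqueues a Writer and blocks until
// its work is done; the writer at the front of the queue acts as leader,
// folds the pending batches behind it into one group via BuildBatchGroup,
// appends the group to the log and memtable with the mutex released, and
// then marks the absorbed writers done and signals them.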
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != nullptr) {  // nullptr batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable. We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}

// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-null batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != nullptr);

  size_t size = WriteBatchInternal::ByteSize(first->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128<<10)) {
    max_size = size + (128<<10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->batch != nullptr) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }

      // Append to *result
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}
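
// Decide whether the current memtable can accept another write, stalling
// or switching memtables as needed: delay once for 1ms near the L0
// slowdown trigger, return immediately if there is room, wait while the
// previous memtable is still compacting or L0 has hit the stop trigger,
// and otherwise rotate to a fresh log file and memtable and kick off a
// compaction of the old one.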
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (
        allow_delay &&
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files. Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance. Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.store(true, std::memory_order_release);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}

bool DBImpl::GetProperty(const Slice& property, std::string* value) {
  value->clear();

  MutexLock l(&mutex_);
  Slice in = property;
  Slice prefix("leveldb.");
  if (!in.starts_with(prefix)) return false;
  in.remove_prefix(prefix.size());

  if (in.starts_with("num-files-at-level")) {
    in.remove_prefix(strlen("num-files-at-level"));
    uint64_t level;
    bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
    if (!ok || level >= config::kNumLevels) {
      return false;
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "%d",
               versions_->NumLevelFiles(static_cast<int>(level)));
      *value = buf;
      return true;
    }
  } else if (in == "stats") {
    char buf[200];
    snprintf(buf, sizeof(buf),
             "                               Compactions\n"
             "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
             "--------------------------------------------------\n"
             );
    value->append(buf);
    for (int level = 0; level < config::kNumLevels; level++) {
      int files = versions_->NumLevelFiles(level);
      if (stats_[level].micros > 0 || files > 0) {
        snprintf(
            buf, sizeof(buf),
            "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
            level,
            files,
            versions_->NumLevelBytes(level) / 1048576.0,
            stats_[level].micros / 1e6,
            stats_[level].bytes_read / 1048576.0,
            stats_[level].bytes_written / 1048576.0);
        value->append(buf);
      }
    }
    return true;
  } else if (in == "sstables") {
    *value = versions_->current()->DebugString();
    return true;
  } else if (in == "approximate-memory-usage") {
    size_t total_usage = options_.block_cache->TotalCharge();
    if (mem_) {
      total_usage += mem_->ApproximateMemoryUsage();
    }
    if (imm_) {
      total_usage += imm_->ApproximateMemoryUsage();
    }
    char buf[50];
    snprintf(buf, sizeof(buf), "%llu",
             static_cast<unsigned long long>(total_usage));
    value->append(buf);
    return true;
  }

  return false;
}

void DBImpl::GetApproximateSizes(
    const Range* range, int n,
    uint64_t* sizes) {
  // TODO(opt): better implementation
  Version* v;
  {
    MutexLock l(&mutex_);
    versions_->current()->Ref();
    v = versions_->current();
  }

  for (int i = 0; i < n; i++) {
    // Convert user_key into a corresponding internal key.
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    uint64_t start = versions_->ApproximateOffsetOf(v, k1);
    uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
    sizes[i] = (limit >= start ? limit - start : 0);
  }

  {
    MutexLock l(&mutex_);
    v->Unref();
  }
}

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, const Slice& key) {
  WriteBatch batch;
  batch.Delete(key);
  return Write(opt, &batch);
}

DB::~DB() { }
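
// Open the database: recover existing state, create a fresh log file and
// memtable if recovery did not install one, persist a new manifest if
// anything changed, then remove obsolete files and schedule any needed
// compaction before handing the DB to the caller.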
Status DB::Open(const Options& options, const std::string& dbname,
                DB** dbptr) {
  *dbptr = nullptr;

  DBImpl* impl = new DBImpl(options, dbname);
  impl->mutex_.Lock();
  VersionEdit edit;
  // Recover handles create_if_missing, error_if_exists
  bool save_manifest = false;
  Status s = impl->Recover(&edit, &save_manifest);
  if (s.ok() && impl->mem_ == nullptr) {
    // Create new log and a corresponding memtable.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      impl->mem_ = new MemTable(impl->internal_comparator_);
      impl->mem_->Ref();
    }
  }
  if (s.ok() && save_manifest) {
    edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
    edit.SetLogNumber(impl->logfile_number_);
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
  }
  if (s.ok()) {
    impl->DeleteObsoleteFiles();
    impl->MaybeScheduleCompaction();
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    assert(impl->mem_ != nullptr);
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}

Snapshot::~Snapshot() {
}
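
// Destroy the contents of the named database: take the DB lock, delete
// every file that parses as a leveldb file (except the lock file itself),
// then release and delete the lock and attempt to remove the directory.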
Status DestroyDB(const std::string& dbname, const Options& options) {
  Env* env = options.env;
  std::vector<std::string> filenames;
  Status result = env->GetChildren(dbname, &filenames);
  if (!result.ok()) {
    // Ignore error in case directory does not exist
    return Status::OK();
  }

  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  result = env->LockFile(lockname, &lock);
  if (result.ok()) {
    uint64_t number;
    FileType type;
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &type) &&
          type != kDBLockFile) {  // Lock file will be deleted at end
        Status del = env->DeleteFile(dbname + "/" + filenames[i]);
        if (result.ok() && !del.ok()) {
          result = del;
        }
      }
    }
    env->UnlockFile(lock);  // Ignore error since state is already gone
    env->DeleteFile(lockname);
    env->DeleteDir(dbname);  // Ignore error in case dir contains other files
  }
  return result;
}

}  // namespace leveldb