journalfile.c 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdengine.h"
  3. // DBENGINE2: Helper
  4. static void update_metric_retention_and_granularity_by_uuid(
  5. struct rrdengine_instance *ctx, uuid_t *uuid,
  6. time_t first_time_s, time_t last_time_s,
  7. time_t update_every_s, time_t now_s)
  8. {
  9. if(unlikely(last_time_s > now_s)) {
  10. error_limit_static_global_var(erl, 1, 0);
  11. error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
  12. "fixing last time to now",
  13. first_time_s, last_time_s, now_s);
  14. last_time_s = now_s;
  15. }
  16. if (unlikely(first_time_s > last_time_s)) {
  17. error_limit_static_global_var(erl, 1, 0);
  18. error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
  19. "fixing first time to last time",
  20. first_time_s, last_time_s, now_s);
  21. first_time_s = last_time_s;
  22. }
  23. if (unlikely(first_time_s == 0 || last_time_s == 0)) {
  24. error_limit_static_global_var(erl, 1, 0);
  25. error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
  26. "using them as-is",
  27. first_time_s, last_time_s, now_s);
  28. }
  29. bool added = false;
  30. METRIC *metric = mrg_metric_get_and_acquire(main_mrg, uuid, (Word_t) ctx);
  31. if (!metric) {
  32. MRG_ENTRY entry = {
  33. .section = (Word_t) ctx,
  34. .first_time_s = first_time_s,
  35. .last_time_s = last_time_s,
  36. .latest_update_every_s = (uint32_t) update_every_s
  37. };
  38. uuid_copy(entry.uuid, *uuid);
  39. metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
  40. }
  41. if (likely(!added))
  42. mrg_metric_expand_retention(main_mrg, metric, first_time_s, last_time_s, update_every_s);
  43. mrg_metric_release(main_mrg, metric);
  44. }
  45. static void after_extent_write_journalfile_v1_io(uv_fs_t* req)
  46. {
  47. worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB);
  48. WAL *wal = req->data;
  49. struct generic_io_descriptor *io_descr = &wal->io_descr;
  50. struct rrdengine_instance *ctx = io_descr->ctx;
  51. debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
  52. if (req->result < 0) {
  53. ctx_io_error(ctx);
  54. error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
  55. } else {
  56. debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
  57. }
  58. uv_fs_req_cleanup(req);
  59. wal_release(wal);
  60. __atomic_sub_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED);
  61. worker_is_idle();
  62. }
/* Careful to always call this before creating a new journal file */
// Queue an asynchronous write of a WAL buffer to the v1 journal file.
// The file region is reserved under the journal's position lock, so multiple
// concurrent writers get disjoint offsets. The WAL is released by the
// completion callback (after_extent_write_journalfile_v1_io).
void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, WAL *wal, uv_loop_t *loop)
{
    int ret;
    struct generic_io_descriptor *io_descr;
    struct rrdengine_journalfile *journalfile = datafile->journalfile;

    io_descr = &wal->io_descr;
    io_descr->ctx = ctx;
    if (wal->size < wal->buf_size) {
        /* simulate an empty transaction to skip the rest of the block */
        *(uint8_t *) (wal->buf + wal->size) = STORE_PADDING;
    }
    io_descr->buf = wal->buf;
    io_descr->bytes = wal->buf_size;

    // Reserve this write's file region; the full buffer size is always
    // written (padding included), keeping the journal block-aligned.
    netdata_spinlock_lock(&journalfile->unsafe.spinlock);
    io_descr->pos = journalfile->unsafe.pos;
    journalfile->unsafe.pos += wal->buf_size;
    netdata_spinlock_unlock(&journalfile->unsafe.spinlock);

    io_descr->req.data = wal;
    io_descr->data = journalfile;
    io_descr->completion = NULL;

    io_descr->iov = uv_buf_init((void *)io_descr->buf, wal->buf_size);
    ret = uv_fs_write(loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
                      (int64_t)io_descr->pos, after_extent_write_journalfile_v1_io);
    fatal_assert(-1 != ret);

    // Account the reserved space/IO immediately; the write completes async.
    ctx_current_disk_space_increase(ctx, wal->buf_size);
    ctx_io_write_op_bytes(ctx, wal->buf_size);
}
  91. void journalfile_v2_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
  92. {
  93. (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2,
  94. datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
  95. }
  96. void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
  97. {
  98. (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
  99. datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
  100. }
// Return the mmap()ed journal v2 header, mapping the index file on first use.
// On success, *data_size (when non-NULL) receives the mapped size.
// On mmap failure the fd is closed and the AVAILABLE/MOUNTED flags are
// cleared so the index is not retried, and NULL is returned.
static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) {
    struct journal_v2_header *j2_header = NULL;

    netdata_spinlock_lock(&journalfile->mmap.spinlock);

    if(!journalfile->mmap.data) {
        // Not mounted yet - map the whole index file read-only.
        journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0);
        if (journalfile->mmap.data == MAP_FAILED) {
            internal_fatal(true, "DBENGINE: failed to re-mmap() journal file v2");

            close(journalfile->mmap.fd);
            journalfile->mmap.fd = -1;
            journalfile->mmap.data = NULL;
            journalfile->mmap.size = 0;

            // Lock order here: mmap.spinlock (held) then v2.spinlock.
            netdata_spinlock_lock(&journalfile->v2.spinlock);
            journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED);
            netdata_spinlock_unlock(&journalfile->v2.spinlock);

            ctx_fs_error(journalfile->datafile->ctx);
        }
        else {
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_mapped, 1, __ATOMIC_RELAXED);

            // Access-pattern hints for the kernel on the mapped index.
            madvise_dontfork(journalfile->mmap.data, journalfile->mmap.size);
            madvise_dontdump(journalfile->mmap.data, journalfile->mmap.size);
            madvise_random(journalfile->mmap.data, journalfile->mmap.size);
            madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size);

            netdata_spinlock_lock(&journalfile->v2.spinlock);
            journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;
            netdata_spinlock_unlock(&journalfile->v2.spinlock);
        }
    }

    if(journalfile->mmap.data) {
        // Mapped (either already or just now) - hand out the header.
        j2_header = journalfile->mmap.data;

        if (data_size)
            *data_size = journalfile->mmap.size;
    }

    netdata_spinlock_unlock(&journalfile->mmap.spinlock);

    return j2_header;
}
// Unmap the journal v2 data if it has no references.
//
// have_locks: the caller already holds BOTH mmap.spinlock and v2.spinlock.
// wait:       when locking here, block on the locks (true) or trylock (false).
//
// Returns true when there were no references (data unmapped, or nothing was
// mapped to begin with); false when a trylock failed or references exist.
static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *journalfile, bool have_locks, bool wait) {
    bool unmounted = false;

    if(!have_locks) {
        // Lock order: mmap.spinlock first, then v2.spinlock.
        if(!wait) {
            if (!netdata_spinlock_trylock(&journalfile->mmap.spinlock))
                return false;
        }
        else
            netdata_spinlock_lock(&journalfile->mmap.spinlock);

        if(!wait) {
            if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) {
                netdata_spinlock_unlock(&journalfile->mmap.spinlock);
                return false;
            }
        }
        else
            netdata_spinlock_lock(&journalfile->v2.spinlock);
    }

    if(!journalfile->v2.refcount) {
        if(journalfile->mmap.data) {
            if (munmap(journalfile->mmap.data, journalfile->mmap.size)) {
                // munmap failed - keep the mapping recorded and flag the error.
                char path[RRDENG_PATH_MAX];
                journalfile_v2_generate_path(journalfile->datafile, path, sizeof(path));
                error("DBENGINE: failed to unmap index file '%s'", path);
                internal_fatal(true, "DBENGINE: failed to unmap file '%s'", path);
                ctx_fs_error(journalfile->datafile->ctx);
            }
            else {
                __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_unmapped, 1, __ATOMIC_RELAXED);
                journalfile->mmap.data = NULL;
                journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_MOUNTED;
            }
        }

        // No references: success even if nothing was mapped.
        unmounted = true;
    }

    if(!have_locks) {
        netdata_spinlock_unlock(&journalfile->v2.spinlock);
        netdata_spinlock_unlock(&journalfile->mmap.spinlock);
    }

    return unmounted;
}
// Periodic housekeeping: unmount journal v2 indexes that have been idle
// (mounted but unreferenced) for at least 2 minutes. Entirely non-blocking -
// any lock that cannot be acquired immediately is simply skipped this pass.
void journalfile_v2_data_unmount_cleanup(time_t now_s) {
    // DO NOT WAIT ON ANY LOCK!!!

    for(size_t tier = 0; tier < (size_t)storage_tiers ;tier++) {
        struct rrdengine_instance *ctx = multidb_ctx[tier];
        if(!ctx) continue;

        struct rrdengine_datafile *datafile;
        if(uv_rwlock_tryrdlock(&ctx->datafiles.rwlock) != 0)
            continue;   // list busy - retry on the next sweep

        for (datafile = ctx->datafiles.first; datafile; datafile = datafile->next) {
            struct rrdengine_journalfile *journalfile = datafile->journalfile;

            if(!netdata_spinlock_trylock(&journalfile->v2.spinlock))
                continue;

            bool unmount = false;
            if (!journalfile->v2.refcount && (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED)) {
                // this journal has no references and it is mounted

                if (!journalfile->v2.not_needed_since_s)
                    journalfile->v2.not_needed_since_s = now_s;   // start the idle timer

                else if (now_s - journalfile->v2.not_needed_since_s >= 120)
                    // 2 minutes have passed since last use
                    unmount = true;
            }
            netdata_spinlock_unlock(&journalfile->v2.spinlock);

            // Unmount outside the v2 lock; (false, false) = acquire both locks
            // via trylock, so this too cannot block.
            if (unmount)
                journalfile_v2_mounted_data_unmount(journalfile, false, false);
        }
        uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
    }
}
// Take a reference on the journal v2 data when it is available and covers
// the wanted time range (zero wanted times mean "any range"), then return
// its mmap()ed header (mapping it on demand). Returns NULL when the data is
// unavailable or out of range. Must be paired with journalfile_v2_data_release().
struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) {
    netdata_spinlock_lock(&journalfile->v2.spinlock);

    bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
    bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED);
    bool do_we_need_it = false;

    if(has_data) {
        if (!wanted_first_time_s || !wanted_last_time_s ||
            is_page_in_time_range(journalfile->v2.first_time_s, journalfile->v2.last_time_s,
                                  wanted_first_time_s, wanted_last_time_s) == PAGE_IS_IN_RANGE) {

            journalfile->v2.refcount++;
            do_we_need_it = true;

            // A range-less acquisition that forces a mount is tagged as
            // "mounted for retention", so release() unmounts it immediately.
            if (!wanted_first_time_s && !wanted_last_time_s && !is_mounted)
                journalfile->v2.flags |= JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION;
            else
                journalfile->v2.flags &= ~JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION;
        }
    }
    netdata_spinlock_unlock(&journalfile->v2.spinlock);

    // Map (if needed) outside the v2 lock.
    if(do_we_need_it)
        return journalfile_v2_mounted_data_get(journalfile, data_size);

    return NULL;
}
// Drop a reference taken with journalfile_v2_data_acquire().
// When the last reference goes away, the idle timer is reset and, if the
// mount was done only for retention scanning, the data is unmounted now.
void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
    netdata_spinlock_lock(&journalfile->v2.spinlock);

    internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data");
    internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile");

    bool unmount = false;

    journalfile->v2.refcount--;

    if(journalfile->v2.refcount == 0) {
        journalfile->v2.not_needed_since_s = 0;

        if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION)
            unmount = true;
    }
    netdata_spinlock_unlock(&journalfile->v2.spinlock);

    // Blocking unmount (wait == true), performed outside the v2 lock.
    if(unmount)
        journalfile_v2_mounted_data_unmount(journalfile, false, true);
}
  242. bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) {
  243. netdata_spinlock_lock(&journalfile->v2.spinlock);
  244. bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
  245. netdata_spinlock_unlock(&journalfile->v2.spinlock);
  246. return has_data;
  247. }
  248. size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) {
  249. netdata_spinlock_lock(&journalfile->mmap.spinlock);
  250. size_t data_size = journalfile->mmap.size;
  251. netdata_spinlock_unlock(&journalfile->mmap.spinlock);
  252. return data_size;
  253. }
// Register freshly built journal v2 data with this journalfile: record the
// fd/data/size, mark it available, cache its time range from the header, and
// then unmount it (it will be re-mapped lazily on first acquisition).
void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) {
    netdata_spinlock_lock(&journalfile->mmap.spinlock);
    netdata_spinlock_lock(&journalfile->v2.spinlock);

    internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd");
    internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data");
    internal_fatal(journalfile->v2.refcount, "DBENGINE JOURNALFILE: trying to re-set journal_data of referenced journalfile");

    journalfile->mmap.fd = fd;
    journalfile->mmap.data = journal_data;
    journalfile->mmap.size = journal_data_size;
    journalfile->v2.not_needed_since_s = now_monotonic_sec();   // start idle timer now
    journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;

    // Cache the retention window from the header while the data is still mapped.
    struct journal_v2_header *j2_header = journalfile->mmap.data;
    journalfile->v2.first_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC);
    journalfile->v2.last_time_s = (time_t)(j2_header->end_time_ut / USEC_PER_SEC);

    // Unmount right away; have_locks == true since both locks are held here.
    journalfile_v2_mounted_data_unmount(journalfile, true, true);

    netdata_spinlock_unlock(&journalfile->v2.spinlock);
    netdata_spinlock_unlock(&journalfile->mmap.spinlock);
}
// Permanently tear down the journal v2 data: unmap it, close its fd and
// clear the cached time range and flags. Spins (sleeping 10ms per retry)
// until every outstanding reference has been released.
static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) {
    bool has_references = false;

    do {
        if (has_references)
            sleep_usec(10 * USEC_PER_MS);   // back off while readers still exist

        netdata_spinlock_lock(&journalfile->mmap.spinlock);
        netdata_spinlock_lock(&journalfile->v2.spinlock);

        // have_locks == true: unmount without re-acquiring the locks.
        if(journalfile_v2_mounted_data_unmount(journalfile, true, true)) {
            if(journalfile->mmap.fd != -1)
                close(journalfile->mmap.fd);

            journalfile->mmap.fd = -1;
            journalfile->mmap.data = NULL;
            journalfile->mmap.size = 0;
            journalfile->v2.first_time_s = 0;
            journalfile->v2.last_time_s = 0;
            journalfile->v2.flags = 0;
        }
        else {
            has_references = true;
            internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap...");
        }

        netdata_spinlock_unlock(&journalfile->v2.spinlock);
        netdata_spinlock_unlock(&journalfile->mmap.spinlock);

    } while(has_references);
}
  297. struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafile *datafile)
  298. {
  299. struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile));
  300. journalfile->datafile = datafile;
  301. netdata_spinlock_init(&journalfile->mmap.spinlock);
  302. netdata_spinlock_init(&journalfile->v2.spinlock);
  303. netdata_spinlock_init(&journalfile->unsafe.spinlock);
  304. journalfile->mmap.fd = -1;
  305. datafile->journalfile = journalfile;
  306. return journalfile;
  307. }
  308. static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
  309. {
  310. int ret;
  311. char path[RRDENG_PATH_MAX];
  312. uv_fs_t req;
  313. ret = uv_fs_close(NULL, &req, file, NULL);
  314. if (ret < 0) {
  315. journalfile_v1_generate_path(datafile, path, sizeof(path));
  316. error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
  317. ctx_fs_error(datafile->ctx);
  318. }
  319. uv_fs_req_cleanup(&req);
  320. return ret;
  321. }
  322. int journalfile_close(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
  323. {
  324. if(journalfile_v2_data_available(journalfile)) {
  325. journalfile_v2_data_unmap_permanently(journalfile);
  326. return 0;
  327. }
  328. return close_uv_file(datafile, journalfile->file);
  329. }
  330. int journalfile_unlink(struct rrdengine_journalfile *journalfile)
  331. {
  332. struct rrdengine_datafile *datafile = journalfile->datafile;
  333. struct rrdengine_instance *ctx = datafile->ctx;
  334. uv_fs_t req;
  335. int ret;
  336. char path[RRDENG_PATH_MAX];
  337. journalfile_v1_generate_path(datafile, path, sizeof(path));
  338. ret = uv_fs_unlink(NULL, &req, path, NULL);
  339. if (ret < 0) {
  340. error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  341. ctx_fs_error(ctx);
  342. }
  343. uv_fs_req_cleanup(&req);
  344. __atomic_add_fetch(&ctx->stats.journalfile_deletions, 1, __ATOMIC_RELAXED);
  345. return ret;
  346. }
  347. int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
  348. {
  349. struct rrdengine_instance *ctx = datafile->ctx;
  350. uv_fs_t req;
  351. int ret;
  352. char path[RRDENG_PATH_MAX];
  353. char path_v2[RRDENG_PATH_MAX];
  354. journalfile_v1_generate_path(datafile, path, sizeof(path));
  355. journalfile_v2_generate_path(datafile, path_v2, sizeof(path));
  356. if (journalfile->file) {
  357. ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
  358. if (ret < 0) {
  359. error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
  360. ctx_fs_error(ctx);
  361. }
  362. uv_fs_req_cleanup(&req);
  363. (void) close_uv_file(datafile, journalfile->file);
  364. }
  365. // This is the new journal v2 index file
  366. ret = uv_fs_unlink(NULL, &req, path_v2, NULL);
  367. if (ret < 0) {
  368. error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  369. ctx_fs_error(ctx);
  370. }
  371. uv_fs_req_cleanup(&req);
  372. ret = uv_fs_unlink(NULL, &req, path, NULL);
  373. if (ret < 0) {
  374. error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  375. ctx_fs_error(ctx);
  376. }
  377. uv_fs_req_cleanup(&req);
  378. __atomic_add_fetch(&ctx->stats.journalfile_deletions, 2, __ATOMIC_RELAXED);
  379. if(journalfile_v2_data_available(journalfile))
  380. journalfile_v2_data_unmap_permanently(journalfile);
  381. return ret;
  382. }
// Create a new (v1) journal file on disk and write its superblock.
// On success, journalfile->file is open and unsafe.pos points just past the
// superblock. Returns 0 on success, a negative error code otherwise.
int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
    struct rrdengine_instance *ctx = datafile->ctx;
    uv_fs_t req;
    uv_file file;
    int ret, fd;
    struct rrdeng_jf_sb *superblock;
    uv_buf_t iov;
    char path[RRDENG_PATH_MAX];

    journalfile_v1_generate_path(datafile, path, sizeof(path));
    fd = open_file_for_io(path, O_CREAT | O_RDWR | O_TRUNC, &file, use_direct_io);
    if (fd < 0) {
        ctx_fs_error(ctx);
        return fd;
    }
    journalfile->file = file;
    __atomic_add_fetch(&ctx->stats.journalfile_creations, 1, __ATOMIC_RELAXED);

    // Aligned buffer, needed when the file is opened for direct I/O.
    ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
    if (unlikely(ret)) {
        fatal("DBENGINE: posix_memalign:%s", strerror(ret));
    }
    memset(superblock, 0, sizeof(*superblock));
    // Fixed-width magic/version fields, later compared with strncmp() - a
    // missing NUL terminator is intentional here.
    (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ);
    (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ);

    iov = uv_buf_init((void *)superblock, sizeof(*superblock));

    // Synchronous write (NULL callback).
    ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
    if (ret < 0) {
        fatal_assert(req.result < 0);
        error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
        ctx_io_error(ctx);
    }
    uv_fs_req_cleanup(&req);
    posix_memfree(superblock);
    if (ret < 0) {
        // Could not write the superblock - remove the half-created files.
        journalfile_destroy_unsafe(journalfile, datafile);
        return ret;
    }

    journalfile->unsafe.pos = sizeof(*superblock);

    ctx_io_write_op_bytes(ctx, sizeof(*superblock));

    return 0;
}
  424. static int journalfile_check_superblock(uv_file file)
  425. {
  426. int ret;
  427. struct rrdeng_jf_sb *superblock;
  428. uv_buf_t iov;
  429. uv_fs_t req;
  430. ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
  431. if (unlikely(ret)) {
  432. fatal("DBENGINE: posix_memalign:%s", strerror(ret));
  433. }
  434. iov = uv_buf_init((void *)superblock, sizeof(*superblock));
  435. ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
  436. if (ret < 0) {
  437. error("DBENGINE: uv_fs_read: %s", uv_strerror(ret));
  438. uv_fs_req_cleanup(&req);
  439. goto error;
  440. }
  441. fatal_assert(req.result >= 0);
  442. uv_fs_req_cleanup(&req);
  443. if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
  444. strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
  445. error("DBENGINE: File has invalid superblock.");
  446. ret = UV_EINVAL;
  447. } else {
  448. ret = 0;
  449. }
  450. error:
  451. posix_memfree(superblock);
  452. return ret;
  453. }
// Replay one STORE_DATA transaction payload from a v1 journal: for every
// page descriptor in it, validate the descriptor, make sure the metric is
// registered in the MRG (expanding its retention when appropriate), and
// register the page as hot in the open page cache.
static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size)
{
    static BITMAP256 page_error_map;   // report each unknown page type only once
    unsigned i, count, payload_length, descr_size;
    struct rrdeng_jf_store_data *jf_metric_data;

    jf_metric_data = buf;
    count = jf_metric_data->number_of_pages;
    descr_size = sizeof(*jf_metric_data->descr) * count;
    payload_length = sizeof(*jf_metric_data) + descr_size;
    if (payload_length > max_size) {
        // The declared page count does not fit in this payload.
        error("DBENGINE: corrupted transaction payload.");
        return;
    }

    time_t now_s = max_acceptable_collected_time();
    for (i = 0; i < count ; ++i) {
        uuid_t *temp_id;
        uint8_t page_type = jf_metric_data->descr[i].type;

        if (page_type > PAGE_TYPE_MAX) {
            if (!bitmap256_get_bit(&page_error_map, page_type)) {
                error("DBENGINE: unknown page type %d encountered.", page_type);
                bitmap256_set_bit(&page_error_map, page_type, 1);
            }
            continue;
        }

        temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;
        METRIC *metric = mrg_metric_get_and_acquire(main_mrg, temp_id, (Word_t) ctx);

        // Validate timestamps/update-every against the known metric (if any).
        struct rrdeng_extent_page_descr *descr = &jf_metric_data->descr[i];
        VALIDATED_PAGE_DESCRIPTOR vd = validate_extent_page_descr(
                descr, now_s,
                (metric) ? mrg_metric_get_update_every_s(main_mrg, metric) : 0,
                false);

        if(!vd.is_valid) {
            if(metric)
                mrg_metric_release(main_mrg, metric);
            continue;
        }

        bool update_metric_time = true;
        if (!metric) {
            // First time this uuid is seen - register it with this page's window.
            MRG_ENTRY entry = {
                .section = (Word_t)ctx,
                .first_time_s = vd.start_time_s,
                .last_time_s = vd.end_time_s,
                .latest_update_every_s = (uint32_t) vd.update_every_s,
            };
            uuid_copy(entry.uuid, *temp_id);
            bool added;
            metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
            if(added)
                // A brand-new entry already carries this page's window.
                update_metric_time = false;
        }

        Word_t metric_id = mrg_metric_id(main_mrg, metric);

        if (update_metric_time)
            mrg_metric_expand_retention(main_mrg, metric, vd.start_time_s, vd.end_time_s, vd.update_every_s);

        pgc_open_add_hot_page(
                (Word_t)ctx, metric_id, vd.start_time_s, vd.end_time_s, vd.update_every_s,
                journalfile->datafile,
                jf_metric_data->extent_offset, jf_metric_data->extent_size, jf_metric_data->descr[i].page_length);

        mrg_metric_release(main_mrg, metric);
    }
}
/*
 * Replays transaction by interpreting up to max_size bytes from buf.
 * Sets id to the current transaction id or to 0 if unknown.
 * Returns size of transaction record or 0 for unknown size.
 */
static unsigned journalfile_replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
                                               void *buf, uint64_t *id, unsigned max_size)
{
    unsigned payload_length, size_bytes;
    int ret;
    /* persistent structures */
    struct rrdeng_jf_transaction_header *jf_header;
    struct rrdeng_jf_transaction_trailer *jf_trailer;
    uLong crc;

    *id = 0;
    jf_header = buf;
    if (STORE_PADDING == jf_header->type) {
        debug(D_RRDENGINE, "Skipping padding.");
        return 0;
    }
    // NOTE(review): jf_header->type is read before the size check below; this
    // is safe only if at least one byte remains in buf and type is the
    // leading header field - confirm against the struct layout.
    if (sizeof(*jf_header) > max_size) {
        error("DBENGINE: corrupted transaction record, skipping.");
        return 0;
    }
    *id = jf_header->id;
    payload_length = jf_header->payload_length;
    size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
    if (size_bytes > max_size) {
        // Declared record does not fit in the remaining bytes.
        error("DBENGINE: corrupted transaction record, skipping.");
        return 0;
    }
    // The CRC covers header + payload; the trailer stores the expected value.
    jf_trailer = buf + sizeof(*jf_header) + payload_length;
    crc = crc32(0L, Z_NULL, 0);
    crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
    ret = crc32cmp(jf_trailer->checksum, crc);
    debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
    if (unlikely(ret)) {
        error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
        // Return the record size so the caller can skip past it.
        return size_bytes;
    }
    switch (jf_header->type) {
    case STORE_DATA:
        debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
        journalfile_restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
        break;
    default:
        error("DBENGINE: unknown transaction type, skipping record.");
        break;
    }

    return size_bytes;
}
  565. #define READAHEAD_BYTES (RRDENG_BLOCK_SIZE * 256)
  566. /*
  567. * Iterates journal file transactions and populates the page cache.
  568. * Page cache must already be initialized.
  569. * Returns the maximum transaction id it discovered.
  570. */
  571. static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile)
  572. {
  573. uv_file file;
  574. uint64_t file_size;
  575. int ret;
  576. uint64_t pos, pos_i, max_id, id;
  577. unsigned size_bytes;
  578. void *buf;
  579. uv_buf_t iov;
  580. uv_fs_t req;
  581. file = journalfile->file;
  582. file_size = journalfile->unsafe.pos;
  583. max_id = 1;
  584. ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
  585. if (unlikely(ret))
  586. fatal("DBENGINE: posix_memalign:%s", strerror(ret));
  587. for (pos = sizeof(struct rrdeng_jf_sb); pos < file_size; pos += READAHEAD_BYTES) {
  588. size_bytes = MIN(READAHEAD_BYTES, file_size - pos);
  589. iov = uv_buf_init(buf, size_bytes);
  590. ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
  591. if (ret < 0) {
  592. error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
  593. uv_fs_req_cleanup(&req);
  594. goto skip_file;
  595. }
  596. fatal_assert(req.result >= 0);
  597. uv_fs_req_cleanup(&req);
  598. ctx_io_read_op_bytes(ctx, size_bytes);
  599. for (pos_i = 0; pos_i < size_bytes;) {
  600. unsigned max_size;
  601. max_size = pos + size_bytes - pos_i;
  602. ret = journalfile_replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size);
  603. if (!ret) /* TODO: support transactions bigger than 4K */
  604. /* unknown transaction size, move on to the next block */
  605. pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE);
  606. else
  607. pos_i += ret;
  608. max_id = MAX(max_id, id);
  609. }
  610. }
  611. skip_file:
  612. posix_memfree(buf);
  613. return max_id;
  614. }
  615. // Checks that the extent list checksum is valid
  616. static int journalfile_check_v2_extent_list (void *data_start, size_t file_size)
  617. {
  618. UNUSED(file_size);
  619. uLong crc;
  620. struct journal_v2_header *j2_header = (void *) data_start;
  621. struct journal_v2_block_trailer *journal_v2_trailer;
  622. journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->extent_trailer_offset);
  623. crc = crc32(0L, Z_NULL, 0);
  624. crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list));
  625. if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
  626. error("DBENGINE: extent list CRC32 check: FAILED");
  627. return 1;
  628. }
  629. return 0;
  630. }
  631. // Checks that the metric list (UUIDs) checksum is valid
  632. static int journalfile_check_v2_metric_list(void *data_start, size_t file_size)
  633. {
  634. UNUSED(file_size);
  635. uLong crc;
  636. struct journal_v2_header *j2_header = (void *) data_start;
  637. struct journal_v2_block_trailer *journal_v2_trailer;
  638. journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->metric_trailer_offset);
  639. crc = crc32(0L, Z_NULL, 0);
  640. crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list));
  641. if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
  642. error("DBENGINE: metric list CRC32 check: FAILED");
  643. return 1;
  644. }
  645. return 0;
  646. }
  647. //
  648. // Return
  649. // 0 Ok
  650. // 1 Invalid
  651. // 2 Force rebuild
  652. // 3 skip
// Validate a fully mmapped v2 journal file: magic, recorded sizes, header CRC,
// extent/metric list CRCs and, optionally, every metric's page chain.
// Return codes are documented above (0 ok, 1 invalid, 2 rebuild, 3 skip).
static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size, size_t journal_v1_file_size)
{
int rc;
uLong crc;
struct journal_v2_header *j2_header = (void *) data_start;
struct journal_v2_block_trailer *journal_v2_trailer;
// Files flagged by a previous pass carry special magics: force a rebuild or skip.
if (j2_header->magic == JOURVAL_V2_REBUILD_MAGIC)
return 2;
if (j2_header->magic == JOURVAL_V2_SKIP_MAGIC)
return 3;
// Magic failure
if (j2_header->magic != JOURVAL_V2_MAGIC)
return 1;
// The header records the expected size of this file and of the v1 journal it
// was built from; any mismatch means the pair is stale or truncated.
if (j2_header->journal_v2_file_size != journal_v2_file_size)
return 1;
if (journal_v1_file_size && j2_header->journal_v1_file_size != journal_v1_file_size)
return 1;
// The trailer at the very end of the file holds the CRC of the header itself.
journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + journal_v2_file_size - sizeof(*journal_v2_trailer));
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, (void *) j2_header, sizeof(*j2_header));
rc = crc32cmp(journal_v2_trailer->checksum, crc);
if (unlikely(rc)) {
error("DBENGINE: file CRC32 check: FAILED");
return 1;
}
rc = journalfile_check_v2_extent_list(data_start, journal_v2_file_size);
if (rc) return 1;
rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size);
if (rc) return 1;
// Deep verification below is optional (controlled by db_engine_journal_check).
if (!db_engine_journal_check)
return 0;
// Verify complete UUID chain
struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset);
unsigned verified = 0;
unsigned entries;
unsigned total_pages = 0;
info("DBENGINE: checking %u metrics that exist in the journal", j2_header->metric_count);
for (entries = 0; entries < j2_header->metric_count; entries++) {
char uuid_str[UUID_STR_LEN];
uuid_unparse_lower(metric->uuid, uuid_str);
// The page header CRC was computed with its 'crc' field preset to the magic
// (see journalfile_v2_write_data_page_header), so reproduce that layout in a
// local copy before checksumming.
struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset);
struct journal_page_header local_metric_list_header = *metric_list_header;
local_metric_list_header.crc = JOURVAL_V2_MAGIC;
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, (void *) &local_metric_list_header, sizeof(local_metric_list_header));
rc = crc32cmp(metric_list_header->checksum, crc);
if (!rc) {
// Page header is good; now verify the CRC of the page descriptors that follow it.
struct journal_v2_block_trailer *journal_trailer =
(void *) data_start + metric->page_offset + sizeof(struct journal_page_header) + (metric_list_header->entries * sizeof(struct journal_page_list));
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, (uint8_t *) metric_list_header + sizeof(struct journal_page_header), metric_list_header->entries * sizeof(struct journal_page_list));
rc = crc32cmp(journal_trailer->checksum, crc);
internal_error(rc, "DBENGINE: index %u : %s entries %u at offset %u verified, DATA CRC computed %lu, stored %u", entries, uuid_str, metric->entries, metric->page_offset,
crc, metric_list_header->crc);
if (!rc) {
total_pages += metric_list_header->entries;
verified++;
}
}
metric++;
// Abort if the metric list would run past the end of the mapped file.
if ((uint32_t)((uint8_t *) metric - (uint8_t *) data_start) > (uint32_t) journal_v2_file_size) {
info("DBENGINE: verification failed EOF reached -- total entries %u, verified %u", entries, verified);
return 1;
}
}
// Every metric must have passed both the header and the data CRC check.
if (entries != verified) {
info("DBENGINE: verification failed -- total entries %u, verified %u", entries, verified);
return 1;
}
info("DBENGINE: verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages);
return 0;
}
  725. void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile) {
  726. usec_t started_ut = now_monotonic_usec();
  727. size_t data_size = 0;
  728. struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, &data_size, 0, 0);
  729. if(!j2_header)
  730. return;
  731. uint8_t *data_start = (uint8_t *)j2_header;
  732. uint32_t entries = j2_header->metric_count;
  733. struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
  734. time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
  735. time_t now_s = max_acceptable_collected_time();
  736. for (size_t i=0; i < entries; i++) {
  737. time_t start_time_s = header_start_time_s + metric->delta_start_s;
  738. time_t end_time_s = header_start_time_s + metric->delta_end_s;
  739. update_metric_retention_and_granularity_by_uuid(
  740. ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s);
  741. metric++;
  742. }
  743. journalfile_v2_data_release(journalfile);
  744. usec_t ended_ut = now_monotonic_usec();
  745. info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms"
  746. , ctx->config.tier, journalfile->datafile->fileno
  747. , (double)data_size / 1024 / 1024
  748. , (double)entries / 1000
  749. , ((double)(ended_ut - started_ut) / USEC_PER_MS)
  750. );
  751. }
// Load (mmap and validate) the v2 journal index that pairs with 'datafile'.
// Returns 0 on success -- in that case the mapping and the fd are retained and
// handed to journalfile_v2_data_set(). Non-zero means the file is unusable
// (return codes 2/3 propagate journalfile_v2_validate's rebuild/skip verdict).
int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
int ret, fd;
char path_v1[RRDENG_PATH_MAX];
char path_v2[RRDENG_PATH_MAX];
struct stat statbuf;
size_t journal_v1_file_size = 0;
size_t journal_v2_file_size;
// The size of the (optional) v1 journal is cross-checked against the size
// recorded inside the v2 header during validation.
journalfile_v1_generate_path(datafile, path_v1, sizeof(path_v1));
ret = stat(path_v1, &statbuf);
if (!ret)
journal_v1_file_size = (uint32_t)statbuf.st_size;
journalfile_v2_generate_path(datafile, path_v2, sizeof(path_v2));
fd = open(path_v2, O_RDONLY);
if (fd < 0) {
// A missing index is expected (not built yet) -- no filesystem error counter.
if (errno == ENOENT)
return 1;
ctx_fs_error(ctx);
error("DBENGINE: failed to open '%s'", path_v2);
return 1;
}
ret = fstat(fd, &statbuf);
if (ret) {
error("DBENGINE: failed to get file information for '%s'", path_v2);
close(fd);
return 1;
}
journal_v2_file_size = (size_t)statbuf.st_size;
// A file smaller than the header cannot possibly be a valid index.
if (journal_v2_file_size < sizeof(struct journal_v2_header)) {
error_report("Invalid file %s. Not the expected size", path_v2);
close(fd);
return 1;
}
usec_t mmap_start_ut = now_monotonic_usec();
// Read-only mapping: the v2 index is immutable once written.
uint8_t *data_start = mmap(NULL, journal_v2_file_size, PROT_READ, MAP_SHARED, fd, 0);
if (data_start == MAP_FAILED) {
close(fd);
return 1;
}
info("DBENGINE: checking integrity of '%s'", path_v2);
usec_t validation_start_ut = now_monotonic_usec();
int rc = journalfile_v2_validate(data_start, journal_v2_file_size, journal_v1_file_size);
if (unlikely(rc)) {
if (rc == 2)
error_report("File %s needs to be rebuilt", path_v2);
else if (rc == 3)
error_report("File %s will be skipped", path_v2);
else
error_report("File %s is invalid and it will be rebuilt", path_v2);
if (unlikely(munmap(data_start, journal_v2_file_size)))
error("DBENGINE: failed to unmap '%s'", path_v2);
close(fd);
return rc;
}
struct journal_v2_header *j2_header = (void *) data_start;
uint32_t entries = j2_header->metric_count;
// An index with zero metrics is useless -- treat it like an invalid file.
if (unlikely(!entries)) {
if (unlikely(munmap(data_start, journal_v2_file_size)))
error("DBENGINE: failed to unmap '%s'", path_v2);
close(fd);
return 1;
}
usec_t finished_ut = now_monotonic_usec();
info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, "
"mmap: %0.2f ms, validate: %0.2f ms"
, path_v2
, (double)journal_v2_file_size / 1024 / 1024
, (double)entries / 1000
, ((double)(validation_start_ut - mmap_start_ut) / USEC_PER_MS)
, ((double)(finished_ut - validation_start_ut) / USEC_PER_MS)
);
// Initialize the journal file to be able to access the data.
// Ownership of 'fd' and of the mapping transfers to the journalfile here,
// which is why neither is closed/unmapped on this success path.
journalfile_v2_data_set(journalfile, fd, data_start, journal_v2_file_size);
ctx_current_disk_space_increase(ctx, journal_v2_file_size);
// File is OK load it
return 0;
}
// Thin wrapper so metric entries can be qsort()ed by UUID before being
// written to the v2 index (see journalfile_metric_compare).
struct journal_metric_list_to_sort {
struct jv2_metrics_info *metric_info;
};
  832. static int journalfile_metric_compare (const void *item1, const void *item2)
  833. {
  834. const struct jv2_metrics_info *metric1 = ((struct journal_metric_list_to_sort *) item1)->metric_info;
  835. const struct jv2_metrics_info *metric2 = ((struct journal_metric_list_to_sort *) item2)->metric_info;
  836. return memcmp(metric1->uuid, metric2->uuid, sizeof(uuid_t));
  837. }
  838. // Write list of extents for the journalfile
  839. void *journalfile_v2_write_extent_list(Pvoid_t JudyL_extents_pos, void *data)
  840. {
  841. Pvoid_t *PValue;
  842. struct journal_extent_list *j2_extent_base = (void *) data;
  843. struct jv2_extents_info *ext_info;
  844. bool first = true;
  845. Word_t pos = 0;
  846. size_t count = 0;
  847. while ((PValue = JudyLFirstThenNext(JudyL_extents_pos, &pos, &first))) {
  848. ext_info = *PValue;
  849. size_t index = ext_info->index;
  850. j2_extent_base[index].file_index = 0;
  851. j2_extent_base[index].datafile_offset = ext_info->pos;
  852. j2_extent_base[index].datafile_size = ext_info->bytes;
  853. j2_extent_base[index].pages = ext_info->number_of_pages;
  854. count++;
  855. }
  856. return j2_extent_base + count;
  857. }
  858. static int journalfile_verify_space(struct journal_v2_header *j2_header, void *data, uint32_t bytes)
  859. {
  860. if ((unsigned long)(((uint8_t *) data - (uint8_t *) j2_header->data) + bytes) > (j2_header->journal_v2_file_size - sizeof(struct journal_v2_block_trailer)))
  861. return 1;
  862. return 0;
  863. }
  864. void *journalfile_v2_write_metric_page(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info, uint32_t pages_offset)
  865. {
  866. struct journal_metric_list *metric = (void *) data;
  867. if (journalfile_verify_space(j2_header, data, sizeof(*metric)))
  868. return NULL;
  869. uuid_copy(metric->uuid, *metric_info->uuid);
  870. metric->entries = metric_info->number_of_pages;
  871. metric->page_offset = pages_offset;
  872. metric->delta_start_s = (uint32_t)(metric_info->first_time_s - (time_t)(j2_header->start_time_ut / USEC_PER_SEC));
  873. metric->delta_end_s = (uint32_t)(metric_info->last_time_s - (time_t)(j2_header->start_time_ut / USEC_PER_SEC));
  874. metric->update_every_s = 0;
  875. return ++metric;
  876. }
// Write the page-list header for one metric and checksum it.
// The CRC is computed with the 'crc' field preset to the magic value; any
// validator must reproduce that layout (see journalfile_v2_validate()).
// Returns the position right after the header.
void *journalfile_v2_write_data_page_header(struct journal_v2_header *j2_header __maybe_unused, void *data, struct jv2_metrics_info *metric_info, uint32_t uuid_offset)
{
struct journal_page_header *data_page_header = (void *) data;
uLong crc;
uuid_copy(data_page_header->uuid, *metric_info->uuid);
data_page_header->entries = metric_info->number_of_pages;
data_page_header->uuid_offset = uuid_offset; // data header OFFSET points to METRIC in the directory
data_page_header->crc = JOURVAL_V2_MAGIC;
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, (void *) data_page_header, sizeof(*data_page_header));
crc32set(data_page_header->checksum, crc);
return ++data_page_header;
}
  890. void *journalfile_v2_write_data_page_trailer(struct journal_v2_header *j2_header __maybe_unused, void *data, void *page_header)
  891. {
  892. struct journal_page_header *data_page_header = (void *) page_header;
  893. struct journal_v2_block_trailer *journal_trailer = (void *) data;
  894. uLong crc;
  895. crc = crc32(0L, Z_NULL, 0);
  896. crc = crc32(crc, (uint8_t *) page_header + sizeof(struct journal_page_header), data_page_header->entries * sizeof(struct journal_page_list));
  897. crc32set(journal_trailer->checksum, crc);
  898. return ++journal_trailer;
  899. }
  900. void *journalfile_v2_write_data_page(struct journal_v2_header *j2_header, void *data, struct jv2_page_info *page_info)
  901. {
  902. struct journal_page_list *data_page = data;
  903. if (journalfile_verify_space(j2_header, data, sizeof(*data_page)))
  904. return NULL;
  905. struct extent_io_data *ei = page_info->custom_data;
  906. data_page->delta_start_s = (uint32_t) (page_info->start_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC);
  907. data_page->delta_end_s = (uint32_t) (page_info->end_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC);
  908. data_page->extent_index = page_info->extent_index;
  909. data_page->update_every_s = (uint32_t) page_info->update_every_s;
  910. data_page->page_length = (uint16_t) (ei ? ei->page_length : page_info->page_length);
  911. data_page->type = 0;
  912. return ++data_page;
  913. }
// Must be recorded in metric_info->entries
// Write one journal_page_list descriptor for every page of this metric,
// ordered by page start time. Also records the metric's update_every_s (taken
// from the last page written) into its directory entry. Returns the position
// after the last descriptor, or NULL (propagated from
// journalfile_v2_write_data_page) when the file space is exhausted.
static void *journalfile_v2_write_descriptors(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info,
struct journal_metric_list *current_metric)
{
Pvoid_t *PValue;
struct journal_page_list *data_page = (void *)data;
// We need to write all descriptors with index metric_info->min_index_time_s, metric_info->max_index_time_s
// that belong to this journal file
Pvoid_t JudyL_array = metric_info->JudyL_pages_by_start_time;
Word_t index_time = 0;
bool first = true;
struct jv2_page_info *page_info;
uint32_t update_every_s = 0;
// Pages are keyed by start time, so this walk emits them chronologically.
while ((PValue = JudyLFirstThenNext(JudyL_array, &index_time, &first))) {
page_info = *PValue;
// Write one descriptor and return the next data page location
data_page = journalfile_v2_write_data_page(j2_header, (void *) data_page, page_info);
update_every_s = (uint32_t) page_info->update_every_s;
if (NULL == data_page)
break;
}
// Backfill the directory entry (it was written with update_every_s == 0).
current_metric->update_every_s = update_every_s;
return data_page;
}
  938. // Migrate the journalfile pointed by datafile
  939. // activate : make the new file active immediately
// journalfile data will be set and descriptors (if deleted) will be repopulated as needed
  941. // startup : if the migration is done during agent startup
  942. // this will allow us to optimize certain things
// Build the v2 (indexed, mmapped) journal for one datafile from the page/extent
// information collected by the open cache. Layout written, in order:
// header | extent list + trailer | sorted metric list + trailer |
// per-metric (page header, page descriptors, trailer) | file trailer.
// On success the new index is activated via journalfile_v2_data_set(); on
// failure the file is stamped with JOURVAL_V2_SKIP_MAGIC and truncated.
void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_unused, uint8_t type __maybe_unused,
Pvoid_t JudyL_metrics, Pvoid_t JudyL_extents_pos,
size_t number_of_extents, size_t number_of_metrics, size_t number_of_pages, void *user_data)
{
char path[RRDENG_PATH_MAX];
Pvoid_t *PValue;
struct rrdengine_instance *ctx = (struct rrdengine_instance *) section;
struct rrdengine_journalfile *journalfile = (struct rrdengine_journalfile *) user_data;
struct rrdengine_datafile *datafile = journalfile->datafile;
time_t min_time_s = LONG_MAX;
time_t max_time_s = 0;
struct jv2_metrics_info *metric_info;
journalfile_v2_generate_path(datafile, path, sizeof(path));
info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu",
path,
number_of_extents,
number_of_metrics,
number_of_pages);
#ifdef NETDATA_INTERNAL_CHECKS
usec_t start_loading = now_monotonic_usec();
#endif
// Compute every section offset up front so the file can be sized exactly.
size_t total_file_size = 0;
total_file_size += (sizeof(struct journal_v2_header) + JOURNAL_V2_HEADER_PADDING_SZ);
// Extents will start here
uint32_t extent_offset = total_file_size;
total_file_size += (number_of_extents * sizeof(struct journal_extent_list));
uint32_t extent_offset_trailer = total_file_size;
total_file_size += sizeof(struct journal_v2_block_trailer);
// UUID list will start here
uint32_t metrics_offset = total_file_size;
total_file_size += (number_of_metrics * sizeof(struct journal_metric_list));
// UUID list trailer
uint32_t metric_offset_trailer = total_file_size;
total_file_size += sizeof(struct journal_v2_block_trailer);
// descr @ time will start here
uint32_t pages_offset = total_file_size;
total_file_size += (number_of_pages * (sizeof(struct journal_page_list) + sizeof(struct journal_page_header) + sizeof(struct journal_v2_block_trailer)));
// File trailer
uint32_t trailer_offset = total_file_size;
total_file_size += sizeof(struct journal_v2_block_trailer);
int fd_v2;
// NOTE(review): the netdata_mmap() result is used without a NULL check --
// confirm it aborts internally on failure.
uint8_t *data_start = netdata_mmap(path, total_file_size, MAP_SHARED, 0, false, &fd_v2);
uint8_t *data = data_start;
// Zero everything up to the extent list (header + padding).
memset(data_start, 0, extent_offset);
// Write header
struct journal_v2_header j2_header;
memset(&j2_header, 0, sizeof(j2_header));
j2_header.magic = JOURVAL_V2_MAGIC;
j2_header.start_time_ut = 0;
j2_header.end_time_ut = 0;
j2_header.extent_count = number_of_extents;
j2_header.extent_offset = extent_offset;
j2_header.metric_count = number_of_metrics;
j2_header.metric_offset = metrics_offset;
j2_header.page_count = number_of_pages;
j2_header.page_offset = pages_offset;
j2_header.extent_trailer_offset = extent_offset_trailer;
j2_header.metric_trailer_offset = metric_offset_trailer;
j2_header.journal_v2_file_size = total_file_size;
j2_header.journal_v1_file_size = (uint32_t)journalfile_current_size(journalfile);
j2_header.data = data_start; // Used during migration
struct journal_v2_block_trailer *journal_v2_trailer;
data = journalfile_v2_write_extent_list(JudyL_extents_pos, data_start + extent_offset);
internal_error(true, "DBENGINE: write extent list so far %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
fatal_assert(data == data_start + extent_offset_trailer);
// Calculate CRC for extents
journal_v2_trailer = (struct journal_v2_block_trailer *) (data_start + extent_offset_trailer);
uLong crc;
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, (uint8_t *) data_start + extent_offset, number_of_extents * sizeof(struct journal_extent_list));
crc32set(journal_v2_trailer->checksum, crc);
internal_error(true, "DBENGINE: CALCULATE CRC FOR EXTENT %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
// Skip the trailer, point to the metrics off
data += sizeof(struct journal_v2_block_trailer);
// Sanity check -- we must be at the metrics_offset
fatal_assert(data == data_start + metrics_offset);
// Allocate array to sort UUIDs and keep them sorted in the journal because we want to do binary search when we do lookups
struct journal_metric_list_to_sort *uuid_list = mallocz(number_of_metrics * sizeof(struct journal_metric_list_to_sort));
Word_t Index = 0;
size_t count = 0;
bool first_then_next = true;
// Collect every metric and track the overall time range of the file.
while ((PValue = JudyLFirstThenNext(JudyL_metrics, &Index, &first_then_next))) {
metric_info = *PValue;
fatal_assert(count < number_of_metrics);
uuid_list[count++].metric_info = metric_info;
min_time_s = MIN(min_time_s, metric_info->first_time_s);
max_time_s = MAX(max_time_s, metric_info->last_time_s);
}
// Store in the header
j2_header.start_time_ut = min_time_s * USEC_PER_SEC;
j2_header.end_time_ut = max_time_s * USEC_PER_SEC;
qsort(&uuid_list[0], number_of_metrics, sizeof(struct journal_metric_list_to_sort), journalfile_metric_compare);
internal_error(true, "DBENGINE: traverse and qsort UUID %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
uint32_t resize_file_to = total_file_size;
// Write the sorted metric directory, and for each metric its page list
// (header, descriptors, trailer) at the rolling 'pages_offset'.
for (Index = 0; Index < number_of_metrics; Index++) {
metric_info = uuid_list[Index].metric_info;
// Calculate current UUID offset from start of file. We will store this in the data page header
uint32_t uuid_offset = data - data_start;
struct journal_metric_list *current_metric = (void *) data;
// Write the UUID we are processing
data = (void *) journalfile_v2_write_metric_page(&j2_header, data, metric_info, pages_offset);
if (unlikely(!data))
break;
// Next we will write
// Header
// Detailed entries (descr @ time)
// Trailer (checksum)
// Keep the page_list_header, to be used for migration when where agent is running
metric_info->page_list_header = pages_offset;
// Write page header
void *metric_page = journalfile_v2_write_data_page_header(&j2_header, data_start + pages_offset, metric_info,
uuid_offset);
// Start writing descr @ time
void *page_trailer = journalfile_v2_write_descriptors(&j2_header, metric_page, metric_info, current_metric);
if (unlikely(!page_trailer))
break;
// Trailer (checksum)
uint8_t *next_page_address = journalfile_v2_write_data_page_trailer(&j2_header, page_trailer,
data_start + pages_offset);
// Calculate start of the pages start for next descriptor
pages_offset += (metric_info->number_of_pages * (sizeof(struct journal_page_list)) + sizeof(struct journal_page_header) + sizeof(struct journal_v2_block_trailer));
// Verify we are at the right location
if (pages_offset != (uint32_t)(next_page_address - data_start)) {
// make sure checks fail so that we abort
data = data_start;
break;
}
}
// Success iff the directory cursor landed exactly at the metric trailer.
if (data == data_start + metric_offset_trailer) {
internal_error(true, "DBENGINE: WRITE METRICS AND PAGES %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
// Calculate CRC for metrics
journal_v2_trailer = (struct journal_v2_block_trailer *)(data_start + metric_offset_trailer);
crc = crc32(0L, Z_NULL, 0);
crc =
crc32(crc, (uint8_t *)data_start + metrics_offset, number_of_metrics * sizeof(struct journal_metric_list));
crc32set(journal_v2_trailer->checksum, crc);
internal_error(true, "DBENGINE: CALCULATE CRC FOR UUIDs %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
// Prepare to write checksum for the file
// The 'data' pointer is migration-only state; clear it so the header CRC
// matches what journalfile_v2_validate() computes at load time.
j2_header.data = NULL;
journal_v2_trailer = (struct journal_v2_block_trailer *)(data_start + trailer_offset);
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, (void *)&j2_header, sizeof(j2_header));
crc32set(journal_v2_trailer->checksum, crc);
// Write header to the file
memcpy(data_start, &j2_header, sizeof(j2_header));
internal_error(true, "DBENGINE: FILE COMPLETED --------> %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size);
// msync(data_start, total_file_size, MS_SYNC);
// Activate the new index: ownership of fd_v2 and the mapping transfers here.
journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size);
internal_error(true, "DBENGINE: ACTIVATING NEW INDEX JNL %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
ctx_current_disk_space_increase(ctx, total_file_size);
freez(uuid_list);
return;
}
else {
// Mark the file so the next load skips it instead of trying to validate it.
info("DBENGINE: failed to build index '%s', file will be skipped", path);
j2_header.data = NULL;
j2_header.magic = JOURVAL_V2_SKIP_MAGIC;
memcpy(data_start, &j2_header, sizeof(j2_header));
resize_file_to = sizeof(j2_header);
}
netdata_munmap(data_start, total_file_size);
freez(uuid_list);
if (likely(resize_file_to == total_file_size))
return;
// Shrink the skipped file to just the (skip-stamped) header.
int ret = truncate(path, (long) resize_file_to);
if (ret < 0) {
ctx_current_disk_space_increase(ctx, total_file_size);
ctx_fs_error(ctx);
error("DBENGINE: failed to resize file '%s'", path);
}
else
ctx_current_disk_space_increase(ctx, resize_file_to);
}
// Load the journal for 'datafile': prefer the v2 index when one exists,
// otherwise replay the v1 write-ahead journal and (usually) migrate it to v2.
// Returns 0 on success, or a non-zero error code from the failing step.
int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
struct rrdengine_datafile *datafile)
{
uv_fs_t req;
uv_file file;
int ret, fd, error;
uint64_t file_size, max_id;
char path[RRDENG_PATH_MAX];
bool loaded_v2 = false;
// Do not try to load jv2 of the latest file
if (datafile->fileno != ctx_last_fileno_get(ctx))
loaded_v2 = journalfile_v2_load(ctx, journalfile, datafile) == 0;
journalfile_v1_generate_path(datafile, path, sizeof(path));
fd = open_file_for_io(path, O_RDWR, &file, use_direct_io);
if (fd < 0) {
ctx_fs_error(ctx);
// The v2 index alone is enough; a missing v1 file is only fatal when the
// v2 index was not loaded either.
if(loaded_v2)
return 0;
return fd;
}
ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
if (ret) {
error = ret;
goto cleanup;
}
if(loaded_v2) {
// v2 already indexed this file -- just record the v1 size, skip the replay.
journalfile->unsafe.pos = file_size;
error = 0;
goto cleanup;
}
file_size = ALIGN_BYTES_FLOOR(file_size);
journalfile->unsafe.pos = file_size;
journalfile->file = file;
ret = journalfile_check_superblock(file);
if (ret) {
info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path);
error = ret;
goto cleanup;
}
ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_jf_sb));
info("DBENGINE: loading journal file '%s'", path);
// Replay the transactions to rebuild the in-memory index, and make sure new
// transaction ids continue after the highest id found in this file.
max_id = journalfile_iterate_transactions(ctx, journalfile);
__atomic_store_n(&ctx->atomic.transaction_id, MAX(__atomic_load_n(&ctx->atomic.transaction_id, __ATOMIC_RELAXED), max_id + 1), __ATOMIC_RELAXED);
info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size);
// NOTE: on the success paths below 'file' is intentionally left open -- it
// was stored in journalfile->file above and is used for subsequent I/O.
bool is_last_file = (ctx_last_fileno_get(ctx) == journalfile->datafile->fileno);
if (is_last_file && journalfile->datafile->pos <= rrdeng_target_data_file_size(ctx) / 3) {
// The last datafile is still small enough to keep appending to -- do not
// migrate it to v2 and do not start a new datafile pair.
ctx->loading.create_new_datafile_pair = false;
return 0;
}
// This journal is effectively read-only now: build its v2 index.
pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);
if (is_last_file)
ctx->loading.create_new_datafile_pair = true;
return 0;
cleanup:
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
return error;
}