journalfile.c 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdengine.h"
  3. // DBENGINE2: Helper
// Register (or extend) retention for the metric identified by `uuid` in the
// main metrics registry (MRG), after sanitizing the on-disk timestamps:
//  - a last_time_s in the future is clamped to now_s
//  - a first_time_s after last_time_s is clamped to last_time_s
//  - zero timestamps are only reported, then used as-is
static void update_metric_retention_and_granularity_by_uuid(
        struct rrdengine_instance *ctx, uuid_t *uuid,
        time_t first_time_s, time_t last_time_s,
        time_t update_every_s, time_t now_s)
{
    if(unlikely(last_time_s > now_s)) {
        // rate-limited logging: at most 1 message per second
        error_limit_static_global_var(erl, 1, 0);
        error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
                          "fixing last time to now",
                    first_time_s, last_time_s, now_s);
        last_time_s = now_s;
    }

    if (unlikely(first_time_s > last_time_s)) {
        error_limit_static_global_var(erl, 1, 0);
        error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
                          "fixing first time to last time",
                    first_time_s, last_time_s, now_s);
        first_time_s = last_time_s;
    }

    if (unlikely(first_time_s == 0 || last_time_s == 0)) {
        error_limit_static_global_var(erl, 1, 0);
        error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
                          "using them as-is",
                    first_time_s, last_time_s, now_s);
    }

    bool added = false;
    METRIC *metric = mrg_metric_get_and_acquire(main_mrg, uuid, (Word_t) ctx);
    if (!metric) {
        // metric not known yet - try to add it with the sanitized retention
        MRG_ENTRY entry = {
            .section = (Word_t) ctx,
            .first_time_s = first_time_s,
            .last_time_s = last_time_s,
            .latest_update_every_s = update_every_s
        };
        uuid_copy(entry.uuid, *uuid);
        metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
    }

    // if the metric already existed (either from the start, or because a
    // concurrent add won the race), expand its retention instead
    if (likely(!added))
        mrg_metric_expand_retention(main_mrg, metric, first_time_s, last_time_s, update_every_s);

    mrg_metric_release(main_mrg, metric);
}
  45. static void after_extent_write_journalfile_v1_io(uv_fs_t* req)
  46. {
  47. worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB);
  48. WAL *wal = req->data;
  49. struct generic_io_descriptor *io_descr = &wal->io_descr;
  50. struct rrdengine_instance *ctx = io_descr->ctx;
  51. debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
  52. if (req->result < 0) {
  53. ctx_io_error(ctx);
  54. error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
  55. } else {
  56. debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
  57. }
  58. uv_fs_req_cleanup(req);
  59. wal_release(wal);
  60. __atomic_sub_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED);
  61. worker_is_idle();
  62. }
/* Careful to always call this before creating a new journal file */
// Queue an asynchronous write of the WAL buffer to the v1 journal file.
// The file position is reserved atomically under the journalfile spinlock,
// so concurrent writers get non-overlapping regions; the actual write
// completes in after_extent_write_journalfile_v1_io().
void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, WAL *wal, uv_loop_t *loop)
{
    int ret;
    struct generic_io_descriptor *io_descr;
    struct rrdengine_journalfile *journalfile = datafile->journalfile;

    io_descr = &wal->io_descr;
    io_descr->ctx = ctx;
    if (wal->size < wal->buf_size) {
        /* simulate an empty transaction to skip the rest of the block */
        *(uint8_t *) (wal->buf + wal->size) = STORE_PADDING;
    }
    io_descr->buf = wal->buf;
    io_descr->bytes = wal->buf_size;

    // reserve this block's region in the journal file; the full buffer
    // size is always written (padded), keeping blocks aligned
    netdata_spinlock_lock(&journalfile->unsafe.spinlock);
    io_descr->pos = journalfile->unsafe.pos;
    journalfile->unsafe.pos += wal->buf_size;
    netdata_spinlock_unlock(&journalfile->unsafe.spinlock);

    io_descr->req.data = wal;
    io_descr->data = journalfile;
    io_descr->completion = NULL;

    io_descr->iov = uv_buf_init((void *)io_descr->buf, wal->buf_size);
    ret = uv_fs_write(loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
                      (int64_t)io_descr->pos, after_extent_write_journalfile_v1_io);
    fatal_assert(-1 != ret);

    ctx_current_disk_space_increase(ctx, wal->buf_size);
    ctx_io_write_op_bytes(ctx, wal->buf_size);
}
  91. void journalfile_v2_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
  92. {
  93. (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2,
  94. datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
  95. }
  96. void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
  97. {
  98. (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
  99. datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
  100. }
// Return a pointer to the mmap()ed v2 header of this journal file, mapping
// it on demand. On success, optionally reports the mapped size via
// data_size. Returns NULL if the file cannot be mapped.
static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) {
    struct journal_v2_header *j2_header = NULL;

    netdata_spinlock_lock(&journalfile->mmap.spinlock);

    if(!journalfile->mmap.data) {
        // not currently mapped (never, or previously unmounted) - map it read-only
        journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0);
        if (journalfile->mmap.data == MAP_FAILED) {
            internal_fatal(true, "DBENGINE: failed to re-mmap() journal file v2");

            // mapping failed - close the fd and mark the index as neither
            // available nor mounted, so no one tries to use it again
            close(journalfile->mmap.fd);
            journalfile->mmap.fd = -1;
            journalfile->mmap.data = NULL;
            journalfile->mmap.size = 0;

            netdata_spinlock_lock(&journalfile->v2.spinlock);
            journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED);
            netdata_spinlock_unlock(&journalfile->v2.spinlock);

            ctx_fs_error(journalfile->datafile->ctx);
        }
        else {
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_mapped, 1, __ATOMIC_RELAXED);

            // advise the kernel about how this mapping will be used
            madvise_dontfork(journalfile->mmap.data, journalfile->mmap.size);
            madvise_dontdump(journalfile->mmap.data, journalfile->mmap.size);
            madvise_random(journalfile->mmap.data, journalfile->mmap.size);
            madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size);

            netdata_spinlock_lock(&journalfile->v2.spinlock);
            journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;
            netdata_spinlock_unlock(&journalfile->v2.spinlock);
        }
    }

    if(journalfile->mmap.data) {
        j2_header = journalfile->mmap.data;

        if (data_size)
            *data_size = journalfile->mmap.size;
    }

    netdata_spinlock_unlock(&journalfile->mmap.spinlock);

    return j2_header;
}
// Unmap the v2 journal data, but only if no one holds a reference to it.
// have_locks: true when the caller already holds BOTH the mmap and v2
//             spinlocks (they are then neither taken nor released here).
// wait:       when false, use trylock and give up instead of blocking.
// Returns true when the mapping is gone (or there was nothing mapped).
static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *journalfile, bool have_locks, bool wait) {
    bool unmounted = false;

    if(!have_locks) {
        // lock ordering: mmap.spinlock first, then v2.spinlock
        if(!wait) {
            if (!netdata_spinlock_trylock(&journalfile->mmap.spinlock))
                return false;
        }
        else
            netdata_spinlock_lock(&journalfile->mmap.spinlock);

        if(!wait) {
            if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) {
                netdata_spinlock_unlock(&journalfile->mmap.spinlock);
                return false;
            }
        }
        else
            netdata_spinlock_lock(&journalfile->v2.spinlock);
    }

    if(!journalfile->v2.refcount) {
        // no readers - safe to drop the mapping
        if(journalfile->mmap.data) {
            if (munmap(journalfile->mmap.data, journalfile->mmap.size)) {
                char path[RRDENG_PATH_MAX];
                journalfile_v2_generate_path(journalfile->datafile, path, sizeof(path));
                error("DBENGINE: failed to unmap index file '%s'", path);
                internal_fatal(true, "DBENGINE: failed to unmap file '%s'", path);
                ctx_fs_error(journalfile->datafile->ctx);
            }
            else {
                __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_unmapped, 1, __ATOMIC_RELAXED);
                journalfile->mmap.data = NULL;
                journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_MOUNTED;
            }
        }

        // note: also true when there was no mapping to begin with
        unmounted = true;
    }

    if(!have_locks) {
        netdata_spinlock_unlock(&journalfile->v2.spinlock);
        netdata_spinlock_unlock(&journalfile->mmap.spinlock);
    }

    return unmounted;
}
// Periodic housekeeping: walk all tiers and datafiles, and unmount v2
// journal mappings that have had no references for at least 2 minutes.
// Every lock here is a trylock, so this never blocks behind other work.
void journalfile_v2_data_unmount_cleanup(time_t now_s) {
    // DO NOT WAIT ON ANY LOCK!!!

    for(size_t tier = 0; tier < (size_t)storage_tiers ;tier++) {
        struct rrdengine_instance *ctx = multidb_ctx[tier];
        if(!ctx) continue;

        struct rrdengine_datafile *datafile;
        if(uv_rwlock_tryrdlock(&ctx->datafiles.rwlock) != 0)
            continue;   // someone is modifying the datafiles list - skip this tier

        for (datafile = ctx->datafiles.first; datafile; datafile = datafile->next) {
            struct rrdengine_journalfile *journalfile = datafile->journalfile;
            if(!netdata_spinlock_trylock(&journalfile->v2.spinlock))
                continue;

            bool unmount = false;
            if (!journalfile->v2.refcount && (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED)) {
                // this journal has no references and it is mounted

                if (!journalfile->v2.not_needed_since_s)
                    journalfile->v2.not_needed_since_s = now_s;

                else if (now_s - journalfile->v2.not_needed_since_s >= 120)
                    // 2 minutes have passed since last use
                    unmount = true;
            }
            netdata_spinlock_unlock(&journalfile->v2.spinlock);

            // unmount outside the v2 spinlock; still best-effort (no waiting)
            if (unmount)
                journalfile_v2_mounted_data_unmount(journalfile, false, false);
        }
        uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
    }
}
// Acquire a reference to the v2 index of this journalfile, if it is
// available and its time range fully covers [wanted_first_time_s,
// wanted_last_time_s]. Passing 0 for both wanted times acquires it
// unconditionally (retention scan). Returns the mapped v2 header
// (mounting it on demand), or NULL. A non-NULL return must be paired
// with a journalfile_v2_data_release() call.
struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) {
    netdata_spinlock_lock(&journalfile->v2.spinlock);

    bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
    bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED);
    bool do_we_need_it = false;

    if(has_data) {
        if (!wanted_first_time_s || !wanted_last_time_s ||
            is_page_in_time_range(journalfile->v2.first_time_s, journalfile->v2.last_time_s,
                                  wanted_first_time_s, wanted_last_time_s) == PAGE_IS_IN_RANGE) {

            journalfile->v2.refcount++;

            do_we_need_it = true;

            // a retention-only acquisition (no wanted times) that triggers a
            // mount is flagged, so the release unmounts it again immediately
            if (!wanted_first_time_s && !wanted_last_time_s && !is_mounted)
                journalfile->v2.flags |= JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION;
            else
                journalfile->v2.flags &= ~JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION;
        }
    }
    netdata_spinlock_unlock(&journalfile->v2.spinlock);

    if(do_we_need_it)
        return journalfile_v2_mounted_data_get(journalfile, data_size);

    return NULL;
}
  227. void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
  228. netdata_spinlock_lock(&journalfile->v2.spinlock);
  229. internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data");
  230. internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile");
  231. bool unmount = false;
  232. journalfile->v2.refcount--;
  233. if(journalfile->v2.refcount == 0) {
  234. journalfile->v2.not_needed_since_s = 0;
  235. if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION)
  236. unmount = true;
  237. }
  238. netdata_spinlock_unlock(&journalfile->v2.spinlock);
  239. if(unmount)
  240. journalfile_v2_mounted_data_unmount(journalfile, false, true);
  241. }
  242. bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) {
  243. netdata_spinlock_lock(&journalfile->v2.spinlock);
  244. bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
  245. netdata_spinlock_unlock(&journalfile->v2.spinlock);
  246. return has_data;
  247. }
  248. size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) {
  249. netdata_spinlock_lock(&journalfile->mmap.spinlock);
  250. size_t data_size = journalfile->mmap.size;
  251. netdata_spinlock_unlock(&journalfile->mmap.spinlock);
  252. return data_size;
  253. }
// Install a freshly opened v2 index (fd + already-mapped journal_data of
// journal_data_size bytes) into the journalfile. Ownership of fd and the
// mapping transfers to the journalfile. The data is unmounted right away
// and lazily re-mapped on first acquire.
void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) {
    netdata_spinlock_lock(&journalfile->mmap.spinlock);
    netdata_spinlock_lock(&journalfile->v2.spinlock);

    internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd");
    internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data");
    internal_fatal(journalfile->v2.refcount, "DBENGINE JOURNALFILE: trying to re-set journal_data of referenced journalfile");

    journalfile->mmap.fd = fd;
    journalfile->mmap.data = journal_data;
    journalfile->mmap.size = journal_data_size;
    journalfile->v2.not_needed_since_s = now_monotonic_sec();
    journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;

    // cache the index's time range from the v2 header (stored in usec)
    struct journal_v2_header *j2_header = journalfile->mmap.data;
    journalfile->v2.first_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC);
    journalfile->v2.last_time_s = (time_t)(j2_header->end_time_ut / USEC_PER_SEC);

    // unmount immediately (we hold both locks, so pass have_locks = true)
    journalfile_v2_mounted_data_unmount(journalfile, true, true);

    netdata_spinlock_unlock(&journalfile->v2.spinlock);
    netdata_spinlock_unlock(&journalfile->mmap.spinlock);
}
// Permanently unmap and close the v2 index, clearing all its state.
// Spins (with 10ms naps) until every outstanding reference is released.
static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) {
    bool has_references = false;

    do {
        if (has_references)
            sleep_usec(10 * USEC_PER_MS);   // back off and retry

        netdata_spinlock_lock(&journalfile->mmap.spinlock);
        netdata_spinlock_lock(&journalfile->v2.spinlock);

        if(journalfile_v2_mounted_data_unmount(journalfile, true, true)) {
            // mapping is gone - release the fd and wipe the v2 state
            if(journalfile->mmap.fd != -1)
                close(journalfile->mmap.fd);

            journalfile->mmap.fd = -1;
            journalfile->mmap.data = NULL;
            journalfile->mmap.size = 0;
            journalfile->v2.first_time_s = 0;
            journalfile->v2.last_time_s = 0;
            journalfile->v2.flags = 0;
        }
        else {
            has_references = true;
            internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap...");
        }

        netdata_spinlock_unlock(&journalfile->v2.spinlock);
        netdata_spinlock_unlock(&journalfile->mmap.spinlock);

    } while(has_references);
}
  297. struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafile *datafile)
  298. {
  299. struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile));
  300. journalfile->datafile = datafile;
  301. netdata_spinlock_init(&journalfile->mmap.spinlock);
  302. netdata_spinlock_init(&journalfile->v2.spinlock);
  303. netdata_spinlock_init(&journalfile->unsafe.spinlock);
  304. journalfile->mmap.fd = -1;
  305. datafile->journalfile = journalfile;
  306. return journalfile;
  307. }
  308. static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
  309. {
  310. int ret;
  311. char path[RRDENG_PATH_MAX];
  312. uv_fs_t req;
  313. ret = uv_fs_close(NULL, &req, file, NULL);
  314. if (ret < 0) {
  315. journalfile_v1_generate_path(datafile, path, sizeof(path));
  316. error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
  317. ctx_fs_error(datafile->ctx);
  318. }
  319. uv_fs_req_cleanup(&req);
  320. return ret;
  321. }
  322. int journalfile_close(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
  323. {
  324. if(journalfile_v2_data_available(journalfile)) {
  325. journalfile_v2_data_unmap_permanently(journalfile);
  326. return 0;
  327. }
  328. return close_uv_file(datafile, journalfile->file);
  329. }
  330. int journalfile_unlink(struct rrdengine_journalfile *journalfile)
  331. {
  332. struct rrdengine_datafile *datafile = journalfile->datafile;
  333. struct rrdengine_instance *ctx = datafile->ctx;
  334. uv_fs_t req;
  335. int ret;
  336. char path[RRDENG_PATH_MAX];
  337. journalfile_v1_generate_path(datafile, path, sizeof(path));
  338. ret = uv_fs_unlink(NULL, &req, path, NULL);
  339. if (ret < 0) {
  340. error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  341. ctx_fs_error(ctx);
  342. }
  343. uv_fs_req_cleanup(&req);
  344. __atomic_add_fetch(&ctx->stats.journalfile_deletions, 1, __ATOMIC_RELAXED);
  345. return ret;
  346. }
  347. int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
  348. {
  349. struct rrdengine_instance *ctx = datafile->ctx;
  350. uv_fs_t req;
  351. int ret;
  352. char path[RRDENG_PATH_MAX];
  353. char path_v2[RRDENG_PATH_MAX];
  354. journalfile_v1_generate_path(datafile, path, sizeof(path));
  355. journalfile_v2_generate_path(datafile, path_v2, sizeof(path));
  356. if (journalfile->file) {
  357. ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
  358. if (ret < 0) {
  359. error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
  360. ctx_fs_error(ctx);
  361. }
  362. uv_fs_req_cleanup(&req);
  363. (void) close_uv_file(datafile, journalfile->file);
  364. }
  365. // This is the new journal v2 index file
  366. ret = uv_fs_unlink(NULL, &req, path_v2, NULL);
  367. if (ret < 0) {
  368. error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  369. ctx_fs_error(ctx);
  370. }
  371. uv_fs_req_cleanup(&req);
  372. ret = uv_fs_unlink(NULL, &req, path, NULL);
  373. if (ret < 0) {
  374. error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  375. ctx_fs_error(ctx);
  376. }
  377. uv_fs_req_cleanup(&req);
  378. __atomic_add_fetch(&ctx->stats.journalfile_deletions, 2, __ATOMIC_RELAXED);
  379. if(journalfile_v2_data_available(journalfile))
  380. journalfile_v2_data_unmap_permanently(journalfile);
  381. return ret;
  382. }
// Create a new, empty v1 journal file and write its superblock.
// Returns 0 on success or a negative error code; on a failed superblock
// write the half-created file is destroyed before returning.
int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
    struct rrdengine_instance *ctx = datafile->ctx;
    uv_fs_t req;
    uv_file file;
    int ret, fd;
    struct rrdeng_jf_sb *superblock;
    uv_buf_t iov;
    char path[RRDENG_PATH_MAX];

    journalfile_v1_generate_path(datafile, path, sizeof(path));
    fd = open_file_for_io(path, O_CREAT | O_RDWR | O_TRUNC, &file, use_direct_io);
    if (fd < 0) {
        ctx_fs_error(ctx);
        return fd;
    }
    journalfile->file = file;
    __atomic_add_fetch(&ctx->stats.journalfile_creations, 1, __ATOMIC_RELAXED);

    // aligned buffer is required for direct I/O
    ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
    if (unlikely(ret)) {
        fatal("DBENGINE: posix_memalign:%s", strerror(ret));
    }
    memset(superblock, 0, sizeof(*superblock));
    // magic/version are fixed-size fields, not NUL-terminated strings,
    // hence strncpy with the exact field sizes
    (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ);
    (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ);

    iov = uv_buf_init((void *)superblock, sizeof(*superblock));

    ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
    if (ret < 0) {
        fatal_assert(req.result < 0);
        error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
        ctx_io_error(ctx);
    }
    uv_fs_req_cleanup(&req);
    posix_memfree(superblock);
    if (ret < 0) {
        journalfile_destroy_unsafe(journalfile, datafile);
        return ret;
    }

    // data starts right after the superblock
    journalfile->unsafe.pos = sizeof(*superblock);

    ctx_io_write_op_bytes(ctx, sizeof(*superblock));

    return 0;
}
// Read and validate the superblock of an open v1 journal file.
// Returns 0 when the magic and version match, UV_EINVAL when they do not,
// or the negative uv error from the read.
static int journalfile_check_superblock(uv_file file)
{
    int ret;
    struct rrdeng_jf_sb *superblock;
    uv_buf_t iov;
    uv_fs_t req;

    // aligned buffer is required for direct I/O
    ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
    if (unlikely(ret)) {
        fatal("DBENGINE: posix_memalign:%s", strerror(ret));
    }
    iov = uv_buf_init((void *)superblock, sizeof(*superblock));

    ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
    if (ret < 0) {
        error("DBENGINE: uv_fs_read: %s", uv_strerror(ret));
        uv_fs_req_cleanup(&req);
        goto error;
    }
    fatal_assert(req.result >= 0);
    uv_fs_req_cleanup(&req);

    // NOTE(review): a short read (req.result < sizeof(*superblock)) is not
    // detected here; presumably callers only pass files of at least
    // superblock size - confirm before relying on this for arbitrary files.
    if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
        strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
        error("DBENGINE: File has invalid superblock.");
        ret = UV_EINVAL;
    } else {
        ret = 0;
    }
    error:
    posix_memfree(superblock);
    return ret;
}
// Replay one STORE_DATA transaction payload (at most max_size bytes):
// for every page descriptor in it, validate the descriptor, make sure the
// metric exists in the MRG (creating it if needed), expand its retention,
// and register the page as hot in the open cache.
static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size)
{
    // remembers which unknown page types we have already complained about,
    // so each one is logged only once per process lifetime
    static BITMAP256 page_error_map;
    unsigned i, count, payload_length, descr_size;
    struct rrdeng_jf_store_data *jf_metric_data;

    jf_metric_data = buf;
    count = jf_metric_data->number_of_pages;
    descr_size = sizeof(*jf_metric_data->descr) * count;
    payload_length = sizeof(*jf_metric_data) + descr_size;
    if (payload_length > max_size) {
        error("DBENGINE: corrupted transaction payload.");
        return;
    }

    time_t now_s = max_acceptable_collected_time();
    for (i = 0; i < count ; ++i) {
        uuid_t *temp_id;
        uint8_t page_type = jf_metric_data->descr[i].type;

        if (page_type > PAGE_TYPE_MAX) {
            if (!bitmap256_get_bit(&page_error_map, page_type)) {
                error("DBENGINE: unknown page type %d encountered.", page_type);
                bitmap256_set_bit(&page_error_map, page_type, 1);
            }
            continue;
        }

        temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;
        METRIC *metric = mrg_metric_get_and_acquire(main_mrg, temp_id, (Word_t) ctx);

        struct rrdeng_extent_page_descr *descr = &jf_metric_data->descr[i];
        VALIDATED_PAGE_DESCRIPTOR vd = validate_extent_page_descr(
                descr, now_s,
                (metric) ? mrg_metric_get_update_every_s(main_mrg, metric) : 0,
                false);

        if(!vd.is_valid) {
            // invalid descriptor - drop it (but release the metric, if acquired)
            if(metric)
                mrg_metric_release(main_mrg, metric);

            continue;
        }

        bool update_metric_time = true;
        if (!metric) {
            // unknown metric - add it with this page's retention
            MRG_ENTRY entry = {
                    .section = (Word_t)ctx,
                    .first_time_s = vd.start_time_s,
                    .last_time_s = vd.end_time_s,
                    .latest_update_every_s = vd.update_every_s,
            };
            uuid_copy(entry.uuid, *temp_id);

            bool added;
            metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
            if(added)
                // just added with this page's times - nothing to expand
                update_metric_time = false;
        }
        Word_t metric_id = mrg_metric_id(main_mrg, metric);

        if (update_metric_time)
            mrg_metric_expand_retention(main_mrg, metric, vd.start_time_s, vd.end_time_s, vd.update_every_s);

        // register the page location (extent offset/size) in the open cache
        pgc_open_add_hot_page(
                (Word_t)ctx, metric_id, vd.start_time_s, vd.end_time_s, vd.update_every_s,
                journalfile->datafile,
                jf_metric_data->extent_offset, jf_metric_data->extent_size, jf_metric_data->descr[i].page_length);

        mrg_metric_release(main_mrg, metric);
    }
}
/*
 * Replays transaction by interpreting up to max_size bytes from buf.
 * Sets id to the current transaction id or to 0 if unknown.
 * Returns size of transaction record or 0 for unknown size.
 */
static unsigned journalfile_replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
                                               void *buf, uint64_t *id, unsigned max_size)
{
    unsigned payload_length, size_bytes;
    int ret;
    /* persistent structures */
    struct rrdeng_jf_transaction_header *jf_header;
    struct rrdeng_jf_transaction_trailer *jf_trailer;
    uLong crc;

    *id = 0;
    jf_header = buf;
    if (STORE_PADDING == jf_header->type) {
        // padding marks the unused tail of a block - size unknown, return 0
        debug(D_RRDENGINE, "Skipping padding.");
        return 0;
    }
    if (sizeof(*jf_header) > max_size) {
        error("DBENGINE: corrupted transaction record, skipping.");
        return 0;
    }
    *id = jf_header->id;
    payload_length = jf_header->payload_length;
    size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
    if (size_bytes > max_size) {
        // declared size extends past the available bytes - corrupt record
        error("DBENGINE: corrupted transaction record, skipping.");
        return 0;
    }
    jf_trailer = buf + sizeof(*jf_header) + payload_length;
    // CRC covers header + payload and is stored in the trailer
    crc = crc32(0L, Z_NULL, 0);
    crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
    ret = crc32cmp(jf_trailer->checksum, crc);
    debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
    if (unlikely(ret)) {
        error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
        // record size is known, so let the caller skip past it
        return size_bytes;
    }
    switch (jf_header->type) {
    case STORE_DATA:
        debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
        journalfile_restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
        break;
    default:
        error("DBENGINE: unknown transaction type, skipping record.");
        break;
    }

    return size_bytes;
}
  565. #define READAHEAD_BYTES (RRDENG_BLOCK_SIZE * 256)
  566. /*
  567. * Iterates journal file transactions and populates the page cache.
  568. * Page cache must already be initialized.
  569. * Returns the maximum transaction id it discovered.
  570. */
  571. static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile)
  572. {
  573. uv_file file;
  574. uint64_t file_size;
  575. int ret;
  576. uint64_t pos, pos_i, max_id, id;
  577. unsigned size_bytes;
  578. void *buf;
  579. uv_buf_t iov;
  580. uv_fs_t req;
  581. file = journalfile->file;
  582. file_size = journalfile->unsafe.pos;
  583. max_id = 1;
  584. ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
  585. if (unlikely(ret))
  586. fatal("DBENGINE: posix_memalign:%s", strerror(ret));
  587. for (pos = sizeof(struct rrdeng_jf_sb); pos < file_size; pos += READAHEAD_BYTES) {
  588. size_bytes = MIN(READAHEAD_BYTES, file_size - pos);
  589. iov = uv_buf_init(buf, size_bytes);
  590. ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
  591. if (ret < 0) {
  592. error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
  593. uv_fs_req_cleanup(&req);
  594. goto skip_file;
  595. }
  596. fatal_assert(req.result >= 0);
  597. uv_fs_req_cleanup(&req);
  598. ctx_io_read_op_bytes(ctx, size_bytes);
  599. for (pos_i = 0; pos_i < size_bytes;) {
  600. unsigned max_size;
  601. max_size = pos + size_bytes - pos_i;
  602. ret = journalfile_replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size);
  603. if (!ret) /* TODO: support transactions bigger than 4K */
  604. /* unknown transaction size, move on to the next block */
  605. pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE);
  606. else
  607. pos_i += ret;
  608. max_id = MAX(max_id, id);
  609. }
  610. }
  611. skip_file:
  612. posix_memfree(buf);
  613. return max_id;
  614. }
  615. // Checks that the extent list checksum is valid
  616. static int journalfile_check_v2_extent_list (void *data_start, size_t file_size)
  617. {
  618. UNUSED(file_size);
  619. uLong crc;
  620. struct journal_v2_header *j2_header = (void *) data_start;
  621. struct journal_v2_block_trailer *journal_v2_trailer;
  622. journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->extent_trailer_offset);
  623. crc = crc32(0L, Z_NULL, 0);
  624. crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list));
  625. if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
  626. error("DBENGINE: extent list CRC32 check: FAILED");
  627. return 1;
  628. }
  629. return 0;
  630. }
  631. // Checks that the metric list (UUIDs) checksum is valid
  632. static int journalfile_check_v2_metric_list(void *data_start, size_t file_size)
  633. {
  634. UNUSED(file_size);
  635. uLong crc;
  636. struct journal_v2_header *j2_header = (void *) data_start;
  637. struct journal_v2_block_trailer *journal_v2_trailer;
  638. journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->metric_trailer_offset);
  639. crc = crc32(0L, Z_NULL, 0);
  640. crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list));
  641. if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
  642. error("DBENGINE: metric list CRC32 check: FAILED");
  643. return 1;
  644. }
  645. return 0;
  646. }
//
// Validate a memory-mapped v2 journal index file.
//
// Return
//   0 Ok
//   1 Invalid
//   2 Force rebuild
//   3 skip
static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size, size_t journal_v1_file_size)
{
    int rc;
    uLong crc;

    struct journal_v2_header *j2_header = (void *) data_start;
    struct journal_v2_block_trailer *journal_v2_trailer;

    // A previous run may have replaced the magic with a marker asking for a
    // rebuild, or asking for this file to be ignored.
    if (j2_header->magic == JOURVAL_V2_REBUILD_MAGIC)
        return 2;

    if (j2_header->magic == JOURVAL_V2_SKIP_MAGIC)
        return 3;

    // Magic failure
    if (j2_header->magic != JOURVAL_V2_MAGIC)
        return 1;

    // The header records the sizes of both the v2 index and the v1 journal it
    // was built from; any mismatch means the index is stale or corrupt.
    if (j2_header->journal_v2_file_size != journal_v2_file_size)
        return 1;

    if (journal_v1_file_size && j2_header->journal_v1_file_size != journal_v1_file_size)
        return 1;

    // The trailer at the very end of the file holds the CRC of the header.
    journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + journal_v2_file_size - sizeof(*journal_v2_trailer));

    crc = crc32(0L, Z_NULL, 0);
    crc = crc32(crc, (void *) j2_header, sizeof(*j2_header));

    rc = crc32cmp(journal_v2_trailer->checksum, crc);
    if (unlikely(rc)) {
        error("DBENGINE: file CRC32 check: FAILED");
        return 1;
    }

    rc = journalfile_check_v2_extent_list(data_start, journal_v2_file_size);
    if (rc) return 1;

    rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size);
    if (rc) return 1;

    // Deep, per-metric verification below is optional (and expensive).
    if (!db_engine_journal_check)
        return 0;

    // Verify complete UUID chain
    struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset);

    unsigned verified = 0;
    unsigned entries;
    unsigned total_pages = 0;

    info("DBENGINE: checking %u metrics that exist in the journal", j2_header->metric_count);
    for (entries = 0; entries < j2_header->metric_count; entries++) {

        char uuid_str[UUID_STR_LEN];
        uuid_unparse_lower(metric->uuid, uuid_str);

        struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset);
        struct journal_page_header local_metric_list_header = *metric_list_header;

        // The stored page-header checksum was computed with the crc field set
        // to the magic (see journalfile_v2_write_data_page_header()), so the
        // same substitution is applied on a local copy before recomputing it.
        local_metric_list_header.crc = JOURVAL_V2_MAGIC;

        crc = crc32(0L, Z_NULL, 0);
        crc = crc32(crc, (void *) &local_metric_list_header, sizeof(local_metric_list_header));
        rc = crc32cmp(metric_list_header->checksum, crc);

        if (!rc) {
            // Page header is good; verify the page list that follows it
            // against the per-metric trailer.
            struct journal_v2_block_trailer *journal_trailer =
                (void *) data_start + metric->page_offset + sizeof(struct journal_page_header) + (metric_list_header->entries * sizeof(struct journal_page_list));

            crc = crc32(0L, Z_NULL, 0);
            crc = crc32(crc, (uint8_t *) metric_list_header + sizeof(struct journal_page_header), metric_list_header->entries * sizeof(struct journal_page_list));
            rc = crc32cmp(journal_trailer->checksum, crc);
            internal_error(rc, "DBENGINE: index %u : %s entries %u at offset %u verified, DATA CRC computed %lu, stored %u", entries, uuid_str, metric->entries, metric->page_offset,
                           crc, metric_list_header->crc);

            if (!rc) {
                total_pages += metric_list_header->entries;
                verified++;
            }
        }

        metric++;
        // Abort the walk if the directory ran past the end of the mapping.
        if ((uint32_t)((uint8_t *) metric - (uint8_t *) data_start) > (uint32_t) journal_v2_file_size) {
            info("DBENGINE: verification failed EOF reached -- total entries %u, verified %u", entries, verified);
            return 1;
        }
    }

    if (entries != verified) {
        info("DBENGINE: verification failed -- total entries %u, verified %u", entries, verified);
        return 1;
    }

    info("DBENGINE: verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages);

    return 0;
}
  725. void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile) {
  726. usec_t started_ut = now_monotonic_usec();
  727. size_t data_size = 0;
  728. struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, &data_size, 0, 0);
  729. if(!j2_header)
  730. return;
  731. uint8_t *data_start = (uint8_t *)j2_header;
  732. uint32_t entries = j2_header->metric_count;
  733. struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
  734. time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
  735. time_t now_s = max_acceptable_collected_time();
  736. for (size_t i=0; i < entries; i++) {
  737. time_t start_time_s = header_start_time_s + metric->delta_start_s;
  738. time_t end_time_s = header_start_time_s + metric->delta_end_s;
  739. time_t update_every_s = (metric->entries > 1) ? ((end_time_s - start_time_s) / (entries - 1)) : 0;
  740. update_metric_retention_and_granularity_by_uuid(
  741. ctx, &metric->uuid, start_time_s, end_time_s, update_every_s, now_s);
  742. #ifdef NETDATA_INTERNAL_CHECKS
  743. struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset);
  744. fatal_assert(uuid_compare(metric_list_header->uuid, metric->uuid) == 0);
  745. fatal_assert(metric->entries == metric_list_header->entries);
  746. #endif
  747. metric++;
  748. }
  749. journalfile_v2_data_release(journalfile);
  750. usec_t ended_ut = now_monotonic_usec();
  751. info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms"
  752. , ctx->config.tier, journalfile->datafile->fileno
  753. , (double)data_size / 1024 / 1024
  754. , (double)entries / 1000
  755. , ((double)(ended_ut - started_ut) / USEC_PER_MS)
  756. );
  757. }
// Load (mmap) the v2 journal index that corresponds to the given datafile.
//
// Returns 0 on success -- in that case ownership of both the file descriptor
// and the mapping passes to the journalfile via journalfile_v2_data_set().
// Otherwise returns the validation result: 1 missing/invalid, 2 must be
// rebuilt, 3 must be skipped.
int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
    int ret, fd;
    char path_v1[RRDENG_PATH_MAX];
    char path_v2[RRDENG_PATH_MAX];
    struct stat statbuf;
    size_t journal_v1_file_size = 0;
    size_t journal_v2_file_size;

    // The size of the v1 journal (if it exists) is cross-checked against the
    // value recorded inside the v2 header during validation.
    journalfile_v1_generate_path(datafile, path_v1, sizeof(path_v1));
    ret = stat(path_v1, &statbuf);
    if (!ret)
        journal_v1_file_size = (uint32_t)statbuf.st_size;

    journalfile_v2_generate_path(datafile, path_v2, sizeof(path_v2));
    fd = open(path_v2, O_RDONLY);
    if (fd < 0) {
        // ENOENT is the common case (no index yet) -- not a filesystem error
        if (errno == ENOENT)
            return 1;
        ctx_fs_error(ctx);
        error("DBENGINE: failed to open '%s'", path_v2);
        return 1;
    }

    ret = fstat(fd, &statbuf);
    if (ret) {
        error("DBENGINE: failed to get file information for '%s'", path_v2);
        close(fd);
        return 1;
    }

    journal_v2_file_size = (size_t)statbuf.st_size;

    // A valid index carries at least a full header.
    if (journal_v2_file_size < sizeof(struct journal_v2_header)) {
        error_report("Invalid file %s. Not the expected size", path_v2);
        close(fd);
        return 1;
    }

    usec_t mmap_start_ut = now_monotonic_usec();
    uint8_t *data_start = mmap(NULL, journal_v2_file_size, PROT_READ, MAP_SHARED, fd, 0);
    if (data_start == MAP_FAILED) {
        close(fd);
        return 1;
    }

    info("DBENGINE: checking integrity of '%s'", path_v2);
    usec_t validation_start_ut = now_monotonic_usec();
    int rc = journalfile_v2_validate(data_start, journal_v2_file_size, journal_v1_file_size);
    if (unlikely(rc)) {
        // rc: 1 invalid, 2 rebuild, 3 skip -- propagated to the caller
        if (rc == 2)
            error_report("File %s needs to be rebuilt", path_v2);
        else if (rc == 3)
            error_report("File %s will be skipped", path_v2);
        else
            error_report("File %s is invalid and it will be rebuilt", path_v2);

        if (unlikely(munmap(data_start, journal_v2_file_size)))
            error("DBENGINE: failed to unmap '%s'", path_v2);

        close(fd);
        return rc;
    }

    struct journal_v2_header *j2_header = (void *) data_start;
    uint32_t entries = j2_header->metric_count;

    // An index without metrics is useless; treat it like an invalid file.
    if (unlikely(!entries)) {
        if (unlikely(munmap(data_start, journal_v2_file_size)))
            error("DBENGINE: failed to unmap '%s'", path_v2);

        close(fd);
        return 1;
    }

    usec_t finished_ut = now_monotonic_usec();

    info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, "
         "mmap: %0.2f ms, validate: %0.2f ms"
    , path_v2
    , (double)journal_v2_file_size / 1024 / 1024
    , (double)entries / 1000
    , ((double)(validation_start_ut - mmap_start_ut) / USEC_PER_MS)
    , ((double)(finished_ut - validation_start_ut) / USEC_PER_MS)
    );

    // Initialize the journal file to be able to access the data
    // (fd and mapping ownership pass to the journalfile from here on).
    journalfile_v2_data_set(journalfile, fd, data_start, journal_v2_file_size);
    ctx_current_disk_space_increase(ctx, journal_v2_file_size);

    // File is OK load it
    return 0;
}
// Wrapper element so the collected metrics can be qsort()ed by UUID before
// they are written to the v2 journal's metric directory.
struct journal_metric_list_to_sort {
    struct jv2_metrics_info *metric_info;
};
  838. static int journalfile_metric_compare (const void *item1, const void *item2)
  839. {
  840. const struct jv2_metrics_info *metric1 = ((struct journal_metric_list_to_sort *) item1)->metric_info;
  841. const struct jv2_metrics_info *metric2 = ((struct journal_metric_list_to_sort *) item2)->metric_info;
  842. return uuid_compare(*(metric1->uuid), *(metric2->uuid));
  843. }
  844. // Write list of extents for the journalfile
  845. void *journalfile_v2_write_extent_list(Pvoid_t JudyL_extents_pos, void *data)
  846. {
  847. Pvoid_t *PValue;
  848. struct journal_extent_list *j2_extent_base = (void *) data;
  849. struct jv2_extents_info *ext_info;
  850. bool first = true;
  851. Word_t pos = 0;
  852. size_t count = 0;
  853. while ((PValue = JudyLFirstThenNext(JudyL_extents_pos, &pos, &first))) {
  854. ext_info = *PValue;
  855. size_t index = ext_info->index;
  856. j2_extent_base[index].file_index = 0;
  857. j2_extent_base[index].datafile_offset = ext_info->pos;
  858. j2_extent_base[index].datafile_size = ext_info->bytes;
  859. j2_extent_base[index].pages = ext_info->number_of_pages;
  860. count++;
  861. }
  862. return j2_extent_base + count;
  863. }
  864. static int journalfile_verify_space(struct journal_v2_header *j2_header, void *data, uint32_t bytes)
  865. {
  866. if ((unsigned long)(((uint8_t *) data - (uint8_t *) j2_header->data) + bytes) > (j2_header->journal_v2_file_size - sizeof(struct journal_v2_block_trailer)))
  867. return 1;
  868. return 0;
  869. }
  870. void *journalfile_v2_write_metric_page(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info, uint32_t pages_offset)
  871. {
  872. struct journal_metric_list *metric = (void *) data;
  873. if (journalfile_verify_space(j2_header, data, sizeof(*metric)))
  874. return NULL;
  875. uuid_copy(metric->uuid, *metric_info->uuid);
  876. metric->entries = metric_info->number_of_pages;
  877. metric->page_offset = pages_offset;
  878. metric->delta_start_s = (uint32_t)(metric_info->first_time_s - (time_t)(j2_header->start_time_ut / USEC_PER_SEC));
  879. metric->delta_end_s = (uint32_t)(metric_info->last_time_s - (time_t)(j2_header->start_time_ut / USEC_PER_SEC));
  880. return ++metric;
  881. }
  882. void *journalfile_v2_write_data_page_header(struct journal_v2_header *j2_header __maybe_unused, void *data, struct jv2_metrics_info *metric_info, uint32_t uuid_offset)
  883. {
  884. struct journal_page_header *data_page_header = (void *) data;
  885. uLong crc;
  886. uuid_copy(data_page_header->uuid, *metric_info->uuid);
  887. data_page_header->entries = metric_info->number_of_pages;
  888. data_page_header->uuid_offset = uuid_offset; // data header OFFSET poings to METRIC in the directory
  889. data_page_header->crc = JOURVAL_V2_MAGIC;
  890. crc = crc32(0L, Z_NULL, 0);
  891. crc = crc32(crc, (void *) data_page_header, sizeof(*data_page_header));
  892. crc32set(data_page_header->checksum, crc);
  893. return ++data_page_header;
  894. }
  895. void *journalfile_v2_write_data_page_trailer(struct journal_v2_header *j2_header __maybe_unused, void *data, void *page_header)
  896. {
  897. struct journal_page_header *data_page_header = (void *) page_header;
  898. struct journal_v2_block_trailer *journal_trailer = (void *) data;
  899. uLong crc;
  900. crc = crc32(0L, Z_NULL, 0);
  901. crc = crc32(crc, (uint8_t *) page_header + sizeof(struct journal_page_header), data_page_header->entries * sizeof(struct journal_page_list));
  902. crc32set(journal_trailer->checksum, crc);
  903. return ++journal_trailer;
  904. }
  905. void *journalfile_v2_write_data_page(struct journal_v2_header *j2_header, void *data, struct jv2_page_info *page_info)
  906. {
  907. struct journal_page_list *data_page = data;
  908. if (journalfile_verify_space(j2_header, data, sizeof(*data_page)))
  909. return NULL;
  910. struct extent_io_data *ei = page_info->custom_data;
  911. data_page->delta_start_s = (uint32_t) (page_info->start_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC);
  912. data_page->delta_end_s = (uint32_t) (page_info->end_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC);
  913. data_page->extent_index = page_info->extent_index;
  914. data_page->update_every_s = page_info->update_every_s;
  915. data_page->page_length = (uint16_t) (ei ? ei->page_length : page_info->page_length);
  916. data_page->type = 0;
  917. return ++data_page;
  918. }
  919. // Must be recorded in metric_info->entries
  920. void *journalfile_v2_write_descriptors(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info)
  921. {
  922. Pvoid_t *PValue;
  923. struct journal_page_list *data_page = (void *)data;
  924. // We need to write all descriptors with index metric_info->min_index_time_s, metric_info->max_index_time_s
  925. // that belong to this journal file
  926. Pvoid_t JudyL_array = metric_info->JudyL_pages_by_start_time;
  927. Word_t index_time = 0;
  928. bool first = true;
  929. struct jv2_page_info *page_info;
  930. while ((PValue = JudyLFirstThenNext(JudyL_array, &index_time, &first))) {
  931. page_info = *PValue;
  932. // Write one descriptor and return the next data page location
  933. data_page = journalfile_v2_write_data_page(j2_header, (void *) data_page, page_info);
  934. if (NULL == data_page)
  935. break;
  936. }
  937. return data_page;
  938. }
// Migrate the journalfile pointed by datafile
// activate : make the new file active immediately
// journafile data will be set and descriptors (if deleted) will be repopulated as needed
// startup : if the migration is done during agent startup
// this will allow us to optimize certain things
//
// Layout of the v2 file built here:
// [header+padding][extent list][trailer][metric directory][trailer]
// [per-metric: page header, page list, trailer]...[file trailer]
void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_unused, uint8_t type __maybe_unused,
                                        Pvoid_t JudyL_metrics, Pvoid_t JudyL_extents_pos,
                                        size_t number_of_extents, size_t number_of_metrics, size_t number_of_pages, void *user_data)
{
    char path[RRDENG_PATH_MAX];
    Pvoid_t *PValue;
    struct rrdengine_instance *ctx = (struct rrdengine_instance *) section;
    struct rrdengine_journalfile *journalfile = (struct rrdengine_journalfile *) user_data;
    struct rrdengine_datafile *datafile = journalfile->datafile;
    time_t min_time_s = LONG_MAX;
    time_t max_time_s = 0;
    struct jv2_metrics_info *metric_info;

    journalfile_v2_generate_path(datafile, path, sizeof(path));

    info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu",
        path,
        number_of_extents,
        number_of_metrics,
        number_of_pages);

#ifdef NETDATA_INTERNAL_CHECKS
    // start_loading only exists with internal checks; internal_error() below
    // compiles to a no-op without them, so the references are safe.
    usec_t start_loading = now_monotonic_usec();
#endif

    // Compute the total file size and the offset of every section up-front.
    size_t total_file_size = 0;
    total_file_size += (sizeof(struct journal_v2_header) + JOURNAL_V2_HEADER_PADDING_SZ);

    // Extents will start here
    uint32_t extent_offset = total_file_size;
    total_file_size += (number_of_extents * sizeof(struct journal_extent_list));

    uint32_t extent_offset_trailer = total_file_size;
    total_file_size += sizeof(struct journal_v2_block_trailer);

    // UUID list will start here
    uint32_t metrics_offset = total_file_size;
    total_file_size += (number_of_metrics * sizeof(struct journal_metric_list));

    // UUID list trailer
    uint32_t metric_offset_trailer = total_file_size;
    total_file_size += sizeof(struct journal_v2_block_trailer);

    // descr @ time will start here
    uint32_t pages_offset = total_file_size;
    total_file_size += (number_of_pages * (sizeof(struct journal_page_list) + sizeof(struct journal_page_header) + sizeof(struct journal_v2_block_trailer)));

    // File trailer
    uint32_t trailer_offset = total_file_size;
    total_file_size += sizeof(struct journal_v2_block_trailer);

    int fd_v2;
    // NOTE(review): the result of netdata_mmap() is not checked for NULL here;
    // confirm it aborts internally on failure before data_start is used.
    uint8_t *data_start = netdata_mmap(path, total_file_size, MAP_SHARED, 0, false, &fd_v2);
    uint8_t *data = data_start;

    memset(data_start, 0, extent_offset);

    // Write header
    struct journal_v2_header j2_header;
    memset(&j2_header, 0, sizeof(j2_header));

    j2_header.magic = JOURVAL_V2_MAGIC;
    j2_header.start_time_ut = 0;
    j2_header.end_time_ut = 0;
    j2_header.extent_count = number_of_extents;
    j2_header.extent_offset = extent_offset;
    j2_header.metric_count = number_of_metrics;
    j2_header.metric_offset = metrics_offset;
    j2_header.page_count = number_of_pages;
    j2_header.page_offset = pages_offset;
    j2_header.extent_trailer_offset = extent_offset_trailer;
    j2_header.metric_trailer_offset = metric_offset_trailer;
    j2_header.journal_v2_file_size = total_file_size;
    j2_header.journal_v1_file_size = (uint32_t)journalfile_current_size(journalfile);
    j2_header.data = data_start; // Used during migration

    struct journal_v2_block_trailer *journal_v2_trailer;

    data = journalfile_v2_write_extent_list(JudyL_extents_pos, data_start + extent_offset);
    internal_error(true, "DBENGINE: write extent list so far %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);

    fatal_assert(data == data_start + extent_offset_trailer);

    // Calculate CRC for extents
    journal_v2_trailer = (struct journal_v2_block_trailer *) (data_start + extent_offset_trailer);
    uLong crc;
    crc = crc32(0L, Z_NULL, 0);
    crc = crc32(crc, (uint8_t *) data_start + extent_offset, number_of_extents * sizeof(struct journal_extent_list));
    crc32set(journal_v2_trailer->checksum, crc);

    internal_error(true, "DBENGINE: CALCULATE CRC FOR EXTENT %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
    // Skip the trailer, point to the metrics off
    data += sizeof(struct journal_v2_block_trailer);
    // Sanity check -- we must be at the metrics_offset
    fatal_assert(data == data_start + metrics_offset);

    // Allocate array to sort UUIDs and keep them sorted in the journal because we want to do binary search when we do lookups
    struct journal_metric_list_to_sort *uuid_list = mallocz(number_of_metrics * sizeof(struct journal_metric_list_to_sort));

    Word_t Index = 0;
    size_t count = 0;
    bool first_then_next = true;
    // Collect every metric and track the overall min/max times of the file.
    while ((PValue = JudyLFirstThenNext(JudyL_metrics, &Index, &first_then_next))) {
        metric_info = *PValue;

        fatal_assert(count < number_of_metrics);
        uuid_list[count++].metric_info = metric_info;
        min_time_s = MIN(min_time_s, metric_info->first_time_s);
        max_time_s = MAX(max_time_s, metric_info->last_time_s);
    }

    // Store in the header
    j2_header.start_time_ut = min_time_s * USEC_PER_SEC;
    j2_header.end_time_ut = max_time_s * USEC_PER_SEC;

    qsort(&uuid_list[0], number_of_metrics, sizeof(struct journal_metric_list_to_sort), journalfile_metric_compare);
    internal_error(true, "DBENGINE: traverse and qsort UUID %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);

    uint32_t resize_file_to = total_file_size;

    for (Index = 0; Index < number_of_metrics; Index++) {
        metric_info = uuid_list[Index].metric_info;

        // Calculate current UUID offset from start of file. We will store this in the data page header
        uint32_t uuid_offset = data - data_start;

        // Write the UUID we are processing
        data = (void *) journalfile_v2_write_metric_page(&j2_header, data, metric_info, pages_offset);
        if (unlikely(!data))
            break;

        // Next we will write
        //   Header
        //   Detailed entries (descr @ time)
        //   Trailer (checksum)

        // Keep the page_list_header, to be used for migration when where agent is running
        metric_info->page_list_header = pages_offset;
        // Write page header
        void *metric_page = journalfile_v2_write_data_page_header(&j2_header, data_start + pages_offset, metric_info,
                                                                  uuid_offset);

        // Start writing descr @ time
        void *page_trailer = journalfile_v2_write_descriptors(&j2_header, metric_page, metric_info);
        if (unlikely(!page_trailer))
            break;

        // Trailer (checksum)
        uint8_t *next_page_address = journalfile_v2_write_data_page_trailer(&j2_header, page_trailer,
                                                                            data_start + pages_offset);

        // Calculate start of the pages start for next descriptor
        pages_offset += (metric_info->number_of_pages * (sizeof(struct journal_page_list)) + sizeof(struct journal_page_header) + sizeof(struct journal_v2_block_trailer));
        // Verify we are at the right location
        if (pages_offset != (uint32_t)(next_page_address - data_start)) {
            // make sure checks fail so that we abort
            data = data_start;
            break;
        }
    }

    // If the loop completed, "data" must have landed exactly on the metric
    // directory trailer; anything else means the index is incomplete.
    if (data == data_start + metric_offset_trailer) {
        internal_error(true, "DBENGINE: WRITE METRICS AND PAGES %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);

        // Calculate CRC for metrics
        journal_v2_trailer = (struct journal_v2_block_trailer *)(data_start + metric_offset_trailer);
        crc = crc32(0L, Z_NULL, 0);
        crc =
            crc32(crc, (uint8_t *)data_start + metrics_offset, number_of_metrics * sizeof(struct journal_metric_list));
        crc32set(journal_v2_trailer->checksum, crc);
        internal_error(true, "DBENGINE: CALCULATE CRC FOR UUIDs %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);

        // Prepare to write checksum for the file
        // (the in-memory "data" pointer must not be part of the checksum)
        j2_header.data = NULL;
        journal_v2_trailer = (struct journal_v2_block_trailer *)(data_start + trailer_offset);
        crc = crc32(0L, Z_NULL, 0);
        crc = crc32(crc, (void *)&j2_header, sizeof(j2_header));
        crc32set(journal_v2_trailer->checksum, crc);

        // Write header to the file
        memcpy(data_start, &j2_header, sizeof(j2_header));

        internal_error(true, "DBENGINE: FILE COMPLETED --------> %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);

        info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size);

        // msync(data_start, total_file_size, MS_SYNC);
        journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size);

        internal_error(true, "DBENGINE: ACTIVATING NEW INDEX JNL %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
        ctx_current_disk_space_increase(ctx, total_file_size);
        freez(uuid_list);
        return;
    }
    else {
        // Index build failed: stamp the header with the SKIP magic so the
        // next load refuses to use this file, then shrink it below.
        info("DBENGINE: failed to build index '%s', file will be skipped", path);
        j2_header.data = NULL;
        j2_header.magic = JOURVAL_V2_SKIP_MAGIC;
        memcpy(data_start, &j2_header, sizeof(j2_header));
        resize_file_to = sizeof(j2_header);
    }

    netdata_munmap(data_start, total_file_size);
    freez(uuid_list);

    if (likely(resize_file_to == total_file_size))
        return;

    // Keep only the (skip-marked) header on disk.
    int ret = truncate(path, (long) resize_file_to);
    if (ret < 0) {
        ctx_current_disk_space_increase(ctx, total_file_size);
        ctx_fs_error(ctx);
        error("DBENGINE: failed to resize file '%s'", path);
    }
    else
        ctx_current_disk_space_increase(ctx, resize_file_to);
}
// Load the journal that belongs to one datafile: prefer the v2 index and,
// failing that, replay the v1 transaction log; journals other than the last
// one are then migrated to v2.
// Returns 0 on success, or a negative libuv error code.
int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
                     struct rrdengine_datafile *datafile)
{
    uv_fs_t req;
    uv_file file;
    int ret, fd, error;
    uint64_t file_size, max_id;
    char path[RRDENG_PATH_MAX];
    bool loaded_v2 = false;

    // Do not try to load jv2 of the latest file
    if (datafile->fileno != ctx_last_fileno_get(ctx))
        loaded_v2 = journalfile_v2_load(ctx, journalfile, datafile) == 0;

    journalfile_v1_generate_path(datafile, path, sizeof(path));

    // The v1 file is opened even when the v2 index loaded, to record its size.
    fd = open_file_for_io(path, O_RDWR, &file, use_direct_io);
    if (fd < 0) {
        ctx_fs_error(ctx);

        // The v2 index alone is enough to serve this datafile.
        if(loaded_v2)
            return 0;

        return fd;
    }

    ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
    if (ret) {
        error = ret;
        goto cleanup;
    }

    if(loaded_v2) {
        // v2 index is in use; only remember the v1 size and close the handle.
        journalfile->unsafe.pos = file_size;
        error = 0;
        goto cleanup;
    }

    file_size = ALIGN_BYTES_FLOOR(file_size);
    journalfile->unsafe.pos = file_size;
    journalfile->file = file;

    ret = journalfile_check_superblock(file);
    if (ret) {
        info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path);
        error = ret;
        goto cleanup;
    }
    ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_jf_sb));

    info("DBENGINE: loading journal file '%s'", path);

    // Replay all transactions to rebuild the page index in memory; max_id is
    // the highest transaction id seen, used to advance the global counter.
    max_id = journalfile_iterate_transactions(ctx, journalfile);

    __atomic_store_n(&ctx->atomic.transaction_id, MAX(__atomic_load_n(&ctx->atomic.transaction_id, __ATOMIC_RELAXED), max_id + 1), __ATOMIC_RELAXED);

    info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size);

    // A mostly-empty last datafile keeps collecting data -- skip migration.
    bool is_last_file = (ctx_last_fileno_get(ctx) == journalfile->datafile->fileno);
    if (is_last_file && journalfile->datafile->pos <= rrdeng_target_data_file_size(ctx) / 3) {
        ctx->loading.create_new_datafile_pair = false;
        return 0;
    }

    pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
                                 journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);

    if (is_last_file)
        ctx->loading.create_new_datafile_pair = true;

    return 0;

cleanup:
    ret = uv_fs_close(NULL, &req, file, NULL);
    if (ret < 0) {
        error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
        ctx_fs_error(ctx);
    }
    uv_fs_req_cleanup(&req);
    return error;
}