// journalfile.c
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdengine.h"
  3. static void after_extent_write_journalfile_v1_io(uv_fs_t* req)
  4. {
  5. worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB);
  6. WAL *wal = req->data;
  7. struct generic_io_descriptor *io_descr = &wal->io_descr;
  8. struct rrdengine_instance *ctx = io_descr->ctx;
  9. netdata_log_debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
  10. if (req->result < 0) {
  11. ctx_io_error(ctx);
  12. netdata_log_error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
  13. } else {
  14. netdata_log_debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
  15. }
  16. uv_fs_req_cleanup(req);
  17. wal_release(wal);
  18. __atomic_sub_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED);
  19. worker_is_idle();
  20. }
/* Careful to always call this before creating a new journal file */
// Queues an asynchronous write of a full WAL buffer to the v1 journal file.
// The on-disk position is reserved under journalfile->unsafe.spinlock so
// concurrent writers get non-overlapping regions; completion is handled by
// after_extent_write_journalfile_v1_io(), which releases the WAL.
void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, WAL *wal, uv_loop_t *loop)
{
    int ret;
    struct generic_io_descriptor *io_descr;
    struct rrdengine_journalfile *journalfile = datafile->journalfile;

    io_descr = &wal->io_descr;
    io_descr->ctx = ctx;
    if (wal->size < wal->buf_size) {
        /* simulate an empty transaction to skip the rest of the block */
        *(uint8_t *) (wal->buf + wal->size) = STORE_PADDING;
    }
    io_descr->buf = wal->buf;
    io_descr->bytes = wal->buf_size;

    // reserve this WAL's slot in the file; the full buf_size is always
    // written, even when the payload (wal->size) is smaller
    spinlock_lock(&journalfile->unsafe.spinlock);
    io_descr->pos = journalfile->unsafe.pos;
    journalfile->unsafe.pos += wal->buf_size;
    spinlock_unlock(&journalfile->unsafe.spinlock);

    io_descr->req.data = wal;
    io_descr->data = journalfile;
    io_descr->completion = NULL;

    io_descr->iov = uv_buf_init((void *)io_descr->buf, wal->buf_size);
    ret = uv_fs_write(loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
                      (int64_t)io_descr->pos, after_extent_write_journalfile_v1_io);
    fatal_assert(-1 != ret);

    ctx_current_disk_space_increase(ctx, wal->buf_size);
    ctx_io_write_op_bytes(ctx, wal->buf_size);
}
// Builds the full filesystem path of the v2 journal (index) file for this
// datafile into str (at most maxlen bytes; snprintfz NUL-terminates).
void journalfile_v2_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
    (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION_V2,
                     datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
}
// Builds the full filesystem path of the v1 journal (WAL) file for this
// datafile into str (at most maxlen bytes; snprintfz NUL-terminates).
void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
    (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
                     datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
}
  59. // ----------------------------------------------------------------------------
// Scans the NJFV2IDX (a JudyL array keyed by each journal file's v2
// last_time_s) for the next datafile whose v2 time range overlaps the
// wanted range in *s, and acquires its v2 header.
// The state in *s persists across calls so the caller can iterate:
// on the first call (s->init == false) the scan is positioned at the entry
// just before wanted_start_time_s (or at the first entry).
// Returns the matching datafile with s->j2_header_acquired set, or NULL
// (with s->j2_header_acquired == NULL) when no more datafiles match.
struct rrdengine_datafile *njfv2idx_find_and_acquire_j2_header(NJFV2IDX_FIND_STATE *s) {
    struct rrdengine_datafile *datafile = NULL;

    rw_spinlock_read_lock(&s->ctx->njfv2idx.spinlock);

    Pvoid_t *PValue = NULL;

    if(unlikely(!s->init)) {
        s->init = true;
        s->last = s->wanted_start_time_s;

        // position just before the wanted start, so a file whose range
        // spans the start time is not skipped
        PValue = JudyLPrev(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
        if (unlikely(PValue == PJERR))
            fatal("DBENGINE: NJFV2IDX corrupted judy array");

        if(!PValue) {
            // nothing before the wanted start - restart from the beginning
            s->last = 0;
            PValue = JudyLFirst(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
            if (unlikely(PValue == PJERR))
                fatal("DBENGINE: NJFV2IDX corrupted judy array");

            if(!PValue)
                s->last = s->wanted_start_time_s;
        }
    }

    while(1) {
        if (likely(!PValue)) {
            PValue = JudyLNext(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
            if (unlikely(PValue == PJERR))
                fatal("DBENGINE: NJFV2IDX corrupted judy array");

            if(!PValue) {
                // cannot find anything after that point
                datafile = NULL;
                break;
            }
        }

        datafile = *PValue;
        TIME_RANGE_COMPARE rc = is_page_in_time_range(datafile->journalfile->v2.first_time_s,
                                                      datafile->journalfile->v2.last_time_s,
                                                      s->wanted_start_time_s,
                                                      s->wanted_end_time_s);

        if(rc == PAGE_IS_IN_RANGE) {
            // this is good to return
            break;
        }
        else if(rc == PAGE_IS_IN_THE_PAST) {
            // continue to get the next
            datafile = NULL;
            PValue = NULL;
            continue;
        }
        else /* PAGE_IS_IN_THE_FUTURE */ {
            // we finished - no more datafiles
            datafile = NULL;
            PValue = NULL;
            break;
        }
    }

    if(datafile)
        s->j2_header_acquired = journalfile_v2_data_acquire(datafile->journalfile, NULL,
                                                           s->wanted_start_time_s,
                                                           s->wanted_end_time_s);
    else
        s->j2_header_acquired = NULL;

    rw_spinlock_read_unlock(&s->ctx->njfv2idx.spinlock);

    return datafile;
}
  121. static void njfv2idx_add(struct rrdengine_datafile *datafile) {
  122. internal_fatal(datafile->journalfile->v2.last_time_s <= 0, "DBENGINE: NJFV2IDX trying to index a journal file with invalid first_time_s");
  123. rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock);
  124. datafile->journalfile->njfv2idx.indexed_as = datafile->journalfile->v2.last_time_s;
  125. do {
  126. internal_fatal(datafile->journalfile->njfv2idx.indexed_as <= 0, "DBENGINE: NJFV2IDX journalfile is already indexed");
  127. Pvoid_t *PValue = JudyLIns(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0);
  128. if (!PValue || PValue == PJERR)
  129. fatal("DBENGINE: NJFV2IDX corrupted judy array");
  130. if (unlikely(*PValue)) {
  131. // already there
  132. datafile->journalfile->njfv2idx.indexed_as++;
  133. }
  134. else {
  135. *PValue = datafile;
  136. break;
  137. }
  138. } while(0);
  139. rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock);
  140. }
  141. static void njfv2idx_remove(struct rrdengine_datafile *datafile) {
  142. internal_fatal(!datafile->journalfile->njfv2idx.indexed_as, "DBENGINE: NJFV2IDX journalfile to remove is not indexed");
  143. rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock);
  144. int rc = JudyLDel(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0);
  145. (void)rc;
  146. internal_fatal(!rc, "DBENGINE: NJFV2IDX cannot remove entry");
  147. datafile->journalfile->njfv2idx.indexed_as = 0;
  148. rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock);
  149. }
  150. // ----------------------------------------------------------------------------
// Returns the mmap'ed v2 journal header, mapping the file on demand.
// On a successful (re)map, availability flags are set and madvise hints
// chosen according to whether the file is mounted for retention processing.
// On mmap failure the fd is closed and the v2 availability flags cleared.
// Returns NULL when no data could be mapped; *data_size (if not NULL)
// receives the mapped size.
static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) {
    struct journal_v2_header *j2_header = NULL;

    spinlock_lock(&journalfile->mmap.spinlock);

    if(!journalfile->mmap.data) {
        journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0);
        if (journalfile->mmap.data == MAP_FAILED) {
            internal_fatal(true, "DBENGINE: failed to re-mmap() journal file v2");

            close(journalfile->mmap.fd);
            journalfile->mmap.fd = -1;
            journalfile->mmap.data = NULL;
            journalfile->mmap.size = 0;

            spinlock_lock(&journalfile->v2.spinlock);
            journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED);
            spinlock_unlock(&journalfile->v2.spinlock);

            ctx_fs_error(journalfile->datafile->ctx);
        }
        else {
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_mapped, 1, __ATOMIC_RELAXED);

            madvise_dontfork(journalfile->mmap.data, journalfile->mmap.size);
            madvise_dontdump(journalfile->mmap.data, journalfile->mmap.size);

            spinlock_lock(&journalfile->v2.spinlock);
            journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;
            JOURNALFILE_FLAGS flags = journalfile->v2.flags;
            spinlock_unlock(&journalfile->v2.spinlock);

            if(flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION) {
                // we need the entire metrics directory into memory to process it
                madvise_willneed(journalfile->mmap.data, journalfile->v2.size_of_directory);
            }
            else {
                // let the kernel know that we don't want read-ahead on this file
                madvise_random(journalfile->mmap.data, journalfile->mmap.size);
                // madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size);
            }
        }
    }

    if(journalfile->mmap.data) {
        j2_header = journalfile->mmap.data;

        if (data_size)
            *data_size = journalfile->mmap.size;
    }

    spinlock_unlock(&journalfile->mmap.spinlock);

    return j2_header;
}
// Unmaps the v2 journal data if it has no references.
// have_locks: the caller already holds both the mmap and v2 spinlocks.
// wait: when false, only trylocks are used and false is returned on
// contention. Lock order is mmap.spinlock first, then v2.spinlock.
// Returns true when the journalfile ended up unmounted (or already was).
static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *journalfile, bool have_locks, bool wait) {
    bool unmounted = false;

    if(!have_locks) {
        if(!wait) {
            if (!spinlock_trylock(&journalfile->mmap.spinlock))
                return false;
        }
        else
            spinlock_lock(&journalfile->mmap.spinlock);

        if(!wait) {
            if(!spinlock_trylock(&journalfile->v2.spinlock)) {
                spinlock_unlock(&journalfile->mmap.spinlock);
                return false;
            }
        }
        else
            spinlock_lock(&journalfile->v2.spinlock);
    }

    if(!journalfile->v2.refcount) {
        if(journalfile->mmap.data) {
            if (munmap(journalfile->mmap.data, journalfile->mmap.size)) {
                char path[RRDENG_PATH_MAX];
                journalfile_v2_generate_path(journalfile->datafile, path, sizeof(path));
                netdata_log_error("DBENGINE: failed to unmap index file '%s'", path);
                internal_fatal(true, "DBENGINE: failed to unmap file '%s'", path);
                ctx_fs_error(journalfile->datafile->ctx);
            }
            else {
                __atomic_add_fetch(&rrdeng_cache_efficiency_stats.journal_v2_unmapped, 1, __ATOMIC_RELAXED);
                journalfile->mmap.data = NULL;
                journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_MOUNTED;
            }
        }

        // no data mapped (or just unmapped successfully) counts as unmounted
        unmounted = true;
    }

    if(!have_locks) {
        spinlock_unlock(&journalfile->v2.spinlock);
        spinlock_unlock(&journalfile->mmap.spinlock);
    }

    return unmounted;
}
// Periodic cleanup: unmounts v2 journal files that have been unreferenced
// for at least 2 minutes. Uses only trylocks (never blocks), skipping any
// tier, datafile list or journalfile whose lock is contended.
void journalfile_v2_data_unmount_cleanup(time_t now_s) {
    // DO NOT WAIT ON ANY LOCK!!!

    for(size_t tier = 0; tier < (size_t)storage_tiers ;tier++) {
        struct rrdengine_instance *ctx = multidb_ctx[tier];
        if(!ctx) continue;

        struct rrdengine_datafile *datafile;
        if(uv_rwlock_tryrdlock(&ctx->datafiles.rwlock) != 0)
            continue;

        for (datafile = ctx->datafiles.first; datafile; datafile = datafile->next) {
            struct rrdengine_journalfile *journalfile = datafile->journalfile;
            if(!spinlock_trylock(&journalfile->v2.spinlock))
                continue;

            bool unmount = false;
            if (!journalfile->v2.refcount && (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED)) {
                // this journal has no references and it is mounted

                if (!journalfile->v2.not_needed_since_s)
                    // first time seen idle - start the clock
                    journalfile->v2.not_needed_since_s = now_s;

                else if (now_s - journalfile->v2.not_needed_since_s >= 120)
                    // 2 minutes have passed since last use
                    unmount = true;
            }
            spinlock_unlock(&journalfile->v2.spinlock);

            if (unmount)
                journalfile_v2_mounted_data_unmount(journalfile, false, false);
        }
        uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
    }
}
// Acquires a reference on the v2 journal data when it is available and
// (if a time range is given) overlaps [wanted_first_time_s, wanted_last_time_s].
// Passing 0 for both times acquires unconditionally and marks the mount as
// retention-only (unmapped again as soon as the reference is released).
// Returns the mapped v2 header (mapping on demand) or NULL.
struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) {
    spinlock_lock(&journalfile->v2.spinlock);

    bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
    bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED);
    bool do_we_need_it = false;

    if(has_data) {
        if (!wanted_first_time_s || !wanted_last_time_s ||
            is_page_in_time_range(journalfile->v2.first_time_s, journalfile->v2.last_time_s,
                                  wanted_first_time_s, wanted_last_time_s) == PAGE_IS_IN_RANGE) {

            journalfile->v2.refcount++;
            do_we_need_it = true;

            // a range-less acquire on an unmounted file is a retention scan
            if (!wanted_first_time_s && !wanted_last_time_s && !is_mounted)
                journalfile->v2.flags |= JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION;
            else
                journalfile->v2.flags &= ~JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION;
        }
    }
    spinlock_unlock(&journalfile->v2.spinlock);

    if(do_we_need_it)
        return journalfile_v2_mounted_data_get(journalfile, data_size);

    return NULL;
}
  285. void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
  286. spinlock_lock(&journalfile->v2.spinlock);
  287. internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data");
  288. internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile");
  289. bool unmount = false;
  290. journalfile->v2.refcount--;
  291. if(journalfile->v2.refcount == 0) {
  292. journalfile->v2.not_needed_since_s = 0;
  293. if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION)
  294. unmount = true;
  295. }
  296. spinlock_unlock(&journalfile->v2.spinlock);
  297. if(unmount)
  298. journalfile_v2_mounted_data_unmount(journalfile, false, true);
  299. }
  300. bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) {
  301. spinlock_lock(&journalfile->v2.spinlock);
  302. bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
  303. spinlock_unlock(&journalfile->v2.spinlock);
  304. return has_data;
  305. }
  306. size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) {
  307. spinlock_lock(&journalfile->mmap.spinlock);
  308. size_t data_size = journalfile->mmap.size;
  309. spinlock_unlock(&journalfile->mmap.spinlock);
  310. return data_size;
  311. }
// Publishes freshly built v2 journal data (fd + mmap'ed region) on the
// journalfile: records times/size from the v2 header, marks it available,
// immediately unmounts the mapping (it is remapped on demand by acquire),
// and finally indexes the datafile in the NJFV2IDX.
void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) {
    spinlock_lock(&journalfile->mmap.spinlock);
    spinlock_lock(&journalfile->v2.spinlock);

    internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd");
    internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data");
    internal_fatal(journalfile->v2.refcount, "DBENGINE JOURNALFILE: trying to re-set journal_data of referenced journalfile");

    journalfile->mmap.fd = fd;
    journalfile->mmap.data = journal_data;
    journalfile->mmap.size = journal_data_size;
    journalfile->v2.not_needed_since_s = now_monotonic_sec();
    journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;

    // cache the header's time range and directory size while it is mapped
    struct journal_v2_header *j2_header = journalfile->mmap.data;
    journalfile->v2.first_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC);
    journalfile->v2.last_time_s = (time_t)(j2_header->end_time_ut / USEC_PER_SEC);
    journalfile->v2.size_of_directory = j2_header->metric_offset + j2_header->metric_count * sizeof(struct journal_metric_list);

    journalfile_v2_mounted_data_unmount(journalfile, true, true);

    spinlock_unlock(&journalfile->v2.spinlock);
    spinlock_unlock(&journalfile->mmap.spinlock);

    njfv2idx_add(journalfile->datafile);
}
// Removes the datafile from the NJFV2IDX and then unmaps the v2 data for
// good, closing the fd and clearing all v2 state. If the data is still
// referenced, retries every 10ms until all references are released.
static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) {
    njfv2idx_remove(journalfile->datafile);

    bool has_references = false;

    do {
        if (has_references)
            sleep_usec(10 * USEC_PER_MS);

        spinlock_lock(&journalfile->mmap.spinlock);
        spinlock_lock(&journalfile->v2.spinlock);

        if(journalfile_v2_mounted_data_unmount(journalfile, true, true)) {
            if(journalfile->mmap.fd != -1)
                close(journalfile->mmap.fd);

            journalfile->mmap.fd = -1;
            journalfile->mmap.data = NULL;
            journalfile->mmap.size = 0;
            journalfile->v2.first_time_s = 0;
            journalfile->v2.last_time_s = 0;
            journalfile->v2.flags = 0;
        }
        else {
            // still referenced - back off and retry
            has_references = true;
            internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap...");
        }

        spinlock_unlock(&journalfile->v2.spinlock);
        spinlock_unlock(&journalfile->mmap.spinlock);
    } while(has_references);
}
  358. struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafile *datafile)
  359. {
  360. struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile));
  361. journalfile->datafile = datafile;
  362. spinlock_init(&journalfile->mmap.spinlock);
  363. spinlock_init(&journalfile->v2.spinlock);
  364. spinlock_init(&journalfile->unsafe.spinlock);
  365. journalfile->mmap.fd = -1;
  366. datafile->journalfile = journalfile;
  367. return journalfile;
  368. }
  369. static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
  370. {
  371. int ret;
  372. char path[RRDENG_PATH_MAX];
  373. uv_fs_t req;
  374. ret = uv_fs_close(NULL, &req, file, NULL);
  375. if (ret < 0) {
  376. journalfile_v1_generate_path(datafile, path, sizeof(path));
  377. netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
  378. ctx_fs_error(datafile->ctx);
  379. }
  380. uv_fs_req_cleanup(&req);
  381. return ret;
  382. }
  383. int journalfile_close(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
  384. {
  385. if(journalfile_v2_data_available(journalfile)) {
  386. journalfile_v2_data_unmap_permanently(journalfile);
  387. return 0;
  388. }
  389. return close_uv_file(datafile, journalfile->file);
  390. }
  391. int journalfile_unlink(struct rrdengine_journalfile *journalfile)
  392. {
  393. struct rrdengine_datafile *datafile = journalfile->datafile;
  394. struct rrdengine_instance *ctx = datafile->ctx;
  395. uv_fs_t req;
  396. int ret;
  397. char path[RRDENG_PATH_MAX];
  398. journalfile_v1_generate_path(datafile, path, sizeof(path));
  399. ret = uv_fs_unlink(NULL, &req, path, NULL);
  400. if (ret < 0) {
  401. netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  402. ctx_fs_error(ctx);
  403. }
  404. uv_fs_req_cleanup(&req);
  405. __atomic_add_fetch(&ctx->stats.journalfile_deletions, 1, __ATOMIC_RELAXED);
  406. return ret;
  407. }
  408. int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
  409. {
  410. struct rrdengine_instance *ctx = datafile->ctx;
  411. uv_fs_t req;
  412. int ret;
  413. char path[RRDENG_PATH_MAX];
  414. char path_v2[RRDENG_PATH_MAX];
  415. journalfile_v1_generate_path(datafile, path, sizeof(path));
  416. journalfile_v2_generate_path(datafile, path_v2, sizeof(path));
  417. if (journalfile->file) {
  418. ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
  419. if (ret < 0) {
  420. netdata_log_error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
  421. ctx_fs_error(ctx);
  422. }
  423. uv_fs_req_cleanup(&req);
  424. (void) close_uv_file(datafile, journalfile->file);
  425. }
  426. // This is the new journal v2 index file
  427. ret = uv_fs_unlink(NULL, &req, path_v2, NULL);
  428. if (ret < 0) {
  429. netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  430. ctx_fs_error(ctx);
  431. }
  432. uv_fs_req_cleanup(&req);
  433. ret = uv_fs_unlink(NULL, &req, path, NULL);
  434. if (ret < 0) {
  435. netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
  436. ctx_fs_error(ctx);
  437. }
  438. uv_fs_req_cleanup(&req);
  439. __atomic_add_fetch(&ctx->stats.journalfile_deletions, 2, __ATOMIC_RELAXED);
  440. if(journalfile_v2_data_available(journalfile))
  441. journalfile_v2_data_unmap_permanently(journalfile);
  442. return ret;
  443. }
// Creates a fresh v1 journal file for the datafile and synchronously
// writes its superblock (magic + version). On success sets
// journalfile->unsafe.pos just past the superblock and returns 0; on
// failure returns a negative error, destroying the partial file if the
// superblock write failed.
int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
    struct rrdengine_instance *ctx = datafile->ctx;
    uv_fs_t req;
    uv_file file;
    int ret, fd;
    struct rrdeng_jf_sb *superblock;
    uv_buf_t iov;
    char path[RRDENG_PATH_MAX];

    journalfile_v1_generate_path(datafile, path, sizeof(path));
    fd = open_file_for_io(path, O_CREAT | O_RDWR | O_TRUNC, &file, use_direct_io);
    if (fd < 0) {
        ctx_fs_error(ctx);
        return fd;
    }
    journalfile->file = file;
    __atomic_add_fetch(&ctx->stats.journalfile_creations, 1, __ATOMIC_RELAXED);

    // aligned allocation: the buffer may be written with direct I/O
    ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
    if (unlikely(ret)) {
        fatal("DBENGINE: posix_memalign:%s", strerror(ret));
    }
    memset(superblock, 0, sizeof(*superblock));
    // strncpy fills fixed-size, not-necessarily-NUL-terminated on-disk
    // fields here - this is intentional, not a C-string copy
    (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ);
    (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ);

    iov = uv_buf_init((void *)superblock, sizeof(*superblock));

    ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
    if (ret < 0) {
        fatal_assert(req.result < 0);
        netdata_log_error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
        ctx_io_error(ctx);
    }
    uv_fs_req_cleanup(&req);
    posix_memfree(superblock);
    if (ret < 0) {
        journalfile_destroy_unsafe(journalfile, datafile);
        return ret;
    }

    journalfile->unsafe.pos = sizeof(*superblock);

    ctx_io_write_op_bytes(ctx, sizeof(*superblock));

    return 0;
}
  485. static int journalfile_check_superblock(uv_file file)
  486. {
  487. int ret;
  488. struct rrdeng_jf_sb *superblock;
  489. uv_buf_t iov;
  490. uv_fs_t req;
  491. ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
  492. if (unlikely(ret)) {
  493. fatal("DBENGINE: posix_memalign:%s", strerror(ret));
  494. }
  495. iov = uv_buf_init((void *)superblock, sizeof(*superblock));
  496. ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
  497. if (ret < 0) {
  498. netdata_log_error("DBENGINE: uv_fs_read: %s", uv_strerror(ret));
  499. uv_fs_req_cleanup(&req);
  500. goto error;
  501. }
  502. fatal_assert(req.result >= 0);
  503. uv_fs_req_cleanup(&req);
  504. if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
  505. strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
  506. netdata_log_error("DBENGINE: File has invalid superblock.");
  507. ret = UV_EINVAL;
  508. } else {
  509. ret = 0;
  510. }
  511. error:
  512. posix_memfree(superblock);
  513. return ret;
  514. }
  515. static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size)
  516. {
  517. static BITMAP256 page_error_map = BITMAP256_INITIALIZER;
  518. unsigned i, count, payload_length, descr_size;
  519. struct rrdeng_jf_store_data *jf_metric_data;
  520. jf_metric_data = buf;
  521. count = jf_metric_data->number_of_pages;
  522. descr_size = sizeof(*jf_metric_data->descr) * count;
  523. payload_length = sizeof(*jf_metric_data) + descr_size;
  524. if (payload_length > max_size) {
  525. netdata_log_error("DBENGINE: corrupted transaction payload.");
  526. return;
  527. }
  528. time_t now_s = max_acceptable_collected_time();
  529. for (i = 0; i < count ; ++i) {
  530. uuid_t *temp_id;
  531. uint8_t page_type = jf_metric_data->descr[i].type;
  532. if (page_type > PAGE_TYPE_MAX) {
  533. if (!bitmap256_get_bit(&page_error_map, page_type)) {
  534. netdata_log_error("DBENGINE: unknown page type %d encountered.", page_type);
  535. bitmap256_set_bit(&page_error_map, page_type, 1);
  536. }
  537. continue;
  538. }
  539. temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;
  540. METRIC *metric = mrg_metric_get_and_acquire(main_mrg, temp_id, (Word_t) ctx);
  541. struct rrdeng_extent_page_descr *descr = &jf_metric_data->descr[i];
  542. VALIDATED_PAGE_DESCRIPTOR vd = validate_extent_page_descr(
  543. descr, now_s,
  544. (metric) ? mrg_metric_get_update_every_s(main_mrg, metric) : 0,
  545. false);
  546. if(!vd.is_valid) {
  547. if(metric)
  548. mrg_metric_release(main_mrg, metric);
  549. continue;
  550. }
  551. bool update_metric_time = true;
  552. if (!metric) {
  553. MRG_ENTRY entry = {
  554. .uuid = temp_id,
  555. .section = (Word_t)ctx,
  556. .first_time_s = vd.start_time_s,
  557. .last_time_s = vd.end_time_s,
  558. .latest_update_every_s = (uint32_t) vd.update_every_s,
  559. };
  560. bool added;
  561. metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
  562. if(added)
  563. update_metric_time = false;
  564. }
  565. Word_t metric_id = mrg_metric_id(main_mrg, metric);
  566. if (update_metric_time)
  567. mrg_metric_expand_retention(main_mrg, metric, vd.start_time_s, vd.end_time_s, vd.update_every_s);
  568. pgc_open_add_hot_page(
  569. (Word_t)ctx, metric_id, vd.start_time_s, vd.end_time_s, vd.update_every_s,
  570. journalfile->datafile,
  571. jf_metric_data->extent_offset, jf_metric_data->extent_size, jf_metric_data->descr[i].page_length);
  572. mrg_metric_release(main_mrg, metric);
  573. }
  574. }
/*
 * Replays transaction by interpreting up to max_size bytes from buf.
 * Sets id to the current transaction id or to 0 if unknown.
 * Returns size of transaction record or 0 for unknown size.
 */
static unsigned journalfile_replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
                                               void *buf, uint64_t *id, unsigned max_size)
{
    unsigned payload_length, size_bytes;
    int ret;
    /* persistent structures */
    struct rrdeng_jf_transaction_header *jf_header;
    struct rrdeng_jf_transaction_trailer *jf_trailer;
    uLong crc;

    *id = 0;
    jf_header = buf;
    if (STORE_PADDING == jf_header->type) {
        // padding fills the rest of a block - caller advances to the next block
        netdata_log_debug(D_RRDENGINE, "Skipping padding.");
        return 0;
    }
    if (sizeof(*jf_header) > max_size) {
        netdata_log_error("DBENGINE: corrupted transaction record, skipping.");
        return 0;
    }
    *id = jf_header->id;
    payload_length = jf_header->payload_length;
    size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
    if (size_bytes > max_size) {
        netdata_log_error("DBENGINE: corrupted transaction record, skipping.");
        return 0;
    }
    jf_trailer = buf + sizeof(*jf_header) + payload_length;
    // the trailer carries a CRC32 over header + payload
    crc = crc32(0L, Z_NULL, 0);
    crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
    ret = crc32cmp(jf_trailer->checksum, crc);
    netdata_log_debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
    if (unlikely(ret)) {
        // CRC mismatch: skip the record, but still report its size so the
        // caller can advance past it
        netdata_log_error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
        return size_bytes;
    }
    switch (jf_header->type) {
    case STORE_DATA:
        netdata_log_debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
        journalfile_restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
        break;
    default:
        netdata_log_error("DBENGINE: unknown transaction type, skipping record.");
        break;
    }

    return size_bytes;
}
  626. #define READAHEAD_BYTES (RRDENG_BLOCK_SIZE * 256)
  627. /*
  628. * Iterates journal file transactions and populates the page cache.
  629. * Page cache must already be initialized.
  630. * Returns the maximum transaction id it discovered.
  631. */
  632. static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile)
  633. {
  634. uv_file file;
  635. uint64_t file_size;
  636. int ret;
  637. uint64_t pos, pos_i, max_id, id;
  638. unsigned size_bytes;
  639. void *buf;
  640. uv_buf_t iov;
  641. uv_fs_t req;
  642. file = journalfile->file;
  643. file_size = journalfile->unsafe.pos;
  644. max_id = 1;
  645. ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
  646. if (unlikely(ret))
  647. fatal("DBENGINE: posix_memalign:%s", strerror(ret));
  648. for (pos = sizeof(struct rrdeng_jf_sb); pos < file_size; pos += READAHEAD_BYTES) {
  649. size_bytes = MIN(READAHEAD_BYTES, file_size - pos);
  650. iov = uv_buf_init(buf, size_bytes);
  651. ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
  652. if (ret < 0) {
  653. netdata_log_error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
  654. uv_fs_req_cleanup(&req);
  655. goto skip_file;
  656. }
  657. fatal_assert(req.result >= 0);
  658. uv_fs_req_cleanup(&req);
  659. ctx_io_read_op_bytes(ctx, size_bytes);
  660. for (pos_i = 0; pos_i < size_bytes;) {
  661. unsigned max_size;
  662. max_size = pos + size_bytes - pos_i;
  663. ret = journalfile_replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size);
  664. if (!ret) /* TODO: support transactions bigger than 4K */
  665. /* unknown transaction size, move on to the next block */
  666. pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE);
  667. else
  668. pos_i += ret;
  669. max_id = MAX(max_id, id);
  670. }
  671. }
  672. skip_file:
  673. posix_memfree(buf);
  674. return max_id;
  675. }
  676. // Checks that the extent list checksum is valid
  677. static int journalfile_check_v2_extent_list (void *data_start, size_t file_size)
  678. {
  679. UNUSED(file_size);
  680. uLong crc;
  681. struct journal_v2_header *j2_header = (void *) data_start;
  682. struct journal_v2_block_trailer *journal_v2_trailer;
  683. journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->extent_trailer_offset);
  684. crc = crc32(0L, Z_NULL, 0);
  685. crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list));
  686. if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
  687. netdata_log_error("DBENGINE: extent list CRC32 check: FAILED");
  688. return 1;
  689. }
  690. return 0;
  691. }
  692. // Checks that the metric list (UUIDs) checksum is valid
  693. static int journalfile_check_v2_metric_list(void *data_start, size_t file_size)
  694. {
  695. UNUSED(file_size);
  696. uLong crc;
  697. struct journal_v2_header *j2_header = (void *) data_start;
  698. struct journal_v2_block_trailer *journal_v2_trailer;
  699. journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + j2_header->metric_trailer_offset);
  700. crc = crc32(0L, Z_NULL, 0);
  701. crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list));
  702. if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
  703. netdata_log_error("DBENGINE: metric list CRC32 check: FAILED");
  704. return 1;
  705. }
  706. return 0;
  707. }
// Validate a memory-mapped journal v2 index file.
//
// Return
//  0 Ok
//  1 Invalid
//  2 Force rebuild (file carries the explicit "rebuild" magic)
//  3 skip (file carries the explicit "skip" magic)
static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size, size_t journal_v1_file_size)
{
    int rc;
    uLong crc;

    struct journal_v2_header *j2_header = (void *) data_start;
    struct journal_v2_block_trailer *journal_v2_trailer;

    if (j2_header->magic == JOURVAL_V2_REBUILD_MAGIC)
        return 2;

    if (j2_header->magic == JOURVAL_V2_SKIP_MAGIC)
        return 3;

    // Magic failure
    if (j2_header->magic != JOURVAL_V2_MAGIC)
        return 1;

    // The header must agree with the actual sizes on disk
    if (j2_header->journal_v2_file_size != journal_v2_file_size)
        return 1;

    // journal_v1_file_size == 0 means the v1 file is absent; skip the cross-check then
    if (journal_v1_file_size && j2_header->journal_v1_file_size != journal_v1_file_size)
        return 1;

    // The trailer at the very end of the file holds the CRC of the header
    journal_v2_trailer = (struct journal_v2_block_trailer *) ((uint8_t *) data_start + journal_v2_file_size - sizeof(*journal_v2_trailer));

    crc = crc32(0L, Z_NULL, 0);
    crc = crc32(crc, (void *) j2_header, sizeof(*j2_header));

    rc = crc32cmp(journal_v2_trailer->checksum, crc);
    if (unlikely(rc)) {
        netdata_log_error("DBENGINE: file CRC32 check: FAILED");
        return 1;
    }

    rc = journalfile_check_v2_extent_list(data_start, journal_v2_file_size);
    if (rc) return 1;

    // The more expensive per-metric checks below run only when requested
    if (!db_engine_journal_check)
        return 0;

    rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size);
    if (rc) return 1;

    // Verify complete UUID chain: for each metric, check the page-list
    // header CRC, then the CRC of the page descriptors that follow it.
    struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset);
    unsigned verified = 0;
    unsigned entries;
    unsigned total_pages = 0;

    netdata_log_info("DBENGINE: checking %u metrics that exist in the journal", j2_header->metric_count);
    for (entries = 0; entries < j2_header->metric_count; entries++) {

        char uuid_str[UUID_STR_LEN];
        uuid_unparse_lower(metric->uuid, uuid_str);

        struct journal_page_header *metric_list_header = (void *) (data_start + metric->page_offset);
        // Work on a local copy: the stored CRC was computed with the crc
        // field holding the magic (see journalfile_v2_write_data_page_header()),
        // so restore that value before hashing.
        struct journal_page_header local_metric_list_header = *metric_list_header;

        local_metric_list_header.crc = JOURVAL_V2_MAGIC;

        crc = crc32(0L, Z_NULL, 0);
        crc = crc32(crc, (void *) &local_metric_list_header, sizeof(local_metric_list_header));
        rc = crc32cmp(metric_list_header->checksum, crc);

        if (!rc) {
            // Header OK; now check the CRC of the page list itself, stored
            // in the block trailer right after the last descriptor.
            struct journal_v2_block_trailer *journal_trailer =
                (void *) data_start + metric->page_offset + sizeof(struct journal_page_header) + (metric_list_header->entries * sizeof(struct journal_page_list));

            crc = crc32(0L, Z_NULL, 0);
            crc = crc32(crc, (uint8_t *) metric_list_header + sizeof(struct journal_page_header), metric_list_header->entries * sizeof(struct journal_page_list));
            rc = crc32cmp(journal_trailer->checksum, crc);
            internal_error(rc, "DBENGINE: index %u : %s entries %u at offset %u verified, DATA CRC computed %lu, stored %u", entries, uuid_str, metric->entries, metric->page_offset,
                           crc, metric_list_header->crc);
            if (!rc) {
                total_pages += metric_list_header->entries;
                verified++;
            }
        }

        metric++;
        // Abort if walking the metric list would run past the mapped file
        if ((uint32_t)((uint8_t *) metric - (uint8_t *) data_start) > (uint32_t) journal_v2_file_size) {
            netdata_log_info("DBENGINE: verification failed EOF reached -- total entries %u, verified %u", entries, verified);
            return 1;
        }
    }

    // Every metric must have verified cleanly
    if (entries != verified) {
        netdata_log_info("DBENGINE: verification failed -- total entries %u, verified %u", entries, verified);
        return 1;
    }

    netdata_log_info("DBENGINE: verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages);

    return 0;
}
// Feed retention information (first/last time, update-every) for every metric
// in the journal v2 file into the MRG, and lower the context's global
// first_time_s if this file starts earlier.
void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile) {
    usec_t started_ut = now_monotonic_usec();

    size_t data_size = 0;
    struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, &data_size, 0, 0);
    if(!j2_header)
        return;

    uint8_t *data_start = (uint8_t *)j2_header;
    uint32_t entries = j2_header->metric_count;

    // A metric-list CRC check was deferred at load time (set by
    // journalfile_v2_load() when db_engine_journal_check is off);
    // perform it once here and clear the flag.
    if (journalfile->v2.flags & JOURNALFILE_FLAG_METRIC_CRC_CHECK) {
        journalfile->v2.flags &= ~JOURNALFILE_FLAG_METRIC_CRC_CHECK;
        if (journalfile_check_v2_metric_list(data_start, j2_header->journal_v2_file_size)) {
            journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_AVAILABLE;
            // needs rebuild
            // NOTE(review): this path returns without journalfile_v2_data_release();
            // confirm the acquire/release pairing is intentional here.
            return;
        }
    }

    struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
    // Per-metric times are stored as deltas from the header's start time
    time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
    time_t global_first_time_s = header_start_time_s;
    time_t now_s = max_acceptable_collected_time();
    for (size_t i=0; i < entries; i++) {
        time_t start_time_s = header_start_time_s + metric->delta_start_s;
        time_t end_time_s = header_start_time_s + metric->delta_end_s;

        mrg_update_metric_retention_and_granularity_by_uuid(
                main_mrg, (Word_t)ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s);

        metric++;
    }

    journalfile_v2_data_release(journalfile);
    usec_t ended_ut = now_monotonic_usec();

    netdata_log_info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms"
                     , ctx->config.tier, journalfile->datafile->fileno
                     , (double)data_size / 1024 / 1024
                     , (double)entries / 1000
                     , ((double)(ended_ut - started_ut) / USEC_PER_MS)
                     );

    // Lock-free minimum update of ctx->atomic.first_time_s:
    // retry the CAS until it succeeds or the stored value is already smaller.
    time_t old = __atomic_load_n(&ctx->atomic.first_time_s, __ATOMIC_RELAXED);;
    do {
        if(old <= global_first_time_s)
            break;
    } while(!__atomic_compare_exchange_n(&ctx->atomic.first_time_s, &old, global_first_time_s, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
}
// Map and validate the journal v2 index file for the given datafile.
// On success the mapping is handed over via journalfile_v2_data_set() and
// 0 is returned; any failure returns non-zero (the validation code, so the
// caller can distinguish invalid/rebuild/skip).
int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
    int ret, fd;
    char path_v1[RRDENG_PATH_MAX];
    char path_v2[RRDENG_PATH_MAX];
    struct stat statbuf;
    size_t journal_v1_file_size = 0;
    size_t journal_v2_file_size;

    // The v1 journal size is cross-checked against the v2 header during validation
    journalfile_v1_generate_path(datafile, path_v1, sizeof(path_v1));
    ret = stat(path_v1, &statbuf);
    if (!ret)
        journal_v1_file_size = (uint32_t)statbuf.st_size;

    journalfile_v2_generate_path(datafile, path_v2, sizeof(path_v2));
    fd = open(path_v2, O_RDONLY);
    if (fd < 0) {
        // ENOENT is expected (no index built yet) and not a filesystem error
        if (errno == ENOENT)
            return 1;

        ctx_fs_error(ctx);
        netdata_log_error("DBENGINE: failed to open '%s'", path_v2);
        return 1;
    }

    ret = fstat(fd, &statbuf);
    if (ret) {
        netdata_log_error("DBENGINE: failed to get file information for '%s'", path_v2);
        close(fd);
        return 1;
    }

    journal_v2_file_size = (size_t)statbuf.st_size;

    // File must at least hold a complete header
    if (journal_v2_file_size < sizeof(struct journal_v2_header)) {
        error_report("Invalid file %s. Not the expected size", path_v2);
        close(fd);
        return 1;
    }

    usec_t mmap_start_ut = now_monotonic_usec();
    uint8_t *data_start = mmap(NULL, journal_v2_file_size, PROT_READ, MAP_SHARED, fd, 0);
    if (data_start == MAP_FAILED) {
        close(fd);
        return 1;
    }

    netdata_log_info("DBENGINE: checking integrity of '%s'", path_v2);
    usec_t validation_start_ut = now_monotonic_usec();
    // rc: 0 ok, 1 invalid, 2 rebuild requested, 3 skip requested
    int rc = journalfile_v2_validate(data_start, journal_v2_file_size, journal_v1_file_size);
    if (unlikely(rc)) {
        if (rc == 2)
            error_report("File %s needs to be rebuilt", path_v2);
        else if (rc == 3)
            error_report("File %s will be skipped", path_v2);
        else
            error_report("File %s is invalid and it will be rebuilt", path_v2);

        if (unlikely(munmap(data_start, journal_v2_file_size)))
            netdata_log_error("DBENGINE: failed to unmap '%s'", path_v2);

        close(fd);
        return rc;
    }

    struct journal_v2_header *j2_header = (void *) data_start;
    uint32_t entries = j2_header->metric_count;

    // An index without metrics is useless; discard it
    if (unlikely(!entries)) {
        if (unlikely(munmap(data_start, journal_v2_file_size)))
            netdata_log_error("DBENGINE: failed to unmap '%s'", path_v2);

        close(fd);
        return 1;
    }

    usec_t finished_ut = now_monotonic_usec();

    netdata_log_info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, "
                     "mmap: %0.2f ms, validate: %0.2f ms"
                     , path_v2
                     , (double)journal_v2_file_size / 1024 / 1024
                     , (double)entries / 1000
                     , ((double)(validation_start_ut - mmap_start_ut) / USEC_PER_MS)
                     , ((double)(finished_ut - validation_start_ut) / USEC_PER_MS)
                     );

    // Initialize the journal file to be able to access the data.
    // When the full check was skipped above, defer the metric-list CRC check
    // to journalfile_v2_populate_retention_to_mrg().
    if (!db_engine_journal_check)
        journalfile->v2.flags |= JOURNALFILE_FLAG_METRIC_CRC_CHECK;

    journalfile_v2_data_set(journalfile, fd, data_start, journal_v2_file_size);

    ctx_current_disk_space_increase(ctx, journal_v2_file_size);

    // File is OK load it
    return 0;
}
// Wrapper used to qsort() metric entries by UUID before writing the v2
// metric directory (see journalfile_metric_compare()).
struct journal_metric_list_to_sort {
    struct jv2_metrics_info *metric_info;
};
  909. static int journalfile_metric_compare (const void *item1, const void *item2)
  910. {
  911. const struct jv2_metrics_info *metric1 = ((struct journal_metric_list_to_sort *) item1)->metric_info;
  912. const struct jv2_metrics_info *metric2 = ((struct journal_metric_list_to_sort *) item2)->metric_info;
  913. return memcmp(metric1->uuid, metric2->uuid, sizeof(uuid_t));
  914. }
  915. // Write list of extents for the journalfile
  916. void *journalfile_v2_write_extent_list(Pvoid_t JudyL_extents_pos, void *data)
  917. {
  918. Pvoid_t *PValue;
  919. struct journal_extent_list *j2_extent_base = (void *) data;
  920. struct jv2_extents_info *ext_info;
  921. bool first = true;
  922. Word_t pos = 0;
  923. size_t count = 0;
  924. while ((PValue = JudyLFirstThenNext(JudyL_extents_pos, &pos, &first))) {
  925. ext_info = *PValue;
  926. size_t index = ext_info->index;
  927. j2_extent_base[index].file_index = 0;
  928. j2_extent_base[index].datafile_offset = ext_info->pos;
  929. j2_extent_base[index].datafile_size = ext_info->bytes;
  930. j2_extent_base[index].pages = ext_info->number_of_pages;
  931. count++;
  932. }
  933. return j2_extent_base + count;
  934. }
  935. static int journalfile_verify_space(struct journal_v2_header *j2_header, void *data, uint32_t bytes)
  936. {
  937. if ((unsigned long)(((uint8_t *) data - (uint8_t *) j2_header->data) + bytes) > (j2_header->journal_v2_file_size - sizeof(struct journal_v2_block_trailer)))
  938. return 1;
  939. return 0;
  940. }
  941. void *journalfile_v2_write_metric_page(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info, uint32_t pages_offset)
  942. {
  943. struct journal_metric_list *metric = (void *) data;
  944. if (journalfile_verify_space(j2_header, data, sizeof(*metric)))
  945. return NULL;
  946. uuid_copy(metric->uuid, *metric_info->uuid);
  947. metric->entries = metric_info->number_of_pages;
  948. metric->page_offset = pages_offset;
  949. metric->delta_start_s = (uint32_t)(metric_info->first_time_s - (time_t)(j2_header->start_time_ut / USEC_PER_SEC));
  950. metric->delta_end_s = (uint32_t)(metric_info->last_time_s - (time_t)(j2_header->start_time_ut / USEC_PER_SEC));
  951. metric->update_every_s = 0;
  952. return ++metric;
  953. }
  954. void *journalfile_v2_write_data_page_header(struct journal_v2_header *j2_header __maybe_unused, void *data, struct jv2_metrics_info *metric_info, uint32_t uuid_offset)
  955. {
  956. struct journal_page_header *data_page_header = (void *) data;
  957. uLong crc;
  958. uuid_copy(data_page_header->uuid, *metric_info->uuid);
  959. data_page_header->entries = metric_info->number_of_pages;
  960. data_page_header->uuid_offset = uuid_offset; // data header OFFSET poings to METRIC in the directory
  961. data_page_header->crc = JOURVAL_V2_MAGIC;
  962. crc = crc32(0L, Z_NULL, 0);
  963. crc = crc32(crc, (void *) data_page_header, sizeof(*data_page_header));
  964. crc32set(data_page_header->checksum, crc);
  965. return ++data_page_header;
  966. }
  967. void *journalfile_v2_write_data_page_trailer(struct journal_v2_header *j2_header __maybe_unused, void *data, void *page_header)
  968. {
  969. struct journal_page_header *data_page_header = (void *) page_header;
  970. struct journal_v2_block_trailer *journal_trailer = (void *) data;
  971. uLong crc;
  972. crc = crc32(0L, Z_NULL, 0);
  973. crc = crc32(crc, (uint8_t *) page_header + sizeof(struct journal_page_header), data_page_header->entries * sizeof(struct journal_page_list));
  974. crc32set(journal_trailer->checksum, crc);
  975. return ++journal_trailer;
  976. }
  977. void *journalfile_v2_write_data_page(struct journal_v2_header *j2_header, void *data, struct jv2_page_info *page_info)
  978. {
  979. struct journal_page_list *data_page = data;
  980. if (journalfile_verify_space(j2_header, data, sizeof(*data_page)))
  981. return NULL;
  982. struct extent_io_data *ei = page_info->custom_data;
  983. data_page->delta_start_s = (uint32_t) (page_info->start_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC);
  984. data_page->delta_end_s = (uint32_t) (page_info->end_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC);
  985. data_page->extent_index = page_info->extent_index;
  986. data_page->update_every_s = (uint32_t) page_info->update_every_s;
  987. data_page->page_length = (uint16_t) (ei ? ei->page_length : page_info->page_length);
  988. data_page->type = 0;
  989. return ++data_page;
  990. }
  991. // Must be recorded in metric_info->entries
  992. static void *journalfile_v2_write_descriptors(struct journal_v2_header *j2_header, void *data, struct jv2_metrics_info *metric_info,
  993. struct journal_metric_list *current_metric)
  994. {
  995. Pvoid_t *PValue;
  996. struct journal_page_list *data_page = (void *)data;
  997. // We need to write all descriptors with index metric_info->min_index_time_s, metric_info->max_index_time_s
  998. // that belong to this journal file
  999. Pvoid_t JudyL_array = metric_info->JudyL_pages_by_start_time;
  1000. Word_t index_time = 0;
  1001. bool first = true;
  1002. struct jv2_page_info *page_info;
  1003. uint32_t update_every_s = 0;
  1004. while ((PValue = JudyLFirstThenNext(JudyL_array, &index_time, &first))) {
  1005. page_info = *PValue;
  1006. // Write one descriptor and return the next data page location
  1007. data_page = journalfile_v2_write_data_page(j2_header, (void *) data_page, page_info);
  1008. update_every_s = (uint32_t) page_info->update_every_s;
  1009. if (NULL == data_page)
  1010. break;
  1011. }
  1012. current_metric->update_every_s = update_every_s;
  1013. return data_page;
  1014. }
// Migrate the journalfile pointed by datafile
// activate : make the new file active immediately
//            journafile data will be set and descriptors (if deleted) will be repopulated as needed
// startup  : if the migration is done during agent startup
//            this will allow us to optimize certain things
//
// Layout of the file being built:
//   [header+padding][extent list][trailer][metric list][trailer]
//   [per-metric: page header, page descriptors, trailer]...[file trailer]
void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno __maybe_unused, uint8_t type __maybe_unused,
                                        Pvoid_t JudyL_metrics, Pvoid_t JudyL_extents_pos,
                                        size_t number_of_extents, size_t number_of_metrics, size_t number_of_pages, void *user_data)
{
    char path[RRDENG_PATH_MAX];
    Pvoid_t *PValue;
    struct rrdengine_instance *ctx = (struct rrdengine_instance *) section;
    struct rrdengine_journalfile *journalfile = (struct rrdengine_journalfile *) user_data;
    struct rrdengine_datafile *datafile = journalfile->datafile;
    time_t min_time_s = LONG_MAX;
    time_t max_time_s = 0;
    struct jv2_metrics_info *metric_info;

    journalfile_v2_generate_path(datafile, path, sizeof(path));

    netdata_log_info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu",
        path,
        number_of_extents,
        number_of_metrics,
        number_of_pages);

#ifdef NETDATA_INTERNAL_CHECKS
    usec_t start_loading = now_monotonic_usec();
#endif

    // Pre-compute all section offsets so each writer below can be verified
    // against the expected position.
    size_t total_file_size = 0;
    total_file_size += (sizeof(struct journal_v2_header) + JOURNAL_V2_HEADER_PADDING_SZ);

    // Extents will start here
    uint32_t extent_offset = total_file_size;
    total_file_size += (number_of_extents * sizeof(struct journal_extent_list));

    uint32_t extent_offset_trailer = total_file_size;
    total_file_size += sizeof(struct journal_v2_block_trailer);

    // UUID list will start here
    uint32_t metrics_offset = total_file_size;
    total_file_size += (number_of_metrics * sizeof(struct journal_metric_list));

    // UUID list trailer
    uint32_t metric_offset_trailer = total_file_size;
    total_file_size += sizeof(struct journal_v2_block_trailer);

    // descr @ time will start here
    uint32_t pages_offset = total_file_size;
    total_file_size += (number_of_pages * (sizeof(struct journal_page_list) + sizeof(struct journal_page_header) + sizeof(struct journal_v2_block_trailer)));

    // File trailer
    uint32_t trailer_offset = total_file_size;
    total_file_size += sizeof(struct journal_v2_block_trailer);

    int fd_v2;
    uint8_t *data_start = netdata_mmap(path, total_file_size, MAP_SHARED, 0, false, &fd_v2);
    uint8_t *data = data_start;

    // NOTE(review): a NULL return from netdata_mmap() is not handled here —
    // confirm it cannot fail, or add handling.
    memset(data_start, 0, extent_offset);

    // Write header
    struct journal_v2_header j2_header;
    memset(&j2_header, 0, sizeof(j2_header));

    j2_header.magic = JOURVAL_V2_MAGIC;
    j2_header.start_time_ut = 0;
    j2_header.end_time_ut = 0;
    j2_header.extent_count = number_of_extents;
    j2_header.extent_offset = extent_offset;
    j2_header.metric_count = number_of_metrics;
    j2_header.metric_offset = metrics_offset;
    j2_header.page_count = number_of_pages;
    j2_header.page_offset = pages_offset;
    j2_header.extent_trailer_offset = extent_offset_trailer;
    j2_header.metric_trailer_offset = metric_offset_trailer;
    j2_header.journal_v2_file_size = total_file_size;
    j2_header.journal_v1_file_size = (uint32_t)journalfile_current_size(journalfile);
    j2_header.data = data_start;  // Used during migration

    struct journal_v2_block_trailer *journal_v2_trailer;

    data = journalfile_v2_write_extent_list(JudyL_extents_pos, data_start + extent_offset);
    internal_error(true, "DBENGINE: write extent list so far %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);

    fatal_assert(data == data_start + extent_offset_trailer);

    // Calculate CRC for extents
    journal_v2_trailer = (struct journal_v2_block_trailer *) (data_start + extent_offset_trailer);
    uLong crc;
    crc = crc32(0L, Z_NULL, 0);
    crc = crc32(crc, (uint8_t *) data_start + extent_offset, number_of_extents * sizeof(struct journal_extent_list));
    crc32set(journal_v2_trailer->checksum, crc);

    internal_error(true, "DBENGINE: CALCULATE CRC FOR EXTENT %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
    // Skip the trailer, point to the metrics off
    data += sizeof(struct journal_v2_block_trailer);

    // Sanity check -- we must be at the metrics_offset
    fatal_assert(data == data_start + metrics_offset);

    // Allocate array to sort UUIDs and keep them sorted in the journal because we want to do binary search when we do lookups
    struct journal_metric_list_to_sort *uuid_list = mallocz(number_of_metrics * sizeof(struct journal_metric_list_to_sort));

    // Collect all metrics and track the overall time range for the header
    Word_t Index = 0;
    size_t count = 0;
    bool first_then_next = true;
    while ((PValue = JudyLFirstThenNext(JudyL_metrics, &Index, &first_then_next))) {
        metric_info = *PValue;

        fatal_assert(count < number_of_metrics);
        uuid_list[count++].metric_info = metric_info;
        min_time_s = MIN(min_time_s, metric_info->first_time_s);
        max_time_s = MAX(max_time_s, metric_info->last_time_s);
    }

    // Store in the header
    j2_header.start_time_ut = min_time_s * USEC_PER_SEC;
    j2_header.end_time_ut = max_time_s * USEC_PER_SEC;

    qsort(&uuid_list[0], number_of_metrics, sizeof(struct journal_metric_list_to_sort), journalfile_metric_compare);
    internal_error(true, "DBENGINE: traverse and qsort UUID %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);

    uint32_t resize_file_to = total_file_size;

    for (Index = 0; Index < number_of_metrics; Index++) {
        metric_info = uuid_list[Index].metric_info;

        // Calculate current UUID offset from start of file. We will store this in the data page header
        uint32_t uuid_offset = data - data_start;

        struct journal_metric_list *current_metric = (void *) data;

        // Write the UUID we are processing
        data = (void *) journalfile_v2_write_metric_page(&j2_header, data, metric_info, pages_offset);
        if (unlikely(!data))
            break;

        // Next we will write
        //   Header
        //   Detailed entries (descr @ time)
        //   Trailer (checksum)

        // Keep the page_list_header, to be used for migration when where agent is running
        metric_info->page_list_header = pages_offset;
        // Write page header
        void *metric_page = journalfile_v2_write_data_page_header(&j2_header, data_start + pages_offset, metric_info,
                                                                  uuid_offset);

        // Start writing descr @ time
        void *page_trailer = journalfile_v2_write_descriptors(&j2_header, metric_page, metric_info, current_metric);
        if (unlikely(!page_trailer))
            break;

        // Trailer (checksum)
        uint8_t *next_page_address = journalfile_v2_write_data_page_trailer(&j2_header, page_trailer,
                                                                            data_start + pages_offset);

        // Calculate start of the pages start for next descriptor
        pages_offset += (metric_info->number_of_pages * (sizeof(struct journal_page_list)) + sizeof(struct journal_page_header) + sizeof(struct journal_v2_block_trailer));
        // Verify we are at the right location
        if (pages_offset != (uint32_t)(next_page_address - data_start)) {
            // make sure checks fail so that we abort
            data = data_start;
            break;
        }
    }

    // If all metrics were written, `data` landed exactly on the metric trailer
    if (data == data_start + metric_offset_trailer) {
        internal_error(true, "DBENGINE: WRITE METRICS AND PAGES %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);

        // Calculate CRC for metrics
        journal_v2_trailer = (struct journal_v2_block_trailer *)(data_start + metric_offset_trailer);
        crc = crc32(0L, Z_NULL, 0);
        crc =
            crc32(crc, (uint8_t *)data_start + metrics_offset, number_of_metrics * sizeof(struct journal_metric_list));
        crc32set(journal_v2_trailer->checksum, crc);
        internal_error(true, "DBENGINE: CALCULATE CRC FOR UUIDs %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);

        // Prepare to write checksum for the file
        // (the `data` pointer is cleared first so the header CRC matches
        // what journalfile_v2_validate() computes at load time)
        j2_header.data = NULL;
        journal_v2_trailer = (struct journal_v2_block_trailer *)(data_start + trailer_offset);
        crc = crc32(0L, Z_NULL, 0);
        crc = crc32(crc, (void *)&j2_header, sizeof(j2_header));
        crc32set(journal_v2_trailer->checksum, crc);

        // Write header to the file
        memcpy(data_start, &j2_header, sizeof(j2_header));

        internal_error(true, "DBENGINE: FILE COMPLETED --------> %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);

        netdata_log_info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size);

        // msync(data_start, total_file_size, MS_SYNC);
        journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size);

        internal_error(true, "DBENGINE: ACTIVATING NEW INDEX JNL %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
        ctx_current_disk_space_increase(ctx, total_file_size);
        freez(uuid_list);
        return;
    }
    else {
        // Something did not fit / landed off-position: mark the file so it
        // will be skipped on the next load, and shrink it to just the header.
        netdata_log_info("DBENGINE: failed to build index '%s', file will be skipped", path);
        j2_header.data = NULL;
        j2_header.magic = JOURVAL_V2_SKIP_MAGIC;
        memcpy(data_start, &j2_header, sizeof(j2_header));
        resize_file_to = sizeof(j2_header);
    }

    netdata_munmap(data_start, total_file_size);
    freez(uuid_list);

    if (likely(resize_file_to == total_file_size))
        return;

    int ret = truncate(path, (long) resize_file_to);
    if (ret < 0) {
        // Truncation failed: account for the full size that remains on disk
        ctx_current_disk_space_increase(ctx, total_file_size);
        ctx_fs_error(ctx);
        netdata_log_error("DBENGINE: failed to resize file '%s'", path);
    }
    else
        ctx_current_disk_space_increase(ctx, resize_file_to);
}
// Load a journal file: prefer the v2 index; otherwise replay the v1 journal
// and, when appropriate, trigger migration of the v1 data to a v2 index.
int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
                     struct rrdengine_datafile *datafile)
{
    uv_fs_t req;
    uv_file file;
    int ret, fd, error;
    uint64_t file_size, max_id;
    char path[RRDENG_PATH_MAX];
    bool loaded_v2 = false;

    // Do not try to load jv2 of the latest file
    if (datafile->fileno != ctx_last_fileno_get(ctx))
        loaded_v2 = journalfile_v2_load(ctx, journalfile, datafile) == 0;

    journalfile_v1_generate_path(datafile, path, sizeof(path));

    fd = open_file_for_io(path, O_RDWR, &file, use_direct_io);
    if (fd < 0) {
        ctx_fs_error(ctx);

        // The v2 index is sufficient on its own; a missing v1 file is not fatal then
        if(loaded_v2)
            return 0;

        return fd;
    }

    // NOTE(review): minimum size is checked against the *datafile* superblock
    // (rrdeng_df_sb), not rrdeng_jf_sb — confirm this is intentional.
    ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
    if (ret) {
        error = ret;
        goto cleanup;
    }

    // v2 already loaded: just record the v1 size and close the v1 file
    if(loaded_v2) {
        journalfile->unsafe.pos = file_size;
        error = 0;
        goto cleanup;
    }

    file_size = ALIGN_BYTES_FLOOR(file_size);
    journalfile->unsafe.pos = file_size;
    journalfile->file = file;

    ret = journalfile_check_superblock(file);
    if (ret) {
        netdata_log_info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path);
        error = ret;
        goto cleanup;
    }
    ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_jf_sb));

    netdata_log_info("DBENGINE: loading journal file '%s'", path);

    // Replay all transactions; returns the highest transaction id seen
    max_id = journalfile_iterate_transactions(ctx, journalfile);

    __atomic_store_n(&ctx->atomic.transaction_id, MAX(__atomic_load_n(&ctx->atomic.transaction_id, __ATOMIC_RELAXED), max_id + 1), __ATOMIC_RELAXED);

    netdata_log_info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size);

    // Keep a small last datafile pair as-is (no migration), so collection
    // continues appending to it
    bool is_last_file = (ctx_last_fileno_get(ctx) == journalfile->datafile->fileno);
    if (is_last_file && journalfile->datafile->pos <= rrdeng_target_data_file_size(ctx) / 3) {
        ctx->loading.create_new_datafile_pair = false;
        return 0;
    }

    // Migrate the replayed v1 journal into a v2 index
    pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
                                 journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);

    if (is_last_file)
        ctx->loading.create_new_datafile_pair = true;

    return 0;

cleanup:
    // Close the v1 file descriptor on all error / v2-satisfied paths
    ret = uv_fs_close(NULL, &req, file, NULL);
    if (ret < 0) {
        netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
        ctx_fs_error(ctx);
    }
    uv_fs_req_cleanup(&req);
    return error;
}