pagecache.c 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #define NETDATA_RRD_INTERNALS
  3. #include "rrdengine.h"
  4. MRG *main_mrg = NULL;
  5. PGC *main_cache = NULL;
  6. PGC *open_cache = NULL;
  7. PGC *extent_cache = NULL;
  8. struct rrdeng_cache_efficiency_stats rrdeng_cache_efficiency_stats = {};
  9. static void main_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused)
  10. {
  11. // Release storage associated with the page
  12. dbengine_page_free(entry.data, entry.size);
  13. }
  14. static void main_cache_flush_dirty_page_init_callback(PGC *cache __maybe_unused, Word_t section) {
  15. struct rrdengine_instance *ctx = (struct rrdengine_instance *) section;
  16. // mark ctx as having flushing in progress
  17. __atomic_add_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED);
  18. }
  19. static void main_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused)
  20. {
  21. if(!entries)
  22. return;
  23. struct rrdengine_instance *ctx = (struct rrdengine_instance *) entries_array[0].section;
  24. size_t bytes_per_point = CTX_POINT_SIZE_BYTES(ctx);
  25. struct page_descr_with_data *base = NULL;
  26. for (size_t Index = 0 ; Index < entries; Index++) {
  27. time_t start_time_s = entries_array[Index].start_time_s;
  28. time_t end_time_s = entries_array[Index].end_time_s;
  29. struct page_descr_with_data *descr = page_descriptor_get();
  30. descr->id = mrg_metric_uuid(main_mrg, (METRIC *) entries_array[Index].metric_id);
  31. descr->metric_id = entries_array[Index].metric_id;
  32. descr->start_time_ut = start_time_s * USEC_PER_SEC;
  33. descr->end_time_ut = end_time_s * USEC_PER_SEC;
  34. descr->update_every_s = entries_array[Index].update_every_s;
  35. descr->type = ctx->config.page_type;
  36. descr->page_length = (end_time_s - (start_time_s - descr->update_every_s)) / descr->update_every_s * bytes_per_point;
  37. if(descr->page_length > entries_array[Index].size) {
  38. descr->page_length = entries_array[Index].size;
  39. error_limit_static_global_var(erl, 1, 0);
  40. error_limit(&erl, "DBENGINE: page exceeds the maximum size, adjusting it to max.");
  41. }
  42. descr->page = pgc_page_data(pages_array[Index]);
  43. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(base, descr, link.prev, link.next);
  44. internal_fatal(descr->page_length > RRDENG_BLOCK_SIZE, "DBENGINE: faulty page length calculation");
  45. }
  46. struct completion completion;
  47. completion_init(&completion);
  48. rrdeng_enq_cmd(ctx, RRDENG_OPCODE_EXTENT_WRITE, base, &completion, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
  49. completion_wait_for(&completion);
  50. completion_destroy(&completion);
  51. }
  52. static void open_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused)
  53. {
  54. struct rrdengine_datafile *datafile = entry.data;
  55. datafile_release(datafile, DATAFILE_ACQUIRE_OPEN_CACHE);
  56. }
  57. static void open_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused)
  58. {
  59. ;
  60. }
  61. static void extent_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused)
  62. {
  63. dbengine_extent_free(entry.data, entry.size);
  64. }
  65. static void extent_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused)
  66. {
  67. ;
  68. }
  69. inline TIME_RANGE_COMPARE is_page_in_time_range(time_t page_first_time_s, time_t page_last_time_s, time_t wanted_start_time_s, time_t wanted_end_time_s) {
  70. // page_first_time_s <= wanted_end_time_s && page_last_time_s >= wanted_start_time_s
  71. if(page_last_time_s < wanted_start_time_s)
  72. return PAGE_IS_IN_THE_PAST;
  73. if(page_first_time_s > wanted_end_time_s)
  74. return PAGE_IS_IN_THE_FUTURE;
  75. return PAGE_IS_IN_RANGE;
  76. }
  77. static inline struct page_details *pdc_find_page_for_time(
  78. Pcvoid_t PArray,
  79. time_t wanted_time_s,
  80. size_t *gaps,
  81. PDC_PAGE_STATUS mode,
  82. PDC_PAGE_STATUS skip_list
  83. ) {
  84. Word_t PIndexF = wanted_time_s, PIndexL = wanted_time_s;
  85. Pvoid_t *PValueF, *PValueL;
  86. struct page_details *pdF = NULL, *pdL = NULL;
  87. bool firstF = true, firstL = true;
  88. PDC_PAGE_STATUS ignore_list = PDC_PAGE_QUERY_GLOBAL_SKIP_LIST | skip_list;
  89. while ((PValueF = PDCJudyLFirstThenNext(PArray, &PIndexF, &firstF))) {
  90. pdF = *PValueF;
  91. PDC_PAGE_STATUS status = __atomic_load_n(&pdF->status, __ATOMIC_ACQUIRE);
  92. if (!(status & (ignore_list | mode)))
  93. break;
  94. pdF = NULL;
  95. }
  96. while ((PValueL = PDCJudyLLastThenPrev(PArray, &PIndexL, &firstL))) {
  97. pdL = *PValueL;
  98. PDC_PAGE_STATUS status = __atomic_load_n(&pdL->status, __ATOMIC_ACQUIRE);
  99. if(status & mode) {
  100. // don't go all the way back to the beginning
  101. // stop at the last processed
  102. pdL = NULL;
  103. break;
  104. }
  105. if (!(status & ignore_list))
  106. break;
  107. pdL = NULL;
  108. }
  109. TIME_RANGE_COMPARE rcF = (pdF) ? is_page_in_time_range(pdF->first_time_s, pdF->last_time_s, wanted_time_s, wanted_time_s) : PAGE_IS_IN_THE_FUTURE;
  110. TIME_RANGE_COMPARE rcL = (pdL) ? is_page_in_time_range(pdL->first_time_s, pdL->last_time_s, wanted_time_s, wanted_time_s) : PAGE_IS_IN_THE_PAST;
  111. if (!pdF || pdF == pdL) {
  112. // F is missing, or they are the same
  113. // return L
  114. (*gaps) += (rcL == PAGE_IS_IN_RANGE) ? 0 : 1;
  115. return pdL;
  116. }
  117. if (!pdL) {
  118. // L is missing
  119. // return F
  120. (*gaps) += (rcF == PAGE_IS_IN_RANGE) ? 0 : 1;
  121. return pdF;
  122. }
  123. if (rcF == rcL) {
  124. // both are on the same side,
  125. // but they are different pages
  126. switch (rcF) {
  127. case PAGE_IS_IN_RANGE:
  128. // pick the higher resolution
  129. if (pdF->update_every_s && pdF->update_every_s < pdL->update_every_s)
  130. return pdF;
  131. if (pdL->update_every_s && pdL->update_every_s < pdF->update_every_s)
  132. return pdL;
  133. // same resolution - pick the one that starts earlier
  134. if (pdL->first_time_s < pdF->first_time_s)
  135. return pdL;
  136. return pdF;
  137. break;
  138. case PAGE_IS_IN_THE_FUTURE:
  139. (*gaps)++;
  140. // pick the one that starts earlier
  141. if (pdL->first_time_s < pdF->first_time_s)
  142. return pdL;
  143. return pdF;
  144. break;
  145. default:
  146. case PAGE_IS_IN_THE_PAST:
  147. (*gaps)++;
  148. return NULL;
  149. break;
  150. }
  151. }
  152. if(rcF == PAGE_IS_IN_RANGE) {
  153. // (*gaps) += 0;
  154. return pdF;
  155. }
  156. if(rcL == PAGE_IS_IN_RANGE) {
  157. // (*gaps) += 0;
  158. return pdL;
  159. }
  160. if(rcF == PAGE_IS_IN_THE_FUTURE) {
  161. (*gaps)++;
  162. return pdF;
  163. }
  164. if(rcL == PAGE_IS_IN_THE_FUTURE) {
  165. (*gaps)++;
  166. return pdL;
  167. }
  168. // impossible case
  169. (*gaps)++;
  170. return NULL;
  171. }
  172. static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengine_instance *ctx,
  173. time_t wanted_start_time_s, time_t wanted_end_time_s,
  174. Pvoid_t *JudyL_page_array, size_t *cache_gaps,
  175. bool open_cache_mode, PDC_PAGE_STATUS tags) {
  176. size_t pages_found_in_cache = 0;
  177. Word_t metric_id = mrg_metric_id(main_mrg, metric);
  178. time_t now_s = wanted_start_time_s;
  179. time_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric);
  180. if(!dt_s)
  181. dt_s = default_rrd_update_every;
  182. time_t previous_page_end_time_s = now_s - dt_s;
  183. bool first = true;
  184. do {
  185. PGC_PAGE *page = pgc_page_get_and_acquire(
  186. cache, (Word_t)ctx, (Word_t)metric_id, now_s,
  187. (first) ? PGC_SEARCH_CLOSEST : PGC_SEARCH_NEXT);
  188. first = false;
  189. if(!page) {
  190. if(previous_page_end_time_s < wanted_end_time_s)
  191. (*cache_gaps)++;
  192. break;
  193. }
  194. time_t page_start_time_s = pgc_page_start_time_s(page);
  195. time_t page_end_time_s = pgc_page_end_time_s(page);
  196. time_t page_update_every_s = pgc_page_update_every_s(page);
  197. size_t page_length = pgc_page_data_size(cache, page);
  198. if(!page_update_every_s)
  199. page_update_every_s = dt_s;
  200. if(is_page_in_time_range(page_start_time_s, page_end_time_s, wanted_start_time_s, wanted_end_time_s) != PAGE_IS_IN_RANGE) {
  201. // not a useful page for this query
  202. pgc_page_release(cache, page);
  203. page = NULL;
  204. if(previous_page_end_time_s < wanted_end_time_s)
  205. (*cache_gaps)++;
  206. break;
  207. }
  208. if (page_start_time_s - previous_page_end_time_s > dt_s)
  209. (*cache_gaps)++;
  210. Pvoid_t *PValue = PDCJudyLIns(JudyL_page_array, (Word_t) page_start_time_s, PJE0);
  211. if (!PValue || PValue == PJERR)
  212. fatal("DBENGINE: corrupted judy array in %s()", __FUNCTION__ );
  213. if (unlikely(*PValue)) {
  214. struct page_details *pd = *PValue;
  215. UNUSED(pd);
  216. // internal_error(
  217. // pd->first_time_s != page_first_time_s ||
  218. // pd->last_time_s != page_last_time_s ||
  219. // pd->update_every_s != page_update_every_s,
  220. // "DBENGINE: duplicate page with different retention in %s cache "
  221. // "1st: %ld to %ld, ue %u, size %u "
  222. // "2nd: %ld to %ld, ue %ld size %zu "
  223. // "- ignoring the second",
  224. // cache == open_cache ? "open" : "main",
  225. // pd->first_time_s, pd->last_time_s, pd->update_every_s, pd->page_length,
  226. // page_first_time_s, page_last_time_s, page_update_every_s, page_length);
  227. pgc_page_release(cache, page);
  228. }
  229. else {
  230. internal_fatal(pgc_page_metric(page) != metric_id, "Wrong metric id in page found in cache");
  231. internal_fatal(pgc_page_section(page) != (Word_t)ctx, "Wrong section in page found in cache");
  232. struct page_details *pd = page_details_get();
  233. pd->metric_id = metric_id;
  234. pd->first_time_s = page_start_time_s;
  235. pd->last_time_s = page_end_time_s;
  236. pd->page_length = page_length;
  237. pd->update_every_s = (uint32_t) page_update_every_s;
  238. pd->page = (open_cache_mode) ? NULL : page;
  239. pd->status |= tags;
  240. if((pd->page)) {
  241. pd->status |= PDC_PAGE_READY | PDC_PAGE_PRELOADED;
  242. if(pgc_page_data(page) == DBENGINE_EMPTY_PAGE)
  243. pd->status |= PDC_PAGE_EMPTY;
  244. }
  245. if(open_cache_mode) {
  246. struct rrdengine_datafile *datafile = pgc_page_data(page);
  247. if(datafile_acquire(datafile, DATAFILE_ACQUIRE_PAGE_DETAILS)) { // for pd
  248. struct extent_io_data *xio = (struct extent_io_data *) pgc_page_custom_data(cache, page);
  249. pd->datafile.ptr = pgc_page_data(page);
  250. pd->datafile.file = xio->file;
  251. pd->datafile.extent.pos = xio->pos;
  252. pd->datafile.extent.bytes = xio->bytes;
  253. pd->datafile.fileno = pd->datafile.ptr->fileno;
  254. pd->status |= PDC_PAGE_DATAFILE_ACQUIRED | PDC_PAGE_DISK_PENDING;
  255. }
  256. else {
  257. pd->status |= PDC_PAGE_FAILED | PDC_PAGE_FAILED_TO_ACQUIRE_DATAFILE;
  258. }
  259. pgc_page_release(cache, page);
  260. }
  261. *PValue = pd;
  262. pages_found_in_cache++;
  263. }
  264. // prepare for the next iteration
  265. previous_page_end_time_s = page_end_time_s;
  266. if(page_update_every_s > 0)
  267. dt_s = page_update_every_s;
  268. // we are going to as for the NEXT page
  269. // so, set this to our first time
  270. now_s = page_start_time_s;
  271. } while(now_s <= wanted_end_time_s);
  272. return pages_found_in_cache;
  273. }
  274. static void pgc_inject_gap(struct rrdengine_instance *ctx, METRIC *metric, time_t start_time_s, time_t end_time_s) {
  275. time_t db_first_time_s, db_last_time_s, db_update_every_s;
  276. mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s);
  277. if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) != PAGE_IS_IN_RANGE)
  278. return;
  279. PGC_ENTRY page_entry = {
  280. .hot = false,
  281. .section = (Word_t)ctx,
  282. .metric_id = (Word_t)metric,
  283. .start_time_s = MAX(start_time_s, db_first_time_s),
  284. .end_time_s = MIN(end_time_s, db_last_time_s),
  285. .update_every_s = 0,
  286. .size = 0,
  287. .data = DBENGINE_EMPTY_PAGE,
  288. };
  289. if(page_entry.start_time_s >= page_entry.end_time_s)
  290. return;
  291. PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, NULL);
  292. pgc_page_release(main_cache, page);
  293. }
  294. static size_t list_has_time_gaps(
  295. struct rrdengine_instance *ctx,
  296. METRIC *metric,
  297. Pvoid_t JudyL_page_array,
  298. time_t wanted_start_time_s,
  299. time_t wanted_end_time_s,
  300. size_t *pages_total,
  301. size_t *pages_found_pass4,
  302. size_t *pages_to_load_from_disk,
  303. size_t *pages_overlapping,
  304. time_t *optimal_end_time_s,
  305. bool populate_gaps,
  306. PDC_PAGE_STATUS *common_status
  307. ) {
  308. // we will recalculate these, so zero them
  309. *pages_to_load_from_disk = 0;
  310. *pages_overlapping = 0;
  311. *optimal_end_time_s = 0;
  312. *common_status = 0;
  313. bool first;
  314. Pvoid_t *PValue;
  315. Word_t this_page_start_time;
  316. struct page_details *pd;
  317. size_t gaps = 0;
  318. Word_t metric_id = mrg_metric_id(main_mrg, metric);
  319. // ------------------------------------------------------------------------
  320. // PASS 1: remove the preprocessing flags from the pages in PDC
  321. first = true;
  322. this_page_start_time = 0;
  323. while((PValue = PDCJudyLFirstThenNext(JudyL_page_array, &this_page_start_time, &first))) {
  324. pd = *PValue;
  325. pd->status &= ~(PDC_PAGE_SKIP|PDC_PAGE_PREPROCESSED);
  326. }
  327. // ------------------------------------------------------------------------
  328. // PASS 2: emulate processing to find the useful pages
  329. time_t now_s = wanted_start_time_s;
  330. time_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric);
  331. if(!dt_s)
  332. dt_s = default_rrd_update_every;
  333. size_t pages_pass2 = 0, pages_pass3 = 0;
  334. while((pd = pdc_find_page_for_time(
  335. JudyL_page_array, now_s, &gaps,
  336. PDC_PAGE_PREPROCESSED, 0))) {
  337. pd->status |= PDC_PAGE_PREPROCESSED;
  338. pages_pass2++;
  339. if(pd->update_every_s)
  340. dt_s = pd->update_every_s;
  341. if(populate_gaps && pd->first_time_s > now_s)
  342. pgc_inject_gap(ctx, metric, now_s, pd->first_time_s);
  343. now_s = pd->last_time_s + dt_s;
  344. if(now_s > wanted_end_time_s) {
  345. *optimal_end_time_s = pd->last_time_s;
  346. break;
  347. }
  348. }
  349. if(populate_gaps && now_s < wanted_end_time_s)
  350. pgc_inject_gap(ctx, metric, now_s, wanted_end_time_s);
  351. // ------------------------------------------------------------------------
  352. // PASS 3: mark as skipped all the pages not useful
  353. first = true;
  354. this_page_start_time = 0;
  355. while((PValue = PDCJudyLFirstThenNext(JudyL_page_array, &this_page_start_time, &first))) {
  356. pd = *PValue;
  357. internal_fatal(pd->metric_id != metric_id, "pd has wrong metric_id");
  358. if(!(pd->status & PDC_PAGE_PREPROCESSED)) {
  359. (*pages_overlapping)++;
  360. pd->status |= PDC_PAGE_SKIP;
  361. pd->status &= ~(PDC_PAGE_READY | PDC_PAGE_DISK_PENDING);
  362. *common_status |= pd->status;
  363. continue;
  364. }
  365. pages_pass3++;
  366. if(!pd->page) {
  367. pd->page = pgc_page_get_and_acquire(main_cache, (Word_t) ctx, (Word_t) metric_id, pd->first_time_s, PGC_SEARCH_EXACT);
  368. if(pd->page) {
  369. (*pages_found_pass4)++;
  370. pd->status &= ~PDC_PAGE_DISK_PENDING;
  371. pd->status |= PDC_PAGE_READY | PDC_PAGE_PRELOADED | PDC_PAGE_PRELOADED_PASS4;
  372. if(pgc_page_data(pd->page) == DBENGINE_EMPTY_PAGE)
  373. pd->status |= PDC_PAGE_EMPTY;
  374. }
  375. else if(!(pd->status & PDC_PAGE_FAILED) && (pd->status & PDC_PAGE_DATAFILE_ACQUIRED)) {
  376. (*pages_to_load_from_disk)++;
  377. pd->status |= PDC_PAGE_DISK_PENDING;
  378. internal_fatal(pd->status & PDC_PAGE_SKIP, "page is disk pending and skipped");
  379. internal_fatal(!pd->datafile.ptr, "datafile is NULL");
  380. internal_fatal(!pd->datafile.extent.bytes, "datafile.extent.bytes zero");
  381. internal_fatal(!pd->datafile.extent.pos, "datafile.extent.pos is zero");
  382. internal_fatal(!pd->datafile.fileno, "datafile.fileno is zero");
  383. }
  384. }
  385. else {
  386. pd->status &= ~PDC_PAGE_DISK_PENDING;
  387. pd->status |= (PDC_PAGE_READY | PDC_PAGE_PRELOADED);
  388. }
  389. *common_status |= pd->status;
  390. }
  391. internal_fatal(pages_pass2 != pages_pass3,
  392. "DBENGINE: page count does not match");
  393. *pages_total = pages_pass2;
  394. return gaps;
  395. }
  396. // ----------------------------------------------------------------------------
  397. typedef void (*page_found_callback_t)(PGC_PAGE *page, void *data);
  398. static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METRIC *metric, usec_t start_time_ut, usec_t end_time_ut, page_found_callback_t callback, void *callback_data) {
  399. uuid_t *uuid = mrg_metric_uuid(main_mrg, metric);
  400. Word_t metric_id = mrg_metric_id(main_mrg, metric);
  401. time_t wanted_start_time_s = (time_t)(start_time_ut / USEC_PER_SEC);
  402. time_t wanted_end_time_s = (time_t)(end_time_ut / USEC_PER_SEC);
  403. size_t pages_found = 0;
  404. NJFV2IDX_FIND_STATE state = {
  405. .init = false,
  406. .last = 0,
  407. .ctx = ctx,
  408. .wanted_start_time_s = wanted_start_time_s,
  409. .wanted_end_time_s = wanted_end_time_s,
  410. .j2_header_acquired = NULL,
  411. };
  412. struct rrdengine_datafile *datafile;
  413. while((datafile = njfv2idx_find_and_acquire_j2_header(&state))) {
  414. struct journal_v2_header *j2_header = state.j2_header_acquired;
  415. if (unlikely(!j2_header))
  416. continue;
  417. time_t journal_start_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC);
  418. // the datafile possibly contains useful data for this query
  419. size_t journal_metric_count = (size_t)j2_header->metric_count;
  420. struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
  421. struct journal_metric_list *uuid_entry = bsearch(uuid,uuid_list,journal_metric_count,sizeof(*uuid_list), journal_metric_uuid_compare);
  422. if (unlikely(!uuid_entry)) {
  423. // our UUID is not in this datafile
  424. journalfile_v2_data_release(datafile->journalfile);
  425. continue;
  426. }
  427. struct journal_page_header *page_list_header = (struct journal_page_header *) ((uint8_t *) j2_header + uuid_entry->page_offset);
  428. struct journal_page_list *page_list = (struct journal_page_list *)((uint8_t *) page_list_header + sizeof(*page_list_header));
  429. struct journal_extent_list *extent_list = (void *)((uint8_t *)j2_header + j2_header->extent_offset);
  430. uint32_t uuid_page_entries = page_list_header->entries;
  431. for (uint32_t index = 0; index < uuid_page_entries; index++) {
  432. struct journal_page_list *page_entry_in_journal = &page_list[index];
  433. time_t page_first_time_s = page_entry_in_journal->delta_start_s + journal_start_time_s;
  434. time_t page_last_time_s = page_entry_in_journal->delta_end_s + journal_start_time_s;
  435. TIME_RANGE_COMPARE prc = is_page_in_time_range(page_first_time_s, page_last_time_s, wanted_start_time_s, wanted_end_time_s);
  436. if(prc == PAGE_IS_IN_THE_PAST)
  437. continue;
  438. if(prc == PAGE_IS_IN_THE_FUTURE)
  439. break;
  440. time_t page_update_every_s = page_entry_in_journal->update_every_s;
  441. size_t page_length = page_entry_in_journal->page_length;
  442. if(datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) { //for open cache item
  443. // add this page to open cache
  444. bool added = false;
  445. struct extent_io_data ei = {
  446. .pos = extent_list[page_entry_in_journal->extent_index].datafile_offset,
  447. .bytes = extent_list[page_entry_in_journal->extent_index].datafile_size,
  448. .page_length = page_length,
  449. .file = datafile->file,
  450. .fileno = datafile->fileno,
  451. };
  452. PGC_PAGE *page = pgc_page_add_and_acquire(open_cache, (PGC_ENTRY) {
  453. .hot = false,
  454. .section = (Word_t) ctx,
  455. .metric_id = metric_id,
  456. .start_time_s = page_first_time_s,
  457. .end_time_s = page_last_time_s,
  458. .update_every_s = (uint32_t) page_update_every_s,
  459. .data = datafile,
  460. .size = 0,
  461. .custom_data = (uint8_t *) &ei,
  462. }, &added);
  463. if(!added)
  464. datafile_release(datafile, DATAFILE_ACQUIRE_OPEN_CACHE);
  465. callback(page, callback_data);
  466. pgc_page_release(open_cache, page);
  467. pages_found++;
  468. }
  469. }
  470. journalfile_v2_data_release(datafile->journalfile);
  471. }
  472. return pages_found;
  473. }
  474. void add_page_details_from_journal_v2(PGC_PAGE *page, void *JudyL_pptr) {
  475. struct rrdengine_datafile *datafile = pgc_page_data(page);
  476. if(!datafile_acquire(datafile, DATAFILE_ACQUIRE_PAGE_DETAILS)) // for pd
  477. return;
  478. Pvoid_t *PValue = PDCJudyLIns(JudyL_pptr, pgc_page_start_time_s(page), PJE0);
  479. if (!PValue || PValue == PJERR)
  480. fatal("DBENGINE: corrupted judy array");
  481. if (unlikely(*PValue)) {
  482. datafile_release(datafile, DATAFILE_ACQUIRE_PAGE_DETAILS);
  483. return;
  484. }
  485. Word_t metric_id = pgc_page_metric(page);
  486. // let's add it to the judy
  487. struct extent_io_data *ei = pgc_page_custom_data(open_cache, page);
  488. struct page_details *pd = page_details_get();
  489. *PValue = pd;
  490. pd->datafile.extent.pos = ei->pos;
  491. pd->datafile.extent.bytes = ei->bytes;
  492. pd->datafile.file = ei->file;
  493. pd->datafile.fileno = ei->fileno;
  494. pd->first_time_s = pgc_page_start_time_s(page);
  495. pd->last_time_s = pgc_page_end_time_s(page);
  496. pd->datafile.ptr = datafile;
  497. pd->page_length = ei->page_length;
  498. pd->update_every_s = (uint32_t) pgc_page_update_every_s(page);
  499. pd->metric_id = metric_id;
  500. pd->status |= PDC_PAGE_DISK_PENDING | PDC_PAGE_SOURCE_JOURNAL_V2 | PDC_PAGE_DATAFILE_ACQUIRED;
  501. }
// get_page_list() returns a JudyL with all the pages of a metric that overlap
// [start_time_ut, end_time_ut], keyed by page start time (in seconds);
// each value of the judy is a struct page_details * for that page.
// DBENGINE2:
  505. #define time_delta(finish, pass) do { if(pass) { usec_t t = pass; (pass) = (finish) - (pass); (finish) = t; } } while(0)
  506. static Pvoid_t get_page_list(
  507. struct rrdengine_instance *ctx,
  508. METRIC *metric,
  509. usec_t start_time_ut,
  510. usec_t end_time_ut,
  511. time_t *optimal_end_time_s,
  512. size_t *pages_to_load_from_disk,
  513. PDC_PAGE_STATUS *common_status
  514. ) {
  515. *optimal_end_time_s = 0;
  516. *pages_to_load_from_disk = 0;
  517. *common_status = 0;
  518. Pvoid_t JudyL_page_array = (Pvoid_t) NULL;
  519. time_t wanted_start_time_s = (time_t)(start_time_ut / USEC_PER_SEC);
  520. time_t wanted_end_time_s = (time_t)(end_time_ut / USEC_PER_SEC);
  521. size_t pages_found_in_main_cache = 0,
  522. pages_found_in_open_cache = 0,
  523. pages_found_in_journals_v2 = 0,
  524. pages_found_pass4 = 0,
  525. pages_overlapping = 0,
  526. pages_total = 0;
  527. size_t cache_gaps = 0, query_gaps = 0;
  528. bool done_v2 = false, done_open = false;
  529. usec_t pass1_ut = 0, pass2_ut = 0, pass3_ut = 0, pass4_ut = 0, finish_ut = 0;
  530. // --------------------------------------------------------------
  531. // PASS 1: Check what the main page cache has available
  532. pass1_ut = now_monotonic_usec();
  533. size_t pages_pass1 = get_page_list_from_pgc(main_cache, metric, ctx, wanted_start_time_s, wanted_end_time_s,
  534. &JudyL_page_array, &cache_gaps,
  535. false, PDC_PAGE_SOURCE_MAIN_CACHE);
  536. query_gaps += cache_gaps;
  537. pages_found_in_main_cache += pages_pass1;
  538. pages_total += pages_pass1;
  539. if(pages_found_in_main_cache && !cache_gaps) {
  540. query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s,
  541. &pages_total, &pages_found_pass4, pages_to_load_from_disk, &pages_overlapping,
  542. optimal_end_time_s, false, common_status);
  543. if (pages_total && !query_gaps)
  544. goto we_are_done;
  545. }
  546. // --------------------------------------------------------------
  547. // PASS 2: Check what the open journal page cache has available
  548. // these will be loaded from disk
  549. pass2_ut = now_monotonic_usec();
  550. size_t pages_pass2 = get_page_list_from_pgc(open_cache, metric, ctx, wanted_start_time_s, wanted_end_time_s,
  551. &JudyL_page_array, &cache_gaps,
  552. true, PDC_PAGE_SOURCE_OPEN_CACHE);
  553. query_gaps += cache_gaps;
  554. pages_found_in_open_cache += pages_pass2;
  555. pages_total += pages_pass2;
  556. done_open = true;
  557. if(pages_found_in_open_cache) {
  558. query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s,
  559. &pages_total, &pages_found_pass4, pages_to_load_from_disk, &pages_overlapping,
  560. optimal_end_time_s, false, common_status);
  561. if (pages_total && !query_gaps)
  562. goto we_are_done;
  563. }
  564. // --------------------------------------------------------------
  565. // PASS 3: Check Journal v2 to fill the gaps
  566. pass3_ut = now_monotonic_usec();
  567. size_t pages_pass3 = get_page_list_from_journal_v2(ctx, metric, start_time_ut, end_time_ut,
  568. add_page_details_from_journal_v2, &JudyL_page_array);
  569. pages_found_in_journals_v2 += pages_pass3;
  570. pages_total += pages_pass3;
  571. done_v2 = true;
  572. // --------------------------------------------------------------
  573. // PASS 4: Check the cache again
  574. // and calculate the time gaps in the query
  575. // THIS IS REQUIRED AFTER JOURNAL V2 LOOKUP
  576. pass4_ut = now_monotonic_usec();
  577. query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s,
  578. &pages_total, &pages_found_pass4, pages_to_load_from_disk, &pages_overlapping,
  579. optimal_end_time_s, true, common_status);
  580. we_are_done:
  581. finish_ut = now_monotonic_usec();
  582. time_delta(finish_ut, pass4_ut);
  583. time_delta(finish_ut, pass3_ut);
  584. time_delta(finish_ut, pass2_ut);
  585. time_delta(finish_ut, pass1_ut);
  586. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_main_cache_lookup, pass1_ut, __ATOMIC_RELAXED);
  587. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_open_cache_lookup, pass2_ut, __ATOMIC_RELAXED);
  588. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_journal_v2_lookup, pass3_ut, __ATOMIC_RELAXED);
  589. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_pass4_lookup, pass4_ut, __ATOMIC_RELAXED);
  590. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries, 1, __ATOMIC_RELAXED);
  591. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_planned_with_gaps, (query_gaps) ? 1 : 0, __ATOMIC_RELAXED);
  592. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_open, done_open ? 1 : 0, __ATOMIC_RELAXED);
  593. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_journal_v2, done_v2 ? 1 : 0, __ATOMIC_RELAXED);
  594. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_total, pages_total, __ATOMIC_RELAXED);
  595. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_main_cache, pages_found_in_main_cache, __ATOMIC_RELAXED);
  596. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_open_cache, pages_found_in_open_cache, __ATOMIC_RELAXED);
  597. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_journal_v2, pages_found_in_journals_v2, __ATOMIC_RELAXED);
  598. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache, pages_found_in_main_cache, __ATOMIC_RELAXED);
  599. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache_at_pass4, pages_found_pass4, __ATOMIC_RELAXED);
  600. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_to_load_from_disk, *pages_to_load_from_disk, __ATOMIC_RELAXED);
  601. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_overlapping_skipped, pages_overlapping, __ATOMIC_RELAXED);
  602. return JudyL_page_array;
  603. }
  604. inline void rrdeng_prep_wait(PDC *pdc) {
  605. if (unlikely(pdc && !pdc->prep_done)) {
  606. usec_t started_ut = now_monotonic_usec();
  607. completion_wait_for(&pdc->prep_completion);
  608. pdc->prep_done = true;
  609. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_wait_for_prep, now_monotonic_usec() - started_ut, __ATOMIC_RELAXED);
  610. }
  611. }
  612. void rrdeng_prep_query(struct page_details_control *pdc, bool worker) {
  613. if(worker)
  614. worker_is_busy(UV_EVENT_DBENGINE_QUERY);
  615. pdc->page_list_JudyL = get_page_list(pdc->ctx, pdc->metric,
  616. pdc->start_time_s * USEC_PER_SEC,
  617. pdc->end_time_s * USEC_PER_SEC,
  618. &pdc->optimal_end_time_s,
  619. &pdc->pages_to_load_from_disk,
  620. &pdc->common_status);
  621. internal_fatal(pdc->pages_to_load_from_disk && !(pdc->common_status & PDC_PAGE_DISK_PENDING),
  622. "DBENGINE: PDC reports there are %zu pages to load from disk, "
  623. "but none of the pages has the PDC_PAGE_DISK_PENDING flag",
  624. pdc->pages_to_load_from_disk);
  625. internal_fatal(!pdc->pages_to_load_from_disk && (pdc->common_status & PDC_PAGE_DISK_PENDING),
  626. "DBENGINE: PDC reports there are no pages to load from disk, "
  627. "but one or more pages have the PDC_PAGE_DISK_PENDING flag");
  628. if (pdc->pages_to_load_from_disk && pdc->page_list_JudyL) {
  629. pdc_acquire(pdc); // we get 1 for the 1st worker in the chain: do_read_page_list_work()
  630. usec_t start_ut = now_monotonic_usec();
  631. if(likely(pdc->priority == STORAGE_PRIORITY_SYNCHRONOUS))
  632. pdc_route_synchronously(pdc->ctx, pdc);
  633. else
  634. pdc_route_asynchronously(pdc->ctx, pdc);
  635. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_to_route, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
  636. }
  637. else
  638. completion_mark_complete(&pdc->page_completion);
  639. completion_mark_complete(&pdc->prep_completion);
  640. pdc_release_and_destroy_if_unreferenced(pdc, true, true);
  641. if(worker)
  642. worker_is_idle();
  643. }
  644. /**
  645. * Searches for pages in a time range and triggers disk I/O if necessary and possible.
  646. * @param ctx DB context
  647. * @param handle query handle as initialized
  648. * @param start_time_ut inclusive starting time in usec
  649. * @param end_time_ut inclusive ending time in usec
  650. * @return 1 / 0 (pages found or not found)
  651. */
  652. void pg_cache_preload(struct rrdeng_query_handle *handle) {
  653. if (unlikely(!handle || !handle->metric))
  654. return;
  655. __atomic_add_fetch(&handle->ctx->atomic.inflight_queries, 1, __ATOMIC_RELAXED);
  656. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.currently_running_queries, 1, __ATOMIC_RELAXED);
  657. handle->pdc = pdc_get();
  658. handle->pdc->metric = mrg_metric_dup(main_mrg, handle->metric);
  659. handle->pdc->start_time_s = handle->start_time_s;
  660. handle->pdc->end_time_s = handle->end_time_s;
  661. handle->pdc->priority = handle->priority;
  662. handle->pdc->optimal_end_time_s = handle->end_time_s;
  663. handle->pdc->ctx = handle->ctx;
  664. handle->pdc->refcount = 1;
  665. spinlock_init(&handle->pdc->refcount_spinlock);
  666. completion_init(&handle->pdc->prep_completion);
  667. completion_init(&handle->pdc->page_completion);
  668. if(ctx_is_available_for_queries(handle->ctx)) {
  669. handle->pdc->refcount++; // we get 1 for the query thread and 1 for the prep thread
  670. if(unlikely(handle->pdc->priority == STORAGE_PRIORITY_SYNCHRONOUS))
  671. rrdeng_prep_query(handle->pdc, false);
  672. else
  673. rrdeng_enq_cmd(handle->ctx, RRDENG_OPCODE_QUERY, handle->pdc, NULL, handle->priority, NULL, NULL);
  674. }
  675. else {
  676. completion_mark_complete(&handle->pdc->prep_completion);
  677. completion_mark_complete(&handle->pdc->page_completion);
  678. }
  679. }
  680. /*
  681. * Searches for the first page between start_time and end_time and gets a reference.
  682. * start_time and end_time are inclusive.
  683. * If index is NULL lookup by UUID (id).
  684. */
  685. struct pgc_page *pg_cache_lookup_next(
  686. struct rrdengine_instance *ctx,
  687. PDC *pdc,
  688. time_t now_s,
  689. time_t last_update_every_s,
  690. size_t *entries
  691. ) {
  692. if (unlikely(!pdc))
  693. return NULL;
  694. rrdeng_prep_wait(pdc);
  695. if (unlikely(!pdc->page_list_JudyL))
  696. return NULL;
  697. usec_t start_ut = now_monotonic_usec();
  698. size_t gaps = 0;
  699. bool waited = false, preloaded;
  700. PGC_PAGE *page = NULL;
  701. while(!page) {
  702. bool page_from_pd = false;
  703. preloaded = false;
  704. struct page_details *pd = pdc_find_page_for_time(
  705. pdc->page_list_JudyL, now_s, &gaps,
  706. PDC_PAGE_PROCESSED, PDC_PAGE_EMPTY);
  707. if (!pd)
  708. break;
  709. page = pd->page;
  710. page_from_pd = true;
  711. preloaded = pdc_page_status_check(pd, PDC_PAGE_PRELOADED);
  712. if(!page) {
  713. if(!completion_is_done(&pdc->page_completion)) {
  714. page = pgc_page_get_and_acquire(main_cache, (Word_t)ctx,
  715. pd->metric_id, pd->first_time_s, PGC_SEARCH_EXACT);
  716. page_from_pd = false;
  717. preloaded = pdc_page_status_check(pd, PDC_PAGE_PRELOADED);
  718. }
  719. if(!page) {
  720. pdc->completed_jobs =
  721. completion_wait_for_a_job(&pdc->page_completion, pdc->completed_jobs);
  722. page = pd->page;
  723. page_from_pd = true;
  724. preloaded = pdc_page_status_check(pd, PDC_PAGE_PRELOADED);
  725. waited = true;
  726. }
  727. }
  728. if(page && pgc_page_data(page) == DBENGINE_EMPTY_PAGE)
  729. pdc_page_status_set(pd, PDC_PAGE_EMPTY);
  730. if(!page || pdc_page_status_check(pd, PDC_PAGE_QUERY_GLOBAL_SKIP_LIST | PDC_PAGE_EMPTY)) {
  731. page = NULL;
  732. continue;
  733. }
  734. // we now have page and is not empty
  735. time_t page_start_time_s = pgc_page_start_time_s(page);
  736. time_t page_end_time_s = pgc_page_end_time_s(page);
  737. time_t page_update_every_s = pgc_page_update_every_s(page);
  738. size_t page_length = pgc_page_data_size(main_cache, page);
  739. if(unlikely(page_start_time_s == INVALID_TIME || page_end_time_s == INVALID_TIME)) {
  740. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_zero_time_skipped, 1, __ATOMIC_RELAXED);
  741. pgc_page_to_clean_evict_or_release(main_cache, page);
  742. pdc_page_status_set(pd, PDC_PAGE_INVALID | PDC_PAGE_RELEASED);
  743. pd->page = page = NULL;
  744. continue;
  745. }
  746. else if(page_length > RRDENG_BLOCK_SIZE) {
  747. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_size_skipped, 1, __ATOMIC_RELAXED);
  748. pgc_page_to_clean_evict_or_release(main_cache, page);
  749. pdc_page_status_set(pd, PDC_PAGE_INVALID | PDC_PAGE_RELEASED);
  750. pd->page = page = NULL;
  751. continue;
  752. }
  753. else {
  754. if (unlikely(page_update_every_s <= 0 || page_update_every_s > 86400)) {
  755. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_update_every_fixed, 1, __ATOMIC_RELAXED);
  756. page_update_every_s = pgc_page_fix_update_every(page, last_update_every_s);
  757. pd->update_every_s = (uint32_t) page_update_every_s;
  758. }
  759. size_t entries_by_size = page_entries_by_size(page_length, CTX_POINT_SIZE_BYTES(ctx));
  760. size_t entries_by_time = page_entries_by_time(page_start_time_s, page_end_time_s, page_update_every_s);
  761. if(unlikely(entries_by_size < entries_by_time)) {
  762. time_t fixed_page_end_time_s = (time_t)(page_start_time_s + (entries_by_size - 1) * page_update_every_s);
  763. pd->last_time_s = page_end_time_s = pgc_page_fix_end_time_s(page, fixed_page_end_time_s);
  764. entries_by_time = (page_end_time_s - (page_start_time_s - page_update_every_s)) / page_update_every_s;
  765. internal_fatal(entries_by_size != entries_by_time, "DBENGINE: wrong entries by time again!");
  766. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_entries_fixed, 1, __ATOMIC_RELAXED);
  767. }
  768. *entries = entries_by_time;
  769. }
  770. if(unlikely(page_end_time_s < now_s)) {
  771. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_past_time_skipped, 1, __ATOMIC_RELAXED);
  772. pgc_page_release(main_cache, page);
  773. pdc_page_status_set(pd, PDC_PAGE_SKIP | PDC_PAGE_RELEASED);
  774. pd->page = page = NULL;
  775. continue;
  776. }
  777. if(page_from_pd)
  778. // PDC_PAGE_RELEASED is for pdc_destroy() to not release the page twice - the caller will release it
  779. pdc_page_status_set(pd, PDC_PAGE_RELEASED | PDC_PAGE_PROCESSED);
  780. else
  781. pdc_page_status_set(pd, PDC_PAGE_PROCESSED);
  782. }
  783. if(gaps && !pdc->executed_with_gaps)
  784. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_executed_with_gaps, 1, __ATOMIC_RELAXED);
  785. pdc->executed_with_gaps = +gaps;
  786. if(page) {
  787. if(waited)
  788. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_wait_loaded, 1, __ATOMIC_RELAXED);
  789. else
  790. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_nowait_loaded, 1, __ATOMIC_RELAXED);
  791. }
  792. else {
  793. if(waited)
  794. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_wait_failed, 1, __ATOMIC_RELAXED);
  795. else
  796. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_nowait_failed, 1, __ATOMIC_RELAXED);
  797. }
  798. if(waited) {
  799. if(preloaded)
  800. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_slow_preload_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
  801. else
  802. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_slow_disk_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
  803. }
  804. else {
  805. if(preloaded)
  806. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_fast_preload_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
  807. else
  808. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_fast_disk_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
  809. }
  810. return page;
  811. }
  812. void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, time_t update_every_s,
  813. struct rrdengine_datafile *datafile, uint64_t extent_offset, unsigned extent_size, uint32_t page_length) {
  814. if(!datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) // for open cache item
  815. fatal("DBENGINE: cannot acquire datafile to put page in open cache");
  816. struct extent_io_data ext_io_data = {
  817. .file = datafile->file,
  818. .fileno = datafile->fileno,
  819. .pos = extent_offset,
  820. .bytes = extent_size,
  821. .page_length = page_length
  822. };
  823. PGC_ENTRY page_entry = {
  824. .hot = true,
  825. .section = section,
  826. .metric_id = metric_id,
  827. .start_time_s = start_time_s,
  828. .end_time_s = end_time_s,
  829. .update_every_s = (uint32_t) update_every_s,
  830. .size = 0,
  831. .data = datafile,
  832. .custom_data = (uint8_t *) &ext_io_data,
  833. };
  834. internal_fatal(!datafile->fileno, "DBENGINE: datafile supplied does not have a number");
  835. bool added = true;
  836. PGC_PAGE *page = pgc_page_add_and_acquire(open_cache, page_entry, &added);
  837. int tries = 100;
  838. while(!added && page_entry.end_time_s > pgc_page_end_time_s(page) && tries--) {
  839. pgc_page_to_clean_evict_or_release(open_cache, page);
  840. page = pgc_page_add_and_acquire(open_cache, page_entry, &added);
  841. }
  842. if(!added) {
  843. datafile_release(datafile, DATAFILE_ACQUIRE_OPEN_CACHE);
  844. internal_fatal(page_entry.end_time_s > pgc_page_end_time_s(page),
  845. "DBENGINE: cannot add longer page to open cache");
  846. }
  847. pgc_page_release(open_cache, (PGC_PAGE *)page);
  848. }
  849. size_t dynamic_open_cache_size(void) {
  850. size_t main_cache_size = pgc_get_wanted_cache_size(main_cache);
  851. size_t target_size = main_cache_size / 100 * 5;
  852. if(target_size < 2 * 1024 * 1024)
  853. target_size = 2 * 1024 * 1024;
  854. return target_size;
  855. }
  856. size_t dynamic_extent_cache_size(void) {
  857. size_t main_cache_size = pgc_get_wanted_cache_size(main_cache);
  858. size_t target_size = main_cache_size / 100 * 5;
  859. if(target_size < 3 * 1024 * 1024)
  860. target_size = 3 * 1024 * 1024;
  861. return target_size;
  862. }
  863. void pgc_and_mrg_initialize(void)
  864. {
  865. main_mrg = mrg_create(0);
  866. size_t target_cache_size = (size_t)default_rrdeng_page_cache_mb * 1024ULL * 1024ULL;
  867. size_t main_cache_size = (target_cache_size / 100) * 95;
  868. size_t open_cache_size = 0;
  869. size_t extent_cache_size = (target_cache_size / 100) * 5;
  870. if(extent_cache_size < 3 * 1024 * 1024) {
  871. extent_cache_size = 3 * 1024 * 1024;
  872. main_cache_size = target_cache_size - extent_cache_size;
  873. }
  874. extent_cache_size += (size_t)(default_rrdeng_extent_cache_mb * 1024ULL * 1024ULL);
  875. main_cache = pgc_create(
  876. "main_cache",
  877. main_cache_size,
  878. main_cache_free_clean_page_callback,
  879. (size_t) rrdeng_pages_per_extent,
  880. main_cache_flush_dirty_page_init_callback,
  881. main_cache_flush_dirty_page_callback,
  882. 10,
  883. 10240, // if there are that many threads, evict so many at once!
  884. 1000, //
  885. 5, // don't delay too much other threads
  886. PGC_OPTIONS_AUTOSCALE, // AUTOSCALE = 2x max hot pages
  887. 0, // 0 = as many as the system cpus
  888. 0
  889. );
  890. open_cache = pgc_create(
  891. "open_cache",
  892. open_cache_size, // the default is 1MB
  893. open_cache_free_clean_page_callback,
  894. 1,
  895. NULL,
  896. open_cache_flush_dirty_page_callback,
  897. 10,
  898. 10240, // if there are that many threads, evict that many at once!
  899. 1000, //
  900. 3, // don't delay too much other threads
  901. PGC_OPTIONS_AUTOSCALE | PGC_OPTIONS_EVICT_PAGES_INLINE | PGC_OPTIONS_FLUSH_PAGES_INLINE,
  902. 0, // 0 = as many as the system cpus
  903. sizeof(struct extent_io_data)
  904. );
  905. pgc_set_dynamic_target_cache_size_callback(open_cache, dynamic_open_cache_size);
  906. extent_cache = pgc_create(
  907. "extent_cache",
  908. extent_cache_size,
  909. extent_cache_free_clean_page_callback,
  910. 1,
  911. NULL,
  912. extent_cache_flush_dirty_page_callback,
  913. 5,
  914. 10, // it will lose up to that extents at once!
  915. 100, //
  916. 2, // don't delay too much other threads
  917. PGC_OPTIONS_AUTOSCALE | PGC_OPTIONS_EVICT_PAGES_INLINE | PGC_OPTIONS_FLUSH_PAGES_INLINE,
  918. 0, // 0 = as many as the system cpus
  919. 0
  920. );
  921. pgc_set_dynamic_target_cache_size_callback(extent_cache, dynamic_extent_cache_size);
  922. }