pagecache.c

// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS

#include "rrdengine.h"

MRG *main_mrg = NULL;
PGC *main_cache = NULL;
PGC *open_cache = NULL;
PGC *extent_cache = NULL;
struct rrdeng_cache_efficiency_stats rrdeng_cache_efficiency_stats = {};

static void main_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused)
{
    // Release storage associated with the page
    dbengine_page_free(entry.data, entry.size);
}

static void main_cache_flush_dirty_page_init_callback(PGC *cache __maybe_unused, Word_t section) {
    struct rrdengine_instance *ctx = (struct rrdengine_instance *) section;

    // mark ctx as having flushing in progress
    __atomic_add_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED);
}
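
// Flush callback for the main cache: converts each dirty page into a page
// descriptor and hands the whole batch to the event loop as a single extent
// write, waiting for the write to complete before returning.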
static void main_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused)
{
    if(!entries)
        return;

    struct rrdengine_instance *ctx = (struct rrdengine_instance *) entries_array[0].section;

    size_t bytes_per_point = CTX_POINT_SIZE_BYTES(ctx);

    struct page_descr_with_data *base = NULL;

    for (size_t Index = 0 ; Index < entries; Index++) {
        time_t start_time_s = entries_array[Index].start_time_s;
        time_t end_time_s = entries_array[Index].end_time_s;
        struct page_descr_with_data *descr = page_descriptor_get();

        descr->id = mrg_metric_uuid(main_mrg, (METRIC *) entries_array[Index].metric_id);
        descr->metric_id = entries_array[Index].metric_id;
        descr->start_time_ut = start_time_s * USEC_PER_SEC;
        descr->end_time_ut = end_time_s * USEC_PER_SEC;
        descr->update_every_s = entries_array[Index].update_every_s;
        descr->type = ctx->config.page_type;

        descr->page_length = (end_time_s - (start_time_s - descr->update_every_s)) / descr->update_every_s * bytes_per_point;

        if(descr->page_length > entries_array[Index].size) {
            descr->page_length = entries_array[Index].size;

            error_limit_static_global_var(erl, 1, 0);
            error_limit(&erl, "DBENGINE: page exceeds the maximum size, adjusting it to max.");
        }

        descr->page = pgc_page_data(pages_array[Index]);
        DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(base, descr, link.prev, link.next);

        internal_fatal(descr->page_length > RRDENG_BLOCK_SIZE, "DBENGINE: faulty page length calculation");
    }

    struct completion completion;
    completion_init(&completion);
    rrdeng_enq_cmd(ctx, RRDENG_OPCODE_EXTENT_WRITE, base, &completion, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
    completion_wait_for(&completion);
    completion_destroy(&completion);
}
static void open_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused)
{
    struct rrdengine_datafile *datafile = entry.data;
    datafile_release(datafile, DATAFILE_ACQUIRE_OPEN_CACHE);
}

static void open_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused)
{
    ;
}

static void extent_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused)
{
    dbengine_extent_free(entry.data, entry.size);
}

static void extent_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused)
{
    ;
}

inline TIME_RANGE_COMPARE is_page_in_time_range(time_t page_first_time_s, time_t page_last_time_s, time_t wanted_start_time_s, time_t wanted_end_time_s) {
    // page_first_time_s <= wanted_end_time_s && page_last_time_s >= wanted_start_time_s

    if(page_last_time_s < wanted_start_time_s)
        return PAGE_IS_IN_THE_PAST;

    if(page_first_time_s > wanted_end_time_s)
        return PAGE_IS_IN_THE_FUTURE;

    return PAGE_IS_IN_RANGE;
}

static int journal_metric_uuid_compare(const void *key, const void *metric)
{
    return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid);
}
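
// Finds the best page covering wanted_time_s in a JudyL of page_details:
// it scans forward for the first candidate that is neither skipped nor already
// marked with 'mode', and backward for the last acceptable one, then chooses
// between the two (preferring the page in range, and on ties the higher
// resolution or earlier start). Increments *gaps when no page covers the
// wanted time.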
static inline struct page_details *pdc_find_page_for_time(
        Pcvoid_t PArray,
        time_t wanted_time_s,
        size_t *gaps,
        PDC_PAGE_STATUS mode,
        PDC_PAGE_STATUS skip_list
) {
    Word_t PIndexF = wanted_time_s, PIndexL = wanted_time_s;
    Pvoid_t *PValueF, *PValueL;
    struct page_details *pdF = NULL, *pdL = NULL;
    bool firstF = true, firstL = true;

    PDC_PAGE_STATUS ignore_list = PDC_PAGE_QUERY_GLOBAL_SKIP_LIST | skip_list;

    while ((PValueF = PDCJudyLFirstThenNext(PArray, &PIndexF, &firstF))) {
        pdF = *PValueF;

        PDC_PAGE_STATUS status = __atomic_load_n(&pdF->status, __ATOMIC_ACQUIRE);
        if (!(status & (ignore_list | mode)))
            break;

        pdF = NULL;
    }

    while ((PValueL = PDCJudyLLastThenPrev(PArray, &PIndexL, &firstL))) {
        pdL = *PValueL;

        PDC_PAGE_STATUS status = __atomic_load_n(&pdL->status, __ATOMIC_ACQUIRE);
        if(status & mode) {
            // don't go all the way back to the beginning
            // stop at the last processed
            pdL = NULL;
            break;
        }

        if (!(status & ignore_list))
            break;

        pdL = NULL;
    }

    TIME_RANGE_COMPARE rcF = (pdF) ? is_page_in_time_range(pdF->first_time_s, pdF->last_time_s, wanted_time_s, wanted_time_s) : PAGE_IS_IN_THE_FUTURE;
    TIME_RANGE_COMPARE rcL = (pdL) ? is_page_in_time_range(pdL->first_time_s, pdL->last_time_s, wanted_time_s, wanted_time_s) : PAGE_IS_IN_THE_PAST;

    if (!pdF || pdF == pdL) {
        // F is missing, or they are the same
        // return L
        (*gaps) += (rcL == PAGE_IS_IN_RANGE) ? 0 : 1;
        return pdL;
    }

    if (!pdL) {
        // L is missing
        // return F
        (*gaps) += (rcF == PAGE_IS_IN_RANGE) ? 0 : 1;
        return pdF;
    }

    if (rcF == rcL) {
        // both are on the same side,
        // but they are different pages

        switch (rcF) {
            case PAGE_IS_IN_RANGE:
                // pick the higher resolution
                if (pdF->update_every_s && pdF->update_every_s < pdL->update_every_s)
                    return pdF;

                if (pdL->update_every_s && pdL->update_every_s < pdF->update_every_s)
                    return pdL;

                // same resolution - pick the one that starts earlier
                if (pdL->first_time_s < pdF->first_time_s)
                    return pdL;

                return pdF;
                break;

            case PAGE_IS_IN_THE_FUTURE:
                (*gaps)++;

                // pick the one that starts earlier
                if (pdL->first_time_s < pdF->first_time_s)
                    return pdL;

                return pdF;
                break;

            default:
            case PAGE_IS_IN_THE_PAST:
                (*gaps)++;
                return NULL;
                break;
        }
    }

    if(rcF == PAGE_IS_IN_RANGE) {
        // (*gaps) += 0;
        return pdF;
    }

    if(rcL == PAGE_IS_IN_RANGE) {
        // (*gaps) += 0;
        return pdL;
    }

    if(rcF == PAGE_IS_IN_THE_FUTURE) {
        (*gaps)++;
        return pdF;
    }

    if(rcL == PAGE_IS_IN_THE_FUTURE) {
        (*gaps)++;
        return pdL;
    }

    // impossible case
    (*gaps)++;
    return NULL;
}
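
// Walks a page cache (main or open) for all pages of a metric that overlap the
// wanted time range, inserting one page_details entry per page into the JudyL
// array, keyed by page start time. In open cache mode the datafile is acquired
// and the extent location is recorded instead of keeping a page reference.
// Increments *cache_gaps for every hole found between consecutive pages.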
static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengine_instance *ctx,
                                     time_t wanted_start_time_s, time_t wanted_end_time_s,
                                     Pvoid_t *JudyL_page_array, size_t *cache_gaps,
                                     bool open_cache_mode, PDC_PAGE_STATUS tags) {

    size_t pages_found_in_cache = 0;
    Word_t metric_id = mrg_metric_id(main_mrg, metric);

    time_t now_s = wanted_start_time_s;
    time_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric);

    if(!dt_s)
        dt_s = default_rrd_update_every;

    time_t previous_page_end_time_s = now_s - dt_s;
    bool first = true;

    do {
        PGC_PAGE *page = pgc_page_get_and_acquire(
                cache, (Word_t)ctx, (Word_t)metric_id, now_s,
                (first) ? PGC_SEARCH_CLOSEST : PGC_SEARCH_NEXT);
        first = false;

        if(!page) {
            if(previous_page_end_time_s < wanted_end_time_s)
                (*cache_gaps)++;

            break;
        }

        time_t page_start_time_s = pgc_page_start_time_s(page);
        time_t page_end_time_s = pgc_page_end_time_s(page);
        time_t page_update_every_s = pgc_page_update_every_s(page);
        size_t page_length = pgc_page_data_size(cache, page);

        if(!page_update_every_s)
            page_update_every_s = dt_s;

        if(is_page_in_time_range(page_start_time_s, page_end_time_s, wanted_start_time_s, wanted_end_time_s) != PAGE_IS_IN_RANGE) {
            // not a useful page for this query
            pgc_page_release(cache, page);
            page = NULL;

            if(previous_page_end_time_s < wanted_end_time_s)
                (*cache_gaps)++;

            break;
        }

        if (page_start_time_s - previous_page_end_time_s > dt_s)
            (*cache_gaps)++;

        Pvoid_t *PValue = PDCJudyLIns(JudyL_page_array, (Word_t) page_start_time_s, PJE0);
        if (!PValue || PValue == PJERR)
            fatal("DBENGINE: corrupted judy array in %s()", __FUNCTION__);

        if (unlikely(*PValue)) {
            struct page_details *pd = *PValue;
            UNUSED(pd);

//            internal_error(
//                    pd->first_time_s != page_first_time_s ||
//                    pd->last_time_s != page_last_time_s ||
//                    pd->update_every_s != page_update_every_s,
//                    "DBENGINE: duplicate page with different retention in %s cache "
//                    "1st: %ld to %ld, ue %u, size %u "
//                    "2nd: %ld to %ld, ue %ld size %zu "
//                    "- ignoring the second",
//                    cache == open_cache ? "open" : "main",
//                    pd->first_time_s, pd->last_time_s, pd->update_every_s, pd->page_length,
//                    page_first_time_s, page_last_time_s, page_update_every_s, page_length);

            pgc_page_release(cache, page);
        }
        else {
            internal_fatal(pgc_page_metric(page) != metric_id, "Wrong metric id in page found in cache");
            internal_fatal(pgc_page_section(page) != (Word_t)ctx, "Wrong section in page found in cache");

            struct page_details *pd = page_details_get();
            pd->metric_id = metric_id;
            pd->first_time_s = page_start_time_s;
            pd->last_time_s = page_end_time_s;
            pd->page_length = page_length;
            pd->update_every_s = page_update_every_s;
            pd->page = (open_cache_mode) ? NULL : page;
            pd->status |= tags;

            if((pd->page)) {
                pd->status |= PDC_PAGE_READY | PDC_PAGE_PRELOADED;

                if(pgc_page_data(page) == DBENGINE_EMPTY_PAGE)
                    pd->status |= PDC_PAGE_EMPTY;
            }

            if(open_cache_mode) {
                struct rrdengine_datafile *datafile = pgc_page_data(page);
                if(datafile_acquire(datafile, DATAFILE_ACQUIRE_PAGE_DETAILS)) { // for pd
                    struct extent_io_data *xio = (struct extent_io_data *) pgc_page_custom_data(cache, page);
                    pd->datafile.ptr = pgc_page_data(page);
                    pd->datafile.file = xio->file;
                    pd->datafile.extent.pos = xio->pos;
                    pd->datafile.extent.bytes = xio->bytes;
                    pd->datafile.fileno = pd->datafile.ptr->fileno;
                    pd->status |= PDC_PAGE_DATAFILE_ACQUIRED | PDC_PAGE_DISK_PENDING;
                }
                else {
                    pd->status |= PDC_PAGE_FAILED | PDC_PAGE_FAILED_TO_ACQUIRE_DATAFILE;
                }
                pgc_page_release(cache, page);
            }

            *PValue = pd;

            pages_found_in_cache++;
        }

        // prepare for the next iteration
        previous_page_end_time_s = page_end_time_s;

        if(page_update_every_s > 0)
            dt_s = page_update_every_s;
        // we are going to ask for the NEXT page
        // so, set this to our first time
        now_s = page_start_time_s;

    } while(now_s <= wanted_end_time_s);

    return pages_found_in_cache;
}
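
// Registers a known gap in retention by adding a DBENGINE_EMPTY_PAGE entry to
// the main cache, clamped to the metric's retention, marking the missing range
// as known-empty for subsequent lookups.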
static void pgc_inject_gap(struct rrdengine_instance *ctx, METRIC *metric, time_t start_time_s, time_t end_time_s) {

    time_t db_first_time_s, db_last_time_s, db_update_every_s;
    mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s);

    if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) != PAGE_IS_IN_RANGE)
        return;

    PGC_ENTRY page_entry = {
            .hot = false,
            .section = (Word_t)ctx,
            .metric_id = (Word_t)metric,
            .start_time_s = MAX(start_time_s, db_first_time_s),
            .end_time_s = MIN(end_time_s, db_last_time_s),
            .update_every_s = 0,
            .size = 0,
            .data = DBENGINE_EMPTY_PAGE,
    };

    if(page_entry.start_time_s >= page_entry.end_time_s)
        return;

    PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, NULL);
    pgc_page_release(main_cache, page);
}
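
// Evaluates the collected page list against the wanted time range in three
// passes: it clears the preprocessing flags, emulates the query to keep only
// the useful pages (optionally injecting gap markers), and finally marks the
// rest as skipped while re-checking the main cache for pages that arrived in
// the meantime. Returns the number of gaps detected.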
static size_t list_has_time_gaps(
        struct rrdengine_instance *ctx,
        METRIC *metric,
        Pvoid_t JudyL_page_array,
        time_t wanted_start_time_s,
        time_t wanted_end_time_s,
        size_t *pages_total,
        size_t *pages_found_pass4,
        size_t *pages_pending,
        size_t *pages_overlapping,
        time_t *optimal_end_time_s,
        bool populate_gaps
) {
    // we will recalculate these, so zero them
    *pages_pending = 0;
    *pages_overlapping = 0;
    *optimal_end_time_s = 0;

    bool first;
    Pvoid_t *PValue;
    Word_t this_page_start_time;
    struct page_details *pd;

    size_t gaps = 0;
    Word_t metric_id = mrg_metric_id(main_mrg, metric);

    // ------------------------------------------------------------------------
    // PASS 1: remove the preprocessing flags from the pages in PDC

    first = true;
    this_page_start_time = 0;
    while((PValue = PDCJudyLFirstThenNext(JudyL_page_array, &this_page_start_time, &first))) {
        pd = *PValue;
        pd->status &= ~(PDC_PAGE_SKIP|PDC_PAGE_PREPROCESSED);
    }

    // ------------------------------------------------------------------------
    // PASS 2: emulate processing to find the useful pages

    time_t now_s = wanted_start_time_s;
    time_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric);
    if(!dt_s)
        dt_s = default_rrd_update_every;

    size_t pages_pass2 = 0, pages_pass3 = 0;
    while((pd = pdc_find_page_for_time(
            JudyL_page_array, now_s, &gaps,
            PDC_PAGE_PREPROCESSED, 0))) {

        pd->status |= PDC_PAGE_PREPROCESSED;
        pages_pass2++;

        if(pd->update_every_s)
            dt_s = pd->update_every_s;

        if(populate_gaps && pd->first_time_s > now_s)
            pgc_inject_gap(ctx, metric, now_s, pd->first_time_s);

        now_s = pd->last_time_s + dt_s;
        if(now_s > wanted_end_time_s) {
            *optimal_end_time_s = pd->last_time_s;
            break;
        }
    }

    if(populate_gaps && now_s < wanted_end_time_s)
        pgc_inject_gap(ctx, metric, now_s, wanted_end_time_s);

    // ------------------------------------------------------------------------
    // PASS 3: mark as skipped all the pages not useful

    first = true;
    this_page_start_time = 0;
    while((PValue = PDCJudyLFirstThenNext(JudyL_page_array, &this_page_start_time, &first))) {
        pd = *PValue;

        internal_fatal(pd->metric_id != metric_id, "pd has wrong metric_id");

        if(!(pd->status & PDC_PAGE_PREPROCESSED)) {
            (*pages_overlapping)++;
            pd->status |= PDC_PAGE_SKIP;
            pd->status &= ~(PDC_PAGE_READY | PDC_PAGE_DISK_PENDING);
            continue;
        }

        pages_pass3++;

        if(!pd->page) {
            pd->page = pgc_page_get_and_acquire(main_cache, (Word_t) ctx, (Word_t) metric_id, pd->first_time_s, PGC_SEARCH_EXACT);

            if(pd->page) {
                (*pages_found_pass4)++;

                pd->status &= ~PDC_PAGE_DISK_PENDING;
                pd->status |= PDC_PAGE_READY | PDC_PAGE_PRELOADED | PDC_PAGE_PRELOADED_PASS4;

                if(pgc_page_data(pd->page) == DBENGINE_EMPTY_PAGE)
                    pd->status |= PDC_PAGE_EMPTY;
            }
            else if(!(pd->status & PDC_PAGE_FAILED) && (pd->status & PDC_PAGE_DATAFILE_ACQUIRED)) {
                (*pages_pending)++;

                pd->status |= PDC_PAGE_DISK_PENDING;

                internal_fatal(pd->status & PDC_PAGE_SKIP, "page is disk pending and skipped");
                internal_fatal(!pd->datafile.ptr, "datafile is NULL");
                internal_fatal(!pd->datafile.extent.bytes, "datafile.extent.bytes zero");
                internal_fatal(!pd->datafile.extent.pos, "datafile.extent.pos is zero");
                internal_fatal(!pd->datafile.fileno, "datafile.fileno is zero");
            }
        }
        else {
            pd->status &= ~PDC_PAGE_DISK_PENDING;
            pd->status |= (PDC_PAGE_READY | PDC_PAGE_PRELOADED);
        }
    }

    internal_fatal(pages_pass2 != pages_pass3,
                   "DBENGINE: page count does not match");

    *pages_total = pages_pass2;

    return gaps;
}

typedef void (*page_found_callback_t)(PGC_PAGE *page, void *data);
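
// Searches the journal v2 index of every datafile for pages of the metric that
// overlap the wanted time range, registering each match in the open cache and
// invoking the callback for it. Returns the number of pages found.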
static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METRIC *metric, usec_t start_time_ut, usec_t end_time_ut, page_found_callback_t callback, void *callback_data) {
    uuid_t *uuid = mrg_metric_uuid(main_mrg, metric);
    Word_t metric_id = mrg_metric_id(main_mrg, metric);

    time_t wanted_start_time_s = (time_t)(start_time_ut / USEC_PER_SEC);
    time_t wanted_end_time_s = (time_t)(end_time_ut / USEC_PER_SEC);

    size_t pages_found = 0;

    uv_rwlock_rdlock(&ctx->datafiles.rwlock);
    struct rrdengine_datafile *datafile;
    for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) {
        struct journal_v2_header *j2_header = journalfile_v2_data_acquire(datafile->journalfile, NULL,
                                                                          wanted_start_time_s,
                                                                          wanted_end_time_s);
        if (unlikely(!j2_header))
            continue;

        time_t journal_start_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC);

        // the datafile possibly contains useful data for this query

        size_t journal_metric_count = (size_t)j2_header->metric_count;
        struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
        struct journal_metric_list *uuid_entry = bsearch(uuid, uuid_list, journal_metric_count, sizeof(*uuid_list), journal_metric_uuid_compare);

        if (unlikely(!uuid_entry)) {
            // our UUID is not in this datafile
            journalfile_v2_data_release(datafile->journalfile);
            continue;
        }

        struct journal_page_header *page_list_header = (struct journal_page_header *) ((uint8_t *) j2_header + uuid_entry->page_offset);
        struct journal_page_list *page_list = (struct journal_page_list *)((uint8_t *) page_list_header + sizeof(*page_list_header));
        struct journal_extent_list *extent_list = (void *)((uint8_t *)j2_header + j2_header->extent_offset);
        uint32_t uuid_page_entries = page_list_header->entries;

        for (uint32_t index = 0; index < uuid_page_entries; index++) {
            struct journal_page_list *page_entry_in_journal = &page_list[index];

            time_t page_first_time_s = page_entry_in_journal->delta_start_s + journal_start_time_s;
            time_t page_last_time_s = page_entry_in_journal->delta_end_s + journal_start_time_s;

            TIME_RANGE_COMPARE prc = is_page_in_time_range(page_first_time_s, page_last_time_s, wanted_start_time_s, wanted_end_time_s);
            if(prc == PAGE_IS_IN_THE_PAST)
                continue;

            if(prc == PAGE_IS_IN_THE_FUTURE)
                break;

            time_t page_update_every_s = page_entry_in_journal->update_every_s;
            size_t page_length = page_entry_in_journal->page_length;

            if(datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) { // for open cache item
                // add this page to open cache
                bool added = false;
                struct extent_io_data ei = {
                        .pos = extent_list[page_entry_in_journal->extent_index].datafile_offset,
                        .bytes = extent_list[page_entry_in_journal->extent_index].datafile_size,
                        .page_length = page_length,
                        .file = datafile->file,
                        .fileno = datafile->fileno,
                };

                PGC_PAGE *page = pgc_page_add_and_acquire(open_cache, (PGC_ENTRY) {
                        .hot = false,
                        .section = (Word_t) ctx,
                        .metric_id = metric_id,
                        .start_time_s = page_first_time_s,
                        .end_time_s = page_last_time_s,
                        .update_every_s = page_update_every_s,
                        .data = datafile,
                        .size = 0,
                        .custom_data = (uint8_t *) &ei,
                }, &added);

                if(!added)
                    datafile_release(datafile, DATAFILE_ACQUIRE_OPEN_CACHE);

                callback(page, callback_data);

                pgc_page_release(open_cache, page);

                pages_found++;
            }
        }

        journalfile_v2_data_release(datafile->journalfile);
    }

    uv_rwlock_rdunlock(&ctx->datafiles.rwlock);

    return pages_found;
}
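
// Callback for get_page_list_from_journal_v2(): converts an open cache page
// into a page_details entry in the query's JudyL array, acquiring the datafile
// for the page details and copying the extent location needed to read the page
// from disk later.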
void add_page_details_from_journal_v2(PGC_PAGE *page, void *JudyL_pptr) {
    struct rrdengine_datafile *datafile = pgc_page_data(page);

    if(!datafile_acquire(datafile, DATAFILE_ACQUIRE_PAGE_DETAILS)) // for pd
        return;

    Pvoid_t *PValue = PDCJudyLIns(JudyL_pptr, pgc_page_start_time_s(page), PJE0);
    if (!PValue || PValue == PJERR)
        fatal("DBENGINE: corrupted judy array");

    if (unlikely(*PValue)) {
        datafile_release(datafile, DATAFILE_ACQUIRE_PAGE_DETAILS);
        return;
    }

    Word_t metric_id = pgc_page_metric(page);

    // let's add it to the judy
    struct extent_io_data *ei = pgc_page_custom_data(open_cache, page);
    struct page_details *pd = page_details_get();
    *PValue = pd;

    pd->datafile.extent.pos = ei->pos;
    pd->datafile.extent.bytes = ei->bytes;
    pd->datafile.file = ei->file;
    pd->datafile.fileno = ei->fileno;
    pd->first_time_s = pgc_page_start_time_s(page);
    pd->last_time_s = pgc_page_end_time_s(page);
    pd->datafile.ptr = datafile;
    pd->page_length = ei->page_length;
    pd->update_every_s = pgc_page_update_every_s(page);
    pd->metric_id = metric_id;
    pd->status |= PDC_PAGE_DISK_PENDING | PDC_PAGE_SOURCE_JOURNAL_V2 | PDC_PAGE_DATAFILE_ACQUIRED;
}
// Returns a JudyL with all the pages that overlap the period from start_time_ut to end_time_ut,
// keyed by page start time. The PValue of each entry points to the page_details of that page.
// DBENGINE2:
#define time_delta(finish, pass) do { if(pass) { usec_t t = pass; (pass) = (finish) - (pass); (finish) = t; } } while(0)
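
// Query preparation runs in up to four passes: the main cache, the open cache,
// the journal v2 indexes, and a final main cache re-check that also computes
// the remaining time gaps. It stops early whenever the pages found so far fully
// cover the wanted time range.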
static Pvoid_t get_page_list(
        struct rrdengine_instance *ctx,
        METRIC *metric,
        usec_t start_time_ut,
        usec_t end_time_ut,
        size_t *pages_to_load,
        time_t *optimal_end_time_s
) {
    *optimal_end_time_s = 0;

    Pvoid_t JudyL_page_array = (Pvoid_t) NULL;

    time_t wanted_start_time_s = (time_t)(start_time_ut / USEC_PER_SEC);
    time_t wanted_end_time_s = (time_t)(end_time_ut / USEC_PER_SEC);

    size_t pages_found_in_main_cache = 0,
           pages_found_in_open_cache = 0,
           pages_found_in_journals_v2 = 0,
           pages_found_pass4 = 0,
           pages_pending = 0,
           pages_overlapping = 0,
           pages_total = 0;

    size_t cache_gaps = 0, query_gaps = 0;
    bool done_v2 = false, done_open = false;

    usec_t pass1_ut = 0, pass2_ut = 0, pass3_ut = 0, pass4_ut = 0;

    // --------------------------------------------------------------
    // PASS 1: Check what the main page cache has available

    pass1_ut = now_monotonic_usec();
    size_t pages_pass1 = get_page_list_from_pgc(main_cache, metric, ctx, wanted_start_time_s, wanted_end_time_s,
                                                &JudyL_page_array, &cache_gaps,
                                                false, PDC_PAGE_SOURCE_MAIN_CACHE);
    query_gaps += cache_gaps;
    pages_found_in_main_cache += pages_pass1;
    pages_total += pages_pass1;

    if(pages_found_in_main_cache && !cache_gaps) {
        query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s,
                                        &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping,
                                        optimal_end_time_s, false);

        if (pages_total && !query_gaps)
            goto we_are_done;
    }

    // --------------------------------------------------------------
    // PASS 2: Check what the open journal page cache has available
    //         these will be loaded from disk

    pass2_ut = now_monotonic_usec();
    size_t pages_pass2 = get_page_list_from_pgc(open_cache, metric, ctx, wanted_start_time_s, wanted_end_time_s,
                                                &JudyL_page_array, &cache_gaps,
                                                true, PDC_PAGE_SOURCE_OPEN_CACHE);
    query_gaps += cache_gaps;
    pages_found_in_open_cache += pages_pass2;
    pages_total += pages_pass2;
    done_open = true;

    if(pages_found_in_open_cache) {
        query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s,
                                        &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping,
                                        optimal_end_time_s, false);

        if (pages_total && !query_gaps)
            goto we_are_done;
    }

    // --------------------------------------------------------------
    // PASS 3: Check Journal v2 to fill the gaps

    pass3_ut = now_monotonic_usec();
    size_t pages_pass3 = get_page_list_from_journal_v2(ctx, metric, start_time_ut, end_time_ut,
                                                       add_page_details_from_journal_v2, &JudyL_page_array);
    pages_found_in_journals_v2 += pages_pass3;
    pages_total += pages_pass3;
    done_v2 = true;

    // --------------------------------------------------------------
    // PASS 4: Check the cache again
    //         and calculate the time gaps in the query
    //         THIS IS REQUIRED AFTER JOURNAL V2 LOOKUP

    pass4_ut = now_monotonic_usec();
    query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s,
                                    &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping,
                                    optimal_end_time_s, true);

we_are_done:

    if(pages_to_load)
        *pages_to_load = pages_pending;

    usec_t finish_ut = now_monotonic_usec();
    time_delta(finish_ut, pass4_ut);
    time_delta(finish_ut, pass3_ut);
    time_delta(finish_ut, pass2_ut);
    time_delta(finish_ut, pass1_ut);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_main_cache_lookup, pass1_ut, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_open_cache_lookup, pass2_ut, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_journal_v2_lookup, pass3_ut, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_pass4_lookup, pass4_ut, __ATOMIC_RELAXED);

    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries, 1, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_planned_with_gaps, (query_gaps) ? 1 : 0, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_open, done_open ? 1 : 0, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_journal_v2, done_v2 ? 1 : 0, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_total, pages_total, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_main_cache, pages_found_in_main_cache, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_open_cache, pages_found_in_open_cache, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_journal_v2, pages_found_in_journals_v2, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache, pages_found_in_main_cache, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache_at_pass4, pages_found_pass4, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_to_load_from_disk, pages_pending, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_overlapping_skipped, pages_overlapping, __ATOMIC_RELAXED);

    return JudyL_page_array;
}
inline void rrdeng_prep_wait(PDC *pdc) {
    if (unlikely(pdc && !pdc->prep_done)) {
        usec_t started_ut = now_monotonic_usec();
        completion_wait_for(&pdc->prep_completion);
        pdc->prep_done = true;
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_wait_for_prep, now_monotonic_usec() - started_ut, __ATOMIC_RELAXED);
    }
}
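
// Runs asynchronously in response to RRDENG_OPCODE_QUERY: builds the page list
// for the PDC and, when pages must be read from disk, routes the read work
// before signalling the prep completion.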
void rrdeng_prep_query(PDC *pdc) {
    size_t pages_to_load = 0;
    pdc->page_list_JudyL = get_page_list(pdc->ctx, pdc->metric,
                                         pdc->start_time_s * USEC_PER_SEC,
                                         pdc->end_time_s * USEC_PER_SEC,
                                         &pages_to_load,
                                         &pdc->optimal_end_time_s);

    if (pages_to_load && pdc->page_list_JudyL) {
        pdc_acquire(pdc); // we get 1 for the 1st worker in the chain: do_read_page_list_work()
        usec_t start_ut = now_monotonic_usec();
//        if(likely(priority == STORAGE_PRIORITY_BEST_EFFORT))
//            dbengine_load_page_list_directly(ctx, handle->pdc);
//        else
        pdc_route_asynchronously(pdc->ctx, pdc);
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_to_route, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
    }
    else
        completion_mark_complete(&pdc->page_completion);

    completion_mark_complete(&pdc->prep_completion);

    pdc_release_and_destroy_if_unreferenced(pdc, true, true);
}
/**
 * Searches for pages in the time range of the query handle and triggers disk I/O
 * if necessary and possible. The prepared page list is stored in handle->pdc and
 * the pages are loaded asynchronously; pg_cache_lookup_next() consumes them.
 * @param handle query handle as initialized (metric, start_time_s, end_time_s, priority)
 */
void pg_cache_preload(struct rrdeng_query_handle *handle) {
    if (unlikely(!handle || !handle->metric))
        return;

    __atomic_add_fetch(&handle->ctx->atomic.inflight_queries, 1, __ATOMIC_RELAXED);
    __atomic_add_fetch(&rrdeng_cache_efficiency_stats.currently_running_queries, 1, __ATOMIC_RELAXED);
    handle->pdc = pdc_get();
    handle->pdc->metric = mrg_metric_dup(main_mrg, handle->metric);
    handle->pdc->start_time_s = handle->start_time_s;
    handle->pdc->end_time_s = handle->end_time_s;
    handle->pdc->priority = handle->priority;
    handle->pdc->optimal_end_time_s = handle->end_time_s;
    handle->pdc->ctx = handle->ctx;
    handle->pdc->refcount = 1;
    netdata_spinlock_init(&handle->pdc->refcount_spinlock);
    completion_init(&handle->pdc->prep_completion);
    completion_init(&handle->pdc->page_completion);

    if(ctx_is_available_for_queries(handle->ctx)) {
        handle->pdc->refcount++; // we get 1 for the query thread and 1 for the prep thread
        rrdeng_enq_cmd(handle->ctx, RRDENG_OPCODE_QUERY, handle->pdc, NULL, handle->priority, NULL, NULL);
    }
    else {
        completion_mark_complete(&handle->pdc->prep_completion);
        completion_mark_complete(&handle->pdc->page_completion);
    }
}
/*
 * Searches for the next page at or after now_s in the prepared page list of the
 * PDC and acquires a reference to it. now_s and the page times are inclusive.
 * Returns NULL when no further page is available for this query.
 */
struct pgc_page *pg_cache_lookup_next(
        struct rrdengine_instance *ctx,
        PDC *pdc,
        time_t now_s,
        time_t last_update_every_s,
        size_t *entries
) {
    if (unlikely(!pdc))
        return NULL;

    rrdeng_prep_wait(pdc);

    if (unlikely(!pdc->page_list_JudyL))
        return NULL;

    usec_t start_ut = now_monotonic_usec();
    size_t gaps = 0;
    bool waited = false, preloaded;
    PGC_PAGE *page = NULL;

    while(!page) {
        bool page_from_pd = false;
        preloaded = false;
        struct page_details *pd = pdc_find_page_for_time(
                pdc->page_list_JudyL, now_s, &gaps,
                PDC_PAGE_PROCESSED, PDC_PAGE_EMPTY);

        if (!pd)
            break;

        page = pd->page;
        page_from_pd = true;
        preloaded = pdc_page_status_check(pd, PDC_PAGE_PRELOADED);
        if(!page) {
            if(!completion_is_done(&pdc->page_completion)) {
                page = pgc_page_get_and_acquire(main_cache, (Word_t)ctx,
                                                pd->metric_id, pd->first_time_s, PGC_SEARCH_EXACT);
                page_from_pd = false;
                preloaded = pdc_page_status_check(pd, PDC_PAGE_PRELOADED);
            }

            if(!page) {
                pdc->completed_jobs =
                        completion_wait_for_a_job(&pdc->page_completion, pdc->completed_jobs);

                page = pd->page;
                page_from_pd = true;
                preloaded = pdc_page_status_check(pd, PDC_PAGE_PRELOADED);
                waited = true;
            }
        }

        if(page && pgc_page_data(page) == DBENGINE_EMPTY_PAGE)
            pdc_page_status_set(pd, PDC_PAGE_EMPTY);

        if(!page || pdc_page_status_check(pd, PDC_PAGE_QUERY_GLOBAL_SKIP_LIST | PDC_PAGE_EMPTY)) {
            page = NULL;
            continue;
        }
        // we now have a page and it is not empty
        time_t page_start_time_s = pgc_page_start_time_s(page);
        time_t page_end_time_s = pgc_page_end_time_s(page);
        time_t page_update_every_s = pgc_page_update_every_s(page);
        size_t page_length = pgc_page_data_size(main_cache, page);

        if(unlikely(page_start_time_s == INVALID_TIME || page_end_time_s == INVALID_TIME)) {
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_zero_time_skipped, 1, __ATOMIC_RELAXED);
            pgc_page_to_clean_evict_or_release(main_cache, page);
            pdc_page_status_set(pd, PDC_PAGE_INVALID | PDC_PAGE_RELEASED);
            pd->page = page = NULL;
            continue;
        }
        else if(page_length > RRDENG_BLOCK_SIZE) {
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_size_skipped, 1, __ATOMIC_RELAXED);
            pgc_page_to_clean_evict_or_release(main_cache, page);
            pdc_page_status_set(pd, PDC_PAGE_INVALID | PDC_PAGE_RELEASED);
            pd->page = page = NULL;
            continue;
        }
        else {
            if (unlikely(page_update_every_s <= 0 || page_update_every_s > 86400)) {
                __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_update_every_fixed, 1, __ATOMIC_RELAXED);
                pd->update_every_s = page_update_every_s = pgc_page_fix_update_every(page, last_update_every_s);
            }

            size_t entries_by_size = page_entries_by_size(page_length, CTX_POINT_SIZE_BYTES(ctx));
            size_t entries_by_time = page_entries_by_time(page_start_time_s, page_end_time_s, page_update_every_s);
            if(unlikely(entries_by_size < entries_by_time)) {
                time_t fixed_page_end_time_s = (time_t)(page_start_time_s + (entries_by_size - 1) * page_update_every_s);
                pd->last_time_s = page_end_time_s = pgc_page_fix_end_time_s(page, fixed_page_end_time_s);
                entries_by_time = (page_end_time_s - (page_start_time_s - page_update_every_s)) / page_update_every_s;

                internal_fatal(entries_by_size != entries_by_time, "DBENGINE: wrong entries by time again!");

                __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_entries_fixed, 1, __ATOMIC_RELAXED);
            }
            *entries = entries_by_time;
        }

        if(unlikely(page_end_time_s < now_s)) {
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_past_time_skipped, 1, __ATOMIC_RELAXED);
            pgc_page_release(main_cache, page);
            pdc_page_status_set(pd, PDC_PAGE_SKIP | PDC_PAGE_RELEASED);
            pd->page = page = NULL;
            continue;
        }

        if(page_from_pd)
            // PDC_PAGE_RELEASED is for pdc_destroy() to not release the page twice - the caller will release it
            pdc_page_status_set(pd, PDC_PAGE_RELEASED | PDC_PAGE_PROCESSED);
        else
            pdc_page_status_set(pd, PDC_PAGE_PROCESSED);
    }

    if(gaps && !pdc->executed_with_gaps)
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_executed_with_gaps, 1, __ATOMIC_RELAXED);
    pdc->executed_with_gaps = +gaps;

    if(page) {
        if(waited)
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_wait_loaded, 1, __ATOMIC_RELAXED);
        else
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_nowait_loaded, 1, __ATOMIC_RELAXED);
    }
    else {
        if(waited)
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_wait_failed, 1, __ATOMIC_RELAXED);
        else
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_nowait_failed, 1, __ATOMIC_RELAXED);
    }

    if(waited) {
        if(preloaded)
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_slow_preload_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
        else
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_slow_disk_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
    }
    else {
        if(preloaded)
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_fast_preload_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
        else
            __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_fast_disk_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
    }

    return page;
}
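
// Adds a hot page to the open cache, pointing at the extent of the datafile
// that contains it. If a conflicting shorter page already exists, it is evicted
// and the insertion is retried a bounded number of times.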
void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, time_t update_every_s,
                           struct rrdengine_datafile *datafile, uint64_t extent_offset, unsigned extent_size, uint32_t page_length) {

    if(!datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) // for open cache item
        fatal("DBENGINE: cannot acquire datafile to put page in open cache");

    struct extent_io_data ext_io_data = {
            .file = datafile->file,
            .fileno = datafile->fileno,
            .pos = extent_offset,
            .bytes = extent_size,
            .page_length = page_length
    };

    PGC_ENTRY page_entry = {
            .hot = true,
            .section = section,
            .metric_id = metric_id,
            .start_time_s = start_time_s,
            .end_time_s = end_time_s,
            .update_every_s = update_every_s,
            .size = 0,
            .data = datafile,
            .custom_data = (uint8_t *) &ext_io_data,
    };

    internal_fatal(!datafile->fileno, "DBENGINE: datafile supplied does not have a number");

    bool added = true;
    PGC_PAGE *page = pgc_page_add_and_acquire(open_cache, page_entry, &added);
    int tries = 100;
    while(!added && page_entry.end_time_s > pgc_page_end_time_s(page) && tries--) {
        pgc_page_to_clean_evict_or_release(open_cache, page);
        page = pgc_page_add_and_acquire(open_cache, page_entry, &added);
    }

    if(!added) {
        datafile_release(datafile, DATAFILE_ACQUIRE_OPEN_CACHE);

        internal_fatal(page_entry.end_time_s > pgc_page_end_time_s(page),
                       "DBENGINE: cannot add longer page to open cache");
    }

    pgc_page_release(open_cache, (PGC_PAGE *)page);
}

size_t dynamic_open_cache_size(void) {
    size_t main_cache_size = pgc_get_wanted_cache_size(main_cache);
    size_t target_size = main_cache_size / 100 * 5;

    if(target_size < 2 * 1024 * 1024)
        target_size = 2 * 1024 * 1024;

    return target_size;
}

size_t dynamic_extent_cache_size(void) {
    size_t main_cache_size = pgc_get_wanted_cache_size(main_cache);
    size_t target_size = main_cache_size / 100 * 5;

    if(target_size < 3 * 1024 * 1024)
        target_size = 3 * 1024 * 1024;

    return target_size;
}
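
// Creates the metrics registry and the three page caches. The configured page
// cache size is split roughly 95% for the main cache and 5% for the extent
// cache (with a 3 MiB floor), while the open and extent caches are afterwards
// resized dynamically as a fraction of the main cache target size.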
void pgc_and_mrg_initialize(void)
{
    main_mrg = mrg_create();

    size_t target_cache_size = (size_t)default_rrdeng_page_cache_mb * 1024ULL * 1024ULL;
    size_t main_cache_size = (target_cache_size / 100) * 95;
    size_t open_cache_size = 0;
    size_t extent_cache_size = (target_cache_size / 100) * 5;

    if(extent_cache_size < 3 * 1024 * 1024) {
        extent_cache_size = 3 * 1024 * 1024;
        main_cache_size = target_cache_size - extent_cache_size;
    }

    main_cache = pgc_create(
            "main_cache",
            main_cache_size,
            main_cache_free_clean_page_callback,
            (size_t) rrdeng_pages_per_extent,
            main_cache_flush_dirty_page_init_callback,
            main_cache_flush_dirty_page_callback,
            10,
            10240,                  // if there are that many threads, evict that many pages at once!
            1000,                   //
            5,                      // don't delay too much other threads
            PGC_OPTIONS_AUTOSCALE,  // AUTOSCALE = 2x max hot pages
            0,                      // 0 = as many as the system cpus
            0
    );

    open_cache = pgc_create(
            "open_cache",
            open_cache_size,        // the default is 1MB
            open_cache_free_clean_page_callback,
            1,
            NULL,
            open_cache_flush_dirty_page_callback,
            10,
            10240,                  // if there are that many threads, evict that many pages at once!
            1000,                   //
            3,                      // don't delay too much other threads
            PGC_OPTIONS_AUTOSCALE | PGC_OPTIONS_EVICT_PAGES_INLINE | PGC_OPTIONS_FLUSH_PAGES_INLINE,
            0,                      // 0 = as many as the system cpus
            sizeof(struct extent_io_data)
    );
    pgc_set_dynamic_target_cache_size_callback(open_cache, dynamic_open_cache_size);

    extent_cache = pgc_create(
            "extent_cache",
            extent_cache_size,
            extent_cache_free_clean_page_callback,
            1,
            NULL,
            extent_cache_flush_dirty_page_callback,
            5,
            10,                     // it will lose up to that many extents at once!
            100,                    //
            2,                      // don't delay too much other threads
            PGC_OPTIONS_AUTOSCALE | PGC_OPTIONS_EVICT_PAGES_INLINE | PGC_OPTIONS_FLUSH_PAGES_INLINE,
            0,                      // 0 = as many as the system cpus
            0
    );
    pgc_set_dynamic_target_cache_size_callback(extent_cache, dynamic_extent_cache_size);
}