pdc.c 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #define NETDATA_RRD_INTERNALS
  3. #include "pdc.h"
// One extent worth of work for a query: all the page details (pd) a PDC
// needs from a single extent of a single datafile.
struct extent_page_details_list {
    uv_file file;                               // the datafile's file handle to read from
    uint64_t extent_offset;                     // the extent's position inside the datafile
    uint32_t extent_size;                       // the extent's size on disk, in bytes
    unsigned number_of_pages_in_JudyL;          // number of pages indexed in the JudyL below
    Pvoid_t page_details_by_metric_id_JudyL;    // metric_id -> (start_time_s -> struct page_details *)
    struct page_details_control *pdc;           // the query (PDC) this extent load belongs to
    struct rrdengine_datafile *datafile;        // the datafile this extent lives in
    struct rrdeng_cmd *cmd;                     // the queued command carrying this epdl (see epdl_cmd_queued())

    // true when this epdl is the head entry registered in the datafile's
    // pending-queries-per-extent JudyL (see epdl_pending_add()/epdl_pending_del())
    bool head_to_datafile_extent_queries_pending_for_extent;

    struct {
        struct extent_page_details_list *prev;
        struct extent_page_details_list *next;
    } query;                                    // list of epdls merged on the same extent
};
// Per-datafile grouping used while routing a PDC: collects the extents
// to be read from one datafile (see pdc_to_epdl_router()).
typedef struct datafile_extent_offset_list {
    uv_file file;                                   // the datafile's file handle
    unsigned fileno;                                // the datafile's number
    Pvoid_t extent_pd_list_by_extent_offset_JudyL;  // extent offset -> EPDL *
} DEOL;
  24. // ----------------------------------------------------------------------------
  25. // PDC cache
// Arena (ARAL) allocators for the small, fixed-size query objects.
// Each arena is created once at startup by the matching *_init() function.
static struct {
    struct {
        ARAL *ar;   // arena for PDC objects
    } pdc;

    struct {
        ARAL *ar;   // arena for struct page_details objects
    } pd;

    struct {
        ARAL *ar;   // arena for EPDL objects
    } epdl;

    struct {
        ARAL *ar;   // arena for DEOL objects
    } deol;
} pdc_globals = {};
  40. void pdc_init(void) {
  41. pdc_globals.pdc.ar = aral_create(
  42. "dbengine-pdc",
  43. sizeof(PDC),
  44. 0,
  45. 65536,
  46. NULL,
  47. NULL, NULL, false, false
  48. );
  49. }
  50. PDC *pdc_get(void) {
  51. PDC *pdc = aral_mallocz(pdc_globals.pdc.ar);
  52. memset(pdc, 0, sizeof(PDC));
  53. return pdc;
  54. }
  55. static void pdc_release(PDC *pdc) {
  56. aral_freez(pdc_globals.pdc.ar, pdc);
  57. }
  58. size_t pdc_cache_size(void) {
  59. return aral_overhead(pdc_globals.pdc.ar) + aral_structures(pdc_globals.pdc.ar);
  60. }
  61. // ----------------------------------------------------------------------------
  62. // PD cache
  63. void page_details_init(void) {
  64. pdc_globals.pd.ar = aral_create(
  65. "dbengine-pd",
  66. sizeof(struct page_details),
  67. 0,
  68. 65536,
  69. NULL,
  70. NULL, NULL, false, false
  71. );
  72. }
  73. struct page_details *page_details_get(void) {
  74. struct page_details *pd = aral_mallocz(pdc_globals.pd.ar);
  75. memset(pd, 0, sizeof(struct page_details));
  76. return pd;
  77. }
  78. static void page_details_release(struct page_details *pd) {
  79. aral_freez(pdc_globals.pd.ar, pd);
  80. }
  81. size_t pd_cache_size(void) {
  82. return aral_overhead(pdc_globals.pd.ar) + aral_structures(pdc_globals.pd.ar);
  83. }
  84. // ----------------------------------------------------------------------------
  85. // epdl cache
  86. void epdl_init(void) {
  87. pdc_globals.epdl.ar = aral_create(
  88. "dbengine-epdl",
  89. sizeof(EPDL),
  90. 0,
  91. 65536,
  92. NULL,
  93. NULL, NULL, false, false
  94. );
  95. }
  96. static EPDL *epdl_get(void) {
  97. EPDL *epdl = aral_mallocz(pdc_globals.epdl.ar);
  98. memset(epdl, 0, sizeof(EPDL));
  99. return epdl;
  100. }
  101. static void epdl_release(EPDL *epdl) {
  102. aral_freez(pdc_globals.epdl.ar, epdl);
  103. }
  104. size_t epdl_cache_size(void) {
  105. return aral_overhead(pdc_globals.epdl.ar) + aral_structures(pdc_globals.epdl.ar);
  106. }
  107. // ----------------------------------------------------------------------------
  108. // deol cache
  109. void deol_init(void) {
  110. pdc_globals.deol.ar = aral_create(
  111. "dbengine-deol",
  112. sizeof(DEOL),
  113. 0,
  114. 65536,
  115. NULL,
  116. NULL, NULL, false, false
  117. );
  118. }
  119. static DEOL *deol_get(void) {
  120. DEOL *deol = aral_mallocz(pdc_globals.deol.ar);
  121. memset(deol, 0, sizeof(DEOL));
  122. return deol;
  123. }
  124. static void deol_release(DEOL *deol) {
  125. aral_freez(pdc_globals.deol.ar, deol);
  126. }
  127. size_t deol_cache_size(void) {
  128. return aral_overhead(pdc_globals.deol.ar) + aral_structures(pdc_globals.deol.ar);
  129. }
  130. // ----------------------------------------------------------------------------
  131. // extent with buffer cache
// Free-list cache of extent read/decompression buffers.
// The free list is protected by a spinlock; allocation statistics are
// maintained with relaxed atomics so they can be read without the lock.
static struct {
    struct {
        SPINLOCK spinlock;                          // protects the fields below
        struct extent_buffer *available_items;      // doubly-linked free list
        size_t available;                           // number of items on the free list
    } protected;

    struct {
        size_t allocated;                           // buffers currently allocated (cached + in use)
        size_t allocated_bytes;                     // total bytes of those buffers
    } atomics;

    size_t max_size;                                // size every buffer is allocated at (see extent_buffer_init())
} extent_buffer_globals = {
    .protected = {
        .spinlock = NETDATA_SPINLOCK_INITIALIZER,
        .available_items = NULL,
        .available = 0,
    },
    .atomics = {
        .allocated = 0,
        .allocated_bytes = 0,
    },
    .max_size = MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE,
};
  155. void extent_buffer_init(void) {
  156. size_t max_extent_uncompressed = MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE;
  157. size_t max_size = (size_t)LZ4_compressBound(MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE);
  158. if(max_size < max_extent_uncompressed)
  159. max_size = max_extent_uncompressed;
  160. extent_buffer_globals.max_size = max_size;
  161. }
// Free at most one cached extent buffer per call, always keeping at least
// one on the free list. Uses trylock so it never blocks the caller.
void extent_buffer_cleanup1(void) {
    struct extent_buffer *item = NULL;

    if(!spinlock_trylock(&extent_buffer_globals.protected.spinlock))
        return; // somebody else holds the lock - try again next time

    if(extent_buffer_globals.protected.available_items && extent_buffer_globals.protected.available > 1) {
        // detach the head of the free list while holding the lock
        item = extent_buffer_globals.protected.available_items;
        DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(extent_buffer_globals.protected.available_items, item, cache.prev, cache.next);
        extent_buffer_globals.protected.available--;
    }

    spinlock_unlock(&extent_buffer_globals.protected.spinlock);

    if(item) {
        // free it and update statistics outside the lock
        size_t bytes = sizeof(struct extent_buffer) + item->bytes;
        freez(item);
        __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
        __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated_bytes, bytes, __ATOMIC_RELAXED);
    }
}
// Get an extent buffer of at least `size` usable bytes, reusing a cached
// one when possible. Sizes are rounded up to the global max, so cached
// buffers are normally interchangeable. Release with extent_buffer_release().
struct extent_buffer *extent_buffer_get(size_t size) {
    internal_fatal(size > extent_buffer_globals.max_size, "DBENGINE: extent size is too big");

    struct extent_buffer *eb = NULL;

    // always allocate at the maximum size, so any cached buffer fits any request
    if(size < extent_buffer_globals.max_size)
        size = extent_buffer_globals.max_size;

    // try to pop a buffer from the free list
    spinlock_lock(&extent_buffer_globals.protected.spinlock);
    if(likely(extent_buffer_globals.protected.available_items)) {
        eb = extent_buffer_globals.protected.available_items;
        DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(extent_buffer_globals.protected.available_items, eb, cache.prev, cache.next);
        extent_buffer_globals.protected.available--;
    }
    spinlock_unlock(&extent_buffer_globals.protected.spinlock);

    // a cached buffer can be too small if max_size grew (extent_buffer_init());
    // discard it and allocate a fresh one below
    if(unlikely(eb && eb->bytes < size)) {
        size_t bytes = sizeof(struct extent_buffer) + eb->bytes;
        freez(eb);
        eb = NULL;
        __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
        __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated_bytes, bytes, __ATOMIC_RELAXED);
    }

    if(unlikely(!eb)) {
        // nothing cached (or it was too small) - allocate a new one
        size_t bytes = sizeof(struct extent_buffer) + size;
        eb = mallocz(bytes);
        eb->bytes = size;
        __atomic_add_fetch(&extent_buffer_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
        __atomic_add_fetch(&extent_buffer_globals.atomics.allocated_bytes, bytes, __ATOMIC_RELAXED);
    }

    return eb;
}
  207. void extent_buffer_release(struct extent_buffer *eb) {
  208. if(unlikely(!eb)) return;
  209. spinlock_lock(&extent_buffer_globals.protected.spinlock);
  210. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(extent_buffer_globals.protected.available_items, eb, cache.prev, cache.next);
  211. extent_buffer_globals.protected.available++;
  212. spinlock_unlock(&extent_buffer_globals.protected.spinlock);
  213. }
  214. size_t extent_buffer_cache_size(void) {
  215. return __atomic_load_n(&extent_buffer_globals.atomics.allocated_bytes, __ATOMIC_RELAXED);
  216. }
  217. // ----------------------------------------------------------------------------
  218. // epdl logic
// Destroy an EPDL: free the nested JudyLs (metric_id -> start_time_s -> pd;
// the pd entries themselves are owned by the PDC) and return the EPDL to its arena.
static void epdl_destroy(EPDL *epdl)
{
    Pvoid_t *pd_by_start_time_s_JudyL;
    Word_t metric_id_index = 0;
    bool metric_id_first = true;

    // free each per-metric inner JudyL first
    while ((pd_by_start_time_s_JudyL = PDCJudyLFirstThenNext(
        epdl->page_details_by_metric_id_JudyL,
        &metric_id_index, &metric_id_first)))
        PDCJudyLFreeArray(pd_by_start_time_s_JudyL, PJE0);

    // then the outer one
    PDCJudyLFreeArray(&epdl->page_details_by_metric_id_JudyL, PJE0);
    epdl_release(epdl);
}
// Mark every page of this EPDL that has not been loaded (no page attached,
// not already FAILED or READY) as FAILED, OR-ing in the given tags.
// Adds the number of pages marked to *statistics_counter, when given.
static void epdl_mark_all_not_loaded_pages_as_failed(EPDL *epdl, PDC_PAGE_STATUS tags, size_t *statistics_counter)
{
    size_t pages_matched = 0;

    Word_t metric_id_index = 0;
    bool metric_id_first = true;
    Pvoid_t *pd_by_start_time_s_JudyL;

    // walk metric_id -> start_time_s -> pd
    while((pd_by_start_time_s_JudyL = PDCJudyLFirstThenNext(epdl->page_details_by_metric_id_JudyL, &metric_id_index, &metric_id_first))) {
        Word_t start_time_index = 0;
        bool start_time_first = true;
        Pvoid_t *PValue;

        while ((PValue = PDCJudyLFirstThenNext(*pd_by_start_time_s_JudyL, &start_time_index, &start_time_first))) {
            struct page_details *pd = *PValue;

            if(!pd->page && !pdc_page_status_check(pd, PDC_PAGE_FAILED|PDC_PAGE_READY)) {
                pdc_page_status_set(pd, PDC_PAGE_FAILED | tags);
                pages_matched++;
            }
        }
    }

    if(pages_matched && statistics_counter)
        __atomic_add_fetch(statistics_counter, pages_matched, __ATOMIC_RELAXED);
}
  252. /*
  253. static bool epdl_check_if_pages_are_already_in_cache(struct rrdengine_instance *ctx, EPDL *epdl, PDC_PAGE_STATUS tags)
  254. {
  255. size_t count_remaining = 0;
  256. size_t found = 0;
  257. Word_t metric_id_index = 0;
  258. bool metric_id_first = true;
  259. Pvoid_t *pd_by_start_time_s_JudyL;
  260. while((pd_by_start_time_s_JudyL = PDCJudyLFirstThenNext(epdl->page_details_by_metric_id_JudyL, &metric_id_index, &metric_id_first))) {
  261. Word_t start_time_index = 0;
  262. bool start_time_first = true;
  263. Pvoid_t *PValue;
  264. while ((PValue = PDCJudyLFirstThenNext(*pd_by_start_time_s_JudyL, &start_time_index, &start_time_first))) {
  265. struct page_details *pd = *PValue;
  266. if (pd->page)
  267. continue;
  268. pd->page = pgc_page_get_and_acquire(main_cache, (Word_t) ctx, pd->metric_id, pd->first_time_s, PGC_SEARCH_EXACT);
  269. if (pd->page) {
  270. found++;
  271. pdc_page_status_set(pd, PDC_PAGE_READY | tags);
  272. }
  273. else
  274. count_remaining++;
  275. }
  276. }
  277. if(found) {
  278. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_ok_preloaded, found, __ATOMIC_RELAXED);
  279. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache, found, __ATOMIC_RELAXED);
  280. }
  281. return count_remaining == 0;
  282. }
  283. */
  284. // ----------------------------------------------------------------------------
  285. // PDC logic
// Tear down a fully-released PDC: release its metric and completions, free
// every page details entry (releasing the datafiles and cached pages they
// still hold), free the page list JudyL and return the PDC to its arena.
static void pdc_destroy(PDC *pdc) {
    mrg_metric_release(main_mrg, pdc->metric);
    completion_destroy(&pdc->prep_completion);
    completion_destroy(&pdc->page_completion);

    Pvoid_t *PValue;
    struct page_details *pd;
    Word_t time_index = 0;
    bool first_then_next = true;
    size_t unroutable = 0, cancelled = 0;
    while((PValue = PDCJudyLFirstThenNext(pdc->page_list_JudyL, &time_index, &first_then_next))) {
        pd = *PValue;

        // no need for atomics here - we are done...
        PDC_PAGE_STATUS status = pd->status;

        // release the datafile reference taken for this page, if still held
        if(status & PDC_PAGE_DATAFILE_ACQUIRED) {
            datafile_release(pd->datafile.ptr, DATAFILE_ACQUIRE_PAGE_DETAILS);
            pd->datafile.ptr = NULL;
        }

        internal_fatal(pd->datafile.ptr, "DBENGINE: page details has a datafile.ptr that is not released.");

        if(!pd->page && !(status & (PDC_PAGE_READY | PDC_PAGE_FAILED | PDC_PAGE_RELEASED | PDC_PAGE_SKIP | PDC_PAGE_INVALID | PDC_PAGE_CANCELLED))) {
            // pdc_page_status_set(pd, PDC_PAGE_FAILED);
            // page never got any terminal status - count it as unroutable
            unroutable++;
        }
        else if(!pd->page && (status & PDC_PAGE_CANCELLED))
            cancelled++;

        // drop our reference on the cached page, unless already released
        if(pd->page && !(status & PDC_PAGE_RELEASED)) {
            pgc_page_release(main_cache, pd->page);
            // pdc_page_status_set(pd, PDC_PAGE_RELEASED);
        }

        page_details_release(pd);
    }

    PDCJudyLFreeArray(&pdc->page_list_JudyL, PJE0);

    __atomic_sub_fetch(&rrdeng_cache_efficiency_stats.currently_running_queries, 1, __ATOMIC_RELAXED);
    __atomic_sub_fetch(&pdc->ctx->atomic.inflight_queries, 1, __ATOMIC_RELAXED);
    pdc_release(pdc);

    if(unroutable)
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_fail_unroutable, unroutable, __ATOMIC_RELAXED);

    if(cancelled)
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_fail_cancelled, cancelled, __ATOMIC_RELAXED);
}
// Take one more reference on a PDC. Only valid while the caller already
// holds at least one reference (refcount must be >= 1).
void pdc_acquire(PDC *pdc) {
    spinlock_lock(&pdc->refcount_spinlock);

    if(pdc->refcount < 1)
        fatal("DBENGINE: pdc is not referenced and cannot be acquired");

    pdc->refcount++;
    spinlock_unlock(&pdc->refcount_spinlock);
}
// Drop one reference on the PDC. When the count reaches zero, the PDC is
// destroyed and true is returned. Workers additionally mark the page
// completion when only one reference remains, to wake a waiting query thread.
// Returns true when the PDC is gone (or was NULL), false otherwise.
bool pdc_release_and_destroy_if_unreferenced(PDC *pdc, bool worker, bool router __maybe_unused) {
    if(unlikely(!pdc))
        return true;

    spinlock_lock(&pdc->refcount_spinlock);

    if(pdc->refcount <= 0)
        fatal("DBENGINE: pdc is not referenced and cannot be released");

    pdc->refcount--;

    if (pdc->refcount <= 1 && worker) {
        // when 1 refcount is remaining, and we are a worker,
        // we can mark the job completed:
        // - if the remaining refcount is from the query caller, we will wake it up
        // - if the remaining refcount is from another worker, the query thread is already away
        completion_mark_complete(&pdc->page_completion);
    }

    if (pdc->refcount == 0) {
        // we held the last reference - destroy it (outside the lock,
        // nobody else can reach it anymore)
        spinlock_unlock(&pdc->refcount_spinlock);
        pdc_destroy(pdc);
        return true;
    }

    spinlock_unlock(&pdc->refcount_spinlock);
    return false;
}
  354. void epdl_cmd_queued(void *epdl_ptr, struct rrdeng_cmd *cmd) {
  355. EPDL *epdl = epdl_ptr;
  356. epdl->cmd = cmd;
  357. }
  358. void epdl_cmd_dequeued(void *epdl_ptr) {
  359. EPDL *epdl = epdl_ptr;
  360. epdl->cmd = NULL;
  361. }
  362. static struct rrdeng_cmd *epdl_get_cmd(void *epdl_ptr) {
  363. EPDL *epdl = epdl_ptr;
  364. return epdl->cmd;
  365. }
// Register this epdl in its datafile's pending-queries-per-extent index.
// Returns true when it is the first query for this extent (the caller must
// dispatch a read command); false when it was merged into an already
// pending query for the same extent.
static bool epdl_pending_add(EPDL *epdl) {
    bool added_new;

    spinlock_lock(&epdl->datafile->extent_queries.spinlock);
    Pvoid_t *PValue = JudyLIns(&epdl->datafile->extent_queries.pending_epdl_by_extent_offset_judyL, epdl->extent_offset, PJE0);
    internal_fatal(!PValue || PValue == PJERR, "DBENGINE: corrupted pending extent judy");

    EPDL *base = *PValue;

    if(!base) {
        // first query for this extent - this epdl becomes the head entry
        added_new = true;
        epdl->head_to_datafile_extent_queries_pending_for_extent = true;
    }
    else {
        // merge into the pending query for this extent
        added_new = false;
        epdl->head_to_datafile_extent_queries_pending_for_extent = false;
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_extent_merged, 1, __ATOMIC_RELAXED);

        // if we carry a smaller priority value, re-queue the pending
        // command with our priority
        if(base->pdc->priority > epdl->pdc->priority)
            rrdeng_req_cmd(epdl_get_cmd, base, epdl->pdc->priority);
    }

    DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(base, epdl, query.prev, query.next);
    *PValue = base;

    spinlock_unlock(&epdl->datafile->extent_queries.spinlock);

    return added_new;
}
// Remove this epdl's extent from the datafile's pending index - but only
// when this epdl is the registered head entry for the extent; merged
// followers were never inserted into the JudyL themselves.
static void epdl_pending_del(EPDL *epdl) {
    spinlock_lock(&epdl->datafile->extent_queries.spinlock);
    if(epdl->head_to_datafile_extent_queries_pending_for_extent) {
        epdl->head_to_datafile_extent_queries_pending_for_extent = false;
        int rc = JudyLDel(&epdl->datafile->extent_queries.pending_epdl_by_extent_offset_judyL, epdl->extent_offset, PJE0);
        (void) rc;
        internal_fatal(!rc, "DBENGINE: epdl not found in pending list");
    }
    spinlock_unlock(&epdl->datafile->extent_queries.spinlock);
}
// Route a PDC's disk-pending pages to extent loads: group the pages by
// datafile (DEOL) and by extent within each datafile (EPDL), then dispatch
// one command per extent - the first via exec_first_extent_list(), the rest
// via exec_rest_extent_list(). Extents already pending elsewhere are merged
// (see epdl_pending_add()). Finally drops this function's PDC reference.
void pdc_to_epdl_router(struct rrdengine_instance *ctx, PDC *pdc, execute_extent_page_details_list_t exec_first_extent_list, execute_extent_page_details_list_t exec_rest_extent_list)
{
    Pvoid_t *PValue;
    Pvoid_t *PValue1;
    Pvoid_t *PValue2;
    Word_t time_index = 0;
    struct page_details *pd = NULL;

    // this is the entire page list
    // Lets do some deduplication
    // 1. Per datafile
    // 2. Per extent
    // 3. Pages per extent will be added to the cache either as acquired or not
    Pvoid_t JudyL_datafile_list = NULL;

    DEOL *deol;
    EPDL *epdl;

    if (pdc->page_list_JudyL) {
        bool first_then_next = true;
        while((PValue = PDCJudyLFirstThenNext(pdc->page_list_JudyL, &time_index, &first_then_next))) {
            pd = *PValue;

            internal_fatal(!pd,
                           "DBENGINE: pdc page list has an empty page details entry");

            // only pages that still need to be read from disk
            if (!(pd->status & PDC_PAGE_DISK_PENDING))
                continue;

            internal_fatal(!(pd->status & PDC_PAGE_DATAFILE_ACQUIRED),
                           "DBENGINE: page details has not acquired the datafile");

            internal_fatal((pd->status & (PDC_PAGE_READY | PDC_PAGE_FAILED)),
                           "DBENGINE: page details has disk pending flag but it is ready/failed");

            internal_fatal(pd->page,
                           "DBENGINE: page details has a page linked to it, but it is marked for loading");

            // find or create the DEOL for this page's datafile
            PValue1 = PDCJudyLIns(&JudyL_datafile_list, pd->datafile.fileno, PJE0);
            if (PValue1 && !*PValue1) {
                *PValue1 = deol = deol_get();
                deol->extent_pd_list_by_extent_offset_JudyL = NULL;
                deol->fileno = pd->datafile.fileno;
            }
            else
                deol = *PValue1;

            // find or create the EPDL for this page's extent
            PValue2 = PDCJudyLIns(&deol->extent_pd_list_by_extent_offset_JudyL, pd->datafile.extent.pos, PJE0);
            if (PValue2 && !*PValue2) {
                *PValue2 = epdl = epdl_get();
                epdl->page_details_by_metric_id_JudyL = NULL;
                epdl->number_of_pages_in_JudyL = 0;
                epdl->file = pd->datafile.file;
                epdl->extent_offset = pd->datafile.extent.pos;
                epdl->extent_size = pd->datafile.extent.bytes;
                epdl->datafile = pd->datafile.ptr;
            }
            else
                epdl = *PValue2;

            // index the page inside the epdl by metric_id, then start time
            epdl->number_of_pages_in_JudyL++;

            Pvoid_t *pd_by_first_time_s_judyL = PDCJudyLIns(&epdl->page_details_by_metric_id_JudyL, pd->metric_id, PJE0);
            Pvoid_t *pd_pptr = PDCJudyLIns(pd_by_first_time_s_judyL, pd->first_time_s, PJE0);
            *pd_pptr = pd;
        }

        size_t extent_list_no = 0;
        Word_t datafile_no = 0;
        first_then_next = true;
        while((PValue = PDCJudyLFirstThenNext(JudyL_datafile_list, &datafile_no, &first_then_next))) {
            deol = *PValue;

            bool first_then_next_extent = true;
            Word_t pos = 0;
            while ((PValue = PDCJudyLFirstThenNext(deol->extent_pd_list_by_extent_offset_JudyL, &pos, &first_then_next_extent))) {
                epdl = *PValue;
                internal_fatal(!epdl, "DBENGINE: extent_list is not populated properly");

                // The extent page list can be dispatched to a worker
                // It will need to populate the cache with "acquired" pages that are in the list (pd) only
                // the rest of the extent pages will be added to the cache but not acquired

                pdc_acquire(pdc); // we do this for the next worker: do_read_extent_work()
                epdl->pdc = pdc;

                // dispatch only when this is a new pending extent;
                // otherwise it was merged into an already pending read
                if(epdl_pending_add(epdl)) {
                    if (extent_list_no++ == 0)
                        exec_first_extent_list(ctx, epdl, pdc->priority);
                    else
                        exec_rest_extent_list(ctx, epdl, pdc->priority);
                }
            }

            PDCJudyLFreeArray(&deol->extent_pd_list_by_extent_offset_JudyL, PJE0);
            deol_release(deol);
        }

        PDCJudyLFreeArray(&JudyL_datafile_list, PJE0);
    }

    pdc_release_and_destroy_if_unreferenced(pdc, true, true);
}
  481. void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags) {
  482. if(flags & RRDENG_PAGE_PAST_COLLECTION)
  483. buffer_strcat(wb, "PAST_COLLECTION ");
  484. if(flags & RRDENG_PAGE_REPEATED_COLLECTION)
  485. buffer_strcat(wb, "REPEATED_COLLECTION ");
  486. if(flags & RRDENG_PAGE_BIG_GAP)
  487. buffer_strcat(wb, "BIG_GAP ");
  488. if(flags & RRDENG_PAGE_GAP)
  489. buffer_strcat(wb, "GAP ");
  490. if(flags & RRDENG_PAGE_FUTURE_POINT)
  491. buffer_strcat(wb, "FUTURE_POINT ");
  492. if(flags & RRDENG_PAGE_CREATED_IN_FUTURE)
  493. buffer_strcat(wb, "CREATED_IN_FUTURE ");
  494. if(flags & RRDENG_PAGE_COMPLETED_IN_FUTURE)
  495. buffer_strcat(wb, "COMPLETED_IN_FUTURE ");
  496. if(flags & RRDENG_PAGE_UNALIGNED)
  497. buffer_strcat(wb, "UNALIGNED ");
  498. if(flags & RRDENG_PAGE_CONFLICT)
  499. buffer_strcat(wb, "CONFLICT ");
  500. if(flags & RRDENG_PAGE_FULL)
  501. buffer_strcat(wb, "PAGE_FULL");
  502. if(flags & RRDENG_PAGE_COLLECT_FINALIZE)
  503. buffer_strcat(wb, "COLLECT_FINALIZE");
  504. if(flags & RRDENG_PAGE_UPDATE_EVERY_CHANGE)
  505. buffer_strcat(wb, "UPDATE_EVERY_CHANGE");
  506. if(flags & RRDENG_PAGE_STEP_TOO_SMALL)
  507. buffer_strcat(wb, "STEP_TOO_SMALL");
  508. if(flags & RRDENG_PAGE_STEP_UNALIGNED)
  509. buffer_strcat(wb, "STEP_UNALIGNED");
  510. }
  511. inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error) {
  512. time_t start_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC);
  513. time_t end_time_s;
  514. size_t entries;
  515. switch (descr->type) {
  516. case PAGE_METRICS:
  517. case PAGE_TIER:
  518. end_time_s = descr->end_time_ut / USEC_PER_SEC;
  519. entries = 0;
  520. break;
  521. case PAGE_GORILLA_METRICS:
  522. end_time_s = start_time_s + descr->gorilla.delta_time_s;
  523. entries = descr->gorilla.entries;
  524. break;
  525. default:
  526. fatal("Unknown page type: %uc\n", descr->type);
  527. }
  528. return validate_page(
  529. (uuid_t *)descr->uuid,
  530. start_time_s,
  531. end_time_s,
  532. 0,
  533. descr->page_length,
  534. descr->type,
  535. entries,
  536. now_s,
  537. overwrite_zero_update_every_s,
  538. have_read_error,
  539. "loaded", 0);
  540. }
  541. VALIDATED_PAGE_DESCRIPTOR validate_page(
  542. uuid_t *uuid,
  543. time_t start_time_s,
  544. time_t end_time_s,
  545. time_t update_every_s, // can be zero, if unknown
  546. size_t page_length,
  547. uint8_t page_type,
  548. size_t entries, // can be zero, if unknown
  549. time_t now_s, // can be zero, to disable future timestamp check
  550. time_t overwrite_zero_update_every_s, // can be zero, if unknown
  551. bool have_read_error,
  552. const char *msg,
  553. RRDENG_COLLECT_PAGE_FLAGS flags) {
  554. VALIDATED_PAGE_DESCRIPTOR vd = {
  555. .start_time_s = start_time_s,
  556. .end_time_s = end_time_s,
  557. .update_every_s = update_every_s,
  558. .page_length = page_length,
  559. .type = page_type,
  560. .is_valid = true,
  561. };
  562. vd.point_size = page_type_size[vd.type];
  563. switch (page_type) {
  564. case PAGE_METRICS:
  565. case PAGE_TIER:
  566. // always calculate entries by size
  567. vd.entries = page_entries_by_size(vd.page_length, vd.point_size);
  568. // allow to be called without entries (when loading pages from disk)
  569. if(!entries)
  570. entries = vd.entries;
  571. break;
  572. case PAGE_GORILLA_METRICS:
  573. internal_fatal(entries == 0, "0 number of entries found on gorilla page");
  574. vd.entries = entries;
  575. break;
  576. default:
  577. // TODO: should set vd.is_valid false instead?
  578. fatal("Unknown page type: %uc", page_type);
  579. }
  580. // allow to be called without update every (when loading pages from disk)
  581. if(!update_every_s) {
  582. vd.update_every_s = (vd.entries > 1) ? ((vd.end_time_s - vd.start_time_s) / (time_t) (vd.entries - 1))
  583. : overwrite_zero_update_every_s;
  584. update_every_s = vd.update_every_s;
  585. }
  586. // another such set of checks exists in
  587. // update_metric_retention_and_granularity_by_uuid()
  588. bool updated = false;
  589. size_t max_page_length = RRDENG_BLOCK_SIZE;
  590. // If gorilla can not compress the data we might end up needing slightly more
  591. // than 4KiB. However, gorilla pages extend the page length by increments of
  592. // 512 bytes.
  593. max_page_length += ((page_type == PAGE_GORILLA_METRICS) * GORILLA_BUFFER_SIZE);
  594. if( have_read_error ||
  595. vd.page_length == 0 ||
  596. vd.page_length > max_page_length ||
  597. vd.start_time_s > vd.end_time_s ||
  598. (now_s && vd.end_time_s > now_s) ||
  599. vd.start_time_s <= 0 ||
  600. vd.end_time_s <= 0 ||
  601. vd.update_every_s < 0 ||
  602. (vd.start_time_s == vd.end_time_s && vd.entries > 1) ||
  603. (vd.update_every_s == 0 && vd.entries > 1))
  604. {
  605. vd.is_valid = false;
  606. }
  607. else {
  608. if(unlikely(vd.entries != entries || vd.update_every_s != update_every_s))
  609. updated = true;
  610. if (likely(vd.update_every_s)) {
  611. size_t entries_by_time = page_entries_by_time(vd.start_time_s, vd.end_time_s, vd.update_every_s);
  612. if (vd.entries != entries_by_time) {
  613. if (overwrite_zero_update_every_s < vd.update_every_s)
  614. vd.update_every_s = overwrite_zero_update_every_s;
  615. time_t new_end_time_s = (time_t)(vd.start_time_s + (vd.entries - 1) * vd.update_every_s);
  616. if(new_end_time_s <= vd.end_time_s) {
  617. // end time is wrong
  618. vd.end_time_s = new_end_time_s;
  619. }
  620. else {
  621. // update every is wrong
  622. vd.update_every_s = overwrite_zero_update_every_s;
  623. vd.end_time_s = (time_t)(vd.start_time_s + (vd.entries - 1) * vd.update_every_s);
  624. }
  625. updated = true;
  626. }
  627. }
  628. else if(overwrite_zero_update_every_s) {
  629. vd.update_every_s = overwrite_zero_update_every_s;
  630. updated = true;
  631. }
  632. }
  633. if(unlikely(!vd.is_valid || updated)) {
  634. #ifndef NETDATA_INTERNAL_CHECKS
  635. nd_log_limit_static_global_var(erl, 1, 0);
  636. #endif
  637. char uuid_str[UUID_STR_LEN + 1];
  638. uuid_unparse(*uuid, uuid_str);
  639. BUFFER *wb = NULL;
  640. if(flags) {
  641. wb = buffer_create(0, NULL);
  642. collect_page_flags_to_buffer(wb, flags);
  643. }
  644. if(!vd.is_valid) {
  645. #ifdef NETDATA_INTERNAL_CHECKS
  646. internal_error(true,
  647. #else
  648. nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
  649. #endif
  650. "DBENGINE: metric '%s' %s invalid page of type %u "
  651. "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s)",
  652. uuid_str, msg, vd.type,
  653. vd.start_time_s, vd.end_time_s, now_s, vd.update_every_s, vd.page_length, vd.entries, wb?buffer_tostring(wb):""
  654. );
  655. }
  656. else {
  657. const char *err_valid = (vd.is_valid) ? "" : "found invalid, ";
  658. const char *err_start = (vd.start_time_s == start_time_s) ? "" : "start time updated, ";
  659. const char *err_end = (vd.end_time_s == end_time_s) ? "" : "end time updated, ";
  660. const char *err_update = (vd.update_every_s == update_every_s) ? "" : "update every updated, ";
  661. const char *err_length = (vd.page_length == page_length) ? "" : "page length updated, ";
  662. const char *err_entries = (vd.entries == entries) ? "" : "entries updated, ";
  663. const char *err_future = (now_s && vd.end_time_s <= now_s) ? "" : "future end time, ";
  664. #ifdef NETDATA_INTERNAL_CHECKS
  665. internal_error(true,
  666. #else
  667. nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
  668. #endif
  669. "DBENGINE: metric '%s' %s page of type %u "
  670. "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s), "
  671. "found inconsistent - the right is "
  672. "from %ld to %ld, update every %ld, page length %zu, entries %zu: "
  673. "%s%s%s%s%s%s%s",
  674. uuid_str, msg, vd.type,
  675. start_time_s, end_time_s, now_s, update_every_s, page_length, entries, wb?buffer_tostring(wb):"",
  676. vd.start_time_s, vd.end_time_s, vd.update_every_s, vd.page_length, vd.entries,
  677. err_valid, err_start, err_end, err_update, err_length, err_entries, err_future
  678. );
  679. }
  680. buffer_free(wb);
  681. }
  682. return vd;
  683. }
// Collect, from all the EPDLs merged into this extent query, the page_details
// entries for the given metric that start at start_time_s and have no page
// attached yet, linking them together (via load.prev/load.next) so the caller
// can populate them all from the extent data.
// Returns the head of that list, or NULL when nothing needs loading.
static inline struct page_details *epdl_get_pd_load_link_list_from_metric_start_time(EPDL *epdl, Word_t metric_id, time_t start_time_s) {

    if(unlikely(epdl->head_to_datafile_extent_queries_pending_for_extent))
        // stop appending more pages to this epdl
        epdl_pending_del(epdl);

    struct page_details *pd_list = NULL;

    // one extent query may serve multiple merged queries - check every EPDL in the chain
    for(EPDL *ep = epdl; ep ;ep = ep->query.next) {
        // first level: metric_id -> JudyL of page_details by start time
        Pvoid_t *pd_by_start_time_s_judyL = PDCJudyLGet(ep->page_details_by_metric_id_JudyL, metric_id, PJE0);
        internal_fatal(pd_by_start_time_s_judyL == PJERR, "DBENGINE: corrupted extent metrics JudyL");

        if (unlikely(pd_by_start_time_s_judyL && *pd_by_start_time_s_judyL)) {
            // second level: start_time_s -> page_details
            Pvoid_t *pd_pptr = PDCJudyLGet(*pd_by_start_time_s_judyL, start_time_s, PJE0);
            internal_fatal(pd_pptr == PJERR, "DBENGINE: corrupted metric page details JudyHS");

            if(likely(pd_pptr && *pd_pptr)) {
                struct page_details *pd = *pd_pptr;
                internal_fatal(metric_id != pd->metric_id, "DBENGINE: metric ids do not match");

                if(likely(!pd->page)) {
                    // no page attached yet - either mark it cancelled or queue it for loading
                    if (unlikely(__atomic_load_n(&ep->pdc->workers_should_stop, __ATOMIC_RELAXED)))
                        pdc_page_status_set(pd, PDC_PAGE_FAILED | PDC_PAGE_CANCELLED);
                    else
                        DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(pd_list, pd, load.prev, load.next);
                }
            }
        }
    }

    return pd_list;
}
// Log (rate-limited) an error encountered while loading an extent.
// The reported time range and metric UUID are taken from the best source
// available, in this order of preference:
//   1. descr     - the on-disk page descriptor, when one is given   (DESCR)
//   2. epdl      - the first page_details of the first metric of
//                  this extent query                                (PD)
//   3. epdl->pdc - the overall time range of the query              (PDC)
static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL *epdl, struct rrdeng_extent_page_descr *descr, const char *msg) {
    char uuid[UUID_STR_LEN] = "";
    time_t start_time_s = 0;
    time_t end_time_s = 0;
    bool used_epdl = false;
    bool used_descr = false;

    if (descr) {
        start_time_s = (time_t)(descr->start_time_ut / USEC_PER_SEC);

        switch (descr->type) {
            case PAGE_METRICS:
            case PAGE_TIER:
                end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC);
                break;
            case PAGE_GORILLA_METRICS:
                // gorilla descriptors carry a delta from the start time,
                // not an absolute end time
                end_time_s = (time_t) start_time_s + (descr->gorilla.delta_time_s);
                break;
        }

        uuid_unparse_lower(descr->uuid, uuid);
        used_descr = true;
    }
    else {
        // no descriptor - use the first page of the first metric of this query
        struct page_details *pd = NULL;

        Word_t start = 0;
        Pvoid_t *pd_by_start_time_s_judyL = PDCJudyLFirst(epdl->page_details_by_metric_id_JudyL, &start, PJE0);
        if(pd_by_start_time_s_judyL) {
            start = 0;
            Pvoid_t *pd_pptr = PDCJudyLFirst(*pd_by_start_time_s_judyL, &start, PJE0);
            if(pd_pptr) {
                pd = *pd_pptr;
                start_time_s = pd->first_time_s;
                end_time_s = pd->last_time_s;
                METRIC *metric = (METRIC *)pd->metric_id;
                uuid_t *u = mrg_metric_uuid(main_mrg, metric);
                uuid_unparse_lower(*u, uuid);
                used_epdl = true;
            }
        }
    }

    if(!used_epdl && !used_descr && epdl->pdc) {
        // last resort - the time range of the whole query
        start_time_s = epdl->pdc->start_time_s;
        end_time_s = epdl->pdc->end_time_s;
    }

    char start_time_str[LOG_DATE_LENGTH + 1] = "";
    if(start_time_s)
        log_date(start_time_str, LOG_DATE_LENGTH, start_time_s);

    char end_time_str[LOG_DATE_LENGTH + 1] = "";
    if(end_time_s)
        log_date(end_time_str, LOG_DATE_LENGTH, end_time_s);

    // rate-limit to avoid flooding the logs when many extents fail in a row
    nd_log_limit_static_global_var(erl, 1, 0);
    nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
                 "DBENGINE: error while reading extent from datafile %u of tier %d, at offset %" PRIu64 " (%u bytes) "
                 "%s from %ld (%s) to %ld (%s) %s%s: "
                 "%s",
                 epdl->datafile->fileno, ctx->config.tier,
                 epdl->extent_offset, epdl->extent_size,
                 used_epdl ? "to extract page (PD)" : used_descr ? "expected page (DESCR)" : "part of a query (PDC)",
                 start_time_s, start_time_str, end_time_s, end_time_str,
                 used_epdl || used_descr ? " of metric " : "",
                 used_epdl || used_descr ? uuid : "",
                 msg);
}
// Parse, verify and (if compressed) decompress an extent buffer, then create
// PGDs for the pages requested by this extent query, insert them into the
// main cache and attach them to all waiting page_details.
//
// data/data_length  the raw extent bytes (header + descriptors + payload + trailer)
// tags              PDC_PAGE_* flags OR-ed into the status of every loaded page
// cached_extent     true when the data came from the extent cache (statistics only)
//
// Returns false when the extent header is invalid (nothing was processed).
// Returns true otherwise - even on a CRC failure, in which case the requested
// pages are still attached, but as PGD_EMPTY.
static bool epdl_populate_pages_from_extent_data(
        struct rrdengine_instance *ctx,
        void *data,
        size_t data_length,
        EPDL *epdl,
        bool worker,
        PDC_PAGE_STATUS tags,
        bool cached_extent)
{
    int ret;
    unsigned i, count;
    void *uncompressed_buf = NULL;
    uint32_t payload_length, payload_offset, trailer_offset, uncompressed_payload_length = 0;
    bool have_read_error = false;
    /* persistent structures */
    struct rrdeng_df_extent_header *header;
    struct rrdeng_df_extent_trailer *trailer;
    struct extent_buffer *eb = NULL;
    uLong crc;

    bool can_use_data = true;
    if(data_length < sizeof(*header) + sizeof(header->descr[0]) + sizeof(*trailer)) {
        // too small to hold even a header with one descriptor plus a trailer
        can_use_data = false;

        // added to satisfy the requirements of older compilers (prevent warnings)
        payload_length = 0;
        payload_offset = 0;
        trailer_offset = 0;
        count = 0;
        header = NULL;
        trailer = NULL;
    }
    else {
        header = data;
        payload_length = header->payload_length;
        count = header->number_of_pages;
        payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count;
        trailer_offset = data_length - sizeof(*trailer);
        trailer = data + trailer_offset;
    }

    // sanity-check the header fields against the buffer we actually received
    if( !can_use_data ||
        count < 1 ||
        count > MAX_PAGES_PER_EXTENT ||
        (header->compression_algorithm != RRD_NO_COMPRESSION && header->compression_algorithm != RRD_LZ4) ||
        (payload_length != trailer_offset - payload_offset) ||
        (data_length != payload_offset + payload_length + sizeof(*trailer))
        ) {
        epdl_extent_loading_error_log(ctx, epdl, NULL, "header is INVALID");
        return false;
    }

    // verify the CRC32 of everything before the trailer
    crc = crc32(0L, Z_NULL, 0);
    crc = crc32(crc, data, epdl->extent_size - sizeof(*trailer));
    ret = crc32cmp(trailer->checksum, crc);
    if (unlikely(ret)) {
        // keep going: validation below will fail and pages become PGD_EMPTY
        ctx_io_error(ctx);
        have_read_error = true;
        epdl_extent_loading_error_log(ctx, epdl, NULL, "CRC32 checksum FAILED");
    }

    if(worker)
        worker_is_busy(UV_EVENT_DBENGINE_EXTENT_DECOMPRESSION);

    if (likely(!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm)) {
        // find the uncompressed extent size
        uncompressed_payload_length = 0;
        for (i = 0; i < count; ++i) {
            size_t page_length = header->descr[i].page_length;
            // pages longer than a block are acceptable only for gorilla pages,
            // and then only in whole GORILLA_BUFFER_SIZE increments
            if (page_length > RRDENG_BLOCK_SIZE && (header->descr[i].type != PAGE_GORILLA_METRICS ||
                                                    (header->descr[i].type == PAGE_GORILLA_METRICS &&
                                                     (page_length - RRDENG_BLOCK_SIZE) % GORILLA_BUFFER_SIZE))) {
                have_read_error = true;
                break;
            }

            uncompressed_payload_length += header->descr[i].page_length;
        }

        if(unlikely(uncompressed_payload_length > MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE))
            have_read_error = true;

        if(likely(!have_read_error)) {
            eb = extent_buffer_get(uncompressed_payload_length);
            uncompressed_buf = eb->data;

            ret = LZ4_decompress_safe(data + payload_offset, uncompressed_buf,
                                      (int) payload_length, (int) uncompressed_payload_length);
            __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED);
            __atomic_add_fetch(&ctx->stats.after_decompress_bytes, ret, __ATOMIC_RELAXED);
        }
    }

    if(worker)
        worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_LOOKUP);

    // per-call counters, flushed to the global atomics once at the end
    size_t stats_data_from_main_cache = 0;
    size_t stats_data_from_extent = 0;
    size_t stats_load_compressed = 0;
    size_t stats_load_uncompressed = 0;
    size_t stats_load_invalid_page = 0;
    size_t stats_cache_hit_while_inserting = 0;

    uint32_t page_offset = 0, page_length;
    time_t now_s = max_acceptable_collected_time();
    for (i = 0; i < count; i++, page_offset += page_length) {
        page_length = header->descr[i].page_length;
        time_t start_time_s = (time_t) (header->descr[i].start_time_ut / USEC_PER_SEC);

        if(!page_length || !start_time_s) {
            char log[200 + 1];
            snprintfz(log, sizeof(log) - 1, "page %u (out of %u) is EMPTY", i, count);
            epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log);
            continue;
        }

        // resolve the page UUID to a metric; acquired only to get a stable id
        METRIC *metric = mrg_metric_get_and_acquire(main_mrg, &header->descr[i].uuid, (Word_t)ctx);
        Word_t metric_id = (Word_t)metric;
        if(!metric) {
            char log[200 + 1];
            snprintfz(log, sizeof(log) - 1, "page %u (out of %u) has unknown UUID", i, count);
            epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log);
            continue;
        }
        mrg_metric_release(main_mrg, metric);

        // collect the page_details waiting for this page - skip pages nobody asked for
        struct page_details *pd_list = epdl_get_pd_load_link_list_from_metric_start_time(epdl, metric_id, start_time_s);
        if(likely(!pd_list))
            continue;

        VALIDATED_PAGE_DESCRIPTOR vd = validate_extent_page_descr(
            &header->descr[i], now_s,
            (pd_list) ? pd_list->update_every_s : 0,
            have_read_error);

        if(worker)
            worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_ALLOCATION);

        PGD *pgd;

        if (unlikely(!vd.is_valid)) {
            // invalid descriptor (or a read error) - attach an empty page instead
            pgd = PGD_EMPTY;
            stats_load_invalid_page++;
        }
        else {
            if (RRD_NO_COMPRESSION == header->compression_algorithm) {
                // uncompressed extent - the page data is directly in the payload
                pgd = pgd_create_from_disk_data(header->descr[i].type,
                                                data + payload_offset + page_offset,
                                                vd.page_length);
                stats_load_uncompressed++;
            }
            else {
                // compressed extent - the page comes from the decompressed buffer
                if (unlikely(page_offset + vd.page_length > uncompressed_payload_length)) {
                    char log[200 + 1];
                    snprintfz(log, sizeof(log) - 1, "page %u (out of %u) offset %u + page length %zu, "
                                                    "exceeds the uncompressed buffer size %u",
                              i, count, page_offset, vd.page_length, uncompressed_payload_length);
                    epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log);

                    pgd = PGD_EMPTY;
                    stats_load_invalid_page++;
                }
                else {
                    pgd = pgd_create_from_disk_data(header->descr[i].type,
                                                    uncompressed_buf + page_offset,
                                                    vd.page_length);
                    stats_load_compressed++;
                }
            }
        }

        if(worker)
            worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_POPULATION);

        PGC_ENTRY page_entry = {
            .hot = false,
            .section = (Word_t)ctx,
            .metric_id = metric_id,
            .start_time_s = vd.start_time_s,
            .end_time_s = vd.end_time_s,
            .update_every_s = (uint32_t) vd.update_every_s,
            .size = pgd_memory_footprint(pgd), // the footprint of the entire PGD, for accurate memory management
            .data = pgd,
        };

        bool added = true;
        PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, &added);
        if (false == added) {
            // some other thread inserted this page first - drop ours, use the cached one
            pgd_free(pgd);
            stats_cache_hit_while_inserting++;
            stats_data_from_main_cache++;
        }
        else
            stats_data_from_extent++;

        // attach the page to every page_details waiting for it,
        // duplicating the cache reference for each one beyond the first
        struct page_details *pd = pd_list;
        do {
            if(pd != pd_list)
                pgc_page_dup(main_cache, page);

            pd->page = page;
            pdc_page_status_set(pd, PDC_PAGE_READY | tags | (pgd_is_empty(pgd) ? PDC_PAGE_EMPTY : 0));

            pd = pd->load.next;
        } while(pd);

        if(worker)
            worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_LOOKUP);
    }

    // flush the per-call counters to the global statistics
    if(stats_data_from_main_cache)
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache, stats_data_from_main_cache, __ATOMIC_RELAXED);

    if(cached_extent)
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_extent_cache, stats_data_from_extent, __ATOMIC_RELAXED);
    else {
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_disk, stats_data_from_extent, __ATOMIC_RELAXED);
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.extents_loaded_from_disk, 1, __ATOMIC_RELAXED);
    }

    if(stats_cache_hit_while_inserting)
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_ok_loaded_but_cache_hit_while_inserting, stats_cache_hit_while_inserting, __ATOMIC_RELAXED);

    if(stats_load_compressed)
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_ok_compressed, stats_load_compressed, __ATOMIC_RELAXED);

    if(stats_load_uncompressed)
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_ok_uncompressed, stats_load_uncompressed, __ATOMIC_RELAXED);

    if(stats_load_invalid_page)
        __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_fail_invalid_page_in_extent, stats_load_invalid_page, __ATOMIC_RELAXED);

    if(worker)
        worker_is_idle();

    extent_buffer_release(eb);

    return true;
}
  972. static inline void *datafile_extent_read(struct rrdengine_instance *ctx, uv_file file, unsigned pos, unsigned size_bytes)
  973. {
  974. void *buffer;
  975. uv_fs_t request;
  976. unsigned real_io_size = ALIGN_BYTES_CEILING(size_bytes);
  977. int ret = posix_memalign(&buffer, RRDFILE_ALIGNMENT, real_io_size);
  978. if (unlikely(ret))
  979. fatal("DBENGINE: posix_memalign(): %s", strerror(ret));
  980. uv_buf_t iov = uv_buf_init(buffer, real_io_size);
  981. ret = uv_fs_read(NULL, &request, file, &iov, 1, pos, NULL);
  982. if (unlikely(-1 == ret)) {
  983. ctx_io_error(ctx);
  984. posix_memfree(buffer);
  985. buffer = NULL;
  986. }
  987. else
  988. ctx_io_read_op_bytes(ctx, real_io_size);
  989. uv_fs_req_cleanup(&request);
  990. return buffer;
  991. }
// Release a buffer previously returned by datafile_extent_read().
static inline void datafile_extent_read_free(void *buffer) {
    posix_memfree(buffer);
}
// Load the extent referenced by epdl - from the extent cache when possible,
// otherwise from disk (caching it on the way in) - and populate the pages
// requested by every EPDL merged into this extent query.
// On exit, all still-unloaded pages of all merged EPDLs are marked with a
// failure status, each PDC's page completion is signalled, and all the
// merged EPDLs are destroyed.
void epdl_find_extent_and_populate_pages(struct rrdengine_instance *ctx, EPDL *epdl, bool worker) {
    if(worker)
        worker_is_busy(UV_EVENT_DBENGINE_EXTENT_CACHE_LOOKUP);

    size_t *statistics_counter = NULL;
    PDC_PAGE_STATUS not_loaded_pages_tag = 0, loaded_pages_tag = 0;

    // cancel the whole load only when ALL the merged queries want to stop;
    // while here, verify that every merged EPDL refers to the same extent
    bool should_stop = __atomic_load_n(&epdl->pdc->workers_should_stop, __ATOMIC_RELAXED);
    for(EPDL *ep = epdl->query.next; ep ;ep = ep->query.next) {
        internal_fatal(ep->datafile != epdl->datafile, "DBENGINE: datafiles do not match");
        internal_fatal(ep->extent_offset != epdl->extent_offset, "DBENGINE: extent offsets do not match");
        internal_fatal(ep->extent_size != epdl->extent_size, "DBENGINE: extent sizes do not match");
        internal_fatal(ep->file != epdl->file, "DBENGINE: files do not match");

        if(!__atomic_load_n(&ep->pdc->workers_should_stop, __ATOMIC_RELAXED)) {
            should_stop = false;
            break;
        }
    }

    if(unlikely(should_stop)) {
        statistics_counter = &rrdeng_cache_efficiency_stats.pages_load_fail_cancelled;
        not_loaded_pages_tag = PDC_PAGE_CANCELLED;
        goto cleanup;
    }

    bool extent_found_in_cache = false;

    void *extent_compressed_data = NULL;
    // the extent cache is keyed by (section = ctx, metric_id = datafile fileno,
    // start_time = extent offset)
    PGC_PAGE *extent_cache_page = pgc_page_get_and_acquire(
        extent_cache, (Word_t)ctx,
        (Word_t)epdl->datafile->fileno, (time_t)epdl->extent_offset,
        PGC_SEARCH_EXACT);

    if(extent_cache_page) {
        extent_compressed_data = pgc_page_data(extent_cache_page);
        internal_fatal(epdl->extent_size != pgc_page_data_size(extent_cache, extent_cache_page),
                       "DBENGINE: cache size does not match the expected size");

        loaded_pages_tag |= PDC_PAGE_EXTENT_FROM_CACHE;
        not_loaded_pages_tag |= PDC_PAGE_EXTENT_FROM_CACHE;
        extent_found_in_cache = true;
    }
    else {
        if(worker)
            worker_is_busy(UV_EVENT_DBENGINE_EXTENT_MMAP);

        void *extent_data = datafile_extent_read(ctx, epdl->file, epdl->extent_offset, epdl->extent_size);
        if(extent_data != NULL) {

            // copy the extent into a cache-owned allocation
            // and release the aligned I/O buffer
            void *copied_extent_compressed_data = dbengine_extent_alloc(epdl->extent_size);
            memcpy(copied_extent_compressed_data, extent_data, epdl->extent_size);
            datafile_extent_read_free(extent_data);

            if(worker)
                worker_is_busy(UV_EVENT_DBENGINE_EXTENT_CACHE_LOOKUP);

            bool added = false;
            extent_cache_page = pgc_page_add_and_acquire(extent_cache, (PGC_ENTRY) {
                .hot = false,
                .section = (Word_t) ctx,
                .metric_id = (Word_t) epdl->datafile->fileno,
                .start_time_s = (time_t) epdl->extent_offset,
                .size = epdl->extent_size,
                .end_time_s = 0,
                .update_every_s = 0,
                .data = copied_extent_compressed_data,
            }, &added);

            if (!added) {
                // another thread cached this extent first - drop our copy, use theirs
                dbengine_extent_free(copied_extent_compressed_data, epdl->extent_size);
                internal_fatal(epdl->extent_size != pgc_page_data_size(extent_cache, extent_cache_page),
                               "DBENGINE: cache size does not match the expected size");
            }

            extent_compressed_data = pgc_page_data(extent_cache_page);

            loaded_pages_tag |= PDC_PAGE_EXTENT_FROM_DISK;
            not_loaded_pages_tag |= PDC_PAGE_EXTENT_FROM_DISK;
        }
    }

    if(extent_compressed_data) {
        // Need to decompress and then process the pagelist
        bool extent_used = epdl_populate_pages_from_extent_data(
            ctx, extent_compressed_data, epdl->extent_size,
            epdl, worker, loaded_pages_tag, extent_found_in_cache);

        if(extent_used) {
            // since the extent was used, all the pages that are not
            // loaded from this extent, were not found in the extent
            not_loaded_pages_tag |= PDC_PAGE_FAILED_NOT_IN_EXTENT;
            statistics_counter = &rrdeng_cache_efficiency_stats.pages_load_fail_not_found;
        }
        else {
            not_loaded_pages_tag |= PDC_PAGE_FAILED_INVALID_EXTENT;
            statistics_counter = &rrdeng_cache_efficiency_stats.pages_load_fail_invalid_extent;
        }
    }
    else {
        // the extent could neither be found in the cache nor read from disk
        not_loaded_pages_tag |= PDC_PAGE_FAILED_TO_MAP_EXTENT;
        statistics_counter = &rrdeng_cache_efficiency_stats.pages_load_fail_cant_mmap_extent;
    }

    if(extent_cache_page)
        pgc_page_release(extent_cache, extent_cache_page);

cleanup:
    // remove it from the datafile extent_queries
    // this can be called multiple times safely
    epdl_pending_del(epdl);

    // mark all pending pages as failed
    for(EPDL *ep = epdl; ep ;ep = ep->query.next) {
        epdl_mark_all_not_loaded_pages_as_failed(
            ep, not_loaded_pages_tag, statistics_counter);
    }

    // signal the completions and destroy the merged EPDLs
    // (grab next first - epdl_destroy() frees ep)
    for(EPDL *ep = epdl, *next = NULL; ep ; ep = next) {
        next = ep->query.next;

        completion_mark_complete_a_job(&ep->pdc->page_completion);
        pdc_release_and_destroy_if_unreferenced(ep->pdc, true, false);

        // Free the Judy that holds the requested pagelist and the extents
        epdl_destroy(ep);
    }

    if(worker)
        worker_is_idle();
}