// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS
#include "rrdengine.h"
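
/* Array allocator (ARAL) for page descriptors: fixed-size elements of
 * sizeof(struct rrdeng_page_descr), pre-sized for 20000 elements and, when
 * mmap mode is enabled, backed by a "page_descriptors" file in the cache
 * directory. */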
ARAL page_descr_aral = {
    .element_size = sizeof(struct rrdeng_page_descr),
    .elements = 20000,
    .filename = "page_descriptors",
    .cache_dir = &netdata_configured_cache_dir,
    .use_mmap = false,
    .internal.initialized = false
};

void rrdeng_page_descr_aral_go_singlethreaded(void) {
    page_descr_aral.internal.lockless = true;
}
void rrdeng_page_descr_aral_go_multithreaded(void) {
    page_descr_aral.internal.lockless = false;
}

struct rrdeng_page_descr *rrdeng_page_descr_mallocz(void) {
    struct rrdeng_page_descr *descr;
    descr = arrayalloc_mallocz(&page_descr_aral);
    return descr;
}

void rrdeng_page_descr_freez(struct rrdeng_page_descr *descr) {
    arrayalloc_freez(&page_descr_aral, descr);
}

void rrdeng_page_descr_use_malloc(void) {
    if(page_descr_aral.internal.initialized)
        error("DBENGINE: cannot change ARAL allocation policy after it has been initialized.");
    else
        page_descr_aral.use_mmap = false;
}

void rrdeng_page_descr_use_mmap(void) {
    if(page_descr_aral.internal.initialized)
        error("DBENGINE: cannot change ARAL allocation policy after it has been initialized.");
    else
        page_descr_aral.use_mmap = true;
}

bool rrdeng_page_descr_is_mmap(void) {
    return page_descr_aral.use_mmap;
}

/* Forward declarations */
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);
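
/* The replacement queue (replaceQ) is an LRU-ordered doubly-linked list of
 * page cache descriptors: new and recently accessed pages go to the tail,
 * so eviction scans start from the head where the coldest pages live. */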
/* always inserts into tail */
static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (likely(NULL != pg_cache->replaceQ.tail)) {
        pg_cache_descr->prev = pg_cache->replaceQ.tail;
        pg_cache->replaceQ.tail->next = pg_cache_descr;
    }
    if (unlikely(NULL == pg_cache->replaceQ.head)) {
        pg_cache->replaceQ.head = pg_cache_descr;
    }
    pg_cache->replaceQ.tail = pg_cache_descr;
}

static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next;

    prev = pg_cache_descr->prev;
    next = pg_cache_descr->next;

    if (likely(NULL != prev)) {
        prev->next = next;
    }
    if (likely(NULL != next)) {
        next->prev = prev;
    }
    if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) {
        pg_cache->replaceQ.head = next;
    }
    if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) {
        pg_cache->replaceQ.tail = prev;
    }
    pg_cache_descr->prev = pg_cache_descr->next = NULL;
}

void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
                              struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_insert_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}

void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
                              struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_delete_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
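
/* Marks a page as recently used by moving its descriptor to the tail of the
 * replacement queue, so it becomes the last candidate for eviction. */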
void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
                               struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_delete_unsafe(ctx, descr);
    pg_cache_replaceQ_insert_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
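
/* Allocates a fresh page descriptor and initializes it to an empty state:
 * zero length, invalid timestamps, and no extent or page cache descriptor. */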
struct rrdeng_page_descr *pg_cache_create_descr(void)
{
    struct rrdeng_page_descr *descr;

    descr = rrdeng_page_descr_mallocz();
    descr->page_length = 0;
    descr->start_time = INVALID_TIME;
    descr->end_time = INVALID_TIME;
    descr->id = NULL;
    descr->extent = NULL;
    descr->pg_cache_descr_state = 0;
    descr->pg_cache_descr = NULL;

    return descr;
}

/* The caller must hold page descriptor lock. */
void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (pg_cache_descr->waiters)
        uv_cond_broadcast(&pg_cache_descr->cond);
}

void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_wake_up_waiters_unsafe(descr);
    rrdeng_page_descr_mutex_unlock(ctx, descr);
}

/*
 * The caller must hold page descriptor lock.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 */
void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    ++pg_cache_descr->waiters;
    uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex);
    --pg_cache_descr->waiters;
}

/*
 * The caller must hold page descriptor lock.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 * Returns UV_ETIMEDOUT if timeout_sec seconds pass.
 */
int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec)
{
    int ret;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    ++pg_cache_descr->waiters;
    ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC);
    --pg_cache_descr->waiters;

    return ret;
}

/*
 * Returns page flags.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 */
unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
    unsigned long flags;

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_wait_event_unsafe(descr);
    flags = pg_cache_descr->flags;
    rrdeng_page_descr_mutex_unlock(ctx, descr);

    return flags;
}

/*
 * The caller must hold page descriptor lock.
 */
int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
        (exclusive_access && pg_cache_descr->refcnt)) {
        return 0;
    }

    return 1;
}

/*
 * The caller must hold page descriptor lock.
 * Gets a reference to the page descriptor.
 * Returns 1 on success and 0 on failure.
 */
int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (!pg_cache_can_get_unsafe(descr, exclusive_access))
        return 0;

    if (exclusive_access)
        pg_cache_descr->flags |= RRD_PAGE_LOCKED;
    ++pg_cache_descr->refcnt;

    return 1;
}

/*
 * The caller must hold the page descriptor lock.
 * This function may block doing cleanup.
 */
void pg_cache_put_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
    if (0 == --pg_cache_descr->refcnt) {
        pg_cache_wake_up_waiters_unsafe(descr);
    }
}

/*
 * This function may block doing cleanup.
 */
void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_put_unsafe(descr);
    rrdeng_page_descr_mutex_unlock(ctx, descr);
}

/* The caller must hold the page cache lock */
static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->populated_pages -= number;
}

static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    pg_cache_release_pages_unsafe(ctx, number);
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}

/*
 * This function returns the maximum number of pages allowed in the page cache.
 */
unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
{
    return ctx->max_cache_pages + (unsigned long)ctx->metric_API_max_producers;
}

/*
 * This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the
 * number of pages below that number.
 */
unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
{
    return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers;
}

/*
 * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page
 * cache.
 */
unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx)
{
    /* We remove the active pages of the producers from the calculation and only allow the extra pinned pages */
    return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers;
}

/*
 * This function will block until it reserves #number populated pages.
 * It will trigger evictions or dirty page flushing if the pg_cache_hard_limit() limit is hit.
 */
static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned failures = 0;
    const unsigned FAILURES_CEILING = 10; /* truncates exponential backoff to (2^FAILURES_CEILING x slot) */
    unsigned long exp_backoff_slot_usec = USEC_PER_MS * 10;

    assert(number < ctx->max_cache_pages);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    if (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1)
        debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
              number);
    while (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) {

        if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
            /* failed to evict */
            struct completion compl;
            struct rrdeng_cmd cmd;

            ++failures;
            uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

            completion_init(&compl);
            cmd.opcode = RRDENG_FLUSH_PAGES;
            cmd.completion = &compl;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);
            /* wait for some pages to be flushed */
            debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__);
            completion_wait_for(&compl);
            completion_destroy(&compl);

            if (unlikely(failures > 1)) {
                unsigned long slots, usecs_to_sleep;
                /* exponential backoff */
                slots = random() % (2LU << MIN(failures, FAILURES_CEILING));
                usecs_to_sleep = slots * exp_backoff_slot_usec;

                if (usecs_to_sleep >= USEC_PER_SEC)
                    error("Page cache is full. Sleeping for %llu second(s).", usecs_to_sleep / USEC_PER_SEC);

                (void)sleep_usec(usecs_to_sleep);
            }
            uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
        }
    }
    pg_cache->populated_pages += number;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}

/*
 * This function will attempt to reserve #number populated pages.
 * It may trigger evictions if the pg_cache_soft_limit() limit is hit.
 * Returns 0 on failure and 1 on success.
 */
static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned count = 0;
    int ret = 0;

    assert(number < ctx->max_cache_pages);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    if (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1) {
        debug(D_RRDENGINE,
              "==Page cache full. Trying to reserve %u pages.==",
              number);
        do {
            if (!pg_cache_try_evict_one_page_unsafe(ctx))
                break;
            ++count;
        } while (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1);
        debug(D_RRDENGINE, "Evicted %u pages.", count);
    }

    if (pg_cache->populated_pages + number < pg_cache_hard_limit(ctx) + 1) {
        pg_cache->populated_pages += number;
        ret = 1; /* success */
    }
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

    return ret;
}

/* The caller must hold the page cache and the page descriptor locks in that order */
static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    dbengine_page_free(pg_cache_descr->page);
    pg_cache_descr->page = NULL;
    pg_cache_descr->flags &= ~RRD_PAGE_POPULATED;
    pg_cache_release_pages_unsafe(ctx, 1);
    ++ctx->stats.pg_cache_evictions;
}

/*
 * The caller must hold the page cache lock.
 * Lock order: page cache -> replaceQ -> page descriptor
 * This function iterates the replacement queue and tries to evict one page.
 *
 * Returns 1 on success and 0 on failure.
 */
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned long old_flags;
    struct rrdeng_page_descr *descr;
    struct page_cache_descr *pg_cache_descr = NULL;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) {
        descr = pg_cache_descr->descr;

        rrdeng_page_descr_mutex_lock(ctx, descr);
        old_flags = pg_cache_descr->flags;
        if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
            /* must evict */
            pg_cache_evict_unsafe(ctx, descr);
            pg_cache_put_unsafe(descr);
            pg_cache_replaceQ_delete_unsafe(ctx, descr);

            rrdeng_page_descr_mutex_unlock(ctx, descr);
            uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);

            rrdeng_try_deallocate_pg_cache_descr(ctx, descr);

            return 1;
        }
        rrdeng_page_descr_mutex_unlock(ctx, descr);
    }
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);

    /* failed to evict */
    return 0;
}

/**
 * Deletes a page from the database.
 * Callers of this function need to make sure they're not deleting the same descriptor concurrently.
 * @param ctx is the database instance.
 * @param descr is the page descriptor.
 * @param remove_dirty must be non-zero if the page to be deleted is dirty.
 * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference.
 * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not
 *        NULL. Otherwise, metric_id is not set.
 * @return 1 if it's safe to delete the metric, 0 otherwise.
 */
uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
                            uint8_t is_exclusive_holder, uuid_t *metric_id)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = NULL;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    int ret;
    uint8_t can_delete_metric = 0;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
    fatal_assert(NULL != PValue);
    page_index = *PValue;
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);

    uv_rwlock_wrlock(&page_index->lock);
    ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
    if (unlikely(0 == ret)) {
        uv_rwlock_wrunlock(&page_index->lock);
        if (unlikely(debug_flags & D_RRDENGINE)) {
            print_page_descr(descr);
        }
        goto destroy;
    }
    --page_index->page_count;
    if (!page_index->writers && !page_index->page_count) {
        can_delete_metric = 1;
        if (metric_id) {
            memcpy(metric_id, page_index->id, sizeof(uuid_t));
        }
    }
    uv_rwlock_wrunlock(&page_index->lock);
    fatal_assert(1 == ret);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    ++ctx->stats.pg_cache_deletions;
    --pg_cache->page_descriptors;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_descr = descr->pg_cache_descr;
    if (!is_exclusive_holder) {
        /* If we don't hold an exclusive page reference get one */
        while (!pg_cache_try_get_unsafe(descr, 1)) {
            debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            pg_cache_wait_event_unsafe(descr);
        }
    }
    if (remove_dirty) {
        pg_cache_descr->flags &= ~RRD_PAGE_DIRTY;
    } else {
        /* even a locked page could be dirty */
        while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
            debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            pg_cache_wait_event_unsafe(descr);
        }
    }
    rrdeng_page_descr_mutex_unlock(ctx, descr);

    if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
        /* only after locking can it be safely deleted from LRU */
        pg_cache_replaceQ_delete(ctx, descr);

        uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
        pg_cache_evict_unsafe(ctx, descr);
        uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
    }
    pg_cache_put(ctx, descr);
    rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
    while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
        rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */
        (void)sleep_usec(1000); /* 1 msec */
    }
destroy:
    rrdeng_page_descr_freez(descr);
    pg_cache_update_metric_times(page_index);

    return can_delete_metric;
}
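
/* Returns non-zero when the page's [start_time, end_time] interval overlaps
 * the queried [start_time, end_time] range. */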
static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time)
{
    usec_t pg_start, pg_end;

    pg_start = descr->start_time;
    pg_end = descr->end_time;

    return (pg_start < start_time && pg_end >= start_time) ||
           (pg_start >= start_time && pg_start <= end_time);
}

static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time)
{
    return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
}

/* The caller must hold the page index lock */
static inline struct rrdeng_page_descr *
find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time)
{
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    Word_t Index;

    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
        if (is_page_in_time_range(descr, start_time, end_time)) {
            return descr;
        }
    }

    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
        if (is_page_in_time_range(descr, start_time, end_time)) {
            return descr;
        }
    }

    return NULL;
}

/* Update metric oldest and latest timestamps efficiently when adding new values */
void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
{
    usec_t oldest_time = page_index->oldest_time;
    usec_t latest_time = page_index->latest_time;

    if (unlikely(oldest_time == INVALID_TIME || descr->start_time < oldest_time)) {
        page_index->oldest_time = descr->start_time;
    }
    if (likely(descr->end_time > latest_time || latest_time == INVALID_TIME)) {
        page_index->latest_time = descr->end_time;
    }
}

/* Update metric oldest and latest timestamps when removing old values */
void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
{
    Pvoid_t *firstPValue, *lastPValue;
    Word_t firstIndex, lastIndex;
    struct rrdeng_page_descr *descr;
    usec_t oldest_time = INVALID_TIME;
    usec_t latest_time = INVALID_TIME;

    uv_rwlock_rdlock(&page_index->lock);
    /* Find first page in range */
    firstIndex = (Word_t)0;
    firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0);
    if (likely(NULL != firstPValue)) {
        descr = *firstPValue;
        oldest_time = descr->start_time;
    }
    lastIndex = (Word_t)-1;
    lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0);
    if (likely(NULL != lastPValue)) {
        descr = *lastPValue;
        latest_time = descr->end_time;
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (unlikely(NULL == firstPValue)) {
        fatal_assert(NULL == lastPValue);
        page_index->oldest_time = page_index->latest_time = INVALID_TIME;
        return;
    }
    page_index->oldest_time = oldest_time;
    page_index->latest_time = latest_time;
}

/* If index is NULL lookup by UUID (descr->id) */
void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
                     struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;
    unsigned long pg_cache_descr_state = descr->pg_cache_descr_state;

    if (0 != pg_cache_descr_state) {
        /* there is page cache descriptor pre-allocated state */
        struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

        fatal_assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
        if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
            pg_cache_reserve_pages(ctx, 1);
            if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY))
                pg_cache_replaceQ_insert(ctx, descr);
        }
    }

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
        fatal_assert(NULL != PValue);
        page_index = *PValue;
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    } else {
        page_index = index;
    }

    uv_rwlock_wrlock(&page_index->lock);
    PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
    *PValue = descr;
    ++page_index->page_count;
    pg_cache_add_new_metric_time(page_index, descr);
    uv_rwlock_wrunlock(&page_index->lock);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    ++ctx->stats.pg_cache_insertions;
    ++pg_cache->page_descriptors;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
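
/* Returns the start time of the first page of the metric that overlaps the
 * [start_time, end_time] range, or INVALID_TIME if the metric or such a page
 * does not exist. */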
usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    if (NULL == PValue) {
        return INVALID_TIME;
    }

    uv_rwlock_rdlock(&page_index->lock);
    descr = find_first_page_in_time_range(page_index, start_time, end_time);
    if (NULL == descr) {
        uv_rwlock_rdunlock(&page_index->lock);
        return INVALID_TIME;
    }
    uv_rwlock_rdunlock(&page_index->lock);

    return descr->start_time;
}

/**
 * Return page information for the first page before point_in_time that satisfies the filter.
 * @param ctx DB context
 * @param page_index page index of a metric
 * @param point_in_time the pages that are searched must be older than this timestamp
 * @param filter decides if the page satisfies the caller's criteria
 * @param page_info the result of the search is set in this pointer
 */
void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
                                     usec_t point_in_time, pg_cache_page_info_filter_t *filter,
                                     struct rrdeng_page_info *page_info)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    Word_t Index;

    (void)pg_cache;
    fatal_assert(NULL != page_index);

    Index = (Word_t)(point_in_time / USEC_PER_SEC);
    uv_rwlock_rdlock(&page_index->lock);
    do {
        PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0);
        descr = unlikely(NULL == PValue) ? NULL : *PValue;
    } while (descr != NULL && !filter(descr));
    if (unlikely(NULL == descr)) {
        page_info->page_length = 0;
        page_info->start_time = INVALID_TIME;
        page_info->end_time = INVALID_TIME;
    } else {
        page_info->page_length = descr->page_length;
        page_info->start_time = descr->start_time;
        page_info->end_time = descr->end_time;
    }
    uv_rwlock_rdunlock(&page_index->lock);
}

/**
 * Searches for an unallocated page without triggering disk I/O. Attempts to reserve the page and get a reference.
 * @param ctx DB context
 * @param id lookup by UUID
 * @param start_time exact starting time in usec
 * @return the page descriptor or NULL on failure. It can fail if:
 *         1. The page is already allocated to the page cache.
 *         2. It did not succeed to get a reference.
 *         3. It did not succeed to reserve a spot in the page cache.
 */
struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id,
                                                               usec_t start_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    Word_t Index;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);

    if ((NULL == PValue) || !pg_cache_try_reserve_pages(ctx, 1)) {
        /* Failed to find page or failed to reserve a spot in the cache */
        return NULL;
    }

    uv_rwlock_rdlock(&page_index->lock);
    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLGet(page_index->JudyL_array, Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
    }
    if (NULL == PValue || 0 == descr->page_length) {
        /* Failed to find non-empty page */
        uv_rwlock_rdunlock(&page_index->lock);

        pg_cache_release_pages(ctx, 1);
        return NULL;
    }

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_descr = descr->pg_cache_descr;
    flags = pg_cache_descr->flags;
    uv_rwlock_rdunlock(&page_index->lock);

    if ((flags & RRD_PAGE_POPULATED) || !pg_cache_try_get_unsafe(descr, 1)) {
        /* Failed to get reference or page is already populated */
        rrdeng_page_descr_mutex_unlock(ctx, descr);

        pg_cache_release_pages(ctx, 1);
        return NULL;
    }
    /* success */
    rrdeng_page_descr_mutex_unlock(ctx, descr);
    rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    return descr;
}

/**
 * Searches for pages in a time range and triggers disk I/O if necessary and possible.
 * Does not get a reference.
 * @param ctx DB context
 * @param id UUID
 * @param start_time inclusive starting time in usec
 * @param end_time inclusive ending time in usec
 * @param page_info_arrayp It allocates (*page_info_arrayp) and populates it with information of pages that overlap
 *        with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez().
 *        If (*page_info_arrayp) is set to NULL nothing was allocated.
 * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
 * @return the number of pages that overlap with the time range [start_time,end_time].
 */
unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
                          struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned i, j, k, preload_count, count, page_info_array_max_size;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    Word_t Index;
    uint8_t failed_to_reserve;

    fatal_assert(NULL != ret_page_indexp);

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        *ret_page_indexp = page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    if (NULL == PValue) {
        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
        *ret_page_indexp = NULL;
        return 0;
    }

    uv_rwlock_rdlock(&page_index->lock);
    descr = find_first_page_in_time_range(page_index, start_time, end_time);
    if (NULL == descr) {
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
        *ret_page_indexp = NULL;
        return 0;
    } else {
        Index = (Word_t)(descr->start_time / USEC_PER_SEC);
    }
    if (page_info_arrayp) {
        page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
        *page_info_arrayp = mallocz(page_info_array_max_size);
    }

    for (count = 0, preload_count = 0 ;
         descr != NULL && is_page_in_time_range(descr, start_time, end_time) ;
         PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
         descr = unlikely(NULL == PValue) ? NULL : *PValue) {
        /* Iterate all pages in range */

        if (unlikely(0 == descr->page_length))
            continue;
        if (page_info_arrayp) {
            if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) {
                page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
                *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size);
            }
            (*page_info_arrayp)[count].start_time = descr->start_time;
            (*page_info_arrayp)[count].end_time = descr->end_time;
            (*page_info_arrayp)[count].page_length = descr->page_length;
        }
        ++count;

        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if (pg_cache_can_get_unsafe(descr, 0)) {
            if (flags & RRD_PAGE_POPULATED) {
                /* success */
                rrdeng_page_descr_mutex_unlock(ctx, descr);
                debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
                continue;
            }
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            preload_array[preload_count++] = descr;
            if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) {
                rrdeng_page_descr_mutex_unlock(ctx, descr);
                break;
            }
        }
        rrdeng_page_descr_mutex_unlock(ctx, descr);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    failed_to_reserve = 0;
    for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) {
        struct rrdeng_cmd cmd;
        struct rrdeng_page_descr *next;

        descr = preload_array[i];
        if (NULL == descr) {
            continue;
        }
        if (!pg_cache_try_reserve_pages(ctx, 1)) {
            failed_to_reserve = 1;
            break;
        }
        cmd.opcode = RRDENG_READ_EXTENT;
        cmd.read_extent.page_cache_descr[0] = descr;
        /* don't use this page again */
        preload_array[i] = NULL;
        for (j = 0, k = 1 ; j < preload_count ; ++j) {
            next = preload_array[j];
            if (NULL == next) {
                continue;
            }
            if (descr->extent == next->extent) {
                /* same extent, consolidate */
                if (!pg_cache_try_reserve_pages(ctx, 1)) {
                    failed_to_reserve = 1;
                    break;
                }
                cmd.read_extent.page_cache_descr[k++] = next;
                /* don't use this page again */
                preload_array[j] = NULL;
            }
        }
        cmd.read_extent.page_count = k;
        rrdeng_enq_cmd(&ctx->worker_config, &cmd);
    }
    if (failed_to_reserve) {
        debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
        for (i = 0 ; i < preload_count ; ++i) {
            descr = preload_array[i];
            if (NULL == descr) {
                continue;
            }
            pg_cache_put(ctx, descr);
        }
    }
    if (!preload_count) {
        /* no such page */
        debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
    }
    if (unlikely(0 == count && page_info_arrayp)) {
        freez(*page_info_arrayp);
        *page_info_arrayp = NULL;
    }
    return count;
}

/*
 * Searches for a page and gets a reference.
 * When point_in_time is INVALID_TIME get any page.
 * If index is NULL lookup by UUID (id).
 */
struct rrdeng_page_descr *
pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
                usec_t point_in_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    Word_t Index;
    uint8_t page_not_in_cache;

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
        if (likely(NULL != PValue)) {
            page_index = *PValue;
        }
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
        if (NULL == PValue) {
            return NULL;
        }
    } else {
        page_index = index;
    }
    pg_cache_reserve_pages(ctx, 1);

    page_not_in_cache = 0;
    uv_rwlock_rdlock(&page_index->lock);
    while (1) {
        Index = (Word_t)(point_in_time / USEC_PER_SEC);
        PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
        if (likely(NULL != PValue)) {
            descr = *PValue;
        }
        if (NULL == PValue ||
            0 == descr->page_length ||
            (INVALID_TIME != point_in_time &&
             !is_point_in_time_in_page(descr, point_in_time))) {
            /* non-empty page not found */
            uv_rwlock_rdunlock(&page_index->lock);

            pg_cache_release_pages(ctx, 1);
            return NULL;
        }
        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
            /* success */
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
            break;
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            struct rrdeng_cmd cmd;

            uv_rwlock_rdunlock(&page_index->lock);

            cmd.opcode = RRDENG_READ_PAGE;
            cmd.read_page.page_cache_descr = descr;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);

            debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
            if(unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
                pg_cache_wait_event_unsafe(descr);
            }
            /* success */
            /* Downgrade exclusive reference to allow other readers */
            pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
            pg_cache_wake_up_waiters_unsafe(descr);
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
            return descr;
        }
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
        if(unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        if (!(flags & RRD_PAGE_POPULATED))
            page_not_in_cache = 1;
        pg_cache_wait_event_unsafe(descr);
        rrdeng_page_descr_mutex_unlock(ctx, descr);

        /* reset scan to find again */
        uv_rwlock_rdlock(&page_index->lock);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (!(flags & RRD_PAGE_DIRTY))
        pg_cache_replaceQ_set_hot(ctx, descr);
    pg_cache_release_pages(ctx, 1);
    if (page_not_in_cache)
        rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    else
        rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
    return descr;
}

/*
 * Searches for the first page between start_time and end_time and gets a reference.
 * start_time and end_time are inclusive.
 * If index is NULL lookup by UUID (id).
 */
struct rrdeng_page_descr *
pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
                     usec_t start_time, usec_t end_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    uint8_t page_not_in_cache;

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
        if (likely(NULL != PValue)) {
            page_index = *PValue;
        }
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
        if (NULL == PValue) {
            return NULL;
        }
    } else {
        page_index = index;
    }
    pg_cache_reserve_pages(ctx, 1);

    page_not_in_cache = 0;
    uv_rwlock_rdlock(&page_index->lock);
    int retry_count = 0;
    while (1) {
        descr = find_first_page_in_time_range(page_index, start_time, end_time);
        if (NULL == descr || 0 == descr->page_length || retry_count == default_rrdeng_page_fetch_retries) {
            /* non-empty page not found */
            if (retry_count == default_rrdeng_page_fetch_retries)
                error_report("Page cache timeout while waiting for page %p : returning FAIL", descr);
            uv_rwlock_rdunlock(&page_index->lock);

            pg_cache_release_pages(ctx, 1);
            return NULL;
        }
        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
            /* success */
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
            break;
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            struct rrdeng_cmd cmd;

            uv_rwlock_rdunlock(&page_index->lock);

            cmd.opcode = RRDENG_READ_PAGE;
            cmd.read_page.page_cache_descr = descr;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);

            debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
            if(unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
                pg_cache_wait_event_unsafe(descr);
            }
            /* success */
            /* Downgrade exclusive reference to allow other readers */
            pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
            pg_cache_wake_up_waiters_unsafe(descr);
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
            return descr;
        }
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
        if(unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        if (!(flags & RRD_PAGE_POPULATED))
            page_not_in_cache = 1;

        if (pg_cache_timedwait_event_unsafe(descr, default_rrdeng_page_fetch_timeout) == UV_ETIMEDOUT) {
            error_report("Page cache timeout while waiting for page %p : retry count = %d", descr, retry_count);
            ++retry_count;
        }
        rrdeng_page_descr_mutex_unlock(ctx, descr);

        /* reset scan to find again */
        uv_rwlock_rdlock(&page_index->lock);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (!(flags & RRD_PAGE_DIRTY))
        pg_cache_replaceQ_set_hot(ctx, descr);
    pg_cache_release_pages(ctx, 1);
    if (page_not_in_cache)
        rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    else
        rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
    return descr;
}
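
/* Allocates and initializes an empty per-metric page index: a Judy array keyed
 * by page start time (in seconds), plus the metric UUID, its own rwlock and
 * invalid oldest/latest timestamps. */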
struct pg_cache_page_index *create_page_index(uuid_t *id)
{
    struct pg_cache_page_index *page_index;

    page_index = mallocz(sizeof(*page_index));
    page_index->JudyL_array = (Pvoid_t) NULL;
    uuid_copy(page_index->id, *id);
    fatal_assert(0 == uv_rwlock_init(&page_index->lock));
    page_index->oldest_time = INVALID_TIME;
    page_index->latest_time = INVALID_TIME;
    page_index->prev = NULL;
    page_index->page_count = 0;
    page_index->writers = 0;

    return page_index;
}

static void init_metrics_index(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
    pg_cache->metrics_index.last_page_index = NULL;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
}

static void init_replaceQ(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->replaceQ.head = NULL;
    pg_cache->replaceQ.tail = NULL;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
}

static void init_committed_page_index(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock));
    pg_cache->committed_page_index.latest_corr_id = 0;
    pg_cache->committed_page_index.nr_committed_pages = 0;
}

void init_page_cache(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->page_descriptors = 0;
    pg_cache->populated_pages = 0;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));

    init_metrics_index(ctx);
    init_replaceQ(ctx);
    init_committed_page_index(ctx);
}

/*
 * METRIC                                               # number
 *   1. INDEX: JudyHS                                   # bytes
 *   2. DATA: page_index                                # bytes
 *
 * PAGE (1 page of 1 metric)                            # number
 *   1. INDEX AT METRIC: page_index->JudyL_array        # bytes
 *   2. DATA: descr                                     # bytes
 *
 * PAGE CACHE (1 page of 1 metric at the cache)         # number
 *   1. pg_cache_descr (if PG_CACHE_DESCR_ALLOCATED)    # bytes
 *   2. data (if RRD_PAGE_POPULATED)                    # bytes
 *
 */
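/* Frees every page, page descriptor, per-metric index and the committed page
 * index, accounting their sizes per the categories above, and logs aggregate
 * statistics about the database contents before shutdown. */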
void free_page_cache(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index, *prev_page_index;
    Word_t Index;
    struct rrdeng_page_descr *descr;
    struct page_cache_descr *pg_cache_descr;

    Word_t metrics_number = 0,
           metrics_bytes = 0,
           metrics_index_bytes = 0,
           metrics_duration = 0;

    Word_t pages_number = 0,
           pages_bytes = 0,
           pages_index_bytes = 0;

    Word_t pages_size_per_type[256] = { 0 },
           pages_count_per_type[256] = { 0 };

    Word_t cache_pages_number = 0,
           cache_pages_bytes = 0,
           cache_pages_data_bytes = 0;

    size_t points_in_db = 0,
           uncompressed_points_size = 0,
           seconds_in_db = 0,
           single_point_pages = 0;

    Word_t pages_dirty_index_bytes = 0;

    usec_t oldest_time_ut = LONG_MAX, latest_time_ut = 0;

    /* Free committed page index */
    pages_dirty_index_bytes = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0);
    fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array);

    for (page_index = pg_cache->metrics_index.last_page_index ;
         page_index != NULL ;
         page_index = prev_page_index) {

        prev_page_index = page_index->prev;

        /* Find first page in range */
        Index = (Word_t) 0;
        PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
        descr = unlikely(NULL == PValue) ? NULL : *PValue;

        size_t metric_duration = 0;
        size_t metric_update_every = 0;
        size_t metric_single_point_pages = 0;

        while (descr != NULL) {
            /* Iterate all page descriptors of this metric */
            if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
                cache_pages_number++;

                /* Check rrdenglocking.c */
                pg_cache_descr = descr->pg_cache_descr;
                if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
                    dbengine_page_free(pg_cache_descr->page);
                    cache_pages_data_bytes += RRDENG_BLOCK_SIZE;
                }
                rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
                cache_pages_bytes += sizeof(*pg_cache_descr);
            }

            if(descr->start_time < oldest_time_ut)
                oldest_time_ut = descr->start_time;

            if(descr->end_time > latest_time_ut)
                latest_time_ut = descr->end_time;

            pages_size_per_type[descr->type] += descr->page_length;
            pages_count_per_type[descr->type]++;

            size_t points_in_page = (descr->page_length / PAGE_POINT_SIZE_BYTES(descr));
            size_t page_duration = ((descr->end_time - descr->start_time) / USEC_PER_SEC);
            size_t update_every = (page_duration == 0) ? 1 : page_duration / (points_in_page - 1);

            if (!page_duration && metric_update_every) {
                page_duration = metric_update_every;
                update_every = metric_update_every;
            }
            else if(page_duration)
                metric_update_every = update_every;

            uncompressed_points_size += descr->page_length;

            if(page_duration > 0) {
                page_duration = update_every * points_in_page;
                metric_duration += page_duration;
                seconds_in_db += page_duration;
                points_in_db += descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
            }
            else
                metric_single_point_pages++;

            rrdeng_page_descr_freez(descr);
            pages_bytes += sizeof(*descr);
            pages_number++;

            PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0);
            descr = unlikely(NULL == PValue) ? NULL : *PValue;
        }

        if(metric_single_point_pages && metric_update_every) {
            points_in_db += metric_single_point_pages;
            seconds_in_db += metric_update_every * metric_single_point_pages;
            metric_duration += metric_update_every * metric_single_point_pages;
        }
        else
            single_point_pages += metric_single_point_pages;

        /* Free page index */
        pages_index_bytes += JudyLFreeArray(&page_index->JudyL_array, PJE0);
        fatal_assert(NULL == page_index->JudyL_array);
        freez(page_index);

        metrics_number++;
        metrics_bytes += sizeof(*page_index);
        metrics_duration += metric_duration;
    }

    /* Free metrics index */
    metrics_index_bytes = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
    fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array);

    if(!metrics_number) metrics_number = 1;
    if(!pages_number) pages_number = 1;
    if(!cache_pages_number) cache_pages_number = 1;
    if(!points_in_db) points_in_db = 1;
    if(latest_time_ut == oldest_time_ut) oldest_time_ut -= USEC_PER_SEC;

    if(single_point_pages) {
        long double avg_duration = (long double)seconds_in_db / points_in_db;
        points_in_db += single_point_pages;
        seconds_in_db += (size_t)(avg_duration * single_point_pages);
    }

    info("DBENGINE STATISTICS ON METRICS:"
         " Metrics: %lu (structures %lu bytes - per metric %0.2f, index (HS) %lu bytes - per metric %0.2f bytes - duration %zu secs) |"
         " Page descriptors: %lu (structures %lu bytes - per page %0.2f bytes, index (L) %lu bytes - per page %0.2f, dirty index %lu bytes). |"
         " Page cache: %lu pages (structures %lu bytes - per page %0.2f bytes, data %lu bytes). |"
         " Points in db %zu, uncompressed size of points database %zu bytes. |"
         " Duration of all points %zu seconds, average point duration %0.2f seconds."
         " Duration of the database %llu seconds, average metric duration %0.2f seconds, average metric lifetime %0.2f%%."
         , metrics_number, metrics_bytes, (double)metrics_bytes/metrics_number, metrics_index_bytes, (double)metrics_index_bytes/metrics_number, metrics_duration
         , pages_number, pages_bytes, (double)pages_bytes/pages_number, pages_index_bytes, (double)pages_index_bytes/pages_number, pages_dirty_index_bytes
         , cache_pages_number, cache_pages_bytes, (double)cache_pages_bytes/cache_pages_number, cache_pages_data_bytes
         , points_in_db, uncompressed_points_size
         , seconds_in_db, (double)seconds_in_db/points_in_db
         , (latest_time_ut - oldest_time_ut) / USEC_PER_SEC, (double)metrics_duration/metrics_number
         , (double)metrics_duration/metrics_number * 100.0 / ((latest_time_ut - oldest_time_ut) / USEC_PER_SEC)
        );

    for(int i = 0; i < 256 ;i++) {
        if(pages_count_per_type[i])
            info("DBENGINE STATISTICS ON PAGE TYPES: page type %d total pages %lu, average page size %0.2f bytes", i, pages_count_per_type[i], (double)pages_size_per_type[i]/pages_count_per_type[i]);
    }
}