pagecache.c

// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS
#include "rrdengine.h"

/* Forward declarations */
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);

/* always inserts into tail */
static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (likely(NULL != pg_cache->replaceQ.tail)) {
        pg_cache_descr->prev = pg_cache->replaceQ.tail;
        pg_cache->replaceQ.tail->next = pg_cache_descr;
    }
    if (unlikely(NULL == pg_cache->replaceQ.head)) {
        pg_cache->replaceQ.head = pg_cache_descr;
    }
    pg_cache->replaceQ.tail = pg_cache_descr;
}

static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next;

    prev = pg_cache_descr->prev;
    next = pg_cache_descr->next;

    if (likely(NULL != prev)) {
        prev->next = next;
    }
    if (likely(NULL != next)) {
        next->prev = prev;
    }
    if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) {
        pg_cache->replaceQ.head = next;
    }
    if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) {
        pg_cache->replaceQ.tail = prev;
    }
    pg_cache_descr->prev = pg_cache_descr->next = NULL;
}

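/* Thread-safe wrappers: take the replaceQ lock and delegate to the unsafe list helpers above. */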
void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
                              struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_insert_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}

void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
                              struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_delete_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}

void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
                               struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_delete_unsafe(ctx, descr);
    pg_cache_replaceQ_insert_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}

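/* Allocates a new page descriptor and initializes it to an empty, uncached state. */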
struct rrdeng_page_descr *pg_cache_create_descr(void)
{
    struct rrdeng_page_descr *descr;

    descr = mallocz(sizeof(*descr));
    descr->page_length = 0;
    descr->start_time = INVALID_TIME;
    descr->end_time = INVALID_TIME;
    descr->id = NULL;
    descr->extent = NULL;
    descr->pg_cache_descr_state = 0;
    descr->pg_cache_descr = NULL;

    return descr;
}

/* The caller must hold page descriptor lock. */
void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (pg_cache_descr->waiters)
        uv_cond_broadcast(&pg_cache_descr->cond);
}

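/* Takes the page descriptor lock and wakes up any threads waiting on its condition variable. */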
void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_wake_up_waiters_unsafe(descr);
    rrdeng_page_descr_mutex_unlock(ctx, descr);
}

/*
 * The caller must hold page descriptor lock.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 */
void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    ++pg_cache_descr->waiters;
    uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex);
    --pg_cache_descr->waiters;
}

/*
 * Returns page flags.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 */
unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
    unsigned long flags;

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_wait_event_unsafe(descr);
    flags = pg_cache_descr->flags;
    rrdeng_page_descr_mutex_unlock(ctx, descr);

    return flags;
}

/*
 * The caller must hold page descriptor lock.
 */
int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
        (exclusive_access && pg_cache_descr->refcnt)) {
        return 0;
    }

    return 1;
}

/*
 * The caller must hold page descriptor lock.
 * Gets a reference to the page descriptor.
 * Returns 1 on success and 0 on failure.
 */
int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (!pg_cache_can_get_unsafe(descr, exclusive_access))
        return 0;

    if (exclusive_access)
        pg_cache_descr->flags |= RRD_PAGE_LOCKED;
    ++pg_cache_descr->refcnt;

    return 1;
}

/*
 * The caller must hold the page descriptor lock.
 * This function may block doing cleanup.
 */
void pg_cache_put_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
    if (0 == --pg_cache_descr->refcnt) {
        pg_cache_wake_up_waiters_unsafe(descr);
    }
}

/*
 * This function may block doing cleanup.
 */
void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_put_unsafe(descr);
    rrdeng_page_descr_mutex_unlock(ctx, descr);
}

/* The caller must hold the page cache lock */
static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->populated_pages -= number;
}

static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    pg_cache_release_pages_unsafe(ctx, number);
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}

/*
 * This function returns the maximum number of pages allowed in the page cache.
 */
unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
{
    /* it's twice the number of producers since we pin 2 pages per producer */
    return ctx->max_cache_pages + 2 * (unsigned long)ctx->stats.metric_API_producers;
}

/*
 * This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the
 * number of pages below that number.
 */
unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
{
    /* it's twice the number of producers since we pin 2 pages per producer */
    return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->stats.metric_API_producers;
}

/*
 * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page
 * cache.
 */
unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx)
{
    /* We remove the active pages of the producers from the calculation and only allow 50% of the extra pinned pages */
    return ctx->cache_pages_low_watermark + (unsigned long)ctx->stats.metric_API_producers / 2;
}

/*
 * This function will block until it reserves #number populated pages.
 * It will trigger evictions or dirty page flushing if the pg_cache_hard_limit() limit is hit.
 */
static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned failures = 0;
    const unsigned FAILURES_CEILING = 10; /* truncates exponential backoff to (2^FAILURES_CEILING x slot) */
    unsigned long exp_backoff_slot_usec = USEC_PER_MS * 10;

    assert(number < ctx->max_cache_pages);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    if (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1)
        debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
              number);
    while (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) {
        if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
            /* failed to evict */
            struct completion compl;
            struct rrdeng_cmd cmd;

            ++failures;
            uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

            init_completion(&compl);
            cmd.opcode = RRDENG_FLUSH_PAGES;
            cmd.completion = &compl;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);
            /* wait for some pages to be flushed */
            debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__);
            wait_for_completion(&compl);
            destroy_completion(&compl);

            if (unlikely(failures > 1)) {
                unsigned long slots;
                /* exponential backoff */
                slots = random() % (2LU << MIN(failures, FAILURES_CEILING));
                (void)sleep_usec(slots * exp_backoff_slot_usec);
            }
            uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
        }
    }
    pg_cache->populated_pages += number;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}

/*
 * This function will attempt to reserve #number populated pages.
 * It may trigger evictions if the pg_cache_soft_limit() limit is hit.
 * Returns 0 on failure and 1 on success.
 */
static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned count = 0;
    int ret = 0;

    assert(number < ctx->max_cache_pages);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    if (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1) {
        debug(D_RRDENGINE,
              "==Page cache full. Trying to reserve %u pages.==",
              number);
        do {
            if (!pg_cache_try_evict_one_page_unsafe(ctx))
                break;
            ++count;
        } while (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1);
        debug(D_RRDENGINE, "Evicted %u pages.", count);
    }

    if (pg_cache->populated_pages + number < pg_cache_hard_limit(ctx) + 1) {
        pg_cache->populated_pages += number;
        ret = 1; /* success */
    }
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

    return ret;
}

/* The caller must hold the page cache and the page descriptor locks in that order */
static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    freez(pg_cache_descr->page);
    pg_cache_descr->page = NULL;
    pg_cache_descr->flags &= ~RRD_PAGE_POPULATED;
    pg_cache_release_pages_unsafe(ctx, 1);
    ++ctx->stats.pg_cache_evictions;
}

/*
 * The caller must hold the page cache lock.
 * Lock order: page cache -> replaceQ -> page descriptor
 * This function iterates the pages in the replacement queue and tries to evict one.
 *
 * Returns 1 on success and 0 on failure.
 */
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned long old_flags;
    struct rrdeng_page_descr *descr;
    struct page_cache_descr *pg_cache_descr = NULL;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) {
        descr = pg_cache_descr->descr;

        rrdeng_page_descr_mutex_lock(ctx, descr);
        old_flags = pg_cache_descr->flags;
        if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
            /* must evict */
            pg_cache_evict_unsafe(ctx, descr);
            pg_cache_put_unsafe(descr);
            pg_cache_replaceQ_delete_unsafe(ctx, descr);

            rrdeng_page_descr_mutex_unlock(ctx, descr);
            uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);

            rrdeng_try_deallocate_pg_cache_descr(ctx, descr);

            return 1;
        }
        rrdeng_page_descr_mutex_unlock(ctx, descr);
    }
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);

    /* failed to evict */
    return 0;
}

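/*
 * Deletes a page from both the page index and the page cache: it waits for any
 * dirty data to be flushed (unless remove_dirty is set), evicts the page data
 * if it is populated, and finally frees the descriptor and updates the metric
 * timestamps of the owning page index.
 */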
/*
 * Callers of this function need to make sure they're not deleting the same descriptor concurrently
 */
void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
                         uint8_t is_exclusive_holder)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = NULL;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;
    int ret;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
    assert(NULL != PValue);
    page_index = *PValue;
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);

    uv_rwlock_wrlock(&page_index->lock);
    ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
    uv_rwlock_wrunlock(&page_index->lock);
    if (unlikely(0 == ret)) {
        error("Page under deletion was not in index.");
        if (unlikely(debug_flags & D_RRDENGINE)) {
            print_page_descr(descr);
        }
        goto destroy;
    }
    assert(1 == ret);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    ++ctx->stats.pg_cache_deletions;
    --pg_cache->page_descriptors;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_descr = descr->pg_cache_descr;
    if (!is_exclusive_holder) {
        /* If we don't hold an exclusive page reference get one */
        while (!pg_cache_try_get_unsafe(descr, 1)) {
            debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            pg_cache_wait_event_unsafe(descr);
        }
    }
    if (remove_dirty) {
        pg_cache_descr->flags &= ~RRD_PAGE_DIRTY;
    } else {
        /* even a locked page could be dirty */
        while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
            debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            pg_cache_wait_event_unsafe(descr);
        }
    }
    rrdeng_page_descr_mutex_unlock(ctx, descr);

    if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
        /* only after locking can it be safely deleted from LRU */
        pg_cache_replaceQ_delete(ctx, descr);

        uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
        pg_cache_evict_unsafe(ctx, descr);
        uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
    }
    pg_cache_put(ctx, descr);
    rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
    while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
        rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */
        (void)sleep_usec(1000); /* 1 msec */
    }
destroy:
    freez(descr);
    pg_cache_update_metric_times(page_index);
}

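/* Returns nonzero when the page of descr overlaps the inclusive time range [start_time, end_time] (in usec). */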
static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time)
{
    usec_t pg_start, pg_end;

    pg_start = descr->start_time;
    pg_end = descr->end_time;

    return (pg_start < start_time && pg_end >= start_time) ||
           (pg_start >= start_time && pg_start <= end_time);
}

static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time)
{
    return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
}

/* The caller must hold the page index lock */
static inline struct rrdeng_page_descr *
        find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time)
{
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    Word_t Index;

    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
        if (is_page_in_time_range(descr, start_time, end_time)) {
            return descr;
        }
    }

    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
        if (is_page_in_time_range(descr, start_time, end_time)) {
            return descr;
        }
    }

    return NULL;
}

/* Update metric oldest and latest timestamps efficiently when adding new values */
void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
{
    usec_t oldest_time = page_index->oldest_time;
    usec_t latest_time = page_index->latest_time;

    if (unlikely(oldest_time == INVALID_TIME || descr->start_time < oldest_time)) {
        page_index->oldest_time = descr->start_time;
    }
    if (likely(descr->end_time > latest_time || latest_time == INVALID_TIME)) {
        page_index->latest_time = descr->end_time;
    }
}

/* Update metric oldest and latest timestamps when removing old values */
void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
{
    Pvoid_t *firstPValue, *lastPValue;
    Word_t firstIndex, lastIndex;
    struct rrdeng_page_descr *descr;
    usec_t oldest_time = INVALID_TIME;
    usec_t latest_time = INVALID_TIME;

    uv_rwlock_rdlock(&page_index->lock);
    /* Find first page in range */
    firstIndex = (Word_t)0;
    firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0);
    if (likely(NULL != firstPValue)) {
        descr = *firstPValue;
        oldest_time = descr->start_time;
    }
    lastIndex = (Word_t)-1;
    lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0);
    if (likely(NULL != lastPValue)) {
        descr = *lastPValue;
        latest_time = descr->end_time;
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (unlikely(NULL == firstPValue)) {
        assert(NULL == lastPValue);
        page_index->oldest_time = page_index->latest_time = INVALID_TIME;
        return;
    }
    page_index->oldest_time = oldest_time;
    page_index->latest_time = latest_time;
}

/* If index is NULL lookup by UUID (descr->id) */
void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
                     struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;
    unsigned long pg_cache_descr_state = descr->pg_cache_descr_state;

    if (0 != pg_cache_descr_state) {
        /* there is page cache descriptor pre-allocated state */
        struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

        assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
        if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
            pg_cache_reserve_pages(ctx, 1);
            if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY))
                pg_cache_replaceQ_insert(ctx, descr);
        }
    }

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
        assert(NULL != PValue);
        page_index = *PValue;
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    } else {
        page_index = index;
    }

    uv_rwlock_wrlock(&page_index->lock);
    PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
    *PValue = descr;
    pg_cache_add_new_metric_time(page_index, descr);
    uv_rwlock_wrunlock(&page_index->lock);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    ++ctx->stats.pg_cache_insertions;
    ++pg_cache->page_descriptors;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}

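/*
 * Returns the start time of the first page of the given metric that overlaps the
 * inclusive range [start_time, end_time], or INVALID_TIME if the metric is unknown
 * or no such page exists.
 */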
usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    if (NULL == PValue) {
        return INVALID_TIME;
    }

    uv_rwlock_rdlock(&page_index->lock);
    descr = find_first_page_in_time_range(page_index, start_time, end_time);
    if (NULL == descr) {
        uv_rwlock_rdunlock(&page_index->lock);
        return INVALID_TIME;
    }
    uv_rwlock_rdunlock(&page_index->lock);
    return descr->start_time;
}

/**
 * Return page information for the first page before point_in_time that satisfies the filter.
 * @param ctx DB context
 * @param page_index page index of a metric
 * @param point_in_time the pages that are searched must be older than this timestamp
 * @param filter decides if the page satisfies the caller's criteria
 * @param page_info the result of the search is set in this pointer
 */
void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
                                     usec_t point_in_time, pg_cache_page_info_filter_t *filter,
                                     struct rrdeng_page_info *page_info)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    Word_t Index;

    (void)pg_cache;
    assert(NULL != page_index);

    Index = (Word_t)(point_in_time / USEC_PER_SEC);
    uv_rwlock_rdlock(&page_index->lock);
    do {
        PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0);
        descr = unlikely(NULL == PValue) ? NULL : *PValue;
    } while (descr != NULL && !filter(descr));
    if (unlikely(NULL == descr)) {
        page_info->page_length = 0;
        page_info->start_time = INVALID_TIME;
        page_info->end_time = INVALID_TIME;
    } else {
        page_info->page_length = descr->page_length;
        page_info->start_time = descr->start_time;
        page_info->end_time = descr->end_time;
    }
    uv_rwlock_rdunlock(&page_index->lock);
}

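/*
 * Illustrative call sequence for pg_cache_preload(), documented below
 * (hypothetical caller: the variable names are made up, only the functions
 * come from this file):
 *
 *     struct rrdeng_page_info *page_infos = NULL;
 *     struct pg_cache_page_index *page_index;
 *     unsigned pages = pg_cache_preload(ctx, &metric_uuid, start_usec, end_usec,
 *                                       &page_infos, &page_index);
 *
 * After the call, page_infos describes the pages overlapping the range and,
 * when it is non-NULL, must be released by the caller with freez().
 */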
/**
 * Searches for pages in a time range and triggers disk I/O if necessary and possible.
 * Does not get a reference.
 * @param ctx DB context
 * @param id UUID
 * @param start_time inclusive starting time in usec
 * @param end_time inclusive ending time in usec
 * @param page_info_arrayp It allocates (*page_info_arrayp) and populates it with information of pages that overlap
 *        with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez().
 *        If page_info_arrayp is set to NULL nothing was allocated.
 * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
 * @return the number of pages that overlap with the time range [start_time,end_time].
 */
unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
                          struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned i, j, k, preload_count, count, page_info_array_max_size;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;
    Word_t Index;
    uint8_t failed_to_reserve;

    assert(NULL != ret_page_indexp);

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        *ret_page_indexp = page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    if (NULL == PValue) {
        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
        *ret_page_indexp = NULL;
        return 0;
    }

    uv_rwlock_rdlock(&page_index->lock);
    descr = find_first_page_in_time_range(page_index, start_time, end_time);
    if (NULL == descr) {
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
        *ret_page_indexp = NULL;
        return 0;
    } else {
        Index = (Word_t)(descr->start_time / USEC_PER_SEC);
    }
    if (page_info_arrayp) {
        page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
        *page_info_arrayp = mallocz(page_info_array_max_size);
    }

    for (count = 0, preload_count = 0 ;
         descr != NULL && is_page_in_time_range(descr, start_time, end_time) ;
         PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
         descr = unlikely(NULL == PValue) ? NULL : *PValue) {
        /* Iterate all pages in range */

        if (unlikely(0 == descr->page_length))
            continue;
        if (page_info_arrayp) {
            if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) {
                page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
                *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size);
            }
            (*page_info_arrayp)[count].start_time = descr->start_time;
            (*page_info_arrayp)[count].end_time = descr->end_time;
            (*page_info_arrayp)[count].page_length = descr->page_length;
        }
        ++count;

        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if (pg_cache_can_get_unsafe(descr, 0)) {
            if (flags & RRD_PAGE_POPULATED) {
                /* success */
                rrdeng_page_descr_mutex_unlock(ctx, descr);
                debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
                continue;
            }
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            preload_array[preload_count++] = descr;
            if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) {
                rrdeng_page_descr_mutex_unlock(ctx, descr);
                break;
            }
        }
        rrdeng_page_descr_mutex_unlock(ctx, descr);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    failed_to_reserve = 0;
    for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) {
        struct rrdeng_cmd cmd;
        struct rrdeng_page_descr *next;

        descr = preload_array[i];
        if (NULL == descr) {
            continue;
        }
        if (!pg_cache_try_reserve_pages(ctx, 1)) {
            failed_to_reserve = 1;
            break;
        }
        cmd.opcode = RRDENG_READ_EXTENT;
        cmd.read_extent.page_cache_descr[0] = descr;
        /* don't use this page again */
        preload_array[i] = NULL;
        for (j = 0, k = 1 ; j < preload_count ; ++j) {
            next = preload_array[j];
            if (NULL == next) {
                continue;
            }
            if (descr->extent == next->extent) {
                /* same extent, consolidate */
                if (!pg_cache_try_reserve_pages(ctx, 1)) {
                    failed_to_reserve = 1;
                    break;
                }
                cmd.read_extent.page_cache_descr[k++] = next;
                /* don't use this page again */
                preload_array[j] = NULL;
            }
        }
        cmd.read_extent.page_count = k;
        rrdeng_enq_cmd(&ctx->worker_config, &cmd);
    }
    if (failed_to_reserve) {
        debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
        for (i = 0 ; i < preload_count ; ++i) {
            descr = preload_array[i];
            if (NULL == descr) {
                continue;
            }
            pg_cache_put(ctx, descr);
        }
    }
    if (!preload_count) {
        /* no such page */
        debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
    }
    if (unlikely(0 == count && page_info_arrayp)) {
        freez(*page_info_arrayp);
        *page_info_arrayp = NULL;
    }
    return count;
}

/*
 * Searches for a page and gets a reference.
 * When point_in_time is INVALID_TIME get any page.
 * If index is NULL lookup by UUID (id).
 */
struct rrdeng_page_descr *
        pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
                        usec_t point_in_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;
    Word_t Index;
    uint8_t page_not_in_cache;

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
        if (likely(NULL != PValue)) {
            page_index = *PValue;
        }
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
        if (NULL == PValue) {
            return NULL;
        }
    } else {
        page_index = index;
    }
    pg_cache_reserve_pages(ctx, 1);

    page_not_in_cache = 0;
    uv_rwlock_rdlock(&page_index->lock);
    while (1) {
        Index = (Word_t)(point_in_time / USEC_PER_SEC);
        PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
        if (likely(NULL != PValue)) {
            descr = *PValue;
        }
        if (NULL == PValue ||
            0 == descr->page_length ||
            (INVALID_TIME != point_in_time &&
             !is_point_in_time_in_page(descr, point_in_time))) {
            /* non-empty page not found */
            uv_rwlock_rdunlock(&page_index->lock);

            pg_cache_release_pages(ctx, 1);
            return NULL;
        }
        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
            /* success */
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
            break;
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            struct rrdeng_cmd cmd;

            uv_rwlock_rdunlock(&page_index->lock);

            cmd.opcode = RRDENG_READ_PAGE;
            cmd.read_page.page_cache_descr = descr;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);

            debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
                pg_cache_wait_event_unsafe(descr);
            }
            /* success */
            /* Downgrade exclusive reference to allow other readers */
            pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
            pg_cache_wake_up_waiters_unsafe(descr);
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
            return descr;
        }
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
        if (unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        if (!(flags & RRD_PAGE_POPULATED))
            page_not_in_cache = 1;
        pg_cache_wait_event_unsafe(descr);
        rrdeng_page_descr_mutex_unlock(ctx, descr);

        /* reset scan to find again */
        uv_rwlock_rdlock(&page_index->lock);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (!(flags & RRD_PAGE_DIRTY))
        pg_cache_replaceQ_set_hot(ctx, descr);
    pg_cache_release_pages(ctx, 1);
    if (page_not_in_cache)
        rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    else
        rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
    return descr;
}

/*
 * Searches for the first page between start_time and end_time and gets a reference.
 * start_time and end_time are inclusive.
 * If index is NULL lookup by UUID (id).
 */
struct rrdeng_page_descr *
        pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
                             usec_t start_time, usec_t end_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;
    uint8_t page_not_in_cache;

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
        if (likely(NULL != PValue)) {
            page_index = *PValue;
        }
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
        if (NULL == PValue) {
            return NULL;
        }
    } else {
        page_index = index;
    }
    pg_cache_reserve_pages(ctx, 1);

    page_not_in_cache = 0;
    uv_rwlock_rdlock(&page_index->lock);
    while (1) {
        descr = find_first_page_in_time_range(page_index, start_time, end_time);
        if (NULL == descr || 0 == descr->page_length) {
            /* non-empty page not found */
            uv_rwlock_rdunlock(&page_index->lock);

            pg_cache_release_pages(ctx, 1);
            return NULL;
        }
        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
            /* success */
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
            break;
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            struct rrdeng_cmd cmd;

            uv_rwlock_rdunlock(&page_index->lock);

            cmd.opcode = RRDENG_READ_PAGE;
            cmd.read_page.page_cache_descr = descr;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);

            debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
                pg_cache_wait_event_unsafe(descr);
            }
            /* success */
            /* Downgrade exclusive reference to allow other readers */
            pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
            pg_cache_wake_up_waiters_unsafe(descr);
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
            return descr;
        }
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
        if (unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        if (!(flags & RRD_PAGE_POPULATED))
            page_not_in_cache = 1;
        pg_cache_wait_event_unsafe(descr);
        rrdeng_page_descr_mutex_unlock(ctx, descr);

        /* reset scan to find again */
        uv_rwlock_rdlock(&page_index->lock);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (!(flags & RRD_PAGE_DIRTY))
        pg_cache_replaceQ_set_hot(ctx, descr);
    pg_cache_release_pages(ctx, 1);
    if (page_not_in_cache)
        rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    else
        rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
    return descr;
}

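/* Allocates and initializes a page index for the metric with the given UUID. */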
struct pg_cache_page_index *create_page_index(uuid_t *id)
{
    struct pg_cache_page_index *page_index;

    page_index = mallocz(sizeof(*page_index));
    page_index->JudyL_array = (Pvoid_t) NULL;
    uuid_copy(page_index->id, *id);
    assert(0 == uv_rwlock_init(&page_index->lock));
    page_index->oldest_time = INVALID_TIME;
    page_index->latest_time = INVALID_TIME;
    page_index->prev = NULL;

    return page_index;
}

static void init_metrics_index(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
    pg_cache->metrics_index.last_page_index = NULL;
    assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
}

static void init_replaceQ(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->replaceQ.head = NULL;
    pg_cache->replaceQ.tail = NULL;
    assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
}

static void init_committed_page_index(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL;
    assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock));
    pg_cache->committed_page_index.latest_corr_id = 0;
    pg_cache->committed_page_index.nr_committed_pages = 0;
}

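/* Initializes the page cache of a database engine instance: its counters, locks and the metrics, replaceQ and committed page indexes. */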
void init_page_cache(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->page_descriptors = 0;
    pg_cache->populated_pages = 0;
    assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));

    init_metrics_index(ctx);
    init_replaceQ(ctx);
    init_committed_page_index(ctx);
}

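/*
 * Tears down the page cache: frees every page descriptor, any populated page
 * data and all index structures, and logs how much memory was released.
 */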
void free_page_cache(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Word_t ret_Judy, bytes_freed = 0;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index, *prev_page_index;
    Word_t Index;
    struct rrdeng_page_descr *descr;
    struct page_cache_descr *pg_cache_descr;

    /* Free committed page index */
    ret_Judy = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0);
    assert(NULL == pg_cache->committed_page_index.JudyL_array);
    bytes_freed += ret_Judy;

    for (page_index = pg_cache->metrics_index.last_page_index ;
         page_index != NULL ;
         page_index = prev_page_index) {
        prev_page_index = page_index->prev;

        /* Find first page in range */
        Index = (Word_t) 0;
        PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
        descr = unlikely(NULL == PValue) ? NULL : *PValue;

        while (descr != NULL) {
            /* Iterate all page descriptors of this metric */

            if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
                /* Check rrdenglocking.c */
                pg_cache_descr = descr->pg_cache_descr;
                if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
                    freez(pg_cache_descr->page);
                    bytes_freed += RRDENG_BLOCK_SIZE;
                }
                rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
                bytes_freed += sizeof(*pg_cache_descr);
            }
            freez(descr);
            bytes_freed += sizeof(*descr);

            PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0);
            descr = unlikely(NULL == PValue) ? NULL : *PValue;
        }

        /* Free page index */
        ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0);
        assert(NULL == page_index->JudyL_array);
        bytes_freed += ret_Judy;
        freez(page_index);
        bytes_freed += sizeof(*page_index);
    }
    /* Free metrics index */
    ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
    assert(NULL == pg_cache->metrics_index.JudyHS_array);
    bytes_freed += ret_Judy;

    info("Freed %lu bytes of memory from page cache.", bytes_freed);
}