pagecache.c

// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS
#include "rrdengine.h"

/* Forward declarations */
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);
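
/*
 * pg_cache->replaceQ is the page replacement queue: an intrusive doubly-linked
 * list of page descriptors with the least recently used page at the head and
 * the most recently used at the tail. New and recently touched pages go to the
 * tail and eviction scans start from the head, so the queue behaves as an LRU
 * list. The *_unsafe helpers below assume the caller already holds
 * replaceQ.lock; the non-suffixed wrappers acquire that lock themselves.
 */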

/* always inserts into tail */
static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_cache_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    if (likely(NULL != pg_cache->replaceQ.tail)) {
        descr->prev = pg_cache->replaceQ.tail;
        pg_cache->replaceQ.tail->next = descr;
    }
    if (unlikely(NULL == pg_cache->replaceQ.head)) {
        pg_cache->replaceQ.head = descr;
    }
    pg_cache->replaceQ.tail = descr;
}

static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_cache_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_cache_descr *prev, *next;

    prev = descr->prev;
    next = descr->next;

    if (likely(NULL != prev)) {
        prev->next = next;
    }
    if (likely(NULL != next)) {
        next->prev = prev;
    }
    if (unlikely(descr == pg_cache->replaceQ.head)) {
        pg_cache->replaceQ.head = next;
    }
    if (unlikely(descr == pg_cache->replaceQ.tail)) {
        pg_cache->replaceQ.tail = prev;
    }
    descr->prev = descr->next = NULL;
}

void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
                              struct rrdeng_page_cache_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_insert_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}

void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
                              struct rrdeng_page_cache_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_delete_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}

void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
                               struct rrdeng_page_cache_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_delete_unsafe(ctx, descr);
    pg_cache_replaceQ_insert_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
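
/*
 * "Setting hot" is the LRU touch operation: under a single acquisition of
 * replaceQ.lock the descriptor is unlinked from wherever it currently sits
 * and re-inserted at the tail, so the next eviction scan (which starts at the
 * head) will consider it last. pg_cache_lookup() below does this on every
 * cache hit for pages that are not dirty.
 */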

struct rrdeng_page_cache_descr *pg_cache_create_descr(void)
{
    struct rrdeng_page_cache_descr *descr;

    descr = mallocz(sizeof(*descr));
    descr->page = NULL;
    descr->page_length = 0;
    descr->start_time = INVALID_TIME;
    descr->end_time = INVALID_TIME;
    descr->id = NULL;
    descr->extent = NULL;
    descr->flags = 0;
    descr->prev = descr->next = descr->private = NULL;
    descr->refcnt = 0;
    descr->waiters = 0;
    descr->handle = NULL;
    assert(0 == uv_cond_init(&descr->cond));
    assert(0 == uv_mutex_init(&descr->mutex));

    return descr;
}

void pg_cache_destroy_descr(struct rrdeng_page_cache_descr *descr)
{
    uv_cond_destroy(&descr->cond);
    uv_mutex_destroy(&descr->mutex);
    free(descr);
}
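
/*
 * Illustrative sketch of the descriptor lifecycle (hypothetical caller, not a
 * call path in this file): a descriptor is allocated with all fields reset and
 * its synchronization primitives initialized, and must eventually be released
 * with pg_cache_destroy_descr(), e.g.:
 *
 *     struct rrdeng_page_cache_descr *descr = pg_cache_create_descr();
 *     descr->id = &some_uuid;           // hypothetical caller-owned uuid_t
 *     descr->page = some_page_buffer;   // hypothetical populated page buffer
 *     ...
 *     pg_cache_destroy_descr(descr);    // destroys cond/mutex, frees descr
 *
 * Note that pg_cache_destroy_descr() only frees the descriptor itself; the
 * page buffer is released separately (see pg_cache_evict_unsafe()).
 */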

/* The caller must hold page descriptor lock. */
void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr)
{
    if (descr->waiters)
        uv_cond_broadcast(&descr->cond);
}

/*
 * The caller must hold page descriptor lock.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 */
void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr)
{
    ++descr->waiters;
    uv_cond_wait(&descr->cond, &descr->mutex);
    --descr->waiters;
}

/*
 * Returns page flags.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 */
unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr)
{
    unsigned long flags;

    uv_mutex_lock(&descr->mutex);
    pg_cache_wait_event_unsafe(descr);
    flags = descr->flags;
    uv_mutex_unlock(&descr->mutex);

    return flags;
}
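
/*
 * The wait helpers implement the usual condition-variable pattern: a waiter
 * registers itself in descr->waiters while holding descr->mutex, blocks on
 * descr->cond (uv_cond_wait atomically drops and re-acquires the mutex), and
 * every state change that can unblock a waiter ends with
 * pg_cache_wake_up_waiters_unsafe(). Because the awaited condition may not
 * hold when the wait returns, callers in this file re-check their predicate
 * in a loop, e.g. the RRD_PAGE_DIRTY and RRD_PAGE_POPULATED loops in
 * pg_cache_punch_hole() and pg_cache_lookup().
 */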

/*
 * The caller must hold page descriptor lock.
 * Gets a reference to the page descriptor.
 * Returns 1 on success and 0 on failure.
 */
int pg_cache_try_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
{
    if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
        (exclusive_access && descr->refcnt)) {
        return 0;
    }
    if (exclusive_access)
        descr->flags |= RRD_PAGE_LOCKED;
    ++descr->refcnt;

    return 1;
}
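
/*
 * Reference semantics: a shared reference (exclusive_access == 0) only bumps
 * refcnt and is refused while the page is locked or has a read in flight; an
 * exclusive reference (exclusive_access != 0) additionally requires refcnt to
 * be zero and sets RRD_PAGE_LOCKED, so no further references can be taken
 * until pg_cache_put_unsafe() clears the flag.
 */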

/*
 * The caller must hold the page descriptor lock.
 * Same return values as pg_cache_try_get_unsafe(), but without taking a
 * reference or modifying the descriptor.
 */
int pg_cache_can_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
{
    if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
        (exclusive_access && descr->refcnt)) {
        return 0;
    }

    return 1;
}

/*
 * The caller must hold the page descriptor lock.
 * This function may block doing cleanup.
 */
void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr)
{
    descr->flags &= ~RRD_PAGE_LOCKED;
    if (0 == --descr->refcnt) {
        pg_cache_wake_up_waiters_unsafe(descr);
    }
    /* TODO: perform cleanup */
}

/*
 * This function may block doing cleanup.
 */
void pg_cache_put(struct rrdeng_page_cache_descr *descr)
{
    uv_mutex_lock(&descr->mutex);
    pg_cache_put_unsafe(descr);
    uv_mutex_unlock(&descr->mutex);
}
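
/*
 * Illustrative sketch of the get/put pairing (hypothetical caller, assuming
 * "descr" was obtained from the page index):
 *
 *     uv_mutex_lock(&descr->mutex);
 *     if (pg_cache_try_get_unsafe(descr, 0)) {   // shared reference
 *         uv_mutex_unlock(&descr->mutex);
 *         ... read descr->page ...
 *         pg_cache_put(descr);                   // drops the reference
 *     } else {
 *         uv_mutex_unlock(&descr->mutex);
 *     }
 *
 * Dropping the last reference wakes up any waiters, which is how
 * pg_cache_punch_hole() eventually wins its exclusive reference.
 */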

/* The caller must hold the page cache lock */
static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->populated_pages -= number;
}

static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    pg_cache_release_pages_unsafe(ctx, number);
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}

/*
 * This function will block until it reserves #number populated pages.
 * It will trigger evictions or dirty page flushing if the ctx->max_cache_pages limit is hit.
 */
static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    assert(number < ctx->max_cache_pages);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    if (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1)
        debug(D_RRDENGINE, "=================================\nPage cache full. Reserving %u pages.\n=================================",
              number);
    while (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1) {
        if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
            /* failed to evict */
            struct completion compl;
            struct rrdeng_cmd cmd;

            uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

            init_completion(&compl);
            cmd.opcode = RRDENG_FLUSH_PAGES;
            cmd.completion = &compl;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);
            /* wait for some pages to be flushed */
            debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__);
            wait_for_completion(&compl);
            destroy_completion(&compl);

            uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
        }
    }
    pg_cache->populated_pages += number;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
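
/*
 * Reservation logic: "populated_pages + number > max_cache_pages" (written as
 * ">= max_cache_pages + 1" above) means the reservation would overflow the
 * cache. In that case the caller first tries to evict clean pages itself; if
 * every remaining page is dirty or referenced, it drops the page cache lock,
 * queues an RRDENG_FLUSH_PAGES command to the event-loop worker and sleeps on
 * a completion until some write-back finishes, then retries. The non-blocking
 * variant below only evicts down to cache_pages_low_watermark and reports
 * failure instead of waiting.
 */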

/*
 * This function will attempt to reserve #number populated pages.
 * It may trigger evictions if the ctx->cache_pages_low_watermark limit is hit.
 * Returns 0 on failure and 1 on success.
 */
static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned count = 0;
    int ret = 0;

    assert(number < ctx->max_cache_pages);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    if (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1) {
        debug(D_RRDENGINE,
              "=================================\nPage cache full. Trying to reserve %u pages.\n=================================",
              number);
        do {
            if (!pg_cache_try_evict_one_page_unsafe(ctx))
                break;
            ++count;
        } while (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1);
        debug(D_RRDENGINE, "Evicted %u pages.", count);
    }

    if (pg_cache->populated_pages + number < ctx->max_cache_pages + 1) {
        pg_cache->populated_pages += number;
        ret = 1; /* success */
    }
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

    return ret;
}

/* The caller must hold the page cache and the page descriptor locks in that order */
static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
{
    free(descr->page);
    descr->page = NULL;
    descr->flags &= ~RRD_PAGE_POPULATED;
    pg_cache_release_pages_unsafe(ctx, 1);
    ++ctx->stats.pg_cache_evictions;
}

/*
 * The caller must hold the page cache lock.
 * Lock order: page cache -> replaceQ -> descriptor
 * This function iterates the replacement queue from the head (least recently
 * used) and evicts the first page that is populated, not dirty, and not
 * referenced by anyone else.
 *
 * Returns 1 on success and 0 on failure.
 */
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned long old_flags;
    struct rrdeng_page_cache_descr *descr;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    for (descr = pg_cache->replaceQ.head ; NULL != descr ; descr = descr->next) {
        uv_mutex_lock(&descr->mutex);
        old_flags = descr->flags;
        if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
            /* must evict */
            pg_cache_evict_unsafe(ctx, descr);
            pg_cache_put_unsafe(descr);
            uv_mutex_unlock(&descr->mutex);
            pg_cache_replaceQ_delete_unsafe(ctx, descr);
            uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);

            return 1;
        }
        uv_mutex_unlock(&descr->mutex);
    }
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);

    /* failed to evict */
    return 0;
}
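
/*
 * Eviction candidate criteria, as checked above: the page must be populated
 * (it has an in-memory buffer to free), must not be dirty (dirty pages still
 * need to be flushed by the worker), and an exclusive reference must be
 * obtainable, which rules out pages that are locked, have a read pending, or
 * are referenced by readers. Evicting only frees descr->page; the descriptor
 * stays in the metrics index so the data can be re-read from disk later.
 */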

/*
 * TODO: last waiter frees descriptor ?
 */
void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;
    int ret;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
    assert(NULL != PValue);
    page_index = *PValue;
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);

    uv_rwlock_wrlock(&page_index->lock);
    ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
    uv_rwlock_wrunlock(&page_index->lock);
    if (unlikely(0 == ret)) {
        error("Page under deletion was not in index.");
        if (unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        goto destroy;
    }
    assert(1 == ret);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    ++ctx->stats.pg_cache_deletions;
    --pg_cache->page_descriptors;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

    uv_mutex_lock(&descr->mutex);
    while (!pg_cache_try_get_unsafe(descr, 1)) {
        debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
        if (unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        pg_cache_wait_event_unsafe(descr);
    }
    /* even a locked page could be dirty */
    while (unlikely(descr->flags & RRD_PAGE_DIRTY)) {
        debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
        if (unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        pg_cache_wait_event_unsafe(descr);
    }
    uv_mutex_unlock(&descr->mutex);

    if (descr->flags & RRD_PAGE_POPULATED) {
        /* only after locking can it be safely deleted from LRU */
        pg_cache_replaceQ_delete(ctx, descr);
        uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
        pg_cache_evict_unsafe(ctx, descr);
        uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
    }
    pg_cache_put(descr);
destroy:
    pg_cache_destroy_descr(descr);
    pg_cache_update_metric_times(page_index);
}
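
/*
 * Deletion order in pg_cache_punch_hole(): the page is first removed from the
 * per-metric Judy index (so new lookups can no longer find it), then the
 * function waits until it can take an exclusive reference and until any
 * write-back completes, and only then removes the page from the replacement
 * queue, frees its buffer and destroys the descriptor. Finally the metric's
 * oldest/latest timestamps are recomputed because the deleted page may have
 * defined one of them.
 */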

static inline int is_page_in_time_range(struct rrdeng_page_cache_descr *descr, usec_t start_time, usec_t end_time)
{
    usec_t pg_start, pg_end;

    pg_start = descr->start_time;
    pg_end = descr->end_time;

    return (pg_start < start_time && pg_end >= start_time) ||
           (pg_start >= start_time && pg_start <= end_time);
}

static inline int is_point_in_time_in_page(struct rrdeng_page_cache_descr *descr, usec_t point_in_time)
{
    return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
}
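
/*
 * is_page_in_time_range() is an interval-overlap test: the page
 * [pg_start, pg_end] overlaps the query [start_time, end_time] either because
 * it starts before the query and ends inside it, or because it starts inside
 * the query itself. Worked example with illustrative numbers (usec): a page
 * covering [100, 200] and a query for [150, 300] match the first clause
 * (100 < 150 && 200 >= 150); a page covering [250, 400] and the same query
 * match the second clause (250 >= 150 && 250 <= 300); a page covering
 * [310, 400] matches neither and is skipped.
 */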

/* Update metric oldest and latest timestamps efficiently when adding new values */
void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr)
{
    usec_t oldest_time = page_index->oldest_time;
    usec_t latest_time = page_index->latest_time;

    if (unlikely(oldest_time == INVALID_TIME || descr->start_time < oldest_time)) {
        page_index->oldest_time = descr->start_time;
    }
    if (likely(descr->end_time > latest_time || latest_time == INVALID_TIME)) {
        page_index->latest_time = descr->end_time;
    }
}

/* Update metric oldest and latest timestamps when removing old values */
void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
{
    Pvoid_t *firstPValue, *lastPValue;
    Word_t firstIndex, lastIndex;
    struct rrdeng_page_cache_descr *descr;
    usec_t oldest_time = INVALID_TIME;
    usec_t latest_time = INVALID_TIME;

    uv_rwlock_rdlock(&page_index->lock);
    /* Find first page in range */
    firstIndex = (Word_t)0;
    firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0);
    if (likely(NULL != firstPValue)) {
        descr = *firstPValue;
        oldest_time = descr->start_time;
    }
    lastIndex = (Word_t)-1;
    lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0);
    if (likely(NULL != lastPValue)) {
        descr = *lastPValue;
        latest_time = descr->end_time;
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (unlikely(NULL == firstPValue)) {
        assert(NULL == lastPValue);
        page_index->oldest_time = page_index->latest_time = INVALID_TIME;
        return;
    }
    page_index->oldest_time = oldest_time;
    page_index->latest_time = latest_time;
}

/* If index is NULL lookup by UUID (descr->id) */
void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
                     struct rrdeng_page_cache_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;

    if (descr->flags & RRD_PAGE_POPULATED) {
        pg_cache_reserve_pages(ctx, 1);
        if (!(descr->flags & RRD_PAGE_DIRTY))
            pg_cache_replaceQ_insert(ctx, descr);
    }

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
        assert(NULL != PValue);
        page_index = *PValue;
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    } else {
        page_index = index;
    }

    uv_rwlock_wrlock(&page_index->lock);
    PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
    *PValue = descr;
    pg_cache_add_new_metric_time(page_index, descr);
    uv_rwlock_wrunlock(&page_index->lock);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    ++ctx->stats.pg_cache_insertions;
    ++pg_cache->page_descriptors;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
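
/*
 * Index layout: pg_cache->metrics_index is a JudyHS array keyed by the metric
 * UUID and maps to a struct pg_cache_page_index; each page index in turn holds
 * a JudyL array keyed by the page start time in seconds
 * (descr->start_time / USEC_PER_SEC) that maps to the page descriptor. A page
 * that is populated and not dirty also becomes eligible for eviction by being
 * inserted into the replacement queue here.
 */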

/*
 * Searches for a page and triggers disk I/O if necessary and possible.
 * Does not get a reference.
 * Returns page index pointer for given metric UUID.
 */
struct pg_cache_page_index *
pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_cache_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
    int i, j, k, count, found;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;
    Word_t Index;
    uint8_t failed_to_reserve;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    if (NULL == PValue) {
        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
        return NULL;
    }

    uv_rwlock_rdlock(&page_index->lock);
    /* Find first page in range */
    found = 0;
    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
        if (is_page_in_time_range(descr, start_time, end_time)) {
            found = 1;
        }
    }
    if (!found) {
        Index = (Word_t)(start_time / USEC_PER_SEC);
        PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
        if (likely(NULL != PValue)) {
            descr = *PValue;
            if (is_page_in_time_range(descr, start_time, end_time)) {
                found = 1;
            }
        }
    }
    if (!found) {
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
        return page_index;
    }

    for (count = 0 ;
         descr != NULL && is_page_in_time_range(descr, start_time, end_time) ;
         PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
         descr = unlikely(NULL == PValue) ? NULL : *PValue) {
        /* Iterate all pages in range */

        if (unlikely(0 == descr->page_length))
            continue;
        uv_mutex_lock(&descr->mutex);
        flags = descr->flags;
        if (pg_cache_can_get_unsafe(descr, 0)) {
            if (flags & RRD_PAGE_POPULATED) {
                /* success */
                uv_mutex_unlock(&descr->mutex);
                debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
                continue;
            }
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            preload_array[count++] = descr;
            if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) {
                uv_mutex_unlock(&descr->mutex);
                break;
            }
        }
        uv_mutex_unlock(&descr->mutex);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    failed_to_reserve = 0;
    for (i = 0 ; i < count && !failed_to_reserve ; ++i) {
        struct rrdeng_cmd cmd;
        struct rrdeng_page_cache_descr *next;

        descr = preload_array[i];
        if (NULL == descr) {
            continue;
        }
        if (!pg_cache_try_reserve_pages(ctx, 1)) {
            failed_to_reserve = 1;
            break;
        }
        cmd.opcode = RRDENG_READ_EXTENT;
        cmd.read_extent.page_cache_descr[0] = descr;
        /* don't use this page again */
        preload_array[i] = NULL;
        for (j = 0, k = 1 ; j < count ; ++j) {
            next = preload_array[j];
            if (NULL == next) {
                continue;
            }
            if (descr->extent == next->extent) {
                /* same extent, consolidate */
                if (!pg_cache_try_reserve_pages(ctx, 1)) {
                    failed_to_reserve = 1;
                    break;
                }
                cmd.read_extent.page_cache_descr[k++] = next;
                /* don't use this page again */
                preload_array[j] = NULL;
            }
        }
        cmd.read_extent.page_count = k;
        rrdeng_enq_cmd(&ctx->worker_config, &cmd);
    }
    if (failed_to_reserve) {
        debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
        for (i = 0 ; i < count ; ++i) {
            descr = preload_array[i];
            if (NULL == descr) {
                continue;
            }
            pg_cache_put(descr);
        }
    }
    if (!count) {
        /* no such page */
        debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
    }
    return page_index;
}
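
/*
 * Preload strategy: the first pass walks the metric's page index across the
 * requested time range and collects, with an exclusive reference, every page
 * that is in range but not yet populated (already-populated pages are left
 * alone). The second pass groups the collected descriptors by the on-disk
 * extent they belong to and issues one RRDENG_READ_EXTENT command per extent,
 * so pages stored together are read with a single I/O. If memory cannot be
 * reserved for all of them, the remaining references are dropped and the
 * corresponding reads are skipped.
 */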

/*
 * Searches for a page and gets a reference.
 * When point_in_time is INVALID_TIME get any page.
 * If index is NULL lookup by UUID (id).
 */
struct rrdeng_page_cache_descr *
pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
                usec_t point_in_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_cache_descr *descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;
    Word_t Index;
    uint8_t page_not_in_cache;

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
        if (likely(NULL != PValue)) {
            page_index = *PValue;
        }
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
        if (NULL == PValue) {
            return NULL;
        }
    } else {
        page_index = index;
    }
    pg_cache_reserve_pages(ctx, 1);

    page_not_in_cache = 0;
    uv_rwlock_rdlock(&page_index->lock);
    while (1) {
        Index = (Word_t)(point_in_time / USEC_PER_SEC);
        PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
        if (likely(NULL != PValue)) {
            descr = *PValue;
        }
        if (NULL == PValue ||
            0 == descr->page_length ||
            (INVALID_TIME != point_in_time &&
             !is_point_in_time_in_page(descr, point_in_time))) {
            /* non-empty page not found */
            uv_rwlock_rdunlock(&page_index->lock);

            pg_cache_release_pages(ctx, 1);
            return NULL;
        }
        uv_mutex_lock(&descr->mutex);
        flags = descr->flags;
        if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
            /* success */
            uv_mutex_unlock(&descr->mutex);
            debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
            break;
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            struct rrdeng_cmd cmd;

            uv_rwlock_rdunlock(&page_index->lock);

            cmd.opcode = RRDENG_READ_PAGE;
            cmd.read_page.page_cache_descr = descr;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);

            debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            while (!(descr->flags & RRD_PAGE_POPULATED)) {
                pg_cache_wait_event_unsafe(descr);
            }
            /* success */
            /* Downgrade exclusive reference to allow other readers */
            descr->flags &= ~RRD_PAGE_LOCKED;
            pg_cache_wake_up_waiters_unsafe(descr);
            uv_mutex_unlock(&descr->mutex);
            rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
            return descr;
        }
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
        if (unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        if (!(flags & RRD_PAGE_POPULATED))
            page_not_in_cache = 1;
        pg_cache_wait_event_unsafe(descr);
        uv_mutex_unlock(&descr->mutex);

        /* reset scan to find again */
        uv_rwlock_rdlock(&page_index->lock);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (!(flags & RRD_PAGE_DIRTY))
        pg_cache_replaceQ_set_hot(ctx, descr);
    pg_cache_release_pages(ctx, 1);
    if (page_not_in_cache)
        rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    else
        rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
    return descr;
}
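
/*
 * Lookup paths: a populated page yields a shared reference and counts as a
 * cache hit; a non-populated page is read synchronously from the caller's
 * point of view (an RRDENG_READ_PAGE command is queued and the caller sleeps
 * on the descriptor's condition variable until RRD_PAGE_POPULATED is set),
 * after which the exclusive reference taken for the read is downgraded to a
 * shared one and a cache miss is recorded. If the page is busy the search is
 * simply restarted after waiting. The page slot reserved up front with
 * pg_cache_reserve_pages() is released again on every path except the
 * asynchronous-read one, where the newly populated page keeps it.
 */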

struct pg_cache_page_index *create_page_index(uuid_t *id)
{
    struct pg_cache_page_index *page_index;

    page_index = mallocz(sizeof(*page_index));
    page_index->JudyL_array = (Pvoid_t) NULL;
    uuid_copy(page_index->id, *id);
    assert(0 == uv_rwlock_init(&page_index->lock));
    page_index->oldest_time = INVALID_TIME;
    page_index->latest_time = INVALID_TIME;

    return page_index;
}

static void init_metrics_index(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
    assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
}

static void init_replaceQ(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->replaceQ.head = NULL;
    pg_cache->replaceQ.tail = NULL;
    assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
}

static void init_commited_page_index(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->commited_page_index.JudyL_array = (Pvoid_t) NULL;
    assert(0 == uv_rwlock_init(&pg_cache->commited_page_index.lock));
    pg_cache->commited_page_index.latest_corr_id = 0;
    pg_cache->commited_page_index.nr_commited_pages = 0;
}

void init_page_cache(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->page_descriptors = 0;
    pg_cache->populated_pages = 0;
    assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));

    init_metrics_index(ctx);
    init_replaceQ(ctx);
    init_commited_page_index(ctx);
}