pagecache.c

// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS

#include "rrdengine.h"

/* Forward declarations */
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);

/* always inserts into tail */
static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (likely(NULL != pg_cache->replaceQ.tail)) {
        pg_cache_descr->prev = pg_cache->replaceQ.tail;
        pg_cache->replaceQ.tail->next = pg_cache_descr;
    }
    if (unlikely(NULL == pg_cache->replaceQ.head)) {
        pg_cache->replaceQ.head = pg_cache_descr;
    }
    pg_cache->replaceQ.tail = pg_cache_descr;
}

static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next;

    prev = pg_cache_descr->prev;
    next = pg_cache_descr->next;

    if (likely(NULL != prev)) {
        prev->next = next;
    }
    if (likely(NULL != next)) {
        next->prev = prev;
    }
    if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) {
        pg_cache->replaceQ.head = next;
    }
    if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) {
        pg_cache->replaceQ.tail = prev;
    }
    pg_cache_descr->prev = pg_cache_descr->next = NULL;
}

void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
                              struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_insert_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}

void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
                              struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_delete_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}

void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
                               struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_delete_unsafe(ctx, descr);
    pg_cache_replaceQ_insert_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
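
/*
 * The replaceQ behaves as an LRU list: insertions always go to the tail, and
 * the eviction scan in pg_cache_try_evict_one_page_unsafe() below walks from
 * the head, so the head holds the least-recently-used candidate. A minimal
 * usage sketch (illustrative only) for marking a clean, populated page as
 * recently used:
 *
 *     pg_cache_replaceQ_set_hot(ctx, descr);  // unlink, then re-append at the tail
 */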
struct rrdeng_page_descr *pg_cache_create_descr(void)
{
    struct rrdeng_page_descr *descr;

    descr = mallocz(sizeof(*descr));
    descr->page_length = 0;
    descr->start_time = INVALID_TIME;
    descr->end_time = INVALID_TIME;
    descr->id = NULL;
    descr->extent = NULL;
    descr->pg_cache_descr_state = 0;
    descr->pg_cache_descr = NULL;

    return descr;
}

/* The caller must hold page descriptor lock. */
void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (pg_cache_descr->waiters)
        uv_cond_broadcast(&pg_cache_descr->cond);
}

void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_wake_up_waiters_unsafe(descr);
    rrdeng_page_descr_mutex_unlock(ctx, descr);
}

/*
 * The caller must hold page descriptor lock.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 */
void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    ++pg_cache_descr->waiters;
    uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex);
    --pg_cache_descr->waiters;
}

/*
 * The caller must hold page descriptor lock.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 * Returns UV_ETIMEDOUT if timeout_sec seconds pass.
 */
int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec)
{
    int ret;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    ++pg_cache_descr->waiters;
    ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC);
    --pg_cache_descr->waiters;

    return ret;
}
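
/*
 * The two wait helpers above follow the usual condition-variable pattern: the
 * caller holds the per-descriptor mutex, the wait releases it while sleeping
 * and re-acquires it before returning, so any protected flag must be
 * re-checked in a loop. A minimal sketch of the pattern used throughout this
 * file (illustrative only):
 *
 *     rrdeng_page_descr_mutex_lock(ctx, descr);
 *     while (!(descr->pg_cache_descr->flags & RRD_PAGE_POPULATED))
 *         pg_cache_wait_event_unsafe(descr);   // wake-ups can be spurious, hence the loop
 *     rrdeng_page_descr_mutex_unlock(ctx, descr);
 */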
/*
 * Returns page flags.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 */
unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
    unsigned long flags;

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_wait_event_unsafe(descr);
    flags = pg_cache_descr->flags;
    rrdeng_page_descr_mutex_unlock(ctx, descr);

    return flags;
}

/*
 * The caller must hold page descriptor lock.
 */
int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
        (exclusive_access && pg_cache_descr->refcnt)) {
        return 0;
    }

    return 1;
}

/*
 * The caller must hold page descriptor lock.
 * Gets a reference to the page descriptor.
 * Returns 1 on success and 0 on failure.
 */
int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (!pg_cache_can_get_unsafe(descr, exclusive_access))
        return 0;

    if (exclusive_access)
        pg_cache_descr->flags |= RRD_PAGE_LOCKED;
    ++pg_cache_descr->refcnt;

    return 1;
}

/*
 * The caller must hold the page descriptor lock.
 * This function may block doing cleanup.
 */
void pg_cache_put_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
    if (0 == --pg_cache_descr->refcnt) {
        pg_cache_wake_up_waiters_unsafe(descr);
    }
}

/*
 * This function may block doing cleanup.
 */
void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_put_unsafe(descr);
    rrdeng_page_descr_mutex_unlock(ctx, descr);
}
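
/*
 * Reference counting in a nutshell: pg_cache_try_get_unsafe() bumps refcnt
 * (and sets RRD_PAGE_LOCKED when exclusive access is requested), while
 * pg_cache_put()/pg_cache_put_unsafe() drop it and wake waiters when it
 * reaches zero. A minimal sketch of the pairing (illustrative only):
 *
 *     rrdeng_page_descr_mutex_lock(ctx, descr);
 *     if (pg_cache_try_get_unsafe(descr, 0)) {   // shared reference
 *         rrdeng_page_descr_mutex_unlock(ctx, descr);
 *         ...                                    // read the page safely
 *         pg_cache_put(ctx, descr);              // release the reference
 *     } else {
 *         rrdeng_page_descr_mutex_unlock(ctx, descr);
 *     }
 */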
/* The caller must hold the page cache lock */
static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->populated_pages -= number;
}

static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    pg_cache_release_pages_unsafe(ctx, number);
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}

/*
 * This function returns the maximum number of pages allowed in the page cache.
 */
unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
{
    /* it's twice the number of producers since we pin 2 pages per producer */
    return ctx->max_cache_pages + 2 * (unsigned long)ctx->metric_API_max_producers;
}

/*
 * This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the
 * number of pages below that number.
 */
unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
{
    /* it's twice the number of producers since we pin 2 pages per producer */
    return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->metric_API_max_producers;
}

/*
 * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page
 * cache.
 */
unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx)
{
    /* We remove the active pages of the producers from the calculation and only allow the extra pinned pages */
    return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers;
}
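
/*
 * Worked example of the limits above, with purely hypothetical numbers (not
 * taken from any real configuration): with max_cache_pages = 1024,
 * cache_pages_low_watermark = 512 and metric_API_max_producers = 100,
 *
 *     pg_cache_hard_limit()           = 1024 + 2*100 = 1224 pages
 *     pg_cache_soft_limit()           =  512 + 2*100 =  712 pages
 *     pg_cache_committed_hard_limit() =  512 +   100 =  612 pages
 *
 * so reservations block (and force flushes/evictions) once populated_pages
 * would exceed the hard limit, while opportunistic eviction starts at the
 * soft limit.
 */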
/*
 * This function will block until it reserves #number populated pages.
 * It will trigger evictions or dirty page flushing if the pg_cache_hard_limit() limit is hit.
 */
static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned failures = 0;
    const unsigned FAILURES_CEILING = 10; /* truncates exponential backoff to (2^FAILURES_CEILING x slot) */
    unsigned long exp_backoff_slot_usec = USEC_PER_MS * 10;

    assert(number < ctx->max_cache_pages);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    if (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1)
        debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
              number);
    while (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) {
        if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
            /* failed to evict */
            struct completion compl;
            struct rrdeng_cmd cmd;

            ++failures;
            uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

            completion_init(&compl);
            cmd.opcode = RRDENG_FLUSH_PAGES;
            cmd.completion = &compl;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);
            /* wait for some pages to be flushed */
            debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__);
            completion_wait_for(&compl);
            completion_destroy(&compl);

            if (unlikely(failures > 1)) {
                unsigned long slots, usecs_to_sleep;
                /* exponential backoff */
                slots = random() % (2LU << MIN(failures, FAILURES_CEILING));
                usecs_to_sleep = slots * exp_backoff_slot_usec;
                if (usecs_to_sleep >= USEC_PER_SEC)
                    error("Page cache is full. Sleeping for %llu second(s).", usecs_to_sleep / USEC_PER_SEC);
                (void)sleep_usec(usecs_to_sleep);
            }
            uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
        }
    }
    pg_cache->populated_pages += number;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
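
/*
 * Backoff arithmetic for the loop above, as a quick sanity check: with a slot
 * of 10 ms (exp_backoff_slot_usec) and FAILURES_CEILING = 10, the k-th
 * consecutive failure picks a random number of slots in [0, 2^(k+1)), so the
 * sleep is capped at 2047 * 10 ms, roughly 20 seconds. The error() message is
 * only emitted once the randomly chosen sleep reaches a full second.
 */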
/*
 * This function will attempt to reserve #number populated pages.
 * It may trigger evictions if the pg_cache_soft_limit() limit is hit.
 * Returns 0 on failure and 1 on success.
 */
static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned count = 0;
    int ret = 0;

    assert(number < ctx->max_cache_pages);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    if (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1) {
        debug(D_RRDENGINE,
              "==Page cache full. Trying to reserve %u pages.==",
              number);
        do {
            if (!pg_cache_try_evict_one_page_unsafe(ctx))
                break;
            ++count;
        } while (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1);
        debug(D_RRDENGINE, "Evicted %u pages.", count);
    }

    if (pg_cache->populated_pages + number < pg_cache_hard_limit(ctx) + 1) {
        pg_cache->populated_pages += number;
        ret = 1; /* success */
    }
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

    return ret;
}

/* The caller must hold the page cache and the page descriptor locks in that order */
static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    dbengine_page_free(pg_cache_descr->page);
    pg_cache_descr->page = NULL;
    pg_cache_descr->flags &= ~RRD_PAGE_POPULATED;
    pg_cache_release_pages_unsafe(ctx, 1);
    ++ctx->stats.pg_cache_evictions;
}

/*
 * The caller must hold the page cache lock.
 * Lock order: page cache -> replaceQ -> page descriptor
 * This function iterates all pages and tries to evict one.
 *
 * Returns 1 on success and 0 on failure.
 */
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned long old_flags;
    struct rrdeng_page_descr *descr;
    struct page_cache_descr *pg_cache_descr = NULL;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) {
        descr = pg_cache_descr->descr;

        rrdeng_page_descr_mutex_lock(ctx, descr);
        old_flags = pg_cache_descr->flags;
        if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
            /* must evict */
            pg_cache_evict_unsafe(ctx, descr);
            pg_cache_put_unsafe(descr);
            pg_cache_replaceQ_delete_unsafe(ctx, descr);

            rrdeng_page_descr_mutex_unlock(ctx, descr);
            uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);

            rrdeng_try_deallocate_pg_cache_descr(ctx, descr);

            return 1;
        }
        rrdeng_page_descr_mutex_unlock(ctx, descr);
    }
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);

    /* failed to evict */
    return 0;
}
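
/*
 * Eviction eligibility, summarized: a page can be evicted only if it is
 * populated in memory, not dirty, and an exclusive reference can be taken,
 * i.e. nobody else holds a reference and the page is neither locked nor has a
 * read pending. Dirty pages are skipped here; pg_cache_reserve_pages() deals
 * with them by queueing an RRDENG_FLUSH_PAGES command and retrying.
 */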
/**
 * Deletes a page from the database.
 * Callers of this function need to make sure they're not deleting the same descriptor concurrently.
 * @param ctx is the database instance.
 * @param descr is the page descriptor.
 * @param remove_dirty must be non-zero if the page to be deleted is dirty.
 * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference.
 * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not
 *        NULL. Otherwise, metric_id is not set.
 * @return 1 if it's safe to delete the metric, 0 otherwise.
 */
uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
                            uint8_t is_exclusive_holder, uuid_t *metric_id)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = NULL;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    int ret;
    uint8_t can_delete_metric = 0;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
    fatal_assert(NULL != PValue);
    page_index = *PValue;
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);

    uv_rwlock_wrlock(&page_index->lock);
    ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
    if (unlikely(0 == ret)) {
        uv_rwlock_wrunlock(&page_index->lock);
        if (unlikely(debug_flags & D_RRDENGINE)) {
            print_page_descr(descr);
        }
        goto destroy;
    }
    --page_index->page_count;
    if (!page_index->writers && !page_index->page_count) {
        can_delete_metric = 1;
        if (metric_id) {
            memcpy(metric_id, page_index->id, sizeof(uuid_t));
        }
    }
    uv_rwlock_wrunlock(&page_index->lock);
    fatal_assert(1 == ret);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    ++ctx->stats.pg_cache_deletions;
    --pg_cache->page_descriptors;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_descr = descr->pg_cache_descr;
    if (!is_exclusive_holder) {
        /* If we don't hold an exclusive page reference get one */
        while (!pg_cache_try_get_unsafe(descr, 1)) {
            debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            pg_cache_wait_event_unsafe(descr);
        }
    }
    if (remove_dirty) {
        pg_cache_descr->flags &= ~RRD_PAGE_DIRTY;
    } else {
        /* even a locked page could be dirty */
        while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
            debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            pg_cache_wait_event_unsafe(descr);
        }
    }
    rrdeng_page_descr_mutex_unlock(ctx, descr);

    if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
        /* only after locking can it be safely deleted from LRU */
        pg_cache_replaceQ_delete(ctx, descr);

        uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
        pg_cache_evict_unsafe(ctx, descr);
        uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
    }
    pg_cache_put(ctx, descr);
    rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
    while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
        rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */
        (void)sleep_usec(1000); /* 1 msec */
    }
destroy:
    freez(descr);
    pg_cache_update_metric_times(page_index);

    return can_delete_metric;
}
static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time)
{
    usec_t pg_start, pg_end;

    pg_start = descr->start_time;
    pg_end = descr->end_time;

    return (pg_start < start_time && pg_end >= start_time) ||
           (pg_start >= start_time && pg_start <= end_time);
}

static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time)
{
    return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
}

/* The caller must hold the page index lock */
static inline struct rrdeng_page_descr *
find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time)
{
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    Word_t Index;

    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
        if (is_page_in_time_range(descr, start_time, end_time)) {
            return descr;
        }
    }

    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
        if (is_page_in_time_range(descr, start_time, end_time)) {
            return descr;
        }
    }

    return NULL;
}
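
/*
 * The page index maps start_time (in seconds) to page descriptors, so the
 * helper above probes it twice: JudyLLast() finds the last page starting at
 * or before start_time (it may still overlap the range, since pages span an
 * interval), and if that one does not overlap, JudyLFirst() finds the first
 * page starting at or after start_time. Example with hypothetical pages
 * [0s,9s] and [10s,19s] and a query range [5s,12s]: the first probe returns
 * the [0s,9s] page, which overlaps the range and is returned immediately.
 */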
/* Update metric oldest and latest timestamps efficiently when adding new values */
void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
{
    usec_t oldest_time = page_index->oldest_time;
    usec_t latest_time = page_index->latest_time;

    if (unlikely(oldest_time == INVALID_TIME || descr->start_time < oldest_time)) {
        page_index->oldest_time = descr->start_time;
    }
    if (likely(descr->end_time > latest_time || latest_time == INVALID_TIME)) {
        page_index->latest_time = descr->end_time;
    }
}

/* Update metric oldest and latest timestamps when removing old values */
void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
{
    Pvoid_t *firstPValue, *lastPValue;
    Word_t firstIndex, lastIndex;
    struct rrdeng_page_descr *descr;
    usec_t oldest_time = INVALID_TIME;
    usec_t latest_time = INVALID_TIME;

    uv_rwlock_rdlock(&page_index->lock);
    /* Find first page in range */
    firstIndex = (Word_t)0;
    firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0);
    if (likely(NULL != firstPValue)) {
        descr = *firstPValue;
        oldest_time = descr->start_time;
    }
    lastIndex = (Word_t)-1;
    lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0);
    if (likely(NULL != lastPValue)) {
        descr = *lastPValue;
        latest_time = descr->end_time;
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (unlikely(NULL == firstPValue)) {
        fatal_assert(NULL == lastPValue);
        page_index->oldest_time = page_index->latest_time = INVALID_TIME;
        return;
    }
    page_index->oldest_time = oldest_time;
    page_index->latest_time = latest_time;
}

/* If index is NULL lookup by UUID (descr->id) */
void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
                     struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;
    unsigned long pg_cache_descr_state = descr->pg_cache_descr_state;

    if (0 != pg_cache_descr_state) {
        /* there is page cache descriptor pre-allocated state */
        struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

        fatal_assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
        if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
            pg_cache_reserve_pages(ctx, 1);
            if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY))
                pg_cache_replaceQ_insert(ctx, descr);
        }
    }

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
        fatal_assert(NULL != PValue);
        page_index = *PValue;
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    } else {
        page_index = index;
    }

    uv_rwlock_wrlock(&page_index->lock);
    PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
    *PValue = descr;
    ++page_index->page_count;
    pg_cache_add_new_metric_time(page_index, descr);
    uv_rwlock_wrunlock(&page_index->lock);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    ++ctx->stats.pg_cache_insertions;
    ++pg_cache->page_descriptors;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
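
/*
 * Typical insertion flow, as a hedged sketch (the fields shown are the ones
 * initialized by pg_cache_create_descr(); the UUID, times and page data used
 * here are hypothetical and must be set by the caller, and the metric's
 * page_index must already exist in metrics_index when passing a NULL index):
 *
 *     struct rrdeng_page_descr *descr = pg_cache_create_descr();
 *     descr->id = &some_metric_uuid;      // hypothetical UUID owned by the caller
 *     descr->start_time = t0;
 *     descr->end_time = t1;
 *     pg_cache_insert(ctx, NULL, descr);  // NULL index => look up the page_index by UUID
 *
 * The page index keys pages by descr->start_time / USEC_PER_SEC, i.e. one
 * slot per second of start time.
 */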
usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    if (NULL == PValue) {
        return INVALID_TIME;
    }

    uv_rwlock_rdlock(&page_index->lock);
    descr = find_first_page_in_time_range(page_index, start_time, end_time);
    if (NULL == descr) {
        uv_rwlock_rdunlock(&page_index->lock);
        return INVALID_TIME;
    }
    uv_rwlock_rdunlock(&page_index->lock);

    return descr->start_time;
}

/**
 * Return page information for the first page before point_in_time that satisfies the filter.
 * @param ctx DB context
 * @param page_index page index of a metric
 * @param point_in_time the pages that are searched must be older than this timestamp
 * @param filter decides if the page satisfies the caller's criteria
 * @param page_info the result of the search is set in this pointer
 */
void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
                                     usec_t point_in_time, pg_cache_page_info_filter_t *filter,
                                     struct rrdeng_page_info *page_info)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    Word_t Index;

    (void)pg_cache;
    fatal_assert(NULL != page_index);

    Index = (Word_t)(point_in_time / USEC_PER_SEC);
    uv_rwlock_rdlock(&page_index->lock);
    do {
        PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0);
        descr = unlikely(NULL == PValue) ? NULL : *PValue;
    } while (descr != NULL && !filter(descr));
    if (unlikely(NULL == descr)) {
        page_info->page_length = 0;
        page_info->start_time = INVALID_TIME;
        page_info->end_time = INVALID_TIME;
    } else {
        page_info->page_length = descr->page_length;
        page_info->start_time = descr->start_time;
        page_info->end_time = descr->end_time;
    }
    uv_rwlock_rdunlock(&page_index->lock);
}

/**
 * Searches for an unallocated page without triggering disk I/O. Attempts to reserve the page and get a reference.
 * @param ctx DB context
 * @param id lookup by UUID
 * @param start_time exact starting time in usec
 * @return the page descriptor or NULL on failure. It can fail if:
 *         1. The page is already allocated to the page cache.
 *         2. It did not succeed to get a reference.
 *         3. It did not succeed to reserve a spot in the page cache.
 */
struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id,
                                                               usec_t start_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    Word_t Index;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);

    if ((NULL == PValue) || !pg_cache_try_reserve_pages(ctx, 1)) {
        /* Failed to find page or failed to reserve a spot in the cache */
        return NULL;
    }

    uv_rwlock_rdlock(&page_index->lock);
    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLGet(page_index->JudyL_array, Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
    }
    if (NULL == PValue || 0 == descr->page_length) {
        /* Failed to find non-empty page */
        uv_rwlock_rdunlock(&page_index->lock);

        pg_cache_release_pages(ctx, 1);
        return NULL;
    }

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_descr = descr->pg_cache_descr;
    flags = pg_cache_descr->flags;
    uv_rwlock_rdunlock(&page_index->lock);

    if ((flags & RRD_PAGE_POPULATED) || !pg_cache_try_get_unsafe(descr, 1)) {
        /* Failed to get reference or page is already populated */
        rrdeng_page_descr_mutex_unlock(ctx, descr);

        pg_cache_release_pages(ctx, 1);
        return NULL;
    }
    /* success */
    rrdeng_page_descr_mutex_unlock(ctx, descr);
    rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    return descr;
}
/**
 * Searches for pages in a time range and triggers disk I/O if necessary and possible.
 * Does not get a reference.
 * @param ctx DB context
 * @param id UUID
 * @param start_time inclusive starting time in usec
 * @param end_time inclusive ending time in usec
 * @param page_info_arrayp It allocates (*page_info_arrayp) and populates it with information of pages that overlap
 *        with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez().
 *        If page_info_arrayp is NULL, nothing is allocated.
 * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
 * @return the number of pages that overlap with the time range [start_time,end_time].
 */
unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
                          struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned i, j, k, preload_count, count, page_info_array_max_size;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    Word_t Index;
    uint8_t failed_to_reserve;

    fatal_assert(NULL != ret_page_indexp);

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        *ret_page_indexp = page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    if (NULL == PValue) {
        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
        *ret_page_indexp = NULL;
        return 0;
    }

    uv_rwlock_rdlock(&page_index->lock);
    descr = find_first_page_in_time_range(page_index, start_time, end_time);
    if (NULL == descr) {
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
        *ret_page_indexp = NULL;
        return 0;
    } else {
        Index = (Word_t)(descr->start_time / USEC_PER_SEC);
    }
    if (page_info_arrayp) {
        page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
        *page_info_arrayp = mallocz(page_info_array_max_size);
    }

    for (count = 0, preload_count = 0 ;
         descr != NULL && is_page_in_time_range(descr, start_time, end_time) ;
         PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
         descr = unlikely(NULL == PValue) ? NULL : *PValue) {
        /* Iterate all pages in range */

        if (unlikely(0 == descr->page_length))
            continue;
        if (page_info_arrayp) {
            if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) {
                page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
                *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size);
            }
            (*page_info_arrayp)[count].start_time = descr->start_time;
            (*page_info_arrayp)[count].end_time = descr->end_time;
            (*page_info_arrayp)[count].page_length = descr->page_length;
        }
        ++count;

        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if (pg_cache_can_get_unsafe(descr, 0)) {
            if (flags & RRD_PAGE_POPULATED) {
                /* success */
                rrdeng_page_descr_mutex_unlock(ctx, descr);
                debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
                continue;
            }
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            preload_array[preload_count++] = descr;
            if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) {
                rrdeng_page_descr_mutex_unlock(ctx, descr);
                break;
            }
        }
        rrdeng_page_descr_mutex_unlock(ctx, descr);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    failed_to_reserve = 0;
    for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) {
        struct rrdeng_cmd cmd;
        struct rrdeng_page_descr *next;

        descr = preload_array[i];
        if (NULL == descr) {
            continue;
        }
        if (!pg_cache_try_reserve_pages(ctx, 1)) {
            failed_to_reserve = 1;
            break;
        }
        cmd.opcode = RRDENG_READ_EXTENT;
        cmd.read_extent.page_cache_descr[0] = descr;
        /* don't use this page again */
        preload_array[i] = NULL;
        for (j = 0, k = 1 ; j < preload_count ; ++j) {
            next = preload_array[j];
            if (NULL == next) {
                continue;
            }
            if (descr->extent == next->extent) {
                /* same extent, consolidate */
                if (!pg_cache_try_reserve_pages(ctx, 1)) {
                    failed_to_reserve = 1;
                    break;
                }
                cmd.read_extent.page_cache_descr[k++] = next;
                /* don't use this page again */
                preload_array[j] = NULL;
            }
        }
        cmd.read_extent.page_count = k;
        rrdeng_enq_cmd(&ctx->worker_config, &cmd);
    }
    if (failed_to_reserve) {
        debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
        for (i = 0 ; i < preload_count ; ++i) {
            descr = preload_array[i];
            if (NULL == descr) {
                continue;
            }
            pg_cache_put(ctx, descr);
        }
    }
    if (!preload_count) {
        /* no such page */
        debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
    }
    if (unlikely(0 == count && page_info_arrayp)) {
        freez(*page_info_arrayp);
        *page_info_arrayp = NULL;
    }
    return count;
}
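
/*
 * Preload I/O batching, in short: pages whose descriptors point to the same
 * on-disk extent are consolidated into a single RRDENG_READ_EXTENT command,
 * so one disk read can populate several cached pages. For example, if three
 * of the selected pages share one extent and two share another, the loop
 * above enqueues two read commands (with page_count 3 and 2) instead of five
 * single-page reads.
 */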
/*
 * Searches for a page and gets a reference.
 * When point_in_time is INVALID_TIME get any page.
 * If index is NULL lookup by UUID (id).
 */
struct rrdeng_page_descr *
pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
                usec_t point_in_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    Word_t Index;
    uint8_t page_not_in_cache;

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
        if (likely(NULL != PValue)) {
            page_index = *PValue;
        }
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
        if (NULL == PValue) {
            return NULL;
        }
    } else {
        page_index = index;
    }
    pg_cache_reserve_pages(ctx, 1);

    page_not_in_cache = 0;
    uv_rwlock_rdlock(&page_index->lock);
    while (1) {
        Index = (Word_t)(point_in_time / USEC_PER_SEC);
        PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
        if (likely(NULL != PValue)) {
            descr = *PValue;
        }
        if (NULL == PValue ||
            0 == descr->page_length ||
            (INVALID_TIME != point_in_time &&
             !is_point_in_time_in_page(descr, point_in_time))) {
            /* non-empty page not found */
            uv_rwlock_rdunlock(&page_index->lock);

            pg_cache_release_pages(ctx, 1);
            return NULL;
        }
        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
            /* success */
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
            break;
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            struct rrdeng_cmd cmd;

            uv_rwlock_rdunlock(&page_index->lock);

            cmd.opcode = RRDENG_READ_PAGE;
            cmd.read_page.page_cache_descr = descr;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);

            debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
                pg_cache_wait_event_unsafe(descr);
            }
            /* success */
            /* Downgrade exclusive reference to allow other readers */
            pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
            pg_cache_wake_up_waiters_unsafe(descr);
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
            return descr;
        }
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
        if (unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        if (!(flags & RRD_PAGE_POPULATED))
            page_not_in_cache = 1;
        pg_cache_wait_event_unsafe(descr);
        rrdeng_page_descr_mutex_unlock(ctx, descr);

        /* reset scan to find again */
        uv_rwlock_rdlock(&page_index->lock);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (!(flags & RRD_PAGE_DIRTY))
        pg_cache_replaceQ_set_hot(ctx, descr);
    pg_cache_release_pages(ctx, 1);
    if (page_not_in_cache)
        rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    else
        rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
    return descr;
}
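
/*
 * Read path summary for pg_cache_lookup(): a cache hit takes a shared
 * reference and touches the page in the LRU (pg_cache_replaceQ_set_hot); a
 * miss takes an exclusive reference, queues an RRDENG_READ_PAGE command to
 * the engine worker, waits for RRD_PAGE_POPULATED, then drops RRD_PAGE_LOCKED
 * so other readers can share the page while the caller keeps its reference.
 * The page reservation taken up front is kept on the miss path (it accounts
 * for the page about to be populated) and released on the hit path, since the
 * page was already counted as populated.
 */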
/*
 * Searches for the first page between start_time and end_time and gets a reference.
 * start_time and end_time are inclusive.
 * If index is NULL lookup by UUID (id).
 */
struct rrdeng_page_descr *
pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
                     usec_t start_time, usec_t end_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    uint8_t page_not_in_cache;

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
        if (likely(NULL != PValue)) {
            page_index = *PValue;
        }
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
        if (NULL == PValue) {
            return NULL;
        }
    } else {
        page_index = index;
    }
    pg_cache_reserve_pages(ctx, 1);

    page_not_in_cache = 0;
    uv_rwlock_rdlock(&page_index->lock);
    int retry_count = 0;
    while (1) {
        descr = find_first_page_in_time_range(page_index, start_time, end_time);
        if (NULL == descr || 0 == descr->page_length || retry_count == MAX_PAGE_CACHE_RETRY_WAIT) {
            /* non-empty page not found */
            if (retry_count == MAX_PAGE_CACHE_RETRY_WAIT)
                error_report("Page cache timeout while waiting for page %p : returning FAIL", descr);
            uv_rwlock_rdunlock(&page_index->lock);

            pg_cache_release_pages(ctx, 1);
            return NULL;
        }
        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
            /* success */
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
            break;
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            struct rrdeng_cmd cmd;

            uv_rwlock_rdunlock(&page_index->lock);

            cmd.opcode = RRDENG_READ_PAGE;
            cmd.read_page.page_cache_descr = descr;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);

            debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
                pg_cache_wait_event_unsafe(descr);
            }
            /* success */
            /* Downgrade exclusive reference to allow other readers */
            pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
            pg_cache_wake_up_waiters_unsafe(descr);
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
            return descr;
        }
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
        if (unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        if (!(flags & RRD_PAGE_POPULATED))
            page_not_in_cache = 1;

        if (pg_cache_timedwait_event_unsafe(descr, 1) == UV_ETIMEDOUT) {
            error_report("Page cache timeout while waiting for page %p : retry count = %d", descr, retry_count);
            ++retry_count;
        }
        rrdeng_page_descr_mutex_unlock(ctx, descr);

        /* reset scan to find again */
        uv_rwlock_rdlock(&page_index->lock);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (!(flags & RRD_PAGE_DIRTY))
        pg_cache_replaceQ_set_hot(ctx, descr);
    pg_cache_release_pages(ctx, 1);
    if (page_not_in_cache)
        rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    else
        rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
    return descr;
}
struct pg_cache_page_index *create_page_index(uuid_t *id)
{
    struct pg_cache_page_index *page_index;

    page_index = mallocz(sizeof(*page_index));
    page_index->JudyL_array = (Pvoid_t) NULL;
    uuid_copy(page_index->id, *id);
    fatal_assert(0 == uv_rwlock_init(&page_index->lock));
    page_index->oldest_time = INVALID_TIME;
    page_index->latest_time = INVALID_TIME;
    page_index->prev = NULL;
    page_index->page_count = 0;
    page_index->writers = 0;

    return page_index;
}

static void init_metrics_index(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
    pg_cache->metrics_index.last_page_index = NULL;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
}

static void init_replaceQ(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->replaceQ.head = NULL;
    pg_cache->replaceQ.tail = NULL;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
}

static void init_committed_page_index(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock));
    pg_cache->committed_page_index.latest_corr_id = 0;
    pg_cache->committed_page_index.nr_committed_pages = 0;
}

void init_page_cache(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->page_descriptors = 0;
    pg_cache->populated_pages = 0;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));

    init_metrics_index(ctx);
    init_replaceQ(ctx);
    init_committed_page_index(ctx);
}

void free_page_cache(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Word_t ret_Judy, bytes_freed = 0;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index, *prev_page_index;
    Word_t Index;
    struct rrdeng_page_descr *descr;
    struct page_cache_descr *pg_cache_descr;

    /* Free committed page index */
    ret_Judy = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0);
    fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array);
    bytes_freed += ret_Judy;

    for (page_index = pg_cache->metrics_index.last_page_index ;
         page_index != NULL ;
         page_index = prev_page_index) {
        prev_page_index = page_index->prev;

        /* Find first page in range */
        Index = (Word_t) 0;
        PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
        descr = unlikely(NULL == PValue) ? NULL : *PValue;

        while (descr != NULL) {
            /* Iterate all page descriptors of this metric */

            if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
                /* Check rrdenglocking.c */
                pg_cache_descr = descr->pg_cache_descr;
                if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
                    dbengine_page_free(pg_cache_descr->page);
                    bytes_freed += RRDENG_BLOCK_SIZE;
                }
                rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
                bytes_freed += sizeof(*pg_cache_descr);
            }
            freez(descr);
            bytes_freed += sizeof(*descr);

            PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0);
            descr = unlikely(NULL == PValue) ? NULL : *PValue;
        }

        /* Free page index */
        ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0);
        fatal_assert(NULL == page_index->JudyL_array);
        bytes_freed += ret_Judy;
        freez(page_index);
        bytes_freed += sizeof(*page_index);
    }
    /* Free metrics index */
    ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
    fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array);
    bytes_freed += ret_Judy;

    info("Freed %lu bytes of memory from page cache.", bytes_freed);
}