// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS

#include "rrdengine.h"

/* Forward declarations */
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);

/* always inserts into tail */
static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (likely(NULL != pg_cache->replaceQ.tail)) {
        pg_cache_descr->prev = pg_cache->replaceQ.tail;
        pg_cache->replaceQ.tail->next = pg_cache_descr;
    }
    if (unlikely(NULL == pg_cache->replaceQ.head)) {
        pg_cache->replaceQ.head = pg_cache_descr;
    }
    pg_cache->replaceQ.tail = pg_cache_descr;
}

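/* Unlinks a page from the replacement queue. The caller must hold the replaceQ lock. */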
static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next;

    prev = pg_cache_descr->prev;
    next = pg_cache_descr->next;

    if (likely(NULL != prev)) {
        prev->next = next;
    }
    if (likely(NULL != next)) {
        next->prev = prev;
    }
    if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) {
        pg_cache->replaceQ.head = next;
    }
    if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) {
        pg_cache->replaceQ.tail = prev;
    }
    pg_cache_descr->prev = pg_cache_descr->next = NULL;
}

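/* Locked wrappers around the replacement queue helpers above. */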
void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
                              struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_insert_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}

void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
                              struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_delete_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}

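/* Moves a page to the tail of the replacement queue, marking it as most recently used. */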
void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
                               struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_delete_unsafe(ctx, descr);
    pg_cache_replaceQ_insert_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}

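/* Allocates a page descriptor and initializes it to an empty, not-yet-indexed state. */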
struct rrdeng_page_descr *pg_cache_create_descr(void)
{
    struct rrdeng_page_descr *descr;

    descr = mallocz(sizeof(*descr));
    descr->page_length = 0;
    descr->start_time = INVALID_TIME;
    descr->end_time = INVALID_TIME;
    descr->id = NULL;
    descr->extent = NULL;
    descr->pg_cache_descr_state = 0;
    descr->pg_cache_descr = NULL;

    return descr;
}

/* The caller must hold page descriptor lock. */
void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (pg_cache_descr->waiters)
        uv_cond_broadcast(&pg_cache_descr->cond);
}

void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_wake_up_waiters_unsafe(descr);
    rrdeng_page_descr_mutex_unlock(ctx, descr);
}

/*
 * The caller must hold page descriptor lock.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 */
void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    ++pg_cache_descr->waiters;
    uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex);
    --pg_cache_descr->waiters;
}

/*
 * The caller must hold page descriptor lock.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 * Returns UV_ETIMEDOUT if timeout_sec seconds pass.
 */
int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec)
{
    int ret;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    ++pg_cache_descr->waiters;
    ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC);
    --pg_cache_descr->waiters;

    return ret;
}

/*
 * Returns page flags.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 */
unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
    unsigned long flags;

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_wait_event_unsafe(descr);
    flags = pg_cache_descr->flags;
    rrdeng_page_descr_mutex_unlock(ctx, descr);

    return flags;
}

/*
 * The caller must hold page descriptor lock.
 */
int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
        (exclusive_access && pg_cache_descr->refcnt)) {
        return 0;
    }

    return 1;
}

/*
 * The caller must hold page descriptor lock.
 * Gets a reference to the page descriptor.
 * Returns 1 on success and 0 on failure.
 */
int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (!pg_cache_can_get_unsafe(descr, exclusive_access))
        return 0;

    if (exclusive_access)
        pg_cache_descr->flags |= RRD_PAGE_LOCKED;
    ++pg_cache_descr->refcnt;

    return 1;
}

/*
 * The caller must hold the page descriptor lock.
 * This function may block doing cleanup.
 */
void pg_cache_put_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
    if (0 == --pg_cache_descr->refcnt) {
        pg_cache_wake_up_waiters_unsafe(descr);
    }
}

/*
 * This function may block doing cleanup.
 */
void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_put_unsafe(descr);
    rrdeng_page_descr_mutex_unlock(ctx, descr);
}

/* The caller must hold the page cache lock */
static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->populated_pages -= number;
}

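/* Locked wrapper that releases the accounting for #number populated pages. */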
static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    pg_cache_release_pages_unsafe(ctx, number);
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}

/*
 * This function returns the maximum number of pages allowed in the page cache.
 */
unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
{
    /* it's twice the number of producers since we pin 2 pages per producer */
    return ctx->max_cache_pages + 2 * (unsigned long)ctx->metric_API_max_producers;
}

/*
 * This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the
 * number of pages below that number.
 */
unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
{
    /* it's twice the number of producers since we pin 2 pages per producer */
    return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->metric_API_max_producers;
}

/*
 * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page
 * cache.
 */
unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx)
{
    /* We remove the active pages of the producers from the calculation and only allow the extra pinned pages */
    return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers;
}

/*
 * This function will block until it reserves #number populated pages.
 * It will trigger evictions or dirty page flushing if the pg_cache_hard_limit() limit is hit.
 */
static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned failures = 0;
    const unsigned FAILURES_CEILING = 10; /* truncates exponential backoff to (2^FAILURES_CEILING x slot) */
    unsigned long exp_backoff_slot_usec = USEC_PER_MS * 10;

    assert(number < ctx->max_cache_pages);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    if (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1)
        debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
              number);
    while (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) {

        if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
            /* failed to evict */
            struct completion compl;
            struct rrdeng_cmd cmd;

            ++failures;
            uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

            init_completion(&compl);
            cmd.opcode = RRDENG_FLUSH_PAGES;
            cmd.completion = &compl;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);
            /* wait for some pages to be flushed */
            debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__);
            wait_for_completion(&compl);
            destroy_completion(&compl);

            if (unlikely(failures > 1)) {
                unsigned long slots;
                /* exponential backoff */
                slots = random() % (2LU << MIN(failures, FAILURES_CEILING));
                (void)sleep_usec(slots * exp_backoff_slot_usec);
            }
            uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
        }
    }
    pg_cache->populated_pages += number;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}

/*
 * This function will attempt to reserve #number populated pages.
 * It may trigger evictions if the pg_cache_soft_limit() limit is hit.
 * Returns 0 on failure and 1 on success.
 */
static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned count = 0;
    int ret = 0;

    assert(number < ctx->max_cache_pages);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    if (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1) {
        debug(D_RRDENGINE,
              "==Page cache full. Trying to reserve %u pages.==",
              number);
        do {
            if (!pg_cache_try_evict_one_page_unsafe(ctx))
                break;
            ++count;
        } while (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1);
        debug(D_RRDENGINE, "Evicted %u pages.", count);
    }

    if (pg_cache->populated_pages + number < pg_cache_hard_limit(ctx) + 1) {
        pg_cache->populated_pages += number;
        ret = 1; /* success */
    }
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

    return ret;
}

/* The caller must hold the page cache and the page descriptor locks in that order */
static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    freez(pg_cache_descr->page);
    pg_cache_descr->page = NULL;
    pg_cache_descr->flags &= ~RRD_PAGE_POPULATED;
    pg_cache_release_pages_unsafe(ctx, 1);
    ++ctx->stats.pg_cache_evictions;
}

/*
 * The caller must hold the page cache lock.
 * Lock order: page cache -> replaceQ -> page descriptor
 * This function iterates the replacement queue and evicts the first page
 * that is populated, clean, and not referenced by anyone else.
 *
 * Returns 1 on success and 0 on failure.
 */
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned long old_flags;
    struct rrdeng_page_descr *descr;
    struct page_cache_descr *pg_cache_descr = NULL;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) {
        descr = pg_cache_descr->descr;

        rrdeng_page_descr_mutex_lock(ctx, descr);
        old_flags = pg_cache_descr->flags;
        if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
            /* must evict */
            pg_cache_evict_unsafe(ctx, descr);
            pg_cache_put_unsafe(descr);
            pg_cache_replaceQ_delete_unsafe(ctx, descr);

            rrdeng_page_descr_mutex_unlock(ctx, descr);
            uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);

            rrdeng_try_deallocate_pg_cache_descr(ctx, descr);

            return 1;
        }
        rrdeng_page_descr_mutex_unlock(ctx, descr);
    }
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);

    /* failed to evict */
    return 0;
}

/**
 * Deletes a page from the database.
 * Callers of this function need to make sure they're not deleting the same descriptor concurrently.
 * @param ctx is the database instance.
 * @param descr is the page descriptor.
 * @param remove_dirty must be non-zero if the page to be deleted is dirty.
 * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference.
 * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not
 *        NULL. Otherwise, metric_id is not set.
 * @return 1 if it's safe to delete the metric, 0 otherwise.
 */
uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
                            uint8_t is_exclusive_holder, uuid_t *metric_id)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = NULL;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    int ret;
    uint8_t can_delete_metric = 0;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
    fatal_assert(NULL != PValue);
    page_index = *PValue;
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);

    uv_rwlock_wrlock(&page_index->lock);
    ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
    if (unlikely(0 == ret)) {
        uv_rwlock_wrunlock(&page_index->lock);
        error("Page under deletion was not in index.");
        if (unlikely(debug_flags & D_RRDENGINE)) {
            print_page_descr(descr);
        }
        goto destroy;
    }
    --page_index->page_count;
    if (!page_index->writers && !page_index->page_count) {
        can_delete_metric = 1;
        if (metric_id) {
            memcpy(metric_id, page_index->id, sizeof(uuid_t));
        }
    }
    uv_rwlock_wrunlock(&page_index->lock);
    fatal_assert(1 == ret);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    ++ctx->stats.pg_cache_deletions;
    --pg_cache->page_descriptors;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_descr = descr->pg_cache_descr;
    if (!is_exclusive_holder) {
        /* If we don't hold an exclusive page reference get one */
        while (!pg_cache_try_get_unsafe(descr, 1)) {
            debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            pg_cache_wait_event_unsafe(descr);
        }
    }
    if (remove_dirty) {
        pg_cache_descr->flags &= ~RRD_PAGE_DIRTY;
    } else {
        /* even a locked page could be dirty */
        while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
            debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            pg_cache_wait_event_unsafe(descr);
        }
    }
    rrdeng_page_descr_mutex_unlock(ctx, descr);

    if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
        /* only after locking can it be safely deleted from LRU */
        pg_cache_replaceQ_delete(ctx, descr);

        uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
        pg_cache_evict_unsafe(ctx, descr);
        uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
    }
    pg_cache_put(ctx, descr);
    rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
    while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
        rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */
        (void)sleep_usec(1000); /* 1 msec */
    }
destroy:
    freez(descr);
    pg_cache_update_metric_times(page_index);

    return can_delete_metric;
}

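/* Helpers that test whether a page overlaps a time range or contains a point in time. */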
static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time)
{
    usec_t pg_start, pg_end;

    pg_start = descr->start_time;
    pg_end = descr->end_time;

    return (pg_start < start_time && pg_end >= start_time) ||
           (pg_start >= start_time && pg_start <= end_time);
}

static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time)
{
    return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
}

/* The caller must hold the page index lock */
static inline struct rrdeng_page_descr *
find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time)
{
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    Word_t Index;

    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
        if (is_page_in_time_range(descr, start_time, end_time)) {
            return descr;
        }
    }

    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
        if (is_page_in_time_range(descr, start_time, end_time)) {
            return descr;
        }
    }

    return NULL;
}

/* Update metric oldest and latest timestamps efficiently when adding new values */
void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
{
    usec_t oldest_time = page_index->oldest_time;
    usec_t latest_time = page_index->latest_time;

    if (unlikely(oldest_time == INVALID_TIME || descr->start_time < oldest_time)) {
        page_index->oldest_time = descr->start_time;
    }
    if (likely(descr->end_time > latest_time || latest_time == INVALID_TIME)) {
        page_index->latest_time = descr->end_time;
    }
}

/* Update metric oldest and latest timestamps when removing old values */
void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
{
    Pvoid_t *firstPValue, *lastPValue;
    Word_t firstIndex, lastIndex;
    struct rrdeng_page_descr *descr;
    usec_t oldest_time = INVALID_TIME;
    usec_t latest_time = INVALID_TIME;

    uv_rwlock_rdlock(&page_index->lock);
    /* Find first page in range */
    firstIndex = (Word_t)0;
    firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0);
    if (likely(NULL != firstPValue)) {
        descr = *firstPValue;
        oldest_time = descr->start_time;
    }
    lastIndex = (Word_t)-1;
    lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0);
    if (likely(NULL != lastPValue)) {
        descr = *lastPValue;
        latest_time = descr->end_time;
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (unlikely(NULL == firstPValue)) {
        fatal_assert(NULL == lastPValue);
        page_index->oldest_time = page_index->latest_time = INVALID_TIME;
        return;
    }

    page_index->oldest_time = oldest_time;
    page_index->latest_time = latest_time;
}

/* If index is NULL lookup by UUID (descr->id) */
void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
                     struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;
    unsigned long pg_cache_descr_state = descr->pg_cache_descr_state;

    if (0 != pg_cache_descr_state) {
        /* there is page cache descriptor pre-allocated state */
        struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

        fatal_assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
        if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
            pg_cache_reserve_pages(ctx, 1);
            if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY))
                pg_cache_replaceQ_insert(ctx, descr);
        }
    }

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
        fatal_assert(NULL != PValue);
        page_index = *PValue;
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    } else {
        page_index = index;
    }

    uv_rwlock_wrlock(&page_index->lock);
    PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
    *PValue = descr;
    ++page_index->page_count;
    pg_cache_add_new_metric_time(page_index, descr);
    uv_rwlock_wrunlock(&page_index->lock);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    ++ctx->stats.pg_cache_insertions;
    ++pg_cache->page_descriptors;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}

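/*
 * Returns the start time of the first page of the metric that overlaps the
 * [start_time, end_time] interval, or INVALID_TIME if no such page is indexed.
 */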
usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    if (NULL == PValue) {
        return INVALID_TIME;
    }

    uv_rwlock_rdlock(&page_index->lock);
    descr = find_first_page_in_time_range(page_index, start_time, end_time);
    if (NULL == descr) {
        uv_rwlock_rdunlock(&page_index->lock);
        return INVALID_TIME;
    }
    uv_rwlock_rdunlock(&page_index->lock);

    return descr->start_time;
}

/**
 * Return page information for the first page before point_in_time that satisfies the filter.
 * @param ctx DB context
 * @param page_index page index of a metric
 * @param point_in_time the pages that are searched must be older than this timestamp
 * @param filter decides if the page satisfies the caller's criteria
 * @param page_info the result of the search is set in this pointer
 */
void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
                                     usec_t point_in_time, pg_cache_page_info_filter_t *filter,
                                     struct rrdeng_page_info *page_info)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    Word_t Index;

    (void)pg_cache;
    fatal_assert(NULL != page_index);

    Index = (Word_t)(point_in_time / USEC_PER_SEC);
    uv_rwlock_rdlock(&page_index->lock);
    do {
        PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0);
        descr = unlikely(NULL == PValue) ? NULL : *PValue;
    } while (descr != NULL && !filter(descr));
    if (unlikely(NULL == descr)) {
        page_info->page_length = 0;
        page_info->start_time = INVALID_TIME;
        page_info->end_time = INVALID_TIME;
    } else {
        page_info->page_length = descr->page_length;
        page_info->start_time = descr->start_time;
        page_info->end_time = descr->end_time;
    }
    uv_rwlock_rdunlock(&page_index->lock);
}

/**
 * Searches for pages in a time range and triggers disk I/O if necessary and possible.
 * Does not get a reference.
 * @param ctx DB context
 * @param id UUID
 * @param start_time inclusive starting time in usec
 * @param end_time inclusive ending time in usec
 * @param page_info_arrayp It allocates (*page_info_arrayp) and populates it with information of pages that overlap
 *        with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez().
 *        If (*page_info_arrayp) is set to NULL nothing was allocated.
 * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
 * @return the number of pages that overlap with the time range [start_time,end_time].
 */
unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
                          struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned i, j, k, preload_count, count, page_info_array_max_size;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    Word_t Index;
    uint8_t failed_to_reserve;

    fatal_assert(NULL != ret_page_indexp);

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        *ret_page_indexp = page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    if (NULL == PValue) {
        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
        *ret_page_indexp = NULL;
        return 0;
    }

    uv_rwlock_rdlock(&page_index->lock);
    descr = find_first_page_in_time_range(page_index, start_time, end_time);
    if (NULL == descr) {
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
        *ret_page_indexp = NULL;
        return 0;
    } else {
        Index = (Word_t)(descr->start_time / USEC_PER_SEC);
    }
    if (page_info_arrayp) {
        page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
        *page_info_arrayp = mallocz(page_info_array_max_size);
    }

    for (count = 0, preload_count = 0 ;
         descr != NULL && is_page_in_time_range(descr, start_time, end_time) ;
         PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
         descr = unlikely(NULL == PValue) ? NULL : *PValue) {
        /* Iterate all pages in range */

        if (unlikely(0 == descr->page_length))
            continue;
        if (page_info_arrayp) {
            if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) {
                page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
                *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size);
            }
            (*page_info_arrayp)[count].start_time = descr->start_time;
            (*page_info_arrayp)[count].end_time = descr->end_time;
            (*page_info_arrayp)[count].page_length = descr->page_length;
        }
        ++count;

        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if (pg_cache_can_get_unsafe(descr, 0)) {
            if (flags & RRD_PAGE_POPULATED) {
                /* success */
                rrdeng_page_descr_mutex_unlock(ctx, descr);
                debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
                continue;
            }
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            preload_array[preload_count++] = descr;
            if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) {
                rrdeng_page_descr_mutex_unlock(ctx, descr);
                break;
            }
        }
        rrdeng_page_descr_mutex_unlock(ctx, descr);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    failed_to_reserve = 0;
    for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) {
        struct rrdeng_cmd cmd;
        struct rrdeng_page_descr *next;

        descr = preload_array[i];
        if (NULL == descr) {
            continue;
        }
        if (!pg_cache_try_reserve_pages(ctx, 1)) {
            failed_to_reserve = 1;
            break;
        }
        cmd.opcode = RRDENG_READ_EXTENT;
        cmd.read_extent.page_cache_descr[0] = descr;
        /* don't use this page again */
        preload_array[i] = NULL;
        for (j = 0, k = 1 ; j < preload_count ; ++j) {
            next = preload_array[j];
            if (NULL == next) {
                continue;
            }
            if (descr->extent == next->extent) {
                /* same extent, consolidate */
                if (!pg_cache_try_reserve_pages(ctx, 1)) {
                    failed_to_reserve = 1;
                    break;
                }
                cmd.read_extent.page_cache_descr[k++] = next;
                /* don't use this page again */
                preload_array[j] = NULL;
            }
        }
        cmd.read_extent.page_count = k;
        rrdeng_enq_cmd(&ctx->worker_config, &cmd);
    }
    if (failed_to_reserve) {
        debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
        for (i = 0 ; i < preload_count ; ++i) {
            descr = preload_array[i];
            if (NULL == descr) {
                continue;
            }
            pg_cache_put(ctx, descr);
        }
    }
    if (!preload_count) {
        /* no such page */
        debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
    }
    if (unlikely(0 == count && page_info_arrayp)) {
        freez(*page_info_arrayp);
        *page_info_arrayp = NULL;
    }
    return count;
}

/*
 * Searches for a page and gets a reference.
 * When point_in_time is INVALID_TIME get any page.
 * If index is NULL lookup by UUID (id).
 */
struct rrdeng_page_descr *
pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
                usec_t point_in_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    Word_t Index;
    uint8_t page_not_in_cache;

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
        if (likely(NULL != PValue)) {
            page_index = *PValue;
        }
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
        if (NULL == PValue) {
            return NULL;
        }
    } else {
        page_index = index;
    }
    pg_cache_reserve_pages(ctx, 1);

    page_not_in_cache = 0;
    uv_rwlock_rdlock(&page_index->lock);
    while (1) {
        Index = (Word_t)(point_in_time / USEC_PER_SEC);
        PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
        if (likely(NULL != PValue)) {
            descr = *PValue;
        }
        if (NULL == PValue ||
            0 == descr->page_length ||
            (INVALID_TIME != point_in_time &&
             !is_point_in_time_in_page(descr, point_in_time))) {
            /* non-empty page not found */
            uv_rwlock_rdunlock(&page_index->lock);

            pg_cache_release_pages(ctx, 1);
            return NULL;
        }
        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
            /* success */
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
            break;
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            struct rrdeng_cmd cmd;

            uv_rwlock_rdunlock(&page_index->lock);

            cmd.opcode = RRDENG_READ_PAGE;
            cmd.read_page.page_cache_descr = descr;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);

            debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
                pg_cache_wait_event_unsafe(descr);
            }
            /* success */
            /* Downgrade exclusive reference to allow other readers */
            pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
            pg_cache_wake_up_waiters_unsafe(descr);
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
            return descr;
        }
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
        if (unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        if (!(flags & RRD_PAGE_POPULATED))
            page_not_in_cache = 1;
        pg_cache_wait_event_unsafe(descr);
        rrdeng_page_descr_mutex_unlock(ctx, descr);

        /* reset scan to find again */
        uv_rwlock_rdlock(&page_index->lock);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (!(flags & RRD_PAGE_DIRTY))
        pg_cache_replaceQ_set_hot(ctx, descr);
    pg_cache_release_pages(ctx, 1);
    if (page_not_in_cache)
        rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    else
        rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
    return descr;
}

/*
 * Searches for the first page between start_time and end_time and gets a reference.
 * start_time and end_time are inclusive.
 * If index is NULL lookup by UUID (id).
 */
struct rrdeng_page_descr *
pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
                     usec_t start_time, usec_t end_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    uint8_t page_not_in_cache;

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
        if (likely(NULL != PValue)) {
            page_index = *PValue;
        }
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
        if (NULL == PValue) {
            return NULL;
        }
    } else {
        page_index = index;
    }
    pg_cache_reserve_pages(ctx, 1);

    page_not_in_cache = 0;
    uv_rwlock_rdlock(&page_index->lock);
    while (1) {
        descr = find_first_page_in_time_range(page_index, start_time, end_time);
        if (NULL == descr || 0 == descr->page_length) {
            /* non-empty page not found */
            uv_rwlock_rdunlock(&page_index->lock);

            pg_cache_release_pages(ctx, 1);
            return NULL;
        }
        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
            /* success */
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
            break;
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            struct rrdeng_cmd cmd;

            uv_rwlock_rdunlock(&page_index->lock);

            cmd.opcode = RRDENG_READ_PAGE;
            cmd.read_page.page_cache_descr = descr;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);

            debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
                pg_cache_wait_event_unsafe(descr);
            }
            /* success */
            /* Downgrade exclusive reference to allow other readers */
            pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
            pg_cache_wake_up_waiters_unsafe(descr);
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
            return descr;
        }
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
        if (unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        if (!(flags & RRD_PAGE_POPULATED))
            page_not_in_cache = 1;
        pg_cache_wait_event_unsafe(descr);
        rrdeng_page_descr_mutex_unlock(ctx, descr);

        /* reset scan to find again */
        uv_rwlock_rdlock(&page_index->lock);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (!(flags & RRD_PAGE_DIRTY))
        pg_cache_replaceQ_set_hot(ctx, descr);
    pg_cache_release_pages(ctx, 1);
    if (page_not_in_cache)
        rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    else
        rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
    return descr;
}

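/* Allocates and initializes an empty page index for the metric identified by id. */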
struct pg_cache_page_index *create_page_index(uuid_t *id)
{
    struct pg_cache_page_index *page_index;

    page_index = mallocz(sizeof(*page_index));
    page_index->JudyL_array = (Pvoid_t) NULL;
    uuid_copy(page_index->id, *id);
    fatal_assert(0 == uv_rwlock_init(&page_index->lock));
    page_index->oldest_time = INVALID_TIME;
    page_index->latest_time = INVALID_TIME;
    page_index->prev = NULL;
    page_index->page_count = 0;
    page_index->writers = 0;

    return page_index;
}

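/* Initializers for the page cache sub-structures: the metrics index, the LRU replacement queue, and the committed page index. */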
static void init_metrics_index(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
    pg_cache->metrics_index.last_page_index = NULL;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
}

static void init_replaceQ(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->replaceQ.head = NULL;
    pg_cache->replaceQ.tail = NULL;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
}

static void init_committed_page_index(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock));
    pg_cache->committed_page_index.latest_corr_id = 0;
    pg_cache->committed_page_index.nr_committed_pages = 0;
}

void init_page_cache(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->page_descriptors = 0;
    pg_cache->populated_pages = 0;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));

    init_metrics_index(ctx);
    init_replaceQ(ctx);
    init_committed_page_index(ctx);
}

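/*
 * Frees every page descriptor, page cache descriptor and populated page of
 * every metric, then the per-metric page indexes and the metrics index itself,
 * and logs the total amount of memory released.
 */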
void free_page_cache(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Word_t ret_Judy, bytes_freed = 0;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index, *prev_page_index;
    Word_t Index;
    struct rrdeng_page_descr *descr;
    struct page_cache_descr *pg_cache_descr;

    /* Free committed page index */
    ret_Judy = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0);
    fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array);
    bytes_freed += ret_Judy;

    for (page_index = pg_cache->metrics_index.last_page_index ;
         page_index != NULL ;
         page_index = prev_page_index) {
        prev_page_index = page_index->prev;

        /* Find first page in range */
        Index = (Word_t) 0;
        PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
        descr = unlikely(NULL == PValue) ? NULL : *PValue;

        while (descr != NULL) {
            /* Iterate all page descriptors of this metric */

            if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
                /* Check rrdenglocking.c */
                pg_cache_descr = descr->pg_cache_descr;
                if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
                    freez(pg_cache_descr->page);
                    bytes_freed += RRDENG_BLOCK_SIZE;
                }
                rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
                bytes_freed += sizeof(*pg_cache_descr);
            }
            freez(descr);
            bytes_freed += sizeof(*descr);

            PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0);
            descr = unlikely(NULL == PValue) ? NULL : *PValue;
        }

        /* Free page index */
        ret_Judy = JudyLFreeArray(&page_index->JudyL_array, PJE0);
        fatal_assert(NULL == page_index->JudyL_array);
        bytes_freed += ret_Judy;
        freez(page_index);
        bytes_freed += sizeof(*page_index);
    }
    /* Free metrics index */
    ret_Judy = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
    fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array);
    bytes_freed += ret_Judy;

    info("Freed %lu bytes of memory from page cache.", bytes_freed);
}