pagecache.c
// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS

#include "rrdengine.h"

/* Forward declarations */
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);

/* always inserts into tail */
static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (likely(NULL != pg_cache->replaceQ.tail)) {
        pg_cache_descr->prev = pg_cache->replaceQ.tail;
        pg_cache->replaceQ.tail->next = pg_cache_descr;
    }
    if (unlikely(NULL == pg_cache->replaceQ.head)) {
        pg_cache->replaceQ.head = pg_cache_descr;
    }
    pg_cache->replaceQ.tail = pg_cache_descr;
}

static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
                                                   struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next;

    prev = pg_cache_descr->prev;
    next = pg_cache_descr->next;

    if (likely(NULL != prev)) {
        prev->next = next;
    }
    if (likely(NULL != next)) {
        next->prev = prev;
    }
    if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) {
        pg_cache->replaceQ.head = next;
    }
    if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) {
        pg_cache->replaceQ.tail = prev;
    }
    pg_cache_descr->prev = pg_cache_descr->next = NULL;
}

void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
                              struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_insert_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}

void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
                              struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_delete_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}

void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
                               struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    pg_cache_replaceQ_delete_unsafe(ctx, descr);
    pg_cache_replaceQ_insert_unsafe(ctx, descr);
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
}
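
/*
 * Illustrative sketch (not compiled): the replaceQ is the LRU replacement
 * queue. Insertions always go to the tail, so the head holds the least
 * recently used page and eviction scans head-first (see
 * pg_cache_try_evict_one_page_unsafe() below). A cache hit "touches" a page
 * by moving it back to the tail:
 *
 *     pg_cache_replaceQ_set_hot(ctx, descr);  // delete + re-insert at tail
 */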
struct rrdeng_page_descr *pg_cache_create_descr(void)
{
    struct rrdeng_page_descr *descr;

    descr = mallocz(sizeof(*descr));
    descr->page_length = 0;
    descr->start_time = INVALID_TIME;
    descr->end_time = INVALID_TIME;
    descr->id = NULL;
    descr->extent = NULL;
    descr->pg_cache_descr_state = 0;
    descr->pg_cache_descr = NULL;

    return descr;
}

/* The caller must hold page descriptor lock. */
void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (pg_cache_descr->waiters)
        uv_cond_broadcast(&pg_cache_descr->cond);
}

void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_wake_up_waiters_unsafe(descr);
    rrdeng_page_descr_mutex_unlock(ctx, descr);
}

/*
 * The caller must hold page descriptor lock.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 */
void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    ++pg_cache_descr->waiters;
    uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex);
    --pg_cache_descr->waiters;
}

/*
 * The caller must hold page descriptor lock.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 * Returns UV_ETIMEDOUT if timeout_sec seconds pass.
 */
int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec)
{
    int ret;
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    ++pg_cache_descr->waiters;
    ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC);
    --pg_cache_descr->waiters;

    return ret;
}
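
/*
 * Illustrative wait loop (not compiled): condition waits must re-check their
 * predicate, since uv_cond_timedwait() can also wake spuriously. A caller
 * waiting for a page to be populated, with a hypothetical timeout_sec bound,
 * would look like:
 *
 *     rrdeng_page_descr_mutex_lock(ctx, descr);
 *     while (!(descr->pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
 *         if (UV_ETIMEDOUT == pg_cache_timedwait_event_unsafe(descr, timeout_sec))
 *             break;  // give up after timeout_sec seconds
 *     }
 *     rrdeng_page_descr_mutex_unlock(ctx, descr);
 */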
/*
 * Returns page flags.
 * The lock will be released and re-acquired. The descriptor is not guaranteed
 * to exist after this function returns.
 */
unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
    unsigned long flags;

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_wait_event_unsafe(descr);
    flags = pg_cache_descr->flags;
    rrdeng_page_descr_mutex_unlock(ctx, descr);

    return flags;
}

/*
 * The caller must hold page descriptor lock.
 */
int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
        (exclusive_access && pg_cache_descr->refcnt)) {
        return 0;
    }

    return 1;
}

/*
 * The caller must hold page descriptor lock.
 * Gets a reference to the page descriptor.
 * Returns 1 on success and 0 on failure.
 */
int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    if (!pg_cache_can_get_unsafe(descr, exclusive_access))
        return 0;

    if (exclusive_access)
        pg_cache_descr->flags |= RRD_PAGE_LOCKED;
    ++pg_cache_descr->refcnt;

    return 1;
}

/*
 * The caller must hold the page descriptor lock.
 * This function may block doing cleanup.
 */
void pg_cache_put_unsafe(struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
    if (0 == --pg_cache_descr->refcnt) {
        pg_cache_wake_up_waiters_unsafe(descr);
    }
}

/*
 * This function may block doing cleanup.
 */
void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_put_unsafe(descr);
    rrdeng_page_descr_mutex_unlock(ctx, descr);
}
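
/*
 * Illustrative pin/unpin pattern (not compiled): pass exclusive_access == 0
 * for a shared reference, 1 for an exclusive one. Every successful try-get
 * must be paired with a put:
 *
 *     rrdeng_page_descr_mutex_lock(ctx, descr);
 *     int got = pg_cache_try_get_unsafe(descr, 0);
 *     rrdeng_page_descr_mutex_unlock(ctx, descr);
 *     if (got) {
 *         // ... read the page ...
 *         pg_cache_put(ctx, descr);  // drops the reference, wakes waiters
 *     }
 */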
/* The caller must hold the page cache lock */
static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->populated_pages -= number;
}

static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    pg_cache_release_pages_unsafe(ctx, number);
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}

/*
 * This function returns the maximum number of pages allowed in the page cache.
 */
unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
{
    return ctx->max_cache_pages + (unsigned long)ctx->metric_API_max_producers;
}

/*
 * This function returns the low watermark number of pages in the page cache. The page cache should strive to keep
 * the number of pages below that number.
 */
unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
{
    return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers;
}

/*
 * This function returns the maximum number of dirty pages, committed to be written to disk, that are allowed in the
 * page cache.
 */
unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx)
{
    /* We remove the active pages of the producers from the calculation and only allow the extra pinned pages */
    return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers;
}
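
/*
 * Worked example with illustrative numbers (not defaults taken from this
 * file): with max_cache_pages = 4096, cache_pages_low_watermark = 3072 and
 * metric_API_max_producers = 1000, pg_cache_hard_limit() is 4096 + 1000 =
 * 5096 pages and pg_cache_soft_limit() is 3072 + 1000 = 4072 pages. Each
 * limit budgets one extra page per concurrently collected metric on top of
 * the configured cache size.
 */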
/*
 * This function will block until it reserves #number populated pages.
 * It will trigger evictions or dirty page flushing if the pg_cache_hard_limit() limit is hit.
 */
static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned failures = 0;
    const unsigned FAILURES_CEILING = 10; /* truncates exponential backoff to (2^FAILURES_CEILING x slot) */
    unsigned long exp_backoff_slot_usec = USEC_PER_MS * 10;

    assert(number < ctx->max_cache_pages);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    if (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1)
        debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
              number);
    while (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) {

        if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
            /* failed to evict */
            struct completion compl;
            struct rrdeng_cmd cmd;

            ++failures;
            uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

            completion_init(&compl);
            cmd.opcode = RRDENG_FLUSH_PAGES;
            cmd.completion = &compl;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);
            /* wait for some pages to be flushed */
            debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__);
            completion_wait_for(&compl);
            completion_destroy(&compl);

            if (unlikely(failures > 1)) {
                unsigned long slots, usecs_to_sleep;
                /* exponential backoff */
                slots = random() % (2LU << MIN(failures, FAILURES_CEILING));
                usecs_to_sleep = slots * exp_backoff_slot_usec;

                if (usecs_to_sleep >= USEC_PER_SEC)
                    error("Page cache is full. Sleeping for %llu second(s).", usecs_to_sleep / USEC_PER_SEC);

                (void)sleep_usec(usecs_to_sleep);
            }
            uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
        }
    }
    pg_cache->populated_pages += number;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
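
/*
 * Backoff arithmetic, worked through (illustrative): with the 10 msec slot
 * above, after failures = 3 the slot count is drawn from
 * random() % (2LU << 3), i.e. 0..15 slots, a sleep of at most 150 msec.
 * The shift is capped at FAILURES_CEILING = 10, so a single sleep never
 * exceeds (2^11 - 1) * 10 msec, roughly 20 seconds.
 */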
/*
 * This function will attempt to reserve #number populated pages.
 * It may trigger evictions if the pg_cache_soft_limit() limit is hit.
 * Returns 0 on failure and 1 on success.
 */
static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned count = 0;
    int ret = 0;

    assert(number < ctx->max_cache_pages);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    if (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1) {
        debug(D_RRDENGINE,
              "==Page cache full. Trying to reserve %u pages.==",
              number);
        do {
            if (!pg_cache_try_evict_one_page_unsafe(ctx))
                break;
            ++count;
        } while (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1);
        debug(D_RRDENGINE, "Evicted %u pages.", count);
    }

    if (pg_cache->populated_pages + number < pg_cache_hard_limit(ctx) + 1) {
        pg_cache->populated_pages += number;
        ret = 1; /* success */
    }
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

    return ret;
}
/* The caller must hold the page cache and the page descriptor locks in that order */
static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
{
    struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

    dbengine_page_free(pg_cache_descr->page);
    pg_cache_descr->page = NULL;
    pg_cache_descr->flags &= ~RRD_PAGE_POPULATED;
    pg_cache_release_pages_unsafe(ctx, 1);
    ++ctx->stats.pg_cache_evictions;
}
/*
 * The caller must hold the page cache lock.
 * Lock order: page cache -> replaceQ -> page descriptor
 * This function iterates the pages of the replacement queue and tries to evict one.
 * Returns 1 on success and 0 on failure.
 */
static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned long old_flags;
    struct rrdeng_page_descr *descr;
    struct page_cache_descr *pg_cache_descr = NULL;

    uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
    for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) {
        descr = pg_cache_descr->descr;

        rrdeng_page_descr_mutex_lock(ctx, descr);
        old_flags = pg_cache_descr->flags;
        if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
            /* must evict */
            pg_cache_evict_unsafe(ctx, descr);
            pg_cache_put_unsafe(descr);
            pg_cache_replaceQ_delete_unsafe(ctx, descr);

            rrdeng_page_descr_mutex_unlock(ctx, descr);
            uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);

            rrdeng_try_deallocate_pg_cache_descr(ctx, descr);

            return 1;
        }
        rrdeng_page_descr_mutex_unlock(ctx, descr);
    }
    uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);

    /* failed to evict */
    return 0;
}
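
/*
 * Eligibility, summarized (illustrative): the scan above can evict a page
 * only when
 *
 *     (flags & RRD_PAGE_POPULATED) && !(flags & RRD_PAGE_DIRTY) && refcnt == 0
 *
 * i.e. its data is in memory, it is not waiting to be flushed, and nobody
 * holds a reference; the exclusive try-get enforces the refcnt == 0 part.
 */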
/**
 * Deletes a page from the database.
 * Callers of this function need to make sure they're not deleting the same descriptor concurrently.
 * @param ctx is the database instance.
 * @param descr is the page descriptor.
 * @param remove_dirty must be non-zero if the page to be deleted is dirty.
 * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference.
 * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not
 *        NULL. Otherwise, metric_id is not set.
 * @return 1 if it's safe to delete the metric, 0 otherwise.
 */
uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
                            uint8_t is_exclusive_holder, uuid_t *metric_id)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct page_cache_descr *pg_cache_descr = NULL;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    int ret;
    uint8_t can_delete_metric = 0;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
    fatal_assert(NULL != PValue);
    page_index = *PValue;
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);

    uv_rwlock_wrlock(&page_index->lock);
    ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
    if (unlikely(0 == ret)) {
        uv_rwlock_wrunlock(&page_index->lock);
        if (unlikely(debug_flags & D_RRDENGINE)) {
            print_page_descr(descr);
        }
        goto destroy;
    }
    --page_index->page_count;
    if (!page_index->writers && !page_index->page_count) {
        can_delete_metric = 1;
        if (metric_id) {
            memcpy(metric_id, page_index->id, sizeof(uuid_t));
        }
    }
    uv_rwlock_wrunlock(&page_index->lock);
    fatal_assert(1 == ret);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    ++ctx->stats.pg_cache_deletions;
    --pg_cache->page_descriptors;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_descr = descr->pg_cache_descr;
    if (!is_exclusive_holder) {
        /* If we don't hold an exclusive page reference get one */
        while (!pg_cache_try_get_unsafe(descr, 1)) {
            debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            pg_cache_wait_event_unsafe(descr);
        }
    }
    if (remove_dirty) {
        pg_cache_descr->flags &= ~RRD_PAGE_DIRTY;
    } else {
        /* even a locked page could be dirty */
        while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
            debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            pg_cache_wait_event_unsafe(descr);
        }
    }
    rrdeng_page_descr_mutex_unlock(ctx, descr);

    if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
        /* only after locking can it be safely deleted from LRU */
        pg_cache_replaceQ_delete(ctx, descr);

        uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
        pg_cache_evict_unsafe(ctx, descr);
        uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
    }
    pg_cache_put(ctx, descr);
    rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
    while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
        rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */
        (void)sleep_usec(1000); /* 1 msec */
    }
destroy:
    freez(descr);
    pg_cache_update_metric_times(page_index);

    return can_delete_metric;
}
static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time)
{
    usec_t pg_start, pg_end;

    pg_start = descr->start_time;
    pg_end = descr->end_time;

    return (pg_start < start_time && pg_end >= start_time) ||
           (pg_start >= start_time && pg_start <= end_time);
}

static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time)
{
    return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
}

/* The caller must hold the page index lock */
static inline struct rrdeng_page_descr *
find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time)
{
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    Word_t Index;

    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
        if (is_page_in_time_range(descr, start_time, end_time)) {
            return descr;
        }
    }

    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
        if (is_page_in_time_range(descr, start_time, end_time)) {
            return descr;
        }
    }

    return NULL;
}
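
/*
 * Worked example with illustrative timestamps: pages are indexed by start
 * time in seconds. For pages starting at 100 s, 200 s and 300 s and a query
 * for [250 s, 400 s], JudyLLast() from Index = 250 lands on the page at
 * 200 s, which matches only if it extends past 250 s; otherwise JudyLFirst()
 * from Index = 250 lands on the page at 300 s, the first page that starts
 * inside the range.
 */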
/* Update metric oldest and latest timestamps efficiently when adding new values */
void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
{
    usec_t oldest_time = page_index->oldest_time;
    usec_t latest_time = page_index->latest_time;

    if (unlikely(oldest_time == INVALID_TIME || descr->start_time < oldest_time)) {
        page_index->oldest_time = descr->start_time;
    }
    if (likely(descr->end_time > latest_time || latest_time == INVALID_TIME)) {
        page_index->latest_time = descr->end_time;
    }
}

/* Update metric oldest and latest timestamps when removing old values */
void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
{
    Pvoid_t *firstPValue, *lastPValue;
    Word_t firstIndex, lastIndex;
    struct rrdeng_page_descr *descr;
    usec_t oldest_time = INVALID_TIME;
    usec_t latest_time = INVALID_TIME;

    uv_rwlock_rdlock(&page_index->lock);
    /* Find first page in range */
    firstIndex = (Word_t)0;
    firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0);
    if (likely(NULL != firstPValue)) {
        descr = *firstPValue;
        oldest_time = descr->start_time;
    }
    lastIndex = (Word_t)-1;
    lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0);
    if (likely(NULL != lastPValue)) {
        descr = *lastPValue;
        latest_time = descr->end_time;
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (unlikely(NULL == firstPValue)) {
        fatal_assert(NULL == lastPValue);
        page_index->oldest_time = page_index->latest_time = INVALID_TIME;
        return;
    }
    page_index->oldest_time = oldest_time;
    page_index->latest_time = latest_time;
}
/* If index is NULL lookup by UUID (descr->id) */
void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
                     struct rrdeng_page_descr *descr)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index;
    unsigned long pg_cache_descr_state = descr->pg_cache_descr_state;

    if (0 != pg_cache_descr_state) {
        /* there is page cache descriptor pre-allocated state */
        struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;

        fatal_assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
        if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
            pg_cache_reserve_pages(ctx, 1);
            if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY))
                pg_cache_replaceQ_insert(ctx, descr);
        }
    }

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
        fatal_assert(NULL != PValue);
        page_index = *PValue;
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    } else {
        page_index = index;
    }

    uv_rwlock_wrlock(&page_index->lock);
    PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
    *PValue = descr;
    ++page_index->page_count;
    pg_cache_add_new_metric_time(page_index, descr);
    uv_rwlock_wrunlock(&page_index->lock);

    uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
    ++ctx->stats.pg_cache_insertions;
    ++pg_cache->page_descriptors;
    uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
}
usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    if (NULL == PValue) {
        return INVALID_TIME;
    }

    uv_rwlock_rdlock(&page_index->lock);
    descr = find_first_page_in_time_range(page_index, start_time, end_time);
    if (NULL == descr) {
        uv_rwlock_rdunlock(&page_index->lock);
        return INVALID_TIME;
    }
    uv_rwlock_rdunlock(&page_index->lock);
    return descr->start_time;
}
/**
 * Return page information for the first page before point_in_time that satisfies the filter.
 * @param ctx DB context
 * @param page_index page index of a metric
 * @param point_in_time the pages that are searched must be older than this timestamp
 * @param filter decides if the page satisfies the caller's criteria
 * @param page_info the result of the search is set in this pointer
 */
void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
                                     usec_t point_in_time, pg_cache_page_info_filter_t *filter,
                                     struct rrdeng_page_info *page_info)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    Pvoid_t *PValue;
    Word_t Index;

    (void)pg_cache;
    fatal_assert(NULL != page_index);

    Index = (Word_t)(point_in_time / USEC_PER_SEC);
    uv_rwlock_rdlock(&page_index->lock);
    do {
        PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0);
        descr = unlikely(NULL == PValue) ? NULL : *PValue;
    } while (descr != NULL && !filter(descr));
    if (unlikely(NULL == descr)) {
        page_info->page_length = 0;
        page_info->start_time = INVALID_TIME;
        page_info->end_time = INVALID_TIME;
    } else {
        page_info->page_length = descr->page_length;
        page_info->start_time = descr->start_time;
        page_info->end_time = descr->end_time;
    }
    uv_rwlock_rdunlock(&page_index->lock);
}
/**
 * Searches for an unallocated page without triggering disk I/O. Attempts to reserve the page and get a reference.
 * @param ctx DB context
 * @param id lookup by UUID
 * @param start_time exact starting time in usec
 * @return the page descriptor or NULL on failure. It can fail if:
 *         1. The page is already allocated to the page cache.
 *         2. It did not succeed to get a reference.
 *         3. It did not succeed to reserve a spot in the page cache.
 */
struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id,
                                                               usec_t start_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    Word_t Index;

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);

    if ((NULL == PValue) || !pg_cache_try_reserve_pages(ctx, 1)) {
        /* Failed to find page or failed to reserve a spot in the cache */
        return NULL;
    }

    uv_rwlock_rdlock(&page_index->lock);
    Index = (Word_t)(start_time / USEC_PER_SEC);
    PValue = JudyLGet(page_index->JudyL_array, Index, PJE0);
    if (likely(NULL != PValue)) {
        descr = *PValue;
    }
    if (NULL == PValue || 0 == descr->page_length) {
        /* Failed to find non-empty page */
        uv_rwlock_rdunlock(&page_index->lock);

        pg_cache_release_pages(ctx, 1);
        return NULL;
    }

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_descr = descr->pg_cache_descr;
    flags = pg_cache_descr->flags;
    uv_rwlock_rdunlock(&page_index->lock);

    if ((flags & RRD_PAGE_POPULATED) || !pg_cache_try_get_unsafe(descr, 1)) {
        /* Failed to get reference or page is already populated */
        rrdeng_page_descr_mutex_unlock(ctx, descr);

        pg_cache_release_pages(ctx, 1);
        return NULL;
    }
    /* success */
    rrdeng_page_descr_mutex_unlock(ctx, descr);
    rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    return descr;
}
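
/*
 * Illustrative caller (not compiled): the exclusive reference returned above
 * is typically used to fill in the page data and is then released:
 *
 *     struct rrdeng_page_descr *descr =
 *         pg_cache_lookup_unpopulated_and_lock(ctx, &uuid, start_time);
 *     if (NULL != descr) {
 *         // ... populate descr->pg_cache_descr->page ...
 *         pg_cache_put(ctx, descr);  // drop the exclusive reference
 *     }
 */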
/**
 * Searches for pages in a time range and triggers disk I/O if necessary and possible.
 * Does not get a reference.
 * @param ctx DB context
 * @param id UUID
 * @param start_time inclusive starting time in usec
 * @param end_time inclusive ending time in usec
 * @param page_info_arrayp It allocates (*page_info_arrayp) and populates it with information of pages that overlap
 *        with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez().
 *        If (*page_info_arrayp) is NULL on return, nothing was allocated.
 * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
 * @return the number of pages that overlap with the time range [start_time,end_time].
 */
unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time,
                          struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned i, j, k, preload_count, count, page_info_array_max_size;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    Word_t Index;
    uint8_t failed_to_reserve;

    fatal_assert(NULL != ret_page_indexp);

    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        *ret_page_indexp = page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
    if (NULL == PValue) {
        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
        *ret_page_indexp = NULL;
        return 0;
    }

    uv_rwlock_rdlock(&page_index->lock);
    descr = find_first_page_in_time_range(page_index, start_time, end_time);
    if (NULL == descr) {
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
        *ret_page_indexp = NULL;
        return 0;
    } else {
        Index = (Word_t)(descr->start_time / USEC_PER_SEC);
    }
    if (page_info_arrayp) {
        page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
        *page_info_arrayp = mallocz(page_info_array_max_size);
    }

    for (count = 0, preload_count = 0 ;
         descr != NULL && is_page_in_time_range(descr, start_time, end_time) ;
         PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
         descr = unlikely(NULL == PValue) ? NULL : *PValue) {
        /* Iterate all pages in range */

        if (unlikely(0 == descr->page_length))
            continue;
        if (page_info_arrayp) {
            if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) {
                page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
                *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size);
            }
            (*page_info_arrayp)[count].start_time = descr->start_time;
            (*page_info_arrayp)[count].end_time = descr->end_time;
            (*page_info_arrayp)[count].page_length = descr->page_length;
        }
        ++count;

        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if (pg_cache_can_get_unsafe(descr, 0)) {
            if (flags & RRD_PAGE_POPULATED) {
                /* success */
                rrdeng_page_descr_mutex_unlock(ctx, descr);
                debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
                continue;
            }
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            preload_array[preload_count++] = descr;
            if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) {
                rrdeng_page_descr_mutex_unlock(ctx, descr);
                break;
            }
        }
        rrdeng_page_descr_mutex_unlock(ctx, descr);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    failed_to_reserve = 0;
    for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) {
        struct rrdeng_cmd cmd;
        struct rrdeng_page_descr *next;

        descr = preload_array[i];
        if (NULL == descr) {
            continue;
        }
        if (!pg_cache_try_reserve_pages(ctx, 1)) {
            failed_to_reserve = 1;
            break;
        }
        cmd.opcode = RRDENG_READ_EXTENT;
        cmd.read_extent.page_cache_descr[0] = descr;
        /* don't use this page again */
        preload_array[i] = NULL;
        for (j = 0, k = 1 ; j < preload_count ; ++j) {
            next = preload_array[j];
            if (NULL == next) {
                continue;
            }
            if (descr->extent == next->extent) {
                /* same extent, consolidate */
                if (!pg_cache_try_reserve_pages(ctx, 1)) {
                    failed_to_reserve = 1;
                    break;
                }
                cmd.read_extent.page_cache_descr[k++] = next;
                /* don't use this page again */
                preload_array[j] = NULL;
            }
        }
        cmd.read_extent.page_count = k;
        rrdeng_enq_cmd(&ctx->worker_config, &cmd);
    }
    if (failed_to_reserve) {
        debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
        for (i = 0 ; i < preload_count ; ++i) {
            descr = preload_array[i];
            if (NULL == descr) {
                continue;
            }
            pg_cache_put(ctx, descr);
        }
    }
    if (!preload_count) {
        /* no such page */
        debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
    }
    if (unlikely(0 == count && page_info_arrayp)) {
        freez(*page_info_arrayp);
        *page_info_arrayp = NULL;
    }
    return count;
}
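
/*
 * Illustrative caller (not compiled): preload a range, then fetch the pages
 * one by one and free the info array:
 *
 *     struct rrdeng_page_info *page_info = NULL;
 *     struct pg_cache_page_index *page_index;
 *     unsigned pages = pg_cache_preload(ctx, &uuid, start, end, &page_info, &page_index);
 *     for (unsigned p = 0 ; p < pages ; ++p) {
 *         struct rrdeng_page_descr *d = pg_cache_lookup_next(
 *             ctx, page_index, &uuid, page_info[p].start_time, page_info[p].end_time);
 *         if (NULL != d)
 *             pg_cache_put(ctx, d);  // release after reading
 *     }
 *     freez(page_info);  // freez(NULL) is a no-op
 */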
/*
 * Searches for a page and gets a reference.
 * When point_in_time is INVALID_TIME get any page.
 * If index is NULL lookup by UUID (id).
 */
struct rrdeng_page_descr *
pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
                usec_t point_in_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    Word_t Index;
    uint8_t page_not_in_cache;

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
        if (likely(NULL != PValue)) {
            page_index = *PValue;
        }
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
        if (NULL == PValue) {
            return NULL;
        }
    } else {
        page_index = index;
    }
    pg_cache_reserve_pages(ctx, 1);

    page_not_in_cache = 0;
    uv_rwlock_rdlock(&page_index->lock);
    while (1) {
        Index = (Word_t)(point_in_time / USEC_PER_SEC);
        PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
        if (likely(NULL != PValue)) {
            descr = *PValue;
        }
        if (NULL == PValue ||
            0 == descr->page_length ||
            (INVALID_TIME != point_in_time &&
             !is_point_in_time_in_page(descr, point_in_time))) {
            /* non-empty page not found */
            uv_rwlock_rdunlock(&page_index->lock);

            pg_cache_release_pages(ctx, 1);
            return NULL;
        }
        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
            /* success */
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
            break;
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            struct rrdeng_cmd cmd;

            uv_rwlock_rdunlock(&page_index->lock);

            cmd.opcode = RRDENG_READ_PAGE;
            cmd.read_page.page_cache_descr = descr;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);

            debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
                pg_cache_wait_event_unsafe(descr);
            }
            /* success */
            /* Downgrade exclusive reference to allow other readers */
            pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
            pg_cache_wake_up_waiters_unsafe(descr);
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
            return descr;
        }
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
        if (unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        if (!(flags & RRD_PAGE_POPULATED))
            page_not_in_cache = 1;
        pg_cache_wait_event_unsafe(descr);
        rrdeng_page_descr_mutex_unlock(ctx, descr);

        /* reset scan to find again */
        uv_rwlock_rdlock(&page_index->lock);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (!(flags & RRD_PAGE_DIRTY))
        pg_cache_replaceQ_set_hot(ctx, descr);
    pg_cache_release_pages(ctx, 1);
    if (page_not_in_cache)
        rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    else
        rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
    return descr;
}
/*
 * Searches for the first page between start_time and end_time and gets a reference.
 * start_time and end_time are inclusive.
 * If index is NULL lookup by UUID (id).
 */
struct rrdeng_page_descr *
pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
                     usec_t start_time, usec_t end_time)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = NULL;
    struct page_cache_descr *pg_cache_descr = NULL;
    unsigned long flags;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    uint8_t page_not_in_cache;

    if (unlikely(NULL == index)) {
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
        if (likely(NULL != PValue)) {
            page_index = *PValue;
        }
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
        if (NULL == PValue) {
            return NULL;
        }
    } else {
        page_index = index;
    }
    pg_cache_reserve_pages(ctx, 1);

    page_not_in_cache = 0;
    uv_rwlock_rdlock(&page_index->lock);
    int retry_count = 0;
    while (1) {
        descr = find_first_page_in_time_range(page_index, start_time, end_time);
        if (NULL == descr || 0 == descr->page_length || retry_count == default_rrdeng_page_fetch_retries) {
            /* non-empty page not found */
            if (retry_count == default_rrdeng_page_fetch_retries)
                error_report("Page cache timeout while waiting for page %p : returning FAIL", descr);
            uv_rwlock_rdunlock(&page_index->lock);

            pg_cache_release_pages(ctx, 1);
            return NULL;
        }
        rrdeng_page_descr_mutex_lock(ctx, descr);
        pg_cache_descr = descr->pg_cache_descr;
        flags = pg_cache_descr->flags;
        if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
            /* success */
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
            break;
        }
        if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
            struct rrdeng_cmd cmd;

            uv_rwlock_rdunlock(&page_index->lock);

            cmd.opcode = RRDENG_READ_PAGE;
            cmd.read_page.page_cache_descr = descr;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);

            debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
                pg_cache_wait_event_unsafe(descr);
            }
            /* success */
            /* Downgrade exclusive reference to allow other readers */
            pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
            pg_cache_wake_up_waiters_unsafe(descr);
            rrdeng_page_descr_mutex_unlock(ctx, descr);
            rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
            return descr;
        }
        uv_rwlock_rdunlock(&page_index->lock);
        debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
        if (unlikely(debug_flags & D_RRDENGINE))
            print_page_cache_descr(descr);
        if (!(flags & RRD_PAGE_POPULATED))
            page_not_in_cache = 1;

        if (pg_cache_timedwait_event_unsafe(descr, default_rrdeng_page_fetch_timeout) == UV_ETIMEDOUT) {
            error_report("Page cache timeout while waiting for page %p : retry count = %d", descr, retry_count);
            ++retry_count;
        }
        rrdeng_page_descr_mutex_unlock(ctx, descr);

        /* reset scan to find again */
        uv_rwlock_rdlock(&page_index->lock);
    }
    uv_rwlock_rdunlock(&page_index->lock);

    if (!(flags & RRD_PAGE_DIRTY))
        pg_cache_replaceQ_set_hot(ctx, descr);
    pg_cache_release_pages(ctx, 1);
    if (page_not_in_cache)
        rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
    else
        rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
    return descr;
}
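
/*
 * Illustrative range iteration (not compiled): walk all pages of a metric in
 * [start, end] by advancing the window past each returned page:
 *
 *     usec_t t = start;
 *     struct rrdeng_page_descr *d;
 *     while (NULL != (d = pg_cache_lookup_next(ctx, page_index, &uuid, t, end))) {
 *         // ... consume the page ...
 *         t = d->end_time + USEC_PER_SEC;  // continue just after this page
 *         pg_cache_put(ctx, d);
 *         if (t > end)
 *             break;
 *     }
 */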
struct pg_cache_page_index *create_page_index(uuid_t *id)
{
    struct pg_cache_page_index *page_index;

    page_index = mallocz(sizeof(*page_index));
    page_index->JudyL_array = (Pvoid_t) NULL;
    uuid_copy(page_index->id, *id);
    fatal_assert(0 == uv_rwlock_init(&page_index->lock));
    page_index->oldest_time = INVALID_TIME;
    page_index->latest_time = INVALID_TIME;
    page_index->prev = NULL;
    page_index->page_count = 0;
    page_index->writers = 0;

    return page_index;
}

static void init_metrics_index(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
    pg_cache->metrics_index.last_page_index = NULL;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
}

static void init_replaceQ(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->replaceQ.head = NULL;
    pg_cache->replaceQ.tail = NULL;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
}

static void init_committed_page_index(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock));
    pg_cache->committed_page_index.latest_corr_id = 0;
    pg_cache->committed_page_index.nr_committed_pages = 0;
}

void init_page_cache(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;

    pg_cache->page_descriptors = 0;
    pg_cache->populated_pages = 0;
    fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));

    init_metrics_index(ctx);
    init_replaceQ(ctx);
    init_committed_page_index(ctx);
}
/*
 * METRIC                                           # number
 * 1. INDEX: JudyHS                                 # bytes
 * 2. DATA: page_index                              # bytes
 *
 * PAGE (1 page of 1 metric)                        # number
 * 1. INDEX AT METRIC: page_index->JudyL_array      # bytes
 * 2. DATA: descr                                   # bytes
 *
 * PAGE CACHE (1 page of 1 metric at the cache)     # number
 * 1. pg_cache_descr (if PG_CACHE_DESCR_ALLOCATED)  # bytes
 * 2. data (if RRD_PAGE_POPULATED)                  # bytes
 */
void free_page_cache(struct rrdengine_instance *ctx)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index, *prev_page_index;
    Word_t Index;
    struct rrdeng_page_descr *descr;
    struct page_cache_descr *pg_cache_descr;

    Word_t metrics_number = 0,
           metrics_bytes = 0,
           metrics_index_bytes = 0,
           metrics_duration = 0;

    Word_t pages_number = 0,
           pages_bytes = 0,
           pages_index_bytes = 0;

    Word_t pages_size_per_type[256] = { 0 },
           pages_count_per_type[256] = { 0 };

    Word_t cache_pages_number = 0,
           cache_pages_bytes = 0,
           cache_pages_data_bytes = 0;

    size_t points_in_db = 0,
           uncompressed_points_size = 0,
           seconds_in_db = 0,
           single_point_pages = 0;

    Word_t pages_dirty_index_bytes = 0;

    usec_t oldest_time_ut = LONG_MAX, latest_time_ut = 0;

    /* Free committed page index */
    pages_dirty_index_bytes = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0);
    fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array);

    for (page_index = pg_cache->metrics_index.last_page_index ;
         page_index != NULL ;
         page_index = prev_page_index) {

        prev_page_index = page_index->prev;

        /* Find first page in range */
        Index = (Word_t) 0;
        PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
        descr = unlikely(NULL == PValue) ? NULL : *PValue;

        size_t metric_duration = 0;
        size_t metric_update_every = 0;
        size_t metric_single_point_pages = 0;

        while (descr != NULL) {
            /* Iterate all page descriptors of this metric */

            if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
                cache_pages_number++;

                /* Check rrdenglocking.c */
                pg_cache_descr = descr->pg_cache_descr;
                if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
                    dbengine_page_free(pg_cache_descr->page);
                    cache_pages_data_bytes += RRDENG_BLOCK_SIZE;
                }
                rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
                cache_pages_bytes += sizeof(*pg_cache_descr);
            }

            if (descr->start_time < oldest_time_ut)
                oldest_time_ut = descr->start_time;

            if (descr->end_time > latest_time_ut)
                latest_time_ut = descr->end_time;

            pages_size_per_type[descr->type] += descr->page_length;
            pages_count_per_type[descr->type]++;

            size_t points_in_page = (descr->page_length / ctx->storage_size);
            size_t page_duration = ((descr->end_time - descr->start_time) / USEC_PER_SEC);
            size_t update_every = (page_duration == 0) ? 1 : page_duration / (points_in_page - 1);

            if (!page_duration && metric_update_every) {
                page_duration = metric_update_every;
                update_every = metric_update_every;
            }
            else if (page_duration)
                metric_update_every = update_every;

            uncompressed_points_size += descr->page_length;

            if (page_duration > 0) {
                page_duration = update_every * points_in_page;
                metric_duration += page_duration;
                seconds_in_db += page_duration;
                points_in_db += descr->page_length / ctx->storage_size;
            }
            else
                metric_single_point_pages++;

            freez(descr);
            pages_bytes += sizeof(*descr);
            pages_number++;

            PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0);
            descr = unlikely(NULL == PValue) ? NULL : *PValue;
        }

        if (metric_single_point_pages && metric_update_every) {
            points_in_db += metric_single_point_pages;
            seconds_in_db += metric_update_every * metric_single_point_pages;
            metric_duration += metric_update_every * metric_single_point_pages;
        }
        else
            single_point_pages += metric_single_point_pages;

        /* Free page index */
        pages_index_bytes += JudyLFreeArray(&page_index->JudyL_array, PJE0);
        fatal_assert(NULL == page_index->JudyL_array);
        freez(page_index);

        metrics_number++;
        metrics_bytes += sizeof(*page_index);
        metrics_duration += metric_duration;
    }

    /* Free metrics index */
    metrics_index_bytes = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
    fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array);

    if (!metrics_number) metrics_number = 1;
    if (!pages_number) pages_number = 1;
    if (!cache_pages_number) cache_pages_number = 1;
    if (!points_in_db) points_in_db = 1;
    if (latest_time_ut == oldest_time_ut) oldest_time_ut -= USEC_PER_SEC;

    if (single_point_pages) {
        long double avg_duration = (long double)seconds_in_db / points_in_db;
        points_in_db += single_point_pages;
        seconds_in_db += (size_t)(avg_duration * single_point_pages);
    }

    info("DBENGINE STATISTICS ON METRICS:"
         " Metrics: %lu (structures %lu bytes - per metric %0.2f, index (HS) %lu bytes - per metric %0.2f bytes - duration %zu secs) |"
         " Page descriptors: %lu (structures %lu bytes - per page %0.2f bytes, index (L) %lu bytes - per page %0.2f, dirty index %lu bytes). |"
         " Page cache: %lu pages (structures %lu bytes - per page %0.2f bytes, data %lu bytes). |"
         " Points in db %zu, uncompressed size of points database %zu bytes. |"
         " Duration of all points %zu seconds, average point duration %0.2f seconds."
         " Duration of the database %llu seconds, average metric duration %0.2f seconds, average metric lifetime %0.2f%%."
         , metrics_number, metrics_bytes, (double)metrics_bytes / metrics_number, metrics_index_bytes, (double)metrics_index_bytes / metrics_number, metrics_duration
         , pages_number, pages_bytes, (double)pages_bytes / pages_number, pages_index_bytes, (double)pages_index_bytes / pages_number, pages_dirty_index_bytes
         , cache_pages_number, cache_pages_bytes, (double)cache_pages_bytes / cache_pages_number, cache_pages_data_bytes
         , points_in_db, uncompressed_points_size
         , seconds_in_db, (double)seconds_in_db / points_in_db
         , (latest_time_ut - oldest_time_ut) / USEC_PER_SEC, (double)metrics_duration / metrics_number
         , (double)metrics_duration / metrics_number * 100.0 / ((latest_time_ut - oldest_time_ut) / USEC_PER_SEC)
        );

    for (int i = 0; i < 256 ; i++) {
        if (pages_count_per_type[i])
            info("DBENGINE STATISTICS ON PAGE TYPES: page type %d total pages %lu, average page size %0.2f bytes", i, pages_count_per_type[i], (double)pages_size_per_type[i] / pages_count_per_type[i]);
    }
}