123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792 |
- // SPDX-License-Identifier: GPL-3.0-or-later
- #define NETDATA_RRD_INTERNALS
- #include "rrdengine.h"
- /* Forward declerations */
- static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);
- /* always inserts into tail */
- static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- if (likely(NULL != pg_cache->replaceQ.tail)) {
- descr->prev = pg_cache->replaceQ.tail;
- pg_cache->replaceQ.tail->next = descr;
- }
- if (unlikely(NULL == pg_cache->replaceQ.head)) {
- pg_cache->replaceQ.head = descr;
- }
- pg_cache->replaceQ.tail = descr;
- }
- static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_cache_descr *prev, *next;
- prev = descr->prev;
- next = descr->next;
- if (likely(NULL != prev)) {
- prev->next = next;
- }
- if (likely(NULL != next)) {
- next->prev = prev;
- }
- if (unlikely(descr == pg_cache->replaceQ.head)) {
- pg_cache->replaceQ.head = next;
- }
- if (unlikely(descr == pg_cache->replaceQ.tail)) {
- pg_cache->replaceQ.tail = prev;
- }
- descr->prev = descr->next = NULL;
- }
- void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
- pg_cache_replaceQ_insert_unsafe(ctx, descr);
- uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
- }
- void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
- pg_cache_replaceQ_delete_unsafe(ctx, descr);
- uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
- }
- void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
- struct rrdeng_page_cache_descr *descr)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
- pg_cache_replaceQ_delete_unsafe(ctx, descr);
- pg_cache_replaceQ_insert_unsafe(ctx, descr);
- uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
- }
- struct rrdeng_page_cache_descr *pg_cache_create_descr(void)
- {
- struct rrdeng_page_cache_descr *descr;
- descr = mallocz(sizeof(*descr));
- descr->page = NULL;
- descr->page_length = 0;
- descr->start_time = INVALID_TIME;
- descr->end_time = INVALID_TIME;
- descr->id = NULL;
- descr->extent = NULL;
- descr->flags = 0;
- descr->prev = descr->next = descr->private = NULL;
- descr->refcnt = 0;
- descr->waiters = 0;
- descr->handle = NULL;
- assert(0 == uv_cond_init(&descr->cond));
- assert(0 == uv_mutex_init(&descr->mutex));
- return descr;
- }
- void pg_cache_destroy_descr(struct rrdeng_page_cache_descr *descr)
- {
- uv_cond_destroy(&descr->cond);
- uv_mutex_destroy(&descr->mutex);
- free(descr);
- }
- /* The caller must hold page descriptor lock. */
- void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_cache_descr *descr)
- {
- if (descr->waiters)
- uv_cond_broadcast(&descr->cond);
- }
- /*
- * The caller must hold page descriptor lock.
- * The lock will be released and re-acquired. The descriptor is not guaranteed
- * to exist after this function returns.
- */
- void pg_cache_wait_event_unsafe(struct rrdeng_page_cache_descr *descr)
- {
- ++descr->waiters;
- uv_cond_wait(&descr->cond, &descr->mutex);
- --descr->waiters;
- }
- /*
- * Returns page flags.
- * The lock will be released and re-acquired. The descriptor is not guaranteed
- * to exist after this function returns.
- */
- unsigned long pg_cache_wait_event(struct rrdeng_page_cache_descr *descr)
- {
- unsigned long flags;
- uv_mutex_lock(&descr->mutex);
- pg_cache_wait_event_unsafe(descr);
- flags = descr->flags;
- uv_mutex_unlock(&descr->mutex);
- return flags;
- }
- /*
- * The caller must hold page descriptor lock.
- * Gets a reference to the page descriptor.
- * Returns 1 on success and 0 on failure.
- */
- int pg_cache_try_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
- {
- if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
- (exclusive_access && descr->refcnt)) {
- return 0;
- }
- if (exclusive_access)
- descr->flags |= RRD_PAGE_LOCKED;
- ++descr->refcnt;
- return 1;
- }
- /*
- * The caller must hold page descriptor lock.
- * Same return values as pg_cache_try_get_unsafe() without doing anything.
- */
- int pg_cache_can_get_unsafe(struct rrdeng_page_cache_descr *descr, int exclusive_access)
- {
- if ((descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
- (exclusive_access && descr->refcnt)) {
- return 0;
- }
- return 1;
- }
- /*
- * The caller must hold the page descriptor lock.
- * This function may block doing cleanup.
- */
- void pg_cache_put_unsafe(struct rrdeng_page_cache_descr *descr)
- {
- descr->flags &= ~RRD_PAGE_LOCKED;
- if (0 == --descr->refcnt) {
- pg_cache_wake_up_waiters_unsafe(descr);
- }
- /* TODO: perform cleanup */
- }
- /*
- * This function may block doing cleanup.
- */
- void pg_cache_put(struct rrdeng_page_cache_descr *descr)
- {
- uv_mutex_lock(&descr->mutex);
- pg_cache_put_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
- }
- /* The caller must hold the page cache lock */
- static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- pg_cache->populated_pages -= number;
- }
- static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- pg_cache_release_pages_unsafe(ctx, number);
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
- }
- /*
- * This function will block until it reserves #number populated pages.
- * It will trigger evictions or dirty page flushing if the ctx->max_cache_pages limit is hit.
- */
- static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- assert(number < ctx->max_cache_pages);
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- if (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1)
- debug(D_RRDENGINE, "=================================\nPage cache full. Reserving %u pages.\n=================================",
- number);
- while (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1) {
- if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
- /* failed to evict */
- struct completion compl;
- struct rrdeng_cmd cmd;
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
- init_completion(&compl);
- cmd.opcode = RRDENG_FLUSH_PAGES;
- cmd.completion = &compl;
- rrdeng_enq_cmd(&ctx->worker_config, &cmd);
- /* wait for some pages to be flushed */
- debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__);
- wait_for_completion(&compl);
- destroy_completion(&compl);
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- }
- }
- pg_cache->populated_pages += number;
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
- }
- /*
- * This function will attempt to reserve #number populated pages.
- * It may trigger evictions if the ctx->cache_pages_low_watermark limit is hit.
- * Returns 0 on failure and 1 on success.
- */
- static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- unsigned count = 0;
- int ret = 0;
- assert(number < ctx->max_cache_pages);
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- if (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1) {
- debug(D_RRDENGINE,
- "=================================\nPage cache full. Trying to reserve %u pages.\n=================================",
- number);
- do {
- if (!pg_cache_try_evict_one_page_unsafe(ctx))
- break;
- ++count;
- } while (pg_cache->populated_pages + number >= ctx->cache_pages_low_watermark + 1);
- debug(D_RRDENGINE, "Evicted %u pages.", count);
- }
- if (pg_cache->populated_pages + number < ctx->max_cache_pages + 1) {
- pg_cache->populated_pages += number;
- ret = 1; /* success */
- }
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
- return ret;
- }
- /* The caller must hold the page cache and the page descriptor locks in that order */
- static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
- {
- free(descr->page);
- descr->page = NULL;
- descr->flags &= ~RRD_PAGE_POPULATED;
- pg_cache_release_pages_unsafe(ctx, 1);
- ++ctx->stats.pg_cache_evictions;
- }
- /*
- * The caller must hold the page cache lock.
- * Lock order: page cache -> replaceQ -> descriptor
- * This function iterates all pages and tries to evict one.
- * If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress,
- * or it sets it to NULL if no write-back is in progress.
- *
- * Returns 1 on success and 0 on failure.
- */
- static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- unsigned long old_flags;
- struct rrdeng_page_cache_descr *descr;
- uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
- for (descr = pg_cache->replaceQ.head ; NULL != descr ; descr = descr->next) {
- uv_mutex_lock(&descr->mutex);
- old_flags = descr->flags;
- if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
- /* must evict */
- pg_cache_evict_unsafe(ctx, descr);
- pg_cache_put_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
- pg_cache_replaceQ_delete_unsafe(ctx, descr);
- uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
- return 1;
- }
- uv_mutex_unlock(&descr->mutex);
- };
- uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
- /* failed to evict */
- return 0;
- }
- /*
- * TODO: last waiter frees descriptor ?
- */
- void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_cache_descr *descr)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
- int ret;
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
- assert(NULL != PValue);
- page_index = *PValue;
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- uv_rwlock_wrlock(&page_index->lock);
- ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
- uv_rwlock_wrunlock(&page_index->lock);
- if (unlikely(0 == ret)) {
- error("Page under deletion was not in index.");
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- goto destroy;
- }
- assert(1 == ret);
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- ++ctx->stats.pg_cache_deletions;
- --pg_cache->page_descriptors;
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
- uv_mutex_lock(&descr->mutex);
- while (!pg_cache_try_get_unsafe(descr, 1)) {
- debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- pg_cache_wait_event_unsafe(descr);
- }
- /* even a locked page could be dirty */
- while (unlikely(descr->flags & RRD_PAGE_DIRTY)) {
- debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- pg_cache_wait_event_unsafe(descr);
- }
- uv_mutex_unlock(&descr->mutex);
- if (descr->flags & RRD_PAGE_POPULATED) {
- /* only after locking can it be safely deleted from LRU */
- pg_cache_replaceQ_delete(ctx, descr);
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- pg_cache_evict_unsafe(ctx, descr);
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
- }
- pg_cache_put(descr);
- destroy:
- pg_cache_destroy_descr(descr);
- pg_cache_update_metric_times(page_index);
- }
- static inline int is_page_in_time_range(struct rrdeng_page_cache_descr *descr, usec_t start_time, usec_t end_time)
- {
- usec_t pg_start, pg_end;
- pg_start = descr->start_time;
- pg_end = descr->end_time;
- return (pg_start < start_time && pg_end >= start_time) ||
- (pg_start >= start_time && pg_start <= end_time);
- }
- static inline int is_point_in_time_in_page(struct rrdeng_page_cache_descr *descr, usec_t point_in_time)
- {
- return (point_in_time >= descr->start_time && point_in_time <= descr->end_time);
- }
- /* Update metric oldest and latest timestamps efficiently when adding new values */
- void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_cache_descr *descr)
- {
- usec_t oldest_time = page_index->oldest_time;
- usec_t latest_time = page_index->latest_time;
- if (unlikely(oldest_time == INVALID_TIME || descr->start_time < oldest_time)) {
- page_index->oldest_time = descr->start_time;
- }
- if (likely(descr->end_time > latest_time || latest_time == INVALID_TIME)) {
- page_index->latest_time = descr->end_time;
- }
- }
- /* Update metric oldest and latest timestamps when removing old values */
- void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
- {
- Pvoid_t *firstPValue, *lastPValue;
- Word_t firstIndex, lastIndex;
- struct rrdeng_page_cache_descr *descr;
- usec_t oldest_time = INVALID_TIME;
- usec_t latest_time = INVALID_TIME;
- uv_rwlock_rdlock(&page_index->lock);
- /* Find first page in range */
- firstIndex = (Word_t)0;
- firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0);
- if (likely(NULL != firstPValue)) {
- descr = *firstPValue;
- oldest_time = descr->start_time;
- }
- lastIndex = (Word_t)-1;
- lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0);
- if (likely(NULL != lastPValue)) {
- descr = *lastPValue;
- latest_time = descr->end_time;
- }
- uv_rwlock_rdunlock(&page_index->lock);
- if (unlikely(NULL == firstPValue)) {
- assert(NULL == lastPValue);
- page_index->oldest_time = page_index->latest_time = INVALID_TIME;
- return;
- }
- page_index->oldest_time = oldest_time;
- page_index->latest_time = latest_time;
- }
- /* If index is NULL lookup by UUID (descr->id) */
- void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
- struct rrdeng_page_cache_descr *descr)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
- if (descr->flags & RRD_PAGE_POPULATED) {
- pg_cache_reserve_pages(ctx, 1);
- if (!(descr->flags & RRD_PAGE_DIRTY))
- pg_cache_replaceQ_insert(ctx, descr);
- }
- if (unlikely(NULL == index)) {
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
- assert(NULL != PValue);
- page_index = *PValue;
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- } else {
- page_index = index;
- }
- uv_rwlock_wrlock(&page_index->lock);
- PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time / USEC_PER_SEC), PJE0);
- *PValue = descr;
- pg_cache_add_new_metric_time(page_index, descr);
- uv_rwlock_wrunlock(&page_index->lock);
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- ++ctx->stats.pg_cache_insertions;
- ++pg_cache->page_descriptors;
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
- }
- /*
- * Searches for a page and triggers disk I/O if necessary and possible.
- * Does not get a reference.
- * Returns page index pointer for given metric UUID.
- */
- struct pg_cache_page_index *
- pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time, usec_t end_time)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_cache_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
- int i, j, k, count, found;
- unsigned long flags;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
- Word_t Index;
- uint8_t failed_to_reserve;
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (NULL == PValue) {
- debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
- return NULL;
- }
- uv_rwlock_rdlock(&page_index->lock);
- /* Find first page in range */
- found = 0;
- Index = (Word_t)(start_time / USEC_PER_SEC);
- PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
- if (likely(NULL != PValue)) {
- descr = *PValue;
- if (is_page_in_time_range(descr, start_time, end_time)) {
- found = 1;
- }
- }
- if (!found) {
- Index = (Word_t)(start_time / USEC_PER_SEC);
- PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
- if (likely(NULL != PValue)) {
- descr = *PValue;
- if (is_page_in_time_range(descr, start_time, end_time)) {
- found = 1;
- }
- }
- }
- if (!found) {
- uv_rwlock_rdunlock(&page_index->lock);
- debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
- return page_index;
- }
- for (count = 0 ;
- descr != NULL && is_page_in_time_range(descr, start_time, end_time);
- PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
- descr = unlikely(NULL == PValue) ? NULL : *PValue) {
- /* Iterate all pages in range */
- if (unlikely(0 == descr->page_length))
- continue;
- uv_mutex_lock(&descr->mutex);
- flags = descr->flags;
- if (pg_cache_can_get_unsafe(descr, 0)) {
- if (flags & RRD_PAGE_POPULATED) {
- /* success */
- uv_mutex_unlock(&descr->mutex);
- debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
- continue;
- }
- }
- if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
- preload_array[count++] = descr;
- if (PAGE_CACHE_MAX_PRELOAD_PAGES == count) {
- uv_mutex_unlock(&descr->mutex);
- break;
- }
- }
- uv_mutex_unlock(&descr->mutex);
- };
- uv_rwlock_rdunlock(&page_index->lock);
- failed_to_reserve = 0;
- for (i = 0 ; i < count && !failed_to_reserve ; ++i) {
- struct rrdeng_cmd cmd;
- struct rrdeng_page_cache_descr *next;
- descr = preload_array[i];
- if (NULL == descr) {
- continue;
- }
- if (!pg_cache_try_reserve_pages(ctx, 1)) {
- failed_to_reserve = 1;
- break;
- }
- cmd.opcode = RRDENG_READ_EXTENT;
- cmd.read_extent.page_cache_descr[0] = descr;
- /* don't use this page again */
- preload_array[i] = NULL;
- for (j = 0, k = 1 ; j < count ; ++j) {
- next = preload_array[j];
- if (NULL == next) {
- continue;
- }
- if (descr->extent == next->extent) {
- /* same extent, consolidate */
- if (!pg_cache_try_reserve_pages(ctx, 1)) {
- failed_to_reserve = 1;
- break;
- }
- cmd.read_extent.page_cache_descr[k++] = next;
- /* don't use this page again */
- preload_array[j] = NULL;
- }
- }
- cmd.read_extent.page_count = k;
- rrdeng_enq_cmd(&ctx->worker_config, &cmd);
- }
- if (failed_to_reserve) {
- debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
- for (i = 0 ; i < count ; ++i) {
- descr = preload_array[i];
- if (NULL == descr) {
- continue;
- }
- pg_cache_put(descr);
- }
- }
- if (!count) {
- /* no such page */
- debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
- }
- return page_index;
- }
- /*
- * Searches for a page and gets a reference.
- * When point_in_time is INVALID_TIME get any page.
- * If index is NULL lookup by UUID (id).
- */
- struct rrdeng_page_cache_descr *
- pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
- usec_t point_in_time)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_cache_descr *descr = NULL;
- unsigned long flags;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
- Word_t Index;
- uint8_t page_not_in_cache;
- if (unlikely(NULL == index)) {
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (NULL == PValue) {
- return NULL;
- }
- } else {
- page_index = index;
- }
- pg_cache_reserve_pages(ctx, 1);
- page_not_in_cache = 0;
- uv_rwlock_rdlock(&page_index->lock);
- while (1) {
- Index = (Word_t)(point_in_time / USEC_PER_SEC);
- PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
- if (likely(NULL != PValue)) {
- descr = *PValue;
- }
- if (NULL == PValue ||
- 0 == descr->page_length ||
- (INVALID_TIME != point_in_time &&
- !is_point_in_time_in_page(descr, point_in_time))) {
- /* non-empty page not found */
- uv_rwlock_rdunlock(&page_index->lock);
- pg_cache_release_pages(ctx, 1);
- return NULL;
- }
- uv_mutex_lock(&descr->mutex);
- flags = descr->flags;
- if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
- /* success */
- uv_mutex_unlock(&descr->mutex);
- debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
- break;
- }
- if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
- struct rrdeng_cmd cmd;
- uv_rwlock_rdunlock(&page_index->lock);
- cmd.opcode = RRDENG_READ_PAGE;
- cmd.read_page.page_cache_descr = descr;
- rrdeng_enq_cmd(&ctx->worker_config, &cmd);
- debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- while (!(descr->flags & RRD_PAGE_POPULATED)) {
- pg_cache_wait_event_unsafe(descr);
- }
- /* success */
- /* Downgrade exclusive reference to allow other readers */
- descr->flags &= ~RRD_PAGE_LOCKED;
- pg_cache_wake_up_waiters_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
- rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
- return descr;
- }
- uv_rwlock_rdunlock(&page_index->lock);
- debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr);
- if (!(flags & RRD_PAGE_POPULATED))
- page_not_in_cache = 1;
- pg_cache_wait_event_unsafe(descr);
- uv_mutex_unlock(&descr->mutex);
- /* reset scan to find again */
- uv_rwlock_rdlock(&page_index->lock);
- }
- uv_rwlock_rdunlock(&page_index->lock);
- if (!(flags & RRD_PAGE_DIRTY))
- pg_cache_replaceQ_set_hot(ctx, descr);
- pg_cache_release_pages(ctx, 1);
- if (page_not_in_cache)
- rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
- else
- rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
- return descr;
- }
- struct pg_cache_page_index *create_page_index(uuid_t *id)
- {
- struct pg_cache_page_index *page_index;
- page_index = mallocz(sizeof(*page_index));
- page_index->JudyL_array = (Pvoid_t) NULL;
- uuid_copy(page_index->id, *id);
- assert(0 == uv_rwlock_init(&page_index->lock));
- page_index->oldest_time = INVALID_TIME;
- page_index->latest_time = INVALID_TIME;
- return page_index;
- }
- static void init_metrics_index(struct rrdengine_instance *ctx)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
- assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
- }
- static void init_replaceQ(struct rrdengine_instance *ctx)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- pg_cache->replaceQ.head = NULL;
- pg_cache->replaceQ.tail = NULL;
- assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
- }
- static void init_commited_page_index(struct rrdengine_instance *ctx)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- pg_cache->commited_page_index.JudyL_array = (Pvoid_t) NULL;
- assert(0 == uv_rwlock_init(&pg_cache->commited_page_index.lock));
- pg_cache->commited_page_index.latest_corr_id = 0;
- pg_cache->commited_page_index.nr_commited_pages = 0;
- }
- void init_page_cache(struct rrdengine_instance *ctx)
- {
- struct page_cache *pg_cache = &ctx->pg_cache;
- pg_cache->page_descriptors = 0;
- pg_cache->populated_pages = 0;
- assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));
- init_metrics_index(ctx);
- init_replaceQ(ctx);
- init_commited_page_index(ctx);
- }
|