12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941 |
- #include "metric.h"
// Signed reference counter for METRIC entries: acquire/release abort via
// fatal() when they observe a negative (or, on release, zero) value.
typedef int32_t REFCOUNT;

// NOTE(review): not referenced in this file - presumably a sentinel refcount
// for entries being deleted; confirm before relying on it.
#define REFCOUNT_DELETING (-100)

// Per-metric flag bits stored in METRIC.flags. The GCC `packed` attribute
// lets the compiler pick the smallest integer type that holds every value.
typedef enum __attribute__((__packed__)) {
    // kept in sync by metric_has_retention_unsafe(): set while any of
    // first_time_s, latest_time_s_clean, latest_time_s_hot is > 0
    METRIC_FLAG_HAS_RETENTION = 0x01,
} METRIC_FLAGS;
- struct metric {
- uuid_t uuid; // never changes
- Word_t section; // never changes
- time_t first_time_s; //
- time_t latest_time_s_clean; // archived pages latest time
- time_t latest_time_s_hot; // latest time of the currently collected page
- uint32_t latest_update_every_s; //
- pid_t writer;
- uint8_t partition;
- METRIC_FLAGS flags;
- REFCOUNT refcount;
- SPINLOCK spinlock; // protects all variable members
- // THIS IS allocated with malloc()
- // YOU HAVE TO INITIALIZE IT YOURSELF !
- };
// ARAL statistics shared by the per-partition allocators of every registry
// created by mrg_create(); reported by mrg_aral_structures()/mrg_aral_overhead().
static struct aral_statistics mrg_aral_statistics;

// The metric registry: an array of independently locked partitions.
// A metric is stored in partition uuid_partition(mrg, uuid).
struct mrg {
    size_t partitions;              // number of elements in index[]

    struct mrg_partition {
        ARAL *aral; // not protected by our spinlock - it has its own
        RW_SPINLOCK rw_spinlock;    // guards uuid_judy: read-locked for lookups, write-locked for add/delete
        Pvoid_t uuid_judy; // JudyHS: each UUID has a JudyL of sections (tiers)
        struct mrg_statistics stats;
    } index[];                      // flexible array member, sized in mrg_create()
};
- static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg, size_t partition) {
- mrg->index[partition].stats.additions_duplicate++;
- }
- static inline void MRG_STATS_ADDED_METRIC(MRG *mrg, size_t partition) {
- mrg->index[partition].stats.entries++;
- mrg->index[partition].stats.additions++;
- mrg->index[partition].stats.size += sizeof(METRIC);
- }
- static inline void MRG_STATS_DELETED_METRIC(MRG *mrg, size_t partition) {
- mrg->index[partition].stats.entries--;
- mrg->index[partition].stats.size -= sizeof(METRIC);
- mrg->index[partition].stats.deletions++;
- }
- static inline void MRG_STATS_SEARCH_HIT(MRG *mrg, size_t partition) {
- __atomic_add_fetch(&mrg->index[partition].stats.search_hits, 1, __ATOMIC_RELAXED);
- }
- static inline void MRG_STATS_SEARCH_MISS(MRG *mrg, size_t partition) {
- __atomic_add_fetch(&mrg->index[partition].stats.search_misses, 1, __ATOMIC_RELAXED);
- }
- static inline void MRG_STATS_DELETE_MISS(MRG *mrg, size_t partition) {
- mrg->index[partition].stats.delete_misses++;
- }
// Partition index locks: lookups take the shared (read) side, add/delete take
// the exclusive (write) side.
#define mrg_index_read_lock(registry, part)    rw_spinlock_read_lock(&(registry)->index[(part)].rw_spinlock)
#define mrg_index_read_unlock(registry, part)  rw_spinlock_read_unlock(&(registry)->index[(part)].rw_spinlock)
#define mrg_index_write_lock(registry, part)   rw_spinlock_write_lock(&(registry)->index[(part)].rw_spinlock)
#define mrg_index_write_unlock(registry, part) rw_spinlock_write_unlock(&(registry)->index[(part)].rw_spinlock)

// Per-metric spinlock guarding the metric's mutable members.
#define metric_lock(m)   spinlock_lock(&(m)->spinlock)
#define metric_unlock(m) spinlock_unlock(&(m)->spinlock)
- static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl, size_t partition) {
- if(mem_after_judyl > mem_before_judyl)
- __atomic_add_fetch(&mrg->index[partition].stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
- else if(mem_after_judyl < mem_before_judyl)
- __atomic_sub_fetch(&mrg->index[partition].stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED);
- }
- static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg, size_t partition) {
- __atomic_add_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
- }
- static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition) {
- __atomic_sub_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
- }
- static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
- uint8_t *u = (uint8_t *)uuid;
- size_t *n = (size_t *)&u[UUID_SZ - sizeof(size_t)];
- return *n % mrg->partitions;
- }
- static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) {
- size_t partition = metric->partition;
- bool has_retention = (metric->first_time_s > 0 || metric->latest_time_s_clean > 0 || metric->latest_time_s_hot > 0);
- if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) {
- metric->flags |= METRIC_FLAG_HAS_RETENTION;
- __atomic_add_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED);
- }
- else if(!has_retention && (metric->flags & METRIC_FLAG_HAS_RETENTION)) {
- metric->flags &= ~METRIC_FLAG_HAS_RETENTION;
- __atomic_sub_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED);
- }
- return has_retention;
- }
- static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, bool having_spinlock) {
- size_t partition = metric->partition;
- REFCOUNT refcount;
- if(!having_spinlock)
- metric_lock(metric);
- if(unlikely(metric->refcount < 0))
- fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
- refcount = ++metric->refcount;
- // update its retention flags
- metric_has_retention_unsafe(mrg, metric);
- if(!having_spinlock)
- metric_unlock(metric);
- if(refcount == 1)
- __atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
- return refcount;
- }
- static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) {
- bool ret = true;
- size_t partition = metric->partition;
- REFCOUNT refcount;
- metric_lock(metric);
- if(unlikely(metric->refcount <= 0))
- fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
- refcount = --metric->refcount;
- if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0))
- ret = false;
- metric_unlock(metric);
- if(unlikely(!refcount))
- __atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
- __atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
- return ret;
- }
// Index the metric described by `entry` and return it with one reference held.
// If (uuid, section) is already indexed, the existing metric is acquired and
// returned instead, and *ret (when ret is non-NULL) is set to false;
// otherwise a new metric is created and *ret is set to true.
static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
    size_t partition = uuid_partition(mrg, entry->uuid);

    // allocated before taking the write lock; given back below if the metric already exists
    METRIC *allocation = aral_mallocz(mrg->index[partition].aral);

    mrg_index_write_lock(mrg, partition);

    size_t mem_before_judyl, mem_after_judyl;

    // find or create the JudyL of sections for this UUID
    Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0);
    if(unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR))
        fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");

    // an empty slot means the UUID was just added to the JudyHS
    if(unlikely(!*sections_judy_pptr))
        mrg_stats_size_judyhs_added_uuid(mrg, partition);

    // insert the section, accounting for the JudyL memory it costs
    mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
    Pvoid_t *PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0);
    mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
    mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);

    if(unlikely(!PValue || PValue == PJERR))
        fatal("DBENGINE METRIC: corrupted section JudyL array");

    if(unlikely(*PValue != NULL)) {
        // (uuid, section) is already indexed: acquire and return the existing metric
        METRIC *metric = *PValue;

        metric_acquire(mrg, metric, false);

        MRG_STATS_DUPLICATE_ADD(mrg, partition);

        mrg_index_write_unlock(mrg, partition);

        if(ret)
            *ret = false;

        aral_freez(mrg->index[partition].aral, allocation);

        return metric;
    }

    // new metric: every member is initialized explicitly
    METRIC *metric = allocation;
    uuid_copy(metric->uuid, *entry->uuid);
    metric->section = entry->section;
    metric->first_time_s = MAX(0, entry->first_time_s);          // negative times are stored as 0
    metric->latest_time_s_clean = MAX(0, entry->last_time_s);
    metric->latest_time_s_hot = 0;
    metric->latest_update_every_s = entry->latest_update_every_s;
    metric->writer = 0;
    metric->refcount = 0;
    metric->flags = 0;
    metric->partition = partition;
    spinlock_init(&metric->spinlock);
    metric_acquire(mrg, metric, true); // no spinlock use required here
                                       // (the metric is published via *PValue only on the next line)
    *PValue = metric;

    MRG_STATS_ADDED_METRIC(mrg, partition);

    mrg_index_write_unlock(mrg, partition);

    if(ret)
        *ret = true;

    return metric;
}
- static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
- size_t partition = uuid_partition(mrg, uuid);
- mrg_index_read_lock(mrg, partition);
- Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
- if(unlikely(!sections_judy_pptr)) {
- mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_MISS(mrg, partition);
- return NULL;
- }
- Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
- if(unlikely(!PValue)) {
- mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_MISS(mrg, partition);
- return NULL;
- }
- METRIC *metric = *PValue;
- metric_acquire(mrg, metric, false);
- mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_HIT(mrg, partition);
- return metric;
- }
// Release the caller's reference to `metric` and, when that leaves it
// unreferenced and without retention, remove it from its partition's index
// and return its memory to the partition ARAL.
// Returns true only when the metric was deleted.
// NOTE: the caller's reference is released in every case, also when false is returned.
static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) {
    size_t partition = metric->partition;

    size_t mem_before_judyl, mem_after_judyl;

    // the write lock keeps lookups from acquiring the metric between the
    // release below and its removal from the index
    mrg_index_write_lock(mrg, partition);

    if(!metric_release_and_can_be_deleted(mrg, metric)) {
        mrg->index[partition].stats.delete_having_retention_or_referenced++;
        mrg_index_write_unlock(mrg, partition);
        return false;
    }

    Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
    if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
        MRG_STATS_DELETE_MISS(mrg, partition);
        mrg_index_write_unlock(mrg, partition);
        return false;
    }

    // remove the section, accounting for the JudyL memory released
    mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
    int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0);
    mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
    mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);

    if(unlikely(!rc)) {
        MRG_STATS_DELETE_MISS(mrg, partition);
        mrg_index_write_unlock(mrg, partition);
        return false;
    }

    // that was the UUID's last section: drop the UUID from the JudyHS too
    if(!*sections_judy_pptr) {
        rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
        if(unlikely(!rc))
            fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
        mrg_stats_size_judyhs_removed_uuid(mrg, partition);
    }

    MRG_STATS_DELETED_METRIC(mrg, partition);

    mrg_index_write_unlock(mrg, partition);

    aral_freez(mrg->index[partition].aral, metric);

    return true;
}
- // ----------------------------------------------------------------------------
- // public API
- inline MRG *mrg_create(ssize_t partitions) {
- if(partitions < 1)
- partitions = get_netdata_cpus();
- MRG *mrg = callocz(1, sizeof(MRG) + sizeof(struct mrg_partition) * partitions);
- mrg->partitions = partitions;
- for(size_t i = 0; i < mrg->partitions ; i++) {
- rw_spinlock_init(&mrg->index[i].rw_spinlock);
- char buf[ARAL_MAX_NAME + 1];
- snprintfz(buf, ARAL_MAX_NAME, "mrg[%zu]", i);
- mrg->index[i].aral = aral_create(buf, sizeof(METRIC), 0, 16384, &mrg_aral_statistics, NULL, NULL, false, false);
- }
- return mrg;
- }
- inline size_t mrg_aral_structures(void) {
- return aral_structures_from_stats(&mrg_aral_statistics);
- }
- inline size_t mrg_aral_overhead(void) {
- return aral_overhead_from_stats(&mrg_aral_statistics);
- }
- inline void mrg_destroy(MRG *mrg __maybe_unused) {
- // no destruction possible
- // we can't traverse the metrics list
- // to delete entries, the caller needs to keep pointers to them
- // and delete them one by one
- ;
- }
- inline METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) {
- // internal_fatal(entry.latest_time_s > max_acceptable_collected_time(),
- // "DBENGINE METRIC: metric latest time is in the future");
- return metric_add_and_acquire(mrg, &entry, ret);
- }
- inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
- return metric_get_and_acquire(mrg, uuid, section);
- }
- inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
- return acquired_metric_del(mrg, metric);
- }
- inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
- metric_acquire(mrg, metric, false);
- return metric;
- }
- inline bool mrg_metric_release(MRG *mrg, METRIC *metric) {
- return metric_release_and_can_be_deleted(mrg, metric);
- }
- inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
- return (Word_t)metric;
- }
- inline uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) {
- return &metric->uuid;
- }
- inline Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
- return metric->section;
- }
- inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
- internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
- if(unlikely(first_time_s < 0))
- return false;
- metric_lock(metric);
- metric->first_time_s = first_time_s;
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
- return true;
- }
- inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
- internal_fatal(first_time_s < 0 || last_time_s < 0 || update_every_s < 0,
- "DBENGINE METRIC: timestamp is negative");
- internal_fatal(first_time_s > max_acceptable_collected_time(),
- "DBENGINE METRIC: metric first time is in the future");
- internal_fatal(last_time_s > max_acceptable_collected_time(),
- "DBENGINE METRIC: metric last time is in the future");
- if(unlikely(first_time_s < 0))
- first_time_s = 0;
- if(unlikely(last_time_s < 0))
- last_time_s = 0;
- if(unlikely(update_every_s < 0))
- update_every_s = 0;
- if(unlikely(!first_time_s && !last_time_s && !update_every_s))
- return;
- metric_lock(metric);
- if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s)))
- metric->first_time_s = first_time_s;
- if(likely(last_time_s && (!metric->latest_time_s_clean || last_time_s > metric->latest_time_s_clean))) {
- metric->latest_time_s_clean = last_time_s;
- if(likely(update_every_s))
- metric->latest_update_every_s = (uint32_t) update_every_s;
- }
- else if(unlikely(!metric->latest_update_every_s && update_every_s))
- metric->latest_update_every_s = (uint32_t) update_every_s;
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
- }
- inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
- internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
- bool ret = false;
- metric_lock(metric);
- if(first_time_s > metric->first_time_s) {
- metric->first_time_s = first_time_s;
- ret = true;
- }
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
- return ret;
- }
- inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
- time_t first_time_s;
- metric_lock(metric);
- if(unlikely(!metric->first_time_s)) {
- if(metric->latest_time_s_clean)
- metric->first_time_s = metric->latest_time_s_clean;
- else if(metric->latest_time_s_hot)
- metric->first_time_s = metric->latest_time_s_hot;
- }
- first_time_s = metric->first_time_s;
- metric_unlock(metric);
- return first_time_s;
- }
- inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
- metric_lock(metric);
- if(unlikely(!metric->first_time_s)) {
- if(metric->latest_time_s_clean)
- metric->first_time_s = metric->latest_time_s_clean;
- else if(metric->latest_time_s_hot)
- metric->first_time_s = metric->latest_time_s_hot;
- }
- *first_time_s = metric->first_time_s;
- *last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
- *update_every_s = metric->latest_update_every_s;
- metric_unlock(metric);
- }
- inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
- internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
- if(unlikely(latest_time_s < 0))
- return false;
- metric_lock(metric);
- // internal_fatal(latest_time_s > max_acceptable_collected_time(),
- // "DBENGINE METRIC: metric latest time is in the future");
- // internal_fatal(metric->latest_time_s_clean > latest_time_s,
- // "DBENGINE METRIC: metric new clean latest time is older than the previous one");
- metric->latest_time_s_clean = latest_time_s;
- if(unlikely(!metric->first_time_s))
- metric->first_time_s = latest_time_s;
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
- return true;
- }
// Drop the metric's on-disk retention and rebuild it from the pages that are
// currently in main_cache for this metric:
// - first_time_s = earliest start time of its hot or dirty pages (0 if none);
// - latest_time_s_clean = latest end time of its dirty pages (0 if none).
// When no hot/dirty page was found but the metric still has a hot latest
// time, the scan is repeated; at most 5 passes are made (countdown), after
// which the last scan's result is stored anyway and an internal error is logged.
// NOTE(review): the retry presumably covers a race with a collector creating a
// hot page - confirm.
// returns true when metric still has retention
inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
    Word_t section = mrg_metric_section(mrg, metric);
    bool do_again = false;
    size_t countdown = 5;
    bool ret = true;

    do {
        time_t min_first_time_s = LONG_MAX;
        time_t max_end_time_s = 0;
        PGC_PAGE *page;
        PGC_SEARCH method = PGC_SEARCH_FIRST;
        time_t page_first_time_s = 0;
        time_t page_end_time_s = 0;

        // walk all cached pages of this metric, in page order
        while ((page = pgc_page_get_and_acquire(main_cache, section, (Word_t)metric, page_first_time_s, method))) {
            method = PGC_SEARCH_NEXT;

            bool is_hot = pgc_is_page_hot(page);
            bool is_dirty = pgc_is_page_dirty(page);
            page_first_time_s = pgc_page_start_time_s(page);
            page_end_time_s = pgc_page_end_time_s(page);

            if ((is_hot || is_dirty) && page_first_time_s > 0 && page_first_time_s < min_first_time_s)
                min_first_time_s = page_first_time_s;

            if (is_dirty && page_end_time_s > max_end_time_s)
                max_end_time_s = page_end_time_s;

            pgc_page_release(main_cache, page);
        }

        // no qualifying page found
        if (min_first_time_s == LONG_MAX)
            min_first_time_s = 0;

        metric_lock(metric);
        if (--countdown && !min_first_time_s && metric->latest_time_s_hot)
            do_again = true;
        else {
            internal_error(!countdown, "METRIC: giving up on updating the retention of metric without disk retention");

            do_again = false;
            metric->first_time_s = min_first_time_s;
            metric->latest_time_s_clean = max_end_time_s;

            ret = metric_has_retention_unsafe(mrg, metric);
        }
        metric_unlock(metric);
    } while(do_again);

    return ret;
}
- inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
- internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
- // internal_fatal(latest_time_s > max_acceptable_collected_time(),
- // "DBENGINE METRIC: metric latest time is in the future");
- if(unlikely(latest_time_s < 0))
- return false;
- metric_lock(metric);
- metric->latest_time_s_hot = latest_time_s;
- if(unlikely(!metric->first_time_s))
- metric->first_time_s = latest_time_s;
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
- return true;
- }
- inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
- time_t max;
- metric_lock(metric);
- max = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
- metric_unlock(metric);
- return max;
- }
- inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
- internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
- if(update_every_s <= 0)
- return false;
- metric_lock(metric);
- metric->latest_update_every_s = (uint32_t) update_every_s;
- metric_unlock(metric);
- return true;
- }
- inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
- internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
- if(update_every_s <= 0)
- return false;
- metric_lock(metric);
- if(!metric->latest_update_every_s)
- metric->latest_update_every_s = (uint32_t) update_every_s;
- metric_unlock(metric);
- return true;
- }
- inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
- time_t update_every_s;
- metric_lock(metric);
- update_every_s = metric->latest_update_every_s;
- metric_unlock(metric);
- return update_every_s;
- }
- inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
- bool done = false;
- metric_lock(metric);
- if(!metric->writer) {
- metric->writer = gettid();
- __atomic_add_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
- done = true;
- }
- else
- __atomic_add_fetch(&mrg->index[metric->partition].stats.writers_conflicts, 1, __ATOMIC_RELAXED);
- metric_unlock(metric);
- return done;
- }
- inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
- bool done = false;
- metric_lock(metric);
- if(metric->writer) {
- metric->writer = 0;
- __atomic_sub_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
- done = true;
- }
- metric_unlock(metric);
- return done;
- }
- inline void mrg_update_metric_retention_and_granularity_by_uuid(
- MRG *mrg, Word_t section, uuid_t *uuid,
- time_t first_time_s, time_t last_time_s,
- time_t update_every_s, time_t now_s)
- {
- if(unlikely(last_time_s > now_s)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
- "fixing last time to now",
- first_time_s, last_time_s, now_s);
- last_time_s = now_s;
- }
- if (unlikely(first_time_s > last_time_s)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
- "fixing first time to last time",
- first_time_s, last_time_s, now_s);
- first_time_s = last_time_s;
- }
- if (unlikely(first_time_s == 0 || last_time_s == 0)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
- "using them as-is",
- first_time_s, last_time_s, now_s);
- }
- bool added = false;
- METRIC *metric = mrg_metric_get_and_acquire(mrg, uuid, section);
- if (!metric) {
- MRG_ENTRY entry = {
- .uuid = uuid,
- .section = section,
- .first_time_s = first_time_s,
- .last_time_s = last_time_s,
- .latest_update_every_s = (uint32_t) update_every_s
- };
- metric = mrg_metric_add_and_acquire(mrg, entry, &added);
- }
- if (likely(!added))
- mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s);
- mrg_metric_release(mrg, metric);
- }
- inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) {
- memset(s, 0, sizeof(struct mrg_statistics));
- for(size_t i = 0; i < mrg->partitions ;i++) {
- s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED);
- s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED);
- s->entries_with_retention += __atomic_load_n(&mrg->index[i].stats.entries_with_retention, __ATOMIC_RELAXED);
- s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED);
- s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED);
- s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED);
- s->additions_duplicate += __atomic_load_n(&mrg->index[i].stats.additions_duplicate, __ATOMIC_RELAXED);
- s->deletions += __atomic_load_n(&mrg->index[i].stats.deletions, __ATOMIC_RELAXED);
- s->delete_having_retention_or_referenced += __atomic_load_n(&mrg->index[i].stats.delete_having_retention_or_referenced, __ATOMIC_RELAXED);
- s->delete_misses += __atomic_load_n(&mrg->index[i].stats.delete_misses, __ATOMIC_RELAXED);
- s->search_hits += __atomic_load_n(&mrg->index[i].stats.search_hits, __ATOMIC_RELAXED);
- s->search_misses += __atomic_load_n(&mrg->index[i].stats.search_misses, __ATOMIC_RELAXED);
- s->writers += __atomic_load_n(&mrg->index[i].stats.writers, __ATOMIC_RELAXED);
- s->writers_conflicts += __atomic_load_n(&mrg->index[i].stats.writers_conflicts, __ATOMIC_RELAXED);
- }
- s->size += sizeof(MRG) + sizeof(struct mrg_partition) * mrg->partitions;
- }
- // ----------------------------------------------------------------------------
- // unit test
// One stress-test metric: its UUID and the [after, before] window that the
// worker threads keep widening.
struct mrg_stress_entry {
    uuid_t uuid;
    time_t after;   // atomically decremented by workers, passed as the first time
    time_t before;  // atomically incremented by workers, passed as the last time and "now"
};
// State shared by all stress-test worker threads.
struct mrg_stress {
    MRG *mrg;                           // registry under test
    bool stop;                          // set atomically to ask the workers to exit
    size_t entries;                     // number of elements in array
    struct mrg_stress_entry *array;
    size_t updates;                     // total updates performed, incremented atomically
};
- static void *mrg_stress(void *ptr) {
- struct mrg_stress *t = ptr;
- MRG *mrg = t->mrg;
- ssize_t start = 0;
- ssize_t end = (ssize_t)t->entries;
- ssize_t step = 1;
- if(gettid() % 2) {
- start = (ssize_t)t->entries - 1;
- end = -1;
- step = -1;
- }
- while(!__atomic_load_n(&t->stop, __ATOMIC_RELAXED)) {
- for (ssize_t i = start; i != end; i += step) {
- struct mrg_stress_entry *e = &t->array[i];
- time_t after = __atomic_sub_fetch(&e->after, 1, __ATOMIC_RELAXED);
- time_t before = __atomic_add_fetch(&e->before, 1, __ATOMIC_RELAXED);
- mrg_update_metric_retention_and_granularity_by_uuid(
- mrg, 0x01,
- &e->uuid,
- after,
- before,
- 1,
- before);
- __atomic_add_fetch(&t->updates, 1, __ATOMIC_RELAXED);
- }
- }
- return ptr;
- }
- int mrg_unittest(void) {
- MRG *mrg = mrg_create(0);
- METRIC *m1_t0, *m2_t0, *m3_t0, *m4_t0;
- METRIC *m1_t1, *m2_t1, *m3_t1, *m4_t1;
- bool ret;
- uuid_t test_uuid;
- uuid_generate(test_uuid);
- MRG_ENTRY entry = {
- .uuid = &test_uuid,
- .section = 0,
- .first_time_s = 2,
- .last_time_s = 3,
- .latest_update_every_s = 4,
- };
- m1_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
- if(!ret)
- fatal("DBENGINE METRIC: failed to add metric");
- // add the same metric again
- m2_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
- if(m2_t0 != m1_t0)
- fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer");
- if(ret)
- fatal("DBENGINE METRIC: managed to add the same metric twice");
- m3_t0 = mrg_metric_get_and_acquire(mrg, entry.uuid, entry.section);
- if(m3_t0 != m1_t0)
- fatal("DBENGINE METRIC: cannot find the metric added");
- // add the same metric again
- m4_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
- if(m4_t0 != m1_t0)
- fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer");
- if(ret)
- fatal("DBENGINE METRIC: managed to add the same metric twice");
- // add the same metric in another section
- entry.section = 1;
- m1_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret);
- if(!ret)
- fatal("DBENGINE METRIC: failed to add metric in section %zu", (size_t)entry.section);
- // add the same metric again
- m2_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret);
- if(m2_t1 != m1_t1)
- fatal("DBENGINE METRIC: adding the same metric twice (section %zu), does not return the same pointer", (size_t)entry.section);
- if(ret)
- fatal("DBENGINE METRIC: managed to add the same metric twice in (section 0)");
- m3_t1 = mrg_metric_get_and_acquire(mrg, entry.uuid, entry.section);
- if(m3_t1 != m1_t1)
- fatal("DBENGINE METRIC: cannot find the metric added (section %zu)", (size_t)entry.section);
- // delete the first metric
- mrg_metric_release(mrg, m2_t0);
- mrg_metric_release(mrg, m3_t0);
- mrg_metric_release(mrg, m4_t0);
- mrg_metric_set_first_time_s(mrg, m1_t0, 0);
- mrg_metric_set_clean_latest_time_s(mrg, m1_t0, 0);
- mrg_metric_set_hot_latest_time_s(mrg, m1_t0, 0);
- if(!mrg_metric_release_and_delete(mrg, m1_t0))
- fatal("DBENGINE METRIC: cannot delete the first metric");
- m4_t1 = mrg_metric_get_and_acquire(mrg, entry.uuid, entry.section);
- if(m4_t1 != m1_t1)
- fatal("DBENGINE METRIC: cannot find the metric added (section %zu), after deleting the first one", (size_t)entry.section);
- // delete the second metric
- mrg_metric_release(mrg, m2_t1);
- mrg_metric_release(mrg, m3_t1);
- mrg_metric_release(mrg, m4_t1);
- mrg_metric_set_first_time_s(mrg, m1_t1, 0);
- mrg_metric_set_clean_latest_time_s(mrg, m1_t1, 0);
- mrg_metric_set_hot_latest_time_s(mrg, m1_t1, 0);
- if(!mrg_metric_release_and_delete(mrg, m1_t1))
- fatal("DBENGINE METRIC: cannot delete the second metric");
- struct mrg_statistics s;
- mrg_get_statistics(mrg, &s);
- if(s.entries != 0)
- fatal("DBENGINE METRIC: invalid entries counter");
- size_t entries = 1000000;
- size_t threads = mrg->partitions / 3 + 1;
- size_t tiers = 3;
- size_t run_for_secs = 5;
- netdata_log_info("preparing stress test of %zu entries...", entries);
- struct mrg_stress t = {
- .mrg = mrg,
- .entries = entries,
- .array = callocz(entries, sizeof(struct mrg_stress_entry)),
- };
- time_t now = max_acceptable_collected_time();
- for(size_t i = 0; i < entries ;i++) {
- uuid_generate_random(t.array[i].uuid);
- t.array[i].after = now / 3;
- t.array[i].before = now / 2;
- }
- netdata_log_info("stress test is populating MRG with 3 tiers...");
- for(size_t i = 0; i < entries ;i++) {
- struct mrg_stress_entry *e = &t.array[i];
- for(size_t tier = 1; tier <= tiers ;tier++) {
- mrg_update_metric_retention_and_granularity_by_uuid(
- mrg, tier,
- &e->uuid,
- e->after,
- e->before,
- 1,
- e->before);
- }
- }
- netdata_log_info("stress test ready to run...");
- usec_t started_ut = now_monotonic_usec();
- pthread_t th[threads];
- for(size_t i = 0; i < threads ; i++) {
- char buf[15 + 1];
- snprintfz(buf, 15, "TH[%zu]", i);
- netdata_thread_create(&th[i], buf,
- NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
- mrg_stress, &t);
- }
- sleep_usec(run_for_secs * USEC_PER_SEC);
- __atomic_store_n(&t.stop, true, __ATOMIC_RELAXED);
- for(size_t i = 0; i < threads ; i++)
- netdata_thread_cancel(th[i]);
- for(size_t i = 0; i < threads ; i++)
- netdata_thread_join(th[i], NULL);
- usec_t ended_ut = now_monotonic_usec();
- struct mrg_statistics stats;
- mrg_get_statistics(mrg, &stats);
- netdata_log_info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, "
- "%zu deletions, %zu wrong deletions, "
- "%zu successful searches, %zu wrong searches, "
- "in %"PRIu64" usecs",
- stats.additions, stats.additions_duplicate,
- stats.deletions, stats.delete_misses,
- stats.search_hits, stats.search_misses,
- ended_ut - started_ut);
- netdata_log_info("DBENGINE METRIC: updates performance: %0.2fk/sec total, %0.2fk/sec/thread",
- (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0,
- (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0 / threads);
- mrg_destroy(mrg);
- netdata_log_info("DBENGINE METRIC: all tests passed!");
- return 0;
- }
|