Browse Source

Rrddim acquire on replay set (#13932)

* prevent RRDDIM from vanishing while replay is working with it

* set chart last access time

* set chart last access time every time someone finds it

* do not replay dimensions that are archived

* remove the obsolete flag from dimensions that are replayed; do not process archived dimensions

* cleanup db_metric_handle refcount of hidden dimensions

* more information in page alignment fatal

* do not fatal() on page alignment reset when the caller is the only writer
Costa Tsaousis 2 years ago
parent
commit
c2b8b9a807

+ 28 - 12
collectors/plugins.d/pluginsd_parser.c

@@ -56,14 +56,15 @@ PARSER_RC pluginsd_set(char **words, size_t num_words, void *user, PLUGINSD_ACTI
         debug(D_PLUGINSD, "is setting dimension '%s'/'%s' to '%s'", rrdset_id(st), dimension, value ? value : "<nothing>");
 
     if (value) {
-        RRDDIM *rd = rrddim_find(st, dimension);
+        RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
+        RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
         if (unlikely(!rd)) {
-            error(
-                "requested a SET to dimension with id '%s' on stats '%s' (%s) on host '%s', which does not exist. Disabling it.",
-                dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
+            error( "requested a SET to dimension with id '%s' on stats '%s' (%s) on host '%s', which does not exist. Disabling it.",
+                    dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
             goto disable;
-        } else
-            rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0));
+        }
+        rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0));
+        rrddim_acquired_release(rda);
     }
     return PARSER_RC_OK;
 
@@ -1024,19 +1025,28 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user, PLUGIN
         debug(D_PLUGINSD, "REPLAY: is replaying dimension '%s'/'%s' to '%s'", rrdset_id(st), dimension, value_str);
 
     if (likely(value_str)) {
-        RRDDIM *rd = rrddim_find(st, dimension);
+        RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
+        RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
         if(unlikely(!rd)) {
             error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
                   dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
             goto disable;
         }
-        else {
+
+        RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
+
+        if(unlikely(rd_flags & RRDDIM_FLAG_OBSOLETE)) {
+            error("Dimension %s in chart '%s' has the OBSOLETE flag set, but it is collected.", rrddim_name(rd), rrdset_id(st));
+            rrddim_isnot_obsolete(st, rd);
+        }
+
+        if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
             NETDATA_DOUBLE value = strtondd(value_str, NULL);
             SN_FLAGS flags = SN_FLAG_NONE;
 
             char c;
-            while((c = *flags_str++)) {
-                switch(c) {
+            while ((c = *flags_str++)) {
+                switch (c) {
                     case 'R':
                         flags |= SN_FLAG_RESET;
                         break;
@@ -1052,7 +1062,7 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user, PLUGIN
                 }
             }
 
-            if(!netdata_double_isnumber(value)) {
+            if (!netdata_double_isnumber(value)) {
                 value = NAN;
                 flags = SN_EMPTY_SLOT;
             }
@@ -1062,6 +1072,10 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user, PLUGIN
             rd->last_collected_time.tv_usec = 0;
             rd->collections_counter++;
         }
+        else
+            error("Dimension %s in chart '%s' has the ARCHIVED flag set, but it is collected. Ignoring data.", rrddim_name(rd), rrdset_id(st));
+
+        rrddim_acquired_release(rda);
     }
     return PARSER_RC_OK;
 
@@ -1093,7 +1107,8 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
         goto disable;
     }
 
-    RRDDIM *rd = rrddim_find(st, dimension);
+    RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
+    RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
     if(unlikely(!rd)) {
         error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
               dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
@@ -1110,6 +1125,7 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
     rd->last_collected_value = last_collected_value_str ? str2ll(last_collected_value_str, NULL) : 0;
     rd->last_calculated_value = last_calculated_value_str ? str2ndd(last_calculated_value_str, NULL) : 0;
     rd->last_stored_value = last_stored_value_str ? str2ndd(last_stored_value_str, NULL) : 0.0;
+    rrddim_acquired_release(rda);
     return PARSER_RC_OK;
 
 disable:

+ 5 - 6
database/engine/rrdengineapi.c

@@ -134,13 +134,12 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *
         __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
 
         if(pa) {
-            if(page_index->alignment && page_index->alignment != pa)
-                fatal("DBENGINE: page_index has a different alignment.");
+            if(page_index->alignment && page_index->alignment != pa && page_index->writers > 0)
+                fatal("DBENGINE: page_index has a different alignment (page_index refcount is %u, writers is %u).",
+                        page_index->refcount, page_index->writers);
 
-            if(!page_index->alignment) {
-                page_index->alignment = pa;
-                __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST);
-            }
+            page_index->alignment = pa;
+            __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST);
         }
     }
 

+ 3 - 0
database/rrd.h

@@ -1280,6 +1280,9 @@ int rrddim_set_multiplier(RRDSET *st, RRDDIM *rd, collected_number multiplier);
 int rrddim_set_divisor(RRDSET *st, RRDDIM *rd, collected_number divisor);
 
 RRDDIM *rrddim_find(RRDSET *st, const char *id);
+RRDDIM_ACQUIRED *rrddim_find_and_acquire(RRDSET *st, const char *id);
+RRDDIM *rrddim_acquired_to_rrddim(RRDDIM_ACQUIRED *rda);
+void rrddim_acquired_release(RRDDIM_ACQUIRED *rda);
 RRDDIM *rrddim_find_active(RRDSET *st, const char *id);
 
 int rrddim_hide(RRDSET *st, const char *id);

+ 5 - 2
database/rrdcontext.c

@@ -2406,7 +2406,7 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
         tier_retention[tier].eng = eng;
         tier_retention[tier].db_update_every = (time_t) (qtl->host->db[tier].tier_grouping * ri->update_every);
 
-        if(rm->rrddim && rm->rrddim->tiers[tier]->db_metric_handle)
+        if(rm->rrddim && rm->rrddim->tiers[tier] && rm->rrddim->tiers[tier]->db_metric_handle)
             tier_retention[tier].db_metric_handle = eng->api.metric_dup(rm->rrddim->tiers[tier]->db_metric_handle);
         else
             tier_retention[tier].db_metric_handle = eng->api.metric_get(qtl->host->db[tier].instance, &rm->uuid, NULL);
@@ -2439,6 +2439,7 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
         }
     }
 
+    bool release_retention = true;
     bool timeframe_matches =
             (tiers_added
             && (common_first_time_t - common_update_every * 2) <= qt->window.before
@@ -2521,11 +2522,13 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
                 qm->tiers[tier].db_last_time_t = tier_retention[tier].db_last_time_t;
                 qm->tiers[tier].db_update_every = tier_retention[tier].db_update_every;
             }
+            release_retention = false;
         }
     }
-    else {
+    else
         qtl->metrics_skipped_due_to_not_matching_timeframe++;
 
+    if(release_retention) {
         // cleanup anything we allocated to the retention we will not use
         for(size_t tier = 0; tier < storage_tiers ;tier++) {
             if (tier_retention[tier].db_metric_handle)

+ 38 - 22
database/rrddim.c

@@ -180,24 +180,21 @@ static void rrddim_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, v
 
     debug(D_RRD_CALLS, "rrddim_free() %s.%s", rrdset_name(st), rrddim_name(rd));
 
-    if (!rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) {
-
-        size_t tiers_available = 0, tiers_said_yes = 0;
-        for(size_t tier = 0; tier < storage_tiers ;tier++) {
-            if(rd->tiers[tier]) {
-                tiers_available++;
+    size_t tiers_available = 0, tiers_said_yes = 0;
+    for(size_t tier = 0; tier < storage_tiers ;tier++) {
+        if(rd->tiers[tier] && rd->tiers[tier]->db_collection_handle) {
+            tiers_available++;
 
-                if(rd->tiers[tier]->collect_ops->finalize(rd->tiers[tier]->db_collection_handle))
-                    tiers_said_yes++;
+            if(rd->tiers[tier]->collect_ops->finalize(rd->tiers[tier]->db_collection_handle))
+                tiers_said_yes++;
 
-                rd->tiers[tier]->db_collection_handle = NULL;
-            }
+            rd->tiers[tier]->db_collection_handle = NULL;
         }
+    }
 
-        if (tiers_available == tiers_said_yes && tiers_said_yes && rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
-            /* This metric has no data and no references */
-            metaqueue_delete_dimension_uuid(&rd->metric_uuid);
-        }
+    if (tiers_available == tiers_said_yes && tiers_said_yes && rd->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
+        /* This metric has no data and no references */
+        metaqueue_delete_dimension_uuid(&rd->metric_uuid);
     }
 
     rrddimvar_delete_all(rd);
@@ -246,16 +243,14 @@ static bool rrddim_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused,
     rc += rrddim_set_multiplier(st, rd, ctr->multiplier);
     rc += rrddim_set_divisor(st, rd, ctr->divisor);
 
-    if(rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) {
-
-        for(size_t tier = 0; tier < storage_tiers ;tier++) {
-            if (rd->tiers[tier])
-                rd->tiers[tier]->db_collection_handle =
-                    rd->tiers[tier]->collect_ops->init(rd->tiers[tier]->db_metric_handle, st->rrdhost->db[tier].tier_grouping * st->update_every);
-        }
+    for(size_t tier = 0; tier < storage_tiers ;tier++) {
+        if (rd->tiers[tier] && !rd->tiers[tier]->db_collection_handle)
+            rd->tiers[tier]->db_collection_handle =
+                rd->tiers[tier]->collect_ops->init(rd->tiers[tier]->db_metric_handle, st->rrdhost->db[tier].tier_grouping * st->update_every);
+    }
 
+    if(rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) {
         rrddim_flag_clear(rd, RRDDIM_FLAG_ARCHIVED);
-
         if(!rrdset_is_ar_chart(st)) {
             rrddim_flag_set(rd, RRDDIM_FLAG_PENDING_HEALTH_INITIALIZATION);
             rrdset_flag_set(rd->rrdset, RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION);
@@ -318,6 +313,27 @@ inline RRDDIM *rrddim_find(RRDSET *st, const char *id) {
     return rrddim_index_find(st, id);
 }
 
+inline RRDDIM_ACQUIRED *rrddim_find_and_acquire(RRDSET *st, const char *id) {
+    debug(D_RRD_CALLS, "rrddim_find() for chart %s, dimension %s", rrdset_name(st), id);
+
+    return (RRDDIM_ACQUIRED *)dictionary_get_and_acquire_item(st->rrddim_root_index, id);
+}
+
+RRDDIM *rrddim_acquired_to_rrddim(RRDDIM_ACQUIRED *rda) {
+    if(unlikely(!rda))
+        return NULL;
+
+    return (RRDDIM *) dictionary_acquired_item_value((const DICTIONARY_ITEM *)rda);
+}
+
+void rrddim_acquired_release(RRDDIM_ACQUIRED *rda) {
+    if(unlikely(!rda))
+        return;
+
+    RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
+    dictionary_acquired_item_release(rd->rrdset->rrddim_root_index, (const DICTIONARY_ITEM *)rda);
+}
+
 // This will not return dimensions that are archived
 RRDDIM *rrddim_find_active(RRDSET *st, const char *id) {
     RRDDIM *rd = rrddim_find(st, id);

+ 6 - 0
database/rrdset.c

@@ -353,6 +353,8 @@ static void rrdset_react_callback(const DICTIONARY_ITEM *item __maybe_unused, vo
     RRDSET *st = rrdset;
     RRDHOST *host = st->rrdhost;
 
+    st->last_accessed_time = now_realtime_sec();
+
     if((host->health_enabled && (ctr->react_action & (RRDSET_REACT_NEW | RRDSET_REACT_CHART_ACTIVATED))) && !rrdset_is_ar_chart(st)) {
         rrdset_flag_set(st, RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION);
         rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION);
@@ -420,6 +422,10 @@ static RRDSET *rrdset_index_find(RRDHOST *host, const char *id) {
 inline RRDSET *rrdset_find(RRDHOST *host, const char *id) {
     debug(D_RRD_CALLS, "rrdset_find() for chart '%s' in host '%s'", id, rrdhost_hostname(host));
     RRDSET *st = rrdset_index_find(host, id);
+
+    if(st)
+        st->last_accessed_time = now_realtime_sec();
+
     return(st);
 }