Browse Source

replication improvements (#13989)

contexts are not set to collected until replication finishes for them; sender thread disables cancelability while replication queries are executed;
Costa Tsaousis 2 years ago
parent
commit
cbebc18ca3

+ 2 - 0
collectors/plugins.d/pluginsd_parser.c

@@ -1144,6 +1144,8 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
         return PARSER_RC_OK;
     }
 
+    rrdcontext_updated_retention_rrdset(st);
+
     bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child,
                                       first_entry_requested, last_entry_requested);
     return ok ? PARSER_RC_OK : PARSER_RC_ERROR;

+ 27 - 7
database/rrdcontext.c

@@ -63,9 +63,7 @@ typedef enum {
     RRD_FLAG_UPDATE_REASON_DB_ROTATION             = (1 << 28), // this context changed because of a db rotation
     RRD_FLAG_UPDATE_REASON_UNUSED                  = (1 << 29), // this context is not used anymore
     RRD_FLAG_UPDATE_REASON_CHANGED_FLAGS           = (1 << 30), // this context is not used anymore
-
-    // DO NOT ADD (1 << 31) or bigger!
-    // runtime error: left shift of 1 by 31 places cannot be represented in type 'int'
+    RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION       = (1 << 31), // this object has updated retention
 } RRD_FLAGS;
 
 #define RRD_FLAG_ALL_UPDATE_REASONS                   ( \
@@ -229,6 +227,7 @@ static struct rrdcontext_reason {
     { RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD,      "child disconnected",   65 * USEC_PER_SEC },
     { RRD_FLAG_UPDATE_REASON_DB_ROTATION,             "db rotation",          65 * USEC_PER_SEC },
     { RRD_FLAG_UPDATE_REASON_CHANGED_FLAGS,           "changed flags",        65 * USEC_PER_SEC },
+    { RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION,       "updated retention",    65 * USEC_PER_SEC },
 
     // terminator
     { 0, NULL, 0 },
@@ -1097,6 +1096,14 @@ static inline void rrdinstance_rrdset_is_freed(RRDSET *st) {
     st->rrdcontext = NULL;
 }
 
+static inline void rrdinstance_rrdset_has_updated_retention(RRDSET *st) {
+    RRDINSTANCE *ri = rrdset_get_rrdinstance(st);
+    if(unlikely(!ri)) return;
+
+    rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION);
+    rrdinstance_trigger_updates(ri, __FUNCTION__ );
+}
+
 static inline void rrdinstance_updated_rrdset_name(RRDSET *st) {
     // the chart may not be initialized when this is called
     if(unlikely(!st->rrdinstance)) return;
@@ -1467,6 +1474,10 @@ void rrdcontext_removed_rrdset(RRDSET *st) {
     rrdinstance_rrdset_is_freed(st);
 }
 
+void rrdcontext_updated_retention_rrdset(RRDSET *st) {
+    rrdinstance_rrdset_has_updated_retention(st);
+}
+
 void rrdcontext_updated_rrdset_name(RRDSET *st) {
     rrdinstance_updated_rrdset_name(st);
 }
@@ -2743,9 +2754,10 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) {
     QUERY_TARGET *qt = &thread_query_target;
 
     if(qt->used)
-        fatal("QUERY TARGET: this query target is already used.");
+        fatal("QUERY TARGET: this query target is already used (%zu queries made with this QUERY_TARGET so far).", qt->queries);
 
     qt->used = true;
+    qt->queries++;
 
     // copy the request into query_thread_target
     qt->request = *qtr;
@@ -3248,7 +3260,7 @@ static void rrdmetric_process_updates(RRDMETRIC *rm, bool force, RRD_FLAGS reaso
     if(reason != RRD_FLAG_NONE)
         rrd_flag_set_updated(rm, reason);
 
-    if(!force && !rrd_flag_is_updated(rm) && rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION))
+    if(!force && !rrd_flag_is_updated(rm) && rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION) && !rrd_flag_check(rm, RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION))
         return;
 
     if(worker_jobs)
@@ -3282,7 +3294,11 @@ static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FL
         dfe_start_read((DICTIONARY *)ri->rrdmetrics, rm) {
             if(unlikely(netdata_exit)) break;
 
-            rrdmetric_process_updates(rm, force, reason, worker_jobs);
+            RRD_FLAGS reason_to_pass = reason;
+            if(rrd_flag_check(ri, RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION))
+                reason_to_pass |= RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION;
+
+            rrdmetric_process_updates(rm, force, reason_to_pass, worker_jobs);
 
             if(unlikely(!rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION)))
                 live_retention = false;
@@ -3385,7 +3401,11 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
         dfe_start_reentrant(rc->rrdinstances, ri) {
             if(unlikely(netdata_exit)) break;
 
-            rrdinstance_post_process_updates(ri, force, reason, worker_jobs);
+            RRD_FLAGS reason_to_pass = reason;
+            if(rrd_flag_check(rc, RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION))
+                reason_to_pass |= RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION;
+
+            rrdinstance_post_process_updates(ri, force, reason_to_pass, worker_jobs);
 
             if(unlikely(hidden && !rrd_flag_check(ri, RRD_FLAG_HIDDEN)))
                 hidden = false;

+ 2 - 0
database/rrdcontext.h

@@ -87,6 +87,7 @@ void rrdcontext_updated_rrdset(RRDSET *st);
 void rrdcontext_removed_rrdset(RRDSET *st);
 void rrdcontext_updated_rrdset_name(RRDSET *st);
 void rrdcontext_updated_rrdset_flags(RRDSET *st);
+void rrdcontext_updated_retention_rrdset(RRDSET *st);
 void rrdcontext_collected_rrdset(RRDSET *st);
 int rrdcontext_find_chart_uuid(RRDSET *st, uuid_t *store_uuid);
 
@@ -177,6 +178,7 @@ typedef struct query_target {
     QUERY_TARGET_REQUEST request;
 
     bool used;                              // when true, this query is currently being used
+    size_t queries;                         // how many query we have done so far
 
     struct {
         bool relative;                      // true when the request made with relative timestamps, true if it was absolute

+ 3 - 2
database/rrdset.c

@@ -1097,8 +1097,6 @@ void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n,
 
         store_metric_at_tier(rd, t, sp, point_end_time_ut);
     }
-
-    rrdcontext_collected_rrddim(rd);
 }
 
 // caching of dimensions rrdset_done() and rrdset_done_interpolate() loop through
@@ -1257,6 +1255,7 @@ static inline size_t rrdset_done_interpolate(
             if(unlikely(!store_this_entry)) {
                 (void) ml_is_anomalous(rd, 0, false);
                 rrddim_store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE);
+                rrdcontext_collected_rrddim(rd);
                 continue;
             }
 
@@ -1269,6 +1268,7 @@ static inline size_t rrdset_done_interpolate(
                 }
 
                 rrddim_store_metric(rd, next_store_ut, new_value, dim_storage_flags);
+                rrdcontext_collected_rrddim(rd);
                 rd->last_stored_value = new_value;
             }
             else {
@@ -1277,6 +1277,7 @@ static inline size_t rrdset_done_interpolate(
                 rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING ", rrddim_name(rd), current_entry);
 
                 rrddim_store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE);
+                rrdcontext_collected_rrddim(rd);
                 rd->last_stored_value = NAN;
             }
 

+ 5 - 2
streaming/replication.c

@@ -153,10 +153,13 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
     time_t query_after = after;
     time_t query_before = before;
     time_t now = now_realtime_sec();
+    time_t tolerance = 2;   // sometimes from the time we get this value, to the time we check,
+                            // a data collection has been made
+                            // so, we give this tolerance to detect invalid timestamps
 
     // find the first entry we have
     time_t first_entry_local = rrdset_first_entry_t(st);
-    if(first_entry_local > now) {
+    if(first_entry_local > now + tolerance) {
         internal_error(true,
                        "RRDSET: '%s' first time %llu is in the future (now is %llu)",
                        rrdset_id(st), (unsigned long long)first_entry_local, (unsigned long long)now);
@@ -175,7 +178,7 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
         last_entry_local = rrdset_last_entry_t(st);
     }
 
-    if(last_entry_local > now) {
+    if(last_entry_local > now + tolerance) {
         internal_error(true,
                        "RRDSET: '%s' last updated time %llu is in the future (now is %llu)",
                        rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);

+ 4 - 0
streaming/sender.c

@@ -1132,10 +1132,14 @@ static void process_replication_requests(struct sender_state *s) {
             continue;
         }
 
+        netdata_thread_disable_cancelability();
+
         // send the replication data
         bool start_streaming = replicate_chart_response(st->rrdhost, st,
                 rr->start_streaming, rr->after, rr->before);
 
+        netdata_thread_enable_cancelability();
+
         // enable normal streaming if we have to
         if (start_streaming) {
             debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",