Browse Source

replication fixes #6 (#14046)

use the faster monotonic clock in workers and replication; avoid unnecessary statistics function calls on every request on replication - gather them all together once every second; check the chart flags on all mirrored hosts, not only the ones that have a sender; clean up and unify replication logs; added child world time to REND; fix first BEGIN being transmitted when replication starts;
Costa Tsaousis 2 years ago
parent
commit
2e874e7916

+ 75 - 52
collectors/plugins.d/pluginsd_parser.c

@@ -151,6 +151,20 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
     if (microseconds_txt && *microseconds_txt)
         microseconds = str2ull(microseconds_txt);
 
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+    if(st->replay.log_next_data_collection) {
+        st->replay.log_next_data_collection = false;
+
+        internal_error(true,
+                       "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu",
+                       rrdhost_hostname(host), rrdset_id(st),
+                       st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec,
+                       st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec,
+                       microseconds
+                       );
+    }
+#endif
+
     if (likely(st->counter_done)) {
         if (likely(microseconds)) {
             if (((PARSER_USER_OBJECT *)user)->trust_durations)
@@ -312,6 +326,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
 {
     const char *first_entry_txt = get_word(words, num_words, 1);
     const char *last_entry_txt = get_word(words, num_words, 2);
+    const char *world_time_txt = get_word(words, num_words, 3);
 
     RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
     if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
@@ -319,22 +334,14 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
     RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
     if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    if(unlikely(!first_entry_txt || !last_entry_txt)) {
-        error("PLUGINSD: 'host:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " without first or last entry. Disabling it.",
-              rrdhost_hostname(host));
-        return PLUGINSD_DISABLE_PLUGIN(user);
-    }
+    time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
+    time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
+    time_t child_world_time = (world_time_txt && *world_time_txt) ? (time_t)str2ul(world_time_txt) : now_realtime_sec();
 
-    long first_entry_child = str2l(first_entry_txt);
-    long last_entry_child = str2l(last_entry_txt);
-
-    internal_error(
-            (first_entry_child != 0 || last_entry_child != 0)
-            && (first_entry_child == 0 || last_entry_child == 0),
-            "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %llu, last time %llu).",
-            rrdhost_hostname(host), rrdset_id(st),
-            (unsigned long long)first_entry_child, (unsigned long long)last_entry_child
-            );
+    if((first_entry_child != 0 || last_entry_child != 0) && (first_entry_child == 0 || last_entry_child == 0))
+        error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %ld, last time %ld, world time %ld).",
+              rrdhost_hostname(host), rrdset_id(st),
+              first_entry_child, last_entry_child, child_world_time);
 
     bool ok = true;
     if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
@@ -350,8 +357,9 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
         rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
 
         PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser;
-        ok = replicate_chart_request(send_to_plugin, parser, host, st, first_entry_child,
-                                          last_entry_child, 0, 0);
+        ok = replicate_chart_request(send_to_plugin, parser, host, st,
+                                     first_entry_child, last_entry_child, child_world_time,
+                                     0, 0);
     }
 #ifdef NETDATA_LOG_REPLICATION_REQUESTS
     else {
@@ -910,7 +918,7 @@ PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words _
     RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT);
     if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+    RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
     if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
 
     debug(D_PLUGINSD, "requested to commit chart labels");
@@ -950,28 +958,35 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
     ((PARSER_USER_OBJECT *) user)->st = st;
 
     if(start_time_str && end_time_str) {
-        time_t start_time = strtol(start_time_str, NULL, 0);
-        time_t end_time = strtol(end_time_str, NULL, 0);
+        time_t start_time = (time_t)str2ul(start_time_str);
+        time_t end_time = (time_t)str2ul(end_time_str);
 
         time_t wall_clock_time = 0, tolerance;
         bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child;
         if(child_now_str) {
-            wall_clock_time = strtol(child_now_str, NULL, 0);
+            wall_clock_time = (time_t)str2ul(child_now_str);
             tolerance = st->update_every + 1;
             wall_clock_comes_from_child = true;
         }
 
         if(wall_clock_time <= 0) {
             wall_clock_time = now_realtime_sec();
-            tolerance = st->update_every + 60;
+            tolerance = st->update_every + 5;
             wall_clock_comes_from_child = false;
         }
 
 #ifdef NETDATA_LOG_REPLICATION_REQUESTS
         internal_error(
                 (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
-                "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).",
+                "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).",
                 rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before);
+
+        internal_error(
+                true,
+                "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld",
+                rrdhost_hostname(st->rrdhost), rrdset_id(st),
+                start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time",
+                st->replay.after, st->replay.before);
 #endif
 
         if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
@@ -1002,10 +1017,9 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
             return PARSER_RC_OK;
         }
 
-        internal_error(true,
-                       "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld).",
-                       rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
-                       wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance);
+        error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
+              rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
+              wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance);
     }
 
     // the child sends an RBEGIN without any parameters initially
@@ -1051,7 +1065,7 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
               dimension,
               ((PARSER_USER_OBJECT *) user)->replay.start_time,
               ((PARSER_USER_OBJECT *) user)->replay.end_time);
-        return PARSER_RC_ERROR;
+        return PLUGINSD_DISABLE_PLUGIN(user);
     }
 
     if (unlikely(!value_str || !*value_str))
@@ -1065,15 +1079,6 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
 
         RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
 
-        if(unlikely(rd_flags & RRDDIM_FLAG_OBSOLETE)) {
-            error("PLUGINSD: 'host:%s/chart:%s/dim:%s' has the OBSOLETE flag set, but it is collected.",
-                  rrdhost_hostname(st->rrdhost),
-                  rrdset_id(st),
-                  rrddim_id(rd)
-                  );
-            rrddim_isnot_obsolete(st, rd);
-        }
-
         if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
             NETDATA_DOUBLE value = strtondd(value_str, NULL);
             SN_FLAGS flags = SN_FLAG_NONE;
@@ -1106,9 +1111,11 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
             rd->last_collected_time.tv_usec = 0;
             rd->collections_counter++;
         }
-        else
-            error("PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is collected. Ignoring data.",
-                  rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
+        else {
+            error_limit_static_global_var(erl, 1, 0);
+            error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.",
+                        rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
+        }
     }
 
     rrddim_acquired_release(rda);
@@ -1180,18 +1187,29 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words
 
 PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
 {
-    if (num_words < 7) {
+    if (num_words < 7) { // accepts 7, but the 7th is optional
         error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
         return PARSER_RC_ERROR;
     }
 
-    time_t update_every_child = str2l(get_word(words, num_words, 1));
-    time_t first_entry_child = (time_t)str2ull(get_word(words, num_words, 2));
-    time_t last_entry_child = (time_t)str2ull(get_word(words, num_words, 3));
+    const char *update_every_child_txt = get_word(words, num_words, 1);
+    const char *first_entry_child_txt = get_word(words, num_words, 2);
+    const char *last_entry_child_txt = get_word(words, num_words, 3);
+    const char *start_streaming_txt = get_word(words, num_words, 4);
+    const char *first_entry_requested_txt = get_word(words, num_words, 5);
+    const char *last_entry_requested_txt = get_word(words, num_words, 6);
+    const char *child_world_time_txt = get_word(words, num_words, 7); // optional
+
+    time_t update_every_child = (time_t)str2ul(update_every_child_txt);
+    time_t first_entry_child = (time_t)str2ul(first_entry_child_txt);
+    time_t last_entry_child = (time_t)str2ul(last_entry_child_txt);
 
-    bool start_streaming = (strcmp(get_word(words, num_words, 4), "true") == 0);
-    time_t first_entry_requested = (time_t)str2ull(get_word(words, num_words, 5));
-    time_t last_entry_requested = (time_t)str2ull(get_word(words, num_words, 6));
+    bool start_streaming = (strcmp(start_streaming_txt, "true") == 0);
+    time_t first_entry_requested = (time_t)str2ul(first_entry_requested_txt);
+    time_t last_entry_requested = (time_t)str2ul(last_entry_requested_txt);
+
+    // the optional child world time
+    time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t)str2ul(child_world_time_txt) : now_realtime_sec();
 
     PARSER_USER_OBJECT *user_object = user;
 
@@ -1201,13 +1219,15 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
     RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
     if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
 
-#ifdef NETDATATA_LOG_REPLICATION_REQUESTS
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
     internal_error(true,
-                   "PLUGINSD: 'host:%s/chart:%s': received " PLUGINSD_KEYWORD_REPLAY_END " child first_t = %llu, last_t = %llu, start_streaming = %s, requested first_t = %llu, last_t = %llu",
+                   "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu",
                    rrdhost_hostname(host), rrdset_id(st),
                    (unsigned long long)first_entry_child, (unsigned long long)last_entry_child,
                    start_streaming?"true":"false",
-                   (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested);
+                   (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested,
+                   (unsigned long long)child_world_time
+                   );
 #endif
 
     ((PARSER_USER_OBJECT *) user)->st = NULL;
@@ -1236,6 +1256,8 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
     st->replay.start_streaming = false;
     st->replay.after = 0;
     st->replay.before = 0;
+    if(start_streaming)
+        st->replay.log_next_data_collection = true;
 #endif
 
     if (start_streaming) {
@@ -1250,7 +1272,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
         }
 #ifdef NETDATA_LOG_REPLICATION_REQUESTS
         else
-            internal_error(true, "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
+            internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
                   rrdhost_hostname(host), rrdset_id(st));
 #endif
         worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0);
@@ -1260,7 +1282,8 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
 
     rrdcontext_updated_retention_rrdset(st);
 
-    bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child,
+    bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st,
+                                      first_entry_child, last_entry_child, child_world_time,
                                       first_entry_requested, last_entry_requested);
     return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
 }

+ 2 - 0
database/rrd.h

@@ -673,6 +673,7 @@ struct rrdset {
 
 #ifdef NETDATA_LOG_REPLICATION_REQUESTS
     struct {
+        bool log_next_data_collection;
         bool start_streaming;
         time_t after;
         time_t before;
@@ -1078,6 +1079,7 @@ extern RRDHOST *localhost;
 #define rrdhost_sender_replicating_charts_minus_one(host) (__atomic_sub_fetch(&((host)->rrdpush_sender_replicating_charts), 1, __ATOMIC_RELAXED))
 #define rrdhost_sender_replicating_charts_zero(host) (__atomic_store_n(&((host)->rrdpush_sender_replicating_charts), 0, __ATOMIC_RELAXED))
 
+extern DICTIONARY *rrdhost_root_index;
 long rrdhost_hosts_available(void);
 
 // ----------------------------------------------------------------------------

+ 0 - 2
database/sqlite/sqlite_metadata.c

@@ -2,8 +2,6 @@
 
 #include "sqlite_metadata.h"
 
-extern DICTIONARY *rrdhost_root_index;
-
 // SQL statements
 
 #define SQL_STORE_CLAIM_ID  "insert into node_instance " \

+ 4 - 4
libnetdata/worker_utilization/worker_utilization.c

@@ -56,7 +56,7 @@ void worker_register(const char *workname) {
     worker->tag = strdupz(netdata_thread_tag());
     worker->workname = strdupz(workname);
 
-    usec_t now = now_realtime_usec();
+    usec_t now = now_monotonic_usec();
     worker->statistics_last_checkpoint = now;
     worker->last_action_timestamp = now;
     worker->last_action = WORKER_IDLE;
@@ -145,14 +145,14 @@ static inline void worker_is_idle_with_time(usec_t now) {
 void worker_is_idle(void) {
     if(unlikely(!worker || worker->last_action != WORKER_BUSY)) return;
 
-    worker_is_idle_with_time(now_realtime_usec());
+    worker_is_idle_with_time(now_monotonic_usec());
 }
 
 void worker_is_busy(size_t job_id) {
     if(unlikely(!worker || job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES))
         return;
 
-    usec_t now = now_realtime_usec();
+    usec_t now = now_monotonic_usec();
 
     if(worker->last_action == WORKER_BUSY)
         worker_is_idle_with_time(now);
@@ -215,7 +215,7 @@ void workers_foreach(const char *workname, void (*callback)(
 
     struct worker *p;
     DOUBLE_LINKED_LIST_FOREACH_FORWARD(base, p, prev, next) {
-        usec_t now = now_realtime_usec();
+        usec_t now = now_monotonic_usec();
 
         // find per job type statistics
         STRING *per_job_type_name[WORKER_UTILIZATION_MAX_JOB_TYPES];

+ 279 - 181
streaming/replication.c

@@ -6,7 +6,10 @@
 #define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
 #define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
 
-static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) {
+// ----------------------------------------------------------------------------
+// sending replication replies
+
+static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) {
     size_t dimensions = rrdset_number_of_dimensions(st);
 
     struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
@@ -23,7 +26,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
     memset(data, 0, sizeof(data));
 
     if(enable_streaming && st->last_updated.tv_sec > before) {
-        internal_error(true, "REPLAY: 'host:%s/chart:%s' overwriting replication before from %llu to %llu",
+        internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu",
                        rrdhost_hostname(st->rrdhost), rrdset_id(st),
                        (unsigned long long)before,
                        (unsigned long long)st->last_updated.tv_sec
@@ -35,8 +38,11 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
     {
         RRDDIM *rd;
         rrddim_foreach_read(rd, st) {
-            if (rd_dfe.counter >= dimensions)
+            if (rd_dfe.counter >= dimensions) {
+                internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
+                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
                 break;
+            }
 
             if(rd->exposed) {
                 data[rd_dfe.counter].dict = rd_dfe.dict;
@@ -65,7 +71,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
                 data[i].sp = ops->next_metric(&data[i].handle);
 
             internal_error(max_skip <= 0,
-                           "REPLAY: 'host:%s/chart:%s', dimension '%s': db does not advance the query beyond time %llu",
+                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu",
                             rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long) now);
 
             if(data[i].sp.end_time < now)
@@ -81,10 +87,9 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
             }
         }
 
-        time_t wall_clock_time = now_realtime_sec();
-        if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + 1) {
+        if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1) {
             internal_error(true,
-                           "REPLAY: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
+                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
                             rrdhost_hostname(st->rrdhost), rrdset_id(st),
                            (unsigned long long)min_start_time,
                            (unsigned long long)min_end_time,
@@ -95,7 +100,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
         if(min_end_time < now) {
 #ifdef NETDATA_LOG_REPLICATION_REQUESTS
             internal_error(true,
-                           "REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
+                           "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
                            rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
 #endif // NETDATA_LOG_REPLICATION_REQUESTS
             break;
@@ -138,14 +143,14 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
         log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
         log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
         internal_error(true,
-                       "REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
+                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
                        rrdhost_hostname(st->rrdhost), rrdset_id(st),
                        (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
                        (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
     }
     else
         internal_error(true,
-                       "REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
+                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
                        rrdhost_hostname(st->rrdhost), rrdset_id(st),
                        (unsigned long long)after, (unsigned long long)before);
 #endif // NETDATA_LOG_REPLICATION_REQUESTS
@@ -195,7 +200,7 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
     time_t first_entry_local = rrdset_first_entry_t(st);
     if(first_entry_local > now + tolerance) {
         internal_error(true,
-                       "RRDSET: 'host:%s/chart:%s' first time %llu is in the future (now is %llu)",
+                       "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)",
                        rrdhost_hostname(st->rrdhost), rrdset_id(st),
                        (unsigned long long)first_entry_local, (unsigned long long)now);
         first_entry_local = now;
@@ -208,14 +213,20 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
     time_t last_entry_local = st->last_updated.tv_sec;
     if(!last_entry_local) {
         internal_error(true,
-                       "RRDSET: 'host:%s/chart:%s' db reports last updated time zero.",
+                       "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.",
                        rrdhost_hostname(st->rrdhost), rrdset_id(st));
         last_entry_local = rrdset_last_entry_t(st);
+        if(!last_entry_local) {
+            internal_error(true,
+                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.",
+                           rrdhost_hostname(st->rrdhost), rrdset_id(st));
+            last_entry_local = now;
+        }
     }
 
     if(last_entry_local > now + tolerance) {
         internal_error(true,
-                       "RRDSET: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
+                       "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
                        rrdhost_hostname(st->rrdhost), rrdset_id(st),
                        (unsigned long long)last_entry_local, (unsigned long long)now);
         last_entry_local = now;
@@ -240,28 +251,49 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
     // and copying the result to the host's buffer in order to avoid
     // holding the host's buffer lock for too long
     BUFFER *wb = sender_start(host->sender);
-    {
-        // pass the original after/before so that the parent knows about
-        // which time range we responded
-        buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
-
-        if(after != 0 && before != 0)
-            before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming);
-        else {
-            after = 0;
-            before = 0;
-            enable_streaming = true;
-        }
 
-        if(enable_streaming)
-            replicate_chart_collection_state(wb, st);
+    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
 
-        // end with first/last entries we have, and the first start time and
-        // last end time of the data we sent
-        buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu\n",
-                       (int)st->update_every, (unsigned long long)first_entry_local, (unsigned long long)last_entry_local,
-                       enable_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
+    if(after != 0 && before != 0)
+        before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now);
+    else {
+        after = 0;
+        before = 0;
+        enable_streaming = true;
     }
+
+    // get again the world clock time
+    time_t world_clock_time = now_realtime_sec();
+    if(enable_streaming) {
+        if(now < world_clock_time) {
+            // we needed time to execute this request
+            // so, the parent will need to replicate more data
+            enable_streaming = false;
+        }
+        else
+            replicate_chart_collection_state(wb, st);
+    }
+
+    // end with first/last entries we have, and the first start time and
+    // last end time of the data we sent
+    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n",
+
+                   // current chart update every
+                   (int)st->update_every
+
+                   // child first db time, child end db time
+                   , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local
+
+                   // start streaming boolean
+                   , enable_streaming ? "true" : "false"
+
+                   // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true)
+                   , (unsigned long long)after, (unsigned long long)before
+
+                   // child world clock time
+                   , (unsigned long long)world_clock_time
+                   );
+
     sender_commit(host->sender, wb);
 
     return enable_streaming;
@@ -282,12 +314,14 @@ struct replication_request_details {
     struct {
         time_t first_entry_t;               // the first entry time the child has
         time_t last_entry_t;                // the last entry time the child has
+        time_t world_time_t;                // the current time of the child
     } child_db;
 
     struct {
         time_t first_entry_t;               // the first entry time we have
         time_t last_entry_t;                // the last entry time we have
         bool last_entry_t_adjusted_to_now;  // true, if the last entry time was in the future and we fixed
+        time_t now;                         // the current local world clock time
     } local_db;
 
     struct {
@@ -305,8 +339,6 @@ struct replication_request_details {
         time_t before;                      // the end time of this replication request
         bool start_streaming;               // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
     } wanted;
-
-    time_t now;                             // the current wall clock time
 };
 
 static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) {
@@ -316,6 +348,8 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
         st->rrdhost->receiver->replication_first_time_t = r->wanted.after;
 
 #ifdef NETDATA_LOG_REPLICATION_REQUESTS
+    st->replay.log_next_data_collection = true;
+
     char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";
 
     if(r->wanted.after)
@@ -326,7 +360,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
 
     internal_error(true,
                    "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
-                   "last[%ld - %ld] child[%ld - %ld] local[%ld - %ld %s] gap[%ld - %ld %s] %s"
+                   "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s"
                    , rrdhost_hostname(r->host), rrdset_id(r->st)
                    , r->wanted.after, wanted_after_buf
                    , r->wanted.before, wanted_before_buf
@@ -334,7 +368,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
                    , msg
                    , r->last_request.after, r->last_request.before
                    , r->child_db.first_entry_t, r->child_db.last_entry_t
-                   , r->local_db.first_entry_t, r->local_db.last_entry_t, r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW"
+                   , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD"
+                   , r->local_db.first_entry_t, r->local_db.last_entry_t
+                   , r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW", r->local_db.now
                    , r->gap.from, r->gap.to
                    , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
                    , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
@@ -352,7 +388,8 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
 
     int ret = r->caller.callback(buffer, r->caller.data);
     if (ret < 0) {
-        error("REPLICATION: failed to send replication request to child (error %d)", ret);
+        error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)",
+              rrdhost_hostname(r->host), rrdset_id(r->st), ret);
         return false;
     }
 
@@ -360,7 +397,7 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
 }
 
 bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
-                             time_t first_entry_child, time_t last_entry_child,
+                             time_t first_entry_child, time_t last_entry_child, time_t child_world_time,
                              time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
 {
     struct replication_request_details r = {
@@ -375,6 +412,14 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
             .child_db = {
                     .first_entry_t = first_entry_child,
                     .last_entry_t = last_entry_child,
+                    .world_time_t = child_world_time,
+            },
+
+            .local_db = {
+                    .first_entry_t = rrdset_first_entry_t(st),
+                    .last_entry_t = rrdset_last_entry_t(st),
+                    .last_entry_t_adjusted_to_now = false,
+                    .now  = now_realtime_sec(),
             },
 
             .last_request = {
@@ -387,15 +432,11 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
                     .before = 0,
                     .start_streaming = true,
             },
-
-            .now = now_realtime_sec(),
     };
 
-    // get our local database retention
-    r.local_db.first_entry_t = rrdset_first_entry_t(st);
-    r.local_db.last_entry_t = rrdset_last_entry_t(st);
-    if(r.local_db.last_entry_t > r.now) {
-        r.local_db.last_entry_t = r.now;
+    // check our local database retention
+    if(r.local_db.last_entry_t > r.local_db.now) {
+        r.local_db.last_entry_t = r.local_db.now;
         r.local_db.last_entry_t_adjusted_to_now = true;
     }
 
@@ -408,7 +449,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
             r.gap.from = r.local_db.last_entry_t;
         else
             // we don't have any data, the gap is the max timeframe we are allowed to replicate
-            r.gap.from = r.now - r.host->rrdpush_seconds_to_replicate;
+            r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate;
 
     }
     else {
@@ -419,7 +460,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
     }
 
     // we want all the data up to now
-    r.gap.to = r.now;
+    r.gap.to = r.local_db.now;
 
     // The gap is now r.gap.from -> r.gap.to
 
@@ -461,8 +502,11 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
     if(r.wanted.before > r.child_db.last_entry_t)
         r.wanted.before = r.child_db.last_entry_t;
 
-    // the child should start streaming immediately if the wanted duration is small
-    r.wanted.start_streaming = (r.wanted.before == r.child_db.last_entry_t);
+    if(r.wanted.after > r.wanted.before)
+        r.wanted.after = r.wanted.before;
+
+    // the child should start streaming immediately if the wanted duration is small or we reached the last entry of the child
+    r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t);
 
     // the wanted timeframe is now r.wanted.after -> r.wanted.before
     // send it
@@ -499,11 +543,12 @@ struct replication_sort_entry {
 static struct replication_thread {
     netdata_mutex_t mutex;
 
+    size_t pending;
     size_t added;
     size_t executed;
     size_t removed;
+    size_t last_executed;
     time_t first_time_t;
-    size_t requests_count;
     Word_t next_unique_id;
     struct replication_request *requests;
 
@@ -516,12 +561,13 @@ static struct replication_thread {
     size_t waits;
 
     Pvoid_t JudyL_array;
-} rep = {
+} replication_globals = {
         .mutex = NETDATA_MUTEX_INITIALIZER,
+        .pending = 0,
         .added = 0,
         .executed = 0,
+        .last_executed = 0,
         .first_time_t = 0,
-        .requests_count = 0,
         .next_unique_id = 1,
         .skipped_no_room = 0,
         .skipped_not_connected = 0,
@@ -535,7 +581,7 @@ static __thread int replication_recursive_mutex_recursions = 0;
 
 static void replication_recursive_lock() {
     if(++replication_recursive_mutex_recursions == 1)
-        netdata_mutex_lock(&rep.mutex);
+        netdata_mutex_lock(&replication_globals.mutex);
 
 #ifdef NETDATA_INTERNAL_CHECKS
     if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2)
@@ -545,7 +591,7 @@ static void replication_recursive_lock() {
 
 static void replication_recursive_unlock() {
     if(--replication_recursive_mutex_recursions == 0)
-        netdata_mutex_unlock(&rep.mutex);
+        netdata_mutex_unlock(&replication_globals.mutex);
 
 #ifdef NETDATA_INTERNAL_CHECKS
     if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2)
@@ -563,7 +609,7 @@ static struct replication_sort_entry *replication_sort_entry_create(struct repli
 
     // copy the request
     rse->rq = rq;
-    rse->unique_id = rep.next_unique_id++;
+    rse->unique_id = replication_globals.next_unique_id++;
 
     // save the unique id into the request, to be able to delete it later
     rq->unique_id = rse->unique_id;
@@ -580,29 +626,29 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat
 
     struct replication_sort_entry *rse = replication_sort_entry_create(rq);
 
-    if(rq->after < (time_t)rep.last_after) {
+    if(rq->after < (time_t)replication_globals.last_after) {
         // make it find this request first
-        rep.last_after = rq->after;
-        rep.last_unique_id = rq->unique_id;
+        replication_globals.last_after = rq->after;
+        replication_globals.last_unique_id = rq->unique_id;
     }
 
-    rep.added++;
-    rep.requests_count++;
+    replication_globals.added++;
+    replication_globals.pending++;
 
     Pvoid_t *inner_judy_ptr;
 
     // find the outer judy entry, using after as key
-    inner_judy_ptr = JudyLGet(rep.JudyL_array, (Word_t) rq->after, PJE0);
+    inner_judy_ptr = JudyLGet(replication_globals.JudyL_array, (Word_t) rq->after, PJE0);
     if(!inner_judy_ptr)
-        inner_judy_ptr = JudyLIns(&rep.JudyL_array, (Word_t) rq->after, PJE0);
+        inner_judy_ptr = JudyLIns(&replication_globals.JudyL_array, (Word_t) rq->after, PJE0);
 
     // add it to the inner judy, using unique_id as key
     Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
     *item = rse;
     rq->indexed_in_judy = true;
 
-    if(!rep.first_time_t || rq->after < rep.first_time_t)
-        rep.first_time_t = rq->after;
+    if(!replication_globals.first_time_t || rq->after < replication_globals.first_time_t)
+        replication_globals.first_time_t = rq->after;
 
     replication_recursive_unlock();
 
@@ -612,8 +658,8 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat
 static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) {
     bool inner_judy_deleted = false;
 
-    rep.removed++;
-    rep.requests_count--;
+    replication_globals.removed++;
+    replication_globals.pending--;
 
     rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);
 
@@ -624,7 +670,7 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor
 
     // if no items left, delete it from the outer judy
     if(**inner_judy_ppptr == NULL) {
-        JudyLDel(&rep.JudyL_array, rse->rq->after, PJE0);
+        JudyLDel(&replication_globals.JudyL_array, rse->rq->after, PJE0);
         inner_judy_deleted = true;
     }
 
@@ -641,7 +687,7 @@ static void replication_sort_entry_del(struct replication_request *rq) {
     replication_recursive_lock();
     if(rq->indexed_in_judy) {
 
-        inner_judy_pptr = JudyLGet(rep.JudyL_array, rq->after, PJE0);
+        inner_judy_pptr = JudyLGet(replication_globals.JudyL_array, rq->after, PJE0);
         if (inner_judy_pptr) {
             Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
             if (our_item_pptr) {
@@ -651,7 +697,7 @@ static void replication_sort_entry_del(struct replication_request *rq) {
         }
 
         if (!rse_to_delete)
-            fatal("Cannot find sort entry to delete for host '%s', chart '%s', time %ld.",
+            fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.",
                   rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after);
 
     }
@@ -674,16 +720,16 @@ static struct replication_request replication_request_get_first_available() {
     struct replication_request rq = (struct replication_request){ .found = false };
 
 
-    if(unlikely(!rep.last_after || !rep.last_unique_id)) {
-        rep.last_after = 0;
-        rep.last_unique_id = 0;
+    if(unlikely(!replication_globals.last_after || !replication_globals.last_unique_id)) {
+        replication_globals.last_after = 0;
+        replication_globals.last_unique_id = 0;
     }
 
     bool find_same_after = true;
-    while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(rep.JudyL_array, &rep.last_after, find_same_after))) {
+    while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.JudyL_array, &replication_globals.last_after, find_same_after))) {
         Pvoid_t *our_item_pptr;
 
-        while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &rep.last_unique_id, PJE0))) {
+        while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.last_unique_id, PJE0))) {
             struct replication_sort_entry *rse = *our_item_pptr;
             struct sender_state *s = rse->rq->sender;
 
@@ -697,7 +743,7 @@ static struct replication_request replication_request_get_first_available() {
                     s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED;
 
             if(unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
-                rep.skipped_not_connected++;
+                replication_globals.skipped_not_connected++;
                 if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
                     break;
             }
@@ -714,14 +760,14 @@ static struct replication_request replication_request_get_first_available() {
                     break;
             }
             else
-                rep.skipped_no_room++;
+                replication_globals.skipped_no_room++;
         }
 
         // call JudyLNext from now on
         find_same_after = false;
 
         // prepare for the next iteration on the outer loop
-        rep.last_unique_id = 0;
+        replication_globals.last_unique_id = 0;
     }
 
     replication_recursive_unlock();
@@ -756,64 +802,28 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __
     struct replication_request *rq = old_value; (void)rq;
     struct replication_request *rq_new = new_value;
 
-        replication_recursive_lock();
-
-        if(!rq->indexed_in_judy) {
-            replication_sort_entry_add(rq);
-            internal_error(
-                    true,
-                    "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
-                    rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
-                    (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
-                    (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
-        }
-        else
-            internal_error(
-                    true,
-                    "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
-                    rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
-                    (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
-                    (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+    replication_recursive_lock();
 
-        replication_recursive_unlock();
+    if(!rq->indexed_in_judy) {
+        replication_sort_entry_add(rq);
+        internal_error(
+                true,
+                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
+                (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
+                (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+    }
+    else {
+        internal_error(
+                true,
+                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host),
+                dictionary_acquired_item_name(item),
+                (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false",
+                (unsigned long long) rq_new->after, (unsigned long long) rq_new->before, rq_new->start_streaming ? "true" : "false");
+    }
 
-//    bool updated_after = false, updated_before = false, updated_start_streaming = false, updated = false;
-//
-//    if(rq_new->after < rq->after && rq_new->after != 0)
-//        updated_after = true;
-//
-//    if(rq_new->before > rq->before)
-//        updated_before = true;
-//
-//    if(rq_new->start_streaming != rq->start_streaming)
-//        updated_start_streaming = true;
-//
-//    if(updated_after || updated_before || updated_start_streaming) {
-//        replication_recursive_lock();
-//
-//        if(rq->indexed_in_judy)
-//            replication_sort_entry_del(rq);
-//
-//        if(rq_new->after < rq->after && rq_new->after != 0)
-//            rq->after = rq_new->after;
-//
-//        if(rq->after == 0)
-//            rq->before = 0;
-//        else if(rq_new->before > rq->before)
-//            rq->before = rq_new->before;
-//
-//        rq->start_streaming = rq->start_streaming;
-//        replication_sort_entry_add(rq);
-//
-//        replication_recursive_unlock();
-//        updated = true;
-//
-//        internal_error(
-//                true,
-//                "STREAM %s [send to %s]: REPLAY ERROR: updated duplicate replication command for chart '%s' (from %llu to %llu [%s])",
-//                rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
-//                (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false");
-//    }
+    replication_recursive_unlock();
 
     string_freez(rq_new->chart_id);
     return false;
@@ -880,9 +890,9 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
         percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) {
         s->replication_reached_max = false;
         replication_recursive_lock();
-        rep.last_after = 0;
-        rep.last_unique_id = 0;
-        rep.sender_resets++;
+        replication_globals.last_after = 0;
+        replication_globals.last_unique_id = 0;
+        replication_globals.sender_resets++;
         replication_recursive_unlock();
     }
 
@@ -916,6 +926,79 @@ static void replication_main_cleanup(void *ptr) {
 #define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM        12
 #define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS          13
 #define WORKER_JOB_CUSTOM_METRIC_WAITS                  14
+#define WORKER_JOB_CHECK_CONSISTENCY                    15
+
+#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 10
+
+// Consistency check for a single host, reported through internal_error().
+//
+// 1. When the host has a sender, report the case where the sender counts zero
+//    pending replication requests while its replication_requests dictionary
+//    still has entries.
+// 2. Walk all charts of the host and inspect the two sender replication flags.
+//    A chart is counted as an error when it has neither flag set, or when it
+//    is not FINISHED, or when it is still IN PROGRESS.
+//
+// Returns the number of charts counted as errors (0 when all charts are
+// FINISHED and not IN PROGRESS).
+static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
+    if(host->sender) {
+        size_t pending_requests = host->sender->replication_pending_requests;
+        size_t dict_entries = dictionary_entries(host->sender->replication_requests);
+
+        // the counter and the index disagree: no pending requests, but charts are still indexed
+        internal_error(
+                !pending_requests && dict_entries,
+                "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
+                rrdhost_hostname(host), pending_requests, dict_entries);
+    }
+
+    size_t ok = 0;
+    size_t errors = 0;
+
+    RRDSET *st;
+    rrdset_foreach_read(st, host) {
+        RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+
+        bool is_error = false;
+
+        if(!flags) {
+            // replication never started for this chart
+            internal_error(
+                    true,
+                    "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED",
+                    rrdhost_hostname(host), rrdset_id(st)
+            );
+            is_error = true;
+        }
+        // 'else if': a chart without any flag has already been reported above;
+        // reporting it again as IN PROGRESS would be a false statement in the log
+        else if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+            internal_error(
+                    true,
+                    "REPLICATION SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished",
+                    rrdhost_hostname(host), rrdset_id(st)
+            );
+            is_error = true;
+        }
+
+        if(is_error)
+            errors++;
+        else
+            ok++;
+    }
+    rrdset_foreach_done(st);
+
+    // per-host summary, emitted only when at least one chart was counted as an error
+    internal_error(errors,
+                   "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished",
+                   rrdhost_hostname(host), ok, errors);
+
+    return errors;
+}
+
+// Runs verify_host_charts_are_streaming_now() on every host found in
+// rrdhost_root_index and logs a global summary.
+//
+// The whole body is compiled only when NETDATA_INTERNAL_CHECKS is defined;
+// otherwise the function is an empty statement.
+static void verify_all_hosts_charts_are_streaming_now(void) {
+#ifdef NETDATA_INTERNAL_CHECKS
+    // account the time spent here to the "check consistency" worker job
+    worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);
+
+    // total number of charts reported as errors across all hosts
+    size_t errors = 0;
+    RRDHOST *host;
+    dfe_start_reentrant(rrdhost_root_index, host)
+        errors += verify_host_charts_are_streaming_now(host);
+    dfe_done(host);
+
+    // report how many requests were executed since the previous summary,
+    // then remember the current total for the next run
+    size_t executed = replication_globals.executed;
+    internal_error(true, "REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.last_executed, errors);
+    replication_globals.last_executed = executed;
+#else
+    ;
+#endif
+}
 
 void *replication_thread_main(void *ptr __maybe_unused) {
     netdata_thread_cleanup_push(replication_main_cleanup, ptr);
@@ -927,6 +1010,8 @@ void *replication_thread_main(void *ptr __maybe_unused) {
     worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
     worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
     worker_register_job_name(WORKER_JOB_ACTIVATE_ENABLE_STREAMING, "enable streaming");
+    worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
+    worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
 
     worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
     worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
@@ -937,62 +1022,75 @@ void *replication_thread_main(void *ptr __maybe_unused) {
     worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
     worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, "waits", "waits/s", WORKER_METRIC_INCREMENTAL_TOTAL);
 
+    // start from 100% completed
+    worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
+
     time_t latest_first_time_t = 0;
+    long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
+    usec_t last_now_mono_ut = now_monotonic_usec();
 
     while(!netdata_exit) {
-        worker_is_busy(WORKER_JOB_FIND_NEXT);
-        struct replication_request rq = replication_request_get_first_available();
 
-        worker_is_busy(WORKER_JOB_STATISTICS);
-        worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)rep.requests_count);
-        worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)rep.added);
-        worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)rep.executed);
-        worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)rep.skipped_not_connected);
-        worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)rep.skipped_no_room);
-        worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)rep.sender_resets);
-        worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)rep.waits);
-
-        if(latest_first_time_t) {
-            time_t now = now_realtime_sec();
-            time_t total = now - rep.first_time_t;
-            time_t done = latest_first_time_t - rep.first_time_t;
-            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, (NETDATA_DOUBLE)done * 100.0 / (NETDATA_DOUBLE)total);
-        }
+        // statistics
+        usec_t now_mono_ut = now_monotonic_usec();
+        if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
+            last_now_mono_ut = now_mono_ut;
 
-        if(unlikely(!rq.found)) {
-            worker_is_idle();
+            if(!replication_globals.pending && run_verification_countdown-- == 0) {
+                replication_globals.first_time_t = 0; // reset the statistics about completion percentage
+                verify_all_hosts_charts_are_streaming_now();
+            }
 
-            if(!rep.requests_count)
+            worker_is_busy(WORKER_JOB_STATISTICS);
+
+            if(latest_first_time_t && replication_globals.pending) {
+                // completion percentage statistics
+                time_t now = now_realtime_sec();
+                time_t total = now - replication_globals.first_time_t;
+                time_t done = latest_first_time_t - replication_globals.first_time_t;
+                worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION,
+                                  (NETDATA_DOUBLE) done * 100.0 / (NETDATA_DOUBLE) total);
+            }
+            else
                 worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
 
-            // make it start from the beginning
-            rep.last_after = 0;
-            rep.last_unique_id = 0;
+            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.pending);
+            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.added);
+            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)replication_globals.executed);
+            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)replication_globals.skipped_not_connected);
+            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.skipped_no_room);
+            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.sender_resets);
+            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)replication_globals.waits);
+        }
+
+        worker_is_busy(WORKER_JOB_FIND_NEXT);
+        struct replication_request rq = replication_request_get_first_available();
 
-            rep.waits++;
+        if(unlikely(!rq.found)) {
+            // make it scan all the pending requests next time
+            replication_globals.last_after = 0;
+            replication_globals.last_unique_id = 0;
 
-            sleep_usec(1000 * USEC_PER_MS);
+            replication_globals.waits++;
+
+            worker_is_idle();
+            sleep_usec(((replication_globals.pending) ? 10 : 1000) * USEC_PER_MS);
             continue;
         }
-        else {
-            // delete the request from the dictionary
-            worker_is_busy(WORKER_JOB_DELETE_ENTRY);
-            if(!dictionary_del(rq.sender->replication_requests, string2str(rq.chart_id)))
-                error("REPLAY: 'host:%s/chart:%s' failed to be deleted from sender dictionary",
-                      rrdhost_hostname(rq.sender->host), string2str(rq.chart_id));
-
-            if(rq.sender->replication_pending_requests == 0 && dictionary_entries(rq.sender->replication_requests) != 0)
-                error("REPLAY: 'host:%s/chart:%s' sender dictionary has %zu entries, but sender pending requests are %zu",
-                      rrdhost_hostname(rq.sender->host), string2str(rq.chart_id),
-                      dictionary_entries(rq.sender->replication_requests),
-                      rq.sender->replication_pending_requests);
-        }
+
+        run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
+
+        // delete the request from the dictionary
+        worker_is_busy(WORKER_JOB_DELETE_ENTRY);
+        if(!dictionary_del(rq.sender->replication_requests, string2str(rq.chart_id)))
+            error("REPLAY ERROR: 'host:%s/chart:%s' failed to be deleted from sender pending charts index",
+                  rrdhost_hostname(rq.sender->host), string2str(rq.chart_id));
 
         worker_is_busy(WORKER_JOB_FIND_CHART);
         RRDSET *st = rrdset_find(rq.sender->host, string2str(rq.chart_id));
         if(!st) {
-            internal_error(true, "REPLAY ERROR: chart '%s' not found on host '%s'",
-                           string2str(rq.chart_id), rrdhost_hostname(rq.sender->host));
+            internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found",
+                           rrdhost_hostname(rq.sender->host), string2str(rq.chart_id));
 
             continue;
         }
@@ -1010,12 +1108,12 @@ void *replication_thread_main(void *ptr __maybe_unused) {
         netdata_thread_disable_cancelability();
 
         // send the replication data
-        bool start_streaming = replicate_chart_response(st->rrdhost, st,
-                                                        rq.start_streaming, rq.after, rq.before);
+        bool start_streaming = replicate_chart_response(
+                st->rrdhost, st, rq.start_streaming, rq.after, rq.before);
 
         netdata_thread_enable_cancelability();
 
-        rep.executed++;
+        replication_globals.executed++;
 
         if(start_streaming && rq.sender_last_flush_ut == rrdpush_sender_get_flush_time(rq.sender)) {
             worker_is_busy(WORKER_JOB_ACTIVATE_ENABLE_STREAMING);
@@ -1034,8 +1132,8 @@ void *replication_thread_main(void *ptr __maybe_unused) {
 #endif
             }
             else
-                internal_error(true, "REPLAY ERROR: received start streaming command for chart '%s' or host '%s', but the chart is not in progress replicating",
-                               string2str(rq.chart_id), rrdhost_hostname(st->rrdhost));
+                internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating",
+                               rrdhost_hostname(st->rrdhost), string2str(rq.chart_id));
         }
 
         string_freez(rq.chart_id);

+ 1 - 1
streaming/replication.h

@@ -11,7 +11,7 @@ typedef int (*send_command)(const char *txt, void *data);
 
 bool replicate_chart_request(send_command callback, void *callback_data,
                              RRDHOST *rh, RRDSET *rs,
-                             time_t first_entry_child, time_t last_entry_child,
+                             time_t first_entry_child, time_t last_entry_child, time_t child_world_time,
                              time_t response_first_start_time, time_t response_last_end_time);
 
 void replication_init_sender(struct sender_state *sender);

+ 40 - 20
streaming/rrdpush.c

@@ -223,7 +223,9 @@ static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) {
 
 // Send the current chart definition.
 // Assumes that collector thread has already called sender_start for mutex / buffer state.
-static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
+static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
+    bool replication_progress = false;
+
     RRDHOST *host = st->rrdhost;
 
     rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
@@ -296,29 +298,43 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
         time_t first_entry_local = rrdset_first_entry_t_of_tier(st, 0);
         time_t last_entry_local = st->last_updated.tv_sec;
 
-        if(!last_entry_local) {
+        if(unlikely(!last_entry_local))
+            last_entry_local = rrdset_last_entry_t(st);
+
+        time_t now = now_realtime_sec();
+        if(unlikely(last_entry_local > now)) {
             internal_error(true,
-                           "RRDSET: 'host:%s/chart:%s' db reports last updated time zero.",
-                           rrdhost_hostname(st->rrdhost), rrdset_id(st));
+                           "RRDSET REPLAY ERROR: 'host:%s/chart:%s' last updated time %ld is in the future, adjusting it to now %ld",
+                           rrdhost_hostname(st->rrdhost), rrdset_id(st),
+                           last_entry_local, now);
+            last_entry_local = now;
+        }
 
-            last_entry_local = rrdset_last_entry_t(st);
-            time_t now = now_realtime_sec();
-
-            if(last_entry_local > now) {
-                internal_error(true,
-                               "RRDSET: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
-                               rrdhost_hostname(st->rrdhost), rrdset_id(st),
-                               (unsigned long long)last_entry_local, (unsigned long long)now);
-                last_entry_local = now;
-            }
+        if(unlikely(first_entry_local && last_entry_local && first_entry_local >= last_entry_local)) {
+            internal_error(true,
+                           "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first updated time %ld is equal or bigger than last updated time %ld, adjusting it last updated time - update every",
+                           rrdhost_hostname(st->rrdhost), rrdset_id(st),
+                           first_entry_local, last_entry_local);
+            first_entry_local = last_entry_local - st->update_every;
+        }
+
+        if(unlikely(!first_entry_local && last_entry_local)) {
+            internal_error(true,
+                           "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first time %ld, last time %ld, setting both to last time",
+                           rrdhost_hostname(st->rrdhost), rrdset_id(st),
+                           first_entry_local, last_entry_local);
+            first_entry_local = last_entry_local;
         }
 
-        buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu\n",
-                       (unsigned long long)first_entry_local, (unsigned long long)last_entry_local);
+        buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu %llu\n",
+                       (unsigned long long)first_entry_local,
+                       (unsigned long long)last_entry_local,
+                       (unsigned long long)now);
 
         rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
         rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
         rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
+        replication_progress = true;
 
 #ifdef NETDATA_LOG_REPLICATION_REQUESTS
         internal_error(true, "REPLAY: 'host:%s/chart:%s' replication starts",
@@ -327,6 +343,7 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
     }
 
     st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
+    return replication_progress;
 }
 
 // sends the current chart dimensions
@@ -411,16 +428,19 @@ void rrdset_done_push(RRDSET *st) {
     }
 
     RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST);
+    bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED);
+    bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
 
-    if(unlikely(!should_send_chart_matching(st, rrdset_flags)))
+    if(unlikely((exposed_upstream && replication_in_progress) ||
+                !should_send_chart_matching(st, rrdset_flags)))
         return;
 
     BUFFER *wb = sender_start(host->sender);
 
-    if(unlikely(!(rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED)))
-        rrdpush_send_chart_definition(wb, st);
+    if(unlikely(!exposed_upstream))
+        replication_in_progress = rrdpush_send_chart_definition(wb, st);
 
-    if (likely(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED))
+    if (likely(!replication_in_progress))
         rrdpush_send_chart_metrics(wb, st, host->sender, rrdset_flags);
 
     sender_commit(host->sender, wb);