Browse Source

replication fixes #5 (#14038)

* pluginsd cleanup; replication logic cleanup; fix bug in replication begin

* log replication start/stop and change the keyword of NETDATA_LOG_REPLICATION_REQUESTS logs to REPLAY

* dont ask for data the child does not have; log fixes

* more pluginsd cleanup

* count sender dictionary entries

* fix dictionary_flush()
Costa Tsaousis 2 years ago
parent
commit
8e1a99ad79

+ 252 - 241
collectors/plugins.d/pluginsd_parser.c

@@ -53,68 +53,98 @@ static int send_to_plugin(const char *txt, void *data) {
     return -4;
 }
 
-PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
-{
-    char *dimension = get_word(words, num_words, 1);
-    char *value = get_word(words, num_words, 2);
+static inline RRDHOST *pluginsd_require_host_from_parent(void *user, const char *cmd) {
+    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
 
+    if(unlikely(!host))
+        error("PLUGINSD: command %s requires a host, but is not set.", cmd);
+
+    return host;
+}
+
+static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char *cmd, const char *parent_cmd) {
     RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
-    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
 
+    if(unlikely(!st))
+        error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
+
+    return st;
+}
+
+static inline RRDDIM_ACQUIRED *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
     if (unlikely(!dimension || !*dimension)) {
-        error("requested a SET on chart '%s' of host '%s', without a dimension. Disabling it.", rrdset_id(st), rrdhost_hostname(host));
-        goto disable;
+        error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
+              rrdhost_hostname(host), rrdset_id(st), cmd);
+        return NULL;
     }
 
-    if (unlikely(!value || !*value))
-        value = NULL;
+    RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
 
-    if (unlikely(!st)) {
-        error(
-            "requested a SET on dimension %s with value %s on host '%s', without a BEGIN. Disabling it.", dimension,
-            value ? value : "<nothing>", rrdhost_hostname(host));
-        goto disable;
-    }
+    if (unlikely(!rda))
+        error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
+              rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
 
-    if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
-        debug(D_PLUGINSD, "is setting dimension '%s'/'%s' to '%s'", rrdset_id(st), dimension, value ? value : "<nothing>");
+    return rda;
+}
 
-    if (value) {
-        RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
-        RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
-        if (unlikely(!rd)) {
-            error( "requested a SET to dimension with id '%s' on stats '%s' (%s) on host '%s', which does not exist. Disabling it.",
-                    dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
-            goto disable;
-        }
-        rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0));
-        rrddim_acquired_release(rda);
+static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
+    if (unlikely(!chart || !*chart)) {
+        error("PLUGINSD: 'host:%s' got a %s without a chart id.",
+              rrdhost_hostname(host), cmd);
+        return NULL;
     }
-    return PARSER_RC_OK;
 
-disable:
+    RRDSET *st = rrdset_find(host, chart);
+    if (unlikely(!st))
+        error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
+              rrdhost_hostname(host), chart, cmd);
+
+    return st;
+}
+
+static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user) {
     ((PARSER_USER_OBJECT *) user)->enabled = 0;
     return PARSER_RC_ERROR;
 }
 
+PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
+{
+    char *dimension = get_word(words, num_words, 1);
+    char *value = get_word(words, num_words, 2);
+
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+    RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
+    if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+    RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
+    if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
+
+    RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
+
+    if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+        debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
+              rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
+
+    if (value && *value)
+        rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0));
+
+    rrddim_acquired_release(rda);
+    return PARSER_RC_OK;
+}
+
 PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
 {
     char *id = get_word(words, num_words, 1);
     char *microseconds_txt = get_word(words, num_words, 2);
 
-    RRDSET *st = NULL;
-    RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    if (unlikely(!id)) {
-        error("requested a BEGIN without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host));
-        goto disable;
-    }
+    RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN);
+    if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    st = rrdset_find(host, id);
-    if (unlikely(!st)) {
-        error("requested a BEGIN on chart '%s', which does not exist on host '%s'. Disabling it.", id, rrdhost_hostname(host));
-        goto disable;
-    }
     ((PARSER_USER_OBJECT *)user)->st = st;
 
     usec_t microseconds = 0;
@@ -127,13 +157,11 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
                 rrdset_next_usec_unfiltered(st, microseconds);
             else
                 rrdset_next_usec(st, microseconds);
-        } else
+        }
+        else
             rrdset_next(st);
     }
     return PARSER_RC_OK;
-disable:
-    ((PARSER_USER_OBJECT *)user)->enabled = 0;
-    return PARSER_RC_ERROR;
 }
 
 PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
@@ -141,14 +169,11 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
     UNUSED(words);
     UNUSED(num_words);
 
-    RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
-    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    if (unlikely(!st)) {
-        error("requested an END, without a BEGIN on host '%s'. Disabling it.", rrdhost_hostname(host));
-        ((PARSER_USER_OBJECT *) user)->enabled = 0;
-        return PARSER_RC_ERROR;
-    }
+    RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
+    if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
 
     if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
         debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
@@ -165,11 +190,8 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
 
 PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
 {
-    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
-    if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) {
-        debug(D_PLUGINSD, "Ignoring chart belonging to missing or ignored host.");
-        return PARSER_RC_OK;
-    }
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
 
     char *type = get_word(words, num_words, 1);
     char *name = get_word(words, num_words, 2);
@@ -193,10 +215,9 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
 
     // make sure we have the required variables
     if (unlikely((!type || !*type || !id || !*id))) {
-        if (likely(host))
-            error("requested a CHART, without a type.id, on host '%s'. Disabling it.", rrdhost_hostname(host));
-        else
-            error("requested a CHART, without a type.id. Disabling it.");
+        error("PLUGINSD: 'host:%s' requested a CHART, without a type.id. Disabling it.",
+              rrdhost_hostname(host));
+
         ((PARSER_USER_OBJECT *) user)->enabled = 0;
         return PARSER_RC_ERROR;
     }
@@ -292,39 +313,33 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
     const char *first_entry_txt = get_word(words, num_words, 1);
     const char *last_entry_txt = get_word(words, num_words, 2);
 
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+    RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
+    if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
     if(unlikely(!first_entry_txt || !last_entry_txt)) {
-        error("REPLAY: received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " command without first or last entry. Disabling it.");
-        return PARSER_RC_ERROR;
+        error("PLUGINSD: 'host:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " without first or last entry. Disabling it.",
+              rrdhost_hostname(host));
+        return PLUGINSD_DISABLE_PLUGIN(user);
     }
 
     long first_entry_child = str2l(first_entry_txt);
     long last_entry_child = str2l(last_entry_txt);
 
-    PARSER_USER_OBJECT *user_object = (PARSER_USER_OBJECT *) user;
-
-    RRDHOST *host = user_object->host;
-    RRDSET *st = user_object->st;
-    if(unlikely(!host || !st)) {
-        error("REPLAY: received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " command without a chart. Disabling it.");
-        return PARSER_RC_ERROR;
-    }
-
     internal_error(
-               (first_entry_child != 0 || last_entry_child != 0)
+            (first_entry_child != 0 || last_entry_child != 0)
             && (first_entry_child == 0 || last_entry_child == 0),
-            "REPLAY: received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %llu, last time %llu).",
-            (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
-
-//    internal_error(
-//            true,
-//            "REPLAY host '%s', chart '%s': received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " first time %llu, last time %llu.",
-//            rrdhost_hostname(host), rrdset_id(st),
-//            (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
+            "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %llu, last time %llu).",
+            rrdhost_hostname(host), rrdset_id(st),
+            (unsigned long long)first_entry_child, (unsigned long long)last_entry_child
+            );
 
     bool ok = true;
     if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
 
-#ifdef NETDATA_INTERNAL_CHECKS
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
         st->replay.start_streaming = false;
         st->replay.after = 0;
         st->replay.before = 0;
@@ -334,12 +349,16 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
         rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
         rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
 
-        ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child,
+        PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser;
+        ok = replicate_chart_request(send_to_plugin, parser, host, st, first_entry_child,
                                           last_entry_child, 0, 0);
     }
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
     else {
-        internal_error(true, "RRDSET: not sending duplicate replication request for chart '%s'", rrdset_id(st));
+        internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request",
+                       rrdhost_hostname(st->rrdhost), rrdset_id(st));
     }
+#endif
 
     return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
 }
@@ -353,23 +372,22 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
     char *divisor_s = get_word(words, num_words, 5);
     char *options = get_word(words, num_words, 6);
 
-    RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
-    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
-    if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) {
-        debug(D_PLUGINSD, "Ignoring dimension belonging to missing or ignored host.");
-        return PARSER_RC_OK;
-    }
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+    RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
+    if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
 
     if (unlikely(!id)) {
-        error(
-            "requested a DIMENSION, without an id, host '%s' and chart '%s'. Disabling it.", rrdhost_hostname(host),
-            st ? rrdset_id(st) : "UNSET");
-        goto disable;
+        error("PLUGINSD: 'host:%s/chart:%s' got a DIMENSION, without an id. Disabling it.",
+              rrdhost_hostname(host), st ? rrdset_id(st) : "UNSET");
+        return PLUGINSD_DISABLE_PLUGIN(user);
     }
 
     if (unlikely(!st && !((PARSER_USER_OBJECT *) user)->st_exists)) {
-        error("requested a DIMENSION, without a CHART, on host '%s'. Disabling it.", rrdhost_hostname(host));
-        goto disable;
+        error("PLUGINSD: 'host:%s' got a DIMENSION, without a CHART. Disabling it.",
+              rrdhost_hostname(host));
+        return PLUGINSD_DISABLE_PLUGIN(user);
     }
 
     long multiplier = 1;
@@ -421,7 +439,8 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
             rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN);
             metaqueue_dimension_update_flags(rd);
         }
-    } else {
+    }
+    else {
         rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
         if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) {
             rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN);
@@ -430,9 +449,6 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
     }
 
     return PARSER_RC_OK;
-disable:
-    ((PARSER_USER_OBJECT *)user)->enabled = 0;
-    return PARSER_RC_ERROR;
 }
 
 // ----------------------------------------------------------------------------
@@ -491,6 +507,7 @@ static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __m
 
     return false;
 }
+
 static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) {
     struct inflight_function *pf = func;
 
@@ -587,18 +604,22 @@ PARSER_RC pluginsd_function(char **words, size_t num_words, void *user)
     char *timeout_s = get_word(words, num_words, i++);
     char *help      = get_word(words, num_words, i++);
 
-    RRDSET *st = (global)?NULL:((PARSER_USER_OBJECT *) user)->st;
-    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_FUNCTION);
+    if(!host) return PARSER_RC_ERROR;
 
-    if (unlikely(!host || !timeout_s || !name || !help || (!global && !st))) {
-        error("requested a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'), host '%s', chart '%s'. Ignoring it.",
+    RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
+    if(!st) global = true;
+
+    if (unlikely(!timeout_s || !name || !help || (!global && !st))) {
+        error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
+              rrdhost_hostname(host),
+              st?rrdset_id(st):"(unset)",
               global?"yes":"no",
               name?name:"(unset)",
               timeout_s?timeout_s:"(unset)",
-              help?help:"(unset)",
-              host?rrdhost_hostname(host):"(unset)",
-              st?rrdset_id(st):"(unset)");
-        return PARSER_RC_OK;
+              help?help:"(unset)"
+              );
+        return PARSER_RC_ERROR;
     }
 
     int timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
@@ -683,8 +704,10 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
     char *value = get_word(words, num_words, 2);
     NETDATA_DOUBLE v;
 
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
     RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
-    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
 
     int global = (st) ? 0 : 1;
 
@@ -701,36 +724,50 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
     }
 
     if (unlikely(!name || !*name)) {
-        error("requested a VARIABLE on host '%s', without a variable name. Disabling it.", rrdhost_hostname(host));
+        error("PLUGINSD: 'host:%s/chart:%s' got a VARIABLE without a variable name. Disabling it.",
+              rrdhost_hostname(host), st ? rrdset_id(st):"UNSET");
+
         ((PARSER_USER_OBJECT *)user)->enabled = 0;
-        return PARSER_RC_ERROR;
+        return PLUGINSD_DISABLE_PLUGIN(user);
     }
 
     if (unlikely(!value || !*value))
         value = NULL;
 
     if (unlikely(!value)) {
-        error("cannot set %s VARIABLE '%s' on host '%s' to an empty value", (global) ? "HOST" : "CHART", name,
-              rrdhost_hostname(host));
+        error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
+              rrdhost_hostname(host),
+              st ? rrdset_id(st):"UNSET",
+              (global) ? "HOST" : "CHART",
+              name);
         return PARSER_RC_OK;
     }
 
     if (!global && !st) {
-        error("cannot find/create CHART VARIABLE '%s' on host '%s' without a chart", name, rrdhost_hostname(host));
-        return PARSER_RC_OK;
+        error("PLUGINSD: 'host:%s/chart:%s' cannot update CHART VARIABLE '%s' without a chart",
+              rrdhost_hostname(host),
+              st ? rrdset_id(st):"UNSET",
+              name
+              );
+        return PLUGINSD_DISABLE_PLUGIN(user);
     }
 
     char *endptr = NULL;
     v = (NETDATA_DOUBLE)str2ndd(value, &endptr);
     if (unlikely(endptr && *endptr)) {
         if (endptr == value)
-            error(
-                "the value '%s' of VARIABLE '%s' on host '%s' cannot be parsed as a number", value, name,
-                rrdhost_hostname(host));
+            error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
+                  rrdhost_hostname(host),
+                  st ? rrdset_id(st):"UNSET",
+                  value,
+                  name);
         else
-            error(
-                "the value '%s' of VARIABLE '%s' on host '%s' has leftovers: '%s'", value, name, rrdhost_hostname(host),
-                endptr);
+            error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
+                  rrdhost_hostname(host),
+                  st ? rrdset_id(st):"UNSET",
+                  value,
+                  name,
+                  endptr);
     }
 
     if (global) {
@@ -740,7 +777,9 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
             rrdvar_custom_host_variable_release(host, rva);
         }
         else
-            error("cannot find/create HOST VARIABLE '%s' on host '%s'", name, rrdhost_hostname(host));
+            error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
+                  rrdhost_hostname(host),
+                  name);
     } else {
         const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
         if (rsa) {
@@ -748,7 +787,8 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
             rrdsetvar_custom_chart_variable_release(st, rsa);
         }
         else
-            error("cannot find/create CHART VARIABLE '%s' on host '%s', chart '%s'", name, rrdhost_hostname(host), rrdset_id(st));
+            error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
+                  rrdhost_hostname(host), rrdset_id(st), name);
     }
 
     return PARSER_RC_OK;
@@ -767,7 +807,7 @@ PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_u
 
 PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused)
 {
-    info("called DISABLE. Disabling it.");
+    info("PLUGINSD: plugin called DISABLE. Disabling it.");
     ((PARSER_USER_OBJECT *) user)->enabled = 0;
     return PARSER_RC_ERROR;
 }
@@ -779,8 +819,8 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
     const char *value = get_word(words, num_words, 3);
 
     if (!name || !label_source || !value) {
-        error("Ignoring malformed or empty LABEL command.");
-        return PARSER_RC_OK;
+        error("PLUGINSD: ignoring malformed or empty LABEL command.");
+        return PLUGINSD_DISABLE_PLUGIN(user);
     }
 
     char *store = (char *)value;
@@ -826,10 +866,12 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
 
 PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
 {
-    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
     debug(D_PLUGINSD, "requested to OVERWRITE host labels");
 
-    if(!host->rrdlabels)
+    if(unlikely(!host->rrdlabels))
         host->rrdlabels = rrdlabels_create();
 
     rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels));
@@ -849,7 +891,7 @@ PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user)
 
     if (!name || !value || !*label_source) {
         error("Ignoring malformed or empty CHART LABEL command.");
-        return PARSER_RC_OK;
+        return PLUGINSD_DISABLE_PLUGIN(user);
     }
 
     if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) {
@@ -865,17 +907,18 @@ PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user)
 
 PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
 {
-    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
-    RRDSET *st = ((PARSER_USER_OBJECT *)user)->st;
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    if (unlikely(!st))
-        return PARSER_RC_OK;
+    RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+    if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
 
     debug(D_PLUGINSD, "requested to commit chart labels");
 
     if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) {
-        error("requested CLABEL_COMMIT on host '%s', without a BEGIN, ignoring it.", rrdhost_hostname(host));
-        return PARSER_RC_OK;
+        error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.",
+              rrdhost_hostname(host));
+        return PLUGINSD_DISABLE_PLUGIN(user);
     }
 
     rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
@@ -894,44 +937,42 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
     char *end_time_str = get_word(words, num_words, 3);
     char *child_now_str = get_word(words, num_words, 4);
 
-    RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
-    RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    if (unlikely(!id || (!st && !*id))) {
-        error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host));
-        goto disable;
-    }
-
-    if(*id) {
-        st = rrdset_find(host, id);
-        if (unlikely(!st)) {
-            error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s', which does not exist on host '%s'. Disabling it.",
-                  id, rrdhost_hostname(host));
-            goto disable;
-        }
+    RRDSET *st;
+    if (likely(!id || !*id))
+        st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+    else
+        st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN);
 
-        ((PARSER_USER_OBJECT *) user)->st = st;
-    }
+    if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+    ((PARSER_USER_OBJECT *) user)->st = st;
 
     if(start_time_str && end_time_str) {
         time_t start_time = strtol(start_time_str, NULL, 0);
         time_t end_time = strtol(end_time_str, NULL, 0);
 
         time_t wall_clock_time = 0, tolerance;
+        bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child;
         if(child_now_str) {
             wall_clock_time = strtol(child_now_str, NULL, 0);
-            tolerance = 1;
+            tolerance = st->update_every + 1;
+            wall_clock_comes_from_child = true;
         }
 
         if(wall_clock_time <= 0) {
             wall_clock_time = now_realtime_sec();
             tolerance = st->update_every + 60;
+            wall_clock_comes_from_child = false;
         }
 
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
         internal_error(
                 (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
-                "REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, which does not match our request (%ld to %ld).",
-                rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, st->replay.after, st->replay.before);
+                "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).",
+                rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before);
+#endif
 
         if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
             if (unlikely(end_time - start_time != st->update_every))
@@ -962,8 +1003,9 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
         }
 
         internal_error(true,
-                       "REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, but timestamps are invalid (now is %ld).",
-                       rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, wall_clock_time);
+                       "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld).",
+                       rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
+                       wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance);
     }
 
     // the child sends an RBEGIN without any parameters initially
@@ -976,63 +1018,59 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
     ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
     ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
     return PARSER_RC_OK;
-
-disable:
-    ((PARSER_USER_OBJECT *)user)->enabled = 0;
-    return PARSER_RC_ERROR;
 }
 
 PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
 {
-    if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled)
-        return PARSER_RC_OK;
-
     char *dimension = get_word(words, num_words, 1);
     char *value_str = get_word(words, num_words, 2);
     char *flags_str = get_word(words, num_words, 3);
 
-    RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
-    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    if (unlikely(!st)) {
-        error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
-              dimension, rrdhost_hostname(host));
-        goto disable;
-    }
+    RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+    if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    if (unlikely(!dimension || !*dimension)) {
-        error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on chart '%s' of host '%s', without a dimension. Disabling it.",
-              rrdset_id(st), rrdhost_hostname(host));
-        goto disable;
+    if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled) {
+        error_limit_static_thread_var(erl, 1, 0);
+        error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " but it is disabled by " PLUGINSD_KEYWORD_REPLAY_BEGIN " errors",
+                    rrdhost_hostname(host), rrdset_id(st));
+
+        // we have to return OK here
+        return PARSER_RC_OK;
     }
 
+    RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
+    if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
+
     if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) {
-        error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
-              dimension, rrdhost_hostname(host),
+        error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+              rrdhost_hostname(host),
+              rrdset_id(st),
+              dimension,
               ((PARSER_USER_OBJECT *) user)->replay.start_time,
               ((PARSER_USER_OBJECT *) user)->replay.end_time);
-        goto disable;
+        return PARSER_RC_ERROR;
     }
 
     if (unlikely(!value_str || !*value_str))
-        value_str = "nan";
+        value_str = "NAN";
 
     if(unlikely(!flags_str))
         flags_str = "";
 
     if (likely(value_str)) {
-        RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
         RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
-        if(unlikely(!rd)) {
-            error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " to dimension '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
-                  dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
-            goto disable;
-        }
 
         RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
 
         if(unlikely(rd_flags & RRDDIM_FLAG_OBSOLETE)) {
-            error("REPLAY: dimension '%s' in chart '%s' has the OBSOLETE flag set, but it is collected.", rrddim_name(rd), rrdset_id(st));
+            error("PLUGINSD: 'host:%s/chart:%s/dim:%s' has the OBSOLETE flag set, but it is collected.",
+                  rrdhost_hostname(st->rrdhost),
+                  rrdset_id(st),
+                  rrddim_id(rd)
+                  );
             rrddim_isnot_obsolete(st, rd);
         }
 
@@ -1069,15 +1107,12 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
             rd->collections_counter++;
         }
         else
-            error("REPLAY: dimension '%s' in chart '%s' has the ARCHIVED flag set, but it is collected. Ignoring data.", rrddim_name(rd), rrdset_id(st));
-
-        rrddim_acquired_release(rda);
+            error("PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is collected. Ignoring data.",
+                  rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
     }
-    return PARSER_RC_OK;
 
-disable:
-    ((PARSER_USER_OBJECT *) user)->enabled = 0;
-    return PARSER_RC_ERROR;
+    rrddim_acquired_release(rda);
+    return PARSER_RC_OK;
 }
 
 PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user)
@@ -1088,29 +1123,16 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
     char *last_calculated_value_str = get_word(words, num_words, 4);
     char *last_stored_value_str = get_word(words, num_words, 5);
 
-    RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
-    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    if (unlikely(!st)) {
-        error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
-              dimension, rrdhost_hostname(host));
-        goto disable;
-    }
+    RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+    if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    if (unlikely(!dimension || !*dimension)) {
-        error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on chart '%s' of host '%s', without a dimension. Disabling it.",
-              rrdset_id(st), rrdhost_hostname(host));
-        goto disable;
-    }
+    RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+    if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
     RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
-    if(unlikely(!rd)) {
-        error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
-              dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
-        goto disable;
-    }
-
     usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec;
     usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
     if(last_collected_ut > dim_last_collected_ut) {
@@ -1123,10 +1145,6 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
     rd->last_stored_value = last_stored_value_str ? str2ndd(last_stored_value_str, NULL) : 0.0;
     rrddim_acquired_release(rda);
     return PARSER_RC_OK;
-
-disable:
-    ((PARSER_USER_OBJECT *) user)->enabled = 0;
-    return PARSER_RC_ERROR;
 }
 
 PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user)
@@ -1134,14 +1152,11 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words
     char *last_collected_ut_str = get_word(words, num_words, 1);
     char *last_updated_ut_str = get_word(words, num_words, 2);
 
-    RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
-    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    if (unlikely(!st)) {
-        error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
-              rrdhost_hostname(host));
-        goto disable;
-    }
+    RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+    if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
 
     usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
     usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
@@ -1161,10 +1176,6 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words
     st->counter_done++;
 
     return PARSER_RC_OK;
-
-disable:
-    ((PARSER_USER_OBJECT *) user)->enabled = 0;
-    return PARSER_RC_ERROR;
 }
 
 PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
@@ -1184,21 +1195,20 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
 
     PARSER_USER_OBJECT *user_object = user;
 
-    RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
-    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+    RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END);
+    if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
 
-    if (unlikely(!st)) {
-        error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_END " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
-              rrdhost_hostname(host));
-        return PARSER_RC_ERROR;
-    }
+    RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+    if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
 
-//    internal_error(true,
-//                   "REPLAY: host '%s', chart '%s': received " PLUGINSD_KEYWORD_REPLAY_END " child first_t = %llu, last_t = %llu, start_streaming = %s, requested first_t = %llu, last_t = %llu",
-//                   rrdhost_hostname(host), rrdset_id(st),
-//                   (unsigned long long)first_entry_child, (unsigned long long)last_entry_child,
-//                   start_streaming?"true":"false",
-//                   (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested);
+#ifdef NETDATATA_LOG_REPLICATION_REQUESTS
+    internal_error(true,
+                   "PLUGINSD: 'host:%s/chart:%s': received " PLUGINSD_KEYWORD_REPLAY_END " child first_t = %llu, last_t = %llu, start_streaming = %s, requested first_t = %llu, last_t = %llu",
+                   rrdhost_hostname(host), rrdset_id(st),
+                   (unsigned long long)first_entry_child, (unsigned long long)last_entry_child,
+                   start_streaming?"true":"false",
+                   (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested);
+#endif
 
     ((PARSER_USER_OBJECT *) user)->st = NULL;
     ((PARSER_USER_OBJECT *) user)->count++;
@@ -1222,7 +1232,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
     st->counter++;
     st->counter_done++;
 
-#ifdef NETDATA_INTERNAL_CHECKS
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
     st->replay.start_streaming = false;
     st->replay.after = 0;
     st->replay.before = 0;
@@ -1238,10 +1248,11 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
             rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
             rrdhost_receiver_replicating_charts_minus_one(st->rrdhost);
         }
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
         else
-            internal_error(true, "REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_END " on host '%s', chart '%s' with enable_streaming = true, but there is no replication in progress for this chart.",
+            internal_error(true, "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
                   rrdhost_hostname(host), rrdset_id(st));
-
+#endif
         worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0);
 
         return PARSER_RC_OK;

+ 10 - 6
daemon/global_statistics.c

@@ -1225,6 +1225,7 @@ struct dictionary_categories {
     RRDDIM *rd_spins_use;
     RRDDIM *rd_spins_search;
     RRDDIM *rd_spins_insert;
+    RRDDIM *rd_spins_delete;
 
 } dictionary_categories[] = {
     { .stats = &dictionary_stats_category_other, "dictionaries", "dictionaries", 900000 },
@@ -1481,9 +1482,10 @@ static void update_dictionary_category_charts(struct dictionary_categories *c) {
     // ------------------------------------------------------------------------
 
     total = 0;
-    load_dictionary_stats_entry(spin_locks.use);
-    load_dictionary_stats_entry(spin_locks.search);
-    load_dictionary_stats_entry(spin_locks.insert);
+    load_dictionary_stats_entry(spin_locks.use_spins);
+    load_dictionary_stats_entry(spin_locks.search_spins);
+    load_dictionary_stats_entry(spin_locks.insert_spins);
+    load_dictionary_stats_entry(spin_locks.delete_spins);
 
     if(c->st_spins || total != 0) {
         if (unlikely(!c->st_spins)) {
@@ -1511,13 +1513,15 @@ static void update_dictionary_category_charts(struct dictionary_categories *c) {
             c->rd_spins_use = rrddim_add(c->st_spins, "use", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
             c->rd_spins_search = rrddim_add(c->st_spins, "search", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
             c->rd_spins_insert = rrddim_add(c->st_spins, "insert", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+            c->rd_spins_delete = rrddim_add(c->st_spins, "delete", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
 
             rrdlabels_add(c->st_spins->rrdlabels, "category", stats.name, RRDLABEL_SRC_AUTO);
         }
 
-        rrddim_set_by_pointer(c->st_spins, c->rd_spins_use, (collected_number)stats.spin_locks.use);
-        rrddim_set_by_pointer(c->st_spins, c->rd_spins_search, (collected_number)stats.spin_locks.search);
-        rrddim_set_by_pointer(c->st_spins, c->rd_spins_insert, (collected_number)stats.spin_locks.insert);
+        rrddim_set_by_pointer(c->st_spins, c->rd_spins_use, (collected_number)stats.spin_locks.use_spins);
+        rrddim_set_by_pointer(c->st_spins, c->rd_spins_search, (collected_number)stats.spin_locks.search_spins);
+        rrddim_set_by_pointer(c->st_spins, c->rd_spins_insert, (collected_number)stats.spin_locks.insert_spins);
+        rrddim_set_by_pointer(c->st_spins, c->rd_spins_delete, (collected_number)stats.spin_locks.delete_spins);
 
         rrdset_done(c->st_spins);
     }

+ 19 - 9
database/rrd.h

@@ -314,6 +314,12 @@ struct rrddim {
     collected_number collected_value;               // the current value, as collected - resets to 0 after being used
     collected_number last_collected_value;          // the last value that was collected, after being processed
 
+#ifdef NETDATA_LOG_COLLECTION_ERRORS
+    usec_t rrddim_store_metric_last_ut;             // the timestamp we last called rrddim_store_metric()
+    size_t rrddim_store_metric_count;               // the rrddim_store_metric() counter
+    const char *rrddim_store_metric_last_caller;    // the name of the function that last called rrddim_store_metric()
+#endif
+
     // ------------------------------------------------------------------------
     // db mode RAM, SAVE, MAP, ALLOC, NONE specifics
     // TODO - they should be managed by storage engine
@@ -532,20 +538,19 @@ typedef enum rrdset_flags {
 
     RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION    = (1 << 21),
 
-    RRDSET_FLAG_SENDER_REPLICATION_QUEUED        = (1 << 22), // the sending side has replication in progress
-    RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS   = (1 << 23), // the sending side has replication in progress
-    RRDSET_FLAG_SENDER_REPLICATION_FINISHED      = (1 << 24), // the sending side has completed replication
-    RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS = (1 << 25), // the receiving side has replication in progress
-    RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED    = (1 << 26), // the receiving side has completed replication
+    RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS   = (1 << 22), // the sending side has replication in progress
+    RRDSET_FLAG_SENDER_REPLICATION_FINISHED      = (1 << 23), // the sending side has completed replication
+    RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS = (1 << 24), // the receiving side has replication in progress
+    RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED    = (1 << 25), // the receiving side has completed replication
 
-    RRDSET_FLAG_UPSTREAM_SEND_VARIABLES          = (1 << 27), // a custom variable has been updated and needs to be exposed to parent
+    RRDSET_FLAG_UPSTREAM_SEND_VARIABLES          = (1 << 26), // a custom variable has been updated and needs to be exposed to parent
 } RRDSET_FLAGS;
 
 #define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_SEQ_CST) & (flag))
 #define rrdset_flag_set(st, flag)   __atomic_or_fetch(&((st)->flags), flag, __ATOMIC_SEQ_CST)
 #define rrdset_flag_clear(st, flag) __atomic_and_fetch(&((st)->flags), ~(flag), __ATOMIC_SEQ_CST)
 
-#define rrdset_is_replicating(st) (rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS|RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS|RRDSET_FLAG_SENDER_REPLICATION_QUEUED) \
+#define rrdset_is_replicating(st) (rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS|RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS) \
     && !rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED|RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED))
 
 struct rrdset {
@@ -666,13 +671,13 @@ struct rrdset {
         RRDCALC *base;                              // double linked list of RRDCALC related to this RRDSET
     } alerts;
 
-#ifdef NETDATA_INTERNAL_CHECKS
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
     struct {
         bool start_streaming;
         time_t after;
         time_t before;
     } replay;
-#endif
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
 };
 
 #define rrdset_plugin_name(st) string2str((st)->plugin_name)
@@ -1330,7 +1335,12 @@ time_t calc_dimension_liveness(RRDDIM *rd, time_t now);
 #endif
 long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries);
 
+#ifdef NETDATA_LOG_COLLECTION_ERRORS
+#define rrddim_store_metric(rd, point_end_time_ut, n, flags) rrddim_store_metric_with_trace(rd, point_end_time_ut, n, flags, __FUNCTION__)
+void rrddim_store_metric_with_trace(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags, const char *function);
+#else
 void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags);
+#endif
 
 // ----------------------------------------------------------------------------
 // Miscellaneous functions

+ 29 - 1
database/rrdset.c

@@ -1109,8 +1109,36 @@ void store_metric_at_tier(RRDDIM *rd, struct rrddim_tier *t, STORAGE_POINT sp, u
         }
     }
 }
-
+#ifdef NETDATA_LOG_COLLECTION_ERRORS
+void rrddim_store_metric_with_trace(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags, const char *function) {
+#else // !NETDATA_LOG_COLLECTION_ERRORS
 void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) {
+#endif // !NETDATA_LOG_COLLECTION_ERRORS
+#ifdef NETDATA_LOG_COLLECTION_ERRORS
+    rd->rrddim_store_metric_count++;
+
+    if(likely(rd->rrddim_store_metric_count > 1)) {
+        usec_t expected = rd->rrddim_store_metric_last_ut + rd->update_every * USEC_PER_SEC;
+
+        if(point_end_time_ut != rd->rrddim_store_metric_last_ut) {
+            internal_error(true,
+                           "%s COLLECTION: 'host:%s/chart:%s/dim:%s' granularity %d, collection %zu, expected to store at tier 0 a value at %llu, but it gave %llu [%s%llu usec] (called from %s(), previously by %s())",
+                           (point_end_time_ut < rd->rrddim_store_metric_last_ut) ? "**PAST**" : "GAP",
+                           rrdhost_hostname(rd->rrdset->rrdhost), rrdset_id(rd->rrdset), rrddim_id(rd),
+                           rd->update_every,
+                           rd->rrddim_store_metric_count,
+                           expected, point_end_time_ut,
+                           (point_end_time_ut < rd->rrddim_store_metric_last_ut)?"by -" : "gap ",
+                           expected - point_end_time_ut,
+                           function,
+                           rd->rrddim_store_metric_last_caller?rd->rrddim_store_metric_last_caller:"none");
+        }
+    }
+
+    rd->rrddim_store_metric_last_ut = point_end_time_ut;
+    rd->rrddim_store_metric_last_caller = function;
+#endif // NETDATA_LOG_COLLECTION_ERRORS
+
     // store the metric on tier 0
     rd->tiers[0]->collect_ops->store_metric(rd->tiers[0]->db_collection_handle, point_end_time_ut, n, 0, 0, 1, 0, flags);
 

+ 215 - 98
libnetdata/dictionary/dictionary.c

@@ -32,7 +32,8 @@ typedef enum item_options {
 
 typedef enum item_flags {
     ITEM_FLAG_NONE              = 0,
-    ITEM_FLAG_DELETED           = (1 << 0), // this item is deleted, so it is not available for traversal
+    ITEM_FLAG_DELETED           = (1 << 0), // this item is marked deleted, so it is not available for traversal (deleted from the index too)
+    ITEM_FLAG_BEING_CREATED     = (1 << 1), // this item is currently being created - this flag is removed when construction finishes
 
     // IMPORTANT: This is 8-bit
 } ITEM_FLAGS;
@@ -175,15 +176,19 @@ static void garbage_collect_pending_deletes(DICTIONARY *dict);
 static inline void item_linked_list_remove(DICTIONARY *dict, DICTIONARY_ITEM *item);
 static size_t dict_item_free_with_hooks(DICTIONARY *dict, DICTIONARY_ITEM *item);
 static inline const char *item_get_name(const DICTIONARY_ITEM *item);
-static bool item_is_not_referenced_and_can_be_removed(DICTIONARY *dict, DICTIONARY_ITEM *item);
 static inline int hashtable_delete_unsafe(DICTIONARY *dict, const char *name, size_t name_len, void *item);
 static void item_release(DICTIONARY *dict, DICTIONARY_ITEM *item);
-
-#define ITEM_OK 0
-#define ITEM_MARKED_FOR_DELETION (-1)  // the item is marked for deletion
-#define ITEM_IS_CURRENTLY_BEING_DELETED (-2) // the item is currently being deleted
-#define item_check_and_acquire(dict, item) (item_check_and_acquire_advanced(dict, item, false) == ITEM_OK)
+static bool dict_item_set_deleted(DICTIONARY *dict, DICTIONARY_ITEM *item);
+
+#define RC_ITEM_OK                         ( 0)
+#define RC_ITEM_MARKED_FOR_DELETION        (-1) // the item is marked for deletion
+#define RC_ITEM_IS_CURRENTLY_BEING_DELETED (-2) // the item is currently being deleted
+#define RC_ITEM_IS_CURRENTLY_BEING_CREATED (-3) // the item is currently being deleted
+#define RC_ITEM_IS_REFERENCED              (-4) // the item is currently referenced
+#define item_check_and_acquire(dict, item) (item_check_and_acquire_advanced(dict, item, false) == RC_ITEM_OK)
 static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *item, bool having_index_lock);
+#define item_is_not_referenced_and_can_be_removed(dict, item) (item_is_not_referenced_and_can_be_removed_advanced(dict, item) == RC_ITEM_OK)
+static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY *dict, DICTIONARY_ITEM *item);
 
 // ----------------------------------------------------------------------------
 // memory statistics
@@ -345,14 +350,23 @@ static inline void DICTIONARY_ENTRIES_MINUS1(DICTIONARY *dict) {
     __atomic_fetch_add(&dict->stats->ops.deletes, 1, __ATOMIC_RELAXED);
     __atomic_fetch_sub(&dict->stats->items.entries, 1, __ATOMIC_RELAXED);
 
+    size_t entries;
     if(unlikely(is_dictionary_single_threaded(dict))) {
         dict->version++;
-        dict->entries--;
+        entries = dict->entries++;
     }
     else {
         __atomic_fetch_add(&dict->version, 1, __ATOMIC_SEQ_CST);
-        __atomic_fetch_sub(&dict->entries, 1, __ATOMIC_SEQ_CST);
+        entries = __atomic_fetch_sub(&dict->entries, 1, __ATOMIC_SEQ_CST);
     }
+
+#ifdef NETDATA_INTERNAL_CHECKS
+    if(unlikely(entries == 0))
+        fatal("DICT: negative number of entries in dictionary created from %s() (%zu@%s)",
+              dict->creation_function,
+              dict->creation_line,
+              dict->creation_file);
+#endif
 }
 static inline void DICTIONARY_VALUE_RESETS_PLUS1(DICTIONARY *dict) {
     __atomic_fetch_add(&dict->stats->ops.resets, 1, __ATOMIC_RELAXED);
@@ -369,13 +383,16 @@ static inline void DICTIONARY_STATS_WALKTHROUGHS_PLUS1(DICTIONARY *dict) {
     __atomic_fetch_add(&dict->stats->ops.walkthroughs, 1, __ATOMIC_RELAXED);
 }
 static inline void DICTIONARY_STATS_CHECK_SPINS_PLUS(DICTIONARY *dict, size_t count) {
-    __atomic_fetch_add(&dict->stats->spin_locks.use, count, __ATOMIC_RELAXED);
+    __atomic_fetch_add(&dict->stats->spin_locks.use_spins, count, __ATOMIC_RELAXED);
 }
 static inline void DICTIONARY_STATS_INSERT_SPINS_PLUS(DICTIONARY *dict, size_t count) {
-    __atomic_fetch_add(&dict->stats->spin_locks.insert, count, __ATOMIC_RELAXED);
+    __atomic_fetch_add(&dict->stats->spin_locks.insert_spins, count, __ATOMIC_RELAXED);
+}
+static inline void DICTIONARY_STATS_DELETE_SPINS_PLUS(DICTIONARY *dict, size_t count) {
+    __atomic_fetch_add(&dict->stats->spin_locks.delete_spins, count, __ATOMIC_RELAXED);
 }
 static inline void DICTIONARY_STATS_SEARCH_IGNORES_PLUS1(DICTIONARY *dict) {
-    __atomic_fetch_add(&dict->stats->spin_locks.search, 1, __ATOMIC_RELAXED);
+    __atomic_fetch_add(&dict->stats->spin_locks.search_spins, 1, __ATOMIC_RELAXED);
 }
 static inline void DICTIONARY_STATS_CALLBACK_INSERTS_PLUS1(DICTIONARY *dict) {
     __atomic_fetch_add(&dict->stats->callbacks.inserts, 1, __ATOMIC_RELAXED);
@@ -422,10 +439,22 @@ static inline long int DICTIONARY_REFERENCED_ITEMS_PLUS1(DICTIONARY *dict) {
 static inline long int DICTIONARY_REFERENCED_ITEMS_MINUS1(DICTIONARY *dict) {
     __atomic_fetch_sub(&dict->stats->items.referenced, 1, __ATOMIC_RELAXED);
 
+    long int referenced_items;
     if(unlikely(is_dictionary_single_threaded(dict)))
-        return --dict->referenced_items;
+        referenced_items = --dict->referenced_items;
     else
-        return __atomic_sub_fetch(&dict->referenced_items, 1, __ATOMIC_SEQ_CST);
+        referenced_items = __atomic_sub_fetch(&dict->referenced_items, 1, __ATOMIC_SEQ_CST);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+    if(unlikely(referenced_items < 0))
+        fatal("DICT: negative number of referenced items (%ld) in dictionary created from %s() (%zu@%s)",
+              referenced_items,
+              dict->creation_function,
+              dict->creation_line,
+              dict->creation_file);
+#endif
+
+    return referenced_items;
 }
 
 static inline long int DICTIONARY_PENDING_DELETES_PLUS1(DICTIONARY *dict) {
@@ -683,8 +712,8 @@ static void garbage_collect_pending_deletes(DICTIONARY *dict) {
         item_next = item->next;
         int rc = item_check_and_acquire_advanced(dict, item, is_view);
 
-        if(rc == ITEM_MARKED_FOR_DELETION) {
-            // we don't have got a reference
+        if(rc == RC_ITEM_MARKED_FOR_DELETION) {
+            // we didn't get a reference
 
             if(item_is_not_referenced_and_can_be_removed(dict, item)) {
                 DOUBLE_LINKED_LIST_REMOVE_UNSAFE(dict->items.list, item, prev, next);
@@ -696,10 +725,10 @@ static void garbage_collect_pending_deletes(DICTIONARY *dict) {
                     break;
             }
         }
-        else if(rc == ITEM_IS_CURRENTLY_BEING_DELETED)
-            ; // do not touch this item (we haven't got a reference)
+        else if(rc == RC_ITEM_IS_CURRENTLY_BEING_DELETED)
+            ; // do not touch this item (we didn't get a reference)
 
-        else if(rc == ITEM_OK)
+        else if(rc == RC_ITEM_OK)
             item_release(dict, item);
 
         item = item_next;
@@ -829,6 +858,8 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it
     size_t spins = 0;
     REFCOUNT refcount, desired;
 
+    int ret = RC_ITEM_OK;
+
     do {
         spins++;
 
@@ -836,12 +867,14 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it
 
         if(refcount < 0) {
             // we can't use this item
-            return ITEM_IS_CURRENTLY_BEING_DELETED;
+            ret = RC_ITEM_IS_CURRENTLY_BEING_DELETED;
+            break;
         }
 
         if(item_flag_check(item, ITEM_FLAG_DELETED)) {
             // we can't use this item
-            return ITEM_MARKED_FOR_DELETION;
+            ret = RC_ITEM_MARKED_FOR_DELETION;
+            break;
         }
 
         desired = refcount + 1;
@@ -849,63 +882,89 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it
     } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired,
                                           false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
 
-    // we acquired the item
+    // if ret == ITEM_OK, we acquired the item
 
-    if(is_view_dictionary(dict) && item_shared_flag_check(item, ITEM_FLAG_DELETED) && !item_flag_check(item, ITEM_FLAG_DELETED)) {
-        // but, we can't use this item
+    if(ret == RC_ITEM_OK) {
+        if (is_view_dictionary(dict) &&
+            item_shared_flag_check(item, ITEM_FLAG_DELETED) &&
+            !item_flag_check(item, ITEM_FLAG_DELETED)) {
+            // but, we can't use this item
 
-        if(having_index_lock) {
-            // delete it from the hashtable
-            hashtable_delete_unsafe(dict, item_get_name(item), item->key_len, item);
+            if (having_index_lock) {
+                // delete it from the hashtable
+                hashtable_delete_unsafe(dict, item_get_name(item), item->key_len, item);
 
-            // mark it in our dictionary as deleted too
-            // this is safe to be done here, because we have got
-            // a reference counter on item
-            item_flag_set(item, ITEM_FLAG_DELETED);
+                // mark it in our dictionary as deleted too
+                // this is safe to be done here, because we have got
+                // a reference counter on item
+                dict_item_set_deleted(dict, item);
 
-            DICTIONARY_ENTRIES_MINUS1(dict);
+                // decrement the refcount we incremented above
+                if (__atomic_sub_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST) == 0) {
+                    // this is a deleted item, and we are the last one
+                    DICTIONARY_PENDING_DELETES_PLUS1(dict);
+                }
 
-            // decrement the refcount we incremented above
-            if (__atomic_sub_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST) == 0) {
-                // this is a deleted item, and we are the last one
-                DICTIONARY_PENDING_DELETES_PLUS1(dict);
+                // do not touch the item below this point
+            } else {
+                // this is traversal / walkthrough
+                // decrement the refcount we incremented above
+                __atomic_sub_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST);
             }
 
-            // do not touch the item below this point
-        }
-        else {
-            // this is traversal / walkthrough
-            // decrement the refcount we incremented above
-            __atomic_sub_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST);
+            return RC_ITEM_MARKED_FOR_DELETION;
         }
 
-        return ITEM_MARKED_FOR_DELETION;
+        if(desired == 1)
+            DICTIONARY_REFERENCED_ITEMS_PLUS1(dict);
     }
 
-    if(desired == 1)
-        DICTIONARY_REFERENCED_ITEMS_PLUS1(dict);
 
-    if(unlikely(spins > 2 && dict->stats))
-        DICTIONARY_STATS_CHECK_SPINS_PLUS(dict, spins - 2);
+    if(unlikely(spins > 1 && dict->stats))
+        DICTIONARY_STATS_CHECK_SPINS_PLUS(dict, spins - 1);
 
-    return ITEM_OK; // we can use this item
+    return ret;
 }
 
 // if a dictionary item can be deleted, return true, otherwise return false
 // we use the private reference counter
-static inline bool item_is_not_referenced_and_can_be_removed(DICTIONARY *dict, DICTIONARY_ITEM *item) {
+static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY *dict, DICTIONARY_ITEM *item) {
     // if we can set refcount to REFCOUNT_DELETING, we can delete this item
 
-    REFCOUNT expected = DICTIONARY_ITEM_REFCOUNT_GET(dict, item);
-    if(expected == 0 && __atomic_compare_exchange_n(&item->refcount, &expected, REFCOUNT_DELETING,
-                                                     false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
+    size_t spins = 0;
+    REFCOUNT refcount, desired = REFCOUNT_DELETING;
 
-        // we are going to delete it
-        return true;
-    }
+    int ret = RC_ITEM_OK;
 
-    // we can't delete this
-    return false;
+    do {
+        spins++;
+
+        refcount = DICTIONARY_ITEM_REFCOUNT_GET(dict, item);
+
+        if(refcount < 0) {
+            // we can't use this item
+            ret = RC_ITEM_IS_CURRENTLY_BEING_DELETED;
+            break;
+        }
+
+        if(refcount > 0) {
+            // we can't delete this
+            ret = RC_ITEM_IS_REFERENCED;
+            break;
+        }
+
+        if(item_flag_check(item, ITEM_FLAG_BEING_CREATED)) {
+            // we can't use this item
+            ret = RC_ITEM_IS_CURRENTLY_BEING_CREATED;
+            break;
+        }
+    } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired,
+                                                         false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
+
+    if(unlikely(spins > 1 && dict->stats))
+        DICTIONARY_STATS_DELETE_SPINS_PLUS(dict, spins - 1);
+
+    return ret;
 }
 
 // if a dictionary item can be freed, return true, otherwise return false
@@ -954,7 +1013,7 @@ static inline void **hashtable_insert_unsafe(DICTIONARY *dict, const char *name,
     JError_t J_Error;
     Pvoid_t *Rc = JudyHSIns(&dict->index.JudyHSArray, (void *)name, name_len, &J_Error);
     if (unlikely(Rc == PJERR)) {
-        fatal("DICTIONARY: Cannot insert entry with name '%s' to JudyHS, JU_ERRNO_* == %u, ID == %d",
+        error("DICTIONARY: Cannot insert entry with name '%s' to JudyHS, JU_ERRNO_* == %u, ID == %d",
               name, JU_ERRNO(&J_Error), JU_ERRID(&J_Error));
     }
 
@@ -1031,6 +1090,10 @@ static inline void item_linked_list_add(DICTIONARY *dict, DICTIONARY_ITEM *item)
     else
         DOUBLE_LINKED_LIST_APPEND_UNSAFE(dict->items.list, item, prev, next);
 
+    // clear the BEING created flag,
+    // after it has been inserted into the linked list
+    item_flag_clear(item, ITEM_FLAG_BEING_CREATED);
+
     garbage_collect_pending_deletes(dict);
     ll_recursive_unlock(dict, DICTIONARY_LOCK_WRITE);
 }
@@ -1088,6 +1151,7 @@ static DICTIONARY_ITEM *dict_item_create(DICTIONARY *dict __maybe_unused, size_t
     size_t size = sizeof(DICTIONARY_ITEM);
     item = callocz(1, size);
     item->refcount = 1;
+    item->flags = ITEM_FLAG_BEING_CREATED;
     *allocated_bytes += size;
 
     if(master_item) {
@@ -1218,6 +1282,9 @@ static void dict_item_reset_value_with_hooks(DICTIONARY *dict, DICTIONARY_ITEM *
 static size_t dict_item_free_with_hooks(DICTIONARY *dict, DICTIONARY_ITEM *item) {
     debug(D_DICTIONARY, "Destroying name value entry for name '%s'.", item_get_name(item));
 
+    if(!item_flag_check(item, ITEM_FLAG_DELETED))
+        DICTIONARY_ENTRIES_MINUS1(dict);
+
     size_t item_size = 0, key_size = 0, value_size = 0;
 
     key_size += item->key_len;
@@ -1260,20 +1327,51 @@ static void dict_item_shared_set_deleted(DICTIONARY *dict, DICTIONARY_ITEM *item
     }
 }
 
-static inline void dict_item_free_or_mark_deleted(DICTIONARY *dict, DICTIONARY_ITEM *item) {
-    if(item_is_not_referenced_and_can_be_removed(dict, item)) {
-        dict_item_shared_set_deleted(dict, item);
-        item_linked_list_remove(dict, item);
-        dict_item_free_with_hooks(dict, item);
-    }
-    else {
-        dict_item_shared_set_deleted(dict, item);
-        item_flag_set(item, ITEM_FLAG_DELETED);
-        // after this point do not touch the item
-    }
+// returns true if we set the deleted flag on this item
+static bool dict_item_set_deleted(DICTIONARY *dict, DICTIONARY_ITEM *item) {
+    ITEM_FLAGS expected, desired;
+
+    do {
+        expected = __atomic_load_n(&item->flags, __ATOMIC_SEQ_CST);
+
+        if (expected & ITEM_FLAG_DELETED)
+            return false;
+
+        desired = expected | ITEM_FLAG_DELETED;
+
+    } while(!__atomic_compare_exchange_n(&item->flags, &expected, desired,
+                                         false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
 
-    // the item is not available anymore
     DICTIONARY_ENTRIES_MINUS1(dict);
+    return true;
+}
+
+static inline void dict_item_free_or_mark_deleted(DICTIONARY *dict, DICTIONARY_ITEM *item) {
+    int rc = item_is_not_referenced_and_can_be_removed_advanced(dict, item);
+    switch(rc) {
+        case RC_ITEM_OK:
+            // the item is ours, refcount set to -100
+            dict_item_shared_set_deleted(dict, item);
+            item_linked_list_remove(dict, item);
+            dict_item_free_with_hooks(dict, item);
+            break;
+
+        case RC_ITEM_IS_REFERENCED:
+        case RC_ITEM_IS_CURRENTLY_BEING_CREATED:
+            // the item is currently referenced by others
+            dict_item_shared_set_deleted(dict, item);
+            dict_item_set_deleted(dict, item);
+            // after this point do not touch the item
+            break;
+
+        case RC_ITEM_IS_CURRENTLY_BEING_DELETED:
+            // an item that is currently being deleted by someone else - don't touch it
+            break;
+
+        default:
+            internal_error(true, "Hey dev! You forgot to add the new condition here!");
+            break;
+    }
 }
 
 // this is used by traversal functions to remove the current item
@@ -1404,10 +1502,11 @@ static DICTIONARY_ITEM *dict_item_add_or_reset_value_and_acquire(DICTIONARY *dic
             dictionary_index_lock_unlock(dict);
 
             item_linked_list_add(dict, item);
+
             added_or_updated = true;
         }
         else {
-            if(item_check_and_acquire_advanced(dict, *item_pptr, true) != ITEM_OK) {
+            if(item_check_and_acquire_advanced(dict, *item_pptr, true) != RC_ITEM_OK) {
                 spins++;
                 continue;
             }
@@ -1443,6 +1542,7 @@ static DICTIONARY_ITEM *dict_item_add_or_reset_value_and_acquire(DICTIONARY *dic
                 }
 
                 else {
+                    // conflict callback returned false
                     // we did really nothing!
                     ;
                 }
@@ -1530,11 +1630,10 @@ static bool dictionary_free_all_resources(DICTIONARY *dict, size_t *mem, bool fo
         // cache item->next
         // because we are going to free item
         DICTIONARY_ITEM *item_next = item->next;
+
         item_size += dict_item_free_with_hooks(dict, item);
         item = item_next;
 
-        DICTIONARY_ENTRIES_MINUS1(dict);
-
         // to speed up destruction, we don't
         // unlink item from the linked-list here
 
@@ -1801,22 +1900,11 @@ void dictionary_flush(DICTIONARY *dict) {
     if(unlikely(!dict))
         return;
 
-//    // delete the index
-//    dictionary_index_lock_wrlock(dict);
-//    hashtable_destroy_unsafe(dict);
-//    dictionary_index_lock_unlock(dict);
-
-    // delete all items
-    ll_recursive_lock(dict, DICTIONARY_LOCK_WRITE); // get write lock here, to speed it up (it is recursive)
-    DICTIONARY_ITEM *item, *item_next;
-    for (item = dict->items.list; item; item = item_next) {
-        item_next = item->next;
-
-//        if(!item_flag_check(item, ITEM_FLAG_DELETED))
-//            dict_item_free_or_mark_deleted(dict, item);
-        dict_item_del(dict, item_get_name(item), (ssize_t)item_get_name_len(item));
+    void *value;
+    dfe_start_write(dict, value) {
+        dictionary_del_advanced(dict, item_get_name(value_dfe.item), (ssize_t)item_get_name_len(value_dfe.item) + 1);
     }
-    ll_recursive_unlock(dict, DICTIONARY_LOCK_WRITE);
+    dfe_done(value);
 
     DICTIONARY_STATS_DICT_FLUSHES_PLUS1(dict);
 }
@@ -2905,8 +2993,29 @@ static void *unittest_dict_thread(void *arg) {
         }
 
         dictionary_acquired_item_release(tu->dict, item);
-
         dictionary_del(tu->dict, "dict thread checking 1234567890");
+
+        // test concurrent deletions and flushes
+        {
+            if(gettid() % 2) {
+                char buf [256 + 1];
+
+                for (int i = 0; i < 1000; i++) {
+                    snprintfz(buf, 256, "del/flush test %d", i);
+                    dictionary_set(tu->dict, buf, NULL, 0);
+                }
+
+                for (int i = 0; i < 1000; i++) {
+                    snprintfz(buf, 256, "del/flush test %d", i);
+                    dictionary_del(tu->dict, buf);
+                }
+            }
+            else {
+                for (int i = 0; i < 10; i++) {
+                    dictionary_flush(tu->dict);
+                }
+            }
+        }
     }
 
     return arg;
@@ -2955,23 +3064,27 @@ static int dictionary_unittest_threads() {
             ", deletes %zu"
             ", searches %zu"
             ", resets %zu"
+            ", flushes %zu"
             ", entries %ld"
             ", referenced_items %ld"
             ", pending deletions %ld"
             ", check spins %zu"
             ", insert spins %zu"
+            ", delete spins %zu"
             ", search ignores %zu"
             "\n",
             tu.dict->stats->ops.inserts,
             tu.dict->stats->ops.deletes,
             tu.dict->stats->ops.searches,
             tu.dict->stats->ops.resets,
+            tu.dict->stats->ops.flushes,
             tu.dict->entries,
             tu.dict->referenced_items,
             tu.dict->pending_deletion_items,
-            tu.dict->stats->spin_locks.use,
-            tu.dict->stats->spin_locks.insert,
-            tu.dict->stats->spin_locks.search
+            tu.dict->stats->spin_locks.use_spins,
+            tu.dict->stats->spin_locks.insert_spins,
+            tu.dict->stats->spin_locks.delete_spins,
+            tu.dict->stats->spin_locks.search_spins
     );
     dictionary_destroy(tu.dict);
     tu.dict = NULL;
@@ -3118,6 +3231,7 @@ static int dictionary_unittest_view_threads() {
             ", pending deletions %ld"
             ", check spins %zu"
             ", insert spins %zu"
+            ", delete spins %zu"
             ", search ignores %zu"
             "\n",
             stats_master.ops.inserts,
@@ -3127,9 +3241,10 @@ static int dictionary_unittest_view_threads() {
             tv.master->entries,
             tv.master->referenced_items,
             tv.master->pending_deletion_items,
-            stats_master.spin_locks.use,
-            stats_master.spin_locks.insert,
-            stats_master.spin_locks.search
+            stats_master.spin_locks.use_spins,
+            stats_master.spin_locks.insert_spins,
+            stats_master.spin_locks.delete_spins,
+            stats_master.spin_locks.search_spins
     );
     fprintf(stderr,
             "VIEW  : inserts %zu"
@@ -3141,6 +3256,7 @@ static int dictionary_unittest_view_threads() {
             ", pending deletions %ld"
             ", check spins %zu"
             ", insert spins %zu"
+            ", delete spins %zu"
             ", search ignores %zu"
             "\n",
             stats_view.ops.inserts,
@@ -3150,9 +3266,10 @@ static int dictionary_unittest_view_threads() {
             tv.view->entries,
             tv.view->referenced_items,
             tv.view->pending_deletion_items,
-            stats_view.spin_locks.use,
-            stats_view.spin_locks.insert,
-            stats_view.spin_locks.search
+            stats_view.spin_locks.use_spins,
+            stats_view.spin_locks.insert_spins,
+            stats_view.spin_locks.delete_spins,
+            stats_view.spin_locks.search_spins
     );
     dictionary_destroy(tv.master);
     dictionary_destroy(tv.view);

+ 4 - 3
libnetdata/dictionary/dictionary.h

@@ -98,9 +98,10 @@ struct dictionary_stats {
 
     // spin locks
     struct {
-        size_t use;                 // number of times a reference to item had to spin to acquire it or ignore it
-        size_t search;              // number of times a successful search result had to be thrown away
-        size_t insert;              // number of times an insertion to the hash table had to be repeated
+        size_t use_spins;           // number of times a reference to item had to spin to acquire it or ignore it
+        size_t search_spins;        // number of times a successful search result had to be thrown away
+        size_t insert_spins;        // number of times an insertion to the hash table had to be repeated
+        size_t delete_spins;        // number of times a deletion had to spin to get a decision
     } spin_locks;
 };
 

+ 1 - 1
libnetdata/log/log.c

@@ -861,7 +861,7 @@ void error_limit_int(ERROR_LIMIT *erl, const char *prefix, const char *file __ma
     va_end( args );
 
     if(erl->count > 1)
-        fprintf(stderr, " (repeated %zu times in the last %llu secs)", erl->count, (unsigned long long)(erl->last_logged ? now - erl->last_logged : 0));
+        fprintf(stderr, " (similar messages repeated %zu times in the last %llu secs)", erl->count, (unsigned long long)(erl->last_logged ? now - erl->last_logged : 0));
 
     if(erl->sleep_ut)
         fprintf(stderr, " (sleeping for %llu microseconds every time this happens)", erl->sleep_ut);

+ 219 - 132
streaming/replication.c

@@ -3,7 +3,7 @@
 #include "replication.h"
 #include "Judy.h"
 
-#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 30
+#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
 #define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
 
 static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) {
@@ -23,8 +23,8 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
     memset(data, 0, sizeof(data));
 
     if(enable_streaming && st->last_updated.tv_sec > before) {
-        internal_error(true, "REPLAY: '%s' overwriting replication before from %llu to %llu",
-                       rrdset_id(st),
+        internal_error(true, "REPLAY: 'host:%s/chart:%s' overwriting replication before from %llu to %llu",
+                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
                        (unsigned long long)before,
                        (unsigned long long)st->last_updated.tv_sec
         );
@@ -65,7 +65,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
                 data[i].sp = ops->next_metric(&data[i].handle);
 
             internal_error(max_skip <= 0,
-                           "REPLAY: host '%s', chart '%s', dimension '%s': db does not advance the query beyond time %llu",
+                           "REPLAY: 'host:%s/chart:%s', dimension '%s': db does not advance the query beyond time %llu",
                             rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long) now);
 
             if(data[i].sp.end_time < now)
@@ -84,7 +84,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
         time_t wall_clock_time = now_realtime_sec();
         if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + 1) {
             internal_error(true,
-                           "REPLAY: host '%s', chart '%s': db provided future start time %llu or end time %llu (now is %llu)",
+                           "REPLAY: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
                             rrdhost_hostname(st->rrdhost), rrdset_id(st),
                            (unsigned long long)min_start_time,
                            (unsigned long long)min_end_time,
@@ -93,9 +93,11 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
         }
 
         if(min_end_time < now) {
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
             internal_error(true,
-                           "REPLAY: host '%s', chart '%s': no data on any dimension beyond time %llu",
+                           "REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
                            rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
             break;
         }
 
@@ -130,23 +132,23 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
         now = min_end_time + 1;
     }
 
-#ifdef NETDATA_INTERNAL_CHECKS
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
     if(actual_after) {
         char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
         log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
         log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
         internal_error(true,
-                       "REPLAY: host '%s', chart '%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
+                       "REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
                        rrdhost_hostname(st->rrdhost), rrdset_id(st),
                        (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
                        (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
     }
     else
         internal_error(true,
-                       "REPLAY: host '%s', chart '%s': nothing to send (requested %llu to %llu)",
+                       "REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
                        rrdhost_hostname(st->rrdhost), rrdset_id(st),
                        (unsigned long long)after, (unsigned long long)before);
-#endif
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
 
     // release all the dictionary items acquired
     // finalize the queries
@@ -193,8 +195,9 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
     time_t first_entry_local = rrdset_first_entry_t(st);
     if(first_entry_local > now + tolerance) {
         internal_error(true,
-                       "RRDSET: '%s' first time %llu is in the future (now is %llu)",
-                       rrdset_id(st), (unsigned long long)first_entry_local, (unsigned long long)now);
+                       "RRDSET: 'host:%s/chart:%s' first time %llu is in the future (now is %llu)",
+                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
+                       (unsigned long long)first_entry_local, (unsigned long long)now);
         first_entry_local = now;
     }
 
@@ -205,15 +208,16 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
     time_t last_entry_local = st->last_updated.tv_sec;
     if(!last_entry_local) {
         internal_error(true,
-                       "RRDSET: '%s' last updated time zero. Querying db for last updated time.",
-                       rrdset_id(st));
+                       "RRDSET: 'host:%s/chart:%s' db reports last updated time zero.",
+                       rrdhost_hostname(st->rrdhost), rrdset_id(st));
         last_entry_local = rrdset_last_entry_t(st);
     }
 
     if(last_entry_local > now + tolerance) {
         internal_error(true,
-                       "RRDSET: '%s' last updated time %llu is in the future (now is %llu)",
-                       rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
+                       "RRDSET: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
+                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
+                       (unsigned long long)last_entry_local, (unsigned long long)now);
         last_entry_local = now;
     }
 
@@ -263,51 +267,90 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
     return enable_streaming;
 }
 
-static bool send_replay_chart_cmd(send_command callback, void *callback_data, RRDSET *st, bool start_streaming, time_t after, time_t before) {
+// ----------------------------------------------------------------------------
+// sending replication requests
 
-    if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || after < st->rrdhost->receiver->replication_first_time_t))
-        st->rrdhost->receiver->replication_first_time_t = after;
+struct replication_request_details {
+    struct {
+        send_command callback;
+        void *data;
+    } caller;
 
-#ifdef NETDATA_INTERNAL_CHECKS
-    if(after && before) {
-        char after_buf[LOG_DATE_LENGTH + 1], before_buf[LOG_DATE_LENGTH + 1];
-        log_date(after_buf, LOG_DATE_LENGTH, after);
-        log_date(before_buf, LOG_DATE_LENGTH, before);
-        internal_error(true,
-                       "REPLAY: host '%s', chart '%s': sending replication request %llu [%s] to %llu [%s], start streaming: %s",
-                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
-                       (unsigned long long)after, after_buf, (unsigned long long)before, before_buf,
-                       start_streaming?"true":"false");
-    }
-    else {
-        internal_error(true,
-                       "REPLAY: host '%s', chart '%s': sending empty replication request, start streaming: %s",
-                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
-                       start_streaming?"true":"false");
-    }
-#endif
+    RRDHOST *host;
+    RRDSET *st;
 
-#ifdef NETDATA_INTERNAL_CHECKS
-    internal_error(
-            st->replay.after != 0 || st->replay.before != 0,
-            "REPLAY ERROR: host '%s', chart '%s': sending replication request, while there is another inflight",
-            rrdhost_hostname(st->rrdhost), rrdset_id(st)
-            );
-
-    st->replay.start_streaming = start_streaming;
-    st->replay.after = after;
-    st->replay.before = before;
-#endif
+    struct {
+        time_t first_entry_t;               // the first entry time the child has
+        time_t last_entry_t;                // the last entry time the child has
+    } child_db;
+
+    struct {
+        time_t first_entry_t;               // the first entry time we have
+        time_t last_entry_t;                // the last entry time we have
+        bool last_entry_t_adjusted_to_now;  // true, if the last entry time was in the future and we fixed
+    } local_db;
+
+    struct {
+        time_t from;                        // the starting time of the entire gap we have
+        time_t to;                          // the ending time of the entire gap we have
+    } gap;
 
-    debug(D_REPLICATION, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
-          rrdset_id(st), start_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
+    struct {
+        time_t after;                       // the start time we requested previously from this child
+        time_t before;                      // the end time we requested previously from this child
+    } last_request;
+
+    struct {
+        time_t after;                       // the start time of this replication request - the child will add 1 second
+        time_t before;                      // the end time of this replication request
+        bool start_streaming;               // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
+    } wanted;
+
+    time_t now;                             // the current wall clock time
+};
+
+static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) {
+    RRDSET *st = r->st;
+
+    if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t))
+        st->rrdhost->receiver->replication_first_time_t = r->wanted.after;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+    char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";
+
+    if(r->wanted.after)
+        log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after);
+
+    if(r->wanted.before)
+        log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before);
+
+    internal_error(true,
+                   "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
+                   "last[%ld - %ld] child[%ld - %ld] local[%ld - %ld %s] gap[%ld - %ld %s] %s"
+                   , rrdhost_hostname(r->host), rrdset_id(r->st)
+                   , r->wanted.after, wanted_after_buf
+                   , r->wanted.before, wanted_before_buf
+                   , r->wanted.start_streaming ? "YES" : "NO"
+                   , msg
+                   , r->last_request.after, r->last_request.before
+                   , r->child_db.first_entry_t, r->child_db.last_entry_t
+                   , r->local_db.first_entry_t, r->local_db.last_entry_t, r->local_db.last_entry_t_adjusted_to_now?"FIXED":"RAW"
+                   , r->gap.from, r->gap.to
+                   , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
+                   , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
+                   );
+
+    st->replay.start_streaming = r->wanted.start_streaming;
+    st->replay.after = r->wanted.after;
+    st->replay.before = r->wanted.before;
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
 
     char buffer[2048 + 1];
     snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
-                      rrdset_id(st), start_streaming ? "true" : "false",
-                      (unsigned long long)after, (unsigned long long)before);
+              rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
+              (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before);
 
-    int ret = callback(buffer, callback_data);
+    int ret = r->caller.callback(buffer, r->caller.data);
     if (ret < 0) {
         error("REPLICATION: failed to send replication request to child (error %d)", ret);
         return false;
@@ -320,81 +363,110 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
                              time_t first_entry_child, time_t last_entry_child,
                              time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
 {
-    time_t now = now_realtime_sec();
-
-    // if replication is disabled, send an empty replication request
-    // asking no data
-    if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) {
-        internal_error(true,
-                       "REPLAY: host '%s', chart '%s': sending empty replication request because replication is disabled",
-                       rrdhost_hostname(host), rrdset_id(st));
+    struct replication_request_details r = {
+            .caller = {
+                    .callback = callback,
+                    .data = callback_data,
+            },
+
+            .host = host,
+            .st = st,
+
+            .child_db = {
+                    .first_entry_t = first_entry_child,
+                    .last_entry_t = last_entry_child,
+            },
+
+            .last_request = {
+                    .after = prev_first_entry_wanted,
+                    .before = prev_last_entry_wanted,
+            },
+
+            .wanted = {
+                    .after = 0,
+                    .before = 0,
+                    .start_streaming = true,
+            },
+
+            .now = now_realtime_sec(),
+    };
 
-        return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+    // get our local database retention
+    r.local_db.first_entry_t = rrdset_first_entry_t(st);
+    r.local_db.last_entry_t = rrdset_last_entry_t(st);
+    if(r.local_db.last_entry_t > r.now) {
+        r.local_db.last_entry_t = r.now;
+        r.local_db.last_entry_t_adjusted_to_now = true;
     }
 
-    // Child has no stored data
-    if (!last_entry_child) {
-        error("REPLAY: host '%s', chart '%s': sending empty replication request because child has no stored data",
-              rrdhost_hostname(host), rrdset_id(st));
+    // let's find the GAP we have
+    if(!r.last_request.after || !r.last_request.before) {
+        // there is no previous request
+
+        if(r.local_db.last_entry_t)
+            // we have some data, let's continue from the last point we have
+            r.gap.from = r.local_db.last_entry_t;
+        else
+            // we don't have any data, the gap is the max timeframe we are allowed to replicate
+            r.gap.from = r.now - r.host->rrdpush_seconds_to_replicate;
 
-        return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
+    }
+    else {
+        // we had sent a request - let's continue at the point we left it
+        // for this we don't take into account the actual data in our db
+        // because the child may also have gaps and we need to get over it
+        r.gap.from = r.last_request.before;
     }
 
-    // Nothing to get if the chart has not dimensions
-    if (!rrdset_number_of_dimensions(st)) {
-        error("REPLAY: host '%s', chart '%s': sending empty replication request because chart has no dimensions",
-              rrdhost_hostname(host), rrdset_id(st));
+    // we want all the data up to now
+    r.gap.to = r.now;
 
-        return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
-    }
+    // The gap is now r.gap.from -> r.gap.to
 
-    // if the child's first/last entries are nonsensical, resume streaming
-    // without asking for any data
-    if (first_entry_child <= 0) {
-        error("REPLAY: host '%s', chart '%s': sending empty replication because first entry of the child is invalid (%llu)",
-              rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child);
+    if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION)))
+        return send_replay_chart_cmd(&r, "empty replication request, replication is disabled");
 
-        return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
-    }
+    if (unlikely(!r.child_db.last_entry_t))
+        return send_replay_chart_cmd(&r, "empty replication request, child has no stored data");
 
-    if (first_entry_child > last_entry_child) {
-        error("REPLAY: host '%s', chart '%s': sending empty replication because child timings are invalid (first entry %llu > last entry %llu)",
-              rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
+    if (unlikely(!rrdset_number_of_dimensions(st)))
+        return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions");
 
-        return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
-    }
+    if (r.child_db.first_entry_t <= 0)
+        return send_replay_chart_cmd(&r, "empty replication request, first entry of the child db first entry is invalid");
 
-    time_t last_entry_local = rrdset_last_entry_t(st);
-    if(last_entry_local > now) {
-        internal_error(true,
-                       "REPLAY: host '%s', chart '%s': local last entry time %llu is in the future (now is %llu). Adjusting it.",
-                       rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
-        last_entry_local = now;
-    }
+    if (r.child_db.first_entry_t > r.child_db.last_entry_t)
+        return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)");
 
-    // should never happen but if it does, start streaming without asking for any data
-    if (last_entry_local > last_entry_child) {
-        error("REPLAY: host '%s', chart '%s': sending empty replication request because our last entry (%llu) in later than the child one (%llu)",
-              rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)last_entry_child);
+    if (r.local_db.last_entry_t > r.child_db.last_entry_t)
+        return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one");
 
-        return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
-    }
+    // let's find what the child can provide to fill that gap
 
-    time_t first_entry_wanted;
-    if (prev_first_entry_wanted && prev_last_entry_wanted) {
-        first_entry_wanted = prev_last_entry_wanted;
-        if ((now - first_entry_wanted) > host->rrdpush_seconds_to_replicate)
-            first_entry_wanted = now - host->rrdpush_seconds_to_replicate;
-    }
+    if(r.child_db.first_entry_t > r.gap.from)
+        // the child does not have all the data - let's get what it has
+        r.wanted.after = r.child_db.first_entry_t;
+    else
+        // ok, the child can fill the entire gap we have
+        r.wanted.after = r.gap.from;
+
+    if(r.gap.to - r.wanted.after > host->rrdpush_replication_step)
+        // the duration is too big for one request - let's take the first step
+        r.wanted.before = r.wanted.after + host->rrdpush_replication_step;
     else
-        first_entry_wanted = MAX(last_entry_local, first_entry_child);
+        // wow, we can do it in one request
+        r.wanted.before = r.gap.to;
 
-    time_t last_entry_wanted = first_entry_wanted + host->rrdpush_replication_step;
-    last_entry_wanted = MIN(last_entry_wanted, last_entry_child);
+    // don't ask from the child more than it has
+    if(r.wanted.before > r.child_db.last_entry_t)
+        r.wanted.before = r.child_db.last_entry_t;
 
-    bool start_streaming = (last_entry_wanted == last_entry_child);
+    // the child should start streaming immediately if the wanted duration is small
+    r.wanted.start_streaming = (r.wanted.before == r.child_db.last_entry_t);
 
-    return send_replay_chart_cmd(callback, callback_data, st, start_streaming, first_entry_wanted, last_entry_wanted);
+    // the wanted timeframe is now r.wanted.after -> r.wanted.before
+    // send it
+    return send_replay_chart_cmd(&r, "OK");
 }
 
 // ----------------------------------------------------------------------------
@@ -633,6 +705,7 @@ static struct replication_request replication_request_get_first_available() {
             else if(sender_has_room_to_spare) {
                 // copy the request to return it
                 rq = *rse->rq;
+                rq.chart_id = string_dup(rq.chart_id);
 
                 // set the return result to found
                 rq.found = true;
@@ -662,14 +735,6 @@ static void replication_request_react_callback(const DICTIONARY_ITEM *item __may
     struct sender_state *s = sender_state; (void)s;
     struct replication_request *rq = value;
 
-    RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
-    if(!st) {
-        internal_error(true, "REPLAY: chart '%s' not found on host '%s'",
-                       string2str(rq->chart_id), rrdhost_hostname(rq->sender->host));
-    }
-    else
-        rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED);
-
     // IMPORTANT:
     // We use the react instead of the insert callback
     // because we want the item to be atomically visible
@@ -691,12 +756,26 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __
     struct replication_request *rq = old_value; (void)rq;
     struct replication_request *rq_new = new_value;
 
-    internal_error(
-            true,
-            "STREAM %s [send to %s]: REPLAY ERROR: ignoring duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
-            rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
-            (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
-            (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+        replication_recursive_lock();
+
+        if(!rq->indexed_in_judy) {
+            replication_sort_entry_add(rq);
+            internal_error(
+                    true,
+                    "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+                    rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
+                    (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
+                    (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+        }
+        else
+            internal_error(
+                    true,
+                    "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+                    rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
+                    (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
+                    (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
+
+        replication_recursive_unlock();
 
 //    bool updated_after = false, updated_before = false, updated_start_streaming = false, updated = false;
 //
@@ -880,7 +959,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
             worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, (NETDATA_DOUBLE)done * 100.0 / (NETDATA_DOUBLE)total);
         }
 
-        if(!rq.found) {
+        if(unlikely(!rq.found)) {
             worker_is_idle();
 
             if(!rep.requests_count)
@@ -898,7 +977,15 @@ void *replication_thread_main(void *ptr __maybe_unused) {
         else {
             // delete the request from the dictionary
             worker_is_busy(WORKER_JOB_DELETE_ENTRY);
-            dictionary_del(rq.sender->replication_requests, string2str(rq.chart_id));
+            if(!dictionary_del(rq.sender->replication_requests, string2str(rq.chart_id)))
+                error("REPLAY: 'host:%s/chart:%s' failed to be deleted from sender dictionary",
+                      rrdhost_hostname(rq.sender->host), string2str(rq.chart_id));
+
+            if(rq.sender->replication_pending_requests == 0 && dictionary_entries(rq.sender->replication_requests) != 0)
+                error("REPLAY: 'host:%s/chart:%s' sender dictionary has %zu entries, but sender pending requests are %zu",
+                      rrdhost_hostname(rq.sender->host), string2str(rq.chart_id),
+                      dictionary_entries(rq.sender->replication_requests),
+                      rq.sender->replication_pending_requests);
         }
 
         worker_is_busy(WORKER_JOB_FIND_CHART);
@@ -910,13 +997,6 @@ void *replication_thread_main(void *ptr __maybe_unused) {
             continue;
         }
 
-        if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
-            rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
-            rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
-            rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
-        }
-        rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_QUEUED);
-
         worker_is_busy(WORKER_JOB_QUERYING);
 
         latest_first_time_t = rq.after;
@@ -947,11 +1027,18 @@ void *replication_thread_main(void *ptr __maybe_unused) {
                 rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
                 rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
                 rrdhost_sender_replicating_charts_minus_one(st->rrdhost);
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+                internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
+                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
+#endif
             }
             else
                 internal_error(true, "REPLAY ERROR: received start streaming command for chart '%s' or host '%s', but the chart is not in progress replicating",
                                string2str(rq.chart_id), rrdhost_hostname(st->rrdhost));
         }
+
+        string_freez(rq.chart_id);
     }
 
     netdata_thread_cleanup_pop(1);

+ 17 - 5
streaming/rrdpush.c

@@ -298,21 +298,32 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
 
         if(!last_entry_local) {
             internal_error(true,
-                           "RRDSET: '%s' last updated time zero. Querying db for last updated time.",
-                           rrdset_id(st));
+                           "RRDSET: 'host:%s/chart:%s' db reports last updated time zero.",
+                           rrdhost_hostname(st->rrdhost), rrdset_id(st));
 
             last_entry_local = rrdset_last_entry_t(st);
             time_t now = now_realtime_sec();
+
             if(last_entry_local > now) {
                 internal_error(true,
-                               "RRDSET: '%s' last updated time %llu is in the future (now is %llu)",
-                               rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
+                               "RRDSET: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
+                               rrdhost_hostname(st->rrdhost), rrdset_id(st),
+                               (unsigned long long)last_entry_local, (unsigned long long)now);
                 last_entry_local = now;
             }
         }
 
         buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu\n",
                        (unsigned long long)first_entry_local, (unsigned long long)last_entry_local);
+
+        rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+        rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+        rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+        internal_error(true, "REPLAY: 'host:%s/chart:%s' replication starts",
+                       rrdhost_hostname(st->rrdhost), rrdset_id(st));
+#endif
     }
 
     st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
@@ -344,7 +355,8 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
             buffer_fast_strcat(wb, "\n", 1);
         }
         else {
-            internal_error(true, "host '%s', chart '%s', dimension '%s' flag 'exposed' is updated but not exposed", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
+            internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed",
+                           rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
             // we will include it in the next iteration
             rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
         }

+ 7 - 3
streaming/sender.c

@@ -23,9 +23,10 @@
 #define WORKER_SENDER_JOB_BYTES_SENT                17
 #define WORKER_SENDER_JOB_REPLAY_REQUEST            18
 #define WORKER_SENDER_JOB_FUNCTION_REQUEST          19
+#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE          20
 
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 20
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 20
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21
 #endif
 
 extern struct config stream_config;
@@ -225,7 +226,7 @@ static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
 
     RRDSET *st;
     rrdset_foreach_read(st, host) {
-        rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_QUEUED);
+        rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
         rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
 
         st->upstream_resync_time = 0;
@@ -1099,6 +1100,7 @@ void *rrdpush_sender_thread(void *ptr) {
     worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
     worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT);
     worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT);
+    worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
 
     struct sender_state *s = ptr;
     s->tid = gettid();
@@ -1342,6 +1344,8 @@ void *rrdpush_sender_thread(void *ptr) {
                   rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
             rrdpush_sender_thread_close_socket(s->host);
         }
+
+        worker_set_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, (NETDATA_DOUBLE) dictionary_entries(s->replication_requests));
     }
 
     netdata_thread_cleanup_pop(1);