Browse Source

fix chart definition end time_t printing and parsing (#13942)

* fix chart definition end time_t printing and parsing

* properly check parameters to chart definition end
Costa Tsaousis 2 years ago
parent
commit
04ecb72856
3 changed files with 41 additions and 3 deletions
  1. 16 2
      collectors/plugins.d/pluginsd_parser.c
  2. 7 0
      streaming/replication.c
  3. 18 1
      streaming/rrdpush.c

+ 16 - 2
collectors/plugins.d/pluginsd_parser.c

@@ -261,8 +261,16 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
 
 PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user)
 {
-    long first_entry_child = str2l(get_word(words, num_words, 1));
-    long last_entry_child = str2l(get_word(words, num_words, 2));
+    const char *first_entry_txt = get_word(words, num_words, 1);
+    const char *last_entry_txt = get_word(words, num_words, 2);
+
+    if(unlikely(!first_entry_txt || !last_entry_txt)) {
+        error("REPLAY: received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " command without first or last entry. Disabling it.");
+        return PARSER_RC_ERROR;
+    }
+
+    long first_entry_child = str2l(first_entry_txt);
+    long last_entry_child = str2l(last_entry_txt);
 
     PARSER_USER_OBJECT *user_object = (PARSER_USER_OBJECT *) user;
 
@@ -273,6 +281,12 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
         return PARSER_RC_ERROR;
     }
 
+    internal_error(
+               (first_entry_child != 0 || last_entry_child != 0)
+            && (first_entry_child == 0 || last_entry_child == 0),
+            "REPLAY: received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %llu, last time %llu).",
+            (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
+
     rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
 
     bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0);

+ 7 - 0
streaming/replication.c

@@ -170,6 +170,13 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
 
     // find the latest entry we have
     time_t last_entry_local = st->last_updated.tv_sec;
+    if(!last_entry_local) {
+        internal_error(true,
+                       "RRDSET: '%s' last updated time zero. Querying db for last updated time.",
+                       rrdset_id(st));
+        last_entry_local = rrdset_last_entry_t(st);
+    }
+
     if(last_entry_local > now) {
         internal_error(true,
                        "RRDSET: '%s' last updated time %llu is in the future (now is %llu)",

+ 18 - 1
streaming/rrdpush.c

@@ -310,7 +310,24 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
     if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) {
         time_t first_entry_local = rrdset_first_entry_t(st);
         time_t last_entry_local = st->last_updated.tv_sec;
-        buffer_sprintf(wb, "CHART_DEFINITION_END %ld %ld\n", first_entry_local, last_entry_local);
+
+        if(!last_entry_local) {
+            internal_error(true,
+                           "RRDSET: '%s' last updated time zero. Querying db for last updated time.",
+                           rrdset_id(st));
+
+            last_entry_local = rrdset_last_entry_t(st);
+            time_t now = now_realtime_sec();
+            if(last_entry_local > now) {
+                internal_error(true,
+                               "RRDSET: '%s' last updated time %llu is in the future (now is %llu)",
+                               rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
+                last_entry_local = now;
+            }
+        }
+
+        buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu\n",
+                       (unsigned long long)first_entry_local, (unsigned long long)last_entry_local);
     }
 
     st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);