Browse Source

Faster streaming by 25% on the child (#13708)

* faster printing of BEGIN, SET, END

* fewer conditions

* faster buffer_fast_strcat()

* faster buffer_fast_strcat() fix

* eliminate atomic operations and conditions in the BEGIN, SET, END flow

* removed unecessary condition
Costa Tsaousis 2 years ago
parent
commit
12cd398894
4 changed files with 76 additions and 45 deletions
  1. 3 3
      database/rrddim.c
  2. 12 0
      libnetdata/buffer/buffer.c
  3. 1 0
      libnetdata/buffer/buffer.h
  4. 60 42
      streaming/rrdpush.c

+ 3 - 3
database/rrddim.c

@@ -363,8 +363,8 @@ inline int rrddim_set_algorithm(RRDSET *st, RRDDIM *rd, RRD_ALGORITHM algorithm)
     debug(D_RRD_CALLS, "Updating algorithm of dimension '%s/%s' from %s to %s", rrdset_id(st), rrddim_name(rd), rrd_algorithm_name(rd->algorithm), rrd_algorithm_name(algorithm));
     rd->algorithm = algorithm;
     rd->exposed = 0;
-    rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
     rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+    rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
     rrdcontext_updated_rrddim_algorithm(rd);
     return 1;
 }
@@ -376,8 +376,8 @@ inline int rrddim_set_multiplier(RRDSET *st, RRDDIM *rd, collected_number multip
     debug(D_RRD_CALLS, "Updating multiplier of dimension '%s/%s' from " COLLECTED_NUMBER_FORMAT " to " COLLECTED_NUMBER_FORMAT, rrdset_id(st), rrddim_name(rd), rd->multiplier, multiplier);
     rd->multiplier = multiplier;
     rd->exposed = 0;
-    rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
     rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+    rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
     rrdcontext_updated_rrddim_multiplier(rd);
     return 1;
 }
@@ -389,8 +389,8 @@ inline int rrddim_set_divisor(RRDSET *st, RRDDIM *rd, collected_number divisor)
     debug(D_RRD_CALLS, "Updating divisor of dimension '%s/%s' from " COLLECTED_NUMBER_FORMAT " to " COLLECTED_NUMBER_FORMAT, rrdset_id(st), rrddim_name(rd), rd->divisor, divisor);
     rd->divisor = divisor;
     rd->exposed = 0;
-    rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
     rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+    rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
     rrdcontext_updated_rrddim_divisor(rd);
     return 1;
 }

+ 12 - 0
libnetdata/buffer/buffer.c

@@ -136,6 +136,18 @@ void buffer_print_llu(BUFFER *wb, unsigned long long uvalue)
     wb->len += wstr - str;
 }
 
+void buffer_print_ll(BUFFER *wb, long long value)
+{
+    buffer_need_bytes(wb, 50);
+
+    if(value < 0) {
+        buffer_fast_strcat(wb, "-", 1);
+        value = -value;
+    }
+
+    buffer_print_llu(wb, value);
+}
+
 void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len) {
     if(unlikely(!txt || !*txt)) return;
 

+ 1 - 0
libnetdata/buffer/buffer.h

@@ -78,6 +78,7 @@ extern char *print_number_llu_r(char *str, unsigned long long uvalue);
 extern char *print_number_llu_r_smart(char *str, unsigned long long uvalue);
 
 extern void buffer_print_llu(BUFFER *wb, unsigned long long uvalue);
+extern void buffer_print_ll(BUFFER *wb, long long value);
 
 static inline void buffer_need_bytes(BUFFER *buffer, size_t needed_free_size) {
     if(unlikely(buffer->size - buffer->len < needed_free_size))

+ 60 - 42
streaming/rrdpush.c

@@ -129,29 +129,44 @@ int rrdpush_init() {
 unsigned int remote_clock_resync_iterations = 60;
 
 
-static inline int should_send_chart_matching(RRDSET *st) {
-    // Do not stream anomaly rates charts.
-    if (unlikely(rrdset_is_ar_chart(st)))
-        return false;
-
-    if (rrdset_flag_check(st, RRDSET_FLAG_ANOMALY_DETECTION))
-        return ml_streaming_enabled();
+static inline bool should_send_chart_matching(RRDSET *st) {
+    RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE);
 
-    if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) {
+    if(unlikely(!flags)) {
         RRDHOST *host = st->rrdhost;
 
-        if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) ||
+        // Do not stream anomaly rates charts.
+        if (unlikely(rrdset_is_ar_chart(st))) {
+            rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+            rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+            flags = RRDSET_FLAG_UPSTREAM_IGNORE;
+        }
+        else if (rrdset_flag_check(st, RRDSET_FLAG_ANOMALY_DETECTION)) {
+            if(ml_streaming_enabled()) {
+                rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+                rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
+                flags = RRDSET_FLAG_UPSTREAM_SEND;
+            }
+            else {
+                rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+                rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+                flags = RRDSET_FLAG_UPSTREAM_IGNORE;
+            }
+        }
+        else if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) ||
             simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st))) {
             rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
             rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
+            flags = RRDSET_FLAG_UPSTREAM_SEND;
         }
         else {
             rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
             rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+            flags = RRDSET_FLAG_UPSTREAM_IGNORE;
         }
     }
 
-    return(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND));
+    return flags & RRDSET_FLAG_UPSTREAM_SEND;
 }
 
 int configured_as_parent() {
@@ -173,22 +188,7 @@ int configured_as_parent() {
     return is_parent;
 }
 
-// checks if the current chart definition has been sent
-static inline int need_to_send_chart_definition(RRDSET *st) {
-    if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED))))
-        return 1;
-
-    RRDDIM *rd;
-    dfe_start_read(st->rrddim_root_index, rd) {
-        if(unlikely(!rd->exposed)) {
-            internal_error(true, "host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
-            return 1;
-        }
-    }
-    dfe_done(rd);
-
-    return 0;
-}
+#define need_to_send_chart_definition(st) (!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED))
 
 // chart labels
 static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
@@ -276,22 +276,42 @@ static inline void rrdpush_send_chart_definition(RRDSET *st) {
 // sends the current chart dimensions
 static inline bool rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) {
     RRDHOST *host = st->rrdhost;
-    buffer_sprintf(host->sender->build, "BEGIN \"%s\" %llu", rrdset_id(st), (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
-    if (s->version >= VERSION_GAP_FILLING)
-        buffer_sprintf(host->sender->build, " %"PRId64"\n", (int64_t)st->last_collected_time.tv_sec);
-    else
-        buffer_strcat(host->sender->build, "\n");
+    BUFFER *wb = host->sender->build;
+
+    buffer_fast_strcat(wb, "BEGIN \"", 7);
+    buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
+    buffer_fast_strcat(wb, "\" ", 2);
+    buffer_print_llu(wb, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
+
+    if (s->version >= VERSION_GAP_FILLING) {
+        buffer_fast_strcat(wb, " ", 1);
+        buffer_print_ll(wb, st->last_collected_time.tv_sec);
+    }
+
+    buffer_fast_strcat(wb, "\n", 1);
 
     size_t count_of_dimensions_written = 0;
     RRDDIM *rd;
     rrddim_foreach_read(rd, st) {
-        if(rd->updated && rd->exposed) {
-            buffer_sprintf(host->sender->build, "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n", rrddim_id(rd), rd->collected_value);
+        if(unlikely(!rd->updated))
+            continue;
+
+        if(likely(rd->exposed)) {
+            buffer_fast_strcat(wb, "SET \"", 5);
+            buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+            buffer_fast_strcat(wb, "\" = ", 4);
+            buffer_print_ll(wb, rd->collected_value);
+            buffer_fast_strcat(wb, "\n", 1);
             count_of_dimensions_written++;
         }
+        else {
+            internal_error(true, "host '%s', chart '%s', dimension '%s' flag 'exposed' is updated but not exposed", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
+            // we will include it in the next iteration
+            rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+        }
     }
     rrddim_foreach_done(rd);
-    buffer_strcat(host->sender->build, "END\n");
+    buffer_fast_strcat(wb, "END\n", 4);
 
     return count_of_dimensions_written != 0;
 }
@@ -350,15 +370,16 @@ void rrdset_done_push(RRDSET *st) {
 
     RRDHOST *host = st->rrdhost;
 
-    if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn))
-        rrdpush_sender_thread_spawn(host);
-
     // Handle non-connected case
     if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)
                  || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS))) {
 
+        if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn))
+            rrdpush_sender_thread_spawn(host);
+
         if(unlikely(!host->rrdpush_sender_error_shown))
             error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
+
         host->rrdpush_sender_error_shown = 1;
 
         return;
@@ -368,15 +389,12 @@ void rrdset_done_push(RRDSET *st) {
         host->rrdpush_sender_error_shown = 0;
     }
 
-    if(dictionary_entries(st->rrddim_root_index) == 0)
-        return;
-
     sender_start(host->sender);
 
-    if(need_to_send_chart_definition(st))
+    if(unlikely(need_to_send_chart_definition(st)))
         rrdpush_send_chart_definition(st);
 
-    if(rrdpush_send_chart_metrics_nolock(st, host->sender)) {
+    if(likely(rrdpush_send_chart_metrics_nolock(st, host->sender))) {
         // signal the sender there are more data
         if (host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
             error("STREAM %s [send]: cannot write to internal pipe", rrdhost_hostname(host));