Browse Source

fix post-processing of contexts (#13807)

Costa Tsaousis 2 years ago
parent
commit
ec95f306f3
1 changed file with 56 additions and 12 deletions
  1. 56 12
      database/rrdcontext.c

+ 56 - 12
database/rrdcontext.c

@@ -155,6 +155,7 @@ rrd_flag_add_remove_atomic(RRD_FLAGS *flags, RRD_FLAGS check, RRD_FLAGS conditio
                                | RRD_FLAG_DELETED                                                               \
                                | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED                                 \
                                | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION                                          \
+                               | RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD                                      \
         )
 
 #define rrd_flag_set_archived(obj)                                                                              \
@@ -272,9 +273,9 @@ typedef struct rrdinstance {
     DICTIONARY *rrdmetrics;
 
     struct {
-        uint32_t collected_metrics;     // a temporary variable to detect BEGIN/END without SET
-                                        // don't use it for other purposes
-                                        // it goes up and then resets to zero, on every iteration
+        uint32_t collected_metrics_count;   // a temporary variable to detect BEGIN/END without SET
+                                            // don't use it for other purposes
+                                            // it goes up and then resets to zero, on every iteration
     } internal;
 } RRDINSTANCE;
 
@@ -297,12 +298,20 @@ typedef struct rrdcontext {
     DICTIONARY *rrdinstances;
     RRDHOST *rrdhost;
 
+    struct {
+        RRD_FLAGS queued_flags;         // the last flags that triggered the post-processing
+        usec_t queued_ut;               // the last time this was queued
+        usec_t dequeued_ut;             // the last time we sent (or deduplicated) this context
+        size_t executions;              // how many times this context has been processed
+    } pp;
+
     struct {
         RRD_FLAGS queued_flags;         // the last flags that triggered the queueing
         usec_t queued_ut;               // the last time this was queued
         usec_t delay_calc_ut;           // the last time we calculated the scheduled_dispatched_ut
         usec_t scheduled_dispatch_ut;   // the time it was/is scheduled to be sent
-        usec_t dequeued_ut;             // the last time we sent (or deduped) this context
+        usec_t dequeued_ut;             // the last time we sent (or deduplicated) this context
+        size_t dispatches;              // the number of times this has been dispatched to hub
     } queue;
 
     netdata_mutex_t mutex;
@@ -639,7 +648,7 @@ static inline void rrdmetric_collected_rrddim(RRDDIM *rd) {
         rrd_flag_set_collected(rm);
 
     // we use this variable to detect BEGIN/END without SET
-    rm->ri->internal.collected_metrics++;
+    rm->ri->internal.collected_metrics_count++;
 
     rrdmetric_trigger_updates(rm, __FUNCTION__ );
 }
@@ -1079,11 +1088,11 @@ static inline void rrdinstance_collected_rrdset(RRDSET *st) {
 
     rrdinstance_updated_rrdset_flags_no_action(ri, st);
 
-    if(unlikely(ri->internal.collected_metrics && !rrd_flag_is_collected(ri)))
+    if(unlikely(ri->internal.collected_metrics_count && !rrd_flag_is_collected(ri)))
         rrd_flag_set_collected(ri);
 
     // we use this variable to detect BEGIN/END without SET
-    ri->internal.collected_metrics = 0;
+    ri->internal.collected_metrics_count = 0;
 
     rrdinstance_trigger_updates(ri, __FUNCTION__ );
 }
@@ -1273,11 +1282,31 @@ static bool rrdcontext_hub_queue_conflict_callback(const DICTIONARY_ITEM *item _
 // Insert callback of the per-host post-processing queue dictionary:
 // marks the context as queued and records why and when it was queued.
 static void rrdcontext_post_processing_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
     RRDCONTEXT *rc = context;
     rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING);
+    rc->pp.queued_flags = rc->flags;            // snapshot taken after the flag is set above, so it presumably includes QUEUED_FOR_POST_PROCESSING
+    rc->pp.queued_ut = now_realtime_usec();     // timestamp of this queueing
 }
 
 // Delete callback of the per-host post-processing queue dictionary:
 // removes the "queued" mark from the context and records when it left the queue.
 static void rrdcontext_post_processing_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
     RRDCONTEXT *rc = context;
     rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING);
+    rc->pp.dequeued_ut = now_realtime_usec();   // timestamp of this dequeueing
+}
+
+static bool rrdcontext_post_processing_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) { // conflict callback of the post-processing queue; returns true when it modified the context's queue state
+    RRDCONTEXT *rc = context;                   // the value already stored in the queue; new_context is ignored
+    bool changed = false;
+
+    if(!(rc->flags & RRD_FLAG_QUEUED_FOR_POST_PROCESSING)) { // make sure the context is marked as queued
+        rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING);
+        changed = true;
+    }
+
+    if(rc->pp.queued_flags != rc->flags) {      // NOTE(review): after the OR below, queued_flags may keep bits that rc->flags later drops; this test then stays true and reports a change on every call - confirm this is intended
+        rc->pp.queued_flags |= rc->flags;       // accumulate (never remove) the flags seen while queued
+        changed = true;
+    }
+
+    return changed;
 }
 
 void rrdhost_create_rrdcontexts(RRDHOST *host) {
@@ -1291,15 +1320,14 @@ void rrdhost_create_rrdcontexts(RRDHOST *host) {
     dictionary_register_react_callback((DICTIONARY *)host->rrdctx, rrdcontext_react_callback, host);
 
     host->rrdctx_hub_queue = (RRDCONTEXTS *)dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE);
-
     dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_insert_callback, NULL);
     dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_delete_callback, NULL);
     dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_conflict_callback, NULL);
 
     host->rrdctx_post_processing_queue = (RRDCONTEXTS *)dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE);
-
-    dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_post_processing_queue_insert_callback, NULL);
-    dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_post_processing_queue_delete_callback, NULL);
+    dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_insert_callback, NULL);
+    dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_delete_callback, NULL);
+    dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_conflict_callback, NULL);
 }
 
 void rrdhost_destroy_rrdcontexts(RRDHOST *host) {
@@ -1396,7 +1424,6 @@ void rrdcontext_host_child_connected(RRDHOST *host) {
 }
 
 // Called for a host whose child has disconnected: recalculates the host's
 // retention, tagging the update with RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD.
 // NOTE(review): the meaning of the trailing `false` argument is not visible here - confirm against rrdcontext_recalculate_host_retention().
 void rrdcontext_host_child_disconnected(RRDHOST *host) {
-
     rrdcontext_recalculate_host_retention(host, RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD, false);
 }
 
@@ -1894,14 +1921,29 @@ static inline int rrdcontext_to_json_callback(const DICTIONARY_ITEM *item, void
                        ",\n\t\t\t\"last_queued\":%llu"
                        ",\n\t\t\t\"scheduled_dispatch\":%llu"
                        ",\n\t\t\t\"last_dequeued\":%llu"
+                       ",\n\t\t\t\"dispatches\":%zu"
                        ",\n\t\t\t\"hub_version\":%"PRIu64""
                        ",\n\t\t\t\"version\":%"PRIu64""
                        , rc->queue.queued_ut / USEC_PER_SEC
                        , rc->queue.scheduled_dispatch_ut / USEC_PER_SEC
                        , rc->queue.dequeued_ut / USEC_PER_SEC
+                       , rc->queue.dispatches
                        , rc->hub.version
                        , rc->version
                        );
+
+        buffer_strcat(wb, ",\n\t\t\t\"pp_reasons\":\"");
+        rrd_reasons_to_buffer(rc->pp.queued_flags, wb);
+        buffer_strcat(wb, "\"");
+
+        buffer_sprintf(wb,
+                       ",\n\t\t\t\"pp_last_queued\":%llu"
+                       ",\n\t\t\t\"pp_last_dequeued\":%llu"
+                       ",\n\t\t\t\"pp_executed\":%zu"
+                       , rc->pp.queued_ut / USEC_PER_SEC
+                       , rc->pp.dequeued_ut / USEC_PER_SEC
+                       , rc->pp.executions
+        );
     }
 
     rrdcontext_unlock(rc);
@@ -2609,6 +2651,7 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
     }
 
     rrdcontext_lock(rc);
+    rc->pp.executions++;
 
     if(unlikely(!instances_active)) {
         // we had some instances, but they are gone now...
@@ -2931,6 +2974,7 @@ static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now
                 rrdcontext_message_send_unsafe(rc, false, bundle);
                 messages_added++;
 
+                rc->queue.dispatches++;
                 rc->queue.dequeued_ut = now_ut;
             }
             else