Browse Source

fix rrdcontexts left in the post-processing queue from the garbage collector (#13645)

* fix rrdcontexts left in the post-processing queue from the garbage collector

* set the queuing flags atomically, using the dictionary callbacks
Costa Tsaousis 2 years ago
parent
commit
642e00348d
2 changed files with 67 additions and 49 deletions
  1. 61 48
      database/rrdcontext.c
  2. 6 1
      libnetdata/dictionary/dictionary.c

+ 61 - 48
database/rrdcontext.c

@@ -106,9 +106,7 @@ typedef enum {
 #define RRD_FLAGS_PREVENTING_DELETIONS                ( \
      RRD_FLAG_QUEUED_FOR_HUB                            \
     |RRD_FLAG_COLLECTED                                 \
-                                                        \
-    /* RRD_FLAG_QUEUED_FOR_POST_PROCESSING */           \
-    /* should not be here or nothing will be deleted */ \
+    |RRD_FLAG_QUEUED_FOR_POST_PROCESSING                \
 )
 
 // get all the flags of an object
@@ -325,18 +323,10 @@ static inline void rrdmetric_release(RRDMETRIC_ACQUIRED *rma) {
 // ----------------------------------------------------------------------------
 // helper one-liners for RRDINSTANCE
 
-static inline RRDINSTANCE_ACQUIRED *rrdinstance_dup(RRDINSTANCE_ACQUIRED *ria) {
-    return (RRDINSTANCE_ACQUIRED *)dictionary_acquired_item_dup((DICTIONARY_ITEM *)ria);
-}
-
 // Returns the RRDINSTANCE stored as the value of the acquired dictionary item 'ria'.
 static inline RRDINSTANCE *rrdinstance_acquired_value(RRDINSTANCE_ACQUIRED *ria) {
     return dictionary_acquired_item_value((DICTIONARY_ITEM *)ria);
 }
 
-static inline const char *rrdinstance_acquired_name(RRDINSTANCE_ACQUIRED *ria) {
-    return dictionary_acquired_item_name((DICTIONARY_ITEM *)ria);
-}
-
 static inline void rrdinstance_release(RRDINSTANCE_ACQUIRED *ria) {
     RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
     dictionary_acquired_item_release(ri->rc->rrdinstances, (DICTIONARY_ITEM *)ria);
@@ -345,22 +335,10 @@ static inline void rrdinstance_release(RRDINSTANCE_ACQUIRED *ria) {
 // ----------------------------------------------------------------------------
 // helper one-liners for RRDCONTEXT
 
-static inline RRDCONTEXT_ACQUIRED *rrdcontext_dup(RRDCONTEXT_ACQUIRED *rca) {
-    return (RRDCONTEXT_ACQUIRED *)dictionary_acquired_item_dup((DICTIONARY_ITEM *)rca);
-}
-
-static inline const char *rrdcontext_acquired_name(RRDCONTEXT_ACQUIRED *rca) {
-    return dictionary_acquired_item_name((DICTIONARY_ITEM *)rca);
-}
-
 // Returns the RRDCONTEXT stored as the value of the acquired dictionary item 'rca'.
 static inline RRDCONTEXT *rrdcontext_acquired_value(RRDCONTEXT_ACQUIRED *rca) {
     return dictionary_acquired_item_value((DICTIONARY_ITEM *)rca);
 }
 
-static inline RRDCONTEXT_ACQUIRED *rrdcontext_acquire(RRDHOST *host, const char *name) {
-    return (RRDCONTEXT_ACQUIRED *)dictionary_get_and_acquire_item((DICTIONARY *)host->rrdctx, name);
-}
-
 static inline void rrdcontext_release(RRDCONTEXT_ACQUIRED *rca) {
     RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
     dictionary_acquired_item_release((DICTIONARY *)rc->rrdhost->rrdctx, (DICTIONARY_ITEM *)rca);
@@ -374,7 +352,6 @@ static uint64_t rrdcontext_version_hash_with_callback(RRDHOST *host, void (*call
 
 static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jobs);
 static void rrdcontext_garbage_collect_for_all_hosts(void);
-void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc);
 
 #define rrdcontext_lock(rc) netdata_mutex_lock(&((rc)->mutex))
 #define rrdcontext_unlock(rc) netdata_mutex_unlock(&((rc)->mutex))
@@ -386,6 +363,9 @@ static uint64_t rrdcontext_get_next_version(RRDCONTEXT *rc);
 static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused);
 static void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe_unused, void *bundle __maybe_unused);
 
+static void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc);
+
+static void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc);
 static void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function, RRD_FLAGS flags);
 static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs);
 
@@ -1279,6 +1259,37 @@ static void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function) {
         rrdcontext_queue_for_post_processing(rc, function, rc->flags);
 }
 
+// Insert callback for the hub queue dictionary.
+// 'context' is the RRDCONTEXT being queued: mark it as queued for the hub,
+// then record the time of queuing and the flags it has at that moment.
+static void rrdcontext_hub_queue_insert_callback(const char *name __maybe_unused, void *context, void *data __maybe_unused) {
+    RRDCONTEXT *queued_rc = (RRDCONTEXT *)context;
+
+    // set the flag before taking the snapshot, so the snapshot includes it
+    rrd_flag_set(queued_rc, RRD_FLAG_QUEUED_FOR_HUB);
+
+    queued_rc->queue.queued_ut = now_realtime_usec();
+    queued_rc->queue.queued_flags = rrd_flags_get(queued_rc);
+}
+
+// Delete callback for the hub queue dictionary.
+// 'context' is the RRDCONTEXT leaving the queue: clear its queued-for-hub flag.
+static void rrdcontext_hub_queue_delete_callback(const char *name __maybe_unused, void *context, void *data __maybe_unused) {
+    RRDCONTEXT *dequeued_rc = (RRDCONTEXT *)context;
+    rrd_flag_clear(dequeued_rc, RRD_FLAG_QUEUED_FOR_HUB);
+}
+
+// Conflict callback for the hub queue dictionary.
+// The existing value and the new value are the same RRDCONTEXT, so nothing
+// is replaced: the queuing time is refreshed and the current flags are
+// merged (OR-ed) into the flags recorded when it was first queued.
+static void rrdcontext_hub_queue_conflict_callback(const char *name __maybe_unused, void *context, void *new_context __maybe_unused, void *data __maybe_unused) {
+    RRDCONTEXT *requeued_rc = (RRDCONTEXT *)context;
+
+    rrd_flag_set(requeued_rc, RRD_FLAG_QUEUED_FOR_HUB);
+
+    requeued_rc->queue.queued_ut = now_realtime_usec();
+    requeued_rc->queue.queued_flags |= rrd_flags_get(requeued_rc);
+}
+
+// Dictionary insert callback meant for the post-processing queue.
+// 'context' is the RRDCONTEXT being queued: mark it as queued for post-processing.
+static void rrdcontext_post_processing_queue_insert_callback(const char *name __maybe_unused, void *context, void *data __maybe_unused) {
+    RRDCONTEXT *queued_rc = (RRDCONTEXT *)context;
+    rrd_flag_set(queued_rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING);
+}
+
+// Dictionary delete callback meant for the post-processing queue.
+// 'context' is the RRDCONTEXT leaving the queue: clear its queued-for-post-processing flag.
+static void rrdcontext_post_processing_queue_delete_callback(const char *name __maybe_unused, void *context, void *data __maybe_unused) {
+    RRDCONTEXT *dequeued_rc = (RRDCONTEXT *)context;
+    rrd_flag_clear(dequeued_rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING);
+}
+
 void rrdhost_create_rrdcontexts(RRDHOST *host) {
     if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
         return;
@@ -1292,8 +1303,20 @@ void rrdhost_create_rrdcontexts(RRDHOST *host) {
     dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx, rrdcontext_conflict_callback, (void *)host);
     dictionary_register_react_callback((DICTIONARY *)host->rrdctx, rrdcontext_react_callback, (void *)host);
 
-    host->rrdctx_hub_queue = (RRDCONTEXTS *)dictionary_create(DICTIONARY_FLAG_DONT_OVERWRITE_VALUE | DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE);
-    host->rrdctx_post_processing_queue = (RRDCONTEXTS *)dictionary_create(DICTIONARY_FLAG_DONT_OVERWRITE_VALUE | DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE);
+    host->rrdctx_hub_queue = (RRDCONTEXTS *)dictionary_create(
+         DICTIONARY_FLAG_DONT_OVERWRITE_VALUE
+        |DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE);
+
+    dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_insert_callback, NULL);
+    dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_delete_callback, NULL);
+    dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_conflict_callback, NULL);
+
+    // the post-processing queue: contexts are linked (not cloned) into it
+    host->rrdctx_post_processing_queue = (RRDCONTEXTS *)dictionary_create(
+         DICTIONARY_FLAG_DONT_OVERWRITE_VALUE
+        |DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE);
+
+    // These callbacks maintain RRD_FLAG_QUEUED_FOR_POST_PROCESSING, so they
+    // must be registered on the post-processing queue itself. Registering them
+    // on the hub queue would interfere with the hub queue callbacks set above
+    // and leave this queue without any flag maintenance.
+    dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_insert_callback, NULL);
+    dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_delete_callback, NULL);
 }
 
 void rrdhost_destroy_rrdcontexts(RRDHOST *host) {
@@ -1309,7 +1332,6 @@ void rrdhost_destroy_rrdcontexts(RRDHOST *host) {
         RRDCONTEXT *rc;
         dfe_start_write(old, rc) {
             dictionary_del_having_write_lock(old, string2str(rc->id));
-            rrdset_flag_clear(rc, RRD_FLAG_QUEUED_FOR_HUB);
         }
         dfe_done(rc);
         dictionary_destroy(old);
@@ -1322,7 +1344,6 @@ void rrdhost_destroy_rrdcontexts(RRDHOST *host) {
         RRDCONTEXT *rc;
         dfe_start_write(old, rc) {
             dictionary_del_having_write_lock(old, string2str(rc->id));
-            rrdset_flag_clear(rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING);
         }
         dfe_done(rc);
         dictionary_destroy(old);
@@ -2381,6 +2402,7 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo
 
         if(unlikely(rrdcontext_should_be_deleted(rc))) {
             if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
+            rrdcontext_dequeue_from_post_processing(rc);
             rrdcontext_delete_from_sql_unsafe(rc);
 
             if(dictionary_del((DICTIONARY *)host->rrdctx, string2str(rc->id)) != 0)
@@ -2669,17 +2691,8 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
     if(unlikely(rrd_flag_is_updated(rc) && rc->rrdhost->rrdctx_hub_queue)) {
         if(check_if_cloud_version_changed_unsafe(rc, false)) {
             rc->version = rrdcontext_get_next_version(rc);
-
-            if(rrd_flag_check(rc, RRD_FLAG_QUEUED_FOR_HUB)) {
-                rc->queue.queued_ut = now_realtime_usec();
-                rc->queue.queued_flags |= rrd_flags_get(rc);
-            }
-            else {
-                rc->queue.queued_ut = now_realtime_usec();
-                rc->queue.queued_flags = rrd_flags_get(rc);
-                rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB);
-                dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx_hub_queue, string2str(rc->id), rc, sizeof(*rc));
-            }
+            dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx_hub_queue,
+                           string2str(rc->id), rc, sizeof(*rc));
         }
     }
 
@@ -2691,8 +2704,10 @@ static void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *fun
     if(unlikely(!rc->rrdhost->rrdctx_post_processing_queue)) return;
 
     if(!rrd_flag_check(rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING)) {
-        rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING);
-        dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx_post_processing_queue, string2str(rc->id), rc, sizeof(*rc));
+        dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx_post_processing_queue,
+                       string2str(rc->id),
+                       rc,
+                       sizeof(*rc));
 
 #ifdef NETDATA_INTERNAL_CHECKS
         {
@@ -2711,14 +2726,11 @@ static void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *fun
             buffer_free(wb_flags);
         }
 #endif
-
     }
 }
 
 // Deletes the entry keyed by rc->id from the post-processing queue of the
 // host that owns 'rc'. Returns without doing anything when the host has no
 // post-processing queue.
 static void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc) {
     if(unlikely(!rc->rrdhost->rrdctx_post_processing_queue)) return;
-
-    rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING);
     dictionary_del((DICTIONARY *)rc->rrdhost->rrdctx_post_processing_queue, string2str(rc->id));
 }
 
@@ -2884,6 +2896,10 @@ static inline usec_t rrdcontext_calculate_queued_dispatch_time_ut(RRDCONTEXT *rc
     return dispatch_ut;
 }
 
+// Deletes the entry keyed by rc->id from the hub queue of the host that owns
+// 'rc'. The flag is not cleared here: RRD_FLAG_QUEUED_FOR_HUB is expected to
+// be cleared by the delete callback registered on the hub queue dictionary.
+static void rrdcontext_dequeue_from_hub_queue(RRDCONTEXT *rc) {
+    // same guard as rrdcontext_dequeue_from_post_processing():
+    // nothing to dequeue from when the host has no hub queue
+    if(unlikely(!rc->rrdhost->rrdctx_hub_queue)) return;
+
+    dictionary_del((DICTIONARY *)rc->rrdhost->rrdctx_hub_queue, string2str(rc->id));
+}
+
 static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now_ut) {
 
     // check if we have received a streaming command for this host
@@ -2938,12 +2954,9 @@ static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now
             else
                 rc->version = rc->hub.version;
 
-            // remove the queued flag, so that it can be queued again
-            rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_HUB);
-
             // remove it from the queue
             worker_is_busy(WORKER_JOB_DEQUEUE);
-            dictionary_del((DICTIONARY *)host->rrdctx_hub_queue, string2str(rc->id));
+            rrdcontext_dequeue_from_hub_queue(rc);
 
             if(unlikely(rrdcontext_should_be_deleted(rc))) {
                 // this is a deleted context - delete it forever...

+ 6 - 1
libnetdata/dictionary/dictionary.c

@@ -914,6 +914,10 @@ static NAME_VALUE *dictionary_set_name_value_unsafe(DICTIONARY *dict, const char
         // so, either we will return the old one
         // or overwrite the value, depending on dictionary flags
 
+        // We should not compare the values here!
+        // even if they are the same, we have to do the whole job
+        // so that the callbacks will be called.
+
         nv = *pnv;
 
         if(!(dict->flags & DICTIONARY_FLAG_DONT_OVERWRITE_VALUE)) {
@@ -927,7 +931,8 @@ static NAME_VALUE *dictionary_set_name_value_unsafe(DICTIONARY *dict, const char
         }
 
         else {
-            // make sure this flag is not set
+            // we did really nothing!
+            // make sure this flag is not set.
             nv->flags &= ~NAME_VALUE_FLAG_NEW_OR_UPDATED;
         }
     }