Browse Source

/api/v2/data - multi-host/context/instance/dimension/label queries (#14564)

* fundamentals for having /api/v2/ working

* use an atomic to prevent writing to internal pipe too much

* first attempt of multi-node, multi-context, multi-chart, multi-dimension queries

* v2 jsonwrap

* first attempt for group by

* cleaned up RRDR and fixed group by

* improvements to /api/v2/api

* query instances may be reallocated, so pointers to them become invalid; fixed memory leaks

* count of queried metrics in summary information

* provide detailed information about selected, excluded, queried and failed metrics for each entity

* select instances by fqdn too

* add timing information to json output

* link charts to rrdcontexts, if a query comes in and it is found unlinked

* calculate min, max, sum, average, volume, count per metric

* api v2 parameters naming

* renders alerts and units

* render machine_guid and node_id in all sections it is relevant

* unified keys

* group by now takes into account units and when there are multiple units involved, it creates a dimension per unit

* request and detailed are hidden behind an option

* summary includes only a flattened list of alerts

* alert counts per host and instance

* count of grouped metrics per dimension

* added contexts to summary

* added chart title

* added dimension priorities and chart type

* support for multiple group by at the same time

* minor fixes

* labels are now a tree

* keys uniformity

* filtering by alerts, both having a specific alert and having a specific alert in a specific status

* added scope of hosts and contexts

* count of instances on contexts and hosts

* make the api return valid responses even when the response contains no data

* calculate average and contribution % for every item in the summary

* fix compilation warnings

* fix compilation warnings - again
Costa Tsaousis 2 years ago
parent
commit
1cfad181a8
10 changed files with 820 additions and 262 deletions
  1. 4 0
      CMakeLists.txt
  2. 4 0
      Makefile.am
  3. 1 1
      daemon/common.h
  4. 19 17
      database/rrd.h
  5. 494 172
      database/rrdcontext.c
  6. 174 19
      database/rrdcontext.h
  7. 40 0
      database/rrdlabels.c
  8. 67 49
      streaming/replication.c
  9. 5 0
      streaming/rrdpush.h
  10. 12 4
      streaming/sender.c

+ 4 - 0
CMakeLists.txt

@@ -796,8 +796,12 @@ set(WEB_PLUGIN_FILES
         )
 
 set(API_PLUGIN_FILES
+        web/api/web_api.c
+        web/api/web_api.h
         web/api/web_api_v1.c
         web/api/web_api_v1.h
+        web/api/web_api_v2.c
+        web/api/web_api_v2.h
         web/api/badges/web_buffer_svg.c
         web/api/badges/web_buffer_svg.h
         web/api/exporters/allmetrics.c

+ 4 - 0
Makefile.am

@@ -628,8 +628,12 @@ API_PLUGIN_FILES = \
     web/api/formatters/rrdset2json.h \
     web/api/health/health_cmdapi.c \
     web/api/health/health_cmdapi.h \
+    web/api/web_api.c \
+    web/api/web_api.h \
     web/api/web_api_v1.c \
     web/api/web_api_v1.h \
+    web/api/web_api_v2.c \
+    web/api/web_api_v2.h \
     $(NULL)
 
 STREAMING_PLUGIN_FILES = \

+ 1 - 1
daemon/common.h

@@ -58,7 +58,7 @@
 #include "exporting/exporting_engine.h"
 
 // the netdata API
-#include "web/api/web_api_v1.h"
+#include "web/server/web_client.h"
 
 // all data collection plugins
 #include "collectors/all.h"

+ 19 - 17
database/rrd.h

@@ -132,6 +132,22 @@ typedef struct storage_point {
     SN_FLAGS flags;         // flags stored with the point
 } STORAGE_POINT;
 
+// ----------------------------------------------------------------------------
+// chart types
+
+typedef enum __attribute__ ((__packed__)) rrdset_type {
+    RRDSET_TYPE_LINE    = 0,
+    RRDSET_TYPE_AREA    = 1,
+    RRDSET_TYPE_STACKED = 2,
+} RRDSET_TYPE;
+
+#define RRDSET_TYPE_LINE_NAME "line"
+#define RRDSET_TYPE_AREA_NAME "area"
+#define RRDSET_TYPE_STACKED_NAME "stacked"
+
+RRDSET_TYPE rrdset_type_id(const char *name);
+const char *rrdset_type_name(RRDSET_TYPE chart_type);
+
 #include "rrdcontext.h"
 
 extern bool unittest_running;
@@ -177,23 +193,6 @@ extern bool ieee754_doubles;
 typedef long long total_number;
 #define TOTAL_NUMBER_FORMAT "%lld"
 
-// ----------------------------------------------------------------------------
-// chart types
-
-typedef enum __attribute__ ((__packed__)) rrdset_type {
-    RRDSET_TYPE_LINE    = 0,
-    RRDSET_TYPE_AREA    = 1,
-    RRDSET_TYPE_STACKED = 2,
-} RRDSET_TYPE;
-
-#define RRDSET_TYPE_LINE_NAME "line"
-#define RRDSET_TYPE_AREA_NAME "area"
-#define RRDSET_TYPE_STACKED_NAME "stacked"
-
-RRDSET_TYPE rrdset_type_id(const char *name);
-const char *rrdset_type_name(RRDSET_TYPE chart_type);
-
-
 // ----------------------------------------------------------------------------
 // algorithms types
 
@@ -285,6 +284,9 @@ void rrdlabels_add_pair(DICTIONARY *dict, const char *string, RRDLABEL_SRC ls);
 void rrdlabels_get_value_to_buffer_or_null(DICTIONARY *labels, BUFFER *wb, const char *key, const char *quote, const char *null);
 void rrdlabels_value_to_buffer_array_item_or_null(DICTIONARY *labels, BUFFER *wb, const char *key);
 void rrdlabels_get_value_strdup_or_null(DICTIONARY *labels, char **value, const char *key);
+void rrdlabels_get_value_strcpyz(DICTIONARY *labels, char *dst, size_t dst_len, const char *key);
+STRING *rrdlabels_get_value_string_dup(DICTIONARY *labels, const char *key);
+STRING *rrdlabels_get_value_to_buffer_or_unset(DICTIONARY *labels, BUFFER *wb, const char *key, const char *unset);
 void rrdlabels_flush(DICTIONARY *labels_dict);
 
 void rrdlabels_unmark_all(DICTIONARY *labels);

+ 494 - 172
database/rrdcontext.c

@@ -325,6 +325,16 @@ const char *rrdmetric_acquired_name(RRDMETRIC_ACQUIRED *rma) {
     return string2str(rm->name);
 }
 
+STRING *rrdmetric_acquired_id_dup(RRDMETRIC_ACQUIRED *rma) {
+    RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+    return string_dup(rm->id);
+}
+
+STRING *rrdmetric_acquired_name_dup(RRDMETRIC_ACQUIRED *rma) {
+    RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+    return string_dup(rm->name);
+}
+
 NETDATA_DOUBLE rrdmetric_acquired_last_stored_value(RRDMETRIC_ACQUIRED *rma) {
     RRDMETRIC *rm = rrdmetric_acquired_value(rma);
 
@@ -361,6 +371,16 @@ const char *rrdinstance_acquired_name(RRDINSTANCE_ACQUIRED *ria) {
     return string2str(ri->name);
 }
 
+const char *rrdinstance_acquired_units(RRDINSTANCE_ACQUIRED *ria) {
+    RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+    return string2str(ri->units);
+}
+
+STRING *rrdinstance_acquired_units_dup(RRDINSTANCE_ACQUIRED *ria) {
+    RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+    return string_dup(ri->units);
+}
+
 DICTIONARY *rrdinstance_acquired_labels(RRDINSTANCE_ACQUIRED *ria) {
     RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
     return ri->rrdlabels;
@@ -372,6 +392,11 @@ DICTIONARY *rrdinstance_acquired_functions(RRDINSTANCE_ACQUIRED *ria) {
     return ri->rrdset->functions_view;
 }
 
+RRDHOST *rrdinstance_acquired_rrdhost(RRDINSTANCE_ACQUIRED *ria) {
+    RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+    return ri->rc->rrdhost;
+}
+
 // ----------------------------------------------------------------------------
 // helper one-liners for RRDCONTEXT
 
@@ -384,6 +409,40 @@ const char *rrdcontext_acquired_id(RRDCONTEXT_ACQUIRED *rca) {
     return string2str(rc->id);
 }
 
+bool rrdcontext_acquired_belongs_to_host(RRDCONTEXT_ACQUIRED *rca, RRDHOST *host) {
+    RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
+    return rc->rrdhost == host;
+}
+
+bool rrdinstance_acquired_belongs_to_context(RRDINSTANCE_ACQUIRED *ria, RRDCONTEXT_ACQUIRED *rca) {
+    RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+    RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
+    return ri->rc == rc;
+}
+
+bool rrdmetric_acquired_belongs_to_instance(RRDMETRIC_ACQUIRED *rma, RRDINSTANCE_ACQUIRED *ria) {
+    RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+    RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+    return rm->ri == ri;
+}
+
+time_t rrdinstance_acquired_update_every(RRDINSTANCE_ACQUIRED *ria) {
+    RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+    return ri->update_every_s;
+}
+time_t rrdmetric_acquired_first_entry(RRDMETRIC_ACQUIRED *rma) {
+    RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+    return rm->first_time_s;
+}
+time_t rrdmetric_acquired_last_entry(RRDMETRIC_ACQUIRED *rma) {
+    RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+
+    if(rrd_flag_check(rm, RRD_FLAG_COLLECTED))
+        return 0;
+
+    return rm->last_time_s;
+}
+
 static inline RRDCONTEXT_ACQUIRED *rrdcontext_acquired_dup(RRDCONTEXT_ACQUIRED *rca) {
     RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
     return (RRDCONTEXT_ACQUIRED *)dictionary_acquired_item_dup((DICTIONARY *)rc->rrdhost->rrdctx, (DICTIONARY_ITEM *)rca);
@@ -2203,33 +2262,38 @@ typedef struct query_target_locals {
 
     RRDSET *st;
 
+    const char *scope_hosts;
+    const char *scope_contexts;
+
     const char *hosts;
     const char *contexts;
     const char *charts;
     const char *dimensions;
     const char *chart_label_key;
-    const char *charts_labels_filter;
+    const char *labels;
+    const char *alerts;
 
     long long after;
     long long before;
     bool match_ids;
     bool match_names;
 
-    RRDHOST *host;
-    RRDCONTEXT_ACQUIRED *rca;
-    RRDINSTANCE_ACQUIRED *ria;
-
     size_t metrics_skipped_due_to_not_matching_timeframe;
 } QUERY_TARGET_LOCALS;
 
 static __thread QUERY_TARGET thread_query_target = {};
 void query_target_release(QUERY_TARGET *qt) {
-    if(unlikely(!qt)) return;
-    if(unlikely(!qt->used)) return;
+    if(unlikely(!qt || !qt->used)) return;
+
+    simple_pattern_free(qt->hosts.scope_pattern);
+    qt->hosts.scope_pattern = NULL;
 
     simple_pattern_free(qt->hosts.pattern);
     qt->hosts.pattern = NULL;
 
+    simple_pattern_free(qt->contexts.scope_pattern);
+    qt->contexts.scope_pattern = NULL;
+
     simple_pattern_free(qt->contexts.pattern);
     qt->contexts.pattern = NULL;
 
@@ -2239,82 +2303,83 @@ void query_target_release(QUERY_TARGET *qt) {
     simple_pattern_free(qt->instances.chart_label_key_pattern);
     qt->instances.chart_label_key_pattern = NULL;
 
-    simple_pattern_free(qt->instances.charts_labels_filter_pattern);
-    qt->instances.charts_labels_filter_pattern = NULL;
+    simple_pattern_free(qt->instances.labels_pattern);
+    qt->instances.labels_pattern = NULL;
 
     simple_pattern_free(qt->query.pattern);
     qt->query.pattern = NULL;
 
     // release the query
     for(size_t i = 0, used = qt->query.used; i < used ;i++) {
-        string_freez(qt->query.array[i].dimension.id);
-        qt->query.array[i].dimension.id = NULL;
-
-        string_freez(qt->query.array[i].dimension.name);
-        qt->query.array[i].dimension.name = NULL;
-
-        string_freez(qt->query.array[i].chart.id);
-        qt->query.array[i].chart.id = NULL;
-
-        string_freez(qt->query.array[i].chart.name);
-        qt->query.array[i].chart.name = NULL;
+        QUERY_METRIC *qm = query_metric(qt, i);
 
         // reset the plans
-        for(size_t p = 0; p < qt->query.array[i].plan.used; p++) {
-            internal_fatal(qt->query.array[i].plan.array[p].initialized &&
-                            !qt->query.array[i].plan.array[p].finalized,
+        for(size_t p = 0; p < qm->plan.used; p++) {
+            internal_fatal(qm->plan.array[p].initialized &&
+                            !qm->plan.array[p].finalized,
                            "QUERY: left-over initialized plan");
 
-            qt->query.array[i].plan.array[p].initialized = false;
-            qt->query.array[i].plan.array[p].finalized = false;
+            qm->plan.array[p].initialized = false;
+            qm->plan.array[p].finalized = false;
         }
-        qt->query.array[i].plan.used = 0;
+        qm->plan.used = 0;
 
         // reset the tiers
         for(size_t tier = 0; tier < storage_tiers ;tier++) {
-            if(qt->query.array[i].tiers[tier].db_metric_handle) {
-                STORAGE_ENGINE *eng = qt->query.array[i].tiers[tier].eng;
-                eng->api.metric_release(qt->query.array[i].tiers[tier].db_metric_handle);
-                qt->query.array[i].tiers[tier].db_metric_handle = NULL;
-                qt->query.array[i].tiers[tier].weight = 0;
-                qt->query.array[i].tiers[tier].eng = NULL;
+            if(qm->tiers[tier].db_metric_handle) {
+                STORAGE_ENGINE *eng = qm->tiers[tier].eng;
+                eng->api.metric_release(qm->tiers[tier].db_metric_handle);
+                qm->tiers[tier].db_metric_handle = NULL;
+                qm->tiers[tier].eng = NULL;
             }
         }
     }
+    qt->query.used = 0;
 
-    // release the metrics
-    for(size_t i = 0, used = qt->metrics.used; i < used ;i++) {
-        rrdmetric_release(qt->metrics.array[i]);
-        qt->metrics.array[i] = NULL;
+    // release the dimensions
+    for(size_t i = 0, used = qt->dimensions.used; i < used ; i++) {
+        QUERY_DIMENSION *qd = query_dimension(qt, i);
+        rrdmetric_release(qd->rma);
+        qd->rma = NULL;
     }
+    qt->dimensions.used = 0;
 
     // release the instances
     for(size_t i = 0, used = qt->instances.used; i < used ;i++) {
-        rrdinstance_release(qt->instances.array[i]);
-        qt->instances.array[i] = NULL;
+        QUERY_INSTANCE *qi = &qt->instances.array[i];
+
+        rrdinstance_release(qi->ria);
+        qi->ria = NULL;
+
+        string_freez(qi->id_fqdn);
+        qi->id_fqdn = NULL;
+
+        string_freez(qi->name_fqdn);
+        qi->name_fqdn = NULL;
     }
+    qt->instances.used = 0;
 
     // release the contexts
     for(size_t i = 0, used = qt->contexts.used; i < used ;i++) {
-        rrdcontext_release(qt->contexts.array[i]);
-        qt->contexts.array[i] = NULL;
+        QUERY_CONTEXT *qc = query_context(qt, i);
+        rrdcontext_release(qc->rca);
+        qc->rca = NULL;
     }
+    qt->contexts.used = 0;
 
     // release the hosts
     for(size_t i = 0, used = qt->hosts.used; i < used ;i++) {
-        qt->hosts.array[i] = NULL;
+        QUERY_HOST *qh = query_host(qt, i);
+        qh->host = NULL;
     }
-
-    qt->query.used = 0;
-    qt->metrics.used = 0;
-    qt->instances.used = 0;
-    qt->contexts.used = 0;
     qt->hosts.used = 0;
 
     qt->db.minimum_latest_update_every_s = 0;
     qt->db.first_time_s = 0;
     qt->db.last_time_s = 0;
 
+    qt->group_by.used = 0;
+
     qt->id[0] = '\0';
 
     qt->used = false;
@@ -2330,10 +2395,10 @@ void query_target_free(void) {
     qt->query.array = NULL;
     qt->query.size = 0;
 
-    __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->metrics.size * sizeof(RRDMETRIC_ACQUIRED *), __ATOMIC_RELAXED);
-    freez(qt->metrics.array);
-    qt->metrics.array = NULL;
-    qt->metrics.size = 0;
+    __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->dimensions.size * sizeof(RRDMETRIC_ACQUIRED *), __ATOMIC_RELAXED);
+    freez(qt->dimensions.array);
+    qt->dimensions.array = NULL;
+    qt->dimensions.size = 0;
 
     __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *), __ATOMIC_RELAXED);
     freez(qt->instances.array);
@@ -2351,31 +2416,17 @@ void query_target_free(void) {
     qt->hosts.size = 0;
 }
 
-static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED *rma, RRDINSTANCE *ri,
-                                    bool queryable_instance) {
+static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, QUERY_HOST *qh, QUERY_CONTEXT *qc,
+                                    QUERY_INSTANCE *qi, QUERY_DIMENSION *qd) {
     QUERY_TARGET *qt = qtl->qt;
-
-    RRDMETRIC *rm = rrdmetric_acquired_value(rma);
-    if(rrd_flag_is_deleted(rm))
-        return;
-
-    if(qt->metrics.used == qt->metrics.size) {
-        size_t old_mem = qt->metrics.size * sizeof(RRDMETRIC_ACQUIRED *);
-        qt->metrics.size = (qt->metrics.size) ? qt->metrics.size * 2 : 1;
-        size_t new_mem = qt->metrics.size * sizeof(RRDMETRIC_ACQUIRED *);
-        qt->metrics.array = reallocz(qt->metrics.array, new_mem);
-
-        __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED);
-    }
-    qt->metrics.array[qt->metrics.used++] = rrdmetric_acquired_dup(rma);
-
-    if(!queryable_instance)
-        return;
+    RRDMETRIC *rm = rrdmetric_acquired_value(qd->rma);
+    RRDINSTANCE *ri = rm->ri;
 
     time_t common_first_time_s = 0;
     time_t common_last_time_s = 0;
     time_t common_update_every_s = 0;
     size_t tiers_added = 0;
+
     struct {
         STORAGE_ENGINE *eng;
         STORAGE_METRIC_HANDLE *db_metric_handle;
@@ -2385,14 +2436,14 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
     } tier_retention[storage_tiers];
 
     for (size_t tier = 0; tier < storage_tiers; tier++) {
-        STORAGE_ENGINE *eng = qtl->host->db[tier].eng;
+        STORAGE_ENGINE *eng = qh->host->db[tier].eng;
         tier_retention[tier].eng = eng;
-        tier_retention[tier].db_update_every_s = (time_t) (qtl->host->db[tier].tier_grouping * ri->update_every_s);
+        tier_retention[tier].db_update_every_s = (time_t) (qh->host->db[tier].tier_grouping * ri->update_every_s);
 
         if(rm->rrddim && rm->rrddim->tiers[tier].db_metric_handle)
             tier_retention[tier].db_metric_handle = eng->api.metric_dup(rm->rrddim->tiers[tier].db_metric_handle);
         else
-            tier_retention[tier].db_metric_handle = eng->api.metric_get(qtl->host->db[tier].instance, &rm->uuid);
+            tier_retention[tier].db_metric_handle = eng->api.metric_get(qh->host->db[tier].instance, &rm->uuid);
 
         if(tier_retention[tier].db_metric_handle) {
             tier_retention[tier].db_first_time_s = tier_retention[tier].eng->api.query_ops.oldest_time_s(tier_retention[tier].db_metric_handle);
@@ -2425,8 +2476,8 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
     bool release_retention = true;
     bool timeframe_matches =
             (tiers_added
-            && (common_first_time_s - common_update_every_s * 2) <= qt->window.before
-            && (common_last_time_s + common_update_every_s * 2) >= qt->window.after
+             && (common_first_time_s - common_update_every_s * 2) <= qt->window.before
+             && (common_last_time_s + common_update_every_s * 2) >= qt->window.after
             ) ? true : false;
 
     if(timeframe_matches) {
@@ -2435,7 +2486,7 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
         if (rrd_flag_check(rm, RRD_FLAG_HIDDEN)
             || (rm->rrddim && rrddim_option_check(rm->rrddim, RRDDIM_OPTION_HIDDEN))) {
             options |= RRDR_DIMENSION_HIDDEN;
-            options &= ~RRDR_DIMENSION_QUERIED;
+            options &= ~RRDR_DIMENSION_SELECTED;
         }
 
         if (qt->query.pattern) {
@@ -2443,16 +2494,16 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
             // lets see if this dimension is selected
 
             if ((qtl->match_ids   && simple_pattern_matches(qt->query.pattern, string2str(rm->id)))
-             || (qtl->match_names && simple_pattern_matches(qt->query.pattern, string2str(rm->name)))
+                || (qtl->match_names && simple_pattern_matches(qt->query.pattern, string2str(rm->name)))
                     ) {
                 // it matches the pattern
-                options |= (RRDR_DIMENSION_QUERIED | RRDR_DIMENSION_NONZERO);
+                options |= (RRDR_DIMENSION_SELECTED | RRDR_DIMENSION_NONZERO);
                 options &= ~RRDR_DIMENSION_HIDDEN;
             }
             else {
                 // it does not match the pattern
                 options |= RRDR_DIMENSION_HIDDEN;
-                options &= ~RRDR_DIMENSION_QUERIED;
+                options &= ~RRDR_DIMENSION_SELECTED;
             }
         }
         else {
@@ -2460,10 +2511,10 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
             // so this is a selected dimension
             // if it is not hidden
             if(!(options & RRDR_DIMENSION_HIDDEN))
-                options |= RRDR_DIMENSION_QUERIED;
+                options |= RRDR_DIMENSION_SELECTED;
         }
 
-        if((options & RRDR_DIMENSION_HIDDEN) && (options & RRDR_DIMENSION_QUERIED))
+        if((options & RRDR_DIMENSION_HIDDEN) && (options & RRDR_DIMENSION_SELECTED))
             options &= ~RRDR_DIMENSION_HIDDEN;
 
         if(!(options & RRDR_DIMENSION_HIDDEN) || (qt->request.options & RRDR_OPTION_PERCENTAGE)) {
@@ -2474,28 +2525,22 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
                 ri->rrdset->last_accessed_time_s = qtl->start_s;
 
             if (qt->query.used == qt->query.size) {
-                size_t old_mem = qt->query.size * sizeof(QUERY_METRIC);
+                size_t old_mem = qt->query.size * sizeof(*qt->query.array);
                 qt->query.size = (qt->query.size) ? qt->query.size * 2 : 1;
-                size_t new_mem = qt->query.size * sizeof(QUERY_METRIC);
+                size_t new_mem = qt->query.size * sizeof(*qt->query.array);
                 qt->query.array = reallocz(qt->query.array, new_mem);
 
                 __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED);
             }
             QUERY_METRIC *qm = &qt->query.array[qt->query.used++];
+            memset(qm, 0, sizeof(*qm));
 
-            qm->plan.used = 0;
-            qm->dimension.options = options;
+            qm->status = options;
 
-            qm->link.host = qtl->host;
-            qm->link.rca = qtl->rca;
-            qm->link.ria = qtl->ria;
-            qm->link.rma = rma;
-
-            qm->chart.id = string_dup(ri->id);
-            qm->chart.name = string_dup(ri->name);
-
-            qm->dimension.id = string_dup(rm->id);
-            qm->dimension.name = string_dup(rm->name);
+            qm->link.query_host_id = qh->slot;
+            qm->link.query_context_id = qc->slot;
+            qm->link.query_instance_id = qi->slot;
+            qm->link.query_dimension_id = qd->slot;
 
             if (!qt->db.first_time_s || common_first_time_s < qt->db.first_time_s)
                 qt->db.first_time_s = common_first_time_s;
@@ -2510,11 +2555,29 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
                 qm->tiers[tier].db_last_time_s = tier_retention[tier].db_last_time_s;
                 qm->tiers[tier].db_update_every_s = tier_retention[tier].db_update_every_s;
             }
+
             release_retention = false;
+
+            qi->metrics.selected++;
+            qc->metrics.selected++;
+            qh->metrics.selected++;
+        }
+        else {
+            qi->metrics.excluded++;
+            qc->metrics.excluded++;
+            qh->metrics.excluded++;
+
+            qd->status |= QUERY_STATUS_DIMENSION_HIDDEN;
         }
     }
-    else
+    else {
+        qi->metrics.excluded++;
+        qc->metrics.excluded++;
+        qh->metrics.excluded++;
+
+        qd->status |= QUERY_STATUS_DIMENSION_NODATA;
         qtl->metrics_skipped_due_to_not_matching_timeframe++;
+    }
 
     if(release_retention) {
         // cleanup anything we allocated to the retention we will not use
@@ -2525,7 +2588,180 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED
     }
 }
 
-static void query_target_add_instance(QUERY_TARGET_LOCALS *qtl, RRDINSTANCE_ACQUIRED *ria, bool queryable_instance) {
+static void query_target_add_dimension(QUERY_TARGET_LOCALS *qtl, QUERY_HOST *qh, QUERY_CONTEXT *qc, QUERY_INSTANCE *qi,
+                                       RRDMETRIC_ACQUIRED *rma, bool queryable_instance) {
+    QUERY_TARGET *qt = qtl->qt;
+
+    RRDMETRIC *rm = rrdmetric_acquired_value(rma);
+    if(rrd_flag_is_deleted(rm))
+        return;
+
+    if(qt->dimensions.used == qt->dimensions.size) {
+        size_t old_mem = qt->dimensions.size * sizeof(*qt->dimensions.array);
+        qt->dimensions.size = (qt->dimensions.size) ? qt->dimensions.size * 2 : 1;
+        size_t new_mem = qt->dimensions.size * sizeof(*qt->dimensions.array);
+        qt->dimensions.array = reallocz(qt->dimensions.array, new_mem);
+
+        __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED);
+    }
+    QUERY_DIMENSION *qd = &qt->dimensions.array[qt->dimensions.used];
+    memset(qd, 0, sizeof(*qd));
+
+    qd->slot = qt->dimensions.used++;
+    qd->rma = rrdmetric_acquired_dup(rma);
+    qd->status = QUERY_STATUS_NONE;
+
+    if(!queryable_instance) {
+        qi->metrics.excluded++;
+        qc->metrics.excluded++;
+        qh->metrics.excluded++;
+        qd->status |= QUERY_STATUS_EXCLUDED;
+        return;
+    }
+
+    query_target_add_metric(qtl, qh, qc, qi, qd);
+}
+
+static inline STRING *rrdinstance_id_fqdn_v1(RRDINSTANCE_ACQUIRED *ria) {
+    if(unlikely(!ria))
+        return NULL;
+
+    RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+    return string_dup(ri->id);
+}
+
+static inline STRING *rrdinstance_name_fqdn_v1(RRDINSTANCE_ACQUIRED *ria) {
+    if(unlikely(!ria))
+        return NULL;
+
+    RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+    return string_dup(ri->name);
+}
+
+static inline STRING *rrdinstance_id_fqdn_v2(RRDINSTANCE_ACQUIRED *ria) {
+    if(unlikely(!ria))
+        return NULL;
+
+    char buffer[RRD_ID_LENGTH_MAX + 1];
+
+    RRDHOST *host = rrdinstance_acquired_rrdhost(ria);
+    snprintfz(buffer, RRD_ID_LENGTH_MAX, "%s@%s", rrdinstance_acquired_id(ria), host->machine_guid);
+    return string_strdupz(buffer);
+}
+
+static inline STRING *rrdinstance_name_fqdn_v2(RRDINSTANCE_ACQUIRED *ria) {
+    if(unlikely(!ria))
+        return NULL;
+
+    char buffer[RRD_ID_LENGTH_MAX + 1];
+
+    RRDHOST *host = rrdinstance_acquired_rrdhost(ria);
+    snprintfz(buffer, RRD_ID_LENGTH_MAX, "%s@%s", rrdinstance_acquired_name(ria), rrdhost_hostname(host));
+    return string_strdupz(buffer);
+}
+
+RRDSET *rrdinstance_acquired_rrdset(RRDINSTANCE_ACQUIRED *ria) {
+    RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+    return ri->rrdset;
+}
+
+const char *rrdcontext_acquired_units(RRDCONTEXT_ACQUIRED *rca) {
+    RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
+    return string2str(rc->units);
+}
+
+RRDSET_TYPE rrdcontext_acquired_chart_type(RRDCONTEXT_ACQUIRED *rca) {
+    RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
+    return rc->chart_type;
+}
+
+const char *rrdcontext_acquired_title(RRDCONTEXT_ACQUIRED *rca) {
+    RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
+    return string2str(rc->title);
+}
+
+static void query_target_eval_instance_rrdcalc(QUERY_TARGET_LOCALS *qtl __maybe_unused,
+                                               QUERY_HOST *qh, QUERY_CONTEXT *qc, QUERY_INSTANCE *qi) {
+    RRDSET *st = rrdinstance_acquired_rrdset(qi->ria);
+    if (st) {
+        netdata_rwlock_rdlock(&st->alerts.rwlock);
+        if (st->alerts.base) {
+            for (RRDCALC *rc = st->alerts.base; rc; rc = rc->next) {
+                switch(rc->status) {
+                    case RRDCALC_STATUS_CLEAR:
+                        qi->alerts.clear++;
+                        qc->alerts.clear++;
+                        qh->alerts.clear++;
+                        break;
+
+                    case RRDCALC_STATUS_WARNING:
+                        qi->alerts.warning++;
+                        qc->alerts.warning++;
+                        qh->alerts.warning++;
+                        break;
+
+                    case RRDCALC_STATUS_CRITICAL:
+                        qi->alerts.critical++;
+                        qc->alerts.critical++;
+                        qh->alerts.critical++;
+                        break;
+
+                    default:
+                    case RRDCALC_STATUS_UNINITIALIZED:
+                    case RRDCALC_STATUS_UNDEFINED:
+                    case RRDCALC_STATUS_REMOVED:
+                        qi->alerts.other++;
+                        qc->alerts.other++;
+                        qh->alerts.other++;
+                        break;
+                }
+            }
+        }
+        netdata_rwlock_unlock(&st->alerts.rwlock);
+    }
+}
+
+static bool query_target_match_alert_pattern(QUERY_INSTANCE *qi, SIMPLE_PATTERN *pattern) {
+    if(!pattern)
+        return true;
+
+    RRDSET *st = rrdinstance_acquired_rrdset(qi->ria);
+    if (!st)
+        return false;
+
+    BUFFER *wb = NULL;
+    bool matched = false;
+    netdata_rwlock_rdlock(&st->alerts.rwlock);
+    if (st->alerts.base) {
+        for (RRDCALC *rc = st->alerts.base; rc; rc = rc->next) {
+            if(simple_pattern_matches(pattern, string2str(rc->name))) {
+                matched = true;
+                break;
+            }
+
+            if(!wb)
+                wb = buffer_create(0, NULL);
+            else
+                buffer_flush(wb);
+
+            buffer_fast_strcat(wb, string2str(rc->name), string_strlen(rc->name));
+            buffer_fast_strcat(wb, ":", 1);
+            buffer_strcat(wb, rrdcalc_status2string(rc->status));
+
+            if(simple_pattern_matches(pattern, buffer_tostring(wb))) {
+                matched = true;
+                break;
+            }
+        }
+    }
+    netdata_rwlock_unlock(&st->alerts.rwlock);
+
+    buffer_free(wb);
+    return matched;
+}
+
+static void query_target_add_instance(QUERY_TARGET_LOCALS *qtl, QUERY_HOST *qh, QUERY_CONTEXT *qc,
+                                      RRDINSTANCE_ACQUIRED *ria, bool queryable_instance, bool filter_instances) {
     QUERY_TARGET *qt = qtl->qt;
 
     RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
@@ -2533,47 +2769,94 @@ static void query_target_add_instance(QUERY_TARGET_LOCALS *qtl, RRDINSTANCE_ACQU
         return;
 
     if(qt->instances.used == qt->instances.size) {
-        size_t old_mem = qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *);
+        size_t old_mem = qt->instances.size * sizeof(*qt->instances.array);
         qt->instances.size = (qt->instances.size) ? qt->instances.size * 2 : 1;
-        size_t new_mem = qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *);
+        size_t new_mem = qt->instances.size * sizeof(*qt->instances.array);
         qt->instances.array = reallocz(qt->instances.array, new_mem);
 
         __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED);
     }
+    QUERY_INSTANCE *qi = &qt->instances.array[qt->instances.used];
+    memset(qi, 0, sizeof(*qi));
 
-    qtl->ria = qt->instances.array[qt->instances.used++] = rrdinstance_acquired_dup(ria);
+    qi->slot = qt->instances.used;
+    qt->instances.used++;
+    qi->ria = rrdinstance_acquired_dup(ria);
+    qi->query_host_id = qh->slot;
+
+    if(qt->request.version <= 1) {
+        qi->id_fqdn = rrdinstance_id_fqdn_v1(ria);
+        qi->name_fqdn = rrdinstance_name_fqdn_v1(ria);
+    }
+    else {
+        qi->id_fqdn = rrdinstance_id_fqdn_v2(ria);
+        qi->name_fqdn = rrdinstance_name_fqdn_v2(ria);
+    }
 
     if(qt->db.minimum_latest_update_every_s == 0 || ri->update_every_s < qt->db.minimum_latest_update_every_s)
         qt->db.minimum_latest_update_every_s = ri->update_every_s;
 
+    if(queryable_instance && filter_instances) {
+        queryable_instance = false;
+        if(!qt->instances.pattern
+           || (qtl->match_ids   && simple_pattern_matches(qt->instances.pattern, string2str(ri->id)))
+           || (qtl->match_ids   && simple_pattern_matches(qt->instances.pattern, string2str(qi->id_fqdn)))
+           || (qtl->match_names && simple_pattern_matches(qt->instances.pattern, string2str(ri->name)))
+           || (qtl->match_names && simple_pattern_matches(qt->instances.pattern, string2str(qi->name_fqdn)))
+                )
+            queryable_instance = true;
+    }
+
     if(queryable_instance) {
         if ((qt->instances.chart_label_key_pattern && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, qt->instances.chart_label_key_pattern, ':')) ||
-            (qt->instances.charts_labels_filter_pattern && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, qt->instances.charts_labels_filter_pattern, ':')))
+            (qt->instances.labels_pattern && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, qt->instances.labels_pattern, ':')))
             queryable_instance = false;
     }
 
+    if(queryable_instance) {
+        if(qt->instances.alerts_pattern && !query_target_match_alert_pattern(qi, qt->instances.alerts_pattern))
+            queryable_instance = false;
+    }
+
+    if(queryable_instance && qt->request.version >= 2)
+        query_target_eval_instance_rrdcalc(qtl, qh, qc, qi);
+
     size_t added = 0;
 
     if(unlikely(qt->request.rma)) {
-        query_target_add_metric(qtl, qt->request.rma, ri, queryable_instance);
+        query_target_add_dimension(qtl, qh, qc, qi, qt->request.rma, queryable_instance);
         added++;
     }
     else {
         RRDMETRIC *rm;
         dfe_start_read(ri->rrdmetrics, rm) {
-            query_target_add_metric(qtl, (RRDMETRIC_ACQUIRED *) rm_dfe.item, ri, queryable_instance);
+            query_target_add_dimension(qtl, qh, qc, qi, (RRDMETRIC_ACQUIRED *) rm_dfe.item, queryable_instance);
             added++;
         }
         dfe_done(rm);
     }
 
     if(!added) {
+        qc->instances.excluded++;
+        qh->instances.excluded++;
+
         qt->instances.used--;
         rrdinstance_release(ria);
+        qi->ria = NULL;
+
+        string_freez(qi->id_fqdn);
+        qi->id_fqdn = NULL;
+
+        string_freez(qi->name_fqdn);
+        qi->name_fqdn = NULL;
+    }
+    else {
+        qc->instances.selected++;
+        qh->instances.selected++;
     }
 }
 
-static void query_target_add_context(QUERY_TARGET_LOCALS *qtl, RRDCONTEXT_ACQUIRED *rca) {
+static void query_target_add_context(QUERY_TARGET_LOCALS *qtl, QUERY_HOST *qh, RRDCONTEXT_ACQUIRED *rca, bool queryable_context) {
     QUERY_TARGET *qt = qtl->qt;
 
     RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
@@ -2581,35 +2864,31 @@ static void query_target_add_context(QUERY_TARGET_LOCALS *qtl, RRDCONTEXT_ACQUIR
         return;
 
     if(qt->contexts.used == qt->contexts.size) {
-        size_t old_mem = qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *);
+        size_t old_mem = qt->contexts.size * sizeof(*qt->contexts.array);
         qt->contexts.size = (qt->contexts.size) ? qt->contexts.size * 2 : 1;
-        size_t new_mem = qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *);
+        size_t new_mem = qt->contexts.size * sizeof(*qt->contexts.array);
         qt->contexts.array = reallocz(qt->contexts.array, new_mem);
 
         __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED);
     }
-    qtl->rca = qt->contexts.array[qt->contexts.used++] = rrdcontext_acquired_dup(rca);
+    QUERY_CONTEXT *qc = &qt->contexts.array[qt->contexts.used];
+    memset(qc, 0, sizeof(*qc));
+    qc->slot = qt->contexts.used++;
+    qc->rca =  rrdcontext_acquired_dup(rca);
 
     size_t added = 0;
     if(unlikely(qt->request.ria)) {
-        query_target_add_instance(qtl, qt->request.ria, true);
+         query_target_add_instance(qtl, qh, qc, qt->request.ria, queryable_context, false);
         added++;
     }
     else if(unlikely(qtl->st && qtl->st->rrdcontext == rca && qtl->st->rrdinstance)) {
-        query_target_add_instance(qtl, qtl->st->rrdinstance, true);
+        query_target_add_instance(qtl, qh, qc, qtl->st->rrdinstance, queryable_context, false);
         added++;
     }
     else {
         RRDINSTANCE *ri;
         dfe_start_read(rc->rrdinstances, ri) {
-            bool queryable_instance = false;
-            if(!qt->instances.pattern
-                || (qtl->match_ids   && simple_pattern_matches(qt->instances.pattern, string2str(ri->id)))
-                || (qtl->match_names && simple_pattern_matches(qt->instances.pattern, string2str(ri->name)))
-                )
-                queryable_instance = true;
-
-            query_target_add_instance(qtl, (RRDINSTANCE_ACQUIRED *)ri_dfe.item, queryable_instance);
+            query_target_add_instance(qtl, qh, qc, (RRDINSTANCE_ACQUIRED *)ri_dfe.item, queryable_context, true);
             added++;
         }
         dfe_done(ri);
@@ -2621,56 +2900,79 @@ static void query_target_add_context(QUERY_TARGET_LOCALS *qtl, RRDCONTEXT_ACQUIR
     }
 }
 
-static void query_target_add_host(QUERY_TARGET_LOCALS *qtl, RRDHOST *host) {
+static void query_target_add_host(QUERY_TARGET_LOCALS *qtl, RRDHOST *host, bool queryable_host) {
     QUERY_TARGET *qt = qtl->qt;
 
     if(qt->hosts.used == qt->hosts.size) {
-        size_t old_mem = qt->hosts.size * sizeof(RRDHOST *);
+        size_t old_mem = qt->hosts.size * sizeof(*qt->hosts.array);
         qt->hosts.size = (qt->hosts.size) ? qt->hosts.size * 2 : 1;
-        size_t new_mem = qt->hosts.size * sizeof(RRDHOST *);
+        size_t new_mem = qt->hosts.size * sizeof(*qt->hosts.array);
         qt->hosts.array = reallocz(qt->hosts.array, new_mem);
 
         __atomic_add_fetch(&netdata_buffers_statistics.query_targets_size, new_mem - old_mem, __ATOMIC_RELAXED);
     }
-    qtl->host = qt->hosts.array[qt->hosts.used++] = host;
+    QUERY_HOST *qh = &qt->hosts.array[qt->hosts.used];
+    memset(qh, 0, sizeof(*qh));
+    qh->slot = qt->hosts.used++;
+    qh->host = host;
+
+    if(host->node_id)
+        uuid_unparse_lower(*host->node_id, qh->node_id);
+    else
+        qh->node_id[0] = '\0';
 
     // is the chart given valid?
     if(unlikely(qtl->st && (!qtl->st->rrdinstance || !qtl->st->rrdcontext))) {
-        error("QUERY TARGET: RRDSET '%s' given, because it is not linked to rrdcontext structures. Switching to context query.", rrdset_name(qtl->st));
+        error("QUERY TARGET: RRDSET '%s' given, but it is not linked to rrdcontext structures. Linking it now.", rrdset_name(qtl->st));
+        rrdinstance_from_rrdset(qtl->st);
 
-        if(!is_valid_sp(qtl->charts))
-            qtl->charts = rrdset_name(qtl->st);
+        if(unlikely(qtl->st && (!qtl->st->rrdinstance || !qtl->st->rrdcontext))) {
+            error("QUERY TARGET: RRDSET '%s' given, but failed to be linked to rrdcontext structures. Switching to context query.",
+                  rrdset_name(qtl->st));
 
-        qtl->st = NULL;
+            if (!is_valid_sp(qtl->charts))
+                qtl->charts = rrdset_name(qtl->st);
+
+            qtl->st = NULL;
+        }
     }
 
     size_t added = 0;
     if(unlikely(qt->request.rca)) {
-        query_target_add_context(qtl, qt->request.rca);
+        query_target_add_context(qtl, qh, qt->request.rca, true);
         added++;
     }
     else if(unlikely(qtl->st)) {
         // single chart data queries
-        query_target_add_context(qtl, qtl->st->rrdcontext);
+        query_target_add_context(qtl, qh, qtl->st->rrdcontext, true);
         added++;
     }
     else {
         // context pattern queries
-        RRDCONTEXT_ACQUIRED *rca = (RRDCONTEXT_ACQUIRED *)dictionary_get_and_acquire_item((DICTIONARY *)qtl->host->rrdctx, qtl->contexts);
+        RRDCONTEXT_ACQUIRED *rca = (RRDCONTEXT_ACQUIRED *)dictionary_get_and_acquire_item((DICTIONARY *)host->rrdctx, qtl->scope_contexts);
         if(likely(rca)) {
             // we found it!
-            query_target_add_context(qtl, rca);
+
+            bool queryable_context = queryable_host;
+            if(queryable_context && qt->contexts.pattern && !simple_pattern_matches(qt->contexts.pattern, rrdcontext_acquired_id(rca)))
+                queryable_context = false;
+
+            query_target_add_context(qtl, qh, rca, queryable_context);
             rrdcontext_release(rca);
             added++;
         }
         else {
             // Probably it is a pattern, we need to search for it...
             RRDCONTEXT *rc;
-            dfe_start_read((DICTIONARY *)qtl->host->rrdctx, rc) {
-                if(qt->contexts.pattern && !simple_pattern_matches(qt->contexts.pattern, string2str(rc->id)))
+            dfe_start_read((DICTIONARY *)host->rrdctx, rc) {
+                if(qt->contexts.scope_pattern && !simple_pattern_matches(qt->contexts.scope_pattern, string2str(rc->id)))
                     continue;
 
-                query_target_add_context(qtl, (RRDCONTEXT_ACQUIRED *)rc_dfe.item);
+                bool queryable_context = queryable_host;
+                if(queryable_context && qt->contexts.pattern && !simple_pattern_matches(qt->contexts.pattern, string2str(rc->id)))
+                    queryable_context = false;
+
+                query_target_add_context(qtl, qh, (RRDCONTEXT_ACQUIRED *)rc_dfe.item, queryable_context);
                 added++;
             }
             dfe_done(rc);
@@ -2695,7 +2997,7 @@ void query_target_generate_name(QUERY_TARGET *qt) {
         snprintfz(tier_buffer, 20, "/tier:%zu", qt->request.tier);
 
     if(qt->request.st)
-        snprintfz(qt->id, MAX_QUERY_TARGET_ID_LENGTH, "chart://host:%s/instance:%s/dimensions:%s/after:%lld/before:%lld/points:%zu/group:%s%s/options:%s%s%s"
+        snprintfz(qt->id, MAX_QUERY_TARGET_ID_LENGTH, "chart://hosts:%s/instance:%s/dimensions:%s/after:%lld/before:%lld/points:%zu/group:%s%s/options:%s%s%s"
                   , rrdhost_hostname(qt->request.st->rrdhost)
                   , rrdset_name(qt->request.st)
                   , (qt->request.dimensions) ? qt->request.dimensions : "*"
@@ -2709,7 +3011,7 @@ void query_target_generate_name(QUERY_TARGET *qt) {
                   , tier_buffer
                   );
     else if(qt->request.host && qt->request.rca && qt->request.ria && qt->request.rma)
-        snprintfz(qt->id, MAX_QUERY_TARGET_ID_LENGTH, "metric://host:%s/context:%s/instance:%s/dimension:%s/after:%lld/before:%lld/points:%zu/group:%s%s/options:%s%s%s"
+        snprintfz(qt->id, MAX_QUERY_TARGET_ID_LENGTH, "metric://hosts:%s/context:%s/instance:%s/dimension:%s/after:%lld/before:%lld/points:%zu/group:%s%s/options:%s%s%s"
                 , rrdhost_hostname(qt->request.host)
                 , rrdcontext_acquired_id(qt->request.rca)
                 , rrdinstance_acquired_id(qt->request.ria)
@@ -2724,7 +3026,7 @@ void query_target_generate_name(QUERY_TARGET *qt) {
                 , tier_buffer
                 );
     else
-        snprintfz(qt->id, MAX_QUERY_TARGET_ID_LENGTH, "context://host:%s/contexts:%s/instances:%s/dimensions:%s/after:%lld/before:%lld/points:%zu/group:%s%s/options:%s%s%s"
+        snprintfz(qt->id, MAX_QUERY_TARGET_ID_LENGTH, "context://hosts:%s/contexts:%s/instances:%s/dimensions:%s/after:%lld/before:%lld/points:%zu/group:%s%s/options:%s%s%s"
                 , (qt->request.host) ? rrdhost_hostname(qt->request.host) : ((qt->request.hosts) ? qt->request.hosts : "*")
                 , (qt->request.contexts) ? qt->request.contexts : "*"
                 , (qt->request.charts) ? qt->request.charts : "*"
@@ -2754,6 +3056,19 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) {
     qt->used = true;
     qt->queries++;
 
+    if(!qtr->received_ut)
+        qtr->received_ut = now_monotonic_usec();
+
+    qt->timings.received_ut = qtr->received_ut;
+
+    if(qtr->hosts && !qtr->scope_hosts)
+        qtr->scope_hosts = qtr->hosts;
+
+    if(qtr->contexts && !qtr->scope_contexts)
+        qtr->scope_contexts = qtr->contexts;
+
+    memset(&qt->query_stats, 0, sizeof(qt->query_stats));
+
     // copy the request into query_thread_target
     qt->request = *qtr;
 
@@ -2766,25 +3081,34 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) {
     QUERY_TARGET_LOCALS qtl = {
         .qt = qt,
         .start_s = now_realtime_sec(),
-        .host = qt->request.host,
         .st = qt->request.st,
+        .scope_hosts = qt->request.scope_hosts,
+        .scope_contexts = qt->request.scope_contexts,
         .hosts = qt->request.hosts,
         .contexts = qt->request.contexts,
         .charts = qt->request.charts,
         .dimensions = qt->request.dimensions,
         .chart_label_key = qt->request.chart_label_key,
-        .charts_labels_filter = qt->request.charts_labels_filter,
+        .labels = qt->request.labels,
+        .alerts = qt->request.alerts,
     };
 
+    RRDHOST *host = qt->request.host;
+
     qt->db.minimum_latest_update_every_s = 0; // it will be updated by query_target_add_query()
 
     // prepare all the patterns
     qt->hosts.pattern = is_valid_sp(qtl.hosts) ? simple_pattern_create(qtl.hosts, ",|\t\r\n\f\v", SIMPLE_PATTERN_EXACT) : NULL;
+    qt->hosts.scope_pattern = is_valid_sp(qtl.scope_hosts) ? simple_pattern_create(qtl.scope_hosts, ",|\t\r\n\f\v", SIMPLE_PATTERN_EXACT) : NULL;
+
     qt->contexts.pattern = is_valid_sp(qtl.contexts) ? simple_pattern_create(qtl.contexts, ",|\t\r\n\f\v", SIMPLE_PATTERN_EXACT) : NULL;
+    qt->contexts.scope_pattern = is_valid_sp(qtl.scope_contexts) ? simple_pattern_create(qtl.scope_contexts, ",|\t\r\n\f\v", SIMPLE_PATTERN_EXACT) : NULL;
+
     qt->instances.pattern = is_valid_sp(qtl.charts) ? simple_pattern_create(qtl.charts, ",|\t\r\n\f\v", SIMPLE_PATTERN_EXACT) : NULL;
     qt->query.pattern = is_valid_sp(qtl.dimensions) ? simple_pattern_create(qtl.dimensions, ",|\t\r\n\f\v", SIMPLE_PATTERN_EXACT) : NULL;
     qt->instances.chart_label_key_pattern = is_valid_sp(qtl.chart_label_key) ? simple_pattern_create(qtl.chart_label_key, ",|\t\r\n\f\v", SIMPLE_PATTERN_EXACT) : NULL;
-    qt->instances.charts_labels_filter_pattern = is_valid_sp(qtl.charts_labels_filter) ? simple_pattern_create(qtl.charts_labels_filter, ",|\t\r\n\f\v", SIMPLE_PATTERN_EXACT) : NULL;
+    qt->instances.labels_pattern = is_valid_sp(qtl.labels) ? simple_pattern_create(qtl.labels, ",|\t\r\n\f\v", SIMPLE_PATTERN_EXACT) : NULL;
+    qt->instances.alerts_pattern = is_valid_sp(qtl.alerts) ? simple_pattern_create(qtl.alerts, ",|\t\r\n\f\v", SIMPLE_PATTERN_EXACT) : NULL;
 
     qtl.match_ids = qt->request.options & RRDR_OPTION_MATCH_IDS;
     qtl.match_names = qt->request.options & RRDR_OPTION_MATCH_NAMES;
@@ -2793,58 +3117,56 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) {
 
     // verify that the chart belongs to the host we are interested
     if(qtl.st) {
-        if (!qtl.host) {
+        if (!host) {
             // It is NULL, set it ourselves.
-            qtl.host = qtl.st->rrdhost;
+            host = qtl.st->rrdhost;
         }
-        else if (unlikely(qtl.host != qtl.st->rrdhost)) {
+        else if (unlikely(host != qtl.st->rrdhost)) {
             // Oops! A different host!
             error("QUERY TARGET: RRDSET '%s' given does not belong to host '%s'. Switching query host to '%s'",
-                  rrdset_name(qtl.st), rrdhost_hostname(qtl.host), rrdhost_hostname(qtl.st->rrdhost));
-            qtl.host = qtl.st->rrdhost;
+                  rrdset_name(qtl.st), rrdhost_hostname(host), rrdhost_hostname(qtl.st->rrdhost));
+            host = qtl.st->rrdhost;
         }
     }
 
-    if(qtl.host) {
+    if(host) {
         // single host query
-        query_target_add_host(&qtl, qtl.host);
-        qtl.hosts = rrdhost_hostname(qtl.host);
+        query_target_add_host(&qtl, host, true);
+        qtl.hosts = rrdhost_hostname(host);
     }
     else {
+        char uuid[UUID_STR_LEN];
+
         // multi host query
         rrd_rdlock();
-        rrdhost_foreach_read(qtl.host) {
-            if(!qt->hosts.pattern || simple_pattern_matches(qt->hosts.pattern, rrdhost_hostname(qtl.host)))
-                query_target_add_host(&qtl, qtl.host);
+        // Walk every known host. A host is added to the query target when it
+        // is inside the scope pattern (or no scope pattern exists); it is
+        // additionally marked queryable when it matches the hosts pattern.
+        // Hosts are matched by hostname, by machine GUID and, when they have
+        // one, by their node id.
+        rrdhost_foreach_read(host) {
+
+            // Render the node id as text only when the host really has one.
+            // Without a node id the text stays empty, so it is never offered
+            // to the patterns below (and a NULL node_id is never dereferenced).
+            if(host->node_id)
+                uuid_unparse_lower(*host->node_id, uuid);
+            else
+                uuid[0] = '\0';
+
+            if(!qt->hosts.scope_pattern ||
+                simple_pattern_matches(qt->hosts.scope_pattern, rrdhost_hostname(host)) ||
+                simple_pattern_matches(qt->hosts.scope_pattern, host->machine_guid) ||
+                (*uuid && simple_pattern_matches(qt->hosts.scope_pattern, uuid))) {
+
+                bool queryable_host = false;
+                if(!qt->hosts.pattern ||
+                    simple_pattern_matches(qt->hosts.pattern, rrdhost_hostname(host)) ||
+                    simple_pattern_matches(qt->hosts.pattern, host->machine_guid) ||
+                    (*uuid && simple_pattern_matches(qt->hosts.pattern, uuid)))
+                    queryable_host = true;
+
+                query_target_add_host(&qtl, host, queryable_host);
+            }
         }
         rrd_unlock();
     }
 
-    // make sure everything is good
-    if(!qt->query.used || !qt->metrics.used || !qt->instances.used || !qt->contexts.used || !qt->hosts.used) {
-        internal_error(
-                true
-                , "QUERY TARGET: query '%s' does not have all the data required. "
-                  "Matched %u hosts, %u contexts, %u instances, %u dimensions, %u metrics to query, "
-                  "%zu metrics skipped because they don't have data in the desired time-frame. "
-                  "Aborting it."
-                , qt->id
-                , qt->hosts.used
-                , qt->contexts.used
-                , qt->instances.used
-                , qt->metrics.used
-                , qt->query.used
-                , qtl.metrics_skipped_due_to_not_matching_timeframe
-                );
-
-        query_target_release(qt);
-        return NULL;
-    }
+    query_target_calculate_window(qt);
 
-    if(!query_target_calculate_window(qt)) {
-        query_target_release(qt);
-        return NULL;
-    }
+    qt->timings.preprocessed_ut = now_monotonic_usec();
 
     return qt;
 }

+ 174 - 19
database/rrdcontext.h

@@ -25,12 +25,30 @@ typedef struct rrdcontext_acquired RRDCONTEXT_ACQUIRED;
 
 const char *rrdmetric_acquired_id(RRDMETRIC_ACQUIRED *rma);
 const char *rrdmetric_acquired_name(RRDMETRIC_ACQUIRED *rma);
+
+STRING *rrdmetric_acquired_id_dup(RRDMETRIC_ACQUIRED *rma);
+STRING *rrdmetric_acquired_name_dup(RRDMETRIC_ACQUIRED *rma);
+
 NETDATA_DOUBLE rrdmetric_acquired_last_stored_value(RRDMETRIC_ACQUIRED *rma);
+time_t rrdmetric_acquired_first_entry(RRDMETRIC_ACQUIRED *rma);
+time_t rrdmetric_acquired_last_entry(RRDMETRIC_ACQUIRED *rma);
+bool rrdmetric_acquired_belongs_to_instance(RRDMETRIC_ACQUIRED *rma, RRDINSTANCE_ACQUIRED *ria);
 
 const char *rrdinstance_acquired_id(RRDINSTANCE_ACQUIRED *ria);
 const char *rrdinstance_acquired_name(RRDINSTANCE_ACQUIRED *ria);
+const char *rrdinstance_acquired_units(RRDINSTANCE_ACQUIRED *ria);
+STRING *rrdinstance_acquired_units_dup(RRDINSTANCE_ACQUIRED *ria);
 DICTIONARY *rrdinstance_acquired_labels(RRDINSTANCE_ACQUIRED *ria);
 DICTIONARY *rrdinstance_acquired_functions(RRDINSTANCE_ACQUIRED *ria);
+RRDHOST *rrdinstance_acquired_rrdhost(RRDINSTANCE_ACQUIRED *ria);
+RRDSET *rrdinstance_acquired_rrdset(RRDINSTANCE_ACQUIRED *ria);
+
+bool rrdinstance_acquired_belongs_to_context(RRDINSTANCE_ACQUIRED *ria, RRDCONTEXT_ACQUIRED *rca);
+time_t rrdinstance_acquired_update_every(RRDINSTANCE_ACQUIRED *ria);
+
+const char *rrdcontext_acquired_units(RRDCONTEXT_ACQUIRED *rca);
+const char *rrdcontext_acquired_title(RRDCONTEXT_ACQUIRED *rca);
+RRDSET_TYPE rrdcontext_acquired_chart_type(RRDCONTEXT_ACQUIRED *rca);
 
 // ----------------------------------------------------------------------------
 // public API for rrdhost
@@ -67,6 +85,7 @@ int rrdcontexts_to_json(RRDHOST *host, BUFFER *wb, time_t after, time_t before,
 // public API for rrdcontexts
 
 const char *rrdcontext_acquired_id(RRDCONTEXT_ACQUIRED *rca);
+bool rrdcontext_acquired_belongs_to_host(RRDCONTEXT_ACQUIRED *rca, RRDHOST *host);
 
 // ----------------------------------------------------------------------------
 // public API for rrddims
@@ -118,6 +137,15 @@ DICTIONARY *rrdcontext_all_metrics_to_dict(RRDHOST *host, SIMPLE_PATTERN *contex
 // ----------------------------------------------------------------------------
 // public API for queries
 
+typedef enum __attribute__ ((__packed__)) {    // bit flags; this is the type of QUERY_DIMENSION.status
+    QUERY_STATUS_NONE             = 0,         // no flag set
+    QUERY_STATUS_QUERIED          = (1 << 0),
+    QUERY_STATUS_DIMENSION_NODATA = (1 << 1),
+    QUERY_STATUS_DIMENSION_HIDDEN = (1 << 2),
+    QUERY_STATUS_EXCLUDED         = (1 << 3),
+    QUERY_STATUS_FAILED           = (1 << 4),
+} QUERY_STATUS;                                // NOTE(review): the code that sets these flags is outside this view
+
 typedef struct query_plan_entry {
     size_t tier;
     time_t after;
@@ -134,7 +162,75 @@ typedef struct query_plan_entry {
 
 #define QUERY_PLANS_MAX (RRD_STORAGE_TIERS * 2)
 
+struct query_instances_counts {    // instance tally kept per QUERY_HOST and per QUERY_CONTEXT
+    size_t selected;               // incremented by query_target_add_instance() when at least one dimension was processed for the instance
+    size_t excluded;               // incremented by query_target_add_instance() when no dimension was processed and the instance slot was dropped
+};
+
+struct query_metrics_counts {      // metric tally embedded in QUERY_HOST, QUERY_CONTEXT and QUERY_INSTANCE
+    size_t selected;               // NOTE(review): the increment sites of these four counters are outside this view
+    size_t excluded;
+    size_t queried;
+    size_t failed;
+};
+
+struct query_alerts_counts {       // alert tally embedded in QUERY_HOST, QUERY_CONTEXT and QUERY_INSTANCE
+    size_t clear;                  // presumably alerts in CLEAR status - increment site not visible here, confirm
+    size_t warning;                // presumably alerts in WARNING status - increment site not visible here, confirm
+    size_t critical;               // incremented in the alert status switch; the case label is outside this view
+    size_t other;                  // alerts in UNINITIALIZED, UNDEFINED or REMOVED status, plus any status hitting the switch default
+};
+
+struct query_data_statistics {     // value statistics; query_target_create() zeroes the QUERY_TARGET-level copy with memset()
+    size_t count;                  // NOTE(review): these fields are filled outside this view - presumably per-point aggregates, confirm
+    NETDATA_DOUBLE min;
+    NETDATA_DOUBLE max;
+    NETDATA_DOUBLE sum;
+    NETDATA_DOUBLE volume;
+};
+
+typedef struct query_host {                        // one entry of QUERY_TARGET.hosts.array
+    uint32_t slot;                                 // index of this entry in qt->hosts.array (set by query_target_add_host())
+    RRDHOST *host;                                 // the host this entry refers to
+    char node_id[UUID_STR_LEN];                    // host->node_id unparsed in lower case, or "" when the host has no node id
+
+    struct query_data_statistics query_stats;
+    struct query_instances_counts instances;       // selected / excluded instances of this host
+    struct query_metrics_counts metrics;
+    struct query_alerts_counts alerts;             // alert counters accumulated for this host
+} QUERY_HOST;
+
+typedef struct query_context {                     // one entry of QUERY_TARGET.contexts.array
+    uint32_t slot;                                 // index of this entry in qt->contexts.array (set by query_target_add_context())
+    RRDCONTEXT_ACQUIRED *rca;                      // obtained with rrdcontext_acquired_dup()
+
+    struct query_data_statistics query_stats;
+    struct query_instances_counts instances;       // selected / excluded instances of this context
+    struct query_metrics_counts metrics;
+    struct query_alerts_counts alerts;             // alert counters accumulated for this context
+} QUERY_CONTEXT;
+
+typedef struct query_instance {                    // one entry of QUERY_TARGET.instances.array
+    uint32_t slot;                                 // index of this entry in qt->instances.array
+    uint32_t query_host_id;                        // slot of the owning QUERY_HOST
+    RRDINSTANCE_ACQUIRED *ria;                     // obtained with rrdinstance_acquired_dup(); set to NULL when the entry is dropped
+    STRING *id_fqdn;                               // from rrdinstance_id_fqdn_v1()/_v2(), chosen by request version; released with string_freez()
+    STRING *name_fqdn;                             // from rrdinstance_name_fqdn_v1()/_v2(), chosen by request version; released with string_freez()
+
+    struct query_data_statistics query_stats;
+    struct query_metrics_counts metrics;
+    struct query_alerts_counts alerts;             // alert counters accumulated for this instance
+} QUERY_INSTANCE;
+
+typedef struct query_dimension {   // one entry of QUERY_TARGET.dimensions.array
+    uint32_t slot;                 // presumably the index of this entry in qt->dimensions.array - assignment not visible here, confirm
+    RRDMETRIC_ACQUIRED *rma;       // the metric; query_metric_id()/query_metric_name() read its id and name
+    QUERY_STATUS status;           // QUERY_STATUS_* bit flags
+} QUERY_DIMENSION;
+
 typedef struct query_metric {
+    RRDR_DIMENSION_FLAGS status;
+
     struct query_metric_tier {
         struct storage_engine *eng;
         STORAGE_METRIC_HANDLE *db_metric_handle;
@@ -150,28 +246,31 @@ typedef struct query_metric {
     } plan;
 
     struct {
-        RRDHOST *host;
-        RRDCONTEXT_ACQUIRED *rca;
-        RRDINSTANCE_ACQUIRED *ria;
-        RRDMETRIC_ACQUIRED *rma;
+        uint32_t query_host_id;
+        uint32_t query_context_id;
+        uint32_t query_instance_id;
+        uint32_t query_dimension_id;
     } link;
 
-    struct {
-        STRING *id;
-        STRING *name;
-        RRDR_DIMENSION_FLAGS options;
-    } dimension;
+    struct query_data_statistics query_stats;
 
     struct {
+        size_t slot;
         STRING *id;
         STRING *name;
-    } chart;
+        STRING *units;
+    } grouped_as;
 
 } QUERY_METRIC;
 
 #define MAX_QUERY_TARGET_ID_LENGTH 255
 
 typedef struct query_target_request {
+    size_t version;
+
+    const char *scope_hosts;
+    const char *scope_contexts;
+
     // selecting / filtering metrics to be queried
     RRDHOST *host;                      // the host to be queried (can be NULL, hosts will be used)
     RRDCONTEXT_ACQUIRED *rca;           // the context to be queried (can be NULL)
@@ -183,7 +282,8 @@ typedef struct query_target_request {
     const char *charts;                 // charts simple pattern (for context queries)
     const char *dimensions;             // dimensions simple pattern
     const char *chart_label_key;        // select only the chart having this label key
-    const char *charts_labels_filter;   // select only the charts having this combo of label key:value
+    const char *labels;                 // select only the charts having this combo of label key:value
+    const char *alerts;                 // select only the charts having this combo of alert name:status
 
     time_t after;                       // the requested timeframe
     time_t before;                      // the requested timeframe
@@ -206,11 +306,14 @@ typedef struct query_target_request {
 
     // group by across multiple time-series
     RRDR_GROUP_BY group_by;
-    const char *group_by_key;
-    RRDR_GROUP_BY_FUNCTION group_by_function;
+    char *group_by_label;
+    RRDR_GROUP_BY_FUNCTION group_by_aggregate_function;
 
+    usec_t received_ut;
 } QUERY_TARGET_REQUEST;
 
+#define GROUP_BY_MAX_LABEL_KEYS 10
+
 typedef struct query_target {
     char id[MAX_QUERY_TARGET_ID_LENGTH + 1]; // query identifier (for logging)
     QUERY_TARGET_REQUEST request;
@@ -248,36 +351,88 @@ typedef struct query_target {
     } query;
 
     struct {
-        RRDMETRIC_ACQUIRED **array;
+        QUERY_DIMENSION *array;
         uint32_t used;                      // how many items of the array are used
         uint32_t size;                      // the size of the array
-    } metrics;
+    } dimensions;
 
     struct {
-        RRDINSTANCE_ACQUIRED **array;
+        QUERY_INSTANCE *array;
         uint32_t used;                      // how many items of the array are used
         uint32_t size;                      // the size of the array
         SIMPLE_PATTERN *pattern;
+        SIMPLE_PATTERN *labels_pattern;
+        SIMPLE_PATTERN *alerts_pattern;
         SIMPLE_PATTERN *chart_label_key_pattern;
-        SIMPLE_PATTERN *charts_labels_filter_pattern;
     } instances;
 
     struct {
-        RRDCONTEXT_ACQUIRED **array;
+        QUERY_CONTEXT *array;
         uint32_t used;                      // how many items of the array are used
         uint32_t size;                      // the size of the array
         SIMPLE_PATTERN *pattern;
+        SIMPLE_PATTERN *scope_pattern;
     } contexts;
 
     struct {
-        RRDHOST **array;
+        QUERY_HOST *array;
         uint32_t used;                      // how many items of the array are used
         uint32_t size;                      // the size of the array
         SIMPLE_PATTERN *pattern;
+        SIMPLE_PATTERN *scope_pattern;
     } hosts;
 
+    struct {
+        size_t used;
+        char *label_keys[GROUP_BY_MAX_LABEL_KEYS];
+    } group_by;
+
+    struct query_data_statistics query_stats;
+
+    struct {
+        usec_t received_ut;
+        usec_t preprocessed_ut;
+        usec_t executed_ut;
+        usec_t group_by_ut;
+        usec_t finished_ut;
+    } timings;
 } QUERY_TARGET;
 
+// Accessor for element `id` of qt->hosts.array. An id at or beyond
+// qt->hosts.used is reported through internal_fatal().
+static inline NEVERNULL QUERY_HOST *query_host(QUERY_TARGET *qt, size_t id) {
+    internal_fatal(id >= qt->hosts.used, "QUERY: invalid query host id");
+
+    QUERY_HOST *qh = qt->hosts.array + id;
+    return qh;
+}
+
+// Accessor for element `query_context_id` of qt->contexts.array. An id at or
+// beyond qt->contexts.used is reported through internal_fatal().
+static inline NEVERNULL QUERY_CONTEXT *query_context(QUERY_TARGET *qt, size_t query_context_id) {
+    internal_fatal(query_context_id >= qt->contexts.used, "QUERY: invalid query context id");
+
+    QUERY_CONTEXT *qc = qt->contexts.array + query_context_id;
+    return qc;
+}
+
+// Accessor for element `query_instance_id` of qt->instances.array. An id at
+// or beyond qt->instances.used is reported through internal_fatal().
+static inline NEVERNULL QUERY_INSTANCE *query_instance(QUERY_TARGET *qt, size_t query_instance_id) {
+    internal_fatal(query_instance_id >= qt->instances.used, "QUERY: invalid query instance id");
+
+    QUERY_INSTANCE *qi = qt->instances.array + query_instance_id;
+    return qi;
+}
+
+// Accessor for element `query_dimension_id` of qt->dimensions.array. An id at
+// or beyond qt->dimensions.used is reported through internal_fatal().
+static inline NEVERNULL QUERY_DIMENSION *query_dimension(QUERY_TARGET *qt, size_t query_dimension_id) {
+    internal_fatal(query_dimension_id >= qt->dimensions.used, "QUERY: invalid query dimension id");
+
+    QUERY_DIMENSION *qd = qt->dimensions.array + query_dimension_id;
+    return qd;
+}
+
+// Accessor for element `id` of qt->query.array. An id at or beyond
+// qt->query.used is reported through internal_fatal().
+static inline NEVERNULL QUERY_METRIC *query_metric(QUERY_TARGET *qt, size_t id) {
+    internal_fatal(id >= qt->query.used, "QUERY: invalid query metric id");
+
+    QUERY_METRIC *qm = qt->query.array + id;
+    return qm;
+}
+
+// Returns the id of the metric behind `qm`: the query metric is resolved to
+// its QUERY_DIMENSION through qm->link.query_dimension_id, and the id is read
+// from that dimension's acquired metric.
+static inline const char *query_metric_id(QUERY_TARGET *qt, QUERY_METRIC *qm) {
+    return rrdmetric_acquired_id(query_dimension(qt, qm->link.query_dimension_id)->rma);
+}
+
+// Returns the name of the metric behind `qm`: the query metric is resolved to
+// its QUERY_DIMENSION through qm->link.query_dimension_id, and the name is
+// read from that dimension's acquired metric.
+static inline const char *query_metric_name(QUERY_TARGET *qt, QUERY_METRIC *qm) {
+    return rrdmetric_acquired_name(query_dimension(qt, qm->link.query_dimension_id)->rma);
+}
+
 void query_target_free(void);
 void query_target_release(QUERY_TARGET *qt);
 

+ 40 - 0
database/rrdlabels.c

@@ -677,6 +677,46 @@ void rrdlabels_get_value_strdup_or_null(DICTIONARY *labels, char **value, const
     dictionary_acquired_item_release(labels, acquired_item);
 }
 
+// Copies the value of label `key` into `dst` using strncpyz() with `dst_len`.
+// When the label does not exist or has no value, `dst` becomes an empty string.
+void rrdlabels_get_value_strcpyz(DICTIONARY *labels, char *dst, size_t dst_len, const char *key) {
+    const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(labels, key);
+    RRDLABEL *label = dictionary_acquired_item_value(item);
+
+    if(!label || !label->label_value)
+        dst[0] = '\0';
+    else
+        strncpyz(dst, string2str(label->label_value), dst_len);
+
+    // the item was acquired above, so it is released on every path
+    dictionary_acquired_item_release(labels, item);
+}
+
+// Returns a string_dup() of the value of label `key`, or NULL when the label
+// does not exist or has no value.
+STRING *rrdlabels_get_value_string_dup(DICTIONARY *labels, const char *key) {
+    const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(labels, key);
+    RRDLABEL *label = dictionary_acquired_item_value(item);
+
+    STRING *value = (label && label->label_value) ? string_dup(label->label_value) : NULL;
+
+    // the item was acquired above, so it is released before returning
+    dictionary_acquired_item_release(labels, item);
+    return value;
+}
+
+// Appends the value of label `key` to `wb`; when the label does not exist or
+// has no value, appends the `unset` text instead.
+//
+// The return value is always NULL: nothing in this function produces a
+// STRING. The STRING * return type is kept only so that the existing
+// prototype and callers remain valid.
+STRING *rrdlabels_get_value_to_buffer_or_unset(DICTIONARY *labels, BUFFER *wb, const char *key, const char *unset) {
+    const DICTIONARY_ITEM *acquired_item = dictionary_get_and_acquire_item(labels, key);
+    RRDLABEL *lb = dictionary_acquired_item_value(acquired_item);
+
+    if(lb && lb->label_value)
+        buffer_strcat(wb, string2str(lb->label_value));
+    else
+        buffer_strcat(wb, unset);
+
+    // the item was acquired above, so it is released before returning
+    dictionary_acquired_item_release(labels, acquired_item);
+
+    return NULL;
+}
+
 // ----------------------------------------------------------------------------
 // rrdlabels_unmark_all()
 // remove labels RRDLABEL_FLAG_OLD and RRDLABEL_FLAG_NEW from all dictionary items

+ 67 - 49
streaming/replication.c

@@ -1626,67 +1626,85 @@ static void replication_initialize_workers(bool master) {
 #define REQUEST_QUEUE_EMPTY (-1)
 #define REQUEST_CHART_NOT_FOUND (-2)
 
-static int replication_execute_next_pending_request(bool cancel) {
-    static __thread int max_requests_ahead = 0;
-    static __thread struct replication_request *rqs = NULL;
-    static __thread int rqs_last_executed = 0, rqs_last_prepared = 0;
-    static __thread size_t queue_rounds = 0; (void)queue_rounds;
+// State of the replication request pipeline.
+struct replication_thread_pipeline {
+    int max_requests_ahead;             // number of slots allocated in rqs
+    struct replication_request *rqs;    // ring of request slots; NULL until it is allocated
+    int rqs_last_executed;              // index in rqs of the last slot picked for execution
+    int rqs_last_prepared;              // index in rqs of the last slot filled with a request
+    size_t queue_rounds;                // incremented each time rqs_last_prepared wraps to 0
+};
+
+// one pipeline per thread
+static __thread struct replication_thread_pipeline rtp = {
+        .max_requests_ahead = 0,
+        .rqs = NULL,
+        .rqs_last_executed = 0,
+        .rqs_last_prepared = 0,
+        .queue_rounds = 0,
+};
+
+// Cancel every query still pending in the calling thread's pipeline and release
+// the pipeline itself.
+// Does nothing when the pipeline has not been allocated. Otherwise it walks the
+// ring of slots from rqs_last_executed until it reaches rqs_last_prepared,
+// cancelling and finalizing every slot that still holds a query, then frees the
+// ring, reverses the memory accounting done when it was allocated and resets the
+// state, so that replication_pipeline_execute_next() allocates a fresh ring the
+// next time it runs.
+static void replication_pipeline_cancel_and_cleanup(void) {
+    if(!rtp.rqs)
+        return;
+
     struct replication_request *rq;
+    size_t cancelled = 0;
 
-    if(unlikely(cancel)) {
-        if(rqs) {
-            size_t cancelled = 0;
-            do {
-                if (++rqs_last_executed >= max_requests_ahead)
-                    rqs_last_executed = 0;
+    do {
+        if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
+            rtp.rqs_last_executed = 0;
 
-                rq = &rqs[rqs_last_executed];
+        rq = &rtp.rqs[rtp.rqs_last_executed];
 
-                if (rq->q) {
-                    internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
-                    internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");
+        if (rq->q) {
+            internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
+            internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");
 
-                    replication_response_cancel_and_finalize(rq->q);
-                    rq->q = NULL;
-                    cancelled++;
-                }
+            replication_response_cancel_and_finalize(rq->q);
+            rq->q = NULL;
+            cancelled++;
+        }
 
-                rq->executed = true;
-                rq->found = false;
+        rq->executed = true;
+        rq->found = false;
 
-            } while (rqs_last_executed != rqs_last_prepared);
+    } while (rtp.rqs_last_executed != rtp.rqs_last_prepared);
 
-            internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);
-        }
-        return REQUEST_QUEUE_EMPTY;
-    }
+    internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);
+
+    freez(rtp.rqs);
+
+    // reverse the accounting done at allocation time; this has to happen
+    // before max_requests_ahead is reset, since it holds the size of the ring
+    __atomic_sub_fetch(&replication_buffers_allocated, rtp.max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
+
+    rtp.rqs = NULL;
+    rtp.max_requests_ahead = 0;
+    rtp.rqs_last_executed = 0;
+    rtp.rqs_last_prepared = 0;
+    rtp.queue_rounds = 0;
+}
+
+static int replication_pipeline_execute_next(void) {
+    struct replication_request *rq;
 
-    if(unlikely(!rqs)) {
-        max_requests_ahead = get_netdata_cpus() / 2;
+    if(unlikely(!rtp.rqs)) {
+        rtp.max_requests_ahead = (int)get_netdata_cpus() / 2;
 
-        if(max_requests_ahead > libuv_worker_threads * 2)
-            max_requests_ahead = libuv_worker_threads * 2;
+        if(rtp.max_requests_ahead > libuv_worker_threads * 2)
+            rtp.max_requests_ahead = libuv_worker_threads * 2;
 
-        if(max_requests_ahead < 2)
-            max_requests_ahead = 2;
+        if(rtp.max_requests_ahead < 2)
+            rtp.max_requests_ahead = 2;
 
-        rqs = callocz(max_requests_ahead, sizeof(struct replication_request));
-        __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
+        rtp.rqs = callocz(rtp.max_requests_ahead, sizeof(struct replication_request));
+        __atomic_add_fetch(&replication_buffers_allocated, rtp.max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
     }
 
     // fill the queue
     do {
-        if(++rqs_last_prepared >= max_requests_ahead) {
-            rqs_last_prepared = 0;
-            queue_rounds++;
+        if(++rtp.rqs_last_prepared >= rtp.max_requests_ahead) {
+            rtp.rqs_last_prepared = 0;
+            rtp.queue_rounds++;
         }
 
-        internal_fatal(rqs[rqs_last_prepared].q,
+        internal_fatal(rtp.rqs[rtp.rqs_last_prepared].q,
                        "REPLAY FATAL: slot is used by query that has not been executed!");
 
         worker_is_busy(WORKER_JOB_FIND_NEXT);
-        rqs[rqs_last_prepared] = replication_request_get_first_available();
-        rq = &rqs[rqs_last_prepared];
+        rtp.rqs[rtp.rqs_last_prepared] = replication_request_get_first_available();
+        rq = &rtp.rqs[rtp.rqs_last_prepared];
 
         if(rq->found) {
             if (!rq->st) {
@@ -1707,14 +1725,14 @@ static int replication_execute_next_pending_request(bool cancel) {
             rq->executed = false;
         }
 
-    } while(rq->found && rqs_last_prepared != rqs_last_executed);
+    } while(rq->found && rtp.rqs_last_prepared != rtp.rqs_last_executed);
 
     // pick the first usable
     do {
-        if (++rqs_last_executed >= max_requests_ahead)
-            rqs_last_executed = 0;
+        if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
+            rtp.rqs_last_executed = 0;
 
-        rq = &rqs[rqs_last_executed];
+        rq = &rtp.rqs[rtp.rqs_last_executed];
 
         if(rq->found) {
             internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
@@ -1747,7 +1765,7 @@ static int replication_execute_next_pending_request(bool cancel) {
         else
             internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!");
 
-    } while(!rq->found && rqs_last_executed != rqs_last_prepared);
+    } while(!rq->found && rtp.rqs_last_executed != rtp.rqs_last_prepared);
 
     if(unlikely(!rq->found)) {
         worker_is_idle();
@@ -1771,7 +1789,7 @@ static int replication_execute_next_pending_request(bool cancel) {
 }
 
+// Thread cleanup handler of a replication worker thread: cancels the queries
+// still pending in the thread's pipeline, then unregisters the worker.
 static void replication_worker_cleanup(void *ptr __maybe_unused) {
-    replication_execute_next_pending_request(true);
+    replication_pipeline_cancel_and_cleanup();
     worker_unregister();
 }
 
@@ -1781,7 +1799,7 @@ static void *replication_worker_thread(void *ptr) {
     netdata_thread_cleanup_push(replication_worker_cleanup, ptr);
 
     while(service_running(SERVICE_REPLICATION)) {
-        if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
+        if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
             sender_thread_buffer_free();
             worker_is_busy(WORKER_JOB_WAIT);
             worker_is_idle();
@@ -1797,7 +1815,7 @@ static void replication_main_cleanup(void *ptr) {
     struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
     static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
 
-    replication_execute_next_pending_request(true);
+    replication_pipeline_cancel_and_cleanup();
 
     int threads = (int)replication_globals.main_thread.threads;
     for(int i = 0; i < threads ;i++) {
@@ -1914,7 +1932,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
             worker_is_idle();
         }
 
-        if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
+        if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
 
             worker_is_busy(WORKER_JOB_WAIT);
             replication_recursive_lock();

+ 5 - 0
streaming/rrdpush.h

@@ -186,12 +186,17 @@ struct sender_state {
     } replication;
 
     struct {
+        bool pending_data;
         size_t buffer_used_percentage;          // the current utilization of the sending buffer
         usec_t last_flush_time_ut;              // the last time the sender flushed the sending buffer in USEC
         time_t last_buffer_recreate_s;          // true when the sender buffer should be re-created
     } atomic;
 };
 
+// atomic.pending_data tells whether the sender thread has already been signalled
+// to wake up. sender_commit() sets it, and signals the sender, only when it is
+// not already set; the sender thread clears it when it finds nothing outstanding
+// in the sending buffer. All three accessors use relaxed atomic operations.
+#define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED)
+#define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED)
+#define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED)
+
 #define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
 #define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED)
 

+ 12 - 4
streaming/sender.c

@@ -178,8 +178,16 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
 
     replication_recalculate_buffer_used_ratio_unsafe(s);
 
+    bool signal_sender = false;
+    if(!rrdpush_sender_pipe_has_pending_data(s)) {
+        rrdpush_sender_pipe_set_pending_data(s);
+        signal_sender = true;
+    }
+
     netdata_mutex_unlock(&s->mutex);
-    rrdpush_signal_sender_to_wake_up(s);
+
+    if(signal_sender)
+        rrdpush_signal_sender_to_wake_up(s);
 }
 
 static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
@@ -1016,7 +1024,6 @@ void execute_commands(struct sender_state *s) {
 }
 
+// Data allocated by rrdpush_sender_thread() and handed to its thread cleanup
+// callback: the host being streamed and the buffer allocated for the pipe.
 struct rrdpush_sender_thread_data {
-    struct sender_state *sender_state;
     RRDHOST *host;
     char *pipe_buffer;
 };
@@ -1249,7 +1256,6 @@ void *rrdpush_sender_thread(void *ptr) {
 
     struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data));
     thread_data->pipe_buffer = mallocz(pipe_buffer_size);
-    thread_data->sender_state = s;
     thread_data->host = s->host;
 
     netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
@@ -1305,8 +1311,10 @@ void *rrdpush_sender_thread(void *ptr) {
         netdata_mutex_lock(&s->mutex);
         size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
         size_t available = cbuffer_available_size_unsafe(s->buffer);
-        if (unlikely(!outstanding))
+        if (unlikely(!outstanding)) {
+            rrdpush_sender_pipe_clear_pending_data(s);
             rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
+        }
         netdata_mutex_unlock(&s->mutex);
 
         worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);

Some files were not shown because too many files changed in this diff