Browse Source

Fix streaming scaling (#8375)

* Disallow multiple streaming connections to the same master agent

* Reject multiple streaming connections quickly without blocking

* Increase timeout for systemd service shutdown to give time to flush the db.

* Optimize page correlation ID to use atomic counter instead of locks

* Reduce contention in global configuration mutex

* Optimize complexity of inserting configuration sections from O(N) to O(1)

* Reduce overhead of clock_gettime() by utilizing CLOCK_MONOTONIC_COARSE when applicable.

* Fix unit test compile errors
Markos Fountoulakis 5 years ago
parent
commit
161ba1592f

+ 1 - 1
collectors/ebpf_process.plugin/ebpf_process.c

@@ -809,7 +809,7 @@ static inline void set_log_file(char *ptr) {
 }
 
 static void set_global_values() {
-    struct section *sec = collector_config.sections;
+    struct section *sec = collector_config.first_section;
     while(sec) {
         if(!strcasecmp(sec->name, "global")) {
             struct config_option *values = sec->values;

+ 3 - 1
daemon/main.c

@@ -6,7 +6,8 @@ int netdata_zero_metrics_enabled;
 int netdata_anonymous_statistics_enabled;
 
 struct config netdata_config = {
-        .sections = NULL,
+        .first_section = NULL,
+        .last_section = NULL,
         .mutex = NETDATA_MUTEX_INITIALIZER,
         .index = {
                 .avl_tree = {
@@ -1148,6 +1149,7 @@ int main(int argc, char **argv) {
             mallopt(M_ARENA_MAX, 1);
 #endif
         test_clock_boottime();
+        test_clock_monotonic_coarse();
 
         // prepare configuration environment variables for the plugins
 

+ 1 - 3
database/engine/rrdengineapi.c

@@ -175,9 +175,7 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
 
         handle->descr = descr;
 
-        uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
-        handle->page_correlation_id = pg_cache->committed_page_index.latest_corr_id++;
-        uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+        handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1);
 
         if (0 == rd->rrdset->rrddim_page_alignment) {
             /* this is the leading dimension that defines chart alignment */

+ 4 - 2
database/engine/rrdenginelib.h

@@ -27,11 +27,13 @@ struct rrdengine_instance;
 typedef uintptr_t rrdeng_stats_t;
 
 #ifdef __ATOMIC_RELAXED
-#define rrd_stat_atomic_add(p, n) do {(void) __atomic_fetch_add(p, n, __ATOMIC_RELAXED);} while(0)
+#define rrd_atomic_fetch_add(p, n) __atomic_fetch_add(p, n, __ATOMIC_RELAXED)
 #else
-#define rrd_stat_atomic_add(p, n) do {(void) __sync_fetch_and_add(p, n);} while(0)
+#define rrd_atomic_fetch_add(p, n) __sync_fetch_and_add(p, n)
 #endif
 
+#define rrd_stat_atomic_add(p, n) rrd_atomic_fetch_add(p, n)
+
 #define RRDENG_PATH_MAX (4096)
 
 /* returns old *ptr value */

+ 12 - 10
database/rrdset.c

@@ -550,19 +550,21 @@ RRDSET *rrdset_create_custom(
     // ------------------------------------------------------------------------
     // get the options from the config, we need to create it
 
-    long rentries = config_get_number(config_section, "history", history_entries);
-    long entries = align_entries_to_pagesize(memory_mode, rentries);
-    if(entries != rentries) entries = config_set_number(config_section, "history", entries);
-
-    if(memory_mode == RRD_MEMORY_MODE_NONE && entries != rentries)
-        entries = config_set_number(config_section, "history", 10);
-
+    long entries;
+    if(memory_mode == RRD_MEMORY_MODE_DBENGINE) {
+        // only sets it the first time
+        entries = config_get_number(config_section, "history", 5);
+    } else {
+        long rentries = config_get_number(config_section, "history", history_entries);
+        entries = align_entries_to_pagesize(memory_mode, rentries);
+        if (entries != rentries) entries = config_set_number(config_section, "history", entries);
+
+        if (memory_mode == RRD_MEMORY_MODE_NONE && entries != rentries)
+            entries = config_set_number(config_section, "history", 10);
+    }
     int enabled = config_get_boolean(config_section, "enabled", 1);
     if(!enabled) entries = 5;
 
-    if(memory_mode == RRD_MEMORY_MODE_DBENGINE)
-        entries = config_set_number(config_section, "history", 5);
-
     unsigned long size = sizeof(RRDSET);
     char *cache_dir = rrdset_cache_dir(host, fullid, config_section);
 

+ 2 - 1
exporting/read_config.c

@@ -2,7 +2,8 @@
 
 #include "exporting_engine.h"
 
-struct config exporting_config = {.sections = NULL,
+struct config exporting_config = {.first_section = NULL,
+                                  .last_section = NULL,
                                   .mutex = NETDATA_MUTEX_INITIALIZER,
                                   .index = {.avl_tree = {.root = NULL, .compar = appconfig_section_compare},
                                             .rwlock = AVL_LOCK_INITIALIZER}};

+ 17 - 6
libnetdata/clocks/clocks.c

@@ -3,6 +3,7 @@
 #include "../libnetdata.h"
 
 static int clock_boottime_valid = 1;
+static int clock_monotonic_coarse_valid = 1;
 
 #ifndef HAVE_CLOCK_GETTIME
 inline int clock_gettime(clockid_t clk_id, struct timespec *ts) {
@@ -23,6 +24,12 @@ void test_clock_boottime(void) {
         clock_boottime_valid = 0;
 }
 
+void test_clock_monotonic_coarse(void) {
+    struct timespec ts;
+    if(clock_gettime(CLOCK_MONOTONIC_COARSE, &ts) == -1 && errno == EINVAL)
+        clock_monotonic_coarse_valid = 0;
+}
+
 static inline time_t now_sec(clockid_t clk_id) {
     struct timespec ts;
     if(unlikely(clock_gettime(clk_id, &ts) == -1)) {
@@ -69,27 +76,31 @@ inline int now_realtime_timeval(struct timeval *tv) {
 }
 
 inline time_t now_monotonic_sec(void) {
-    return now_sec(CLOCK_MONOTONIC);
+    return now_sec(likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC);
 }
 
 inline usec_t now_monotonic_usec(void) {
-    return now_usec(CLOCK_MONOTONIC);
+    return now_usec(likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC);
 }
 
 inline int now_monotonic_timeval(struct timeval *tv) {
-    return now_timeval(CLOCK_MONOTONIC, tv);
+    return now_timeval(likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC, tv);
 }
 
 inline time_t now_boottime_sec(void) {
-    return now_sec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : CLOCK_MONOTONIC);
+    return now_sec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME :
+                   likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC);
 }
 
 inline usec_t now_boottime_usec(void) {
-    return now_usec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : CLOCK_MONOTONIC);
+    return now_usec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME :
+                    likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC);
 }
 
 inline int now_boottime_timeval(struct timeval *tv) {
-    return now_timeval(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : CLOCK_MONOTONIC, tv);
+    return now_timeval(likely(clock_boottime_valid) ? CLOCK_BOOTTIME :
+                       likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC,
+                       tv);
 }
 
 inline usec_t timeval_usec(struct timeval *tv) {

+ 13 - 1
libnetdata/clocks/clocks.h

@@ -36,6 +36,12 @@ typedef struct heartbeat {
 #define CLOCK_MONOTONIC CLOCK_REALTIME
 #endif
 
+/* Prefer CLOCK_MONOTONIC_COARSE where available to reduce overhead. It has the same semantics as CLOCK_MONOTONIC */
+#ifndef CLOCK_MONOTONIC_COARSE
+/* fallback to CLOCK_MONOTONIC if not available */
+#define CLOCK_MONOTONIC_COARSE CLOCK_MONOTONIC
+#endif
+
 #ifndef CLOCK_BOOTTIME
 
 #ifdef CLOCK_UPTIME
@@ -43,7 +49,7 @@ typedef struct heartbeat {
 #define CLOCK_BOOTTIME CLOCK_UPTIME
 #else // CLOCK_UPTIME
 /* CLOCK_BOOTTIME falls back to CLOCK_MONOTONIC */
-#define CLOCK_BOOTTIME  CLOCK_MONOTONIC
+#define CLOCK_BOOTTIME  CLOCK_MONOTONIC_COARSE
 #endif // CLOCK_UPTIME
 
 #else // CLOCK_BOOTTIME
@@ -136,6 +142,12 @@ extern int sleep_usec(usec_t usec);
  */
 void test_clock_boottime(void);
 
+/*
+ * When running a binary with CLOCK_MONOTONIC_COARSE defined on a system with a linux kernel older than Linux 2.6.32 the
+ * clock_gettime(2) system call fails with EINVAL. In that case it must fall-back to CLOCK_MONOTONIC.
+ */
+void test_clock_monotonic_coarse(void);
+
 extern collected_number uptime_msec(char *filename);
 
 #endif /* NETDATA_CLOCKS_H */

+ 5 - 4
libnetdata/config/appconfig.c

@@ -169,12 +169,13 @@ static inline struct section *appconfig_section_create(struct config *root, cons
         error("INTERNAL ERROR: indexing of section '%s', already exists.", co->name);
 
     appconfig_wrlock(root);
-    struct section *co2 = root->sections;
+    struct section *co2 = root->last_section;
     if(co2) {
-        while (co2->next) co2 = co2->next;
         co2->next = co;
+    } else {
+        root->first_section = co;
     }
-    else root->sections = co;
+    root->last_section = co;
     appconfig_unlock(root);
 
     return co;
@@ -678,7 +679,7 @@ void appconfig_generate(struct config *root, BUFFER *wb, int only_changed)
         }
 
         appconfig_wrlock(root);
-        for(co = root->sections; co ; co = co->next) {
+        for(co = root->first_section; co ; co = co->next) {
             if(!strcmp(co->name, CONFIG_SECTION_GLOBAL)
                || !strcmp(co->name, CONFIG_SECTION_WEB)
                || !strcmp(co->name, CONFIG_SECTION_STATSD)

+ 2 - 1
libnetdata/config/appconfig.h

@@ -141,7 +141,8 @@ struct section {
 };
 
 struct config {
-    struct section *sections;
+    struct section *first_section;
+    struct section *last_section; // optimize inserting at the end
     netdata_mutex_t mutex;
     avl_tree_lock index;
 };

Some files were not shown because too many files changed in this diff