Browse Source

added option to enable/disable statsd; added statistical functions (some of which) are required by statsd metrics

Costa Tsaousis (ktsaou) 7 years ago
parent
commit
5f6dfc9ee8
6 changed files with 398 additions and 7 deletions
  1. 1 1
      CMakeLists.txt
  2. 2 0
      src/Makefile.am
  3. 1 0
      src/common.h
  4. 375 0
      src/statistical.c
  5. 14 0
      src/statistical.h
  6. 5 6
      src/statsd.c

+ 1 - 1
CMakeLists.txt

@@ -151,7 +151,7 @@ set(NETDATA_SOURCE_FILES
         src/web_client.h
         src/web_server.c
         src/web_server.h
-        src/locks.h src/statsd.c src/statsd.h)
+        src/locks.h src/statsd.c src/statsd.h src/statistical.c src/statistical.h)
 
 set(APPS_PLUGIN_SOURCE_FILES
         src/appconfig.c

+ 2 - 0
src/Makefile.am

@@ -119,6 +119,8 @@ netdata_SOURCES = \
 	simple_pattern.h \
 	socket.c \
 	socket.h \
+	statistical.c \
+	statistical.h \
 	statsd.c \
 	statsd.h \
 	storage_number.c \

+ 1 - 0
src/common.h

@@ -202,6 +202,7 @@
 #define NETDATA_OS_TYPE "linux"
 #endif /* __FreeBSD__, __APPLE__*/
 
+#include "statistical.h"
 #include "socket.h"
 #include "eval.h"
 #include "health.h"

+ 375 - 0
src/statistical.c

@@ -0,0 +1,375 @@
+#include "common.h"
+
+// --------------------------------------------------------------------------------------------------------------------
+
+long double average(long double *series, size_t entries) {
+    size_t i, count = 0;
+    long double sum = 0;
+
+    for(i = 0; i < entries ; i++) {
+        long double value = series[i];
+        if(unlikely(isnan(value) || isinf(value))) continue;
+        count++;
+        sum += value;
+    }
+
+    return sum / (long double)count;
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
+long double moving_average(long double *series, size_t entries, size_t period) {
+    size_t i, count = 0;
+    long double sum = 0, avg = 0;
+    long double p[period];
+
+    for(i = 0; i < entries; i++) {
+        long double value = series[i];
+        if(unlikely(isnan(value) || isinf(value))) continue;
+
+        if(count < period) {
+            sum += value;
+            avg = (count == period - 1) ? sum / (long double)period : 0;
+        }
+        else {
+            sum = sum - p[count % period] + value;
+            avg = sum / (long double)period;
+        }
+
+        p[count % period] = value;
+        count++;
+    }
+    return avg;
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
+static int qsort_compare(const void *a, const void *b) {
+    long double *p1 = (long double *)a, *p2 = (long double *)b;
+    long double n1 = *p1, n2 = *p2;
+
+    if(unlikely(isnan(n1) || isnan(n2))) {
+        if(isnan(n1) && !isnan(n2)) return -1;
+        if(!isnan(n1) && isnan(n2)) return 1;
+        return 0;
+    }
+    if(unlikely(isinf(n1) || isinf(n2))) {
+        if(!isinf(n1) && isinf(n2)) return -1;
+        if(isinf(n1) && !isinf(n2)) return 1;
+        return 0;
+    }
+
+    if(unlikely(n1 < n2)) return -1;
+    if(unlikely(n1 > n2)) return 1;
+    return 0;
+}
+
+long double median(long double *series, size_t entries) {
+    if(unlikely(entries == 0))
+        return NAN;
+
+    if(unlikely(entries == 1))
+        return series[0];
+
+    if(unlikely(entries == 2))
+        return (series[0] + series[1]) / 2;
+
+    long double *copy = mallocz(sizeof(long double) * entries);
+    memcpy(copy, series, sizeof(long double) * entries);
+    qsort(copy, entries, sizeof(long double), qsort_compare);
+
+    long double avg;
+    if(entries % 2 == 0) {
+        size_t m = entries / 2;
+        avg = (copy[m] + copy[m + 1]) / 2;
+    }
+    else {
+        avg = copy[entries / 2];
+    }
+
+    freez(copy);
+    return avg;
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
+long double moving_median(long double *series, size_t entries, size_t period) {
+    if(entries <= period)
+        return median(series, entries);
+
+    size_t len = entries - period;
+    long double *data = mallocz(sizeof(long double) * len);
+
+    size_t i;
+    for(i = period; i < entries; i++) {
+        data[i - period] = median(&series[i - period], period);
+    }
+
+    long double avg = median(data, entries - period);
+    freez(data);
+    return avg;
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
+// http://stackoverflow.com/a/15150143/4525767
+long double running_median_estimate(long double *series, size_t entries) {
+    long double median = 0.0f;
+    long double average = 0.0f;
+    size_t i;
+
+    for(i = 0; i < entries ; i++) {
+        long double value = series[i];
+        if(unlikely(isnan(value) || isinf(value))) continue;
+
+        average += ( value - average ) * 0.1f; // rough running average.
+        median += copysignl( average * 0.01, value - median );
+    }
+
+    return median;
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
+long double standard_deviation(long double *series, size_t entries) {
+    size_t i, count = 0;
+    long double sum = 0;
+
+    for(i = 0; i < entries ; i++) {
+        long double value = series[i];
+        if(unlikely(isnan(value) || isinf(value))) continue;
+        count++;
+
+        sum += value;
+    }
+    long double average = sum / (long double)count;
+
+    for(i = 0, count = 0, sum = 0; i < entries ; i++) {
+        long double value = series[i];
+        if(unlikely(isnan(value) || isinf(value))) continue;
+        count++;
+
+        sum += powl(value - average, 2);
+    }
+    long double variance = sum / (long double)(count - 1); // remove -1 to have a population stddev
+
+    long double stddev = sqrtl(variance);
+    return stddev;
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
+long double single_exponential_smoothing(long double *series, size_t entries, long double alpha) {
+    size_t i, count = 0;
+    long double level = 0, sum = 0;
+
+    if(unlikely(isnan(alpha)))
+        alpha = 0.3;
+
+    for(i = 0; i < entries ; i++) {
+        long double value = series[i];
+        if(unlikely(isnan(value) || isinf(value))) continue;
+        count++;
+
+        sum += value;
+
+        long double last_level = level;
+        level = alpha * value + (1.0 - alpha) * last_level;
+    }
+
+    return level;
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
+// http://grisha.org/blog/2016/02/16/triple-exponential-smoothing-forecasting-part-ii/
+long double double_exponential_smoothing(long double *series, size_t entries, long double alpha, long double beta, long double *forecast) {
+    size_t i, count = 0;
+    long double level = series[0], trend, sum;
+
+    if(unlikely(isnan(alpha)))
+        alpha = 0.3;
+
+    if(unlikely(isnan(beta)))
+        beta = 0.05;
+
+    if(likely(entries > 1))
+        trend = series[1] - series[0];
+    else
+        trend = 0;
+
+    sum = series[0];
+
+    for(i = 1; i < entries ; i++) {
+        long double value = series[i];
+        if(unlikely(isnan(value) || isinf(value))) continue;
+        count++;
+
+        sum += value;
+
+        long double last_level = level;
+
+        level = alpha * value + (1.0 - alpha) * (level + trend);
+        trend = beta * (level - last_level) + (1.0 - beta) * trend;
+    }
+
+    if(forecast)
+        *forecast = level + trend;
+
+    return level;
+}
+
+// --------------------------------------------------------------------------------------------------------------------
+
+/*
+ * Based on th R implementation
+ *
+ * a: level component
+ * b: trend component
+ * s: seasonal component
+ *
+ * Additive:
+ *
+ *   Yhat[t+h] = a[t] + h * b[t] + s[t + 1 + (h - 1) mod p],
+ *   a[t] = α (Y[t] - s[t-p]) + (1-α) (a[t-1] + b[t-1])
+ *   b[t] = β (a[t] - a[t-1]) + (1-β) b[t-1]
+ *   s[t] = γ (Y[t] - a[t]) + (1-γ) s[t-p]
+ *
+ * Multiplicative:
+ *
+ *   Yhat[t+h] = (a[t] + h * b[t]) * s[t + 1 + (h - 1) mod p],
+ *   a[t] = α (Y[t] / s[t-p]) + (1-α) (a[t-1] + b[t-1])
+ *   b[t] = β (a[t] - a[t-1]) + (1-β) b[t-1]
+ *   s[t] = γ (Y[t] / a[t]) + (1-γ) s[t-p]
+ */
+static int __HoltWinters(
+        long double *series,
+        int          entries,      // start_time + h
+
+        long double alpha,        // alpha parameter of Holt-Winters Filter.
+        long double beta,         // beta  parameter of Holt-Winters Filter. If set to 0, the function will do exponential smoothing.
+        long double gamma,        // gamma parameter used for the seasonal component. If set to 0, an non-seasonal model is fitted.
+
+        int *seasonal,
+        int *period,
+        long double *a,            // Start value for level (a[0]).
+        long double *b,            // Start value for trend (b[0]).
+        long double *s,            // Vector of start values for the seasonal component (s_1[0] ... s_p[0])
+
+        /* return values */
+        long double *SSE,          // The final sum of squared errors achieved in optimizing
+        long double *level,        // Estimated values for the level component (size entries - t + 2)
+        long double *trend,        // Estimated values for the trend component (size entries - t + 2)
+        long double *season        // Estimated values for the seasonal component (size entries - t + 2)
+)
+{
+    if(unlikely(entries < 4))
+        return 0;
+
+    int start_time = 2;
+
+    long double res = 0, xhat = 0, stmp = 0;
+    int i, i0, s0;
+
+    /* copy start values to the beginning of the vectors */
+    level[0] = *a;
+    if(beta > 0) trend[0] = *b;
+    if(gamma > 0) memcpy(season, s, *period * sizeof(long double));
+
+    for(i = start_time - 1; i < entries; i++) {
+        /* indices for period i */
+        i0 = i - start_time + 2;
+        s0 = i0 + *period - 1;
+
+        /* forecast *for* period i */
+        xhat = level[i0 - 1] + (beta > 0 ? trend[i0 - 1] : 0);
+        stmp = gamma > 0 ? season[s0 - *period] : (*seasonal != 1);
+        if (*seasonal == 1)
+            xhat += stmp;
+        else
+            xhat *= stmp;
+
+        /* Sum of Squared Errors */
+        res   = series[i] - xhat;
+        *SSE += res * res;
+
+        /* estimate of level *in* period i */
+        if (*seasonal == 1)
+            level[i0] = alpha       * (series[i] - stmp)
+                        + (1 - alpha) * (level[i0 - 1] + trend[i0 - 1]);
+        else
+            level[i0] = alpha       * (series[i] / stmp)
+                        + (1 - alpha) * (level[i0 - 1] + trend[i0 - 1]);
+
+        /* estimate of trend *in* period i */
+        if (beta > 0)
+            trend[i0] = beta        * (level[i0] - level[i0 - 1])
+                        + (1 - beta)  * trend[i0 - 1];
+
+        /* estimate of seasonal component *in* period i */
+        if (gamma > 0) {
+            if (*seasonal == 1)
+                season[s0] = gamma       * (series[i] - level[i0])
+                             + (1 - gamma) * stmp;
+            else
+                season[s0] = gamma       * (series[i] / level[i0])
+                             + (1 - gamma) * stmp;
+        }
+    }
+
+    return 1;
+}
+
+long double holtwinters(long double *series, size_t entries, long double alpha, long double beta, long double gamma, long double *forecast) {
+    if(unlikely(isnan(alpha)))
+        alpha = 0.3;
+
+    if(unlikely(isnan(beta)))
+        beta = 0.05;
+
+    if(unlikely(isnan(gamma)))
+        gamma = 0;
+
+    int seasonal = 0;
+    int period = 0;
+    long double a0 = series[0];
+    long double b0 = 0;
+    long double s[] = {};
+
+    long double errors;
+    size_t nb_computations = entries;
+    long double *estimated_level  = callocz(nb_computations, sizeof(long double));
+    long double *estimated_trend  = callocz(nb_computations, sizeof(long double));
+    long double *estimated_season = callocz(nb_computations, sizeof(long double));
+
+    int ret = __HoltWinters(
+            series,
+            (int)entries,
+            alpha,
+            beta,
+            gamma,
+            &seasonal,
+            &period,
+            &a0,
+            &b0,
+            s,
+            &errors,
+            estimated_level,
+            estimated_trend,
+            estimated_season
+    );
+
+    long double value = estimated_level[nb_computations - 1];
+
+    if(forecast)
+        *forecast = 0.0;
+
+    freez(estimated_level);
+    freez(estimated_trend);
+    freez(estimated_season);
+
+    if(!ret)
+        return 0.0;
+
+    return value;
+}

+ 14 - 0
src/statistical.h

@@ -0,0 +1,14 @@
+#ifndef NETDATA_STATISTICAL_H
+#define NETDATA_STATISTICAL_H
+
+extern long double average(long double *series, size_t entries);
+extern long double moving_average(long double *series, size_t entries, size_t period);
+extern long double median(long double *series, size_t entries);
+extern long double moving_median(long double *series, size_t entries, size_t period);
+extern long double running_median_estimate(long double *series, size_t entries);
+extern long double standard_deviation(long double *series, size_t entries);
+extern long double single_exponential_smoothing(long double *series, size_t entries, long double alpha);
+extern long double double_exponential_smoothing(long double *series, size_t entries, long double alpha, long double beta, long double *forecast);
+extern long double holtwinters(long double *series, size_t entries, long double alpha, long double beta, long double gamma, long double *forecast);
+
+#endif //NETDATA_STATISTICAL_H

+ 5 - 6
src/statsd.c

@@ -167,12 +167,6 @@ static int statsd_metric_compare(void* a, void* b) {
     else return strcmp(((STATSD_METRIC *)a)->name, ((STATSD_METRIC *)b)->name);
 }
 
-static int statsd_metric_set_compare(void* a, void* b) {
-    if(((STATSD_METRIC_SET_VALUE *)a)->hash < ((STATSD_METRIC_SET_VALUE *)b)->hash) return -1;
-    else if(((STATSD_METRIC_SET_VALUE *)a)->hash > ((STATSD_METRIC_SET_VALUE *)b)->hash) return 1;
-    else return strcmp(((STATSD_METRIC_SET_VALUE *)a)->value, ((STATSD_METRIC_SET_VALUE *)b)->value);
-}
-
 static inline STATSD_METRIC *stasd_metric_index_find(STATSD_INDEX *index, const char *name, uint32_t hash) {
     STATSD_METRIC tmp;
     tmp.name = name;
@@ -316,6 +310,8 @@ static inline void statsd_process_timer(STATSD_METRIC *m, char *v, char *r) {
 }
 
 static inline void statsd_process_set(STATSD_METRIC *m, char *v, char *r) {
+    (void)r;
+
     if(unlikely(!v || !*v)) {
         error("STATSD: metric of type set, with empty value is ignored.");
         return;
@@ -650,6 +646,9 @@ void *statsd_main(void *ptr) {
     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
         error("Cannot set pthread cancel state to ENABLE.");
 
+    int enabled = config_get_boolean(CONFIG_SECTION_STATSD, "enabled", 1);
+    if(!enabled) return NULL;
+
     statsd_listen_sockets_setup();
     if(!statsd.sockets.opened) {
         error("STATSD: No statsd sockets to listen to.");