Browse Source

Replication of metrics (gaps filling) during streaming (#13873)

* Revert "Use llvm's ar and ranlib when compiling with clang (#13854)"

This reverts commit a9135f47bbb36e9cb437b18a7109607569580db7.

* Profile plugin

* Fix macos static thread

* Add support for replication

- Add a new capability for replication, when not supported the agent
should behave as previously.
- When replication is supported, the text protocol supports the
following new commands:
    - CHART_DEFINITION_END: send the first/last entry of the child
    - REPLAY_RRDSET_BEGIN: sends the name of the chart we are
      replicating
    - REPLAY_RRDSET_HEADER: sends a line describing the columns of the
      following command (ie. start-time, end-time, dim1-name, ...)
    - REPLAY_RRDSET_DONE: sends values to push for a specific start/end
      time
    - REPLAY_RRDSET_END: send the (a) update every of the chart, (b)
      first/last entries in DB, (c) whether the child's been told to
      start streaming, (d) original after/before period to replicate.
    - REPLAY_CHART: Sent from a parent to a child, specifying (a)
      the chart name we want data for, (b) whether the child should
      start streaming once it has fullfilled the request with the
      aforementioned commands, (c) after/before of the data the parent
      wants
- As a consequence of the new protocol, streaming is disabled for all
  charts on a new connection. It's enabled once replication is finished.
- The configuration parameters are specified from within stream.conf:
        - "enable replication = yes|no"
        - "seconds to replicate = 3600"
        - "replication step = 600" (ie. how many seconds to fill per
          roundtrip request.

* Minor fixes

- quote set and dim ids
- start streaming after writing replicated data to the buffer
- write replicated data only when buffer is less than 50% full.
- use reentrant iteration for charts

* Do not send chart definitions on connection.

* Track replication status through rrdset flags.

* Add debug flag for noisy log messages.

* Add license notice.

* Iterate charts with reentrant loop

* Set replication finished flag when streaming is disabled.

* Revert "Profile plugin"

This reverts commit 468fc9386e5283e0865fae56e9989b8ec83de14d.

Used only for testing purposes.

* Revert "Revert "Use llvm's ar and ranlib when compiling with clang (#13854)""

This reverts commit 27c955c58d95aed6c44d42e8b675f0cf3ca45c6d.

Reapply commit that I had to revert in order to be able to build the
agent on MacOS.

* Build replication source files with CMake.

* Pass number of words in plugind functions.

* Use get_word instead of indexing words.

* Use size_t instead of int.

* Pay only what we use when splitting words.

* no need to redefine PLUGINSD_MAX_WORDS

* fix formatting warning

* all usages of pluginsd_split_words() should use the return value to ensure non-cached results reuse; no need to lock the host to find a chart

* keep a sender dictionary with all the replication commands received and remove replication commands from charts

* do not replicate future data

* use last_updated to find the end of the db

* uniformity of replication logs

* rewrite of the query logic

* replication.c in C; debug info in human readable dates

* update the chart on every replication row

* update all chart members so that rrdset_done() can continue

* update the protocol to push one dimension per line and transfer data collection state to parent

* fix formatting

* remove replication object from pluginsd

* shorter communication

* fix typo

* support for replication proxies

* proper use of flags

* set receiver replication finished flag on charts created after the sender has been connected

* clear RRDSET_FLAG_SYNC_CLOCK on replicated charts

* log storing of nulls

* log first store

* log update every switches

* test ignoring timestamps but sending a point just after replication end

* replication should work on end_time

* use replicated timestamps

* at the final replication step, replicate all the remaining points

* cleanup code from tests

* print timestamps as unsigned long long

* more formating changes; fix conflicting type of replicate_chart_response()

* updated stream.conf

* always respond to replication requests

* in non-dbengine db modes, do not replicate more than the database size

* advance the db pointer of legacy db modes

* should be multiplied by update_every

* fix buggy label parsing - identified by codacy

* dont log error on history mismatches for db mode dbengine

* allow SSL requests to streaming children

* dont use ssl variable

Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
vkalintiris 2 years ago
parent
commit
282e0dfaa9

+ 2 - 0
CMakeLists.txt

@@ -860,6 +860,8 @@ set(STREAMING_PLUGIN_FILES
         streaming/compression.c
         streaming/receiver.c
         streaming/sender.c
+        streaming/replication.c
+        streaming/replication.h
         )
 
 set(CLAIM_PLUGIN_FILES

+ 2 - 0
Makefile.am

@@ -622,6 +622,8 @@ STREAMING_PLUGIN_FILES = \
     streaming/compression.c \
     streaming/sender.c \
     streaming/receiver.c \
+    streaming/replication.h \
+    streaming/replication.c \
     streaming/rrdpush.h \
     $(NULL)
 

+ 28 - 25
collectors/apps.plugin/apps_plugin.c

@@ -4311,7 +4311,7 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
     struct pid_stat *p;
 
     char *words[PLUGINSD_MAX_WORDS] = { NULL };
-    pluginsd_split_words(function, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
+    size_t num_words = pluginsd_split_words(function, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
 
     struct target *category = NULL, *user = NULL, *group = NULL;
     const char *process_name = NULL;
@@ -4322,51 +4322,52 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
     bool filter_pid = false, filter_uid = false, filter_gid = false;
 
     for(int i = 1; i < PLUGINSD_MAX_WORDS ;i++) {
-        if(!words[i]) break;
+        const char *keyword = get_word(words, num_words, i);
+        if(!keyword) break;
 
-        if(!category && strncmp(words[i], PROCESS_FILTER_CATEGORY, strlen(PROCESS_FILTER_CATEGORY)) == 0) {
-            category = find_target_by_name(apps_groups_root_target, &words[i][strlen(PROCESS_FILTER_CATEGORY)]);
+        if(!category && strncmp(keyword, PROCESS_FILTER_CATEGORY, strlen(PROCESS_FILTER_CATEGORY)) == 0) {
+            category = find_target_by_name(apps_groups_root_target, &keyword[strlen(PROCESS_FILTER_CATEGORY)]);
             if(!category) {
                 apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No category with that name found.");
                 return;
             }
         }
-        else if(!user && strncmp(words[i], PROCESS_FILTER_USER, strlen(PROCESS_FILTER_USER)) == 0) {
-            user = find_target_by_name(users_root_target, &words[i][strlen(PROCESS_FILTER_USER)]);
+        else if(!user && strncmp(keyword, PROCESS_FILTER_USER, strlen(PROCESS_FILTER_USER)) == 0) {
+            user = find_target_by_name(users_root_target, &keyword[strlen(PROCESS_FILTER_USER)]);
             if(!user) {
                 apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No user with that name found.");
                 return;
             }
         }
-        else if(strncmp(words[i], PROCESS_FILTER_GROUP, strlen(PROCESS_FILTER_GROUP)) == 0) {
-            group = find_target_by_name(groups_root_target, &words[i][strlen(PROCESS_FILTER_GROUP)]);
+        else if(strncmp(keyword, PROCESS_FILTER_GROUP, strlen(PROCESS_FILTER_GROUP)) == 0) {
+            group = find_target_by_name(groups_root_target, &keyword[strlen(PROCESS_FILTER_GROUP)]);
             if(!group) {
                 apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No group with that name found.");
                 return;
             }
         }
-        else if(!process_name && strncmp(words[i], PROCESS_FILTER_PROCESS, strlen(PROCESS_FILTER_PROCESS)) == 0) {
-            process_name = &words[i][strlen(PROCESS_FILTER_PROCESS)];
+        else if(!process_name && strncmp(keyword, PROCESS_FILTER_PROCESS, strlen(PROCESS_FILTER_PROCESS)) == 0) {
+            process_name = &keyword[strlen(PROCESS_FILTER_PROCESS)];
         }
-        else if(!pid && strncmp(words[i], PROCESS_FILTER_PID, strlen(PROCESS_FILTER_PID)) == 0) {
-            pid = str2i(&words[i][strlen(PROCESS_FILTER_PID)]);
+        else if(!pid && strncmp(keyword, PROCESS_FILTER_PID, strlen(PROCESS_FILTER_PID)) == 0) {
+            pid = str2i(&keyword[strlen(PROCESS_FILTER_PID)]);
             filter_pid = true;
         }
-        else if(!uid && strncmp(words[i], PROCESS_FILTER_UID, strlen(PROCESS_FILTER_UID)) == 0) {
-            uid = str2i(&words[i][strlen(PROCESS_FILTER_UID)]);
+        else if(!uid && strncmp(keyword, PROCESS_FILTER_UID, strlen(PROCESS_FILTER_UID)) == 0) {
+            uid = str2i(&keyword[strlen(PROCESS_FILTER_UID)]);
             filter_uid = true;
         }
-        else if(!gid && strncmp(words[i], PROCESS_FILTER_GID, strlen(PROCESS_FILTER_GID)) == 0) {
-            gid = str2i(&words[i][strlen(PROCESS_FILTER_GID)]);
+        else if(!gid && strncmp(keyword, PROCESS_FILTER_GID, strlen(PROCESS_FILTER_GID)) == 0) {
+            gid = str2i(&keyword[strlen(PROCESS_FILTER_GID)]);
             filter_gid = true;
         }
-        else if(strcmp(words[i], "help") == 0) {
+        else if(strcmp(keyword, "help") == 0) {
             apps_plugin_function_processes_help(transaction);
             return;
         }
         else {
             char msg[PLUGINSD_LINE_MAX];
-            snprintfz(msg, PLUGINSD_LINE_MAX, "Invalid parameter '%s'", words[i]);
+            snprintfz(msg, PLUGINSD_LINE_MAX, "Invalid parameter '%s'", keyword);
             apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, msg);
             return;
         }
@@ -4779,16 +4780,18 @@ void *reader_main(void *arg __maybe_unused) {
     while(!apps_plugin_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
 
         char *words[PLUGINSD_MAX_WORDS] = { NULL };
-        pluginsd_split_words(buffer, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
+        size_t num_words = pluginsd_split_words(buffer, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
 
-        if(words[0] && strcmp(words[0], PLUGINSD_KEYWORD_FUNCTION) == 0) {
-            char *transaction = words[1];
-            char *timeout_s = words[2];
-            char *function = words[3];
+        const char *keyword = get_word(words, num_words, 0);
+
+        if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+            char *transaction = get_word(words, num_words, 1);
+            char *timeout_s = get_word(words, num_words, 2);
+            char *function = get_word(words, num_words, 3);
 
             if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
                 error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
-                      words[0],
+                      keyword,
                       transaction?transaction:"(unset)",
                       timeout_s?timeout_s:"(unset)",
                       function?function:"(unset)");
@@ -4813,7 +4816,7 @@ void *reader_main(void *arg __maybe_unused) {
             }
         }
         else
-            error("Received unknown command: %s", words[0]?words[0]:"(unset)");
+            error("Received unknown command: %s", keyword?keyword:"(unset)");
     }
 
     if(!s || feof(stdin) || ferror(stdin)) {

+ 1 - 1
collectors/plugins.d/plugins_d.c

@@ -6,7 +6,7 @@
 char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { NULL };
 struct plugind *pluginsd_root = NULL;
 
-inline int pluginsd_initialize_plugin_directories()
+inline size_t pluginsd_initialize_plugin_directories()
 {
     char plugins_dirs[(FILENAME_MAX * 2) + 1];
     static char *plugins_dir_list = NULL;

+ 10 - 3
collectors/plugins.d/plugins_d.h

@@ -11,6 +11,7 @@
 #define PLUGINSD_STOCK_PLUGINS_DIRECTORY_PATH 0
 
 #define PLUGINSD_KEYWORD_CHART                  "CHART"
+#define PLUGINSD_KEYWORD_CHART_DEFINITION_END   "CHART_DEFINITION_END"
 #define PLUGINSD_KEYWORD_DIMENSION              "DIMENSION"
 #define PLUGINSD_KEYWORD_BEGIN                  "BEGIN"
 #define PLUGINSD_KEYWORD_SET                    "SET"
@@ -29,12 +30,18 @@
 #define PLUGINSD_KEYWORD_CONTEXT                "CONTEXT"
 #define PLUGINSD_KEYWORD_TOMBSTONE              "TOMBSTONE"
 #define PLUGINSD_KEYWORD_HOST                   "HOST"
-//#define PLUGINSD_KEYWORD_GAPS_REQUEST           "GAPS_REQUEST" // child -> parent
-//#define PLUGINSD_KEYWORD_CHART_GAP              "CHART_GAP"    // parent <- child
+
+#define PLUGINSD_KEYWORD_REPLAY_CHART           "REPLAY_CHART"
+#define PLUGINSD_KEYWORD_REPLAY_BEGIN           "RBEGIN"
+#define PLUGINSD_KEYWORD_REPLAY_SET             "RSET"
+#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE    "RDSTATE"
+#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE    "RSSTATE"
+#define PLUGINSD_KEYWORD_REPLAY_END             "REND"
 
 #define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds
 
 #define PLUGINSD_LINE_MAX_SSL_READ 512
+
 #define PLUGINSD_MAX_WORDS 20
 
 #define PLUGINSD_MAX_DIRECTORIES 20
@@ -69,7 +76,7 @@ extern struct plugind *pluginsd_root;
 
 size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations);
 
-int pluginsd_initialize_plugin_directories();
+size_t pluginsd_initialize_plugin_directories();
 
 
 

+ 457 - 106
collectors/plugins.d/pluginsd_parser.c

@@ -4,10 +4,35 @@
 
 #define LOG_FUNCTIONS false
 
-PARSER_RC pluginsd_set(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+static int send_to_plugin(const char *txt, void *data) {
+    PARSER *parser = data;
+
+    if(!txt || !*txt)
+        return 0;
+
+#ifdef ENABLE_HTTPS
+    struct netdata_ssl *ssl = parser->ssl_output;
+    if(ssl) {
+        if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
+            size_t size = strlen(txt);
+            return SSL_write(ssl->conn, txt, (int)size);
+        }
+
+        error("cannot write to SSL connection - connection is not ready.");
+        return -1;
+    }
+#endif
+
+    FILE *fp = parser->output;
+    int ret = fprintf(fp, "%s", txt);
+    fflush(fp);
+    return ret;
+}
+
+PARSER_RC pluginsd_set(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
 {
-    char *dimension = words[1];
-    char *value = words[2];
+    char *dimension = get_word(words, num_words, 1);
+    char *value = get_word(words, num_words, 2);
 
     RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
     RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
@@ -47,10 +72,10 @@ disable:
     return PARSER_RC_ERROR;
 }
 
-PARSER_RC pluginsd_begin(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
 {
-    char *id = words[1];
-    char *microseconds_txt = words[2];
+    char *id = get_word(words, num_words, 1);
+    char *microseconds_txt = get_word(words, num_words, 2);
 
     RRDSET *st = NULL;
     RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
@@ -86,9 +111,11 @@ disable:
     return PARSER_RC_ERROR;
 }
 
-PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+PARSER_RC pluginsd_end(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
 {
     UNUSED(words);
+    UNUSED(num_words);
+
     RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
     RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
 
@@ -107,7 +134,7 @@ PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION  *plugins_actio
     return PARSER_RC_OK;
 }
 
-PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
 {
     RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
     if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) {
@@ -115,18 +142,18 @@ PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION  *plugins_act
         return PARSER_RC_OK;
     }
 
-    char *type = words[1];
-    char *name = words[2];
-    char *title = words[3];
-    char *units = words[4];
-    char *family = words[5];
-    char *context = words[6];
-    char *chart = words[7];
-    char *priority_s = words[8];
-    char *update_every_s = words[9];
-    char *options = words[10];
-    char *plugin = words[11];
-    char *module = words[12];
+    char *type = get_word(words, num_words, 1);
+    char *name = get_word(words, num_words, 2);
+    char *title = get_word(words, num_words, 3);
+    char *units = get_word(words, num_words, 4);
+    char *family = get_word(words, num_words, 5);
+    char *context = get_word(words, num_words, 6);
+    char *chart = get_word(words, num_words, 7);
+    char *priority_s = get_word(words, num_words, 8);
+    char *update_every_s = get_word(words, num_words, 9);
+    char *options = get_word(words, num_words, 10);
+    char *plugin = get_word(words, num_words, 11);
+    char *module = get_word(words, num_words, 12);
 
     // parse the id from type
     char *id = NULL;
@@ -231,14 +258,36 @@ PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION  *plugins_act
     return PARSER_RC_OK;
 }
 
-PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action)
 {
-    char *id = words[1];
-    char *name = words[2];
-    char *algorithm = words[3];
-    char *multiplier_s = words[4];
-    char *divisor_s = words[5];
-    char *options = words[6];
+    UNUSED(plugins_action);
+
+    long first_entry_child = str2l(get_word(words, num_words, 1));
+    long last_entry_child = str2l(get_word(words, num_words, 2));
+
+    PARSER_USER_OBJECT *user_object = (PARSER_USER_OBJECT *) user;
+
+    RRDHOST *host = user_object->host;
+    RRDSET *st = user_object->st;
+    if(unlikely(!host || !st)) {
+        error("REPLAY: received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " command without a chart. Disabling it.");
+        return PARSER_RC_ERROR;
+    }
+
+    rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+
+    bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0);
+    return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
+}
+
+PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+{
+    char *id = get_word(words, num_words, 1);
+    char *name = get_word(words, num_words, 2);
+    char *algorithm = get_word(words, num_words, 3);
+    char *multiplier_s = get_word(words, num_words, 4);
+    char *divisor_s = get_word(words, num_words, 5);
+    char *options = get_word(words, num_words, 6);
 
     RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
     RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
@@ -341,16 +390,18 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
     struct inflight_function *pf = func;
 
     PARSER  *parser = parser_ptr;
-    FILE *fp = parser->output;
 
     // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
     pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
 
+    char buffer[2048 + 1];
+    snprintfz(buffer, 2048, "FUNCTION %s %d \"%s\"\n",
+                      dictionary_acquired_item_name(item),
+                      pf->timeout,
+                      string2str(pf->function));
+
     // send the command to the plugin
-    int ret = fprintf(fp, "FUNCTION %s %d \"%s\"\n",
-            dictionary_acquired_item_name(item),
-            pf->timeout,
-            string2str(pf->function));
+    int ret = send_to_plugin(buffer, parser);
 
     pf->sent_ut = now_realtime_usec();
 
@@ -359,11 +410,9 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
         rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
     }
     else {
-        fflush(fp);
-
         internal_error(LOG_FUNCTIONS,
-                       "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, fd %d, in %llu usec)",
-                       string2str(pf->function), dictionary_acquired_item_name(item), ret, fileno(fp),
+                       "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)",
+                       string2str(pf->function), dictionary_acquired_item_name(item), ret,
                        pf->sent_ut - pf->started_ut);
     }
 }
@@ -461,18 +510,18 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou
     return HTTP_RESP_OK;
 }
 
-PARSER_RC pluginsd_function(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+PARSER_RC pluginsd_function(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
 {
     bool global = false;
-    int i = 1;
-    if(strcmp(words[i], "GLOBAL") == 0) {
+    size_t i = 1;
+    if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
         i++;
         global = true;
     }
 
-    char *name      = words[i++];
-    char *timeout_s = words[i++];
-    char *help      = words[i++];
+    char *name      = get_word(words, num_words, i++);
+    char *timeout_s = get_word(words, num_words, i++);
+    char *help      = get_word(words, num_words, i++);
 
     RRDSET *st = (global)?NULL:((PARSER_USER_OBJECT *) user)->st;
     RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
@@ -508,12 +557,12 @@ static void pluginsd_function_result_end(struct parser *parser, void *action_dat
     string_freez(key);
 }
 
-PARSER_RC pluginsd_function_result_begin(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
 {
-    char *key = words[1];
-    char *status = words[2];
-    char *format = words[3];
-    char *expires = words[4];
+    char *key = get_word(words, num_words, 1);
+    char *status = get_word(words, num_words, 2);
+    char *format = get_word(words, num_words, 3);
+    char *expires = get_word(words, num_words, 4);
 
     if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) {
         error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
@@ -564,10 +613,10 @@ PARSER_RC pluginsd_function_result_begin(char **words, void *user, PLUGINSD_ACTI
 
 // ----------------------------------------------------------------------------
 
-PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
 {
-    char *name = words[1];
-    char *value = words[2];
+    char *name = get_word(words, num_words, 1);
+    char *value = get_word(words, num_words, 2);
     NETDATA_DOUBLE v;
 
     RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
@@ -578,12 +627,12 @@ PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION  *plugins_
     if (name && *name) {
         if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
             global = 1;
-            name = words[2];
-            value = words[3];
+            name = get_word(words, num_words, 2);
+            value = get_word(words, num_words, 3);
         } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
             global = 0;
-            name = words[2];
-            value = words[3];
+            name = get_word(words, num_words, 2);
+            value = get_word(words, num_words, 3);
         }
     }
 
@@ -641,69 +690,78 @@ PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION  *plugins_
     return PARSER_RC_OK;
 }
 
-PARSER_RC pluginsd_flush(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
 {
-    UNUSED(words);
     debug(D_PLUGINSD, "requested a FLUSH");
     ((PARSER_USER_OBJECT *) user)->st = NULL;
+    ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+    ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+    ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+    ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
     return PARSER_RC_OK;
 }
 
-PARSER_RC pluginsd_disable(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused, PLUGINSD_ACTION  *plugins_action __maybe_unused)
 {
-    UNUSED(user);
-    UNUSED(words);
-
     info("called DISABLE. Disabling it.");
     ((PARSER_USER_OBJECT *) user)->enabled = 0;
     return PARSER_RC_ERROR;
 }
 
-PARSER_RC pluginsd_label(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+PARSER_RC pluginsd_label(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
 {
-    char *store;
+    const char *name = get_word(words, num_words, 1);
+    const char *label_source = get_word(words, num_words, 2);
+    const char *value = get_word(words, num_words, 3);
 
-    if (!words[1] || !words[2] || !words[3]) {
+    if (!name || !label_source || !value) {
         error("Ignoring malformed or empty LABEL command.");
         return PARSER_RC_OK;
     }
-    if (!words[4])
-        store = words[3];
-    else {
-        store = callocz(PLUGINSD_LINE_MAX + 1, sizeof(char));
+
+    char *store = (char *)value;
+    bool allocated_store = false;
+
+    if(unlikely(num_words > 4)) {
+        allocated_store = true;
+        store = mallocz(PLUGINSD_LINE_MAX + 1);
         size_t remaining = PLUGINSD_LINE_MAX;
         char *move = store;
-        int i = 3;
-        while (i < PLUGINSD_MAX_WORDS) {
-            size_t length = strlen(words[i]);
-            if ((length + 1) >= remaining)
-                break;
-
-            remaining -= (length + 1);
-            memcpy(move, words[i], length);
+        char *word;
+        for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
+            if(i > 3) {
+                *move++ = ' ';
+                *move = '\0';
+                remaining--;
+            }
+
+            size_t length = strlen(word);
+            if (length > remaining)
+                length = remaining;
+
+            remaining -= length;
+            memcpy(move, word, length);
             move += length;
-            *move++ = ' ';
-
-            i++;
-            if (!words[i])
-                break;
+            *move = '\0';
         }
     }
 
     if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels))
         ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create();
 
-    rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels, words[1], store, strtol(words[2], NULL, 10));
+    rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels,
+                  name,
+                  store,
+                  str2l(label_source));
 
-    if (store != words[3])
+    if (allocated_store)
         freez(store);
+
     return PARSER_RC_OK;
 }
 
-PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
 {
-    UNUSED(words);
-
     RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
     debug(D_PLUGINSD, "requested to OVERWRITE host labels");
 
@@ -719,9 +777,13 @@ PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION  *plugins
 }
 
 
-PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
 {
-    if (!words[1] || !words[2] || !words[3]) {
+    const char *name = get_word(words, num_words, 1);
+    const char *value = get_word(words, num_words, 2);
+    const char *label_source = get_word(words, num_words, 3);
+
+    if (!name || !value || !*label_source) {
         error("Ignoring malformed or empty CHART LABEL command.");
         return PARSER_RC_OK;
     }
@@ -731,15 +793,14 @@ PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION  *plugins_ac
         rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
     }
 
-    rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily, words[1], words[2], strtol(words[3], NULL, 10));
+    rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily,
+                  name, value, str2l(label_source));
 
     return PARSER_RC_OK;
 }
 
-PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
+PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user, PLUGINSD_ACTION  *plugins_action __maybe_unused)
 {
-    UNUSED(words);
-
     RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
     RRDSET *st = ((PARSER_USER_OBJECT *)user)->st;
 
@@ -762,9 +823,9 @@ PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION  *plu
     return PARSER_RC_OK;
 }
 
-PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_guid(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
 {
-    char *uuid_str = words[1];
+    char *uuid_str = get_word(words, num_words, 1);
     uuid_t uuid;
 
     if (unlikely(!uuid_str)) {
@@ -784,9 +845,9 @@ PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_actio
     return PARSER_RC_OK;
 }
 
-PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_context(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
 {
-    char *uuid_str = words[1];
+    char *uuid_str = get_word(words, num_words, 1);
     uuid_t uuid;
 
     if (unlikely(!uuid_str)) {
@@ -806,9 +867,9 @@ PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_ac
     return PARSER_RC_OK;
 }
 
-PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_tombstone(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
 {
-    char *uuid_str = words[1];
+    char *uuid_str = get_word(words, num_words, 1);
     uuid_t uuid;
 
     if (unlikely(!uuid_str)) {
@@ -828,15 +889,15 @@ PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_
     return PARSER_RC_OK;
 }
 
-PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION  *plugins_action)
+PARSER_RC metalog_pluginsd_host(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action)
 {
-    char *machine_guid = words[1];
-    char *hostname = words[2];
-    char *registry_hostname = words[3];
-    char *update_every_s = words[4];
-    char *os = words[5];
-    char *timezone = words[6];
-    char *tags = words[7];
+    char *machine_guid = get_word(words, num_words, 1);
+    char *hostname = get_word(words, num_words, 2);
+    char *registry_hostname = get_word(words, num_words, 3);
+    char *update_every_s = get_word(words, num_words, 4);
+    char *os = get_word(words, num_words, 5);
+    char *timezone = get_word(words, num_words, 6);
+    char *tags = get_word(words, num_words, 7);
 
     int update_every = 1;
     if (likely(update_every_s && *update_every_s))
@@ -855,6 +916,296 @@ PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION  *plug
     return PARSER_RC_OK;
 }
 
+PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+{
+    char *id = get_word(words, num_words, 1);
+    char *start_time_str = get_word(words, num_words, 2);
+    char *end_time_str = get_word(words, num_words, 3);
+
+    RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
+    RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
+
+    if (unlikely(!id || (!st && !*id))) {
+        error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host));
+        goto disable;
+    }
+
+    if(*id) {
+        st = rrdset_find(host, id);
+        if (unlikely(!st)) {
+            error("requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s', which does not exist on host '%s'. Disabling it.",
+                  id, rrdhost_hostname(host));
+            goto disable;
+        }
+
+        ((PARSER_USER_OBJECT *) user)->st = st;
+        ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+        ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+        ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+        ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+    }
+
+    if(start_time_str && end_time_str) {
+        time_t start_time = strtol(start_time_str, NULL, 0);
+        time_t end_time = strtol(end_time_str, NULL, 0);
+
+        if(start_time && end_time) {
+            if (start_time > end_time) {
+                error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', but timings are invalid (%ld to %ld). Disabling it.",
+                      rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time);
+                goto disable;
+            }
+
+            if (end_time - start_time != st->update_every)
+                rrdset_set_update_every(st, end_time - start_time);
+
+            st->last_collected_time.tv_sec = end_time;
+            st->last_collected_time.tv_usec = 0;
+
+            st->last_updated.tv_sec = end_time;
+            st->last_updated.tv_usec = 0;
+
+            ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time;
+            ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time;
+            ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
+            ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
+
+            st->counter++;
+            st->counter_done++;
+
+            // these are only needed for db mode RAM, SAVE, MAP, ALLOC
+            st->current_entry++;
+            if(st->current_entry >= st->entries)
+                st->current_entry -= st->entries;
+        }
+    }
+
+    return PARSER_RC_OK;
+
+disable:
+    ((PARSER_USER_OBJECT *)user)->enabled = 0;
+    return PARSER_RC_ERROR;
+}
+
+PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+{
+    char *dimension = get_word(words, num_words, 1);
+    char *value_str = get_word(words, num_words, 2);
+    char *flags_str = get_word(words, num_words, 3);
+
+    RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
+    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+
+    if (unlikely(!st)) {
+        error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+              dimension, rrdhost_hostname(host));
+        goto disable;
+    }
+
+    if (unlikely(!dimension || !*dimension)) {
+        error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on chart '%s' of host '%s', without a dimension. Disabling it.",
+              rrdset_id(st), rrdhost_hostname(host));
+        goto disable;
+    }
+
+    if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) {
+        error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without timings from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+              dimension, rrdhost_hostname(host));
+        goto disable;
+    }
+
+    if (unlikely(!value_str || !*value_str))
+        value_str = "nan";
+
+    if(unlikely(!flags_str))
+        flags_str = "";
+
+    if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+        debug(D_PLUGINSD, "REPLAY: is replaying dimension '%s'/'%s' to '%s'", rrdset_id(st), dimension, value_str);
+
+    if (likely(value_str)) {
+        RRDDIM *rd = rrddim_find(st, dimension);
+        if(unlikely(!rd)) {
+            error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
+                  dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
+            goto disable;
+        }
+        else {
+            NETDATA_DOUBLE value = strtondd(value_str, NULL);
+            SN_FLAGS flags = SN_FLAG_NONE;
+
+            char c;
+            while((c = *flags_str++)) {
+                switch(c) {
+                    case 'R':
+                        flags |= SN_FLAG_RESET;
+                        break;
+
+                    case 'E':
+                        flags |= SN_EMPTY_SLOT;
+                        value = NAN;
+                        break;
+
+                    default:
+                        error("unknown flag '%c'", c);
+                        break;
+                }
+            }
+
+            if(!netdata_double_isnumber(value)) {
+                value = NAN;
+                flags = SN_EMPTY_SLOT;
+            }
+
+            rrddim_store_metric(rd, ((PARSER_USER_OBJECT *) user)->replay.end_time_ut, value, flags);
+            rd->last_collected_time.tv_sec = ((PARSER_USER_OBJECT *) user)->replay.end_time;
+            rd->last_collected_time.tv_usec = 0;
+            rd->collections_counter++;
+        }
+    }
+    return PARSER_RC_OK;
+
+disable:
+    ((PARSER_USER_OBJECT *) user)->enabled = 0;
+    return PARSER_RC_ERROR;
+}
+
+PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+{
+    char *dimension = get_word(words, num_words, 1);
+    char *last_collected_ut_str = get_word(words, num_words, 2);
+    char *last_collected_value_str = get_word(words, num_words, 3);
+    char *last_calculated_value_str = get_word(words, num_words, 4);
+    char *last_stored_value_str = get_word(words, num_words, 5);
+
+    RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
+    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+
+    if (unlikely(!st)) {
+        error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+              dimension, rrdhost_hostname(host));
+        goto disable;
+    }
+
+    if (unlikely(!dimension || !*dimension)) {
+        error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on chart '%s' of host '%s', without a dimension. Disabling it.",
+              rrdset_id(st), rrdhost_hostname(host));
+        goto disable;
+    }
+
+    RRDDIM *rd = rrddim_find(st, dimension);
+    if(unlikely(!rd)) {
+        error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
+              dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
+        goto disable;
+    }
+
+    usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec;
+    usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
+    if(last_collected_ut > dim_last_collected_ut) {
+        rd->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC;
+        rd->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC;
+    }
+
+    rd->last_collected_value = last_collected_value_str ? str2ll(last_collected_value_str, NULL) : 0;
+    rd->last_calculated_value = last_calculated_value_str ? str2ndd(last_calculated_value_str, NULL) : 0;
+    rd->last_stored_value = last_stored_value_str ? str2ndd(last_stored_value_str, NULL) : 0.0;
+    return PARSER_RC_OK;
+
+disable:
+    ((PARSER_USER_OBJECT *) user)->enabled = 0;
+    return PARSER_RC_ERROR;
+}
+
+PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+{
+    char *last_collected_ut_str = get_word(words, num_words, 1);
+    char *last_updated_ut_str = get_word(words, num_words, 2);
+    char *last_collected_total_str = get_word(words, num_words, 3);
+    char *collected_total_str = get_word(words, num_words, 4);
+
+    RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
+    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+
+    if (unlikely(!st)) {
+        error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+              rrdhost_hostname(host));
+        goto disable;
+    }
+
+    usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
+    usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
+    if(last_collected_ut > chart_last_collected_ut) {
+        st->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC;
+        st->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC;
+    }
+
+    usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
+    usec_t last_updated_ut = last_updated_ut_str ? str2ull(last_updated_ut_str) : 0;
+    if(last_updated_ut > chart_last_updated_ut) {
+        st->last_updated.tv_sec = last_updated_ut / USEC_PER_SEC;
+        st->last_updated.tv_usec = last_updated_ut % USEC_PER_SEC;
+    }
+
+    st->last_collected_total = last_collected_total_str ? strtoll(last_collected_total_str, NULL, 0) : 0;
+    st->collected_total = collected_total_str ? strtoll(collected_total_str, NULL, 0) : 0;
+
+    st->counter++;
+    st->counter_done++;
+
+    return PARSER_RC_OK;
+
+disable:
+    ((PARSER_USER_OBJECT *) user)->enabled = 0;
+    return PARSER_RC_ERROR;
+}
+
+PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+{
+    if (num_words < 7) {
+        error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
+        return PARSER_RC_ERROR;
+    }
+
+    time_t update_every_child = str2l(get_word(words, num_words, 1));
+    time_t first_entry_child = str2l(get_word(words, num_words, 2));
+    time_t last_entry_child = str2l(get_word(words, num_words, 3));
+
+    bool start_streaming = (strcmp(get_word(words, num_words, 4), "true") == 0);
+    time_t first_entry_requested = str2l(get_word(words, num_words, 5));
+    time_t last_entry_requested = str2l(get_word(words, num_words, 6));
+
+    PARSER_USER_OBJECT *user_object = user;
+
+    RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
+    RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+
+    if (unlikely(!st)) {
+        error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_END " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+              rrdhost_hostname(host));
+        return PARSER_RC_ERROR;
+    }
+
+    ((PARSER_USER_OBJECT *) user)->st = NULL;
+    ((PARSER_USER_OBJECT *) user)->count++;
+
+    st->counter++;
+    st->counter_done++;
+
+    if (start_streaming) {
+        if (st->update_every != update_every_child)
+            rrdset_set_update_every(st, update_every_child);
+
+        rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+        rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
+        return PARSER_RC_OK;
+    }
+
+    bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child,
+                                      first_entry_requested, last_entry_requested);
+    return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
+}
+
 static void pluginsd_process_thread_cleanup(void *ptr) {
     PARSER *parser = (PARSER *)ptr;
     rrd_collector_finished();
@@ -895,7 +1246,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
     };
 
     // fp_plugin_output = our input; fp_plugin_input = our output
-    PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, PARSER_INPUT_SPLIT);
+    PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, PARSER_INPUT_SPLIT, NULL);
 
     rrd_collector_started();
 

+ 11 - 5
collectors/plugins.d/pluginsd_parser.h

@@ -5,7 +5,6 @@
 
 #include "parser/parser.h"
 
-
 typedef struct parser_user_object {
     PARSER  *parser;
     RRDSET *st;
@@ -20,10 +19,17 @@ typedef struct parser_user_object {
     uint8_t st_exists;
     uint8_t host_exists;
     void *private; // the user can set this for private use
+
+    struct {
+        time_t start_time;
+        time_t end_time;
+
+        usec_t start_time_ut;
+        usec_t end_time_ut;
+    } replay;
 } PARSER_USER_OBJECT;
 
-PARSER_RC pluginsd_function(char **words, void *user, PLUGINSD_ACTION  *plugins_action);
-PARSER_RC pluginsd_function_result_begin(char **words, void *user, PLUGINSD_ACTION  *plugins_action);
+PARSER_RC pluginsd_function(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action);
+PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user, PLUGINSD_ACTION  *plugins_action);
 void inflight_functions_init(PARSER *parser);
-
-#endif //NETDATA_PLUGINSD_PARSER_H
+#endif //NETDATA_PLUGINSD_PARSER_H

+ 9 - 9
collectors/statsd.plugin/statsd.c

@@ -1469,23 +1469,23 @@ static int statsd_readfile(const char *filename, STATSD_APP *app, STATSD_APP_CHA
             }
             else if (!strcmp(name, "dimension")) {
                 // metric [name [type [multiplier [divisor]]]]
-                char *words[10];
-                pluginsd_split_words(value, words, 10, NULL, NULL, 0);
+                char *words[10] = { NULL };
+                size_t num_words = pluginsd_split_words(value, words, 10, NULL, NULL, 0);
 
                 int pattern = 0;
                 size_t i = 0;
-                char *metric_name   = words[i++];
+                char *metric_name   = get_word(words, num_words, i++);
 
                 if(strcmp(metric_name, "pattern") == 0) {
-                    metric_name = words[i++];
+                    metric_name = get_word(words, num_words, i++);
                     pattern = 1;
                 }
 
-                char *dim_name   = words[i++];
-                char *type       = words[i++];
-                char *multiplier = words[i++];
-                char *divisor    = words[i++];
-                char *opts       = words[i++];
+                char *dim_name   = get_word(words, num_words, i++);
+                char *type       = get_word(words, num_words, i++);
+                char *multiplier = get_word(words, num_words, i++);
+                char *divisor    = get_word(words, num_words, i++);
+                char *opts       = get_word(words, num_words, i++);
 
                 RRDDIM_FLAGS flags = RRDDIM_FLAG_NONE;
                 RRDDIM_OPTIONS options = RRDDIM_OPTION_NONE;

+ 0 - 2
collectors/tc.plugin/plugin_tc.c

@@ -805,8 +805,6 @@ static inline struct tc_class *tc_class_add(struct tc_device *n, char *id, bool
 //    tc_device_index_del(d);
 //}
 
-#define PLUGINSD_MAX_WORDS 20
-
 static inline int tc_space(char c) {
     switch(c) {
     case ' ':

+ 6 - 4
daemon/static_threads_macos.c

@@ -12,18 +12,20 @@ const struct netdata_static_thread static_threads_macos[] = {
         .enabled = 1,
         .thread = NULL,
         .init_routine = NULL,
-        .start_routine = macos_main
+        .start_routine = macos_main,
+        .env_name = NULL,
+        .global_variable = NULL,
     },
 
-    {NULL, NULL, NULL, 0, NULL, NULL, NULL}
+    {NULL, NULL, NULL, 0, NULL, NULL, NULL, NULL, NULL}
 };
 
 const struct netdata_static_thread static_threads_freebsd[] = {
-    {NULL, NULL, NULL, 0, NULL, NULL, NULL}
+    {NULL, NULL, NULL, 0, NULL, NULL, NULL, NULL, NULL}
 };
 
 const struct netdata_static_thread static_threads_linux[] = {
-    {NULL, NULL, NULL, 0, NULL, NULL, NULL}
+    {NULL, NULL, NULL, 0, NULL, NULL, NULL, NULL, NULL}
 };
 
 struct netdata_static_thread *static_threads_get() {

Some files were not shown because too many files changed in this diff