123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395 |
- // SPDX-License-Identifier: GPL-3.0-or-later
- #include "pluginsd_internals.h"
- static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) {
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
- char *dimension = get_word(words, num_words, idx++);
- char *value = get_word(words, num_words, idx++);
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET);
- if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- st->pluginsd.set = true;
- if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- netdata_log_debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
- rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
- if (value && *value)
- rrddim_set_by_pointer(st, rd, str2ll_encoded(value));
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) {
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
- char *id = get_word(words, num_words, idx++);
- char *microseconds_txt = get_word(words, num_words, idx++);
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN))
- return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- usec_t microseconds = 0;
- if (microseconds_txt && *microseconds_txt) {
- long long t = str2ll(microseconds_txt, NULL);
- if(t >= 0)
- microseconds = t;
- }
- #ifdef NETDATA_LOG_REPLICATION_REQUESTS
- if(st->replay.log_next_data_collection) {
- st->replay.log_next_data_collection = false;
- internal_error(true,
- "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu",
- rrdhost_hostname(host), rrdset_id(st),
- st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec,
- st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec,
- microseconds
- );
- }
- #endif
- if (likely(st->counter_done)) {
- if (likely(microseconds)) {
- if (parser->user.trust_durations)
- rrdset_next_usec_unfiltered(st, microseconds);
- else
- rrdset_next_usec(st, microseconds);
- }
- else
- rrdset_next(st);
- }
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) {
- char *tv_sec = get_word(words, num_words, 1);
- char *tv_usec = get_word(words, num_words, 2);
- char *pending_rrdset_next = get_word(words, num_words, 3);
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- netdata_log_debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
- pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_END);
- parser->user.data_collections_count++;
- struct timeval tv = {
- .tv_sec = (tv_sec && *tv_sec) ? str2ll(tv_sec, NULL) : 0,
- .tv_usec = (tv_usec && *tv_usec) ? str2ll(tv_usec, NULL) : 0
- };
-
- if(!tv.tv_sec)
- now_realtime_timeval(&tv);
- rrdset_timed_done(st, tv, pending_rrdset_next && *pending_rrdset_next ? true : false);
- return PARSER_RC_OK;
- }
- static void pluginsd_host_define_cleanup(PARSER *parser) {
- string_freez(parser->user.host_define.hostname);
- rrdlabels_destroy(parser->user.host_define.rrdlabels);
- parser->user.host_define.hostname = NULL;
- parser->user.host_define.rrdlabels = NULL;
- parser->user.host_define.parsing_host = false;
- }
- static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) {
- if(uuid_parse(guid, *uuid))
- return false;
- uuid_unparse_lower(*uuid, output);
- return true;
- }
- static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PARSER *parser) {
- char *guid = get_word(words, num_words, 1);
- char *hostname = get_word(words, num_words, 2);
- if(unlikely(!guid || !*guid || !hostname || !*hostname))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
- if(unlikely(parser->user.host_define.parsing_host))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE,
- "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?");
- if(!pluginsd_validate_machine_guid(guid, &parser->user.host_define.machine_guid, parser->user.host_define.machine_guid_str))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
- parser->user.host_define.hostname = string_strdupz(hostname);
- parser->user.host_define.rrdlabels = rrdlabels_create();
- parser->user.host_define.parsing_host = true;
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, RRDLABELS *labels, const char *keyword) {
- char *name = get_word(words, num_words, 1);
- char *value = get_word(words, num_words, 2);
- if(!name || !*name || !value)
- return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters");
- if(!parser->user.host_define.parsing_host || !labels)
- return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
- rrdlabels_add(labels, name, value, RRDLABEL_SRC_CONFIG);
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_host_labels(char **words, size_t num_words, PARSER *parser) {
- return pluginsd_host_dictionary(words, num_words, parser,
- parser->user.host_define.rrdlabels,
- PLUGINSD_KEYWORD_HOST_LABEL);
- }
- static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
- if(!parser->user.host_define.parsing_host)
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
- RRDHOST *host = rrdhost_find_or_create(
- string2str(parser->user.host_define.hostname),
- string2str(parser->user.host_define.hostname),
- parser->user.host_define.machine_guid_str,
- "Netdata Virtual Host 1.0",
- netdata_configured_timezone,
- netdata_configured_abbrev_timezone,
- netdata_configured_utc_offset,
- NULL,
- program_name,
- program_version,
- default_rrd_update_every,
- default_rrd_history_entries,
- default_rrd_memory_mode,
- default_health_enabled,
- default_rrdpush_enabled,
- default_rrdpush_destination,
- default_rrdpush_api_key,
- default_rrdpush_send_charts_matching,
- default_rrdpush_enable_replication,
- default_rrdpush_seconds_to_replicate,
- default_rrdpush_replication_step,
- rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
- false);
- rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
- dyncfg_host_init(host);
- if(host->rrdlabels) {
- rrdlabels_migrate_to_these(host->rrdlabels, parser->user.host_define.rrdlabels);
- }
- else {
- host->rrdlabels = parser->user.host_define.rrdlabels;
- parser->user.host_define.rrdlabels = NULL;
- }
- pluginsd_host_define_cleanup(parser);
- parser->user.host = host;
- pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END);
- rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
- rrdcontext_host_child_connected(host);
- schedule_node_info_update(host);
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_host(char **words, size_t num_words, PARSER *parser) {
- char *guid = get_word(words, num_words, 1);
- if(!guid || !*guid || strcmp(guid, "localhost") == 0) {
- parser->user.host = localhost;
- return PARSER_RC_OK;
- }
- uuid_t uuid;
- char uuid_str[UUID_STR_LEN];
- if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
- RRDHOST *host = rrdhost_find_by_guid(uuid_str);
- if(unlikely(!host))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
- parser->user.host = host;
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *parser) {
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
- char *type = get_word(words, num_words, idx++);
- char *name = get_word(words, num_words, idx++);
- char *title = get_word(words, num_words, idx++);
- char *units = get_word(words, num_words, idx++);
- char *family = get_word(words, num_words, idx++);
- char *context = get_word(words, num_words, idx++);
- char *chart = get_word(words, num_words, idx++);
- char *priority_s = get_word(words, num_words, idx++);
- char *update_every_s = get_word(words, num_words, idx++);
- char *options = get_word(words, num_words, idx++);
- char *plugin = get_word(words, num_words, idx++);
- char *module = get_word(words, num_words, idx++);
- // parse the id from type
- char *id = NULL;
- if (likely(type && (id = strchr(type, '.')))) {
- *id = '\0';
- id++;
- }
- // make sure we have the required variables
- if (unlikely((!type || !*type || !id || !*id)))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_CHART, "missing parameters");
- // parse the name, and make sure it does not include 'type.'
- if (unlikely(name && *name)) {
- // when data are streamed from child nodes
- // name will be type.name
- // so, we have to remove 'type.' from name too
- size_t len = strlen(type);
- if (strncmp(type, name, len) == 0 && name[len] == '.')
- name = &name[len + 1];
- // if the name is the same with the id,
- // or is just 'NULL', clear it.
- if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0))
- name = NULL;
- }
- int priority = 1000;
- if (likely(priority_s && *priority_s))
- priority = str2i(priority_s);
- int update_every = parser->user.cd->update_every;
- if (likely(update_every_s && *update_every_s))
- update_every = str2i(update_every_s);
- if (unlikely(!update_every))
- update_every = parser->user.cd->update_every;
- RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
- if (unlikely(chart))
- chart_type = rrdset_type_id(chart);
- if (unlikely(name && !*name))
- name = NULL;
- if (unlikely(family && !*family))
- family = NULL;
- if (unlikely(context && !*context))
- context = NULL;
- if (unlikely(!title))
- title = "";
- if (unlikely(!units))
- units = "unknown";
- netdata_log_debug(
- D_PLUGINSD,
- "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
- type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
- priority, update_every);
- RRDSET *st = NULL;
- st = rrdset_create(
- host, type, id, name, family, context, title, units,
- (plugin && *plugin) ? plugin : parser->user.cd->filename,
- module, priority, update_every,
- chart_type);
- bool obsolete = false;
- if (likely(st)) {
- if (options && *options) {
- if (strstr(options, "obsolete")) {
- rrdset_is_obsolete___safe_from_collector_thread(st);
- obsolete = true;
- }
- else
- rrdset_isnot_obsolete___safe_from_collector_thread(st);
- if (strstr(options, "detail"))
- rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
- else
- rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
- if (strstr(options, "hidden"))
- rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
- else
- rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
- if (strstr(options, "store_first"))
- rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
- else
- rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
- }
- else {
- rrdset_isnot_obsolete___safe_from_collector_thread(st);
- rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
- rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
- }
- if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_CHART))
- return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- pluginsd_rrdset_cache_put_to_slot(parser, st, slot, obsolete);
- }
- else
- pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_CHART);
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) {
- const char *first_entry_txt = get_word(words, num_words, 1);
- const char *last_entry_txt = get_word(words, num_words, 2);
- const char *wall_clock_time_txt = get_word(words, num_words, 3);
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
- time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
- time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec();
- bool ok = true;
- if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
- #ifdef NETDATA_LOG_REPLICATION_REQUESTS
- st->replay.start_streaming = false;
- st->replay.after = 0;
- st->replay.before = 0;
- #endif
- rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
- rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
- ok = replicate_chart_request(send_to_plugin, parser, host, st,
- first_entry_child, last_entry_child, child_wall_clock_time,
- 0, 0);
- }
- #ifdef NETDATA_LOG_REPLICATION_REQUESTS
- else {
- internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request",
- rrdhost_hostname(st->rrdhost), rrdset_id(st));
- }
- #endif
- return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
- }
- static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) {
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
- char *id = get_word(words, num_words, idx++);
- char *name = get_word(words, num_words, idx++);
- char *algorithm = get_word(words, num_words, idx++);
- char *multiplier_s = get_word(words, num_words, idx++);
- char *divisor_s = get_word(words, num_words, idx++);
- char *options = get_word(words, num_words, idx++);
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_DIMENSION);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- if (unlikely(!id))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
- long multiplier = 1;
- if (multiplier_s && *multiplier_s) {
- multiplier = str2ll_encoded(multiplier_s);
- if (unlikely(!multiplier))
- multiplier = 1;
- }
- long divisor = 1;
- if (likely(divisor_s && *divisor_s)) {
- divisor = str2ll_encoded(divisor_s);
- if (unlikely(!divisor))
- divisor = 1;
- }
- if (unlikely(!algorithm || !*algorithm))
- algorithm = "absolute";
- if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- netdata_log_debug(
- D_PLUGINSD,
- "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
- rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
- options ? options : "");
- RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
- int unhide_dimension = 1;
- rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
- bool obsolete = false;
- if (options && *options) {
- if (strstr(options, "obsolete") != NULL) {
- obsolete = true;
- rrddim_is_obsolete___safe_from_collector_thread(st, rd);
- }
- else
- rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
- unhide_dimension = !strstr(options, "hidden");
- if (strstr(options, "noreset") != NULL)
- rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
- if (strstr(options, "nooverflow") != NULL)
- rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
- }
- else
- rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
- bool should_update_dimension = false;
- if (likely(unhide_dimension)) {
- rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
- should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
- }
- else {
- rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
- should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
- }
- if (should_update_dimension) {
- rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE);
- rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
- }
- pluginsd_rrddim_put_to_slot(parser, st, rd, slot, obsolete);
- return PARSER_RC_OK;
- }
- // ----------------------------------------------------------------------------
- static inline PARSER_RC pluginsd_variable(char **words, size_t num_words, PARSER *parser) {
- char *name = get_word(words, num_words, 1);
- char *value = get_word(words, num_words, 2);
- NETDATA_DOUBLE v;
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_VARIABLE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_get_scope_chart(parser);
- int global = (st) ? 0 : 1;
- if (name && *name) {
- if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
- global = 1;
- name = get_word(words, num_words, 2);
- value = get_word(words, num_words, 3);
- } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
- global = 0;
- name = get_word(words, num_words, 2);
- value = get_word(words, num_words, 3);
- }
- }
- if (unlikely(!name || !*name))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
- if (unlikely(!value || !*value))
- value = NULL;
- if (unlikely(!value)) {
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- (global) ? "HOST" : "CHART",
- name);
- return PARSER_RC_OK;
- }
- if (!global && !st)
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
- char *endptr = NULL;
- v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr);
- if (unlikely(endptr && *endptr)) {
- if (endptr == value)
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- value,
- name);
- else
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- value,
- name,
- endptr);
- }
- if (global) {
- const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name);
- if (rva) {
- rrdvar_custom_host_variable_set(host, rva, v);
- rrdvar_custom_host_variable_release(host, rva);
- }
- else
- netdata_log_error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
- rrdhost_hostname(host),
- name);
- } else {
- const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
- if (rsa) {
- rrdsetvar_custom_chart_variable_set(st, rsa, v);
- rrdsetvar_custom_chart_variable_release(st, rsa);
- }
- else
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
- rrdhost_hostname(host), rrdset_id(st), name);
- }
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
- netdata_log_debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
- pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_FLUSH);
- parser->user.replay.start_time = 0;
- parser->user.replay.end_time = 0;
- parser->user.replay.start_time_ut = 0;
- parser->user.replay.end_time_ut = 0;
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
- netdata_log_info("PLUGINSD: plugin called DISABLE. Disabling it.");
- parser->user.enabled = 0;
- return PARSER_RC_STOP;
- }
- static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *parser) {
- const char *name = get_word(words, num_words, 1);
- const char *label_source = get_word(words, num_words, 2);
- const char *value = get_word(words, num_words, 3);
- if (!name || !label_source || !value)
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_LABEL, "missing parameters");
- char *store = (char *)value;
- bool allocated_store = false;
- if(unlikely(num_words > 4)) {
- allocated_store = true;
- store = mallocz(PLUGINSD_LINE_MAX + 1);
- size_t remaining = PLUGINSD_LINE_MAX;
- char *move = store;
- char *word;
- for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
- if(i > 3) {
- *move++ = ' ';
- *move = '\0';
- remaining--;
- }
- size_t length = strlen(word);
- if (length > remaining)
- length = remaining;
- remaining -= length;
- memcpy(move, word, length);
- move += length;
- *move = '\0';
- }
- }
- if(unlikely(!(parser->user.new_host_labels)))
- parser->user.new_host_labels = rrdlabels_create();
- if (strcmp(name,HOST_LABEL_IS_EPHEMERAL) == 0) {
- int is_ephemeral = appconfig_test_boolean_value((char *) value);
- if (is_ephemeral) {
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_LABEL);
- if (likely(host))
- rrdhost_option_set(host, RRDHOST_OPTION_EPHEMERAL_HOST);
- }
- }
- rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source));
- if (allocated_store)
- freez(store);
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_OVERWRITE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- netdata_log_debug(D_PLUGINSD, "requested to OVERWRITE host labels");
- if(unlikely(!host->rrdlabels))
- host->rrdlabels = rrdlabels_create();
- rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels);
- if (rrdhost_option_check(host, RRDHOST_OPTION_EPHEMERAL_HOST))
- rrdlabels_add(host->rrdlabels, HOST_LABEL_IS_EPHEMERAL, "true", RRDLABEL_SRC_CONFIG);
- rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
- rrdlabels_destroy(parser->user.new_host_labels);
- parser->user.new_host_labels = NULL;
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *parser) {
- const char *name = get_word(words, num_words, 1);
- const char *value = get_word(words, num_words, 2);
- const char *label_source = get_word(words, num_words, 3);
- if (!name || !value || !label_source) {
- netdata_log_error("Ignoring malformed or empty CHART LABEL command.");
- return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- }
- if(unlikely(!parser->user.chart_rrdlabels_linked_temporarily)) {
- RRDSET *st = pluginsd_get_scope_chart(parser);
- parser->user.chart_rrdlabels_linked_temporarily = st->rrdlabels;
- rrdlabels_unmark_all(parser->user.chart_rrdlabels_linked_temporarily);
- }
- rrdlabels_add(parser->user.chart_rrdlabels_linked_temporarily, name, value, str2l(label_source));
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- netdata_log_debug(D_PLUGINSD, "requested to commit chart labels");
- if(!parser->user.chart_rrdlabels_linked_temporarily) {
- netdata_log_error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host));
- return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- }
- rrdlabels_remove_all_unmarked(parser->user.chart_rrdlabels_linked_temporarily);
- rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
- rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
- rrdset_metadata_updated(st);
- parser->user.chart_rrdlabels_linked_temporarily = NULL;
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) {
- timing_init();
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
- char *id = get_word(words, num_words, idx++);
- char *update_every_str = get_word(words, num_words, idx++);
- char *end_time_str = get_word(words, num_words, idx++);
- char *wall_clock_time_str = get_word(words, num_words, idx++);
- if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- timing_step(TIMING_STEP_BEGIN2_PREPARE);
- RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2))
- return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))
- rrdset_isnot_obsolete___safe_from_collector_thread(st);
- timing_step(TIMING_STEP_BEGIN2_FIND_CHART);
- // ------------------------------------------------------------------------
- // parse the parameters
- time_t update_every = (time_t) str2ull_encoded(update_every_str);
- time_t end_time = (time_t) str2ull_encoded(end_time_str);
- time_t wall_clock_time;
- if(likely(*wall_clock_time_str == '#'))
- wall_clock_time = end_time;
- else
- wall_clock_time = (time_t) str2ull_encoded(wall_clock_time_str);
- if (unlikely(update_every != st->update_every))
- rrdset_set_update_every_s(st, update_every);
- timing_step(TIMING_STEP_BEGIN2_PARSE);
- // ------------------------------------------------------------------------
- // prepare our state
- pluginsd_lock_rrdset_data_collection(parser);
- parser->user.v2.update_every = update_every;
- parser->user.v2.end_time = end_time;
- parser->user.v2.wall_clock_time = wall_clock_time;
- parser->user.v2.ml_locked = ml_chart_update_begin(st);
- timing_step(TIMING_STEP_BEGIN2_ML);
- // ------------------------------------------------------------------------
- // propagate it forward in v2
- if(!parser->user.v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
- parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time);
- if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) {
- // check receiver capabilities
- bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
- // check sender capabilities
- bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
- NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
- BUFFER *wb = parser->user.v2.stream_buffer.wb;
- buffer_need_bytes(wb, 1024);
- if(unlikely(parser->user.v2.stream_buffer.begin_v2_added))
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1);
- if(with_slots) {
- buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
- buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot);
- }
- buffer_fast_strcat(wb, " '", 2);
- buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
- buffer_fast_strcat(wb, "' ", 2);
- if(can_copy)
- buffer_strcat(wb, update_every_str);
- else
- buffer_print_uint64_encoded(wb, integer_encoding, update_every);
- buffer_fast_strcat(wb, " ", 1);
- if(can_copy)
- buffer_strcat(wb, end_time_str);
- else
- buffer_print_uint64_encoded(wb, integer_encoding, end_time);
- buffer_fast_strcat(wb, " ", 1);
- if(can_copy)
- buffer_strcat(wb, wall_clock_time_str);
- else
- buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
- buffer_fast_strcat(wb, "\n", 1);
- parser->user.v2.stream_buffer.last_point_end_time_s = end_time;
- parser->user.v2.stream_buffer.begin_v2_added = true;
- }
- timing_step(TIMING_STEP_BEGIN2_PROPAGATE);
- // ------------------------------------------------------------------------
- // store it
- st->last_collected_time.tv_sec = end_time;
- st->last_collected_time.tv_usec = 0;
- st->last_updated.tv_sec = end_time;
- st->last_updated.tv_usec = 0;
- st->counter++;
- st->counter_done++;
- // these are only needed for db mode RAM, ALLOC
- st->db.current_entry++;
- if(st->db.current_entry >= st->db.entries)
- st->db.current_entry -= st->db.entries;
- timing_step(TIMING_STEP_BEGIN2_STORE);
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) {
- timing_init();
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
- char *dimension = get_word(words, num_words, idx++);
- char *collected_str = get_word(words, num_words, idx++);
- char *value_str = get_word(words, num_words, idx++);
- char *flags_str = get_word(words, num_words, idx++);
- if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET_V2);
- if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- timing_step(TIMING_STEP_SET2_PREPARE);
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET_V2);
- if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- st->pluginsd.set = true;
- if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
- rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
- timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION);
- // ------------------------------------------------------------------------
- // parse the parameters
- collected_number collected_value = (collected_number) str2ll_encoded(collected_str);
- NETDATA_DOUBLE value;
- if(*value_str == '#')
- value = (NETDATA_DOUBLE)collected_value;
- else
- value = str2ndd_encoded(value_str, NULL);
- SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
- timing_step(TIMING_STEP_SET2_PARSE);
- // ------------------------------------------------------------------------
- // check value and ML
- if (unlikely(!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT))) {
- value = NAN;
- flags = SN_EMPTY_SLOT;
- if(parser->user.v2.ml_locked)
- ml_dimension_is_anomalous(rd, parser->user.v2.end_time, 0, false);
- }
- else if(parser->user.v2.ml_locked) {
- if (ml_dimension_is_anomalous(rd, parser->user.v2.end_time, value, true)) {
- // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
- flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS);
- }
- else
- flags |= SN_FLAG_NOT_ANOMALOUS;
- }
- timing_step(TIMING_STEP_SET2_ML);
- // ------------------------------------------------------------------------
- // propagate it forward in v2
- if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) {
- // check if receiver and sender have the same number parsing capabilities
- bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
- // check the sender capabilities
- bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
- NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
- NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
- BUFFER *wb = parser->user.v2.stream_buffer.wb;
- buffer_need_bytes(wb, 1024);
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1);
- if(with_slots) {
- buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
- buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
- }
- buffer_fast_strcat(wb, " '", 2);
- buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
- buffer_fast_strcat(wb, "' ", 2);
- if(can_copy)
- buffer_strcat(wb, collected_str);
- else
- buffer_print_int64_encoded(wb, integer_encoding, collected_value); // original v2 had hex
- buffer_fast_strcat(wb, " ", 1);
- if(can_copy)
- buffer_strcat(wb, value_str);
- else
- buffer_print_netdata_double_encoded(wb, doubles_encoding, value); // original v2 had decimal
- buffer_fast_strcat(wb, " ", 1);
- buffer_print_sn_flags(wb, flags, true);
- buffer_fast_strcat(wb, "\n", 1);
- }
- timing_step(TIMING_STEP_SET2_PROPAGATE);
- // ------------------------------------------------------------------------
- // store it
- rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags);
- rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time;
- rd->collector.last_collected_time.tv_usec = 0;
- rd->collector.last_collected_value = collected_value;
- rd->collector.last_stored_value = value;
- rd->collector.last_calculated_value = value;
- rd->collector.counter++;
- rrddim_set_updated(rd);
- timing_step(TIMING_STEP_SET2_STORE);
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
- timing_init();
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END_V2);
- if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- parser->user.data_collections_count++;
- timing_step(TIMING_STEP_END2_PREPARE);
- // ------------------------------------------------------------------------
- // propagate the whole chart update in v1
- if(unlikely(!parser->user.v2.stream_buffer.v2 && !parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb))
- rrdset_push_metrics_v1(&parser->user.v2.stream_buffer, st);
- timing_step(TIMING_STEP_END2_PUSH_V1);
- // ------------------------------------------------------------------------
- // unblock data collection
- pluginsd_unlock_previous_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, false);
- rrdcontext_collected_rrdset(st);
- store_metric_collection_completed();
- timing_step(TIMING_STEP_END2_RRDSET);
- // ------------------------------------------------------------------------
- // propagate it forward
- rrdset_push_metrics_finished(&parser->user.v2.stream_buffer, st);
- timing_step(TIMING_STEP_END2_PROPAGATE);
- // ------------------------------------------------------------------------
- // cleanup RRDSET / RRDDIM
- if(likely(st->pluginsd.dims_with_slots)) {
- for(size_t i = 0; i < st->pluginsd.size ;i++) {
- RRDDIM *rd = st->pluginsd.prd_array[i].rd;
- if(!rd)
- continue;
- rd->collector.calculated_value = 0;
- rd->collector.collected_value = 0;
- rrddim_clear_updated(rd);
- }
- }
- else {
- RRDDIM *rd;
- rrddim_foreach_read(rd, st){
- rd->collector.calculated_value = 0;
- rd->collector.collected_value = 0;
- rrddim_clear_updated(rd);
- }
- rrddim_foreach_done(rd);
- }
- // ------------------------------------------------------------------------
- // reset state
- parser->user.v2 = (struct parser_user_object_v2){ 0 };
- timing_step(TIMING_STEP_END2_STORE);
- timing_report();
- return PARSER_RC_OK;
- }
- static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
- netdata_log_info("PLUGINSD: plugin called EXIT.");
- return PARSER_RC_STOP;
- }
- static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser)
- {
- const char *host_uuid_str = get_word(words, num_words, 1);
- const char *claim_id_str = get_word(words, num_words, 2);
- if (!host_uuid_str || !claim_id_str) {
- netdata_log_error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
- host_uuid_str ? host_uuid_str : "[unset]",
- claim_id_str ? claim_id_str : "[unset]");
- return PARSER_RC_ERROR;
- }
- uuid_t uuid;
- RRDHOST *host = parser->user.host;
- // We don't need the parsed UUID
- // just do it to check the format
- if(uuid_parse(host_uuid_str, uuid)) {
- netdata_log_error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
- return PARSER_RC_ERROR;
- }
- if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL") != 0) {
- netdata_log_error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
- return PARSER_RC_ERROR;
- }
- if(strcmp(host_uuid_str, host->machine_guid) != 0) {
- netdata_log_error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
- return PARSER_RC_OK; //the message is OK problem must be somewhere else
- }
- rrdhost_aclk_state_lock(host);
- if (host->aclk_state.claimed_id)
- freez(host->aclk_state.claimed_id);
- host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
- rrdhost_aclk_state_unlock(host);
- rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
- rrdpush_send_claimed_id(host);
- return PARSER_RC_OK;
- }
- // ----------------------------------------------------------------------------
- void pluginsd_cleanup_v2(PARSER *parser) {
- // this is called when the thread is stopped while processing
- pluginsd_clear_scope_chart(parser, "THREAD CLEANUP");
- }
- void pluginsd_process_thread_cleanup(void *ptr) {
- PARSER *parser = (PARSER *)ptr;
- pluginsd_cleanup_v2(parser);
- pluginsd_host_define_cleanup(parser);
- rrd_collector_finished();
- #ifdef NETDATA_LOG_STREAM_RECEIVE
- if(parser->user.stream_log_fp) {
- fclose(parser->user.stream_log_fp);
- parser->user.stream_log_fp = NULL;
- }
- #endif
- parser_destroy(parser);
- }
- bool parser_reconstruct_node(BUFFER *wb, void *ptr) {
- PARSER *parser = ptr;
- if(!parser || !parser->user.host)
- return false;
- buffer_strcat(wb, rrdhost_hostname(parser->user.host));
- return true;
- }
- bool parser_reconstruct_instance(BUFFER *wb, void *ptr) {
- PARSER *parser = ptr;
- if(!parser || !parser->user.st)
- return false;
- buffer_strcat(wb, rrdset_name(parser->user.st));
- return true;
- }
- bool parser_reconstruct_context(BUFFER *wb, void *ptr) {
- PARSER *parser = ptr;
- if(!parser || !parser->user.st)
- return false;
- buffer_strcat(wb, string2str(parser->user.st->context));
- return true;
- }
- inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
- {
- int enabled = cd->unsafe.enabled;
- if (!fp_plugin_input || !fp_plugin_output || !enabled) {
- cd->unsafe.enabled = 0;
- return 0;
- }
- if (unlikely(fileno(fp_plugin_input) == -1)) {
- netdata_log_error("input file descriptor given is not a valid stream");
- cd->serial_failures++;
- return 0;
- }
- if (unlikely(fileno(fp_plugin_output) == -1)) {
- netdata_log_error("output file descriptor given is not a valid stream");
- cd->serial_failures++;
- return 0;
- }
- clearerr(fp_plugin_input);
- clearerr(fp_plugin_output);
- PARSER *parser;
- {
- PARSER_USER_OBJECT user = {
- .enabled = cd->unsafe.enabled,
- .host = host,
- .cd = cd,
- .trust_durations = trust_durations
- };
- // fp_plugin_output = our input; fp_plugin_input = our output
- parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
- }
- pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD);
- rrd_collector_started();
- size_t count = 0;
- // this keeps the parser with its current value
- // so, parser needs to be allocated before pushing it
- netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser)
- {
- ND_LOG_STACK lgs[] = {
- ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
- ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
- ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
- ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
- ND_LOG_FIELD_END(),
- };
- ND_LOG_STACK_PUSH(lgs);
- buffered_reader_init(&parser->reader);
- CLEAN_BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
- while(likely(service_running(SERVICE_COLLECTORS))) {
- if(unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
- buffered_reader_ret_t ret = buffered_reader_read_timeout(
- &parser->reader,
- fileno((FILE *) parser->fp_input),
- 2 * 60 * MSEC_PER_SEC, true
- );
- if(unlikely(ret != BUFFERED_READER_READ_OK))
- break;
- continue;
- }
- if(unlikely(parser_action(parser, buffer->buffer)))
- break;
- buffer->len = 0;
- buffer->buffer[0] = '\0';
- }
- cd->unsafe.enabled = parser->user.enabled;
- count = parser->user.data_collections_count;
- if(likely(count)) {
- cd->successful_collections += count;
- cd->serial_failures = 0;
- }
- else
- cd->serial_failures++;
- }
- netdata_thread_cleanup_pop(1); // free parser with the pop function
- return count;
- }
- #include "gperf-hashtable.h"
- PARSER_RC parser_execute(PARSER *parser, const PARSER_KEYWORD *keyword, char **words, size_t num_words) {
- switch(keyword->id) {
- case PLUGINSD_KEYWORD_ID_SET2:
- return pluginsd_set_v2(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_BEGIN2:
- return pluginsd_begin_v2(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_END2:
- return pluginsd_end_v2(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_SET:
- return pluginsd_set(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_BEGIN:
- return pluginsd_begin(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_END:
- return pluginsd_end(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_RSET:
- return pluginsd_replay_set(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_RBEGIN:
- return pluginsd_replay_begin(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_RDSTATE:
- return pluginsd_replay_rrddim_collection_state(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_RSSTATE:
- return pluginsd_replay_rrdset_collection_state(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_REND:
- return pluginsd_replay_end(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_DIMENSION:
- return pluginsd_dimension(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_CHART:
- return pluginsd_chart(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_CHART_DEFINITION_END:
- return pluginsd_chart_definition_end(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_CLABEL:
- return pluginsd_clabel(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_CLABEL_COMMIT:
- return pluginsd_clabel_commit(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_FUNCTION:
- return pluginsd_function(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_FUNCTION_RESULT_BEGIN:
- return pluginsd_function_result_begin(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_FUNCTION_PROGRESS:
- return pluginsd_function_progress(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_LABEL:
- return pluginsd_label(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_OVERWRITE:
- return pluginsd_overwrite(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_VARIABLE:
- return pluginsd_variable(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_CLAIMED_ID:
- return streaming_claimed_id(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_HOST:
- return pluginsd_host(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_HOST_DEFINE:
- return pluginsd_host_define(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_HOST_DEFINE_END:
- return pluginsd_host_define_end(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_HOST_LABEL:
- return pluginsd_host_labels(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_FLUSH:
- return pluginsd_flush(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_DISABLE:
- return pluginsd_disable(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_EXIT:
- return pluginsd_exit(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_CONFIG:
- return pluginsd_config(words, num_words, parser);
- case PLUGINSD_KEYWORD_ID_DYNCFG_ENABLE:
- case PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_MODULE:
- case PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_JOB:
- case PLUGINSD_KEYWORD_ID_DYNCFG_RESET:
- case PLUGINSD_KEYWORD_ID_REPORT_JOB_STATUS:
- case PLUGINSD_KEYWORD_ID_DELETE_JOB:
- return pluginsd_dyncfg_noop(words, num_words, parser);
- default:
- netdata_log_error("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id);
- return PARSER_RC_ERROR;;
- }
- }
- void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) {
- parser->repertoire = repertoire;
- for(size_t i = GPERF_PARSER_MIN_HASH_VALUE ; i <= GPERF_PARSER_MAX_HASH_VALUE ;i++) {
- if(gperf_keywords[i].keyword && *gperf_keywords[i].keyword && (parser->repertoire & gperf_keywords[i].repertoire))
- worker_register_job_name(gperf_keywords[i].worker_job_id, gperf_keywords[i].keyword);
- }
- }
- int pluginsd_parser_unittest(void) {
- PARSER *p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
- pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
- char *lines[] = {
- "BEGIN2 abcdefghijklmnopqr 123",
- "SET2 abcdefg 0x12345678 0 0",
- "SET2 hijklmnoqr 0x12345678 0 0",
- "SET2 stuvwxyz 0x12345678 0 0",
- "END2",
- NULL,
- };
- char *words[PLUGINSD_MAX_WORDS];
- size_t iterations = 1000000;
- size_t count = 0;
- char input[PLUGINSD_LINE_MAX + 1];
- usec_t started = now_realtime_usec();
- while(--iterations) {
- for(size_t line = 0; lines[line] ;line++) {
- strncpyz(input, lines[line], PLUGINSD_LINE_MAX);
- size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
- const char *command = get_word(words, num_words, 0);
- const PARSER_KEYWORD *keyword = parser_find_keyword(p, command);
- if(unlikely(!keyword))
- fatal("Cannot parse the line '%s'", lines[line]);
- count++;
- }
- }
- usec_t ended = now_realtime_usec();
- netdata_log_info("Parsed %zu lines in %0.2f secs, %0.2f klines/sec", count,
- (double)(ended - started) / (double)USEC_PER_SEC,
- (double)count / ((double)(ended - started) / (double)USEC_PER_SEC) / 1000.0);
- parser_destroy(p);
- return 0;
- }
|