pluginsd_parser.c 53 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "pluginsd_internals.h"
  3. static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) {
  4. int idx = 1;
  5. ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
  6. if(slot >= 0) idx++;
  7. char *dimension = get_word(words, num_words, idx++);
  8. char *value = get_word(words, num_words, idx++);
  9. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET);
  10. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  11. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
  12. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  13. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET);
  14. if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  15. st->pluginsd.set = true;
  16. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  17. netdata_log_debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
  18. rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
  19. if (value && *value)
  20. rrddim_set_by_pointer(st, rd, str2ll_encoded(value));
  21. return PARSER_RC_OK;
  22. }
  23. static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) {
  24. int idx = 1;
  25. ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
  26. if(slot >= 0) idx++;
  27. char *id = get_word(words, num_words, idx++);
  28. char *microseconds_txt = get_word(words, num_words, idx++);
  29. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN);
  30. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  31. RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN);
  32. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  33. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN))
  34. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  35. usec_t microseconds = 0;
  36. if (microseconds_txt && *microseconds_txt) {
  37. long long t = str2ll(microseconds_txt, NULL);
  38. if(t >= 0)
  39. microseconds = t;
  40. }
  41. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  42. if(st->replay.log_next_data_collection) {
  43. st->replay.log_next_data_collection = false;
  44. internal_error(true,
  45. "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu",
  46. rrdhost_hostname(host), rrdset_id(st),
  47. st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec,
  48. st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec,
  49. microseconds
  50. );
  51. }
  52. #endif
  53. if (likely(st->counter_done)) {
  54. if (likely(microseconds)) {
  55. if (parser->user.trust_durations)
  56. rrdset_next_usec_unfiltered(st, microseconds);
  57. else
  58. rrdset_next_usec(st, microseconds);
  59. }
  60. else
  61. rrdset_next(st);
  62. }
  63. return PARSER_RC_OK;
  64. }
  65. static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) {
  66. char *tv_sec = get_word(words, num_words, 1);
  67. char *tv_usec = get_word(words, num_words, 2);
  68. char *pending_rrdset_next = get_word(words, num_words, 3);
  69. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END);
  70. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  71. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
  72. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  73. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  74. netdata_log_debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
  75. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_END);
  76. parser->user.data_collections_count++;
  77. struct timeval tv = {
  78. .tv_sec = (tv_sec && *tv_sec) ? str2ll(tv_sec, NULL) : 0,
  79. .tv_usec = (tv_usec && *tv_usec) ? str2ll(tv_usec, NULL) : 0
  80. };
  81. if(!tv.tv_sec)
  82. now_realtime_timeval(&tv);
  83. rrdset_timed_done(st, tv, pending_rrdset_next && *pending_rrdset_next ? true : false);
  84. return PARSER_RC_OK;
  85. }
  86. static void pluginsd_host_define_cleanup(PARSER *parser) {
  87. string_freez(parser->user.host_define.hostname);
  88. rrdlabels_destroy(parser->user.host_define.rrdlabels);
  89. parser->user.host_define.hostname = NULL;
  90. parser->user.host_define.rrdlabels = NULL;
  91. parser->user.host_define.parsing_host = false;
  92. }
  93. static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) {
  94. if(uuid_parse(guid, *uuid))
  95. return false;
  96. uuid_unparse_lower(*uuid, output);
  97. return true;
  98. }
  99. static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PARSER *parser) {
  100. char *guid = get_word(words, num_words, 1);
  101. char *hostname = get_word(words, num_words, 2);
  102. if(unlikely(!guid || !*guid || !hostname || !*hostname))
  103. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
  104. if(unlikely(parser->user.host_define.parsing_host))
  105. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE,
  106. "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?");
  107. if(!pluginsd_validate_machine_guid(guid, &parser->user.host_define.machine_guid, parser->user.host_define.machine_guid_str))
  108. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
  109. parser->user.host_define.hostname = string_strdupz(hostname);
  110. parser->user.host_define.rrdlabels = rrdlabels_create();
  111. parser->user.host_define.parsing_host = true;
  112. return PARSER_RC_OK;
  113. }
  114. static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, RRDLABELS *labels, const char *keyword) {
  115. char *name = get_word(words, num_words, 1);
  116. char *value = get_word(words, num_words, 2);
  117. if(!name || !*name || !value)
  118. return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters");
  119. if(!parser->user.host_define.parsing_host || !labels)
  120. return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
  121. rrdlabels_add(labels, name, value, RRDLABEL_SRC_CONFIG);
  122. return PARSER_RC_OK;
  123. }
  124. static inline PARSER_RC pluginsd_host_labels(char **words, size_t num_words, PARSER *parser) {
  125. return pluginsd_host_dictionary(words, num_words, parser,
  126. parser->user.host_define.rrdlabels,
  127. PLUGINSD_KEYWORD_HOST_LABEL);
  128. }
  129. static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  130. if(!parser->user.host_define.parsing_host)
  131. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
  132. RRDHOST *host = rrdhost_find_or_create(
  133. string2str(parser->user.host_define.hostname),
  134. string2str(parser->user.host_define.hostname),
  135. parser->user.host_define.machine_guid_str,
  136. "Netdata Virtual Host 1.0",
  137. netdata_configured_timezone,
  138. netdata_configured_abbrev_timezone,
  139. netdata_configured_utc_offset,
  140. NULL,
  141. program_name,
  142. program_version,
  143. default_rrd_update_every,
  144. default_rrd_history_entries,
  145. default_rrd_memory_mode,
  146. default_health_enabled,
  147. default_rrdpush_enabled,
  148. default_rrdpush_destination,
  149. default_rrdpush_api_key,
  150. default_rrdpush_send_charts_matching,
  151. default_rrdpush_enable_replication,
  152. default_rrdpush_seconds_to_replicate,
  153. default_rrdpush_replication_step,
  154. rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
  155. false);
  156. rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
  157. dyncfg_host_init(host);
  158. if(host->rrdlabels) {
  159. rrdlabels_migrate_to_these(host->rrdlabels, parser->user.host_define.rrdlabels);
  160. }
  161. else {
  162. host->rrdlabels = parser->user.host_define.rrdlabels;
  163. parser->user.host_define.rrdlabels = NULL;
  164. }
  165. pluginsd_host_define_cleanup(parser);
  166. parser->user.host = host;
  167. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END);
  168. rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
  169. rrdcontext_host_child_connected(host);
  170. schedule_node_info_update(host);
  171. return PARSER_RC_OK;
  172. }
  173. static inline PARSER_RC pluginsd_host(char **words, size_t num_words, PARSER *parser) {
  174. char *guid = get_word(words, num_words, 1);
  175. if(!guid || !*guid || strcmp(guid, "localhost") == 0) {
  176. parser->user.host = localhost;
  177. return PARSER_RC_OK;
  178. }
  179. uuid_t uuid;
  180. char uuid_str[UUID_STR_LEN];
  181. if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str))
  182. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
  183. RRDHOST *host = rrdhost_find_by_guid(uuid_str);
  184. if(unlikely(!host))
  185. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
  186. parser->user.host = host;
  187. return PARSER_RC_OK;
  188. }
  189. static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *parser) {
  190. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART);
  191. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  192. int idx = 1;
  193. ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
  194. if(slot >= 0) idx++;
  195. char *type = get_word(words, num_words, idx++);
  196. char *name = get_word(words, num_words, idx++);
  197. char *title = get_word(words, num_words, idx++);
  198. char *units = get_word(words, num_words, idx++);
  199. char *family = get_word(words, num_words, idx++);
  200. char *context = get_word(words, num_words, idx++);
  201. char *chart = get_word(words, num_words, idx++);
  202. char *priority_s = get_word(words, num_words, idx++);
  203. char *update_every_s = get_word(words, num_words, idx++);
  204. char *options = get_word(words, num_words, idx++);
  205. char *plugin = get_word(words, num_words, idx++);
  206. char *module = get_word(words, num_words, idx++);
  207. // parse the id from type
  208. char *id = NULL;
  209. if (likely(type && (id = strchr(type, '.')))) {
  210. *id = '\0';
  211. id++;
  212. }
  213. // make sure we have the required variables
  214. if (unlikely((!type || !*type || !id || !*id)))
  215. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_CHART, "missing parameters");
  216. // parse the name, and make sure it does not include 'type.'
  217. if (unlikely(name && *name)) {
  218. // when data are streamed from child nodes
  219. // name will be type.name
  220. // so, we have to remove 'type.' from name too
  221. size_t len = strlen(type);
  222. if (strncmp(type, name, len) == 0 && name[len] == '.')
  223. name = &name[len + 1];
  224. // if the name is the same with the id,
  225. // or is just 'NULL', clear it.
  226. if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0))
  227. name = NULL;
  228. }
  229. int priority = 1000;
  230. if (likely(priority_s && *priority_s))
  231. priority = str2i(priority_s);
  232. int update_every = parser->user.cd->update_every;
  233. if (likely(update_every_s && *update_every_s))
  234. update_every = str2i(update_every_s);
  235. if (unlikely(!update_every))
  236. update_every = parser->user.cd->update_every;
  237. RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
  238. if (unlikely(chart))
  239. chart_type = rrdset_type_id(chart);
  240. if (unlikely(name && !*name))
  241. name = NULL;
  242. if (unlikely(family && !*family))
  243. family = NULL;
  244. if (unlikely(context && !*context))
  245. context = NULL;
  246. if (unlikely(!title))
  247. title = "";
  248. if (unlikely(!units))
  249. units = "unknown";
  250. netdata_log_debug(
  251. D_PLUGINSD,
  252. "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
  253. type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
  254. priority, update_every);
  255. RRDSET *st = NULL;
  256. st = rrdset_create(
  257. host, type, id, name, family, context, title, units,
  258. (plugin && *plugin) ? plugin : parser->user.cd->filename,
  259. module, priority, update_every,
  260. chart_type);
  261. bool obsolete = false;
  262. if (likely(st)) {
  263. if (options && *options) {
  264. if (strstr(options, "obsolete")) {
  265. rrdset_is_obsolete___safe_from_collector_thread(st);
  266. obsolete = true;
  267. }
  268. else
  269. rrdset_isnot_obsolete___safe_from_collector_thread(st);
  270. if (strstr(options, "detail"))
  271. rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
  272. else
  273. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  274. if (strstr(options, "hidden"))
  275. rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
  276. else
  277. rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
  278. if (strstr(options, "store_first"))
  279. rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
  280. else
  281. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  282. }
  283. else {
  284. rrdset_isnot_obsolete___safe_from_collector_thread(st);
  285. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  286. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  287. }
  288. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_CHART))
  289. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  290. pluginsd_rrdset_cache_put_to_slot(parser, st, slot, obsolete);
  291. }
  292. else
  293. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_CHART);
  294. return PARSER_RC_OK;
  295. }
  296. static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) {
  297. const char *first_entry_txt = get_word(words, num_words, 1);
  298. const char *last_entry_txt = get_word(words, num_words, 2);
  299. const char *wall_clock_time_txt = get_word(words, num_words, 3);
  300. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
  301. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  302. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
  303. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  304. time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
  305. time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
  306. time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec();
  307. bool ok = true;
  308. if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
  309. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  310. st->replay.start_streaming = false;
  311. st->replay.after = 0;
  312. st->replay.before = 0;
  313. #endif
  314. rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
  315. rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
  316. rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
  317. ok = replicate_chart_request(send_to_plugin, parser, host, st,
  318. first_entry_child, last_entry_child, child_wall_clock_time,
  319. 0, 0);
  320. }
  321. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  322. else {
  323. internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request",
  324. rrdhost_hostname(st->rrdhost), rrdset_id(st));
  325. }
  326. #endif
  327. return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
  328. }
  329. static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) {
  330. int idx = 1;
  331. ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
  332. if(slot >= 0) idx++;
  333. char *id = get_word(words, num_words, idx++);
  334. char *name = get_word(words, num_words, idx++);
  335. char *algorithm = get_word(words, num_words, idx++);
  336. char *multiplier_s = get_word(words, num_words, idx++);
  337. char *divisor_s = get_word(words, num_words, idx++);
  338. char *options = get_word(words, num_words, idx++);
  339. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_DIMENSION);
  340. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  341. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
  342. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  343. if (unlikely(!id))
  344. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
  345. long multiplier = 1;
  346. if (multiplier_s && *multiplier_s) {
  347. multiplier = str2ll_encoded(multiplier_s);
  348. if (unlikely(!multiplier))
  349. multiplier = 1;
  350. }
  351. long divisor = 1;
  352. if (likely(divisor_s && *divisor_s)) {
  353. divisor = str2ll_encoded(divisor_s);
  354. if (unlikely(!divisor))
  355. divisor = 1;
  356. }
  357. if (unlikely(!algorithm || !*algorithm))
  358. algorithm = "absolute";
  359. if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  360. netdata_log_debug(
  361. D_PLUGINSD,
  362. "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
  363. rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
  364. options ? options : "");
  365. RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
  366. int unhide_dimension = 1;
  367. rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  368. bool obsolete = false;
  369. if (options && *options) {
  370. if (strstr(options, "obsolete") != NULL) {
  371. obsolete = true;
  372. rrddim_is_obsolete___safe_from_collector_thread(st, rd);
  373. }
  374. else
  375. rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
  376. unhide_dimension = !strstr(options, "hidden");
  377. if (strstr(options, "noreset") != NULL)
  378. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  379. if (strstr(options, "nooverflow") != NULL)
  380. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  381. }
  382. else
  383. rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
  384. bool should_update_dimension = false;
  385. if (likely(unhide_dimension)) {
  386. rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
  387. should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
  388. }
  389. else {
  390. rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
  391. should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
  392. }
  393. if (should_update_dimension) {
  394. rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE);
  395. rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
  396. }
  397. pluginsd_rrddim_put_to_slot(parser, st, rd, slot, obsolete);
  398. return PARSER_RC_OK;
  399. }
  400. // ----------------------------------------------------------------------------
  401. static inline PARSER_RC pluginsd_variable(char **words, size_t num_words, PARSER *parser) {
  402. char *name = get_word(words, num_words, 1);
  403. char *value = get_word(words, num_words, 2);
  404. NETDATA_DOUBLE v;
  405. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_VARIABLE);
  406. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  407. RRDSET *st = pluginsd_get_scope_chart(parser);
  408. int global = (st) ? 0 : 1;
  409. if (name && *name) {
  410. if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
  411. global = 1;
  412. name = get_word(words, num_words, 2);
  413. value = get_word(words, num_words, 3);
  414. } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
  415. global = 0;
  416. name = get_word(words, num_words, 2);
  417. value = get_word(words, num_words, 3);
  418. }
  419. }
  420. if (unlikely(!name || !*name))
  421. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
  422. if (unlikely(!value || !*value))
  423. value = NULL;
  424. if (unlikely(!value)) {
  425. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
  426. rrdhost_hostname(host),
  427. st ? rrdset_id(st):"UNSET",
  428. (global) ? "HOST" : "CHART",
  429. name);
  430. return PARSER_RC_OK;
  431. }
  432. if (!global && !st)
  433. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
  434. char *endptr = NULL;
  435. v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr);
  436. if (unlikely(endptr && *endptr)) {
  437. if (endptr == value)
  438. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
  439. rrdhost_hostname(host),
  440. st ? rrdset_id(st):"UNSET",
  441. value,
  442. name);
  443. else
  444. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
  445. rrdhost_hostname(host),
  446. st ? rrdset_id(st):"UNSET",
  447. value,
  448. name,
  449. endptr);
  450. }
  451. if (global) {
  452. const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name);
  453. if (rva) {
  454. rrdvar_custom_host_variable_set(host, rva, v);
  455. rrdvar_custom_host_variable_release(host, rva);
  456. }
  457. else
  458. netdata_log_error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
  459. rrdhost_hostname(host),
  460. name);
  461. } else {
  462. const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
  463. if (rsa) {
  464. rrdsetvar_custom_chart_variable_set(st, rsa, v);
  465. rrdsetvar_custom_chart_variable_release(st, rsa);
  466. }
  467. else
  468. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
  469. rrdhost_hostname(host), rrdset_id(st), name);
  470. }
  471. return PARSER_RC_OK;
  472. }
  473. static inline PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  474. netdata_log_debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
  475. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_FLUSH);
  476. parser->user.replay.start_time = 0;
  477. parser->user.replay.end_time = 0;
  478. parser->user.replay.start_time_ut = 0;
  479. parser->user.replay.end_time_ut = 0;
  480. return PARSER_RC_OK;
  481. }
  482. static inline PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  483. netdata_log_info("PLUGINSD: plugin called DISABLE. Disabling it.");
  484. parser->user.enabled = 0;
  485. return PARSER_RC_STOP;
  486. }
  487. static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *parser) {
  488. const char *name = get_word(words, num_words, 1);
  489. const char *label_source = get_word(words, num_words, 2);
  490. const char *value = get_word(words, num_words, 3);
  491. if (!name || !label_source || !value)
  492. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_LABEL, "missing parameters");
  493. char *store = (char *)value;
  494. bool allocated_store = false;
  495. if(unlikely(num_words > 4)) {
  496. allocated_store = true;
  497. store = mallocz(PLUGINSD_LINE_MAX + 1);
  498. size_t remaining = PLUGINSD_LINE_MAX;
  499. char *move = store;
  500. char *word;
  501. for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
  502. if(i > 3) {
  503. *move++ = ' ';
  504. *move = '\0';
  505. remaining--;
  506. }
  507. size_t length = strlen(word);
  508. if (length > remaining)
  509. length = remaining;
  510. remaining -= length;
  511. memcpy(move, word, length);
  512. move += length;
  513. *move = '\0';
  514. }
  515. }
  516. if(unlikely(!(parser->user.new_host_labels)))
  517. parser->user.new_host_labels = rrdlabels_create();
  518. if (strcmp(name,HOST_LABEL_IS_EPHEMERAL) == 0) {
  519. int is_ephemeral = appconfig_test_boolean_value((char *) value);
  520. if (is_ephemeral) {
  521. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_LABEL);
  522. if (likely(host))
  523. rrdhost_option_set(host, RRDHOST_OPTION_EPHEMERAL_HOST);
  524. }
  525. }
  526. rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source));
  527. if (allocated_store)
  528. freez(store);
  529. return PARSER_RC_OK;
  530. }
  531. static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  532. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_OVERWRITE);
  533. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  534. netdata_log_debug(D_PLUGINSD, "requested to OVERWRITE host labels");
  535. if(unlikely(!host->rrdlabels))
  536. host->rrdlabels = rrdlabels_create();
  537. rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels);
  538. if (rrdhost_option_check(host, RRDHOST_OPTION_EPHEMERAL_HOST))
  539. rrdlabels_add(host->rrdlabels, HOST_LABEL_IS_EPHEMERAL, "true", RRDLABEL_SRC_CONFIG);
  540. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
  541. rrdlabels_destroy(parser->user.new_host_labels);
  542. parser->user.new_host_labels = NULL;
  543. return PARSER_RC_OK;
  544. }
  545. static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *parser) {
  546. const char *name = get_word(words, num_words, 1);
  547. const char *value = get_word(words, num_words, 2);
  548. const char *label_source = get_word(words, num_words, 3);
  549. if (!name || !value || !label_source) {
  550. netdata_log_error("Ignoring malformed or empty CHART LABEL command.");
  551. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  552. }
  553. if(unlikely(!parser->user.chart_rrdlabels_linked_temporarily)) {
  554. RRDSET *st = pluginsd_get_scope_chart(parser);
  555. parser->user.chart_rrdlabels_linked_temporarily = st->rrdlabels;
  556. rrdlabels_unmark_all(parser->user.chart_rrdlabels_linked_temporarily);
  557. }
  558. rrdlabels_add(parser->user.chart_rrdlabels_linked_temporarily, name, value, str2l(label_source));
  559. return PARSER_RC_OK;
  560. }
  561. static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  562. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT);
  563. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  564. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
  565. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  566. netdata_log_debug(D_PLUGINSD, "requested to commit chart labels");
  567. if(!parser->user.chart_rrdlabels_linked_temporarily) {
  568. netdata_log_error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host));
  569. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  570. }
  571. rrdlabels_remove_all_unmarked(parser->user.chart_rrdlabels_linked_temporarily);
  572. rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
  573. rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
  574. rrdset_metadata_updated(st);
  575. parser->user.chart_rrdlabels_linked_temporarily = NULL;
  576. return PARSER_RC_OK;
  577. }
  578. static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) {
  579. timing_init();
  580. int idx = 1;
  581. ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
  582. if(slot >= 0) idx++;
  583. char *id = get_word(words, num_words, idx++);
  584. char *update_every_str = get_word(words, num_words, idx++);
  585. char *end_time_str = get_word(words, num_words, idx++);
  586. char *wall_clock_time_str = get_word(words, num_words, idx++);
  587. if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
  588. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
  589. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN_V2);
  590. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  591. timing_step(TIMING_STEP_BEGIN2_PREPARE);
  592. RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN_V2);
  593. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  594. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2))
  595. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  596. if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))
  597. rrdset_isnot_obsolete___safe_from_collector_thread(st);
  598. timing_step(TIMING_STEP_BEGIN2_FIND_CHART);
  599. // ------------------------------------------------------------------------
  600. // parse the parameters
  601. time_t update_every = (time_t) str2ull_encoded(update_every_str);
  602. time_t end_time = (time_t) str2ull_encoded(end_time_str);
  603. time_t wall_clock_time;
  604. if(likely(*wall_clock_time_str == '#'))
  605. wall_clock_time = end_time;
  606. else
  607. wall_clock_time = (time_t) str2ull_encoded(wall_clock_time_str);
  608. if (unlikely(update_every != st->update_every))
  609. rrdset_set_update_every_s(st, update_every);
  610. timing_step(TIMING_STEP_BEGIN2_PARSE);
  611. // ------------------------------------------------------------------------
  612. // prepare our state
  613. pluginsd_lock_rrdset_data_collection(parser);
  614. parser->user.v2.update_every = update_every;
  615. parser->user.v2.end_time = end_time;
  616. parser->user.v2.wall_clock_time = wall_clock_time;
  617. parser->user.v2.ml_locked = ml_chart_update_begin(st);
  618. timing_step(TIMING_STEP_BEGIN2_ML);
  619. // ------------------------------------------------------------------------
  620. // propagate it forward in v2
  621. if(!parser->user.v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
  622. parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time);
  623. if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) {
  624. // check receiver capabilities
  625. bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
  626. // check sender capabilities
  627. bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
  628. NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  629. BUFFER *wb = parser->user.v2.stream_buffer.wb;
  630. buffer_need_bytes(wb, 1024);
  631. if(unlikely(parser->user.v2.stream_buffer.begin_v2_added))
  632. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
  633. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1);
  634. if(with_slots) {
  635. buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
  636. buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot);
  637. }
  638. buffer_fast_strcat(wb, " '", 2);
  639. buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
  640. buffer_fast_strcat(wb, "' ", 2);
  641. if(can_copy)
  642. buffer_strcat(wb, update_every_str);
  643. else
  644. buffer_print_uint64_encoded(wb, integer_encoding, update_every);
  645. buffer_fast_strcat(wb, " ", 1);
  646. if(can_copy)
  647. buffer_strcat(wb, end_time_str);
  648. else
  649. buffer_print_uint64_encoded(wb, integer_encoding, end_time);
  650. buffer_fast_strcat(wb, " ", 1);
  651. if(can_copy)
  652. buffer_strcat(wb, wall_clock_time_str);
  653. else
  654. buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
  655. buffer_fast_strcat(wb, "\n", 1);
  656. parser->user.v2.stream_buffer.last_point_end_time_s = end_time;
  657. parser->user.v2.stream_buffer.begin_v2_added = true;
  658. }
  659. timing_step(TIMING_STEP_BEGIN2_PROPAGATE);
  660. // ------------------------------------------------------------------------
  661. // store it
  662. st->last_collected_time.tv_sec = end_time;
  663. st->last_collected_time.tv_usec = 0;
  664. st->last_updated.tv_sec = end_time;
  665. st->last_updated.tv_usec = 0;
  666. st->counter++;
  667. st->counter_done++;
  668. // these are only needed for db mode RAM, ALLOC
  669. st->db.current_entry++;
  670. if(st->db.current_entry >= st->db.entries)
  671. st->db.current_entry -= st->db.entries;
  672. timing_step(TIMING_STEP_BEGIN2_STORE);
  673. return PARSER_RC_OK;
  674. }
  675. static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) {
  676. timing_init();
  677. int idx = 1;
  678. ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
  679. if(slot >= 0) idx++;
  680. char *dimension = get_word(words, num_words, idx++);
  681. char *collected_str = get_word(words, num_words, idx++);
  682. char *value_str = get_word(words, num_words, idx++);
  683. char *flags_str = get_word(words, num_words, idx++);
  684. if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
  685. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
  686. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET_V2);
  687. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  688. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
  689. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  690. timing_step(TIMING_STEP_SET2_PREPARE);
  691. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET_V2);
  692. if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  693. st->pluginsd.set = true;
  694. if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
  695. rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
  696. timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION);
  697. // ------------------------------------------------------------------------
  698. // parse the parameters
  699. collected_number collected_value = (collected_number) str2ll_encoded(collected_str);
  700. NETDATA_DOUBLE value;
  701. if(*value_str == '#')
  702. value = (NETDATA_DOUBLE)collected_value;
  703. else
  704. value = str2ndd_encoded(value_str, NULL);
  705. SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
  706. timing_step(TIMING_STEP_SET2_PARSE);
  707. // ------------------------------------------------------------------------
  708. // check value and ML
  709. if (unlikely(!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT))) {
  710. value = NAN;
  711. flags = SN_EMPTY_SLOT;
  712. if(parser->user.v2.ml_locked)
  713. ml_dimension_is_anomalous(rd, parser->user.v2.end_time, 0, false);
  714. }
  715. else if(parser->user.v2.ml_locked) {
  716. if (ml_dimension_is_anomalous(rd, parser->user.v2.end_time, value, true)) {
  717. // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
  718. flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS);
  719. }
  720. else
  721. flags |= SN_FLAG_NOT_ANOMALOUS;
  722. }
  723. timing_step(TIMING_STEP_SET2_ML);
  724. // ------------------------------------------------------------------------
  725. // propagate it forward in v2
  726. if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) {
  727. // check if receiver and sender have the same number parsing capabilities
  728. bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
  729. // check the sender capabilities
  730. bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
  731. NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  732. NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
  733. BUFFER *wb = parser->user.v2.stream_buffer.wb;
  734. buffer_need_bytes(wb, 1024);
  735. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1);
  736. if(with_slots) {
  737. buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
  738. buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
  739. }
  740. buffer_fast_strcat(wb, " '", 2);
  741. buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
  742. buffer_fast_strcat(wb, "' ", 2);
  743. if(can_copy)
  744. buffer_strcat(wb, collected_str);
  745. else
  746. buffer_print_int64_encoded(wb, integer_encoding, collected_value); // original v2 had hex
  747. buffer_fast_strcat(wb, " ", 1);
  748. if(can_copy)
  749. buffer_strcat(wb, value_str);
  750. else
  751. buffer_print_netdata_double_encoded(wb, doubles_encoding, value); // original v2 had decimal
  752. buffer_fast_strcat(wb, " ", 1);
  753. buffer_print_sn_flags(wb, flags, true);
  754. buffer_fast_strcat(wb, "\n", 1);
  755. }
  756. timing_step(TIMING_STEP_SET2_PROPAGATE);
  757. // ------------------------------------------------------------------------
  758. // store it
  759. rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags);
  760. rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time;
  761. rd->collector.last_collected_time.tv_usec = 0;
  762. rd->collector.last_collected_value = collected_value;
  763. rd->collector.last_stored_value = value;
  764. rd->collector.last_calculated_value = value;
  765. rd->collector.counter++;
  766. rrddim_set_updated(rd);
  767. timing_step(TIMING_STEP_SET2_STORE);
  768. return PARSER_RC_OK;
  769. }
  770. static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  771. timing_init();
  772. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END_V2);
  773. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  774. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
  775. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  776. parser->user.data_collections_count++;
  777. timing_step(TIMING_STEP_END2_PREPARE);
  778. // ------------------------------------------------------------------------
  779. // propagate the whole chart update in v1
  780. if(unlikely(!parser->user.v2.stream_buffer.v2 && !parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb))
  781. rrdset_push_metrics_v1(&parser->user.v2.stream_buffer, st);
  782. timing_step(TIMING_STEP_END2_PUSH_V1);
  783. // ------------------------------------------------------------------------
  784. // unblock data collection
  785. pluginsd_unlock_previous_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, false);
  786. rrdcontext_collected_rrdset(st);
  787. store_metric_collection_completed();
  788. timing_step(TIMING_STEP_END2_RRDSET);
  789. // ------------------------------------------------------------------------
  790. // propagate it forward
  791. rrdset_push_metrics_finished(&parser->user.v2.stream_buffer, st);
  792. timing_step(TIMING_STEP_END2_PROPAGATE);
  793. // ------------------------------------------------------------------------
  794. // cleanup RRDSET / RRDDIM
  795. if(likely(st->pluginsd.dims_with_slots)) {
  796. for(size_t i = 0; i < st->pluginsd.size ;i++) {
  797. RRDDIM *rd = st->pluginsd.prd_array[i].rd;
  798. if(!rd)
  799. continue;
  800. rd->collector.calculated_value = 0;
  801. rd->collector.collected_value = 0;
  802. rrddim_clear_updated(rd);
  803. }
  804. }
  805. else {
  806. RRDDIM *rd;
  807. rrddim_foreach_read(rd, st){
  808. rd->collector.calculated_value = 0;
  809. rd->collector.collected_value = 0;
  810. rrddim_clear_updated(rd);
  811. }
  812. rrddim_foreach_done(rd);
  813. }
  814. // ------------------------------------------------------------------------
  815. // reset state
  816. parser->user.v2 = (struct parser_user_object_v2){ 0 };
  817. timing_step(TIMING_STEP_END2_STORE);
  818. timing_report();
  819. return PARSER_RC_OK;
  820. }
  821. static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
  822. netdata_log_info("PLUGINSD: plugin called EXIT.");
  823. return PARSER_RC_STOP;
  824. }
  825. static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser)
  826. {
  827. const char *host_uuid_str = get_word(words, num_words, 1);
  828. const char *claim_id_str = get_word(words, num_words, 2);
  829. if (!host_uuid_str || !claim_id_str) {
  830. netdata_log_error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
  831. host_uuid_str ? host_uuid_str : "[unset]",
  832. claim_id_str ? claim_id_str : "[unset]");
  833. return PARSER_RC_ERROR;
  834. }
  835. uuid_t uuid;
  836. RRDHOST *host = parser->user.host;
  837. // We don't need the parsed UUID
  838. // just do it to check the format
  839. if(uuid_parse(host_uuid_str, uuid)) {
  840. netdata_log_error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
  841. return PARSER_RC_ERROR;
  842. }
  843. if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL") != 0) {
  844. netdata_log_error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
  845. return PARSER_RC_ERROR;
  846. }
  847. if(strcmp(host_uuid_str, host->machine_guid) != 0) {
  848. netdata_log_error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
  849. return PARSER_RC_OK; //the message is OK problem must be somewhere else
  850. }
  851. rrdhost_aclk_state_lock(host);
  852. if (host->aclk_state.claimed_id)
  853. freez(host->aclk_state.claimed_id);
  854. host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
  855. rrdhost_aclk_state_unlock(host);
  856. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
  857. rrdpush_send_claimed_id(host);
  858. return PARSER_RC_OK;
  859. }
  860. // ----------------------------------------------------------------------------
  861. void pluginsd_cleanup_v2(PARSER *parser) {
  862. // this is called when the thread is stopped while processing
  863. pluginsd_clear_scope_chart(parser, "THREAD CLEANUP");
  864. }
  865. void pluginsd_process_thread_cleanup(void *ptr) {
  866. PARSER *parser = (PARSER *)ptr;
  867. pluginsd_cleanup_v2(parser);
  868. pluginsd_host_define_cleanup(parser);
  869. rrd_collector_finished();
  870. #ifdef NETDATA_LOG_STREAM_RECEIVE
  871. if(parser->user.stream_log_fp) {
  872. fclose(parser->user.stream_log_fp);
  873. parser->user.stream_log_fp = NULL;
  874. }
  875. #endif
  876. parser_destroy(parser);
  877. }
  878. bool parser_reconstruct_node(BUFFER *wb, void *ptr) {
  879. PARSER *parser = ptr;
  880. if(!parser || !parser->user.host)
  881. return false;
  882. buffer_strcat(wb, rrdhost_hostname(parser->user.host));
  883. return true;
  884. }
  885. bool parser_reconstruct_instance(BUFFER *wb, void *ptr) {
  886. PARSER *parser = ptr;
  887. if(!parser || !parser->user.st)
  888. return false;
  889. buffer_strcat(wb, rrdset_name(parser->user.st));
  890. return true;
  891. }
  892. bool parser_reconstruct_context(BUFFER *wb, void *ptr) {
  893. PARSER *parser = ptr;
  894. if(!parser || !parser->user.st)
  895. return false;
  896. buffer_strcat(wb, string2str(parser->user.st->context));
  897. return true;
  898. }
  899. inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
  900. {
  901. int enabled = cd->unsafe.enabled;
  902. if (!fp_plugin_input || !fp_plugin_output || !enabled) {
  903. cd->unsafe.enabled = 0;
  904. return 0;
  905. }
  906. if (unlikely(fileno(fp_plugin_input) == -1)) {
  907. netdata_log_error("input file descriptor given is not a valid stream");
  908. cd->serial_failures++;
  909. return 0;
  910. }
  911. if (unlikely(fileno(fp_plugin_output) == -1)) {
  912. netdata_log_error("output file descriptor given is not a valid stream");
  913. cd->serial_failures++;
  914. return 0;
  915. }
  916. clearerr(fp_plugin_input);
  917. clearerr(fp_plugin_output);
  918. PARSER *parser;
  919. {
  920. PARSER_USER_OBJECT user = {
  921. .enabled = cd->unsafe.enabled,
  922. .host = host,
  923. .cd = cd,
  924. .trust_durations = trust_durations
  925. };
  926. // fp_plugin_output = our input; fp_plugin_input = our output
  927. parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
  928. }
  929. pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD);
  930. rrd_collector_started();
  931. size_t count = 0;
  932. // this keeps the parser with its current value
  933. // so, parser needs to be allocated before pushing it
  934. netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser)
  935. {
  936. ND_LOG_STACK lgs[] = {
  937. ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
  938. ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
  939. ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
  940. ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
  941. ND_LOG_FIELD_END(),
  942. };
  943. ND_LOG_STACK_PUSH(lgs);
  944. buffered_reader_init(&parser->reader);
  945. CLEAN_BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
  946. while(likely(service_running(SERVICE_COLLECTORS))) {
  947. if(unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
  948. buffered_reader_ret_t ret = buffered_reader_read_timeout(
  949. &parser->reader,
  950. fileno((FILE *) parser->fp_input),
  951. 2 * 60 * MSEC_PER_SEC, true
  952. );
  953. if(unlikely(ret != BUFFERED_READER_READ_OK))
  954. break;
  955. continue;
  956. }
  957. if(unlikely(parser_action(parser, buffer->buffer)))
  958. break;
  959. buffer->len = 0;
  960. buffer->buffer[0] = '\0';
  961. }
  962. cd->unsafe.enabled = parser->user.enabled;
  963. count = parser->user.data_collections_count;
  964. if(likely(count)) {
  965. cd->successful_collections += count;
  966. cd->serial_failures = 0;
  967. }
  968. else
  969. cd->serial_failures++;
  970. }
  971. netdata_thread_cleanup_pop(1); // free parser with the pop function
  972. return count;
  973. }
  974. #include "gperf-hashtable.h"
  975. PARSER_RC parser_execute(PARSER *parser, const PARSER_KEYWORD *keyword, char **words, size_t num_words) {
  976. switch(keyword->id) {
  977. case PLUGINSD_KEYWORD_ID_SET2:
  978. return pluginsd_set_v2(words, num_words, parser);
  979. case PLUGINSD_KEYWORD_ID_BEGIN2:
  980. return pluginsd_begin_v2(words, num_words, parser);
  981. case PLUGINSD_KEYWORD_ID_END2:
  982. return pluginsd_end_v2(words, num_words, parser);
  983. case PLUGINSD_KEYWORD_ID_SET:
  984. return pluginsd_set(words, num_words, parser);
  985. case PLUGINSD_KEYWORD_ID_BEGIN:
  986. return pluginsd_begin(words, num_words, parser);
  987. case PLUGINSD_KEYWORD_ID_END:
  988. return pluginsd_end(words, num_words, parser);
  989. case PLUGINSD_KEYWORD_ID_RSET:
  990. return pluginsd_replay_set(words, num_words, parser);
  991. case PLUGINSD_KEYWORD_ID_RBEGIN:
  992. return pluginsd_replay_begin(words, num_words, parser);
  993. case PLUGINSD_KEYWORD_ID_RDSTATE:
  994. return pluginsd_replay_rrddim_collection_state(words, num_words, parser);
  995. case PLUGINSD_KEYWORD_ID_RSSTATE:
  996. return pluginsd_replay_rrdset_collection_state(words, num_words, parser);
  997. case PLUGINSD_KEYWORD_ID_REND:
  998. return pluginsd_replay_end(words, num_words, parser);
  999. case PLUGINSD_KEYWORD_ID_DIMENSION:
  1000. return pluginsd_dimension(words, num_words, parser);
  1001. case PLUGINSD_KEYWORD_ID_CHART:
  1002. return pluginsd_chart(words, num_words, parser);
  1003. case PLUGINSD_KEYWORD_ID_CHART_DEFINITION_END:
  1004. return pluginsd_chart_definition_end(words, num_words, parser);
  1005. case PLUGINSD_KEYWORD_ID_CLABEL:
  1006. return pluginsd_clabel(words, num_words, parser);
  1007. case PLUGINSD_KEYWORD_ID_CLABEL_COMMIT:
  1008. return pluginsd_clabel_commit(words, num_words, parser);
  1009. case PLUGINSD_KEYWORD_ID_FUNCTION:
  1010. return pluginsd_function(words, num_words, parser);
  1011. case PLUGINSD_KEYWORD_ID_FUNCTION_RESULT_BEGIN:
  1012. return pluginsd_function_result_begin(words, num_words, parser);
  1013. case PLUGINSD_KEYWORD_ID_FUNCTION_PROGRESS:
  1014. return pluginsd_function_progress(words, num_words, parser);
  1015. case PLUGINSD_KEYWORD_ID_LABEL:
  1016. return pluginsd_label(words, num_words, parser);
  1017. case PLUGINSD_KEYWORD_ID_OVERWRITE:
  1018. return pluginsd_overwrite(words, num_words, parser);
  1019. case PLUGINSD_KEYWORD_ID_VARIABLE:
  1020. return pluginsd_variable(words, num_words, parser);
  1021. case PLUGINSD_KEYWORD_ID_CLAIMED_ID:
  1022. return streaming_claimed_id(words, num_words, parser);
  1023. case PLUGINSD_KEYWORD_ID_HOST:
  1024. return pluginsd_host(words, num_words, parser);
  1025. case PLUGINSD_KEYWORD_ID_HOST_DEFINE:
  1026. return pluginsd_host_define(words, num_words, parser);
  1027. case PLUGINSD_KEYWORD_ID_HOST_DEFINE_END:
  1028. return pluginsd_host_define_end(words, num_words, parser);
  1029. case PLUGINSD_KEYWORD_ID_HOST_LABEL:
  1030. return pluginsd_host_labels(words, num_words, parser);
  1031. case PLUGINSD_KEYWORD_ID_FLUSH:
  1032. return pluginsd_flush(words, num_words, parser);
  1033. case PLUGINSD_KEYWORD_ID_DISABLE:
  1034. return pluginsd_disable(words, num_words, parser);
  1035. case PLUGINSD_KEYWORD_ID_EXIT:
  1036. return pluginsd_exit(words, num_words, parser);
  1037. case PLUGINSD_KEYWORD_ID_CONFIG:
  1038. return pluginsd_config(words, num_words, parser);
  1039. case PLUGINSD_KEYWORD_ID_DYNCFG_ENABLE:
  1040. case PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_MODULE:
  1041. case PLUGINSD_KEYWORD_ID_DYNCFG_REGISTER_JOB:
  1042. case PLUGINSD_KEYWORD_ID_DYNCFG_RESET:
  1043. case PLUGINSD_KEYWORD_ID_REPORT_JOB_STATUS:
  1044. case PLUGINSD_KEYWORD_ID_DELETE_JOB:
  1045. return pluginsd_dyncfg_noop(words, num_words, parser);
  1046. default:
  1047. netdata_log_error("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id);
  1048. return PARSER_RC_ERROR;;
  1049. }
  1050. }
  1051. void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) {
  1052. parser->repertoire = repertoire;
  1053. for(size_t i = GPERF_PARSER_MIN_HASH_VALUE ; i <= GPERF_PARSER_MAX_HASH_VALUE ;i++) {
  1054. if(gperf_keywords[i].keyword && *gperf_keywords[i].keyword && (parser->repertoire & gperf_keywords[i].repertoire))
  1055. worker_register_job_name(gperf_keywords[i].worker_job_id, gperf_keywords[i].keyword);
  1056. }
  1057. }
  1058. int pluginsd_parser_unittest(void) {
  1059. PARSER *p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
  1060. pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
  1061. char *lines[] = {
  1062. "BEGIN2 abcdefghijklmnopqr 123",
  1063. "SET2 abcdefg 0x12345678 0 0",
  1064. "SET2 hijklmnoqr 0x12345678 0 0",
  1065. "SET2 stuvwxyz 0x12345678 0 0",
  1066. "END2",
  1067. NULL,
  1068. };
  1069. char *words[PLUGINSD_MAX_WORDS];
  1070. size_t iterations = 1000000;
  1071. size_t count = 0;
  1072. char input[PLUGINSD_LINE_MAX + 1];
  1073. usec_t started = now_realtime_usec();
  1074. while(--iterations) {
  1075. for(size_t line = 0; lines[line] ;line++) {
  1076. strncpyz(input, lines[line], PLUGINSD_LINE_MAX);
  1077. size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
  1078. const char *command = get_word(words, num_words, 0);
  1079. const PARSER_KEYWORD *keyword = parser_find_keyword(p, command);
  1080. if(unlikely(!keyword))
  1081. fatal("Cannot parse the line '%s'", lines[line]);
  1082. count++;
  1083. }
  1084. }
  1085. usec_t ended = now_realtime_usec();
  1086. netdata_log_info("Parsed %zu lines in %0.2f secs, %0.2f klines/sec", count,
  1087. (double)(ended - started) / (double)USEC_PER_SEC,
  1088. (double)count / ((double)(ended - started) / (double)USEC_PER_SEC) / 1000.0);
  1089. parser_destroy(p);
  1090. return 0;
  1091. }