pluginsd_parser.c 47 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "pluginsd_parser.h"
  3. #define LOG_FUNCTIONS false
  4. static int send_to_plugin(const char *txt, void *data) {
  5. PARSER *parser = data;
  6. if(!txt || !*txt)
  7. return 0;
  8. #ifdef ENABLE_HTTPS
  9. struct netdata_ssl *ssl = parser->ssl_output;
  10. if(ssl) {
  11. if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
  12. size_t size = strlen(txt);
  13. return SSL_write(ssl->conn, txt, (int)size);
  14. }
  15. error("cannot write to SSL connection - connection is not ready.");
  16. return -1;
  17. }
  18. #endif
  19. FILE *fp = parser->output;
  20. int ret = fprintf(fp, "%s", txt);
  21. fflush(fp);
  22. return ret;
  23. }
  24. PARSER_RC pluginsd_set(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  25. {
  26. char *dimension = get_word(words, num_words, 1);
  27. char *value = get_word(words, num_words, 2);
  28. RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
  29. RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
  30. if (unlikely(!dimension || !*dimension)) {
  31. error("requested a SET on chart '%s' of host '%s', without a dimension. Disabling it.", rrdset_id(st), rrdhost_hostname(host));
  32. goto disable;
  33. }
  34. if (unlikely(!value || !*value))
  35. value = NULL;
  36. if (unlikely(!st)) {
  37. error(
  38. "requested a SET on dimension %s with value %s on host '%s', without a BEGIN. Disabling it.", dimension,
  39. value ? value : "<nothing>", rrdhost_hostname(host));
  40. goto disable;
  41. }
  42. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  43. debug(D_PLUGINSD, "is setting dimension '%s'/'%s' to '%s'", rrdset_id(st), dimension, value ? value : "<nothing>");
  44. if (value) {
  45. RRDDIM *rd = rrddim_find(st, dimension);
  46. if (unlikely(!rd)) {
  47. error(
  48. "requested a SET to dimension with id '%s' on stats '%s' (%s) on host '%s', which does not exist. Disabling it.",
  49. dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
  50. goto disable;
  51. } else
  52. rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0));
  53. }
  54. return PARSER_RC_OK;
  55. disable:
  56. ((PARSER_USER_OBJECT *) user)->enabled = 0;
  57. return PARSER_RC_ERROR;
  58. }
  59. PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  60. {
  61. char *id = get_word(words, num_words, 1);
  62. char *microseconds_txt = get_word(words, num_words, 2);
  63. RRDSET *st = NULL;
  64. RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
  65. if (unlikely(!id)) {
  66. error("requested a BEGIN without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host));
  67. goto disable;
  68. }
  69. st = rrdset_find(host, id);
  70. if (unlikely(!st)) {
  71. error("requested a BEGIN on chart '%s', which does not exist on host '%s'. Disabling it.", id, rrdhost_hostname(host));
  72. goto disable;
  73. }
  74. ((PARSER_USER_OBJECT *)user)->st = st;
  75. usec_t microseconds = 0;
  76. if (microseconds_txt && *microseconds_txt)
  77. microseconds = str2ull(microseconds_txt);
  78. if (likely(st->counter_done)) {
  79. if (likely(microseconds)) {
  80. if (((PARSER_USER_OBJECT *)user)->trust_durations)
  81. rrdset_next_usec_unfiltered(st, microseconds);
  82. else
  83. rrdset_next_usec(st, microseconds);
  84. } else
  85. rrdset_next(st);
  86. }
  87. return PARSER_RC_OK;
  88. disable:
  89. ((PARSER_USER_OBJECT *)user)->enabled = 0;
  90. return PARSER_RC_ERROR;
  91. }
  92. PARSER_RC pluginsd_end(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  93. {
  94. UNUSED(words);
  95. UNUSED(num_words);
  96. RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
  97. RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
  98. if (unlikely(!st)) {
  99. error("requested an END, without a BEGIN on host '%s'. Disabling it.", rrdhost_hostname(host));
  100. ((PARSER_USER_OBJECT *) user)->enabled = 0;
  101. return PARSER_RC_ERROR;
  102. }
  103. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  104. debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
  105. ((PARSER_USER_OBJECT *) user)->st = NULL;
  106. ((PARSER_USER_OBJECT *) user)->count++;
  107. rrdset_done(st);
  108. return PARSER_RC_OK;
  109. }
  110. PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  111. {
  112. RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
  113. if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) {
  114. debug(D_PLUGINSD, "Ignoring chart belonging to missing or ignored host.");
  115. return PARSER_RC_OK;
  116. }
  117. char *type = get_word(words, num_words, 1);
  118. char *name = get_word(words, num_words, 2);
  119. char *title = get_word(words, num_words, 3);
  120. char *units = get_word(words, num_words, 4);
  121. char *family = get_word(words, num_words, 5);
  122. char *context = get_word(words, num_words, 6);
  123. char *chart = get_word(words, num_words, 7);
  124. char *priority_s = get_word(words, num_words, 8);
  125. char *update_every_s = get_word(words, num_words, 9);
  126. char *options = get_word(words, num_words, 10);
  127. char *plugin = get_word(words, num_words, 11);
  128. char *module = get_word(words, num_words, 12);
  129. // parse the id from type
  130. char *id = NULL;
  131. if (likely(type && (id = strchr(type, '.')))) {
  132. *id = '\0';
  133. id++;
  134. }
  135. // make sure we have the required variables
  136. if (unlikely((!type || !*type || !id || !*id))) {
  137. if (likely(host))
  138. error("requested a CHART, without a type.id, on host '%s'. Disabling it.", rrdhost_hostname(host));
  139. else
  140. error("requested a CHART, without a type.id. Disabling it.");
  141. ((PARSER_USER_OBJECT *) user)->enabled = 0;
  142. return PARSER_RC_ERROR;
  143. }
  144. // parse the name, and make sure it does not include 'type.'
  145. if (unlikely(name && *name)) {
  146. // when data are streamed from child nodes
  147. // name will be type.name
  148. // so we have to remove 'type.' from name too
  149. size_t len = strlen(type);
  150. if (strncmp(type, name, len) == 0 && name[len] == '.')
  151. name = &name[len + 1];
  152. // if the name is the same with the id,
  153. // or is just 'NULL', clear it.
  154. if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0))
  155. name = NULL;
  156. }
  157. int priority = 1000;
  158. if (likely(priority_s && *priority_s))
  159. priority = str2i(priority_s);
  160. int update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every;
  161. if (likely(update_every_s && *update_every_s))
  162. update_every = str2i(update_every_s);
  163. if (unlikely(!update_every))
  164. update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every;
  165. RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
  166. if (unlikely(chart))
  167. chart_type = rrdset_type_id(chart);
  168. if (unlikely(name && !*name))
  169. name = NULL;
  170. if (unlikely(family && !*family))
  171. family = NULL;
  172. if (unlikely(context && !*context))
  173. context = NULL;
  174. if (unlikely(!title))
  175. title = "";
  176. if (unlikely(!units))
  177. units = "unknown";
  178. debug(
  179. D_PLUGINSD,
  180. "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
  181. type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
  182. priority, update_every);
  183. RRDSET *st = NULL;
  184. st = rrdset_create(
  185. host, type, id, name, family, context, title, units,
  186. (plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename,
  187. module, priority, update_every,
  188. chart_type);
  189. if (likely(st)) {
  190. if (options && *options) {
  191. if (strstr(options, "obsolete"))
  192. rrdset_is_obsolete(st);
  193. else
  194. rrdset_isnot_obsolete(st);
  195. if (strstr(options, "detail"))
  196. rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
  197. else
  198. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  199. if (strstr(options, "hidden"))
  200. rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
  201. else
  202. rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
  203. if (strstr(options, "store_first"))
  204. rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
  205. else
  206. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  207. } else {
  208. rrdset_isnot_obsolete(st);
  209. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  210. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  211. }
  212. }
  213. ((PARSER_USER_OBJECT *)user)->st = st;
  214. return PARSER_RC_OK;
  215. }
  216. PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
  217. {
  218. UNUSED(plugins_action);
  219. long first_entry_child = str2l(get_word(words, num_words, 1));
  220. long last_entry_child = str2l(get_word(words, num_words, 2));
  221. PARSER_USER_OBJECT *user_object = (PARSER_USER_OBJECT *) user;
  222. RRDHOST *host = user_object->host;
  223. RRDSET *st = user_object->st;
  224. if(unlikely(!host || !st)) {
  225. error("REPLAY: received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " command without a chart. Disabling it.");
  226. return PARSER_RC_ERROR;
  227. }
  228. rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
  229. bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0);
  230. return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
  231. }
  232. PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  233. {
  234. char *id = get_word(words, num_words, 1);
  235. char *name = get_word(words, num_words, 2);
  236. char *algorithm = get_word(words, num_words, 3);
  237. char *multiplier_s = get_word(words, num_words, 4);
  238. char *divisor_s = get_word(words, num_words, 5);
  239. char *options = get_word(words, num_words, 6);
  240. RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
  241. RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
  242. if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) {
  243. debug(D_PLUGINSD, "Ignoring dimension belonging to missing or ignored host.");
  244. return PARSER_RC_OK;
  245. }
  246. if (unlikely(!id)) {
  247. error(
  248. "requested a DIMENSION, without an id, host '%s' and chart '%s'. Disabling it.", rrdhost_hostname(host),
  249. st ? rrdset_id(st) : "UNSET");
  250. goto disable;
  251. }
  252. if (unlikely(!st && !((PARSER_USER_OBJECT *) user)->st_exists)) {
  253. error("requested a DIMENSION, without a CHART, on host '%s'. Disabling it.", rrdhost_hostname(host));
  254. goto disable;
  255. }
  256. long multiplier = 1;
  257. if (multiplier_s && *multiplier_s) {
  258. multiplier = strtol(multiplier_s, NULL, 0);
  259. if (unlikely(!multiplier))
  260. multiplier = 1;
  261. }
  262. long divisor = 1;
  263. if (likely(divisor_s && *divisor_s)) {
  264. divisor = strtol(divisor_s, NULL, 0);
  265. if (unlikely(!divisor))
  266. divisor = 1;
  267. }
  268. if (unlikely(!algorithm || !*algorithm))
  269. algorithm = "absolute";
  270. if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  271. debug(
  272. D_PLUGINSD,
  273. "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
  274. rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
  275. options ? options : "");
  276. RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
  277. int unhide_dimension = 1;
  278. rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  279. if (options && *options) {
  280. if (strstr(options, "obsolete") != NULL)
  281. rrddim_is_obsolete(st, rd);
  282. else
  283. rrddim_isnot_obsolete(st, rd);
  284. unhide_dimension = !strstr(options, "hidden");
  285. if (strstr(options, "noreset") != NULL)
  286. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  287. if (strstr(options, "nooverflow") != NULL)
  288. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  289. } else
  290. rrddim_isnot_obsolete(st, rd);
  291. if (likely(unhide_dimension)) {
  292. rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
  293. if (rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) {
  294. rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN);
  295. metaqueue_dimension_update_flags(rd);
  296. }
  297. } else {
  298. rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
  299. if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) {
  300. rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN);
  301. metaqueue_dimension_update_flags(rd);
  302. }
  303. }
  304. return PARSER_RC_OK;
  305. disable:
  306. ((PARSER_USER_OBJECT *)user)->enabled = 0;
  307. return PARSER_RC_ERROR;
  308. }
  309. // ----------------------------------------------------------------------------
  310. // execution of functions
  311. struct inflight_function {
  312. int code;
  313. int timeout;
  314. BUFFER *destination_wb;
  315. STRING *function;
  316. void (*callback)(BUFFER *wb, int code, void *callback_data);
  317. void *callback_data;
  318. usec_t timeout_ut;
  319. usec_t started_ut;
  320. usec_t sent_ut;
  321. };
  322. static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
  323. struct inflight_function *pf = func;
  324. PARSER *parser = parser_ptr;
  325. // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
  326. pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
  327. char buffer[2048 + 1];
  328. snprintfz(buffer, 2048, "FUNCTION %s %d \"%s\"\n",
  329. dictionary_acquired_item_name(item),
  330. pf->timeout,
  331. string2str(pf->function));
  332. // send the command to the plugin
  333. int ret = send_to_plugin(buffer, parser);
  334. pf->sent_ut = now_realtime_usec();
  335. if(ret < 0) {
  336. error("FUNCTION: failed to send function to plugin, fprintf() returned error %d", ret);
  337. rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
  338. }
  339. else {
  340. internal_error(LOG_FUNCTIONS,
  341. "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)",
  342. string2str(pf->function), dictionary_acquired_item_name(item), ret,
  343. pf->sent_ut - pf->started_ut);
  344. }
  345. }
  346. static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
  347. struct inflight_function *pf = new_func;
  348. error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
  349. pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
  350. pf->callback(pf->destination_wb, pf->code, pf->callback_data);
  351. string_freez(pf->function);
  352. return false;
  353. }
  354. static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) {
  355. struct inflight_function *pf = func;
  356. internal_error(LOG_FUNCTIONS,
  357. "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %llu usec, response %llu usec)",
  358. string2str(pf->function), dictionary_acquired_item_name(item),
  359. buffer_strlen(pf->destination_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
  360. pf->callback(pf->destination_wb, pf->code, pf->callback_data);
  361. string_freez(pf->function);
  362. }
  363. void inflight_functions_init(PARSER *parser) {
  364. parser->inflight.functions = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
  365. dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser);
  366. dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser);
  367. dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser);
  368. }
  369. static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) {
  370. parser->inflight.smaller_timeout = 0;
  371. struct inflight_function *pf;
  372. dfe_start_write(parser->inflight.functions, pf) {
  373. if (pf->timeout_ut < now) {
  374. internal_error(true,
  375. "FUNCTION '%s' removing expired transaction '%s', after %llu usec.",
  376. string2str(pf->function), pf_dfe.name, now - pf->started_ut);
  377. if(!buffer_strlen(pf->destination_wb) || pf->code == HTTP_RESP_OK)
  378. pf->code = rrd_call_function_error(pf->destination_wb,
  379. "Timeout waiting for collector response.",
  380. HTTP_RESP_GATEWAY_TIMEOUT);
  381. dictionary_del(parser->inflight.functions, pf_dfe.name);
  382. }
  383. else if(!parser->inflight.smaller_timeout || pf->timeout_ut < parser->inflight.smaller_timeout)
  384. parser->inflight.smaller_timeout = pf->timeout_ut;
  385. }
  386. dfe_done(pf);
  387. }
  388. // this is the function that is called from
  389. // rrd_call_function_and_wait() and rrd_call_function_async()
  390. static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeout, const char *function, void *collector_data, void (*callback)(BUFFER *wb, int code, void *callback_data), void *callback_data) {
  391. PARSER *parser = collector_data;
  392. usec_t now = now_realtime_usec();
  393. struct inflight_function tmp = {
  394. .started_ut = now,
  395. .timeout_ut = now + timeout * USEC_PER_SEC,
  396. .destination_wb = destination_wb,
  397. .timeout = timeout,
  398. .function = string_strdupz(function),
  399. .callback = callback,
  400. .callback_data = callback_data,
  401. };
  402. uuid_t uuid;
  403. uuid_generate_time(uuid);
  404. char key[UUID_STR_LEN];
  405. uuid_unparse_lower(uuid, key);
  406. dictionary_write_lock(parser->inflight.functions);
  407. // if there is any error, our dictionary callbacks will call the caller callback to notify
  408. // the caller about the error - no need for error handling here.
  409. dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
  410. if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
  411. parser->inflight.smaller_timeout = tmp.timeout_ut;
  412. // garbage collect stale inflight functions
  413. if(parser->inflight.smaller_timeout < now)
  414. inflight_functions_garbage_collect(parser, now);
  415. dictionary_write_unlock(parser->inflight.functions);
  416. return HTTP_RESP_OK;
  417. }
  418. PARSER_RC pluginsd_function(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  419. {
  420. bool global = false;
  421. size_t i = 1;
  422. if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
  423. i++;
  424. global = true;
  425. }
  426. char *name = get_word(words, num_words, i++);
  427. char *timeout_s = get_word(words, num_words, i++);
  428. char *help = get_word(words, num_words, i++);
  429. RRDSET *st = (global)?NULL:((PARSER_USER_OBJECT *) user)->st;
  430. RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
  431. if (unlikely(!host || !timeout_s || !name || !help || (!global && !st))) {
  432. error("requested a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'), host '%s', chart '%s'. Ignoring it.",
  433. global?"yes":"no",
  434. name?name:"(unset)",
  435. timeout_s?timeout_s:"(unset)",
  436. help?help:"(unset)",
  437. host?rrdhost_hostname(host):"(unset)",
  438. st?rrdset_id(st):"(unset)");
  439. return PARSER_RC_OK;
  440. }
  441. int timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  442. if (timeout_s && *timeout_s) {
  443. timeout = str2i(timeout_s);
  444. if (unlikely(timeout <= 0))
  445. timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  446. }
  447. PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser;
  448. rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser);
  449. return PARSER_RC_OK;
  450. }
  451. static void pluginsd_function_result_end(struct parser *parser, void *action_data) {
  452. STRING *key = action_data;
  453. if(key)
  454. dictionary_del(parser->inflight.functions, string2str(key));
  455. string_freez(key);
  456. }
  457. PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  458. {
  459. char *key = get_word(words, num_words, 1);
  460. char *status = get_word(words, num_words, 2);
  461. char *format = get_word(words, num_words, 3);
  462. char *expires = get_word(words, num_words, 4);
  463. if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) {
  464. error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
  465. , key ? key : "(unset)"
  466. , status ? status : "(unset)"
  467. , format ? format : "(unset)"
  468. , expires ? expires : "(unset)"
  469. );
  470. }
  471. int code = (status && *status) ? str2i(status) : 0;
  472. if (code <= 0)
  473. code = HTTP_RESP_BACKEND_RESPONSE_INVALID;
  474. time_t expiration = (expires && *expires) ? str2l(expires) : 0;
  475. PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser;
  476. struct inflight_function *pf = NULL;
  477. if(key && *key)
  478. pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key);
  479. if(!pf) {
  480. error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)");
  481. }
  482. else {
  483. if(format && *format)
  484. pf->destination_wb->contenttype = functions_format_to_content_type(format);
  485. pf->code = code;
  486. pf->destination_wb->expires = expiration;
  487. if(expiration <= now_realtime_sec())
  488. buffer_no_cacheable(pf->destination_wb);
  489. else
  490. buffer_cacheable(pf->destination_wb);
  491. }
  492. parser->defer.response = (pf) ? pf->destination_wb : NULL;
  493. parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END;
  494. parser->defer.action = pluginsd_function_result_end;
  495. parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL
  496. parser->flags |= PARSER_DEFER_UNTIL_KEYWORD;
  497. return PARSER_RC_OK;
  498. }
  499. // ----------------------------------------------------------------------------
  500. PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  501. {
  502. char *name = get_word(words, num_words, 1);
  503. char *value = get_word(words, num_words, 2);
  504. NETDATA_DOUBLE v;
  505. RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
  506. RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
  507. int global = (st) ? 0 : 1;
  508. if (name && *name) {
  509. if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
  510. global = 1;
  511. name = get_word(words, num_words, 2);
  512. value = get_word(words, num_words, 3);
  513. } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
  514. global = 0;
  515. name = get_word(words, num_words, 2);
  516. value = get_word(words, num_words, 3);
  517. }
  518. }
  519. if (unlikely(!name || !*name)) {
  520. error("requested a VARIABLE on host '%s', without a variable name. Disabling it.", rrdhost_hostname(host));
  521. ((PARSER_USER_OBJECT *)user)->enabled = 0;
  522. return PARSER_RC_ERROR;
  523. }
  524. if (unlikely(!value || !*value))
  525. value = NULL;
  526. if (unlikely(!value)) {
  527. error("cannot set %s VARIABLE '%s' on host '%s' to an empty value", (global) ? "HOST" : "CHART", name,
  528. rrdhost_hostname(host));
  529. return PARSER_RC_OK;
  530. }
  531. if (!global && !st) {
  532. error("cannot find/create CHART VARIABLE '%s' on host '%s' without a chart", name, rrdhost_hostname(host));
  533. return PARSER_RC_OK;
  534. }
  535. char *endptr = NULL;
  536. v = (NETDATA_DOUBLE)str2ndd(value, &endptr);
  537. if (unlikely(endptr && *endptr)) {
  538. if (endptr == value)
  539. error(
  540. "the value '%s' of VARIABLE '%s' on host '%s' cannot be parsed as a number", value, name,
  541. rrdhost_hostname(host));
  542. else
  543. error(
  544. "the value '%s' of VARIABLE '%s' on host '%s' has leftovers: '%s'", value, name, rrdhost_hostname(host),
  545. endptr);
  546. }
  547. if (global) {
  548. const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name);
  549. if (rva) {
  550. rrdvar_custom_host_variable_set(host, rva, v);
  551. rrdvar_custom_host_variable_release(host, rva);
  552. }
  553. else
  554. error("cannot find/create HOST VARIABLE '%s' on host '%s'", name, rrdhost_hostname(host));
  555. } else {
  556. const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
  557. if (rsa) {
  558. rrdsetvar_custom_chart_variable_set(st, rsa, v);
  559. rrdsetvar_custom_chart_variable_release(st, rsa);
  560. }
  561. else
  562. error("cannot find/create CHART VARIABLE '%s' on host '%s', chart '%s'", name, rrdhost_hostname(host), rrdset_id(st));
  563. }
  564. return PARSER_RC_OK;
  565. }
  566. PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  567. {
  568. debug(D_PLUGINSD, "requested a FLUSH");
  569. ((PARSER_USER_OBJECT *) user)->st = NULL;
  570. ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
  571. ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
  572. ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
  573. ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
  574. return PARSER_RC_OK;
  575. }
  576. PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused, PLUGINSD_ACTION *plugins_action __maybe_unused)
  577. {
  578. info("called DISABLE. Disabling it.");
  579. ((PARSER_USER_OBJECT *) user)->enabled = 0;
  580. return PARSER_RC_ERROR;
  581. }
  582. PARSER_RC pluginsd_label(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  583. {
  584. const char *name = get_word(words, num_words, 1);
  585. const char *label_source = get_word(words, num_words, 2);
  586. const char *value = get_word(words, num_words, 3);
  587. if (!name || !label_source || !value) {
  588. error("Ignoring malformed or empty LABEL command.");
  589. return PARSER_RC_OK;
  590. }
  591. char *store = (char *)value;
  592. bool allocated_store = false;
  593. if(unlikely(num_words > 4)) {
  594. allocated_store = true;
  595. store = mallocz(PLUGINSD_LINE_MAX + 1);
  596. size_t remaining = PLUGINSD_LINE_MAX;
  597. char *move = store;
  598. char *word;
  599. for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
  600. if(i > 3) {
  601. *move++ = ' ';
  602. *move = '\0';
  603. remaining--;
  604. }
  605. size_t length = strlen(word);
  606. if (length > remaining)
  607. length = remaining;
  608. remaining -= length;
  609. memcpy(move, word, length);
  610. move += length;
  611. *move = '\0';
  612. }
  613. }
  614. if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels))
  615. ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create();
  616. rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels,
  617. name,
  618. store,
  619. str2l(label_source));
  620. if (allocated_store)
  621. freez(store);
  622. return PARSER_RC_OK;
  623. }
  624. PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  625. {
  626. RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
  627. debug(D_PLUGINSD, "requested to OVERWRITE host labels");
  628. if(!host->rrdlabels)
  629. host->rrdlabels = rrdlabels_create();
  630. rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels));
  631. metaqueue_store_host_labels(host->machine_guid);
  632. rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels);
  633. ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL;
  634. return PARSER_RC_OK;
  635. }
  636. PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  637. {
  638. const char *name = get_word(words, num_words, 1);
  639. const char *value = get_word(words, num_words, 2);
  640. const char *label_source = get_word(words, num_words, 3);
  641. if (!name || !value || !*label_source) {
  642. error("Ignoring malformed or empty CHART LABEL command.");
  643. return PARSER_RC_OK;
  644. }
  645. if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) {
  646. ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = ((PARSER_USER_OBJECT *)user)->st->rrdlabels;
  647. rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
  648. }
  649. rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily,
  650. name, value, str2l(label_source));
  651. return PARSER_RC_OK;
  652. }
  653. PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  654. {
  655. RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
  656. RRDSET *st = ((PARSER_USER_OBJECT *)user)->st;
  657. if (unlikely(!st))
  658. return PARSER_RC_OK;
  659. debug(D_PLUGINSD, "requested to commit chart labels");
  660. if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) {
  661. error("requested CLABEL_COMMIT on host '%s', without a BEGIN, ignoring it.", rrdhost_hostname(host));
  662. return PARSER_RC_OK;
  663. }
  664. rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
  665. rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
  666. rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
  667. ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = NULL;
  668. return PARSER_RC_OK;
  669. }
  670. PARSER_RC pluginsd_guid(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
  671. {
  672. char *uuid_str = get_word(words, num_words, 1);
  673. uuid_t uuid;
  674. if (unlikely(!uuid_str)) {
  675. error("requested a GUID, without a uuid.");
  676. return PARSER_RC_ERROR;
  677. }
  678. if (unlikely(strlen(uuid_str) != GUID_LEN || uuid_parse(uuid_str, uuid) == -1)) {
  679. error("requested a GUID, without a valid uuid string.");
  680. return PARSER_RC_ERROR;
  681. }
  682. debug(D_PLUGINSD, "Parsed uuid=%s", uuid_str);
  683. if (plugins_action->guid_action) {
  684. return plugins_action->guid_action(user, &uuid);
  685. }
  686. return PARSER_RC_OK;
  687. }
  688. PARSER_RC pluginsd_context(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
  689. {
  690. char *uuid_str = get_word(words, num_words, 1);
  691. uuid_t uuid;
  692. if (unlikely(!uuid_str)) {
  693. error("requested a CONTEXT, without a uuid.");
  694. return PARSER_RC_ERROR;
  695. }
  696. if (unlikely(strlen(uuid_str) != GUID_LEN || uuid_parse(uuid_str, uuid) == -1)) {
  697. error("requested a CONTEXT, without a valid uuid string.");
  698. return PARSER_RC_ERROR;
  699. }
  700. debug(D_PLUGINSD, "Parsed uuid=%s", uuid_str);
  701. if (plugins_action->context_action) {
  702. return plugins_action->context_action(user, &uuid);
  703. }
  704. return PARSER_RC_OK;
  705. }
  706. PARSER_RC pluginsd_tombstone(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
  707. {
  708. char *uuid_str = get_word(words, num_words, 1);
  709. uuid_t uuid;
  710. if (unlikely(!uuid_str)) {
  711. error("requested a TOMBSTONE, without a uuid.");
  712. return PARSER_RC_ERROR;
  713. }
  714. if (unlikely(strlen(uuid_str) != GUID_LEN || uuid_parse(uuid_str, uuid) == -1)) {
  715. error("requested a TOMBSTONE, without a valid uuid string.");
  716. return PARSER_RC_ERROR;
  717. }
  718. debug(D_PLUGINSD, "Parsed uuid=%s", uuid_str);
  719. if (plugins_action->tombstone_action) {
  720. return plugins_action->tombstone_action(user, &uuid);
  721. }
  722. return PARSER_RC_OK;
  723. }
  724. PARSER_RC metalog_pluginsd_host(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
  725. {
  726. char *machine_guid = get_word(words, num_words, 1);
  727. char *hostname = get_word(words, num_words, 2);
  728. char *registry_hostname = get_word(words, num_words, 3);
  729. char *update_every_s = get_word(words, num_words, 4);
  730. char *os = get_word(words, num_words, 5);
  731. char *timezone = get_word(words, num_words, 6);
  732. char *tags = get_word(words, num_words, 7);
  733. int update_every = 1;
  734. if (likely(update_every_s && *update_every_s))
  735. update_every = str2i(update_every_s);
  736. if (unlikely(!update_every))
  737. update_every = 1;
  738. debug(D_PLUGINSD, "HOST PARSED: guid=%s, hostname=%s, reg_host=%s, update=%d, os=%s, timezone=%s, tags=%s",
  739. machine_guid, hostname, registry_hostname, update_every, os, timezone, tags);
  740. if (plugins_action->host_action) {
  741. return plugins_action->host_action(
  742. user, machine_guid, hostname, registry_hostname, update_every, os, timezone, tags);
  743. }
  744. return PARSER_RC_OK;
  745. }
  746. PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  747. {
  748. char *id = get_word(words, num_words, 1);
  749. char *start_time_str = get_word(words, num_words, 2);
  750. char *end_time_str = get_word(words, num_words, 3);
  751. RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
  752. RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
  753. if (unlikely(!id || (!st && !*id))) {
  754. error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host));
  755. goto disable;
  756. }
  757. if(*id) {
  758. st = rrdset_find(host, id);
  759. if (unlikely(!st)) {
  760. error("requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s', which does not exist on host '%s'. Disabling it.",
  761. id, rrdhost_hostname(host));
  762. goto disable;
  763. }
  764. ((PARSER_USER_OBJECT *) user)->st = st;
  765. ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
  766. ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
  767. ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
  768. ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
  769. }
  770. if(start_time_str && end_time_str) {
  771. time_t start_time = strtol(start_time_str, NULL, 0);
  772. time_t end_time = strtol(end_time_str, NULL, 0);
  773. if(start_time && end_time) {
  774. if (start_time > end_time) {
  775. error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', but timings are invalid (%ld to %ld). Disabling it.",
  776. rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time);
  777. goto disable;
  778. }
  779. if (end_time - start_time != st->update_every)
  780. rrdset_set_update_every(st, end_time - start_time);
  781. st->last_collected_time.tv_sec = end_time;
  782. st->last_collected_time.tv_usec = 0;
  783. st->last_updated.tv_sec = end_time;
  784. st->last_updated.tv_usec = 0;
  785. ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time;
  786. ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time;
  787. ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
  788. ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
  789. st->counter++;
  790. st->counter_done++;
  791. // these are only needed for db mode RAM, SAVE, MAP, ALLOC
  792. st->current_entry++;
  793. if(st->current_entry >= st->entries)
  794. st->current_entry -= st->entries;
  795. }
  796. }
  797. return PARSER_RC_OK;
  798. disable:
  799. ((PARSER_USER_OBJECT *)user)->enabled = 0;
  800. return PARSER_RC_ERROR;
  801. }
  802. PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  803. {
  804. char *dimension = get_word(words, num_words, 1);
  805. char *value_str = get_word(words, num_words, 2);
  806. char *flags_str = get_word(words, num_words, 3);
  807. RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
  808. RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
  809. if (unlikely(!st)) {
  810. error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
  811. dimension, rrdhost_hostname(host));
  812. goto disable;
  813. }
  814. if (unlikely(!dimension || !*dimension)) {
  815. error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on chart '%s' of host '%s', without a dimension. Disabling it.",
  816. rrdset_id(st), rrdhost_hostname(host));
  817. goto disable;
  818. }
  819. if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) {
  820. error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without timings from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
  821. dimension, rrdhost_hostname(host));
  822. goto disable;
  823. }
  824. if (unlikely(!value_str || !*value_str))
  825. value_str = "nan";
  826. if(unlikely(!flags_str))
  827. flags_str = "";
  828. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  829. debug(D_PLUGINSD, "REPLAY: is replaying dimension '%s'/'%s' to '%s'", rrdset_id(st), dimension, value_str);
  830. if (likely(value_str)) {
  831. RRDDIM *rd = rrddim_find(st, dimension);
  832. if(unlikely(!rd)) {
  833. error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
  834. dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
  835. goto disable;
  836. }
  837. else {
  838. NETDATA_DOUBLE value = strtondd(value_str, NULL);
  839. SN_FLAGS flags = SN_FLAG_NONE;
  840. char c;
  841. while((c = *flags_str++)) {
  842. switch(c) {
  843. case 'R':
  844. flags |= SN_FLAG_RESET;
  845. break;
  846. case 'E':
  847. flags |= SN_EMPTY_SLOT;
  848. value = NAN;
  849. break;
  850. default:
  851. error("unknown flag '%c'", c);
  852. break;
  853. }
  854. }
  855. if(!netdata_double_isnumber(value)) {
  856. value = NAN;
  857. flags = SN_EMPTY_SLOT;
  858. }
  859. rrddim_store_metric(rd, ((PARSER_USER_OBJECT *) user)->replay.end_time_ut, value, flags);
  860. rd->last_collected_time.tv_sec = ((PARSER_USER_OBJECT *) user)->replay.end_time;
  861. rd->last_collected_time.tv_usec = 0;
  862. rd->collections_counter++;
  863. }
  864. }
  865. return PARSER_RC_OK;
  866. disable:
  867. ((PARSER_USER_OBJECT *) user)->enabled = 0;
  868. return PARSER_RC_ERROR;
  869. }
  870. PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  871. {
  872. char *dimension = get_word(words, num_words, 1);
  873. char *last_collected_ut_str = get_word(words, num_words, 2);
  874. char *last_collected_value_str = get_word(words, num_words, 3);
  875. char *last_calculated_value_str = get_word(words, num_words, 4);
  876. char *last_stored_value_str = get_word(words, num_words, 5);
  877. RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
  878. RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
  879. if (unlikely(!st)) {
  880. error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
  881. dimension, rrdhost_hostname(host));
  882. goto disable;
  883. }
  884. if (unlikely(!dimension || !*dimension)) {
  885. error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on chart '%s' of host '%s', without a dimension. Disabling it.",
  886. rrdset_id(st), rrdhost_hostname(host));
  887. goto disable;
  888. }
  889. RRDDIM *rd = rrddim_find(st, dimension);
  890. if(unlikely(!rd)) {
  891. error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
  892. dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
  893. goto disable;
  894. }
  895. usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec;
  896. usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
  897. if(last_collected_ut > dim_last_collected_ut) {
  898. rd->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC;
  899. rd->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC;
  900. }
  901. rd->last_collected_value = last_collected_value_str ? str2ll(last_collected_value_str, NULL) : 0;
  902. rd->last_calculated_value = last_calculated_value_str ? str2ndd(last_calculated_value_str, NULL) : 0;
  903. rd->last_stored_value = last_stored_value_str ? str2ndd(last_stored_value_str, NULL) : 0.0;
  904. return PARSER_RC_OK;
  905. disable:
  906. ((PARSER_USER_OBJECT *) user)->enabled = 0;
  907. return PARSER_RC_ERROR;
  908. }
  909. PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  910. {
  911. char *last_collected_ut_str = get_word(words, num_words, 1);
  912. char *last_updated_ut_str = get_word(words, num_words, 2);
  913. char *last_collected_total_str = get_word(words, num_words, 3);
  914. char *collected_total_str = get_word(words, num_words, 4);
  915. RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
  916. RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
  917. if (unlikely(!st)) {
  918. error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
  919. rrdhost_hostname(host));
  920. goto disable;
  921. }
  922. usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
  923. usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
  924. if(last_collected_ut > chart_last_collected_ut) {
  925. st->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC;
  926. st->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC;
  927. }
  928. usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
  929. usec_t last_updated_ut = last_updated_ut_str ? str2ull(last_updated_ut_str) : 0;
  930. if(last_updated_ut > chart_last_updated_ut) {
  931. st->last_updated.tv_sec = last_updated_ut / USEC_PER_SEC;
  932. st->last_updated.tv_usec = last_updated_ut % USEC_PER_SEC;
  933. }
  934. st->last_collected_total = last_collected_total_str ? strtoll(last_collected_total_str, NULL, 0) : 0;
  935. st->collected_total = collected_total_str ? strtoll(collected_total_str, NULL, 0) : 0;
  936. st->counter++;
  937. st->counter_done++;
  938. return PARSER_RC_OK;
  939. disable:
  940. ((PARSER_USER_OBJECT *) user)->enabled = 0;
  941. return PARSER_RC_ERROR;
  942. }
  943. PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
  944. {
  945. if (num_words < 7) {
  946. error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
  947. return PARSER_RC_ERROR;
  948. }
  949. time_t update_every_child = str2l(get_word(words, num_words, 1));
  950. time_t first_entry_child = str2l(get_word(words, num_words, 2));
  951. time_t last_entry_child = str2l(get_word(words, num_words, 3));
  952. bool start_streaming = (strcmp(get_word(words, num_words, 4), "true") == 0);
  953. time_t first_entry_requested = str2l(get_word(words, num_words, 5));
  954. time_t last_entry_requested = str2l(get_word(words, num_words, 6));
  955. PARSER_USER_OBJECT *user_object = user;
  956. RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
  957. RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
  958. if (unlikely(!st)) {
  959. error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_END " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
  960. rrdhost_hostname(host));
  961. return PARSER_RC_ERROR;
  962. }
  963. ((PARSER_USER_OBJECT *) user)->st = NULL;
  964. ((PARSER_USER_OBJECT *) user)->count++;
  965. st->counter++;
  966. st->counter_done++;
  967. if (start_streaming) {
  968. if (st->update_every != update_every_child)
  969. rrdset_set_update_every(st, update_every_child);
  970. rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
  971. rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
  972. return PARSER_RC_OK;
  973. }
  974. bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child,
  975. first_entry_requested, last_entry_requested);
  976. return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
  977. }
  978. static void pluginsd_process_thread_cleanup(void *ptr) {
  979. PARSER *parser = (PARSER *)ptr;
  980. rrd_collector_finished();
  981. parser_destroy(parser);
  982. }
  983. // New plugins.d parser
  984. inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
  985. {
  986. int enabled = cd->enabled;
  987. if (!fp_plugin_input || !fp_plugin_output || !enabled) {
  988. cd->enabled = 0;
  989. return 0;
  990. }
  991. if (unlikely(fileno(fp_plugin_input) == -1)) {
  992. error("input file descriptor given is not a valid stream");
  993. cd->serial_failures++;
  994. return 0;
  995. }
  996. if (unlikely(fileno(fp_plugin_output) == -1)) {
  997. error("output file descriptor given is not a valid stream");
  998. cd->serial_failures++;
  999. return 0;
  1000. }
  1001. clearerr(fp_plugin_input);
  1002. clearerr(fp_plugin_output);
  1003. PARSER_USER_OBJECT user = {
  1004. .enabled = cd->enabled,
  1005. .host = host,
  1006. .cd = cd,
  1007. .trust_durations = trust_durations
  1008. };
  1009. // fp_plugin_output = our input; fp_plugin_input = our output
  1010. PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, PARSER_INPUT_SPLIT, NULL);
  1011. rrd_collector_started();
  1012. // this keeps the parser with its current value
  1013. // so, parser needs to be allocated before pushing it
  1014. netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
  1015. user.parser = parser;
  1016. while (likely(!parser_next(parser))) {
  1017. if (unlikely(netdata_exit || parser_action(parser, NULL)))
  1018. break;
  1019. }
  1020. // free parser with the pop function
  1021. netdata_thread_cleanup_pop(1);
  1022. cd->enabled = user.enabled;
  1023. size_t count = user.count;
  1024. if (likely(count)) {
  1025. cd->successful_collections += count;
  1026. cd->serial_failures = 0;
  1027. }
  1028. else
  1029. cd->serial_failures++;
  1030. return count;
  1031. }