pluginsd_functions.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "pluginsd_functions.h"
  3. #define LOG_FUNCTIONS false
  4. // ----------------------------------------------------------------------------
  5. // execution of functions
  6. static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
  7. struct inflight_function *pf = func;
  8. PARSER *parser = parser_ptr;
  9. // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
  10. pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
  11. const char *transaction = dictionary_acquired_item_name(item);
  12. int rc = uuid_parse_flexi(transaction, pf->transaction);
  13. if(rc != 0)
  14. netdata_log_error("FUNCTION: '%s': cannot parse transaction UUID", string2str(pf->function));
  15. CLEAN_BUFFER *buffer = buffer_create(1024, NULL);
  16. if(pf->payload && buffer_strlen(pf->payload)) {
  17. buffer_sprintf(
  18. buffer,
  19. PLUGINSD_KEYWORD_FUNCTION_PAYLOAD " %s %d \"%s\" \"%s\" \"%s\"\n",
  20. transaction,
  21. pf->timeout_s,
  22. string2str(pf->function),
  23. pf->source ? pf->source : "",
  24. content_type_id2string(pf->payload->content_type)
  25. );
  26. buffer_fast_strcat(buffer, buffer_tostring(pf->payload), buffer_strlen(pf->payload));
  27. buffer_strcat(buffer, "\nFUNCTION_PAYLOAD_END\n");
  28. }
  29. else {
  30. buffer_sprintf(
  31. buffer,
  32. PLUGINSD_KEYWORD_FUNCTION " %s %d \"%s\" \"%s\"\n",
  33. transaction,
  34. pf->timeout_s,
  35. string2str(pf->function),
  36. pf->source ? pf->source : ""
  37. );
  38. }
  39. // send the command to the plugin
  40. // IMPORTANT: make sure all commands are sent in 1 call, because in streaming they may interfere with others
  41. ssize_t ret = send_to_plugin(buffer_tostring(buffer), parser);
  42. pf->sent_monotonic_ut = now_monotonic_usec();
  43. if(ret < 0) {
  44. pf->sent_successfully = false;
  45. pf->code = HTTP_RESP_SERVICE_UNAVAILABLE;
  46. netdata_log_error("FUNCTION '%s': failed to send it to the plugin, error %zd", string2str(pf->function), ret);
  47. rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", pf->code);
  48. }
  49. else {
  50. pf->sent_successfully = true;
  51. internal_error(LOG_FUNCTIONS,
  52. "FUNCTION '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)",
  53. string2str(pf->function), dictionary_acquired_item_name(item), ret,
  54. pf->sent_monotonic_ut - pf->started_monotonic_ut);
  55. }
  56. }
  57. static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
  58. struct inflight_function *pf = new_func;
  59. netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
  60. pf->code = rrd_call_function_error(pf->result_body_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
  61. pf->result.cb(pf->result_body_wb, pf->code, pf->result.data);
  62. string_freez(pf->function);
  63. return false;
  64. }
  65. static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr) {
  66. struct inflight_function *pf = func;
  67. struct parser *parser = (struct parser *)parser_ptr; (void)parser;
  68. internal_error(LOG_FUNCTIONS,
  69. "FUNCTION '%s' result of transaction '%s' received from collector "
  70. "(%zu bytes, request %"PRIu64" usec, response %"PRIu64" usec)",
  71. string2str(pf->function), dictionary_acquired_item_name(item),
  72. buffer_strlen(pf->result_body_wb),
  73. pf->sent_monotonic_ut - pf->started_monotonic_ut, now_realtime_usec() - pf->sent_monotonic_ut);
  74. pf->result.cb(pf->result_body_wb, pf->code, pf->result.data);
  75. string_freez(pf->function);
  76. buffer_free((void *)pf->payload);
  77. freez((void *)pf->source);
  78. }
  79. void pluginsd_inflight_functions_init(PARSER *parser) {
  80. parser->inflight.functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_functions, 0);
  81. dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser);
  82. dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser);
  83. dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser);
  84. }
  85. void pluginsd_inflight_functions_cleanup(PARSER *parser) {
  86. dictionary_destroy(parser->inflight.functions);
  87. }
  88. // ----------------------------------------------------------------------------
  89. void pluginsd_inflight_functions_garbage_collect(PARSER *parser, usec_t now_ut) {
  90. parser->inflight.smaller_monotonic_timeout_ut = 0;
  91. struct inflight_function *pf;
  92. dfe_start_write(parser->inflight.functions, pf) {
  93. if (*pf->stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT < now_ut) {
  94. internal_error(true,
  95. "FUNCTION '%s' removing expired transaction '%s', after %"PRIu64" usec.",
  96. string2str(pf->function), pf_dfe.name, now_ut - pf->started_monotonic_ut);
  97. if(!buffer_strlen(pf->result_body_wb) || pf->code == HTTP_RESP_OK)
  98. pf->code = rrd_call_function_error(pf->result_body_wb,
  99. "Timeout waiting for collector response.",
  100. HTTP_RESP_GATEWAY_TIMEOUT);
  101. dictionary_del(parser->inflight.functions, pf_dfe.name);
  102. }
  103. else if(!parser->inflight.smaller_monotonic_timeout_ut || *pf->stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT < parser->inflight.smaller_monotonic_timeout_ut)
  104. parser->inflight.smaller_monotonic_timeout_ut = *pf->stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT;
  105. }
  106. dfe_done(pf);
  107. }
  108. // ----------------------------------------------------------------------------
  109. static void pluginsd_function_cancel(void *data) {
  110. struct inflight_function *look_for = data, *t;
  111. bool sent = false;
  112. dfe_start_read(look_for->parser->inflight.functions, t) {
  113. if(look_for == t) {
  114. const char *transaction = t_dfe.name;
  115. internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction);
  116. char buffer[2048];
  117. snprintfz(buffer, sizeof(buffer), PLUGINSD_KEYWORD_FUNCTION_CANCEL " %s\n", transaction);
  118. // send the command to the plugin
  119. ssize_t ret = send_to_plugin(buffer, t->parser);
  120. if(ret < 0)
  121. sent = true;
  122. break;
  123. }
  124. }
  125. dfe_done(t);
  126. if(sent <= 0)
  127. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  128. "PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d.");
  129. }
  130. static void pluginsd_function_progress_to_plugin(void *data) {
  131. struct inflight_function *look_for = data, *t;
  132. bool sent = false;
  133. dfe_start_read(look_for->parser->inflight.functions, t) {
  134. if(look_for == t) {
  135. const char *transaction = t_dfe.name;
  136. internal_error(true, "PLUGINSD: sending function progress to plugin for transaction '%s'", transaction);
  137. char buffer[2048];
  138. snprintfz(buffer, sizeof(buffer), PLUGINSD_KEYWORD_FUNCTION_PROGRESS " %s\n", transaction);
  139. // send the command to the plugin
  140. ssize_t ret = send_to_plugin(buffer, t->parser);
  141. if(ret < 0)
  142. sent = true;
  143. break;
  144. }
  145. }
  146. dfe_done(t);
  147. if(sent <= 0)
  148. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  149. "PLUGINSD: FUNCTION_PROGRESS request didn't match any pending function requests in pluginsd.d.");
  150. }
// this is the function called from
// rrd_call_function_and_wait() and rrd_call_function_async()
//
// Queues a function request for the plugin behind this parser: it builds an
// inflight_function entry and stores it in parser->inflight.functions under
// the compact lowercase transaction UUID. Storing the entry fires the insert
// callback registered in this file, which sends the request to the plugin.
//
//   rfe  - the request: function, payload, source, transaction, deadline
//          pointer, result/progress callbacks and canceller/progresser
//          registration hooks
//   data - the PARSER registered together with this callback
//
// Returns HTTP_RESP_OK when the entry reports it was sent successfully,
// otherwise the error code stored on the entry.
int pluginsd_function_execute_cb(struct rrd_function_execute *rfe, void *data) {
    // IMPORTANT: this function MUST call the result_cb even on failures

    PARSER *parser = data;

    usec_t now_ut = now_monotonic_usec();

    // remaining time until the deadline, rounded to the nearest second
    // NOTE(review): if *rfe->stop_monotonic_ut is already in the past this
    // subtraction presumably wraps (usec_t looks unsigned) - confirm callers
    // never pass an expired deadline
    int timeout_s = (int)((*rfe->stop_monotonic_ut - now_ut + USEC_PER_SEC / 2) / USEC_PER_SEC);

    // function, payload and source are duplicated here; the delete callback
    // of the dictionary releases them
    struct inflight_function tmp = {
        .started_monotonic_ut = now_ut,
        .stop_monotonic_ut = rfe->stop_monotonic_ut,
        .result_body_wb = rfe->result.wb,
        .timeout_s = timeout_s,
        .function = string_strdupz(rfe->function),
        .payload = buffer_dup(rfe->payload),
        .source = rfe->source ? strdupz(rfe->source) : NULL,
        .parser = parser,

        .result = {
            .cb = rfe->result.cb,
            .data = rfe->result.data,
        },
        .progress = {
            .cb = rfe->progress.cb,
            .data = rfe->progress.data,
        },
    };
    uuid_copy(tmp.transaction, *rfe->transaction);

    // the dictionary key is the compact lowercase form of the transaction UUID
    char transaction_str[UUID_COMPACT_STR_LEN];
    uuid_unparse_lower_compact(tmp.transaction, transaction_str);

    dictionary_write_lock(parser->inflight.functions);

    // if there is any error, our dictionary callbacks will call the caller callback to notify
    // the caller about the error - no need for error handling here.
    // NOTE(review): for a duplicate transaction the conflict callback returns
    // false, so t is presumably the already stored entry rather than tmp -
    // confirm the branches below are intended for that case too
    struct inflight_function *t = dictionary_set(parser->inflight.functions, transaction_str, &tmp, sizeof(struct inflight_function));

    if(!t->sent_successfully) {
        // read the code before deleting the item; the delete callback
        // registered in this file is the one invoking the result callback
        int code = t->code;
        dictionary_write_unlock(parser->inflight.functions);
        dictionary_del(parser->inflight.functions, transaction_str);
        pluginsd_inflight_functions_garbage_collect(parser, now_ut);
        return code;
    }
    else {
        // let the caller cancel this request through the plugin
        if (rfe->register_canceller.cb)
            rfe->register_canceller.cb(rfe->register_canceller.data, pluginsd_function_cancel, t);

        // progress requests are registered for local plugins, or for streaming
        // peers that have the STREAM_CAP_PROGRESS capability
        if (rfe->register_progresser.cb &&
            (parser->repertoire == PARSER_INIT_PLUGINSD || (parser->repertoire == PARSER_INIT_STREAMING &&
                                                            stream_has_capability(&parser->user, STREAM_CAP_PROGRESS))))
            rfe->register_progresser.cb(rfe->register_progresser.data, pluginsd_function_progress_to_plugin, t);

        // keep track of the earliest deadline among the in-flight functions
        if (!parser->inflight.smaller_monotonic_timeout_ut ||
            *tmp.stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT < parser->inflight.smaller_monotonic_timeout_ut)
            parser->inflight.smaller_monotonic_timeout_ut = *tmp.stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT;

        // garbage collect stale inflight functions
        if (parser->inflight.smaller_monotonic_timeout_ut < now_ut)
            pluginsd_inflight_functions_garbage_collect(parser, now_ut);

        dictionary_write_unlock(parser->inflight.functions);

        return HTTP_RESP_OK;
    }
}
  207. PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) {
  208. // a plugin or a child is registering a function
  209. bool global = false;
  210. size_t i = 1;
  211. if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
  212. i++;
  213. global = true;
  214. }
  215. char *name = get_word(words, num_words, i++);
  216. char *timeout_str = get_word(words, num_words, i++);
  217. char *help = get_word(words, num_words, i++);
  218. char *tags = get_word(words, num_words, i++);
  219. char *access_str = get_word(words, num_words, i++);
  220. char *priority_str = get_word(words, num_words, i++);
  221. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_FUNCTION);
  222. if(!host) return PARSER_RC_ERROR;
  223. RRDSET *st = (global)? NULL: pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
  224. if(!st) global = true;
  225. if (unlikely(!timeout_str || !name || !help || (!global && !st))) {
  226. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
  227. rrdhost_hostname(host),
  228. st?rrdset_id(st):"(unset)",
  229. global?"yes":"no",
  230. name?name:"(unset)",
  231. timeout_str ? timeout_str : "(unset)",
  232. help?help:"(unset)"
  233. );
  234. return PARSER_RC_ERROR;
  235. }
  236. int timeout_s = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  237. if (timeout_str && *timeout_str) {
  238. timeout_s = str2i(timeout_str);
  239. if (unlikely(timeout_s <= 0))
  240. timeout_s = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  241. }
  242. int priority = RRDFUNCTIONS_PRIORITY_DEFAULT;
  243. if(priority_str && *priority_str) {
  244. priority = str2i(priority_str);
  245. if(priority <= 0)
  246. priority = RRDFUNCTIONS_PRIORITY_DEFAULT;
  247. }
  248. rrd_function_add(host, st, name, timeout_s, priority, help, tags,
  249. http_access2id(access_str), false,
  250. pluginsd_function_execute_cb, parser);
  251. parser->user.data_collections_count++;
  252. return PARSER_RC_OK;
  253. }
  254. static void pluginsd_function_result_end(struct parser *parser, void *action_data) {
  255. STRING *key = action_data;
  256. if(key)
  257. dictionary_del(parser->inflight.functions, string2str(key));
  258. string_freez(key);
  259. parser->user.data_collections_count++;
  260. }
  261. static inline struct inflight_function *inflight_function_find(PARSER *parser, const char *transaction) {
  262. struct inflight_function *pf = NULL;
  263. if(transaction && *transaction)
  264. pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, transaction);
  265. if(!pf)
  266. netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", transaction ? transaction : "(unset)");
  267. return pf;
  268. }
  269. PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, PARSER *parser) {
  270. char *transaction = get_word(words, num_words, 1);
  271. char *status = get_word(words, num_words, 2);
  272. char *format = get_word(words, num_words, 3);
  273. char *expires = get_word(words, num_words, 4);
  274. if (unlikely(!transaction || !*transaction || !status || !*status || !format || !*format || !expires || !*expires)) {
  275. netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
  276. , transaction ? transaction : "(unset)"
  277. , status ? status : "(unset)"
  278. , format ? format : "(unset)"
  279. , expires ? expires : "(unset)"
  280. );
  281. }
  282. int code = (status && *status) ? str2i(status) : 0;
  283. if (code <= 0)
  284. code = HTTP_RESP_BACKEND_RESPONSE_INVALID;
  285. time_t expiration = (expires && *expires) ? str2l(expires) : 0;
  286. struct inflight_function *pf = inflight_function_find(parser, transaction);
  287. if(pf) {
  288. if(format && *format)
  289. pf->result_body_wb->content_type = content_type_string2id(format);
  290. pf->code = code;
  291. pf->result_body_wb->expires = expiration;
  292. if(expiration <= now_realtime_sec())
  293. buffer_no_cacheable(pf->result_body_wb);
  294. else
  295. buffer_cacheable(pf->result_body_wb);
  296. }
  297. parser->defer.response = (pf) ? pf->result_body_wb : NULL;
  298. parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END;
  299. parser->defer.action = pluginsd_function_result_end;
  300. parser->defer.action_data = string_strdupz(transaction); // it is ok is key is NULL
  301. parser->flags |= PARSER_DEFER_UNTIL_KEYWORD;
  302. return PARSER_RC_OK;
  303. }
  304. PARSER_RC pluginsd_function_progress(char **words, size_t num_words, PARSER *parser) {
  305. size_t i = 1;
  306. char *transaction = get_word(words, num_words, i++);
  307. char *done_str = get_word(words, num_words, i++);
  308. char *all_str = get_word(words, num_words, i++);
  309. struct inflight_function *pf = inflight_function_find(parser, transaction);
  310. if(pf) {
  311. size_t done = done_str && *done_str ? str2u(done_str) : 0;
  312. size_t all = all_str && *all_str ? str2u(all_str) : 0;
  313. if(pf->progress.cb)
  314. pf->progress.cb(pf->progress.data, done, all);
  315. }
  316. return PARSER_RC_OK;
  317. }