pluginsd_functions.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "pluginsd_functions.h"
  3. #define LOG_FUNCTIONS false
  4. // ----------------------------------------------------------------------------
  5. // execution of functions
  6. static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
  7. struct inflight_function *pf = func;
  8. PARSER *parser = parser_ptr;
  9. // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
  10. pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
  11. const char *transaction = dictionary_acquired_item_name(item);
  12. int rc = uuid_parse_flexi(transaction, pf->transaction);
  13. if(rc != 0)
  14. netdata_log_error("FUNCTION: '%s': cannot parse transaction UUID", string2str(pf->function));
  15. CLEAN_BUFFER *buffer = buffer_create(1024, NULL);
  16. if(pf->payload && buffer_strlen(pf->payload)) {
  17. buffer_sprintf(
  18. buffer,
  19. PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN " %s %d \"%s\" \""HTTP_ACCESS_FORMAT"\" \"%s\" \"%s\"\n",
  20. transaction,
  21. pf->timeout_s,
  22. string2str(pf->function),
  23. (HTTP_ACCESS_FORMAT_CAST)pf->access,
  24. pf->source ? pf->source : "",
  25. content_type_id2string(pf->payload->content_type)
  26. );
  27. buffer_fast_strcat(buffer, buffer_tostring(pf->payload), buffer_strlen(pf->payload));
  28. buffer_strcat(buffer, "\nFUNCTION_PAYLOAD_END\n");
  29. }
  30. else {
  31. buffer_sprintf(
  32. buffer,
  33. PLUGINSD_CALL_FUNCTION " %s %d \"%s\" \""HTTP_ACCESS_FORMAT"\" \"%s\"\n",
  34. transaction,
  35. pf->timeout_s,
  36. string2str(pf->function),
  37. (HTTP_ACCESS_FORMAT_CAST)pf->access,
  38. pf->source ? pf->source : ""
  39. );
  40. }
  41. // send the command to the plugin
  42. // IMPORTANT: make sure all commands are sent in 1 call, because in streaming they may interfere with others
  43. ssize_t ret = send_to_plugin(buffer_tostring(buffer), parser);
  44. pf->sent_monotonic_ut = now_monotonic_usec();
  45. if(ret < 0) {
  46. pf->sent_successfully = false;
  47. pf->code = HTTP_RESP_SERVICE_UNAVAILABLE;
  48. netdata_log_error("FUNCTION '%s': failed to send it to the plugin, error %zd", string2str(pf->function), ret);
  49. rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", pf->code);
  50. }
  51. else {
  52. pf->sent_successfully = true;
  53. internal_error(LOG_FUNCTIONS,
  54. "FUNCTION '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)",
  55. string2str(pf->function), dictionary_acquired_item_name(item), ret,
  56. pf->sent_monotonic_ut - pf->started_monotonic_ut);
  57. }
  58. }
  59. static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
  60. struct inflight_function *pf = new_func;
  61. netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
  62. pf->code = rrd_call_function_error(pf->result_body_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
  63. pf->result.cb(pf->result_body_wb, pf->code, pf->result.data);
  64. string_freez(pf->function);
  65. return false;
  66. }
  67. static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr) {
  68. struct inflight_function *pf = func;
  69. struct parser *parser = (struct parser *)parser_ptr; (void)parser;
  70. internal_error(LOG_FUNCTIONS,
  71. "FUNCTION '%s' result of transaction '%s' received from collector "
  72. "(%zu bytes, request %"PRIu64" usec, response %"PRIu64" usec)",
  73. string2str(pf->function), dictionary_acquired_item_name(item),
  74. buffer_strlen(pf->result_body_wb),
  75. pf->sent_monotonic_ut - pf->started_monotonic_ut, now_realtime_usec() - pf->sent_monotonic_ut);
  76. pf->result.cb(pf->result_body_wb, pf->code, pf->result.data);
  77. string_freez(pf->function);
  78. buffer_free((void *)pf->payload);
  79. freez((void *)pf->source);
  80. }
  81. void pluginsd_inflight_functions_init(PARSER *parser) {
  82. parser->inflight.functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_functions, 0);
  83. dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser);
  84. dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser);
  85. dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser);
  86. }
  87. void pluginsd_inflight_functions_cleanup(PARSER *parser) {
  88. dictionary_destroy(parser->inflight.functions);
  89. }
  90. // ----------------------------------------------------------------------------
  91. void pluginsd_inflight_functions_garbage_collect(PARSER *parser, usec_t now_ut) {
  92. parser->inflight.smaller_monotonic_timeout_ut = 0;
  93. struct inflight_function *pf;
  94. dfe_start_write(parser->inflight.functions, pf) {
  95. if (*pf->stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT < now_ut) {
  96. internal_error(true,
  97. "FUNCTION '%s' removing expired transaction '%s', after %"PRIu64" usec.",
  98. string2str(pf->function), pf_dfe.name, now_ut - pf->started_monotonic_ut);
  99. if(!buffer_strlen(pf->result_body_wb) || pf->code == HTTP_RESP_OK)
  100. pf->code = rrd_call_function_error(pf->result_body_wb,
  101. "Timeout waiting for collector response.",
  102. HTTP_RESP_GATEWAY_TIMEOUT);
  103. dictionary_del(parser->inflight.functions, pf_dfe.name);
  104. }
  105. else if(!parser->inflight.smaller_monotonic_timeout_ut || *pf->stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT < parser->inflight.smaller_monotonic_timeout_ut)
  106. parser->inflight.smaller_monotonic_timeout_ut = *pf->stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT;
  107. }
  108. dfe_done(pf);
  109. }
  110. // ----------------------------------------------------------------------------
  111. static void pluginsd_function_cancel(void *data) {
  112. struct inflight_function *look_for = data, *t;
  113. bool sent = false;
  114. dfe_start_read(look_for->parser->inflight.functions, t) {
  115. if(look_for == t) {
  116. const char *transaction = t_dfe.name;
  117. internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction);
  118. char buffer[2048];
  119. snprintfz(buffer, sizeof(buffer), PLUGINSD_CALL_FUNCTION_CANCEL " %s\n", transaction);
  120. // send the command to the plugin
  121. ssize_t ret = send_to_plugin(buffer, t->parser);
  122. if(ret < 0)
  123. sent = true;
  124. break;
  125. }
  126. }
  127. dfe_done(t);
  128. if(sent <= 0)
  129. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  130. "PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d.");
  131. }
  132. static void pluginsd_function_progress_to_plugin(void *data) {
  133. struct inflight_function *look_for = data, *t;
  134. bool sent = false;
  135. dfe_start_read(look_for->parser->inflight.functions, t) {
  136. if(look_for == t) {
  137. const char *transaction = t_dfe.name;
  138. internal_error(true, "PLUGINSD: sending function progress to plugin for transaction '%s'", transaction);
  139. char buffer[2048];
  140. snprintfz(buffer, sizeof(buffer), PLUGINSD_CALL_FUNCTION_PROGRESS " %s\n", transaction);
  141. // send the command to the plugin
  142. ssize_t ret = send_to_plugin(buffer, t->parser);
  143. if(ret < 0)
  144. sent = true;
  145. break;
  146. }
  147. }
  148. dfe_done(t);
  149. if(sent <= 0)
  150. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  151. "PLUGINSD: FUNCTION_PROGRESS request didn't match any pending function requests in pluginsd.d.");
  152. }
  153. // this is the function called from
  154. // rrd_call_function_and_wait() and rrd_call_function_async()
  155. int pluginsd_function_execute_cb(struct rrd_function_execute *rfe, void *data) {
  156. // IMPORTANT: this function MUST call the result_cb even on failures
  157. PARSER *parser = data;
  158. usec_t now_ut = now_monotonic_usec();
  159. int timeout_s = (int)((*rfe->stop_monotonic_ut - now_ut + USEC_PER_SEC / 2) / USEC_PER_SEC);
  160. struct inflight_function tmp = {
  161. .started_monotonic_ut = now_ut,
  162. .stop_monotonic_ut = rfe->stop_monotonic_ut,
  163. .result_body_wb = rfe->result.wb,
  164. .timeout_s = timeout_s,
  165. .function = string_strdupz(rfe->function),
  166. .payload = buffer_dup(rfe->payload),
  167. .access = rfe->user_access,
  168. .source = rfe->source ? strdupz(rfe->source) : NULL,
  169. .parser = parser,
  170. .result = {
  171. .cb = rfe->result.cb,
  172. .data = rfe->result.data,
  173. },
  174. .progress = {
  175. .cb = rfe->progress.cb,
  176. .data = rfe->progress.data,
  177. },
  178. };
  179. uuid_copy(tmp.transaction, *rfe->transaction);
  180. char transaction_str[UUID_COMPACT_STR_LEN];
  181. uuid_unparse_lower_compact(tmp.transaction, transaction_str);
  182. dictionary_write_lock(parser->inflight.functions);
  183. // if there is any error, our dictionary callbacks will call the caller callback to notify
  184. // the caller about the error - no need for error handling here.
  185. struct inflight_function *t = dictionary_set(parser->inflight.functions, transaction_str, &tmp, sizeof(struct inflight_function));
  186. if(!t->sent_successfully) {
  187. int code = t->code;
  188. dictionary_write_unlock(parser->inflight.functions);
  189. dictionary_del(parser->inflight.functions, transaction_str);
  190. pluginsd_inflight_functions_garbage_collect(parser, now_ut);
  191. return code;
  192. }
  193. else {
  194. if (rfe->register_canceller.cb)
  195. rfe->register_canceller.cb(rfe->register_canceller.data, pluginsd_function_cancel, t);
  196. if (rfe->register_progresser.cb &&
  197. (parser->repertoire == PARSER_INIT_PLUGINSD || (parser->repertoire == PARSER_INIT_STREAMING &&
  198. stream_has_capability(&parser->user, STREAM_CAP_PROGRESS))))
  199. rfe->register_progresser.cb(rfe->register_progresser.data, pluginsd_function_progress_to_plugin, t);
  200. if (!parser->inflight.smaller_monotonic_timeout_ut ||
  201. *tmp.stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT < parser->inflight.smaller_monotonic_timeout_ut)
  202. parser->inflight.smaller_monotonic_timeout_ut = *tmp.stop_monotonic_ut + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT;
  203. // garbage collect stale inflight functions
  204. if (parser->inflight.smaller_monotonic_timeout_ut < now_ut)
  205. pluginsd_inflight_functions_garbage_collect(parser, now_ut);
  206. dictionary_write_unlock(parser->inflight.functions);
  207. return HTTP_RESP_OK;
  208. }
  209. }
  210. PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) {
  211. // a plugin or a child is registering a function
  212. bool global = false;
  213. size_t i = 1;
  214. if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
  215. i++;
  216. global = true;
  217. }
  218. char *name = get_word(words, num_words, i++);
  219. char *timeout_str = get_word(words, num_words, i++);
  220. char *help = get_word(words, num_words, i++);
  221. char *tags = get_word(words, num_words, i++);
  222. char *access_str = get_word(words, num_words, i++);
  223. char *priority_str = get_word(words, num_words, i++);
  224. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_FUNCTION);
  225. if(!host) return PARSER_RC_ERROR;
  226. RRDSET *st = (global)? NULL: pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
  227. if(!st) global = true;
  228. if (unlikely(!timeout_str || !name || !help || (!global && !st))) {
  229. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
  230. rrdhost_hostname(host),
  231. st?rrdset_id(st):"(unset)",
  232. global?"yes":"no",
  233. name?name:"(unset)",
  234. timeout_str ? timeout_str : "(unset)",
  235. help?help:"(unset)"
  236. );
  237. return PARSER_RC_ERROR;
  238. }
  239. int timeout_s = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  240. if (timeout_str && *timeout_str) {
  241. timeout_s = str2i(timeout_str);
  242. if (unlikely(timeout_s <= 0))
  243. timeout_s = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  244. }
  245. int priority = RRDFUNCTIONS_PRIORITY_DEFAULT;
  246. if(priority_str && *priority_str) {
  247. priority = str2i(priority_str);
  248. if(priority <= 0)
  249. priority = RRDFUNCTIONS_PRIORITY_DEFAULT;
  250. }
  251. rrd_function_add(host, st, name, timeout_s, priority, help, tags,
  252. http_access_from_hex_mapping_old_roles(access_str), false,
  253. pluginsd_function_execute_cb, parser);
  254. parser->user.data_collections_count++;
  255. return PARSER_RC_OK;
  256. }
  257. static void pluginsd_function_result_end(struct parser *parser, void *action_data) {
  258. STRING *key = action_data;
  259. if(key)
  260. dictionary_del(parser->inflight.functions, string2str(key));
  261. string_freez(key);
  262. parser->user.data_collections_count++;
  263. }
  264. static inline struct inflight_function *inflight_function_find(PARSER *parser, const char *transaction) {
  265. struct inflight_function *pf = NULL;
  266. if(transaction && *transaction)
  267. pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, transaction);
  268. if(!pf)
  269. netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", transaction ? transaction : "(unset)");
  270. return pf;
  271. }
  272. PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, PARSER *parser) {
  273. char *transaction = get_word(words, num_words, 1);
  274. char *status = get_word(words, num_words, 2);
  275. char *format = get_word(words, num_words, 3);
  276. char *expires = get_word(words, num_words, 4);
  277. if (unlikely(!transaction || !*transaction || !status || !*status || !format || !*format || !expires || !*expires)) {
  278. netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
  279. , transaction ? transaction : "(unset)"
  280. , status ? status : "(unset)"
  281. , format ? format : "(unset)"
  282. , expires ? expires : "(unset)"
  283. );
  284. }
  285. int code = (status && *status) ? str2i(status) : 0;
  286. if (code <= 0)
  287. code = HTTP_RESP_BACKEND_RESPONSE_INVALID;
  288. time_t expiration = (expires && *expires) ? str2l(expires) : 0;
  289. struct inflight_function *pf = inflight_function_find(parser, transaction);
  290. if(pf) {
  291. if(format && *format)
  292. pf->result_body_wb->content_type = content_type_string2id(format);
  293. pf->code = code;
  294. pf->result_body_wb->expires = expiration;
  295. if(expiration <= now_realtime_sec())
  296. buffer_no_cacheable(pf->result_body_wb);
  297. else
  298. buffer_cacheable(pf->result_body_wb);
  299. }
  300. parser->defer.response = (pf) ? pf->result_body_wb : NULL;
  301. parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END;
  302. parser->defer.action = pluginsd_function_result_end;
  303. parser->defer.action_data = string_strdupz(transaction); // it is ok is key is NULL
  304. parser->flags |= PARSER_DEFER_UNTIL_KEYWORD;
  305. return PARSER_RC_OK;
  306. }
  307. PARSER_RC pluginsd_function_progress(char **words, size_t num_words, PARSER *parser) {
  308. size_t i = 1;
  309. char *transaction = get_word(words, num_words, i++);
  310. char *done_str = get_word(words, num_words, i++);
  311. char *all_str = get_word(words, num_words, i++);
  312. struct inflight_function *pf = inflight_function_find(parser, transaction);
  313. if(pf) {
  314. size_t done = done_str && *done_str ? str2u(done_str) : 0;
  315. size_t all = all_str && *all_str ? str2u(all_str) : 0;
  316. if(pf->progress.cb)
  317. pf->progress.cb(pf->progress.data, done, all);
  318. }
  319. return PARSER_RC_OK;
  320. }