functions_evloop.c 16 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "functions_evloop.h"
  3. static void functions_evloop_config_cb(const char *transaction, char *function, usec_t *stop_monotonic_ut, bool *cancelled, BUFFER *payload, const char *source, void *data);
  4. struct functions_evloop_worker_job {
  5. bool used;
  6. bool running;
  7. bool cancelled;
  8. usec_t stop_monotonic_ut;
  9. char *cmd;
  10. const char *transaction;
  11. time_t timeout;
  12. BUFFER *payload;
  13. const char *source;
  14. functions_evloop_worker_execute_t cb;
  15. void *cb_data;
  16. };
  17. static void worker_job_cleanup(struct functions_evloop_worker_job *j) {
  18. freez((void *)j->cmd);
  19. freez((void *)j->transaction);
  20. freez((void *)j->source);
  21. buffer_free(j->payload);
  22. }
  23. struct rrd_functions_expectation {
  24. const char *function;
  25. size_t function_length;
  26. functions_evloop_worker_execute_t cb;
  27. void *cb_data;
  28. time_t default_timeout;
  29. struct rrd_functions_expectation *prev, *next;
  30. };
  31. struct functions_evloop_globals {
  32. const char *tag;
  33. DICTIONARY *worker_queue;
  34. pthread_mutex_t worker_mutex;
  35. pthread_cond_t worker_cond_var;
  36. size_t workers;
  37. netdata_mutex_t *stdout_mutex;
  38. bool *plugin_should_exit;
  39. netdata_thread_t reader_thread;
  40. netdata_thread_t *worker_threads;
  41. struct {
  42. DICTIONARY *nodes;
  43. } dyncfg;
  44. struct rrd_functions_expectation *expectations;
  45. };
  46. static void *rrd_functions_worker_globals_worker_main(void *arg) {
  47. struct functions_evloop_globals *wg = arg;
  48. bool last_acquired = true;
  49. while (true) {
  50. pthread_mutex_lock(&wg->worker_mutex);
  51. if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired)
  52. pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex);
  53. const DICTIONARY_ITEM *acquired = NULL;
  54. struct functions_evloop_worker_job *j;
  55. dfe_start_write(wg->worker_queue, j) {
  56. if(j->running || j->cancelled)
  57. continue;
  58. acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item);
  59. j->running = true;
  60. break;
  61. }
  62. dfe_done(j);
  63. pthread_mutex_unlock(&wg->worker_mutex);
  64. if(acquired) {
  65. ND_LOG_STACK lgs[] = {
  66. ND_LOG_FIELD_TXT(NDF_REQUEST, j->cmd),
  67. ND_LOG_FIELD_END(),
  68. };
  69. ND_LOG_STACK_PUSH(lgs);
  70. last_acquired = true;
  71. j = dictionary_acquired_item_value(acquired);
  72. j->cb(j->transaction, j->cmd, &j->stop_monotonic_ut, &j->cancelled, j->payload, j->source, j->cb_data);
  73. dictionary_del(wg->worker_queue, j->transaction);
  74. dictionary_acquired_item_release(wg->worker_queue, acquired);
  75. dictionary_garbage_collect(wg->worker_queue);
  76. }
  77. else
  78. last_acquired = false;
  79. }
  80. return NULL;
  81. }
  82. static void worker_add_job(struct functions_evloop_globals *wg, const char *keyword, char *transaction, char *function, char *timeout_s, BUFFER *payload, const char *source) {
  83. if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
  84. nd_log(NDLS_COLLECTORS, NDLP_ERR, "Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
  85. keyword,
  86. transaction?transaction:"(unset)",
  87. timeout_s?timeout_s:"(unset)",
  88. function?function:"(unset)");
  89. }
  90. else {
  91. int timeout = str2i(timeout_s);
  92. const char *msg = "No function with this name found";
  93. bool found = false;
  94. struct rrd_functions_expectation *we;
  95. for(we = wg->expectations; we ;we = we->next) {
  96. if(strncmp(function, we->function, we->function_length) == 0) {
  97. if(timeout <= 0)
  98. timeout = (int)we->default_timeout;
  99. struct functions_evloop_worker_job t = {
  100. .cmd = strdupz(function),
  101. .transaction = strdupz(transaction),
  102. .running = false,
  103. .cancelled = false,
  104. .timeout = timeout,
  105. .stop_monotonic_ut = now_monotonic_usec() + (timeout * USEC_PER_SEC),
  106. .used = false,
  107. .payload = buffer_dup(payload),
  108. .source = source ? strdupz(source) : NULL,
  109. .cb = we->cb,
  110. .cb_data = we->cb_data,
  111. };
  112. struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t));
  113. if(j->used) {
  114. nd_log(NDLS_COLLECTORS, NDLP_WARNING, "Received duplicate function transaction '%s'. Ignoring it.", transaction);
  115. worker_job_cleanup(&t);
  116. msg = "Duplicate function transaction. Ignoring it.";
  117. }
  118. else {
  119. found = true;
  120. j->used = true;
  121. pthread_cond_signal(&wg->worker_cond_var);
  122. }
  123. }
  124. }
  125. if(!found) {
  126. netdata_mutex_lock(wg->stdout_mutex);
  127. pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, msg);
  128. netdata_mutex_unlock(wg->stdout_mutex);
  129. }
  130. }
  131. }
  132. static void *rrd_functions_worker_globals_reader_main(void *arg) {
  133. struct functions_evloop_globals *wg = arg;
  134. struct {
  135. size_t last_len; // to remember the last pos - do not use a pointer, the buffer may realloc...
  136. bool enabled;
  137. char *transaction;
  138. char *function;
  139. char *timeout_s;
  140. char *source;
  141. char *content_type;
  142. } deferred = { 0 };
  143. struct buffered_reader reader = { 0 };
  144. buffered_reader_init(&reader);
  145. BUFFER *buffer = buffer_create(sizeof(reader.read_buffer) + 2, NULL);
  146. while(!(*wg->plugin_should_exit)) {
  147. if(unlikely(!buffered_reader_next_line(&reader, buffer))) {
  148. buffered_reader_ret_t ret = buffered_reader_read_timeout(
  149. &reader,
  150. fileno((FILE *)stdin),
  151. 2 * 60 * MSEC_PER_SEC,
  152. false
  153. );
  154. if(unlikely(ret != BUFFERED_READER_READ_OK && ret != BUFFERED_READER_READ_POLL_TIMEOUT))
  155. break;
  156. continue;
  157. }
  158. if(deferred.enabled) {
  159. char *s = (char *)buffer_tostring(buffer);
  160. if(strstr(&s[deferred.last_len], PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END "\n") != NULL) {
  161. if(deferred.last_len > 0)
  162. // remove the trailing newline from the buffer
  163. deferred.last_len--;
  164. s[deferred.last_len] = '\0';
  165. buffer->len = deferred.last_len;
  166. buffer->content_type = content_type_string2id(deferred.content_type);
  167. worker_add_job(wg, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD, deferred.transaction, deferred.function, deferred.timeout_s, buffer, deferred.source);
  168. buffer_flush(buffer);
  169. freez(deferred.transaction);
  170. freez(deferred.function);
  171. freez(deferred.timeout_s);
  172. freez(deferred.source);
  173. freez(deferred.content_type);
  174. memset(&deferred, 0, sizeof(deferred));
  175. }
  176. else
  177. deferred.last_len = buffer->len;
  178. continue;
  179. }
  180. char *words[MAX_FUNCTION_PARAMETERS] = { NULL };
  181. size_t num_words = quoted_strings_splitter_pluginsd((char *)buffer_tostring(buffer), words, MAX_FUNCTION_PARAMETERS);
  182. const char *keyword = get_word(words, num_words, 0);
  183. if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0)) {
  184. char *transaction = get_word(words, num_words, 1);
  185. char *timeout_s = get_word(words, num_words, 2);
  186. char *function = get_word(words, num_words, 3);
  187. char *source = get_word(words, num_words, 4);
  188. worker_add_job(wg, keyword, transaction, function, timeout_s, NULL, source);
  189. }
  190. else if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0)) {
  191. char *transaction = get_word(words, num_words, 1);
  192. char *timeout_s = get_word(words, num_words, 2);
  193. char *function = get_word(words, num_words, 3);
  194. char *source = get_word(words, num_words, 4);
  195. char *content_type = get_word(words, num_words, 5);
  196. deferred.transaction = strdupz(transaction ? transaction : "");
  197. deferred.timeout_s = strdupz(timeout_s ? timeout_s : "");
  198. deferred.function = strdupz(function ? function : "");
  199. deferred.source = strdupz(source ? source : "");
  200. deferred.content_type = strdupz(content_type ? content_type : "");
  201. deferred.last_len = 0;
  202. deferred.enabled = true;
  203. }
  204. else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
  205. char *transaction = get_word(words, num_words, 1);
  206. const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
  207. if(acquired) {
  208. struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
  209. __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED);
  210. dictionary_acquired_item_release(wg->worker_queue, acquired);
  211. dictionary_del(wg->worker_queue, transaction);
  212. dictionary_garbage_collect(wg->worker_queue);
  213. }
  214. else
  215. nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received CANCEL for transaction '%s', but it not available here", transaction);
  216. }
  217. else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PROGRESS) == 0) {
  218. char *transaction = get_word(words, num_words, 1);
  219. const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
  220. if(acquired) {
  221. struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
  222. functions_stop_monotonic_update_on_progress(&j->stop_monotonic_ut);
  223. dictionary_acquired_item_release(wg->worker_queue, acquired);
  224. }
  225. else
  226. nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received PROGRESS for transaction '%s', but it not available here", transaction);
  227. }
  228. else
  229. nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received unknown command: %s", keyword?keyword:"(unset)");
  230. buffer_flush(buffer);
  231. }
  232. if(!(*wg->plugin_should_exit))
  233. nd_log(NDLS_COLLECTORS, NDLP_ERR, "Read error on stdin");
  234. *wg->plugin_should_exit = true;
  235. exit(1);
  236. }
  237. void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
  238. struct functions_evloop_worker_job *j = value;
  239. worker_job_cleanup(j);
  240. }
  241. struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) {
  242. struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals));
  243. wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
  244. dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL);
  245. wg->dyncfg.nodes = dyncfg_nodes_dictionary_create();
  246. pthread_mutex_init(&wg->worker_mutex, NULL);
  247. pthread_cond_init(&wg->worker_cond_var, NULL);
  248. wg->plugin_should_exit = plugin_should_exit;
  249. wg->stdout_mutex = stdout_mutex;
  250. wg->workers = worker_threads;
  251. wg->worker_threads = callocz(wg->workers, sizeof(netdata_thread_t ));
  252. wg->tag = tag;
  253. char tag_buffer[NETDATA_THREAD_TAG_MAX + 1];
  254. snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag);
  255. netdata_thread_create(&wg->reader_thread, tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
  256. rrd_functions_worker_globals_reader_main, wg);
  257. for(size_t i = 0; i < wg->workers ; i++) {
  258. snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1);
  259. netdata_thread_create(&wg->worker_threads[i], tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
  260. rrd_functions_worker_globals_worker_main, wg);
  261. }
  262. functions_evloop_add_function(wg, "config", functions_evloop_config_cb, 120, wg);
  263. return wg;
  264. }
  265. void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout, void *data) {
  266. struct rrd_functions_expectation *we = callocz(1, sizeof(*we));
  267. we->function = function;
  268. we->function_length = strlen(we->function);
  269. we->cb = cb;
  270. we->cb_data = data;
  271. we->default_timeout = default_timeout;
  272. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next);
  273. }
  274. void functions_evloop_cancel_threads(struct functions_evloop_globals *wg){
  275. for(size_t i = 0; i < wg->workers ; i++)
  276. netdata_thread_cancel(wg->worker_threads[i]);
  277. netdata_thread_cancel(wg->reader_thread);
  278. }
  279. // ----------------------------------------------------------------------------
  280. static void functions_evloop_config_cb(const char *transaction, char *function, usec_t *stop_monotonic_ut, bool *cancelled,
  281. BUFFER *payload, const char *source, void *data) {
  282. struct functions_evloop_globals *wg = data;
  283. CLEAN_BUFFER *result = buffer_create(1024, NULL);
  284. int code = dyncfg_node_find_and_call(wg->dyncfg.nodes, transaction, function, stop_monotonic_ut, cancelled, payload, source, result);
  285. netdata_mutex_lock(wg->stdout_mutex);
  286. pluginsd_function_result_begin_to_stdout(transaction, code, content_type_id2string(result->content_type), result->expires);
  287. printf("%s", buffer_tostring(result));
  288. pluginsd_function_result_end_to_stdout();
  289. fflush(stdout);
  290. netdata_mutex_unlock(wg->stdout_mutex);
  291. }
  292. void functions_evloop_dyncfg_add(struct functions_evloop_globals *wg, const char *id, const char *path, DYNCFG_STATUS status, DYNCFG_TYPE type, DYNCFG_SOURCE_TYPE source_type, const char *source, DYNCFG_CMDS cmds, dyncfg_cb_t cb, void *data) {
  293. if(!dyncfg_is_valid_id(id)) {
  294. nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id);
  295. return;
  296. }
  297. struct dyncfg_node tmp = {
  298. .cmds = cmds,
  299. .type = type,
  300. .cb = cb,
  301. .data = data,
  302. };
  303. dictionary_set(wg->dyncfg.nodes, id, &tmp, sizeof(tmp));
  304. CLEAN_BUFFER *c = buffer_create(100, NULL);
  305. dyncfg_cmds2buffer(cmds, c);
  306. netdata_mutex_lock(wg->stdout_mutex);
  307. fprintf(stdout,
  308. PLUGINSD_KEYWORD_CONFIG " '%s' " PLUGINSD_KEYWORD_CONFIG_ACTION_CREATE " '%s' '%s' '%s' '%s' '%s' '%s'\n",
  309. id, dyncfg_id2status(status), dyncfg_id2type(type), path,
  310. dyncfg_id2source_type(source_type), source, buffer_tostring(c)
  311. );
  312. fflush(stdout);
  313. netdata_mutex_unlock(wg->stdout_mutex);
  314. }
  315. void functions_evloop_dyncfg_del(struct functions_evloop_globals *wg, const char *id) {
  316. if(!dyncfg_is_valid_id(id)) {
  317. nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id);
  318. return;
  319. }
  320. dictionary_del(wg->dyncfg.nodes, id);
  321. netdata_mutex_lock(wg->stdout_mutex);
  322. fprintf(stdout,
  323. PLUGINSD_KEYWORD_CONFIG " %s " PLUGINSD_KEYWORD_CONFIG_ACTION_DELETE "\n",
  324. id);
  325. fflush(stdout);
  326. netdata_mutex_unlock(wg->stdout_mutex);
  327. }
  328. void functions_evloop_dyncfg_status(struct functions_evloop_globals *wg, const char *id, DYNCFG_STATUS status) {
  329. if(!dyncfg_is_valid_id(id)) {
  330. nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id);
  331. return;
  332. }
  333. netdata_mutex_lock(wg->stdout_mutex);
  334. fprintf(stdout,
  335. PLUGINSD_KEYWORD_CONFIG " %s " PLUGINSD_KEYWORD_CONFIG_ACTION_STATUS " %s\n",
  336. id, dyncfg_id2status(status));
  337. fflush(stdout);
  338. netdata_mutex_unlock(wg->stdout_mutex);
  339. }