functions_evloop.c 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "functions_evloop.h"
  3. #define MAX_FUNCTION_PARAMETERS 1024
  4. struct functions_evloop_worker_job {
  5. bool used;
  6. bool running;
  7. bool cancelled;
  8. char *cmd;
  9. const char *transaction;
  10. time_t timeout;
  11. functions_evloop_worker_execute_t cb;
  12. };
  13. struct rrd_functions_expectation {
  14. const char *function;
  15. size_t function_length;
  16. functions_evloop_worker_execute_t cb;
  17. time_t default_timeout;
  18. struct rrd_functions_expectation *prev, *next;
  19. };
  20. struct functions_evloop_globals {
  21. const char *tag;
  22. DICTIONARY *worker_queue;
  23. pthread_mutex_t worker_mutex;
  24. pthread_cond_t worker_cond_var;
  25. size_t workers;
  26. netdata_mutex_t *stdout_mutex;
  27. bool *plugin_should_exit;
  28. netdata_thread_t reader_thread;
  29. netdata_thread_t *worker_threads;
  30. struct rrd_functions_expectation *expectations;
  31. };
  32. static void *rrd_functions_worker_globals_worker_main(void *arg) {
  33. struct functions_evloop_globals *wg = arg;
  34. bool last_acquired = true;
  35. while (true) {
  36. pthread_mutex_lock(&wg->worker_mutex);
  37. if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired)
  38. pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex);
  39. const DICTIONARY_ITEM *acquired = NULL;
  40. struct functions_evloop_worker_job *j;
  41. dfe_start_write(wg->worker_queue, j) {
  42. if(j->running || j->cancelled)
  43. continue;
  44. acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item);
  45. j->running = true;
  46. break;
  47. }
  48. dfe_done(j);
  49. pthread_mutex_unlock(&wg->worker_mutex);
  50. if(acquired) {
  51. ND_LOG_STACK lgs[] = {
  52. ND_LOG_FIELD_TXT(NDF_REQUEST, j->cmd),
  53. ND_LOG_FIELD_END(),
  54. };
  55. ND_LOG_STACK_PUSH(lgs);
  56. last_acquired = true;
  57. j = dictionary_acquired_item_value(acquired);
  58. j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled);
  59. dictionary_del(wg->worker_queue, j->transaction);
  60. dictionary_acquired_item_release(wg->worker_queue, acquired);
  61. dictionary_garbage_collect(wg->worker_queue);
  62. }
  63. else
  64. last_acquired = false;
  65. }
  66. return NULL;
  67. }
  68. static void *rrd_functions_worker_globals_reader_main(void *arg) {
  69. struct functions_evloop_globals *wg = arg;
  70. char buffer[PLUGINSD_LINE_MAX + 1];
  71. char *s = NULL;
  72. while(!(*wg->plugin_should_exit) && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
  73. char *words[MAX_FUNCTION_PARAMETERS] = { NULL };
  74. size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, MAX_FUNCTION_PARAMETERS);
  75. const char *keyword = get_word(words, num_words, 0);
  76. if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
  77. char *transaction = get_word(words, num_words, 1);
  78. char *timeout_s = get_word(words, num_words, 2);
  79. char *function = get_word(words, num_words, 3);
  80. if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
  81. netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
  82. keyword,
  83. transaction?transaction:"(unset)",
  84. timeout_s?timeout_s:"(unset)",
  85. function?function:"(unset)");
  86. }
  87. else {
  88. int timeout = str2i(timeout_s);
  89. bool found = false;
  90. struct rrd_functions_expectation *we;
  91. for(we = wg->expectations; we ;we = we->next) {
  92. if(strncmp(function, we->function, we->function_length) == 0) {
  93. struct functions_evloop_worker_job t = {
  94. .cmd = strdupz(function),
  95. .transaction = strdupz(transaction),
  96. .running = false,
  97. .cancelled = false,
  98. .timeout = timeout > 0 ? timeout : we->default_timeout,
  99. .used = false,
  100. .cb = we->cb,
  101. };
  102. struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t));
  103. if(j->used) {
  104. netdata_log_error("Received duplicate function transaction '%s'", transaction);
  105. freez((void *)t.cmd);
  106. freez((void *)t.transaction);
  107. }
  108. else {
  109. found = true;
  110. j->used = true;
  111. pthread_cond_signal(&wg->worker_cond_var);
  112. }
  113. }
  114. }
  115. if(!found) {
  116. netdata_mutex_lock(wg->stdout_mutex);
  117. pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND,
  118. "No function with this name found.");
  119. netdata_mutex_unlock(wg->stdout_mutex);
  120. }
  121. }
  122. }
  123. else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
  124. char *transaction = get_word(words, num_words, 1);
  125. const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
  126. if(acquired) {
  127. struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
  128. __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED);
  129. dictionary_acquired_item_release(wg->worker_queue, acquired);
  130. dictionary_del(wg->worker_queue, transaction);
  131. dictionary_garbage_collect(wg->worker_queue);
  132. }
  133. else
  134. netdata_log_error("Received CANCEL for transaction '%s', but it not available here", transaction);
  135. }
  136. else
  137. netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
  138. }
  139. if(!s || feof(stdin) || ferror(stdin)) {
  140. *wg->plugin_should_exit = true;
  141. netdata_log_error("Received error on stdin.");
  142. }
  143. exit(1);
  144. }
  145. void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
  146. struct functions_evloop_worker_job *j = value;
  147. freez((void *)j->cmd);
  148. freez((void *)j->transaction);
  149. }
  150. struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) {
  151. struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals));
  152. wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
  153. dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL);
  154. pthread_mutex_init(&wg->worker_mutex, NULL);
  155. pthread_cond_init(&wg->worker_cond_var, NULL);
  156. wg->plugin_should_exit = plugin_should_exit;
  157. wg->stdout_mutex = stdout_mutex;
  158. wg->workers = worker_threads;
  159. wg->worker_threads = callocz(wg->workers, sizeof(netdata_thread_t ));
  160. wg->tag = tag;
  161. char tag_buffer[NETDATA_THREAD_TAG_MAX + 1];
  162. snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag);
  163. netdata_thread_create(&wg->reader_thread, tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
  164. rrd_functions_worker_globals_reader_main, wg);
  165. for(size_t i = 0; i < wg->workers ; i++) {
  166. snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1);
  167. netdata_thread_create(&wg->worker_threads[i], tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
  168. rrd_functions_worker_globals_worker_main, wg);
  169. }
  170. return wg;
  171. }
  172. void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout) {
  173. struct rrd_functions_expectation *we = callocz(1, sizeof(*we));
  174. we->function = function;
  175. we->function_length = strlen(we->function);
  176. we->cb = cb;
  177. we->default_timeout = default_timeout;
  178. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next);
  179. }
  180. void functions_evloop_cancel_threads(struct functions_evloop_globals *wg){
  181. for(size_t i = 0; i < wg->workers ; i++)
  182. netdata_thread_cancel(wg->worker_threads[i]);
  183. netdata_thread_cancel(wg->reader_thread);
  184. }