functions_evloop.c 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "functions_evloop.h"
  3. #define MAX_FUNCTION_PARAMETERS 1024
  4. struct functions_evloop_worker_job {
  5. bool used;
  6. bool running;
  7. bool cancelled;
  8. char *cmd;
  9. const char *transaction;
  10. time_t timeout;
  11. functions_evloop_worker_execute_t cb;
  12. };
  13. struct rrd_functions_expectation {
  14. const char *function;
  15. size_t function_length;
  16. functions_evloop_worker_execute_t cb;
  17. time_t default_timeout;
  18. struct rrd_functions_expectation *prev, *next;
  19. };
  20. struct functions_evloop_globals {
  21. const char *tag;
  22. DICTIONARY *worker_queue;
  23. pthread_mutex_t worker_mutex;
  24. pthread_cond_t worker_cond_var;
  25. size_t workers;
  26. netdata_mutex_t *stdout_mutex;
  27. bool *plugin_should_exit;
  28. netdata_thread_t reader_thread;
  29. netdata_thread_t *worker_threads;
  30. struct rrd_functions_expectation *expectations;
  31. };
  32. static void *rrd_functions_worker_globals_worker_main(void *arg) {
  33. struct functions_evloop_globals *wg = arg;
  34. bool last_acquired = true;
  35. while (true) {
  36. pthread_mutex_lock(&wg->worker_mutex);
  37. if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired)
  38. pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex);
  39. const DICTIONARY_ITEM *acquired = NULL;
  40. struct functions_evloop_worker_job *j;
  41. dfe_start_write(wg->worker_queue, j) {
  42. if(j->running || j->cancelled)
  43. continue;
  44. acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item);
  45. j->running = true;
  46. break;
  47. }
  48. dfe_done(j);
  49. pthread_mutex_unlock(&wg->worker_mutex);
  50. if(acquired) {
  51. last_acquired = true;
  52. j = dictionary_acquired_item_value(acquired);
  53. j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled);
  54. dictionary_del(wg->worker_queue, j->transaction);
  55. dictionary_acquired_item_release(wg->worker_queue, acquired);
  56. dictionary_garbage_collect(wg->worker_queue);
  57. }
  58. else
  59. last_acquired = false;
  60. }
  61. return NULL;
  62. }
  63. static void *rrd_functions_worker_globals_reader_main(void *arg) {
  64. struct functions_evloop_globals *wg = arg;
  65. char buffer[PLUGINSD_LINE_MAX + 1];
  66. char *s = NULL;
  67. while(!(*wg->plugin_should_exit) && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
  68. char *words[MAX_FUNCTION_PARAMETERS] = { NULL };
  69. size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, MAX_FUNCTION_PARAMETERS);
  70. const char *keyword = get_word(words, num_words, 0);
  71. if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
  72. char *transaction = get_word(words, num_words, 1);
  73. char *timeout_s = get_word(words, num_words, 2);
  74. char *function = get_word(words, num_words, 3);
  75. if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
  76. netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
  77. keyword,
  78. transaction?transaction:"(unset)",
  79. timeout_s?timeout_s:"(unset)",
  80. function?function:"(unset)");
  81. }
  82. else {
  83. int timeout = str2i(timeout_s);
  84. bool found = false;
  85. struct rrd_functions_expectation *we;
  86. for(we = wg->expectations; we ;we = we->next) {
  87. if(strncmp(function, we->function, we->function_length) == 0) {
  88. struct functions_evloop_worker_job t = {
  89. .cmd = strdupz(function),
  90. .transaction = strdupz(transaction),
  91. .running = false,
  92. .cancelled = false,
  93. .timeout = timeout > 0 ? timeout : we->default_timeout,
  94. .used = false,
  95. .cb = we->cb,
  96. };
  97. struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t));
  98. if(j->used) {
  99. netdata_log_error("Received duplicate function transaction '%s'", transaction);
  100. freez((void *)t.cmd);
  101. freez((void *)t.transaction);
  102. }
  103. else {
  104. found = true;
  105. j->used = true;
  106. pthread_cond_signal(&wg->worker_cond_var);
  107. }
  108. }
  109. }
  110. if(!found) {
  111. netdata_mutex_lock(wg->stdout_mutex);
  112. pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND,
  113. "No function with this name found.");
  114. netdata_mutex_unlock(wg->stdout_mutex);
  115. }
  116. }
  117. }
  118. else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
  119. char *transaction = get_word(words, num_words, 1);
  120. const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
  121. if(acquired) {
  122. struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
  123. __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED);
  124. dictionary_acquired_item_release(wg->worker_queue, acquired);
  125. dictionary_del(wg->worker_queue, transaction);
  126. dictionary_garbage_collect(wg->worker_queue);
  127. }
  128. else
  129. netdata_log_error("Received CANCEL for transaction '%s', but it not available here", transaction);
  130. }
  131. else
  132. netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
  133. }
  134. if(!s || feof(stdin) || ferror(stdin)) {
  135. *wg->plugin_should_exit = true;
  136. netdata_log_error("Received error on stdin.");
  137. }
  138. exit(1);
  139. }
  140. void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
  141. struct functions_evloop_worker_job *j = value;
  142. freez((void *)j->cmd);
  143. freez((void *)j->transaction);
  144. }
  145. struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) {
  146. struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals));
  147. wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
  148. dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL);
  149. pthread_mutex_init(&wg->worker_mutex, NULL);
  150. pthread_cond_init(&wg->worker_cond_var, NULL);
  151. wg->plugin_should_exit = plugin_should_exit;
  152. wg->stdout_mutex = stdout_mutex;
  153. wg->workers = worker_threads;
  154. wg->worker_threads = callocz(wg->workers, sizeof(netdata_thread_t ));
  155. wg->tag = tag;
  156. char tag_buffer[NETDATA_THREAD_TAG_MAX + 1];
  157. snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag);
  158. netdata_thread_create(&wg->reader_thread, tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
  159. rrd_functions_worker_globals_reader_main, wg);
  160. for(size_t i = 0; i < wg->workers ; i++) {
  161. snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1);
  162. netdata_thread_create(&wg->worker_threads[i], tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
  163. rrd_functions_worker_globals_worker_main, wg);
  164. }
  165. return wg;
  166. }
  167. void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout) {
  168. struct rrd_functions_expectation *we = callocz(1, sizeof(*we));
  169. we->function = function;
  170. we->function_length = strlen(we->function);
  171. we->cb = cb;
  172. we->default_timeout = default_timeout;
  173. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next);
  174. }