123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417 |
- // SPDX-License-Identifier: GPL-3.0-or-later
- #include "functions_evloop.h"
- static void functions_evloop_config_cb(const char *transaction, char *function, usec_t *stop_monotonic_ut, bool *cancelled, BUFFER *payload, const char *source, void *data);
- struct functions_evloop_worker_job {
- bool used;
- bool running;
- bool cancelled;
- usec_t stop_monotonic_ut;
- char *cmd;
- const char *transaction;
- time_t timeout;
- BUFFER *payload;
- const char *source;
- functions_evloop_worker_execute_t cb;
- void *cb_data;
- };
- static void worker_job_cleanup(struct functions_evloop_worker_job *j) {
- freez((void *)j->cmd);
- freez((void *)j->transaction);
- freez((void *)j->source);
- buffer_free(j->payload);
- }
- struct rrd_functions_expectation {
- const char *function;
- size_t function_length;
- functions_evloop_worker_execute_t cb;
- void *cb_data;
- time_t default_timeout;
- struct rrd_functions_expectation *prev, *next;
- };
- struct functions_evloop_globals {
- const char *tag;
- DICTIONARY *worker_queue;
- pthread_mutex_t worker_mutex;
- pthread_cond_t worker_cond_var;
- size_t workers;
- netdata_mutex_t *stdout_mutex;
- bool *plugin_should_exit;
- netdata_thread_t reader_thread;
- netdata_thread_t *worker_threads;
- struct {
- DICTIONARY *nodes;
- } dyncfg;
- struct rrd_functions_expectation *expectations;
- };
- static void *rrd_functions_worker_globals_worker_main(void *arg) {
- struct functions_evloop_globals *wg = arg;
- bool last_acquired = true;
- while (true) {
- pthread_mutex_lock(&wg->worker_mutex);
- if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired)
- pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex);
- const DICTIONARY_ITEM *acquired = NULL;
- struct functions_evloop_worker_job *j;
- dfe_start_write(wg->worker_queue, j) {
- if(j->running || j->cancelled)
- continue;
- acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item);
- j->running = true;
- break;
- }
- dfe_done(j);
- pthread_mutex_unlock(&wg->worker_mutex);
- if(acquired) {
- ND_LOG_STACK lgs[] = {
- ND_LOG_FIELD_TXT(NDF_REQUEST, j->cmd),
- ND_LOG_FIELD_END(),
- };
- ND_LOG_STACK_PUSH(lgs);
- last_acquired = true;
- j = dictionary_acquired_item_value(acquired);
- j->cb(j->transaction, j->cmd, &j->stop_monotonic_ut, &j->cancelled, j->payload, j->source, j->cb_data);
- dictionary_del(wg->worker_queue, j->transaction);
- dictionary_acquired_item_release(wg->worker_queue, acquired);
- dictionary_garbage_collect(wg->worker_queue);
- }
- else
- last_acquired = false;
- }
- return NULL;
- }
- static void worker_add_job(struct functions_evloop_globals *wg, const char *keyword, char *transaction, char *function, char *timeout_s, BUFFER *payload, const char *source) {
- if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
- nd_log(NDLS_COLLECTORS, NDLP_ERR, "Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
- keyword,
- transaction?transaction:"(unset)",
- timeout_s?timeout_s:"(unset)",
- function?function:"(unset)");
- }
- else {
- int timeout = str2i(timeout_s);
- const char *msg = "No function with this name found";
- bool found = false;
- struct rrd_functions_expectation *we;
- for(we = wg->expectations; we ;we = we->next) {
- if(strncmp(function, we->function, we->function_length) == 0) {
- if(timeout <= 0)
- timeout = (int)we->default_timeout;
- struct functions_evloop_worker_job t = {
- .cmd = strdupz(function),
- .transaction = strdupz(transaction),
- .running = false,
- .cancelled = false,
- .timeout = timeout,
- .stop_monotonic_ut = now_monotonic_usec() + (timeout * USEC_PER_SEC),
- .used = false,
- .payload = buffer_dup(payload),
- .source = source ? strdupz(source) : NULL,
- .cb = we->cb,
- .cb_data = we->cb_data,
- };
- struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t));
- if(j->used) {
- nd_log(NDLS_COLLECTORS, NDLP_WARNING, "Received duplicate function transaction '%s'. Ignoring it.", transaction);
- worker_job_cleanup(&t);
- msg = "Duplicate function transaction. Ignoring it.";
- }
- else {
- found = true;
- j->used = true;
- pthread_cond_signal(&wg->worker_cond_var);
- }
- }
- }
- if(!found) {
- netdata_mutex_lock(wg->stdout_mutex);
- pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, msg);
- netdata_mutex_unlock(wg->stdout_mutex);
- }
- }
- }
- static void *rrd_functions_worker_globals_reader_main(void *arg) {
- struct functions_evloop_globals *wg = arg;
- struct {
- size_t last_len; // to remember the last pos - do not use a pointer, the buffer may realloc...
- bool enabled;
- char *transaction;
- char *function;
- char *timeout_s;
- char *source;
- char *content_type;
- } deferred = { 0 };
- struct buffered_reader reader = { 0 };
- buffered_reader_init(&reader);
- BUFFER *buffer = buffer_create(sizeof(reader.read_buffer) + 2, NULL);
- while(!(*wg->plugin_should_exit)) {
- if(unlikely(!buffered_reader_next_line(&reader, buffer))) {
- buffered_reader_ret_t ret = buffered_reader_read_timeout(
- &reader,
- fileno((FILE *)stdin),
- 2 * 60 * MSEC_PER_SEC,
- false
- );
- if(unlikely(ret != BUFFERED_READER_READ_OK && ret != BUFFERED_READER_READ_POLL_TIMEOUT))
- break;
- continue;
- }
- if(deferred.enabled) {
- char *s = (char *)buffer_tostring(buffer);
- if(strstr(&s[deferred.last_len], PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END "\n") != NULL) {
- if(deferred.last_len > 0)
- // remove the trailing newline from the buffer
- deferred.last_len--;
- s[deferred.last_len] = '\0';
- buffer->len = deferred.last_len;
- buffer->content_type = content_type_string2id(deferred.content_type);
- worker_add_job(wg, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD, deferred.transaction, deferred.function, deferred.timeout_s, buffer, deferred.source);
- buffer_flush(buffer);
- freez(deferred.transaction);
- freez(deferred.function);
- freez(deferred.timeout_s);
- freez(deferred.source);
- freez(deferred.content_type);
- memset(&deferred, 0, sizeof(deferred));
- }
- else
- deferred.last_len = buffer->len;
- continue;
- }
- char *words[MAX_FUNCTION_PARAMETERS] = { NULL };
- size_t num_words = quoted_strings_splitter_pluginsd((char *)buffer_tostring(buffer), words, MAX_FUNCTION_PARAMETERS);
- const char *keyword = get_word(words, num_words, 0);
- if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0)) {
- char *transaction = get_word(words, num_words, 1);
- char *timeout_s = get_word(words, num_words, 2);
- char *function = get_word(words, num_words, 3);
- char *source = get_word(words, num_words, 4);
- worker_add_job(wg, keyword, transaction, function, timeout_s, NULL, source);
- }
- else if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0)) {
- char *transaction = get_word(words, num_words, 1);
- char *timeout_s = get_word(words, num_words, 2);
- char *function = get_word(words, num_words, 3);
- char *source = get_word(words, num_words, 4);
- char *content_type = get_word(words, num_words, 5);
- deferred.transaction = strdupz(transaction ? transaction : "");
- deferred.timeout_s = strdupz(timeout_s ? timeout_s : "");
- deferred.function = strdupz(function ? function : "");
- deferred.source = strdupz(source ? source : "");
- deferred.content_type = strdupz(content_type ? content_type : "");
- deferred.last_len = 0;
- deferred.enabled = true;
- }
- else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
- char *transaction = get_word(words, num_words, 1);
- const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
- if(acquired) {
- struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
- __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED);
- dictionary_acquired_item_release(wg->worker_queue, acquired);
- dictionary_del(wg->worker_queue, transaction);
- dictionary_garbage_collect(wg->worker_queue);
- }
- else
- nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received CANCEL for transaction '%s', but it not available here", transaction);
- }
- else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PROGRESS) == 0) {
- char *transaction = get_word(words, num_words, 1);
- const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
- if(acquired) {
- struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
- functions_stop_monotonic_update_on_progress(&j->stop_monotonic_ut);
- dictionary_acquired_item_release(wg->worker_queue, acquired);
- }
- else
- nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received PROGRESS for transaction '%s', but it not available here", transaction);
- }
- else
- nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received unknown command: %s", keyword?keyword:"(unset)");
- buffer_flush(buffer);
- }
- if(!(*wg->plugin_should_exit))
- nd_log(NDLS_COLLECTORS, NDLP_ERR, "Read error on stdin");
- *wg->plugin_should_exit = true;
- exit(1);
- }
- void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
- struct functions_evloop_worker_job *j = value;
- worker_job_cleanup(j);
- }
- struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) {
- struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals));
- wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
- dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL);
- wg->dyncfg.nodes = dyncfg_nodes_dictionary_create();
- pthread_mutex_init(&wg->worker_mutex, NULL);
- pthread_cond_init(&wg->worker_cond_var, NULL);
- wg->plugin_should_exit = plugin_should_exit;
- wg->stdout_mutex = stdout_mutex;
- wg->workers = worker_threads;
- wg->worker_threads = callocz(wg->workers, sizeof(netdata_thread_t ));
- wg->tag = tag;
- char tag_buffer[NETDATA_THREAD_TAG_MAX + 1];
- snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag);
- netdata_thread_create(&wg->reader_thread, tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
- rrd_functions_worker_globals_reader_main, wg);
- for(size_t i = 0; i < wg->workers ; i++) {
- snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1);
- netdata_thread_create(&wg->worker_threads[i], tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
- rrd_functions_worker_globals_worker_main, wg);
- }
- functions_evloop_add_function(wg, "config", functions_evloop_config_cb, 120, wg);
- return wg;
- }
- void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout, void *data) {
- struct rrd_functions_expectation *we = callocz(1, sizeof(*we));
- we->function = function;
- we->function_length = strlen(we->function);
- we->cb = cb;
- we->cb_data = data;
- we->default_timeout = default_timeout;
- DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next);
- }
- void functions_evloop_cancel_threads(struct functions_evloop_globals *wg){
- for(size_t i = 0; i < wg->workers ; i++)
- netdata_thread_cancel(wg->worker_threads[i]);
- netdata_thread_cancel(wg->reader_thread);
- }
- // ----------------------------------------------------------------------------
- static void functions_evloop_config_cb(const char *transaction, char *function, usec_t *stop_monotonic_ut, bool *cancelled,
- BUFFER *payload, const char *source, void *data) {
- struct functions_evloop_globals *wg = data;
- CLEAN_BUFFER *result = buffer_create(1024, NULL);
- int code = dyncfg_node_find_and_call(wg->dyncfg.nodes, transaction, function, stop_monotonic_ut, cancelled, payload, source, result);
- netdata_mutex_lock(wg->stdout_mutex);
- pluginsd_function_result_begin_to_stdout(transaction, code, content_type_id2string(result->content_type), result->expires);
- printf("%s", buffer_tostring(result));
- pluginsd_function_result_end_to_stdout();
- fflush(stdout);
- netdata_mutex_unlock(wg->stdout_mutex);
- }
- void functions_evloop_dyncfg_add(struct functions_evloop_globals *wg, const char *id, const char *path, DYNCFG_STATUS status, DYNCFG_TYPE type, DYNCFG_SOURCE_TYPE source_type, const char *source, DYNCFG_CMDS cmds, dyncfg_cb_t cb, void *data) {
- if(!dyncfg_is_valid_id(id)) {
- nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id);
- return;
- }
- struct dyncfg_node tmp = {
- .cmds = cmds,
- .type = type,
- .cb = cb,
- .data = data,
- };
- dictionary_set(wg->dyncfg.nodes, id, &tmp, sizeof(tmp));
- CLEAN_BUFFER *c = buffer_create(100, NULL);
- dyncfg_cmds2buffer(cmds, c);
- netdata_mutex_lock(wg->stdout_mutex);
- fprintf(stdout,
- PLUGINSD_KEYWORD_CONFIG " '%s' " PLUGINSD_KEYWORD_CONFIG_ACTION_CREATE " '%s' '%s' '%s' '%s' '%s' '%s'\n",
- id, dyncfg_id2status(status), dyncfg_id2type(type), path,
- dyncfg_id2source_type(source_type), source, buffer_tostring(c)
- );
- fflush(stdout);
- netdata_mutex_unlock(wg->stdout_mutex);
- }
- void functions_evloop_dyncfg_del(struct functions_evloop_globals *wg, const char *id) {
- if(!dyncfg_is_valid_id(id)) {
- nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id);
- return;
- }
- dictionary_del(wg->dyncfg.nodes, id);
- netdata_mutex_lock(wg->stdout_mutex);
- fprintf(stdout,
- PLUGINSD_KEYWORD_CONFIG " %s " PLUGINSD_KEYWORD_CONFIG_ACTION_DELETE "\n",
- id);
- fflush(stdout);
- netdata_mutex_unlock(wg->stdout_mutex);
- }
- void functions_evloop_dyncfg_status(struct functions_evloop_globals *wg, const char *id, DYNCFG_STATUS status) {
- if(!dyncfg_is_valid_id(id)) {
- nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id);
- return;
- }
- netdata_mutex_lock(wg->stdout_mutex);
- fprintf(stdout,
- PLUGINSD_KEYWORD_CONFIG " %s " PLUGINSD_KEYWORD_CONFIG_ACTION_STATUS " %s\n",
- id, dyncfg_id2status(status));
- fflush(stdout);
- netdata_mutex_unlock(wg->stdout_mutex);
- }
|