123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- // SPDX-License-Identifier: GPL-3.0-or-later
- #include "spawn.h"
- static uv_thread_t thread; /* handle of the thread that spawn_init() creates to run spawn_client */
- int spawn_thread_error; /* read by spawn_init() once the client thread has signalled its completion; non-zero aborts initialization */
- int spawn_thread_shutdown; /* NOTE(review): not referenced in this part of the file; presumably a shutdown request flag - confirm */
- struct spawn_queue spawn_cmd_queue; /* outstanding commands keyed by serial; accessed below while holding spawn_cmd_queue.mutex */
- static struct spawn_cmd_info *create_spawn_cmd(const char *command_to_run)
- {
- struct spawn_cmd_info *cmdinfo;
- cmdinfo = mallocz(sizeof(*cmdinfo));
- fatal_assert(0 == uv_cond_init(&cmdinfo->cond));
- fatal_assert(0 == uv_mutex_init(&cmdinfo->mutex));
- cmdinfo->serial = 0; /* invalid */
- cmdinfo->command_to_run = strdupz(command_to_run);
- cmdinfo->exit_status = -1; /* invalid */
- cmdinfo->pid = -1; /* invalid */
- cmdinfo->flags = 0;
- return cmdinfo;
- }
- void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo)
- {
- uv_cond_destroy(&cmdinfo->cond);
- uv_mutex_destroy(&cmdinfo->mutex);
- freez(cmdinfo->command_to_run);
- freez(cmdinfo);
- }
- int spawn_cmd_compare(void *a, void *b)
- {
- struct spawn_cmd_info *cmda = a, *cmdb = b;
- /* No need for mutex, serial will never change and the entries cannot be deallocated yet */
- if (cmda->serial < cmdb->serial) return -1;
- if (cmda->serial > cmdb->serial) return 1;
- return 0;
- }
- static void init_spawn_cmd_queue(void)
- {
- spawn_cmd_queue.cmd_tree.root = NULL;
- spawn_cmd_queue.cmd_tree.compar = spawn_cmd_compare;
- spawn_cmd_queue.size = 0;
- spawn_cmd_queue.latest_serial = 0;
- fatal_assert(0 == uv_cond_init(&spawn_cmd_queue.cond));
- fatal_assert(0 == uv_mutex_init(&spawn_cmd_queue.mutex));
- }
- /*
- * Returns serial number of the enqueued command
- */
- uint64_t spawn_enq_cmd(const char *command_to_run)
- {
- unsigned queue_size;
- uint64_t serial;
- avl_t *avl_ret;
- struct spawn_cmd_info *cmdinfo;
- cmdinfo = create_spawn_cmd(command_to_run);
- /* wait for free space in queue */
- uv_mutex_lock(&spawn_cmd_queue.mutex);
- while ((queue_size = spawn_cmd_queue.size) == SPAWN_MAX_OUTSTANDING) {
- uv_cond_wait(&spawn_cmd_queue.cond, &spawn_cmd_queue.mutex);
- }
- fatal_assert(queue_size < SPAWN_MAX_OUTSTANDING);
- spawn_cmd_queue.size = queue_size + 1;
- serial = ++spawn_cmd_queue.latest_serial; /* 0 is invalid */
- cmdinfo->serial = serial; /* No need to take the cmd mutex since it is unreachable at the moment */
- /* enqueue command */
- avl_ret = avl_insert(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo);
- fatal_assert(avl_ret == (avl_t *)cmdinfo);
- uv_mutex_unlock(&spawn_cmd_queue.mutex);
- /* wake up event loop */
- fatal_assert(0 == uv_async_send(&spawn_async));
- return serial;
- }
- /*
- * Blocks until command with serial finishes running. Only one thread is allowed to wait per command.
- */
- void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp)
- {
- avl_t *avl_ret;
- struct spawn_cmd_info tmp, *cmdinfo;
- tmp.serial = serial;
- uv_mutex_lock(&spawn_cmd_queue.mutex);
- avl_ret = avl_search(&spawn_cmd_queue.cmd_tree, (avl_t *)&tmp);
- uv_mutex_unlock(&spawn_cmd_queue.mutex);
- fatal_assert(avl_ret); /* Could be NULL if more than 1 threads wait for the command */
- cmdinfo = (struct spawn_cmd_info *)avl_ret;
- uv_mutex_lock(&cmdinfo->mutex);
- while (!(cmdinfo->flags & SPAWN_CMD_DONE)) {
- /* Only 1 thread is allowed to wait for this command to finish */
- uv_cond_wait(&cmdinfo->cond, &cmdinfo->mutex);
- }
- uv_mutex_unlock(&cmdinfo->mutex);
- spawn_deq_cmd(cmdinfo);
- *exit_status = cmdinfo->exit_status;
- *exec_run_timestamp = cmdinfo->exec_run_timestamp;
- destroy_spawn_cmd(cmdinfo);
- }
- void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo)
- {
- unsigned queue_size;
- avl_t *avl_ret;
- uv_mutex_lock(&spawn_cmd_queue.mutex);
- queue_size = spawn_cmd_queue.size;
- fatal_assert(queue_size);
- /* dequeue command */
- avl_ret = avl_remove(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo);
- fatal_assert(avl_ret);
- spawn_cmd_queue.size = queue_size - 1;
- /* wake up callers */
- uv_cond_signal(&spawn_cmd_queue.cond);
- uv_mutex_unlock(&spawn_cmd_queue.mutex);
- }
- /*
- * Must be called from the spawn client event loop context. This way no mutex is needed because the event loop is the
- * only writer as far as struct spawn_cmd_info entries are concerned.
- */
- static int find_unprocessed_spawn_cmd_cb(void *entry, void *data)
- {
- struct spawn_cmd_info **cmdinfop = data, *cmdinfo = entry;
- if (!(cmdinfo->flags & SPAWN_CMD_PROCESSED)) {
- *cmdinfop = cmdinfo;
- return -1; /* break tree traversal */
- }
- return 0; /* continue traversing */
- }
- struct spawn_cmd_info *spawn_get_unprocessed_cmd(void)
- {
- struct spawn_cmd_info *cmdinfo;
- unsigned queue_size;
- int ret;
- uv_mutex_lock(&spawn_cmd_queue.mutex);
- queue_size = spawn_cmd_queue.size;
- if (queue_size == 0) {
- uv_mutex_unlock(&spawn_cmd_queue.mutex);
- return NULL;
- }
- /* find command */
- cmdinfo = NULL;
- ret = avl_traverse(&spawn_cmd_queue.cmd_tree, find_unprocessed_spawn_cmd_cb, (void *)&cmdinfo);
- if (-1 != ret) { /* no commands available for processing */
- uv_mutex_unlock(&spawn_cmd_queue.mutex);
- return NULL;
- }
- uv_mutex_unlock(&spawn_cmd_queue.mutex);
- return cmdinfo;
- }
- /**
- * This function spawns a process that shares a libuv IPC pipe with the caller and performs spawn server duties.
- * The spawn server process will close all open file descriptors except for the pipe, UV_STDOUT_FD, and UV_STDERR_FD.
- * The caller has to be the netdata user as configured.
- *
- * @param loop the libuv loop of the caller context
- * @param spawn_channel the bidirectional libuv IPC pipe that the server and the caller will share
- * @param process the spawn server libuv process context
- * @return 0 on success or the libuv error code
- */
- int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process)
- {
- uv_process_options_t options = {0};
- char *args[3];
- int ret;
- #define SPAWN_SERVER_DESCRIPTORS (3)
- uv_stdio_container_t stdio[SPAWN_SERVER_DESCRIPTORS];
- struct passwd *passwd = NULL;
- char *user = NULL;
- passwd = getpwuid(getuid());
- user = (passwd && passwd->pw_name) ? passwd->pw_name : "";
- args[0] = exepath;
- args[1] = SPAWN_SERVER_COMMAND_LINE_ARGUMENT;
- args[2] = NULL;
- memset(&options, 0, sizeof(options));
- options.file = exepath;
- options.args = args;
- options.exit_cb = NULL; //exit_cb;
- options.stdio = stdio;
- options.stdio_count = SPAWN_SERVER_DESCRIPTORS;
- stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
- stdio[0].data.stream = (uv_stream_t *)spawn_channel; /* bidirectional libuv pipe */
- stdio[1].flags = UV_INHERIT_FD;
- stdio[1].data.fd = 1 /* UV_STDOUT_FD */;
- stdio[2].flags = UV_INHERIT_FD;
- stdio[2].data.fd = 2 /* UV_STDERR_FD */;
- ret = uv_spawn(loop, process, &options); /* execute the netdata binary again as the netdata user */
- if (0 != ret) {
- error("uv_spawn (process: \"%s\") (user: %s) failed (%s).", exepath, user, uv_strerror(ret));
- fatal("Cannot start netdata without the spawn server.");
- }
- return ret;
- }
- #define CONCURRENT_SPAWNS 16
- #define SPAWN_ITERATIONS 10000
- #undef CONCURRENT_STRESS_TEST
- void spawn_init(void)
- {
- struct completion completion;
- int error;
- info("Initializing spawn client.");
- init_spawn_cmd_queue();
- completion_init(&completion);
- error = uv_thread_create(&thread, spawn_client, &completion);
- if (error) {
- error("uv_thread_create(): %s", uv_strerror(error));
- goto after_error;
- }
- /* wait for spawn client thread to initialize */
- completion_wait_for(&completion);
- completion_destroy(&completion);
- uv_thread_set_name_np(thread, "DAEMON_SPAWN");
- if (spawn_thread_error) {
- error = uv_thread_join(&thread);
- if (error) {
- error("uv_thread_create(): %s", uv_strerror(error));
- }
- goto after_error;
- }
- #ifdef CONCURRENT_STRESS_TEST
- signals_reset();
- signals_unblock();
- sleep(60);
- uint64_t serial[CONCURRENT_SPAWNS];
- for (int j = 0 ; j < SPAWN_ITERATIONS ; ++j) {
- for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
- char cmd[64];
- sprintf(cmd, "echo CONCURRENT_STRESS_TEST %d 1>&2", j * CONCURRENT_SPAWNS + i + 1);
- serial[i] = spawn_enq_cmd(cmd);
- info("Queued command %s for spawning.", cmd);
- }
- int exit_status;
- time_t exec_run_timestamp;
- for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
- info("Started waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
- exec_run_timestamp);
- spawn_wait_cmd(serial[i], &exit_status, &exec_run_timestamp);
- info("Finished waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
- exec_run_timestamp);
- }
- }
- exit(0);
- #endif
- return;
- after_error:
- error("Failed to initialize spawn service. The alarms notifications will not be spawned.");
- }
|