spawn.c 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "spawn.h"
  3. static uv_thread_t thread;
  4. int spawn_thread_error;
  5. int spawn_thread_shutdown;
  6. struct spawn_queue spawn_cmd_queue;
  7. static struct spawn_cmd_info *create_spawn_cmd(const char *command_to_run)
  8. {
  9. struct spawn_cmd_info *cmdinfo;
  10. cmdinfo = mallocz(sizeof(*cmdinfo));
  11. fatal_assert(0 == uv_cond_init(&cmdinfo->cond));
  12. fatal_assert(0 == uv_mutex_init(&cmdinfo->mutex));
  13. cmdinfo->serial = 0; /* invalid */
  14. cmdinfo->command_to_run = strdupz(command_to_run);
  15. cmdinfo->exit_status = -1; /* invalid */
  16. cmdinfo->pid = -1; /* invalid */
  17. cmdinfo->flags = 0;
  18. return cmdinfo;
  19. }
  20. void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo)
  21. {
  22. uv_cond_destroy(&cmdinfo->cond);
  23. uv_mutex_destroy(&cmdinfo->mutex);
  24. freez(cmdinfo->command_to_run);
  25. freez(cmdinfo);
  26. }
  27. int spawn_cmd_compare(void *a, void *b)
  28. {
  29. struct spawn_cmd_info *cmda = a, *cmdb = b;
  30. /* No need for mutex, serial will never change and the entries cannot be deallocated yet */
  31. if (cmda->serial < cmdb->serial) return -1;
  32. if (cmda->serial > cmdb->serial) return 1;
  33. return 0;
  34. }
  35. static void init_spawn_cmd_queue(void)
  36. {
  37. spawn_cmd_queue.cmd_tree.root = NULL;
  38. spawn_cmd_queue.cmd_tree.compar = spawn_cmd_compare;
  39. spawn_cmd_queue.size = 0;
  40. spawn_cmd_queue.latest_serial = 0;
  41. fatal_assert(0 == uv_cond_init(&spawn_cmd_queue.cond));
  42. fatal_assert(0 == uv_mutex_init(&spawn_cmd_queue.mutex));
  43. }
  44. /*
  45. * Returns serial number of the enqueued command
  46. */
  47. uint64_t spawn_enq_cmd(const char *command_to_run)
  48. {
  49. unsigned queue_size;
  50. uint64_t serial;
  51. avl_t *avl_ret;
  52. struct spawn_cmd_info *cmdinfo;
  53. cmdinfo = create_spawn_cmd(command_to_run);
  54. /* wait for free space in queue */
  55. uv_mutex_lock(&spawn_cmd_queue.mutex);
  56. while ((queue_size = spawn_cmd_queue.size) == SPAWN_MAX_OUTSTANDING) {
  57. uv_cond_wait(&spawn_cmd_queue.cond, &spawn_cmd_queue.mutex);
  58. }
  59. fatal_assert(queue_size < SPAWN_MAX_OUTSTANDING);
  60. spawn_cmd_queue.size = queue_size + 1;
  61. serial = ++spawn_cmd_queue.latest_serial; /* 0 is invalid */
  62. cmdinfo->serial = serial; /* No need to take the cmd mutex since it is unreachable at the moment */
  63. /* enqueue command */
  64. avl_ret = avl_insert(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo);
  65. fatal_assert(avl_ret == (avl_t *)cmdinfo);
  66. uv_mutex_unlock(&spawn_cmd_queue.mutex);
  67. /* wake up event loop */
  68. fatal_assert(0 == uv_async_send(&spawn_async));
  69. return serial;
  70. }
  71. /*
  72. * Blocks until command with serial finishes running. Only one thread is allowed to wait per command.
  73. */
  74. void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp)
  75. {
  76. avl_t *avl_ret;
  77. struct spawn_cmd_info tmp, *cmdinfo;
  78. tmp.serial = serial;
  79. uv_mutex_lock(&spawn_cmd_queue.mutex);
  80. avl_ret = avl_search(&spawn_cmd_queue.cmd_tree, (avl_t *)&tmp);
  81. uv_mutex_unlock(&spawn_cmd_queue.mutex);
  82. fatal_assert(avl_ret); /* Could be NULL if more than 1 threads wait for the command */
  83. cmdinfo = (struct spawn_cmd_info *)avl_ret;
  84. uv_mutex_lock(&cmdinfo->mutex);
  85. while (!(cmdinfo->flags & SPAWN_CMD_DONE)) {
  86. /* Only 1 thread is allowed to wait for this command to finish */
  87. uv_cond_wait(&cmdinfo->cond, &cmdinfo->mutex);
  88. }
  89. uv_mutex_unlock(&cmdinfo->mutex);
  90. spawn_deq_cmd(cmdinfo);
  91. *exit_status = cmdinfo->exit_status;
  92. *exec_run_timestamp = cmdinfo->exec_run_timestamp;
  93. destroy_spawn_cmd(cmdinfo);
  94. }
  95. void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo)
  96. {
  97. unsigned queue_size;
  98. avl_t *avl_ret;
  99. uv_mutex_lock(&spawn_cmd_queue.mutex);
  100. queue_size = spawn_cmd_queue.size;
  101. fatal_assert(queue_size);
  102. /* dequeue command */
  103. avl_ret = avl_remove(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo);
  104. fatal_assert(avl_ret);
  105. spawn_cmd_queue.size = queue_size - 1;
  106. /* wake up callers */
  107. uv_cond_signal(&spawn_cmd_queue.cond);
  108. uv_mutex_unlock(&spawn_cmd_queue.mutex);
  109. }
  110. /*
  111. * Must be called from the spawn client event loop context. This way no mutex is needed because the event loop is the
  112. * only writer as far as struct spawn_cmd_info entries are concerned.
  113. */
  114. static int find_unprocessed_spawn_cmd_cb(void *entry, void *data)
  115. {
  116. struct spawn_cmd_info **cmdinfop = data, *cmdinfo = entry;
  117. if (!(cmdinfo->flags & SPAWN_CMD_PROCESSED)) {
  118. *cmdinfop = cmdinfo;
  119. return -1; /* break tree traversal */
  120. }
  121. return 0; /* continue traversing */
  122. }
  123. struct spawn_cmd_info *spawn_get_unprocessed_cmd(void)
  124. {
  125. struct spawn_cmd_info *cmdinfo;
  126. unsigned queue_size;
  127. int ret;
  128. uv_mutex_lock(&spawn_cmd_queue.mutex);
  129. queue_size = spawn_cmd_queue.size;
  130. if (queue_size == 0) {
  131. uv_mutex_unlock(&spawn_cmd_queue.mutex);
  132. return NULL;
  133. }
  134. /* find command */
  135. cmdinfo = NULL;
  136. ret = avl_traverse(&spawn_cmd_queue.cmd_tree, find_unprocessed_spawn_cmd_cb, (void *)&cmdinfo);
  137. if (-1 != ret) { /* no commands available for processing */
  138. uv_mutex_unlock(&spawn_cmd_queue.mutex);
  139. return NULL;
  140. }
  141. uv_mutex_unlock(&spawn_cmd_queue.mutex);
  142. return cmdinfo;
  143. }
  144. /**
  145. * This function spawns a process that shares a libuv IPC pipe with the caller and performs spawn server duties.
  146. * The spawn server process will close all open file descriptors except for the pipe, UV_STDOUT_FD, and UV_STDERR_FD.
  147. * The caller has to be the netdata user as configured.
  148. *
  149. * @param loop the libuv loop of the caller context
  150. * @param spawn_channel the bidirectional libuv IPC pipe that the server and the caller will share
  151. * @param process the spawn server libuv process context
  152. * @return 0 on success or the libuv error code
  153. */
  154. int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process)
  155. {
  156. uv_process_options_t options = {0};
  157. char *args[3];
  158. int ret;
  159. #define SPAWN_SERVER_DESCRIPTORS (3)
  160. uv_stdio_container_t stdio[SPAWN_SERVER_DESCRIPTORS];
  161. struct passwd *passwd = NULL;
  162. char *user = NULL;
  163. passwd = getpwuid(getuid());
  164. user = (passwd && passwd->pw_name) ? passwd->pw_name : "";
  165. args[0] = exepath;
  166. args[1] = SPAWN_SERVER_COMMAND_LINE_ARGUMENT;
  167. args[2] = NULL;
  168. memset(&options, 0, sizeof(options));
  169. options.file = exepath;
  170. options.args = args;
  171. options.exit_cb = NULL; //exit_cb;
  172. options.stdio = stdio;
  173. options.stdio_count = SPAWN_SERVER_DESCRIPTORS;
  174. stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
  175. stdio[0].data.stream = (uv_stream_t *)spawn_channel; /* bidirectional libuv pipe */
  176. stdio[1].flags = UV_INHERIT_FD;
  177. stdio[1].data.fd = 1 /* UV_STDOUT_FD */;
  178. stdio[2].flags = UV_INHERIT_FD;
  179. stdio[2].data.fd = 2 /* UV_STDERR_FD */;
  180. ret = uv_spawn(loop, process, &options); /* execute the netdata binary again as the netdata user */
  181. if (0 != ret) {
  182. error("uv_spawn (process: \"%s\") (user: %s) failed (%s).", exepath, user, uv_strerror(ret));
  183. fatal("Cannot start netdata without the spawn server.");
  184. }
  185. return ret;
  186. }
  187. #define CONCURRENT_SPAWNS 16
  188. #define SPAWN_ITERATIONS 10000
  189. #undef CONCURRENT_STRESS_TEST
  190. void spawn_init(void)
  191. {
  192. struct completion completion;
  193. int error;
  194. info("Initializing spawn client.");
  195. init_spawn_cmd_queue();
  196. completion_init(&completion);
  197. error = uv_thread_create(&thread, spawn_client, &completion);
  198. if (error) {
  199. error("uv_thread_create(): %s", uv_strerror(error));
  200. goto after_error;
  201. }
  202. /* wait for spawn client thread to initialize */
  203. completion_wait_for(&completion);
  204. completion_destroy(&completion);
  205. uv_thread_set_name_np(thread, "DAEMON_SPAWN");
  206. if (spawn_thread_error) {
  207. error = uv_thread_join(&thread);
  208. if (error) {
  209. error("uv_thread_create(): %s", uv_strerror(error));
  210. }
  211. goto after_error;
  212. }
  213. #ifdef CONCURRENT_STRESS_TEST
  214. signals_reset();
  215. signals_unblock();
  216. sleep(60);
  217. uint64_t serial[CONCURRENT_SPAWNS];
  218. for (int j = 0 ; j < SPAWN_ITERATIONS ; ++j) {
  219. for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
  220. char cmd[64];
  221. sprintf(cmd, "echo CONCURRENT_STRESS_TEST %d 1>&2", j * CONCURRENT_SPAWNS + i + 1);
  222. serial[i] = spawn_enq_cmd(cmd);
  223. info("Queued command %s for spawning.", cmd);
  224. }
  225. int exit_status;
  226. time_t exec_run_timestamp;
  227. for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
  228. info("Started waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
  229. exec_run_timestamp);
  230. spawn_wait_cmd(serial[i], &exit_status, &exec_run_timestamp);
  231. info("Finished waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
  232. exec_run_timestamp);
  233. }
  234. }
  235. exit(0);
  236. #endif
  237. return;
  238. after_error:
  239. error("Failed to initialize spawn service. The alarms notifications will not be spawned.");
  240. }