spawn_client.c 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "spawn.h"
  3. static uv_process_t process;
  4. static uv_pipe_t spawn_channel;
  5. static uv_loop_t *loop;
  6. uv_async_t spawn_async;
  7. static char prot_buffer[MAX_COMMAND_LENGTH];
  8. static unsigned prot_buffer_len = 0;
  9. static void async_cb(uv_async_t *handle)
  10. {
  11. uv_stop(handle->loop);
  12. }
  13. static void after_pipe_write(uv_write_t* req, int status)
  14. {
  15. (void)status;
  16. #ifdef SPAWN_DEBUG
  17. netdata_log_info("CLIENT %s called status=%d", __func__, status);
  18. #endif
  19. void **data = req->data;
  20. freez(data[0]);
  21. freez(data[1]);
  22. freez(data);
  23. }
  24. static void client_parse_spawn_protocol(unsigned source_len, char *source)
  25. {
  26. unsigned required_len;
  27. struct spawn_prot_header *header;
  28. struct spawn_prot_spawn_result *spawn_result;
  29. struct spawn_prot_cmd_exit_status *exit_status;
  30. struct spawn_cmd_info *cmdinfo;
  31. while (source_len) {
  32. required_len = sizeof(*header);
  33. if (prot_buffer_len < required_len)
  34. copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
  35. if (prot_buffer_len < required_len)
  36. return; /* Source buffer ran out */
  37. header = (struct spawn_prot_header *)prot_buffer;
  38. cmdinfo = (struct spawn_cmd_info *)header->handle;
  39. fatal_assert(NULL != cmdinfo);
  40. switch(header->opcode) {
  41. case SPAWN_PROT_SPAWN_RESULT:
  42. required_len += sizeof(*spawn_result);
  43. if (prot_buffer_len < required_len)
  44. copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
  45. if (prot_buffer_len < required_len)
  46. return; /* Source buffer ran out */
  47. spawn_result = (struct spawn_prot_spawn_result *)(header + 1);
  48. uv_mutex_lock(&cmdinfo->mutex);
  49. cmdinfo->pid = spawn_result->exec_pid;
  50. if (0 == cmdinfo->pid) { /* Failed to spawn */
  51. #ifdef SPAWN_DEBUG
  52. netdata_log_info("CLIENT %s SPAWN_PROT_SPAWN_RESULT failed to spawn.", __func__);
  53. #endif
  54. cmdinfo->flags |= SPAWN_CMD_FAILED_TO_SPAWN | SPAWN_CMD_DONE;
  55. uv_cond_signal(&cmdinfo->cond);
  56. } else {
  57. cmdinfo->exec_run_timestamp = spawn_result->exec_run_timestamp;
  58. cmdinfo->flags |= SPAWN_CMD_IN_PROGRESS;
  59. #ifdef SPAWN_DEBUG
  60. netdata_log_info("CLIENT %s SPAWN_PROT_SPAWN_RESULT in progress.", __func__);
  61. #endif
  62. }
  63. uv_mutex_unlock(&cmdinfo->mutex);
  64. prot_buffer_len = 0;
  65. break;
  66. case SPAWN_PROT_CMD_EXIT_STATUS:
  67. required_len += sizeof(*exit_status);
  68. if (prot_buffer_len < required_len)
  69. copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
  70. if (prot_buffer_len < required_len)
  71. return; /* Source buffer ran out */
  72. exit_status = (struct spawn_prot_cmd_exit_status *)(header + 1);
  73. uv_mutex_lock(&cmdinfo->mutex);
  74. cmdinfo->exit_status = exit_status->exec_exit_status;
  75. #ifdef SPAWN_DEBUG
  76. netdata_log_info("CLIENT %s SPAWN_PROT_CMD_EXIT_STATUS %d.", __func__, exit_status->exec_exit_status);
  77. #endif
  78. cmdinfo->flags |= SPAWN_CMD_DONE;
  79. uv_cond_signal(&cmdinfo->cond);
  80. uv_mutex_unlock(&cmdinfo->mutex);
  81. prot_buffer_len = 0;
  82. break;
  83. default:
  84. fatal_assert(0);
  85. break;
  86. }
  87. }
  88. }
  89. static void on_pipe_read(uv_stream_t* pipe, ssize_t nread, const uv_buf_t* buf)
  90. {
  91. if (0 == nread) {
  92. netdata_log_info("%s: Zero bytes read from spawn pipe.", __func__);
  93. } else if (UV_EOF == nread) {
  94. netdata_log_info("EOF found in spawn pipe.");
  95. } else if (nread < 0) {
  96. netdata_log_error("%s: %s", __func__, uv_strerror(nread));
  97. }
  98. if (nread < 0) { /* stop stream due to EOF or error */
  99. (void)uv_read_stop((uv_stream_t *)pipe);
  100. } else if (nread) {
  101. #ifdef SPAWN_DEBUG
  102. netdata_log_info("CLIENT %s read %u", __func__, (unsigned)nread);
  103. #endif
  104. client_parse_spawn_protocol(nread, buf->base);
  105. }
  106. if (buf && buf->len) {
  107. freez(buf->base);
  108. }
  109. if (nread < 0) {
  110. uv_close((uv_handle_t *)pipe, NULL);
  111. }
  112. }
  113. static void on_read_alloc(uv_handle_t* handle,
  114. size_t suggested_size,
  115. uv_buf_t* buf)
  116. {
  117. (void)handle;
  118. buf->base = mallocz(suggested_size);
  119. buf->len = suggested_size;
  120. }
  121. static void spawn_process_cmd(struct spawn_cmd_info *cmdinfo)
  122. {
  123. int ret;
  124. uv_buf_t *writebuf;
  125. struct write_context *write_ctx;
  126. void **data = callocz(2, sizeof(void *));
  127. writebuf = callocz(3, sizeof(uv_buf_t));
  128. write_ctx = callocz(1, sizeof(*write_ctx));
  129. data[0] = write_ctx;
  130. data[1] = writebuf;
  131. write_ctx->write_req.data = data;
  132. uv_mutex_lock(&cmdinfo->mutex);
  133. cmdinfo->flags |= SPAWN_CMD_PROCESSED;
  134. uv_mutex_unlock(&cmdinfo->mutex);
  135. write_ctx->header.opcode = SPAWN_PROT_EXEC_CMD;
  136. write_ctx->header.handle = cmdinfo;
  137. write_ctx->payload.command_length = strlen(cmdinfo->command_to_run);
  138. writebuf[0] = uv_buf_init((char *)&write_ctx->header, sizeof(write_ctx->header));
  139. writebuf[1] = uv_buf_init((char *)&write_ctx->payload, sizeof(write_ctx->payload));
  140. writebuf[2] = uv_buf_init((char *)cmdinfo->command_to_run, write_ctx->payload.command_length);
  141. #ifdef SPAWN_DEBUG
  142. netdata_log_info("CLIENT %s SPAWN_PROT_EXEC_CMD %u", __func__, (unsigned)cmdinfo->serial);
  143. #endif
  144. ret = uv_write(&write_ctx->write_req, (uv_stream_t *)&spawn_channel, writebuf, 3, after_pipe_write);
  145. fatal_assert(ret == 0);
  146. }
  147. void spawn_client(void *arg)
  148. {
  149. int ret;
  150. struct completion *completion = (struct completion *)arg;
  151. loop = mallocz(sizeof(uv_loop_t));
  152. ret = uv_loop_init(loop);
  153. if (ret) {
  154. netdata_log_error("uv_loop_init(): %s", uv_strerror(ret));
  155. spawn_thread_error = ret;
  156. goto error_after_loop_init;
  157. }
  158. loop->data = NULL;
  159. spawn_async.data = NULL;
  160. ret = uv_async_init(loop, &spawn_async, async_cb);
  161. if (ret) {
  162. netdata_log_error("uv_async_init(): %s", uv_strerror(ret));
  163. spawn_thread_error = ret;
  164. goto error_after_async_init;
  165. }
  166. ret = uv_pipe_init(loop, &spawn_channel, 1);
  167. if (ret) {
  168. netdata_log_error("uv_pipe_init(): %s", uv_strerror(ret));
  169. spawn_thread_error = ret;
  170. goto error_after_pipe_init;
  171. }
  172. fatal_assert(spawn_channel.ipc);
  173. ret = create_spawn_server(loop, &spawn_channel, &process);
  174. if (ret) {
  175. netdata_log_error("Failed to fork spawn server process.");
  176. spawn_thread_error = ret;
  177. goto error_after_spawn_server;
  178. }
  179. spawn_thread_error = 0;
  180. spawn_thread_shutdown = 0;
  181. /* wake up initialization thread */
  182. completion_mark_complete(completion);
  183. prot_buffer_len = 0;
  184. ret = uv_read_start((uv_stream_t *)&spawn_channel, on_read_alloc, on_pipe_read);
  185. fatal_assert(ret == 0);
  186. while (spawn_thread_shutdown == 0) {
  187. struct spawn_cmd_info *cmdinfo;
  188. uv_run(loop, UV_RUN_DEFAULT);
  189. while (NULL != (cmdinfo = spawn_get_unprocessed_cmd())) {
  190. spawn_process_cmd(cmdinfo);
  191. }
  192. }
  193. /* cleanup operations of the event loop */
  194. netdata_log_info("Shutting down spawn client event loop.");
  195. uv_close((uv_handle_t *)&spawn_channel, NULL);
  196. uv_close((uv_handle_t *)&spawn_async, NULL);
  197. uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */
  198. netdata_log_info("Shutting down spawn client loop complete.");
  199. fatal_assert(0 == uv_loop_close(loop));
  200. return;
  201. error_after_spawn_server:
  202. uv_close((uv_handle_t *)&spawn_channel, NULL);
  203. error_after_pipe_init:
  204. uv_close((uv_handle_t *)&spawn_async, NULL);
  205. error_after_async_init:
  206. uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */
  207. fatal_assert(0 == uv_loop_close(loop));
  208. error_after_loop_init:
  209. freez(loop);
  210. /* wake up initialization thread */
  211. completion_mark_complete(completion);
  212. }