spawn_client.c 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "spawn.h"
  3. static uv_process_t process;
  4. static uv_pipe_t spawn_channel;
  5. static uv_loop_t *loop;
  6. uv_async_t spawn_async;
  7. static char prot_buffer[MAX_COMMAND_LENGTH];
  8. static unsigned prot_buffer_len = 0;
  9. static void async_cb(uv_async_t *handle)
  10. {
  11. uv_stop(handle->loop);
  12. }
  13. static void after_pipe_write(uv_write_t* req, int status)
  14. {
  15. (void)status;
  16. #ifdef SPAWN_DEBUG
  17. info("CLIENT %s called status=%d", __func__, status);
  18. #endif
  19. freez(req->data);
  20. }
  21. static void client_parse_spawn_protocol(unsigned source_len, char *source)
  22. {
  23. unsigned required_len;
  24. struct spawn_prot_header *header;
  25. struct spawn_prot_spawn_result *spawn_result;
  26. struct spawn_prot_cmd_exit_status *exit_status;
  27. struct spawn_cmd_info *cmdinfo;
  28. while (source_len) {
  29. required_len = sizeof(*header);
  30. if (prot_buffer_len < required_len)
  31. copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
  32. if (prot_buffer_len < required_len)
  33. return; /* Source buffer ran out */
  34. header = (struct spawn_prot_header *)prot_buffer;
  35. cmdinfo = (struct spawn_cmd_info *)header->handle;
  36. fatal_assert(NULL != cmdinfo);
  37. switch(header->opcode) {
  38. case SPAWN_PROT_SPAWN_RESULT:
  39. required_len += sizeof(*spawn_result);
  40. if (prot_buffer_len < required_len)
  41. copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
  42. if (prot_buffer_len < required_len)
  43. return; /* Source buffer ran out */
  44. spawn_result = (struct spawn_prot_spawn_result *)(header + 1);
  45. uv_mutex_lock(&cmdinfo->mutex);
  46. cmdinfo->pid = spawn_result->exec_pid;
  47. if (0 == cmdinfo->pid) { /* Failed to spawn */
  48. #ifdef SPAWN_DEBUG
  49. info("CLIENT %s SPAWN_PROT_SPAWN_RESULT failed to spawn.", __func__);
  50. #endif
  51. cmdinfo->flags |= SPAWN_CMD_FAILED_TO_SPAWN | SPAWN_CMD_DONE;
  52. uv_cond_signal(&cmdinfo->cond);
  53. } else {
  54. cmdinfo->exec_run_timestamp = spawn_result->exec_run_timestamp;
  55. cmdinfo->flags |= SPAWN_CMD_IN_PROGRESS;
  56. #ifdef SPAWN_DEBUG
  57. info("CLIENT %s SPAWN_PROT_SPAWN_RESULT in progress.", __func__);
  58. #endif
  59. }
  60. uv_mutex_unlock(&cmdinfo->mutex);
  61. prot_buffer_len = 0;
  62. break;
  63. case SPAWN_PROT_CMD_EXIT_STATUS:
  64. required_len += sizeof(*exit_status);
  65. if (prot_buffer_len < required_len)
  66. copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
  67. if (prot_buffer_len < required_len)
  68. return; /* Source buffer ran out */
  69. exit_status = (struct spawn_prot_cmd_exit_status *)(header + 1);
  70. uv_mutex_lock(&cmdinfo->mutex);
  71. cmdinfo->exit_status = exit_status->exec_exit_status;
  72. #ifdef SPAWN_DEBUG
  73. info("CLIENT %s SPAWN_PROT_CMD_EXIT_STATUS %d.", __func__, exit_status->exec_exit_status);
  74. #endif
  75. cmdinfo->flags |= SPAWN_CMD_DONE;
  76. uv_cond_signal(&cmdinfo->cond);
  77. uv_mutex_unlock(&cmdinfo->mutex);
  78. prot_buffer_len = 0;
  79. break;
  80. default:
  81. fatal_assert(0);
  82. break;
  83. }
  84. }
  85. }
  86. static void on_pipe_read(uv_stream_t* pipe, ssize_t nread, const uv_buf_t* buf)
  87. {
  88. if (0 == nread) {
  89. info("%s: Zero bytes read from spawn pipe.", __func__);
  90. } else if (UV_EOF == nread) {
  91. info("EOF found in spawn pipe.");
  92. } else if (nread < 0) {
  93. error("%s: %s", __func__, uv_strerror(nread));
  94. }
  95. if (nread < 0) { /* stop stream due to EOF or error */
  96. (void)uv_read_stop((uv_stream_t *)pipe);
  97. } else if (nread) {
  98. #ifdef SPAWN_DEBUG
  99. info("CLIENT %s read %u", __func__, (unsigned)nread);
  100. #endif
  101. client_parse_spawn_protocol(nread, buf->base);
  102. }
  103. if (buf && buf->len) {
  104. freez(buf->base);
  105. }
  106. if (nread < 0) {
  107. uv_close((uv_handle_t *)pipe, NULL);
  108. }
  109. }
  110. static void on_read_alloc(uv_handle_t* handle,
  111. size_t suggested_size,
  112. uv_buf_t* buf)
  113. {
  114. (void)handle;
  115. buf->base = mallocz(suggested_size);
  116. buf->len = suggested_size;
  117. }
  118. static void spawn_process_cmd(struct spawn_cmd_info *cmdinfo)
  119. {
  120. int ret;
  121. uv_buf_t writebuf[3];
  122. struct write_context *write_ctx;
  123. write_ctx = callocz(1, sizeof(*write_ctx));
  124. write_ctx->write_req.data = write_ctx;
  125. uv_mutex_lock(&cmdinfo->mutex);
  126. cmdinfo->flags |= SPAWN_CMD_PROCESSED;
  127. uv_mutex_unlock(&cmdinfo->mutex);
  128. write_ctx->header.opcode = SPAWN_PROT_EXEC_CMD;
  129. write_ctx->header.handle = cmdinfo;
  130. write_ctx->payload.command_length = strlen(cmdinfo->command_to_run);
  131. writebuf[0] = uv_buf_init((char *)&write_ctx->header, sizeof(write_ctx->header));
  132. writebuf[1] = uv_buf_init((char *)&write_ctx->payload, sizeof(write_ctx->payload));
  133. writebuf[2] = uv_buf_init((char *)cmdinfo->command_to_run, write_ctx->payload.command_length);
  134. #ifdef SPAWN_DEBUG
  135. info("CLIENT %s SPAWN_PROT_EXEC_CMD %u", __func__, (unsigned)cmdinfo->serial);
  136. #endif
  137. ret = uv_write(&write_ctx->write_req, (uv_stream_t *)&spawn_channel, writebuf, 3, after_pipe_write);
  138. fatal_assert(ret == 0);
  139. }
  140. void spawn_client(void *arg)
  141. {
  142. int ret;
  143. struct completion *completion = (struct completion *)arg;
  144. loop = mallocz(sizeof(uv_loop_t));
  145. ret = uv_loop_init(loop);
  146. if (ret) {
  147. error("uv_loop_init(): %s", uv_strerror(ret));
  148. spawn_thread_error = ret;
  149. goto error_after_loop_init;
  150. }
  151. loop->data = NULL;
  152. spawn_async.data = NULL;
  153. ret = uv_async_init(loop, &spawn_async, async_cb);
  154. if (ret) {
  155. error("uv_async_init(): %s", uv_strerror(ret));
  156. spawn_thread_error = ret;
  157. goto error_after_async_init;
  158. }
  159. ret = uv_pipe_init(loop, &spawn_channel, 1);
  160. if (ret) {
  161. error("uv_pipe_init(): %s", uv_strerror(ret));
  162. spawn_thread_error = ret;
  163. goto error_after_pipe_init;
  164. }
  165. fatal_assert(spawn_channel.ipc);
  166. ret = create_spawn_server(loop, &spawn_channel, &process);
  167. if (ret) {
  168. error("Failed to fork spawn server process.");
  169. spawn_thread_error = ret;
  170. goto error_after_spawn_server;
  171. }
  172. spawn_thread_error = 0;
  173. spawn_thread_shutdown = 0;
  174. /* wake up initialization thread */
  175. completion_mark_complete(completion);
  176. prot_buffer_len = 0;
  177. ret = uv_read_start((uv_stream_t *)&spawn_channel, on_read_alloc, on_pipe_read);
  178. fatal_assert(ret == 0);
  179. while (spawn_thread_shutdown == 0) {
  180. struct spawn_cmd_info *cmdinfo;
  181. uv_run(loop, UV_RUN_DEFAULT);
  182. while (NULL != (cmdinfo = spawn_get_unprocessed_cmd())) {
  183. spawn_process_cmd(cmdinfo);
  184. }
  185. }
  186. /* cleanup operations of the event loop */
  187. info("Shutting down spawn client event loop.");
  188. uv_close((uv_handle_t *)&spawn_channel, NULL);
  189. uv_close((uv_handle_t *)&spawn_async, NULL);
  190. uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */
  191. info("Shutting down spawn client loop complete.");
  192. fatal_assert(0 == uv_loop_close(loop));
  193. return;
  194. error_after_spawn_server:
  195. uv_close((uv_handle_t *)&spawn_channel, NULL);
  196. error_after_pipe_init:
  197. uv_close((uv_handle_t *)&spawn_async, NULL);
  198. error_after_async_init:
  199. uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */
  200. fatal_assert(0 == uv_loop_close(loop));
  201. error_after_loop_init:
  202. freez(loop);
  203. /* wake up initialization thread */
  204. completion_mark_complete(completion);
  205. }