pipe.c
/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include <aws/io/pipe.h>
#include <aws/io/event_loop.h>

#ifdef __GLIBC__
#    define __USE_GNU
#endif

/* TODO: move this detection to CMAKE and a config header */
#if !defined(COMPAT_MODE) && defined(__GLIBC__) && ((__GLIBC__ == 2 && __GLIBC_MINOR__ >= 9) || __GLIBC__ > 2)
#    define HAVE_PIPE2 1
#else
#    define HAVE_PIPE2 0
#endif

#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* This isn't defined on ancient linux distros (breaking the builds).
 * However, if this is a prebuild, we purposely build on an ancient system, but
 * we want the kernel calls to still be the same as a modern build since that's likely the target of the application
 * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag
 * gets passed as long as it does.
 */
#ifndef O_CLOEXEC
#    define O_CLOEXEC 02000000
#endif

struct read_end_impl {
    struct aws_allocator *alloc;
    struct aws_io_handle handle;
    struct aws_event_loop *event_loop;
    aws_pipe_on_readable_fn *on_readable_user_callback;
    void *on_readable_user_data;

    /* Used in handshake for detecting whether user callback resulted in read-end being cleaned up.
     * If clean_up() sees that the pointer is set, the bool it points to will get set true. */
    bool *did_user_callback_clean_up_read_end;

    bool is_subscribed;
};

struct pipe_write_request {
    struct aws_byte_cursor original_cursor;
    struct aws_byte_cursor cursor; /* tracks progress of write */
    size_t num_bytes_written;
    aws_pipe_on_write_completed_fn *user_callback;
    void *user_data;
    struct aws_linked_list_node list_node;

    /* True if the write-end is cleaned up while the user callback is being invoked */
    bool did_user_callback_clean_up_write_end;
};

struct write_end_impl {
    struct aws_allocator *alloc;
    struct aws_io_handle handle;
    struct aws_event_loop *event_loop;
    struct aws_linked_list write_list;

    /* Valid while invoking user callback on a completed write request. */
    struct pipe_write_request *currently_invoking_write_callback;

    bool is_writable;

    /* Future optimization idea: avoid an allocation on each write by keeping 1 pre-allocated pipe_write_request around
     * and re-using it whenever possible */
};

static void s_write_end_on_event(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    void *user_data);

static int s_translate_posix_error(int err) {
    AWS_ASSERT(err);

    switch (err) {
        case EPIPE:
            return AWS_IO_BROKEN_PIPE;
        default:
            return AWS_ERROR_SYS_CALL_FAILURE;
    }
}

static int s_raise_posix_error(int err) {
    return aws_raise_error(s_translate_posix_error(err));
}

AWS_IO_API int aws_open_nonblocking_posix_pipe(int pipe_fds[2]) {
    int err;

#if HAVE_PIPE2
    err = pipe2(pipe_fds, O_NONBLOCK | O_CLOEXEC);
    if (err) {
        return s_raise_posix_error(err);
    }

    return AWS_OP_SUCCESS;
#else
    err = pipe(pipe_fds);
    if (err) {
        return s_raise_posix_error(err);
    }

    for (int i = 0; i < 2; ++i) {
        int flags = fcntl(pipe_fds[i], F_GETFL);
        if (flags == -1) {
            s_raise_posix_error(errno); /* raise the errno from fcntl(); `err` is still 0 from the successful pipe() */
            goto error;
        }

        flags |= O_NONBLOCK | O_CLOEXEC;
        if (fcntl(pipe_fds[i], F_SETFL, flags) == -1) {
            s_raise_posix_error(errno); /* raise the errno from fcntl(); `err` is still 0 from the successful pipe() */
            goto error;
        }
    }

    return AWS_OP_SUCCESS;

error:
    close(pipe_fds[0]);
    close(pipe_fds[1]);
    return AWS_OP_ERR;
#endif
}

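/* Opens a nonblocking pipe and initializes both ends. The write-end is subscribed to writable
 * events on its event loop right away; the read-end stays unsubscribed until
 * aws_pipe_subscribe_to_readable_events() is called. */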
int aws_pipe_init(
    struct aws_pipe_read_end *read_end,
    struct aws_event_loop *read_end_event_loop,
    struct aws_pipe_write_end *write_end,
    struct aws_event_loop *write_end_event_loop,
    struct aws_allocator *allocator) {

    AWS_ASSERT(read_end);
    AWS_ASSERT(read_end_event_loop);
    AWS_ASSERT(write_end);
    AWS_ASSERT(write_end_event_loop);
    AWS_ASSERT(allocator);

    AWS_ZERO_STRUCT(*read_end);
    AWS_ZERO_STRUCT(*write_end);

    struct read_end_impl *read_impl = NULL;
    struct write_end_impl *write_impl = NULL;
    int err;

    /* Open pipe */
    int pipe_fds[2];
    err = aws_open_nonblocking_posix_pipe(pipe_fds);
    if (err) {
        return AWS_OP_ERR;
    }

    /* Init read-end */
    read_impl = aws_mem_calloc(allocator, 1, sizeof(struct read_end_impl));
    if (!read_impl) {
        goto error;
    }

    read_impl->alloc = allocator;
    read_impl->handle.data.fd = pipe_fds[0];
    read_impl->event_loop = read_end_event_loop;

    /* Init write-end */
    write_impl = aws_mem_calloc(allocator, 1, sizeof(struct write_end_impl));
    if (!write_impl) {
        goto error;
    }

    write_impl->alloc = allocator;
    write_impl->handle.data.fd = pipe_fds[1];
    write_impl->event_loop = write_end_event_loop;
    write_impl->is_writable = true; /* Assume pipe is writable to start. Even if it's not, things shouldn't break */
    aws_linked_list_init(&write_impl->write_list);

    read_end->impl_data = read_impl;
    write_end->impl_data = write_impl;

    err = aws_event_loop_subscribe_to_io_events(
        write_end_event_loop, &write_impl->handle, AWS_IO_EVENT_TYPE_WRITABLE, s_write_end_on_event, write_end);
    if (err) {
        goto error;
    }

    return AWS_OP_SUCCESS;

error:
    close(pipe_fds[0]);
    close(pipe_fds[1]);

    if (read_impl) {
        aws_mem_release(allocator, read_impl);
    }

    if (write_impl) {
        aws_mem_release(allocator, write_impl);
    }

    read_end->impl_data = NULL;
    write_end->impl_data = NULL;

    return AWS_OP_ERR;
}

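/* Tears down the read-end: unsubscribes from readable events if necessary, closes the read file
 * descriptor, and frees the implementation. Must be called on the read-end's event-loop thread. */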
int aws_pipe_clean_up_read_end(struct aws_pipe_read_end *read_end) {
    struct read_end_impl *read_impl = read_end->impl_data;
    if (!read_impl) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    if (read_impl->is_subscribed) {
        int err = aws_pipe_unsubscribe_from_readable_events(read_end);
        if (err) {
            return AWS_OP_ERR;
        }
    }

    /* If the event-handler is invoking a user callback, let it know that the read-end was cleaned up */
    if (read_impl->did_user_callback_clean_up_read_end) {
        *read_impl->did_user_callback_clean_up_read_end = true;
    }

    close(read_impl->handle.data.fd);

    aws_mem_release(read_impl->alloc, read_impl);
    AWS_ZERO_STRUCT(*read_end);
    return AWS_OP_SUCCESS;
}

struct aws_event_loop *aws_pipe_get_read_end_event_loop(const struct aws_pipe_read_end *read_end) {
    const struct read_end_impl *read_impl = read_end->impl_data;
    if (!read_impl) {
        aws_raise_error(AWS_IO_BROKEN_PIPE);
        return NULL;
    }

    return read_impl->event_loop;
}

struct aws_event_loop *aws_pipe_get_write_end_event_loop(const struct aws_pipe_write_end *write_end) {
    const struct write_end_impl *write_impl = write_end->impl_data;
    if (!write_impl) {
        aws_raise_error(AWS_IO_BROKEN_PIPE);
        return NULL;
    }

    return write_impl->event_loop;
}

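/* Reads into the unused capacity of dst_buffer and advances its length. Because the pipe is
 * nonblocking, an empty pipe reports AWS_IO_READ_WOULD_BLOCK instead of blocking the caller. */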
int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_buffer, size_t *num_bytes_read) {
    AWS_ASSERT(dst_buffer && dst_buffer->buffer);

    struct read_end_impl *read_impl = read_end->impl_data;
    if (!read_impl) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (num_bytes_read) {
        *num_bytes_read = 0;
    }

    size_t num_bytes_to_read = dst_buffer->capacity - dst_buffer->len;

    ssize_t read_val = read(read_impl->handle.data.fd, dst_buffer->buffer + dst_buffer->len, num_bytes_to_read);

    if (read_val < 0) {
        int errno_value = errno; /* Always cache errno before potential side-effect */
        if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
            return aws_raise_error(AWS_IO_READ_WOULD_BLOCK);
        }
        return s_raise_posix_error(errno_value);
    }

    /* Success */
    dst_buffer->len += read_val;

    if (num_bytes_read) {
        *num_bytes_read = read_val;
    }

    return AWS_OP_SUCCESS;
}

static void s_read_end_on_event(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    void *user_data) {

    (void)event_loop;
    (void)handle;

    /* Note that it should be impossible for this to run after read-end has been unsubscribed or cleaned up */
    struct aws_pipe_read_end *read_end = user_data;
    struct read_end_impl *read_impl = read_end->impl_data;
    AWS_ASSERT(read_impl);
    AWS_ASSERT(read_impl->event_loop == event_loop);
    AWS_ASSERT(&read_impl->handle == handle);
    AWS_ASSERT(read_impl->is_subscribed);
    AWS_ASSERT(events != 0);
    AWS_ASSERT(read_impl->did_user_callback_clean_up_read_end == NULL);

    /* Set up handshake, so we can be informed if the read-end is cleaned up while invoking a user callback */
    bool did_user_callback_clean_up_read_end = false;
    read_impl->did_user_callback_clean_up_read_end = &did_user_callback_clean_up_read_end;

    /* If readable event received, tell user to try and read, even if "error" events have also occurred. */
    if (events & AWS_IO_EVENT_TYPE_READABLE) {
        read_impl->on_readable_user_callback(read_end, AWS_ERROR_SUCCESS, read_impl->on_readable_user_data);

        if (did_user_callback_clean_up_read_end) {
            return;
        }

        events &= ~AWS_IO_EVENT_TYPE_READABLE;
    }

    if (events) {
        /* Check that user didn't unsubscribe in the previous callback */
        if (read_impl->is_subscribed) {
            read_impl->on_readable_user_callback(read_end, AWS_IO_BROKEN_PIPE, read_impl->on_readable_user_data);

            if (did_user_callback_clean_up_read_end) {
                return;
            }
        }
    }

    read_impl->did_user_callback_clean_up_read_end = NULL;
}

int aws_pipe_subscribe_to_readable_events(
    struct aws_pipe_read_end *read_end,
    aws_pipe_on_readable_fn *on_readable,
    void *user_data) {

    AWS_ASSERT(on_readable);

    struct read_end_impl *read_impl = read_end->impl_data;
    if (!read_impl) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    if (read_impl->is_subscribed) {
        return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED);
    }

    read_impl->is_subscribed = true;
    read_impl->on_readable_user_callback = on_readable;
    read_impl->on_readable_user_data = user_data;

    int err = aws_event_loop_subscribe_to_io_events(
        read_impl->event_loop, &read_impl->handle, AWS_IO_EVENT_TYPE_READABLE, s_read_end_on_event, read_end);
    if (err) {
        read_impl->is_subscribed = false;
        read_impl->on_readable_user_callback = NULL;
        read_impl->on_readable_user_data = NULL;
        return AWS_OP_ERR;
    }

    return AWS_OP_SUCCESS;
}

int aws_pipe_unsubscribe_from_readable_events(struct aws_pipe_read_end *read_end) {
    struct read_end_impl *read_impl = read_end->impl_data;
    if (!read_impl) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    if (!read_impl->is_subscribed) {
        return aws_raise_error(AWS_ERROR_IO_NOT_SUBSCRIBED);
    }

    int err = aws_event_loop_unsubscribe_from_io_events(read_impl->event_loop, &read_impl->handle);
    if (err) {
        return AWS_OP_ERR;
    }

    read_impl->is_subscribed = false;
    read_impl->on_readable_user_callback = NULL;
    read_impl->on_readable_user_data = NULL;

    return AWS_OP_SUCCESS;
}

/* Pop front write request, invoke its callback, and delete it.
 * Returns whether the callback resulted in the write-end getting cleaned up */
static bool s_write_end_complete_front_write_request(struct aws_pipe_write_end *write_end, int error_code) {
    struct write_end_impl *write_impl = write_end->impl_data;

    AWS_ASSERT(!aws_linked_list_empty(&write_impl->write_list));
    struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list);
    struct pipe_write_request *request = AWS_CONTAINER_OF(node, struct pipe_write_request, list_node);

    struct aws_allocator *alloc = write_impl->alloc;

    /* Let the write-end know that a callback is in process, so the write-end can inform the callback
     * whether it resulted in clean_up() being called. */
    bool write_end_cleaned_up_during_callback = false;
    struct pipe_write_request *prev_invoking_request = write_impl->currently_invoking_write_callback;
    write_impl->currently_invoking_write_callback = request;

    if (request->user_callback) {
        request->user_callback(write_end, error_code, request->original_cursor, request->user_data);
        write_end_cleaned_up_during_callback = request->did_user_callback_clean_up_write_end;
    }

    if (!write_end_cleaned_up_during_callback) {
        write_impl->currently_invoking_write_callback = prev_invoking_request;
    }

    aws_mem_release(alloc, request);

    return write_end_cleaned_up_during_callback;
}

/* Process write requests as long as the pipe remains writable */
static void s_write_end_process_requests(struct aws_pipe_write_end *write_end) {
    struct write_end_impl *write_impl = write_end->impl_data;
    AWS_ASSERT(write_impl);

    while (!aws_linked_list_empty(&write_impl->write_list)) {
        struct aws_linked_list_node *node = aws_linked_list_front(&write_impl->write_list);
        struct pipe_write_request *request = AWS_CONTAINER_OF(node, struct pipe_write_request, list_node);

        int completed_error_code = AWS_ERROR_SUCCESS;

        if (request->cursor.len > 0) {
            ssize_t write_val = write(write_impl->handle.data.fd, request->cursor.ptr, request->cursor.len);

            if (write_val < 0) {
                int errno_value = errno; /* Always cache errno before potential side-effect */
                if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
                    /* The pipe is no longer writable. Bail out */
                    write_impl->is_writable = false;
                    return;
                }

                /* A non-recoverable error occurred during this write */
                completed_error_code = s_translate_posix_error(errno_value);

            } else {
                aws_byte_cursor_advance(&request->cursor, write_val);

                if (request->cursor.len > 0) {
                    /* There was a partial write, loop again to try and write the rest. */
                    continue;
                }
            }
        }

        /* If we got this far in the loop, then the write request is complete.
         * Note that the callback may result in the pipe being cleaned up. */
        bool write_end_cleaned_up = s_write_end_complete_front_write_request(write_end, completed_error_code);
        if (write_end_cleaned_up) {
            /* Bail out! Any remaining requests were canceled during clean_up() */
            return;
        }
    }
}

/* Handle events on the write-end's file handle */
static void s_write_end_on_event(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    void *user_data) {

    (void)event_loop;
    (void)handle;

    /* Note that it should be impossible for this to run after write-end has been unsubscribed or cleaned up */
    struct aws_pipe_write_end *write_end = user_data;
    struct write_end_impl *write_impl = write_end->impl_data;
    AWS_ASSERT(write_impl);
    AWS_ASSERT(write_impl->event_loop == event_loop);
    AWS_ASSERT(&write_impl->handle == handle);

    /* Only care about the writable event. */
    if ((events & AWS_IO_EVENT_TYPE_WRITABLE) == 0) {
        return;
    }

    write_impl->is_writable = true;

    s_write_end_process_requests(write_end);
}

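/* Queues a write request for src_buffer. If the pipe is currently writable (and this call isn't
 * nested inside another write's completion callback), the queue is processed immediately, so
 * on_completed may fire before aws_pipe_write() returns. Must be called on the write-end's
 * event-loop thread. */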
int aws_pipe_write(
    struct aws_pipe_write_end *write_end,
    struct aws_byte_cursor src_buffer,
    aws_pipe_on_write_completed_fn *on_completed,
    void *user_data) {

    AWS_ASSERT(src_buffer.ptr);

    struct write_end_impl *write_impl = write_end->impl_data;
    if (!write_impl) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    struct pipe_write_request *request = aws_mem_calloc(write_impl->alloc, 1, sizeof(struct pipe_write_request));
    if (!request) {
        return AWS_OP_ERR;
    }

    request->original_cursor = src_buffer;
    request->cursor = src_buffer;
    request->user_callback = on_completed;
    request->user_data = user_data;

    aws_linked_list_push_back(&write_impl->write_list, &request->list_node);

    /* If the pipe is writable, process the request (unless the pipe is already in the middle of processing, which
     * could happen if this aws_pipe_write() call was made by another write's completion callback). */
    if (write_impl->is_writable && !write_impl->currently_invoking_write_callback) {
        s_write_end_process_requests(write_end);
    }

    return AWS_OP_SUCCESS;
}

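/* Tears down the write-end: unsubscribes from writable events, closes the write file descriptor,
 * and completes all outstanding write requests with AWS_IO_BROKEN_PIPE. Must be called on the
 * write-end's event-loop thread. */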
int aws_pipe_clean_up_write_end(struct aws_pipe_write_end *write_end) {
    struct write_end_impl *write_impl = write_end->impl_data;
    if (!write_impl) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    int err = aws_event_loop_unsubscribe_from_io_events(write_impl->event_loop, &write_impl->handle);
    if (err) {
        return AWS_OP_ERR;
    }

    close(write_impl->handle.data.fd);

    /* Zero out write-end before invoking user callbacks so that it won't work anymore with public functions. */
    AWS_ZERO_STRUCT(*write_end);

    /* If a request callback is currently being invoked, let it know that the write-end was cleaned up */
    if (write_impl->currently_invoking_write_callback) {
        write_impl->currently_invoking_write_callback->did_user_callback_clean_up_write_end = true;
    }

    /* Force any outstanding write requests to complete with an error status. */
    while (!aws_linked_list_empty(&write_impl->write_list)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list);
        struct pipe_write_request *request = AWS_CONTAINER_OF(node, struct pipe_write_request, list_node);
        if (request->user_callback) {
            request->user_callback(NULL, AWS_IO_BROKEN_PIPE, request->original_cursor, request->user_data);
        }
        aws_mem_release(write_impl->alloc, request);
    }

    aws_mem_release(write_impl->alloc, write_impl);

    return AWS_OP_SUCCESS;
}