ring-leak2.c 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. #include "../config-host.h"
  2. /* SPDX-License-Identifier: MIT */
  3. /*
  4. * Description: Test two ring deadlock. A buggy kernel will end up
  5. * having io_wq_* workers pending, as the circular reference
  6. * will prevent full exit.
  7. *
  8. * Based on a test case from Josef <josef.grieb@gmail.com>
  9. *
  10. */
  11. #include <errno.h>
  12. #include <fcntl.h>
  13. #include <netinet/in.h>
  14. #include <stdio.h>
  15. #include <stdlib.h>
  16. #include <string.h>
  17. #include <strings.h>
  18. #include <poll.h>
  19. #include <sys/socket.h>
  20. #include <unistd.h>
  21. #include <sys/eventfd.h>
  22. #include <pthread.h>
  23. #include "liburing.h"
  24. #include "../src/syscall.h"
  25. enum {
  26. ACCEPT,
  27. READ,
  28. WRITE,
  29. POLLING_IN,
  30. POLLING_RDHUP,
  31. CLOSE,
  32. EVENTFD_READ,
  33. };
  34. typedef struct conn_info {
  35. __u32 fd;
  36. __u16 type;
  37. __u16 bid;
  38. } conn_info;
  39. static char read_eventfd_buffer[8];
  40. static pthread_mutex_t lock;
  41. static struct io_uring *client_ring;
  42. static int client_eventfd = -1;
  43. static int setup_io_uring(struct io_uring *ring)
  44. {
  45. struct io_uring_params p = { };
  46. int ret;
  47. ret = io_uring_queue_init_params(8, ring, &p);
  48. if (ret) {
  49. fprintf(stderr, "Unable to setup io_uring: %s\n",
  50. strerror(-ret));
  51. return 1;
  52. }
  53. return 0;
  54. }
  55. static void add_socket_eventfd_read(struct io_uring *ring, int fd)
  56. {
  57. struct io_uring_sqe *sqe;
  58. conn_info conn_i = {
  59. .fd = fd,
  60. .type = EVENTFD_READ,
  61. };
  62. sqe = io_uring_get_sqe(ring);
  63. io_uring_prep_read(sqe, fd, &read_eventfd_buffer, 8, 0);
  64. io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
  65. memcpy(&sqe->user_data, &conn_i, sizeof(conn_i));
  66. }
  67. static void add_socket_pollin(struct io_uring *ring, int fd)
  68. {
  69. struct io_uring_sqe *sqe;
  70. conn_info conn_i = {
  71. .fd = fd,
  72. .type = POLLING_IN,
  73. };
  74. sqe = io_uring_get_sqe(ring);
  75. io_uring_prep_poll_add(sqe, fd, POLL_IN);
  76. memcpy(&sqe->user_data, &conn_i, sizeof(conn_i));
  77. }
  78. static void *server_thread(void *arg)
  79. {
  80. struct sockaddr_in serv_addr;
  81. int port = 0;
  82. int sock_listen_fd, evfd;
  83. const int val = 1;
  84. struct io_uring ring;
  85. sock_listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
  86. setsockopt(sock_listen_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
  87. memset(&serv_addr, 0, sizeof(serv_addr));
  88. serv_addr.sin_family = AF_INET;
  89. serv_addr.sin_port = htons(port);
  90. serv_addr.sin_addr.s_addr = INADDR_ANY;
  91. evfd = eventfd(0, EFD_CLOEXEC);
  92. // bind and listen
  93. if (bind(sock_listen_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
  94. perror("Error binding socket...\n");
  95. exit(1);
  96. }
  97. if (listen(sock_listen_fd, 1) < 0) {
  98. perror("Error listening on socket...\n");
  99. exit(1);
  100. }
  101. setup_io_uring(&ring);
  102. add_socket_eventfd_read(&ring, evfd);
  103. add_socket_pollin(&ring, sock_listen_fd);
  104. while (1) {
  105. struct io_uring_cqe *cqe;
  106. unsigned head;
  107. unsigned count = 0;
  108. io_uring_submit_and_wait(&ring, 1);
  109. io_uring_for_each_cqe(&ring, head, cqe) {
  110. struct conn_info conn_i;
  111. count++;
  112. memcpy(&conn_i, &cqe->user_data, sizeof(conn_i));
  113. if (conn_i.type == ACCEPT) {
  114. int sock_conn_fd = cqe->res;
  115. // only read when there is no error, >= 0
  116. if (sock_conn_fd > 0) {
  117. add_socket_pollin(&ring, sock_listen_fd);
  118. pthread_mutex_lock(&lock);
  119. io_uring_submit(client_ring);
  120. pthread_mutex_unlock(&lock);
  121. }
  122. } else if (conn_i.type == POLLING_IN) {
  123. break;
  124. }
  125. }
  126. io_uring_cq_advance(&ring, count);
  127. }
  128. }
  129. static void *client_thread(void *arg)
  130. {
  131. struct io_uring ring;
  132. int ret;
  133. setup_io_uring(&ring);
  134. client_ring = &ring;
  135. client_eventfd = eventfd(0, EFD_CLOEXEC);
  136. pthread_mutex_lock(&lock);
  137. add_socket_eventfd_read(&ring, client_eventfd);
  138. pthread_mutex_unlock(&lock);
  139. while (1) {
  140. struct io_uring_cqe *cqe;
  141. unsigned head;
  142. unsigned count = 0;
  143. pthread_mutex_lock(&lock);
  144. io_uring_submit(&ring);
  145. pthread_mutex_unlock(&lock);
  146. ret = __sys_io_uring_enter(ring.ring_fd, 0, 1, IORING_ENTER_GETEVENTS, NULL);
  147. if (ret < 0) {
  148. perror("Error io_uring_enter...\n");
  149. exit(1);
  150. }
  151. // go through all CQEs
  152. io_uring_for_each_cqe(&ring, head, cqe) {
  153. struct conn_info conn_i;
  154. int type;
  155. count++;
  156. memcpy(&conn_i, &cqe->user_data, sizeof(conn_i));
  157. type = conn_i.type;
  158. if (type == READ) {
  159. pthread_mutex_lock(&lock);
  160. if (cqe->res <= 0) {
  161. // connection closed or error
  162. shutdown(conn_i.fd, SHUT_RDWR);
  163. } else {
  164. pthread_mutex_unlock(&lock);
  165. break;
  166. }
  167. add_socket_pollin(&ring, conn_i.fd);
  168. pthread_mutex_unlock(&lock);
  169. } else if (type == WRITE) {
  170. } else if (type == POLLING_IN) {
  171. break;
  172. } else if (type == POLLING_RDHUP) {
  173. break;
  174. } else if (type == CLOSE) {
  175. } else if (type == EVENTFD_READ) {
  176. add_socket_eventfd_read(&ring, client_eventfd);
  177. }
  178. }
  179. io_uring_cq_advance(&ring, count);
  180. }
  181. }
  182. static void sig_alrm(int sig)
  183. {
  184. exit(0);
  185. }
  186. int main(int argc, char *argv[])
  187. {
  188. pthread_t server_thread_t, client_thread_t;
  189. struct sigaction act;
  190. if (argc > 1)
  191. return 0;
  192. if (pthread_mutex_init(&lock, NULL) != 0) {
  193. printf("\n mutex init failed\n");
  194. return 1;
  195. }
  196. pthread_create(&server_thread_t, NULL, &server_thread, NULL);
  197. pthread_create(&client_thread_t, NULL, &client_thread, NULL);
  198. memset(&act, 0, sizeof(act));
  199. act.sa_handler = sig_alrm;
  200. act.sa_flags = SA_RESTART;
  201. sigaction(SIGALRM, &act, NULL);
  202. alarm(1);
  203. pthread_join(server_thread_t, NULL);
  204. return 0;
  205. }