232c93d07b74.c 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. #include "../config-host.h"
  2. /* SPDX-License-Identifier: MIT */
  3. /*
  4. * Test case for socket read/write through IORING_OP_READV and
  5. * IORING_OP_WRITEV, using both TCP and sockets and blocking and
  6. * non-blocking IO.
  7. *
  8. * Heavily based on a test case from Hrvoje Zeba <zeba.hrvoje@gmail.com>
  9. */
  10. #include <stdio.h>
  11. #include <stdlib.h>
  12. #include <stdint.h>
  13. #include <assert.h>
  14. #include <pthread.h>
  15. #include <errno.h>
  16. #include <fcntl.h>
  17. #include <unistd.h>
  18. #include <sys/socket.h>
  19. #include <sys/un.h>
  20. #include <netinet/tcp.h>
  21. #include <netinet/in.h>
  22. #include <arpa/inet.h>
  23. #include "helpers.h"
  24. #include "liburing.h"
  25. #define RECV_BUFF_SIZE 2
  26. #define SEND_BUFF_SIZE 3
  27. struct params {
  28. int tcp;
  29. int non_blocking;
  30. __be16 bind_port;
  31. };
  32. static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  33. static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  34. static int rcv_ready = 0;
  35. static void set_rcv_ready(void)
  36. {
  37. pthread_mutex_lock(&mutex);
  38. rcv_ready = 1;
  39. pthread_cond_signal(&cond);
  40. pthread_mutex_unlock(&mutex);
  41. }
  42. static void wait_for_rcv_ready(void)
  43. {
  44. pthread_mutex_lock(&mutex);
  45. while (!rcv_ready)
  46. pthread_cond_wait(&cond, &mutex);
  47. pthread_mutex_unlock(&mutex);
  48. }
  49. static void *rcv(void *arg)
  50. {
  51. struct params *p = arg;
  52. int s0;
  53. int res;
  54. if (p->tcp) {
  55. int val = 1;
  56. s0 = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
  57. res = setsockopt(s0, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
  58. assert(res != -1);
  59. res = setsockopt(s0, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
  60. assert(res != -1);
  61. struct sockaddr_in addr;
  62. addr.sin_family = AF_INET;
  63. addr.sin_addr.s_addr = inet_addr("127.0.0.1");
  64. assert(t_bind_ephemeral_port(s0, &addr) == 0);
  65. p->bind_port = addr.sin_port;
  66. } else {
  67. s0 = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
  68. assert(s0 != -1);
  69. struct sockaddr_un addr;
  70. memset(&addr, 0, sizeof(addr));
  71. addr.sun_family = AF_UNIX;
  72. memcpy(addr.sun_path, "\0sock", 6);
  73. res = bind(s0, (struct sockaddr *) &addr, sizeof(addr));
  74. assert(res != -1);
  75. }
  76. res = listen(s0, 128);
  77. assert(res != -1);
  78. set_rcv_ready();
  79. int s1 = accept(s0, NULL, NULL);
  80. assert(s1 != -1);
  81. if (p->non_blocking) {
  82. int flags = fcntl(s1, F_GETFL, 0);
  83. assert(flags != -1);
  84. flags |= O_NONBLOCK;
  85. res = fcntl(s1, F_SETFL, flags);
  86. assert(res != -1);
  87. }
  88. struct io_uring m_io_uring;
  89. void *ret = NULL;
  90. res = io_uring_queue_init(32, &m_io_uring, 0);
  91. assert(res >= 0);
  92. int bytes_read = 0;
  93. int expected_byte = 0;
  94. int done = 0;
  95. while (!done && bytes_read != 33) {
  96. char buff[RECV_BUFF_SIZE];
  97. struct iovec iov;
  98. iov.iov_base = buff;
  99. iov.iov_len = sizeof(buff);
  100. struct io_uring_sqe *sqe = io_uring_get_sqe(&m_io_uring);
  101. assert(sqe != NULL);
  102. io_uring_prep_readv(sqe, s1, &iov, 1, 0);
  103. res = io_uring_submit(&m_io_uring);
  104. assert(res != -1);
  105. struct io_uring_cqe *cqe;
  106. unsigned head;
  107. unsigned count = 0;
  108. while (!done && count != 1) {
  109. io_uring_for_each_cqe(&m_io_uring, head, cqe) {
  110. if (cqe->res < 0)
  111. assert(cqe->res == -EAGAIN);
  112. else {
  113. int i;
  114. for (i = 0; i < cqe->res; i++) {
  115. if (buff[i] != expected_byte) {
  116. fprintf(stderr,
  117. "Received %d, wanted %d\n",
  118. buff[i], expected_byte);
  119. ret++;
  120. done = 1;
  121. }
  122. expected_byte++;
  123. }
  124. bytes_read += cqe->res;
  125. }
  126. count++;
  127. }
  128. assert(count <= 1);
  129. io_uring_cq_advance(&m_io_uring, count);
  130. }
  131. }
  132. shutdown(s1, SHUT_RDWR);
  133. close(s1);
  134. close(s0);
  135. io_uring_queue_exit(&m_io_uring);
  136. return ret;
  137. }
  138. static void *snd(void *arg)
  139. {
  140. struct params *p = arg;
  141. int s0;
  142. int ret;
  143. wait_for_rcv_ready();
  144. if (p->tcp) {
  145. int val = 1;
  146. s0 = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
  147. ret = setsockopt(s0, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
  148. assert(ret != -1);
  149. struct sockaddr_in addr;
  150. addr.sin_family = AF_INET;
  151. addr.sin_port = p->bind_port;
  152. addr.sin_addr.s_addr = inet_addr("127.0.0.1");
  153. ret = connect(s0, (struct sockaddr*) &addr, sizeof(addr));
  154. assert(ret != -1);
  155. } else {
  156. s0 = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
  157. assert(s0 != -1);
  158. struct sockaddr_un addr;
  159. memset(&addr, 0, sizeof(addr));
  160. addr.sun_family = AF_UNIX;
  161. memcpy(addr.sun_path, "\0sock", 6);
  162. ret = connect(s0, (struct sockaddr*) &addr, sizeof(addr));
  163. assert(ret != -1);
  164. }
  165. if (p->non_blocking) {
  166. int flags = fcntl(s0, F_GETFL, 0);
  167. assert(flags != -1);
  168. flags |= O_NONBLOCK;
  169. ret = fcntl(s0, F_SETFL, flags);
  170. assert(ret != -1);
  171. }
  172. struct io_uring m_io_uring;
  173. ret = io_uring_queue_init(32, &m_io_uring, 0);
  174. assert(ret >= 0);
  175. int bytes_written = 0;
  176. int done = 0;
  177. while (!done && bytes_written != 33) {
  178. char buff[SEND_BUFF_SIZE];
  179. int i;
  180. for (i = 0; i < SEND_BUFF_SIZE; i++)
  181. buff[i] = i + bytes_written;
  182. struct iovec iov;
  183. iov.iov_base = buff;
  184. iov.iov_len = sizeof(buff);
  185. struct io_uring_sqe *sqe = io_uring_get_sqe(&m_io_uring);
  186. assert(sqe != NULL);
  187. io_uring_prep_writev(sqe, s0, &iov, 1, 0);
  188. ret = io_uring_submit(&m_io_uring);
  189. assert(ret != -1);
  190. struct io_uring_cqe *cqe;
  191. unsigned head;
  192. unsigned count = 0;
  193. while (!done && count != 1) {
  194. io_uring_for_each_cqe(&m_io_uring, head, cqe) {
  195. if (cqe->res < 0) {
  196. if (cqe->res == -EPIPE) {
  197. done = 1;
  198. break;
  199. }
  200. assert(cqe->res == -EAGAIN);
  201. } else {
  202. bytes_written += cqe->res;
  203. }
  204. count++;
  205. }
  206. assert(count <= 1);
  207. io_uring_cq_advance(&m_io_uring, count);
  208. }
  209. usleep(100000);
  210. }
  211. shutdown(s0, SHUT_RDWR);
  212. close(s0);
  213. io_uring_queue_exit(&m_io_uring);
  214. return NULL;
  215. }
  216. int main(int argc, char *argv[])
  217. {
  218. struct params p;
  219. pthread_t t1, t2;
  220. void *res1, *res2;
  221. int i, exit_val = T_EXIT_PASS;
  222. if (argc > 1)
  223. return T_EXIT_SKIP;
  224. for (i = 0; i < 4; i++) {
  225. p.tcp = i & 1;
  226. p.non_blocking = (i & 2) >> 1;
  227. rcv_ready = 0;
  228. pthread_create(&t1, NULL, rcv, &p);
  229. pthread_create(&t2, NULL, snd, &p);
  230. pthread_join(t1, &res1);
  231. pthread_join(t2, &res2);
  232. if (res1 || res2) {
  233. fprintf(stderr, "Failed tcp=%d, non_blocking=%d\n", p.tcp, p.non_blocking);
  234. exit_val = T_EXIT_FAIL;
  235. }
  236. }
  237. return exit_val;
  238. }