232c93d07b74.c 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. #include "../config-host.h"
  2. /* SPDX-License-Identifier: MIT */
  3. /*
  4. * Test case for socket read/write through IORING_OP_READV and
  5. * IORING_OP_WRITEV, using both TCP and UNIX sockets and blocking and
  6. * non-blocking IO.
  7. *
  8. * Heavily based on a test case from Hrvoje Zeba <zeba.hrvoje@gmail.com>
  9. */
  10. #include <stdio.h>
  11. #include <stdlib.h>
  12. #include <stdint.h>
  13. #include <assert.h>
  14. #include <pthread.h>
  15. #include <errno.h>
  16. #include <fcntl.h>
  17. #include <unistd.h>
  18. #include <sys/socket.h>
  19. #include <sys/un.h>
  20. #include <netinet/tcp.h>
  21. #include <netinet/in.h>
  22. #include <arpa/inet.h>
  23. #include "helpers.h"
  24. #include "liburing.h"
  /*
   * Size in bytes of the buffer used for a single request: the receiver
   * reads into a RECV_BUFF_SIZE buffer, the sender writes from a
   * SEND_BUFF_SIZE buffer.
   */
  enum {
      RECV_BUFF_SIZE = 2,
      SEND_BUFF_SIZE = 3,
  };
  /*
   * Settings for one sender/receiver run. A single instance is handed to
   * both the rcv() and the snd() thread.
   */
  struct params {
      /* non-zero: TCP over 127.0.0.1, zero: AF_UNIX stream socket */
      int tcp;
      /* non-zero: the data socket is switched to O_NONBLOCK */
      int non_blocking;
      /* sin_port of the TCP listener; stored by rcv(), used by snd() */
      __be16 bind_port;
  };
  /*
   * Handshake state between the two threads: rcv_ready becomes non-zero once
   * the receiver is listening. It is guarded by mutex and announced through
   * cond. Static storage starts out zeroed, so rcv_ready begins as 0.
   */
  static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  static int rcv_ready;
  35. static void set_rcv_ready(void)
  36. {
  37. pthread_mutex_lock(&mutex);
  38. rcv_ready = 1;
  39. pthread_cond_signal(&cond);
  40. pthread_mutex_unlock(&mutex);
  41. }
  42. static void wait_for_rcv_ready(void)
  43. {
  44. pthread_mutex_lock(&mutex);
  45. while (!rcv_ready)
  46. pthread_cond_wait(&cond, &mutex);
  47. pthread_mutex_unlock(&mutex);
  48. }
  49. static void *rcv(void *arg)
  50. {
  51. struct params *p = arg;
  52. int s0;
  53. int res;
  54. if (p->tcp) {
  55. int ret, val = 1;
  56. s0 = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
  57. res = setsockopt(s0, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
  58. assert(res != -1);
  59. res = setsockopt(s0, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
  60. assert(res != -1);
  61. struct sockaddr_in addr;
  62. addr.sin_family = AF_INET;
  63. addr.sin_addr.s_addr = inet_addr("127.0.0.1");
  64. ret = t_bind_ephemeral_port(s0, &addr);
  65. assert(!ret);
  66. p->bind_port = addr.sin_port;
  67. } else {
  68. s0 = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
  69. assert(s0 != -1);
  70. struct sockaddr_un addr;
  71. memset(&addr, 0, sizeof(addr));
  72. addr.sun_family = AF_UNIX;
  73. memcpy(addr.sun_path, "\0sock", 6);
  74. res = bind(s0, (struct sockaddr *) &addr, sizeof(addr));
  75. assert(res != -1);
  76. }
  77. res = listen(s0, 128);
  78. assert(res != -1);
  79. set_rcv_ready();
  80. int s1 = accept(s0, NULL, NULL);
  81. assert(s1 != -1);
  82. if (p->non_blocking) {
  83. int flags = fcntl(s1, F_GETFL, 0);
  84. assert(flags != -1);
  85. flags |= O_NONBLOCK;
  86. res = fcntl(s1, F_SETFL, flags);
  87. assert(res != -1);
  88. }
  89. struct io_uring m_io_uring;
  90. void *ret = NULL;
  91. res = io_uring_queue_init(32, &m_io_uring, 0);
  92. assert(res >= 0);
  93. int bytes_read = 0;
  94. int expected_byte = 0;
  95. int done = 0;
  96. while (!done && bytes_read != 33) {
  97. char buff[RECV_BUFF_SIZE];
  98. struct iovec iov;
  99. iov.iov_base = buff;
  100. iov.iov_len = sizeof(buff);
  101. struct io_uring_sqe *sqe = io_uring_get_sqe(&m_io_uring);
  102. assert(sqe != NULL);
  103. io_uring_prep_readv(sqe, s1, &iov, 1, 0);
  104. res = io_uring_submit(&m_io_uring);
  105. assert(res != -1);
  106. struct io_uring_cqe *cqe;
  107. unsigned head;
  108. unsigned count = 0;
  109. while (!done && count != 1) {
  110. io_uring_for_each_cqe(&m_io_uring, head, cqe) {
  111. if (cqe->res < 0)
  112. assert(cqe->res == -EAGAIN);
  113. else {
  114. int i;
  115. for (i = 0; i < cqe->res; i++) {
  116. if (buff[i] != expected_byte) {
  117. fprintf(stderr,
  118. "Received %d, wanted %d\n",
  119. buff[i], expected_byte);
  120. ret++;
  121. done = 1;
  122. }
  123. expected_byte++;
  124. }
  125. bytes_read += cqe->res;
  126. }
  127. count++;
  128. }
  129. assert(count <= 1);
  130. io_uring_cq_advance(&m_io_uring, count);
  131. }
  132. }
  133. shutdown(s1, SHUT_RDWR);
  134. close(s1);
  135. close(s0);
  136. io_uring_queue_exit(&m_io_uring);
  137. return ret;
  138. }
  139. static void *snd(void *arg)
  140. {
  141. struct params *p = arg;
  142. int s0;
  143. int ret;
  144. wait_for_rcv_ready();
  145. if (p->tcp) {
  146. int val = 1;
  147. s0 = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
  148. ret = setsockopt(s0, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
  149. assert(ret != -1);
  150. struct sockaddr_in addr;
  151. addr.sin_family = AF_INET;
  152. addr.sin_port = p->bind_port;
  153. addr.sin_addr.s_addr = inet_addr("127.0.0.1");
  154. ret = connect(s0, (struct sockaddr*) &addr, sizeof(addr));
  155. assert(ret != -1);
  156. } else {
  157. s0 = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
  158. assert(s0 != -1);
  159. struct sockaddr_un addr;
  160. memset(&addr, 0, sizeof(addr));
  161. addr.sun_family = AF_UNIX;
  162. memcpy(addr.sun_path, "\0sock", 6);
  163. ret = connect(s0, (struct sockaddr*) &addr, sizeof(addr));
  164. assert(ret != -1);
  165. }
  166. if (p->non_blocking) {
  167. int flags = fcntl(s0, F_GETFL, 0);
  168. assert(flags != -1);
  169. flags |= O_NONBLOCK;
  170. ret = fcntl(s0, F_SETFL, flags);
  171. assert(ret != -1);
  172. }
  173. struct io_uring m_io_uring;
  174. ret = io_uring_queue_init(32, &m_io_uring, 0);
  175. assert(ret >= 0);
  176. int bytes_written = 0;
  177. int done = 0;
  178. while (!done && bytes_written != 33) {
  179. char buff[SEND_BUFF_SIZE];
  180. int i;
  181. for (i = 0; i < SEND_BUFF_SIZE; i++)
  182. buff[i] = i + bytes_written;
  183. struct iovec iov;
  184. iov.iov_base = buff;
  185. iov.iov_len = sizeof(buff);
  186. struct io_uring_sqe *sqe = io_uring_get_sqe(&m_io_uring);
  187. assert(sqe != NULL);
  188. io_uring_prep_writev(sqe, s0, &iov, 1, 0);
  189. ret = io_uring_submit(&m_io_uring);
  190. assert(ret != -1);
  191. struct io_uring_cqe *cqe;
  192. unsigned head;
  193. unsigned count = 0;
  194. while (!done && count != 1) {
  195. io_uring_for_each_cqe(&m_io_uring, head, cqe) {
  196. if (cqe->res < 0) {
  197. if (cqe->res == -EPIPE) {
  198. done = 1;
  199. break;
  200. }
  201. assert(cqe->res == -EAGAIN);
  202. } else {
  203. bytes_written += cqe->res;
  204. }
  205. count++;
  206. }
  207. assert(count <= 1);
  208. io_uring_cq_advance(&m_io_uring, count);
  209. }
  210. usleep(100000);
  211. }
  212. shutdown(s0, SHUT_RDWR);
  213. close(s0);
  214. io_uring_queue_exit(&m_io_uring);
  215. return NULL;
  216. }
  217. int main(int argc, char *argv[])
  218. {
  219. struct params p;
  220. pthread_t t1, t2;
  221. void *res1, *res2;
  222. int i, exit_val = T_EXIT_PASS;
  223. if (argc > 1)
  224. return T_EXIT_SKIP;
  225. for (i = 0; i < 4; i++) {
  226. p.tcp = i & 1;
  227. p.non_blocking = (i & 2) >> 1;
  228. rcv_ready = 0;
  229. pthread_create(&t1, NULL, rcv, &p);
  230. pthread_create(&t2, NULL, snd, &p);
  231. pthread_join(t1, &res1);
  232. pthread_join(t2, &res2);
  233. if (res1 || res2) {
  234. fprintf(stderr, "Failed tcp=%d, non_blocking=%d\n", p.tcp, p.non_blocking);
  235. exit_val = T_EXIT_FAIL;
  236. }
  237. }
  238. return exit_val;
  239. }