recv-msgall-stream.c 7.5 KB


  1. #include "../config-host.h"
  2. /* SPDX-License-Identifier: MIT */
  3. /*
  4. * Test MSG_WAITALL for recv/recvmsg and include normal sync versions just
  5. * for comparison.
  6. */
  7. #include <assert.h>
  8. #include <errno.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <unistd.h>
  13. #include <fcntl.h>
  14. #include <arpa/inet.h>
  15. #include <sys/types.h>
  16. #include <sys/socket.h>
  17. #include <pthread.h>
  18. #include "liburing.h"
  19. #include "helpers.h"
  20. #define MAX_MSG 128
  21. struct recv_data {
  22. pthread_mutex_t mutex;
  23. int use_recvmsg;
  24. int use_sync;
  25. __be16 port;
  26. };
  27. static int get_conn_sock(struct recv_data *rd, int *sockout)
  28. {
  29. struct sockaddr_in saddr;
  30. int sockfd, ret, val;
  31. memset(&saddr, 0, sizeof(saddr));
  32. saddr.sin_family = AF_INET;
  33. saddr.sin_addr.s_addr = htonl(INADDR_ANY);
  34. sockfd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
  35. if (sockfd < 0) {
  36. perror("socket");
  37. goto err;
  38. }
  39. val = 1;
  40. setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
  41. setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
  42. if (t_bind_ephemeral_port(sockfd, &saddr)) {
  43. perror("bind");
  44. goto err;
  45. }
  46. rd->port = saddr.sin_port;
  47. ret = listen(sockfd, 16);
  48. if (ret < 0) {
  49. perror("listen");
  50. goto err;
  51. }
  52. pthread_mutex_unlock(&rd->mutex);
  53. ret = accept(sockfd, NULL, NULL);
  54. if (ret < 0) {
  55. perror("accept");
  56. return -1;
  57. }
  58. *sockout = sockfd;
  59. return ret;
  60. err:
  61. pthread_mutex_unlock(&rd->mutex);
  62. return -1;
  63. }
  64. static int recv_prep(struct io_uring *ring, struct iovec *iov, int *sock,
  65. struct recv_data *rd)
  66. {
  67. struct io_uring_sqe *sqe;
  68. struct msghdr msg = { };
  69. int sockfd, sockout = -1, ret;
  70. sockfd = get_conn_sock(rd, &sockout);
  71. if (sockfd < 0)
  72. goto err;
  73. sqe = io_uring_get_sqe(ring);
  74. if (!rd->use_recvmsg) {
  75. io_uring_prep_recv(sqe, sockfd, iov->iov_base, iov->iov_len,
  76. MSG_WAITALL);
  77. } else {
  78. msg.msg_namelen = sizeof(struct sockaddr_in);
  79. msg.msg_iov = iov;
  80. msg.msg_iovlen = 1;
  81. io_uring_prep_recvmsg(sqe, sockfd, &msg, MSG_WAITALL);
  82. }
  83. sqe->user_data = 2;
  84. ret = io_uring_submit(ring);
  85. if (ret <= 0) {
  86. fprintf(stderr, "submit failed: %d\n", ret);
  87. goto err;
  88. }
  89. *sock = sockfd;
  90. return 0;
  91. err:
  92. if (sockout != -1) {
  93. shutdown(sockout, SHUT_RDWR);
  94. close(sockout);
  95. }
  96. if (sockfd != -1) {
  97. shutdown(sockfd, SHUT_RDWR);
  98. close(sockfd);
  99. }
  100. return 1;
  101. }
  102. static int do_recv(struct io_uring *ring)
  103. {
  104. struct io_uring_cqe *cqe;
  105. int ret;
  106. ret = io_uring_wait_cqe(ring, &cqe);
  107. if (ret) {
  108. fprintf(stdout, "wait_cqe: %d\n", ret);
  109. goto err;
  110. }
  111. if (cqe->res == -EINVAL) {
  112. fprintf(stdout, "recv not supported, skipping\n");
  113. return 0;
  114. }
  115. if (cqe->res < 0) {
  116. fprintf(stderr, "failed cqe: %d\n", cqe->res);
  117. goto err;
  118. }
  119. if (cqe->res != MAX_MSG * sizeof(int)) {
  120. fprintf(stderr, "got wrong length: %d\n", cqe->res);
  121. goto err;
  122. }
  123. io_uring_cqe_seen(ring, cqe);
  124. return 0;
  125. err:
  126. return 1;
  127. }
  128. static int recv_sync(struct recv_data *rd)
  129. {
  130. int buf[MAX_MSG];
  131. struct iovec iov = {
  132. .iov_base = buf,
  133. .iov_len = sizeof(buf),
  134. };
  135. int i, ret, sockfd, sockout = -1;
  136. sockfd = get_conn_sock(rd, &sockout);
  137. if (rd->use_recvmsg) {
  138. struct msghdr msg = { };
  139. msg.msg_namelen = sizeof(struct sockaddr_in);
  140. msg.msg_iov = &iov;
  141. msg.msg_iovlen = 1;
  142. ret = recvmsg(sockfd, &msg, MSG_WAITALL);
  143. } else {
  144. ret = recv(sockfd, buf, sizeof(buf), MSG_WAITALL);
  145. }
  146. if (ret < 0) {
  147. perror("receive");
  148. goto err;
  149. }
  150. if (ret != sizeof(buf)) {
  151. ret = -1;
  152. goto err;
  153. }
  154. for (i = 0; i < MAX_MSG; i++) {
  155. if (buf[i] != i)
  156. goto err;
  157. }
  158. ret = 0;
  159. err:
  160. shutdown(sockout, SHUT_RDWR);
  161. shutdown(sockfd, SHUT_RDWR);
  162. close(sockout);
  163. close(sockfd);
  164. return ret;
  165. }
  166. static int recv_uring(struct recv_data *rd)
  167. {
  168. int buf[MAX_MSG];
  169. struct iovec iov = {
  170. .iov_base = buf,
  171. .iov_len = sizeof(buf),
  172. };
  173. struct io_uring_params p = { };
  174. struct io_uring ring;
  175. int ret, sock = -1, sockout = -1;
  176. ret = t_create_ring_params(1, &ring, &p);
  177. if (ret == T_SETUP_SKIP) {
  178. pthread_mutex_unlock(&rd->mutex);
  179. ret = 0;
  180. goto err;
  181. } else if (ret < 0) {
  182. pthread_mutex_unlock(&rd->mutex);
  183. goto err;
  184. }
  185. sock = recv_prep(&ring, &iov, &sockout, rd);
  186. if (ret) {
  187. fprintf(stderr, "recv_prep failed: %d\n", ret);
  188. goto err;
  189. }
  190. ret = do_recv(&ring);
  191. if (!ret) {
  192. int i;
  193. for (i = 0; i < MAX_MSG; i++) {
  194. if (buf[i] != i) {
  195. fprintf(stderr, "found %d at %d\n", buf[i], i);
  196. ret = 1;
  197. break;
  198. }
  199. }
  200. }
  201. shutdown(sockout, SHUT_RDWR);
  202. shutdown(sock, SHUT_RDWR);
  203. close(sock);
  204. close(sockout);
  205. io_uring_queue_exit(&ring);
  206. err:
  207. if (sock != -1) {
  208. shutdown(sock, SHUT_RDWR);
  209. close(sock);
  210. }
  211. if (sockout != -1) {
  212. shutdown(sockout, SHUT_RDWR);
  213. close(sockout);
  214. }
  215. return ret;
  216. }
  217. static void *recv_fn(void *data)
  218. {
  219. struct recv_data *rd = data;
  220. if (rd->use_sync)
  221. return (void *) (uintptr_t) recv_sync(rd);
  222. return (void *) (uintptr_t) recv_uring(rd);
  223. }
  224. static int do_send(struct recv_data *rd)
  225. {
  226. struct sockaddr_in saddr;
  227. struct io_uring ring;
  228. struct io_uring_cqe *cqe;
  229. struct io_uring_sqe *sqe;
  230. int sockfd, ret, i;
  231. struct iovec iov;
  232. int *buf;
  233. ret = io_uring_queue_init(2, &ring, 0);
  234. if (ret) {
  235. fprintf(stderr, "queue init failed: %d\n", ret);
  236. return 1;
  237. }
  238. buf = malloc(MAX_MSG * sizeof(int));
  239. for (i = 0; i < MAX_MSG; i++)
  240. buf[i] = i;
  241. sockfd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
  242. if (sockfd < 0) {
  243. perror("socket");
  244. return 1;
  245. }
  246. pthread_mutex_lock(&rd->mutex);
  247. assert(rd->port != 0);
  248. memset(&saddr, 0, sizeof(saddr));
  249. saddr.sin_family = AF_INET;
  250. saddr.sin_port = rd->port;
  251. inet_pton(AF_INET, "127.0.0.1", &saddr.sin_addr);
  252. ret = connect(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
  253. if (ret < 0) {
  254. perror("connect");
  255. return 1;
  256. }
  257. iov.iov_base = buf;
  258. iov.iov_len = MAX_MSG * sizeof(int) / 2;
  259. for (i = 0; i < 2; i++) {
  260. sqe = io_uring_get_sqe(&ring);
  261. io_uring_prep_send(sqe, sockfd, iov.iov_base, iov.iov_len, 0);
  262. sqe->user_data = 1;
  263. ret = io_uring_submit(&ring);
  264. if (ret <= 0) {
  265. fprintf(stderr, "submit failed: %d\n", ret);
  266. goto err;
  267. }
  268. usleep(10000);
  269. iov.iov_base += iov.iov_len;
  270. }
  271. for (i = 0; i < 2; i++) {
  272. ret = io_uring_wait_cqe(&ring, &cqe);
  273. if (cqe->res == -EINVAL) {
  274. fprintf(stdout, "send not supported, skipping\n");
  275. close(sockfd);
  276. free(buf);
  277. return 0;
  278. }
  279. if (cqe->res != iov.iov_len) {
  280. fprintf(stderr, "failed cqe: %d\n", cqe->res);
  281. goto err;
  282. }
  283. io_uring_cqe_seen(&ring, cqe);
  284. }
  285. shutdown(sockfd, SHUT_RDWR);
  286. close(sockfd);
  287. free(buf);
  288. return 0;
  289. err:
  290. shutdown(sockfd, SHUT_RDWR);
  291. close(sockfd);
  292. free(buf);
  293. return 1;
  294. }
  295. static int test(int use_recvmsg, int use_sync)
  296. {
  297. pthread_mutexattr_t attr;
  298. pthread_t recv_thread;
  299. struct recv_data rd;
  300. int ret;
  301. void *retval;
  302. pthread_mutexattr_init(&attr);
  303. pthread_mutexattr_setpshared(&attr, 1);
  304. pthread_mutex_init(&rd.mutex, &attr);
  305. pthread_mutex_lock(&rd.mutex);
  306. rd.use_recvmsg = use_recvmsg;
  307. rd.use_sync = use_sync;
  308. rd.port = 0;
  309. ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
  310. if (ret) {
  311. fprintf(stderr, "Thread create failed: %d\n", ret);
  312. pthread_mutex_unlock(&rd.mutex);
  313. return 1;
  314. }
  315. do_send(&rd);
  316. pthread_join(recv_thread, &retval);
  317. return (intptr_t)retval;
  318. }
  /*
   * Run the four recv/recvmsg x io_uring/sync combinations in order and
   * stop at the first failure, returning its status. Does nothing and
   * returns 0 when any command-line argument is given.
   */
  int main(int argc, char *argv[])
  {
      static const struct {
          int use_recvmsg;
          int use_sync;
          const char *fail_msg;
      } cases[] = {
          { 0, 0, "test recv failed\n" },
          { 1, 0, "test recvmsg failed\n" },
          { 0, 1, "test sync recv failed\n" },
          { 1, 1, "test sync recvmsg failed\n" },
      };
      size_t idx;
      int ret;

      if (argc > 1)
          return 0;

      for (idx = 0; idx < sizeof(cases) / sizeof(cases[0]); idx++) {
          ret = test(cases[idx].use_recvmsg, cases[idx].use_sync);
          if (ret) {
              fputs(cases[idx].fail_msg, stderr);
              return ret;
          }
      }
      return 0;
  }
  345. }