send_recvmsg.c 9.0 KB


  1. #include "../config-host.h"
  2. /* SPDX-License-Identifier: MIT */
  3. /*
  4. * Simple test case showing using sendmsg and recvmsg through io_uring
  5. */
  6. #include <stdio.h>
  7. #include <stdlib.h>
  8. #include <string.h>
  9. #include <unistd.h>
  10. #include <errno.h>
  11. #include <arpa/inet.h>
  12. #include <sys/types.h>
  13. #include <sys/socket.h>
  14. #include <pthread.h>
  15. #include <assert.h>
  16. #include "liburing.h"
  17. static char str[] = "This is a test of sendmsg and recvmsg over io_uring!";
  18. static int ud;
  19. #define MAX_MSG 128
  20. #define PORT 10203
  21. #define HOST "127.0.0.1"
  22. #define BUF_BGID 10
  23. #define BUF_BID 89
  24. #define MAX_IOV_COUNT 10
  25. static int no_pbuf_ring;
  26. static int recv_prep(struct io_uring *ring, int *sockfd, struct iovec iov[],
  27. int iov_count, int bgid, int async)
  28. {
  29. struct sockaddr_in saddr;
  30. struct msghdr msg;
  31. struct io_uring_sqe *sqe;
  32. int ret, val = 1;
  33. memset(&saddr, 0, sizeof(saddr));
  34. saddr.sin_family = AF_INET;
  35. saddr.sin_addr.s_addr = htonl(INADDR_ANY);
  36. saddr.sin_port = htons(PORT);
  37. *sockfd = socket(AF_INET, SOCK_DGRAM, 0);
  38. if (*sockfd < 0) {
  39. perror("socket");
  40. return 1;
  41. }
  42. val = 1;
  43. setsockopt(*sockfd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
  44. setsockopt(*sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
  45. ret = bind(*sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
  46. if (ret < 0) {
  47. perror("bind");
  48. goto err;
  49. }
  50. sqe = io_uring_get_sqe(ring);
  51. if (!sqe) {
  52. fprintf(stderr, "io_uring_get_sqe failed\n");
  53. return 1;
  54. }
  55. io_uring_prep_recvmsg(sqe, *sockfd, &msg, 0);
  56. if (bgid) {
  57. iov->iov_base = NULL;
  58. sqe->flags |= IOSQE_BUFFER_SELECT;
  59. sqe->buf_group = bgid;
  60. iov_count = 1;
  61. }
  62. sqe->user_data = ++ud;
  63. if (async)
  64. sqe->flags |= IOSQE_ASYNC;
  65. memset(&msg, 0, sizeof(msg));
  66. msg.msg_namelen = sizeof(struct sockaddr_in);
  67. msg.msg_iov = iov;
  68. msg.msg_iovlen = iov_count;
  69. ret = io_uring_submit(ring);
  70. if (ret <= 0) {
  71. fprintf(stderr, "submit failed: %d\n", ret);
  72. goto err;
  73. }
  74. return 0;
  75. err:
  76. close(*sockfd);
  77. return 1;
  78. }
  79. struct recv_data {
  80. pthread_mutex_t *mutex;
  81. int buf_select;
  82. int buf_ring;
  83. int no_buf_add;
  84. int iov_count;
  85. int async;
  86. };
  87. static int do_recvmsg(struct io_uring *ring, char buf[MAX_MSG + 1],
  88. struct recv_data *rd)
  89. {
  90. struct io_uring_cqe *cqe;
  91. int ret;
  92. ret = io_uring_wait_cqe(ring, &cqe);
  93. if (ret) {
  94. fprintf(stdout, "wait_cqe: %d\n", ret);
  95. goto err;
  96. }
  97. if (cqe->res < 0) {
  98. if (rd->no_buf_add && (rd->buf_select || rd->buf_ring))
  99. return 0;
  100. fprintf(stderr, "%s: failed cqe: %d\n", __FUNCTION__, cqe->res);
  101. goto err;
  102. }
  103. if (cqe->flags & IORING_CQE_F_BUFFER) {
  104. int bid = cqe->flags >> 16;
  105. if (bid != BUF_BID)
  106. fprintf(stderr, "Buffer ID mismatch %d\n", bid);
  107. }
  108. if (rd->no_buf_add && (rd->buf_ring || rd->buf_select)) {
  109. fprintf(stderr, "Expected -ENOBUFS: %d\n", cqe->res);
  110. goto err;
  111. }
  112. if (cqe->res -1 != strlen(str)) {
  113. fprintf(stderr, "got wrong length: %d/%d\n", cqe->res,
  114. (int) strlen(str) + 1);
  115. goto err;
  116. }
  117. if (strncmp(str, buf, MAX_MSG + 1)) {
  118. fprintf(stderr, "string mismatch\n");
  119. goto err;
  120. }
  121. return 0;
  122. err:
  123. return 1;
  124. }
  125. static void init_iov(struct iovec iov[MAX_IOV_COUNT], int iov_to_use,
  126. char buf[MAX_MSG + 1])
  127. {
  128. int i, last_idx = iov_to_use - 1;
  129. assert(0 < iov_to_use && iov_to_use <= MAX_IOV_COUNT);
  130. for (i = 0; i < last_idx; ++i) {
  131. iov[i].iov_base = buf + i;
  132. iov[i].iov_len = 1;
  133. }
  134. iov[last_idx].iov_base = buf + last_idx;
  135. iov[last_idx].iov_len = MAX_MSG - last_idx;
  136. }
  137. static void *recv_fn(void *data)
  138. {
  139. struct recv_data *rd = data;
  140. pthread_mutex_t *mutex = rd->mutex;
  141. struct io_uring_buf_ring *br = NULL;
  142. char buf[MAX_MSG + 1];
  143. struct iovec iov[MAX_IOV_COUNT];
  144. struct io_uring ring;
  145. int ret, sockfd;
  146. if (rd->buf_ring && no_pbuf_ring)
  147. goto out_no_ring;
  148. init_iov(iov, rd->iov_count, buf);
  149. ret = io_uring_queue_init(1, &ring, 0);
  150. if (ret) {
  151. fprintf(stderr, "queue init failed: %d\n", ret);
  152. goto err;
  153. }
  154. if ((rd->buf_ring || rd->buf_select) && !rd->no_buf_add) {
  155. if (rd->buf_ring) {
  156. br = io_uring_setup_buf_ring(&ring, 1, BUF_BGID, 0, &ret);
  157. if (!br) {
  158. no_pbuf_ring = 1;
  159. goto out;
  160. }
  161. io_uring_buf_ring_add(br, buf, sizeof(buf), BUF_BID,
  162. io_uring_buf_ring_mask(1), 0);
  163. io_uring_buf_ring_advance(br, 1);
  164. } else {
  165. struct io_uring_sqe *sqe;
  166. struct io_uring_cqe *cqe;
  167. sqe = io_uring_get_sqe(&ring);
  168. io_uring_prep_provide_buffers(sqe, buf, sizeof(buf) -1,
  169. 1, BUF_BGID, BUF_BID);
  170. sqe->user_data = ++ud;
  171. ret = io_uring_submit(&ring);
  172. if (ret != 1) {
  173. fprintf(stderr, "submit ret=%d\n", ret);
  174. goto err;
  175. }
  176. ret = io_uring_wait_cqe(&ring, &cqe);
  177. if (ret) {
  178. fprintf(stderr, "wait_cqe=%d\n", ret);
  179. goto err;
  180. }
  181. ret = cqe->res;
  182. io_uring_cqe_seen(&ring, cqe);
  183. if (ret == -EINVAL) {
  184. fprintf(stdout, "PROVIDE_BUFFERS not supported, skip\n");
  185. goto out;
  186. } else if (ret < 0) {
  187. fprintf(stderr, "PROVIDER_BUFFERS %d\n", ret);
  188. goto err;
  189. }
  190. }
  191. }
  192. ret = recv_prep(&ring, &sockfd, iov, rd->iov_count,
  193. (rd->buf_ring || rd->buf_select) ? BUF_BGID : 0,
  194. rd->async);
  195. if (ret) {
  196. fprintf(stderr, "recv_prep failed: %d\n", ret);
  197. goto err;
  198. }
  199. pthread_mutex_unlock(mutex);
  200. ret = do_recvmsg(&ring, buf, rd);
  201. close(sockfd);
  202. if (br)
  203. io_uring_free_buf_ring(&ring, br, 1, BUF_BGID);
  204. io_uring_queue_exit(&ring);
  205. err:
  206. return (void *)(intptr_t)ret;
  207. out:
  208. io_uring_queue_exit(&ring);
  209. out_no_ring:
  210. pthread_mutex_unlock(mutex);
  211. if (br)
  212. io_uring_free_buf_ring(&ring, br, 1, BUF_BGID);
  213. return NULL;
  214. }
  215. static int do_sendmsg(void)
  216. {
  217. struct sockaddr_in saddr;
  218. struct iovec iov = {
  219. .iov_base = str,
  220. .iov_len = sizeof(str),
  221. };
  222. struct msghdr msg;
  223. struct io_uring ring;
  224. struct io_uring_cqe *cqe;
  225. struct io_uring_sqe *sqe;
  226. int sockfd, ret;
  227. ret = io_uring_queue_init(1, &ring, 0);
  228. if (ret) {
  229. fprintf(stderr, "queue init failed: %d\n", ret);
  230. return 1;
  231. }
  232. memset(&saddr, 0, sizeof(saddr));
  233. saddr.sin_family = AF_INET;
  234. saddr.sin_port = htons(PORT);
  235. inet_pton(AF_INET, HOST, &saddr.sin_addr);
  236. memset(&msg, 0, sizeof(msg));
  237. msg.msg_name = &saddr;
  238. msg.msg_namelen = sizeof(struct sockaddr_in);
  239. msg.msg_iov = &iov;
  240. msg.msg_iovlen = 1;
  241. sockfd = socket(AF_INET, SOCK_DGRAM, 0);
  242. if (sockfd < 0) {
  243. perror("socket");
  244. return 1;
  245. }
  246. usleep(10000);
  247. sqe = io_uring_get_sqe(&ring);
  248. io_uring_prep_sendmsg(sqe, sockfd, &msg, 0);
  249. sqe->user_data = ++ud;
  250. ret = io_uring_submit(&ring);
  251. if (ret <= 0) {
  252. fprintf(stderr, "submit failed: %d\n", ret);
  253. goto err;
  254. }
  255. ret = io_uring_wait_cqe(&ring, &cqe);
  256. if (cqe->res < 0) {
  257. fprintf(stderr, "%s: failed cqe: %d\n", __FUNCTION__, cqe->res);
  258. goto err;
  259. }
  260. close(sockfd);
  261. return 0;
  262. err:
  263. close(sockfd);
  264. return 1;
  265. }
  266. static int test(int buf_select, int buf_ring, int no_buf_add, int iov_count,
  267. int async)
  268. {
  269. struct recv_data rd;
  270. pthread_mutexattr_t attr;
  271. pthread_t recv_thread;
  272. pthread_mutex_t mutex;
  273. int ret;
  274. void *retval;
  275. if (buf_select || buf_ring)
  276. assert(iov_count == 1);
  277. pthread_mutexattr_init(&attr);
  278. pthread_mutexattr_setpshared(&attr, 1);
  279. pthread_mutex_init(&mutex, &attr);
  280. pthread_mutex_lock(&mutex);
  281. rd.mutex = &mutex;
  282. rd.buf_select = buf_select;
  283. rd.buf_ring = buf_ring;
  284. rd.no_buf_add = no_buf_add;
  285. rd.iov_count = iov_count;
  286. rd.async = async;
  287. ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
  288. if (ret) {
  289. pthread_mutex_unlock(&mutex);
  290. fprintf(stderr, "Thread create failed\n");
  291. return 1;
  292. }
  293. pthread_mutex_lock(&mutex);
  294. do_sendmsg();
  295. pthread_join(recv_thread, &retval);
  296. ret = (intptr_t)retval;
  297. return ret;
  298. }
  299. int main(int argc, char *argv[])
  300. {
  301. int ret;
  302. if (argc > 1)
  303. return 0;
  304. ret = test(0, 0, 0, 1, 0);
  305. if (ret) {
  306. fprintf(stderr, "send_recvmsg 0 0 0 1 0 failed\n");
  307. return 1;
  308. }
  309. ret = test(0, 0, 0, 10, 0);
  310. if (ret) {
  311. fprintf(stderr, "send_recvmsg multi iov failed\n");
  312. return 1;
  313. }
  314. ret = test(1, 0, 0, 1, 0);
  315. if (ret) {
  316. fprintf(stderr, "send_recvmsg 1 0 0 1 0 failed\n");
  317. return 1;
  318. }
  319. ret = test(1, 0, 1, 1, 0);
  320. if (ret) {
  321. fprintf(stderr, "send_recvmsg 1 0 1 1 0 failed\n");
  322. return 1;
  323. }
  324. ret = test(0, 1, 0, 1, 0);
  325. if (ret) {
  326. fprintf(stderr, "send_recvmsg 0 1 0 1 0 failed\n");
  327. return 1;
  328. }
  329. ret = test(1, 1, 0, 1, 0);
  330. if (ret) {
  331. fprintf(stderr, "send_recvmsg 1 1 0 1 0 failed\n");
  332. return 1;
  333. }
  334. ret = test(1, 1, 1, 1, 0);
  335. if (ret) {
  336. fprintf(stderr, "send_recvmsg 1 1 1 1 0 failed\n");
  337. return 1;
  338. }
  339. ret = test(0, 0, 0, 1, 1);
  340. if (ret) {
  341. fprintf(stderr, "send_recvmsg async 0 0 0 1 1 failed\n");
  342. return 1;
  343. }
  344. ret = test(0, 0, 0, 10, 1);
  345. if (ret) {
  346. fprintf(stderr, "send_recvmsg async multi iov failed\n");
  347. return 1;
  348. }
  349. ret = test(1, 0, 0, 1, 1);
  350. if (ret) {
  351. fprintf(stderr, "send_recvmsg async 1 0 0 1 1 failed\n");
  352. return 1;
  353. }
  354. ret = test(1, 0, 1, 1, 1);
  355. if (ret) {
  356. fprintf(stderr, "send_recvmsg async 1 0 1 1 1 failed\n");
  357. return 1;
  358. }
  359. ret = test(0, 1, 0, 1, 1);
  360. if (ret) {
  361. fprintf(stderr, "send_recvmsg async 0 1 0 1 1 failed\n");
  362. return 1;
  363. }
  364. ret = test(1, 1, 0, 1, 1);
  365. if (ret) {
  366. fprintf(stderr, "send_recvmsg async 1 1 0 1 1 failed\n");
  367. return 1;
  368. }
  369. ret = test(1, 1, 1, 1, 1);
  370. if (ret) {
  371. fprintf(stderr, "send_recvmsg async 1 1 1 1 1 failed\n");
  372. return 1;
  373. }
  374. return 0;
  375. }