read-mshot.c 14 KB


  1. #include "../config-host.h"
  2. /* SPDX-License-Identifier: MIT */
  3. /*
  4. * Description: test multishot read (IORING_OP_READ_MULTISHOT) on pipes,
  5. * using ring provided buffers
  6. *
  7. */
  8. #include <errno.h>
  9. #include <stdio.h>
  10. #include <unistd.h>
  11. #include <stdlib.h>
  12. #include <string.h>
  13. #include <fcntl.h>
  14. #include "liburing.h"
  15. #include "helpers.h"
/* Size in bytes of a regular provided buffer */
#define BUF_SIZE 32
/*
 * Size in bytes test() uses for its first two buffers; this equals the
 * length of a single-digit "this is buffer N\n" message
 */
#define BUF_SIZE_FIRST 17
/* Number of entries in the provided buffer ring */
#define NR_BUFS 64
/* Buffer group ID the buffer rings are registered under */
#define BUF_BGID 1
/* Index mask passed to io_uring_buf_ring_add() for an NR_BUFS sized ring */
#define BR_MASK (NR_BUFS - 1)
/* CQ ring size test() asks for when it wants the CQ ring to overflow */
#define NR_OVERFLOW (NR_BUFS / 4)
/*
 * Feature flags set by the tests in this file:
 *   no_buf_ring     - buffer ring registration returned -EINVAL
 *   no_read_mshot   - the first multishot read completion was -EINVAL
 *   no_buf_ring_inc - IOU_PBUF_RING_INC registration returned -EINVAL
 */
static int no_buf_ring, no_read_mshot, no_buf_ring_inc;
  23. static void arm_read(struct io_uring *ring, int fd, int use_mshot)
  24. {
  25. struct io_uring_sqe *sqe;
  26. sqe = io_uring_get_sqe(ring);
  27. if (use_mshot) {
  28. io_uring_prep_read_multishot(sqe, fd, 0, 0, BUF_BGID);
  29. } else {
  30. io_uring_prep_read(sqe, fd, NULL, 0, 0);
  31. sqe->flags = IOSQE_BUFFER_SELECT;
  32. sqe->buf_group = BUF_BGID;
  33. }
  34. io_uring_submit(ring);
  35. }
  36. static int test_inc(int use_mshot, int flags)
  37. {
  38. struct io_uring_buf_ring *br;
  39. struct io_uring_params p = { };
  40. struct io_uring_cqe *cqe;
  41. struct io_uring ring;
  42. int nbytes = 65536;
  43. int ret, fds[2], i;
  44. char tmp[31];
  45. char *buf;
  46. void *ptr;
  47. int bid = -1;
  48. int bid_bytes;
  49. if (no_buf_ring)
  50. return 0;
  51. p.flags = flags;
  52. ret = io_uring_queue_init_params(64, &ring, &p);
  53. if (ret) {
  54. fprintf(stderr, "ring setup failed: %d\n", ret);
  55. return 1;
  56. }
  57. if (pipe(fds) < 0) {
  58. perror("pipe");
  59. return 1;
  60. }
  61. if (posix_memalign((void **) &buf, 4096, 65536))
  62. return 1;
  63. br = io_uring_setup_buf_ring(&ring, 32, BUF_BGID, IOU_PBUF_RING_INC, &ret);
  64. if (!br) {
  65. if (ret == -EINVAL) {
  66. no_buf_ring_inc = 1;
  67. free(buf);
  68. return 0;
  69. }
  70. fprintf(stderr, "Buffer ring register failed %d\n", ret);
  71. return 1;
  72. }
  73. ptr = buf;
  74. buf = ptr + 65536 - 2048;
  75. for (i = 0; i < 32; i++) {
  76. io_uring_buf_ring_add(br, buf, 2048, i, 31, i);
  77. buf -= 2048;
  78. }
  79. io_uring_buf_ring_advance(br, 32);
  80. memset(tmp, 0x5a, sizeof(tmp));
  81. arm_read(&ring, fds[0], use_mshot);
  82. bid_bytes = 0;
  83. do {
  84. int write_size = sizeof(tmp);
  85. if (write_size > nbytes)
  86. write_size = nbytes;
  87. io_uring_get_events(&ring);
  88. ret = io_uring_peek_cqe(&ring, &cqe);
  89. if (!ret) {
  90. int this_bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
  91. if (bid == -1) {
  92. bid = this_bid;
  93. } else if (bid != this_bid) {
  94. if (bid_bytes != 2048) {
  95. fprintf(stderr, "unexpected bid bytes %d\n",
  96. bid_bytes);
  97. return 1;
  98. }
  99. bid = this_bid;
  100. bid_bytes = 0;
  101. }
  102. bid_bytes += cqe->res;
  103. nbytes -= cqe->res;
  104. if (!(cqe->flags & IORING_CQE_F_MORE))
  105. arm_read(&ring, fds[0], use_mshot);
  106. io_uring_cqe_seen(&ring, cqe);
  107. if (!nbytes)
  108. break;
  109. }
  110. usleep(1000);
  111. ret = write(fds[1], tmp, write_size);
  112. if (ret < 0) {
  113. perror("write");
  114. return 1;
  115. } else if (ret != write_size) {
  116. printf("short write %d\n", ret);
  117. return 1;
  118. }
  119. } while (nbytes);
  120. if (bid_bytes) {
  121. if (bid_bytes != 2048) {
  122. fprintf(stderr, "unexpected bid bytes %d\n", bid_bytes);
  123. return 1;
  124. }
  125. }
  126. io_uring_free_buf_ring(&ring, br, 32, BUF_BGID);
  127. io_uring_queue_exit(&ring);
  128. free(ptr);
  129. close(fds[0]);
  130. close(fds[1]);
  131. return 0;
  132. }
  133. static int test_clamp(void)
  134. {
  135. struct io_uring_buf_ring *br;
  136. struct io_uring_params p = { };
  137. struct io_uring_sqe *sqe;
  138. struct io_uring_cqe *cqe;
  139. struct io_uring ring;
  140. int ret, fds[2], i;
  141. char tmp[32];
  142. char *buf;
  143. void *ptr;
  144. ret = io_uring_queue_init_params(4, &ring, &p);
  145. if (ret) {
  146. fprintf(stderr, "ring setup failed: %d\n", ret);
  147. return 1;
  148. }
  149. if (pipe(fds) < 0) {
  150. perror("pipe");
  151. return 1;
  152. }
  153. if (posix_memalign((void **) &buf, 4096, NR_BUFS * BUF_SIZE))
  154. return 1;
  155. br = io_uring_setup_buf_ring(&ring, NR_BUFS, BUF_BGID, 0, &ret);
  156. if (!br) {
  157. if (ret == -EINVAL) {
  158. no_buf_ring = 1;
  159. return 0;
  160. }
  161. fprintf(stderr, "Buffer ring register failed %d\n", ret);
  162. return 1;
  163. }
  164. ptr = buf;
  165. io_uring_buf_ring_add(br, buf, 16, 1, BR_MASK, 0);
  166. buf += 16;
  167. io_uring_buf_ring_add(br, buf, 32, 2, BR_MASK, 1);
  168. buf += 32;
  169. io_uring_buf_ring_add(br, buf, 32, 3, BR_MASK, 2);
  170. buf += 32;
  171. io_uring_buf_ring_add(br, buf, 32, 4, BR_MASK, 3);
  172. buf += 32;
  173. io_uring_buf_ring_advance(br, 4);
  174. memset(tmp, 0xaa, sizeof(tmp));
  175. sqe = io_uring_get_sqe(&ring);
  176. io_uring_prep_read_multishot(sqe, fds[0], 0, 0, BUF_BGID);
  177. ret = io_uring_submit(&ring);
  178. if (ret != 1) {
  179. fprintf(stderr, "submit: %d\n", ret);
  180. return 1;
  181. }
  182. /* prevent pipe buffer merging */
  183. usleep(1000);
  184. ret = write(fds[1], tmp, 16);
  185. usleep(1000);
  186. ret = write(fds[1], tmp, sizeof(tmp));
  187. /* prevent pipe buffer merging */
  188. usleep(1000);
  189. ret = write(fds[1], tmp, 16);
  190. usleep(1000);
  191. ret = write(fds[1], tmp, sizeof(tmp));
  192. /*
  193. * We should see a 16 byte completion, then a 32 byte, then a 16 byte,
  194. * and finally a 32 byte again.
  195. */
  196. for (i = 0; i < 4; i++) {
  197. ret = io_uring_wait_cqe(&ring, &cqe);
  198. if (ret) {
  199. fprintf(stderr, "wait cqe failed %d\n", ret);
  200. return 1;
  201. }
  202. if (cqe->res < 0) {
  203. fprintf(stderr, "cqe res: %d\n", cqe->res);
  204. return 1;
  205. }
  206. if (!(cqe->flags & IORING_CQE_F_MORE)) {
  207. fprintf(stderr, "no more cqes\n");
  208. return 1;
  209. }
  210. if (i == 0 || i == 2) {
  211. if (cqe->res != 16) {
  212. fprintf(stderr, "%d cqe got %d\n", i, cqe->res);
  213. return 1;
  214. }
  215. } else if (i == 1 || i == 3) {
  216. if (cqe->res != 32) {
  217. fprintf(stderr, "%d cqe got %d\n", i, cqe->res);
  218. return 1;
  219. }
  220. }
  221. io_uring_cqe_seen(&ring, cqe);
  222. }
  223. io_uring_free_buf_ring(&ring, br, NR_BUFS, BUF_BGID);
  224. io_uring_queue_exit(&ring);
  225. free(ptr);
  226. return 0;
  227. }
  228. static int test(int first_good, int async, int overflow, int incremental)
  229. {
  230. struct io_uring_buf_ring *br;
  231. struct io_uring_params p = { };
  232. struct io_uring_sqe *sqe;
  233. struct io_uring_cqe *cqe;
  234. struct io_uring ring;
  235. int ret, fds[2], i, start_msg = 0;
  236. int br_flags = 0;
  237. char tmp[32];
  238. void *ptr[NR_BUFS];
  239. char *inc_index;
  240. p.flags = IORING_SETUP_CQSIZE;
  241. if (!overflow)
  242. p.cq_entries = NR_BUFS + 1;
  243. else
  244. p.cq_entries = NR_OVERFLOW;
  245. ret = io_uring_queue_init_params(1, &ring, &p);
  246. if (ret) {
  247. fprintf(stderr, "ring setup failed: %d\n", ret);
  248. return 1;
  249. }
  250. if (incremental) {
  251. if (no_buf_ring_inc)
  252. return 0;
  253. br_flags |= IOU_PBUF_RING_INC;
  254. }
  255. br = io_uring_setup_buf_ring(&ring, NR_BUFS, BUF_BGID, br_flags, &ret);
  256. if (!br) {
  257. if (ret == -EINVAL) {
  258. if (incremental) {
  259. no_buf_ring_inc = 1;
  260. return 0;
  261. }
  262. no_buf_ring = 1;
  263. return 0;
  264. }
  265. fprintf(stderr, "Buffer ring register failed %d\n", ret);
  266. return 1;
  267. }
  268. if (pipe(fds) < 0) {
  269. perror("pipe");
  270. return 1;
  271. }
  272. if (!incremental) {
  273. for (i = 0; i < NR_BUFS; i++) {
  274. unsigned size = i <= 1 ? BUF_SIZE_FIRST : BUF_SIZE;
  275. ptr[i] = malloc(size);
  276. if (!ptr[i])
  277. return 1;
  278. io_uring_buf_ring_add(br, ptr[i], size, i + 1, BR_MASK, i);
  279. }
  280. inc_index = NULL;
  281. io_uring_buf_ring_advance(br, NR_BUFS);
  282. } else {
  283. inc_index = ptr[0] = malloc(NR_BUFS * BUF_SIZE);
  284. memset(inc_index, 0, NR_BUFS * BUF_SIZE);
  285. io_uring_buf_ring_add(br, ptr[0], NR_BUFS * BUF_SIZE, 1, BR_MASK, 0);
  286. io_uring_buf_ring_advance(br, 1);
  287. }
  288. if (first_good) {
  289. sprintf(tmp, "this is buffer %d\n", start_msg++);
  290. ret = write(fds[1], tmp, strlen(tmp));
  291. }
  292. sqe = io_uring_get_sqe(&ring);
  293. /* len == 0 means just use the defined provided buffer length */
  294. io_uring_prep_read_multishot(sqe, fds[0], 0, 0, BUF_BGID);
  295. if (async)
  296. sqe->flags |= IOSQE_ASYNC;
  297. ret = io_uring_submit(&ring);
  298. if (ret != 1) {
  299. fprintf(stderr, "submit: %d\n", ret);
  300. return 1;
  301. }
  302. /* write NR_BUFS + 1, or if first_good is set, NR_BUFS */
  303. for (i = 0; i < NR_BUFS + !first_good; i++) {
  304. /* prevent pipe buffer merging */
  305. usleep(1000);
  306. sprintf(tmp, "this is buffer %d\n", i + start_msg);
  307. ret = write(fds[1], tmp, strlen(tmp));
  308. if (ret != strlen(tmp)) {
  309. fprintf(stderr, "write ret %d\n", ret);
  310. return 1;
  311. }
  312. }
  313. for (i = 0; i < NR_BUFS + 1; i++) {
  314. int bid;
  315. ret = io_uring_wait_cqe(&ring, &cqe);
  316. if (ret) {
  317. fprintf(stderr, "wait cqe failed %d\n", ret);
  318. return 1;
  319. }
  320. if (cqe->res < 0) {
  321. /* expected failure as we try to read one too many */
  322. if (cqe->res == -ENOBUFS && i == NR_BUFS)
  323. break;
  324. if (!i && cqe->res == -EINVAL) {
  325. no_read_mshot = 1;
  326. break;
  327. }
  328. fprintf(stderr, "%d: cqe res %d\n", i, cqe->res);
  329. return 1;
  330. } else if (i > 9 && cqe->res <= 17) {
  331. fprintf(stderr, "truncated message %d %d\n", i, cqe->res);
  332. return 1;
  333. }
  334. if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
  335. fprintf(stderr, "no buffer selected\n");
  336. return 1;
  337. }
  338. bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
  339. if (incremental && bid != 1) {
  340. fprintf(stderr, "bid %d for incremental\n", bid);
  341. return 1;
  342. }
  343. if (incremental && !first_good) {
  344. char out_buf[64];
  345. sprintf(out_buf, "this is buffer %d\n", i + start_msg);
  346. if (strncmp(inc_index, out_buf, strlen(out_buf)))
  347. return 1;
  348. inc_index += cqe->res;
  349. }
  350. if (!(cqe->flags & IORING_CQE_F_MORE)) {
  351. /* we expect this on overflow */
  352. if (overflow && i >= NR_OVERFLOW)
  353. break;
  354. fprintf(stderr, "no more cqes\n");
  355. return 1;
  356. }
  357. /* should've overflown! */
  358. if (overflow && i > NR_OVERFLOW) {
  359. fprintf(stderr, "Expected overflow!\n");
  360. return 1;
  361. }
  362. io_uring_cqe_seen(&ring, cqe);
  363. }
  364. io_uring_free_buf_ring(&ring, br, NR_BUFS, BUF_BGID);
  365. io_uring_queue_exit(&ring);
  366. if (incremental) {
  367. free(ptr[0]);
  368. } else {
  369. for (i = 0; i < NR_BUFS; i++)
  370. free(ptr[i]);
  371. }
  372. return 0;
  373. }
  374. static int test_invalid(int async)
  375. {
  376. struct io_uring_buf_ring *br;
  377. struct io_uring_params p = { };
  378. struct io_uring_sqe *sqe;
  379. struct io_uring_cqe *cqe;
  380. struct io_uring ring;
  381. char fname[32] = ".mshot.%d.XXXXXX";
  382. int ret, fd;
  383. char *buf;
  384. p.flags = IORING_SETUP_CQSIZE;
  385. p.cq_entries = NR_BUFS;
  386. ret = io_uring_queue_init_params(1, &ring, &p);
  387. if (ret) {
  388. fprintf(stderr, "ring setup failed: %d\n", ret);
  389. return 1;
  390. }
  391. fd = mkstemp(fname);
  392. if (fd < 0) {
  393. perror("mkstemp");
  394. return 1;
  395. }
  396. unlink(fname);
  397. if (posix_memalign((void **) &buf, 4096, BUF_SIZE))
  398. return 1;
  399. br = io_uring_setup_buf_ring(&ring, 1, BUF_BGID, 0, &ret);
  400. if (!br) {
  401. fprintf(stderr, "Buffer ring register failed %d\n", ret);
  402. return 1;
  403. }
  404. io_uring_buf_ring_add(br, buf, BUF_SIZE, 1, BR_MASK, 0);
  405. io_uring_buf_ring_advance(br, 1);
  406. sqe = io_uring_get_sqe(&ring);
  407. /* len == 0 means just use the defined provided buffer length */
  408. io_uring_prep_read_multishot(sqe, fd, 0, 0, BUF_BGID);
  409. if (async)
  410. sqe->flags |= IOSQE_ASYNC;
  411. ret = io_uring_submit(&ring);
  412. if (ret != 1) {
  413. fprintf(stderr, "submit: %d\n", ret);
  414. return 1;
  415. }
  416. ret = io_uring_wait_cqe(&ring, &cqe);
  417. if (ret) {
  418. fprintf(stderr, "wait cqe failed %d\n", ret);
  419. return 1;
  420. }
  421. if (cqe->flags & IORING_CQE_F_MORE) {
  422. fprintf(stderr, "MORE flag set unexpected %d\n", cqe->flags);
  423. return 1;
  424. }
  425. if (cqe->res != -EBADFD) {
  426. fprintf(stderr, "Got cqe res %d, wanted -EBADFD\n", cqe->res);
  427. return 1;
  428. }
  429. io_uring_cqe_seen(&ring, cqe);
  430. io_uring_free_buf_ring(&ring, br, 1, BUF_BGID);
  431. io_uring_queue_exit(&ring);
  432. free(buf);
  433. return 0;
  434. }
  435. int main(int argc, char *argv[])
  436. {
  437. int ret;
  438. if (argc > 1)
  439. return T_EXIT_SKIP;
  440. ret = test(0, 0, 0, 0);
  441. if (ret) {
  442. fprintf(stderr, "test 0 0 0 failed\n");
  443. return T_EXIT_FAIL;
  444. }
  445. if (no_buf_ring || no_read_mshot) {
  446. printf("skip\n");
  447. return T_EXIT_SKIP;
  448. }
  449. ret = test(0, 1, 0, 0);
  450. if (ret) {
  451. fprintf(stderr, "test 0 1 0, failed\n");
  452. return T_EXIT_FAIL;
  453. }
  454. ret = test(1, 0, 0, 0);
  455. if (ret) {
  456. fprintf(stderr, "test 1 0 0 failed\n");
  457. return T_EXIT_FAIL;
  458. }
  459. ret = test(0, 0, 1, 0);
  460. if (ret) {
  461. fprintf(stderr, "test 0 0 1 failed\n");
  462. return T_EXIT_FAIL;
  463. }
  464. ret = test(0, 1, 1, 0);
  465. if (ret) {
  466. fprintf(stderr, "test 0 1 1 failed\n");
  467. return T_EXIT_FAIL;
  468. }
  469. ret = test(1, 0, 1, 0);
  470. if (ret) {
  471. fprintf(stderr, "test 1 0 1, failed\n");
  472. return T_EXIT_FAIL;
  473. }
  474. ret = test(1, 0, 1, 0);
  475. if (ret) {
  476. fprintf(stderr, "test 1 0 1 failed\n");
  477. return T_EXIT_FAIL;
  478. }
  479. ret = test(1, 1, 1, 0);
  480. if (ret) {
  481. fprintf(stderr, "test 1 1 1 failed\n");
  482. return T_EXIT_FAIL;
  483. }
  484. ret = test(0, 0, 0, 1);
  485. if (ret) {
  486. fprintf(stderr, "test 0 0 0 1 failed\n");
  487. return T_EXIT_FAIL;
  488. }
  489. ret = test(0, 0, 1, 1);
  490. if (ret) {
  491. fprintf(stderr, "test 0 0 1 1 failed\n");
  492. return T_EXIT_FAIL;
  493. }
  494. ret = test(0, 1, 0, 1);
  495. if (ret) {
  496. fprintf(stderr, "test 0 1 0 1 failed\n");
  497. return T_EXIT_FAIL;
  498. }
  499. ret = test(0, 1, 1, 1);
  500. if (ret) {
  501. fprintf(stderr, "test 0 1 1 1 failed\n");
  502. return T_EXIT_FAIL;
  503. }
  504. ret = test(1, 0, 0, 1);
  505. if (ret) {
  506. fprintf(stderr, "test 1 0 0 1 failed\n");
  507. return T_EXIT_FAIL;
  508. }
  509. ret = test(1, 0, 1, 1);
  510. if (ret) {
  511. fprintf(stderr, "test 1 0 1 1 failed\n");
  512. return T_EXIT_FAIL;
  513. }
  514. ret = test(1, 1, 0, 1);
  515. if (ret) {
  516. fprintf(stderr, "test 1 1 0 1 failed\n");
  517. return T_EXIT_FAIL;
  518. }
  519. ret = test(1, 1, 1, 1);
  520. if (ret) {
  521. fprintf(stderr, "test 1 1 1 1 failed\n");
  522. return T_EXIT_FAIL;
  523. }
  524. ret = test_invalid(0);
  525. if (ret) {
  526. fprintf(stderr, "test_invalid 0 failed\n");
  527. return T_EXIT_FAIL;
  528. }
  529. ret = test_invalid(1);
  530. if (ret) {
  531. fprintf(stderr, "test_invalid 1 failed\n");
  532. return T_EXIT_FAIL;
  533. }
  534. ret = test_clamp();
  535. if (ret) {
  536. fprintf(stderr, "test_clamp failed\n");
  537. return T_EXIT_FAIL;
  538. }
  539. ret = test_inc(0, 0);
  540. if (ret) {
  541. fprintf(stderr, "test_inc 0 0 failed\n");
  542. return T_EXIT_FAIL;
  543. }
  544. ret = test_inc(0, IORING_SETUP_SQPOLL);
  545. if (ret) {
  546. fprintf(stderr, "test_inc 0 sqpoll failed\n");
  547. return T_EXIT_FAIL;
  548. }
  549. ret = test_inc(0, IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN);
  550. if (ret) {
  551. fprintf(stderr, "test_inc 0 defer failed\n");
  552. return T_EXIT_FAIL;
  553. }
  554. ret = test_inc(1, 0);
  555. if (ret) {
  556. fprintf(stderr, "test_inc 1 0 failed\n");
  557. return T_EXIT_FAIL;
  558. }
  559. ret = test_inc(1, IORING_SETUP_SQPOLL);
  560. if (ret) {
  561. fprintf(stderr, "test_inc 1 sqpoll failed\n");
  562. return T_EXIT_FAIL;
  563. }
  564. ret = test_inc(1, IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN);
  565. if (ret) {
  566. fprintf(stderr, "test_inc 1 defer failed\n");
  567. return T_EXIT_FAIL;
  568. }
  569. return T_EXIT_PASS;
  570. }