multicqes_drain.c

#include "../config-host.h"
/* SPDX-License-Identifier: MIT */
/*
 * Description: generic tests for io_uring drain io
 *
 * The main idea is to randomly generate different types of sqes to
 * challenge the drain logic. There are some restrictions on the
 * generated sqes; details are in the io_uring mailing list:
 * https://lore.kernel.org/io-uring/39a49b4c-27c2-1035-b250-51daeccaab9b@linux.alibaba.com/
 *
 */
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <poll.h>
#include "liburing.h"
#include "helpers.h"

enum {
        multi,
        single,
        nop,
        cancel,
        op_last,
};

struct sqe_info {
        __u8 op;
        unsigned flags;
};

#define max_entry 50

/*
 * sqe_flags: combinations of sqe flags
 * multi_sqes: records the user_data/index of all the multishot sqes
 * cnt: how many entries there are in multi_sqes
 * we can leverage the multi_sqes array for cancellation: we randomly
 * pick an entry in multi_sqes when forming a cancellation sqe.
 * multi_cap: limit on the number of multishot sqes
 */
static const unsigned sqe_flags[4] = {
        0,
        IOSQE_IO_LINK,
        IOSQE_IO_DRAIN,
        IOSQE_IO_LINK | IOSQE_IO_DRAIN
};
static int multi_sqes[max_entry], cnt = 0;
static int multi_cap = max_entry / 5;
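
/*
 * Write a 3-byte token into the pipe so that pending POLLIN polls on the
 * read end fire; retry if the write is interrupted by a signal.
 */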
static int write_pipe(int pipe, char *str)
{
        int ret;

        do {
                errno = 0;
                ret = write(pipe, str, 3);
        } while (ret == -1 && errno == EINTR);
        return ret;
}

static void read_pipe(int pipe)
{
        char str[4] = {0};
        int ret;

        ret = read(pipe, &str, 3);
        if (ret < 0)
                perror("read");
}
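
/*
 * Fire a poll event: write to the pipe, give the kernel a moment to post
 * the completion, reap any deferred task work (needed when the ring is set
 * up with IORING_SETUP_DEFER_TASKRUN), then empty the pipe so each trigger
 * corresponds to one poll event.
 */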
static int trigger_event(struct io_uring *ring, int p[])
{
        int ret;

        if ((ret = write_pipe(p[1], "foo")) != 3) {
                fprintf(stderr, "bad write return %d\n", ret);
                return 1;
        }
        usleep(1000);
        io_uring_get_events(ring);
        read_pipe(p[0]);
        return 0;
}

static void io_uring_sqe_prep(int op, struct io_uring_sqe *sqe,
                              unsigned sqe_flags, int arg)
{
        switch (op) {
        case multi:
                io_uring_prep_poll_add(sqe, arg, POLLIN);
                sqe->len |= IORING_POLL_ADD_MULTI;
                break;
        case single:
                io_uring_prep_poll_add(sqe, arg, POLLIN);
                break;
        case nop:
                io_uring_prep_nop(sqe);
                break;
        case cancel:
                io_uring_prep_poll_remove(sqe, arg);
                break;
        }
        sqe->flags = sqe_flags;
}
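
/*
 * pick a random flag combination for the next sqe, subject to the
 * drain/link/cancel restrictions noted inline below
 */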
static __u8 generate_flags(int sqe_op)
{
        __u8 flags = 0;
        /*
         * a drain sqe must only be issued after all outstanding multishot
         * sqes have been cancelled
         */
        do {
                flags = sqe_flags[rand() % 4];
        } while ((flags & IOSQE_IO_DRAIN) && cnt);

        /*
         * a cancel request cannot have the drain or link flag
         */
        if (sqe_op == cancel) {
                flags &= ~(IOSQE_IO_DRAIN | IOSQE_IO_LINK);
        }
        /*
         * avoid the case below:
         * sqe0(multishot, link)->sqe1(nop, link)->sqe2(nop)->sqe3(cancel_sqe0)
         * sqe3 may execute before sqe0 so that sqe0 isn't cancelled
         */
        if (sqe_op == multi)
                flags &= ~IOSQE_IO_LINK;

        return flags;
}

/*
 * function to generate the opcode of an sqe
 * several restrictions here:
 * - cancel all the previous multishot sqes as soon as possible when
 *   we reach the high watermark.
 * - ensure there is some multishot sqe when generating a cancel sqe
 * - ensure a cancel/multishot sqe is never part of a link chain
 * - ensure the number of multishot sqes doesn't exceed multi_cap
 * - don't generate multishot sqes after the high watermark
 */
static int generate_opcode(int i, int pre_flags)
{
        int sqe_op;
        int high_watermark = max_entry - max_entry / 5;
        bool retry0 = false, retry1 = false, retry2 = false;

        if ((i >= high_watermark) && cnt) {
                sqe_op = cancel;
        } else {
                do {
                        sqe_op = rand() % op_last;
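                        /*
                         * retry0: a cancel sqe needs an outstanding
                         *         multishot sqe to target, and must not
                         *         sit inside a link chain
                         * retry1: no multishot capacity left, or we are
                         *         already past the high watermark
                         * retry2: a multishot sqe must not sit inside a
                         *         link chain
                         */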
                        retry0 = (sqe_op == cancel) && (!cnt || (pre_flags & IOSQE_IO_LINK));
                        retry1 = (sqe_op == multi) && ((multi_cap - 1 < 0) || i >= high_watermark);
                        retry2 = (sqe_op == multi) && (pre_flags & IOSQE_IO_LINK);
                } while (retry0 || retry1 || retry2);
        }
        if (sqe_op == multi)
                multi_cap--;
        return sqe_op;
}
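
/* remember a submitted multishot sqe so a later cancel sqe can target it */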
static inline void add_multishot_sqe(int index)
{
        multi_sqes[cnt++] = index;
}
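
/*
 * pick a random multishot sqe to cancel and remove it from the tracking
 * array by swapping in the last entry
 */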
static int remove_multishot_sqe(void)
{
        int ret;
        int rem_index = rand() % cnt;

        ret = multi_sqes[rem_index];
        multi_sqes[rem_index] = multi_sqes[cnt - 1];
        cnt--;
        return ret;
}
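
/*
 * submit max_entry randomly generated sqes, trigger all the poll events,
 * then reap the CQEs and verify that each drained sqe completed only after
 * every sqe submitted before it had fully completed
 */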
static int test_generic_drain(struct io_uring *ring)
{
        struct io_uring_cqe *cqe;
        struct io_uring_sqe *sqe[max_entry];
        struct sqe_info si[max_entry];
        int cqe_data[max_entry << 1], cqe_res[max_entry << 1];
        int i, j, ret, arg = 0;
        int pipes[max_entry][2];
        int pre_flags = 0;

        for (i = 0; i < max_entry; i++) {
                if (pipe(pipes[i]) != 0) {
                        perror("pipe");
                        return 1;
                }
        }

        srand((unsigned)time(NULL));
        for (i = 0; i < max_entry; i++) {
                sqe[i] = io_uring_get_sqe(ring);
                if (!sqe[i]) {
                        printf("get sqe failed\n");
                        goto err;
                }

                int sqe_op = generate_opcode(i, pre_flags);
                __u8 flags = generate_flags(sqe_op);

                if (sqe_op == cancel)
                        arg = remove_multishot_sqe();
                if (sqe_op == multi || sqe_op == single)
                        arg = pipes[i][0];
                io_uring_sqe_prep(sqe_op, sqe[i], flags, arg);
                sqe[i]->user_data = i;
                si[i].op = sqe_op;
                si[i].flags = flags;
                pre_flags = flags;
                if (sqe_op == multi)
                        add_multishot_sqe(i);
        }

        ret = io_uring_submit(ring);
        if (ret < 0) {
                printf("sqe submit failed\n");
                goto err;
        } else if (ret < max_entry) {
                printf("Submitted only %d\n", ret);
                goto err;
        }

        sleep(1);

        // TODO: randomize event triggering order
        for (i = 0; i < max_entry; i++) {
                if (si[i].op != multi && si[i].op != single)
                        continue;

                if (trigger_event(ring, pipes[i]))
                        goto err;
        }
        sleep(1);
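
        /* reap everything that has completed, recording user_data and res */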
        i = 0;
        while (!io_uring_peek_cqe(ring, &cqe)) {
                cqe_data[i] = cqe->user_data;
                cqe_res[i++] = cqe->res;
                io_uring_cqe_seen(ring, cqe);
        }

        /*
         * compl_bits is a bitmap that records completions.
         * eg. if sqe[0], sqe[1], sqe[2] have fully completed,
         * then compl_bits is 000...00111b
         */
        unsigned long long compl_bits = 0;

        for (j = 0; j < i; j++) {
                int index = cqe_data[j];
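
                /*
                 * a drained sqe must not complete while any lower-indexed
                 * sqe is still outstanding: the expression below is nonzero
                 * iff some bit under `index` is unset in compl_bits
                 */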
                if ((si[index].flags & IOSQE_IO_DRAIN) && index) {
                        if ((~compl_bits) & ((1ULL << index) - 1)) {
                                printf("drain failed\n");
                                goto err;
                        }
                }
                /*
                 * for multishot sqes, record them only once they have
                 * been cancelled
                 */
                if ((si[index].op != multi) || (cqe_res[j] == -ECANCELED))
                        compl_bits |= (1ULL << index);
        }

        return 0;
err:
        return 1;
}
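
/*
 * arm a multishot poll and a single-shot poll, trigger the multishot twice
 * and the single-shot once, then submit a poll_remove followed by a drained
 * nop: the nop must be the last of the six CQEs (two multishot hits, one
 * single-shot hit, the poll_remove result, the -ECANCELED terminating CQE
 * of the removed multishot poll, and the nop itself)
 */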
static int test_simple_drain(struct io_uring *ring)
{
        struct io_uring_cqe *cqe;
        struct io_uring_sqe *sqe[2];
        int i, ret;
        int pipe1[2], pipe2[2];

        if (pipe(pipe1) != 0 || pipe(pipe2) != 0) {
                perror("pipe");
                return 1;
        }

        for (i = 0; i < 2; i++) {
                sqe[i] = io_uring_get_sqe(ring);
                if (!sqe[i]) {
                        printf("get sqe failed\n");
                        goto err;
                }
        }

        io_uring_prep_poll_multishot(sqe[0], pipe1[0], POLLIN);
        sqe[0]->user_data = 0;

        io_uring_prep_poll_add(sqe[1], pipe2[0], POLLIN);
        sqe[1]->user_data = 1;

        /*
         * This test relies on multishot poll to trigger events continually.
         * However, with IORING_SETUP_DEFER_TASKRUN that will only happen
         * when triggered with a get_events. Hence we sprinkle get_events
         * calls wherever there might be work to process, in order to get
         * the same result.
         */
        ret = io_uring_submit_and_get_events(ring);
        if (ret < 0) {
                printf("sqe submit failed\n");
                goto err;
        } else if (ret < 2) {
                printf("Submitted only %d\n", ret);
                goto err;
        }

        for (i = 0; i < 2; i++) {
                if (trigger_event(ring, pipe1))
                        goto err;
        }
        if (trigger_event(ring, pipe2))
                goto err;

        for (i = 0; i < 2; i++) {
                sqe[i] = io_uring_get_sqe(ring);
                if (!sqe[i]) {
                        printf("get sqe failed\n");
                        goto err;
                }
        }

        io_uring_prep_poll_remove(sqe[0], 0);
        sqe[0]->user_data = 2;

        io_uring_prep_nop(sqe[1]);
        sqe[1]->flags |= IOSQE_IO_DRAIN;
        sqe[1]->user_data = 3;

        ret = io_uring_submit(ring);
        if (ret < 0) {
                printf("sqe submit failed\n");
                goto err;
        } else if (ret < 2) {
                printf("Submitted only %d\n", ret);
                goto err;
        }

        for (i = 0; i < 6; i++) {
                ret = io_uring_wait_cqe(ring, &cqe);
                if (ret < 0) {
                        printf("wait completion %d\n", ret);
                        goto err;
                }
                if ((i == 5) && (cqe->user_data != 3))
                        goto err;
                io_uring_cqe_seen(ring, cqe);
        }

        close(pipe1[0]);
        close(pipe1[1]);
        close(pipe2[0]);
        close(pipe2[1]);
        return 0;
err:
        return 1;
}
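
/* run both drain tests several times, optionally with DEFER_TASKRUN */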
static int test(bool defer_taskrun)
{
        struct io_uring ring;
        int i, ret;
        unsigned int flags = 0;

        if (defer_taskrun)
                flags = IORING_SETUP_SINGLE_ISSUER |
                        IORING_SETUP_DEFER_TASKRUN;

        ret = io_uring_queue_init(1024, &ring, flags);
        if (ret) {
                printf("ring setup failed\n");
                return T_EXIT_FAIL;
        }

        for (i = 0; i < 5; i++) {
                ret = test_simple_drain(&ring);
                if (ret) {
                        fprintf(stderr, "test_simple_drain failed\n");
                        return T_EXIT_FAIL;
                }
        }

        for (i = 0; i < 5; i++) {
                ret = test_generic_drain(&ring);
                if (ret) {
                        fprintf(stderr, "test_generic_drain failed\n");
                        return T_EXIT_FAIL;
                }
        }

        io_uring_queue_exit(&ring);
        return T_EXIT_PASS;
}

int main(int argc, char *argv[])
{
        int ret;

        if (argc > 1)
                return T_EXIT_SKIP;

        ret = test(false);
        if (ret != T_EXIT_PASS) {
                fprintf(stderr, "%s: test(false) failed\n", argv[0]);
                return ret;
        }

        if (t_probe_defer_taskrun()) {
                ret = test(true);
                if (ret != T_EXIT_PASS) {
                        fprintf(stderr, "%s: test(true) failed\n", argv[0]);
                        return ret;
                }
        }

        return ret;
}