123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603 |
- #include "../config-host.h"
- // SPDX-License-Identifier: MIT
- #include <errno.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <unistd.h>
- #include <arpa/inet.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <pthread.h>
- #include <assert.h>
- #include "liburing.h"
- #include "helpers.h"
- #define ENORECVMULTISHOT 9999
- enum early_error_t {
- ERROR_NONE = 0,
- ERROR_NOT_ENOUGH_BUFFERS,
- ERROR_EARLY_CLOSE_SENDER,
- ERROR_EARLY_CLOSE_RECEIVER,
- ERROR_EARLY_OVERFLOW,
- ERROR_EARLY_LAST
- };
- struct args {
- bool stream;
- bool wait_each;
- bool recvmsg;
- enum early_error_t early_error;
- bool defer;
- };
- static int check_sockaddr(struct sockaddr_in *in)
- {
- struct in_addr expected;
- inet_pton(AF_INET, "127.0.0.1", &expected);
- if (in->sin_family != AF_INET) {
- fprintf(stderr, "bad family %d\n", (int)htons(in->sin_family));
- return -1;
- }
- if (memcmp(&expected, &in->sin_addr, sizeof(in->sin_addr))) {
- char buff[256];
- const char *addr = inet_ntop(AF_INET, &in->sin_addr, buff, sizeof(buff));
- fprintf(stderr, "unexpected address %s\n", addr ? addr : "INVALID");
- return -1;
- }
- return 0;
- }
- static int test(struct args *args)
- {
- int const N = 8;
- int const N_BUFFS = N * 64;
- int const N_CQE_OVERFLOW = 4;
- int const min_cqes = args->early_error ? 2 : 8;
- int const NAME_LEN = sizeof(struct sockaddr_storage);
- int const CONTROL_LEN = CMSG_ALIGN(sizeof(struct sockaddr_storage))
- + sizeof(struct cmsghdr);
- struct io_uring ring;
- struct io_uring_cqe *cqe;
- struct io_uring_sqe *sqe;
- int fds[2], ret, i, j;
- int total_sent_bytes = 0, total_recv_bytes = 0, total_dropped_bytes = 0;
- int send_buff[256];
- int *sent_buffs[N_BUFFS];
- int *recv_buffs[N_BUFFS];
- int *at;
- struct io_uring_cqe recv_cqe[N_BUFFS];
- int recv_cqes = 0;
- bool early_error = false;
- bool early_error_started = false;
- struct __kernel_timespec timeout = {
- .tv_sec = 1,
- };
- struct msghdr msg;
- struct io_uring_params params = { };
- int n_sqe = 32;
- memset(recv_buffs, 0, sizeof(recv_buffs));
- if (args->defer)
- params.flags |= IORING_SETUP_SINGLE_ISSUER |
- IORING_SETUP_DEFER_TASKRUN;
- if (args->early_error == ERROR_EARLY_OVERFLOW) {
- params.flags |= IORING_SETUP_CQSIZE;
- params.cq_entries = N_CQE_OVERFLOW;
- n_sqe = N_CQE_OVERFLOW;
- }
- ret = io_uring_queue_init_params(n_sqe, &ring, ¶ms);
- if (ret) {
- fprintf(stderr, "queue init failed: %d\n", ret);
- return ret;
- }
- ret = t_create_socket_pair(fds, args->stream);
- if (ret) {
- fprintf(stderr, "t_create_socket_pair failed: %d\n", ret);
- return ret;
- }
- if (!args->stream) {
- bool val = true;
- /* force some cmsgs to come back to us */
- ret = setsockopt(fds[0], IPPROTO_IP, IP_RECVORIGDSTADDR, &val,
- sizeof(val));
- if (ret) {
- fprintf(stderr, "setsockopt failed %d\n", errno);
- goto cleanup;
- }
- }
- for (i = 0; i < ARRAY_SIZE(send_buff); i++)
- send_buff[i] = i;
- for (i = 0; i < ARRAY_SIZE(recv_buffs); i++) {
- /* prepare some different sized buffers */
- int buffer_size = (i % 2 == 0 && (args->stream || args->recvmsg)) ? 1 : N;
- buffer_size *= sizeof(int);
- if (args->recvmsg) {
- buffer_size +=
- sizeof(struct io_uring_recvmsg_out) +
- NAME_LEN +
- CONTROL_LEN;
- }
- recv_buffs[i] = malloc(buffer_size);
- if (i > 2 && args->early_error == ERROR_NOT_ENOUGH_BUFFERS)
- continue;
- sqe = io_uring_get_sqe(&ring);
- io_uring_prep_provide_buffers(sqe, recv_buffs[i],
- buffer_size, 1, 7, i);
- io_uring_sqe_set_data64(sqe, 0x999);
- memset(recv_buffs[i], 0xcc, buffer_size);
- if (io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, NULL) < 0) {
- fprintf(stderr, "provide buffers failed: %d\n", ret);
- ret = -1;
- goto cleanup;
- }
- io_uring_cqe_seen(&ring, cqe);
- }
- sqe = io_uring_get_sqe(&ring);
- if (args->recvmsg) {
- unsigned int flags = 0;
- if (!args->stream)
- flags |= MSG_TRUNC;
- memset(&msg, 0, sizeof(msg));
- msg.msg_namelen = NAME_LEN;
- msg.msg_controllen = CONTROL_LEN;
- io_uring_prep_recvmsg_multishot(sqe, fds[0], &msg, flags);
- } else {
- io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
- }
- sqe->flags |= IOSQE_BUFFER_SELECT;
- sqe->buf_group = 7;
- io_uring_sqe_set_data64(sqe, 1234);
- io_uring_submit(&ring);
- at = &send_buff[0];
- total_sent_bytes = 0;
- for (i = 0; i < N; i++) {
- int to_send = sizeof(*at) * (i+1);
- total_sent_bytes += to_send;
- sent_buffs[i] = at;
- if (send(fds[1], at, to_send, 0) != to_send) {
- if (early_error_started)
- break;
- fprintf(stderr, "send failed %d\n", errno);
- ret = -1;
- goto cleanup;
- }
- if (i == 2) {
- if (args->early_error == ERROR_EARLY_CLOSE_RECEIVER) {
- /* allow previous sends to complete */
- usleep(1000);
- io_uring_get_events(&ring);
- sqe = io_uring_get_sqe(&ring);
- io_uring_prep_recv(sqe, fds[0], NULL, 0, 0);
- io_uring_prep_cancel64(sqe, 1234, 0);
- io_uring_sqe_set_data64(sqe, 0x888);
- sqe->flags |= IOSQE_CQE_SKIP_SUCCESS;
- io_uring_submit(&ring);
- early_error_started = true;
- /* allow the cancel to complete */
- usleep(1000);
- io_uring_get_events(&ring);
- }
- if (args->early_error == ERROR_EARLY_CLOSE_SENDER) {
- early_error_started = true;
- shutdown(fds[1], SHUT_RDWR);
- close(fds[1]);
- }
- }
- at += (i+1);
- if (args->wait_each) {
- ret = io_uring_wait_cqes(&ring, &cqe, 1, &timeout, NULL);
- if (ret) {
- fprintf(stderr, "wait_each failed: %d\n", ret);
- ret = -1;
- goto cleanup;
- }
- while (io_uring_peek_cqe(&ring, &cqe) == 0) {
- recv_cqe[recv_cqes++] = *cqe;
- if (cqe->flags & IORING_CQE_F_MORE) {
- io_uring_cqe_seen(&ring, cqe);
- } else {
- early_error = true;
- io_uring_cqe_seen(&ring, cqe);
- }
- }
- if (early_error)
- break;
- }
- }
- close(fds[1]);
- /* allow sends to finish */
- usleep(1000);
- if ((args->stream && !early_error) || recv_cqes < min_cqes) {
- unsigned int to_wait = 1;
- if (recv_cqes < min_cqes)
- to_wait = min_cqes - recv_cqes;
- ret = io_uring_wait_cqes(&ring, &cqe, to_wait, &timeout, NULL);
- if (ret && ret != -ETIME) {
- fprintf(stderr, "wait final failed: %d\n", ret);
- ret = -1;
- goto cleanup;
- }
- }
- while (io_uring_peek_cqe(&ring, &cqe) == 0) {
- recv_cqe[recv_cqes++] = *cqe;
- io_uring_cqe_seen(&ring, cqe);
- }
- ret = -1;
- at = &send_buff[0];
- if (recv_cqes < min_cqes) {
- if (recv_cqes > 0 && recv_cqe[0].res == -EINVAL) {
- return -ENORECVMULTISHOT;
- }
- /* some kernels apparently don't check ->ioprio, skip */
- ret = -ENORECVMULTISHOT;
- goto cleanup;
- }
- for (i = 0; i < recv_cqes; i++) {
- cqe = &recv_cqe[i];
- bool const is_last = i == recv_cqes - 1;
- /*
- * Older kernels could terminate multishot early due to overflow,
- * but later ones will not. So discriminate based on the MORE flag.
- */
- bool const early_last = args->early_error == ERROR_EARLY_OVERFLOW &&
- !args->wait_each &&
- i >= N_CQE_OVERFLOW &&
- !(cqe->flags & IORING_CQE_F_MORE);
- bool const should_be_last =
- (cqe->res <= 0) ||
- (args->stream && is_last) ||
- early_last;
- int *this_recv;
- int orig_payload_size = cqe->res;
- if (should_be_last) {
- int used_res = cqe->res;
- if (!is_last) {
- fprintf(stderr, "not last cqe had error %d\n", i);
- goto cleanup;
- }
- switch (args->early_error) {
- case ERROR_NOT_ENOUGH_BUFFERS:
- if (cqe->res != -ENOBUFS) {
- fprintf(stderr,
- "ERROR_NOT_ENOUGH_BUFFERS: res %d\n", cqe->res);
- goto cleanup;
- }
- break;
- case ERROR_EARLY_OVERFLOW:
- if (cqe->res < 0) {
- fprintf(stderr,
- "ERROR_EARLY_OVERFLOW: res %d\n", cqe->res);
- goto cleanup;
- }
- break;
- case ERROR_EARLY_CLOSE_RECEIVER:
- if (cqe->res != -ECANCELED) {
- fprintf(stderr,
- "ERROR_EARLY_CLOSE_RECEIVER: res %d\n", cqe->res);
- goto cleanup;
- }
- break;
- case ERROR_NONE:
- case ERROR_EARLY_CLOSE_SENDER:
- if (args->recvmsg && (cqe->flags & IORING_CQE_F_BUFFER)) {
- void *buff = recv_buffs[cqe->flags >> 16];
- struct io_uring_recvmsg_out *o =
- io_uring_recvmsg_validate(buff, cqe->res, &msg);
- if (!o) {
- fprintf(stderr, "invalid buff\n");
- goto cleanup;
- }
- if (o->payloadlen != 0) {
- fprintf(stderr, "expected 0 payloadlen, got %u\n",
- o->payloadlen);
- goto cleanup;
- }
- used_res = 0;
- } else if (cqe->res != 0) {
- fprintf(stderr, "early error: res %d\n", cqe->res);
- goto cleanup;
- }
- break;
- case ERROR_EARLY_LAST:
- fprintf(stderr, "bad error_early\n");
- goto cleanup;
- }
- if (cqe->res <= 0 && cqe->flags & IORING_CQE_F_BUFFER) {
- fprintf(stderr, "final BUFFER flag set\n");
- goto cleanup;
- }
- if (cqe->flags & IORING_CQE_F_MORE) {
- fprintf(stderr, "final MORE flag set\n");
- goto cleanup;
- }
- if (used_res <= 0)
- continue;
- } else {
- if (!(cqe->flags & IORING_CQE_F_MORE)) {
- fprintf(stderr, "MORE flag not set\n");
- goto cleanup;
- }
- }
- if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
- fprintf(stderr, "BUFFER flag not set\n");
- goto cleanup;
- }
- this_recv = recv_buffs[cqe->flags >> 16];
- if (args->recvmsg) {
- struct io_uring_recvmsg_out *o = io_uring_recvmsg_validate(
- this_recv, cqe->res, &msg);
- if (!o) {
- fprintf(stderr, "bad recvmsg\n");
- goto cleanup;
- }
- orig_payload_size = o->payloadlen;
- if (!args->stream) {
- orig_payload_size = o->payloadlen;
- struct cmsghdr *cmsg;
- if (o->namelen < sizeof(struct sockaddr_in)) {
- fprintf(stderr, "bad addr len %d",
- o->namelen);
- goto cleanup;
- }
- if (check_sockaddr((struct sockaddr_in *)io_uring_recvmsg_name(o)))
- goto cleanup;
- cmsg = io_uring_recvmsg_cmsg_firsthdr(o, &msg);
- if (!cmsg ||
- cmsg->cmsg_level != IPPROTO_IP ||
- cmsg->cmsg_type != IP_RECVORIGDSTADDR) {
- fprintf(stderr, "bad cmsg");
- goto cleanup;
- }
- if (check_sockaddr((struct sockaddr_in *)CMSG_DATA(cmsg)))
- goto cleanup;
- cmsg = io_uring_recvmsg_cmsg_nexthdr(o, &msg, cmsg);
- if (cmsg) {
- fprintf(stderr, "unexpected extra cmsg\n");
- goto cleanup;
- }
- }
- this_recv = (int *)io_uring_recvmsg_payload(o, &msg);
- cqe->res = io_uring_recvmsg_payload_length(o, cqe->res, &msg);
- if (o->payloadlen != cqe->res) {
- if (!(o->flags & MSG_TRUNC)) {
- fprintf(stderr, "expected truncated flag\n");
- goto cleanup;
- }
- total_dropped_bytes += (o->payloadlen - cqe->res);
- }
- }
- total_recv_bytes += cqe->res;
- if (cqe->res % 4 != 0) {
- /*
- * doesn't seem to happen in practice, would need some
- * work to remove this requirement
- */
- fprintf(stderr, "unexpectedly aligned buffer cqe->res=%d\n", cqe->res);
- goto cleanup;
- }
- /*
- * for tcp: check buffer arrived in order
- * for udp: based on size validate data based on size
- */
- if (!args->stream) {
- int sent_idx = orig_payload_size / sizeof(*at) - 1;
- if (sent_idx < 0 || sent_idx > N) {
- fprintf(stderr, "Bad sent idx: %d\n", sent_idx);
- goto cleanup;
- }
- at = sent_buffs[sent_idx];
- }
- for (j = 0; j < cqe->res / 4; j++) {
- int sent = *at++;
- int recv = *this_recv++;
- if (sent != recv) {
- fprintf(stderr, "recv=%d sent=%d\n", recv, sent);
- goto cleanup;
- }
- }
- }
- if (args->early_error == ERROR_NONE &&
- total_recv_bytes + total_dropped_bytes < total_sent_bytes) {
- fprintf(stderr,
- "missing recv: recv=%d dropped=%d sent=%d\n",
- total_recv_bytes, total_sent_bytes, total_dropped_bytes);
- goto cleanup;
- }
- ret = 0;
- cleanup:
- for (i = 0; i < ARRAY_SIZE(recv_buffs); i++)
- free(recv_buffs[i]);
- close(fds[0]);
- close(fds[1]);
- io_uring_queue_exit(&ring);
- return ret;
- }
- static int test_enobuf(void)
- {
- struct io_uring ring;
- struct io_uring_sqe *sqe;
- struct io_uring_cqe *cqes[16];
- char buffs[256];
- int ret, i, fds[2];
- if (t_create_ring(8, &ring, 0) != T_SETUP_OK) {
- fprintf(stderr, "ring create\n");
- return -1;
- }
- ret = t_create_socket_pair(fds, false);
- if (ret) {
- fprintf(stderr, "t_create_socket_pair\n");
- return ret;
- }
- sqe = io_uring_get_sqe(&ring);
- assert(sqe);
- /* deliberately only 2 provided buffers */
- io_uring_prep_provide_buffers(sqe, &buffs[0], 1, 2, 0, 0);
- io_uring_sqe_set_data64(sqe, 0);
- sqe = io_uring_get_sqe(&ring);
- assert(sqe);
- io_uring_prep_recv_multishot(sqe, fds[0], NULL, 0, 0);
- io_uring_sqe_set_data64(sqe, 1);
- sqe->buf_group = 0;
- sqe->flags |= IOSQE_BUFFER_SELECT;
- ret = io_uring_submit(&ring);
- if (ret != 2) {
- fprintf(stderr, "bad submit %d\n", ret);
- return -1;
- }
- for (i = 0; i < 3; i++) {
- do {
- ret = write(fds[1], "?", 1);
- } while (ret == -1 && errno == EINTR);
- }
- ret = io_uring_wait_cqes(&ring, &cqes[0], 4, NULL, NULL);
- if (ret) {
- fprintf(stderr, "wait cqes\n");
- return ret;
- }
- ret = io_uring_peek_batch_cqe(&ring, &cqes[0], 4);
- if (ret != 4) {
- fprintf(stderr, "peek batch cqes\n");
- return -1;
- }
- /* provide buffers */
- assert(cqes[0]->user_data == 0);
- assert(cqes[0]->res == 0);
- /* valid recv */
- assert(cqes[1]->user_data == 1);
- assert(cqes[2]->user_data == 1);
- assert(cqes[1]->res == 1);
- assert(cqes[2]->res == 1);
- assert(cqes[1]->flags & (IORING_CQE_F_BUFFER | IORING_CQE_F_MORE));
- assert(cqes[2]->flags & (IORING_CQE_F_BUFFER | IORING_CQE_F_MORE));
- /* missing buffer */
- assert(cqes[3]->user_data == 1);
- assert(cqes[3]->res == -ENOBUFS);
- assert(!(cqes[3]->flags & (IORING_CQE_F_BUFFER | IORING_CQE_F_MORE)));
- close(fds[0]);
- close(fds[1]);
- io_uring_queue_exit(&ring);
- return 0;
- }
- int main(int argc, char *argv[])
- {
- int ret;
- int loop;
- int early_error = 0;
- bool has_defer;
- if (argc > 1)
- return T_EXIT_SKIP;
- has_defer = t_probe_defer_taskrun();
- for (loop = 0; loop < 16; loop++) {
- struct args a = {
- .stream = loop & 0x01,
- .wait_each = loop & 0x2,
- .recvmsg = loop & 0x04,
- .defer = loop & 0x08,
- };
- if (a.defer && !has_defer)
- continue;
- for (early_error = 0; early_error < ERROR_EARLY_LAST; early_error++) {
- a.early_error = (enum early_error_t)early_error;
- ret = test(&a);
- if (ret) {
- if (ret == -ENORECVMULTISHOT) {
- if (loop == 0)
- return T_EXIT_SKIP;
- fprintf(stderr,
- "ENORECVMULTISHOT received but loop>0\n");
- }
- fprintf(stderr,
- "test stream=%d wait_each=%d recvmsg=%d early_error=%d "
- " defer=%d failed\n",
- a.stream, a.wait_each, a.recvmsg, a.early_error, a.defer);
- return T_EXIT_FAIL;
- }
- }
- }
- ret = test_enobuf();
- if (ret) {
- fprintf(stderr, "test_enobuf() failed: %d\n", ret);
- return T_EXIT_FAIL;
- }
- return T_EXIT_PASS;
- }
|