| /* SPDX-License-Identifier: MIT */ |
| /* |
| * Description: test multishot read (IORING_OP_READ_MULTISHOT) on pipes, |
| * using ring provided buffers |
| * |
| */ |
| #include <errno.h> |
| #include <stdio.h> |
| #include <unistd.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <fcntl.h> |
| |
| #include "liburing.h" |
| #include "helpers.h" |
| |
| #define BUF_SIZE 32 |
| #define BUF_SIZE_FIRST 17 |
| #define NR_BUFS 64 |
| #define BUF_BGID 1 |
| |
| #define BR_MASK (NR_BUFS - 1) |
| |
| #define NR_OVERFLOW (NR_BUFS / 4) |
| |
| static int no_buf_ring, no_read_mshot, no_buf_ring_inc; |
| |
| static void arm_read(struct io_uring *ring, int fd, int use_mshot) |
| { |
| struct io_uring_sqe *sqe; |
| |
| sqe = io_uring_get_sqe(ring); |
| if (use_mshot) { |
| io_uring_prep_read_multishot(sqe, fd, 0, 0, BUF_BGID); |
| } else { |
| io_uring_prep_read(sqe, fd, NULL, 0, 0); |
| sqe->flags = IOSQE_BUFFER_SELECT; |
| sqe->buf_group = BUF_BGID; |
| } |
| |
| io_uring_submit(ring); |
| } |
| |
| static int test_inc(int use_mshot, int flags) |
| { |
| struct io_uring_buf_ring *br; |
| struct io_uring_params p = { }; |
| struct io_uring_cqe *cqe; |
| struct io_uring ring; |
| int nbytes = 65536; |
| int ret, fds[2], i; |
| char tmp[31]; |
| char *buf; |
| void *ptr; |
| int bid = -1; |
| int bid_bytes; |
| |
| if (no_buf_ring) |
| return 0; |
| |
| p.flags = flags; |
| ret = io_uring_queue_init_params(64, &ring, &p); |
| if (ret) { |
| fprintf(stderr, "ring setup failed: %d\n", ret); |
| return 1; |
| } |
| |
| if (pipe(fds) < 0) { |
| perror("pipe"); |
| return 1; |
| } |
| |
| if (posix_memalign((void **) &buf, 4096, 65536)) |
| return 1; |
| |
| br = io_uring_setup_buf_ring(&ring, 32, BUF_BGID, IOU_PBUF_RING_INC, &ret); |
| if (!br) { |
| if (ret == -EINVAL) { |
| no_buf_ring_inc = 1; |
| free(buf); |
| return 0; |
| } |
| fprintf(stderr, "Buffer ring register failed %d\n", ret); |
| return 1; |
| } |
| |
| ptr = buf; |
| buf = ptr + 65536 - 2048; |
| for (i = 0; i < 32; i++) { |
| io_uring_buf_ring_add(br, buf, 2048, i, 31, i); |
| buf -= 2048; |
| } |
| io_uring_buf_ring_advance(br, 32); |
| |
| memset(tmp, 0x5a, sizeof(tmp)); |
| |
| arm_read(&ring, fds[0], use_mshot); |
| |
| bid_bytes = 0; |
| do { |
| int write_size = sizeof(tmp); |
| |
| if (write_size > nbytes) |
| write_size = nbytes; |
| |
| io_uring_get_events(&ring); |
| ret = io_uring_peek_cqe(&ring, &cqe); |
| if (!ret) { |
| int this_bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT; |
| if (bid == -1) { |
| bid = this_bid; |
| } else if (bid != this_bid) { |
| if (bid_bytes != 2048) { |
| fprintf(stderr, "unexpected bid bytes %d\n", |
| bid_bytes); |
| return 1; |
| } |
| bid = this_bid; |
| bid_bytes = 0; |
| } |
| bid_bytes += cqe->res; |
| nbytes -= cqe->res; |
| if (!(cqe->flags & IORING_CQE_F_MORE)) |
| arm_read(&ring, fds[0], use_mshot); |
| io_uring_cqe_seen(&ring, cqe); |
| if (!nbytes) |
| break; |
| } |
| usleep(1000); |
| ret = write(fds[1], tmp, write_size); |
| if (ret < 0) { |
| perror("write"); |
| return 1; |
| } else if (ret != write_size) { |
| printf("short write %d\n", ret); |
| return 1; |
| } |
| } while (nbytes); |
| |
| if (bid_bytes) { |
| if (bid_bytes != 2048) { |
| fprintf(stderr, "unexpected bid bytes %d\n", bid_bytes); |
| return 1; |
| } |
| } |
| |
| io_uring_free_buf_ring(&ring, br, 32, BUF_BGID); |
| io_uring_queue_exit(&ring); |
| free(ptr); |
| close(fds[0]); |
| close(fds[1]); |
| return 0; |
| } |
| |
| static int test_clamp(void) |
| { |
| struct io_uring_buf_ring *br; |
| struct io_uring_params p = { }; |
| struct io_uring_sqe *sqe; |
| struct io_uring_cqe *cqe; |
| struct io_uring ring; |
| int ret, fds[2], i; |
| char tmp[32]; |
| char *buf; |
| void *ptr; |
| |
| ret = io_uring_queue_init_params(4, &ring, &p); |
| if (ret) { |
| fprintf(stderr, "ring setup failed: %d\n", ret); |
| return 1; |
| } |
| |
| if (pipe(fds) < 0) { |
| perror("pipe"); |
| return 1; |
| } |
| |
| if (posix_memalign((void **) &buf, 4096, NR_BUFS * BUF_SIZE)) |
| return 1; |
| |
| br = io_uring_setup_buf_ring(&ring, NR_BUFS, BUF_BGID, 0, &ret); |
| if (!br) { |
| if (ret == -EINVAL) { |
| no_buf_ring = 1; |
| return 0; |
| } |
| fprintf(stderr, "Buffer ring register failed %d\n", ret); |
| return 1; |
| } |
| |
| ptr = buf; |
| io_uring_buf_ring_add(br, buf, 16, 1, BR_MASK, 0); |
| buf += 16; |
| io_uring_buf_ring_add(br, buf, 32, 2, BR_MASK, 1); |
| buf += 32; |
| io_uring_buf_ring_add(br, buf, 32, 3, BR_MASK, 2); |
| buf += 32; |
| io_uring_buf_ring_add(br, buf, 32, 4, BR_MASK, 3); |
| buf += 32; |
| io_uring_buf_ring_advance(br, 4); |
| |
| memset(tmp, 0xaa, sizeof(tmp)); |
| |
| sqe = io_uring_get_sqe(&ring); |
| io_uring_prep_read_multishot(sqe, fds[0], 0, 0, BUF_BGID); |
| |
| ret = io_uring_submit(&ring); |
| if (ret != 1) { |
| fprintf(stderr, "submit: %d\n", ret); |
| return 1; |
| } |
| |
| /* prevent pipe buffer merging */ |
| usleep(1000); |
| ret = write(fds[1], tmp, 16); |
| |
| usleep(1000); |
| ret = write(fds[1], tmp, sizeof(tmp)); |
| |
| /* prevent pipe buffer merging */ |
| usleep(1000); |
| ret = write(fds[1], tmp, 16); |
| |
| usleep(1000); |
| ret = write(fds[1], tmp, sizeof(tmp)); |
| |
| /* |
| * We should see a 16 byte completion, then a 32 byte, then a 16 byte, |
| * and finally a 32 byte again. |
| */ |
| for (i = 0; i < 4; i++) { |
| ret = io_uring_wait_cqe(&ring, &cqe); |
| if (ret) { |
| fprintf(stderr, "wait cqe failed %d\n", ret); |
| return 1; |
| } |
| if (cqe->res < 0) { |
| fprintf(stderr, "cqe res: %d\n", cqe->res); |
| return 1; |
| } |
| if (!(cqe->flags & IORING_CQE_F_MORE)) { |
| fprintf(stderr, "no more cqes\n"); |
| return 1; |
| } |
| if (i == 0 || i == 2) { |
| if (cqe->res != 16) { |
| fprintf(stderr, "%d cqe got %d\n", i, cqe->res); |
| return 1; |
| } |
| } else if (i == 1 || i == 3) { |
| if (cqe->res != 32) { |
| fprintf(stderr, "%d cqe got %d\n", i, cqe->res); |
| return 1; |
| } |
| } |
| io_uring_cqe_seen(&ring, cqe); |
| } |
| |
| io_uring_free_buf_ring(&ring, br, NR_BUFS, BUF_BGID); |
| io_uring_queue_exit(&ring); |
| free(ptr); |
| return 0; |
| } |
| |
| static int test(int first_good, int async, int overflow, int incremental) |
| { |
| struct io_uring_buf_ring *br; |
| struct io_uring_params p = { }; |
| struct io_uring_sqe *sqe; |
| struct io_uring_cqe *cqe; |
| struct io_uring ring; |
| int ret, fds[2], i, start_msg = 0; |
| int br_flags = 0; |
| char tmp[32]; |
| void *ptr[NR_BUFS]; |
| char *inc_index; |
| |
| p.flags = IORING_SETUP_CQSIZE; |
| if (!overflow) |
| p.cq_entries = NR_BUFS + 1; |
| else |
| p.cq_entries = NR_OVERFLOW; |
| ret = io_uring_queue_init_params(1, &ring, &p); |
| if (ret) { |
| fprintf(stderr, "ring setup failed: %d\n", ret); |
| return 1; |
| } |
| |
| if (incremental) { |
| if (no_buf_ring_inc) |
| return 0; |
| br_flags |= IOU_PBUF_RING_INC; |
| } |
| |
| br = io_uring_setup_buf_ring(&ring, NR_BUFS, BUF_BGID, br_flags, &ret); |
| if (!br) { |
| if (ret == -EINVAL) { |
| if (incremental) { |
| no_buf_ring_inc = 1; |
| return 0; |
| } |
| no_buf_ring = 1; |
| return 0; |
| } |
| fprintf(stderr, "Buffer ring register failed %d\n", ret); |
| return 1; |
| } |
| |
| if (pipe(fds) < 0) { |
| perror("pipe"); |
| return 1; |
| } |
| |
| if (!incremental) { |
| for (i = 0; i < NR_BUFS; i++) { |
| unsigned size = i <= 1 ? BUF_SIZE_FIRST : BUF_SIZE; |
| ptr[i] = malloc(size); |
| if (!ptr[i]) |
| return 1; |
| io_uring_buf_ring_add(br, ptr[i], size, i + 1, BR_MASK, i); |
| } |
| inc_index = NULL; |
| io_uring_buf_ring_advance(br, NR_BUFS); |
| } else { |
| inc_index = ptr[0] = malloc(NR_BUFS * BUF_SIZE); |
| memset(inc_index, 0, NR_BUFS * BUF_SIZE); |
| io_uring_buf_ring_add(br, ptr[0], NR_BUFS * BUF_SIZE, 1, BR_MASK, 0); |
| io_uring_buf_ring_advance(br, 1); |
| } |
| |
| if (first_good) { |
| sprintf(tmp, "this is buffer %d\n", start_msg++); |
| ret = write(fds[1], tmp, strlen(tmp)); |
| } |
| |
| sqe = io_uring_get_sqe(&ring); |
| /* len == 0 means just use the defined provided buffer length */ |
| io_uring_prep_read_multishot(sqe, fds[0], 0, 0, BUF_BGID); |
| if (async) |
| sqe->flags |= IOSQE_ASYNC; |
| |
| ret = io_uring_submit(&ring); |
| if (ret != 1) { |
| fprintf(stderr, "submit: %d\n", ret); |
| return 1; |
| } |
| |
| /* write NR_BUFS + 1, or if first_good is set, NR_BUFS */ |
| for (i = 0; i < NR_BUFS + !first_good; i++) { |
| /* prevent pipe buffer merging */ |
| usleep(1000); |
| sprintf(tmp, "this is buffer %d\n", i + start_msg); |
| ret = write(fds[1], tmp, strlen(tmp)); |
| if (ret != strlen(tmp)) { |
| fprintf(stderr, "write ret %d\n", ret); |
| return 1; |
| } |
| } |
| |
| for (i = 0; i < NR_BUFS + 1; i++) { |
| int bid; |
| |
| ret = io_uring_wait_cqe(&ring, &cqe); |
| if (ret) { |
| fprintf(stderr, "wait cqe failed %d\n", ret); |
| return 1; |
| } |
| if (cqe->res < 0) { |
| /* expected failure as we try to read one too many */ |
| if (cqe->res == -ENOBUFS && i == NR_BUFS) |
| break; |
| if (!i && cqe->res == -EINVAL) { |
| no_read_mshot = 1; |
| break; |
| } |
| fprintf(stderr, "%d: cqe res %d\n", i, cqe->res); |
| return 1; |
| } else if (i > 9 && cqe->res <= 17) { |
| fprintf(stderr, "truncated message %d %d\n", i, cqe->res); |
| return 1; |
| } |
| |
| if (!(cqe->flags & IORING_CQE_F_BUFFER)) { |
| fprintf(stderr, "no buffer selected\n"); |
| return 1; |
| } |
| bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT; |
| if (incremental && bid != 1) { |
| fprintf(stderr, "bid %d for incremental\n", bid); |
| return 1; |
| } |
| if (incremental && !first_good) { |
| char out_buf[64]; |
| sprintf(out_buf, "this is buffer %d\n", i + start_msg); |
| if (strncmp(inc_index, out_buf, strlen(out_buf))) |
| return 1; |
| inc_index += cqe->res; |
| } |
| if (!(cqe->flags & IORING_CQE_F_MORE)) { |
| /* we expect this on overflow */ |
| if (overflow && i >= NR_OVERFLOW) |
| break; |
| fprintf(stderr, "no more cqes\n"); |
| return 1; |
| } |
| /* should've overflown! */ |
| if (overflow && i > NR_OVERFLOW) { |
| fprintf(stderr, "Expected overflow!\n"); |
| return 1; |
| } |
| io_uring_cqe_seen(&ring, cqe); |
| } |
| |
| |
| io_uring_free_buf_ring(&ring, br, NR_BUFS, BUF_BGID); |
| io_uring_queue_exit(&ring); |
| if (incremental) { |
| free(ptr[0]); |
| } else { |
| for (i = 0; i < NR_BUFS; i++) |
| free(ptr[i]); |
| } |
| return 0; |
| } |
| |
| static int test_invalid(int async) |
| { |
| struct io_uring_buf_ring *br; |
| struct io_uring_params p = { }; |
| struct io_uring_sqe *sqe; |
| struct io_uring_cqe *cqe; |
| struct io_uring ring; |
| char fname[32] = ".mshot.%d.XXXXXX"; |
| int ret, fd; |
| char *buf; |
| |
| p.flags = IORING_SETUP_CQSIZE; |
| p.cq_entries = NR_BUFS; |
| ret = io_uring_queue_init_params(1, &ring, &p); |
| if (ret) { |
| fprintf(stderr, "ring setup failed: %d\n", ret); |
| return 1; |
| } |
| |
| fd = mkstemp(fname); |
| if (fd < 0) { |
| perror("mkstemp"); |
| return 1; |
| } |
| unlink(fname); |
| |
| if (posix_memalign((void **) &buf, 4096, BUF_SIZE)) |
| return 1; |
| |
| br = io_uring_setup_buf_ring(&ring, 1, BUF_BGID, 0, &ret); |
| if (!br) { |
| fprintf(stderr, "Buffer ring register failed %d\n", ret); |
| return 1; |
| } |
| |
| io_uring_buf_ring_add(br, buf, BUF_SIZE, 1, BR_MASK, 0); |
| io_uring_buf_ring_advance(br, 1); |
| |
| sqe = io_uring_get_sqe(&ring); |
| /* len == 0 means just use the defined provided buffer length */ |
| io_uring_prep_read_multishot(sqe, fd, 0, 0, BUF_BGID); |
| if (async) |
| sqe->flags |= IOSQE_ASYNC; |
| |
| ret = io_uring_submit(&ring); |
| if (ret != 1) { |
| fprintf(stderr, "submit: %d\n", ret); |
| return 1; |
| } |
| |
| ret = io_uring_wait_cqe(&ring, &cqe); |
| if (ret) { |
| fprintf(stderr, "wait cqe failed %d\n", ret); |
| return 1; |
| } |
| if (cqe->flags & IORING_CQE_F_MORE) { |
| fprintf(stderr, "MORE flag set unexpected %d\n", cqe->flags); |
| return 1; |
| } |
| if (cqe->res != -EBADFD) { |
| fprintf(stderr, "Got cqe res %d, wanted -EBADFD\n", cqe->res); |
| return 1; |
| } |
| |
| io_uring_cqe_seen(&ring, cqe); |
| io_uring_free_buf_ring(&ring, br, 1, BUF_BGID); |
| io_uring_queue_exit(&ring); |
| free(buf); |
| return 0; |
| } |
| |
| int main(int argc, char *argv[]) |
| { |
| int ret; |
| |
| if (argc > 1) |
| return T_EXIT_SKIP; |
| |
| ret = test(0, 0, 0, 0); |
| if (ret) { |
| fprintf(stderr, "test 0 0 0 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| if (no_buf_ring || no_read_mshot) { |
| printf("skip\n"); |
| return T_EXIT_SKIP; |
| } |
| |
| ret = test(0, 1, 0, 0); |
| if (ret) { |
| fprintf(stderr, "test 0 1 0, failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(1, 0, 0, 0); |
| if (ret) { |
| fprintf(stderr, "test 1 0 0 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(0, 0, 1, 0); |
| if (ret) { |
| fprintf(stderr, "test 0 0 1 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(0, 1, 1, 0); |
| if (ret) { |
| fprintf(stderr, "test 0 1 1 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(1, 0, 1, 0); |
| if (ret) { |
| fprintf(stderr, "test 1 0 1, failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(1, 0, 1, 0); |
| if (ret) { |
| fprintf(stderr, "test 1 0 1 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(1, 1, 1, 0); |
| if (ret) { |
| fprintf(stderr, "test 1 1 1 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(0, 0, 0, 1); |
| if (ret) { |
| fprintf(stderr, "test 0 0 0 1 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(0, 0, 1, 1); |
| if (ret) { |
| fprintf(stderr, "test 0 0 1 1 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(0, 1, 0, 1); |
| if (ret) { |
| fprintf(stderr, "test 0 1 0 1 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(0, 1, 1, 1); |
| if (ret) { |
| fprintf(stderr, "test 0 1 1 1 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(1, 0, 0, 1); |
| if (ret) { |
| fprintf(stderr, "test 1 0 0 1 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(1, 0, 1, 1); |
| if (ret) { |
| fprintf(stderr, "test 1 0 1 1 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(1, 1, 0, 1); |
| if (ret) { |
| fprintf(stderr, "test 1 1 0 1 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test(1, 1, 1, 1); |
| if (ret) { |
| fprintf(stderr, "test 1 1 1 1 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test_invalid(0); |
| if (ret) { |
| fprintf(stderr, "test_invalid 0 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test_invalid(1); |
| if (ret) { |
| fprintf(stderr, "test_invalid 1 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test_clamp(); |
| if (ret) { |
| fprintf(stderr, "test_clamp failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test_inc(0, 0); |
| if (ret) { |
| fprintf(stderr, "test_inc 0 0 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test_inc(0, IORING_SETUP_SQPOLL); |
| if (ret) { |
| fprintf(stderr, "test_inc 0 sqpoll failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test_inc(0, IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN); |
| if (ret) { |
| fprintf(stderr, "test_inc 0 defer failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test_inc(1, 0); |
| if (ret) { |
| fprintf(stderr, "test_inc 1 0 failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test_inc(1, IORING_SETUP_SQPOLL); |
| if (ret) { |
| fprintf(stderr, "test_inc 1 sqpoll failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| ret = test_inc(1, IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN); |
| if (ret) { |
| fprintf(stderr, "test_inc 1 defer failed\n"); |
| return T_EXIT_FAIL; |
| } |
| |
| return T_EXIT_PASS; |
| } |