test queue channel comms Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/src/include/liburing.h b/src/include/liburing.h index e3f394e..9eb5868 100644 --- a/src/include/liburing.h +++ b/src/include/liburing.h
@@ -230,6 +230,8 @@ struct io_uring_cqe **cqe_ptr, unsigned wait_nr, int reg_index) LIBURING_NOEXCEPT; +int io_uring_register_queue_chan(struct io_uring *ring, + struct io_uring_chan_reg *reg) LIBURING_NOEXCEPT; int io_uring_register_wait_reg(struct io_uring *ring, struct io_uring_reg_wait *reg, int nr) LIBURING_NOEXCEPT; @@ -1595,6 +1597,29 @@ __io_uring_set_target_fixed_file(sqe, file_index); } +/* + * Prepare an SQE that posts a CQE on the target indicated by 'queue', with + * 'user_data' as the cqe->user_data and 'value' as the big_cqe u64 extra1 + * value. 'user_data' travels in sqe->addr and 'value' in sqe->off, so the + * posted user_data is independent of this SQE's own sqe->user_data. + * NOTE(review): this helper takes no 'res' argument and passes 0 as the SQE + * len - confirm what cqe->res the target observes. + * Flags can either be 0, or IORING_CHAN_POST_IDLE. If flags is zero, then + * 'queue' must be a valid queue id returned from + * io_uring_register_queue_chan. If flags is IORING_CHAN_POST_IDLE, 'queue' + * must be zero and an idle queue will get picked. If no idle queues exist, + * the SQE will be errored on the local ring. + */ +IOURINGINLINE void io_uring_prep_chan_post(struct io_uring_sqe *sqe, int queue, + __u64 value, __u64 user_data, + unsigned int flags) + LIBURING_NOEXCEPT +{ + io_uring_prep_rw(IORING_OP_CHAN_POST, sqe, queue, + (void *) (uintptr_t) user_data, 0, value); + sqe->rw_flags = flags; +} + /* Read the kernel's SQ head index with appropriate memory ordering */ IOURINGINLINE unsigned io_uring_load_sq_head(const struct io_uring *ring) LIBURING_NOEXCEPT
diff --git a/src/include/liburing/io_uring.h b/src/include/liburing/io_uring.h index 212b587..dd05771 100644 --- a/src/include/liburing/io_uring.h +++ b/src/include/liburing/io_uring.h
@@ -269,6 +269,7 @@ IORING_OP_READV_FIXED, IORING_OP_WRITEV_FIXED, IORING_OP_PIPE, + IORING_OP_CHAN_POST, /* prepared by io_uring_prep_chan_post() */ /* this goes last, obviously */ IORING_OP_LAST, @@ -431,6 +432,10 @@ */ #define IORING_NOP_INJECT_RESULT (1U << 0) +/* IORING_OP_CHAN_POST flag, passed in sqe->rw_flags: find and send to idle target. if not set, sqe->fd is the target */ +#define IORING_CHAN_POST_IDLE 0x1 + + /* * IO completion data structure (Completion Queue Entry) */ @@ -641,6 +646,8 @@ IORING_REGISTER_MEM_REGION = 34, + IORING_REGISTER_QUEUE_CHAN = 35, /* arg is a struct io_uring_chan_reg */ + /* this goes last */ IORING_REGISTER_LAST, @@ -686,6 +693,17 @@ __u64 __resv[2]; }; +enum { /* NOTE(review): presumably values for io_uring_chan_reg.flags - confirm */ + IORING_CHAN_REG_BIDI = 0x1, +}; + +struct io_uring_chan_reg { /* argument for IORING_REGISTER_QUEUE_CHAN */ + __u32 flags; + __u32 dst_fd; /* fd of the destination ring */ + __u32 nentries; /* NOTE(review): presumably the channel size in entries - confirm */ + __u32 resv[7]; /* NOTE(review): reserved, presumably must be zero - confirm */ +}; + /* * Register a fully sparse file space, rather than pass in an array of all * -1 file descriptors.
diff --git a/src/liburing-ffi.map b/src/liburing-ffi.map index 004b91a..bc86fdb 100644 --- a/src/liburing-ffi.map +++ b/src/liburing-ffi.map
@@ -249,4 +249,6 @@ global: io_uring_prep_pipe; io_uring_prep_pipe_direct; + io_uring_register_queue_chan; + io_uring_prep_chan_post; } LIBURING_2.11;
diff --git a/src/liburing.map b/src/liburing.map index 6438873..1269779 100644 --- a/src/liburing.map +++ b/src/liburing.map
@@ -126,3 +126,8 @@ io_uring_memory_size_params; io_uring_register_sync_msg; } LIBURING_2.10; + +LIBURING_2.12 { + global: + io_uring_register_queue_chan; +} LIBURING_2.11;
diff --git a/src/register.c b/src/register.c index 93eda3f..78f006e 100644 --- a/src/register.c +++ b/src/register.c
@@ -513,3 +513,9 @@ ring->int_flags |= INT_FLAG_NO_IOWAIT; return 0; } + +int io_uring_register_queue_chan(struct io_uring *ring, + struct io_uring_chan_reg *reg) +{ + /* Issue IORING_REGISTER_QUEUE_CHAN on 'ring' with 'reg' as the argument and hand back do_register()'s result unchanged. NOTE(review): the trailing 1 is presumably nr_args (a single struct) - confirm against do_register(). */ return do_register(ring, IORING_REGISTER_QUEUE_CHAN, reg, 1); +}
diff --git a/src/sanitize.c b/src/sanitize.c index 383b7d6..fa9096d 100644 --- a/src/sanitize.c +++ b/src/sanitize.c
@@ -120,7 +120,8 @@ sanitize_handlers[IORING_OP_READV_FIXED] = sanitize_sqe_addr; sanitize_handlers[IORING_OP_WRITEV_FIXED] = sanitize_sqe_addr; sanitize_handlers[IORING_OP_PIPE] = sanitize_sqe_addr; - _Static_assert(IORING_OP_PIPE + 1 == IORING_OP_LAST, "Need an implementation for all IORING_OP_* codes"); + sanitize_handlers[IORING_OP_CHAN_POST] = sanitize_sqe_addr; + _Static_assert(IORING_OP_CHAN_POST + 1 == IORING_OP_LAST, "Need an implementation for all IORING_OP_* codes"); sanitize_handlers_initialized = true; }
diff --git a/test/Makefile b/test/Makefile index edfc0df..fae547c 100644 --- a/test/Makefile +++ b/test/Makefile
@@ -172,6 +172,8 @@ poll-v-poll.c \ pollfree.c \ probe.c \ + queue-chan.c \ + queue-chan2.c \ read-before-exit.c \ read-inc-file.c \ read-mshot.c \
diff --git a/test/queue-chan.c b/test/queue-chan.c new file mode 100644 index 0000000..5004219 --- /dev/null +++ b/test/queue-chan.c
@@ -0,0 +1,203 @@
+/* SPDX-License-Identifier: MIT */
+/*
+ * Description: test queue -> queue channel comms
+ *
+ * A destination thread creates its own ring. The main thread registers a
+ * queue channel from its ring to the destination ring and then posts
+ * 'ncqes' CQEs through it, while the destination thread waits for and
+ * consumes that many CQEs.
+ */
+#include <errno.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <pthread.h>
+
+#include "liburing.h"
+
+/*
+ * Queue id passed to io_uring_prep_chan_post().
+ * NOTE(review): hard-coded; presumably the id of the first channel
+ * registered on the source ring - confirm against the kernel side.
+ */
+#define CHAN_QUEUE	1
+
+/* State shared between main and the destination thread */
+struct data {
+	pthread_t thread;
+	pthread_barrier_t startup;	/* released once ring_fd is set */
+	pthread_barrier_t run;		/* released once registration is done */
+	int ring_fd;			/* destination ring fd, or -errno */
+	int ncqes;			/* CQEs the destination waits for */
+};
+
+/*
+ * Destination side: set up a ring, publish its fd, then wait for and
+ * consume d->ncqes CQEs.
+ */
+static void *thread_fn(void *data)
+{
+	struct io_uring_cqe *cqe;
+	struct data *d = data;
+	struct io_uring dst;
+	int i, ret;
+
+	ret = io_uring_queue_init(1024, &dst, IORING_SETUP_DEFER_TASKRUN |
+					      IORING_SETUP_SINGLE_ISSUER |
+					      IORING_SETUP_CQE32);
+	/* publish either the fd or the error before releasing main */
+	d->ring_fd = ret ? ret : dst.ring_fd;
+	pthread_barrier_wait(&d->startup);
+	if (ret)
+		return NULL;
+
+	pthread_barrier_wait(&d->run);
+	for (i = 0; i < d->ncqes; i++) {
+		ret = io_uring_wait_cqe(&dst, &cqe);
+		if (ret) {
+			fprintf(stderr, "wait_cqe: %d\n", ret);
+			break;
+		}
+		io_uring_cqe_seen(&dst, cqe);
+	}
+	io_uring_queue_exit(&dst);
+	return NULL;
+}
+
+/*
+ * Reap every CQE currently available on 'ring'. Returns how many of them
+ * carried a negative res; the caller resubmits that many posts.
+ */
+static int flush_cqes(struct io_uring *ring)
+{
+	struct io_uring_cqe *cqe;
+	int extras = 0;
+
+	while (!io_uring_peek_cqe(ring, &cqe)) {
+		if (cqe->res < 0)
+			extras++;
+		io_uring_cqe_seen(ring, cqe);
+	}
+
+	return extras;
+}
+
+/*
+ * Submit everything queued on 'ring' and reap the resulting CQEs.
+ * Returns the number of failed posts, or a negative error if the submit
+ * itself failed.
+ */
+static int submit_and_flush(struct io_uring *ring)
+{
+	int ret;
+
+	ret = io_uring_submit(ring);
+	if (ret < 0)
+		return ret;
+	return flush_cqes(ring);
+}
+
+int main(int argc, char *argv[])
+{
+	struct io_uring_chan_reg reg = { };
+	struct io_uring_sqe *sqe;
+	struct io_uring src;
+	struct data d = { };
+	void *tret;
+	int ret, i;
+	int to_submit, dupes = 0;
+
+	if (argc > 1)
+		return 0;
+
+	ret = io_uring_queue_init(128, &src, 0);
+	if (ret) {
+		fprintf(stderr, "queue_init: %d\n", ret);
+		return 1;
+	}
+
+	d.ncqes = 10000000;
+	pthread_barrier_init(&d.startup, NULL, 2);
+	pthread_barrier_init(&d.run, NULL, 2);
+
+	ret = pthread_create(&d.thread, NULL, thread_fn, &d);
+	if (ret) {
+		fprintf(stderr, "pthread_create: %d\n", ret);
+		return 1;
+	}
+	pthread_barrier_wait(&d.startup);
+	if (d.ring_fd < 0) {
+		/* the thread has already bailed out, nothing to post to */
+		fprintf(stderr, "dst queue_init: %d\n", d.ring_fd);
+		pthread_join(d.thread, &tret);
+		io_uring_queue_exit(&src);
+		return 1;
+	}
+
+	reg.dst_fd = d.ring_fd;
+	reg.nentries = 256;
+	ret = io_uring_register_queue_chan(&src, &reg);
+	printf("ret=%d\n", ret);
+	if (ret < 0) {
+		/*
+		 * Without a channel every post would fail and be retried
+		 * forever. Tell the thread to expect nothing and bail.
+		 */
+		d.ncqes = 0;
+		pthread_barrier_wait(&d.run);
+		pthread_join(d.thread, &tret);
+		io_uring_queue_exit(&src);
+		if (ret == -EINVAL) {
+			printf("queue channels not supported, skipping\n");
+			return 0;
+		}
+		return 1;
+	}
+
+	pthread_barrier_wait(&d.run);
+
+	to_submit = d.ncqes;
+	for (i = 0; i < to_submit; i++) {
+		int extras;
+
+		sqe = io_uring_get_sqe(&src);
+		if (!sqe) {
+			/* SQ ring is full: flush it, then try again */
+			extras = submit_and_flush(&src);
+			if (extras < 0) {
+				fprintf(stderr, "submit: %d\n", extras);
+				return 1;
+			}
+			dupes += extras;
+			to_submit += extras;
+			sqe = io_uring_get_sqe(&src);
+			if (!sqe) {
+				fprintf(stderr, "get_sqe failed\n");
+				return 1;
+			}
+		}
+
+		io_uring_prep_chan_post(sqe, CHAN_QUEUE, 0x1234, 0xdeadbeef, 0);
+		/* only failed posts generate a CQE on the source ring */
+		sqe->flags = IOSQE_CQE_SKIP_SUCCESS;
+		sqe->user_data = 0x5aa55aa5;
+		if (i + 1 == to_submit) {
+			extras = submit_and_flush(&src);
+			if (extras < 0) {
+				fprintf(stderr, "submit: %d\n", extras);
+				return 1;
+			}
+			if (!extras)
+				break;
+			dupes += extras;
+			to_submit += extras;
+		}
+	}
+	printf("submitter done, %d dupes\n", dupes);
+
+	pthread_join(d.thread, &tret);
+
+	io_uring_queue_exit(&src);
+	return 0;
+}
diff --git a/test/queue-chan2.c b/test/queue-chan2.c new file mode 100644 index 0000000..627d6c5 --- /dev/null +++ b/test/queue-chan2.c
@@ -0,0 +1,233 @@
+/* SPDX-License-Identifier: MIT */
+/*
+ * Description: test queue -> queue channel comms, posting to idle targets
+ *
+ * NTHREADS destination threads each create a ring, and the main thread
+ * registers a queue channel to every one of them. Two threads wait for
+ * CQEs, the others only poll and report any CQE they see as unexpected.
+ * The main thread then posts using IORING_CHAN_POST_IDLE.
+ */
+#include <errno.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <pthread.h>
+#include <stdatomic.h>
+
+#include "liburing.h"
+
+#define NTHREADS	8
+/* give up once this many posts have failed and been retried */
+#define MAX_RETRIES	5000
+
+struct data {
+	pthread_t thread;
+	pthread_barrier_t startup;
+	pthread_barrier_t run;
+	int do_wait;		/* wait for CQEs rather than just poll */
+	int index;		/* 1-based thread number, used for logging */
+	atomic_int done;	/* set by main to ask the thread to exit */
+	int ring_fd;		/* destination ring fd, or -errno */
+	int ncqes;
+};
+
+/*
+ * Destination side. Sets up a ring and publishes its fd, then either waits
+ * for up to d->ncqes CQEs (do_wait) or polls until told to stop, reporting
+ * any CQE that shows up.
+ */
+static void *thread_fn(void *data)
+{
+	struct io_uring_cqe *cqe;
+	struct data *d = data;
+	struct io_uring dst;
+	int i, ret;
+
+	printf("thread%d: up\n", d->index);
+
+	ret = io_uring_queue_init(128, &dst, IORING_SETUP_DEFER_TASKRUN |
+					     IORING_SETUP_SINGLE_ISSUER |
+					     IORING_SETUP_CQE32);
+	d->ring_fd = ret ? ret : dst.ring_fd;
+	pthread_barrier_wait(&d->startup);
+	if (ret)
+		return NULL;
+
+	pthread_barrier_wait(&d->run);
+	if (d->do_wait) {
+		printf("thread%d: waiting on CQEs\n", d->index);
+		i = 0;
+		while (i < d->ncqes && !atomic_load(&d->done)) {
+			/*
+			 * Wait with a timeout so that 'done' gets noticed even
+			 * if no further CQEs are posted to this ring.
+			 */
+			struct __kernel_timespec ts = { .tv_nsec = 100000000 };
+
+			ret = io_uring_wait_cqe_timeout(&dst, &cqe, &ts);
+			if (ret == -ETIME || ret == -EINTR)
+				continue;
+			if (ret) {
+				fprintf(stderr, "thread%d: wait_cqe: %d\n",
+					d->index, ret);
+				break;
+			}
+			printf("thread%d: got cqe\n", d->index);
+			io_uring_cqe_seen(&dst, cqe);
+			i++;
+		}
+	} else {
+		printf("thread%d: sleeping\n", d->index);
+		while (!atomic_load(&d->done)) {
+			ret = io_uring_peek_cqe(&dst, &cqe);
+			if (!ret) {
+				printf("thread%d: unexpected cqe!\n", d->index);
+				io_uring_cqe_seen(&dst, cqe);
+			}
+			usleep(10000);
+		}
+	}
+
+	printf("thread%d: exit\n", d->index);
+	io_uring_queue_exit(&dst);
+	return NULL;
+}
+
+/*
+ * Reap every CQE currently available on 'ring'. Returns how many of them
+ * carried a negative res; the caller resubmits that many posts.
+ */
+static int flush_cqes(struct io_uring *ring)
+{
+	struct io_uring_cqe *cqe;
+	int extras = 0;
+
+	while (!io_uring_peek_cqe(ring, &cqe)) {
+		if (cqe->res < 0)
+			extras++;
+		io_uring_cqe_seen(ring, cqe);
+	}
+
+	return extras;
+}
+
+int main(int argc, char *argv[])
+{
+	struct io_uring_sqe *sqe;
+	struct io_uring src;
+	struct data d[NTHREADS];
+	void *tret;
+	int ret, i;
+	int to_submit, dupes = 0;
+	int failed = 0, skip = 0;
+
+	if (argc > 1)
+		return 0;
+
+	ret = io_uring_queue_init(128, &src, 0);
+	if (ret) {
+		fprintf(stderr, "queue_init: %d\n", ret);
+		return 1;
+	}
+
+	for (i = 0; i < NTHREADS; i++) {
+		d[i].ncqes = 10;
+		atomic_init(&d[i].done, 0);
+		d[i].index = i + 1;
+		d[i].ring_fd = -1;
+		d[i].do_wait = (i == 3 || i == 5);
+		pthread_barrier_init(&d[i].startup, NULL, 2);
+		pthread_barrier_init(&d[i].run, NULL, 2);
+	}
+
+	for (i = 0; i < NTHREADS; i++) {
+		ret = pthread_create(&d[i].thread, NULL, thread_fn, &d[i]);
+		if (ret) {
+			fprintf(stderr, "pthread_create: %d\n", ret);
+			return 1;
+		}
+	}
+
+	for (i = 0; i < NTHREADS; i++)
+		pthread_barrier_wait(&d[i].startup);
+
+	for (i = 0; i < NTHREADS; i++) {
+		struct io_uring_chan_reg reg = { };
+
+		if (d[i].ring_fd < 0) {
+			fprintf(stderr, "thread%d: queue_init: %d\n",
+				d[i].index, d[i].ring_fd);
+			failed = 1;
+			continue;
+		}
+		reg.dst_fd = d[i].ring_fd;
+		reg.nentries = 32;
+		ret = io_uring_register_queue_chan(&src, &reg);
+		printf("thread%d: ret=%d\n", d[i].index, ret);
+		if (ret == -EINVAL)
+			skip = 1;
+		else if (ret < 0)
+			failed = 1;
+	}
+
+	/* setup did not fully succeed: tell every thread to stop right away */
+	if (failed || skip) {
+		for (i = 0; i < NTHREADS; i++)
+			atomic_store(&d[i].done, 1);
+	}
+
+	for (i = 0; i < NTHREADS; i++) {
+		/* a thread whose ring setup failed never reaches this barrier */
+		if (d[i].ring_fd >= 0)
+			pthread_barrier_wait(&d[i].run);
+	}
+
+	to_submit = (failed || skip) ? 0 : d[0].ncqes;
+	for (i = 0; i < to_submit; i++) {
+		int extras;
+
+		sqe = io_uring_get_sqe(&src);
+		if (!sqe) {
+			fprintf(stderr, "get_sqe failed\n");
+			failed = 1;
+			break;
+		}
+		/*
+		 * NOTE(review): the prep helper's comment says 'queue' must be 0
+		 * with the idle flag, yet 1 is passed here - confirm which holds.
+		 */
+		io_uring_prep_chan_post(sqe, 1, 0x1234, 0xdeadbeef,
+					IORING_CHAN_POST_IDLE);
+		sqe->user_data = 0x5aa55aa5;
+
+		ret = io_uring_submit(&src);
+		if (ret < 0) {
+			fprintf(stderr, "submit: %d\n", ret);
+			failed = 1;
+			break;
+		}
+		extras = flush_cqes(&src);
+		dupes += extras;
+		to_submit += extras;
+		if (dupes > MAX_RETRIES) {
+			fprintf(stderr, "too many failed posts\n");
+			failed = 1;
+			break;
+		}
+		/* back off briefly so a target has a chance to go idle */
+		if (extras)
+			usleep(1000);
+	}
+	printf("submitter done, %d dupes\n", dupes);
+
+	for (i = 0; i < NTHREADS; i++) {
+		atomic_store(&d[i].done, 1);
+		pthread_join(d[i].thread, &tret);
+	}
+
+	io_uring_queue_exit(&src);
+	if (skip && !failed)
+		printf("queue channels not supported, skipping\n");
+	return failed;
+}