test queue channel comms

Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/src/include/liburing.h b/src/include/liburing.h
index e3f394e..9eb5868 100644
--- a/src/include/liburing.h
+++ b/src/include/liburing.h
@@ -230,6 +230,8 @@
 				 struct io_uring_cqe **cqe_ptr, unsigned wait_nr,
 				 int reg_index) LIBURING_NOEXCEPT;
 
+int io_uring_register_queue_chan(struct io_uring *ring,
+				 struct io_uring_chan_reg *reg) LIBURING_NOEXCEPT;
 int io_uring_register_wait_reg(struct io_uring *ring,
 			       struct io_uring_reg_wait *reg, int nr)
    LIBURING_NOEXCEPT;
@@ -1595,6 +1597,24 @@
 	__io_uring_set_target_fixed_file(sqe, file_index);
 }
 
+/*
+ * Post CQE on target indicated by 'queue' with 'res' as cqe->res, user_data
+ * as the cqe->user_data, and 'value' as the big_cqe u64 extra1 value.
+ * Flags can either be 0, or IORING_CHAN_POST_IDLE. If flags is zero, then
+ * 'queue' must be a valid queue id returned from io_uring_register_queue_chan.
+ * If flags is IORING_CHAN_POST_IDLE, 'queue' must be zero and an idle queue
+ * will get picked. If no idle queues exist, the SQE will be errored on the
+ * local ring.
+ */
+IOURINGINLINE void io_uring_prep_chan_post(struct io_uring_sqe *sqe, int queue,
+					   __u64 value, __u64 user_data,
+					   unsigned int flags) LIBURING_NOEXCEPT
+{
+	io_uring_prep_rw(IORING_OP_CHAN_POST, sqe, queue,
+				(void *) (uintptr_t) user_data, 0, value);
+	sqe->rw_flags = flags;	/* 0 or IORING_CHAN_POST_IDLE */
+}
+
 /* Read the kernel's SQ head index with appropriate memory ordering */
 IOURINGINLINE unsigned io_uring_load_sq_head(const struct io_uring *ring)
 	LIBURING_NOEXCEPT
diff --git a/src/include/liburing/io_uring.h b/src/include/liburing/io_uring.h
index 212b587..dd05771 100644
--- a/src/include/liburing/io_uring.h
+++ b/src/include/liburing/io_uring.h
@@ -269,6 +269,7 @@
 	IORING_OP_READV_FIXED,
 	IORING_OP_WRITEV_FIXED,
 	IORING_OP_PIPE,
+	IORING_OP_CHAN_POST,
 
 	/* this goes last, obviously */
 	IORING_OP_LAST,
@@ -431,6 +432,10 @@
  */
 #define IORING_NOP_INJECT_RESULT	(1U << 0)
 
+/* find and send to idle target. if not set, sqe->fd is the target */
+#define IORING_CHAN_POST_IDLE	0x1
+
+
 /*
  * IO completion data structure (Completion Queue Entry)
  */
@@ -641,6 +646,8 @@
 
 	IORING_REGISTER_MEM_REGION		= 34,
 
+	IORING_REGISTER_QUEUE_CHAN		= 35,
+
 	/* this goes last */
 	IORING_REGISTER_LAST,
 
@@ -686,6 +693,17 @@
 	__u64 __resv[2];
 };
 
+enum {
+	IORING_CHAN_REG_BIDI =	0x1,
+};
+
+struct io_uring_chan_reg {
+	__u32 flags;
+	__u32 dst_fd;
+	__u32 nentries;
+	__u32 resv[7];
+};
+
 /*
  * Register a fully sparse file space, rather than pass in an array of all
  * -1 file descriptors.
diff --git a/src/liburing-ffi.map b/src/liburing-ffi.map
index 004b91a..bc86fdb 100644
--- a/src/liburing-ffi.map
+++ b/src/liburing-ffi.map
@@ -249,4 +249,6 @@
 	global:
 		io_uring_prep_pipe;
 		io_uring_prep_pipe_direct;
+		io_uring_register_queue_chan;
+		io_uring_prep_chan_post;
 } LIBURING_2.11;
diff --git a/src/liburing.map b/src/liburing.map
index 6438873..1269779 100644
--- a/src/liburing.map
+++ b/src/liburing.map
@@ -126,3 +126,8 @@
 		io_uring_memory_size_params;
 		io_uring_register_sync_msg;
 } LIBURING_2.10;
+
+LIBURING_2.12 {
+	global:
+		io_uring_register_queue_chan;
+} LIBURING_2.11;
diff --git a/src/register.c b/src/register.c
index 93eda3f..78f006e 100644
--- a/src/register.c
+++ b/src/register.c
@@ -513,3 +513,9 @@
 		ring->int_flags |= INT_FLAG_NO_IOWAIT;
 	return 0;
 }
+
+int io_uring_register_queue_chan(struct io_uring *ring,
+				 struct io_uring_chan_reg *reg)
+{
+	return do_register(ring, IORING_REGISTER_QUEUE_CHAN, reg, 1);
+}
diff --git a/src/sanitize.c b/src/sanitize.c
index 383b7d6..fa9096d 100644
--- a/src/sanitize.c
+++ b/src/sanitize.c
@@ -120,7 +120,8 @@
 	sanitize_handlers[IORING_OP_READV_FIXED] = sanitize_sqe_addr;
 	sanitize_handlers[IORING_OP_WRITEV_FIXED] = sanitize_sqe_addr;
 	sanitize_handlers[IORING_OP_PIPE] = sanitize_sqe_addr;
-	_Static_assert(IORING_OP_PIPE + 1 == IORING_OP_LAST, "Need an implementation for all IORING_OP_* codes");
+	sanitize_handlers[IORING_OP_CHAN_POST] = sanitize_sqe_addr;
+	_Static_assert(IORING_OP_CHAN_POST + 1 == IORING_OP_LAST, "Need an implementation for all IORING_OP_* codes");
 	sanitize_handlers_initialized = true;
 }
 
diff --git a/test/Makefile b/test/Makefile
index edfc0df..fae547c 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -172,6 +172,8 @@
 	poll-v-poll.c \
 	pollfree.c \
 	probe.c \
+	queue-chan.c \
+	queue-chan2.c \
 	read-before-exit.c \
 	read-inc-file.c \
 	read-mshot.c \
diff --git a/test/queue-chan.c b/test/queue-chan.c
new file mode 100644
index 0000000..5004219
--- /dev/null
+++ b/test/queue-chan.c
@@ -0,0 +1,124 @@
+/* SPDX-License-Identifier: MIT */
+/*
+ * Description: test queue -> queue channel comms
+ *
+ */
+#include <errno.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <pthread.h>
+
+#include "liburing.h"
+
+struct data {
+	pthread_t thread;
+	pthread_barrier_t startup;
+	pthread_barrier_t run;
+	int ring_fd;
+	int ncqes;
+};
+
+static void *thread_fn(void *data)
+{
+	struct data *d = data;
+	struct io_uring_cqe *cqe;
+	struct io_uring dst;
+	int seen = 0;
+
+	/* publish the target ring fd before main() is released to register it */
+	io_uring_queue_init(1024, &dst, IORING_SETUP_DEFER_TASKRUN|IORING_SETUP_SINGLE_ISSUER|IORING_SETUP_CQE32);
+	d->ring_fd = dst.ring_fd;
+	pthread_barrier_wait(&d->startup);
+
+	pthread_barrier_wait(&d->run);
+	while (seen++ < d->ncqes) {
+		io_uring_wait_cqe(&dst, &cqe);
+		io_uring_cqe_seen(&dst, cqe);
+	}
+	io_uring_queue_exit(&dst);
+	return NULL;
+}
+
+/* Reap every CQE currently available; return how many had cqe->res < 0 */
+static int flush_cqes(struct io_uring *ring)
+{
+	struct io_uring_cqe *cqe;
+	int nr_failed = 0;
+
+	/*
+	 * io_uring_peek_cqe() returning non-zero ends the drain. Callers add
+	 * the returned count back onto the number of posts left to submit.
+	 */
+	while (!io_uring_peek_cqe(ring, &cqe)) {
+		if (cqe->res < 0)
+			nr_failed++;
+		io_uring_cqe_seen(ring, cqe);
+	}
+
+	return nr_failed;
+}
+
+/* Post d.ncqes channel messages to the thread's ring, resubmitting failures */
+int main(int argc, char *argv[])
+{
+	struct io_uring_chan_reg reg = { };
+	struct io_uring_sqe *sqe;
+	struct io_uring src;
+	struct data d = { };
+	int ret, i, to_submit, dupes = 0;
+
+	if (argc > 1)
+		return 0;
+
+	io_uring_queue_init(128, &src, 0);
+	d.ncqes = 10000000;
+	pthread_barrier_init(&d.startup, NULL, 2);
+	pthread_barrier_init(&d.run, NULL, 2);
+
+	pthread_create(&d.thread, NULL, thread_fn, &d);
+	pthread_barrier_wait(&d.startup);
+
+	reg.dst_fd = d.ring_fd;
+	reg.nentries = 256;
+	ret = io_uring_register_queue_chan(&src, &reg);
+	printf("ret=%d\n", ret);
+	/* no channel: every post would fail and be resubmitted without end */
+	if (ret < 0)
+		return 1;
+
+	pthread_barrier_wait(&d.run);
+
+	to_submit = d.ncqes;
+	for (i = 0; i < to_submit; i++) {
+		int extras;
+
+		sqe = io_uring_get_sqe(&src);
+		if (!sqe) {
+			io_uring_submit(&src);
+			extras = flush_cqes(&src);
+			dupes += extras;
+			to_submit += extras;
+			sqe = io_uring_get_sqe(&src);
+		}
+
+		io_uring_prep_chan_post(sqe, 1, 0x1234, 0xdeadbeef, 0);
+		sqe->flags = IOSQE_CQE_SKIP_SUCCESS;
+		sqe->user_data = 0x5aa55aa5;
+		if (i + 1 == to_submit) {
+			/* final SQE queued: flush, and go again if any failed */
+			io_uring_submit(&src);
+			extras = flush_cqes(&src);
+			if (!extras)
+				break;
+			dupes += extras;
+			to_submit += extras;
+		}
+	}
+	printf("submitter done, %d dupes\n", dupes);
+
+	pthread_join(d.thread, NULL);
+	io_uring_queue_exit(&src);
+	return 0;
+}
diff --git a/test/queue-chan2.c b/test/queue-chan2.c
new file mode 100644
index 0000000..627d6c5
--- /dev/null
+++ b/test/queue-chan2.c
@@ -0,0 +1,153 @@
+/* SPDX-License-Identifier: MIT */
+/*
+ * Description: test queue -> queue channel comms
+ *
+ */
+#include <errno.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <pthread.h>
+
+#include "liburing.h"
+
+struct data {
+	pthread_t thread;
+	pthread_barrier_t startup;
+	pthread_barrier_t run;
+	int do_wait;
+	int index;
+	int done;
+	int ring_fd;
+	int ncqes;
+};
+
+/* Target ring owner: blocks for CQEs when do_wait is set, else polls every 10ms */
+static void *thread_fn(void *data)
+{
+	struct data *d = data;
+	struct io_uring_cqe *cqe;
+	struct io_uring dst;
+	int nr = 0;
+
+	printf("thread%d: up\n", d->index);
+
+	/* publish the ring fd, then let main() past the startup barrier */
+	io_uring_queue_init(128, &dst, IORING_SETUP_DEFER_TASKRUN|IORING_SETUP_SINGLE_ISSUER|IORING_SETUP_CQE32);
+	d->ring_fd = dst.ring_fd;
+	pthread_barrier_wait(&d->startup);
+
+	pthread_barrier_wait(&d->run);
+	if (!d->do_wait) {
+		printf("thread%d: sleeping\n", d->index);
+		/* not waiting on the ring: any CQE seen here is flagged */
+		for (; !d->done; usleep(10000)) {
+			if (io_uring_peek_cqe(&dst, &cqe))
+				continue;
+			printf("thread%d: unexpected cqe!\n", d->index);
+			io_uring_cqe_seen(&dst, cqe);
+		}
+	} else {
+		printf("thread%d: waiting on CQEs\n", d->index);
+		/* NOTE(review): 'done' is only sampled between blocking waits */
+		while (nr++ < d->ncqes && !d->done) {
+			io_uring_wait_cqe(&dst, &cqe);
+			printf("thread%d: got cqe\n", d->index);
+			io_uring_cqe_seen(&dst, cqe);
+		}
+	}
+
+	printf("thread%d: exit\n", d->index);
+	io_uring_queue_exit(&dst);
+	return NULL;
+}
+
+/*
+ * Consume all pending completions on 'ring' and return how many of them
+ * carry a negative cqe->res.
+ */
+static int flush_cqes(struct io_uring *ring)
+{
+	struct io_uring_cqe *cqe;
+	int failed = 0;
+
+	for (;;) {
+		if (io_uring_peek_cqe(ring, &cqe) != 0)
+			break;
+		failed += cqe->res < 0;
+		io_uring_cqe_seen(ring, cqe);
+	}
+
+	return failed;
+}
+
+#define NTHREADS		8
+
+/* Register NTHREADS target rings, then post to whichever target is idle */
+int main(int argc, char *argv[])
+{
+	struct io_uring_sqe *sqe;
+	struct io_uring src;
+	struct data d[NTHREADS];
+	int ret, i, to_submit, dupes = 0;
+
+	if (argc > 1)
+		return 0;
+
+	io_uring_queue_init(128, &src, 0);
+
+	for (i = 0; i < NTHREADS; i++) {
+		d[i].ncqes = 10;
+		d[i].done = 0;
+		d[i].index = i + 1;
+		d[i].do_wait = (i == 3 || i == 5);
+		pthread_barrier_init(&d[i].startup, NULL, 2);
+		pthread_barrier_init(&d[i].run, NULL, 2);
+	}
+
+	for (i = 0; i < NTHREADS; i++)
+		pthread_create(&d[i].thread, NULL, thread_fn, &d[i]);
+
+	for (i = 0; i < NTHREADS; i++)
+		pthread_barrier_wait(&d[i].startup);
+
+	for (i = 0; i < NTHREADS; i++) {
+		struct io_uring_chan_reg reg = { };
+
+		reg.dst_fd = d[i].ring_fd;
+		reg.nentries = 32;
+		ret = io_uring_register_queue_chan(&src, &reg);
+		printf("thread%d: ret=%d\n", d[i].index, ret);
+		/* a missing channel could make the retry loop below spin */
+		if (ret < 0)
+			return 1;
+	}
+
+	for (i = 0; i < NTHREADS; i++)
+		pthread_barrier_wait(&d[i].run);
+
+	to_submit = d[0].ncqes;
+	for (i = 0; i < to_submit; i++) {
+		int extras;
+
+		/* 'queue' must be zero when asking for an idle target */
+		sqe = io_uring_get_sqe(&src);
+		io_uring_prep_chan_post(sqe, 0, 0x1234, 0xdeadbeef, IORING_CHAN_POST_IDLE);
+		sqe->user_data = 0x5aa55aa5;
+
+		io_uring_submit(&src);
+		extras = flush_cqes(&src);
+		dupes += extras;
+		to_submit += extras;
+	}
+	printf("submitter done, %d dupes\n", dupes);
+
+	for (i = 0; i < NTHREADS; i++) {
+		d[i].done = 1;
+		pthread_join(d[i].thread, NULL);
+	}
+
+	io_uring_queue_exit(&src);
+	return 0;
+}