io_uring: add lockless FIFO list implementation for task_work

Do this just for deferred task_work for now; there are still a lot of
kinks to iron out. As this is a WIP, bump the local task_work max count
to something large, since a retry list isn't in place yet.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
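---

For reference, the core idea is to treat the list's first/last pointer
pair as one double-width word and update it with a single wide cmpxchg:
an adder installs itself as the new tail (and as the head if the list was
empty), then links the previous tail's ->next, while the consumer swaps
the whole pair for an empty list. Below is a minimal userspace sketch of
that pointer-pair technique. It is not the kernel code, the names are
made up, and it assumes GCC/Clang __atomic builtins with 16-byte
compare-exchange support (e.g. -mcx16 plus libatomic on x86-64):

	/* Standalone illustration only; hypothetical names, not io_uring API */
	#include <stdio.h>
	#include <stdbool.h>

	struct node {
		struct node *next;
		int val;
	};

	struct pair_list {
		struct node *first;
		struct node *last;
	};

	static void pair_list_add_tail(struct pair_list *l, struct node *n)
	{
		struct pair_list old, new;

		n->next = NULL;
		do {
			__atomic_load(l, &old, __ATOMIC_ACQUIRE);
			new.first = old.first ? old.first : n;
			new.last = n;
		} while (!__atomic_compare_exchange(l, &old, &new, false,
						    __ATOMIC_RELEASE,
						    __ATOMIC_RELAXED));
		/* link the previous tail only after the swap publishes us */
		if (old.last)
			__atomic_store_n(&old.last->next, n, __ATOMIC_RELEASE);
	}

	static struct pair_list pair_list_take_all(struct pair_list *l)
	{
		struct pair_list old, empty = { NULL, NULL };

		do {
			__atomic_load(l, &old, __ATOMIC_ACQUIRE);
		} while (!__atomic_compare_exchange(l, &old, &empty, false,
						    __ATOMIC_ACQ_REL,
						    __ATOMIC_RELAXED));
		return old;
	}

	int main(void)
	{
		struct pair_list l = { NULL, NULL };
		struct node a = { .val = 1 }, b = { .val = 2 };

		pair_list_add_tail(&l, &a);
		pair_list_add_tail(&l, &b);

		struct pair_list got = pair_list_take_all(&l);
		for (struct node *n = got.first; n; n = n->next)
			printf("%d\n", n->val);
		return 0;
	}

Build with something like "gcc -mcx16 sketch.c -latomic". The
single-threaded main() only demonstrates the calls. Note that, as in the
patch, the previous tail's ->next is linked only after the wide cmpxchg,
so a concurrent take-all can briefly observe a chain that does not yet
reach the new tail; handling that window is one of the kinks mentioned
above.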
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 7ddac4d..946dd9c 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -44,9 +44,20 @@ struct io_wq_work_node {
 	struct io_wq_work_node *next;
 };
 
+#ifdef CONFIG_64BIT
+typedef u128 io_wq_list_ptr_t;
+#else
+typedef u64 io_wq_list_ptr_t;
+#endif
+
 struct io_wq_work_list {
-	struct io_wq_work_node *first;
-	struct io_wq_work_node *last;
+	union {
+		struct {
+			struct io_wq_work_node *first;
+			struct io_wq_work_node *last;
+		};
+		io_wq_list_ptr_t list_ptr;
+	};
 };
 
 struct io_wq_work {
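One implicit assumption in the union above is that the anonymous
first/last pair packs exactly into io_wq_list_ptr_t, with matching size
and no padding. A compile-time check along these lines (hypothetical, not
part of the patch) would document that assumption:

	static_assert(sizeof(struct io_wq_work_list) == sizeof(io_wq_list_ptr_t),
		      "first/last pair must fit a single wide cmpxchg");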
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index b1cd77d..bc4cb58 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -121,7 +121,7 @@
 
 #define IO_COMPL_BATCH			32
 #define IO_REQ_ALLOC_BATCH		8
-#define IO_LOCAL_TW_DEFAULT_MAX		20
+#define IO_LOCAL_TW_DEFAULT_MAX		2000
 
 struct io_defer_entry {
 	struct list_head	list;
@@ -1179,12 +1179,112 @@ void tctx_task_work(struct callback_head *cb)
 	tctx_task_work_run(tctx);
 }
 
+#ifdef CONFIG_64BIT
+#ifdef system_has_cmpxchg128
+#define wq_has_cmpxchg()		system_has_cmpxchg128()
+#define wq_cmpxchg_list			try_cmpxchg128
+#endif
+#else
+#ifdef system_has_cmpxchg64
+#define wq_has_cmpxchg()		system_has_cmpxchg64()
+#define wq_cmpxchg_list			try_cmpxchg64
+#endif
+#endif
+
+#ifndef wq_has_cmpxchg
+#define wq_has_cmpxchg()		(0)
+#define wq_cmpxchg_list(p, o, n)	((void)(p), (void)(o), (void)(n), false)
+#endif
+
+static void wq_ll_add_tail(struct io_wq_work_node *n, struct io_wq_work_list *l)
+{
+	n->next = NULL;
+	do {
+		struct io_wq_work_list old = {
+			.first	= READ_ONCE(l->first),
+			.last	= READ_ONCE(l->last)
+		};
+		/* derive the new state from the snapshot, not fresh reads */
+		struct io_wq_work_list new = {
+			.first	= old.first ?: n,
+			.last	= n
+		};
+
+		if (wq_cmpxchg_list(&l->list_ptr, &old.list_ptr, new.list_ptr)) {
+			if (old.last)
+				WRITE_ONCE(old.last->next, n);
+			break;
+		}
+	} while (1);
+}
+
+static int wq_list_add_tail_ll(struct io_ring_ctx *ctx, struct io_kiocb *req)
+{
+	unsigned int nr_tw_prev;
+	unsigned long flags;
+
+	if (wq_has_cmpxchg()) {
+		wq_ll_add_tail(&req->io_task_work.node, &ctx->work_list);
+		return 0;
+	}
+
+	spin_lock_irqsave(&ctx->work_lock, flags);
+	wq_list_add_tail(&req->io_task_work.node, &ctx->work_list);
+	nr_tw_prev = ctx->work_items++;
+	spin_unlock_irqrestore(&ctx->work_lock, flags);
+	return nr_tw_prev;
+}
+
+static void wq_ll_delete_all(struct io_wq_work_list *l,
+			     struct io_wq_work_list *dst)
+{
+	struct io_wq_work_list new = { };
+
+	guard(rcu)();
+	do {
+		dst->first = READ_ONCE(l->first);
+		dst->last = READ_ONCE(l->last);
+
+		if (!wq_cmpxchg_list(&l->list_ptr, &dst->list_ptr, new.list_ptr))
+			continue;
+		if (!dst->last)
+			break;
+		if (!READ_ONCE(dst->last->next))
+			break;
+	} while (1);
+}
+
+static void wq_list_delete_all_ll(struct io_ring_ctx *ctx,
+				  struct io_wq_work_list *dst)
+{
+	if (wq_has_cmpxchg()) {
+		WRITE_ONCE(ctx->work_items, 0);
+		wq_ll_delete_all(&ctx->work_list, dst);
+		/* cmpxchg() provides the necessary memory barrier */
+		return;
+	}
+
+	spin_lock_irq(&ctx->work_lock);
+	*dst = ctx->work_list;
+	INIT_WQ_LIST(&ctx->work_list);
+	ctx->work_items = 0;
+	spin_unlock_irq(&ctx->work_lock);
+
+	/*
+	 * We need a barrier after unlock, which pairs with the barrier
+	 * in set_current_state() on the io_cqring_wait() side. It's used
+	 * to ensure that either we see updated ->cq_wait_nr, or waiters
+	 * going to sleep will observe the work added to the list, which
+	 * is similar to the wait/wake task state sync.
+	 */
+	smp_mb();
+}
+
 static inline void io_req_local_work_add(struct io_kiocb *req,
 					 struct io_ring_ctx *ctx,
 					 unsigned tw_flags)
 {
 	unsigned nr_tw, nr_tw_prev, nr_wait;
-	unsigned long flags;
 
 	/* See comment above IO_CQ_WAKE_INIT */
 	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
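The barrier comment that now sits in the spinlock fallback, together with
the "cmpxchg() provides the necessary memory barrier" note, describes a
classic store-buffering pairing between the adder and the io_cqring_wait()
waiter: each side publishes its own state, issues a full barrier, then
reads the other side's state, so at least one of them is guaranteed to
observe the other and a wakeup cannot be lost. A standalone userspace
sketch of that pattern (hypothetical names, C11 atomics plus pthreads,
not io_uring code):

	#include <pthread.h>
	#include <stdatomic.h>
	#include <assert.h>

	static atomic_int work_queued, waiter_armed;
	static atomic_int adder_saw_waiter, waiter_saw_work;

	static void *adder(void *arg)
	{
		(void)arg;
		/* "queue work", then full barrier, then check for a waiter */
		atomic_store_explicit(&work_queued, 1, memory_order_relaxed);
		atomic_thread_fence(memory_order_seq_cst);
		if (atomic_load_explicit(&waiter_armed, memory_order_relaxed))
			atomic_store(&adder_saw_waiter, 1);	/* would wake it */
		return NULL;
	}

	static void *waiter(void *arg)
	{
		(void)arg;
		/* "arm the wait", then full barrier, then check for work */
		atomic_store_explicit(&waiter_armed, 1, memory_order_relaxed);
		atomic_thread_fence(memory_order_seq_cst);
		if (atomic_load_explicit(&work_queued, memory_order_relaxed))
			atomic_store(&waiter_saw_work, 1);	/* would skip sleeping */
		return NULL;
	}

	int main(void)
	{
		pthread_t a, w;

		pthread_create(&a, NULL, adder, NULL);
		pthread_create(&w, NULL, waiter, NULL);
		pthread_join(a, NULL);
		pthread_join(w, NULL);
		/* with both fences, at least one side must have seen the other */
		assert(atomic_load(&adder_saw_waiter) ||
		       atomic_load(&waiter_saw_work));
		return 0;
	}

Drop either fence (in the kernel, the smp_mb() or the full barrier implied
by cmpxchg()/set_current_state()) and the outcome where both sides miss
each other, i.e. a lost wakeup, becomes possible.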
@@ -1196,10 +1295,9 @@ static inline void io_req_local_work_add(struct io_kiocb *req,
 	if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
 		tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
-	spin_lock_irqsave(&ctx->work_lock, flags);
-	wq_list_add_tail(&req->io_task_work.node, &ctx->work_list);
-	nr_tw_prev = ctx->work_items++;
-	spin_unlock_irqrestore(&ctx->work_lock, flags);
+	guard(rcu)();
+
+	nr_tw_prev = wq_list_add_tail_ll(ctx, req);
 
 	nr_tw = nr_tw_prev + 1;
 	if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
@@ -1212,16 +1310,6 @@ static inline void io_req_local_work_add(struct io_kiocb *req,
 			io_eventfd_signal(ctx);
 	}
 
-	guard(rcu)();
-
-	/*
-	 * We need a barrier after unlock, which pairs with the barrier
-	 * in set_current_state() on the io_cqring_wait() side. It's used
-	 * to ensure that either we see updated ->cq_wait_nr, or waiters
-	 * going to sleep will observe the work added to the list, which
-	 * is similar to the wait/wake task state sync.
-	 */
-	smp_mb();
 	nr_wait = atomic_read(&ctx->cq_wait_nr);
 	/* not enough or no one is waiting */
 	if (nr_tw < nr_wait)
@@ -1308,7 +1396,8 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
 static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
 			       int min_events)
 {
-	struct io_wq_work_node *node, *tail;
+	struct io_wq_work_node *node;
+	struct io_wq_work_list list;
 	int ret, limit, nitems;
 	unsigned int loops = 0;
 
@@ -1319,14 +1408,9 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
 	ret = 0;
 	limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
 again:
-	spin_lock_irq(&ctx->work_lock);
-	node = ctx->work_list.first;
-	tail = ctx->work_list.last;
-	nitems = ctx->work_items;
-	INIT_WQ_LIST(&ctx->work_list);
-	ctx->work_items = 0;
-	spin_unlock_irq(&ctx->work_lock);
-
+	wq_list_delete_all_ll(ctx, &list);
+	node = list.first;
+	nitems = 0;
 	while (node) {
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
 						    io_task_work.node);
@@ -1334,12 +1418,14 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
 		INDIRECT_CALL_2(req->io_task_work.func,
 				io_poll_task_func, io_req_rw_complete,
 				req, ts);
-		nitems--;
-		if (++ret >= limit)
+		if (++nitems >= limit)
 			break;
 	}
 
+	ret += nitems;
 	if (unlikely(node)) {
+		WARN_ON_ONCE(1);
+#if 0
 		spin_lock_irq(&ctx->work_lock);
 		tail->next = ctx->work_list.first;
 		ctx->work_list.first = node;
@@ -1347,6 +1433,7 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
 			ctx->work_list.last = tail;
 		ctx->work_items += nitems;
 		spin_unlock_irq(&ctx->work_lock);
+#endif
 		goto retry_done;
 	}