Merge tag 'io_uring-5.12-2021-03-05' of git://git.kernel.dk/linux-block

Pull io_uring fixes from Jens Axboe:
 "A bit of a mix between fallout from the worker change, cleanups and
  reductions now possible from that change, and fixes in general. In
  detail:

   - Fully serialize manager and worker creation, fixing the races that
     stemmed from the previous lack of serialization.

   - Clean up some naming that had gone stale.

   - SQPOLL fixes.

   - Fix a race condition around the task_work rework that went into
     this merge window.

   - Implement unshare support. When the original task calls unshare(2)
     or setuid/seteuid and friends, the original workers are dropped and
     new ones are forked.

   - Drop the last remaining piece of state shuffling, the credentials.
     They are now applied at issue time instead (see the sketch before
     the diff below), which lets us drop all of that code too.

   - Kill the f_op->flush() usage. That was a nasty hack we had out of
     necessity, and we no longer need it.

   - Following from ->flush() removal, we can also drop various bits of
     ctx state related to SQPOLL and cancelations.

   - Fix an issue with IOPOLL retry. It originally appeared as fallout
     from a filemap change (the removal of iov_iter_revert()), but it
     uncovered an issue with the iovec being re-imported too late (see
     the sketch before the diff below).

   - Fix an issue with system suspend.

   - Use xchg() instead of cmpxchg() for fallback work (see the sketch
     before the diff below).

   - Properly destroy io-wq on exec.

   - Add a create_io_thread() core helper, and use it in io-wq and
     io_uring. This allows us to remove various silly completion events
     related to thread setup (usage sketched before the diff below).

   - A few error handling fixes.

  This should be the brunt of the fixes needed for the new workers; next
  week should be quieter. We've got a pending series from Pavel on
  cancelations and on how tasks and rings are indexed. Outside of that,
  it should just be minor fixes. Even with these fixes, we're still
  killing a net ~80 lines"

* tag 'io_uring-5.12-2021-03-05' of git://git.kernel.dk/linux-block: (41 commits)
  io_uring: don't restrict issue_flags for io_openat
  io_uring: make SQPOLL thread parking saner
  io-wq: kill hashed waitqueue before manager exits
  io_uring: clear IOCB_WAITQ for non -EIOCBQUEUED return
  io_uring: don't keep looping for more events if we can't flush overflow
  io_uring: move to using create_io_thread()
  kernel: provide create_io_thread() helper
  io_uring: reliably cancel linked timeouts
  io_uring: cancel-match based on flags
  io-wq: ensure all pending work is canceled on exit
  io_uring: ensure that threads freeze on suspend
  io_uring: remove extra in_idle wake up
  io_uring: inline __io_queue_async_work()
  io_uring: inline io_req_clean_work()
  io_uring: choose right tctx->io_wq for try cancel
  io_uring: fix -EAGAIN retry with IOPOLL
  io-wq: fix error path leak of buffered write hash map
  io_uring: remove sqo_task
  io_uring: kill sqo_dead and sqo submission halting
  io_uring: ignore double poll add on the same waitqueue head
  ...
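
For reference, a few of the changes above in condensed form, simplified
from the hunks in the diff that follows. First the credentials change:
the personality lookup and override now happen at issue time, roughly as
the io_issue_sqe() hunk below does (the uring_lock handling around the
idr lookup is elided here):

    const struct cred *creds = NULL;

    if (req->work.personality) {
        const struct cred *new_creds;

        new_creds = idr_find(&ctx->personality_idr, req->work.personality);
        if (!new_creds)
            return -EINVAL;
        creds = override_creds(new_creds);
    }

    /* ... the big switch (req->opcode) dispatch runs here, unchanged ... */

    if (creds)
        revert_creds(creds);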
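
The IOPOLL retry fix amounts to rewinding the iterator before a retry is
attempted, condensed here from the rw completion hunk in the diff below:

    /* -EAGAIN with async data: rewind the iter before resubmitting */
    if (res == -EAGAIN && io_rw_should_reissue(req)) {
        struct io_async_rw *rw = req->async_data;

        if (rw)
            iov_iter_revert(&rw->iter,
                    req->result - iov_iter_count(&rw->iter));
        else if (!io_resubmit_prep(req))
            res = -EIO;
    }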
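
The fallback work change is similarly small: instead of looping until a
cmpxchg() wins, a single xchg() detaches the whole pending callback list.
Condensed from io_run_ctx_fallback() in the diff below:

    /* before: retry until the cmpxchg() succeeds */
    do {
        head = NULL;
        work = READ_ONCE(ctx->exit_task_work);
    } while (cmpxchg(&ctx->exit_task_work, work, head) != work);

    /* after: one atomic swap steals the entire list */
    work = xchg(&ctx->exit_task_work, NULL);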
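
And the create_io_thread() flow: the helper returns a new, still inactive
PF_IO_WORKER task (or an ERR_PTR()), the caller finishes its setup, and
only then wakes it with wake_up_new_task(). A condensed sketch of the
pattern as used by create_io_worker() in the diff below (accounting and
locking elided; the function name here is illustrative):

    static bool start_io_worker(struct io_wqe *wqe, struct io_worker *worker)
    {
        struct task_struct *tsk;

        tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
        if (IS_ERR(tsk))
            return false;

        tsk->pf_io_worker = worker;
        worker->task = tsk;
        set_cpus_allowed_ptr(tsk, cpumask_of_node(wqe->node));
        tsk->flags |= PF_NOFREEZE | PF_NO_SETAFFINITY;

        /* no completion handshake needed anymore */
        wake_up_new_task(tsk);
        return true;
    }
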
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 44e2024..28868eb 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -16,6 +16,7 @@
 #include <linux/rculist_nulls.h>
 #include <linux/cpu.h>
 #include <linux/tracehook.h>
+#include <linux/freezer.h>
 
 #include "../kernel/sched/sched.h"
 #include "io-wq.h"
@@ -52,9 +53,6 @@
 	struct io_wq_work *cur_work;
 	spinlock_t lock;
 
-	const struct cred *cur_creds;
-	const struct cred *saved_creds;
-
 	struct completion ref_done;
 
 	struct rcu_head rcu;
@@ -117,7 +115,10 @@
 	struct io_wq_hash *hash;
 
 	refcount_t refs;
-	struct completion done;
+	struct completion exited;
+
+	atomic_t worker_refs;
+	struct completion worker_done;
 
 	struct hlist_node cpuhp_node;
 
@@ -126,6 +127,17 @@
 
 static enum cpuhp_state io_wq_online;
 
+struct io_cb_cancel_data {
+	work_cancel_fn *fn;
+	void *data;
+	int nr_running;
+	int nr_pending;
+	bool cancel_all;
+};
+
+static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
+				       struct io_cb_cancel_data *match);
+
 static bool io_worker_get(struct io_worker *worker)
 {
 	return refcount_inc_not_zero(&worker->ref);
@@ -175,11 +187,6 @@
 	worker->flags = 0;
 	preempt_enable();
 
-	if (worker->saved_creds) {
-		revert_creds(worker->saved_creds);
-		worker->cur_creds = worker->saved_creds = NULL;
-	}
-
 	raw_spin_lock_irq(&wqe->lock);
 	if (flags & IO_WORKER_F_FREE)
 		hlist_nulls_del_rcu(&worker->nulls_node);
@@ -188,7 +195,9 @@
 	raw_spin_unlock_irq(&wqe->lock);
 
 	kfree_rcu(worker, rcu);
-	io_wq_put(wqe->wq);
+	if (atomic_dec_and_test(&wqe->wq->worker_refs))
+		complete(&wqe->wq->worker_done);
+	do_exit(0);
 }
 
 static inline bool io_wqe_run_queue(struct io_wqe *wqe)
@@ -263,12 +272,6 @@
 		io_wqe_wake_worker(wqe, acct);
 }
 
-static void io_worker_start(struct io_worker *worker)
-{
-	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
-	io_wqe_inc_running(worker);
-}
-
 /*
  * Worker will start processing some work. Move it to the busy list, if
  * it's currently on the freelist
@@ -319,10 +322,6 @@
 		worker->flags |= IO_WORKER_F_FREE;
 		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
 	}
-	if (worker->saved_creds) {
-		revert_creds(worker->saved_creds);
-		worker->cur_creds = worker->saved_creds = NULL;
-	}
 }
 
 static inline unsigned int io_get_work_hash(struct io_wq_work *work)
@@ -397,18 +396,6 @@
 	}
 }
 
-static void io_wq_switch_creds(struct io_worker *worker,
-			       struct io_wq_work *work)
-{
-	const struct cred *old_creds = override_creds(work->creds);
-
-	worker->cur_creds = work->creds;
-	if (worker->saved_creds)
-		put_cred(old_creds); /* creds set by previous switch */
-	else
-		worker->saved_creds = old_creds;
-}
-
 static void io_assign_current_work(struct io_worker *worker,
 				   struct io_wq_work *work)
 {
@@ -458,8 +445,6 @@
 			unsigned int hash = io_get_work_hash(work);
 
 			next_hashed = wq_next_work(work);
-			if (work->creds && worker->cur_creds != work->creds)
-				io_wq_switch_creds(worker, work);
 			wq->do_work(work);
 			io_assign_current_work(worker, NULL);
 
@@ -495,8 +480,13 @@
 	struct io_worker *worker = data;
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
+	char buf[TASK_COMM_LEN];
 
-	io_worker_start(worker);
+	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
+	io_wqe_inc_running(worker);
+
+	sprintf(buf, "iou-wrk-%d", wq->task_pid);
+	set_task_comm(current, buf);
 
 	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 		set_current_state(TASK_INTERRUPTIBLE);
@@ -571,67 +561,11 @@
 	raw_spin_unlock_irq(&worker->wqe->lock);
 }
 
-static int task_thread(void *data, int index)
-{
-	struct io_worker *worker = data;
-	struct io_wqe *wqe = worker->wqe;
-	struct io_wqe_acct *acct = &wqe->acct[index];
-	struct io_wq *wq = wqe->wq;
-	char buf[TASK_COMM_LEN];
-
-	sprintf(buf, "iou-wrk-%d", wq->task_pid);
-	set_task_comm(current, buf);
-
-	current->pf_io_worker = worker;
-	worker->task = current;
-
-	set_cpus_allowed_ptr(current, cpumask_of_node(wqe->node));
-	current->flags |= PF_NO_SETAFFINITY;
-
-	raw_spin_lock_irq(&wqe->lock);
-	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
-	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
-	worker->flags |= IO_WORKER_F_FREE;
-	if (index == IO_WQ_ACCT_BOUND)
-		worker->flags |= IO_WORKER_F_BOUND;
-	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
-		worker->flags |= IO_WORKER_F_FIXED;
-	acct->nr_workers++;
-	raw_spin_unlock_irq(&wqe->lock);
-
-	io_wqe_worker(data);
-	do_exit(0);
-}
-
-static int task_thread_bound(void *data)
-{
-	return task_thread(data, IO_WQ_ACCT_BOUND);
-}
-
-static int task_thread_unbound(void *data)
-{
-	return task_thread(data, IO_WQ_ACCT_UNBOUND);
-}
-
-pid_t io_wq_fork_thread(int (*fn)(void *), void *arg)
-{
-	unsigned long flags = CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|
-				CLONE_IO|SIGCHLD;
-	struct kernel_clone_args args = {
-		.flags		= ((lower_32_bits(flags) | CLONE_VM |
-				    CLONE_UNTRACED) & ~CSIGNAL),
-		.exit_signal	= (lower_32_bits(flags) & CSIGNAL),
-		.stack		= (unsigned long)fn,
-		.stack_size	= (unsigned long)arg,
-	};
-
-	return kernel_clone(&args);
-}
-
 static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 {
+	struct io_wqe_acct *acct = &wqe->acct[index];
 	struct io_worker *worker;
-	pid_t pid;
+	struct task_struct *tsk;
 
 	__set_current_state(TASK_RUNNING);
 
@@ -645,17 +579,32 @@
 	spin_lock_init(&worker->lock);
 	init_completion(&worker->ref_done);
 
-	refcount_inc(&wq->refs);
+	atomic_inc(&wq->worker_refs);
 
-	if (index == IO_WQ_ACCT_BOUND)
-		pid = io_wq_fork_thread(task_thread_bound, worker);
-	else
-		pid = io_wq_fork_thread(task_thread_unbound, worker);
-	if (pid < 0) {
-		io_wq_put(wq);
+	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
+	if (IS_ERR(tsk)) {
+		if (atomic_dec_and_test(&wq->worker_refs))
+			complete(&wq->worker_done);
 		kfree(worker);
 		return false;
 	}
+
+	tsk->pf_io_worker = worker;
+	worker->task = tsk;
+	set_cpus_allowed_ptr(tsk, cpumask_of_node(wqe->node));
+	tsk->flags |= PF_NOFREEZE | PF_NO_SETAFFINITY;
+
+	raw_spin_lock_irq(&wqe->lock);
+	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
+	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
+	worker->flags |= IO_WORKER_F_FREE;
+	if (index == IO_WQ_ACCT_BOUND)
+		worker->flags |= IO_WORKER_F_BOUND;
+	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
+		worker->flags |= IO_WORKER_F_FIXED;
+	acct->nr_workers++;
+	raw_spin_unlock_irq(&wqe->lock);
+	wake_up_new_task(tsk);
 	return true;
 }
 
@@ -664,6 +613,8 @@
 {
 	struct io_wqe_acct *acct = &wqe->acct[index];
 
+	if (acct->nr_workers && test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state))
+		return false;
 	/* if we have available workers or no work, no need */
 	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
 		return false;
@@ -697,6 +648,7 @@
 
 static bool io_wq_worker_wake(struct io_worker *worker, void *data)
 {
+	set_notify_signal(worker->task);
 	wake_up_process(worker->task);
 	return false;
 }
@@ -725,6 +677,23 @@
 	}
 }
 
+static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
+{
+	return true;
+}
+
+static void io_wq_cancel_pending(struct io_wq *wq)
+{
+	struct io_cb_cancel_data match = {
+		.fn		= io_wq_work_match_all,
+		.cancel_all	= true,
+	};
+	int node;
+
+	for_each_node(node)
+		io_wqe_cancel_pending_work(wq->wqes[node], &match);
+}
+
 /*
  * Manager thread. Tasked with creating new workers, if we need them.
  */
@@ -732,25 +701,38 @@
 {
 	struct io_wq *wq = data;
 	char buf[TASK_COMM_LEN];
+	int node;
 
 	sprintf(buf, "iou-mgr-%d", wq->task_pid);
 	set_task_comm(current, buf);
-	current->flags |= PF_IO_WORKER;
-	wq->manager = current;
-
-	complete(&wq->done);
 
 	do {
 		set_current_state(TASK_INTERRUPTIBLE);
 		io_wq_check_workers(wq);
 		schedule_timeout(HZ);
+		try_to_freeze();
 		if (fatal_signal_pending(current))
 			set_bit(IO_WQ_BIT_EXIT, &wq->state);
 	} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
 
 	io_wq_check_workers(wq);
-	wq->manager = NULL;
-	io_wq_put(wq);
+
+	rcu_read_lock();
+	for_each_node(node)
+		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
+	rcu_read_unlock();
+
+	/* we might not ever have created any workers */
+	if (atomic_read(&wq->worker_refs))
+		wait_for_completion(&wq->worker_done);
+
+	spin_lock_irq(&wq->hash->wait.lock);
+	for_each_node(node)
+		list_del_init(&wq->wqes[node]->wait.entry);
+	spin_unlock_irq(&wq->hash->wait.lock);
+
+	io_wq_cancel_pending(wq);
+	complete(&wq->exited);
 	do_exit(0);
 }
 
@@ -787,23 +769,20 @@
 
 static int io_wq_fork_manager(struct io_wq *wq)
 {
-	int ret;
+	struct task_struct *tsk;
 
 	if (wq->manager)
 		return 0;
 
-	clear_bit(IO_WQ_BIT_EXIT, &wq->state);
-	refcount_inc(&wq->refs);
-	current->flags |= PF_IO_WORKER;
-	ret = io_wq_fork_thread(io_wq_manager, wq);
-	current->flags &= ~PF_IO_WORKER;
-	if (ret >= 0) {
-		wait_for_completion(&wq->done);
+	reinit_completion(&wq->worker_done);
+	tsk = create_io_thread(io_wq_manager, wq, NUMA_NO_NODE);
+	if (!IS_ERR(tsk)) {
+		wq->manager = get_task_struct(tsk);
+		wake_up_new_task(tsk);
 		return 0;
 	}
 
-	io_wq_put(wq);
-	return ret;
+	return PTR_ERR(tsk);
 }
 
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
@@ -813,7 +792,8 @@
 	unsigned long flags;
 
 	/* Can only happen if manager creation fails after exec */
-	if (unlikely(io_wq_fork_manager(wqe->wq))) {
+	if (io_wq_fork_manager(wqe->wq) ||
+	    test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
 		work->flags |= IO_WQ_WORK_CANCEL;
 		wqe->wq->do_work(work);
 		return;
@@ -849,14 +829,6 @@
 	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
 }
 
-struct io_cb_cancel_data {
-	work_cancel_fn *fn;
-	void *data;
-	int nr_running;
-	int nr_pending;
-	bool cancel_all;
-};
-
 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 {
 	struct io_cb_cancel_data *match = data;
@@ -1043,16 +1015,18 @@
 	}
 
 	wq->task_pid = current->pid;
-	init_completion(&wq->done);
+	init_completion(&wq->exited);
 	refcount_set(&wq->refs, 1);
 
+	init_completion(&wq->worker_done);
+	atomic_set(&wq->worker_refs, 0);
+
 	ret = io_wq_fork_manager(wq);
 	if (!ret)
 		return wq;
 
-	io_wq_put(wq);
-	io_wq_put_hash(data->hash);
 err:
+	io_wq_put_hash(data->hash);
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 	for_each_node(node)
 		kfree(wq->wqes[node]);
@@ -1063,6 +1037,16 @@
 	return ERR_PTR(ret);
 }
 
+static void io_wq_destroy_manager(struct io_wq *wq)
+{
+	if (wq->manager) {
+		wake_up_process(wq->manager);
+		wait_for_completion(&wq->exited);
+		put_task_struct(wq->manager);
+		wq->manager = NULL;
+	}
+}
+
 static void io_wq_destroy(struct io_wq *wq)
 {
 	int node;
@@ -1070,26 +1054,16 @@
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 
 	set_bit(IO_WQ_BIT_EXIT, &wq->state);
-	if (wq->manager)
-		wake_up_process(wq->manager);
+	io_wq_destroy_manager(wq);
 
-	rcu_read_lock();
-	for_each_node(node)
-		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
-	rcu_read_unlock();
-
-	spin_lock_irq(&wq->hash->wait.lock);
 	for_each_node(node) {
 		struct io_wqe *wqe = wq->wqes[node];
-
-		list_del_init(&wqe->wait.entry);
+		WARN_ON_ONCE(!wq_list_empty(&wqe->work_list));
 		kfree(wqe);
 	}
-	spin_unlock_irq(&wq->hash->wait.lock);
 	io_wq_put_hash(wq->hash);
 	kfree(wq->wqes);
 	kfree(wq);
-
 }
 
 void io_wq_put(struct io_wq *wq)
@@ -1098,6 +1072,13 @@
 		io_wq_destroy(wq);
 }
 
+void io_wq_put_and_exit(struct io_wq *wq)
+{
+	set_bit(IO_WQ_BIT_EXIT, &wq->state);
+	io_wq_destroy_manager(wq);
+	io_wq_put(wq);
+}
+
 static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
 {
 	struct task_struct *task = worker->task;
diff --git a/fs/io-wq.h b/fs/io-wq.h
index b6ca12b..5fbf799 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -79,8 +79,8 @@
 
 struct io_wq_work {
 	struct io_wq_work_node list;
-	const struct cred *creds;
 	unsigned flags;
+	unsigned short personality;
 };
 
 static inline struct io_wq_work *wq_next_work(struct io_wq_work *work)
@@ -114,12 +114,11 @@
 
 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data);
 void io_wq_put(struct io_wq *wq);
+void io_wq_put_and_exit(struct io_wq *wq);
 
 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work);
 void io_wq_hash_work(struct io_wq_work *work, void *val);
 
-pid_t io_wq_fork_thread(int (*fn)(void *), void *arg);
-
 static inline bool io_wq_is_hashed(struct io_wq_work *work)
 {
 	return work->flags & IO_WQ_WORK_HASHED;
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 4a08858..92c25b5 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -74,13 +74,11 @@
 #include <linux/fsnotify.h>
 #include <linux/fadvise.h>
 #include <linux/eventpoll.h>
-#include <linux/fs_struct.h>
 #include <linux/splice.h>
 #include <linux/task_work.h>
 #include <linux/pagemap.h>
 #include <linux/io_uring.h>
-#include <linux/blk-cgroup.h>
-#include <linux/audit.h>
+#include <linux/freezer.h>
 
 #define CREATE_TRACE_POINTS
 #include <trace/events/io_uring.h>
@@ -276,7 +274,7 @@
 
 	unsigned long		state;
 	struct completion	startup;
-	struct completion	completion;
+	struct completion	parked;
 	struct completion	exited;
 };
 
@@ -338,7 +336,6 @@
 		unsigned int		drain_next: 1;
 		unsigned int		eventfd_async: 1;
 		unsigned int		restricted: 1;
-		unsigned int		sqo_dead: 1;
 		unsigned int		sqo_exec: 1;
 
 		/*
@@ -380,11 +377,6 @@
 
 	struct io_rings	*rings;
 
-	/*
-	 * For SQPOLL usage
-	 */
-	struct task_struct	*sqo_task;
-
 	/* Only used for accounting purposes */
 	struct mm_struct	*mm_account;
 
@@ -688,7 +680,6 @@
 	REQ_F_POLLED_BIT,
 	REQ_F_BUFFER_SELECTED_BIT,
 	REQ_F_NO_FILE_TABLE_BIT,
-	REQ_F_WORK_INITIALIZED_BIT,
 	REQ_F_LTIMEOUT_ACTIVE_BIT,
 	REQ_F_COMPLETE_INLINE_BIT,
 
@@ -712,7 +703,7 @@
 
 	/* fail rest of links */
 	REQ_F_FAIL_LINK		= BIT(REQ_F_FAIL_LINK_BIT),
-	/* on inflight list */
+	/* on inflight list, should be cancelled and waited on exit reliably */
 	REQ_F_INFLIGHT		= BIT(REQ_F_INFLIGHT_BIT),
 	/* read/write uses file position */
 	REQ_F_CUR_POS		= BIT(REQ_F_CUR_POS_BIT),
@@ -730,8 +721,6 @@
 	REQ_F_BUFFER_SELECTED	= BIT(REQ_F_BUFFER_SELECTED_BIT),
 	/* doesn't need file table for this request */
 	REQ_F_NO_FILE_TABLE	= BIT(REQ_F_NO_FILE_TABLE_BIT),
-	/* io_wq_work is initialized */
-	REQ_F_WORK_INITIALIZED	= BIT(REQ_F_WORK_INITIALIZED_BIT),
 	/* linked timeout is active, i.e. prepared by link's head */
 	REQ_F_LTIMEOUT_ACTIVE	= BIT(REQ_F_LTIMEOUT_ACTIVE_BIT),
 	/* completion is deferred through io_comp_state */
@@ -1080,9 +1069,7 @@
 		return true;
 
 	io_for_each_link(req, head) {
-		if (!(req->flags & REQ_F_WORK_INITIALIZED))
-			continue;
-		if (req->file && req->file->f_op == &io_uring_fops)
+		if (req->flags & REQ_F_INFLIGHT)
 			return true;
 		if (req->task->files == files)
 			return true;
@@ -1096,24 +1083,6 @@
 		req->flags |= REQ_F_FAIL_LINK;
 }
 
-static inline void __io_req_init_async(struct io_kiocb *req)
-{
-	memset(&req->work, 0, sizeof(req->work));
-	req->flags |= REQ_F_WORK_INITIALIZED;
-}
-
-/*
- * Note: must call io_req_init_async() for the first time you
- * touch any members of io_wq_work.
- */
-static inline void io_req_init_async(struct io_kiocb *req)
-{
-	if (req->flags & REQ_F_WORK_INITIALIZED)
-		return;
-
-	__io_req_init_async(req);
-}
-
 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
 {
 	struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
@@ -1196,37 +1165,11 @@
 	return false;
 }
 
-static void io_req_clean_work(struct io_kiocb *req)
-{
-	if (!(req->flags & REQ_F_WORK_INITIALIZED))
-		return;
-
-	if (req->work.creds) {
-		put_cred(req->work.creds);
-		req->work.creds = NULL;
-	}
-	if (req->flags & REQ_F_INFLIGHT) {
-		struct io_ring_ctx *ctx = req->ctx;
-		struct io_uring_task *tctx = req->task->io_uring;
-		unsigned long flags;
-
-		spin_lock_irqsave(&ctx->inflight_lock, flags);
-		list_del(&req->inflight_entry);
-		spin_unlock_irqrestore(&ctx->inflight_lock, flags);
-		req->flags &= ~REQ_F_INFLIGHT;
-		if (atomic_read(&tctx->in_idle))
-			wake_up(&tctx->wait);
-	}
-
-	req->flags &= ~REQ_F_WORK_INITIALIZED;
-}
-
 static void io_req_track_inflight(struct io_kiocb *req)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 
 	if (!(req->flags & REQ_F_INFLIGHT)) {
-		io_req_init_async(req);
 		req->flags |= REQ_F_INFLIGHT;
 
 		spin_lock_irq(&ctx->inflight_lock);
@@ -1240,8 +1183,6 @@
 	const struct io_op_def *def = &io_op_defs[req->opcode];
 	struct io_ring_ctx *ctx = req->ctx;
 
-	io_req_init_async(req);
-
 	if (req->flags & REQ_F_FORCE_ASYNC)
 		req->work.flags |= IO_WQ_WORK_CONCURRENT;
 
@@ -1252,8 +1193,6 @@
 		if (def->unbound_nonreg_file)
 			req->work.flags |= IO_WQ_WORK_UNBOUND;
 	}
-	if (!req->work.creds)
-		req->work.creds = get_current_cred();
 }
 
 static void io_prep_async_link(struct io_kiocb *req)
@@ -1264,7 +1203,7 @@
 		io_prep_async_work(cur);
 }
 
-static struct io_kiocb *__io_queue_async_work(struct io_kiocb *req)
+static void io_queue_async_work(struct io_kiocb *req)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 	struct io_kiocb *link = io_prep_linked_timeout(req);
@@ -1275,18 +1214,9 @@
 
 	trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req,
 					&req->work, req->flags);
-	io_wq_enqueue(tctx->io_wq, &req->work);
-	return link;
-}
-
-static void io_queue_async_work(struct io_kiocb *req)
-{
-	struct io_kiocb *link;
-
 	/* init ->work of the whole link before punting */
 	io_prep_async_link(req);
-	link = __io_queue_async_work(req);
-
+	io_wq_enqueue(tctx->io_wq, &req->work);
 	if (link)
 		io_queue_linked_timeout(link);
 }
@@ -1521,18 +1451,22 @@
 	return all_flushed;
 }
 
-static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force,
+static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force,
 				     struct task_struct *tsk,
 				     struct files_struct *files)
 {
+	bool ret = true;
+
 	if (test_bit(0, &ctx->cq_check_overflow)) {
 		/* iopoll syncs against uring_lock, not completion_lock */
 		if (ctx->flags & IORING_SETUP_IOPOLL)
 			mutex_lock(&ctx->uring_lock);
-		__io_cqring_overflow_flush(ctx, force, tsk, files);
+		ret = __io_cqring_overflow_flush(ctx, force, tsk, files);
 		if (ctx->flags & IORING_SETUP_IOPOLL)
 			mutex_unlock(&ctx->uring_lock);
 	}
+
+	return ret;
 }
 
 static void __io_cqring_fill_event(struct io_kiocb *req, long res, long cflags)
@@ -1714,9 +1648,19 @@
 		io_put_file(req, req->file, (req->flags & REQ_F_FIXED_FILE));
 	if (req->fixed_rsrc_refs)
 		percpu_ref_put(req->fixed_rsrc_refs);
-	io_req_clean_work(req);
+
+	if (req->flags & REQ_F_INFLIGHT) {
+		struct io_ring_ctx *ctx = req->ctx;
+		unsigned long flags;
+
+		spin_lock_irqsave(&ctx->inflight_lock, flags);
+		list_del(&req->inflight_entry);
+		spin_unlock_irqrestore(&ctx->inflight_lock, flags);
+		req->flags &= ~REQ_F_INFLIGHT;
+	}
 }
 
+/* must be called somewhat shortly after putting a request */
 static inline void io_put_task(struct task_struct *task, int nr)
 {
 	struct io_uring_task *tctx = task->io_uring;
@@ -1800,15 +1744,7 @@
 		trace_io_uring_fail_link(req, link);
 		io_cqring_fill_event(link, -ECANCELED);
 
-		/*
-		 * It's ok to free under spinlock as they're not linked anymore,
-		 * but avoid REQ_F_WORK_INITIALIZED because it may deadlock on
-		 * work.fs->lock.
-		 */
-		if (link->flags & REQ_F_WORK_INITIALIZED)
-			io_put_req_deferred(link, 2);
-		else
-			io_double_put_req(link);
+		io_put_req_deferred(link, 2);
 		link = nxt;
 	}
 	io_commit_cqring(ctx);
@@ -1845,6 +1781,18 @@
 	return __io_req_find_next(req);
 }
 
+static void ctx_flush_and_put(struct io_ring_ctx *ctx)
+{
+	if (!ctx)
+		return;
+	if (ctx->submit_state.comp.nr) {
+		mutex_lock(&ctx->uring_lock);
+		io_submit_flush_completions(&ctx->submit_state.comp, ctx);
+		mutex_unlock(&ctx->uring_lock);
+	}
+	percpu_ref_put(&ctx->refs);
+}
+
 static bool __tctx_task_work(struct io_uring_task *tctx)
 {
 	struct io_ring_ctx *ctx = NULL;
@@ -1862,30 +1810,20 @@
 	node = list.first;
 	while (node) {
 		struct io_wq_work_node *next = node->next;
-		struct io_ring_ctx *this_ctx;
 		struct io_kiocb *req;
 
 		req = container_of(node, struct io_kiocb, io_task_work.node);
-		this_ctx = req->ctx;
+		if (req->ctx != ctx) {
+			ctx_flush_and_put(ctx);
+			ctx = req->ctx;
+			percpu_ref_get(&ctx->refs);
+		}
+
 		req->task_work.func(&req->task_work);
 		node = next;
-
-		if (!ctx) {
-			ctx = this_ctx;
-		} else if (ctx != this_ctx) {
-			mutex_lock(&ctx->uring_lock);
-			io_submit_flush_completions(&ctx->submit_state.comp, ctx);
-			mutex_unlock(&ctx->uring_lock);
-			ctx = this_ctx;
-		}
 	}
 
-	if (ctx && ctx->submit_state.comp.nr) {
-		mutex_lock(&ctx->uring_lock);
-		io_submit_flush_completions(&ctx->submit_state.comp, ctx);
-		mutex_unlock(&ctx->uring_lock);
-	}
-
+	ctx_flush_and_put(ctx);
 	return list.first != NULL;
 }
 
@@ -1893,10 +1831,10 @@
 {
 	struct io_uring_task *tctx = container_of(cb, struct io_uring_task, task_work);
 
+	clear_bit(0, &tctx->task_state);
+
 	while (__tctx_task_work(tctx))
 		cond_resched();
-
-	clear_bit(0, &tctx->task_state);
 }
 
 static int io_task_work_add(struct task_struct *tsk, struct io_kiocb *req,
@@ -2010,7 +1948,7 @@
 
 	/* ctx stays valid until unlock, even if we drop all ours ctx->refs */
 	mutex_lock(&ctx->uring_lock);
-	if (!ctx->sqo_dead && !(current->flags & PF_EXITING) && !current->in_execve)
+	if (!(current->flags & PF_EXITING) && !current->in_execve)
 		__io_queue_sqe(req);
 	else
 		__io_req_task_cancel(req, -EFAULT);
@@ -2472,23 +2410,32 @@
 		return false;
 	return !io_setup_async_rw(req, iovec, inline_vecs, &iter, false);
 }
-#endif
 
-static bool io_rw_reissue(struct io_kiocb *req)
+static bool io_rw_should_reissue(struct io_kiocb *req)
 {
-#ifdef CONFIG_BLOCK
 	umode_t mode = file_inode(req->file)->i_mode;
+	struct io_ring_ctx *ctx = req->ctx;
 
 	if (!S_ISBLK(mode) && !S_ISREG(mode))
 		return false;
-	if ((req->flags & REQ_F_NOWAIT) || io_wq_current_is_worker())
+	if ((req->flags & REQ_F_NOWAIT) || (io_wq_current_is_worker() &&
+	    !(ctx->flags & IORING_SETUP_IOPOLL)))
 		return false;
 	/*
 	 * If ref is dying, we might be running poll reap from the exit work.
 	 * Don't attempt to reissue from that path, just let it fail with
 	 * -EAGAIN.
 	 */
-	if (percpu_ref_is_dying(&req->ctx->refs))
+	if (percpu_ref_is_dying(&ctx->refs))
+		return false;
+	return true;
+}
+#endif
+
+static bool io_rw_reissue(struct io_kiocb *req)
+{
+#ifdef CONFIG_BLOCK
+	if (!io_rw_should_reissue(req))
 		return false;
 
 	lockdep_assert_held(&req->ctx->uring_lock);
@@ -2531,6 +2478,19 @@
 {
 	struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
 
+#ifdef CONFIG_BLOCK
+	/* Rewind iter, if we have one. iopoll path resubmits as usual */
+	if (res == -EAGAIN && io_rw_should_reissue(req)) {
+		struct io_async_rw *rw = req->async_data;
+
+		if (rw)
+			iov_iter_revert(&rw->iter,
+					req->result - iov_iter_count(&rw->iter));
+		else if (!io_resubmit_prep(req))
+			res = -EIO;
+	}
+#endif
+
 	if (kiocb->ki_flags & IOCB_WRITE)
 		kiocb_end_write(req);
 
@@ -3279,6 +3239,8 @@
 	ret = io_iter_do_read(req, iter);
 
 	if (ret == -EIOCBQUEUED) {
+		if (req->async_data)
+			iov_iter_revert(iter, io_size - iov_iter_count(iter));
 		goto out_free;
 	} else if (ret == -EAGAIN) {
 		/* IOPOLL retry should happen for io-wq threads */
@@ -3324,6 +3286,7 @@
 		if (ret == -EIOCBQUEUED)
 			return 0;
 		/* we got some bytes, but not all. retry. */
+		kiocb->ki_flags &= ~IOCB_WAITQ;
 	} while (ret > 0 && ret < io_size);
 done:
 	kiocb_done(kiocb, ret, issue_flags);
@@ -3410,6 +3373,8 @@
 	/* no retry on NONBLOCK nor RWF_NOWAIT */
 	if (ret2 == -EAGAIN && (req->flags & REQ_F_NOWAIT))
 		goto done;
+	if (ret2 == -EIOCBQUEUED && req->async_data)
+		iov_iter_revert(iter, io_size - iov_iter_count(iter));
 	if (!force_nonblock || ret2 != -EAGAIN) {
 		/* IOPOLL retry should happen for io-wq threads */
 		if ((req->ctx->flags & IORING_SETUP_IOPOLL) && ret2 == -EAGAIN)
@@ -3588,7 +3553,6 @@
 		 * Splice operation will be punted aync, and here need to
 		 * modify io_wq_work.flags, so initialize io_wq_work firstly.
 		 */
-		io_req_init_async(req);
 		req->work.flags |= IO_WQ_WORK_UNBOUND;
 	}
 
@@ -3864,7 +3828,7 @@
 
 static int io_openat(struct io_kiocb *req, unsigned int issue_flags)
 {
-	return io_openat2(req, issue_flags & IO_URING_F_NONBLOCK);
+	return io_openat2(req, issue_flags);
 }
 
 static int io_remove_buffers_prep(struct io_kiocb *req,
@@ -5003,6 +4967,9 @@
 			pt->error = -EINVAL;
 			return;
 		}
+		/* double add on the same waitqueue head, ignore */
+		if (poll->head == head)
+			return;
 		poll = kmalloc(sizeof(*poll), GFP_ATOMIC);
 		if (!poll) {
 			pt->error = -ENOMEM;
@@ -5538,6 +5505,7 @@
 
 	data->mode = io_translate_timeout_mode(flags);
 	hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode);
+	io_req_track_inflight(req);
 	return 0;
 }
 
@@ -5945,8 +5913,22 @@
 static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_ring_ctx *ctx = req->ctx;
+	const struct cred *creds = NULL;
 	int ret;
 
+	if (req->work.personality) {
+		const struct cred *new_creds;
+
+		if (!(issue_flags & IO_URING_F_NONBLOCK))
+			mutex_lock(&ctx->uring_lock);
+		new_creds = idr_find(&ctx->personality_idr, req->work.personality);
+		if (!(issue_flags & IO_URING_F_NONBLOCK))
+			mutex_unlock(&ctx->uring_lock);
+		if (!new_creds)
+			return -EINVAL;
+		creds = override_creds(new_creds);
+	}
+
 	switch (req->opcode) {
 	case IORING_OP_NOP:
 		ret = io_nop(req, issue_flags);
@@ -6053,6 +6035,9 @@
 		break;
 	}
 
+	if (creds)
+		revert_creds(creds);
+
 	if (ret)
 		return ret;
 
@@ -6216,18 +6201,10 @@
 static void __io_queue_sqe(struct io_kiocb *req)
 {
 	struct io_kiocb *linked_timeout = io_prep_linked_timeout(req);
-	const struct cred *old_creds = NULL;
 	int ret;
 
-	if ((req->flags & REQ_F_WORK_INITIALIZED) && req->work.creds &&
-	    req->work.creds != current_cred())
-		old_creds = override_creds(req->work.creds);
-
 	ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
 
-	if (old_creds)
-		revert_creds(old_creds);
-
 	/*
 	 * We async punt it if the file wasn't marked NOWAIT, or if the file
 	 * doesn't support non-blocking read/write attempts
@@ -6314,7 +6291,7 @@
 {
 	struct io_submit_state *state;
 	unsigned int sqe_flags;
-	int id, ret = 0;
+	int ret = 0;
 
 	req->opcode = READ_ONCE(sqe->opcode);
 	/* same numerical values with corresponding REQ_F_*, safe to copy */
@@ -6346,15 +6323,9 @@
 	    !io_op_defs[req->opcode].buffer_select)
 		return -EOPNOTSUPP;
 
-	id = READ_ONCE(sqe->personality);
-	if (id) {
-		__io_req_init_async(req);
-		req->work.creds = idr_find(&ctx->personality_idr, id);
-		if (unlikely(!req->work.creds))
-			return -EINVAL;
-		get_cred(req->work.creds);
-	}
-
+	req->work.list.next = NULL;
+	req->work.flags = 0;
+	req->work.personality = READ_ONCE(sqe->personality);
 	state = &ctx->submit_state;
 
 	/*
@@ -6616,8 +6587,7 @@
 		if (!list_empty(&ctx->iopoll_list))
 			io_do_iopoll(ctx, &nr_events, 0);
 
-		if (to_submit && !ctx->sqo_dead &&
-		    likely(!percpu_ref_is_dying(&ctx->refs)))
+		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)))
 			ret = io_submit_sqes(ctx, to_submit);
 		mutex_unlock(&ctx->uring_lock);
 	}
@@ -6686,7 +6656,7 @@
 		 * wait_task_inactive().
 		 */
 		preempt_disable();
-		complete(&sqd->completion);
+		complete(&sqd->parked);
 		schedule_preempt_disabled();
 		preempt_enable();
 	}
@@ -6703,7 +6673,6 @@
 
 	sprintf(buf, "iou-sqp-%d", sqd->task_pid);
 	set_task_comm(current, buf);
-	sqd->thread = current;
 	current->pf_io_worker = NULL;
 
 	if (sqd->sq_cpu != -1)
@@ -6712,8 +6681,6 @@
 		set_cpus_allowed_ptr(current, cpu_online_mask);
 	current->flags |= PF_NO_SETAFFINITY;
 
-	complete(&sqd->completion);
-
 	wait_for_completion(&sqd->startup);
 
 	while (!io_sq_thread_should_stop(sqd)) {
@@ -6770,6 +6737,7 @@
 				io_ring_set_wakeup_flag(ctx);
 
 			schedule();
+			try_to_freeze();
 			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
 				io_ring_clear_wakeup_flag(ctx);
 		}
@@ -6784,18 +6752,25 @@
 	io_run_task_work();
 
 	/*
-	 * Clear thread under lock so that concurrent parks work correctly
+	 * Ensure that we park properly if racing with someone trying to park
+	 * while we're exiting. If we fail to grab the lock, check park and
+	 * park if necessary. The ordering with the park bit and the lock
+	 * ensures that we catch this reliably.
 	 */
-	complete_all(&sqd->completion);
-	mutex_lock(&sqd->lock);
+	if (!mutex_trylock(&sqd->lock)) {
+		if (io_sq_thread_should_park(sqd))
+			io_sq_thread_parkme(sqd);
+		mutex_lock(&sqd->lock);
+	}
+
 	sqd->thread = NULL;
 	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
 		ctx->sqo_exec = 1;
 		io_ring_set_wakeup_flag(ctx);
 	}
-	mutex_unlock(&sqd->lock);
 
 	complete(&sqd->exited);
+	mutex_unlock(&sqd->lock);
 	do_exit(0);
 }
 
@@ -6917,11 +6892,16 @@
 	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
 	trace_io_uring_cqring_wait(ctx, min_events);
 	do {
-		io_cqring_overflow_flush(ctx, false, NULL, NULL);
+		/* if we can't even flush overflow, don't wait for more */
+		if (!io_cqring_overflow_flush(ctx, false, NULL, NULL)) {
+			ret = -EBUSY;
+			break;
+		}
 		prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
 						TASK_INTERRUPTIBLE);
 		ret = io_cqring_wait_schedule(ctx, &iowq, &timeout);
 		finish_wait(&ctx->wait, &iowq.wq);
+		cond_resched();
 	} while (ret > 0);
 
 	restore_saved_sigmask_unless(ret == -EINTR);
@@ -7091,40 +7071,42 @@
 static void io_sq_thread_unpark(struct io_sq_data *sqd)
 	__releases(&sqd->lock)
 {
-	if (!sqd->thread)
-		return;
 	if (sqd->thread == current)
 		return;
 	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
-	wake_up_state(sqd->thread, TASK_PARKED);
+	if (sqd->thread)
+		wake_up_state(sqd->thread, TASK_PARKED);
 	mutex_unlock(&sqd->lock);
 }
 
-static bool io_sq_thread_park(struct io_sq_data *sqd)
+static void io_sq_thread_park(struct io_sq_data *sqd)
 	__acquires(&sqd->lock)
 {
 	if (sqd->thread == current)
-		return true;
-	mutex_lock(&sqd->lock);
-	if (!sqd->thread) {
-		mutex_unlock(&sqd->lock);
-		return false;
-	}
+		return;
 	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
-	wake_up_process(sqd->thread);
-	wait_for_completion(&sqd->completion);
-	return true;
+	mutex_lock(&sqd->lock);
+	if (sqd->thread) {
+		wake_up_process(sqd->thread);
+		wait_for_completion(&sqd->parked);
+	}
 }
 
 static void io_sq_thread_stop(struct io_sq_data *sqd)
 {
-	if (!sqd->thread)
+	if (test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state))
 		return;
-
-	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
-	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state));
-	wake_up_process(sqd->thread);
-	wait_for_completion(&sqd->exited);
+	mutex_lock(&sqd->lock);
+	if (sqd->thread) {
+		set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
+		WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state));
+		wake_up_process(sqd->thread);
+		mutex_unlock(&sqd->lock);
+		wait_for_completion(&sqd->exited);
+		WARN_ON_ONCE(sqd->thread);
+	} else {
+		mutex_unlock(&sqd->lock);
+	}
 }
 
 static void io_put_sq_data(struct io_sq_data *sqd)
@@ -7203,7 +7185,7 @@
 	mutex_init(&sqd->lock);
 	init_waitqueue_head(&sqd->wait);
 	init_completion(&sqd->startup);
-	init_completion(&sqd->completion);
+	init_completion(&sqd->parked);
 	init_completion(&sqd->exited);
 	return sqd;
 }
@@ -7834,6 +7816,8 @@
 	struct io_uring_task *tctx = tsk->io_uring;
 
 	WARN_ON_ONCE(!xa_empty(&tctx->xa));
+	WARN_ON_ONCE(tctx->io_wq);
+
 	percpu_counter_destroy(&tctx->inflight);
 	kfree(tctx);
 	tsk->io_uring = NULL;
@@ -7841,21 +7825,22 @@
 
 static int io_sq_thread_fork(struct io_sq_data *sqd, struct io_ring_ctx *ctx)
 {
+	struct task_struct *tsk;
 	int ret;
 
 	clear_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
-	reinit_completion(&sqd->completion);
-	ctx->sqo_dead = ctx->sqo_exec = 0;
+	reinit_completion(&sqd->parked);
+	ctx->sqo_exec = 0;
 	sqd->task_pid = current->pid;
-	current->flags |= PF_IO_WORKER;
-	ret = io_wq_fork_thread(io_sq_thread, sqd);
-	current->flags &= ~PF_IO_WORKER;
-	if (ret < 0) {
-		sqd->thread = NULL;
-		return ret;
-	}
-	wait_for_completion(&sqd->completion);
-	return io_uring_alloc_task_context(sqd->thread, ctx);
+	tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
+	if (IS_ERR(tsk))
+		return PTR_ERR(tsk);
+	ret = io_uring_alloc_task_context(tsk, ctx);
+	if (ret)
+		set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
+	sqd->thread = tsk;
+	wake_up_new_task(tsk);
+	return ret;
 }
 
 static int io_sq_offload_create(struct io_ring_ctx *ctx,
@@ -7878,6 +7863,7 @@
 		fdput(f);
 	}
 	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		struct task_struct *tsk;
 		struct io_sq_data *sqd;
 
 		ret = -EPERM;
@@ -7919,15 +7905,16 @@
 		}
 
 		sqd->task_pid = current->pid;
-		current->flags |= PF_IO_WORKER;
-		ret = io_wq_fork_thread(io_sq_thread, sqd);
-		current->flags &= ~PF_IO_WORKER;
-		if (ret < 0) {
-			sqd->thread = NULL;
+		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
+		if (IS_ERR(tsk)) {
+			ret = PTR_ERR(tsk);
 			goto err;
 		}
-		wait_for_completion(&sqd->completion);
-		ret = io_uring_alloc_task_context(sqd->thread, ctx);
+		ret = io_uring_alloc_task_context(tsk, ctx);
+		if (ret)
+			set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
+		sqd->thread = tsk;
+		wake_up_new_task(tsk);
 		if (ret)
 			goto err;
 	} else if (p->flags & IORING_SETUP_SQ_AFF) {
@@ -7946,6 +7933,7 @@
 {
 	struct io_sq_data *sqd = ctx->sq_data;
 
+	ctx->flags &= ~IORING_SETUP_R_DISABLED;
 	if (ctx->flags & IORING_SETUP_SQPOLL)
 		complete(&sqd->startup);
 }
@@ -8384,7 +8372,7 @@
 	}
 }
 
-static void io_req_caches_free(struct io_ring_ctx *ctx, struct task_struct *tsk)
+static void io_req_caches_free(struct io_ring_ctx *ctx)
 {
 	struct io_submit_state *submit_state = &ctx->submit_state;
 	struct io_comp_state *cs = &ctx->submit_state.comp;
@@ -8444,7 +8432,7 @@
 
 	percpu_ref_exit(&ctx->refs);
 	free_uid(ctx->user);
-	io_req_caches_free(ctx, NULL);
+	io_req_caches_free(ctx);
 	if (ctx->hash_map)
 		io_wq_put_hash(ctx->hash_map);
 	kfree(ctx->cancel_hash);
@@ -8512,16 +8500,13 @@
 	return 0;
 }
 
-static void io_run_ctx_fallback(struct io_ring_ctx *ctx)
+static bool io_run_ctx_fallback(struct io_ring_ctx *ctx)
 {
-	struct callback_head *work, *head, *next;
+	struct callback_head *work, *next;
+	bool executed = false;
 
 	do {
-		do {
-			head = NULL;
-			work = READ_ONCE(ctx->exit_task_work);
-		} while (cmpxchg(&ctx->exit_task_work, work, head) != work);
-
+		work = xchg(&ctx->exit_task_work, NULL);
 		if (!work)
 			break;
 
@@ -8531,7 +8516,10 @@
 			work = next;
 			cond_resched();
 		} while (work);
+		executed = true;
 	} while (1);
+
+	return executed;
 }
 
 static void io_ring_exit_work(struct work_struct *work)
@@ -8547,7 +8535,6 @@
 	 */
 	do {
 		io_uring_try_cancel_requests(ctx, NULL, NULL);
-		io_run_ctx_fallback(ctx);
 	} while (!wait_for_completion_timeout(&ctx->ref_comp, HZ/20));
 	io_ring_ctx_free(ctx);
 }
@@ -8556,10 +8543,6 @@
 {
 	mutex_lock(&ctx->uring_lock);
 	percpu_ref_kill(&ctx->refs);
-
-	if (WARN_ON_ONCE((ctx->flags & IORING_SETUP_SQPOLL) && !ctx->sqo_dead))
-		ctx->sqo_dead = 1;
-
 	/* if force is set, the ring is going away. always drop after that */
 	ctx->cq_overflow_flushed = 1;
 	if (ctx->rings)
@@ -8648,7 +8631,8 @@
 					 struct files_struct *files)
 {
 	struct io_task_cancel cancel = { .task = task, .files = files, };
-	struct io_uring_task *tctx = current->io_uring;
+	struct task_struct *tctx_task = task ?: current;
+	struct io_uring_task *tctx = tctx_task->io_uring;
 
 	while (1) {
 		enum io_wq_cancel cret;
@@ -8671,6 +8655,7 @@
 		ret |= io_poll_remove_all(ctx, task, files);
 		ret |= io_kill_timeouts(ctx, task, files);
 		ret |= io_run_task_work();
+		ret |= io_run_ctx_fallback(ctx);
 		io_cqring_overflow_flush(ctx, true, task, files);
 		if (!ret)
 			break;
@@ -8718,17 +8703,6 @@
 	}
 }
 
-static void io_disable_sqo_submit(struct io_ring_ctx *ctx)
-{
-	mutex_lock(&ctx->uring_lock);
-	ctx->sqo_dead = 1;
-	mutex_unlock(&ctx->uring_lock);
-
-	/* make sure callers enter the ring to get error */
-	if (ctx->rings)
-		io_ring_set_wakeup_flag(ctx);
-}
-
 /*
  * We need to iteratively cancel requests, in case a request has dependent
  * hard links. These persist even for failure of cancelations, hence keep
@@ -8738,15 +8712,17 @@
 					  struct files_struct *files)
 {
 	struct task_struct *task = current;
-	bool did_park = false;
 
 	if ((ctx->flags & IORING_SETUP_SQPOLL) && ctx->sq_data) {
-		io_disable_sqo_submit(ctx);
-		did_park = io_sq_thread_park(ctx->sq_data);
-		if (did_park) {
-			task = ctx->sq_data->thread;
-			atomic_inc(&task->io_uring->in_idle);
+		/* never started, nothing to cancel */
+		if (ctx->flags & IORING_SETUP_R_DISABLED) {
+			io_sq_offload_start(ctx);
+			return;
 		}
+		io_sq_thread_park(ctx->sq_data);
+		task = ctx->sq_data->thread;
+		if (task)
+			atomic_inc(&task->io_uring->in_idle);
 	}
 
 	io_cancel_defer_files(ctx, task, files);
@@ -8755,10 +8731,10 @@
 	if (!files)
 		io_uring_try_cancel_requests(ctx, task, NULL);
 
-	if (did_park) {
+	if (task)
 		atomic_dec(&task->io_uring->in_idle);
+	if (ctx->sq_data)
 		io_sq_thread_unpark(ctx->sq_data);
-	}
 }
 
 /*
@@ -8786,10 +8762,6 @@
 				fput(file);
 				return ret;
 			}
-
-			/* one and only SQPOLL file note, held by sqo_task */
-			WARN_ON_ONCE((ctx->flags & IORING_SETUP_SQPOLL) &&
-				     current != ctx->sqo_task);
 		}
 		tctx->last = file;
 	}
@@ -8819,13 +8791,17 @@
 		fput(file);
 }
 
-static void io_uring_remove_task_files(struct io_uring_task *tctx)
+static void io_uring_clean_tctx(struct io_uring_task *tctx)
 {
 	struct file *file;
 	unsigned long index;
 
 	xa_for_each(&tctx->xa, index, file)
 		io_uring_del_task_file(file);
+	if (tctx->io_wq) {
+		io_wq_put_and_exit(tctx->io_wq);
+		tctx->io_wq = NULL;
+	}
 }
 
 void __io_uring_files_cancel(struct files_struct *files)
@@ -8840,13 +8816,8 @@
 		io_uring_cancel_task_requests(file->private_data, files);
 	atomic_dec(&tctx->in_idle);
 
-	if (files) {
-		io_uring_remove_task_files(tctx);
-		if (tctx->io_wq) {
-			io_wq_put(tctx->io_wq);
-			tctx->io_wq = NULL;
-		}
-	}
+	if (files)
+		io_uring_clean_tctx(tctx);
 }
 
 static s64 tctx_inflight(struct io_uring_task *tctx)
@@ -8863,11 +8834,12 @@
 
 	if (!sqd)
 		return;
-	io_disable_sqo_submit(ctx);
-	if (!io_sq_thread_park(sqd))
+	io_sq_thread_park(sqd);
+	if (!sqd->thread || !sqd->thread->io_uring) {
+		io_sq_thread_unpark(sqd);
 		return;
+	}
 	tctx = ctx->sq_data->thread->io_uring;
-
 	atomic_inc(&tctx->in_idle);
 	do {
 		/* read completions before cancelations */
@@ -8903,7 +8875,6 @@
 	/* make sure overflow events are dropped */
 	atomic_inc(&tctx->in_idle);
 
-	/* trigger io_disable_sqo_submit() */
 	if (tctx->sqpoll) {
 		struct file *file;
 		unsigned long index;
@@ -8933,53 +8904,9 @@
 
 	atomic_dec(&tctx->in_idle);
 
-	io_uring_remove_task_files(tctx);
-}
-
-static int io_uring_flush(struct file *file, void *data)
-{
-	struct io_uring_task *tctx = current->io_uring;
-	struct io_ring_ctx *ctx = file->private_data;
-
-	/* Ignore helper thread files exit */
-	if (current->flags & PF_IO_WORKER)
-		return 0;
-
-	if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
-		io_uring_cancel_task_requests(ctx, NULL);
-		io_req_caches_free(ctx, current);
-	}
-
-	io_run_ctx_fallback(ctx);
-
-	if (!tctx)
-		return 0;
-
-	/* we should have cancelled and erased it before PF_EXITING */
-	WARN_ON_ONCE((current->flags & PF_EXITING) &&
-		     xa_load(&tctx->xa, (unsigned long)file));
-
-	/*
-	 * fput() is pending, will be 2 if the only other ref is our potential
-	 * task file note. If the task is exiting, drop regardless of count.
-	 */
-	if (atomic_long_read(&file->f_count) != 2)
-		return 0;
-
-	if (ctx->flags & IORING_SETUP_SQPOLL) {
-		/* there is only one file note, which is owned by sqo_task */
-		WARN_ON_ONCE(ctx->sqo_task != current &&
-			     xa_load(&tctx->xa, (unsigned long)file));
-		/* sqo_dead check is for when this happens after cancellation */
-		WARN_ON_ONCE(ctx->sqo_task == current && !ctx->sqo_dead &&
-			     !xa_load(&tctx->xa, (unsigned long)file));
-
-		io_disable_sqo_submit(ctx);
-	}
-
-	if (!(ctx->flags & IORING_SETUP_SQPOLL) || ctx->sqo_task == current)
-		io_uring_del_task_file(file);
-	return 0;
+	io_uring_clean_tctx(tctx);
+	/* all current's requests should be gone, we can kill tctx */
+	__io_uring_free(current);
 }
 
 static void *io_uring_validate_mmap_request(struct file *file,
@@ -9060,22 +8987,14 @@
 	do {
 		if (!io_sqring_full(ctx))
 			break;
-
 		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);
 
-		if (unlikely(ctx->sqo_dead)) {
-			ret = -EOWNERDEAD;
-			goto out;
-		}
-
 		if (!io_sqring_full(ctx))
 			break;
-
 		schedule();
 	} while (!signal_pending(current));
 
 	finish_wait(&ctx->sqo_sq_wait, &wait);
-out:
 	return ret;
 }
 
@@ -9157,8 +9076,6 @@
 			ctx->sqo_exec = 0;
 		}
 		ret = -EOWNERDEAD;
-		if (unlikely(ctx->sqo_dead))
-			goto out;
 		if (flags & IORING_ENTER_SQ_WAKEUP)
 			wake_up(&ctx->sq_data->wait);
 		if (flags & IORING_ENTER_SQ_WAIT) {
@@ -9313,7 +9230,6 @@
 
 static const struct file_operations io_uring_fops = {
 	.release	= io_uring_release,
-	.flush		= io_uring_flush,
 	.mmap		= io_uring_mmap,
 #ifndef CONFIG_MMU
 	.get_unmapped_area = io_uring_nommu_get_unmapped_area,
@@ -9468,7 +9384,6 @@
 	ctx->compat = in_compat_syscall();
 	if (!capable(CAP_IPC_LOCK))
 		ctx->user = get_uid(current_user());
-	ctx->sqo_task = current;
 
 	/*
 	 * This is just grabbed for accounting purposes. When a process exits,
@@ -9531,7 +9446,6 @@
 	 */
 	ret = io_uring_install_fd(ctx, file);
 	if (ret < 0) {
-		io_disable_sqo_submit(ctx);
 		/* fput will clean it up */
 		fput(file);
 		return ret;
@@ -9540,7 +9454,6 @@
 	trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
 	return ret;
 err:
-	io_disable_sqo_submit(ctx);
 	io_ring_ctx_wait_and_kill(ctx);
 	return ret;
 }
@@ -9708,10 +9621,7 @@
 	if (ctx->restrictions.registered)
 		ctx->restricted = 1;
 
-	ctx->flags &= ~IORING_SETUP_R_DISABLED;
-
 	io_sq_offload_start(ctx);
-
 	return 0;
 }
 
diff --git a/include/linux/io_uring.h b/include/linux/io_uring.h
index 51ede771..7cb7bd0 100644
--- a/include/linux/io_uring.h
+++ b/include/linux/io_uring.h
@@ -38,7 +38,7 @@
 
 static inline void io_uring_task_cancel(void)
 {
-	if (current->io_uring && !xa_empty(&current->io_uring->xa))
+	if (current->io_uring)
 		__io_uring_task_cancel();
 }
 static inline void io_uring_files_cancel(struct files_struct *files)
diff --git a/include/linux/sched/task.h b/include/linux/sched/task.h
index c0f71f2..ef02be8 100644
--- a/include/linux/sched/task.h
+++ b/include/linux/sched/task.h
@@ -31,6 +31,7 @@
 	/* Number of elements in *set_tid */
 	size_t set_tid_size;
 	int cgroup;
+	int io_thread;
 	struct cgroup *cgrp;
 	struct css_set *cset;
 };
@@ -82,6 +83,7 @@
 extern void exit_itimers(struct signal_struct *);
 
 extern pid_t kernel_clone(struct kernel_clone_args *kargs);
+struct task_struct *create_io_thread(int (*fn)(void *), void *arg, int node);
 struct task_struct *fork_idle(int);
 struct mm_struct *copy_init_mm(void);
 extern pid_t kernel_thread(int (*fn)(void *), void *arg, unsigned long flags);
diff --git a/kernel/fork.c b/kernel/fork.c
index d66cd10..d3171e8 100644
--- a/kernel/fork.c
+++ b/kernel/fork.c
@@ -1940,6 +1940,8 @@
 	p = dup_task_struct(current, node);
 	if (!p)
 		goto fork_out;
+	if (args->io_thread)
+		p->flags |= PF_IO_WORKER;
 
 	/*
 	 * This _must_ happen before we call free_task(), i.e. before we jump
@@ -2411,6 +2413,34 @@
 }
 
 /*
+ * This is like kernel_clone(), but shaved down and tailored to just
+ * creating io_uring workers. It returns a created task, or an error pointer.
+ * The returned task is inactive, and the caller must fire it up through
+ * wake_up_new_task(p). All signals are blocked in the created task.
+ */
+struct task_struct *create_io_thread(int (*fn)(void *), void *arg, int node)
+{
+	unsigned long flags = CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|
+				CLONE_IO;
+	struct kernel_clone_args args = {
+		.flags		= ((lower_32_bits(flags) | CLONE_VM |
+				    CLONE_UNTRACED) & ~CSIGNAL),
+		.exit_signal	= (lower_32_bits(flags) & CSIGNAL),
+		.stack		= (unsigned long)fn,
+		.stack_size	= (unsigned long)arg,
+		.io_thread	= 1,
+	};
+	struct task_struct *tsk;
+
+	tsk = copy_process(NULL, 0, node, &args);
+	if (!IS_ERR(tsk)) {
+		sigfillset(&tsk->blocked);
+		sigdelsetmask(&tsk->blocked, sigmask(SIGKILL));
+	}
+	return tsk;
+}
+
+/*
  *  Ok, this is the main fork-routine.
  *
  * It copies the process, and if successful kick-starts