aio: avoid extra ctx->poll_lock grab if polling is irq-less

Add a task_struct member, aio_task_data, which can be used to
stash completion events. If polled IO completions are never done
from an interrupt, this is where all events will wind up. With
that, we can avoid an extra grab of ctx->poll_lock when reaping
events off the io_getevents() path.
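
If polled completions never come from an interrupt, the
io_getevents() side boils down to roughly this (condensed from the
reaping change further down; error handling and the poll_done
fallback are left out):

  struct aio_task_data aio_task_data = {
          .list = LIST_HEAD_INIT(aio_task_data.list),
          .ctx  = ctx,
  };

  /* route this task's polled completions to our private list */
  current->aio_task_data = &aio_task_data;
  aio_poll_for_events(ctx, pd, nr_pd, off, min, max);

  /* the list is only ever touched by us, so no ctx->poll_lock needed */
  ret = aio_task_poll_find(ctx, event, off, min, max);
  current->aio_task_data = NULL;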

The exception is when multiple io_contexts are banging on the
same device, in which case we could be finding events that are
not for us. If that happens, we just fall back to stashing
completion events on the ctx->poll_done list, like we did before
(and still do for IRQ driven event completions).
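
The completion side then picks the destination list like this
(again condensed from the diff below):

  aio_task_data = current->aio_task_data;

  spin_lock_irqsave(&ctx->poll_lock, flags);
  if (aio_task_data && aio_task_data->ctx == ctx) {
          /* found by the task polling this ctx, stash it privately */
          list_move_tail(&iocb->ki_poll_list, &aio_task_data->list);
  } else {
          /* IRQ completion, or a poller working some other ctx */
          list_move_tail(&iocb->ki_poll_list, &ctx->poll_done);
  }
  spin_unlock_irqrestore(&ctx->poll_lock, flags);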

Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/fs/aio.c b/fs/aio.c
index e02085f..2053b1b 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1331,15 +1331,16 @@
  * Migh return with data->iocbs holding entries, in which case
  * data->to_free is non-zero and the caller should free them.
  */
-static long aio_poll_reap(struct kioctx *ctx, struct aio_poll_data *data)
-	__releases(&ctx->poll_lock)
-	__acquires(&ctx->poll_lock)
+static long aio_poll_reap(struct kioctx *ctx, struct aio_poll_data *data,
+			  struct list_head *list, spinlock_t *lock)
+	__releases(lock)
+	__acquires(lock)
 {
 	struct aio_kiocb *iocb;
 	int ret, nr = 0;
 
 restart:
-	while (!list_empty(&ctx->poll_done)) {
+	while (!list_empty(list)) {
 		struct io_event __user *uev;
 		struct io_event ev;
 
@@ -1348,8 +1349,7 @@
 			data->to_free = 0;
 		}
 
-		iocb = list_first_entry(&ctx->poll_done, struct aio_kiocb,
-						ki_poll_list);
+		iocb = list_first_entry(list, struct aio_kiocb, ki_poll_list);
 		list_del(&iocb->ki_poll_list);
 
 		data->iocbs[data->to_free++] = iocb;
@@ -1376,9 +1376,14 @@
 		 * also fails, we're done. If it worked, we got another event
 		 * and we restart the list check since we dropped the lock.
 		 */
-		spin_unlock_irq(&ctx->poll_lock);
+		if (lock)
+			spin_unlock_irq(lock);
+
 		ret = copy_to_user(uev, &ev, sizeof(*uev));
-		spin_lock_irq(&ctx->poll_lock);
+
+		if (lock)
+			spin_lock_irq(lock);
+
 		if (!ret) {
 			nr++;
 			if (nr + data->off < data->max)
@@ -1396,10 +1401,10 @@
 }
 
 /*
- * Reap done events, if any
+ * Reap done events on our ctx, if any
  */
-static long aio_poll_find(struct kioctx *ctx, struct io_event __user *evs,
-			  int off, long max)
+static long aio_ctx_poll_find(struct kioctx *ctx, struct io_event __user *evs,
+			      int off, long max)
 {
 	struct aio_poll_data data = {
 		.evs		= evs,
@@ -1413,7 +1418,7 @@
 		return 0;
 
 	spin_lock_irq(&ctx->poll_lock);
-	ret = aio_poll_reap(ctx, &data);
+	ret = aio_poll_reap(ctx, &data, &ctx->poll_done, &ctx->poll_lock);
 	spin_unlock_irq(&ctx->poll_lock);
 
 	if (data.to_free)
@@ -1422,9 +1427,41 @@
 	return ret;
 }
 
+struct aio_task_data {
+	struct list_head list;
+	struct kioctx *ctx;
+};
+
+/*
+ * See if we have any completed events stashed on our task's aio_task_data.
+ */
+static int aio_task_poll_find(struct kioctx *ctx, struct io_event __user *event,
+			      int off, long min, long max)
+{
+	struct aio_task_data *aio_task_data = current->aio_task_data;
+	struct aio_poll_data data = {
+		.evs		= event,
+		.off		= off,
+		.max		= max,
+		.to_free	= 0
+	};
+	int ret;
+
+	if (!aio_task_data || list_empty(&aio_task_data->list))
+		return 0;
+
+	ret = aio_poll_reap(ctx, &data, &aio_task_data->list, NULL);
+
+	if (data.to_free)
+		iocb_put_many(ctx, data.iocbs, data.to_free);
+
+	return ret;
+}
+
 static void aio_poll_for_events(struct kioctx *ctx, struct aio_iopoll_data *pd,
 				unsigned int nr_pd, int off, long min, long max)
 {
+	struct aio_task_data *aio_task_data = current->aio_task_data;
 	int i, polled = 0;
 
 	/*
@@ -1441,7 +1478,8 @@
 		/*
 		 * If we have entries waiting to be reaped, stop polling
 		 */
-		if (!list_empty_careful(&ctx->poll_done))
+		if (!list_empty_careful(&ctx->poll_done) ||
+		    (aio_task_data && !list_empty(&aio_task_data->list)))
 			break;
 	}
 }
@@ -1470,7 +1508,7 @@
 			.to_free = 0
 		};
 
-		ret = aio_poll_reap(ctx, &data);
+		ret = aio_poll_reap(ctx, &data, &ctx->poll_done, &ctx->poll_lock);
 		if (!ret)
 			break;
 		else if (ret < 0 || ret + off >= min) {
@@ -1515,12 +1553,28 @@
 	spin_unlock_irq(&ctx->poll_lock);
 
 	if (nr_pd) {
+		struct aio_task_data aio_task_data = {
+			.list = LIST_HEAD_INIT(aio_task_data.list),
+			.ctx = ctx,
+		};
+
 		*entries = nr_pd;
+
+		current->aio_task_data = &aio_task_data;
 		aio_poll_for_events(ctx, pd, nr_pd, off, min, max);
+		ret = aio_task_poll_find(ctx, event, off, min, max);
+		current->aio_task_data = NULL;
+
+		if (ret <= 0)
+			goto done;
+
+		pre = ret;
+		off += ret;
 	}
 
 out:
-	ret = aio_poll_find(ctx, event, off, max);
+	ret = aio_ctx_poll_find(ctx, event, off, max);
+done:
 	if (ret >= 0)
 		return pre + ret;
 	else if (pre)
@@ -1763,6 +1817,7 @@
 	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
 	struct kioctx *ctx = iocb->ki_ctx;
 	struct file *filp = kiocb->ki_filp;
+	struct aio_task_data *aio_task_data;
 	unsigned long flags;
 
 	kiocb_end_write(kiocb);
@@ -1770,8 +1825,12 @@
 	iocb->ki_poll_res = res;
 	iocb->ki_poll_res2 = res2;
 
+	aio_task_data = current->aio_task_data;
 	spin_lock_irqsave(&ctx->poll_lock, flags);
-	list_move_tail(&iocb->ki_poll_list, &ctx->poll_done);
+	if (aio_task_data && aio_task_data->ctx == ctx)
+		list_move_tail(&iocb->ki_poll_list, &aio_task_data->list);
+	else
+		list_move_tail(&iocb->ki_poll_list, &ctx->poll_done);
 	spin_unlock_irqrestore(&ctx->poll_lock, flags);
 
 	fput(filp);
diff --git a/include/linux/sched.h b/include/linux/sched.h
index a51c13c..ca2aedf 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -35,6 +35,7 @@
 struct backing_dev_info;
 struct bio_list;
 struct blk_plug;
+struct aio_task_data;
 struct cfs_rq;
 struct fs_struct;
 struct futex_pi_state;
@@ -955,6 +956,8 @@
 	struct blk_plug			*plug;
 #endif
 
+	struct aio_task_data		*aio_task_data;
+
 	/* VM state: */
 	struct reclaim_state		*reclaim_state;