diff --git a/fs/xfs/xfs_discard.c b/fs/xfs/xfs_discard.c
index afc4c78..d578799 100644
--- a/fs/xfs/xfs_discard.c
+++ b/fs/xfs/xfs_discard.c
@@ -1,6 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0
 /*
- * Copyright (C) 2010 Red Hat, Inc.
+ * Copyright (C) 2010, 2023 Red Hat, Inc.
  * All Rights Reserved.
  */
 #include "xfs.h"
@@ -19,21 +19,147 @@
 #include "xfs_log.h"
 #include "xfs_ag.h"
 
-STATIC int
-xfs_trim_extents(
+/*
+ * Notes on an efficient, low latency fstrim algorithm
+ *
+ * We need to walk the filesystem free space and issue discards on the free
+ * space that meet the search criteria (size and location). We cannot issue
+ * discards on extents that might be in use, or are so recently in use they are
+ * still marked as busy. To serialise against extent state changes whilst we are
+ * gathering extents to trim, we must hold the AGF lock to lock out other
+ * allocations and extent free operations that might change extent state.
+ *
+ * However, we cannot just hold the AGF for the entire AG free space walk whilst
+ * we issue discards on each free space that is found. Storage devices can have
+ * extremely slow discard implementations (e.g. ceph RBD) and so walking a
+ * couple of million free extents and issuing synchronous discards on each
+ * extent can take a *long* time. Whilst we are doing this walk, nothing else
+ * can access the AGF, and we can stall transactions and hence the log whilst
+ * modifications wait for the AGF lock to be released. This can lead hung tasks
+ * kicking the hung task timer and rebooting the system. This is bad.
+ *
+ * Hence we need to take a leaf from the bulkstat playbook. It takes the AGI
+ * lock, gathers a range of inode cluster buffers that are allocated, drops the
+ * AGI lock and then reads all the inode cluster buffers and processes them. It
+ * loops doing this, using a cursor to keep track of where it is up to in the AG
+ * for each iteration to restart the INOBT lookup from.
+ *
+ * We can't do this exactly with free space - once we drop the AGF lock, the
+ * state of the free extent is out of our control and we cannot run a discard
+ * safely on it in this situation. Unless, of course, we've marked the free
+ * extent as busy and undergoing a discard operation whilst we held the AGF
+ * locked.
+ *
+ * This is exactly how online discard works - free extents are marked busy when
+ * they are freed, and once the extent free has been committed to the journal,
+ * the busy extent record is marked as "undergoing discard" and the discard is
+ * then issued on the free extent. Once the discard completes, the busy extent
+ * record is removed and the extent is able to be allocated again.
+ *
+ * In the context of fstrim, if we find a free extent we need to discard, we
+ * don't have to discard it immediately. All we need to do it record that free
+ * extent as being busy and under discard, and all the allocation routines will
+ * now avoid trying to allocate it. Hence if we mark the extent as busy under
+ * the AGF lock, we can safely discard it without holding the AGF lock because
+ * nothing will attempt to allocate that free space until the discard completes.
+ *
+ * This also allows us to issue discards asynchronously like we do with online
+ * discard, and so for fast devices fstrim will run much faster as we can have
+ * multiple discard operations in flight at once, as well as pipeline the free
+ * extent search so that it overlaps in flight discard IO.
+ */
+
+struct workqueue_struct *xfs_discard_wq;
+
+static void
+xfs_discard_endio_work(
+	struct work_struct	*work)
+{
+	struct xfs_busy_extents	*extents =
+		container_of(work, struct xfs_busy_extents, endio_work);
+
+	xfs_extent_busy_clear(extents->mount, &extents->extent_list, false);
+	kmem_free(extents->owner);
+}
+
+/*
+ * Queue up the actual completion to a thread to avoid IRQ-safe locking for
+ * pagb_lock.
+ */
+static void
+xfs_discard_endio(
+	struct bio		*bio)
+{
+	struct xfs_busy_extents	*extents = bio->bi_private;
+
+	INIT_WORK(&extents->endio_work, xfs_discard_endio_work);
+	queue_work(xfs_discard_wq, &extents->endio_work);
+	bio_put(bio);
+}
+
+/*
+ * Walk the discard list and issue discards on all the busy extents in the
+ * list. We plug and chain the bios so that we only need a single completion
+ * call to clear all the busy extents once the discards are complete.
+ */
+int
+xfs_discard_extents(
+	struct xfs_mount	*mp,
+	struct xfs_busy_extents	*extents)
+{
+	struct xfs_extent_busy	*busyp;
+	struct bio		*bio = NULL;
+	struct blk_plug		plug;
+	int			error = 0;
+
+	blk_start_plug(&plug);
+	list_for_each_entry(busyp, &extents->extent_list, list) {
+		trace_xfs_discard_extent(mp, busyp->agno, busyp->bno,
+					 busyp->length);
+
+		error = __blkdev_issue_discard(mp->m_ddev_targp->bt_bdev,
+				XFS_AGB_TO_DADDR(mp, busyp->agno, busyp->bno),
+				XFS_FSB_TO_BB(mp, busyp->length),
+				GFP_NOFS, &bio);
+		if (error && error != -EOPNOTSUPP) {
+			xfs_info(mp,
+	 "discard failed for extent [0x%llx,%u], error %d",
+				 (unsigned long long)busyp->bno,
+				 busyp->length,
+				 error);
+			break;
+		}
+	}
+
+	if (bio) {
+		bio->bi_private = extents;
+		bio->bi_end_io = xfs_discard_endio;
+		submit_bio(bio);
+	} else {
+		xfs_discard_endio_work(&extents->endio_work);
+	}
+	blk_finish_plug(&plug);
+
+	return error;
+}
+
+
+static int
+xfs_trim_gather_extents(
 	struct xfs_perag	*pag,
 	xfs_daddr_t		start,
 	xfs_daddr_t		end,
 	xfs_daddr_t		minlen,
+	struct xfs_alloc_rec_incore *tcur,
+	struct xfs_busy_extents	*extents,
 	uint64_t		*blocks_trimmed)
 {
 	struct xfs_mount	*mp = pag->pag_mount;
-	struct block_device	*bdev = mp->m_ddev_targp->bt_bdev;
 	struct xfs_btree_cur	*cur;
 	struct xfs_buf		*agbp;
-	struct xfs_agf		*agf;
 	int			error;
 	int			i;
+	int			batch = 100;
 
 	/*
 	 * Force out the log.  This means any transactions that might have freed
@@ -45,20 +171,28 @@ xfs_trim_extents(
 	error = xfs_alloc_read_agf(pag, NULL, 0, &agbp);
 	if (error)
 		return error;
-	agf = agbp->b_addr;
 
 	cur = xfs_allocbt_init_cursor(mp, NULL, agbp, pag, XFS_BTNUM_CNT);
 
 	/*
-	 * Look up the longest btree in the AGF and start with it.
+	 * Look up the extent length requested in the AGF and start with it.
 	 */
-	error = xfs_alloc_lookup_ge(cur, 0, be32_to_cpu(agf->agf_longest), &i);
+	if (tcur->ar_startblock == NULLAGBLOCK)
+		error = xfs_alloc_lookup_ge(cur, 0, tcur->ar_blockcount, &i);
+	else
+		error = xfs_alloc_lookup_le(cur, tcur->ar_startblock,
+				tcur->ar_blockcount, &i);
 	if (error)
 		goto out_del_cursor;
+	if (i == 0) {
+		/* nothing of that length left in the AG, we are done */
+		tcur->ar_blockcount = 0;
+		goto out_del_cursor;
+	}
 
 	/*
 	 * Loop until we are done with all extents that are large
-	 * enough to be worth discarding.
+	 * enough to be worth discarding or we hit batch limits.
 	 */
 	while (i) {
 		xfs_agblock_t	fbno;
@@ -73,7 +207,16 @@ xfs_trim_extents(
 			error = -EFSCORRUPTED;
 			break;
 		}
-		ASSERT(flen <= be32_to_cpu(agf->agf_longest));
+
+		if (--batch <= 0) {
+			/*
+			 * Update the cursor to point at this extent so we
+			 * restart the next batch from this extent.
+			 */
+			tcur->ar_startblock = fbno;
+			tcur->ar_blockcount = flen;
+			break;
+		}
 
 		/*
 		 * use daddr format for all range/len calculations as that is
@@ -88,6 +231,7 @@ xfs_trim_extents(
 		 */
 		if (dlen < minlen) {
 			trace_xfs_discard_toosmall(mp, pag->pag_agno, fbno, flen);
+			tcur->ar_blockcount = 0;
 			break;
 		}
 
@@ -110,29 +254,103 @@ xfs_trim_extents(
 			goto next_extent;
 		}
 
-		trace_xfs_discard_extent(mp, pag->pag_agno, fbno, flen);
-		error = blkdev_issue_discard(bdev, dbno, dlen, GFP_NOFS);
-		if (error)
-			break;
+		xfs_extent_busy_insert_discard(pag, fbno, flen,
+				&extents->extent_list);
 		*blocks_trimmed += flen;
-
 next_extent:
 		error = xfs_btree_decrement(cur, 0, &i);
 		if (error)
 			break;
 
-		if (fatal_signal_pending(current)) {
-			error = -ERESTARTSYS;
-			break;
-		}
+		/*
+		 * If there's no more records in the tree, we are done. Set the
+		 * cursor block count to 0 to indicate to the caller that there
+		 * is no more extents to search.
+		 */
+		if (i == 0)
+			tcur->ar_blockcount = 0;
 	}
 
+	/*
+	 * If there was an error, release all the gathered busy extents because
+	 * we aren't going to issue a discard on them any more.
+	 */
+	if (error)
+		xfs_extent_busy_clear(mp, &extents->extent_list, false);
 out_del_cursor:
 	xfs_btree_del_cursor(cur, error);
 	xfs_buf_relse(agbp);
 	return error;
 }
 
+static bool
+xfs_trim_should_stop(void)
+{
+	return fatal_signal_pending(current) || freezing(current);
+}
+
+/*
+ * Iterate the free list gathering extents and discarding them. We need a cursor
+ * for the repeated iteration of gather/discard loop, so use the longest extent
+ * we found in the last batch as the key to start the next.
+ */
+static int
+xfs_trim_extents(
+	struct xfs_perag	*pag,
+	xfs_daddr_t		start,
+	xfs_daddr_t		end,
+	xfs_daddr_t		minlen,
+	uint64_t		*blocks_trimmed)
+{
+	struct xfs_alloc_rec_incore tcur = {
+		.ar_blockcount = pag->pagf_longest,
+		.ar_startblock = NULLAGBLOCK,
+	};
+	int			error = 0;
+
+	do {
+		struct xfs_busy_extents	*extents;
+
+		extents = kzalloc(sizeof(*extents), GFP_KERNEL);
+		if (!extents) {
+			error = -ENOMEM;
+			break;
+		}
+
+		extents->mount = pag->pag_mount;
+		extents->owner = extents;
+		INIT_LIST_HEAD(&extents->extent_list);
+
+		error = xfs_trim_gather_extents(pag, start, end, minlen,
+				&tcur, extents, blocks_trimmed);
+		if (error) {
+			kfree(extents);
+			break;
+		}
+
+		/*
+		 * We hand the extent list to the discard function here so the
+		 * discarded extents can be removed from the busy extent list.
+		 * This allows the discards to run asynchronously with gathering
+		 * the next round of extents to discard.
+		 *
+		 * However, we must ensure that we do not reference the extent
+		 * list  after this function call, as it may have been freed by
+		 * the time control returns to us.
+		 */
+		error = xfs_discard_extents(pag->pag_mount, extents);
+		if (error)
+			break;
+
+		if (xfs_trim_should_stop())
+			break;
+
+	} while (tcur.ar_blockcount != 0);
+
+	return error;
+
+}
+
 /*
  * trim a range of the filesystem.
  *
@@ -195,12 +413,12 @@ xfs_ioc_trim(
 	for_each_perag_range(mp, agno, xfs_daddr_to_agno(mp, end), pag) {
 		error = xfs_trim_extents(pag, start, end, minlen,
 					  &blocks_trimmed);
-		if (error) {
+		if (error)
 			last_error = error;
-			if (error == -ERESTARTSYS) {
-				xfs_perag_rele(pag);
-				break;
-			}
+
+		if (xfs_trim_should_stop()) {
+			xfs_perag_rele(pag);
+			break;
 		}
 	}
 
diff --git a/fs/xfs/xfs_discard.h b/fs/xfs/xfs_discard.h
index de92d9c..2b1a852 100644
--- a/fs/xfs/xfs_discard.h
+++ b/fs/xfs/xfs_discard.h
@@ -3,8 +3,10 @@
 #define XFS_DISCARD_H 1
 
 struct fstrim_range;
-struct list_head;
+struct xfs_mount;
+struct xfs_busy_extents;
 
-extern int	xfs_ioc_trim(struct xfs_mount *, struct fstrim_range __user *);
+int xfs_discard_extents(struct xfs_mount *mp, struct xfs_busy_extents *busy);
+int xfs_ioc_trim(struct xfs_mount *mp, struct fstrim_range __user *fstrim);
 
 #endif /* XFS_DISCARD_H */
diff --git a/fs/xfs/xfs_extent_busy.c b/fs/xfs/xfs_extent_busy.c
index 7c2fdc7..7468148 100644
--- a/fs/xfs/xfs_extent_busy.c
+++ b/fs/xfs/xfs_extent_busy.c
@@ -19,13 +19,13 @@
 #include "xfs_log.h"
 #include "xfs_ag.h"
 
-void
-xfs_extent_busy_insert(
-	struct xfs_trans	*tp,
+static void
+xfs_extent_busy_insert_list(
 	struct xfs_perag	*pag,
 	xfs_agblock_t		bno,
 	xfs_extlen_t		len,
-	unsigned int		flags)
+	unsigned int		flags,
+	struct list_head	*busy_list)
 {
 	struct xfs_extent_busy	*new;
 	struct xfs_extent_busy	*busyp;
@@ -40,7 +40,7 @@ xfs_extent_busy_insert(
 	new->flags = flags;
 
 	/* trace before insert to be able to see failed inserts */
-	trace_xfs_extent_busy(tp->t_mountp, pag->pag_agno, bno, len);
+	trace_xfs_extent_busy(pag->pag_mount, pag->pag_agno, bno, len);
 
 	spin_lock(&pag->pagb_lock);
 	rbp = &pag->pagb_tree.rb_node;
@@ -62,10 +62,32 @@ xfs_extent_busy_insert(
 	rb_link_node(&new->rb_node, parent, rbp);
 	rb_insert_color(&new->rb_node, &pag->pagb_tree);
 
-	list_add(&new->list, &tp->t_busy);
+	list_add(&new->list, busy_list);
 	spin_unlock(&pag->pagb_lock);
 }
 
+void
+xfs_extent_busy_insert(
+	struct xfs_trans	*tp,
+	struct xfs_perag	*pag,
+	xfs_agblock_t		bno,
+	xfs_extlen_t		len,
+	unsigned int		flags)
+{
+	xfs_extent_busy_insert_list(pag, bno, len, flags, &tp->t_busy);
+}
+
+void
+xfs_extent_busy_insert_discard(
+	struct xfs_perag	*pag,
+	xfs_agblock_t		bno,
+	xfs_extlen_t		len,
+	struct list_head	*busy_list)
+{
+	xfs_extent_busy_insert_list(pag, bno, len, XFS_EXTENT_BUSY_DISCARDED,
+			busy_list);
+}
+
 /*
  * Search for a busy extent within the range of the extent we are about to
  * allocate.  You need to be holding the busy extent tree lock when calling
diff --git a/fs/xfs/xfs_extent_busy.h b/fs/xfs/xfs_extent_busy.h
index c37bf87..0639aab 100644
--- a/fs/xfs/xfs_extent_busy.h
+++ b/fs/xfs/xfs_extent_busy.h
@@ -16,9 +16,6 @@ struct xfs_alloc_arg;
 /*
  * Busy block/extent entry.  Indexed by a rbtree in perag to mark blocks that
  * have been freed but whose transactions aren't committed to disk yet.
- *
- * Note that we use the transaction ID to record the transaction, not the
- * transaction structure itself. See xfs_extent_busy_insert() for details.
  */
 struct xfs_extent_busy {
 	struct rb_node	rb_node;	/* ag by-bno indexed search tree */
@@ -31,11 +28,32 @@ struct xfs_extent_busy {
 #define XFS_EXTENT_BUSY_SKIP_DISCARD	0x02	/* do not discard */
 };
 
+/*
+ * List used to track groups of related busy extents all the way through
+ * to discard completion.
+ */
+struct xfs_busy_extents {
+	struct xfs_mount	*mount;
+	struct list_head	extent_list;
+	struct work_struct	endio_work;
+
+	/*
+	 * Owner is the object containing the struct xfs_busy_extents to free
+	 * once the busy extents have been processed. If only the
+	 * xfs_busy_extents object needs freeing, then point this at itself.
+	 */
+	void			*owner;
+};
+
 void
 xfs_extent_busy_insert(struct xfs_trans *tp, struct xfs_perag *pag,
 	xfs_agblock_t bno, xfs_extlen_t len, unsigned int flags);
 
 void
+xfs_extent_busy_insert_discard(struct xfs_perag *pag, xfs_agblock_t bno,
+	xfs_extlen_t len, struct list_head *busy_list);
+
+void
 xfs_extent_busy_clear(struct xfs_mount *mp, struct list_head *list,
 	bool do_discard);
 
diff --git a/fs/xfs/xfs_log_cil.c b/fs/xfs/xfs_log_cil.c
index ebc70aa..67a99d9 100644
--- a/fs/xfs/xfs_log_cil.c
+++ b/fs/xfs/xfs_log_cil.c
@@ -16,8 +16,7 @@
 #include "xfs_log.h"
 #include "xfs_log_priv.h"
 #include "xfs_trace.h"
-
-struct workqueue_struct *xfs_discard_wq;
+#include "xfs_discard.h"
 
 /*
  * Allocate a new ticket. Failing to get a new ticket makes it really hard to
@@ -103,7 +102,7 @@ xlog_cil_ctx_alloc(void)
 
 	ctx = kmem_zalloc(sizeof(*ctx), KM_NOFS);
 	INIT_LIST_HEAD(&ctx->committing);
-	INIT_LIST_HEAD(&ctx->busy_extents);
+	INIT_LIST_HEAD(&ctx->busy_extents.extent_list);
 	INIT_LIST_HEAD(&ctx->log_items);
 	INIT_LIST_HEAD(&ctx->lv_chain);
 	INIT_WORK(&ctx->push_work, xlog_cil_push_work);
@@ -132,7 +131,7 @@ xlog_cil_push_pcp_aggregate(
 
 		if (!list_empty(&cilpcp->busy_extents)) {
 			list_splice_init(&cilpcp->busy_extents,
-					&ctx->busy_extents);
+					&ctx->busy_extents.extent_list);
 		}
 		if (!list_empty(&cilpcp->log_items))
 			list_splice_init(&cilpcp->log_items, &ctx->log_items);
@@ -708,76 +707,6 @@ xlog_cil_free_logvec(
 	}
 }
 
-static void
-xlog_discard_endio_work(
-	struct work_struct	*work)
-{
-	struct xfs_cil_ctx	*ctx =
-		container_of(work, struct xfs_cil_ctx, discard_endio_work);
-	struct xfs_mount	*mp = ctx->cil->xc_log->l_mp;
-
-	xfs_extent_busy_clear(mp, &ctx->busy_extents, false);
-	kmem_free(ctx);
-}
-
-/*
- * Queue up the actual completion to a thread to avoid IRQ-safe locking for
- * pagb_lock.  Note that we need a unbounded workqueue, otherwise we might
- * get the execution delayed up to 30 seconds for weird reasons.
- */
-static void
-xlog_discard_endio(
-	struct bio		*bio)
-{
-	struct xfs_cil_ctx	*ctx = bio->bi_private;
-
-	INIT_WORK(&ctx->discard_endio_work, xlog_discard_endio_work);
-	queue_work(xfs_discard_wq, &ctx->discard_endio_work);
-	bio_put(bio);
-}
-
-static void
-xlog_discard_busy_extents(
-	struct xfs_mount	*mp,
-	struct xfs_cil_ctx	*ctx)
-{
-	struct list_head	*list = &ctx->busy_extents;
-	struct xfs_extent_busy	*busyp;
-	struct bio		*bio = NULL;
-	struct blk_plug		plug;
-	int			error = 0;
-
-	ASSERT(xfs_has_discard(mp));
-
-	blk_start_plug(&plug);
-	list_for_each_entry(busyp, list, list) {
-		trace_xfs_discard_extent(mp, busyp->agno, busyp->bno,
-					 busyp->length);
-
-		error = __blkdev_issue_discard(mp->m_ddev_targp->bt_bdev,
-				XFS_AGB_TO_DADDR(mp, busyp->agno, busyp->bno),
-				XFS_FSB_TO_BB(mp, busyp->length),
-				GFP_NOFS, &bio);
-		if (error && error != -EOPNOTSUPP) {
-			xfs_info(mp,
-	 "discard failed for extent [0x%llx,%u], error %d",
-				 (unsigned long long)busyp->bno,
-				 busyp->length,
-				 error);
-			break;
-		}
-	}
-
-	if (bio) {
-		bio->bi_private = ctx;
-		bio->bi_end_io = xlog_discard_endio;
-		submit_bio(bio);
-	} else {
-		xlog_discard_endio_work(&ctx->discard_endio_work);
-	}
-	blk_finish_plug(&plug);
-}
-
 /*
  * Mark all items committed and clear busy extents. We free the log vector
  * chains in a separate pass so that we unpin the log items as quickly as
@@ -807,8 +736,8 @@ xlog_cil_committed(
 	xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, &ctx->lv_chain,
 					ctx->start_lsn, abort);
 
-	xfs_extent_busy_sort(&ctx->busy_extents);
-	xfs_extent_busy_clear(mp, &ctx->busy_extents,
+	xfs_extent_busy_sort(&ctx->busy_extents.extent_list);
+	xfs_extent_busy_clear(mp, &ctx->busy_extents.extent_list,
 			      xfs_has_discard(mp) && !abort);
 
 	spin_lock(&ctx->cil->xc_push_lock);
@@ -817,10 +746,14 @@ xlog_cil_committed(
 
 	xlog_cil_free_logvec(&ctx->lv_chain);
 
-	if (!list_empty(&ctx->busy_extents))
-		xlog_discard_busy_extents(mp, ctx);
-	else
-		kmem_free(ctx);
+	if (!list_empty(&ctx->busy_extents.extent_list)) {
+		ctx->busy_extents.mount = mp;
+		ctx->busy_extents.owner = ctx;
+		xfs_discard_extents(mp, &ctx->busy_extents);
+		return;
+	}
+
+	kmem_free(ctx);
 }
 
 void
diff --git a/fs/xfs/xfs_log_priv.h b/fs/xfs/xfs_log_priv.h
index af87648..fa3ad1d 100644
--- a/fs/xfs/xfs_log_priv.h
+++ b/fs/xfs/xfs_log_priv.h
@@ -6,6 +6,8 @@
 #ifndef	__XFS_LOG_PRIV_H__
 #define __XFS_LOG_PRIV_H__
 
+#include "xfs_extent_busy.h"	/* for struct xfs_busy_extents */
+
 struct xfs_buf;
 struct xlog;
 struct xlog_ticket;
@@ -223,12 +225,11 @@ struct xfs_cil_ctx {
 	struct xlog_in_core	*commit_iclog;
 	struct xlog_ticket	*ticket;	/* chkpt ticket */
 	atomic_t		space_used;	/* aggregate size of regions */
-	struct list_head	busy_extents;	/* busy extents in chkpt */
+	struct xfs_busy_extents	busy_extents;
 	struct list_head	log_items;	/* log items in chkpt */
 	struct list_head	lv_chain;	/* logvecs being pushed */
 	struct list_head	iclog_entry;
 	struct list_head	committing;	/* ctx committing list */
-	struct work_struct	discard_endio_work;
 	struct work_struct	push_work;
 	atomic_t		order_id;
 
