Merge tag 'nfs-for-5.3-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs

Pull NFS client updates from Trond Myklebust:
 "Highlights include:

  Stable fixes:

   - SUNRPC: Ensure bvecs are re-synced when we re-encode the RPC
     request

   - Fix an Oops in ff_layout_track_ds_error due to a PTR_ERR()
     dereference

   - Revert buggy NFS readdirplus optimisation

   - NFSv4: Handle the special Linux file open access mode

   - pnfs: Fix a problem where we gratuitously start doing I/O through
     the MDS

  Features:

   - Allow NFS client to set up multiple TCP connections to the server
     using a new 'nconnect=X' mount option. Queue length is used to
     balance load.

   - Enhance statistics reporting to report on all transports when using
     multiple connections.

   - Speed up SUNRPC by removing bh-safe spinlocks

   - Add a mechanism to allow NFSv4 to request that containers set a
     unique per-host identifier for when the hostname is not set.

   - Ensure NFSv4 updates the lease_time after a clientid update

  Bugfixes and cleanup:

   - Fix use-after-free in rpcrdma_post_recvs

   - Fix a memory leak when nfs_match_client() is interrupted

   - Fix buggy file access checking in NFSv4 open for execute

   - disable unsupported client side deduplication

   - Fix spurious client disconnections

   - Fix occasional RDMA transport deadlock

   - Various RDMA cleanups

   - Various tracepoint fixes

   - Fix the TCP callback channel to guarantee the server can actually
     send the number of callback requests that was negotiated at mount
     time"

* tag 'nfs-for-5.3-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (68 commits)
  pnfs/flexfiles: Add tracepoints for detecting pnfs fallback to MDS
  pnfs: Fix a problem where we gratuitously start doing I/O through the MDS
  SUNRPC: Optimise transport balancing code
  SUNRPC: Ensure the bvecs are reset when we re-encode the RPC request
  pnfs/flexfiles: Fix PTR_ERR() dereferences in ff_layout_track_ds_error
  NFSv4: Don't use the zero stateid with layoutget
  SUNRPC: Fix up backchannel slot table accounting
  SUNRPC: Fix initialisation of struct rpc_xprt_switch
  SUNRPC: Skip zero-refcount transports
  SUNRPC: Replace division by multiplication in calculation of queue length
  NFSv4: Validate the stateid before applying it to state recovery
  nfs4.0: Refetch lease_time after clientid update
  nfs4: Rename nfs41_setup_state_renewal
  nfs4: Make nfs4_proc_get_lease_time available for nfs4.0
  nfs: Fix copy-and-paste error in debug message
  NFS: Replace 16 seq_printf() calls by seq_puts()
  NFS: Use seq_putc() in nfs_show_stats()
  Revert "NFS: readdirplus optimization by cache mechanism" (memleak)
  SUNRPC: Fix transport accounting when caller specifies an rpc_xprt
  NFS: Record task, client ID, and XID in xdr_status trace points
  ...
diff --git a/fs/nfs/Makefile b/fs/nfs/Makefile
index c587e3c..34cdeae 100644
--- a/fs/nfs/Makefile
+++ b/fs/nfs/Makefile
@@ -8,7 +8,8 @@
 CFLAGS_nfstrace.o += -I$(src)
 nfs-y 			:= client.o dir.o file.o getroot.o inode.o super.o \
 			   io.o direct.o pagelist.o read.o symlink.o unlink.o \
-			   write.o namespace.o mount_clnt.o nfstrace.o export.o
+			   write.o namespace.o mount_clnt.o nfstrace.o \
+			   export.o sysfs.o
 nfs-$(CONFIG_ROOT_NFS)	+= nfsroot.o
 nfs-$(CONFIG_SYSCTL)	+= sysctl.o
 nfs-$(CONFIG_NFS_FSCACHE) += fscache.o fscache-index.o
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index 3159673..f39924b 100644
--- a/fs/nfs/callback_proc.c
+++ b/fs/nfs/callback_proc.c
@@ -414,27 +414,39 @@
 validate_seqid(const struct nfs4_slot_table *tbl, const struct nfs4_slot *slot,
 		const struct cb_sequenceargs * args)
 {
+	__be32 ret;
+
+	ret = cpu_to_be32(NFS4ERR_BADSLOT);
 	if (args->csa_slotid > tbl->server_highest_slotid)
-		return htonl(NFS4ERR_BADSLOT);
+		goto out_err;
 
 	/* Replay */
 	if (args->csa_sequenceid == slot->seq_nr) {
+		ret = cpu_to_be32(NFS4ERR_DELAY);
 		if (nfs4_test_locked_slot(tbl, slot->slot_nr))
-			return htonl(NFS4ERR_DELAY);
+			goto out_err;
+
 		/* Signal process_op to set this error on next op */
+		ret = cpu_to_be32(NFS4ERR_RETRY_UNCACHED_REP);
 		if (args->csa_cachethis == 0)
-			return htonl(NFS4ERR_RETRY_UNCACHED_REP);
+			goto out_err;
 
 		/* Liar! We never allowed you to set csa_cachethis != 0 */
-		return htonl(NFS4ERR_SEQ_FALSE_RETRY);
+		ret = cpu_to_be32(NFS4ERR_SEQ_FALSE_RETRY);
+		goto out_err;
 	}
 
 	/* Note: wraparound relies on seq_nr being of type u32 */
-	if (likely(args->csa_sequenceid == slot->seq_nr + 1))
-		return htonl(NFS4_OK);
-
 	/* Misordered request */
-	return htonl(NFS4ERR_SEQ_MISORDERED);
+	ret = cpu_to_be32(NFS4ERR_SEQ_MISORDERED);
+	if (args->csa_sequenceid != slot->seq_nr + 1)
+		goto out_err;
+
+	return cpu_to_be32(NFS4_OK);
+
+out_err:
+	trace_nfs4_cb_seqid_err(args, ret);
+	return ret;
 }
 
 /*
diff --git a/fs/nfs/client.c b/fs/nfs/client.c
index d7e4f08..3083830 100644
--- a/fs/nfs/client.c
+++ b/fs/nfs/client.c
@@ -49,6 +49,7 @@
 #include "pnfs.h"
 #include "nfs.h"
 #include "netns.h"
+#include "sysfs.h"
 
 #define NFSDBG_FACILITY		NFSDBG_CLIENT
 
@@ -175,6 +176,7 @@
 	clp->cl_rpcclient = ERR_PTR(-EINVAL);
 
 	clp->cl_proto = cl_init->proto;
+	clp->cl_nconnect = cl_init->nconnect;
 	clp->cl_net = get_net(cl_init->net);
 
 	clp->cl_principal = "*";
@@ -192,7 +194,7 @@
 EXPORT_SYMBOL_GPL(nfs_alloc_client);
 
 #if IS_ENABLED(CONFIG_NFS_V4)
-void nfs_cleanup_cb_ident_idr(struct net *net)
+static void nfs_cleanup_cb_ident_idr(struct net *net)
 {
 	struct nfs_net *nn = net_generic(net, nfs_net_id);
 
@@ -214,7 +216,7 @@
 }
 
 #else
-void nfs_cleanup_cb_ident_idr(struct net *net)
+static void nfs_cleanup_cb_ident_idr(struct net *net)
 {
 }
 
@@ -406,10 +408,10 @@
 		clp = nfs_match_client(cl_init);
 		if (clp) {
 			spin_unlock(&nn->nfs_client_lock);
-			if (IS_ERR(clp))
-				return clp;
 			if (new)
 				new->rpc_ops->free_client(new);
+			if (IS_ERR(clp))
+				return clp;
 			return nfs_found_client(cl_init, clp);
 		}
 		if (new) {
@@ -493,6 +495,7 @@
 	struct rpc_create_args args = {
 		.net		= clp->cl_net,
 		.protocol	= clp->cl_proto,
+		.nconnect	= clp->cl_nconnect,
 		.address	= (struct sockaddr *)&clp->cl_addr,
 		.addrsize	= clp->cl_addrlen,
 		.timeout	= cl_init->timeparms,
@@ -658,6 +661,7 @@
 		.net = data->net,
 		.timeparms = &timeparms,
 		.cred = server->cred,
+		.nconnect = data->nfs_server.nconnect,
 	};
 	struct nfs_client *clp;
 	int error;
@@ -1072,6 +1076,18 @@
 #endif
 	spin_lock_init(&nn->nfs_client_lock);
 	nn->boot_time = ktime_get_real();
+
+	nfs_netns_sysfs_setup(nn, net);
+}
+
+void nfs_clients_exit(struct net *net)
+{
+	struct nfs_net *nn = net_generic(net, nfs_net_id);
+
+	nfs_netns_sysfs_destroy(nn);
+	nfs_cleanup_cb_ident_idr(net);
+	WARN_ON_ONCE(!list_empty(&nn->nfs_client_list));
+	WARN_ON_ONCE(!list_empty(&nn->nfs_volume_list));
 }
 
 #ifdef CONFIG_PROC_FS
diff --git a/fs/nfs/dir.c b/fs/nfs/dir.c
index 57b6a45..8d50109 100644
--- a/fs/nfs/dir.c
+++ b/fs/nfs/dir.c
@@ -80,6 +80,10 @@
 		ctx->dup_cookie = 0;
 		ctx->cred = get_cred(cred);
 		spin_lock(&dir->i_lock);
+		if (list_empty(&nfsi->open_files) &&
+		    (nfsi->cache_validity & NFS_INO_DATA_INVAL_DEFER))
+			nfsi->cache_validity |= NFS_INO_INVALID_DATA |
+				NFS_INO_REVAL_FORCED;
 		list_add(&ctx->list, &nfsi->open_files);
 		spin_unlock(&dir->i_lock);
 		return ctx;
@@ -140,19 +144,12 @@
 	struct nfs_cache_array_entry array[0];
 };
 
-struct readdirvec {
-	unsigned long nr;
-	unsigned long index;
-	struct page *pages[NFS_MAX_READDIR_RAPAGES];
-};
-
 typedef int (*decode_dirent_t)(struct xdr_stream *, struct nfs_entry *, bool);
 typedef struct {
 	struct file	*file;
 	struct page	*page;
 	struct dir_context *ctx;
 	unsigned long	page_index;
-	struct readdirvec pvec;
 	u64		*dir_cookie;
 	u64		last_cookie;
 	loff_t		current_index;
@@ -532,10 +529,6 @@
 	struct nfs_cache_array *array;
 	unsigned int count = 0;
 	int status;
-	int max_rapages = NFS_MAX_READDIR_RAPAGES;
-
-	desc->pvec.index = desc->page_index;
-	desc->pvec.nr = 0;
 
 	scratch = alloc_page(GFP_KERNEL);
 	if (scratch == NULL)
@@ -560,40 +553,20 @@
 		if (desc->plus)
 			nfs_prime_dcache(file_dentry(desc->file), entry);
 
-		status = nfs_readdir_add_to_array(entry, desc->pvec.pages[desc->pvec.nr]);
-		if (status == -ENOSPC) {
-			desc->pvec.nr++;
-			if (desc->pvec.nr == max_rapages)
-				break;
-			status = nfs_readdir_add_to_array(entry, desc->pvec.pages[desc->pvec.nr]);
-		}
+		status = nfs_readdir_add_to_array(entry, page);
 		if (status != 0)
 			break;
 	} while (!entry->eof);
 
-	/*
-	 * page and desc->pvec.pages[0] are valid, don't need to check
-	 * whether or not to be NULL.
-	 */
-	copy_highpage(page, desc->pvec.pages[0]);
-
 out_nopages:
 	if (count == 0 || (status == -EBADCOOKIE && entry->eof != 0)) {
-		array = kmap_atomic(desc->pvec.pages[desc->pvec.nr]);
+		array = kmap(page);
 		array->eof_index = array->size;
 		status = 0;
-		kunmap_atomic(array);
+		kunmap(page);
 	}
 
 	put_page(scratch);
-
-	/*
-	 * desc->pvec.nr > 0 means at least one page was completely filled,
-	 * we should return -ENOSPC. Otherwise function
-	 * nfs_readdir_xdr_to_array will enter infinite loop.
-	 */
-	if (desc->pvec.nr > 0)
-		return -ENOSPC;
 	return status;
 }
 
@@ -627,24 +600,6 @@
 	return -ENOMEM;
 }
 
-/*
- * nfs_readdir_rapages_init initialize rapages by nfs_cache_array structure.
- */
-static
-void nfs_readdir_rapages_init(nfs_readdir_descriptor_t *desc)
-{
-	struct nfs_cache_array *array;
-	int max_rapages = NFS_MAX_READDIR_RAPAGES;
-	int index;
-
-	for (index = 0; index < max_rapages; index++) {
-		array = kmap_atomic(desc->pvec.pages[index]);
-		memset(array, 0, sizeof(struct nfs_cache_array));
-		array->eof_index = -1;
-		kunmap_atomic(array);
-	}
-}
-
 static
 int nfs_readdir_xdr_to_array(nfs_readdir_descriptor_t *desc, struct page *page, struct inode *inode)
 {
@@ -655,12 +610,6 @@
 	int status = -ENOMEM;
 	unsigned int array_size = ARRAY_SIZE(pages);
 
-	/*
-	 * This means we hit readdir rdpages miss, the preallocated rdpages
-	 * are useless, the preallocate rdpages should be reinitialized.
-	 */
-	nfs_readdir_rapages_init(desc);
-
 	entry.prev_cookie = 0;
 	entry.cookie = desc->last_cookie;
 	entry.eof = 0;
@@ -721,24 +670,9 @@
 	struct inode	*inode = file_inode(desc->file);
 	int ret;
 
-	/*
-	 * If desc->page_index in range desc->pvec.index and
-	 * desc->pvec.index + desc->pvec.nr, we get readdir cache hit.
-	 */
-	if (desc->page_index >= desc->pvec.index &&
-		desc->page_index < (desc->pvec.index + desc->pvec.nr)) {
-		/*
-		 * page and desc->pvec.pages[x] are valid, don't need to check
-		 * whether or not to be NULL.
-		 */
-		copy_highpage(page, desc->pvec.pages[desc->page_index - desc->pvec.index]);
-		ret = 0;
-	} else {
-		ret = nfs_readdir_xdr_to_array(desc, page, inode);
-		if (ret < 0)
-			goto error;
-	}
-
+	ret = nfs_readdir_xdr_to_array(desc, page, inode);
+	if (ret < 0)
+		goto error;
 	SetPageUptodate(page);
 
 	if (invalidate_inode_pages2_range(inode->i_mapping, page->index + 1, -1) < 0) {
@@ -903,7 +837,6 @@
 			*desc = &my_desc;
 	struct nfs_open_dir_context *dir_ctx = file->private_data;
 	int res = 0;
-	int max_rapages = NFS_MAX_READDIR_RAPAGES;
 
 	dfprintk(FILE, "NFS: readdir(%pD2) starting at cookie %llu\n",
 			file, (long long)ctx->pos);
@@ -923,12 +856,6 @@
 	desc->decode = NFS_PROTO(inode)->decode_dirent;
 	desc->plus = nfs_use_readdirplus(inode, ctx);
 
-	res = nfs_readdir_alloc_pages(desc->pvec.pages, max_rapages);
-	if (res < 0)
-		return -ENOMEM;
-
-	nfs_readdir_rapages_init(desc);
-
 	if (ctx->pos == 0 || nfs_attribute_cache_expired(inode))
 		res = nfs_revalidate_mapping(inode, file->f_mapping);
 	if (res < 0)
@@ -964,7 +891,6 @@
 			break;
 	} while (!desc->eof);
 out:
-	nfs_readdir_free_pages(desc->pvec.pages, max_rapages);
 	if (res > 0)
 		res = 0;
 	dfprintk(FILE, "NFS: readdir(%pD2) returns %d\n", file, res);
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.c b/fs/nfs/flexfilelayout/flexfilelayout.c
index bcff3bf..b04e20d 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.c
+++ b/fs/nfs/flexfilelayout/flexfilelayout.c
@@ -934,6 +934,10 @@
 	if (pgio->pg_error < 0)
 		return;
 out_mds:
+	trace_pnfs_mds_fallback_pg_init_read(pgio->pg_inode,
+			0, NFS4_MAX_UINT64, IOMODE_READ,
+			NFS_I(pgio->pg_inode)->layout,
+			pgio->pg_lseg);
 	pnfs_put_lseg(pgio->pg_lseg);
 	pgio->pg_lseg = NULL;
 	nfs_pageio_reset_read_mds(pgio);
@@ -1000,6 +1004,10 @@
 	return;
 
 out_mds:
+	trace_pnfs_mds_fallback_pg_init_write(pgio->pg_inode,
+			0, NFS4_MAX_UINT64, IOMODE_RW,
+			NFS_I(pgio->pg_inode)->layout,
+			pgio->pg_lseg);
 	pnfs_put_lseg(pgio->pg_lseg);
 	pgio->pg_lseg = NULL;
 	nfs_pageio_reset_write_mds(pgio);
@@ -1026,6 +1034,10 @@
 	if (pgio->pg_lseg)
 		return FF_LAYOUT_MIRROR_COUNT(pgio->pg_lseg);
 
+	trace_pnfs_mds_fallback_pg_get_mirror_count(pgio->pg_inode,
+			0, NFS4_MAX_UINT64, IOMODE_RW,
+			NFS_I(pgio->pg_inode)->layout,
+			pgio->pg_lseg);
 	/* no lseg means that pnfs is not in use, so no mirroring here */
 	nfs_pageio_reset_write_mds(pgio);
 out:
@@ -1075,6 +1087,10 @@
 			hdr->args.count,
 			(unsigned long long)hdr->args.offset);
 
+		trace_pnfs_mds_fallback_write_done(hdr->inode,
+				hdr->args.offset, hdr->args.count,
+				IOMODE_RW, NFS_I(hdr->inode)->layout,
+				hdr->lseg);
 		task->tk_status = pnfs_write_done_resend_to_mds(hdr);
 	}
 }
@@ -1094,6 +1110,10 @@
 			hdr->args.count,
 			(unsigned long long)hdr->args.offset);
 
+		trace_pnfs_mds_fallback_read_done(hdr->inode,
+				hdr->args.offset, hdr->args.count,
+				IOMODE_READ, NFS_I(hdr->inode)->layout,
+				hdr->lseg);
 		task->tk_status = pnfs_read_done_resend_to_mds(hdr);
 	}
 }
@@ -1827,6 +1847,9 @@
 out_failed:
 	if (ff_layout_avoid_mds_available_ds(lseg))
 		return PNFS_TRY_AGAIN;
+	trace_pnfs_mds_fallback_read_pagelist(hdr->inode,
+			hdr->args.offset, hdr->args.count,
+			IOMODE_READ, NFS_I(hdr->inode)->layout, lseg);
 	return PNFS_NOT_ATTEMPTED;
 }
 
@@ -1892,6 +1915,9 @@
 out_failed:
 	if (ff_layout_avoid_mds_available_ds(lseg))
 		return PNFS_TRY_AGAIN;
+	trace_pnfs_mds_fallback_write_pagelist(hdr->inode,
+			hdr->args.offset, hdr->args.count,
+			IOMODE_RW, NFS_I(hdr->inode)->layout, lseg);
 	return PNFS_NOT_ATTEMPTED;
 }
 
diff --git a/fs/nfs/flexfilelayout/flexfilelayoutdev.c b/fs/nfs/flexfilelayout/flexfilelayoutdev.c
index 19f856f..3eda40a 100644
--- a/fs/nfs/flexfilelayout/flexfilelayoutdev.c
+++ b/fs/nfs/flexfilelayout/flexfilelayoutdev.c
@@ -257,7 +257,7 @@
 	if (status == 0)
 		return 0;
 
-	if (mirror->mirror_ds == NULL)
+	if (IS_ERR_OR_NULL(mirror->mirror_ds))
 		return -EINVAL;
 
 	dserr = kmalloc(sizeof(*dserr), gfp_flags);
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index 0b4a1a9..8a17582 100644
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -51,6 +51,7 @@
 #include "pnfs.h"
 #include "nfs.h"
 #include "netns.h"
+#include "sysfs.h"
 
 #include "nfstrace.h"
 
@@ -208,7 +209,7 @@
 	}
 
 	if (inode->i_mapping->nrpages == 0)
-		flags &= ~NFS_INO_INVALID_DATA;
+		flags &= ~(NFS_INO_INVALID_DATA|NFS_INO_DATA_INVAL_DEFER);
 	nfsi->cache_validity |= flags;
 	if (flags & NFS_INO_INVALID_DATA)
 		nfs_fscache_invalidate(inode);
@@ -652,7 +653,8 @@
 	i_size_write(inode, offset);
 	/* Optimisation */
 	if (offset == 0)
-		NFS_I(inode)->cache_validity &= ~NFS_INO_INVALID_DATA;
+		NFS_I(inode)->cache_validity &= ~(NFS_INO_INVALID_DATA |
+				NFS_INO_DATA_INVAL_DEFER);
 	NFS_I(inode)->cache_validity &= ~NFS_INO_INVALID_SIZE;
 
 	spin_unlock(&inode->i_lock);
@@ -1032,6 +1034,10 @@
 	struct nfs_inode *nfsi = NFS_I(inode);
 
 	spin_lock(&inode->i_lock);
+	if (list_empty(&nfsi->open_files) &&
+	    (nfsi->cache_validity & NFS_INO_DATA_INVAL_DEFER))
+		nfsi->cache_validity |= NFS_INO_INVALID_DATA |
+			NFS_INO_REVAL_FORCED;
 	list_add_tail_rcu(&ctx->list, &nfsi->open_files);
 	spin_unlock(&inode->i_lock);
 }
@@ -1100,6 +1106,7 @@
 	nfs_fscache_open_file(inode, filp);
 	return 0;
 }
+EXPORT_SYMBOL_GPL(nfs_open);
 
 /*
  * This function is called whenever some part of NFS notices that
@@ -1312,7 +1319,8 @@
 
 	set_bit(NFS_INO_INVALIDATING, bitlock);
 	smp_wmb();
-	nfsi->cache_validity &= ~NFS_INO_INVALID_DATA;
+	nfsi->cache_validity &= ~(NFS_INO_INVALID_DATA|
+			NFS_INO_DATA_INVAL_DEFER);
 	spin_unlock(&inode->i_lock);
 	trace_nfs_invalidate_mapping_enter(inode);
 	ret = nfs_invalidate_mapping(inode, mapping);
@@ -1870,7 +1878,8 @@
 				dprintk("NFS: change_attr change on server for file %s/%ld\n",
 						inode->i_sb->s_id,
 						inode->i_ino);
-			}
+			} else if (!have_delegation)
+				nfsi->cache_validity |= NFS_INO_DATA_INVAL_DEFER;
 			inode_set_iversion_raw(inode, fattr->change_attr);
 			attr_changed = true;
 		}
@@ -2159,12 +2168,8 @@
 
 static void nfs_net_exit(struct net *net)
 {
-	struct nfs_net *nn = net_generic(net, nfs_net_id);
-
 	nfs_fs_proc_net_exit(net);
-	nfs_cleanup_cb_ident_idr(net);
-	WARN_ON_ONCE(!list_empty(&nn->nfs_client_list));
-	WARN_ON_ONCE(!list_empty(&nn->nfs_volume_list));
+	nfs_clients_exit(net);
 }
 
 static struct pernet_operations nfs_net_ops = {
@@ -2181,6 +2186,10 @@
 {
 	int err;
 
+	err = nfs_sysfs_init();
+	if (err < 0)
+		goto out10;
+
 	err = register_pernet_subsys(&nfs_net_ops);
 	if (err < 0)
 		goto out9;
@@ -2244,6 +2253,8 @@
 out8:
 	unregister_pernet_subsys(&nfs_net_ops);
 out9:
+	nfs_sysfs_exit();
+out10:
 	return err;
 }
 
@@ -2260,6 +2271,7 @@
 	unregister_nfs_fs();
 	nfs_fs_proc_exit();
 	nfsiod_stop();
+	nfs_sysfs_exit();
 }
 
 /* Not quite true; I just maintain it */
diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
index 498fab7..a2346a2 100644
--- a/fs/nfs/internal.h
+++ b/fs/nfs/internal.h
@@ -69,8 +69,7 @@
  * Maximum number of pages that readdir can use for creating
  * a vmapped array of pages.
  */
-#define NFS_MAX_READDIR_PAGES 64
-#define NFS_MAX_READDIR_RAPAGES 8
+#define NFS_MAX_READDIR_PAGES 8
 
 struct nfs_client_initdata {
 	unsigned long init_flags;
@@ -82,6 +81,7 @@
 	struct nfs_subversion *nfs_mod;
 	int proto;
 	u32 minorversion;
+	unsigned int nconnect;
 	struct net *net;
 	const struct rpc_timeout *timeparms;
 	const struct cred *cred;
@@ -123,6 +123,7 @@
 		char			*export_path;
 		int			port;
 		unsigned short		protocol;
+		unsigned short		nconnect;
 	} nfs_server;
 
 	void			*lsm_opts;
@@ -158,6 +159,7 @@
 /* client.c */
 extern const struct rpc_program nfs_program;
 extern void nfs_clients_init(struct net *net);
+extern void nfs_clients_exit(struct net *net);
 extern struct nfs_client *nfs_alloc_client(const struct nfs_client_initdata *);
 int nfs_create_rpc_client(struct nfs_client *, const struct nfs_client_initdata *, rpc_authflavor_t);
 struct nfs_client *nfs_get_client(const struct nfs_client_initdata *);
@@ -170,7 +172,6 @@
 struct nfs_server *nfs_alloc_server(void);
 void nfs_server_copy_userdata(struct nfs_server *, struct nfs_server *);
 
-extern void nfs_cleanup_cb_ident_idr(struct net *);
 extern void nfs_put_client(struct nfs_client *);
 extern void nfs_free_client(struct nfs_client *);
 extern struct nfs_client *nfs4_find_client_ident(struct net *, int);
diff --git a/fs/nfs/netns.h b/fs/nfs/netns.h
index fc9978c..c8374f7 100644
--- a/fs/nfs/netns.h
+++ b/fs/nfs/netns.h
@@ -15,6 +15,8 @@
 	uint32_t major, minor;
 };
 
+struct nfs_netns_client;
+
 struct nfs_net {
 	struct cache_detail *nfs_dns_resolve;
 	struct rpc_pipe *bl_device_pipe;
@@ -29,6 +31,7 @@
 	unsigned short nfs_callback_tcpport6;
 	int cb_users[NFS4_MAX_MINOR_VERSION + 1];
 #endif
+	struct nfs_netns_client *nfs_client;
 	spinlock_t nfs_client_lock;
 	ktime_t boot_time;
 #ifdef CONFIG_PROC_FS
diff --git a/fs/nfs/nfs2xdr.c b/fs/nfs/nfs2xdr.c
index 572794d..cbc17a2 100644
--- a/fs/nfs/nfs2xdr.c
+++ b/fs/nfs/nfs2xdr.c
@@ -151,7 +151,7 @@
 	return 0;
 out_status:
 	*status = be32_to_cpup(p);
-	trace_nfs_xdr_status((int)*status);
+	trace_nfs_xdr_status(xdr, (int)*status);
 	return 0;
 }
 
diff --git a/fs/nfs/nfs3client.c b/fs/nfs/nfs3client.c
index fb0c425..148ceb7 100644
--- a/fs/nfs/nfs3client.c
+++ b/fs/nfs/nfs3client.c
@@ -102,6 +102,9 @@
 		return ERR_PTR(-EINVAL);
 	cl_init.hostname = buf;
 
+	if (mds_clp->cl_nconnect > 1 && ds_proto == XPRT_TRANSPORT_TCP)
+		cl_init.nconnect = mds_clp->cl_nconnect;
+
 	if (mds_srv->flags & NFS_MOUNT_NORESVPORT)
 		set_bit(NFS_CS_NORESVPORT, &cl_init.init_flags);
 
diff --git a/fs/nfs/nfs3xdr.c b/fs/nfs/nfs3xdr.c
index abbbdde..6027678 100644
--- a/fs/nfs/nfs3xdr.c
+++ b/fs/nfs/nfs3xdr.c
@@ -343,7 +343,7 @@
 	return 0;
 out_status:
 	*status = be32_to_cpup(p);
-	trace_nfs_xdr_status((int)*status);
+	trace_nfs_xdr_status(xdr, (int)*status);
 	return 0;
 }
 
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index 8a38a25..d778dad 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -312,12 +312,12 @@
 		const struct nfs_lock_context *l_ctx,
 		fmode_t fmode);
 
+extern int nfs4_proc_get_lease_time(struct nfs_client *clp,
+		struct nfs_fsinfo *fsinfo);
 #if defined(CONFIG_NFS_V4_1)
 extern int nfs41_sequence_done(struct rpc_task *, struct nfs4_sequence_res *);
 extern int nfs4_proc_create_session(struct nfs_client *, const struct cred *);
 extern int nfs4_proc_destroy_session(struct nfs4_session *, const struct cred *);
-extern int nfs4_proc_get_lease_time(struct nfs_client *clp,
-		struct nfs_fsinfo *fsinfo);
 extern int nfs4_proc_layoutcommit(struct nfs4_layoutcommit_data *data,
 				  bool sync);
 extern int nfs4_detect_session_trunking(struct nfs_client *clp,
diff --git a/fs/nfs/nfs4client.c b/fs/nfs/nfs4client.c
index 81b9b6d..616393a 100644
--- a/fs/nfs/nfs4client.c
+++ b/fs/nfs/nfs4client.c
@@ -859,7 +859,8 @@
 		const size_t addrlen,
 		const char *ip_addr,
 		int proto, const struct rpc_timeout *timeparms,
-		u32 minorversion, struct net *net)
+		u32 minorversion, unsigned int nconnect,
+		struct net *net)
 {
 	struct nfs_client_initdata cl_init = {
 		.hostname = hostname,
@@ -875,6 +876,8 @@
 	};
 	struct nfs_client *clp;
 
+	if (minorversion > 0 && proto == XPRT_TRANSPORT_TCP)
+		cl_init.nconnect = nconnect;
 	if (server->flags & NFS_MOUNT_NORESVPORT)
 		set_bit(NFS_CS_NORESVPORT, &cl_init.init_flags);
 	if (server->options & NFS_OPTION_MIGRATION)
@@ -941,6 +944,9 @@
 		return ERR_PTR(-EINVAL);
 	cl_init.hostname = buf;
 
+	if (mds_clp->cl_nconnect > 1 && ds_proto == XPRT_TRANSPORT_TCP)
+		cl_init.nconnect = mds_clp->cl_nconnect;
+
 	if (mds_srv->flags & NFS_MOUNT_NORESVPORT)
 		__set_bit(NFS_CS_NORESVPORT, &cl_init.init_flags);
 
@@ -1074,6 +1080,7 @@
 			data->nfs_server.protocol,
 			&timeparms,
 			data->minorversion,
+			data->nfs_server.nconnect,
 			data->net);
 	if (error < 0)
 		return error;
@@ -1163,6 +1170,7 @@
 				XPRT_TRANSPORT_RDMA,
 				parent_server->client->cl_timeout,
 				parent_client->cl_mvops->minor_version,
+				parent_client->cl_nconnect,
 				parent_client->cl_net);
 	if (!error)
 		goto init_server;
@@ -1176,6 +1184,7 @@
 				XPRT_TRANSPORT_TCP,
 				parent_server->client->cl_timeout,
 				parent_client->cl_mvops->minor_version,
+				parent_client->cl_nconnect,
 				parent_client->cl_net);
 	if (error < 0)
 		goto error;
@@ -1271,7 +1280,8 @@
 	set_bit(NFS_MIG_TSM_POSSIBLE, &server->mig_status);
 	error = nfs4_set_client(server, hostname, sap, salen, buf,
 				clp->cl_proto, clnt->cl_timeout,
-				clp->cl_minorversion, net);
+				clp->cl_minorversion,
+				clp->cl_nconnect, net);
 	clear_bit(NFS_MIG_TSM_POSSIBLE, &server->mig_status);
 	if (error != 0) {
 		nfs_server_insert_lists(server);
diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c
index f4157eb..96db471 100644
--- a/fs/nfs/nfs4file.c
+++ b/fs/nfs/nfs4file.c
@@ -49,7 +49,7 @@
 		return err;
 
 	if ((openflags & O_ACCMODE) == 3)
-		openflags--;
+		return nfs_open(inode, filp);
 
 	/* We can't create new files here */
 	openflags &= ~(O_CREAT|O_EXCL);
@@ -204,7 +204,11 @@
 	bool same_inode = false;
 	int ret;
 
-	if (remap_flags & ~(REMAP_FILE_DEDUP | REMAP_FILE_ADVISORY))
+	/* NFS does not support deduplication. */
+	if (remap_flags & REMAP_FILE_DEDUP)
+		return -EOPNOTSUPP;
+
+	if (remap_flags & ~REMAP_FILE_ADVISORY)
 		return -EINVAL;
 
 	/* check alignment w.r.t. clone_blksize */
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 6418cb6..39896af 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -428,6 +428,22 @@
 	return nfs4_delay_killable(timeout);
 }
 
+static const nfs4_stateid *
+nfs4_recoverable_stateid(const nfs4_stateid *stateid)
+{
+	if (!stateid)
+		return NULL;
+	switch (stateid->type) {
+	case NFS4_OPEN_STATEID_TYPE:
+	case NFS4_LOCK_STATEID_TYPE:
+	case NFS4_DELEGATION_STATEID_TYPE:
+		return stateid;
+	default:
+		break;
+	}
+	return NULL;
+}
+
 /* This is the error handling routine for processes that are allowed
  * to sleep.
  */
@@ -436,7 +452,7 @@
 {
 	struct nfs_client *clp = server->nfs_client;
 	struct nfs4_state *state = exception->state;
-	const nfs4_stateid *stateid = exception->stateid;
+	const nfs4_stateid *stateid;
 	struct inode *inode = exception->inode;
 	int ret = errorcode;
 
@@ -444,8 +460,9 @@
 	exception->recovering = 0;
 	exception->retry = 0;
 
+	stateid = nfs4_recoverable_stateid(exception->stateid);
 	if (stateid == NULL && state != NULL)
-		stateid = &state->stateid;
+		stateid = nfs4_recoverable_stateid(&state->stateid);
 
 	switch(errorcode) {
 		case 0:
@@ -1165,6 +1182,18 @@
 	return true;
 }
 
+static fmode_t _nfs4_ctx_to_accessmode(const struct nfs_open_context *ctx)
+{
+	 return ctx->mode & (FMODE_READ|FMODE_WRITE|FMODE_EXEC);
+}
+
+static fmode_t _nfs4_ctx_to_openmode(const struct nfs_open_context *ctx)
+{
+	fmode_t ret = ctx->mode & (FMODE_READ|FMODE_WRITE);
+
+	return (ctx->mode & FMODE_EXEC) ? FMODE_READ | ret : ret;
+}
+
 static u32
 nfs4_map_atomic_open_share(struct nfs_server *server,
 		fmode_t fmode, int openflags)
@@ -2900,14 +2929,13 @@
 }
 
 static int _nfs4_open_and_get_state(struct nfs4_opendata *opendata,
-		fmode_t fmode,
-		int flags,
-		struct nfs_open_context *ctx)
+		int flags, struct nfs_open_context *ctx)
 {
 	struct nfs4_state_owner *sp = opendata->owner;
 	struct nfs_server *server = sp->so_server;
 	struct dentry *dentry;
 	struct nfs4_state *state;
+	fmode_t acc_mode = _nfs4_ctx_to_accessmode(ctx);
 	unsigned int seq;
 	int ret;
 
@@ -2946,7 +2974,8 @@
 	/* Parse layoutget results before we check for access */
 	pnfs_parse_lgopen(state->inode, opendata->lgp, ctx);
 
-	ret = nfs4_opendata_access(sp->so_cred, opendata, state, fmode, flags);
+	ret = nfs4_opendata_access(sp->so_cred, opendata, state,
+			acc_mode, flags);
 	if (ret != 0)
 		goto out;
 
@@ -2978,7 +3007,7 @@
 	struct dentry *dentry = ctx->dentry;
 	const struct cred *cred = ctx->cred;
 	struct nfs4_threshold **ctx_th = &ctx->mdsthreshold;
-	fmode_t fmode = ctx->mode & (FMODE_READ|FMODE_WRITE|FMODE_EXEC);
+	fmode_t fmode = _nfs4_ctx_to_openmode(ctx);
 	enum open_claim_type4 claim = NFS4_OPEN_CLAIM_NULL;
 	struct iattr *sattr = c->sattr;
 	struct nfs4_label *label = c->label;
@@ -3024,7 +3053,7 @@
 	if (d_really_is_positive(dentry))
 		opendata->state = nfs4_get_open_state(d_inode(dentry), sp);
 
-	status = _nfs4_open_and_get_state(opendata, fmode, flags, ctx);
+	status = _nfs4_open_and_get_state(opendata, flags, ctx);
 	if (status != 0)
 		goto err_free_label;
 	state = ctx->state;
@@ -3594,9 +3623,9 @@
 	if (ctx->state == NULL)
 		return;
 	if (is_sync)
-		nfs4_close_sync(ctx->state, ctx->mode);
+		nfs4_close_sync(ctx->state, _nfs4_ctx_to_openmode(ctx));
 	else
-		nfs4_close_state(ctx->state, ctx->mode);
+		nfs4_close_state(ctx->state, _nfs4_ctx_to_openmode(ctx));
 }
 
 #define FATTR4_WORD1_NFS40_MASK (2*FATTR4_WORD1_MOUNTED_ON_FILEID - 1UL)
@@ -5980,7 +6009,7 @@
 		.rpc_message = &msg,
 		.callback_ops = &nfs4_setclientid_ops,
 		.callback_data = &setclientid,
-		.flags = RPC_TASK_TIMEOUT,
+		.flags = RPC_TASK_TIMEOUT | RPC_TASK_NO_ROUND_ROBIN,
 	};
 	int status;
 
@@ -6046,7 +6075,8 @@
 	dprintk("NFS call  setclientid_confirm auth=%s, (client ID %llx)\n",
 		clp->cl_rpcclient->cl_auth->au_ops->au_name,
 		clp->cl_clientid);
-	status = rpc_call_sync(clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT);
+	status = rpc_call_sync(clp->cl_rpcclient, &msg,
+			       RPC_TASK_TIMEOUT | RPC_TASK_NO_ROUND_ROBIN);
 	trace_nfs4_setclientid_confirm(clp, status);
 	dprintk("NFS reply setclientid_confirm: %d\n", status);
 	return status;
@@ -7627,7 +7657,7 @@
 		NFS_SP4_MACH_CRED_SECINFO, &clnt, &msg);
 
 	status = nfs4_call_sync(clnt, NFS_SERVER(dir), &msg, &args.seq_args,
-				&res.seq_res, 0);
+				&res.seq_res, RPC_TASK_NO_ROUND_ROBIN);
 	dprintk("NFS reply  secinfo: %d\n", status);
 
 	put_cred(cred);
@@ -7965,7 +7995,7 @@
 		.rpc_client = clp->cl_rpcclient,
 		.callback_ops = &nfs4_exchange_id_call_ops,
 		.rpc_message = &msg,
-		.flags = RPC_TASK_TIMEOUT,
+		.flags = RPC_TASK_TIMEOUT | RPC_TASK_NO_ROUND_ROBIN,
 	};
 	struct nfs41_exchange_id_data *calldata;
 	int status;
@@ -8190,7 +8220,8 @@
 	};
 	int status;
 
-	status = rpc_call_sync(clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT);
+	status = rpc_call_sync(clp->cl_rpcclient, &msg,
+			       RPC_TASK_TIMEOUT | RPC_TASK_NO_ROUND_ROBIN);
 	trace_nfs4_destroy_clientid(clp, status);
 	if (status)
 		dprintk("NFS: Got error %d from the server %s on "
@@ -8241,6 +8272,8 @@
 	return ret;
 }
 
+#endif /* CONFIG_NFS_V4_1 */
+
 struct nfs4_get_lease_time_data {
 	struct nfs4_get_lease_time_args *args;
 	struct nfs4_get_lease_time_res *res;
@@ -8273,7 +8306,7 @@
 			(struct nfs4_get_lease_time_data *)calldata;
 
 	dprintk("--> %s\n", __func__);
-	if (!nfs41_sequence_done(task, &data->res->lr_seq_res))
+	if (!nfs4_sequence_done(task, &data->res->lr_seq_res))
 		return;
 	switch (task->tk_status) {
 	case -NFS4ERR_DELAY:
@@ -8331,6 +8364,8 @@
 	return status;
 }
 
+#ifdef CONFIG_NFS_V4_1
+
 /*
  * Initialize the values to be used by the client in CREATE_SESSION
  * If nfs4_init_session set the fore channel request and response sizes,
@@ -8345,6 +8380,7 @@
 {
 	unsigned int max_rqst_sz, max_resp_sz;
 	unsigned int max_bc_payload = rpc_max_bc_payload(clnt);
+	unsigned int max_bc_slots = rpc_num_bc_slots(clnt);
 
 	max_rqst_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxwrite_overhead;
 	max_resp_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxread_overhead;
@@ -8367,6 +8403,8 @@
 	args->bc_attrs.max_resp_sz_cached = 0;
 	args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS;
 	args->bc_attrs.max_reqs = max_t(unsigned short, max_session_cb_slots, 1);
+	if (args->bc_attrs.max_reqs > max_bc_slots)
+		args->bc_attrs.max_reqs = max_bc_slots;
 
 	dprintk("%s: Back Channel : max_rqst_sz=%u max_resp_sz=%u "
 		"max_resp_sz_cached=%u max_ops=%u max_reqs=%u\n",
@@ -8469,7 +8507,8 @@
 	nfs4_init_channel_attrs(&args, clp->cl_rpcclient);
 	args.flags = (SESSION4_PERSIST | SESSION4_BACK_CHAN);
 
-	status = rpc_call_sync(session->clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT);
+	status = rpc_call_sync(session->clp->cl_rpcclient, &msg,
+			       RPC_TASK_TIMEOUT | RPC_TASK_NO_ROUND_ROBIN);
 	trace_nfs4_create_session(clp, status);
 
 	switch (status) {
@@ -8545,7 +8584,8 @@
 	if (!test_and_clear_bit(NFS4_SESSION_ESTABLISHED, &session->session_state))
 		return 0;
 
-	status = rpc_call_sync(session->clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT);
+	status = rpc_call_sync(session->clp->cl_rpcclient, &msg,
+			       RPC_TASK_TIMEOUT | RPC_TASK_NO_ROUND_ROBIN);
 	trace_nfs4_destroy_session(session->clp, status);
 
 	if (status)
@@ -8799,7 +8839,7 @@
 		.rpc_client = clp->cl_rpcclient,
 		.rpc_message = &msg,
 		.callback_ops = &nfs4_reclaim_complete_call_ops,
-		.flags = RPC_TASK_ASYNC,
+		.flags = RPC_TASK_ASYNC | RPC_TASK_NO_ROUND_ROBIN,
 	};
 	int status = -ENOMEM;
 
@@ -9318,7 +9358,7 @@
 
 	dprintk("--> %s\n", __func__);
 	status = nfs4_call_sync(clnt, server, &msg, &args.seq_args,
-				&res.seq_res, 0);
+				&res.seq_res, RPC_TASK_NO_ROUND_ROBIN);
 	dprintk("<-- %s status=%d\n", __func__, status);
 
 	put_cred(cred);
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index e2e3c4f..9afd051 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -87,6 +87,27 @@
 
 static DEFINE_MUTEX(nfs_clid_init_mutex);
 
+static int nfs4_setup_state_renewal(struct nfs_client *clp)
+{
+	int status;
+	struct nfs_fsinfo fsinfo;
+	unsigned long now;
+
+	if (!test_bit(NFS_CS_CHECK_LEASE_TIME, &clp->cl_res_state)) {
+		nfs4_schedule_state_renewal(clp);
+		return 0;
+	}
+
+	now = jiffies;
+	status = nfs4_proc_get_lease_time(clp, &fsinfo);
+	if (status == 0) {
+		nfs4_set_lease_period(clp, fsinfo.lease_time * HZ, now);
+		nfs4_schedule_state_renewal(clp);
+	}
+
+	return status;
+}
+
 int nfs4_init_clientid(struct nfs_client *clp, const struct cred *cred)
 {
 	struct nfs4_setclientid_res clid = {
@@ -114,7 +135,7 @@
 	if (status != 0)
 		goto out;
 	clear_bit(NFS4CLNT_LEASE_CONFIRM, &clp->cl_state);
-	nfs4_schedule_state_renewal(clp);
+	nfs4_setup_state_renewal(clp);
 out:
 	return status;
 }
@@ -286,34 +307,13 @@
 
 #if defined(CONFIG_NFS_V4_1)
 
-static int nfs41_setup_state_renewal(struct nfs_client *clp)
-{
-	int status;
-	struct nfs_fsinfo fsinfo;
-	unsigned long now;
-
-	if (!test_bit(NFS_CS_CHECK_LEASE_TIME, &clp->cl_res_state)) {
-		nfs4_schedule_state_renewal(clp);
-		return 0;
-	}
-
-	now = jiffies;
-	status = nfs4_proc_get_lease_time(clp, &fsinfo);
-	if (status == 0) {
-		nfs4_set_lease_period(clp, fsinfo.lease_time * HZ, now);
-		nfs4_schedule_state_renewal(clp);
-	}
-
-	return status;
-}
-
 static void nfs41_finish_session_reset(struct nfs_client *clp)
 {
 	clear_bit(NFS4CLNT_LEASE_CONFIRM, &clp->cl_state);
 	clear_bit(NFS4CLNT_SESSION_RESET, &clp->cl_state);
 	/* create_session negotiated new slot table */
 	clear_bit(NFS4CLNT_BIND_CONN_TO_SESSION, &clp->cl_state);
-	nfs41_setup_state_renewal(clp);
+	nfs4_setup_state_renewal(clp);
 }
 
 int nfs41_init_clientid(struct nfs_client *clp, const struct cred *cred)
@@ -1064,8 +1064,7 @@
 		 * choose to use.
 		 */
 		goto out;
-	nfs4_copy_open_stateid(dst, state);
-	ret = 0;
+	ret = nfs4_copy_open_stateid(dst, state) ? 0 : -EAGAIN;
 out:
 	if (nfs_server_capable(state->inode, NFS_CAP_STATEID_NFSV41))
 		dst->seqid = 0;
diff --git a/fs/nfs/nfs4trace.c b/fs/nfs/nfs4trace.c
index e9fb3e5..1a8f376 100644
--- a/fs/nfs/nfs4trace.c
+++ b/fs/nfs/nfs4trace.c
@@ -16,4 +16,12 @@
 EXPORT_TRACEPOINT_SYMBOL_GPL(nfs4_pnfs_read);
 EXPORT_TRACEPOINT_SYMBOL_GPL(nfs4_pnfs_write);
 EXPORT_TRACEPOINT_SYMBOL_GPL(nfs4_pnfs_commit_ds);
+
+EXPORT_TRACEPOINT_SYMBOL_GPL(pnfs_mds_fallback_pg_init_read);
+EXPORT_TRACEPOINT_SYMBOL_GPL(pnfs_mds_fallback_pg_init_write);
+EXPORT_TRACEPOINT_SYMBOL_GPL(pnfs_mds_fallback_pg_get_mirror_count);
+EXPORT_TRACEPOINT_SYMBOL_GPL(pnfs_mds_fallback_read_done);
+EXPORT_TRACEPOINT_SYMBOL_GPL(pnfs_mds_fallback_write_done);
+EXPORT_TRACEPOINT_SYMBOL_GPL(pnfs_mds_fallback_read_pagelist);
+EXPORT_TRACEPOINT_SYMBOL_GPL(pnfs_mds_fallback_write_pagelist);
 #endif
diff --git a/fs/nfs/nfs4trace.h b/fs/nfs/nfs4trace.h
index cd1a5c0..b2f395f 100644
--- a/fs/nfs/nfs4trace.h
+++ b/fs/nfs/nfs4trace.h
@@ -156,7 +156,7 @@
 TRACE_DEFINE_ENUM(NFS4ERR_XDEV);
 
 #define show_nfsv4_errors(error) \
-	__print_symbolic(-(error), \
+	__print_symbolic(error, \
 		{ NFS4_OK, "OK" }, \
 		/* Mapped by nfs4_stat_to_errno() */ \
 		{ EPERM, "EPERM" }, \
@@ -348,7 +348,7 @@
 
 		TP_STRUCT__entry(
 			__string(dstaddr, clp->cl_hostname)
-			__field(int, error)
+			__field(unsigned long, error)
 		),
 
 		TP_fast_assign(
@@ -357,8 +357,8 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) dstaddr=%s",
-			__entry->error,
+			"error=%ld (%s) dstaddr=%s",
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			__get_str(dstaddr)
 		)
@@ -420,7 +420,7 @@
 			__field(unsigned int, highest_slotid)
 			__field(unsigned int, target_highest_slotid)
 			__field(unsigned int, status_flags)
-			__field(int, error)
+			__field(unsigned long, error)
 		),
 
 		TP_fast_assign(
@@ -435,10 +435,10 @@
 			__entry->error = res->sr_status;
 		),
 		TP_printk(
-			"error=%d (%s) session=0x%08x slot_nr=%u seq_nr=%u "
+			"error=%ld (%s) session=0x%08x slot_nr=%u seq_nr=%u "
 			"highest_slotid=%u target_highest_slotid=%u "
 			"status_flags=%u (%s)",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			__entry->session,
 			__entry->slot_nr,
@@ -467,7 +467,7 @@
 			__field(unsigned int, seq_nr)
 			__field(unsigned int, highest_slotid)
 			__field(unsigned int, cachethis)
-			__field(int, error)
+			__field(unsigned long, error)
 		),
 
 		TP_fast_assign(
@@ -476,13 +476,13 @@
 			__entry->seq_nr = args->csa_sequenceid;
 			__entry->highest_slotid = args->csa_highestslotid;
 			__entry->cachethis = args->csa_cachethis;
-			__entry->error = -be32_to_cpu(status);
+			__entry->error = be32_to_cpu(status);
 		),
 
 		TP_printk(
-			"error=%d (%s) session=0x%08x slot_nr=%u seq_nr=%u "
+			"error=%ld (%s) session=0x%08x slot_nr=%u seq_nr=%u "
 			"highest_slotid=%u",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			__entry->session,
 			__entry->slot_nr,
@@ -490,6 +490,44 @@
 			__entry->highest_slotid
 		)
 );
+
+TRACE_EVENT(nfs4_cb_seqid_err,
+		TP_PROTO(
+			const struct cb_sequenceargs *args,
+			__be32 status
+		),
+		TP_ARGS(args, status),
+
+		TP_STRUCT__entry(
+			__field(unsigned int, session)
+			__field(unsigned int, slot_nr)
+			__field(unsigned int, seq_nr)
+			__field(unsigned int, highest_slotid)
+			__field(unsigned int, cachethis)
+			__field(unsigned long, error)
+		),
+
+		TP_fast_assign(
+			__entry->session = nfs_session_id_hash(&args->csa_sessionid);
+			__entry->slot_nr = args->csa_slotid;
+			__entry->seq_nr = args->csa_sequenceid;
+			__entry->highest_slotid = args->csa_highestslotid;
+			__entry->cachethis = args->csa_cachethis;
+			__entry->error = be32_to_cpu(status);
+		),
+
+		TP_printk(
+			"error=%ld (%s) session=0x%08x slot_nr=%u seq_nr=%u "
+			"highest_slotid=%u",
+			-__entry->error,
+			show_nfsv4_errors(__entry->error),
+			__entry->session,
+			__entry->slot_nr,
+			__entry->seq_nr,
+			__entry->highest_slotid
+		)
+);
+
 #endif /* CONFIG_NFS_V4_1 */
 
 TRACE_EVENT(nfs4_setup_sequence,
@@ -526,26 +564,37 @@
 
 TRACE_EVENT(nfs4_xdr_status,
 		TP_PROTO(
+			const struct xdr_stream *xdr,
 			u32 op,
 			int error
 		),
 
-		TP_ARGS(op, error),
+		TP_ARGS(xdr, op, error),
 
 		TP_STRUCT__entry(
+			__field(unsigned int, task_id)
+			__field(unsigned int, client_id)
+			__field(u32, xid)
 			__field(u32, op)
-			__field(int, error)
+			__field(unsigned long, error)
 		),
 
 		TP_fast_assign(
+			const struct rpc_rqst *rqstp = xdr->rqst;
+			const struct rpc_task *task = rqstp->rq_task;
+
+			__entry->task_id = task->tk_pid;
+			__entry->client_id = task->tk_client->cl_clid;
+			__entry->xid = be32_to_cpu(rqstp->rq_xid);
 			__entry->op = op;
-			__entry->error = -error;
+			__entry->error = error;
 		),
 
 		TP_printk(
-			"operation %d: nfs status %d (%s)",
-			__entry->op,
-			__entry->error, show_nfsv4_errors(__entry->error)
+			"task:%u@%d xid=0x%08x error=%ld (%s) operation=%u",
+			__entry->task_id, __entry->client_id, __entry->xid,
+			-__entry->error, show_nfsv4_errors(__entry->error),
+			__entry->op
 		)
 );
 
@@ -559,7 +608,7 @@
 		TP_ARGS(ctx, flags, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(unsigned int, flags)
 			__field(unsigned int, fmode)
 			__field(dev_t, dev)
@@ -577,7 +626,7 @@
 			const struct nfs4_state *state = ctx->state;
 			const struct inode *inode = NULL;
 
-			__entry->error = error;
+			__entry->error = -error;
 			__entry->flags = flags;
 			__entry->fmode = (__force unsigned int)ctx->mode;
 			__entry->dev = ctx->dentry->d_sb->s_dev;
@@ -609,11 +658,11 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) flags=%d (%s) fmode=%s "
+			"error=%ld (%s) flags=%d (%s) fmode=%s "
 			"fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"name=%02x:%02x:%llu/%s stateid=%d:0x%08x "
 			"openstateid=%d:0x%08x",
-			 __entry->error,
+			 -__entry->error,
 			 show_nfsv4_errors(__entry->error),
 			 __entry->flags,
 			 show_open_flags(__entry->flags),
@@ -695,7 +744,7 @@
 			__field(u32, fhandle)
 			__field(u64, fileid)
 			__field(unsigned int, fmode)
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(int, stateid_seq)
 			__field(u32, stateid_hash)
 		),
@@ -715,9 +764,9 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) fmode=%s fileid=%02x:%02x:%llu "
+			"error=%ld (%s) fmode=%s fileid=%02x:%02x:%llu "
 			"fhandle=0x%08x openstateid=%d:0x%08x",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			__entry->fmode ?  show_fmode_flags(__entry->fmode) :
 					  "closed",
@@ -757,7 +806,7 @@
 		TP_ARGS(request, state, cmd, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(int, cmd)
 			__field(char, type)
 			__field(loff_t, start)
@@ -787,10 +836,10 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) cmd=%s:%s range=%lld:%lld "
+			"error=%ld (%s) cmd=%s:%s range=%lld:%lld "
 			"fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"stateid=%d:0x%08x",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			show_lock_cmd(__entry->cmd),
 			show_lock_type(__entry->type),
@@ -827,7 +876,7 @@
 		TP_ARGS(request, state, lockstateid, cmd, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(int, cmd)
 			__field(char, type)
 			__field(loff_t, start)
@@ -863,10 +912,10 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) cmd=%s:%s range=%lld:%lld "
+			"error=%ld (%s) cmd=%s:%s range=%lld:%lld "
 			"fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"stateid=%d:0x%08x lockstateid=%d:0x%08x",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			show_lock_cmd(__entry->cmd),
 			show_lock_type(__entry->type),
@@ -932,7 +981,7 @@
 		TP_STRUCT__entry(
 			__field(dev_t, dev)
 			__field(u32, fhandle)
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(int, stateid_seq)
 			__field(u32, stateid_hash)
 		),
@@ -948,9 +997,9 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) dev=%02x:%02x fhandle=0x%08x "
+			"error=%ld (%s) dev=%02x:%02x fhandle=0x%08x "
 			"stateid=%d:0x%08x",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			__entry->fhandle,
@@ -969,7 +1018,7 @@
 		TP_ARGS(state, lsp, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(dev_t, dev)
 			__field(u32, fhandle)
 			__field(u64, fileid)
@@ -991,9 +1040,9 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
+			"error=%ld (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"stateid=%d:0x%08x",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->fileid,
@@ -1026,7 +1075,7 @@
 
 		TP_STRUCT__entry(
 			__field(dev_t, dev)
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(u64, dir)
 			__string(name, name->name)
 		),
@@ -1034,13 +1083,13 @@
 		TP_fast_assign(
 			__entry->dev = dir->i_sb->s_dev;
 			__entry->dir = NFS_FILEID(dir);
-			__entry->error = error;
+			__entry->error = -error;
 			__assign_str(name, name->name);
 		),
 
 		TP_printk(
-			"error=%d (%s) name=%02x:%02x:%llu/%s",
-			__entry->error,
+			"error=%ld (%s) name=%02x:%02x:%llu/%s",
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->dir,
@@ -1076,7 +1125,7 @@
 		TP_STRUCT__entry(
 			__field(dev_t, dev)
 			__field(u64, ino)
-			__field(int, error)
+			__field(unsigned long, error)
 		),
 
 		TP_fast_assign(
@@ -1086,8 +1135,8 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) inode=%02x:%02x:%llu",
-			__entry->error,
+			"error=%ld (%s) inode=%02x:%02x:%llu",
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->ino
@@ -1107,7 +1156,7 @@
 
 		TP_STRUCT__entry(
 			__field(dev_t, dev)
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(u64, olddir)
 			__string(oldname, oldname->name)
 			__field(u64, newdir)
@@ -1124,9 +1173,9 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) oldname=%02x:%02x:%llu/%s "
+			"error=%ld (%s) oldname=%02x:%02x:%llu/%s "
 			"newname=%02x:%02x:%llu/%s",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->olddir,
@@ -1149,19 +1198,19 @@
 			__field(dev_t, dev)
 			__field(u32, fhandle)
 			__field(u64, fileid)
-			__field(int, error)
+			__field(unsigned long, error)
 		),
 
 		TP_fast_assign(
 			__entry->dev = inode->i_sb->s_dev;
 			__entry->fileid = NFS_FILEID(inode);
 			__entry->fhandle = nfs_fhandle_hash(NFS_FH(inode));
-			__entry->error = error;
+			__entry->error = error < 0 ? -error : 0;
 		),
 
 		TP_printk(
-			"error=%d (%s) fileid=%02x:%02x:%llu fhandle=0x%08x",
-			__entry->error,
+			"error=%ld (%s) fileid=%02x:%02x:%llu fhandle=0x%08x",
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->fileid,
@@ -1200,7 +1249,7 @@
 			__field(dev_t, dev)
 			__field(u32, fhandle)
 			__field(u64, fileid)
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(int, stateid_seq)
 			__field(u32, stateid_hash)
 		),
@@ -1217,9 +1266,9 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
+			"error=%ld (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"stateid=%d:0x%08x",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->fileid,
@@ -1257,7 +1306,7 @@
 			__field(u32, fhandle)
 			__field(u64, fileid)
 			__field(unsigned int, valid)
-			__field(int, error)
+			__field(unsigned long, error)
 		),
 
 		TP_fast_assign(
@@ -1269,9 +1318,9 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
+			"error=%ld (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"valid=%s",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->fileid,
@@ -1304,7 +1353,7 @@
 		TP_ARGS(clp, fhandle, inode, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(dev_t, dev)
 			__field(u32, fhandle)
 			__field(u64, fileid)
@@ -1325,9 +1374,9 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
+			"error=%ld (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"dstaddr=%s",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->fileid,
@@ -1359,7 +1408,7 @@
 		TP_ARGS(clp, fhandle, inode, stateid, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(dev_t, dev)
 			__field(u32, fhandle)
 			__field(u64, fileid)
@@ -1386,9 +1435,9 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
+			"error=%ld (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"stateid=%d:0x%08x dstaddr=%s",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->fileid,
@@ -1422,7 +1471,7 @@
 		TP_ARGS(name, len, id, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(u32, id)
 			__dynamic_array(char, name, len > 0 ? len + 1 : 1)
 		),
@@ -1437,8 +1486,8 @@
 		),
 
 		TP_printk(
-			"error=%d id=%u name=%s",
-			__entry->error,
+			"error=%ld (%s) id=%u name=%s",
+			-__entry->error, show_nfsv4_errors(__entry->error),
 			__entry->id,
 			__get_str(name)
 		)
@@ -1471,7 +1520,7 @@
 			__field(u64, fileid)
 			__field(loff_t, offset)
 			__field(size_t, count)
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(int, stateid_seq)
 			__field(u32, stateid_hash)
 		),
@@ -1485,7 +1534,7 @@
 			__entry->fhandle = nfs_fhandle_hash(NFS_FH(inode));
 			__entry->offset = hdr->args.offset;
 			__entry->count = hdr->args.count;
-			__entry->error = error;
+			__entry->error = error < 0 ? -error : 0;
 			__entry->stateid_seq =
 				be32_to_cpu(state->stateid.seqid);
 			__entry->stateid_hash =
@@ -1493,9 +1542,9 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
+			"error=%ld (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"offset=%lld count=%zu stateid=%d:0x%08x",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->fileid,
@@ -1531,7 +1580,7 @@
 			__field(u64, fileid)
 			__field(loff_t, offset)
 			__field(size_t, count)
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(int, stateid_seq)
 			__field(u32, stateid_hash)
 		),
@@ -1545,7 +1594,7 @@
 			__entry->fhandle = nfs_fhandle_hash(NFS_FH(inode));
 			__entry->offset = hdr->args.offset;
 			__entry->count = hdr->args.count;
-			__entry->error = error;
+			__entry->error = error < 0 ? -error : 0;
 			__entry->stateid_seq =
 				be32_to_cpu(state->stateid.seqid);
 			__entry->stateid_hash =
@@ -1553,9 +1602,9 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
+			"error=%ld (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"offset=%lld count=%zu stateid=%d:0x%08x",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->fileid,
@@ -1592,7 +1641,7 @@
 			__field(u64, fileid)
 			__field(loff_t, offset)
 			__field(size_t, count)
-			__field(int, error)
+			__field(unsigned long, error)
 		),
 
 		TP_fast_assign(
@@ -1606,9 +1655,9 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
+			"error=%ld (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"offset=%lld count=%zu",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->fileid,
@@ -1656,7 +1705,7 @@
 			__field(u32, iomode)
 			__field(u64, offset)
 			__field(u64, count)
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(int, stateid_seq)
 			__field(u32, stateid_hash)
 			__field(int, layoutstateid_seq)
@@ -1689,10 +1738,10 @@
 		),
 
 		TP_printk(
-			"error=%d (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
+			"error=%ld (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"iomode=%s offset=%llu count=%llu stateid=%d:0x%08x "
 			"layoutstateid=%d:0x%08x",
-			__entry->error,
+			-__entry->error,
 			show_nfsv4_errors(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->fileid,
@@ -1722,6 +1771,7 @@
 TRACE_DEFINE_ENUM(PNFS_UPDATE_LAYOUT_INVALID_OPEN);
 TRACE_DEFINE_ENUM(PNFS_UPDATE_LAYOUT_RETRY);
 TRACE_DEFINE_ENUM(PNFS_UPDATE_LAYOUT_SEND_LAYOUTGET);
+TRACE_DEFINE_ENUM(PNFS_UPDATE_LAYOUT_EXIT);
 
 #define show_pnfs_update_layout_reason(reason)				\
 	__print_symbolic(reason,					\
@@ -1737,7 +1787,8 @@
 		{ PNFS_UPDATE_LAYOUT_BLOCKED, "layouts blocked" },	\
 		{ PNFS_UPDATE_LAYOUT_INVALID_OPEN, "invalid open" },	\
 		{ PNFS_UPDATE_LAYOUT_RETRY, "retrying" },	\
-		{ PNFS_UPDATE_LAYOUT_SEND_LAYOUTGET, "sent layoutget" })
+		{ PNFS_UPDATE_LAYOUT_SEND_LAYOUTGET, "sent layoutget" }, \
+		{ PNFS_UPDATE_LAYOUT_EXIT, "exit" })
 
 TRACE_EVENT(pnfs_update_layout,
 		TP_PROTO(struct inode *inode,
@@ -1796,6 +1847,78 @@
 		)
 );
 
+DECLARE_EVENT_CLASS(pnfs_layout_event,
+		TP_PROTO(struct inode *inode,
+			loff_t pos,
+			u64 count,
+			enum pnfs_iomode iomode,
+			struct pnfs_layout_hdr *lo,
+			struct pnfs_layout_segment *lseg
+		),
+		TP_ARGS(inode, pos, count, iomode, lo, lseg),
+		TP_STRUCT__entry(
+			__field(dev_t, dev)
+			__field(u64, fileid)
+			__field(u32, fhandle)
+			__field(loff_t, pos)
+			__field(u64, count)
+			__field(enum pnfs_iomode, iomode)
+			__field(int, layoutstateid_seq)
+			__field(u32, layoutstateid_hash)
+			__field(long, lseg)
+		),
+		TP_fast_assign(
+			__entry->dev = inode->i_sb->s_dev;
+			__entry->fileid = NFS_FILEID(inode);
+			__entry->fhandle = nfs_fhandle_hash(NFS_FH(inode));
+			__entry->pos = pos;
+			__entry->count = count;
+			__entry->iomode = iomode;
+			if (lo != NULL) {
+				__entry->layoutstateid_seq =
+				be32_to_cpu(lo->plh_stateid.seqid);
+				__entry->layoutstateid_hash =
+				nfs_stateid_hash(&lo->plh_stateid);
+			} else {
+				__entry->layoutstateid_seq = 0;
+				__entry->layoutstateid_hash = 0;
+			}
+			__entry->lseg = (long)lseg;
+		),
+		TP_printk(
+			"fileid=%02x:%02x:%llu fhandle=0x%08x "
+			"iomode=%s pos=%llu count=%llu "
+			"layoutstateid=%d:0x%08x lseg=0x%lx",
+			MAJOR(__entry->dev), MINOR(__entry->dev),
+			(unsigned long long)__entry->fileid,
+			__entry->fhandle,
+			show_pnfs_iomode(__entry->iomode),
+			(unsigned long long)__entry->pos,
+			(unsigned long long)__entry->count,
+			__entry->layoutstateid_seq, __entry->layoutstateid_hash,
+			__entry->lseg
+		)
+);
+
+#define DEFINE_PNFS_LAYOUT_EVENT(name) \
+	DEFINE_EVENT(pnfs_layout_event, name, \
+		TP_PROTO(struct inode *inode, \
+			loff_t pos, \
+			u64 count, \
+			enum pnfs_iomode iomode, \
+			struct pnfs_layout_hdr *lo, \
+			struct pnfs_layout_segment *lseg \
+		), \
+		TP_ARGS(inode, pos, count, iomode, lo, lseg))
+
+DEFINE_PNFS_LAYOUT_EVENT(pnfs_mds_fallback_pg_init_read);
+DEFINE_PNFS_LAYOUT_EVENT(pnfs_mds_fallback_pg_init_write);
+DEFINE_PNFS_LAYOUT_EVENT(pnfs_mds_fallback_pg_get_mirror_count);
+DEFINE_PNFS_LAYOUT_EVENT(pnfs_mds_fallback_read_done);
+DEFINE_PNFS_LAYOUT_EVENT(pnfs_mds_fallback_write_done);
+DEFINE_PNFS_LAYOUT_EVENT(pnfs_mds_fallback_read_pagelist);
+DEFINE_PNFS_LAYOUT_EVENT(pnfs_mds_fallback_write_pagelist);
+
 #endif /* CONFIG_NFS_V4_1 */
 
 #endif /* _TRACE_NFS4_H */
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 6024461..46a8d63 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -837,6 +837,7 @@
 #define NFS4_dec_sequence_sz \
 				(compound_decode_hdr_maxsz + \
 				 decode_sequence_maxsz)
+#endif
 #define NFS4_enc_get_lease_time_sz	(compound_encode_hdr_maxsz + \
 					 encode_sequence_maxsz + \
 					 encode_putrootfh_maxsz + \
@@ -845,6 +846,7 @@
 					 decode_sequence_maxsz + \
 					 decode_putrootfh_maxsz + \
 					 decode_fsinfo_maxsz)
+#if defined(CONFIG_NFS_V4_1)
 #define NFS4_enc_reclaim_complete_sz	(compound_encode_hdr_maxsz + \
 					 encode_sequence_maxsz + \
 					 encode_reclaim_complete_maxsz)
@@ -2957,6 +2959,8 @@
 	encode_nops(&hdr);
 }
 
+#endif
+
 /*
  * a GET_LEASE_TIME request
  */
@@ -2977,6 +2981,8 @@
 	encode_nops(&hdr);
 }
 
+#ifdef CONFIG_NFS_V4_1
+
 /*
  * a RECLAIM_COMPLETE request
  */
@@ -3187,7 +3193,7 @@
 	return true;
 out_status:
 	nfserr = be32_to_cpup(p);
-	trace_nfs4_xdr_status(opnum, nfserr);
+	trace_nfs4_xdr_status(xdr, opnum, nfserr);
 	*nfs_retval = nfs4_stat_to_errno(nfserr);
 	return true;
 out_bad_operation:
@@ -3427,7 +3433,7 @@
 		*res = be32_to_cpup(p);
 		bitmap[0] &= ~FATTR4_WORD0_LEASE_TIME;
 	}
-	dprintk("%s: file size=%u\n", __func__, (unsigned int)*res);
+	dprintk("%s: lease time=%u\n", __func__, (unsigned int)*res);
 	return 0;
 }
 
@@ -7122,6 +7128,8 @@
 	return status;
 }
 
+#endif
+
 /*
  * Decode GET_LEASE_TIME response
  */
@@ -7143,6 +7151,8 @@
 	return status;
 }
 
+#ifdef CONFIG_NFS_V4_1
+
 /*
  * Decode RECLAIM_COMPLETE response
  */
@@ -7551,7 +7561,7 @@
 	PROC41(CREATE_SESSION,	enc_create_session,	dec_create_session),
 	PROC41(DESTROY_SESSION,	enc_destroy_session,	dec_destroy_session),
 	PROC41(SEQUENCE,	enc_sequence,		dec_sequence),
-	PROC41(GET_LEASE_TIME,	enc_get_lease_time,	dec_get_lease_time),
+	PROC(GET_LEASE_TIME,	enc_get_lease_time,	dec_get_lease_time),
 	PROC41(RECLAIM_COMPLETE,enc_reclaim_complete,	dec_reclaim_complete),
 	PROC41(GETDEVICEINFO,	enc_getdeviceinfo,	dec_getdeviceinfo),
 	PROC41(LAYOUTGET,	enc_layoutget,		dec_layoutget),
diff --git a/fs/nfs/nfstrace.h b/fs/nfs/nfstrace.h
index a0d6910..976d408 100644
--- a/fs/nfs/nfstrace.h
+++ b/fs/nfs/nfstrace.h
@@ -11,6 +11,16 @@
 #include <linux/tracepoint.h>
 #include <linux/iversion.h>
 
+TRACE_DEFINE_ENUM(DT_UNKNOWN);
+TRACE_DEFINE_ENUM(DT_FIFO);
+TRACE_DEFINE_ENUM(DT_CHR);
+TRACE_DEFINE_ENUM(DT_DIR);
+TRACE_DEFINE_ENUM(DT_BLK);
+TRACE_DEFINE_ENUM(DT_REG);
+TRACE_DEFINE_ENUM(DT_LNK);
+TRACE_DEFINE_ENUM(DT_SOCK);
+TRACE_DEFINE_ENUM(DT_WHT);
+
 #define nfs_show_file_type(ftype) \
 	__print_symbolic(ftype, \
 			{ DT_UNKNOWN, "UNKNOWN" }, \
@@ -23,25 +33,57 @@
 			{ DT_SOCK, "SOCK" }, \
 			{ DT_WHT, "WHT" })
 
+TRACE_DEFINE_ENUM(NFS_INO_INVALID_DATA);
+TRACE_DEFINE_ENUM(NFS_INO_INVALID_ATIME);
+TRACE_DEFINE_ENUM(NFS_INO_INVALID_ACCESS);
+TRACE_DEFINE_ENUM(NFS_INO_INVALID_ACL);
+TRACE_DEFINE_ENUM(NFS_INO_REVAL_PAGECACHE);
+TRACE_DEFINE_ENUM(NFS_INO_REVAL_FORCED);
+TRACE_DEFINE_ENUM(NFS_INO_INVALID_LABEL);
+TRACE_DEFINE_ENUM(NFS_INO_INVALID_CHANGE);
+TRACE_DEFINE_ENUM(NFS_INO_INVALID_CTIME);
+TRACE_DEFINE_ENUM(NFS_INO_INVALID_MTIME);
+TRACE_DEFINE_ENUM(NFS_INO_INVALID_SIZE);
+TRACE_DEFINE_ENUM(NFS_INO_INVALID_OTHER);
+
 #define nfs_show_cache_validity(v) \
 	__print_flags(v, "|", \
-			{ NFS_INO_INVALID_ATTR, "INVALID_ATTR" }, \
 			{ NFS_INO_INVALID_DATA, "INVALID_DATA" }, \
 			{ NFS_INO_INVALID_ATIME, "INVALID_ATIME" }, \
 			{ NFS_INO_INVALID_ACCESS, "INVALID_ACCESS" }, \
 			{ NFS_INO_INVALID_ACL, "INVALID_ACL" }, \
 			{ NFS_INO_REVAL_PAGECACHE, "REVAL_PAGECACHE" }, \
 			{ NFS_INO_REVAL_FORCED, "REVAL_FORCED" }, \
-			{ NFS_INO_INVALID_LABEL, "INVALID_LABEL" })
+			{ NFS_INO_INVALID_LABEL, "INVALID_LABEL" }, \
+			{ NFS_INO_INVALID_CHANGE, "INVALID_CHANGE" }, \
+			{ NFS_INO_INVALID_CTIME, "INVALID_CTIME" }, \
+			{ NFS_INO_INVALID_MTIME, "INVALID_MTIME" }, \
+			{ NFS_INO_INVALID_SIZE, "INVALID_SIZE" }, \
+			{ NFS_INO_INVALID_OTHER, "INVALID_OTHER" })
+
+TRACE_DEFINE_ENUM(NFS_INO_ADVISE_RDPLUS);
+TRACE_DEFINE_ENUM(NFS_INO_STALE);
+TRACE_DEFINE_ENUM(NFS_INO_ACL_LRU_SET);
+TRACE_DEFINE_ENUM(NFS_INO_INVALIDATING);
+TRACE_DEFINE_ENUM(NFS_INO_FSCACHE);
+TRACE_DEFINE_ENUM(NFS_INO_FSCACHE_LOCK);
+TRACE_DEFINE_ENUM(NFS_INO_LAYOUTCOMMIT);
+TRACE_DEFINE_ENUM(NFS_INO_LAYOUTCOMMITTING);
+TRACE_DEFINE_ENUM(NFS_INO_LAYOUTSTATS);
+TRACE_DEFINE_ENUM(NFS_INO_ODIRECT);
 
 #define nfs_show_nfsi_flags(v) \
 	__print_flags(v, "|", \
-			{ 1 << NFS_INO_ADVISE_RDPLUS, "ADVISE_RDPLUS" }, \
-			{ 1 << NFS_INO_STALE, "STALE" }, \
-			{ 1 << NFS_INO_INVALIDATING, "INVALIDATING" }, \
-			{ 1 << NFS_INO_FSCACHE, "FSCACHE" }, \
-			{ 1 << NFS_INO_LAYOUTCOMMIT, "NEED_LAYOUTCOMMIT" }, \
-			{ 1 << NFS_INO_LAYOUTCOMMITTING, "LAYOUTCOMMIT" })
+			{ BIT(NFS_INO_ADVISE_RDPLUS), "ADVISE_RDPLUS" }, \
+			{ BIT(NFS_INO_STALE), "STALE" }, \
+			{ BIT(NFS_INO_ACL_LRU_SET), "ACL_LRU_SET" }, \
+			{ BIT(NFS_INO_INVALIDATING), "INVALIDATING" }, \
+			{ BIT(NFS_INO_FSCACHE), "FSCACHE" }, \
+			{ BIT(NFS_INO_FSCACHE_LOCK), "FSCACHE_LOCK" }, \
+			{ BIT(NFS_INO_LAYOUTCOMMIT), "NEED_LAYOUTCOMMIT" }, \
+			{ BIT(NFS_INO_LAYOUTCOMMITTING), "LAYOUTCOMMIT" }, \
+			{ BIT(NFS_INO_LAYOUTSTATS), "LAYOUTSTATS" }, \
+			{ BIT(NFS_INO_ODIRECT), "ODIRECT" })
 
 DECLARE_EVENT_CLASS(nfs_inode_event,
 		TP_PROTO(
@@ -83,7 +125,7 @@
 		TP_ARGS(inode, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(dev_t, dev)
 			__field(u32, fhandle)
 			__field(unsigned char, type)
@@ -96,7 +138,7 @@
 
 		TP_fast_assign(
 			const struct nfs_inode *nfsi = NFS_I(inode);
-			__entry->error = error;
+			__entry->error = error < 0 ? -error : 0;
 			__entry->dev = inode->i_sb->s_dev;
 			__entry->fileid = nfsi->fileid;
 			__entry->fhandle = nfs_fhandle_hash(&nfsi->fh);
@@ -108,10 +150,10 @@
 		),
 
 		TP_printk(
-			"error=%d fileid=%02x:%02x:%llu fhandle=0x%08x "
+			"error=%ld (%s) fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"type=%u (%s) version=%llu size=%lld "
-			"cache_validity=%lu (%s) nfs_flags=%ld (%s)",
-			__entry->error,
+			"cache_validity=0x%lx (%s) nfs_flags=0x%lx (%s)",
+			-__entry->error, nfs_show_status(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->fileid,
 			__entry->fhandle,
@@ -158,13 +200,41 @@
 DEFINE_NFS_INODE_EVENT(nfs_access_enter);
 DEFINE_NFS_INODE_EVENT_DONE(nfs_access_exit);
 
+TRACE_DEFINE_ENUM(LOOKUP_FOLLOW);
+TRACE_DEFINE_ENUM(LOOKUP_DIRECTORY);
+TRACE_DEFINE_ENUM(LOOKUP_AUTOMOUNT);
+TRACE_DEFINE_ENUM(LOOKUP_PARENT);
+TRACE_DEFINE_ENUM(LOOKUP_REVAL);
+TRACE_DEFINE_ENUM(LOOKUP_RCU);
+TRACE_DEFINE_ENUM(LOOKUP_NO_REVAL);
+TRACE_DEFINE_ENUM(LOOKUP_NO_EVAL);
+TRACE_DEFINE_ENUM(LOOKUP_OPEN);
+TRACE_DEFINE_ENUM(LOOKUP_CREATE);
+TRACE_DEFINE_ENUM(LOOKUP_EXCL);
+TRACE_DEFINE_ENUM(LOOKUP_RENAME_TARGET);
+TRACE_DEFINE_ENUM(LOOKUP_JUMPED);
+TRACE_DEFINE_ENUM(LOOKUP_ROOT);
+TRACE_DEFINE_ENUM(LOOKUP_EMPTY);
+TRACE_DEFINE_ENUM(LOOKUP_DOWN);
+
 #define show_lookup_flags(flags) \
-	__print_flags((unsigned long)flags, "|", \
-			{ LOOKUP_AUTOMOUNT, "AUTOMOUNT" }, \
+	__print_flags(flags, "|", \
+			{ LOOKUP_FOLLOW, "FOLLOW" }, \
 			{ LOOKUP_DIRECTORY, "DIRECTORY" }, \
+			{ LOOKUP_AUTOMOUNT, "AUTOMOUNT" }, \
+			{ LOOKUP_PARENT, "PARENT" }, \
+			{ LOOKUP_REVAL, "REVAL" }, \
+			{ LOOKUP_RCU, "RCU" }, \
+			{ LOOKUP_NO_REVAL, "NO_REVAL" }, \
+			{ LOOKUP_NO_EVAL, "NO_EVAL" }, \
 			{ LOOKUP_OPEN, "OPEN" }, \
 			{ LOOKUP_CREATE, "CREATE" }, \
-			{ LOOKUP_EXCL, "EXCL" })
+			{ LOOKUP_EXCL, "EXCL" }, \
+			{ LOOKUP_RENAME_TARGET, "RENAME_TARGET" }, \
+			{ LOOKUP_JUMPED, "JUMPED" }, \
+			{ LOOKUP_ROOT, "ROOT" }, \
+			{ LOOKUP_EMPTY, "EMPTY" }, \
+			{ LOOKUP_DOWN, "DOWN" })
 
 DECLARE_EVENT_CLASS(nfs_lookup_event,
 		TP_PROTO(
@@ -176,7 +246,7 @@
 		TP_ARGS(dir, dentry, flags),
 
 		TP_STRUCT__entry(
-			__field(unsigned int, flags)
+			__field(unsigned long, flags)
 			__field(dev_t, dev)
 			__field(u64, dir)
 			__string(name, dentry->d_name.name)
@@ -190,7 +260,7 @@
 		),
 
 		TP_printk(
-			"flags=%u (%s) name=%02x:%02x:%llu/%s",
+			"flags=0x%lx (%s) name=%02x:%02x:%llu/%s",
 			__entry->flags,
 			show_lookup_flags(__entry->flags),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
@@ -219,8 +289,8 @@
 		TP_ARGS(dir, dentry, flags, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
-			__field(unsigned int, flags)
+			__field(unsigned long, error)
+			__field(unsigned long, flags)
 			__field(dev_t, dev)
 			__field(u64, dir)
 			__string(name, dentry->d_name.name)
@@ -229,14 +299,14 @@
 		TP_fast_assign(
 			__entry->dev = dir->i_sb->s_dev;
 			__entry->dir = NFS_FILEID(dir);
-			__entry->error = error;
+			__entry->error = error < 0 ? -error : 0;
 			__entry->flags = flags;
 			__assign_str(name, dentry->d_name.name);
 		),
 
 		TP_printk(
-			"error=%d flags=%u (%s) name=%02x:%02x:%llu/%s",
-			__entry->error,
+			"error=%ld (%s) flags=0x%lx (%s) name=%02x:%02x:%llu/%s",
+			-__entry->error, nfs_show_status(__entry->error),
 			__entry->flags,
 			show_lookup_flags(__entry->flags),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
@@ -260,15 +330,43 @@
 DEFINE_NFS_LOOKUP_EVENT(nfs_lookup_revalidate_enter);
 DEFINE_NFS_LOOKUP_EVENT_DONE(nfs_lookup_revalidate_exit);
 
+TRACE_DEFINE_ENUM(O_WRONLY);
+TRACE_DEFINE_ENUM(O_RDWR);
+TRACE_DEFINE_ENUM(O_CREAT);
+TRACE_DEFINE_ENUM(O_EXCL);
+TRACE_DEFINE_ENUM(O_NOCTTY);
+TRACE_DEFINE_ENUM(O_TRUNC);
+TRACE_DEFINE_ENUM(O_APPEND);
+TRACE_DEFINE_ENUM(O_NONBLOCK);
+TRACE_DEFINE_ENUM(O_DSYNC);
+TRACE_DEFINE_ENUM(O_DIRECT);
+TRACE_DEFINE_ENUM(O_LARGEFILE);
+TRACE_DEFINE_ENUM(O_DIRECTORY);
+TRACE_DEFINE_ENUM(O_NOFOLLOW);
+TRACE_DEFINE_ENUM(O_NOATIME);
+TRACE_DEFINE_ENUM(O_CLOEXEC);
+
 #define show_open_flags(flags) \
-	__print_flags((unsigned long)flags, "|", \
+	__print_flags(flags, "|", \
+		{ O_WRONLY, "O_WRONLY" }, \
+		{ O_RDWR, "O_RDWR" }, \
 		{ O_CREAT, "O_CREAT" }, \
 		{ O_EXCL, "O_EXCL" }, \
+		{ O_NOCTTY, "O_NOCTTY" }, \
 		{ O_TRUNC, "O_TRUNC" }, \
 		{ O_APPEND, "O_APPEND" }, \
+		{ O_NONBLOCK, "O_NONBLOCK" }, \
 		{ O_DSYNC, "O_DSYNC" }, \
 		{ O_DIRECT, "O_DIRECT" }, \
-		{ O_DIRECTORY, "O_DIRECTORY" })
+		{ O_LARGEFILE, "O_LARGEFILE" }, \
+		{ O_DIRECTORY, "O_DIRECTORY" }, \
+		{ O_NOFOLLOW, "O_NOFOLLOW" }, \
+		{ O_NOATIME, "O_NOATIME" }, \
+		{ O_CLOEXEC, "O_CLOEXEC" })
+
+TRACE_DEFINE_ENUM(FMODE_READ);
+TRACE_DEFINE_ENUM(FMODE_WRITE);
+TRACE_DEFINE_ENUM(FMODE_EXEC);
 
 #define show_fmode_flags(mode) \
 	__print_flags(mode, "|", \
@@ -286,7 +384,7 @@
 		TP_ARGS(dir, ctx, flags),
 
 		TP_STRUCT__entry(
-			__field(unsigned int, flags)
+			__field(unsigned long, flags)
 			__field(unsigned int, fmode)
 			__field(dev_t, dev)
 			__field(u64, dir)
@@ -302,7 +400,7 @@
 		),
 
 		TP_printk(
-			"flags=%u (%s) fmode=%s name=%02x:%02x:%llu/%s",
+			"flags=0x%lx (%s) fmode=%s name=%02x:%02x:%llu/%s",
 			__entry->flags,
 			show_open_flags(__entry->flags),
 			show_fmode_flags(__entry->fmode),
@@ -323,8 +421,8 @@
 		TP_ARGS(dir, ctx, flags, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
-			__field(unsigned int, flags)
+			__field(unsigned long, error)
+			__field(unsigned long, flags)
 			__field(unsigned int, fmode)
 			__field(dev_t, dev)
 			__field(u64, dir)
@@ -332,7 +430,7 @@
 		),
 
 		TP_fast_assign(
-			__entry->error = error;
+			__entry->error = -error;
 			__entry->dev = dir->i_sb->s_dev;
 			__entry->dir = NFS_FILEID(dir);
 			__entry->flags = flags;
@@ -341,9 +439,9 @@
 		),
 
 		TP_printk(
-			"error=%d flags=%u (%s) fmode=%s "
+			"error=%ld (%s) flags=0x%lx (%s) fmode=%s "
 			"name=%02x:%02x:%llu/%s",
-			__entry->error,
+			-__entry->error, nfs_show_status(__entry->error),
 			__entry->flags,
 			show_open_flags(__entry->flags),
 			show_fmode_flags(__entry->fmode),
@@ -363,7 +461,7 @@
 		TP_ARGS(dir, dentry, flags),
 
 		TP_STRUCT__entry(
-			__field(unsigned int, flags)
+			__field(unsigned long, flags)
 			__field(dev_t, dev)
 			__field(u64, dir)
 			__string(name, dentry->d_name.name)
@@ -377,7 +475,7 @@
 		),
 
 		TP_printk(
-			"flags=%u (%s) name=%02x:%02x:%llu/%s",
+			"flags=0x%lx (%s) name=%02x:%02x:%llu/%s",
 			__entry->flags,
 			show_open_flags(__entry->flags),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
@@ -397,15 +495,15 @@
 		TP_ARGS(dir, dentry, flags, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
-			__field(unsigned int, flags)
+			__field(unsigned long, error)
+			__field(unsigned long, flags)
 			__field(dev_t, dev)
 			__field(u64, dir)
 			__string(name, dentry->d_name.name)
 		),
 
 		TP_fast_assign(
-			__entry->error = error;
+			__entry->error = -error;
 			__entry->dev = dir->i_sb->s_dev;
 			__entry->dir = NFS_FILEID(dir);
 			__entry->flags = flags;
@@ -413,8 +511,8 @@
 		),
 
 		TP_printk(
-			"error=%d flags=%u (%s) name=%02x:%02x:%llu/%s",
-			__entry->error,
+			"error=%ld (%s) flags=0x%lx (%s) name=%02x:%02x:%llu/%s",
+			-__entry->error, nfs_show_status(__entry->error),
 			__entry->flags,
 			show_open_flags(__entry->flags),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
@@ -469,7 +567,7 @@
 		TP_ARGS(dir, dentry, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(dev_t, dev)
 			__field(u64, dir)
 			__string(name, dentry->d_name.name)
@@ -478,13 +576,13 @@
 		TP_fast_assign(
 			__entry->dev = dir->i_sb->s_dev;
 			__entry->dir = NFS_FILEID(dir);
-			__entry->error = error;
+			__entry->error = error < 0 ? -error : 0;
 			__assign_str(name, dentry->d_name.name);
 		),
 
 		TP_printk(
-			"error=%d name=%02x:%02x:%llu/%s",
-			__entry->error,
+			"error=%ld (%s) name=%02x:%02x:%llu/%s",
+			-__entry->error, nfs_show_status(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->dir,
 			__get_str(name)
@@ -557,7 +655,7 @@
 		TP_ARGS(inode, dir, dentry, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(dev_t, dev)
 			__field(u64, fileid)
 			__field(u64, dir)
@@ -568,13 +666,13 @@
 			__entry->dev = inode->i_sb->s_dev;
 			__entry->fileid = NFS_FILEID(inode);
 			__entry->dir = NFS_FILEID(dir);
-			__entry->error = error;
+			__entry->error = error < 0 ? -error : 0;
 			__assign_str(name, dentry->d_name.name);
 		),
 
 		TP_printk(
-			"error=%d fileid=%02x:%02x:%llu name=%02x:%02x:%llu/%s",
-			__entry->error,
+			"error=%ld (%s) fileid=%02x:%02x:%llu name=%02x:%02x:%llu/%s",
+			-__entry->error, nfs_show_status(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			__entry->fileid,
 			MAJOR(__entry->dev), MINOR(__entry->dev),
@@ -642,7 +740,7 @@
 
 		TP_STRUCT__entry(
 			__field(dev_t, dev)
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(u64, old_dir)
 			__string(old_name, old_dentry->d_name.name)
 			__field(u64, new_dir)
@@ -651,17 +749,17 @@
 
 		TP_fast_assign(
 			__entry->dev = old_dir->i_sb->s_dev;
+			__entry->error = -error;
 			__entry->old_dir = NFS_FILEID(old_dir);
 			__entry->new_dir = NFS_FILEID(new_dir);
-			__entry->error = error;
 			__assign_str(old_name, old_dentry->d_name.name);
 			__assign_str(new_name, new_dentry->d_name.name);
 		),
 
 		TP_printk(
-			"error=%d old_name=%02x:%02x:%llu/%s "
+			"error=%ld (%s) old_name=%02x:%02x:%llu/%s "
 			"new_name=%02x:%02x:%llu/%s",
-			__entry->error,
+			-__entry->error, nfs_show_status(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->old_dir,
 			__get_str(old_name),
@@ -697,7 +795,7 @@
 
 		TP_STRUCT__entry(
 			__field(dev_t, dev)
-			__field(int, error)
+			__field(unsigned long, error)
 			__field(u64, dir)
 			__dynamic_array(char, name, data->args.name.len + 1)
 		),
@@ -707,15 +805,15 @@
 			size_t len = data->args.name.len;
 			__entry->dev = dir->i_sb->s_dev;
 			__entry->dir = NFS_FILEID(dir);
-			__entry->error = error;
+			__entry->error = -error;
 			memcpy(__get_str(name),
 				data->args.name.name, len);
 			__get_str(name)[len] = 0;
 		),
 
 		TP_printk(
-			"error=%d name=%02x:%02x:%llu/%s",
-			__entry->error,
+			"error=%ld (%s) name=%02x:%02x:%llu/%s",
+			-__entry->error, nfs_show_status(__entry->error),
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->dir,
 			__get_str(name)
@@ -974,6 +1072,8 @@
 TRACE_DEFINE_ENUM(NFSERR_NOENT);
 TRACE_DEFINE_ENUM(NFSERR_IO);
 TRACE_DEFINE_ENUM(NFSERR_NXIO);
+TRACE_DEFINE_ENUM(ECHILD);
+TRACE_DEFINE_ENUM(NFSERR_EAGAIN);
 TRACE_DEFINE_ENUM(NFSERR_ACCES);
 TRACE_DEFINE_ENUM(NFSERR_EXIST);
 TRACE_DEFINE_ENUM(NFSERR_XDEV);
@@ -985,6 +1085,7 @@
 TRACE_DEFINE_ENUM(NFSERR_NOSPC);
 TRACE_DEFINE_ENUM(NFSERR_ROFS);
 TRACE_DEFINE_ENUM(NFSERR_MLINK);
+TRACE_DEFINE_ENUM(NFSERR_OPNOTSUPP);
 TRACE_DEFINE_ENUM(NFSERR_NAMETOOLONG);
 TRACE_DEFINE_ENUM(NFSERR_NOTEMPTY);
 TRACE_DEFINE_ENUM(NFSERR_DQUOT);
@@ -1007,6 +1108,8 @@
 			{ NFSERR_NOENT, "NOENT" }, \
 			{ NFSERR_IO, "IO" }, \
 			{ NFSERR_NXIO, "NXIO" }, \
+			{ ECHILD, "CHILD" }, \
+			{ NFSERR_EAGAIN, "AGAIN" }, \
 			{ NFSERR_ACCES, "ACCES" }, \
 			{ NFSERR_EXIST, "EXIST" }, \
 			{ NFSERR_XDEV, "XDEV" }, \
@@ -1018,6 +1121,7 @@
 			{ NFSERR_NOSPC, "NOSPC" }, \
 			{ NFSERR_ROFS, "ROFS" }, \
 			{ NFSERR_MLINK, "MLINK" }, \
+			{ NFSERR_OPNOTSUPP, "OPNOTSUPP" }, \
 			{ NFSERR_NAMETOOLONG, "NAMETOOLONG" }, \
 			{ NFSERR_NOTEMPTY, "NOTEMPTY" }, \
 			{ NFSERR_DQUOT, "DQUOT" }, \
@@ -1035,22 +1139,33 @@
 
 TRACE_EVENT(nfs_xdr_status,
 		TP_PROTO(
+			const struct xdr_stream *xdr,
 			int error
 		),
 
-		TP_ARGS(error),
+		TP_ARGS(xdr, error),
 
 		TP_STRUCT__entry(
-			__field(int, error)
+			__field(unsigned int, task_id)
+			__field(unsigned int, client_id)
+			__field(u32, xid)
+			__field(unsigned long, error)
 		),
 
 		TP_fast_assign(
+			const struct rpc_rqst *rqstp = xdr->rqst;
+			const struct rpc_task *task = rqstp->rq_task;
+
+			__entry->task_id = task->tk_pid;
+			__entry->client_id = task->tk_client->cl_clid;
+			__entry->xid = be32_to_cpu(rqstp->rq_xid);
 			__entry->error = error;
 		),
 
 		TP_printk(
-			"error=%d (%s)",
-			__entry->error, nfs_show_status(__entry->error)
+			"task:%u@%d xid=0x%08x error=%ld (%s)",
+			__entry->task_id, __entry->client_id, __entry->xid,
+			-__entry->error, nfs_show_status(__entry->error)
 		)
 );
 
diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c
index 6ef5278..ed4e1b0 100644
--- a/fs/nfs/pagelist.c
+++ b/fs/nfs/pagelist.c
@@ -77,7 +77,7 @@
 static inline struct nfs_page *
 nfs_page_alloc(void)
 {
-	struct nfs_page	*p = kmem_cache_zalloc(nfs_page_cachep, GFP_NOIO);
+	struct nfs_page	*p = kmem_cache_zalloc(nfs_page_cachep, GFP_KERNEL);
 	if (p)
 		INIT_LIST_HEAD(&p->wb_list);
 	return p;
@@ -775,8 +775,6 @@
 	if (pagecount <= ARRAY_SIZE(pg_array->page_array))
 		pg_array->pagevec = pg_array->page_array;
 	else {
-		if (hdr->rw_mode == FMODE_WRITE)
-			gfp_flags = GFP_NOIO;
 		pg_array->pagevec = kcalloc(pagecount, sizeof(struct page *), gfp_flags);
 		if (!pg_array->pagevec) {
 			pg_array->npages = 0;
@@ -851,7 +849,7 @@
 	desc->pg_mirrors_dynamic = NULL;
 	if (mirror_count == 1)
 		return desc->pg_mirrors_static;
-	ret = kmalloc_array(mirror_count, sizeof(*ret), GFP_NOFS);
+	ret = kmalloc_array(mirror_count, sizeof(*ret), GFP_KERNEL);
 	if (ret != NULL) {
 		for (i = 0; i < mirror_count; i++)
 			nfs_pageio_mirror_init(&ret[i], desc->pg_bsize);
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index 83722e9..75bd5b5 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -1890,7 +1890,7 @@
 		spin_unlock(&ino->i_lock);
 		lseg = ERR_PTR(wait_var_event_killable(&lo->plh_outstanding,
 					!atomic_read(&lo->plh_outstanding)));
-		if (IS_ERR(lseg) || !list_empty(&lo->plh_segs))
+		if (IS_ERR(lseg))
 			goto out_put_layout_hdr;
 		pnfs_put_layout_hdr(lo);
 		goto lookup_again;
@@ -1915,6 +1915,7 @@
 	 * stateid.
 	 */
 	if (test_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags)) {
+		int status;
 
 		/*
 		 * The first layoutget for the file. Need to serialize per
@@ -1934,13 +1935,20 @@
 		}
 
 		first = true;
-		if (nfs4_select_rw_stateid(ctx->state,
+		status = nfs4_select_rw_stateid(ctx->state,
 					iomode == IOMODE_RW ? FMODE_WRITE : FMODE_READ,
-					NULL, &stateid, NULL) != 0) {
+					NULL, &stateid, NULL);
+		if (status != 0) {
 			trace_pnfs_update_layout(ino, pos, count,
 					iomode, lo, lseg,
 					PNFS_UPDATE_LAYOUT_INVALID_OPEN);
-			goto out_unlock;
+			if (status != -EAGAIN)
+				goto out_unlock;
+			spin_unlock(&ino->i_lock);
+			nfs4_schedule_stateid_recovery(server, ctx->state);
+			pnfs_clear_first_layoutget(lo);
+			pnfs_put_layout_hdr(lo);
+			goto lookup_again;
 		}
 	} else {
 		nfs4_stateid_copy(&stateid, &lo->plh_stateid);
@@ -2029,6 +2037,8 @@
 out_put_layout_hdr:
 	if (first)
 		pnfs_clear_first_layoutget(lo);
+	trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
+				 PNFS_UPDATE_LAYOUT_EXIT);
 	pnfs_put_layout_hdr(lo);
 out:
 	dprintk("%s: inode %s/%llu pNFS layout segment %s for "
@@ -2468,7 +2478,7 @@
 						   wb_size,
 						   IOMODE_RW,
 						   false,
-						   GFP_NOFS);
+						   GFP_KERNEL);
 		if (IS_ERR(pgio->pg_lseg)) {
 			pgio->pg_error = PTR_ERR(pgio->pg_lseg);
 			pgio->pg_lseg = NULL;
diff --git a/fs/nfs/super.c b/fs/nfs/super.c
index f88ddac..3683d2b 100644
--- a/fs/nfs/super.c
+++ b/fs/nfs/super.c
@@ -77,6 +77,8 @@
 #define NFS_DEFAULT_VERSION 2
 #endif
 
+#define NFS_MAX_CONNECTIONS 16
+
 enum {
 	/* Mount options that take no arguments */
 	Opt_soft, Opt_softerr, Opt_hard,
@@ -108,6 +110,7 @@
 	Opt_nfsvers,
 	Opt_sec, Opt_proto, Opt_mountproto, Opt_mounthost,
 	Opt_addr, Opt_mountaddr, Opt_clientaddr,
+	Opt_nconnect,
 	Opt_lookupcache,
 	Opt_fscache_uniq,
 	Opt_local_lock,
@@ -181,6 +184,8 @@
 	{ Opt_mounthost, "mounthost=%s" },
 	{ Opt_mountaddr, "mountaddr=%s" },
 
+	{ Opt_nconnect, "nconnect=%s" },
+
 	{ Opt_lookupcache, "lookupcache=%s" },
 	{ Opt_fscache_uniq, "fsc=%s" },
 	{ Opt_local_lock, "local_lock=%s" },
@@ -582,7 +587,7 @@
 	}
 	default:
 		if (showdefaults)
-			seq_printf(m, ",mountaddr=unspecified");
+			seq_puts(m, ",mountaddr=unspecified");
 	}
 
 	if (nfss->mountd_version || showdefaults)
@@ -673,6 +678,8 @@
 	seq_printf(m, ",proto=%s",
 		   rpc_peeraddr2str(nfss->client, RPC_DISPLAY_NETID));
 	rcu_read_unlock();
+	if (clp->cl_nconnect > 0)
+		seq_printf(m, ",nconnect=%u", clp->cl_nconnect);
 	if (version == 4) {
 		if (nfss->port != NFS_PORT)
 			seq_printf(m, ",port=%u", nfss->port);
@@ -690,29 +697,29 @@
 		nfs_show_nfsv4_options(m, nfss, showdefaults);
 
 	if (nfss->options & NFS_OPTION_FSCACHE)
-		seq_printf(m, ",fsc");
+		seq_puts(m, ",fsc");
 
 	if (nfss->options & NFS_OPTION_MIGRATION)
-		seq_printf(m, ",migration");
+		seq_puts(m, ",migration");
 
 	if (nfss->flags & NFS_MOUNT_LOOKUP_CACHE_NONEG) {
 		if (nfss->flags & NFS_MOUNT_LOOKUP_CACHE_NONE)
-			seq_printf(m, ",lookupcache=none");
+			seq_puts(m, ",lookupcache=none");
 		else
-			seq_printf(m, ",lookupcache=pos");
+			seq_puts(m, ",lookupcache=pos");
 	}
 
 	local_flock = nfss->flags & NFS_MOUNT_LOCAL_FLOCK;
 	local_fcntl = nfss->flags & NFS_MOUNT_LOCAL_FCNTL;
 
 	if (!local_flock && !local_fcntl)
-		seq_printf(m, ",local_lock=none");
+		seq_puts(m, ",local_lock=none");
 	else if (local_flock && local_fcntl)
-		seq_printf(m, ",local_lock=all");
+		seq_puts(m, ",local_lock=all");
 	else if (local_flock)
-		seq_printf(m, ",local_lock=flock");
+		seq_puts(m, ",local_lock=flock");
 	else
-		seq_printf(m, ",local_lock=posix");
+		seq_puts(m, ",local_lock=posix");
 }
 
 /*
@@ -735,11 +742,21 @@
 EXPORT_SYMBOL_GPL(nfs_show_options);
 
 #if IS_ENABLED(CONFIG_NFS_V4)
+static void show_lease(struct seq_file *m, struct nfs_server *server)
+{
+	struct nfs_client *clp = server->nfs_client;
+	unsigned long expire;
+
+	seq_printf(m, ",lease_time=%ld", clp->cl_lease_time / HZ);
+	expire = clp->cl_last_renewal + clp->cl_lease_time;
+	seq_printf(m, ",lease_expired=%ld",
+		   time_after(expire, jiffies) ?  0 : (jiffies - expire) / HZ);
+}
 #ifdef CONFIG_NFS_V4_1
 static void show_sessions(struct seq_file *m, struct nfs_server *server)
 {
 	if (nfs4_has_session(server->nfs_client))
-		seq_printf(m, ",sessions");
+		seq_puts(m, ",sessions");
 }
 #else
 static void show_sessions(struct seq_file *m, struct nfs_server *server) {}
@@ -816,7 +833,7 @@
 	/*
 	 * Display all mount option settings
 	 */
-	seq_printf(m, "\n\topts:\t");
+	seq_puts(m, "\n\topts:\t");
 	seq_puts(m, sb_rdonly(root->d_sb) ? "ro" : "rw");
 	seq_puts(m, root->d_sb->s_flags & SB_SYNCHRONOUS ? ",sync" : "");
 	seq_puts(m, root->d_sb->s_flags & SB_NOATIME ? ",noatime" : "");
@@ -827,7 +844,7 @@
 
 	show_implementation_id(m, nfss);
 
-	seq_printf(m, "\n\tcaps:\t");
+	seq_puts(m, "\n\tcaps:\t");
 	seq_printf(m, "caps=0x%x", nfss->caps);
 	seq_printf(m, ",wtmult=%u", nfss->wtmult);
 	seq_printf(m, ",dtsize=%u", nfss->dtsize);
@@ -836,13 +853,14 @@
 
 #if IS_ENABLED(CONFIG_NFS_V4)
 	if (nfss->nfs_client->rpc_ops->version == 4) {
-		seq_printf(m, "\n\tnfsv4:\t");
+		seq_puts(m, "\n\tnfsv4:\t");
 		seq_printf(m, "bm0=0x%x", nfss->attr_bitmask[0]);
 		seq_printf(m, ",bm1=0x%x", nfss->attr_bitmask[1]);
 		seq_printf(m, ",bm2=0x%x", nfss->attr_bitmask[2]);
 		seq_printf(m, ",acl=0x%x", nfss->acl_bitmask);
 		show_sessions(m, nfss);
 		show_pnfs(m, nfss);
+		show_lease(m, nfss);
 	}
 #endif
 
@@ -874,20 +892,20 @@
 		preempt_enable();
 	}
 
-	seq_printf(m, "\n\tevents:\t");
+	seq_puts(m, "\n\tevents:\t");
 	for (i = 0; i < __NFSIOS_COUNTSMAX; i++)
 		seq_printf(m, "%lu ", totals.events[i]);
-	seq_printf(m, "\n\tbytes:\t");
+	seq_puts(m, "\n\tbytes:\t");
 	for (i = 0; i < __NFSIOS_BYTESMAX; i++)
 		seq_printf(m, "%Lu ", totals.bytes[i]);
 #ifdef CONFIG_NFS_FSCACHE
 	if (nfss->options & NFS_OPTION_FSCACHE) {
-		seq_printf(m, "\n\tfsc:\t");
+		seq_puts(m, "\n\tfsc:\t");
 		for (i = 0; i < __NFSIOS_FSCACHEMAX; i++)
 			seq_printf(m, "%Lu ", totals.fscache[i]);
 	}
 #endif
-	seq_printf(m, "\n");
+	seq_putc(m, '\n');
 
 	rpc_clnt_show_stats(m, nfss->client);
 
@@ -1549,6 +1567,11 @@
 			if (mnt->mount_server.addrlen == 0)
 				goto out_invalid_address;
 			break;
+		case Opt_nconnect:
+			if (nfs_get_option_ul_bound(args, &option, 1, NFS_MAX_CONNECTIONS))
+				goto out_invalid_value;
+			mnt->nfs_server.nconnect = option;
+			break;
 		case Opt_lookupcache:
 			string = match_strdup(args);
 			if (string == NULL)
diff --git a/fs/nfs/sysfs.c b/fs/nfs/sysfs.c
new file mode 100644
index 0000000..4f3390b
--- /dev/null
+++ b/fs/nfs/sysfs.c
@@ -0,0 +1,187 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Copyright (c) 2019 Hammerspace Inc
+ */
+
+#include <linux/module.h>
+#include <linux/kobject.h>
+#include <linux/sysfs.h>
+#include <linux/fs.h>
+#include <linux/slab.h>
+#include <linux/netdevice.h>
+#include <linux/string.h>
+#include <linux/nfs_fs.h>
+#include <linux/rcupdate.h>
+
+#include "nfs4_fs.h"
+#include "netns.h"
+#include "sysfs.h"
+
+struct kobject *nfs_client_kobj;
+static struct kset *nfs_client_kset;
+
+static void nfs_netns_object_release(struct kobject *kobj)
+{
+	kfree(kobj);
+}
+
+static const struct kobj_ns_type_operations *nfs_netns_object_child_ns_type(
+		struct kobject *kobj)
+{
+	return &net_ns_type_operations;
+}
+
+static struct kobj_type nfs_netns_object_type = {
+	.release = nfs_netns_object_release,
+	.sysfs_ops = &kobj_sysfs_ops,
+	.child_ns_type = nfs_netns_object_child_ns_type,
+};
+
+static struct kobject *nfs_netns_object_alloc(const char *name,
+		struct kset *kset, struct kobject *parent)
+{
+	struct kobject *kobj;
+
+	kobj = kzalloc(sizeof(*kobj), GFP_KERNEL);
+	if (kobj) {
+		kobj->kset = kset;
+		if (kobject_init_and_add(kobj, &nfs_netns_object_type,
+					parent, "%s", name) == 0)
+			return kobj;
+		kobject_put(kobj);
+	}
+	return NULL;
+}
+
+int nfs_sysfs_init(void)
+{
+	nfs_client_kset = kset_create_and_add("nfs", NULL, fs_kobj);
+	if (!nfs_client_kset)
+		return -ENOMEM;
+	nfs_client_kobj = nfs_netns_object_alloc("net", nfs_client_kset, NULL);
+	if  (!nfs_client_kobj) {
+		kset_unregister(nfs_client_kset);
+		nfs_client_kset = NULL;
+		return -ENOMEM;
+	}
+	return 0;
+}
+
+void nfs_sysfs_exit(void)
+{
+	kobject_put(nfs_client_kobj);
+	kset_unregister(nfs_client_kset);
+}
+
+static ssize_t nfs_netns_identifier_show(struct kobject *kobj,
+		struct kobj_attribute *attr, char *buf)
+{
+	struct nfs_netns_client *c = container_of(kobj,
+			struct nfs_netns_client,
+			kobject);
+	return scnprintf(buf, PAGE_SIZE, "%s\n", c->identifier);
+}
+
+/* Strip trailing '\n' */
+static size_t nfs_string_strip(const char *c, size_t len)
+{
+	while (len > 0 && c[len-1] == '\n')
+		--len;
+	return len;
+}
+
+static ssize_t nfs_netns_identifier_store(struct kobject *kobj,
+		struct kobj_attribute *attr,
+		const char *buf, size_t count)
+{
+	struct nfs_netns_client *c = container_of(kobj,
+			struct nfs_netns_client,
+			kobject);
+	const char *old;
+	char *p;
+	size_t len;
+
+	len = nfs_string_strip(buf, min_t(size_t, count, CONTAINER_ID_MAXLEN));
+	if (!len)
+		return 0;
+	p = kmemdup_nul(buf, len, GFP_KERNEL);
+	if (!p)
+		return -ENOMEM;
+	old = xchg(&c->identifier, p);
+	if (old) {
+		synchronize_rcu();
+		kfree(old);
+	}
+	return count;
+}
+
+static void nfs_netns_client_release(struct kobject *kobj)
+{
+	struct nfs_netns_client *c = container_of(kobj,
+			struct nfs_netns_client,
+			kobject);
+
+	if (c->identifier)
+		kfree(c->identifier);
+	kfree(c);
+}
+
+static const void *nfs_netns_client_namespace(struct kobject *kobj)
+{
+	return container_of(kobj, struct nfs_netns_client, kobject)->net;
+}
+
+static struct kobj_attribute nfs_netns_client_id = __ATTR(identifier,
+		0644, nfs_netns_identifier_show, nfs_netns_identifier_store);
+
+static struct attribute *nfs_netns_client_attrs[] = {
+	&nfs_netns_client_id.attr,
+	NULL,
+};
+
+static struct kobj_type nfs_netns_client_type = {
+	.release = nfs_netns_client_release,
+	.default_attrs = nfs_netns_client_attrs,
+	.sysfs_ops = &kobj_sysfs_ops,
+	.namespace = nfs_netns_client_namespace,
+};
+
+static struct nfs_netns_client *nfs_netns_client_alloc(struct kobject *parent,
+		struct net *net)
+{
+	struct nfs_netns_client *p;
+
+	p = kzalloc(sizeof(*p), GFP_KERNEL);
+	if (p) {
+		p->net = net;
+		p->kobject.kset = nfs_client_kset;
+		if (kobject_init_and_add(&p->kobject, &nfs_netns_client_type,
+					parent, "nfs_client") == 0)
+			return p;
+		kobject_put(&p->kobject);
+	}
+	return NULL;
+}
+
+void nfs_netns_sysfs_setup(struct nfs_net *netns, struct net *net)
+{
+	struct nfs_netns_client *clp;
+
+	clp = nfs_netns_client_alloc(nfs_client_kobj, net);
+	if (clp) {
+		netns->nfs_client = clp;
+		kobject_uevent(&clp->kobject, KOBJ_ADD);
+	}
+}
+
+void nfs_netns_sysfs_destroy(struct nfs_net *netns)
+{
+	struct nfs_netns_client *clp = netns->nfs_client;
+
+	if (clp) {
+		kobject_uevent(&clp->kobject, KOBJ_REMOVE);
+		kobject_del(&clp->kobject);
+		kobject_put(&clp->kobject);
+		netns->nfs_client = NULL;
+	}
+}
diff --git a/fs/nfs/sysfs.h b/fs/nfs/sysfs.h
new file mode 100644
index 0000000..f1b2741
--- /dev/null
+++ b/fs/nfs/sysfs.h
@@ -0,0 +1,25 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Copyright (c) 2019 Hammerspace Inc
+ */
+
+#ifndef __NFS_SYSFS_H
+#define __NFS_SYSFS_H
+
+#define CONTAINER_ID_MAXLEN (64)
+
+struct nfs_netns_client {
+	struct kobject kobject;
+	struct net *net;
+	const char *identifier;
+};
+
+extern struct kobject *nfs_client_kobj;
+
+extern int nfs_sysfs_init(void);
+extern void nfs_sysfs_exit(void);
+
+void nfs_netns_sysfs_setup(struct nfs_net *netns, struct net *net);
+void nfs_netns_sysfs_destroy(struct nfs_net *netns);
+
+#endif
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index 059a7c3..92d9cad 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -103,7 +103,7 @@
 
 static struct nfs_pgio_header *nfs_writehdr_alloc(void)
 {
-	struct nfs_pgio_header *p = mempool_alloc(nfs_wdata_mempool, GFP_NOIO);
+	struct nfs_pgio_header *p = mempool_alloc(nfs_wdata_mempool, GFP_KERNEL);
 
 	memset(p, 0, sizeof(*p));
 	p->rw_mode = FMODE_WRITE;
@@ -721,12 +721,11 @@
 	struct inode *inode = mapping->host;
 	struct nfs_pageio_descriptor pgio;
 	struct nfs_io_completion *ioc;
-	unsigned int pflags = memalloc_nofs_save();
 	int err;
 
 	nfs_inc_stats(inode, NFSIOS_VFSWRITEPAGES);
 
-	ioc = nfs_io_completion_alloc(GFP_NOFS);
+	ioc = nfs_io_completion_alloc(GFP_KERNEL);
 	if (ioc)
 		nfs_io_completion_init(ioc, nfs_io_completion_commit, inode);
 
@@ -737,8 +736,6 @@
 	nfs_pageio_complete(&pgio);
 	nfs_io_completion_put(ioc);
 
-	memalloc_nofs_restore(pflags);
-
 	if (err < 0)
 		goto out_err;
 	err = pgio.pg_error;
diff --git a/include/linux/nfs4.h b/include/linux/nfs4.h
index 22494d1..fd59904 100644
--- a/include/linux/nfs4.h
+++ b/include/linux/nfs4.h
@@ -660,6 +660,7 @@
 	PNFS_UPDATE_LAYOUT_BLOCKED,
 	PNFS_UPDATE_LAYOUT_INVALID_OPEN,
 	PNFS_UPDATE_LAYOUT_SEND_LAYOUTGET,
+	PNFS_UPDATE_LAYOUT_EXIT,
 };
 
 #define NFS4_OP_MAP_NUM_LONGS					\
diff --git a/include/linux/nfs_fs.h b/include/linux/nfs_fs.h
index d363d57..0a11712 100644
--- a/include/linux/nfs_fs.h
+++ b/include/linux/nfs_fs.h
@@ -223,6 +223,8 @@
 #define NFS_INO_INVALID_MTIME	BIT(10)		/* cached mtime is invalid */
 #define NFS_INO_INVALID_SIZE	BIT(11)		/* cached size is invalid */
 #define NFS_INO_INVALID_OTHER	BIT(12)		/* other attrs are invalid */
+#define NFS_INO_DATA_INVAL_DEFER	\
+				BIT(13)		/* Deferred cache invalidation */
 
 #define NFS_INO_INVALID_ATTR	(NFS_INO_INVALID_CHANGE \
 		| NFS_INO_INVALID_CTIME \
diff --git a/include/linux/nfs_fs_sb.h b/include/linux/nfs_fs_sb.h
index 1e78032..a87fe85 100644
--- a/include/linux/nfs_fs_sb.h
+++ b/include/linux/nfs_fs_sb.h
@@ -58,6 +58,7 @@
 	struct nfs_subversion *	cl_nfs_mod;	/* pointer to nfs version module */
 
 	u32			cl_minorversion;/* NFSv4 minorversion */
+	unsigned int		cl_nconnect;	/* Number of connections */
 	const char *		cl_principal;  /* used for machine cred */
 
 #if IS_ENABLED(CONFIG_NFS_V4)
diff --git a/include/linux/sunrpc/bc_xprt.h b/include/linux/sunrpc/bc_xprt.h
index d4229a7..87d27e1 100644
--- a/include/linux/sunrpc/bc_xprt.h
+++ b/include/linux/sunrpc/bc_xprt.h
@@ -43,6 +43,7 @@
 int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs);
 void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs);
 void xprt_free_bc_rqst(struct rpc_rqst *req);
+unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt);
 
 /*
  * Determine if a shared backchannel is in use
diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
index 6e80731..abc63bd 100644
--- a/include/linux/sunrpc/clnt.h
+++ b/include/linux/sunrpc/clnt.h
@@ -124,6 +124,7 @@
 	u32			prognumber;	/* overrides program->number */
 	u32			version;
 	rpc_authflavor_t	authflavor;
+	u32			nconnect;
 	unsigned long		flags;
 	char			*client_name;
 	struct svc_xprt		*bc_xprt;	/* NFSv4.1 backchannel */
@@ -163,6 +164,8 @@
 void		rpc_release_client(struct rpc_clnt *);
 void		rpc_task_release_transport(struct rpc_task *);
 void		rpc_task_release_client(struct rpc_task *);
+struct rpc_xprt	*rpc_task_get_xprt(struct rpc_clnt *clnt,
+		struct rpc_xprt *xprt);
 
 int		rpcb_create_local(struct net *);
 void		rpcb_put_local(struct net *);
@@ -191,6 +194,7 @@
 struct net *	rpc_net_ns(struct rpc_clnt *);
 size_t		rpc_max_payload(struct rpc_clnt *);
 size_t		rpc_max_bc_payload(struct rpc_clnt *);
+unsigned int	rpc_num_bc_slots(struct rpc_clnt *);
 void		rpc_force_rebind(struct rpc_clnt *);
 size_t		rpc_peeraddr(struct rpc_clnt *, struct sockaddr *, size_t);
 const char	*rpc_peeraddr2str(struct rpc_clnt *, enum rpc_display_format_t);
diff --git a/include/linux/sunrpc/metrics.h b/include/linux/sunrpc/metrics.h
index 1b37513..0ee3f70 100644
--- a/include/linux/sunrpc/metrics.h
+++ b/include/linux/sunrpc/metrics.h
@@ -30,7 +30,7 @@
 #include <linux/ktime.h>
 #include <linux/spinlock.h>
 
-#define RPC_IOSTATS_VERS	"1.0"
+#define RPC_IOSTATS_VERS	"1.1"
 
 struct rpc_iostats {
 	spinlock_t		om_lock;
@@ -66,6 +66,11 @@
 	ktime_t			om_queue,	/* queued for xmit */
 				om_rtt,		/* RPC RTT */
 				om_execute;	/* RPC execution */
+	/*
+	 * The count of operations that complete with tk_status < 0.
+	 * These statuses usually indicate error conditions.
+	 */
+	unsigned long           om_error_status;
 } ____cacheline_aligned;
 
 struct rpc_task;
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index d0e4518..baa3ecd 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -126,6 +126,7 @@
 #define RPC_CALL_MAJORSEEN	0x0020		/* major timeout seen */
 #define RPC_TASK_ROOTCREDS	0x0040		/* force root creds */
 #define RPC_TASK_DYNAMIC	0x0080		/* task was kmalloc'ed */
+#define	RPC_TASK_NO_ROUND_ROBIN	0x0100		/* send requests on "main" xprt */
 #define RPC_TASK_SOFT		0x0200		/* Use soft timeouts */
 #define RPC_TASK_SOFTCONN	0x0400		/* Fail if can't connect */
 #define RPC_TASK_SENT		0x0800		/* message was sent */
@@ -183,8 +184,9 @@
 #define RPC_NR_PRIORITY		(1 + RPC_PRIORITY_PRIVILEGED - RPC_PRIORITY_LOW)
 
 struct rpc_timer {
-	struct timer_list timer;
 	struct list_head list;
+	unsigned long expires;
+	struct delayed_work dwork;
 };
 
 /*
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index a6d9fce..13e108b 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -158,6 +158,7 @@
 	int		(*bc_setup)(struct rpc_xprt *xprt,
 				    unsigned int min_reqs);
 	size_t		(*bc_maxpayload)(struct rpc_xprt *xprt);
+	unsigned int	(*bc_num_slots)(struct rpc_xprt *xprt);
 	void		(*bc_free_rqst)(struct rpc_rqst *rqst);
 	void		(*bc_destroy)(struct rpc_xprt *xprt,
 				      unsigned int max_reqs);
@@ -238,6 +239,7 @@
 	/*
 	 * Send stuff
 	 */
+	atomic_long_t		queuelen;
 	spinlock_t		transport_lock;	/* lock transport info */
 	spinlock_t		reserve_lock;	/* lock slot table */
 	spinlock_t		queue_lock;	/* send/receive queue lock */
@@ -250,8 +252,9 @@
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	struct svc_serv		*bc_serv;       /* The RPC service which will */
 						/* process the callback */
-	int			bc_alloc_count;	/* Total number of preallocs */
-	atomic_t		bc_free_slots;
+	unsigned int		bc_alloc_max;
+	unsigned int		bc_alloc_count;	/* Total number of preallocs */
+	atomic_t		bc_slot_count;	/* Number of allocated slots */
 	spinlock_t		bc_pa_lock;	/* Protects the preallocated
 						 * items */
 	struct list_head	bc_pa_list;	/* List of preallocated
@@ -334,6 +337,9 @@
  */
 struct rpc_xprt		*xprt_create_transport(struct xprt_create *args);
 void			xprt_connect(struct rpc_task *task);
+unsigned long		xprt_reconnect_delay(const struct rpc_xprt *xprt);
+void			xprt_reconnect_backoff(struct rpc_xprt *xprt,
+					       unsigned long init_to);
 void			xprt_reserve(struct rpc_task *task);
 void			xprt_retry_reserve(struct rpc_task *task);
 int			xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
diff --git a/include/linux/sunrpc/xprtmultipath.h b/include/linux/sunrpc/xprtmultipath.h
index af1257c..c6cce3f 100644
--- a/include/linux/sunrpc/xprtmultipath.h
+++ b/include/linux/sunrpc/xprtmultipath.h
@@ -15,6 +15,8 @@
 	struct kref		xps_kref;
 
 	unsigned int		xps_nxprts;
+	unsigned int		xps_nactive;
+	atomic_long_t		xps_queuelen;
 	struct list_head	xps_xprt_list;
 
 	struct net *		xps_net;
diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h
index b81d0b3..7638dbe 100644
--- a/include/linux/sunrpc/xprtsock.h
+++ b/include/linux/sunrpc/xprtsock.h
@@ -56,6 +56,7 @@
 	 */
 	unsigned long		sock_state;
 	struct delayed_work	connect_worker;
+	struct work_struct	error_worker;
 	struct work_struct	recv_worker;
 	struct mutex		recv_mutex;
 	struct sockaddr_storage	srcaddr;
@@ -84,6 +85,10 @@
 #define XPRT_SOCK_CONNECTING	1U
 #define XPRT_SOCK_DATA_READY	(2)
 #define XPRT_SOCK_UPD_TIMEOUT	(3)
+#define XPRT_SOCK_WAKE_ERROR	(4)
+#define XPRT_SOCK_WAKE_WRITE	(5)
+#define XPRT_SOCK_WAKE_PENDING	(6)
+#define XPRT_SOCK_WAKE_DISCONNECT	(7)
 
 #endif /* __KERNEL__ */
 
diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index df9851cb..f6a4eaa 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -181,18 +181,6 @@
 				),					\
 				TP_ARGS(task, mr, nsegs))
 
-TRACE_DEFINE_ENUM(FRWR_IS_INVALID);
-TRACE_DEFINE_ENUM(FRWR_IS_VALID);
-TRACE_DEFINE_ENUM(FRWR_FLUSHED_FR);
-TRACE_DEFINE_ENUM(FRWR_FLUSHED_LI);
-
-#define xprtrdma_show_frwr_state(x)					\
-		__print_symbolic(x,					\
-				{ FRWR_IS_INVALID, "INVALID" },		\
-				{ FRWR_IS_VALID, "VALID" },		\
-				{ FRWR_FLUSHED_FR, "FLUSHED_FR" },	\
-				{ FRWR_FLUSHED_LI, "FLUSHED_LI" })
-
 DECLARE_EVENT_CLASS(xprtrdma_frwr_done,
 	TP_PROTO(
 		const struct ib_wc *wc,
@@ -203,22 +191,19 @@
 
 	TP_STRUCT__entry(
 		__field(const void *, mr)
-		__field(unsigned int, state)
 		__field(unsigned int, status)
 		__field(unsigned int, vendor_err)
 	),
 
 	TP_fast_assign(
 		__entry->mr = container_of(frwr, struct rpcrdma_mr, frwr);
-		__entry->state = frwr->fr_state;
 		__entry->status = wc->status;
 		__entry->vendor_err = __entry->status ? wc->vendor_err : 0;
 	),
 
 	TP_printk(
-		"mr=%p state=%s: %s (%u/0x%x)",
-		__entry->mr, xprtrdma_show_frwr_state(__entry->state),
-		rdma_show_wc_status(__entry->status),
+		"mr=%p: %s (%u/0x%x)",
+		__entry->mr, rdma_show_wc_status(__entry->status),
 		__entry->status, __entry->vendor_err
 	)
 );
@@ -390,6 +375,37 @@
 DEFINE_RXPRT_EVENT(xprtrdma_op_close);
 DEFINE_RXPRT_EVENT(xprtrdma_op_connect);
 
+TRACE_EVENT(xprtrdma_op_set_cto,
+	TP_PROTO(
+		const struct rpcrdma_xprt *r_xprt,
+		unsigned long connect,
+		unsigned long reconnect
+	),
+
+	TP_ARGS(r_xprt, connect, reconnect),
+
+	TP_STRUCT__entry(
+		__field(const void *, r_xprt)
+		__field(unsigned long, connect)
+		__field(unsigned long, reconnect)
+		__string(addr, rpcrdma_addrstr(r_xprt))
+		__string(port, rpcrdma_portstr(r_xprt))
+	),
+
+	TP_fast_assign(
+		__entry->r_xprt = r_xprt;
+		__entry->connect = connect;
+		__entry->reconnect = reconnect;
+		__assign_str(addr, rpcrdma_addrstr(r_xprt));
+		__assign_str(port, rpcrdma_portstr(r_xprt));
+	),
+
+	TP_printk("peer=[%s]:%s r_xprt=%p: connect=%lu reconnect=%lu",
+		__get_str(addr), __get_str(port), __entry->r_xprt,
+		__entry->connect / HZ, __entry->reconnect / HZ
+	)
+);
+
 TRACE_EVENT(xprtrdma_qp_event,
 	TP_PROTO(
 		const struct rpcrdma_xprt *r_xprt,
@@ -470,13 +486,12 @@
 
 TRACE_EVENT(xprtrdma_marshal,
 	TP_PROTO(
-		const struct rpc_rqst *rqst,
-		unsigned int hdrlen,
+		const struct rpcrdma_req *req,
 		unsigned int rtype,
 		unsigned int wtype
 	),
 
-	TP_ARGS(rqst, hdrlen, rtype, wtype),
+	TP_ARGS(req, rtype, wtype),
 
 	TP_STRUCT__entry(
 		__field(unsigned int, task_id)
@@ -491,10 +506,12 @@
 	),
 
 	TP_fast_assign(
+		const struct rpc_rqst *rqst = &req->rl_slot;
+
 		__entry->task_id = rqst->rq_task->tk_pid;
 		__entry->client_id = rqst->rq_task->tk_client->cl_clid;
 		__entry->xid = be32_to_cpu(rqst->rq_xid);
-		__entry->hdrlen = hdrlen;
+		__entry->hdrlen = req->rl_hdrbuf.len;
 		__entry->headlen = rqst->rq_snd_buf.head[0].iov_len;
 		__entry->pagelen = rqst->rq_snd_buf.page_len;
 		__entry->taillen = rqst->rq_snd_buf.tail[0].iov_len;
@@ -538,6 +555,33 @@
 	)
 );
 
+TRACE_EVENT(xprtrdma_prepsend_failed,
+	TP_PROTO(const struct rpc_rqst *rqst,
+		 int ret
+	),
+
+	TP_ARGS(rqst, ret),
+
+	TP_STRUCT__entry(
+		__field(unsigned int, task_id)
+		__field(unsigned int, client_id)
+		__field(u32, xid)
+		__field(int, ret)
+	),
+
+	TP_fast_assign(
+		__entry->task_id = rqst->rq_task->tk_pid;
+		__entry->client_id = rqst->rq_task->tk_client->cl_clid;
+		__entry->xid = be32_to_cpu(rqst->rq_xid);
+		__entry->ret = ret;
+	),
+
+	TP_printk("task:%u@%u xid=0x%08x: ret=%d",
+		__entry->task_id, __entry->client_id, __entry->xid,
+		__entry->ret
+	)
+);
+
 TRACE_EVENT(xprtrdma_post_send,
 	TP_PROTO(
 		const struct rpcrdma_req *req,
@@ -559,7 +603,8 @@
 		const struct rpc_rqst *rqst = &req->rl_slot;
 
 		__entry->task_id = rqst->rq_task->tk_pid;
-		__entry->client_id = rqst->rq_task->tk_client->cl_clid;
+		__entry->client_id = rqst->rq_task->tk_client ?
+				     rqst->rq_task->tk_client->cl_clid : -1;
 		__entry->req = req;
 		__entry->num_sge = req->rl_sendctx->sc_wr.num_sge;
 		__entry->signaled = req->rl_sendctx->sc_wr.send_flags &
@@ -698,6 +743,7 @@
 DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_fastreg);
 DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li);
 DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li_wake);
+DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li_done);
 
 TRACE_EVENT(xprtrdma_frwr_alloc,
 	TP_PROTO(
diff --git a/net/sunrpc/Kconfig b/net/sunrpc/Kconfig
index aa30750..3bcf985 100644
--- a/net/sunrpc/Kconfig
+++ b/net/sunrpc/Kconfig
@@ -35,7 +35,7 @@
 
 	  If unsure, say Y.
 
-config CONFIG_SUNRPC_DISABLE_INSECURE_ENCTYPES
+config SUNRPC_DISABLE_INSECURE_ENCTYPES
 	bool "Secure RPC: Disable insecure Kerberos encryption types"
 	depends on RPCSEC_GSS_KRB5
 	default n
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index c47d826..339e8c0 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -31,25 +31,20 @@
 #define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
+#define BC_MAX_SLOTS	64U
+
+unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
+{
+	return BC_MAX_SLOTS;
+}
+
 /*
  * Helper routines that track the number of preallocation elements
  * on the transport.
  */
 static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
 {
-	return xprt->bc_alloc_count < atomic_read(&xprt->bc_free_slots);
-}
-
-static inline void xprt_inc_alloc_count(struct rpc_xprt *xprt, unsigned int n)
-{
-	atomic_add(n, &xprt->bc_free_slots);
-	xprt->bc_alloc_count += n;
-}
-
-static inline int xprt_dec_alloc_count(struct rpc_xprt *xprt, unsigned int n)
-{
-	atomic_sub(n, &xprt->bc_free_slots);
-	return xprt->bc_alloc_count -= n;
+	return xprt->bc_alloc_count < xprt->bc_alloc_max;
 }
 
 /*
@@ -145,6 +140,9 @@
 
 	dprintk("RPC:       setup backchannel transport\n");
 
+	if (min_reqs > BC_MAX_SLOTS)
+		min_reqs = BC_MAX_SLOTS;
+
 	/*
 	 * We use a temporary list to keep track of the preallocated
 	 * buffers.  Once we're done building the list we splice it
@@ -172,7 +170,9 @@
 	 */
 	spin_lock(&xprt->bc_pa_lock);
 	list_splice(&tmp_list, &xprt->bc_pa_list);
-	xprt_inc_alloc_count(xprt, min_reqs);
+	xprt->bc_alloc_count += min_reqs;
+	xprt->bc_alloc_max += min_reqs;
+	atomic_add(min_reqs, &xprt->bc_slot_count);
 	spin_unlock(&xprt->bc_pa_lock);
 
 	dprintk("RPC:       setup backchannel transport done\n");
@@ -220,11 +220,13 @@
 		goto out;
 
 	spin_lock_bh(&xprt->bc_pa_lock);
-	xprt_dec_alloc_count(xprt, max_reqs);
+	xprt->bc_alloc_max -= max_reqs;
 	list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
 		dprintk("RPC:        req=%p\n", req);
 		list_del(&req->rq_bc_pa_list);
 		xprt_free_allocation(req);
+		xprt->bc_alloc_count--;
+		atomic_dec(&xprt->bc_slot_count);
 		if (--max_reqs == 0)
 			break;
 	}
@@ -241,13 +243,14 @@
 	struct rpc_rqst *req = NULL;
 
 	dprintk("RPC:       allocate a backchannel request\n");
-	if (atomic_read(&xprt->bc_free_slots) <= 0)
-		goto not_found;
 	if (list_empty(&xprt->bc_pa_list)) {
 		if (!new)
 			goto not_found;
+		if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
+			goto not_found;
 		list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
 		xprt->bc_alloc_count++;
+		atomic_inc(&xprt->bc_slot_count);
 	}
 	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
 				rq_bc_pa_list);
@@ -291,6 +294,7 @@
 	if (xprt_need_to_requeue(xprt)) {
 		list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
 		xprt->bc_alloc_count++;
+		atomic_inc(&xprt->bc_slot_count);
 		req = NULL;
 	}
 	spin_unlock_bh(&xprt->bc_pa_lock);
@@ -357,7 +361,7 @@
 
 	spin_lock(&xprt->bc_pa_lock);
 	list_del(&req->rq_bc_pa_list);
-	xprt_dec_alloc_count(xprt, 1);
+	xprt->bc_alloc_count--;
 	spin_unlock(&xprt->bc_pa_lock);
 
 	req->rq_private_buf.len = copied;
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index b03bfa0..d8679b60 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -528,6 +528,8 @@
 		.bc_xprt = args->bc_xprt,
 	};
 	char servername[48];
+	struct rpc_clnt *clnt;
+	int i;
 
 	if (args->bc_xprt) {
 		WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
@@ -590,7 +592,15 @@
 	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
 		xprt->resvport = 0;
 
-	return rpc_create_xprt(args, xprt);
+	clnt = rpc_create_xprt(args, xprt);
+	if (IS_ERR(clnt) || args->nconnect <= 1)
+		return clnt;
+
+	for (i = 0; i < args->nconnect - 1; i++) {
+		if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
+			break;
+	}
+	return clnt;
 }
 EXPORT_SYMBOL_GPL(rpc_create);
 
@@ -968,13 +978,46 @@
 }
 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
 
+struct rpc_xprt *
+rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
+{
+	struct rpc_xprt_switch *xps;
+
+	if (!xprt)
+		return NULL;
+	rcu_read_lock();
+	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
+	atomic_long_inc(&xps->xps_queuelen);
+	rcu_read_unlock();
+	atomic_long_inc(&xprt->queuelen);
+
+	return xprt;
+}
+
+static void
+rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
+{
+	struct rpc_xprt_switch *xps;
+
+	atomic_long_dec(&xprt->queuelen);
+	rcu_read_lock();
+	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
+	atomic_long_dec(&xps->xps_queuelen);
+	rcu_read_unlock();
+
+	xprt_put(xprt);
+}
+
 void rpc_task_release_transport(struct rpc_task *task)
 {
 	struct rpc_xprt *xprt = task->tk_xprt;
 
 	if (xprt) {
 		task->tk_xprt = NULL;
-		xprt_put(xprt);
+		if (task->tk_client)
+			rpc_task_release_xprt(task->tk_client, xprt);
+		else
+			xprt_put(xprt);
 	}
 }
 EXPORT_SYMBOL_GPL(rpc_task_release_transport);
@@ -983,6 +1026,7 @@
 {
 	struct rpc_clnt *clnt = task->tk_client;
 
+	rpc_task_release_transport(task);
 	if (clnt != NULL) {
 		/* Remove from client task list */
 		spin_lock(&clnt->cl_lock);
@@ -992,14 +1036,34 @@
 
 		rpc_release_client(clnt);
 	}
-	rpc_task_release_transport(task);
+}
+
+static struct rpc_xprt *
+rpc_task_get_first_xprt(struct rpc_clnt *clnt)
+{
+	struct rpc_xprt *xprt;
+
+	rcu_read_lock();
+	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
+	rcu_read_unlock();
+	return rpc_task_get_xprt(clnt, xprt);
+}
+
+static struct rpc_xprt *
+rpc_task_get_next_xprt(struct rpc_clnt *clnt)
+{
+	return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
 }
 
 static
 void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
 {
-	if (!task->tk_xprt)
-		task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
+	if (task->tk_xprt)
+		return;
+	if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
+		task->tk_xprt = rpc_task_get_first_xprt(clnt);
+	else
+		task->tk_xprt = rpc_task_get_next_xprt(clnt);
 }
 
 static
@@ -1462,6 +1526,19 @@
 }
 EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
 
+unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
+{
+	struct rpc_xprt *xprt;
+	unsigned int ret;
+
+	rcu_read_lock();
+	xprt = rcu_dereference(clnt->cl_xprt);
+	ret = xprt->ops->bc_num_slots(xprt);
+	rcu_read_unlock();
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
+
 /**
  * rpc_force_rebind - force transport to check that remote port is unchanged
  * @clnt: client to rebind
@@ -1788,6 +1865,7 @@
 	req->rq_snd_buf.head[0].iov_len = 0;
 	xdr_init_encode(&xdr, &req->rq_snd_buf,
 			req->rq_snd_buf.head[0].iov_base, req);
+	xdr_free_bvec(&req->rq_snd_buf);
 	if (rpc_encode_header(task, &xdr))
 		return;
 
@@ -1827,8 +1905,6 @@
 			rpc_call_rpcerror(task, task->tk_status);
 		}
 		return;
-	} else {
-		xprt_request_prepare(task->tk_rqstp);
 	}
 
 	/* Add task to reply queue before transmission to avoid races */
@@ -2696,6 +2772,10 @@
 		return -ENOMEM;
 	data->xps = xprt_switch_get(xps);
 	data->xprt = xprt_get(xprt);
+	if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
+		rpc_cb_add_xprt_release(data);
+		goto success;
+	}
 
 	task = rpc_call_null_helper(clnt, xprt, NULL,
 			RPC_TASK_SOFT|RPC_TASK_SOFTCONN|RPC_TASK_ASYNC|RPC_TASK_NULLCREDS,
@@ -2703,6 +2783,7 @@
 	if (IS_ERR(task))
 		return PTR_ERR(task);
 	rpc_put_task(task);
+success:
 	return 1;
 }
 EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
diff --git a/net/sunrpc/debugfs.c b/net/sunrpc/debugfs.c
index 707d7aa..fd9bca2 100644
--- a/net/sunrpc/debugfs.c
+++ b/net/sunrpc/debugfs.c
@@ -1,5 +1,5 @@
 // SPDX-License-Identifier: GPL-2.0
-/**
+/*
  * debugfs interface for sunrpc
  *
  * (c) 2014 Jeff Layton <jlayton@primarydata.com>
@@ -117,12 +117,37 @@
 	.release	= tasks_release,
 };
 
+static int do_xprt_debugfs(struct rpc_clnt *clnt, struct rpc_xprt *xprt, void *numv)
+{
+	int len;
+	char name[24]; /* enough for "../../rpc_xprt/ + 8 hex digits + NULL */
+	char link[9]; /* enough for 8 hex digits + NULL */
+	int *nump = numv;
+
+	if (IS_ERR_OR_NULL(xprt->debugfs))
+		return 0;
+	len = snprintf(name, sizeof(name), "../../rpc_xprt/%s",
+		       xprt->debugfs->d_name.name);
+	if (len > sizeof(name))
+		return -1;
+	if (*nump == 0)
+		strcpy(link, "xprt");
+	else {
+		len = snprintf(link, sizeof(link), "xprt%d", *nump);
+		if (len > sizeof(link))
+			return -1;
+	}
+	debugfs_create_symlink(link, clnt->cl_debugfs, name);
+	(*nump)++;
+	return 0;
+}
+
 void
 rpc_clnt_debugfs_register(struct rpc_clnt *clnt)
 {
 	int len;
-	char name[24]; /* enough for "../../rpc_xprt/ + 8 hex digits + NULL */
-	struct rpc_xprt *xprt;
+	char name[9]; /* enough for 8 hex digits + NULL */
+	int xprtnum = 0;
 
 	len = snprintf(name, sizeof(name), "%x", clnt->cl_clid);
 	if (len >= sizeof(name))
@@ -135,26 +160,7 @@
 	debugfs_create_file("tasks", S_IFREG | 0400, clnt->cl_debugfs, clnt,
 			    &tasks_fops);
 
-	rcu_read_lock();
-	xprt = rcu_dereference(clnt->cl_xprt);
-	/* no "debugfs" dentry? Don't bother with the symlink. */
-	if (IS_ERR_OR_NULL(xprt->debugfs)) {
-		rcu_read_unlock();
-		return;
-	}
-	len = snprintf(name, sizeof(name), "../../rpc_xprt/%s",
-			xprt->debugfs->d_name.name);
-	rcu_read_unlock();
-
-	if (len >= sizeof(name))
-		goto out_err;
-
-	debugfs_create_symlink("xprt", clnt->cl_debugfs, name);
-
-	return;
-out_err:
-	debugfs_remove_recursive(clnt->cl_debugfs);
-	clnt->cl_debugfs = NULL;
+	rpc_clnt_iterate_for_each_xprt(clnt, do_xprt_debugfs, &xprtnum);
 }
 
 void
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index a2c1148..1f275ab 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -23,6 +23,7 @@
 #include <linux/sched/mm.h>
 
 #include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/metrics.h>
 
 #include "sunrpc.h"
 
@@ -46,7 +47,7 @@
 
 static void			rpc_async_schedule(struct work_struct *);
 static void			 rpc_release_task(struct rpc_task *task);
-static void __rpc_queue_timer_fn(struct timer_list *t);
+static void __rpc_queue_timer_fn(struct work_struct *);
 
 /*
  * RPC tasks sit here while waiting for conditions to improve.
@@ -58,6 +59,7 @@
  */
 struct workqueue_struct *rpciod_workqueue __read_mostly;
 struct workqueue_struct *xprtiod_workqueue __read_mostly;
+EXPORT_SYMBOL_GPL(xprtiod_workqueue);
 
 unsigned long
 rpc_task_timeout(const struct rpc_task *task)
@@ -87,13 +89,19 @@
 	task->tk_timeout = 0;
 	list_del(&task->u.tk_wait.timer_list);
 	if (list_empty(&queue->timer_list.list))
-		del_timer(&queue->timer_list.timer);
+		cancel_delayed_work(&queue->timer_list.dwork);
 }
 
 static void
 rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
 {
-	timer_reduce(&queue->timer_list.timer, expires);
+	unsigned long now = jiffies;
+	queue->timer_list.expires = expires;
+	if (time_before_eq(expires, now))
+		expires = 0;
+	else
+		expires -= now;
+	mod_delayed_work(rpciod_workqueue, &queue->timer_list.dwork, expires);
 }
 
 /*
@@ -107,7 +115,8 @@
 		task->tk_pid, jiffies_to_msecs(timeout - jiffies));
 
 	task->tk_timeout = timeout;
-	rpc_set_queue_timer(queue, timeout);
+	if (list_empty(&queue->timer_list.list) || time_before(timeout, queue->timer_list.expires))
+		rpc_set_queue_timer(queue, timeout);
 	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
 }
 
@@ -250,7 +259,8 @@
 	queue->maxpriority = nr_queues - 1;
 	rpc_reset_waitqueue_priority(queue);
 	queue->qlen = 0;
-	timer_setup(&queue->timer_list.timer, __rpc_queue_timer_fn, 0);
+	queue->timer_list.expires = 0;
+	INIT_DEFERRABLE_WORK(&queue->timer_list.dwork, __rpc_queue_timer_fn);
 	INIT_LIST_HEAD(&queue->timer_list.list);
 	rpc_assign_waitqueue_name(queue, qname);
 }
@@ -269,7 +279,7 @@
 
 void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
 {
-	del_timer_sync(&queue->timer_list.timer);
+	cancel_delayed_work_sync(&queue->timer_list.dwork);
 }
 EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
 
@@ -424,9 +434,9 @@
 	/*
 	 * Protect the queue operations.
 	 */
-	spin_lock_bh(&q->lock);
+	spin_lock(&q->lock);
 	__rpc_sleep_on_priority_timeout(q, task, timeout, task->tk_priority);
-	spin_unlock_bh(&q->lock);
+	spin_unlock(&q->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_sleep_on_timeout);
 
@@ -442,9 +452,9 @@
 	/*
 	 * Protect the queue operations.
 	 */
-	spin_lock_bh(&q->lock);
+	spin_lock(&q->lock);
 	__rpc_sleep_on_priority(q, task, task->tk_priority);
-	spin_unlock_bh(&q->lock);
+	spin_unlock(&q->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_sleep_on);
 
@@ -458,9 +468,9 @@
 	/*
 	 * Protect the queue operations.
 	 */
-	spin_lock_bh(&q->lock);
+	spin_lock(&q->lock);
 	__rpc_sleep_on_priority_timeout(q, task, timeout, priority);
-	spin_unlock_bh(&q->lock);
+	spin_unlock(&q->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_sleep_on_priority_timeout);
 
@@ -475,9 +485,9 @@
 	/*
 	 * Protect the queue operations.
 	 */
-	spin_lock_bh(&q->lock);
+	spin_lock(&q->lock);
 	__rpc_sleep_on_priority(q, task, priority);
-	spin_unlock_bh(&q->lock);
+	spin_unlock(&q->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);
 
@@ -555,9 +565,9 @@
 {
 	if (!RPC_IS_QUEUED(task))
 		return;
-	spin_lock_bh(&queue->lock);
+	spin_lock(&queue->lock);
 	rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
-	spin_unlock_bh(&queue->lock);
+	spin_unlock(&queue->lock);
 }
 
 /*
@@ -567,9 +577,9 @@
 {
 	if (!RPC_IS_QUEUED(task))
 		return;
-	spin_lock_bh(&queue->lock);
+	spin_lock(&queue->lock);
 	rpc_wake_up_task_queue_locked(queue, task);
-	spin_unlock_bh(&queue->lock);
+	spin_unlock(&queue->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
 
@@ -602,9 +612,9 @@
 {
 	if (!RPC_IS_QUEUED(task))
 		return;
-	spin_lock_bh(&queue->lock);
+	spin_lock(&queue->lock);
 	rpc_wake_up_task_queue_set_status_locked(queue, task, status);
-	spin_unlock_bh(&queue->lock);
+	spin_unlock(&queue->lock);
 }
 
 /*
@@ -667,12 +677,12 @@
 
 	dprintk("RPC:       wake_up_first(%p \"%s\")\n",
 			queue, rpc_qname(queue));
-	spin_lock_bh(&queue->lock);
+	spin_lock(&queue->lock);
 	task = __rpc_find_next_queued(queue);
 	if (task != NULL)
 		task = rpc_wake_up_task_on_wq_queue_action_locked(wq, queue,
 				task, func, data);
-	spin_unlock_bh(&queue->lock);
+	spin_unlock(&queue->lock);
 
 	return task;
 }
@@ -711,7 +721,7 @@
 {
 	struct list_head *head;
 
-	spin_lock_bh(&queue->lock);
+	spin_lock(&queue->lock);
 	head = &queue->tasks[queue->maxpriority];
 	for (;;) {
 		while (!list_empty(head)) {
@@ -725,7 +735,7 @@
 			break;
 		head--;
 	}
-	spin_unlock_bh(&queue->lock);
+	spin_unlock(&queue->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_wake_up);
 
@@ -740,7 +750,7 @@
 {
 	struct list_head *head;
 
-	spin_lock_bh(&queue->lock);
+	spin_lock(&queue->lock);
 	head = &queue->tasks[queue->maxpriority];
 	for (;;) {
 		while (!list_empty(head)) {
@@ -755,13 +765,15 @@
 			break;
 		head--;
 	}
-	spin_unlock_bh(&queue->lock);
+	spin_unlock(&queue->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_wake_up_status);
 
-static void __rpc_queue_timer_fn(struct timer_list *t)
+static void __rpc_queue_timer_fn(struct work_struct *work)
 {
-	struct rpc_wait_queue *queue = from_timer(queue, t, timer_list.timer);
+	struct rpc_wait_queue *queue = container_of(work,
+			struct rpc_wait_queue,
+			timer_list.dwork.work);
 	struct rpc_task *task, *n;
 	unsigned long expires, now, timeo;
 
@@ -832,6 +844,10 @@
 void rpc_exit_task(struct rpc_task *task)
 {
 	task->tk_action = NULL;
+	if (task->tk_ops->rpc_count_stats)
+		task->tk_ops->rpc_count_stats(task, task->tk_calldata);
+	else if (task->tk_client)
+		rpc_count_iostats(task, task->tk_client->cl_metrics);
 	if (task->tk_ops->rpc_call_done != NULL) {
 		task->tk_ops->rpc_call_done(task, task->tk_calldata);
 		if (task->tk_action != NULL) {
@@ -927,13 +943,13 @@
 		 * rpc_task pointer may still be dereferenced.
 		 */
 		queue = task->tk_waitqueue;
-		spin_lock_bh(&queue->lock);
+		spin_lock(&queue->lock);
 		if (!RPC_IS_QUEUED(task)) {
-			spin_unlock_bh(&queue->lock);
+			spin_unlock(&queue->lock);
 			continue;
 		}
 		rpc_clear_running(task);
-		spin_unlock_bh(&queue->lock);
+		spin_unlock(&queue->lock);
 		if (task_is_async)
 			return;
 
@@ -1076,7 +1092,8 @@
 	/* Initialize workqueue for async tasks */
 	task->tk_workqueue = task_setup_data->workqueue;
 
-	task->tk_xprt = xprt_get(task_setup_data->rpc_xprt);
+	task->tk_xprt = rpc_task_get_xprt(task_setup_data->rpc_client,
+			xprt_get(task_setup_data->rpc_xprt));
 
 	task->tk_op_cred = get_rpccred(task_setup_data->rpc_op_cred);
 
diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c
index 2b6dc7e..7c74197 100644
--- a/net/sunrpc/stats.c
+++ b/net/sunrpc/stats.c
@@ -177,6 +177,8 @@
 
 	execute = ktime_sub(now, task->tk_start);
 	op_metrics->om_execute = ktime_add(op_metrics->om_execute, execute);
+	if (task->tk_status < 0)
+		op_metrics->om_error_status++;
 
 	spin_unlock(&op_metrics->om_lock);
 
@@ -219,13 +221,14 @@
 	a->om_queue = ktime_add(a->om_queue, b->om_queue);
 	a->om_rtt = ktime_add(a->om_rtt, b->om_rtt);
 	a->om_execute = ktime_add(a->om_execute, b->om_execute);
+	a->om_error_status += b->om_error_status;
 }
 
 static void _print_rpc_iostats(struct seq_file *seq, struct rpc_iostats *stats,
 			       int op, const struct rpc_procinfo *procs)
 {
 	_print_name(seq, op, procs);
-	seq_printf(seq, "%lu %lu %lu %Lu %Lu %Lu %Lu %Lu\n",
+	seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %llu %lu\n",
 		   stats->om_ops,
 		   stats->om_ntrans,
 		   stats->om_timeouts,
@@ -233,12 +236,20 @@
 		   stats->om_bytes_recv,
 		   ktime_to_ms(stats->om_queue),
 		   ktime_to_ms(stats->om_rtt),
-		   ktime_to_ms(stats->om_execute));
+		   ktime_to_ms(stats->om_execute),
+		   stats->om_error_status);
+}
+
+static int do_print_stats(struct rpc_clnt *clnt, struct rpc_xprt *xprt, void *seqv)
+{
+	struct seq_file *seq = seqv;
+
+	xprt->ops->print_stats(xprt, seq);
+	return 0;
 }
 
 void rpc_clnt_show_stats(struct seq_file *seq, struct rpc_clnt *clnt)
 {
-	struct rpc_xprt *xprt;
 	unsigned int op, maxproc = clnt->cl_maxproc;
 
 	if (!clnt->cl_metrics)
@@ -248,11 +259,7 @@
 	seq_printf(seq, "p/v: %u/%u (%s)\n",
 			clnt->cl_prog, clnt->cl_vers, clnt->cl_program->name);
 
-	rcu_read_lock();
-	xprt = rcu_dereference(clnt->cl_xprt);
-	if (xprt)
-		xprt->ops->print_stats(xprt, seq);
-	rcu_read_unlock();
+	rpc_clnt_iterate_for_each_xprt(clnt, do_print_stats, seq);
 
 	seq_printf(seq, "\tper-op statistics\n");
 	for (op = 0; op < maxproc; op++) {
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index e15cb70..220b799 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1595,7 +1595,7 @@
 	/* Parse and execute the bc call */
 	proc_error = svc_process_common(rqstp, argv, resv);
 
-	atomic_inc(&req->rq_xprt->bc_free_slots);
+	atomic_dec(&req->rq_xprt->bc_slot_count);
 	if (!proc_error) {
 		/* Processing error: drop the request */
 		xprt_free_bc_request(req);
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index f6c82b1..783748d 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -302,9 +302,9 @@
 
 	if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
 		return 1;
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	retval = xprt->ops->reserve_xprt(xprt, task);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	return retval;
 }
 
@@ -381,9 +381,9 @@
 {
 	if (xprt->snd_task != task)
 		return;
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	xprt->ops->release_xprt(xprt, task);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 }
 
 /*
@@ -435,9 +435,9 @@
 
 	if (req->rq_cong)
 		return true;
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	ret = __xprt_get_cong(xprt, req) != 0;
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	return ret;
 }
 EXPORT_SYMBOL_GPL(xprt_request_get_cong);
@@ -464,9 +464,9 @@
 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
 {
 	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
-		spin_lock_bh(&xprt->transport_lock);
+		spin_lock(&xprt->transport_lock);
 		__xprt_lock_write_next_cong(xprt);
-		spin_unlock_bh(&xprt->transport_lock);
+		spin_unlock(&xprt->transport_lock);
 	}
 }
 
@@ -563,9 +563,9 @@
 
 	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
 		return false;
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	ret = xprt_clear_write_space_locked(xprt);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	return ret;
 }
 EXPORT_SYMBOL_GPL(xprt_write_space);
@@ -634,9 +634,9 @@
 		req->rq_retries = 0;
 		xprt_reset_majortimeo(req);
 		/* Reset the RTT counters == "slow start" */
-		spin_lock_bh(&xprt->transport_lock);
+		spin_lock(&xprt->transport_lock);
 		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
-		spin_unlock_bh(&xprt->transport_lock);
+		spin_unlock(&xprt->transport_lock);
 		status = -ETIMEDOUT;
 	}
 
@@ -668,11 +668,11 @@
 void xprt_disconnect_done(struct rpc_xprt *xprt)
 {
 	dprintk("RPC:       disconnected transport %p\n", xprt);
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	xprt_clear_connected(xprt);
 	xprt_clear_write_space_locked(xprt);
 	xprt_wake_pending_tasks(xprt, -ENOTCONN);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 }
 EXPORT_SYMBOL_GPL(xprt_disconnect_done);
 
@@ -684,7 +684,7 @@
 void xprt_force_disconnect(struct rpc_xprt *xprt)
 {
 	/* Don't race with the test_bit() in xprt_clear_locked() */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	/* Try to schedule an autoclose RPC call */
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
@@ -692,7 +692,7 @@
 	else if (xprt->snd_task)
 		rpc_wake_up_queued_task_set_status(&xprt->pending,
 				xprt->snd_task, -ENOTCONN);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 }
 EXPORT_SYMBOL_GPL(xprt_force_disconnect);
 
@@ -726,7 +726,7 @@
 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
 {
 	/* Don't race with the test_bit() in xprt_clear_locked() */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	if (cookie != xprt->connect_cookie)
 		goto out;
 	if (test_bit(XPRT_CLOSING, &xprt->state))
@@ -737,7 +737,7 @@
 		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	xprt_wake_pending_tasks(xprt, -EAGAIN);
 out:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 }
 
 static bool
@@ -750,6 +750,7 @@
 xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
 	__must_hold(&xprt->transport_lock)
 {
+	xprt->last_used = jiffies;
 	if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
 		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
 }
@@ -759,18 +760,13 @@
 {
 	struct rpc_xprt *xprt = from_timer(xprt, t, timer);
 
-	spin_lock(&xprt->transport_lock);
 	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
-		goto out_abort;
+		return;
 	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
 	xprt->last_used = jiffies;
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
-		goto out_abort;
-	spin_unlock(&xprt->transport_lock);
+		return;
 	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
-	return;
-out_abort:
-	spin_unlock(&xprt->transport_lock);
 }
 
 bool xprt_lock_connect(struct rpc_xprt *xprt,
@@ -779,7 +775,7 @@
 {
 	bool ret = false;
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	if (!test_bit(XPRT_LOCKED, &xprt->state))
 		goto out;
 	if (xprt->snd_task != task)
@@ -787,13 +783,13 @@
 	xprt->snd_task = cookie;
 	ret = true;
 out:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	return ret;
 }
 
 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
 {
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	if (xprt->snd_task != cookie)
 		goto out;
 	if (!test_bit(XPRT_LOCKED, &xprt->state))
@@ -802,7 +798,7 @@
 	xprt->ops->release_xprt(xprt, NULL);
 	xprt_schedule_autodisconnect(xprt);
 out:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	wake_up_bit(&xprt->state, XPRT_LOCKED);
 }
 
@@ -850,6 +846,38 @@
 	xprt_release_write(xprt, task);
 }
 
+/**
+ * xprt_reconnect_delay - compute the wait before scheduling a connect
+ * @xprt: transport instance
+ *
+ */
+unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
+{
+	unsigned long start, now = jiffies;
+
+	start = xprt->stat.connect_start + xprt->reestablish_timeout;
+	if (time_after(start, now))
+		return start - now;
+	return 0;
+}
+EXPORT_SYMBOL_GPL(xprt_reconnect_delay);
+
+/**
+ * xprt_reconnect_backoff - compute the new re-establish timeout
+ * @xprt: transport instance
+ * @init_to: initial reestablish timeout
+ *
+ */
+void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
+{
+	xprt->reestablish_timeout <<= 1;
+	if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
+		xprt->reestablish_timeout = xprt->max_reconnect_timeout;
+	if (xprt->reestablish_timeout < init_to)
+		xprt->reestablish_timeout = init_to;
+}
+EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);
+
 enum xprt_xid_rb_cmp {
 	XID_RB_EQUAL,
 	XID_RB_LEFT,
@@ -1013,6 +1041,8 @@
 
 	if (!xprt_request_need_enqueue_receive(task, req))
 		return;
+
+	xprt_request_prepare(task->tk_rqstp);
 	spin_lock(&xprt->queue_lock);
 
 	/* Update the softirq receive buffer */
@@ -1412,14 +1442,14 @@
 	xprt_inject_disconnect(xprt);
 
 	task->tk_flags |= RPC_TASK_SENT;
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 
 	xprt->stat.sends++;
 	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
 	xprt->stat.bklog_u += xprt->backlog.qlen;
 	xprt->stat.sending_u += xprt->sending.qlen;
 	xprt->stat.pending_u += xprt->pending.qlen;
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 
 	req->rq_connect_cookie = connect_cookie;
 out_dequeue:
@@ -1765,18 +1795,13 @@
 	}
 
 	xprt = req->rq_xprt;
-	if (task->tk_ops->rpc_count_stats != NULL)
-		task->tk_ops->rpc_count_stats(task, task->tk_calldata);
-	else if (task->tk_client)
-		rpc_count_iostats(task, task->tk_client->cl_metrics);
 	xprt_request_dequeue_all(task, req);
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	xprt->ops->release_xprt(xprt, task);
 	if (xprt->ops->release_request)
 		xprt->ops->release_request(task);
-	xprt->last_used = jiffies;
 	xprt_schedule_autodisconnect(xprt);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	if (req->rq_buffer)
 		xprt->ops->buf_free(task);
 	xprt_inject_disconnect(xprt);
diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c
index 8394124..78c075a 100644
--- a/net/sunrpc/xprtmultipath.c
+++ b/net/sunrpc/xprtmultipath.c
@@ -19,7 +19,7 @@
 #include <linux/sunrpc/addr.h>
 #include <linux/sunrpc/xprtmultipath.h>
 
-typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head,
+typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct rpc_xprt_switch *xps,
 		const struct rpc_xprt *cur);
 
 static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular;
@@ -36,6 +36,7 @@
 	if (xps->xps_nxprts == 0)
 		xps->xps_net = xprt->xprt_net;
 	xps->xps_nxprts++;
+	xps->xps_nactive++;
 }
 
 /**
@@ -51,8 +52,7 @@
 	if (xprt == NULL)
 		return;
 	spin_lock(&xps->xps_lock);
-	if ((xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) &&
-	    !rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
+	if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL)
 		xprt_switch_add_xprt_locked(xps, xprt);
 	spin_unlock(&xps->xps_lock);
 }
@@ -62,6 +62,7 @@
 {
 	if (unlikely(xprt == NULL))
 		return;
+	xps->xps_nactive--;
 	xps->xps_nxprts--;
 	if (xps->xps_nxprts == 0)
 		xps->xps_net = NULL;
@@ -102,7 +103,9 @@
 	if (xps != NULL) {
 		spin_lock_init(&xps->xps_lock);
 		kref_init(&xps->xps_kref);
-		xps->xps_nxprts = 0;
+		xps->xps_nxprts = xps->xps_nactive = 0;
+		atomic_long_set(&xps->xps_queuelen, 0);
+		xps->xps_net = NULL;
 		INIT_LIST_HEAD(&xps->xps_xprt_list);
 		xps->xps_iter_ops = &rpc_xprt_iter_singular;
 		xprt_switch_add_xprt_locked(xps, xprt);
@@ -193,9 +196,21 @@
 }
 
 static
+bool xprt_is_active(const struct rpc_xprt *xprt)
+{
+	return kref_read(&xprt->kref) != 0;
+}
+
+static
 struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head)
 {
-	return list_first_or_null_rcu(head, struct rpc_xprt, xprt_switch);
+	struct rpc_xprt *pos;
+
+	list_for_each_entry_rcu(pos, head, xprt_switch) {
+		if (xprt_is_active(pos))
+			return pos;
+	}
+	return NULL;
 }
 
 static
@@ -213,9 +228,12 @@
 		const struct rpc_xprt *cur)
 {
 	struct rpc_xprt *pos;
+	bool found = false;
 
 	list_for_each_entry_rcu(pos, head, xprt_switch) {
 		if (cur == pos)
+			found = true;
+		if (found && xprt_is_active(pos))
 			return pos;
 	}
 	return NULL;
@@ -260,9 +278,12 @@
 		const struct rpc_xprt *cur)
 {
 	struct rpc_xprt *pos, *prev = NULL;
+	bool found = false;
 
 	list_for_each_entry_rcu(pos, head, xprt_switch) {
 		if (cur == prev)
+			found = true;
+		if (found && xprt_is_active(pos))
 			return pos;
 		prev = pos;
 	}
@@ -270,22 +291,15 @@
 }
 
 static
-struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head,
+struct rpc_xprt *xprt_switch_set_next_cursor(struct rpc_xprt_switch *xps,
 		struct rpc_xprt **cursor,
 		xprt_switch_find_xprt_t find_next)
 {
-	struct rpc_xprt *cur, *pos, *old;
+	struct rpc_xprt *pos, *old;
 
-	cur = READ_ONCE(*cursor);
-	for (;;) {
-		old = cur;
-		pos = find_next(head, old);
-		if (pos == NULL)
-			break;
-		cur = cmpxchg_relaxed(cursor, old, pos);
-		if (cur == old)
-			break;
-	}
+	old = smp_load_acquire(cursor);
+	pos = find_next(xps, old);
+	smp_store_release(cursor, pos);
 	return pos;
 }
 
@@ -297,13 +311,11 @@
 
 	if (xps == NULL)
 		return NULL;
-	return xprt_switch_set_next_cursor(&xps->xps_xprt_list,
-			&xpi->xpi_cursor,
-			find_next);
+	return xprt_switch_set_next_cursor(xps, &xpi->xpi_cursor, find_next);
 }
 
 static
-struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head,
+struct rpc_xprt *__xprt_switch_find_next_entry_roundrobin(struct list_head *head,
 		const struct rpc_xprt *cur)
 {
 	struct rpc_xprt *ret;
@@ -315,6 +327,31 @@
 }
 
 static
+struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct rpc_xprt_switch *xps,
+		const struct rpc_xprt *cur)
+{
+	struct list_head *head = &xps->xps_xprt_list;
+	struct rpc_xprt *xprt;
+	unsigned int nactive;
+
+	for (;;) {
+		unsigned long xprt_queuelen, xps_queuelen;
+
+		xprt = __xprt_switch_find_next_entry_roundrobin(head, cur);
+		if (!xprt)
+			break;
+		xprt_queuelen = atomic_long_read(&xprt->queuelen);
+		xps_queuelen = atomic_long_read(&xps->xps_queuelen);
+		nactive = READ_ONCE(xps->xps_nactive);
+		/* Exit loop if xprt_queuelen <= average queue length */
+		if (xprt_queuelen * nactive <= xps_queuelen)
+			break;
+		cur = xprt;
+	}
+	return xprt;
+}
+
+static
 struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi)
 {
 	return xprt_iter_next_entry_multiple(xpi,
@@ -322,9 +359,17 @@
 }
 
 static
+struct rpc_xprt *xprt_switch_find_next_entry_all(struct rpc_xprt_switch *xps,
+		const struct rpc_xprt *cur)
+{
+	return xprt_switch_find_next_entry(&xps->xps_xprt_list, cur);
+}
+
+static
 struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi)
 {
-	return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry);
+	return xprt_iter_next_entry_multiple(xpi,
+			xprt_switch_find_next_entry_all);
 }
 
 /*
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index ce98659..59e624b 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -52,6 +52,13 @@
 	return maxmsg - RPCRDMA_HDRLEN_MIN;
 }
 
+unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+	return r_xprt->rx_buf.rb_bc_srv_max_requests;
+}
+
 static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 794ba4c..0b6dad7 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -144,6 +144,26 @@
 	frwr_release_mr(mr);
 }
 
+/* frwr_reset - Place MRs back on the free list
+ * @req: request to reset
+ *
+ * Used after a failed marshal. For FRWR, this means the MRs
+ * don't have to be fully released and recreated.
+ *
+ * NB: This is safe only as long as none of @req's MRs are
+ * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
+ * Work Request.
+ */
+void frwr_reset(struct rpcrdma_req *req)
+{
+	while (!list_empty(&req->rl_registered)) {
+		struct rpcrdma_mr *mr;
+
+		mr = rpcrdma_mr_pop(&req->rl_registered);
+		rpcrdma_mr_unmap_and_put(mr);
+	}
+}
+
 /**
  * frwr_init_mr - Initialize one MR
  * @ia: interface adapter
@@ -168,7 +188,6 @@
 		goto out_list_err;
 
 	mr->frwr.fr_mr = frmr;
-	mr->frwr.fr_state = FRWR_IS_INVALID;
 	mr->mr_dir = DMA_NONE;
 	INIT_LIST_HEAD(&mr->mr_list);
 	INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
@@ -298,65 +317,6 @@
 }
 
 /**
- * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
- * @cq:	completion queue (ignored)
- * @wc:	completed WR
- *
- */
-static void
-frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
-{
-	struct ib_cqe *cqe = wc->wr_cqe;
-	struct rpcrdma_frwr *frwr =
-			container_of(cqe, struct rpcrdma_frwr, fr_cqe);
-
-	/* WARNING: Only wr_cqe and status are reliable at this point */
-	if (wc->status != IB_WC_SUCCESS)
-		frwr->fr_state = FRWR_FLUSHED_FR;
-	trace_xprtrdma_wc_fastreg(wc, frwr);
-}
-
-/**
- * frwr_wc_localinv - Invoked by RDMA provider for a flushed LocalInv WC
- * @cq:	completion queue (ignored)
- * @wc:	completed WR
- *
- */
-static void
-frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
-{
-	struct ib_cqe *cqe = wc->wr_cqe;
-	struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
-						 fr_cqe);
-
-	/* WARNING: Only wr_cqe and status are reliable at this point */
-	if (wc->status != IB_WC_SUCCESS)
-		frwr->fr_state = FRWR_FLUSHED_LI;
-	trace_xprtrdma_wc_li(wc, frwr);
-}
-
-/**
- * frwr_wc_localinv_wake - Invoked by RDMA provider for a signaled LocalInv WC
- * @cq:	completion queue (ignored)
- * @wc:	completed WR
- *
- * Awaken anyone waiting for an MR to finish being fenced.
- */
-static void
-frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
-{
-	struct ib_cqe *cqe = wc->wr_cqe;
-	struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
-						 fr_cqe);
-
-	/* WARNING: Only wr_cqe and status are reliable at this point */
-	if (wc->status != IB_WC_SUCCESS)
-		frwr->fr_state = FRWR_FLUSHED_LI;
-	trace_xprtrdma_wc_li_wake(wc, frwr);
-	complete(&frwr->fr_linv_done);
-}
-
-/**
  * frwr_map - Register a memory region
  * @r_xprt: controlling transport
  * @seg: memory region co-ordinates
@@ -378,23 +338,15 @@
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
-	struct rpcrdma_frwr *frwr;
 	struct rpcrdma_mr *mr;
 	struct ib_mr *ibmr;
 	struct ib_reg_wr *reg_wr;
 	int i, n;
 	u8 key;
 
-	mr = NULL;
-	do {
-		if (mr)
-			rpcrdma_mr_recycle(mr);
-		mr = rpcrdma_mr_get(r_xprt);
-		if (!mr)
-			return ERR_PTR(-EAGAIN);
-	} while (mr->frwr.fr_state != FRWR_IS_INVALID);
-	frwr = &mr->frwr;
-	frwr->fr_state = FRWR_IS_VALID;
+	mr = rpcrdma_mr_get(r_xprt);
+	if (!mr)
+		goto out_getmr_err;
 
 	if (nsegs > ia->ri_max_frwr_depth)
 		nsegs = ia->ri_max_frwr_depth;
@@ -423,7 +375,7 @@
 	if (!mr->mr_nents)
 		goto out_dmamap_err;
 
-	ibmr = frwr->fr_mr;
+	ibmr = mr->frwr.fr_mr;
 	n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
 	if (unlikely(n != mr->mr_nents))
 		goto out_mapmr_err;
@@ -433,7 +385,7 @@
 	key = (u8)(ibmr->rkey & 0x000000FF);
 	ib_update_fast_reg_key(ibmr, ++key);
 
-	reg_wr = &frwr->fr_regwr;
+	reg_wr = &mr->frwr.fr_regwr;
 	reg_wr->mr = ibmr;
 	reg_wr->key = ibmr->rkey;
 	reg_wr->access = writing ?
@@ -448,6 +400,10 @@
 	*out = mr;
 	return seg;
 
+out_getmr_err:
+	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
+	return ERR_PTR(-EAGAIN);
+
 out_dmamap_err:
 	mr->mr_dir = DMA_NONE;
 	trace_xprtrdma_frwr_sgerr(mr, i);
@@ -461,6 +417,23 @@
 }
 
 /**
+ * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
+ *
+ */
+static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct rpcrdma_frwr *frwr =
+		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+
+	/* WARNING: Only wr_cqe and status are reliable at this point */
+	trace_xprtrdma_wc_fastreg(wc, frwr);
+	/* The MR will get recycled when the associated req is retransmitted */
+}
+
+/**
  * frwr_send - post Send WR containing the RPC Call message
  * @ia: interface adapter
  * @req: Prepared RPC Call
@@ -512,31 +485,75 @@
 		if (mr->mr_handle == rep->rr_inv_rkey) {
 			list_del_init(&mr->mr_list);
 			trace_xprtrdma_mr_remoteinv(mr);
-			mr->frwr.fr_state = FRWR_IS_INVALID;
 			rpcrdma_mr_unmap_and_put(mr);
 			break;	/* only one invalidated MR per RPC */
 		}
 }
 
+static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
+{
+	if (wc->status != IB_WC_SUCCESS)
+		rpcrdma_mr_recycle(mr);
+	else
+		rpcrdma_mr_unmap_and_put(mr);
+}
+
+/**
+ * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
+ *
+ */
+static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct rpcrdma_frwr *frwr =
+		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+	/* WARNING: Only wr_cqe and status are reliable at this point */
+	trace_xprtrdma_wc_li(wc, frwr);
+	__frwr_release_mr(wc, mr);
+}
+
+/**
+ * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
+ *
+ * Awaken anyone waiting for an MR to finish being fenced.
+ */
+static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct rpcrdma_frwr *frwr =
+		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+	/* WARNING: Only wr_cqe and status are reliable at this point */
+	trace_xprtrdma_wc_li_wake(wc, frwr);
+	complete(&frwr->fr_linv_done);
+	__frwr_release_mr(wc, mr);
+}
+
 /**
  * frwr_unmap_sync - invalidate memory regions that were registered for @req
- * @r_xprt: controlling transport
- * @mrs: list of MRs to process
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req with a non-empty list of MRs to process
  *
- * Sleeps until it is safe for the host CPU to access the
- * previously mapped memory regions.
- *
- * Caller ensures that @mrs is not empty before the call. This
- * function empties the list.
+ * Sleeps until it is safe for the host CPU to access the previously mapped
+ * memory regions. This guarantees that registered MRs are properly fenced
+ * from the server before the RPC consumer accesses the data in them. It
+ * also ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
  */
-void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
 	struct ib_send_wr *first, **prev, *last;
 	const struct ib_send_wr *bad_wr;
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rpcrdma_frwr *frwr;
 	struct rpcrdma_mr *mr;
-	int count, rc;
+	int rc;
 
 	/* ORDER: Invalidate all of the MRs first
 	 *
@@ -544,33 +561,32 @@
 	 * a single ib_post_send() call.
 	 */
 	frwr = NULL;
-	count = 0;
 	prev = &first;
-	list_for_each_entry(mr, mrs, mr_list) {
-		mr->frwr.fr_state = FRWR_IS_INVALID;
+	while (!list_empty(&req->rl_registered)) {
+		mr = rpcrdma_mr_pop(&req->rl_registered);
+
+		trace_xprtrdma_mr_localinv(mr);
+		r_xprt->rx_stats.local_inv_needed++;
 
 		frwr = &mr->frwr;
-		trace_xprtrdma_mr_localinv(mr);
-
 		frwr->fr_cqe.done = frwr_wc_localinv;
 		last = &frwr->fr_invwr;
-		memset(last, 0, sizeof(*last));
+		last->next = NULL;
 		last->wr_cqe = &frwr->fr_cqe;
+		last->sg_list = NULL;
+		last->num_sge = 0;
 		last->opcode = IB_WR_LOCAL_INV;
+		last->send_flags = IB_SEND_SIGNALED;
 		last->ex.invalidate_rkey = mr->mr_handle;
-		count++;
 
 		*prev = last;
 		prev = &last->next;
 	}
-	if (!frwr)
-		goto unmap;
 
 	/* Strong send queue ordering guarantees that when the
 	 * last WR in the chain completes, all WRs in the chain
 	 * are complete.
 	 */
-	last->send_flags = IB_SEND_SIGNALED;
 	frwr->fr_cqe.done = frwr_wc_localinv_wake;
 	reinit_completion(&frwr->fr_linv_done);
 
@@ -578,29 +594,20 @@
 	 * replaces the QP. The RPC reply handler won't call us
 	 * unless ri_id->qp is a valid pointer.
 	 */
-	r_xprt->rx_stats.local_inv_needed++;
 	bad_wr = NULL;
-	rc = ib_post_send(ia->ri_id->qp, first, &bad_wr);
+	rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
+	trace_xprtrdma_post_send(req, rc);
+
+	/* The final LOCAL_INV WR in the chain is supposed to
+	 * do the wake. If it was never posted, the wake will
+	 * not happen, so don't wait in that case.
+	 */
 	if (bad_wr != first)
 		wait_for_completion(&frwr->fr_linv_done);
-	if (rc)
-		goto out_release;
+	if (!rc)
+		return;
 
-	/* ORDER: Now DMA unmap all of the MRs, and return
-	 * them to the free MR list.
-	 */
-unmap:
-	while (!list_empty(mrs)) {
-		mr = rpcrdma_mr_pop(mrs);
-		rpcrdma_mr_unmap_and_put(mr);
-	}
-	return;
-
-out_release:
-	pr_err("rpcrdma: FRWR invalidate ib_post_send returned %i\n", rc);
-
-	/* Unmap and release the MRs in the LOCAL_INV WRs that did not
-	 * get posted.
+	/* Recycle MRs in the LOCAL_INV chain that did not get posted.
 	 */
 	while (bad_wr) {
 		frwr = container_of(bad_wr, struct rpcrdma_frwr,
@@ -612,3 +619,101 @@
 		rpcrdma_mr_recycle(mr);
 	}
 }
+
+/**
+ * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
+ * @cq:	completion queue (ignored)
+ * @wc:	completed WR
+ *
+ */
+static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct rpcrdma_frwr *frwr =
+		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+	/* WARNING: Only wr_cqe and status are reliable at this point */
+	trace_xprtrdma_wc_li_done(wc, frwr);
+	rpcrdma_complete_rqst(frwr->fr_req->rl_reply);
+	__frwr_release_mr(wc, mr);
+}
+
+/**
+ * frwr_unmap_async - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req with a non-empty list of MRs to process
+ *
+ * This guarantees that registered MRs are properly fenced from the
+ * server before the RPC consumer accesses the data in them. It also
+ * ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
+ */
+void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+	struct ib_send_wr *first, *last, **prev;
+	const struct ib_send_wr *bad_wr;
+	struct rpcrdma_frwr *frwr;
+	struct rpcrdma_mr *mr;
+	int rc;
+
+	/* Chain the LOCAL_INV Work Requests and post them with
+	 * a single ib_post_send() call.
+	 */
+	frwr = NULL;
+	prev = &first;
+	while (!list_empty(&req->rl_registered)) {
+		mr = rpcrdma_mr_pop(&req->rl_registered);
+
+		trace_xprtrdma_mr_localinv(mr);
+		r_xprt->rx_stats.local_inv_needed++;
+
+		frwr = &mr->frwr;
+		frwr->fr_cqe.done = frwr_wc_localinv;
+		frwr->fr_req = req;
+		last = &frwr->fr_invwr;
+		last->next = NULL;
+		last->wr_cqe = &frwr->fr_cqe;
+		last->sg_list = NULL;
+		last->num_sge = 0;
+		last->opcode = IB_WR_LOCAL_INV;
+		last->send_flags = IB_SEND_SIGNALED;
+		last->ex.invalidate_rkey = mr->mr_handle;
+
+		*prev = last;
+		prev = &last->next;
+	}
+
+	/* Strong send queue ordering guarantees that when the
+	 * last WR in the chain completes, all WRs in the chain
+	 * are complete. The last completion will wake up the
+	 * RPC waiter.
+	 */
+	frwr->fr_cqe.done = frwr_wc_localinv_done;
+
+	/* Transport disconnect drains the receive CQ before it
+	 * replaces the QP. The RPC reply handler won't call us
+	 * unless ri_id->qp is a valid pointer.
+	 */
+	bad_wr = NULL;
+	rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
+	trace_xprtrdma_post_send(req, rc);
+	if (!rc)
+		return;
+
+	/* Recycle MRs in the LOCAL_INV chain that did not get posted.
+	 */
+	while (bad_wr) {
+		frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
+		mr = container_of(frwr, struct rpcrdma_mr, frwr);
+		bad_wr = bad_wr->next;
+
+		rpcrdma_mr_recycle(mr);
+	}
+
+	/* The final LOCAL_INV WR in the chain is supposed to
+	 * do the wake. If it was never posted, the wake will
+	 * not happen, so wake here in that case.
+	 */
+	rpcrdma_complete_rqst(req->rl_reply);
+}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 85115a2..4345e69 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -366,6 +366,9 @@
 	unsigned int pos;
 	int nsegs;
 
+	if (rtype == rpcrdma_noch)
+		goto done;
+
 	pos = rqst->rq_snd_buf.head[0].iov_len;
 	if (rtype == rpcrdma_areadch)
 		pos = 0;
@@ -389,7 +392,8 @@
 		nsegs -= mr->mr_nents;
 	} while (nsegs);
 
-	return 0;
+done:
+	return encode_item_not_present(xdr);
 }
 
 /* Register and XDR encode the Write list. Supports encoding a list
@@ -417,6 +421,9 @@
 	int nsegs, nchunks;
 	__be32 *segcount;
 
+	if (wtype != rpcrdma_writech)
+		goto done;
+
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
 				     rqst->rq_rcv_buf.head[0].iov_len,
@@ -451,7 +458,8 @@
 	/* Update count of segments in this Write chunk */
 	*segcount = cpu_to_be32(nchunks);
 
-	return 0;
+done:
+	return encode_item_not_present(xdr);
 }
 
 /* Register and XDR encode the Reply chunk. Supports encoding an array
@@ -476,6 +484,9 @@
 	int nsegs, nchunks;
 	__be32 *segcount;
 
+	if (wtype != rpcrdma_replych)
+		return encode_item_not_present(xdr);
+
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
 	if (nsegs < 0)
@@ -511,6 +522,16 @@
 	return 0;
 }
 
+static void rpcrdma_sendctx_done(struct kref *kref)
+{
+	struct rpcrdma_req *req =
+		container_of(kref, struct rpcrdma_req, rl_kref);
+	struct rpcrdma_rep *rep = req->rl_reply;
+
+	rpcrdma_complete_rqst(rep);
+	rep->rr_rxprt->rx_stats.reply_waits_for_send++;
+}
+
 /**
  * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
  * @sc: sendctx containing SGEs to unmap
@@ -520,6 +541,9 @@
 {
 	struct ib_sge *sge;
 
+	if (!sc->sc_unmap_count)
+		return;
+
 	/* The first two SGEs contain the transport header and
 	 * the inline buffer. These are always left mapped so
 	 * they can be cheaply re-used.
@@ -529,9 +553,7 @@
 		ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
 				  DMA_TO_DEVICE);
 
-	if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES,
-			       &sc->sc_req->rl_flags))
-		wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
+	kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
 }
 
 /* Prepare an SGE for the RPC-over-RDMA transport header.
@@ -666,7 +688,7 @@
 out:
 	sc->sc_wr.num_sge += sge_no;
 	if (sc->sc_unmap_count)
-		__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+		kref_get(&req->rl_kref);
 	return true;
 
 out_regbuf:
@@ -699,22 +721,28 @@
 			  struct rpcrdma_req *req, u32 hdrlen,
 			  struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
 {
+	int ret;
+
+	ret = -EAGAIN;
 	req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
 	if (!req->rl_sendctx)
-		return -EAGAIN;
+		goto err;
 	req->rl_sendctx->sc_wr.num_sge = 0;
 	req->rl_sendctx->sc_unmap_count = 0;
 	req->rl_sendctx->sc_req = req;
-	__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+	kref_init(&req->rl_kref);
 
+	ret = -EIO;
 	if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
-		return -EIO;
-
+		goto err;
 	if (rtype != rpcrdma_areadch)
 		if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
-			return -EIO;
-
+			goto err;
 	return 0;
+
+err:
+	trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
+	return ret;
 }
 
 /**
@@ -842,50 +870,28 @@
 	 * send a Call message with a Position Zero Read chunk and a
 	 * regular Read chunk at the same time.
 	 */
-	if (rtype != rpcrdma_noch) {
-		ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
-		if (ret)
-			goto out_err;
-	}
-	ret = encode_item_not_present(xdr);
+	ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
+	if (ret)
+		goto out_err;
+	ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
+	if (ret)
+		goto out_err;
+	ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
 	if (ret)
 		goto out_err;
 
-	if (wtype == rpcrdma_writech) {
-		ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
-		if (ret)
-			goto out_err;
-	}
-	ret = encode_item_not_present(xdr);
-	if (ret)
-		goto out_err;
-
-	if (wtype != rpcrdma_replych)
-		ret = encode_item_not_present(xdr);
-	else
-		ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
-	if (ret)
-		goto out_err;
-
-	trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
-
-	ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
+	ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
 					&rqst->rq_snd_buf, rtype);
 	if (ret)
 		goto out_err;
+
+	trace_xprtrdma_marshal(req, rtype, wtype);
 	return 0;
 
 out_err:
 	trace_xprtrdma_marshal_failed(rqst, ret);
-	switch (ret) {
-	case -EAGAIN:
-		xprt_wait_for_buffer_space(rqst->rq_xprt);
-		break;
-	case -ENOBUFS:
-		break;
-	default:
-		r_xprt->rx_stats.failed_marshal_count++;
-	}
+	r_xprt->rx_stats.failed_marshal_count++;
+	frwr_reset(req);
 	return ret;
 }
 
@@ -1269,51 +1275,17 @@
 	goto out;
 }
 
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+static void rpcrdma_reply_done(struct kref *kref)
 {
-	/* Invalidate and unmap the data payloads before waking
-	 * the waiting application. This guarantees the memory
-	 * regions are properly fenced from the server before the
-	 * application accesses the data. It also ensures proper
-	 * send flow control: waking the next RPC waits until this
-	 * RPC has relinquished all its Send Queue entries.
-	 */
-	if (!list_empty(&req->rl_registered))
-		frwr_unmap_sync(r_xprt, &req->rl_registered);
+	struct rpcrdma_req *req =
+		container_of(kref, struct rpcrdma_req, rl_kref);
 
-	/* Ensure that any DMA mapped pages associated with
-	 * the Send of the RPC Call have been unmapped before
-	 * allowing the RPC to complete. This protects argument
-	 * memory not controlled by the RPC client from being
-	 * re-used before we're done with it.
-	 */
-	if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
-		r_xprt->rx_stats.reply_waits_for_send++;
-		out_of_line_wait_on_bit(&req->rl_flags,
-					RPCRDMA_REQ_F_TX_RESOURCES,
-					bit_wait,
-					TASK_UNINTERRUPTIBLE);
-	}
+	rpcrdma_complete_rqst(req->rl_reply);
 }
 
-/* Reply handling runs in the poll worker thread. Anything that
- * might wait is deferred to a separate workqueue.
- */
-void rpcrdma_deferred_completion(struct work_struct *work)
-{
-	struct rpcrdma_rep *rep =
-			container_of(work, struct rpcrdma_rep, rr_work);
-	struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
-	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-
-	trace_xprtrdma_defer_cmp(rep);
-	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
-		frwr_reminv(rep, &req->rl_registered);
-	rpcrdma_release_rqst(r_xprt, req);
-	rpcrdma_complete_rqst(rep);
-}
-
-/* Process received RPC/RDMA messages.
+/**
+ * rpcrdma_reply_handler - Process received RPC/RDMA messages
+ * @rep: Incoming rpcrdma_rep object to process
  *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
@@ -1360,10 +1332,10 @@
 	else if (credits > buf->rb_max_requests)
 		credits = buf->rb_max_requests;
 	if (buf->rb_credits != credits) {
-		spin_lock_bh(&xprt->transport_lock);
+		spin_lock(&xprt->transport_lock);
 		buf->rb_credits = credits;
 		xprt->cwnd = credits << RPC_CWNDSHIFT;
-		spin_unlock_bh(&xprt->transport_lock);
+		spin_unlock(&xprt->transport_lock);
 	}
 
 	req = rpcr_to_rdmar(rqst);
@@ -1373,10 +1345,16 @@
 	}
 	req->rl_reply = rep;
 	rep->rr_rqst = rqst;
-	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 
 	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
-	queue_work(buf->rb_completion_wq, &rep->rr_work);
+
+	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
+		frwr_reminv(rep, &req->rl_registered);
+	if (!list_empty(&req->rl_registered))
+		frwr_unmap_async(r_xprt, req);
+		/* LocalInv completion will complete the RPC */
+	else
+		kref_put(&req->rl_kref, rpcrdma_reply_done);
 	return;
 
 out_badversion:
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index bed57d8..d1fcc41 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -72,9 +72,9 @@
 	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
 		credits = r_xprt->rx_buf.rb_bc_max_requests;
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	xprt->cwnd = credits << RPC_CWNDSHIFT;
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 
 	spin_lock(&xprt->queue_lock);
 	ret = 0;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 0004535..3fe6651 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -226,9 +226,9 @@
 	 * Enqueue the new transport on the accept queue of the listening
 	 * transport
 	 */
-	spin_lock_bh(&listen_xprt->sc_lock);
+	spin_lock(&listen_xprt->sc_lock);
 	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
-	spin_unlock_bh(&listen_xprt->sc_lock);
+	spin_unlock(&listen_xprt->sc_lock);
 
 	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
 	svc_xprt_enqueue(&listen_xprt->sc_xprt);
@@ -401,7 +401,7 @@
 	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
 	clear_bit(XPT_CONN, &xprt->xpt_flags);
 	/* Get the next entry off the accept list */
-	spin_lock_bh(&listen_rdma->sc_lock);
+	spin_lock(&listen_rdma->sc_lock);
 	if (!list_empty(&listen_rdma->sc_accept_q)) {
 		newxprt = list_entry(listen_rdma->sc_accept_q.next,
 				     struct svcxprt_rdma, sc_accept_q);
@@ -409,7 +409,7 @@
 	}
 	if (!list_empty(&listen_rdma->sc_accept_q))
 		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
-	spin_unlock_bh(&listen_rdma->sc_lock);
+	spin_unlock(&listen_rdma->sc_lock);
 	if (!newxprt)
 		return NULL;
 
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 1f73a6a..52abdda 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -298,6 +298,7 @@
 	module_put(THIS_MODULE);
 }
 
+/* 60 second timeout, no retries */
 static const struct rpc_timeout xprt_rdma_default_timeout = {
 	.to_initval = 60 * HZ,
 	.to_maxval = 60 * HZ,
@@ -323,8 +324,9 @@
 	if (!xprt)
 		return ERR_PTR(-ENOMEM);
 
-	/* 60 second timeout, no retries */
 	xprt->timeout = &xprt_rdma_default_timeout;
+	xprt->connect_timeout = xprt->timeout->to_initval;
+	xprt->max_reconnect_timeout = xprt->timeout->to_maxval;
 	xprt->bind_timeout = RPCRDMA_BIND_TO;
 	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
@@ -487,31 +489,64 @@
 }
 
 /**
- * xprt_rdma_connect - try to establish a transport connection
+ * xprt_rdma_set_connect_timeout - set timeouts for establishing a connection
+ * @xprt: controlling transport instance
+ * @connect_timeout: reconnect timeout after client disconnects
+ * @reconnect_timeout: reconnect timeout after server disconnects
+ *
+ */
+static void xprt_rdma_tcp_set_connect_timeout(struct rpc_xprt *xprt,
+					      unsigned long connect_timeout,
+					      unsigned long reconnect_timeout)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+	trace_xprtrdma_op_set_cto(r_xprt, connect_timeout, reconnect_timeout);
+
+	spin_lock(&xprt->transport_lock);
+
+	if (connect_timeout < xprt->connect_timeout) {
+		struct rpc_timeout to;
+		unsigned long initval;
+
+		to = *xprt->timeout;
+		initval = connect_timeout;
+		if (initval < RPCRDMA_INIT_REEST_TO << 1)
+			initval = RPCRDMA_INIT_REEST_TO << 1;
+		to.to_initval = initval;
+		to.to_maxval = initval;
+		r_xprt->rx_timeout = to;
+		xprt->timeout = &r_xprt->rx_timeout;
+		xprt->connect_timeout = connect_timeout;
+	}
+
+	if (reconnect_timeout < xprt->max_reconnect_timeout)
+		xprt->max_reconnect_timeout = reconnect_timeout;
+
+	spin_unlock(&xprt->transport_lock);
+}
+
+/**
+ * xprt_rdma_connect - schedule an attempt to reconnect
  * @xprt: transport state
- * @task: RPC scheduler context
+ * @task: RPC scheduler context (unused)
  *
  */
 static void
 xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	unsigned long delay;
 
 	trace_xprtrdma_op_connect(r_xprt);
+
+	delay = 0;
 	if (r_xprt->rx_ep.rep_connected != 0) {
-		/* Reconnect */
-		schedule_delayed_work(&r_xprt->rx_connect_worker,
-				      xprt->reestablish_timeout);
-		xprt->reestablish_timeout <<= 1;
-		if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
-			xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
-		else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
-			xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
-	} else {
-		schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
-		if (!RPC_IS_ASYNC(task))
-			flush_delayed_work(&r_xprt->rx_connect_worker);
+		delay = xprt_reconnect_delay(xprt);
+		xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
 	}
+	queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker,
+			   delay);
 }
 
 /**
@@ -550,8 +585,11 @@
 static void
 xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
 {
+	struct rpcrdma_xprt *r_xprt =
+		container_of(xprt, struct rpcrdma_xprt, rx_xprt);
+
 	memset(rqst, 0, sizeof(*rqst));
-	rpcrdma_buffer_put(rpcr_to_rdmar(rqst));
+	rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
 	rpc_wake_up_next(&xprt->backlog);
 }
 
@@ -618,9 +656,16 @@
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 
-	if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
-		rpcrdma_release_rqst(r_xprt, req);
 	trace_xprtrdma_op_free(task, req);
+
+	if (!list_empty(&req->rl_registered))
+		frwr_unmap_sync(r_xprt, req);
+
+	/* XXX: If the RPC is completing because of a signal and
+	 * not because a reply was received, we ought to ensure
+	 * that the Send completion has fired, so that memory
+	 * involved with the Send is not still visible to the NIC.
+	 */
 }
 
 /**
@@ -667,7 +712,6 @@
 		goto drop_connection;
 	rqst->rq_xtime = ktime_get();
 
-	__set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
 		goto drop_connection;
 
@@ -760,6 +804,7 @@
 	.send_request		= xprt_rdma_send_request,
 	.close			= xprt_rdma_close,
 	.destroy		= xprt_rdma_destroy,
+	.set_connect_timeout	= xprt_rdma_tcp_set_connect_timeout,
 	.print_stats		= xprt_rdma_print_stats,
 	.enable_swap		= xprt_rdma_enable_swap,
 	.disable_swap		= xprt_rdma_disable_swap,
@@ -767,6 +812,7 @@
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	.bc_setup		= xprt_rdma_bc_setup,
 	.bc_maxpayload		= xprt_rdma_bc_maxpayload,
+	.bc_num_slots		= xprt_rdma_bc_max_slots,
 	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
 	.bc_destroy		= xprt_rdma_bc_destroy,
 #endif
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 84bb379..805b1f35 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -89,14 +89,12 @@
  */
 static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 {
-	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
 	/* Flush Receives, then wait for deferred Reply work
 	 * to complete.
 	 */
 	ib_drain_rq(ia->ri_id->qp);
-	drain_workqueue(buf->rb_completion_wq);
 
 	/* Deferred Reply processing might have scheduled
 	 * local invalidations.
@@ -901,7 +899,7 @@
 	 * completions recently. This is a sign the Send Queue is
 	 * backing up. Cause the caller to pause and try again.
 	 */
-	set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
+	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
 	r_xprt->rx_stats.empty_sendctx_q++;
 	return NULL;
 }
@@ -936,10 +934,7 @@
 	/* Paired with READ_ONCE */
 	smp_store_release(&buf->rb_sc_tail, next_tail);
 
-	if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) {
-		smp_mb__after_atomic();
-		xprt_write_space(&sc->sc_xprt->rx_xprt);
-	}
+	xprt_write_space(&sc->sc_xprt->rx_xprt);
 }
 
 static void
@@ -977,8 +972,6 @@
 	r_xprt->rx_stats.mrs_allocated += count;
 	spin_unlock(&buf->rb_mrlock);
 	trace_xprtrdma_createmrs(r_xprt, count);
-
-	xprt_write_space(&r_xprt->rx_xprt);
 }
 
 static void
@@ -990,6 +983,7 @@
 						   rx_buf);
 
 	rpcrdma_mrs_create(r_xprt);
+	xprt_write_space(&r_xprt->rx_xprt);
 }
 
 /**
@@ -1025,7 +1019,6 @@
 	if (!req->rl_recvbuf)
 		goto out4;
 
-	req->rl_buffer = buffer;
 	INIT_LIST_HEAD(&req->rl_registered);
 	spin_lock(&buffer->rb_lock);
 	list_add(&req->rl_all, &buffer->rb_allreqs);
@@ -1042,9 +1035,9 @@
 	return NULL;
 }
 
-static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp)
+static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
+					      bool temp)
 {
-	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_rep *rep;
 
 	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
@@ -1055,27 +1048,22 @@
 					       DMA_FROM_DEVICE, GFP_KERNEL);
 	if (!rep->rr_rdmabuf)
 		goto out_free;
+
 	xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
 		     rdmab_length(rep->rr_rdmabuf));
-
 	rep->rr_cqe.done = rpcrdma_wc_receive;
 	rep->rr_rxprt = r_xprt;
-	INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
 	rep->rr_recv_wr.next = NULL;
 	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
 	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
 	rep->rr_recv_wr.num_sge = 1;
 	rep->rr_temp = temp;
-
-	spin_lock(&buf->rb_lock);
-	list_add(&rep->rr_list, &buf->rb_recv_bufs);
-	spin_unlock(&buf->rb_lock);
-	return true;
+	return rep;
 
 out_free:
 	kfree(rep);
 out:
-	return false;
+	return NULL;
 }
 
 /**
@@ -1089,7 +1077,6 @@
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	int i, rc;
 
-	buf->rb_flags = 0;
 	buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
 	buf->rb_bc_srv_max_requests = 0;
 	spin_lock_init(&buf->rb_mrlock);
@@ -1122,15 +1109,6 @@
 	if (rc)
 		goto out;
 
-	buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
-						WQ_MEM_RECLAIM | WQ_HIGHPRI,
-						0,
-			r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
-	if (!buf->rb_completion_wq) {
-		rc = -ENOMEM;
-		goto out;
-	}
-
 	return 0;
 out:
 	rpcrdma_buffer_destroy(buf);
@@ -1204,11 +1182,6 @@
 {
 	cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
-	if (buf->rb_completion_wq) {
-		destroy_workqueue(buf->rb_completion_wq);
-		buf->rb_completion_wq = NULL;
-	}
-
 	rpcrdma_sendctxs_destroy(buf);
 
 	while (!list_empty(&buf->rb_recv_bufs)) {
@@ -1325,13 +1298,12 @@
 
 /**
  * rpcrdma_buffer_put - Put request/reply buffers back into pool
+ * @buffers: buffer pool
  * @req: object to return
  *
  */
-void
-rpcrdma_buffer_put(struct rpcrdma_req *req)
+void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
 {
-	struct rpcrdma_buffer *buffers = req->rl_buffer;
 	struct rpcrdma_rep *rep = req->rl_reply;
 
 	req->rl_reply = NULL;
@@ -1484,8 +1456,7 @@
 	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
 	int rc;
 
-	if (!ep->rep_send_count ||
-	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
+	if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
 		send_wr->send_flags |= IB_SEND_SIGNALED;
 		ep->rep_send_count = ep->rep_send_batch;
 	} else {
@@ -1505,11 +1476,13 @@
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
-	struct ib_recv_wr *wr, *bad_wr;
+	struct ib_recv_wr *i, *wr, *bad_wr;
+	struct rpcrdma_rep *rep;
 	int needed, count, rc;
 
 	rc = 0;
 	count = 0;
+
 	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
 	if (ep->rep_receive_count > needed)
 		goto out;
@@ -1517,51 +1490,65 @@
 	if (!temp)
 		needed += RPCRDMA_MAX_RECV_BATCH;
 
-	count = 0;
+	/* fast path: all needed reps can be found on the free list */
 	wr = NULL;
+	spin_lock(&buf->rb_lock);
 	while (needed) {
-		struct rpcrdma_regbuf *rb;
-		struct rpcrdma_rep *rep;
-
-		spin_lock(&buf->rb_lock);
 		rep = list_first_entry_or_null(&buf->rb_recv_bufs,
 					       struct rpcrdma_rep, rr_list);
-		if (likely(rep))
-			list_del(&rep->rr_list);
-		spin_unlock(&buf->rb_lock);
-		if (!rep) {
-			if (!rpcrdma_rep_create(r_xprt, temp))
-				break;
-			continue;
-		}
-
-		rb = rep->rr_rdmabuf;
-		if (!rpcrdma_regbuf_dma_map(r_xprt, rb)) {
-			rpcrdma_recv_buffer_put(rep);
+		if (!rep)
 			break;
-		}
 
-		trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
+		list_del(&rep->rr_list);
 		rep->rr_recv_wr.next = wr;
 		wr = &rep->rr_recv_wr;
-		++count;
 		--needed;
 	}
-	if (!count)
+	spin_unlock(&buf->rb_lock);
+
+	while (needed) {
+		rep = rpcrdma_rep_create(r_xprt, temp);
+		if (!rep)
+			break;
+
+		rep->rr_recv_wr.next = wr;
+		wr = &rep->rr_recv_wr;
+		--needed;
+	}
+	if (!wr)
 		goto out;
 
+	for (i = wr; i; i = i->next) {
+		rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
+
+		if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
+			goto release_wrs;
+
+		trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
+		++count;
+	}
+
 	rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
 			  (const struct ib_recv_wr **)&bad_wr);
+out:
+	trace_xprtrdma_post_recvs(r_xprt, count, rc);
 	if (rc) {
-		for (wr = bad_wr; wr; wr = wr->next) {
+		for (wr = bad_wr; wr;) {
 			struct rpcrdma_rep *rep;
 
 			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
+			wr = wr->next;
 			rpcrdma_recv_buffer_put(rep);
 			--count;
 		}
 	}
 	ep->rep_receive_count += count;
-out:
-	trace_xprtrdma_post_recvs(r_xprt, count, rc);
+	return;
+
+release_wrs:
+	for (i = wr; i;) {
+		rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
+		i = i->next;
+		rpcrdma_recv_buffer_put(rep);
+	}
 }
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index d1e0749..92ce09f 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -44,7 +44,8 @@
 
 #include <linux/wait.h> 		/* wait_queue_head_t, etc */
 #include <linux/spinlock.h> 		/* spinlock_t, etc */
-#include <linux/atomic.h>			/* atomic_t, etc */
+#include <linux/atomic.h>		/* atomic_t, etc */
+#include <linux/kref.h>			/* struct kref */
 #include <linux/workqueue.h>		/* struct work_struct */
 
 #include <rdma/rdma_cm.h>		/* RDMA connection api */
@@ -202,10 +203,9 @@
 	bool			rr_temp;
 	struct rpcrdma_regbuf	*rr_rdmabuf;
 	struct rpcrdma_xprt	*rr_rxprt;
-	struct work_struct	rr_work;
+	struct rpc_rqst		*rr_rqst;
 	struct xdr_buf		rr_hdrbuf;
 	struct xdr_stream	rr_stream;
-	struct rpc_rqst		*rr_rqst;
 	struct list_head	rr_list;
 	struct ib_recv_wr	rr_recv_wr;
 };
@@ -240,18 +240,12 @@
  * An external memory region is any buffer or page that is registered
  * on the fly (ie, not pre-registered).
  */
-enum rpcrdma_frwr_state {
-	FRWR_IS_INVALID,	/* ready to be used */
-	FRWR_IS_VALID,		/* in use */
-	FRWR_FLUSHED_FR,	/* flushed FASTREG WR */
-	FRWR_FLUSHED_LI,	/* flushed LOCALINV WR */
-};
-
+struct rpcrdma_req;
 struct rpcrdma_frwr {
 	struct ib_mr			*fr_mr;
 	struct ib_cqe			fr_cqe;
-	enum rpcrdma_frwr_state		fr_state;
 	struct completion		fr_linv_done;
+	struct rpcrdma_req		*fr_req;
 	union {
 		struct ib_reg_wr	fr_regwr;
 		struct ib_send_wr	fr_invwr;
@@ -326,7 +320,6 @@
 struct rpcrdma_req {
 	struct list_head	rl_list;
 	struct rpc_rqst		rl_slot;
-	struct rpcrdma_buffer	*rl_buffer;
 	struct rpcrdma_rep	*rl_reply;
 	struct xdr_stream	rl_stream;
 	struct xdr_buf		rl_hdrbuf;
@@ -336,18 +329,12 @@
 	struct rpcrdma_regbuf	*rl_recvbuf;	/* rq_rcv_buf */
 
 	struct list_head	rl_all;
-	unsigned long		rl_flags;
+	struct kref		rl_kref;
 
 	struct list_head	rl_registered;	/* registered segments */
 	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
 };
 
-/* rl_flags */
-enum {
-	RPCRDMA_REQ_F_PENDING = 0,
-	RPCRDMA_REQ_F_TX_RESOURCES,
-};
-
 static inline struct rpcrdma_req *
 rpcr_to_rdmar(const struct rpc_rqst *rqst)
 {
@@ -391,22 +378,15 @@
 	struct list_head	rb_recv_bufs;
 	struct list_head	rb_allreqs;
 
-	unsigned long		rb_flags;
 	u32			rb_max_requests;
 	u32			rb_credits;	/* most recent credit grant */
 
 	u32			rb_bc_srv_max_requests;
 	u32			rb_bc_max_requests;
 
-	struct workqueue_struct *rb_completion_wq;
 	struct delayed_work	rb_refresh_worker;
 };
 
-/* rb_flags */
-enum {
-	RPCRDMA_BUF_F_EMPTY_SCQ = 0,
-};
-
 /*
  * Statistics for RPCRDMA
  */
@@ -452,6 +432,7 @@
 	struct rpcrdma_ep	rx_ep;
 	struct rpcrdma_buffer	rx_buf;
 	struct delayed_work	rx_connect_worker;
+	struct rpc_timeout	rx_timeout;
 	struct rpcrdma_stats	rx_stats;
 };
 
@@ -518,7 +499,8 @@
 }
 
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
-void rpcrdma_buffer_put(struct rpcrdma_req *);
+void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
+			struct rpcrdma_req *req);
 void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
 
 bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,
@@ -564,6 +546,7 @@
 /* Memory registration calls xprtrdma/frwr_ops.c
  */
 bool frwr_is_supported(struct ib_device *device);
+void frwr_reset(struct rpcrdma_req *req);
 int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep);
 int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr);
 void frwr_release_mr(struct rpcrdma_mr *mr);
@@ -574,8 +557,8 @@
 				struct rpcrdma_mr **mr);
 int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
 void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
-void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt,
-		     struct list_head *mrs);
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
+void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
 
 /*
  * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
@@ -598,9 +581,6 @@
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
 void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
-			  struct rpcrdma_req *req);
-void rpcrdma_deferred_completion(struct work_struct *work);
 
 static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
 {
@@ -625,6 +605,7 @@
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
 size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *);
+unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *);
 int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
 void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
 int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 3665235..e2176c1 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -880,7 +880,7 @@
 			req->rq_slen);
 
 	/* Protect against races with write_space */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 
 	/* Don't race with disconnect */
 	if (xprt_connected(xprt)) {
@@ -890,7 +890,7 @@
 	} else
 		ret = -ENOTCONN;
 
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 
 	/* Race breaker in case memory is freed before above code is called */
 	if (ret == -EAGAIN) {
@@ -909,6 +909,7 @@
 static void
 xs_stream_prepare_request(struct rpc_rqst *req)
 {
+	xdr_free_bvec(&req->rq_rcv_buf);
 	req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_KERNEL);
 }
 
@@ -1211,6 +1212,15 @@
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 
 	clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+	clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state);
+	clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state);
+	clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state);
+}
+
+static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr)
+{
+	set_bit(nr, &transport->sock_state);
+	queue_work(xprtiod_workqueue, &transport->error_worker);
 }
 
 static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
@@ -1231,6 +1241,7 @@
  */
 static void xs_error_report(struct sock *sk)
 {
+	struct sock_xprt *transport;
 	struct rpc_xprt *xprt;
 	int err;
 
@@ -1238,13 +1249,14 @@
 	if (!(xprt = xprt_from_sock(sk)))
 		goto out;
 
+	transport = container_of(xprt, struct sock_xprt, xprt);
 	err = -sk->sk_err;
 	if (err == 0)
 		goto out;
 	dprintk("RPC:       xs_error_report client %p, error=%d...\n",
 			xprt, -err);
 	trace_rpc_socket_error(xprt, sk->sk_socket, err);
-	xprt_wake_pending_tasks(xprt, err);
+	xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR);
  out:
 	read_unlock_bh(&sk->sk_callback_lock);
 }
@@ -1333,6 +1345,7 @@
 	cancel_delayed_work_sync(&transport->connect_worker);
 	xs_close(xprt);
 	cancel_work_sync(&transport->recv_worker);
+	cancel_work_sync(&transport->error_worker);
 	xs_xprt_free(xprt);
 	module_put(THIS_MODULE);
 }
@@ -1386,9 +1399,9 @@
 	}
 
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	xprt_adjust_cwnd(xprt, task, copied);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 	spin_lock(&xprt->queue_lock);
 	xprt_complete_rqst(task, copied);
 	__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
@@ -1498,7 +1511,6 @@
 	trace_rpc_socket_state_change(xprt, sk->sk_socket);
 	switch (sk->sk_state) {
 	case TCP_ESTABLISHED:
-		spin_lock(&xprt->transport_lock);
 		if (!xprt_test_and_set_connected(xprt)) {
 			xprt->connect_cookie++;
 			clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
@@ -1507,9 +1519,8 @@
 			xprt->stat.connect_count++;
 			xprt->stat.connect_time += (long)jiffies -
 						   xprt->stat.connect_start;
-			xprt_wake_pending_tasks(xprt, -EAGAIN);
+			xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING);
 		}
-		spin_unlock(&xprt->transport_lock);
 		break;
 	case TCP_FIN_WAIT1:
 		/* The client initiated a shutdown of the socket */
@@ -1525,7 +1536,7 @@
 		/* The server initiated a shutdown of the socket */
 		xprt->connect_cookie++;
 		clear_bit(XPRT_CONNECTED, &xprt->state);
-		xs_tcp_force_close(xprt);
+		xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
 		/* fall through */
 	case TCP_CLOSING:
 		/*
@@ -1547,7 +1558,7 @@
 			xprt_clear_connecting(xprt);
 		clear_bit(XPRT_CLOSING, &xprt->state);
 		/* Trigger the socket release */
-		xs_tcp_force_close(xprt);
+		xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
 	}
  out:
 	read_unlock_bh(&sk->sk_callback_lock);
@@ -1556,6 +1567,7 @@
 static void xs_write_space(struct sock *sk)
 {
 	struct socket_wq *wq;
+	struct sock_xprt *transport;
 	struct rpc_xprt *xprt;
 
 	if (!sk->sk_socket)
@@ -1564,13 +1576,14 @@
 
 	if (unlikely(!(xprt = xprt_from_sock(sk))))
 		return;
+	transport = container_of(xprt, struct sock_xprt, xprt);
 	rcu_read_lock();
 	wq = rcu_dereference(sk->sk_wq);
 	if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0)
 		goto out;
 
-	if (xprt_write_space(xprt))
-		sk->sk_write_pending--;
+	xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE);
+	sk->sk_write_pending--;
 out:
 	rcu_read_unlock();
 }
@@ -1664,9 +1677,9 @@
  */
 static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task)
 {
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	xprt_adjust_cwnd(xprt, task, -ETIMEDOUT);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 }
 
 static int xs_get_random_port(void)
@@ -2201,13 +2214,13 @@
 	unsigned int opt_on = 1;
 	unsigned int timeo;
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	keepidle = DIV_ROUND_UP(xprt->timeout->to_initval, HZ);
 	keepcnt = xprt->timeout->to_retries + 1;
 	timeo = jiffies_to_msecs(xprt->timeout->to_initval) *
 		(xprt->timeout->to_retries + 1);
 	clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 
 	/* TCP Keepalive options */
 	kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
@@ -2232,7 +2245,7 @@
 	struct rpc_timeout to;
 	unsigned long initval;
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	if (reconnect_timeout < xprt->max_reconnect_timeout)
 		xprt->max_reconnect_timeout = reconnect_timeout;
 	if (connect_timeout < xprt->connect_timeout) {
@@ -2249,7 +2262,7 @@
 		xprt->connect_timeout = connect_timeout;
 	}
 	set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->transport_lock);
 }
 
 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
@@ -2402,25 +2415,6 @@
 	xprt_wake_pending_tasks(xprt, status);
 }
 
-static unsigned long xs_reconnect_delay(const struct rpc_xprt *xprt)
-{
-	unsigned long start, now = jiffies;
-
-	start = xprt->stat.connect_start + xprt->reestablish_timeout;
-	if (time_after(start, now))
-		return start - now;
-	return 0;
-}
-
-static void xs_reconnect_backoff(struct rpc_xprt *xprt)
-{
-	xprt->reestablish_timeout <<= 1;
-	if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
-		xprt->reestablish_timeout = xprt->max_reconnect_timeout;
-	if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
-		xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
-}
-
 /**
  * xs_connect - connect a socket to a remote endpoint
  * @xprt: pointer to transport structure
@@ -2450,8 +2444,8 @@
 		/* Start by resetting any existing state */
 		xs_reset_transport(transport);
 
-		delay = xs_reconnect_delay(xprt);
-		xs_reconnect_backoff(xprt);
+		delay = xprt_reconnect_delay(xprt);
+		xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO);
 
 	} else
 		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
@@ -2461,6 +2455,56 @@
 			delay);
 }
 
+static void xs_wake_disconnect(struct sock_xprt *transport)
+{
+	if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state))
+		xs_tcp_force_close(&transport->xprt);
+}
+
+static void xs_wake_write(struct sock_xprt *transport)
+{
+	if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state))
+		xprt_write_space(&transport->xprt);
+}
+
+static void xs_wake_error(struct sock_xprt *transport)
+{
+	int sockerr;
+	int sockerr_len = sizeof(sockerr);
+
+	if (!test_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
+		return;
+	mutex_lock(&transport->recv_mutex);
+	if (transport->sock == NULL)
+		goto out;
+	if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
+		goto out;
+	if (kernel_getsockopt(transport->sock, SOL_SOCKET, SO_ERROR,
+				(char *)&sockerr, &sockerr_len) != 0)
+		goto out;
+	if (sockerr < 0)
+		xprt_wake_pending_tasks(&transport->xprt, sockerr);
+out:
+	mutex_unlock(&transport->recv_mutex);
+}
+
+static void xs_wake_pending(struct sock_xprt *transport)
+{
+	if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state))
+		xprt_wake_pending_tasks(&transport->xprt, -EAGAIN);
+}
+
+static void xs_error_handle(struct work_struct *work)
+{
+	struct sock_xprt *transport = container_of(work,
+			struct sock_xprt, error_worker);
+
+	xs_wake_disconnect(transport);
+	xs_wake_write(transport);
+	xs_wake_error(transport);
+	xs_wake_pending(transport);
+}
+
 /**
  * xs_local_print_stats - display AF_LOCAL socket-specifc stats
  * @xprt: rpc_xprt struct containing statistics
@@ -2745,6 +2789,7 @@
 #ifdef CONFIG_SUNRPC_BACKCHANNEL
 	.bc_setup		= xprt_setup_bc,
 	.bc_maxpayload		= xs_tcp_bc_maxpayload,
+	.bc_num_slots		= xprt_bc_max_slots,
 	.bc_free_rqst		= xprt_free_bc_rqst,
 	.bc_destroy		= xprt_destroy_bc,
 #endif
@@ -2873,6 +2918,7 @@
 	xprt->timeout = &xs_local_default_timeout;
 
 	INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
+	INIT_WORK(&transport->error_worker, xs_error_handle);
 	INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket);
 
 	switch (sun->sun_family) {
@@ -2943,6 +2989,7 @@
 	xprt->timeout = &xs_udp_default_timeout;
 
 	INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
+	INIT_WORK(&transport->error_worker, xs_error_handle);
 	INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
 
 	switch (addr->sa_family) {
@@ -3024,6 +3071,7 @@
 		(xprt->timeout->to_retries + 1);
 
 	INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
+	INIT_WORK(&transport->error_worker, xs_error_handle);
 	INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
 
 	switch (addr->sa_family) {