Merge tag 'nfsd-5.1' of git://linux-nfs.org/~bfields/linux

Pull NFS server updates from Bruce Fields:
 "Miscellaneous NFS server fixes.

  Probably the most visible bug is one that could artificially limit
  NFSv4.1 performance by limiting the number of oustanding rpcs from a
  single client.

  Neil Brown also gets a special mention for fixing a 14.5-year-old
  memory-corruption bug in the encoding of NFSv3 readdir responses"

* tag 'nfsd-5.1' of git://linux-nfs.org/~bfields/linux:
  nfsd: allow nfsv3 readdir request to be larger.
  nfsd: fix wrong check in write_v4_end_grace()
  nfsd: fix memory corruption caused by readdir
  nfsd: fix performance-limiting session calculation
  svcrpc: fix UDP on servers with lots of threads
  svcrdma: Remove syslog warnings in work completion handlers
  svcrdma: Squelch compiler warning when SUNRPC_DEBUG is disabled
  svcrdma: Use struct_size() in kmalloc()
  svcrpc: fix unlikely races preventing queueing of sockets
  svcrpc: svc_xprt_has_something_to_do seems a little long
  SUNRPC: Don't allow compiler optimisation of svc_xprt_release_slot()
  nfsd: fix an IS_ERR() vs NULL check
diff --git a/fs/nfsd/nfs3proc.c b/fs/nfsd/nfs3proc.c
index 9eb8086..8f933e8 100644
--- a/fs/nfsd/nfs3proc.c
+++ b/fs/nfsd/nfs3proc.c
@@ -463,8 +463,19 @@
 					&resp->common, nfs3svc_encode_entry);
 	memcpy(resp->verf, argp->verf, 8);
 	resp->count = resp->buffer - argp->buffer;
-	if (resp->offset)
-		xdr_encode_hyper(resp->offset, argp->cookie);
+	if (resp->offset) {
+		loff_t offset = argp->cookie;
+
+		if (unlikely(resp->offset1)) {
+			/* we ended up with offset on a page boundary */
+			*resp->offset = htonl(offset >> 32);
+			*resp->offset1 = htonl(offset & 0xffffffff);
+			resp->offset1 = NULL;
+		} else {
+			xdr_encode_hyper(resp->offset, offset);
+		}
+		resp->offset = NULL;
+	}
 
 	RETURN_STATUS(nfserr);
 }
@@ -533,6 +544,7 @@
 		} else {
 			xdr_encode_hyper(resp->offset, offset);
 		}
+		resp->offset = NULL;
 	}
 
 	RETURN_STATUS(nfserr);
@@ -576,7 +588,7 @@
 	resp->f_wtmax  = max_blocksize;
 	resp->f_wtpref = max_blocksize;
 	resp->f_wtmult = PAGE_SIZE;
-	resp->f_dtpref = PAGE_SIZE;
+	resp->f_dtpref = max_blocksize;
 	resp->f_maxfilesize = ~(u32) 0;
 	resp->f_properties = NFS3_FSF_DEFAULT;
 
diff --git a/fs/nfsd/nfs3xdr.c b/fs/nfsd/nfs3xdr.c
index 9b973f4..93fea24 100644
--- a/fs/nfsd/nfs3xdr.c
+++ b/fs/nfsd/nfs3xdr.c
@@ -573,6 +573,8 @@
 nfs3svc_decode_readdirargs(struct svc_rqst *rqstp, __be32 *p)
 {
 	struct nfsd3_readdirargs *args = rqstp->rq_argp;
+	u32 max_blocksize = svc_max_payload(rqstp);
+
 	p = decode_fh(p, &args->fh);
 	if (!p)
 		return 0;
@@ -580,7 +582,7 @@
 	args->verf   = p; p += 2;
 	args->dircount = ~0;
 	args->count  = ntohl(*p++);
-	args->count  = min_t(u32, args->count, PAGE_SIZE);
+	args->count  = min_t(u32, args->count, max_blocksize);
 	args->buffer = page_address(*(rqstp->rq_next_page++));
 
 	return xdr_argsize_check(rqstp, p);
@@ -921,6 +923,7 @@
 		} else {
 			xdr_encode_hyper(cd->offset, offset64);
 		}
+		cd->offset = NULL;
 	}
 
 	/*
diff --git a/fs/nfsd/nfs4callback.c b/fs/nfsd/nfs4callback.c
index a9d24d5..d219159 100644
--- a/fs/nfsd/nfs4callback.c
+++ b/fs/nfsd/nfs4callback.c
@@ -900,9 +900,9 @@
 		return PTR_ERR(client);
 	}
 	cred = get_backchannel_cred(clp, client, ses);
-	if (IS_ERR(cred)) {
+	if (!cred) {
 		rpc_shutdown_client(client);
-		return PTR_ERR(cred);
+		return -ENOMEM;
 	}
 	clp->cl_cb_client = client;
 	clp->cl_cb_cred = cred;
diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
index fb3c984..6a45fb0 100644
--- a/fs/nfsd/nfs4state.c
+++ b/fs/nfsd/nfs4state.c
@@ -1544,16 +1544,16 @@
 {
 	u32 slotsize = slot_bytes(ca);
 	u32 num = ca->maxreqs;
-	int avail;
+	unsigned long avail, total_avail;
 
 	spin_lock(&nfsd_drc_lock);
-	avail = min((unsigned long)NFSD_MAX_MEM_PER_SESSION,
-		    nfsd_drc_max_mem - nfsd_drc_mem_used);
+	total_avail = nfsd_drc_max_mem - nfsd_drc_mem_used;
+	avail = min((unsigned long)NFSD_MAX_MEM_PER_SESSION, total_avail);
 	/*
 	 * Never use more than a third of the remaining memory,
 	 * unless it's the only way to give this client a slot:
 	 */
-	avail = clamp_t(int, avail, slotsize, avail/3);
+	avail = clamp_t(int, avail, slotsize, total_avail/3);
 	num = min_t(int, num, avail / slotsize);
 	nfsd_drc_mem_used += num * slotsize;
 	spin_unlock(&nfsd_drc_lock);
diff --git a/fs/nfsd/nfsctl.c b/fs/nfsd/nfsctl.c
index 72a7681..f2feb2d 100644
--- a/fs/nfsd/nfsctl.c
+++ b/fs/nfsd/nfsctl.c
@@ -1126,7 +1126,7 @@
 		case 'Y':
 		case 'y':
 		case '1':
-			if (nn->nfsd_serv)
+			if (!nn->nfsd_serv)
 				return -EBUSY;
 			nfsd4_end_grace(nn);
 			break;
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 4eb8fbf..61530b1 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -357,15 +357,29 @@
 	struct svc_xprt	*xprt = rqstp->rq_xprt;
 	if (test_and_clear_bit(RQ_DATA, &rqstp->rq_flags)) {
 		atomic_dec(&xprt->xpt_nr_rqsts);
+		smp_wmb(); /* See smp_rmb() in svc_xprt_ready() */
 		svc_xprt_enqueue(xprt);
 	}
 }
 
-static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
+static bool svc_xprt_ready(struct svc_xprt *xprt)
 {
-	if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
+	unsigned long xpt_flags;
+
+	/*
+	 * If another cpu has recently updated xpt_flags,
+	 * sk_sock->flags, xpt_reserved, or xpt_nr_rqsts, we need to
+	 * know about it; otherwise it's possible that both that cpu and
+	 * this one could call svc_xprt_enqueue() without either
+	 * svc_xprt_enqueue() recognizing that the conditions below
+	 * are satisfied, and we could stall indefinitely:
+	 */
+	smp_rmb();
+	xpt_flags = READ_ONCE(xprt->xpt_flags);
+
+	if (xpt_flags & (BIT(XPT_CONN) | BIT(XPT_CLOSE)))
 		return true;
-	if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED))) {
+	if (xpt_flags & (BIT(XPT_DATA) | BIT(XPT_DEFERRED))) {
 		if (xprt->xpt_ops->xpo_has_wspace(xprt) &&
 		    svc_xprt_slots_in_range(xprt))
 			return true;
@@ -381,7 +395,7 @@
 	struct svc_rqst	*rqstp = NULL;
 	int cpu;
 
-	if (!svc_xprt_has_something_to_do(xprt))
+	if (!svc_xprt_ready(xprt))
 		return;
 
 	/* Mark transport as busy. It will remain in this state until
@@ -475,7 +489,7 @@
 	if (xprt && space < rqstp->rq_reserved) {
 		atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
 		rqstp->rq_reserved = space;
-
+		smp_wmb(); /* See smp_rmb() in svc_xprt_ready() */
 		svc_xprt_enqueue(xprt);
 	}
 }
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index a6a0609..43590a9 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -349,12 +349,16 @@
 /*
  * Set socket snd and rcv buffer lengths
  */
-static void svc_sock_setbufsize(struct socket *sock, unsigned int snd,
-				unsigned int rcv)
+static void svc_sock_setbufsize(struct svc_sock *svsk, unsigned int nreqs)
 {
+	unsigned int max_mesg = svsk->sk_xprt.xpt_server->sv_max_mesg;
+	struct socket *sock = svsk->sk_sock;
+
+	nreqs = min(nreqs, INT_MAX / 2 / max_mesg);
+
 	lock_sock(sock->sk);
-	sock->sk->sk_sndbuf = snd * 2;
-	sock->sk->sk_rcvbuf = rcv * 2;
+	sock->sk->sk_sndbuf = nreqs * max_mesg * 2;
+	sock->sk->sk_rcvbuf = nreqs * max_mesg * 2;
 	sock->sk->sk_write_space(sock->sk);
 	release_sock(sock->sk);
 }
@@ -516,9 +520,7 @@
 	     * provides an upper bound on the number of threads
 	     * which will access the socket.
 	     */
-	    svc_sock_setbufsize(svsk->sk_sock,
-				(serv->sv_nrthreads+3) * serv->sv_max_mesg,
-				(serv->sv_nrthreads+3) * serv->sv_max_mesg);
+	    svc_sock_setbufsize(svsk, serv->sv_nrthreads + 3);
 
 	clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 	skb = NULL;
@@ -681,9 +683,7 @@
 	 * receive and respond to one request.
 	 * svc_udp_recvfrom will re-adjust if necessary
 	 */
-	svc_sock_setbufsize(svsk->sk_sock,
-			    3 * svsk->sk_xprt.xpt_server->sv_max_mesg,
-			    3 * svsk->sk_xprt.xpt_server->sv_max_mesg);
+	svc_sock_setbufsize(svsk, 3);
 
 	/* data might have come in before data_ready set up */
 	set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 828b149..65e2fb9 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -272,11 +272,8 @@
 			return false;
 		ctxt->rc_temp = true;
 		ret = __svc_rdma_post_recv(rdma, ctxt);
-		if (ret) {
-			pr_err("svcrdma: failure posting recv buffers: %d\n",
-			       ret);
+		if (ret)
 			return false;
-		}
 	}
 	return true;
 }
@@ -314,17 +311,14 @@
 
 	spin_lock(&rdma->sc_rq_dto_lock);
 	list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q);
-	spin_unlock(&rdma->sc_rq_dto_lock);
+	/* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
 	set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+	spin_unlock(&rdma->sc_rq_dto_lock);
 	if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
 		svc_xprt_enqueue(&rdma->sc_xprt);
 	goto out;
 
 flushed:
-	if (wc->status != IB_WC_WR_FLUSH_ERR)
-		pr_err("svcrdma: Recv: %s (%u/0x%x)\n",
-		       ib_wc_status_msg(wc->status),
-		       wc->status, wc->vendor_err);
 post_err:
 	svc_rdma_recv_ctxt_put(rdma, ctxt);
 	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index dc19517..2121c9b 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -64,8 +64,7 @@
 		spin_unlock(&rdma->sc_rw_ctxt_lock);
 	} else {
 		spin_unlock(&rdma->sc_rw_ctxt_lock);
-		ctxt = kmalloc(sizeof(*ctxt) +
-			       SG_CHUNK_SIZE * sizeof(struct scatterlist),
+		ctxt = kmalloc(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE),
 			       GFP_KERNEL);
 		if (!ctxt)
 			goto out;
@@ -213,13 +212,8 @@
 	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 	wake_up(&rdma->sc_send_wait);
 
-	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+	if (unlikely(wc->status != IB_WC_SUCCESS))
 		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
-		if (wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
-			       ib_wc_status_msg(wc->status),
-			       wc->status, wc->vendor_err);
-	}
 
 	svc_rdma_write_info_free(info);
 }
@@ -278,18 +272,15 @@
 
 	if (unlikely(wc->status != IB_WC_SUCCESS)) {
 		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
-		if (wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
-			       ib_wc_status_msg(wc->status),
-			       wc->status, wc->vendor_err);
 		svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt);
 	} else {
 		spin_lock(&rdma->sc_rq_dto_lock);
 		list_add_tail(&info->ri_readctxt->rc_list,
 			      &rdma->sc_read_complete_q);
+		/* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
+		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
 		spin_unlock(&rdma->sc_rq_dto_lock);
 
-		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
 		svc_xprt_enqueue(&rdma->sc_xprt);
 	}
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 1f20011..6fdba72 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -272,10 +272,6 @@
 	if (unlikely(wc->status != IB_WC_SUCCESS)) {
 		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 		svc_xprt_enqueue(&rdma->sc_xprt);
-		if (wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("svcrdma: Send: %s (%u/0x%x)\n",
-			       ib_wc_status_msg(wc->status),
-			       wc->status, wc->vendor_err);
 	}
 
 	svc_xprt_put(&rdma->sc_xprt);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 57f86c6..027a3b0 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -390,8 +390,8 @@
 	struct ib_qp_init_attr qp_attr;
 	unsigned int ctxts, rq_depth;
 	struct ib_device *dev;
-	struct sockaddr *sap;
 	int ret = 0;
+	RPC_IFDEBUG(struct sockaddr *sap);
 
 	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
 	clear_bit(XPT_CONN, &xprt->xpt_flags);
@@ -525,6 +525,7 @@
 	if (ret)
 		goto errout;
 
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 	dprintk("svcrdma: new connection %p accepted:\n", newxprt);
 	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
 	dprintk("    local address   : %pIS:%u\n", sap, rpc_get_port(sap));
@@ -535,6 +536,7 @@
 	dprintk("    rdma_rw_ctxs    : %d\n", ctxts);
 	dprintk("    max_requests    : %d\n", newxprt->sc_max_requests);
 	dprintk("    ord             : %d\n", conn_param.initiator_depth);
+#endif
 
 	trace_svcrdma_xprt_accept(&newxprt->sc_xprt);
 	return &newxprt->sc_xprt;
@@ -588,11 +590,6 @@
 	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
 		ib_drain_qp(rdma->sc_qp);
 
-	/* We should only be called from kref_put */
-	if (kref_read(&xprt->xpt_ref) != 0)
-		pr_err("svcrdma: sc_xprt still in use? (%d)\n",
-		       kref_read(&xprt->xpt_ref));
-
 	svc_rdma_flush_recv_queues(rdma);
 
 	/* Final put of backchannel client transport */