srcu: Expedited grace periods with reduced memory contention

Commit f60d231a87c5 ("srcu: Crude control of expedited grace periods")
introduced a per-srcu_struct atomic counter to track outstanding
requests for grace periods.  This works, but represents a memory-contention
bottleneck.  This commit therefore uses the srcu_node combining tree
to remove this bottleneck.

This commit adds new ->srcu_gp_seq_needed_exp fields to the
srcu_data, srcu_node, and srcu_struct structures, which track the
farthest-in-the-future grace period that must be expedited, which in
turn requires that all nearer-term grace periods also be expedited.
Requests for expediting start with the srcu_data structure, run up
through the srcu_node tree, and end at the srcu_struct structure.
Note that it may be necessary to expedite a grace period that just
now started, and this is handled by a new srcu_funnel_exp_start()
function, which is invoked when the grace period itself is already
in its way, but when that grace period was not marked as expedited.

A new srcu_get_delay() function returns zero if there is at least one
expedited SRCU grace period in flight, or SRCU_INTERVAL otherwise.
This function is used to calculate delays:  Normal grace periods
are allowed to extend in order to cover more requests with a given
grace-period computation, which decreases per-request overhead.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Tested-by: Mike Galbraith <efault@gmx.de>
diff --git a/include/linux/srcutree.h b/include/linux/srcutree.h
index 3865717..86df48d 100644
--- a/include/linux/srcutree.h
+++ b/include/linux/srcutree.h
@@ -43,6 +43,7 @@
 	spinlock_t lock ____cacheline_internodealigned_in_smp;
 	struct rcu_segcblist srcu_cblist;	/* List of callbacks.*/
 	unsigned long srcu_gp_seq_needed;	/* Furthest future GP needed. */
+	unsigned long srcu_gp_seq_needed_exp;	/* Furthest future exp GP. */
 	bool srcu_cblist_invoking;		/* Invoking these CBs? */
 	struct delayed_work work;		/* Context for CB invoking. */
 	struct rcu_head srcu_barrier_head;	/* For srcu_barrier() use. */
@@ -63,6 +64,7 @@
 						/*  is > ->srcu_gq_seq. */
 	unsigned long srcu_data_have_cbs[4];	/* Which srcu_data structs */
 						/*  have CBs for given GP? */
+	unsigned long srcu_gp_seq_needed_exp;	/* Furthest future exp GP. */
 	struct srcu_node *srcu_parent;		/* Next up in tree. */
 	int grplo;				/* Least CPU for node. */
 	int grphi;				/* Biggest CPU for node. */
@@ -81,7 +83,7 @@
 	unsigned int srcu_idx;			/* Current rdr array element. */
 	unsigned long srcu_gp_seq;		/* Grace-period seq #. */
 	unsigned long srcu_gp_seq_needed;	/* Latest gp_seq needed. */
-	atomic_t srcu_exp_cnt;			/* # ongoing expedited GPs. */
+	unsigned long srcu_gp_seq_needed_exp;	/* Furthest future exp GP. */
 	struct srcu_data __percpu *sda;		/* Per-CPU srcu_data array. */
 	unsigned long srcu_barrier_seq;		/* srcu_barrier seq #. */
 	struct mutex srcu_barrier_mutex;	/* Serialize barrier ops. */
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
index 72b6cce..4b98e6f 100644
--- a/kernel/rcu/srcutree.c
+++ b/kernel/rcu/srcutree.c
@@ -72,6 +72,7 @@
 			snp->srcu_have_cbs[i] = 0;
 			snp->srcu_data_have_cbs[i] = 0;
 		}
+		snp->srcu_gp_seq_needed_exp = 0;
 		snp->grplo = -1;
 		snp->grphi = -1;
 		if (snp == &sp->node[0]) {
@@ -102,6 +103,7 @@
 		rcu_segcblist_init(&sdp->srcu_cblist);
 		sdp->srcu_cblist_invoking = false;
 		sdp->srcu_gp_seq_needed = sp->srcu_gp_seq;
+		sdp->srcu_gp_seq_needed_exp = sp->srcu_gp_seq;
 		sdp->mynode = &snp_first[cpu / levelspread[level]];
 		for (snp = sdp->mynode; snp != NULL; snp = snp->srcu_parent) {
 			if (snp->grplo < 0)
@@ -135,7 +137,6 @@
 	mutex_init(&sp->srcu_gp_mutex);
 	sp->srcu_idx = 0;
 	sp->srcu_gp_seq = 0;
-	atomic_set(&sp->srcu_exp_cnt, 0);
 	sp->srcu_barrier_seq = 0;
 	mutex_init(&sp->srcu_barrier_mutex);
 	atomic_set(&sp->srcu_barrier_cpu_cnt, 0);
@@ -143,6 +144,7 @@
 	if (!is_static)
 		sp->sda = alloc_percpu(struct srcu_data);
 	init_srcu_struct_nodes(sp, is_static);
+	sp->srcu_gp_seq_needed_exp = 0;
 	smp_store_release(&sp->srcu_gp_seq_needed, 0); /* Init done. */
 	return sp->sda ? 0 : -ENOMEM;
 }
@@ -307,6 +309,18 @@
 
 #define SRCU_INTERVAL		1
 
+/*
+ * Return grace-period delay, zero if there are expedited grace
+ * periods pending, SRCU_INTERVAL otherwise.
+ */
+static unsigned long srcu_get_delay(struct srcu_struct *sp)
+{
+	if (ULONG_CMP_LT(READ_ONCE(sp->srcu_gp_seq),
+			 READ_ONCE(sp->srcu_gp_seq_needed_exp)))
+		return 0;
+	return SRCU_INTERVAL;
+}
+
 /**
  * cleanup_srcu_struct - deconstruct a sleep-RCU structure
  * @sp: structure to clean up.
@@ -318,7 +332,8 @@
 {
 	int cpu;
 
-	WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt));
+	if (WARN_ON(!srcu_get_delay(sp)))
+		return; /* Leakage unless caller handles error. */
 	if (WARN_ON(srcu_readers_active(sp)))
 		return; /* Leakage unless caller handles error. */
 	flush_delayed_work(&sp->work);
@@ -444,15 +459,14 @@
  * schedule this invocation on the corresponding CPUs.
  */
 static void srcu_schedule_cbs_snp(struct srcu_struct *sp, struct srcu_node *snp,
-				  unsigned long mask)
+				  unsigned long mask, unsigned long delay)
 {
 	int cpu;
 
 	for (cpu = snp->grplo; cpu <= snp->grphi; cpu++) {
 		if (!(mask & (1 << (cpu - snp->grplo))))
 			continue;
-		srcu_schedule_cbs_sdp(per_cpu_ptr(sp->sda, cpu),
-				      atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL);
+		srcu_schedule_cbs_sdp(per_cpu_ptr(sp->sda, cpu), delay);
 	}
 }
 
@@ -467,6 +481,7 @@
  */
 static void srcu_gp_end(struct srcu_struct *sp)
 {
+	unsigned long cbdelay;
 	bool cbs;
 	unsigned long gpseq;
 	int idx;
@@ -481,8 +496,11 @@
 	spin_lock_irq(&sp->gp_lock);
 	idx = rcu_seq_state(sp->srcu_gp_seq);
 	WARN_ON_ONCE(idx != SRCU_STATE_SCAN2);
+	cbdelay = srcu_get_delay(sp);
 	rcu_seq_end(&sp->srcu_gp_seq);
 	gpseq = rcu_seq_current(&sp->srcu_gp_seq);
+	if (ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, gpseq))
+		sp->srcu_gp_seq_needed_exp = gpseq;
 	spin_unlock_irq(&sp->gp_lock);
 	mutex_unlock(&sp->srcu_gp_mutex);
 	/* A new grace period can start at this point.  But only one. */
@@ -497,12 +515,14 @@
 			cbs = snp->srcu_have_cbs[idx] == gpseq;
 		snp->srcu_have_cbs[idx] = gpseq;
 		rcu_seq_set_state(&snp->srcu_have_cbs[idx], 1);
+		if (ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, gpseq))
+			snp->srcu_gp_seq_needed_exp = gpseq;
 		mask = snp->srcu_data_have_cbs[idx];
 		snp->srcu_data_have_cbs[idx] = 0;
 		spin_unlock_irq(&snp->lock);
 		if (cbs) {
 			smp_mb(); /* GP end before CB invocation. */
-			srcu_schedule_cbs_snp(sp, snp, mask);
+			srcu_schedule_cbs_snp(sp, snp, mask, cbdelay);
 		}
 	}
 
@@ -517,25 +537,52 @@
 		srcu_gp_start(sp);
 		spin_unlock_irq(&sp->gp_lock);
 		/* Throttle expedited grace periods: Should be rare! */
-		srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) &&
-				    rcu_seq_ctr(gpseq) & 0xf
-				    ? 0
-				    : SRCU_INTERVAL);
+		srcu_reschedule(sp, rcu_seq_ctr(gpseq) & 0x3ff
+				    ? 0 : SRCU_INTERVAL);
 	} else {
 		spin_unlock_irq(&sp->gp_lock);
 	}
 }
 
 /*
+ * Funnel-locking scheme to scalably mediate many concurrent expedited
+ * grace-period requests.  This function is invoked for the first known
+ * expedited request for a grace period that has already been requested,
+ * but without expediting.  To start a completely new grace period,
+ * whether expedited or not, use srcu_funnel_gp_start() instead.
+ */
+static void srcu_funnel_exp_start(struct srcu_struct *sp, struct srcu_node *snp,
+				  unsigned long s)
+{
+	unsigned long flags;
+
+	for (; snp != NULL; snp = snp->srcu_parent) {
+		if (rcu_seq_done(&sp->srcu_gp_seq, s) ||
+		    ULONG_CMP_GE(READ_ONCE(snp->srcu_gp_seq_needed_exp), s))
+			return;
+		spin_lock_irqsave(&snp->lock, flags);
+		if (ULONG_CMP_GE(snp->srcu_gp_seq_needed_exp, s)) {
+			spin_unlock_irqrestore(&snp->lock, flags);
+			return;
+		}
+		WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s);
+		spin_unlock_irqrestore(&snp->lock, flags);
+	}
+	spin_lock_irqsave(&sp->gp_lock, flags);
+	if (!ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, s))
+		sp->srcu_gp_seq_needed_exp = s;
+	spin_unlock_irqrestore(&sp->gp_lock, flags);
+}
+
+/*
  * Funnel-locking scheme to scalably mediate many concurrent grace-period
  * requests.  The winner has to do the work of actually starting grace
  * period s.  Losers must either ensure that their desired grace-period
  * number is recorded on at least their leaf srcu_node structure, or they
  * must take steps to invoke their own callbacks.
  */
-static void srcu_funnel_gp_start(struct srcu_struct *sp,
-				 struct srcu_data *sdp,
-				 unsigned long s)
+static void srcu_funnel_gp_start(struct srcu_struct *sp, struct srcu_data *sdp,
+				 unsigned long s, bool do_norm)
 {
 	unsigned long flags;
 	int idx = rcu_seq_ctr(s) % ARRAY_SIZE(sdp->mynode->srcu_have_cbs);
@@ -554,13 +601,20 @@
 			spin_unlock_irqrestore(&snp->lock, flags);
 			if (snp == sdp->mynode && snp_seq != s) {
 				smp_mb(); /* CBs after GP! */
-				srcu_schedule_cbs_sdp(sdp, 0);
+				srcu_schedule_cbs_sdp(sdp, do_norm
+							   ? SRCU_INTERVAL
+							   : 0);
+				return;
 			}
+			if (!do_norm)
+				srcu_funnel_exp_start(sp, snp, s);
 			return;
 		}
 		snp->srcu_have_cbs[idx] = s;
 		if (snp == sdp->mynode)
 			snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
+		if (!do_norm && ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, s))
+			snp->srcu_gp_seq_needed_exp = s;
 		spin_unlock_irqrestore(&snp->lock, flags);
 	}
 
@@ -573,6 +627,8 @@
 		 */
 		smp_store_release(&sp->srcu_gp_seq_needed, s); /*^^^*/
 	}
+	if (!do_norm && ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, s))
+		sp->srcu_gp_seq_needed_exp = s;
 
 	/* If grace period not already done and none in progress, start it. */
 	if (!rcu_seq_done(&sp->srcu_gp_seq, s) &&
@@ -580,9 +636,7 @@
 		WARN_ON_ONCE(ULONG_CMP_GE(sp->srcu_gp_seq, sp->srcu_gp_seq_needed));
 		srcu_gp_start(sp);
 		queue_delayed_work(system_power_efficient_wq, &sp->work,
-				   atomic_read(&sp->srcu_exp_cnt)
-				   ? 0
-				   : SRCU_INTERVAL);
+				   srcu_get_delay(sp));
 	}
 	spin_unlock_irqrestore(&sp->gp_lock, flags);
 }
@@ -597,7 +651,7 @@
 	for (;;) {
 		if (srcu_readers_active_idx_check(sp, idx))
 			return true;
-		if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0)
+		if (--trycount + !srcu_get_delay(sp) <= 0)
 			return false;
 		udelay(SRCU_RETRY_CHECK_DELAY);
 	}
@@ -650,10 +704,11 @@
  * srcu_read_lock(), and srcu_read_unlock() that are all passed the same
  * srcu_struct structure.
  */
-void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
-	       rcu_callback_t func)
+void __call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
+		 rcu_callback_t func, bool do_norm)
 {
 	unsigned long flags;
+	bool needexp = false;
 	bool needgp = false;
 	unsigned long s;
 	struct srcu_data *sdp;
@@ -672,16 +727,28 @@
 		sdp->srcu_gp_seq_needed = s;
 		needgp = true;
 	}
+	if (!do_norm && ULONG_CMP_LT(sdp->srcu_gp_seq_needed_exp, s)) {
+		sdp->srcu_gp_seq_needed_exp = s;
+		needexp = true;
+	}
 	spin_unlock_irqrestore(&sdp->lock, flags);
 	if (needgp)
-		srcu_funnel_gp_start(sp, sdp, s);
+		srcu_funnel_gp_start(sp, sdp, s, do_norm);
+	else if (needexp)
+		srcu_funnel_exp_start(sp, sdp->mynode, s);
+}
+
+void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
+	       rcu_callback_t func)
+{
+	__call_srcu(sp, rhp, func, true);
 }
 EXPORT_SYMBOL_GPL(call_srcu);
 
 /*
  * Helper function for synchronize_srcu() and synchronize_srcu_expedited().
  */
-static void __synchronize_srcu(struct srcu_struct *sp)
+static void __synchronize_srcu(struct srcu_struct *sp, bool do_norm)
 {
 	struct rcu_synchronize rcu;
 
@@ -697,7 +764,7 @@
 	check_init_srcu_struct(sp);
 	init_completion(&rcu.completion);
 	init_rcu_head_on_stack(&rcu.head);
-	call_srcu(sp, &rcu.head, wakeme_after_rcu);
+	__call_srcu(sp, &rcu.head, wakeme_after_rcu, do_norm);
 	wait_for_completion(&rcu.completion);
 	destroy_rcu_head_on_stack(&rcu.head);
 }
@@ -714,18 +781,7 @@
  */
 void synchronize_srcu_expedited(struct srcu_struct *sp)
 {
-	bool do_norm = rcu_gp_is_normal();
-
-	check_init_srcu_struct(sp);
-	if (!do_norm) {
-		atomic_inc(&sp->srcu_exp_cnt);
-		smp_mb__after_atomic(); /* increment before GP. */
-	}
-	__synchronize_srcu(sp);
-	if (!do_norm) {
-		smp_mb__before_atomic(); /* GP before decrement. */
-		WARN_ON_ONCE(atomic_dec_return(&sp->srcu_exp_cnt) < 0);
-	}
+	__synchronize_srcu(sp, rcu_gp_is_normal());
 }
 EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
 
@@ -773,7 +829,7 @@
 	if (rcu_gp_is_expedited())
 		synchronize_srcu_expedited(sp);
 	else
-		__synchronize_srcu(sp);
+		__synchronize_srcu(sp, true);
 }
 EXPORT_SYMBOL_GPL(synchronize_srcu);
 
@@ -1008,14 +1064,13 @@
 	sp = container_of(work, struct srcu_struct, work.work);
 
 	srcu_advance_state(sp);
-	srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL);
+	srcu_reschedule(sp, srcu_get_delay(sp));
 }
 EXPORT_SYMBOL_GPL(process_srcu);
 
 void srcutorture_get_gp_data(enum rcutorture_type test_type,
-					   struct srcu_struct *sp, int *flags,
-					   unsigned long *gpnum,
-					   unsigned long *completed)
+			     struct srcu_struct *sp, int *flags,
+			     unsigned long *gpnum, unsigned long *completed)
 {
 	if (test_type != SRCU_FLAVOR)
 		return;