rcu: Migrate callbacks earlier in the CPU-offline timeline

RCU callbacks must be migrated away from an outgoing CPU, and this is
done near the end of the CPU-hotplug operation, after the outgoing CPU is
long gone.  Unfortunately, this means that other CPU-hotplug callbacks
can execute while the outgoing CPU's callbacks are still immobilized
on the long-gone CPU's callback lists.  If any of these CPU-hotplug
callbacks must wait, either directly or indirectly, for the invocation
of any of the immobilized RCU callbacks, the system will hang.

This commit avoids such hangs by migrating the callbacks away from the
outgoing CPU immediately upon its departure.  This migration makes use
of the cpuhp_report_idle_dead function that is called by the outgoing CPU
just after it has entered the idle loop for the last time.  This function
IPIs an online CPU in order to wake up the thread orchestrating the
CPU-hotplug operation, and this commit makes this IPI handler migrate the
immobilized RCU callbacks.  Thus, RCU is able to advance these callbacks
and invoke them, which allows all the after-the-fact CPU-hotplug callbacks
to wait on these RCU callbacks without risk of a hang.

Reported-by: Jeffrey Hugo <jhugo@codeaurora.org>
Link: http://lkml.kernel.org/r/db9c91f6-1b17-6136-84f0-03c3c2581ab4@codeaurora.org
Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Cc: Thomas Gleixner <tglx@linutronix.de>
Cc: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
Cc: Ingo Molnar <mingo@kernel.org>
Cc: Anna-Maria Gleixner <anna-maria@linutronix.de>
Cc: Boris Ostrovsky <boris.ostrovsky@oracle.com>
Cc: Richard Weinberger <richard@nod.at>
diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index ecca9d2..96f1baf 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -109,6 +109,7 @@ void rcu_bh_qs(void);
 void rcu_check_callbacks(int user);
 void rcu_report_dead(unsigned int cpu);
 void rcu_cpu_starting(unsigned int cpu);
+void rcutree_migrate_callbacks(int cpu);
 
 #ifdef CONFIG_RCU_STALL_COMMON
 void rcu_sysrq_start(void);
diff --git a/kernel/cpu.c b/kernel/cpu.c
index 9ae6fbe..47ef984 100644
--- a/kernel/cpu.c
+++ b/kernel/cpu.c
@@ -59,6 +59,7 @@ struct cpuhp_cpu_state {
 	struct hlist_node	*node;
 	enum cpuhp_state	cb_state;
 	int			result;
+	int			cpu;
 	struct completion	done;
 #endif
 };
@@ -736,6 +737,7 @@ static void cpuhp_complete_idle_dead(void *arg)
 {
 	struct cpuhp_cpu_state *st = arg;
 
+	rcutree_migrate_callbacks(st->cpu);
 	complete(&st->done);
 }
 
@@ -1828,5 +1830,11 @@ void __init boot_cpu_init(void)
  */
 void __init boot_cpu_state_init(void)
 {
+	int __maybe_unused cpu;
+
 	per_cpu_ptr(&cpuhp_state, smp_processor_id())->state = CPUHP_ONLINE;
+#ifdef CONFIG_SMP
+	for_each_possible_cpu(cpu)
+		per_cpu_ptr(&cpuhp_state, cpu)->cpu = cpu;
+#endif
 }
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 94ec745..e6d5349 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2547,85 +2547,6 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
 }
 
 /*
- * Send the specified CPU's RCU callbacks to the orphanage.  The
- * specified CPU must be offline, and the caller must hold the
- * ->orphan_lock.
- */
-static void
-rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
-			  struct rcu_node *rnp, struct rcu_data *rdp)
-{
-	lockdep_assert_held(&rsp->orphan_lock);
-
-	/* No-CBs CPUs do not have orphanable callbacks. */
-	if (!IS_ENABLED(CONFIG_HOTPLUG_CPU) || rcu_is_nocb_cpu(rdp->cpu))
-		return;
-
-	/*
-	 * Orphan the callbacks.  First adjust the counts.  This is safe
-	 * because _rcu_barrier() excludes CPU-hotplug operations, so it
-	 * cannot be running now.  Thus no memory barrier is required.
-	 */
-	rdp->n_cbs_orphaned += rcu_segcblist_n_cbs(&rdp->cblist);
-	rcu_segcblist_extract_count(&rdp->cblist, &rsp->orphan_done);
-
-	/*
-	 * Next, move those callbacks still needing a grace period to
-	 * the orphanage, where some other CPU will pick them up.
-	 * Some of the callbacks might have gone partway through a grace
-	 * period, but that is too bad.  They get to start over because we
-	 * cannot assume that grace periods are synchronized across CPUs.
-	 */
-	rcu_segcblist_extract_pend_cbs(&rdp->cblist, &rsp->orphan_pend);
-
-	/*
-	 * Then move the ready-to-invoke callbacks to the orphanage,
-	 * where some other CPU will pick them up.  These will not be
-	 * required to pass though another grace period: They are done.
-	 */
-	rcu_segcblist_extract_done_cbs(&rdp->cblist, &rsp->orphan_done);
-
-	/* Finally, disallow further callbacks on this CPU.  */
-	rcu_segcblist_disable(&rdp->cblist);
-}
-
-/*
- * Adopt the RCU callbacks from the specified rcu_state structure's
- * orphanage.  The caller must hold the ->orphan_lock.
- */
-static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
-{
-	struct rcu_data *rdp = raw_cpu_ptr(rsp->rda);
-
-	lockdep_assert_held(&rsp->orphan_lock);
-
-	/* No-CBs CPUs are handled specially. */
-	if (!IS_ENABLED(CONFIG_HOTPLUG_CPU) ||
-	    rcu_nocb_adopt_orphan_cbs(rsp, rdp, flags))
-		return;
-
-	/* Do the accounting first. */
-	rdp->n_cbs_adopted += rsp->orphan_done.len;
-	if (rsp->orphan_done.len_lazy != rsp->orphan_done.len)
-		rcu_idle_count_callbacks_posted();
-	rcu_segcblist_insert_count(&rdp->cblist, &rsp->orphan_done);
-
-	/*
-	 * We do not need a memory barrier here because the only way we
-	 * can get here if there is an rcu_barrier() in flight is if
-	 * we are the task doing the rcu_barrier().
-	 */
-
-	/* First adopt the ready-to-invoke callbacks, then the done ones. */
-	rcu_segcblist_insert_done_cbs(&rdp->cblist, &rsp->orphan_done);
-	WARN_ON_ONCE(rsp->orphan_done.head);
-	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rsp->orphan_pend);
-	WARN_ON_ONCE(rsp->orphan_pend.head);
-	WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) !=
-		     !rcu_segcblist_n_cbs(&rdp->cblist));
-}
-
-/*
  * Trace the fact that this CPU is going offline.
  */
 static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
@@ -2695,7 +2616,6 @@ static void rcu_cleanup_dead_rnp(struct rcu_node *rnp_leaf)
  */
 static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 {
-	unsigned long flags;
 	struct rcu_data *rdp = per_cpu_ptr(rsp->rda, cpu);
 	struct rcu_node *rnp = rdp->mynode;  /* Outgoing CPU's rdp & rnp. */
 
@@ -2704,18 +2624,6 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 
 	/* Adjust any no-longer-needed kthreads. */
 	rcu_boost_kthread_setaffinity(rnp, -1);
-
-	/* Orphan the dead CPU's callbacks, and adopt them if appropriate. */
-	raw_spin_lock_irqsave(&rsp->orphan_lock, flags);
-	rcu_send_cbs_to_orphanage(cpu, rsp, rnp, rdp);
-	rcu_adopt_orphan_cbs(rsp, flags);
-	raw_spin_unlock_irqrestore(&rsp->orphan_lock, flags);
-
-	WARN_ONCE(rcu_segcblist_n_cbs(&rdp->cblist) != 0 ||
-		  !rcu_segcblist_empty(&rdp->cblist),
-		  "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, 1stCB=%p\n",
-		  cpu, rcu_segcblist_n_cbs(&rdp->cblist),
-		  rcu_segcblist_first_cb(&rdp->cblist));
 }
 
 /*
@@ -3927,6 +3835,116 @@ void rcu_report_dead(unsigned int cpu)
 	for_each_rcu_flavor(rsp)
 		rcu_cleanup_dying_idle_cpu(cpu, rsp);
 }
+
+/*
+ * Send the specified CPU's RCU callbacks to the orphanage.  The
+ * specified CPU must be offline, and the caller must hold the
+ * ->orphan_lock.
+ */
+static void
+rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
+			  struct rcu_node *rnp, struct rcu_data *rdp)
+{
+	lockdep_assert_held(&rsp->orphan_lock);
+
+	/* No-CBs CPUs do not have orphanable callbacks. */
+	if (!IS_ENABLED(CONFIG_HOTPLUG_CPU) || rcu_is_nocb_cpu(rdp->cpu))
+		return;
+
+	/*
+	 * Orphan the callbacks.  First adjust the counts.  This is safe
+	 * because _rcu_barrier() excludes CPU-hotplug operations, so it
+	 * cannot be running now.  Thus no memory barrier is required.
+	 */
+	rdp->n_cbs_orphaned += rcu_segcblist_n_cbs(&rdp->cblist);
+	rcu_segcblist_extract_count(&rdp->cblist, &rsp->orphan_done);
+
+	/*
+	 * Next, move those callbacks still needing a grace period to
+	 * the orphanage, where some other CPU will pick them up.
+	 * Some of the callbacks might have gone partway through a grace
+	 * period, but that is too bad.  They get to start over because we
+	 * cannot assume that grace periods are synchronized across CPUs.
+	 */
+	rcu_segcblist_extract_pend_cbs(&rdp->cblist, &rsp->orphan_pend);
+
+	/*
+	 * Then move the ready-to-invoke callbacks to the orphanage,
+	 * where some other CPU will pick them up.  These will not be
+	 * required to pass though another grace period: They are done.
+	 */
+	rcu_segcblist_extract_done_cbs(&rdp->cblist, &rsp->orphan_done);
+
+	/* Finally, disallow further callbacks on this CPU.  */
+	rcu_segcblist_disable(&rdp->cblist);
+}
+
+/*
+ * Adopt the RCU callbacks from the specified rcu_state structure's
+ * orphanage.  The caller must hold the ->orphan_lock.
+ */
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
+{
+	struct rcu_data *rdp = raw_cpu_ptr(rsp->rda);
+
+	lockdep_assert_held(&rsp->orphan_lock);
+
+	/* No-CBs CPUs are handled specially. */
+	if (!IS_ENABLED(CONFIG_HOTPLUG_CPU) ||
+	    rcu_nocb_adopt_orphan_cbs(rsp, rdp, flags))
+		return;
+
+	/* Do the accounting first. */
+	rdp->n_cbs_adopted += rsp->orphan_done.len;
+	if (rsp->orphan_done.len_lazy != rsp->orphan_done.len)
+		rcu_idle_count_callbacks_posted();
+	rcu_segcblist_insert_count(&rdp->cblist, &rsp->orphan_done);
+
+	/*
+	 * We do not need a memory barrier here because the only way we
+	 * can get here if there is an rcu_barrier() in flight is if
+	 * we are the task doing the rcu_barrier().
+	 */
+
+	/* First adopt the ready-to-invoke callbacks, then the done ones. */
+	rcu_segcblist_insert_done_cbs(&rdp->cblist, &rsp->orphan_done);
+	WARN_ON_ONCE(rsp->orphan_done.head);
+	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rsp->orphan_pend);
+	WARN_ON_ONCE(rsp->orphan_pend.head);
+	WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) !=
+		     !rcu_segcblist_n_cbs(&rdp->cblist));
+}
+
+/* Orphan the dead CPU's callbacks, and then adopt them. */
+static void rcu_migrate_callbacks(int cpu, struct rcu_state *rsp)
+{
+	unsigned long flags;
+	struct rcu_data *rdp = per_cpu_ptr(rsp->rda, cpu);
+	struct rcu_node *rnp = rdp->mynode;  /* Outgoing CPU's rdp & rnp. */
+
+	raw_spin_lock_irqsave(&rsp->orphan_lock, flags);
+	rcu_send_cbs_to_orphanage(cpu, rsp, rnp, rdp);
+	rcu_adopt_orphan_cbs(rsp, flags);
+	raw_spin_unlock_irqrestore(&rsp->orphan_lock, flags);
+	WARN_ONCE(rcu_segcblist_n_cbs(&rdp->cblist) != 0 ||
+		  !rcu_segcblist_empty(&rdp->cblist),
+		  "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, 1stCB=%p\n",
+		  cpu, rcu_segcblist_n_cbs(&rdp->cblist),
+		  rcu_segcblist_first_cb(&rdp->cblist));
+}
+
+/*
+ * The outgoing CPU has just passed through the dying-idle state,
+ * and we are being invoked from the CPU that was IPIed to continue the
+ * offline operation.  We need to migrate the outgoing CPU's callbacks.
+ */
+void rcutree_migrate_callbacks(int cpu)
+{
+	struct rcu_state *rsp;
+
+	for_each_rcu_flavor(rsp)
+		rcu_migrate_callbacks(cpu, rsp);
+}
 #endif
 
 /*