mm/page_alloc: Add remote draining support to per-cpu lists

page_alloc.c's per-cpu page lists are currently protected by local
locks. This is good for performance, but it doesn't allow remote access
to these structures: CPUs that need a system-wide per-cpu list drain get
around it by scheduling drain work on all CPUs. However, some setups,
such as systems with NOHZ_FULL CPUs, aren't well suited to this, as they
can't handle interruptions of any sort.

To mitigate this, introduce a new lock-less remote draining mechanism.
It leverages the fact that the per-cpu page lists are accessed through
an indirection, and that the pointer can be updated atomically. It goes
like this (sketched in code after the list):

 - Atomically switch each per-cpu 'lp' pointer to one that points to an
   empty set of lists.

 - Wait for an RCU grace period so that all concurrent users of the old
   per-cpu lists pointer are done updating them. Note that whatever they
   were doing, the result was going to be flushed anyway[1].

 - Remotely flush the old lists now that we know nobody is using them.
   Once empty, these per-cpu lists will be used for the next drain. The
   whole drain process is serialized by a mutex.
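
In code, the core of the drain sequence boils down to roughly the
following sketch (per-CPU/zone iteration and the empty-cpumask check are
omitted; see __drain_all_pages() below for the real thing):

	mutex_lock(&pcpu_drain_mutex);

	/* 1) Publish the spare (empty) lists, keep the old ones aside. */
	pcp->drain = rcu_replace_pointer(pcp->lp, pcp->drain,
					 mutex_is_locked(&pcpu_drain_mutex));

	/* 2) Wait for all concurrent users of the old 'lp' to finish. */
	synchronize_rcu_expedited();

	/* 3) Nobody can see the old lists anymore, flush them remotely. */
	if (pcp->drain->count)
		free_pcppages_bulk(zone, pcp->drain->count, pcp, pcp->drain);

	mutex_unlock(&pcpu_drain_mutex);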

RCU guarantees atomicity both when dereferencing the per-cpu lists
pointer and when replacing it. It also lets us check for RCU critical
section/locking correctness, as all readers have to hold their per-cpu
pageset's local lock. synchronize_rcu_expedited() is used to minimize
stalls during low-memory situations without interrupting NOHZ_FULL
CPUs, since they are in an extended quiescent state.
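
On the reader side the fast paths keep following the pattern below.
This is a representative sketch of what free_unref_page_commit() and
__rmqueue_pcplist() end up doing: the local lock doubles as the RCU
read-side critical section and lockdep_is_held() encodes that rule:

	local_lock_irqsave(&pagesets.lock, flags);

	pcp = this_cpu_ptr(zone->per_cpu_pageset);
	/* The local lock delimits the read side; dereference 'lp' once. */
	lp = rcu_dereference_check(pcp->lp,
				   lockdep_is_held(this_cpu_ptr(&pagesets.lock)));

	list_add(&page->lru, &lp->lists[order_to_pindex(migratetype, order)]);
	lp->count += 1 << order;

	local_unlock_irqrestore(&pagesets.lock, flags);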

As a side effect of all this, the spin_lock() in free_pcppages_bulk()
now has to be promoted to spin_lock_irqsave(), since not all of the
function's users enter with interrupts disabled.
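
For reference, the locking in free_pcppages_bulk() now ends up looking
like the sketch below; __drain_all_pages() may call in with interrupts
enabled, so zone->lock has to disable them itself:

	unsigned long flags;

	spin_lock_irqsave(&zone->lock, flags);
	/* ... hand the pages back to the buddy allocator ... */
	spin_unlock_irqrestore(&zone->lock, flags);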

Accesses to the pcplists, like the ones in mm/vmstat.c, don't require
RCU supervision since they can tolerate outdated data.
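
For those, a plain READ_ONCE() of the pointer is enough. For example,
along the lines of the mm/vmstat.c hunks below:

	/* Statistics only: a stale 'lp' or 'count' is acceptable here. */
	struct pcplists *lp = READ_ONCE(pcp->lp);

	if (lp->count)
		drain_zone_pages(zone, pcp);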

Signed-off-by: Nicolas Saenz Julienne <nsaenzju@redhat.com>

[1] The old mechanism relied on disabled preemption as its means of
    serialization, so the per-cpu drain work items were already stepping
    over whatever was being processed concurrently with the drain call.
diff --git a/include/linux/mmzone.h b/include/linux/mmzone.h
index fb023da9..c112e78 100644
--- a/include/linux/mmzone.h
+++ b/include/linux/mmzone.h
@@ -365,13 +365,27 @@ struct per_cpu_pages {
 	short expire;		/* When 0, remote pagesets are drained */
 #endif
 
-	struct pcplists *lp;
+	/*
+	 * Having two pcplists allows us to remotely flush them in a lock-less
+	 * manner: we atomically switch the 'lp' and 'drain' pointers, wait a
+	 * grace period to synchronize against concurrent users of 'lp', and
+	 * safely free whatever is left in 'drain'.
+	 *
+	 * All accesses to 'lp' are protected by local locks, which also serve
+	 * as RCU critical section delimiters. 'lp' should only be dereferenced
+	 * *once* per critical section.
+	 *
+	 * See mm/page_alloc.c's __drain_all_pages() for the bulk of the remote
+	 * drain implementation.
+	 */
+	struct pcplists __rcu *lp;
+	struct pcplists *drain;
 	struct pcplists {
 		/* Number of pages in the lists */
 		int count;
 		/* Lists of pages, one per migrate type stored on the pcp-lists */
 		struct list_head lists[NR_PCP_LISTS];
-	} __private pcplists;
+	} __private pcplists[2];
 };
 
 struct per_cpu_zonestat {
diff --git a/mm/page_alloc.c b/mm/page_alloc.c
index 4da6fcb..9ce9e4d 100644
--- a/mm/page_alloc.c
+++ b/mm/page_alloc.c
@@ -147,13 +147,7 @@ DEFINE_PER_CPU(int, _numa_mem_);		/* Kernel "local memory" node */
 EXPORT_PER_CPU_SYMBOL(_numa_mem_);
 #endif
 
-/* work_structs for global per-cpu drains */
-struct pcpu_drain {
-	struct zone *zone;
-	struct work_struct work;
-};
 static DEFINE_MUTEX(pcpu_drain_mutex);
-static DEFINE_PER_CPU(struct pcpu_drain, pcpu_drain);
 
 #ifdef CONFIG_GCC_PLUGIN_LATENT_ENTROPY
 volatile unsigned long latent_entropy __latent_entropy;
@@ -1448,6 +1442,7 @@ static void free_pcppages_bulk(struct zone *zone, int count,
 	int prefetch_nr = READ_ONCE(pcp->batch);
 	bool isolated_pageblocks;
 	struct page *page, *tmp;
+	unsigned long flags;
 	LIST_HEAD(head);
 
 	/*
@@ -1511,11 +1506,7 @@ static void free_pcppages_bulk(struct zone *zone, int count,
 	}
 	lp->count -= nr_freed;
 
-	/*
-	 * local_lock_irq held so equivalent to spin_lock_irqsave for
-	 * both PREEMPT_RT and non-PREEMPT_RT configurations.
-	 */
-	spin_lock(&zone->lock);
+	spin_lock_irqsave(&zone->lock, flags);
 	isolated_pageblocks = has_isolate_pageblock(zone);
 
 	/*
@@ -1538,7 +1529,7 @@ static void free_pcppages_bulk(struct zone *zone, int count,
 		__free_one_page(page, page_to_pfn(page), zone, order, mt, FPI_NONE);
 		trace_mm_page_pcpu_drain(page, order, mt);
 	}
-	spin_unlock(&zone->lock);
+	spin_unlock_irqrestore(&zone->lock, flags);
 }
 
 static void free_one_page(struct zone *zone,
@@ -3076,7 +3067,7 @@ void drain_zone_pages(struct zone *zone, struct per_cpu_pages *pcp)
 
 	local_lock_irqsave(&pagesets.lock, flags);
 	batch = READ_ONCE(pcp->batch);
-	lp = pcp->lp;
+	lp = rcu_dereference_check(pcp->lp, lockdep_is_held(this_cpu_ptr(&pagesets.lock)));
 	to_drain = min(lp->count, batch);
 	if (to_drain > 0)
 		free_pcppages_bulk(zone, to_drain, pcp, lp);
@@ -3100,7 +3091,7 @@ static void drain_pages_zone(unsigned int cpu, struct zone *zone)
 	local_lock_irqsave(&pagesets.lock, flags);
 
 	pcp = per_cpu_ptr(zone->per_cpu_pageset, cpu);
-	lp = pcp->lp;
+	lp = rcu_dereference_check(pcp->lp, lockdep_is_held(this_cpu_ptr(&pagesets.lock)));
 	if (lp->count)
 		free_pcppages_bulk(zone, lp->count, pcp, lp);
 
@@ -3139,24 +3130,6 @@ void drain_local_pages(struct zone *zone)
 		drain_pages(cpu);
 }
 
-static void drain_local_pages_wq(struct work_struct *work)
-{
-	struct pcpu_drain *drain;
-
-	drain = container_of(work, struct pcpu_drain, work);
-
-	/*
-	 * drain_all_pages doesn't use proper cpu hotplug protection so
-	 * we can race with cpu offline when the WQ can move this from
-	 * a cpu pinned worker to an unbound one. We can operate on a different
-	 * cpu which is alright but we also have to make sure to not move to
-	 * a different one.
-	 */
-	preempt_disable();
-	drain_local_pages(drain->zone);
-	preempt_enable();
-}
-
 /*
  * The implementation of drain_all_pages(), exposing an extra parameter to
  * drain on all cpus.
@@ -3169,6 +3142,8 @@ static void drain_local_pages_wq(struct work_struct *work)
  */
 static void __drain_all_pages(struct zone *zone, bool force_all_cpus)
 {
+	struct per_cpu_pages *pcp;
+	struct zone *z;
 	int cpu;
 
 	/*
@@ -3178,13 +3153,6 @@ static void __drain_all_pages(struct zone *zone, bool force_all_cpus)
 	static cpumask_t cpus_with_pcps;
 
 	/*
-	 * Make sure nobody triggers this path before mm_percpu_wq is fully
-	 * initialized.
-	 */
-	if (WARN_ON_ONCE(!mm_percpu_wq))
-		return;
-
-	/*
 	 * Do not drain if one is already in progress unless it's specific to
 	 * a zone. Such callers are primarily CMA and memory hotplug and need
 	 * the drain to be complete when the call returns.
@@ -3202,8 +3170,6 @@ static void __drain_all_pages(struct zone *zone, bool force_all_cpus)
 	 * disables preemption as part of its processing
 	 */
 	for_each_online_cpu(cpu) {
-		struct per_cpu_pages *pcp;
-		struct zone *z;
 		bool has_pcps = false;
 		struct pcplists *lp;
 
@@ -3214,12 +3180,12 @@ static void __drain_all_pages(struct zone *zone, bool force_all_cpus)
 			 */
 			has_pcps = true;
 		} else if (zone) {
-			lp = per_cpu_ptr(zone->per_cpu_pageset, cpu)->lp;
+			lp = READ_ONCE(per_cpu_ptr(zone->per_cpu_pageset, cpu)->lp);
 			if (lp->count)
 				has_pcps = true;
 		} else {
 			for_each_populated_zone(z) {
-				lp = per_cpu_ptr(z->per_cpu_pageset, cpu)->lp;
+				lp = READ_ONCE(per_cpu_ptr(z->per_cpu_pageset, cpu)->lp);
 				if (lp->count) {
 					has_pcps = true;
 					break;
@@ -3233,16 +3199,36 @@ static void __drain_all_pages(struct zone *zone, bool force_all_cpus)
 			cpumask_clear_cpu(cpu, &cpus_with_pcps);
 	}
 
+	if (!force_all_cpus && cpumask_empty(&cpus_with_pcps))
+		goto exit;
+
 	for_each_cpu(cpu, &cpus_with_pcps) {
-		struct pcpu_drain *drain = per_cpu_ptr(&pcpu_drain, cpu);
+		for_each_populated_zone(z) {
+			if (zone && zone != z)
+				continue;
 
-		drain->zone = zone;
-		INIT_WORK(&drain->work, drain_local_pages_wq);
-		queue_work_on(cpu, mm_percpu_wq, &drain->work);
+			pcp = per_cpu_ptr(z->per_cpu_pageset, cpu);
+			pcp->drain = rcu_replace_pointer(pcp->lp, pcp->drain,
+							 mutex_is_locked(&pcpu_drain_mutex));
+		}
 	}
-	for_each_cpu(cpu, &cpus_with_pcps)
-		flush_work(&per_cpu_ptr(&pcpu_drain, cpu)->work);
 
+	synchronize_rcu_expedited();
+
+	for_each_cpu(cpu, &cpus_with_pcps) {
+		for_each_populated_zone(z) {
+			int count;
+
+			pcp = per_cpu_ptr(z->per_cpu_pageset, cpu);
+			count = pcp->drain->count;
+			if (!count)
+				continue;
+
+			free_pcppages_bulk(z, count, pcp, pcp->drain);
+		}
+	}
+
+exit:
 	mutex_unlock(&pcpu_drain_mutex);
 }
 
@@ -3378,7 +3364,7 @@ static void free_unref_page_commit(struct page *page, unsigned long pfn,
 
 	__count_vm_event(PGFREE);
 	pcp = this_cpu_ptr(zone->per_cpu_pageset);
-	lp = pcp->lp;
+	lp = rcu_dereference_check(pcp->lp, lockdep_is_held(this_cpu_ptr(&pagesets.lock)));
 	pindex = order_to_pindex(migratetype, order);
 	list_add(&page->lru, &lp->lists[pindex]);
 	lp->count += 1 << order;
@@ -3614,7 +3600,7 @@ struct page *__rmqueue_pcplist(struct zone *zone, unsigned int order,
 	struct pcplists *lp;
 	struct page *page;
 
-	lp = pcp->lp;
+	lp = rcu_dereference_check(pcp->lp, lockdep_is_held(this_cpu_ptr(&pagesets.lock)));
 	list = &lp->lists[order_to_pindex(migratetype, order)];
 
 	do {
@@ -5886,8 +5872,12 @@ void show_free_areas(unsigned int filter, nodemask_t *nodemask)
 		if (show_mem_node_skip(filter, zone_to_nid(zone), nodemask))
 			continue;
 
-		for_each_online_cpu(cpu)
-			free_pcp += per_cpu_ptr(zone->per_cpu_pageset, cpu)->lp->count;
+		for_each_online_cpu(cpu) {
+			struct pcplists *lp;
+
+			lp = READ_ONCE(per_cpu_ptr(zone->per_cpu_pageset, cpu)->lp);
+			free_pcp += lp->count;
+		}
 	}
 
 	printk("active_anon:%lu inactive_anon:%lu isolated_anon:%lu\n"
@@ -5980,8 +5970,12 @@ void show_free_areas(unsigned int filter, nodemask_t *nodemask)
 			continue;
 
 		free_pcp = 0;
-		for_each_online_cpu(cpu)
-			free_pcp += per_cpu_ptr(zone->per_cpu_pageset, cpu)->lp->count;
+		for_each_online_cpu(cpu) {
+			struct pcplists *lp;
+
+			lp = READ_ONCE(per_cpu_ptr(zone->per_cpu_pageset, cpu)->lp);
+			free_pcp += lp->count;
+		}
 
 		show_node(zone);
 		printk(KERN_CONT
@@ -6022,7 +6016,7 @@ void show_free_areas(unsigned int filter, nodemask_t *nodemask)
 			K(zone_page_state(zone, NR_MLOCK)),
 			K(zone_page_state(zone, NR_BOUNCE)),
 			K(free_pcp),
-			K(this_cpu_read(zone->per_cpu_pageset)->lp->count),
+			K(READ_ONCE(this_cpu_ptr(zone->per_cpu_pageset)->lp)->count),
 			K(zone_page_state(zone, NR_FREE_CMA_PAGES)));
 		printk("lowmem_reserve[]:");
 		for (i = 0; i < MAX_NR_ZONES; i++)
@@ -6886,10 +6880,13 @@ static void per_cpu_pages_init(struct per_cpu_pages *pcp, struct per_cpu_zonesta
 	memset(pcp, 0, sizeof(*pcp));
 	memset(pzstats, 0, sizeof(*pzstats));
 
-	pcp->lp = &ACCESS_PRIVATE(pcp, pcplists);
+	pcp->lp = &ACCESS_PRIVATE(pcp, pcplists[0]);
+	pcp->drain = &ACCESS_PRIVATE(pcp, pcplists[1]);
 
-	for (pindex = 0; pindex < NR_PCP_LISTS; pindex++)
+	for (pindex = 0; pindex < NR_PCP_LISTS; pindex++) {
 		INIT_LIST_HEAD(&pcp->lp->lists[pindex]);
+		INIT_LIST_HEAD(&pcp->drain->lists[pindex]);
+	}
 
 	/*
 	 * Set batch and high values safe for a boot pageset. A true percpu
diff --git a/mm/vmstat.c b/mm/vmstat.c
index 5279d3f..1ffa4fc 100644
--- a/mm/vmstat.c
+++ b/mm/vmstat.c
@@ -856,7 +856,7 @@ static int refresh_cpu_vm_stats(bool do_pagesets)
 			 * if not then there is nothing to expire.
 			 */
 			if (!__this_cpu_read(pcp->expire) ||
-			       !this_cpu_ptr(pcp)->lp->count)
+			       !READ_ONCE(this_cpu_ptr(pcp)->lp)->count)
 				continue;
 
 			/*
@@ -870,7 +870,7 @@ static int refresh_cpu_vm_stats(bool do_pagesets)
 			if (__this_cpu_dec_return(pcp->expire))
 				continue;
 
-			if (this_cpu_ptr(pcp)->lp->count) {
+			if (READ_ONCE(this_cpu_ptr(pcp)->lp)->count) {
 				drain_zone_pages(zone, this_cpu_ptr(pcp));
 				changes++;
 			}
@@ -1707,7 +1707,7 @@ static void zoneinfo_show_print(struct seq_file *m, pg_data_t *pgdat,
 			   "\n              high:  %i"
 			   "\n              batch: %i",
 			   i,
-			   pcp->lp->count,
+			   READ_ONCE(pcp->lp)->count,
 			   pcp->high,
 			   pcp->batch);
 #ifdef CONFIG_SMP