| From: Sebastian Andrzej Siewior <bigeasy@linutronix.de> |
| Date: Fri, 30 Oct 2015 11:59:07 +0100 |
| Subject: ipc/msg: Implement lockless pipelined wakeups |
| |
| This patch moves the wake_up_process() invocation out from under |
| perm->lock by making use of a lockless wake_q. With this change, the |
| waiter is only woken up once the message has been assigned, so on SMP it |
| no longer needs to loop while the message pointer is still NULL. In the |
| signal case we still need to check the pointer under the lock to verify |
| the state. |
| |
| This change also avoids having to introduce preempt_disable() in -RT, |
| which would otherwise be needed to prevent a busy loop that polls for |
| the NULL -> !NULL transition when the waiter has a higher priority than |
| the waker. |
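| |
| For reference, below is a minimal sketch of the wake_q pattern this patch |
| adopts. The struct waiter, wake_all_waiters(), and the lock/list names are |
| illustrative only and not part of ipc/msg.c; only WAKE_Q(), wake_q_add() |
| and wake_up_q() are the actual kernel helpers being used: |
| |
|   #include <linux/sched.h>    /* wake_q helpers, task_struct */ |
|   #include <linux/list.h> |
|   #include <linux/spinlock.h> |
| |
|   /* hypothetical waiter record, standing in for struct msg_receiver */ |
|   struct waiter { |
|           struct list_head list; |
|           struct task_struct *task; |
|           void *state; |
|   }; |
| |
|   static void wake_all_waiters(spinlock_t *lock, struct list_head *waiters, |
|                                void *result) |
|   { |
|           struct waiter *w, *t; |
|           WAKE_Q(wake_q);         /* on-stack wake queue */ |
| |
|           spin_lock(lock); |
|           list_for_each_entry_safe(w, t, waiters, list) { |
|                   list_del(&w->list); |
|                   /* queue the task; wake_q_add() takes a task reference */ |
|                   wake_q_add(&wake_q, w->task); |
|                   /* publish the result while still holding the lock */ |
|                   w->state = result; |
|           } |
|           spin_unlock(lock); |
|           /* the actual wake_up_process() calls happen here, lock dropped */ |
|           wake_up_q(&wake_q); |
|   } |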
| |
| Cc: Davidlohr Bueso <dave@stgolabs.net> |
| Cc: Manfred Spraul <manfred@colorfullife.com> |
| Cc: Andrew Morton <akpm@linux-foundation.org> |
| Cc: George Spelvin <linux@horizon.com> |
| Cc: Thomas Gleixner <tglx@linutronix.de> |
| Cc: Peter Zijlstra <peterz@infradead.org> |
| Signed-off-by: Sebastian Andrzej Siewior <bigeasy@linutronix.de> |
| --- |
| |
| ipc/msg.c | 101 +++++++++++++++++--------------------------------------------- |
| 1 file changed, 28 insertions(+), 73 deletions(-) |
| |
| --- a/ipc/msg.c |
| +++ b/ipc/msg.c |
| @@ -183,20 +183,14 @@ static void ss_wakeup(struct list_head * |
| } |
| } |
| |
| -static void expunge_all(struct msg_queue *msq, int res) |
| +static void expunge_all(struct msg_queue *msq, int res, |
| + struct wake_q_head *wake_q) |
| { |
| struct msg_receiver *msr, *t; |
| |
| list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) { |
| - msr->r_msg = NULL; /* initialize expunge ordering */ |
| - wake_up_process(msr->r_tsk); |
| - /* |
| - * Ensure that the wakeup is visible before setting r_msg as |
| - * the receiving end depends on it: either spinning on a nil, |
| - * or dealing with -EAGAIN cases. See lockless receive part 1 |
| - * and 2 in do_msgrcv(). |
| - */ |
| - smp_wmb(); /* barrier (B) */ |
| + |
| + wake_q_add(wake_q, msr->r_tsk); |
| msr->r_msg = ERR_PTR(res); |
| } |
| } |
| @@ -213,11 +207,13 @@ static void freeque(struct ipc_namespace |
| { |
| struct msg_msg *msg, *t; |
| struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm); |
| + WAKE_Q(wake_q); |
| |
| - expunge_all(msq, -EIDRM); |
| + expunge_all(msq, -EIDRM, &wake_q); |
| ss_wakeup(&msq->q_senders, 1); |
| msg_rmid(ns, msq); |
| ipc_unlock_object(&msq->q_perm); |
| + wake_up_q(&wake_q); |
| rcu_read_unlock(); |
| |
| list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) { |
| @@ -342,6 +338,7 @@ static int msgctl_down(struct ipc_namesp |
| struct kern_ipc_perm *ipcp; |
| struct msqid64_ds uninitialized_var(msqid64); |
| struct msg_queue *msq; |
| + WAKE_Q(wake_q); |
| int err; |
| |
| if (cmd == IPC_SET) { |
| @@ -389,7 +386,7 @@ static int msgctl_down(struct ipc_namesp |
| /* sleeping receivers might be excluded by |
| * stricter permissions. |
| */ |
| - expunge_all(msq, -EAGAIN); |
| + expunge_all(msq, -EAGAIN, &wake_q); |
| /* sleeping senders might be able to send |
| * due to a larger queue size. |
| */ |
| @@ -402,6 +399,7 @@ static int msgctl_down(struct ipc_namesp |
| |
| out_unlock0: |
| ipc_unlock_object(&msq->q_perm); |
| + wake_up_q(&wake_q); |
| out_unlock1: |
| rcu_read_unlock(); |
| out_up: |
| @@ -566,7 +564,8 @@ static int testmsg(struct msg_msg *msg, |
| return 0; |
| } |
| |
| -static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) |
| +static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg, |
| + struct wake_q_head *wake_q) |
| { |
| struct msg_receiver *msr, *t; |
| |
| @@ -577,27 +576,13 @@ static inline int pipelined_send(struct |
| |
| list_del(&msr->r_list); |
| if (msr->r_maxsize < msg->m_ts) { |
| - /* initialize pipelined send ordering */ |
| - msr->r_msg = NULL; |
| - wake_up_process(msr->r_tsk); |
| - /* barrier (B) see barrier comment below */ |
| - smp_wmb(); |
| + wake_q_add(wake_q, msr->r_tsk); |
| msr->r_msg = ERR_PTR(-E2BIG); |
| } else { |
| - msr->r_msg = NULL; |
| msq->q_lrpid = task_pid_vnr(msr->r_tsk); |
| msq->q_rtime = get_seconds(); |
| - wake_up_process(msr->r_tsk); |
| - /* |
| - * Ensure that the wakeup is visible before |
| - * setting r_msg, as the receiving can otherwise |
| - * exit - once r_msg is set, the receiver can |
| - * continue. See lockless receive part 1 and 2 |
| - * in do_msgrcv(). Barrier (B). |
| - */ |
| - smp_wmb(); |
| + wake_q_add(wake_q, msr->r_tsk); |
| msr->r_msg = msg; |
| - |
| return 1; |
| } |
| } |
| @@ -613,6 +598,7 @@ long do_msgsnd(int msqid, long mtype, vo |
| struct msg_msg *msg; |
| int err; |
| struct ipc_namespace *ns; |
| + WAKE_Q(wake_q); |
| |
| ns = current->nsproxy->ipc_ns; |
| |
| @@ -698,7 +684,7 @@ long do_msgsnd(int msqid, long mtype, vo |
| msq->q_lspid = task_tgid_vnr(current); |
| msq->q_stime = get_seconds(); |
| |
| - if (!pipelined_send(msq, msg)) { |
| + if (!pipelined_send(msq, msg, &wake_q)) { |
| /* no one is waiting for this message, enqueue it */ |
| list_add_tail(&msg->m_list, &msq->q_messages); |
| msq->q_cbytes += msgsz; |
| @@ -712,6 +698,7 @@ long do_msgsnd(int msqid, long mtype, vo |
| |
| out_unlock0: |
| ipc_unlock_object(&msq->q_perm); |
| + wake_up_q(&wake_q); |
| out_unlock1: |
| rcu_read_unlock(); |
| if (msg != NULL) |
| @@ -932,57 +919,25 @@ long do_msgrcv(int msqid, void __user *b |
| rcu_read_lock(); |
| |
| /* Lockless receive, part 2: |
| - * Wait until pipelined_send or expunge_all are outside of |
| - * wake_up_process(). There is a race with exit(), see |
| - * ipc/mqueue.c for the details. The correct serialization |
| - * ensures that a receiver cannot continue without the wakeup |
| - * being visibible _before_ setting r_msg: |
| + * The work in pipelined_send() and expunge_all(): |
| + * - Set pointer to message |
| + * - Queue the receiver task for later wakeup |
| + * - Wake up the process after the lock is dropped. |
| * |
| - * CPU 0 CPU 1 |
| - * <loop receiver> |
| - * smp_rmb(); (A) <-- pair -. <waker thread> |
| - * <load ->r_msg> | msr->r_msg = NULL; |
| - * | wake_up_process(); |
| - * <continue> `------> smp_wmb(); (B) |
| - * msr->r_msg = msg; |
| - * |
| - * Where (A) orders the message value read and where (B) orders |
| - * the write to the r_msg -- done in both pipelined_send and |
| - * expunge_all. |
| + * Should the process wake up before this wakeup (due to a |
| + * signal) it will either see the message and continue … |
| */ |
| - for (;;) { |
| - /* |
| - * Pairs with writer barrier in pipelined_send |
| - * or expunge_all. |
| - */ |
| - smp_rmb(); /* barrier (A) */ |
| - msg = (struct msg_msg *)msr_d.r_msg; |
| - if (msg) |
| - break; |
| |
| - /* |
| - * The cpu_relax() call is a compiler barrier |
| - * which forces everything in this loop to be |
| - * re-loaded. |
| - */ |
| - cpu_relax(); |
| - } |
| - |
| - /* Lockless receive, part 3: |
| - * If there is a message or an error then accept it without |
| - * locking. |
| - */ |
| + msg = (struct msg_msg *)msr_d.r_msg; |
| if (msg != ERR_PTR(-EAGAIN)) |
| goto out_unlock1; |
| |
| - /* Lockless receive, part 3: |
| - * Acquire the queue spinlock. |
| - */ |
| + /* |
| + * … or see -EAGAIN, acquire the lock to check the message |
| + * again. |
| + */ |
| ipc_lock_object(&msq->q_perm); |
| |
| - /* Lockless receive, part 4: |
| - * Repeat test after acquiring the spinlock. |
| - */ |
| msg = (struct msg_msg *)msr_d.r_msg; |
| if (msg != ERR_PTR(-EAGAIN)) |
| goto out_unlock0; |