On Mon, 17 Dec 2018, Peter Zijlstra wrote:
I've put some patches here: git://git.kernel.org/pub/scm/linux/kernel/git/peterz/queue.git locking/core Could you have a look?
So how about the following to reduce some of the performance penalty (at the cost of more complexity)? Thanks, Davidlohr ----------8<----------------------------------------------------------- [PATCH] sched/wake_q: Reduce reference counting for special users Some users, specifically futexes and rwsems, required fixes that allowed the callers to be safe when wakeups occur before they are expected by wake_up_q(). Such scenarios also play games and rely on reference counting, and until now were pivoting on wake_q doing it. With the wake_q_add() call being moved down, this can no longer be the case. As such we end up with a double task refcounting overhead; and these callers care enough about this (being rather core-ish). This patch introduces a wake_q_add_tasksafe() call that serves for callers that have already done refcounting and therefore the task is 'safe' from the wake_q point of view (in that it requires a reference throughout the entire queue/wakeup cycle). These users also need to check the return value of the operation and do the put() if necessary when the cmpxchg() fails. Regular users of wake_q_add() that don't care about when the wakeup actually happens can just ignore the return value. 
Signed-off-by: Davidlohr Bueso <dbu...@suse.de> --- include/linux/sched/wake_q.h | 7 ++++-- kernel/futex.c | 4 ++-- kernel/locking/rwsem-xadd.c | 7 +++--- kernel/sched/core.c | 53 +++++++++++++++++++++++++++++++------------- 4 files changed, 49 insertions(+), 22 deletions(-) diff --git a/include/linux/sched/wake_q.h b/include/linux/sched/wake_q.h index 545f37138057..8c1fc6434c6c 100644 --- a/include/linux/sched/wake_q.h +++ b/include/linux/sched/wake_q.h @@ -51,8 +51,11 @@ static inline void wake_q_init(struct wake_q_head *head) head->lastp = &head->first; } -extern void wake_q_add(struct wake_q_head *head, - struct task_struct *task); +extern bool wake_q_add(struct wake_q_head *head, + struct task_struct *task); +extern bool wake_q_add_tasksafe(struct wake_q_head *head, + struct task_struct *task); + extern void wake_up_q(struct wake_q_head *head); #endif /* _LINUX_SCHED_WAKE_Q_H */ diff --git a/kernel/futex.c b/kernel/futex.c index d14971f6ed3d..2ff7e811f13b 100644 --- a/kernel/futex.c +++ b/kernel/futex.c @@ -1402,8 +1402,8 @@ static void mark_wake_futex(struct wake_q_head *wake_q, struct futex_q *q) * Queue the task for later wakeup for after we've released * the hb->lock. wake_q_add() grabs reference to p. */ - wake_q_add(wake_q, p); - put_task_struct(p); + if (!wake_q_add_tasksafe(wake_q, p)) + put_task_struct(p); } /* diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c index 50d9af615dc4..dea4dcf9d8f5 100644 --- a/kernel/locking/rwsem-xadd.c +++ b/kernel/locking/rwsem-xadd.c @@ -211,9 +211,10 @@ static void __rwsem_mark_wake(struct rw_semaphore *sem, * Ensure issuing the wakeup (either by us or someone else) * after setting the reader waiter to nil. 
*/ - wake_q_add(wake_q, tsk); - /* wake_q_add() already take the task ref */ - put_task_struct(tsk); + if (!wake_q_add_tasksafe(wake_q, tsk)) { + /* wake_q_add() already take the task ref */ + put_task_struct(tsk); + } } adjustment = woken * RWSEM_ACTIVE_READ_BIAS - adjustment; diff --git a/kernel/sched/core.c b/kernel/sched/core.c index d740d7a3608d..2c1825fe46e6 100644 --- a/kernel/sched/core.c +++ b/kernel/sched/core.c @@ -396,19 +396,8 @@ static bool set_nr_if_polling(struct task_struct *p) #endif #endif -/** - * wake_q_add() - queue a wakeup for 'later' waking. - * @head: the wake_q_head to add @task to - * @task: the task to queue for 'later' wakeup - * - * Queue a task for later wakeup, most likely by the wake_up_q() call in the - * same context, _HOWEVER_ this is not guaranteed, the wakeup can come - * instantly. - * - * This function must be used as-if it were wake_up_process(); IOW the task - * must be ready to be woken at this location. - */ -void wake_q_add(struct wake_q_head *head, struct task_struct *task) +bool __wake_q_add(struct wake_q_head *head, + struct task_struct *task, bool tasksafe) { struct wake_q_node *node = &task->wake_q; @@ -422,15 +411,49 @@ void wake_q_add(struct wake_q_head *head, struct task_struct *task) */ smp_mb__before_atomic(); if (unlikely(cmpxchg_relaxed(&node->next, NULL, WAKE_Q_TAIL))) - return; + return false; - get_task_struct(task); + if (!tasksafe) + get_task_struct(task); /* * The head is context local, there can be no concurrency. */ *head->lastp = node; head->lastp = &node->next; + return true; +} + +/** + * wake_q_add() - queue a wakeup for 'later' waking. + * @head: the wake_q_head to add @task to + * @task: the task to queue for 'later' wakeup + * + * Queue a task for later wakeup, most likely by the wake_up_q() call in the + * same context, _HOWEVER_ this is not guaranteed, the wakeup can come + * instantly. 
+ * + * This function must be used as-if it were wake_up_process(); IOW the task + * must be ready to be woken at this location. + * + * Returns whether or not the task was successfully queued for wakeup. + * If false, the task is already queued and can happen at any time after + * this call. + */ +bool wake_q_add(struct wake_q_head *head, struct task_struct *task) +{ + return __wake_q_add(head, task, false); +} + +/* + * wake_q_add_tasksafe() is the same as the above wake_q_add(), except that + * the caller has already done the task reference counting for us. Normally + * the 'tasksafe' caller will check the return value and cleanup refcounting + * accordingly. + */ +bool wake_q_add_tasksafe(struct wake_q_head *head, struct task_struct *task) +{ + return __wake_q_add(head, task, true); } void wake_up_q(struct wake_q_head *head) -- 2.16.4