In the current log space reservation slowpath code, the log space
waiters are woken up by an incoming waiter while it still holds the
lock. As waking up a task can be time consuming, doing so while holding
the lock can exacerbate any spinlock contention that is present.

This patch changes the slowpath code to use a wake_q so that tasks are
woken up after the lock has been dropped, improving performance and
reducing the level of spinlock contention.
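
For reference, the wake_q pattern this patch relies on looks roughly
like the sketch below. The demo_* names are purely illustrative and do
not appear in the XFS code: waiters are queued under the spinlock with
wake_q_add() and only woken with wake_up_q() once the lock has been
dropped.

#include <linux/sched.h>
#include <linux/sched/wake_q.h>
#include <linux/spinlock.h>
#include <linux/list.h>

/*
 * Illustrative deferred-wakeup pattern; 'demo_lock', 'demo_waiters'
 * and 'struct demo_waiter' are hypothetical names. The real code
 * operates on struct xlog_grant_head and its ticket waiters.
 */
struct demo_waiter {
	struct list_head	entry;
	struct task_struct	*task;
};

static DEFINE_SPINLOCK(demo_lock);
static LIST_HEAD(demo_waiters);

static void demo_wake_waiters(void)
{
	struct demo_waiter *w;
	DEFINE_WAKE_Q(wakeq);		/* on-stack list of tasks to wake */

	spin_lock(&demo_lock);
	list_for_each_entry(w, &demo_waiters, entry)
		wake_q_add(&wakeq, w->task);	/* queue only, no wakeup yet */
	spin_unlock(&demo_lock);

	wake_up_q(&wakeq);	/* expensive wakeups done without the lock */
}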

Running the AIM7 fserver workload on a 2-socket 24-core 48-thread
Broadwell system with a small xfs filesystem on ramfs, performance
increased from 192,666 jobs/min to 285,221 jobs/min with this change.

Signed-off-by: Waiman Long <long...@redhat.com>
---
 fs/xfs/xfs_linux.h |  1 +
 fs/xfs/xfs_log.c   | 50 ++++++++++++++++++++++++++++++++++++----------
 2 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/fs/xfs/xfs_linux.h b/fs/xfs/xfs_linux.h
index edbd5a210df2..1548a353da1e 100644
--- a/fs/xfs/xfs_linux.h
+++ b/fs/xfs/xfs_linux.h
@@ -60,6 +60,7 @@ typedef __u32                 xfs_nlink_t;
 #include <linux/list_sort.h>
 #include <linux/ratelimit.h>
 #include <linux/rhashtable.h>
+#include <linux/sched/wake_q.h>
 
 #include <asm/page.h>
 #include <asm/div64.h>
diff --git a/fs/xfs/xfs_log.c b/fs/xfs/xfs_log.c
index ac1dc8db7112..70d5f85ff059 100644
--- a/fs/xfs/xfs_log.c
+++ b/fs/xfs/xfs_log.c
@@ -221,7 +221,8 @@ STATIC bool
 xlog_grant_head_wake(
        struct xlog             *log,
        struct xlog_grant_head  *head,
-       int                     *free_bytes)
+       int                     *free_bytes,
+       struct wake_q_head      *wakeq)
 {
        struct xlog_ticket      *tic;
        int                     need_bytes;
@@ -240,7 +241,7 @@ xlog_grant_head_wake(
                        continue;
 
                trace_xfs_log_grant_wake_up(log, tic);
-               wake_up_process(tic->t_task);
+               wake_q_add(wakeq, tic->t_task);
                tic->t_flags |= XLOG_TIC_WAKING;
        }
 
@@ -252,8 +253,9 @@ xlog_grant_head_wait(
        struct xlog             *log,
        struct xlog_grant_head  *head,
        struct xlog_ticket      *tic,
-       int                     need_bytes) __releases(&head->lock)
-                                           __acquires(&head->lock)
+       int                     need_bytes,
+       struct wake_q_head      *wakeq) __releases(&head->lock)
+                                       __acquires(&head->lock)
 {
        list_add_tail(&tic->t_queue, &head->waiters);
 
@@ -265,6 +267,11 @@ xlog_grant_head_wait(
                __set_current_state(TASK_UNINTERRUPTIBLE);
                spin_unlock(&head->lock);
 
+               if (wakeq) {
+                       wake_up_q(wakeq);
+                       wakeq = NULL;
+               }
+
                XFS_STATS_INC(log->l_mp, xs_sleep_logspace);
 
                trace_xfs_log_grant_sleep(log, tic);
@@ -272,7 +279,21 @@ xlog_grant_head_wait(
                trace_xfs_log_grant_wake(log, tic);
 
                spin_lock(&head->lock);
-               tic->t_flags &= ~XLOG_TIC_WAKING;
+               /*
+                * The XLOG_TIC_WAKING flag should be set. In the unlikely
+                * event that the current task is still queued in a wake_q
+                * (e.g. after a spurious wakeup), wait until it has been
+                * dequeued before clearing the flag and proceeding, to
+                * avoid the possibility of the task being added to
+                * another wake_q at the same time.
+                */
+               if (tic->t_flags & XLOG_TIC_WAKING) {
+                       while (task_in_wake_q(current))
+                               cpu_relax();
+
+                       tic->t_flags &= ~XLOG_TIC_WAKING;
+               }
+
                if (XLOG_FORCED_SHUTDOWN(log))
                        goto shutdown;
        } while (xlog_space_left(log, &head->grant) < need_bytes);
@@ -310,6 +331,7 @@ xlog_grant_head_check(
 {
        int                     free_bytes;
        int                     error = 0;
+       DEFINE_WAKE_Q(wakeq);
 
        ASSERT(!(log->l_flags & XLOG_ACTIVE_RECOVERY));
 
@@ -323,15 +345,17 @@ xlog_grant_head_check(
        free_bytes = xlog_space_left(log, &head->grant);
        if (!list_empty_careful(&head->waiters)) {
                spin_lock(&head->lock);
-               if (!xlog_grant_head_wake(log, head, &free_bytes) ||
+               if (!xlog_grant_head_wake(log, head, &free_bytes, &wakeq) ||
                    free_bytes < *need_bytes) {
                        error = xlog_grant_head_wait(log, head, tic,
-                                                    *need_bytes);
+                                                    *need_bytes, &wakeq);
+                       wake_q_init(&wakeq);    /* Set wake_q to empty */
                }
                spin_unlock(&head->lock);
+               wake_up_q(&wakeq);
        } else if (free_bytes < *need_bytes) {
                spin_lock(&head->lock);
-               error = xlog_grant_head_wait(log, head, tic, *need_bytes);
+               error = xlog_grant_head_wait(log, head, tic, *need_bytes, NULL);
                spin_unlock(&head->lock);
        }
 
@@ -1077,6 +1101,7 @@ xfs_log_space_wake(
 {
        struct xlog             *log = mp->m_log;
        int                     free_bytes;
+       DEFINE_WAKE_Q(wakeq);
 
        if (XLOG_FORCED_SHUTDOWN(log))
                return;
@@ -1086,8 +1111,11 @@ xfs_log_space_wake(
 
                spin_lock(&log->l_write_head.lock);
                free_bytes = xlog_space_left(log, &log->l_write_head.grant);
-               xlog_grant_head_wake(log, &log->l_write_head, &free_bytes);
+               xlog_grant_head_wake(log, &log->l_write_head, &free_bytes,
+                                    &wakeq);
                spin_unlock(&log->l_write_head.lock);
+               wake_up_q(&wakeq);
+               wake_q_init(&wakeq); /* Re-init wake_q to be reused again */
        }
 
        if (!list_empty_careful(&log->l_reserve_head.waiters)) {
@@ -1095,8 +1123,10 @@ xfs_log_space_wake(
 
                spin_lock(&log->l_reserve_head.lock);
                free_bytes = xlog_space_left(log, &log->l_reserve_head.grant);
-               xlog_grant_head_wake(log, &log->l_reserve_head, &free_bytes);
+               xlog_grant_head_wake(log, &log->l_reserve_head, &free_bytes,
+                                    &wakeq);
                spin_unlock(&log->l_reserve_head.lock);
+               wake_up_q(&wakeq);
        }
 }
 
-- 
2.18.0
