Currently, rcu_normal_wake_from_gp is only enabled by default
on small systems(<= 16 CPUs) or when a user explicitly set it
enabled.

Introduce an adaptive latching mechanism:
 * Track the number of in-flight synchronize_rcu() requests
   using a new rcu_sr_normal_count counter;

 * If the count reaches/exceeds RCU_SR_NORMAL_LATCH_THR(64),
   it sets the rcu_sr_normal_latched, reverting new requests
   onto the scaled wait_rcu_gp() path;

 * The latch is cleared only when the pending requests are fully
   drained(nr == 0);

 * Enables rcu_normal_wake_from_gp by default for all systems,
   relying on this dynamic throttling instead of static CPU
   limits.

Testing(synthetic flood workload):
  * Kernel version: 6.19.0-rc6
  * Number of CPUs: 1536
  * 60K concurrent synchronize_rcu() calls

Perf(cycles, system-wide):
  total cycles: 932020263832
  rcu_sr_normal_add_req(): 2650282811 cycles(~0.28%)

Perf report excerpt:
  0.01%  0.01%  sync_test/...  [k] rcu_sr_normal_add_req

Measured overhead of rcu_sr_normal_add_req() remained ~0.28%
of total CPU cycles in this synthetic stress test.

Tested-by: Samir M <[email protected]>
Suggested-by: Joel Fernandes <[email protected]>
Signed-off-by: Uladzislau Rezki (Sony) <[email protected]>
---
 .../admin-guide/kernel-parameters.txt         | 10 ++---
 kernel/rcu/tree.c                             | 41 +++++++++++++------
 2 files changed, 33 insertions(+), 18 deletions(-)

diff --git a/Documentation/admin-guide/kernel-parameters.txt 
b/Documentation/admin-guide/kernel-parameters.txt
index cb850e5290c2..d0574a02510d 100644
--- a/Documentation/admin-guide/kernel-parameters.txt
+++ b/Documentation/admin-guide/kernel-parameters.txt
@@ -5867,13 +5867,13 @@ Kernel parameters
                        use a call_rcu[_hurry]() path. Please note, this is for 
a
                        normal grace period.
 
-                       How to enable it:
+                       How to disable it:
 
-                       echo 1 > 
/sys/module/rcutree/parameters/rcu_normal_wake_from_gp
-                       or pass a boot parameter 
"rcutree.rcu_normal_wake_from_gp=1"
+                       echo 0 > 
/sys/module/rcutree/parameters/rcu_normal_wake_from_gp
+                       or pass a boot parameter 
"rcutree.rcu_normal_wake_from_gp=0"
 
-                       Default is 1 if num_possible_cpus() <= 16 and it is not 
explicitly
-                       disabled by the boot parameter passing 0.
+                       Default is 1 if it is not explicitly disabled by the 
boot parameter
+                       passing 0.
 
        rcuscale.gp_async= [KNL]
                        Measure performance of asynchronous
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 55df6d37145e..86dc88a70fd0 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -1632,17 +1632,21 @@ static void rcu_sr_put_wait_head(struct llist_node 
*node)
        atomic_set_release(&sr_wn->inuse, 0);
 }
 
-/* Enable rcu_normal_wake_from_gp automatically on small systems. */
-#define WAKE_FROM_GP_CPU_THRESHOLD 16
-
-static int rcu_normal_wake_from_gp = -1;
+static int rcu_normal_wake_from_gp = 1;
 module_param(rcu_normal_wake_from_gp, int, 0644);
 static struct workqueue_struct *sync_wq;
 
+#define RCU_SR_NORMAL_LATCH_THR 64
+
+/* Number of in-flight synchronize_rcu() calls queued on srs_next. */
+static atomic_long_t rcu_sr_normal_count;
+static atomic_t rcu_sr_normal_latched;
+
 static void rcu_sr_normal_complete(struct llist_node *node)
 {
        struct rcu_synchronize *rs = container_of(
                (struct rcu_head *) node, struct rcu_synchronize, head);
+       long nr;
 
        WARN_ONCE(IS_ENABLED(CONFIG_PROVE_RCU) &&
                !poll_state_synchronize_rcu_full(&rs->oldstate),
@@ -1650,6 +1654,15 @@ static void rcu_sr_normal_complete(struct llist_node 
*node)
 
        /* Finally. */
        complete(&rs->completion);
+       nr = atomic_long_dec_return(&rcu_sr_normal_count);
+       WARN_ON_ONCE(nr < 0);
+
+       /*
+        * Unlatch: switch back to normal path when fully
+        * drained and if it has been latched.
+        */
+       if (nr == 0)
+               (void)atomic_cmpxchg(&rcu_sr_normal_latched, 1, 0);
 }
 
 static void rcu_sr_normal_gp_cleanup_work(struct work_struct *work)
@@ -1795,7 +1808,14 @@ static bool rcu_sr_normal_gp_init(void)
 
 static void rcu_sr_normal_add_req(struct rcu_synchronize *rs)
 {
+       long nr;
+
        llist_add((struct llist_node *) &rs->head, &rcu_state.srs_next);
+       nr = atomic_long_inc_return(&rcu_sr_normal_count);
+
+       /* Latch: only when flooded and if unlatched. */
+       if (nr >= RCU_SR_NORMAL_LATCH_THR)
+               (void)atomic_cmpxchg(&rcu_sr_normal_latched, 0, 1);
 }
 
 /*
@@ -3278,14 +3298,15 @@ static void synchronize_rcu_normal(void)
 {
        struct rcu_synchronize rs;
 
+       init_rcu_head_on_stack(&rs.head);
        trace_rcu_sr_normal(rcu_state.name, &rs.head, TPS("request"));
 
-       if (READ_ONCE(rcu_normal_wake_from_gp) < 1) {
+       if (READ_ONCE(rcu_normal_wake_from_gp) < 1 ||
+                       atomic_read(&rcu_sr_normal_latched)) {
                wait_rcu_gp(call_rcu_hurry);
                goto trace_complete_out;
        }
 
-       init_rcu_head_on_stack(&rs.head);
        init_completion(&rs.completion);
 
        /*
@@ -3302,10 +3323,10 @@ static void synchronize_rcu_normal(void)
 
        /* Now we can wait. */
        wait_for_completion(&rs.completion);
-       destroy_rcu_head_on_stack(&rs.head);
 
 trace_complete_out:
        trace_rcu_sr_normal(rcu_state.name, &rs.head, TPS("complete"));
+       destroy_rcu_head_on_stack(&rs.head);
 }
 
 /**
@@ -4904,12 +4925,6 @@ void __init rcu_init(void)
        sync_wq = alloc_workqueue("sync_wq", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
        WARN_ON(!sync_wq);
 
-       /* Respect if explicitly disabled via a boot parameter. */
-       if (rcu_normal_wake_from_gp < 0) {
-               if (num_possible_cpus() <= WAKE_FROM_GP_CPU_THRESHOLD)
-                       rcu_normal_wake_from_gp = 1;
-       }
-
        /* Fill in default value for rcutree.qovld boot parameter. */
        /* -After- the rcu_node ->lock fields are initialized! */
        if (qovld < 0)
-- 
2.47.3


Reply via email to