RCU callback execution can add significant OS jitter and can also
degrade scheduling latency.  This commit therefore adds the ability for
selected CPUs ("rcu_nocbs=" boot parameter) to have their callbacks
offloaded to kthreads; for example, "rcu_nocbs=1-7" offloads callback
invocation for CPUs 1 through 7.  If the "rcu_nocb_poll" boot parameter
is also specified, these kthreads poll their queues, removing the need
for the offloaded CPUs to do wakeups.  At least one CPU must continue
doing normal callback processing: currently CPU 0 cannot be selected as
a no-CBs CPU.  In addition, attempts to offline the last normal-CBs CPU
will fail.
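For illustration only, here is a stand-alone user-space sketch (not
kernel code) of what the "rcu_nocbs=" CPU list means.  It accepts a
list such as "1-7,12", builds the corresponding CPU mask the way
cpulist_parse() would for these simple cases, and clears CPU 0 to
mirror the restriction above.  The NR_CPUS value, the build line, and
the toy parser itself are illustrative assumptions, not kernel code.

/*
 * Toy model of the "rcu_nocbs=" CPU-list format (user space, not kernel
 * code).  The kernel uses cpulist_parse(); this version handles only
 * simple "N" and "N-M" entries separated by commas.
 * Build and run: gcc -std=gnu99 -o nocbs nocbs.c && ./nocbs 1-7,12
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define NR_CPUS 64                      /* illustration only */

int main(int argc, char **argv)
{
        unsigned long long mask = 0;    /* bit N set => CPU N is no-CBs */
        char buf[256];
        char *tok, *save = NULL;

        snprintf(buf, sizeof(buf), "%s", argc > 1 ? argv[1] : "1-7");

        for (tok = strtok_r(buf, ",", &save); tok;
             tok = strtok_r(NULL, ",", &save)) {
                int lo, hi;

                if (sscanf(tok, "%d-%d", &lo, &hi) != 2)
                        lo = hi = atoi(tok);
                if (lo < 0)
                        lo = 0;
                for (int cpu = lo; cpu <= hi && cpu < NR_CPUS; cpu++)
                        mask |= 1ULL << cpu;
        }

        /* CPU 0 must keep doing normal callback processing (see above). */
        mask &= ~1ULL;

        for (int cpu = 0; cpu < NR_CPUS; cpu++)
                if (mask & (1ULL << cpu))
                        printf("CPU %d offloaded to kthread rcuo%d\n",
                               cpu, cpu);
        return 0;
}

Running it with "1-7" reports CPUs 1 through 7 as offloaded, each
serviced by an "rcuoN" kthread as created by the patch below.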

This is an experimental patch, so just FYI for the moment.  Known
shortcomings include:

o       The counters should be atomic_long_t rather than atomic_t.

o       No-CBs CPUs can be configured only at boot time.

o       Only a modest number of CPUs can be configured as no-CBs CPUs.
        Definitely a few tens, perhaps a few hundred, but no way thousands.

o       At least one CPU must remain a normal-CBs CPU.

o       Not much in the way of energy-efficiency features, though there
        are some natural energy savings inherent in the implementation.

o       The per-no-CBs-CPU kthreads are not subject to RCU priority boosting.

o       Care is required when setting the kthreads to RT priority.

Later versions will address some of them, but others are likely to remain.
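As background for the queueing code below (__call_rcu_nocb_enqueue()
and the "Wait for enqueuing to complete" loop in rcu_nocb_kthread()),
here is a minimal user-space model of the xchg-based tail enqueue,
written with C11 atomics rather than the kernel primitives; it is a
sketch under those assumptions, not the kernel implementation.  The
point it illustrates: the producer publishes the new tail before
linking the element, so a consumer that snapshots the list can find a
NULL ->next ahead of the tail and must wait for the link to appear.
The demo is single-threaded, so it exercises only the list
manipulation, not the race itself.

/* User-space model of the no-CBs enqueue/drain handshake (C11 atomics). */
#include <stdatomic.h>
#include <stdio.h>

struct cb {
        void (*func)(struct cb *);
        struct cb *_Atomic next;
};

static struct cb *_Atomic head;                  /* queued callbacks */
static struct cb *_Atomic *_Atomic tail = &head; /* last ->next slot */

/* Producer: analogous to __call_rcu_nocb_enqueue() for one callback. */
static void enqueue(struct cb *cbp)
{
        struct cb *_Atomic *old_tail;

        atomic_store(&cbp->next, NULL);
        old_tail = atomic_exchange(&tail, &cbp->next);  /* claim tail slot */
        atomic_store(old_tail, cbp);                    /* then link element */
}

/* Consumer: analogous to the batch extraction in rcu_nocb_kthread(). */
static struct cb *dequeue_all(struct cb *_Atomic **batch_tail)
{
        struct cb *list = atomic_exchange(&head, NULL);

        *batch_tail = atomic_exchange(&tail, &head);
        return list;
}

static void say(struct cb *cbp)
{
        printf("invoked callback %p\n", (void *)cbp);
}

int main(void)
{
        struct cb a = { .func = say }, b = { .func = say };
        struct cb *_Atomic *batch_tail;
        struct cb *list, *next;

        enqueue(&a);
        enqueue(&b);

        list = dequeue_all(&batch_tail);
        while (list) {
                next = atomic_load(&list->next);
                /* The kernel kthread sleeps briefly in this window. */
                while (next == NULL && &list->next != batch_tail)
                        next = atomic_load(&list->next);
                list->func(list);
                list = next;
        }
        return 0;
}

The same two-step publication is why the kthread below tolerates a
momentarily NULL ->next even though the extracted count says more
callbacks follow.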

Signed-off-by: Paul E. McKenney <paul...@linux.vnet.ibm.com>

diff --git a/include/trace/events/rcu.h b/include/trace/events/rcu.h
index 5bde94d..d4f559b 100644
--- a/include/trace/events/rcu.h
+++ b/include/trace/events/rcu.h
@@ -549,6 +549,7 @@ TRACE_EVENT(rcu_torture_read,
  *     "EarlyExit": rcu_barrier_callback() piggybacked, thus early exit.
  *     "Inc1": rcu_barrier_callback() piggyback check counter incremented.
  *     "Offline": rcu_barrier_callback() found offline CPU
+ *     "OnlineNoCB": rcu_barrier_callback() found online no-CBs CPU.
  *     "OnlineQ": rcu_barrier_callback() found online CPU with callbacks.
  *     "OnlineNQ": rcu_barrier_callback() found online CPU, no callbacks.
  *     "IRQ": An rcu_barrier_callback() callback posted on remote CPU.
diff --git a/init/Kconfig b/init/Kconfig
index c26b8a1..ccda03a 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -598,6 +598,25 @@ config RCU_BOOST_DELAY
 
          Accept the default if unsure.
 
+config RCU_NOCB_CPU
+       bool "Offload RCU callback processing from boot-selected CPUs"
+       depends on TREE_RCU || TREE_PREEMPT_RCU
+       default n
+       help
+         Use this option to reduce OS jitter for aggressive HPC or
+         real-time workloads.
+
+         This option offloads callback invocation from the set of CPUs
+         specified at boot time by the rcu_nocbs parameter.  For each
+         such CPU, a kthread ("rcuoN") will be created to invoke callbacks.
+         Nothing prevents this kthread from running on any of the specified
+         CPUs, but (1) the kthreads may be preempted between each callback
+         and (2) affinity or cgroups can be used to force the kthreads off
+         of those CPUs if desired.
+
+         Say Y here if you want reduced OS jitter on selected CPUs.
+         Say N here if you are unsure.
+
 endmenu # "RCU Subsystem"
 
 config IKCONFIG
diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index 791aea0..cfc5a91 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -303,7 +303,8 @@ EXPORT_SYMBOL_GPL(rcu_sched_force_quiescent_state);
 static int
 cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
 {
-       return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL];
+       return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL] &&
+              rdp->nxttail[RCU_DONE_TAIL] != NULL;
 }
 
 /*
@@ -312,7 +313,9 @@ cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
 static int
 cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
 {
-       return *rdp->nxttail[RCU_DONE_TAIL] && !rcu_gp_in_progress(rsp);
+       return rdp->nxttail[RCU_DONE_TAIL] &&
+              *rdp->nxttail[RCU_DONE_TAIL] &&
+              !rcu_gp_in_progress(rsp);
 }
 
 /*
@@ -1071,6 +1074,7 @@ static void init_callback_list(struct rcu_data *rdp)
        rdp->nxtlist = NULL;
        for (i = 0; i < RCU_NEXT_SIZE; i++)
                rdp->nxttail[i] = &rdp->nxtlist;
+       init_nocb_callback_list(rdp);
 }
 
 /*
@@ -1560,6 +1564,10 @@ static void
 rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
                          struct rcu_node *rnp, struct rcu_data *rdp)
 {
+       /* No-CBs CPUs do not have orphanable callbacks. */
+       if (is_nocb_cpu(rdp->cpu))
+               return;
+
        /*
         * Orphan the callbacks.  First adjust the counts.  This is safe
         * because ->onofflock excludes _rcu_barrier()'s adoption of
@@ -1611,6 +1619,10 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
        int i;
        struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
 
+       /* No-CBs CPUs are handled specially. */
+       if (rcu_nocb_adopt_orphan_cbs(rsp, rdp))
+               return;
+
        /* Do the accounting first. */
        rdp->qlen_lazy += rsp->qlen_lazy;
        rdp->qlen += rsp->qlen;
@@ -2087,9 +2099,15 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
        }
 }
 
+/*
+ * Helper function for call_rcu() and friends.  The cpu argument will
+ * normally be -1, indicating "currently running CPU".  It may specify
+ * a CPU only if that CPU is a no-CBs CPU.  Currently, only _rcu_barrier()
+ * is expected to specify a CPU.
+ */
 static void
 __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
-          struct rcu_state *rsp, bool lazy)
+          struct rcu_state *rsp, int cpu, bool lazy)
 {
        unsigned long flags;
        struct rcu_data *rdp;
@@ -2109,9 +2127,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
        rdp = this_cpu_ptr(rsp->rda);
 
        /* Add the callback to our list. */
-       if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL)) {
+       if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
+               int offline;
+
+               if (cpu != -1)
+                       rdp = per_cpu_ptr(rsp->rda, cpu);
+               offline = !__call_rcu_nocb(rdp, head, lazy);
+               WARN_ON_ONCE(offline);
                /* _call_rcu() is illegal on offline CPU; leak the callback. */
-               WARN_ON_ONCE(1);
                local_irq_restore(flags);
                return;
        }
@@ -2140,7 +2163,7 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
  */
 void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-       __call_rcu(head, func, &rcu_sched_state, 0);
+       __call_rcu(head, func, &rcu_sched_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu_sched);
 
@@ -2149,7 +2172,7 @@ EXPORT_SYMBOL_GPL(call_rcu_sched);
  */
 void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-       __call_rcu(head, func, &rcu_bh_state, 0);
+       __call_rcu(head, func, &rcu_bh_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu_bh);
 
@@ -2538,9 +2561,17 @@ static void _rcu_barrier(struct rcu_state *rsp)
         * When that callback is invoked, we will know that all of the
         * corresponding CPU's preceding callbacks have been invoked.
         */
-       for_each_online_cpu(cpu) {
+       for_each_possible_cpu(cpu) {
+               if (!cpu_online(cpu) && !is_nocb_cpu(cpu))
+                       continue;
                rdp = per_cpu_ptr(rsp->rda, cpu);
-               if (ACCESS_ONCE(rdp->qlen)) {
+               if (is_nocb_cpu(cpu)) {
+                       _rcu_barrier_trace(rsp, "OnlineNoCB", cpu,
+                                          rsp->n_barrier_done);
+                       atomic_inc(&rsp->barrier_cpu_count);
+                       __call_rcu(&rdp->barrier_head, rcu_barrier_callback,
+                                  rsp, cpu, 0);
+               } else if (ACCESS_ONCE(rdp->qlen)) {
                        _rcu_barrier_trace(rsp, "OnlineQ", cpu,
                                           rsp->n_barrier_done);
                        smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
@@ -2614,6 +2645,7 @@ rcu_boot_init_percpu_data(int cpu, struct rcu_state *rsp)
 #endif
        rdp->cpu = cpu;
        rdp->rsp = rsp;
+       rcu_boot_init_nocb_percpu_data(rdp);
        raw_spin_unlock_irqrestore(&rnp->lock, flags);
 }
 
@@ -2699,6 +2731,7 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
        struct rcu_data *rdp = per_cpu_ptr(rcu_state->rda, cpu);
        struct rcu_node *rnp = rdp->mynode;
        struct rcu_state *rsp;
+       int ret = NOTIFY_OK;
 
        trace_rcu_utilization("Start CPU hotplug");
        switch (action) {
@@ -2713,8 +2746,12 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
                rcu_cpu_kthread_setrt(cpu, 1);
                break;
        case CPU_DOWN_PREPARE:
-               rcu_node_kthread_setaffinity(rnp, cpu);
-               rcu_cpu_kthread_setrt(cpu, 0);
+               if (nocb_cpu_expendable(cpu)) {
+                       rcu_node_kthread_setaffinity(rnp, cpu);
+                       rcu_cpu_kthread_setrt(cpu, 0);
+               } else {
+                       ret = NOTIFY_BAD;
+               }
                break;
        case CPU_DYING:
        case CPU_DYING_FROZEN:
@@ -2738,7 +2775,7 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
                break;
        }
        trace_rcu_utilization("End CPU hotplug");
-       return NOTIFY_OK;
+       return ret;
 }
 
 /*
@@ -2758,6 +2795,7 @@ static int __init rcu_spawn_gp_kthread(void)
                raw_spin_lock_irqsave(&rnp->lock, flags);
                rsp->gp_kthread = t;
                raw_spin_unlock_irqrestore(&rnp->lock, flags);
+               rcu_spawn_nocb_kthreads(rsp);
        }
        return 0;
 }
@@ -2952,6 +2990,7 @@ void __init rcu_init(void)
        rcu_init_one(&rcu_sched_state, &rcu_sched_data);
        rcu_init_one(&rcu_bh_state, &rcu_bh_data);
        __rcu_init_preempt();
+       rcu_init_nocb();
         open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
 
        /*
diff --git a/kernel/rcutree.h b/kernel/rcutree.h
index 499d661..5e5568d 100644
--- a/kernel/rcutree.h
+++ b/kernel/rcutree.h
@@ -323,6 +323,18 @@ struct rcu_data {
        struct rcu_head oom_head;
 #endif /* #ifdef CONFIG_RCU_FAST_NO_HZ */
 
+       /* 7) Callback offloading. */
+#ifdef CONFIG_RCU_NOCB_CPU
+       struct rcu_head *nocb_head;     /* CBs waiting for kthread. */
+       struct rcu_head **nocb_tail;
+       atomic_t nocb_q_count;          /* # CBs waiting for kthread */
+       atomic_t nocb_q_count_lazy;     /*  (approximate). */
+       int nocb_p_count;               /* # CBs being invoked by kthread */
+       int nocb_p_count_lazy;          /*  (approximate). */
+       wait_queue_head_t nocb_wq;      /* For nocb kthreads to sleep on. */
+       struct task_struct *nocb_kthread;
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
+
        int cpu;
        struct rcu_state *rsp;
 };
@@ -375,6 +387,12 @@ struct rcu_state {
        struct rcu_data __percpu *rda;          /* pointer of percu rcu_data. */
        void (*call)(struct rcu_head *head,     /* call_rcu() flavor. */
                     void (*func)(struct rcu_head *head));
+#ifdef CONFIG_RCU_NOCB_CPU
+       void (*call_remote)(struct rcu_head *head,
+                    void (*func)(struct rcu_head *head));
+                                               /* call_rcu() flavor, but for */
+                                               /*  placing on remote CPU. */
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
 
        /* The following fields are guarded by the root rcu_node's lock. */
 
@@ -428,6 +446,8 @@ struct rcu_state {
 #define RCU_GP_FLAG_FQS  0x2   /* Need grace-period quiescent-state forcing. */
 
 extern struct list_head rcu_struct_flavors;
+
+/* Sequence through rcu_state structures for each RCU flavor. */
 #define for_each_rcu_flavor(rsp) \
        list_for_each_entry((rsp), &rcu_struct_flavors, flavors)
 
@@ -511,5 +531,32 @@ static void print_cpu_stall_info(struct rcu_state *rsp, int cpu);
 static void print_cpu_stall_info_end(void);
 static void zero_cpu_stall_ticks(struct rcu_data *rdp);
 static void increment_cpu_stall_ticks(void);
+static bool is_nocb_cpu(int cpu);
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+                           bool lazy);
+static bool rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+                                     struct rcu_data *rdp);
+static bool nocb_cpu_expendable(int cpu);
+static void rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp);
+static void rcu_spawn_nocb_kthreads(struct rcu_state *rsp);
+static void init_nocb_callback_list(struct rcu_data *rdp);
+static void __init rcu_init_nocb(void);
 
 #endif /* #ifndef RCU_TREE_NONCORE */
+
+#ifdef CONFIG_RCU_TRACE
+#ifdef CONFIG_RCU_NOCB_CPU
+/* Sum up queue lengths for tracing. */
+static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
+{
+       *ql = atomic_read(&rdp->nocb_q_count) + rdp->nocb_p_count;
+       *qll = atomic_read(&rdp->nocb_q_count_lazy) + rdp->nocb_p_count_lazy;
+}
+#else /* #ifdef CONFIG_RCU_NOCB_CPU */
+static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
+{
+       *ql = 0;
+       *qll = 0;
+}
+#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
+#endif /* #ifdef CONFIG_RCU_TRACE */
diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h
index 5384eda..2341ddd 100644
--- a/kernel/rcutree_plugin.h
+++ b/kernel/rcutree_plugin.h
@@ -35,6 +35,13 @@
 #define RCU_BOOST_PRIO RCU_KTHREAD_PRIO
 #endif
 
+#ifdef CONFIG_RCU_NOCB_CPU
+static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */
+static bool have_rcu_nocb_mask;            /* Was rcu_nocb_mask allocated? */
+static bool rcu_nocb_poll;         /* Offload kthreads are to poll. */
+module_param(rcu_nocb_poll, bool, 0444);
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
+
 /*
  * Check the RCU kernel configuration parameters and print informative
  * messages about anything out of the ordinary.  If you like #ifdef, you
@@ -75,6 +82,20 @@ static void __init rcu_bootup_announce_oddness(void)
                printk(KERN_INFO "\tExperimental boot-time adjustment of leaf fanout to %d.\n", rcu_fanout_leaf);
        if (nr_cpu_ids != NR_CPUS)
                printk(KERN_INFO "\tRCU restricting CPUs from NR_CPUS=%d to nr_cpu_ids=%d.\n", NR_CPUS, nr_cpu_ids);
+#ifdef CONFIG_RCU_NOCB_CPU
+       if (have_rcu_nocb_mask) {
+               char buf[NR_CPUS * 5];
+
+               if (cpumask_test_cpu(0, rcu_nocb_mask)) {
+                       cpumask_clear_cpu(0, rcu_nocb_mask);
+                       pr_info("\tCPU 0: illegal no-CBs CPU (cleared).\n");
+               }
+               cpulist_scnprintf(buf, sizeof(buf), rcu_nocb_mask);
+               pr_info("\tExperimental no-CBs CPUs: %s.\n", buf);
+               if (rcu_nocb_poll)
+                       pr_info("\tExperimental polled no-CBs CPUs.\n");
+       }
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
 }
 
 #ifdef CONFIG_TREE_PREEMPT_RCU
@@ -641,7 +662,7 @@ static void rcu_preempt_do_callbacks(void)
  */
 void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-       __call_rcu(head, func, &rcu_preempt_state, 0);
+       __call_rcu(head, func, &rcu_preempt_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu);
 
@@ -655,7 +676,7 @@ EXPORT_SYMBOL_GPL(call_rcu);
 void kfree_call_rcu(struct rcu_head *head,
                    void (*func)(struct rcu_head *rcu))
 {
-       __call_rcu(head, func, &rcu_preempt_state, 1);
+       __call_rcu(head, func, &rcu_preempt_state, -1, 1);
 }
 EXPORT_SYMBOL_GPL(kfree_call_rcu);
 
@@ -1012,7 +1033,7 @@ static void rcu_preempt_check_callbacks(int cpu)
 void kfree_call_rcu(struct rcu_head *head,
                    void (*func)(struct rcu_head *rcu))
 {
-       __call_rcu(head, func, &rcu_sched_state, 1);
+       __call_rcu(head, func, &rcu_sched_state, -1, 1);
 }
 EXPORT_SYMBOL_GPL(kfree_call_rcu);
 
@@ -2347,3 +2368,373 @@ static void increment_cpu_stall_ticks(void)
 }
 
 #endif /* #else #ifdef CONFIG_RCU_CPU_STALL_INFO */
+
+#ifdef CONFIG_RCU_NOCB_CPU
+
+/*
+ * Offload callback processing from the boot-time-specified set of CPUs
+ * specified by rcu_nocb_mask.  For each CPU in the set, there is a
+ * kthread created that pulls the callbacks from the corresponding CPU,
+ * waits for a grace period to elapse, and invokes the callbacks.
+ * The no-CBs CPUs do a wake_up() on their kthread when they insert
+ * a callback into any empty list, unless the rcu_nocb_poll boot parameter
+ * has been specified, in which case each kthread actively polls its
+ * CPU.  (Which isn't so great for energy efficiency, but which does
+ * reduce RCU's overhead on that CPU.)
+ *
+ * This is intended to be used in conjunction with Frederic Weisbecker's
+ * adaptive-idle work, which would seriously reduce OS jitter on CPUs
+ * running CPU-bound user-mode computations.
+ *
+ * Offloading of callback processing could also in theory be used as
+ * an energy-efficiency measure because CPUs with no RCU callbacks
+ * queued are more aggressive about entering dyntick-idle mode.
+ */
+
+
+/* Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters. */
+static int __init rcu_nocb_setup(char *str)
+{
+       alloc_bootmem_cpumask_var(&rcu_nocb_mask);
+       have_rcu_nocb_mask = true;
+       cpulist_parse(str, rcu_nocb_mask);
+       return 1;
+}
+__setup("rcu_nocbs=", rcu_nocb_setup);
+
+/* Is the specified CPU a no-CBs CPU? */
+static bool is_nocb_cpu(int cpu)
+{
+       if (have_rcu_nocb_mask)
+               return cpumask_test_cpu(cpu, rcu_nocb_mask);
+       return false;
+}
+
+/*
+ * Enqueue the specified string of rcu_head structures onto the specified
+ * CPU's no-CBs lists.  The CPU is specified by rdp, the head of the
+ * string by rhp, and the tail of the string by rhtp.  The non-lazy/lazy
+ * counts are supplied by rhcount and rhcount_lazy.
+ *
+ * If warranted, also wake up the kthread servicing this CPU's queues.
+ */
+static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
+                                   struct rcu_head *rhp,
+                                   struct rcu_head **rhtp,
+                                   int rhcount, int rhcount_lazy)
+{
+       int len;
+       struct rcu_head **old_rhpp;
+       struct task_struct *t;
+
+       /* Enqueue the callback on the nocb list and update counts. */
+       old_rhpp = xchg(&rdp->nocb_tail, rhtp);
+       ACCESS_ONCE(*old_rhpp) = rhp;
+       atomic_add(rhcount, &rdp->nocb_q_count);
+       atomic_add(rhcount_lazy, &rdp->nocb_q_count_lazy);
+
+       /* If we are not being polled and there is a kthread, awaken it ... */
+       t = ACCESS_ONCE(rdp->nocb_kthread);
+       if (rcu_nocb_poll | !t)
+               return;
+       len = atomic_read(&rdp->nocb_q_count);
+       if (old_rhpp == &rdp->nocb_head) {
+               wake_up(&rdp->nocb_wq); /* ... only if queue was empty ... */
+               rdp->qlen_last_fqs_check = 0;
+       } else if (len > rdp->qlen_last_fqs_check + qhimark) {
+               wake_up_process(t); /* ... or if many callbacks queued. */
+               rdp->qlen_last_fqs_check = LONG_MAX / 2;
+       }
+       return;
+}
+
+/*
+ * This is a helper for __call_rcu(), which invokes this when the normal
+ * callback queue is inoperable.  If this is not a no-CBs CPU, this
+ * function returns failure back to __call_rcu(), which can complain
+ * appropriately.
+ *
+ * Otherwise, this function queues the callback where the corresponding
+ * "rcuo" kthread can find it.
+ */
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+                           bool lazy)
+{
+
+       if (!is_nocb_cpu(rdp->cpu))
+               return 0;
+       __call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy);
+       return 1;
+}
+
+/*
+ * Adopt orphaned callbacks on a no-CBs CPU, or return 0 if this is
+ * not a no-CBs CPU.
+ */
+static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+                                                    struct rcu_data *rdp)
+{
+       long ql = rsp->qlen;
+       long qll = rsp->qlen_lazy;
+
+       /* If this is not a no-CBs CPU, tell the caller to do it the old way. */
+       if (!is_nocb_cpu(smp_processor_id()))
+               return 0;
+       rsp->qlen = 0;
+       rsp->qlen_lazy = 0;
+
+       /* First, enqueue the donelist, if any.  This preserves CB ordering. */
+       if (rsp->orphan_donelist != NULL) {
+               __call_rcu_nocb_enqueue(rdp, rsp->orphan_donelist,
+                                       rsp->orphan_donetail, ql, qll);
+               ql = qll = 0;
+               rsp->orphan_donelist = NULL;
+               rsp->orphan_donetail = &rsp->orphan_donelist;
+       }
+       if (rsp->orphan_nxtlist != NULL) {
+               __call_rcu_nocb_enqueue(rdp, rsp->orphan_nxtlist,
+                                       rsp->orphan_nxttail, ql, qll);
+               ql = qll = 0;
+               rsp->orphan_nxtlist = NULL;
+               rsp->orphan_nxttail = &rsp->orphan_nxtlist;
+       }
+       return 1;
+}
+
+/*
+ * There must be at least one non-no-CBs CPU in operation at any given
+ * time, because no-CBs CPUs are not capable of initiating grace periods
+ * independently.  This function therefore returns false if the specified
+ * CPU is the last non-no-CBs CPU, allowing the CPU-hotplug system to
+ * refuse to offline that CPU.  (Recursion is a wonderful thing,
+ * but you have to have a base case!)
+ */
+static bool nocb_cpu_expendable(int cpu)
+{
+       cpumask_var_t non_nocb_cpus;
+       int ret;
+
+       /*
+        * If there are no no-CB CPUs or if this CPU is not a no-CB CPU,
+        * then offlining this CPU is harmless.  Let it happen.
+        */
+       if (!have_rcu_nocb_mask || is_nocb_cpu(cpu))
+               return 1;
+
+       /* If no memory, play it safe and keep the CPU around. */
+       if (!alloc_cpumask_var(&non_nocb_cpus, GFP_NOIO))
+               return 0;
+       cpumask_andnot(non_nocb_cpus, cpu_online_mask, rcu_nocb_mask);
+       cpumask_clear_cpu(cpu, non_nocb_cpus);
+       ret = !cpumask_empty(non_nocb_cpus);
+       free_cpumask_var(non_nocb_cpus);
+       return ret;
+}
+
+/*
+ * Helper structure for remote registry of RCU callbacks.
+ * This is needed for when a no-CBs CPU needs to start a grace period.
+ * If it just invokes call_rcu(), the resulting callback will be queued,
+ * which can result in deadlock.
+ */
+struct rcu_head_remote {
+       struct rcu_head *rhp;
+       call_rcu_func_t *crf;
+       void (*func)(struct rcu_head *rhp);
+};
+
+/*
+ * Register a callback as specified by the rcu_head_remote struct.
+ * This function is intended to be invoked via smp_call_function_single().
+ */
+static void call_rcu_local(void *arg)
+{
+       struct rcu_head_remote *rhrp =
+               container_of(arg, struct rcu_head_remote, rhp);
+
+       rhrp->crf(rhrp->rhp, rhrp->func);
+}
+
+/*
+ * Set up an rcu_head_remote structure and then invoke call_rcu_local()
+ * on CPU 0 (which is guaranteed to be a non-no-CBs CPU) via
+ * smp_call_function_single().
+ */
+static void invoke_crf_remote(struct rcu_head *rhp,
+                             void (*func)(struct rcu_head *rhp),
+                             call_rcu_func_t crf)
+{
+       struct rcu_head_remote rhr;
+
+       rhr.rhp = rhp;
+       rhr.crf = crf;
+       rhr.func = func;
+       smp_call_function_single(0, call_rcu_local, &rhr, 1);
+}
+
+/*
+ * Helper functions to be passed to wait_rcu_gp(), each of which
+ * invokes invoke_crf_remote() to register a callback appropriately.
+ */
+static void __maybe_unused
+call_rcu_preempt_remote(struct rcu_head *rhp,
+                       void (*func)(struct rcu_head *rhp))
+{
+       invoke_crf_remote(rhp, func, call_rcu);
+}
+static void call_rcu_bh_remote(struct rcu_head *rhp,
+                              void (*func)(struct rcu_head *rhp))
+{
+       invoke_crf_remote(rhp, func, call_rcu_bh);
+}
+static void call_rcu_sched_remote(struct rcu_head *rhp,
+                                 void (*func)(struct rcu_head *rhp))
+{
+       invoke_crf_remote(rhp, func, call_rcu_sched);
+}
+
+/*
+ * Per-rcu_data kthread, but only for no-CBs CPUs.  Each kthread invokes
+ * callbacks queued by the corresponding no-CBs CPU.
+ */
+static int rcu_nocb_kthread(void *arg)
+{
+       int c, cl;
+       struct rcu_head *list;
+       struct rcu_head *next;
+       struct rcu_head **tail;
+       struct rcu_data *rdp = arg;
+
+       /* Each pass through this loop invokes one batch of callbacks */
+       for (;;) {
+               /* If not polling, wait for next batch of callbacks. */
+               if (!rcu_nocb_poll)
+                       wait_event(rdp->nocb_wq, rdp->nocb_head);
+               list = ACCESS_ONCE(rdp->nocb_head);
+               if (!list) {
+                       schedule_timeout_interruptible(1);
+                       continue;
+               }
+
+               /*
+                * Extract queued callbacks, update counts, and wait
+                * for a grace period to elapse.
+                */
+               ACCESS_ONCE(rdp->nocb_head) = NULL;
+               tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
+               c = atomic_xchg(&rdp->nocb_q_count, 0);
+               cl = atomic_xchg(&rdp->nocb_q_count_lazy, 0);
+               ACCESS_ONCE(rdp->nocb_p_count) += c;
+               ACCESS_ONCE(rdp->nocb_p_count_lazy) += cl;
+               wait_rcu_gp(rdp->rsp->call_remote);
+
+               /* Each pass through the following loop invokes a callback. */
+               trace_rcu_batch_start(rdp->rsp->name, cl, c, -1);
+               c = cl = 0;
+               while (list) {
+                       next = list->next;
+                       /* Wait for enqueuing to complete, if needed. */
+                       while (next == NULL && &list->next != tail) {
+                               schedule_timeout_interruptible(1);
+                               next = list->next;
+                       }
+                       debug_rcu_head_unqueue(list);
+                       local_bh_disable();
+                       if (__rcu_reclaim(rdp->rsp->name, list))
+                               cl++;
+                       c++;
+                       local_bh_enable();
+                       list = next;
+               }
+               trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
+               ACCESS_ONCE(rdp->nocb_p_count) -= c;
+               ACCESS_ONCE(rdp->nocb_p_count_lazy) -= cl;
+               rdp->n_cbs_invoked += c;
+       }
+       return 0;
+}
+
+/* Initialize per-rcu_data variables for no-CBs CPUs. */
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+       rdp->nocb_tail = &rdp->nocb_head;
+       init_waitqueue_head(&rdp->nocb_wq);
+}
+
+/* Create a kthread for each RCU flavor for each no-CBs CPU. */
+static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
+{
+       int cpu;
+       struct rcu_data *rdp;
+       struct task_struct *t;
+
+       if (rcu_nocb_mask == NULL)
+               return;
+       for_each_cpu(cpu, rcu_nocb_mask) {
+               rdp = per_cpu_ptr(rsp->rda, cpu);
+               t = kthread_run(rcu_nocb_kthread, rdp, "rcuo%d", cpu);
+               BUG_ON(IS_ERR(t));
+               ACCESS_ONCE(rdp->nocb_kthread) = t;
+       }
+}
+
+/* Prevent __call_rcu() from enqueuing callbacks on no-CBs CPUs */
+static void init_nocb_callback_list(struct rcu_data *rdp)
+{
+       if (rcu_nocb_mask == NULL ||
+           !cpumask_test_cpu(rdp->cpu, rcu_nocb_mask))
+               return;
+       rdp->nxttail[RCU_NEXT_TAIL] = NULL;
+}
+
+/* Initialize the ->call_remote fields in the rcu_state structures. */
+static void __init rcu_init_nocb(void)
+{
+#ifdef CONFIG_PREEMPT_RCU
+       rcu_preempt_state.call_remote = call_rcu_preempt_remote;
+#endif /* #ifdef CONFIG_PREEMPT_RCU */
+       rcu_bh_state.call_remote = call_rcu_bh_remote;
+       rcu_sched_state.call_remote = call_rcu_sched_remote;
+}
+
+#else /* #ifdef CONFIG_RCU_NOCB_CPU */
+
+static bool is_nocb_cpu(int cpu)
+{
+       return false;
+}
+
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+                           bool lazy)
+{
+       return 0;
+}
+
+static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+                                                    struct rcu_data *rdp)
+{
+       return 0;
+}
+
+static bool nocb_cpu_expendable(int cpu)
+{
+       return 1;
+}
+
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+}
+
+static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
+{
+}
+
+static void init_nocb_callback_list(struct rcu_data *rdp)
+{
+}
+
+static void __init rcu_init_nocb(void)
+{
+}
+
+#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
index 7340efd..4ce0f91 100644
--- a/kernel/rcutree_trace.c
+++ b/kernel/rcutree_trace.c
@@ -84,6 +84,8 @@ static char convert_kthread_status(unsigned int kthread_status)
 
 static void print_one_rcu_data(struct seq_file *m, struct rcu_data *rdp)
 {
+       long ql, qll;
+
        if (!rdp->beenonline)
                return;
        seq_printf(m, "%3d%cc=%lu g=%lu pq=%d qp=%d",
@@ -97,8 +99,11 @@ static void print_one_rcu_data(struct seq_file *m, struct rcu_data *rdp)
                   rdp->dynticks->dynticks_nmi_nesting,
                   rdp->dynticks_fqs);
        seq_printf(m, " of=%lu", rdp->offline_fqs);
+       rcu_nocb_q_lengths(rdp, &ql, &qll);
+       qll += rdp->qlen_lazy;
+       ql += rdp->qlen;
        seq_printf(m, " ql=%ld/%ld qs=%c%c%c%c",
-                  rdp->qlen_lazy, rdp->qlen,
+                  qll, ql,
                   ".N"[rdp->nxttail[RCU_NEXT_READY_TAIL] !=
                        rdp->nxttail[RCU_NEXT_TAIL]],
                   ".R"[rdp->nxttail[RCU_WAIT_TAIL] !=
@@ -147,6 +152,8 @@ static const struct file_operations rcudata_fops = {
 
 static void print_one_rcu_data_csv(struct seq_file *m, struct rcu_data *rdp)
 {
+       long ql, qll;
+
        if (!rdp->beenonline)
                return;
        seq_printf(m, "%d,%s,%lu,%lu,%d,%d",
@@ -160,7 +167,10 @@ static void print_one_rcu_data_csv(struct seq_file *m, struct rcu_data *rdp)
                   rdp->dynticks->dynticks_nmi_nesting,
                   rdp->dynticks_fqs);
        seq_printf(m, ",%lu", rdp->offline_fqs);
-       seq_printf(m, ",%ld,%ld,\"%c%c%c%c\"", rdp->qlen_lazy, rdp->qlen,
+       rcu_nocb_q_lengths(rdp, &ql, &qll);
+       qll += rdp->qlen_lazy;
+       ql += rdp->qlen;
+       seq_printf(m, ",%ld,%ld,\"%c%c%c%c\"", qll, ql,
                   ".N"[rdp->nxttail[RCU_NEXT_READY_TAIL] !=
                        rdp->nxttail[RCU_NEXT_TAIL]],
                   ".R"[rdp->nxttail[RCU_WAIT_TAIL] !=
