Hi,

We know from recent debugging of TLB IPIs that it is possible for a
single IPI to be delivered more than once to a CPU on Intel SMP.  (The
most common mechanism seems to be a CPU detecting CRC failure of a
multicast IPI and the sender then resending the entire multicast, so
that any CPU which did receive the first copy correctly now sees the
IPI twice.)

However, smp_call_function() on i386 assumes exactly-once delivery,
and uses a pair of counters to track the number of CPUs which have
started and finished processing the IPI.
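
To make the failure concrete, here is a simplified sketch of the old
scheme (illustrative only, not the exact kernel source):

struct call_data_struct {
        void (*func) (void *info);
        void *info;
        atomic_t started;
        atomic_t finished;
        int wait;
};

/* The initiator builds one of these on its own stack; each responding
 * CPU's IPI handler then does, in essence: */
static void ipi_handler_old(struct call_data_struct *data)
{
        atomic_inc(&data->started);   /* a duplicate runs this twice */
        (*data->func)(data->info);
        if (data->wait)
                atomic_inc(&data->finished);
}

If one CPU sees the IPI twice, "started" reaches smp_num_cpus-1 before
every CPU has actually run func, and a late duplicate can increment
counters in a stack frame which has already been popped.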

The patch below replaces the counters with per-CPU bitmaps and
statically allocates the call_data_struct structures, so that (a)
acknowledgement of the call_function start and end is reliable
(i.e. wait==1 works properly), and (b) a repeated IPI delivery doesn't
risk trying to increment the "finished" field of a call_data_struct
which is no longer on the stack.  The initiator now waits for each
acknowledgement bitmap to match the mask of all other online CPUs, so
a duplicate delivery can no longer make the count appear complete
early.
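
In outline, the IPI service routine becomes idempotent; this is just
the shape of the change, the full patch follows:

static void ipi_handler_new(struct call_data_struct *data)
{
        /* The per-cpu bit makes a repeated delivery of the same IPI
         * a harmless no-op. */
        if (test_and_set_bit(smp_processor_id(), &data->started))
                return;
        (*data->func)(data->info);
        if (data->wait)
                set_bit(smp_processor_id(), &data->finished);
}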

--Stephen

--- arch/i386/kernel/smp.c.~1~  Tue Feb 13 22:13:43 2001
+++ arch/i386/kernel/smp.c      Mon Mar 26 20:24:15 2001
@@ -429,9 +429,10 @@
        atomic_t started;
        atomic_t finished;
        int wait;
-};
+} __attribute__ ((__aligned__(SMP_CACHE_BYTES)));
 
 static struct call_data_struct * call_data;
+static struct call_data_struct call_data_array[NR_CPUS];
 
 /*
  * this function sends a 'generic call function' IPI to all other CPUs
@@ -453,32 +454,45 @@
  * hardware interrupt handler, you may call it from a bottom half handler.
  */
 {
-       struct call_data_struct data;
-       int cpus = smp_num_cpus-1;
+       struct call_data_struct *data;
+       int cpus = (cpu_online_map & ~(1 << smp_processor_id()));
 
        if (!cpus)
                return 0;
 
-       data.func = func;
-       data.info = info;
-       atomic_set(&data.started, 0);
-       data.wait = wait;
+       data = &call_data_array[smp_processor_id()];
+
+       data->func = func;
+       data->info = info;
+       data->wait = wait;
        if (wait)
-               atomic_set(&data.finished, 0);
-
-       spin_lock_bh(&call_lock);
-       call_data = &data;
+               atomic_set(&data->finished, 0);
+       /* We have to do this one last, to make sure that the IPI
+        * service code doesn't get confused if it gets an unexpected
+        * repeat trigger of an old IPI while we're still setting up
+        * the new one. */
+       atomic_set(&data->started, 0);
+
+       local_bh_disable();
+       spin_lock(&call_lock);
+       call_data = data;
        /* Send a message to all other CPUs and wait for them to respond */
        send_IPI_allbutself(CALL_FUNCTION_VECTOR);
 
        /* Wait for response */
-       while (atomic_read(&data.started) != cpus)
+       while (atomic_read(&data->started) != cpus)
                barrier();
 
+       /* It is now safe to reuse the "call_data" global, but we need
+        * to keep local bottom-halves disabled until all waiters have
+        * acknowledged completion, to prevent reuse of the per-cpu
+        * call data entry. */
+       spin_unlock(&call_lock);
+
        if (wait)
-               while (atomic_read(&data.finished) != cpus)
+               while (atomic_read(&data->finished) != cpus)
                        barrier();
-       spin_unlock_bh(&call_lock);
+       local_bh_enable();
 
        return 0;
 }
@@ -528,15 +542,17 @@
 
        ack_APIC_irq();
        /*
-        * Notify initiating CPU that I've grabbed the data and am
-        * about to execute the function
+        * Notify initiating CPU that I've grabbed the data and am about
+        * to execute the function (and avoid servicing any single IPI
+        * twice)
         */
-       atomic_inc(&call_data->started);
+       if (test_and_set_bit(smp_processor_id(), &call_data->started))
+               return;
        /*
         * At this point the info structure may be out of scope unless wait==1
         */
        (*func)(info);
        if (wait)
-               atomic_inc(&call_data->finished);
+               set_bit(smp_processor_id(), &call_data->finished);
 }
 

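For reference, a caller which relies on wait==1 would look something
like this (the helper names are made up for illustration; the
<nonatomic> argument is currently unused on i386):

static void do_flush(void *info)
{
        /* the cross-call: runs on every other online CPU */
}

void flush_everywhere(void)
{
        /* With wait==1, smp_call_function() returns only after every
         * other CPU has both received the IPI and finished running
         * do_flush(). */
        smp_call_function(do_flush, NULL, 1, 1);
        do_flush(NULL);         /* and run it on this CPU too */
}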