oleg updated this revision to Diff 22728.
oleg added a comment.

  1. fix issues reported by sbahra_repnop.org here: 
https://reviews.freebsd.org/D1945
  2. fix synchronization chain (producer -> producer -> consumer) noted by alc.
  3. add extensive explanatory comments.

CHANGES SINCE LAST UPDATE
  https://reviews.freebsd.org/D8637?vs=22507&id=22728

REVISION DETAIL
  https://reviews.freebsd.org/D8637

AFFECTED FILES
  sys/sys/buf_ring.h

EMAIL PREFERENCES
  https://reviews.freebsd.org/settings/panel/emailpreferences/

To: oleg, kmacy, kib, alc
Cc: emaste, freebsd-net-list
diff --git a/sys/sys/buf_ring.h b/sys/sys/buf_ring.h
--- a/sys/sys/buf_ring.h
+++ b/sys/sys/buf_ring.h
@@ -75,35 +75,69 @@
 #endif	
 	critical_enter();
 	do {
-		prod_head = br->br_prod_head;
+		/*
+		 * load_acq(prod_head) preceding load(cons_tail) is
+		 * mandatory: cons_tail MUST NOT be 'older' than prod_head.
+		 * Otherwise the following can happen:
+		 * a) initial state: cons_tail: 1 prod_head: 0 ring is full.
+		 * b) cpu0: reads cons_tail: 1
+		 * c) cpu1: runs consumer and sets cons_tail: 2
+		 * d) cpu1: runs producer and sets prod_head: 1
+		 *          ring is full again.
+		 * e) cpu0: reads prod_head: 1, 'ring is full' check will fail.
+		 * f) cpu0: cmpset(prod_head) will be successful.
+		 * g) ring is corrupted.
+		 */
+		prod_head = atomic_load_acq_32(&br->br_prod_head);
 		prod_next = (prod_head + 1) & br->br_prod_mask;
-		cons_tail = br->br_cons_tail;
-
-		if (prod_next == cons_tail) {
-			rmb();
-			if (prod_head == br->br_prod_head &&
-			    cons_tail == br->br_cons_tail) {
-				br->br_drops++;
-				critical_exit();
-				return (ENOBUFS);
-			}
-			continue;
+		/*
+		 * load_acq(cons_tail) is required: thus we synchronize with
+		 * consumers.
+		 */
+		cons_tail = atomic_load_acq_32(&br->br_cons_tail);
+
+		if (prod_next == cons_tail) {	/* ring is full */
+			/*
+			 * prod_head can be 'older' than cons_tail (see above),
+			 * so this can be a false positive 'ring is full' error.
+			 * Re-check prod_head before reporting it.
+			 */
+			if (prod_head != atomic_load_acq_32(&br->br_prod_head))
+				continue;
+			br->br_drops++;
+			critical_exit();
+			return (ENOBUFS);
 		}
-	} while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
+	} while (!atomic_cmpset_32(&br->br_prod_head, prod_head, prod_next));
 #ifdef DEBUG_BUFRING
 	if (br->br_ring[prod_head] != NULL)
 		panic("dangling value in enqueue");
 #endif	
 	br->br_ring[prod_head] = buf;
-
 	/*
-	 * If there are other enqueues in progress
-	 * that preceded us, we need to wait for them
-	 * to complete 
+	 * If there are other enqueues in progress that preceded us, we need
+	 * to wait for them to complete.
+	 * load_acq(prod_tail) is required for a proper synchronization chain
+	 * between the consumer and multiple producers.
+	 * Consider the following scenario:
+	 * a) initial state: prod_head,prod_tail,cons_head,cons_tail are 0.
+	 *    ring is empty.
+	 * b) cpu0: runs producer, grabs prod_head: 0 and sets prod_head: 1
+	 * c) cpu1: runs producer, grabs prod_head: 1 and sets prod_head: 2
+	 * d) cpu1: stores data to ring[1] and waits for prod_tail: 1
+	 * e) cpu0: stores data to ring[0], sets prod_tail: 1
+	 * f) cpu1: sees prod_tail: 1. If load(prod_tail) was done without _acq
+	 *    barrier there is no guarantee that store to ring[0] is visible.
+	 * g) cpu1: executes store_rel(prod_tail: 2)
+	 * h) cpu2: runs consumer, executes load_acq(prod_tail) and sees
+	 *    prod_tail: 2. acq/rel semantics in consumer/producer guarantee a
+	 *    consistent read from ring[1] but _do not guarantee_ a consistent
+	 *    read from ring[0] (if we had an unordered load(prod_tail) in f).
+	 * i) cpu2: reads junk from ring[0]
 	 */   
-	while (br->br_prod_tail != prod_head)
+	while (atomic_load_acq_32(&br->br_prod_tail) != prod_head)
 		cpu_spinwait();
-	atomic_store_rel_int(&br->br_prod_tail, prod_next);
+	atomic_store_rel_32(&br->br_prod_tail, prod_next);
 	critical_exit();
 	return (0);
 }
@@ -115,19 +149,45 @@
 static __inline void *
 buf_ring_dequeue_mc(struct buf_ring *br)
 {
-	uint32_t cons_head, cons_next;
+	uint32_t cons_head, cons_next, prod_tail;
 	void *buf;
 
 	critical_enter();
 	do {
-		cons_head = br->br_cons_head;
-		cons_next = (cons_head + 1) & br->br_cons_mask;
-
-		if (cons_head == br->br_prod_tail) {
+		/*
+		 * load_acq(cons_head) preceding load(prod_tail) is
+		 * mandatory: prod_tail MUST NOT be 'older' than cons_head.
+		 * Otherwise the following can happen:
+		 * a) initial state: prod_tail: 0 cons_head: 0 ring is empty.
+		 * b) cpu0: reads prod_tail: 0
+		 * c) cpu1: runs producer and sets prod_tail: 1
+		 * d) cpu1: runs consumer and sets cons_head: 1
+		 *          ring is empty again.
+		 * e) cpu0: reads cons_head: 1, 'ring is empty' check will fail.
+		 * f) cpu0: cmpset(cons_head) will be successful.
+		 * g) cpu0: reads junk from empty ring
+		 */
+		cons_head = atomic_load_acq_32(&br->br_cons_head);
+		/*
+		 * load_acq(prod_tail) is required: thus we synchronize with
+		 * producers and later br->br_ring[cons_head] load will be
+		 * consistent.
+		 */
+		prod_tail = atomic_load_acq_32(&br->br_prod_tail);
+
+		if (cons_head == prod_tail) {	/* ring is empty */
+			/*
+			 * Since cons_head can be 'older' than prod_tail, this
+			 * can be a false positive 'ring is empty'.
+			 * We can check it here or just wait for next
+			 * buf_ring_dequeue_mc() call.
+			 */
 			critical_exit();
 			return (NULL);
 		}
-	} while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));
+
+		cons_next = (cons_head + 1) & br->br_cons_mask;
+	} while (!atomic_cmpset_32(&br->br_cons_head, cons_head, cons_next));
 
 	buf = br->br_ring[cons_head];
 #ifdef DEBUG_BUFRING
@@ -140,10 +200,8 @@
 	 */   
 	while (br->br_cons_tail != cons_head)
 		cpu_spinwait();
-
-	atomic_store_rel_int(&br->br_cons_tail, cons_next);
+	atomic_store_rel_32(&br->br_cons_tail, cons_next);
 	critical_exit();
-
 	return (buf);
 }
 
@@ -155,62 +213,41 @@
 static __inline void *
 buf_ring_dequeue_sc(struct buf_ring *br)
 {
-	uint32_t cons_head, cons_next;
+	uint32_t cons_head, cons_next, prod_tail;
 #ifdef PREFETCH_DEFINED
 	uint32_t cons_next_next;
 #endif
-	uint32_t prod_tail;
 	void *buf;
 
 	/*
-	 * This is a workaround to allow using buf_ring on ARM and ARM64.
-	 * ARM64TODO: Fix buf_ring in a generic way.
-	 * REMARKS: It is suspected that br_cons_head does not require
-	 *   load_acq operation, but this change was extensively tested
-	 *   and confirmed it's working. To be reviewed once again in
-	 *   FreeBSD-12.
-	 *
-	 * Preventing following situation:
-
-	 * Core(0) - buf_ring_enqueue()                                       Core(1) - buf_ring_dequeue_sc()
-	 * -----------------------------------------                                       ----------------------------------------------
-	 *
-	 *                                                                                cons_head = br->br_cons_head;
-	 * atomic_cmpset_acq_32(&br->br_prod_head, ...));
-	 *                                                                                buf = br->br_ring[cons_head];     <see <1>>
-	 * br->br_ring[prod_head] = buf;
-	 * atomic_store_rel_32(&br->br_prod_tail, ...);
-	 *                                                                                prod_tail = br->br_prod_tail;
-	 *                                                                                if (cons_head == prod_tail) 
-	 *                                                                                        return (NULL);
-	 *                                                                                <condition is false and code uses invalid(old) buf>`	
-	 *
-	 * <1> Load (on core 1) from br->br_ring[cons_head] can be reordered (speculative readed) by CPU.
-	 */	
-#if defined(__arm__) || defined(__aarch64__)
-	cons_head = atomic_load_acq_32(&br->br_cons_head);
-#else
-	cons_head = br->br_cons_head;
-#endif
+	 * The buf_ring_dequeue_mc() fatal scenario (see above) is not
+	 * possible with a single consumer, so we read prod_tail first to
+	 * avoid a false positive 'ring is empty' check.
+	 * load_acq(prod_tail) is necessary for the later
+	 * br->br_ring[cons_head] load.
+	 */
 	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
-	
-	cons_next = (cons_head + 1) & br->br_cons_mask;
-#ifdef PREFETCH_DEFINED
-	cons_next_next = (cons_head + 2) & br->br_cons_mask;
-#endif
+	cons_head = br->br_cons_head;
 	
 	if (cons_head == prod_tail) 
 		return (NULL);
 
-#ifdef PREFETCH_DEFINED	
+	cons_next = (cons_head + 1) & br->br_cons_mask;
+#ifdef PREFETCH_DEFINED
 	if (cons_next != prod_tail) {		
 		prefetch(br->br_ring[cons_next]);
+		cons_next_next = (cons_head + 2) & br->br_cons_mask;
 		if (cons_next_next != prod_tail) 
 			prefetch(br->br_ring[cons_next_next]);
 	}
 #endif
-	br->br_cons_head = cons_next;
+	/*
+	 * The order and atomicity of the next two operations are unimportant:
+	 * store(cons_head) is protected by the lock (single consumer case),
+	 * load(ring[cons_head]) is ordered by the later store_rel(cons_tail).
+	 */
 	buf = br->br_ring[cons_head];
+	br->br_cons_head = cons_next;
 
 #ifdef DEBUG_BUFRING
 	br->br_ring[cons_head] = NULL;
@@ -220,7 +257,7 @@
 		panic("inconsistent list cons_tail=%d cons_head=%d",
 		    br->br_cons_tail, cons_head);
 #endif
-	br->br_cons_tail = cons_next;
+	atomic_store_rel_32(&br->br_cons_tail, cons_next);
 	return (buf);
 }
 
@@ -233,19 +270,25 @@
 buf_ring_advance_sc(struct buf_ring *br)
 {
 	uint32_t cons_head, cons_next;
-	uint32_t prod_tail;
 	
+	/*
+	 * atomic_load_acq_32(&br->br_prod_tail) is superfluous here
+	 * (it was already done in the preceding buf_ring_peek()).
+	 */
 	cons_head = br->br_cons_head;
-	prod_tail = br->br_prod_tail;
-	
-	cons_next = (cons_head + 1) & br->br_cons_mask;
-	if (cons_head == prod_tail) 
+	if (cons_head == br->br_prod_tail) {
+#ifdef DEBUG_BUFRING
+		panic("advance on empty ring. no previous peek?");
+#endif
 		return;
+	}
+
+	cons_next = (cons_head + 1) & br->br_cons_mask;
 	br->br_cons_head = cons_next;
 #ifdef DEBUG_BUFRING
 	br->br_ring[cons_head] = NULL;
 #endif
-	br->br_cons_tail = cons_next;
+	atomic_store_rel_32(&br->br_cons_tail, cons_next);
 }
 
 /*
@@ -280,48 +323,46 @@
 static __inline void *
 buf_ring_peek(struct buf_ring *br)
 {
+	uint32_t cons_head, prod_tail;
 
 #ifdef DEBUG_BUFRING
 	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
 		panic("lock not held on single consumer dequeue");
 #endif	
-	/*
-	 * I believe it is safe to not have a memory barrier
-	 * here because we control cons and tail is worst case
-	 * a lagging indicator so we worst case we might
-	 * return NULL immediately after a buffer has been enqueued
-	 */
-	if (br->br_cons_head == br->br_prod_tail)
+	/* see buf_ring_dequeue_sc() comment */
+	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
+	cons_head = br->br_cons_head;
+
+	if (cons_head == prod_tail)
 		return (NULL);
 	
-	return (br->br_ring[br->br_cons_head]);
+	return (br->br_ring[cons_head]);
 }
 
 static __inline void *
 buf_ring_peek_clear_sc(struct buf_ring *br)
 {
+	uint32_t cons_head, prod_tail;
+
 #ifdef DEBUG_BUFRING
 	void *ret;
 
 	if (!mtx_owned(br->br_lock))
 		panic("lock not held on single consumer dequeue");
 #endif	
-	/*
-	 * I believe it is safe to not have a memory barrier
-	 * here because we control cons and tail is worst case
-	 * a lagging indicator so we worst case we might
-	 * return NULL immediately after a buffer has been enqueued
-	 */
-	if (br->br_cons_head == br->br_prod_tail)
-		return (NULL);
+	/* see buf_ring_dequeue_sc() comment */
+	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
+	cons_head = br->br_cons_head;
 
+	if (cons_head == prod_tail)
+		return (NULL);
 #ifdef DEBUG_BUFRING
 	/*
 	 * Single consumer, i.e. cons_head will not move while we are
 	 * running, so atomic_swap_ptr() is not necessary here.
 	 */
-	ret = br->br_ring[br->br_cons_head];
-	br->br_ring[br->br_cons_head] = NULL;
+	ret = br->br_ring[cons_head];
+	br->br_ring[cons_head] = NULL;
 	return (ret);
 #else
 	return (br->br_ring[br->br_cons_head]);
@@ -354,6 +395,4 @@
     struct mtx *);
 void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
 
-
-
 #endif

_______________________________________________
freebsd-net@freebsd.org mailing list
https://lists.freebsd.org/mailman/listinfo/freebsd-net
To unsubscribe, send any mail to "freebsd-net-unsubscr...@freebsd.org"

Reply via email to