oleg created this revision.
oleg added reviewers: kmacy, alc, kib.
oleg added a subscriber: freebsd-net-list.

REVISION SUMMARY
  Properly use acquire/release atomics.

REVISION DETAIL
  https://reviews.freebsd.org/D8637

AFFECTED FILES
  sys/sys/buf_ring.h

EMAIL PREFERENCES
  https://reviews.freebsd.org/settings/panel/emailpreferences/

To: oleg, kmacy, alc, kib
Cc: freebsd-net-list
diff --git a/sys/sys/buf_ring.h b/sys/sys/buf_ring.h
--- a/sys/sys/buf_ring.h
+++ b/sys/sys/buf_ring.h
@@ -75,35 +75,29 @@
 #endif	
 	critical_enter();
 	do {
+		cons_tail = atomic_load_acq_32(&br->br_cons_tail);
 		prod_head = br->br_prod_head;
 		prod_next = (prod_head + 1) & br->br_prod_mask;
-		cons_tail = br->br_cons_tail;
 
 		if (prod_next == cons_tail) {
-			rmb();
-			if (prod_head == br->br_prod_head &&
-			    cons_tail == br->br_cons_tail) {
-				br->br_drops++;
-				critical_exit();
-				return (ENOBUFS);
-			}
-			continue;
+			br->br_drops++;
+			critical_exit();
+			return (ENOBUFS);
 		}
-	} while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
+	} while (!atomic_cmpset_32(&br->br_prod_head, prod_head, prod_next));
 #ifdef DEBUG_BUFRING
 	if (br->br_ring[prod_head] != NULL)
 		panic("dangling value in enqueue");
 #endif	
 	br->br_ring[prod_head] = buf;
-
 	/*
 	 * If there are other enqueues in progress
 	 * that preceded us, we need to wait for them
 	 * to complete 
 	 */   
 	while (br->br_prod_tail != prod_head)
 		cpu_spinwait();
-	atomic_store_rel_int(&br->br_prod_tail, prod_next);
+	atomic_store_rel_32(&br->br_prod_tail, prod_next);
 	critical_exit();
 	return (0);
 }
@@ -115,19 +109,21 @@
 static __inline void *
 buf_ring_dequeue_mc(struct buf_ring *br)
 {
-	uint32_t cons_head, cons_next;
+	uint32_t cons_head, cons_next, prod_tail;
 	void *buf;
 
 	critical_enter();
 	do {
+		prod_tail = atomic_load_acq_32(&br->br_prod_tail);
 		cons_head = br->br_cons_head;
-		cons_next = (cons_head + 1) & br->br_cons_mask;
 
-		if (cons_head == br->br_prod_tail) {
+		if (cons_head == prod_tail) {
 			critical_exit();
 			return (NULL);
 		}
-	} while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));
+
+		cons_next = (cons_head + 1) & br->br_cons_mask;
+	} while (!atomic_cmpset_32(&br->br_cons_head, cons_head, cons_next));
 
 	buf = br->br_ring[cons_head];
 #ifdef DEBUG_BUFRING
@@ -140,10 +136,8 @@
 	 */   
 	while (br->br_cons_tail != cons_head)
 		cpu_spinwait();
-
-	atomic_store_rel_int(&br->br_cons_tail, cons_next);
+	atomic_store_rel_32(&br->br_cons_tail, cons_next);
 	critical_exit();
-
 	return (buf);
 }
 
@@ -155,62 +149,29 @@
 static __inline void *
 buf_ring_dequeue_sc(struct buf_ring *br)
 {
-	uint32_t cons_head, cons_next;
+	uint32_t cons_head, cons_next, prod_tail;
 #ifdef PREFETCH_DEFINED
 	uint32_t cons_next_next;
 #endif
-	uint32_t prod_tail;
 	void *buf;
 
-	/*
-	 * This is a workaround to allow using buf_ring on ARM and ARM64.
-	 * ARM64TODO: Fix buf_ring in a generic way.
-	 * REMARKS: It is suspected that br_cons_head does not require
-	 *   load_acq operation, but this change was extensively tested
-	 *   and confirmed it's working. To be reviewed once again in
-	 *   FreeBSD-12.
-	 *
-	 * Preventing following situation:
-
-	 * Core(0) - buf_ring_enqueue()                                       Core(1) - buf_ring_dequeue_sc()
-	 * -----------------------------------------                                       ----------------------------------------------
-	 *
-	 *                                                                                cons_head = br->br_cons_head;
-	 * atomic_cmpset_acq_32(&br->br_prod_head, ...));
-	 *                                                                                buf = br->br_ring[cons_head];     <see <1>>
-	 * br->br_ring[prod_head] = buf;
-	 * atomic_store_rel_32(&br->br_prod_tail, ...);
-	 *                                                                                prod_tail = br->br_prod_tail;
-	 *                                                                                if (cons_head == prod_tail) 
-	 *                                                                                        return (NULL);
-	 *                                                                                <condition is false and code uses invalid(old) buf>`	
-	 *
-	 * <1> Load (on core 1) from br->br_ring[cons_head] can be reordered (speculative readed) by CPU.
-	 */	
-#if defined(__arm__) || defined(__aarch64__)
-	cons_head = atomic_load_acq_32(&br->br_cons_head);
-#else
-	cons_head = br->br_cons_head;
-#endif
 	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
-	
-	cons_next = (cons_head + 1) & br->br_cons_mask;
-#ifdef PREFETCH_DEFINED
-	cons_next_next = (cons_head + 2) & br->br_cons_mask;
-#endif
+	cons_head = br->br_cons_head;
 	
 	if (cons_head == prod_tail) 
 		return (NULL);
 
-#ifdef PREFETCH_DEFINED	
+	cons_next = (cons_head + 1) & br->br_cons_mask;
+#ifdef PREFETCH_DEFINED
 	if (cons_next != prod_tail) {		
 		prefetch(br->br_ring[cons_next]);
+		cons_next_next = (cons_head + 2) & br->br_cons_mask;
 		if (cons_next_next != prod_tail) 
 			prefetch(br->br_ring[cons_next_next]);
 	}
 #endif
-	br->br_cons_head = cons_next;
 	buf = br->br_ring[cons_head];
+	br->br_cons_head = cons_next;
 
 #ifdef DEBUG_BUFRING
 	br->br_ring[cons_head] = NULL;
@@ -220,7 +181,7 @@
 		panic("inconsistent list cons_tail=%d cons_head=%d",
 		    br->br_cons_tail, cons_head);
 #endif
-	br->br_cons_tail = cons_next;
+	atomic_store_rel_32(&br->br_cons_tail, cons_next);
 	return (buf);
 }
 
@@ -233,19 +194,25 @@
 buf_ring_advance_sc(struct buf_ring *br)
 {
 	uint32_t cons_head, cons_next;
-	uint32_t prod_tail;
 	
+	/*
+	 * atomic_load_acq_32(&br->br_prod_tail) is superfluous here.
+	 * (actually it was done in the preceding buf_ring_peek)
+	 */
 	cons_head = br->br_cons_head;
-	prod_tail = br->br_prod_tail;
-	
-	cons_next = (cons_head + 1) & br->br_cons_mask;
-	if (cons_head == prod_tail) 
+	if (cons_head == br->br_prod_tail) {
+#ifdef DEBUG_BUFRING
+		panic("advance on empty ring. no previous peek?");
+#endif
 		return;
+	}
+
+	cons_next = (cons_head + 1) & br->br_cons_mask;
 	br->br_cons_head = cons_next;
 #ifdef DEBUG_BUFRING
 	br->br_ring[cons_head] = NULL;
 #endif
-	br->br_cons_tail = cons_next;
+	atomic_store_rel_32(&br->br_cons_tail, cons_next);
 }
 
 /*
@@ -280,48 +247,44 @@
 static __inline void *
 buf_ring_peek(struct buf_ring *br)
 {
+	uint32_t cons_head, prod_tail;
 
 #ifdef DEBUG_BUFRING
 	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
 		panic("lock not held on single consumer dequeue");
 #endif	
-	/*
-	 * I believe it is safe to not have a memory barrier
-	 * here because we control cons and tail is worst case
-	 * a lagging indicator so we worst case we might
-	 * return NULL immediately after a buffer has been enqueued
-	 */
-	if (br->br_cons_head == br->br_prod_tail)
+	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
+	cons_head = br->br_cons_head;
+
+	if (cons_head == prod_tail)
 		return (NULL);
 	
-	return (br->br_ring[br->br_cons_head]);
+	return (br->br_ring[cons_head]);
 }
 
 static __inline void *
 buf_ring_peek_clear_sc(struct buf_ring *br)
 {
+	uint32_t cons_head, prod_tail;
+
 #ifdef DEBUG_BUFRING
 	void *ret;
 
 	if (!mtx_owned(br->br_lock))
 		panic("lock not held on single consumer dequeue");
 #endif	
-	/*
-	 * I believe it is safe to not have a memory barrier
-	 * here because we control cons and tail is worst case
-	 * a lagging indicator so we worst case we might
-	 * return NULL immediately after a buffer has been enqueued
-	 */
-	if (br->br_cons_head == br->br_prod_tail)
-		return (NULL);
+	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
+	cons_head = br->br_cons_head;
 
+	if (cons_head == prod_tail)
+		return (NULL);
 #ifdef DEBUG_BUFRING
 	/*
 	 * Single consumer, i.e. cons_head will not move while we are
 	 * running, so atomic_swap_ptr() is not necessary here.
 	 */
-	ret = br->br_ring[br->br_cons_head];
-	br->br_ring[br->br_cons_head] = NULL;
+	ret = br->br_ring[cons_head];
+	br->br_ring[cons_head] = NULL;
 	return (ret);
 #else
 	return (br->br_ring[br->br_cons_head]);
@@ -354,6 +317,4 @@
     struct mtx *);
 void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
 
-
-
 #endif

_______________________________________________
freebsd-net@freebsd.org mailing list
https://lists.freebsd.org/mailman/listinfo/freebsd-net
To unsubscribe, send any mail to "freebsd-net-unsubscr...@freebsd.org"

Reply via email to