oleg created this revision. oleg added reviewers: kmacy, alc, kib. oleg added a subscriber: freebsd-net-list.
REVISION SUMMARY Properly use acquire/release atomics. REVISION DETAIL https://reviews.freebsd.org/D8637 AFFECTED FILES sys/sys/buf_ring.h EMAIL PREFERENCES https://reviews.freebsd.org/settings/panel/emailpreferences/ To: oleg, kmacy, alc, kib Cc: freebsd-net-list
diff --git a/sys/sys/buf_ring.h b/sys/sys/buf_ring.h --- a/sys/sys/buf_ring.h +++ b/sys/sys/buf_ring.h @@ -75,35 +75,29 @@ #endif critical_enter(); do { + cons_tail = atomic_load_acq_32(&br->br_cons_tail); prod_head = br->br_prod_head; prod_next = (prod_head + 1) & br->br_prod_mask; - cons_tail = br->br_cons_tail; if (prod_next == cons_tail) { - rmb(); - if (prod_head == br->br_prod_head && - cons_tail == br->br_cons_tail) { - br->br_drops++; - critical_exit(); - return (ENOBUFS); - } - continue; + br->br_drops++; + critical_exit(); + return (ENOBUFS); } - } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next)); + } while (!atomic_cmpset_32(&br->br_prod_head, prod_head, prod_next)); #ifdef DEBUG_BUFRING if (br->br_ring[prod_head] != NULL) panic("dangling value in enqueue"); #endif br->br_ring[prod_head] = buf; - /* * If there are other enqueues in progress * that preceded us, we need to wait for them * to complete */ while (br->br_prod_tail != prod_head) cpu_spinwait(); - atomic_store_rel_int(&br->br_prod_tail, prod_next); + atomic_store_rel_32(&br->br_prod_tail, prod_next); critical_exit(); return (0); } @@ -115,19 +109,21 @@ static __inline void * buf_ring_dequeue_mc(struct buf_ring *br) { - uint32_t cons_head, cons_next; + uint32_t cons_head, cons_next, prod_tail; void *buf; critical_enter(); do { + prod_tail = atomic_load_acq_32(&br->br_prod_tail); cons_head = br->br_cons_head; - cons_next = (cons_head + 1) & br->br_cons_mask; - if (cons_head == br->br_prod_tail) { + if (cons_head == prod_tail) { critical_exit(); return (NULL); } - } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next)); + + cons_next = (cons_head + 1) & br->br_cons_mask; + } while (!atomic_cmpset_32(&br->br_cons_head, cons_head, cons_next)); buf = br->br_ring[cons_head]; #ifdef DEBUG_BUFRING @@ -140,10 +136,8 @@ */ while (br->br_cons_tail != cons_head) cpu_spinwait(); - - atomic_store_rel_int(&br->br_cons_tail, cons_next); + 
atomic_store_rel_32(&br->br_cons_tail, cons_next); critical_exit(); - return (buf); } @@ -155,62 +149,29 @@ static __inline void * buf_ring_dequeue_sc(struct buf_ring *br) { - uint32_t cons_head, cons_next; + uint32_t cons_head, cons_next, prod_tail; #ifdef PREFETCH_DEFINED uint32_t cons_next_next; #endif - uint32_t prod_tail; void *buf; - /* - * This is a workaround to allow using buf_ring on ARM and ARM64. - * ARM64TODO: Fix buf_ring in a generic way. - * REMARKS: It is suspected that br_cons_head does not require - * load_acq operation, but this change was extensively tested - * and confirmed it's working. To be reviewed once again in - * FreeBSD-12. - * - * Preventing following situation: - - * Core(0) - buf_ring_enqueue() Core(1) - buf_ring_dequeue_sc() - * ----------------------------------------- ---------------------------------------------- - * - * cons_head = br->br_cons_head; - * atomic_cmpset_acq_32(&br->br_prod_head, ...)); - * buf = br->br_ring[cons_head]; <see <1>> - * br->br_ring[prod_head] = buf; - * atomic_store_rel_32(&br->br_prod_tail, ...); - * prod_tail = br->br_prod_tail; - * if (cons_head == prod_tail) - * return (NULL); - * <condition is false and code uses invalid(old) buf>` - * - * <1> Load (on core 1) from br->br_ring[cons_head] can be reordered (speculative readed) by CPU. 
- */ -#if defined(__arm__) || defined(__aarch64__) - cons_head = atomic_load_acq_32(&br->br_cons_head); -#else - cons_head = br->br_cons_head; -#endif prod_tail = atomic_load_acq_32(&br->br_prod_tail); - - cons_next = (cons_head + 1) & br->br_cons_mask; -#ifdef PREFETCH_DEFINED - cons_next_next = (cons_head + 2) & br->br_cons_mask; -#endif + cons_head = br->br_cons_head; if (cons_head == prod_tail) return (NULL); -#ifdef PREFETCH_DEFINED + cons_next = (cons_head + 1) & br->br_cons_mask; +#ifdef PREFETCH_DEFINED if (cons_next != prod_tail) { prefetch(br->br_ring[cons_next]); + cons_next_next = (cons_head + 2) & br->br_cons_mask; if (cons_next_next != prod_tail) prefetch(br->br_ring[cons_next_next]); } #endif - br->br_cons_head = cons_next; buf = br->br_ring[cons_head]; + br->br_cons_head = cons_next; #ifdef DEBUG_BUFRING br->br_ring[cons_head] = NULL; @@ -220,7 +181,7 @@ panic("inconsistent list cons_tail=%d cons_head=%d", br->br_cons_tail, cons_head); #endif - br->br_cons_tail = cons_next; + atomic_store_rel_32(&br->br_cons_tail, cons_next); return (buf); } @@ -233,19 +194,25 @@ buf_ring_advance_sc(struct buf_ring *br) { uint32_t cons_head, cons_next; - uint32_t prod_tail; + /* + * atomic_load_acq_32(&br->br_prod_tail) is superfluous here. + * (actually it was done in preceding buf_ring_peek) + */ cons_head = br->br_cons_head; - prod_tail = br->br_prod_tail; - - cons_next = (cons_head + 1) & br->br_cons_mask; - if (cons_head == prod_tail) + if (cons_head == br->br_prod_tail) { +#ifdef DEBUG_BUFRING + panic("advance on empty ring. 
no previous peek?"); +#endif return; + } + + cons_next = (cons_head + 1) & br->br_cons_mask; br->br_cons_head = cons_next; #ifdef DEBUG_BUFRING br->br_ring[cons_head] = NULL; #endif - br->br_cons_tail = cons_next; + atomic_store_rel_32(&br->br_cons_tail, cons_next); } /* @@ -280,48 +247,44 @@ static __inline void * buf_ring_peek(struct buf_ring *br) { + uint32_t cons_head, prod_tail; #ifdef DEBUG_BUFRING if ((br->br_lock != NULL) && !mtx_owned(br->br_lock)) panic("lock not held on single consumer dequeue"); #endif - /* - * I believe it is safe to not have a memory barrier - * here because we control cons and tail is worst case - * a lagging indicator so we worst case we might - * return NULL immediately after a buffer has been enqueued - */ - if (br->br_cons_head == br->br_prod_tail) + prod_tail = atomic_load_acq_32(&br->br_prod_tail); + cons_head = br->br_cons_head; + + if (cons_head == prod_tail) return (NULL); - return (br->br_ring[br->br_cons_head]); + return (br->br_ring[cons_head]); } static __inline void * buf_ring_peek_clear_sc(struct buf_ring *br) { + uint32_t cons_head, prod_tail; + #ifdef DEBUG_BUFRING void *ret; if (!mtx_owned(br->br_lock)) panic("lock not held on single consumer dequeue"); #endif - /* - * I believe it is safe to not have a memory barrier - * here because we control cons and tail is worst case - * a lagging indicator so we worst case we might - * return NULL immediately after a buffer has been enqueued - */ - if (br->br_cons_head == br->br_prod_tail) - return (NULL); + prod_tail = atomic_load_acq_32(&br->br_prod_tail); + cons_head = br->br_cons_head; + if (cons_head == prod_tail) + return (NULL); #ifdef DEBUG_BUFRING /* * Single consumer, i.e. cons_head will not move while we are * running, so atomic_swap_ptr() is not necessary here. 
*/ - ret = br->br_ring[br->br_cons_head]; - br->br_ring[br->br_cons_head] = NULL; + ret = br->br_ring[cons_head]; + br->br_ring[cons_head] = NULL; return (ret); #else return (br->br_ring[br->br_cons_head]); @@ -354,6 +317,4 @@ struct mtx *); void buf_ring_free(struct buf_ring *br, struct malloc_type *type); - - #endif
_______________________________________________ freebsd-net@freebsd.org mailing list https://lists.freebsd.org/mailman/listinfo/freebsd-net To unsubscribe, send any mail to "freebsd-net-unsubscr...@freebsd.org"