<snip>

> 
> > > On Wed, Jan 15, 2020 at 11:25:07PM -0600, Honnappa Nagarahalli wrote:
> > > > Current APIs assume ring elements to be pointers. However, in many
> > > > use cases, the size can be different. Add new APIs to support
> > > > configurable ring element sizes.
> > > >
> > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> > > > Reviewed-by: Dharmik Thakkar <dharmik.thak...@arm.com>
> > > > Reviewed-by: Gavin Hu <gavin...@arm.com>
> > > > Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
> > > > ---
> > > >  lib/librte_ring/Makefile             |    3 +-
> > > >  lib/librte_ring/meson.build          |    4 +
> > > >  lib/librte_ring/rte_ring.c           |   41 +-
> > > >  lib/librte_ring/rte_ring.h           |    1 +
> > > >  lib/librte_ring/rte_ring_elem.h      | 1003 ++++++++++++++++++++++++++
> > > >  lib/librte_ring/rte_ring_version.map |    2 +
> > > >  6 files changed, 1045 insertions(+), 9 deletions(-)
> > > >  create mode 100644 lib/librte_ring/rte_ring_elem.h
> > > >
> > >
> > > [...]
> > >
> > > > +static __rte_always_inline void
> > > > +enqueue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx,
> > > > +               const void *obj_table, uint32_t n) {
> > > > +       unsigned int i;
> > > > +       uint32_t *ring = (uint32_t *)&r[1];
> > > > +       const uint32_t *obj = (const uint32_t *)obj_table;
> > > > +       if (likely(idx + n < size)) {
> > > > +               for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
> > > > +                       ring[idx] = obj[i];
> > > > +                       ring[idx + 1] = obj[i + 1];
> > > > +                       ring[idx + 2] = obj[i + 2];
> > > > +                       ring[idx + 3] = obj[i + 3];
> > > > +                       ring[idx + 4] = obj[i + 4];
> > > > +                       ring[idx + 5] = obj[i + 5];
> > > > +                       ring[idx + 6] = obj[i + 6];
> > > > +                       ring[idx + 7] = obj[i + 7];
> > > > +               }
> > > > +               switch (n & 0x7) {
> > > > +               case 7:
> > > > +                       ring[idx++] = obj[i++]; /* fallthrough */
> > > > +               case 6:
> > > > +                       ring[idx++] = obj[i++]; /* fallthrough */
> > > > +               case 5:
> > > > +                       ring[idx++] = obj[i++]; /* fallthrough */
> > > > +               case 4:
> > > > +                       ring[idx++] = obj[i++]; /* fallthrough */
> > > > +               case 3:
> > > > +                       ring[idx++] = obj[i++]; /* fallthrough */
> > > > +               case 2:
> > > > +                       ring[idx++] = obj[i++]; /* fallthrough */
> > > > +               case 1:
> > > > +                       ring[idx++] = obj[i++]; /* fallthrough */
> > > > +               }
> > > > +       } else {
> > > > +               for (i = 0; idx < size; i++, idx++)
> > > > +                       ring[idx] = obj[i];
> > > > +               /* Start at the beginning */
> > > > +               for (idx = 0; i < n; i++, idx++)
> > > > +                       ring[idx] = obj[i];
> > > > +       }
> > > > +}
> > > > +
> > > > +static __rte_always_inline void
> > > > +enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
> > > > +               const void *obj_table, uint32_t n) {
> > > > +       unsigned int i;
> > > > +       const uint32_t size = r->size;
> > > > +       uint32_t idx = prod_head & r->mask;
> > > > +       uint64_t *ring = (uint64_t *)&r[1];
> > > > +       const uint64_t *obj = (const uint64_t *)obj_table;
> > > > +       if (likely(idx + n < size)) {
> > > > +               for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
> > > > +                       ring[idx] = obj[i];
> > > > +                       ring[idx + 1] = obj[i + 1];
> > > > +                       ring[idx + 2] = obj[i + 2];
> > > > +                       ring[idx + 3] = obj[i + 3];
> > > > +               }
> > > > +               switch (n & 0x3) {
> > > > +               case 3:
> > > > +                       ring[idx++] = obj[i++]; /* fallthrough */
> > > > +               case 2:
> > > > +                       ring[idx++] = obj[i++]; /* fallthrough */
> > > > +               case 1:
> > > > +                       ring[idx++] = obj[i++];
> > > > +               }
> > > > +       } else {
> > > > +               for (i = 0; idx < size; i++, idx++)
> > > > +                       ring[idx] = obj[i];
> > > > +               /* Start at the beginning */
> > > > +               for (idx = 0; i < n; i++, idx++)
> > > > +                       ring[idx] = obj[i];
> > > > +       }
> > > > +}
> > > > +
> > > > +static __rte_always_inline void
> > > > +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
> > > > +               const void *obj_table, uint32_t n) {
> > > > +       unsigned int i;
> > > > +       const uint32_t size = r->size;
> > > > +       uint32_t idx = prod_head & r->mask;
> > > > +       rte_int128_t *ring = (rte_int128_t *)&r[1];
> > > > +       const rte_int128_t *obj = (const rte_int128_t *)obj_table;
> > > > +       if (likely(idx + n < size)) {
> > > > +               for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
> > > > +                       memcpy((void *)(ring + idx),
> > > > +                               (const void *)(obj + i), 32);
> > > > +               switch (n & 0x1) {
> > > > +               case 1:
> > > > +                       memcpy((void *)(ring + idx),
> > > > +                               (const void *)(obj + i), 16);
> > > > +               }
> > > > +       } else {
> > > > +               for (i = 0; idx < size; i++, idx++)
> > > > +                       memcpy((void *)(ring + idx),
> > > > +                               (const void *)(obj + i), 16);
> > > > +               /* Start at the beginning */
> > > > +               for (idx = 0; i < n; i++, idx++)
> > > > +                       memcpy((void *)(ring + idx),
> > > > +                               (const void *)(obj + i), 16);
> > > > +       }
> > > > +}
> > > > +
> > > > +/* the actual enqueue of elements on the ring.
> > > > + * Placed here since identical code needed in both
> > > > + * single and multi producer enqueue functions.
> > > > + */
> > > > +static __rte_always_inline void
> > > > +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void
> > > *obj_table,
> > > > +               uint32_t esize, uint32_t num)
> > > > +{
> > > > +       /* 8B and 16B copies implemented individually to retain
> > > > +        * the current performance.
> > > > +        */
> > > > +       if (esize == 8)
> > > > +               enqueue_elems_64(r, prod_head, obj_table, num);
> > > > +       else if (esize == 16)
> > > > +               enqueue_elems_128(r, prod_head, obj_table, num);
> > > > +       else {
> > > > +               uint32_t idx, scale, nr_idx, nr_num, nr_size;
> > > > +
> > > > +               /* Normalize to uint32_t */
> > > > +               scale = esize / sizeof(uint32_t);
> > > > +               nr_num = num * scale;
> > > > +               idx = prod_head & r->mask;
> > > > +               nr_idx = idx * scale;
> > > > +               nr_size = r->size * scale;
> > > > +               enqueue_elems_32(r, nr_size, nr_idx, obj_table, nr_num);
> > > > +       }
> > > > +}
> > >
> > > Following Konstantin's comment on v7, enqueue_elems_128() was
> > > modified to ensure it won't crash if the object is unaligned. Are we
> > > sure that this same problem cannot also occur with 64b copies on
> > > all supported architectures? (I mean a 64b access that is only
> > > aligned on 32b.)
> > Konstantin mentioned that the 64b load/store instructions on x86 can
> > handle unaligned access.
> 
> Yep, I think we are ok here for IA and IA-32.
> 
> > On aarch64, the load/store (non-atomic, which will be used in this
> > case) can handle unaligned access.
> >
> > + David Christensen to comment for PPC
> 
> If we are in doubt here, it is probably worth adding new test case(s) to the unit tests?
I will modify one of the functional tests to use an unaligned address.

> 
> >
> > >
> > > Out of curiosity, would it make a big perf difference to only use
> > > enqueue_elems_32()?
> > Yes, this was having a significant impact on 128b elements. I did not try on
> > 64b elements.
> > I will run the perf test with 32b copy for 64b element size and get back.
Following is the data. Mostly, the 64b element copy performs better.

8B with 64b element copy                                        8B with 32b element copy
RTE>>ring_perf_autotest                                         RTE>>ring_perf_autotest

### Testing single element enq/deq ###                          ### Testing single element enq/deq ###
elem APIs: element size 8B: SP/SC: single: 10.02                elem APIs: element size 8B: SP/SC: single: 11.21
elem APIs: element size 8B: MP/MC: single: 51.21                elem APIs: element size 8B: MP/MC: single: 46.22

### Testing burst enq/deq ###                                   ### Testing burst enq/deq ###
elem APIs: element size 8B: SP/SC: burst (size: 8): 39.30       elem APIs: element size 8B: SP/SC: burst (size: 8): 62.01
elem APIs: element size 8B: SP/SC: burst (size: 32): 92.73      elem APIs: element size 8B: SP/SC: burst (size: 32): 189.15
elem APIs: element size 8B: MP/MC: burst (size: 8): 76.92       elem APIs: element size 8B: MP/MC: burst (size: 8): 109.28
elem APIs: element size 8B: MP/MC: burst (size: 32): 134.84     elem APIs: element size 8B: MP/MC: burst (size: 32): 229.98

### Testing bulk enq/deq ###                                    ### Testing bulk enq/deq ###
elem APIs: element size 8B: SP/SC: bulk (size: 8): 32.56        elem APIs: element size 8B: SP/SC: bulk (size: 8): 62.01
elem APIs: element size 8B: SP/SC: bulk (size: 32): 93.53       elem APIs: element size 8B: SP/SC: bulk (size: 32): 189.78
elem APIs: element size 8B: MP/MC: bulk (size: 8): 76.76        elem APIs: element size 8B: MP/MC: bulk (size: 8): 109.16
elem APIs: element size 8B: MP/MC: bulk (size: 32): 134.74      elem APIs: element size 8B: MP/MC: bulk (size: 32): 229.87

### Testing empty bulk deq ###                                  ### Testing empty bulk deq ###
elem APIs: element size 8B: SP/SC: bulk (size: 8): 5.00         elem APIs: element size 8B: SP/SC: bulk (size: 8): 3.00
elem APIs: element size 8B: MP/MC: bulk (size: 8): 6.00         elem APIs: element size 8B: MP/MC: bulk (size: 8): 6.00

### Testing using two physical cores ###                        ### Testing using two physical cores ###
elem APIs: element size 8B: SP/SC: bulk (size: 8): 26.33        elem APIs: element size 8B: SP/SC: bulk (size: 8): 47.03
elem APIs: element size 8B: MP/MC: bulk (size: 8): 83.22        elem APIs: element size 8B: MP/MC: bulk (size: 8): 83.12
elem APIs: element size 8B: SP/SC: bulk (size: 32): 12.48       elem APIs: element size 8B: SP/SC: bulk (size: 32): 20.73
elem APIs: element size 8B: MP/MC: bulk (size: 32): 24.53       elem APIs: element size 8B: MP/MC: bulk (size: 32): 23.18

### Testing using two NUMA nodes ###                            ### Testing using two NUMA nodes ###
elem APIs: element size 8B: SP/SC: bulk (size: 8): 54.35        elem APIs: element size 8B: SP/SC: bulk (size: 8): 92.40
elem APIs: element size 8B: MP/MC: bulk (size: 8): 197.59       elem APIs: element size 8B: MP/MC: bulk (size: 8): 198.98
elem APIs: element size 8B: SP/SC: bulk (size: 32): 36.54       elem APIs: element size 8B: SP/SC: bulk (size: 32): 57.22
elem APIs: element size 8B: MP/MC: bulk (size: 32): 72.57       elem APIs: element size 8B: MP/MC: bulk (size: 32): 65.65

Reply via email to