Add an extra parameter to the ring dequeue burst/bulk functions so that
those functions can optionally return the number of objects remaining in
the ring. Applications can use this information in a number of ways; for
instance, with single-consumer queues, it provides a maximum dequeue size
that is guaranteed to succeed.

Signed-off-by: Bruce Richardson <bruce.richard...@intel.com>
---
 app/pdump/main.c                                   |  2 +-
 app/test-pipeline/runtime.c                        |  6 +-
 app/test/test_link_bonding_mode4.c                 |  3 +-
 app/test/test_pmd_ring_perf.c                      |  7 +-
 app/test/test_ring.c                               | 54 ++++++-------
 app/test/test_ring_perf.c                          | 20 +++--
 app/test/test_table_acl.c                          |  2 +-
 app/test/test_table_pipeline.c                     |  2 +-
 app/test/test_table_ports.c                        |  8 +-
 app/test/virtual_pmd.c                             |  4 +-
 drivers/crypto/null/null_crypto_pmd.c              |  2 +-
 drivers/net/bonding/rte_eth_bond_pmd.c             |  3 +-
 drivers/net/ring/rte_eth_ring.c                    |  2 +-
 examples/distributor/main.c                        |  2 +-
 examples/load_balancer/runtime.c                   |  6 +-
 .../client_server_mp/mp_client/client.c            |  3 +-
 examples/packet_ordering/main.c                    |  6 +-
 examples/qos_sched/app_thread.c                    |  6 +-
 examples/quota_watermark/qw/main.c                 |  5 +-
 examples/server_node_efd/node/node.c               |  2 +-
 lib/librte_hash/rte_cuckoo_hash.c                  |  3 +-
 lib/librte_mempool/rte_mempool_ring.c              |  4 +-
 lib/librte_port/rte_port_frag.c                    |  3 +-
 lib/librte_port/rte_port_ring.c                    |  6 +-
 lib/librte_ring/rte_ring.h                         | 90 +++++++++++-----------
 25 files changed, 137 insertions(+), 114 deletions(-)

diff --git a/app/pdump/main.c b/app/pdump/main.c
index b88090d..3b13753 100644
--- a/app/pdump/main.c
+++ b/app/pdump/main.c
@@ -496,7 +496,7 @@ pdump_rxtx(struct rte_ring *ring, uint8_t vdev_id, struct pdump_stats *stats)
 
        /* first dequeue packets from ring of primary process */
        const uint16_t nb_in_deq = rte_ring_dequeue_burst(ring,
-                       (void *)rxtx_bufs, BURST_SIZE);
+                       (void *)rxtx_bufs, BURST_SIZE, NULL);
        stats->dequeue_pkts += nb_in_deq;
 
        if (nb_in_deq) {
diff --git a/app/test-pipeline/runtime.c b/app/test-pipeline/runtime.c
index c06ff54..8970e1c 100644
--- a/app/test-pipeline/runtime.c
+++ b/app/test-pipeline/runtime.c
@@ -121,7 +121,8 @@ app_main_loop_worker(void) {
                ret = rte_ring_sc_dequeue_bulk(
                        app.rings_rx[i],
                        (void **) worker_mbuf->array,
-                       app.burst_size_worker_read);
+                       app.burst_size_worker_read,
+                       NULL);
 
                if (ret == 0)
                        continue;
@@ -151,7 +152,8 @@ app_main_loop_tx(void) {
                ret = rte_ring_sc_dequeue_bulk(
                        app.rings_tx[i],
                        (void **) &app.mbuf_tx[i].array[n_mbufs],
-                       app.burst_size_tx_read);
+                       app.burst_size_tx_read,
+                       NULL);
 
                if (ret == 0)
                        continue;
diff --git a/app/test/test_link_bonding_mode4.c b/app/test/test_link_bonding_mode4.c
index 8df28b4..15091b1 100644
--- a/app/test/test_link_bonding_mode4.c
+++ b/app/test/test_link_bonding_mode4.c
@@ -193,7 +193,8 @@ static uint8_t lacpdu_rx_count[RTE_MAX_ETHPORTS] = {0, };
 static int
 slave_get_pkts(struct slave_conf *slave, struct rte_mbuf **buf, uint16_t size)
 {
-       return rte_ring_dequeue_burst(slave->tx_queue, (void **)buf, size);
+       return rte_ring_dequeue_burst(slave->tx_queue, (void **)buf,
+                       size, NULL);
 }
 
 /*
diff --git a/app/test/test_pmd_ring_perf.c b/app/test/test_pmd_ring_perf.c
index 045a7f2..004882a 100644
--- a/app/test/test_pmd_ring_perf.c
+++ b/app/test/test_pmd_ring_perf.c
@@ -67,7 +67,7 @@ test_empty_dequeue(void)
 
        const uint64_t sc_start = rte_rdtsc();
        for (i = 0; i < iterations; i++)
-               rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[0]);
+               rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[0], NULL);
        const uint64_t sc_end = rte_rdtsc();
 
        const uint64_t eth_start = rte_rdtsc();
@@ -99,7 +99,7 @@ test_single_enqueue_dequeue(void)
        rte_compiler_barrier();
        for (i = 0; i < iterations; i++) {
                rte_ring_enqueue_bulk(r, &burst, 1, NULL);
-               rte_ring_dequeue_bulk(r, &burst, 1);
+               rte_ring_dequeue_bulk(r, &burst, 1, NULL);
        }
        const uint64_t sc_end = rte_rdtsc_precise();
        rte_compiler_barrier();
@@ -133,7 +133,8 @@ test_bulk_enqueue_dequeue(void)
                for (i = 0; i < iterations; i++) {
                        rte_ring_sp_enqueue_bulk(r, (void *)burst,
                                        bulk_sizes[sz], NULL);
-                       rte_ring_sc_dequeue_bulk(r, (void *)burst, bulk_sizes[sz]);
+                       rte_ring_sc_dequeue_bulk(r, (void *)burst,
+                                       bulk_sizes[sz], NULL);
                }
                const uint64_t sc_end = rte_rdtsc();
 
diff --git a/app/test/test_ring.c b/app/test/test_ring.c
index aa2a711..5b61ef1 100644
--- a/app/test/test_ring.c
+++ b/app/test/test_ring.c
@@ -119,7 +119,8 @@ test_ring_basic_full_empty(void * const src[], void *dst[])
                    __func__, i, rand);
                TEST_RING_VERIFY(0 != rte_ring_enqueue_bulk(r, src,
                                rand, NULL));
-               TEST_RING_VERIFY(rand == rte_ring_dequeue_bulk(r, dst, rand));
+               TEST_RING_VERIFY(rand == rte_ring_dequeue_bulk(r, dst,
+                               rand, NULL));
 
                /* fill the ring */
                TEST_RING_VERIFY(0 != rte_ring_enqueue_bulk(r, src, rsz, NULL));
@@ -129,7 +130,8 @@ test_ring_basic_full_empty(void * const src[], void *dst[])
                TEST_RING_VERIFY(0 == rte_ring_empty(r));
 
                /* empty the ring */
-               TEST_RING_VERIFY(rsz == rte_ring_dequeue_bulk(r, dst, rsz));
+               TEST_RING_VERIFY(rsz == rte_ring_dequeue_bulk(r, dst,
+                               rsz, NULL));
                TEST_RING_VERIFY(rsz == rte_ring_free_count(r));
                TEST_RING_VERIFY(0 == rte_ring_count(r));
                TEST_RING_VERIFY(0 == rte_ring_full(r));
@@ -186,19 +188,19 @@ test_ring_basic(void)
                goto fail;
 
        printf("dequeue 1 obj\n");
-       ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 1);
+       ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 1, NULL);
        cur_dst += 1;
        if (ret == 0)
                goto fail;
 
        printf("dequeue 2 objs\n");
-       ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 2);
+       ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 2, NULL);
        cur_dst += 2;
        if (ret == 0)
                goto fail;
 
        printf("dequeue MAX_BULK objs\n");
-       ret = rte_ring_sc_dequeue_bulk(r, cur_dst, MAX_BULK);
+       ret = rte_ring_sc_dequeue_bulk(r, cur_dst, MAX_BULK, NULL);
        cur_dst += MAX_BULK;
        if (ret == 0)
                goto fail;
@@ -232,19 +234,19 @@ test_ring_basic(void)
                goto fail;
 
        printf("dequeue 1 obj\n");
-       ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 1);
+       ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 1, NULL);
        cur_dst += 1;
        if (ret == 0)
                goto fail;
 
        printf("dequeue 2 objs\n");
-       ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 2);
+       ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 2, NULL);
        cur_dst += 2;
        if (ret == 0)
                goto fail;
 
        printf("dequeue MAX_BULK objs\n");
-       ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
+       ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK, NULL);
        cur_dst += MAX_BULK;
        if (ret == 0)
                goto fail;
@@ -265,7 +267,7 @@ test_ring_basic(void)
                cur_src += MAX_BULK;
                if (ret == 0)
                        goto fail;
-               ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
+               ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK, NULL);
                cur_dst += MAX_BULK;
                if (ret == 0)
                        goto fail;
@@ -303,13 +305,13 @@ test_ring_basic(void)
                printf("Cannot enqueue\n");
                goto fail;
        }
-       ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
+       ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems, NULL);
        cur_dst += num_elems;
        if (ret == 0) {
                printf("Cannot dequeue\n");
                goto fail;
        }
-       ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
+       ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems, NULL);
        cur_dst += num_elems;
        if (ret == 0) {
                printf("Cannot dequeue2\n");
@@ -390,19 +392,19 @@ test_ring_burst_basic(void)
                goto fail;
 
        printf("dequeue 1 obj\n");
-       ret = rte_ring_sc_dequeue_burst(r, cur_dst, 1) ;
+       ret = rte_ring_sc_dequeue_burst(r, cur_dst, 1, NULL) ;
        cur_dst += 1;
        if ((ret & RTE_RING_SZ_MASK) != 1)
                goto fail;
 
        printf("dequeue 2 objs\n");
-       ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
+       ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2, NULL);
        cur_dst += 2;
        if ((ret & RTE_RING_SZ_MASK) != 2)
                goto fail;
 
        printf("dequeue MAX_BULK objs\n");
-       ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
+       ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK, NULL);
        cur_dst += MAX_BULK;
        if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
                goto fail;
@@ -451,19 +453,19 @@ test_ring_burst_basic(void)
 
        printf("Test dequeue without enough objects \n");
        for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
-               ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
+               ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK, NULL);
                cur_dst += MAX_BULK;
                if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
                        goto fail;
        }
 
        /* Available memory space for the exact MAX_BULK entries */
-       ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
+       ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2, NULL);
        cur_dst += 2;
        if ((ret & RTE_RING_SZ_MASK) != 2)
                goto fail;
 
-       ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
+       ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK, NULL);
        cur_dst += MAX_BULK - 3;
        if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
                goto fail;
@@ -505,19 +507,19 @@ test_ring_burst_basic(void)
                goto fail;
 
        printf("dequeue 1 obj\n");
-       ret = rte_ring_mc_dequeue_burst(r, cur_dst, 1);
+       ret = rte_ring_mc_dequeue_burst(r, cur_dst, 1, NULL);
        cur_dst += 1;
        if ((ret & RTE_RING_SZ_MASK) != 1)
                goto fail;
 
        printf("dequeue 2 objs\n");
-       ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
+       ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2, NULL);
        cur_dst += 2;
        if ((ret & RTE_RING_SZ_MASK) != 2)
                goto fail;
 
        printf("dequeue MAX_BULK objs\n");
-       ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
+       ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK, NULL);
        cur_dst += MAX_BULK;
        if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
                goto fail;
@@ -539,7 +541,7 @@ test_ring_burst_basic(void)
                cur_src += MAX_BULK;
                if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
                        goto fail;
-               ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
+               ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK, NULL);
                cur_dst += MAX_BULK;
                if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
                        goto fail;
@@ -578,19 +580,19 @@ test_ring_burst_basic(void)
 
        printf("Test dequeue without enough objects \n");
        for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
-               ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
+               ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK, NULL);
                cur_dst += MAX_BULK;
                if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
                        goto fail;
        }
 
        /* Available objects - the exact MAX_BULK */
-       ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
+       ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2, NULL);
        cur_dst += 2;
        if ((ret & RTE_RING_SZ_MASK) != 2)
                goto fail;
 
-       ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
+       ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK, NULL);
        cur_dst += MAX_BULK - 3;
        if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
                goto fail;
@@ -613,7 +615,7 @@ test_ring_burst_basic(void)
        if ((ret & RTE_RING_SZ_MASK) != 2)
                goto fail;
 
-       ret = rte_ring_dequeue_burst(r, cur_dst, 2);
+       ret = rte_ring_dequeue_burst(r, cur_dst, 2, NULL);
        cur_dst += 2;
        if (ret != 2)
                goto fail;
@@ -753,7 +755,7 @@ test_ring_basic_ex(void)
                goto fail_test;
        }
 
-       ret = rte_ring_dequeue_burst(rp, obj, 2);
+       ret = rte_ring_dequeue_burst(rp, obj, 2, NULL);
        if (ret != 2) {
                printf("test_ring_basic_ex: rte_ring_dequeue_burst fails \n");
                goto fail_test;
diff --git a/app/test/test_ring_perf.c b/app/test/test_ring_perf.c
index f95a8e9..ed89896 100644
--- a/app/test/test_ring_perf.c
+++ b/app/test/test_ring_perf.c
@@ -152,12 +152,12 @@ test_empty_dequeue(void)
 
        const uint64_t sc_start = rte_rdtsc();
        for (i = 0; i < iterations; i++)
-               rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[0]);
+               rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[0], NULL);
        const uint64_t sc_end = rte_rdtsc();
 
        const uint64_t mc_start = rte_rdtsc();
        for (i = 0; i < iterations; i++)
-               rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[0]);
+               rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[0], NULL);
        const uint64_t mc_end = rte_rdtsc();
 
        printf("SC empty dequeue: %.2F\n",
@@ -230,13 +230,13 @@ dequeue_bulk(void *p)
 
        const uint64_t sc_start = rte_rdtsc();
        for (i = 0; i < iterations; i++)
-               while (rte_ring_sc_dequeue_bulk(r, burst, size) == 0)
+               while (rte_ring_sc_dequeue_bulk(r, burst, size, NULL) == 0)
                        rte_pause();
        const uint64_t sc_end = rte_rdtsc();
 
        const uint64_t mc_start = rte_rdtsc();
        for (i = 0; i < iterations; i++)
-               while (rte_ring_mc_dequeue_bulk(r, burst, size) == 0)
+               while (rte_ring_mc_dequeue_bulk(r, burst, size, NULL) == 0)
                        rte_pause();
        const uint64_t mc_end = rte_rdtsc();
 
@@ -325,7 +325,8 @@ test_burst_enqueue_dequeue(void)
                for (i = 0; i < iterations; i++) {
                        rte_ring_sp_enqueue_burst(r, burst,
                                        bulk_sizes[sz], NULL);
-                       rte_ring_sc_dequeue_burst(r, burst, bulk_sizes[sz]);
+                       rte_ring_sc_dequeue_burst(r, burst,
+                                       bulk_sizes[sz], NULL);
                }
                const uint64_t sc_end = rte_rdtsc();
 
@@ -333,7 +334,8 @@ test_burst_enqueue_dequeue(void)
                for (i = 0; i < iterations; i++) {
                        rte_ring_mp_enqueue_burst(r, burst,
                                        bulk_sizes[sz], NULL);
-                       rte_ring_mc_dequeue_burst(r, burst, bulk_sizes[sz]);
+                       rte_ring_mc_dequeue_burst(r, burst,
+                                       bulk_sizes[sz], NULL);
                }
                const uint64_t mc_end = rte_rdtsc();
 
@@ -361,7 +363,8 @@ test_bulk_enqueue_dequeue(void)
                for (i = 0; i < iterations; i++) {
                        rte_ring_sp_enqueue_bulk(r, burst,
                                        bulk_sizes[sz], NULL);
-                       rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[sz]);
+                       rte_ring_sc_dequeue_bulk(r, burst,
+                                       bulk_sizes[sz], NULL);
                }
                const uint64_t sc_end = rte_rdtsc();
 
@@ -369,7 +372,8 @@ test_bulk_enqueue_dequeue(void)
                for (i = 0; i < iterations; i++) {
                        rte_ring_mp_enqueue_bulk(r, burst,
                                        bulk_sizes[sz], NULL);
-                       rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[sz]);
+                       rte_ring_mc_dequeue_bulk(r, burst,
+                                       bulk_sizes[sz], NULL);
                }
                const uint64_t mc_end = rte_rdtsc();
 
diff --git a/app/test/test_table_acl.c b/app/test/test_table_acl.c
index b3bfda4..4d43be7 100644
--- a/app/test/test_table_acl.c
+++ b/app/test/test_table_acl.c
@@ -713,7 +713,7 @@ test_pipeline_single_filter(int expected_count)
                void *objs[RING_TX_SIZE];
                struct rte_mbuf *mbuf;
 
-               ret = rte_ring_sc_dequeue_burst(rings_tx[i], objs, 10);
+               ret = rte_ring_sc_dequeue_burst(rings_tx[i], objs, 10, NULL);
                if (ret <= 0) {
                        printf("Got no objects from ring %d - error code %d\n",
                                i, ret);
diff --git a/app/test/test_table_pipeline.c b/app/test/test_table_pipeline.c
index 36bfeda..b58aa5d 100644
--- a/app/test/test_table_pipeline.c
+++ b/app/test/test_table_pipeline.c
@@ -494,7 +494,7 @@ test_pipeline_single_filter(int test_type, int expected_count)
                void *objs[RING_TX_SIZE];
                struct rte_mbuf *mbuf;
 
-               ret = rte_ring_sc_dequeue_burst(rings_tx[i], objs, 10);
+               ret = rte_ring_sc_dequeue_burst(rings_tx[i], objs, 10, NULL);
                if (ret <= 0)
                        printf("Got no objects from ring %d - error code %d\n",
                                i, ret);
diff --git a/app/test/test_table_ports.c b/app/test/test_table_ports.c
index 395f4f3..39592ce 100644
--- a/app/test/test_table_ports.c
+++ b/app/test/test_table_ports.c
@@ -163,7 +163,7 @@ test_port_ring_writer(void)
        rte_port_ring_writer_ops.f_flush(port);
        expected_pkts = 1;
        received_pkts = rte_ring_sc_dequeue_burst(port_ring_writer_params.ring,
-               (void **)res_mbuf, port_ring_writer_params.tx_burst_sz);
+               (void **)res_mbuf, port_ring_writer_params.tx_burst_sz, NULL);
 
        if (received_pkts < expected_pkts)
                return -7;
@@ -178,7 +178,7 @@ test_port_ring_writer(void)
 
        expected_pkts = RTE_PORT_IN_BURST_SIZE_MAX;
        received_pkts = rte_ring_sc_dequeue_burst(port_ring_writer_params.ring,
-               (void **)res_mbuf, port_ring_writer_params.tx_burst_sz);
+               (void **)res_mbuf, port_ring_writer_params.tx_burst_sz, NULL);
 
        if (received_pkts < expected_pkts)
                return -8;
@@ -193,7 +193,7 @@ test_port_ring_writer(void)
 
        expected_pkts = RTE_PORT_IN_BURST_SIZE_MAX;
        received_pkts = rte_ring_sc_dequeue_burst(port_ring_writer_params.ring,
-               (void **)res_mbuf, port_ring_writer_params.tx_burst_sz);
+               (void **)res_mbuf, port_ring_writer_params.tx_burst_sz, NULL);
 
        if (received_pkts < expected_pkts)
                return -8;
@@ -208,7 +208,7 @@ test_port_ring_writer(void)
 
        expected_pkts = RTE_PORT_IN_BURST_SIZE_MAX;
        received_pkts = rte_ring_sc_dequeue_burst(port_ring_writer_params.ring,
-               (void **)res_mbuf, port_ring_writer_params.tx_burst_sz);
+               (void **)res_mbuf, port_ring_writer_params.tx_burst_sz, NULL);
 
        if (received_pkts < expected_pkts)
                return -9;
diff --git a/app/test/virtual_pmd.c b/app/test/virtual_pmd.c
index 39e070c..b209355 100644
--- a/app/test/virtual_pmd.c
+++ b/app/test/virtual_pmd.c
@@ -342,7 +342,7 @@ virtual_ethdev_rx_burst_success(void *queue __rte_unused,
        dev_private = vrtl_eth_dev->data->dev_private;
 
        rx_count = rte_ring_dequeue_burst(dev_private->rx_queue, (void **) bufs,
-                       nb_pkts);
+                       nb_pkts, NULL);
 
        /* increments ipackets count */
        dev_private->eth_stats.ipackets += rx_count;
@@ -508,7 +508,7 @@ virtual_ethdev_get_mbufs_from_tx_queue(uint8_t port_id,
 
        dev_private = vrtl_eth_dev->data->dev_private;
        return rte_ring_dequeue_burst(dev_private->tx_queue, (void **)pkt_burst,
-               burst_length);
+               burst_length, NULL);
 }
 
 static uint8_t
diff --git a/drivers/crypto/null/null_crypto_pmd.c b/drivers/crypto/null/null_crypto_pmd.c
index ed5a9fc..f68ec8d 100644
--- a/drivers/crypto/null/null_crypto_pmd.c
+++ b/drivers/crypto/null/null_crypto_pmd.c
@@ -155,7 +155,7 @@ null_crypto_pmd_dequeue_burst(void *queue_pair, struct rte_crypto_op **ops,
        unsigned nb_dequeued;
 
        nb_dequeued = rte_ring_dequeue_burst(qp->processed_pkts,
-                       (void **)ops, nb_ops);
+                       (void **)ops, nb_ops, NULL);
        qp->qp_stats.dequeued_count += nb_dequeued;
 
        return nb_dequeued;
diff --git a/drivers/net/bonding/rte_eth_bond_pmd.c b/drivers/net/bonding/rte_eth_bond_pmd.c
index f3ac9e2..96638af 100644
--- a/drivers/net/bonding/rte_eth_bond_pmd.c
+++ b/drivers/net/bonding/rte_eth_bond_pmd.c
@@ -1008,7 +1008,8 @@ bond_ethdev_tx_burst_8023ad(void *queue, struct rte_mbuf **bufs,
                struct port *port = &mode_8023ad_ports[slaves[i]];
 
                slave_slow_nb_pkts[i] = rte_ring_dequeue_burst(port->tx_ring,
-                               slow_pkts, BOND_MODE_8023AX_SLAVE_TX_PKTS);
+                               slow_pkts, BOND_MODE_8023AX_SLAVE_TX_PKTS,
+                               NULL);
                slave_nb_pkts[i] = slave_slow_nb_pkts[i];
 
                for (j = 0; j < slave_slow_nb_pkts[i]; j++)
diff --git a/drivers/net/ring/rte_eth_ring.c b/drivers/net/ring/rte_eth_ring.c
index adbf478..77ef3a1 100644
--- a/drivers/net/ring/rte_eth_ring.c
+++ b/drivers/net/ring/rte_eth_ring.c
@@ -88,7 +88,7 @@ eth_ring_rx(void *q, struct rte_mbuf **bufs, uint16_t nb_bufs)
        void **ptrs = (void *)&bufs[0];
        struct ring_queue *r = q;
        const uint16_t nb_rx = (uint16_t)rte_ring_dequeue_burst(r->rng,
-                       ptrs, nb_bufs);
+                       ptrs, nb_bufs, NULL);
        if (r->rng->flags & RING_F_SC_DEQ)
                r->rx_pkts.cnt += nb_rx;
        else
diff --git a/examples/distributor/main.c b/examples/distributor/main.c
index cfd360b..5cb6185 100644
--- a/examples/distributor/main.c
+++ b/examples/distributor/main.c
@@ -330,7 +330,7 @@ lcore_tx(struct rte_ring *in_r)
 
                        struct rte_mbuf *bufs[BURST_SIZE];
                        const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
-                                       (void *)bufs, BURST_SIZE);
+                                       (void *)bufs, BURST_SIZE, NULL);
                        app_stats.tx.dequeue_pkts += nb_rx;
 
                        /* if we get no traffic, flush anything we have */
diff --git a/examples/load_balancer/runtime.c b/examples/load_balancer/runtime.c
index 1645994..8192c08 100644
--- a/examples/load_balancer/runtime.c
+++ b/examples/load_balancer/runtime.c
@@ -349,7 +349,8 @@ app_lcore_io_tx(
                        ret = rte_ring_sc_dequeue_bulk(
                                ring,
                                (void **) &lp->tx.mbuf_out[port].array[n_mbufs],
-                               bsz_rd);
+                               bsz_rd,
+                               NULL);
 
                        if (unlikely(ret == 0))
                                continue;
@@ -504,7 +505,8 @@ app_lcore_worker(
                ret = rte_ring_sc_dequeue_bulk(
                        ring_in,
                        (void **) lp->mbuf_in.array,
-                       bsz_rd);
+                       bsz_rd,
+                       NULL);
 
                if (unlikely(ret == 0))
                        continue;
diff --git a/examples/multi_process/client_server_mp/mp_client/client.c b/examples/multi_process/client_server_mp/mp_client/client.c
index dca9eb9..01b535c 100644
--- a/examples/multi_process/client_server_mp/mp_client/client.c
+++ b/examples/multi_process/client_server_mp/mp_client/client.c
@@ -279,7 +279,8 @@ main(int argc, char *argv[])
                uint16_t i, rx_pkts;
                uint8_t port;
 
-               rx_pkts = rte_ring_dequeue_burst(rx_ring, pkts, PKT_READ_SIZE);
+               rx_pkts = rte_ring_dequeue_burst(rx_ring, pkts,
+                               PKT_READ_SIZE, NULL);
 
                if (unlikely(rx_pkts == 0)){
                        if (need_flush)
diff --git a/examples/packet_ordering/main.c b/examples/packet_ordering/main.c
index d268350..7719dad 100644
--- a/examples/packet_ordering/main.c
+++ b/examples/packet_ordering/main.c
@@ -462,7 +462,7 @@ worker_thread(void *args_ptr)
 
                /* dequeue the mbufs from rx_to_workers ring */
                burst_size = rte_ring_dequeue_burst(ring_in,
-                               (void *)burst_buffer, MAX_PKTS_BURST);
+                               (void *)burst_buffer, MAX_PKTS_BURST, NULL);
                if (unlikely(burst_size == 0))
                        continue;
 
@@ -510,7 +510,7 @@ send_thread(struct send_thread_args *args)
 
                /* deque the mbufs from workers_to_tx ring */
                nb_dq_mbufs = rte_ring_dequeue_burst(args->ring_in,
-                               (void *)mbufs, MAX_PKTS_BURST);
+                               (void *)mbufs, MAX_PKTS_BURST, NULL);
 
                if (unlikely(nb_dq_mbufs == 0))
                        continue;
@@ -595,7 +595,7 @@ tx_thread(struct rte_ring *ring_in)
 
                /* deque the mbufs from workers_to_tx ring */
                dqnum = rte_ring_dequeue_burst(ring_in,
-                               (void *)mbufs, MAX_PKTS_BURST);
+                               (void *)mbufs, MAX_PKTS_BURST, NULL);
 
                if (unlikely(dqnum == 0))
                        continue;
diff --git a/examples/qos_sched/app_thread.c b/examples/qos_sched/app_thread.c
index 0c81a15..15f117f 100644
--- a/examples/qos_sched/app_thread.c
+++ b/examples/qos_sched/app_thread.c
@@ -179,7 +179,7 @@ app_tx_thread(struct thread_conf **confs)
 
        while ((conf = confs[conf_idx])) {
                retval = rte_ring_sc_dequeue_bulk(conf->tx_ring, (void **)mbufs,
-                                       burst_conf.qos_dequeue);
+                                       burst_conf.qos_dequeue, NULL);
                if (likely(retval != 0)) {
                        app_send_packets(conf, mbufs, burst_conf.qos_dequeue);
 
@@ -218,7 +218,7 @@ app_worker_thread(struct thread_conf **confs)
 
                /* Read packet from the ring */
                 nb_pkt = rte_ring_sc_dequeue_burst(conf->rx_ring, (void **)mbufs,
-                                       burst_conf.ring_burst);
+                                       burst_conf.ring_burst, NULL);
                if (likely(nb_pkt)) {
                         int nb_sent = rte_sched_port_enqueue(conf->sched_port, mbufs,
                                        nb_pkt);
@@ -254,7 +254,7 @@ app_mixed_thread(struct thread_conf **confs)
 
                /* Read packet from the ring */
                 nb_pkt = rte_ring_sc_dequeue_burst(conf->rx_ring, (void **)mbufs,
-                                       burst_conf.ring_burst);
+                                       burst_conf.ring_burst, NULL);
                if (likely(nb_pkt)) {
                         int nb_sent = rte_sched_port_enqueue(conf->sched_port, mbufs,
                                        nb_pkt);
diff --git a/examples/quota_watermark/qw/main.c b/examples/quota_watermark/qw/main.c
index 8fb7eb1..ef39053 100644
--- a/examples/quota_watermark/qw/main.c
+++ b/examples/quota_watermark/qw/main.c
@@ -243,7 +243,7 @@ pipeline_stage(__attribute__((unused)) void *args)
             }
 
             /* Dequeue up to quota mbuf from rx */
-            nb_dq_pkts = rte_ring_dequeue_burst(rx, pkts, *quota);
+            nb_dq_pkts = rte_ring_dequeue_burst(rx, pkts, *quota, NULL);
             if (unlikely(nb_dq_pkts < 0))
                 continue;
 
@@ -297,7 +297,8 @@ send_stage(__attribute__((unused)) void *args)
                 continue;
 
             /* Dequeue packets from tx and send them */
-            nb_dq_pkts = (uint16_t) rte_ring_dequeue_burst(tx, (void *) tx_pkts, *quota);
+            nb_dq_pkts = (uint16_t) rte_ring_dequeue_burst(tx, (void *) tx_pkts,
+                           *quota, NULL);
             rte_eth_tx_burst(dest_port_id, 0, tx_pkts, nb_dq_pkts);
 
             /* TODO: Check if nb_dq_pkts == nb_tx_pkts? */
diff --git a/examples/server_node_efd/node/node.c b/examples/server_node_efd/node/node.c
index 9ec6a05..f780b92 100644
--- a/examples/server_node_efd/node/node.c
+++ b/examples/server_node_efd/node/node.c
@@ -392,7 +392,7 @@ main(int argc, char *argv[])
                 */
                while (rx_pkts > 0 &&
                                unlikely(rte_ring_dequeue_bulk(rx_ring, pkts,
-                                       rx_pkts) == 0))
+                                       rx_pkts, NULL) == 0))
                        rx_pkts = (uint16_t)RTE_MIN(rte_ring_count(rx_ring),
                                        PKT_READ_SIZE);
 
diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index 6552199..645c0cf 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -536,7 +536,8 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
                if (cached_free_slots->len == 0) {
                         /* Need to get another burst of free slots from global ring */
                        n_slots = rte_ring_mc_dequeue_burst(h->free_slots,
-                                       cached_free_slots->objs, LCORE_CACHE_SIZE);
+                                       cached_free_slots->objs,
+                                       LCORE_CACHE_SIZE, NULL);
                        if (n_slots == 0)
                                return -ENOSPC;
 
diff --git a/lib/librte_mempool/rte_mempool_ring.c b/lib/librte_mempool/rte_mempool_ring.c
index 9b8fd2b..5c132bf 100644
--- a/lib/librte_mempool/rte_mempool_ring.c
+++ b/lib/librte_mempool/rte_mempool_ring.c
@@ -58,14 +58,14 @@ static int
 common_ring_mc_dequeue(struct rte_mempool *mp, void **obj_table, unsigned n)
 {
        return rte_ring_mc_dequeue_bulk(mp->pool_data,
-                       obj_table, n) == 0 ? -ENOBUFS : 0;
+                       obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
 }
 
 static int
 common_ring_sc_dequeue(struct rte_mempool *mp, void **obj_table, unsigned n)
 {
        return rte_ring_sc_dequeue_bulk(mp->pool_data,
-                       obj_table, n) == 0 ? -ENOBUFS : 0;
+                       obj_table, n, NULL) == 0 ? -ENOBUFS : 0;
 }
 
 static unsigned
diff --git a/lib/librte_port/rte_port_frag.c b/lib/librte_port/rte_port_frag.c
index 0fcace9..320407e 100644
--- a/lib/librte_port/rte_port_frag.c
+++ b/lib/librte_port/rte_port_frag.c
@@ -186,7 +186,8 @@ rte_port_ring_reader_frag_rx(void *port,
                /* If "pkts" buffer is empty, read packet burst from ring */
                if (p->n_pkts == 0) {
                        p->n_pkts = rte_ring_sc_dequeue_burst(p->ring,
-                               (void **) p->pkts, RTE_PORT_IN_BURST_SIZE_MAX);
+                               (void **) p->pkts, RTE_PORT_IN_BURST_SIZE_MAX,
+                               NULL);
                        RTE_PORT_RING_READER_FRAG_STATS_PKTS_IN_ADD(p, p->n_pkts);
                        if (p->n_pkts == 0)
                                return n_pkts_out;
diff --git a/lib/librte_port/rte_port_ring.c b/lib/librte_port/rte_port_ring.c
index 9fadac7..492b0e7 100644
--- a/lib/librte_port/rte_port_ring.c
+++ b/lib/librte_port/rte_port_ring.c
@@ -111,7 +111,8 @@ rte_port_ring_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts)
        struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
        uint32_t nb_rx;
 
-       nb_rx = rte_ring_sc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
+       nb_rx = rte_ring_sc_dequeue_burst(p->ring, (void **) pkts,
+                       n_pkts, NULL);
        RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
 
        return nb_rx;
@@ -124,7 +125,8 @@ rte_port_ring_multi_reader_rx(void *port, struct rte_mbuf **pkts,
        struct rte_port_ring_reader *p = (struct rte_port_ring_reader *) port;
        uint32_t nb_rx;
 
-       nb_rx = rte_ring_mc_dequeue_burst(p->ring, (void **) pkts, n_pkts);
+       nb_rx = rte_ring_mc_dequeue_burst(p->ring, (void **) pkts,
+                       n_pkts, NULL);
        RTE_PORT_RING_READER_STATS_PKTS_IN_ADD(p, nb_rx);
 
        return nb_rx;
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 2f8995c..b6123ba 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -497,7 +497,8 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 
 static inline unsigned int __attribute__((always_inline))
 __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
-                unsigned n, enum rte_ring_queue_behavior behavior)
+                unsigned int n, enum rte_ring_queue_behavior behavior,
+                unsigned int *available)
 {
        uint32_t cons_head, prod_tail;
        uint32_t cons_next, entries;
@@ -506,11 +507,6 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
        unsigned int i;
        uint32_t mask = r->mask;
 
-       /* Avoid the unnecessary cmpset operation below, which is also
-        * potentially harmful when n equals 0. */
-       if (n == 0)
-               return 0;
-
        /* move cons.head atomically */
        do {
                /* Restore n as it may change every loop */
@@ -525,15 +521,11 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
                entries = (prod_tail - cons_head);
 
                /* Set the actual entries for dequeue */
-               if (n > entries) {
-                       if (behavior == RTE_RING_QUEUE_FIXED)
-                               return 0;
-                       else {
-                               if (unlikely(entries == 0))
-                                       return 0;
-                               n = entries;
-                       }
-               }
+               if (n > entries)
+                       n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : entries;
+
+               if (unlikely(n == 0))
+                       goto end;
 
                cons_next = cons_head + n;
                success = rte_atomic32_cmpset(&r->cons.head, cons_head,
@@ -552,7 +544,9 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
                rte_pause();
 
        r->cons.tail = cons_next;
-
+end:
+       if (available != NULL)
+               *available = entries - n;
        return n;
 }
 
@@ -581,7 +575,8 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
  */
 static inline unsigned int __attribute__((always_inline))
 __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
-                unsigned n, enum rte_ring_queue_behavior behavior)
+                unsigned int n, enum rte_ring_queue_behavior behavior,
+                unsigned int *available)
 {
        uint32_t cons_head, prod_tail;
        uint32_t cons_next, entries;
@@ -596,15 +591,11 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
         * and size(ring)-1. */
        entries = prod_tail - cons_head;
 
-       if (n > entries) {
-               if (behavior == RTE_RING_QUEUE_FIXED)
-                       return 0;
-               else {
-                       if (unlikely(entries == 0))
-                               return 0;
-                       n = entries;
-               }
-       }
+       if (n > entries)
+               n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : entries;
+       /* Nothing to dequeue: skip the head/tail update */
+       if (unlikely(n == 0))
+               goto end;
 
        cons_next = cons_head + n;
        r->cons.head = cons_next;
@@ -614,6 +605,9 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
        rte_smp_rmb();
 
        r->cons.tail = cons_next;
+end:
+       if (available != NULL)
+               *available = entries - n;
        return n;
 }
 
@@ -760,9 +754,11 @@ rte_ring_enqueue(struct rte_ring *r, void *obj)
  *   The number of objects dequeued, either 0 or n
  */
 static inline unsigned int __attribute__((always_inline))
-rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
+rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
+               unsigned int n, unsigned int *available)
 {
-       return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+       return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+                       available);
 }
 
 /**
@@ -779,9 +775,11 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
  *   The number of objects dequeued, either 0 or n
  */
 static inline unsigned int __attribute__((always_inline))
-rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
+rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
+               unsigned int n, unsigned int *available)
 {
-       return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+       return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+                       available);
 }
 
 /**
@@ -801,12 +799,13 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
  *   The number of objects dequeued, either 0 or n
  */
 static inline unsigned int __attribute__((always_inline))
-rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
+rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
+               unsigned int *available)
 {
        if (r->cons.sc_dequeue)
-               return rte_ring_sc_dequeue_bulk(r, obj_table, n);
+               return rte_ring_sc_dequeue_bulk(r, obj_table, n, available);
        else
-               return rte_ring_mc_dequeue_bulk(r, obj_table, n);
+               return rte_ring_mc_dequeue_bulk(r, obj_table, n, available);
 }
 
 /**
@@ -827,7 +826,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 static inline int __attribute__((always_inline))
 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
 {
-       return rte_ring_mc_dequeue_bulk(r, obj_p, 1)  ? 0 : -ENOBUFS;
+       return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL)  ? 0 : -ENOBUFS;
 }
 
 /**
@@ -845,7 +844,7 @@ rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
 static inline int __attribute__((always_inline))
 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
 {
-       return rte_ring_sc_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
+       return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -867,7 +866,7 @@ rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
 static inline int __attribute__((always_inline))
 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 {
-       return rte_ring_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
+       return rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -1057,9 +1056,11 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  *   - n: Actual number of objects dequeued, 0 if ring is empty
  */
 static inline unsigned __attribute__((always_inline))
-rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
+rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
+               unsigned int n, unsigned int *available)
 {
-       return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+       return __rte_ring_mc_do_dequeue(r, obj_table, n,
+                       RTE_RING_QUEUE_VARIABLE, available);
 }
 
 /**
@@ -1077,9 +1078,11 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
  *   - n: Actual number of objects dequeued, 0 if ring is empty
  */
 static inline unsigned __attribute__((always_inline))
-rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
+rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
+               unsigned int n, unsigned int *available)
 {
-       return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+       return __rte_ring_sc_do_dequeue(r, obj_table, n,
+                       RTE_RING_QUEUE_VARIABLE, available);
 }
 
 /**
@@ -1099,12 +1102,13 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
  *   - Number of objects dequeued
  */
 static inline unsigned __attribute__((always_inline))
-rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
+rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
+               unsigned int n, unsigned int *available)
 {
        if (r->cons.sc_dequeue)
-               return rte_ring_sc_dequeue_burst(r, obj_table, n);
+               return rte_ring_sc_dequeue_burst(r, obj_table, n, available);
        else
-               return rte_ring_mc_dequeue_burst(r, obj_table, n);
+               return rte_ring_mc_dequeue_burst(r, obj_table, n, available);
 }
 
 #ifdef __cplusplus
-- 
2.9.3

Reply via email to