Calling Rx/Tx functions on a stopped queue is not supported.
Do not run packet forwarding for streams that use stopped queues.

Each stream has a read-only "disabled" field,
so that lcore function can skip such streams.
Forwarding engines can set this field
using a new "stream_init" callback function
by checking relevant queue states.
A helper function is provided to check if a given
Rx queue, Tx queue, or both of them are stopped.

Fixes: 5f4ec54f1d16 ("testpmd: queue start and stop")
Cc: sta...@dpdk.org

Signed-off-by: Dmitry Kozlyuk <dkozl...@nvidia.com>
Acked-by: Matan Azrad <ma...@nvidia.com>
---
 app/test-pmd/5tswap.c         | 13 ++++++++
 app/test-pmd/csumonly.c       | 13 ++++++++
 app/test-pmd/flowgen.c        | 13 ++++++++
 app/test-pmd/icmpecho.c       | 13 ++++++++
 app/test-pmd/ieee1588fwd.c    | 13 ++++++++
 app/test-pmd/iofwd.c          | 13 ++++++++
 app/test-pmd/macfwd.c         | 13 ++++++++
 app/test-pmd/noisy_vnf.c      | 13 ++++++++
 app/test-pmd/rxonly.c         | 13 ++++++++
 app/test-pmd/shared_rxq_fwd.c | 13 ++++++++
 app/test-pmd/testpmd.c        | 57 ++++++++++++++++++++++++++++++++++-
 app/test-pmd/testpmd.h        |  4 +++
 app/test-pmd/txonly.c         | 13 ++++++++
 13 files changed, 203 insertions(+), 1 deletion(-)

diff --git a/app/test-pmd/5tswap.c b/app/test-pmd/5tswap.c
index 629d3e0d31..2aa1f1843b 100644
--- a/app/test-pmd/5tswap.c
+++ b/app/test-pmd/5tswap.c
@@ -185,9 +185,22 @@ pkt_burst_5tuple_swap(struct fwd_stream *fs)
        get_end_cycles(fs, start_tsc);
 }
 
+static int
+stream_init_5tuple_swap(struct fwd_stream *fs)
+{
+       bool rx_stopped, tx_stopped;
+       int ret;
+
+       ret = fwd_stream_get_stopped_queues(fs, &rx_stopped, &tx_stopped);
+       if (ret == 0)
+               fs->disabled = rx_stopped || tx_stopped;
+       return ret;
+}
+
 struct fwd_engine five_tuple_swap_fwd_engine = {
        .fwd_mode_name  = "5tswap",
        .port_fwd_begin = NULL,
        .port_fwd_end   = NULL,
+       .stream_init    = stream_init_5tuple_swap,
        .packet_fwd     = pkt_burst_5tuple_swap,
 };
diff --git a/app/test-pmd/csumonly.c b/app/test-pmd/csumonly.c
index 5274d498ee..a031cae2ca 100644
--- a/app/test-pmd/csumonly.c
+++ b/app/test-pmd/csumonly.c
@@ -1178,9 +1178,22 @@ pkt_burst_checksum_forward(struct fwd_stream *fs)
        get_end_cycles(fs, start_tsc);
 }
 
+static int
+stream_init_checksum_forward(struct fwd_stream *fs)
+{
+       bool rx_stopped, tx_stopped;
+       int ret;
+
+       ret = fwd_stream_get_stopped_queues(fs, &rx_stopped, &tx_stopped);
+       if (ret == 0)
+               fs->disabled = rx_stopped || tx_stopped;
+       return ret;
+}
+
 struct fwd_engine csum_fwd_engine = {
        .fwd_mode_name  = "csum",
        .port_fwd_begin = NULL,
        .port_fwd_end   = NULL,
+       .stream_init    = stream_init_checksum_forward,
        .packet_fwd     = pkt_burst_checksum_forward,
 };
diff --git a/app/test-pmd/flowgen.c b/app/test-pmd/flowgen.c
index 9ceef3b54a..e2c1bfd82c 100644
--- a/app/test-pmd/flowgen.c
+++ b/app/test-pmd/flowgen.c
@@ -207,9 +207,22 @@ flowgen_begin(portid_t pi)
        return 0;
 }
 
+static int
+flowgen_stream_init(struct fwd_stream *fs)
+{
+       bool rx_stopped, tx_stopped;
+       int ret;
+
+       ret = fwd_stream_get_stopped_queues(fs, &rx_stopped, &tx_stopped);
+       if (ret == 0)
+               fs->disabled = rx_stopped || tx_stopped;
+       return ret;
+}
+
 struct fwd_engine flow_gen_engine = {
        .fwd_mode_name  = "flowgen",
        .port_fwd_begin = flowgen_begin,
        .port_fwd_end   = NULL,
+       .stream_init    = flowgen_stream_init,
        .packet_fwd     = pkt_burst_flow_gen,
 };
diff --git a/app/test-pmd/icmpecho.c b/app/test-pmd/icmpecho.c
index 99c94cb282..dd3699ff3b 100644
--- a/app/test-pmd/icmpecho.c
+++ b/app/test-pmd/icmpecho.c
@@ -512,9 +512,22 @@ reply_to_icmp_echo_rqsts(struct fwd_stream *fs)
        get_end_cycles(fs, start_tsc);
 }
 
+static int
+icmpecho_stream_init(struct fwd_stream *fs)
+{
+       bool rx_stopped, tx_stopped;
+       int ret;
+
+       ret = fwd_stream_get_stopped_queues(fs, &rx_stopped, &tx_stopped);
+       if (ret == 0)
+               fs->disabled = rx_stopped || tx_stopped;
+       return ret;
+}
+
 struct fwd_engine icmp_echo_engine = {
        .fwd_mode_name  = "icmpecho",
        .port_fwd_begin = NULL,
        .port_fwd_end   = NULL,
+       .stream_init    = icmpecho_stream_init,
        .packet_fwd     = reply_to_icmp_echo_rqsts,
 };
diff --git a/app/test-pmd/ieee1588fwd.c b/app/test-pmd/ieee1588fwd.c
index 9ff817aa68..f9f73f2c14 100644
--- a/app/test-pmd/ieee1588fwd.c
+++ b/app/test-pmd/ieee1588fwd.c
@@ -211,9 +211,22 @@ port_ieee1588_fwd_end(portid_t pi)
        rte_eth_timesync_disable(pi);
 }
 
+static int
+port_ieee1588_stream_init(struct fwd_stream *fs)
+{
+       bool rx_stopped, tx_stopped;
+       int ret;
+
+       ret = fwd_stream_get_stopped_queues(fs, &rx_stopped, &tx_stopped);
+       if (ret == 0)
+               fs->disabled = rx_stopped || tx_stopped;
+       return ret;
+}
+
 struct fwd_engine ieee1588_fwd_engine = {
        .fwd_mode_name  = "ieee1588",
        .port_fwd_begin = port_ieee1588_fwd_begin,
        .port_fwd_end   = port_ieee1588_fwd_end,
+       .stream_init    = port_ieee1588_stream_init,
        .packet_fwd     = ieee1588_packet_fwd,
 };
diff --git a/app/test-pmd/iofwd.c b/app/test-pmd/iofwd.c
index 19cd920f70..b736a2a3bc 100644
--- a/app/test-pmd/iofwd.c
+++ b/app/test-pmd/iofwd.c
@@ -88,9 +88,22 @@ pkt_burst_io_forward(struct fwd_stream *fs)
        get_end_cycles(fs, start_tsc);
 }
 
+static int
+stream_init_forward(struct fwd_stream *fs)
+{
+       bool rx_stopped, tx_stopped;
+       int ret;
+
+       ret = fwd_stream_get_stopped_queues(fs, &rx_stopped, &tx_stopped);
+       if (ret == 0)
+               fs->disabled = rx_stopped || tx_stopped;
+       return ret;
+}
+
 struct fwd_engine io_fwd_engine = {
        .fwd_mode_name  = "io",
        .port_fwd_begin = NULL,
        .port_fwd_end   = NULL,
+       .stream_init    = stream_init_forward,
        .packet_fwd     = pkt_burst_io_forward,
 };
diff --git a/app/test-pmd/macfwd.c b/app/test-pmd/macfwd.c
index 812a0c721f..64b65c8c51 100644
--- a/app/test-pmd/macfwd.c
+++ b/app/test-pmd/macfwd.c
@@ -119,9 +119,22 @@ pkt_burst_mac_forward(struct fwd_stream *fs)
        get_end_cycles(fs, start_tsc);
 }
 
+static int
+stream_init_mac_forward(struct fwd_stream *fs)
+{
+       bool rx_stopped, tx_stopped;
+       int ret;
+
+       ret = fwd_stream_get_stopped_queues(fs, &rx_stopped, &tx_stopped);
+       if (ret == 0)
+               fs->disabled = rx_stopped || tx_stopped;
+       return ret;
+}
+
 struct fwd_engine mac_fwd_engine = {
        .fwd_mode_name  = "mac",
        .port_fwd_begin = NULL,
        .port_fwd_end   = NULL,
+       .stream_init    = stream_init_mac_forward,
        .packet_fwd     = pkt_burst_mac_forward,
 };
diff --git a/app/test-pmd/noisy_vnf.c b/app/test-pmd/noisy_vnf.c
index e4434bea95..58f53212a4 100644
--- a/app/test-pmd/noisy_vnf.c
+++ b/app/test-pmd/noisy_vnf.c
@@ -277,9 +277,22 @@ noisy_fwd_begin(portid_t pi)
        return 0;
 }
 
+static int
+stream_init_noisy_vnf(struct fwd_stream *fs)
+{
+       bool rx_stopped, tx_stopped;
+       int ret;
+
+       ret = fwd_stream_get_stopped_queues(fs, &rx_stopped, &tx_stopped);
+       if (ret == 0)
+               fs->disabled = rx_stopped || tx_stopped;
+       return ret;
+}
+
 struct fwd_engine noisy_vnf_engine = {
        .fwd_mode_name  = "noisy",
        .port_fwd_begin = noisy_fwd_begin,
        .port_fwd_end   = noisy_fwd_end,
+       .stream_init    = stream_init_noisy_vnf,
        .packet_fwd     = pkt_burst_noisy_vnf,
 };
diff --git a/app/test-pmd/rxonly.c b/app/test-pmd/rxonly.c
index d1a579d8d8..945ea2d27a 100644
--- a/app/test-pmd/rxonly.c
+++ b/app/test-pmd/rxonly.c
@@ -68,9 +68,22 @@ pkt_burst_receive(struct fwd_stream *fs)
        get_end_cycles(fs, start_tsc);
 }
 
+static int
+stream_init_receive(struct fwd_stream *fs)
+{
+       bool rx_stopped;
+       int ret;
+
+       ret = fwd_stream_get_stopped_queues(fs, &rx_stopped, NULL);
+       if (ret == 0)
+               fs->disabled = rx_stopped;
+       return ret;
+}
+
 struct fwd_engine rx_only_engine = {
        .fwd_mode_name  = "rxonly",
        .port_fwd_begin = NULL,
        .port_fwd_end   = NULL,
+       .stream_init    = stream_init_receive,
        .packet_fwd     = pkt_burst_receive,
 };
diff --git a/app/test-pmd/shared_rxq_fwd.c b/app/test-pmd/shared_rxq_fwd.c
index da54a383fd..9389df2627 100644
--- a/app/test-pmd/shared_rxq_fwd.c
+++ b/app/test-pmd/shared_rxq_fwd.c
@@ -107,9 +107,22 @@ shared_rxq_fwd(struct fwd_stream *fs)
        get_end_cycles(fs, start_tsc);
 }
 
+static int
+shared_rxq_stream_init(struct fwd_stream *fs)
+{
+       bool rx_stopped;
+       int ret;
+
+       ret = fwd_stream_get_stopped_queues(fs, &rx_stopped, NULL);
+       if (ret == 0)
+               fs->disabled = rx_stopped;
+       return ret;
+}
+
 struct fwd_engine shared_rxq_engine = {
        .fwd_mode_name  = "shared_rxq",
        .port_fwd_begin = NULL,
        .port_fwd_end   = NULL,
+       .stream_init    = shared_rxq_stream_init,
        .packet_fwd     = shared_rxq_fwd,
 };
diff --git a/app/test-pmd/testpmd.c b/app/test-pmd/testpmd.c
index fe2ce19f99..b3e360121a 100644
--- a/app/test-pmd/testpmd.c
+++ b/app/test-pmd/testpmd.c
@@ -1763,6 +1763,37 @@ reconfig(portid_t new_port_id, unsigned socket_id)
        init_port_config();
 }
 
+int
+fwd_stream_get_stopped_queues(struct fwd_stream *fs, bool *rx, bool *tx)
+{
+       struct rte_eth_rxq_info rx_qinfo;
+       struct rte_eth_txq_info tx_qinfo;
+       int ret;
+
+       if (rx != NULL) {
+               ret = rte_eth_rx_queue_info_get(fs->rx_port, fs->rx_queue,
+                                               &rx_qinfo);
+               if (ret < 0) {
+                       RTE_LOG(ERR, USER1, "Cannot get port %d RX queue %d 
info: %s\n",
+                               fs->rx_port, fs->rx_queue,
+                               rte_strerror(rte_errno));
+                       return ret;
+               }
+               *rx = rx_qinfo.queue_state == RTE_ETH_QUEUE_STATE_STOPPED;
+       }
+       if (tx != NULL) {
+               ret = rte_eth_tx_queue_info_get(fs->tx_port, fs->tx_queue,
+                                               &tx_qinfo);
+               if (ret < 0) {
+                       TESTPMD_LOG(ERR, "Cannot get port %d TX queue %d info: 
%s\n",
+                                   fs->tx_port, fs->tx_queue,
+                                   rte_strerror(rte_errno));
+                       return ret;
+               }
+               *tx = tx_qinfo.queue_state == RTE_ETH_QUEUE_STATE_STOPPED;
+       }
+       return 0;
+}
 
 int
 init_fwd_streams(void)
@@ -2155,6 +2186,21 @@ flush_fwd_rx_queues(void)
        for (j = 0; j < 2; j++) {
                for (rxp = 0; rxp < cur_fwd_config.nb_fwd_ports; rxp++) {
                        for (rxq = 0; rxq < nb_rxq; rxq++) {
+                               struct rte_eth_rxq_info rx_qinfo;
+                               int ret;
+
+                               ret = rte_eth_rx_queue_info_get(rxp, rxq,
+                                                               &rx_qinfo);
+                               if (ret < 0) {
+                                       TESTPMD_LOG(ERR, "Cannot get port %d RX 
queue %d info: %s\n",
+                                                   rxp, rxq,
+                                                   rte_strerror(rte_errno));
+                                       return;
+                               }
+                               if (rx_qinfo.queue_state ==
+                                   RTE_ETH_QUEUE_STATE_STOPPED)
+                                       continue;
+
                                port_id = fwd_ports_ids[rxp];
                                /**
                                * testpmd can stuck in the below do while loop
@@ -2201,7 +2247,8 @@ run_pkt_fwd_on_lcore(struct fwd_lcore *fc, packet_fwd_t 
pkt_fwd)
        nb_fs = fc->stream_nb;
        do {
                for (sm_id = 0; sm_id < nb_fs; sm_id++)
-                       (*pkt_fwd)(fsm[sm_id]);
+                       if (!fsm[sm_id]->disabled)
+                               (*pkt_fwd)(fsm[sm_id]);
 #ifdef RTE_LIB_BITRATESTATS
                if (bitrate_enabled != 0 &&
                                bitrate_lcore_id == rte_lcore_id()) {
@@ -2283,6 +2330,7 @@ start_packet_forwarding(int with_tx_first)
 {
        port_fwd_begin_t port_fwd_begin;
        port_fwd_end_t  port_fwd_end;
+       stream_init_t stream_init = cur_fwd_eng->stream_init;
        unsigned int i;
 
        if (strcmp(cur_fwd_eng->fwd_mode_name, "rxonly") == 0 && !nb_rxq)
@@ -2313,6 +2361,13 @@ start_packet_forwarding(int with_tx_first)
        if (!pkt_fwd_shared_rxq_check())
                return;
 
+       if (stream_init != NULL)
+               for (i = 0; i < cur_fwd_config.nb_fwd_streams; i++)
+                       if (stream_init(fwd_streams[i]) < 0) {
+                               TESTPMD_LOG(ERR, "Cannot init stream\n");
+                               return;
+                       }
+
        port_fwd_begin = cur_fwd_config.fwd_eng->port_fwd_begin;
        if (port_fwd_begin != NULL) {
                for (i = 0; i < cur_fwd_config.nb_fwd_ports; i++) {
diff --git a/app/test-pmd/testpmd.h b/app/test-pmd/testpmd.h
index 31f766c965..59edae645e 100644
--- a/app/test-pmd/testpmd.h
+++ b/app/test-pmd/testpmd.h
@@ -134,6 +134,7 @@ struct fwd_stream {
        portid_t   tx_port;   /**< forwarding port of received packets */
        queueid_t  tx_queue;  /**< TX queue to send forwarded packets */
        streamid_t peer_addr; /**< index of peer ethernet address of packets */
+       bool       disabled;  /**< the stream is disabled and should not run */
 
        unsigned int retry_enabled;
 
@@ -323,12 +324,14 @@ struct fwd_lcore {
  */
 typedef int (*port_fwd_begin_t)(portid_t pi);
 typedef void (*port_fwd_end_t)(portid_t pi);
+typedef int (*stream_init_t)(struct fwd_stream *fs);
 typedef void (*packet_fwd_t)(struct fwd_stream *fs);
 
 struct fwd_engine {
        const char       *fwd_mode_name; /**< Forwarding mode name. */
        port_fwd_begin_t port_fwd_begin; /**< NULL if nothing special to do. */
        port_fwd_end_t   port_fwd_end;   /**< NULL if nothing special to do. */
+       stream_init_t    stream_init;    /**< NULL if nothing special to do. */
        packet_fwd_t     packet_fwd;     /**< Mandatory. */
 };
 
@@ -887,6 +890,7 @@ void rxtx_config_display(void);
 void fwd_config_setup(void);
 void set_def_fwd_config(void);
 void reconfig(portid_t new_port_id, unsigned socket_id);
+int fwd_stream_get_stopped_queues(struct fwd_stream *fs, bool *rx, bool *tx);
 int init_fwd_streams(void);
 void update_fwd_ports(portid_t new_pid);
 
diff --git a/app/test-pmd/txonly.c b/app/test-pmd/txonly.c
index fc039a622c..1fa5238896 100644
--- a/app/test-pmd/txonly.c
+++ b/app/test-pmd/txonly.c
@@ -504,9 +504,22 @@ tx_only_begin(portid_t pi)
        return 0;
 }
 
+static int
+tx_only_stream_init(struct fwd_stream *fs)
+{
+       bool tx_stopped;
+       int ret;
+
+       ret = fwd_stream_get_stopped_queues(fs, NULL, &tx_stopped);
+       if (ret == 0)
+               fs->disabled = tx_stopped;
+       return ret;
+}
+
 struct fwd_engine tx_only_engine = {
        .fwd_mode_name  = "txonly",
        .port_fwd_begin = tx_only_begin,
        .port_fwd_end   = NULL,
+       .stream_init    = tx_only_stream_init,
        .packet_fwd     = pkt_burst_transmit,
 };
-- 
2.25.1

Reply via email to