When shared Rx queue is enabled, packets received from all member ports
arrive in the same shared Rx queue.

This patch adds a common forwarding function for shared Rx queues. It
groups received packets by source forwarding stream, which it finds by
looking up the local streams of the current lcore using the packet source
port (mbuf->port) and queue, then invokes a callback to handle the
received packets of each source stream.

Signed-off-by: Xueming Li <xuemi...@nvidia.com>
---
 app/test-pmd/testpmd.c | 69 ++++++++++++++++++++++++++++++++++++++++++
 app/test-pmd/testpmd.h |  4 +++
 2 files changed, 73 insertions(+)

diff --git a/app/test-pmd/testpmd.c b/app/test-pmd/testpmd.c
index d941bd982e..f46bd97948 100644
--- a/app/test-pmd/testpmd.c
+++ b/app/test-pmd/testpmd.c
@@ -2034,6 +2034,75 @@ flush_fwd_rx_queues(void)
        }
 }
 
+/**
+ * Look up the forwarding stream that owns a given source port.
+ *
+ * Only the streams assigned to the lcore of fs are searched; all streams
+ * of the same shared Rx queue are expected to be located on the same core.
+ *
+ * @param fs
+ *   Reference stream; supplies the lcore to search and the Rx queue to match.
+ * @param port
+ *   Source port to match against each stream's rx_port.
+ * @return
+ *   First stream of that lcore whose rx_port equals port and whose rx_queue
+ *   equals fs->rx_queue, or NULL when no such stream exists.
+ */
+static struct fwd_stream *
+forward_stream_get(struct fwd_stream *fs, uint16_t port)
+{
+       struct fwd_lcore *lc = fs->lcore;
+       struct fwd_stream **slot = &fwd_streams[lc->stream_idx];
+       streamid_t left;
+
+       /* Walk the lcore's contiguous slice of the global stream table. */
+       for (left = lc->stream_nb; left > 0; left--, slot++) {
+               struct fwd_stream *candidate = *slot;
+
+               if (candidate->rx_port != port)
+                       continue;
+               if (candidate->rx_queue != fs->rx_queue)
+                       continue;
+               return candidate;
+       }
+       return NULL;
+}
+
+/**
+ * Hand a run of packets from one source port to its forwarding stream.
+ *
+ * The stream is resolved with forward_stream_get(). When one is found, its
+ * rx_packets counter is increased by nb_rx and the callback is invoked on
+ * it. Otherwise the packets are counted in src_fs->fwd_dropped and freed.
+ *
+ * @param src_fs
+ *   Stream used as lookup reference and charged with drops.
+ * @param port
+ *   Source port shared by all packets of the run.
+ * @param nb_rx
+ *   Number of packets in pkts.
+ * @param pkts
+ *   Packets of the run.
+ * @param fwd
+ *   Callback invoked as fwd(stream, nb_rx, pkts) when a stream is found.
+ */
+static void
+forward_by_port(struct fwd_stream *src_fs, uint16_t port, uint16_t nb_rx,
+               struct rte_mbuf **pkts, packet_fwd_cb fwd)
+{
+       struct fwd_stream *target = forward_stream_get(src_fs, port);
+       uint16_t left;
+
+       if (target == NULL) {
+               /* No stream matches this port and queue: drop the whole run. */
+               src_fs->fwd_dropped += nb_rx;
+               for (left = nb_rx; left > 0; left--)
+                       rte_pktmbuf_free(pkts[left - 1]);
+               return;
+       }
+
+       target->rx_packets += nb_rx;
+       fwd(target, nb_rx, pkts);
+}
+
+/**
+ * Forward packets from shared Rx queue.
+ *
+ * The burst is split into runs of consecutive packets carrying the same
+ * mbuf->port; each run is passed to forward_by_port(), which resolves the
+ * source stream and invokes the callback. Only pkts_burst[0] to
+ * pkts_burst[nb_rx - 1] are read; nothing happens when nb_rx is 0.
+ *
+ * @param fs
+ *   Stream used as reference when looking up the source stream.
+ * @param nb_rx
+ *   Number of valid packets in pkts_burst.
+ * @param pkts_burst
+ *   Received packets.
+ * @param fwd
+ *   Callback handed to forward_by_port() for every run.
+ */
+void
+forward_shared_rxq(struct fwd_stream *fs, uint16_t nb_rx,
+                  struct rte_mbuf **pkts_burst, packet_fwd_cb fwd)
+{
+       uint16_t i, nb_fs_rx = 1, port;
+
+       /* Locate real source fs according to mbuf->port. */
+       for (i = 0; i < nb_rx; ++i) {
+               /*
+                * pkts_burst[nb_rx] is not a valid entry and may lie past the
+                * end of the array, so prefetch only when a next packet exists.
+                */
+               if (i + 1 < nb_rx)
+                       rte_prefetch0(pkts_burst[i + 1]);
+               port = pkts_burst[i]->port;
+               if (i + 1 == nb_rx || pkts_burst[i + 1]->port != port) {
+                       /* Forward packets with same source port. */
+                       forward_by_port(fs, port, nb_fs_rx,
+                                       &pkts_burst[i + 1 - nb_fs_rx], fwd);
+                       nb_fs_rx = 1;
+               } else {
+                       nb_fs_rx++;
+               }
+       }
+}
+
 static void
 run_pkt_fwd_on_lcore(struct fwd_lcore *fc, packet_fwd_t pkt_fwd)
 {
diff --git a/app/test-pmd/testpmd.h b/app/test-pmd/testpmd.h
index 6497c56359..13141dfed9 100644
--- a/app/test-pmd/testpmd.h
+++ b/app/test-pmd/testpmd.h
@@ -272,6 +272,8 @@ struct fwd_lcore {
 typedef void (*port_fwd_begin_t)(portid_t pi);
 typedef void (*port_fwd_end_t)(portid_t pi);
 typedef void (*packet_fwd_t)(struct fwd_stream *fs);
+/**
+ * Burst handler used by forward_shared_rxq(): it is called with the stream
+ * resolved for the packets' source port, the packet count and the packets.
+ */
+typedef void (*packet_fwd_cb)(struct fwd_stream *fs, uint16_t nb_rx,
+                             struct rte_mbuf **pkts);
 
 struct fwd_engine {
        const char       *fwd_mode_name; /**< Forwarding mode name. */
@@ -897,6 +899,8 @@ char *list_pkt_forwarding_modes(void);
 char *list_pkt_forwarding_retry_modes(void);
 void set_pkt_forwarding_mode(const char *fwd_mode);
 void start_packet_forwarding(int with_tx_first);
+void forward_shared_rxq(struct fwd_stream *fs, uint16_t nb_rx,
+                       struct rte_mbuf **pkts_burst, packet_fwd_cb fwd);
 void fwd_stats_display(void);
 void fwd_stats_reset(void);
 void stop_packet_forwarding(void);
-- 
2.25.1

Reply via email to