Validate credit configuration.

If user tried to send 64 events, it will take 64 credits. Enqueue API
only gets 32 credits each time. If it does not have sufficient credits,
it just fails and returns. Enqueue API does not retry. On next enqueue,
it will get 32 more and send will work. This results in alternate
enqueues failing.

Add check to make sure DLB2_MAX_ENQUEUE_DEPTH <=
both DLB2_SW_CREDIT_QUANTA_DEFAULT and DLB2_SW_CREDIT_BATCH_SZ.

Add enough retires in the driver to satisfy max enqueue depth credits
based on set quanta. Note the credit quanta size is different for each
port. Retry count = Max enqueue depth / credit quanta in driver before
returning no credit.

Fixes: 3a6d0c04e7fb ("event/dlb2: add port setup")
Cc: sta...@dpdk.org

Signed-off-by: Timothy McDaniel <timothy.mcdan...@intel.com>
---
 drivers/event/dlb2/dlb2.c      | 78 +++++++++++++++++++++++++---------
 drivers/event/dlb2/dlb2_priv.h |  3 +-
 2 files changed, 60 insertions(+), 21 deletions(-)

diff --git a/drivers/event/dlb2/dlb2.c b/drivers/event/dlb2/dlb2.c
index 36f07d0061..3641ed2942 100644
--- a/drivers/event/dlb2/dlb2.c
+++ b/drivers/event/dlb2/dlb2.c
@@ -387,6 +387,11 @@ set_sw_credit_quanta(const char *key __rte_unused,
        if (ret < 0)
                return ret;
 
+       if (*sw_credit_quanta <= 0) {
+               DLB2_LOG_ERR("sw_credit_quanta must be > 0\n");
+               return -EINVAL;
+       }
+
        return 0;
 }
 
@@ -1773,9 +1778,48 @@ dlb2_eventdev_port_setup(struct rte_eventdev *dev,
                return -EINVAL;
        }
 
+       /* Default for worker ports */
+       sw_credit_quanta = dlb2->sw_credit_quanta;
+       hw_credit_quanta = dlb2->hw_credit_quanta;
+
        ev_port->qm_port.is_directed = port_conf->event_port_cfg &
                RTE_EVENT_PORT_CFG_SINGLE_LINK;
 
+       /*
+        * Validate credit config before creating port
+        */
+
+       /* Default for worker ports */
+       sw_credit_quanta = dlb2->sw_credit_quanta;
+       hw_credit_quanta = dlb2->hw_credit_quanta;
+
+       if (port_conf->event_port_cfg & RTE_EVENT_PORT_CFG_HINT_PRODUCER) {
+               /* Producer type ports. Mostly enqueue */
+               sw_credit_quanta = DLB2_SW_CREDIT_P_QUANTA_DEFAULT;
+               hw_credit_quanta = DLB2_SW_CREDIT_P_BATCH_SZ;
+       }
+       if (port_conf->event_port_cfg & RTE_EVENT_PORT_CFG_HINT_CONSUMER) {
+               /* Consumer type ports. Mostly dequeue */
+               sw_credit_quanta = DLB2_SW_CREDIT_C_QUANTA_DEFAULT;
+               hw_credit_quanta = DLB2_SW_CREDIT_C_BATCH_SZ;
+       }
+       ev_port->credit_update_quanta = sw_credit_quanta;
+       ev_port->qm_port.hw_credit_quanta = hw_credit_quanta;
+
+       if (port_conf->enqueue_depth > sw_credit_quanta ||
+           port_conf->enqueue_depth > hw_credit_quanta) {
+               DLB2_LOG_ERR("Invalid port config. Enqueue depth %d must be <= 
credit quanta %d and batch size %d\n",
+                            port_conf->enqueue_depth,
+                            sw_credit_quanta,
+                            hw_credit_quanta);
+               return -EINVAL;
+       }
+       ev_port->enq_retries = port_conf->enqueue_depth / sw_credit_quanta;
+
+       /*
+        * Create port
+        */
+
        if (!ev_port->qm_port.is_directed) {
                ret = dlb2_hw_create_ldb_port(dlb2,
                                              ev_port,
@@ -1811,23 +1855,6 @@ dlb2_eventdev_port_setup(struct rte_eventdev *dev,
        ev_port->inflight_credits = 0;
        ev_port->dlb2 = dlb2; /* reverse link */
 
-       /* Default for worker ports */
-       sw_credit_quanta = dlb2->sw_credit_quanta;
-       hw_credit_quanta = dlb2->hw_credit_quanta;
-
-       if (port_conf->event_port_cfg & RTE_EVENT_PORT_CFG_HINT_PRODUCER) {
-               /* Producer type ports. Mostly enqueue */
-               sw_credit_quanta = DLB2_SW_CREDIT_P_QUANTA_DEFAULT;
-               hw_credit_quanta = DLB2_SW_CREDIT_P_BATCH_SZ;
-       }
-       if (port_conf->event_port_cfg & RTE_EVENT_PORT_CFG_HINT_CONSUMER) {
-               /* Consumer type ports. Mostly dequeue */
-               sw_credit_quanta = DLB2_SW_CREDIT_C_QUANTA_DEFAULT;
-               hw_credit_quanta = DLB2_SW_CREDIT_C_BATCH_SZ;
-       }
-       ev_port->credit_update_quanta = sw_credit_quanta;
-       ev_port->qm_port.hw_credit_quanta = hw_credit_quanta;
-
        /* Tear down pre-existing port->queue links */
        if (dlb2->run_state == DLB2_RUN_STATE_STOPPED)
                dlb2_port_link_teardown(dlb2, &dlb2->ev_ports[ev_port_id]);
@@ -2970,6 +2997,7 @@ __dlb2_event_enqueue_burst(void *event_port,
        struct dlb2_eventdev_port *ev_port = event_port;
        struct dlb2_port *qm_port = &ev_port->qm_port;
        struct process_local_port_data *port_data;
+       int retries = ev_port->enq_retries;
        int i;
 
        RTE_ASSERT(ev_port->enq_configured);
@@ -2993,6 +3021,7 @@ __dlb2_event_enqueue_burst(void *event_port,
                for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < num; j++) {
                        const struct rte_event *ev = &events[i + j];
                        int16_t thresh = qm_port->token_pop_thresh;
+                       int ret;
 
                        if (use_delayed &&
                            qm_port->token_pop_mode == DELAYED_POP &&
@@ -3014,9 +3043,18 @@ __dlb2_event_enqueue_burst(void *event_port,
                                break;
                        }
 
-                       if (dlb2_event_enqueue_prep(ev_port, qm_port, ev,
-                                                   &sched_types[j],
-                                                   &queue_ids[j]))
+                       /*
+                        * Retry if insufficient credits
+                        */
+                       do {
+                               ret = dlb2_event_enqueue_prep(ev_port,
+                                                             qm_port,
+                                                             ev,
+                                                             &sched_types[j],
+                                                             &queue_ids[j]);
+                       } while ((ret == -ENOSPC) && (retries-- > 0));
+
+                       if (ret != 0)
                                break;
                }
 
diff --git a/drivers/event/dlb2/dlb2_priv.h b/drivers/event/dlb2/dlb2_priv.h
index 3e47e4776b..4a06d649ab 100644
--- a/drivers/event/dlb2/dlb2_priv.h
+++ b/drivers/event/dlb2/dlb2_priv.h
@@ -114,7 +114,7 @@
 
 #define DLB2_NUM_QES_PER_CACHE_LINE 4
 
-#define DLB2_MAX_ENQUEUE_DEPTH 64
+#define DLB2_MAX_ENQUEUE_DEPTH 32
 #define DLB2_MIN_ENQUEUE_DEPTH 4
 
 #define DLB2_NAME_SIZE 64
@@ -519,6 +519,7 @@ struct dlb2_eventdev_port {
         */
        uint16_t outstanding_releases;
        uint16_t inflight_max; /* app requested max inflights for this port */
+       int enq_retries; /* Number of attempts before ret ENOSPC */
        /* setup_done is set when the event port is setup */
        bool setup_done;
        /* enq_configured is set when the qm port is created */
-- 
2.25.1

Reply via email to