> -----Original Message-----
> From: dev <dev-boun...@dpdk.org> On Behalf Of Timothy McDaniel
> Sent: Wednesday, November 11, 2020 3:27 PM
> Cc: dev@dpdk.org; Carrillo, Erik G <erik.g.carri...@intel.com>; Eads, Gage
> <gage.e...@intel.com>; Van Haaren, Harry <harry.van.haa...@intel.com>;
> jer...@marvell.com; tho...@monjalon.net; david.march...@redhat.com
> Subject: [dpdk-dev] [PATCH] event/dlb2: add missing delayed token pop logic
> 
> The code in this commit was inadvertently omitted when the dlb2 code base
> was dissected into discrete patches for upstreaming.
> 
> Signed-off-by: Timothy McDaniel <timothy.mcdan...@intel.com>
> ---
>  drivers/event/dlb2/dlb2.c          | 314 +++++++++++++++++++++++--------------
>  drivers/event/dlb2/dlb2_selftest.c |   4 +-
>  2 files changed, 201 insertions(+), 117 deletions(-)
> 
> diff --git a/drivers/event/dlb2/dlb2.c b/drivers/event/dlb2/dlb2.c
> index d42e48b..8672486 100644
> --- a/drivers/event/dlb2/dlb2.c
> +++ b/drivers/event/dlb2/dlb2.c
> @@ -1082,6 +1082,25 @@ dlb2_init_qe_mem(struct dlb2_port *qm_port, char *mz_name)
>       return ret;
>  }
> 
> +static inline uint16_t
> +dlb2_event_enqueue_delayed(void *event_port,
> +                        const struct rte_event events[]);
> +
> +static inline uint16_t
> +dlb2_event_enqueue_burst_delayed(void *event_port,
> +                              const struct rte_event events[],
> +                              uint16_t num);
> +
> +static inline uint16_t
> +dlb2_event_enqueue_new_burst_delayed(void *event_port,
> +                                  const struct rte_event events[],
> +                                  uint16_t num);
> +
> +static inline uint16_t
> +dlb2_event_enqueue_forward_burst_delayed(void *event_port,
> +                                      const struct rte_event events[],
> +                                      uint16_t num);
> +
>  static int
>  dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
>                       struct dlb2_eventdev_port *ev_port,
> @@ -1198,6 +1217,20 @@ dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
> 
>       qm_port->dequeue_depth = dequeue_depth;
>       qm_port->token_pop_thresh = dequeue_depth;
> +
> +     /* The default enqueue functions do not include delayed-pop support for
> +      * performance reasons.
> +      */
> +     if (qm_port->token_pop_mode == DELAYED_POP) {
> +             dlb2->event_dev->enqueue = dlb2_event_enqueue_delayed;
> +             dlb2->event_dev->enqueue_burst =
> +                     dlb2_event_enqueue_burst_delayed;
> +             dlb2->event_dev->enqueue_new_burst =
> +                     dlb2_event_enqueue_new_burst_delayed;
> +             dlb2->event_dev->enqueue_forward_burst =
> +                     dlb2_event_enqueue_forward_burst_delayed;
> +     }
> +
>       qm_port->owed_tokens = 0;
>       qm_port->issued_releases = 0;
> 
> @@ -2427,11 +2460,6 @@ dlb2_event_build_hcws(struct dlb2_port *qm_port,
>       case 3:
>       case 2:
>       case 1:
> -             /* At least one QE will be valid, so only zero out three */
> -             qe[1].cmd_byte = 0;
> -             qe[2].cmd_byte = 0;
> -             qe[3].cmd_byte = 0;
> -
>               for (i = 0; i < num; i++) {
>                       qe[i].cmd_byte =
>                               cmd_byte_map[qm_port->is_directed][ev[i].op];
> @@ -2452,6 +2480,8 @@ dlb2_event_build_hcws(struct dlb2_port *qm_port,
>                       qe[i].u.event_type.sub = ev[i].sub_event_type;
>               }
>               break;
> +     case 0:
> +             break;
>       }
>  }
> 
> @@ -2578,29 +2608,57 @@ dlb2_event_enqueue_prep(struct dlb2_eventdev_port *ev_port,
>  }
> 
>  static inline uint16_t
> -dlb2_event_enqueue_burst(void *event_port,
> -                      const struct rte_event events[],
> -                      uint16_t num)
> +__dlb2_event_enqueue_burst(void *event_port,
> +                        const struct rte_event events[],
> +                        uint16_t num,
> +                        bool use_delayed)
>  {
>       struct dlb2_eventdev_port *ev_port = event_port;
>       struct dlb2_port *qm_port = &ev_port->qm_port;
>       struct process_local_port_data *port_data;
> -     int i, cnt;
> +     int i;
> 
>       RTE_ASSERT(ev_port->enq_configured);
>       RTE_ASSERT(events != NULL);
> 
> -     cnt = 0;
> +     i = 0;
> 
>       port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
> 
> -     for (i = 0; i < num; i += DLB2_NUM_QES_PER_CACHE_LINE) {
> +     while (i < num) {
>               uint8_t sched_types[DLB2_NUM_QES_PER_CACHE_LINE];
>               uint8_t queue_ids[DLB2_NUM_QES_PER_CACHE_LINE];
> +             int pop_offs = 0;
>               int j = 0;
> 
> +             memset(qm_port->qe4,
> +                    0,
> +                    DLB2_NUM_QES_PER_CACHE_LINE *
> +                    sizeof(struct dlb2_enqueue_qe));
> +
>               for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < num; j++) {
>                       const struct rte_event *ev = &events[i + j];
> +                     int16_t thresh = qm_port->token_pop_thresh;
> +
> +                     if (use_delayed &&
> +                         qm_port->token_pop_mode == DELAYED_POP &&
> +                         (ev->op == RTE_EVENT_OP_FORWARD ||
> +                          ev->op == RTE_EVENT_OP_RELEASE) &&
> +                         qm_port->issued_releases >= thresh - 1) {
> +                             /* Insert the token pop QE and break out. This
> +                              * may result in a partial HCW, but that is
> +                              * simpler than supporting arbitrary QE
> +                              * insertion.
> +                              */
> +                             dlb2_construct_token_pop_qe(qm_port, j);
> +
> +                             /* Reset the releases for the next QE batch */
> +                             qm_port->issued_releases -= thresh;
> +
> +                             pop_offs = 1;
> +                             j++;
> +                             break;
> +                     }
> 
>                       if (dlb2_event_enqueue_prep(ev_port, qm_port, ev,
>                                                   &sched_types[j],
> @@ -2611,38 +2669,52 @@ dlb2_event_enqueue_burst(void *event_port,
>               if (j == 0)
>                       break;
> 
> -             dlb2_event_build_hcws(qm_port, &events[i], j,
> +             dlb2_event_build_hcws(qm_port, &events[i], j - pop_offs,
>                                     sched_types, queue_ids);
> 
> -             if (qm_port->token_pop_mode == DELAYED_POP && j < 4 &&
> -                 qm_port->issued_releases >= qm_port->token_pop_thresh - 1) {
> -                     dlb2_construct_token_pop_qe(qm_port, j);
> -
> -                     /* Reset the releases counter for the next QE batch */
> -                     qm_port->issued_releases -= qm_port->token_pop_thresh;
> -             }
> -
>               dlb2_hw_do_enqueue(qm_port, i == 0, port_data);
> 
> -             cnt += j;
> +             /* Don't include the token pop QE in the enqueue count */
> +             i += j - pop_offs;
> 
> -             if (j < DLB2_NUM_QES_PER_CACHE_LINE)
> +             /* Don't interpret j < DLB2_NUM_... as out-of-credits if
> +              * pop_offs != 0
> +              */
> +             if (j < DLB2_NUM_QES_PER_CACHE_LINE && pop_offs == 0)
>                       break;
>       }
> 
> -     if (qm_port->token_pop_mode == DELAYED_POP &&
> -         qm_port->issued_releases >= qm_port->token_pop_thresh - 1) {
> -             dlb2_consume_qe_immediate(qm_port, qm_port->owed_tokens);
> -             qm_port->issued_releases -= qm_port->token_pop_thresh;
> -     }
> -     return cnt;
> +     return i;
> +}
> +
> +static uint16_t
> +dlb2_event_enqueue_burst(void *event_port,
> +                          const struct rte_event events[],
> +                          uint16_t num)
> +{
> +     return __dlb2_event_enqueue_burst(event_port, events, num, false);
> +}
> +
> +static uint16_t
> +dlb2_event_enqueue_burst_delayed(void *event_port,
> +                                  const struct rte_event events[],
> +                                  uint16_t num)
> +{
> +     return __dlb2_event_enqueue_burst(event_port, events, num, true);
>  }
> 
>  static inline uint16_t
>  dlb2_event_enqueue(void *event_port,
>                  const struct rte_event events[])
>  {
> -     return dlb2_event_enqueue_burst(event_port, events, 1);
> +     return __dlb2_event_enqueue_burst(event_port, events, 1, false);
> +}
> +
> +static inline uint16_t
> +dlb2_event_enqueue_delayed(void *event_port,
> +                        const struct rte_event events[])
> +{
> +     return __dlb2_event_enqueue_burst(event_port, events, 1, true);
>  }
> 
>  static uint16_t
> @@ -2650,7 +2722,15 @@ dlb2_event_enqueue_new_burst(void *event_port,
>                            const struct rte_event events[],
>                            uint16_t num)
>  {
> -     return dlb2_event_enqueue_burst(event_port, events, num);
> +     return __dlb2_event_enqueue_burst(event_port, events, num, false);
> +}
> +
> +static uint16_t
> +dlb2_event_enqueue_new_burst_delayed(void *event_port,
> +                                  const struct rte_event events[],
> +                                  uint16_t num)
> +{
> +     return __dlb2_event_enqueue_burst(event_port, events, num, true);
>  }
> 
>  static uint16_t
> @@ -2658,7 +2738,93 @@ dlb2_event_enqueue_forward_burst(void *event_port,
>                                const struct rte_event events[],
>                                uint16_t num)
>  {
> -     return dlb2_event_enqueue_burst(event_port, events, num);
> +     return __dlb2_event_enqueue_burst(event_port, events, num, false);
> +}
> +
> +static uint16_t
> +dlb2_event_enqueue_forward_burst_delayed(void *event_port,
> +                                      const struct rte_event events[],
> +                                      uint16_t num)
> +{
> +     return __dlb2_event_enqueue_burst(event_port, events, num, true);
> +}
> +
> +static void
> +dlb2_event_release(struct dlb2_eventdev *dlb2,
> +                uint8_t port_id,
> +                int n)
> +{
> +     struct process_local_port_data *port_data;
> +     struct dlb2_eventdev_port *ev_port;
> +     struct dlb2_port *qm_port;
> +     int i;
> +
> +     if (port_id > dlb2->num_ports) {
> +             DLB2_LOG_ERR("Invalid port id %d in dlb2-event_release\n",
> +                          port_id);
> +             rte_errno = -EINVAL;
> +             return;
> +     }
> +
> +     ev_port = &dlb2->ev_ports[port_id];
> +     qm_port = &ev_port->qm_port;
> +     port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
> +
> +     i = 0;
> +
> +     if (qm_port->is_directed) {
> +             i = n;
> +             goto sw_credit_update;
> +     }
> +
> +     while (i < n) {
> +             int pop_offs = 0;
> +             int j = 0;
> +
> +             /* Zero-out QEs */
> +             qm_port->qe4[0].cmd_byte = 0;
> +             qm_port->qe4[1].cmd_byte = 0;
> +             qm_port->qe4[2].cmd_byte = 0;
> +             qm_port->qe4[3].cmd_byte = 0;
> +
> +             for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
> +                     int16_t thresh = qm_port->token_pop_thresh;
> +
> +                     if (qm_port->token_pop_mode == DELAYED_POP &&
> +                         qm_port->issued_releases >= thresh - 1) {
> +                             /* Insert the token pop QE */
> +                             dlb2_construct_token_pop_qe(qm_port, j);
> +
> +                             /* Reset the releases for the next QE batch */
> +                             qm_port->issued_releases -= thresh;
> +
> +                             pop_offs = 1;
> +                             j++;
> +                             break;
> +                     }
> +
> +                     qm_port->qe4[j].cmd_byte = DLB2_COMP_CMD_BYTE;
> +                     qm_port->issued_releases++;
> +             }
> +
> +             dlb2_hw_do_enqueue(qm_port, i == 0, port_data);
> +
> +             /* Don't include the token pop QE in the release count */
> +             i += j - pop_offs;
> +     }
> +
> +sw_credit_update:
> +     /* each release returns one credit */
> +     if (!ev_port->outstanding_releases) {
> +             DLB2_LOG_ERR("%s: Outstanding releases underflowed.\n",
> +                          __func__);
> +             return;
> +     }
> +     ev_port->outstanding_releases -= i;
> +     ev_port->inflight_credits += i;
> +
> +     /* Replenish s/w credits if enough releases are performed */
> +     dlb2_replenish_sw_credits(dlb2, ev_port);
>  }
> 
>  static inline void
> @@ -3067,86 +3233,6 @@ dlb2_inc_cq_idx(struct dlb2_port *qm_port, int cnt)
>       qm_port->gen_bit = (~(idx >> qm_port->gen_bit_shift)) & 0x1;
>  }
> 
> -static int
> -dlb2_event_release(struct dlb2_eventdev *dlb2,
> -                uint8_t port_id,
> -                int n)
> -{
> -     struct process_local_port_data *port_data;
> -     struct dlb2_eventdev_port *ev_port;
> -     struct dlb2_port *qm_port;
> -     int i, cnt;
> -
> -     if (port_id > dlb2->num_ports) {
> -             DLB2_LOG_ERR("Invalid port id %d in dlb2-event_release\n",
> -                          port_id);
> -             rte_errno = -EINVAL;
> -             return rte_errno;
> -     }
> -
> -     ev_port = &dlb2->ev_ports[port_id];
> -     qm_port = &ev_port->qm_port;
> -     port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
> -
> -     cnt = 0;
> -
> -     if (qm_port->is_directed) {
> -             cnt = n;
> -             goto sw_credit_update;
> -     }
> -
> -     for (i = 0; i < n; i += DLB2_NUM_QES_PER_CACHE_LINE) {
> -             int j;
> -
> -             /* Zero-out QEs */
> -             qm_port->qe4[0].cmd_byte = 0;
> -             qm_port->qe4[1].cmd_byte = 0;
> -             qm_port->qe4[2].cmd_byte = 0;
> -             qm_port->qe4[3].cmd_byte = 0;
> -
> -             for (j = 0; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++)
> -                     qm_port->qe4[j].cmd_byte = DLB2_COMP_CMD_BYTE;
> -
> -             qm_port->issued_releases += j;
> -
> -             if (j == 0)
> -                     break;
> -
> -             if (qm_port->token_pop_mode == DELAYED_POP && j < 4 &&
> -                 qm_port->issued_releases >= qm_port->token_pop_thresh - 1) {
> -                     dlb2_construct_token_pop_qe(qm_port, j);
> -
> -                     /* Reset the releases counter for the next QE batch */
> -                     qm_port->issued_releases -= qm_port->token_pop_thresh;
> -             }
> -
> -             dlb2_hw_do_enqueue(qm_port, i == 0, port_data);
> -
> -             cnt += j;
> -     }
> -
> -     if (qm_port->token_pop_mode == DELAYED_POP &&
> -         qm_port->issued_releases >= qm_port->token_pop_thresh - 1) {
> -             dlb2_consume_qe_immediate(qm_port, qm_port->owed_tokens);
> -             qm_port->issued_releases -= qm_port->token_pop_thresh;
> -     }
> -
> -sw_credit_update:
> -     /* each release returns one credit */
> -     if (!ev_port->outstanding_releases) {
> -             DLB2_LOG_ERR("Unrecoverable application error. Outstanding releases underflowed.\n");
> -             rte_errno = -ENOTRECOVERABLE;
> -             return rte_errno;
> -     }
> -
> -     ev_port->outstanding_releases -= cnt;
> -     ev_port->inflight_credits += cnt;
> -
> -     /* Replenish s/w credits if enough releases are performed */
> -     dlb2_replenish_sw_credits(dlb2, ev_port);
> -     return 0;
> -}
> -
>  static inline int16_t
>  dlb2_hw_dequeue_sparse(struct dlb2_eventdev *dlb2,
>                      struct dlb2_eventdev_port *ev_port,
> @@ -3367,8 +3453,7 @@ dlb2_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
>       if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
>               uint16_t out_rels = ev_port->outstanding_releases;
> 
> -             if (dlb2_event_release(dlb2, ev_port->id, out_rels))
> -                     return 0; /* rte_errno is set */
> +             dlb2_event_release(dlb2, ev_port->id, out_rels);
> 
>               DLB2_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
>       }
> @@ -3405,8 +3490,7 @@ dlb2_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
>       if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
>               uint16_t out_rels = ev_port->outstanding_releases;
> 
> -             if (dlb2_event_release(dlb2, ev_port->id, out_rels))
> -                     return 0; /* rte_errno is set */
> +             dlb2_event_release(dlb2, ev_port->id, out_rels);
> 
>               DLB2_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
>       }
> diff --git a/drivers/event/dlb2/dlb2_selftest.c b/drivers/event/dlb2/dlb2_selftest.c
> index f433654..5cf66c5 100644
> --- a/drivers/event/dlb2/dlb2_selftest.c
> +++ b/drivers/event/dlb2/dlb2_selftest.c
> @@ -1320,7 +1320,7 @@ test_delayed_pop(void)
>               }
>       }
> 
> -     /* Dequeue dequeue_depth events but only release dequeue_depth - 2.
> +     /* Dequeue dequeue_depth events but only release dequeue_depth - 1.
>        * Delayed pop won't perform the pop and no more events will be
>        * scheduled.
>        */
> @@ -1336,7 +1336,7 @@ test_delayed_pop(void)
> 
>       ev.op = RTE_EVENT_OP_RELEASE;
> 
> -     for (i = 0; i < port_conf.dequeue_depth - 2; i++) {
> +     for (i = 0; i < port_conf.dequeue_depth - 1; i++) {
>               if (rte_event_enqueue_burst(evdev, 0, &ev, 1) != 1) {
>                       printf("%d: RELEASE enqueue expected to succeed\n",
>                              __LINE__);
> --
> 2.6.4

Looks good, and it fixes the delayed token pop issue we observed in QA tests.

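For readers who want the gist of the mechanism: in DELAYED_POP mode the
port defers returning CQ tokens until the application has released or
forwarded a dequeue-depth's worth of events, at which point a token pop
QE is folded into the current 4-QE enqueue cache line. Below is a
minimal sketch of the threshold check the patch adds to the enqueue and
release paths; port_sketch and maybe_insert_token_pop are simplified
stand-ins for illustration, not the driver's actual types.

#include <stdint.h>

/* Illustrative sketch, modeled on the loops this patch adds in
 * drivers/event/dlb2/dlb2.c; the names here are hypothetical and not
 * part of the driver.
 */
struct port_sketch {
	int16_t token_pop_thresh;  /* set to the port's dequeue depth */
	int16_t issued_releases;   /* releases counted since the last pop */
};

/* Process one RELEASE/FORWARD event while filling a 4-QE cache line.
 * Returns 1 when the current slot should hold the token pop QE, which
 * ends the batch early (a partial cache line is acceptable here).
 */
static int
maybe_insert_token_pop(struct port_sketch *p)
{
	if (p->issued_releases >= p->token_pop_thresh - 1) {
		/* the real driver builds the pop QE in this slot via
		 * dlb2_construct_token_pop_qe(), then resets the counter
		 * for the next batch
		 */
		p->issued_releases -= p->token_pop_thresh;
		return 1;
	}
	p->issued_releases++;   /* an ordinary release: just count it */
	return 0;
}

If I am reading the new check correctly, this also explains the
selftest change: releasing dequeue_depth - 1 events (rather than
dequeue_depth - 2) is now the largest release count that still leaves
the pop pending, so no further events are scheduled until the final
release arrives.
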
Reviewed-by: Mike Ximing Chen <mike.ximing.c...@intel.com>
