Acked-by: Abhinandan Gujjar <abhinandan.guj...@intel.com>
> -----Original Message-----
> From: Kundapura, Ganapati <ganapati.kundap...@intel.com>
> Sent: Thursday, February 10, 2022 11:11 PM
> To: Jayatheerthan, Jay <jay.jayatheert...@intel.com>;
> jerinjac...@gmail.com; Gujjar, Abhinandan S
> <abhinandan.guj...@intel.com>; dev@dpdk.org
> Subject: [PATCH v5 1/2] eventdev/crypto_adapter: move crypto ops to
> circular buffer
>
> Move crypto ops to a circular buffer to retain them when the
> cryptodev/eventdev are temporarily full
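>
> For readers skimming the diff, a minimal standalone sketch of the
> retention mechanism follows (illustrative only, not part of the patch;
> struct circ_buf, circ_buf_flush() and enqueue_burst() are hypothetical
> stand-ins for the patch's crypto_ops_circular_buffer helpers and
> rte_cryptodev_enqueue_burst()):
>
> #include <stdint.h>
>
> struct op;                      /* opaque op, e.g. struct rte_crypto_op */
>
> struct circ_buf {
>         uint16_t head;          /* index of oldest buffered op */
>         uint16_t tail;          /* index of next free slot */
>         uint16_t count;         /* number of buffered ops */
>         uint16_t size;          /* capacity */
>         struct op **ops;
> };
>
> /* Hypothetical device enqueue; may accept fewer than n ops. */
> extern uint16_t enqueue_burst(struct op **ops, uint16_t n);
>
> /* Flush the contiguous chunk starting at head. When tail < head, the
>  * wrapped-around remainder (starting at index 0) is left for the next
>  * call, so callers simply invoke this again until the buffer is empty.
>  */
> static int
> circ_buf_flush(struct circ_buf *b, uint16_t *nb_flushed)
> {
>         uint16_t n;
>
>         if (b->tail > b->head)
>                 n = b->tail - b->head;  /* single contiguous chunk */
>         else if (b->tail < b->head)
>                 n = b->size - b->head;  /* chunk up to end of array */
>         else {
>                 *nb_flushed = 0;        /* buffer empty */
>                 return 0;
>         }
>
>         *nb_flushed = enqueue_burst(&b->ops[b->head], n);
>         b->count -= *nb_flushed;
>         if (b->count == 0)
>                 b->head = b->tail = 0;  /* reset to keep chunks contiguous */
>         else
>                 b->head = (b->head + *nb_flushed) % b->size;
>
>         return *nb_flushed == n ? 0 : -1; /* -1: device back-pressured */
> }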
>
> Signed-off-by: Ganapati Kundapura <ganapati.kundap...@intel.com>
>
> ---
> v5:
> * Add branch prediction to if conditions
>
> v4:
> * Retain non-enqueued crypto ops in the circular buffer for later
> processing and stop dequeuing from the eventdev till all the
> crypto ops are enqueued to the cryptodev
>
> Check space in the circular buffer and stop dequeuing from the
> eventdev if some ops failed to flush to the cdev and no space
> for another batch is available in the circular buffer
>
> Enable dequeue from the eventdev after all the ops are flushed
> (see the back-pressure sketch after these notes)
>
> v3:
> * update eca_ops_buffer_flush() to flush all the crypto ops
> out of the circular buffer.
> * remove freeing of failed crypto ops from eca_ops_enqueue_burst()
> and add them to the circular buffer for later processing.
>
> v2:
> * reset crypto adapter next cdev id before dequeuing from the
> next cdev
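>
> A standalone sketch of the v4 back-pressure protocol (illustrative
> only; BATCH_SIZE's value is assumed, and circ_buf/circ_buf_flush()
> mirror the sketch in the commit message, redeclared here so the
> snippet stands alone):
>
> #include <stdbool.h>
> #include <stdint.h>
>
> #define BATCH_SIZE 32           /* assumed batch size, as in the adapter */
>
> struct circ_buf {               /* minimal buffer state */
>         uint16_t head, tail, count, size;
> };
>
> /* Returns 0 if the chunk was fully accepted by the device, -1 if the
>  * device back-pressured and some ops remain buffered. */
> extern int circ_buf_flush(struct circ_buf *b, uint16_t *nb_flushed);
>
> /* Enqueue path: on a failed flush with no room left for another full
>  * batch, stop pulling events from the eventdev. */
> static void
> enq_path(struct circ_buf *cbuf, bool *stop_enq_to_cryptodev)
> {
>         uint16_t nb_flushed;
>
>         if (cbuf->count >= BATCH_SIZE &&
>             circ_buf_flush(cbuf, &nb_flushed) < 0 &&
>             (uint16_t)(cbuf->size - cbuf->count) < BATCH_SIZE)
>                 *stop_enq_to_cryptodev = true;
> }
>
> /* Periodic flush path: once the buffer fully drains, resume dequeue. */
> static void
> flush_path(struct circ_buf *cbuf, bool *stop_enq_to_cryptodev)
> {
>         uint16_t nb_flushed;
>
>         circ_buf_flush(cbuf, &nb_flushed);
>         if (cbuf->count == 0)
>                 *stop_enq_to_cryptodev = false;
> }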
> ---
>
> diff --git a/lib/eventdev/rte_event_crypto_adapter.c b/lib/eventdev/rte_event_crypto_adapter.c
> index d840803..0b484f3 100644
> --- a/lib/eventdev/rte_event_crypto_adapter.c
> +++ b/lib/eventdev/rte_event_crypto_adapter.c
> @@ -25,11 +25,27 @@
> #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
> #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
>
> +#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
> +#define CRYPTO_ADAPTER_BUFFER_SZ 1024
> +
> /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
> * iterations of eca_crypto_adapter_enq_run()
> */
> #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
>
> +struct crypto_ops_circular_buffer {
> + /* index of head element in circular buffer */
> + uint16_t head;
> + /* index of tail element in circular buffer */
> + uint16_t tail;
> + /* number of elements in buffer */
> + uint16_t count;
> + /* size of circular buffer */
> + uint16_t size;
> + /* Pointer to hold rte_crypto_ops for batching */
> + struct rte_crypto_op **op_buffer;
> +} __rte_cache_aligned;
> +
> struct event_crypto_adapter {
> /* Event device identifier */
> uint8_t eventdev_id;
> @@ -37,6 +53,10 @@ struct event_crypto_adapter {
> uint8_t event_port_id;
> /* Store event device's implicit release capability */
> uint8_t implicit_release_disabled;
> + /* Flag to indicate backpressure at cryptodev
> + * Stop further dequeuing events from eventdev
> + */
> + bool stop_enq_to_cryptodev;
> /* Max crypto ops processed in any service function invocation */
> uint32_t max_nb;
> /* Lock to serialize config updates with service function */
> @@ -47,6 +67,8 @@ struct event_crypto_adapter {
> struct crypto_device_info *cdevs;
> /* Loop counter to flush crypto ops */
> uint16_t transmit_loop_count;
> + /* Circular buffer for batching crypto ops to eventdev */
> + struct crypto_ops_circular_buffer ebuf;
> /* Per instance stats structure */
> struct rte_event_crypto_adapter_stats crypto_stats;
> /* Configuration callback for rte_service configuration */
> @@ -93,10 +115,8 @@ struct crypto_device_info {
> struct crypto_queue_pair_info {
> /* Set to indicate queue pair is enabled */
> bool qp_enabled;
> - /* Pointer to hold rte_crypto_ops for batching */
> - struct rte_crypto_op **op_buffer;
> - /* No of crypto ops accumulated */
> - uint8_t len;
> + /* Circular buffer for batching crypto ops to cdev */
> + struct crypto_ops_circular_buffer cbuf;
> } __rte_cache_aligned;
>
> static struct event_crypto_adapter **event_crypto_adapter;
> @@ -141,6 +161,84 @@ eca_init(void)
> return 0;
> }
>
> +static inline bool
> +eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
> +{
> + return bufp->count >= BATCH_SIZE;
> +}
> +
> +static inline bool
> +eca_circular_buffer_space_for_batch(struct crypto_ops_circular_buffer *bufp)
> +{
> + return (bufp->size - bufp->count) >= BATCH_SIZE;
> +}
> +
> +static inline void
> +eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
> +{
> + rte_free(bufp->op_buffer);
> +}
> +
> +static inline int
> +eca_circular_buffer_init(const char *name,
> + struct crypto_ops_circular_buffer *bufp,
> + uint16_t sz)
> +{
> + bufp->op_buffer = rte_zmalloc(name,
> + sizeof(struct rte_crypto_op *) * sz,
> + 0);
> + if (bufp->op_buffer == NULL)
> + return -ENOMEM;
> +
> + bufp->size = sz;
> + return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
> + struct rte_crypto_op *op)
> +{
> + uint16_t *tailp = &bufp->tail;
> +
> + bufp->op_buffer[*tailp] = op;
> + /* circular buffer, go round */
> + *tailp = (*tailp + 1) % bufp->size;
> + bufp->count++;
> +
> + return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
> + uint8_t cdev_id, uint16_t qp_id,
> + uint16_t *nb_ops_flushed)
> +{
> + uint16_t n = 0;
> + uint16_t *headp = &bufp->head;
> + uint16_t *tailp = &bufp->tail;
> + struct rte_crypto_op **ops = bufp->op_buffer;
> +
> + if (*tailp > *headp)
> + n = *tailp - *headp;
> + else if (*tailp < *headp)
> + n = bufp->size - *headp;
> + else {
> + *nb_ops_flushed = 0;
> + return 0; /* buffer empty */
> + }
> +
> + *nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
> + &ops[*headp], n);
> + bufp->count -= *nb_ops_flushed;
> + if (!bufp->count) {
> + *headp = 0;
> + *tailp = 0;
> + } else
> + *headp = (*headp + *nb_ops_flushed) % bufp->size;
> +
> + return *nb_ops_flushed == n ? 0 : -1;
> +}
> +
> static inline struct event_crypto_adapter *
> eca_id_to_adapter(uint8_t id)
> {
> @@ -237,10 +335,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
> return -ENOMEM;
> }
>
> + if (eca_circular_buffer_init("eca_edev_circular_buffer",
> + &adapter->ebuf,
> + CRYPTO_ADAPTER_BUFFER_SZ)) {
> + RTE_EDEV_LOG_ERR("Failed to get memory for eventdev
> buffer");
> + rte_free(adapter);
> + return -ENOMEM;
> + }
> +
> ret = rte_event_dev_info_get(dev_id, &dev_info);
> if (ret < 0) {
> RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d:
> %s!",
> dev_id, dev_info.driver_name);
> + eca_circular_buffer_free(&adapter->ebuf);
> rte_free(adapter);
> return ret;
> }
> @@ -259,6 +366,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
> socket_id);
> if (adapter->cdevs == NULL) {
> RTE_EDEV_LOG_ERR("Failed to get mem for crypto
> devices\n");
> + eca_circular_buffer_free(&adapter->ebuf);
> rte_free(adapter);
> return -ENOMEM;
> }
> @@ -337,10 +445,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
> struct crypto_queue_pair_info *qp_info = NULL;
> struct rte_crypto_op *crypto_op;
> unsigned int i, n;
> - uint16_t qp_id, len, ret;
> + uint16_t qp_id, nb_enqueued = 0;
> uint8_t cdev_id;
> + int ret;
>
> - len = 0;
> ret = 0;
> n = 0;
> stats->event_deq_count += cnt;
> @@ -366,9 +474,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
> rte_crypto_op_free(crypto_op);
> continue;
> }
> - len = qp_info->len;
> - qp_info->op_buffer[len] = crypto_op;
> - len++;
> + eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
> } else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
> crypto_op->private_data_offset) {
> m_data = (union rte_event_crypto_metadata *)
> @@ -382,87 +488,91 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
> rte_crypto_op_free(crypto_op);
> continue;
> }
> - len = qp_info->len;
> - qp_info->op_buffer[len] = crypto_op;
> - len++;
> + eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
> } else {
> rte_pktmbuf_free(crypto_op->sym->m_src);
> rte_crypto_op_free(crypto_op);
> continue;
> }
>
> - if (len == BATCH_SIZE) {
> - struct rte_crypto_op **op_buffer = qp_info->op_buffer;
> - ret = rte_cryptodev_enqueue_burst(cdev_id,
> - qp_id,
> - op_buffer,
> - BATCH_SIZE);
> -
> - stats->crypto_enq_count += ret;
> -
> - while (ret < len) {
> - struct rte_crypto_op *op;
> - op = op_buffer[ret++];
> - stats->crypto_enq_fail++;
> - rte_pktmbuf_free(op->sym->m_src);
> - rte_crypto_op_free(op);
> - }
> -
> - len = 0;
> + if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
> + ret = eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
> + cdev_id,
> + qp_id,
> + &nb_enqueued);
> + /*
> + * If some crypto ops failed to flush to cdev and
> + * space for another batch is not available, stop
> + * dequeue from eventdev momentarily
> + */
> + if (unlikely(ret < 0 &&
> + !eca_circular_buffer_space_for_batch(
> + &qp_info->cbuf)))
> + adapter->stop_enq_to_cryptodev = true;
> }
>
> - if (qp_info)
> - qp_info->len = len;
> - n += ret;
> + stats->crypto_enq_count += nb_enqueued;
> + n += nb_enqueued;
> }
>
> return n;
> }
>
> static unsigned int
> -eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> +eca_crypto_cdev_flush(struct event_crypto_adapter *adapter,
> + uint8_t cdev_id, uint16_t *nb_ops_flushed)
> {
> - struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
> struct crypto_device_info *curr_dev;
> struct crypto_queue_pair_info *curr_queue;
> - struct rte_crypto_op **op_buffer;
> struct rte_cryptodev *dev;
> - uint8_t cdev_id;
> + uint16_t nb = 0, nb_enqueued = 0;
> uint16_t qp;
> - uint16_t ret;
> - uint16_t num_cdev = rte_cryptodev_count();
>
> - ret = 0;
> - for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
> - curr_dev = &adapter->cdevs[cdev_id];
> - dev = curr_dev->dev;
> - if (dev == NULL)
> - continue;
> - for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
> + curr_dev = &adapter->cdevs[cdev_id];
> + if (unlikely(curr_dev == NULL))
> + return 0;
>
> - curr_queue = &curr_dev->qpairs[qp];
> - if (!curr_queue->qp_enabled)
> - continue;
> + dev = rte_cryptodev_pmd_get_dev(cdev_id);
> + for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
>
> - op_buffer = curr_queue->op_buffer;
> - ret = rte_cryptodev_enqueue_burst(cdev_id,
> - qp,
> - op_buffer,
> - curr_queue->len);
> - stats->crypto_enq_count += ret;
> -
> - while (ret < curr_queue->len) {
> - struct rte_crypto_op *op;
> - op = op_buffer[ret++];
> - stats->crypto_enq_fail++;
> - rte_pktmbuf_free(op->sym->m_src);
> - rte_crypto_op_free(op);
> - }
> - curr_queue->len = 0;
> - }
> + curr_queue = &curr_dev->qpairs[qp];
> + if (unlikely(curr_queue == NULL || !curr_queue->qp_enabled))
> + continue;
> +
> + eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
> + cdev_id,
> + qp,
> + &nb_enqueued);
> + *nb_ops_flushed += curr_queue->cbuf.count;
> + nb += nb_enqueued;
> }
>
> - return ret;
> + return nb;
> +}
> +
> +static unsigned int
> +eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> +{
> + struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
> + uint8_t cdev_id;
> + uint16_t nb_enqueued = 0;
> + uint16_t nb_ops_flushed = 0;
> + uint16_t num_cdev = rte_cryptodev_count();
> +
> + for (cdev_id = 0; cdev_id < num_cdev; cdev_id++)
> + nb_enqueued += eca_crypto_cdev_flush(adapter,
> + cdev_id,
> + &nb_ops_flushed);
> + /*
> + * Enable dequeue from eventdev if all ops from circular
> + * buffer are flushed to cdev
> + */
> + if (!nb_ops_flushed)
> + adapter->stop_enq_to_cryptodev = false;
> +
> + stats->crypto_enq_count += nb_enqueued;
> +
> + return nb_enqueued;
> }
>
> static int
> @@ -480,6 +590,13 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
> if (adapter->mode == RTE_EVENT_CRYPTO_ADAPTER_OP_NEW)
> return 0;
>
> + if (unlikely(adapter->stop_enq_to_cryptodev)) {
> + nb_enqueued += eca_crypto_enq_flush(adapter);
> +
> + if (unlikely(adapter->stop_enq_to_cryptodev))
> + goto skip_event_dequeue_burst;
> + }
> +
> for (nb_enq = 0; nb_enq < max_enq; nb_enq += n) {
> stats->event_poll_count++;
> n = rte_event_dequeue_burst(event_dev_id,
> @@ -491,6 +608,8 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
> nb_enqueued += eca_enq_to_cryptodev(adapter, ev, n);
> }
>
> +skip_event_dequeue_burst:
> +
> if ((++adapter->transmit_loop_count &
> (CRYPTO_ENQ_FLUSH_THRESHOLD - 1)) == 0) {
> nb_enqueued += eca_crypto_enq_flush(adapter);
> @@ -499,9 +618,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
> return nb_enqueued;
> }
>
> -static inline void
> +static inline uint16_t
> eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> - struct rte_crypto_op **ops, uint16_t num)
> + struct rte_crypto_op **ops, uint16_t num)
> {
> struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
> union rte_event_crypto_metadata *m_data = NULL;
> @@ -518,6 +637,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> num = RTE_MIN(num, BATCH_SIZE);
> for (i = 0; i < num; i++) {
> struct rte_event *ev = &events[nb_ev++];
> +
> + m_data = NULL;
> if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
> m_data = rte_cryptodev_sym_session_get_user_data(
> ops[i]->sym->session);
> @@ -548,21 +669,56 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> event_port_id,
> &events[nb_enqueued],
> nb_ev - nb_enqueued);
> +
> } while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
> nb_enqueued < nb_ev);
>
> - /* Free mbufs and rte_crypto_ops for failed events */
> - for (i = nb_enqueued; i < nb_ev; i++) {
> - struct rte_crypto_op *op = events[i].event_ptr;
> - rte_pktmbuf_free(op->sym->m_src);
> - rte_crypto_op_free(op);
> - }
> -
> stats->event_enq_fail_count += nb_ev - nb_enqueued;
> stats->event_enq_count += nb_enqueued;
> stats->event_enq_retry_count += retry - 1;
> +
> + return nb_enqueued;
> +}
> +
> +static int
> +eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
> + struct crypto_ops_circular_buffer *bufp)
> +{
> + uint16_t n = 0, nb_ops_flushed;
> + uint16_t *headp = &bufp->head;
> + uint16_t *tailp = &bufp->tail;
> + struct rte_crypto_op **ops = bufp->op_buffer;
> +
> + if (*tailp > *headp)
> + n = *tailp - *headp;
> + else if (*tailp < *headp)
> + n = bufp->size - *headp;
> + else
> + return 0; /* buffer empty */
> +
> + nb_ops_flushed = eca_ops_enqueue_burst(adapter, ops, n);
> + bufp->count -= nb_ops_flushed;
> + if (!bufp->count) {
> + *headp = 0;
> + *tailp = 0;
> + return 0; /* buffer empty */
> + }
> +
> + *headp = (*headp + nb_ops_flushed) % bufp->size;
> + return 1;
> }
>
> +
> +static void
> +eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
> +{
> + if (likely(adapter->ebuf.count == 0))
> + return;
> +
> + while (eca_circular_buffer_flush_to_evdev(adapter, &adapter->ebuf))
> + ;
> +}
> +
> static inline unsigned int
> eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> unsigned int max_deq)
> @@ -571,7 +727,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> struct crypto_device_info *curr_dev;
> struct crypto_queue_pair_info *curr_queue;
> struct rte_crypto_op *ops[BATCH_SIZE];
> - uint16_t n, nb_deq;
> + uint16_t n, nb_deq, nb_enqueued, i;
> struct rte_cryptodev *dev;
> uint8_t cdev_id;
> uint16_t qp, dev_qps;
> @@ -579,16 +735,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> uint16_t num_cdev = rte_cryptodev_count();
>
> nb_deq = 0;
> + eca_ops_buffer_flush(adapter);
> +
> do {
> - uint16_t queues = 0;
> done = true;
>
> for (cdev_id = adapter->next_cdev_id;
> cdev_id < num_cdev; cdev_id++) {
> + uint16_t queues = 0;
> +
> curr_dev = &adapter->cdevs[cdev_id];
> dev = curr_dev->dev;
> - if (dev == NULL)
> + if (unlikely(dev == NULL))
> continue;
> +
> dev_qps = dev->data->nb_queue_pairs;
>
> for (qp = curr_dev->next_queue_pair_id;
> @@ -596,7 +756,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> queues++) {
>
> curr_queue = &curr_dev->qpairs[qp];
> - if (!curr_queue->qp_enabled)
> + if (unlikely(curr_queue == NULL ||
> + !curr_queue->qp_enabled))
> continue;
>
> n = rte_cryptodev_dequeue_burst(cdev_id, qp,
> @@ -605,11 +766,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> continue;
>
> done = false;
> + nb_enqueued = 0;
> +
> stats->crypto_deq_count += n;
> - eca_ops_enqueue_burst(adapter, ops, n);
> +
> + if (unlikely(!adapter->ebuf.count))
> + nb_enqueued = eca_ops_enqueue_burst(
> + adapter, ops, n);
> +
> + if (likely(nb_enqueued == n))
> + goto check;
> +
> + /* Failed to enqueue events case */
> + for (i = nb_enqueued; i < n; i++)
> + eca_circular_buffer_add(
> + &adapter->ebuf,
> + ops[i]);
> +
> +check:
> nb_deq += n;
>
> - if (nb_deq > max_deq) {
> + if (nb_deq >= max_deq) {
> if ((qp + 1) == dev_qps) {
> adapter->next_cdev_id =
> (cdev_id + 1)
> @@ -622,6 +799,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
> }
> }
> }
> + adapter->next_cdev_id = 0;
> } while (done == false);
> return nb_deq;
> }
> @@ -751,11 +929,12 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
> return -ENOMEM;
>
> qpairs = dev_info->qpairs;
> - qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
> - BATCH_SIZE *
> - sizeof(struct rte_crypto_op *),
> - 0, adapter->socket_id);
> - if (!qpairs->op_buffer) {
> +
> + if (eca_circular_buffer_init("eca_cdev_circular_buffer",
> + &qpairs->cbuf,
> + CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
> + RTE_EDEV_LOG_ERR("Failed to get memory for
> cryptodev "
> + "buffer");
> rte_free(qpairs);
> return -ENOMEM;
> }
> --
> 2.6.4