Implement enqueue and dequeue APIs for the zlib PMD

Signed-off-by: Sunila Sahu <sunila.s...@caviumnetworks.com>
Signed-off-by: Shally Verma <shally.ve...@caviumnetworks.com>
Signed-off-by: Ashish Gupta <ashish.gu...@caviumnetworks.com>
---
 drivers/compress/meson.build      |   2 +-
 drivers/compress/zlib/meson.build |  11 ++
 drivers/compress/zlib/zlib_pmd.c  | 275 +++++++++++++++++++++++++++++++++++++-
 3 files changed, 286 insertions(+), 2 deletions(-)

diff --git a/drivers/compress/meson.build b/drivers/compress/meson.build
index fb136e1..e4d5e5c 100644
--- a/drivers/compress/meson.build
+++ b/drivers/compress/meson.build
@@ -1,7 +1,7 @@
 # SPDX-License-Identifier: BSD-3-Clause
 # Copyright(c) 2018 Intel Corporation
 
-drivers = ['isal']
+drivers = ['isal','zlib']
 
 std_deps = ['compressdev'] # compressdev pulls in all other needed deps
 config_flag_fmt = 'RTE_LIBRTE_@0@_PMD'
diff --git a/drivers/compress/zlib/meson.build b/drivers/compress/zlib/meson.build
new file mode 100644
index 0000000..d66de95
--- /dev/null
+++ b/drivers/compress/zlib/meson.build
@@ -0,0 +1,11 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Cavium Networks
+
+dep = dependency('zlib', required: false)  # lookup may fail without aborting
+if not dep.found()
+       build = false  # zlib not found: clear the build flag
+endif
+deps += 'bus_vdev'
+sources = files('zlib_pmd.c', 'zlib_pmd_ops.c')
+ext_deps += dep
+pkgconfig_extra_libs += '-lz'
diff --git a/drivers/compress/zlib/zlib_pmd.c b/drivers/compress/zlib/zlib_pmd.c
index 3dc71ec..e2681a7 100644
--- a/drivers/compress/zlib/zlib_pmd.c
+++ b/drivers/compress/zlib/zlib_pmd.c
@@ -17,7 +17,239 @@
 #include "zlib_pmd_private.h"
 
 static uint8_t compressdev_driver_id;
-int zlib_logtype_driver;
+
+/** Advance mbuf to mbuf->next; when it exists load dst/dlen and yield dlen.
+ * Otherwise store an OUT_OF_SPACE_* status (TERMINATED when stateless, else
+ * RECOVERABLE) in op->status and yield !status. `op` is the caller's local. */
+#define COMPUTE_DST_BUF(mbuf, dst, dlen)               \
+       ((mbuf = mbuf->next) ?                                          \
+               (dst = rte_pktmbuf_mtod(mbuf, uint8_t *)),              \
+               dlen = rte_pktmbuf_data_len(mbuf) :                     \
+                       !(op->status =                                  \
+                       ((op->op_type == RTE_COMP_OP_STATELESS) ?       \
+                       RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED :    \
+                       RTE_COMP_OP_STATUS_OUT_OF_SPACE_RECOVERABLE)))
+
+/** Deflate op->src.length bytes of the op->m_src chain into the op->m_dst
+ * chain through strm. Sets op->status, adds to op->produced/op->consumed and
+ * resets strm at def_end when the op is stateless.
+ */
+static void
+process_zlib_deflate(struct rte_comp_op *op, z_stream *strm)
+{
+       int ret, flush, fin_flush;
+       uint8_t *src, *dst;
+       uint32_t sl, dl, have;
+       struct rte_mbuf *mbuf_src = op->m_src;
+       struct rte_mbuf *mbuf_dst = op->m_dst;
+
+       src = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *, op->src.offset);
+       sl = rte_pktmbuf_data_len(mbuf_src) - op->src.offset;
+       dst = rte_pktmbuf_mtod_offset(mbuf_dst, unsigned char *,
+                       op->dst.offset);
+       dl = rte_pktmbuf_data_len(mbuf_dst) - op->dst.offset;
+
+       if (unlikely(!src || !dst || !strm)) {
+               op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+               ZLIB_LOG_ERR("\nInvalid source or destination buffers");
+               return;
+       }
+       switch (op->flush_flag) {
+       case RTE_COMP_FLUSH_NONE:
+               fin_flush = Z_NO_FLUSH;
+               break;
+       case RTE_COMP_FLUSH_SYNC:
+               fin_flush = Z_SYNC_FLUSH;
+               break;
+       case RTE_COMP_FLUSH_FULL:
+               fin_flush = Z_FULL_FLUSH;
+               break;
+       case RTE_COMP_FLUSH_FINAL:
+               fin_flush = Z_FINISH;
+               break;
+       default:
+               op->status = RTE_COMP_OP_STATUS_ERROR;
+               goto def_end;
+       }
+       if (op->src.length <= sl) {
+               sl = op->src.length;
+               flush = fin_flush;
+       } else {
+               /* input continues in further mbufs: only the last segment
+                * gets the requested flush, earlier ones use Z_NO_FLUSH
+                */
+               flush = Z_NO_FLUSH;
+       }
+       /* initialize status to SUCCESS */
+       op->status = RTE_COMP_OP_STATUS_SUCCESS;
+
+       do {
+               /* Update z_stream with the inputs provided by application */
+               strm->next_in = src;
+               strm->avail_in = sl;
+
+               do {
+                       strm->avail_out = dl;
+                       strm->next_out = dst;
+                       ret = deflate(strm, flush);
+                       if (unlikely(ret == Z_STREAM_ERROR)) {
+                               /* error return, do not process further */
+                               op->status =  RTE_COMP_OP_STATUS_ERROR;
+                               goto def_end;
+                       }
+                       /* avail_out is reloaded on every pass: add the delta */
+                       op->produced += dl - strm->avail_out;
+               /* Retry with the next dst mbuf while the current one is full */
+               } while (!(ret == Z_STREAM_END) && (strm->avail_out == 0) &&
+                               COMPUTE_DST_BUF(mbuf_dst, dst, dl));
+               /* avail_in is loaded once per src segment, so count it once */
+               op->consumed += sl - strm->avail_in;
+
+               /* Done on a non-SUCCESS status, on Z_STREAM_END, or once
+                * op->src.length bytes have been consumed.
+                */
+               if ((op->status != RTE_COMP_OP_STATUS_SUCCESS) ||
+                       (ret == Z_STREAM_END) ||
+                       op->consumed == op->src.length)
+                       goto def_end;
+
+               /** Update last output buffer with respect to availed space */
+               have = dl - strm->avail_out;
+               dst += have;
+               dl = strm->avail_out;
+               /* Next src mbuf; a chain shorter than src.length is invalid */
+               mbuf_src = mbuf_src->next;
+               if (unlikely(mbuf_src == NULL)) {
+                       op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+                       goto def_end;
+               }
+               src = rte_pktmbuf_mtod(mbuf_src, uint8_t *);
+               sl = rte_pktmbuf_data_len(mbuf_src);
+               /* Last segment: clamp to the remaining length and switch to
+                * the flush mode requested by the application */
+               if ((op->src.length - op->consumed) <= sl) {
+                       sl = (op->src.length - op->consumed);
+                       flush = fin_flush;
+               }
+       } while (1);
+def_end:
+       if (op->op_type == RTE_COMP_OP_STATELESS)
+               deflateReset(strm);
+}
+
+/** Inflate op->src.length bytes of the op->m_src chain into the op->m_dst
+ * chain through strm. Sets op->status, adds to op->produced/op->consumed and
+ * resets strm at inf_end when the op is stateless.
+ */
+static void
+process_zlib_inflate(struct rte_comp_op *op, z_stream *strm)
+{
+       int ret, flush;
+       uint8_t *src, *dst;
+       uint32_t sl, dl, have;
+       struct rte_mbuf *mbuf_src = op->m_src;
+       struct rte_mbuf *mbuf_dst = op->m_dst;
+
+       src = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *, op->src.offset);
+       sl = rte_pktmbuf_data_len(mbuf_src) - op->src.offset;
+       dst = rte_pktmbuf_mtod_offset(mbuf_dst, unsigned char *,
+                       op->dst.offset);
+       dl = rte_pktmbuf_data_len(mbuf_dst) - op->dst.offset;
+
+       if (unlikely(!src || !dst || !strm)) {
+               op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+               ZLIB_LOG_ERR("\nInvalid source or destination buffers");
+               return;
+       }
+       if (op->src.length <= sl)
+               sl = op->src.length;
+
+       /** Ignoring flush value provided from application for decompression */
+       flush = Z_NO_FLUSH;
+       /* initialize status to SUCCESS */
+       op->status = RTE_COMP_OP_STATUS_SUCCESS;
+
+       do {
+               /** Update z_stream with the inputs provided by application */
+               strm->avail_in = sl;
+               strm->next_in = src;
+               do {
+                       strm->avail_out = dl;
+                       strm->next_out = dst;
+                       ret = inflate(strm, flush);
+                       switch (ret) {
+                       case Z_NEED_DICT:
+                               ret = Z_DATA_ERROR;     /* and fall through */
+                       case Z_DATA_ERROR:
+                       case Z_MEM_ERROR:
+                       case Z_STREAM_ERROR:
+                               op->status = RTE_COMP_OP_STATUS_ERROR;
+                               goto inf_end;
+                       default:
+                               /* avail_out is reloaded each pass: add delta */
+                               op->produced += dl - strm->avail_out;
+                       }
+               /* Retry with the next dst mbuf while the current one is full */
+               } while (!(ret == Z_STREAM_END) && (strm->avail_out == 0) &&
+                               COMPUTE_DST_BUF(mbuf_dst, dst, dl));
+               /* avail_in is loaded once per src segment, so count it once */
+               op->consumed += sl - strm->avail_in;
+
+               /* Done on error status, Z_STREAM_END or all input consumed */
+               if ((op->status != RTE_COMP_OP_STATUS_SUCCESS) ||
+                               (ret == Z_STREAM_END) ||
+                               op->consumed == op->src.length) {
+                       goto inf_end;
+               }
+               /** Adjust previous output buffer with respect to avail_out */
+               have = dl - strm->avail_out;
+               dst += have;
+               dl = strm->avail_out;
+               /* Next src mbuf; a chain shorter than src.length is invalid */
+               mbuf_src = mbuf_src->next;
+               if (unlikely(mbuf_src == NULL)) {
+                       op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+                       goto inf_end;
+               }
+               src = rte_pktmbuf_mtod(mbuf_src, uint8_t *);
+               sl = rte_pktmbuf_data_len(mbuf_src);
+               if ((op->src.length - op->consumed) < sl)
+                       sl = (op->src.length - op->consumed);
+       } while (1);
+inf_end:
+       if (op->op_type == RTE_COMP_OP_STATELESS)
+               inflateReset(strm);
+}
+
+/** Run one op through its stream's comp handler and queue it as completed */
+static inline int
+process_zlib_op(struct zlib_qp *qp, struct rte_comp_op *op)
+{
+       struct zlib_stream *zs = NULL;
+       int runnable = 1;
+
+       if (op->src.offset > rte_pktmbuf_data_len(op->m_src) ||
+                       op->dst.offset > rte_pktmbuf_data_len(op->m_dst)) {
+               /* an offset beyond the head mbuf's data length is rejected */
+               op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+               ZLIB_LOG_ERR("\nInvalid source or destination buffers");
+               runnable = 0;
+       } else if (op->op_type == RTE_COMP_OP_STATELESS) {
+               zs = &((struct zlib_priv_xform *)op->private_xform)->stream;
+       } else if (op->op_type == RTE_COMP_OP_STATEFUL) {
+               zs = (struct zlib_stream *)op->stream;
+       } else {
+               op->status = RTE_COMP_OP_STATUS_INVALID_ARGS;
+               ZLIB_LOG_ERR("\nInvalid operation type");
+               runnable = 0;
+       }
+       if (runnable)
+               zs->comp(op, &zs->strm);
+       /* Valid or not, the op is pushed to the completion ring with the
+        * status it now carries; the ring's return code is passed back.
+        */
+       return rte_ring_enqueue(qp->processed_pkts, (void *)op);
+}
 
 /** Parse comp xform and set private xform/Stream parameters */
 int
@@ -118,6 +350,43 @@ zlib_set_stream_parameters(const struct rte_comp_xform *xform,
        return 0;
 }
 
+/** Process ops inline; return how many were queued on the completion ring */
+static uint16_t
+zlib_pmd_enqueue_burst(void *queue_pair,
+                       struct rte_comp_op **ops, uint16_t nb_ops)
+{
+       struct zlib_qp *qp = queue_pair;
+       int queued = 0;
+       int idx;
+
+       for (idx = 0; idx < nb_ops; idx++) {
+               /* the op is fully processed inside this call */
+               if (unlikely(process_zlib_op(qp, ops[idx]) < 0)) {
+                       /* it could not be pushed to the completion queue */
+                       qp->qp_stats.enqueue_err_count++;
+                       continue;
+               }
+               qp->qp_stats.enqueued_count++;
+               queued++;
+       }
+       return queued;
+}
+
+/** Hand back up to nb_ops finished ops from the qp's completion ring */
+static uint16_t
+zlib_pmd_dequeue_burst(void *queue_pair,
+                       struct rte_comp_op **ops, uint16_t nb_ops)
+{
+       struct zlib_qp *qp = queue_pair;
+       unsigned int count;
+
+       /* the stats counter grows by the number actually dequeued */
+       count = rte_ring_dequeue_burst(qp->processed_pkts,
+                       (void **)ops, nb_ops, NULL);
+       qp->qp_stats.dequeued_count += count;
+       return count;
+}
+
 static int zlib_remove(struct rte_vdev_device *vdev);
 
 static int
@@ -138,6 +407,10 @@ zlib_create(const char *name,
        dev->driver_id = compressdev_driver_id;
        dev->dev_ops = rte_zlib_pmd_ops;
 
+       /* register rx/tx burst functions for data path */
+       dev->dequeue_burst = zlib_pmd_dequeue_burst;
+       dev->enqueue_burst = zlib_pmd_enqueue_burst;
+
        dev->feature_flags = 0;
        dev->feature_flags |= RTE_COMP_FF_SHAREABLE_PRIV_XFORM |
                                RTE_COMP_FF_NONCOMPRESSED_BLOCKS |
-- 
2.9.5

Reply via email to