implement enqueue and dequeue apis Signed-off-by: Sunila Sahu <sunila.s...@caviumnetworks.com> Signed-off-by: Shally Verma <shally.ve...@caviumnetworks.com> Signed-off-by: Ashish Gupta <ashish.gu...@caviumnetworks.com> --- drivers/compress/meson.build | 2 +- drivers/compress/zlib/meson.build | 11 ++ drivers/compress/zlib/zlib_pmd.c | 275 +++++++++++++++++++++++++++++++++++++- 3 files changed, 286 insertions(+), 2 deletions(-)
diff --git a/drivers/compress/meson.build b/drivers/compress/meson.build index fb136e1..e4d5e5c 100644 --- a/drivers/compress/meson.build +++ b/drivers/compress/meson.build @@ -1,7 +1,7 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2018 Intel Corporation -drivers = ['isal'] +drivers = ['isal','zlib'] std_deps = ['compressdev'] # compressdev pulls in all other needed deps config_flag_fmt = 'RTE_LIBRTE_@0@_PMD' diff --git a/drivers/compress/zlib/meson.build b/drivers/compress/zlib/meson.build new file mode 100644 index 0000000..d66de95 --- /dev/null +++ b/drivers/compress/zlib/meson.build @@ -0,0 +1,11 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2018 Cavium Networks + +dep = dependency('zlib', required: false) +if not dep.found() + build = false +endif +deps += 'bus_vdev' +sources = files('zlib_pmd.c', 'zlib_pmd_ops.c') +ext_deps += dep +pkgconfig_extra_libs += '-lz' diff --git a/drivers/compress/zlib/zlib_pmd.c b/drivers/compress/zlib/zlib_pmd.c index 3dc71ec..e2681a7 100644 --- a/drivers/compress/zlib/zlib_pmd.c +++ b/drivers/compress/zlib/zlib_pmd.c @@ -17,7 +17,239 @@ #include "zlib_pmd_private.h" static uint8_t compressdev_driver_id; -int zlib_logtype_driver; + +/** compute the next mbuf in list and assign dst buffer and dlen, + * set op->status to appropriate flag if we run out of mbuf + */ +#define COMPUTE_DST_BUF(mbuf, dst, dlen) \ + ((mbuf = mbuf->next) ? \ + (dst = rte_pktmbuf_mtod(mbuf, uint8_t *)), \ + dlen = rte_pktmbuf_data_len(mbuf) : \ + !(op->status = \ + ((op->op_type == RTE_COMP_OP_STATELESS) ? \ + RTE_COMP_OP_STATUS_OUT_OF_SPACE_TERMINATED : \ + RTE_COMP_OP_STATUS_OUT_OF_SPACE_RECOVERABLE))) + +static void +process_zlib_deflate(struct rte_comp_op *op, z_stream *strm) +{ + int ret, flush, fin_flush; + uint8_t *src, *dst; + uint32_t sl, dl, have; + struct rte_mbuf *mbuf_src = op->m_src; + struct rte_mbuf *mbuf_dst = op->m_dst; + + src = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *, op->src.offset); + + sl = rte_pktmbuf_data_len(mbuf_src) - op->src.offset; + + dst = rte_pktmbuf_mtod_offset(mbuf_dst, unsigned char *, + op->dst.offset); + + dl = rte_pktmbuf_data_len(mbuf_dst) - op->dst.offset; + + if (unlikely(!src || !dst || !strm)) { + op->status = RTE_COMP_OP_STATUS_INVALID_ARGS; + ZLIB_LOG_ERR("\nInvalid source or destination buffers"); + return; + } + switch (op->flush_flag) { + case RTE_COMP_FLUSH_NONE: + fin_flush = Z_NO_FLUSH; + break; + case RTE_COMP_FLUSH_SYNC: + fin_flush = Z_SYNC_FLUSH; + break; + case RTE_COMP_FLUSH_FULL: + fin_flush = Z_FULL_FLUSH; + break; + case RTE_COMP_FLUSH_FINAL: + fin_flush = Z_FINISH; + break; + default: + op->status = RTE_COMP_OP_STATUS_ERROR; + goto def_end; + } + if (op->src.length <= sl) { + sl = op->src.length; + flush = fin_flush; + } else { + /* if there're more than one mbufs in input, + * process intermediate with NO_FLUSH + */ + flush = Z_NO_FLUSH; + } + /* initialize status to SUCCESS */ + op->status = RTE_COMP_OP_STATUS_SUCCESS; + + do { + /* Update z_stream with the inputs provided by application */ + strm->next_in = src; + strm->avail_in = sl; + + do { + strm->avail_out = dl; + strm->next_out = dst; + ret = deflate(strm, flush); + if (unlikely(ret == Z_STREAM_ERROR)) { + /* error return, do not process further */ + op->status = RTE_COMP_OP_STATUS_ERROR; + goto def_end; + } + /* Update op stats */ + op->produced += dl - strm->avail_out; + op->consumed += sl - strm->avail_in; + /* Break if Z_STREAM_END is encountered or dst mbuf gets over */ + } while (!(ret == Z_STREAM_END) && (strm->avail_out == 0) && + COMPUTE_DST_BUF(mbuf_dst, dst, dl)); + + /** Compress till the end of compressed blocks provided + * or till Z_FINISH + * Exit if op->status is not SUCCESS. + */ + if ((op->status != RTE_COMP_OP_STATUS_SUCCESS) || + (ret == Z_STREAM_END) || + op->consumed == op->src.length) + goto def_end; + + /** Update last output buffer with respect to availed space */ + have = dl - strm->avail_out; + dst += have; + dl = strm->avail_out; + /** Update source buffer to next mbuf*/ + mbuf_src = mbuf_src->next; + src = rte_pktmbuf_mtod(mbuf_src, uint8_t *); + sl = rte_pktmbuf_data_len(mbuf_src); + + /** Last block to be compressed + * Update flush with value provided by app for last block, + * For stateless flush should be always Z_FINISH + */ + + if ((op->src.length - op->consumed) <= sl) { + sl = (op->src.length - op->consumed); + flush = fin_flush; + } + + } while (1); +def_end: + if (op->op_type == RTE_COMP_OP_STATELESS) + deflateReset(strm); +} + +static void +process_zlib_inflate(struct rte_comp_op *op, z_stream *strm) +{ + int ret, flush; + uint8_t *src, *dst; + uint32_t sl, dl, have; + struct rte_mbuf *mbuf_src = op->m_src; + struct rte_mbuf *mbuf_dst = op->m_dst; + + src = rte_pktmbuf_mtod_offset(mbuf_src, uint8_t *, op->src.offset); + + sl = rte_pktmbuf_data_len(mbuf_src) - op->src.offset; + + dst = rte_pktmbuf_mtod_offset(mbuf_dst, unsigned char *, + op->dst.offset); + + dl = rte_pktmbuf_data_len(mbuf_dst) - op->dst.offset; + + if (unlikely(!src || !dst || !strm)) { + op->status = RTE_COMP_OP_STATUS_INVALID_ARGS; + ZLIB_LOG_ERR("\nInvalid source or destination buffers"); + return; + } + if (op->src.length <= sl) + sl = op->src.length; + + /** Ignoring flush value provided from application for decompression */ + flush = Z_NO_FLUSH; + /* initialize status to SUCCESS */ + op->status = RTE_COMP_OP_STATUS_SUCCESS; + + do { + /** Update z_stream with the inputs provided by application */ + strm->avail_in = sl; + strm->next_in = src; + do { + strm->avail_out = dl; + strm->next_out = dst; + + ret = inflate(strm, flush); + + switch (ret) { + case Z_NEED_DICT: + ret = Z_DATA_ERROR; /* and fall through */ + case Z_DATA_ERROR: + case Z_MEM_ERROR: + case Z_STREAM_ERROR: + op->status = RTE_COMP_OP_STATUS_ERROR; + goto inf_end; + default: + /** Update op stats */ + op->produced += dl - strm->avail_out; + op->consumed += sl - strm->avail_in; + + } + /* Break if Z_STREAM_END is encountered or dst mbuf gets over */ + } while (!(ret == Z_STREAM_END) && (strm->avail_out == 0) && + COMPUTE_DST_BUF(mbuf_dst, dst, dl)); + + /** Compress till the end of compressed blocks provided + * or till Z_STREAM_END. + * Exit if op->status is not SUCCESS. + */ + if ((op->status != RTE_COMP_OP_STATUS_SUCCESS) || + (ret == Z_STREAM_END) || + op->consumed == op->src.length) { + goto inf_end; + } + /** Adjust previous output buffer with respect to avail_out */ + have = dl - strm->avail_out; + dst += have; + dl = strm->avail_out; + /** Read next input buffer to be processed */ + mbuf_src = mbuf_src->next; + src = rte_pktmbuf_mtod(mbuf_src, uint8_t *); + sl = rte_pktmbuf_data_len(mbuf_src); + if ((op->src.length - op->consumed) < sl) + sl = (op->src.length - op->consumed); + } while (1); +inf_end: + if (op->op_type == RTE_COMP_OP_STATELESS) + inflateReset(strm); +} + +/** Process comp operation for mbuf */ +static inline int +process_zlib_op(struct zlib_qp *qp, struct rte_comp_op *op) +{ + struct zlib_stream *stream; + + if (op->src.offset > rte_pktmbuf_data_len(op->m_src) || + op->dst.offset > rte_pktmbuf_data_len(op->m_dst)) { + op->status = RTE_COMP_OP_STATUS_INVALID_ARGS; + ZLIB_LOG_ERR("\nInvalid source or destination buffers"); + goto comp_err; + } + + if (op->op_type == RTE_COMP_OP_STATELESS) + stream = &((struct zlib_priv_xform *)op->private_xform)->stream; + else if (op->op_type == RTE_COMP_OP_STATEFUL) + stream = (struct zlib_stream *)op->stream; + else { + op->status = RTE_COMP_OP_STATUS_INVALID_ARGS; + ZLIB_LOG_ERR("\nInvalid operation type"); + goto comp_err; + } + stream->comp(op, &stream->strm); +comp_err: + /* whatever is out of op, put it into completion queue with + * its status + */ + return rte_ring_enqueue(qp->processed_pkts, (void *)op); +} /** Parse comp xform and set private xform/Stream parameters */ int @@ -118,6 +350,43 @@ zlib_set_stream_parameters(const struct rte_comp_xform *xform, return 0; } +static uint16_t +zlib_pmd_enqueue_burst(void *queue_pair, + struct rte_comp_op **ops, uint16_t nb_ops) +{ + struct zlib_qp *qp = queue_pair; + int ret, i; + int enqd = 0; + for (i = 0; i < nb_ops; i++) { + ret = process_zlib_op(qp, ops[i]); + if (unlikely(ret < 0)) { + /* increment count if failed to push to completion + * queue + */ + qp->qp_stats.enqueue_err_count++; + } else { + qp->qp_stats.enqueued_count++; + enqd++; + } + } + return enqd; +} + +static uint16_t +zlib_pmd_dequeue_burst(void *queue_pair, + struct rte_comp_op **ops, uint16_t nb_ops) +{ + struct zlib_qp *qp = queue_pair; + + unsigned int nb_dequeued = 0; + + nb_dequeued = rte_ring_dequeue_burst(qp->processed_pkts, + (void **)ops, nb_ops, NULL); + qp->qp_stats.dequeued_count += nb_dequeued; + + return nb_dequeued; +} + static int zlib_remove(struct rte_vdev_device *vdev); static int @@ -138,6 +407,10 @@ zlib_create(const char *name, dev->driver_id = compressdev_driver_id; dev->dev_ops = rte_zlib_pmd_ops; + /* register rx/tx burst functions for data path */ + dev->dequeue_burst = zlib_pmd_dequeue_burst; + dev->enqueue_burst = zlib_pmd_enqueue_burst; + dev->feature_flags = 0; dev->feature_flags |= RTE_COMP_FF_SHAREABLE_PRIV_XFORM | RTE_COMP_FF_NONCOMPRESSED_BLOCKS | -- 2.9.5