From: Kirill Rybalchenko <kirill.rybalche...@intel.com>

Multi-core scheduling mode is a mode in which the scheduler distributes
crypto operations on a round-robin basis among several cores assigned
as workers.

Signed-off-by: Kirill Rybalchenko <kirill.rybalche...@intel.com>
---
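A minimal usage sketch for the new mode (illustration only, not part of
the patch): the "mode", "coremask" and "corelist" vdev keys come from
this patch, while the "crypto_scheduler" vdev name and the "slave" key
are assumed from the existing scheduler PMD, and <cryptodev0/1> stand
in for real slave device names:

    ./app -l 0,6,7 ... \
        --vdev "crypto_scheduler,slave=<cryptodev0>,slave=<cryptodev1>,\
mode=multi-core,coremask=0xc0"

Here coremask=0xc0 selects lcores 6 and 7 as workers (corelist=6,7 is
equivalent); the chosen cores must be EAL worker lcores, since the
scheduler launches its polling loops with rte_eal_remote_launch().
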
 app/test-crypto-perf/cperf_test_throughput.c       |   2 +
 drivers/crypto/scheduler/Makefile                  |   1 +
 drivers/crypto/scheduler/rte_cryptodev_scheduler.c |   7 +
 drivers/crypto/scheduler/rte_cryptodev_scheduler.h |   6 +
 drivers/crypto/scheduler/scheduler_multicore.c     | 405 +++++++++++++++++++++
 drivers/crypto/scheduler/scheduler_pmd.c           |  73 +++-
 drivers/crypto/scheduler/scheduler_pmd_private.h   |   4 +
 lib/librte_cryptodev/rte_cryptodev.c               |   2 +-
 8 files changed, 497 insertions(+), 3 deletions(-)
 create mode 100644 drivers/crypto/scheduler/scheduler_multicore.c

diff --git a/app/test-crypto-perf/cperf_test_throughput.c b/app/test-crypto-perf/cperf_test_throughput.c
index 61b27ea..0504a37 100644
--- a/app/test-crypto-perf/cperf_test_throughput.c
+++ b/app/test-crypto-perf/cperf_test_throughput.c
@@ -502,6 +502,8 @@ cperf_throughput_test_runner(void *test_ctx)
 
        }
 
+       rte_cryptodev_stop(ctx->dev_id);
+
        return 0;
 }
 
diff --git a/drivers/crypto/scheduler/Makefile b/drivers/crypto/scheduler/Makefile
index c273e78..b045410 100644
--- a/drivers/crypto/scheduler/Makefile
+++ b/drivers/crypto/scheduler/Makefile
@@ -56,5 +56,6 @@ SRCS-$(CONFIG_RTE_LIBRTE_PMD_CRYPTO_SCHEDULER) += rte_cryptodev_scheduler.c
 SRCS-$(CONFIG_RTE_LIBRTE_PMD_CRYPTO_SCHEDULER) += scheduler_roundrobin.c
 SRCS-$(CONFIG_RTE_LIBRTE_PMD_CRYPTO_SCHEDULER) += scheduler_pkt_size_distr.c
 SRCS-$(CONFIG_RTE_LIBRTE_PMD_CRYPTO_SCHEDULER) += scheduler_failover.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_CRYPTO_SCHEDULER) += scheduler_multicore.c
 
 include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/crypto/scheduler/rte_cryptodev_scheduler.c b/drivers/crypto/scheduler/rte_cryptodev_scheduler.c
index 319dcf0..5f50f65 100644
--- a/drivers/crypto/scheduler/rte_cryptodev_scheduler.c
+++ b/drivers/crypto/scheduler/rte_cryptodev_scheduler.c
@@ -351,6 +351,13 @@ rte_cryptodev_scheduler_mode_set(uint8_t scheduler_id,
                        return -1;
                }
                break;
+       case CDEV_SCHED_MODE_MULTICORE:
+               if (rte_cryptodev_scheduler_load_user_scheduler(scheduler_id,
+                               multicore_scheduler) < 0) {
+                       CS_LOG_ERR("Failed to load scheduler");
+                       return -1;
+               }
+               break;
        default:
                CS_LOG_ERR("Not yet supported");
                return -ENOTSUP;
diff --git a/drivers/crypto/scheduler/rte_cryptodev_scheduler.h b/drivers/crypto/scheduler/rte_cryptodev_scheduler.h
index 2ba6e47..5d8ac45 100644
--- a/drivers/crypto/scheduler/rte_cryptodev_scheduler.h
+++ b/drivers/crypto/scheduler/rte_cryptodev_scheduler.h
@@ -64,6 +64,8 @@ extern "C" {
 #define SCHEDULER_MODE_NAME_PKT_SIZE_DISTR     packet-size-distr
 /** Fail-over scheduling mode string */
 #define SCHEDULER_MODE_NAME_FAIL_OVER          fail-over
+/** multi-core scheduling mode string */
+#define SCHEDULER_MODE_NAME_MULTI_CORE         multi-core
 
 /**
  * Crypto scheduler PMD operation modes
@@ -78,6 +80,8 @@ enum rte_cryptodev_scheduler_mode {
        CDEV_SCHED_MODE_PKT_SIZE_DISTR,
        /** Fail-over mode */
        CDEV_SCHED_MODE_FAILOVER,
+       /** multi-core mode */
+       CDEV_SCHED_MODE_MULTICORE,
 
        CDEV_SCHED_MODE_COUNT /**< number of modes */
 };
@@ -327,6 +331,8 @@ extern struct rte_cryptodev_scheduler *roundrobin_scheduler;
 extern struct rte_cryptodev_scheduler *pkt_size_based_distr_scheduler;
 /** Fail-over mode scheduler */
 extern struct rte_cryptodev_scheduler *failover_scheduler;
+/** multi-core mode scheduler */
+extern struct rte_cryptodev_scheduler *multicore_scheduler;
 
 #ifdef __cplusplus
 }
diff --git a/drivers/crypto/scheduler/scheduler_multicore.c b/drivers/crypto/scheduler/scheduler_multicore.c
new file mode 100644
index 0000000..12e5734
--- /dev/null
+++ b/drivers/crypto/scheduler/scheduler_multicore.c
@@ -0,0 +1,405 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2017 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#include <unistd.h>
+
+#include <rte_cryptodev.h>
+#include <rte_malloc.h>
+
+#include "rte_cryptodev_scheduler_operations.h"
+#include "scheduler_pmd_private.h"
+
+#define MC_SCHED_RING_SIZE             1024
+#define MC_SCHED_ENQ_RING_NAME "MCS_ENQR_"
+#define MC_SCHED_DEQ_RING_NAME "MCS_DEQR_"
+
+#define MC_SCHED_BUFFER_SIZE 32
+#define MC_SCHED_BUFFER_MASK (MC_SCHED_BUFFER_SIZE - 1)
+
+/** multi-core scheduler context */
+struct mc_scheduler_ctx {
+       unsigned int num_workers;             /**< Number of workers polling */
+       unsigned int stop_signal;             /**< Set to stop the workers */
+
+       struct rte_ring *sched_enq_ring[MAX_NB_WORKER_CORES];
+       struct rte_ring *sched_deq_ring[MAX_NB_WORKER_CORES];
+};
+
+struct mc_scheduler_qp_ctx {
+       struct scheduler_slave slaves[RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES];
+       uint32_t nb_slaves;
+
+       uint32_t last_enq_worker_idx;
+       uint32_t last_deq_worker_idx;
+
+       struct mc_scheduler_ctx *mc_private_ctx;
+};
+
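+/* Frontend enqueue: spread the burst round-robin across the per-worker
+ * enqueue rings; the worker cores drain these rings and submit the ops
+ * to their slaves.
+ */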
+static uint16_t
+schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
+{
+       struct mc_scheduler_qp_ctx *mc_qp_ctx =
+                       ((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
+       struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
+       uint32_t worker_idx = mc_qp_ctx->last_enq_worker_idx;
+
+       uint16_t i, processed_ops = 0;
+
+       if (unlikely(nb_ops == 0))
+               return 0;
+
+       for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
+               struct rte_ring *enq_ring = mc_ctx->sched_enq_ring[worker_idx];
+               uint16_t nb_queue_ops = rte_ring_enqueue_burst(enq_ring,
+                       (void *)(&ops[processed_ops]), nb_ops, NULL);
+
+               nb_ops -= nb_queue_ops;
+               processed_ops += nb_queue_ops;
+
+               if (++worker_idx == mc_ctx->num_workers)
+                       worker_idx = 0;
+       }
+       mc_qp_ctx->last_enq_worker_idx = worker_idx;
+
+       return processed_ops;
+}
+
+static uint16_t
+schedule_enqueue_ordering(void *qp, struct rte_crypto_op **ops,
+               uint16_t nb_ops)
+{
+       struct rte_ring *order_ring =
+                       ((struct scheduler_qp_ctx *)qp)->order_ring;
+       uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
+                       nb_ops);
+       uint16_t nb_ops_enqd = schedule_enqueue(qp, ops,
+                       nb_ops_to_enq);
+
+       scheduler_order_insert(order_ring, ops, nb_ops_enqd);
+
+       return nb_ops_enqd;
+}
+
+
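+/* Frontend dequeue: collect completed ops round-robin from the
+ * per-worker dequeue rings that the worker cores fill.
+ */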
+static uint16_t
+schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
+{
+       struct mc_scheduler_qp_ctx *mc_qp_ctx =
+                       ((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
+       struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
+       uint32_t worker_idx = mc_qp_ctx->last_deq_worker_idx;
+       uint16_t i, processed_ops = 0;
+
+       for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
+               struct rte_ring *deq_ring = mc_ctx->sched_deq_ring[worker_idx];
+               uint16_t nb_deq_ops = rte_ring_dequeue_burst(deq_ring,
+                       (void *)(&ops[processed_ops]), nb_ops, NULL);
+
+               nb_ops -= nb_deq_ops;
+               processed_ops += nb_deq_ops;
+
+               if (++worker_idx == mc_ctx->num_workers)
+                       worker_idx = 0;
+       }
+
+       mc_qp_ctx->last_deq_worker_idx = worker_idx;
+
+       return processed_ops;
+
+}
+
+static uint16_t
+schedule_dequeue_ordering(void *qp, struct rte_crypto_op **ops,
+               uint16_t nb_ops)
+{
+       struct rte_ring *order_ring =
+                       ((struct scheduler_qp_ctx *)qp)->order_ring;
+
+       schedule_dequeue(qp, ops, nb_ops);
+
+       return scheduler_order_drain(order_ring, ops, nb_ops);
+}
+
+static int
+slave_attach(__rte_unused struct rte_cryptodev *dev,
+               __rte_unused uint8_t slave_id)
+{
+       return 0;
+}
+
+static int
+slave_detach(__rte_unused struct rte_cryptodev *dev,
+               __rte_unused uint8_t slave_id)
+{
+       return 0;
+}
+
+static int
+mc_scheduler_worker(struct rte_cryptodev *dev)
+{
+       struct scheduler_ctx *sched_ctx = dev->data->dev_private;
+       struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
+       struct rte_ring *enq_ring;
+       struct rte_ring *deq_ring;
+       unsigned int core_id = rte_lcore_id();
+       int worker_idx = -1;
+       struct scheduler_slave *slave;
+       struct rte_crypto_op *enq_ops[MC_SCHED_BUFFER_SIZE];
+       struct rte_crypto_op *deq_ops[MC_SCHED_BUFFER_SIZE];
+       struct rte_cryptodev_sym_session *sessions[MC_SCHED_BUFFER_SIZE];
+       struct scheduler_session *sess0, *sess1, *sess2, *sess3;
+       uint16_t processed_ops;
+       uint16_t inflight_ops = 0;
+       uint16_t i;
+
+       for (i = 0; i < sched_ctx->nb_wc; i++) {
+               if (sched_ctx->wc_pool[i] == core_id) {
+                       worker_idx = i;
+                       break;
+               }
+       }
+       if (worker_idx == -1) {
+               CS_LOG_ERR("worker on core %u:cannot find worker index!\n", 
core_id);
+               return -1;
+       }
+
+       slave = &sched_ctx->slaves[worker_idx];
+       enq_ring = mc_ctx->sched_enq_ring[worker_idx];
+       deq_ring = mc_ctx->sched_deq_ring[worker_idx];
+
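+       /*
+        * Worker loop: drain this worker's enqueue ring, swap each op's
+        * scheduler session for the slave's own session, submit the burst
+        * to the slave PMD, then move completed ops to the dequeue ring.
+        */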
+       while (!mc_ctx->stop_signal) {
+               uint16_t nb_deq_ops = rte_ring_dequeue_burst(enq_ring,
+                       (void *)enq_ops, MC_SCHED_BUFFER_SIZE, NULL);
+               if (nb_deq_ops) {
+                       uint16_t i;
+
+                       for (i = 0; i < nb_deq_ops && i < 4; i++)
+                               rte_prefetch0(enq_ops[i]->sym->session);
+
+                       for (i = 0; (i < (nb_deq_ops - 8)) && (nb_deq_ops > 8); 
i += 4) {
+                               sess0 = (struct scheduler_session *)
+                                               enq_ops[i]->sym->session->_private;
+                               sess1 = (struct scheduler_session *)
+                                               enq_ops[i+1]->sym->session->_private;
+                               sess2 = (struct scheduler_session *)
+                                               enq_ops[i+2]->sym->session->_private;
+                               sess3 = (struct scheduler_session *)
+                                               enq_ops[i+3]->sym->session->_private;
+
+                               sessions[i] = enq_ops[i]->sym->session;
+                               sessions[i + 1] = enq_ops[i + 1]->sym->session;
+                               sessions[i + 2] = enq_ops[i + 2]->sym->session;
+                               sessions[i + 3] = enq_ops[i + 3]->sym->session;
+
+                               enq_ops[i]->sym->session = sess0->sessions[worker_idx];
+                               enq_ops[i + 1]->sym->session = sess1->sessions[worker_idx];
+                               enq_ops[i + 2]->sym->session = sess2->sessions[worker_idx];
+                               enq_ops[i + 3]->sym->session = sess3->sessions[worker_idx];
+
+                               rte_prefetch0(enq_ops[i + 4]->sym->session);
+                               rte_prefetch0(enq_ops[i + 5]->sym->session);
+                               rte_prefetch0(enq_ops[i + 6]->sym->session);
+                               rte_prefetch0(enq_ops[i + 7]->sym->session);
+                       }
+
+                       for (; i < nb_deq_ops; i++) {
+                               sess0 = (struct scheduler_session *)
+                                               enq_ops[i]->sym->session->_private;
+                               sessions[i] = enq_ops[i]->sym->session;
+                               enq_ops[i]->sym->session = sess0->sessions[worker_idx];
+                       }
+
+                       processed_ops = rte_cryptodev_enqueue_burst(slave->dev_id,
+                                               slave->qp_id, enq_ops, nb_deq_ops);
+
+                       if (unlikely(processed_ops < nb_deq_ops)) {
+                               for (i = processed_ops; i < nb_deq_ops; i++)
+                                       enq_ops[i]->sym->session = sessions[i];
+                       }
+
+                       inflight_ops += processed_ops;
+               }
+               if (inflight_ops > 0) {
+                       processed_ops = rte_cryptodev_dequeue_burst(slave->dev_id,
+                                       slave->qp_id, deq_ops, MC_SCHED_BUFFER_SIZE);
+                       if (processed_ops) {
+                               uint16_t nb_enq_ops = rte_ring_enqueue_burst(deq_ring,
+                                       (void *)deq_ops, processed_ops, NULL);
+                               inflight_ops -= nb_enq_ops;
+                       }
+               }
+               rte_pause();
+       }
+
+       return 0;
+}
+
+static int
+scheduler_start(struct rte_cryptodev *dev)
+{
+       struct scheduler_ctx *sched_ctx = dev->data->dev_private;
+       struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
+       uint16_t i;
+
+       mc_ctx->stop_signal = 0;
+
+       for (i = 0; i < sched_ctx->nb_wc; i++)
+               rte_eal_remote_launch(
+                       (lcore_function_t *)mc_scheduler_worker, dev, sched_ctx->wc_pool[i]);
+
+       if (sched_ctx->reordering_enabled) {
+               dev->enqueue_burst = &schedule_enqueue_ordering;
+               dev->dequeue_burst = &schedule_dequeue_ordering;
+       } else {
+               dev->enqueue_burst = &schedule_enqueue;
+               dev->dequeue_burst = &schedule_dequeue;
+       }
+
+       for (i = 0; i < dev->data->nb_queue_pairs; i++) {
+               struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];
+               struct mc_scheduler_qp_ctx *mc_qp_ctx =
+                               qp_ctx->private_qp_ctx;
+               uint32_t j;
+
+               memset(mc_qp_ctx->slaves, 0,
+                               RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES *
+                               sizeof(struct scheduler_slave));
+               for (j = 0; j < sched_ctx->nb_slaves; j++) {
+                       mc_qp_ctx->slaves[j].dev_id =
+                                       sched_ctx->slaves[j].dev_id;
+                       mc_qp_ctx->slaves[j].qp_id = i;
+               }
+
+               mc_qp_ctx->nb_slaves = sched_ctx->nb_slaves;
+
+               mc_qp_ctx->last_enq_worker_idx = 0;
+               mc_qp_ctx->last_deq_worker_idx = 0;
+       }
+
+       return 0;
+}
+
+static int
+scheduler_stop(struct rte_cryptodev *dev)
+{
+       struct scheduler_ctx *sched_ctx = dev->data->dev_private;
+       struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
+
+       mc_ctx->stop_signal = 1;
+       for (uint16_t i = 0; i < sched_ctx->nb_wc; i++)
+               rte_eal_wait_lcore(sched_ctx->wc_pool[i]);
+
+       return 0;
+}
+
+static int
+scheduler_config_qp(struct rte_cryptodev *dev, uint16_t qp_id)
+{
+       struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[qp_id];
+       struct mc_scheduler_qp_ctx *mc_qp_ctx;
+       struct scheduler_ctx *sched_ctx = dev->data->dev_private;
+       struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
+
+       mc_qp_ctx = rte_zmalloc_socket(NULL, sizeof(*mc_qp_ctx), 0,
+                       rte_socket_id());
+       if (!mc_qp_ctx) {
+               CS_LOG_ERR("failed allocate memory for private queue pair");
+               return -ENOMEM;
+       }
+
+       mc_qp_ctx->mc_private_ctx = mc_ctx;
+       qp_ctx->private_qp_ctx = (void *)mc_qp_ctx;
+
+       return 0;
+}
+
+static int
+scheduler_create_private_ctx(struct rte_cryptodev *dev)
+{
+       struct scheduler_ctx *sched_ctx = dev->data->dev_private;
+       struct mc_scheduler_ctx *mc_ctx;
+       uint16_t i;
+
+       if (sched_ctx->private_ctx)
+               rte_free(sched_ctx->private_ctx);
+
+       mc_ctx = rte_zmalloc_socket(NULL, sizeof(struct mc_scheduler_ctx), 0,
+                       rte_socket_id());
+       if (!mc_ctx) {
+               CS_LOG_ERR("failed allocate memory");
+               return -ENOMEM;
+       }
+
+       mc_ctx->num_workers = sched_ctx->nb_wc;
+       for (i = 0; i < sched_ctx->nb_wc; i++) {
+               char r_name[16];
+
+               snprintf(r_name, sizeof(r_name), MC_SCHED_ENQ_RING_NAME "%u", i);
+               mc_ctx->sched_enq_ring[i] = rte_ring_create(r_name, MC_SCHED_RING_SIZE,
+                       rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
+               if (!mc_ctx->sched_enq_ring[i]) {
+                       CS_LOG_ERR("Cannot create ring for worker %u", i);
+                       return -1;
+               }
+               snprintf(r_name, sizeof(r_name), MC_SCHED_DEQ_RING_NAME "%u", i);
+               mc_ctx->sched_deq_ring[i] = rte_ring_create(r_name, MC_SCHED_RING_SIZE,
+                       rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
+               if (!mc_ctx->sched_deq_ring[i]) {
+                       CS_LOG_ERR("Cannot create ring for worker %u", i);
+                       return -1;
+               }
+       }
+
+       sched_ctx->private_ctx = (void *)mc_ctx;
+
+       return 0;
+}
+
+struct rte_cryptodev_scheduler_ops scheduler_mc_ops = {
+       slave_attach,
+       slave_detach,
+       scheduler_start,
+       scheduler_stop,
+       scheduler_config_qp,
+       scheduler_create_private_ctx,
+       NULL,   /* option_set */
+       NULL    /* option_get */
+};
+
+struct rte_cryptodev_scheduler mc_scheduler = {
+               .name = "multicore-scheduler",
+               .description = "scheduler which runs bursts across "
+                               "multiple CPU cores",
+               .mode = CDEV_SCHED_MODE_MULTICORE,
+               .ops = &scheduler_mc_ops
+};
+
+struct rte_cryptodev_scheduler *multicore_scheduler = &mc_scheduler;
diff --git a/drivers/crypto/scheduler/scheduler_pmd.c b/drivers/crypto/scheduler/scheduler_pmd.c
index 0b63c20..582c8fb 100644
--- a/drivers/crypto/scheduler/scheduler_pmd.c
+++ b/drivers/crypto/scheduler/scheduler_pmd.c
@@ -46,6 +46,7 @@ struct scheduler_init_params {
        uint32_t nb_slaves;
        enum rte_cryptodev_scheduler_mode mode;
        uint32_t enable_ordering;
+       uint64_t wcmask;
        char slave_names[RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES]
                        [RTE_CRYPTODEV_SCHEDULER_NAME_MAX_LEN];
 };
@@ -57,6 +58,8 @@ struct scheduler_init_params {
 #define RTE_CRYPTODEV_VDEV_MAX_NB_QP_ARG       ("max_nb_queue_pairs")
 #define RTE_CRYPTODEV_VDEV_MAX_NB_SESS_ARG     ("max_nb_sessions")
 #define RTE_CRYPTODEV_VDEV_SOCKET_ID           ("socket_id")
+#define RTE_CRYPTODEV_VDEV_COREMASK                    ("coremask")
+#define RTE_CRYPTODEV_VDEV_CORELIST                    ("corelist")
 
 const char *scheduler_valid_params[] = {
        RTE_CRYPTODEV_VDEV_NAME,
@@ -65,7 +68,9 @@ const char *scheduler_valid_params[] = {
        RTE_CRYPTODEV_VDEV_ORDERING,
        RTE_CRYPTODEV_VDEV_MAX_NB_QP_ARG,
        RTE_CRYPTODEV_VDEV_MAX_NB_SESS_ARG,
-       RTE_CRYPTODEV_VDEV_SOCKET_ID
+       RTE_CRYPTODEV_VDEV_SOCKET_ID,
+       RTE_CRYPTODEV_VDEV_COREMASK,
+       RTE_CRYPTODEV_VDEV_CORELIST
 };
 
 struct scheduler_parse_map {
@@ -79,7 +84,9 @@ const struct scheduler_parse_map scheduler_mode_map[] = {
        {RTE_STR(SCHEDULER_MODE_NAME_PKT_SIZE_DISTR),
                        CDEV_SCHED_MODE_PKT_SIZE_DISTR},
        {RTE_STR(SCHEDULER_MODE_NAME_FAIL_OVER),
-                       CDEV_SCHED_MODE_FAILOVER}
+                       CDEV_SCHED_MODE_FAILOVER},
+       {RTE_STR(SCHEDULER_MODE_NAME_MULTI_CORE),
+                       CDEV_SCHED_MODE_MULTICORE}
 };
 
 const struct scheduler_parse_map scheduler_ordering_map[] = {
@@ -117,6 +124,17 @@ cryptodev_scheduler_create(const char *name,
        sched_ctx->max_nb_queue_pairs =
                        init_params->def_p.max_nb_queue_pairs;
 
+       if (init_params->mode == CDEV_SCHED_MODE_MULTICORE) {
+               sched_ctx->nb_wc = 0;
+               for (uint16_t i = 0; i < MAX_NB_WORKER_CORES; i++) {
+                       if (init_params->wcmask & (1ULL << i)) {
+                               sched_ctx->wc_pool[sched_ctx->nb_wc++] = i;
+                               RTE_LOG(INFO, PMD, "  Worker core[%u]=%u added\n",
+                                               sched_ctx->nb_wc-1, i);
+                       }
+               }
+       }
+
        if (init_params->mode > CDEV_SCHED_MODE_USERDEFINED &&
                        init_params->mode < CDEV_SCHED_MODE_COUNT) {
                ret = rte_cryptodev_scheduler_mode_set(dev->data->dev_id,
@@ -251,6 +269,42 @@ parse_integer_arg(const char *key __rte_unused,
        return 0;
 }
 
+/** Parse worker core mask from a hexadecimal string argument */
+static int
+parse_coremask_arg(const char *key __rte_unused,
+               const char *value, void *extra_args)
+{
+       struct scheduler_init_params *params = extra_args;
+
+       params->wcmask = strtoull(value, NULL, 16);
+
+       return 0;
+}
+
+/** Parse worker core list from a comma-separated string of decimal core ids */
+static int
+parse_corelist_arg(const char *key __rte_unused,
+               const char *value, void *extra_args)
+{
+       struct scheduler_init_params *params = extra_args;
+
+       params->wcmask = 0ULL;
+
+       const char *token = value;
+       while (isdigit(token[0])) {
+               char *rval;
+               unsigned int core = strtoul(token, &rval, 10);
+
+               params->wcmask |= 1ULL << core;
+               token = (const char *)rval;
+               if (token[0] == '\0')
+                       break;
+               token++;
+       }
+
+       return 0;
+}
+
 /** Parse name */
 static int
 parse_name_arg(const char *key __rte_unused,
@@ -370,6 +424,18 @@ scheduler_parse_init_params(struct scheduler_init_params *params,
                if (ret < 0)
                        goto free_kvlist;
 
+               ret = rte_kvargs_process(kvlist, RTE_CRYPTODEV_VDEV_COREMASK,
+                               &parse_coremask_arg,
+                               params);
+               if (ret < 0)
+                       goto free_kvlist;
+
+               ret = rte_kvargs_process(kvlist, RTE_CRYPTODEV_VDEV_CORELIST,
+                               &parse_corelist_arg,
+                               params);
+               if (ret < 0)
+                       goto free_kvlist;
+
                ret = rte_kvargs_process(kvlist, RTE_CRYPTODEV_VDEV_NAME,
                                &parse_name_arg,
                                &params->def_p);
@@ -437,6 +503,9 @@ cryptodev_scheduler_probe(struct rte_vdev_device *vdev)
        if (init_params.def_p.name[0] != '\0')
                RTE_LOG(INFO, PMD, "  User defined name = %s\n",
                        init_params.def_p.name);
+       if (init_params.wcmask != 0)
+               RTE_LOG(INFO, PMD, "  Worker core mask = %"PRIx64"\n",
+                       init_params.wcmask);
 
        return cryptodev_scheduler_create(name,
                                        &init_params);
diff --git a/drivers/crypto/scheduler/scheduler_pmd_private.h b/drivers/crypto/scheduler/scheduler_pmd_private.h
index 421dae3..175efd6 100644
--- a/drivers/crypto/scheduler/scheduler_pmd_private.h
+++ b/drivers/crypto/scheduler/scheduler_pmd_private.h
@@ -58,6 +58,8 @@
 #define CS_LOG_DBG(fmt, args...)
 #endif
 
+#define MAX_NB_WORKER_CORES    64
+
 struct scheduler_slave {
        uint8_t dev_id;
        uint16_t qp_id;
@@ -86,6 +88,8 @@ struct scheduler_ctx {
 
        char name[RTE_CRYPTODEV_SCHEDULER_NAME_MAX_LEN];
        char description[RTE_CRYPTODEV_SCHEDULER_DESC_MAX_LEN];
+       uint16_t wc_pool[MAX_NB_WORKER_CORES];
+       uint16_t nb_wc;
 
        char *init_slave_names[RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES];
        int nb_init_slaves;
diff --git a/lib/librte_cryptodev/rte_cryptodev.c b/lib/librte_cryptodev/rte_cryptodev.c
index b65cd9c..5aa2b8b 100644
--- a/lib/librte_cryptodev/rte_cryptodev.c
+++ b/lib/librte_cryptodev/rte_cryptodev.c
@@ -1032,8 +1032,8 @@ rte_cryptodev_stop(uint8_t dev_id)
                return;
        }
 
-       dev->data->dev_started = 0;
        (*dev->dev_ops->dev_stop)(dev);
+       dev->data->dev_started = 0;
 }
 
 int
-- 
2.7.4
