With these changes the functions:
  - rte_ipsec_crypto_prepare
  - rte_ipsec_process
 can be safely used in a multi-threaded environment, as long as the user
 guarantees that they obey the multiple-readers/single-writer model for
 SQN + replay-window operations.
 To be more specific:
 for an outbound SA there are no restrictions;
 for an inbound SA the caller has to guarantee that at any given moment
 only one thread is executing rte_ipsec_process() for a given SA.
 Note that it is the caller's responsibility to maintain the correct
 order of packets to be processed.
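 A minimal usage sketch for an inbound SA shared by several lcores follows
 (illustrative only: the per-SA spinlock, the burst variables and the exact
 prototypes of rte_ipsec_crypto_prepare()/rte_ipsec_process() are assumptions
 of this example, not something this patch mandates):

     #include <rte_spinlock.h>
     #include <rte_ipsec.h>

     /* at SA setup time: request 'atomic' SQN/replay-window access */
     struct rte_ipsec_sa_prm prm;
     ...
     prm.flags |= RTE_IPSEC_SAFLAG_SQN_ATOM;

     /* inbound session shared by several lcores */
     struct rte_ipsec_session ss;
     static rte_spinlock_t inb_lock = RTE_SPINLOCK_INITIALIZER;

     struct rte_mbuf *mb[BURST_SZ];
     struct rte_crypto_op *cop[BURST_SZ];
     uint16_t num, k, n;

     /* any lcore may prepare crypto ops for the shared SA concurrently */
     k = rte_ipsec_crypto_prepare(&ss, mb, cop, num);

     /* ... crypto ops are enqueued to / dequeued from a crypto device ... */

     /* inbound process() invocations have to be serialized by the caller */
     rte_spinlock_lock(&inb_lock);
     n = rte_ipsec_process(&ss, mb, num);
     rte_spinlock_unlock(&inb_lock);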

Signed-off-by: Konstantin Ananyev <konstantin.anan...@intel.com>
---
 lib/librte_ipsec/ipsec_sqn.h    | 129 +++++++++++++++++++++++++++++++++++++++-
 lib/librte_ipsec/rte_ipsec_sa.h |  27 +++++++++
 lib/librte_ipsec/rwl.h          |  68 +++++++++++++++++++++
 lib/librte_ipsec/sa.c           |  22 +++++--
 lib/librte_ipsec/sa.h           |  22 +++++--
 5 files changed, 258 insertions(+), 10 deletions(-)
 create mode 100644 lib/librte_ipsec/rwl.h

diff --git a/lib/librte_ipsec/ipsec_sqn.h b/lib/librte_ipsec/ipsec_sqn.h
index 7477b8d59..a3c993a52 100644
--- a/lib/librte_ipsec/ipsec_sqn.h
+++ b/lib/librte_ipsec/ipsec_sqn.h
@@ -5,6 +5,8 @@
 #ifndef _IPSEC_SQN_H_
 #define _IPSEC_SQN_H_
 
+#include "rwl.h"
+
 #define WINDOW_BUCKET_BITS             6 /* uint64_t */
 #define WINDOW_BUCKET_SIZE             (1 << WINDOW_BUCKET_BITS)
 #define WINDOW_BIT_LOC_MASK            (WINDOW_BUCKET_SIZE - 1)
@@ -15,6 +17,9 @@
 
 #define IS_ESN(sa)     ((sa)->sqn_mask == UINT64_MAX)
 
+#define        SQN_ATOMIC(sa)  ((sa)->type & RTE_IPSEC_SATP_SQN_ATOM)
+
+
 /*
  * for given size, calculate required number of buckets.
  */
@@ -109,8 +114,12 @@ esn_outb_update_sqn(struct rte_ipsec_sa *sa, uint32_t *num)
        uint64_t n, s, sqn;
 
        n = *num;
-       sqn = sa->sqn.outb + n;
-       sa->sqn.outb = sqn;
+       if (SQN_ATOMIC(sa))
+               sqn = (uint64_t)rte_atomic64_add_return(&sa->sqn.outb.atom, n);
+       else {
+               sqn = sa->sqn.outb.raw + n;
+               sa->sqn.outb.raw = sqn;
+       }
 
        /* overflow */
        if (sqn > sa->sqn_mask) {
@@ -173,6 +182,19 @@ esn_inb_update_sqn(struct replay_sqn *rsn, const struct rte_ipsec_sa *sa,
 }
 
 /**
+ * To achieve the ability to do multiple readers/single writer for
+ * SA replay window information and sequence number (RSN),
+ * a basic RCU schema is used:
+ * the SA has 2 copies of the RSN (one for readers, another for the writer).
+ * Each RSN contains an rwlock that has to be grabbed (for read/write)
+ * to avoid races between readers and the writer.
+ * The writer is responsible for making a copy of the reader RSN,
+ * updating it and marking the newly updated RSN as the readers' one.
+ * This approach is intended to minimize contention and cache sharing
+ * between the writer and readers.
+ */
+
+/**
  * Based on the number of buckets, calculate the required size for the
  * structure that holds replay window and sequence number (RSN) information.
  */
@@ -187,4 +209,107 @@ rsn_size(uint32_t nb_bucket)
        return sz;
 }
 
+/**
+ * Copy replay window and SQN.
+ */
+static inline void
+rsn_copy(const struct rte_ipsec_sa *sa, uint32_t dst, uint32_t src)
+{
+       uint32_t i, n;
+       struct replay_sqn *d;
+       const struct replay_sqn *s;
+
+       d = sa->sqn.inb.rsn[dst];
+       s = sa->sqn.inb.rsn[src];
+
+       n = sa->replay.nb_bucket;
+
+       d->sqn = s->sqn;
+       for (i = 0; i != n; i++)
+               d->window[i] = s->window[i];
+}
+
+/**
+ * Get RSN for read-only access.
+ */
+static inline struct replay_sqn *
+rsn_acquire(struct rte_ipsec_sa *sa)
+{
+       uint32_t n;
+       struct replay_sqn *rsn;
+
+       n = sa->sqn.inb.rdidx;
+       rsn = sa->sqn.inb.rsn[n];
+
+       if (!SQN_ATOMIC(sa))
+               return rsn;
+
+       /* check there are no writers */
+       while (rwl_try_read_lock(&rsn->rwl) < 0) {
+               rte_pause();
+               n = sa->sqn.inb.rdidx;
+               rsn = sa->sqn.inb.rsn[n];
+               rte_compiler_barrier();
+       }
+
+       return rsn;
+}
+
+/**
+ * Release read-only access for RSN.
+ */
+static inline void
+rsn_release(struct rte_ipsec_sa *sa, struct replay_sqn *rsn)
+{
+       if (SQN_ATOMIC(sa))
+               rwl_read_unlock(&rsn->rwl);
+}
+
+/**
+ * Start RSN update.
+ */
+static inline struct replay_sqn *
+rsn_update_start(struct rte_ipsec_sa *sa)
+{
+       uint32_t k, n;
+       struct replay_sqn *rsn;
+
+       n = sa->sqn.inb.wridx;
+
+       /* no active writers */
+       RTE_ASSERT(n == sa->sqn.inb.rdidx);
+
+       if (!SQN_ATOMIC(sa))
+               return sa->sqn.inb.rsn[n];
+
+       k = REPLAY_SQN_NEXT(n);
+       sa->sqn.inb.wridx = k;
+
+       rsn = sa->sqn.inb.rsn[k];
+       rwl_write_lock(&rsn->rwl);
+       rsn_copy(sa, k, n);
+
+       return rsn;
+}
+
+/**
+ * Finish RSN update.
+ */
+static inline void
+rsn_update_finish(struct rte_ipsec_sa *sa, struct replay_sqn *rsn)
+{
+       uint32_t n;
+
+       if (!SQN_ATOMIC(sa))
+               return;
+
+       n = sa->sqn.inb.wridx;
+       RTE_ASSERT(n != sa->sqn.inb.rdidx);
+       RTE_ASSERT(rsn - sa->sqn.inb.rsn == n);
+
+       rwl_write_unlock(&rsn->rwl);
+       sa->sqn.inb.rdidx = n;
+}
+
+
 #endif /* _IPSEC_SQN_H_ */
diff --git a/lib/librte_ipsec/rte_ipsec_sa.h b/lib/librte_ipsec/rte_ipsec_sa.h
index 0efda33de..3324cbedb 100644
--- a/lib/librte_ipsec/rte_ipsec_sa.h
+++ b/lib/librte_ipsec/rte_ipsec_sa.h
@@ -54,12 +54,34 @@ struct rte_ipsec_sa_prm {
 };
 
 /**
+ * Indicates that the SA will(/will not) need 'atomic' access
+ * to sequence number and replay window.
+ * 'atomic' here means that the functions:
+ *  - rte_ipsec_crypto_prepare
+ *  - rte_ipsec_process
+ * can be safely used in a multi-threaded environment, as long as the user
+ * guarantees that they obey the multiple-readers/single-writer model for
+ * SQN + replay-window operations.
+ * To be more specific:
+ * for an outbound SA there are no restrictions;
+ * for an inbound SA the caller has to guarantee that at any given moment
+ * only one thread is executing rte_ipsec_process() for a given SA.
+ * Note that it is the caller's responsibility to maintain the correct
+ * order of packets to be processed.
+ * In other words, it is the caller's responsibility to serialize process()
+ * invocations.
+ */
+#define        RTE_IPSEC_SAFLAG_SQN_ATOM       (1ULL << 0)
+
+/**
  * SA type is a 64-bit value that contains the following information:
  * - IP version (IPv4/IPv6)
  * - IPsec proto (ESP/AH)
  * - inbound/outbound
  * - mode (TRANSPORT/TUNNEL)
  * - for TUNNEL outer IP version (IPv4/IPv6)
+ * - whether SA SQN operations are 'atomic'
  * ...
  */
 
@@ -68,6 +90,7 @@ enum {
        RTE_SATP_LOG_PROTO,
        RTE_SATP_LOG_DIR,
        RTE_SATP_LOG_MODE,
+       RTE_SATP_LOG_SQN = RTE_SATP_LOG_MODE + 2,
        RTE_SATP_LOG_NUM
 };
 
@@ -88,6 +111,10 @@ enum {
 #define RTE_IPSEC_SATP_MODE_TUNLV4     (1ULL << RTE_SATP_LOG_MODE)
 #define RTE_IPSEC_SATP_MODE_TUNLV6     (2ULL << RTE_SATP_LOG_MODE)
 
+#define RTE_IPSEC_SATP_SQN_MASK                (1ULL << RTE_SATP_LOG_SQN)
+#define RTE_IPSEC_SATP_SQN_RAW         (0ULL << RTE_SATP_LOG_SQN)
+#define RTE_IPSEC_SATP_SQN_ATOM                (1ULL << RTE_SATP_LOG_SQN)
+
 /**
  * get type of given SA
  * @return
diff --git a/lib/librte_ipsec/rwl.h b/lib/librte_ipsec/rwl.h
new file mode 100644
index 000000000..fc44d1e9f
--- /dev/null
+++ b/lib/librte_ipsec/rwl.h
@@ -0,0 +1,68 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Intel Corporation
+ */
+
+#ifndef _RWL_H_
+#define _RWL_H_
+
+/**
+ * @file rwl.h
+ *
+ * An analog of read-write locks, heavily biased in favour of the read side.
+ * Assumes that there are no more than INT32_MAX concurrent readers.
+ * Consider moving it into librte_eal.
+ */
+
+/**
+ * release read-lock.
+ * @param p
+ *   pointer to atomic variable.
+ */
+static inline void
+rwl_read_unlock(rte_atomic32_t *p)
+{
+       rte_atomic32_sub(p, 1);
+}
+
+/**
+ * try to grab read-lock.
+ * @param p
+ *   pointer to atomic variable.
+ * @return
+ *   positive value on success
+ */
+static inline int
+rwl_try_read_lock(rte_atomic32_t *p)
+{
+       int32_t rc;
+
+       rc = rte_atomic32_add_return(p, 1);
+       if (rc < 0)
+               rwl_read_unlock(p);
+       return rc;
+}
+
+/**
+ * grab write-lock.
+ * @param p
+ *   pointer to atomic variable.
+ */
+static inline void
+rwl_write_lock(rte_atomic32_t *p)
+{
+       while (rte_atomic32_cmpset((volatile uint32_t *)p, 0, INT32_MIN) == 0)
+               rte_pause();
+}
+
+/**
+ * release write-lock.
+ * @param p
+ *   pointer to atomic variable.
+ */
+static inline void
+rwl_write_unlock(rte_atomic32_t *p)
+{
+       rte_atomic32_sub(p, INT32_MIN);
+}
+
+#endif /* _RWL_H_ */
diff --git a/lib/librte_ipsec/sa.c b/lib/librte_ipsec/sa.c
index ae8ce4f24..e2852b020 100644
--- a/lib/librte_ipsec/sa.c
+++ b/lib/librte_ipsec/sa.c
@@ -89,6 +89,9 @@ ipsec_sa_size(uint32_t wsz, uint64_t type, uint32_t *nb_bucket)
        *nb_bucket = n;
 
        sz = rsn_size(n);
+       if ((type & RTE_IPSEC_SATP_SQN_MASK) == RTE_IPSEC_SATP_SQN_ATOM)
+               sz *= REPLAY_SQN_NUM;
+
        sz += sizeof(struct rte_ipsec_sa);
        return sz;
 }
@@ -135,6 +138,12 @@ fill_sa_type(const struct rte_ipsec_sa_prm *prm)
                        tp |= RTE_IPSEC_SATP_IPV4;
        }
 
+       /* interpret flags */
+       if (prm->flags & RTE_IPSEC_SAFLAG_SQN_ATOM)
+               tp |= RTE_IPSEC_SATP_SQN_ATOM;
+       else
+               tp |= RTE_IPSEC_SATP_SQN_RAW;
+
        return tp;
 }
 
@@ -151,7 +160,7 @@ esp_inb_tun_init(struct rte_ipsec_sa *sa)
 static void
 esp_outb_tun_init(struct rte_ipsec_sa *sa, const struct rte_ipsec_sa_prm *prm)
 {
-       sa->sqn.outb = 1;
+       sa->sqn.outb.raw = 1;
        sa->hdr_len = prm->tun.hdr_len;
        sa->hdr_l3_off = prm->tun.hdr_l3_off;
        memcpy(sa->hdr, prm->tun.hdr, sa->hdr_len);
@@ -279,7 +288,10 @@ rte_ipsec_sa_init(struct rte_ipsec_sa *sa, const struct rte_ipsec_sa_prm *prm,
                sa->replay.win_sz = prm->replay_win_sz;
                sa->replay.nb_bucket = nb;
                sa->replay.bucket_index_mask = sa->replay.nb_bucket - 1;
-               sa->sqn.inb = (struct replay_sqn *)(sa + 1);
+               sa->sqn.inb.rsn[0] = (struct replay_sqn *)(sa + 1);
+               if ((type & RTE_IPSEC_SATP_SQN_MASK) == RTE_IPSEC_SATP_SQN_ATOM)
+                       sa->sqn.inb.rsn[1] = (struct replay_sqn *)
+                               ((uintptr_t)sa->sqn.inb.rsn[0] + rsn_size(nb));
        }
 
        return sz;
@@ -564,7 +576,7 @@ esp_inb_tun_prepare(struct rte_ipsec_sa *sa, struct rte_mbuf *mb[],
        struct replay_sqn *rsn;
        union sym_op_data icv;
 
-       rsn = sa->sqn.inb;
+       rsn = rsn_acquire(sa);
 
        k = 0;
        for (i = 0; i != num; i++) {
@@ -583,6 +595,7 @@ esp_inb_tun_prepare(struct rte_ipsec_sa *sa, struct rte_mbuf *mb[],
                }
        }
 
+       rsn_release(sa, rsn);
        return k;
 }
 
@@ -732,7 +745,7 @@ esp_inb_rsn_update(struct rte_ipsec_sa *sa, const uint32_t sqn[],
        uint32_t i, k;
        struct replay_sqn *rsn;
 
-       rsn = sa->sqn.inb;
+       rsn = rsn_update_start(sa);
 
        k = 0;
        for (i = 0; i != num; i++) {
@@ -742,6 +755,7 @@ esp_inb_rsn_update(struct rte_ipsec_sa *sa, const uint32_t sqn[],
                        dr[i - k] = mb[i];
        }
 
+       rsn_update_finish(sa, rsn);
        return k;
 }
 
diff --git a/lib/librte_ipsec/sa.h b/lib/librte_ipsec/sa.h
index 13a5a68f3..9fe1f8483 100644
--- a/lib/librte_ipsec/sa.h
+++ b/lib/librte_ipsec/sa.h
@@ -9,7 +9,7 @@
 #define IPSEC_MAX_IV_SIZE      16
 #define IPSEC_MAX_IV_QWORD     (IPSEC_MAX_IV_SIZE / sizeof(uint64_t))
 
-/* helper structures to store/update crypto session/op data */
+/* these definitions probably have to be in rte_crypto_sym.h */
 union sym_op_ofslen {
        uint64_t raw;
        struct {
@@ -26,8 +26,11 @@ union sym_op_data {
        };
 };
 
-/* Inbound replay window and last sequence number */
+#define REPLAY_SQN_NUM         2
+#define REPLAY_SQN_NEXT(n)     ((n) ^ 1)
+
 struct replay_sqn {
+       rte_atomic32_t rwl;
        uint64_t sqn;
        __extension__ uint64_t window[0];
 };
@@ -64,10 +67,21 @@ struct rte_ipsec_sa {
 
        /*
         * sqn and replay window
+        * In case the SA is handled by multiple threads, the *sqn* cacheline
+        * could be shared by multiple cores.
+        * To minimise the performance impact, we try to locate it in a
+        * place separate from other frequently accessed data.
         */
        union {
-               uint64_t outb;
-               struct replay_sqn *inb;
+               union {
+                       rte_atomic64_t atom;
+                       uint64_t raw;
+               } outb;
+               struct {
+                       uint32_t rdidx; /* read index */
+                       uint32_t wridx; /* write index */
+                       struct replay_sqn *rsn[REPLAY_SQN_NUM];
+               } inb;
        } sqn;
 
 } __rte_cache_aligned;
-- 
2.13.6
