On Mon, Aug 18, 2014 at 1:29 PM, Daniele Di Proietto
<ddiproie...@vmware.com> wrote:
> Since lookups in the classifier can be pretty expensive, we introduce this
> (thread local) cache which simply compares the miniflows of the packets
>
Patch looks good. I have a few comments, nothing major.

> Signed-off-by: Daniele Di Proietto <ddiproie...@vmware.com>
> ---
>  lib/dpif-netdev.c | 422 
> +++++++++++++++++++++++++++++++++++++++++++++---------
>  1 file changed, 357 insertions(+), 65 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 4e02a6e..983c72f 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -87,6 +87,59 @@ static struct shash dp_netdevs 
> OVS_GUARDED_BY(dp_netdev_mutex)
>
>  static struct vlog_rate_limit upcall_rl = VLOG_RATE_LIMIT_INIT(600, 600);
>
> +/* Stores a miniflow */
> +
> +/* There are fields in the flow structure that we never use. Therefore we can
> + * save a few words of memory */
> +#define NETDEV_KEY_BUF_SIZE_U32 (FLOW_U32S - MINI_N_INLINE \
> +                                 - FLOW_U32_SIZE(regs) \
> +                                 - FLOW_U32_SIZE(metadata) \

You can move MINI_N_INLINE to the second line.
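I.e., purely cosmetic:

#define NETDEV_KEY_BUF_SIZE_U32 (FLOW_U32S \
                                 - MINI_N_INLINE \
                                 - FLOW_U32_SIZE(regs) \
                                 - FLOW_U32_SIZE(metadata) \
                                )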

> +                                )
> +struct netdev_flow_key {
> +    struct miniflow flow;
> +    uint32_t buf[NETDEV_KEY_BUF_SIZE_U32];
> +};
> +
> +/* Exact match cache for frequently used flows
> + *
> + * The cache uses a 32-bit hash of the packet (which can be the RSS hash) to
> + * search its entries for a miniflow that matches exactly the miniflow of the
> + * packet. It stores the 'cls_rule'(rule) that matches the miniflow.
> + *
> + * A cache entry holds a reference to its 'dp_netdev_flow'.
> + *
> + * A miniflow with a given hash can be in one of EM_FLOW_HASH_SEGS different
> + * entries. Given its hash (h), the miniflow can be in the entries whose 
> index
> + * is:
> + *
> + * h                           & EM_FLOW_HASH_MASK
> + * h >>     EM_FLOW_HASH_SHIFT & EM_FLOW_HASH_MASK
> + * h >> 2 * EM_FLOW_HASH_SHIFT & EM_FLOW_HASH_MASK
> + * h >> 3 * EM_FLOW_HASH_SHIFT & EM_FLOW_HASH_MASK
> + * ...
> + *
> + * Thread-safety
> + * =============
> + *
> + * Each pmd_thread has its own private exact match cache.
> + * If dp_netdev_input is not called from a pmd thread, a mutex is used.
> + */
> +
> +#define EM_FLOW_HASH_SHIFT 10
> +#define EM_FLOW_HASH_ENTRIES (1u << EM_FLOW_HASH_SHIFT)
> +#define EM_FLOW_HASH_MASK (EM_FLOW_HASH_ENTRIES - 1)
> +#define EM_FLOW_HASH_SEGS 2
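For example, with SHIFT = 10 and SEGS = 2, a hash h = 0x00abcdef can only
land at indices (h & 0x3ff) = 0x1ef and ((h >> 10) & 0x3ff) = 0x2f3.
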
> +
> +struct emc_entry {
> +    struct dp_netdev_flow *flow;
> +    struct netdev_flow_key mf;
> +    uint32_t hash;
> +};
> +
> +struct emc_cache {
> +    struct emc_entry entries[EM_FLOW_HASH_ENTRIES];
> +};
> +
>  /* Datapath based on the network device interface from netdev.h.
>   *
>   *
> @@ -101,6 +154,7 @@ static struct vlog_rate_limit upcall_rl = 
> VLOG_RATE_LIMIT_INIT(600, 600);
>   *    dp_netdev_mutex (global)
>   *    port_mutex
>   *    flow_mutex
> + *    emc_mutex
>   */
>  struct dp_netdev {
>      const struct dpif_class *const class;
> @@ -141,6 +195,12 @@ struct dp_netdev {
>      struct pmd_thread *pmd_threads;
>      size_t n_pmd_threads;
>      int pmd_count;
> +
> +    /* Exact match cache for non-pmd devices.
> +     * Pmd devices use instead each thread's flow_cache for this purpose.
> +     * Protected by emc_mutex */
> +    struct emc_cache flow_cache OVS_GUARDED;
> +    struct ovs_mutex emc_mutex;
>  };
>
>  static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev 
> *dp,
> @@ -173,20 +233,6 @@ struct dp_netdev_port {
>      char *type;                 /* Port type as requested by user. */
>  };
>
> -
> -/* Stores a miniflow */
> -
> -/* There are fields in the flow structure that we never use. Therefore we can
> - * save a few words of memory */
> -#define NETDEV_KEY_BUF_SIZE_U32 (FLOW_U32S - MINI_N_INLINE \
> -                                 - FLOW_U32_SIZE(regs) \
> -                                 - FLOW_U32_SIZE(metadata) \
> -                                )
> -struct netdev_flow_key {
> -    struct miniflow flow;
> -    uint32_t buf[NETDEV_KEY_BUF_SIZE_U32];
> -};
> -
>  /* A flow in dp_netdev's 'flow_table'.
>   *
>   *
> @@ -225,6 +271,7 @@ struct netdev_flow_key {
>   * requires synchronization, as noted in more detail below.
>   */
>  struct dp_netdev_flow {
> +    bool dead;
>      /* Packet classification. */
>      const struct cls_rule cr;   /* In owning dp_netdev's 'cls'. */
>
> @@ -248,6 +295,7 @@ struct dp_netdev_flow {
>  };
>
>  static void dp_netdev_flow_unref(struct dp_netdev_flow *);
> +static bool dp_netdev_flow_ref(struct dp_netdev_flow *);
>
>  /* Contained by struct dp_netdev_flow's 'stats' member.  */
>  struct dp_netdev_flow_stats {
> @@ -292,6 +340,7 @@ static void dp_netdev_actions_free(struct 
> dp_netdev_actions *);
>   **/
>  struct pmd_thread {
>      struct dp_netdev *dp;
> +    struct emc_cache flow_cache;
>      pthread_t thread;
>      int id;
>      atomic_uint change_seq;
> @@ -323,15 +372,41 @@ static int dpif_netdev_open(const struct dpif_class *, 
> const char *name,
>  static void dp_netdev_execute_actions(struct dp_netdev *dp,
>                                        struct dpif_packet **, int c,
>                                        bool may_steal, struct pkt_metadata *,
> +                                      struct emc_cache *flow_cache,
>                                        const struct nlattr *actions,
>                                        size_t actions_len);
>  static void dp_netdev_port_input(struct dp_netdev *dp,
> +                                 struct emc_cache *flow_cache,
>                                   struct dpif_packet **packets, int cnt,
>                                   odp_port_t port_no);
>
>  static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
>  static void dp_netdev_disable_upcall(struct dp_netdev *);
>
> +static void emc_clear_entry(struct emc_entry *ce);
> +
> +static void
> +emc_cache_init(struct emc_cache *flow_cache)
> +{
> +    int i;
> +
> +    for (i = 0; i < ARRAY_SIZE(flow_cache->entries); i++) {
> +        flow_cache->entries[i].flow = NULL;
> +        miniflow_initialize(&flow_cache->entries[i].mf.flow,
> +                            flow_cache->entries[i].mf.buf);
> +    }
> +}
> +
> +static void
> +emc_cache_uninit(struct emc_cache *flow_cache)
> +{
> +    int i;
> +
> +    for (i = 0; i < ARRAY_SIZE(flow_cache->entries); i++) {
> +        emc_clear_entry(&flow_cache->entries[i]);
> +    }
> +}
> +
>  static struct dpif_netdev *
>  dpif_netdev_cast(const struct dpif *dpif)
>  {
> @@ -479,6 +554,11 @@ create_dp_netdev(const char *name, const struct 
> dpif_class *class,
>          return error;
>      }
>
> +    ovs_mutex_init(&dp->emc_mutex);
> +    ovs_mutex_lock(&dp->emc_mutex);
> +    emc_cache_init(&dp->flow_cache);
> +    ovs_mutex_unlock(&dp->emc_mutex);
> +
I am not sure why emc_cache_init() is called under the lock.
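I.e. I would expect just:

    ovs_mutex_init(&dp->emc_mutex);
    emc_cache_init(&dp->flow_cache);

to be enough here, unless there is a thread-safety issue I'm missing.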

>      *dpp = dp;
>      return 0;
>  }
> @@ -543,6 +623,12 @@ dp_netdev_free(struct dp_netdev *dp)
>      cmap_destroy(&dp->ports);
>      fat_rwlock_destroy(&dp->upcall_rwlock);
>      latch_destroy(&dp->exit_latch);
> +
> +    ovs_mutex_lock(&dp->emc_mutex);
> +    emc_cache_uninit(&dp->flow_cache);
> +    ovs_mutex_unlock(&dp->emc_mutex);
> +    ovs_mutex_destroy(&dp->emc_mutex);
> +

Same as above.
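I.e. just:

    emc_cache_uninit(&dp->flow_cache);
    ovs_mutex_destroy(&dp->emc_mutex);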

>      free(CONST_CAST(char *, dp->name));
>      free(dp);
>  }
> @@ -918,6 +1004,7 @@ dp_netdev_remove_flow(struct dp_netdev *dp, struct 
> dp_netdev_flow *flow)
>
>      classifier_remove(&dp->cls, cr);
>      cmap_remove(&dp->flow_table, node, flow_hash(&flow->flow, 0));
> +    flow->dead = true;
>
>      dp_netdev_flow_unref(flow);
>  }
> @@ -1025,6 +1112,118 @@ dp_netdev_flow_cast(const struct cls_rule *cr)
>      return cr ? CONTAINER_OF(cr, struct dp_netdev_flow, cr) : NULL;
>  }
>
> +static bool dp_netdev_flow_ref(struct dp_netdev_flow *flow)
> +{
> +    return ovs_refcount_try_ref_rcu(&flow->ref_cnt);
> +}
> +
> +static inline bool
> +emc_entry_alive(struct emc_entry *ce)
> +{
> +    return ce->flow && !ce->flow->dead;
> +}
> +
> +static void
> +emc_clear_entry(struct emc_entry *ce)
> +{
> +    if (ce->flow) {
> +        dp_netdev_flow_unref(ce->flow);
> +        ce->flow = NULL;
> +    }
> +}
> +
> +static inline void
> +emc_change_entry(struct emc_entry *ce, struct dp_netdev_flow *flow,
> +                 const struct miniflow *mf, uint32_t hash)
> +{
> +    if (ce->flow != flow) {
> +        if (ce->flow) {
> +            dp_netdev_flow_unref(ce->flow);
> +        }
> +
> +        if(dp_netdev_flow_ref(flow)) {
> +            ce->flow = flow;
> +        } else {
> +            ce->flow = NULL;
> +        }
> +    }
> +    if (mf) {
> +        miniflow_clone_inline(&ce->mf.flow, mf, count_1bits(mf->map));
> +        ce->hash = hash;
> +    }
> +}
> +
> +static inline void
> +emc_insert(struct emc_cache *cache, const struct miniflow *mf, uint32_t hash,
> +           struct dp_netdev_flow *flow)
> +{
> +    struct emc_entry *to_be_replaced = NULL;
> +    uint32_t search_hash = hash;
> +    int i;
> +
> +    /* Each flow can be in one of EM_FLOW_HASH_SEGS entries */
> +    for (i = 0; i < EM_FLOW_HASH_SEGS; i++) {
> +        struct emc_entry *current_entry;
> +        int current_idx;
> +
> +        current_idx = search_hash & EM_FLOW_HASH_MASK;
> +        current_entry = &cache->entries[current_idx];
> +
> +        if (emc_entry_alive(current_entry)) {
> +            if (current_entry->hash == hash
> +                && miniflow_equal(&current_entry->mf.flow, mf)) {
> +                /* We found the entry with the 'mf' miniflow */
> +                emc_change_entry(current_entry, flow, NULL, 0);
> +
There is no need to check for a live flow first; just check whether the hash
and mf match and then directly change the entry.
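I.e. roughly (untested):

        if (current_entry->hash == hash
            && miniflow_equal(&current_entry->mf.flow, mf)) {
            /* Found the entry with the 'mf' miniflow: refresh it directly,
             * whether or not the cached flow is still alive. */
            emc_change_entry(current_entry, flow, NULL, 0);
            return;
        }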

> +                return;
> +            }
> +        }
> +
> +        /* Replacement policy: put the flow in an empty (not alive) entry, or
> +         * in the first entry where it can be */
> +        if (!to_be_replaced
> +            || (emc_entry_alive(to_be_replaced)
> +                && !emc_entry_alive(current_entry))) {
> +            to_be_replaced = current_entry;
> +        }
> +
> +        search_hash >>= EM_FLOW_HASH_SHIFT;
> +    }
> +    /* We didn't find the miniflow in the cache.
> +     * The 'to_be_replaced' entry is where the new flow will be stored */
> +
> +    emc_change_entry(to_be_replaced, flow, mf, hash);
> +}
> +
> +static inline struct dp_netdev_flow *
> +emc_lookup(struct emc_cache *cache, const struct miniflow *mf,
> +           uint32_t hash)
> +{
> +    uint32_t search_hash = hash;
> +    int i;
> +
> +    /* Each flow can be in one of EM_FLOW_HASH_SEGS entries */
> +    for (i = 0; i < EM_FLOW_HASH_SEGS; i++) {
> +        struct emc_entry *current_entry;
> +        int current_idx;
> +
> +        current_idx = search_hash & EM_FLOW_HASH_MASK;
> +        current_entry = &cache->entries[current_idx];
> +
> +        if (emc_entry_alive(current_entry)) {
> +            if (current_entry->hash == hash
> +                && miniflow_equal(&current_entry->mf.flow, mf)) {
> +                /* We found the entry with the 'mf' miniflow */
> +                return current_entry->flow;
> +            }
We should check the hash first and only then check whether the entry is
alive; that saves fetching the flow object.
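I.e.:

        if (current_entry->hash == hash
            && emc_entry_alive(current_entry)
            && miniflow_equal(&current_entry->mf.flow, mf)) {
            /* Found the entry with the 'mf' miniflow. */
            return current_entry->flow;
        }

so the flow pointer is only dereferenced once the cheap hash comparison has
passed.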

> +        }
> +
> +        search_hash >>= EM_FLOW_HASH_SHIFT;
> +    }
> +
We could have an iterator API for the emc so that the same probing loop is
used in insert and in lookup.
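E.g. something along these lines (macro name is only a suggestion) would let
emc_insert() and emc_lookup() share the probing loop:

#define EMC_FOR_EACH_POS_WITH_HASH(CACHE, CURRENT_ENTRY, HASH)               \
    for (uint32_t i__ = 0, srch_hash__ = (HASH);                             \
         (CURRENT_ENTRY) = &(CACHE)->entries[srch_hash__ & EM_FLOW_HASH_MASK], \
             i__ < EM_FLOW_HASH_SEGS;                                        \
         i__++, srch_hash__ >>= EM_FLOW_HASH_SHIFT)

Then the lookup body, for instance, becomes just:

    struct emc_entry *current_entry;

    EMC_FOR_EACH_POS_WITH_HASH(cache, current_entry, hash) {
        if (current_entry->hash == hash
            && emc_entry_alive(current_entry)
            && miniflow_equal(&current_entry->mf.flow, mf)) {
            return current_entry->flow;
        }
    }
    return NULL;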

> +    return NULL;
> +}
> +

>  static struct dp_netdev_flow *
>  dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct miniflow *key)
>  {
> @@ -1516,8 +1715,11 @@ dpif_netdev_execute(struct dpif *dpif, struct 
> dpif_execute *execute)
>      packet.ofpbuf = *execute->packet;
>      pp = &packet;
>
> +    ovs_mutex_lock(&dp->emc_mutex);
>      dp_netdev_execute_actions(dp, &pp, 1, false, md,
> -                              execute->actions, execute->actions_len);
> +                              &dp->flow_cache, execute->actions,
> +                              execute->actions_len);
> +    ovs_mutex_unlock(&dp->emc_mutex);
>
>      /* Even though may_steal is set to false, some actions could modify or
>       * reallocate the ofpbuf memory. We need to pass those changes to the
> @@ -1595,15 +1797,16 @@ dp_netdev_actions_free(struct dp_netdev_actions 
> *actions)
>
>  static void
>  dp_netdev_process_rxq_port(struct dp_netdev *dp,
> -                          struct dp_netdev_port *port,
> -                          struct netdev_rxq *rxq)
> +                           struct emc_cache *flow_cache,
> +                           struct dp_netdev_port *port,
> +                           struct netdev_rxq *rxq)
>  {
>      struct dpif_packet *packets[NETDEV_MAX_RX_BATCH];
>      int error, cnt;
>
>      error = netdev_rxq_recv(rxq, packets, &cnt);
>      if (!error) {
> -        dp_netdev_port_input(dp, packets, cnt, port->port_no);
> +        dp_netdev_port_input(dp, flow_cache, packets, cnt, port->port_no);
>      } else if (error != EAGAIN && error != EOPNOTSUPP) {
>          static struct vlog_rate_limit rl
>              = VLOG_RATE_LIMIT_INIT(1, 5);
> @@ -1620,15 +1823,18 @@ dpif_netdev_run(struct dpif *dpif)
>      struct dp_netdev_port *port;
>      struct dp_netdev *dp = get_dp_netdev(dpif);
>
> +    ovs_mutex_lock(&dp->emc_mutex);
>      CMAP_FOR_EACH (port, node, &dp->ports) {
>          if (!netdev_is_pmd(port->netdev)) {
>              int i;
>
>              for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
> -                dp_netdev_process_rxq_port(dp, port, port->rxq[i]);
> +                dp_netdev_process_rxq_port(dp, &dp->flow_cache, port,
> +                                           port->rxq[i]);
>              }
>          }
>      }
> +    ovs_mutex_unlock(&dp->emc_mutex);
>  }
>
>  static void
> @@ -1710,6 +1916,7 @@ pmd_thread_main(void *f_)
>      poll_cnt = 0;
>      poll_list = NULL;
>
> +    emc_cache_init(&f->flow_cache);
>      pmd_thread_setaffinity_cpu(f->id);
>  reload:
>      poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
> @@ -1718,7 +1925,8 @@ reload:
>          int i;
>
>          for (i = 0; i < poll_cnt; i++) {
> -            dp_netdev_process_rxq_port(dp,  poll_list[i].port, 
> poll_list[i].rx);
> +            dp_netdev_process_rxq_port(dp, &f->flow_cache, poll_list[i].port,
> +                                       poll_list[i].rx);
>          }
>
>          if (lc++ > 1024) {
> @@ -1744,6 +1952,8 @@ reload:
>           port_unref(poll_list[i].port);
>      }
>
> +    emc_cache_uninit(&f->flow_cache);
> +
The emc cache flows should be freed on reload, otherwise flows might stick
around for a long time.
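E.g. a small helper like this (name is hypothetical), called right after the
'reload:' label, would do it:

static void
emc_cache_flush(struct emc_cache *flow_cache)
{
    int i;

    /* Drop the cached flow references.  The miniflow buffers stay
     * initialized, so the cache can keep being used afterwards. */
    for (i = 0; i < ARRAY_SIZE(flow_cache->entries); i++) {
        emc_clear_entry(&flow_cache->entries[i]);
    }
}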

>      free(poll_list);
>      return NULL;
>  }
> @@ -1936,8 +2146,8 @@ struct packet_batch {
>  };
>
>  static inline void
> -packet_batch_update(struct packet_batch *batch,
> -                    struct dpif_packet *packet, const struct miniflow *mf)
> +packet_batch_update(struct packet_batch *batch, struct dpif_packet *packet,
> +                    const struct miniflow *mf)
>  {
>      batch->tcp_flags |= miniflow_get_tcp_flags(mf);
>      batch->packets[batch->packet_count++] = packet;
> @@ -1957,7 +2167,8 @@ packet_batch_init(struct packet_batch *batch, struct 
> dp_netdev_flow *flow,
>  }
>
>  static inline void
> -packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp)
> +packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp,
> +                     struct emc_cache *flow_cache)
>  {
>      struct dp_netdev_actions *actions;
>      struct dp_netdev_flow *flow = batch->flow;
> @@ -1967,36 +2178,114 @@ packet_batch_execute(struct packet_batch *batch, 
> struct dp_netdev *dp)
>
>      actions = dp_netdev_flow_get_actions(flow);
>
> -    dp_netdev_execute_actions(dp, batch->packets,
> -                              batch->packet_count, true, &batch->md,
> +    dp_netdev_execute_actions(dp, batch->packets, batch->packet_count, true,
> +                              &batch->md, flow_cache,
>                                actions->actions, actions->size);
>
>      dp_netdev_count_packet(dp, DP_STAT_HIT, batch->packet_count);
>  }
>
> -static void
> -dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
> -                struct pkt_metadata *md)
> +static inline bool
> +dp_netdev_queue_batches(struct dpif_packet *pkt, struct pkt_metadata *md,
> +                        struct dp_netdev_flow *flow, const struct miniflow 
> *mf,
> +                        struct packet_batch *batches, size_t *n_batches,
> +                        size_t max_batches)
> +{
> +    struct packet_batch *batch = NULL;
> +    int j;
> +
> +    /* XXX: This O(n^2) algortihm makes sense if we're operating under the
> +     * assumption that the number of distinct flows (and therefore the
> +     * number of distinct batches) is quite small.  If this turns out not
> +     * to be the case, it may make sense to pre sort based on the
> +     * netdev_flow pointer.  That done we can get the appropriate batching
> +     * in O(n * log(n)) instead. */
> +    for (j = *n_batches - 1; j >= 0; j--) {
> +        if (batches[j].flow == flow) {
> +            batch = &batches[j];
Update the batch here and return, to save a couple of checks below for the
case where we already have a batch started.
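I.e.:

    for (j = *n_batches - 1; j >= 0; j--) {
        if (batches[j].flow == flow) {
            packet_batch_update(&batches[j], pkt, mf);
            return true;
        }
    }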
> +        }
> +    }
> +    if (!batch && *n_batches >= max_batches) {
> +        return false;
> +    }
> +
> +    if (!batch) {
> +        batch = &batches[(*n_batches)++];
> +        packet_batch_init(batch, flow, md);
> +    }
> +    packet_batch_update(batch, pkt, mf);
> +    return true;
> +}
> +
> +/* Try to process all ('cnt') the 'packets' using only the exact match cache
> + * 'flow_cache'. If a flow is not found for a packet 'packets[i]', or if 
> there
> + * is no matching batch for a packet's flow, the miniflow is copied into 
> 'keys'
> + * and the packet pointer is moved at the beginning of the 'packets' array.
> + *
> + * The function returns the number of packets that needs to be processed in 
> the
> + * 'packets' array (they have been moved to the beginning of the vector).
> + */
> +static inline size_t
> +emc_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
> +               struct dpif_packet **packets, size_t cnt,
> +               struct pkt_metadata *md, struct netdev_flow_key *keys)
>  {
> -    struct packet_batch batches[NETDEV_MAX_RX_BATCH];
> -    struct netdev_flow_key keys[NETDEV_MAX_RX_BATCH];
> -    const struct miniflow *mfs[NETDEV_MAX_RX_BATCH]; /* NULL at bad packets. 
> */
> -    struct cls_rule *rules[NETDEV_MAX_RX_BATCH];
> +    struct packet_batch batches[4];
>      size_t n_batches, i;
> -    bool any_miss;
> +    size_t notfound_cnt = 0;
>
> +    n_batches = 0;
>      for (i = 0; i < cnt; i++) {
> +        struct netdev_flow_key key;
> +        struct dp_netdev_flow *flow;
> +        uint32_t hash;
> +
>          if (OVS_UNLIKELY(ofpbuf_size(&packets[i]->ofpbuf) < ETH_HEADER_LEN)) 
> {
>              dpif_packet_delete(packets[i]);
> -            mfs[i] = NULL;
>              continue;
>          }
>
> -        miniflow_initialize(&keys[i].flow, keys[i].buf);
> -        miniflow_extract(&packets[i]->ofpbuf, md, &keys[i].flow);
> -        mfs[i] = &keys[i].flow;
> +        miniflow_initialize(&key.flow, key.buf);
> +        miniflow_extract(&packets[i]->ofpbuf, md, &key.flow);
> +
> +        hash = dp_netdev_packet_rss(packets[i], &key.flow);
> +
> +        flow = emc_lookup(flow_cache, &key.flow, hash);
> +        if (!flow || !dp_netdev_queue_batches(packets[i], md, flow,  
> &key.flow,
> +                                  batches, &n_batches, ARRAY_SIZE(batches))) 
> {
> +            struct dpif_packet *swap;
> +
A check for "notfound_cnt == i" can save the swapping operation. Better yet,
just write a swap function.
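I.e. roughly:

            keys[notfound_cnt] = key;
            if (i != notfound_cnt) {
                dpif_packet_swap(&packets[i], &packets[notfound_cnt]);
            }
            notfound_cnt++;

with dpif_packet_swap() (name is just a suggestion) being something like:

static inline void
dpif_packet_swap(struct dpif_packet **a, struct dpif_packet **b)
{
    struct dpif_packet *tmp = *a;

    *a = *b;
    *b = tmp;
}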

> +            keys[notfound_cnt] = key;
> +
> +            swap = packets[notfound_cnt];
> +            packets[notfound_cnt] = packets[i];
> +            packets[i] = swap;
> +
> +            notfound_cnt++;
> +        }
> +    }
> +
> +    for (i = 0; i < n_batches; i++) {
> +        packet_batch_execute(&batches[i], dp, flow_cache);
>      }
>
> +    return notfound_cnt;
> +}
> +
> +static inline void
> +fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
> +                     struct dpif_packet **packets, size_t cnt,
> +                     struct pkt_metadata *md, struct netdev_flow_key *keys)
> +{
> +    struct packet_batch batches[cnt];
> +    const struct miniflow *mfs[cnt]; /* NULL at bad packets. */
> +    struct cls_rule *rules[cnt];
> +    size_t n_batches, i;
> +    bool any_miss;
> +
> +    for (i = 0; i < cnt; i++) {
> +        mfs[i] = &keys[i].flow;
> +    }
>      any_miss = !classifier_lookup_miniflow_batch(&dp->cls, mfs, rules, cnt);
>      if (OVS_UNLIKELY(any_miss) && !fat_rwlock_tryrdlock(&dp->upcall_rwlock)) 
> {
>          uint64_t actions_stub[512 / 8], slow_stub[512 / 8];
> @@ -2040,7 +2329,7 @@ dp_netdev_input(struct dp_netdev *dp, struct 
> dpif_packet **packets, int cnt,
>               * the actions.  Otherwise, if there are any slow path actions,
>               * we'll send the packet up twice. */
>              dp_netdev_execute_actions(dp, &packets[i], 1, false, md,
> -                                      ofpbuf_data(&actions),
> +                                      flow_cache, ofpbuf_data(&actions),
>                                        ofpbuf_size(&actions));
>
>              add_actions = ofpbuf_size(&put_actions)
> @@ -2070,53 +2359,53 @@ dp_netdev_input(struct dp_netdev *dp, struct 
> dpif_packet **packets, int cnt,
>      n_batches = 0;
>      for (i = 0; i < cnt; i++) {
>          struct dp_netdev_flow *flow;
> -        struct packet_batch *batch;
> -        size_t j;
> +        uint32_t hash;
>
>          if (OVS_UNLIKELY(!rules[i] || !mfs[i])) {
>              continue;
>          }
>
> -        /* XXX: This O(n^2) algortihm makes sense if we're operating under 
> the
> -         * assumption that the number of distinct flows (and therefore the
> -         * number of distinct batches) is quite small.  If this turns out not
> -         * to be the case, it may make sense to pre sort based on the
> -         * netdev_flow pointer.  That done we can get the appropriate 
> batching
> -         * in O(n * log(n)) instead. */
> -        batch = NULL;
> -        flow = dp_netdev_flow_cast(rules[i]);
> -        for (j = 0; j < n_batches; j++) {
> -            if (batches[j].flow == flow) {
> -                batch = &batches[j];
> -                break;
> -            }
> -        }
> +        hash = dp_netdev_packet_rss(packets[i], mfs[i]);
>
> -        if (!batch) {
> -            batch = &batches[n_batches++];
> -            packet_batch_init(batch, flow, md);
> -        }
> -        packet_batch_update(batch, packets[i], mfs[i]);
> +        flow = dp_netdev_flow_cast(rules[i]);
> +        emc_insert(flow_cache, mfs[i], hash, flow);
> +        dp_netdev_queue_batches(packets[i], md, flow, mfs[i], batches,
> +                                &n_batches, ARRAY_SIZE(batches));
>      }
>
>      for (i = 0; i < n_batches; i++) {
> -        packet_batch_execute(&batches[i], dp);
> +        packet_batch_execute(&batches[i], dp, flow_cache);
> +    }
> +}
> +
> +static void
> +dp_netdev_input(struct dp_netdev *dp, struct emc_cache *flow_cache,
> +                struct dpif_packet **packets, int cnt, struct pkt_metadata 
> *md)
> +{
> +    struct netdev_flow_key keys[cnt];
> +    size_t newcnt;
> +
> +    newcnt = emc_processing(dp, flow_cache, packets, cnt, md, keys);
> +    if (OVS_UNLIKELY(newcnt)) {
> +        fast_path_processing(dp, flow_cache, packets, newcnt, md, keys);
>      }
>  }
>
> +
>  static void
> -dp_netdev_port_input(struct dp_netdev *dp, struct dpif_packet **packets,
> -                     int cnt, odp_port_t port_no)
> +dp_netdev_port_input(struct dp_netdev *dp, struct emc_cache *flow_cache,
> +                     struct dpif_packet **packets, int cnt, odp_port_t 
> port_no)
>  {
>      uint32_t *recirc_depth = recirc_depth_get();
>      struct pkt_metadata md = PKT_METADATA_INITIALIZER(port_no);
>
>      *recirc_depth = 0;
> -    dp_netdev_input(dp, packets, cnt, &md);
> +    dp_netdev_input(dp, flow_cache, packets, cnt, &md);
>  }
>
>  struct dp_netdev_execute_aux {
>      struct dp_netdev *dp;
> +    struct emc_cache *flow_cache;
>  };
>
>  static void
> @@ -2173,6 +2462,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, 
> int cnt,
>                                           NULL);
>                  if (!error || error == ENOSPC) {
>                      dp_netdev_execute_actions(dp, &packets[i], 1, false, md,
> +                                              aux->flow_cache,
>                                                ofpbuf_data(&actions),
>                                                ofpbuf_size(&actions));
>                  }
> @@ -2234,7 +2524,8 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, 
> int cnt,
>                  /* Hash is private to each packet */
>                  recirc_md.dp_hash = packets[i]->dp_hash;
>
> -                dp_netdev_input(dp, &recirc_pkt, 1, &recirc_md);
> +                dp_netdev_input(dp, aux->flow_cache, &recirc_pkt, 1,
> +                                &recirc_md);
>              }
>              (*depth)--;
>
> @@ -2265,9 +2556,10 @@ static void
>  dp_netdev_execute_actions(struct dp_netdev *dp,
>                            struct dpif_packet **packets, int cnt,
>                            bool may_steal, struct pkt_metadata *md,
> +                          struct emc_cache *flow_cache,
>                            const struct nlattr *actions, size_t actions_len)
>  {
> -    struct dp_netdev_execute_aux aux = {dp};
> +    struct dp_netdev_execute_aux aux = {dp, flow_cache};
>
>      odp_execute_actions(&aux, packets, cnt, may_steal, md, actions,
>                          actions_len, dp_execute_cb);
> --
> 2.1.0.rc1
>
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev
