This patch breaks the global ovs_mutex down into multiple locks,
specifically targeting flow installation.  ovs_mutex becomes a
read/write semaphore: datapath and vport modifications take it for
write, while flow operations take it only for read.  Flow installs are
then serialized on a per-table rehash rw-semaphore and a static array
of hashed spinlocks protecting the hash buckets, with the mask list
guarded by its own spinlock and the table and mask counters converted
to atomics.  With the single mutex split up this way, flow installs
can proceed in parallel.
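
To illustrate the intended concurrency model, here is a minimal sketch
of the locking scheme this patch introduces (illustrative names such as
example_sem, bucket_locks and example_flow_install; not the actual
datapath code): datapath and vport changes take the semaphore for
write, while flow installs take it for read and then serialize only on
a hashed per-bucket spinlock, so installs that hash to different
buckets proceed in parallel.

#include <linux/rwsem.h>
#include <linux/spinlock.h>
#include <linux/types.h>

#define N_BUCKET_LOCKS 64	/* illustrative; the patch uses NR_CPUS * 4 */

static DECLARE_RWSEM(example_sem);
static spinlock_t bucket_locks[N_BUCKET_LOCKS];	/* spin_lock_init() each at init */

static void example_flow_install(u32 hash)
{
	down_read(&example_sem);	/* many installers may hold this at once */
	spin_lock(&bucket_locks[hash % N_BUCKET_LOCKS]);
	/* ... hlist_add_head_rcu() of the new flow into its bucket ... */
	spin_unlock(&bucket_locks[hash % N_BUCKET_LOCKS]);
	up_read(&example_sem);
}

static void example_dp_change(void)
{
	down_write(&example_sem);	/* excludes all readers, i.e. flow installs */
	/* ... add or delete a datapath/vport ... */
	up_write(&example_sem);
}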

Signed-off-by: Pravin B Shelar <pshe...@nicira.com>
---
 datapath/datapath.c   |   90 ++++++++++++++-----------
 datapath/datapath.h   |    7 ++-
 datapath/dp_notify.c  |    4 +-
 datapath/flow.h       |    3 +-
 datapath/flow_table.c |  181 +++++++++++++++++++++++++++++++------------------
 datapath/flow_table.h |    7 ++-
 6 files changed, 180 insertions(+), 112 deletions(-)

diff --git a/datapath/datapath.c b/datapath/datapath.c
index b4035ce..67f3d19 100644
--- a/datapath/datapath.c
+++ b/datapath/datapath.c
@@ -86,23 +86,33 @@ static void ovs_notify(struct sk_buff *skb, struct genl_info *info,
  * The RTNL lock nests inside ovs_mutex.
  */
 
-static DEFINE_MUTEX(ovs_mutex);
+static DECLARE_RWSEM(ovs_sem);
 
-void ovs_lock(void)
+void ovs_wr_lock(void)
 {
-       mutex_lock(&ovs_mutex);
+       down_write(&ovs_sem);
 }
 
-void ovs_unlock(void)
+void ovs_wr_unlock(void)
 {
-       mutex_unlock(&ovs_mutex);
+       up_write(&ovs_sem);
+}
+
+void ovs_rd_lock(void)
+{
+       down_read(&ovs_sem);
+}
+
+void ovs_rd_unlock(void)
+{
+       up_read(&ovs_sem);
 }
 
 #ifdef CONFIG_LOCKDEP
 int lockdep_ovsl_is_held(void)
 {
        if (debug_locks)
-               return lockdep_is_held(&ovs_mutex);
+               return lockdep_is_held(&ovs_sem);
        else
                return 1;
 }
@@ -538,13 +548,13 @@ static int ovs_packet_cmd_execute(struct sk_buff *skb, struct genl_info *info)
        local_bh_enable();
        rcu_read_unlock();
 
-       ovs_flow_free(flow, false);
+       ovs_flow_free(&dp->table, flow, false);
        return err;
 
 err_unlock:
        rcu_read_unlock();
 err_flow_free:
-       ovs_flow_free(flow, false);
+       ovs_flow_free(&dp->table, flow, false);
 err_kfree_skb:
        kfree_skb(packet);
 err:
@@ -789,7 +799,7 @@ static int ovs_flow_cmd_new_or_set(struct sk_buff *skb, struct genl_info *info)
                goto error;
        }
 
-       ovs_lock();
+       ovs_rd_lock();
        dp = get_dp(sock_net(skb->sk), ovs_header->dp_ifindex);
        error = -ENODEV;
        if (!dp)
@@ -861,7 +871,7 @@ static int ovs_flow_cmd_new_or_set(struct sk_buff *skb, struct genl_info *info)
                        spin_unlock_bh(&flow->lock);
                }
        }
-       ovs_unlock();
+       ovs_rd_unlock();
 
        if (!IS_ERR(reply))
                ovs_notify(reply, info, &ovs_dp_flow_multicast_group);
@@ -871,9 +881,9 @@ static int ovs_flow_cmd_new_or_set(struct sk_buff *skb, struct genl_info *info)
        return 0;
 
 err_flow_free:
-       ovs_flow_free(flow, false);
+       ovs_flow_free(&dp->table, flow, false);
 err_unlock_ovs:
-       ovs_unlock();
+       ovs_rd_unlock();
 err_kfree:
        kfree(acts);
 error:
@@ -901,7 +911,7 @@ static int ovs_flow_cmd_get(struct sk_buff *skb, struct genl_info *info)
        if (err)
                return err;
 
-       ovs_lock();
+       ovs_rd_lock();
        dp = get_dp(sock_net(skb->sk), ovs_header->dp_ifindex);
        if (!dp) {
                err = -ENODEV;
@@ -921,10 +931,10 @@ static int ovs_flow_cmd_get(struct sk_buff *skb, struct genl_info *info)
                goto unlock;
        }
 
-       ovs_unlock();
+       ovs_rd_unlock();
        return genlmsg_reply(reply, info);
 unlock:
-       ovs_unlock();
+       ovs_rd_unlock();
        return err;
 }
 
@@ -939,7 +949,7 @@ static int ovs_flow_cmd_del(struct sk_buff *skb, struct genl_info *info)
        struct sw_flow_match match;
        int err;
 
-       ovs_lock();
+       ovs_rd_lock();
        dp = get_dp(sock_net(skb->sk), ovs_header->dp_ifindex);
        if (!dp) {
                err = -ENODEV;
@@ -974,13 +984,13 @@ static int ovs_flow_cmd_del(struct sk_buff *skb, struct genl_info *info)
                                     info->snd_seq, 0, OVS_FLOW_CMD_DEL);
        BUG_ON(err < 0);
 
-       ovs_flow_free(flow, true);
-       ovs_unlock();
+       ovs_flow_free(&dp->table, flow, true);
+       ovs_rd_unlock();
 
        ovs_notify(reply, info, &ovs_dp_flow_multicast_group);
        return 0;
 unlock:
-       ovs_unlock();
+       ovs_rd_unlock();
        return err;
 }
 
@@ -1158,7 +1168,7 @@ static int ovs_dp_cmd_new(struct sk_buff *skb, struct genl_info *info)
        if (!a[OVS_DP_ATTR_NAME] || !a[OVS_DP_ATTR_UPCALL_PID])
                goto err;
 
-       ovs_lock();
+       ovs_wr_lock();
 
        err = -ENOMEM;
        dp = kzalloc(sizeof(*dp), GFP_KERNEL);
@@ -1214,7 +1224,7 @@ static int ovs_dp_cmd_new(struct sk_buff *skb, struct genl_info *info)
        ovs_net = net_generic(ovs_dp_get_net(dp), ovs_net_id);
        list_add_tail_rcu(&dp->list_node, &ovs_net->dps);
 
-       ovs_unlock();
+       ovs_wr_unlock();
 
        ovs_notify(reply, info, &ovs_dp_datapath_multicast_group);
        return 0;
@@ -1231,7 +1241,7 @@ err_free_dp:
        release_net(ovs_dp_get_net(dp));
        kfree(dp);
 err_unlock_ovs:
-       ovs_unlock();
+       ovs_wr_unlock();
 err:
        return err;
 }
@@ -1266,7 +1276,7 @@ static int ovs_dp_cmd_del(struct sk_buff *skb, struct genl_info *info)
        struct datapath *dp;
        int err;
 
-       ovs_lock();
+       ovs_wr_lock();
        dp = lookup_datapath(sock_net(skb->sk), info->userhdr, info->attrs);
        err = PTR_ERR(dp);
        if (IS_ERR(dp))
@@ -1279,13 +1289,13 @@ static int ovs_dp_cmd_del(struct sk_buff *skb, struct genl_info *info)
                goto unlock;
 
        __dp_destroy(dp);
-       ovs_unlock();
+       ovs_wr_unlock();
 
        ovs_notify(reply, info, &ovs_dp_datapath_multicast_group);
 
        return 0;
 unlock:
-       ovs_unlock();
+       ovs_wr_unlock();
        return err;
 }
 
@@ -1295,7 +1305,7 @@ static int ovs_dp_cmd_set(struct sk_buff *skb, struct genl_info *info)
        struct datapath *dp;
        int err;
 
-       ovs_lock();
+       ovs_rd_lock();
        dp = lookup_datapath(sock_net(skb->sk), info->userhdr, info->attrs);
        err = PTR_ERR(dp);
        if (IS_ERR(dp))
@@ -1311,12 +1321,12 @@ static int ovs_dp_cmd_set(struct sk_buff *skb, struct genl_info *info)
                goto unlock;
        }
 
-       ovs_unlock();
+       ovs_rd_unlock();
        ovs_notify(reply, info, &ovs_dp_datapath_multicast_group);
 
        return 0;
 unlock:
-       ovs_unlock();
+       ovs_rd_unlock();
        return err;
 }
 
@@ -1326,7 +1336,7 @@ static int ovs_dp_cmd_get(struct sk_buff *skb, struct genl_info *info)
        struct datapath *dp;
        int err;
 
-       ovs_lock();
+       ovs_rd_lock();
        dp = lookup_datapath(sock_net(skb->sk), info->userhdr, info->attrs);
        if (IS_ERR(dp)) {
                err = PTR_ERR(dp);
@@ -1340,11 +1350,11 @@ static int ovs_dp_cmd_get(struct sk_buff *skb, struct genl_info *info)
                goto unlock;
        }
 
-       ovs_unlock();
+       ovs_rd_unlock();
        return genlmsg_reply(reply, info);
 
 unlock:
-       ovs_unlock();
+       ovs_rd_unlock();
        return err;
 }
 
@@ -1524,7 +1534,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb, struct genl_info *info)
            !a[OVS_VPORT_ATTR_UPCALL_PID])
                goto exit;
 
-       ovs_lock();
+       ovs_wr_lock();
        dp = get_dp(sock_net(skb->sk), ovs_header->dp_ifindex);
        err = -ENODEV;
        if (!dp)
@@ -1580,7 +1590,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb, struct genl_info *info)
        ovs_notify(reply, info, &ovs_dp_vport_multicast_group);
 
 exit_unlock:
-       ovs_unlock();
+       ovs_wr_unlock();
 exit:
        return err;
 }
@@ -1592,7 +1602,7 @@ static int ovs_vport_cmd_set(struct sk_buff *skb, struct genl_info *info)
        struct vport *vport;
        int err;
 
-       ovs_lock();
+       ovs_rd_lock();
        vport = lookup_vport(sock_net(skb->sk), info->userhdr, a);
        err = PTR_ERR(vport);
        if (IS_ERR(vport))
@@ -1626,14 +1636,14 @@ static int ovs_vport_cmd_set(struct sk_buff *skb, struct genl_info *info)
                                      info->snd_seq, 0, OVS_VPORT_CMD_NEW);
        BUG_ON(err < 0);
 
-       ovs_unlock();
+       ovs_rd_unlock();
        ovs_notify(reply, info, &ovs_dp_vport_multicast_group);
        return 0;
 
 exit_free:
        kfree_skb(reply);
 exit_unlock:
-       ovs_unlock();
+       ovs_rd_unlock();
        return err;
 }
 
@@ -1644,7 +1654,7 @@ static int ovs_vport_cmd_del(struct sk_buff *skb, struct genl_info *info)
        struct vport *vport;
        int err;
 
-       ovs_lock();
+       ovs_wr_lock();
        vport = lookup_vport(sock_net(skb->sk), info->userhdr, a);
        err = PTR_ERR(vport);
        if (IS_ERR(vport))
@@ -1667,7 +1677,7 @@ static int ovs_vport_cmd_del(struct sk_buff *skb, struct genl_info *info)
        ovs_notify(reply, info, &ovs_dp_vport_multicast_group);
 
 exit_unlock:
-       ovs_unlock();
+       ovs_wr_unlock();
        return err;
 }
 
@@ -1836,10 +1846,10 @@ static void __net_exit ovs_exit_net(struct net *net)
        struct datapath *dp, *dp_next;
        struct ovs_net *ovs_net = net_generic(net, ovs_net_id);
 
-       ovs_lock();
+       ovs_wr_lock();
        list_for_each_entry_safe(dp, dp_next, &ovs_net->dps, list_node)
                __dp_destroy(dp);
-       ovs_unlock();
+       ovs_wr_unlock();
 
        cancel_work_sync(&ovs_net->dp_notify_work);
 }
diff --git a/datapath/datapath.h b/datapath/datapath.h
index 64920de..d3eaf3d 100644
--- a/datapath/datapath.h
+++ b/datapath/datapath.h
@@ -132,8 +132,11 @@ struct ovs_net {
 };
 
 extern int ovs_net_id;
-void ovs_lock(void);
-void ovs_unlock(void);
+
+void ovs_wr_lock(void);
+void ovs_wr_unlock(void);
+void ovs_rd_lock(void);
+void ovs_rd_unlock(void);
 
 #ifdef CONFIG_LOCKDEP
 int lockdep_ovsl_is_held(void);
diff --git a/datapath/dp_notify.c b/datapath/dp_notify.c
index b5178fc..f8c4769 100644
--- a/datapath/dp_notify.c
+++ b/datapath/dp_notify.c
@@ -51,7 +51,7 @@ void ovs_dp_notify_wq(struct work_struct *work)
        struct ovs_net *ovs_net = container_of(work, struct ovs_net, dp_notify_work);
        struct datapath *dp;
 
-       ovs_lock();
+       ovs_wr_lock();
        list_for_each_entry(dp, &ovs_net->dps, list_node) {
                int i;
 
@@ -72,7 +72,7 @@ void ovs_dp_notify_wq(struct work_struct *work)
                        }
                }
        }
-       ovs_unlock();
+       ovs_wr_unlock();
 }
 
 static int dp_device_event(struct notifier_block *unused, unsigned long event,
diff --git a/datapath/flow.h b/datapath/flow.h
index 3da2d0a..a72b728 100644
--- a/datapath/flow.h
+++ b/datapath/flow.h
@@ -127,7 +127,7 @@ struct sw_flow_key_range {
 };
 
 struct sw_flow_mask {
-       int ref_count;
+       atomic_t ref_count;
        struct rcu_head rcu;
        struct list_head list;
        struct sw_flow_key_range range;
@@ -163,6 +163,7 @@ struct sw_flow {
        u64 packet_count;       /* Number of packets matched. */
        u64 byte_count;         /* Number of bytes matched. */
        u8 tcp_flags;           /* Union of seen TCP flags. */
+       struct list_head list;
 };
 
 struct arp_eth_header {
diff --git a/datapath/flow_table.c b/datapath/flow_table.c
index 9acf81f..03d217e 100644
--- a/datapath/flow_table.c
+++ b/datapath/flow_table.c
@@ -49,8 +49,10 @@
 
 #define TBL_MIN_BUCKETS                1024
 #define REHASH_INTERVAL                (10 * 60 * HZ)
+#define MAX_FLOW_TABLE_LOCKS   (NR_CPUS * 4)
 
 static struct kmem_cache *flow_cache;
+static spinlock_t flow_table_locks[MAX_FLOW_TABLE_LOCKS];
 
 void ovs_match_init(struct sw_flow_match *match,
                    struct sw_flow_key *key,
@@ -144,7 +146,7 @@ int ovs_flow_tbl_count(struct flow_table *table)
 {
        struct hash_table *htable = rcu_dereference_check(table->htable, lockdep_ovsl_is_held());
 
-       return htable->count;
+       return atomic_read(&htable->count);
 }
 
 static struct flex_array *alloc_buckets(unsigned int n_buckets)
@@ -176,50 +178,6 @@ static void __flow_free(struct sw_flow *flow)
        kmem_cache_free(flow_cache, flow);
 }
 
-static void rcu_free_flow_callback(struct rcu_head *rcu)
-{
-       struct sw_flow *flow = container_of(rcu, struct sw_flow, rcu);
-
-       __flow_free(flow);
-}
-
-static void rcu_free_sw_flow_mask_cb(struct rcu_head *rcu)
-{
-       struct sw_flow_mask *mask = container_of(rcu, struct sw_flow_mask, rcu);
-
-       kfree(mask);
-}
-
-static void flow_mask_del_ref(struct sw_flow_mask *mask, bool deferred)
-{
-       if (!mask)
-               return;
-
-       BUG_ON(!mask->ref_count);
-       mask->ref_count--;
-
-       if (!mask->ref_count) {
-               list_del_rcu(&mask->list);
-               if (deferred)
-                       call_rcu(&mask->rcu, rcu_free_sw_flow_mask_cb);
-               else
-                       kfree(mask);
-       }
-}
-
-void ovs_flow_free(struct sw_flow *flow, bool deferred)
-{
-       if (!flow)
-               return;
-
-       flow_mask_del_ref(flow->mask, deferred);
-
-       if (deferred)
-               call_rcu(&flow->rcu, rcu_free_flow_callback);
-       else
-               __flow_free(flow);
-}
-
 static void free_buckets(struct flex_array *buckets)
 {
        flex_array_free(buckets);
@@ -240,7 +198,7 @@ static void __flow_tbl_destroy(struct hash_table *table)
 
                hlist_for_each_entry_safe(flow, n, head, hash_node[ver]) {
                        hlist_del(&flow->hash_node[ver]);
-                       ovs_flow_free(flow, false);
+                       __flow_free(flow);
                }
        }
 
@@ -263,7 +221,7 @@ static struct hash_table *__flow_tbl_alloc(int new_size)
                return NULL;
        }
        table->n_buckets = new_size;
-       table->count = 0;
+       atomic_set(&table->count, 0);
        table->node_ver = 0;
        table->keep_flows = false;
        get_random_bytes(&table->hash_seed, sizeof(u32));
@@ -283,6 +241,8 @@ int ovs_flow_table_init(struct flow_table *table)
        rcu_assign_pointer(table->htable, htable);
        INIT_LIST_HEAD(&table->mask_list);
        table->last_rehash = jiffies;
+       init_rwsem(&table->rehash_lock);
+       spin_lock_init(&table->list_lock);
        return 0;
 }
 
@@ -293,11 +253,74 @@ static void flow_tbl_destroy_rcu_cb(struct rcu_head *rcu)
        __flow_tbl_destroy(table);
 }
 
-static void __ovs_flow_tbl_destroy(struct hash_table *table, bool deferred)
+static void rcu_free_sw_flow_mask_cb(struct rcu_head *rcu)
 {
+       struct sw_flow_mask *mask = container_of(rcu, struct sw_flow_mask, rcu);
+
+       kfree(mask);
+}
+
+static void flow_mask_del_ref(struct flow_table *tbl,
+                             struct sw_flow_mask *mask, bool deferred)
+{
+       if (!mask)
+               return;
+
+       if (atomic_dec_and_test(&mask->ref_count)) {
+               spin_lock(&tbl->list_lock);
+               list_del_rcu(&mask->list);
+               spin_unlock(&tbl->list_lock);
+               if (deferred)
+                       call_rcu(&mask->rcu, rcu_free_sw_flow_mask_cb);
+               else
+                       kfree(mask);
+       }
+}
+
+static void rcu_free_flow_callback(struct rcu_head *rcu)
+{
+       struct sw_flow *flow = container_of(rcu, struct sw_flow, rcu);
+
+       __flow_free(flow);
+}
+
+void ovs_flow_free(struct flow_table *tbl, struct sw_flow *flow, bool deferred)
+{
+       if (!flow)
+               return;
+
+       flow_mask_del_ref(tbl, flow->mask, deferred);
+
+       if (deferred)
+               call_rcu(&flow->rcu, rcu_free_flow_callback);
+       else
+               __flow_free(flow);
+}
+
+static void __ovs_flow_tbl_destroy(struct flow_table *flow_table,
+                                  struct hash_table *table, bool deferred)
+{
+       int i;
+
        if (!table)
                return;
 
+       if (table->keep_flows)
+               goto skip_flows;
+
+       for (i = 0; i < table->n_buckets; i++) {
+               struct sw_flow *flow;
+               struct hlist_head *head = flex_array_get(table->buckets, i);
+               struct hlist_node *n;
+               int ver = table->node_ver;
+
+               hlist_for_each_entry_safe(flow, n, head, hash_node[ver])
+                       flow_mask_del_ref(flow_table, flow->mask, deferred);
+
+       }
+
+skip_flows:
+
        if (deferred)
                call_rcu(&table->rcu, flow_tbl_destroy_rcu_cb);
        else
@@ -308,7 +331,7 @@ void ovs_flow_tbl_destroy(struct flow_table *table, bool deferred)
 {
        struct hash_table *htable = ovsl_dereference(table->htable);
 
-       __ovs_flow_tbl_destroy(htable, deferred);
+       __ovs_flow_tbl_destroy(table, htable, deferred);
 }
 
 struct sw_flow *ovs_flow_dump_next(struct hash_table *table,
@@ -350,9 +373,11 @@ static void __tbl_insert(struct hash_table *table, struct sw_flow *flow)
        struct hlist_head *head;
 
        head = find_bucket(table, flow->hash);
+       spin_lock(&flow_table_locks[flow->hash % MAX_FLOW_TABLE_LOCKS]);
        hlist_add_head_rcu(&flow->hash_node[table->node_ver], head);
+       spin_unlock(&flow_table_locks[flow->hash % MAX_FLOW_TABLE_LOCKS]);
 
-       table->count++;
+       atomic_inc(&table->count);
 }
 
 static void flow_table_copy_flows(struct hash_table *old,
@@ -396,16 +421,26 @@ int ovs_flush_flows(struct flow_table *flow_table)
 {
        struct hash_table *old_table;
        struct hash_table *new_htable;
+       int err;
+
+       down_write(&flow_table->rehash_lock);
 
        old_table = ovsl_dereference(flow_table->htable);
        new_htable = __flow_tbl_alloc(TBL_MIN_BUCKETS);
-       if (!new_htable)
-               return -ENOMEM;
+       if (!new_htable) {
+               up_write(&flow_table->rehash_lock);
+               err = -ENOMEM;
+               goto out;
+       }
 
        rcu_assign_pointer(flow_table->htable, new_htable);
 
-       __ovs_flow_tbl_destroy(old_table, true);
-       return 0;
+       __ovs_flow_tbl_destroy(flow_table, old_table, true);
+       err = 0;
+
+out:
+       up_write(&flow_table->rehash_lock);
+       return err;
 }
 
 static u32 flow_hash(const struct sw_flow_key *key, int key_start,
@@ -545,9 +580,10 @@ void ovs_flow_remove(struct flow_table *table, struct sw_flow *flow)
 {
        struct hash_table *htbl = ovsl_dereference(table->htable);
 
-       BUG_ON(htbl->count == 0);
+       spin_lock(&flow_table_locks[flow->hash % MAX_FLOW_TABLE_LOCKS]);
        hlist_del_rcu(&flow->hash_node[htbl->node_ver]);
-       htbl->count--;
+       spin_unlock(&flow_table_locks[flow->hash % MAX_FLOW_TABLE_LOCKS]);
+       atomic_dec(&htbl->count);
 }
 
 static struct sw_flow_mask *mask_alloc(void)
@@ -556,16 +592,11 @@ static struct sw_flow_mask *mask_alloc(void)
 
        mask = kmalloc(sizeof(*mask), GFP_KERNEL);
        if (mask)
-               mask->ref_count = 0;
+               atomic_set(&mask->ref_count, 0);
 
        return mask;
 }
 
-static void mask_add_ref(struct sw_flow_mask *mask)
-{
-       mask->ref_count++;
-}
-
 static bool mask_equal(const struct sw_flow_mask *a,
                       const struct sw_flow_mask *b)
 {
@@ -609,10 +640,13 @@ static int flow_mask_insert(struct flow_table *tbl, struct sw_flow *flow,
                        return -ENOMEM;
                mask->key = new->key;
                mask->range = new->range;
+
+               spin_lock(&tbl->list_lock);
                list_add_rcu(&mask->list, &tbl->mask_list);
+               spin_unlock(&tbl->list_lock);
        }
 
-       mask_add_ref(mask);
+       atomic_inc(&mask->ref_count);
        flow->mask = mask;
        return 0;
 }
@@ -622,6 +656,7 @@ int ovs_flow_insert(struct flow_table *table, struct sw_flow *flow,
 {
        struct hash_table *htable = NULL;
        struct hash_table *new_htable = NULL;
+       bool wr_locked = false;
        int err;
 
        err = flow_mask_insert(table, flow, new);
@@ -631,10 +666,17 @@ int ovs_flow_insert(struct flow_table *table, struct sw_flow *flow,
        htable = ovsl_dereference(table->htable);
 
        /* Expand table, if necessary, to make room. */
-       if (htable->count > htable->n_buckets)
+       if (atomic_read(&htable->count) > htable->n_buckets) {
+               wr_locked = true;
+               down_write(&table->rehash_lock);
                new_htable = flow_tbl_expand(htable);
-       else if (time_after(jiffies, table->last_rehash + REHASH_INTERVAL))
+       } else if (time_after(jiffies, table->last_rehash + REHASH_INTERVAL)) {
+               wr_locked = true;
+               down_write(&table->rehash_lock);
                new_htable = flow_tbl_rehash(htable);
+       } else {
+               down_read(&table->rehash_lock);
+       }
 
        if (new_htable && !IS_ERR(new_htable)) {
                rcu_assign_pointer(table->htable, new_htable);
@@ -647,6 +689,11 @@ int ovs_flow_insert(struct flow_table *table, struct sw_flow *flow,
                        flow->mask->range.end);
        __tbl_insert(htable, flow);
 
+       if (unlikely(wr_locked))
+               up_write(&table->rehash_lock);
+       else
+               up_read(&table->rehash_lock);
+
        return 0;
 }
 
@@ -654,6 +701,8 @@ int ovs_flow_insert(struct flow_table *table, struct sw_flow *flow,
  * Returns zero if successful or a negative error code. */
 int ovs_flow_init(void)
 {
+       int i;
+
        BUILD_BUG_ON(__alignof__(struct sw_flow_key) % __alignof__(long));
        BUILD_BUG_ON(sizeof(struct sw_flow_key) % sizeof(long));
 
@@ -662,6 +711,8 @@ int ovs_flow_init(void)
        if (flow_cache == NULL)
                return -ENOMEM;
 
+       for (i = 0; i < MAX_FLOW_TABLE_LOCKS; i++)
+               spin_lock_init(&flow_table_locks[i]);
        return 0;
 }
 
diff --git a/datapath/flow_table.h b/datapath/flow_table.h
index 4b4f05e..4bd0d7d 100644
--- a/datapath/flow_table.h
+++ b/datapath/flow_table.h
@@ -38,7 +38,8 @@
 
 struct hash_table {
        struct flex_array *buckets;
-       unsigned int count, n_buckets;
+       unsigned int n_buckets;
+       atomic_t count;
        struct rcu_head rcu;
        int node_ver;
        u32 hash_seed;
@@ -46,8 +47,10 @@ struct hash_table {
 };
 
 struct flow_table {
+       struct rw_semaphore rehash_lock;
        struct hash_table __rcu *htable;
        struct list_head mask_list;
+       spinlock_t list_lock;
        unsigned long last_rehash;
 };
 
@@ -59,7 +62,7 @@ void ovs_flow_exit(void);
 
 struct sw_flow *ovs_flow_alloc(void);
 void ovs_flow_deferred_free(struct sw_flow *);
-void ovs_flow_free(struct sw_flow *, bool deferred);
+void ovs_flow_free(struct flow_table *tbl, struct sw_flow *flow, bool deferred);
 
 void ovs_flow_used(struct sw_flow *, struct sk_buff *);
 u64 ovs_flow_used_time(unsigned long flow_jiffies);
-- 
1.7.1
