This patch breaks ovs_mutex down into multiple finer-grained locks,
specifically targeting flow installation.  With the coarse lock split
up, flows can be installed in parallel.
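
To illustrate the intended concurrency, here is a minimal user-space
sketch of the scheme, assuming pthread primitives as stand-ins for the
kernel rwsem (ovs_sem) and the per-bucket spinlock array; the names and
sizes below are illustrative only, not the kernel API:

    /* Sketch: a rwlock plays the role of ovs_sem, a small mutex array
     * plays the role of the per-hash-bucket spinlocks. */
    #include <pthread.h>
    #include <stdio.h>

    #define N_BUCKET_LOCKS 64   /* kernel side sizes this as NR_CPUS * 4 */

    static pthread_rwlock_t ovs_sem = PTHREAD_RWLOCK_INITIALIZER;
    static pthread_mutex_t bucket_locks[N_BUCKET_LOCKS];

    /* Flow install: shared (read) lock on the global lock, so many
     * installs may run concurrently; only installs hashing to the same
     * bucket serialize on the per-bucket lock. */
    static void flow_install(unsigned int hash)
    {
        pthread_rwlock_rdlock(&ovs_sem);
        pthread_mutex_lock(&bucket_locks[hash % N_BUCKET_LOCKS]);
        printf("install flow into bucket %u\n", hash % N_BUCKET_LOCKS);
        pthread_mutex_unlock(&bucket_locks[hash % N_BUCKET_LOCKS]);
        pthread_rwlock_unlock(&ovs_sem);
    }

    /* Datapath/vport reconfiguration: exclusive (write) lock, which
     * excludes all flow installs, as the old ovs_mutex did. */
    static void dp_reconfigure(void)
    {
        pthread_rwlock_wrlock(&ovs_sem);
        printf("reconfigure datapath\n");
        pthread_rwlock_unlock(&ovs_sem);
    }

    int main(void)
    {
        int i;

        for (i = 0; i < N_BUCKET_LOCKS; i++)
            pthread_mutex_init(&bucket_locks[i], NULL);

        flow_install(12345);
        flow_install(67890);    /* different bucket: no contention */
        dp_reconfigure();
        return 0;
    }

Flow installs that hash to different buckets no longer serialize on a
single global mutex; only datapath/vport reconfiguration still takes
the lock exclusively.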

Signed-off-by: Pravin B Shelar <pshe...@nicira.com>
---
v3:
Handle duplicate flows in flow insert.
v2:
Get rid of the mask_list lock.
---
 datapath/datapath.c   |   90 +++++++++-------
 datapath/datapath.h   |    7 +-
 datapath/dp_notify.c  |    4 +-
 datapath/flow.h       |    3 +-
 datapath/flow_table.c |  285 +++++++++++++++++++++++++++++++------------------
 datapath/flow_table.h |    6 +-
 6 files changed, 244 insertions(+), 151 deletions(-)

diff --git a/datapath/datapath.c b/datapath/datapath.c
index 78f18bc..4d7c816 100644
--- a/datapath/datapath.c
+++ b/datapath/datapath.c
@@ -87,23 +87,33 @@ static void ovs_notify(struct sk_buff *skb, struct genl_info *info,
  * The RTNL lock nests inside ovs_mutex.
  */
 
-static DEFINE_MUTEX(ovs_mutex);
+static DECLARE_RWSEM(ovs_sem);
 
-void ovs_lock(void)
+void ovs_wr_lock(void)
 {
-       mutex_lock(&ovs_mutex);
+       down_write(&ovs_sem);
 }
 
-void ovs_unlock(void)
+void ovs_wr_unlock(void)
 {
-       mutex_unlock(&ovs_mutex);
+       up_write(&ovs_sem);
+}
+
+void ovs_rd_lock(void)
+{
+       down_read(&ovs_sem);
+}
+
+void ovs_rd_unlock(void)
+{
+       up_read(&ovs_sem);
 }
 
 #ifdef CONFIG_LOCKDEP
 int lockdep_ovsl_is_held(void)
 {
        if (debug_locks)
-               return lockdep_is_held(&ovs_mutex);
+               return lockdep_is_held(&ovs_sem);
        else
                return 1;
 }
@@ -539,13 +549,13 @@ static int ovs_packet_cmd_execute(struct sk_buff *skb, struct genl_info *info)
        local_bh_enable();
        rcu_read_unlock();
 
-       ovs_flow_free(flow, false);
+       ovs_flow_free(&dp->table, flow, false);
        return err;
 
 err_unlock:
        rcu_read_unlock();
 err_flow_free:
-       ovs_flow_free(flow, false);
+       ovs_flow_free(&dp->table, flow, false);
 err_kfree_skb:
        kfree_skb(packet);
 err:
@@ -790,7 +800,7 @@ static int ovs_flow_cmd_new_or_set(struct sk_buff *skb, struct genl_info *info)
                goto error;
        }
 
-       ovs_lock();
+       ovs_rd_lock();
        dp = get_dp(sock_net(skb->sk), ovs_header->dp_ifindex);
        error = -ENODEV;
        if (!dp)
@@ -862,7 +872,7 @@ static int ovs_flow_cmd_new_or_set(struct sk_buff *skb, struct genl_info *info)
                        spin_unlock_bh(&flow->lock);
                }
        }
-       ovs_unlock();
+       ovs_rd_unlock();
 
        if (!IS_ERR(reply))
                ovs_notify(reply, info, &ovs_dp_flow_multicast_group);
@@ -872,9 +882,9 @@ static int ovs_flow_cmd_new_or_set(struct sk_buff *skb, struct genl_info *info)
        return 0;
 
 err_flow_free:
-       ovs_flow_free(flow, false);
+       ovs_flow_free(&dp->table, flow, false);
 err_unlock_ovs:
-       ovs_unlock();
+       ovs_rd_unlock();
 err_kfree:
        kfree(acts);
 error:
@@ -902,7 +912,7 @@ static int ovs_flow_cmd_get(struct sk_buff *skb, struct genl_info *info)
        if (err)
                return err;
 
-       ovs_lock();
+       ovs_rd_lock();
        dp = get_dp(sock_net(skb->sk), ovs_header->dp_ifindex);
        if (!dp) {
                err = -ENODEV;
@@ -927,10 +937,10 @@ static int ovs_flow_cmd_get(struct sk_buff *skb, struct genl_info *info)
                goto unlock;
        }
 
-       ovs_unlock();
+       ovs_rd_unlock();
        return genlmsg_reply(reply, info);
 unlock:
-       ovs_unlock();
+       ovs_rd_unlock();
        return err;
 }
 
@@ -945,7 +955,7 @@ static int ovs_flow_cmd_del(struct sk_buff *skb, struct genl_info *info)
        struct sw_flow_match match;
        int err;
 
-       ovs_lock();
+       ovs_rd_lock();
        dp = get_dp(sock_net(skb->sk), ovs_header->dp_ifindex);
        if (!dp) {
                err = -ENODEV;
@@ -985,13 +995,13 @@ static int ovs_flow_cmd_del(struct sk_buff *skb, struct genl_info *info)
                                     info->snd_seq, 0, OVS_FLOW_CMD_DEL);
        BUG_ON(err < 0);
 
-       ovs_flow_free(flow, true);
-       ovs_unlock();
+       ovs_flow_free(&dp->table, flow, true);
+       ovs_rd_unlock();
 
        ovs_notify(reply, info, &ovs_dp_flow_multicast_group);
        return 0;
 unlock:
-       ovs_unlock();
+       ovs_rd_unlock();
        return err;
 }
 
@@ -1169,7 +1179,7 @@ static int ovs_dp_cmd_new(struct sk_buff *skb, struct genl_info *info)
        if (!a[OVS_DP_ATTR_NAME] || !a[OVS_DP_ATTR_UPCALL_PID])
                goto err;
 
-       ovs_lock();
+       ovs_wr_lock();
 
        err = -ENOMEM;
        dp = kzalloc(sizeof(*dp), GFP_KERNEL);
@@ -1225,7 +1235,7 @@ static int ovs_dp_cmd_new(struct sk_buff *skb, struct genl_info *info)
        ovs_net = net_generic(ovs_dp_get_net(dp), ovs_net_id);
        list_add_tail_rcu(&dp->list_node, &ovs_net->dps);
 
-       ovs_unlock();
+       ovs_wr_unlock();
 
        ovs_notify(reply, info, &ovs_dp_datapath_multicast_group);
        return 0;
@@ -1242,7 +1252,7 @@ err_free_dp:
        release_net(ovs_dp_get_net(dp));
        kfree(dp);
 err_unlock_ovs:
-       ovs_unlock();
+       ovs_wr_unlock();
 err:
        return err;
 }
@@ -1277,7 +1287,7 @@ static int ovs_dp_cmd_del(struct sk_buff *skb, struct genl_info *info)
        struct datapath *dp;
        int err;
 
-       ovs_lock();
+       ovs_wr_lock();
        dp = lookup_datapath(sock_net(skb->sk), info->userhdr, info->attrs);
        err = PTR_ERR(dp);
        if (IS_ERR(dp))
@@ -1290,13 +1300,13 @@ static int ovs_dp_cmd_del(struct sk_buff *skb, struct genl_info *info)
                goto unlock;
 
        __dp_destroy(dp);
-       ovs_unlock();
+       ovs_wr_unlock();
 
        ovs_notify(reply, info, &ovs_dp_datapath_multicast_group);
 
        return 0;
 unlock:
-       ovs_unlock();
+       ovs_wr_unlock();
        return err;
 }
 
@@ -1306,7 +1316,7 @@ static int ovs_dp_cmd_set(struct sk_buff *skb, struct genl_info *info)
        struct datapath *dp;
        int err;
 
-       ovs_lock();
+       ovs_wr_lock();
        dp = lookup_datapath(sock_net(skb->sk), info->userhdr, info->attrs);
        err = PTR_ERR(dp);
        if (IS_ERR(dp))
@@ -1322,12 +1332,12 @@ static int ovs_dp_cmd_set(struct sk_buff *skb, struct genl_info *info)
                goto unlock;
        }
 
-       ovs_unlock();
+       ovs_wr_unlock();
        ovs_notify(reply, info, &ovs_dp_datapath_multicast_group);
 
        return 0;
 unlock:
-       ovs_unlock();
+       ovs_wr_unlock();
        return err;
 }
 
@@ -1337,7 +1347,7 @@ static int ovs_dp_cmd_get(struct sk_buff *skb, struct genl_info *info)
        struct datapath *dp;
        int err;
 
-       ovs_lock();
+       ovs_wr_lock();
        dp = lookup_datapath(sock_net(skb->sk), info->userhdr, info->attrs);
        if (IS_ERR(dp)) {
                err = PTR_ERR(dp);
@@ -1351,11 +1361,11 @@ static int ovs_dp_cmd_get(struct sk_buff *skb, struct genl_info *info)
                goto unlock;
        }
 
-       ovs_unlock();
+       ovs_wr_unlock();
        return genlmsg_reply(reply, info);
 
 unlock:
-       ovs_unlock();
+       ovs_wr_unlock();
        return err;
 }
 
@@ -1535,7 +1545,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb, struct genl_info *info)
            !a[OVS_VPORT_ATTR_UPCALL_PID])
                goto exit;
 
-       ovs_lock();
+       ovs_wr_lock();
        dp = get_dp(sock_net(skb->sk), ovs_header->dp_ifindex);
        err = -ENODEV;
        if (!dp)
@@ -1591,7 +1601,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb, struct genl_info *info)
        ovs_notify(reply, info, &ovs_dp_vport_multicast_group);
 
 exit_unlock:
-       ovs_unlock();
+       ovs_wr_unlock();
 exit:
        return err;
 }
@@ -1603,7 +1613,7 @@ static int ovs_vport_cmd_set(struct sk_buff *skb, struct genl_info *info)
        struct vport *vport;
        int err;
 
-       ovs_lock();
+       ovs_wr_lock();
        vport = lookup_vport(sock_net(skb->sk), info->userhdr, a);
        err = PTR_ERR(vport);
        if (IS_ERR(vport))
@@ -1637,14 +1647,14 @@ static int ovs_vport_cmd_set(struct sk_buff *skb, struct genl_info *info)
                                      info->snd_seq, 0, OVS_VPORT_CMD_NEW);
        BUG_ON(err < 0);
 
-       ovs_unlock();
+       ovs_wr_unlock();
        ovs_notify(reply, info, &ovs_dp_vport_multicast_group);
        return 0;
 
 exit_free:
        kfree_skb(reply);
 exit_unlock:
-       ovs_unlock();
+       ovs_wr_unlock();
        return err;
 }
 
@@ -1655,7 +1665,7 @@ static int ovs_vport_cmd_del(struct sk_buff *skb, struct genl_info *info)
        struct vport *vport;
        int err;
 
-       ovs_lock();
+       ovs_wr_lock();
        vport = lookup_vport(sock_net(skb->sk), info->userhdr, a);
        err = PTR_ERR(vport);
        if (IS_ERR(vport))
@@ -1678,7 +1688,7 @@ static int ovs_vport_cmd_del(struct sk_buff *skb, struct genl_info *info)
        ovs_notify(reply, info, &ovs_dp_vport_multicast_group);
 
 exit_unlock:
-       ovs_unlock();
+       ovs_wr_unlock();
        return err;
 }
 
@@ -1847,10 +1857,10 @@ static void __net_exit ovs_exit_net(struct net *net)
        struct datapath *dp, *dp_next;
        struct ovs_net *ovs_net = net_generic(net, ovs_net_id);
 
-       ovs_lock();
+       ovs_wr_lock();
        list_for_each_entry_safe(dp, dp_next, &ovs_net->dps, list_node)
                __dp_destroy(dp);
-       ovs_unlock();
+       ovs_wr_unlock();
 
        cancel_work_sync(&ovs_net->dp_notify_work);
 }
diff --git a/datapath/datapath.h b/datapath/datapath.h
index 64920de..d3eaf3d 100644
--- a/datapath/datapath.h
+++ b/datapath/datapath.h
@@ -132,8 +132,11 @@ struct ovs_net {
 };
 
 extern int ovs_net_id;
-void ovs_lock(void);
-void ovs_unlock(void);
+
+void ovs_wr_lock(void);
+void ovs_wr_unlock(void);
+void ovs_rd_lock(void);
+void ovs_rd_unlock(void);
 
 #ifdef CONFIG_LOCKDEP
 int lockdep_ovsl_is_held(void);
diff --git a/datapath/dp_notify.c b/datapath/dp_notify.c
index b5178fc..f8c4769 100644
--- a/datapath/dp_notify.c
+++ b/datapath/dp_notify.c
@@ -51,7 +51,7 @@ void ovs_dp_notify_wq(struct work_struct *work)
        struct ovs_net *ovs_net = container_of(work, struct ovs_net, dp_notify_work);
        struct datapath *dp;
 
-       ovs_lock();
+       ovs_wr_lock();
        list_for_each_entry(dp, &ovs_net->dps, list_node) {
                int i;
 
@@ -72,7 +72,7 @@ void ovs_dp_notify_wq(struct work_struct *work)
                        }
                }
        }
-       ovs_unlock();
+       ovs_wr_unlock();
 }
 
 static int dp_device_event(struct notifier_block *unused, unsigned long event,
diff --git a/datapath/flow.h b/datapath/flow.h
index c50c76f..a112821 100644
--- a/datapath/flow.h
+++ b/datapath/flow.h
@@ -127,7 +127,7 @@ struct sw_flow_key_range {
 };
 
 struct sw_flow_mask {
-       int ref_count;
+       atomic_t ref_count;
        struct rcu_head rcu;
        struct list_head list;
        struct sw_flow_key_range range;
@@ -163,6 +163,7 @@ struct sw_flow {
        u64 packet_count;       /* Number of packets matched. */
        u64 byte_count;         /* Number of bytes matched. */
        u8 tcp_flags;           /* Union of seen TCP flags. */
+       struct list_head list;
 };
 
 struct arp_eth_header {
diff --git a/datapath/flow_table.c b/datapath/flow_table.c
index b983669..784c38e 100644
--- a/datapath/flow_table.c
+++ b/datapath/flow_table.c
@@ -49,8 +49,10 @@
 
 #define TBL_MIN_BUCKETS                1024
 #define REHASH_INTERVAL                (10 * 60 * HZ)
+#define MAX_FLOW_TABLE_LOCKS   (NR_CPUS * 4)
 
 static struct kmem_cache *flow_cache;
+static spinlock_t flow_table_locks[MAX_FLOW_TABLE_LOCKS];
 
 void ovs_match_init(struct sw_flow_match *match,
                    struct sw_flow_key *key,
@@ -108,7 +110,7 @@ int ovs_flow_tbl_count(struct flow_table *table)
 {
        struct hash_table *htable = rcu_dereference_check(table->htable, lockdep_ovsl_is_held());
 
-       return htable->count;
+       return atomic_read(&htable->count);
 }
 
 static struct flex_array *alloc_buckets(unsigned int n_buckets)
@@ -140,50 +142,6 @@ static void __flow_free(struct sw_flow *flow)
        kmem_cache_free(flow_cache, flow);
 }
 
-static void rcu_free_flow_callback(struct rcu_head *rcu)
-{
-       struct sw_flow *flow = container_of(rcu, struct sw_flow, rcu);
-
-       __flow_free(flow);
-}
-
-static void rcu_free_sw_flow_mask_cb(struct rcu_head *rcu)
-{
-       struct sw_flow_mask *mask = container_of(rcu, struct sw_flow_mask, rcu);
-
-       kfree(mask);
-}
-
-static void flow_mask_del_ref(struct sw_flow_mask *mask, bool deferred)
-{
-       if (!mask)
-               return;
-
-       BUG_ON(!mask->ref_count);
-       mask->ref_count--;
-
-       if (!mask->ref_count) {
-               list_del_rcu(&mask->list);
-               if (deferred)
-                       call_rcu(&mask->rcu, rcu_free_sw_flow_mask_cb);
-               else
-                       kfree(mask);
-       }
-}
-
-void ovs_flow_free(struct sw_flow *flow, bool deferred)
-{
-       if (!flow)
-               return;
-
-       flow_mask_del_ref(flow->mask, deferred);
-
-       if (deferred)
-               call_rcu(&flow->rcu, rcu_free_flow_callback);
-       else
-               __flow_free(flow);
-}
-
 static void free_buckets(struct flex_array *buckets)
 {
        flex_array_free(buckets);
@@ -204,7 +162,7 @@ static void __flow_tbl_destroy(struct hash_table *table)
 
                hlist_for_each_entry_safe(flow, n, head, hash_node[ver]) {
                        hlist_del(&flow->hash_node[ver]);
-                       ovs_flow_free(flow, false);
+                       __flow_free(flow);
                }
        }
 
@@ -227,7 +185,7 @@ static struct hash_table *__flow_tbl_alloc(int new_size)
                return NULL;
        }
        table->n_buckets = new_size;
-       table->count = 0;
+       atomic_set(&table->count, 0);
        table->node_ver = 0;
        table->keep_flows = false;
        get_random_bytes(&table->hash_seed, sizeof(u32));
@@ -247,6 +205,7 @@ int ovs_flow_tbl_init(struct flow_table *table)
        rcu_assign_pointer(table->htable, htable);
        INIT_LIST_HEAD(&table->mask_list);
        table->last_rehash = jiffies;
+       init_rwsem(&table->rehash_lock);
        return 0;
 }
 
@@ -257,11 +216,74 @@ static void flow_tbl_destroy_rcu_cb(struct rcu_head *rcu)
        __flow_tbl_destroy(table);
 }
 
-static void __ovs_flow_tbl_destroy(struct hash_table *table, bool deferred)
+static void rcu_free_sw_flow_mask_cb(struct rcu_head *rcu)
 {
+       struct sw_flow_mask *mask = container_of(rcu, struct sw_flow_mask, rcu);
+
+       kfree(mask);
+}
+
+static void flow_mask_del_ref(struct flow_table *tbl,
+                             struct sw_flow_mask *mask, bool deferred)
+{
+       if (!mask)
+               return;
+
+       if (atomic_dec_and_test(&mask->ref_count)) {
+               down_write(&tbl->rehash_lock);
+               list_del_rcu(&mask->list);
+               up_write(&tbl->rehash_lock);
+               if (deferred)
+                       call_rcu(&mask->rcu, rcu_free_sw_flow_mask_cb);
+               else
+                       kfree(mask);
+       }
+}
+
+static void rcu_free_flow_callback(struct rcu_head *rcu)
+{
+       struct sw_flow *flow = container_of(rcu, struct sw_flow, rcu);
+
+       __flow_free(flow);
+}
+
+void ovs_flow_free(struct flow_table *tbl, struct sw_flow *flow, bool deferred)
+{
+       if (!flow)
+               return;
+
+       flow_mask_del_ref(tbl, flow->mask, deferred);
+
+       if (deferred)
+               call_rcu(&flow->rcu, rcu_free_flow_callback);
+       else
+               __flow_free(flow);
+}
+
+static void __ovs_flow_tbl_destroy(struct flow_table *flow_table,
+                                  struct hash_table *table, bool deferred)
+{
+       int i;
+
        if (!table)
                return;
 
+       if (table->keep_flows)
+               goto skip_flows;
+
+       for (i = 0; i < table->n_buckets; i++) {
+               struct sw_flow *flow;
+               struct hlist_head *head = flex_array_get(table->buckets, i);
+               struct hlist_node *n;
+               int ver = table->node_ver;
+
+               hlist_for_each_entry_safe(flow, n, head, hash_node[ver])
+                       flow_mask_del_ref(flow_table, flow->mask, deferred);
+
+       }
+
+skip_flows:
+
        if (deferred)
                call_rcu(&table->rcu, flow_tbl_destroy_rcu_cb);
        else
@@ -272,7 +294,7 @@ void ovs_flow_tbl_destroy(struct flow_table *table)
 {
        struct hash_table *htable = ovsl_dereference(table->htable);
 
-       __ovs_flow_tbl_destroy(htable, false);
+       __ovs_flow_tbl_destroy(table, htable, false);
 }
 
 struct sw_flow *ovs_flow_tbl_dump_next(struct hash_table *table,
@@ -311,12 +333,55 @@ static struct hlist_head *find_bucket(struct hash_table *table, u32 hash)
 
 static void __tbl_insert(struct hash_table *table, struct sw_flow *flow)
 {
-       struct hlist_head *head;
+       struct hlist_head *head = find_bucket(table, flow->hash);
 
-       head = find_bucket(table, flow->hash);
        hlist_add_head_rcu(&flow->hash_node[table->node_ver], head);
+       atomic_inc(&table->count);
+}
+
+static bool __cmp_key(const struct sw_flow_key *key1,
+                     const struct sw_flow_key *key2,
+                     int key_start, int key_end)
+{
+       const long *cp1 = (long *)((u8 *)key1 + key_start);
+       const long *cp2 = (long *)((u8 *)key2 + key_start);
+       long diffs = 0;
+       int i;
 
-       table->count++;
+       for (i = key_start; i < key_end;  i += sizeof(long))
+               diffs |= *cp1++ ^ *cp2++;
+
+       return diffs == 0;
+}
+
+static bool __flow_cmp_masked_key(const struct sw_flow *flow,
+                                 const struct sw_flow_key *key,
+                                 int key_start, int key_end)
+{
+       return __cmp_key(&flow->key, key, key_start, key_end);
+}
+
+static int tbl_insert(struct hash_table *table, struct sw_flow *new_flow,
+                     struct sw_flow_mask *new_mask)
+{
+       struct hlist_head *head = find_bucket(table, new_flow->hash);
+       int key_start = new_mask->range.start;
+       int key_end = new_mask->range.end;
+       struct sw_flow *flow;
+
+       spin_lock(&flow_table_locks[new_flow->hash % MAX_FLOW_TABLE_LOCKS]);
+       hlist_for_each_entry_rcu(flow, head, hash_node[table->node_ver]) {
+               if (flow->mask == new_mask &&
+                   __flow_cmp_masked_key(flow, &new_flow->key,
+                                         key_start, key_end))
+                       return -EEXIST;
+       }
+
+       hlist_add_head_rcu(&new_flow->hash_node[table->node_ver], head);
+       spin_unlock(&flow_table_locks[new_flow->hash % MAX_FLOW_TABLE_LOCKS]);
+
+       atomic_inc(&table->count);
+       return 0;
 }
 
 static void flow_table_copy_flows(struct hash_table *old,
@@ -360,16 +425,26 @@ int ovs_flows_tbl_flush(struct flow_table *flow_table)
 {
        struct hash_table *old_table;
        struct hash_table *new_htable;
+       int err;
+
+       down_write(&flow_table->rehash_lock);
 
        old_table = ovsl_dereference(flow_table->htable);
        new_htable = __flow_tbl_alloc(TBL_MIN_BUCKETS);
-       if (!new_htable)
-               return -ENOMEM;
+       if (!new_htable) {
+               up_write(&flow_table->rehash_lock);
+               err = -ENOMEM;
+               goto out;
+       }
 
        rcu_assign_pointer(flow_table->htable, new_htable);
 
-       __ovs_flow_tbl_destroy(old_table, true);
-       return 0;
+       __ovs_flow_tbl_destroy(flow_table, old_table, true);
+       err = 0;
+
+out:
+       up_write(&flow_table->rehash_lock);
+       return err;
 }
 
 static u32 flow_hash(const struct sw_flow_key *key, int key_start,
@@ -393,28 +468,6 @@ static int flow_key_start(const struct sw_flow_key *key)
                                          sizeof(long));
 }
 
-static bool __cmp_key(const struct sw_flow_key *key1,
-                     const struct sw_flow_key *key2,
-                     int key_start, int key_end)
-{
-       const long *cp1 = (long *)((u8 *)key1 + key_start);
-       const long *cp2 = (long *)((u8 *)key2 + key_start);
-       long diffs = 0;
-       int i;
-
-       for (i = key_start; i < key_end;  i += sizeof(long))
-               diffs |= *cp1++ ^ *cp2++;
-
-       return diffs == 0;
-}
-
-static bool __flow_cmp_masked_key(const struct sw_flow *flow,
-                                 const struct sw_flow_key *key,
-                                 int key_start, int key_end)
-{
-       return __cmp_key(&flow->key, key, key_start, key_end);
-}
-
 static bool __flow_cmp_unmasked_key(const struct sw_flow *flow,
                                    const struct sw_flow_key *key,
                                    int key_start, int key_end)
@@ -493,9 +546,10 @@ void ovs_flow_tbl_remove(struct flow_table *table, struct sw_flow *flow)
 {
        struct hash_table *htbl = ovsl_dereference(table->htable);
 
-       BUG_ON(htbl->count == 0);
+       spin_lock(&flow_table_locks[flow->hash % MAX_FLOW_TABLE_LOCKS]);
        hlist_del_rcu(&flow->hash_node[htbl->node_ver]);
-       htbl->count--;
+       spin_unlock(&flow_table_locks[flow->hash % MAX_FLOW_TABLE_LOCKS]);
+       atomic_dec(&htbl->count);
 }
 
 static struct sw_flow_mask *mask_alloc(void)
@@ -504,16 +558,11 @@ static struct sw_flow_mask *mask_alloc(void)
 
        mask = kmalloc(sizeof(*mask), GFP_KERNEL);
        if (mask)
-               mask->ref_count = 0;
+               atomic_set(&mask->ref_count, 0);
 
        return mask;
 }
 
-static void mask_add_ref(struct sw_flow_mask *mask)
-{
-       mask->ref_count++;
-}
-
 static bool mask_equal(const struct sw_flow_mask *a,
                       const struct sw_flow_mask *b)
 {
@@ -552,15 +601,21 @@ static int flow_mask_insert(struct flow_table *tbl, struct sw_flow *flow,
        mask = flow_mask_find(tbl, new);
        if (!mask) {
                /* Allocate a new mask if none exsits. */
-               mask = mask_alloc();
-               if (!mask)
-                       return -ENOMEM;
-               mask->key = new->key;
-               mask->range = new->range;
-               list_add_rcu(&mask->list, &tbl->mask_list);
+               down_write(&tbl->rehash_lock);
+               mask = flow_mask_find(tbl, new);
+               if (!mask) {
+                       mask = mask_alloc();
+                       if (!mask)
+                               return -ENOMEM;
+                       mask->key = new->key;
+                       mask->range = new->range;
+
+                       list_add_rcu(&mask->list, &tbl->mask_list);
+               }
+               up_write(&tbl->rehash_lock);
        }
 
-       mask_add_ref(mask);
+       atomic_inc(&mask->ref_count);
        flow->mask = mask;
        return 0;
 }
@@ -572,26 +627,44 @@ int ovs_flow_tbl_insert(struct flow_table *table, struct sw_flow *flow,
        struct hash_table *htable;
        int err;
 
-       err = flow_mask_insert(table, flow, new);
-       if (err)
-               return err;
+       flow->hash = flow_hash(&flow->key, new->range.start,
+                              new->range.end);
 
-       flow->hash = flow_hash(&flow->key, flow->mask->range.start,
-                       flow->mask->range.end);
+       down_read(&table->rehash_lock);
        htable = ovsl_dereference(table->htable);
-       __tbl_insert(htable, flow);
+       err = tbl_insert(htable, flow, new);
+       up_read(&table->rehash_lock);
+       if (unlikely(err))
+               return err;
+
+       err = flow_mask_insert(table, flow, new);
+       if (unlikely(err)) {
+               ovs_flow_tbl_remove(table, flow);
+               return err;
+       }
 
        /* Expand table, if necessary, to make room. */
-       if (htable->count > htable->n_buckets)
-               new_htable = flow_tbl_expand(htable);
-       else if (time_after(jiffies, table->last_rehash + REHASH_INTERVAL))
-               new_htable = flow_tbl_rehash(htable);
+       if (atomic_read(&htable->count) > htable->n_buckets) {
+               down_write(&table->rehash_lock);
+               if (atomic_read(&htable->count) > htable->n_buckets)
+                       new_htable = flow_tbl_expand(htable);
+
+       } else if (time_after(jiffies, table->last_rehash + REHASH_INTERVAL)) {
+               down_write(&table->rehash_lock);
+               if (time_after(jiffies, table->last_rehash + REHASH_INTERVAL))
+                       new_htable = flow_tbl_rehash(htable);
+       } else {
+               return 0;
+       }
 
        if (new_htable) {
                rcu_assign_pointer(table->htable, new_htable);
-               __ovs_flow_tbl_destroy(htable, true);
+               __ovs_flow_tbl_destroy(table, htable, true);
                table->last_rehash = jiffies;
        }
+
+       up_write(&table->rehash_lock);
+
        return 0;
 }
 
@@ -599,6 +672,8 @@ int ovs_flow_tbl_insert(struct flow_table *table, struct sw_flow *flow,
  * Returns zero if successful or a negative error code. */
 int ovs_flow_init(void)
 {
+       int i;
+
        BUILD_BUG_ON(__alignof__(struct sw_flow_key) % __alignof__(long));
        BUILD_BUG_ON(sizeof(struct sw_flow_key) % sizeof(long));
 
@@ -607,6 +682,8 @@ int ovs_flow_init(void)
        if (flow_cache == NULL)
                return -ENOMEM;
 
+       for (i = 0; i < MAX_FLOW_TABLE_LOCKS; i++)
+               spin_lock_init(&flow_table_locks[i]);
        return 0;
 }
 
diff --git a/datapath/flow_table.h b/datapath/flow_table.h
index ff1489f..0c8a9e1 100644
--- a/datapath/flow_table.h
+++ b/datapath/flow_table.h
@@ -38,7 +38,8 @@
 
 struct hash_table {
        struct flex_array *buckets;
-       unsigned int count, n_buckets;
+       unsigned int n_buckets;
+       atomic_t count;
        struct rcu_head rcu;
        int node_ver;
        u32 hash_seed;
@@ -46,6 +47,7 @@ struct hash_table {
 };
 
 struct flow_table {
+       struct rw_semaphore rehash_lock;
        struct hash_table __rcu *htable;
        struct list_head mask_list;
        unsigned long last_rehash;
@@ -58,7 +60,7 @@ int ovs_flow_init(void);
 void ovs_flow_exit(void);
 
 struct sw_flow *ovs_flow_alloc(void);
-void ovs_flow_free(struct sw_flow *, bool deferred);
+void ovs_flow_free(struct flow_table *tbl, struct sw_flow *flow, bool deferred);
 
 int ovs_flow_tbl_init(struct flow_table *);
 int ovs_flow_tbl_count(struct flow_table *table);
-- 
1.7.1
