I think that using atomics and using per-thread or per-NUMA-node buckets
are orthogonal: one can use either technique without the other.
I am reluctant to use atomics here because I think we need 64-bit stats
(packet counters can definitely roll over past 2**32) and 64-bit atomics
are internally implemented via a mutex on 32-bit systems.

I agree that per-NUMA node buckets, as opposed to the per-thread buckets
I have here, would also work well, but it is more difficult to determine
the current NUMA node from userspace than it is in the kernel.
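(As a standalone illustration of that first point -- this is not part of
the patch, and it assumes a C11 compiler with <stdatomic.h> -- one can
ask the implementation directly whether 64-bit atomics are lock-free:

    #include <stdatomic.h>
    #include <stdio.h>

    int
    main(void)
    {
        atomic_ullong counter = ATOMIC_VAR_INIT(0);

        /* 2 means always lock-free, 1 means it depends on the object,
         * 0 means the implementation falls back to locks, as it does
         * for 64-bit types on some 32-bit targets. */
        printf("ATOMIC_LLONG_LOCK_FREE = %d\n", ATOMIC_LLONG_LOCK_FREE);
        printf("this counter lock-free? %d\n",
               (int) atomic_is_lock_free(&counter));

        /* The per-packet operation whose cost is at issue. */
        atomic_fetch_add(&counter, 1);
        return 0;
    }

Where the answer is 0, each atomic_fetch_add() takes a hidden lock, which
is no better than the per-bucket mutex in the patch quoted below.)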
On Fri, Mar 14, 2014 at 02:25:04AM -0700, Andy Zhou wrote:
> How about using atomic operations to maintain stats instead of per thread
> buckets? It seems to be a win at least within a NUMA node from kernel
> datapath experience.
>
>
> On Tue, Mar 11, 2014 at 1:56 PM, Ben Pfaff <b...@nicira.com> wrote:
>
> > This allows clients to do more than just increment a counter. The
> > following commit will make the first use of that feature.
> >
> > Signed-off-by: Ben Pfaff <b...@nicira.com>
> > ---
> >  lib/dpif-netdev.c | 75 +++++++++++++++++++++++++++++++++----------
> >  lib/ovs-thread.c  | 91 ++++++++++++++++++-----------------------------------
> >  lib/ovs-thread.h  | 23 +++++++++++---
> >  3 files changed, 108 insertions(+), 81 deletions(-)
> >
> > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> > index 54b8f50..268b04d 100644
> > --- a/lib/dpif-netdev.c
> > +++ b/lib/dpif-netdev.c
> > @@ -147,10 +147,8 @@ struct dp_netdev {
> >
> >      /* Statistics.
> >       *
> > -     * ovsthread_counter is internally synchronized. */
> > -    struct ovsthread_counter *n_hit;    /* Number of flow table matches. */
> > -    struct ovsthread_counter *n_missed; /* Number of flow table misses. */
> > -    struct ovsthread_counter *n_lost;   /* Number of misses not passed up. */
> > +     * ovsthread_stats is internally synchronized. */
> > +    struct ovsthread_stats stats; /* Contains 'struct dp_netdev_stats *'. */
> >
> >      /* Ports.
> >       *
> > @@ -170,6 +168,22 @@ static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
> >                                                      odp_port_t)
> >      OVS_REQ_RDLOCK(dp->port_rwlock);
> >
> > +enum dp_stat_type {
> > +    DP_STAT_HIT,                /* Packets that matched in the flow table. */
> > +    DP_STAT_MISS,               /* Packets that did not match. */
> > +    DP_STAT_LOST,               /* Packets not passed up to the client. */
> > +    DP_N_STATS
> > +};
> > +
> > +/* Contained by struct dp_netdev's 'stats' member. */
> > +struct dp_netdev_stats {
> > +    struct ovs_mutex mutex;     /* Protects 'n'. */
> > +
> > +    /* Indexed by DP_STAT_*, protected by 'mutex'. */
> > +    unsigned long long int n[DP_N_STATS] OVS_GUARDED;
> > +};
> > +
> > +
> >  /* A port in a netdev-based datapath. */
> >  struct dp_netdev_port {
> >      struct hmap_node node;      /* Node in dp_netdev's 'ports'. */
> > @@ -461,9 +475,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
> >      ovs_mutex_unlock(&dp->queue_mutex);
> >      dp->queue_seq = seq_create();
> >
> > -    dp->n_hit = ovsthread_counter_create();
> > -    dp->n_missed = ovsthread_counter_create();
> > -    dp->n_lost = ovsthread_counter_create();
> > +    ovsthread_stats_init(&dp->stats);
> >
> >      ovs_rwlock_init(&dp->port_rwlock);
> >      hmap_init(&dp->ports);
> > @@ -532,6 +544,8 @@ dp_netdev_free(struct dp_netdev *dp)
> >      OVS_REQUIRES(dp_netdev_mutex)
> >  {
> >      struct dp_netdev_port *port, *next;
> > +    struct dp_netdev_stats *bucket;
> > +    int i;
> >
> >      shash_find_and_delete(&dp_netdevs, dp->name);
> >
> > @@ -544,9 +558,12 @@ dp_netdev_free(struct dp_netdev *dp)
> >          do_del_port(dp, port->port_no);
> >      }
> >      ovs_rwlock_unlock(&dp->port_rwlock);
> > -    ovsthread_counter_destroy(dp->n_hit);
> > -    ovsthread_counter_destroy(dp->n_missed);
> > -    ovsthread_counter_destroy(dp->n_lost);
> > +
> > +    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
> > +        ovs_mutex_destroy(&bucket->mutex);
> > +        free_cacheline(bucket);
> > +    }
> > +    ovsthread_stats_destroy(&dp->stats);
> >
> >      dp_netdev_purge_queues(dp);
> >      seq_destroy(dp->queue_seq);
> > @@ -604,14 +621,21 @@ static int
> >  dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
> >  {
> >      struct dp_netdev *dp = get_dp_netdev(dpif);
> > +    struct dp_netdev_stats *bucket;
> > +    size_t i;
> >
> >      fat_rwlock_rdlock(&dp->cls.rwlock);
> >      stats->n_flows = hmap_count(&dp->flow_table);
> >      fat_rwlock_unlock(&dp->cls.rwlock);
> >
> > -    stats->n_hit = ovsthread_counter_read(dp->n_hit);
> > -    stats->n_missed = ovsthread_counter_read(dp->n_missed);
> > -    stats->n_lost = ovsthread_counter_read(dp->n_lost);
> > +    stats->n_hit = stats->n_missed = stats->n_lost = 0;
> > +    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
> > +        ovs_mutex_lock(&bucket->mutex);
> > +        stats->n_hit += bucket->n[DP_STAT_HIT];
> > +        stats->n_missed += bucket->n[DP_STAT_MISS];
> > +        stats->n_lost += bucket->n[DP_STAT_LOST];
> > +        ovs_mutex_unlock(&bucket->mutex);
> > +    }
> >      stats->n_masks = UINT32_MAX;
> >      stats->n_mask_hit = UINT64_MAX;
> >
> > @@ -1711,6 +1735,25 @@ dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
> >      netdev_flow->tcp_flags |= packet_get_tcp_flags(packet, &netdev_flow->flow);
> >  }
> >
> > +static void *
> > +dp_netdev_stats_new_cb(void)
> > +{
> > +    struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
> > +    ovs_mutex_init(&bucket->mutex);
> > +    return bucket;
> > +}
> > +
> > +static void
> > +dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type)
> > +{
> > +    struct dp_netdev_stats *bucket;
> > +
> > +    bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
> > +    ovs_mutex_lock(&bucket->mutex);
> > +    bucket->n[type]++;
> > +    ovs_mutex_unlock(&bucket->mutex);
> > +}
> > +
> >  static void
> >  dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
> >                       struct pkt_metadata *md)
> > @@ -1736,9 +1779,9 @@ dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
> >                              actions->actions, actions->size);
> >          dp_netdev_actions_unref(actions);
> >          dp_netdev_flow_unref(netdev_flow);
> > -        ovsthread_counter_inc(dp->n_hit, 1);
> > +        dp_netdev_count_packet(dp, DP_STAT_HIT);
> >      } else {
> > -        ovsthread_counter_inc(dp->n_missed, 1);
> > +        dp_netdev_count_packet(dp, DP_STAT_MISS);
> >          dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, NULL);
> >      }
> >  }
> > @@ -1788,7 +1831,7 @@ dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
> >
> >          error = 0;
> >      } else {
> > -        ovsthread_counter_inc(dp->n_lost, 1);
> > +        dp_netdev_count_packet(dp, DP_STAT_LOST);
> >          error = ENOBUFS;
> >      }
> >      ovs_mutex_unlock(&dp->queue_mutex);
> > diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c
> > index 33b9e08..d313c5a 100644
> > --- a/lib/ovs-thread.c
> > +++ b/lib/ovs-thread.c
> > @@ -377,83 +377,54 @@ may_fork(void)
> >      return !must_not_fork;
> >  }
> >
> > -/* ovsthread_counter.
> > - *
> > - * We implement the counter as an array of N_COUNTERS individual counters, each
> > - * with its own lock. Each thread uses one of the counters chosen based on a
> > - * hash of the thread's ID, the idea being that, statistically, different
> > - * threads will tend to use different counters and therefore avoid
> > - * interfering with each other.
> > - *
> > - * Undoubtedly, better implementations are possible. */
> >
> > -/* Basic counter structure. */
> > -struct ovsthread_counter__ {
> > -    struct ovs_mutex mutex;
> > -    unsigned long long int value;
> > -};
> >
> > -/* Pad the basic counter structure to 64 bytes to avoid cache line
> > - * interference. */
> > -struct ovsthread_counter {
> > -    struct ovsthread_counter__ c;
> > -    char pad[ROUND_UP(sizeof(struct ovsthread_counter__), 64)
> > -             - sizeof(struct ovsthread_counter__)];
> > -};
> >
> > -#define N_COUNTERS 16
> > +/* ovsthread_stats. */
> >
> > -struct ovsthread_counter *
> > -ovsthread_counter_create(void)
> > +void
> > +ovsthread_stats_init(struct ovsthread_stats *stats)
> >  {
> > -    struct ovsthread_counter *c;
> >      int i;
> >
> > -    c = xmalloc(N_COUNTERS * sizeof *c);
> > -    for (i = 0; i < N_COUNTERS; i++) {
> > -        ovs_mutex_init(&c[i].c.mutex);
> > -        c[i].c.value = 0;
> > +    ovs_mutex_init(&stats->mutex);
> > +    for (i = 0; i < ARRAY_SIZE(stats->buckets); i++) {
> > +        stats->buckets[i] = NULL;
> >      }
> > -    return c;
> >  }
> >
> >  void
> > -ovsthread_counter_destroy(struct ovsthread_counter *c)
> > +ovsthread_stats_destroy(struct ovsthread_stats *stats)
> >  {
> > -    if (c) {
> > -        int i;
> > -
> > -        for (i = 0; i < N_COUNTERS; i++) {
> > -            ovs_mutex_destroy(&c[i].c.mutex);
> > -        }
> > -        free(c);
> > -    }
> > +    ovs_mutex_destroy(&stats->mutex);
> >  }
> >
> > -void
> > -ovsthread_counter_inc(struct ovsthread_counter *c, unsigned long long int n)
> > +void *
> > +ovsthread_stats_bucket_get(struct ovsthread_stats *stats,
> > +                           void *(*new_bucket)(void))
> >  {
> > -    c = &c[hash_int(ovsthread_id_self(), 0) % N_COUNTERS];
> > -
> > -    ovs_mutex_lock(&c->c.mutex);
> > -    c->c.value += n;
> > -    ovs_mutex_unlock(&c->c.mutex);
> > +    unsigned int hash = hash_int(ovsthread_id_self(), 0);
> > +    unsigned int idx = hash & (ARRAY_SIZE(stats->buckets) - 1);
> > +    void *bucket = stats->buckets[idx];
> > +    if (!bucket) {
> > +        ovs_mutex_lock(&stats->mutex);
> > +        bucket = stats->buckets[idx];
> > +        if (!bucket) {
> > +            bucket = stats->buckets[idx] = new_bucket();
> > +        }
> > +        ovs_mutex_unlock(&stats->mutex);
> > +    }
> > +    return bucket;
> >  }
> >
> > -unsigned long long int
> > -ovsthread_counter_read(const struct ovsthread_counter *c)
> > +size_t
> > +ovs_thread_stats_next_bucket(const struct ovsthread_stats *stats, size_t i)
> >  {
> > -    unsigned long long int sum;
> > -    int i;
> > -
> > -    sum = 0;
> > -    for (i = 0; i < N_COUNTERS; i++) {
> > -        ovs_mutex_lock(&c[i].c.mutex);
> > -        sum += c[i].c.value;
> > -        ovs_mutex_unlock(&c[i].c.mutex);
> > +    for (; i < ARRAY_SIZE(stats->buckets); i++) {
> > +        if (stats->buckets[i]) {
> > +            break;
> > +        }
> >      }
> > -    return sum;
> > +    return i;
> >  }
> > +
> >
> >  /* Parses /proc/cpuinfo for the total number of physical cores on this system
> >   * across all CPU packages, not counting hyper-threads.
> > diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h
> > index f489308..cf1de64 100644
> > --- a/lib/ovs-thread.h
> > +++ b/lib/ovs-thread.h
> > @@ -592,11 +592,24 @@ ovsthread_id_self(void)
> >   *
> >   * Fully thread-safe. */
> >
> > -struct ovsthread_counter *ovsthread_counter_create(void);
> > -void ovsthread_counter_destroy(struct ovsthread_counter *);
> > -void ovsthread_counter_inc(struct ovsthread_counter *, unsigned long long int);
> > -unsigned long long int ovsthread_counter_read(
> > -    const struct ovsthread_counter *);
> > +struct ovsthread_stats {
> > +    struct ovs_mutex mutex;
> > +    void *volatile buckets[16];
> > +};
> > +
> > +void ovsthread_stats_init(struct ovsthread_stats *);
> > +void ovsthread_stats_destroy(struct ovsthread_stats *);
> > +
> > +void *ovsthread_stats_bucket_get(struct ovsthread_stats *,
> > +                                 void *(*new_bucket)(void));
> > +
> > +#define OVSTHREAD_STATS_FOR_EACH_BUCKET(BUCKET, IDX, STATS)         \
> > +    for ((IDX) = ovs_thread_stats_next_bucket(STATS, 0);            \
> > +         ((IDX) < ARRAY_SIZE((STATS)->buckets)                      \
> > +          ? ((BUCKET) = (STATS)->buckets[IDX], true)                \
> > +          : false);                                                 \
> > +         (IDX) = ovs_thread_stats_next_bucket(STATS, (IDX) + 1))
> > +size_t ovs_thread_stats_next_bucket(const struct ovsthread_stats *, size_t);
> >
> >  bool single_threaded(void);
> >
> > --
> > 1.7.10.4
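To make the new interface concrete, the client-side pattern it expects,
condensed from the dpif-netdev changes above, looks roughly like this
(the "my_" names are placeholders, not code from the patch):

    #include "ovs-thread.h"     /* ovsthread_stats, ovs_mutex. */
    #include "util.h"           /* xzalloc_cacheline(). */

    struct my_bucket {
        struct ovs_mutex mutex;             /* Protects 'count'. */
        unsigned long long int count OVS_GUARDED;
    };

    /* Call ovsthread_stats_init(&my_stats) once before any counting. */
    static struct ovsthread_stats my_stats;

    static void *
    my_bucket_new_cb(void)
    {
        struct my_bucket *b = xzalloc_cacheline(sizeof *b);
        ovs_mutex_init(&b->mutex);
        return b;
    }

    /* Write path: each thread hashes to its own bucket, so this mutex
     * is almost always uncontended. */
    static void
    my_count(void)
    {
        struct my_bucket *b
            = ovsthread_stats_bucket_get(&my_stats, my_bucket_new_cb);

        ovs_mutex_lock(&b->mutex);
        b->count++;
        ovs_mutex_unlock(&b->mutex);
    }

    /* Read path: sum over only the buckets that threads created. */
    static unsigned long long int
    my_read(void)
    {
        unsigned long long int total = 0;
        struct my_bucket *b;
        size_t i;

        OVSTHREAD_STATS_FOR_EACH_BUCKET (b, i, &my_stats) {
            ovs_mutex_lock(&b->mutex);
            total += b->count;
            ovs_mutex_unlock(&b->mutex);
        }
        return total;
    }

This is also where the orthogonality above shows up: a bucket could hold
atomics instead of a mutex-guarded array without changing the
bucket-selection scheme at all.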
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev