How about using atomic operations to maintain the stats instead of per-thread
buckets? From kernel datapath experience, atomics seem to be a win, at least
within a single NUMA node.
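
Roughly what I have in mind, as a minimal untested sketch.  It uses plain
C11 <stdatomic.h> rather than the ovs-atomic wrappers, and the struct and
function names below are made up for illustration only:

#include <stdatomic.h>

enum dp_stat_type { DP_STAT_HIT, DP_STAT_MISS, DP_STAT_LOST, DP_N_STATS };

/* One shared counter per statistic, no per-thread buckets. */
struct dp_netdev_stats_atomic {
    atomic_ullong n[DP_N_STATS];
};

static void
dp_netdev_count_packet_atomic(struct dp_netdev_stats_atomic *stats,
                              enum dp_stat_type type)
{
    /* Relaxed ordering is enough for a pure statistic. */
    atomic_fetch_add_explicit(&stats->n[type], 1, memory_order_relaxed);
}

static unsigned long long int
dp_netdev_read_stat_atomic(struct dp_netdev_stats_atomic *stats,
                           enum dp_stat_type type)
{
    return atomic_load_explicit(&stats->n[type], memory_order_relaxed);
}

The trade-off is that every thread hits the same cache line per counter,
which is exactly what the per-thread buckets avoid; the point is just that,
within one NUMA node, a relaxed fetch-add has still come out ahead for us in
the kernel datapath.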


On Tue, Mar 11, 2014 at 1:56 PM, Ben Pfaff <b...@nicira.com> wrote:

> This allows clients to do more than just increment a counter.  The
> following commit will make the first use of that feature.
>
> Signed-off-by: Ben Pfaff <b...@nicira.com>
> ---
>  lib/dpif-netdev.c |   75 +++++++++++++++++++++++++++++++++----------
>  lib/ovs-thread.c  |   91 ++++++++++++++++++-----------------------------------
>  lib/ovs-thread.h  |   23 +++++++++++---
>  3 files changed, 108 insertions(+), 81 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 54b8f50..268b04d 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -147,10 +147,8 @@ struct dp_netdev {
>
>      /* Statistics.
>       *
> -     * ovsthread_counter is internally synchronized. */
> -    struct ovsthread_counter *n_hit;    /* Number of flow table matches. */
> -    struct ovsthread_counter *n_missed; /* Number of flow table misses. */
> -    struct ovsthread_counter *n_lost;   /* Number of misses not passed up. */
> +     * ovsthread_stats is internally synchronized. */
> +    struct ovsthread_stats stats; /* Contains 'struct dp_netdev_stats *'. */
>
>      /* Ports.
>       *
> @@ -170,6 +168,22 @@ static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
>                                                      odp_port_t)
>      OVS_REQ_RDLOCK(dp->port_rwlock);
>
> +enum dp_stat_type {
> +    DP_STAT_HIT,                /* Packets that matched in the flow table. */
> +    DP_STAT_MISS,               /* Packets that did not match. */
> +    DP_STAT_LOST,               /* Packets not passed up to the client. */
> +    DP_N_STATS
> +};
> +
> +/* Contained by struct dp_netdev's 'stats' member.  */
> +struct dp_netdev_stats {
> +    struct ovs_mutex mutex;          /* Protects 'n'. */
> +
> +    /* Indexed by DP_STAT_*, protected by 'mutex'. */
> +    unsigned long long int n[DP_N_STATS] OVS_GUARDED;
> +};
> +
> +
>  /* A port in a netdev-based datapath. */
>  struct dp_netdev_port {
>      struct hmap_node node;      /* Node in dp_netdev's 'ports'. */
> @@ -461,9 +475,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
>      ovs_mutex_unlock(&dp->queue_mutex);
>      dp->queue_seq = seq_create();
>
> -    dp->n_hit = ovsthread_counter_create();
> -    dp->n_missed = ovsthread_counter_create();
> -    dp->n_lost = ovsthread_counter_create();
> +    ovsthread_stats_init(&dp->stats);
>
>      ovs_rwlock_init(&dp->port_rwlock);
>      hmap_init(&dp->ports);
> @@ -532,6 +544,8 @@ dp_netdev_free(struct dp_netdev *dp)
>      OVS_REQUIRES(dp_netdev_mutex)
>  {
>      struct dp_netdev_port *port, *next;
> +    struct dp_netdev_stats *bucket;
> +    int i;
>
>      shash_find_and_delete(&dp_netdevs, dp->name);
>
> @@ -544,9 +558,12 @@ dp_netdev_free(struct dp_netdev *dp)
>          do_del_port(dp, port->port_no);
>      }
>      ovs_rwlock_unlock(&dp->port_rwlock);
> -    ovsthread_counter_destroy(dp->n_hit);
> -    ovsthread_counter_destroy(dp->n_missed);
> -    ovsthread_counter_destroy(dp->n_lost);
> +
> +    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
> +        ovs_mutex_destroy(&bucket->mutex);
> +        free_cacheline(bucket);
> +    }
> +    ovsthread_stats_destroy(&dp->stats);
>
>      dp_netdev_purge_queues(dp);
>      seq_destroy(dp->queue_seq);
> @@ -604,14 +621,21 @@ static int
>  dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
>  {
>      struct dp_netdev *dp = get_dp_netdev(dpif);
> +    struct dp_netdev_stats *bucket;
> +    size_t i;
>
>      fat_rwlock_rdlock(&dp->cls.rwlock);
>      stats->n_flows = hmap_count(&dp->flow_table);
>      fat_rwlock_unlock(&dp->cls.rwlock);
>
> -    stats->n_hit = ovsthread_counter_read(dp->n_hit);
> -    stats->n_missed = ovsthread_counter_read(dp->n_missed);
> -    stats->n_lost = ovsthread_counter_read(dp->n_lost);
> +    stats->n_hit = stats->n_missed = stats->n_lost = 0;
> +    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
> +        ovs_mutex_lock(&bucket->mutex);
> +        stats->n_hit += bucket->n[DP_STAT_HIT];
> +        stats->n_missed += bucket->n[DP_STAT_MISS];
> +        stats->n_lost += bucket->n[DP_STAT_LOST];
> +        ovs_mutex_unlock(&bucket->mutex);
> +    }
>      stats->n_masks = UINT32_MAX;
>      stats->n_mask_hit = UINT64_MAX;
>
> @@ -1711,6 +1735,25 @@ dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
>      netdev_flow->tcp_flags |= packet_get_tcp_flags(packet, &netdev_flow->flow);
>  }
>
> +static void *
> +dp_netdev_stats_new_cb(void)
> +{
> +    struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
> +    ovs_mutex_init(&bucket->mutex);
> +    return bucket;
> +}
> +
> +static void
> +dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type)
> +{
> +    struct dp_netdev_stats *bucket;
> +
> +    bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
> +    ovs_mutex_lock(&bucket->mutex);
> +    bucket->n[type]++;
> +    ovs_mutex_unlock(&bucket->mutex);
> +}
> +
>  static void
>  dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
>                       struct pkt_metadata *md)
> @@ -1736,9 +1779,9 @@ dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
>                                    actions->actions, actions->size);
>          dp_netdev_actions_unref(actions);
>          dp_netdev_flow_unref(netdev_flow);
> -        ovsthread_counter_inc(dp->n_hit, 1);
> +        dp_netdev_count_packet(dp, DP_STAT_HIT);
>      } else {
> -        ovsthread_counter_inc(dp->n_missed, 1);
> +        dp_netdev_count_packet(dp, DP_STAT_MISS);
>          dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, NULL);
>      }
>  }
> @@ -1788,7 +1831,7 @@ dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
>
>          error = 0;
>      } else {
> -        ovsthread_counter_inc(dp->n_lost, 1);
> +        dp_netdev_count_packet(dp, DP_STAT_LOST);
>          error = ENOBUFS;
>      }
>      ovs_mutex_unlock(&dp->queue_mutex);
> diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c
> index 33b9e08..d313c5a 100644
> --- a/lib/ovs-thread.c
> +++ b/lib/ovs-thread.c
> @@ -377,83 +377,54 @@ may_fork(void)
>      return !must_not_fork;
>  }
>
> -/* ovsthread_counter.
> - *
> - * We implement the counter as an array of N_COUNTERS individual counters, each
> - * with its own lock.  Each thread uses one of the counters chosen based on a
> - * hash of the thread's ID, the idea being that, statistically, different
> - * threads will tend to use different counters and therefore avoid
> - * interfering with each other.
> - *
> - * Undoubtedly, better implementations are possible. */
> -
> -/* Basic counter structure. */
> -struct ovsthread_counter__ {
> -    struct ovs_mutex mutex;
> -    unsigned long long int value;
> -};
> -
> -/* Pad the basic counter structure to 64 bytes to avoid cache line
> - * interference. */
> -struct ovsthread_counter {
> -    struct ovsthread_counter__ c;
> -    char pad[ROUND_UP(sizeof(struct ovsthread_counter__), 64)
> -             - sizeof(struct ovsthread_counter__)];
> -};
> -
> -#define N_COUNTERS 16
> +/* ovsthread_stats. */
>
> -struct ovsthread_counter *
> -ovsthread_counter_create(void)
> +void
> +ovsthread_stats_init(struct ovsthread_stats *stats)
>  {
> -    struct ovsthread_counter *c;
>      int i;
>
> -    c = xmalloc(N_COUNTERS * sizeof *c);
> -    for (i = 0; i < N_COUNTERS; i++) {
> -        ovs_mutex_init(&c[i].c.mutex);
> -        c[i].c.value = 0;
> +    ovs_mutex_init(&stats->mutex);
> +    for (i = 0; i < ARRAY_SIZE(stats->buckets); i++) {
> +        stats->buckets[i] = NULL;
>      }
> -    return c;
>  }
>
>  void
> -ovsthread_counter_destroy(struct ovsthread_counter *c)
> +ovsthread_stats_destroy(struct ovsthread_stats *stats)
>  {
> -    if (c) {
> -        int i;
> -
> -        for (i = 0; i < N_COUNTERS; i++) {
> -            ovs_mutex_destroy(&c[i].c.mutex);
> -        }
> -        free(c);
> -    }
> +    ovs_mutex_destroy(&stats->mutex);
>  }
>
> -void
> -ovsthread_counter_inc(struct ovsthread_counter *c, unsigned long long int n)
> +void *
> +ovsthread_stats_bucket_get(struct ovsthread_stats *stats,
> +                           void *(*new_bucket)(void))
>  {
> -    c = &c[hash_int(ovsthread_id_self(), 0) % N_COUNTERS];
> -
> -    ovs_mutex_lock(&c->c.mutex);
> -    c->c.value += n;
> -    ovs_mutex_unlock(&c->c.mutex);
> +    unsigned int hash = hash_int(ovsthread_id_self(), 0);
> +    unsigned int idx = hash & (ARRAY_SIZE(stats->buckets) - 1);
> +    void *bucket = stats->buckets[idx];
> +    if (!bucket) {
> +        ovs_mutex_lock(&stats->mutex);
> +        bucket = stats->buckets[idx];
> +        if (!bucket) {
> +            bucket = stats->buckets[idx] = new_bucket();
> +        }
> +        ovs_mutex_unlock(&stats->mutex);
> +    }
> +    return bucket;
>  }
>
> -unsigned long long int
> -ovsthread_counter_read(const struct ovsthread_counter *c)
> +size_t
> +ovs_thread_stats_next_bucket(const struct ovsthread_stats *stats, size_t i)
>  {
> -    unsigned long long int sum;
> -    int i;
> -
> -    sum = 0;
> -    for (i = 0; i < N_COUNTERS; i++) {
> -        ovs_mutex_lock(&c[i].c.mutex);
> -        sum += c[i].c.value;
> -        ovs_mutex_unlock(&c[i].c.mutex);
> +    for (; i < ARRAY_SIZE(stats->buckets); i++) {
> +        if (stats->buckets[i]) {
> +            break;
> +        }
>      }
> -    return sum;
> +    return i;
>  }
> +
>
>  /* Parses /proc/cpuinfo for the total number of physical cores on this system
>   * across all CPU packages, not counting hyper-threads.
> diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h
> index f489308..cf1de64 100644
> --- a/lib/ovs-thread.h
> +++ b/lib/ovs-thread.h
> @@ -592,11 +592,24 @@ ovsthread_id_self(void)
>   *
>   * Fully thread-safe. */
>
> -struct ovsthread_counter *ovsthread_counter_create(void);
> -void ovsthread_counter_destroy(struct ovsthread_counter *);
> -void ovsthread_counter_inc(struct ovsthread_counter *, unsigned long long int);
> -unsigned long long int ovsthread_counter_read(
> -    const struct ovsthread_counter *);
> +struct ovsthread_stats {
> +    struct ovs_mutex mutex;
> +    void *volatile buckets[16];
> +};
> +
> +void ovsthread_stats_init(struct ovsthread_stats *);
> +void ovsthread_stats_destroy(struct ovsthread_stats *);
> +
> +void *ovsthread_stats_bucket_get(struct ovsthread_stats *,
> +                                 void *(*new_bucket)(void));
> +
> +#define OVSTHREAD_STATS_FOR_EACH_BUCKET(BUCKET, IDX, STATS)             \
> +    for ((IDX) = ovs_thread_stats_next_bucket(STATS, 0);                \
> +         ((IDX) < ARRAY_SIZE((STATS)->buckets)                          \
> +          ? ((BUCKET) = (STATS)->buckets[IDX], true)                    \
> +          : false);                                                     \
> +         (IDX) = ovs_thread_stats_next_bucket(STATS, (IDX) + 1))
> +size_t ovs_thread_stats_next_bucket(const struct ovsthread_stats *, size_t);
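
For anyone who only skims this header hunk: client code ends up looking
roughly like the dpif-netdev changes earlier in the patch, i.e. something
like the sketch below.  Names such as "my_bucket" are made up here and this
is untested; the real usage is dp_netdev_count_packet() above.

struct my_bucket {
    struct ovs_mutex mutex;                   /* Protects 'count'. */
    unsigned long long int count OVS_GUARDED;
};

static void *
my_bucket_new_cb(void)
{
    struct my_bucket *b = xzalloc_cacheline(sizeof *b);
    ovs_mutex_init(&b->mutex);
    return b;
}

static void
my_count_one(struct ovsthread_stats *stats)
{
    struct my_bucket *b = ovsthread_stats_bucket_get(stats, my_bucket_new_cb);

    ovs_mutex_lock(&b->mutex);
    b->count++;
    ovs_mutex_unlock(&b->mutex);
}

static unsigned long long int
my_read_total(struct ovsthread_stats *stats)
{
    unsigned long long int total = 0;
    struct my_bucket *b;
    size_t i;

    OVSTHREAD_STATS_FOR_EACH_BUCKET (b, i, stats) {
        ovs_mutex_lock(&b->mutex);
        total += b->count;
        ovs_mutex_unlock(&b->mutex);
    }
    return total;
}

Aggregation only visits buckets that were actually created, so threads that
never counted anything cost nothing.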
>
>  bool single_threaded(void);
>
> --
> 1.7.10.4
>
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev
