In future patches, upcall handler threads will need to update netflow.  To
prepare for that, guard the netflow module's internal state with a
module-global mutex and replace netflow_destroy() with reference counting
(netflow_ref()/netflow_unref()) so that a netflow object can safely be
shared between threads.

Signed-off-by: Ethan Jackson <et...@nicira.com>
---
 ofproto/netflow.c      | 100 +++++++++++++++++++++++++++++++++++--------------
 ofproto/netflow.h      |   4 +-
 ofproto/ofproto-dpif.c |   4 +-
 3 files changed, 77 insertions(+), 31 deletions(-)
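For reviewers, here is a rough sketch of how a future upcall handler thread
might use the new API.  This is illustrative only and not part of the patch;
handler_report_flow() is a hypothetical name:

    /* Hypothetical caller: take a reference so the netflow object cannot
     * be freed underneath us, then update it.  The module-global mutex is
     * taken inside netflow_flow_update() itself. */
    static void
    handler_report_flow(const struct netflow *shared_nf, struct flow *flow,
                        ofp_port_t output_iface,
                        const struct dpif_flow_stats *stats)
    {
        struct netflow *nf = netflow_ref(shared_nf);

        if (nf) {
            netflow_flow_update(nf, flow, output_iface, stats);
        }
        netflow_unref(nf);    /* Frees 'nf' if this was the last reference. */
    }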
diff --git a/ofproto/netflow.c b/ofproto/netflow.c
index 9055757..bf3501d 100644
--- a/ofproto/netflow.c
+++ b/ofproto/netflow.c
@@ -52,6 +52,8 @@ struct netflow {
     long long int reconfig_time; /* When we reconfigured the timeouts. */
 
     struct hmap flows;           /* Contains 'netflow_flows'. */
+
+    atomic_int ref_cnt;
 };
 
 struct netflow_flow {
@@ -76,10 +78,14 @@ struct netflow_flow {
     long long int used;          /* Last-used time (0 if never used). */
 };
 
+static struct ovs_mutex mutex = OVS_MUTEX_INITIALIZER;
+
 static struct netflow_flow *netflow_flow_lookup(struct netflow *,
-                                                struct flow *);
+                                                struct flow *)
+    OVS_REQUIRES(mutex);
 static uint32_t netflow_flow_hash(struct flow *);
-static void netflow_expire__(struct netflow *, struct netflow_flow *);
+static void netflow_expire__(struct netflow *, struct netflow_flow *)
+    OVS_REQUIRES(mutex);
 
 void
 netflow_mask_wc(struct flow *flow, struct flow_wildcards *wc)
@@ -98,6 +104,7 @@ netflow_mask_wc(struct flow *flow, struct flow_wildcards *wc)
 static void
 gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow,
                 uint32_t packet_count, uint32_t byte_count)
+    OVS_REQUIRES(mutex)
 {
     struct netflow_v5_header *nf_hdr;
     struct netflow_v5_record *nf_rec;
@@ -165,6 +172,7 @@ void
 netflow_flow_update(struct netflow *nf, struct flow *flow,
                     ofp_port_t output_iface,
                     const struct dpif_flow_stats *stats)
+    OVS_EXCLUDED(mutex)
 {
     struct netflow_flow *nf_flow;
     long long int used;
@@ -174,6 +182,7 @@ netflow_flow_update(struct netflow *nf, struct flow *flow,
         return;
     }
 
+    ovs_mutex_lock(&mutex);
     nf_flow = netflow_flow_lookup(nf, flow);
     if (!nf_flow) {
         nf_flow = xzalloc(sizeof *nf_flow);
@@ -208,10 +217,13 @@ netflow_flow_update(struct netflow *nf, struct flow *flow,
             nf_flow->last_expired = time_msec();
         }
     }
+
+    ovs_mutex_unlock(&mutex);
 }
 
 static void
 netflow_expire__(struct netflow *nf, struct netflow_flow *nf_flow)
+    OVS_REQUIRES(mutex)
 {
     uint64_t pkt_delta, byte_delta;
 
@@ -263,80 +275,90 @@ netflow_expire__(struct netflow *nf, struct netflow_flow *nf_flow)
 }
 
 void
-netflow_expire(struct netflow *nf, struct flow *flow)
+netflow_expire(struct netflow *nf, struct flow *flow) OVS_EXCLUDED(mutex)
 {
-    struct netflow_flow *nf_flow = netflow_flow_lookup(nf, flow);
+    struct netflow_flow *nf_flow;
 
+    ovs_mutex_lock(&mutex);
+    nf_flow = netflow_flow_lookup(nf, flow);
     if (nf_flow) {
         netflow_expire__(nf, nf_flow);
     }
+    ovs_mutex_unlock(&mutex);
 }
 
 void
-netflow_flow_clear(struct netflow *nf, struct flow *flow)
+netflow_flow_clear(struct netflow *nf, struct flow *flow) OVS_EXCLUDED(mutex)
 {
-    struct netflow_flow *nf_flow = netflow_flow_lookup(nf, flow);
+    struct netflow_flow *nf_flow;
 
+    ovs_mutex_lock(&mutex);
+    nf_flow = netflow_flow_lookup(nf, flow);
     if (nf_flow) {
         ovs_assert(!nf_flow->packet_count);
         ovs_assert(!nf_flow->byte_count);
         hmap_remove(&nf->flows, &nf_flow->hmap_node);
         free(nf_flow);
     }
+    ovs_mutex_unlock(&mutex);
 }
 
 /* Returns true if it's time to send out a round of NetFlow active timeouts,
  * false otherwise. */
 void
-netflow_run(struct netflow *nf)
+netflow_run(struct netflow *nf) OVS_EXCLUDED(mutex)
 {
     long long int now = time_msec();
-    struct netflow_flow *nf_flow, *next;
 
+    ovs_mutex_lock(&mutex);
     if (nf->packet.size) {
         collectors_send(nf->collectors, nf->packet.data, nf->packet.size);
         nf->packet.size = 0;
     }
 
-    if (!nf->active_timeout || now < nf->next_timeout) {
-        return;
-    }
-
-    nf->next_timeout = now + 1000;
-
-    HMAP_FOR_EACH_SAFE (nf_flow, next, hmap_node, &nf->flows) {
-        if (now > nf_flow->last_expired + nf->active_timeout) {
-            bool idle = nf_flow->used < nf_flow->last_expired;
-            netflow_expire__(nf, nf_flow);
-
-            if (idle) {
-                /* If the netflow_flow hasn't been used in a while, it's
-                 * possilbe the upper layer lost track of it. */
-                hmap_remove(&nf->flows, &nf_flow->hmap_node);
-                free(nf_flow);
+    if (nf->active_timeout && now >= nf->next_timeout) {
+        struct netflow_flow *nf_flow, *next;
+
+        nf->next_timeout = now + 1000;
+        HMAP_FOR_EACH_SAFE (nf_flow, next, hmap_node, &nf->flows) {
+            if (now > nf_flow->last_expired + nf->active_timeout) {
+                bool idle = nf_flow->used < nf_flow->last_expired;
+                netflow_expire__(nf, nf_flow);
+
+                if (idle) {
+                    /* If the netflow_flow hasn't been used in a while, it's
+                     * possible the upper layer lost track of it. */
+                    hmap_remove(&nf->flows, &nf_flow->hmap_node);
+                    free(nf_flow);
+                }
             }
         }
     }
+    ovs_mutex_unlock(&mutex);
 }
 
 void
-netflow_wait(struct netflow *nf)
+netflow_wait(struct netflow *nf) OVS_EXCLUDED(mutex)
 {
+    ovs_mutex_lock(&mutex);
     if (nf->active_timeout) {
         poll_timer_wait_until(nf->next_timeout);
     }
     if (nf->packet.size) {
         poll_immediate_wake();
     }
+    ovs_mutex_unlock(&mutex);
 }
 
 int
 netflow_set_options(struct netflow *nf,
                     const struct netflow_options *nf_options)
+    OVS_EXCLUDED(mutex)
 {
     int error = 0;
     long long int old_timeout;
 
+    ovs_mutex_lock(&mutex);
     nf->engine_type = nf_options->engine_type;
     nf->engine_id = nf_options->engine_id;
     nf->add_id_to_iface = nf_options->add_id_to_iface;
@@ -355,6 +377,7 @@ netflow_set_options(struct netflow *nf,
         nf->reconfig_time = time_msec();
         nf->next_timeout = time_msec();
     }
+    ovs_mutex_unlock(&mutex);
 
     return error;
 }
@@ -370,14 +393,35 @@ netflow_create(void)
     nf->add_id_to_iface = false;
     nf->netflow_cnt = 0;
     hmap_init(&nf->flows);
+    atomic_init(&nf->ref_cnt, 1);
     ofpbuf_init(&nf->packet, 1500);
     return nf;
 }
 
-void
-netflow_destroy(struct netflow *nf)
+struct netflow *
+netflow_ref(const struct netflow *nf_)
 {
+    struct netflow *nf = CONST_CAST(struct netflow *, nf_);
+
     if (nf) {
+        int orig;
+
+        atomic_add(&nf->ref_cnt, 1, &orig);
+        ovs_assert(orig > 0);
+    }
+    return nf;
+}
+
+void
+netflow_unref(struct netflow *nf)
+{
+    int orig;
+
+    if (!nf) {
+        return;
+    }
+
+    atomic_sub(&nf->ref_cnt, 1, &orig);
+    ovs_assert(orig > 0);
+    if (orig == 1) {
         ofpbuf_uninit(&nf->packet);
         collectors_destroy(nf->collectors);
         free(nf);
@@ -387,7 +431,7 @@ netflow_destroy(struct netflow *nf)
 /* Helpers. */
 
 static struct netflow_flow *
-netflow_flow_lookup(struct netflow *nf, struct flow *flow)
+netflow_flow_lookup(struct netflow *nf, struct flow *flow) OVS_REQUIRES(mutex)
 {
     struct netflow_flow *nf_flow;
 
diff --git a/ofproto/netflow.h b/ofproto/netflow.h
index 6493841..f37cfa7 100644
--- a/ofproto/netflow.h
+++ b/ofproto/netflow.h
@@ -41,7 +41,9 @@ struct netflow_options {
 #define NF_OUT_DROP  OFP_PORT_C(UINT16_MAX - 2)
 
 struct netflow *netflow_create(void);
-void netflow_destroy(struct netflow *);
+struct netflow *netflow_ref(const struct netflow *);
+void netflow_unref(struct netflow *);
+
 int netflow_set_options(struct netflow *,
                         const struct netflow_options *);
 void netflow_expire(struct netflow *, struct flow *);
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index a38bf87..70187c8 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -1406,7 +1406,7 @@ destruct(struct ofproto *ofproto_)
 
     mbridge_unref(ofproto->mbridge);
 
-    netflow_destroy(ofproto->netflow);
+    netflow_unref(ofproto->netflow);
     dpif_sflow_unref(ofproto->sflow);
     hmap_destroy(&ofproto->bundles);
     mac_learning_unref(ofproto->ml);
@@ -5028,7 +5028,7 @@ set_netflow(struct ofproto *ofproto_,
 
         return netflow_set_options(ofproto->netflow, netflow_options);
     } else if (ofproto->netflow) {
         ofproto->backer->need_revalidate = REV_RECONFIGURE;
-        netflow_destroy(ofproto->netflow);
+        netflow_unref(ofproto->netflow);
         ofproto->netflow = NULL;
     }
-- 
1.8.1.2
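As a brief note on the new API's lifetime semantics (an illustrative sketch,
not part of the patch): netflow_create() returns an object whose ref_cnt
starts at 1, netflow_ref() takes an additional reference, and the
netflow_unref() call that drops the last reference frees the object:

    struct netflow *nf = netflow_create();    /* ref_cnt == 1 */
    struct netflow *ref = netflow_ref(nf);    /* ref_cnt == 2 */

    netflow_unref(nf);     /* ref_cnt == 1; object still alive */
    netflow_unref(ref);    /* ref_cnt == 0; the packet buffer, collectors,
                            * and the object itself are freed */

This is why ofproto-dpif can simply switch its netflow_destroy() calls to
netflow_unref(): the owner drops its reference, and the object survives as
long as any other thread still holds one.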