The netflow code has its tentacles all over the ofproto-dpif module. This is fine today, but in the future, facets (which correspond roughly to netflow_flows) will be retired. In preparation, this patch hides as much implementation detail as possible inside the netflow module.
Signed-off-by: Ethan Jackson <et...@nicira.com> --- ofproto/netflow.c | 246 ++++++++++++++++++++++++++++++------------ ofproto/netflow.h | 29 ++--- ofproto/ofproto-dpif-upcall.c | 3 +- ofproto/ofproto-dpif.c | 74 +++---------- ofproto/ofproto.h | 8 -- 5 files changed, 196 insertions(+), 164 deletions(-) diff --git a/ofproto/netflow.c b/ofproto/netflow.c index a094bac..9055757 100644 --- a/ofproto/netflow.c +++ b/ofproto/netflow.c @@ -22,6 +22,7 @@ #include <unistd.h> #include "byte-order.h" #include "collectors.h" +#include "dpif.h" #include "flow.h" #include "lib/netflow.h" #include "ofpbuf.h" @@ -49,8 +50,37 @@ struct netflow { long long int active_timeout; /* Timeout for flows that are still active. */ long long int next_timeout; /* Next scheduled active timeout. */ long long int reconfig_time; /* When we reconfigured the timeouts. */ + + struct hmap flows; /* Contains 'netflow_flows'. */ +}; + +struct netflow_flow { + struct hmap_node hmap_node; + + long long int last_expired; /* Time this flow last timed out. */ + long long int created; /* Time flow was created since time out. */ + + ofp_port_t output_iface; /* Output interface index. */ + uint16_t tcp_flags; /* Bitwise-OR of all TCP flags seen. */ + + ofp_port_t in_port; /* Input port. */ + ovs_be32 nw_src; /* IPv4 source address. */ + ovs_be32 nw_dst; /* IPv4 destination address. */ + uint8_t nw_tos; /* IP ToS (including DSCP and ECN). */ + uint8_t nw_proto; /* IP protocol. */ + ovs_be16 tp_src; /* TCP/UDP/SCTP source port. */ + ovs_be16 tp_dst; /* TCP/UDP/SCTP destination port. */ + + uint64_t packet_count; /* Packets from subrules. */ + uint64_t byte_count; /* Bytes from subrules. */ + long long int used; /* Last-used time (0 if never used). 
*/ }; +static struct netflow_flow *netflow_flow_lookup(struct netflow *, + struct flow *); +static uint32_t netflow_flow_hash(struct flow *); +static void netflow_expire__(struct netflow *, struct netflow_flow *); + void netflow_mask_wc(struct flow *flow, struct flow_wildcards *wc) { @@ -67,7 +97,6 @@ netflow_mask_wc(struct flow *flow, struct flow_wildcards *wc) static void gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow, - struct ofexpired *expired, uint32_t packet_count, uint32_t byte_count) { struct netflow_v5_header *nf_hdr; @@ -94,38 +123,37 @@ gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow, nf_hdr->count = htons(ntohs(nf_hdr->count) + 1); nf_rec = ofpbuf_put_zeros(&nf->packet, sizeof *nf_rec); - nf_rec->src_addr = expired->flow.nw_src; - nf_rec->dst_addr = expired->flow.nw_dst; + nf_rec->src_addr = nf_flow->nw_src; + nf_rec->dst_addr = nf_flow->nw_dst; nf_rec->nexthop = htonl(0); if (nf->add_id_to_iface) { uint16_t iface = (nf->engine_id & 0x7f) << 9; - nf_rec->input = htons(iface - | (ofp_to_u16(expired->flow.in_port.ofp_port) & 0x1ff)); + nf_rec->input = htons(iface | (ofp_to_u16(nf_flow->in_port) & 0x1ff)); nf_rec->output = htons(iface | (ofp_to_u16(nf_flow->output_iface) & 0x1ff)); } else { - nf_rec->input = htons(ofp_to_u16(expired->flow.in_port.ofp_port)); + nf_rec->input = htons(ofp_to_u16(nf_flow->in_port)); nf_rec->output = htons(ofp_to_u16(nf_flow->output_iface)); } nf_rec->packet_count = htonl(packet_count); nf_rec->byte_count = htonl(byte_count); nf_rec->init_time = htonl(nf_flow->created - nf->boot_time); - nf_rec->used_time = htonl(MAX(nf_flow->created, expired->used) + nf_rec->used_time = htonl(MAX(nf_flow->created, nf_flow->used) - nf->boot_time); - if (expired->flow.nw_proto == IPPROTO_ICMP) { + if (nf_flow->nw_proto == IPPROTO_ICMP) { /* In NetFlow, the ICMP type and code are concatenated and * placed in the 'dst_port' field. 
*/ - uint8_t type = ntohs(expired->flow.tp_src); - uint8_t code = ntohs(expired->flow.tp_dst); + uint8_t type = ntohs(nf_flow->tp_src); + uint8_t code = ntohs(nf_flow->tp_dst); nf_rec->src_port = htons(0); nf_rec->dst_port = htons((type << 8) | code); } else { - nf_rec->src_port = expired->flow.tp_src; - nf_rec->dst_port = expired->flow.tp_dst; + nf_rec->src_port = nf_flow->tp_src; + nf_rec->dst_port = nf_flow->tp_dst; } - nf_rec->tcp_flags = (uint8_t)nf_flow->tcp_flags; - nf_rec->ip_proto = expired->flow.nw_proto; - nf_rec->ip_tos = expired->flow.nw_tos & IP_DSCP_MASK; + nf_rec->tcp_flags = (uint8_t) nf_flow->tcp_flags; + nf_rec->ip_proto = nf_flow->nw_proto; + nf_rec->ip_tos = nf_flow->nw_tos & IP_DSCP_MASK; /* NetFlow messages are limited to 30 records. */ if (ntohs(nf_hdr->count) >= 30) { @@ -134,17 +162,65 @@ gen_netflow_rec(struct netflow *nf, struct netflow_flow *nf_flow, } void -netflow_expire(struct netflow *nf, struct netflow_flow *nf_flow, - struct ofexpired *expired) +netflow_flow_update(struct netflow *nf, struct flow *flow, + ofp_port_t output_iface, + const struct dpif_flow_stats *stats) { - uint64_t pkt_delta = expired->packet_count - nf_flow->packet_count_off; - uint64_t byte_delta = expired->byte_count - nf_flow->byte_count_off; + struct netflow_flow *nf_flow; + long long int used; + + /* NetFlow only reports on IP packets. 
*/ + if (flow->dl_type != htons(ETH_TYPE_IP)) { + return; + } + + nf_flow = netflow_flow_lookup(nf, flow); + if (!nf_flow) { + nf_flow = xzalloc(sizeof *nf_flow); + nf_flow->in_port = flow->in_port.ofp_port; + nf_flow->nw_src = flow->nw_src; + nf_flow->nw_dst = flow->nw_dst; + nf_flow->nw_tos = flow->nw_tos; + nf_flow->nw_proto = flow->nw_proto; + nf_flow->tp_src = flow->tp_src; + nf_flow->tp_dst = flow->tp_dst; + nf_flow->created = stats->used; + nf_flow->output_iface = output_iface; + hmap_insert(&nf->flows, &nf_flow->hmap_node, netflow_flow_hash(flow)); + } + + if (nf_flow->output_iface != output_iface) { + netflow_expire__(nf, nf_flow); + nf_flow->output_iface = output_iface; + } + + nf_flow->packet_count += stats->n_packets; + nf_flow->byte_count += stats->n_bytes; + nf_flow->tcp_flags |= stats->tcp_flags; + + used = MAX(nf_flow->used, stats->used); + if (nf_flow->used != used) { + nf_flow->used = used; + if (!nf->active_timeout || !nf_flow->last_expired + || nf->reconfig_time > nf_flow->last_expired) { + /* Keep the time updated to prevent a flood of expiration in + * the future. */ + nf_flow->last_expired = time_msec(); + } + } +} + +static void +netflow_expire__(struct netflow *nf, struct netflow_flow *nf_flow) +{ + uint64_t pkt_delta, byte_delta; + + pkt_delta = nf_flow->packet_count; + byte_delta = nf_flow->byte_count; nf_flow->last_expired += nf->active_timeout; - /* NetFlow only reports on IP packets and we should only report flows - * that actually have traffic. 
*/ - if (expired->flow.dl_type != htons(ETH_TYPE_IP) || pkt_delta == 0) { + if (pkt_delta == 0) { return; } @@ -159,7 +235,7 @@ netflow_expire(struct netflow *nf, struct netflow_flow *nf_flow, uint32_t pkt_count = pkt_delta / n_recs; uint32_t byte_count = byte_delta / n_recs; - gen_netflow_rec(nf, nf_flow, expired, pkt_count, byte_count); + gen_netflow_rec(nf, nf_flow, pkt_count, byte_count); pkt_delta -= pkt_count; byte_delta -= byte_count; @@ -181,26 +257,65 @@ netflow_expire(struct netflow *nf, struct netflow_flow *nf_flow, /* Update flow tracking data. */ nf_flow->created = 0; - nf_flow->packet_count_off = expired->packet_count; - nf_flow->byte_count_off = expired->byte_count; + nf_flow->packet_count = 0; + nf_flow->byte_count = 0; nf_flow->tcp_flags = 0; } +void +netflow_expire(struct netflow *nf, struct flow *flow) +{ + struct netflow_flow *nf_flow = netflow_flow_lookup(nf, flow); + + if (nf_flow) { + netflow_expire__(nf, nf_flow); + } +} + +void +netflow_flow_clear(struct netflow *nf, struct flow *flow) +{ + struct netflow_flow *nf_flow = netflow_flow_lookup(nf, flow); + + if (nf_flow) { + ovs_assert(!nf_flow->packet_count); + ovs_assert(!nf_flow->byte_count); + hmap_remove(&nf->flows, &nf_flow->hmap_node); + free(nf_flow); + } +} + /* Returns true if it's time to send out a round of NetFlow active timeouts, * false otherwise. 
*/ -bool +void netflow_run(struct netflow *nf) { + long long int now = time_msec(); + struct netflow_flow *nf_flow, *next; + if (nf->packet.size) { collectors_send(nf->collectors, nf->packet.data, nf->packet.size); nf->packet.size = 0; } - if (nf->active_timeout && time_msec() >= nf->next_timeout) { - nf->next_timeout = time_msec() + 1000; - return true; - } else { - return false; + if (!nf->active_timeout || now < nf->next_timeout) { + return; + } + + nf->next_timeout = now + 1000; + + HMAP_FOR_EACH_SAFE (nf_flow, next, hmap_node, &nf->flows) { + if (now > nf_flow->last_expired + nf->active_timeout) { + bool idle = nf_flow->used < nf_flow->last_expired; + netflow_expire__(nf, nf_flow); + + if (idle) { + /* If the netflow_flow hasn't been used in a while, it's + * possible that the upper layer lost track of it. */ + hmap_remove(&nf->flows, &nf_flow->hmap_node); + free(nf_flow); + } + } } } @@ -254,6 +369,7 @@ netflow_create(void) nf->collectors = NULL; nf->add_id_to_iface = false; nf->netflow_cnt = 0; + hmap_init(&nf->flows); ofpbuf_init(&nf->packet, 1500); return nf; } @@ -267,52 +383,42 @@ netflow_destroy(struct netflow *nf) free(nf); } } + +/* Helpers. */ -/* Initializes a new 'nf_flow' given that the caller has already cleared it to - * all-zero-bits. */ -void -netflow_flow_init(struct netflow_flow *nf_flow OVS_UNUSED) +static struct netflow_flow * +netflow_flow_lookup(struct netflow *nf, struct flow *flow) { - /* Nothing to do.
*/ -} - -void -netflow_flow_clear(struct netflow_flow *nf_flow) -{ - ofp_port_t output_iface = nf_flow->output_iface; - - memset(nf_flow, 0, sizeof *nf_flow); - nf_flow->output_iface = output_iface; -} - -void -netflow_flow_update_time(struct netflow *nf, struct netflow_flow *nf_flow, - long long int used) -{ - if (!nf_flow->created) { - nf_flow->created = used; + struct netflow_flow *nf_flow; + + HMAP_FOR_EACH_WITH_HASH (nf_flow, hmap_node, netflow_flow_hash(flow), + &nf->flows) { + if (flow->in_port.ofp_port == nf_flow->in_port + && flow->nw_src == nf_flow->nw_src + && flow->nw_dst == nf_flow->nw_dst + && flow->nw_tos == nf_flow->nw_tos + && flow->nw_proto == nf_flow->nw_proto + && flow->tp_src == nf_flow->tp_src + && flow->tp_dst == nf_flow->tp_dst) { + return nf_flow; + } } - if (!nf || !nf->active_timeout || !nf_flow->last_expired || - nf->reconfig_time > nf_flow->last_expired) { - /* Keep the time updated to prevent a flood of expiration in - * the future. */ - nf_flow->last_expired = time_msec(); - } + return NULL; } -void -netflow_flow_update_flags(struct netflow_flow *nf_flow, uint16_t tcp_flags) +static uint32_t +netflow_flow_hash(struct flow *flow) { - nf_flow->tcp_flags |= tcp_flags; -} + uint32_t hash = 0; -bool -netflow_active_timeout_expired(struct netflow *nf, struct netflow_flow *nf_flow) -{ - if (nf->active_timeout) { - return time_msec() > nf_flow->last_expired + nf->active_timeout; - } + hash = mhash_add(hash, (OVS_FORCE uint32_t) flow->in_port.ofp_port); + hash = mhash_add(hash, ntohl(flow->nw_src)); + hash = mhash_add(hash, ntohl(flow->nw_dst)); + hash = mhash_add(hash, flow->nw_tos); + hash = mhash_add(hash, flow->nw_proto); + hash = mhash_add(hash, ntohs(flow->tp_src)); + hash = mhash_add(hash, ntohs(flow->tp_dst)); - return false; + return mhash_finish(hash, 28); } diff --git a/ofproto/netflow.h b/ofproto/netflow.h index e1a2443..6493841 100644 --- a/ofproto/netflow.h +++ b/ofproto/netflow.h @@ -28,8 +28,6 @@ * accounted.) 
*/ #define NF_ACTIVE_TIMEOUT_DEFAULT 600 -struct ofexpired; - struct netflow_options { struct sset collectors; uint8_t engine_type; @@ -42,33 +40,20 @@ struct netflow_options { #define NF_OUT_MULTI OFP_PORT_C(UINT16_MAX - 1) #define NF_OUT_DROP OFP_PORT_C(UINT16_MAX - 2) -struct netflow_flow { - long long int last_expired; /* Time this flow last timed out. */ - long long int created; /* Time flow was created since time out. */ - - uint64_t packet_count_off; /* Packet count at last time out. */ - uint64_t byte_count_off; /* Byte count at last time out. */ - - ofp_port_t output_iface; /* Output interface index. */ - uint16_t tcp_flags; /* Bitwise-OR of all TCP flags seen. */ -}; - struct netflow *netflow_create(void); void netflow_destroy(struct netflow *); int netflow_set_options(struct netflow *, const struct netflow_options *); -void netflow_expire(struct netflow *, struct netflow_flow *, - struct ofexpired *); +void netflow_expire(struct netflow *, struct flow *); -bool netflow_run(struct netflow *); +void netflow_run(struct netflow *); void netflow_wait(struct netflow *); void netflow_mask_wc(struct flow *, struct flow_wildcards *); -void netflow_flow_init(struct netflow_flow *); -void netflow_flow_clear(struct netflow_flow *); -void netflow_flow_update_time(struct netflow *, struct netflow_flow *, - long long int used); -void netflow_flow_update_flags(struct netflow_flow *, uint16_t tcp_flags); -bool netflow_active_timeout_expired(struct netflow *, struct netflow_flow *); +void netflow_flow_clear(struct netflow *netflow, struct flow *flow); + +void netflow_flow_update(struct netflow *nf, struct flow *flow, + ofp_port_t output_iface, + const struct dpif_flow_stats *); #endif /* netflow.h */ diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c index 285b092..6e65572 100644 --- a/ofproto/ofproto-dpif-upcall.c +++ b/ofproto/ofproto-dpif-upcall.c @@ -252,8 +252,7 @@ udpif_revalidate(struct udpif *udpif) /* Since we remove each miss on 
revalidation, their statistics won't be * accounted to the appropriate 'facet's in the upper layer. In most * cases, this is alright because we've already pushed the stats to the - * relevant rules. However, NetFlow requires absolute packet counts on - * 'facet's which could now be incorrect. */ + * relevant rules. */ atomic_add(&udpif->reval_seq, 1, &junk); guarded_list_pop_all(&udpif->fmbs, &fmbs); diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c index a8eb0cb..a38bf87 100644 --- a/ofproto/ofproto-dpif.c +++ b/ofproto/ofproto-dpif.c @@ -285,7 +285,6 @@ struct facet { /* Accounting. */ uint64_t accounted_bytes; /* Bytes processed by facet_account(). */ - struct netflow_flow nf_flow; /* Per-flow NetFlow tracking data. */ uint16_t tcp_flags; /* TCP flags seen for this 'rule'. */ struct xlate_out xout; @@ -540,9 +539,6 @@ static void handle_upcalls(struct dpif_backer *); /* Flow expiration. */ static int expire(struct dpif_backer *); -/* NetFlow. */ -static void send_netflow_active_timeouts(struct ofproto_dpif *); - /* Global variables. 
*/ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); @@ -1481,9 +1477,7 @@ run(struct ofproto *ofproto_) } if (ofproto->netflow) { - if (netflow_run(ofproto->netflow)) { - send_netflow_active_timeouts(ofproto); - } + netflow_run(ofproto->netflow); } if (ofproto->sflow) { dpif_sflow_run(ofproto->sflow); @@ -3807,8 +3801,6 @@ facet_create(const struct flow_miss *miss) facet->learn_rl = time_msec() + 500; list_init(&facet->subfacets); - netflow_flow_init(&facet->nf_flow); - netflow_flow_update_time(ofproto->netflow, &facet->nf_flow, facet->used); xlate_out_copy(&facet->xout, &miss->xout); @@ -3818,7 +3810,11 @@ facet_create(const struct flow_miss *miss) classifier_insert(&ofproto->facets, &facet->cr); ovs_rwlock_unlock(&ofproto->facets.rwlock); - facet->nf_flow.output_iface = facet->xout.nf_output_iface; + if (ofproto->netflow && !facet_is_controller_flow(facet)) { + netflow_flow_update(ofproto->netflow, &facet->flow, + facet->xout.nf_output_iface, &miss->stats); + } + return facet; } @@ -4038,19 +4034,13 @@ facet_flush_stats(struct facet *facet) } if (ofproto->netflow && !facet_is_controller_flow(facet)) { - struct ofexpired expired; - expired.flow = facet->flow; - expired.packet_count = facet->packet_count; - expired.byte_count = facet->byte_count; - expired.used = facet->used; - netflow_expire(ofproto->netflow, &facet->nf_flow, &expired); + netflow_expire(ofproto->netflow, &facet->flow); + netflow_flow_clear(ofproto->netflow, &facet->flow); } /* Reset counters to prevent double counting if 'facet' ever gets * reinstalled. 
*/ facet_reset_counters(facet); - - netflow_flow_clear(&facet->nf_flow); facet->tcp_flags = 0; } @@ -4226,7 +4216,6 @@ facet_revalidate(struct facet *facet) facet->xout.has_fin_timeout = xout.has_fin_timeout; facet->xout.nf_output_iface = xout.nf_output_iface; facet->xout.mirrors = xout.mirrors; - facet->nf_flow.output_iface = facet->xout.nf_output_iface; ovs_mutex_lock(&new_rule->up.mutex); facet->used = MAX(facet->used, new_rule->up.created); @@ -4284,9 +4273,10 @@ facet_push_stats(struct facet *facet, bool may_learn) facet->prev_byte_count = facet->byte_count; facet->prev_used = facet->used; - netflow_flow_update_time(facet->ofproto->netflow, &facet->nf_flow, - facet->used); - netflow_flow_update_flags(&facet->nf_flow, facet->tcp_flags); + if (facet->ofproto->netflow && !facet_is_controller_flow(facet)) { + netflow_flow_update(facet->ofproto->netflow, &facet->flow, + facet->xout.nf_output_iface, &stats); + } mirror_update_stats(facet->ofproto->mbridge, facet->xout.mirrors, stats.n_packets, stats.n_bytes); flow_push_stats(facet->ofproto, &facet->flow, &stats, may_learn); @@ -5053,46 +5043,6 @@ get_netflow_ids(const struct ofproto *ofproto_, dpif_get_netflow_ids(ofproto->backer->dpif, engine_type, engine_id); } - -static void -send_active_timeout(struct ofproto_dpif *ofproto, struct facet *facet) -{ - if (!facet_is_controller_flow(facet) && - netflow_active_timeout_expired(ofproto->netflow, &facet->nf_flow)) { - struct subfacet *subfacet; - struct ofexpired expired; - - LIST_FOR_EACH (subfacet, list_node, &facet->subfacets) { - if (subfacet->path == SF_FAST_PATH) { - struct dpif_flow_stats stats; - - subfacet_install(subfacet, &facet->xout.odp_actions, - &stats); - subfacet_update_stats(subfacet, &stats); - } - } - - expired.flow = facet->flow; - expired.packet_count = facet->packet_count; - expired.byte_count = facet->byte_count; - expired.used = facet->used; - netflow_expire(ofproto->netflow, &facet->nf_flow, &expired); - } -} - -static void 
-send_netflow_active_timeouts(struct ofproto_dpif *ofproto) -{ - struct cls_cursor cursor; - struct facet *facet; - - ovs_rwlock_rdlock(&ofproto->facets.rwlock); - cls_cursor_init(&cursor, &ofproto->facets, NULL); - CLS_CURSOR_FOR_EACH (facet, cr, &cursor) { - send_active_timeout(ofproto, facet); - } - ovs_rwlock_unlock(&ofproto->facets.rwlock); -} static struct ofproto_dpif * ofproto_dpif_lookup(const char *name) diff --git a/ofproto/ofproto.h b/ofproto/ofproto.h index 50185ff..557eed9 100644 --- a/ofproto/ofproto.h +++ b/ofproto/ofproto.h @@ -53,13 +53,6 @@ struct ofproto_controller_info { } pairs; }; -struct ofexpired { - struct flow flow; - uint64_t packet_count; /* Packets from subrules. */ - uint64_t byte_count; /* Bytes from subrules. */ - long long int used; /* Last-used time (0 if never used). */ -}; - struct ofproto_sflow_options { struct sset targets; uint32_t sampling_rate; @@ -70,7 +63,6 @@ struct ofproto_sflow_options { char *control_ip; }; - struct ofproto_ipfix_bridge_exporter_options { struct sset targets; uint32_t sampling_rate; -- 1.8.1.2 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev