Buffer a multi-part requests until all its parts are received. This is achieved by initialising the list_node field of messages and passing them to ofmp_req_filter().
* If the message is not recognised as part of a multi-part requests it is simply returned and processing continues as before. * If the messages part of a multipart request but is not the last message of that request then it is buffered and ofmp_req_filter() returns NULL indicating that the message should be skipped for now. * Otherwise, if the message is the last part of a multipart request then the first message that is the part of the request is returned and any subsequent parts are accessible via its list_head field. Some implementation notes: * As the list_head field may now contain messages ofpbuf_list_delete should be used to delete them as necessary. * This places a limit of OFCONN_MP_REQ_MAX (=1024) on the number of multipart requests may be buffered. * This code should have no affect on message handling at this time as ofpmsg_may_buffer_mp_request() always returns false. It should be updated along with support for traversing the list_head field of messages when adding multipart request support to individual message types. Signed-off-by: Simon Horman <ho...@verge.net.au> --- v3 * Rebase * Correct type of hash parameter to ofmp_req_find, it should be uint32_t rather than ovs_be32 as the value is in host byte order. v2 * Use ofpmsg_may_buffer_mp_request() to only buffer messages for which buffering is enabled. Currently there are none. * Annotate mp_reqs field of struct ofconn with OVS_GUARDED_BY(ofproto_mutex) * Add locking * Add limitation on the number of messages that may be buffered Signed-off-by: Simon Horman <ho...@verge.net.au> --- ofproto/connmgr.c | 243 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 242 insertions(+), 1 deletion(-) diff --git a/ofproto/connmgr.c b/ofproto/connmgr.c index 7fc42b1..d8673d2 100644 --- a/ofproto/connmgr.c +++ b/ofproto/connmgr.c @@ -137,6 +137,12 @@ struct ofconn { /* Active bundles. Contains "struct ofp_bundle"s. */ struct hmap bundles; + + /* Partial multipart messages. Contains "struct ofmp_req"s. */ + struct hmap mp_reqs OVS_GUARDED_BY(ofproto_mutex); +#define OFCONN_MP_REQ_MAX 1024 + /* Number of buffered partial multipart messages */ + size_t mp_req_count OVS_GUARDED_BY(ofproto_mutex); }; static struct ofconn *ofconn_create(struct connmgr *, struct rconn *, @@ -218,6 +224,16 @@ struct connmgr { int in_band_queue; }; +/* Buffer for multipart requests */ +struct ofmp_req { + struct hmap_node hmap_node; /* In struct ofconn's 'mp_reqs' hmap. */ + struct ofpbuf *front; /* First message. + * Remaining messages are linked via its + * list_node */ +}; + +static void ofmp_req_destroy(struct ofconn *, struct ofmp_req *mp_req); + static void update_in_band_remotes(struct connmgr *); static void add_snooper(struct connmgr *, struct vconn *); static void ofmonitor_run(struct connmgr *); @@ -1161,6 +1177,7 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type, hmap_init(&ofconn->monitors); list_init(&ofconn->updates); + hmap_init(&ofconn->mp_reqs); hmap_init(&ofconn->bundles); @@ -1176,6 +1193,7 @@ ofconn_flush(struct ofconn *ofconn) OVS_REQUIRES(ofproto_mutex) { struct ofmonitor *monitor, *next_monitor; + struct ofmp_req *mp_req, *next_mp_req; int i; ofconn_log_flow_mods(ofconn); @@ -1244,6 +1262,9 @@ ofconn_flush(struct ofconn *ofconn) &ofconn->monitors) { ofmonitor_destroy(monitor); } + HMAP_FOR_EACH_SAFE (mp_req, next_mp_req, hmap_node, &ofconn->mp_reqs) { + ofmp_req_destroy(ofconn, mp_req); + } rconn_packet_counter_destroy(ofconn->monitor_counter); ofconn->monitor_counter = rconn_packet_counter_create(); ofpbuf_list_delete(&ofconn->updates); /* ...but it should be empty. */ @@ -1262,6 +1283,7 @@ ofconn_destroy(struct ofconn *ofconn) ofp_bundle_remove_all(ofconn); hmap_destroy(&ofconn->monitors); + hmap_destroy(&ofconn->mp_reqs); list_remove(&ofconn->node); rconn_destroy(ofconn->rconn); rconn_packet_counter_destroy(ofconn->packet_in_counter); @@ -1304,6 +1326,202 @@ ofconn_may_recv(const struct ofconn *ofconn) return count < OFCONN_REPLY_MAX; } +static struct ofmp_req * +ofmp_req_create(void) +{ + struct ofmp_req *mp_req; + + mp_req = xmalloc(sizeof *mp_req); + mp_req->front = NULL; + + return mp_req; +} + +/* Destroy a buffered multipart request and all its component messages */ +static void +ofmp_req_destroy(struct ofconn *ofconn, struct ofmp_req *mp_req) + OVS_REQUIRES(ofproto_mutex) +{ + if (!mp_req) { + return; + } + if (mp_req->front) { + ofconn->mp_req_count -= list_size(&mp_req->front->list_node) + 1; + ofpbuf_list_delete(&mp_req->front->list_node); + ofpbuf_delete(mp_req->front); + } + hmap_remove(&ofconn->mp_reqs, &mp_req->hmap_node); + free(mp_req); +} + +/* Multi-part requests */ +COVERAGE_DEFINE(mp_req_overflow); + +static uint32_t +ofmp_req_hash(ovs_be32 xid) +{ + return hash_int(ntohl(xid), 0); +} + +/* Find the buffer for a multipart message in 'ofconn' + * with transaction id 'xid' using 'hash' which is calculated + * as ofmpp_req_hash(xid) */ +static struct ofmp_req * +ofmp_req_find(const struct ofconn *ofconn, ovs_be32 xid, uint32_t hash) + OVS_REQUIRES(ofproto_mutex) +{ + struct ofmp_req *mp_req; + + HMAP_FOR_EACH_WITH_HASH (mp_req, hmap_node, hash, &ofconn->mp_reqs) { + const struct ofp_header *oh = ofpbuf_data(mp_req->front); + + if (xid == oh->xid) { + return mp_req; + } + } + + return NULL; +} + +/* Buffer 'part' as a component of a multipart request 'mp_req' in 'ofconn' + * using 'hash' which is calculated as ofmpp_req_hash(xid), where xid + * is the transaction id of 'part'. + * + * If 'mp_req' is NULL then create a new multipart request buffer in + * 'ofconn'. */ +static struct ofmp_req * +ofmp_req_add(struct ofconn *ofconn, struct ofmp_req *mp_req, + struct ofpbuf *part, uint32_t hash) + OVS_REQUIRES(ofproto_mutex) +{ + /* Too many requests! */ + if (ofconn->mp_req_count >= OFCONN_MP_REQ_MAX) { + ofmp_req_destroy(ofconn, mp_req); + return NULL; + } + + if (!mp_req) { + mp_req = ofmp_req_create(); + hmap_insert(&ofconn->mp_reqs, &mp_req->hmap_node, hash); + mp_req->front = part; + } else { + list_push_back(&mp_req->front->list_node, &part->list_node); + } + + ofconn->mp_req_count++; + + return mp_req; +} + +/* Disconnect the component messages from the multipart request buffer + * 'mp_req' that belongs to 'ofconn' and free memory associated with + * 'mp_req'. Returns the front message of the multipart request. + * + * This should be called once all components of a buffered multipart + * request have been received. */ +static struct ofpbuf * +ofmp_req_finished(struct ofconn *ofconn, struct ofmp_req *mp_req) + OVS_REQUIRES(ofproto_mutex) +{ + struct ofpbuf *front; + + front = mp_req->front; + mp_req->front = NULL; + ofmp_req_destroy(ofconn, mp_req); + + return front; +} + +/* Filter 'msg' received for 'ofconn' and: + * - Return '0' if 'msg' is not part of a multipart request, is + * part of a multipart request that may not be buffered that may be + * buffered, or the last part of a multipart request with only one + * component message. + * + * 'msg' will be updated to point to a message that may be processed + * by the caller. This may differer from the value of 'msg'. The + * caller should only use the new value after. + * + * - Return 'EAGAIN' if 'msg' is part of a multipart request + * that may be buffered but is not the last part of the request. + * + * This indicates to the caller that no further processing should + * occur at this time. + * + * - Otherwise a positive error value is returned. */ +static int +ofmp_req_filter(struct ofconn *ofconn, struct ofpbuf **msg) + OVS_EXCLUDED(ofproto_mutex) +{ + const struct ofp_header *oh = ofpbuf_data(*msg); + uint32_t hash = ofmp_req_hash(oh->xid); + struct ofmp_req *mp_req; + enum ofptype type; + enum ofperr error; + int retval = 0; + bool more; + + /* If the message is not a multipart request then + * there is nothing to do here. */ + if (!ofpmsg_is_mp_request(oh)) { + goto out; + } + + /* If the type could not be decoded there is nothing more to do here, + * an error will later be returned by handle_openflow__(). */ + error = ofptype_decode(&type, oh); + if (error) { + goto out; + } + + /* If the message is not allowed to have multiple parts then + * there is nothing to do here. An error will later be returned + * by handle_openflow__() if there are multiple parts. */ + if (!ofpmsg_may_buffer_mp_request(type)) { + goto out; + } + + more = ofpmp_more(oh); + + ovs_mutex_lock(&ofproto_mutex); + /* Find any previous parts of the multipart message */ + mp_req = ofmp_req_find(ofconn, oh->xid, hash); + + if (!mp_req && !more) { + /* Singleton multipart message, it can be processed as-is */ + goto unlock; + } + + /* Add this msg to the previous parts of the multipart message */ + mp_req = ofmp_req_add(ofconn, mp_req, *msg, hash); + if (!mp_req) { + static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); + + COVERAGE_INC(mp_req_overflow); + VLOG_WARN_RL(&rl, "multipart request buffer overflow"); + + retval = OFPERR_OFPBRC_MULTIPART_BUFFER_OVERFLOW; + goto unlock; + } + + /* More to come: leave the parts stored in its mp_req + * and do no further processing for now */ + if (more) { + retval = EAGAIN; + goto unlock; + } + + /* The last part. + * Return the first part which is linked to all the other parts + * via its list_node */ + *msg = ofmp_req_finished(ofconn, mp_req); + +unlock: + ovs_mutex_unlock(&ofproto_mutex); +out: + return retval; +} + static void ofconn_run(struct ofconn *ofconn, void (*handle_openflow)(struct ofconn *, @@ -1323,16 +1541,39 @@ ofconn_run(struct ofconn *ofconn, /* Limit the number of iterations to avoid starving other tasks. */ for (i = 0; i < 50 && ofconn_may_recv(ofconn); i++) { - struct ofpbuf *of_msg = rconn_recv(ofconn->rconn); + const struct ofp_header *oh; + struct ofpbuf *of_msg; + enum ofptype type; + int error; + + of_msg = rconn_recv(ofconn->rconn); if (!of_msg) { break; } + oh = ofpbuf_data(of_msg); + if (ofptype_decode(&type, oh)) { + ofpbuf_delete(of_msg); + continue; + } + + list_init(&of_msg->list_node); + + error = ofmp_req_filter(ofconn, &of_msg); + if (error) { + if (error != EAGAIN) { + ofpbuf_list_delete(&of_msg->list_node); + ofconn_send_error(ofconn, ofpbuf_data(of_msg), error); + ofpbuf_delete(of_msg); + } + continue; + } if (mgr->fail_open) { fail_open_maybe_recover(mgr->fail_open); } handle_openflow(ofconn, of_msg); + ofpbuf_list_delete(&of_msg->list_node); ofpbuf_delete(of_msg); } -- 2.0.0.rc2 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev