netdev-dummy has thus far supported only passive connections: it listens for incoming connection requests. This patch allows an active stream to be configured as well.
This patch effectively allows a netdev-dummy port to make a point-to-point connection to another port without being a patch port. This feature will be useful in test cases for future commits. Signed-off-by: Andy Zhou <az...@nicira.com> --- lib/netdev-dummy.c | 338 ++++++++++++++++++++++++++++++++++--------------- tests/ofproto-dpif.at | 58 +++++++-- 2 files changed, 282 insertions(+), 114 deletions(-) diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c index fd30454..856af1c 100644 --- a/lib/netdev-dummy.c +++ b/lib/netdev-dummy.c @@ -69,10 +69,15 @@ struct netdev_dummy { unsigned int change_seq OVS_GUARDED; int ifindex OVS_GUARDED; + /* Passive connections states. */ struct pstream *pstream OVS_GUARDED; struct dummy_stream *streams OVS_GUARDED; size_t n_streams OVS_GUARDED; + /* Active connection states. */ + struct dummy_stream astream OVS_GUARDED; + bool astream_connected OVS_GUARDED; + FILE *tx_pcap, *rx_pcap OVS_GUARDED; struct list rxes OVS_GUARDED; /* List of child "netdev_rx_dummy"s. */ @@ -118,105 +123,193 @@ netdev_rx_dummy_cast(const struct netdev_rx *rx) } static void -netdev_dummy_run(void) +dummy_stream_init(struct dummy_stream *s, struct stream *stream) { - struct netdev_dummy *dev; + int rxbuf_size = stream ? 
2048 : 0; + s->stream = stream; + ofpbuf_init(&s->rxbuf, rxbuf_size); + list_init(&s->txq); +} - ovs_mutex_lock(&dummy_list_mutex); - LIST_FOR_EACH (dev, list_node, &dummy_list) { - size_t i; +static void +dummy_stream_wait(struct dummy_stream *s) +{ + stream_run_wait(s->stream); + if (!list_is_empty(&s->txq)) { + stream_send_wait(s->stream); + } + stream_recv_wait(s->stream); +} - ovs_mutex_lock(&dev->mutex); +static void +dummy_stream_send(struct dummy_stream *s, const void *buffer, size_t size) +{ + if (list_size(&s->txq) < NETDEV_DUMMY_MAX_QUEUE) { + struct ofpbuf *b; - if (dev->pstream) { - struct stream *new_stream; - int error; + b = ofpbuf_clone_data_with_headroom(buffer, size, 2); + put_unaligned_be16(ofpbuf_push_uninit(b, 2), htons(size)); + list_push_back(&s->txq, &b->list_node); + } +} - error = pstream_accept(dev->pstream, &new_stream); - if (!error) { - struct dummy_stream *s; - - dev->streams = xrealloc(dev->streams, - ((dev->n_streams + 1) - * sizeof *dev->streams)); - s = &dev->streams[dev->n_streams++]; - s->stream = new_stream; - ofpbuf_init(&s->rxbuf, 2048); - list_init(&s->txq); - } else if (error != EAGAIN) { - VLOG_WARN("%s: accept failed (%s)", - pstream_get_name(dev->pstream), ovs_strerror(error)); - pstream_close(dev->pstream); - dev->pstream = NULL; +static int +dummy_stream_run__(struct netdev_dummy *dev, struct dummy_stream* s) +{ + int error = 0; + size_t n; + + stream_run(s->stream); + + if (!list_is_empty(&s->txq)) { + struct ofpbuf *txbuf; + int retval; + + txbuf = ofpbuf_from_list(list_front(&s->txq)); + retval = stream_send(s->stream, txbuf->data, txbuf->size); + if (retval > 0) { + ofpbuf_pull(txbuf, retval); + if (!txbuf->size) { + list_remove(&txbuf->list_node); + ofpbuf_delete(txbuf); } + } else if (retval != -EAGAIN) { + error = -retval; } + } - for (i = 0; i < dev->n_streams; i++) { - struct dummy_stream *s = &dev->streams[i]; - int error = 0; - size_t n; - - stream_run(s->stream); - - if (!list_is_empty(&s->txq)) { - struct 
ofpbuf *txbuf; - int retval; - - txbuf = ofpbuf_from_list(list_front(&s->txq)); - retval = stream_send(s->stream, txbuf->data, txbuf->size); - if (retval > 0) { - ofpbuf_pull(txbuf, retval); - if (!txbuf->size) { - list_remove(&txbuf->list_node); - ofpbuf_delete(txbuf); - } - } else if (retval != -EAGAIN) { - error = -retval; - } - } - - if (!error) { - if (s->rxbuf.size < 2) { - n = 2 - s->rxbuf.size; - } else { - uint16_t frame_len; - - frame_len = ntohs(get_unaligned_be16(s->rxbuf.data)); - if (frame_len < ETH_HEADER_LEN) { - error = EPROTO; - n = 0; - } else { - n = (2 + frame_len) - s->rxbuf.size; - } - } + if (!error) { + if (s->rxbuf.size < 2) { + n = 2 - s->rxbuf.size; + } else { + uint16_t frame_len; + + frame_len = ntohs(get_unaligned_be16(s->rxbuf.data)); + if (frame_len < ETH_HEADER_LEN) { + error = EPROTO; + n = 0; + } else { + n = (2 + frame_len) - s->rxbuf.size; } - if (!error) { - int retval; - - ofpbuf_prealloc_tailroom(&s->rxbuf, n); - retval = stream_recv(s->stream, ofpbuf_tail(&s->rxbuf), n); - if (retval > 0) { - s->rxbuf.size += retval; - if (retval == n && s->rxbuf.size > 2) { - ofpbuf_pull(&s->rxbuf, 2); - netdev_dummy_queue_packet(dev, - ofpbuf_clone(&s->rxbuf)); - ofpbuf_clear(&s->rxbuf); - } - } else if (retval != -EAGAIN) { - error = (retval < 0 ? -retval - : s->rxbuf.size ? EPROTO - : EOF); - } + } + } + if (!error) { + int retval; + + ofpbuf_prealloc_tailroom(&s->rxbuf, n); + retval = stream_recv(s->stream, ofpbuf_tail(&s->rxbuf), n); + if (retval > 0) { + s->rxbuf.size += retval; + if (retval == n && s->rxbuf.size > 2) { + ofpbuf_pull(&s->rxbuf, 2); + netdev_dummy_queue_packet(dev, + ofpbuf_clone(&s->rxbuf)); + ofpbuf_clear(&s->rxbuf); } + } else if (retval != -EAGAIN) { + error = (retval < 0 ? -retval + : s->rxbuf.size ? 
EPROTO + : EOF); + } + } - if (error) { + return error; +} + +static void +netdev_dummy_passive_stream_run__(struct netdev_dummy *dev) + OVS_REQUIRES(dev->mutex) +{ + struct stream *new_stream; + int error; + size_t i; + + error = pstream_accept(dev->pstream, &new_stream); + if (!error) { + struct dummy_stream *s; + + dev->streams = xrealloc(dev->streams, + ((dev->n_streams + 1) + * sizeof *dev->streams)); + s = &dev->streams[dev->n_streams++]; + dummy_stream_init(s, new_stream); + } else if (error != EAGAIN) { + VLOG_WARN("%s: accept failed (%s)", + pstream_get_name(dev->pstream), ovs_strerror(error)); + pstream_close(dev->pstream); + dev->pstream = NULL; + } + + for (i = 0; i < dev->n_streams; i++) { + struct dummy_stream *s = &dev->streams[i]; + + error = dummy_stream_run__(dev, s); + if (error) { + VLOG_DBG("%s: closing connection (%s)", + stream_get_name(s->stream), + ovs_retval_to_string(error)); + dummy_stream_close(&dev->streams[i]); + dev->streams[i] = dev->streams[--dev->n_streams]; + } + } +} + +static void +netdev_dummy_stream_run__(struct netdev_dummy *dev) + OVS_REQUIRES(dev->mutex) +{ + + if (!dev->astream_connected) { + int err=stream_connect(dev->astream.stream); + + switch (err) { + case 0: /* Connected. 
*/ + dev->astream_connected = true; + VLOG_DBG("%s: connected", + stream_get_name(dev->astream.stream)); + break; + + case EAGAIN: + return; + + default: VLOG_DBG("%s: closing connection (%s)", - stream_get_name(s->stream), - ovs_retval_to_string(error)); - dummy_stream_close(&dev->streams[i]); - dev->streams[i] = dev->streams[--dev->n_streams]; - } + stream_get_name(dev->astream.stream), + ovs_strerror(err)); + + stream_close(dev->astream.stream); + dev->astream.stream = NULL; + return; + } + } + + if (dev->astream_connected) { + int error; + error = dummy_stream_run__(dev, &dev->astream); + if (error) { + VLOG_DBG("%s: closing active connection (%s)", + stream_get_name(dev->astream.stream), + ovs_retval_to_string(error)); + dummy_stream_close(&dev->astream); + dev->astream_connected = false; + } + } +} + +static void +netdev_dummy_run(void) +{ + struct netdev_dummy *dev; + + ovs_mutex_lock(&dummy_list_mutex); + LIST_FOR_EACH (dev, list_node, &dummy_list) { + + ovs_mutex_lock(&dev->mutex); + + if (dev->pstream) { + netdev_dummy_passive_stream_run__(dev); + } else if (dev->astream.stream) { + netdev_dummy_stream_run__(dev); } ovs_mutex_unlock(&dev->mutex); @@ -247,12 +340,10 @@ netdev_dummy_wait(void) } for (i = 0; i < dev->n_streams; i++) { struct dummy_stream *s = &dev->streams[i]; - - stream_run_wait(s->stream); - if (!list_is_empty(&s->txq)) { - stream_send_wait(s->stream); - } - stream_recv_wait(s->stream); + dummy_stream_wait(s); + } + if (dev->astream.stream) { + dummy_stream_wait(&dev->astream); } ovs_mutex_unlock(&dev->mutex); } @@ -292,6 +383,9 @@ netdev_dummy_construct(struct netdev *netdev_) netdev->streams = NULL; netdev->n_streams = 0; + dummy_stream_init(&netdev->astream, NULL); + netdev->astream_connected = false; + list_init(&netdev->rxes); ovs_mutex_unlock(&netdev->mutex); @@ -313,11 +407,15 @@ netdev_dummy_destruct(struct netdev *netdev_) ovs_mutex_unlock(&dummy_list_mutex); ovs_mutex_lock(&netdev->mutex); + pstream_close(netdev->pstream); for (i 
= 0; i < netdev->n_streams; i++) { dummy_stream_close(&netdev->streams[i]); } free(netdev->streams); + + dummy_stream_close(&netdev->astream); + ovs_mutex_unlock(&netdev->mutex); ovs_mutex_destroy(&netdev->mutex); } @@ -345,6 +443,10 @@ netdev_dummy_get_config(const struct netdev *netdev_, struct smap *args) smap_add(args, "pstream", pstream_get_name(netdev->pstream)); } + if (netdev->astream.stream) { + smap_add(args, "stream", stream_get_name(netdev->astream.stream)); + } + ovs_mutex_unlock(&netdev->mutex); return 0; } @@ -353,7 +455,7 @@ static int netdev_dummy_set_config(struct netdev *netdev_, const struct smap *args) { struct netdev_dummy *netdev = netdev_dummy_cast(netdev_); - const char *pstream; + const char *pstream, *stream; const char *pcap; ovs_mutex_lock(&netdev->mutex); @@ -377,6 +479,39 @@ netdev_dummy_set_config(struct netdev *netdev_, const struct smap *args) } } + stream = smap_get(args, "stream"); + if(pstream + || !stream + || !netdev->astream.stream + || strcmp(stream_get_name(netdev->astream.stream), stream)) { + dummy_stream_close(&netdev->astream); + netdev->astream_connected = false; + + if (stream && !pstream) { + int error; + struct stream *active_stream; + + error = stream_open(stream, &active_stream, DSCP_DEFAULT); + dummy_stream_init(&netdev->astream, active_stream); + + switch (error) { + case 0: + netdev->astream_connected = true; + VLOG_INFO("%s: connected", stream_get_name(active_stream)); + break; + + case EAGAIN: + break; + + default: + VLOG_WARN("%s: open failed (%s)", stream, + ovs_strerror(error)); + stream_close(active_stream); + break; + } + } + } + if (netdev->rx_pcap) { fclose(netdev->rx_pcap); } @@ -546,17 +681,16 @@ netdev_dummy_send(struct netdev *netdev, const void *buffer, size_t size) fflush(dev->tx_pcap); } - for (i = 0; i < dev->n_streams; i++) { - struct dummy_stream *s = &dev->streams[i]; - - if (list_size(&s->txq) < NETDEV_DUMMY_MAX_QUEUE) { - struct ofpbuf *b; + if (dev->pstream) { + for (i = 0; i < 
dev->n_streams; i++) { + struct dummy_stream *s = &dev->streams[i]; - b = ofpbuf_clone_data_with_headroom(buffer, size, 2); - put_unaligned_be16(ofpbuf_push_uninit(b, 2), htons(size)); - list_push_back(&s->txq, &b->list_node); + dummy_stream_send(s, buffer, size); } + } else if (dev->astream_connected) { + dummy_stream_send(&dev->astream, buffer, size); } + ovs_mutex_unlock(&dev->mutex); return 0; diff --git a/tests/ofproto-dpif.at b/tests/ofproto-dpif.at index 4d8d460..7cf10ab 100644 --- a/tests/ofproto-dpif.at +++ b/tests/ofproto-dpif.at @@ -1,5 +1,51 @@ AT_BANNER([ofproto-dpif]) +# Strips out uninteresting parts of flow output, as well as parts +# that vary from one run to another (e.g., timing and bond actions). +m4_define([STRIP_USED], [[sed ' + s/used:[0-9]*\.[0-9]*/used:0.0/ +' | sort]]) +m4_define([STRIP_XOUT], [[sed ' + s/used:[0-9]*\.[0-9]*/used:0.0/ + s/actions:.*/actions: <del>/ + s/packets:[0-9]*/packets:0/ + s/bytes:[0-9]*/bytes:0/ +' | sort]]) + +AT_SETUP([ofproto-dpif - dummy interface]) +# Create br0 with interfaces p1 and p7 +# and br1 with interfaces p2 and p8 +# with p1 and p2 connected via unix domain socket +OVS_VSWITCHD_START( + [add-port br0 p1 -- set interface p1 type=dummy options:pstream=punix:$OVS_RUNDIR/p0.sock ofport_request=1 -- \ + add-port br0 p7 -- set interface p7 ofport_request=7 type=dummy -- \ + add-br br1 -- \ + set bridge br1 other-config:hwaddr=aa:66:aa:66:00:00 -- \ + set bridge br1 datapath-type=dummy other-config:datapath-id=1234 \ + fail-mode=secure -- \ + add-port br1 p2 -- set interface p2 type=dummy options:stream=unix:$OVS_RUNDIR/p0.sock ofport_request=2 -- \ + add-port br1 p8 -- set interface p8 ofport_request=8 type=dummy --]) + +AT_CHECK([ovs-ofctl add-flow br0 action=normal]) +AT_CHECK([ovs-ofctl add-flow br1 action=normal]) +ovs-appctl time/stop +ovs-appctl time/warp 5000 +AT_CHECK([ovs-appctl netdev-dummy/receive p7 
'in_port(7),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)']) +AT_CHECK([ovs-appctl netdev-dummy/receive p8 'in_port(8),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)']) +ovs-appctl time/warp 100 + +AT_CHECK([ovs-appctl dpif/dump-flows br0 | STRIP_XOUT], [0], [dnl +skb_priority(0),in_port(1),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3/0.0.0.0,dst=10.0.0.4/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff), packets:0, bytes:0, used:0.0s, actions: <del> +skb_priority(0),in_port(7),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2/0.0.0.0,dst=10.0.0.1/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff), packets:0, bytes:0, used:0.0s, actions: <del> +]) + +AT_CHECK([ovs-appctl dpif/dump-flows br1 | STRIP_XOUT], [0], [dnl +skb_priority(0),in_port(2),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2/0.0.0.0,dst=10.0.0.1/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff), packets:0, bytes:0, used:0.0s, actions: <del> +skb_priority(0),in_port(8),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3/0.0.0.0,dst=10.0.0.4/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff), packets:0, bytes:0, used:0.0s, actions: <del> +]) +OVS_VSWITCHD_STOP +AT_CLEANUP + AT_SETUP([ofproto-dpif - resubmit]) OVS_VSWITCHD_START ADD_OF_PORTS([br0], [1], [10], [11], [12], [13], [14], [15], @@ -2527,18 +2573,6 @@ AT_CLEANUP dnl ---------------------------------------------------------------------- AT_BANNER([ofproto-dpif -- megaflows]) -# Strips out uninteresting parts of megaflow output, as well as parts -# that vary from one run to another (e.g., timing and bond actions). 
-m4_define([STRIP_USED], [[sed ' - s/used:[0-9]*\.[0-9]*/used:0.0/ -' | sort]]) -m4_define([STRIP_XOUT], [[sed ' - s/used:[0-9]*\.[0-9]*/used:0.0/ - s/actions:.*/actions: <del>/ - s/packets:[0-9]*/packets:0/ - s/bytes:[0-9]*/bytes:0/ -' | sort]]) - AT_SETUP([ofproto-dpif megaflow - port classification]) OVS_VSWITCHD_START ADD_OF_PORTS([br0], [1], [2]) -- 1.7.9.5 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev