> -----Original Message-----
> From: Lukas Straub <lukasstra...@web.de>
> Sent: Friday, May 8, 2020 2:08 PM
> To: Zhang, Chen <chen.zh...@intel.com>
> Cc: qemu-devel <qemu-devel@nongnu.org>; Li Zhijian
> <lizhij...@cn.fujitsu.com>; Jason Wang <jasow...@redhat.com>; Marc-
> André Lureau <marcandre.lur...@redhat.com>; Paolo Bonzini
> <pbonz...@redhat.com>
> Subject: Re: [PATCH v4 3/6] net/colo-compare.c: Fix deadlock in
> compare_chr_send
>
> On Fri, 8 May 2020 02:19:00 +0000
> "Zhang, Chen" <chen.zh...@intel.com> wrote:
> > > > No need to init the notify_sendco each time, because the notify
> > > > dev just
> > > an optional parameter.
> > > > You can use the if (s->notify_dev) here. Just Xen use the
> chr_notify_dev.
> > >
> > > Ok, I will change that and the code below in the next version.
> > >
> > > > Overall, make the chr_send job to coroutine is a good idea. It
> > > > looks good
> > > for me.
> > > > And your patch inspired me, it looks we can re-use the
> > > > compare_chr_send
> > > code on filter mirror/redirector too.
> > >
> > > I already have patch for that, but I don't think it is a good idea,
> > > because the guest then can send packets faster than colo-compare can
> > > process. This leads bufferbloat and the performance drops in my tests:
> > > Client-to-server tcp:
> > > without patch: ~66 Mbit/s
> > > with patch: ~59 Mbit/s
> > > Server-to-client tcp:
> > > without patch: ~702 Kbit/s
> > > with patch: ~328 Kbit/s
> >
> > Oh, a big performance drop, is that caused by memcpy/zero_copy parts ?
> >
> > Thanks
> > Zhang Chen
>
> No, there is no memcpy overhead with this patch, see below.
I means for the filter mirror/redirector parts why coroutine will lead huge
performance drop?
Thanks
Zhang Chen
>
> Regards,
> Lukas Straub
>
> ---
> net/filter-mirror.c | 142 +++++++++++++++++++++++++++++++++-----------
> 1 file changed, 106 insertions(+), 36 deletions(-)
>
> diff --git a/net/filter-mirror.c b/net/filter-mirror.c index
> d83e815545..6bcd317502 100644
> --- a/net/filter-mirror.c
> +++ b/net/filter-mirror.c
> @@ -20,6 +20,8 @@
> #include "chardev/char-fe.h"
> #include "qemu/iov.h"
> #include "qemu/sockets.h"
> +#include "block/aio-wait.h"
> +#include "qemu/coroutine.h"
>
> #define FILTER_MIRROR(obj) \
> OBJECT_CHECK(MirrorState, (obj), TYPE_FILTER_MIRROR) @@ -31,6
> +33,18 @@ #define TYPE_FILTER_REDIRECTOR "filter-redirector"
> #define REDIRECTOR_MAX_LEN NET_BUFSIZE
>
> +typedef struct SendCo {
> + Coroutine *co;
> + GQueue send_list;
> + bool done;
> + int ret;
> +} SendCo;
> +
> +typedef struct SendEntry {
> + ssize_t size;
> + uint8_t buf[];
> +} SendEntry;
> +
> typedef struct MirrorState {
> NetFilterState parent_obj;
> char *indev;
> @@ -38,59 +52,101 @@ typedef struct MirrorState {
> CharBackend chr_in;
> CharBackend chr_out;
> SocketReadState rs;
> + SendCo sendco;
> bool vnet_hdr;
> } MirrorState;
>
> -static int filter_send(MirrorState *s,
> - const struct iovec *iov,
> - int iovcnt)
> +static void coroutine_fn _filter_send(void *opaque)
> {
> + MirrorState *s = opaque;
> + SendCo *sendco = &s->sendco;
> NetFilterState *nf = NETFILTER(s);
> int ret = 0;
> - ssize_t size = 0;
> - uint32_t len = 0;
> - char *buf;
> -
> - size = iov_size(iov, iovcnt);
> - if (!size) {
> - return 0;
> - }
>
> - len = htonl(size);
> - ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
> - if (ret != sizeof(len)) {
> - goto err;
> - }
> + while (!g_queue_is_empty(&sendco->send_list)) {
> + SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> + uint32_t len = htonl(entry->size);
>
> - if (s->vnet_hdr) {
> - /*
> - * If vnet_hdr = on, we send vnet header len to make other
> - * module(like colo-compare) know how to parse net
> - * packet correctly.
> - */
> - ssize_t vnet_hdr_len;
> + ret = qemu_chr_fe_write_all(&s->chr_out,
> + (uint8_t *)&len,
> + sizeof(len));
> + if (ret != sizeof(len)) {
> + g_free(entry);
> + goto err;
> + }
>
> - vnet_hdr_len = nf->netdev->vnet_hdr_len;
> + if (s->vnet_hdr) {
> + /*
> + * If vnet_hdr = on, we send vnet header len to make other
> + * module(like colo-compare) know how to parse net
> + * packet correctly.
> + */
> +
> + len = htonl(nf->netdev->vnet_hdr_len);
> + ret = qemu_chr_fe_write_all(&s->chr_out,
> + (uint8_t *)&len,
> + sizeof(len));
> + if (ret != sizeof(len)) {
> + g_free(entry);
> + goto err;
> + }
> + }
>
> - len = htonl(vnet_hdr_len);
> - ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len,
> sizeof(len));
> - if (ret != sizeof(len)) {
> + ret = qemu_chr_fe_write_all(&s->chr_out,
> + (uint8_t *)entry->buf,
> + entry->size);
> + if (ret != entry->size) {
> + g_free(entry);
> goto err;
> }
> - }
>
> - buf = g_malloc(size);
> - iov_to_buf(iov, iovcnt, 0, buf, size);
> - ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
> - g_free(buf);
> - if (ret != size) {
> - goto err;
> + g_free(entry);
> }
>
> - return 0;
> + sendco->ret = 0;
> + goto out;
>
> err:
> - return ret < 0 ? ret : -EIO;
> + while (!g_queue_is_empty(&sendco->send_list)) {
> + SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> + g_free(entry);
> + }
> + sendco->ret = ret < 0 ? ret : -EIO;
> +out:
> + sendco->co = NULL;
> + sendco->done = true;
> + aio_wait_kick();
> +}
> +
> +static int filter_send(MirrorState *s,
> + const struct iovec *iov,
> + int iovcnt)
> +{
> + SendCo *sendco = &s->sendco;
> + SendEntry *entry;
> +
> + ssize_t size = iov_size(iov, iovcnt);
> + if (!size) {
> + return 0;
> + }
> +
> + entry = g_malloc(sizeof(SendEntry) + size);
> + entry->size = size;
> + iov_to_buf(iov, iovcnt, 0, entry->buf, size);
> + g_queue_push_head(&sendco->send_list, entry);
> +
> + if (sendco->done) {
> + sendco->co = qemu_coroutine_create(_filter_send, s);
> + sendco->done = false;
> + qemu_coroutine_enter(sendco->co);
> + if (sendco->done) {
> + /* report early errors */
> + return sendco->ret;
> + }
> + }
> +
> + /* assume success */
> + return 0;
> }
>
> static void redirector_to_filter(NetFilterState *nf, @@ -194,6 +250,10 @@
> static void filter_mirror_cleanup(NetFilterState *nf) {
> MirrorState *s = FILTER_MIRROR(nf);
>
> + AIO_WAIT_WHILE(NULL, !s->sendco.done);
> +
> + g_queue_clear(&s->sendco.send_list);
> +
> qemu_chr_fe_deinit(&s->chr_out, false); }
>
> @@ -201,6 +261,10 @@ static void filter_redirector_cleanup(NetFilterState
> *nf) {
> MirrorState *s = FILTER_REDIRECTOR(nf);
>
> + AIO_WAIT_WHILE(NULL, !s->sendco.done);
> +
> + g_queue_clear(&s->sendco.send_list);
> +
> qemu_chr_fe_deinit(&s->chr_in, false);
> qemu_chr_fe_deinit(&s->chr_out, false); } @@ -224,6 +288,9 @@ static
> void filter_mirror_setup(NetFilterState *nf, Error **errp)
> }
>
> qemu_chr_fe_init(&s->chr_out, chr, errp);
> +
> + s->sendco.done = true;
> + g_queue_init(&s->sendco.send_list);
> }
>
> static void redirector_rs_finalize(SocketReadState *rs) @@ -281,6 +348,9
> @@ static void filter_redirector_setup(NetFilterState *nf, Error **errp)
> return;
> }
> }
> +
> + s->sendco.done = true;
> + g_queue_init(&s->sendco.send_list);
> }
>
> static void filter_mirror_class_init(ObjectClass *oc, void *data)
> --
> 2.20.1