> -----Original Message-----
> From: Lukas Straub <lukasstra...@web.de>
> Sent: Monday, April 27, 2020 5:19 AM
> To: qemu-devel <qemu-devel@nongnu.org>
> Cc: Zhang, Chen <chen.zh...@intel.com>; Li Zhijian
> <lizhij...@cn.fujitsu.com>; Jason Wang <jasow...@redhat.com>; Marc-
> André Lureau <marcandre.lur...@redhat.com>; Paolo Bonzini
> <pbonz...@redhat.com>
> Subject: [PATCH v3 3/6] net/colo-compare.c: Fix deadlock in
> compare_chr_send
>
> The chr_out chardev is connected to a filter-redirector running in the main
> loop. qemu_chr_fe_write_all might block here in compare_chr_send if the
> (socket-)buffer is full.
> If another filter-redirector in the main loop wants to send data to
> chr_pri_in, it might also block if the buffer is full. This leads to a
> deadlock because both event loops get blocked.
>
> Fix this by converting compare_chr_send to a coroutine and putting the
> packets in a send queue. Also create a new function notify_chr_send, since
> the notification path should be independent of the packet send queue.
>
> Signed-off-by: Lukas Straub <lukasstra...@web.de>
> ---
> net/colo-compare.c | 173 ++++++++++++++++++++++++++++++++++-----------
> 1 file changed, 130 insertions(+), 43 deletions(-)
>
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index 1de4220fe2..ff6a740284 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -32,6 +32,9 @@
> #include "migration/migration.h"
> #include "util.h"
>
> +#include "block/aio-wait.h"
> +#include "qemu/coroutine.h"
> +
> #define TYPE_COLO_COMPARE "colo-compare"
> #define COLO_COMPARE(obj) \
> OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) @@ -77,6
> +80,20 @@ static int event_unhandled_count;
> * |packet | |packet + |packet | |packet +
> * +--------+ +--------+ +--------+ +--------+
> */
> +
> +typedef struct SendCo {
> + Coroutine *co;
> + GQueue send_list;
> + bool done;
> + int ret;
> +} SendCo;
> +
> +typedef struct SendEntry {
> + uint32_t size;
> + uint32_t vnet_hdr_len;
> + uint8_t buf[];
> +} SendEntry;
> +
> typedef struct CompareState {
> Object parent;
>
> @@ -91,6 +108,7 @@ typedef struct CompareState {
> SocketReadState pri_rs;
> SocketReadState sec_rs;
> SocketReadState notify_rs;
> + SendCo sendco;
> bool vnet_hdr;
> uint32_t compare_timeout;
> uint32_t expired_scan_cycle;
> @@ -126,8 +144,11 @@ enum {
> static int compare_chr_send(CompareState *s,
> const uint8_t *buf,
> uint32_t size,
> - uint32_t vnet_hdr_len,
> - bool notify_remote_frame);
> + uint32_t vnet_hdr_len);
> +
> +static int notify_chr_send(CompareState *s,
> + const uint8_t *buf,
> + uint32_t size);
>
> static bool packet_matches_str(const char *str,
> const uint8_t *buf, @@ -145,7 +166,7 @@
> static void
> notify_remote_frame(CompareState *s)
> char msg[] = "DO_CHECKPOINT";
> int ret = 0;
>
> - ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> + ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
> if (ret < 0) {
> error_report("Notify Xen COLO-frame failed");
> }
> @@ -271,8 +292,7 @@ static void colo_release_primary_pkt(CompareState
> *s, Packet *pkt)
> ret = compare_chr_send(s,
> pkt->data,
> pkt->size,
> - pkt->vnet_hdr_len,
> - false);
> + pkt->vnet_hdr_len);
> if (ret < 0) {
> error_report("colo send primary packet failed");
> }
> @@ -699,63 +719,123 @@ static void colo_compare_connection(void
> *opaque, void *user_data)
> }
> }
>
> -static int compare_chr_send(CompareState *s,
> - const uint8_t *buf,
> - uint32_t size,
> - uint32_t vnet_hdr_len,
> - bool notify_remote_frame)
> +static void coroutine_fn _compare_chr_send(void *opaque)
> {
> + CompareState *s = opaque;
> + SendCo *sendco = &s->sendco;
> int ret = 0;
> - uint32_t len = htonl(size);
>
> - if (!size) {
> - return 0;
> - }
> + while (!g_queue_is_empty(&sendco->send_list)) {
> + SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> + uint32_t len = htonl(entry->size);
>
> - if (notify_remote_frame) {
> - ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> - (uint8_t *)&len,
> - sizeof(len));
> - } else {
> ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len,
> sizeof(len));
> - }
>
> - if (ret != sizeof(len)) {
> - goto err;
> - }
> + if (ret != sizeof(len)) {
> + g_free(entry);
> + goto err;
> + }
>
> - if (s->vnet_hdr) {
> - /*
> - * We send vnet header len make other module(like filter-redirector)
> - * know how to parse net packet correctly.
> - */
> - len = htonl(vnet_hdr_len);
> + if (s->vnet_hdr) {
> + /*
> + * We send vnet header len make other module(like
> filter-redirector)
> + * know how to parse net packet correctly.
> + */
> + len = htonl(entry->vnet_hdr_len);
>
> - if (!notify_remote_frame) {
> ret = qemu_chr_fe_write_all(&s->chr_out,
> (uint8_t *)&len,
> sizeof(len));
> +
> + if (ret != sizeof(len)) {
> + g_free(entry);
> + goto err;
> + }
> }
>
> - if (ret != sizeof(len)) {
> + ret = qemu_chr_fe_write_all(&s->chr_out,
> + (uint8_t *)entry->buf,
> + entry->size);
> +
> + if (ret != entry->size) {
> + g_free(entry);
> goto err;
> }
> +
> + g_free(entry);
> }
>
> - if (notify_remote_frame) {
> - ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> - (uint8_t *)buf,
> - size);
> - } else {
> - ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
> + sendco->ret = 0;
> + goto out;
> +
> +err:
> + while (!g_queue_is_empty(&sendco->send_list)) {
> + SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> + g_free(entry);
> }
> + sendco->ret = ret < 0 ? ret : -EIO;
> +out:
> + sendco->co = NULL;
> + sendco->done = true;
> + aio_wait_kick();
> +}
> +
> +static int compare_chr_send(CompareState *s,
> + const uint8_t *buf,
> + uint32_t size,
> + uint32_t vnet_hdr_len) {
> + SendCo *sendco = &s->sendco;
> + SendEntry *entry;
> +
> + if (!size) {
> + return 0;
> + }
> +
> + entry = g_malloc(sizeof(SendEntry) + size);
> + entry->size = size;
> + entry->vnet_hdr_len = vnet_hdr_len;
> + memcpy(entry->buf, buf, size);
> + g_queue_push_head(&sendco->send_list, entry);
> +
> + if (sendco->done) {
> + sendco->co = qemu_coroutine_create(_compare_chr_send, s);
> + sendco->done = false;
> + qemu_coroutine_enter(sendco->co);
> + if (sendco->done) {
> + /* report early errors */
> + return sendco->ret;
> + }
> + }
> +
> + /* assume success */
> + return 0;
> +}
> +
Why not make notify_chr_send work the same way as compare_chr_send?
Thanks
Zhang Chen
> +static int notify_chr_send(CompareState *s,
> + const uint8_t *buf,
> + uint32_t size) {
> + int ret = 0;
> + uint32_t len = htonl(size);
> +
> + ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> + (uint8_t *)&len,
> + sizeof(len));
> +
> + if (ret != sizeof(len)) {
> + goto err;
> + }
> +
> + ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> + (uint8_t *)buf,
> + size);
>
> if (ret != size) {
> goto err;
> }
>
> return 0;
> -
> err:
> return ret < 0 ? ret : -EIO;
> }
> @@ -1062,8 +1142,7 @@ static void
> compare_pri_rs_finalize(SocketReadState *pri_rs)
> compare_chr_send(s,
> pri_rs->buf,
> pri_rs->packet_len,
> - pri_rs->vnet_hdr_len,
> - false);
> + pri_rs->vnet_hdr_len);
> } else {
> /* compare packet in the specified connection */
> colo_compare_connection(conn, s); @@ -1093,7 +1172,7 @@ static void
> compare_notify_rs_finalize(SocketReadState *notify_rs)
> if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
> notify_rs->buf,
> notify_rs->packet_len)) {
> - ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> + ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
> if (ret < 0) {
> error_report("Notify Xen COLO-frame INIT failed");
> }
> @@ -1199,6 +1278,9 @@ static void colo_compare_complete(UserCreatable
> *uc, Error **errp)
>
> QTAILQ_INSERT_TAIL(&net_compares, s, next);
>
> + s->sendco.done = true;
> + g_queue_init(&s->sendco.send_list);
> +
> g_queue_init(&s->conn_list);
>
> qemu_mutex_init(&event_mtx);
> @@ -1224,8 +1306,7 @@ static void colo_flush_packets(void *opaque, void
> *user_data)
> compare_chr_send(s,
> pkt->data,
> pkt->size,
> - pkt->vnet_hdr_len,
> - false);
> + pkt->vnet_hdr_len);
> packet_destroy(pkt, NULL);
> }
> while (!g_queue_is_empty(&conn->secondary_list)) { @@ -1281,6
> +1362,11 @@ static void colo_compare_finalize(Object *obj)
> CompareState *s = COLO_COMPARE(obj);
> CompareState *tmp = NULL;
>
> + AioContext *ctx = iothread_get_aio_context(s->iothread);
> + aio_context_acquire(ctx);
> + AIO_WAIT_WHILE(ctx, !s->sendco.done);
> + aio_context_release(ctx);
> +
> qemu_chr_fe_deinit(&s->chr_pri_in, false);
> qemu_chr_fe_deinit(&s->chr_sec_in, false);
> qemu_chr_fe_deinit(&s->chr_out, false); @@ -1305,6 +1391,7 @@ static
> void colo_compare_finalize(Object *obj)
> g_queue_foreach(&s->conn_list, colo_flush_packets, s);
>
> g_queue_clear(&s->conn_list);
> + g_queue_clear(&s->sendco.send_list);
>
> if (s->connection_track_table) {
> g_hash_table_destroy(s->connection_track_table);
> --
> 2.20.1