diff mbox series

[v4,3/6] net/colo-compare.c: Fix deadlock in compare_chr_send

Message ID 5536749ec95380f18c01789c472324565c060bcc.1588587700.git.lukasstraub2@web.de
State New
Headers show
Series colo-compare bugfixes | expand

Commit Message

Lukas Straub May 4, 2020, 10:28 a.m. UTC
The chr_out chardev is connected to a filter-redirector
running in the main loop. qemu_chr_fe_write_all might block
here in compare_chr_send if the (socket-)buffer is full.
If another filter-redirector in the main loop wants to
send data to chr_pri_in it might also block if the buffer
is full. This leads to a deadlock because both event loops
get blocked.

Fix this by converting compare_chr_send to a coroutine and
putting the packets in a send queue.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 net/colo-compare.c | 187 ++++++++++++++++++++++++++++++++++-----------
 net/colo.c         |   7 ++
 net/colo.h         |   1 +
 3 files changed, 150 insertions(+), 45 deletions(-)

Comments

Zhang, Chen May 7, 2020, 11 a.m. UTC | #1
> -----Original Message-----
> From: Lukas Straub <lukasstraub2@web.de>
> Sent: Monday, May 4, 2020 6:28 PM
> To: qemu-devel <qemu-devel@nongnu.org>
> Cc: Zhang, Chen <chen.zhang@intel.com>; Li Zhijian 
> <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc- 
> André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini 
> <pbonzini@redhat.com>
> Subject: [PATCH v4 3/6] net/colo-compare.c: Fix deadlock in 
> compare_chr_send
> 
> The chr_out chardev is connected to a filter-redirector running in the 
> main loop. qemu_chr_fe_write_all might block here in compare_chr_send 
> if the (socket-)buffer is full.
> If another filter-redirector in the main loop want's to send data to 
> chr_pri_in it might also block if the buffer is full. This leads to a 
> deadlock because both event loops get blocked.
> 
> Fix this by converting compare_chr_send to a coroutine and putting the 
> packets in a send queue.
> 
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
> ---
>  net/colo-compare.c | 187 ++++++++++++++++++++++++++++++++++-------
> ----
>  net/colo.c         |   7 ++
>  net/colo.h         |   1 +
>  3 files changed, 150 insertions(+), 45 deletions(-)
> 
> diff --git a/net/colo-compare.c b/net/colo-compare.c index 
> 1de4220fe2..2a4e7f7c4e 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -32,6 +32,9 @@
>  #include "migration/migration.h"
>  #include "util.h"
> 
> +#include "block/aio-wait.h"
> +#include "qemu/coroutine.h"
> +
>  #define TYPE_COLO_COMPARE "colo-compare"
>  #define COLO_COMPARE(obj) \
>      OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) @@ -77,6
> +80,23 @@ static int event_unhandled_count;
>   *                    |packet  |  |packet  +    |packet  | |packet  +
>   *                    +--------+  +--------+    +--------+ +--------+
>   */
> +
> +typedef struct SendCo {
> +    Coroutine *co;
> +    struct CompareState *s;
> +    CharBackend *chr;
> +    GQueue send_list;
> +    bool notify_remote_frame;
> +    bool done;
> +    int ret;
> +} SendCo;
> +
> +typedef struct SendEntry {
> +    uint32_t size;
> +    uint32_t vnet_hdr_len;
> +    uint8_t *buf;
> +} SendEntry;
> +
>  typedef struct CompareState {
>      Object parent;
> 
> @@ -91,6 +111,8 @@ typedef struct CompareState {
>      SocketReadState pri_rs;
>      SocketReadState sec_rs;
>      SocketReadState notify_rs;
> +    SendCo out_sendco;
> +    SendCo notify_sendco;
>      bool vnet_hdr;
>      uint32_t compare_timeout;
>      uint32_t expired_scan_cycle;
> @@ -124,10 +146,11 @@ enum {
> 
> 
>  static int compare_chr_send(CompareState *s,
> -                            const uint8_t *buf,
> +                            uint8_t *buf,
>                              uint32_t size,
>                              uint32_t vnet_hdr_len,
> -                            bool notify_remote_frame);
> +                            bool notify_remote_frame,
> +                            bool zero_copy);
> 
>  static bool packet_matches_str(const char *str,
>                                 const uint8_t *buf, @@ -145,7 +168,7 
> @@ static void notify_remote_frame(CompareState *s)
>      char msg[] = "DO_CHECKPOINT";
>      int ret = 0;
> 
> -    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> +    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, 
> + false);
>      if (ret < 0) {
>          error_report("Notify Xen COLO-frame failed");
>      }
> @@ -272,12 +295,13 @@ static void
> colo_release_primary_pkt(CompareState *s, Packet *pkt)
>                             pkt->data,
>                             pkt->size,
>                             pkt->vnet_hdr_len,
> -                           false);
> +                           false,
> +                           true);
>      if (ret < 0) {
>          error_report("colo send primary packet failed");
>      }
>      trace_colo_compare_main("packet same and release packet");
> -    packet_destroy(pkt, NULL);
> +    packet_destroy_partial(pkt, NULL);
>  }
> 
>  /*
> @@ -699,65 +723,115 @@ static void colo_compare_connection(void 
> *opaque, void *user_data)
>      }
>  }
> 
> -static int compare_chr_send(CompareState *s,
> -                            const uint8_t *buf,
> -                            uint32_t size,
> -                            uint32_t vnet_hdr_len,
> -                            bool notify_remote_frame)
> +static void coroutine_fn _compare_chr_send(void *opaque)
>  {
> +    SendCo *sendco = opaque;
> +    CompareState *s = sendco->s;
>      int ret = 0;
> -    uint32_t len = htonl(size);
> 
> -    if (!size) {
> -        return 0;
> -    }
> +    while (!g_queue_is_empty(&sendco->send_list)) {
> +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> +        uint32_t len = htonl(entry->size);
> 
> -    if (notify_remote_frame) {
> -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> -                                    (uint8_t *)&len,
> -                                    sizeof(len));
> -    } else {
> -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
> -    }
> +        ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, 
> + sizeof(len));
> 
> -    if (ret != sizeof(len)) {
> -        goto err;
> -    }
> +        if (ret != sizeof(len)) {
> +            g_free(entry->buf);
> +            g_slice_free(SendEntry, entry);
> +            goto err;
> +        }
> 
> -    if (s->vnet_hdr) {
> -        /*
> -         * We send vnet header len make other module(like filter-redirector)
> -         * know how to parse net packet correctly.
> -         */
> -        len = htonl(vnet_hdr_len);
> +        if (!sendco->notify_remote_frame && s->vnet_hdr) {
> +            /*
> +             * We send vnet header len make other module(like filter-redirector)
> +             * know how to parse net packet correctly.
> +             */
> +            len = htonl(entry->vnet_hdr_len);
> 
> -        if (!notify_remote_frame) {
> -            ret = qemu_chr_fe_write_all(&s->chr_out,
> +            ret = qemu_chr_fe_write_all(sendco->chr,
>                                          (uint8_t *)&len,
>                                          sizeof(len));
> +
> +            if (ret != sizeof(len)) {
> +                g_free(entry->buf);
> +                g_slice_free(SendEntry, entry);
> +                goto err;
> +            }
>          }
> 
> -        if (ret != sizeof(len)) {
> +        ret = qemu_chr_fe_write_all(sendco->chr,
> +                                    (uint8_t *)entry->buf,
> +                                    entry->size);
> +
> +        if (ret != entry->size) {
> +            g_free(entry->buf);
> +            g_slice_free(SendEntry, entry);
>              goto err;
>          }
> +
> +        g_free(entry->buf);
> +        g_slice_free(SendEntry, entry);
>      }
> 
> +    sendco->ret = 0;
> +    goto out;
> +
> +err:
> +    while (!g_queue_is_empty(&sendco->send_list)) {
> +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> +        g_free(entry->buf);
> +        g_slice_free(SendEntry, entry);
> +    }
> +    sendco->ret = ret < 0 ? ret : -EIO;
> +out:
> +    sendco->co = NULL;
> +    sendco->done = true;
> +    aio_wait_kick();
> +}
> +
> +static int compare_chr_send(CompareState *s,
> +                            uint8_t *buf,
> +                            uint32_t size,
> +                            uint32_t vnet_hdr_len,
> +                            bool notify_remote_frame,
> +                            bool zero_copy) {
> +    SendCo *sendco;
> +    SendEntry *entry;
> +
>      if (notify_remote_frame) {
> -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> -                                    (uint8_t *)buf,
> -                                    size);
> +        sendco = &s->notify_sendco;
>      } else {
> -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
> +        sendco = &s->out_sendco;
>      }
> 
> -    if (ret != size) {
> -        goto err;
> +    if (!size) {
> +        return 0;
>      }
> 
> -    return 0;
> +    entry = g_slice_new(SendEntry);
> +    entry->size = size;
> +    entry->vnet_hdr_len = vnet_hdr_len;
> +    if (zero_copy) {
> +        entry->buf = buf;
> +    } else {
> +        entry->buf = g_malloc(size);
> +        memcpy(entry->buf, buf, size);
> +    }
> +    g_queue_push_head(&sendco->send_list, entry);
> +
> +    if (sendco->done) {
> +        sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
> +        sendco->done = false;
> +        qemu_coroutine_enter(sendco->co);
> +        if (sendco->done) {
> +            /* report early errors */
> +            return sendco->ret;
> +        }
> +    }
> 
> -err:
> -    return ret < 0 ? ret : -EIO;
> +    /* assume success */
> +    return 0;
>  }
> 
>  static int compare_chr_can_read(void *opaque) @@ -1063,6 +1137,7 @@ 
> static void compare_pri_rs_finalize(SocketReadState *pri_rs)
>                           pri_rs->buf,
>                           pri_rs->packet_len,
>                           pri_rs->vnet_hdr_len,
> +                         false,
>                           false);
>      } else {
>          /* compare packet in the specified connection */ @@ -1093,7
> +1168,7 @@ static void compare_notify_rs_finalize(SocketReadState
> +*notify_rs)
>      if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
>                             notify_rs->buf,
>                             notify_rs->packet_len)) {
> -        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> +        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, 
> + true, false);
>          if (ret < 0) {
>              error_report("Notify Xen COLO-frame INIT failed");
>          }
> @@ -1199,6 +1274,18 @@ static void
> colo_compare_complete(UserCreatable *uc, Error **errp)
> 
>      QTAILQ_INSERT_TAIL(&net_compares, s, next);
> 
> +    s->out_sendco.s = s;
> +    s->out_sendco.chr = &s->chr_out;
> +    s->out_sendco.notify_remote_frame = false;
> +    s->out_sendco.done = true;
> +    g_queue_init(&s->out_sendco.send_list);
> +
> +    s->notify_sendco.s = s;
> +    s->notify_sendco.chr = &s->chr_notify_dev;
> +    s->notify_sendco.notify_remote_frame = true;
> +    s->notify_sendco.done = true;
> +    g_queue_init(&s->notify_sendco.send_list);
> +

No need to init the notify_sendco each time, because the notify dev is just an optional parameter.
You can guard this with if (s->notify_dev) here. Only Xen uses the chr_notify_dev.

Overall, making the chr_send job a coroutine is a good idea. It looks good to me.
And your patch inspired me: it looks like we can re-use the compare_chr_send code in filter mirror/redirector too.

Tested-by: Zhang Chen <chen.zhang@intel.com>


>      g_queue_init(&s->conn_list);
> 
>      qemu_mutex_init(&event_mtx);
> @@ -1225,8 +1312,9 @@ static void colo_flush_packets(void *opaque, 
> void
> *user_data)
>                           pkt->data,
>                           pkt->size,
>                           pkt->vnet_hdr_len,
> -                         false);
> -        packet_destroy(pkt, NULL);
> +                         false,
> +                         true);
> +        packet_destroy_partial(pkt, NULL);
>      }
>      while (!g_queue_is_empty(&conn->secondary_list)) {
>          pkt = g_queue_pop_head(&conn->secondary_list);
> @@ -1301,10 +1389,19 @@ static void colo_compare_finalize(Object *obj)
>          }
>      }
> 
> +    AioContext *ctx = iothread_get_aio_context(s->iothread);
> +    aio_context_acquire(ctx);
> +    AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
> +    AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);

Same as above.

> +    aio_context_release(ctx);
> +
>      /* Release all unhandled packets after compare thead exited */
>      g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> +    AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
> 
>      g_queue_clear(&s->conn_list);
> +    g_queue_clear(&s->out_sendco.send_list);
> +    g_queue_clear(&s->notify_sendco.send_list);

Same as above.

> 
>      if (s->connection_track_table) {
>          g_hash_table_destroy(s->connection_track_table);
> diff --git a/net/colo.c b/net/colo.c
> index 8196b35837..a6c66d829a 100644
> --- a/net/colo.c
> +++ b/net/colo.c
> @@ -185,6 +185,13 @@ void packet_destroy(void *opaque, void *user_data)
>      g_slice_free(Packet, pkt);
>  }
> 
> +void packet_destroy_partial(void *opaque, void *user_data) {
> +    Packet *pkt = opaque;
> +
> +    g_slice_free(Packet, pkt);
> +}
> +
>  /*
>   * Clear hashtable, stop this hash growing really huge
>   */
> diff --git a/net/colo.h b/net/colo.h
> index 679314b1ca..573ab91785 100644
> --- a/net/colo.h
> +++ b/net/colo.h
> @@ -102,5 +102,6 @@ bool connection_has_tracked(GHashTable 
> *connection_track_table,  void connection_hashtable_reset(GHashTable
> *connection_track_table);  Packet *packet_new(const void *data, int 
> size, int vnet_hdr_len);  void packet_destroy(void *opaque, void 
> *user_data);
> +void packet_destroy_partial(void *opaque, void *user_data);
> 
>  #endif /* NET_COLO_H */
> --
> 2.20.1
Lukas Straub May 7, 2020, 3:51 p.m. UTC | #2
On Thu, 7 May 2020 11:00:26 +0000
"Zhang, Chen" <chen.zhang@intel.com> wrote:

> > -----Original Message-----
> > From: Lukas Straub <lukasstraub2@web.de>
> > Sent: Monday, May 4, 2020 6:28 PM
> > To: qemu-devel <qemu-devel@nongnu.org>
> > Cc: Zhang, Chen <chen.zhang@intel.com>; Li Zhijian 
> > <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc- 
> > André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini 
> > <pbonzini@redhat.com>
> > Subject: [PATCH v4 3/6] net/colo-compare.c: Fix deadlock in 
> > compare_chr_send
> > 
> > The chr_out chardev is connected to a filter-redirector running in the 
> > main loop. qemu_chr_fe_write_all might block here in compare_chr_send 
> > if the (socket-)buffer is full.
> > If another filter-redirector in the main loop want's to send data to 
> > chr_pri_in it might also block if the buffer is full. This leads to a 
> > deadlock because both event loops get blocked.
> > 
> > Fix this by converting compare_chr_send to a coroutine and putting the 
> > packets in a send queue.
> > 
> > Signed-off-by: Lukas Straub <lukasstraub2@web.de>
> > ---
> >  net/colo-compare.c | 187 ++++++++++++++++++++++++++++++++++-------
> > ----
> >  net/colo.c         |   7 ++
> >  net/colo.h         |   1 +
> >  3 files changed, 150 insertions(+), 45 deletions(-)
> > 
> > diff --git a/net/colo-compare.c b/net/colo-compare.c index 
> > 1de4220fe2..2a4e7f7c4e 100644
> > --- a/net/colo-compare.c
> > +++ b/net/colo-compare.c
> > @@ -32,6 +32,9 @@
> >  #include "migration/migration.h"
> >  #include "util.h"
> > 
> > +#include "block/aio-wait.h"
> > +#include "qemu/coroutine.h"
> > +
> >  #define TYPE_COLO_COMPARE "colo-compare"
> >  #define COLO_COMPARE(obj) \
> >      OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) @@ -77,6
> > +80,23 @@ static int event_unhandled_count;
> >   *                    |packet  |  |packet  +    |packet  | |packet  +
> >   *                    +--------+  +--------+    +--------+ +--------+
> >   */
> > +
> > +typedef struct SendCo {
> > +    Coroutine *co;
> > +    struct CompareState *s;
> > +    CharBackend *chr;
> > +    GQueue send_list;
> > +    bool notify_remote_frame;
> > +    bool done;
> > +    int ret;
> > +} SendCo;
> > +
> > +typedef struct SendEntry {
> > +    uint32_t size;
> > +    uint32_t vnet_hdr_len;
> > +    uint8_t *buf;
> > +} SendEntry;
> > +
> >  typedef struct CompareState {
> >      Object parent;
> > 
> > @@ -91,6 +111,8 @@ typedef struct CompareState {
> >      SocketReadState pri_rs;
> >      SocketReadState sec_rs;
> >      SocketReadState notify_rs;
> > +    SendCo out_sendco;
> > +    SendCo notify_sendco;
> >      bool vnet_hdr;
> >      uint32_t compare_timeout;
> >      uint32_t expired_scan_cycle;
> > @@ -124,10 +146,11 @@ enum {
> > 
> > 
> >  static int compare_chr_send(CompareState *s,
> > -                            const uint8_t *buf,
> > +                            uint8_t *buf,
> >                              uint32_t size,
> >                              uint32_t vnet_hdr_len,
> > -                            bool notify_remote_frame);
> > +                            bool notify_remote_frame,
> > +                            bool zero_copy);
> > 
> >  static bool packet_matches_str(const char *str,
> >                                 const uint8_t *buf, @@ -145,7 +168,7 
> > @@ static void notify_remote_frame(CompareState *s)
> >      char msg[] = "DO_CHECKPOINT";
> >      int ret = 0;
> > 
> > -    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> > +    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, 
> > + false);
> >      if (ret < 0) {
> >          error_report("Notify Xen COLO-frame failed");
> >      }
> > @@ -272,12 +295,13 @@ static void
> > colo_release_primary_pkt(CompareState *s, Packet *pkt)
> >                             pkt->data,
> >                             pkt->size,
> >                             pkt->vnet_hdr_len,
> > -                           false);
> > +                           false,
> > +                           true);
> >      if (ret < 0) {
> >          error_report("colo send primary packet failed");
> >      }
> >      trace_colo_compare_main("packet same and release packet");
> > -    packet_destroy(pkt, NULL);
> > +    packet_destroy_partial(pkt, NULL);
> >  }
> > 
> >  /*
> > @@ -699,65 +723,115 @@ static void colo_compare_connection(void 
> > *opaque, void *user_data)
> >      }
> >  }
> > 
> > -static int compare_chr_send(CompareState *s,
> > -                            const uint8_t *buf,
> > -                            uint32_t size,
> > -                            uint32_t vnet_hdr_len,
> > -                            bool notify_remote_frame)
> > +static void coroutine_fn _compare_chr_send(void *opaque)
> >  {
> > +    SendCo *sendco = opaque;
> > +    CompareState *s = sendco->s;
> >      int ret = 0;
> > -    uint32_t len = htonl(size);
> > 
> > -    if (!size) {
> > -        return 0;
> > -    }
> > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > +        uint32_t len = htonl(entry->size);
> > 
> > -    if (notify_remote_frame) {
> > -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > -                                    (uint8_t *)&len,
> > -                                    sizeof(len));
> > -    } else {
> > -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
> > -    }
> > +        ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, 
> > + sizeof(len));
> > 
> > -    if (ret != sizeof(len)) {
> > -        goto err;
> > -    }
> > +        if (ret != sizeof(len)) {
> > +            g_free(entry->buf);
> > +            g_slice_free(SendEntry, entry);
> > +            goto err;
> > +        }
> > 
> > -    if (s->vnet_hdr) {
> > -        /*
> > -         * We send vnet header len make other module(like filter-redirector)
> > -         * know how to parse net packet correctly.
> > -         */
> > -        len = htonl(vnet_hdr_len);
> > +        if (!sendco->notify_remote_frame && s->vnet_hdr) {
> > +            /*
> > +             * We send vnet header len make other module(like filter-redirector)
> > +             * know how to parse net packet correctly.
> > +             */
> > +            len = htonl(entry->vnet_hdr_len);
> > 
> > -        if (!notify_remote_frame) {
> > -            ret = qemu_chr_fe_write_all(&s->chr_out,
> > +            ret = qemu_chr_fe_write_all(sendco->chr,
> >                                          (uint8_t *)&len,
> >                                          sizeof(len));
> > +
> > +            if (ret != sizeof(len)) {
> > +                g_free(entry->buf);
> > +                g_slice_free(SendEntry, entry);
> > +                goto err;
> > +            }
> >          }
> > 
> > -        if (ret != sizeof(len)) {
> > +        ret = qemu_chr_fe_write_all(sendco->chr,
> > +                                    (uint8_t *)entry->buf,
> > +                                    entry->size);
> > +
> > +        if (ret != entry->size) {
> > +            g_free(entry->buf);
> > +            g_slice_free(SendEntry, entry);
> >              goto err;
> >          }
> > +
> > +        g_free(entry->buf);
> > +        g_slice_free(SendEntry, entry);
> >      }
> > 
> > +    sendco->ret = 0;
> > +    goto out;
> > +
> > +err:
> > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > +        g_free(entry->buf);
> > +        g_slice_free(SendEntry, entry);
> > +    }
> > +    sendco->ret = ret < 0 ? ret : -EIO;
> > +out:
> > +    sendco->co = NULL;
> > +    sendco->done = true;
> > +    aio_wait_kick();
> > +}
> > +
> > +static int compare_chr_send(CompareState *s,
> > +                            uint8_t *buf,
> > +                            uint32_t size,
> > +                            uint32_t vnet_hdr_len,
> > +                            bool notify_remote_frame,
> > +                            bool zero_copy) {
> > +    SendCo *sendco;
> > +    SendEntry *entry;
> > +
> >      if (notify_remote_frame) {
> > -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > -                                    (uint8_t *)buf,
> > -                                    size);
> > +        sendco = &s->notify_sendco;
> >      } else {
> > -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
> > +        sendco = &s->out_sendco;
> >      }
> > 
> > -    if (ret != size) {
> > -        goto err;
> > +    if (!size) {
> > +        return 0;
> >      }
> > 
> > -    return 0;
> > +    entry = g_slice_new(SendEntry);
> > +    entry->size = size;
> > +    entry->vnet_hdr_len = vnet_hdr_len;
> > +    if (zero_copy) {
> > +        entry->buf = buf;
> > +    } else {
> > +        entry->buf = g_malloc(size);
> > +        memcpy(entry->buf, buf, size);
> > +    }
> > +    g_queue_push_head(&sendco->send_list, entry);
> > +
> > +    if (sendco->done) {
> > +        sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
> > +        sendco->done = false;
> > +        qemu_coroutine_enter(sendco->co);
> > +        if (sendco->done) {
> > +            /* report early errors */
> > +            return sendco->ret;
> > +        }
> > +    }
> > 
> > -err:
> > -    return ret < 0 ? ret : -EIO;
> > +    /* assume success */
> > +    return 0;
> >  }
> > 
> >  static int compare_chr_can_read(void *opaque) @@ -1063,6 +1137,7 @@ 
> > static void compare_pri_rs_finalize(SocketReadState *pri_rs)
> >                           pri_rs->buf,
> >                           pri_rs->packet_len,
> >                           pri_rs->vnet_hdr_len,
> > +                         false,
> >                           false);
> >      } else {
> >          /* compare packet in the specified connection */ @@ -1093,7
> > +1168,7 @@ static void compare_notify_rs_finalize(SocketReadState
> > +*notify_rs)
> >      if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
> >                             notify_rs->buf,
> >                             notify_rs->packet_len)) {
> > -        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> > +        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, 
> > + true, false);
> >          if (ret < 0) {
> >              error_report("Notify Xen COLO-frame INIT failed");
> >          }
> > @@ -1199,6 +1274,18 @@ static void
> > colo_compare_complete(UserCreatable *uc, Error **errp)
> > 
> >      QTAILQ_INSERT_TAIL(&net_compares, s, next);
> > 
> > +    s->out_sendco.s = s;
> > +    s->out_sendco.chr = &s->chr_out;
> > +    s->out_sendco.notify_remote_frame = false;
> > +    s->out_sendco.done = true;
> > +    g_queue_init(&s->out_sendco.send_list);
> > +
> > +    s->notify_sendco.s = s;
> > +    s->notify_sendco.chr = &s->chr_notify_dev;
> > +    s->notify_sendco.notify_remote_frame = true;
> > +    s->notify_sendco.done = true;
> > +    g_queue_init(&s->notify_sendco.send_list);
> > +  
> 
> No need to init the notify_sendco each time, because the notify dev just an optional parameter.
> You can use the if (s->notify_dev) here. Just Xen use the chr_notify_dev.

Ok, I will change that and the code below in the next version.

> Overall, make the chr_send job to coroutine is a good idea. It looks good for me.
> And your patch inspired me, it looks we can re-use the compare_chr_send code on filter mirror/redirector too.

I already have a patch for that, but I don't think it is a good idea, because the guest can then send packets faster than colo-compare can process them. This leads to bufferbloat, and the performance drops in my tests:
Client-to-server tcp:
without patch: ~66 Mbit/s
with patch: ~59 Mbit/s
Server-to-client tcp:
without patch: ~702 Kbit/s
with patch: ~328 Kbit/s

Regards,
Lukas Straub

> Tested-by: Zhang Chen <chen.zhang@intel.com>
> 
> 
> >      g_queue_init(&s->conn_list);
> > 
> >      qemu_mutex_init(&event_mtx);
> > @@ -1225,8 +1312,9 @@ static void colo_flush_packets(void *opaque, 
> > void
> > *user_data)
> >                           pkt->data,
> >                           pkt->size,
> >                           pkt->vnet_hdr_len,
> > -                         false);
> > -        packet_destroy(pkt, NULL);
> > +                         false,
> > +                         true);
> > +        packet_destroy_partial(pkt, NULL);
> >      }
> >      while (!g_queue_is_empty(&conn->secondary_list)) {
> >          pkt = g_queue_pop_head(&conn->secondary_list);
> > @@ -1301,10 +1389,19 @@ static void colo_compare_finalize(Object *obj)
> >          }
> >      }
> > 
> > +    AioContext *ctx = iothread_get_aio_context(s->iothread);
> > +    aio_context_acquire(ctx);
> > +    AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
> > +    AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);  
> 
> Same as above.
> 
> > +    aio_context_release(ctx);
> > +
> >      /* Release all unhandled packets after compare thead exited */
> >      g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> > +    AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
> > 
> >      g_queue_clear(&s->conn_list);
> > +    g_queue_clear(&s->out_sendco.send_list);
> > +    g_queue_clear(&s->notify_sendco.send_list);  
> 
> Same as above.
> 
> > 
> >      if (s->connection_track_table) {
> >          g_hash_table_destroy(s->connection_track_table);
> > diff --git a/net/colo.c b/net/colo.c
> > index 8196b35837..a6c66d829a 100644
> > --- a/net/colo.c
> > +++ b/net/colo.c
> > @@ -185,6 +185,13 @@ void packet_destroy(void *opaque, void *user_data)
> >      g_slice_free(Packet, pkt);
> >  }
> > 
> > +void packet_destroy_partial(void *opaque, void *user_data) {
> > +    Packet *pkt = opaque;
> > +
> > +    g_slice_free(Packet, pkt);
> > +}
> > +
> >  /*
> >   * Clear hashtable, stop this hash growing really huge
> >   */
> > diff --git a/net/colo.h b/net/colo.h
> > index 679314b1ca..573ab91785 100644
> > --- a/net/colo.h
> > +++ b/net/colo.h
> > @@ -102,5 +102,6 @@ bool connection_has_tracked(GHashTable 
> > *connection_track_table,  void connection_hashtable_reset(GHashTable
> > *connection_track_table);  Packet *packet_new(const void *data, int 
> > size, int vnet_hdr_len);  void packet_destroy(void *opaque, void 
> > *user_data);
> > +void packet_destroy_partial(void *opaque, void *user_data);
> > 
> >  #endif /* NET_COLO_H */
> > --
> > 2.20.1  
>
Zhang, Chen May 8, 2020, 2:19 a.m. UTC | #3
> -----Original Message-----
> From: Lukas Straub <lukasstraub2@web.de>
> Sent: Thursday, May 7, 2020 11:51 PM
> To: Zhang, Chen <chen.zhang@intel.com>
> Cc: qemu-devel <qemu-devel@nongnu.org>; Li Zhijian
> <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc-
> André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini
> <pbonzini@redhat.com>
> Subject: Re: [PATCH v4 3/6] net/colo-compare.c: Fix deadlock in
> compare_chr_send
> 
> On Thu, 7 May 2020 11:00:26 +0000
> "Zhang, Chen" <chen.zhang@intel.com> wrote:
> 
> > > -----Original Message-----
> > > From: Lukas Straub <lukasstraub2@web.de>
> > > Sent: Monday, May 4, 2020 6:28 PM
> > > To: qemu-devel <qemu-devel@nongnu.org>
> > > Cc: Zhang, Chen <chen.zhang@intel.com>; Li Zhijian
> > > <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc-
> > > André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini
> > > <pbonzini@redhat.com>
> > > Subject: [PATCH v4 3/6] net/colo-compare.c: Fix deadlock in
> > > compare_chr_send
> > >
> > > The chr_out chardev is connected to a filter-redirector running in
> > > the main loop. qemu_chr_fe_write_all might block here in
> > > compare_chr_send if the (socket-)buffer is full.
> > > If another filter-redirector in the main loop want's to send data to
> > > chr_pri_in it might also block if the buffer is full. This leads to
> > > a deadlock because both event loops get blocked.
> > >
> > > Fix this by converting compare_chr_send to a coroutine and putting
> > > the packets in a send queue.
> > >
> > > Signed-off-by: Lukas Straub <lukasstraub2@web.de>
> > > ---
> > >  net/colo-compare.c | 187 ++++++++++++++++++++++++++++++++++---
> ----
> > > ----
> > >  net/colo.c         |   7 ++
> > >  net/colo.h         |   1 +
> > >  3 files changed, 150 insertions(+), 45 deletions(-)
> > >
> > > diff --git a/net/colo-compare.c b/net/colo-compare.c index
> > > 1de4220fe2..2a4e7f7c4e 100644
> > > --- a/net/colo-compare.c
> > > +++ b/net/colo-compare.c
> > > @@ -32,6 +32,9 @@
> > >  #include "migration/migration.h"
> > >  #include "util.h"
> > >
> > > +#include "block/aio-wait.h"
> > > +#include "qemu/coroutine.h"
> > > +
> > >  #define TYPE_COLO_COMPARE "colo-compare"
> > >  #define COLO_COMPARE(obj) \
> > >      OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) @@ -
> 77,6
> > > +80,23 @@ static int event_unhandled_count;
> > >   *                    |packet  |  |packet  +    |packet  | |packet  +
> > >   *                    +--------+  +--------+    +--------+ +--------+
> > >   */
> > > +
> > > +typedef struct SendCo {
> > > +    Coroutine *co;
> > > +    struct CompareState *s;
> > > +    CharBackend *chr;
> > > +    GQueue send_list;
> > > +    bool notify_remote_frame;
> > > +    bool done;
> > > +    int ret;
> > > +} SendCo;
> > > +
> > > +typedef struct SendEntry {
> > > +    uint32_t size;
> > > +    uint32_t vnet_hdr_len;
> > > +    uint8_t *buf;
> > > +} SendEntry;
> > > +
> > >  typedef struct CompareState {
> > >      Object parent;
> > >
> > > @@ -91,6 +111,8 @@ typedef struct CompareState {
> > >      SocketReadState pri_rs;
> > >      SocketReadState sec_rs;
> > >      SocketReadState notify_rs;
> > > +    SendCo out_sendco;
> > > +    SendCo notify_sendco;
> > >      bool vnet_hdr;
> > >      uint32_t compare_timeout;
> > >      uint32_t expired_scan_cycle;
> > > @@ -124,10 +146,11 @@ enum {
> > >
> > >
> > >  static int compare_chr_send(CompareState *s,
> > > -                            const uint8_t *buf,
> > > +                            uint8_t *buf,
> > >                              uint32_t size,
> > >                              uint32_t vnet_hdr_len,
> > > -                            bool notify_remote_frame);
> > > +                            bool notify_remote_frame,
> > > +                            bool zero_copy);
> > >
> > >  static bool packet_matches_str(const char *str,
> > >                                 const uint8_t *buf, @@ -145,7 +168,7
> > > @@ static void notify_remote_frame(CompareState *s)
> > >      char msg[] = "DO_CHECKPOINT";
> > >      int ret = 0;
> > >
> > > -    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> > > +    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true,
> > > + false);
> > >      if (ret < 0) {
> > >          error_report("Notify Xen COLO-frame failed");
> > >      }
> > > @@ -272,12 +295,13 @@ static void
> > > colo_release_primary_pkt(CompareState *s, Packet *pkt)
> > >                             pkt->data,
> > >                             pkt->size,
> > >                             pkt->vnet_hdr_len,
> > > -                           false);
> > > +                           false,
> > > +                           true);
> > >      if (ret < 0) {
> > >          error_report("colo send primary packet failed");
> > >      }
> > >      trace_colo_compare_main("packet same and release packet");
> > > -    packet_destroy(pkt, NULL);
> > > +    packet_destroy_partial(pkt, NULL);
> > >  }
> > >
> > >  /*
> > > @@ -699,65 +723,115 @@ static void colo_compare_connection(void
> > > *opaque, void *user_data)
> > >      }
> > >  }
> > >
> > > -static int compare_chr_send(CompareState *s,
> > > -                            const uint8_t *buf,
> > > -                            uint32_t size,
> > > -                            uint32_t vnet_hdr_len,
> > > -                            bool notify_remote_frame)
> > > +static void coroutine_fn _compare_chr_send(void *opaque)
> > >  {
> > > +    SendCo *sendco = opaque;
> > > +    CompareState *s = sendco->s;
> > >      int ret = 0;
> > > -    uint32_t len = htonl(size);
> > >
> > > -    if (!size) {
> > > -        return 0;
> > > -    }
> > > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > > +        uint32_t len = htonl(entry->size);
> > >
> > > -    if (notify_remote_frame) {
> > > -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > > -                                    (uint8_t *)&len,
> > > -                                    sizeof(len));
> > > -    } else {
> > > -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len,
> sizeof(len));
> > > -    }
> > > +        ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len,
> > > + sizeof(len));
> > >
> > > -    if (ret != sizeof(len)) {
> > > -        goto err;
> > > -    }
> > > +        if (ret != sizeof(len)) {
> > > +            g_free(entry->buf);
> > > +            g_slice_free(SendEntry, entry);
> > > +            goto err;
> > > +        }
> > >
> > > -    if (s->vnet_hdr) {
> > > -        /*
> > > -         * We send vnet header len make other module(like filter-redirector)
> > > -         * know how to parse net packet correctly.
> > > -         */
> > > -        len = htonl(vnet_hdr_len);
> > > +        if (!sendco->notify_remote_frame && s->vnet_hdr) {
> > > +            /*
> > > +             * We send vnet header len make other module(like filter-
> redirector)
> > > +             * know how to parse net packet correctly.
> > > +             */
> > > +            len = htonl(entry->vnet_hdr_len);
> > >
> > > -        if (!notify_remote_frame) {
> > > -            ret = qemu_chr_fe_write_all(&s->chr_out,
> > > +            ret = qemu_chr_fe_write_all(sendco->chr,
> > >                                          (uint8_t *)&len,
> > >                                          sizeof(len));
> > > +
> > > +            if (ret != sizeof(len)) {
> > > +                g_free(entry->buf);
> > > +                g_slice_free(SendEntry, entry);
> > > +                goto err;
> > > +            }
> > >          }
> > >
> > > -        if (ret != sizeof(len)) {
> > > +        ret = qemu_chr_fe_write_all(sendco->chr,
> > > +                                    (uint8_t *)entry->buf,
> > > +                                    entry->size);
> > > +
> > > +        if (ret != entry->size) {
> > > +            g_free(entry->buf);
> > > +            g_slice_free(SendEntry, entry);
> > >              goto err;
> > >          }
> > > +
> > > +        g_free(entry->buf);
> > > +        g_slice_free(SendEntry, entry);
> > >      }
> > >
> > > +    sendco->ret = 0;
> > > +    goto out;
> > > +
> > > +err:
> > > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > > +        g_free(entry->buf);
> > > +        g_slice_free(SendEntry, entry);
> > > +    }
> > > +    sendco->ret = ret < 0 ? ret : -EIO;
> > > +out:
> > > +    sendco->co = NULL;
> > > +    sendco->done = true;
> > > +    aio_wait_kick();
> > > +}
> > > +
> > > +static int compare_chr_send(CompareState *s,
> > > +                            uint8_t *buf,
> > > +                            uint32_t size,
> > > +                            uint32_t vnet_hdr_len,
> > > +                            bool notify_remote_frame,
> > > +                            bool zero_copy) {
> > > +    SendCo *sendco;
> > > +    SendEntry *entry;
> > > +
> > >      if (notify_remote_frame) {
> > > -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > > -                                    (uint8_t *)buf,
> > > -                                    size);
> > > +        sendco = &s->notify_sendco;
> > >      } else {
> > > -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
> > > +        sendco = &s->out_sendco;
> > >      }
> > >
> > > -    if (ret != size) {
> > > -        goto err;
> > > +    if (!size) {
> > > +        return 0;
> > >      }
> > >
> > > -    return 0;
> > > +    entry = g_slice_new(SendEntry);
> > > +    entry->size = size;
> > > +    entry->vnet_hdr_len = vnet_hdr_len;
> > > +    if (zero_copy) {
> > > +        entry->buf = buf;
> > > +    } else {
> > > +        entry->buf = g_malloc(size);
> > > +        memcpy(entry->buf, buf, size);
> > > +    }
> > > +    g_queue_push_head(&sendco->send_list, entry);
> > > +
> > > +    if (sendco->done) {
> > > +        sendco->co = qemu_coroutine_create(_compare_chr_send,
> sendco);
> > > +        sendco->done = false;
> > > +        qemu_coroutine_enter(sendco->co);
> > > +        if (sendco->done) {
> > > +            /* report early errors */
> > > +            return sendco->ret;
> > > +        }
> > > +    }
> > >
> > > -err:
> > > -    return ret < 0 ? ret : -EIO;
> > > +    /* assume success */
> > > +    return 0;
> > >  }
> > >
> > >  static int compare_chr_can_read(void *opaque) @@ -1063,6 +1137,7
> @@
> > > static void compare_pri_rs_finalize(SocketReadState *pri_rs)
> > >                           pri_rs->buf,
> > >                           pri_rs->packet_len,
> > >                           pri_rs->vnet_hdr_len,
> > > +                         false,
> > >                           false);
> > >      } else {
> > >          /* compare packet in the specified connection */ @@ -1093,7
> > > +1168,7 @@ static void compare_notify_rs_finalize(SocketReadState
> > > +*notify_rs)
> > >      if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
> > >                             notify_rs->buf,
> > >                             notify_rs->packet_len)) {
> > > -        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> > > +        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0,
> > > + true, false);
> > >          if (ret < 0) {
> > >              error_report("Notify Xen COLO-frame INIT failed");
> > >          }
> > > @@ -1199,6 +1274,18 @@ static void
> > > colo_compare_complete(UserCreatable *uc, Error **errp)
> > >
> > >      QTAILQ_INSERT_TAIL(&net_compares, s, next);
> > >
> > > +    s->out_sendco.s = s;
> > > +    s->out_sendco.chr = &s->chr_out;
> > > +    s->out_sendco.notify_remote_frame = false;
> > > +    s->out_sendco.done = true;
> > > +    g_queue_init(&s->out_sendco.send_list);
> > > +
> > > +    s->notify_sendco.s = s;
> > > +    s->notify_sendco.chr = &s->chr_notify_dev;
> > > +    s->notify_sendco.notify_remote_frame = true;
> > > +    s->notify_sendco.done = true;
> > > +    g_queue_init(&s->notify_sendco.send_list);
> > > +
> >
> > No need to init the notify_sendco each time, because the notify dev just
> an optional parameter.
> > You can use the if (s->notify_dev) here. Just Xen use the chr_notify_dev.
> 
> Ok, I will change that and the code below in the next version.
> 
> > Overall, make the chr_send job to coroutine is a good idea. It looks good
> for me.
> > And your patch inspired me, it looks we can re-use the compare_chr_send
> code on filter mirror/redirector too.
> 
> I already have a patch for that, but I don't think it is a good idea, because the
> guest can then send packets faster than colo-compare can process them. This leads
> to bufferbloat, and the performance drops in my tests:
> Client-to-server tcp:
> without patch: ~66 Mbit/s
> with patch: ~59 Mbit/s
> Server-to-client tcp:
> without patch: ~702 Kbit/s
> with patch: ~328 Kbit/s

Oh, that is a big performance drop. Is it caused by the memcpy/zero_copy parts?

Thanks
Zhang Chen

> 
> Regards,
> Lukas Straub
> 
> > Tested-by: Zhang Chen <chen.zhang@intel.com>
> >
> >
> > >      g_queue_init(&s->conn_list);
> > >
> > >      qemu_mutex_init(&event_mtx);
> > > @@ -1225,8 +1312,9 @@ static void colo_flush_packets(void *opaque,
> > > void
> > > *user_data)
> > >                           pkt->data,
> > >                           pkt->size,
> > >                           pkt->vnet_hdr_len,
> > > -                         false);
> > > -        packet_destroy(pkt, NULL);
> > > +                         false,
> > > +                         true);
> > > +        packet_destroy_partial(pkt, NULL);
> > >      }
> > >      while (!g_queue_is_empty(&conn->secondary_list)) {
> > >          pkt = g_queue_pop_head(&conn->secondary_list);
> > > @@ -1301,10 +1389,19 @@ static void colo_compare_finalize(Object *obj)
> > >          }
> > >      }
> > >
> > > +    AioContext *ctx = iothread_get_aio_context(s->iothread);
> > > +    aio_context_acquire(ctx);
> > > +    AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
> > > +    AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
> >
> > Same as above.
> >
> > > +    aio_context_release(ctx);
> > > +
> > >      /* Release all unhandled packets after compare thead exited */
> > >      g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> > > +    AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
> > >
> > >      g_queue_clear(&s->conn_list);
> > > +    g_queue_clear(&s->out_sendco.send_list);
> > > +    g_queue_clear(&s->notify_sendco.send_list);
> >
> > Same as above.
> >
> > >
> > >      if (s->connection_track_table) {
> > >          g_hash_table_destroy(s->connection_track_table);
> > > diff --git a/net/colo.c b/net/colo.c index 8196b35837..a6c66d829a
> > > 100644
> > > --- a/net/colo.c
> > > +++ b/net/colo.c
> > > @@ -185,6 +185,13 @@ void packet_destroy(void *opaque, void
> *user_data)
> > >      g_slice_free(Packet, pkt);
> > >  }
> > >
> > > +void packet_destroy_partial(void *opaque, void *user_data) {
> > > +    Packet *pkt = opaque;
> > > +
> > > +    g_slice_free(Packet, pkt);
> > > +}
> > > +
> > >  /*
> > >   * Clear hashtable, stop this hash growing really huge
> > >   */
> > > diff --git a/net/colo.h b/net/colo.h index 679314b1ca..573ab91785
> > > 100644
> > > --- a/net/colo.h
> > > +++ b/net/colo.h
> > > @@ -102,5 +102,6 @@ bool connection_has_tracked(GHashTable
> > > *connection_track_table,  void connection_hashtable_reset(GHashTable
> > > *connection_track_table);  Packet *packet_new(const void *data, int
> > > size, int vnet_hdr_len);  void packet_destroy(void *opaque, void
> > > *user_data);
> > > +void packet_destroy_partial(void *opaque, void *user_data);
> > >
> > >  #endif /* NET_COLO_H */
> > > --
> > > 2.20.1
> >
Lukas Straub May 8, 2020, 6:08 a.m. UTC | #4
On Fri, 8 May 2020 02:19:00 +0000
"Zhang, Chen" <chen.zhang@intel.com> wrote:
> > > No need to init the notify_sendco each time, because the notify dev just  
> > an optional parameter.  
> > > You can use the if (s->notify_dev) here. Just Xen use the chr_notify_dev.  
> > 
> > Ok, I will change that and the code below in the next version.
> >   
> > > Overall, make the chr_send job to coroutine is a good idea. It looks good  
> > for me.  
> > > And your patch inspired me, it looks we can re-use the compare_chr_send  
> > code on filter mirror/redirector too.
> > 
> > I already have patch for that, but I don't think it is a good idea, because the
> > guest then can send packets faster than colo-compare can process. This leads
> > bufferbloat and the performance drops in my tests:
> > Client-to-server tcp:
> > without patch: ~66 Mbit/s
> > with patch: ~59 Mbit/s
> > Server-to-client tcp:
> > without patch: ~702 Kbit/s
> > with patch: ~328 Kbit/s  
> 
> Oh, a big performance drop, is that caused by memcpy/zero_copy parts ? 
> 
> Thanks
> Zhang Chen

No, there is no memcpy overhead with this patch, see below.

Regards,
Lukas Straub

---
 net/filter-mirror.c | 142 +++++++++++++++++++++++++++++++++-----------
 1 file changed, 106 insertions(+), 36 deletions(-)

diff --git a/net/filter-mirror.c b/net/filter-mirror.c
index d83e815545..6bcd317502 100644
--- a/net/filter-mirror.c
+++ b/net/filter-mirror.c
@@ -20,6 +20,8 @@
 #include "chardev/char-fe.h"
 #include "qemu/iov.h"
 #include "qemu/sockets.h"
+#include "block/aio-wait.h"
+#include "qemu/coroutine.h"
 
 #define FILTER_MIRROR(obj) \
     OBJECT_CHECK(MirrorState, (obj), TYPE_FILTER_MIRROR)
@@ -31,6 +33,18 @@
 #define TYPE_FILTER_REDIRECTOR "filter-redirector"
 #define REDIRECTOR_MAX_LEN NET_BUFSIZE
 
+typedef struct SendCo {
+    Coroutine *co;
+    GQueue send_list;
+    bool done;
+    int ret;
+} SendCo;
+
+typedef struct SendEntry {
+    ssize_t size;
+    uint8_t buf[];
+} SendEntry;
+
 typedef struct MirrorState {
     NetFilterState parent_obj;
     char *indev;
@@ -38,59 +52,101 @@ typedef struct MirrorState {
     CharBackend chr_in;
     CharBackend chr_out;
     SocketReadState rs;
+    SendCo sendco;
     bool vnet_hdr;
 } MirrorState;
 
-static int filter_send(MirrorState *s,
-                       const struct iovec *iov,
-                       int iovcnt)
+static void coroutine_fn _filter_send(void *opaque)
 {
+    MirrorState *s = opaque;
+    SendCo *sendco = &s->sendco;
     NetFilterState *nf = NETFILTER(s);
     int ret = 0;
-    ssize_t size = 0;
-    uint32_t len = 0;
-    char *buf;
-
-    size = iov_size(iov, iovcnt);
-    if (!size) {
-        return 0;
-    }
 
-    len = htonl(size);
-    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
-    if (ret != sizeof(len)) {
-        goto err;
-    }
+    while (!g_queue_is_empty(&sendco->send_list)) {
+        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+        uint32_t len = htonl(entry->size);
 
-    if (s->vnet_hdr) {
-        /*
-         * If vnet_hdr = on, we send vnet header len to make other
-         * module(like colo-compare) know how to parse net
-         * packet correctly.
-         */
-        ssize_t vnet_hdr_len;
+        ret = qemu_chr_fe_write_all(&s->chr_out,
+                                    (uint8_t *)&len,
+                                    sizeof(len));
+        if (ret != sizeof(len)) {
+            g_free(entry);
+            goto err;
+        }
 
-        vnet_hdr_len = nf->netdev->vnet_hdr_len;
+        if (s->vnet_hdr) {
+            /*
+             * If vnet_hdr = on, we send vnet header len to make other
+             * module(like colo-compare) know how to parse net
+             * packet correctly.
+             */
+
+            len = htonl(nf->netdev->vnet_hdr_len);
+            ret = qemu_chr_fe_write_all(&s->chr_out,
+                                        (uint8_t *)&len,
+                                        sizeof(len));
+            if (ret != sizeof(len)) {
+                g_free(entry);
+                goto err;
+            }
+        }
 
-        len = htonl(vnet_hdr_len);
-        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
-        if (ret != sizeof(len)) {
+        ret = qemu_chr_fe_write_all(&s->chr_out,
+                                    (uint8_t *)entry->buf,
+                                    entry->size);
+        if (ret != entry->size) {
+            g_free(entry);
             goto err;
         }
-    }
 
-    buf = g_malloc(size);
-    iov_to_buf(iov, iovcnt, 0, buf, size);
-    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
-    g_free(buf);
-    if (ret != size) {
-        goto err;
+        g_free(entry);
     }
 
-    return 0;
+    sendco->ret = 0;
+    goto out;
 
 err:
-    return ret < 0 ? ret : -EIO;
+    while (!g_queue_is_empty(&sendco->send_list)) {
+        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+        g_free(entry);
+    }
+    sendco->ret = ret < 0 ? ret : -EIO;
+out:
+    sendco->co = NULL;
+    sendco->done = true;
+    aio_wait_kick();
+}
+
+static int filter_send(MirrorState *s,
+                       const struct iovec *iov,
+                       int iovcnt)
+{
+    SendCo *sendco = &s->sendco;
+    SendEntry *entry;
+
+    ssize_t size = iov_size(iov, iovcnt);
+    if (!size) {
+        return 0;
+    }
+
+    entry = g_malloc(sizeof(SendEntry) + size);
+    entry->size = size;
+    iov_to_buf(iov, iovcnt, 0, entry->buf, size);
+    g_queue_push_head(&sendco->send_list, entry);
+
+    if (sendco->done) {
+        sendco->co = qemu_coroutine_create(_filter_send, s);
+        sendco->done = false;
+        qemu_coroutine_enter(sendco->co);
+        if (sendco->done) {
+            /* report early errors */
+            return sendco->ret;
+        }
+    }
+
+    /* assume success */
+    return 0;
 }
 
 static void redirector_to_filter(NetFilterState *nf,
@@ -194,6 +250,10 @@ static void filter_mirror_cleanup(NetFilterState *nf)
 {
     MirrorState *s = FILTER_MIRROR(nf);
 
+    AIO_WAIT_WHILE(NULL, !s->sendco.done);
+
+    g_queue_clear(&s->sendco.send_list);
+
     qemu_chr_fe_deinit(&s->chr_out, false);
 }
 
@@ -201,6 +261,10 @@ static void filter_redirector_cleanup(NetFilterState *nf)
 {
     MirrorState *s = FILTER_REDIRECTOR(nf);
 
+    AIO_WAIT_WHILE(NULL, !s->sendco.done);
+
+    g_queue_clear(&s->sendco.send_list);
+
     qemu_chr_fe_deinit(&s->chr_in, false);
     qemu_chr_fe_deinit(&s->chr_out, false);
 }
@@ -224,6 +288,9 @@ static void filter_mirror_setup(NetFilterState *nf, Error **errp)
     }
 
     qemu_chr_fe_init(&s->chr_out, chr, errp);
+
+    s->sendco.done = true;
+    g_queue_init(&s->sendco.send_list);
 }
 
 static void redirector_rs_finalize(SocketReadState *rs)
@@ -281,6 +348,9 @@ static void filter_redirector_setup(NetFilterState *nf, Error **errp)
             return;
         }
     }
+
+    s->sendco.done = true;
+    g_queue_init(&s->sendco.send_list);
 }
 
 static void filter_mirror_class_init(ObjectClass *oc, void *data)
Zhang, Chen May 8, 2020, 6:28 a.m. UTC | #5
> -----Original Message-----
> From: Lukas Straub <lukasstraub2@web.de>
> Sent: Friday, May 8, 2020 2:08 PM
> To: Zhang, Chen <chen.zhang@intel.com>
> Cc: qemu-devel <qemu-devel@nongnu.org>; Li Zhijian
> <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc-
> André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini
> <pbonzini@redhat.com>
> Subject: Re: [PATCH v4 3/6] net/colo-compare.c: Fix deadlock in
> compare_chr_send
> 
> On Fri, 8 May 2020 02:19:00 +0000
> "Zhang, Chen" <chen.zhang@intel.com> wrote:
> > > > No need to init the notify_sendco each time, because the notify
> > > > dev just
> > > an optional parameter.
> > > > You can use the if (s->notify_dev) here. Just Xen use the
> chr_notify_dev.
> > >
> > > Ok, I will change that and the code below in the next version.
> > >
> > > > Overall, make the chr_send job to coroutine is a good idea. It
> > > > looks good
> > > for me.
> > > > And your patch inspired me, it looks we can re-use the
> > > > compare_chr_send
> > > code on filter mirror/redirector too.
> > >
> > > I already have patch for that, but I don't think it is a good idea,
> > > because the guest then can send packets faster than colo-compare can
> > > process. This leads bufferbloat and the performance drops in my tests:
> > > Client-to-server tcp:
> > > without patch: ~66 Mbit/s
> > > with patch: ~59 Mbit/s
> > > Server-to-client tcp:
> > > without patch: ~702 Kbit/s
> > > with patch: ~328 Kbit/s
> >
> > Oh, a big performance drop, is that caused by memcpy/zero_copy parts ?
> >
> > Thanks
> > Zhang Chen
> 
> No, there is no memcpy overhead with this patch, see below.

I mean, for the filter mirror/redirector parts, why does the coroutine lead to such a huge performance drop?

Thanks
Zhang Chen

> 
> Regards,
> Lukas Straub
> 
> ---
>  net/filter-mirror.c | 142 +++++++++++++++++++++++++++++++++-----------
>  1 file changed, 106 insertions(+), 36 deletions(-)
> 
> diff --git a/net/filter-mirror.c b/net/filter-mirror.c index
> d83e815545..6bcd317502 100644
> --- a/net/filter-mirror.c
> +++ b/net/filter-mirror.c
> @@ -20,6 +20,8 @@
>  #include "chardev/char-fe.h"
>  #include "qemu/iov.h"
>  #include "qemu/sockets.h"
> +#include "block/aio-wait.h"
> +#include "qemu/coroutine.h"
> 
>  #define FILTER_MIRROR(obj) \
>      OBJECT_CHECK(MirrorState, (obj), TYPE_FILTER_MIRROR) @@ -31,6
> +33,18 @@  #define TYPE_FILTER_REDIRECTOR "filter-redirector"
>  #define REDIRECTOR_MAX_LEN NET_BUFSIZE
> 
> +typedef struct SendCo {
> +    Coroutine *co;
> +    GQueue send_list;
> +    bool done;
> +    int ret;
> +} SendCo;
> +
> +typedef struct SendEntry {
> +    ssize_t size;
> +    uint8_t buf[];
> +} SendEntry;
> +
>  typedef struct MirrorState {
>      NetFilterState parent_obj;
>      char *indev;
> @@ -38,59 +52,101 @@ typedef struct MirrorState {
>      CharBackend chr_in;
>      CharBackend chr_out;
>      SocketReadState rs;
> +    SendCo sendco;
>      bool vnet_hdr;
>  } MirrorState;
> 
> -static int filter_send(MirrorState *s,
> -                       const struct iovec *iov,
> -                       int iovcnt)
> +static void coroutine_fn _filter_send(void *opaque)
>  {
> +    MirrorState *s = opaque;
> +    SendCo *sendco = &s->sendco;
>      NetFilterState *nf = NETFILTER(s);
>      int ret = 0;
> -    ssize_t size = 0;
> -    uint32_t len = 0;
> -    char *buf;
> -
> -    size = iov_size(iov, iovcnt);
> -    if (!size) {
> -        return 0;
> -    }
> 
> -    len = htonl(size);
> -    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
> -    if (ret != sizeof(len)) {
> -        goto err;
> -    }
> +    while (!g_queue_is_empty(&sendco->send_list)) {
> +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> +        uint32_t len = htonl(entry->size);
> 
> -    if (s->vnet_hdr) {
> -        /*
> -         * If vnet_hdr = on, we send vnet header len to make other
> -         * module(like colo-compare) know how to parse net
> -         * packet correctly.
> -         */
> -        ssize_t vnet_hdr_len;
> +        ret = qemu_chr_fe_write_all(&s->chr_out,
> +                                    (uint8_t *)&len,
> +                                    sizeof(len));
> +        if (ret != sizeof(len)) {
> +            g_free(entry);
> +            goto err;
> +        }
> 
> -        vnet_hdr_len = nf->netdev->vnet_hdr_len;
> +        if (s->vnet_hdr) {
> +            /*
> +             * If vnet_hdr = on, we send vnet header len to make other
> +             * module(like colo-compare) know how to parse net
> +             * packet correctly.
> +             */
> +
> +            len = htonl(nf->netdev->vnet_hdr_len);
> +            ret = qemu_chr_fe_write_all(&s->chr_out,
> +                                        (uint8_t *)&len,
> +                                        sizeof(len));
> +            if (ret != sizeof(len)) {
> +                g_free(entry);
> +                goto err;
> +            }
> +        }
> 
> -        len = htonl(vnet_hdr_len);
> -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
> -        if (ret != sizeof(len)) {
> +        ret = qemu_chr_fe_write_all(&s->chr_out,
> +                                    (uint8_t *)entry->buf,
> +                                    entry->size);
> +        if (ret != entry->size) {
> +            g_free(entry);
>              goto err;
>          }
> -    }
> 
> -    buf = g_malloc(size);
> -    iov_to_buf(iov, iovcnt, 0, buf, size);
> -    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
> -    g_free(buf);
> -    if (ret != size) {
> -        goto err;
> +        g_free(entry);
>      }
> 
> -    return 0;
> +    sendco->ret = 0;
> +    goto out;
> 
>  err:
> -    return ret < 0 ? ret : -EIO;
> +    while (!g_queue_is_empty(&sendco->send_list)) {
> +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> +        g_free(entry);
> +    }
> +    sendco->ret = ret < 0 ? ret : -EIO;
> +out:
> +    sendco->co = NULL;
> +    sendco->done = true;
> +    aio_wait_kick();
> +}
> +
> +static int filter_send(MirrorState *s,
> +                       const struct iovec *iov,
> +                       int iovcnt)
> +{
> +    SendCo *sendco = &s->sendco;
> +    SendEntry *entry;
> +
> +    ssize_t size = iov_size(iov, iovcnt);
> +    if (!size) {
> +        return 0;
> +    }
> +
> +    entry = g_malloc(sizeof(SendEntry) + size);
> +    entry->size = size;
> +    iov_to_buf(iov, iovcnt, 0, entry->buf, size);
> +    g_queue_push_head(&sendco->send_list, entry);
> +
> +    if (sendco->done) {
> +        sendco->co = qemu_coroutine_create(_filter_send, s);
> +        sendco->done = false;
> +        qemu_coroutine_enter(sendco->co);
> +        if (sendco->done) {
> +            /* report early errors */
> +            return sendco->ret;
> +        }
> +    }
> +
> +    /* assume success */
> +    return 0;
>  }
> 
>  static void redirector_to_filter(NetFilterState *nf, @@ -194,6 +250,10 @@
> static void filter_mirror_cleanup(NetFilterState *nf)  {
>      MirrorState *s = FILTER_MIRROR(nf);
> 
> +    AIO_WAIT_WHILE(NULL, !s->sendco.done);
> +
> +    g_queue_clear(&s->sendco.send_list);
> +
>      qemu_chr_fe_deinit(&s->chr_out, false);  }
> 
> @@ -201,6 +261,10 @@ static void filter_redirector_cleanup(NetFilterState
> *nf)  {
>      MirrorState *s = FILTER_REDIRECTOR(nf);
> 
> +    AIO_WAIT_WHILE(NULL, !s->sendco.done);
> +
> +    g_queue_clear(&s->sendco.send_list);
> +
>      qemu_chr_fe_deinit(&s->chr_in, false);
>      qemu_chr_fe_deinit(&s->chr_out, false);  } @@ -224,6 +288,9 @@ static
> void filter_mirror_setup(NetFilterState *nf, Error **errp)
>      }
> 
>      qemu_chr_fe_init(&s->chr_out, chr, errp);
> +
> +    s->sendco.done = true;
> +    g_queue_init(&s->sendco.send_list);
>  }
> 
>  static void redirector_rs_finalize(SocketReadState *rs) @@ -281,6 +348,9
> @@ static void filter_redirector_setup(NetFilterState *nf, Error **errp)
>              return;
>          }
>      }
> +
> +    s->sendco.done = true;
> +    g_queue_init(&s->sendco.send_list);
>  }
> 
>  static void filter_mirror_class_init(ObjectClass *oc, void *data)
> --
> 2.20.1
Lukas Straub May 8, 2020, 7:56 a.m. UTC | #6
On Fri, 8 May 2020 06:28:45 +0000
"Zhang, Chen" <chen.zhang@intel.com> wrote:

> > -----Original Message-----
> > From: Lukas Straub <lukasstraub2@web.de>
> > Sent: Friday, May 8, 2020 2:08 PM
> > To: Zhang, Chen <chen.zhang@intel.com>
> > Cc: qemu-devel <qemu-devel@nongnu.org>; Li Zhijian
> > <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc-
> > André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini
> > <pbonzini@redhat.com>
> > Subject: Re: [PATCH v4 3/6] net/colo-compare.c: Fix deadlock in
> > compare_chr_send
> > 
> > On Fri, 8 May 2020 02:19:00 +0000
> > "Zhang, Chen" <chen.zhang@intel.com> wrote:  
> > > > > No need to init the notify_sendco each time, because the notify
> > > > > dev just  
> > > > an optional parameter.  
> > > > > You can use the if (s->notify_dev) here. Just Xen use the  
> > chr_notify_dev.  
> > > >
> > > > Ok, I will change that and the code below in the next version.
> > > >  
> > > > > Overall, make the chr_send job to coroutine is a good idea. It
> > > > > looks good  
> > > > for me.  
> > > > > And your patch inspired me, it looks we can re-use the
> > > > > compare_chr_send  
> > > > code on filter mirror/redirector too.
> > > >
> > > > I already have patch for that, but I don't think it is a good idea,
> > > > because the guest then can send packets faster than colo-compare can
> > > > process. This leads bufferbloat and the performance drops in my tests:
> > > > Client-to-server tcp:
> > > > without patch: ~66 Mbit/s
> > > > with patch: ~59 Mbit/s
> > > > Server-to-client tcp:
> > > > without patch: ~702 Kbit/s
> > > > with patch: ~328 Kbit/s  
> > >
> > > Oh, a big performance drop, is that caused by memcpy/zero_copy parts ?
> > >
> > > Thanks
> > > Zhang Chen  
> > 
> > No, there is no memcpy overhead with this patch, see below.  
> 
> I means for the filter mirror/redirector parts why coroutine will lead huge performance drop?

It's because having an additional buffer before the network bottleneck (colo-compare in our case) confuses TCP's congestion control:
TCP will speed up the data transfer until packets start to drop (or the network interface is blocked). This feedback has to be quick so that TCP can select a suitable transfer speed. But with the patch, the guest will fill the buffer as fast as it can (it does not "see" the slow bandwidth of colo-compare behind the buffer) until it hits the TCP congestion window. At this point TCP drastically reduces its transfer speed, and the speed stays low because the full buffer delays the packets, so the sender doesn't receive ACKs and can't speed up the transfer again until the buffer is empty (which can take up to a second in my tests). Then this cycle repeats.

Regards,
Lukas Straub

> Thanks
> Zhang Chen
> 
> > 
> > Regards,
> > Lukas Straub
> > 
> > ---
> >  net/filter-mirror.c | 142 +++++++++++++++++++++++++++++++++-----------
> >  1 file changed, 106 insertions(+), 36 deletions(-)
> > 
> > diff --git a/net/filter-mirror.c b/net/filter-mirror.c index
> > d83e815545..6bcd317502 100644
> > --- a/net/filter-mirror.c
> > +++ b/net/filter-mirror.c
> > @@ -20,6 +20,8 @@
> >  #include "chardev/char-fe.h"
> >  #include "qemu/iov.h"
> >  #include "qemu/sockets.h"
> > +#include "block/aio-wait.h"
> > +#include "qemu/coroutine.h"
> > 
> >  #define FILTER_MIRROR(obj) \
> >      OBJECT_CHECK(MirrorState, (obj), TYPE_FILTER_MIRROR) @@ -31,6
> > +33,18 @@  #define TYPE_FILTER_REDIRECTOR "filter-redirector"
> >  #define REDIRECTOR_MAX_LEN NET_BUFSIZE
> > 
> > +typedef struct SendCo {
> > +    Coroutine *co;
> > +    GQueue send_list;
> > +    bool done;
> > +    int ret;
> > +} SendCo;
> > +
> > +typedef struct SendEntry {
> > +    ssize_t size;
> > +    uint8_t buf[];
> > +} SendEntry;
> > +
> >  typedef struct MirrorState {
> >      NetFilterState parent_obj;
> >      char *indev;
> > @@ -38,59 +52,101 @@ typedef struct MirrorState {
> >      CharBackend chr_in;
> >      CharBackend chr_out;
> >      SocketReadState rs;
> > +    SendCo sendco;
> >      bool vnet_hdr;
> >  } MirrorState;
> > 
> > -static int filter_send(MirrorState *s,
> > -                       const struct iovec *iov,
> > -                       int iovcnt)
> > +static void coroutine_fn _filter_send(void *opaque)
> >  {
> > +    MirrorState *s = opaque;
> > +    SendCo *sendco = &s->sendco;
> >      NetFilterState *nf = NETFILTER(s);
> >      int ret = 0;
> > -    ssize_t size = 0;
> > -    uint32_t len = 0;
> > -    char *buf;
> > -
> > -    size = iov_size(iov, iovcnt);
> > -    if (!size) {
> > -        return 0;
> > -    }
> > 
> > -    len = htonl(size);
> > -    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
> > -    if (ret != sizeof(len)) {
> > -        goto err;
> > -    }
> > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > +        uint32_t len = htonl(entry->size);
> > 
> > -    if (s->vnet_hdr) {
> > -        /*
> > -         * If vnet_hdr = on, we send vnet header len to make other
> > -         * module(like colo-compare) know how to parse net
> > -         * packet correctly.
> > -         */
> > -        ssize_t vnet_hdr_len;
> > +        ret = qemu_chr_fe_write_all(&s->chr_out,
> > +                                    (uint8_t *)&len,
> > +                                    sizeof(len));
> > +        if (ret != sizeof(len)) {
> > +            g_free(entry);
> > +            goto err;
> > +        }
> > 
> > -        vnet_hdr_len = nf->netdev->vnet_hdr_len;
> > +        if (s->vnet_hdr) {
> > +            /*
> > +             * If vnet_hdr = on, we send vnet header len to make other
> > +             * module(like colo-compare) know how to parse net
> > +             * packet correctly.
> > +             */
> > +
> > +            len = htonl(nf->netdev->vnet_hdr_len);
> > +            ret = qemu_chr_fe_write_all(&s->chr_out,
> > +                                        (uint8_t *)&len,
> > +                                        sizeof(len));
> > +            if (ret != sizeof(len)) {
> > +                g_free(entry);
> > +                goto err;
> > +            }
> > +        }
> > 
> > -        len = htonl(vnet_hdr_len);
> > -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
> > -        if (ret != sizeof(len)) {
> > +        ret = qemu_chr_fe_write_all(&s->chr_out,
> > +                                    (uint8_t *)entry->buf,
> > +                                    entry->size);
> > +        if (ret != entry->size) {
> > +            g_free(entry);
> >              goto err;
> >          }
> > -    }
> > 
> > -    buf = g_malloc(size);
> > -    iov_to_buf(iov, iovcnt, 0, buf, size);
> > -    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
> > -    g_free(buf);
> > -    if (ret != size) {
> > -        goto err;
> > +        g_free(entry);
> >      }
> > 
> > -    return 0;
> > +    sendco->ret = 0;
> > +    goto out;
> > 
> >  err:
> > -    return ret < 0 ? ret : -EIO;
> > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > +        g_free(entry);
> > +    }
> > +    sendco->ret = ret < 0 ? ret : -EIO;
> > +out:
> > +    sendco->co = NULL;
> > +    sendco->done = true;
> > +    aio_wait_kick();
> > +}
> > +
> > +static int filter_send(MirrorState *s,
> > +                       const struct iovec *iov,
> > +                       int iovcnt)
> > +{
> > +    SendCo *sendco = &s->sendco;
> > +    SendEntry *entry;
> > +
> > +    ssize_t size = iov_size(iov, iovcnt);
> > +    if (!size) {
> > +        return 0;
> > +    }
> > +
> > +    entry = g_malloc(sizeof(SendEntry) + size);
> > +    entry->size = size;
> > +    iov_to_buf(iov, iovcnt, 0, entry->buf, size);
> > +    g_queue_push_head(&sendco->send_list, entry);
> > +
> > +    if (sendco->done) {
> > +        sendco->co = qemu_coroutine_create(_filter_send, s);
> > +        sendco->done = false;
> > +        qemu_coroutine_enter(sendco->co);
> > +        if (sendco->done) {
> > +            /* report early errors */
> > +            return sendco->ret;
> > +        }
> > +    }
> > +
> > +    /* assume success */
> > +    return 0;
> >  }
> > 
> >  static void redirector_to_filter(NetFilterState *nf, @@ -194,6 +250,10 @@
> > static void filter_mirror_cleanup(NetFilterState *nf)  {
> >      MirrorState *s = FILTER_MIRROR(nf);
> > 
> > +    AIO_WAIT_WHILE(NULL, !s->sendco.done);
> > +
> > +    g_queue_clear(&s->sendco.send_list);
> > +
> >      qemu_chr_fe_deinit(&s->chr_out, false);  }
> > 
> > @@ -201,6 +261,10 @@ static void filter_redirector_cleanup(NetFilterState
> > *nf)  {
> >      MirrorState *s = FILTER_REDIRECTOR(nf);
> > 
> > +    AIO_WAIT_WHILE(NULL, !s->sendco.done);
> > +
> > +    g_queue_clear(&s->sendco.send_list);
> > +
> >      qemu_chr_fe_deinit(&s->chr_in, false);
> >      qemu_chr_fe_deinit(&s->chr_out, false);  } @@ -224,6 +288,9 @@ static
> > void filter_mirror_setup(NetFilterState *nf, Error **errp)
> >      }
> > 
> >      qemu_chr_fe_init(&s->chr_out, chr, errp);
> > +
> > +    s->sendco.done = true;
> > +    g_queue_init(&s->sendco.send_list);
> >  }
> > 
> >  static void redirector_rs_finalize(SocketReadState *rs) @@ -281,6 +348,9
> > @@ static void filter_redirector_setup(NetFilterState *nf, Error **errp)
> >              return;
> >          }
> >      }
> > +
> > +    s->sendco.done = true;
> > +    g_queue_init(&s->sendco.send_list);
> >  }
> > 
> >  static void filter_mirror_class_init(ObjectClass *oc, void *data)
> > --
> > 2.20.1  
>
Zhang, Chen May 11, 2020, 8:30 a.m. UTC | #7
> -----Original Message-----
> From: Lukas Straub <lukasstraub2@web.de>
> Sent: Friday, May 8, 2020 3:56 PM
> To: Zhang, Chen <chen.zhang@intel.com>
> Cc: qemu-devel <qemu-devel@nongnu.org>; Li Zhijian
> <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc-
> André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini
> <pbonzini@redhat.com>
> Subject: Re: [PATCH v4 3/6] net/colo-compare.c: Fix deadlock in
> compare_chr_send
> 
> On Fri, 8 May 2020 06:28:45 +0000
> "Zhang, Chen" <chen.zhang@intel.com> wrote:
> 
> > > -----Original Message-----
> > > From: Lukas Straub <lukasstraub2@web.de>
> > > Sent: Friday, May 8, 2020 2:08 PM
> > > To: Zhang, Chen <chen.zhang@intel.com>
> > > Cc: qemu-devel <qemu-devel@nongnu.org>; Li Zhijian
> > > <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc-
> > > André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini
> > > <pbonzini@redhat.com>
> > > Subject: Re: [PATCH v4 3/6] net/colo-compare.c: Fix deadlock in
> > > compare_chr_send
> > >
> > > On Fri, 8 May 2020 02:19:00 +0000
> > > "Zhang, Chen" <chen.zhang@intel.com> wrote:
> > > > > > No need to init the notify_sendco each time, because the
> > > > > > notify dev just
> > > > > an optional parameter.
> > > > > > You can use the if (s->notify_dev) here. Just Xen use the
> > > chr_notify_dev.
> > > > >
> > > > > Ok, I will change that and the code below in the next version.
> > > > >
> > > > > > Overall, make the chr_send job to coroutine is a good idea. It
> > > > > > looks good
> > > > > for me.
> > > > > > And your patch inspired me, it looks we can re-use the
> > > > > > compare_chr_send
> > > > > code on filter mirror/redirector too.
> > > > >
> > > > > I already have patch for that, but I don't think it is a good
> > > > > idea, because the guest then can send packets faster than
> > > > > colo-compare can process. This leads to bufferbloat and the
> performance drops in my tests:
> > > > > Client-to-server tcp:
> > > > > without patch: ~66 Mbit/s
> > > > > with patch: ~59 Mbit/s
> > > > > Server-to-client tcp:
> > > > > without patch: ~702 Kbit/s
> > > > > with patch: ~328 Kbit/s
> > > >
> > > > Oh, a big performance drop, is that caused by memcpy/zero_copy
> parts ?
> > > >
> > > > Thanks
> > > > Zhang Chen
> > >
> > > No, there is no memcpy overhead with this patch, see below.
> >
> > I mean, for the filter mirror/redirector parts, why will the coroutine lead to a huge
> performance drop?
> 
> It's because having an additional buffer before the network bottleneck (colo-
> compare in our case) confuses TCP's congestion-control:
> TCP will speed up the data transfer until packets start to drop (or the
> network interface is blocked). This feedback has to be quick so TCP can select
> a suitable transfer speed. But with the patch, the guest will fill the buffer as
> fast as it can (it does not "see" the slow bandwidth of colo-compare behind
> the buffer) until it hits the TCP congestion window. At this point TCP
> drastically reduces its transfer speed, and it stays low because the full buffer
> delays the packets, so it doesn't receive ACKs and can't speed up the
> transfer again until the buffer is empty (which can take up to a second in my
> tests). Then this cycle repeats.

Makes sense!
After fixing the above issue:
Reviewed-by: Zhang Chen <chen.zhang@intel.com>

Thanks
Zhang Chen

> 
> Regards,
> Lukas Straub
> 
> > Thanks
> > Zhang Chen
> >
> > >
> > > Regards,
> > > Lukas Straub
> > >
> > > ---
> > >  net/filter-mirror.c | 142
> > > +++++++++++++++++++++++++++++++++-----------
> > >  1 file changed, 106 insertions(+), 36 deletions(-)
> > >
> > > diff --git a/net/filter-mirror.c b/net/filter-mirror.c index
> > > d83e815545..6bcd317502 100644
> > > --- a/net/filter-mirror.c
> > > +++ b/net/filter-mirror.c
> > > @@ -20,6 +20,8 @@
> > >  #include "chardev/char-fe.h"
> > >  #include "qemu/iov.h"
> > >  #include "qemu/sockets.h"
> > > +#include "block/aio-wait.h"
> > > +#include "qemu/coroutine.h"
> > >
> > >  #define FILTER_MIRROR(obj) \
> > >      OBJECT_CHECK(MirrorState, (obj), TYPE_FILTER_MIRROR) @@ -31,6
> > > +33,18 @@  #define TYPE_FILTER_REDIRECTOR "filter-redirector"
> > >  #define REDIRECTOR_MAX_LEN NET_BUFSIZE
> > >
> > > +typedef struct SendCo {
> > > +    Coroutine *co;
> > > +    GQueue send_list;
> > > +    bool done;
> > > +    int ret;
> > > +} SendCo;
> > > +
> > > +typedef struct SendEntry {
> > > +    ssize_t size;
> > > +    uint8_t buf[];
> > > +} SendEntry;
> > > +
> > >  typedef struct MirrorState {
> > >      NetFilterState parent_obj;
> > >      char *indev;
> > > @@ -38,59 +52,101 @@ typedef struct MirrorState {
> > >      CharBackend chr_in;
> > >      CharBackend chr_out;
> > >      SocketReadState rs;
> > > +    SendCo sendco;
> > >      bool vnet_hdr;
> > >  } MirrorState;
> > >
> > > -static int filter_send(MirrorState *s,
> > > -                       const struct iovec *iov,
> > > -                       int iovcnt)
> > > +static void coroutine_fn _filter_send(void *opaque)
> > >  {
> > > +    MirrorState *s = opaque;
> > > +    SendCo *sendco = &s->sendco;
> > >      NetFilterState *nf = NETFILTER(s);
> > >      int ret = 0;
> > > -    ssize_t size = 0;
> > > -    uint32_t len = 0;
> > > -    char *buf;
> > > -
> > > -    size = iov_size(iov, iovcnt);
> > > -    if (!size) {
> > > -        return 0;
> > > -    }
> > >
> > > -    len = htonl(size);
> > > -    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len,
> sizeof(len));
> > > -    if (ret != sizeof(len)) {
> > > -        goto err;
> > > -    }
> > > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > > +        uint32_t len = htonl(entry->size);
> > >
> > > -    if (s->vnet_hdr) {
> > > -        /*
> > > -         * If vnet_hdr = on, we send vnet header len to make other
> > > -         * module(like colo-compare) know how to parse net
> > > -         * packet correctly.
> > > -         */
> > > -        ssize_t vnet_hdr_len;
> > > +        ret = qemu_chr_fe_write_all(&s->chr_out,
> > > +                                    (uint8_t *)&len,
> > > +                                    sizeof(len));
> > > +        if (ret != sizeof(len)) {
> > > +            g_free(entry);
> > > +            goto err;
> > > +        }
> > >
> > > -        vnet_hdr_len = nf->netdev->vnet_hdr_len;
> > > +        if (s->vnet_hdr) {
> > > +            /*
> > > +             * If vnet_hdr = on, we send vnet header len to make other
> > > +             * module(like colo-compare) know how to parse net
> > > +             * packet correctly.
> > > +             */
> > > +
> > > +            len = htonl(nf->netdev->vnet_hdr_len);
> > > +            ret = qemu_chr_fe_write_all(&s->chr_out,
> > > +                                        (uint8_t *)&len,
> > > +                                        sizeof(len));
> > > +            if (ret != sizeof(len)) {
> > > +                g_free(entry);
> > > +                goto err;
> > > +            }
> > > +        }
> > >
> > > -        len = htonl(vnet_hdr_len);
> > > -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len,
> sizeof(len));
> > > -        if (ret != sizeof(len)) {
> > > +        ret = qemu_chr_fe_write_all(&s->chr_out,
> > > +                                    (uint8_t *)entry->buf,
> > > +                                    entry->size);
> > > +        if (ret != entry->size) {
> > > +            g_free(entry);
> > >              goto err;
> > >          }
> > > -    }
> > >
> > > -    buf = g_malloc(size);
> > > -    iov_to_buf(iov, iovcnt, 0, buf, size);
> > > -    ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
> > > -    g_free(buf);
> > > -    if (ret != size) {
> > > -        goto err;
> > > +        g_free(entry);
> > >      }
> > >
> > > -    return 0;
> > > +    sendco->ret = 0;
> > > +    goto out;
> > >
> > >  err:
> > > -    return ret < 0 ? ret : -EIO;
> > > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > > +        g_free(entry);
> > > +    }
> > > +    sendco->ret = ret < 0 ? ret : -EIO;
> > > +out:
> > > +    sendco->co = NULL;
> > > +    sendco->done = true;
> > > +    aio_wait_kick();
> > > +}
> > > +
> > > +static int filter_send(MirrorState *s,
> > > +                       const struct iovec *iov,
> > > +                       int iovcnt)
> > > +{
> > > +    SendCo *sendco = &s->sendco;
> > > +    SendEntry *entry;
> > > +
> > > +    ssize_t size = iov_size(iov, iovcnt);
> > > +    if (!size) {
> > > +        return 0;
> > > +    }
> > > +
> > > +    entry = g_malloc(sizeof(SendEntry) + size);
> > > +    entry->size = size;
> > > +    iov_to_buf(iov, iovcnt, 0, entry->buf, size);
> > > +    g_queue_push_head(&sendco->send_list, entry);
> > > +
> > > +    if (sendco->done) {
> > > +        sendco->co = qemu_coroutine_create(_filter_send, s);
> > > +        sendco->done = false;
> > > +        qemu_coroutine_enter(sendco->co);
> > > +        if (sendco->done) {
> > > +            /* report early errors */
> > > +            return sendco->ret;
> > > +        }
> > > +    }
> > > +
> > > +    /* assume success */
> > > +    return 0;
> > >  }
> > >
> > >  static void redirector_to_filter(NetFilterState *nf, @@ -194,6
> > > +250,10 @@ static void filter_mirror_cleanup(NetFilterState *nf)  {
> > >      MirrorState *s = FILTER_MIRROR(nf);
> > >
> > > +    AIO_WAIT_WHILE(NULL, !s->sendco.done);
> > > +
> > > +    g_queue_clear(&s->sendco.send_list);
> > > +
> > >      qemu_chr_fe_deinit(&s->chr_out, false);  }
> > >
> > > @@ -201,6 +261,10 @@ static void
> > > filter_redirector_cleanup(NetFilterState
> > > *nf)  {
> > >      MirrorState *s = FILTER_REDIRECTOR(nf);
> > >
> > > +    AIO_WAIT_WHILE(NULL, !s->sendco.done);
> > > +
> > > +    g_queue_clear(&s->sendco.send_list);
> > > +
> > >      qemu_chr_fe_deinit(&s->chr_in, false);
> > >      qemu_chr_fe_deinit(&s->chr_out, false);  } @@ -224,6 +288,9 @@
> > > static void filter_mirror_setup(NetFilterState *nf, Error **errp)
> > >      }
> > >
> > >      qemu_chr_fe_init(&s->chr_out, chr, errp);
> > > +
> > > +    s->sendco.done = true;
> > > +    g_queue_init(&s->sendco.send_list);
> > >  }
> > >
> > >  static void redirector_rs_finalize(SocketReadState *rs) @@ -281,6
> > > +348,9 @@ static void filter_redirector_setup(NetFilterState *nf, Error
> **errp)
> > >              return;
> > >          }
> > >      }
> > > +
> > > +    s->sendco.done = true;
> > > +    g_queue_init(&s->sendco.send_list);
> > >  }
> > >
> > >  static void filter_mirror_class_init(ObjectClass *oc, void *data)
> > > --
> > > 2.20.1
> >
diff mbox series

Patch

diff --git a/net/colo-compare.c b/net/colo-compare.c
index 1de4220fe2..2a4e7f7c4e 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -32,6 +32,9 @@ 
 #include "migration/migration.h"
 #include "util.h"
 
+#include "block/aio-wait.h"
+#include "qemu/coroutine.h"
+
 #define TYPE_COLO_COMPARE "colo-compare"
 #define COLO_COMPARE(obj) \
     OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
@@ -77,6 +80,23 @@  static int event_unhandled_count;
  *                    |packet  |  |packet  +    |packet  | |packet  +
  *                    +--------+  +--------+    +--------+ +--------+
  */
+
+typedef struct SendCo {
+    Coroutine *co;
+    struct CompareState *s;
+    CharBackend *chr;
+    GQueue send_list;
+    bool notify_remote_frame;
+    bool done;
+    int ret;
+} SendCo;
+
+typedef struct SendEntry {
+    uint32_t size;
+    uint32_t vnet_hdr_len;
+    uint8_t *buf;
+} SendEntry;
+
 typedef struct CompareState {
     Object parent;
 
@@ -91,6 +111,8 @@  typedef struct CompareState {
     SocketReadState pri_rs;
     SocketReadState sec_rs;
     SocketReadState notify_rs;
+    SendCo out_sendco;
+    SendCo notify_sendco;
     bool vnet_hdr;
     uint32_t compare_timeout;
     uint32_t expired_scan_cycle;
@@ -124,10 +146,11 @@  enum {
 
 
 static int compare_chr_send(CompareState *s,
-                            const uint8_t *buf,
+                            uint8_t *buf,
                             uint32_t size,
                             uint32_t vnet_hdr_len,
-                            bool notify_remote_frame);
+                            bool notify_remote_frame,
+                            bool zero_copy);
 
 static bool packet_matches_str(const char *str,
                                const uint8_t *buf,
@@ -145,7 +168,7 @@  static void notify_remote_frame(CompareState *s)
     char msg[] = "DO_CHECKPOINT";
     int ret = 0;
 
-    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
+    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
     if (ret < 0) {
         error_report("Notify Xen COLO-frame failed");
     }
@@ -272,12 +295,13 @@  static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
                            pkt->data,
                            pkt->size,
                            pkt->vnet_hdr_len,
-                           false);
+                           false,
+                           true);
     if (ret < 0) {
         error_report("colo send primary packet failed");
     }
     trace_colo_compare_main("packet same and release packet");
-    packet_destroy(pkt, NULL);
+    packet_destroy_partial(pkt, NULL);
 }
 
 /*
@@ -699,65 +723,115 @@  static void colo_compare_connection(void *opaque, void *user_data)
     }
 }
 
-static int compare_chr_send(CompareState *s,
-                            const uint8_t *buf,
-                            uint32_t size,
-                            uint32_t vnet_hdr_len,
-                            bool notify_remote_frame)
+static void coroutine_fn _compare_chr_send(void *opaque)
 {
+    SendCo *sendco = opaque;
+    CompareState *s = sendco->s;
     int ret = 0;
-    uint32_t len = htonl(size);
 
-    if (!size) {
-        return 0;
-    }
+    while (!g_queue_is_empty(&sendco->send_list)) {
+        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+        uint32_t len = htonl(entry->size);
 
-    if (notify_remote_frame) {
-        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
-                                    (uint8_t *)&len,
-                                    sizeof(len));
-    } else {
-        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
-    }
+        ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
 
-    if (ret != sizeof(len)) {
-        goto err;
-    }
+        if (ret != sizeof(len)) {
+            g_free(entry->buf);
+            g_slice_free(SendEntry, entry);
+            goto err;
+        }
 
-    if (s->vnet_hdr) {
-        /*
-         * We send vnet header len make other module(like filter-redirector)
-         * know how to parse net packet correctly.
-         */
-        len = htonl(vnet_hdr_len);
+        if (!sendco->notify_remote_frame && s->vnet_hdr) {
+            /*
+             * We send vnet header len make other module(like filter-redirector)
+             * know how to parse net packet correctly.
+             */
+            len = htonl(entry->vnet_hdr_len);
 
-        if (!notify_remote_frame) {
-            ret = qemu_chr_fe_write_all(&s->chr_out,
+            ret = qemu_chr_fe_write_all(sendco->chr,
                                         (uint8_t *)&len,
                                         sizeof(len));
+
+            if (ret != sizeof(len)) {
+                g_free(entry->buf);
+                g_slice_free(SendEntry, entry);
+                goto err;
+            }
         }
 
-        if (ret != sizeof(len)) {
+        ret = qemu_chr_fe_write_all(sendco->chr,
+                                    (uint8_t *)entry->buf,
+                                    entry->size);
+
+        if (ret != entry->size) {
+            g_free(entry->buf);
+            g_slice_free(SendEntry, entry);
             goto err;
         }
+
+        g_free(entry->buf);
+        g_slice_free(SendEntry, entry);
     }
 
+    sendco->ret = 0;
+    goto out;
+
+err:
+    while (!g_queue_is_empty(&sendco->send_list)) {
+        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+        g_free(entry->buf);
+        g_slice_free(SendEntry, entry);
+    }
+    sendco->ret = ret < 0 ? ret : -EIO;
+out:
+    sendco->co = NULL;
+    sendco->done = true;
+    aio_wait_kick();
+}
+
+static int compare_chr_send(CompareState *s,
+                            uint8_t *buf,
+                            uint32_t size,
+                            uint32_t vnet_hdr_len,
+                            bool notify_remote_frame,
+                            bool zero_copy)
+{
+    SendCo *sendco;
+    SendEntry *entry;
+
     if (notify_remote_frame) {
-        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
-                                    (uint8_t *)buf,
-                                    size);
+        sendco = &s->notify_sendco;
     } else {
-        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
+        sendco = &s->out_sendco;
     }
 
-    if (ret != size) {
-        goto err;
+    if (!size) {
+        return 0;
     }
 
-    return 0;
+    entry = g_slice_new(SendEntry);
+    entry->size = size;
+    entry->vnet_hdr_len = vnet_hdr_len;
+    if (zero_copy) {
+        entry->buf = buf;
+    } else {
+        entry->buf = g_malloc(size);
+        memcpy(entry->buf, buf, size);
+    }
+    g_queue_push_head(&sendco->send_list, entry);
+
+    if (sendco->done) {
+        sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
+        sendco->done = false;
+        qemu_coroutine_enter(sendco->co);
+        if (sendco->done) {
+            /* report early errors */
+            return sendco->ret;
+        }
+    }
 
-err:
-    return ret < 0 ? ret : -EIO;
+    /* assume success */
+    return 0;
 }
 
 static int compare_chr_can_read(void *opaque)
@@ -1063,6 +1137,7 @@  static void compare_pri_rs_finalize(SocketReadState *pri_rs)
                          pri_rs->buf,
                          pri_rs->packet_len,
                          pri_rs->vnet_hdr_len,
+                         false,
                          false);
     } else {
         /* compare packet in the specified connection */
@@ -1093,7 +1168,7 @@  static void compare_notify_rs_finalize(SocketReadState *notify_rs)
     if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
                            notify_rs->buf,
                            notify_rs->packet_len)) {
-        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
+        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
         if (ret < 0) {
             error_report("Notify Xen COLO-frame INIT failed");
         }
@@ -1199,6 +1274,18 @@  static void colo_compare_complete(UserCreatable *uc, Error **errp)
 
     QTAILQ_INSERT_TAIL(&net_compares, s, next);
 
+    s->out_sendco.s = s;
+    s->out_sendco.chr = &s->chr_out;
+    s->out_sendco.notify_remote_frame = false;
+    s->out_sendco.done = true;
+    g_queue_init(&s->out_sendco.send_list);
+
+    s->notify_sendco.s = s;
+    s->notify_sendco.chr = &s->chr_notify_dev;
+    s->notify_sendco.notify_remote_frame = true;
+    s->notify_sendco.done = true;
+    g_queue_init(&s->notify_sendco.send_list);
+
     g_queue_init(&s->conn_list);
 
     qemu_mutex_init(&event_mtx);
@@ -1225,8 +1312,9 @@  static void colo_flush_packets(void *opaque, void *user_data)
                          pkt->data,
                          pkt->size,
                          pkt->vnet_hdr_len,
-                         false);
-        packet_destroy(pkt, NULL);
+                         false,
+                         true);
+        packet_destroy_partial(pkt, NULL);
     }
     while (!g_queue_is_empty(&conn->secondary_list)) {
         pkt = g_queue_pop_head(&conn->secondary_list);
@@ -1301,10 +1389,19 @@  static void colo_compare_finalize(Object *obj)
         }
     }
 
+    AioContext *ctx = iothread_get_aio_context(s->iothread);
+    aio_context_acquire(ctx);
+    AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
+    AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
+    aio_context_release(ctx);
+
     /* Release all unhandled packets after compare thead exited */
     g_queue_foreach(&s->conn_list, colo_flush_packets, s);
+    AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
 
     g_queue_clear(&s->conn_list);
+    g_queue_clear(&s->out_sendco.send_list);
+    g_queue_clear(&s->notify_sendco.send_list);
 
     if (s->connection_track_table) {
         g_hash_table_destroy(s->connection_track_table);
diff --git a/net/colo.c b/net/colo.c
index 8196b35837..a6c66d829a 100644
--- a/net/colo.c
+++ b/net/colo.c
@@ -185,6 +185,13 @@  void packet_destroy(void *opaque, void *user_data)
     g_slice_free(Packet, pkt);
 }
 
+void packet_destroy_partial(void *opaque, void *user_data)
+{
+    Packet *pkt = opaque;
+
+    g_slice_free(Packet, pkt);
+}
+
 /*
  * Clear hashtable, stop this hash growing really huge
  */
diff --git a/net/colo.h b/net/colo.h
index 679314b1ca..573ab91785 100644
--- a/net/colo.h
+++ b/net/colo.h
@@ -102,5 +102,6 @@  bool connection_has_tracked(GHashTable *connection_track_table,
 void connection_hashtable_reset(GHashTable *connection_track_table);
 Packet *packet_new(const void *data, int size, int vnet_hdr_len);
 void packet_destroy(void *opaque, void *user_data);
+void packet_destroy_partial(void *opaque, void *user_data);
 
 #endif /* NET_COLO_H */