diff mbox series

[v3,3/6] net/colo-compare.c: Fix deadlock in compare_chr_send

Message ID 6f3906393aaaf0adf21d45a5bf7a41536c7de205.1587935686.git.lukasstraub2@web.de
State New
Headers show
Series colo-compare bugfixes | expand

Commit Message

Lukas Straub April 26, 2020, 9:18 p.m. UTC
The chr_out chardev is connected to a filter-redirector
running in the main loop. qemu_chr_fe_write_all might block
here in compare_chr_send if the (socket-)buffer is full.
If another filter-redirector in the main loop want's to
send data to chr_pri_in it might also block if the buffer
is full. This leads to a deadlock because both event loops
get blocked.

Fix this by converting compare_chr_send to a coroutine and
putting the packets in a send queue. Also create a new
function notify_chr_send, since that should be independend.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 net/colo-compare.c | 173 ++++++++++++++++++++++++++++++++++-----------
 1 file changed, 130 insertions(+), 43 deletions(-)

Comments

Zhang, Chen April 27, 2020, 3:36 a.m. UTC | #1
> -----Original Message-----
> From: Lukas Straub <lukasstraub2@web.de>
> Sent: Monday, April 27, 2020 5:19 AM
> To: qemu-devel <qemu-devel@nongnu.org>
> Cc: Zhang, Chen <chen.zhang@intel.com>; Li Zhijian
> <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc-
> André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini
> <pbonzini@redhat.com>
> Subject: [PATCH v3 3/6] net/colo-compare.c: Fix deadlock in
> compare_chr_send
> 
> The chr_out chardev is connected to a filter-redirector running in the main
> loop. qemu_chr_fe_write_all might block here in compare_chr_send if the
> (socket-)buffer is full.
> If another filter-redirector in the main loop want's to send data to chr_pri_in
> it might also block if the buffer is full. This leads to a deadlock because both
> event loops get blocked.
> 
> Fix this by converting compare_chr_send to a coroutine and putting the
> packets in a send queue. Also create a new function notify_chr_send, since
> that should be independend.
> 
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>
> ---
>  net/colo-compare.c | 173 ++++++++++++++++++++++++++++++++++-------
> ----
>  1 file changed, 130 insertions(+), 43 deletions(-)
> 
> diff --git a/net/colo-compare.c b/net/colo-compare.c index
> 1de4220fe2..ff6a740284 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -32,6 +32,9 @@
>  #include "migration/migration.h"
>  #include "util.h"
> 
> +#include "block/aio-wait.h"
> +#include "qemu/coroutine.h"
> +
>  #define TYPE_COLO_COMPARE "colo-compare"
>  #define COLO_COMPARE(obj) \
>      OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) @@ -77,6
> +80,20 @@ static int event_unhandled_count;
>   *                    |packet  |  |packet  +    |packet  | |packet  +
>   *                    +--------+  +--------+    +--------+ +--------+
>   */
> +
> +typedef struct SendCo {
> +    Coroutine *co;
> +    GQueue send_list;
> +    bool done;
> +    int ret;
> +} SendCo;
> +
> +typedef struct SendEntry {
> +    uint32_t size;
> +    uint32_t vnet_hdr_len;
> +    uint8_t buf[];
> +} SendEntry;
> +
>  typedef struct CompareState {
>      Object parent;
> 
> @@ -91,6 +108,7 @@ typedef struct CompareState {
>      SocketReadState pri_rs;
>      SocketReadState sec_rs;
>      SocketReadState notify_rs;
> +    SendCo sendco;
>      bool vnet_hdr;
>      uint32_t compare_timeout;
>      uint32_t expired_scan_cycle;
> @@ -126,8 +144,11 @@ enum {
>  static int compare_chr_send(CompareState *s,
>                              const uint8_t *buf,
>                              uint32_t size,
> -                            uint32_t vnet_hdr_len,
> -                            bool notify_remote_frame);
> +                            uint32_t vnet_hdr_len);
> +
> +static int notify_chr_send(CompareState *s,
> +                           const uint8_t *buf,
> +                           uint32_t size);
> 
>  static bool packet_matches_str(const char *str,
>                                 const uint8_t *buf, @@ -145,7 +166,7 @@ static void
> notify_remote_frame(CompareState *s)
>      char msg[] = "DO_CHECKPOINT";
>      int ret = 0;
> 
> -    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> +    ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
>      if (ret < 0) {
>          error_report("Notify Xen COLO-frame failed");
>      }
> @@ -271,8 +292,7 @@ static void colo_release_primary_pkt(CompareState
> *s, Packet *pkt)
>      ret = compare_chr_send(s,
>                             pkt->data,
>                             pkt->size,
> -                           pkt->vnet_hdr_len,
> -                           false);
> +                           pkt->vnet_hdr_len);
>      if (ret < 0) {
>          error_report("colo send primary packet failed");
>      }
> @@ -699,63 +719,123 @@ static void colo_compare_connection(void
> *opaque, void *user_data)
>      }
>  }
> 
> -static int compare_chr_send(CompareState *s,
> -                            const uint8_t *buf,
> -                            uint32_t size,
> -                            uint32_t vnet_hdr_len,
> -                            bool notify_remote_frame)
> +static void coroutine_fn _compare_chr_send(void *opaque)
>  {
> +    CompareState *s = opaque;
> +    SendCo *sendco = &s->sendco;
>      int ret = 0;
> -    uint32_t len = htonl(size);
> 
> -    if (!size) {
> -        return 0;
> -    }
> +    while (!g_queue_is_empty(&sendco->send_list)) {
> +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> +        uint32_t len = htonl(entry->size);
> 
> -    if (notify_remote_frame) {
> -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> -                                    (uint8_t *)&len,
> -                                    sizeof(len));
> -    } else {
>          ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
> -    }
> 
> -    if (ret != sizeof(len)) {
> -        goto err;
> -    }
> +        if (ret != sizeof(len)) {
> +            g_free(entry);
> +            goto err;
> +        }
> 
> -    if (s->vnet_hdr) {
> -        /*
> -         * We send vnet header len make other module(like filter-redirector)
> -         * know how to parse net packet correctly.
> -         */
> -        len = htonl(vnet_hdr_len);
> +        if (s->vnet_hdr) {
> +            /*
> +             * We send vnet header len make other module(like filter-redirector)
> +             * know how to parse net packet correctly.
> +             */
> +            len = htonl(entry->vnet_hdr_len);
> 
> -        if (!notify_remote_frame) {
>              ret = qemu_chr_fe_write_all(&s->chr_out,
>                                          (uint8_t *)&len,
>                                          sizeof(len));
> +
> +            if (ret != sizeof(len)) {
> +                g_free(entry);
> +                goto err;
> +            }
>          }
> 
> -        if (ret != sizeof(len)) {
> +        ret = qemu_chr_fe_write_all(&s->chr_out,
> +                                    (uint8_t *)entry->buf,
> +                                    entry->size);
> +
> +        if (ret != entry->size) {
> +            g_free(entry);
>              goto err;
>          }
> +
> +        g_free(entry);
>      }
> 
> -    if (notify_remote_frame) {
> -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> -                                    (uint8_t *)buf,
> -                                    size);
> -    } else {
> -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
> +    sendco->ret = 0;
> +    goto out;
> +
> +err:
> +    while (!g_queue_is_empty(&sendco->send_list)) {
> +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> +        g_free(entry);
>      }
> +    sendco->ret = ret < 0 ? ret : -EIO;
> +out:
> +    sendco->co = NULL;
> +    sendco->done = true;
> +    aio_wait_kick();
> +}
> +
> +static int compare_chr_send(CompareState *s,
> +                            const uint8_t *buf,
> +                            uint32_t size,
> +                            uint32_t vnet_hdr_len) {
> +    SendCo *sendco = &s->sendco;
> +    SendEntry *entry;
> +
> +    if (!size) {
> +        return 0;
> +    }
> +
> +    entry = g_malloc(sizeof(SendEntry) + size);
> +    entry->size = size;
> +    entry->vnet_hdr_len = vnet_hdr_len;
> +    memcpy(entry->buf, buf, size);
> +    g_queue_push_head(&sendco->send_list, entry);
> +
> +    if (sendco->done) {
> +        sendco->co = qemu_coroutine_create(_compare_chr_send, s);
> +        sendco->done = false;
> +        qemu_coroutine_enter(sendco->co);
> +        if (sendco->done) {
> +            /* report early errors */
> +            return sendco->ret;
> +        }
> +    }
> +
> +    /* assume success */
> +    return 0;
> +}
> +

Why not make notify_chr_send same as compare_chr_send?

Thanks
Zhang Chen

> +static int notify_chr_send(CompareState *s,
> +                           const uint8_t *buf,
> +                           uint32_t size) {
> +    int ret = 0;
> +    uint32_t len = htonl(size);
> +
> +    ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> +                            (uint8_t *)&len,
> +                            sizeof(len));
> +
> +    if (ret != sizeof(len)) {
> +        goto err;
> +    }
> +
> +    ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> +                                (uint8_t *)buf,
> +                                size);
> 
>      if (ret != size) {
>          goto err;
>      }
> 
>      return 0;
> -
>  err:
>      return ret < 0 ? ret : -EIO;
>  }
> @@ -1062,8 +1142,7 @@ static void
> compare_pri_rs_finalize(SocketReadState *pri_rs)
>          compare_chr_send(s,
>                           pri_rs->buf,
>                           pri_rs->packet_len,
> -                         pri_rs->vnet_hdr_len,
> -                         false);
> +                         pri_rs->vnet_hdr_len);
>      } else {
>          /* compare packet in the specified connection */
>          colo_compare_connection(conn, s); @@ -1093,7 +1172,7 @@ static void
> compare_notify_rs_finalize(SocketReadState *notify_rs)
>      if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
>                             notify_rs->buf,
>                             notify_rs->packet_len)) {
> -        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> +        ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
>          if (ret < 0) {
>              error_report("Notify Xen COLO-frame INIT failed");
>          }
> @@ -1199,6 +1278,9 @@ static void colo_compare_complete(UserCreatable
> *uc, Error **errp)
> 
>      QTAILQ_INSERT_TAIL(&net_compares, s, next);
> 
> +    s->sendco.done = true;
> +    g_queue_init(&s->sendco.send_list);
> +
>      g_queue_init(&s->conn_list);
> 
>      qemu_mutex_init(&event_mtx);
> @@ -1224,8 +1306,7 @@ static void colo_flush_packets(void *opaque, void
> *user_data)
>          compare_chr_send(s,
>                           pkt->data,
>                           pkt->size,
> -                         pkt->vnet_hdr_len,
> -                         false);
> +                         pkt->vnet_hdr_len);
>          packet_destroy(pkt, NULL);
>      }
>      while (!g_queue_is_empty(&conn->secondary_list)) { @@ -1281,6
> +1362,11 @@ static void colo_compare_finalize(Object *obj)
>      CompareState *s = COLO_COMPARE(obj);
>      CompareState *tmp = NULL;
> 
> +    AioContext *ctx = iothread_get_aio_context(s->iothread);
> +    aio_context_acquire(ctx);
> +    AIO_WAIT_WHILE(ctx, !s->sendco.done);
> +    aio_context_release(ctx);
> +
>      qemu_chr_fe_deinit(&s->chr_pri_in, false);
>      qemu_chr_fe_deinit(&s->chr_sec_in, false);
>      qemu_chr_fe_deinit(&s->chr_out, false); @@ -1305,6 +1391,7 @@ static
> void colo_compare_finalize(Object *obj)
>      g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> 
>      g_queue_clear(&s->conn_list);
> +    g_queue_clear(&s->sendco.send_list);
> 
>      if (s->connection_track_table) {
>          g_hash_table_destroy(s->connection_track_table);
> --
> 2.20.1
Lukas Straub April 27, 2020, 7:22 a.m. UTC | #2
On Mon, 27 Apr 2020 03:36:57 +0000
"Zhang, Chen" <chen.zhang@intel.com> wrote:

> > -----Original Message-----
> > From: Lukas Straub <lukasstraub2@web.de>
> > Sent: Monday, April 27, 2020 5:19 AM
> > To: qemu-devel <qemu-devel@nongnu.org>
> > Cc: Zhang, Chen <chen.zhang@intel.com>; Li Zhijian
> > <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc-
> > André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini
> > <pbonzini@redhat.com>
> > Subject: [PATCH v3 3/6] net/colo-compare.c: Fix deadlock in
> > compare_chr_send
> > 
> > The chr_out chardev is connected to a filter-redirector running in the main
> > loop. qemu_chr_fe_write_all might block here in compare_chr_send if the
> > (socket-)buffer is full.
> > If another filter-redirector in the main loop want's to send data to chr_pri_in
> > it might also block if the buffer is full. This leads to a deadlock because both
> > event loops get blocked.
> > 
> > Fix this by converting compare_chr_send to a coroutine and putting the
> > packets in a send queue. Also create a new function notify_chr_send, since
> > that should be independend.
> > 
> > Signed-off-by: Lukas Straub <lukasstraub2@web.de>
> > ---
> >  net/colo-compare.c | 173 ++++++++++++++++++++++++++++++++++-------
> > ----
> >  1 file changed, 130 insertions(+), 43 deletions(-)
> > 
> > diff --git a/net/colo-compare.c b/net/colo-compare.c index
> > 1de4220fe2..ff6a740284 100644
> > --- a/net/colo-compare.c
> > +++ b/net/colo-compare.c
> > @@ -32,6 +32,9 @@
> >  #include "migration/migration.h"
> >  #include "util.h"
> > 
> > +#include "block/aio-wait.h"
> > +#include "qemu/coroutine.h"
> > +
> >  #define TYPE_COLO_COMPARE "colo-compare"
> >  #define COLO_COMPARE(obj) \
> >      OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) @@ -77,6
> > +80,20 @@ static int event_unhandled_count;
> >   *                    |packet  |  |packet  +    |packet  | |packet  +
> >   *                    +--------+  +--------+    +--------+ +--------+
> >   */
> > +
> > +typedef struct SendCo {
> > +    Coroutine *co;
> > +    GQueue send_list;
> > +    bool done;
> > +    int ret;
> > +} SendCo;
> > +
> > +typedef struct SendEntry {
> > +    uint32_t size;
> > +    uint32_t vnet_hdr_len;
> > +    uint8_t buf[];
> > +} SendEntry;
> > +
> >  typedef struct CompareState {
> >      Object parent;
> > 
> > @@ -91,6 +108,7 @@ typedef struct CompareState {
> >      SocketReadState pri_rs;
> >      SocketReadState sec_rs;
> >      SocketReadState notify_rs;
> > +    SendCo sendco;
> >      bool vnet_hdr;
> >      uint32_t compare_timeout;
> >      uint32_t expired_scan_cycle;
> > @@ -126,8 +144,11 @@ enum {
> >  static int compare_chr_send(CompareState *s,
> >                              const uint8_t *buf,
> >                              uint32_t size,
> > -                            uint32_t vnet_hdr_len,
> > -                            bool notify_remote_frame);
> > +                            uint32_t vnet_hdr_len);
> > +
> > +static int notify_chr_send(CompareState *s,
> > +                           const uint8_t *buf,
> > +                           uint32_t size);
> > 
> >  static bool packet_matches_str(const char *str,
> >                                 const uint8_t *buf, @@ -145,7 +166,7 @@ static void
> > notify_remote_frame(CompareState *s)
> >      char msg[] = "DO_CHECKPOINT";
> >      int ret = 0;
> > 
> > -    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> > +    ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
> >      if (ret < 0) {
> >          error_report("Notify Xen COLO-frame failed");
> >      }
> > @@ -271,8 +292,7 @@ static void colo_release_primary_pkt(CompareState
> > *s, Packet *pkt)
> >      ret = compare_chr_send(s,
> >                             pkt->data,
> >                             pkt->size,
> > -                           pkt->vnet_hdr_len,
> > -                           false);
> > +                           pkt->vnet_hdr_len);
> >      if (ret < 0) {
> >          error_report("colo send primary packet failed");
> >      }
> > @@ -699,63 +719,123 @@ static void colo_compare_connection(void
> > *opaque, void *user_data)
> >      }
> >  }
> > 
> > -static int compare_chr_send(CompareState *s,
> > -                            const uint8_t *buf,
> > -                            uint32_t size,
> > -                            uint32_t vnet_hdr_len,
> > -                            bool notify_remote_frame)
> > +static void coroutine_fn _compare_chr_send(void *opaque)
> >  {
> > +    CompareState *s = opaque;
> > +    SendCo *sendco = &s->sendco;
> >      int ret = 0;
> > -    uint32_t len = htonl(size);
> > 
> > -    if (!size) {
> > -        return 0;
> > -    }
> > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > +        uint32_t len = htonl(entry->size);
> > 
> > -    if (notify_remote_frame) {
> > -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > -                                    (uint8_t *)&len,
> > -                                    sizeof(len));
> > -    } else {
> >          ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
> > -    }
> > 
> > -    if (ret != sizeof(len)) {
> > -        goto err;
> > -    }
> > +        if (ret != sizeof(len)) {
> > +            g_free(entry);
> > +            goto err;
> > +        }
> > 
> > -    if (s->vnet_hdr) {
> > -        /*
> > -         * We send vnet header len make other module(like filter-redirector)
> > -         * know how to parse net packet correctly.
> > -         */
> > -        len = htonl(vnet_hdr_len);
> > +        if (s->vnet_hdr) {
> > +            /*
> > +             * We send vnet header len make other module(like filter-redirector)
> > +             * know how to parse net packet correctly.
> > +             */
> > +            len = htonl(entry->vnet_hdr_len);
> > 
> > -        if (!notify_remote_frame) {
> >              ret = qemu_chr_fe_write_all(&s->chr_out,
> >                                          (uint8_t *)&len,
> >                                          sizeof(len));
> > +
> > +            if (ret != sizeof(len)) {
> > +                g_free(entry);
> > +                goto err;
> > +            }
> >          }
> > 
> > -        if (ret != sizeof(len)) {
> > +        ret = qemu_chr_fe_write_all(&s->chr_out,
> > +                                    (uint8_t *)entry->buf,
> > +                                    entry->size);
> > +
> > +        if (ret != entry->size) {
> > +            g_free(entry);
> >              goto err;
> >          }
> > +
> > +        g_free(entry);
> >      }
> > 
> > -    if (notify_remote_frame) {
> > -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > -                                    (uint8_t *)buf,
> > -                                    size);
> > -    } else {
> > -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
> > +    sendco->ret = 0;
> > +    goto out;
> > +
> > +err:
> > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > +        g_free(entry);
> >      }
> > +    sendco->ret = ret < 0 ? ret : -EIO;
> > +out:
> > +    sendco->co = NULL;
> > +    sendco->done = true;
> > +    aio_wait_kick();
> > +}
> > +
> > +static int compare_chr_send(CompareState *s,
> > +                            const uint8_t *buf,
> > +                            uint32_t size,
> > +                            uint32_t vnet_hdr_len) {
> > +    SendCo *sendco = &s->sendco;
> > +    SendEntry *entry;
> > +
> > +    if (!size) {
> > +        return 0;
> > +    }
> > +
> > +    entry = g_malloc(sizeof(SendEntry) + size);
> > +    entry->size = size;
> > +    entry->vnet_hdr_len = vnet_hdr_len;
> > +    memcpy(entry->buf, buf, size);
> > +    g_queue_push_head(&sendco->send_list, entry);
> > +
> > +    if (sendco->done) {
> > +        sendco->co = qemu_coroutine_create(_compare_chr_send, s);
> > +        sendco->done = false;
> > +        qemu_coroutine_enter(sendco->co);
> > +        if (sendco->done) {
> > +            /* report early errors */
> > +            return sendco->ret;
> > +        }
> > +    }
> > +
> > +    /* assume success */
> > +    return 0;
> > +}
> > +  
> 
> Why not make notify_chr_send same as compare_chr_send?

Hello,
The notify chardev_dev is not affected from this deadlock issue and is independent from the outdev chardev. So it wouldn't make sense for notify messages to wait in the queue if the outdev chardev is blocked. Also, the code is easier to understand this way.

Regards,
Lukas Straub

> Thanks
> Zhang Chen
> 
> > +static int notify_chr_send(CompareState *s,
> > +                           const uint8_t *buf,
> > +                           uint32_t size) {
> > +    int ret = 0;
> > +    uint32_t len = htonl(size);
> > +
> > +    ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > +                            (uint8_t *)&len,
> > +                            sizeof(len));
> > +
> > +    if (ret != sizeof(len)) {
> > +        goto err;
> > +    }
> > +
> > +    ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > +                                (uint8_t *)buf,
> > +                                size);
> > 
> >      if (ret != size) {
> >          goto err;
> >      }
> > 
> >      return 0;
> > -
> >  err:
> >      return ret < 0 ? ret : -EIO;
> >  }
> > @@ -1062,8 +1142,7 @@ static void
> > compare_pri_rs_finalize(SocketReadState *pri_rs)
> >          compare_chr_send(s,
> >                           pri_rs->buf,
> >                           pri_rs->packet_len,
> > -                         pri_rs->vnet_hdr_len,
> > -                         false);
> > +                         pri_rs->vnet_hdr_len);
> >      } else {
> >          /* compare packet in the specified connection */
> >          colo_compare_connection(conn, s); @@ -1093,7 +1172,7 @@ static void
> > compare_notify_rs_finalize(SocketReadState *notify_rs)
> >      if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
> >                             notify_rs->buf,
> >                             notify_rs->packet_len)) {
> > -        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> > +        ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
> >          if (ret < 0) {
> >              error_report("Notify Xen COLO-frame INIT failed");
> >          }
> > @@ -1199,6 +1278,9 @@ static void colo_compare_complete(UserCreatable
> > *uc, Error **errp)
> > 
> >      QTAILQ_INSERT_TAIL(&net_compares, s, next);
> > 
> > +    s->sendco.done = true;
> > +    g_queue_init(&s->sendco.send_list);
> > +
> >      g_queue_init(&s->conn_list);
> > 
> >      qemu_mutex_init(&event_mtx);
> > @@ -1224,8 +1306,7 @@ static void colo_flush_packets(void *opaque, void
> > *user_data)
> >          compare_chr_send(s,
> >                           pkt->data,
> >                           pkt->size,
> > -                         pkt->vnet_hdr_len,
> > -                         false);
> > +                         pkt->vnet_hdr_len);
> >          packet_destroy(pkt, NULL);
> >      }
> >      while (!g_queue_is_empty(&conn->secondary_list)) { @@ -1281,6
> > +1362,11 @@ static void colo_compare_finalize(Object *obj)
> >      CompareState *s = COLO_COMPARE(obj);
> >      CompareState *tmp = NULL;
> > 
> > +    AioContext *ctx = iothread_get_aio_context(s->iothread);
> > +    aio_context_acquire(ctx);
> > +    AIO_WAIT_WHILE(ctx, !s->sendco.done);
> > +    aio_context_release(ctx);
> > +
> >      qemu_chr_fe_deinit(&s->chr_pri_in, false);
> >      qemu_chr_fe_deinit(&s->chr_sec_in, false);
> >      qemu_chr_fe_deinit(&s->chr_out, false); @@ -1305,6 +1391,7 @@ static
> > void colo_compare_finalize(Object *obj)
> >      g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> > 
> >      g_queue_clear(&s->conn_list);
> > +    g_queue_clear(&s->sendco.send_list);
> > 
> >      if (s->connection_track_table) {
> >          g_hash_table_destroy(s->connection_track_table);
> > --
> > 2.20.1  
>
Zhang, Chen April 29, 2020, 5:37 a.m. UTC | #3
> -----Original Message-----
> From: Lukas Straub <lukasstraub2@web.de>
> Sent: Monday, April 27, 2020 3:22 PM
> To: Zhang, Chen <chen.zhang@intel.com>
> Cc: qemu-devel <qemu-devel@nongnu.org>; Li Zhijian
> <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc-
> André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini
> <pbonzini@redhat.com>
> Subject: Re: [PATCH v3 3/6] net/colo-compare.c: Fix deadlock in
> compare_chr_send
> 
> On Mon, 27 Apr 2020 03:36:57 +0000
> "Zhang, Chen" <chen.zhang@intel.com> wrote:
> 
> > > -----Original Message-----
> > > From: Lukas Straub <lukasstraub2@web.de>
> > > Sent: Monday, April 27, 2020 5:19 AM
> > > To: qemu-devel <qemu-devel@nongnu.org>
> > > Cc: Zhang, Chen <chen.zhang@intel.com>; Li Zhijian
> > > <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc-
> > > André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini
> > > <pbonzini@redhat.com>
> > > Subject: [PATCH v3 3/6] net/colo-compare.c: Fix deadlock in
> > > compare_chr_send
> > >
> > > The chr_out chardev is connected to a filter-redirector running in
> > > the main loop. qemu_chr_fe_write_all might block here in
> > > compare_chr_send if the (socket-)buffer is full.
> > > If another filter-redirector in the main loop want's to send data to
> > > chr_pri_in it might also block if the buffer is full. This leads to
> > > a deadlock because both event loops get blocked.
> > >
> > > Fix this by converting compare_chr_send to a coroutine and putting
> > > the packets in a send queue. Also create a new function
> > > notify_chr_send, since that should be independend.
> > >
> > > Signed-off-by: Lukas Straub <lukasstraub2@web.de>
> > > ---
> > >  net/colo-compare.c | 173 ++++++++++++++++++++++++++++++++++---
> ----
> > > ----
> > >  1 file changed, 130 insertions(+), 43 deletions(-)
> > >
> > > diff --git a/net/colo-compare.c b/net/colo-compare.c index
> > > 1de4220fe2..ff6a740284 100644
> > > --- a/net/colo-compare.c
> > > +++ b/net/colo-compare.c
> > > @@ -32,6 +32,9 @@
> > >  #include "migration/migration.h"
> > >  #include "util.h"
> > >
> > > +#include "block/aio-wait.h"
> > > +#include "qemu/coroutine.h"
> > > +
> > >  #define TYPE_COLO_COMPARE "colo-compare"
> > >  #define COLO_COMPARE(obj) \
> > >      OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) @@ -
> 77,6
> > > +80,20 @@ static int event_unhandled_count;
> > >   *                    |packet  |  |packet  +    |packet  | |packet  +
> > >   *                    +--------+  +--------+    +--------+ +--------+
> > >   */
> > > +
> > > +typedef struct SendCo {
> > > +    Coroutine *co;
> > > +    GQueue send_list;
> > > +    bool done;
> > > +    int ret;
> > > +} SendCo;
> > > +
> > > +typedef struct SendEntry {
> > > +    uint32_t size;
> > > +    uint32_t vnet_hdr_len;
> > > +    uint8_t buf[];
> > > +} SendEntry;
> > > +
> > >  typedef struct CompareState {
> > >      Object parent;
> > >
> > > @@ -91,6 +108,7 @@ typedef struct CompareState {
> > >      SocketReadState pri_rs;
> > >      SocketReadState sec_rs;
> > >      SocketReadState notify_rs;
> > > +    SendCo sendco;
> > >      bool vnet_hdr;
> > >      uint32_t compare_timeout;
> > >      uint32_t expired_scan_cycle;
> > > @@ -126,8 +144,11 @@ enum {
> > >  static int compare_chr_send(CompareState *s,
> > >                              const uint8_t *buf,
> > >                              uint32_t size,
> > > -                            uint32_t vnet_hdr_len,
> > > -                            bool notify_remote_frame);
> > > +                            uint32_t vnet_hdr_len);
> > > +
> > > +static int notify_chr_send(CompareState *s,
> > > +                           const uint8_t *buf,
> > > +                           uint32_t size);
> > >
> > >  static bool packet_matches_str(const char *str,
> > >                                 const uint8_t *buf, @@ -145,7 +166,7
> > > @@ static void notify_remote_frame(CompareState *s)
> > >      char msg[] = "DO_CHECKPOINT";
> > >      int ret = 0;
> > >
> > > -    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> > > +    ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
> > >      if (ret < 0) {
> > >          error_report("Notify Xen COLO-frame failed");
> > >      }
> > > @@ -271,8 +292,7 @@ static void
> > > colo_release_primary_pkt(CompareState
> > > *s, Packet *pkt)
> > >      ret = compare_chr_send(s,
> > >                             pkt->data,
> > >                             pkt->size,
> > > -                           pkt->vnet_hdr_len,
> > > -                           false);
> > > +                           pkt->vnet_hdr_len);
> > >      if (ret < 0) {
> > >          error_report("colo send primary packet failed");
> > >      }
> > > @@ -699,63 +719,123 @@ static void colo_compare_connection(void
> > > *opaque, void *user_data)
> > >      }
> > >  }
> > >
> > > -static int compare_chr_send(CompareState *s,
> > > -                            const uint8_t *buf,
> > > -                            uint32_t size,
> > > -                            uint32_t vnet_hdr_len,
> > > -                            bool notify_remote_frame)
> > > +static void coroutine_fn _compare_chr_send(void *opaque)
> > >  {
> > > +    CompareState *s = opaque;
> > > +    SendCo *sendco = &s->sendco;
> > >      int ret = 0;
> > > -    uint32_t len = htonl(size);
> > >
> > > -    if (!size) {
> > > -        return 0;
> > > -    }
> > > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > > +        uint32_t len = htonl(entry->size);
> > >
> > > -    if (notify_remote_frame) {
> > > -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > > -                                    (uint8_t *)&len,
> > > -                                    sizeof(len));
> > > -    } else {
> > >          ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len,
> sizeof(len));
> > > -    }
> > >
> > > -    if (ret != sizeof(len)) {
> > > -        goto err;
> > > -    }
> > > +        if (ret != sizeof(len)) {
> > > +            g_free(entry);
> > > +            goto err;
> > > +        }
> > >
> > > -    if (s->vnet_hdr) {
> > > -        /*
> > > -         * We send vnet header len make other module(like filter-redirector)
> > > -         * know how to parse net packet correctly.
> > > -         */
> > > -        len = htonl(vnet_hdr_len);
> > > +        if (s->vnet_hdr) {
> > > +            /*
> > > +             * We send vnet header len make other module(like filter-
> redirector)
> > > +             * know how to parse net packet correctly.
> > > +             */
> > > +            len = htonl(entry->vnet_hdr_len);
> > >
> > > -        if (!notify_remote_frame) {
> > >              ret = qemu_chr_fe_write_all(&s->chr_out,
> > >                                          (uint8_t *)&len,
> > >                                          sizeof(len));
> > > +
> > > +            if (ret != sizeof(len)) {
> > > +                g_free(entry);
> > > +                goto err;
> > > +            }
> > >          }
> > >
> > > -        if (ret != sizeof(len)) {
> > > +        ret = qemu_chr_fe_write_all(&s->chr_out,
> > > +                                    (uint8_t *)entry->buf,
> > > +                                    entry->size);
> > > +
> > > +        if (ret != entry->size) {
> > > +            g_free(entry);
> > >              goto err;
> > >          }
> > > +
> > > +        g_free(entry);
> > >      }
> > >
> > > -    if (notify_remote_frame) {
> > > -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > > -                                    (uint8_t *)buf,
> > > -                                    size);
> > > -    } else {
> > > -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
> > > +    sendco->ret = 0;
> > > +    goto out;
> > > +
> > > +err:
> > > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > > +        g_free(entry);
> > >      }
> > > +    sendco->ret = ret < 0 ? ret : -EIO;
> > > +out:
> > > +    sendco->co = NULL;
> > > +    sendco->done = true;
> > > +    aio_wait_kick();
> > > +}
> > > +
> > > +static int compare_chr_send(CompareState *s,
> > > +                            const uint8_t *buf,
> > > +                            uint32_t size,
> > > +                            uint32_t vnet_hdr_len) {
> > > +    SendCo *sendco = &s->sendco;
> > > +    SendEntry *entry;
> > > +
> > > +    if (!size) {
> > > +        return 0;
> > > +    }
> > > +
> > > +    entry = g_malloc(sizeof(SendEntry) + size);
> > > +    entry->size = size;
> > > +    entry->vnet_hdr_len = vnet_hdr_len;
> > > +    memcpy(entry->buf, buf, size);
> > > +    g_queue_push_head(&sendco->send_list, entry);
> > > +
> > > +    if (sendco->done) {
> > > +        sendco->co = qemu_coroutine_create(_compare_chr_send, s);
> > > +        sendco->done = false;
> > > +        qemu_coroutine_enter(sendco->co);
> > > +        if (sendco->done) {
> > > +            /* report early errors */
> > > +            return sendco->ret;
> > > +        }
> > > +    }
> > > +
> > > +    /* assume success */
> > > +    return 0;
> > > +}
> > > +
> >
> > Why not make notify_chr_send same as compare_chr_send?
> 
> Hello,
> The notify chardev_dev is not affected from this deadlock issue and is
> independent from the outdev chardev. So it wouldn't make sense for notify
> messages to wait in the queue if the outdev chardev is blocked. Also, the
> code is easier to understand this way.
> 

Yes, I means maybe the deadlock issue will also occur in Xen COLO side, we can resolve the potential problem here.

Thanks
Zhang Chen

> Regards,
> Lukas Straub
> 
> > Thanks
> > Zhang Chen
> >
> > > +static int notify_chr_send(CompareState *s,
> > > +                           const uint8_t *buf,
> > > +                           uint32_t size) {
> > > +    int ret = 0;
> > > +    uint32_t len = htonl(size);
> > > +
> > > +    ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > > +                            (uint8_t *)&len,
> > > +                            sizeof(len));
> > > +
> > > +    if (ret != sizeof(len)) {
> > > +        goto err;
> > > +    }
> > > +
> > > +    ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > > +                                (uint8_t *)buf,
> > > +                                size);
> > >
> > >      if (ret != size) {
> > >          goto err;
> > >      }
> > >
> > >      return 0;
> > > -
> > >  err:
> > >      return ret < 0 ? ret : -EIO;
> > >  }
> > > @@ -1062,8 +1142,7 @@ static void
> > > compare_pri_rs_finalize(SocketReadState *pri_rs)
> > >          compare_chr_send(s,
> > >                           pri_rs->buf,
> > >                           pri_rs->packet_len,
> > > -                         pri_rs->vnet_hdr_len,
> > > -                         false);
> > > +                         pri_rs->vnet_hdr_len);
> > >      } else {
> > >          /* compare packet in the specified connection */
> > >          colo_compare_connection(conn, s); @@ -1093,7 +1172,7 @@
> > > static void compare_notify_rs_finalize(SocketReadState *notify_rs)
> > >      if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
> > >                             notify_rs->buf,
> > >                             notify_rs->packet_len)) {
> > > -        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> > > +        ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
> > >          if (ret < 0) {
> > >              error_report("Notify Xen COLO-frame INIT failed");
> > >          }
> > > @@ -1199,6 +1278,9 @@ static void
> > > colo_compare_complete(UserCreatable
> > > *uc, Error **errp)
> > >
> > >      QTAILQ_INSERT_TAIL(&net_compares, s, next);
> > >
> > > +    s->sendco.done = true;
> > > +    g_queue_init(&s->sendco.send_list);
> > > +
> > >      g_queue_init(&s->conn_list);
> > >
> > >      qemu_mutex_init(&event_mtx);
> > > @@ -1224,8 +1306,7 @@ static void colo_flush_packets(void *opaque,
> > > void
> > > *user_data)
> > >          compare_chr_send(s,
> > >                           pkt->data,
> > >                           pkt->size,
> > > -                         pkt->vnet_hdr_len,
> > > -                         false);
> > > +                         pkt->vnet_hdr_len);
> > >          packet_destroy(pkt, NULL);
> > >      }
> > >      while (!g_queue_is_empty(&conn->secondary_list)) { @@ -1281,6
> > > +1362,11 @@ static void colo_compare_finalize(Object *obj)
> > >      CompareState *s = COLO_COMPARE(obj);
> > >      CompareState *tmp = NULL;
> > >
> > > +    AioContext *ctx = iothread_get_aio_context(s->iothread);
> > > +    aio_context_acquire(ctx);
> > > +    AIO_WAIT_WHILE(ctx, !s->sendco.done);
> > > +    aio_context_release(ctx);
> > > +
> > >      qemu_chr_fe_deinit(&s->chr_pri_in, false);
> > >      qemu_chr_fe_deinit(&s->chr_sec_in, false);
> > >      qemu_chr_fe_deinit(&s->chr_out, false); @@ -1305,6 +1391,7 @@
> > > static void colo_compare_finalize(Object *obj)
> > >      g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> > >
> > >      g_queue_clear(&s->conn_list);
> > > +    g_queue_clear(&s->sendco.send_list);
> > >
> > >      if (s->connection_track_table) {
> > >          g_hash_table_destroy(s->connection_track_table);
> > > --
> > > 2.20.1
> >
Lukas Straub April 29, 2020, 7:51 a.m. UTC | #4
On Wed, 29 Apr 2020 05:37:17 +0000
"Zhang, Chen" <chen.zhang@intel.com> wrote:

> > -----Original Message-----
> > From: Lukas Straub <lukasstraub2@web.de>
> > Sent: Monday, April 27, 2020 3:22 PM
> > To: Zhang, Chen <chen.zhang@intel.com>
> > Cc: qemu-devel <qemu-devel@nongnu.org>; Li Zhijian
> > <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc-
> > André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini
> > <pbonzini@redhat.com>
> > Subject: Re: [PATCH v3 3/6] net/colo-compare.c: Fix deadlock in
> > compare_chr_send
> > 
> > On Mon, 27 Apr 2020 03:36:57 +0000
> > "Zhang, Chen" <chen.zhang@intel.com> wrote:
> >   
> > > > -----Original Message-----
> > > > From: Lukas Straub <lukasstraub2@web.de>
> > > > Sent: Monday, April 27, 2020 5:19 AM
> > > > To: qemu-devel <qemu-devel@nongnu.org>
> > > > Cc: Zhang, Chen <chen.zhang@intel.com>; Li Zhijian
> > > > <lizhijian@cn.fujitsu.com>; Jason Wang <jasowang@redhat.com>; Marc-
> > > > André Lureau <marcandre.lureau@redhat.com>; Paolo Bonzini
> > > > <pbonzini@redhat.com>
> > > > Subject: [PATCH v3 3/6] net/colo-compare.c: Fix deadlock in
> > > > compare_chr_send
> > > >
> > > > The chr_out chardev is connected to a filter-redirector running in
> > > > the main loop. qemu_chr_fe_write_all might block here in
> > > > compare_chr_send if the (socket-)buffer is full.
> > > > If another filter-redirector in the main loop want's to send data to
> > > > chr_pri_in it might also block if the buffer is full. This leads to
> > > > a deadlock because both event loops get blocked.
> > > >
> > > > Fix this by converting compare_chr_send to a coroutine and putting
> > > > the packets in a send queue. Also create a new function
> > > > notify_chr_send, since that should be independend.
> > > >
> > > > Signed-off-by: Lukas Straub <lukasstraub2@web.de>
> > > > ---
> > > >  net/colo-compare.c | 173 ++++++++++++++++++++++++++++++++++---  
> > ----  
> > > > ----
> > > >  1 file changed, 130 insertions(+), 43 deletions(-)
> > > >
> > > > diff --git a/net/colo-compare.c b/net/colo-compare.c index
> > > > 1de4220fe2..ff6a740284 100644
> > > > --- a/net/colo-compare.c
> > > > +++ b/net/colo-compare.c
> > > > @@ -32,6 +32,9 @@
> > > >  #include "migration/migration.h"
> > > >  #include "util.h"
> > > >
> > > > +#include "block/aio-wait.h"
> > > > +#include "qemu/coroutine.h"
> > > > +
> > > >  #define TYPE_COLO_COMPARE "colo-compare"
> > > >  #define COLO_COMPARE(obj) \
> > > >      OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) @@ -  
> > 77,6  
> > > > +80,20 @@ static int event_unhandled_count;
> > > >   *                    |packet  |  |packet  +    |packet  | |packet  +
> > > >   *                    +--------+  +--------+    +--------+ +--------+
> > > >   */
> > > > +
> > > > +typedef struct SendCo {
> > > > +    Coroutine *co;
> > > > +    GQueue send_list;
> > > > +    bool done;
> > > > +    int ret;
> > > > +} SendCo;
> > > > +
> > > > +typedef struct SendEntry {
> > > > +    uint32_t size;
> > > > +    uint32_t vnet_hdr_len;
> > > > +    uint8_t buf[];
> > > > +} SendEntry;
> > > > +
> > > >  typedef struct CompareState {
> > > >      Object parent;
> > > >
> > > > @@ -91,6 +108,7 @@ typedef struct CompareState {
> > > >      SocketReadState pri_rs;
> > > >      SocketReadState sec_rs;
> > > >      SocketReadState notify_rs;
> > > > +    SendCo sendco;
> > > >      bool vnet_hdr;
> > > >      uint32_t compare_timeout;
> > > >      uint32_t expired_scan_cycle;
> > > > @@ -126,8 +144,11 @@ enum {
> > > >  static int compare_chr_send(CompareState *s,
> > > >                              const uint8_t *buf,
> > > >                              uint32_t size,
> > > > -                            uint32_t vnet_hdr_len,
> > > > -                            bool notify_remote_frame);
> > > > +                            uint32_t vnet_hdr_len);
> > > > +
> > > > +static int notify_chr_send(CompareState *s,
> > > > +                           const uint8_t *buf,
> > > > +                           uint32_t size);
> > > >
> > > >  static bool packet_matches_str(const char *str,
> > > >                                 const uint8_t *buf, @@ -145,7 +166,7
> > > > @@ static void notify_remote_frame(CompareState *s)
> > > >      char msg[] = "DO_CHECKPOINT";
> > > >      int ret = 0;
> > > >
> > > > -    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> > > > +    ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
> > > >      if (ret < 0) {
> > > >          error_report("Notify Xen COLO-frame failed");
> > > >      }
> > > > @@ -271,8 +292,7 @@ static void
> > > > colo_release_primary_pkt(CompareState
> > > > *s, Packet *pkt)
> > > >      ret = compare_chr_send(s,
> > > >                             pkt->data,
> > > >                             pkt->size,
> > > > -                           pkt->vnet_hdr_len,
> > > > -                           false);
> > > > +                           pkt->vnet_hdr_len);
> > > >      if (ret < 0) {
> > > >          error_report("colo send primary packet failed");
> > > >      }
> > > > @@ -699,63 +719,123 @@ static void colo_compare_connection(void
> > > > *opaque, void *user_data)
> > > >      }
> > > >  }
> > > >
> > > > -static int compare_chr_send(CompareState *s,
> > > > -                            const uint8_t *buf,
> > > > -                            uint32_t size,
> > > > -                            uint32_t vnet_hdr_len,
> > > > -                            bool notify_remote_frame)
> > > > +static void coroutine_fn _compare_chr_send(void *opaque)
> > > >  {
> > > > +    CompareState *s = opaque;
> > > > +    SendCo *sendco = &s->sendco;
> > > >      int ret = 0;
> > > > -    uint32_t len = htonl(size);
> > > >
> > > > -    if (!size) {
> > > > -        return 0;
> > > > -    }
> > > > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > > > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > > > +        uint32_t len = htonl(entry->size);
> > > >
> > > > -    if (notify_remote_frame) {
> > > > -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > > > -                                    (uint8_t *)&len,
> > > > -                                    sizeof(len));
> > > > -    } else {
> > > >          ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len,  
> > sizeof(len));  
> > > > -    }
> > > >
> > > > -    if (ret != sizeof(len)) {
> > > > -        goto err;
> > > > -    }
> > > > +        if (ret != sizeof(len)) {
> > > > +            g_free(entry);
> > > > +            goto err;
> > > > +        }
> > > >
> > > > -    if (s->vnet_hdr) {
> > > > -        /*
> > > > -         * We send vnet header len make other module(like filter-redirector)
> > > > -         * know how to parse net packet correctly.
> > > > -         */
> > > > -        len = htonl(vnet_hdr_len);
> > > > +        if (s->vnet_hdr) {
> > > > +            /*
> > > > +             * We send vnet header len make other module(like filter-  
> > redirector)  
> > > > +             * know how to parse net packet correctly.
> > > > +             */
> > > > +            len = htonl(entry->vnet_hdr_len);
> > > >
> > > > -        if (!notify_remote_frame) {
> > > >              ret = qemu_chr_fe_write_all(&s->chr_out,
> > > >                                          (uint8_t *)&len,
> > > >                                          sizeof(len));
> > > > +
> > > > +            if (ret != sizeof(len)) {
> > > > +                g_free(entry);
> > > > +                goto err;
> > > > +            }
> > > >          }
> > > >
> > > > -        if (ret != sizeof(len)) {
> > > > +        ret = qemu_chr_fe_write_all(&s->chr_out,
> > > > +                                    (uint8_t *)entry->buf,
> > > > +                                    entry->size);
> > > > +
> > > > +        if (ret != entry->size) {
> > > > +            g_free(entry);
> > > >              goto err;
> > > >          }
> > > > +
> > > > +        g_free(entry);
> > > >      }
> > > >
> > > > -    if (notify_remote_frame) {
> > > > -        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > > > -                                    (uint8_t *)buf,
> > > > -                                    size);
> > > > -    } else {
> > > > -        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
> > > > +    sendco->ret = 0;
> > > > +    goto out;
> > > > +
> > > > +err:
> > > > +    while (!g_queue_is_empty(&sendco->send_list)) {
> > > > +        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
> > > > +        g_free(entry);
> > > >      }
> > > > +    sendco->ret = ret < 0 ? ret : -EIO;
> > > > +out:
> > > > +    sendco->co = NULL;
> > > > +    sendco->done = true;
> > > > +    aio_wait_kick();
> > > > +}
> > > > +
> > > > +static int compare_chr_send(CompareState *s,
> > > > +                            const uint8_t *buf,
> > > > +                            uint32_t size,
> > > > +                            uint32_t vnet_hdr_len) {
> > > > +    SendCo *sendco = &s->sendco;
> > > > +    SendEntry *entry;
> > > > +
> > > > +    if (!size) {
> > > > +        return 0;
> > > > +    }
> > > > +
> > > > +    entry = g_malloc(sizeof(SendEntry) + size);
> > > > +    entry->size = size;
> > > > +    entry->vnet_hdr_len = vnet_hdr_len;
> > > > +    memcpy(entry->buf, buf, size);
> > > > +    g_queue_push_head(&sendco->send_list, entry);
> > > > +
> > > > +    if (sendco->done) {
> > > > +        sendco->co = qemu_coroutine_create(_compare_chr_send, s);
> > > > +        sendco->done = false;
> > > > +        qemu_coroutine_enter(sendco->co);
> > > > +        if (sendco->done) {
> > > > +            /* report early errors */
> > > > +            return sendco->ret;
> > > > +        }
> > > > +    }
> > > > +
> > > > +    /* assume success */
> > > > +    return 0;
> > > > +}
> > > > +  
> > >
> > > Why not make notify_chr_send same as compare_chr_send?  
> > 
> > Hello,
> > The notify chardev_dev is not affected from this deadlock issue and is
> > independent from the outdev chardev. So it wouldn't make sense for notify
> > messages to wait in the queue if the outdev chardev is blocked. Also, the
> > code is easier to understand this way.
> >   
> 
> Yes, I means maybe the deadlock issue will also occur in Xen COLO side, we can resolve the potential problem here.

Ok,
I will change it in the next version.

> Thanks
> Zhang Chen
> 
> > Regards,
> > Lukas Straub
> >   
> > > Thanks
> > > Zhang Chen
> > >  
> > > > +static int notify_chr_send(CompareState *s,
> > > > +                           const uint8_t *buf,
> > > > +                           uint32_t size) {
> > > > +    int ret = 0;
> > > > +    uint32_t len = htonl(size);
> > > > +
> > > > +    ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > > > +                            (uint8_t *)&len,
> > > > +                            sizeof(len));
> > > > +
> > > > +    if (ret != sizeof(len)) {
> > > > +        goto err;
> > > > +    }
> > > > +
> > > > +    ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
> > > > +                                (uint8_t *)buf,
> > > > +                                size);
> > > >
> > > >      if (ret != size) {
> > > >          goto err;
> > > >      }
> > > >
> > > >      return 0;
> > > > -
> > > >  err:
> > > >      return ret < 0 ? ret : -EIO;
> > > >  }
> > > > @@ -1062,8 +1142,7 @@ static void
> > > > compare_pri_rs_finalize(SocketReadState *pri_rs)
> > > >          compare_chr_send(s,
> > > >                           pri_rs->buf,
> > > >                           pri_rs->packet_len,
> > > > -                         pri_rs->vnet_hdr_len,
> > > > -                         false);
> > > > +                         pri_rs->vnet_hdr_len);
> > > >      } else {
> > > >          /* compare packet in the specified connection */
> > > >          colo_compare_connection(conn, s); @@ -1093,7 +1172,7 @@
> > > > static void compare_notify_rs_finalize(SocketReadState *notify_rs)
> > > >      if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
> > > >                             notify_rs->buf,
> > > >                             notify_rs->packet_len)) {
> > > > -        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
> > > > +        ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
> > > >          if (ret < 0) {
> > > >              error_report("Notify Xen COLO-frame INIT failed");
> > > >          }
> > > > @@ -1199,6 +1278,9 @@ static void
> > > > colo_compare_complete(UserCreatable
> > > > *uc, Error **errp)
> > > >
> > > >      QTAILQ_INSERT_TAIL(&net_compares, s, next);
> > > >
> > > > +    s->sendco.done = true;
> > > > +    g_queue_init(&s->sendco.send_list);
> > > > +
> > > >      g_queue_init(&s->conn_list);
> > > >
> > > >      qemu_mutex_init(&event_mtx);
> > > > @@ -1224,8 +1306,7 @@ static void colo_flush_packets(void *opaque,
> > > > void
> > > > *user_data)
> > > >          compare_chr_send(s,
> > > >                           pkt->data,
> > > >                           pkt->size,
> > > > -                         pkt->vnet_hdr_len,
> > > > -                         false);
> > > > +                         pkt->vnet_hdr_len);
> > > >          packet_destroy(pkt, NULL);
> > > >      }
> > > >      while (!g_queue_is_empty(&conn->secondary_list)) { @@ -1281,6
> > > > +1362,11 @@ static void colo_compare_finalize(Object *obj)
> > > >      CompareState *s = COLO_COMPARE(obj);
> > > >      CompareState *tmp = NULL;
> > > >
> > > > +    AioContext *ctx = iothread_get_aio_context(s->iothread);
> > > > +    aio_context_acquire(ctx);
> > > > +    AIO_WAIT_WHILE(ctx, !s->sendco.done);
> > > > +    aio_context_release(ctx);
> > > > +
> > > >      qemu_chr_fe_deinit(&s->chr_pri_in, false);
> > > >      qemu_chr_fe_deinit(&s->chr_sec_in, false);
> > > >      qemu_chr_fe_deinit(&s->chr_out, false); @@ -1305,6 +1391,7 @@
> > > > static void colo_compare_finalize(Object *obj)
> > > >      g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> > > >
> > > >      g_queue_clear(&s->conn_list);
> > > > +    g_queue_clear(&s->sendco.send_list);
> > > >
> > > >      if (s->connection_track_table) {
> > > >          g_hash_table_destroy(s->connection_track_table);
> > > > --
> > > > 2.20.1  
> > >  
>
diff mbox series

Patch

diff --git a/net/colo-compare.c b/net/colo-compare.c
index 1de4220fe2..ff6a740284 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -32,6 +32,9 @@ 
 #include "migration/migration.h"
 #include "util.h"
 
+#include "block/aio-wait.h"
+#include "qemu/coroutine.h"
+
 #define TYPE_COLO_COMPARE "colo-compare"
 #define COLO_COMPARE(obj) \
     OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
@@ -77,6 +80,20 @@  static int event_unhandled_count;
  *                    |packet  |  |packet  +    |packet  | |packet  +
  *                    +--------+  +--------+    +--------+ +--------+
  */
+
+typedef struct SendCo {
+    Coroutine *co;
+    GQueue send_list;
+    bool done;
+    int ret;
+} SendCo;
+
+typedef struct SendEntry {
+    uint32_t size;
+    uint32_t vnet_hdr_len;
+    uint8_t buf[];
+} SendEntry;
+
 typedef struct CompareState {
     Object parent;
 
@@ -91,6 +108,7 @@  typedef struct CompareState {
     SocketReadState pri_rs;
     SocketReadState sec_rs;
     SocketReadState notify_rs;
+    SendCo sendco;
     bool vnet_hdr;
     uint32_t compare_timeout;
     uint32_t expired_scan_cycle;
@@ -126,8 +144,11 @@  enum {
 static int compare_chr_send(CompareState *s,
                             const uint8_t *buf,
                             uint32_t size,
-                            uint32_t vnet_hdr_len,
-                            bool notify_remote_frame);
+                            uint32_t vnet_hdr_len);
+
+static int notify_chr_send(CompareState *s,
+                           const uint8_t *buf,
+                           uint32_t size);
 
 static bool packet_matches_str(const char *str,
                                const uint8_t *buf,
@@ -145,7 +166,7 @@  static void notify_remote_frame(CompareState *s)
     char msg[] = "DO_CHECKPOINT";
     int ret = 0;
 
-    ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
+    ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
     if (ret < 0) {
         error_report("Notify Xen COLO-frame failed");
     }
@@ -271,8 +292,7 @@  static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
     ret = compare_chr_send(s,
                            pkt->data,
                            pkt->size,
-                           pkt->vnet_hdr_len,
-                           false);
+                           pkt->vnet_hdr_len);
     if (ret < 0) {
         error_report("colo send primary packet failed");
     }
@@ -699,63 +719,123 @@  static void colo_compare_connection(void *opaque, void *user_data)
     }
 }
 
-static int compare_chr_send(CompareState *s,
-                            const uint8_t *buf,
-                            uint32_t size,
-                            uint32_t vnet_hdr_len,
-                            bool notify_remote_frame)
+static void coroutine_fn _compare_chr_send(void *opaque)
 {
+    CompareState *s = opaque;
+    SendCo *sendco = &s->sendco;
     int ret = 0;
-    uint32_t len = htonl(size);
 
-    if (!size) {
-        return 0;
-    }
+    while (!g_queue_is_empty(&sendco->send_list)) {
+        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+        uint32_t len = htonl(entry->size);
 
-    if (notify_remote_frame) {
-        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
-                                    (uint8_t *)&len,
-                                    sizeof(len));
-    } else {
         ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
-    }
 
-    if (ret != sizeof(len)) {
-        goto err;
-    }
+        if (ret != sizeof(len)) {
+            g_free(entry);
+            goto err;
+        }
 
-    if (s->vnet_hdr) {
-        /*
-         * We send vnet header len make other module(like filter-redirector)
-         * know how to parse net packet correctly.
-         */
-        len = htonl(vnet_hdr_len);
+        if (s->vnet_hdr) {
+            /*
+             * We send vnet header len make other module(like filter-redirector)
+             * know how to parse net packet correctly.
+             */
+            len = htonl(entry->vnet_hdr_len);
 
-        if (!notify_remote_frame) {
             ret = qemu_chr_fe_write_all(&s->chr_out,
                                         (uint8_t *)&len,
                                         sizeof(len));
+
+            if (ret != sizeof(len)) {
+                g_free(entry);
+                goto err;
+            }
         }
 
-        if (ret != sizeof(len)) {
+        ret = qemu_chr_fe_write_all(&s->chr_out,
+                                    (uint8_t *)entry->buf,
+                                    entry->size);
+
+        if (ret != entry->size) {
+            g_free(entry);
             goto err;
         }
+
+        g_free(entry);
     }
 
-    if (notify_remote_frame) {
-        ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
-                                    (uint8_t *)buf,
-                                    size);
-    } else {
-        ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
+    sendco->ret = 0;
+    goto out;
+
+err:
+    while (!g_queue_is_empty(&sendco->send_list)) {
+        SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
+        g_free(entry);
     }
+    sendco->ret = ret < 0 ? ret : -EIO;
+out:
+    sendco->co = NULL;
+    sendco->done = true;
+    aio_wait_kick();
+}
+
+static int compare_chr_send(CompareState *s,
+                            const uint8_t *buf,
+                            uint32_t size,
+                            uint32_t vnet_hdr_len)
+{
+    SendCo *sendco = &s->sendco;
+    SendEntry *entry;
+
+    if (!size) {
+        return 0;
+    }
+
+    entry = g_malloc(sizeof(SendEntry) + size);
+    entry->size = size;
+    entry->vnet_hdr_len = vnet_hdr_len;
+    memcpy(entry->buf, buf, size);
+    g_queue_push_head(&sendco->send_list, entry);
+
+    if (sendco->done) {
+        sendco->co = qemu_coroutine_create(_compare_chr_send, s);
+        sendco->done = false;
+        qemu_coroutine_enter(sendco->co);
+        if (sendco->done) {
+            /* report early errors */
+            return sendco->ret;
+        }
+    }
+
+    /* assume success */
+    return 0;
+}
+
+static int notify_chr_send(CompareState *s,
+                           const uint8_t *buf,
+                           uint32_t size)
+{
+    int ret = 0;
+    uint32_t len = htonl(size);
+
+    ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
+                            (uint8_t *)&len,
+                            sizeof(len));
+
+    if (ret != sizeof(len)) {
+        goto err;
+    }
+
+    ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
+                                (uint8_t *)buf,
+                                size);
 
     if (ret != size) {
         goto err;
     }
 
     return 0;
-
 err:
     return ret < 0 ? ret : -EIO;
 }
@@ -1062,8 +1142,7 @@  static void compare_pri_rs_finalize(SocketReadState *pri_rs)
         compare_chr_send(s,
                          pri_rs->buf,
                          pri_rs->packet_len,
-                         pri_rs->vnet_hdr_len,
-                         false);
+                         pri_rs->vnet_hdr_len);
     } else {
         /* compare packet in the specified connection */
         colo_compare_connection(conn, s);
@@ -1093,7 +1172,7 @@  static void compare_notify_rs_finalize(SocketReadState *notify_rs)
     if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
                            notify_rs->buf,
                            notify_rs->packet_len)) {
-        ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
+        ret = notify_chr_send(s, (uint8_t *)msg, strlen(msg));
         if (ret < 0) {
             error_report("Notify Xen COLO-frame INIT failed");
         }
@@ -1199,6 +1278,9 @@  static void colo_compare_complete(UserCreatable *uc, Error **errp)
 
     QTAILQ_INSERT_TAIL(&net_compares, s, next);
 
+    s->sendco.done = true;
+    g_queue_init(&s->sendco.send_list);
+
     g_queue_init(&s->conn_list);
 
     qemu_mutex_init(&event_mtx);
@@ -1224,8 +1306,7 @@  static void colo_flush_packets(void *opaque, void *user_data)
         compare_chr_send(s,
                          pkt->data,
                          pkt->size,
-                         pkt->vnet_hdr_len,
-                         false);
+                         pkt->vnet_hdr_len);
         packet_destroy(pkt, NULL);
     }
     while (!g_queue_is_empty(&conn->secondary_list)) {
@@ -1281,6 +1362,11 @@  static void colo_compare_finalize(Object *obj)
     CompareState *s = COLO_COMPARE(obj);
     CompareState *tmp = NULL;
 
+    AioContext *ctx = iothread_get_aio_context(s->iothread);
+    aio_context_acquire(ctx);
+    AIO_WAIT_WHILE(ctx, !s->sendco.done);
+    aio_context_release(ctx);
+
     qemu_chr_fe_deinit(&s->chr_pri_in, false);
     qemu_chr_fe_deinit(&s->chr_sec_in, false);
     qemu_chr_fe_deinit(&s->chr_out, false);
@@ -1305,6 +1391,7 @@  static void colo_compare_finalize(Object *obj)
     g_queue_foreach(&s->conn_list, colo_flush_packets, s);
 
     g_queue_clear(&s->conn_list);
+    g_queue_clear(&s->sendco.send_list);
 
     if (s->connection_track_table) {
         g_hash_table_destroy(s->connection_track_table);