diff mbox series

[v3,2/2] net/colo-compare.c: handling of the full primary or secondary queue

Message ID 20200325094354.19677-3-dereksu@qnap.com
State New
Headers show
Series COLO: handling of the full primary or secondary queue | expand

Commit Message

Derek Su March 25, 2020, 9:43 a.m. UTC
The pervious handling of the full primary or queue is only dropping
the packet. If there are lots of clients to the guest VM,
the "drop" will lead to the lost of the networking connection
until next checkpoint.

To address the issue, this patch drops the packet firstly.
Then, send all queued primary packets, remove all queued secondary
packets and do checkpoint.

Signed-off-by: Derek Su <dereksu@qnap.com>
---
 net/colo-compare.c | 41 ++++++++++++++++++++++++++++++-----------
 1 file changed, 30 insertions(+), 11 deletions(-)

Comments

Lukas Straub March 27, 2020, 5:45 p.m. UTC | #1
On Wed, 25 Mar 2020 17:43:54 +0800
Derek Su <dereksu@qnap.com> wrote:

> The pervious handling of the full primary or queue is only dropping
> the packet. If there are lots of clients to the guest VM,
> the "drop" will lead to the lost of the networking connection
> until next checkpoint.
> 
> To address the issue, this patch drops the packet firstly.
> Then, send all queued primary packets, remove all queued secondary
> packets and do checkpoint.
> 
> Signed-off-by: Derek Su <dereksu@qnap.com>
> ---
>  net/colo-compare.c | 41 ++++++++++++++++++++++++++++++-----------
>  1 file changed, 30 insertions(+), 11 deletions(-)
> 
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index cdd87b2aa8..1a52f50fbe 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -125,6 +125,12 @@ static const char *colo_mode[] = {
>      [SECONDARY_IN] = "secondary",
>  };
>  
> +enum {
> +    QUEUE_INSERT_ERR = -1,
> +    QUEUE_INSERT_OK = 0,
> +    QUEUE_INSERT_FULL = 1,
> +};
> +
>  static int compare_chr_send(CompareState *s,
>                              const uint8_t *buf,
>                              uint32_t size,
> @@ -211,8 +217,10 @@ static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
>  }
>  
>  /*
> - * Return 0 on success, if return -1 means the pkt
> - * is unsupported(arp and ipv6) and will be sent later
> + * Return QUEUE_INSERT_OK on success.
> + * If return QUEUE_INSERT_FULL means list is full, and
> + * QUEUE_INSERT_ERR means the pkt is unsupported(arp and ipv6) and
> + * will be sent later
>   */
>  static int packet_enqueue(CompareState *s, int mode, Connection **con)
>  {
> @@ -234,7 +242,7 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
>      if (parse_packet_early(pkt)) {
>          packet_destroy(pkt, NULL);
>          pkt = NULL;
> -        return -1;
> +        return QUEUE_INSERT_ERR;
>      }
>      fill_connection_key(pkt, &key);
>  
> @@ -258,11 +266,12 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
>                       "drop packet", colo_mode[mode]);
>          packet_destroy(pkt, NULL);
>          pkt = NULL;
> +        return QUEUE_INSERT_FULL;
>      }
>  
>      *con = conn;
>  
> -    return 0;
> +    return QUEUE_INSERT_OK;
>  }
>  
>  static inline bool after(uint32_t seq1, uint32_t seq2)
> @@ -995,17 +1004,22 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
>  {
>      CompareState *s = container_of(pri_rs, CompareState, pri_rs);
>      Connection *conn = NULL;
> +    int ret;
>  
> -    if (packet_enqueue(s, PRIMARY_IN, &conn)) {
> +    ret = packet_enqueue(s, PRIMARY_IN, &conn);
> +    if (ret == QUEUE_INSERT_OK) {
> +        /* compare packet in the specified connection */
> +        colo_compare_connection(conn, s);
> +    } else if (ret == QUEUE_INSERT_FULL) {
> +        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> +        colo_compare_inconsistency_notify(s);
> +    } else {
>          trace_colo_compare_main("primary: unsupported packet in");
>          compare_chr_send(s,
>                           pri_rs->buf,
>                           pri_rs->packet_len,
>                           pri_rs->vnet_hdr_len,
>                           false);
> -    } else {
> -        /* compare packet in the specified connection */
> -        colo_compare_connection(conn, s);
>      }
>  }
>  
> @@ -1013,12 +1027,17 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
>  {
>      CompareState *s = container_of(sec_rs, CompareState, sec_rs);
>      Connection *conn = NULL;
> +    int ret;
>  
> -    if (packet_enqueue(s, SECONDARY_IN, &conn)) {
> -        trace_colo_compare_main("secondary: unsupported packet in");
> -    } else {
> +    ret = packet_enqueue(s, SECONDARY_IN, &conn);
> +    if (ret == QUEUE_INSERT_OK) {
>          /* compare packet in the specified connection */
>          colo_compare_connection(conn, s);
> +    } else if (ret == QUEUE_INSERT_FULL) {
> +        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> +        colo_compare_inconsistency_notify(s);
> +    } else {
> +        trace_colo_compare_main("secondary: unsupported packet in");
>      }
>  }
>  

Hi,
I don't think we have to flush here because the (post-)checkpoint event will flush the packets for us.

Regards,
Lukas Straub
Derek Su March 27, 2020, 6:20 p.m. UTC | #2
Lukas Straub <lukasstraub2@web.de> 於 2020年3月28日 週六 上午1:46寫道:
>
> On Wed, 25 Mar 2020 17:43:54 +0800
> Derek Su <dereksu@qnap.com> wrote:
>
> > The pervious handling of the full primary or queue is only dropping
> > the packet. If there are lots of clients to the guest VM,
> > the "drop" will lead to the lost of the networking connection
> > until next checkpoint.
> >
> > To address the issue, this patch drops the packet firstly.
> > Then, send all queued primary packets, remove all queued secondary
> > packets and do checkpoint.
> >
> > Signed-off-by: Derek Su <dereksu@qnap.com>
> > ---
> >  net/colo-compare.c | 41 ++++++++++++++++++++++++++++++-----------
> >  1 file changed, 30 insertions(+), 11 deletions(-)
> >
> > diff --git a/net/colo-compare.c b/net/colo-compare.c
> > index cdd87b2aa8..1a52f50fbe 100644
> > --- a/net/colo-compare.c
> > +++ b/net/colo-compare.c
> > @@ -125,6 +125,12 @@ static const char *colo_mode[] = {
> >      [SECONDARY_IN] = "secondary",
> >  };
> >
> > +enum {
> > +    QUEUE_INSERT_ERR = -1,
> > +    QUEUE_INSERT_OK = 0,
> > +    QUEUE_INSERT_FULL = 1,
> > +};
> > +
> >  static int compare_chr_send(CompareState *s,
> >                              const uint8_t *buf,
> >                              uint32_t size,
> > @@ -211,8 +217,10 @@ static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
> >  }
> >
> >  /*
> > - * Return 0 on success, if return -1 means the pkt
> > - * is unsupported(arp and ipv6) and will be sent later
> > + * Return QUEUE_INSERT_OK on success.
> > + * If return QUEUE_INSERT_FULL means list is full, and
> > + * QUEUE_INSERT_ERR means the pkt is unsupported(arp and ipv6) and
> > + * will be sent later
> >   */
> >  static int packet_enqueue(CompareState *s, int mode, Connection **con)
> >  {
> > @@ -234,7 +242,7 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
> >      if (parse_packet_early(pkt)) {
> >          packet_destroy(pkt, NULL);
> >          pkt = NULL;
> > -        return -1;
> > +        return QUEUE_INSERT_ERR;
> >      }
> >      fill_connection_key(pkt, &key);
> >
> > @@ -258,11 +266,12 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
> >                       "drop packet", colo_mode[mode]);
> >          packet_destroy(pkt, NULL);
> >          pkt = NULL;
> > +        return QUEUE_INSERT_FULL;
> >      }
> >
> >      *con = conn;
> >
> > -    return 0;
> > +    return QUEUE_INSERT_OK;
> >  }
> >
> >  static inline bool after(uint32_t seq1, uint32_t seq2)
> > @@ -995,17 +1004,22 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
> >  {
> >      CompareState *s = container_of(pri_rs, CompareState, pri_rs);
> >      Connection *conn = NULL;
> > +    int ret;
> >
> > -    if (packet_enqueue(s, PRIMARY_IN, &conn)) {
> > +    ret = packet_enqueue(s, PRIMARY_IN, &conn);
> > +    if (ret == QUEUE_INSERT_OK) {
> > +        /* compare packet in the specified connection */
> > +        colo_compare_connection(conn, s);
> > +    } else if (ret == QUEUE_INSERT_FULL) {
> > +        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> > +        colo_compare_inconsistency_notify(s);
> > +    } else {
> >          trace_colo_compare_main("primary: unsupported packet in");
> >          compare_chr_send(s,
> >                           pri_rs->buf,
> >                           pri_rs->packet_len,
> >                           pri_rs->vnet_hdr_len,
> >                           false);
> > -    } else {
> > -        /* compare packet in the specified connection */
> > -        colo_compare_connection(conn, s);
> >      }
> >  }
> >
> > @@ -1013,12 +1027,17 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
> >  {
> >      CompareState *s = container_of(sec_rs, CompareState, sec_rs);
> >      Connection *conn = NULL;
> > +    int ret;
> >
> > -    if (packet_enqueue(s, SECONDARY_IN, &conn)) {
> > -        trace_colo_compare_main("secondary: unsupported packet in");
> > -    } else {
> > +    ret = packet_enqueue(s, SECONDARY_IN, &conn);
> > +    if (ret == QUEUE_INSERT_OK) {
> >          /* compare packet in the specified connection */
> >          colo_compare_connection(conn, s);
> > +    } else if (ret == QUEUE_INSERT_FULL) {
> > +        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> > +        colo_compare_inconsistency_notify(s);
> > +    } else {
> > +        trace_colo_compare_main("secondary: unsupported packet in");
> >      }
> >  }
> >
>
> Hi,
> I don't think we have to flush here because the (post-)checkpoint event will flush the packets for us.
>

Hi,
Yes, the periodical checkpoint can flush the packets.

But, if many clients send many packets to the vm,
there is a high probability that packets are dropped because the
primary/secondary queues are always full.
It causes lots of re-transmission between clients and vm and
deteriorate service response to clients.

Sincerely,
Derek Su

> Regards,
> Lukas Straub
Lukas Straub March 27, 2020, 6:28 p.m. UTC | #3
On Sat, 28 Mar 2020 02:20:21 +0800
Derek Su <jwsu1986@gmail.com> wrote:

> Lukas Straub <lukasstraub2@web.de> 於 2020年3月28日 週六 上午1:46寫道:
> >
> > On Wed, 25 Mar 2020 17:43:54 +0800
> > Derek Su <dereksu@qnap.com> wrote:
> >  
> > > The pervious handling of the full primary or queue is only dropping
> > > the packet. If there are lots of clients to the guest VM,
> > > the "drop" will lead to the lost of the networking connection
> > > until next checkpoint.
> > >
> > > To address the issue, this patch drops the packet firstly.
> > > Then, send all queued primary packets, remove all queued secondary
> > > packets and do checkpoint.
> > >
> > > Signed-off-by: Derek Su <dereksu@qnap.com>
> > > ---
> > >  net/colo-compare.c | 41 ++++++++++++++++++++++++++++++-----------
> > >  1 file changed, 30 insertions(+), 11 deletions(-)
> > >
> > > diff --git a/net/colo-compare.c b/net/colo-compare.c
> > > index cdd87b2aa8..1a52f50fbe 100644
> > > --- a/net/colo-compare.c
> > > +++ b/net/colo-compare.c
> > > @@ -125,6 +125,12 @@ static const char *colo_mode[] = {
> > >      [SECONDARY_IN] = "secondary",
> > >  };
> > >
> > > +enum {
> > > +    QUEUE_INSERT_ERR = -1,
> > > +    QUEUE_INSERT_OK = 0,
> > > +    QUEUE_INSERT_FULL = 1,
> > > +};
> > > +
> > >  static int compare_chr_send(CompareState *s,
> > >                              const uint8_t *buf,
> > >                              uint32_t size,
> > > @@ -211,8 +217,10 @@ static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
> > >  }
> > >
> > >  /*
> > > - * Return 0 on success, if return -1 means the pkt
> > > - * is unsupported(arp and ipv6) and will be sent later
> > > + * Return QUEUE_INSERT_OK on success.
> > > + * If return QUEUE_INSERT_FULL means list is full, and
> > > + * QUEUE_INSERT_ERR means the pkt is unsupported(arp and ipv6) and
> > > + * will be sent later
> > >   */
> > >  static int packet_enqueue(CompareState *s, int mode, Connection **con)
> > >  {
> > > @@ -234,7 +242,7 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
> > >      if (parse_packet_early(pkt)) {
> > >          packet_destroy(pkt, NULL);
> > >          pkt = NULL;
> > > -        return -1;
> > > +        return QUEUE_INSERT_ERR;
> > >      }
> > >      fill_connection_key(pkt, &key);
> > >
> > > @@ -258,11 +266,12 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
> > >                       "drop packet", colo_mode[mode]);
> > >          packet_destroy(pkt, NULL);
> > >          pkt = NULL;
> > > +        return QUEUE_INSERT_FULL;
> > >      }
> > >
> > >      *con = conn;
> > >
> > > -    return 0;
> > > +    return QUEUE_INSERT_OK;
> > >  }
> > >
> > >  static inline bool after(uint32_t seq1, uint32_t seq2)
> > > @@ -995,17 +1004,22 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
> > >  {
> > >      CompareState *s = container_of(pri_rs, CompareState, pri_rs);
> > >      Connection *conn = NULL;
> > > +    int ret;
> > >
> > > -    if (packet_enqueue(s, PRIMARY_IN, &conn)) {
> > > +    ret = packet_enqueue(s, PRIMARY_IN, &conn);
> > > +    if (ret == QUEUE_INSERT_OK) {
> > > +        /* compare packet in the specified connection */
> > > +        colo_compare_connection(conn, s);
> > > +    } else if (ret == QUEUE_INSERT_FULL) {
> > > +        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> > > +        colo_compare_inconsistency_notify(s);
> > > +    } else {
> > >          trace_colo_compare_main("primary: unsupported packet in");
> > >          compare_chr_send(s,
> > >                           pri_rs->buf,
> > >                           pri_rs->packet_len,
> > >                           pri_rs->vnet_hdr_len,
> > >                           false);
> > > -    } else {
> > > -        /* compare packet in the specified connection */
> > > -        colo_compare_connection(conn, s);
> > >      }
> > >  }
> > >
> > > @@ -1013,12 +1027,17 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
> > >  {
> > >      CompareState *s = container_of(sec_rs, CompareState, sec_rs);
> > >      Connection *conn = NULL;
> > > +    int ret;
> > >
> > > -    if (packet_enqueue(s, SECONDARY_IN, &conn)) {
> > > -        trace_colo_compare_main("secondary: unsupported packet in");
> > > -    } else {
> > > +    ret = packet_enqueue(s, SECONDARY_IN, &conn);
> > > +    if (ret == QUEUE_INSERT_OK) {
> > >          /* compare packet in the specified connection */
> > >          colo_compare_connection(conn, s);
> > > +    } else if (ret == QUEUE_INSERT_FULL) {
> > > +        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> > > +        colo_compare_inconsistency_notify(s);
> > > +    } else {
> > > +        trace_colo_compare_main("secondary: unsupported packet in");
> > >      }
> > >  }
> > >  
> >
> > Hi,
> > I don't think we have to flush here because the (post-)checkpoint event will flush the packets for us.
> >  
> 
> Hi,
> Yes, the periodical checkpoint can flush the packets.
> 
> But, if many clients send many packets to the vm,
> there is a high probability that packets are dropped because the
> primary/secondary queues are always full.
> It causes lots of re-transmission between clients and vm and
> deteriorate service response to clients.
> 
> Sincerely,
> Derek Su

I mean that we can still initiate a checkpoint here, but not do the flushing here.

Regards,
Lukas Straub

> > Regards,
> > Lukas Straub
Derek Su March 28, 2020, 3:35 a.m. UTC | #4
Lukas Straub <lukasstraub2@web.de> 於 2020年3月28日 週六 上午2:28寫道:
>
> On Sat, 28 Mar 2020 02:20:21 +0800
> Derek Su <jwsu1986@gmail.com> wrote:
>
> > Lukas Straub <lukasstraub2@web.de> 於 2020年3月28日 週六 上午1:46寫道:
> > >
> > > On Wed, 25 Mar 2020 17:43:54 +0800
> > > Derek Su <dereksu@qnap.com> wrote:
> > >
> > > > The pervious handling of the full primary or queue is only dropping
> > > > the packet. If there are lots of clients to the guest VM,
> > > > the "drop" will lead to the lost of the networking connection
> > > > until next checkpoint.
> > > >
> > > > To address the issue, this patch drops the packet firstly.
> > > > Then, send all queued primary packets, remove all queued secondary
> > > > packets and do checkpoint.
> > > >
> > > > Signed-off-by: Derek Su <dereksu@qnap.com>
> > > > ---
> > > >  net/colo-compare.c | 41 ++++++++++++++++++++++++++++++-----------
> > > >  1 file changed, 30 insertions(+), 11 deletions(-)
> > > >
> > > > diff --git a/net/colo-compare.c b/net/colo-compare.c
> > > > index cdd87b2aa8..1a52f50fbe 100644
> > > > --- a/net/colo-compare.c
> > > > +++ b/net/colo-compare.c
> > > > @@ -125,6 +125,12 @@ static const char *colo_mode[] = {
> > > >      [SECONDARY_IN] = "secondary",
> > > >  };
> > > >
> > > > +enum {
> > > > +    QUEUE_INSERT_ERR = -1,
> > > > +    QUEUE_INSERT_OK = 0,
> > > > +    QUEUE_INSERT_FULL = 1,
> > > > +};
> > > > +
> > > >  static int compare_chr_send(CompareState *s,
> > > >                              const uint8_t *buf,
> > > >                              uint32_t size,
> > > > @@ -211,8 +217,10 @@ static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
> > > >  }
> > > >
> > > >  /*
> > > > - * Return 0 on success, if return -1 means the pkt
> > > > - * is unsupported(arp and ipv6) and will be sent later
> > > > + * Return QUEUE_INSERT_OK on success.
> > > > + * If return QUEUE_INSERT_FULL means list is full, and
> > > > + * QUEUE_INSERT_ERR means the pkt is unsupported(arp and ipv6) and
> > > > + * will be sent later
> > > >   */
> > > >  static int packet_enqueue(CompareState *s, int mode, Connection **con)
> > > >  {
> > > > @@ -234,7 +242,7 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
> > > >      if (parse_packet_early(pkt)) {
> > > >          packet_destroy(pkt, NULL);
> > > >          pkt = NULL;
> > > > -        return -1;
> > > > +        return QUEUE_INSERT_ERR;
> > > >      }
> > > >      fill_connection_key(pkt, &key);
> > > >
> > > > @@ -258,11 +266,12 @@ static int packet_enqueue(CompareState *s, int mode, Connection **con)
> > > >                       "drop packet", colo_mode[mode]);
> > > >          packet_destroy(pkt, NULL);
> > > >          pkt = NULL;
> > > > +        return QUEUE_INSERT_FULL;
> > > >      }
> > > >
> > > >      *con = conn;
> > > >
> > > > -    return 0;
> > > > +    return QUEUE_INSERT_OK;
> > > >  }
> > > >
> > > >  static inline bool after(uint32_t seq1, uint32_t seq2)
> > > > @@ -995,17 +1004,22 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
> > > >  {
> > > >      CompareState *s = container_of(pri_rs, CompareState, pri_rs);
> > > >      Connection *conn = NULL;
> > > > +    int ret;
> > > >
> > > > -    if (packet_enqueue(s, PRIMARY_IN, &conn)) {
> > > > +    ret = packet_enqueue(s, PRIMARY_IN, &conn);
> > > > +    if (ret == QUEUE_INSERT_OK) {
> > > > +        /* compare packet in the specified connection */
> > > > +        colo_compare_connection(conn, s);
> > > > +    } else if (ret == QUEUE_INSERT_FULL) {
> > > > +        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> > > > +        colo_compare_inconsistency_notify(s);
> > > > +    } else {
> > > >          trace_colo_compare_main("primary: unsupported packet in");
> > > >          compare_chr_send(s,
> > > >                           pri_rs->buf,
> > > >                           pri_rs->packet_len,
> > > >                           pri_rs->vnet_hdr_len,
> > > >                           false);
> > > > -    } else {
> > > > -        /* compare packet in the specified connection */
> > > > -        colo_compare_connection(conn, s);
> > > >      }
> > > >  }
> > > >
> > > > @@ -1013,12 +1027,17 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
> > > >  {
> > > >      CompareState *s = container_of(sec_rs, CompareState, sec_rs);
> > > >      Connection *conn = NULL;
> > > > +    int ret;
> > > >
> > > > -    if (packet_enqueue(s, SECONDARY_IN, &conn)) {
> > > > -        trace_colo_compare_main("secondary: unsupported packet in");
> > > > -    } else {
> > > > +    ret = packet_enqueue(s, SECONDARY_IN, &conn);
> > > > +    if (ret == QUEUE_INSERT_OK) {
> > > >          /* compare packet in the specified connection */
> > > >          colo_compare_connection(conn, s);
> > > > +    } else if (ret == QUEUE_INSERT_FULL) {
> > > > +        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> > > > +        colo_compare_inconsistency_notify(s);
> > > > +    } else {
> > > > +        trace_colo_compare_main("secondary: unsupported packet in");
> > > >      }
> > > >  }
> > > >
> > >
> > > Hi,
> > > I don't think we have to flush here because the (post-)checkpoint event will flush the packets for us.
> > >
> >
> > Hi,
> > Yes, the periodical checkpoint can flush the packets.
> >
> > But, if many clients send many packets to the vm,
> > there is a high probability that packets are dropped because the
> > primary/secondary queues are always full.
> > It causes lots of re-transmission between clients and vm and
> > deteriorate service response to clients.
> >
> > Sincerely,
> > Derek Su
>
> I mean that we can still initiate a checkpoint here, but not do the flushing here.
>
> Regards,
> Lukas Straub

Hi,

Your're right.
After checking the colo_do_checkpoint_transaction function,  the flush
in the patch is redundant.
It will be fixed in the next version.
Thanks.

Derek Su
>
> > > Regards,
> > > Lukas Straub
>
diff mbox series

Patch

diff --git a/net/colo-compare.c b/net/colo-compare.c
index cdd87b2aa8..1a52f50fbe 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -125,6 +125,12 @@  static const char *colo_mode[] = {
     [SECONDARY_IN] = "secondary",
 };
 
+enum {
+    QUEUE_INSERT_ERR = -1,
+    QUEUE_INSERT_OK = 0,
+    QUEUE_INSERT_FULL = 1,
+};
+
 static int compare_chr_send(CompareState *s,
                             const uint8_t *buf,
                             uint32_t size,
@@ -211,8 +217,10 @@  static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
 }
 
 /*
- * Return 0 on success, if return -1 means the pkt
- * is unsupported(arp and ipv6) and will be sent later
+ * Return QUEUE_INSERT_OK on success.
+ * If return QUEUE_INSERT_FULL means list is full, and
+ * QUEUE_INSERT_ERR means the pkt is unsupported(arp and ipv6) and
+ * will be sent later
  */
 static int packet_enqueue(CompareState *s, int mode, Connection **con)
 {
@@ -234,7 +242,7 @@  static int packet_enqueue(CompareState *s, int mode, Connection **con)
     if (parse_packet_early(pkt)) {
         packet_destroy(pkt, NULL);
         pkt = NULL;
-        return -1;
+        return QUEUE_INSERT_ERR;
     }
     fill_connection_key(pkt, &key);
 
@@ -258,11 +266,12 @@  static int packet_enqueue(CompareState *s, int mode, Connection **con)
                      "drop packet", colo_mode[mode]);
         packet_destroy(pkt, NULL);
         pkt = NULL;
+        return QUEUE_INSERT_FULL;
     }
 
     *con = conn;
 
-    return 0;
+    return QUEUE_INSERT_OK;
 }
 
 static inline bool after(uint32_t seq1, uint32_t seq2)
@@ -995,17 +1004,22 @@  static void compare_pri_rs_finalize(SocketReadState *pri_rs)
 {
     CompareState *s = container_of(pri_rs, CompareState, pri_rs);
     Connection *conn = NULL;
+    int ret;
 
-    if (packet_enqueue(s, PRIMARY_IN, &conn)) {
+    ret = packet_enqueue(s, PRIMARY_IN, &conn);
+    if (ret == QUEUE_INSERT_OK) {
+        /* compare packet in the specified connection */
+        colo_compare_connection(conn, s);
+    } else if (ret == QUEUE_INSERT_FULL) {
+        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
+        colo_compare_inconsistency_notify(s);
+    } else {
         trace_colo_compare_main("primary: unsupported packet in");
         compare_chr_send(s,
                          pri_rs->buf,
                          pri_rs->packet_len,
                          pri_rs->vnet_hdr_len,
                          false);
-    } else {
-        /* compare packet in the specified connection */
-        colo_compare_connection(conn, s);
     }
 }
 
@@ -1013,12 +1027,17 @@  static void compare_sec_rs_finalize(SocketReadState *sec_rs)
 {
     CompareState *s = container_of(sec_rs, CompareState, sec_rs);
     Connection *conn = NULL;
+    int ret;
 
-    if (packet_enqueue(s, SECONDARY_IN, &conn)) {
-        trace_colo_compare_main("secondary: unsupported packet in");
-    } else {
+    ret = packet_enqueue(s, SECONDARY_IN, &conn);
+    if (ret == QUEUE_INSERT_OK) {
         /* compare packet in the specified connection */
         colo_compare_connection(conn, s);
+    } else if (ret == QUEUE_INSERT_FULL) {
+        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
+        colo_compare_inconsistency_notify(s);
+    } else {
+        trace_colo_compare_main("secondary: unsupported packet in");
     }
 }