diff mbox

[RFC,V7,6/7] colo-compare: introduce packet comparison thread

Message ID 1468827650-17234-7-git-send-email-zhangchen.fnst@cn.fujitsu.com
State New
Headers show

Commit Message

Zhang Chen July 18, 2016, 7:40 a.m. UTC
If primary packet is same with secondary packet,
we will send primary packet and drop secondary
packet, otherwise notify COLO frame to do checkpoint.
If primary packet comes and secondary packet not,
after REGULAR_PACKET_CHECK_MS milliseconds we set
the primary packet as old_packet,then do a checkpoint.

Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
---
 net/colo-base.c    |   1 +
 net/colo-base.h    |   3 +
 net/colo-compare.c | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 trace-events       |   2 +
 4 files changed, 222 insertions(+)

Comments

Daniel P. Berrangé July 18, 2016, 8:37 a.m. UTC | #1
On Mon, Jul 18, 2016 at 03:40:49PM +0800, Zhang Chen wrote:
> If primary packet is same with secondary packet,
> we will send primary packet and drop secondary
> packet, otherwise notify COLO frame to do checkpoint.
> If primary packet comes and secondary packet not,
> after REGULAR_PACKET_CHECK_MS milliseconds we set
> the primary packet as old_packet,then do a checkpoint.
> 
> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> ---
>  net/colo-base.c    |   1 +
>  net/colo-base.h    |   3 +
>  net/colo-compare.c | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  trace-events       |   2 +
>  4 files changed, 222 insertions(+)
> 
> +static void *colo_compare_thread(void *opaque)
> +{
> +    GMainContext *worker_context;
> +    GMainLoop *compare_loop;
> +    CompareState *s = opaque;
> +
> +    worker_context = g_main_context_new();
> +    g_assert(g_main_context_get_thread_default() == NULL);
> +    g_main_context_push_thread_default(worker_context);
> +    g_assert(g_main_context_get_thread_default() == worker_context);
> +
> +    qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
> +                          compare_pri_chr_in, NULL, s);
> +    qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
> +                          compare_sec_chr_in, NULL, s);

Looking at this I see you've used g_main_context_push_thread_default
in order to avoid having to modify the qemu_chr_add_handlers() method
to accept a GMainContext. Personally I think I'd favour explicit
passing of a GMainContext, rather than modifying Qemu Char code &
QIOChannel to magically use per-thread main context.  In particular
I'm concerned that one day other code may use the
g_main_context_push_thread_default method for some other purpose
but still want their char devs handled by the main thread. Your
solution makes this impossible and adds magic which could surprise
us in bad ways.

So I think I'd rather see qemu_chr_add_handlers() modified to accept
a GMainContext, or perhaps add a qemu_chr_add_handlers_full() method
which takes a GMainContext, so we can avoid modifying all existing
callers ofo qemu_chr_add_handlers.  There should be no need to modify
QIOChannel at all, since you can use qio_channel_create_watch instead
of qio_channel_add_watch and simply attach to the context you want
directly.


Regards,
Daniel
Zhang Chen July 19, 2016, 3:32 a.m. UTC | #2
On 07/18/2016 04:37 PM, Daniel P. Berrange wrote:
> On Mon, Jul 18, 2016 at 03:40:49PM +0800, Zhang Chen wrote:
>> If primary packet is same with secondary packet,
>> we will send primary packet and drop secondary
>> packet, otherwise notify COLO frame to do checkpoint.
>> If primary packet comes and secondary packet not,
>> after REGULAR_PACKET_CHECK_MS milliseconds we set
>> the primary packet as old_packet,then do a checkpoint.
>>
>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>> ---
>>   net/colo-base.c    |   1 +
>>   net/colo-base.h    |   3 +
>>   net/colo-compare.c | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   trace-events       |   2 +
>>   4 files changed, 222 insertions(+)
>>
>> +static void *colo_compare_thread(void *opaque)
>> +{
>> +    GMainContext *worker_context;
>> +    GMainLoop *compare_loop;
>> +    CompareState *s = opaque;
>> +
>> +    worker_context = g_main_context_new();
>> +    g_assert(g_main_context_get_thread_default() == NULL);
>> +    g_main_context_push_thread_default(worker_context);
>> +    g_assert(g_main_context_get_thread_default() == worker_context);
>> +
>> +    qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
>> +                          compare_pri_chr_in, NULL, s);
>> +    qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
>> +                          compare_sec_chr_in, NULL, s);
> Looking at this I see you've used g_main_context_push_thread_default
> in order to avoid having to modify the qemu_chr_add_handlers() method
> to accept a GMainContext. Personally I think I'd favour explicit
> passing of a GMainContext, rather than modifying Qemu Char code &
> QIOChannel to magically use per-thread main context.  In particular
> I'm concerned that one day other code may use the
> g_main_context_push_thread_default method for some other purpose
> but still want their char devs handled by the main thread. Your
> solution makes this impossible and adds magic which could surprise
> us in bad ways.
>
> So I think I'd rather see qemu_chr_add_handlers() modified to accept
> a GMainContext, or perhaps add a qemu_chr_add_handlers_full() method
> which takes a GMainContext, so we can avoid modifying all existing
> callers ofo qemu_chr_add_handlers.  There should be no need to modify
> QIOChannel at all, since you can use qio_channel_create_watch instead
> of qio_channel_add_watch and simply attach to the context you want
> directly.

I get your point.
Considering current qemu code, I think add a flag
in CharDriverState to decide whether or not to
run g_source_attach(xxx, g_main_context_get_thread_default());
is a easy way for this.
How about this way?

Thanks for your review.
Zhang Chen

>
> Regards,
> Daniel
Daniel P. Berrangé July 19, 2016, 9:03 a.m. UTC | #3
On Tue, Jul 19, 2016 at 11:32:41AM +0800, Zhang Chen wrote:
> 
> 
> On 07/18/2016 04:37 PM, Daniel P. Berrange wrote:
> > On Mon, Jul 18, 2016 at 03:40:49PM +0800, Zhang Chen wrote:
> > > If primary packet is same with secondary packet,
> > > we will send primary packet and drop secondary
> > > packet, otherwise notify COLO frame to do checkpoint.
> > > If primary packet comes and secondary packet not,
> > > after REGULAR_PACKET_CHECK_MS milliseconds we set
> > > the primary packet as old_packet,then do a checkpoint.
> > > 
> > > Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> > > Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> > > Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> > > ---
> > >   net/colo-base.c    |   1 +
> > >   net/colo-base.h    |   3 +
> > >   net/colo-compare.c | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++++
> > >   trace-events       |   2 +
> > >   4 files changed, 222 insertions(+)
> > > 
> > > +static void *colo_compare_thread(void *opaque)
> > > +{
> > > +    GMainContext *worker_context;
> > > +    GMainLoop *compare_loop;
> > > +    CompareState *s = opaque;
> > > +
> > > +    worker_context = g_main_context_new();
> > > +    g_assert(g_main_context_get_thread_default() == NULL);
> > > +    g_main_context_push_thread_default(worker_context);
> > > +    g_assert(g_main_context_get_thread_default() == worker_context);
> > > +
> > > +    qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
> > > +                          compare_pri_chr_in, NULL, s);
> > > +    qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
> > > +                          compare_sec_chr_in, NULL, s);
> > Looking at this I see you've used g_main_context_push_thread_default
> > in order to avoid having to modify the qemu_chr_add_handlers() method
> > to accept a GMainContext. Personally I think I'd favour explicit
> > passing of a GMainContext, rather than modifying Qemu Char code &
> > QIOChannel to magically use per-thread main context.  In particular
> > I'm concerned that one day other code may use the
> > g_main_context_push_thread_default method for some other purpose
> > but still want their char devs handled by the main thread. Your
> > solution makes this impossible and adds magic which could surprise
> > us in bad ways.
> > 
> > So I think I'd rather see qemu_chr_add_handlers() modified to accept
> > a GMainContext, or perhaps add a qemu_chr_add_handlers_full() method
> > which takes a GMainContext, so we can avoid modifying all existing
> > callers ofo qemu_chr_add_handlers.  There should be no need to modify
> > QIOChannel at all, since you can use qio_channel_create_watch instead
> > of qio_channel_add_watch and simply attach to the context you want
> > directly.
> 
> I get your point.
> Considering current qemu code, I think add a flag
> in CharDriverState to decide whether or not to
> run g_source_attach(xxx, g_main_context_get_thread_default());
> is a easy way for this.
> How about this way?

I really prefer to see the GMainContext passed in with the API(s)
which register callbacks, not provided via the backdoor via the
glib thread default global state, or via chardev global state.

Regards,
Daniel
Zhang Chen July 20, 2016, 3:29 a.m. UTC | #4
On 07/19/2016 05:03 PM, Daniel P. Berrange wrote:
> On Tue, Jul 19, 2016 at 11:32:41AM +0800, Zhang Chen wrote:
>>
>> On 07/18/2016 04:37 PM, Daniel P. Berrange wrote:
>>> On Mon, Jul 18, 2016 at 03:40:49PM +0800, Zhang Chen wrote:
>>>> If primary packet is same with secondary packet,
>>>> we will send primary packet and drop secondary
>>>> packet, otherwise notify COLO frame to do checkpoint.
>>>> If primary packet comes and secondary packet not,
>>>> after REGULAR_PACKET_CHECK_MS milliseconds we set
>>>> the primary packet as old_packet,then do a checkpoint.
>>>>
>>>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>>>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>>>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>>>> ---
>>>>    net/colo-base.c    |   1 +
>>>>    net/colo-base.h    |   3 +
>>>>    net/colo-compare.c | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>>>    trace-events       |   2 +
>>>>    4 files changed, 222 insertions(+)
>>>>
>>>> +static void *colo_compare_thread(void *opaque)
>>>> +{
>>>> +    GMainContext *worker_context;
>>>> +    GMainLoop *compare_loop;
>>>> +    CompareState *s = opaque;
>>>> +
>>>> +    worker_context = g_main_context_new();
>>>> +    g_assert(g_main_context_get_thread_default() == NULL);
>>>> +    g_main_context_push_thread_default(worker_context);
>>>> +    g_assert(g_main_context_get_thread_default() == worker_context);
>>>> +
>>>> +    qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
>>>> +                          compare_pri_chr_in, NULL, s);
>>>> +    qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
>>>> +                          compare_sec_chr_in, NULL, s);
>>> Looking at this I see you've used g_main_context_push_thread_default
>>> in order to avoid having to modify the qemu_chr_add_handlers() method
>>> to accept a GMainContext. Personally I think I'd favour explicit
>>> passing of a GMainContext, rather than modifying Qemu Char code &
>>> QIOChannel to magically use per-thread main context.  In particular
>>> I'm concerned that one day other code may use the
>>> g_main_context_push_thread_default method for some other purpose
>>> but still want their char devs handled by the main thread. Your
>>> solution makes this impossible and adds magic which could surprise
>>> us in bad ways.
>>>
>>> So I think I'd rather see qemu_chr_add_handlers() modified to accept
>>> a GMainContext, or perhaps add a qemu_chr_add_handlers_full() method
>>> which takes a GMainContext, so we can avoid modifying all existing
>>> callers ofo qemu_chr_add_handlers.  There should be no need to modify
>>> QIOChannel at all, since you can use qio_channel_create_watch instead
>>> of qio_channel_add_watch and simply attach to the context you want
>>> directly.
>> I get your point.
>> Considering current qemu code, I think add a flag
>> in CharDriverState to decide whether or not to
>> run g_source_attach(xxx, g_main_context_get_thread_default());
>> is a easy way for this.
>> How about this way?
> I really prefer to see the GMainContext passed in with the API(s)
> which register callbacks, not provided via the backdoor via the
> glib thread default global state, or via chardev global state.
>

OK,I will add this API in next version.
void qemu_chr_add_handlers_full(CharDriverState *s,
                            IOCanReadHandler *fd_can_read,
                            IOReadHandler *fd_read,
                            IOEventHandler *fd_event,
                            void *opaque,
GMainContext *context)

But maybe have some codes duplicate with qemu_chr_add_handlers(),
Do you have suggestion to avoid it?


Thanks
Zhang Chen
diff mbox

Patch

diff --git a/net/colo-base.c b/net/colo-base.c
index 7e91dec..eb1b631 100644
--- a/net/colo-base.c
+++ b/net/colo-base.c
@@ -132,6 +132,7 @@  Packet *packet_new(const void *data, int size)
 
     pkt->data = g_memdup(data, size);
     pkt->size = size;
+    pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 
     return pkt;
 }
diff --git a/net/colo-base.h b/net/colo-base.h
index 0505608..06d6dca 100644
--- a/net/colo-base.h
+++ b/net/colo-base.h
@@ -17,6 +17,7 @@ 
 
 #include "slirp/slirp.h"
 #include "qemu/jhash.h"
+#include "qemu/timer.h"
 
 #define HASHTABLE_MAX_SIZE 16384
 
@@ -28,6 +29,8 @@  typedef struct Packet {
     };
     uint8_t *transport_layer;
     int size;
+    /* Time of packet creation, in wall clock ms */
+    int64_t creation_ms;
 } Packet;
 
 typedef struct ConnectionKey {
diff --git a/net/colo-compare.c b/net/colo-compare.c
index 5f87710..942e326 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -36,6 +36,8 @@ 
 
 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
 #define MAX_QUEUE_SIZE 1024
+/* TODO: Should be configurable */
+#define REGULAR_PACKET_CHECK_MS 3000
 
 /*
   + CompareState ++
@@ -83,6 +85,10 @@  typedef struct CompareState {
     GQueue unprocessed_connections;
     /* proxy current hash size */
     uint32_t hashtable_size;
+    /* compare thread, a thread for each NIC */
+    QemuThread thread;
+    /* Timer used on the primary to find packets that are never matched */
+    QEMUTimer *timer;
 } CompareState;
 
 typedef struct CompareClass {
@@ -170,6 +176,112 @@  static int packet_enqueue(CompareState *s, int mode)
     return 0;
 }
 
+/*
+ * The IP packets sent by primary and secondary
+ * will be compared in here
+ * TODO support ip fragment, Out-Of-Order
+ * return:    0  means packet same
+ *            > 0 || < 0 means packet different
+ */
+static int colo_packet_compare(Packet *ppkt, Packet *spkt)
+{
+    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
+                               inet_ntoa(ppkt->ip->ip_dst), spkt->size,
+                               inet_ntoa(spkt->ip->ip_src),
+                               inet_ntoa(spkt->ip->ip_dst));
+
+    if (ppkt->size == spkt->size) {
+        return memcmp(ppkt->data, spkt->data, spkt->size);
+    } else {
+        return -1;
+    }
+}
+
+static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
+{
+    trace_colo_compare_main("compare all");
+    return colo_packet_compare(ppkt, spkt);
+}
+
+static void colo_old_packet_check_one(void *opaque_packet,
+                                      void *opaque_found)
+{
+    int64_t now;
+    bool *found_old = (bool *)opaque_found;
+    Packet *ppkt = (Packet *)opaque_packet;
+
+    if (*found_old) {
+        /* Someone found an old packet earlier in the queue */
+        return;
+    }
+
+    now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
+    if ((now - ppkt->creation_ms) > REGULAR_PACKET_CHECK_MS) {
+        trace_colo_old_packet_check_found(ppkt->creation_ms);
+        *found_old = true;
+    }
+}
+
+static void colo_old_packet_check_one_conn(void *opaque,
+                                           void *user_data)
+{
+    bool found_old = false;
+    Connection *conn = opaque;
+
+    g_queue_foreach(&conn->primary_list, colo_old_packet_check_one,
+                    &found_old);
+    if (found_old) {
+        /* do checkpoint will flush old packet */
+        /* TODO: colo_notify_checkpoint();*/
+    }
+}
+
+/*
+ * Look for old packets that the secondary hasn't matched,
+ * if we have some then we have to checkpoint to wake
+ * the secondary up.
+ */
+static void colo_old_packet_check(void *opaque)
+{
+    CompareState *s = opaque;
+
+    g_queue_foreach(&s->conn_list, colo_old_packet_check_one_conn, NULL);
+}
+
+/*
+ * called from the compare thread on the primary
+ * for compare connection
+ */
+static void colo_compare_connection(void *opaque, void *user_data)
+{
+    CompareState *s = user_data;
+    Connection *conn = opaque;
+    Packet *pkt = NULL;
+    GList *result = NULL;
+    int ret;
+
+    while (!g_queue_is_empty(&conn->primary_list) &&
+           !g_queue_is_empty(&conn->secondary_list)) {
+        pkt = g_queue_pop_tail(&conn->primary_list);
+        result = g_queue_find_custom(&conn->secondary_list,
+                              pkt, (GCompareFunc)colo_packet_compare_all);
+
+        if (result) {
+            ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
+            if (ret < 0) {
+                error_report("colo_send_primary_packet failed");
+            }
+            trace_colo_compare_main("packet same and release packet");
+            g_queue_remove(&conn->secondary_list, result->data);
+        } else {
+            trace_colo_compare_main("packet different");
+            g_queue_push_tail(&conn->primary_list, pkt);
+            /* TODO: colo_notify_checkpoint();*/
+            break;
+        }
+    }
+}
+
 static int compare_chr_send(CharDriverState *out,
                             const uint8_t *buf,
                             uint32_t size)
@@ -197,6 +309,69 @@  err:
     return ret < 0 ? ret : -EIO;
 }
 
+static int compare_chr_can_read(void *opaque)
+{
+    return COMPARE_READ_LEN_MAX;
+}
+
+/*
+ * called from the main thread on the primary for packets
+ * arriving over the socket from the primary.
+ */
+static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
+{
+    CompareState *s = COLO_COMPARE(opaque);
+    int ret;
+
+    ret = net_fill_rstate(&s->pri_rs, buf, size);
+    if (ret == -1) {
+        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
+        error_report("colo-compare primary_in error");
+    }
+}
+
+/*
+ * called from the main thread on the primary for packets
+ * arriving over the socket from the secondary.
+ */
+static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
+{
+    CompareState *s = COLO_COMPARE(opaque);
+    int ret;
+
+    ret = net_fill_rstate(&s->sec_rs, buf, size);
+    if (ret == -1) {
+        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
+        error_report("colo-compare secondary_in error");
+    }
+}
+
+static void *colo_compare_thread(void *opaque)
+{
+    GMainContext *worker_context;
+    GMainLoop *compare_loop;
+    CompareState *s = opaque;
+
+    worker_context = g_main_context_new();
+    g_assert(g_main_context_get_thread_default() == NULL);
+    g_main_context_push_thread_default(worker_context);
+    g_assert(g_main_context_get_thread_default() == worker_context);
+
+    qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
+                          compare_pri_chr_in, NULL, s);
+    qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
+                          compare_sec_chr_in, NULL, s);
+
+    compare_loop = g_main_loop_new(worker_context, FALSE);
+
+    g_main_loop_run(compare_loop);
+
+    g_main_loop_unref(compare_loop);
+    g_main_context_pop_thread_default(worker_context);
+    g_main_context_unref(worker_context);
+    return NULL;
+}
+
 static char *compare_get_pri_indev(Object *obj, Error **errp)
 {
     CompareState *s = COLO_COMPARE(obj);
@@ -249,6 +424,9 @@  static void compare_pri_rs_finalize(SocketReadState *pri_rs)
     if (packet_enqueue(s, PRIMARY_IN)) {
         trace_colo_compare_main("primary: unsupported packet in");
         compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
+    } else {
+        /* compare connection */
+        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
     }
 }
 
@@ -258,16 +436,35 @@  static void compare_sec_rs_finalize(SocketReadState *sec_rs)
 
     if (packet_enqueue(s, SECONDARY_IN)) {
         trace_colo_compare_main("secondary: unsupported packet in");
+    } else {
+        /* compare connection */
+        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
     }
 }
 
 /*
+ * Check old packet regularly so it can watch for any packets
+ * that the secondary hasn't produced equivalents of.
+ */
+static void check_old_packet_regular(void *opaque)
+{
+    CompareState *s = opaque;
+
+    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+              REGULAR_PACKET_CHECK_MS);
+    /* if have old packet we will notify checkpoint */
+    colo_old_packet_check(s);
+}
+
+/*
  * called from the main thread on the primary
  * to setup colo-compare.
  */
 static void colo_compare_complete(UserCreatable *uc, Error **errp)
 {
     CompareState *s = COLO_COMPARE(uc);
+    char thread_name[64];
+    static int compare_id;
 
     if (!s->pri_indev || !s->sec_indev || !s->outdev) {
         error_setg(errp, "colo compare needs 'primary_in' ,"
@@ -319,6 +516,18 @@  static void colo_compare_complete(UserCreatable *uc, Error **errp)
                                                       g_free,
                                                       connection_destroy);
 
+    sprintf(thread_name, "compare %d", compare_id);
+    qemu_thread_create(&s->thread, thread_name,
+                       colo_compare_thread, s,
+                       QEMU_THREAD_JOINABLE);
+    compare_id++;
+
+    /* A regular timer to kick any packets that the secondary doesn't match */
+    s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest runs */
+                            check_old_packet_regular, s);
+    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+                        REGULAR_PACKET_CHECK_MS);
+
     return;
 }
 
@@ -360,6 +569,13 @@  static void colo_compare_finalize(Object *obj)
 
     g_queue_free(&s->conn_list);
 
+    if (s->thread.thread) {
+        /* compare connection */
+        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
+        qemu_thread_join(&s->thread);
+    }
+    timer_del(s->timer);
+
     g_free(s->pri_indev);
     g_free(s->sec_indev);
     g_free(s->outdev);
diff --git a/trace-events b/trace-events
index 703de1a..1537e91 100644
--- a/trace-events
+++ b/trace-events
@@ -1919,3 +1919,5 @@  aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
 
 # net/colo-compare.c
 colo_compare_main(const char *chr) ": %s"
+colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
+colo_old_packet_check_found(int64_t old_time) "%" PRId64