diff mbox series

[V8,02/17] colo-compare: implement the process of checkpoint

Message ID 20180603050546.6827-3-zhangckid@gmail.com
State New
Headers show
Series COLO: integrate colo frame with block replication and COLO proxy | expand

Commit Message

Zhang Chen June 3, 2018, 5:05 a.m. UTC
While do checkpoint, we need to flush all the unhandled packets,
By using the filter notifier mechanism, we can easily to notify
every compare object to do this process, which runs inside
of compare threads as a coroutine.

Signed-off-by: zhanghailiang <zhang.zhanghailiang@huawei.com>
Signed-off-by: Zhang Chen <zhangckid@gmail.com>
---
 include/migration/colo.h |  6 ++++
 net/colo-compare.c       | 76 ++++++++++++++++++++++++++++++++++++++++
 net/colo-compare.h       | 22 ++++++++++++
 3 files changed, 104 insertions(+)
 create mode 100644 net/colo-compare.h

Comments

Jason Wang June 4, 2018, 6:31 a.m. UTC | #1
On 2018年06月03日 13:05, Zhang Chen wrote:
> While do checkpoint, we need to flush all the unhandled packets,
> By using the filter notifier mechanism, we can easily to notify
> every compare object to do this process, which runs inside
> of compare threads as a coroutine.
>
> Signed-off-by: zhanghailiang <zhang.zhanghailiang@huawei.com>
> Signed-off-by: Zhang Chen <zhangckid@gmail.com>
> ---
>   include/migration/colo.h |  6 ++++
>   net/colo-compare.c       | 76 ++++++++++++++++++++++++++++++++++++++++
>   net/colo-compare.h       | 22 ++++++++++++
>   3 files changed, 104 insertions(+)
>   create mode 100644 net/colo-compare.h
>
> diff --git a/include/migration/colo.h b/include/migration/colo.h
> index 2fe48ad353..fefb2fcf4c 100644
> --- a/include/migration/colo.h
> +++ b/include/migration/colo.h
> @@ -16,6 +16,12 @@
>   #include "qemu-common.h"
>   #include "qapi/qapi-types-migration.h"
>   
> +enum colo_event {
> +    COLO_EVENT_NONE,
> +    COLO_EVENT_CHECKPOINT,
> +    COLO_EVENT_FAILOVER,
> +};
> +
>   void colo_info_init(void);
>   
>   void migrate_start_colo_process(MigrationState *s);
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index 23b2d2c4cc..7ff3ae8904 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -27,11 +27,16 @@
>   #include "qemu/sockets.h"
>   #include "net/colo.h"
>   #include "sysemu/iothread.h"
> +#include "net/colo-compare.h"
> +#include "migration/colo.h"
>   
>   #define TYPE_COLO_COMPARE "colo-compare"
>   #define COLO_COMPARE(obj) \
>       OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>   
> +static QTAILQ_HEAD(, CompareState) net_compares =
> +       QTAILQ_HEAD_INITIALIZER(net_compares);
> +
>   #define COMPARE_READ_LEN_MAX NET_BUFSIZE
>   #define MAX_QUEUE_SIZE 1024
>   
> @@ -41,6 +46,10 @@
>   /* TODO: Should be configurable */
>   #define REGULAR_PACKET_CHECK_MS 3000
>   
> +static QemuMutex event_mtx;
> +static QemuCond event_complete_cond;
> +static int event_unhandled_count;
> +
>   /*
>    *  + CompareState ++
>    *  |               |
> @@ -87,6 +96,11 @@ typedef struct CompareState {
>       IOThread *iothread;
>       GMainContext *worker_context;
>       QEMUTimer *packet_check_timer;
> +
> +    QEMUBH *event_bh;
> +    enum colo_event event;
> +
> +    QTAILQ_ENTRY(CompareState) next;
>   } CompareState;
>   
>   typedef struct CompareClass {
> @@ -736,6 +750,25 @@ static void check_old_packet_regular(void *opaque)
>                   REGULAR_PACKET_CHECK_MS);
>   }
>   
> +/* Public API, Used for COLO frame to notify compare event */
> +void colo_notify_compares_event(void *opaque, int event, Error **errp)
> +{
> +    CompareState *s;
> +
> +    qemu_mutex_lock(&event_mtx);
> +    QTAILQ_FOREACH(s, &net_compares, next) {
> +        s->event = event;
> +        qemu_bh_schedule(s->event_bh);
> +        event_unhandled_count++;
> +    }
> +    /* Wait all compare threads to finish handling this event */
> +    while (event_unhandled_count > 0) {
> +        qemu_cond_wait(&event_complete_cond, &event_mtx);
> +    }
> +
> +    qemu_mutex_unlock(&event_mtx);
> +}
> +
>   static void colo_compare_timer_init(CompareState *s)
>   {
>       AioContext *ctx = iothread_get_aio_context(s->iothread);
> @@ -756,6 +789,28 @@ static void colo_compare_timer_del(CompareState *s)
>       }
>    }
>   
> +static void colo_flush_packets(void *opaque, void *user_data);
> +
> +static void colo_compare_handle_event(void *opaque)
> +{
> +    CompareState *s = opaque;
> +
> +    switch (s->event) {
> +    case COLO_EVENT_CHECKPOINT:
> +        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> +        break;
> +    case COLO_EVENT_FAILOVER:
> +        break;
> +    default:
> +        break;
> +    }
> +    qemu_mutex_lock(&event_mtx);

Isn't this a deadlock? Since colo_notify_compares_event() won't release 
event_mtx until event_unhandled_count reaches zero.

> +    assert(event_unhandled_count > 0);
> +    event_unhandled_count--;
> +    qemu_cond_broadcast(&event_complete_cond);
> +    qemu_mutex_unlock(&event_mtx);
> +}
> +
>   static void colo_compare_iothread(CompareState *s)
>   {
>       object_ref(OBJECT(s->iothread));
> @@ -769,6 +824,7 @@ static void colo_compare_iothread(CompareState *s)
>                                s, s->worker_context, true);
>   
>       colo_compare_timer_init(s);
> +    s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
>   }
>   
>   static char *compare_get_pri_indev(Object *obj, Error **errp)
> @@ -926,8 +982,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>       net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
>       net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
>   
> +    QTAILQ_INSERT_TAIL(&net_compares, s, next);
> +
>       g_queue_init(&s->conn_list);
>   
> +    qemu_mutex_init(&event_mtx);
> +    qemu_cond_init(&event_complete_cond);
> +
>       s->connection_track_table = g_hash_table_new_full(connection_key_hash,
>                                                         connection_key_equal,
>                                                         g_free,
> @@ -990,6 +1051,7 @@ static void colo_compare_init(Object *obj)
>   static void colo_compare_finalize(Object *obj)
>   {
>       CompareState *s = COLO_COMPARE(obj);
> +    CompareState *tmp = NULL;
>   
>       qemu_chr_fe_deinit(&s->chr_pri_in, false);
>       qemu_chr_fe_deinit(&s->chr_sec_in, false);
> @@ -997,6 +1059,16 @@ static void colo_compare_finalize(Object *obj)
>       if (s->iothread) {
>           colo_compare_timer_del(s);
>       }
> +
> +    qemu_bh_delete(s->event_bh);
> +
> +    QTAILQ_FOREACH(tmp, &net_compares, next) {
> +        if (!strcmp(tmp->outdev, s->outdev)) {

This looks not elegant, can we compare by address or just use QLIST?

Thanks

> +            QTAILQ_REMOVE(&net_compares, s, next);
> +            break;
> +        }
> +    }
> +
>       /* Release all unhandled packets after compare thead exited */
>       g_queue_foreach(&s->conn_list, colo_flush_packets, s);
>   
> @@ -1009,6 +1081,10 @@ static void colo_compare_finalize(Object *obj)
>       if (s->iothread) {
>           object_unref(OBJECT(s->iothread));
>       }
> +
> +    qemu_mutex_destroy(&event_mtx);
> +    qemu_cond_destroy(&event_complete_cond);
> +
>       g_free(s->pri_indev);
>       g_free(s->sec_indev);
>       g_free(s->outdev);
> diff --git a/net/colo-compare.h b/net/colo-compare.h
> new file mode 100644
> index 0000000000..1b1ce76aea
> --- /dev/null
> +++ b/net/colo-compare.h
> @@ -0,0 +1,22 @@
> +/*
> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
> + * (a.k.a. Fault Tolerance or Continuous Replication)
> + *
> + * Copyright (c) 2017 HUAWEI TECHNOLOGIES CO., LTD.
> + * Copyright (c) 2017 FUJITSU LIMITED
> + * Copyright (c) 2017 Intel Corporation
> + *
> + * Authors:
> + *    zhanghailiang <zhang.zhanghailiang@huawei.com>
> + *    Zhang Chen <zhangckid@gmail.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later.  See the COPYING file in the top-level directory.
> + */
> +
> +#ifndef QEMU_COLO_COMPARE_H
> +#define QEMU_COLO_COMPARE_H
> +
> +void colo_notify_compares_event(void *opaque, int event, Error **errp);
> +
> +#endif /* QEMU_COLO_COMPARE_H */
Zhang Chen June 10, 2018, 2:08 p.m. UTC | #2
On Mon, Jun 4, 2018 at 2:31 PM, Jason Wang <jasowang@redhat.com> wrote:

>
>
> On 2018年06月03日 13:05, Zhang Chen wrote:
>
>> While do checkpoint, we need to flush all the unhandled packets,
>> By using the filter notifier mechanism, we can easily to notify
>> every compare object to do this process, which runs inside
>> of compare threads as a coroutine.
>>
>> Signed-off-by: zhanghailiang <zhang.zhanghailiang@huawei.com>
>> Signed-off-by: Zhang Chen <zhangckid@gmail.com>
>> ---
>>   include/migration/colo.h |  6 ++++
>>   net/colo-compare.c       | 76 ++++++++++++++++++++++++++++++++++++++++
>>   net/colo-compare.h       | 22 ++++++++++++
>>   3 files changed, 104 insertions(+)
>>   create mode 100644 net/colo-compare.h
>>
>> diff --git a/include/migration/colo.h b/include/migration/colo.h
>> index 2fe48ad353..fefb2fcf4c 100644
>> --- a/include/migration/colo.h
>> +++ b/include/migration/colo.h
>> @@ -16,6 +16,12 @@
>>   #include "qemu-common.h"
>>   #include "qapi/qapi-types-migration.h"
>>   +enum colo_event {
>> +    COLO_EVENT_NONE,
>> +    COLO_EVENT_CHECKPOINT,
>> +    COLO_EVENT_FAILOVER,
>> +};
>> +
>>   void colo_info_init(void);
>>     void migrate_start_colo_process(MigrationState *s);
>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>> index 23b2d2c4cc..7ff3ae8904 100644
>> --- a/net/colo-compare.c
>> +++ b/net/colo-compare.c
>> @@ -27,11 +27,16 @@
>>   #include "qemu/sockets.h"
>>   #include "net/colo.h"
>>   #include "sysemu/iothread.h"
>> +#include "net/colo-compare.h"
>> +#include "migration/colo.h"
>>     #define TYPE_COLO_COMPARE "colo-compare"
>>   #define COLO_COMPARE(obj) \
>>       OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>>   +static QTAILQ_HEAD(, CompareState) net_compares =
>> +       QTAILQ_HEAD_INITIALIZER(net_compares);
>> +
>>   #define COMPARE_READ_LEN_MAX NET_BUFSIZE
>>   #define MAX_QUEUE_SIZE 1024
>>   @@ -41,6 +46,10 @@
>>   /* TODO: Should be configurable */
>>   #define REGULAR_PACKET_CHECK_MS 3000
>>   +static QemuMutex event_mtx;
>> +static QemuCond event_complete_cond;
>> +static int event_unhandled_count;
>> +
>>   /*
>>    *  + CompareState ++
>>    *  |               |
>> @@ -87,6 +96,11 @@ typedef struct CompareState {
>>       IOThread *iothread;
>>       GMainContext *worker_context;
>>       QEMUTimer *packet_check_timer;
>> +
>> +    QEMUBH *event_bh;
>> +    enum colo_event event;
>> +
>> +    QTAILQ_ENTRY(CompareState) next;
>>   } CompareState;
>>     typedef struct CompareClass {
>> @@ -736,6 +750,25 @@ static void check_old_packet_regular(void *opaque)
>>                   REGULAR_PACKET_CHECK_MS);
>>   }
>>   +/* Public API, Used for COLO frame to notify compare event */
>> +void colo_notify_compares_event(void *opaque, int event, Error **errp)
>> +{
>> +    CompareState *s;
>> +
>> +    qemu_mutex_lock(&event_mtx);
>> +    QTAILQ_FOREACH(s, &net_compares, next) {
>> +        s->event = event;
>> +        qemu_bh_schedule(s->event_bh);
>> +        event_unhandled_count++;
>> +    }
>> +    /* Wait all compare threads to finish handling this event */
>> +    while (event_unhandled_count > 0) {
>> +        qemu_cond_wait(&event_complete_cond, &event_mtx);
>> +    }
>> +
>> +    qemu_mutex_unlock(&event_mtx);
>> +}
>> +
>>   static void colo_compare_timer_init(CompareState *s)
>>   {
>>       AioContext *ctx = iothread_get_aio_context(s->iothread);
>> @@ -756,6 +789,28 @@ static void colo_compare_timer_del(CompareState *s)
>>       }
>>    }
>>   +static void colo_flush_packets(void *opaque, void *user_data);
>> +
>> +static void colo_compare_handle_event(void *opaque)
>> +{
>> +    CompareState *s = opaque;
>> +
>> +    switch (s->event) {
>> +    case COLO_EVENT_CHECKPOINT:
>> +        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
>> +        break;
>> +    case COLO_EVENT_FAILOVER:
>> +        break;
>> +    default:
>> +        break;
>> +    }
>> +    qemu_mutex_lock(&event_mtx);
>>
>
> Isn't this a deadlock? Since colo_notify_compares_event() won't release
> event_mtx until event_unhandled_count reaches zero.
>
>
Good catch!
I will fix it in next version.


>
> +    assert(event_unhandled_count > 0);
>> +    event_unhandled_count--;
>> +    qemu_cond_broadcast(&event_complete_cond);
>> +    qemu_mutex_unlock(&event_mtx);
>> +}
>> +
>>   static void colo_compare_iothread(CompareState *s)
>>   {
>>       object_ref(OBJECT(s->iothread));
>> @@ -769,6 +824,7 @@ static void colo_compare_iothread(CompareState *s)
>>                                s, s->worker_context, true);
>>         colo_compare_timer_init(s);
>> +    s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
>>   }
>>     static char *compare_get_pri_indev(Object *obj, Error **errp)
>> @@ -926,8 +982,13 @@ static void colo_compare_complete(UserCreatable
>> *uc, Error **errp)
>>       net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize,
>> s->vnet_hdr);
>>       net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize,
>> s->vnet_hdr);
>>   +    QTAILQ_INSERT_TAIL(&net_compares, s, next);
>> +
>>       g_queue_init(&s->conn_list);
>>   +    qemu_mutex_init(&event_mtx);
>> +    qemu_cond_init(&event_complete_cond);
>> +
>>       s->connection_track_table = g_hash_table_new_full(connecti
>> on_key_hash,
>>
>> connection_key_equal,
>>                                                         g_free,
>> @@ -990,6 +1051,7 @@ static void colo_compare_init(Object *obj)
>>   static void colo_compare_finalize(Object *obj)
>>   {
>>       CompareState *s = COLO_COMPARE(obj);
>> +    CompareState *tmp = NULL;
>>         qemu_chr_fe_deinit(&s->chr_pri_in, false);
>>       qemu_chr_fe_deinit(&s->chr_sec_in, false);
>> @@ -997,6 +1059,16 @@ static void colo_compare_finalize(Object *obj)
>>       if (s->iothread) {
>>           colo_compare_timer_del(s);
>>       }
>> +
>> +    qemu_bh_delete(s->event_bh);
>> +
>> +    QTAILQ_FOREACH(tmp, &net_compares, next) {
>> +        if (!strcmp(tmp->outdev, s->outdev)) {
>>
>
> This looks not elegant, can we compare by address or just use QLIST?
>
>
OK, I will compare by address in next version.

Thanks
Zhang Chen


> Thanks
>
>
> +            QTAILQ_REMOVE(&net_compares, s, next);
>> +            break;
>> +        }
>> +    }
>> +
>>       /* Release all unhandled packets after compare thead exited */
>>       g_queue_foreach(&s->conn_list, colo_flush_packets, s);
>>   @@ -1009,6 +1081,10 @@ static void colo_compare_finalize(Object *obj)
>>       if (s->iothread) {
>>           object_unref(OBJECT(s->iothread));
>>       }
>> +
>> +    qemu_mutex_destroy(&event_mtx);
>> +    qemu_cond_destroy(&event_complete_cond);
>> +
>>       g_free(s->pri_indev);
>>       g_free(s->sec_indev);
>>       g_free(s->outdev);
>> diff --git a/net/colo-compare.h b/net/colo-compare.h
>> new file mode 100644
>> index 0000000000..1b1ce76aea
>> --- /dev/null
>> +++ b/net/colo-compare.h
>> @@ -0,0 +1,22 @@
>> +/*
>> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service
>> (COLO)
>> + * (a.k.a. Fault Tolerance or Continuous Replication)
>> + *
>> + * Copyright (c) 2017 HUAWEI TECHNOLOGIES CO., LTD.
>> + * Copyright (c) 2017 FUJITSU LIMITED
>> + * Copyright (c) 2017 Intel Corporation
>> + *
>> + * Authors:
>> + *    zhanghailiang <zhang.zhanghailiang@huawei.com>
>> + *    Zhang Chen <zhangckid@gmail.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>> + * later.  See the COPYING file in the top-level directory.
>> + */
>> +
>> +#ifndef QEMU_COLO_COMPARE_H
>> +#define QEMU_COLO_COMPARE_H
>> +
>> +void colo_notify_compares_event(void *opaque, int event, Error **errp);
>> +
>> +#endif /* QEMU_COLO_COMPARE_H */
>>
>
>
diff mbox series

Patch

diff --git a/include/migration/colo.h b/include/migration/colo.h
index 2fe48ad353..fefb2fcf4c 100644
--- a/include/migration/colo.h
+++ b/include/migration/colo.h
@@ -16,6 +16,12 @@ 
 #include "qemu-common.h"
 #include "qapi/qapi-types-migration.h"
 
+enum colo_event {
+    COLO_EVENT_NONE,
+    COLO_EVENT_CHECKPOINT,
+    COLO_EVENT_FAILOVER,
+};
+
 void colo_info_init(void);
 
 void migrate_start_colo_process(MigrationState *s);
diff --git a/net/colo-compare.c b/net/colo-compare.c
index 23b2d2c4cc..7ff3ae8904 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -27,11 +27,16 @@ 
 #include "qemu/sockets.h"
 #include "net/colo.h"
 #include "sysemu/iothread.h"
+#include "net/colo-compare.h"
+#include "migration/colo.h"
 
 #define TYPE_COLO_COMPARE "colo-compare"
 #define COLO_COMPARE(obj) \
     OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
 
+static QTAILQ_HEAD(, CompareState) net_compares =
+       QTAILQ_HEAD_INITIALIZER(net_compares);
+
 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
 #define MAX_QUEUE_SIZE 1024
 
@@ -41,6 +46,10 @@ 
 /* TODO: Should be configurable */
 #define REGULAR_PACKET_CHECK_MS 3000
 
+static QemuMutex event_mtx;
+static QemuCond event_complete_cond;
+static int event_unhandled_count;
+
 /*
  *  + CompareState ++
  *  |               |
@@ -87,6 +96,11 @@  typedef struct CompareState {
     IOThread *iothread;
     GMainContext *worker_context;
     QEMUTimer *packet_check_timer;
+
+    QEMUBH *event_bh;
+    enum colo_event event;
+
+    QTAILQ_ENTRY(CompareState) next;
 } CompareState;
 
 typedef struct CompareClass {
@@ -736,6 +750,25 @@  static void check_old_packet_regular(void *opaque)
                 REGULAR_PACKET_CHECK_MS);
 }
 
+/* Public API, Used for COLO frame to notify compare event */
+void colo_notify_compares_event(void *opaque, int event, Error **errp)
+{
+    CompareState *s;
+
+    qemu_mutex_lock(&event_mtx);
+    QTAILQ_FOREACH(s, &net_compares, next) {
+        s->event = event;
+        qemu_bh_schedule(s->event_bh);
+        event_unhandled_count++;
+    }
+    /* Wait all compare threads to finish handling this event */
+    while (event_unhandled_count > 0) {
+        qemu_cond_wait(&event_complete_cond, &event_mtx);
+    }
+
+    qemu_mutex_unlock(&event_mtx);
+}
+
 static void colo_compare_timer_init(CompareState *s)
 {
     AioContext *ctx = iothread_get_aio_context(s->iothread);
@@ -756,6 +789,28 @@  static void colo_compare_timer_del(CompareState *s)
     }
  }
 
+static void colo_flush_packets(void *opaque, void *user_data);
+
+static void colo_compare_handle_event(void *opaque)
+{
+    CompareState *s = opaque;
+
+    switch (s->event) {
+    case COLO_EVENT_CHECKPOINT:
+        g_queue_foreach(&s->conn_list, colo_flush_packets, s);
+        break;
+    case COLO_EVENT_FAILOVER:
+        break;
+    default:
+        break;
+    }
+    qemu_mutex_lock(&event_mtx);
+    assert(event_unhandled_count > 0);
+    event_unhandled_count--;
+    qemu_cond_broadcast(&event_complete_cond);
+    qemu_mutex_unlock(&event_mtx);
+}
+
 static void colo_compare_iothread(CompareState *s)
 {
     object_ref(OBJECT(s->iothread));
@@ -769,6 +824,7 @@  static void colo_compare_iothread(CompareState *s)
                              s, s->worker_context, true);
 
     colo_compare_timer_init(s);
+    s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
 }
 
 static char *compare_get_pri_indev(Object *obj, Error **errp)
@@ -926,8 +982,13 @@  static void colo_compare_complete(UserCreatable *uc, Error **errp)
     net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
     net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
 
+    QTAILQ_INSERT_TAIL(&net_compares, s, next);
+
     g_queue_init(&s->conn_list);
 
+    qemu_mutex_init(&event_mtx);
+    qemu_cond_init(&event_complete_cond);
+
     s->connection_track_table = g_hash_table_new_full(connection_key_hash,
                                                       connection_key_equal,
                                                       g_free,
@@ -990,6 +1051,7 @@  static void colo_compare_init(Object *obj)
 static void colo_compare_finalize(Object *obj)
 {
     CompareState *s = COLO_COMPARE(obj);
+    CompareState *tmp = NULL;
 
     qemu_chr_fe_deinit(&s->chr_pri_in, false);
     qemu_chr_fe_deinit(&s->chr_sec_in, false);
@@ -997,6 +1059,16 @@  static void colo_compare_finalize(Object *obj)
     if (s->iothread) {
         colo_compare_timer_del(s);
     }
+
+    qemu_bh_delete(s->event_bh);
+
+    QTAILQ_FOREACH(tmp, &net_compares, next) {
+        if (!strcmp(tmp->outdev, s->outdev)) {
+            QTAILQ_REMOVE(&net_compares, s, next);
+            break;
+        }
+    }
+
     /* Release all unhandled packets after compare thead exited */
     g_queue_foreach(&s->conn_list, colo_flush_packets, s);
 
@@ -1009,6 +1081,10 @@  static void colo_compare_finalize(Object *obj)
     if (s->iothread) {
         object_unref(OBJECT(s->iothread));
     }
+
+    qemu_mutex_destroy(&event_mtx);
+    qemu_cond_destroy(&event_complete_cond);
+
     g_free(s->pri_indev);
     g_free(s->sec_indev);
     g_free(s->outdev);
diff --git a/net/colo-compare.h b/net/colo-compare.h
new file mode 100644
index 0000000000..1b1ce76aea
--- /dev/null
+++ b/net/colo-compare.h
@@ -0,0 +1,22 @@ 
+/*
+ * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
+ * (a.k.a. Fault Tolerance or Continuous Replication)
+ *
+ * Copyright (c) 2017 HUAWEI TECHNOLOGIES CO., LTD.
+ * Copyright (c) 2017 FUJITSU LIMITED
+ * Copyright (c) 2017 Intel Corporation
+ *
+ * Authors:
+ *    zhanghailiang <zhang.zhanghailiang@huawei.com>
+ *    Zhang Chen <zhangckid@gmail.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_COLO_COMPARE_H
+#define QEMU_COLO_COMPARE_H
+
+void colo_notify_compares_event(void *opaque, int event, Error **errp);
+
+#endif /* QEMU_COLO_COMPARE_H */