diff mbox

[RFC,V3,3/4] colo-compare: introduce packet comparison thread

Message ID 1460977906-25218-4-git-send-email-zhangchen.fnst@cn.fujitsu.com
State New
Headers show

Commit Message

Zhang Chen April 18, 2016, 11:11 a.m. UTC
if packets are same, we send primary packet and drop secondary
packet, otherwise notify COLO do checkpoint.

Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
---
 net/colo-compare.c | 126 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 trace-events       |   2 +
 2 files changed, 128 insertions(+)

Comments

Jason Wang April 28, 2016, 7:58 a.m. UTC | #1
On 04/18/2016 07:11 PM, Zhang Chen wrote:
> if packets are same, we send primary packet and drop secondary
> packet, otherwise notify COLO do checkpoint.
>
> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> ---
>  net/colo-compare.c | 126 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  trace-events       |   2 +
>  2 files changed, 128 insertions(+)
>
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index dc57eac..4b5a2d4 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -26,6 +26,7 @@
>  #include "qemu/jhash.h"
>  #include "net/eth.h"
>  
> +#define DEBUG_TCP_COMPARE 1
>  #define TYPE_COLO_COMPARE "colo-compare"
>  #define COLO_COMPARE(obj) \
>      OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
> @@ -90,6 +91,13 @@ typedef struct CompareState {
>      GQueue unprocessed_connections;
>      /* proxy current hash size */
>      uint32_t hashtable_size;
> +
> +    /* notify compare thread */
> +    QemuEvent event;
> +    /* compare thread, a thread for each NIC */
> +    QemuThread thread;
> +    int thread_status;
> +
>  } CompareState;
>  
>  typedef struct CompareClass {
> @@ -132,6 +140,15 @@ enum {
>      SECONDARY_IN,
>  };
>  
> +enum {
> +    /* compare thread isn't started */
> +    COMPARE_THREAD_NONE,
> +    /* compare thread is running */
> +    COMPARE_THREAD_RUNNING,
> +    /* compare thread exit */
> +    COMPARE_THREAD_EXIT,
> +};
> +
>  static void packet_destroy(void *opaque, void *user_data);
>  static int compare_chr_send(CharDriverState *out,
>                              const uint8_t *buf,
> @@ -336,6 +353,94 @@ static void packet_destroy(void *opaque, void *user_data)
>      g_slice_free(Packet, pkt);
>  }
>  
> +static inline void colo_dump_packet(Packet *pkt)
> +{
> +    int i;
> +    for (i = 0; i < pkt->size; i++) {
> +        printf("%02x ", ((uint8_t *)pkt->data)[i]);
> +    }
> +    printf("\n");

Can we use something like qemu_hexdump() here?

> +}
> +
> +/*
> + * The IP packets sent by primary and secondary
> + * will be compared in here
> + * TODO support ip fragment, Out-Of-Order
> + * return:    0  means packet same
> + *            > 0 || < 0 means packet different
> + */
> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
> +{
> +    trace_colo_compare_with_int("ppkt size", ppkt->size);
> +    trace_colo_compare_with_char("ppkt ip_src", inet_ntoa(ppkt->ip->ip_src));
> +    trace_colo_compare_with_char("ppkt ip_dst", inet_ntoa(ppkt->ip->ip_dst));
> +    trace_colo_compare_with_int("spkt size", spkt->size);
> +    trace_colo_compare_with_char("spkt ip_src", inet_ntoa(spkt->ip->ip_src));
> +    trace_colo_compare_with_char("spkt ip_dst", inet_ntoa(spkt->ip->ip_dst));

Can we use a single tracepoint here instead?

> +
> +    if (ppkt->size == spkt->size) {
> +        return memcmp(ppkt->data, spkt->data, spkt->size);
> +    } else {
> +        return -1;
> +    }
> +}
> +
> +static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
> +{
> +    trace_colo_compare_main("compare all");
> +    return colo_packet_compare(ppkt, spkt);

Why need this?

> +}
> +
> +/*
> + * called from the compare thread on the primary
> + * for compare connection
> + */
> +static void colo_compare_connection(void *opaque, void *user_data)
> +{
> +    Connection *conn = opaque;
> +    Packet *pkt = NULL;
> +    GList *result = NULL;
> +    int ret;
> +
> +    qemu_mutex_lock(&conn->list_lock);
> +    while (!g_queue_is_empty(&conn->primary_list) &&
> +           !g_queue_is_empty(&conn->secondary_list)) {
> +        pkt = g_queue_pop_head(&conn->primary_list);
> +        result = g_queue_find_custom(&conn->secondary_list,
> +                              pkt, (GCompareFunc)colo_packet_compare_all);
> +
> +        if (result) {
> +            ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
> +            if (ret < 0) {
> +                error_report("colo_send_primary_packet failed");
> +            }
> +            trace_colo_compare_main("packet same and release packet");
> +            g_queue_remove(&conn->secondary_list, result->data);
> +        } else {
> +            trace_colo_compare_main("packet different");
> +            g_queue_push_head(&conn->primary_list, pkt);

Is this possible that the packet from secondary has not been arrived on
time? If yes, do we still need to notify the checkpoint here?

> +            /* TODO: colo_notify_checkpoint();*/
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&conn->list_lock);
> +}
> +
> +static void *colo_compare_thread(void *opaque)
> +{
> +    CompareState *s = opaque;
> +
> +    while (s->thread_status == COMPARE_THREAD_RUNNING) {
> +        qemu_event_wait(&s->event);
> +        qemu_event_reset(&s->event);
> +        qemu_mutex_lock(&s->conn_list_lock);
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, NULL);
> +        qemu_mutex_unlock(&s->conn_list_lock);
> +    }
> +
> +    return NULL;
> +}
> +
>  static int compare_chr_send(CharDriverState *out,
>                              const uint8_t *buf,
>                              uint32_t size)
> @@ -440,6 +545,8 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
>          if (packet_enqueue(s, PRIMARY_IN)) {
>              trace_colo_compare_main("primary: unsupported packet in");
>              compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len);
> +        } else {
> +            qemu_event_set(&s->event);
>          }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> @@ -461,6 +568,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
>              trace_colo_compare_main("secondary: unsupported packet in");
>              /* should we send sec arp pkt? */
>              compare_chr_send(s->chr_out, s->sec_rs.buf, s->sec_rs.packet_len);
> +        } else {
> +            qemu_event_set(&s->event);
>          }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> @@ -519,6 +628,8 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
>  static void colo_compare_complete(UserCreatable *uc, Error **errp)
>  {
>      CompareState *s = COLO_COMPARE(uc);
> +    char thread_name[64];
> +    static int compare_id;
>  
>      if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>          error_setg(errp, "colo compare needs 'primary_in' ,"
> @@ -564,6 +675,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>      QTAILQ_INSERT_TAIL(&net_compares, s, next);
>  
>      g_queue_init(&s->conn_list);
> +    qemu_event_init(&s->event, false);
>      qemu_mutex_init(&s->conn_list_lock);
>      s->hashtable_size = 0;
>  
> @@ -572,6 +684,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>                                                        g_free,
>                                                        connection_destroy);
>  
> +    s->thread_status = COMPARE_THREAD_RUNNING;
> +    sprintf(thread_name, "compare %d", compare_id);
> +    qemu_thread_create(&s->thread, thread_name,
> +                       colo_compare_thread, s,
> +                       QEMU_THREAD_JOINABLE);
> +    compare_id++;
> +
>      return;
>  }
>  
> @@ -607,6 +726,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data)
>          QTAILQ_REMOVE(&net_compares, s, next);
>      }
>      qemu_mutex_destroy(&s->conn_list_lock);
> +
> +    if (s->thread.thread) {
> +        s->thread_status = COMPARE_THREAD_EXIT;
> +        qemu_event_set(&s->event);
> +        qemu_thread_join(&s->thread);
> +    }
> +    qemu_event_destroy(&s->event);
>  }
>  
>  static void colo_compare_init(Object *obj)
> diff --git a/trace-events b/trace-events
> index 8862288..978c47f 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
>  
>  # net/colo-compare.c
>  colo_compare_main(const char *chr) "chr: %s"
> +colo_compare_with_int(const char *sta, int size) ": %s = %d"
> +colo_compare_with_char(const char *sta, const char *stb) ": %s = %s"
Zhang Chen April 28, 2016, 10:31 a.m. UTC | #2
On 04/28/2016 03:58 PM, Jason Wang wrote:
>
> On 04/18/2016 07:11 PM, Zhang Chen wrote:
>> if packets are same, we send primary packet and drop secondary
>> packet, otherwise notify COLO do checkpoint.
>>
>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>> ---
>>   net/colo-compare.c | 126 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   trace-events       |   2 +
>>   2 files changed, 128 insertions(+)
>>
>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>> index dc57eac..4b5a2d4 100644
>> --- a/net/colo-compare.c
>> +++ b/net/colo-compare.c
>> @@ -26,6 +26,7 @@
>>   #include "qemu/jhash.h"
>>   #include "net/eth.h"
>>   
>> +#define DEBUG_TCP_COMPARE 1
>>   #define TYPE_COLO_COMPARE "colo-compare"
>>   #define COLO_COMPARE(obj) \
>>       OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>> @@ -90,6 +91,13 @@ typedef struct CompareState {
>>       GQueue unprocessed_connections;
>>       /* proxy current hash size */
>>       uint32_t hashtable_size;
>> +
>> +    /* notify compare thread */
>> +    QemuEvent event;
>> +    /* compare thread, a thread for each NIC */
>> +    QemuThread thread;
>> +    int thread_status;
>> +
>>   } CompareState;
>>   
>>   typedef struct CompareClass {
>> @@ -132,6 +140,15 @@ enum {
>>       SECONDARY_IN,
>>   };
>>   
>> +enum {
>> +    /* compare thread isn't started */
>> +    COMPARE_THREAD_NONE,
>> +    /* compare thread is running */
>> +    COMPARE_THREAD_RUNNING,
>> +    /* compare thread exit */
>> +    COMPARE_THREAD_EXIT,
>> +};
>> +
>>   static void packet_destroy(void *opaque, void *user_data);
>>   static int compare_chr_send(CharDriverState *out,
>>                               const uint8_t *buf,
>> @@ -336,6 +353,94 @@ static void packet_destroy(void *opaque, void *user_data)
>>       g_slice_free(Packet, pkt);
>>   }
>>   
>> +static inline void colo_dump_packet(Packet *pkt)
>> +{
>> +    int i;
>> +    for (i = 0; i < pkt->size; i++) {
>> +        printf("%02x ", ((uint8_t *)pkt->data)[i]);
>> +    }
>> +    printf("\n");
> Can we use something like qemu_hexdump() here?

Thanks~~
I will change it to qemu_hexdump

>
>> +}
>> +
>> +/*
>> + * The IP packets sent by primary and secondary
>> + * will be compared in here
>> + * TODO support ip fragment, Out-Of-Order
>> + * return:    0  means packet same
>> + *            > 0 || < 0 means packet different
>> + */
>> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
>> +{
>> +    trace_colo_compare_with_int("ppkt size", ppkt->size);
>> +    trace_colo_compare_with_char("ppkt ip_src", inet_ntoa(ppkt->ip->ip_src));
>> +    trace_colo_compare_with_char("ppkt ip_dst", inet_ntoa(ppkt->ip->ip_dst));
>> +    trace_colo_compare_with_int("spkt size", spkt->size);
>> +    trace_colo_compare_with_char("spkt ip_src", inet_ntoa(spkt->ip->ip_src));
>> +    trace_colo_compare_with_char("spkt ip_dst", inet_ntoa(spkt->ip->ip_dst));
> Can we use a single tracepoint here instead?

Yes,fix in next.

>
>> +
>> +    if (ppkt->size == spkt->size) {
>> +        return memcmp(ppkt->data, spkt->data, spkt->size);
>> +    } else {
>> +        return -1;
>> +    }
>> +}
>> +
>> +static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
>> +{
>> +    trace_colo_compare_main("compare all");
>> +    return colo_packet_compare(ppkt, spkt);
> Why need this?

just temp name,will change in patch 4/4

>
>> +}
>> +
>> +/*
>> + * called from the compare thread on the primary
>> + * for compare connection
>> + */
>> +static void colo_compare_connection(void *opaque, void *user_data)
>> +{
>> +    Connection *conn = opaque;
>> +    Packet *pkt = NULL;
>> +    GList *result = NULL;
>> +    int ret;
>> +
>> +    qemu_mutex_lock(&conn->list_lock);
>> +    while (!g_queue_is_empty(&conn->primary_list) &&
>> +           !g_queue_is_empty(&conn->secondary_list)) {
>> +        pkt = g_queue_pop_head(&conn->primary_list);
>> +        result = g_queue_find_custom(&conn->secondary_list,
>> +                              pkt, (GCompareFunc)colo_packet_compare_all);
>> +
>> +        if (result) {
>> +            ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
>> +            if (ret < 0) {
>> +                error_report("colo_send_primary_packet failed");
>> +            }
>> +            trace_colo_compare_main("packet same and release packet");
>> +            g_queue_remove(&conn->secondary_list, result->data);
>> +        } else {
>> +            trace_colo_compare_main("packet different");
>> +            g_queue_push_head(&conn->primary_list, pkt);
> Is this possible that the packet from secondary has not been arrived on
> time? If yes, do we still need to notify the checkpoint here?

Yes,the packet of secondary may not arrived.
we will hold primary packet to next periodic checkpoint
to flush it. and more, I consider to set a timer
to flush timeout(200ms???) packet like Dave's branch.


Thanks
zhangchen

>
>> +            /* TODO: colo_notify_checkpoint();*/
>> +            break;
>> +        }
>> +    }
>> +    qemu_mutex_unlock(&conn->list_lock);
>> +}
>> +
>> +static void *colo_compare_thread(void *opaque)
>> +{
>> +    CompareState *s = opaque;
>> +
>> +    while (s->thread_status == COMPARE_THREAD_RUNNING) {
>> +        qemu_event_wait(&s->event);
>> +        qemu_event_reset(&s->event);
>> +        qemu_mutex_lock(&s->conn_list_lock);
>> +        g_queue_foreach(&s->conn_list, colo_compare_connection, NULL);
>> +        qemu_mutex_unlock(&s->conn_list_lock);
>> +    }
>> +
>> +    return NULL;
>> +}
>> +
>>   static int compare_chr_send(CharDriverState *out,
>>                               const uint8_t *buf,
>>                               uint32_t size)
>> @@ -440,6 +545,8 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
>>           if (packet_enqueue(s, PRIMARY_IN)) {
>>               trace_colo_compare_main("primary: unsupported packet in");
>>               compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len);
>> +        } else {
>> +            qemu_event_set(&s->event);
>>           }
>>       } else if (ret == -1) {
>>           qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>> @@ -461,6 +568,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
>>               trace_colo_compare_main("secondary: unsupported packet in");
>>               /* should we send sec arp pkt? */
>>               compare_chr_send(s->chr_out, s->sec_rs.buf, s->sec_rs.packet_len);
>> +        } else {
>> +            qemu_event_set(&s->event);
>>           }
>>       } else if (ret == -1) {
>>           qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>> @@ -519,6 +628,8 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
>>   static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>   {
>>       CompareState *s = COLO_COMPARE(uc);
>> +    char thread_name[64];
>> +    static int compare_id;
>>   
>>       if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>>           error_setg(errp, "colo compare needs 'primary_in' ,"
>> @@ -564,6 +675,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>       QTAILQ_INSERT_TAIL(&net_compares, s, next);
>>   
>>       g_queue_init(&s->conn_list);
>> +    qemu_event_init(&s->event, false);
>>       qemu_mutex_init(&s->conn_list_lock);
>>       s->hashtable_size = 0;
>>   
>> @@ -572,6 +684,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>                                                         g_free,
>>                                                         connection_destroy);
>>   
>> +    s->thread_status = COMPARE_THREAD_RUNNING;
>> +    sprintf(thread_name, "compare %d", compare_id);
>> +    qemu_thread_create(&s->thread, thread_name,
>> +                       colo_compare_thread, s,
>> +                       QEMU_THREAD_JOINABLE);
>> +    compare_id++;
>> +
>>       return;
>>   }
>>   
>> @@ -607,6 +726,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data)
>>           QTAILQ_REMOVE(&net_compares, s, next);
>>       }
>>       qemu_mutex_destroy(&s->conn_list_lock);
>> +
>> +    if (s->thread.thread) {
>> +        s->thread_status = COMPARE_THREAD_EXIT;
>> +        qemu_event_set(&s->event);
>> +        qemu_thread_join(&s->thread);
>> +    }
>> +    qemu_event_destroy(&s->event);
>>   }
>>   
>>   static void colo_compare_init(Object *obj)
>> diff --git a/trace-events b/trace-events
>> index 8862288..978c47f 100644
>> --- a/trace-events
>> +++ b/trace-events
>> @@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
>>   
>>   # net/colo-compare.c
>>   colo_compare_main(const char *chr) "chr: %s"
>> +colo_compare_with_int(const char *sta, int size) ": %s = %d"
>> +colo_compare_with_char(const char *sta, const char *stb) ": %s = %s"
>
>
> .
>
Jason Wang April 29, 2016, 2:07 a.m. UTC | #3
On 04/28/2016 06:31 PM, Zhang Chen wrote:
>>> +/*
>>> + * called from the compare thread on the primary
>>> + * for compare connection
>>> + */
>>> +static void colo_compare_connection(void *opaque, void *user_data)
>>> +{
>>> +    Connection *conn = opaque;
>>> +    Packet *pkt = NULL;
>>> +    GList *result = NULL;
>>> +    int ret;
>>> +
>>> +    qemu_mutex_lock(&conn->list_lock);
>>> +    while (!g_queue_is_empty(&conn->primary_list) &&
>>> +           !g_queue_is_empty(&conn->secondary_list)) {
>>> +        pkt = g_queue_pop_head(&conn->primary_list);
>>> +        result = g_queue_find_custom(&conn->secondary_list,
>>> +                              pkt,
>>> (GCompareFunc)colo_packet_compare_all);
>>> +
>>> +        if (result) {
>>> +            ret = compare_chr_send(pkt->s->chr_out, pkt->data,
>>> pkt->size);
>>> +            if (ret < 0) {
>>> +                error_report("colo_send_primary_packet failed");
>>> +            }
>>> +            trace_colo_compare_main("packet same and release packet");
>>> +            g_queue_remove(&conn->secondary_list, result->data);
>>> +        } else {
>>> +            trace_colo_compare_main("packet different");
>>> +            g_queue_push_head(&conn->primary_list, pkt);
>> Is this possible that the packet from secondary has not been arrived on
>> time? If yes, do we still need to notify the checkpoint here?
>
> Yes,the packet of secondary may not arrived.
> we will hold primary packet to next periodic checkpoint
> to flush it. and more, I consider to set a timer
> to flush timeout(200ms???) packet like Dave's branch.
>
>
> Thanks
> zhangchen 

I was wondering maybe you can merge or unify all other changes from
Dave's branch?
Zhang Chen April 29, 2016, 8:28 a.m. UTC | #4
On 04/29/2016 10:07 AM, Jason Wang wrote:
>
> On 04/28/2016 06:31 PM, Zhang Chen wrote:
>>>> +/*
>>>> + * called from the compare thread on the primary
>>>> + * for compare connection
>>>> + */
>>>> +static void colo_compare_connection(void *opaque, void *user_data)
>>>> +{
>>>> +    Connection *conn = opaque;
>>>> +    Packet *pkt = NULL;
>>>> +    GList *result = NULL;
>>>> +    int ret;
>>>> +
>>>> +    qemu_mutex_lock(&conn->list_lock);
>>>> +    while (!g_queue_is_empty(&conn->primary_list) &&
>>>> +           !g_queue_is_empty(&conn->secondary_list)) {
>>>> +        pkt = g_queue_pop_head(&conn->primary_list);
>>>> +        result = g_queue_find_custom(&conn->secondary_list,
>>>> +                              pkt,
>>>> (GCompareFunc)colo_packet_compare_all);
>>>> +
>>>> +        if (result) {
>>>> +            ret = compare_chr_send(pkt->s->chr_out, pkt->data,
>>>> pkt->size);
>>>> +            if (ret < 0) {
>>>> +                error_report("colo_send_primary_packet failed");
>>>> +            }
>>>> +            trace_colo_compare_main("packet same and release packet");
>>>> +            g_queue_remove(&conn->secondary_list, result->data);
>>>> +        } else {
>>>> +            trace_colo_compare_main("packet different");
>>>> +            g_queue_push_head(&conn->primary_list, pkt);
>>> Is this possible that the packet from secondary has not been arrived on
>>> time? If yes, do we still need to notify the checkpoint here?
>> Yes,the packet of secondary may not arrived.
>> we will hold primary packet to next periodic checkpoint
>> to flush it. and more, I consider to set a timer
>> to flush timeout(200ms???) packet like Dave's branch.
>>
>>
>> Thanks
>> zhangchen
> I was wondering maybe you can merge or unify all other changes from
> Dave's branch?
>

Yes, I will unify some codes from Dave's colo-proxy branch.

Thanks
Zhang Chen

> .
>
Dr. David Alan Gilbert April 29, 2016, 11:20 a.m. UTC | #5
* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> 
> 
> On 04/29/2016 10:07 AM, Jason Wang wrote:
> >
> >On 04/28/2016 06:31 PM, Zhang Chen wrote:
> >>>>+/*
> >>>>+ * called from the compare thread on the primary
> >>>>+ * for compare connection
> >>>>+ */
> >>>>+static void colo_compare_connection(void *opaque, void *user_data)
> >>>>+{
> >>>>+    Connection *conn = opaque;
> >>>>+    Packet *pkt = NULL;
> >>>>+    GList *result = NULL;
> >>>>+    int ret;
> >>>>+
> >>>>+    qemu_mutex_lock(&conn->list_lock);
> >>>>+    while (!g_queue_is_empty(&conn->primary_list) &&
> >>>>+           !g_queue_is_empty(&conn->secondary_list)) {
> >>>>+        pkt = g_queue_pop_head(&conn->primary_list);
> >>>>+        result = g_queue_find_custom(&conn->secondary_list,
> >>>>+                              pkt,
> >>>>(GCompareFunc)colo_packet_compare_all);
> >>>>+
> >>>>+        if (result) {
> >>>>+            ret = compare_chr_send(pkt->s->chr_out, pkt->data,
> >>>>pkt->size);
> >>>>+            if (ret < 0) {
> >>>>+                error_report("colo_send_primary_packet failed");
> >>>>+            }
> >>>>+            trace_colo_compare_main("packet same and release packet");
> >>>>+            g_queue_remove(&conn->secondary_list, result->data);
> >>>>+        } else {
> >>>>+            trace_colo_compare_main("packet different");
> >>>>+            g_queue_push_head(&conn->primary_list, pkt);
> >>>Is this possible that the packet from secondary has not been arrived on
> >>>time? If yes, do we still need to notify the checkpoint here?
> >>Yes,the packet of secondary may not arrived.
> >>we will hold primary packet to next periodic checkpoint
> >>to flush it. and more, I consider to set a timer
> >>to flush timeout(200ms???) packet like Dave's branch.
> >>
> >>
> >>Thanks
> >>zhangchen
> >I was wondering maybe you can merge or unify all other changes from
> >Dave's branch?
> >
> 
> Yes, I will unify some codes from Dave's colo-proxy branch.

Of course always check what I've written; some of that branch
was quite hacky itself so don't just assume it's good!

Dave

> 
> Thanks
> Zhang Chen
> 
> >.
> >
> 
> -- 
> Thanks
> zhangchen
> 
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff mbox

Patch

diff --git a/net/colo-compare.c b/net/colo-compare.c
index dc57eac..4b5a2d4 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -26,6 +26,7 @@ 
 #include "qemu/jhash.h"
 #include "net/eth.h"
 
+#define DEBUG_TCP_COMPARE 1
 #define TYPE_COLO_COMPARE "colo-compare"
 #define COLO_COMPARE(obj) \
     OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
@@ -90,6 +91,13 @@  typedef struct CompareState {
     GQueue unprocessed_connections;
     /* proxy current hash size */
     uint32_t hashtable_size;
+
+    /* notify compare thread */
+    QemuEvent event;
+    /* compare thread, a thread for each NIC */
+    QemuThread thread;
+    int thread_status;
+
 } CompareState;
 
 typedef struct CompareClass {
@@ -132,6 +140,15 @@  enum {
     SECONDARY_IN,
 };
 
+enum {
+    /* compare thread isn't started */
+    COMPARE_THREAD_NONE,
+    /* compare thread is running */
+    COMPARE_THREAD_RUNNING,
+    /* compare thread exit */
+    COMPARE_THREAD_EXIT,
+};
+
 static void packet_destroy(void *opaque, void *user_data);
 static int compare_chr_send(CharDriverState *out,
                             const uint8_t *buf,
@@ -336,6 +353,94 @@  static void packet_destroy(void *opaque, void *user_data)
     g_slice_free(Packet, pkt);
 }
 
+static inline void colo_dump_packet(Packet *pkt)
+{
+    int i;
+    for (i = 0; i < pkt->size; i++) {
+        printf("%02x ", ((uint8_t *)pkt->data)[i]);
+    }
+    printf("\n");
+}
+
+/*
+ * The IP packets sent by primary and secondary
+ * will be compared in here
+ * TODO support ip fragment, Out-Of-Order
+ * return:    0  means packet same
+ *            > 0 || < 0 means packet different
+ */
+static int colo_packet_compare(Packet *ppkt, Packet *spkt)
+{
+    trace_colo_compare_with_int("ppkt size", ppkt->size);
+    trace_colo_compare_with_char("ppkt ip_src", inet_ntoa(ppkt->ip->ip_src));
+    trace_colo_compare_with_char("ppkt ip_dst", inet_ntoa(ppkt->ip->ip_dst));
+    trace_colo_compare_with_int("spkt size", spkt->size);
+    trace_colo_compare_with_char("spkt ip_src", inet_ntoa(spkt->ip->ip_src));
+    trace_colo_compare_with_char("spkt ip_dst", inet_ntoa(spkt->ip->ip_dst));
+
+    if (ppkt->size == spkt->size) {
+        return memcmp(ppkt->data, spkt->data, spkt->size);
+    } else {
+        return -1;
+    }
+}
+
+static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
+{
+    trace_colo_compare_main("compare all");
+    return colo_packet_compare(ppkt, spkt);
+}
+
+/*
+ * called from the compare thread on the primary
+ * for compare connection
+ */
+static void colo_compare_connection(void *opaque, void *user_data)
+{
+    Connection *conn = opaque;
+    Packet *pkt = NULL;
+    GList *result = NULL;
+    int ret;
+
+    qemu_mutex_lock(&conn->list_lock);
+    while (!g_queue_is_empty(&conn->primary_list) &&
+           !g_queue_is_empty(&conn->secondary_list)) {
+        pkt = g_queue_pop_head(&conn->primary_list);
+        result = g_queue_find_custom(&conn->secondary_list,
+                              pkt, (GCompareFunc)colo_packet_compare_all);
+
+        if (result) {
+            ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
+            if (ret < 0) {
+                error_report("colo_send_primary_packet failed");
+            }
+            trace_colo_compare_main("packet same and release packet");
+            g_queue_remove(&conn->secondary_list, result->data);
+        } else {
+            trace_colo_compare_main("packet different");
+            g_queue_push_head(&conn->primary_list, pkt);
+            /* TODO: colo_notify_checkpoint();*/
+            break;
+        }
+    }
+    qemu_mutex_unlock(&conn->list_lock);
+}
+
+static void *colo_compare_thread(void *opaque)
+{
+    CompareState *s = opaque;
+
+    while (s->thread_status == COMPARE_THREAD_RUNNING) {
+        qemu_event_wait(&s->event);
+        qemu_event_reset(&s->event);
+        qemu_mutex_lock(&s->conn_list_lock);
+        g_queue_foreach(&s->conn_list, colo_compare_connection, NULL);
+        qemu_mutex_unlock(&s->conn_list_lock);
+    }
+
+    return NULL;
+}
+
 static int compare_chr_send(CharDriverState *out,
                             const uint8_t *buf,
                             uint32_t size)
@@ -440,6 +545,8 @@  static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
         if (packet_enqueue(s, PRIMARY_IN)) {
             trace_colo_compare_main("primary: unsupported packet in");
             compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len);
+        } else {
+            qemu_event_set(&s->event);
         }
     } else if (ret == -1) {
         qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
@@ -461,6 +568,8 @@  static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
             trace_colo_compare_main("secondary: unsupported packet in");
             /* should we send sec arp pkt? */
             compare_chr_send(s->chr_out, s->sec_rs.buf, s->sec_rs.packet_len);
+        } else {
+            qemu_event_set(&s->event);
         }
     } else if (ret == -1) {
         qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
@@ -519,6 +628,8 @@  static void compare_set_outdev(Object *obj, const char *value, Error **errp)
 static void colo_compare_complete(UserCreatable *uc, Error **errp)
 {
     CompareState *s = COLO_COMPARE(uc);
+    char thread_name[64];
+    static int compare_id;
 
     if (!s->pri_indev || !s->sec_indev || !s->outdev) {
         error_setg(errp, "colo compare needs 'primary_in' ,"
@@ -564,6 +675,7 @@  static void colo_compare_complete(UserCreatable *uc, Error **errp)
     QTAILQ_INSERT_TAIL(&net_compares, s, next);
 
     g_queue_init(&s->conn_list);
+    qemu_event_init(&s->event, false);
     qemu_mutex_init(&s->conn_list_lock);
     s->hashtable_size = 0;
 
@@ -572,6 +684,13 @@  static void colo_compare_complete(UserCreatable *uc, Error **errp)
                                                       g_free,
                                                       connection_destroy);
 
+    s->thread_status = COMPARE_THREAD_RUNNING;
+    sprintf(thread_name, "compare %d", compare_id);
+    qemu_thread_create(&s->thread, thread_name,
+                       colo_compare_thread, s,
+                       QEMU_THREAD_JOINABLE);
+    compare_id++;
+
     return;
 }
 
@@ -607,6 +726,13 @@  static void colo_compare_class_finalize(ObjectClass *oc, void *data)
         QTAILQ_REMOVE(&net_compares, s, next);
     }
     qemu_mutex_destroy(&s->conn_list_lock);
+
+    if (s->thread.thread) {
+        s->thread_status = COMPARE_THREAD_EXIT;
+        qemu_event_set(&s->event);
+        qemu_thread_join(&s->thread);
+    }
+    qemu_event_destroy(&s->event);
 }
 
 static void colo_compare_init(Object *obj)
diff --git a/trace-events b/trace-events
index 8862288..978c47f 100644
--- a/trace-events
+++ b/trace-events
@@ -1919,3 +1919,5 @@  aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
 
 # net/colo-compare.c
 colo_compare_main(const char *chr) "chr: %s"
+colo_compare_with_int(const char *sta, int size) ": %s = %d"
+colo_compare_with_char(const char *sta, const char *stb) ": %s = %s"