diff mbox series

[v13,09/12] migration: Start sending messages

Message ID 20180523111817.1463-10-quintela@redhat.com
State New
Headers show
Series Multifd | expand

Commit Message

Juan Quintela May 23, 2018, 11:18 a.m. UTC
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 29 ++++++++++++++++++++++++-----
 1 file changed, 24 insertions(+), 5 deletions(-)

Comments

Dr. David Alan Gilbert June 11, 2018, 3:49 p.m. UTC | #1
* Juan Quintela (quintela@redhat.com) wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> ---
>  migration/ram.c | 29 ++++++++++++++++++++++++-----
>  1 file changed, 24 insertions(+), 5 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 28f5cea4d8..2584130c85 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -729,9 +729,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
>      RAMBlock *block;
>      int i;
>  
> -    /* ToDo: We can't use it until we haven't received a message */
> -    return 0;
> -
>      be32_to_cpus(&packet->magic);
>      if (packet->magic != MULTIFD_MAGIC) {
>          error_setg(errp, "multifd: received packet "
> @@ -965,6 +962,7 @@ static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
>      Error *local_err = NULL;
> +    int ret;
>  
>      trace_multifd_send_thread_start(p->id);
>  
> @@ -992,7 +990,16 @@ static void *multifd_send_thread(void *opaque)
>  
>              trace_multifd_send(p->id, seq, used, flags);
>  
> -            /* ToDo: send packet here */
> +            ret = qio_channel_write_all(p->c, (void *)p->packet,
> +                                        p->packet_len, &local_err);
> +            if (ret != 0) {
> +                break;
> +            }
> +
> +            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
> +            if (ret != 0) {
> +                break;
> +            }
>  
>              qemu_mutex_lock(&p->mutex);
>              p->pending_job--;
> @@ -1204,7 +1211,14 @@ static void *multifd_recv_thread(void *opaque)
>              uint32_t flags;
>              qemu_mutex_unlock(&p->mutex);
>  
> -            /* ToDo: recv packet here */
> +            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
> +                                           p->packet_len, &local_err);
> +            if (ret == 0) {   /* EOF */
> +                break;
> +            }
> +            if (ret == -1) {   /* Error */
> +                break;
> +            }
>  
>              qemu_mutex_lock(&p->mutex);
>              ret = multifd_recv_unfill_packet(p, &local_err);
> @@ -1221,6 +1235,11 @@ static void *multifd_recv_thread(void *opaque)
>              p->num_pages += used;
>              qemu_mutex_unlock(&p->mutex);
>  
> +            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
> +            if (ret != 0) {
> +                break;
> +            }
> +
>              if (flags & MULTIFD_FLAG_SYNC) {
>                  qemu_sem_post(&multifd_recv_state->sem_sync);
>                  qemu_sem_wait(&p->sem_sync);
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff mbox series

Patch

diff --git a/migration/ram.c b/migration/ram.c
index 28f5cea4d8..2584130c85 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -729,9 +729,6 @@  static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
     RAMBlock *block;
     int i;
 
-    /* ToDo: We can't use it until we haven't received a message */
-    return 0;
-
     be32_to_cpus(&packet->magic);
     if (packet->magic != MULTIFD_MAGIC) {
         error_setg(errp, "multifd: received packet "
@@ -965,6 +962,7 @@  static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     Error *local_err = NULL;
+    int ret;
 
     trace_multifd_send_thread_start(p->id);
 
@@ -992,7 +990,16 @@  static void *multifd_send_thread(void *opaque)
 
             trace_multifd_send(p->id, seq, used, flags);
 
-            /* ToDo: send packet here */
+            ret = qio_channel_write_all(p->c, (void *)p->packet,
+                                        p->packet_len, &local_err);
+            if (ret != 0) {
+                break;
+            }
+
+            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
+            if (ret != 0) {
+                break;
+            }
 
             qemu_mutex_lock(&p->mutex);
             p->pending_job--;
@@ -1204,7 +1211,14 @@  static void *multifd_recv_thread(void *opaque)
             uint32_t flags;
             qemu_mutex_unlock(&p->mutex);
 
-            /* ToDo: recv packet here */
+            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+                                           p->packet_len, &local_err);
+            if (ret == 0) {   /* EOF */
+                break;
+            }
+            if (ret == -1) {   /* Error */
+                break;
+            }
 
             qemu_mutex_lock(&p->mutex);
             ret = multifd_recv_unfill_packet(p, &local_err);
@@ -1221,6 +1235,11 @@  static void *multifd_recv_thread(void *opaque)
             p->num_pages += used;
             qemu_mutex_unlock(&p->mutex);
 
+            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
+            if (ret != 0) {
+                break;
+            }
+
             if (flags & MULTIFD_FLAG_SYNC) {
                 qemu_sem_post(&multifd_recv_state->sem_sync);
                 qemu_sem_wait(&p->sem_sync);