diff mbox series

[v12,18/21] migration: Start sending messages

Message ID 20180425112723.1111-19-quintela@redhat.com
State New
Headers show
Series Multifd | expand

Commit Message

Juan Quintela April 25, 2018, 11:27 a.m. UTC
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 30 ++++++++++++++++++++++++------
 1 file changed, 24 insertions(+), 6 deletions(-)

Comments

Dr. David Alan Gilbert May 3, 2018, 2:55 p.m. UTC | #1
* Juan Quintela (quintela@redhat.com) wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 30 ++++++++++++++++++++++++------
>  1 file changed, 24 insertions(+), 6 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 862ec53d32..9adbaa81f9 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -625,9 +625,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
>      RAMBlock *block;
>      int i;
>  
> -    /* ToDo: We can't use it until we haven't received a message */
> -    return 0;
> -
>      be32_to_cpus(&packet->magic);
>      if (packet->magic != MULTIFD_MAGIC) {
>          error_setg(errp, "multifd: received packet "
> @@ -851,6 +848,7 @@ static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
>      Error *local_err = NULL;
> +    int ret;
>  
>      trace_multifd_send_thread_start(p->id);
>  
> @@ -878,10 +876,18 @@ static void *multifd_send_thread(void *opaque)
>  
>              trace_multifd_send(p->id, seq, used, flags);
>  
> -            /* ToDo: send packet here */
> +            ret = qio_channel_write_all(p->c, (void *)p->packet,
> +                                        p->packet_len, &local_err);
> +            if (ret != 0) {
> +                break;
> +            }
> +
> +            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
> +            if (ret != 0) {
> +                break;
> +            }
>  
>              qemu_mutex_lock(&p->mutex);
> -            p->flags = 0;

What's this change?

Other than that looks OK.

Dave

>              p->pending_job--;
>              qemu_mutex_unlock(&p->mutex);
>  
> @@ -1091,7 +1097,14 @@ static void *multifd_recv_thread(void *opaque)
>              uint32_t flags;
>              qemu_mutex_unlock(&p->mutex);
>  
> -            /* ToDo: recv packet here */
> +            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
> +                                           p->packet_len, &local_err);
> +            if (ret == 0) {   /* EOF */
> +                break;
> +            }
> +            if (ret == -1) {   /* Error */
> +                break;
> +            }
>  
>              qemu_mutex_lock(&p->mutex);
>              ret = multifd_recv_unfill_packet(p, &local_err);
> @@ -1108,6 +1121,11 @@ static void *multifd_recv_thread(void *opaque)
>              p->num_pages += used;
>              qemu_mutex_unlock(&p->mutex);
>  
> +            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
> +            if (ret != 0) {
> +                break;
> +            }
> +
>              if (flags & MULTIFD_FLAG_SYNC) {
>                  qemu_sem_post(&multifd_recv_state->sem_sync);
>                  qemu_sem_wait(&p->sem_sync);
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Juan Quintela May 23, 2018, 10:51 a.m. UTC | #2
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 30 ++++++++++++++++++++++++------
>>  1 file changed, 24 insertions(+), 6 deletions(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index 862ec53d32..9adbaa81f9 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -625,9 +625,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
>>      RAMBlock *block;
>>      int i;
>>  
>> -    /* ToDo: We can't use it until we haven't received a message */
>> -    return 0;
>> -
>>      be32_to_cpus(&packet->magic);
>>      if (packet->magic != MULTIFD_MAGIC) {
>>          error_setg(errp, "multifd: received packet "
>> @@ -851,6 +848,7 @@ static void *multifd_send_thread(void *opaque)
>>  {
>>      MultiFDSendParams *p = opaque;
>>      Error *local_err = NULL;
>> +    int ret;
>>  
>>      trace_multifd_send_thread_start(p->id);
>>  
>> @@ -878,10 +876,18 @@ static void *multifd_send_thread(void *opaque)
>>  
>>              trace_multifd_send(p->id, seq, used, flags);
>>  
>> -            /* ToDo: send packet here */
>> +            ret = qio_channel_write_all(p->c, (void *)p->packet,
>> +                                        p->packet_len, &local_err);
>> +            if (ret != 0) {
>> +                break;
>> +            }
>> +
>> +            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
>> +            if (ret != 0) {
>> +                break;
>> +            }
>>  
>>              qemu_mutex_lock(&p->mutex);
>> -            p->flags = 0;
>
> What's this change?

Leftover from previous approach on patch 16, we already do that
assignment several lines before.  Removed it on patch 16 as it should.

Thanks, Juan.
diff mbox series

Patch

diff --git a/migration/ram.c b/migration/ram.c
index 862ec53d32..9adbaa81f9 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -625,9 +625,6 @@  static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
     RAMBlock *block;
     int i;
 
-    /* ToDo: We can't use it until we haven't received a message */
-    return 0;
-
     be32_to_cpus(&packet->magic);
     if (packet->magic != MULTIFD_MAGIC) {
         error_setg(errp, "multifd: received packet "
@@ -851,6 +848,7 @@  static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     Error *local_err = NULL;
+    int ret;
 
     trace_multifd_send_thread_start(p->id);
 
@@ -878,10 +876,18 @@  static void *multifd_send_thread(void *opaque)
 
             trace_multifd_send(p->id, seq, used, flags);
 
-            /* ToDo: send packet here */
+            ret = qio_channel_write_all(p->c, (void *)p->packet,
+                                        p->packet_len, &local_err);
+            if (ret != 0) {
+                break;
+            }
+
+            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
+            if (ret != 0) {
+                break;
+            }
 
             qemu_mutex_lock(&p->mutex);
-            p->flags = 0;
             p->pending_job--;
             qemu_mutex_unlock(&p->mutex);
 
@@ -1091,7 +1097,14 @@  static void *multifd_recv_thread(void *opaque)
             uint32_t flags;
             qemu_mutex_unlock(&p->mutex);
 
-            /* ToDo: recv packet here */
+            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+                                           p->packet_len, &local_err);
+            if (ret == 0) {   /* EOF */
+                break;
+            }
+            if (ret == -1) {   /* Error */
+                break;
+            }
 
             qemu_mutex_lock(&p->mutex);
             ret = multifd_recv_unfill_packet(p, &local_err);
@@ -1108,6 +1121,11 @@  static void *multifd_recv_thread(void *opaque)
             p->num_pages += used;
             qemu_mutex_unlock(&p->mutex);
 
+            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
+            if (ret != 0) {
+                break;
+            }
+
             if (flags & MULTIFD_FLAG_SYNC) {
                 qemu_sem_post(&multifd_recv_state->sem_sync);
                 qemu_sem_wait(&p->sem_sync);