
[v5,16/17] migration: Transfer pages over new channels

Message ID 20170717134238.1966-17-quintela@redhat.com
State New

Commit Message

Juan Quintela July 17, 2017, 1:42 p.m. UTC
We switch from sending the page number to sending the real pages.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--

Remove the HACK bit; we now have an exported function that calculates
the size of a page.
---
 migration/migration.c | 14 ++++++++----
 migration/ram.c       | 59 +++++++++++++++++----------------------------------
 2 files changed, 29 insertions(+), 44 deletions(-)
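
In short: each multifd channel now carries the page contents in a single
vectored write, and the main stream only says which channel to read from.
A sketch of the old placeholder versus the new scheme, using the names
from the diff below:

    /* Old (testing placeholder): per page, the channel carried only the
     * address, and the data still went inline over the main stream:
     *
     *     qio_channel_write(p->c, (const char *)&p->pages.iov[i].iov_base,
     *                       sizeof(uint8_t *), &error_abort);
     *     qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
     *
     * New: the channel carries the real pages, num * TARGET_PAGE_SIZE bytes
     * in one vectored write; the main stream carries just the 2-byte fd_num:
     *
     *     qio_channel_writev_all(p->c, p->pages.iov, num, &error_abort);
     *     qemu_put_be16(rs->f, fd_num);
     */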

Comments

Dr. David Alan Gilbert July 20, 2017, 11:31 a.m. UTC | #1
* Juan Quintela (quintela@redhat.com) wrote:
> We switch from sending the page number to sending the real pages.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> 
> Remove the HACK bit; we now have an exported function that calculates
> the size of a page.
> ---
>  migration/migration.c | 14 ++++++++----
>  migration/ram.c       | 59 +++++++++++++++++----------------------------------
>  2 files changed, 29 insertions(+), 44 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index e122684..34a34b7 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -1882,13 +1882,14 @@ static void *migration_thread(void *opaque)
>      /* Used by the bandwidth calcs, updated later */
>      int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
>      int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
> -    int64_t initial_bytes = 0;
>      /*
>       * The final stage happens when the remaining data is smaller than
>       * this threshold; it's calculated from the requested downtime and
>       * measured bandwidth
>       */
>      int64_t threshold_size = 0;
> +    int64_t qemu_file_bytes = 0;
> +    int64_t multifd_pages = 0;

It feels like these changes to the transfer count should be in a
separate patch.

>      int64_t start_time = initial_time;
>      int64_t end_time;
>      bool old_vm_running = false;
> @@ -1976,9 +1977,13 @@ static void *migration_thread(void *opaque)
>          }
>          current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
>          if (current_time >= initial_time + BUFFER_DELAY) {
> -            uint64_t transferred_bytes = qemu_ftell(s->to_dst_file) -
> -                                         initial_bytes;
>              uint64_t time_spent = current_time - initial_time;
> +            uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
> +            uint64_t multifd_pages_now = ram_counters.multifd;
> +            uint64_t transferred_bytes =
> +                (qemu_file_bytes_now - qemu_file_bytes) +
> +                (multifd_pages_now - multifd_pages) *
> +                qemu_target_page_size();

If I've followed this right, then ram_counters.multifd is in the main
thread not the individual threads, so we should be OK doing that.
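
Just to convince myself the accounting adds up, with made-up numbers
(assuming a 4 KiB target page):

    /* one BUFFER_DELAY window; multifd pages bypass the QEMUFile, so
     * qemu_ftell() alone would undercount what really went over the wire */
    uint64_t qemu_file_delta = qemu_file_bytes_now - qemu_file_bytes;
                                        /* say 10240 bytes of headers */
    uint64_t page_delta = multifd_pages_now - multifd_pages;
                                        /* say 2000 pages */
    uint64_t transferred_bytes = qemu_file_delta
                                 + page_delta * qemu_target_page_size();
                                        /* 10240 + 2000 * 4096 = 8202240 */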

>              double bandwidth = (double)transferred_bytes / time_spent;
>              threshold_size = bandwidth * s->parameters.downtime_limit;
>  
> @@ -1996,7 +2001,8 @@ static void *migration_thread(void *opaque)
>  
>              qemu_file_reset_rate_limit(s->to_dst_file);
>              initial_time = current_time;
> -            initial_bytes = qemu_ftell(s->to_dst_file);
> +            qemu_file_bytes = qemu_file_bytes_now;
> +            multifd_pages = multifd_pages_now;
>          }
>          if (qemu_file_rate_limit(s->to_dst_file)) {
>              /* usleep expects microseconds */
> diff --git a/migration/ram.c b/migration/ram.c
> index b55b243..c78b286 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -468,25 +468,21 @@ static void *multifd_send_thread(void *opaque)
>              break;
>          }
>          if (p->pages.num) {
> -            int i;
>              int num;
>  
>              num = p->pages.num;
>              p->pages.num = 0;
>              qemu_mutex_unlock(&p->mutex);
>  
> -            for (i = 0; i < num; i++) {
> -                if (qio_channel_write(p->c,
> -                                      (const char *)&p->pages.iov[i].iov_base,
> -                                      sizeof(uint8_t *), &error_abort)
> -                    != sizeof(uint8_t *)) {
> -                    MigrationState *s = migrate_get_current();
> +            if (qio_channel_writev_all(p->c, p->pages.iov,
> +                                       num, &error_abort)
> +                != num * TARGET_PAGE_SIZE) {
> +                MigrationState *s = migrate_get_current();

Same comments as the previous patch; we should find a way to get the
error message logged.  That's not easy since we're in a thread, but we
need to log these errors somehow.

>  
> -                    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> -                                      MIGRATION_STATUS_FAILED);
> -                    terminate_multifd_send_threads();
> -                    return NULL;
> -                }
> +                migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> +                                  MIGRATION_STATUS_FAILED);
> +                terminate_multifd_send_threads();
> +                return NULL;
>              }
>              qemu_mutex_lock(&multifd_send_state->mutex);
>              p->done = true;
> @@ -654,7 +650,6 @@ void multifd_load_cleanup(void)
>  static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *p = opaque;
> -    uint8_t *recv_address;
>  
>      qemu_sem_post(&p->ready);
>      while (true) {
> @@ -664,38 +659,21 @@ static void *multifd_recv_thread(void *opaque)
>              break;
>          }
>          if (p->pages.num) {
> -            int i;
>              int num;
>  
>              num = p->pages.num;
>              p->pages.num = 0;
>  
> -            for (i = 0; i < num; i++) {
> -                if (qio_channel_read(p->c,
> -                                     (char *)&recv_address,
> -                                     sizeof(uint8_t *), &error_abort)
> -                    != sizeof(uint8_t *)) {
> -                    MigrationState *s = migrate_get_current();
> +            if (qio_channel_readv_all(p->c, p->pages.iov,
> +                                      num, &error_abort)
> +                != num * TARGET_PAGE_SIZE) {
> +                MigrationState *s = migrate_get_current();
>  
> -                    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> -                                      MIGRATION_STATUS_FAILED);
> -                    terminate_multifd_recv_threads();
> -                    return NULL;
> -                }
> -                if (recv_address != p->pages.iov[i].iov_base) {
> -                    MigrationState *s = migrate_get_current();
> -
> -                    printf("We received %p what we were expecting %p (%d)\n",
> -                           recv_address,
> -                           p->pages.iov[i].iov_base, i);
> -
> -                    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> -                                      MIGRATION_STATUS_FAILED);
> -                    terminate_multifd_recv_threads();
> -                    return NULL;
> -                }
> +                migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> +                                  MIGRATION_STATUS_FAILED);
> +                terminate_multifd_recv_threads();
> +                return NULL;
>              }
> -
>              p->done = true;
>              qemu_mutex_unlock(&p->mutex);
>              qemu_sem_post(&p->ready);
> @@ -1262,8 +1240,10 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
>                               offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>          fd_num = multifd_send_page(p, rs->migration_dirty_pages == 1);
>          qemu_put_be16(rs->f, fd_num);
> +        if (fd_num != UINT16_MAX) {
> +            qemu_fflush(rs->f);
> +        }

Is that to make sure that the relatively small messages actually get
transmitted on the main fd so that the destination starts receiving
them?

I do have a worry there: since the addresses are going down a single
fd, we are open to deadlock if the send threads fill up their buffers
and block waiting for the receivers to drain them.

>          ram_counters.transferred += 2; /* size of fd_num */
> -        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
>          ram_counters.transferred += TARGET_PAGE_SIZE;
>          pages = 1;
>          ram_counters.normal++;
> @@ -3126,7 +3106,6 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>          case RAM_SAVE_FLAG_MULTIFD_PAGE:
>              fd_num = qemu_get_be16(f);
>              multifd_recv_page(host, fd_num);
> -            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
>  
>          case RAM_SAVE_FLAG_EOS:

Dave

> -- 
> 2.9.4
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Juan Quintela Aug. 8, 2017, 11:13 a.m. UTC | #2
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We switch from sending the page number to sending the real pages.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> 
>> --
>> 
>> Remove the HACK bit; we now have an exported function that calculates
>> the size of a page.
>> ---
>>  migration/migration.c | 14 ++++++++----
>>  migration/ram.c       | 59 +++++++++++++++++----------------------------------
>>  2 files changed, 29 insertions(+), 44 deletions(-)
>> 
>> diff --git a/migration/migration.c b/migration/migration.c
>> index e122684..34a34b7 100644
>> --- a/migration/migration.c
>> +++ b/migration/migration.c
>> @@ -1882,13 +1882,14 @@ static void *migration_thread(void *opaque)
>>      /* Used by the bandwidth calcs, updated later */
>>      int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
>>      int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>> -    int64_t initial_bytes = 0;
>>      /*
>>       * The final stage happens when the remaining data is smaller than
>>       * this threshold; it's calculated from the requested downtime and
>>       * measured bandwidth
>>       */
>>      int64_t threshold_size = 0;
>> +    int64_t qemu_file_bytes = 0;
>> +    int64_t multifd_pages = 0;
>
> It feels like these changes to the transfer count should be in a
> separate patch.

Until this patch, we only sent the page address, for testing purposes;
we can change it in the previous patch.  I can split out the
qemu_file_bytes change, though.

>>      int64_t start_time = initial_time;
>>      int64_t end_time;
>>      bool old_vm_running = false;
>> @@ -1976,9 +1977,13 @@ static void *migration_thread(void *opaque)
>>          }
>>          current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
>>          if (current_time >= initial_time + BUFFER_DELAY) {
>> -            uint64_t transferred_bytes = qemu_ftell(s->to_dst_file) -
>> -                                         initial_bytes;
>>              uint64_t time_spent = current_time - initial_time;
>> +            uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
>> +            uint64_t multifd_pages_now = ram_counters.multifd;
>> +            uint64_t transferred_bytes =
>> +                (qemu_file_bytes_now - qemu_file_bytes) +
>> +                (multifd_pages_now - multifd_pages) *
>> +                qemu_target_page_size();
>
> If I've followed this right, then ram_counters.multifd is in the main
> thread not the individual threads, so we should be OK doing that.

Yeap.

>
>>              double bandwidth = (double)transferred_bytes / time_spent;
>>              threshold_size = bandwidth * s->parameters.downtime_limit;
>>  
>> @@ -1996,7 +2001,8 @@ static void *migration_thread(void *opaque)
>>  
>>              qemu_file_reset_rate_limit(s->to_dst_file);
>>              initial_time = current_time;
>> -            initial_bytes = qemu_ftell(s->to_dst_file);
>> +            qemu_file_bytes = qemu_file_bytes_now;
>> +            multifd_pages = multifd_pages_now;
>>          }
>>          if (qemu_file_rate_limit(s->to_dst_file)) {
>>              /* usleep expects microseconds */
>> diff --git a/migration/ram.c b/migration/ram.c
>> index b55b243..c78b286 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -468,25 +468,21 @@ static void *multifd_send_thread(void *opaque)
>>              break;
>>          }
>>          if (p->pages.num) {
>> -            int i;
>>              int num;
>>  
>>              num = p->pages.num;
>>              p->pages.num = 0;
>>              qemu_mutex_unlock(&p->mutex);
>>  
>> -            for (i = 0; i < num; i++) {
>> -                if (qio_channel_write(p->c,
>> -                                      (const char *)&p->pages.iov[i].iov_base,
>> -                                      sizeof(uint8_t *), &error_abort)
>> -                    != sizeof(uint8_t *)) {
>> -                    MigrationState *s = migrate_get_current();
>> +            if (qio_channel_writev_all(p->c, p->pages.iov,
>> +                                       num, &error_abort)
>> +                != num * TARGET_PAGE_SIZE) {
>> +                MigrationState *s = migrate_get_current();
>
> Same comments as the previous patch; we should find a way to get the
> error message logged.  That's not easy since we're in a thread, but we
> need to log these errors somehow.

I am open to suggestions on how to set errors from a different thread.

>> @@ -1262,8 +1240,10 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
>>                               offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>>          fd_num = multifd_send_page(p, rs->migration_dirty_pages == 1);
>>          qemu_put_be16(rs->f, fd_num);
>> +        if (fd_num != UINT16_MAX) {
>> +            qemu_fflush(rs->f);
>> +        }
>
> Is that to make sure that the relatively small messages actually get
> transmitted on the main fd so that the destination starts receiving
> them?

Yeap.

> I do have a worry there: since the addresses are going down a single
> fd, we are open to deadlock if the send threads fill up their buffers
> and block waiting for the receivers to drain them.

I think we are doing the intelligent thing here.
We only sync when we are sure that the package has finished, so we
should be OK.  When we finish the migration, we call fflush anyway in
other places, so we can't get stuck as far as I can see.
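
To spell out the ordering, this is the code from the patch, annotated:

    fd_num = multifd_send_page(p, rs->migration_dirty_pages == 1);
    qemu_put_be16(rs->f, fd_num);    /* tell the destination which channel */
    if (fd_num != UINT16_MAX) {
        /* a real channel was used: flush the 2-byte notice right away so
         * the receiver can start draining that channel */
        qemu_fflush(rs->f);
    }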

Later, Juan.
Dr. David Alan Gilbert Aug. 8, 2017, 11:32 a.m. UTC | #3
* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> We switch from sending the page number to sending the real pages.
> >> 
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> >> 
> >> --
> >> 
> >> Remove the HACK bit; we now have an exported function that calculates
> >> the size of a page.
> >> ---
> >>  migration/migration.c | 14 ++++++++----
> >>  migration/ram.c       | 59 +++++++++++++++++----------------------------------
> >>  2 files changed, 29 insertions(+), 44 deletions(-)
> >> 
> >> diff --git a/migration/migration.c b/migration/migration.c
> >> index e122684..34a34b7 100644
> >> --- a/migration/migration.c
> >> +++ b/migration/migration.c
> >> @@ -1882,13 +1882,14 @@ static void *migration_thread(void *opaque)
> >>      /* Used by the bandwidth calcs, updated later */
> >>      int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
> >>      int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
> >> -    int64_t initial_bytes = 0;
> >>      /*
> >>       * The final stage happens when the remaining data is smaller than
> >>       * this threshold; it's calculated from the requested downtime and
> >>       * measured bandwidth
> >>       */
> >>      int64_t threshold_size = 0;
> >> +    int64_t qemu_file_bytes = 0;
> >> +    int64_t multifd_pages = 0;
> >
> > It feels like these changes to the transfer count should be in a
> > separate patch.
> 
> Until this patch, we only sent the page address, for testing purposes;
> we can change it in the previous patch.  I can split out the
> qemu_file_bytes change, though.
> 
> >>      int64_t start_time = initial_time;
> >>      int64_t end_time;
> >>      bool old_vm_running = false;
> >> @@ -1976,9 +1977,13 @@ static void *migration_thread(void *opaque)
> >>          }
> >>          current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
> >>          if (current_time >= initial_time + BUFFER_DELAY) {
> >> -            uint64_t transferred_bytes = qemu_ftell(s->to_dst_file) -
> >> -                                         initial_bytes;
> >>              uint64_t time_spent = current_time - initial_time;
> >> +            uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
> >> +            uint64_t multifd_pages_now = ram_counters.multifd;
> >> +            uint64_t transferred_bytes =
> >> +                (qemu_file_bytes_now - qemu_file_bytes) +
> >> +                (multifd_pages_now - multifd_pages) *
> >> +                qemu_target_page_size();
> >
> > If I've followed this right, then ram_counters.multifd is in the main
> > thread not the individual threads, so we should be OK doing that.
> 
> Yeap.
> 
> >
> >>              double bandwidth = (double)transferred_bytes / time_spent;
> >>              threshold_size = bandwidth * s->parameters.downtime_limit;
> >>  
> >> @@ -1996,7 +2001,8 @@ static void *migration_thread(void *opaque)
> >>  
> >>              qemu_file_reset_rate_limit(s->to_dst_file);
> >>              initial_time = current_time;
> >> -            initial_bytes = qemu_ftell(s->to_dst_file);
> >> +            qemu_file_bytes = qemu_file_bytes_now;
> >> +            multifd_pages = multifd_pages_now;
> >>          }
> >>          if (qemu_file_rate_limit(s->to_dst_file)) {
> >>              /* usleep expects microseconds */
> >> diff --git a/migration/ram.c b/migration/ram.c
> >> index b55b243..c78b286 100644
> >> --- a/migration/ram.c
> >> +++ b/migration/ram.c
> >> @@ -468,25 +468,21 @@ static void *multifd_send_thread(void *opaque)
> >>              break;
> >>          }
> >>          if (p->pages.num) {
> >> -            int i;
> >>              int num;
> >>  
> >>              num = p->pages.num;
> >>              p->pages.num = 0;
> >>              qemu_mutex_unlock(&p->mutex);
> >>  
> >> -            for (i = 0; i < num; i++) {
> >> -                if (qio_channel_write(p->c,
> >> -                                      (const char *)&p->pages.iov[i].iov_base,
> >> -                                      sizeof(uint8_t *), &error_abort)
> >> -                    != sizeof(uint8_t *)) {
> >> -                    MigrationState *s = migrate_get_current();
> >> +            if (qio_channel_writev_all(p->c, p->pages.iov,
> >> +                                       num, &error_abort)
> >> +                != num * TARGET_PAGE_SIZE) {
> >> +                MigrationState *s = migrate_get_current();
> >
> > Same comments as the previous patch; we should find a way to get the
> > error message logged.  That's not easy since we're in a thread, but we
> > need to log these errors somehow.
> 
> I am open to suggestions on how to set errors from a different thread.

The thread function can 'return' a value; could that be an error
pointer consumed when the thread is joined?
I'd take an fprintf if nothing else (although that's not actually safe),
but not an abort on the source side. Ever.
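
Roughly this shape, as an untested sketch (error_report_err() would log
and then free the Error in the main thread):

    static void *multifd_send_thread(void *opaque)
    {
        MultiFDSendParams *p = opaque;
        Error *local_err = NULL;
        ...
        if (qio_channel_writev_all(p->c, p->pages.iov, num, &local_err)
            != num * TARGET_PAGE_SIZE) {
            terminate_multifd_send_threads();
            return local_err;            /* instead of NULL */
        }
        ...
    }

    /* at join time, in the main thread: */
    Error *err = qemu_thread_join(&p->thread);
    if (err) {
        error_report_err(err);
    }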

> 
> >> @@ -1262,8 +1240,10 @@ static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
> >>                               offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
> >>          fd_num = multifd_send_page(p, rs->migration_dirty_pages == 1);
> >>          qemu_put_be16(rs->f, fd_num);
> >> +        if (fd_num != UINT16_MAX) {
> >> +            qemu_fflush(rs->f);
> >> +        }
> >
> > Is that to make sure that the relatively small messages actually get
> > transmitted on the main fd so that the destination starts receiving
> > them?
> 
> Yeap.
> 
> > I do have a worry there: since the addresses are going down a single
> > fd, we are open to deadlock if the send threads fill up their buffers
> > and block waiting for the receivers to drain them.
> 
> I think we are doing the intelligent thing here.
> We only sync when we are sure that the package has finished, so we
> should be OK.  When we finish the migration, we call fflush anyway in
> other places, so we can't get stuck as far as I can see.

Dave

> Later, Juan.
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

Patch

diff --git a/migration/migration.c b/migration/migration.c
index e122684..34a34b7 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1882,13 +1882,14 @@  static void *migration_thread(void *opaque)
     /* Used by the bandwidth calcs, updated later */
     int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
     int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
-    int64_t initial_bytes = 0;
     /*
      * The final stage happens when the remaining data is smaller than
      * this threshold; it's calculated from the requested downtime and
      * measured bandwidth
      */
     int64_t threshold_size = 0;
+    int64_t qemu_file_bytes = 0;
+    int64_t multifd_pages = 0;
     int64_t start_time = initial_time;
     int64_t end_time;
     bool old_vm_running = false;
@@ -1976,9 +1977,13 @@  static void *migration_thread(void *opaque)
         }
         current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
         if (current_time >= initial_time + BUFFER_DELAY) {
-            uint64_t transferred_bytes = qemu_ftell(s->to_dst_file) -
-                                         initial_bytes;
             uint64_t time_spent = current_time - initial_time;
+            uint64_t qemu_file_bytes_now = qemu_ftell(s->to_dst_file);
+            uint64_t multifd_pages_now = ram_counters.multifd;
+            uint64_t transferred_bytes =
+                (qemu_file_bytes_now - qemu_file_bytes) +
+                (multifd_pages_now - multifd_pages) *
+                qemu_target_page_size();
             double bandwidth = (double)transferred_bytes / time_spent;
             threshold_size = bandwidth * s->parameters.downtime_limit;
 
@@ -1996,7 +2001,8 @@  static void *migration_thread(void *opaque)
 
             qemu_file_reset_rate_limit(s->to_dst_file);
             initial_time = current_time;
-            initial_bytes = qemu_ftell(s->to_dst_file);
+            qemu_file_bytes = qemu_file_bytes_now;
+            multifd_pages = multifd_pages_now;
         }
         if (qemu_file_rate_limit(s->to_dst_file)) {
             /* usleep expects microseconds */
diff --git a/migration/ram.c b/migration/ram.c
index b55b243..c78b286 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -468,25 +468,21 @@  static void *multifd_send_thread(void *opaque)
             break;
         }
         if (p->pages.num) {
-            int i;
             int num;
 
             num = p->pages.num;
             p->pages.num = 0;
             qemu_mutex_unlock(&p->mutex);
 
-            for (i = 0; i < num; i++) {
-                if (qio_channel_write(p->c,
-                                      (const char *)&p->pages.iov[i].iov_base,
-                                      sizeof(uint8_t *), &error_abort)
-                    != sizeof(uint8_t *)) {
-                    MigrationState *s = migrate_get_current();
+            if (qio_channel_writev_all(p->c, p->pages.iov,
+                                       num, &error_abort)
+                != num * TARGET_PAGE_SIZE) {
+                MigrationState *s = migrate_get_current();
 
-                    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
-                                      MIGRATION_STATUS_FAILED);
-                    terminate_multifd_send_threads();
-                    return NULL;
-                }
+                migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
+                                  MIGRATION_STATUS_FAILED);
+                terminate_multifd_send_threads();
+                return NULL;
             }
             qemu_mutex_lock(&multifd_send_state->mutex);
             p->done = true;
@@ -654,7 +650,6 @@  void multifd_load_cleanup(void)
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
-    uint8_t *recv_address;
 
     qemu_sem_post(&p->ready);
     while (true) {
@@ -664,38 +659,21 @@  static void *multifd_recv_thread(void *opaque)
             break;
         }
         if (p->pages.num) {
-            int i;
             int num;
 
             num = p->pages.num;
             p->pages.num = 0;
 
-            for (i = 0; i < num; i++) {
-                if (qio_channel_read(p->c,
-                                     (char *)&recv_address,
-                                     sizeof(uint8_t *), &error_abort)
-                    != sizeof(uint8_t *)) {
-                    MigrationState *s = migrate_get_current();
+            if (qio_channel_readv_all(p->c, p->pages.iov,
+                                      num, &error_abort)
+                != num * TARGET_PAGE_SIZE) {
+                MigrationState *s = migrate_get_current();
 
-                    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
-                                      MIGRATION_STATUS_FAILED);
-                    terminate_multifd_recv_threads();
-                    return NULL;
-                }
-                if (recv_address != p->pages.iov[i].iov_base) {
-                    MigrationState *s = migrate_get_current();
-
-                    printf("We received %p what we were expecting %p (%d)\n",
-                           recv_address,
-                           p->pages.iov[i].iov_base, i);
-
-                    migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
-                                      MIGRATION_STATUS_FAILED);
-                    terminate_multifd_recv_threads();
-                    return NULL;
-                }
+                migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
+                                  MIGRATION_STATUS_FAILED);
+                terminate_multifd_recv_threads();
+                return NULL;
             }
-
             p->done = true;
             qemu_mutex_unlock(&p->mutex);
             qemu_sem_post(&p->ready);
@@ -1262,8 +1240,10 @@  static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
                              offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
         fd_num = multifd_send_page(p, rs->migration_dirty_pages == 1);
         qemu_put_be16(rs->f, fd_num);
+        if (fd_num != UINT16_MAX) {
+            qemu_fflush(rs->f);
+        }
         ram_counters.transferred += 2; /* size of fd_num */
-        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
         ram_counters.transferred += TARGET_PAGE_SIZE;
         pages = 1;
         ram_counters.normal++;
@@ -3126,7 +3106,6 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
             fd_num = qemu_get_be16(f);
             multifd_recv_page(host, fd_num);
-            qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
 
         case RAM_SAVE_FLAG_EOS: