diff mbox series

[v12,19/21] migration: Wait for blocking IO

Message ID 20180425112723.1111-20-quintela@redhat.com
State New
Headers show
Series Multifd | expand

Commit Message

Juan Quintela April 25, 2018, 11:27 a.m. UTC
We have three conditions here:
- channel fails -> error
- we have to quit: we close the channel and reads fails
- normal read that success, we are in bussiness

So forget the complications of waiting in a semaphore.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 81 ++++++++++++++++++-------------------------------
 1 file changed, 29 insertions(+), 52 deletions(-)

Comments

Dr. David Alan Gilbert May 3, 2018, 3:04 p.m. UTC | #1
* Juan Quintela (quintela@redhat.com) wrote:
> We have three conditions here:
> - channel fails -> error
> - we have to quit: we close the channel and reads fails
> - normal read that success, we are in bussiness
> 
> So forget the complications of waiting in a semaphore.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

I *think* this is OK; but I'd prefer to see most of this folded into
earlier patches where I think they'd be cleaner rather than putting
stuff in and shuffling it around.

Dave

> ---
>  migration/ram.c | 81 ++++++++++++++++++-------------------------------
>  1 file changed, 29 insertions(+), 52 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 9adbaa81f9..2734f91ded 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -496,8 +496,6 @@ typedef struct {
>      bool running;
>      /* should this thread finish */
>      bool quit;
> -    /* thread has work to do */
> -    bool pending_job;
>      /* array of pages to receive */
>      MultiFDPages_t *pages;
>      /* packet allocated len */
> @@ -1056,14 +1054,6 @@ static void multifd_recv_sync_main(void)
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
> -        trace_multifd_recv_sync_main_signal(p->id);
> -        qemu_mutex_lock(&p->mutex);
> -        p->pending_job = true;
> -        qemu_mutex_unlock(&p->mutex);
> -    }
> -    for (i = 0; i < migrate_multifd_channels(); i++) {
> -        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -
>          trace_multifd_recv_sync_main_wait(p->id);
>          qemu_sem_wait(&multifd_recv_state->sem_sync);
>          qemu_mutex_lock(&p->mutex);
> @@ -1076,7 +1066,6 @@ static void multifd_recv_sync_main(void)
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          trace_multifd_recv_sync_main_signal(p->id);
> -
>          qemu_sem_post(&p->sem_sync);
>      }
>      trace_multifd_recv_sync_main(multifd_recv_state->seq);
> @@ -1091,51 +1080,40 @@ static void *multifd_recv_thread(void *opaque)
>      trace_multifd_recv_thread_start(p->id);
>  
>      while (true) {
> +        uint32_t used;
> +        uint32_t flags;
> +
> +        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
> +                                       p->packet_len, &local_err);
> +        if (ret == 0) {   /* EOF */
> +            break;
> +        }
> +        if (ret == -1) {   /* Error */
> +            break;
> +        }
> +
>          qemu_mutex_lock(&p->mutex);
> -        if (true || p->pending_job) {
> -            uint32_t used;
> -            uint32_t flags;
> -            qemu_mutex_unlock(&p->mutex);
> -
> -            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
> -                                           p->packet_len, &local_err);
> -            if (ret == 0) {   /* EOF */
> -                break;
> -            }
> -            if (ret == -1) {   /* Error */
> -                break;
> -            }
> -
> -            qemu_mutex_lock(&p->mutex);
> -            ret = multifd_recv_unfill_packet(p, &local_err);
> -            if (ret) {
> -                qemu_mutex_unlock(&p->mutex);
> -                break;
> -            }
> -
> -            used = p->pages->used;
> -            flags = p->flags;
> -            trace_multifd_recv(p->id, p->seq, used, flags);
> -            p->pending_job = false;
> -            p->num_packets++;
> -            p->num_pages += used;
> +        ret = multifd_recv_unfill_packet(p, &local_err);
> +        if (ret) {
>              qemu_mutex_unlock(&p->mutex);
> +            break;
> +        }
>  
> -            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
> -            if (ret != 0) {
> -                break;
> -            }
> +        used = p->pages->used;
> +        flags = p->flags;
> +        trace_multifd_recv(p->id, p->seq, used, flags);
> +        p->num_packets++;
> +        p->num_pages += used;
> +        qemu_mutex_unlock(&p->mutex);
>  
> -            if (flags & MULTIFD_FLAG_SYNC) {
> -                qemu_sem_post(&multifd_recv_state->sem_sync);
> -                qemu_sem_wait(&p->sem_sync);
> -            }
> -        } else if (p->quit) {
> -            qemu_mutex_unlock(&p->mutex);
> +        ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
> +        if (ret != 0) {
>              break;
> -        } else {
> -            qemu_mutex_unlock(&p->mutex);
> -            /* sometimes there are spurious wakeups */
> +        }
> +
> +        if (flags & MULTIFD_FLAG_SYNC) {
> +            qemu_sem_post(&multifd_recv_state->sem_sync);
> +            qemu_sem_wait(&p->sem_sync);
>          }
>      }
>  
> @@ -1173,7 +1151,6 @@ int multifd_load_setup(void)
>          qemu_sem_init(&p->sem, 0);
>          qemu_sem_init(&p->sem_sync, 0);
>          p->quit = false;
> -        p->pending_job = false;
>          p->id = i;
>          multifd_pages_init(&p->pages, page_count);
>          p->packet_len = sizeof(MultiFDPacket_t)
> -- 
> 2.17.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff mbox series

Patch

diff --git a/migration/ram.c b/migration/ram.c
index 9adbaa81f9..2734f91ded 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -496,8 +496,6 @@  typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
-    /* thread has work to do */
-    bool pending_job;
     /* array of pages to receive */
     MultiFDPages_t *pages;
     /* packet allocated len */
@@ -1056,14 +1054,6 @@  static void multifd_recv_sync_main(void)
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
-        trace_multifd_recv_sync_main_signal(p->id);
-        qemu_mutex_lock(&p->mutex);
-        p->pending_job = true;
-        qemu_mutex_unlock(&p->mutex);
-    }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
         trace_multifd_recv_sync_main_wait(p->id);
         qemu_sem_wait(&multifd_recv_state->sem_sync);
         qemu_mutex_lock(&p->mutex);
@@ -1076,7 +1066,6 @@  static void multifd_recv_sync_main(void)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         trace_multifd_recv_sync_main_signal(p->id);
-
         qemu_sem_post(&p->sem_sync);
     }
     trace_multifd_recv_sync_main(multifd_recv_state->seq);
@@ -1091,51 +1080,40 @@  static void *multifd_recv_thread(void *opaque)
     trace_multifd_recv_thread_start(p->id);
 
     while (true) {
+        uint32_t used;
+        uint32_t flags;
+
+        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+                                       p->packet_len, &local_err);
+        if (ret == 0) {   /* EOF */
+            break;
+        }
+        if (ret == -1) {   /* Error */
+            break;
+        }
+
         qemu_mutex_lock(&p->mutex);
-        if (true || p->pending_job) {
-            uint32_t used;
-            uint32_t flags;
-            qemu_mutex_unlock(&p->mutex);
-
-            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
-                                           p->packet_len, &local_err);
-            if (ret == 0) {   /* EOF */
-                break;
-            }
-            if (ret == -1) {   /* Error */
-                break;
-            }
-
-            qemu_mutex_lock(&p->mutex);
-            ret = multifd_recv_unfill_packet(p, &local_err);
-            if (ret) {
-                qemu_mutex_unlock(&p->mutex);
-                break;
-            }
-
-            used = p->pages->used;
-            flags = p->flags;
-            trace_multifd_recv(p->id, p->seq, used, flags);
-            p->pending_job = false;
-            p->num_packets++;
-            p->num_pages += used;
+        ret = multifd_recv_unfill_packet(p, &local_err);
+        if (ret) {
             qemu_mutex_unlock(&p->mutex);
+            break;
+        }
 
-            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
-            if (ret != 0) {
-                break;
-            }
+        used = p->pages->used;
+        flags = p->flags;
+        trace_multifd_recv(p->id, p->seq, used, flags);
+        p->num_packets++;
+        p->num_pages += used;
+        qemu_mutex_unlock(&p->mutex);
 
-            if (flags & MULTIFD_FLAG_SYNC) {
-                qemu_sem_post(&multifd_recv_state->sem_sync);
-                qemu_sem_wait(&p->sem_sync);
-            }
-        } else if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
+        ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
+        if (ret != 0) {
             break;
-        } else {
-            qemu_mutex_unlock(&p->mutex);
-            /* sometimes there are spurious wakeups */
+        }
+
+        if (flags & MULTIFD_FLAG_SYNC) {
+            qemu_sem_post(&multifd_recv_state->sem_sync);
+            qemu_sem_wait(&p->sem_sync);
         }
     }
 
@@ -1173,7 +1151,6 @@  int multifd_load_setup(void)
         qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
-        p->pending_job = false;
         p->id = i;
         multifd_pages_init(&p->pages, page_count);
         p->packet_len = sizeof(MultiFDPacket_t)