Patchwork [RFC,2/2] separate thread for VM migration

login
register
mail settings
Submitter Umesh Deshpande
Date July 22, 2011, 7:58 p.m.
Message ID <6fef8cdde0cfabfd664b00c27d39613ba4e75737.1311363171.git.udeshpan@redhat.com>
Download mbox | patch
Permalink /patch/106388/
State New
Headers show

Comments

Umesh Deshpande - July 22, 2011, 7:58 p.m.
From: Umesh Deshpande <udeshpan@redhat.com>

This patch creates a separate thread for the guest migration on the source side.
The migration routine is called from the migration clock.

Signed-off-by: Umesh Deshpande <udeshpan@redhat.com>
---
 arch_init.c      |   14 ++++++++++--
 buffered_file.c  |   12 ++++++----
 exec.c           |    4 +++
 migration-tcp.c  |   18 ++++++++--------
 migration-unix.c |    7 ++---
 migration.c      |   59 ++++++++++++++++++++++++++++++-----------------------
 migration.h      |    4 +-
 7 files changed, 69 insertions(+), 49 deletions(-)
Paolo Bonzini - July 26, 2011, 11:13 a.m.
On 07/22/2011 09:58 PM, Umesh Deshapnde wrote:
> -    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
> +    qemu_mod_timer(s->timer, qemu_get_clock_ms(migration_clock) + 100);
>
>       if (s->freeze_output)
>           return;
> @@ -246,8 +246,10 @@ static void buffered_rate_tick(void *opaque)
>
>       buffered_flush(s);
>
> -    /* Add some checks around this */
>       s->put_ready(s->opaque);
> +    qemu_mutex_unlock_iothread();
> +    usleep(qemu_timer_difference(s->timer, migration_clock) * 1000);
> +    qemu_mutex_lock_iothread();
>   }

Do you really need a timer?  The timer will only run in the migration 
thread, where you should have full control on when to wait.

The BufferedFile code is more general, but you can create one thread per 
BufferedFile (you will only have one).  Then, since you only have one 
timer, calling buffered_rate_tick repeatedly is really all that is done 
in the body of the thread:

while (s->state == MIG_STATE_ACTIVE)
     start_time = qemu_get_clock_ms(rt_clock);
     buffered_rate_tick (s->file);

     qemu_mutex_unlock_iothread();
     usleep ((qemu_get_clock_ms(rt_clock) + 100 - start_time) * 1000);
     qemu_mutex_lock_iothread();
}


Perhaps the whole QEMUFile abstraction should be abandoned as the basis 
of QEMUBufferedFile.  It is simply too heavy when you're working in a 
separate thread and you can afford blocking operation.

I also am not sure about the correctness.  Here you removed the delayed 
call to migrate_fd_put_notify, which is what resets freeze_output to 0:

> @@ -327,9 +320,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
>       if (ret == -1)
>           ret = -(s->get_error(s));
>
> -    if (ret == -EAGAIN) {
> -        qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
> -    } else if (ret<  0) {
> +    if (ret<  0&&  ret != -EAGAIN) {
>           if (s->mon) {
>               monitor_resume(s->mon);
>           }

The call to migrate_fd_put_notify is here:

> @@ -396,6 +401,9 @@ void migrate_fd_put_ready(void *opaque)
>          }
>          s->state = state;
>          notifier_list_notify(&migration_state_notifiers);
> +    } else {
> +        migrate_fd_wait_for_unfreeze(s);
> +        qemu_file_put_notify(s->file);
>      }
>  }
>

But it is not clear to me what calls migrate_fd_put_ready when the 
output file is frozen.

Finally, here you are busy waiting:

> @@ -340,10 +331,34 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
>       return ret;
>   }
>
> -void migrate_fd_connect(FdMigrationState *s)
> +void *migrate_run_timers(void *arg)
>   {
> +    FdMigrationState *s = arg;
>       int ret;
>
> +    qemu_mutex_lock_iothread();
> +    ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
> +            s->mig_state.shared);
> +    if (ret<  0) {
> +        DPRINTF("failed, %d\n", ret);
> +        migrate_fd_error(s);
> +        return NULL;
> +    }
> +
> +    migrate_fd_put_ready(s);
> +
> +    while (s->state == MIG_STATE_ACTIVE) {
> +        qemu_run_timers(migration_clock);
> +    }

which also convinces me that if you make rate limiting the main purpose 
of the migration thread's main loop, most problems will go away.  You 
can also use select() instead of usleep, so that you fix the problem 
with qemu_file_put_notify.

Paolo

Patch

diff --git a/arch_init.c b/arch_init.c
index 484b39d..cd545bc 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -110,7 +110,7 @@  static int is_dup_page(uint8_t *page, uint8_t ch)
 static RAMBlock *last_block;
 static ram_addr_t last_offset;
 
-static int ram_save_block(QEMUFile *f)
+static int ram_save_block(QEMUFile *f, int stage)
 {
     RAMBlock *block = last_block;
     ram_addr_t offset = last_offset;
@@ -131,6 +131,10 @@  static int ram_save_block(QEMUFile *f)
                                             current_addr + TARGET_PAGE_SIZE,
                                             MIGRATION_DIRTY_FLAG);
 
+            if (stage != 3) {
+                qemu_mutex_unlock_iothread();
+            }
+
             p = block->host + offset;
 
             if (is_dup_page(p, *p)) {
@@ -153,6 +157,10 @@  static int ram_save_block(QEMUFile *f)
                 bytes_sent = TARGET_PAGE_SIZE;
             }
 
+            if (stage != 3) {
+                qemu_mutex_lock_iothread();
+            }
+
             break;
         }
 
@@ -301,7 +309,7 @@  int ram_save_live(Monitor *mon, QEMUFile *f, int stage, void *opaque)
     while (!qemu_file_rate_limit(f)) {
         int bytes_sent;
 
-        bytes_sent = ram_save_block(f);
+        bytes_sent = ram_save_block(f, stage);
         bytes_transferred += bytes_sent;
         if (bytes_sent == 0) { /* no more blocks */
             break;
@@ -322,7 +330,7 @@  int ram_save_live(Monitor *mon, QEMUFile *f, int stage, void *opaque)
         int bytes_sent;
 
         /* flush all remaining blocks regardless of rate limiting */
-        while ((bytes_sent = ram_save_block(f)) != 0) {
+        while ((bytes_sent = ram_save_block(f, stage)) != 0) {
             bytes_transferred += bytes_sent;
         }
         cpu_physical_memory_set_dirty_tracking(0);
diff --git a/buffered_file.c b/buffered_file.c
index 41b42c3..73e9666 100644
--- a/buffered_file.c
+++ b/buffered_file.c
@@ -237,7 +237,7 @@  static void buffered_rate_tick(void *opaque)
         return;
     }
 
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    qemu_mod_timer(s->timer, qemu_get_clock_ms(migration_clock) + 100);
 
     if (s->freeze_output)
         return;
@@ -246,8 +246,10 @@  static void buffered_rate_tick(void *opaque)
 
     buffered_flush(s);
 
-    /* Add some checks around this */
     s->put_ready(s->opaque);
+    qemu_mutex_unlock_iothread();
+    usleep(qemu_timer_difference(s->timer, migration_clock) * 1000);
+    qemu_mutex_lock_iothread();
 }
 
 QEMUFile *qemu_fopen_ops_buffered(void *opaque,
@@ -271,11 +273,11 @@  QEMUFile *qemu_fopen_ops_buffered(void *opaque,
     s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
                              buffered_close, buffered_rate_limit,
                              buffered_set_rate_limit,
-			     buffered_get_rate_limit);
+                             buffered_get_rate_limit);
 
-    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
+    s->timer = qemu_new_timer_ms(migration_clock, buffered_rate_tick, s);
 
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    qemu_mod_timer(s->timer, qemu_get_clock_ms(migration_clock) + 100);
 
     return s->file;
 }
diff --git a/exec.c b/exec.c
index 0e2ce57..97f60c6 100644
--- a/exec.c
+++ b/exec.c
@@ -2106,6 +2106,10 @@  void cpu_physical_memory_reset_dirty(ram_addr_t start, ram_addr_t end,
         abort();
     }
 
+    if (kvm_enabled()) {
+        return;
+    }
+
     for(env = first_cpu; env != NULL; env = env->next_cpu) {
         int mmu_idx;
         for (mmu_idx = 0; mmu_idx < NB_MMU_MODES; mmu_idx++) {
diff --git a/migration-tcp.c b/migration-tcp.c
index d3d80c9..5840668 100644
--- a/migration-tcp.c
+++ b/migration-tcp.c
@@ -65,11 +65,9 @@  static void tcp_wait_for_connect(void *opaque)
         return;
     }
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
-    if (val == 0)
+    if (val == 0) {
         migrate_fd_connect(s);
-    else {
+    } else {
         DPRINTF("error connecting %d\n", val);
         migrate_fd_error(s);
     }
@@ -79,8 +77,8 @@  MigrationState *tcp_start_outgoing_migration(Monitor *mon,
                                              const char *host_port,
                                              int64_t bandwidth_limit,
                                              int detach,
-					     int blk,
-					     int inc)
+                                             int blk,
+                                             int inc)
 {
     struct sockaddr_in addr;
     FdMigrationState *s;
@@ -121,15 +119,17 @@  MigrationState *tcp_start_outgoing_migration(Monitor *mon,
         if (ret == -1)
             ret = -(s->get_error(s));
 
-        if (ret == -EINPROGRESS || ret == -EWOULDBLOCK)
-            qemu_set_fd_handler2(s->fd, NULL, NULL, tcp_wait_for_connect, s);
     } while (ret == -EINTR);
 
     if (ret < 0 && ret != -EINPROGRESS && ret != -EWOULDBLOCK) {
         DPRINTF("connect failed\n");
         migrate_fd_error(s);
-    } else if (ret >= 0)
+    } else if (ret >= 0) {
         migrate_fd_connect(s);
+    } else {
+        migrate_fd_wait_for_unfreeze(s);
+        tcp_wait_for_connect(s);
+    }
 
     return &s->mig_state;
 }
diff --git a/migration-unix.c b/migration-unix.c
index c8625c7..e0a8f6f 100644
--- a/migration-unix.c
+++ b/migration-unix.c
@@ -64,8 +64,6 @@  static void unix_wait_for_connect(void *opaque)
         return;
     }
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
     if (val == 0)
         migrate_fd_connect(s);
     else {
@@ -116,13 +114,14 @@  MigrationState *unix_start_outgoing_migration(Monitor *mon,
         if (ret == -1)
 	    ret = -(s->get_error(s));
 
-        if (ret == -EINPROGRESS || ret == -EWOULDBLOCK)
-	    qemu_set_fd_handler2(s->fd, NULL, NULL, unix_wait_for_connect, s);
     } while (ret == -EINTR);
 
     if (ret < 0 && ret != -EINPROGRESS && ret != -EWOULDBLOCK) {
         DPRINTF("connect failed\n");
         goto err_after_open;
+    } else if (ret == -EINPROGRESS || ret == -EWOULDBLOCK) {
+        migrate_fd_wait_for_unfreeze(s);
+        unix_wait_for_connect(s);
     }
 
     if (!detach) {
diff --git a/migration.c b/migration.c
index af3a1f2..4649691 100644
--- a/migration.c
+++ b/migration.c
@@ -12,6 +12,8 @@ 
  */
 
 #include "qemu-common.h"
+#include "qemu-thread.h"
+#include "qemu-timer.h"
 #include "migration.h"
 #include "monitor.h"
 #include "buffered_file.h"
@@ -35,6 +37,7 @@ 
 static int64_t max_throttle = (32 << 20);
 
 static MigrationState *current_migration;
+char host_port[50];
 
 static NotifierList migration_state_notifiers =
     NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
@@ -284,8 +287,6 @@  int migrate_fd_cleanup(FdMigrationState *s)
 {
     int ret = 0;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
     if (s->file) {
         DPRINTF("closing file\n");
         if (qemu_fclose(s->file) != 0) {
@@ -307,14 +308,6 @@  int migrate_fd_cleanup(FdMigrationState *s)
     return ret;
 }
 
-void migrate_fd_put_notify(void *opaque)
-{
-    FdMigrationState *s = opaque;
-
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-    qemu_file_put_notify(s->file);
-}
-
 ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
 {
     FdMigrationState *s = opaque;
@@ -327,9 +320,7 @@  ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
     if (ret == -1)
         ret = -(s->get_error(s));
 
-    if (ret == -EAGAIN) {
-        qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
-    } else if (ret < 0) {
+    if (ret < 0 && ret != -EAGAIN) {
         if (s->mon) {
             monitor_resume(s->mon);
         }
@@ -340,10 +331,34 @@  ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
     return ret;
 }
 
-void migrate_fd_connect(FdMigrationState *s)
+void *migrate_run_timers(void *arg)
 {
+    FdMigrationState *s = arg;
     int ret;
 
+    qemu_mutex_lock_iothread();
+    ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
+            s->mig_state.shared);
+    if (ret < 0) {
+        DPRINTF("failed, %d\n", ret);
+        migrate_fd_error(s);
+        return NULL;
+    }
+
+    migrate_fd_put_ready(s);
+
+    while (s->state == MIG_STATE_ACTIVE) {
+        qemu_run_timers(migration_clock);
+    }
+
+    qemu_mutex_unlock_iothread();
+
+    return NULL;
+}
+
+void migrate_fd_connect(FdMigrationState *s)
+{
+    struct QemuThread migrate_thread;
     s->file = qemu_fopen_ops_buffered(s,
                                       s->bandwidth_limit,
                                       migrate_fd_put_buffer,
@@ -352,15 +367,7 @@  void migrate_fd_connect(FdMigrationState *s)
                                       migrate_fd_close);
 
     DPRINTF("beginning savevm\n");
-    ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
-                                  s->mig_state.shared);
-    if (ret < 0) {
-        DPRINTF("failed, %d\n", ret);
-        migrate_fd_error(s);
-        return;
-    }
-    
-    migrate_fd_put_ready(s);
+    qemu_thread_create(&migrate_thread, migrate_run_timers, s);
 }
 
 void migrate_fd_put_ready(void *opaque)
@@ -376,8 +383,6 @@  void migrate_fd_put_ready(void *opaque)
     if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
         int state;
         int old_vm_running = vm_running;
-
-        DPRINTF("done iterating\n");
         vm_stop(VMSTOP_MIGRATE);
 
         if ((qemu_savevm_state_complete(s->mon, s->file)) < 0) {
@@ -396,6 +401,9 @@  void migrate_fd_put_ready(void *opaque)
         }
         s->state = state;
         notifier_list_notify(&migration_state_notifiers);
+    } else {
+        migrate_fd_wait_for_unfreeze(s);
+        qemu_file_put_notify(s->file);
     }
 }
 
@@ -458,7 +466,6 @@  int migrate_fd_close(void *opaque)
 {
     FdMigrationState *s = opaque;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
     return s->close(s);
 }
 
diff --git a/migration.h b/migration.h
index 050c56c..8d8b7c9 100644
--- a/migration.h
+++ b/migration.h
@@ -72,6 +72,8 @@  void do_info_migrate(Monitor *mon, QObject **ret_data);
 
 int exec_start_incoming_migration(const char *host_port);
 
+void *migrate_run_timers(void *);
+
 MigrationState *exec_start_outgoing_migration(Monitor *mon,
                                               const char *host_port,
 					      int64_t bandwidth_limit,
@@ -112,8 +114,6 @@  void migrate_fd_error(FdMigrationState *s);
 
 int migrate_fd_cleanup(FdMigrationState *s);
 
-void migrate_fd_put_notify(void *opaque);
-
 ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
 
 void migrate_fd_connect(FdMigrationState *s);