diff mbox

[RFC,v4,4/5] separate thread for VM migration

Message ID d11a5c990004ea81b59fb4984e18358223e674b8.1313552764.git.udeshpan@redhat.com
State New
Headers show

Commit Message

Umesh Deshpande Aug. 17, 2011, 3:56 a.m. UTC
This patch creates a separate thread for the guest migration on the source side.
migrate_cancel request from the iothread is handled asynchronously. That is,
iothread submits migrate_cancel to the migration thread and returns, while the
migration thread attends this request at the next iteration to terminate its
execution.

Signed-off-by: Umesh Deshpande <udeshpan@redhat.com>
---
 buffered_file.c |   93 ++++++++++++++++++++++++++++++++++--------------------
 migration.c     |   77 +++++++++++++++++++++++----------------------
 migration.h     |    1 +
 savevm.c        |    5 ---
 4 files changed, 99 insertions(+), 77 deletions(-)

Comments

Paolo Bonzini Aug. 17, 2011, 7:13 a.m. UTC | #1
On 08/16/2011 08:56 PM, Umesh Deshpande wrote:
> +            qemu_mutex_lock_ramlist();

Taken locks: iothread, ramlist

> +            qemu_mutex_unlock_iothread();

Taken locks: ramlist

> +            s->wait_for_unfreeze(s);
> +            qemu_mutex_lock_iothread();

Taken locks: ramlist, iothread

You'd have a deadlock here if hypothetically you had two migrations at 
the same time.

> +            qemu_mutex_unlock_ramlist();

But in general, why this locking?  The buffered file need not know 
anything about the ram list and its mutex.  Only ram_save_live needs to 
hold the ramlist lock---starting just before sort_ram_list and ending 
just after the end of stage 3.  That should be part of patch 2.

Actually buffered_file.c should ideally not even take the iothread lock! 
  The code there is only copying data from a private buffer to a file 
descriptor; neither is shared.  It's migrate_fd_put_buffer that should 
take care of locking.  This is an example of why keeping the separation 
of QEMUBufferedFile is a good idea at least for now.

I am still kind of unconvinced about calling qemu_fclose from the 
migration thread.  You still have one instance of cancellation in the 
iothread when migrate_fd_release is called.  Ideally, as soon as 
migration finishes or has an error you could trigger a bottom half that 
closes the file (which in turn joins the thread).  Migration state 
notifiers should also be run only from the iothread.  Failure to do so 
(or in general lack of a policy of what runs where) can lead to very 
difficult bugs.  Not so much hard to debug in this case (we have a 
global lock, so things cannot go _that_ bad), but hard to fix without 
redoing everything.

However, this patch is a good start (with locking fixed).  It should 
takes several incremental steps before getting there, including 
incredible simplification if you take into account that migration can 
block and wait_for_unfreeze can disappear.  In the end it probably 
should be committed as a single patch, but I'm liking the patches more 
and more.

Paolo
diff mbox

Patch

diff --git a/buffered_file.c b/buffered_file.c
index 41b42c3..bdcdf42 100644
--- a/buffered_file.c
+++ b/buffered_file.c
@@ -16,6 +16,8 @@ 
 #include "qemu-timer.h"
 #include "qemu-char.h"
 #include "buffered_file.h"
+#include "migration.h"
+#include "qemu-thread.h"
 
 //#define DEBUG_BUFFERED_FILE
 
@@ -28,13 +30,14 @@  typedef struct QEMUFileBuffered
     void *opaque;
     QEMUFile *file;
     int has_error;
+    int closed;
     int freeze_output;
     size_t bytes_xfer;
     size_t xfer_limit;
     uint8_t *buffer;
     size_t buffer_size;
     size_t buffer_capacity;
-    QEMUTimer *timer;
+    QemuThread thread;
 } QEMUFileBuffered;
 
 #ifdef DEBUG_BUFFERED_FILE
@@ -155,14 +158,6 @@  static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in
         offset = size;
     }
 
-    if (pos == 0 && size == 0) {
-        DPRINTF("file is ready\n");
-        if (s->bytes_xfer <= s->xfer_limit) {
-            DPRINTF("notifying client\n");
-            s->put_ready(s->opaque);
-        }
-    }
-
     return offset;
 }
 
@@ -175,20 +170,20 @@  static int buffered_close(void *opaque)
 
     while (!s->has_error && s->buffer_size) {
         buffered_flush(s);
-        if (s->freeze_output)
+        if (s->freeze_output) {
             s->wait_for_unfreeze(s);
+        }
     }
 
-    ret = s->close(s->opaque);
+    s->closed = 1;
 
-    qemu_del_timer(s->timer);
-    qemu_free_timer(s->timer);
+    ret = s->close(s->opaque);
     qemu_free(s->buffer);
-    qemu_free(s);
 
     return ret;
 }
 
+
 static int buffered_rate_limit(void *opaque)
 {
     QEMUFileBuffered *s = opaque;
@@ -228,34 +223,63 @@  static int64_t buffered_get_rate_limit(void *opaque)
     return s->xfer_limit;
 }
 
-static void buffered_rate_tick(void *opaque)
+static void *migrate_vm(void *opaque)
 {
     QEMUFileBuffered *s = opaque;
+    int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100;
+    struct timeval tv = { .tv_sec = 0, .tv_usec = 100000};
 
-    if (s->has_error) {
-        buffered_close(s);
-        return;
-    }
+    qemu_mutex_lock_iothread();
 
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    while (!s->closed) {
+        if (s->freeze_output) {
+            qemu_mutex_lock_ramlist();
+            qemu_mutex_unlock_iothread();
+            s->wait_for_unfreeze(s);
+            qemu_mutex_lock_iothread();
+            qemu_mutex_unlock_ramlist();
+            s->freeze_output = 0;
+            continue;
+        }
 
-    if (s->freeze_output)
-        return;
+        if (s->has_error) {
+            break;
+        }
+
+        current_time = qemu_get_clock_ms(rt_clock);
+        if (!s->closed && (expire_time > current_time)) {
+            tv.tv_usec = 1000 * (expire_time - current_time);
+            qemu_mutex_lock_ramlist();
+            qemu_mutex_unlock_iothread();
+            select(0, NULL, NULL, NULL, &tv);
+            qemu_mutex_lock_iothread();
+            qemu_mutex_unlock_ramlist();
+            continue;
+        }
 
-    s->bytes_xfer = 0;
+        s->bytes_xfer = 0;
+        buffered_flush(s);
 
-    buffered_flush(s);
+        expire_time = qemu_get_clock_ms(rt_clock) + 100;
+        s->put_ready(s->opaque);
+    }
 
-    /* Add some checks around this */
-    s->put_ready(s->opaque);
+    if (s->has_error) {
+        buffered_close(s);
+    }
+    qemu_free(s);
+
+    qemu_mutex_unlock_iothread();
+
+    return NULL;
 }
 
 QEMUFile *qemu_fopen_ops_buffered(void *opaque,
-                                  size_t bytes_per_sec,
-                                  BufferedPutFunc *put_buffer,
-                                  BufferedPutReadyFunc *put_ready,
-                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
-                                  BufferedCloseFunc *close)
+        size_t bytes_per_sec,
+        BufferedPutFunc *put_buffer,
+        BufferedPutReadyFunc *put_ready,
+        BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
+        BufferedCloseFunc *close)
 {
     QEMUFileBuffered *s;
 
@@ -267,15 +291,14 @@  QEMUFile *qemu_fopen_ops_buffered(void *opaque,
     s->put_ready = put_ready;
     s->wait_for_unfreeze = wait_for_unfreeze;
     s->close = close;
+    s->closed = 0;
 
     s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
                              buffered_close, buffered_rate_limit,
                              buffered_set_rate_limit,
-			     buffered_get_rate_limit);
-
-    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
+                             buffered_get_rate_limit);
 
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    qemu_thread_create(&s->thread, migrate_vm, s);
 
     return s->file;
 }
diff --git a/migration.c b/migration.c
index af3a1f2..b6ba690 100644
--- a/migration.c
+++ b/migration.c
@@ -34,6 +34,8 @@ 
 /* Migration speed throttling */
 static int64_t max_throttle = (32 << 20);
 
+static int change_speed;
+
 static MigrationState *current_migration;
 
 static NotifierList migration_state_notifiers =
@@ -141,18 +143,13 @@  int do_migrate_cancel(Monitor *mon, const QDict *qdict, QObject **ret_data)
 int do_migrate_set_speed(Monitor *mon, const QDict *qdict, QObject **ret_data)
 {
     int64_t d;
-    FdMigrationState *s;
 
     d = qdict_get_int(qdict, "value");
     if (d < 0) {
         d = 0;
     }
     max_throttle = d;
-
-    s = migrate_to_fms(current_migration);
-    if (s && s->file) {
-        qemu_file_set_rate_limit(s->file, max_throttle);
-    }
+    change_speed = 1;
 
     return 0;
 }
@@ -284,8 +281,6 @@  int migrate_fd_cleanup(FdMigrationState *s)
 {
     int ret = 0;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
     if (s->file) {
         DPRINTF("closing file\n");
         if (qemu_fclose(s->file) != 0) {
@@ -307,14 +302,6 @@  int migrate_fd_cleanup(FdMigrationState *s)
     return ret;
 }
 
-void migrate_fd_put_notify(void *opaque)
-{
-    FdMigrationState *s = opaque;
-
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-    qemu_file_put_notify(s->file);
-}
-
 ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
 {
     FdMigrationState *s = opaque;
@@ -327,9 +314,7 @@  ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
     if (ret == -1)
         ret = -(s->get_error(s));
 
-    if (ret == -EAGAIN) {
-        qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
-    } else if (ret < 0) {
+    if (ret < 0 && ret != -EAGAIN) {
         if (s->mon) {
             monitor_resume(s->mon);
         }
@@ -340,36 +325,59 @@  ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
     return ret;
 }
 
-void migrate_fd_connect(FdMigrationState *s)
+static void migrate_fd_set_speed(void)
 {
-    int ret;
+    FdMigrationState *s = migrate_to_fms(current_migration);
+    if (s && s->file && change_speed) {
+        qemu_file_set_rate_limit(s->file, max_throttle);
+        change_speed = 0;
+    }
+}
+
+static void migrate_fd_terminate(FdMigrationState *s)
+{
+    notifier_list_notify(&migration_state_notifiers);
+    qemu_savevm_state_cancel(s->mon, s->file);
 
+    migrate_fd_cleanup(s);
+}
+
+void migrate_fd_connect(FdMigrationState *s)
+{
+    s->begin = 1;
     s->file = qemu_fopen_ops_buffered(s,
                                       s->bandwidth_limit,
                                       migrate_fd_put_buffer,
                                       migrate_fd_put_ready,
                                       migrate_fd_wait_for_unfreeze,
                                       migrate_fd_close);
-
-    DPRINTF("beginning savevm\n");
-    ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
-                                  s->mig_state.shared);
-    if (ret < 0) {
-        DPRINTF("failed, %d\n", ret);
-        migrate_fd_error(s);
-        return;
-    }
-    
-    migrate_fd_put_ready(s);
 }
 
 void migrate_fd_put_ready(void *opaque)
 {
     FdMigrationState *s = opaque;
+    int ret;
 
     if (s->state != MIG_STATE_ACTIVE) {
         DPRINTF("put_ready returning because of non-active state\n");
+        if (s->state == MIG_STATE_CANCELLED) {
+            migrate_fd_terminate(s);
+        }
         return;
+    } else {
+        migrate_fd_set_speed();
+    }
+
+    if (s->begin) {
+        DPRINTF("beginning savevm\n");
+        ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
+                s->mig_state.shared);
+        if (ret < 0) {
+            DPRINTF("failed, %d\n", ret);
+            migrate_fd_error(s);
+            return;
+        }
+        s->begin = 0;
     }
 
     DPRINTF("iterate\n");
@@ -415,10 +423,6 @@  void migrate_fd_cancel(MigrationState *mig_state)
     DPRINTF("cancelling migration\n");
 
     s->state = MIG_STATE_CANCELLED;
-    notifier_list_notify(&migration_state_notifiers);
-    qemu_savevm_state_cancel(s->mon, s->file);
-
-    migrate_fd_cleanup(s);
 }
 
 void migrate_fd_release(MigrationState *mig_state)
@@ -458,7 +462,6 @@  int migrate_fd_close(void *opaque)
 {
     FdMigrationState *s = opaque;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
     return s->close(s);
 }
 
diff --git a/migration.h b/migration.h
index 050c56c..4f50df8 100644
--- a/migration.h
+++ b/migration.h
@@ -45,6 +45,7 @@  struct FdMigrationState
     int fd;
     Monitor *mon;
     int state;
+    int begin;
     int (*get_error)(struct FdMigrationState*);
     int (*close)(struct FdMigrationState*);
     int (*write)(struct FdMigrationState*, const void *, size_t);
diff --git a/savevm.c b/savevm.c
index 8139bc7..f54f555 100644
--- a/savevm.c
+++ b/savevm.c
@@ -481,11 +481,6 @@  int qemu_fclose(QEMUFile *f)
     return ret;
 }
 
-void qemu_file_put_notify(QEMUFile *f)
-{
-    f->put_buffer(f->opaque, NULL, 0, 0);
-}
-
 void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size)
 {
     int l;