diff mbox

[RFC,v2,1/3] separate thread for VM migration

Message ID e5a28f3e3d8ff3c5c08b307ef320051632832718.1311971938.git.udeshpan@redhat.com
State New
Headers show

Commit Message

Umesh Deshpande July 29, 2011, 8:57 p.m. UTC
This patch creates a separate thread for the guest migration on the source side.

Signed-off-by: Umesh Deshpande <udeshpan@redhat.com>
---
 buffered_file.c |   28 ++++++++++++++++---------
 buffered_file.h |    4 +++
 migration.c     |   59 +++++++++++++++++++++++++++++++++++++++++++-----------
 migration.h     |    3 ++
 savevm.c        |   22 +-------------------
 savevm.h        |   29 +++++++++++++++++++++++++++
 6 files changed, 102 insertions(+), 43 deletions(-)
 create mode 100644 savevm.h

Comments

Paolo Bonzini Aug. 1, 2011, 9:37 a.m. UTC | #1
On 07/29/2011 10:57 PM, Umesh Deshpande wrote:
> This patch creates a separate thread for the guest migration on the source side.
>
> Signed-off-by: Umesh Deshpande<udeshpan@redhat.com>

Looks pretty good!

One thing that shows, is that the interface separation between 
buffered_file.c is migration.c is pretty weird.  Your patch makes it 
somewhat worse, but it was like this before so it's not your fault.  The 
good thing is that if buffered_file.c uses threads, you can fix a large 
part of this and get even simpler code:

1) there is really just one way to implement migrate_fd_put_notify, and 
with your simplifications it does not belong anymore in migration.c.

2) s->callback is actually not NULL exactly if s->file->frozen_output is 
true, you can remove it as well;

3) buffered_close is messy because it can be called from both the 
iothread (monitor->migrate_fd_cancel->migrate_fd_cleanup->qemu_fclose) 
or the migration thread (after qemu_savevm_state_complete).  But 
buffered_close is actually very similar to your thread function (it does 
flush+wait_for_unfreeze, basically)!  So buffered_close can be simply:

     s->closed = 1;
     ret = qemu_thread_join(s->thread); /* doesn't exist yet :) */
     qemu_free(...);
     return ret;

Another nit is that here:

> +        if (migrate_fd_check_expire()) {
> +            buffered_rate_tick(s->file);
> +        }
> +
> +        if (s->state != MIG_STATE_ACTIVE) {
> +            break;
> +        }
> +
> +        if (s->callback) {
> +            migrate_fd_wait_for_unfreeze(s);
> +            s->callback(s);
> +        }

you can still have a busy wait.

Putting it all together, you can move the thread function back to 
buffered_file.c like:

     while (!s->closed || (!s->has_error && s->buffer_size)) {
         if (s->freeze_output) {
             qemu_mutex_unlock_iothread();
             s->wait_for_unfreeze(s);
             qemu_mutex_lock_iothread();
             /* This comes from qemu_file_put_notify (via
                buffered_put_buffer---can be simplified a lot too?).
             s->freeze_output = 0;
             /* Test again for cancellation.  */
             continue;
         }

         int64_t current_time = qemu_get_clock_ms(rt_clock);
         if (s->expire_time > current_time) {
             struct timeval tv = { .tv_sec = 0, .tv_usec = ... };
             qemu_mutex_unlock_iothread();
             select (0, NULL, NULL, NULL, &tv);
             qemu_mutex_lock_iothread();
             s->expire_time = qemu_get_clock_ms(rt_clock) + 100;
             continue;
         }

         /* This comes from buffered_rate_tick.  */
         s->bytes_xfer = 0;
         buffered_flush(s);
         if (!s->closed) {
             s->put_ready(s->opaque);
         }
     }

     ret = s->close(s->opaque);
     ...

Does it look sane?

Paolo
Umesh Deshpande Aug. 1, 2011, 9 p.m. UTC | #2
On 08/01/2011 05:37 AM, Paolo Bonzini wrote:
> On 07/29/2011 10:57 PM, Umesh Deshpande wrote:
>> This patch creates a separate thread for the guest migration on the 
>> source side.
>>
>> Signed-off-by: Umesh Deshpande<udeshpan@redhat.com>
>
> Looks pretty good!
>
> One thing that shows, is that the interface separation between 
> buffered_file.c is migration.c is pretty weird.  Your patch makes it 
> somewhat worse, but it was like this before so it's not your fault.  
> The good thing is that if buffered_file.c uses threads, you can fix a 
> large part of this and get even simpler code:
>
> 1) there is really just one way to implement migrate_fd_put_notify, 
> and with your simplifications it does not belong anymore in migration.c.
>
> 2) s->callback is actually not NULL exactly if s->file->frozen_output 
> is true, you can remove it as well;
>
> 3) buffered_close is messy because it can be called from both the 
> iothread (monitor->migrate_fd_cancel->migrate_fd_cleanup->qemu_fclose) 
> or the migration thread (after qemu_savevm_state_complete).  But 
> buffered_close is actually very similar to your thread function (it 
> does flush+wait_for_unfreeze, basically)!  So buffered_close can be 
> simply:
>
>     s->closed = 1;
>     ret = qemu_thread_join(s->thread); /* doesn't exist yet :) */
>     qemu_free(...);
>     return ret;
>
> Another nit is that here:
>
>> +        if (migrate_fd_check_expire()) {
>> +            buffered_rate_tick(s->file);
>> +        }
>> +
>> +        if (s->state != MIG_STATE_ACTIVE) {
>> +            break;
>> +        }
>> +
>> +        if (s->callback) {
>> +            migrate_fd_wait_for_unfreeze(s);
>> +            s->callback(s);
>> +        }
>
> you can still have a busy wait.
>
> Putting it all together, you can move the thread function back to 
> buffered_file.c like:
>
>     while (!s->closed || (!s->has_error && s->buffer_size)) {
>         if (s->freeze_output) {
>             qemu_mutex_unlock_iothread();
>             s->wait_for_unfreeze(s);
>             qemu_mutex_lock_iothread();
>             /* This comes from qemu_file_put_notify (via
>                buffered_put_buffer---can be simplified a lot too?).
>             s->freeze_output = 0;
>             /* Test again for cancellation.  */
>             continue;
>         }
>
>         int64_t current_time = qemu_get_clock_ms(rt_clock);
>         if (s->expire_time > current_time) {
>             struct timeval tv = { .tv_sec = 0, .tv_usec = ... };
>             qemu_mutex_unlock_iothread();
>             select (0, NULL, NULL, NULL, &tv);
>             qemu_mutex_lock_iothread();
>             s->expire_time = qemu_get_clock_ms(rt_clock) + 100;
>             continue;
>         }
>
>         /* This comes from buffered_rate_tick.  */
>         s->bytes_xfer = 0;
>         buffered_flush(s);
>         if (!s->closed) {
>             s->put_ready(s->opaque);
>         }
>     }
>
>     ret = s->close(s->opaque);
>     ...
>
> Does it look sane?
>
I kept this in migration.c to call qemu_savevm_state_begin. (The way it 
is done currently. i.e. to keep access to FdMigrationState in migration.c)
Calling it from buffered_file.c would be inconsistent in that sense. or 
we will have to call it from the iothread before spawning the migration 
thread.

Also why is the separation between FdMigrationState and QEMUFileBuffered 
is required. Is QEMUFileBuffered designed to use also for things other 
than migration?

Thanks
Umesh
>
> Paolo
Paolo Bonzini Aug. 2, 2011, 7:44 a.m. UTC | #3
On 08/01/2011 11:00 PM, Umesh Deshpande wrote:
>>
> I kept this in migration.c to call qemu_savevm_state_begin. (The way it
> is done currently. i.e. to keep access to FdMigrationState in migration.c)
> Calling it from buffered_file.c would be inconsistent in that sense. or
> we will have to call it from the iothread before spawning the migration
> thread.

Right, I missed that.  Perhaps you can call it the first time put_ready 
is called.

> Also why is the separation between FdMigrationState and QEMUFileBuffered
> is required. Is QEMUFileBuffered designed to use also for things other
> than migration?

No, but let's keep it this way for now.  It may be an annoyance, but it 
also helps making a reusable architecture, and it can probably be 
cleaned up substantially with thread support.

Paolo
diff mbox

Patch

diff --git a/buffered_file.c b/buffered_file.c
index 41b42c3..d4146bf 100644
--- a/buffered_file.c
+++ b/buffered_file.c
@@ -16,12 +16,16 @@ 
 #include "qemu-timer.h"
 #include "qemu-char.h"
 #include "buffered_file.h"
+#include "migration.h"
+#include "savevm.h"
+#include "qemu-thread.h"
 
 //#define DEBUG_BUFFERED_FILE
 
 typedef struct QEMUFileBuffered
 {
     BufferedPutFunc *put_buffer;
+    BufferedBeginFunc *begin;
     BufferedPutReadyFunc *put_ready;
     BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
     BufferedCloseFunc *close;
@@ -35,6 +39,7 @@  typedef struct QEMUFileBuffered
     size_t buffer_size;
     size_t buffer_capacity;
     QEMUTimer *timer;
+    QemuThread thread;
 } QEMUFileBuffered;
 
 #ifdef DEBUG_BUFFERED_FILE
@@ -181,8 +186,6 @@  static int buffered_close(void *opaque)
 
     ret = s->close(s->opaque);
 
-    qemu_del_timer(s->timer);
-    qemu_free_timer(s->timer);
     qemu_free(s->buffer);
     qemu_free(s);
 
@@ -228,17 +231,15 @@  static int64_t buffered_get_rate_limit(void *opaque)
     return s->xfer_limit;
 }
 
-static void buffered_rate_tick(void *opaque)
+void buffered_rate_tick(QEMUFile *file)
 {
-    QEMUFileBuffered *s = opaque;
+    QEMUFileBuffered *s = file->opaque;
 
     if (s->has_error) {
         buffered_close(s);
         return;
     }
 
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
-
     if (s->freeze_output)
         return;
 
@@ -250,9 +251,17 @@  static void buffered_rate_tick(void *opaque)
     s->put_ready(s->opaque);
 }
 
+static void *migrate_vm(void *opaque)
+{
+    QEMUFileBuffered *s = opaque;
+    s->begin(s->opaque);
+    return NULL;
+}
+
 QEMUFile *qemu_fopen_ops_buffered(void *opaque,
                                   size_t bytes_per_sec,
                                   BufferedPutFunc *put_buffer,
+                                  BufferedBeginFunc *begin,
                                   BufferedPutReadyFunc *put_ready,
                                   BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
                                   BufferedCloseFunc *close)
@@ -264,6 +273,7 @@  QEMUFile *qemu_fopen_ops_buffered(void *opaque,
     s->opaque = opaque;
     s->xfer_limit = bytes_per_sec / 10;
     s->put_buffer = put_buffer;
+    s->begin = begin;
     s->put_ready = put_ready;
     s->wait_for_unfreeze = wait_for_unfreeze;
     s->close = close;
@@ -271,11 +281,9 @@  QEMUFile *qemu_fopen_ops_buffered(void *opaque,
     s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
                              buffered_close, buffered_rate_limit,
                              buffered_set_rate_limit,
-			     buffered_get_rate_limit);
-
-    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
+                             buffered_get_rate_limit);
 
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    qemu_thread_create(&s->thread, migrate_vm, s);
 
     return s->file;
 }
diff --git a/buffered_file.h b/buffered_file.h
index 98d358b..cfe2833 100644
--- a/buffered_file.h
+++ b/buffered_file.h
@@ -17,12 +17,16 @@ 
 #include "hw/hw.h"
 
 typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
+typedef void (BufferedBeginFunc)(void *opaque);
 typedef void (BufferedPutReadyFunc)(void *opaque);
 typedef void (BufferedWaitForUnfreezeFunc)(void *opaque);
 typedef int (BufferedCloseFunc)(void *opaque);
 
+void buffered_rate_tick(QEMUFile *file);
+
 QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit,
                                   BufferedPutFunc *put_buffer,
+                                  BufferedBeginFunc *begin,
                                   BufferedPutReadyFunc *put_ready,
                                   BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
                                   BufferedCloseFunc *close);
diff --git a/migration.c b/migration.c
index af3a1f2..bf86067 100644
--- a/migration.c
+++ b/migration.c
@@ -31,6 +31,8 @@ 
     do { } while (0)
 #endif
 
+static int64_t expire_time;
+
 /* Migration speed throttling */
 static int64_t max_throttle = (32 << 20);
 
@@ -284,8 +286,6 @@  int migrate_fd_cleanup(FdMigrationState *s)
 {
     int ret = 0;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
     if (s->file) {
         DPRINTF("closing file\n");
         if (qemu_fclose(s->file) != 0) {
@@ -310,8 +310,7 @@  int migrate_fd_cleanup(FdMigrationState *s)
 void migrate_fd_put_notify(void *opaque)
 {
     FdMigrationState *s = opaque;
-
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
+    s->callback = NULL;
     qemu_file_put_notify(s->file);
 }
 
@@ -328,7 +327,7 @@  ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
         ret = -(s->get_error(s));
 
     if (ret == -EAGAIN) {
-        qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
+        s->callback = migrate_fd_put_notify;
     } else if (ret < 0) {
         if (s->mon) {
             monitor_resume(s->mon);
@@ -342,27 +341,66 @@  ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
 
 void migrate_fd_connect(FdMigrationState *s)
 {
-    int ret;
-
+    s->callback = NULL;
     s->file = qemu_fopen_ops_buffered(s,
                                       s->bandwidth_limit,
                                       migrate_fd_put_buffer,
+                                      migrate_fd_begin,
                                       migrate_fd_put_ready,
                                       migrate_fd_wait_for_unfreeze,
                                       migrate_fd_close);
+}
+
+static int migrate_fd_check_expire(void)
+{
+    int64_t current_time = qemu_get_clock_ms(rt_clock);
+
+    if (expire_time > current_time) {
+        return 0;
+    } else {
+        expire_time = qemu_get_clock_ms(rt_clock) + 100;
+        return 1;
+    }
+}
+
+void migrate_fd_begin(void *arg)
+{
+    FdMigrationState *s = arg;
+    int ret;
 
+    qemu_mutex_lock_iothread();
     DPRINTF("beginning savevm\n");
     ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
                                   s->mig_state.shared);
     if (ret < 0) {
         DPRINTF("failed, %d\n", ret);
         migrate_fd_error(s);
-        return;
+        goto out;
     }
-    
+
+    expire_time = qemu_get_clock_ms(rt_clock) + 100;
     migrate_fd_put_ready(s);
+
+    while (s->state == MIG_STATE_ACTIVE) {
+        if (migrate_fd_check_expire()) {
+            buffered_rate_tick(s->file);
+        }
+
+        if (s->state != MIG_STATE_ACTIVE) {
+            break;
+        }
+
+        if (s->callback) {
+            migrate_fd_wait_for_unfreeze(s);
+            s->callback(s);
+        }
+    }
+
+out:
+    qemu_mutex_unlock_iothread();
 }
 
+
 void migrate_fd_put_ready(void *opaque)
 {
     FdMigrationState *s = opaque;
@@ -376,8 +414,6 @@  void migrate_fd_put_ready(void *opaque)
     if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
         int state;
         int old_vm_running = vm_running;
-
-        DPRINTF("done iterating\n");
         vm_stop(VMSTOP_MIGRATE);
 
         if ((qemu_savevm_state_complete(s->mon, s->file)) < 0) {
@@ -458,7 +494,6 @@  int migrate_fd_close(void *opaque)
 {
     FdMigrationState *s = opaque;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
     return s->close(s);
 }
 
diff --git a/migration.h b/migration.h
index 050c56c..8ed34ab 100644
--- a/migration.h
+++ b/migration.h
@@ -48,6 +48,7 @@  struct FdMigrationState
     int (*get_error)(struct FdMigrationState*);
     int (*close)(struct FdMigrationState*);
     int (*write)(struct FdMigrationState*, const void *, size_t);
+    void (*callback)(void *);
     void *opaque;
 };
 
@@ -118,6 +119,8 @@  ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
 
 void migrate_fd_connect(FdMigrationState *s);
 
+void migrate_fd_begin(void *opaque);
+
 void migrate_fd_put_ready(void *opaque);
 
 int migrate_fd_get_status(MigrationState *mig_state);
diff --git a/savevm.c b/savevm.c
index 8139bc7..4859a34 100644
--- a/savevm.c
+++ b/savevm.c
@@ -82,6 +82,7 @@ 
 #include "qemu_socket.h"
 #include "qemu-queue.h"
 #include "cpus.h"
+#include "savevm.h"
 
 #define SELF_ANNOUNCE_ROUNDS 5
 
@@ -155,27 +156,6 @@  void qemu_announce_self(void)
 /***********************************************************/
 /* savevm/loadvm support */
 
-#define IO_BUF_SIZE 32768
-
-struct QEMUFile {
-    QEMUFilePutBufferFunc *put_buffer;
-    QEMUFileGetBufferFunc *get_buffer;
-    QEMUFileCloseFunc *close;
-    QEMUFileRateLimit *rate_limit;
-    QEMUFileSetRateLimit *set_rate_limit;
-    QEMUFileGetRateLimit *get_rate_limit;
-    void *opaque;
-    int is_write;
-
-    int64_t buf_offset; /* start of buffer when writing, end of buffer
-                           when reading */
-    int buf_index;
-    int buf_size; /* 0 when writing */
-    uint8_t buf[IO_BUF_SIZE];
-
-    int has_error;
-};
-
 typedef struct QEMUFileStdio
 {
     FILE *stdio_file;
diff --git a/savevm.h b/savevm.h
new file mode 100644
index 0000000..954eded
--- /dev/null
+++ b/savevm.h
@@ -0,0 +1,29 @@ 
+#ifndef QEMU_SAVEVM_H
+#define QEMU_SAVEVM_H
+
+#include "hw/hw.h"
+
+#define IO_BUF_SIZE 32768
+
+struct QEMUFile {
+    QEMUFilePutBufferFunc *put_buffer;
+    QEMUFileGetBufferFunc *get_buffer;
+    QEMUFileCloseFunc *close;
+    QEMUFileRateLimit *rate_limit;
+    QEMUFileSetRateLimit *set_rate_limit;
+    QEMUFileGetRateLimit *get_rate_limit;
+    void *opaque;
+    int is_write;
+
+    int64_t buf_offset; /* start of buffer when writing, end of buffer
+                           when reading */
+    int buf_index;
+    int buf_size; /* 0 when
+     * writing
+     * */
+    uint8_t buf[IO_BUF_SIZE];
+
+    int has_error;
+};
+
+#endif