@@ -16,12 +16,16 @@
#include "qemu-timer.h"
#include "qemu-char.h"
#include "buffered_file.h"
+#include "migration.h"
+#include "savevm.h"
+#include "qemu-thread.h"
//#define DEBUG_BUFFERED_FILE
typedef struct QEMUFileBuffered
{
BufferedPutFunc *put_buffer;
+ BufferedBeginFunc *begin;
BufferedPutReadyFunc *put_ready;
BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
BufferedCloseFunc *close;
@@ -35,6 +39,7 @@ typedef struct QEMUFileBuffered
size_t buffer_size;
size_t buffer_capacity;
QEMUTimer *timer;
+ QemuThread thread;
} QEMUFileBuffered;
#ifdef DEBUG_BUFFERED_FILE
@@ -181,8 +186,6 @@ static int buffered_close(void *opaque)
ret = s->close(s->opaque);
- qemu_del_timer(s->timer);
- qemu_free_timer(s->timer);
qemu_free(s->buffer);
qemu_free(s);
@@ -228,17 +231,15 @@ static int64_t buffered_get_rate_limit(void *opaque)
return s->xfer_limit;
}
-static void buffered_rate_tick(void *opaque)
+void buffered_rate_tick(QEMUFile *file)
{
- QEMUFileBuffered *s = opaque;
+ QEMUFileBuffered *s = file->opaque;
if (s->has_error) {
buffered_close(s);
return;
}
- qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
-
if (s->freeze_output)
return;
@@ -250,9 +251,17 @@ static void buffered_rate_tick(void *opaque)
s->put_ready(s->opaque);
}
+static void *migrate_vm(void *opaque)
+{
+ QEMUFileBuffered *s = opaque;
+ s->begin(s->opaque);
+ return NULL;
+}
+
QEMUFile *qemu_fopen_ops_buffered(void *opaque,
size_t bytes_per_sec,
BufferedPutFunc *put_buffer,
+ BufferedBeginFunc *begin,
BufferedPutReadyFunc *put_ready,
BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
BufferedCloseFunc *close)
@@ -264,6 +273,7 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque,
s->opaque = opaque;
s->xfer_limit = bytes_per_sec / 10;
s->put_buffer = put_buffer;
+ s->begin = begin;
s->put_ready = put_ready;
s->wait_for_unfreeze = wait_for_unfreeze;
s->close = close;
@@ -271,11 +281,9 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque,
s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
buffered_close, buffered_rate_limit,
buffered_set_rate_limit,
- buffered_get_rate_limit);
-
- s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
+ buffered_get_rate_limit);
- qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+ qemu_thread_create(&s->thread, migrate_vm, s);
return s->file;
}
@@ -17,12 +17,16 @@
#include "hw/hw.h"
typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
+typedef void (BufferedBeginFunc)(void *opaque);
typedef void (BufferedPutReadyFunc)(void *opaque);
typedef void (BufferedWaitForUnfreezeFunc)(void *opaque);
typedef int (BufferedCloseFunc)(void *opaque);
+void buffered_rate_tick(QEMUFile *file);
+
QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit,
BufferedPutFunc *put_buffer,
+ BufferedBeginFunc *begin,
BufferedPutReadyFunc *put_ready,
BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
BufferedCloseFunc *close);
@@ -31,6 +31,8 @@
do { } while (0)
#endif
+static int64_t expire_time;
+
/* Migration speed throttling */
static int64_t max_throttle = (32 << 20);
@@ -284,8 +286,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
{
int ret = 0;
- qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
if (s->file) {
DPRINTF("closing file\n");
if (qemu_fclose(s->file) != 0) {
@@ -310,8 +310,7 @@ int migrate_fd_cleanup(FdMigrationState *s)
void migrate_fd_put_notify(void *opaque)
{
FdMigrationState *s = opaque;
-
- qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
+ s->callback = NULL;
qemu_file_put_notify(s->file);
}
@@ -328,7 +327,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
ret = -(s->get_error(s));
if (ret == -EAGAIN) {
- qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
+ s->callback = migrate_fd_put_notify;
} else if (ret < 0) {
if (s->mon) {
monitor_resume(s->mon);
@@ -342,27 +341,66 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
void migrate_fd_connect(FdMigrationState *s)
{
- int ret;
-
+ s->callback = NULL;
s->file = qemu_fopen_ops_buffered(s,
s->bandwidth_limit,
migrate_fd_put_buffer,
+ migrate_fd_begin,
migrate_fd_put_ready,
migrate_fd_wait_for_unfreeze,
migrate_fd_close);
+}
+
+static int migrate_fd_check_expire(void)
+{
+ int64_t current_time = qemu_get_clock_ms(rt_clock);
+
+ if (expire_time > current_time) {
+ return 0;
+ } else {
+ expire_time = qemu_get_clock_ms(rt_clock) + 100;
+ return 1;
+ }
+}
+
+void migrate_fd_begin(void *arg)
+{
+ FdMigrationState *s = arg;
+ int ret;
+ qemu_mutex_lock_iothread();
DPRINTF("beginning savevm\n");
ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
s->mig_state.shared);
if (ret < 0) {
DPRINTF("failed, %d\n", ret);
migrate_fd_error(s);
- return;
+ goto out;
}
-
+
+ expire_time = qemu_get_clock_ms(rt_clock) + 100;
migrate_fd_put_ready(s);
+
+ while (s->state == MIG_STATE_ACTIVE) {
+ if (migrate_fd_check_expire()) {
+ buffered_rate_tick(s->file);
+ }
+
+ if (s->state != MIG_STATE_ACTIVE) {
+ break;
+ }
+
+ if (s->callback) {
+ migrate_fd_wait_for_unfreeze(s);
+ s->callback(s);
+ }
+ }
+
+out:
+ qemu_mutex_unlock_iothread();
}
+
void migrate_fd_put_ready(void *opaque)
{
FdMigrationState *s = opaque;
@@ -376,8 +414,6 @@ void migrate_fd_put_ready(void *opaque)
if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
int state;
int old_vm_running = vm_running;
-
- DPRINTF("done iterating\n");
vm_stop(VMSTOP_MIGRATE);
if ((qemu_savevm_state_complete(s->mon, s->file)) < 0) {
@@ -458,7 +494,6 @@ int migrate_fd_close(void *opaque)
{
FdMigrationState *s = opaque;
- qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
return s->close(s);
}
@@ -48,6 +48,7 @@ struct FdMigrationState
int (*get_error)(struct FdMigrationState*);
int (*close)(struct FdMigrationState*);
int (*write)(struct FdMigrationState*, const void *, size_t);
+ void (*callback)(void *);
void *opaque;
};
@@ -118,6 +119,8 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
void migrate_fd_connect(FdMigrationState *s);
+void migrate_fd_begin(void *opaque);
+
void migrate_fd_put_ready(void *opaque);
int migrate_fd_get_status(MigrationState *mig_state);
@@ -82,6 +82,7 @@
#include "qemu_socket.h"
#include "qemu-queue.h"
#include "cpus.h"
+#include "savevm.h"
#define SELF_ANNOUNCE_ROUNDS 5
@@ -155,27 +156,6 @@ void qemu_announce_self(void)
/***********************************************************/
/* savevm/loadvm support */
-#define IO_BUF_SIZE 32768
-
-struct QEMUFile {
- QEMUFilePutBufferFunc *put_buffer;
- QEMUFileGetBufferFunc *get_buffer;
- QEMUFileCloseFunc *close;
- QEMUFileRateLimit *rate_limit;
- QEMUFileSetRateLimit *set_rate_limit;
- QEMUFileGetRateLimit *get_rate_limit;
- void *opaque;
- int is_write;
-
- int64_t buf_offset; /* start of buffer when writing, end of buffer
- when reading */
- int buf_index;
- int buf_size; /* 0 when writing */
- uint8_t buf[IO_BUF_SIZE];
-
- int has_error;
-};
-
typedef struct QEMUFileStdio
{
FILE *stdio_file;
new file mode 100644
@@ -0,0 +1,29 @@
+#ifndef QEMU_SAVEVM_H
+#define QEMU_SAVEVM_H
+
+#include "hw/hw.h"
+
+#define IO_BUF_SIZE 32768
+
+struct QEMUFile {
+ QEMUFilePutBufferFunc *put_buffer;
+ QEMUFileGetBufferFunc *get_buffer;
+ QEMUFileCloseFunc *close;
+ QEMUFileRateLimit *rate_limit;
+ QEMUFileSetRateLimit *set_rate_limit;
+ QEMUFileGetRateLimit *get_rate_limit;
+ void *opaque;
+ int is_write;
+
+ int64_t buf_offset; /* start of buffer when writing, end of buffer
+ when reading */
+ int buf_index;
+ int buf_size; /* 0 when
+ * writing
+ * */
+ uint8_t buf[IO_BUF_SIZE];
+
+ int has_error;
+};
+
+#endif
This patch creates a separate thread for the guest migration on the source side. Signed-off-by: Umesh Deshpande <udeshpan@redhat.com> --- buffered_file.c | 28 ++++++++++++++++--------- buffered_file.h | 4 +++ migration.c | 59 +++++++++++++++++++++++++++++++++++++++++++----------- migration.h | 3 ++ savevm.c | 22 +------------------- savevm.h | 29 +++++++++++++++++++++++++++ 6 files changed, 102 insertions(+), 43 deletions(-) create mode 100644 savevm.h