From patchwork Wed Aug 24 03:12:50 2011 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Umesh Deshpande X-Patchwork-Id: 111228 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [140.186.70.17]) (using TLSv1 with cipher AES256-SHA (256/256 bits)) (Client did not present a certificate) by ozlabs.org (Postfix) with ESMTPS id 737E8B6F69 for ; Wed, 24 Aug 2011 13:14:12 +1000 (EST) Received: from localhost ([::1]:53360 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Qw3uq-000172-ON for incoming@patchwork.ozlabs.org; Tue, 23 Aug 2011 23:14:08 -0400 Received: from eggs.gnu.org ([140.186.70.92]:44204) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Qw3uU-0000ke-H0 for qemu-devel@nongnu.org; Tue, 23 Aug 2011 23:13:49 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1Qw3uN-00016m-8v for qemu-devel@nongnu.org; Tue, 23 Aug 2011 23:13:45 -0400 Received: from mx1.redhat.com ([209.132.183.28]:1471) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Qw3uM-000151-Fw for qemu-devel@nongnu.org; Tue, 23 Aug 2011 23:13:39 -0400 Received: from int-mx10.intmail.prod.int.phx2.redhat.com (int-mx10.intmail.prod.int.phx2.redhat.com [10.5.11.23]) by mx1.redhat.com (8.14.4/8.14.4) with ESMTP id p7O3Dbmm032285 (version=TLSv1/SSLv3 cipher=DHE-RSA-AES256-SHA bits=256 verify=OK); Tue, 23 Aug 2011 23:13:37 -0400 Received: from ns3.rdu.redhat.com (ns3.rdu.redhat.com [10.11.255.199]) by int-mx10.intmail.prod.int.phx2.redhat.com (8.14.4/8.14.4) with ESMTP id p7O3Dbn5003931; Tue, 23 Aug 2011 23:13:37 -0400 Received: from umeshhome.redhat.com (vpn-11-16.rdu.redhat.com [10.11.11.16]) by ns3.rdu.redhat.com (8.13.8/8.13.8) with ESMTP id p7O3DXSN009641; Tue, 23 Aug 2011 23:13:36 -0400 From: Umesh Deshpande To: kvm@vger.kernel.org, 
qemu-devel@nongnu.org Date: Tue, 23 Aug 2011 23:12:50 -0400 Message-Id: <3fa1f1529a65cdccf76154c6ab853dffd0a4abf2.1314153301.git.udeshpan@redhat.com> In-Reply-To: References: X-Scanned-By: MIMEDefang 2.68 on 10.5.11.23 X-detected-operating-system: by eggs.gnu.org: Genre and OS details not recognized. X-Received-From: 209.132.183.28 Cc: pbonzini@redhat.com, mtosatti@redhat.com, Umesh Deshpande , quintela@redhat.com Subject: [Qemu-devel] [RFC PATCH v5 4/4] Separate thread for VM migration X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.14 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org This patch creates a separate thread for the guest migration on the source side. All exits (on completion/error) from the migration thread are handled by a bottom half, which is called from the iothread. Signed-off-by: Umesh Deshpande --- buffered_file.c | 75 +++++++++++++++++-------------- migration.c | 122 +++++++++++++++++++++++++++++--------------------- migration.h | 9 ++++ qemu-thread-posix.c | 10 ++++ qemu-thread.h | 1 + savevm.c | 5 -- 6 files changed, 132 insertions(+), 90 deletions(-) diff --git a/buffered_file.c b/buffered_file.c index 41b42c3..0d94baa 100644 --- a/buffered_file.c +++ b/buffered_file.c @@ -16,6 +16,8 @@ #include "qemu-timer.h" #include "qemu-char.h" #include "buffered_file.h" +#include "migration.h" +#include "qemu-thread.h" //#define DEBUG_BUFFERED_FILE @@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered void *opaque; QEMUFile *file; int has_error; + int closed; int freeze_output; size_t bytes_xfer; size_t xfer_limit; uint8_t *buffer; size_t buffer_size; size_t buffer_capacity; - QEMUTimer *timer; + QemuThread thread; } QEMUFileBuffered; #ifdef DEBUG_BUFFERED_FILE @@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t 
pos, in offset = size; } - if (pos == 0 && size == 0) { - DPRINTF("file is ready\n"); - if (s->bytes_xfer <= s->xfer_limit) { - DPRINTF("notifying client\n"); - s->put_ready(s->opaque); - } - } - return offset; } @@ -173,22 +168,25 @@ static int buffered_close(void *opaque) DPRINTF("closing\n"); - while (!s->has_error && s->buffer_size) { - buffered_flush(s); - if (s->freeze_output) - s->wait_for_unfreeze(s); - } + s->closed = 1; - ret = s->close(s->opaque); + qemu_mutex_unlock_migthread(); + qemu_mutex_unlock_iothread(); + + qemu_thread_join(&s->thread); + /* Waits for the completion of the migration thread */ - qemu_del_timer(s->timer); - qemu_free_timer(s->timer); + qemu_mutex_lock_iothread(); + qemu_mutex_lock_migthread(); + + ret = s->close(s->opaque); qemu_free(s->buffer); qemu_free(s); return ret; } + static int buffered_rate_limit(void *opaque) { QEMUFileBuffered *s = opaque; @@ -228,26 +226,36 @@ static int64_t buffered_get_rate_limit(void *opaque) return s->xfer_limit; } -static void buffered_rate_tick(void *opaque) +static void *migrate_vm(void *opaque) { QEMUFileBuffered *s = opaque; + int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100; + struct timeval tv = { .tv_sec = 0, .tv_usec = 100000}; - if (s->has_error) { - buffered_close(s); - return; - } - - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); + while (!s->has_error && (!s->closed || s->buffer_size)) { + if (s->freeze_output) { + s->wait_for_unfreeze(s); + s->freeze_output = 0; + continue; + } - if (s->freeze_output) - return; + current_time = qemu_get_clock_ms(rt_clock); + if (!s->closed && (expire_time > current_time)) { + tv.tv_usec = 1000 * (expire_time - current_time); + select(0, NULL, NULL, NULL, &tv); + continue; + } - s->bytes_xfer = 0; + s->bytes_xfer = 0; + buffered_flush(s); - buffered_flush(s); + expire_time = qemu_get_clock_ms(rt_clock) + 100; + if (!s->closed) { + s->put_ready(s->opaque); + } + } - /* Add some checks around this */ - 
s->put_ready(s->opaque); + return NULL; } QEMUFile *qemu_fopen_ops_buffered(void *opaque, @@ -267,15 +275,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque, s->put_ready = put_ready; s->wait_for_unfreeze = wait_for_unfreeze; s->close = close; + s->closed = 0; s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL, buffered_close, buffered_rate_limit, buffered_set_rate_limit, - buffered_get_rate_limit); - - s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s); + buffered_get_rate_limit); - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); + qemu_thread_create(&s->thread, migrate_vm, s); return s->file; } diff --git a/migration.c b/migration.c index af3a1f2..17d866a 100644 --- a/migration.c +++ b/migration.c @@ -149,10 +149,12 @@ int do_migrate_set_speed(Monitor *mon, const QDict *qdict, QObject **ret_data) } max_throttle = d; + qemu_mutex_lock_migthread(); s = migrate_to_fms(current_migration); if (s && s->file) { qemu_file_set_rate_limit(s->file, max_throttle); } + qemu_mutex_unlock_migthread(); return 0; } @@ -284,8 +286,6 @@ int migrate_fd_cleanup(FdMigrationState *s) { int ret = 0; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); - if (s->file) { DPRINTF("closing file\n"); if (qemu_fclose(s->file) != 0) { @@ -307,14 +307,6 @@ int migrate_fd_cleanup(FdMigrationState *s) return ret; } -void migrate_fd_put_notify(void *opaque) -{ - FdMigrationState *s = opaque; - - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); - qemu_file_put_notify(s->file); -} - ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) { FdMigrationState *s = opaque; @@ -327,76 +319,91 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) if (ret == -1) ret = -(s->get_error(s)); - if (ret == -EAGAIN) { - qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s); - } else if (ret < 0) { - if (s->mon) { - monitor_resume(s->mon); + return ret; +} + +static void migrate_fd_terminate(void *opaque) +{ + FdMigrationState 
*s = opaque; + + qemu_mutex_lock_migthread(); + + if (s->code == COMPLETE) { + if (migrate_fd_cleanup(s) < 0) { + if (s->old_vm_running) { + vm_start(); + } + s->state = MIG_STATE_ERROR; + } else { + s->state = MIG_STATE_COMPLETED; } - s->state = MIG_STATE_ERROR; notifier_list_notify(&migration_state_notifiers); + } else if (s->code == RESUME) { + if (s->old_vm_running) { + vm_start(); + } + migrate_fd_error(s); + } else if (s->code == ERROR) { + migrate_fd_error(s); } - return ret; + qemu_mutex_unlock_migthread(); } void migrate_fd_connect(FdMigrationState *s) { - int ret; - + s->code = START; + s->bh = qemu_bh_new(migrate_fd_terminate, s); s->file = qemu_fopen_ops_buffered(s, s->bandwidth_limit, migrate_fd_put_buffer, migrate_fd_put_ready, migrate_fd_wait_for_unfreeze, migrate_fd_close); - - DPRINTF("beginning savevm\n"); - ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, - s->mig_state.shared); - if (ret < 0) { - DPRINTF("failed, %d\n", ret); - migrate_fd_error(s); - return; - } - - migrate_fd_put_ready(s); } void migrate_fd_put_ready(void *opaque) { FdMigrationState *s = opaque; + int ret; - if (s->state != MIG_STATE_ACTIVE) { + qemu_mutex_lock_iothread(); + if (s->code != ACTIVE && s->code != START) { DPRINTF("put_ready returning because of non-active state\n"); + qemu_mutex_unlock_iothread(); return; } + if (!s->code) { + DPRINTF("beginning savevm\n"); + ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, + s->mig_state.shared); + if (ret < 0) { + DPRINTF("failed, %d\n", ret); + s->code = ERROR; + qemu_bh_schedule(s->bh); + qemu_mutex_unlock_iothread(); + return; + } + s->code = ACTIVE; + } + DPRINTF("iterate\n"); if (qemu_savevm_state_iterate(s->mon, s->file) == 1) { - int state; - int old_vm_running = vm_running; + s->old_vm_running = vm_running; DPRINTF("done iterating\n"); vm_stop(VMSTOP_MIGRATE); if ((qemu_savevm_state_complete(s->mon, s->file)) < 0) { - if (old_vm_running) { - vm_start(); - } - state = MIG_STATE_ERROR; + 
s->code = RESUME; } else { - state = MIG_STATE_COMPLETED; - } - if (migrate_fd_cleanup(s) < 0) { - if (old_vm_running) { - vm_start(); - } - state = MIG_STATE_ERROR; + s->code = COMPLETE; } - s->state = state; - notifier_list_notify(&migration_state_notifiers); + + qemu_bh_schedule(s->bh); } + qemu_mutex_unlock_iothread(); } int migrate_fd_get_status(MigrationState *mig_state) @@ -416,9 +423,11 @@ void migrate_fd_cancel(MigrationState *mig_state) s->state = MIG_STATE_CANCELLED; notifier_list_notify(&migration_state_notifiers); - qemu_savevm_state_cancel(s->mon, s->file); + qemu_mutex_lock_migthread(); + qemu_savevm_state_cancel(s->mon, s->file); migrate_fd_cleanup(s); + qemu_mutex_unlock_migthread(); } void migrate_fd_release(MigrationState *mig_state) @@ -426,22 +435,34 @@ void migrate_fd_release(MigrationState *mig_state) FdMigrationState *s = migrate_to_fms(mig_state); DPRINTF("releasing state\n"); - + if (s->state == MIG_STATE_ACTIVE) { s->state = MIG_STATE_CANCELLED; notifier_list_notify(&migration_state_notifiers); + qemu_mutex_lock_migthread(); migrate_fd_cleanup(s); + qemu_mutex_unlock_migthread(); + } + + if (s->bh) { + qemu_bh_delete(s->bh); } + qemu_free(s); } void migrate_fd_wait_for_unfreeze(void *opaque) { FdMigrationState *s = opaque; - int ret; + int ret, state; DPRINTF("wait for unfreeze\n"); - if (s->state != MIG_STATE_ACTIVE) + + qemu_mutex_lock_iothread(); + state = s->state; + qemu_mutex_unlock_iothread(); + + if (state != MIG_STATE_ACTIVE) return; do { @@ -458,7 +479,6 @@ int migrate_fd_close(void *opaque) { FdMigrationState *s = opaque; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); return s->close(s); } diff --git a/migration.h b/migration.h index 050c56c..abbf9e4 100644 --- a/migration.h +++ b/migration.h @@ -23,6 +23,12 @@ #define MIG_STATE_CANCELLED 1 #define MIG_STATE_ACTIVE 2 +#define START 0 +#define ACTIVE 1 +#define COMPLETE 2 +#define ERROR 3 +#define RESUME 4 + typedef struct MigrationState MigrationState; struct 
MigrationState @@ -45,10 +51,13 @@ struct FdMigrationState int fd; Monitor *mon; int state; + int code; + int old_vm_running; int (*get_error)(struct FdMigrationState*); int (*close)(struct FdMigrationState*); int (*write)(struct FdMigrationState*, const void *, size_t); void *opaque; + QEMUBH *bh; }; void process_incoming_migration(QEMUFile *f); diff --git a/qemu-thread-posix.c b/qemu-thread-posix.c index 2bd02ef..6a275be 100644 --- a/qemu-thread-posix.c +++ b/qemu-thread-posix.c @@ -115,6 +115,16 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex) error_exit(err, __func__); } +void qemu_thread_join(QemuThread *thread) +{ + int err; + + err = pthread_join(thread->thread, NULL); + if (err) { + error_exit(err, __func__); + } +} + void qemu_thread_create(QemuThread *thread, void *(*start_routine)(void*), void *arg) diff --git a/qemu-thread.h b/qemu-thread.h index 0a73d50..d5b99d5 100644 --- a/qemu-thread.h +++ b/qemu-thread.h @@ -30,6 +30,7 @@ void qemu_cond_destroy(QemuCond *cond); void qemu_cond_signal(QemuCond *cond); void qemu_cond_broadcast(QemuCond *cond); void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex); +void qemu_thread_join(QemuThread *thread); void qemu_thread_create(QemuThread *thread, void *(*start_routine)(void*), diff --git a/savevm.c b/savevm.c index 8139bc7..f54f555 100644 --- a/savevm.c +++ b/savevm.c @@ -481,11 +481,6 @@ int qemu_fclose(QEMUFile *f) return ret; } -void qemu_file_put_notify(QEMUFile *f) -{ - f->put_buffer(f->opaque, NULL, 0, 0); -} - void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size) { int l;