From patchwork Thu Aug 28 15:03:31 2014 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Dr. David Alan Gilbert" X-Patchwork-Id: 383897 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [IPv6:2001:4830:134:3::11]) (using TLSv1 with cipher AES256-SHA (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 3B31C140119 for ; Fri, 29 Aug 2014 01:14:15 +1000 (EST) Received: from localhost ([::1]:37215 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1XN1Ov-0007qX-9Q for incoming@patchwork.ozlabs.org; Thu, 28 Aug 2014 11:14:13 -0400 Received: from eggs.gnu.org ([2001:4830:134:3::10]:42464) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1XN1Fh-0000WO-IF for qemu-devel@nongnu.org; Thu, 28 Aug 2014 11:04:47 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1XN1Fb-0001yO-7V for qemu-devel@nongnu.org; Thu, 28 Aug 2014 11:04:41 -0400 Received: from mx1.redhat.com ([209.132.183.28]:4644) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1XN1Fa-0001yB-UV for qemu-devel@nongnu.org; Thu, 28 Aug 2014 11:04:35 -0400 Received: from int-mx13.intmail.prod.int.phx2.redhat.com (int-mx13.intmail.prod.int.phx2.redhat.com [10.5.11.26]) by mx1.redhat.com (8.14.4/8.14.4) with ESMTP id s7SF4WcA002416 (version=TLSv1/SSLv3 cipher=DHE-RSA-AES256-GCM-SHA384 bits=256 verify=FAIL); Thu, 28 Aug 2014 11:04:32 -0400 Received: from dgilbert-t530.home.treblig.org (vpn1-5-132.ams2.redhat.com [10.36.5.132]) by int-mx13.intmail.prod.int.phx2.redhat.com (8.14.4/8.14.4) with ESMTP id s7SF44vp003726; Thu, 28 Aug 2014 11:04:31 -0400 From: "Dr. David Alan Gilbert (git)" To: qemu-devel@nongnu.org Date: Thu, 28 Aug 2014 16:03:31 +0100 Message-Id: <1409238244-31720-15-git-send-email-dgilbert@redhat.com> In-Reply-To: <1409238244-31720-1-git-send-email-dgilbert@redhat.com> References: <1409238244-31720-1-git-send-email-dgilbert@redhat.com> X-Scanned-By: MIMEDefang 2.68 on 10.5.11.26 X-detected-operating-system: by eggs.gnu.org: GNU/Linux 3.x X-Received-From: 209.132.183.28 Cc: aarcange@redhat.com, yamahata@private.email.ne.jp, amit.shah@redhat.com, lilei@linux.vnet.ibm.com, quintela@redhat.com Subject: [Qemu-devel] [PATCH v3 14/47] Return path: Source handling of return path X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.14 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org From: "Dr. David Alan Gilbert" Open a return path, and handle messages that are received upon it. Signed-off-by: Dr. David Alan Gilbert --- include/migration/migration.h | 10 +++ migration.c | 156 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 165 insertions(+), 1 deletion(-) diff --git a/include/migration/migration.h b/include/migration/migration.h index 12e640d..b87c289 100644 --- a/include/migration/migration.h +++ b/include/migration/migration.h @@ -47,6 +47,14 @@ enum mig_rpcomm_cmd { MIG_RPCOMM_ACK, /* data (seq: be32 ) */ MIG_RPCOMM_AFTERLASTVALID }; + +/* Source side RP state */ +struct MigrationRetPathState { + uint32_t latest_ack; + QemuThread rp_thread; + bool error; +}; + typedef struct MigrationState MigrationState; /* State for the incoming migration */ @@ -69,9 +77,11 @@ struct MigrationState QemuThread thread; QEMUBH *cleanup_bh; QEMUFile *file; + QEMUFile *return_path; int state; MigrationParams params; + struct MigrationRetPathState rp_state; double mbps; int64_t total_time; int64_t downtime; diff --git a/migration.c b/migration.c index 5ba8f3e..1754b67 100644 --- a/migration.c +++ b/migration.c @@ -371,6 +371,15 @@ static void migrate_set_state(MigrationState *s, int old_state, int new_state) } } +static void migrate_fd_cleanup_src_rp(MigrationState *ms) +{ + if (ms->return_path) { + DPRINTF("cleaning up return path\n"); + qemu_fclose(ms->return_path); + ms->return_path = NULL; + } +} + static void migrate_fd_cleanup(void *opaque) { MigrationState *s = opaque; @@ -378,6 +387,8 @@ static void migrate_fd_cleanup(void *opaque) qemu_bh_delete(s->cleanup_bh); s->cleanup_bh = NULL; + migrate_fd_cleanup_src_rp(s); + if (s->file) { trace_migrate_fd_cleanup(); qemu_mutex_unlock_iothread(); @@ -414,6 +425,11 @@ static void migrate_fd_cancel(MigrationState *s) int old_state ; trace_migrate_fd_cancel(); + if (s->return_path) { + /* shutdown the rp socket, so causing the rp thread to shutdown */ + qemu_file_shutdown(s->return_path); + } + do { old_state = s->state; if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) { @@ -655,8 +671,146 @@ int64_t migrate_xbzrle_cache_size(void) return s->xbzrle_cache_size; } -/* migration thread support */ +/* + * Something bad happened to the RP stream, mark an error + * The caller shall print something to indicate why + */ +static void source_return_path_bad(MigrationState *s) +{ + s->rp_state.error = true; + migrate_fd_cleanup_src_rp(s); +} + +/* + * Handles messages sent on the return path towards the source VM + * + */ +static void *source_return_path_thread(void *opaque) +{ + MigrationState *ms = opaque; + QEMUFile *rp = ms->return_path; + uint16_t expected_len, header_len, header_com; + const int max_len = 512; + uint8_t buf[max_len]; + uint32_t tmp32; + int res; + + DPRINTF("RP: %s entry", __func__); + while (rp && !qemu_file_get_error(rp) && + migration_already_active(ms)) { + DPRINTF("RP: %s top of loop", __func__); + header_com = qemu_get_be16(rp); + header_len = qemu_get_be16(rp); + + switch (header_com) { + case MIG_RPCOMM_SHUT: + case MIG_RPCOMM_ACK: + expected_len = 4; + break; + + default: + error_report("RP: Received invalid cmd 0x%04x length 0x%04x", + header_com, header_len); + source_return_path_bad(ms); + goto out; + } + + if (header_len > expected_len) { + error_report("RP: Received command 0x%04x with" + "incorrect length %d expecting %d", + header_com, header_len, + expected_len); + source_return_path_bad(ms); + goto out; + } + + /* We know we've got a valid header by this point */ + res = qemu_get_buffer(rp, buf, header_len); + if (res != header_len) { + DPRINTF("RP: Failed to read command data"); + source_return_path_bad(ms); + goto out; + } + + /* OK, we have the command and the data */ + switch (header_com) { + case MIG_RPCOMM_SHUT: + tmp32 = be32_to_cpup((uint32_t *)buf); + if (tmp32) { + error_report("RP: Sibling indicated error %d", tmp32); + source_return_path_bad(ms); + } else { + DPRINTF("RP: SHUT received"); + } + /* + * We'll let the main thread deal with closing the RP + * we could do a shutdown(2) on it, but we're the only user + * anyway, so there's nothing gained. + */ + goto out; + + case MIG_RPCOMM_ACK: + tmp32 = be32_to_cpup((uint32_t *)buf); + DPRINTF("RP: Received ACK 0x%x", tmp32); + atomic_xchg(&ms->rp_state.latest_ack, tmp32); + break; + + default: + /* This shouldn't happen because we should catch this above */ + DPRINTF("RP: Bad header_com in dispatch"); + } + /* Latest command processed, now leave a gap for the next one */ + header_com = MIG_RPCOMM_INVALID; + } + if (rp && qemu_file_get_error(rp)) { + DPRINTF("%s: rp bad at end", __func__); + source_return_path_bad(ms); + } + + DPRINTF("%s: Bottom exit", __func__); + +out: + return NULL; +} + +static int open_outgoing_return_path(MigrationState *ms) +{ + + ms->return_path = qemu_file_get_return_path(ms->file); + if (!ms->return_path) { + return -1; + } + + DPRINTF("%s: starting thread", __func__); + qemu_thread_create(&ms->rp_state.rp_thread, "return path", + source_return_path_thread, ms, QEMU_THREAD_JOINABLE); + + DPRINTF("%s: continuing", __func__); + + return 0; +} + +static void await_outgoing_return_path_close(MigrationState *ms) +{ + /* + * If this is a normal exit then the destination will send a SHUT and the + * rp_thread will exit, however if there's an error we need to cause + * it to exit, which we can do by a shutdown. + * (canceling must also shutdown to stop us getting stuck here if + * the destination died at just the wrong place) + */ + if (qemu_file_get_error(ms->file) && ms->return_path) { + qemu_file_shutdown(ms->return_path); + } + DPRINTF("%s: Joining", __func__); + qemu_thread_join(&ms->rp_state.rp_thread); + DPRINTF("%s: Exit", __func__); +} +/* + * Master migration thread on the source VM. + * It drives the migration and pumps the data down the outgoing channel. + */ static void *migration_thread(void *opaque) { MigrationState *s = opaque;