diff mbox

[v4,16/47] Return path: Source handling of return path

Message ID 1412358473-31398-17-git-send-email-dgilbert@redhat.com
State New
Headers show

Commit Message

Dr. David Alan Gilbert Oct. 3, 2014, 5:47 p.m. UTC
From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>

Open a return path, and handle messages that are received upon it.

Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 include/migration/migration.h |  10 +++
 migration.c                   | 181 +++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 190 insertions(+), 1 deletion(-)

Comments

Paolo Bonzini Oct. 4, 2014, 6:14 p.m. UTC | #1
Il 03/10/2014 19:47, Dr. David Alan Gilbert (git) ha scritto:
> +/* Source side RP state */
> +struct MigrationRetPathState {
> +    uint32_t      latest_ack;
> +    QemuThread    rp_thread;
> +    bool          error;

Should the QemuFile be in here?

> +};
> +

Also please do not abbrev words, and add a typedef that matches the
struct if it is useful.  If it is not, just embed the struct without
giving the type a name (struct { } rp_state).

> +static bool migration_already_active(MigrationState *ms)
> +{
> +    switch (ms->state) {
> +    case MIG_STATE_ACTIVE:
> +    case MIG_STATE_SETUP:
> +        return true;
> +
> +    default:
> +        return false;
> +
> +    }
> +}

Should CANCELLING also be considered active?  It is on the source->dest
path.

> 
> +static void await_outgoing_return_path_close(MigrationState *ms)
> +{
> +    /*
> +     * If this is a normal exit then the destination will send a SHUT and the
> +     * rp_thread will exit, however if there's an error we need to cause
> +     * it to exit, which we can do by a shutdown.
> +     * (canceling must also shutdown to stop us getting stuck here if
> +     * the destination died at just the wrong place)
> +     */
> +    if (qemu_file_get_error(ms->file) && ms->return_path) {
> +        qemu_file_shutdown(ms->return_path);
> +    }

As mentioned early, I think it's simpler to let these function handle
themselves the case where there is no return path, and call them
unconditionally.

Paolo
Zhanghailiang Oct. 16, 2014, 8:26 a.m. UTC | #2
On 2014/10/4 1:47, Dr. David Alan Gilbert (git) wrote:
> From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
>
> Open a return path, and handle messages that are received upon it.
>
> Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
> ---
>   include/migration/migration.h |  10 +++
>   migration.c                   | 181 +++++++++++++++++++++++++++++++++++++++++-
>   2 files changed, 190 insertions(+), 1 deletion(-)
>
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 12e640d..b87c289 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -47,6 +47,14 @@ enum mig_rpcomm_cmd {
>       MIG_RPCOMM_ACK,          /* data (seq: be32 ) */
>       MIG_RPCOMM_AFTERLASTVALID
>   };
> +
> +/* Source side RP state */
> +struct MigrationRetPathState {
> +    uint32_t      latest_ack;
> +    QemuThread    rp_thread;
> +    bool          error;
> +};
> +
>   typedef struct MigrationState MigrationState;
>
>   /* State for the incoming migration */
> @@ -69,9 +77,11 @@ struct MigrationState
>       QemuThread thread;
>       QEMUBH *cleanup_bh;
>       QEMUFile *file;
> +    QEMUFile *return_path;
>
>       int state;
>       MigrationParams params;
> +    struct MigrationRetPathState rp_state;
>       double mbps;
>       int64_t total_time;
>       int64_t downtime;
> diff --git a/migration.c b/migration.c
> index 5ba8f3e..ee6db1d 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -246,6 +246,23 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
>       return head;
>   }
>
> +/*
> + * Return true if we're already in the middle of a migration
> + * (i.e. any of the active or setup states)
> + */
> +static bool migration_already_active(MigrationState *ms)
> +{
> +    switch (ms->state) {
> +    case MIG_STATE_ACTIVE:
> +    case MIG_STATE_SETUP:
> +        return true;
> +
> +    default:
> +        return false;
> +
> +    }
> +}
> +
>   static void get_xbzrle_cache_stats(MigrationInfo *info)
>   {
>       if (migrate_use_xbzrle()) {
> @@ -371,6 +388,21 @@ static void migrate_set_state(MigrationState *s, int old_state, int new_state)
>       }
>   }
>
> +static void migrate_fd_cleanup_src_rp(MigrationState *ms)
> +{
> +    QEMUFile *rp = ms->return_path;
> +
> +    /*
> +     * When stuff goes wrong (e.g. failing destination) on the rp, it can get
> +     * cleaned up from a few threads; make sure not to do it twice in parallel
> +     */
> +    rp = atomic_cmpxchg(&ms->return_path, rp, NULL);
> +    if (rp) {
> +        DPRINTF("cleaning up return path\n");
> +        qemu_fclose(rp);
> +    }
> +}
> +
>   static void migrate_fd_cleanup(void *opaque)
>   {
>       MigrationState *s = opaque;
> @@ -378,6 +410,8 @@ static void migrate_fd_cleanup(void *opaque)
>       qemu_bh_delete(s->cleanup_bh);
>       s->cleanup_bh = NULL;
>
> +    migrate_fd_cleanup_src_rp(s);
> +
>       if (s->file) {
>           trace_migrate_fd_cleanup();
>           qemu_mutex_unlock_iothread();
> @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s)
>       int old_state ;
>       trace_migrate_fd_cancel();
>
> +    if (s->return_path) {
> +        /* shutdown the rp socket, so causing the rp thread to shutdown */
> +        qemu_file_shutdown(s->return_path);
> +    }
> +
>       do {
>           old_state = s->state;
>           if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) {
> @@ -655,8 +694,148 @@ int64_t migrate_xbzrle_cache_size(void)
>       return s->xbzrle_cache_size;
>   }
>
> -/* migration thread support */
> +/*
> + * Something bad happened to the RP stream, mark an error
> + * The caller shall print something to indicate why
> + */
> +static void source_return_path_bad(MigrationState *s)
> +{
> +    s->rp_state.error = true;
> +    migrate_fd_cleanup_src_rp(s);
> +}
>
> +/*
> + * Handles messages sent on the return path towards the source VM
> + *
> + */
> +static void *source_return_path_thread(void *opaque)
> +{
> +    MigrationState *ms = opaque;
> +    QEMUFile *rp = ms->return_path;
> +    uint16_t expected_len, header_len, header_com;
> +    const int max_len = 512;
> +    uint8_t buf[max_len];
> +    uint32_t tmp32;
> +    int res;
> +
> +    DPRINTF("RP: %s entry", __func__);
> +    while (rp && !qemu_file_get_error(rp) &&
> +        migration_already_active(ms)) {
> +        DPRINTF("RP: %s top of loop", __func__);
> +        header_com = qemu_get_be16(rp);
> +        header_len = qemu_get_be16(rp);
> +
> +        switch (header_com) {
> +        case MIG_RPCOMM_SHUT:
> +        case MIG_RPCOMM_ACK:
> +            expected_len = 4;
> +            break;
> +
> +        default:
> +            error_report("RP: Received invalid cmd 0x%04x length 0x%04x",
> +                    header_com, header_len);
> +            source_return_path_bad(ms);
> +            goto out;
> +        }
> +
> +        if (header_len > expected_len) {
> +            error_report("RP: Received command 0x%04x with"
> +                    "incorrect length %d expecting %d",
> +                    header_com, header_len,
> +                    expected_len);
> +            source_return_path_bad(ms);
> +            goto out;
> +        }
> +
> +        /* We know we've got a valid header by this point */
> +        res = qemu_get_buffer(rp, buf, header_len);
> +        if (res != header_len) {
> +            DPRINTF("RP: Failed to read command data");
> +            source_return_path_bad(ms);
> +            goto out;
> +        }
> +
> +        /* OK, we have the command and the data */
> +        switch (header_com) {
> +        case MIG_RPCOMM_SHUT:
> +            tmp32 = be32_to_cpup((uint32_t *)buf);
> +            if (tmp32) {
> +                error_report("RP: Sibling indicated error %d", tmp32);
> +                source_return_path_bad(ms);
> +            } else {
> +                DPRINTF("RP: SHUT received");
> +            }
> +            /*
> +             * We'll let the main thread deal with closing the RP
> +             * we could do a shutdown(2) on it, but we're the only user
> +             * anyway, so there's nothing gained.
> +             */
> +            goto out;
> +
> +        case MIG_RPCOMM_ACK:
> +            tmp32 = be32_to_cpup((uint32_t *)buf);
> +            DPRINTF("RP: Received ACK 0x%x", tmp32);
> +            atomic_xchg(&ms->rp_state.latest_ack, tmp32);

I didn't see *ms->rp_state.latest_ack* been used elsewhere, what's it used for?;)

> +            break;
> +
> +        default:
> +            /* This shouldn't happen because we should catch this above */
> +            DPRINTF("RP: Bad header_com in dispatch");
> +        }
> +        /* Latest command processed, now leave a gap for the next one */
> +        header_com = MIG_RPCOMM_INVALID;
> +    }
> +    if (rp && qemu_file_get_error(rp)) {
> +        DPRINTF("%s: rp bad at end", __func__);
> +        source_return_path_bad(ms);
> +    }
> +
> +    DPRINTF("%s: Bottom exit", __func__);
> +
> +out:
> +    return NULL;
> +}
> +
> +__attribute__ (( unused )) /* Until later in patch series */
> +static int open_outgoing_return_path(MigrationState *ms)
> +{
> +
> +    ms->return_path = qemu_file_get_return_path(ms->file);
> +    if (!ms->return_path) {
> +        return -1;
> +    }
> +
> +    DPRINTF("%s: starting thread", __func__);
> +    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
> +                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
> +
> +    DPRINTF("%s: continuing", __func__);
> +
> +    return 0;
> +}
> +
> +__attribute__ (( unused )) /* Until later in patch series */
> +static void await_outgoing_return_path_close(MigrationState *ms)
> +{
> +    /*
> +     * If this is a normal exit then the destination will send a SHUT and the
> +     * rp_thread will exit, however if there's an error we need to cause
> +     * it to exit, which we can do by a shutdown.
> +     * (canceling must also shutdown to stop us getting stuck here if
> +     * the destination died at just the wrong place)
> +     */
> +    if (qemu_file_get_error(ms->file) && ms->return_path) {
> +        qemu_file_shutdown(ms->return_path);
> +    }
> +    DPRINTF("%s: Joining", __func__);
> +    qemu_thread_join(&ms->rp_state.rp_thread);
> +    DPRINTF("%s: Exit", __func__);
> +}
> +
> +/*
> + * Master migration thread on the source VM.
> + * It drives the migration and pumps the data down the outgoing channel.
> + */
>   static void *migration_thread(void *opaque)
>   {
>       MigrationState *s = opaque;
>
Dr. David Alan Gilbert Oct. 16, 2014, 8:35 a.m. UTC | #3
* zhanghailiang (zhang.zhanghailiang@huawei.com) wrote:

> >+        case MIG_RPCOMM_ACK:
> >+            tmp32 = be32_to_cpup((uint32_t *)buf);
> >+            DPRINTF("RP: Received ACK 0x%x", tmp32);
> >+            atomic_xchg(&ms->rp_state.latest_ack, tmp32);
> 
> I didn't see *ms->rp_state.latest_ack* been used elsewhere, what's it used for?;)

Nothing currently; I've used the REQ/ACK as debug at the moment;   I was thinking
that someone might want to wait on an ack being received before carrying on; but hadn't
actually needed it in postcopy.

Dave

--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Zhanghailiang Oct. 16, 2014, 9:09 a.m. UTC | #4
On 2014/10/16 16:35, Dr. David Alan Gilbert wrote:
> * zhanghailiang (zhang.zhanghailiang@huawei.com) wrote:
>
>>> +        case MIG_RPCOMM_ACK:
>>> +            tmp32 = be32_to_cpup((uint32_t *)buf);
>>> +            DPRINTF("RP: Received ACK 0x%x", tmp32);
>>> +            atomic_xchg(&ms->rp_state.latest_ack, tmp32);
>>
>> I didn't see *ms->rp_state.latest_ack* been used elsewhere, what's it used for?;)
>
> Nothing currently; I've used the REQ/ACK as debug at the moment;   I was thinking
> that someone might want to wait on an ack being received before carrying on; but hadn't
> actually needed it in postcopy.
>

OK, i see, Thanks.

> Dave
>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
> .
>
Dr. David Alan Gilbert Oct. 23, 2014, 6 p.m. UTC | #5
* Paolo Bonzini (pbonzini@redhat.com) wrote:
> Il 03/10/2014 19:47, Dr. David Alan Gilbert (git) ha scritto:
> > +/* Source side RP state */
> > +struct MigrationRetPathState {
> > +    uint32_t      latest_ack;
> > +    QemuThread    rp_thread;
> > +    bool          error;
> 
> Should the QemuFile be in here?

Yes, done.

> > +};
> > +
> 
> Also please do not abbrev words, and add a typedef that matches the
> struct if it is useful.  If it is not, just embed the struct without
> giving the type a name (struct { } rp_state).

Done.

> 
> > +static bool migration_already_active(MigrationState *ms)
> > +{
> > +    switch (ms->state) {
> > +    case MIG_STATE_ACTIVE:
> > +    case MIG_STATE_SETUP:
> > +        return true;
> > +
> > +    default:
> > +        return false;
> > +
> > +    }
> > +}
> 
> Should CANCELLING also be considered active?  It is on the source->dest
> path.

Hmm, possibly - but my intention here was just to round up all of the
places that already checked for ACTIVE+SETUP so that I could add POSTCOPY_ACTIVE;
only one of those places also checked for CANCELLING, so I left it out.

> > +static void await_outgoing_return_path_close(MigrationState *ms)
> > +{
> > +    /*
> > +     * If this is a normal exit then the destination will send a SHUT and the
> > +     * rp_thread will exit, however if there's an error we need to cause
> > +     * it to exit, which we can do by a shutdown.
> > +     * (canceling must also shutdown to stop us getting stuck here if
> > +     * the destination died at just the wrong place)
> > +     */
> > +    if (qemu_file_get_error(ms->file) && ms->return_path) {
> > +        qemu_file_shutdown(ms->return_path);
> > +    }
> 
> As mentioned early, I think it's simpler to let these function handle
> themselves the case where there is no return path, and call them
> unconditionally.

I still need to think about that.

Dave

--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Paolo Bonzini Oct. 24, 2014, 10:04 a.m. UTC | #6
On 10/23/2014 08:00 PM, Dr. David Alan Gilbert wrote:
>>> > > +static bool migration_already_active(MigrationState *ms)
>>> > > +{
>>> > > +    switch (ms->state) {
>>> > > +    case MIG_STATE_ACTIVE:
>>> > > +    case MIG_STATE_SETUP:
>>> > > +        return true;
>>> > > +
>>> > > +    default:
>>> > > +        return false;
>>> > > +
>>> > > +    }
>>> > > +}
>> > 
>> > Should CANCELLING also be considered active?  It is on the source->dest
>> > path.
> Hmm, possibly - but my intention here was just to round up all of the
> places that already checked for ACTIVE+SETUP so that I could add POSTCOPY_ACTIVE;
> only one of those places also checked for CANCELLING, so I left it out.

Ok, I would need to check the callers...  There may be bugs waiting to
be fixed. :)  For now I guess it's ok as is.

Thanks for answering my comments!

Paolo
David Gibson Nov. 3, 2014, 3:46 a.m. UTC | #7
On Fri, Oct 03, 2014 at 06:47:22PM +0100, Dr. David Alan Gilbert (git) wrote:
> From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
> 
> Open a return path, and handle messages that are received upon it.
> 
> Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

[snip]
> @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s)
>      int old_state ;
>      trace_migrate_fd_cancel();
>  
> +    if (s->return_path) {
> +        /* shutdown the rp socket, so causing the rp thread to shutdown */
> +        qemu_file_shutdown(s->return_path);

Terminating the rp thread via shutting down its file seems roundabout,
and kind of dependent on the socket file implementation.

[snip]
> +__attribute__ (( unused )) /* Until later in patch series */
> +static int open_outgoing_return_path(MigrationState *ms)
> +{
> +
> +    ms->return_path = qemu_file_get_return_path(ms->file);

So, another reason this get_return_path abstraction doesn't seem right
to me, is that it's not obvious that for non-socket file types, the
source and destination side "get return path" operations would
necessarily be the same.
David Gibson Nov. 3, 2014, 3:47 a.m. UTC | #8
On Thu, Oct 16, 2014 at 04:26:55PM +0800, zhanghailiang wrote:
> On 2014/10/4 1:47, Dr. David Alan Gilbert (git) wrote:
> >From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
[snip]

> >+        case MIG_RPCOMM_ACK:
> >+            tmp32 = be32_to_cpup((uint32_t *)buf);
> >+            DPRINTF("RP: Received ACK 0x%x", tmp32);
> >+            atomic_xchg(&ms->rp_state.latest_ack, tmp32);
> 
> I didn't see *ms->rp_state.latest_ack* been used elsewhere, what's it used for?;)

Also, you don't appear to use tmp32 after that point, so what's the
reason for the exchange, rather than just an assignment?
Dr. David Alan Gilbert Nov. 3, 2014, 1:22 p.m. UTC | #9
* David Gibson (david@gibson.dropbear.id.au) wrote:
> On Fri, Oct 03, 2014 at 06:47:22PM +0100, Dr. David Alan Gilbert (git) wrote:
> > From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
> > 
> > Open a return path, and handle messages that are received upon it.
> > 
> > Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
> 
> [snip]
> > @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s)
> >      int old_state ;
> >      trace_migrate_fd_cancel();
> >  
> > +    if (s->return_path) {
> > +        /* shutdown the rp socket, so causing the rp thread to shutdown */
> > +        qemu_file_shutdown(s->return_path);
> 
> Terminating the rp thread via shutting down its file seems roundabout,
> and kind of dependent on the socket file implementation.

The rp thread might be in the middle of a blocking read()/recv()
so I'm doing a shutdown() to cause those to exit; once I have to do that
anyway it didn't seem necessary to add anything etra.

> [snip]
> > +__attribute__ (( unused )) /* Until later in patch series */
> > +static int open_outgoing_return_path(MigrationState *ms)
> > +{
> > +
> > +    ms->return_path = qemu_file_get_return_path(ms->file);
> 
> So, another reason this get_return_path abstraction doesn't seem right
> to me, is that it's not obvious that for non-socket file types, the
> source and destination side "get return path" operations would
> necessarily be the same.

However, since the implementation of the get_return_path is a method
on the particular implementation, and it can be different for a 
qemu_file opened for read or write, then that non-socket file type
could implement it how it likes including something like shutdown).

Dave

> -- 
> David Gibson			| I'll have my music baroque, and my code
> david AT gibson.dropbear.id.au	| minimalist, thank you.  NOT _the_ _other_
> 				| _way_ _around_!
> http://www.ozlabs.org/~dgibson


--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
David Gibson Nov. 18, 2014, 3:52 a.m. UTC | #10
On Mon, Nov 03, 2014 at 01:22:45PM +0000, Dr. David Alan Gilbert wrote:
> * David Gibson (david@gibson.dropbear.id.au) wrote:
> > On Fri, Oct 03, 2014 at 06:47:22PM +0100, Dr. David Alan Gilbert (git) wrote:
> > > From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
> > > 
> > > Open a return path, and handle messages that are received upon it.
> > > 
> > > Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
> > 
> > [snip]
> > > @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s)
> > >      int old_state ;
> > >      trace_migrate_fd_cancel();
> > >  
> > > +    if (s->return_path) {
> > > +        /* shutdown the rp socket, so causing the rp thread to shutdown */
> > > +        qemu_file_shutdown(s->return_path);
> > 
> > Terminating the rp thread via shutting down its file seems roundabout,
> > and kind of dependent on the socket file implementation.
> 
> The rp thread might be in the middle of a blocking read()/recv()
> so I'm doing a shutdown() to cause those to exit; once I have to do that
> anyway it didn't seem necessary to add anything etra.

Hm.  I don't recall, does the rp thread need to do some cleanup at
this point?  Otherwise pthread_cancel() should kill a thread, even if
it's blocked at the moment.

> > [snip]
> > > +__attribute__ (( unused )) /* Until later in patch series */
> > > +static int open_outgoing_return_path(MigrationState *ms)
> > > +{
> > > +
> > > +    ms->return_path = qemu_file_get_return_path(ms->file);
> > 
> > So, another reason this get_return_path abstraction doesn't seem right
> > to me, is that it's not obvious that for non-socket file types, the
> > source and destination side "get return path" operations would
> > necessarily be the same.
> 
> However, since the implementation of the get_return_path is a method
> on the particular implementation, and it can be different for a 
> qemu_file opened for read or write, then that non-socket file type
> could implement it how it likes including something like shutdown).

So, I'm a little less bothered by this since I realised that QemuFile
is basically only used for migration streams, not for other file type
operations.  The fact that that makes QemuFile a really bad name is a
different matter.

The return path operation is quite specific to a migration stream, and
doesn't really belong with a "file" abstraction.

The case I've been considering where it's not easy to see how to
abstract this is that of a pipe - in that case it will be necessary to
open a second pipe from destination to source, which probably needs
some preliminary work when first opening the connection, and therefore
can't easily be encapsulated into a "get return path" callback.

The abstraction of the shutdown is another question again - I can't
think of any other file type which has an operation similar in effect
to shutdown(), so it seems really socket specific. Which is another
reason I'm not convinced telling the rp thread to die via its stream
is a good idea.
Dr. David Alan Gilbert Nov. 19, 2014, 5:06 p.m. UTC | #11
* David Gibson (david@gibson.dropbear.id.au) wrote:
> On Mon, Nov 03, 2014 at 01:22:45PM +0000, Dr. David Alan Gilbert wrote:
> > * David Gibson (david@gibson.dropbear.id.au) wrote:
> > > On Fri, Oct 03, 2014 at 06:47:22PM +0100, Dr. David Alan Gilbert (git) wrote:
> > > > From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
> > > > 
> > > > Open a return path, and handle messages that are received upon it.
> > > > 
> > > > Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
> > > 
> > > [snip]
> > > > @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s)
> > > >      int old_state ;
> > > >      trace_migrate_fd_cancel();
> > > >  
> > > > +    if (s->return_path) {
> > > > +        /* shutdown the rp socket, so causing the rp thread to shutdown */
> > > > +        qemu_file_shutdown(s->return_path);
> > > 
> > > Terminating the rp thread via shutting down its file seems roundabout,
> > > and kind of dependent on the socket file implementation.
> > 
> > The rp thread might be in the middle of a blocking read()/recv()
> > so I'm doing a shutdown() to cause those to exit; once I have to do that
> > anyway it didn't seem necessary to add anything etra.
> 
> Hm.  I don't recall, does the rp thread need to do some cleanup at
> this point?  Otherwise pthread_cancel() should kill a thread, even if
> it's blocked at the moment.

It was Paolo's idea to use shutdown() and I agree - it works well;
I'd originally thought about using pthread_cancel but it seemed to be
generally disliked - you have to be very careful to either know exactly
the points at which it might be killed (if you use the deferred version)
or be prepared to deal with your thread disappearing at any time and
ensure your data structures are always consistent.  In addition there
was some concern that there was no Windows equivalent to pthread_cancel.

> > > [snip]
> > > > +__attribute__ (( unused )) /* Until later in patch series */
> > > > +static int open_outgoing_return_path(MigrationState *ms)
> > > > +{
> > > > +
> > > > +    ms->return_path = qemu_file_get_return_path(ms->file);
> > > 
> > > So, another reason this get_return_path abstraction doesn't seem right
> > > to me, is that it's not obvious that for non-socket file types, the
> > > source and destination side "get return path" operations would
> > > necessarily be the same.
> > 
> > However, since the implementation of the get_return_path is a method
> > on the particular implementation, and it can be different for a 
> > qemu_file opened for read or write, then that non-socket file type
> > could implement it how it likes including something like shutdown).
> 
> So, I'm a little less bothered by this since I realised that QemuFile
> is basically only used for migration streams, not for other file type
> operations.  The fact that that makes QemuFile a really bad name is a
> different matter.

Yes, but hey we've got FILE* in C anyway, so it might be bad, but it's
not inconsitent.

> The return path operation is quite specific to a migration stream, and
> doesn't really belong with a "file" abstraction.

I think the bit that's specific, is as you say that I don't know whether
I need it until later.

> The case I've been considering where it's not easy to see how to
> abstract this is that of a pipe - in that case it will be necessary to
> open a second pipe from destination to source, which probably needs
> some preliminary work when first opening the connection, and therefore
> can't easily be encapsulated into a "get return path" callback.

I'm OK with some transports not supporting this; I check for it and
error out.  At a higher level I do send an 'open_return_path' command
from src->dest early on to say I'm going to want a return path, I guess
a pipe might be able to open that fd then and pass it back over the original
fd? But that might be hairy.

> The abstraction of the shutdown is another question again - I can't
> think of any other file type which has an operation similar in effect
> to shutdown(), so it seems really socket specific. Which is another
> reason I'm not convinced telling the rp thread to die via its stream
> is a good idea.

I'd be OK with setting some flag or similar at the same time if that
would help; but I don't think there's a safe posix'y way of killing a
thread that might be stuck in a recv()/read() other than shutdown().

Dave

--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
David Gibson Nov. 19, 2014, 9:12 p.m. UTC | #12
On Wed, Nov 19, 2014 at 05:06:50PM +0000, Dr. David Alan Gilbert wrote:
> * David Gibson (david@gibson.dropbear.id.au) wrote:
> > On Mon, Nov 03, 2014 at 01:22:45PM +0000, Dr. David Alan Gilbert wrote:
> > > * David Gibson (david@gibson.dropbear.id.au) wrote:
> > > > On Fri, Oct 03, 2014 at 06:47:22PM +0100, Dr. David Alan Gilbert (git) wrote:
> > > > > From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
> > > > > 
> > > > > Open a return path, and handle messages that are received upon it.
> > > > > 
> > > > > Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
> > > > 
> > > > [snip]
> > > > > @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s)
> > > > >      int old_state ;
> > > > >      trace_migrate_fd_cancel();
> > > > >  
> > > > > +    if (s->return_path) {
> > > > > +        /* shutdown the rp socket, so causing the rp thread to shutdown */
> > > > > +        qemu_file_shutdown(s->return_path);
> > > > 
> > > > Terminating the rp thread via shutting down its file seems roundabout,
> > > > and kind of dependent on the socket file implementation.
> > > 
> > > The rp thread might be in the middle of a blocking read()/recv()
> > > so I'm doing a shutdown() to cause those to exit; once I have to do that
> > > anyway it didn't seem necessary to add anything etra.
> > 
> > Hm.  I don't recall, does the rp thread need to do some cleanup at
> > this point?  Otherwise pthread_cancel() should kill a thread, even if
> > it's blocked at the moment.
> 
> It was Paolo's idea to use shutdown() and I agree - it works well;
> I'd originally thought about using pthread_cancel but it seemed to be
> generally disliked - you have to be very careful to either know exactly
> the points at which it might be killed (if you use the deferred version)
> or be prepared to deal with your thread disappearing at any time and
> ensure your data structures are always consistent.  In addition there
> was some concern that there was no Windows equivalent to pthread_cancel.

Hmm, yeah all right.

> > > > [snip]
> > > > > +__attribute__ (( unused )) /* Until later in patch series */
> > > > > +static int open_outgoing_return_path(MigrationState *ms)
> > > > > +{
> > > > > +
> > > > > +    ms->return_path = qemu_file_get_return_path(ms->file);
> > > > 
> > > > So, another reason this get_return_path abstraction doesn't seem right
> > > > to me, is that it's not obvious that for non-socket file types, the
> > > > source and destination side "get return path" operations would
> > > > necessarily be the same.
> > > 
> > > However, since the implementation of the get_return_path is a method
> > > on the particular implementation, and it can be different for a 
> > > qemu_file opened for read or write, then that non-socket file type
> > > could implement it how it likes including something like shutdown).
> > 
> > So, I'm a little less bothered by this since I realised that QemuFile
> > is basically only used for migration streams, not for other file type
> > operations.  The fact that that makes QemuFile a really bad name is a
> > different matter.
> 
> Yes, but hey we've got FILE* in C anyway, so it might be bad, but it's
> not inconsitent.

Uh.. not following the analogy here, sorry.

> > The return path operation is quite specific to a migration stream, and
> > doesn't really belong with a "file" abstraction.
> 
> I think the bit that's specific, is as you say that I don't know whether
> I need it until later.
> 
> > The case I've been considering where it's not easy to see how to
> > abstract this is that of a pipe - in that case it will be necessary to
> > open a second pipe from destination to source, which probably needs
> > some preliminary work when first opening the connection, and therefore
> > can't easily be encapsulated into a "get return path" callback.
> 
> I'm OK with some transports not supporting this; I check for it and
> error out.  At a higher level I do send an 'open_return_path' command
> from src->dest early on to say I'm going to want a return path, I guess
> a pipe might be able to open that fd then and pass it back over the original
> fd? But that might be hairy.

You can pass fds over Unix sockets, but not over pipes AFAIK.

> > The abstraction of the shutdown is another question again - I can't
> > think of any other file type which has an operation similar in effect
> > to shutdown(), so it seems really socket specific. Which is another
> > reason I'm not convinced telling the rp thread to die via its stream
> > is a good idea.
> 
> I'd be OK with setting some flag or similar at the same time if that
> would help; but I don't think there's a safe posix'y way of killing a
> thread that might be stuck in a recv()/read() other than shutdown().
> 
> Dave
>
Dr. David Alan Gilbert Nov. 25, 2014, 3:44 p.m. UTC | #13
* David Gibson (david@gibson.dropbear.id.au) wrote:
> On Thu, Oct 16, 2014 at 04:26:55PM +0800, zhanghailiang wrote:
> > On 2014/10/4 1:47, Dr. David Alan Gilbert (git) wrote:
> > >From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
> [snip]
> 
> > >+        case MIG_RPCOMM_ACK:
> > >+            tmp32 = be32_to_cpup((uint32_t *)buf);
> > >+            DPRINTF("RP: Received ACK 0x%x", tmp32);
> > >+            atomic_xchg(&ms->rp_state.latest_ack, tmp32);
> > 
> > I didn't see *ms->rp_state.latest_ack* been used elsewhere, what's it used for?;)
> 
> Also, you don't appear to use tmp32 after that point, so what's the
> reason for the exchange, rather than just an assignment?

I've killed the 'latest_ack' off; I've kept the DPRINTF (and might turn it into
a trace).

Dave

> 
> -- 
> David Gibson			| I'll have my music baroque, and my code
> david AT gibson.dropbear.id.au	| minimalist, thank you.  NOT _the_ _other_
> 				| _way_ _around_!
> http://www.ozlabs.org/~dgibson


--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff mbox

Patch

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 12e640d..b87c289 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -47,6 +47,14 @@  enum mig_rpcomm_cmd {
     MIG_RPCOMM_ACK,          /* data (seq: be32 ) */
     MIG_RPCOMM_AFTERLASTVALID
 };
+
+/* Source side RP state */
+struct MigrationRetPathState {
+    uint32_t      latest_ack;
+    QemuThread    rp_thread;
+    bool          error;
+};
+
 typedef struct MigrationState MigrationState;
 
 /* State for the incoming migration */
@@ -69,9 +77,11 @@  struct MigrationState
     QemuThread thread;
     QEMUBH *cleanup_bh;
     QEMUFile *file;
+    QEMUFile *return_path;
 
     int state;
     MigrationParams params;
+    struct MigrationRetPathState rp_state;
     double mbps;
     int64_t total_time;
     int64_t downtime;
diff --git a/migration.c b/migration.c
index 5ba8f3e..ee6db1d 100644
--- a/migration.c
+++ b/migration.c
@@ -246,6 +246,23 @@  MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
     return head;
 }
 
+/*
+ * Return true if we're already in the middle of a migration
+ * (i.e. any of the active or setup states)
+ */
+static bool migration_already_active(MigrationState *ms)
+{
+    switch (ms->state) {
+    case MIG_STATE_ACTIVE:
+    case MIG_STATE_SETUP:
+        return true;
+
+    default:
+        return false;
+
+    }
+}
+
 static void get_xbzrle_cache_stats(MigrationInfo *info)
 {
     if (migrate_use_xbzrle()) {
@@ -371,6 +388,21 @@  static void migrate_set_state(MigrationState *s, int old_state, int new_state)
     }
 }
 
+static void migrate_fd_cleanup_src_rp(MigrationState *ms)
+{
+    QEMUFile *rp = ms->return_path;
+
+    /*
+     * When stuff goes wrong (e.g. failing destination) on the rp, it can get
+     * cleaned up from a few threads; make sure not to do it twice in parallel
+     */
+    rp = atomic_cmpxchg(&ms->return_path, rp, NULL);
+    if (rp) {
+        DPRINTF("cleaning up return path\n");
+        qemu_fclose(rp);
+    }
+}
+
 static void migrate_fd_cleanup(void *opaque)
 {
     MigrationState *s = opaque;
@@ -378,6 +410,8 @@  static void migrate_fd_cleanup(void *opaque)
     qemu_bh_delete(s->cleanup_bh);
     s->cleanup_bh = NULL;
 
+    migrate_fd_cleanup_src_rp(s);
+
     if (s->file) {
         trace_migrate_fd_cleanup();
         qemu_mutex_unlock_iothread();
@@ -414,6 +448,11 @@  static void migrate_fd_cancel(MigrationState *s)
     int old_state ;
     trace_migrate_fd_cancel();
 
+    if (s->return_path) {
+        /* shutdown the rp socket, so causing the rp thread to shutdown */
+        qemu_file_shutdown(s->return_path);
+    }
+
     do {
         old_state = s->state;
         if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) {
@@ -655,8 +694,148 @@  int64_t migrate_xbzrle_cache_size(void)
     return s->xbzrle_cache_size;
 }
 
-/* migration thread support */
+/*
+ * Something bad happened to the RP stream, mark an error
+ * The caller shall print something to indicate why
+ */
+static void source_return_path_bad(MigrationState *s)
+{
+    s->rp_state.error = true;
+    migrate_fd_cleanup_src_rp(s);
+}
 
+/*
+ * Handles messages sent on the return path towards the source VM
+ *
+ */
+static void *source_return_path_thread(void *opaque)
+{
+    MigrationState *ms = opaque;
+    QEMUFile *rp = ms->return_path;
+    uint16_t expected_len, header_len, header_com;
+    const int max_len = 512;
+    uint8_t buf[max_len];
+    uint32_t tmp32;
+    int res;
+
+    DPRINTF("RP: %s entry", __func__);
+    while (rp && !qemu_file_get_error(rp) &&
+        migration_already_active(ms)) {
+        DPRINTF("RP: %s top of loop", __func__);
+        header_com = qemu_get_be16(rp);
+        header_len = qemu_get_be16(rp);
+
+        switch (header_com) {
+        case MIG_RPCOMM_SHUT:
+        case MIG_RPCOMM_ACK:
+            expected_len = 4;
+            break;
+
+        default:
+            error_report("RP: Received invalid cmd 0x%04x length 0x%04x",
+                    header_com, header_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        if (header_len > expected_len) {
+            error_report("RP: Received command 0x%04x with"
+                    "incorrect length %d expecting %d",
+                    header_com, header_len,
+                    expected_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* We know we've got a valid header by this point */
+        res = qemu_get_buffer(rp, buf, header_len);
+        if (res != header_len) {
+            DPRINTF("RP: Failed to read command data");
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* OK, we have the command and the data */
+        switch (header_com) {
+        case MIG_RPCOMM_SHUT:
+            tmp32 = be32_to_cpup((uint32_t *)buf);
+            if (tmp32) {
+                error_report("RP: Sibling indicated error %d", tmp32);
+                source_return_path_bad(ms);
+            } else {
+                DPRINTF("RP: SHUT received");
+            }
+            /*
+             * We'll let the main thread deal with closing the RP
+             * we could do a shutdown(2) on it, but we're the only user
+             * anyway, so there's nothing gained.
+             */
+            goto out;
+
+        case MIG_RPCOMM_ACK:
+            tmp32 = be32_to_cpup((uint32_t *)buf);
+            DPRINTF("RP: Received ACK 0x%x", tmp32);
+            atomic_xchg(&ms->rp_state.latest_ack, tmp32);
+            break;
+
+        default:
+            /* This shouldn't happen because we should catch this above */
+            DPRINTF("RP: Bad header_com in dispatch");
+        }
+        /* Latest command processed, now leave a gap for the next one */
+        header_com = MIG_RPCOMM_INVALID;
+    }
+    if (rp && qemu_file_get_error(rp)) {
+        DPRINTF("%s: rp bad at end", __func__);
+        source_return_path_bad(ms);
+    }
+
+    DPRINTF("%s: Bottom exit", __func__);
+
+out:
+    return NULL;
+}
+
+__attribute__ (( unused )) /* Until later in patch series */
+static int open_outgoing_return_path(MigrationState *ms)
+{
+
+    ms->return_path = qemu_file_get_return_path(ms->file);
+    if (!ms->return_path) {
+        return -1;
+    }
+
+    DPRINTF("%s: starting thread", __func__);
+    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
+                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
+
+    DPRINTF("%s: continuing", __func__);
+
+    return 0;
+}
+
+__attribute__ (( unused )) /* Until later in patch series */
+static void await_outgoing_return_path_close(MigrationState *ms)
+{
+    /*
+     * If this is a normal exit then the destination will send a SHUT and the
+     * rp_thread will exit, however if there's an error we need to cause
+     * it to exit, which we can do by a shutdown.
+     * (canceling must also shutdown to stop us getting stuck here if
+     * the destination died at just the wrong place)
+     */
+    if (qemu_file_get_error(ms->file) && ms->return_path) {
+        qemu_file_shutdown(ms->return_path);
+    }
+    DPRINTF("%s: Joining", __func__);
+    qemu_thread_join(&ms->rp_state.rp_thread);
+    DPRINTF("%s: Exit", __func__);
+}
+
+/*
+ * Master migration thread on the source VM.
+ * It drives the migration and pumps the data down the outgoing channel.
+ */
 static void *migration_thread(void *opaque)
 {
     MigrationState *s = opaque;