Message ID | 1412358473-31398-17-git-send-email-dgilbert@redhat.com |
---|---|
State | New |
Headers | show |
Il 03/10/2014 19:47, Dr. David Alan Gilbert (git) ha scritto: > +/* Source side RP state */ > +struct MigrationRetPathState { > + uint32_t latest_ack; > + QemuThread rp_thread; > + bool error; Should the QemuFile be in here? > +}; > + Also please do not abbrev words, and add a typedef that matches the struct if it is useful. If it is not, just embed the struct without giving the type a name (struct { } rp_state). > +static bool migration_already_active(MigrationState *ms) > +{ > + switch (ms->state) { > + case MIG_STATE_ACTIVE: > + case MIG_STATE_SETUP: > + return true; > + > + default: > + return false; > + > + } > +} Should CANCELLING also be considered active? It is on the source->dest path. > > +static void await_outgoing_return_path_close(MigrationState *ms) > +{ > + /* > + * If this is a normal exit then the destination will send a SHUT and the > + * rp_thread will exit, however if there's an error we need to cause > + * it to exit, which we can do by a shutdown. > + * (canceling must also shutdown to stop us getting stuck here if > + * the destination died at just the wrong place) > + */ > + if (qemu_file_get_error(ms->file) && ms->return_path) { > + qemu_file_shutdown(ms->return_path); > + } As mentioned early, I think it's simpler to let these function handle themselves the case where there is no return path, and call them unconditionally. Paolo
On 2014/10/4 1:47, Dr. David Alan Gilbert (git) wrote: > From: "Dr. David Alan Gilbert" <dgilbert@redhat.com> > > Open a return path, and handle messages that are received upon it. > > Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com> > --- > include/migration/migration.h | 10 +++ > migration.c | 181 +++++++++++++++++++++++++++++++++++++++++- > 2 files changed, 190 insertions(+), 1 deletion(-) > > diff --git a/include/migration/migration.h b/include/migration/migration.h > index 12e640d..b87c289 100644 > --- a/include/migration/migration.h > +++ b/include/migration/migration.h > @@ -47,6 +47,14 @@ enum mig_rpcomm_cmd { > MIG_RPCOMM_ACK, /* data (seq: be32 ) */ > MIG_RPCOMM_AFTERLASTVALID > }; > + > +/* Source side RP state */ > +struct MigrationRetPathState { > + uint32_t latest_ack; > + QemuThread rp_thread; > + bool error; > +}; > + > typedef struct MigrationState MigrationState; > > /* State for the incoming migration */ > @@ -69,9 +77,11 @@ struct MigrationState > QemuThread thread; > QEMUBH *cleanup_bh; > QEMUFile *file; > + QEMUFile *return_path; > > int state; > MigrationParams params; > + struct MigrationRetPathState rp_state; > double mbps; > int64_t total_time; > int64_t downtime; > diff --git a/migration.c b/migration.c > index 5ba8f3e..ee6db1d 100644 > --- a/migration.c > +++ b/migration.c > @@ -246,6 +246,23 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp) > return head; > } > > +/* > + * Return true if we're already in the middle of a migration > + * (i.e. any of the active or setup states) > + */ > +static bool migration_already_active(MigrationState *ms) > +{ > + switch (ms->state) { > + case MIG_STATE_ACTIVE: > + case MIG_STATE_SETUP: > + return true; > + > + default: > + return false; > + > + } > +} > + > static void get_xbzrle_cache_stats(MigrationInfo *info) > { > if (migrate_use_xbzrle()) { > @@ -371,6 +388,21 @@ static void migrate_set_state(MigrationState *s, int old_state, int new_state) > } > } > > +static void migrate_fd_cleanup_src_rp(MigrationState *ms) > +{ > + QEMUFile *rp = ms->return_path; > + > + /* > + * When stuff goes wrong (e.g. failing destination) on the rp, it can get > + * cleaned up from a few threads; make sure not to do it twice in parallel > + */ > + rp = atomic_cmpxchg(&ms->return_path, rp, NULL); > + if (rp) { > + DPRINTF("cleaning up return path\n"); > + qemu_fclose(rp); > + } > +} > + > static void migrate_fd_cleanup(void *opaque) > { > MigrationState *s = opaque; > @@ -378,6 +410,8 @@ static void migrate_fd_cleanup(void *opaque) > qemu_bh_delete(s->cleanup_bh); > s->cleanup_bh = NULL; > > + migrate_fd_cleanup_src_rp(s); > + > if (s->file) { > trace_migrate_fd_cleanup(); > qemu_mutex_unlock_iothread(); > @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s) > int old_state ; > trace_migrate_fd_cancel(); > > + if (s->return_path) { > + /* shutdown the rp socket, so causing the rp thread to shutdown */ > + qemu_file_shutdown(s->return_path); > + } > + > do { > old_state = s->state; > if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) { > @@ -655,8 +694,148 @@ int64_t migrate_xbzrle_cache_size(void) > return s->xbzrle_cache_size; > } > > -/* migration thread support */ > +/* > + * Something bad happened to the RP stream, mark an error > + * The caller shall print something to indicate why > + */ > +static void source_return_path_bad(MigrationState *s) > +{ > + s->rp_state.error = true; > + migrate_fd_cleanup_src_rp(s); > +} > > +/* > + * Handles messages sent on the return path towards the source VM > + * > + */ > +static void *source_return_path_thread(void *opaque) > +{ > + MigrationState *ms = opaque; > + QEMUFile *rp = ms->return_path; > + uint16_t expected_len, header_len, header_com; > + const int max_len = 512; > + uint8_t buf[max_len]; > + uint32_t tmp32; > + int res; > + > + DPRINTF("RP: %s entry", __func__); > + while (rp && !qemu_file_get_error(rp) && > + migration_already_active(ms)) { > + DPRINTF("RP: %s top of loop", __func__); > + header_com = qemu_get_be16(rp); > + header_len = qemu_get_be16(rp); > + > + switch (header_com) { > + case MIG_RPCOMM_SHUT: > + case MIG_RPCOMM_ACK: > + expected_len = 4; > + break; > + > + default: > + error_report("RP: Received invalid cmd 0x%04x length 0x%04x", > + header_com, header_len); > + source_return_path_bad(ms); > + goto out; > + } > + > + if (header_len > expected_len) { > + error_report("RP: Received command 0x%04x with" > + "incorrect length %d expecting %d", > + header_com, header_len, > + expected_len); > + source_return_path_bad(ms); > + goto out; > + } > + > + /* We know we've got a valid header by this point */ > + res = qemu_get_buffer(rp, buf, header_len); > + if (res != header_len) { > + DPRINTF("RP: Failed to read command data"); > + source_return_path_bad(ms); > + goto out; > + } > + > + /* OK, we have the command and the data */ > + switch (header_com) { > + case MIG_RPCOMM_SHUT: > + tmp32 = be32_to_cpup((uint32_t *)buf); > + if (tmp32) { > + error_report("RP: Sibling indicated error %d", tmp32); > + source_return_path_bad(ms); > + } else { > + DPRINTF("RP: SHUT received"); > + } > + /* > + * We'll let the main thread deal with closing the RP > + * we could do a shutdown(2) on it, but we're the only user > + * anyway, so there's nothing gained. > + */ > + goto out; > + > + case MIG_RPCOMM_ACK: > + tmp32 = be32_to_cpup((uint32_t *)buf); > + DPRINTF("RP: Received ACK 0x%x", tmp32); > + atomic_xchg(&ms->rp_state.latest_ack, tmp32); I didn't see *ms->rp_state.latest_ack* been used elsewhere, what's it used for?;) > + break; > + > + default: > + /* This shouldn't happen because we should catch this above */ > + DPRINTF("RP: Bad header_com in dispatch"); > + } > + /* Latest command processed, now leave a gap for the next one */ > + header_com = MIG_RPCOMM_INVALID; > + } > + if (rp && qemu_file_get_error(rp)) { > + DPRINTF("%s: rp bad at end", __func__); > + source_return_path_bad(ms); > + } > + > + DPRINTF("%s: Bottom exit", __func__); > + > +out: > + return NULL; > +} > + > +__attribute__ (( unused )) /* Until later in patch series */ > +static int open_outgoing_return_path(MigrationState *ms) > +{ > + > + ms->return_path = qemu_file_get_return_path(ms->file); > + if (!ms->return_path) { > + return -1; > + } > + > + DPRINTF("%s: starting thread", __func__); > + qemu_thread_create(&ms->rp_state.rp_thread, "return path", > + source_return_path_thread, ms, QEMU_THREAD_JOINABLE); > + > + DPRINTF("%s: continuing", __func__); > + > + return 0; > +} > + > +__attribute__ (( unused )) /* Until later in patch series */ > +static void await_outgoing_return_path_close(MigrationState *ms) > +{ > + /* > + * If this is a normal exit then the destination will send a SHUT and the > + * rp_thread will exit, however if there's an error we need to cause > + * it to exit, which we can do by a shutdown. > + * (canceling must also shutdown to stop us getting stuck here if > + * the destination died at just the wrong place) > + */ > + if (qemu_file_get_error(ms->file) && ms->return_path) { > + qemu_file_shutdown(ms->return_path); > + } > + DPRINTF("%s: Joining", __func__); > + qemu_thread_join(&ms->rp_state.rp_thread); > + DPRINTF("%s: Exit", __func__); > +} > + > +/* > + * Master migration thread on the source VM. > + * It drives the migration and pumps the data down the outgoing channel. > + */ > static void *migration_thread(void *opaque) > { > MigrationState *s = opaque; >
* zhanghailiang (zhang.zhanghailiang@huawei.com) wrote: > >+ case MIG_RPCOMM_ACK: > >+ tmp32 = be32_to_cpup((uint32_t *)buf); > >+ DPRINTF("RP: Received ACK 0x%x", tmp32); > >+ atomic_xchg(&ms->rp_state.latest_ack, tmp32); > > I didn't see *ms->rp_state.latest_ack* been used elsewhere, what's it used for?;) Nothing currently; I've used the REQ/ACK as debug at the moment; I was thinking that someone might want to wait on an ack being received before carrying on; but hadn't actually needed it in postcopy. Dave -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
On 2014/10/16 16:35, Dr. David Alan Gilbert wrote: > * zhanghailiang (zhang.zhanghailiang@huawei.com) wrote: > >>> + case MIG_RPCOMM_ACK: >>> + tmp32 = be32_to_cpup((uint32_t *)buf); >>> + DPRINTF("RP: Received ACK 0x%x", tmp32); >>> + atomic_xchg(&ms->rp_state.latest_ack, tmp32); >> >> I didn't see *ms->rp_state.latest_ack* been used elsewhere, what's it used for?;) > > Nothing currently; I've used the REQ/ACK as debug at the moment; I was thinking > that someone might want to wait on an ack being received before carrying on; but hadn't > actually needed it in postcopy. > OK, i see, Thanks. > Dave > > -- > Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK > > . >
* Paolo Bonzini (pbonzini@redhat.com) wrote: > Il 03/10/2014 19:47, Dr. David Alan Gilbert (git) ha scritto: > > +/* Source side RP state */ > > +struct MigrationRetPathState { > > + uint32_t latest_ack; > > + QemuThread rp_thread; > > + bool error; > > Should the QemuFile be in here? Yes, done. > > +}; > > + > > Also please do not abbrev words, and add a typedef that matches the > struct if it is useful. If it is not, just embed the struct without > giving the type a name (struct { } rp_state). Done. > > > +static bool migration_already_active(MigrationState *ms) > > +{ > > + switch (ms->state) { > > + case MIG_STATE_ACTIVE: > > + case MIG_STATE_SETUP: > > + return true; > > + > > + default: > > + return false; > > + > > + } > > +} > > Should CANCELLING also be considered active? It is on the source->dest > path. Hmm, possibly - but my intention here was just to round up all of the places that already checked for ACTIVE+SETUP so that I could add POSTCOPY_ACTIVE; only one of those places also checked for CANCELLING, so I left it out. > > +static void await_outgoing_return_path_close(MigrationState *ms) > > +{ > > + /* > > + * If this is a normal exit then the destination will send a SHUT and the > > + * rp_thread will exit, however if there's an error we need to cause > > + * it to exit, which we can do by a shutdown. > > + * (canceling must also shutdown to stop us getting stuck here if > > + * the destination died at just the wrong place) > > + */ > > + if (qemu_file_get_error(ms->file) && ms->return_path) { > > + qemu_file_shutdown(ms->return_path); > > + } > > As mentioned early, I think it's simpler to let these function handle > themselves the case where there is no return path, and call them > unconditionally. I still need to think about that. Dave -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
On 10/23/2014 08:00 PM, Dr. David Alan Gilbert wrote: >>> > > +static bool migration_already_active(MigrationState *ms) >>> > > +{ >>> > > + switch (ms->state) { >>> > > + case MIG_STATE_ACTIVE: >>> > > + case MIG_STATE_SETUP: >>> > > + return true; >>> > > + >>> > > + default: >>> > > + return false; >>> > > + >>> > > + } >>> > > +} >> > >> > Should CANCELLING also be considered active? It is on the source->dest >> > path. > Hmm, possibly - but my intention here was just to round up all of the > places that already checked for ACTIVE+SETUP so that I could add POSTCOPY_ACTIVE; > only one of those places also checked for CANCELLING, so I left it out. Ok, I would need to check the callers... There may be bugs waiting to be fixed. :) For now I guess it's ok as is. Thanks for answering my comments! Paolo
On Fri, Oct 03, 2014 at 06:47:22PM +0100, Dr. David Alan Gilbert (git) wrote: > From: "Dr. David Alan Gilbert" <dgilbert@redhat.com> > > Open a return path, and handle messages that are received upon it. > > Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com> [snip] > @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s) > int old_state ; > trace_migrate_fd_cancel(); > > + if (s->return_path) { > + /* shutdown the rp socket, so causing the rp thread to shutdown */ > + qemu_file_shutdown(s->return_path); Terminating the rp thread via shutting down its file seems roundabout, and kind of dependent on the socket file implementation. [snip] > +__attribute__ (( unused )) /* Until later in patch series */ > +static int open_outgoing_return_path(MigrationState *ms) > +{ > + > + ms->return_path = qemu_file_get_return_path(ms->file); So, another reason this get_return_path abstraction doesn't seem right to me, is that it's not obvious that for non-socket file types, the source and destination side "get return path" operations would necessarily be the same.
On Thu, Oct 16, 2014 at 04:26:55PM +0800, zhanghailiang wrote: > On 2014/10/4 1:47, Dr. David Alan Gilbert (git) wrote: > >From: "Dr. David Alan Gilbert" <dgilbert@redhat.com> [snip] > >+ case MIG_RPCOMM_ACK: > >+ tmp32 = be32_to_cpup((uint32_t *)buf); > >+ DPRINTF("RP: Received ACK 0x%x", tmp32); > >+ atomic_xchg(&ms->rp_state.latest_ack, tmp32); > > I didn't see *ms->rp_state.latest_ack* been used elsewhere, what's it used for?;) Also, you don't appear to use tmp32 after that point, so what's the reason for the exchange, rather than just an assignment?
* David Gibson (david@gibson.dropbear.id.au) wrote: > On Fri, Oct 03, 2014 at 06:47:22PM +0100, Dr. David Alan Gilbert (git) wrote: > > From: "Dr. David Alan Gilbert" <dgilbert@redhat.com> > > > > Open a return path, and handle messages that are received upon it. > > > > Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com> > > [snip] > > @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s) > > int old_state ; > > trace_migrate_fd_cancel(); > > > > + if (s->return_path) { > > + /* shutdown the rp socket, so causing the rp thread to shutdown */ > > + qemu_file_shutdown(s->return_path); > > Terminating the rp thread via shutting down its file seems roundabout, > and kind of dependent on the socket file implementation. The rp thread might be in the middle of a blocking read()/recv() so I'm doing a shutdown() to cause those to exit; once I have to do that anyway it didn't seem necessary to add anything etra. > [snip] > > +__attribute__ (( unused )) /* Until later in patch series */ > > +static int open_outgoing_return_path(MigrationState *ms) > > +{ > > + > > + ms->return_path = qemu_file_get_return_path(ms->file); > > So, another reason this get_return_path abstraction doesn't seem right > to me, is that it's not obvious that for non-socket file types, the > source and destination side "get return path" operations would > necessarily be the same. However, since the implementation of the get_return_path is a method on the particular implementation, and it can be different for a qemu_file opened for read or write, then that non-socket file type could implement it how it likes including something like shutdown). Dave > -- > David Gibson | I'll have my music baroque, and my code > david AT gibson.dropbear.id.au | minimalist, thank you. NOT _the_ _other_ > | _way_ _around_! > http://www.ozlabs.org/~dgibson -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
On Mon, Nov 03, 2014 at 01:22:45PM +0000, Dr. David Alan Gilbert wrote: > * David Gibson (david@gibson.dropbear.id.au) wrote: > > On Fri, Oct 03, 2014 at 06:47:22PM +0100, Dr. David Alan Gilbert (git) wrote: > > > From: "Dr. David Alan Gilbert" <dgilbert@redhat.com> > > > > > > Open a return path, and handle messages that are received upon it. > > > > > > Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com> > > > > [snip] > > > @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s) > > > int old_state ; > > > trace_migrate_fd_cancel(); > > > > > > + if (s->return_path) { > > > + /* shutdown the rp socket, so causing the rp thread to shutdown */ > > > + qemu_file_shutdown(s->return_path); > > > > Terminating the rp thread via shutting down its file seems roundabout, > > and kind of dependent on the socket file implementation. > > The rp thread might be in the middle of a blocking read()/recv() > so I'm doing a shutdown() to cause those to exit; once I have to do that > anyway it didn't seem necessary to add anything etra. Hm. I don't recall, does the rp thread need to do some cleanup at this point? Otherwise pthread_cancel() should kill a thread, even if it's blocked at the moment. > > [snip] > > > +__attribute__ (( unused )) /* Until later in patch series */ > > > +static int open_outgoing_return_path(MigrationState *ms) > > > +{ > > > + > > > + ms->return_path = qemu_file_get_return_path(ms->file); > > > > So, another reason this get_return_path abstraction doesn't seem right > > to me, is that it's not obvious that for non-socket file types, the > > source and destination side "get return path" operations would > > necessarily be the same. > > However, since the implementation of the get_return_path is a method > on the particular implementation, and it can be different for a > qemu_file opened for read or write, then that non-socket file type > could implement it how it likes including something like shutdown). So, I'm a little less bothered by this since I realised that QemuFile is basically only used for migration streams, not for other file type operations. The fact that that makes QemuFile a really bad name is a different matter. The return path operation is quite specific to a migration stream, and doesn't really belong with a "file" abstraction. The case I've been considering where it's not easy to see how to abstract this is that of a pipe - in that case it will be necessary to open a second pipe from destination to source, which probably needs some preliminary work when first opening the connection, and therefore can't easily be encapsulated into a "get return path" callback. The abstraction of the shutdown is another question again - I can't think of any other file type which has an operation similar in effect to shutdown(), so it seems really socket specific. Which is another reason I'm not convinced telling the rp thread to die via its stream is a good idea.
* David Gibson (david@gibson.dropbear.id.au) wrote: > On Mon, Nov 03, 2014 at 01:22:45PM +0000, Dr. David Alan Gilbert wrote: > > * David Gibson (david@gibson.dropbear.id.au) wrote: > > > On Fri, Oct 03, 2014 at 06:47:22PM +0100, Dr. David Alan Gilbert (git) wrote: > > > > From: "Dr. David Alan Gilbert" <dgilbert@redhat.com> > > > > > > > > Open a return path, and handle messages that are received upon it. > > > > > > > > Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com> > > > > > > [snip] > > > > @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s) > > > > int old_state ; > > > > trace_migrate_fd_cancel(); > > > > > > > > + if (s->return_path) { > > > > + /* shutdown the rp socket, so causing the rp thread to shutdown */ > > > > + qemu_file_shutdown(s->return_path); > > > > > > Terminating the rp thread via shutting down its file seems roundabout, > > > and kind of dependent on the socket file implementation. > > > > The rp thread might be in the middle of a blocking read()/recv() > > so I'm doing a shutdown() to cause those to exit; once I have to do that > > anyway it didn't seem necessary to add anything etra. > > Hm. I don't recall, does the rp thread need to do some cleanup at > this point? Otherwise pthread_cancel() should kill a thread, even if > it's blocked at the moment. It was Paolo's idea to use shutdown() and I agree - it works well; I'd originally thought about using pthread_cancel but it seemed to be generally disliked - you have to be very careful to either know exactly the points at which it might be killed (if you use the deferred version) or be prepared to deal with your thread disappearing at any time and ensure your data structures are always consistent. In addition there was some concern that there was no Windows equivalent to pthread_cancel. > > > [snip] > > > > +__attribute__ (( unused )) /* Until later in patch series */ > > > > +static int open_outgoing_return_path(MigrationState *ms) > > > > +{ > > > > + > > > > + ms->return_path = qemu_file_get_return_path(ms->file); > > > > > > So, another reason this get_return_path abstraction doesn't seem right > > > to me, is that it's not obvious that for non-socket file types, the > > > source and destination side "get return path" operations would > > > necessarily be the same. > > > > However, since the implementation of the get_return_path is a method > > on the particular implementation, and it can be different for a > > qemu_file opened for read or write, then that non-socket file type > > could implement it how it likes including something like shutdown). > > So, I'm a little less bothered by this since I realised that QemuFile > is basically only used for migration streams, not for other file type > operations. The fact that that makes QemuFile a really bad name is a > different matter. Yes, but hey we've got FILE* in C anyway, so it might be bad, but it's not inconsitent. > The return path operation is quite specific to a migration stream, and > doesn't really belong with a "file" abstraction. I think the bit that's specific, is as you say that I don't know whether I need it until later. > The case I've been considering where it's not easy to see how to > abstract this is that of a pipe - in that case it will be necessary to > open a second pipe from destination to source, which probably needs > some preliminary work when first opening the connection, and therefore > can't easily be encapsulated into a "get return path" callback. I'm OK with some transports not supporting this; I check for it and error out. At a higher level I do send an 'open_return_path' command from src->dest early on to say I'm going to want a return path, I guess a pipe might be able to open that fd then and pass it back over the original fd? But that might be hairy. > The abstraction of the shutdown is another question again - I can't > think of any other file type which has an operation similar in effect > to shutdown(), so it seems really socket specific. Which is another > reason I'm not convinced telling the rp thread to die via its stream > is a good idea. I'd be OK with setting some flag or similar at the same time if that would help; but I don't think there's a safe posix'y way of killing a thread that might be stuck in a recv()/read() other than shutdown(). Dave -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
On Wed, Nov 19, 2014 at 05:06:50PM +0000, Dr. David Alan Gilbert wrote: > * David Gibson (david@gibson.dropbear.id.au) wrote: > > On Mon, Nov 03, 2014 at 01:22:45PM +0000, Dr. David Alan Gilbert wrote: > > > * David Gibson (david@gibson.dropbear.id.au) wrote: > > > > On Fri, Oct 03, 2014 at 06:47:22PM +0100, Dr. David Alan Gilbert (git) wrote: > > > > > From: "Dr. David Alan Gilbert" <dgilbert@redhat.com> > > > > > > > > > > Open a return path, and handle messages that are received upon it. > > > > > > > > > > Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com> > > > > > > > > [snip] > > > > > @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s) > > > > > int old_state ; > > > > > trace_migrate_fd_cancel(); > > > > > > > > > > + if (s->return_path) { > > > > > + /* shutdown the rp socket, so causing the rp thread to shutdown */ > > > > > + qemu_file_shutdown(s->return_path); > > > > > > > > Terminating the rp thread via shutting down its file seems roundabout, > > > > and kind of dependent on the socket file implementation. > > > > > > The rp thread might be in the middle of a blocking read()/recv() > > > so I'm doing a shutdown() to cause those to exit; once I have to do that > > > anyway it didn't seem necessary to add anything etra. > > > > Hm. I don't recall, does the rp thread need to do some cleanup at > > this point? Otherwise pthread_cancel() should kill a thread, even if > > it's blocked at the moment. > > It was Paolo's idea to use shutdown() and I agree - it works well; > I'd originally thought about using pthread_cancel but it seemed to be > generally disliked - you have to be very careful to either know exactly > the points at which it might be killed (if you use the deferred version) > or be prepared to deal with your thread disappearing at any time and > ensure your data structures are always consistent. In addition there > was some concern that there was no Windows equivalent to pthread_cancel. Hmm, yeah all right. > > > > [snip] > > > > > +__attribute__ (( unused )) /* Until later in patch series */ > > > > > +static int open_outgoing_return_path(MigrationState *ms) > > > > > +{ > > > > > + > > > > > + ms->return_path = qemu_file_get_return_path(ms->file); > > > > > > > > So, another reason this get_return_path abstraction doesn't seem right > > > > to me, is that it's not obvious that for non-socket file types, the > > > > source and destination side "get return path" operations would > > > > necessarily be the same. > > > > > > However, since the implementation of the get_return_path is a method > > > on the particular implementation, and it can be different for a > > > qemu_file opened for read or write, then that non-socket file type > > > could implement it how it likes including something like shutdown). > > > > So, I'm a little less bothered by this since I realised that QemuFile > > is basically only used for migration streams, not for other file type > > operations. The fact that that makes QemuFile a really bad name is a > > different matter. > > Yes, but hey we've got FILE* in C anyway, so it might be bad, but it's > not inconsitent. Uh.. not following the analogy here, sorry. > > The return path operation is quite specific to a migration stream, and > > doesn't really belong with a "file" abstraction. > > I think the bit that's specific, is as you say that I don't know whether > I need it until later. > > > The case I've been considering where it's not easy to see how to > > abstract this is that of a pipe - in that case it will be necessary to > > open a second pipe from destination to source, which probably needs > > some preliminary work when first opening the connection, and therefore > > can't easily be encapsulated into a "get return path" callback. > > I'm OK with some transports not supporting this; I check for it and > error out. At a higher level I do send an 'open_return_path' command > from src->dest early on to say I'm going to want a return path, I guess > a pipe might be able to open that fd then and pass it back over the original > fd? But that might be hairy. You can pass fds over Unix sockets, but not over pipes AFAIK. > > The abstraction of the shutdown is another question again - I can't > > think of any other file type which has an operation similar in effect > > to shutdown(), so it seems really socket specific. Which is another > > reason I'm not convinced telling the rp thread to die via its stream > > is a good idea. > > I'd be OK with setting some flag or similar at the same time if that > would help; but I don't think there's a safe posix'y way of killing a > thread that might be stuck in a recv()/read() other than shutdown(). > > Dave >
* David Gibson (david@gibson.dropbear.id.au) wrote: > On Thu, Oct 16, 2014 at 04:26:55PM +0800, zhanghailiang wrote: > > On 2014/10/4 1:47, Dr. David Alan Gilbert (git) wrote: > > >From: "Dr. David Alan Gilbert" <dgilbert@redhat.com> > [snip] > > > >+ case MIG_RPCOMM_ACK: > > >+ tmp32 = be32_to_cpup((uint32_t *)buf); > > >+ DPRINTF("RP: Received ACK 0x%x", tmp32); > > >+ atomic_xchg(&ms->rp_state.latest_ack, tmp32); > > > > I didn't see *ms->rp_state.latest_ack* been used elsewhere, what's it used for?;) > > Also, you don't appear to use tmp32 after that point, so what's the > reason for the exchange, rather than just an assignment? I've killed the 'latest_ack' off; I've kept the DPRINTF (and might turn it into a trace). Dave > > -- > David Gibson | I'll have my music baroque, and my code > david AT gibson.dropbear.id.au | minimalist, thank you. NOT _the_ _other_ > | _way_ _around_! > http://www.ozlabs.org/~dgibson -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff --git a/include/migration/migration.h b/include/migration/migration.h index 12e640d..b87c289 100644 --- a/include/migration/migration.h +++ b/include/migration/migration.h @@ -47,6 +47,14 @@ enum mig_rpcomm_cmd { MIG_RPCOMM_ACK, /* data (seq: be32 ) */ MIG_RPCOMM_AFTERLASTVALID }; + +/* Source side RP state */ +struct MigrationRetPathState { + uint32_t latest_ack; + QemuThread rp_thread; + bool error; +}; + typedef struct MigrationState MigrationState; /* State for the incoming migration */ @@ -69,9 +77,11 @@ struct MigrationState QemuThread thread; QEMUBH *cleanup_bh; QEMUFile *file; + QEMUFile *return_path; int state; MigrationParams params; + struct MigrationRetPathState rp_state; double mbps; int64_t total_time; int64_t downtime; diff --git a/migration.c b/migration.c index 5ba8f3e..ee6db1d 100644 --- a/migration.c +++ b/migration.c @@ -246,6 +246,23 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp) return head; } +/* + * Return true if we're already in the middle of a migration + * (i.e. any of the active or setup states) + */ +static bool migration_already_active(MigrationState *ms) +{ + switch (ms->state) { + case MIG_STATE_ACTIVE: + case MIG_STATE_SETUP: + return true; + + default: + return false; + + } +} + static void get_xbzrle_cache_stats(MigrationInfo *info) { if (migrate_use_xbzrle()) { @@ -371,6 +388,21 @@ static void migrate_set_state(MigrationState *s, int old_state, int new_state) } } +static void migrate_fd_cleanup_src_rp(MigrationState *ms) +{ + QEMUFile *rp = ms->return_path; + + /* + * When stuff goes wrong (e.g. failing destination) on the rp, it can get + * cleaned up from a few threads; make sure not to do it twice in parallel + */ + rp = atomic_cmpxchg(&ms->return_path, rp, NULL); + if (rp) { + DPRINTF("cleaning up return path\n"); + qemu_fclose(rp); + } +} + static void migrate_fd_cleanup(void *opaque) { MigrationState *s = opaque; @@ -378,6 +410,8 @@ static void migrate_fd_cleanup(void *opaque) qemu_bh_delete(s->cleanup_bh); s->cleanup_bh = NULL; + migrate_fd_cleanup_src_rp(s); + if (s->file) { trace_migrate_fd_cleanup(); qemu_mutex_unlock_iothread(); @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s) int old_state ; trace_migrate_fd_cancel(); + if (s->return_path) { + /* shutdown the rp socket, so causing the rp thread to shutdown */ + qemu_file_shutdown(s->return_path); + } + do { old_state = s->state; if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) { @@ -655,8 +694,148 @@ int64_t migrate_xbzrle_cache_size(void) return s->xbzrle_cache_size; } -/* migration thread support */ +/* + * Something bad happened to the RP stream, mark an error + * The caller shall print something to indicate why + */ +static void source_return_path_bad(MigrationState *s) +{ + s->rp_state.error = true; + migrate_fd_cleanup_src_rp(s); +} +/* + * Handles messages sent on the return path towards the source VM + * + */ +static void *source_return_path_thread(void *opaque) +{ + MigrationState *ms = opaque; + QEMUFile *rp = ms->return_path; + uint16_t expected_len, header_len, header_com; + const int max_len = 512; + uint8_t buf[max_len]; + uint32_t tmp32; + int res; + + DPRINTF("RP: %s entry", __func__); + while (rp && !qemu_file_get_error(rp) && + migration_already_active(ms)) { + DPRINTF("RP: %s top of loop", __func__); + header_com = qemu_get_be16(rp); + header_len = qemu_get_be16(rp); + + switch (header_com) { + case MIG_RPCOMM_SHUT: + case MIG_RPCOMM_ACK: + expected_len = 4; + break; + + default: + error_report("RP: Received invalid cmd 0x%04x length 0x%04x", + header_com, header_len); + source_return_path_bad(ms); + goto out; + } + + if (header_len > expected_len) { + error_report("RP: Received command 0x%04x with" + "incorrect length %d expecting %d", + header_com, header_len, + expected_len); + source_return_path_bad(ms); + goto out; + } + + /* We know we've got a valid header by this point */ + res = qemu_get_buffer(rp, buf, header_len); + if (res != header_len) { + DPRINTF("RP: Failed to read command data"); + source_return_path_bad(ms); + goto out; + } + + /* OK, we have the command and the data */ + switch (header_com) { + case MIG_RPCOMM_SHUT: + tmp32 = be32_to_cpup((uint32_t *)buf); + if (tmp32) { + error_report("RP: Sibling indicated error %d", tmp32); + source_return_path_bad(ms); + } else { + DPRINTF("RP: SHUT received"); + } + /* + * We'll let the main thread deal with closing the RP + * we could do a shutdown(2) on it, but we're the only user + * anyway, so there's nothing gained. + */ + goto out; + + case MIG_RPCOMM_ACK: + tmp32 = be32_to_cpup((uint32_t *)buf); + DPRINTF("RP: Received ACK 0x%x", tmp32); + atomic_xchg(&ms->rp_state.latest_ack, tmp32); + break; + + default: + /* This shouldn't happen because we should catch this above */ + DPRINTF("RP: Bad header_com in dispatch"); + } + /* Latest command processed, now leave a gap for the next one */ + header_com = MIG_RPCOMM_INVALID; + } + if (rp && qemu_file_get_error(rp)) { + DPRINTF("%s: rp bad at end", __func__); + source_return_path_bad(ms); + } + + DPRINTF("%s: Bottom exit", __func__); + +out: + return NULL; +} + +__attribute__ (( unused )) /* Until later in patch series */ +static int open_outgoing_return_path(MigrationState *ms) +{ + + ms->return_path = qemu_file_get_return_path(ms->file); + if (!ms->return_path) { + return -1; + } + + DPRINTF("%s: starting thread", __func__); + qemu_thread_create(&ms->rp_state.rp_thread, "return path", + source_return_path_thread, ms, QEMU_THREAD_JOINABLE); + + DPRINTF("%s: continuing", __func__); + + return 0; +} + +__attribute__ (( unused )) /* Until later in patch series */ +static void await_outgoing_return_path_close(MigrationState *ms) +{ + /* + * If this is a normal exit then the destination will send a SHUT and the + * rp_thread will exit, however if there's an error we need to cause + * it to exit, which we can do by a shutdown. + * (canceling must also shutdown to stop us getting stuck here if + * the destination died at just the wrong place) + */ + if (qemu_file_get_error(ms->file) && ms->return_path) { + qemu_file_shutdown(ms->return_path); + } + DPRINTF("%s: Joining", __func__); + qemu_thread_join(&ms->rp_state.rp_thread); + DPRINTF("%s: Exit", __func__); +} + +/* + * Master migration thread on the source VM. + * It drives the migration and pumps the data down the outgoing channel. + */ static void *migration_thread(void *opaque) { MigrationState *s = opaque;