Message ID | 20180425112723.1111-17-quintela@redhat.com |
---|---|
State | New |
Headers | show |
Series | Multifd | expand |
* Juan Quintela (quintela@redhat.com) wrote: > We synchronize all threads each RAM_SAVE_FLAG_EOS. Bitmap > synchronizations don't happen inside a ram section, so we are safe > about two channels trying to overwrite the same memory. OK, that's quite neat - so you don't need any extra flags in the stream to do the sync; it probably needs a comment in the code somewhere so we don't forget! > Signed-off-by: Juan Quintela <quintela@redhat.com> > --- > migration/ram.c | 118 +++++++++++++++++++++++++++++++++++++---- > migration/trace-events | 6 +++ > 2 files changed, 113 insertions(+), 11 deletions(-) > > diff --git a/migration/ram.c b/migration/ram.c > index c4c185cc4c..398cb0af3b 100644 > --- a/migration/ram.c > +++ b/migration/ram.c > @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void) > #define MULTIFD_MAGIC 0x11223344U > #define MULTIFD_VERSION 1 > > +#define MULTIFD_FLAG_SYNC (1 << 0) > + > typedef struct { > uint32_t magic; > uint32_t version; > @@ -471,6 +473,8 @@ typedef struct { > uint32_t num_packets; > /* pages sent through this channel */ > uint32_t num_pages; > + /* syncs main thread and channels */ > + QemuSemaphore sem_sync; > } MultiFDSendParams; > > typedef struct { > @@ -507,6 +511,8 @@ typedef struct { > uint32_t num_packets; > /* pages sent through this channel */ > uint32_t num_pages; > + /* syncs main thread and channels */ > + QemuSemaphore sem_sync; > } MultiFDRecvParams; > > static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) > @@ -682,6 +688,10 @@ struct { > int count; > /* array of pages to sent */ > MultiFDPages_t *pages; > + /* syncs main thread and channels */ > + QemuSemaphore sem_sync; > + /* global number of generated multifd packets */ > + uint32_t seq; It's interesting you use the same comment for 'seq' in MultiFDSendParams - but I guess that means only this one is the global version and the others aren't really global number - they're just local to that thread? > } *multifd_send_state; > > static void multifd_send_terminate_threads(Error *err) > @@ -727,6 +737,7 @@ int multifd_save_cleanup(Error **errp) > p->c = NULL; > qemu_mutex_destroy(&p->mutex); > qemu_sem_destroy(&p->sem); > + qemu_sem_destroy(&p->sem_sync); > g_free(p->name); > p->name = NULL; > multifd_pages_clear(p->pages); > @@ -735,6 +746,7 @@ int multifd_save_cleanup(Error **errp) > g_free(p->packet); > p->packet = NULL; > } > + qemu_sem_destroy(&multifd_send_state->sem_sync); > g_free(multifd_send_state->params); > multifd_send_state->params = NULL; > multifd_pages_clear(multifd_send_state->pages); > @@ -744,6 +756,33 @@ int multifd_save_cleanup(Error **errp) > return ret; > } > > +static void multifd_send_sync_main(void) > +{ > + int i; > + > + if (!migrate_use_multifd()) { > + return; > + } > + for (i = 0; i < migrate_multifd_channels(); i++) { > + MultiFDSendParams *p = &multifd_send_state->params[i]; > + > + trace_multifd_send_sync_main_signal(p->id); > + > + qemu_mutex_lock(&p->mutex); > + p->flags |= MULTIFD_FLAG_SYNC; > + p->pending_job++; > + qemu_mutex_unlock(&p->mutex); > + qemu_sem_post(&p->sem); > + } > + for (i = 0; i < migrate_multifd_channels(); i++) { > + MultiFDSendParams *p = &multifd_send_state->params[i]; > + > + trace_multifd_send_sync_main_wait(p->id); > + qemu_sem_wait(&multifd_send_state->sem_sync); > + } > + trace_multifd_send_sync_main(multifd_send_state->seq); > +} > + OK, so this just makes each of the sending threads ack, so that seems OK. But what happens with an error? multifd_send_sync_main exits it's loop with a 'break' if the writes fail, and that could mean they never come and post the flag-sync sem. > static void *multifd_send_thread(void *opaque) > { > MultiFDSendParams *p = opaque; > @@ -778,17 +817,20 @@ static void *multifd_send_thread(void *opaque) > /* ToDo: send packet here */ > > qemu_mutex_lock(&p->mutex); > + p->flags = 0; > p->pending_job--; > qemu_mutex_unlock(&p->mutex); > - continue; > + > + if (flags & MULTIFD_FLAG_SYNC) { > + qemu_sem_post(&multifd_send_state->sem_sync); > + } > } else if (p->quit) { > qemu_mutex_unlock(&p->mutex); > break; > + } else { > + qemu_mutex_unlock(&p->mutex); > + /* sometimes there are spurious wakeups */ > } > - qemu_mutex_unlock(&p->mutex); > - /* this is impossible */ > - error_setg(&local_err, "multifd_send_thread: Unknown command"); > - break; > } > > out: > @@ -840,12 +882,14 @@ int multifd_save_setup(void) > multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); > atomic_set(&multifd_send_state->count, 0); > multifd_pages_init(&multifd_send_state->pages, page_count); > + qemu_sem_init(&multifd_send_state->sem_sync, 0); > > for (i = 0; i < thread_count; i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > qemu_mutex_init(&p->mutex); > qemu_sem_init(&p->sem, 0); > + qemu_sem_init(&p->sem_sync, 0); > p->quit = false; > p->pending_job = 0; > p->id = i; > @@ -863,6 +907,10 @@ struct { > MultiFDRecvParams *params; > /* number of created threads */ > int count; > + /* syncs main thread and channels */ > + QemuSemaphore sem_sync; > + /* global number of generated multifd packets */ > + uint32_t seq; > } *multifd_recv_state; > > static void multifd_recv_terminate_threads(Error *err) > @@ -908,6 +956,7 @@ int multifd_load_cleanup(Error **errp) > p->c = NULL; > qemu_mutex_destroy(&p->mutex); > qemu_sem_destroy(&p->sem); > + qemu_sem_destroy(&p->sem_sync); > g_free(p->name); > p->name = NULL; > multifd_pages_clear(p->pages); > @@ -916,6 +965,7 @@ int multifd_load_cleanup(Error **errp) > g_free(p->packet); > p->packet = NULL; > } > + qemu_sem_destroy(&multifd_recv_state->sem_sync); > g_free(multifd_recv_state->params); > multifd_recv_state->params = NULL; > g_free(multifd_recv_state); > @@ -924,6 +974,42 @@ int multifd_load_cleanup(Error **errp) > return ret; > } > > +static void multifd_recv_sync_main(void) > +{ > + int i; > + > + if (!migrate_use_multifd()) { > + return; > + } > + for (i = 0; i < migrate_multifd_channels(); i++) { > + MultiFDRecvParams *p = &multifd_recv_state->params[i]; > + > + trace_multifd_recv_sync_main_signal(p->id); > + qemu_mutex_lock(&p->mutex); > + p->pending_job = true; > + qemu_mutex_unlock(&p->mutex); > + } > + for (i = 0; i < migrate_multifd_channels(); i++) { > + MultiFDRecvParams *p = &multifd_recv_state->params[i]; > + > + trace_multifd_recv_sync_main_wait(p->id); > + qemu_sem_wait(&multifd_recv_state->sem_sync); > + qemu_mutex_lock(&p->mutex); > + if (multifd_recv_state->seq < p->seq) { > + multifd_recv_state->seq = p->seq; > + } Can you explain what this is for? Something like the latest received block? > + qemu_mutex_unlock(&p->mutex); > + } > + for (i = 0; i < migrate_multifd_channels(); i++) { > + MultiFDRecvParams *p = &multifd_recv_state->params[i]; > + > + trace_multifd_recv_sync_main_signal(p->id); > + > + qemu_sem_post(&p->sem_sync); > + } > + trace_multifd_recv_sync_main(multifd_recv_state->seq); > +} > + > static void *multifd_recv_thread(void *opaque) > { > MultiFDRecvParams *p = opaque; > @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque) > trace_multifd_recv_thread_start(p->id); > > while (true) { > - qemu_sem_wait(&p->sem); > qemu_mutex_lock(&p->mutex); > - if (p->pending_job) { > + if (true || p->pending_job) { A TODO I guess??? > uint32_t used; > uint32_t flags; > qemu_mutex_unlock(&p->mutex); > @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque) > p->num_packets++; > p->num_pages += used; > qemu_mutex_unlock(&p->mutex); > + > + if (flags & MULTIFD_FLAG_SYNC) { > + qemu_sem_post(&multifd_recv_state->sem_sync); > + qemu_sem_wait(&p->sem_sync); > + } Can you explain the receive side logic - I think this is waiting for all receive threads to 'ack' - but how do we know that they've finished receiving all data that was sent? Dave > } else if (p->quit) { > qemu_mutex_unlock(&p->mutex); > break; > + } else { > + qemu_mutex_unlock(&p->mutex); > + /* sometimes there are spurious wakeups */ > } > - qemu_mutex_unlock(&p->mutex); > - /* this is impossible */ > - error_setg(&local_err, "multifd_recv_thread: Unknown command"); > - break; > } > > if (local_err) { > @@ -991,12 +1080,14 @@ int multifd_load_setup(void) > multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); > multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); > atomic_set(&multifd_recv_state->count, 0); > + qemu_sem_init(&multifd_recv_state->sem_sync, 0); > > for (i = 0; i < thread_count; i++) { > MultiFDRecvParams *p = &multifd_recv_state->params[i]; > > qemu_mutex_init(&p->mutex); > qemu_sem_init(&p->sem, 0); > + qemu_sem_init(&p->sem_sync, 0); > p->quit = false; > p->pending_job = false; > p->id = i; > @@ -2695,6 +2786,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque) > ram_control_before_iterate(f, RAM_CONTROL_SETUP); > ram_control_after_iterate(f, RAM_CONTROL_SETUP); > > + multifd_send_sync_main(); > qemu_put_be64(f, RAM_SAVE_FLAG_EOS); > > return 0; > @@ -2770,6 +2862,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) > */ > ram_control_after_iterate(f, RAM_CONTROL_ROUND); > > + multifd_send_sync_main(); > out: > qemu_put_be64(f, RAM_SAVE_FLAG_EOS); > ram_counters.transferred += 8; > @@ -2823,6 +2916,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque) > > rcu_read_unlock(); > > + multifd_send_sync_main(); > qemu_put_be64(f, RAM_SAVE_FLAG_EOS); > > return 0; > @@ -3253,6 +3347,7 @@ static int ram_load_postcopy(QEMUFile *f) > break; > case RAM_SAVE_FLAG_EOS: > /* normal exit */ > + multifd_recv_sync_main(); > break; > default: > error_report("Unknown combination of migration flags: %#x" > @@ -3438,6 +3533,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) > break; > case RAM_SAVE_FLAG_EOS: > /* normal exit */ > + multifd_recv_sync_main(); > break; > default: > if (flags & RAM_SAVE_FLAG_HOOK) { > diff --git a/migration/trace-events b/migration/trace-events > index 9eee048287..b0ab8e2d03 100644 > --- a/migration/trace-events > +++ b/migration/trace-events > @@ -83,6 +83,12 @@ multifd_recv_thread_start(uint8_t id) "%d" > multifd_recv_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d" > multifd_send(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x" > multifd_recv(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x" > +multifd_send_sync_main(uint32_t seq) "seq %d" > +multifd_send_sync_main_signal(uint8_t id) "channel %d" > +multifd_send_sync_main_wait(uint8_t id) "channel %d" > +multifd_recv_sync_main(uint32_t seq) "seq %d" > +multifd_recv_sync_main_signal(uint8_t id) "channel %d" > +multifd_recv_sync_main_wait(uint8_t id) "channel %d" > > # migration/migration.c > await_return_path_close_on_source_close(void) "" > -- > 2.17.0 > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote: > * Juan Quintela (quintela@redhat.com) wrote: >> We synchronize all threads each RAM_SAVE_FLAG_EOS. Bitmap >> synchronizations don't happen inside a ram section, so we are safe >> about two channels trying to overwrite the same memory. > > OK, that's quite neat - so you don't need any extra flags in the stream > to do the sync; it probably needs a comment in the code somewhere so we > don't forget! Thanks. >> Signed-off-by: Juan Quintela <quintela@redhat.com> >> --- >> migration/ram.c | 118 +++++++++++++++++++++++++++++++++++++---- >> migration/trace-events | 6 +++ >> 2 files changed, 113 insertions(+), 11 deletions(-) >> >> diff --git a/migration/ram.c b/migration/ram.c >> index c4c185cc4c..398cb0af3b 100644 >> --- a/migration/ram.c >> +++ b/migration/ram.c >> @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void) >> #define MULTIFD_MAGIC 0x11223344U >> #define MULTIFD_VERSION 1 >> >> +#define MULTIFD_FLAG_SYNC (1 << 0) >> + >> typedef struct { >> uint32_t magic; >> uint32_t version; >> @@ -471,6 +473,8 @@ typedef struct { >> uint32_t num_packets; >> /* pages sent through this channel */ >> uint32_t num_pages; >> + /* syncs main thread and channels */ >> + QemuSemaphore sem_sync; >> } MultiFDSendParams; >> >> typedef struct { >> @@ -507,6 +511,8 @@ typedef struct { >> uint32_t num_packets; >> /* pages sent through this channel */ >> uint32_t num_pages; >> + /* syncs main thread and channels */ >> + QemuSemaphore sem_sync; >> } MultiFDRecvParams; >> >> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) >> @@ -682,6 +688,10 @@ struct { >> int count; >> /* array of pages to sent */ >> MultiFDPages_t *pages; >> + /* syncs main thread and channels */ >> + QemuSemaphore sem_sync; >> + /* global number of generated multifd packets */ >> + uint32_t seq; > > It's interesting you use the same comment for 'seq' in > MultiFDSendParams - but I guess that means only this one is the global > version and the others aren't really global number - they're just > local to that thread? Only place that "increases/generates" seq is multifd_send_pages(), that is what creates a new packet to be sent. So, if we see _any_ packet on the wire, we know the real global ordering. They are only used for traces, to se that packet 42 was sent through channel 3, and on reception you check that packet 42 is what you received through channel 3. They only appears on traces, but I find they useful for debugging synchcronization errors. >> + for (i = 0; i < migrate_multifd_channels(); i++) { >> + MultiFDSendParams *p = &multifd_send_state->params[i]; >> + >> + trace_multifd_send_sync_main_signal(p->id); >> + >> + qemu_mutex_lock(&p->mutex); >> + p->flags |= MULTIFD_FLAG_SYNC; >> + p->pending_job++; >> + qemu_mutex_unlock(&p->mutex); >> + qemu_sem_post(&p->sem); >> + } [1] >> + for (i = 0; i < migrate_multifd_channels(); i++) { >> + MultiFDSendParams *p = &multifd_send_state->params[i]; >> + >> + trace_multifd_send_sync_main_wait(p->id); >> + qemu_sem_wait(&multifd_send_state->sem_sync); >> + } [2] >> + trace_multifd_send_sync_main(multifd_send_state->seq); >> +} >> + > > OK, so this just makes each of the sending threads ack, so that seems > OK. > But what happens with an error? multifd_send_sync_main exits it's > loop with a 'break' if the writes fail, and that could mean they never > come and post the flag-sync sem. Let's see. [1]: we are just doing mutex_lock/sem_post(), if we are not able to do that, we have got a big race that needs to be fixed. So that bit is ok. [2]: We do an unconditional sem_wait(). Looking at the worker code. In this patch level, we are ok, but I agree with you than in later patches, we need to also do the post on the error case. Changing. >> + >> + trace_multifd_recv_sync_main_wait(p->id); >> + qemu_sem_wait(&multifd_recv_state->sem_sync); >> + qemu_mutex_lock(&p->mutex); >> + if (multifd_recv_state->seq < p->seq) { >> + multifd_recv_state->seq = p->seq; >> + } > > Can you explain what this is for? > Something like the latest received block? When we are at a synhronization point, we don't know on the main thread when that synchronization happened (at what packet considered as a logical list of packages). So, we choose 'seq' from the channel with the highest number. That is the one that we want. We only use this for tracing, so we can "match" that we did a synchronization on the send side at packet N and we see the trace at reception side that we did it at packet N also. Remember than in a previous patch you asked me what happened if this does a wark around? At that point nothing. But now I need to change this code to be. multifd_recv_state->seq = 0; for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; ... if (multifd_recv_state->seq < p->seq) { multifd_recv_state->seq = p->seq; } And I have fixed the workaround problem, no? >> @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque) >> trace_multifd_recv_thread_start(p->id); >> >> while (true) { >> - qemu_sem_wait(&p->sem); >> qemu_mutex_lock(&p->mutex); >> - if (p->pending_job) { >> + if (true || p->pending_job) { > > A TODO I guess??? Oops, that should be out. Fixed on next version. >> uint32_t used; >> uint32_t flags; >> qemu_mutex_unlock(&p->mutex); >> @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque) >> p->num_packets++; >> p->num_pages += used; >> qemu_mutex_unlock(&p->mutex); >> + >> + if (flags & MULTIFD_FLAG_SYNC) { >> + qemu_sem_post(&multifd_recv_state->sem_sync); >> + qemu_sem_wait(&p->sem_sync); >> + } > > Can you explain the receive side logic - I think this is waiting for all > receive threads to 'ack' - but how do we know that they've finished > receiving all data that was sent? Because they need to receive a packet with MULTIFD_FLAG_SYNC sent. And if they receive that flag, we know that is the last one of the sequence. synchrconization works like (2 channels to make things easy): main thread: we finish a RAM_SECTION; flush pending packets to one of the channels send packet with MULTIFD_FLAG_SYNC for all the channels wait unil all the channels have processesed the FLAG_SYNC At this point send the RAM_SECTION_EOS footer. worker1 worker 2 if there is a pending packet, send it if there is a pending packet, send it (notice that there can't be more than one ever) send a pacet with SYNC flag set send a pacet with SYNC flag set On recetpion side main thread receives RAM_SECTION_EOS footer wait for works to receive a sync worker1 worker1 process any pending packet(no sync) process any pending packet(no sync) process packet with SYNC process packet with SYNC post main thread post main thread now main thread can continue Notice that we don't care what happens first, receiving packet with SYNC in workeers or RAM_SECTION_EOS on main thread, all works as expected. Noticing how long took to explain this, I think that I am going to add this to migration documentation. Will wait for any question you had before adding it. Later, Juan.
* Juan Quintela (quintela@redhat.com) wrote: > "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote: > > * Juan Quintela (quintela@redhat.com) wrote: > >> We synchronize all threads each RAM_SAVE_FLAG_EOS. Bitmap > >> synchronizations don't happen inside a ram section, so we are safe > >> about two channels trying to overwrite the same memory. > > > > OK, that's quite neat - so you don't need any extra flags in the stream > > to do the sync; it probably needs a comment in the code somewhere so we > > don't forget! > > Thanks. > > >> Signed-off-by: Juan Quintela <quintela@redhat.com> > >> --- > >> migration/ram.c | 118 +++++++++++++++++++++++++++++++++++++---- > >> migration/trace-events | 6 +++ > >> 2 files changed, 113 insertions(+), 11 deletions(-) > >> > >> diff --git a/migration/ram.c b/migration/ram.c > >> index c4c185cc4c..398cb0af3b 100644 > >> --- a/migration/ram.c > >> +++ b/migration/ram.c > >> @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void) > >> #define MULTIFD_MAGIC 0x11223344U > >> #define MULTIFD_VERSION 1 > >> > >> +#define MULTIFD_FLAG_SYNC (1 << 0) > >> + > >> typedef struct { > >> uint32_t magic; > >> uint32_t version; > >> @@ -471,6 +473,8 @@ typedef struct { > >> uint32_t num_packets; > >> /* pages sent through this channel */ > >> uint32_t num_pages; > >> + /* syncs main thread and channels */ > >> + QemuSemaphore sem_sync; > >> } MultiFDSendParams; > >> > >> typedef struct { > >> @@ -507,6 +511,8 @@ typedef struct { > >> uint32_t num_packets; > >> /* pages sent through this channel */ > >> uint32_t num_pages; > >> + /* syncs main thread and channels */ > >> + QemuSemaphore sem_sync; > >> } MultiFDRecvParams; > >> > >> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) > >> @@ -682,6 +688,10 @@ struct { > >> int count; > >> /* array of pages to sent */ > >> MultiFDPages_t *pages; > >> + /* syncs main thread and channels */ > >> + QemuSemaphore sem_sync; > >> + /* global number of generated multifd packets */ > >> + uint32_t seq; > > > > It's interesting you use the same comment for 'seq' in > > MultiFDSendParams - but I guess that means only this one is the global > > version and the others aren't really global number - they're just > > local to that thread? > > Only place that "increases/generates" seq is multifd_send_pages(), that > is what creates a new packet to be sent. So, if we see _any_ packet on > the wire, we know the real global ordering. They are only used for > traces, to se that packet 42 was sent through channel 3, and on > reception you check that packet 42 is what you received through channel > 3. They only appears on traces, but I find they useful for debugging > synchcronization errors. Ah, and multifd_send_pages is the main thread, and it always operates on the multifd_send_state->seq and then passes it to the SendParams; OK. I'm not sure how to explain that better; but it's a little confusing. > >> + for (i = 0; i < migrate_multifd_channels(); i++) { > >> + MultiFDSendParams *p = &multifd_send_state->params[i]; > >> + > >> + trace_multifd_send_sync_main_signal(p->id); > >> + > >> + qemu_mutex_lock(&p->mutex); > >> + p->flags |= MULTIFD_FLAG_SYNC; > >> + p->pending_job++; > >> + qemu_mutex_unlock(&p->mutex); > >> + qemu_sem_post(&p->sem); > >> + } > > [1] > > >> + for (i = 0; i < migrate_multifd_channels(); i++) { > >> + MultiFDSendParams *p = &multifd_send_state->params[i]; > >> + > >> + trace_multifd_send_sync_main_wait(p->id); > >> + qemu_sem_wait(&multifd_send_state->sem_sync); > >> + } > > [2] > > >> + trace_multifd_send_sync_main(multifd_send_state->seq); > >> +} > >> + > > > > OK, so this just makes each of the sending threads ack, so that seems > > OK. > > But what happens with an error? multifd_send_sync_main exits it's > > loop with a 'break' if the writes fail, and that could mean they never > > come and post the flag-sync sem. > > Let's see. > > [1]: we are just doing mutex_lock/sem_post(), if we are not able to do > that, we have got a big race that needs to be fixed. So that bit is ok. > > [2]: We do an unconditional sem_wait(). Looking at the worker code. > In this patch level, we are ok, but I agree with you than in later > patches, we need to also do the post on the error case. Changing. K. > >> + > >> + trace_multifd_recv_sync_main_wait(p->id); > >> + qemu_sem_wait(&multifd_recv_state->sem_sync); > >> + qemu_mutex_lock(&p->mutex); > >> + if (multifd_recv_state->seq < p->seq) { > >> + multifd_recv_state->seq = p->seq; > >> + } > > > > Can you explain what this is for? > > Something like the latest received block? > > When we are at a synhronization point, we don't know on the main thread > when that synchronization happened (at what packet considered as a > logical list of packages). So, we choose 'seq' from the channel with > the highest number. That is the one that we want. We only use this > for tracing, so we can "match" that we did a synchronization on the send > side at packet N and we see the trace at reception side that we did it > at packet N also. OK, I think I see; again, this code is main thread, and it's going around all the subthreads; so it's updating the central copy seeing who has been received - OK. > Remember than in a previous patch you asked me what happened if this > does a wark around? At that point nothing. But now I need to change > this code to be. > > > multifd_recv_state->seq = 0; > for (i = 0; i < migrate_multifd_channels(); i++) { > MultiFDRecvParams *p = &multifd_recv_state->params[i]; > ... > if (multifd_recv_state->seq < p->seq) { > multifd_recv_state->seq = p->seq; > } > > And I have fixed the workaround problem, no? Yes. Adding a note somewhat saying it's just for debug would help as well. > >> @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque) > >> trace_multifd_recv_thread_start(p->id); > >> > >> while (true) { > >> - qemu_sem_wait(&p->sem); > >> qemu_mutex_lock(&p->mutex); > >> - if (p->pending_job) { > >> + if (true || p->pending_job) { > > > > A TODO I guess??? > > Oops, that should be out. > > Fixed on next version. > > >> uint32_t used; > >> uint32_t flags; > >> qemu_mutex_unlock(&p->mutex); > >> @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque) > >> p->num_packets++; > >> p->num_pages += used; > >> qemu_mutex_unlock(&p->mutex); > >> + > >> + if (flags & MULTIFD_FLAG_SYNC) { > >> + qemu_sem_post(&multifd_recv_state->sem_sync); > >> + qemu_sem_wait(&p->sem_sync); > >> + } > > > > Can you explain the receive side logic - I think this is waiting for all > > receive threads to 'ack' - but how do we know that they've finished > > receiving all data that was sent? > > Because they need to receive a packet with MULTIFD_FLAG_SYNC sent. And > if they receive that flag, we know that is the last one of the sequence. > > synchrconization works like (2 channels to make things easy): > > main thread: > we finish a RAM_SECTION; > flush pending packets to one of the channels > send packet with MULTIFD_FLAG_SYNC for all the channels > wait unil all the channels have processesed the FLAG_SYNC > At this point send the RAM_SECTION_EOS footer. > > worker1 worker 2 > > if there is a pending packet, send it if there is a pending packet, send it > (notice that there can't be more than one ever) > send a pacet with SYNC flag set send a pacet with SYNC flag set > > On recetpion side > > > main thread > receives RAM_SECTION_EOS footer > wait for works to receive a sync > > worker1 worker1 > process any pending packet(no sync) process any pending packet(no sync) > process packet with SYNC process packet with SYNC > post main thread post main thread > > now main thread can continue > > Notice that we don't care what happens first, receiving packet with SYNC > in workeers or RAM_SECTION_EOS on main thread, all works as expected. > > Noticing how long took to explain this, I think that I am going to add > this to migration documentation. Will wait for any question you had > before adding it. Thanks; that I think makes sense. Dave > Later, Juan. -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff --git a/migration/ram.c b/migration/ram.c index c4c185cc4c..398cb0af3b 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void) #define MULTIFD_MAGIC 0x11223344U #define MULTIFD_VERSION 1 +#define MULTIFD_FLAG_SYNC (1 << 0) + typedef struct { uint32_t magic; uint32_t version; @@ -471,6 +473,8 @@ typedef struct { uint32_t num_packets; /* pages sent through this channel */ uint32_t num_pages; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; } MultiFDSendParams; typedef struct { @@ -507,6 +511,8 @@ typedef struct { uint32_t num_packets; /* pages sent through this channel */ uint32_t num_pages; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; } MultiFDRecvParams; static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) @@ -682,6 +688,10 @@ struct { int count; /* array of pages to sent */ MultiFDPages_t *pages; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; + /* global number of generated multifd packets */ + uint32_t seq; } *multifd_send_state; static void multifd_send_terminate_threads(Error *err) @@ -727,6 +737,7 @@ int multifd_save_cleanup(Error **errp) p->c = NULL; qemu_mutex_destroy(&p->mutex); qemu_sem_destroy(&p->sem); + qemu_sem_destroy(&p->sem_sync); g_free(p->name); p->name = NULL; multifd_pages_clear(p->pages); @@ -735,6 +746,7 @@ int multifd_save_cleanup(Error **errp) g_free(p->packet); p->packet = NULL; } + qemu_sem_destroy(&multifd_send_state->sem_sync); g_free(multifd_send_state->params); multifd_send_state->params = NULL; multifd_pages_clear(multifd_send_state->pages); @@ -744,6 +756,33 @@ int multifd_save_cleanup(Error **errp) return ret; } +static void multifd_send_sync_main(void) +{ + int i; + + if (!migrate_use_multifd()) { + return; + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + trace_multifd_send_sync_main_signal(p->id); + + qemu_mutex_lock(&p->mutex); + p->flags |= MULTIFD_FLAG_SYNC; + p->pending_job++; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem); + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + trace_multifd_send_sync_main_wait(p->id); + qemu_sem_wait(&multifd_send_state->sem_sync); + } + trace_multifd_send_sync_main(multifd_send_state->seq); +} + static void *multifd_send_thread(void *opaque) { MultiFDSendParams *p = opaque; @@ -778,17 +817,20 @@ static void *multifd_send_thread(void *opaque) /* ToDo: send packet here */ qemu_mutex_lock(&p->mutex); + p->flags = 0; p->pending_job--; qemu_mutex_unlock(&p->mutex); - continue; + + if (flags & MULTIFD_FLAG_SYNC) { + qemu_sem_post(&multifd_send_state->sem_sync); + } } else if (p->quit) { qemu_mutex_unlock(&p->mutex); break; + } else { + qemu_mutex_unlock(&p->mutex); + /* sometimes there are spurious wakeups */ } - qemu_mutex_unlock(&p->mutex); - /* this is impossible */ - error_setg(&local_err, "multifd_send_thread: Unknown command"); - break; } out: @@ -840,12 +882,14 @@ int multifd_save_setup(void) multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); atomic_set(&multifd_send_state->count, 0); multifd_pages_init(&multifd_send_state->pages, page_count); + qemu_sem_init(&multifd_send_state->sem_sync, 0); for (i = 0; i < thread_count; i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem, 0); + qemu_sem_init(&p->sem_sync, 0); p->quit = false; p->pending_job = 0; p->id = i; @@ -863,6 +907,10 @@ struct { MultiFDRecvParams *params; /* number of created threads */ int count; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; + /* global number of generated multifd packets */ + uint32_t seq; } *multifd_recv_state; static void multifd_recv_terminate_threads(Error *err) @@ -908,6 +956,7 @@ int multifd_load_cleanup(Error **errp) p->c = NULL; qemu_mutex_destroy(&p->mutex); qemu_sem_destroy(&p->sem); + qemu_sem_destroy(&p->sem_sync); g_free(p->name); p->name = NULL; multifd_pages_clear(p->pages); @@ -916,6 +965,7 @@ int multifd_load_cleanup(Error **errp) g_free(p->packet); p->packet = NULL; } + qemu_sem_destroy(&multifd_recv_state->sem_sync); g_free(multifd_recv_state->params); multifd_recv_state->params = NULL; g_free(multifd_recv_state); @@ -924,6 +974,42 @@ int multifd_load_cleanup(Error **errp) return ret; } +static void multifd_recv_sync_main(void) +{ + int i; + + if (!migrate_use_multifd()) { + return; + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + trace_multifd_recv_sync_main_signal(p->id); + qemu_mutex_lock(&p->mutex); + p->pending_job = true; + qemu_mutex_unlock(&p->mutex); + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + trace_multifd_recv_sync_main_wait(p->id); + qemu_sem_wait(&multifd_recv_state->sem_sync); + qemu_mutex_lock(&p->mutex); + if (multifd_recv_state->seq < p->seq) { + multifd_recv_state->seq = p->seq; + } + qemu_mutex_unlock(&p->mutex); + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + trace_multifd_recv_sync_main_signal(p->id); + + qemu_sem_post(&p->sem_sync); + } + trace_multifd_recv_sync_main(multifd_recv_state->seq); +} + static void *multifd_recv_thread(void *opaque) { MultiFDRecvParams *p = opaque; @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque) trace_multifd_recv_thread_start(p->id); while (true) { - qemu_sem_wait(&p->sem); qemu_mutex_lock(&p->mutex); - if (p->pending_job) { + if (true || p->pending_job) { uint32_t used; uint32_t flags; qemu_mutex_unlock(&p->mutex); @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque) p->num_packets++; p->num_pages += used; qemu_mutex_unlock(&p->mutex); + + if (flags & MULTIFD_FLAG_SYNC) { + qemu_sem_post(&multifd_recv_state->sem_sync); + qemu_sem_wait(&p->sem_sync); + } } else if (p->quit) { qemu_mutex_unlock(&p->mutex); break; + } else { + qemu_mutex_unlock(&p->mutex); + /* sometimes there are spurious wakeups */ } - qemu_mutex_unlock(&p->mutex); - /* this is impossible */ - error_setg(&local_err, "multifd_recv_thread: Unknown command"); - break; } if (local_err) { @@ -991,12 +1080,14 @@ int multifd_load_setup(void) multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); atomic_set(&multifd_recv_state->count, 0); + qemu_sem_init(&multifd_recv_state->sem_sync, 0); for (i = 0; i < thread_count; i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem, 0); + qemu_sem_init(&p->sem_sync, 0); p->quit = false; p->pending_job = false; p->id = i; @@ -2695,6 +2786,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque) ram_control_before_iterate(f, RAM_CONTROL_SETUP); ram_control_after_iterate(f, RAM_CONTROL_SETUP); + multifd_send_sync_main(); qemu_put_be64(f, RAM_SAVE_FLAG_EOS); return 0; @@ -2770,6 +2862,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) */ ram_control_after_iterate(f, RAM_CONTROL_ROUND); + multifd_send_sync_main(); out: qemu_put_be64(f, RAM_SAVE_FLAG_EOS); ram_counters.transferred += 8; @@ -2823,6 +2916,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque) rcu_read_unlock(); + multifd_send_sync_main(); qemu_put_be64(f, RAM_SAVE_FLAG_EOS); return 0; @@ -3253,6 +3347,7 @@ static int ram_load_postcopy(QEMUFile *f) break; case RAM_SAVE_FLAG_EOS: /* normal exit */ + multifd_recv_sync_main(); break; default: error_report("Unknown combination of migration flags: %#x" @@ -3438,6 +3533,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) break; case RAM_SAVE_FLAG_EOS: /* normal exit */ + multifd_recv_sync_main(); break; default: if (flags & RAM_SAVE_FLAG_HOOK) { diff --git a/migration/trace-events b/migration/trace-events index 9eee048287..b0ab8e2d03 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -83,6 +83,12 @@ multifd_recv_thread_start(uint8_t id) "%d" multifd_recv_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d" multifd_send(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x" multifd_recv(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x" +multifd_send_sync_main(uint32_t seq) "seq %d" +multifd_send_sync_main_signal(uint8_t id) "channel %d" +multifd_send_sync_main_wait(uint8_t id) "channel %d" +multifd_recv_sync_main(uint32_t seq) "seq %d" +multifd_recv_sync_main_signal(uint8_t id) "channel %d" +multifd_recv_sync_main_wait(uint8_t id) "channel %d" # migration/migration.c await_return_path_close_on_source_close(void) ""
We synchronize all threads each RAM_SAVE_FLAG_EOS. Bitmap synchronizations don't happen inside a ram section, so we are safe about two channels trying to overwrite the same memory. Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/ram.c | 118 +++++++++++++++++++++++++++++++++++++---- migration/trace-events | 6 +++ 2 files changed, 113 insertions(+), 11 deletions(-)