[v5,17/17] migration: Flush receive queue

Message ID: 20170717134238.1966-18-quintela@redhat.com
State: New

Commit Message

Juan Quintela July 17, 2017, 1:42 p.m. UTC
Each time that we sync the bitmap, there is a possibility that we receive
a page that is being processed by a different thread.  We fix this
problem by making sure that we wait for all receiving threads to
finish their work before we proceed with the next stage.

We are low on page flags, so we use a combination that is not valid to
emit that message:  MULTIFD_PAGE and COMPRESSED.

I tried to make a migration command for it, but it doesn't work because
we sync the bitmap sometimes when we have already sent the beginning
of the section, so I just added a new page flag.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 56 insertions(+), 1 deletion(-)

Comments

Dr. David Alan Gilbert July 20, 2017, 11:45 a.m. UTC | #1
* Juan Quintela (quintela@redhat.com) wrote:
> Each time that we sync the bitmap, there is a possibility that we receive
> a page that is being processed by a different thread.  We fix this
> problem by making sure that we wait for all receiving threads to
> finish their work before we proceed with the next stage.
> 
> We are low on page flags, so we use a combination that is not valid to
> emit that message:  MULTIFD_PAGE and COMPRESSED.
> 
> I tried to make a migration command for it, but it doesn't work because
> we sync the bitmap sometimes when we have already sent the beginning
> of the section, so I just added a new page flag.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 56 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index c78b286..bffe204 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -71,6 +71,12 @@
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
>  
> +/* We are getting low on page flags, so we start using combinations.
> +   When we need to flush a page, we send it as
> +   RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS_PAGE;
> +   we don't allow that combination otherwise.
> +*/
> +
>  static inline bool is_zero_range(uint8_t *p, uint64_t size)
>  {
>      return buffer_is_zero(p, size);
> @@ -193,6 +199,9 @@ struct RAMState {
>      uint64_t iterations_prev;
>      /* Iterations since start */
>      uint64_t iterations;
> +    /* Indicates that we have synced the bitmap and need to ensure that
> +       the target has processed all previous pages */
> +    bool multifd_needs_flush;
>      /* protects modification of the bitmap */
>      uint64_t migration_dirty_pages;
>      /* number of dirty bits in the bitmap */
> @@ -363,7 +372,6 @@ static void compress_threads_save_setup(void)
>  
>  /* Multiple fd's */
>  
> -
>  typedef struct {
>      int num;
>      int size;
> @@ -595,9 +603,11 @@ struct MultiFDRecvParams {
>      QIOChannel *c;
>      QemuSemaphore ready;
>      QemuSemaphore sem;
> +    QemuCond cond_sync;
>      QemuMutex mutex;
>      /* protected by param mutex */
>      bool quit;
> +    bool sync;
>      multifd_pages_t pages;
>      bool done;
>  };
> @@ -637,6 +647,7 @@ void multifd_load_cleanup(void)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        qemu_cond_destroy(&p->cond_sync);
>          socket_recv_channel_destroy(p->c);
>          g_free(p);
>          multifd_recv_state->params[i] = NULL;
> @@ -675,6 +686,10 @@ static void *multifd_recv_thread(void *opaque)
>                  return NULL;
>              }
>              p->done = true;
> +            if (p->sync) {
> +                qemu_cond_signal(&p->cond_sync);
> +                p->sync = false;
> +            }
>              qemu_mutex_unlock(&p->mutex);
>              qemu_sem_post(&p->ready);
>              continue;
> @@ -724,9 +739,11 @@ gboolean multifd_new_channel(QIOChannel *ioc)
>      qemu_mutex_init(&p->mutex);
>      qemu_sem_init(&p->sem, 0);
>      qemu_sem_init(&p->ready, 0);
> +    qemu_cond_init(&p->cond_sync);
>      p->quit = false;
>      p->id = id;
>      p->done = false;
> +    p->sync = false;
>      multifd_init_group(&p->pages);
>      p->c = ioc;
>      atomic_set(&multifd_recv_state->params[id], p);
> @@ -792,6 +809,27 @@ static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
>      qemu_sem_post(&p->sem);
>  }
>  
> +static int multifd_flush(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_use_multifd()) {
> +        return 0;
> +    }
> +    thread_count = migrate_multifd_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        MultiFDRecvParams *p = multifd_recv_state->params[i];
> +
> +        qemu_mutex_lock(&p->mutex);
> +        while (!p->done) {
> +            p->sync = true;
> +            qemu_cond_wait(&p->cond_sync, &p->mutex);
> +        }

I don't think I understand how that works in the case where the
recv_thread has already 'done' by the time you set sync=true; how does
it get back to the check and do the signal?

> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    return 0;
> +}
> +
>  /**
>   * save_page_header: write page header to wire
>   *
> @@ -809,6 +847,12 @@ static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
>  {
>      size_t size, len;
>  
> +    if (rs->multifd_needs_flush &&
> +        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> +        offset |= RAM_SAVE_FLAG_ZERO;

In the comment near the top you say RAM_SAVE_FLAG_COMPRESS_PAGE;  it's
probably best to add an alias at the top to make it clear, e.g.
  #define RAM_SAVE_FLAG_MULTIFD_SYNC RAM_SAVE_FLAG_ZERO

  or maybe (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)

> +        rs->multifd_needs_flush = false;
> +    }
> +
>      if (block == rs->last_sent_block) {
>          offset |= RAM_SAVE_FLAG_CONTINUE;
>      }
> @@ -2496,6 +2540,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>  
>      if (!migration_in_postcopy()) {
>          migration_bitmap_sync(rs);
> +        if (migrate_use_multifd()) {
> +            rs->multifd_needs_flush = true;
> +        }
>      }
>  
>      ram_control_before_iterate(f, RAM_CONTROL_FINISH);
> @@ -2538,6 +2585,9 @@ static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
>          qemu_mutex_lock_iothread();
>          rcu_read_lock();
>          migration_bitmap_sync(rs);
> +        if (migrate_use_multifd()) {
> +            rs->multifd_needs_flush = true;
> +        }
>          rcu_read_unlock();
>          qemu_mutex_unlock_iothread();
>          remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
> @@ -3012,6 +3062,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              break;
>          }
>  
> +        if ((flags & (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO))
> +                  == (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)) {
> +            multifd_flush();
> +            flags = flags & ~RAM_SAVE_FLAG_ZERO;
> +        }
>          if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
>                       RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
>                       RAM_SAVE_FLAG_MULTIFD_PAGE)) {

Dave

> -- 
> 2.9.4
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Peter Xu July 21, 2017, 2:40 a.m. UTC | #2
On Mon, Jul 17, 2017 at 03:42:38PM +0200, Juan Quintela wrote:
> Each time that we sync the bitmap, there is a possibility that we receive
> a page that is being processed by a different thread.  We fix this
> problem by making sure that we wait for all receiving threads to
> finish their work before we proceed with the next stage.
> 
> We are low on page flags, so we use a combination that is not valid to
> emit that message:  MULTIFD_PAGE and COMPRESSED.
> 
> I tried to make a migration command for it, but it doesn't work because
> we sync the bitmap sometimes when we have already sent the beginning
> of the section, so I just added a new page flag.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 56 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index c78b286..bffe204 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -71,6 +71,12 @@
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
>  
> +/* We are getting low on page flags, so we start using combinations.
> +   When we need to flush a page, we send it as
> +   RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS_PAGE;
> +   we don't allow that combination otherwise.
> +*/
> +
>  static inline bool is_zero_range(uint8_t *p, uint64_t size)
>  {
>      return buffer_is_zero(p, size);
> @@ -193,6 +199,9 @@ struct RAMState {
>      uint64_t iterations_prev;
>      /* Iterations since start */
>      uint64_t iterations;
> +    /* Indicates that we have synced the bitmap and need to ensure that
> +       the target has processed all previous pages */
> +    bool multifd_needs_flush;
>      /* protects modification of the bitmap */
>      uint64_t migration_dirty_pages;
>      /* number of dirty bits in the bitmap */
> @@ -363,7 +372,6 @@ static void compress_threads_save_setup(void)
>  
>  /* Multiple fd's */
>  
> -
>  typedef struct {
>      int num;
>      int size;
> @@ -595,9 +603,11 @@ struct MultiFDRecvParams {
>      QIOChannel *c;
>      QemuSemaphore ready;
>      QemuSemaphore sem;
> +    QemuCond cond_sync;
>      QemuMutex mutex;
>      /* protected by param mutex */
>      bool quit;
> +    bool sync;
>      multifd_pages_t pages;
>      bool done;
>  };
> @@ -637,6 +647,7 @@ void multifd_load_cleanup(void)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        qemu_cond_destroy(&p->cond_sync);
>          socket_recv_channel_destroy(p->c);
>          g_free(p);
>          multifd_recv_state->params[i] = NULL;
> @@ -675,6 +686,10 @@ static void *multifd_recv_thread(void *opaque)
>                  return NULL;
>              }
>              p->done = true;
> +            if (p->sync) {
> +                qemu_cond_signal(&p->cond_sync);
> +                p->sync = false;
> +            }

Could we use the same p->ready for this purpose? They look similar:
all we want to do is to let the main thread know "worker thread has
finished receiving the last piece and becomes idle again", right?

>              qemu_mutex_unlock(&p->mutex);
>              qemu_sem_post(&p->ready);
>              continue;
> @@ -724,9 +739,11 @@ gboolean multifd_new_channel(QIOChannel *ioc)
>      qemu_mutex_init(&p->mutex);
>      qemu_sem_init(&p->sem, 0);
>      qemu_sem_init(&p->ready, 0);
> +    qemu_cond_init(&p->cond_sync);
>      p->quit = false;
>      p->id = id;
>      p->done = false;
> +    p->sync = false;
>      multifd_init_group(&p->pages);
>      p->c = ioc;
>      atomic_set(&multifd_recv_state->params[id], p);
> @@ -792,6 +809,27 @@ static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
>      qemu_sem_post(&p->sem);
>  }
>  
> +static int multifd_flush(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_use_multifd()) {
> +        return 0;
> +    }
> +    thread_count = migrate_multifd_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        MultiFDRecvParams *p = multifd_recv_state->params[i];
> +
> +        qemu_mutex_lock(&p->mutex);
> +        while (!p->done) {
> +            p->sync = true;
> +            qemu_cond_wait(&p->cond_sync, &p->mutex);

(similar comment as above)

> +        }
> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    return 0;
> +}
> +
>  /**
>   * save_page_header: write page header to wire
>   *
> @@ -809,6 +847,12 @@ static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
>  {
>      size_t size, len;
>  
> +    if (rs->multifd_needs_flush &&
> +        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {

If multifd_needs_flush is only for multifd, then we may skip this
check, but it looks more like an assertion:

    if (rs->multifd_needs_flush) {
        assert(offset & RAM_SAVE_FLAG_MULTIFD_PAGE);
        offset |= RAM_SAVE_FLAG_ZERO;
    }

(Dave mentioned the mismatched flag in the commit message and here:
 ZERO is used, but COMPRESS is mentioned)

> +        offset |= RAM_SAVE_FLAG_ZERO;
> +        rs->multifd_needs_flush = false;
> +    }
> +
>      if (block == rs->last_sent_block) {
>          offset |= RAM_SAVE_FLAG_CONTINUE;
>      }
> @@ -2496,6 +2540,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>  
>      if (!migration_in_postcopy()) {
>          migration_bitmap_sync(rs);
> +        if (migrate_use_multifd()) {
> +            rs->multifd_needs_flush = true;
> +        }

Would it be good to move this block into the entry of
migration_bitmap_sync(), instead of setting it up at the callers of
migration_bitmap_sync()?

>      }
>  
>      ram_control_before_iterate(f, RAM_CONTROL_FINISH);
> @@ -2538,6 +2585,9 @@ static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
>          qemu_mutex_lock_iothread();
>          rcu_read_lock();
>          migration_bitmap_sync(rs);
> +        if (migrate_use_multifd()) {
> +            rs->multifd_needs_flush = true;
> +        }
>          rcu_read_unlock();
>          qemu_mutex_unlock_iothread();
>          remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
> @@ -3012,6 +3062,11 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              break;
>          }
>  
> +        if ((flags & (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO))
> +                  == (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)) {
> +            multifd_flush();
> +            flags = flags & ~RAM_SAVE_FLAG_ZERO;
> +        }
>          if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
>                       RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
>                       RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> -- 
> 2.9.4
> 

Thanks,
Peter Xu July 21, 2017, 6:03 a.m. UTC | #3
On Mon, Jul 17, 2017 at 03:42:38PM +0200, Juan Quintela wrote:
> Each time that we sync the bitmap, there is a possibility that we receive
> a page that is being processed by a different thread.  We fix this
> problem by making sure that we wait for all receiving threads to
> finish their work before we proceed with the next stage.
> 
> We are low on page flags, so we use a combination that is not valid to
> emit that message:  MULTIFD_PAGE and COMPRESSED.

Btw, would it be possible to introduce a new QEMU_VM_COMMAND for
this flush operation?  Like: MIG_CMD_MULTIFD_FLUSH?
Juan Quintela July 21, 2017, 10:53 a.m. UTC | #4
Peter Xu <peterx@redhat.com> wrote:
> On Mon, Jul 17, 2017 at 03:42:38PM +0200, Juan Quintela wrote:
>> Each time that we sync the bitmap, there is a possibility that we receive
>> a page that is being processed by a different thread.  We fix this
>> problem by making sure that we wait for all receiving threads to
>> finish their work before we proceed with the next stage.
>> 
>> We are low on page flags, so we use a combination that is not valid to
>> emit that message:  MULTIFD_PAGE and COMPRESSED.
>
> Btw, would it be possible to introduce a new QEMU_VM_COMMAND for
> this flush operation?  Like: MIG_CMD_MULTIFD_FLUSH?

From the commit message:

> I tried to make a migration command for it, but it doesn't work because
> we sync the bitmap sometimes when we have already sent the beginning
> of the section, so I just added a new page flag.

Yep, I found that a much better design, but without further surgery it
is not trivial.  There are two places where we can sync the bitmap, and
in one of them we have already sent the beginning of the section, so it
is too late to send a command.

Later, Juan.
Juan Quintela Aug. 8, 2017, 10:43 a.m. UTC | #5
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> Each time that we sync the bitmap, there is a possibility that we receive
>> a page that is being processed by a different thread.  We fix this
>> problem by making sure that we wait for all receiving threads to
>> finish their work before we proceed with the next stage.
>> 
>> We are low on page flags, so we use a combination that is not valid to
>> emit that message:  MULTIFD_PAGE and COMPRESSED.
>> 
>> I tried to make a migration command for it, but it doesn't work because
>> we sync the bitmap sometimes when we have already sent the beginning
>> of the section, so I just added a new page flag.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>

>> +static int multifd_flush(void)
>> +{
>> +    int i, thread_count;
>> +
>> +    if (!migrate_use_multifd()) {
>> +        return 0;
>> +    }
>> +    thread_count = migrate_multifd_threads();
>> +    for (i = 0; i < thread_count; i++) {
>> +        MultiFDRecvParams *p = multifd_recv_state->params[i];
>> +
>> +        qemu_mutex_lock(&p->mutex);
>> +        while (!p->done) {
>> +            p->sync = true;
>> +            qemu_cond_wait(&p->cond_sync, &p->mutex);
>> +        }
>
> I don't think I understand how that works in the case where the
> recv_thread has already 'done' by the time you set sync=true; how does
> it get back to the check and do the signal?

We have two cases:
* done = true
* done = false

If done is false, we need to wait until it is done.  But if it is true,
we don't have to wait.  By definition, there is nothing on that thread
that we need to wait for.  It is not in the middle of receiving a page.
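
A minimal standalone model of that handshake (plain pthreads instead of
QEMU's qemu_mutex/qemu_cond wrappers; the names are illustrative, not
the patch's API):

    #include <pthread.h>
    #include <stdbool.h>
    #include <unistd.h>

    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond_sync = PTHREAD_COND_INITIALIZER;
    static bool done;      /* thread is idle, no page in flight */
    static bool sync_req;  /* a flusher is waiting for this thread */

    /* Receiver side: finish the current page, mark ourselves idle,
     * and signal only if a flusher is actually waiting. */
    static void *recv_thread(void *opaque)
    {
        (void)opaque;
        usleep(1000);                    /* pretend to receive a page */
        pthread_mutex_lock(&mutex);
        done = true;
        if (sync_req) {
            pthread_cond_signal(&cond_sync);
            sync_req = false;
        }
        pthread_mutex_unlock(&mutex);
        return NULL;
    }

    /* Flusher side: if the thread is already done there is nothing to
     * wait for; otherwise request a wakeup and sleep.  The mutex is
     * held across the check, so done cannot flip between the test and
     * the cond_wait. */
    static void flush(void)
    {
        pthread_mutex_lock(&mutex);
        while (!done) {
            sync_req = true;
            pthread_cond_wait(&cond_sync, &mutex);
        }
        pthread_mutex_unlock(&mutex);
    }

    int main(void)
    {
        pthread_t t;
        pthread_create(&t, NULL, recv_thread, NULL);
        flush();          /* returns once the page has been processed */
        pthread_join(t, NULL);
        return 0;
    }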



>
>> +        qemu_mutex_unlock(&p->mutex);
>> +    }
>> +    return 0;
>> +}
>> +
>>  /**
>>   * save_page_header: write page header to wire
>>   *
>> @@ -809,6 +847,12 @@ static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
>>  {
>>      size_t size, len;
>>  
>> +    if (rs->multifd_needs_flush &&
>> +        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
>> +        offset |= RAM_SAVE_FLAG_ZERO;
>
> In the comment near the top you say RAM_SAVE_FLAG_COMPRESS_PAGE;  it's
> probably best to add an alias at the top to make it clear, e.g.
>   #define RAM_SAVE_FLAG_MULTIFD_SYNC RAM_SAVE_FLAG_ZERO
>
>   or maybe (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)

done.

But I only use it when we use the "or".
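
Presumably something along these lines (a standalone sketch; the
MULTIFD_PAGE value is taken from the patch, while the ZERO value and
the alias name are assumptions for illustration only):

    #include <stdio.h>
    #include <stdint.h>

    #define RAM_SAVE_FLAG_ZERO           0x002   /* assumed value */
    #define RAM_SAVE_FLAG_MULTIFD_PAGE   0x200   /* from the patch */
    #define RAM_SAVE_FLAG_MULTIFD_SYNC \
        (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)

    /* Only the full combination means "flush": ZERO alone is a zero
     * page, MULTIFD_PAGE alone is a normal multifd page. */
    static int is_multifd_sync(uint64_t flags)
    {
        return (flags & RAM_SAVE_FLAG_MULTIFD_SYNC) ==
               RAM_SAVE_FLAG_MULTIFD_SYNC;
    }

    int main(void)
    {
        printf("%d\n", is_multifd_sync(RAM_SAVE_FLAG_MULTIFD_SYNC)); /* 1 */
        printf("%d\n", is_multifd_sync(RAM_SAVE_FLAG_ZERO));         /* 0 */
        return 0;
    }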

Thanks, Juan.
Dr. David Alan Gilbert Aug. 8, 2017, 11:25 a.m. UTC | #6
* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> Each time that we sync the bitmap, there is a possibility that we receive
> >> a page that is being processed by a different thread.  We fix this
> >> problem by making sure that we wait for all receiving threads to
> >> finish their work before we proceed with the next stage.
> >> 
> >> We are low on page flags, so we use a combination that is not valid to
> >> emit that message:  MULTIFD_PAGE and COMPRESSED.
> >> 
> >> I tried to make a migration command for it, but it doesn't work because
> >> we sync the bitmap sometimes when we have already sent the beginning
> >> of the section, so I just added a new page flag.
> >> 
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> >> +static int multifd_flush(void)
> >> +{
> >> +    int i, thread_count;
> >> +
> >> +    if (!migrate_use_multifd()) {
> >> +        return 0;
> >> +    }
> >> +    thread_count = migrate_multifd_threads();
> >> +    for (i = 0; i < thread_count; i++) {
> >> +        MultiFDRecvParams *p = multifd_recv_state->params[i];
> >> +
> >> +        qemu_mutex_lock(&p->mutex);
> >> +        while (!p->done) {
> >> +            p->sync = true;
> >> +            qemu_cond_wait(&p->cond_sync, &p->mutex);
> >> +        }
> >
> > I don't think I understand how that works in the case where the
> > recv_thread has already 'done' by the time you set sync=true; how does
> > it get back to the check and do the signal?
> 
> We have two cases:
> * done = true
> * done = false
> 
> If done is false, we need to wait until it is done.  But if it is true,
> we don't have to wait.  By definition, there is nothing on that thread
> that we need to wait for.  It is not in the middle of receiving a page.

OK, and you've got the p->mutex, so done can't become true
between the check at the top of the while() and the p->sync = true
on the next line? OK.

Dave
> 
> 
> >
> >> +        qemu_mutex_unlock(&p->mutex);
> >> +    }
> >> +    return 0;
> >> +}
> >> +
> >>  /**
> >>   * save_page_header: write page header to wire
> >>   *
> >> @@ -809,6 +847,12 @@ static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
> >>  {
> >>      size_t size, len;
> >>  
> >> +    if (rs->multifd_needs_flush &&
> >> +        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> >> +        offset |= RAM_SAVE_FLAG_ZERO;
> >
> > In the comment near the top you say RAM_SAVE_FLAG_COMPRESS_PAGE;  it's
> > probably best to add an alias at the top to make it clear, e.g.
> >   #define RAM_SAVE_FLAG_MULTIFD_SYNC RAM_SAVE_FLAG_ZERO
> >
> >   or maybe (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)
> 
> done.
> 
> But I only use it when we use the "or".
> 
> Thanks, Juan.
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Juan Quintela Aug. 8, 2017, 11:40 a.m. UTC | #7
Peter Xu <peterx@redhat.com> wrote:
> On Mon, Jul 17, 2017 at 03:42:38PM +0200, Juan Quintela wrote:
>> Each time that we sync the bitmap, there is a possibility that we receive
>> a page that is being processed by a different thread.  We fix this
>> problem by making sure that we wait for all receiving threads to
>> finish their work before we proceed with the next stage.
>> 
>> We are low on page flags, so we use a combination that is not valid to
>> emit that message:  MULTIFD_PAGE and COMPRESSED.
>> 
>> I tried to make a migration command for it, but it doesn't work because
>> we sync the bitmap sometimes when we have already sent the beginning
>> of the section, so I just added a new page flag.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>

>> @@ -675,6 +686,10 @@ static void *multifd_recv_thread(void *opaque)
>>                  return NULL;
>>              }
>>              p->done = true;
>> +            if (p->sync) {
>> +                qemu_cond_signal(&p->cond_sync);
>> +                p->sync = false;
>> +            }
>
> Could we use the same p->ready for this purpose? They look similar:
> all we want to do is to let the main thread know "worker thread has
> finished receiving the last piece and becomes idle again", right?

We *could*, but "ready" is used for each page that we send; sync is only
used once per round.  Notice that "ready" is a semaphore, and its
semantics are weird.  See next comment.


>> +static int multifd_flush(void)
>> +{
>> +    int i, thread_count;
>> +
>> +    if (!migrate_use_multifd()) {
>> +        return 0;
>> +    }
>> +    thread_count = migrate_multifd_threads();
>> +    for (i = 0; i < thread_count; i++) {
>> +        MultiFDRecvParams *p = multifd_recv_state->params[i];
>> +
>> +        qemu_mutex_lock(&p->mutex);
>> +        while (!p->done) {
>> +            p->sync = true;
>> +            qemu_cond_wait(&p->cond_sync, &p->mutex);
>
> (similar comment as above)

We need to look at the two pieces of code at the same time.  What are we
trying to do:

- make sure that all threads have finished the current round;
  in this particular case, that the thread has finished its current
  round OR that it is waiting for work.

So, the main thread is the one that does the sem_wait(ready) and the channel
thread is the one that does the sem_post(ready).

multifd_recv_thread()

    if (p->sync) {
        sem_post(ready);
        p->sync = false;
    }

multifd_flush()
   if (!p->done) {
       p->sync = true;
       sem_wait(ready);
   }

Ah, but done and sync can be changed from other threads, so the current
code will become:

multifd_recv_thread()

    if (p->sync) {
        sem_post(ready);
        p->sync = false;
    }

multifd_flush()
   ...
   mutex_lock(lock);
   if (!p->done) {
       p->sync = true;
       mutex_unlock(lock)
       sem_wait(ready);
       mutex_lock(lock)
   }
   mutex_unlock(lock)

I would claim that this is more complicated to understand.  Mixing
locks and semaphores is ... interesting, to say the least.  With
condition variables it becomes easy.

Yes, we can change sync/done to atomic access, but I am not sure that
makes things much simpler.
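
For comparison, a runnable one-shot model of that semaphore variant
(plain POSIX primitives, illustrative names; the real code also posts
"ready" once per page, which this model omits):

    #include <pthread.h>
    #include <semaphore.h>
    #include <stdbool.h>
    #include <unistd.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static sem_t ready;
    static bool done, sync_req;

    static void *recv_thread(void *opaque)
    {
        (void)opaque;
        usleep(1000);                    /* pretend to finish a page */
        pthread_mutex_lock(&lock);
        done = true;
        if (sync_req) {
            sem_post(&ready);
            sync_req = false;
        }
        pthread_mutex_unlock(&lock);
        return NULL;
    }

    static void flush(void)
    {
        pthread_mutex_lock(&lock);
        if (!done) {
            sync_req = true;
            pthread_mutex_unlock(&lock);  /* must drop the lock ... */
            sem_wait(&ready);             /* ... before sleeping */
            pthread_mutex_lock(&lock);
        }
        pthread_mutex_unlock(&lock);
    }

    int main(void)
    {
        pthread_t t;
        sem_init(&ready, 0, 0);
        pthread_create(&t, NULL, recv_thread, NULL);
        flush();
        pthread_join(t, NULL);
        sem_destroy(&ready);
        return 0;
    }

The extra unlock/relock bracket around sem_wait() is the lock/semaphore
mixing being objected to; with a condition variable the predicate check
and the wait stay under a single lock.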

>> +        }
>> +        qemu_mutex_unlock(&p->mutex);
>> +    }
>> +    return 0;
>> +}
>> +
>>  /**
>>   * save_page_header: write page header to wire
>>   *
>> @@ -809,6 +847,12 @@ static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
>>  {
>>      size_t size, len;
>>  
>> +    if (rs->multifd_needs_flush &&
>> +        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
>
> If multifd_needs_flush is only for multifd, then we may skip this
> check, but it looks more like an assertion:
>
>     if (rs->multifd_needs_flush) {
>         assert(offset & RAM_SAVE_FLAG_MULTIFD_PAGE);
>         offset |= RAM_SAVE_FLAG_ZERO;
>     }

No, it could be that this page is a _non_ multifd page, and then ZERO
means something different.  So, we can only send this for MULTIFD pages.

> (Dave mentioned the mismatched flag in the commit message and here:
>  ZERO is used, but COMPRESS is mentioned)

OK, I can change the message.

>> @@ -2496,6 +2540,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>>  
>>      if (!migration_in_postcopy()) {
>>          migration_bitmap_sync(rs);
>> +        if (migrate_use_multifd()) {
>> +            rs->multifd_needs_flush = true;
>> +        }
>
> Would it be good to move this block into the entry of
> migration_bitmap_sync(), instead of setting it up at the callers of
> migration_bitmap_sync()?

We can't have all of it.

We call migration_bitmap_sync() in 4 places.
- We don't need to set the flag for the 1st synchronization
- We don't need to set it on postcopy (yet).

So, we can add code inside to check if we are on the 1st round, and
forget about postcopy (we check that elsewhere), or we maintain it this way.

So, the change becomes:

modified   migration/ram.c
@@ -1131,6 +1131,9 @@ static void migration_bitmap_sync(RAMState *rs)
     if (migrate_use_events()) {
         qapi_event_send_migration_pass(ram_counters.dirty_sync_count, NULL);
     }
+    if (rs->ram_bulk_stage && migrate_use_multifd()) {
+        rs->multifd_needs_flush = true;
+    }
 }
 
 /**
@@ -2533,9 +2536,6 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
 
     if (!migration_in_postcopy()) {
         migration_bitmap_sync(rs);
-        if (migrate_use_multifd()) {
-            rs->multifd_needs_flush = true;
-        }
     }
 
     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
@@ -2578,9 +2578,6 @@ static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
         qemu_mutex_lock_iothread();
         rcu_read_lock();
         migration_bitmap_sync(rs);
-        if (migrate_use_multifd()) {
-            rs->multifd_needs_flush = true;
-        }
         rcu_read_unlock();
         qemu_mutex_unlock_iothread();
         remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;

Three fewer lines, you win.  We already check elsewhere that
postcopy & multifd are not enabled at the same time.

Thanks, Juan.
Peter Xu Aug. 10, 2017, 6:49 a.m. UTC | #8
On Tue, Aug 08, 2017 at 01:40:58PM +0200, Juan Quintela wrote:
> Peter Xu <peterx@redhat.com> wrote:
> > On Mon, Jul 17, 2017 at 03:42:38PM +0200, Juan Quintela wrote:
> >> Each time that we sync the bitmap, there is a possibility that we receive
> >> a page that is being processed by a different thread.  We fix this
> >> problem by making sure that we wait for all receiving threads to
> >> finish their work before we proceed with the next stage.
> >> 
> >> We are low on page flags, so we use a combination that is not valid to
> >> emit that message:  MULTIFD_PAGE and COMPRESSED.
> >> 
> >> I tried to make a migration command for it, but it doesn't work because
> >> we sync the bitmap sometimes when we have already sent the beginning
> >> of the section, so I just added a new page flag.
> >> 
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> >> @@ -675,6 +686,10 @@ static void *multifd_recv_thread(void *opaque)
> >>                  return NULL;
> >>              }
> >>              p->done = true;
> >> +            if (p->sync) {
> >> +                qemu_cond_signal(&p->cond_sync);
> >> +                p->sync = false;
> >> +            }
> >
> > Could we use the same p->ready for this purpose? They look similar:
> > all we want to do is to let the main thread know "worker thread has
> > finished receiving the last piece and becomes idle again", right?
> 
> We *could*, but "ready" is used for each page that we send; sync is only
> used once per round.  Notice that "ready" is a semaphore, and its
> semantics are weird.  See next comment.
> 
> 
> >> +static int multifd_flush(void)
> >> +{
> >> +    int i, thread_count;
> >> +
> >> +    if (!migrate_use_multifd()) {
> >> +        return 0;
> >> +    }
> >> +    thread_count = migrate_multifd_threads();
> >> +    for (i = 0; i < thread_count; i++) {
> >> +        MultiFDRecvParams *p = multifd_recv_state->params[i];
> >> +
> >> +        qemu_mutex_lock(&p->mutex);
> >> +        while (!p->done) {
> >> +            p->sync = true;
> >> +            qemu_cond_wait(&p->cond_sync, &p->mutex);
> >
> > (similar comment as above)
> 
> We need to look at the two pieces of code at the same time.  What are we
> trying to do:
> 
> - make sure that all threads have finished the current round;
>   in this particular case, that the thread has finished its current
>   round OR that it is waiting for work.
> 
> So, the main thread is the one that does the sem_wait(ready) and the channel
> thread is the one that does the sem_post(ready).
> 
> multifd_recv_thread()
> 
>     if (p->sync) {
>         sem_post(ready);
>         p->sync = false;
>     }
> 
> multifd_flush()
>    if (!p->done) {
>        p->sync = true;
>        sem_wait(ready);
>    }
> 
> Ah, but done and sync can be changed from other threads, so the current
> code will become:
> 
> multifd_recv_thread()
> 
>     if (p->sync) {
>         sem_post(ready);
>         p->sync = false;
>     }
> 
> multifd_flush()
>    ...
>    mutex_lock(lock);
>    if (!p->done) {
>        p->sync = true;
>        mutex_unlock(lock)
>        sem_wait(ready);
>        mutex_lock(lock)
>    }
>    mutex_unlock(lock)
> 
> I would claim that this is more complicated to understand.  Mixing
> locks and semaphores is ... interesting, to say the least.  With
> condition variables it becomes easy.
> 
> Yes, we can change sync/done to atomic access, but I am not sure that
> makes things much simpler.

I was thinking that p->ready could be used as a notification channel from
the recv thread to the main thread for any reason.  But I'm also fine if
you want to do this separately and have different sync channels for
page-level completions and global flushes, especially in a first version.

(but I'd say the whole thing feels slightly complicated; I feel it
 could be simpler somewhere...)

> 
> >> +        }
> >> +        qemu_mutex_unlock(&p->mutex);
> >> +    }
> >> +    return 0;
> >> +}
> >> +
> >>  /**
> >>   * save_page_header: write page header to wire
> >>   *
> >> @@ -809,6 +847,12 @@ static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
> >>  {
> >>      size_t size, len;
> >>  
> >> +    if (rs->multifd_needs_flush &&
> >> +        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> >
> > If multifd_needs_flush is only for multifd, then we may skip this
> > check, but it looks more like an assertion:
> >
> >     if (rs->multifd_needs_flush) {
> >         assert(offset & RAM_SAVE_FLAG_MULTIFD_PAGE);
> >         offset |= RAM_SAVE_FLAG_ZERO;
> >     }
> 
> No, it could be that this page is a _non_ multifd page, and then ZERO
> means something different.  So, we can only send this for MULTIFD pages.

But if multifd_needs_flush==true, it must be a multifd page, no? :)

I think this is trivial, so both work for me.

> 
> > (Dave mentioned the mismatched flag in the commit message and here:
> >  ZERO is used, but COMPRESS is mentioned)
> 
> OK, I can change the message.
> 
> >> @@ -2496,6 +2540,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
> >>  
> >>      if (!migration_in_postcopy()) {
> >>          migration_bitmap_sync(rs);
> >> +        if (migrate_use_multifd()) {
> >> +            rs->multifd_needs_flush = true;
> >> +        }
> >
> > Would it be good to move this block into the entry of
> > migration_bitmap_sync(), instead of setting it up at the callers of
> > migration_bitmap_sync()?
> 
> We can't have all of it.
> 
> We call migration_bitmap_sync() in 4 places.
> - We don't need to set the flag for the 1st synchronization
> - We don't need to set it on postcopy (yet).

[1]

I see.

> 
> So, we can add code inside to check if we are on the 1st round, and
> forget about postcopy (we check that elsewhere), or we maintain it this way.
> 
> So, the change becomes:
> 
> modified   migration/ram.c
> @@ -1131,6 +1131,9 @@ static void migration_bitmap_sync(RAMState *rs)
>      if (migrate_use_events()) {
>          qapi_event_send_migration_pass(ram_counters.dirty_sync_count, NULL);
>      }
> +    if (rs->ram_bulk_stage && migrate_use_multifd()) {

Should this be "!rs->ram_bulk_stage && migrate_use_multifd()"?

> +        rs->multifd_needs_flush = true;
> +    }
>  }
>  
>  /**
> @@ -2533,9 +2536,6 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>  
>      if (!migration_in_postcopy()) {
>          migration_bitmap_sync(rs);
> -        if (migrate_use_multifd()) {
> -            rs->multifd_needs_flush = true;
> -        }
>      }
>  
>      ram_control_before_iterate(f, RAM_CONTROL_FINISH);
> @@ -2578,9 +2578,6 @@ static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
>          qemu_mutex_lock_iothread();
>          rcu_read_lock();
>          migration_bitmap_sync(rs);
> -        if (migrate_use_multifd()) {
> -            rs->multifd_needs_flush = true;
> -        }
>          rcu_read_unlock();
>          qemu_mutex_unlock_iothread();
>          remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
> 
> Three fewer lines, you win.  We already check elsewhere that
> postcopy & multifd are not enabled at the same time.

I got the point.  I would slightly prefer the new way, with only a
single place that sets multifd_needs_flush (it would be nice to have some
comments like [1] there), but I'm also fine if you prefer the old one.

Thanks,

Patch

diff --git a/migration/ram.c b/migration/ram.c
index c78b286..bffe204 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -71,6 +71,12 @@ 
 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
 
+/* We are getting low on page flags, so we start using combinations.
+   When we need to flush a page, we send it as
+   RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS_PAGE;
+   we don't allow that combination otherwise.
+*/
+
 static inline bool is_zero_range(uint8_t *p, uint64_t size)
 {
     return buffer_is_zero(p, size);
@@ -193,6 +199,9 @@  struct RAMState {
     uint64_t iterations_prev;
     /* Iterations since start */
     uint64_t iterations;
+    /* Indicates that we have synced the bitmap and need to ensure that
+       the target has processed all previous pages */
+    bool multifd_needs_flush;
     /* protects modification of the bitmap */
     uint64_t migration_dirty_pages;
     /* number of dirty bits in the bitmap */
@@ -363,7 +372,6 @@  static void compress_threads_save_setup(void)
 
 /* Multiple fd's */
 
-
 typedef struct {
     int num;
     int size;
@@ -595,9 +603,11 @@  struct MultiFDRecvParams {
     QIOChannel *c;
     QemuSemaphore ready;
     QemuSemaphore sem;
+    QemuCond cond_sync;
     QemuMutex mutex;
     /* protected by param mutex */
     bool quit;
+    bool sync;
     multifd_pages_t pages;
     bool done;
 };
@@ -637,6 +647,7 @@  void multifd_load_cleanup(void)
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_cond_destroy(&p->cond_sync);
         socket_recv_channel_destroy(p->c);
         g_free(p);
         multifd_recv_state->params[i] = NULL;
@@ -675,6 +686,10 @@  static void *multifd_recv_thread(void *opaque)
                 return NULL;
             }
             p->done = true;
+            if (p->sync) {
+                qemu_cond_signal(&p->cond_sync);
+                p->sync = false;
+            }
             qemu_mutex_unlock(&p->mutex);
             qemu_sem_post(&p->ready);
             continue;
@@ -724,9 +739,11 @@  gboolean multifd_new_channel(QIOChannel *ioc)
     qemu_mutex_init(&p->mutex);
     qemu_sem_init(&p->sem, 0);
     qemu_sem_init(&p->ready, 0);
+    qemu_cond_init(&p->cond_sync);
     p->quit = false;
     p->id = id;
     p->done = false;
+    p->sync = false;
     multifd_init_group(&p->pages);
     p->c = ioc;
     atomic_set(&multifd_recv_state->params[id], p);
@@ -792,6 +809,27 @@  static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
     qemu_sem_post(&p->sem);
 }
 
+static int multifd_flush(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_multifd()) {
+        return 0;
+    }
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        MultiFDRecvParams *p = multifd_recv_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        while (!p->done) {
+            p->sync = true;
+            qemu_cond_wait(&p->cond_sync, &p->mutex);
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    return 0;
+}
+
 /**
  * save_page_header: write page header to wire
  *
@@ -809,6 +847,12 @@  static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
 {
     size_t size, len;
 
+    if (rs->multifd_needs_flush &&
+        (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) {
+        offset |= RAM_SAVE_FLAG_ZERO;
+        rs->multifd_needs_flush = false;
+    }
+
     if (block == rs->last_sent_block) {
         offset |= RAM_SAVE_FLAG_CONTINUE;
     }
@@ -2496,6 +2540,9 @@  static int ram_save_complete(QEMUFile *f, void *opaque)
 
     if (!migration_in_postcopy()) {
         migration_bitmap_sync(rs);
+        if (migrate_use_multifd()) {
+            rs->multifd_needs_flush = true;
+        }
     }
 
     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
@@ -2538,6 +2585,9 @@  static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
         qemu_mutex_lock_iothread();
         rcu_read_lock();
         migration_bitmap_sync(rs);
+        if (migrate_use_multifd()) {
+            rs->multifd_needs_flush = true;
+        }
         rcu_read_unlock();
         qemu_mutex_unlock_iothread();
         remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
@@ -3012,6 +3062,11 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
         }
 
+        if ((flags & (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO))
+                  == (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)) {
+            multifd_flush();
+            flags = flags & ~RAM_SAVE_FLAG_ZERO;
+        }
         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
                      RAM_SAVE_FLAG_MULTIFD_PAGE)) {