diff mbox

[v5,08/12] migration: Add the core code of multi-thread compression

Message ID 1423623986-590-9-git-send-email-liang.z.li@intel.com
State New
Headers show

Commit Message

Li, Liang Z Feb. 11, 2015, 3:06 a.m. UTC
Implement the core logic of the multiple thread compression. At this
point, multiple thread compression can't co-work with xbzrle yet.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 193 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 185 insertions(+), 8 deletions(-)

Comments

Juan Quintela Feb. 11, 2015, 11:44 a.m. UTC | #1
Liang Li <liang.z.li@intel.com> wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
>
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>


> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -363,18 +363,44 @@ static QemuMutex *comp_done_lock;
>  static QemuCond *comp_done_cond;
>  /* The empty QEMUFileOps will be used by file in CompressParam */
>  static const QEMUFileOps empty_ops = { };
> +
> +/* one_byte_count is used to count the bytes that is added to
> + * bytes_transferred but not actually transferred, at the proper
> + * time, we should sub one_byte_count from bytes_transferred to
> + * make bytes_transferred accurate.
> + */
> +static int one_byte_count;

With the changes proposed previously to ram_save_compressed_page() this
shouldn't be needed.  It can return 0 now.

> +static int do_compress_ram_page(CompressParam *param);
> +
>  static void *do_data_compress(void *opaque)
>  {
> -    while (!quit_comp_thread) {
> -
> -    /* To be done */
> +    CompressParam *param = opaque;
>  
> +    while (!quit_comp_thread) {
> +        qemu_mutex_lock(&param->mutex);
> +        /* Re-check the quit_comp_thread in case of
> +         * terminate_compression_threads is called just before
> +         * qemu_mutex_lock(&param->mutex) and after
> +         * while(!quit_comp_thread), re-check it here can make
> +         * sure the compression thread terminate as expected.
> +         */
> +        while (!param->busy && !quit_comp_thread) {
> +            qemu_cond_wait(&param->cond, &param->mutex);
> +        }
> +        qemu_mutex_unlock(&param->mutex);
> +        if (!quit_comp_thread) {
> +            do_compress_ram_page(param);
> +        }
> +        qemu_mutex_lock(comp_done_lock);
> +        param->busy = false;
> +        qemu_cond_signal(comp_done_cond);
> +        qemu_mutex_unlock(comp_done_lock);
>      }
>  
>      return NULL;
> @@ -382,9 +408,15 @@ static void *do_data_compress(void *opaque)
>  
>  static inline void terminate_compression_threads(void)
>  {
> -    quit_comp_thread = true;
> +    int idx, thread_count;
>  
> -    /* To be done */
> +    thread_count = migrate_compress_threads();
> +    quit_comp_thread = true;
> +    for (idx = 0; idx < thread_count; idx++) {
> +        qemu_mutex_lock(&comp_param[idx].mutex);
> +        qemu_cond_signal(&comp_param[idx].cond);
> +        qemu_mutex_unlock(&comp_param[idx].mutex);
> +    }
>  }
>  
>  void migrate_compress_threads_join(MigrationState *s)
> @@ -770,12 +802,157 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
>      return bytes_sent;
>  }
>  
> +static int do_compress_ram_page(CompressParam *param)
> +{
> +    int bytes_sent, cont;
> +    int blen;
> +    uint8_t *p;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +
> +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +    p = memory_region_get_ram_ptr(block->mr) + offset;
> +
> +    bytes_sent = save_block_hdr(param->file, block, offset, cont,
> +                                RAM_SAVE_FLAG_COMPRESS_PAGE);
> +    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> +                                     migrate_compress_level());
> +    bytes_sent += blen;
> +    atomic_inc(&acct_info.norm_pages);
> +
> +    return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> +    qemu_mutex_lock(&param->mutex);
> +    param->busy = true;
> +    qemu_cond_signal(&param->cond);
> +    qemu_mutex_unlock(&param->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx, len, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return;
> +    }
> +    thread_count = migrate_compress_threads();
> +    for (idx = 0; idx < thread_count; idx++) {
> +        if (comp_param[idx].busy) {
> +            qemu_mutex_lock(comp_done_lock);
> +            while (comp_param[idx].busy && !quit_comp_thread) {
> +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> +            }
> +            qemu_mutex_unlock(comp_done_lock);
> +        }

If we arrive here because quit_comp_thread == true, shouldn't we skip
the qemu_put_qemu_file()?

> +        len = qemu_put_qemu_file(f, comp_param[idx].file);
> +        bytes_transferred += len;
> +    }

[remove one_byte stuff here]

> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> +                                       ram_addr_t offset)
> +{
> +    param->block = block;
> +    param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> +                                           ram_addr_t offset)
> +{
> +    int idx, thread_count, bytes_sent = 0;
> +
> +    thread_count = migrate_compress_threads();
> +    qemu_mutex_lock(comp_done_lock);
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            if (!comp_param[idx].busy) {
> +                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
> +                set_compress_params(&comp_param[idx], block, offset);
> +                start_compression(&comp_param[idx]);

[remove stuff here]

> +                break;
> +            }
> +        }
> +        if (bytes_sent > 0) {

Change this to:
          if (bytes_sent >= 0) {

> +            break;
> +        } else {
> +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> +        }
> +    }
> +    qemu_mutex_unlock(comp_done_lock);
> +
> +    return bytes_sent;
> +}
> +
>  static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
>                                      ram_addr_t offset, bool last_stage)
>  {
>      int bytes_sent = -1;
> +    MemoryRegion *mr = block->mr;
> +    uint8_t *p;
> +    int ret;
> +    int cont;
>  
> -    /* To be done*/
> +    p = memory_region_get_ram_ptr(mr) + offset;
> +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +    ret = ram_control_save_page(f, block->offset,
> +                                offset, TARGET_PAGE_SIZE, &bytes_sent);
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_sent > 0) {
> +                acct_info.norm_pages++;
> +            } else if (bytes_sent == 0) {
> +                acct_info.dup_pages++;
> +            }
> +        }
> +    } else {
> +        /* When starting the process of a new block, the first page of
> +         * the block should be sent out before other pages in the same
> +         * block, and all the pages in last block should have been sent
> +         * out, keeping this order is important, because the 'cont' flag
> +         * is used to avoid resending the block name.
> +         */
> +        if (block != last_sent_block) {
> +            flush_compressed_data(f);
> +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> +            if (bytes_sent == -1) {
> +                set_compress_params(&comp_param[0], block, offset);
> +                /* Use the qemu thread to compress the data to make sure the
> +                 * first page is sent out before other pages
> +                 */
> +                bytes_sent = do_compress_ram_page(&comp_param[0]);
> +                if (bytes_sent > 0) {

This test is not needed

assert(bytes_sent>0)

or how can it be zero or negative here?  So, we have to always call
qemu_put_qemu_file() no?

> +                    qemu_put_qemu_file(f, comp_param[0].file);
> +                }
> +            }
> +        } else {
> +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> +            if (bytes_sent == -1) {
> +                bytes_sent = compress_page_with_multi_thread(f, block, offset);
> +            }
> +        }
> +    }
>  
>      return bytes_sent;
>  }
> @@ -834,8 +1011,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      return bytes_sent;
>  }
>  
> -static uint64_t bytes_transferred;
> -
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
>      uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1043,6 +1218,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          i++;
>      }
>  
> +    flush_compressed_data(f);
>      qemu_mutex_unlock_ramlist();
>  
>      /*
> @@ -1089,6 +1265,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          bytes_transferred += bytes_sent;
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();


I thihnk this would make the code work, but not the locking.  You are
using here:

quit_comp_thread:  global, and not completely clear what protects it
comp_done_lock: global
comp_done_cond: global

param[i].busy: I would suggest renaming to pending work
param[i].mutex:
param[i].cond:
       thread is waiting for work


Issues:

param->busy is protected on do_data_compress() and start_compression()
with param->busy, but in flush_compressed_data() and
comress_page_with_multithread() it is protected by
comp_done_lock.

At this point, I would suggest to just drop param[i].mutex and use
everywhere comp_done_lock.  We can make locking granularly later if
needed, but 1st get it correct?

Code basically does (forget termination and locking)

each compression_thread()

  while(1) {
     while(!work_to_do)
        wait_for_work
     do_work
  }

And the main thread does:


while(1) {
     foreacth compression_thread {
          if thread free {
             put it to work
             break;
          }
          wait_for_thread_to_finish
     }
}

Notice how we are walking all threads each time that we need to do anything

Perhaps code should be more simple if we put the data that needs to be
done on a global variable and change this to:

compression_thread

  while(1) {
     while(!work_to_do)
        wait_for_work
     pick work from global variable
     wakeup main thread
     do_work
  }

main thread:

put work on global variable
while(nobody_pick_thework) {
     signal all threads
     wait for a compression thread to take the work
}

Why?  because then we only have a global mutex and two condition
variables, with a clear semantics:
- lock protects two conditions and global variable with work
- one condition where threads wait for work
- one condition where main thread wait for a worker to be ready

As we would need to lock every single tame to put the work in the global
variable, to wait or to pick up the work, we can stop all the:

if (!foo) {
    mutex_lock
    if(!foo) /* this time with lock */
        ....
}


Sorry for the very long mail, if it makes you feel better, this is the
second time that I wrote it, because the 1st version, my locking
proposal didn't worked correctly.

What do you think?

Later, Juan.
Li, Liang Z Feb. 12, 2015, 7:43 a.m. UTC | #2
> -----Original Message-----
> From: Juan Quintela [mailto:quintela@redhat.com]
> Sent: Wednesday, February 11, 2015 7:45 PM
> To: Li, Liang Z
> Cc: qemu-devel@nongnu.org; eblake@redhat.com; amit.shah@redhat.com;
> lcapitulino@redhat.com; armbru@redhat.com; dgilbert@redhat.com; Zhang,
> Yang Z
> Subject: Re: [v5 08/12] migration: Add the core code of multi-thread
> compression
> 
> Liang Li <liang.z.li@intel.com> wrote:
> > Implement the core logic of the multiple thread compression. At this
> > point, multiple thread compression can't co-work with xbzrle yet.
> >
> > Signed-off-by: Liang Li <liang.z.li@intel.com>
> > Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> 
> 
> > --- a/arch_init.c
> > +++ b/arch_init.c
> > @@ -363,18 +363,44 @@ static QemuMutex *comp_done_lock;  static
> > QemuCond *comp_done_cond;
> >  /* The empty QEMUFileOps will be used by file in CompressParam */
> > static const QEMUFileOps empty_ops = { };
> > +
> > +/* one_byte_count is used to count the bytes that is added to
> > + * bytes_transferred but not actually transferred, at the proper
> > + * time, we should sub one_byte_count from bytes_transferred to
> > + * make bytes_transferred accurate.
> > + */
> > +static int one_byte_count;
> 
> With the changes proposed previously to ram_save_compressed_page() this
> shouldn't be needed.  It can return 0 now.
> 
> > +static int do_compress_ram_page(CompressParam *param);
> > +
> >  static void *do_data_compress(void *opaque)  {
> > -    while (!quit_comp_thread) {
> > -
> > -    /* To be done */
> > +    CompressParam *param = opaque;
> >
> > +    while (!quit_comp_thread) {
> > +        qemu_mutex_lock(&param->mutex);
> > +        /* Re-check the quit_comp_thread in case of
> > +         * terminate_compression_threads is called just before
> > +         * qemu_mutex_lock(&param->mutex) and after
> > +         * while(!quit_comp_thread), re-check it here can make
> > +         * sure the compression thread terminate as expected.
> > +         */
> > +        while (!param->busy && !quit_comp_thread) {
> > +            qemu_cond_wait(&param->cond, &param->mutex);
> > +        }
> > +        qemu_mutex_unlock(&param->mutex);
> > +        if (!quit_comp_thread) {
> > +            do_compress_ram_page(param);
> > +        }
> > +        qemu_mutex_lock(comp_done_lock);
> > +        param->busy = false;
> > +        qemu_cond_signal(comp_done_cond);
> > +        qemu_mutex_unlock(comp_done_lock);
> >      }
> >
> >      return NULL;
> > @@ -382,9 +408,15 @@ static void *do_data_compress(void *opaque)
> >
> >  static inline void terminate_compression_threads(void)
> >  {
> > -    quit_comp_thread = true;
> > +    int idx, thread_count;
> >
> > -    /* To be done */
> > +    thread_count = migrate_compress_threads();
> > +    quit_comp_thread = true;
> > +    for (idx = 0; idx < thread_count; idx++) {
> > +        qemu_mutex_lock(&comp_param[idx].mutex);
> > +        qemu_cond_signal(&comp_param[idx].cond);
> > +        qemu_mutex_unlock(&comp_param[idx].mutex);
> > +    }
> >  }
> >
> >  void migrate_compress_threads_join(MigrationState *s) @@ -770,12
> > +802,157 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block,
> ram_addr_t offset,
> >      return bytes_sent;
> >  }
> >
> > +static int do_compress_ram_page(CompressParam *param) {
> > +    int bytes_sent, cont;
> > +    int blen;
> > +    uint8_t *p;
> > +    RAMBlock *block = param->block;
> > +    ram_addr_t offset = param->offset;
> > +
> > +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> > +    p = memory_region_get_ram_ptr(block->mr) + offset;
> > +
> > +    bytes_sent = save_block_hdr(param->file, block, offset, cont,
> > +                                RAM_SAVE_FLAG_COMPRESS_PAGE);
> > +    blen = qemu_put_compression_data(param->file, p,
> TARGET_PAGE_SIZE,
> > +                                     migrate_compress_level());
> > +    bytes_sent += blen;
> > +    atomic_inc(&acct_info.norm_pages);
> > +
> > +    return bytes_sent;
> > +}
> > +
> > +static inline void start_compression(CompressParam *param) {
> > +    qemu_mutex_lock(&param->mutex);
> > +    param->busy = true;
> > +    qemu_cond_signal(&param->cond);
> > +    qemu_mutex_unlock(&param->mutex); }
> > +
> > +
> > +static uint64_t bytes_transferred;
> > +
> > +static void flush_compressed_data(QEMUFile *f) {
> > +    int idx, len, thread_count;
> > +
> > +    if (!migrate_use_compression()) {
> > +        return;
> > +    }
> > +    thread_count = migrate_compress_threads();
> > +    for (idx = 0; idx < thread_count; idx++) {
> > +        if (comp_param[idx].busy) {
> > +            qemu_mutex_lock(comp_done_lock);
> > +            while (comp_param[idx].busy && !quit_comp_thread) {
> > +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> > +            }
> > +            qemu_mutex_unlock(comp_done_lock);
> > +        }
> 
> If we arrive here because quit_comp_thread == true, shouldn't we skip the
> qemu_put_qemu_file()?
> 
> > +        len = qemu_put_qemu_file(f, comp_param[idx].file);
> > +        bytes_transferred += len;
> > +    }
> 
> [remove one_byte stuff here]
> 
> > +}
> > +
> > +static inline void set_compress_params(CompressParam *param,
> RAMBlock *block,
> > +                                       ram_addr_t offset) {
> > +    param->block = block;
> > +    param->offset = offset;
> > +}
> > +
> > +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock
> *block,
> > +                                           ram_addr_t offset) {
> > +    int idx, thread_count, bytes_sent = 0;
> > +
> > +    thread_count = migrate_compress_threads();
> > +    qemu_mutex_lock(comp_done_lock);
> > +    while (true) {
> > +        for (idx = 0; idx < thread_count; idx++) {
> > +            if (!comp_param[idx].busy) {
> > +                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
> > +                set_compress_params(&comp_param[idx], block, offset);
> > +                start_compression(&comp_param[idx]);
> 
> [remove stuff here]
> 
> > +                break;
> > +            }
> > +        }
> > +        if (bytes_sent > 0) {
> 
> Change this to:
>           if (bytes_sent >= 0) {
> 
> > +            break;
> > +        } else {
> > +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> > +        }
> > +    }
> > +    qemu_mutex_unlock(comp_done_lock);
> > +
> > +    return bytes_sent;
> > +}
> > +
> >  static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
> >                                      ram_addr_t offset, bool
> > last_stage)  {
> >      int bytes_sent = -1;
> > +    MemoryRegion *mr = block->mr;
> > +    uint8_t *p;
> > +    int ret;
> > +    int cont;
> >
> > -    /* To be done*/
> > +    p = memory_region_get_ram_ptr(mr) + offset;
> > +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> > +    ret = ram_control_save_page(f, block->offset,
> > +                                offset, TARGET_PAGE_SIZE, &bytes_sent);
> > +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> > +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> > +            if (bytes_sent > 0) {
> > +                acct_info.norm_pages++;
> > +            } else if (bytes_sent == 0) {
> > +                acct_info.dup_pages++;
> > +            }
> > +        }
> > +    } else {
> > +        /* When starting the process of a new block, the first page of
> > +         * the block should be sent out before other pages in the same
> > +         * block, and all the pages in last block should have been sent
> > +         * out, keeping this order is important, because the 'cont' flag
> > +         * is used to avoid resending the block name.
> > +         */
> > +        if (block != last_sent_block) {
> > +            flush_compressed_data(f);
> > +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> > +            if (bytes_sent == -1) {
> > +                set_compress_params(&comp_param[0], block, offset);
> > +                /* Use the qemu thread to compress the data to make sure the
> > +                 * first page is sent out before other pages
> > +                 */
> > +                bytes_sent = do_compress_ram_page(&comp_param[0]);
> > +                if (bytes_sent > 0) {
> 
> This test is not needed
> 
> assert(bytes_sent>0)
> 
> or how can it be zero or negative here?  So, we have to always call
> qemu_put_qemu_file() no?
> 
> > +                    qemu_put_qemu_file(f, comp_param[0].file);
> > +                }
> > +            }
> > +        } else {
> > +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> > +            if (bytes_sent == -1) {
> > +                bytes_sent = compress_page_with_multi_thread(f, block, offset);
> > +            }
> > +        }
> > +    }
> >
> >      return bytes_sent;
> >  }
> > @@ -834,8 +1011,6 @@ static int ram_find_and_save_block(QEMUFile *f,
> bool last_stage)
> >      return bytes_sent;
> >  }
> >
> > -static uint64_t bytes_transferred;
> > -
> >  void acct_update_position(QEMUFile *f, size_t size, bool zero)  {
> >      uint64_t pages = size / TARGET_PAGE_SIZE; @@ -1043,6 +1218,7 @@
> > static int ram_save_iterate(QEMUFile *f, void *opaque)
> >          i++;
> >      }
> >
> > +    flush_compressed_data(f);
> >      qemu_mutex_unlock_ramlist();
> >
> >      /*
> > @@ -1089,6 +1265,7 @@ static int ram_save_complete(QEMUFile *f, void
> *opaque)
> >          bytes_transferred += bytes_sent;
> >      }
> >
> > +    flush_compressed_data(f);
> >      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
> >      migration_end();
> 
> 
> I thihnk this would make the code work, but not the locking.  You are using
> here:
> 
> quit_comp_thread:  global, and not completely clear what protects it
> comp_done_lock: global
> comp_done_cond: global
> 
> param[i].busy: I would suggest renaming to pending work
> param[i].mutex:
> param[i].cond:
>        thread is waiting for work
> 
> 
> Issues:
> 
> param->busy is protected on do_data_compress() and start_compression()
> with param->busy, but in flush_compressed_data() and
> comress_page_with_multithread() it is protected by comp_done_lock.
> 
> At this point, I would suggest to just drop param[i].mutex and use
> everywhere comp_done_lock.  We can make locking granularly later if
> needed, but 1st get it correct?
> 
> Code basically does (forget termination and locking)
> 
> each compression_thread()
> 
>   while(1) {
>      while(!work_to_do)
>         wait_for_work
>      do_work
>   }
> 
> And the main thread does:
> 
> 
> while(1) {
>      foreacth compression_thread {
>           if thread free {
>              put it to work
>              break;
>           }
>           wait_for_thread_to_finish
>      }
> }
> 
> Notice how we are walking all threads each time that we need to do anything
> 
> Perhaps code should be more simple if we put the data that needs to be
> done on a global variable and change this to:
> 
> compression_thread
> 
>   while(1) {
>      while(!work_to_do)
>         wait_for_work
>      pick work from global variable
>      wakeup main thread
>      do_work
>   }
> 
> main thread:
> 
> put work on global variable
> while(nobody_pick_thework) {
>      signal all threads
>      wait for a compression thread to take the work }
> 
> Why?  because then we only have a global mutex and two condition variables,
> with a clear semantics:
> - lock protects two conditions and global variable with work
> - one condition where threads wait for work
> - one condition where main thread wait for a worker to be ready
> 
> As we would need to lock every single tame to put the work in the global
> variable, to wait or to pick up the work, we can stop all the:
> 
> if (!foo) {
>     mutex_lock
>     if(!foo) /* this time with lock */
>         ....
> }
> 
> 
> Sorry for the very long mail, if it makes you feel better, this is the second
> time that I wrote it, because the 1st version, my locking proposal didn't
> worked correctly.
> 
> What do you think?

It sounds good, I will try according to your suggestion.  Thanks for your detail explanation :)

Liang
Li, Liang Z March 2, 2015, 2:46 a.m. UTC | #3
> I thihnk this would make the code work, but not the locking.  You are using
> here:
> 
> quit_comp_thread:  global, and not completely clear what protects it
> comp_done_lock: global
> comp_done_cond: global
> 
> param[i].busy: I would suggest renaming to pending work
> param[i].mutex:
> param[i].cond:
>        thread is waiting for work
> 
> 
> Issues:
> 
> param->busy is protected on do_data_compress() and start_compression()
> with param->busy, but in flush_compressed_data() and
> comress_page_with_multithread() it is protected by comp_done_lock.
> 
> At this point, I would suggest to just drop param[i].mutex and use
> everywhere comp_done_lock.  We can make locking granularly later if
> needed, but 1st get it correct?
> Code basically does (forget termination and locking)
> 
> each compression_thread()
> 
>   while(1) {
>      while(!work_to_do)
>         wait_for_work
>      do_work
>   }
> 
> And the main thread does:
> 
> 
> while(1) {
>      foreacth compression_thread {
>           if thread free {
>              put it to work
>              break;
>           }
>           wait_for_thread_to_finish
>      }
> }
> 
> Notice how we are walking all threads each time that we need to do anything
> 
> Perhaps code should be more simple if we put the data that needs to be
> done on a global variable and change this to:
> 
> compression_thread
> 
>   while(1) {
>      while(!work_to_do)
>         wait_for_work
>      pick work from global variable
>      wakeup main thread
>      do_work
>   }
> 
> main thread:
> 
> put work on global variable
> while(nobody_pick_thework) {
>      signal all threads
>      wait for a compression thread to take the work }
> 
> Why?  because then we only have a global mutex and two condition variables,
> with a clear semantics:
> - lock protects two conditions and global variable with work
> - one condition where threads wait for work
> - one condition where main thread wait for a worker to be ready
> 
> As we would need to lock every single tame to put the work in the global
> variable, to wait or to pick up the work, we can stop all the:
> 
> if (!foo) {
>     mutex_lock
>     if(!foo) /* this time with lock */
>         ....
> }
> 
> 
> Sorry for the very long mail, if it makes you feel better, this is the second
> time that I wrote it, because the 1st version, my locking proposal didn't
> worked correctly.
> 
> What do you think?

I have tried to use comp_done_lock everywhere instead of param[i].mutex and found
that the performance is poor, the total migration time increase about 5 times.

So I add another variable to make the code correct and remain the param[i].mutex and the
logic of the compression thread and main thread.

Add another lock will drop the performance and increase total migration time ... 

Liang

> Later, Juan.
diff mbox

Patch

diff --git a/arch_init.c b/arch_init.c
index fe062db..17b7f15 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -363,18 +363,44 @@  static QemuMutex *comp_done_lock;
 static QemuCond *comp_done_cond;
 /* The empty QEMUFileOps will be used by file in CompressParam */
 static const QEMUFileOps empty_ops = { };
+
+/* one_byte_count is used to count the bytes that is added to
+ * bytes_transferred but not actually transferred, at the proper
+ * time, we should sub one_byte_count from bytes_transferred to
+ * make bytes_transferred accurate.
+ */
+static int one_byte_count;
 static bool quit_comp_thread;
 static bool quit_decomp_thread;
 static DecompressParam *decomp_param;
 static QemuThread *decompress_threads;
 static uint8_t *compressed_data_buf;
 
+static int do_compress_ram_page(CompressParam *param);
+
 static void *do_data_compress(void *opaque)
 {
-    while (!quit_comp_thread) {
-
-    /* To be done */
+    CompressParam *param = opaque;
 
+    while (!quit_comp_thread) {
+        qemu_mutex_lock(&param->mutex);
+        /* Re-check the quit_comp_thread in case of
+         * terminate_compression_threads is called just before
+         * qemu_mutex_lock(&param->mutex) and after
+         * while(!quit_comp_thread), re-check it here can make
+         * sure the compression thread terminate as expected.
+         */
+        while (!param->busy && !quit_comp_thread) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+        }
+        qemu_mutex_unlock(&param->mutex);
+        if (!quit_comp_thread) {
+            do_compress_ram_page(param);
+        }
+        qemu_mutex_lock(comp_done_lock);
+        param->busy = false;
+        qemu_cond_signal(comp_done_cond);
+        qemu_mutex_unlock(comp_done_lock);
     }
 
     return NULL;
@@ -382,9 +408,15 @@  static void *do_data_compress(void *opaque)
 
 static inline void terminate_compression_threads(void)
 {
-    quit_comp_thread = true;
+    int idx, thread_count;
 
-    /* To be done */
+    thread_count = migrate_compress_threads();
+    quit_comp_thread = true;
+    for (idx = 0; idx < thread_count; idx++) {
+        qemu_mutex_lock(&comp_param[idx].mutex);
+        qemu_cond_signal(&comp_param[idx].cond);
+        qemu_mutex_unlock(&comp_param[idx].mutex);
+    }
 }
 
 void migrate_compress_threads_join(MigrationState *s)
@@ -770,12 +802,157 @@  static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
     return bytes_sent;
 }
 
+static int do_compress_ram_page(CompressParam *param)
+{
+    int bytes_sent, cont;
+    int blen;
+    uint8_t *p;
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
+
+    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
+    p = memory_region_get_ram_ptr(block->mr) + offset;
+
+    bytes_sent = save_block_hdr(param->file, block, offset, cont,
+                                RAM_SAVE_FLAG_COMPRESS_PAGE);
+    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
+                                     migrate_compress_level());
+    bytes_sent += blen;
+    atomic_inc(&acct_info.norm_pages);
+
+    return bytes_sent;
+}
+
+static inline void start_compression(CompressParam *param)
+{
+    qemu_mutex_lock(&param->mutex);
+    param->busy = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
+
+
+static uint64_t bytes_transferred;
+
+static void flush_compressed_data(QEMUFile *f)
+{
+    int idx, len, thread_count;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+    thread_count = migrate_compress_threads();
+    for (idx = 0; idx < thread_count; idx++) {
+        if (comp_param[idx].busy) {
+            qemu_mutex_lock(comp_done_lock);
+            while (comp_param[idx].busy && !quit_comp_thread) {
+                qemu_cond_wait(comp_done_cond, comp_done_lock);
+            }
+            qemu_mutex_unlock(comp_done_lock);
+        }
+        len = qemu_put_qemu_file(f, comp_param[idx].file);
+        bytes_transferred += len;
+    }
+    if ((one_byte_count > 0) && (bytes_transferred > one_byte_count)) {
+        bytes_transferred -= one_byte_count;
+        one_byte_count = 0;
+    }
+}
+
+static inline void set_compress_params(CompressParam *param, RAMBlock *block,
+                                       ram_addr_t offset)
+{
+    param->block = block;
+    param->offset = offset;
+}
+
+static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
+                                           ram_addr_t offset)
+{
+    int idx, thread_count, bytes_sent = 0;
+
+    thread_count = migrate_compress_threads();
+    qemu_mutex_lock(comp_done_lock);
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (!comp_param[idx].busy) {
+                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
+                set_compress_params(&comp_param[idx], block, offset);
+                start_compression(&comp_param[idx]);
+                if (bytes_sent == 0) {
+                    /* set bytes_sent to 1 in this case to prevent migration
+                     * from terminating, this 1 byte will be added to
+                     * bytes_transferred later, minus 1 to keep the
+                     * bytes_transferred accurate */
+                    bytes_sent = 1;
+                    if (bytes_transferred <= 0) {
+                        one_byte_count++;
+                    } else {
+                        bytes_transferred -= 1;
+                    }
+                }
+                break;
+            }
+        }
+        if (bytes_sent > 0) {
+            break;
+        } else {
+            qemu_cond_wait(comp_done_cond, comp_done_lock);
+        }
+    }
+    qemu_mutex_unlock(comp_done_lock);
+
+    return bytes_sent;
+}
+
 static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
                                     ram_addr_t offset, bool last_stage)
 {
     int bytes_sent = -1;
+    MemoryRegion *mr = block->mr;
+    uint8_t *p;
+    int ret;
+    int cont;
 
-    /* To be done*/
+    p = memory_region_get_ram_ptr(mr) + offset;
+    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
+    ret = ram_control_save_page(f, block->offset,
+                                offset, TARGET_PAGE_SIZE, &bytes_sent);
+    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+        if (ret != RAM_SAVE_CONTROL_DELAYED) {
+            if (bytes_sent > 0) {
+                acct_info.norm_pages++;
+            } else if (bytes_sent == 0) {
+                acct_info.dup_pages++;
+            }
+        }
+    } else {
+        /* When starting the process of a new block, the first page of
+         * the block should be sent out before other pages in the same
+         * block, and all the pages in last block should have been sent
+         * out, keeping this order is important, because the 'cont' flag
+         * is used to avoid resending the block name.
+         */
+        if (block != last_sent_block) {
+            flush_compressed_data(f);
+            bytes_sent = save_zero_page(f, block, offset, p, cont);
+            if (bytes_sent == -1) {
+                set_compress_params(&comp_param[0], block, offset);
+                /* Use the qemu thread to compress the data to make sure the
+                 * first page is sent out before other pages
+                 */
+                bytes_sent = do_compress_ram_page(&comp_param[0]);
+                if (bytes_sent > 0) {
+                    qemu_put_qemu_file(f, comp_param[0].file);
+                }
+            }
+        } else {
+            bytes_sent = save_zero_page(f, block, offset, p, cont);
+            if (bytes_sent == -1) {
+                bytes_sent = compress_page_with_multi_thread(f, block, offset);
+            }
+        }
+    }
 
     return bytes_sent;
 }
@@ -834,8 +1011,6 @@  static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
     return bytes_sent;
 }
 
-static uint64_t bytes_transferred;
-
 void acct_update_position(QEMUFile *f, size_t size, bool zero)
 {
     uint64_t pages = size / TARGET_PAGE_SIZE;
@@ -1043,6 +1218,7 @@  static int ram_save_iterate(QEMUFile *f, void *opaque)
         i++;
     }
 
+    flush_compressed_data(f);
     qemu_mutex_unlock_ramlist();
 
     /*
@@ -1089,6 +1265,7 @@  static int ram_save_complete(QEMUFile *f, void *opaque)
         bytes_transferred += bytes_sent;
     }
 
+    flush_compressed_data(f);
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();