Message ID | 20190515121544.4597-7-quintela@redhat.com |
---|---|
State | New |
Headers | show |
Series | WIP: Multifd compression support | expand |
On Wed, May 15, 2019 at 02:15:42PM +0200, Juan Quintela wrote: >+ >+MultifdMethods multifd_none_ops = { >+ .send_prepare = none_send_prepare, >+ .send_write = none_send_write, >+ .recv_pages = none_recv_pages >+}; >+ > static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) > { > MultiFDInit_t msg; >@@ -904,6 +938,8 @@ struct { > uint64_t packet_num; > /* send channels ready */ > QemuSemaphore channels_ready; >+ /* multifd ops */ >+ MultifdMethods *ops; > } *multifd_send_state; > > /* >@@ -1093,6 +1129,8 @@ static void *multifd_send_thread(void *opaque) > /* initial packet */ > p->num_packets = 1; > >+ multifd_send_state->ops = &multifd_none_ops; >+ I am afraid it is not a good practice to assign ops when each thread starts work. > while (true) { > qemu_sem_wait(&p->sem); > qemu_mutex_lock(&p->mutex); >@@ -1102,7 +1140,12 @@ static void *multifd_send_thread(void *opaque) > uint64_t packet_num = p->packet_num; > uint32_t flags = p->flags; > >- p->next_packet_size = used * qemu_target_page_size(); >+ if (used) { >+ ret = multifd_send_state->ops->send_prepare(p, used); >+ if (ret != 0) { >+ break; >+ } >+ } > multifd_send_fill_packet(p); > p->flags = 0; > p->num_packets++; >@@ -1120,8 +1163,7 @@ static void *multifd_send_thread(void *opaque) > } > > if (used) { >- ret = qio_channel_writev_all(p->c, p->pages->iov, >- used, &local_err); >+ ret = multifd_send_state->ops->send_write(p, used, &local_err); > if (ret != 0) { > break; > } >@@ -1223,6 +1265,8 @@ struct { > QemuSemaphore sem_sync; > /* global number of generated multifd packets */ > uint64_t packet_num; >+ /* multifd ops */ >+ MultifdMethods *ops; > } *multifd_recv_state; > > static void multifd_recv_terminate_threads(Error *err) >@@ -1324,6 +1368,7 @@ static void *multifd_recv_thread(void *opaque) > trace_multifd_recv_thread_start(p->id); > rcu_register_thread(); > >+ multifd_recv_state->ops = &multifd_none_ops; same as here. > while (true) { > uint32_t used; > uint32_t flags; >@@ -1353,8 +1398,7 @@ static void *multifd_recv_thread(void *opaque) > qemu_mutex_unlock(&p->mutex); > > if (used) { >- ret = qio_channel_readv_all(p->c, p->pages->iov, >- used, &local_err); >+ ret = multifd_recv_state->ops->recv_pages(p, used, &local_err); > if (ret != 0) { > break; > } >-- >2.21.0 >
* Juan Quintela (quintela@redhat.com) wrote: > It will be used later. 'none' is confusing - I think this is no-compression specifically - right? I'd be happy with something abbreviated like 'nocomp' > Signed-off-by: Juan Quintela <quintela@redhat.com> > --- > migration/ram.c | 54 ++++++++++++++++++++++++++++++++++++++++++++----- > 1 file changed, 49 insertions(+), 5 deletions(-) > > diff --git a/migration/ram.c b/migration/ram.c > index 1ca9ba77b6..6679e4f213 100644 > --- a/migration/ram.c > +++ b/migration/ram.c > @@ -700,6 +700,40 @@ typedef struct { > QemuSemaphore sem_sync; > } MultiFDRecvParams; > > +typedef struct { > + /* Prepare the send packet */ > + int (*send_prepare)(MultiFDSendParams *p, uint32_t used); > + /* Write the send packet */ > + int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **perr); > + /* Read all pages */ > + int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **perr); > +} MultifdMethods; > + > +/* Multifd without compression */ > + > +static int none_send_prepare(MultiFDSendParams *p, uint32_t used) > +{ > + p->next_packet_size = used * qemu_target_page_size(); > + return 0; > +} > + > +static int none_send_write(MultiFDSendParams *p, uint32_t used, Error **perr) > +{ > + return qio_channel_writev_all(p->c, p->pages->iov, used, perr); > +} > + > +static int none_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr) > +{ > + return qio_channel_readv_all(p->c, p->pages->iov, used, perr); > + > +} > + > +MultifdMethods multifd_none_ops = { > + .send_prepare = none_send_prepare, > + .send_write = none_send_write, > + .recv_pages = none_recv_pages > +}; > + > static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) > { > MultiFDInit_t msg; > @@ -904,6 +938,8 @@ struct { > uint64_t packet_num; > /* send channels ready */ > QemuSemaphore channels_ready; > + /* multifd ops */ > + MultifdMethods *ops; > } *multifd_send_state; > > /* > @@ -1093,6 +1129,8 @@ static void *multifd_send_thread(void *opaque) > /* initial packet */ > p->num_packets = 1; > > + multifd_send_state->ops = &multifd_none_ops; > + I agree with Wei Yang that is a bad idea; that should be done once before the first thread is started. Dave > while (true) { > qemu_sem_wait(&p->sem); > qemu_mutex_lock(&p->mutex); > @@ -1102,7 +1140,12 @@ static void *multifd_send_thread(void *opaque) > uint64_t packet_num = p->packet_num; > uint32_t flags = p->flags; > > - p->next_packet_size = used * qemu_target_page_size(); > + if (used) { > + ret = multifd_send_state->ops->send_prepare(p, used); > + if (ret != 0) { > + break; > + } > + } > multifd_send_fill_packet(p); > p->flags = 0; > p->num_packets++; > @@ -1120,8 +1163,7 @@ static void *multifd_send_thread(void *opaque) > } > > if (used) { > - ret = qio_channel_writev_all(p->c, p->pages->iov, > - used, &local_err); > + ret = multifd_send_state->ops->send_write(p, used, &local_err); > if (ret != 0) { > break; > } > @@ -1223,6 +1265,8 @@ struct { > QemuSemaphore sem_sync; > /* global number of generated multifd packets */ > uint64_t packet_num; > + /* multifd ops */ > + MultifdMethods *ops; > } *multifd_recv_state; > > static void multifd_recv_terminate_threads(Error *err) > @@ -1324,6 +1368,7 @@ static void *multifd_recv_thread(void *opaque) > trace_multifd_recv_thread_start(p->id); > rcu_register_thread(); > > + multifd_recv_state->ops = &multifd_none_ops; > while (true) { > uint32_t used; > uint32_t flags; > @@ -1353,8 +1398,7 @@ static void *multifd_recv_thread(void *opaque) > qemu_mutex_unlock(&p->mutex); > > if (used) { > - ret = qio_channel_readv_all(p->c, p->pages->iov, > - used, &local_err); > + ret = multifd_recv_state->ops->recv_pages(p, used, &local_err); > if (ret != 0) { > break; > } > -- > 2.21.0 > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Wei Yang <richardw.yang@linux.intel.com> wrote: > On Wed, May 15, 2019 at 02:15:42PM +0200, Juan Quintela wrote: >>+ >>+MultifdMethods multifd_none_ops = { >>+ .send_prepare = none_send_prepare, >>+ .send_write = none_send_write, >>+ .recv_pages = none_recv_pages >>+}; >>+ >> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) >> { >> MultiFDInit_t msg; >>@@ -904,6 +938,8 @@ struct { >> uint64_t packet_num; >> /* send channels ready */ >> QemuSemaphore channels_ready; >>+ /* multifd ops */ >>+ MultifdMethods *ops; >> } *multifd_send_state; >> >> /* >>@@ -1093,6 +1129,8 @@ static void *multifd_send_thread(void *opaque) >> /* initial packet */ >> p->num_packets = 1; >> >>+ multifd_send_state->ops = &multifd_none_ops; >>+ > > I am afraid it is not a good practice to assign ops when each thread starts > work. Agreed. Thanks, Juan.
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote: > * Juan Quintela (quintela@redhat.com) wrote: >> It will be used later. > > 'none' is confusing - I think this is no-compression specifically - > right? > I'd be happy with something abbreviated like 'nocomp' Got into nocomp. >> @@ -1093,6 +1129,8 @@ static void *multifd_send_thread(void *opaque) >> /* initial packet */ >> p->num_packets = 1; >> >> + multifd_send_state->ops = &multifd_none_ops; >> + > > I agree with Wei Yang that is a bad idea; that should be done once > before the first thread is started. Also fixed. Thanks, Juan.
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote: > * Juan Quintela (quintela@redhat.com) wrote: >> It will be used later. > > 'none' is confusing - I think this is no-compression specifically - > right? > I'd be happy with something abbreviated like 'nocomp' I don't care too much, I can change, but when you are setting the value it gets: micgration set-parameter compression none That looks ok. On the other hand, I can agree that I can call the functions nocomp. Thanks, Juan. > >> Signed-off-by: Juan Quintela <quintela@redhat.com> >> --- >> migration/ram.c | 54 ++++++++++++++++++++++++++++++++++++++++++++----- >> 1 file changed, 49 insertions(+), 5 deletions(-) >> >> diff --git a/migration/ram.c b/migration/ram.c >> index 1ca9ba77b6..6679e4f213 100644 >> --- a/migration/ram.c >> +++ b/migration/ram.c >> @@ -700,6 +700,40 @@ typedef struct { >> QemuSemaphore sem_sync; >> } MultiFDRecvParams; >> >> +typedef struct { >> + /* Prepare the send packet */ >> + int (*send_prepare)(MultiFDSendParams *p, uint32_t used); >> + /* Write the send packet */ >> + int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **perr); >> + /* Read all pages */ >> + int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **perr); >> +} MultifdMethods; >> + >> +/* Multifd without compression */ >> + >> +static int none_send_prepare(MultiFDSendParams *p, uint32_t used) >> +{ >> + p->next_packet_size = used * qemu_target_page_size(); >> + return 0; >> +} >> + >> +static int none_send_write(MultiFDSendParams *p, uint32_t used, Error **perr) >> +{ >> + return qio_channel_writev_all(p->c, p->pages->iov, used, perr); >> +} >> + >> +static int none_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr) >> +{ >> + return qio_channel_readv_all(p->c, p->pages->iov, used, perr); >> + >> +} >> + >> +MultifdMethods multifd_none_ops = { >> + .send_prepare = none_send_prepare, >> + .send_write = none_send_write, >> + .recv_pages = none_recv_pages >> +}; >> + >> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) >> { >> MultiFDInit_t msg; >> @@ -904,6 +938,8 @@ struct { >> uint64_t packet_num; >> /* send channels ready */ >> QemuSemaphore channels_ready; >> + /* multifd ops */ >> + MultifdMethods *ops; >> } *multifd_send_state; >> >> /* >> @@ -1093,6 +1129,8 @@ static void *multifd_send_thread(void *opaque) >> /* initial packet */ >> p->num_packets = 1; >> >> + multifd_send_state->ops = &multifd_none_ops; >> + > > I agree with Wei Yang that is a bad idea; that should be done once > before the first thread is started. > > Dave > >> while (true) { >> qemu_sem_wait(&p->sem); >> qemu_mutex_lock(&p->mutex); >> @@ -1102,7 +1140,12 @@ static void *multifd_send_thread(void *opaque) >> uint64_t packet_num = p->packet_num; >> uint32_t flags = p->flags; >> >> - p->next_packet_size = used * qemu_target_page_size(); >> + if (used) { >> + ret = multifd_send_state->ops->send_prepare(p, used); >> + if (ret != 0) { >> + break; >> + } >> + } >> multifd_send_fill_packet(p); >> p->flags = 0; >> p->num_packets++; >> @@ -1120,8 +1163,7 @@ static void *multifd_send_thread(void *opaque) >> } >> >> if (used) { >> - ret = qio_channel_writev_all(p->c, p->pages->iov, >> - used, &local_err); >> + ret = multifd_send_state->ops->send_write(p, used, &local_err); >> if (ret != 0) { >> break; >> } >> @@ -1223,6 +1265,8 @@ struct { >> QemuSemaphore sem_sync; >> /* global number of generated multifd packets */ >> uint64_t packet_num; >> + /* multifd ops */ >> + MultifdMethods *ops; >> } *multifd_recv_state; >> >> static void multifd_recv_terminate_threads(Error *err) >> @@ -1324,6 +1368,7 @@ static void *multifd_recv_thread(void *opaque) >> trace_multifd_recv_thread_start(p->id); >> rcu_register_thread(); >> >> + multifd_recv_state->ops = &multifd_none_ops; >> while (true) { >> uint32_t used; >> uint32_t flags; >> @@ -1353,8 +1398,7 @@ static void *multifd_recv_thread(void *opaque) >> qemu_mutex_unlock(&p->mutex); >> >> if (used) { >> - ret = qio_channel_readv_all(p->c, p->pages->iov, >> - used, &local_err); >> + ret = multifd_recv_state->ops->recv_pages(p, used, &local_err); >> if (ret != 0) { >> break; >> } >> -- >> 2.21.0 >> > -- > Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff --git a/migration/ram.c b/migration/ram.c index 1ca9ba77b6..6679e4f213 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -700,6 +700,40 @@ typedef struct { QemuSemaphore sem_sync; } MultiFDRecvParams; +typedef struct { + /* Prepare the send packet */ + int (*send_prepare)(MultiFDSendParams *p, uint32_t used); + /* Write the send packet */ + int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **perr); + /* Read all pages */ + int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **perr); +} MultifdMethods; + +/* Multifd without compression */ + +static int none_send_prepare(MultiFDSendParams *p, uint32_t used) +{ + p->next_packet_size = used * qemu_target_page_size(); + return 0; +} + +static int none_send_write(MultiFDSendParams *p, uint32_t used, Error **perr) +{ + return qio_channel_writev_all(p->c, p->pages->iov, used, perr); +} + +static int none_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr) +{ + return qio_channel_readv_all(p->c, p->pages->iov, used, perr); + +} + +MultifdMethods multifd_none_ops = { + .send_prepare = none_send_prepare, + .send_write = none_send_write, + .recv_pages = none_recv_pages +}; + static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) { MultiFDInit_t msg; @@ -904,6 +938,8 @@ struct { uint64_t packet_num; /* send channels ready */ QemuSemaphore channels_ready; + /* multifd ops */ + MultifdMethods *ops; } *multifd_send_state; /* @@ -1093,6 +1129,8 @@ static void *multifd_send_thread(void *opaque) /* initial packet */ p->num_packets = 1; + multifd_send_state->ops = &multifd_none_ops; + while (true) { qemu_sem_wait(&p->sem); qemu_mutex_lock(&p->mutex); @@ -1102,7 +1140,12 @@ static void *multifd_send_thread(void *opaque) uint64_t packet_num = p->packet_num; uint32_t flags = p->flags; - p->next_packet_size = used * qemu_target_page_size(); + if (used) { + ret = multifd_send_state->ops->send_prepare(p, used); + if (ret != 0) { + break; + } + } multifd_send_fill_packet(p); p->flags = 0; p->num_packets++; @@ -1120,8 +1163,7 @@ static void *multifd_send_thread(void *opaque) } if (used) { - ret = qio_channel_writev_all(p->c, p->pages->iov, - used, &local_err); + ret = multifd_send_state->ops->send_write(p, used, &local_err); if (ret != 0) { break; } @@ -1223,6 +1265,8 @@ struct { QemuSemaphore sem_sync; /* global number of generated multifd packets */ uint64_t packet_num; + /* multifd ops */ + MultifdMethods *ops; } *multifd_recv_state; static void multifd_recv_terminate_threads(Error *err) @@ -1324,6 +1368,7 @@ static void *multifd_recv_thread(void *opaque) trace_multifd_recv_thread_start(p->id); rcu_register_thread(); + multifd_recv_state->ops = &multifd_none_ops; while (true) { uint32_t used; uint32_t flags; @@ -1353,8 +1398,7 @@ static void *multifd_recv_thread(void *opaque) qemu_mutex_unlock(&p->mutex); if (used) { - ret = qio_channel_readv_all(p->c, p->pages->iov, - used, &local_err); + ret = multifd_recv_state->ops->recv_pages(p, used, &local_err); if (ret != 0) { break; }
It will be used later. Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/ram.c | 54 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 5 deletions(-)