Message ID | 20190515121544.4597-9-quintela@redhat.com |
---|---|
State | New |
Headers | show |
Series | WIP: Multifd compression support | expand |
On Wed, May 15, 2019 at 02:15:44PM +0200, Juan Quintela wrote: >This is still a work in progress, but get everything sent as expected >and it is faster than the code that is already there. Generally, I prefer to merge this one with previous one. > >Signed-off-by: Juan Quintela <quintela@redhat.com> >--- > migration/ram.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++- > 1 file changed, 104 insertions(+), 2 deletions(-) > >diff --git a/migration/ram.c b/migration/ram.c >index fdb5bf07a5..efbb253c1a 100644 >--- a/migration/ram.c >+++ b/migration/ram.c >@@ -747,6 +747,100 @@ MultifdMethods multifd_none_ops = { > .recv_pages = none_recv_pages > }; > >+/* Multifd zlib compression */ >+ >+static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used) >+{ >+ struct iovec *iov = p->pages->iov; >+ z_stream *zs = &p->zs; >+ uint32_t out_size = 0; >+ int ret; >+ int i; >+ >+ for (i = 0; i < used; i++) { >+ uint32_t available = p->zbuff_len - out_size; >+ int flush = Z_NO_FLUSH; >+ >+ if (i == used - 1) { >+ flush = Z_SYNC_FLUSH; >+ } >+ >+ zs->avail_in = iov[i].iov_len; >+ zs->next_in = iov[i].iov_base; >+ >+ zs->avail_out = available; >+ zs->next_out = p->zbuff + out_size; >+ >+ ret = deflate(zs, flush); >+ if (ret != Z_OK) { >+ printf("problem with deflate? %d\n", ret); >+ qemu_mutex_unlock(&p->mutex); >+ return -1; >+ } >+ out_size += available - zs->avail_out; >+ } >+ p->next_packet_size = out_size; >+ >+ return 0; >+} >+ >+static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **perr) >+{ >+ return qio_channel_write_all(p->c, (void *)p->zbuff, p->next_packet_size, >+ perr); >+} >+ >+static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr) >+{ >+ uint32_t in_size = p->next_packet_size; >+ uint32_t out_size = 0; >+ uint32_t expected_size = used * qemu_target_page_size(); >+ z_stream *zs = &p->zs; >+ int ret; >+ int i; >+ >+ ret = qio_channel_read_all(p->c, (void *)p->zbuff, in_size, perr); >+ >+ if (ret != 0) { >+ return ret; >+ } >+ >+ zs->avail_in = in_size; >+ zs->next_in = p->zbuff; >+ >+ for (i = 0; i < used; i++) { >+ struct iovec *iov = &p->pages->iov[i]; >+ int flush = Z_NO_FLUSH; >+ >+ if (i == used - 1) { >+ flush = Z_SYNC_FLUSH; >+ } >+ >+ zs->avail_out = iov->iov_len; >+ zs->next_out = iov->iov_base; >+ >+ ret = inflate(zs, flush); >+ if (ret != Z_OK) { >+ printf("%d: problem with inflate? %d\n", p->id, ret); >+ qemu_mutex_unlock(&p->mutex); >+ return ret; >+ } >+ out_size += iov->iov_len; >+ } >+ if (out_size != expected_size) { >+ printf("out size %d expected size %d\n", >+ out_size, expected_size); >+ return -1; >+ } >+ return 0; >+} >+ >+MultifdMethods multifd_zlib_ops = { >+ .send_prepare = zlib_send_prepare, >+ .send_write = zlib_send_write, >+ .recv_pages = zlib_recv_pages >+}; >+ > static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) > { > MultiFDInit_t msg; >@@ -1145,7 +1239,11 @@ static void *multifd_send_thread(void *opaque) > /* initial packet */ > p->num_packets = 1; > >- multifd_send_state->ops = &multifd_none_ops; >+ if (migrate_use_multifd_zlib()) { >+ multifd_send_state->ops = &multifd_zlib_ops; >+ } else { >+ multifd_send_state->ops = &multifd_none_ops; >+ } Again, to manipulate a global variable in each thread is not a good idea. This would be better to use an array to assign ops instead of *if*. In case you would have several compress methods, the code would be difficult to read. > > while (true) { > qemu_sem_wait(&p->sem); >@@ -1399,7 +1497,11 @@ static void *multifd_recv_thread(void *opaque) > trace_multifd_recv_thread_start(p->id); > rcu_register_thread(); > >- multifd_recv_state->ops = &multifd_none_ops; >+ if (migrate_use_multifd_zlib()) { >+ multifd_recv_state->ops = &multifd_zlib_ops; >+ } else { >+ multifd_recv_state->ops = &multifd_none_ops; >+ } > while (true) { > uint32_t used; > uint32_t flags; >-- >2.21.0 >
* Juan Quintela (quintela@redhat.com) wrote: > This is still a work in progress, but get everything sent as expected > and it is faster than the code that is already there. > > Signed-off-by: Juan Quintela <quintela@redhat.com> > --- > migration/ram.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++- > 1 file changed, 104 insertions(+), 2 deletions(-) > > diff --git a/migration/ram.c b/migration/ram.c > index fdb5bf07a5..efbb253c1a 100644 > --- a/migration/ram.c > +++ b/migration/ram.c > @@ -747,6 +747,100 @@ MultifdMethods multifd_none_ops = { > .recv_pages = none_recv_pages > }; > > +/* Multifd zlib compression */ > + Comment the return value? > +static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used) > +{ > + struct iovec *iov = p->pages->iov; > + z_stream *zs = &p->zs; > + uint32_t out_size = 0; > + int ret; > + int i; uint32_t to match 'used' ? > + for (i = 0; i < used; i++) { > + uint32_t available = p->zbuff_len - out_size; > + int flush = Z_NO_FLUSH; > + > + if (i == used - 1) { > + flush = Z_SYNC_FLUSH; > + } > + > + zs->avail_in = iov[i].iov_len; > + zs->next_in = iov[i].iov_base; > + > + zs->avail_out = available; > + zs->next_out = p->zbuff + out_size; > + > + ret = deflate(zs, flush); > + if (ret != Z_OK) { > + printf("problem with deflate? %d\n", ret); If it's an error it should probably be at least an fprintf(stderr or err_ something. Should this also check that the avail_in/next_in has consumed the whole of the input? > + qemu_mutex_unlock(&p->mutex); Can you explain and/or comment whyit's unlocked here in the error path? > + return -1; > + } > + out_size += available - zs->avail_out; > + } > + p->next_packet_size = out_size; Some traces_ wouldn't hurt. > + return 0; > +} > + > +static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **perr) > +{ > + return qio_channel_write_all(p->c, (void *)p->zbuff, p->next_packet_size, > + perr); > +} > + > +static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr) > +{ > + uint32_t in_size = p->next_packet_size; > + uint32_t out_size = 0; > + uint32_t expected_size = used * qemu_target_page_size(); > + z_stream *zs = &p->zs; > + int ret; > + int i; > + > + ret = qio_channel_read_all(p->c, (void *)p->zbuff, in_size, perr); > + > + if (ret != 0) { > + return ret; > + } > + > + zs->avail_in = in_size; > + zs->next_in = p->zbuff; > + > + for (i = 0; i < used; i++) { > + struct iovec *iov = &p->pages->iov[i]; > + int flush = Z_NO_FLUSH; > + > + if (i == used - 1) { > + flush = Z_SYNC_FLUSH; > + } > + > + zs->avail_out = iov->iov_len; > + zs->next_out = iov->iov_base; > + > + ret = inflate(zs, flush); > + if (ret != Z_OK) { > + printf("%d: problem with inflate? %d\n", p->id, ret); > + qemu_mutex_unlock(&p->mutex); > + return ret; > + } > + out_size += iov->iov_len; > + } > + if (out_size != expected_size) { > + printf("out size %d expected size %d\n", > + out_size, expected_size); > + return -1; > + } > + return 0; > +} > + > +MultifdMethods multifd_zlib_ops = { > + .send_prepare = zlib_send_prepare, > + .send_write = zlib_send_write, > + .recv_pages = zlib_recv_pages > +}; > + > static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) > { > MultiFDInit_t msg; > @@ -1145,7 +1239,11 @@ static void *multifd_send_thread(void *opaque) > /* initial packet */ > p->num_packets = 1; > > - multifd_send_state->ops = &multifd_none_ops; > + if (migrate_use_multifd_zlib()) { > + multifd_send_state->ops = &multifd_zlib_ops; > + } else { > + multifd_send_state->ops = &multifd_none_ops; > + } > > while (true) { > qemu_sem_wait(&p->sem); > @@ -1399,7 +1497,11 @@ static void *multifd_recv_thread(void *opaque) > trace_multifd_recv_thread_start(p->id); > rcu_register_thread(); > > - multifd_recv_state->ops = &multifd_none_ops; > + if (migrate_use_multifd_zlib()) { > + multifd_recv_state->ops = &multifd_zlib_ops; > + } else { > + multifd_recv_state->ops = &multifd_none_ops; > + } > while (true) { > uint32_t used; > uint32_t flags; > -- > 2.21.0 > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Wei Yang <richardw.yang@linux.intel.com> wrote: > On Wed, May 15, 2019 at 02:15:44PM +0200, Juan Quintela wrote: >>This is still a work in progress, but get everything sent as expected >>and it is faster than the code that is already there. > > Generally, I prefer to merge this one with previous one. Done, sir O:-) For the WIP part, it was easier to have the bits that didn't change and the ones that I was working with. >>@@ -1145,7 +1239,11 @@ static void *multifd_send_thread(void *opaque) >> /* initial packet */ >> p->num_packets = 1; >> >>- multifd_send_state->ops = &multifd_none_ops; >>+ if (migrate_use_multifd_zlib()) { >>+ multifd_send_state->ops = &multifd_zlib_ops; >>+ } else { >>+ multifd_send_state->ops = &multifd_none_ops; >>+ } > > Again, to manipulate a global variable in each thread is not a good idea. Fixed. > This would be better to use an array to assign ops instead of *if*. In case > you would have several compress methods, the code would be difficult to read. it is going to end: if (migrate_use_multifd_zlib()) { multifd_send_state->ops = &multifd_zlib_ops; if (migrate_use_multifd_zstd()) { multifd_send_state->ops = &multifd_zstd_ops; } else { multifd_send_state->ops = &multifd_none_ops; } We can use: multifd_send_state->ops = multifd_ops[migrate_multifd_method(void)]; About what is easier to read ..... it depends on taste. Will change anyways. Thanks, Juan.
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote: > * Juan Quintela (quintela@redhat.com) wrote: >> This is still a work in progress, but get everything sent as expected >> and it is faster than the code that is already there. >> >> Signed-off-by: Juan Quintela <quintela@redhat.com> >> --- >> migration/ram.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++- >> 1 file changed, 104 insertions(+), 2 deletions(-) >> >> diff --git a/migration/ram.c b/migration/ram.c >> index fdb5bf07a5..efbb253c1a 100644 >> --- a/migration/ram.c >> +++ b/migration/ram.c >> @@ -747,6 +747,100 @@ MultifdMethods multifd_none_ops = { >> .recv_pages = none_recv_pages >> }; >> >> +/* Multifd zlib compression */ >> + > > Comment the return value? Once there, commented all the functions. >> +static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used) >> +{ >> + struct iovec *iov = p->pages->iov; >> + z_stream *zs = &p->zs; >> + uint32_t out_size = 0; >> + int ret; >> + int i; > > uint32_t to match 'used' ? Done >> + for (i = 0; i < used; i++) { >> + uint32_t available = p->zbuff_len - out_size; >> + int flush = Z_NO_FLUSH; >> + >> + if (i == used - 1) { >> + flush = Z_SYNC_FLUSH; >> + } >> + >> + zs->avail_in = iov[i].iov_len; >> + zs->next_in = iov[i].iov_base; >> + >> + zs->avail_out = available; >> + zs->next_out = p->zbuff + out_size; >> + >> + ret = deflate(zs, flush); >> + if (ret != Z_OK) { >> + printf("problem with deflate? %d\n", ret); > > If it's an error it should probably be at least an fprintf(stderr or > err_ something. We don't have any error arround really, we need one. Searching for it. > Should this also check that the avail_in/next_in has consumed the whole > of the input? I am not checking because _it_ is supposed to b doing it right. We can test it through, specially in reception. >> + qemu_mutex_unlock(&p->mutex); > > Can you explain and/or comment whyit's unlocked here in the error path? Uh, oh .... Leftover for when it was done inline inside the main function. Removed. >> + return -1; >> + } >> + out_size += available - zs->avail_out; >> + } >> + p->next_packet_size = out_size; > > Some traces_ wouldn't hurt. Humm, you are right here. Thanks, Juan.
diff --git a/migration/ram.c b/migration/ram.c index fdb5bf07a5..efbb253c1a 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -747,6 +747,100 @@ MultifdMethods multifd_none_ops = { .recv_pages = none_recv_pages }; +/* Multifd zlib compression */ + +static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used) +{ + struct iovec *iov = p->pages->iov; + z_stream *zs = &p->zs; + uint32_t out_size = 0; + int ret; + int i; + + for (i = 0; i < used; i++) { + uint32_t available = p->zbuff_len - out_size; + int flush = Z_NO_FLUSH; + + if (i == used - 1) { + flush = Z_SYNC_FLUSH; + } + + zs->avail_in = iov[i].iov_len; + zs->next_in = iov[i].iov_base; + + zs->avail_out = available; + zs->next_out = p->zbuff + out_size; + + ret = deflate(zs, flush); + if (ret != Z_OK) { + printf("problem with deflate? %d\n", ret); + qemu_mutex_unlock(&p->mutex); + return -1; + } + out_size += available - zs->avail_out; + } + p->next_packet_size = out_size; + + return 0; +} + +static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **perr) +{ + return qio_channel_write_all(p->c, (void *)p->zbuff, p->next_packet_size, + perr); +} + +static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr) +{ + uint32_t in_size = p->next_packet_size; + uint32_t out_size = 0; + uint32_t expected_size = used * qemu_target_page_size(); + z_stream *zs = &p->zs; + int ret; + int i; + + ret = qio_channel_read_all(p->c, (void *)p->zbuff, in_size, perr); + + if (ret != 0) { + return ret; + } + + zs->avail_in = in_size; + zs->next_in = p->zbuff; + + for (i = 0; i < used; i++) { + struct iovec *iov = &p->pages->iov[i]; + int flush = Z_NO_FLUSH; + + if (i == used - 1) { + flush = Z_SYNC_FLUSH; + } + + zs->avail_out = iov->iov_len; + zs->next_out = iov->iov_base; + + ret = inflate(zs, flush); + if (ret != Z_OK) { + printf("%d: problem with inflate? %d\n", p->id, ret); + qemu_mutex_unlock(&p->mutex); + return ret; + } + out_size += iov->iov_len; + } + if (out_size != expected_size) { + printf("out size %d expected size %d\n", + out_size, expected_size); + return -1; + } + return 0; +} + +MultifdMethods multifd_zlib_ops = { + .send_prepare = zlib_send_prepare, + .send_write = zlib_send_write, + .recv_pages = zlib_recv_pages +}; + static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) { MultiFDInit_t msg; @@ -1145,7 +1239,11 @@ static void *multifd_send_thread(void *opaque) /* initial packet */ p->num_packets = 1; - multifd_send_state->ops = &multifd_none_ops; + if (migrate_use_multifd_zlib()) { + multifd_send_state->ops = &multifd_zlib_ops; + } else { + multifd_send_state->ops = &multifd_none_ops; + } while (true) { qemu_sem_wait(&p->sem); @@ -1399,7 +1497,11 @@ static void *multifd_recv_thread(void *opaque) trace_multifd_recv_thread_start(p->id); rcu_register_thread(); - multifd_recv_state->ops = &multifd_none_ops; + if (migrate_use_multifd_zlib()) { + multifd_recv_state->ops = &multifd_zlib_ops; + } else { + multifd_recv_state->ops = &multifd_none_ops; + } while (true) { uint32_t used; uint32_t flags;
This is still a work in progress, but get everything sent as expected and it is faster than the code that is already there. Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/ram.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 104 insertions(+), 2 deletions(-)