
[v3,8/8] multifd: rest of zlib compression

Message ID 20190515121544.4597-9-quintela@redhat.com
State New
Series WIP: Multifd compression support

Commit Message

Juan Quintela May 15, 2019, 12:15 p.m. UTC
This is still a work in progress, but it gets everything sent as expected,
and it is faster than the code that is already there.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 104 insertions(+), 2 deletions(-)

Comments

Wei Yang May 21, 2019, 3:11 a.m. UTC | #1
On Wed, May 15, 2019 at 02:15:44PM +0200, Juan Quintela wrote:
>This is still a work in progress, but get everything sent as expected
>and it is faster than the code that is already there.

Generally, I would prefer to merge this one with the previous one.

>
>Signed-off-by: Juan Quintela <quintela@redhat.com>
>---
> migration/ram.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++-
> 1 file changed, 104 insertions(+), 2 deletions(-)
>
>diff --git a/migration/ram.c b/migration/ram.c
>index fdb5bf07a5..efbb253c1a 100644
>--- a/migration/ram.c
>+++ b/migration/ram.c
>@@ -747,6 +747,100 @@ MultifdMethods multifd_none_ops = {
>     .recv_pages = none_recv_pages
> };
> 
>+/* Multifd zlib compression */
>+
>+static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used)
>+{
>+    struct iovec *iov = p->pages->iov;
>+    z_stream *zs = &p->zs;
>+    uint32_t out_size = 0;
>+    int ret;
>+    int i;
>+
>+    for (i = 0; i < used; i++) {
>+        uint32_t available = p->zbuff_len - out_size;
>+        int flush = Z_NO_FLUSH;
>+
>+        if (i == used  - 1) {
>+            flush = Z_SYNC_FLUSH;
>+        }
>+
>+        zs->avail_in = iov[i].iov_len;
>+        zs->next_in = iov[i].iov_base;
>+
>+        zs->avail_out = available;
>+        zs->next_out = p->zbuff + out_size;
>+
>+        ret = deflate(zs, flush);
>+        if (ret != Z_OK) {
>+            printf("problem with deflate? %d\n", ret);
>+            qemu_mutex_unlock(&p->mutex);
>+            return -1;
>+        }
>+        out_size += available - zs->avail_out;
>+    }
>+    p->next_packet_size = out_size;
>+
>+    return 0;
>+}
>+
>+static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **perr)
>+{
>+    return qio_channel_write_all(p->c, (void *)p->zbuff, p->next_packet_size,
>+                                 perr);
>+}
>+
>+static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr)
>+{
>+    uint32_t in_size = p->next_packet_size;
>+    uint32_t out_size = 0;
>+    uint32_t expected_size = used * qemu_target_page_size();
>+    z_stream *zs = &p->zs;
>+    int ret;
>+    int i;
>+
>+    ret = qio_channel_read_all(p->c, (void *)p->zbuff, in_size, perr);
>+
>+    if (ret != 0) {
>+        return ret;
>+    }
>+
>+    zs->avail_in = in_size;
>+    zs->next_in = p->zbuff;
>+
>+    for (i = 0; i < used; i++) {
>+        struct iovec *iov = &p->pages->iov[i];
>+        int flush = Z_NO_FLUSH;
>+
>+        if (i == used  - 1) {
>+            flush = Z_SYNC_FLUSH;
>+        }
>+
>+        zs->avail_out = iov->iov_len;
>+        zs->next_out = iov->iov_base;
>+
>+        ret = inflate(zs, flush);
>+        if (ret != Z_OK) {
>+            printf("%d: problem with inflate? %d\n", p->id, ret);
>+            qemu_mutex_unlock(&p->mutex);
>+            return ret;
>+        }
>+        out_size += iov->iov_len;
>+    }
>+    if (out_size != expected_size) {
>+        printf("out size %d expected size %d\n",
>+               out_size, expected_size);
>+        return -1;
>+    }
>+    return 0;
>+}
>+
>+MultifdMethods multifd_zlib_ops = {
>+    .send_prepare = zlib_send_prepare,
>+    .send_write = zlib_send_write,
>+    .recv_pages = zlib_recv_pages
>+};
>+
> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> {
>     MultiFDInit_t msg;
>@@ -1145,7 +1239,11 @@ static void *multifd_send_thread(void *opaque)
>     /* initial packet */
>     p->num_packets = 1;
> 
>-    multifd_send_state->ops = &multifd_none_ops;
>+    if (migrate_use_multifd_zlib()) {
>+        multifd_send_state->ops = &multifd_zlib_ops;
>+    } else {
>+        multifd_send_state->ops = &multifd_none_ops;
>+    }

Again, manipulating a global variable from each thread is not a good idea.

It would be better to use an array to assign the ops instead of an *if*. Once
there are several compression methods, a chain of *if*s becomes difficult to read.

> 
>     while (true) {
>         qemu_sem_wait(&p->sem);
>@@ -1399,7 +1497,11 @@ static void *multifd_recv_thread(void *opaque)
>     trace_multifd_recv_thread_start(p->id);
>     rcu_register_thread();
> 
>-    multifd_recv_state->ops = &multifd_none_ops;
>+    if (migrate_use_multifd_zlib()) {
>+        multifd_recv_state->ops = &multifd_zlib_ops;
>+    } else {
>+        multifd_recv_state->ops = &multifd_none_ops;
>+    }
>     while (true) {
>         uint32_t used;
>         uint32_t flags;
>-- 
>2.21.0
>
Dr. David Alan Gilbert May 29, 2019, 5:15 p.m. UTC | #2
* Juan Quintela (quintela@redhat.com) wrote:
> This is still a work in progress, but get everything sent as expected
> and it is faster than the code that is already there.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 104 insertions(+), 2 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index fdb5bf07a5..efbb253c1a 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -747,6 +747,100 @@ MultifdMethods multifd_none_ops = {
>      .recv_pages = none_recv_pages
>  };
>  
> +/* Multifd zlib compression */
> +

Comment the return value?

> +static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used)
> +{
> +    struct iovec *iov = p->pages->iov;
> +    z_stream *zs = &p->zs;
> +    uint32_t out_size = 0;
> +    int ret;
> +    int i;

uint32_t to match 'used' ?

> +    for (i = 0; i < used; i++) {
> +        uint32_t available = p->zbuff_len - out_size;
> +        int flush = Z_NO_FLUSH;
> +
> +        if (i == used  - 1) {
> +            flush = Z_SYNC_FLUSH;
> +        }
> +
> +        zs->avail_in = iov[i].iov_len;
> +        zs->next_in = iov[i].iov_base;
> +
> +        zs->avail_out = available;
> +        zs->next_out = p->zbuff + out_size;
> +
> +        ret = deflate(zs, flush);
> +        if (ret != Z_OK) {
> +            printf("problem with deflate? %d\n", ret);

If it's an error it should probably be at least an fprintf(stderr or
err_ something.

Should this also check that the avail_in/next_in has consumed the whole
of the input?

> +            qemu_mutex_unlock(&p->mutex);

Can you explain and/or comment why it's unlocked here in the error path?

> +            return -1;
> +        }
> +        out_size += available - zs->avail_out;
> +    }
> +    p->next_packet_size = out_size;

Some traces_ wouldn't hurt.

> +    return 0;
> +}
> +
> +static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **perr)
> +{
> +    return qio_channel_write_all(p->c, (void *)p->zbuff, p->next_packet_size,
> +                                 perr);
> +}
> +
> +static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr)
> +{
> +    uint32_t in_size = p->next_packet_size;
> +    uint32_t out_size = 0;
> +    uint32_t expected_size = used * qemu_target_page_size();
> +    z_stream *zs = &p->zs;
> +    int ret;
> +    int i;
> +
> +    ret = qio_channel_read_all(p->c, (void *)p->zbuff, in_size, perr);
> +
> +    if (ret != 0) {
> +        return ret;
> +    }
> +
> +    zs->avail_in = in_size;
> +    zs->next_in = p->zbuff;
> +
> +    for (i = 0; i < used; i++) {
> +        struct iovec *iov = &p->pages->iov[i];
> +        int flush = Z_NO_FLUSH;
> +
> +        if (i == used  - 1) {
> +            flush = Z_SYNC_FLUSH;
> +        }
> +
> +        zs->avail_out = iov->iov_len;
> +        zs->next_out = iov->iov_base;
> +
> +        ret = inflate(zs, flush);
> +        if (ret != Z_OK) {
> +            printf("%d: problem with inflate? %d\n", p->id, ret);
> +            qemu_mutex_unlock(&p->mutex);
> +            return ret;
> +        }
> +        out_size += iov->iov_len;
> +    }
> +    if (out_size != expected_size) {
> +        printf("out size %d expected size %d\n",
> +               out_size, expected_size);
> +        return -1;
> +    }
> +    return 0;
> +}
> +
> +MultifdMethods multifd_zlib_ops = {
> +    .send_prepare = zlib_send_prepare,
> +    .send_write = zlib_send_write,
> +    .recv_pages = zlib_recv_pages
> +};
> +
>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
>  {
>      MultiFDInit_t msg;
> @@ -1145,7 +1239,11 @@ static void *multifd_send_thread(void *opaque)
>      /* initial packet */
>      p->num_packets = 1;
>  
> -    multifd_send_state->ops = &multifd_none_ops;
> +    if (migrate_use_multifd_zlib()) {
> +        multifd_send_state->ops = &multifd_zlib_ops;
> +    } else {
> +        multifd_send_state->ops = &multifd_none_ops;
> +    }
>  
>      while (true) {
>          qemu_sem_wait(&p->sem);
> @@ -1399,7 +1497,11 @@ static void *multifd_recv_thread(void *opaque)
>      trace_multifd_recv_thread_start(p->id);
>      rcu_register_thread();
>  
> -    multifd_recv_state->ops = &multifd_none_ops;
> +    if (migrate_use_multifd_zlib()) {
> +        multifd_recv_state->ops = &multifd_zlib_ops;
> +    } else {
> +        multifd_recv_state->ops = &multifd_none_ops;
> +    }
>      while (true) {
>          uint32_t used;
>          uint32_t flags;
> -- 
> 2.21.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Juan Quintela June 11, 2019, 4:54 p.m. UTC | #3
Wei Yang <richardw.yang@linux.intel.com> wrote:
> On Wed, May 15, 2019 at 02:15:44PM +0200, Juan Quintela wrote:
>>This is still a work in progress, but get everything sent as expected
>>and it is faster than the code that is already there.
>
> Generally, I prefer to merge this one with previous one.

Done, sir O:-)

For the WIP part, it was easier to keep the bits that didn't change separate
from the ones I was still working on.

>>@@ -1145,7 +1239,11 @@ static void *multifd_send_thread(void *opaque)
>>     /* initial packet */
>>     p->num_packets = 1;
>> 
>>-    multifd_send_state->ops = &multifd_none_ops;
>>+    if (migrate_use_multifd_zlib()) {
>>+        multifd_send_state->ops = &multifd_zlib_ops;
>>+    } else {
>>+        multifd_send_state->ops = &multifd_none_ops;
>>+    }
>
> Again, to manipulate a global variable in each thread is not a good idea.

Fixed.

> This would be better to use an array to assign ops instead of *if*. In case
> you would have several compress methods, the code would be difficult to read.

it is going to end up as:

   if (migrate_use_multifd_zlib()) {
       multifd_send_state->ops = &multifd_zlib_ops;
   } else if (migrate_use_multifd_zstd()) {
       multifd_send_state->ops = &multifd_zstd_ops;
   } else {
       multifd_send_state->ops = &multifd_none_ops;
   }

We can use:

multifd_send_state->ops = multifd_ops[migrate_multifd_method()];

As for which is easier to read... it depends on taste.
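
A minimal sketch of the table approach (the MultiFDCompression enum and the
multifd_ops[] array name below are illustrative assumptions, not final names):

    /* Hypothetical per-method enum, one entry per compression method. */
    typedef enum {
        MULTIFD_COMPRESSION_NONE = 0,
        MULTIFD_COMPRESSION_ZLIB,
        MULTIFD_COMPRESSION__MAX,
    } MultiFDCompression;

    /* One ops struct per method, indexed by the enum value. */
    static MultifdMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
        [MULTIFD_COMPRESSION_NONE] = &multifd_none_ops,
        [MULTIFD_COMPRESSION_ZLIB] = &multifd_zlib_ops,
    };

    /* Done once during setup, not in every thread: */
    multifd_send_state->ops = multifd_ops[migrate_multifd_method()];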

Will change anyways.

Thanks, Juan.
Juan Quintela June 12, 2019, 8:33 a.m. UTC | #4
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> This is still a work in progress, but get everything sent as expected
>> and it is faster than the code that is already there.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++-
>>  1 file changed, 104 insertions(+), 2 deletions(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index fdb5bf07a5..efbb253c1a 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -747,6 +747,100 @@ MultifdMethods multifd_none_ops = {
>>      .recv_pages = none_recv_pages
>>  };
>>  
>> +/* Multifd zlib compression */
>> +
>
> Comment the return value?

Once there, commented all the functions.
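
Something along these lines for each one (wording is illustrative, based on
what this version of zlib_send_prepare() actually does):

    /*
     * zlib_send_prepare: compress the pages referenced by p->pages into
     * p->zbuff and store the compressed length in p->next_packet_size.
     *
     * Returns 0 on success, -1 if deflate() fails.
     */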

>> +static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used)
>> +{
>> +    struct iovec *iov = p->pages->iov;
>> +    z_stream *zs = &p->zs;
>> +    uint32_t out_size = 0;
>> +    int ret;
>> +    int i;
>
> uint32_t to match 'used' ?

Done

>> +    for (i = 0; i < used; i++) {
>> +        uint32_t available = p->zbuff_len - out_size;
>> +        int flush = Z_NO_FLUSH;
>> +
>> +        if (i == used  - 1) {
>> +            flush = Z_SYNC_FLUSH;
>> +        }
>> +
>> +        zs->avail_in = iov[i].iov_len;
>> +        zs->next_in = iov[i].iov_base;
>> +
>> +        zs->avail_out = available;
>> +        zs->next_out = p->zbuff + out_size;
>> +
>> +        ret = deflate(zs, flush);
>> +        if (ret != Z_OK) {
>> +            printf("problem with deflate? %d\n", ret);
>
> If it's an error it should probably be at least an fprintf(stderr or
> err_ something.

We don't have any error around really; we need one. Searching for it.
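
For illustration only, a minimal sketch assuming an Error ** parameter gets
threaded into send_prepare (this version doesn't have one yet), using the
existing error_setg() helper:

        ret = deflate(zs, flush);
        if (ret != Z_OK) {
            error_setg(errp, "multifd %d: deflate returned %d instead of Z_OK",
                       p->id, ret);
            return -1;
        }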

> Should this also check that the avail_in/next_in has consumed the whole
> of the input?

I am not checking because _it_ is supposed to be doing it right.  We can
test it though, especially in reception.
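
If we do add the check, a minimal sketch (right after the deflate() call in
the per-page loop) could be:

        /* deflate() is expected to consume the whole page here; catch the
         * case where it didn't (e.g. because the output buffer filled up). */
        if (zs->avail_in != 0) {
            printf("deflate left %u bytes of input unconsumed\n", zs->avail_in);
            return -1;
        }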

>> +            qemu_mutex_unlock(&p->mutex);
>
> Can you explain and/or comment why it's unlocked here in the error path?

Uh, oh ....

Leftover for when it was done inline inside the main function.
Removed.

>> +            return -1;
>> +        }
>> +        out_size += available - zs->avail_out;
>> +    }
>> +    p->next_packet_size = out_size;
>
> Some traces_ wouldn't hurt.

Humm, you are right here.
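
For example (hypothetical event name, following the migration/trace-events
format), something like:

    # migration/trace-events
    multifd_zlib_send_prepare(uint8_t id, uint32_t used, uint32_t out_size) "channel %d pages %u compressed_size %u"

    /* in zlib_send_prepare(), next to p->next_packet_size = out_size: */
    trace_multifd_zlib_send_prepare(p->id, used, out_size);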

Thanks, Juan.

Patch

diff --git a/migration/ram.c b/migration/ram.c
index fdb5bf07a5..efbb253c1a 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -747,6 +747,100 @@  MultifdMethods multifd_none_ops = {
     .recv_pages = none_recv_pages
 };
 
+/* Multifd zlib compression */
+
+static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used)
+{
+    struct iovec *iov = p->pages->iov;
+    z_stream *zs = &p->zs;
+    uint32_t out_size = 0;
+    int ret;
+    int i;
+
+    for (i = 0; i < used; i++) {
+        uint32_t available = p->zbuff_len - out_size;
+        int flush = Z_NO_FLUSH;
+
+        if (i == used  - 1) {
+            flush = Z_SYNC_FLUSH;
+        }
+
+        zs->avail_in = iov[i].iov_len;
+        zs->next_in = iov[i].iov_base;
+
+        zs->avail_out = available;
+        zs->next_out = p->zbuff + out_size;
+
+        ret = deflate(zs, flush);
+        if (ret != Z_OK) {
+            printf("problem with deflate? %d\n", ret);
+            qemu_mutex_unlock(&p->mutex);
+            return -1;
+        }
+        out_size += available - zs->avail_out;
+    }
+    p->next_packet_size = out_size;
+
+    return 0;
+}
+
+static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **perr)
+{
+    return qio_channel_write_all(p->c, (void *)p->zbuff, p->next_packet_size,
+                                 perr);
+}
+
+static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr)
+{
+    uint32_t in_size = p->next_packet_size;
+    uint32_t out_size = 0;
+    uint32_t expected_size = used * qemu_target_page_size();
+    z_stream *zs = &p->zs;
+    int ret;
+    int i;
+
+    ret = qio_channel_read_all(p->c, (void *)p->zbuff, in_size, perr);
+
+    if (ret != 0) {
+        return ret;
+    }
+
+    zs->avail_in = in_size;
+    zs->next_in = p->zbuff;
+
+    for (i = 0; i < used; i++) {
+        struct iovec *iov = &p->pages->iov[i];
+        int flush = Z_NO_FLUSH;
+
+        if (i == used  - 1) {
+            flush = Z_SYNC_FLUSH;
+        }
+
+        zs->avail_out = iov->iov_len;
+        zs->next_out = iov->iov_base;
+
+        ret = inflate(zs, flush);
+        if (ret != Z_OK) {
+            printf("%d: problem with inflate? %d\n", p->id, ret);
+            qemu_mutex_unlock(&p->mutex);
+            return ret;
+        }
+        out_size += iov->iov_len;
+    }
+    if (out_size != expected_size) {
+        printf("out size %d expected size %d\n",
+               out_size, expected_size);
+        return -1;
+    }
+    return 0;
+}
+
+MultifdMethods multifd_zlib_ops = {
+    .send_prepare = zlib_send_prepare,
+    .send_write = zlib_send_write,
+    .recv_pages = zlib_recv_pages
+};
+
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 {
     MultiFDInit_t msg;
@@ -1145,7 +1239,11 @@  static void *multifd_send_thread(void *opaque)
     /* initial packet */
     p->num_packets = 1;
 
-    multifd_send_state->ops = &multifd_none_ops;
+    if (migrate_use_multifd_zlib()) {
+        multifd_send_state->ops = &multifd_zlib_ops;
+    } else {
+        multifd_send_state->ops = &multifd_none_ops;
+    }
 
     while (true) {
         qemu_sem_wait(&p->sem);
@@ -1399,7 +1497,11 @@  static void *multifd_recv_thread(void *opaque)
     trace_multifd_recv_thread_start(p->id);
     rcu_register_thread();
 
-    multifd_recv_state->ops = &multifd_none_ops;
+    if (migrate_use_multifd_zlib()) {
+        multifd_recv_state->ops = &multifd_zlib_ops;
+    } else {
+        multifd_recv_state->ops = &multifd_none_ops;
+    }
     while (true) {
         uint32_t used;
         uint32_t flags;