diff mbox series

[v3,6/8] migration: Make none operations intoits own structure

Message ID 20190515121544.4597-7-quintela@redhat.com
State New
Headers show
Series WIP: Multifd compression support | expand

Commit Message

Juan Quintela May 15, 2019, 12:15 p.m. UTC
It will be used later.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 54 ++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 49 insertions(+), 5 deletions(-)

Comments

Wei Yang May 21, 2019, 2:47 a.m. UTC | #1
On Wed, May 15, 2019 at 02:15:42PM +0200, Juan Quintela wrote:
>+
>+MultifdMethods multifd_none_ops = {
>+    .send_prepare = none_send_prepare,
>+    .send_write = none_send_write,
>+    .recv_pages = none_recv_pages
>+};
>+
> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> {
>     MultiFDInit_t msg;
>@@ -904,6 +938,8 @@ struct {
>     uint64_t packet_num;
>     /* send channels ready */
>     QemuSemaphore channels_ready;
>+    /* multifd ops */
>+    MultifdMethods *ops;
> } *multifd_send_state;
> 
> /*
>@@ -1093,6 +1129,8 @@ static void *multifd_send_thread(void *opaque)
>     /* initial packet */
>     p->num_packets = 1;
> 
>+    multifd_send_state->ops = &multifd_none_ops;
>+

I am afraid it is not a good practice to assign ops when each thread starts
work.

>     while (true) {
>         qemu_sem_wait(&p->sem);
>         qemu_mutex_lock(&p->mutex);
>@@ -1102,7 +1140,12 @@ static void *multifd_send_thread(void *opaque)
>             uint64_t packet_num = p->packet_num;
>             uint32_t flags = p->flags;
> 
>-            p->next_packet_size = used * qemu_target_page_size();
>+            if (used) {
>+                ret = multifd_send_state->ops->send_prepare(p, used);
>+                if (ret != 0) {
>+                    break;
>+                }
>+            }
>             multifd_send_fill_packet(p);
>             p->flags = 0;
>             p->num_packets++;
>@@ -1120,8 +1163,7 @@ static void *multifd_send_thread(void *opaque)
>             }
> 
>             if (used) {
>-                ret = qio_channel_writev_all(p->c, p->pages->iov,
>-                                             used, &local_err);
>+                ret = multifd_send_state->ops->send_write(p, used, &local_err);
>                 if (ret != 0) {
>                     break;
>                 }
>@@ -1223,6 +1265,8 @@ struct {
>     QemuSemaphore sem_sync;
>     /* global number of generated multifd packets */
>     uint64_t packet_num;
>+    /* multifd ops */
>+    MultifdMethods *ops;
> } *multifd_recv_state;
> 
> static void multifd_recv_terminate_threads(Error *err)
>@@ -1324,6 +1368,7 @@ static void *multifd_recv_thread(void *opaque)
>     trace_multifd_recv_thread_start(p->id);
>     rcu_register_thread();
> 
>+    multifd_recv_state->ops = &multifd_none_ops;

same as here.

>     while (true) {
>         uint32_t used;
>         uint32_t flags;
>@@ -1353,8 +1398,7 @@ static void *multifd_recv_thread(void *opaque)
>         qemu_mutex_unlock(&p->mutex);
> 
>         if (used) {
>-            ret = qio_channel_readv_all(p->c, p->pages->iov,
>-                                        used, &local_err);
>+            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
>             if (ret != 0) {
>                 break;
>             }
>-- 
>2.21.0
>
Dr. David Alan Gilbert May 29, 2019, 4:34 p.m. UTC | #2
* Juan Quintela (quintela@redhat.com) wrote:
> It will be used later.

'none' is confusing - I think this is no-compression specifically -
right?
I'd be happy with something abbreviated like 'nocomp'

> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 54 ++++++++++++++++++++++++++++++++++++++++++++-----
>  1 file changed, 49 insertions(+), 5 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 1ca9ba77b6..6679e4f213 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -700,6 +700,40 @@ typedef struct {
>      QemuSemaphore sem_sync;
>  } MultiFDRecvParams;
>  
> +typedef struct {
> +    /* Prepare the send packet */
> +    int (*send_prepare)(MultiFDSendParams *p, uint32_t used);
> +    /* Write the send packet */
> +    int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **perr);
> +    /* Read all pages */
> +    int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **perr);
> +} MultifdMethods;
> +
> +/* Multifd without compression */
> +
> +static int none_send_prepare(MultiFDSendParams *p, uint32_t used)
> +{
> +    p->next_packet_size = used * qemu_target_page_size();
> +    return 0;
> +}
> +
> +static int none_send_write(MultiFDSendParams *p, uint32_t used, Error **perr)
> +{
> +    return qio_channel_writev_all(p->c, p->pages->iov, used, perr);
> +}
> +
> +static int none_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr)
> +{
> +    return qio_channel_readv_all(p->c, p->pages->iov, used, perr);
> +
> +}
> +
> +MultifdMethods multifd_none_ops = {
> +    .send_prepare = none_send_prepare,
> +    .send_write = none_send_write,
> +    .recv_pages = none_recv_pages
> +};
> +
>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
>  {
>      MultiFDInit_t msg;
> @@ -904,6 +938,8 @@ struct {
>      uint64_t packet_num;
>      /* send channels ready */
>      QemuSemaphore channels_ready;
> +    /* multifd ops */
> +    MultifdMethods *ops;
>  } *multifd_send_state;
>  
>  /*
> @@ -1093,6 +1129,8 @@ static void *multifd_send_thread(void *opaque)
>      /* initial packet */
>      p->num_packets = 1;
>  
> +    multifd_send_state->ops = &multifd_none_ops;
> +

I agree with Wei Yang that is a bad idea; that should be done once
before the first thread is started.

Dave

>      while (true) {
>          qemu_sem_wait(&p->sem);
>          qemu_mutex_lock(&p->mutex);
> @@ -1102,7 +1140,12 @@ static void *multifd_send_thread(void *opaque)
>              uint64_t packet_num = p->packet_num;
>              uint32_t flags = p->flags;
>  
> -            p->next_packet_size = used * qemu_target_page_size();
> +            if (used) {
> +                ret = multifd_send_state->ops->send_prepare(p, used);
> +                if (ret != 0) {
> +                    break;
> +                }
> +            }
>              multifd_send_fill_packet(p);
>              p->flags = 0;
>              p->num_packets++;
> @@ -1120,8 +1163,7 @@ static void *multifd_send_thread(void *opaque)
>              }
>  
>              if (used) {
> -                ret = qio_channel_writev_all(p->c, p->pages->iov,
> -                                             used, &local_err);
> +                ret = multifd_send_state->ops->send_write(p, used, &local_err);
>                  if (ret != 0) {
>                      break;
>                  }
> @@ -1223,6 +1265,8 @@ struct {
>      QemuSemaphore sem_sync;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
> +    /* multifd ops */
> +    MultifdMethods *ops;
>  } *multifd_recv_state;
>  
>  static void multifd_recv_terminate_threads(Error *err)
> @@ -1324,6 +1368,7 @@ static void *multifd_recv_thread(void *opaque)
>      trace_multifd_recv_thread_start(p->id);
>      rcu_register_thread();
>  
> +    multifd_recv_state->ops = &multifd_none_ops;
>      while (true) {
>          uint32_t used;
>          uint32_t flags;
> @@ -1353,8 +1398,7 @@ static void *multifd_recv_thread(void *opaque)
>          qemu_mutex_unlock(&p->mutex);
>  
>          if (used) {
> -            ret = qio_channel_readv_all(p->c, p->pages->iov,
> -                                        used, &local_err);
> +            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
>              if (ret != 0) {
>                  break;
>              }
> -- 
> 2.21.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Juan Quintela June 10, 2019, 9:54 a.m. UTC | #3
Wei Yang <richardw.yang@linux.intel.com> wrote:
> On Wed, May 15, 2019 at 02:15:42PM +0200, Juan Quintela wrote:
>>+
>>+MultifdMethods multifd_none_ops = {
>>+    .send_prepare = none_send_prepare,
>>+    .send_write = none_send_write,
>>+    .recv_pages = none_recv_pages
>>+};
>>+
>> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
>> {
>>     MultiFDInit_t msg;
>>@@ -904,6 +938,8 @@ struct {
>>     uint64_t packet_num;
>>     /* send channels ready */
>>     QemuSemaphore channels_ready;
>>+    /* multifd ops */
>>+    MultifdMethods *ops;
>> } *multifd_send_state;
>> 
>> /*
>>@@ -1093,6 +1129,8 @@ static void *multifd_send_thread(void *opaque)
>>     /* initial packet */
>>     p->num_packets = 1;
>> 
>>+    multifd_send_state->ops = &multifd_none_ops;
>>+
>
> I am afraid it is not a good practice to assign ops when each thread starts
> work.

Agreed.

Thanks, Juan.
Juan Quintela June 10, 2019, 9:54 a.m. UTC | #4
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> It will be used later.
>
> 'none' is confusing - I think this is no-compression specifically -
> right?
> I'd be happy with something abbreviated like 'nocomp'

Got into nocomp.
>> @@ -1093,6 +1129,8 @@ static void *multifd_send_thread(void *opaque)
>>      /* initial packet */
>>      p->num_packets = 1;
>>  
>> +    multifd_send_state->ops = &multifd_none_ops;
>> +
>
> I agree with Wei Yang that is a bad idea; that should be done once
> before the first thread is started.

Also fixed.

Thanks, Juan.
Juan Quintela June 12, 2019, 11:54 a.m. UTC | #5
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> It will be used later.
>
> 'none' is confusing - I think this is no-compression specifically -
> right?
> I'd be happy with something abbreviated like 'nocomp'

I don't care too much, I can change, but when you are setting the value
it gets:

micgration set-parameter compression none

That looks ok.

On the other hand, I can agree that I can call the functions nocomp.

Thanks, Juan.

>
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 54 ++++++++++++++++++++++++++++++++++++++++++++-----
>>  1 file changed, 49 insertions(+), 5 deletions(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index 1ca9ba77b6..6679e4f213 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -700,6 +700,40 @@ typedef struct {
>>      QemuSemaphore sem_sync;
>>  } MultiFDRecvParams;
>>  
>> +typedef struct {
>> +    /* Prepare the send packet */
>> +    int (*send_prepare)(MultiFDSendParams *p, uint32_t used);
>> +    /* Write the send packet */
>> +    int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **perr);
>> +    /* Read all pages */
>> +    int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **perr);
>> +} MultifdMethods;
>> +
>> +/* Multifd without compression */
>> +
>> +static int none_send_prepare(MultiFDSendParams *p, uint32_t used)
>> +{
>> +    p->next_packet_size = used * qemu_target_page_size();
>> +    return 0;
>> +}
>> +
>> +static int none_send_write(MultiFDSendParams *p, uint32_t used, Error **perr)
>> +{
>> +    return qio_channel_writev_all(p->c, p->pages->iov, used, perr);
>> +}
>> +
>> +static int none_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr)
>> +{
>> +    return qio_channel_readv_all(p->c, p->pages->iov, used, perr);
>> +
>> +}
>> +
>> +MultifdMethods multifd_none_ops = {
>> +    .send_prepare = none_send_prepare,
>> +    .send_write = none_send_write,
>> +    .recv_pages = none_recv_pages
>> +};
>> +
>>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
>>  {
>>      MultiFDInit_t msg;
>> @@ -904,6 +938,8 @@ struct {
>>      uint64_t packet_num;
>>      /* send channels ready */
>>      QemuSemaphore channels_ready;
>> +    /* multifd ops */
>> +    MultifdMethods *ops;
>>  } *multifd_send_state;
>>  
>>  /*
>> @@ -1093,6 +1129,8 @@ static void *multifd_send_thread(void *opaque)
>>      /* initial packet */
>>      p->num_packets = 1;
>>  
>> +    multifd_send_state->ops = &multifd_none_ops;
>> +
>
> I agree with Wei Yang that is a bad idea; that should be done once
> before the first thread is started.
>
> Dave
>
>>      while (true) {
>>          qemu_sem_wait(&p->sem);
>>          qemu_mutex_lock(&p->mutex);
>> @@ -1102,7 +1140,12 @@ static void *multifd_send_thread(void *opaque)
>>              uint64_t packet_num = p->packet_num;
>>              uint32_t flags = p->flags;
>>  
>> -            p->next_packet_size = used * qemu_target_page_size();
>> +            if (used) {
>> +                ret = multifd_send_state->ops->send_prepare(p, used);
>> +                if (ret != 0) {
>> +                    break;
>> +                }
>> +            }
>>              multifd_send_fill_packet(p);
>>              p->flags = 0;
>>              p->num_packets++;
>> @@ -1120,8 +1163,7 @@ static void *multifd_send_thread(void *opaque)
>>              }
>>  
>>              if (used) {
>> -                ret = qio_channel_writev_all(p->c, p->pages->iov,
>> -                                             used, &local_err);
>> +                ret = multifd_send_state->ops->send_write(p, used, &local_err);
>>                  if (ret != 0) {
>>                      break;
>>                  }
>> @@ -1223,6 +1265,8 @@ struct {
>>      QemuSemaphore sem_sync;
>>      /* global number of generated multifd packets */
>>      uint64_t packet_num;
>> +    /* multifd ops */
>> +    MultifdMethods *ops;
>>  } *multifd_recv_state;
>>  
>>  static void multifd_recv_terminate_threads(Error *err)
>> @@ -1324,6 +1368,7 @@ static void *multifd_recv_thread(void *opaque)
>>      trace_multifd_recv_thread_start(p->id);
>>      rcu_register_thread();
>>  
>> +    multifd_recv_state->ops = &multifd_none_ops;
>>      while (true) {
>>          uint32_t used;
>>          uint32_t flags;
>> @@ -1353,8 +1398,7 @@ static void *multifd_recv_thread(void *opaque)
>>          qemu_mutex_unlock(&p->mutex);
>>  
>>          if (used) {
>> -            ret = qio_channel_readv_all(p->c, p->pages->iov,
>> -                                        used, &local_err);
>> +            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
>>              if (ret != 0) {
>>                  break;
>>              }
>> -- 
>> 2.21.0
>> 
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff mbox series

Patch

diff --git a/migration/ram.c b/migration/ram.c
index 1ca9ba77b6..6679e4f213 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -700,6 +700,40 @@  typedef struct {
     QemuSemaphore sem_sync;
 } MultiFDRecvParams;
 
+typedef struct {
+    /* Prepare the send packet */
+    int (*send_prepare)(MultiFDSendParams *p, uint32_t used);
+    /* Write the send packet */
+    int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **perr);
+    /* Read all pages */
+    int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **perr);
+} MultifdMethods;
+
+/* Multifd without compression */
+
+static int none_send_prepare(MultiFDSendParams *p, uint32_t used)
+{
+    p->next_packet_size = used * qemu_target_page_size();
+    return 0;
+}
+
+static int none_send_write(MultiFDSendParams *p, uint32_t used, Error **perr)
+{
+    return qio_channel_writev_all(p->c, p->pages->iov, used, perr);
+}
+
+static int none_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr)
+{
+    return qio_channel_readv_all(p->c, p->pages->iov, used, perr);
+
+}
+
+MultifdMethods multifd_none_ops = {
+    .send_prepare = none_send_prepare,
+    .send_write = none_send_write,
+    .recv_pages = none_recv_pages
+};
+
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 {
     MultiFDInit_t msg;
@@ -904,6 +938,8 @@  struct {
     uint64_t packet_num;
     /* send channels ready */
     QemuSemaphore channels_ready;
+    /* multifd ops */
+    MultifdMethods *ops;
 } *multifd_send_state;
 
 /*
@@ -1093,6 +1129,8 @@  static void *multifd_send_thread(void *opaque)
     /* initial packet */
     p->num_packets = 1;
 
+    multifd_send_state->ops = &multifd_none_ops;
+
     while (true) {
         qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
@@ -1102,7 +1140,12 @@  static void *multifd_send_thread(void *opaque)
             uint64_t packet_num = p->packet_num;
             uint32_t flags = p->flags;
 
-            p->next_packet_size = used * qemu_target_page_size();
+            if (used) {
+                ret = multifd_send_state->ops->send_prepare(p, used);
+                if (ret != 0) {
+                    break;
+                }
+            }
             multifd_send_fill_packet(p);
             p->flags = 0;
             p->num_packets++;
@@ -1120,8 +1163,7 @@  static void *multifd_send_thread(void *opaque)
             }
 
             if (used) {
-                ret = qio_channel_writev_all(p->c, p->pages->iov,
-                                             used, &local_err);
+                ret = multifd_send_state->ops->send_write(p, used, &local_err);
                 if (ret != 0) {
                     break;
                 }
@@ -1223,6 +1265,8 @@  struct {
     QemuSemaphore sem_sync;
     /* global number of generated multifd packets */
     uint64_t packet_num;
+    /* multifd ops */
+    MultifdMethods *ops;
 } *multifd_recv_state;
 
 static void multifd_recv_terminate_threads(Error *err)
@@ -1324,6 +1368,7 @@  static void *multifd_recv_thread(void *opaque)
     trace_multifd_recv_thread_start(p->id);
     rcu_register_thread();
 
+    multifd_recv_state->ops = &multifd_none_ops;
     while (true) {
         uint32_t used;
         uint32_t flags;
@@ -1353,8 +1398,7 @@  static void *multifd_recv_thread(void *opaque)
         qemu_mutex_unlock(&p->mutex);
 
         if (used) {
-            ret = qio_channel_readv_all(p->c, p->pages->iov,
-                                        used, &local_err);
+            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
             if (ret != 0) {
                 break;
             }