diff mbox series

[v5,41/50] multi-process/mig: Enable VMSD save in the Proxy object

Message ID f056d73bb7f749171626cb8abc5a317b0ec17845.1582576372.git.jag.raman@oracle.com
State New
Headers show
Series Initial support for multi-process qemu | expand

Commit Message

Jag Raman Feb. 24, 2020, 8:55 p.m. UTC
Collect the VMSD from remote process on the source and save
it to the channel leading to the destination

Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
---
 v4 -> v5:
  - Using qemu_file_shutdown() instead of qemu_thread_cancel(). Removed patch
    which introduced qemu_thread_cancel()

 hw/proxy/qemu-proxy.c         | 135 ++++++++++++++++++++++++++++++++++++++++++
 include/hw/proxy/qemu-proxy.h |   2 +
 include/io/mpqemu-link.h      |   1 +
 3 files changed, 138 insertions(+)

Comments

Dr. David Alan Gilbert March 5, 2020, 12:31 p.m. UTC | #1
* Jagannathan Raman (jag.raman@oracle.com) wrote:
> Collect the VMSD from remote process on the source and save
> it to the channel leading to the destination
> 
> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> ---
>  v4 -> v5:
>   - Using qemu_file_shutdown() instead of qemu_thread_cancel(). Removed patch
>     which introduced qemu_thread_cancel()
> 
>  hw/proxy/qemu-proxy.c         | 135 ++++++++++++++++++++++++++++++++++++++++++
>  include/hw/proxy/qemu-proxy.h |   2 +
>  include/io/mpqemu-link.h      |   1 +
>  3 files changed, 138 insertions(+)
> 
> diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c
> index b1b9282..19f0dbb 100644
> --- a/hw/proxy/qemu-proxy.c
> +++ b/hw/proxy/qemu-proxy.c
> @@ -23,6 +23,14 @@
>  #include "util/event_notifier-posix.c"
>  #include "hw/boards.h"
>  #include "include/qemu/log.h"
> +#include "io/channel.h"
> +#include "migration/qemu-file-types.h"
> +#include "qapi/error.h"
> +#include "io/channel-util.h"
> +#include "migration/qemu-file-channel.h"
> +#include "migration/qemu-file.h"
> +#include "migration/migration.h"
> +#include "migration/vmstate.h"
>  
>  QEMUTimer *hb_timer;
>  static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp);
> @@ -35,6 +43,9 @@ static void broadcast_init(void);
>  static int config_op_send(PCIProxyDev *dev, uint32_t addr, uint32_t *val, int l,
>                            unsigned int op);
>  
> +#define PAGE_SIZE qemu_real_host_page_size
> +uint8_t *mig_data;
> +
>  static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx)
>  {
>      /* TODO: Add proper handler. */
> @@ -460,6 +471,129 @@ static void pci_proxy_dev_inst_init(Object *obj)
>      dev->mem_init = false;
>  }
>  
> +typedef struct {
> +    QEMUFile *rem;
> +    PCIProxyDev *dev;
> +} proxy_mig_data;
> +
> +static void *proxy_mig_out(void *opaque)
> +{
> +    proxy_mig_data *data = opaque;
> +    PCIProxyDev *dev = data->dev;
> +    uint8_t byte;
> +    uint64_t data_size = PAGE_SIZE;
> +
> +    mig_data = g_malloc(data_size);
> +
> +    while (true) {
> +        byte = qemu_get_byte(data->rem);
> +
> +        if (qemu_file_get_error(data->rem)) {
> +            break;
> +        }
> +
> +        mig_data[dev->migsize++] = byte;
> +        if (dev->migsize == data_size) {
> +            data_size += PAGE_SIZE;
> +            mig_data = g_realloc(mig_data, data_size);
> +        }
> +    }
> +
> +    return NULL;
> +}
> +
> +static int proxy_pre_save(void *opaque)
> +{
> +    PCIProxyDev *pdev = opaque;
> +    proxy_mig_data *mig_data;
> +    QEMUFile *f_remote;
> +    MPQemuMsg msg = {0};
> +    QemuThread thread;
> +    Error *err = NULL;
> +    QIOChannel *ioc;
> +    uint64_t size;
> +    int fd[2];
> +
> +    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) {
> +        return -1;
> +    }
> +
> +    ioc = qio_channel_new_fd(fd[0], &err);
> +    if (err) {
> +        error_report_err(err);
> +        return -1;
> +    }
> +
> +    qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig");
> +
> +    f_remote = qemu_fopen_channel_input(ioc);
> +
> +    pdev->migsize = 0;
> +
> +    mig_data = g_malloc0(sizeof(proxy_mig_data));
> +    mig_data->rem = f_remote;

This feels way too complicated.   Since we know f_remote is always just
a simple fd we're getting we don't need to use qio or qemu_file here;
just use the fd - nice and simple.

Then the code to read it can just use read(2) with a sensible size chunk
rather than reading a byte at a time.

> +    mig_data->dev = pdev;
> +
> +    qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data,
> +                       QEMU_THREAD_DETACHED);

I'm just checking why a thread is necessary; is this because you need to
be able to start reading before you block waiting for the remote to tell
you the size - worrying that if you don't start reading then the remote
might block waiting for us?

> +    msg.cmd = START_MIG_OUT;
> +    msg.bytestream = 0;
> +    msg.num_fds = 2;
> +    msg.fds[0] = fd[1];
> +    msg.fds[1] = GET_REMOTE_WAIT;
> +
> +    mpqemu_msg_send(&msg, pdev->mpqemu_link->com);
> +    size = wait_for_remote(msg.fds[1]);
> +    PUT_REMOTE_WAIT(msg.fds[1]);
> +
> +    assert(size != ULLONG_MAX);
> +
> +    /*
> +     * migsize is being update by a separate thread. Using volatile to
> +     * instruct the compiler to fetch the value of this variable from
> +     * memory during every read
> +     */
> +    while (*((volatile uint64_t *)&pdev->migsize) < size) {
> +    }

Hmm.  I suggest the following:

  a) You create a shared 'size' variable;  and initialize it to
    UINT64_MAX.
  b) The thread uses atomic_read(shared_size) to read that value.
  c) When you receive the size from the remote you do
     atomic_set(&shared_size, size);
  d) The thread does:
     while (received_size < atomic_read(&shared_size))

     so the thread will quit either on EOF or it hitting the size.

  e) We pthread_join here to wait for the thread
  f) We then check a received size to make sure it matches what we
expect.

That removes the tight loop.

> +    qemu_file_shutdown(f_remote);
> +
> +    qemu_fclose(f_remote);
> +    close(fd[1]);
> +
> +    return 0;
> +}
> +
> +static int proxy_post_save(void *opaque)
> +{
> +    MigrationState *ms = migrate_get_current();
> +    PCIProxyDev *pdev = opaque;
> +    uint64_t pos = 0;
> +
> +    while (pos < pdev->migsize) {
> +        qemu_put_byte(ms->to_dst_file, mig_data[pos]);
> +        pos++;
> +    }
> +
> +    qemu_fflush(ms->to_dst_file);
> +
> +    return 0;

I don't think you need that.

> +}
> +
> +const VMStateDescription vmstate_pci_proxy_device = {
> +    .name = "PCIProxyDevice",
> +    .version_id = 2,
> +    .minimum_version_id = 1,
> +    .pre_save = proxy_pre_save,
> +    .post_save = proxy_post_save,
> +    .fields = (VMStateField[]) {
> +        VMSTATE_PCI_DEVICE(parent_dev, PCIProxyDev),
> +        VMSTATE_UINT64(migsize, PCIProxyDev),

I think instead you should use a VMSTATE_VBUFFER here to save
the mig_data.
What we should do is the post_save should g_free the buffer.
(mig_data should be a field in proxy).

Dave


> +        VMSTATE_END_OF_LIST()
> +    }
> +};
> +
>  static void pci_proxy_dev_class_init(ObjectClass *klass, void *data)
>  {
>      PCIDeviceClass *k = PCI_DEVICE_CLASS(klass);
> @@ -471,6 +605,7 @@ static void pci_proxy_dev_class_init(ObjectClass *klass, void *data)
>      k->config_write = pci_proxy_write_config;
>  
>      dc->reset = proxy_device_reset;
> +    dc->vmsd = &vmstate_pci_proxy_device;
>  }
>  
>  static const TypeInfo pci_proxy_dev_type_info = {
> diff --git a/include/hw/proxy/qemu-proxy.h b/include/hw/proxy/qemu-proxy.h
> index 5de8129..537c227 100644
> --- a/include/hw/proxy/qemu-proxy.h
> +++ b/include/hw/proxy/qemu-proxy.h
> @@ -75,6 +75,8 @@ struct PCIProxyDev {
>                          bool need_spawn, Error **errp);
>  
>      ProxyMemoryRegion region[PCI_NUM_REGIONS];
> +
> +    uint64_t migsize;
>  };
>  
>  typedef struct PCIProxyDevClass {
> diff --git a/include/io/mpqemu-link.h b/include/io/mpqemu-link.h
> index d2234ca..b42c003 100644
> --- a/include/io/mpqemu-link.h
> +++ b/include/io/mpqemu-link.h
> @@ -63,6 +63,7 @@ typedef enum {
>      PROXY_PING,
>      MMIO_RETURN,
>      DEVICE_RESET,
> +    START_MIG_OUT,
>      MAX,
>  } mpqemu_cmd_t;
>  
> -- 
> 1.8.3.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Jag Raman March 5, 2020, 4:48 p.m. UTC | #2
On 3/5/2020 7:31 AM, Dr. David Alan Gilbert wrote:
> * Jagannathan Raman (jag.raman@oracle.com) wrote:
>> Collect the VMSD from remote process on the source and save
>> it to the channel leading to the destination
>>
>> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
>> Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
>> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
>> ---
>>   v4 -> v5:
>>    - Using qemu_file_shutdown() instead of qemu_thread_cancel(). Removed patch
>>      which introduced qemu_thread_cancel()
>>
>>   hw/proxy/qemu-proxy.c         | 135 ++++++++++++++++++++++++++++++++++++++++++
>>   include/hw/proxy/qemu-proxy.h |   2 +
>>   include/io/mpqemu-link.h      |   1 +
>>   3 files changed, 138 insertions(+)
>>
>> diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c
>> index b1b9282..19f0dbb 100644
>> --- a/hw/proxy/qemu-proxy.c
>> +++ b/hw/proxy/qemu-proxy.c
>> @@ -23,6 +23,14 @@
>>   #include "util/event_notifier-posix.c"
>>   #include "hw/boards.h"
>>   #include "include/qemu/log.h"
>> +#include "io/channel.h"
>> +#include "migration/qemu-file-types.h"
>> +#include "qapi/error.h"
>> +#include "io/channel-util.h"
>> +#include "migration/qemu-file-channel.h"
>> +#include "migration/qemu-file.h"
>> +#include "migration/migration.h"
>> +#include "migration/vmstate.h"
>>   
>>   QEMUTimer *hb_timer;
>>   static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp);
>> @@ -35,6 +43,9 @@ static void broadcast_init(void);
>>   static int config_op_send(PCIProxyDev *dev, uint32_t addr, uint32_t *val, int l,
>>                             unsigned int op);
>>   
>> +#define PAGE_SIZE qemu_real_host_page_size
>> +uint8_t *mig_data;
>> +
>>   static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx)
>>   {
>>       /* TODO: Add proper handler. */
>> @@ -460,6 +471,129 @@ static void pci_proxy_dev_inst_init(Object *obj)
>>       dev->mem_init = false;
>>   }
>>   
>> +typedef struct {
>> +    QEMUFile *rem;
>> +    PCIProxyDev *dev;
>> +} proxy_mig_data;
>> +
>> +static void *proxy_mig_out(void *opaque)
>> +{
>> +    proxy_mig_data *data = opaque;
>> +    PCIProxyDev *dev = data->dev;
>> +    uint8_t byte;
>> +    uint64_t data_size = PAGE_SIZE;
>> +
>> +    mig_data = g_malloc(data_size);
>> +
>> +    while (true) {
>> +        byte = qemu_get_byte(data->rem);
>> +
>> +        if (qemu_file_get_error(data->rem)) {
>> +            break;
>> +        }
>> +
>> +        mig_data[dev->migsize++] = byte;
>> +        if (dev->migsize == data_size) {
>> +            data_size += PAGE_SIZE;
>> +            mig_data = g_realloc(mig_data, data_size);
>> +        }
>> +    }
>> +
>> +    return NULL;
>> +}
>> +
>> +static int proxy_pre_save(void *opaque)
>> +{
>> +    PCIProxyDev *pdev = opaque;
>> +    proxy_mig_data *mig_data;
>> +    QEMUFile *f_remote;
>> +    MPQemuMsg msg = {0};
>> +    QemuThread thread;
>> +    Error *err = NULL;
>> +    QIOChannel *ioc;
>> +    uint64_t size;
>> +    int fd[2];
>> +
>> +    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) {
>> +        return -1;
>> +    }
>> +
>> +    ioc = qio_channel_new_fd(fd[0], &err);
>> +    if (err) {
>> +        error_report_err(err);
>> +        return -1;
>> +    }
>> +
>> +    qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig");
>> +
>> +    f_remote = qemu_fopen_channel_input(ioc);
>> +
>> +    pdev->migsize = 0;
>> +
>> +    mig_data = g_malloc0(sizeof(proxy_mig_data));
>> +    mig_data->rem = f_remote;
> 
> This feels way too complicated.   Since we know f_remote is always just
> a simple fd we're getting we don't need to use qio or qemu_file here;
> just use the fd - nice and simple.
> 
> Then the code to read it can just use read(2) with a sensible size chunk
> rather than reading a byte at a time.

Hi Dave,

Upon closer inspection, we found that the migration code on the remote
(which uses QEMUFile) could sometimes set an error on the channel using
qemu_file_set_error(). We needed to use qemu_file_get_error() to catch
these errors and abort migration. So we had to stick with QEMUFile at
the receiving end as well.

> 
>> +    mig_data->dev = pdev;
>> +
>> +    qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data,
>> +                       QEMU_THREAD_DETACHED);
> 
> I'm just checking why a thread is necessary; is this because you need to
> be able to start reading before you block waiting for the remote to tell
> you the size - worrying that if you don't start reading then the remote
> might block waiting for us?

Yes, Dave. That is correct.

> 
>> +    msg.cmd = START_MIG_OUT;
>> +    msg.bytestream = 0;
>> +    msg.num_fds = 2;
>> +    msg.fds[0] = fd[1];
>> +    msg.fds[1] = GET_REMOTE_WAIT;
>> +
>> +    mpqemu_msg_send(&msg, pdev->mpqemu_link->com);
>> +    size = wait_for_remote(msg.fds[1]);
>> +    PUT_REMOTE_WAIT(msg.fds[1]);
>> +
>> +    assert(size != ULLONG_MAX);
>> +
>> +    /*
>> +     * migsize is being update by a separate thread. Using volatile to
>> +     * instruct the compiler to fetch the value of this variable from
>> +     * memory during every read
>> +     */
>> +    while (*((volatile uint64_t *)&pdev->migsize) < size) {
>> +    }
> 
> Hmm.  I suggest the following:
> 
>    a) You create a shared 'size' variable;  and initialize it to
>      UINT64_MAX.
>    b) The thread uses atomic_read(shared_size) to read that value.
>    c) When you receive the size from the remote you do
>       atomic_set(&shared_size, size);
>    d) The thread does:
>       while (received_size < atomic_read(&shared_size))
> 
>       so the thread will quit either on EOF or it hitting the size.
> 
>    e) We pthread_join here to wait for the thread
>    f) We then check a received size to make sure it matches what we
> expect.
> 
> That removes the tight loop.

Sure, will do.

> 
>> +    qemu_file_shutdown(f_remote);
>> +
>> +    qemu_fclose(f_remote);
>> +    close(fd[1]);
>> +
>> +    return 0;
>> +}
>> +
>> +static int proxy_post_save(void *opaque)
>> +{
>> +    MigrationState *ms = migrate_get_current();
>> +    PCIProxyDev *pdev = opaque;
>> +    uint64_t pos = 0;
>> +
>> +    while (pos < pdev->migsize) {
>> +        qemu_put_byte(ms->to_dst_file, mig_data[pos]);
>> +        pos++;
>> +    }
>> +
>> +    qemu_fflush(ms->to_dst_file);
>> +
>> +    return 0;
> 
> I don't think you need that.
> 
>> +}
>> +
>> +const VMStateDescription vmstate_pci_proxy_device = {
>> +    .name = "PCIProxyDevice",
>> +    .version_id = 2,
>> +    .minimum_version_id = 1,
>> +    .pre_save = proxy_pre_save,
>> +    .post_save = proxy_post_save,
>> +    .fields = (VMStateField[]) {
>> +        VMSTATE_PCI_DEVICE(parent_dev, PCIProxyDev),
>> +        VMSTATE_UINT64(migsize, PCIProxyDev),
> 
> I think instead you should use a VMSTATE_VBUFFER here to save
> the mig_data.
> What we should do is the post_save should g_free the buffer.
> (mig_data should be a field in proxy).

We noticed that VMSTATE_BUFFER requires that the buffer be part of a
"struct" and that the buffer is an array. Since the buffer we're using
is neither an array nor part of a "struct", we decided to go with
writing the buffer directly to the migration stream in proxy_post_save()
instead of using VMSTATE_BUFFER.

I think we should nevertheless g_free this buffer in post_save like you
pointed out.

Thank you!
--
Jag

> 
> Dave
> 
> 
>> +        VMSTATE_END_OF_LIST()
>> +    }
>> +};
>> +
>>   static void pci_proxy_dev_class_init(ObjectClass *klass, void *data)
>>   {
>>       PCIDeviceClass *k = PCI_DEVICE_CLASS(klass);
>> @@ -471,6 +605,7 @@ static void pci_proxy_dev_class_init(ObjectClass *klass, void *data)
>>       k->config_write = pci_proxy_write_config;
>>   
>>       dc->reset = proxy_device_reset;
>> +    dc->vmsd = &vmstate_pci_proxy_device;
>>   }
>>   
>>   static const TypeInfo pci_proxy_dev_type_info = {
>> diff --git a/include/hw/proxy/qemu-proxy.h b/include/hw/proxy/qemu-proxy.h
>> index 5de8129..537c227 100644
>> --- a/include/hw/proxy/qemu-proxy.h
>> +++ b/include/hw/proxy/qemu-proxy.h
>> @@ -75,6 +75,8 @@ struct PCIProxyDev {
>>                           bool need_spawn, Error **errp);
>>   
>>       ProxyMemoryRegion region[PCI_NUM_REGIONS];
>> +
>> +    uint64_t migsize;
>>   };
>>   
>>   typedef struct PCIProxyDevClass {
>> diff --git a/include/io/mpqemu-link.h b/include/io/mpqemu-link.h
>> index d2234ca..b42c003 100644
>> --- a/include/io/mpqemu-link.h
>> +++ b/include/io/mpqemu-link.h
>> @@ -63,6 +63,7 @@ typedef enum {
>>       PROXY_PING,
>>       MMIO_RETURN,
>>       DEVICE_RESET,
>> +    START_MIG_OUT,
>>       MAX,
>>   } mpqemu_cmd_t;
>>   
>> -- 
>> 1.8.3.1
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
Dr. David Alan Gilbert March 5, 2020, 5:04 p.m. UTC | #3
* Jag Raman (jag.raman@oracle.com) wrote:
> 
> 
> On 3/5/2020 7:31 AM, Dr. David Alan Gilbert wrote:
> > * Jagannathan Raman (jag.raman@oracle.com) wrote:
> > > Collect the VMSD from remote process on the source and save
> > > it to the channel leading to the destination
> > > 
> > > Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> > > Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> > > Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> > > ---
> > >   v4 -> v5:
> > >    - Using qemu_file_shutdown() instead of qemu_thread_cancel(). Removed patch
> > >      which introduced qemu_thread_cancel()
> > > 
> > >   hw/proxy/qemu-proxy.c         | 135 ++++++++++++++++++++++++++++++++++++++++++
> > >   include/hw/proxy/qemu-proxy.h |   2 +
> > >   include/io/mpqemu-link.h      |   1 +
> > >   3 files changed, 138 insertions(+)
> > > 
> > > diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c
> > > index b1b9282..19f0dbb 100644
> > > --- a/hw/proxy/qemu-proxy.c
> > > +++ b/hw/proxy/qemu-proxy.c
> > > @@ -23,6 +23,14 @@
> > >   #include "util/event_notifier-posix.c"
> > >   #include "hw/boards.h"
> > >   #include "include/qemu/log.h"
> > > +#include "io/channel.h"
> > > +#include "migration/qemu-file-types.h"
> > > +#include "qapi/error.h"
> > > +#include "io/channel-util.h"
> > > +#include "migration/qemu-file-channel.h"
> > > +#include "migration/qemu-file.h"
> > > +#include "migration/migration.h"
> > > +#include "migration/vmstate.h"
> > >   QEMUTimer *hb_timer;
> > >   static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp);
> > > @@ -35,6 +43,9 @@ static void broadcast_init(void);
> > >   static int config_op_send(PCIProxyDev *dev, uint32_t addr, uint32_t *val, int l,
> > >                             unsigned int op);
> > > +#define PAGE_SIZE qemu_real_host_page_size
> > > +uint8_t *mig_data;
> > > +
> > >   static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx)
> > >   {
> > >       /* TODO: Add proper handler. */
> > > @@ -460,6 +471,129 @@ static void pci_proxy_dev_inst_init(Object *obj)
> > >       dev->mem_init = false;
> > >   }
> > > +typedef struct {
> > > +    QEMUFile *rem;
> > > +    PCIProxyDev *dev;
> > > +} proxy_mig_data;
> > > +
> > > +static void *proxy_mig_out(void *opaque)
> > > +{
> > > +    proxy_mig_data *data = opaque;
> > > +    PCIProxyDev *dev = data->dev;
> > > +    uint8_t byte;
> > > +    uint64_t data_size = PAGE_SIZE;
> > > +
> > > +    mig_data = g_malloc(data_size);
> > > +
> > > +    while (true) {
> > > +        byte = qemu_get_byte(data->rem);
> > > +
> > > +        if (qemu_file_get_error(data->rem)) {
> > > +            break;
> > > +        }
> > > +
> > > +        mig_data[dev->migsize++] = byte;
> > > +        if (dev->migsize == data_size) {
> > > +            data_size += PAGE_SIZE;
> > > +            mig_data = g_realloc(mig_data, data_size);
> > > +        }
> > > +    }
> > > +
> > > +    return NULL;
> > > +}
> > > +
> > > +static int proxy_pre_save(void *opaque)
> > > +{
> > > +    PCIProxyDev *pdev = opaque;
> > > +    proxy_mig_data *mig_data;
> > > +    QEMUFile *f_remote;
> > > +    MPQemuMsg msg = {0};
> > > +    QemuThread thread;
> > > +    Error *err = NULL;
> > > +    QIOChannel *ioc;
> > > +    uint64_t size;
> > > +    int fd[2];
> > > +
> > > +    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) {
> > > +        return -1;
> > > +    }
> > > +
> > > +    ioc = qio_channel_new_fd(fd[0], &err);
> > > +    if (err) {
> > > +        error_report_err(err);
> > > +        return -1;
> > > +    }
> > > +
> > > +    qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig");
> > > +
> > > +    f_remote = qemu_fopen_channel_input(ioc);
> > > +
> > > +    pdev->migsize = 0;
> > > +
> > > +    mig_data = g_malloc0(sizeof(proxy_mig_data));
> > > +    mig_data->rem = f_remote;
> > 
> > This feels way too complicated.   Since we know f_remote is always just
> > a simple fd we're getting we don't need to use qio or qemu_file here;
> > just use the fd - nice and simple.
> > 
> > Then the code to read it can just use read(2) with a sensible size chunk
> > rather than reading a byte at a time.
> 
> Hi Dave,
> 
> Upon closer inspection, we found that the migration code on the remote
> (which uses QEMUFile) could sometimes set an error on the channel using
> qemu_file_set_error(). We needed to use qemu_file_get_error() to catch
> these errors and abort migration. So we had to stick with QEMUFile at
> the receiving end as well.

I realise you need to use a QEMUFile on the part that connects to the
Savevm code/vmstate code - but that doesn't mean you need it on the side
that just connects between your pipe and the qemu.

> > 
> > > +    mig_data->dev = pdev;
> > > +
> > > +    qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data,
> > > +                       QEMU_THREAD_DETACHED);
> > 
> > I'm just checking why a thread is necessary; is this because you need to
> > be able to start reading before you block waiting for the remote to tell
> > you the size - worrying that if you don't start reading then the remote
> > might block waiting for us?
> 
> Yes, Dave. That is correct.
> 
> > 
> > > +    msg.cmd = START_MIG_OUT;
> > > +    msg.bytestream = 0;
> > > +    msg.num_fds = 2;
> > > +    msg.fds[0] = fd[1];
> > > +    msg.fds[1] = GET_REMOTE_WAIT;
> > > +
> > > +    mpqemu_msg_send(&msg, pdev->mpqemu_link->com);
> > > +    size = wait_for_remote(msg.fds[1]);
> > > +    PUT_REMOTE_WAIT(msg.fds[1]);
> > > +
> > > +    assert(size != ULLONG_MAX);
> > > +
> > > +    /*
> > > +     * migsize is being update by a separate thread. Using volatile to
> > > +     * instruct the compiler to fetch the value of this variable from
> > > +     * memory during every read
> > > +     */
> > > +    while (*((volatile uint64_t *)&pdev->migsize) < size) {
> > > +    }
> > 
> > Hmm.  I suggest the following:
> > 
> >    a) You create a shared 'size' variable;  and initialize it to
> >      UINT64_MAX.
> >    b) The thread uses atomic_read(shared_size) to read that value.
> >    c) When you receive the size from the remote you do
> >       atomic_set(&shared_size, size);
> >    d) The thread does:
> >       while (received_size < atomic_read(&shared_size))
> > 
> >       so the thread will quit either on EOF or it hitting the size.
> > 
> >    e) We pthread_join here to wait for the thread
> >    f) We then check a received size to make sure it matches what we
> > expect.
> > 
> > That removes the tight loop.
> 
> Sure, will do.
> 
> > 
> > > +    qemu_file_shutdown(f_remote);
> > > +
> > > +    qemu_fclose(f_remote);
> > > +    close(fd[1]);
> > > +
> > > +    return 0;
> > > +}
> > > +
> > > +static int proxy_post_save(void *opaque)
> > > +{
> > > +    MigrationState *ms = migrate_get_current();
> > > +    PCIProxyDev *pdev = opaque;
> > > +    uint64_t pos = 0;
> > > +
> > > +    while (pos < pdev->migsize) {
> > > +        qemu_put_byte(ms->to_dst_file, mig_data[pos]);
> > > +        pos++;
> > > +    }
> > > +
> > > +    qemu_fflush(ms->to_dst_file);
> > > +
> > > +    return 0;
> > 
> > I don't think you need that.
> > 
> > > +}
> > > +
> > > +const VMStateDescription vmstate_pci_proxy_device = {
> > > +    .name = "PCIProxyDevice",
> > > +    .version_id = 2,
> > > +    .minimum_version_id = 1,
> > > +    .pre_save = proxy_pre_save,
> > > +    .post_save = proxy_post_save,
> > > +    .fields = (VMStateField[]) {
> > > +        VMSTATE_PCI_DEVICE(parent_dev, PCIProxyDev),
> > > +        VMSTATE_UINT64(migsize, PCIProxyDev),
> > 
> > I think instead you should use a VMSTATE_VBUFFER here to save
> > the mig_data.
> > What we should do is the post_save should g_free the buffer.
> > (mig_data should be a field in proxy).
> 
> We noticed that VMSTATE_BUFFER requires that the buffer be part of a
> "struct" and that the buffer is an array. Since the buffer we're using
> is neither an array nor part of a "struct", we decided to go with
> writing the buffer directly to the migration stream in proxy_post_save()
> instead of using VMSTATE_BUFFER.
> 
> I think we should nevertheless g_free this buffer in post_save like you
> pointed out.

Note I'm suggesting using VMSTATE_VBUFFER, not VMSTATE_BUFFER;
VBUFFER is expecting a unsigned char *; it does expect that to be in
your structure, so I'd expect your proxy to have a mig_data and uint32_t
mig_len fields.

Dave

> Thank you!
> --
> Jag
> 
> > 
> > Dave
> > 
> > 
> > > +        VMSTATE_END_OF_LIST()
> > > +    }
> > > +};
> > > +
> > >   static void pci_proxy_dev_class_init(ObjectClass *klass, void *data)
> > >   {
> > >       PCIDeviceClass *k = PCI_DEVICE_CLASS(klass);
> > > @@ -471,6 +605,7 @@ static void pci_proxy_dev_class_init(ObjectClass *klass, void *data)
> > >       k->config_write = pci_proxy_write_config;
> > >       dc->reset = proxy_device_reset;
> > > +    dc->vmsd = &vmstate_pci_proxy_device;
> > >   }
> > >   static const TypeInfo pci_proxy_dev_type_info = {
> > > diff --git a/include/hw/proxy/qemu-proxy.h b/include/hw/proxy/qemu-proxy.h
> > > index 5de8129..537c227 100644
> > > --- a/include/hw/proxy/qemu-proxy.h
> > > +++ b/include/hw/proxy/qemu-proxy.h
> > > @@ -75,6 +75,8 @@ struct PCIProxyDev {
> > >                           bool need_spawn, Error **errp);
> > >       ProxyMemoryRegion region[PCI_NUM_REGIONS];
> > > +
> > > +    uint64_t migsize;
> > >   };
> > >   typedef struct PCIProxyDevClass {
> > > diff --git a/include/io/mpqemu-link.h b/include/io/mpqemu-link.h
> > > index d2234ca..b42c003 100644
> > > --- a/include/io/mpqemu-link.h
> > > +++ b/include/io/mpqemu-link.h
> > > @@ -63,6 +63,7 @@ typedef enum {
> > >       PROXY_PING,
> > >       MMIO_RETURN,
> > >       DEVICE_RESET,
> > > +    START_MIG_OUT,
> > >       MAX,
> > >   } mpqemu_cmd_t;
> > > -- 
> > > 1.8.3.1
> > > 
> > --
> > Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
> > 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff mbox series

Patch

diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c
index b1b9282..19f0dbb 100644
--- a/hw/proxy/qemu-proxy.c
+++ b/hw/proxy/qemu-proxy.c
@@ -23,6 +23,14 @@ 
 #include "util/event_notifier-posix.c"
 #include "hw/boards.h"
 #include "include/qemu/log.h"
+#include "io/channel.h"
+#include "migration/qemu-file-types.h"
+#include "qapi/error.h"
+#include "io/channel-util.h"
+#include "migration/qemu-file-channel.h"
+#include "migration/qemu-file.h"
+#include "migration/migration.h"
+#include "migration/vmstate.h"
 
 QEMUTimer *hb_timer;
 static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp);
@@ -35,6 +43,9 @@  static void broadcast_init(void);
 static int config_op_send(PCIProxyDev *dev, uint32_t addr, uint32_t *val, int l,
                           unsigned int op);
 
+#define PAGE_SIZE qemu_real_host_page_size
+uint8_t *mig_data;
+
 static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx)
 {
     /* TODO: Add proper handler. */
@@ -460,6 +471,129 @@  static void pci_proxy_dev_inst_init(Object *obj)
     dev->mem_init = false;
 }
 
+typedef struct {
+    QEMUFile *rem;
+    PCIProxyDev *dev;
+} proxy_mig_data;
+
+static void *proxy_mig_out(void *opaque)
+{
+    proxy_mig_data *data = opaque;
+    PCIProxyDev *dev = data->dev;
+    uint8_t byte;
+    uint64_t data_size = PAGE_SIZE;
+
+    mig_data = g_malloc(data_size);
+
+    while (true) {
+        byte = qemu_get_byte(data->rem);
+
+        if (qemu_file_get_error(data->rem)) {
+            break;
+        }
+
+        mig_data[dev->migsize++] = byte;
+        if (dev->migsize == data_size) {
+            data_size += PAGE_SIZE;
+            mig_data = g_realloc(mig_data, data_size);
+        }
+    }
+
+    return NULL;
+}
+
+static int proxy_pre_save(void *opaque)
+{
+    PCIProxyDev *pdev = opaque;
+    proxy_mig_data *mig_data;
+    QEMUFile *f_remote;
+    MPQemuMsg msg = {0};
+    QemuThread thread;
+    Error *err = NULL;
+    QIOChannel *ioc;
+    uint64_t size;
+    int fd[2];
+
+    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) {
+        return -1;
+    }
+
+    ioc = qio_channel_new_fd(fd[0], &err);
+    if (err) {
+        error_report_err(err);
+        return -1;
+    }
+
+    qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig");
+
+    f_remote = qemu_fopen_channel_input(ioc);
+
+    pdev->migsize = 0;
+
+    mig_data = g_malloc0(sizeof(proxy_mig_data));
+    mig_data->rem = f_remote;
+    mig_data->dev = pdev;
+
+    qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data,
+                       QEMU_THREAD_DETACHED);
+
+    msg.cmd = START_MIG_OUT;
+    msg.bytestream = 0;
+    msg.num_fds = 2;
+    msg.fds[0] = fd[1];
+    msg.fds[1] = GET_REMOTE_WAIT;
+
+    mpqemu_msg_send(&msg, pdev->mpqemu_link->com);
+    size = wait_for_remote(msg.fds[1]);
+    PUT_REMOTE_WAIT(msg.fds[1]);
+
+    assert(size != ULLONG_MAX);
+
+    /*
+     * migsize is being update by a separate thread. Using volatile to
+     * instruct the compiler to fetch the value of this variable from
+     * memory during every read
+     */
+    while (*((volatile uint64_t *)&pdev->migsize) < size) {
+    }
+
+    qemu_file_shutdown(f_remote);
+
+    qemu_fclose(f_remote);
+    close(fd[1]);
+
+    return 0;
+}
+
+static int proxy_post_save(void *opaque)
+{
+    MigrationState *ms = migrate_get_current();
+    PCIProxyDev *pdev = opaque;
+    uint64_t pos = 0;
+
+    while (pos < pdev->migsize) {
+        qemu_put_byte(ms->to_dst_file, mig_data[pos]);
+        pos++;
+    }
+
+    qemu_fflush(ms->to_dst_file);
+
+    return 0;
+}
+
+const VMStateDescription vmstate_pci_proxy_device = {
+    .name = "PCIProxyDevice",
+    .version_id = 2,
+    .minimum_version_id = 1,
+    .pre_save = proxy_pre_save,
+    .post_save = proxy_post_save,
+    .fields = (VMStateField[]) {
+        VMSTATE_PCI_DEVICE(parent_dev, PCIProxyDev),
+        VMSTATE_UINT64(migsize, PCIProxyDev),
+        VMSTATE_END_OF_LIST()
+    }
+};
+
 static void pci_proxy_dev_class_init(ObjectClass *klass, void *data)
 {
     PCIDeviceClass *k = PCI_DEVICE_CLASS(klass);
@@ -471,6 +605,7 @@  static void pci_proxy_dev_class_init(ObjectClass *klass, void *data)
     k->config_write = pci_proxy_write_config;
 
     dc->reset = proxy_device_reset;
+    dc->vmsd = &vmstate_pci_proxy_device;
 }
 
 static const TypeInfo pci_proxy_dev_type_info = {
diff --git a/include/hw/proxy/qemu-proxy.h b/include/hw/proxy/qemu-proxy.h
index 5de8129..537c227 100644
--- a/include/hw/proxy/qemu-proxy.h
+++ b/include/hw/proxy/qemu-proxy.h
@@ -75,6 +75,8 @@  struct PCIProxyDev {
                         bool need_spawn, Error **errp);
 
     ProxyMemoryRegion region[PCI_NUM_REGIONS];
+
+    uint64_t migsize;
 };
 
 typedef struct PCIProxyDevClass {
diff --git a/include/io/mpqemu-link.h b/include/io/mpqemu-link.h
index d2234ca..b42c003 100644
--- a/include/io/mpqemu-link.h
+++ b/include/io/mpqemu-link.h
@@ -63,6 +63,7 @@  typedef enum {
     PROXY_PING,
     MMIO_RETURN,
     DEVICE_RESET,
+    START_MIG_OUT,
     MAX,
 } mpqemu_cmd_t;