diff mbox

[v1,13/22] migration: convert RDMA to use QIOChannel interface

Message ID 1452599056-27357-14-git-send-email-berrange@redhat.com
State New
Headers show

Commit Message

Daniel P. Berrangé Jan. 12, 2016, 11:44 a.m. UTC
This converts the RDMA code to provide a subclass of
QIOChannel that uses RDMA for the data transport.

The RDMA code would be much better off if it could
be split up in a generic RDMA layer, a QIOChannel
impl based on RDMA, and then the RDMA migration
glue. This is left as a future exercise for the brave.

Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
---
 migration/rdma.c | 260 ++++++++++++++++++++++++++++++++++---------------------
 1 file changed, 161 insertions(+), 99 deletions(-)

Comments

Dr. David Alan Gilbert Feb. 2, 2016, 8:01 p.m. UTC | #1
* Daniel P. Berrange (berrange@redhat.com) wrote:
> This converts the RDMA code to provide a subclass of
> QIOChannel that uses RDMA for the data transport.
> 
> The RDMA code would be much better off it it could
> be split up in a generic RDMA layer, a QIOChannel
> impl based on RMDA, and then the RMDA migration
> glue. This is left as a future exercise for the brave.
> 
> Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
> ---
>  migration/rdma.c | 260 ++++++++++++++++++++++++++++++++++---------------------
>  1 file changed, 161 insertions(+), 99 deletions(-)
> 
> diff --git a/migration/rdma.c b/migration/rdma.c
> index bffbfaf..3e961cb 100644
> --- a/migration/rdma.c
> +++ b/migration/rdma.c
> @@ -374,14 +374,19 @@ typedef struct RDMAContext {
>      GHashTable *blockmap;
>  } RDMAContext;
>  
> -/*
> - * Interface to the rest of the migration call stack.
> - */
> -typedef struct QEMUFileRDMA {
> +#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
> +#define QIO_CHANNEL_RDMA(obj)                                     \
> +    OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
> +
> +typedef struct QIOChannelRDMA QIOChannelRDMA;
> +
> +
> +struct QIOChannelRDMA {
> +    QIOChannel parent;
>      RDMAContext *rdma;
> +    QEMUFile *file;
>      size_t len;
> -    void *file;
> -} QEMUFileRDMA;
> +};
>  
>  /*
>   * Main structure for IB Send/Recv control messages.
> @@ -2518,15 +2523,19 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp)
>   * SEND messages for control only.
>   * VM's ram is handled with regular RDMA messages.
>   */
> -static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
> -                                    int64_t pos, size_t size)
> -{
> -    QEMUFileRDMA *r = opaque;
> -    QEMUFile *f = r->file;
> -    RDMAContext *rdma = r->rdma;
> -    size_t remaining = size;
> -    uint8_t * data = (void *) buf;
> +static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
> +                                       const struct iovec *iov,
> +                                       size_t niov,
> +                                       int *fds,
> +                                       size_t nfds,
> +                                       Error **errp)
> +{
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> +    QEMUFile *f = rioc->file;
> +    RDMAContext *rdma = rioc->rdma;
>      int ret;
> +    ssize_t done = 0;
> +    size_t i;
>  
>      CHECK_ERROR_STATE();
>  
> @@ -2540,27 +2549,31 @@ static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
>          return ret;
>      }
>  
> -    while (remaining) {
> -        RDMAControlHeader head;
> +    for (i = 0; i < niov; i++) {
> +        size_t remaining = iov[i].iov_len;
> +        uint8_t * data = (void *)iov[i].iov_base;
> +        while (remaining) {
> +            RDMAControlHeader head;
>  
> -        r->len = MIN(remaining, RDMA_SEND_INCREMENT);
> -        remaining -= r->len;
> +            rioc->len = MIN(remaining, RDMA_SEND_INCREMENT);
> +            remaining -= rioc->len;
>  
> -        /* Guaranteed to fit due to RDMA_SEND_INCREMENT MIN above */
> -        head.len = (uint32_t)r->len;
> -        head.type = RDMA_CONTROL_QEMU_FILE;
> +            head.len = rioc->len;
> +            head.type = RDMA_CONTROL_QEMU_FILE;
>  
> -        ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
> +            ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
>  
> -        if (ret < 0) {
> -            rdma->error_state = ret;
> -            return ret;
> -        }
> +            if (ret < 0) {
> +                rdma->error_state = ret;
> +                return ret;
> +            }
>  
> -        data += r->len;
> +            data += rioc->len;
> +            done += rioc->len;
> +        }
>      }
>  
> -    return size;
> +    return done;
>  }
>  
>  static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
> @@ -2585,41 +2598,65 @@ static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
>   * RDMA links don't use bytestreams, so we have to
>   * return bytes to QEMUFile opportunistically.
>   */
> -static ssize_t qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
> -                                    int64_t pos, size_t size)
> -{
> -    QEMUFileRDMA *r = opaque;
> -    RDMAContext *rdma = r->rdma;
> +static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
> +                                      const struct iovec *iov,
> +                                      size_t niov,
> +                                      int **fds,
> +                                      size_t *nfds,
> +                                      Error **errp)
> +{
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> +    RDMAContext *rdma = rioc->rdma;
>      RDMAControlHeader head;
>      int ret = 0;
> +    ssize_t i;
> +    size_t done = 0;
>  
>      CHECK_ERROR_STATE();
>  
> -    /*
> -     * First, we hold on to the last SEND message we
> -     * were given and dish out the bytes until we run
> -     * out of bytes.
> -     */
> -    r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
> -    if (r->len) {
> -        return r->len;
> -    }
> +    for (i = 0; i < niov; i++) {
> +        size_t want = iov[i].iov_len;
> +        uint8_t *data = (void *)iov[i].iov_base;
>  
> -    /*
> -     * Once we run out, we block and wait for another
> -     * SEND message to arrive.
> -     */
> -    ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
> +        /*
> +         * First, we hold on to the last SEND message we
> +         * were given and dish out the bytes until we run
> +         * out of bytes.
> +         */
> +        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
> +        if (ret > 0) {
> +            done += ret;
> +            if (ret < want) {
> +                break;
> +            } else {
> +                continue;
> +            }

> +        }
>  
> -    if (ret < 0) {
> -        rdma->error_state = ret;
> -        return ret;
> -    }
> +        /*
> +         * Once we run out, we block and wait for another
> +         * SEND message to arrive.
> +         */
> +        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
>  
> -    /*
> -     * SEND was received with new bytes, now try again.
> -     */
> -    return qemu_rdma_fill(r->rdma, buf, size, 0);
> +        if (ret < 0) {
> +            rdma->error_state = ret;
> +            return ret;
> +        }
> +
> +        /*
> +         * SEND was received with new bytes, now try again.
> +         */
> +        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
> +        if (ret > 0) {
> +            done += ret;
> +            if (ret < want) {
> +                break;
> +            }
> +        }

I don't quite understand the behaviour of this loop.
If rdma_fill returns less than you wanted for the first iov we break.
If it returns 0 then we try and get some more.
The weird thing to me is if we have two iov entries; if the
amount returned by the qemu_rdma_fill happens to match the size of
the 1st iov then I think we end up doing the exchange_recv and
waiting for more.  Is that what we want? Why?

Dave

> +    }
> +    rioc->len = done;
> +    return rioc->len;
>  }
>  
>  /*
> @@ -2646,15 +2683,16 @@ static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
>      return 0;
>  }
>  
> -static int qemu_rdma_close(void *opaque)
> +static int qio_channel_rdma_close(QIOChannel *ioc,
> +                                  Error **errp)
>  {
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>      trace_qemu_rdma_close();
> -    QEMUFileRDMA *r = opaque;
> -    if (r->rdma) {
> -        qemu_rdma_cleanup(r->rdma);
> -        g_free(r->rdma);
> +    if (rioc->rdma) {
> +        qemu_rdma_cleanup(rioc->rdma);
> +        g_free(rioc->rdma);
> +        rioc->rdma = NULL;
>      }
> -    g_free(r);
>      return 0;
>  }
>  
> @@ -2696,8 +2734,8 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
>                                    ram_addr_t block_offset, ram_addr_t offset,
>                                    size_t size, uint64_t *bytes_sent)
>  {
> -    QEMUFileRDMA *rfile = opaque;
> -    RDMAContext *rdma = rfile->rdma;
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> +    RDMAContext *rdma = rioc->rdma;
>      int ret;
>  
>      CHECK_ERROR_STATE();
> @@ -2951,8 +2989,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
>                               };
>      RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
>                                   .repeat = 1 };
> -    QEMUFileRDMA *rfile = opaque;
> -    RDMAContext *rdma = rfile->rdma;
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> +    RDMAContext *rdma = rioc->rdma;
>      RDMALocalBlocks *local = &rdma->local_ram_blocks;
>      RDMAControlHeader head;
>      RDMARegister *reg, *registers;
> @@ -3207,9 +3245,10 @@ out:
>   * We've already built our local RAMBlock list, but not yet sent the list to
>   * the source.
>   */
> -static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char *name)
> +static int
> +rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
>  {
> -    RDMAContext *rdma = rfile->rdma;
> +    RDMAContext *rdma = rioc->rdma;
>      int curr;
>      int found = -1;
>  
> @@ -3251,8 +3290,8 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
>  static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
>                                          uint64_t flags, void *data)
>  {
> -    QEMUFileRDMA *rfile = opaque;
> -    RDMAContext *rdma = rfile->rdma;
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> +    RDMAContext *rdma = rioc->rdma;
>  
>      CHECK_ERROR_STATE();
>  
> @@ -3271,8 +3310,8 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>                                         uint64_t flags, void *data)
>  {
>      Error *local_err = NULL, **errp = &local_err;
> -    QEMUFileRDMA *rfile = opaque;
> -    RDMAContext *rdma = rfile->rdma;
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> +    RDMAContext *rdma = rioc->rdma;
>      RDMAControlHeader head = { .len = 0, .repeat = 1 };
>      int ret = 0;
>  
> @@ -3368,55 +3407,78 @@ err:
>      return ret;
>  }
>  
> -static int qemu_rdma_get_fd(void *opaque)
> -{
> -    QEMUFileRDMA *rfile = opaque;
> -    RDMAContext *rdma = rfile->rdma;
> -
> -    return rdma->comp_channel->fd;
> -}
> -
> -static const QEMUFileOps rdma_read_ops = {
> -    .get_buffer    = qemu_rdma_get_buffer,
> -    .get_fd        = qemu_rdma_get_fd,
> -    .close         = qemu_rdma_close,
> -};
> -
>  static const QEMUFileHooks rdma_read_hooks = {
>      .hook_ram_load = rdma_load_hook,
>  };
>  
> -static const QEMUFileOps rdma_write_ops = {
> -    .put_buffer         = qemu_rdma_put_buffer,
> -    .close              = qemu_rdma_close,
> -};
> -
>  static const QEMUFileHooks rdma_write_hooks = {
>      .before_ram_iterate = qemu_rdma_registration_start,
>      .after_ram_iterate  = qemu_rdma_registration_stop,
>      .save_page          = qemu_rdma_save_page,
>  };
>  
> -static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
> +
> +static void qio_channel_rdma_finalize(Object *obj)
> +{
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
> +    if (rioc->rdma) {
> +        qemu_rdma_cleanup(rioc->rdma);
> +        g_free(rioc->rdma);
> +        rioc->rdma = NULL;
> +    }
> +}
> +
> +static void qio_channel_rdma_class_init(ObjectClass *klass,
> +                                        void *class_data G_GNUC_UNUSED)
> +{
> +    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
> +
> +    ioc_klass->io_writev = qio_channel_rdma_writev;
> +    ioc_klass->io_readv = qio_channel_rdma_readv;
> +    /* XXX
> +    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
> +    */
> +    ioc_klass->io_close = qio_channel_rdma_close;
> +    /* XXX
> +    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
> +    */
> +}
> +
> +static const TypeInfo qio_channel_rdma_info = {
> +    .parent = TYPE_QIO_CHANNEL,
> +    .name = TYPE_QIO_CHANNEL_RDMA,
> +    .instance_size = sizeof(QIOChannelRDMA),
> +    .instance_finalize = qio_channel_rdma_finalize,
> +    .class_init = qio_channel_rdma_class_init,
> +};
> +
> +static void qio_channel_rdma_register_types(void)
> +{
> +    type_register_static(&qio_channel_rdma_info);
> +}
> +
> +type_init(qio_channel_rdma_register_types);
> +
> +static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
>  {
> -    QEMUFileRDMA *r;
> +    QIOChannelRDMA *rioc;
>  
>      if (qemu_file_mode_is_not_valid(mode)) {
>          return NULL;
>      }
>  
> -    r = g_new0(QEMUFileRDMA, 1);
> -    r->rdma = rdma;
> +    rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
> +    rioc->rdma = rdma;
>  
>      if (mode[0] == 'w') {
> -        r->file = qemu_fopen_ops(r, &rdma_write_ops);
> -        qemu_file_set_hooks(r->file, &rdma_write_hooks);
> +        rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
> +        qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
>      } else {
> -        r->file = qemu_fopen_ops(r, &rdma_read_ops);
> -        qemu_file_set_hooks(r->file, &rdma_read_hooks);
> +        rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
> +        qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
>      }
>  
> -    return r->file;
> +    return rioc->file;
>  }
>  
>  static void rdma_accept_incoming_migration(void *opaque)
> -- 
> 2.5.0
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Daniel P. Berrangé Feb. 3, 2016, 11:37 a.m. UTC | #2
On Tue, Feb 02, 2016 at 08:01:36PM +0000, Dr. David Alan Gilbert wrote:
> * Daniel P. Berrange (berrange@redhat.com) wrote:
> > This converts the RDMA code to provide a subclass of
> > QIOChannel that uses RDMA for the data transport.
> > 
> > The RDMA code would be much better off it it could
> > be split up in a generic RDMA layer, a QIOChannel
> > impl based on RMDA, and then the RMDA migration
> > glue. This is left as a future exercise for the brave.
> > 
> > Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
> > ---
> >  migration/rdma.c | 260 ++++++++++++++++++++++++++++++++++---------------------
> >  1 file changed, 161 insertions(+), 99 deletions(-)
> > 

> > +static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
> > +                                      const struct iovec *iov,
> > +                                      size_t niov,
> > +                                      int **fds,
> > +                                      size_t *nfds,
> > +                                      Error **errp)
> > +{
> > +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> > +    RDMAContext *rdma = rioc->rdma;
> >      RDMAControlHeader head;
> >      int ret = 0;
> > +    ssize_t i;
> > +    size_t done = 0;
> >  
> >      CHECK_ERROR_STATE();
> >  
> > -    /*
> > -     * First, we hold on to the last SEND message we
> > -     * were given and dish out the bytes until we run
> > -     * out of bytes.
> > -     */
> > -    r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
> > -    if (r->len) {
> > -        return r->len;
> > -    }
> > +    for (i = 0; i < niov; i++) {
> > +        size_t want = iov[i].iov_len;
> > +        uint8_t *data = (void *)iov[i].iov_base;
> >  
> > -    /*
> > -     * Once we run out, we block and wait for another
> > -     * SEND message to arrive.
> > -     */
> > -    ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
> > +        /*
> > +         * First, we hold on to the last SEND message we
> > +         * were given and dish out the bytes until we run
> > +         * out of bytes.
> > +         */
> > +        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
> > +        if (ret > 0) {
> > +            done += ret;
> > +            if (ret < want) {
> > +                break;
> > +            } else {
> > +                continue;
> > +            }
> 
> > +        }
> >  
> > -    if (ret < 0) {
> > -        rdma->error_state = ret;
> > -        return ret;
> > -    }
> > +        /*
> > +         * Once we run out, we block and wait for another
> > +         * SEND message to arrive.
> > +         */
> > +        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
> >  
> > -    /*
> > -     * SEND was received with new bytes, now try again.
> > -     */
> > -    return qemu_rdma_fill(r->rdma, buf, size, 0);
> > +        if (ret < 0) {
> > +            rdma->error_state = ret;
> > +            return ret;
> > +        }
> > +
> > +        /*
> > +         * SEND was received with new bytes, now try again.
> > +         */
> > +        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
> > +        if (ret > 0) {
> > +            done += ret;
> > +            if (ret < want) {
> > +                break;
> > +            }
> > +        }
> 
> I don't quite understand the behaviour of this loop.
> If rdma_fill returns less than you wanted for the first iov we break.
> If it returns 0 then we try and get some more.
> The weird thing to me is if we have two iov entries; if the
> amount returned by the qemu_rdma_fill happens to match the size of
> the 1st iov then I think we end up doing the exchange_recv and
> waiting for more.  Is that what we want? Why?

No, it isn't quite what we want. If we have successfully received
some data in a preceding iov, then we shouldn't wait for more data
for any following iov. I'll rework this bit of code to work better

In fact technically, we should not block for data at all when the
channel is in non-blocking mode. Fixing that would require some
refactoring of qemu_rdma_block_for_wrid() so that we could tell
it to only look for an already completed work request and not
block

Regards,
Daniel
Dr. David Alan Gilbert Feb. 3, 2016, 1:23 p.m. UTC | #3
* Daniel P. Berrange (berrange@redhat.com) wrote:
> On Tue, Feb 02, 2016 at 08:01:36PM +0000, Dr. David Alan Gilbert wrote:
> > * Daniel P. Berrange (berrange@redhat.com) wrote:
> > > This converts the RDMA code to provide a subclass of
> > > QIOChannel that uses RDMA for the data transport.
> > > 
> > > The RDMA code would be much better off it it could
> > > be split up in a generic RDMA layer, a QIOChannel
> > > impl based on RMDA, and then the RMDA migration
> > > glue. This is left as a future exercise for the brave.
> > > 
> > > Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
> > > ---
> > >  migration/rdma.c | 260 ++++++++++++++++++++++++++++++++++---------------------
> > >  1 file changed, 161 insertions(+), 99 deletions(-)
> > > 
> 
> > > +static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
> > > +                                      const struct iovec *iov,
> > > +                                      size_t niov,
> > > +                                      int **fds,
> > > +                                      size_t *nfds,
> > > +                                      Error **errp)
> > > +{
> > > +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> > > +    RDMAContext *rdma = rioc->rdma;
> > >      RDMAControlHeader head;
> > >      int ret = 0;
> > > +    ssize_t i;
> > > +    size_t done = 0;
> > >  
> > >      CHECK_ERROR_STATE();
> > >  
> > > -    /*
> > > -     * First, we hold on to the last SEND message we
> > > -     * were given and dish out the bytes until we run
> > > -     * out of bytes.
> > > -     */
> > > -    r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
> > > -    if (r->len) {
> > > -        return r->len;
> > > -    }
> > > +    for (i = 0; i < niov; i++) {
> > > +        size_t want = iov[i].iov_len;
> > > +        uint8_t *data = (void *)iov[i].iov_base;
> > >  
> > > -    /*
> > > -     * Once we run out, we block and wait for another
> > > -     * SEND message to arrive.
> > > -     */
> > > -    ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
> > > +        /*
> > > +         * First, we hold on to the last SEND message we
> > > +         * were given and dish out the bytes until we run
> > > +         * out of bytes.
> > > +         */
> > > +        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
> > > +        if (ret > 0) {
> > > +            done += ret;
> > > +            if (ret < want) {
> > > +                break;
> > > +            } else {
> > > +                continue;
> > > +            }
> > 
> > > +        }
> > >  
> > > -    if (ret < 0) {
> > > -        rdma->error_state = ret;
> > > -        return ret;
> > > -    }
> > > +        /*
> > > +         * Once we run out, we block and wait for another
> > > +         * SEND message to arrive.
> > > +         */
> > > +        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
> > >  
> > > -    /*
> > > -     * SEND was received with new bytes, now try again.
> > > -     */
> > > -    return qemu_rdma_fill(r->rdma, buf, size, 0);
> > > +        if (ret < 0) {
> > > +            rdma->error_state = ret;
> > > +            return ret;
> > > +        }
> > > +
> > > +        /*
> > > +         * SEND was received with new bytes, now try again.
> > > +         */
> > > +        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
> > > +        if (ret > 0) {
> > > +            done += ret;
> > > +            if (ret < want) {
> > > +                break;
> > > +            }
> > > +        }
> > 
> > I don't quite understand the behaviour of this loop.
> > If rdma_fill returns less than you wanted for the first iov we break.
> > If it returns 0 then we try and get some more.
> > The weird thing to me is if we have two iov entries; if the
> > amount returned by the qemu_rdma_fill happens to match the size of
> > the 1st iov then I think we end up doing the exchange_recv and
> > waiting for more.  Is that what we want? Why?
> 
> No, it isn't quite what we want. If we have successfully received
> some data in a preceeding iov, then we shouldn't wait for more data
> for any following iov. I'll rework this bit of code to work better
> 
> In fact technically, we should not block for data at all when the
> channel is in non-blocking mode. Fixing that would require some
> refactoring of qemu_rdma_block_for_wrid() so that we could tell
> it to only look for an already completed work request and not
> block

I wouldn't go changing qemu_rdma_block_for_wrid unless you need
to.

Dave

> 
> Regards,
> Daniel
> -- 
> |: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
> |: http://libvirt.org              -o-             http://virt-manager.org :|
> |: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
> |: http://entangle-photo.org       -o-       http://live.gnome.org/gtk-vnc :|
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Daniel P. Berrangé Feb. 3, 2016, 1:25 p.m. UTC | #4
On Wed, Feb 03, 2016 at 01:23:04PM +0000, Dr. David Alan Gilbert wrote:
> * Daniel P. Berrange (berrange@redhat.com) wrote:
> > On Tue, Feb 02, 2016 at 08:01:36PM +0000, Dr. David Alan Gilbert wrote:
> > > * Daniel P. Berrange (berrange@redhat.com) wrote:
> > > > This converts the RDMA code to provide a subclass of
> > > > QIOChannel that uses RDMA for the data transport.
> > > > 
> > > > The RDMA code would be much better off it it could
> > > > be split up in a generic RDMA layer, a QIOChannel
> > > > impl based on RMDA, and then the RMDA migration
> > > > glue. This is left as a future exercise for the brave.
> > > > 
> > > > Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
> > > > ---
> > > >  migration/rdma.c | 260 ++++++++++++++++++++++++++++++++++---------------------
> > > >  1 file changed, 161 insertions(+), 99 deletions(-)
> > > > 

> > > 
> > > I don't quite understand the behaviour of this loop.
> > > If rdma_fill returns less than you wanted for the first iov we break.
> > > If it returns 0 then we try and get some more.
> > > The weird thing to me is if we have two iov entries; if the
> > > amount returned by the qemu_rdma_fill happens to match the size of
> > > the 1st iov then I think we end up doing the exchange_recv and
> > > waiting for more.  Is that what we want? Why?
> > 
> > No, it isn't quite what we want. If we have successfully received
> > some data in a preceeding iov, then we shouldn't wait for more data
> > for any following iov. I'll rework this bit of code to work better
> > 
> > In fact technically, we should not block for data at all when the
> > channel is in non-blocking mode. Fixing that would require some
> > refactoring of qemu_rdma_block_for_wrid() so that we could tell
> > it to only look for an already completed work request and not
> > block
> 
> I wouldn't go changing qemu_rdma_block_for_wrid unless you need
> to.

Yeah, I won't do it now - just something to think about for the future
to properly do non-blocking I/O channels.

Regards,
Daniel
diff mbox

Patch

diff --git a/migration/rdma.c b/migration/rdma.c
index bffbfaf..3e961cb 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -374,14 +374,19 @@  typedef struct RDMAContext {
     GHashTable *blockmap;
 } RDMAContext;
 
-/*
- * Interface to the rest of the migration call stack.
- */
-typedef struct QEMUFileRDMA {
+#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
+#define QIO_CHANNEL_RDMA(obj)                                     \
+    OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
+
+typedef struct QIOChannelRDMA QIOChannelRDMA;
+
+
+struct QIOChannelRDMA {
+    QIOChannel parent;
     RDMAContext *rdma;
+    QEMUFile *file;
     size_t len;
-    void *file;
-} QEMUFileRDMA;
+};
 
 /*
  * Main structure for IB Send/Recv control messages.
@@ -2518,15 +2523,19 @@  static void *qemu_rdma_data_init(const char *host_port, Error **errp)
  * SEND messages for control only.
  * VM's ram is handled with regular RDMA messages.
  */
-static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
-                                    int64_t pos, size_t size)
-{
-    QEMUFileRDMA *r = opaque;
-    QEMUFile *f = r->file;
-    RDMAContext *rdma = r->rdma;
-    size_t remaining = size;
-    uint8_t * data = (void *) buf;
+static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
+                                       const struct iovec *iov,
+                                       size_t niov,
+                                       int *fds,
+                                       size_t nfds,
+                                       Error **errp)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    QEMUFile *f = rioc->file;
+    RDMAContext *rdma = rioc->rdma;
     int ret;
+    ssize_t done = 0;
+    size_t i;
 
     CHECK_ERROR_STATE();
 
@@ -2540,27 +2549,31 @@  static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
         return ret;
     }
 
-    while (remaining) {
-        RDMAControlHeader head;
+    for (i = 0; i < niov; i++) {
+        size_t remaining = iov[i].iov_len;
+        uint8_t * data = (void *)iov[i].iov_base;
+        while (remaining) {
+            RDMAControlHeader head;
 
-        r->len = MIN(remaining, RDMA_SEND_INCREMENT);
-        remaining -= r->len;
+            rioc->len = MIN(remaining, RDMA_SEND_INCREMENT);
+            remaining -= rioc->len;
 
-        /* Guaranteed to fit due to RDMA_SEND_INCREMENT MIN above */
-        head.len = (uint32_t)r->len;
-        head.type = RDMA_CONTROL_QEMU_FILE;
+            head.len = rioc->len;
+            head.type = RDMA_CONTROL_QEMU_FILE;
 
-        ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
+            ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
 
-        if (ret < 0) {
-            rdma->error_state = ret;
-            return ret;
-        }
+            if (ret < 0) {
+                rdma->error_state = ret;
+                return ret;
+            }
 
-        data += r->len;
+            data += rioc->len;
+            done += rioc->len;
+        }
     }
 
-    return size;
+    return done;
 }
 
 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
@@ -2585,41 +2598,65 @@  static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
  * RDMA links don't use bytestreams, so we have to
  * return bytes to QEMUFile opportunistically.
  */
-static ssize_t qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
-                                    int64_t pos, size_t size)
-{
-    QEMUFileRDMA *r = opaque;
-    RDMAContext *rdma = r->rdma;
+static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
+                                      const struct iovec *iov,
+                                      size_t niov,
+                                      int **fds,
+                                      size_t *nfds,
+                                      Error **errp)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    RDMAContext *rdma = rioc->rdma;
     RDMAControlHeader head;
     int ret = 0;
+    ssize_t i;
+    size_t done = 0;
 
     CHECK_ERROR_STATE();
 
-    /*
-     * First, we hold on to the last SEND message we
-     * were given and dish out the bytes until we run
-     * out of bytes.
-     */
-    r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
-    if (r->len) {
-        return r->len;
-    }
+    for (i = 0; i < niov; i++) {
+        size_t want = iov[i].iov_len;
+        uint8_t *data = (void *)iov[i].iov_base;
 
-    /*
-     * Once we run out, we block and wait for another
-     * SEND message to arrive.
-     */
-    ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
+        /*
+         * First, we hold on to the last SEND message we
+         * were given and dish out the bytes until we run
+         * out of bytes.
+         */
+        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+        if (ret > 0) {
+            done += ret;
+            if (ret < want) {
+                break;
+            } else {
+                continue;
+            }
+        }
 
-    if (ret < 0) {
-        rdma->error_state = ret;
-        return ret;
-    }
+        /*
+         * Once we run out, we block and wait for another
+         * SEND message to arrive.
+         */
+        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
 
-    /*
-     * SEND was received with new bytes, now try again.
-     */
-    return qemu_rdma_fill(r->rdma, buf, size, 0);
+        if (ret < 0) {
+            rdma->error_state = ret;
+            return ret;
+        }
+
+        /*
+         * SEND was received with new bytes, now try again.
+         */
+        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+        if (ret > 0) {
+            done += ret;
+            if (ret < want) {
+                break;
+            }
+        }
+    }
+    rioc->len = done;
+    return rioc->len;
 }
 
 /*
@@ -2646,15 +2683,16 @@  static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
     return 0;
 }
 
-static int qemu_rdma_close(void *opaque)
+static int qio_channel_rdma_close(QIOChannel *ioc,
+                                  Error **errp)
 {
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
     trace_qemu_rdma_close();
-    QEMUFileRDMA *r = opaque;
-    if (r->rdma) {
-        qemu_rdma_cleanup(r->rdma);
-        g_free(r->rdma);
+    if (rioc->rdma) {
+        qemu_rdma_cleanup(rioc->rdma);
+        g_free(rioc->rdma);
+        rioc->rdma = NULL;
     }
-    g_free(r);
     return 0;
 }
 
@@ -2696,8 +2734,8 @@  static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
                                   ram_addr_t block_offset, ram_addr_t offset,
                                   size_t size, uint64_t *bytes_sent)
 {
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+    RDMAContext *rdma = rioc->rdma;
     int ret;
 
     CHECK_ERROR_STATE();
@@ -2951,8 +2989,8 @@  static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
                              };
     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
                                  .repeat = 1 };
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+    RDMAContext *rdma = rioc->rdma;
     RDMALocalBlocks *local = &rdma->local_ram_blocks;
     RDMAControlHeader head;
     RDMARegister *reg, *registers;
@@ -3207,9 +3245,10 @@  out:
  * We've already built our local RAMBlock list, but not yet sent the list to
  * the source.
  */
-static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char *name)
+static int
+rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
 {
-    RDMAContext *rdma = rfile->rdma;
+    RDMAContext *rdma = rioc->rdma;
     int curr;
     int found = -1;
 
@@ -3251,8 +3290,8 @@  static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
                                         uint64_t flags, void *data)
 {
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+    RDMAContext *rdma = rioc->rdma;
 
     CHECK_ERROR_STATE();
 
@@ -3271,8 +3310,8 @@  static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
                                        uint64_t flags, void *data)
 {
     Error *local_err = NULL, **errp = &local_err;
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+    RDMAContext *rdma = rioc->rdma;
     RDMAControlHeader head = { .len = 0, .repeat = 1 };
     int ret = 0;
 
@@ -3368,55 +3407,78 @@  err:
     return ret;
 }
 
-static int qemu_rdma_get_fd(void *opaque)
-{
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
-
-    return rdma->comp_channel->fd;
-}
-
-static const QEMUFileOps rdma_read_ops = {
-    .get_buffer    = qemu_rdma_get_buffer,
-    .get_fd        = qemu_rdma_get_fd,
-    .close         = qemu_rdma_close,
-};
-
 static const QEMUFileHooks rdma_read_hooks = {
     .hook_ram_load = rdma_load_hook,
 };
 
-static const QEMUFileOps rdma_write_ops = {
-    .put_buffer         = qemu_rdma_put_buffer,
-    .close              = qemu_rdma_close,
-};
-
 static const QEMUFileHooks rdma_write_hooks = {
     .before_ram_iterate = qemu_rdma_registration_start,
     .after_ram_iterate  = qemu_rdma_registration_stop,
     .save_page          = qemu_rdma_save_page,
 };
 
-static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
+
+static void qio_channel_rdma_finalize(Object *obj)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
+    if (rioc->rdma) {
+        qemu_rdma_cleanup(rioc->rdma);
+        g_free(rioc->rdma);
+        rioc->rdma = NULL;
+    }
+}
+
+static void qio_channel_rdma_class_init(ObjectClass *klass,
+                                        void *class_data G_GNUC_UNUSED)
+{
+    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+    ioc_klass->io_writev = qio_channel_rdma_writev;
+    ioc_klass->io_readv = qio_channel_rdma_readv;
+    /* XXX
+    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
+    */
+    ioc_klass->io_close = qio_channel_rdma_close;
+    /* XXX
+    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
+    */
+}
+
+static const TypeInfo qio_channel_rdma_info = {
+    .parent = TYPE_QIO_CHANNEL,
+    .name = TYPE_QIO_CHANNEL_RDMA,
+    .instance_size = sizeof(QIOChannelRDMA),
+    .instance_finalize = qio_channel_rdma_finalize,
+    .class_init = qio_channel_rdma_class_init,
+};
+
+static void qio_channel_rdma_register_types(void)
+{
+    type_register_static(&qio_channel_rdma_info);
+}
+
+type_init(qio_channel_rdma_register_types);
+
+static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
 {
-    QEMUFileRDMA *r;
+    QIOChannelRDMA *rioc;
 
     if (qemu_file_mode_is_not_valid(mode)) {
         return NULL;
     }
 
-    r = g_new0(QEMUFileRDMA, 1);
-    r->rdma = rdma;
+    rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
+    rioc->rdma = rdma;
 
     if (mode[0] == 'w') {
-        r->file = qemu_fopen_ops(r, &rdma_write_ops);
-        qemu_file_set_hooks(r->file, &rdma_write_hooks);
+        rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
+        qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
     } else {
-        r->file = qemu_fopen_ops(r, &rdma_read_ops);
-        qemu_file_set_hooks(r->file, &rdma_read_hooks);
+        rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
+        qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
     }
 
-    return r->file;
+    return rioc->file;
 }
 
 static void rdma_accept_incoming_migration(void *opaque)