Patchwork [PULL,v4,08/11] rdma: core logic

login
register
mail settings
Submitter mrhines@linux.vnet.ibm.com
Date April 17, 2013, 11:07 p.m.
Message ID <1366240040-10730-9-git-send-email-mrhines@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/237386/
State New
Headers show

Comments

mrhines@linux.vnet.ibm.com - April 17, 2013, 11:07 p.m.
From: "Michael R. Hines" <mrhines@us.ibm.com>

Code that does need to be visible is kept
well contained inside this file and this is the only
new additional file to the entire patch - good
progress.

This file includes the entire protocol and interfaces
required to perform RDMA migration.

Also, the configure and Makefile modifications to link
this file are included.

Full documentation is in docs/rdma.txt

Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
---
 Makefile.objs                 |    1 +
 configure                     |   29 +
 include/migration/migration.h |    4 +
 migration-rdma.c              | 2667 +++++++++++++++++++++++++++++++++++++++++
 migration.c                   |    8 +
 5 files changed, 2709 insertions(+)
 create mode 100644 migration-rdma.c
Paolo Bonzini - April 18, 2013, 7:55 a.m.
Il 18/04/2013 01:07, mrhines@linux.vnet.ibm.com ha scritto:
> +/*
> + * Virtual address of the above structures used for transmitting
> + * the RAMBlock descriptions at connection-time.
> + * This structure is *not* transmitted.
> + */
> +typedef struct RDMALocalBlocks {
> +    int num_blocks;
> +    RDMALocalBlock *block;
> +} RDMALocalBlocks;
> +
> +/*
> + * Same as above
> + */
> +typedef struct RDMARemoteBlocks {
> +    RDMARemoteBlock *block;
> +    void *remote_area;
> +} RDMARemoteBlocks;

block and remote_area can be reduced to a single pointer.

> +    if (rdma == NULL) {
> +        goto err;
> +    }
> +
> +    ret = qemu_rdma_source_init(rdma, NULL,
> +        s->enabled_capabilities[MIGRATION_CAPABILITY_X_CHUNK_REGISTER_DESTINATION]);
> +
> +    if (ret) {
> +        goto err;
> +    }
> +
> +    DPRINTF("qemu_rdma_source_init success\n");
> +    ret = qemu_rdma_connect(rdma, NULL);

The usage of error_setg is correct, but here you must pass your errp
down to qemu_rdma_source_init and qemu_rdma_connect.  You can then
remove this:

+    error_setg(errp, "Error connecting using rdma! %d\n", ret);

because the error was already set in the callees.

Also, you should use the Error API also on the destination side, where
rdma_start_incoming_migration can pass errp to qemu_rdma_dest_prepare or
set it itself instead of printing to stderr.

The reason is that when using a management layer stderr will be logged
but not printed to the operator's console.  Instead, Errors are sent on
the monitor connection.  The management layer(s) then pick up the error
and propagate it through the various APIs until they appear in the
terminal/browser/whatever.

Nothing else from me.

Paolo
Michael S. Tsirkin - April 18, 2013, 7:58 a.m.
The following comment applies to all of this code:

On Wed, Apr 17, 2013 at 07:07:17PM -0400, mrhines@linux.vnet.ibm.com wrote:
> diff --git a/migration-rdma.c b/migration-rdma.c
> new file mode 100644
> index 0000000..1dff06f
> --- /dev/null
> +++ b/migration-rdma.c
> @@ -0,0 +1,2667 @@
> +/*
> + *  Copyright (C) 2013 Michael R. Hines <mrhines@us.ibm.com>
> + *  Copyright (C) 2010 Jiuxing Liu <jl@us.ibm.com>

By the way do you both really hold copyrights on this code?
It's not assigned to IBM?
If the later it should say:
	Copyright IBM
	Author: Michael R. Hines
Or some such.

> + *
> + *  RDMA protocol and interfaces
> + *
> + *  This program is free software; you can redistribute it and/or modify
> + *  it under the terms of the GNU General Public License as published by
> + *  the Free Software Foundation; under version 2 of the License.


We prefer 2 or later for new code.
I'm guessing you need approval from Jiuxing Liu for this,
pls make him ack license change.

> + *
> + *  This program is distributed in the hope that it will be useful,
> + *  but WITHOUT ANY WARRANTY; without even the implied warranty of
> + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + *  GNU General Public License for more details.
> + *
> + *  You should have received a copy of the GNU General Public License
> + *  along with this program; if not, see <http://www.gnu.org/licenses/>.
> + */
Orit Wasserman - April 18, 2013, 8:44 a.m.
Hi Michael,

I don't see you addressed any of the comment I had in v3 
(especially the error handling)

please, fix those

Orit

On 04/18/2013 02:07 AM, mrhines@linux.vnet.ibm.com wrote:
> From: "Michael R. Hines" <mrhines@us.ibm.com>
> 
> Code that does need to be visible is kept
> well contained inside this file and this is the only
> new additional file to the entire patch - good
> progress.
> 
> This file includes the entire protocol and interfaces
> required to perform RDMA migration.
> 
> Also, the configure and Makefile modifications to link
> this file are included.
> 
> Full documentation is in docs/rdma.txt
> 
> Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
> ---
>  Makefile.objs                 |    1 +
>  configure                     |   29 +
>  include/migration/migration.h |    4 +
>  migration-rdma.c              | 2667 +++++++++++++++++++++++++++++++++++++++++
>  migration.c                   |    8 +
>  5 files changed, 2709 insertions(+)
>  create mode 100644 migration-rdma.c
> 
> diff --git a/Makefile.objs b/Makefile.objs
> index a473348..d744827 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -49,6 +49,7 @@ common-obj-$(CONFIG_POSIX) += os-posix.o
>  common-obj-$(CONFIG_LINUX) += fsdev/
>  
>  common-obj-y += migration.o migration-tcp.o
> +common-obj-$(CONFIG_RDMA) += migration-rdma.o
>  common-obj-y += qemu-char.o #aio.o
>  common-obj-y += block-migration.o
>  common-obj-y += page_cache.o xbzrle.o
> diff --git a/configure b/configure
> index 4c4f6f6..9decae2 100755
> --- a/configure
> +++ b/configure
> @@ -180,6 +180,7 @@ xfs=""
>  
>  vhost_net="no"
>  kvm="no"
> +rdma="yes"
>  gprof="no"
>  debug_tcg="no"
>  debug="no"
> @@ -925,6 +926,10 @@ for opt do
>    ;;
>    --enable-gtk) gtk="yes"
>    ;;
> +  --enable-rdma) rdma="yes"
> +  ;;
> +  --disable-rdma) rdma="no"
> +  ;;
>    --with-gtkabi=*) gtkabi="$optarg"
>    ;;
>    --enable-tpm) tpm="yes"
> @@ -1133,6 +1138,8 @@ echo "  --enable-bluez           enable bluez stack connectivity"
>  echo "  --disable-slirp          disable SLIRP userspace network connectivity"
>  echo "  --disable-kvm            disable KVM acceleration support"
>  echo "  --enable-kvm             enable KVM acceleration support"
> +echo "  --disable-rdma           disable RDMA-based migration support"
> +echo "  --enable-rdma            enable RDMA-based migration support"
>  echo "  --enable-tcg-interpreter enable TCG with bytecode interpreter (TCI)"
>  echo "  --disable-nptl           disable usermode NPTL support"
>  echo "  --enable-nptl            enable usermode NPTL support"
> @@ -1782,6 +1789,23 @@ EOF
>    libs_softmmu="$sdl_libs $libs_softmmu"
>  fi
>  
> +if test "$rdma" != "no" ; then
> +  cat > $TMPC <<EOF
> +#include <rdma/rdma_cma.h>
> +int main(void) { return 0; }
> +EOF
> +  rdma_libs="-lrdmacm -libverbs"
> +  if compile_prog "-Werror" "$rdma_libs" ; then
> +    rdma="yes"
> +    libs_softmmu="$libs_softmmu $rdma_libs"
> +  else
> +    if test "$rdma" = "yes" ; then
> +      feature_not_found "rdma"
> +    fi
> +    rdma="no"
> +  fi
> +fi
> +
>  ##########################################
>  # VNC TLS/WS detection
>  if test "$vnc" = "yes" -a \( "$vnc_tls" != "no" -o "$vnc_ws" != "no" \) ; then
> @@ -3524,6 +3548,7 @@ echo "Linux AIO support $linux_aio"
>  echo "ATTR/XATTR support $attr"
>  echo "Install blobs     $blobs"
>  echo "KVM support       $kvm"
> +echo "RDMA support      $rdma"
>  echo "TCG interpreter   $tcg_interpreter"
>  echo "fdt support       $fdt"
>  echo "preadv support    $preadv"
> @@ -4510,6 +4535,10 @@ if [ "$pixman" = "internal" ]; then
>    echo "config-host.h: subdir-pixman" >> $config_host_mak
>  fi
>  
> +if test "$rdma" = "yes" ; then
> +echo "CONFIG_RDMA=y" >> $config_host_mak
> +fi
> +
>  # build tree in object directory in case the source is not in the current directory
>  DIRS="tests tests/tcg tests/tcg/cris tests/tcg/lm32"
>  DIRS="$DIRS pc-bios/optionrom pc-bios/spapr-rtas"
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 8e02391..720e0a5 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -76,6 +76,10 @@ void fd_start_incoming_migration(const char *path, Error **errp);
>  
>  void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **errp);
>  
> +void rdma_start_outgoing_migration(void *opaque, const char *host_port, Error **errp);
> +
> +void rdma_start_incoming_migration(const char *host_port, Error **errp);
> +
>  void migrate_fd_error(MigrationState *s);
>  
>  void migrate_fd_connect(MigrationState *s);
> diff --git a/migration-rdma.c b/migration-rdma.c
> new file mode 100644
> index 0000000..1dff06f
> --- /dev/null
> +++ b/migration-rdma.c
> @@ -0,0 +1,2667 @@
> +/*
> + *  Copyright (C) 2013 Michael R. Hines <mrhines@us.ibm.com>
> + *  Copyright (C) 2010 Jiuxing Liu <jl@us.ibm.com>
> + *
> + *  RDMA protocol and interfaces
> + *
> + *  This program is free software; you can redistribute it and/or modify
> + *  it under the terms of the GNU General Public License as published by
> + *  the Free Software Foundation; under version 2 of the License.
> + *
> + *  This program is distributed in the hope that it will be useful,
> + *  but WITHOUT ANY WARRANTY; without even the implied warranty of
> + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + *  GNU General Public License for more details.
> + *
> + *  You should have received a copy of the GNU General Public License
> + *  along with this program; if not, see <http://www.gnu.org/licenses/>.
> + */
> +#include "qemu-common.h"
> +#include "migration/migration.h"
> +#include "migration/qemu-file.h"
> +#include "exec/cpu-common.h"
> +#include "qemu/main-loop.h"
> +#include "qemu/sockets.h"
> +#include "block/coroutine.h"
> +#include <stdio.h>
> +#include <sys/types.h>
> +#include <sys/socket.h>
> +#include <netdb.h>
> +#include <arpa/inet.h>
> +#include <string.h>
> +#include <poll.h>
> +#include <rdma/rdma_cma.h>
> +
> +//#define DEBUG_RDMA
> +//#define DEBUG_RDMA_VERBOSE
> +
> +#ifdef DEBUG_RDMA
> +#define DPRINTF(fmt, ...) \
> +    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
> +#else
> +#define DPRINTF(fmt, ...) \
> +    do { } while (0)
> +#endif
> +
> +#ifdef DEBUG_RDMA_VERBOSE
> +#define DDPRINTF(fmt, ...) \
> +    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
> +#else
> +#define DDPRINTF(fmt, ...) \
> +    do { } while (0)
> +#endif
> +
> +#define RDMA_RESOLVE_TIMEOUT_MS 10000
> +
> +/* Do not merge data if larger than this. */
> +#define RDMA_MERGE_MAX (4 * 1024 * 1024)
> +#define RDMA_UNSIGNALED_SEND_MAX 64
> +
> +#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
> +
> +/*
> + * Debugging only. Hard-coded only
> + */
> +//#define RDMA_REG_CHUNK_SHIFT 21 /* 2 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 22 /* 4 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 23 /* 8 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 24 /* 16 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 25 /* 32 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 26 /* 64 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 27 /* 128 MB */
> +//#define RDMA_REG_CHUNK_SHIFT 28 /* 256 MB */
> +
> +#define RDMA_REG_CHUNK_SIZE (1UL << (RDMA_REG_CHUNK_SHIFT))
> +
> +/*
> + * This is only for non-live state being migrated.
> + * Instead of RDMA_WRITE messages, we use RDMA_SEND
> + * messages for that state, which requires a different
> + * delivery design than main memory.
> + */
> +#define RDMA_SEND_INCREMENT 32768
> +
> +/*
> + * Completion queue can be filled by both read and write work requests,
> + * so must reflect the sum of both possible queue sizes.
> + */
> +#define RDMA_QP_SIZE 1000
> +#define RDMA_CQ_SIZE (RDMA_QP_SIZE * 3)
> +
> +/*
> + * Maximum size infiniband SEND message
> + */
> +#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
> +#define RDMA_CONTROL_MAX_WR 2
> +#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
> +
> +/*
> + * Capabilities for negotiation.
> + */
> +#define RDMA_CAPABILITY_CHUNK_REGISTER 0x01
> +
> +/*
> + * Add the other flags above to this list of known capabilities
> + * as they are introduced.
> + */
> +static uint32_t known_capabilities = RDMA_CAPABILITY_CHUNK_REGISTER;
> +
> +#define CHECK_ERROR_STATE() \
> +    do { \
> +        if (rdma->error_state) { \
> +            fprintf(stderr, "RDMA is in an error state waiting migration" \
> +                            " to abort!\n"); \
> +            return rdma->error_state; \
> +        } \
> +    } while(0);
> +/*
> + * RDMA migration protocol:
> + * 1. RDMA Writes (data messages, i.e. RAM)
> + * 2. IB Send/Recv (control channel messages)
> + */
> +enum {
> +    RDMA_WRID_NONE = 0,
> +    RDMA_WRID_RDMA_WRITE,
> +    RDMA_WRID_SEND_CONTROL = 1000,
> +    RDMA_WRID_RECV_CONTROL = 2000,
> +};
> +
> +const char *wrid_desc[] = {
> +        [RDMA_WRID_NONE] = "NONE",
> +        [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
> +        [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
> +        [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
> +};
> +
> +/*
> + * SEND/RECV IB Control Messages.
> + */
> +enum {
> +    RDMA_CONTROL_NONE = 0,
> +    RDMA_CONTROL_ERROR,
> +    RDMA_CONTROL_READY,             /* ready to receive */
> +    RDMA_CONTROL_QEMU_FILE,         /* QEMUFile-transmitted bytes */
> +    RDMA_CONTROL_RAM_BLOCKS,        /* RAMBlock synchronization */
> +    RDMA_CONTROL_COMPRESS,          /* page contains repeat values */
> +    RDMA_CONTROL_REGISTER_REQUEST,  /* dynamic page registration */
> +    RDMA_CONTROL_REGISTER_RESULT,   /* key to use after registration */
> +    RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */
> +};
> +
> +const char *control_desc[] = {
> +        [RDMA_CONTROL_NONE] = "NONE",
> +        [RDMA_CONTROL_ERROR] = "ERROR",
> +        [RDMA_CONTROL_READY] = "READY",
> +        [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
> +        [RDMA_CONTROL_RAM_BLOCKS] = "REMOTE INFO",
> +        [RDMA_CONTROL_COMPRESS] = "COMPRESS",
> +        [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
> +        [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
> +        [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
> +};
> +
> +/*
> + * Memory and MR structures used to represent an IB Send/Recv work request.
> + * This is *not* used for RDMA, only IB Send/Recv.
> + */
> +typedef struct {
> +    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
> +    struct   ibv_mr *control_mr;               /* registration metadata */
> +    size_t   control_len;                      /* length of the message */
> +    uint8_t *control_curr;                     /* start of unconsumed bytes */
> +} RDMAWorkRequestData;
> +
> +/*
> + * Negotiate RDMA capabilities during connection-setup time.
> + */
> +typedef struct {
> +    uint32_t version;
> +    uint32_t flags;
> +} RDMACapabilities;
> +
> +static void caps_to_network(RDMACapabilities *cap)
> +{
> +    cap->version = htonl(cap->version);
> +    cap->flags = htonl(cap->flags);
> +}
> +
> +static void network_to_caps(RDMACapabilities *cap)
> +{
> +    cap->version = ntohl(cap->version);
> +    cap->flags = ntohl(cap->flags);
> +}
> +
> +/*
> + * Representation of a RAMBlock from an RDMA perspective.
> + * This is not transmitted, only local.
> + * This and subsequent structures cannot be linked lists
> + * because we're using a single IB message to transmit
> + * the information. It's small anyway, so a list is overkill.
> + */
> +typedef struct RDMALocalBlock {
> +    uint8_t  *local_host_addr; /* local virtual address */
> +    uint64_t remote_host_addr; /* remote virtual address */
> +    uint64_t offset;
> +    uint64_t length;
> +    struct   ibv_mr **pmr;     /* MRs for chunk-level registration */
> +    struct   ibv_mr *mr;       /* MR for non-chunk-level registration */
> +    uint32_t *remote_keys;     /* rkeys for chunk-level registration */
> +    uint32_t remote_rkey;      /* rkeys for non-chunk-level registration */
> +} RDMALocalBlock;
> +
> +/*
> + * Also represents a RAMblock, but only on the dest.
> + * This gets transmitted by the dest during connection-time
> + * to the source / primary VM and then is used to populate the
> + * corresponding RDMALocalBlock with
> + * the information needed to perform the actual RDMA.
> + */
> +typedef struct QEMU_PACKED RDMARemoteBlock {
> +    uint64_t remote_host_addr;
> +    uint64_t offset;
> +    uint64_t length;
> +    uint32_t remote_rkey;
> +    uint32_t padding;
> +} QEMU_PACKED RDMARemoteBlock;
> +
> +/*
> + * Virtual address of the above structures used for transmitting
> + * the RAMBlock descriptions at connection-time.
> + * This structure is *not* transmitted.
> + */
> +typedef struct RDMALocalBlocks {
> +    int num_blocks;
> +    RDMALocalBlock *block;
> +} RDMALocalBlocks;
> +
> +/*
> + * Same as above
> + */
> +typedef struct RDMARemoteBlocks {
> +    RDMARemoteBlock *block;
> +    void *remote_area;
> +} RDMARemoteBlocks;
> +
> +/*
> + * Main data structure for RDMA state.
> + * While there is only one copy of this structure being allocated right now,
> + * this is the place where one would start if you wanted to consider
> + * having more than one RDMA connection open at the same time.
> + */
> +typedef struct RDMAContext {
> +    char *host;
> +    int port;
> +
> +    /* This is used by the migration protocol to transmit
> +     * control messages (such as device state and registration commands)
> +     *
> +     * WR #0 is for control channel ready messages from the destination.
> +     * WR #1 is for control channel data messages from the destination.
> +     * WR #2 is for control channel send messages.
> +     *
> +     * We could use more WRs, but we have enough for now.
> +     */
> +    RDMAWorkRequestData wr_data[RDMA_CONTROL_MAX_WR + 1];
> +
> +    /*
> +     * This is used by *_exchange_send() to figure out whether or not
> +     * the initial "READY" message has already been received or not.
> +     * This is because other functions may potentially poll() and detect
> +     * the READY message before send() does, in which case we need to
> +     * know if it completed.
> +     */
> +    int control_ready_expected;
> +
> +    /* number of outstanding unsignaled send */
> +    int num_unsignaled_send;
> +
> +    /* number of outstanding signaled send */
> +    int num_signaled_send;
> +
> +    /* store info about current buffer so that we can
> +       merge it with future sends */
> +    uint64_t current_offset;
> +    uint64_t current_length;
> +    /* index of ram block the current buffer belongs to */
> +    int current_index;
> +    /* index of the chunk in the current ram block */
> +    int current_chunk;
> +
> +    bool chunk_register_destination;
> +
> +    /*
> +     * infiniband-specific variables for opening the device
> +     * and maintaining connection state and so forth.
> +     *
> +     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
> +     * cm_id->verbs, cm_id->channel, and cm_id->qp.
> +     */
> +    struct rdma_cm_id *cm_id;               /* connection manager ID */
> +    struct rdma_cm_id *listen_id;
> +
> +    struct ibv_context *verbs;
> +    struct rdma_event_channel *channel;
> +    struct ibv_qp *qp;                      /* queue pair */
> +    struct ibv_comp_channel *comp_channel;  /* completion channel */
> +    struct ibv_pd *pd;                      /* protection domain */
> +    struct ibv_cq *cq;                      /* completion queue */
> +
> +    /*
> +     * If a previous write failed (perhaps because of a failed
> +     * memory registration, then do not attempt any future work
> +     * and remember the error state.
> +     */
> +    int error_state;
> +
> +    /*
> +     * Description of ram blocks used throughout the code.
> +     */
> +    RDMALocalBlocks local_ram_blocks;
> +    RDMARemoteBlocks remote_ram_blocks;
> +
> +    /*
> +     * Migration on *destination* started. 
> +     * Then use coroutine yield function.
> +     * Source runs in a thread, so we don't care.
> +     */
> +    int migration_started_on_destination;
> +} RDMAContext;
> +
> +/*
> + * Interface to the rest of the migration call stack.
> + */
> +typedef struct QEMUFileRDMA {
> +    RDMAContext *rdma;
> +    size_t len;
> +    void *file;
> +} QEMUFileRDMA;
> +
> +#define RDMA_CONTROL_VERSION_CURRENT 1
> +
> +/*
> + * Main structure for IB Send/Recv control messages.
> + * This gets prepended at the beginning of every Send/Recv.
> + */
> +typedef struct QEMU_PACKED {
> +    uint32_t len;     /* Total length of data portion */
> +    uint32_t type;    /* which control command to perform */
> +    uint32_t repeat;  /* number of commands in data portion of same type */
> +    uint32_t padding;
> +} QEMU_PACKED RDMAControlHeader;
> +
> +static void control_to_network(RDMAControlHeader *control)
> +{
> +    control->type = htonl(control->type);
> +    control->len = htonl(control->len);
> +    control->repeat = htonl(control->repeat);
> +}
> +
> +static void network_to_control(RDMAControlHeader *control)
> +{
> +    control->type = ntohl(control->type);
> +    control->len = ntohl(control->len);
> +    control->repeat = ntohl(control->repeat);
> +}
> +
> +/*
> + * Register a single Chunk.
> + * Information sent by the primary VM to inform the dest
> + * to register an single chunk of memory before we can perform
> + * the actual RDMA operation.
> + */
> +typedef struct QEMU_PACKED {
> +    uint32_t len;           /* length of the chunk to be registered */
> +    uint32_t current_index; /* which ramblock the chunk belongs to */
> +    uint64_t offset;        /* offset into the ramblock of the chunk */
> +} QEMU_PACKED RDMARegister;
> +
> +typedef struct QEMU_PACKED {
> +    uint32_t value;     /* if zero, we will madvise() */
> +    uint32_t block_idx; /* which ram block index */
> +    uint64_t offset;    /* where in the remote ramblock this chunk */
> +    uint64_t length;    /* length of the chunk */
> +} QEMU_PACKED RDMACompress;
> +
> +/*
> + * The result of the dest's memory registration produces an "rkey"
> + * which the primary VM must reference in order to perform
> + * the RDMA operation.
> + */
> +typedef struct QEMU_PACKED {
> +    uint32_t rkey;
> +    uint32_t padding;
> +} QEMU_PACKED RDMARegisterResult;
> +
> +
> +inline static int ram_chunk_index(uint8_t *start, uint8_t *host)
> +{
> +    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
> +}
> +
> +inline static int ram_chunk_count(RDMALocalBlock *rdma_ram_block)
> +{
> +    return ram_chunk_index(rdma_ram_block->local_host_addr,
> +        rdma_ram_block->local_host_addr + rdma_ram_block->length) + 1;
> +}
> +
> +static inline uint8_t *ram_chunk_start(RDMALocalBlock *rdma_ram_block, int i)
> +{
> +    return (uint8_t *) (((uintptr_t) rdma_ram_block->local_host_addr)
> +                                    + (i << RDMA_REG_CHUNK_SHIFT));
> +}
> +
> +inline static uint8_t *ram_chunk_end(RDMALocalBlock *rdma_ram_block, int i)
> +{
> +    uint8_t *result = ram_chunk_start(rdma_ram_block, i) + RDMA_REG_CHUNK_SIZE;
> +
> +    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
> +        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
> +    }
> +
> +    return result;
> +}
> +
> +/*
> + * Memory regions need to be registered with the device and queue pairs setup
> + * in advanced before the migration starts. This tells us where the RAM blocks
> + * are so that we can register them individually.
> + */
> +static void qemu_rdma_init_one_block(void *host_addr,
> +    ram_addr_t offset, ram_addr_t length, void *opaque)
> +{
> +    RDMALocalBlocks *rdma_local_ram_blocks = opaque;
> +    int num_blocks = rdma_local_ram_blocks->num_blocks;
> +
> +    rdma_local_ram_blocks->block[num_blocks].local_host_addr = host_addr;
> +    rdma_local_ram_blocks->block[num_blocks].offset = (uint64_t)offset;
> +    rdma_local_ram_blocks->block[num_blocks].length = (uint64_t)length;
> +    rdma_local_ram_blocks->num_blocks++;
> +
> +}
> +
> +static void qemu_rdma_ram_block_counter(void *host_addr,
> +            ram_addr_t offset, ram_addr_t length, void *opaque)
> +{
> +    int *num_blocks = opaque;
> +    *num_blocks = *num_blocks + 1;
> +}
> +
> +/*
> + * Identify the RAMBlocks and their quantity. They will be references to
> + * identify chunk boundaries inside each RAMBlock and also be referenced
> + * during dynamic page registration.
> + */
> +static int qemu_rdma_init_ram_blocks(RDMALocalBlocks *rdma_local_ram_blocks)
> +{
> +    int num_blocks = 0;
> +
> +    qemu_ram_foreach_block(qemu_rdma_ram_block_counter, &num_blocks);
> +
> +    memset(rdma_local_ram_blocks, 0, sizeof *rdma_local_ram_blocks);
> +    rdma_local_ram_blocks->block = g_malloc0(sizeof(RDMALocalBlock) *
> +                                    num_blocks);
> +
> +    rdma_local_ram_blocks->num_blocks = 0;
> +    qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma_local_ram_blocks);
> +
> +    DPRINTF("Allocated %d local ram block structures\n",
> +                    rdma_local_ram_blocks->num_blocks);
> +    return 0;
> +}
> +
> +/*
> + * Put in the log file which RDMA device was opened and the details
> + * associated with that device.
> + */
> +static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
> +{
> +    printf("%s RDMA Device opened: kernel name %s "
> +           "uverbs device name %s, "
> +           "infiniband_verbs class device path %s,"
> +           " infiniband class device path %s\n",
> +                who,
> +                verbs->device->name,
> +                verbs->device->dev_name,
> +                verbs->device->dev_path,
> +                verbs->device->ibdev_path);
> +}
> +
> +/*
> + * Put in the log file the RDMA gid addressing information,
> + * useful for folks who have trouble understanding the
> + * RDMA device hierarchy in the kernel.
> + */
> +static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
> +{
> +    char sgid[33];
> +    char dgid[33];
> +    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
> +    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
> +    DPRINTF("%s Source GID: %s, Dest GID: %s\n", who, sgid, dgid);
> +}
> +
> +/*
> + * Figure out which RDMA device corresponds to the requested IP hostname
> + * Also create the initial connection manager identifiers for opening
> + * the connection.
> + */
> +static int qemu_rdma_resolve_host(RDMAContext *rdma)
> +{
> +    int ret;
> +    struct addrinfo *res;
> +    char port_str[16];
> +    struct rdma_cm_event *cm_event;
> +    char ip[40] = "unknown";
> +
> +    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
> +        fprintf(stderr, "RDMA hostname has not been set\n");
> +        return -1;
> +    }
> +
> +    /* create CM channel */
> +    rdma->channel = rdma_create_event_channel();
> +    if (!rdma->channel) {
> +        fprintf(stderr, "could not create CM channel\n");
> +        return -1;
> +    }
> +
> +    /* create CM id */
> +    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
> +    if (ret) {
> +        fprintf(stderr, "could not create channel id\n");
> +        goto err_resolve_create_id;
> +    }
> +
> +    snprintf(port_str, 16, "%d", rdma->port);
> +    port_str[15] = '\0';
> +
> +    ret = getaddrinfo(rdma->host, port_str, NULL, &res);
> +    if (ret < 0) {
> +        fprintf(stderr, "could not getaddrinfo destination address %s\n",
> +                        rdma->host);
> +        goto err_resolve_get_addr;
> +    }
> +
> +    inet_ntop(AF_INET, &((struct sockaddr_in *) res->ai_addr)->sin_addr,
> +                                ip, sizeof ip);
> +    DPRINTF("%s => %s\n", rdma->host, ip);
> +
> +    /* resolve the first address */
> +    ret = rdma_resolve_addr(rdma->cm_id, NULL, res->ai_addr,
> +            RDMA_RESOLVE_TIMEOUT_MS);
> +    if (ret) {
> +        fprintf(stderr, "could not resolve address %s\n", rdma->host);
> +        goto err_resolve_get_addr;
> +    }
> +
> +    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
> +
> +    ret = rdma_get_cm_event(rdma->channel, &cm_event);
> +    if (ret) {
> +        fprintf(stderr, "could not perform event_addr_resolved\n");
> +        goto err_resolve_get_addr;
> +    }
> +
> +    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
> +        fprintf(stderr, "result not equal to event_addr_resolved %s\n",
> +                rdma_event_str(cm_event->event));
> +        perror("rdma_resolve_addr");
> +        goto err_resolve_get_addr;
> +    }
> +    rdma_ack_cm_event(cm_event);
> +
> +    /* resolve route */
> +    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
> +    if (ret) {
> +        fprintf(stderr, "could not resolve rdma route\n");
> +        goto err_resolve_get_addr;
> +    }
> +
> +    ret = rdma_get_cm_event(rdma->channel, &cm_event);
> +    if (ret) {
> +        fprintf(stderr, "could not perform event_route_resolved\n");
> +        goto err_resolve_get_addr;
> +    }
> +    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
> +        fprintf(stderr, "result not equal to event_route_resolved: %s\n",
> +                        rdma_event_str(cm_event->event));
> +        rdma_ack_cm_event(cm_event);
> +        goto err_resolve_get_addr;
> +    }
> +    rdma_ack_cm_event(cm_event);
> +    rdma->verbs = rdma->cm_id->verbs;
> +    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
> +    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
> +    return 0;
> +
> +err_resolve_get_addr:
> +    rdma_destroy_id(rdma->cm_id);
> +err_resolve_create_id:
> +    rdma_destroy_event_channel(rdma->channel);
> +    rdma->channel = NULL;
> +
> +    return -1;
> +}
> +
> +/*
> + * Create protection domain and completion queues
> + */
> +static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
> +{
> +    /* allocate pd */
> +    rdma->pd = ibv_alloc_pd(rdma->verbs);
> +    if (!rdma->pd) {
> +        fprintf(stderr, "failed to allocate protection domain\n");
> +        return -1;
> +    }
> +
> +    /* create completion channel */
> +    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
> +    if (!rdma->comp_channel) {
> +        fprintf(stderr, "failed to allocate completion channel\n");
> +        goto err_alloc_pd_cq;
> +    }
> +
> +    /* create cq */
> +    rdma->cq = ibv_create_cq(rdma->verbs, RDMA_CQ_SIZE,
> +            NULL, rdma->comp_channel, 0);
> +    if (!rdma->cq) {
> +        fprintf(stderr, "failed to allocate completion queue\n");
> +        goto err_alloc_pd_cq;
> +    }
> +
> +    return 0;
> +
> +err_alloc_pd_cq:
> +    if (rdma->pd) {
> +        ibv_dealloc_pd(rdma->pd);
> +    }
> +    if (rdma->comp_channel) {
> +        ibv_destroy_comp_channel(rdma->comp_channel);
> +    }
> +    rdma->pd = NULL;
> +    rdma->comp_channel = NULL;
> +    return -1;
> +
> +}
> +
> +/*
> + * Create queue pairs.
> + */
> +static int qemu_rdma_alloc_qp(RDMAContext *rdma)
> +{
> +    struct ibv_qp_init_attr attr = { 0 };
> +    int ret;
> +
> +    attr.cap.max_send_wr = RDMA_QP_SIZE;
> +    attr.cap.max_recv_wr = 3;
> +    attr.cap.max_send_sge = 1;
> +    attr.cap.max_recv_sge = 1;
> +    attr.send_cq = rdma->cq;
> +    attr.recv_cq = rdma->cq;
> +    attr.qp_type = IBV_QPT_RC;
> +
> +    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
> +    if (ret) {
> +        return -1;
> +    }
> +
> +    rdma->qp = rdma->cm_id->qp;
> +    return 0;
> +}
> +
> +static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma,
> +                                RDMALocalBlocks *rdma_local_ram_blocks)
> +{
> +    int i;
> +    for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
> +        DDPRINTF("Registering whole ram blocks\n");
> +        rdma_local_ram_blocks->block[i].mr =
> +            ibv_reg_mr(rdma->pd,
> +                    rdma_local_ram_blocks->block[i].local_host_addr,
> +                    rdma_local_ram_blocks->block[i].length,
> +                    IBV_ACCESS_LOCAL_WRITE |
> +                    IBV_ACCESS_REMOTE_WRITE
> +                    );
> +        if (!rdma_local_ram_blocks->block[i].mr) {
> +            fprintf(stderr, "Failed to register local dest ram block!\n");
> +            break;
> +        }
> +        DDPRINTF("Finished registering whole ram blocks\n");
> +    }
> +
> +    if (i >= rdma_local_ram_blocks->num_blocks) {
> +        return 0;
> +    }
> +
> +    for (i--; i >= 0; i--) {
> +        ibv_dereg_mr(rdma_local_ram_blocks->block[i].mr);
> +    }
> +
> +    return -1;
> +
> +}
> +
> +/*
> + * Shutdown and clean things up.
> + */
> +static void qemu_rdma_dereg_ram_blocks(RDMALocalBlocks *rdma_local_ram_blocks)
> +{
> +    int i, j;
> +    for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
> +        int num_chunks;
> +        if (!rdma_local_ram_blocks->block[i].pmr) {
> +            continue;
> +        }
> +        num_chunks = ram_chunk_count(&(rdma_local_ram_blocks->block[i]));
> +        for (j = 0; j < num_chunks; j++) {
> +            if (!rdma_local_ram_blocks->block[i].pmr[j]) {
> +                continue;
> +            }
> +            ibv_dereg_mr(rdma_local_ram_blocks->block[i].pmr[j]);
> +        }
> +        g_free(rdma_local_ram_blocks->block[i].pmr);
> +        rdma_local_ram_blocks->block[i].pmr = NULL;
> +    }
> +    for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
> +        if (!rdma_local_ram_blocks->block[i].mr) {
> +            continue;
> +        }
> +        ibv_dereg_mr(rdma_local_ram_blocks->block[i].mr);
> +        rdma_local_ram_blocks->block[i].mr = NULL;
> +    }
> +}
> +
> +/*
> + * Server uses this to prepare to transmit the RAMBlock descriptions
> + * to the primary VM after connection setup.
> + * Both sides use the "remote" structure to communicate and update
> + * their "local" descriptions with what was sent.
> + */
> +static void qemu_rdma_copy_to_remote_ram_blocks(RDMAContext *rdma,
> +                                                RDMALocalBlocks *local,
> +                                                RDMARemoteBlocks *remote)
> +{
> +    int i;
> +    DPRINTF("Allocating %d remote ram block structures\n", local->num_blocks);
> +
> +    for (i = 0; i < local->num_blocks; i++) {
> +            remote->block[i].remote_host_addr =
> +                (uint64_t)(local->block[i].local_host_addr);
> +
> +            if (!rdma->chunk_register_destination) {
> +                remote->block[i].remote_rkey = local->block[i].mr->rkey;
> +            }
> +
> +            remote->block[i].offset = local->block[i].offset;
> +            remote->block[i].length = local->block[i].length;
> +    }
> +}
> +
> +/*
> + * The protocol uses two different sets of rkeys (mutually exclusive):
> + * 1. One key to represent the virtual address of the entire ram block.
> + *    (dynamic chunk registration disabled - pin everything with one rkey.)
> + * 2. One to represent individual chunks within a ram block. 
> + *    (dynamic chunk registration enabled - pin individual chunks.)
> + *
> + * Once the capability is successfully negotiated, the destination transmits
> + * the keys to use (or sends them later) including the virtual addresses
> + * and then propagates the remote ram block descriptions to his local copy.
> + */
> +static int qemu_rdma_process_remote_ram_blocks(RDMALocalBlocks *local,
> +                                               RDMARemoteBlocks *remote,
> +                                               int num_blocks)
> +{
> +    int i, j;
> +
> +    if (local->num_blocks != num_blocks) {
> +        fprintf(stderr, "local %d != remote %d\n",
> +            local->num_blocks, num_blocks);
> +        return -1;
> +    }
> +
> +    for (i = 0; i < num_blocks; i++) {
> +        /* search local ram blocks */
> +        for (j = 0; j < local->num_blocks; j++) {
> +            if (remote->block[i].offset != local->block[j].offset) {
> +                continue;
> +            }
> +            if (remote->block[i].length != local->block[j].length) {
> +                return -1;
> +            }
> +            local->block[j].remote_host_addr =
> +                remote->block[i].remote_host_addr;
> +            local->block[j].remote_rkey = remote->block[i].remote_rkey;
> +            break;
> +        }
> +        if (j >= local->num_blocks) {
> +            return -1;
> +        }
> +    }
> +
> +    return 0;
> +}
> +
> +/*
> + * Find the ram block that corresponds to the page requested to be
> + * transmitted by QEMU.
> + *
> + * Once the block is found, also identify which 'chunk' within that
> + * block that the page belongs to.
> + *
> + * This search cannot fail or the migration will fail.
> + */
> +static int qemu_rdma_search_ram_block(uint64_t offset, uint64_t length,
> +        RDMALocalBlocks *blocks, int *block_index, int *chunk_index)
> +{
> +    int i;
> +    uint8_t *host_addr;
> +
> +    for (i = 0; i < blocks->num_blocks; i++) {
> +        if (offset < blocks->block[i].offset) {
> +            continue;
> +        }
> +        if (offset + length >
> +                blocks->block[i].offset + blocks->block[i].length) {
> +            continue;
> +        }
> +
> +        *block_index = i;
> +        host_addr = blocks->block[i].local_host_addr +
> +                (offset - blocks->block[i].offset);
> +        *chunk_index = ram_chunk_index(blocks->block[i].local_host_addr, host_addr);
> +        return 0;
> +    }
> +    return -1;
> +}
> +
> +/*
> + * Register a chunk with IB. If the chunk was already registered
> + * previously, then skip.
> + *
> + * Also return the keys associated with the registration needed
> + * to perform the actual RDMA operation.
> + */
> +static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
> +        RDMALocalBlock *block, uint8_t * host_addr,
> +        uint32_t *lkey, uint32_t *rkey)
> +{
> +    int chunk;
> +    if (block->mr) {
> +        if (lkey) {
> +            *lkey = block->mr->lkey;
> +        }
> +        if (rkey) {
> +            *rkey = block->mr->rkey;
> +        }
> +        return 0;
> +    }
> +
> +    /* allocate memory to store chunk MRs */
> +    if (!block->pmr) {
> +        int num_chunks = ram_chunk_count(block);
> +        block->pmr = g_malloc0(num_chunks *
> +                sizeof(struct ibv_mr *));
> +        if (!block->pmr) {
> +            return -1;
> +        }
> +    }
> +
> +    /*
> +     * If 'rkey', then we're the destination, so grant access to the source.
> +     *
> +     * If 'lkey', then we're the primary VM, so grant access only to ourselves.
> +     */
> +    chunk = ram_chunk_index(block->local_host_addr, host_addr);
> +    if (!block->pmr[chunk]) {
> +        uint8_t *start_addr = ram_chunk_start(block, chunk);
> +        uint8_t *end_addr = ram_chunk_end(block, chunk);
> +
> +        DDPRINTF("Registering chunk\n");
> +
> +        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
> +                start_addr,
> +                end_addr - start_addr,
> +                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
> +                        IBV_ACCESS_REMOTE_WRITE) : 0));
> +
> +        if (!block->pmr[chunk]) {
> +            fprintf(stderr, "Failed to register chunk!\n");
> +            return -1;
> +        }
> +        DDPRINTF("Finished registering chunk\n");
> +    }
> +
> +    if (lkey) {
> +        *lkey = block->pmr[chunk]->lkey;
> +    }
> +    if (rkey) {
> +        *rkey = block->pmr[chunk]->rkey;
> +    }
> +    return 0;
> +}
> +
> +/*
> + * Register (at connection time) the memory used for control
> + * channel messages.
> + */
> +static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
> +{
> +    DDPRINTF("Registering control\n");
> +    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
> +            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
> +            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
> +    if (rdma->wr_data[idx].control_mr) {
> +        DDPRINTF("Finished registering control\n");
> +        return 0;
> +    }
> +    fprintf(stderr, "qemu_rdma_reg_control failed!\n");
> +    return -1;
> +}
> +
> +static int qemu_rdma_dereg_control(RDMAContext *rdma, int idx)
> +{
> +    return ibv_dereg_mr(rdma->wr_data[idx].control_mr);
> +}
> +
> +#if defined(DEBUG_RDMA) || defined(DEBUG_RDMA_VERBOSE)
> +static const char *print_wrid(int wrid)
> +{
> +    if (wrid >= RDMA_WRID_RECV_CONTROL) {
> +        return wrid_desc[RDMA_WRID_RECV_CONTROL];
> +    }
> +    return wrid_desc[wrid];
> +}
> +#endif
> +
> +/*
> + * Consult the connection manager to see a work request
> + * (of any kind) has completed.
> + * Return the work request ID that completed.
> + */
> +static int qemu_rdma_poll(RDMAContext *rdma)
> +{
> +    int ret;
> +    struct ibv_wc wc;
> +
> +    ret = ibv_poll_cq(rdma->cq, 1, &wc);
> +    if (!ret) {
> +        return RDMA_WRID_NONE;
> +    }
> +    if (ret < 0) {
> +        fprintf(stderr, "ibv_poll_cq return %d!\n", ret);
> +        return ret;
> +    }
> +    if (wc.status != IBV_WC_SUCCESS) {
> +        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
> +                        wc.status, ibv_wc_status_str(wc.status));
> +        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wc.wr_id]);
> +
> +        return -1;
> +    }
> +
> +    if (rdma->control_ready_expected &&
> +        (wc.wr_id >= RDMA_WRID_RECV_CONTROL)) {
> +        DDPRINTF("completion %s #%" PRId64 " received (%" PRId64 ")\n",
> +            wrid_desc[RDMA_WRID_RECV_CONTROL], wc.wr_id -
> +            RDMA_WRID_RECV_CONTROL, wc.wr_id);
> +        rdma->control_ready_expected = 0;
> +    }
> +
> +    if (wc.wr_id == RDMA_WRID_RDMA_WRITE) {
> +        rdma->num_signaled_send--;
> +        DDPRINTF("completions %s (%" PRId64 ") left %d\n",
> +            print_wrid(wc.wr_id), wc.wr_id, rdma->num_signaled_send);
> +    } else {
> +        DDPRINTF("other completion %s (%" PRId64 ") received left %d\n",
> +            print_wrid(wc.wr_id), wc.wr_id, rdma->num_signaled_send);
> +    }
> +
> +    return  (int)wc.wr_id;
> +}
> +
> +/*
> + * Block until the next work request has completed.
> + *
> + * First poll to see if a work request has already completed,
> + * otherwise block.
> + *
> + * If we encounter completed work requests for IDs other than
> + * the one we're interested in, then that's generally an error.
> + *
> + * The only exception is actual RDMA Write completions. These
> + * completions only need to be recorded, but do not actually
> + * need further processing.
> + */
> +static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid)
> +{
> +    int num_cq_events = 0;
> +    int r = RDMA_WRID_NONE;
> +    struct ibv_cq *cq;
> +    void *cq_ctx;
> +
> +    if (ibv_req_notify_cq(rdma->cq, 0)) {
> +        return -1;
> +    }
> +    /* poll cq first */
> +    while (r != wrid) {
> +        r = qemu_rdma_poll(rdma);
> +        if (r < 0) {
> +            return r;
> +        }
> +        if (r == RDMA_WRID_NONE) {
> +            break;
> +        }
> +        if (r != wrid) {
> +            DDPRINTF("A Wanted wrid %s (%d) but got %s (%d)\n",
> +                print_wrid(wrid), wrid, print_wrid(r), r);
> +        }
> +    }
> +    if (r == wrid) {
> +        return 0;
> +    }
> +
> +    while (1) {
> +        /*
> +         * Coroutine doesn't start until process_incoming_migration()
> +         * so don't yield unless we know we're running inside of a coroutine.
> +         */
> +        if (rdma->migration_started_on_destination) {
> +            yield_until_fd_readable(rdma->comp_channel->fd);
> +        }
> +
> +        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
> +            perror("ibv_get_cq_event");
> +            goto err_block_for_wrid;
> +        }
> +
> +        num_cq_events++;
> +
> +        if (ibv_req_notify_cq(cq, 0)) {
> +            goto err_block_for_wrid;
> +        }
> +        /* poll cq */
> +        while (r != wrid) {
> +            r = qemu_rdma_poll(rdma);
> +            if (r < 0) {
> +                goto err_block_for_wrid;
> +            }
> +            if (r == RDMA_WRID_NONE) {
> +                break;
> +            }
> +            if (r != wrid) {
> +                DDPRINTF("B Wanted wrid %s (%d) but got %s (%d)\n",
> +                    print_wrid(wrid), wrid, print_wrid(r), r);
> +            }
> +        }
> +        if (r == wrid) {
> +            goto success_block_for_wrid;
> +        }
> +    }
> +
> +success_block_for_wrid:
> +    if (num_cq_events) {
> +        ibv_ack_cq_events(cq, num_cq_events);
> +    }
> +    return 0;
> +
> +err_block_for_wrid:
> +    if (num_cq_events) {
> +        ibv_ack_cq_events(cq, num_cq_events);
> +    }
> +    return -1;
> +}
> +
> +/*
> + * Post a SEND message work request for the control channel
> + * containing some data and block until the post completes.
> + */
> +static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
> +                                       RDMAControlHeader *head)
> +{
> +    int ret = 0;
> +    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_CONTROL_MAX_WR];
> +    struct ibv_send_wr *bad_wr;
> +    struct ibv_sge sge = {
> +                           .addr = (uint64_t)(wr->control),
> +                           .length = head->len + sizeof(RDMAControlHeader),
> +                           .lkey = wr->control_mr->lkey,
> +                         };
> +    struct ibv_send_wr send_wr = {
> +                                   .wr_id = RDMA_WRID_SEND_CONTROL,
> +                                   .opcode = IBV_WR_SEND,
> +                                   .send_flags = IBV_SEND_SIGNALED,
> +                                   .sg_list = &sge,
> +                                   .num_sge = 1,
> +                                };
> +
> +    DPRINTF("CONTROL: sending %s..\n", control_desc[head->type]);
> +
> +    /*
> +     * We don't actually need to do a memcpy() in here if we used
> +     * the "sge" properly, but since we're only sending control messages
> +     * (not RAM in a performance-critical path), then its OK for now.
> +     *
> +     * The copy makes the RDMAControlHeader simpler to manipulate
> +     * for the time being.
> +     */
> +    memcpy(wr->control, head, sizeof(RDMAControlHeader));
> +    control_to_network((void *) wr->control);
> +
> +    if (buf) {
> +        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
> +    }
> +
> +
> +    if (ibv_post_send(rdma->qp, &send_wr, &bad_wr)) {
> +        return -1;
> +    }
> +
> +    if (ret < 0) {
> +        fprintf(stderr, "Failed to use post IB SEND for control!\n");
> +        return ret;
> +    }
> +
> +    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL);
> +    if (ret < 0) {
> +        fprintf(stderr, "rdma migration: send polling control error!\n");
> +    }
> +
> +    return ret;
> +}
> +
> +/*
> + * Post a RECV work request in anticipation of some future receipt
> + * of data on the control channel.
> + */
> +static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
> +{
> +    struct ibv_recv_wr *bad_wr;
> +    struct ibv_sge sge = {
> +                            .addr = (uint64_t)(rdma->wr_data[idx].control),
> +                            .length = RDMA_CONTROL_MAX_BUFFER,
> +                            .lkey = rdma->wr_data[idx].control_mr->lkey,
> +                         };
> +
> +    struct ibv_recv_wr recv_wr = {
> +                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
> +                                    .sg_list = &sge,
> +                                    .num_sge = 1,
> +                                 };
> +
> +
> +    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
> +        return -1;
> +    }
> +
> +    return 0;
> +}
> +
> +/*
> + * Block and wait for a RECV control channel message to arrive.
> + */
> +static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
> +                RDMAControlHeader *head, int expecting, int idx)
> +{
> +    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx);
> +
> +    if (ret < 0) {
> +        fprintf(stderr, "rdma migration: recv polling control error!\n");
> +        return ret;
> +    }
> +
> +    network_to_control((void *) rdma->wr_data[idx].control);
> +    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
> +
> +    DPRINTF("CONTROL: %s received\n", control_desc[expecting]);
> +
> +    if ((expecting != RDMA_CONTROL_NONE && head->type != expecting)
> +            || head->type == RDMA_CONTROL_ERROR) {
> +        fprintf(stderr, "Was expecting a %s (%d) control message"
> +                ", but got: %s (%d), length: %d\n",
> +                control_desc[expecting], expecting,
> +                control_desc[head->type], head->type, head->len);
> +        return -EIO;
> +    }
> +
> +    return 0;
> +}
> +
> +/*
> + * When a RECV work request has completed, the work request's
> + * buffer is pointed at the header.
> + *
> + * This will advance the pointer to the data portion
> + * of the control message of the work request's buffer that
> + * was populated after the work request finished.
> + */
> +static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
> +                                  RDMAControlHeader *head)
> +{
> +    rdma->wr_data[idx].control_len = head->len;
> +    rdma->wr_data[idx].control_curr =
> +        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
> +}
> +
> +/*
> + * This is an 'atomic' high-level operation to deliver a single, unified
> + * control-channel message.
> + *
> + * Additionally, if the user is expecting some kind of reply to this message,
> + * they can request a 'resp' response message be filled in by posting an
> + * additional work request on behalf of the user and waiting for an additional
> + * completion.
> + *
> + * The extra (optional) response is used during registration to us from having
> + * to perform an *additional* exchange of message just to provide a response by
> + * instead piggy-backing on the acknowledgement.
> + */
> +static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
> +                                   uint8_t *data, RDMAControlHeader *resp,
> +                                   int *resp_idx)
> +{
> +    int ret = 0;
> +    int idx = 0;
> +
> +    /*
> +     * Wait until the dest is ready before attempting to deliver the message
> +     * by waiting for a READY message.
> +     */
> +    if (rdma->control_ready_expected) {
> +        RDMAControlHeader resp;
> +        ret = qemu_rdma_exchange_get_response(rdma,
> +                                    &resp, RDMA_CONTROL_READY, idx);
> +        if (ret < 0) {
> +            return ret;
> +        }
> +    }
> +
> +    /*
> +     * If the user is expecting a response, post a WR in anticipation of it.
> +     */
> +    if (resp) {
> +        ret = qemu_rdma_post_recv_control(rdma, idx + 1);
> +        if (ret) {
> +            fprintf(stderr, "rdma migration: error posting"
> +                    " extra control recv for anticipated result!");
> +            return ret;
> +        }
> +    }
> +
> +    /*
> +     * Post a WR to replace the one we just consumed for the READY message.
> +     */
> +    ret = qemu_rdma_post_recv_control(rdma, idx);
> +    if (ret) {
> +        fprintf(stderr, "rdma migration: error posting first control recv!");
> +        return ret;
> +    }
> +
> +    /*
> +     * Deliver the control message that was requested.
> +     */
> +    ret = qemu_rdma_post_send_control(rdma, data, head);
> +
> +    if (ret < 0) {
> +        fprintf(stderr, "Failed to send control buffer!\n");
> +        return ret;
> +    }
> +
> +    /*
> +     * If we're expecting a response, block and wait for it.
> +     */
> +    if (resp) {
> +        DPRINTF("Waiting for response %s\n", control_desc[resp->type]);
> +        ret = qemu_rdma_exchange_get_response(rdma, resp, resp->type, idx + 1);
> +
> +        if (ret < 0) {
> +            return ret;
> +        }
> +
> +        qemu_rdma_move_header(rdma, idx + 1, resp);
> +        *resp_idx = idx + 1;
> +        DPRINTF("Response %s received.\n", control_desc[resp->type]);
> +    }
> +
> +    rdma->control_ready_expected = 1;
> +
> +    return 0;
> +}
> +
> +/*
> + * This is an 'atomic' high-level operation to receive a single, unified
> + * control-channel message.
> + */
> +static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
> +                                int expecting)
> +{
> +    RDMAControlHeader ready = {
> +                                .len = 0,
> +                                .type = RDMA_CONTROL_READY,
> +                                .repeat = 1,
> +                              };
> +    int ret;
> +    int idx = 0;
> +
> +    /*
> +     * Inform the source that we're ready to receive a message.
> +     */
> +    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
> +
> +    if (ret < 0) {
> +        fprintf(stderr, "Failed to send control buffer!\n");
> +        return ret;
> +    }
> +
> +    /*
> +     * Block and wait for the message.
> +     */
> +    ret = qemu_rdma_exchange_get_response(rdma, head, expecting, idx);
> +
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    qemu_rdma_move_header(rdma, idx, head);
> +
> +    /*
> +     * Post a new RECV work request to replace the one we just consumed.
> +     */
> +    ret = qemu_rdma_post_recv_control(rdma, idx);
> +    if (ret) {
> +        fprintf(stderr, "rdma migration: error posting second control recv!");
> +        return ret;
> +    }
> +
> +    return 0;
> +}
> +
> +/*
> + * Write an actual chunk of memory using RDMA.
> + *
> + * If we're using dynamic registration on the dest-side, we have to
> + * send a registration command first.
> + */
> +static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
> +        int current_index,
> +        uint64_t offset, uint64_t length,
> +        uint64_t wr_id, enum ibv_send_flags flag)
> +{
> +    struct ibv_sge sge;
> +    struct ibv_send_wr send_wr = { 0 };
> +    struct ibv_send_wr *bad_wr;
> +    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
> +    int chunk;
> +    RDMARegister reg;
> +    RDMARegisterResult *reg_result;
> +    int reg_result_idx;
> +    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
> +    RDMAControlHeader head = { .len = sizeof(RDMARegister),
> +                               .type = RDMA_CONTROL_REGISTER_REQUEST,
> +                               .repeat = 1,
> +                             };
> +    int ret;
> +
> +    sge.addr = (uint64_t)(block->local_host_addr + (offset - block->offset));
> +    sge.length = length;
> +
> +    if (rdma->chunk_register_destination) {
> +        chunk = ram_chunk_index(block->local_host_addr, (uint8_t *) sge.addr);
> +        if (!block->remote_keys[chunk]) {
> +            /*
> +             * This page has not yet been registered, so first check to see
> +             * if the entire chunk is zero. If so, tell the other size to
> +             * memset() + madvise() the entire chunk without RDMA.
> +             */
> +            if (can_use_buffer_find_nonzero_offset((void *)sge.addr, length)
> +                   && buffer_find_nonzero_offset((void *)sge.addr,
> +                                                    length) == length) {
> +                RDMACompress comp = {
> +                                        .offset = offset,
> +                                        .value = 0,
> +                                        .block_idx = current_index,
> +                                        .length = length,
> +                                    };
> +
> +                head.len = sizeof(comp);
> +                head.type = RDMA_CONTROL_COMPRESS;
> +
> +                DPRINTF("Entire chunk is zero, sending compress: %d for %d "
> +                    "bytes, index: %d, offset: %" PRId64 "...\n",
> +                    chunk, sge.length, current_index, offset);
> +
> +                ret = qemu_rdma_exchange_send(rdma, &head,
> +                                (uint8_t *) &comp, NULL, NULL);
> +
> +                if (ret < 0) {
> +                    return -EIO;
> +                }
> +
> +                return 1;
> +            }
> +
> +            /*
> +             * Otherwise, tell other side to register.
> +             */
> +            reg.len = sge.length;
> +            reg.current_index = current_index;
> +            reg.offset = offset;
> +
> +            DPRINTF("Sending registration request chunk %d for %d "
> +                    "bytes, index: %d, offset: %" PRId64 "...\n",
> +                    chunk, sge.length, current_index, offset);
> +
> +            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
> +                                    &resp, &reg_result_idx);
> +            if (ret < 0) {
> +                return ret;
> +            }
> +
> +            reg_result = (RDMARegisterResult *)
> +                    rdma->wr_data[reg_result_idx].control_curr;
> +
> +            DPRINTF("Received registration result:"
> +                    " my key: %x their key %x, chunk %d\n",
> +                    block->remote_keys[chunk], reg_result->rkey, chunk);
> +
> +            block->remote_keys[chunk] = reg_result->rkey;
> +        }
> +
> +        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
> +    } else {
> +        send_wr.wr.rdma.rkey = block->remote_rkey;
> +    }
> +
> +    if (qemu_rdma_register_and_get_keys(rdma, block, (uint8_t *)sge.addr,
> +                                        &sge.lkey, NULL)) {
> +        fprintf(stderr, "cannot get lkey!\n");
> +        return -EINVAL;
> +    }
> +
> +    send_wr.wr_id = wr_id;
> +    send_wr.opcode = IBV_WR_RDMA_WRITE;
> +    send_wr.send_flags = flag;
> +    send_wr.sg_list = &sge;
> +    send_wr.num_sge = 1;
> +    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
> +        (offset - block->offset);
> +
> +    return ibv_post_send(rdma->qp, &send_wr, &bad_wr);
> +}
> +
> +/*
> + * Push out any unwritten RDMA operations.
> + *
> + * We support sending out multiple chunks at the same time.
> + * Not all of them need to get signaled in the completion queue.
> + */
> +static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
> +{
> +    int ret;
> +    enum ibv_send_flags flags = 0;
> +
> +    if (!rdma->current_length) {
> +        return 0;
> +    }
> +    if (rdma->num_unsignaled_send >=
> +            RDMA_UNSIGNALED_SEND_MAX) {
> +        flags = IBV_SEND_SIGNALED;
> +    }
> +
> +retry:
> +    ret = qemu_rdma_write_one(f, rdma,
> +            rdma->current_index,
> +            rdma->current_offset,
> +            rdma->current_length,
> +            RDMA_WRID_RDMA_WRITE, flags);
> +
> +    if (ret < 0) {
> +        if (ret == -ENOMEM) {
> +            DPRINTF("send queue is full. wait a little....\n");
> +            ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
> +            if (ret >= 0) {
> +                goto retry;
> +            }
> +            if (ret < 0) {
> +                fprintf(stderr, "rdma migration: failed to make "
> +                                "room in full send queue! %d\n", ret);
> +                return ret;
> +            }
> +        }
> +        perror("write flush error");
> +        return ret;
> +    }
> +
> +    if (ret == 0) {
> +        if (rdma->num_unsignaled_send >=
> +                RDMA_UNSIGNALED_SEND_MAX) {
> +            rdma->num_unsignaled_send = 0;
> +            rdma->num_signaled_send++;
> +            DPRINTF("signaled total: %d\n", rdma->num_signaled_send);
> +        } else {
> +            rdma->num_unsignaled_send++;
> +        }
> +    }
> +
> +    rdma->current_length = 0;
> +    rdma->current_offset = 0;
> +
> +    return 0;
> +}
> +
> +static inline int qemu_rdma_in_current_block(RDMAContext *rdma,
> +                uint64_t offset, uint64_t len)
> +{
> +    RDMALocalBlock *block =
> +        &(rdma->local_ram_blocks.block[rdma->current_index]);
> +    if (rdma->current_index < 0) {
> +        return 0;
> +    }
> +    if (offset < block->offset) {
> +        return 0;
> +    }
> +    if (offset + len > block->offset + block->length) {
> +        return 0;
> +    }
> +    return 1;
> +}
> +
> +static inline int qemu_rdma_in_current_chunk(RDMAContext *rdma,
> +                uint64_t offset, uint64_t len)
> +{
> +    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[rdma->current_index]);
> +    uint8_t *chunk_start, *chunk_end, *host_addr;
> +    if (rdma->current_chunk < 0) {
> +        return 0;
> +    }
> +    host_addr = block->local_host_addr + (offset - block->offset);
> +    chunk_start = ram_chunk_start(block, rdma->current_chunk);
> +
> +    if (host_addr < chunk_start) {
> +        return 0;
> +    }
> +
> +    chunk_end = ram_chunk_end(block, rdma->current_chunk);
> +
> +    if ((host_addr + len) > chunk_end) {
> +        return 0;
> +    }
> +    return 1;
> +}
> +
> +static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
> +                    uint64_t offset, uint64_t len)
> +{
> +    if (rdma->current_length == 0) {
> +        return 0;
> +    }
> +    if (offset != rdma->current_offset + rdma->current_length) {
> +        return 0;
> +    }
> +    if (!qemu_rdma_in_current_block(rdma, offset, len)) {
> +        return 0;
> +    }
> +    if (!qemu_rdma_in_current_chunk(rdma, offset, len)) {
> +        return 0;
> +    }
> +    return 1;
> +}
> +
> +/*
> + * We're not actually writing here, but doing three things:
> + *
> + * 1. Identify the chunk the buffer belongs to.
> + * 2. If the chunk is full or the buffer doesn't belong to the current
> + *    chunk, then start a new chunk and flush() the old chunk.
> + * 3. To keep the hardware busy, we also group chunks into batches
> + *    and only require that a batch gets acknowledged in the completion
> + *    qeueue instead of each individual chunk.
> + */
> +static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
> +                           uint64_t offset, uint64_t len)
> +{
> +    int index = rdma->current_index;
> +    int chunk_index = rdma->current_chunk;
> +    int ret;
> +
> +    /* If we cannot merge it, we flush the current buffer first. */
> +    if (!qemu_rdma_buffer_mergable(rdma, offset, len)) {
> +        ret = qemu_rdma_write_flush(f, rdma);
> +        if (ret) {
> +            return ret;
> +        }
> +        rdma->current_length = 0;
> +        rdma->current_offset = offset;
> +
> +        ret = qemu_rdma_search_ram_block(offset, len,
> +                    &rdma->local_ram_blocks, &index, &chunk_index);
> +        if (ret) {
> +            fprintf(stderr, "ram block search failed\n");
> +            return ret;
> +        }
> +        rdma->current_index = index;
> +        rdma->current_chunk = chunk_index;
> +    }
> +
> +    /* merge it */
> +    rdma->current_length += len;
> +
> +    /* flush it if buffer is too large */
> +    if (rdma->current_length >= RDMA_MERGE_MAX) {
> +        return qemu_rdma_write_flush(f, rdma);
> +    }
> +
> +    return 0;
> +}
> +
> +static void qemu_rdma_cleanup(RDMAContext *rdma)
> +{
> +    struct rdma_cm_event *cm_event;
> +    int ret, idx;
> +
> +    if (rdma->cm_id) {
> +        if(rdma->error_state) {
> +            RDMAControlHeader head = { .len = 0,
> +                                       .type = RDMA_CONTROL_ERROR,
> +                                       .repeat = 1,
> +                                     };
> +            fprintf(stderr, "Early error. Sending error.\n");
> +            qemu_rdma_post_send_control(rdma, NULL, &head);
> +        }
> +
> +        ret = rdma_disconnect(rdma->cm_id);
> +        if (!ret) {
> +            DPRINTF("waiting for disconnect\n");
> +            ret = rdma_get_cm_event(rdma->channel, &cm_event);
> +            if (!ret) {
> +                rdma_ack_cm_event(cm_event);
> +            }
> +        }
> +        DPRINTF("Disconnected.\n");
> +        rdma->cm_id = 0;
> +    }
> +
> +    g_free(rdma->remote_ram_blocks.remote_area);
> +    rdma->remote_ram_blocks.remote_area = NULL;
> +
> +    for (idx = 0; idx < (RDMA_CONTROL_MAX_WR + 1); idx++) {
> +        if (rdma->wr_data[idx].control_mr) {
> +            qemu_rdma_dereg_control(rdma, idx);
> +        }
> +        rdma->wr_data[idx].control_mr = NULL;
> +    }
> +
> +    if (rdma->local_ram_blocks.block) {
> +        qemu_rdma_dereg_ram_blocks(&rdma->local_ram_blocks);
> +
> +        if (rdma->chunk_register_destination) {
> +            for (idx = 0; idx < rdma->local_ram_blocks.num_blocks; idx++) {
> +                RDMALocalBlock *block = &(rdma->local_ram_blocks.block[idx]);
> +                g_free(block->remote_keys);
> +                block->remote_keys = NULL;
> +            }
> +        }
> +        g_free(rdma->local_ram_blocks.block);
> +        rdma->local_ram_blocks.block = NULL;
> +    }
> +
> +    if (rdma->qp) {
> +        ibv_destroy_qp(rdma->qp);
> +        rdma->qp = NULL;
> +    }
> +    if (rdma->cq) {
> +        ibv_destroy_cq(rdma->cq);
> +        rdma->cq = NULL;
> +    }
> +    if (rdma->comp_channel) {
> +        ibv_destroy_comp_channel(rdma->comp_channel);
> +        rdma->comp_channel = NULL;
> +    }
> +    if (rdma->pd) {
> +        ibv_dealloc_pd(rdma->pd);
> +        rdma->pd = NULL;
> +    }
> +    if (rdma->listen_id) {
> +        rdma_destroy_id(rdma->listen_id);
> +        rdma->listen_id = 0;
> +    }
> +    if (rdma->cm_id) {
> +        rdma_destroy_id(rdma->cm_id);
> +        rdma->cm_id = 0;
> +    }
> +    if (rdma->channel) {
> +        rdma_destroy_event_channel(rdma->channel);
> +        rdma->channel = NULL;
> +    }
> +}
> +
> +static void qemu_rdma_remote_ram_blocks_init(RDMAContext *rdma)
> +{
> +    int remote_size = sizeof(RDMARemoteBlock) * rdma->local_ram_blocks.num_blocks;
> +
> +    DPRINTF("Preparing %d bytes for remote info\n", remote_size);
> +
> +    rdma->remote_ram_blocks.remote_area = g_malloc0(remote_size);
> +    rdma->remote_ram_blocks.block = (RDMARemoteBlock *) rdma->remote_ram_blocks.remote_area;
> +}
> +
> +static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp,
> +                                 bool chunk_register_destination)
> +{
> +    int ret, idx;
> +
> +    /*
> +     * Will be validated against destination's actual capabilities
> +     * after the connect() completes.
> +     */
> +    rdma->chunk_register_destination = chunk_register_destination;
> +
> +    ret = qemu_rdma_resolve_host(rdma);
> +    if (ret) {
> +        error_setg(errp, "rdma migration: error resolving host!\n");
> +        goto err_rdma_source_init;
> +    }
> +
> +    ret = qemu_rdma_alloc_pd_cq(rdma);
> +    if (ret) {
> +        error_setg(errp, "rdma migration: error allocating pd and cq!\n");
> +        goto err_rdma_source_init;
> +    }
> +
> +    ret = qemu_rdma_alloc_qp(rdma);
> +    if (ret) {
> +        error_setg(errp, "rdma migration: error allocating qp!\n");
> +        goto err_rdma_source_init;
> +    }
> +
> +    ret = qemu_rdma_init_ram_blocks(&rdma->local_ram_blocks);
> +    if (ret) {
> +        error_setg(errp, "rdma migration: error initializing ram blocks!\n");
> +        goto err_rdma_source_init;
> +    }
> +
> +    for (idx = 0; idx < (RDMA_CONTROL_MAX_WR + 1); idx++) {
> +        ret = qemu_rdma_reg_control(rdma, idx);
> +        if (ret) {
> +            error_setg(errp, "rdma migration: error registering %d control!\n",
> +                                                            idx);
> +            goto err_rdma_source_init;
> +        }
> +    }
> +
> +    qemu_rdma_remote_ram_blocks_init(rdma);
> +    return 0;
> +
> +err_rdma_source_init:
> +    qemu_rdma_cleanup(rdma);
> +    return -1;
> +}
> +
> +static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
> +{
> +    RDMAControlHeader head;
> +    RDMACapabilities cap = {
> +                                .version = RDMA_CONTROL_VERSION_CURRENT,
> +                                .flags = 0,
> +                           };
> +    struct rdma_conn_param conn_param = { .initiator_depth = 2,
> +                                          .retry_count = 5,
> +                                          .private_data = &cap,
> +                                          .private_data_len = sizeof(cap),
> +                                        };
> +    struct rdma_cm_event *cm_event;
> +    int ret;
> +    int idx = 0;
> +    int x;
> +
> +    /*
> +     * Only negotiate the capability with destination if the user
> +     * on the source first requested the capability.
> +     */
> +    if (rdma->chunk_register_destination) {
> +        DPRINTF("Server dynamic registration requested.\n");
> +        cap.flags |= RDMA_CAPABILITY_CHUNK_REGISTER;
> +    }
> +
> +    caps_to_network(&cap);
> +
> +    ret = rdma_connect(rdma->cm_id, &conn_param);
> +    if (ret) {
> +        perror("rdma_connect");
> +        error_setg(errp, "rdma migration: error connecting!\n");
> +        rdma_destroy_id(rdma->cm_id);
> +        rdma->cm_id = 0;
> +        goto err_rdma_source_connect;
> +    }
> +
> +    ret = rdma_get_cm_event(rdma->channel, &cm_event);
> +    if (ret) {
> +        perror("rdma_get_cm_event after rdma_connect");
> +        error_setg(errp, "rdma migration: error connecting!\n");
> +        rdma_ack_cm_event(cm_event);
> +        rdma_destroy_id(rdma->cm_id);
> +        rdma->cm_id = 0;
> +        goto err_rdma_source_connect;
> +    }
> +
> +    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
> +        perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
> +        error_setg(errp, "rdma migration: error connecting!\n");
> +        rdma_ack_cm_event(cm_event);
> +        rdma_destroy_id(rdma->cm_id);
> +        rdma->cm_id = 0;
> +        goto err_rdma_source_connect;
> +    }
> +
> +    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
> +    network_to_caps(&cap);
> +
> +    /*
> +     * Verify that the *requested* capabilities are supported by the destination
> +     * and disable them otherwise.
> +     */
> +    if (!(cap.flags & RDMA_CAPABILITY_CHUNK_REGISTER) &&
> +                                rdma->chunk_register_destination) {
> +        fprintf(stderr, "Server cannot support dynamic registration. Will disable\n");
> +        rdma->chunk_register_destination = false;
> +    }
> +
> +    DPRINTF("Chunk registration %s\n",
> +        rdma->chunk_register_destination ? "enabled" : "disabled");
> +
> +    rdma_ack_cm_event(cm_event);
> +
> +    ret = qemu_rdma_post_recv_control(rdma, idx + 1);
> +    if (ret) {
> +        error_setg(errp, "rdma migration: error posting first control recv!\n");
> +        goto err_rdma_source_connect;
> +    }
> +
> +    ret = qemu_rdma_post_recv_control(rdma, idx);
> +    if (ret) {
> +        error_setg(errp, "rdma migration: error posting second control recv!\n");
> +        goto err_rdma_source_connect;
> +    }
> +
> +    ret = qemu_rdma_exchange_get_response(rdma,
> +                                &head, RDMA_CONTROL_RAM_BLOCKS, idx + 1);
> +
> +    if (ret < 0) {
> +        error_setg(errp, "rdma migration: error receiving remote info!\n");
> +        goto err_rdma_source_connect;
> +    }
> +
> +    qemu_rdma_move_header(rdma, idx + 1, &head);
> +    memcpy(rdma->remote_ram_blocks.remote_area, 
> +                rdma->wr_data[idx + 1].control_curr, head.len);
> +
> +    ret = qemu_rdma_process_remote_ram_blocks(&rdma->local_ram_blocks, 
> +                &rdma->remote_ram_blocks,
> +                (head.len / sizeof(RDMARemoteBlock)));
> +    if (ret) {
> +        error_setg(errp, "rdma migration: error processing"
> +                        " remote ram blocks!\n");
> +        goto err_rdma_source_connect;
> +    }
> +
> +    if (rdma->chunk_register_destination) {
> +        for (x = 0; x < rdma->local_ram_blocks.num_blocks; x++) {
> +            RDMALocalBlock *block = &(rdma->local_ram_blocks.block[x]);
> +            int num_chunks = ram_chunk_count(block);
> +            /* allocate memory to store remote rkeys */
> +            block->remote_keys = g_malloc0(num_chunks * sizeof(uint32_t));
> +        }
> +    }
> +    rdma->control_ready_expected = 1;
> +    rdma->num_signaled_send = 0;
> +    return 0;
> +
> +err_rdma_source_connect:
> +    qemu_rdma_cleanup(rdma);
> +    return -1;
> +}
> +
> +static int qemu_rdma_dest_init(RDMAContext *rdma)
> +{
> +    int ret = -EINVAL, idx;
> +    struct sockaddr_in sin;
> +    struct rdma_cm_id *listen_id;
> +    char ip[40] = "unknown";
> +
> +    for (idx = 0; idx < RDMA_CONTROL_MAX_WR; idx++) {
> +        rdma->wr_data[idx].control_len = 0;
> +        rdma->wr_data[idx].control_curr = NULL;
> +    }
> +
> +    if (rdma->host == NULL) {
> +        fprintf(stderr, "Error: RDMA host is not set!");
> +        rdma->error_state = -EINVAL;
> +        return -1;
> +    }
> +    /* create CM channel */
> +    rdma->channel = rdma_create_event_channel();
> +    if (!rdma->channel) {
> +        fprintf(stderr, "Error: could not create rdma event channel");
> +        rdma->error_state = -EINVAL;
> +        return -1;
> +    }
> +
> +    /* create CM id */
> +    ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
> +    if (ret) {
> +        fprintf(stderr, "Error: could not create cm_id!");
> +        goto err_dest_init_create_listen_id;
> +    }
> +
> +    memset(&sin, 0, sizeof(sin));
> +    sin.sin_family = AF_INET;
> +    sin.sin_port = htons(rdma->port);
> +
> +    if (rdma->host && strcmp("", rdma->host)) {
> +        struct hostent *dest_addr;
> +        dest_addr = gethostbyname(rdma->host);
> +        if (!dest_addr) {
> +            fprintf(stderr, "Error: migration could not gethostbyname!");
> +            ret = -EINVAL;
> +            goto err_dest_init_bind_addr;
> +        }
> +        memcpy(&sin.sin_addr.s_addr, dest_addr->h_addr,
> +                dest_addr->h_length);
> +        inet_ntop(AF_INET, dest_addr->h_addr, ip, sizeof ip);
> +    } else {
> +        sin.sin_addr.s_addr = INADDR_ANY;
> +    }
> +
> +    DPRINTF("%s => %s\n", rdma->host, ip);
> +
> +    ret = rdma_bind_addr(listen_id, (struct sockaddr *)&sin);
> +    if (ret) {
> +        fprintf(stderr, "Error: could not rdma_bind_addr!");
> +        goto err_dest_init_bind_addr;
> +    }
> +
> +    rdma->listen_id = listen_id;
> +    if (listen_id->verbs) {
> +        rdma->verbs = listen_id->verbs;
> +    }
> +    qemu_rdma_dump_id("dest_init", rdma->verbs);
> +    qemu_rdma_dump_gid("dest_init", listen_id);
> +    return 0;
> +
> +err_dest_init_bind_addr:
> +    rdma_destroy_id(listen_id);
> +err_dest_init_create_listen_id:
> +    rdma_destroy_event_channel(rdma->channel);
> +    rdma->channel = NULL;
> +    rdma->error_state = ret;
> +    return ret;
> +
> +}
> +
> +static int qemu_rdma_dest_prepare(RDMAContext *rdma)
> +{
> +    int ret;
> +    int idx;
> +
> +    if (!rdma->verbs) {
> +        fprintf(stderr, "rdma migration: no verbs context!");
> +        return 0;
> +    }
> +
> +    ret = qemu_rdma_alloc_pd_cq(rdma);
> +    if (ret) {
> +        fprintf(stderr, "rdma migration: error allocating pd and cq!");
> +        goto err_rdma_dest_prepare;
> +    }
> +
> +    ret = qemu_rdma_init_ram_blocks(&rdma->local_ram_blocks);
> +    if (ret) {
> +        fprintf(stderr, "rdma migration: error initializing ram blocks!");
> +        goto err_rdma_dest_prepare;
> +    }
> +
> +    qemu_rdma_remote_ram_blocks_init(rdma);
> +
> +    /* Extra one for the send buffer */
> +    for (idx = 0; idx < (RDMA_CONTROL_MAX_WR + 1); idx++) {
> +        ret = qemu_rdma_reg_control(rdma, idx);
> +        if (ret) {
> +            fprintf(stderr, "rdma migration: error registering %d control!",
> +                        idx);
> +            goto err_rdma_dest_prepare;
> +        }
> +    }
> +
> +    ret = rdma_listen(rdma->listen_id, 5);
> +    if (ret) {
> +        fprintf(stderr, "rdma migration: error listening on socket!");
> +        goto err_rdma_dest_prepare;
> +    }
> +
> +    return 0;
> +
> +err_rdma_dest_prepare:
> +    qemu_rdma_cleanup(rdma);
> +    return -1;
> +}
> +
> +static void *qemu_rdma_data_init(const char *host_port)
> +{
> +    RDMAContext *rdma = NULL;
> +    InetSocketAddress *addr;
> +
> +    if (host_port) {
> +        rdma = g_malloc0(sizeof(RDMAContext));
> +        memset(rdma, 0, sizeof(RDMAContext));
> +        rdma->current_index = -1;
> +        rdma->current_chunk = -1;
> +
> +        addr = inet_parse(host_port, NULL);
> +        if (addr != NULL) {
> +            rdma->port = atoi(addr->port);
> +            rdma->host = g_strdup(addr->host);
> +        } else {
> +            fprintf(stderr, "bad RDMA migration address '%s'", host_port);
> +            g_free(rdma);
> +            return NULL;
> +        }
> +    }
> +
> +    return rdma;
> +}
> +
> +/*
> + * QEMUFile interface to the control channel.
> + * SEND messages for control only.
> + * pc.ram is handled with regular RDMA messages.
> + */
> +static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
> +                                int64_t pos, int size)
> +{
> +    QEMUFileRDMA *r = opaque;
> +    QEMUFile *f = r->file;
> +    RDMAContext *rdma = r->rdma;
> +    size_t remaining = size;
> +    uint8_t * data = (void *) buf;
> +    int ret;
> +
> +    CHECK_ERROR_STATE();
> +
> +    /*
> +     * Push out any writes that
> +     * we're queued up for pc.ram.
> +     */
> +    if (qemu_rdma_write_flush(f, rdma) < 0) {
> +        rdma->error_state = -EIO;
> +        return rdma->error_state;
> +    }
> +
> +    while (remaining) {
> +        RDMAControlHeader head;
> +
> +        r->len = MIN(remaining, RDMA_SEND_INCREMENT);
> +        remaining -= r->len;
> +
> +        head.len = r->len;
> +        head.type = RDMA_CONTROL_QEMU_FILE;
> +
> +        ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL);
> +
> +        if (ret < 0) {
> +            rdma->error_state = ret;
> +            return ret;
> +        }
> +
> +        data += r->len;
> +    }
> +
> +    return size;
> +}
> +
> +static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
> +                             int size, int idx)
> +{
> +    size_t len = 0;
> +
> +    if (rdma->wr_data[idx].control_len) {
> +        DPRINTF("RDMA %" PRId64 " of %d bytes already in buffer\n",
> +                    rdma->wr_data[idx].control_len, size);
> +
> +        len = MIN(size, rdma->wr_data[idx].control_len);
> +        memcpy(buf, rdma->wr_data[idx].control_curr, len);
> +        rdma->wr_data[idx].control_curr += len;
> +        rdma->wr_data[idx].control_len -= len;
> +    }
> +
> +    return len;
> +}
> +
> +/*
> + * QEMUFile interface to the control channel.
> + * RDMA links don't use bytestreams, so we have to
> + * return bytes to QEMUFile opportunistically.
> + */
> +static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
> +                                int64_t pos, int size)
> +{
> +    QEMUFileRDMA *r = opaque;
> +    RDMAContext *rdma = r->rdma;
> +    RDMAControlHeader head;
> +    int ret = 0;
> +
> +    CHECK_ERROR_STATE();
> +
> +    /*
> +     * First, we hold on to the last SEND message we
> +     * were given and dish out the bytes until we run
> +     * out of bytes.
> +     */
> +    r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
> +    if (r->len) {
> +        return r->len;
> +    }
> +
> +    /*
> +     * Once we run out, we block and wait for another
> +     * SEND message to arrive.
> +     */
> +    ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
> +
> +    if (ret < 0) {
> +        rdma->error_state = ret;
> +        return ret;
> +    }
> +
> +    /*
> +     * SEND was received with new bytes, now try again.
> +     */
> +    return qemu_rdma_fill(r->rdma, buf, size, 0);
> +}
> +
> +/*
> + * Block until all the outstanding chunks have been delivered by the hardware.
> + */
> +static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
> +{
> +    int ret;
> +
> +    if (qemu_rdma_write_flush(f, rdma) < 0) {
> +        return -EIO;
> +    }
> +
> +    while (rdma->num_signaled_send) {
> +        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
> +        if (ret < 0) {
> +            fprintf(stderr, "rdma migration: complete polling error!\n");
> +            return -EIO;
> +        }
> +    }
> +
> +    return 0;
> +}
> +
> +static int qemu_rdma_close(void *opaque)
> +{
> +    QEMUFileRDMA *r = opaque;
> +    if (r->rdma) {
> +        qemu_rdma_cleanup(r->rdma);
> +        g_free(r->rdma);
> +    }
> +    g_free(r);
> +    return 0;
> +}
> +
> +static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
> +                   ram_addr_t block_offset, ram_addr_t offset, size_t size)
> +{
> +    ram_addr_t current_addr = block_offset + offset;
> +    QEMUFileRDMA *rfile = opaque;
> +    RDMAContext *rdma = rfile->rdma;
> +    int ret;
> +
> +    CHECK_ERROR_STATE();
> +
> +    qemu_fflush(f);
> +
> +    /*
> +     * Add this page to the current 'chunk'. If the chunk
> +     * is full, or the page doen't belong to the current chunk,
> +     * an actual RDMA write will occur and a new chunk will be formed.
> +     */
> +    ret = qemu_rdma_write(f, rdma, current_addr, size);
> +    if (ret < 0) {
> +        rdma->error_state = ret;
> +        fprintf(stderr, "rdma migration: write error! %d\n", ret);
> +        return ret;
> +    }
> +
> +    /*
> +     * Drain the Completion Queue if possible, but do not block,
> +     * just poll.
> +     *
> +     * If nothing to poll, the end of the iteration will do this
> +     * again to make sure we don't overflow the request queue.
> +     */
> +    while (1) {
> +        int ret = qemu_rdma_poll(rdma);
> +        if (ret == RDMA_WRID_NONE) {
> +            break;
> +        }
> +        if (ret < 0) {
> +            rdma->error_state = ret;
> +            fprintf(stderr, "rdma migration: polling error! %d\n", ret);
> +            return ret;
> +        }
> +    }
> +
> +    return size;
> +}
> +
> +static int qemu_rdma_accept(RDMAContext *rdma)
> +{
> +    RDMAControlHeader head = { .len = rdma->local_ram_blocks.num_blocks *
> +                                        sizeof(RDMARemoteBlock),
> +                               .type = RDMA_CONTROL_RAM_BLOCKS,
> +                               .repeat = 1,
> +                             };
> +    RDMACapabilities cap;
> +    struct rdma_conn_param conn_param = {
> +                                            .responder_resources = 2,
> +                                            .private_data = &cap,
> +                                            .private_data_len = sizeof(cap),
> +                                         };
> +    struct rdma_cm_event *cm_event;
> +    struct ibv_context *verbs;
> +    int ret = -EINVAL;
> +
> +    ret = rdma_get_cm_event(rdma->channel, &cm_event);
> +    if (ret) {
> +        goto err_rdma_dest_wait;
> +    }
> +
> +    if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
> +        rdma_ack_cm_event(cm_event);
> +        goto err_rdma_dest_wait;
> +    }
> +
> +    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
> +
> +    network_to_caps(&cap);
> +
> +    if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
> +            fprintf(stderr, "Unknown source RDMA version: %d, bailing...\n",
> +                            cap.version);
> +            rdma_ack_cm_event(cm_event);
> +            goto err_rdma_dest_wait;
> +    }
> +
> +    /*
> +     * Response with only the capabilities this version of QEMU knows about.
> +     */
> +    cap.flags &= known_capabilities;
> +
> +    /*
> +     * Enable the ones that we do know about.
> +     * Add other checks here as new ones are introduced.
> +     */
> +    if (cap.flags & RDMA_CAPABILITY_CHUNK_REGISTER) {
> +        rdma->chunk_register_destination = true;
> +    }
> +
> +    rdma->cm_id = cm_event->id;
> +    verbs = cm_event->id->verbs;
> +
> +    rdma_ack_cm_event(cm_event);
> +
> +    DPRINTF("Chunk registration %s\n",
> +        rdma->chunk_register_destination ? "enabled" : "disabled");
> +
> +    caps_to_network(&cap);
> +
> +    DPRINTF("verbs context after listen: %p\n", verbs);
> +
> +    if (!rdma->verbs) {
> +        rdma->verbs = verbs;
> +        ret = qemu_rdma_dest_prepare(rdma);
> +        if (ret) {
> +            fprintf(stderr, "rdma migration: error preparing dest!\n");
> +            goto err_rdma_dest_wait;
> +        }
> +    } else if (rdma->verbs != verbs) {
> +            fprintf(stderr, "ibv context not matching %p, %p!\n",
> +                    rdma->verbs, verbs);
> +            goto err_rdma_dest_wait;
> +    }
> +
> +    qemu_set_fd_handler2(rdma->channel->fd, NULL, NULL, NULL, NULL);
> +
> +    ret = qemu_rdma_alloc_qp(rdma);
> +    if (ret) {
> +        fprintf(stderr, "rdma migration: error allocating qp!\n");
> +        goto err_rdma_dest_wait;
> +    }
> +
> +    ret = rdma_accept(rdma->cm_id, &conn_param);
> +    if (ret) {
> +        fprintf(stderr, "rdma_accept returns %d!\n", ret);
> +        goto err_rdma_dest_wait;
> +    }
> +
> +    ret = rdma_get_cm_event(rdma->channel, &cm_event);
> +    if (ret) {
> +        fprintf(stderr, "rdma_accept get_cm_event failed %d!\n", ret);
> +        goto err_rdma_dest_wait;
> +    }
> +
> +    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
> +        fprintf(stderr, "rdma_accept not event established!\n");
> +        rdma_ack_cm_event(cm_event);
> +        goto err_rdma_dest_wait;
> +    }
> +
> +    rdma_ack_cm_event(cm_event);
> +
> +    ret = qemu_rdma_post_recv_control(rdma, 0);
> +    if (ret) {
> +        fprintf(stderr, "rdma migration: error posting second control recv!\n");
> +        goto err_rdma_dest_wait;
> +    }
> +
> +    if (!rdma->chunk_register_destination) {
> +        ret = qemu_rdma_reg_whole_ram_blocks(rdma, &rdma->local_ram_blocks);
> +        if (ret) {
> +            fprintf(stderr, "rdma migration: error dest "
> +                            "registering ram blocks!\n");
> +            goto err_rdma_dest_wait;
> +        }
> +    }
> +
> +    qemu_rdma_copy_to_remote_ram_blocks(rdma,
> +            &rdma->local_ram_blocks, &rdma->remote_ram_blocks);
> +
> +    ret = qemu_rdma_post_send_control(rdma,
> +            (uint8_t *) rdma->remote_ram_blocks.remote_area, &head);
> +
> +    if (ret < 0) {
> +        fprintf(stderr, "rdma migration: error sending remote info!\n");
> +        goto err_rdma_dest_wait;
> +    }
> +
> +    qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
> +
> +    return 0;
> +
> +err_rdma_dest_wait:
> +    rdma->error_state = ret;
> +    qemu_rdma_cleanup(rdma);
> +    return ret;
> +}
> +                                       
> +/*
> + * During each iteration of the migration, we listen for instructions
> + * by the primary VM to perform dynamic page registrations before they
> + * can perform RDMA operations.
> + *
> + * We respond with the 'rkey'.
> + *
> + * Keep doing this until the primary tells us to stop.
> + */
> +static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
> +                                         uint64_t flags)
> +{
> +    RDMAControlHeader resp = { .len = sizeof(RDMARegisterResult),
> +                               .type = RDMA_CONTROL_REGISTER_RESULT,
> +                               .repeat = 0,
> +                             };
> +    QEMUFileRDMA *rfile = opaque;
> +    RDMAContext *rdma = rfile->rdma;
> +    RDMAControlHeader head;
> +    RDMARegister *reg, *registers;
> +    RDMACompress *comp;
> +    RDMARegisterResult *reg_result;
> +    static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
> +    RDMALocalBlock *block;
> +    void *host_addr;
> +    int ret = 0;
> +    int idx = 0;
> +    int count = 0;
> +
> +    CHECK_ERROR_STATE();
> +
> +    do {
> +        DPRINTF("Waiting for next registration %d...\n", flags);
> +
> +        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
> +
> +        if (ret < 0) {
> +            break;
> +        }
> +
> +        if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
> +            fprintf(stderr, "Too many requests in this message (%d). Bailing.\n",
> +                head.repeat);
> +            ret = -EIO;
> +            break;
> +        }
> +
> +        switch (head.type) {
> +        case RDMA_CONTROL_COMPRESS:
> +            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
> +
> +            DPRINTF("Zapping zero chunk: %" PRId64
> +                " bytes, index %d, offset %" PRId64 "\n",
> +                comp->length, comp->block_idx, comp->offset);
> +            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
> +            block = &(rdma->local_ram_blocks.block[comp->block_idx]);
> +
> +            host_addr = block->local_host_addr +
> +                            (comp->offset - block->offset);
> +
> +            ram_handle_compressed(host_addr, comp->value, comp->length);
> +            break;
> +        case RDMA_CONTROL_REGISTER_FINISHED:
> +            DPRINTF("Current registrations complete.\n");
> +            goto out;
> +        case RDMA_CONTROL_REGISTER_REQUEST:
> +            DPRINTF("There are %d registration requests\n", head.repeat);
> +
> +            resp.repeat = head.repeat;
> +            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
> +
> +            for (count = 0; count < head.repeat; count++) {
> +                reg = &registers[count];
> +                reg_result = &results[count];
> +
> +                DPRINTF("Registration request (%d): %d"
> +                    " bytes, index %d, offset %" PRId64 "\n",
> +                    count, reg->len, reg->current_index, reg->offset);
> +
> +                block = &(rdma->local_ram_blocks.block[reg->current_index]);
> +                host_addr = (block->local_host_addr +
> +                            (reg->offset - block->offset));
> +                if (qemu_rdma_register_and_get_keys(rdma, block,
> +                            (uint8_t *)host_addr, NULL, &reg_result->rkey)) {
> +                    fprintf(stderr, "cannot get rkey!\n");
> +                    ret = -EINVAL;
> +                    goto out;
> +                }
> +
> +                DPRINTF("Registered rkey for this request: %x\n",
> +                                reg_result->rkey);
> +            }
> +
> +            ret = qemu_rdma_post_send_control(rdma,
> +                            (uint8_t *) results, &resp);
> +
> +            if (ret < 0) {
> +                fprintf(stderr, "Failed to send control buffer!\n");
> +                goto out;
> +            }
> +            break;
> +        case RDMA_CONTROL_REGISTER_RESULT:
> +            fprintf(stderr, "Invalid RESULT message at dest.\n");
> +            ret = -EIO;
> +            goto out;
> +        default:
> +            fprintf(stderr, "Unknown control message %s\n",
> +                                control_desc[head.type]);
> +            ret = -EIO;
> +            goto out;
> +        }
> +    } while (1);
> +out:
> +    if(ret < 0) {
> +        rdma->error_state = ret;
> +    }
> +    return ret;
> +}
> +
> +static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
> +                                        uint64_t flags)
> +{
> +    QEMUFileRDMA *rfile = opaque;
> +    RDMAContext *rdma = rfile->rdma;
> +
> +    CHECK_ERROR_STATE();
> +
> +    DPRINTF("start section: %" PRIu64 "\n", flags);
> +    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
> +    qemu_fflush(f);
> +    return 0;
> +}
> +
> +/*
> + * Inform dest that dynamic registrations are done for now.
> + * First, flush writes, if any.
> + */
> +static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
> +                                       uint64_t flags)
> +{
> +    QEMUFileRDMA *rfile = opaque;
> +    RDMAContext *rdma = rfile->rdma;
> +    RDMAControlHeader head = { .len = 0,
> +                               .type = RDMA_CONTROL_REGISTER_FINISHED,
> +                               .repeat = 1,
> +                             };
> +
> +    CHECK_ERROR_STATE();
> +
> +    qemu_fflush(f);
> +    int ret = qemu_rdma_drain_cq(f, rdma);
> +
> +    if (ret >= 0) {
> +        DPRINTF("Sending registration finish %" PRIu64 "...\n", flags);
> +
> +        ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL);
> +    }
> +
> +    if (ret < 0) {
> +        rdma->error_state = ret;
> +    }
> +
> +    return ret;
> +}
> +
> +static int qemu_rdma_get_fd(void *opaque) {
> +    QEMUFileRDMA *rfile = opaque;
> +    RDMAContext *rdma = rfile->rdma;
> +
> +    return rdma->comp_channel->fd;
> +}
> +
> +const QEMUFileOps rdma_read_ops = {
> +    .get_buffer    = qemu_rdma_get_buffer,
> +    .get_fd        = qemu_rdma_get_fd,
> +    .close         = qemu_rdma_close,
> +    .hook_ram_load = qemu_rdma_registration_handle,
> +};
> +
> +const QEMUFileOps rdma_write_ops = {
> +    .put_buffer           = qemu_rdma_put_buffer,
> +    .close                = qemu_rdma_close,
> +    .before_ram_iterate   = qemu_rdma_registration_start,
> +    .after_ram_iterate    = qemu_rdma_registration_stop,
> +    .save_page            = qemu_rdma_save_page,
> +};
> +
> +static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
> +{
> +    QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA));
> +
> +    if (qemu_file_mode_is_not_valid(mode)) {
> +        return NULL;
> +    }
> +
> +    r->rdma = rdma;
> +
> +    if (mode[0] == 'w') {
> +        r->file = qemu_fopen_ops(r, &rdma_write_ops);
> +    } else {
> +        r->file = qemu_fopen_ops(r, &rdma_read_ops);
> +    }
> +
> +    return r->file;
> +}
> +
> +static void rdma_accept_incoming_migration(void *opaque)
> +{
> +    RDMAContext *rdma = opaque;
> +    int ret;
> +    QEMUFile *f;
> +
> +    DPRINTF("Accepting rdma connection...\n");
> +    ret = qemu_rdma_accept(rdma);
> +
> +    if (ret) {
> +        fprintf(stderr, "RDMA Migration initialization failed!\n");
> +        goto err;
> +    }
> +
> +    DPRINTF("Accepted migration\n");
> +
> +    f = qemu_fopen_rdma(rdma, "rb");
> +    if (f == NULL) {
> +        fprintf(stderr, "could not qemu_fopen_rdma!\n");
> +        goto err;
> +    }
> +
> +    rdma->migration_started_on_destination = 1;
> +    process_incoming_migration(f);
> +    return;
> +
> +err:
> +    qemu_rdma_cleanup(rdma);
> +}
> +
> +void rdma_start_incoming_migration(const char *host_port, Error **errp)
> +{
> +    int ret;
> +    RDMAContext *rdma;
> +
> +    DPRINTF("Starting RDMA-based incoming migration\n");
> +    rdma = qemu_rdma_data_init(host_port);
> +    if (rdma == NULL) {
> +        goto err;
> +    }
> +
> +    ret = qemu_rdma_dest_init(rdma);
> +
> +    if (ret) {
> +        goto err;
> +    }
> +
> +    DPRINTF("qemu_rdma_dest_init success\n");
> +    ret = qemu_rdma_dest_prepare(rdma);
> +
> +    if (ret) {
> +        goto err;
> +    }
> +
> +    DPRINTF("qemu_rdma_dest_prepare success\n");
> +
> +    qemu_set_fd_handler2(rdma->channel->fd, NULL,
> +                         rdma_accept_incoming_migration, NULL,
> +                            (void *)(intptr_t) rdma);
> +    return;
> +err:
> +    error_setg(errp, "error connecting using rdma!\n");
> +
> +    g_free(rdma);
> +}
> +
> +void rdma_start_outgoing_migration(void *opaque,
> +                            const char *host_port, Error **errp)
> +{
> +    MigrationState *s = opaque;
> +    RDMAContext *rdma = qemu_rdma_data_init(host_port);
> +    int ret;
> +
> +    if (rdma == NULL) {
> +        goto err;
> +    }
> +
> +    ret = qemu_rdma_source_init(rdma, NULL,
> +        s->enabled_capabilities[MIGRATION_CAPABILITY_X_CHUNK_REGISTER_DESTINATION]);
> +
> +    if (ret) {
> +        goto err;
> +    }
> +
> +    DPRINTF("qemu_rdma_source_init success\n");
> +    ret = qemu_rdma_connect(rdma, NULL);
> +
> +    if (ret) {
> +        goto err;
> +    }
> +
> +    DPRINTF("qemu_rdma_source_connect success\n");
> +
> +    s->file = qemu_fopen_rdma(rdma, "wb");
> +    migrate_fd_connect(s);
> +    return;
> +err:
> +    g_free(rdma);
> +    migrate_fd_error(s);
> +    error_setg(errp, "Error connecting using rdma! %d\n", ret);
> +}
> diff --git a/migration.c b/migration.c
> index 5afd9b8..2f33914 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -78,6 +78,10 @@ void qemu_start_incoming_migration(const char *uri, Error **errp)
>  
>      if (strstart(uri, "tcp:", &p))
>          tcp_start_incoming_migration(p, errp);
> +#ifdef CONFIG_RDMA
> +    else if (strstart(uri, "x-rdma:", &p))
> +        rdma_start_incoming_migration(p, errp);
> +#endif
>  #if !defined(WIN32)
>      else if (strstart(uri, "exec:", &p))
>          exec_start_incoming_migration(p, errp);
> @@ -406,6 +410,10 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
>  
>      if (strstart(uri, "tcp:", &p)) {
>          tcp_start_outgoing_migration(s, p, &local_err);
> +#ifdef CONFIG_RDMA
> +    } else if (strstart(uri, "x-rdma:", &p)) {
> +        rdma_start_outgoing_migration(s, p, &local_err);
> +#endif
>  #if !defined(WIN32)
>      } else if (strstart(uri, "exec:", &p)) {
>          exec_start_outgoing_migration(s, p, &local_err);
>
Michael S. Tsirkin - April 18, 2013, 1:06 p.m.
On Thu, Apr 18, 2013 at 09:59:52AM -0400, Michael R. Hines wrote:
> On 04/18/2013 03:58 AM, Michael S. Tsirkin wrote:
> >The following comment applies to all of this code:
> >
> >On Wed, Apr 17, 2013 at 07:07:17PM -0400, mrhines@linux.vnet.ibm.com wrote:
> >>diff --git a/migration-rdma.c b/migration-rdma.c
> >>new file mode 100644
> >>index 0000000..1dff06f
> >>--- /dev/null
> >>+++ b/migration-rdma.c
> >>@@ -0,0 +1,2667 @@
> >>+/*
> >>+ *  Copyright (C) 2013 Michael R. Hines <mrhines@us.ibm.com>
> >>+ *  Copyright (C) 2010 Jiuxing Liu <jl@us.ibm.com>
> >By the way do you both really hold copyrights on this code?
> >It's not assigned to IBM?
> >If the later it should say:
> >	Copyright IBM
> >	Author: Michael R. Hines
> >Or some such.
> 
> Acknowledged.
> 
> >>+ *
> >>+ *  RDMA protocol and interfaces
> >>+ *
> >>+ *  This program is free software; you can redistribute it and/or modify
> >>+ *  it under the terms of the GNU General Public License as published by
> >>+ *  the Free Software Foundation; under version 2 of the License.
> >
> >We prefer 2 or later for new code.
> 
> Doesn't it say version 2 already?

version 2 != 'version 2 or later'. See COPYING at the top level in qemu.

> >I'm guessing you need approval from Jiuxing Liu for this,
> >pls make him ack license change.
> 
> He approves. He doesn't work for IBM anymore.

Does he approve version 2 or version 2 or later?
Michael S. Tsirkin - April 18, 2013, 1:32 p.m.
On Thu, Apr 18, 2013 at 10:14:27AM -0400, Michael R. Hines wrote:
> On 04/18/2013 09:06 AM, Michael S. Tsirkin wrote:
> >On Thu, Apr 18, 2013 at 09:59:52AM -0400, Michael R. Hines wrote:
> >>On 04/18/2013 03:58 AM, Michael S. Tsirkin wrote:
> >>>The following comment applies to all of this code:
> >>>
> >>>On Wed, Apr 17, 2013 at 07:07:17PM -0400, mrhines@linux.vnet.ibm.com wrote:
> >>>>diff --git a/migration-rdma.c b/migration-rdma.c
> >>>>new file mode 100644
> >>>>index 0000000..1dff06f
> >>>>--- /dev/null
> >>>>+++ b/migration-rdma.c
> >>>>@@ -0,0 +1,2667 @@
> >>>>+/*
> >>>>+ *  Copyright (C) 2013 Michael R. Hines <mrhines@us.ibm.com>
> >>>>+ *  Copyright (C) 2010 Jiuxing Liu <jl@us.ibm.com>
> >>>By the way do you both really hold copyrights on this code?
> >>>It's not assigned to IBM?
> >>>If the later it should say:
> >>>	Copyright IBM
> >>>	Author: Michael R. Hines
> >>>Or some such.
> >>Acknowledged.
> >>
> >>>>+ *
> >>>>+ *  RDMA protocol and interfaces
> >>>>+ *
> >>>>+ *  This program is free software; you can redistribute it and/or modify
> >>>>+ *  it under the terms of the GNU General Public License as published by
> >>>>+ *  the Free Software Foundation; under version 2 of the License.
> >>>We prefer 2 or later for new code.
> >>Doesn't it say version 2 already?
> >version 2 != 'version 2 or later'. See COPYING at the top level in qemu.
> 
> I'll duplicate the copyright used in Anthony's migration.c and
> tailor it to match.

Sigh.  Please don't copy migration.c, that includes a legacy header.
If you want to make things 2 or later, just take the
template from COPYING. There is a section 'How to Apply These Terms to
Your New Programs' there.  It tells you exactly what to put in the code.
It's 2013 so if you want to be hip you can replace the FSF snail mail
address with " if not, see http://www.gnu.org/licenses/.  "
which is a bit shorter.


> >>>I'm guessing you need approval from Jiuxing Liu for this,
> >>>pls make him ack license change.
> >>He approves. He doesn't work for IBM anymore.
> >Does he approve version 2 or version 2 or later?
> >
> 
> It doesn't matter - his contributions were made at IBM and he
> doesn't work for IBM anymore.

It's considered polite not to ignore author's wishes wrt licensing.
It's best to contact and get the ack if possible.  If not, please tell
the list and we'll consider the options.
Michael S. Tsirkin - April 18, 2013, 1:52 p.m.
On Thu, Apr 18, 2013 at 10:45:19AM -0400, Michael R. Hines wrote:
> On 04/18/2013 09:32 AM, Michael S. Tsirkin wrote:
> >On Thu, Apr 18, 2013 at 10:14:27AM -0400, Michael R. Hines wrote:
> >>On 04/18/2013 09:06 AM, Michael S. Tsirkin wrote:
> >>>On Thu, Apr 18, 2013 at 09:59:52AM -0400, Michael R. Hines wrote:
> >>>>On 04/18/2013 03:58 AM, Michael S. Tsirkin wrote:
> >>>>>The following comment applies to all of this code:
> >>>>>
> >>>>>On Wed, Apr 17, 2013 at 07:07:17PM -0400, mrhines@linux.vnet.ibm.com wrote:
> >>>>>>diff --git a/migration-rdma.c b/migration-rdma.c
> >>>>>>new file mode 100644
> >>>>>>index 0000000..1dff06f
> >>>>>>--- /dev/null
> >>>>>>+++ b/migration-rdma.c
> >>>>>>@@ -0,0 +1,2667 @@
> >>>>>>+/*
> >>>>>>+ *  Copyright (C) 2013 Michael R. Hines <mrhines@us.ibm.com>
> >>>>>>+ *  Copyright (C) 2010 Jiuxing Liu <jl@us.ibm.com>
> >>>>>By the way do you both really hold copyrights on this code?
> >>>>>It's not assigned to IBM?
> >>>>>If the later it should say:
> >>>>>	Copyright IBM
> >>>>>	Author: Michael R. Hines
> >>>>>Or some such.
> >>>>Acknowledged.
> >>>>
> >>>>>>+ *
> >>>>>>+ *  RDMA protocol and interfaces
> >>>>>>+ *
> >>>>>>+ *  This program is free software; you can redistribute it and/or modify
> >>>>>>+ *  it under the terms of the GNU General Public License as published by
> >>>>>>+ *  the Free Software Foundation; under version 2 of the License.
> >>>>>We prefer 2 or later for new code.
> >>>>Doesn't it say version 2 already?
> >>>version 2 != 'version 2 or later'. See COPYING at the top level in qemu.
> >>I'll duplicate the copyright used in Anthony's migration.c and
> >>tailor it to match.
> >Sigh.  Please don't copy migration.c, that includes a legacy header.
> >If you want to make things 2 or later, just take the
> >template from COPYING. There is a section 'How to Apply These Terms to
> >Your New Programs' there.  It tells you exactly what to put in the code.
> >It's 2013 so if you want to be hip you can replace the FSF snail mail
> >address with " if not, see http://www.gnu.org/licenses/.  "
> >which is a bit shorter.
> 
> OK, I read the file, please review this version,
> and I will paste into migration-rdma.c:

Sure, that's what COPYING says. So yea, just get ack from
Jiuxing Liu.

> /*
>  *    RDMA Migration Protocol and Interfaces
>  *
>  *    Copyright (C) 2013  Michael R. Hines <mrhines@us.ibm.com>
>  *    Copyright (C) 2010  Jiuxing Liu <jl@us.ibm.com>

Except you said you will change this to Copyright IBM + Author.

>  *   This program is free software; you can redistribute it and/or modify
>  *   it under the terms of the GNU General Public License as published by
>  *   the Free Software Foundation; either version 2 of the License, or
>  *   (at your option) any later version.
> 
>  *   This program is distributed in the hope that it will be useful,
>  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
>  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
>  *   GNU General Public License for more details.
>  *
>  *
>  *   You should have received a copy of the GNU General Public
> License along
>  *   with this program; if not, write to the Free Software
> Foundation, Inc.,
>  *   51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
>  */
> 
> 
> >
> >>>>>I'm guessing you need approval from Jiuxing Liu for this,
> >>>>>pls make him ack license change.
> >>>>He approves. He doesn't work for IBM anymore.
> >>>Does he approve version 2 or version 2 or later?
> >>>
> >>It doesn't matter - his contributions were made at IBM and he
> >>doesn't work for IBM anymore.
> >It's considered polite not to ignore author's wishes wrt licensing.
> >It's best to contact and get the ack if possible.  If not, please tell
> >the list and we'll consider the options.
> >
> >
mrhines@linux.vnet.ibm.com - April 18, 2013, 1:54 p.m.
On 04/18/2013 04:44 AM, Orit Wasserman wrote:
> Hi Michael,
>
> I don't see you addressed any of the comment I had in v3
> (especially the error handling)
>
> please, fix those

I did fix them, but not all of your comments were correct,
because I was passing errp in many places were errp
did not supposed to belong there.

So, first I *removed* errp from being proliferated all over
the entire file, which was not necessary.

Then, in the places where errp is required by migration.c,
I added new uses of errp.

There are some places, as Paolo mentioned where errp was
written twice, which I must fix, however.
mrhines@linux.vnet.ibm.com - April 18, 2013, 1:57 p.m.
On 04/18/2013 03:55 AM, Paolo Bonzini wrote:
> The reason is that when using a management layer stderr will be logged
> but not printed to the operator's console.  Instead, Errors are sent on
> the monitor connection.  The management layer(s) then pick up the error
> and propagate it through the various APIs until they appear in the
> terminal/browser/whatever.

The incoming migration is done through "qemu_set_fd_handler2",
which does not pass an error pointer, after 
"rdma_accept_incoming_migration()".

How exactly would you propagate destination errors through the
monitor without an error pointer?
mrhines@linux.vnet.ibm.com - April 18, 2013, 1:59 p.m.
On 04/18/2013 03:58 AM, Michael S. Tsirkin wrote:
> The following comment applies to all of this code:
>
> On Wed, Apr 17, 2013 at 07:07:17PM -0400, mrhines@linux.vnet.ibm.com wrote:
>> diff --git a/migration-rdma.c b/migration-rdma.c
>> new file mode 100644
>> index 0000000..1dff06f
>> --- /dev/null
>> +++ b/migration-rdma.c
>> @@ -0,0 +1,2667 @@
>> +/*
>> + *  Copyright (C) 2013 Michael R. Hines <mrhines@us.ibm.com>
>> + *  Copyright (C) 2010 Jiuxing Liu <jl@us.ibm.com>
> By the way do you both really hold copyrights on this code?
> It's not assigned to IBM?
> If the later it should say:
> 	Copyright IBM
> 	Author: Michael R. Hines
> Or some such.

Acknowledged.

>> + *
>> + *  RDMA protocol and interfaces
>> + *
>> + *  This program is free software; you can redistribute it and/or modify
>> + *  it under the terms of the GNU General Public License as published by
>> + *  the Free Software Foundation; under version 2 of the License.
>
> We prefer 2 or later for new code.

Doesn't it say version 2 already?

> I'm guessing you need approval from Jiuxing Liu for this,
> pls make him ack license change.

He approves. He doesn't work for IBM anymore.
mrhines@linux.vnet.ibm.com - April 18, 2013, 2:14 p.m.
On 04/18/2013 09:06 AM, Michael S. Tsirkin wrote:
> On Thu, Apr 18, 2013 at 09:59:52AM -0400, Michael R. Hines wrote:
>> On 04/18/2013 03:58 AM, Michael S. Tsirkin wrote:
>>> The following comment applies to all of this code:
>>>
>>> On Wed, Apr 17, 2013 at 07:07:17PM -0400, mrhines@linux.vnet.ibm.com wrote:
>>>> diff --git a/migration-rdma.c b/migration-rdma.c
>>>> new file mode 100644
>>>> index 0000000..1dff06f
>>>> --- /dev/null
>>>> +++ b/migration-rdma.c
>>>> @@ -0,0 +1,2667 @@
>>>> +/*
>>>> + *  Copyright (C) 2013 Michael R. Hines <mrhines@us.ibm.com>
>>>> + *  Copyright (C) 2010 Jiuxing Liu <jl@us.ibm.com>
>>> By the way do you both really hold copyrights on this code?
>>> It's not assigned to IBM?
>>> If the later it should say:
>>> 	Copyright IBM
>>> 	Author: Michael R. Hines
>>> Or some such.
>> Acknowledged.
>>
>>>> + *
>>>> + *  RDMA protocol and interfaces
>>>> + *
>>>> + *  This program is free software; you can redistribute it and/or modify
>>>> + *  it under the terms of the GNU General Public License as published by
>>>> + *  the Free Software Foundation; under version 2 of the License.
>>> We prefer 2 or later for new code.
>> Doesn't it say version 2 already?
> version 2 != 'version 2 or later'. See COPYING at the top level in qemu.

I'll duplicate the copyright used in Anthony's migration.c and tailor it 
to match.

>>> I'm guessing you need approval from Jiuxing Liu for this,
>>> pls make him ack license change.
>> He approves. He doesn't work for IBM anymore.
> Does he approve version 2 or version 2 or later?
>

It doesn't matter - his contributions were made at IBM and he doesn't 
work for IBM anymore.
mrhines@linux.vnet.ibm.com - April 18, 2013, 2:45 p.m.
On 04/18/2013 09:32 AM, Michael S. Tsirkin wrote:
> On Thu, Apr 18, 2013 at 10:14:27AM -0400, Michael R. Hines wrote:
>> On 04/18/2013 09:06 AM, Michael S. Tsirkin wrote:
>>> On Thu, Apr 18, 2013 at 09:59:52AM -0400, Michael R. Hines wrote:
>>>> On 04/18/2013 03:58 AM, Michael S. Tsirkin wrote:
>>>>> The following comment applies to all of this code:
>>>>>
>>>>> On Wed, Apr 17, 2013 at 07:07:17PM -0400, mrhines@linux.vnet.ibm.com wrote:
>>>>>> diff --git a/migration-rdma.c b/migration-rdma.c
>>>>>> new file mode 100644
>>>>>> index 0000000..1dff06f
>>>>>> --- /dev/null
>>>>>> +++ b/migration-rdma.c
>>>>>> @@ -0,0 +1,2667 @@
>>>>>> +/*
>>>>>> + *  Copyright (C) 2013 Michael R. Hines <mrhines@us.ibm.com>
>>>>>> + *  Copyright (C) 2010 Jiuxing Liu <jl@us.ibm.com>
>>>>> By the way do you both really hold copyrights on this code?
>>>>> It's not assigned to IBM?
>>>>> If the later it should say:
>>>>> 	Copyright IBM
>>>>> 	Author: Michael R. Hines
>>>>> Or some such.
>>>> Acknowledged.
>>>>
>>>>>> + *
>>>>>> + *  RDMA protocol and interfaces
>>>>>> + *
>>>>>> + *  This program is free software; you can redistribute it and/or modify
>>>>>> + *  it under the terms of the GNU General Public License as published by
>>>>>> + *  the Free Software Foundation; under version 2 of the License.
>>>>> We prefer 2 or later for new code.
>>>> Doesn't it say version 2 already?
>>> version 2 != 'version 2 or later'. See COPYING at the top level in qemu.
>> I'll duplicate the copyright used in Anthony's migration.c and
>> tailor it to match.
> Sigh.  Please don't copy migration.c, that includes a legacy header.
> If you want to make things 2 or later, just take the
> template from COPYING. There is a section 'How to Apply These Terms to
> Your New Programs' there.  It tells you exactly what to put in the code.
> It's 2013 so if you want to be hip you can replace the FSF snail mail
> address with " if not, see http://www.gnu.org/licenses/.  "
> which is a bit shorter.

OK, I read the file, please review this version,
and I will paste into migration-rdma.c:

/*
  *    RDMA Migration Protocol and Interfaces
  *
  *    Copyright (C) 2013  Michael R. Hines <mrhines@us.ibm.com>
  *    Copyright (C) 2010  Jiuxing Liu <jl@us.ibm.com>
  *
  *   This program is free software; you can redistribute it and/or modify
  *   it under the terms of the GNU General Public License as published by
  *   the Free Software Foundation; either version 2 of the License, or
  *   (at your option) any later version.

  *   This program is distributed in the hope that it will be useful,
  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  *   GNU General Public License for more details.
  *
  *
  *   You should have received a copy of the GNU General Public License 
along
  *   with this program; if not, write to the Free Software Foundation, 
Inc.,
  *   51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */


>
>>>>> I'm guessing you need approval from Jiuxing Liu for this,
>>>>> pls make him ack license change.
>>>> He approves. He doesn't work for IBM anymore.
>>> Does he approve version 2 or version 2 or later?
>>>
>> It doesn't matter - his contributions were made at IBM and he
>> doesn't work for IBM anymore.
> It's considered polite not to ignore author's wishes wrt licensing.
> It's best to contact and get the ack if possible.  If not, please tell
> the list and we'll consider the options.
>
>
Paolo Bonzini - April 18, 2013, 2:53 p.m.
> > >>>I'm guessing you need approval from Jiuxing Liu for this,
> > >>>pls make him ack license change.
> > >>He approves. He doesn't work for IBM anymore.
> > >Does he approve version 2 or version 2 or later?
> > >
> > 
> > It doesn't matter - his contributions were made at IBM and he
> > doesn't work for IBM anymore.
> 
> It's considered polite not to ignore author's wishes wrt licensing.
> It's best to contact and get the ack if possible.  If not, please tell
> the list and we'll consider the options.

There are three cases:

- relicensing to something less restrictive, employer agrees on more 
liberal license, employee doesn't.  The employer's choice wins.  There
are certainly RH employees that would prefer v2 and no later version,
but we are still releasing all RH changes as GPLv2+.

- relicensing to something less restrictive, no employer (or employer
lets the employee choose the license).  Here you *must* contact the
employee.

- relicensing to something compatible but more restrictive (e.g.
BSD -> GPL).  No legal obligation to contact the author (his contributions
would still be available on the older license, all you get by contacting
the author is that you can remove the BSD terms from the file), but it
is indeed more polite to do so.


I'm pretty sure that IBM cares about licensing, but I don't know what
their global policy is.  Surely they didn't do a full grant of their
contributions to GPLv2+ (which RH and a bunch of other people did),
but if _new_ IBM contributions are to be GPLv2+, there is no need to
contact Jiuxing Liu.  Anthony?

Paolo
Anthony Liguori - April 18, 2013, 3:14 p.m.
"Michael S. Tsirkin" <mst@redhat.com> writes:

> On Thu, Apr 18, 2013 at 10:45:19AM -0400, Michael R. Hines wrote:
>> On 04/18/2013 09:32 AM, Michael S. Tsirkin wrote:
>> >On Thu, Apr 18, 2013 at 10:14:27AM -0400, Michael R. Hines wrote:
>> >>On 04/18/2013 09:06 AM, Michael S. Tsirkin wrote:
>> >>>On Thu, Apr 18, 2013 at 09:59:52AM -0400, Michael R. Hines wrote:
>> >>>>On 04/18/2013 03:58 AM, Michael S. Tsirkin wrote:
>> >>>>>The following comment applies to all of this code:
>> >>>>>
>> >>>>>On Wed, Apr 17, 2013 at 07:07:17PM -0400, mrhines@linux.vnet.ibm.com wrote:
>> >>>>>>diff --git a/migration-rdma.c b/migration-rdma.c
>> >>>>>>new file mode 100644
>> >>>>>>index 0000000..1dff06f
>> >>>>>>--- /dev/null
>> >>>>>>+++ b/migration-rdma.c
>> >>>>>>@@ -0,0 +1,2667 @@
>> >>>>>>+/*
>> >>>>>>+ *  Copyright (C) 2013 Michael R. Hines <mrhines@us.ibm.com>
>> >>>>>>+ *  Copyright (C) 2010 Jiuxing Liu <jl@us.ibm.com>
>> >>>>>By the way do you both really hold copyrights on this code?
>> >>>>>It's not assigned to IBM?
>> >>>>>If the later it should say:
>> >>>>>	Copyright IBM
>> >>>>>	Author: Michael R. Hines
>> >>>>>Or some such.
>> >>>>Acknowledged.
>> >>>>
>> >>>>>>+ *
>> >>>>>>+ *  RDMA protocol and interfaces
>> >>>>>>+ *
>> >>>>>>+ *  This program is free software; you can redistribute it and/or modify
>> >>>>>>+ *  it under the terms of the GNU General Public License as published by
>> >>>>>>+ *  the Free Software Foundation; under version 2 of the License.
>> >>>>>We prefer 2 or later for new code.
>> >>>>Doesn't it say version 2 already?
>> >>>version 2 != 'version 2 or later'. See COPYING at the top level in qemu.
>> >>I'll duplicate the copyright used in Anthony's migration.c and
>> >>tailor it to match.
>> >Sigh.  Please don't copy migration.c, that includes a legacy header.
>> >If you want to make things 2 or later, just take the
>> >template from COPYING. There is a section 'How to Apply These Terms to
>> >Your New Programs' there.  It tells you exactly what to put in the code.
>> >It's 2013 so if you want to be hip you can replace the FSF snail mail
>> >address with " if not, see http://www.gnu.org/licenses/.  "
>> >which is a bit shorter.
>> 
>> OK, I read the file, please review this version,
>> and I will paste into migration-rdma.c:
>
> Sure, that's what COPYING says. So yea, just get ack from
> Jiuxing Liu.

Michael, please ping me on SameTime and we'll discuss how to license
this code.

FYI, for the other folks on CC, we have very clear rules at IBM about
how we do this and none of what's been posted so far follows it :-)

Regards,

Anthony Liguori
Orit Wasserman - April 18, 2013, 3:51 p.m.
On 04/18/2013 04:54 PM, Michael R. Hines wrote:
> On 04/18/2013 04:44 AM, Orit Wasserman wrote:
>> Hi Michael,
>>
>> I don't see you addressed any of the comment I had in v3
>> (especially the error handling)
>>
>> please, fix those
> 
> I did fix them, but not all of your comments were correct,
> because I was passing errp in many places were errp
> did not supposed to belong there.

If you decide not to fix some comment you just need
to reply to the comment and explain the reason,
this makes the reviewing process easier.

> 
> So, first I *removed* errp from being proliferated all over
> the entire file, which was not necessary.
> 
> Then, in the places where errp is required by migration.c,
> I added new uses of errp.

That is fine as long as external rdma API use errp and the 
internal always return some error code.

Also I noticed that the errors are very general for example
in qemu_rdma_connect we set the same error always it will be more
helpful to have different errors for each case. We need
errors that can help the user to understand what went wrong.

Orit
> 
> There are some places, as Paolo mentioned where errp was
> written twice, which I must fix, however.
> 

>
mrhines@linux.vnet.ibm.com - April 18, 2013, 7:15 p.m.
On 04/18/2013 10:53 AM, Paolo Bonzini wrote:
>>>>>> I'm guessing you need approval from Jiuxing Liu for this,
>>>>>> pls make him ack license change.
>>>>> He approves. He doesn't work for IBM anymore.
>>>> Does he approve version 2 or version 2 or later?
>>>>
>>> It doesn't matter - his contributions were made at IBM and he
>>> doesn't work for IBM anymore.
>> It's considered polite not to ignore author's wishes wrt licensing.
>> It's best to contact and get the ack if possible.  If not, please tell
>> the list and we'll consider the options.
> There are three cases:
>
> - relicensing to something less restrictive, employer agrees on more
> liberal license, employee doesn't.  The employer's choice wins.  There
> are certainly RH employees that would prefer v2 and no later version,
> but we are still releasing all RH changes as GPLv2+.
>
> - relicensing to something less restrictive, no employer (or employer
> lets the employee choose the license).  Here you *must* contact the
> employee.
>
> - relicensing to something compatible but more restrictive (e.g.
> BSD -> GPL).  No legal obligation to contact the author (his contributions
> would still be available on the older license, all you get by contacting
> the author is that you can remove the BSD terms from the file), but it
> is indeed more polite to do so.
>
>
> I'm pretty sure that IBM cares about licensing, but I don't know what
> their global policy is.  Surely they didn't do a full grant of their
> contributions to GPLv2+ (which RH and a bunch of other people did),
> but if _new_ IBM contributions are to be GPLv2+, there is no need to
> contact Jiuxing Liu.  Anthony?
>
> Paolo
>

Let me sync up with Anthony and we'll get back to everybody.

- Michael
mrhines@linux.vnet.ibm.com - April 18, 2013, 7:41 p.m.
On 04/18/2013 11:51 AM, Orit Wasserman wrote:
> On 04/18/2013 04:54 PM, Michael R. Hines wrote:
>> On 04/18/2013 04:44 AM, Orit Wasserman wrote:
>>> Hi Michael,
>>>
>>> I don't see you addressed any of the comment I had in v3
>>> (especially the error handling)
>>>
>>> please, fix those
>> I did fix them, but not all of your comments were correct,
>> because I was passing errp in many places were errp
>> did not supposed to belong there.
> If you decide not to fix some comment you just need
> to reply to the comment and explain the reason,
> this makes the reviewing process easier.
>

Ok, so here's a more specific question while I have your ears,
that is a problem I'm having with error_setg():

Currently, the migration works by calling "qemu_set_handler_fd2()",
which comes along and calls "accept_incoming_migration" on the
destination side.

The problem with this from an error-handling perspective is that
there is no global datastructure on the destination side, similar to
"MigrationState".

Thus, there is nowhere to propagate errors to, even if we want to
do so - and that's why I deleted most of the error_setg() comments
that you had because they were mostly focus on delivering an error
without having any part of the call stack to actually *process* the error.

So, without overly-complicating error propagation on the destination side,
what solution would you recommend for both TCP and RDMA?



>> So, first I *removed* errp from being proliferated all over
>> the entire file, which was not necessary.
>>
>> Then, in the places where errp is required by migration.c,
>> I added new uses of errp.
> That is fine as long as external rdma API use errp and the
> internal always return some error code.
>
> Also I noticed that the errors are very general for example
> in qemu_rdma_connect we set the same error always it will be more
> helpful to have different errors for each case. We need
> errors that can help the user to understand what went wrong.
>
> Orit
>> There are some places, as Paolo mentioned where errp was
>> written twice, which I must fix, however.
>>
Eric Blake - April 18, 2013, 10:12 p.m.
On 04/17/2013 05:07 PM, mrhines@linux.vnet.ibm.com wrote:
> From: "Michael R. Hines" <mrhines@us.ibm.com>
> 
> Code that does need to be visible is kept
> well contained inside this file and this is the only
> new additional file to the entire patch - good
> progress.
> 
> This file includes the entire protocol and interfaces
> required to perform RDMA migration.
> 
> Also, the configure and Makefile modifications to link
> this file are included.
> 
> Full documentation is in docs/rdma.txt

Which doesn't appear until later in the series?  If a git bisect lands
on this patch, I have to go out of my way to find the later commit that
adds the docs.

I personally like series that put the docs FIRST.  On initial review,
that gives the docs a chance for a clean-room review untainted by the
implementation choices; and leaves the docs fresh in reviewers' minds
during the rest of the series to validate that the implementation
matches docs.  On later review (such as git bisect landing here), it
means the docs are in-tree for any other commit that references them.
If it were me, I'd rebase things to put docs in patch 1 on the v5 series.
mrhines@linux.vnet.ibm.com - April 19, 2013, 12:35 a.m.
On 04/18/2013 06:12 PM, Eric Blake wrote:
> On 04/17/2013 05:07 PM, mrhines@linux.vnet.ibm.com wrote:
>> From: "Michael R. Hines" <mrhines@us.ibm.com>
>>
>> Code that does need to be visible is kept
>> well contained inside this file and this is the only
>> new additional file to the entire patch - good
>> progress.
>>
>> This file includes the entire protocol and interfaces
>> required to perform RDMA migration.
>>
>> Also, the configure and Makefile modifications to link
>> this file are included.
>>
>> Full documentation is in docs/rdma.txt
> Which doesn't appear until later in the series?  If a git bisect lands
> on this patch, I have to go out of my way to find the later commit that
> adds the docs.
>
> I personally like series that put the docs FIRST.  On initial review,
> that gives the docs a chance for a clean-room review untainted by the
> implementation choices; and leaves the docs fresh in reviewers' minds
> during the rest of the series to validate that the implementation
> matches docs.  On later review (such as git bisect landing here), it
> means the docs are in-tree for any other commit that references them.
> If it were me, I'd rebase things to put docs in patch 1 on the v5 series.
>

No problem. Will do.
Anthony Liguori - April 19, 2013, 12:35 a.m.
Paolo Bonzini <pbonzini@redhat.com> writes:

>> > >>>I'm guessing you need approval from Jiuxing Liu for this,
>> > >>>pls make him ack license change.
>> > >>He approves. He doesn't work for IBM anymore.
>> > >Does he approve version 2 or version 2 or later?
>> > >
>> > 
>> > It doesn't matter - his contributions were made at IBM and he
>> > doesn't work for IBM anymore.
>> 
>> It's considered polite not to ignore author's wishes wrt licensing.
>> It's best to contact and get the ack if possible.  If not, please tell
>> the list and we'll consider the options.
>
> There are three cases:
>
> - relicensing to something less restrictive, employer agrees on more 
> liberal license, employee doesn't.  The employer's choice wins.  There
> are certainly RH employees that would prefer v2 and no later version,
> but we are still releasing all RH changes as GPLv2+.
>
> - relicensing to something less restrictive, no employer (or employer
> lets the employee choose the license).  Here you *must* contact the
> employee.
>
> - relicensing to something compatible but more restrictive (e.g.
> BSD -> GPL).  No legal obligation to contact the author (his contributions
> would still be available on the older license, all you get by contacting
> the author is that you can remove the BSD terms from the file), but it
> is indeed more polite to do so.
>
>
> I'm pretty sure that IBM cares about licensing, but I don't know what
> their global policy is.  Surely they didn't do a full grant of their
> contributions to GPLv2+ (which RH and a bunch of other people did),
> but if _new_ IBM contributions are to be GPLv2+, there is no need to
> contact Jiuxing Liu.  Anthony?

Guys, we'll fixup the copyright in the next round of patches.  I can't
talk publicly about our internal policies around copyright assignment
and licensing or else I would.

Just ignore the copyrights in this series as they are bogus for all
intents and purposes.

Regards,

Anthony Liguori

>
> Paolo

Patch

diff --git a/Makefile.objs b/Makefile.objs
index a473348..d744827 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -49,6 +49,7 @@  common-obj-$(CONFIG_POSIX) += os-posix.o
 common-obj-$(CONFIG_LINUX) += fsdev/
 
 common-obj-y += migration.o migration-tcp.o
+common-obj-$(CONFIG_RDMA) += migration-rdma.o
 common-obj-y += qemu-char.o #aio.o
 common-obj-y += block-migration.o
 common-obj-y += page_cache.o xbzrle.o
diff --git a/configure b/configure
index 4c4f6f6..9decae2 100755
--- a/configure
+++ b/configure
@@ -180,6 +180,7 @@  xfs=""
 
 vhost_net="no"
 kvm="no"
+rdma="yes"
 gprof="no"
 debug_tcg="no"
 debug="no"
@@ -925,6 +926,10 @@  for opt do
   ;;
   --enable-gtk) gtk="yes"
   ;;
+  --enable-rdma) rdma="yes"
+  ;;
+  --disable-rdma) rdma="no"
+  ;;
   --with-gtkabi=*) gtkabi="$optarg"
   ;;
   --enable-tpm) tpm="yes"
@@ -1133,6 +1138,8 @@  echo "  --enable-bluez           enable bluez stack connectivity"
 echo "  --disable-slirp          disable SLIRP userspace network connectivity"
 echo "  --disable-kvm            disable KVM acceleration support"
 echo "  --enable-kvm             enable KVM acceleration support"
+echo "  --disable-rdma           disable RDMA-based migration support"
+echo "  --enable-rdma            enable RDMA-based migration support"
 echo "  --enable-tcg-interpreter enable TCG with bytecode interpreter (TCI)"
 echo "  --disable-nptl           disable usermode NPTL support"
 echo "  --enable-nptl            enable usermode NPTL support"
@@ -1782,6 +1789,23 @@  EOF
   libs_softmmu="$sdl_libs $libs_softmmu"
 fi
 
+if test "$rdma" != "no" ; then
+  cat > $TMPC <<EOF
+#include <rdma/rdma_cma.h>
+int main(void) { return 0; }
+EOF
+  rdma_libs="-lrdmacm -libverbs"
+  if compile_prog "-Werror" "$rdma_libs" ; then
+    rdma="yes"
+    libs_softmmu="$libs_softmmu $rdma_libs"
+  else
+    if test "$rdma" = "yes" ; then
+      feature_not_found "rdma"
+    fi
+    rdma="no"
+  fi
+fi
+
 ##########################################
 # VNC TLS/WS detection
 if test "$vnc" = "yes" -a \( "$vnc_tls" != "no" -o "$vnc_ws" != "no" \) ; then
@@ -3524,6 +3548,7 @@  echo "Linux AIO support $linux_aio"
 echo "ATTR/XATTR support $attr"
 echo "Install blobs     $blobs"
 echo "KVM support       $kvm"
+echo "RDMA support      $rdma"
 echo "TCG interpreter   $tcg_interpreter"
 echo "fdt support       $fdt"
 echo "preadv support    $preadv"
@@ -4510,6 +4535,10 @@  if [ "$pixman" = "internal" ]; then
   echo "config-host.h: subdir-pixman" >> $config_host_mak
 fi
 
+if test "$rdma" = "yes" ; then
+echo "CONFIG_RDMA=y" >> $config_host_mak
+fi
+
 # build tree in object directory in case the source is not in the current directory
 DIRS="tests tests/tcg tests/tcg/cris tests/tcg/lm32"
 DIRS="$DIRS pc-bios/optionrom pc-bios/spapr-rtas"
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 8e02391..720e0a5 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -76,6 +76,10 @@  void fd_start_incoming_migration(const char *path, Error **errp);
 
 void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **errp);
 
+void rdma_start_outgoing_migration(void *opaque, const char *host_port, Error **errp);
+
+void rdma_start_incoming_migration(const char *host_port, Error **errp);
+
 void migrate_fd_error(MigrationState *s);
 
 void migrate_fd_connect(MigrationState *s);
diff --git a/migration-rdma.c b/migration-rdma.c
new file mode 100644
index 0000000..1dff06f
--- /dev/null
+++ b/migration-rdma.c
@@ -0,0 +1,2667 @@ 
+/*
+ *  Copyright (C) 2013 Michael R. Hines <mrhines@us.ibm.com>
+ *  Copyright (C) 2010 Jiuxing Liu <jl@us.ibm.com>
+ *
+ *  RDMA protocol and interfaces
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; under version 2 of the License.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, see <http://www.gnu.org/licenses/>.
+ */
+#include "qemu-common.h"
+#include "migration/migration.h"
+#include "migration/qemu-file.h"
+#include "exec/cpu-common.h"
+#include "qemu/main-loop.h"
+#include "qemu/sockets.h"
+#include "block/coroutine.h"
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <string.h>
+#include <poll.h>
+#include <rdma/rdma_cma.h>
+
+//#define DEBUG_RDMA
+//#define DEBUG_RDMA_VERBOSE
+
+#ifdef DEBUG_RDMA
+#define DPRINTF(fmt, ...) \
+    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+#else
+#define DPRINTF(fmt, ...) \
+    do { } while (0)
+#endif
+
+#ifdef DEBUG_RDMA_VERBOSE
+#define DDPRINTF(fmt, ...) \
+    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+#else
+#define DDPRINTF(fmt, ...) \
+    do { } while (0)
+#endif
+
+#define RDMA_RESOLVE_TIMEOUT_MS 10000
+
+/* Do not merge data if larger than this. */
+#define RDMA_MERGE_MAX (4 * 1024 * 1024)
+#define RDMA_UNSIGNALED_SEND_MAX 64
+
+#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
+
+/*
+ * Debugging only. Hard-coded only
+ */
+//#define RDMA_REG_CHUNK_SHIFT 21 /* 2 MB */
+//#define RDMA_REG_CHUNK_SHIFT 22 /* 4 MB */
+//#define RDMA_REG_CHUNK_SHIFT 23 /* 8 MB */
+//#define RDMA_REG_CHUNK_SHIFT 24 /* 16 MB */
+//#define RDMA_REG_CHUNK_SHIFT 25 /* 32 MB */
+//#define RDMA_REG_CHUNK_SHIFT 26 /* 64 MB */
+//#define RDMA_REG_CHUNK_SHIFT 27 /* 128 MB */
+//#define RDMA_REG_CHUNK_SHIFT 28 /* 256 MB */
+
+#define RDMA_REG_CHUNK_SIZE (1UL << (RDMA_REG_CHUNK_SHIFT))
+
+/*
+ * This is only for non-live state being migrated.
+ * Instead of RDMA_WRITE messages, we use RDMA_SEND
+ * messages for that state, which requires a different
+ * delivery design than main memory.
+ */
+#define RDMA_SEND_INCREMENT 32768
+
+/*
+ * Completion queue can be filled by both read and write work requests,
+ * so must reflect the sum of both possible queue sizes.
+ */
+#define RDMA_QP_SIZE 1000
+#define RDMA_CQ_SIZE (RDMA_QP_SIZE * 3)
+
+/*
+ * Maximum size infiniband SEND message
+ */
+#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
+#define RDMA_CONTROL_MAX_WR 2
+#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
+
+/*
+ * Capabilities for negotiation.
+ */
+#define RDMA_CAPABILITY_CHUNK_REGISTER 0x01
+
+/*
+ * Add the other flags above to this list of known capabilities
+ * as they are introduced.
+ */
+static uint32_t known_capabilities = RDMA_CAPABILITY_CHUNK_REGISTER;
+
+#define CHECK_ERROR_STATE() \
+    do { \
+        if (rdma->error_state) { \
+            fprintf(stderr, "RDMA is in an error state waiting migration" \
+                            " to abort!\n"); \
+            return rdma->error_state; \
+        } \
+    } while(0);
+/*
+ * RDMA migration protocol:
+ * 1. RDMA Writes (data messages, i.e. RAM)
+ * 2. IB Send/Recv (control channel messages)
+ */
+enum {
+    RDMA_WRID_NONE = 0,
+    RDMA_WRID_RDMA_WRITE,
+    RDMA_WRID_SEND_CONTROL = 1000,
+    RDMA_WRID_RECV_CONTROL = 2000,
+};
+
+const char *wrid_desc[] = {
+        [RDMA_WRID_NONE] = "NONE",
+        [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
+        [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
+        [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
+};
+
+/*
+ * SEND/RECV IB Control Messages.
+ */
+enum {
+    RDMA_CONTROL_NONE = 0,
+    RDMA_CONTROL_ERROR,
+    RDMA_CONTROL_READY,             /* ready to receive */
+    RDMA_CONTROL_QEMU_FILE,         /* QEMUFile-transmitted bytes */
+    RDMA_CONTROL_RAM_BLOCKS,        /* RAMBlock synchronization */
+    RDMA_CONTROL_COMPRESS,          /* page contains repeat values */
+    RDMA_CONTROL_REGISTER_REQUEST,  /* dynamic page registration */
+    RDMA_CONTROL_REGISTER_RESULT,   /* key to use after registration */
+    RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */
+};
+
+const char *control_desc[] = {
+        [RDMA_CONTROL_NONE] = "NONE",
+        [RDMA_CONTROL_ERROR] = "ERROR",
+        [RDMA_CONTROL_READY] = "READY",
+        [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
+        [RDMA_CONTROL_RAM_BLOCKS] = "REMOTE INFO",
+        [RDMA_CONTROL_COMPRESS] = "COMPRESS",
+        [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
+        [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
+        [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
+};
+
+/*
+ * Memory and MR structures used to represent an IB Send/Recv work request.
+ * This is *not* used for RDMA, only IB Send/Recv.
+ */
+typedef struct {
+    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
+    struct   ibv_mr *control_mr;               /* registration metadata */
+    size_t   control_len;                      /* length of the message */
+    uint8_t *control_curr;                     /* start of unconsumed bytes */
+} RDMAWorkRequestData;
+
+/*
+ * Negotiate RDMA capabilities during connection-setup time.
+ */
+typedef struct {
+    uint32_t version;
+    uint32_t flags;
+} RDMACapabilities;
+
+static void caps_to_network(RDMACapabilities *cap)
+{
+    cap->version = htonl(cap->version);
+    cap->flags = htonl(cap->flags);
+}
+
+static void network_to_caps(RDMACapabilities *cap)
+{
+    cap->version = ntohl(cap->version);
+    cap->flags = ntohl(cap->flags);
+}
+
+/*
+ * Representation of a RAMBlock from an RDMA perspective.
+ * This is not transmitted, only local.
+ * This and subsequent structures cannot be linked lists
+ * because we're using a single IB message to transmit
+ * the information. It's small anyway, so a list is overkill.
+ */
+typedef struct RDMALocalBlock {
+    uint8_t  *local_host_addr; /* local virtual address */
+    uint64_t remote_host_addr; /* remote virtual address */
+    uint64_t offset;
+    uint64_t length;
+    struct   ibv_mr **pmr;     /* MRs for chunk-level registration */
+    struct   ibv_mr *mr;       /* MR for non-chunk-level registration */
+    uint32_t *remote_keys;     /* rkeys for chunk-level registration */
+    uint32_t remote_rkey;      /* rkeys for non-chunk-level registration */
+} RDMALocalBlock;
+
+/*
+ * Also represents a RAMblock, but only on the dest.
+ * This gets transmitted by the dest during connection-time
+ * to the source / primary VM and then is used to populate the
+ * corresponding RDMALocalBlock with
+ * the information needed to perform the actual RDMA.
+ */
+typedef struct QEMU_PACKED RDMARemoteBlock {
+    uint64_t remote_host_addr;
+    uint64_t offset;
+    uint64_t length;
+    uint32_t remote_rkey;
+    uint32_t padding;
+} QEMU_PACKED RDMARemoteBlock;
+
+/*
+ * Virtual address of the above structures used for transmitting
+ * the RAMBlock descriptions at connection-time.
+ * This structure is *not* transmitted.
+ */
+typedef struct RDMALocalBlocks {
+    int num_blocks;
+    RDMALocalBlock *block;
+} RDMALocalBlocks;
+
+/*
+ * Same as above
+ */
+typedef struct RDMARemoteBlocks {
+    RDMARemoteBlock *block;
+    void *remote_area;
+} RDMARemoteBlocks;
+
+/*
+ * Main data structure for RDMA state.
+ * While there is only one copy of this structure being allocated right now,
+ * this is the place where one would start if you wanted to consider
+ * having more than one RDMA connection open at the same time.
+ */
+typedef struct RDMAContext {
+    char *host;
+    int port;
+
+    /* This is used by the migration protocol to transmit
+     * control messages (such as device state and registration commands)
+     *
+     * WR #0 is for control channel ready messages from the destination.
+     * WR #1 is for control channel data messages from the destination.
+     * WR #2 is for control channel send messages.
+     *
+     * We could use more WRs, but we have enough for now.
+     */
+    RDMAWorkRequestData wr_data[RDMA_CONTROL_MAX_WR + 1];
+
+    /*
+     * This is used by *_exchange_send() to figure out whether or not
+     * the initial "READY" message has already been received or not.
+     * This is because other functions may potentially poll() and detect
+     * the READY message before send() does, in which case we need to
+     * know if it completed.
+     */
+    int control_ready_expected;
+
+    /* number of outstanding unsignaled send */
+    int num_unsignaled_send;
+
+    /* number of outstanding signaled send */
+    int num_signaled_send;
+
+    /* store info about current buffer so that we can
+       merge it with future sends */
+    uint64_t current_offset;
+    uint64_t current_length;
+    /* index of ram block the current buffer belongs to */
+    int current_index;
+    /* index of the chunk in the current ram block */
+    int current_chunk;
+
+    bool chunk_register_destination;
+
+    /*
+     * infiniband-specific variables for opening the device
+     * and maintaining connection state and so forth.
+     *
+     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
+     * cm_id->verbs, cm_id->channel, and cm_id->qp.
+     */
+    struct rdma_cm_id *cm_id;               /* connection manager ID */
+    struct rdma_cm_id *listen_id;
+
+    struct ibv_context *verbs;
+    struct rdma_event_channel *channel;
+    struct ibv_qp *qp;                      /* queue pair */
+    struct ibv_comp_channel *comp_channel;  /* completion channel */
+    struct ibv_pd *pd;                      /* protection domain */
+    struct ibv_cq *cq;                      /* completion queue */
+
+    /*
+     * If a previous write failed (perhaps because of a failed
+     * memory registration, then do not attempt any future work
+     * and remember the error state.
+     */
+    int error_state;
+
+    /*
+     * Description of ram blocks used throughout the code.
+     */
+    RDMALocalBlocks local_ram_blocks;
+    RDMARemoteBlocks remote_ram_blocks;
+
+    /*
+     * Migration on *destination* started. 
+     * Then use coroutine yield function.
+     * Source runs in a thread, so we don't care.
+     */
+    int migration_started_on_destination;
+} RDMAContext;
+
+/*
+ * Interface to the rest of the migration call stack.
+ */
+typedef struct QEMUFileRDMA {
+    RDMAContext *rdma;
+    size_t len;
+    void *file;
+} QEMUFileRDMA;
+
+#define RDMA_CONTROL_VERSION_CURRENT 1
+
+/*
+ * Main structure for IB Send/Recv control messages.
+ * This gets prepended at the beginning of every Send/Recv.
+ */
+typedef struct QEMU_PACKED {
+    uint32_t len;     /* Total length of data portion */
+    uint32_t type;    /* which control command to perform */
+    uint32_t repeat;  /* number of commands in data portion of same type */
+    uint32_t padding;
+} QEMU_PACKED RDMAControlHeader;
+
+static void control_to_network(RDMAControlHeader *control)
+{
+    control->type = htonl(control->type);
+    control->len = htonl(control->len);
+    control->repeat = htonl(control->repeat);
+}
+
+static void network_to_control(RDMAControlHeader *control)
+{
+    control->type = ntohl(control->type);
+    control->len = ntohl(control->len);
+    control->repeat = ntohl(control->repeat);
+}
+
+/*
+ * Register a single Chunk.
+ * Information sent by the primary VM to inform the dest
+ * to register an single chunk of memory before we can perform
+ * the actual RDMA operation.
+ */
+typedef struct QEMU_PACKED {
+    uint32_t len;           /* length of the chunk to be registered */
+    uint32_t current_index; /* which ramblock the chunk belongs to */
+    uint64_t offset;        /* offset into the ramblock of the chunk */
+} QEMU_PACKED RDMARegister;
+
+typedef struct QEMU_PACKED {
+    uint32_t value;     /* if zero, we will madvise() */
+    uint32_t block_idx; /* which ram block index */
+    uint64_t offset;    /* where in the remote ramblock this chunk */
+    uint64_t length;    /* length of the chunk */
+} QEMU_PACKED RDMACompress;
+
+/*
+ * The result of the dest's memory registration produces an "rkey"
+ * which the primary VM must reference in order to perform
+ * the RDMA operation.
+ */
+typedef struct QEMU_PACKED {
+    uint32_t rkey;
+    uint32_t padding;
+} QEMU_PACKED RDMARegisterResult;
+
+
+inline static int ram_chunk_index(uint8_t *start, uint8_t *host)
+{
+    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
+}
+
+inline static int ram_chunk_count(RDMALocalBlock *rdma_ram_block)
+{
+    return ram_chunk_index(rdma_ram_block->local_host_addr,
+        rdma_ram_block->local_host_addr + rdma_ram_block->length) + 1;
+}
+
+static inline uint8_t *ram_chunk_start(RDMALocalBlock *rdma_ram_block, int i)
+{
+    return (uint8_t *) (((uintptr_t) rdma_ram_block->local_host_addr)
+                                    + (i << RDMA_REG_CHUNK_SHIFT));
+}
+
+inline static uint8_t *ram_chunk_end(RDMALocalBlock *rdma_ram_block, int i)
+{
+    uint8_t *result = ram_chunk_start(rdma_ram_block, i) + RDMA_REG_CHUNK_SIZE;
+
+    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
+        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
+    }
+
+    return result;
+}
+
+/*
+ * Memory regions need to be registered with the device and queue pairs setup
+ * in advanced before the migration starts. This tells us where the RAM blocks
+ * are so that we can register them individually.
+ */
+static void qemu_rdma_init_one_block(void *host_addr,
+    ram_addr_t offset, ram_addr_t length, void *opaque)
+{
+    RDMALocalBlocks *rdma_local_ram_blocks = opaque;
+    int num_blocks = rdma_local_ram_blocks->num_blocks;
+
+    rdma_local_ram_blocks->block[num_blocks].local_host_addr = host_addr;
+    rdma_local_ram_blocks->block[num_blocks].offset = (uint64_t)offset;
+    rdma_local_ram_blocks->block[num_blocks].length = (uint64_t)length;
+    rdma_local_ram_blocks->num_blocks++;
+
+}
+
+static void qemu_rdma_ram_block_counter(void *host_addr,
+            ram_addr_t offset, ram_addr_t length, void *opaque)
+{
+    int *num_blocks = opaque;
+    *num_blocks = *num_blocks + 1;
+}
+
+/*
+ * Identify the RAMBlocks and their quantity. They will be references to
+ * identify chunk boundaries inside each RAMBlock and also be referenced
+ * during dynamic page registration.
+ */
+static int qemu_rdma_init_ram_blocks(RDMALocalBlocks *rdma_local_ram_blocks)
+{
+    int num_blocks = 0;
+
+    qemu_ram_foreach_block(qemu_rdma_ram_block_counter, &num_blocks);
+
+    memset(rdma_local_ram_blocks, 0, sizeof *rdma_local_ram_blocks);
+    rdma_local_ram_blocks->block = g_malloc0(sizeof(RDMALocalBlock) *
+                                    num_blocks);
+
+    rdma_local_ram_blocks->num_blocks = 0;
+    qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma_local_ram_blocks);
+
+    DPRINTF("Allocated %d local ram block structures\n",
+                    rdma_local_ram_blocks->num_blocks);
+    return 0;
+}
+
+/*
+ * Put in the log file which RDMA device was opened and the details
+ * associated with that device.
+ */
+static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
+{
+    printf("%s RDMA Device opened: kernel name %s "
+           "uverbs device name %s, "
+           "infiniband_verbs class device path %s,"
+           " infiniband class device path %s\n",
+                who,
+                verbs->device->name,
+                verbs->device->dev_name,
+                verbs->device->dev_path,
+                verbs->device->ibdev_path);
+}
+
+/*
+ * Put in the log file the RDMA gid addressing information,
+ * useful for folks who have trouble understanding the
+ * RDMA device hierarchy in the kernel.
+ */
+static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
+{
+    char sgid[33];
+    char dgid[33];
+    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
+    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
+    DPRINTF("%s Source GID: %s, Dest GID: %s\n", who, sgid, dgid);
+}
+
+/*
+ * Figure out which RDMA device corresponds to the requested IP hostname
+ * Also create the initial connection manager identifiers for opening
+ * the connection.
+ */
+static int qemu_rdma_resolve_host(RDMAContext *rdma)
+{
+    int ret;
+    struct addrinfo *res;
+    char port_str[16];
+    struct rdma_cm_event *cm_event;
+    char ip[40] = "unknown";
+
+    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
+        fprintf(stderr, "RDMA hostname has not been set\n");
+        return -1;
+    }
+
+    /* create CM channel */
+    rdma->channel = rdma_create_event_channel();
+    if (!rdma->channel) {
+        fprintf(stderr, "could not create CM channel\n");
+        return -1;
+    }
+
+    /* create CM id */
+    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
+    if (ret) {
+        fprintf(stderr, "could not create channel id\n");
+        goto err_resolve_create_id;
+    }
+
+    snprintf(port_str, 16, "%d", rdma->port);
+    port_str[15] = '\0';
+
+    ret = getaddrinfo(rdma->host, port_str, NULL, &res);
+    if (ret < 0) {
+        fprintf(stderr, "could not getaddrinfo destination address %s\n",
+                        rdma->host);
+        goto err_resolve_get_addr;
+    }
+
+    inet_ntop(AF_INET, &((struct sockaddr_in *) res->ai_addr)->sin_addr,
+                                ip, sizeof ip);
+    DPRINTF("%s => %s\n", rdma->host, ip);
+
+    /* resolve the first address */
+    ret = rdma_resolve_addr(rdma->cm_id, NULL, res->ai_addr,
+            RDMA_RESOLVE_TIMEOUT_MS);
+    if (ret) {
+        fprintf(stderr, "could not resolve address %s\n", rdma->host);
+        goto err_resolve_get_addr;
+    }
+
+    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        fprintf(stderr, "could not perform event_addr_resolved\n");
+        goto err_resolve_get_addr;
+    }
+
+    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
+        fprintf(stderr, "result not equal to event_addr_resolved %s\n",
+                rdma_event_str(cm_event->event));
+        perror("rdma_resolve_addr");
+        goto err_resolve_get_addr;
+    }
+    rdma_ack_cm_event(cm_event);
+
+    /* resolve route */
+    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
+    if (ret) {
+        fprintf(stderr, "could not resolve rdma route\n");
+        goto err_resolve_get_addr;
+    }
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        fprintf(stderr, "could not perform event_route_resolved\n");
+        goto err_resolve_get_addr;
+    }
+    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
+        fprintf(stderr, "result not equal to event_route_resolved: %s\n",
+                        rdma_event_str(cm_event->event));
+        rdma_ack_cm_event(cm_event);
+        goto err_resolve_get_addr;
+    }
+    rdma_ack_cm_event(cm_event);
+    rdma->verbs = rdma->cm_id->verbs;
+    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
+    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
+    return 0;
+
+err_resolve_get_addr:
+    rdma_destroy_id(rdma->cm_id);
+err_resolve_create_id:
+    rdma_destroy_event_channel(rdma->channel);
+    rdma->channel = NULL;
+
+    return -1;
+}
+
+/*
+ * Create protection domain and completion queues
+ */
+static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
+{
+    /* allocate pd */
+    rdma->pd = ibv_alloc_pd(rdma->verbs);
+    if (!rdma->pd) {
+        fprintf(stderr, "failed to allocate protection domain\n");
+        return -1;
+    }
+
+    /* create completion channel */
+    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
+    if (!rdma->comp_channel) {
+        fprintf(stderr, "failed to allocate completion channel\n");
+        goto err_alloc_pd_cq;
+    }
+
+    /* create cq */
+    rdma->cq = ibv_create_cq(rdma->verbs, RDMA_CQ_SIZE,
+            NULL, rdma->comp_channel, 0);
+    if (!rdma->cq) {
+        fprintf(stderr, "failed to allocate completion queue\n");
+        goto err_alloc_pd_cq;
+    }
+
+    return 0;
+
+err_alloc_pd_cq:
+    if (rdma->pd) {
+        ibv_dealloc_pd(rdma->pd);
+    }
+    if (rdma->comp_channel) {
+        ibv_destroy_comp_channel(rdma->comp_channel);
+    }
+    rdma->pd = NULL;
+    rdma->comp_channel = NULL;
+    return -1;
+
+}
+
+/*
+ * Create queue pairs.
+ */
+static int qemu_rdma_alloc_qp(RDMAContext *rdma)
+{
+    struct ibv_qp_init_attr attr = { 0 };
+    int ret;
+
+    attr.cap.max_send_wr = RDMA_QP_SIZE;
+    attr.cap.max_recv_wr = 3;
+    attr.cap.max_send_sge = 1;
+    attr.cap.max_recv_sge = 1;
+    attr.send_cq = rdma->cq;
+    attr.recv_cq = rdma->cq;
+    attr.qp_type = IBV_QPT_RC;
+
+    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
+    if (ret) {
+        return -1;
+    }
+
+    rdma->qp = rdma->cm_id->qp;
+    return 0;
+}
+
+static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma,
+                                RDMALocalBlocks *rdma_local_ram_blocks)
+{
+    int i;
+    for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
+        DDPRINTF("Registering whole ram blocks\n");
+        rdma_local_ram_blocks->block[i].mr =
+            ibv_reg_mr(rdma->pd,
+                    rdma_local_ram_blocks->block[i].local_host_addr,
+                    rdma_local_ram_blocks->block[i].length,
+                    IBV_ACCESS_LOCAL_WRITE |
+                    IBV_ACCESS_REMOTE_WRITE
+                    );
+        if (!rdma_local_ram_blocks->block[i].mr) {
+            fprintf(stderr, "Failed to register local dest ram block!\n");
+            break;
+        }
+        DDPRINTF("Finished registering whole ram blocks\n");
+    }
+
+    if (i >= rdma_local_ram_blocks->num_blocks) {
+        return 0;
+    }
+
+    for (i--; i >= 0; i--) {
+        ibv_dereg_mr(rdma_local_ram_blocks->block[i].mr);
+    }
+
+    return -1;
+
+}
+
+/*
+ * Shutdown and clean things up.
+ */
+static void qemu_rdma_dereg_ram_blocks(RDMALocalBlocks *rdma_local_ram_blocks)
+{
+    int i, j;
+    for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
+        int num_chunks;
+        if (!rdma_local_ram_blocks->block[i].pmr) {
+            continue;
+        }
+        num_chunks = ram_chunk_count(&(rdma_local_ram_blocks->block[i]));
+        for (j = 0; j < num_chunks; j++) {
+            if (!rdma_local_ram_blocks->block[i].pmr[j]) {
+                continue;
+            }
+            ibv_dereg_mr(rdma_local_ram_blocks->block[i].pmr[j]);
+        }
+        g_free(rdma_local_ram_blocks->block[i].pmr);
+        rdma_local_ram_blocks->block[i].pmr = NULL;
+    }
+    for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
+        if (!rdma_local_ram_blocks->block[i].mr) {
+            continue;
+        }
+        ibv_dereg_mr(rdma_local_ram_blocks->block[i].mr);
+        rdma_local_ram_blocks->block[i].mr = NULL;
+    }
+}
+
+/*
+ * Server uses this to prepare to transmit the RAMBlock descriptions
+ * to the primary VM after connection setup.
+ * Both sides use the "remote" structure to communicate and update
+ * their "local" descriptions with what was sent.
+ */
+static void qemu_rdma_copy_to_remote_ram_blocks(RDMAContext *rdma,
+                                                RDMALocalBlocks *local,
+                                                RDMARemoteBlocks *remote)
+{
+    int i;
+    DPRINTF("Allocating %d remote ram block structures\n", local->num_blocks);
+
+    for (i = 0; i < local->num_blocks; i++) {
+            remote->block[i].remote_host_addr =
+                (uint64_t)(local->block[i].local_host_addr);
+
+            if (!rdma->chunk_register_destination) {
+                remote->block[i].remote_rkey = local->block[i].mr->rkey;
+            }
+
+            remote->block[i].offset = local->block[i].offset;
+            remote->block[i].length = local->block[i].length;
+    }
+}
+
+/*
+ * The protocol uses two different sets of rkeys (mutually exclusive):
+ * 1. One key to represent the virtual address of the entire ram block.
+ *    (dynamic chunk registration disabled - pin everything with one rkey.)
+ * 2. One to represent individual chunks within a ram block. 
+ *    (dynamic chunk registration enabled - pin individual chunks.)
+ *
+ * Once the capability is successfully negotiated, the destination transmits
+ * the keys to use (or sends them later) including the virtual addresses
+ * and then propagates the remote ram block descriptions to his local copy.
+ */
+static int qemu_rdma_process_remote_ram_blocks(RDMALocalBlocks *local,
+                                               RDMARemoteBlocks *remote,
+                                               int num_blocks)
+{
+    int i, j;
+
+    if (local->num_blocks != num_blocks) {
+        fprintf(stderr, "local %d != remote %d\n",
+            local->num_blocks, num_blocks);
+        return -1;
+    }
+
+    for (i = 0; i < num_blocks; i++) {
+        /* search local ram blocks */
+        for (j = 0; j < local->num_blocks; j++) {
+            if (remote->block[i].offset != local->block[j].offset) {
+                continue;
+            }
+            if (remote->block[i].length != local->block[j].length) {
+                return -1;
+            }
+            local->block[j].remote_host_addr =
+                remote->block[i].remote_host_addr;
+            local->block[j].remote_rkey = remote->block[i].remote_rkey;
+            break;
+        }
+        if (j >= local->num_blocks) {
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Find the ram block that corresponds to the page requested to be
+ * transmitted by QEMU.
+ *
+ * Once the block is found, also identify which 'chunk' within that
+ * block that the page belongs to.
+ *
+ * This search cannot fail or the migration will fail.
+ */
+static int qemu_rdma_search_ram_block(uint64_t offset, uint64_t length,
+        RDMALocalBlocks *blocks, int *block_index, int *chunk_index)
+{
+    int i;
+    uint8_t *host_addr;
+
+    for (i = 0; i < blocks->num_blocks; i++) {
+        if (offset < blocks->block[i].offset) {
+            continue;
+        }
+        if (offset + length >
+                blocks->block[i].offset + blocks->block[i].length) {
+            continue;
+        }
+
+        *block_index = i;
+        host_addr = blocks->block[i].local_host_addr +
+                (offset - blocks->block[i].offset);
+        *chunk_index = ram_chunk_index(blocks->block[i].local_host_addr, host_addr);
+        return 0;
+    }
+    return -1;
+}
+
+/*
+ * Register a chunk with IB. If the chunk was already registered
+ * previously, then skip.
+ *
+ * Also return the keys associated with the registration needed
+ * to perform the actual RDMA operation.
+ */
+static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
+        RDMALocalBlock *block, uint8_t * host_addr,
+        uint32_t *lkey, uint32_t *rkey)
+{
+    int chunk;
+    if (block->mr) {
+        if (lkey) {
+            *lkey = block->mr->lkey;
+        }
+        if (rkey) {
+            *rkey = block->mr->rkey;
+        }
+        return 0;
+    }
+
+    /* allocate memory to store chunk MRs */
+    if (!block->pmr) {
+        int num_chunks = ram_chunk_count(block);
+        block->pmr = g_malloc0(num_chunks *
+                sizeof(struct ibv_mr *));
+        if (!block->pmr) {
+            return -1;
+        }
+    }
+
+    /*
+     * If 'rkey', then we're the destination, so grant access to the source.
+     *
+     * If 'lkey', then we're the primary VM, so grant access only to ourselves.
+     */
+    chunk = ram_chunk_index(block->local_host_addr, host_addr);
+    if (!block->pmr[chunk]) {
+        uint8_t *start_addr = ram_chunk_start(block, chunk);
+        uint8_t *end_addr = ram_chunk_end(block, chunk);
+
+        DDPRINTF("Registering chunk\n");
+
+        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
+                start_addr,
+                end_addr - start_addr,
+                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
+                        IBV_ACCESS_REMOTE_WRITE) : 0));
+
+        if (!block->pmr[chunk]) {
+            fprintf(stderr, "Failed to register chunk!\n");
+            return -1;
+        }
+        DDPRINTF("Finished registering chunk\n");
+    }
+
+    if (lkey) {
+        *lkey = block->pmr[chunk]->lkey;
+    }
+    if (rkey) {
+        *rkey = block->pmr[chunk]->rkey;
+    }
+    return 0;
+}
+
+/*
+ * Register (at connection time) the memory used for control
+ * channel messages.
+ */
+static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
+{
+    DDPRINTF("Registering control\n");
+    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
+            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
+            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+    if (rdma->wr_data[idx].control_mr) {
+        DDPRINTF("Finished registering control\n");
+        return 0;
+    }
+    fprintf(stderr, "qemu_rdma_reg_control failed!\n");
+    return -1;
+}
+
+static int qemu_rdma_dereg_control(RDMAContext *rdma, int idx)
+{
+    return ibv_dereg_mr(rdma->wr_data[idx].control_mr);
+}
+
+#if defined(DEBUG_RDMA) || defined(DEBUG_RDMA_VERBOSE)
+static const char *print_wrid(int wrid)
+{
+    if (wrid >= RDMA_WRID_RECV_CONTROL) {
+        return wrid_desc[RDMA_WRID_RECV_CONTROL];
+    }
+    return wrid_desc[wrid];
+}
+#endif
+
+/*
+ * Consult the connection manager to see a work request
+ * (of any kind) has completed.
+ * Return the work request ID that completed.
+ */
+static int qemu_rdma_poll(RDMAContext *rdma)
+{
+    int ret;
+    struct ibv_wc wc;
+
+    ret = ibv_poll_cq(rdma->cq, 1, &wc);
+    if (!ret) {
+        return RDMA_WRID_NONE;
+    }
+    if (ret < 0) {
+        fprintf(stderr, "ibv_poll_cq return %d!\n", ret);
+        return ret;
+    }
+    if (wc.status != IBV_WC_SUCCESS) {
+        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
+                        wc.status, ibv_wc_status_str(wc.status));
+        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wc.wr_id]);
+
+        return -1;
+    }
+
+    if (rdma->control_ready_expected &&
+        (wc.wr_id >= RDMA_WRID_RECV_CONTROL)) {
+        DDPRINTF("completion %s #%" PRId64 " received (%" PRId64 ")\n",
+            wrid_desc[RDMA_WRID_RECV_CONTROL], wc.wr_id -
+            RDMA_WRID_RECV_CONTROL, wc.wr_id);
+        rdma->control_ready_expected = 0;
+    }
+
+    if (wc.wr_id == RDMA_WRID_RDMA_WRITE) {
+        rdma->num_signaled_send--;
+        DDPRINTF("completions %s (%" PRId64 ") left %d\n",
+            print_wrid(wc.wr_id), wc.wr_id, rdma->num_signaled_send);
+    } else {
+        DDPRINTF("other completion %s (%" PRId64 ") received left %d\n",
+            print_wrid(wc.wr_id), wc.wr_id, rdma->num_signaled_send);
+    }
+
+    return  (int)wc.wr_id;
+}
+
+/*
+ * Block until the next work request has completed.
+ *
+ * First poll to see if a work request has already completed,
+ * otherwise block.
+ *
+ * If we encounter completed work requests for IDs other than
+ * the one we're interested in, then that's generally an error.
+ *
+ * The only exception is actual RDMA Write completions. These
+ * completions only need to be recorded, but do not actually
+ * need further processing.
+ */
+static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid)
+{
+    int num_cq_events = 0;
+    int r = RDMA_WRID_NONE;
+    struct ibv_cq *cq;
+    void *cq_ctx;
+
+    if (ibv_req_notify_cq(rdma->cq, 0)) {
+        return -1;
+    }
+    /* poll cq first */
+    while (r != wrid) {
+        r = qemu_rdma_poll(rdma);
+        if (r < 0) {
+            return r;
+        }
+        if (r == RDMA_WRID_NONE) {
+            break;
+        }
+        if (r != wrid) {
+            DDPRINTF("A Wanted wrid %s (%d) but got %s (%d)\n",
+                print_wrid(wrid), wrid, print_wrid(r), r);
+        }
+    }
+    if (r == wrid) {
+        return 0;
+    }
+
+    while (1) {
+        /*
+         * Coroutine doesn't start until process_incoming_migration()
+         * so don't yield unless we know we're running inside of a coroutine.
+         */
+        if (rdma->migration_started_on_destination) {
+            yield_until_fd_readable(rdma->comp_channel->fd);
+        }
+
+        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
+            perror("ibv_get_cq_event");
+            goto err_block_for_wrid;
+        }
+
+        num_cq_events++;
+
+        if (ibv_req_notify_cq(cq, 0)) {
+            goto err_block_for_wrid;
+        }
+        /* poll cq */
+        while (r != wrid) {
+            r = qemu_rdma_poll(rdma);
+            if (r < 0) {
+                goto err_block_for_wrid;
+            }
+            if (r == RDMA_WRID_NONE) {
+                break;
+            }
+            if (r != wrid) {
+                DDPRINTF("B Wanted wrid %s (%d) but got %s (%d)\n",
+                    print_wrid(wrid), wrid, print_wrid(r), r);
+            }
+        }
+        if (r == wrid) {
+            goto success_block_for_wrid;
+        }
+    }
+
+success_block_for_wrid:
+    if (num_cq_events) {
+        ibv_ack_cq_events(cq, num_cq_events);
+    }
+    return 0;
+
+err_block_for_wrid:
+    if (num_cq_events) {
+        ibv_ack_cq_events(cq, num_cq_events);
+    }
+    return -1;
+}
+
+/*
+ * Post a SEND message work request for the control channel
+ * containing some data and block until the post completes.
+ */
+static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
+                                       RDMAControlHeader *head)
+{
+    int ret = 0;
+    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_CONTROL_MAX_WR];
+    struct ibv_send_wr *bad_wr;
+    struct ibv_sge sge = {
+                           .addr = (uint64_t)(wr->control),
+                           .length = head->len + sizeof(RDMAControlHeader),
+                           .lkey = wr->control_mr->lkey,
+                         };
+    struct ibv_send_wr send_wr = {
+                                   .wr_id = RDMA_WRID_SEND_CONTROL,
+                                   .opcode = IBV_WR_SEND,
+                                   .send_flags = IBV_SEND_SIGNALED,
+                                   .sg_list = &sge,
+                                   .num_sge = 1,
+                                };
+
+    DPRINTF("CONTROL: sending %s..\n", control_desc[head->type]);
+
+    /*
+     * We don't actually need to do a memcpy() in here if we used
+     * the "sge" properly, but since we're only sending control messages
+     * (not RAM in a performance-critical path), then its OK for now.
+     *
+     * The copy makes the RDMAControlHeader simpler to manipulate
+     * for the time being.
+     */
+    memcpy(wr->control, head, sizeof(RDMAControlHeader));
+    control_to_network((void *) wr->control);
+
+    if (buf) {
+        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
+    }
+
+
+    if (ibv_post_send(rdma->qp, &send_wr, &bad_wr)) {
+        return -1;
+    }
+
+    if (ret < 0) {
+        fprintf(stderr, "Failed to use post IB SEND for control!\n");
+        return ret;
+    }
+
+    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL);
+    if (ret < 0) {
+        fprintf(stderr, "rdma migration: send polling control error!\n");
+    }
+
+    return ret;
+}
+
+/*
+ * Post a RECV work request in anticipation of some future receipt
+ * of data on the control channel.
+ */
+static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
+{
+    struct ibv_recv_wr *bad_wr;
+    struct ibv_sge sge = {
+                            .addr = (uint64_t)(rdma->wr_data[idx].control),
+                            .length = RDMA_CONTROL_MAX_BUFFER,
+                            .lkey = rdma->wr_data[idx].control_mr->lkey,
+                         };
+
+    struct ibv_recv_wr recv_wr = {
+                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
+                                    .sg_list = &sge,
+                                    .num_sge = 1,
+                                 };
+
+
+    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Block and wait for a RECV control channel message to arrive.
+ */
+static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
+                RDMAControlHeader *head, int expecting, int idx)
+{
+    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx);
+
+    if (ret < 0) {
+        fprintf(stderr, "rdma migration: recv polling control error!\n");
+        return ret;
+    }
+
+    network_to_control((void *) rdma->wr_data[idx].control);
+    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
+
+    DPRINTF("CONTROL: %s received\n", control_desc[expecting]);
+
+    if ((expecting != RDMA_CONTROL_NONE && head->type != expecting)
+            || head->type == RDMA_CONTROL_ERROR) {
+        fprintf(stderr, "Was expecting a %s (%d) control message"
+                ", but got: %s (%d), length: %d\n",
+                control_desc[expecting], expecting,
+                control_desc[head->type], head->type, head->len);
+        return -EIO;
+    }
+
+    return 0;
+}
+
+/*
+ * When a RECV work request has completed, the work request's
+ * buffer is pointed at the header.
+ *
+ * This will advance the pointer to the data portion
+ * of the control message of the work request's buffer that
+ * was populated after the work request finished.
+ */
+static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
+                                  RDMAControlHeader *head)
+{
+    rdma->wr_data[idx].control_len = head->len;
+    rdma->wr_data[idx].control_curr =
+        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
+}
+
+/*
+ * This is an 'atomic' high-level operation to deliver a single, unified
+ * control-channel message.
+ *
+ * Additionally, if the user is expecting some kind of reply to this message,
+ * they can request a 'resp' response message be filled in by posting an
+ * additional work request on behalf of the user and waiting for an additional
+ * completion.
+ *
+ * The extra (optional) response is used during registration to us from having
+ * to perform an *additional* exchange of message just to provide a response by
+ * instead piggy-backing on the acknowledgement.
+ */
+static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
+                                   uint8_t *data, RDMAControlHeader *resp,
+                                   int *resp_idx)
+{
+    int ret = 0;
+    int idx = 0;
+
+    /*
+     * Wait until the dest is ready before attempting to deliver the message
+     * by waiting for a READY message.
+     */
+    if (rdma->control_ready_expected) {
+        RDMAControlHeader resp;
+        ret = qemu_rdma_exchange_get_response(rdma,
+                                    &resp, RDMA_CONTROL_READY, idx);
+        if (ret < 0) {
+            return ret;
+        }
+    }
+
+    /*
+     * If the user is expecting a response, post a WR in anticipation of it.
+     */
+    if (resp) {
+        ret = qemu_rdma_post_recv_control(rdma, idx + 1);
+        if (ret) {
+            fprintf(stderr, "rdma migration: error posting"
+                    " extra control recv for anticipated result!");
+            return ret;
+        }
+    }
+
+    /*
+     * Post a WR to replace the one we just consumed for the READY message.
+     */
+    ret = qemu_rdma_post_recv_control(rdma, idx);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error posting first control recv!");
+        return ret;
+    }
+
+    /*
+     * Deliver the control message that was requested.
+     */
+    ret = qemu_rdma_post_send_control(rdma, data, head);
+
+    if (ret < 0) {
+        fprintf(stderr, "Failed to send control buffer!\n");
+        return ret;
+    }
+
+    /*
+     * If we're expecting a response, block and wait for it.
+     */
+    if (resp) {
+        DPRINTF("Waiting for response %s\n", control_desc[resp->type]);
+        ret = qemu_rdma_exchange_get_response(rdma, resp, resp->type, idx + 1);
+
+        if (ret < 0) {
+            return ret;
+        }
+
+        qemu_rdma_move_header(rdma, idx + 1, resp);
+        *resp_idx = idx + 1;
+        DPRINTF("Response %s received.\n", control_desc[resp->type]);
+    }
+
+    rdma->control_ready_expected = 1;
+
+    return 0;
+}
+
+/*
+ * This is an 'atomic' high-level operation to receive a single, unified
+ * control-channel message.
+ */
+static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
+                                int expecting)
+{
+    RDMAControlHeader ready = {
+                                .len = 0,
+                                .type = RDMA_CONTROL_READY,
+                                .repeat = 1,
+                              };
+    int ret;
+    int idx = 0;
+
+    /*
+     * Inform the source that we're ready to receive a message.
+     */
+    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
+
+    if (ret < 0) {
+        fprintf(stderr, "Failed to send control buffer!\n");
+        return ret;
+    }
+
+    /*
+     * Block and wait for the message.
+     */
+    ret = qemu_rdma_exchange_get_response(rdma, head, expecting, idx);
+
+    if (ret < 0) {
+        return ret;
+    }
+
+    qemu_rdma_move_header(rdma, idx, head);
+
+    /*
+     * Post a new RECV work request to replace the one we just consumed.
+     */
+    ret = qemu_rdma_post_recv_control(rdma, idx);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error posting second control recv!");
+        return ret;
+    }
+
+    return 0;
+}
+
+/*
+ * Write an actual chunk of memory using RDMA.
+ *
+ * If we're using dynamic registration on the dest-side, we have to
+ * send a registration command first.
+ */
+static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
+        int current_index,
+        uint64_t offset, uint64_t length,
+        uint64_t wr_id, enum ibv_send_flags flag)
+{
+    struct ibv_sge sge;
+    struct ibv_send_wr send_wr = { 0 };
+    struct ibv_send_wr *bad_wr;
+    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
+    int chunk;
+    RDMARegister reg;
+    RDMARegisterResult *reg_result;
+    int reg_result_idx;
+    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
+    RDMAControlHeader head = { .len = sizeof(RDMARegister),
+                               .type = RDMA_CONTROL_REGISTER_REQUEST,
+                               .repeat = 1,
+                             };
+    int ret;
+
+    sge.addr = (uint64_t)(block->local_host_addr + (offset - block->offset));
+    sge.length = length;
+
+    if (rdma->chunk_register_destination) {
+        chunk = ram_chunk_index(block->local_host_addr, (uint8_t *) sge.addr);
+        if (!block->remote_keys[chunk]) {
+            /*
+             * This page has not yet been registered, so first check to see
+             * if the entire chunk is zero. If so, tell the other size to
+             * memset() + madvise() the entire chunk without RDMA.
+             */
+            if (can_use_buffer_find_nonzero_offset((void *)sge.addr, length)
+                   && buffer_find_nonzero_offset((void *)sge.addr,
+                                                    length) == length) {
+                RDMACompress comp = {
+                                        .offset = offset,
+                                        .value = 0,
+                                        .block_idx = current_index,
+                                        .length = length,
+                                    };
+
+                head.len = sizeof(comp);
+                head.type = RDMA_CONTROL_COMPRESS;
+
+                DPRINTF("Entire chunk is zero, sending compress: %d for %d "
+                    "bytes, index: %d, offset: %" PRId64 "...\n",
+                    chunk, sge.length, current_index, offset);
+
+                ret = qemu_rdma_exchange_send(rdma, &head,
+                                (uint8_t *) &comp, NULL, NULL);
+
+                if (ret < 0) {
+                    return -EIO;
+                }
+
+                return 1;
+            }
+
+            /*
+             * Otherwise, tell other side to register.
+             */
+            reg.len = sge.length;
+            reg.current_index = current_index;
+            reg.offset = offset;
+
+            DPRINTF("Sending registration request chunk %d for %d "
+                    "bytes, index: %d, offset: %" PRId64 "...\n",
+                    chunk, sge.length, current_index, offset);
+
+            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
+                                    &resp, &reg_result_idx);
+            if (ret < 0) {
+                return ret;
+            }
+
+            reg_result = (RDMARegisterResult *)
+                    rdma->wr_data[reg_result_idx].control_curr;
+
+            DPRINTF("Received registration result:"
+                    " my key: %x their key %x, chunk %d\n",
+                    block->remote_keys[chunk], reg_result->rkey, chunk);
+
+            block->remote_keys[chunk] = reg_result->rkey;
+        }
+
+        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
+    } else {
+        send_wr.wr.rdma.rkey = block->remote_rkey;
+    }
+
+    if (qemu_rdma_register_and_get_keys(rdma, block, (uint8_t *)sge.addr,
+                                        &sge.lkey, NULL)) {
+        fprintf(stderr, "cannot get lkey!\n");
+        return -EINVAL;
+    }
+
+    send_wr.wr_id = wr_id;
+    send_wr.opcode = IBV_WR_RDMA_WRITE;
+    send_wr.send_flags = flag;
+    send_wr.sg_list = &sge;
+    send_wr.num_sge = 1;
+    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
+        (offset - block->offset);
+
+    return ibv_post_send(rdma->qp, &send_wr, &bad_wr);
+}
+
+/*
+ * Push out any unwritten RDMA operations.
+ *
+ * We support sending out multiple chunks at the same time.
+ * Not all of them need to get signaled in the completion queue.
+ */
+static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
+{
+    int ret;
+    enum ibv_send_flags flags = 0;
+
+    if (!rdma->current_length) {
+        return 0;
+    }
+    if (rdma->num_unsignaled_send >=
+            RDMA_UNSIGNALED_SEND_MAX) {
+        flags = IBV_SEND_SIGNALED;
+    }
+
+retry:
+    ret = qemu_rdma_write_one(f, rdma,
+            rdma->current_index,
+            rdma->current_offset,
+            rdma->current_length,
+            RDMA_WRID_RDMA_WRITE, flags);
+
+    if (ret < 0) {
+        if (ret == -ENOMEM) {
+            DPRINTF("send queue is full. wait a little....\n");
+            ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
+            if (ret >= 0) {
+                goto retry;
+            }
+            if (ret < 0) {
+                fprintf(stderr, "rdma migration: failed to make "
+                                "room in full send queue! %d\n", ret);
+                return ret;
+            }
+        }
+        perror("write flush error");
+        return ret;
+    }
+
+    if (ret == 0) {
+        if (rdma->num_unsignaled_send >=
+                RDMA_UNSIGNALED_SEND_MAX) {
+            rdma->num_unsignaled_send = 0;
+            rdma->num_signaled_send++;
+            DPRINTF("signaled total: %d\n", rdma->num_signaled_send);
+        } else {
+            rdma->num_unsignaled_send++;
+        }
+    }
+
+    rdma->current_length = 0;
+    rdma->current_offset = 0;
+
+    return 0;
+}
+
+static inline int qemu_rdma_in_current_block(RDMAContext *rdma,
+                uint64_t offset, uint64_t len)
+{
+    RDMALocalBlock *block =
+        &(rdma->local_ram_blocks.block[rdma->current_index]);
+    if (rdma->current_index < 0) {
+        return 0;
+    }
+    if (offset < block->offset) {
+        return 0;
+    }
+    if (offset + len > block->offset + block->length) {
+        return 0;
+    }
+    return 1;
+}
+
+static inline int qemu_rdma_in_current_chunk(RDMAContext *rdma,
+                uint64_t offset, uint64_t len)
+{
+    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[rdma->current_index]);
+    uint8_t *chunk_start, *chunk_end, *host_addr;
+    if (rdma->current_chunk < 0) {
+        return 0;
+    }
+    host_addr = block->local_host_addr + (offset - block->offset);
+    chunk_start = ram_chunk_start(block, rdma->current_chunk);
+
+    if (host_addr < chunk_start) {
+        return 0;
+    }
+
+    chunk_end = ram_chunk_end(block, rdma->current_chunk);
+
+    if ((host_addr + len) > chunk_end) {
+        return 0;
+    }
+    return 1;
+}
+
+static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
+                    uint64_t offset, uint64_t len)
+{
+    if (rdma->current_length == 0) {
+        return 0;
+    }
+    if (offset != rdma->current_offset + rdma->current_length) {
+        return 0;
+    }
+    if (!qemu_rdma_in_current_block(rdma, offset, len)) {
+        return 0;
+    }
+    if (!qemu_rdma_in_current_chunk(rdma, offset, len)) {
+        return 0;
+    }
+    return 1;
+}
+
+/*
+ * We're not actually writing here, but doing three things:
+ *
+ * 1. Identify the chunk the buffer belongs to.
+ * 2. If the chunk is full or the buffer doesn't belong to the current
+ *    chunk, then start a new chunk and flush() the old chunk.
+ * 3. To keep the hardware busy, we also group chunks into batches
+ *    and only require that a batch gets acknowledged in the completion
+ *    qeueue instead of each individual chunk.
+ */
+static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
+                           uint64_t offset, uint64_t len)
+{
+    int index = rdma->current_index;
+    int chunk_index = rdma->current_chunk;
+    int ret;
+
+    /* If we cannot merge it, we flush the current buffer first. */
+    if (!qemu_rdma_buffer_mergable(rdma, offset, len)) {
+        ret = qemu_rdma_write_flush(f, rdma);
+        if (ret) {
+            return ret;
+        }
+        rdma->current_length = 0;
+        rdma->current_offset = offset;
+
+        ret = qemu_rdma_search_ram_block(offset, len,
+                    &rdma->local_ram_blocks, &index, &chunk_index);
+        if (ret) {
+            fprintf(stderr, "ram block search failed\n");
+            return ret;
+        }
+        rdma->current_index = index;
+        rdma->current_chunk = chunk_index;
+    }
+
+    /* merge it */
+    rdma->current_length += len;
+
+    /* flush it if buffer is too large */
+    if (rdma->current_length >= RDMA_MERGE_MAX) {
+        return qemu_rdma_write_flush(f, rdma);
+    }
+
+    return 0;
+}
+
+static void qemu_rdma_cleanup(RDMAContext *rdma)
+{
+    struct rdma_cm_event *cm_event;
+    int ret, idx;
+
+    if (rdma->cm_id) {
+        if(rdma->error_state) {
+            RDMAControlHeader head = { .len = 0,
+                                       .type = RDMA_CONTROL_ERROR,
+                                       .repeat = 1,
+                                     };
+            fprintf(stderr, "Early error. Sending error.\n");
+            qemu_rdma_post_send_control(rdma, NULL, &head);
+        }
+
+        ret = rdma_disconnect(rdma->cm_id);
+        if (!ret) {
+            DPRINTF("waiting for disconnect\n");
+            ret = rdma_get_cm_event(rdma->channel, &cm_event);
+            if (!ret) {
+                rdma_ack_cm_event(cm_event);
+            }
+        }
+        DPRINTF("Disconnected.\n");
+        rdma->cm_id = 0;
+    }
+
+    g_free(rdma->remote_ram_blocks.remote_area);
+    rdma->remote_ram_blocks.remote_area = NULL;
+
+    for (idx = 0; idx < (RDMA_CONTROL_MAX_WR + 1); idx++) {
+        if (rdma->wr_data[idx].control_mr) {
+            qemu_rdma_dereg_control(rdma, idx);
+        }
+        rdma->wr_data[idx].control_mr = NULL;
+    }
+
+    if (rdma->local_ram_blocks.block) {
+        qemu_rdma_dereg_ram_blocks(&rdma->local_ram_blocks);
+
+        if (rdma->chunk_register_destination) {
+            for (idx = 0; idx < rdma->local_ram_blocks.num_blocks; idx++) {
+                RDMALocalBlock *block = &(rdma->local_ram_blocks.block[idx]);
+                g_free(block->remote_keys);
+                block->remote_keys = NULL;
+            }
+        }
+        g_free(rdma->local_ram_blocks.block);
+        rdma->local_ram_blocks.block = NULL;
+    }
+
+    if (rdma->qp) {
+        ibv_destroy_qp(rdma->qp);
+        rdma->qp = NULL;
+    }
+    if (rdma->cq) {
+        ibv_destroy_cq(rdma->cq);
+        rdma->cq = NULL;
+    }
+    if (rdma->comp_channel) {
+        ibv_destroy_comp_channel(rdma->comp_channel);
+        rdma->comp_channel = NULL;
+    }
+    if (rdma->pd) {
+        ibv_dealloc_pd(rdma->pd);
+        rdma->pd = NULL;
+    }
+    if (rdma->listen_id) {
+        rdma_destroy_id(rdma->listen_id);
+        rdma->listen_id = 0;
+    }
+    if (rdma->cm_id) {
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = 0;
+    }
+    if (rdma->channel) {
+        rdma_destroy_event_channel(rdma->channel);
+        rdma->channel = NULL;
+    }
+}
+
+static void qemu_rdma_remote_ram_blocks_init(RDMAContext *rdma)
+{
+    int remote_size = sizeof(RDMARemoteBlock) * rdma->local_ram_blocks.num_blocks;
+
+    DPRINTF("Preparing %d bytes for remote info\n", remote_size);
+
+    rdma->remote_ram_blocks.remote_area = g_malloc0(remote_size);
+    rdma->remote_ram_blocks.block = (RDMARemoteBlock *) rdma->remote_ram_blocks.remote_area;
+}
+
+static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp,
+                                 bool chunk_register_destination)
+{
+    int ret, idx;
+
+    /*
+     * Will be validated against destination's actual capabilities
+     * after the connect() completes.
+     */
+    rdma->chunk_register_destination = chunk_register_destination;
+
+    ret = qemu_rdma_resolve_host(rdma);
+    if (ret) {
+        error_setg(errp, "rdma migration: error resolving host!\n");
+        goto err_rdma_source_init;
+    }
+
+    ret = qemu_rdma_alloc_pd_cq(rdma);
+    if (ret) {
+        error_setg(errp, "rdma migration: error allocating pd and cq!\n");
+        goto err_rdma_source_init;
+    }
+
+    ret = qemu_rdma_alloc_qp(rdma);
+    if (ret) {
+        error_setg(errp, "rdma migration: error allocating qp!\n");
+        goto err_rdma_source_init;
+    }
+
+    ret = qemu_rdma_init_ram_blocks(&rdma->local_ram_blocks);
+    if (ret) {
+        error_setg(errp, "rdma migration: error initializing ram blocks!\n");
+        goto err_rdma_source_init;
+    }
+
+    for (idx = 0; idx < (RDMA_CONTROL_MAX_WR + 1); idx++) {
+        ret = qemu_rdma_reg_control(rdma, idx);
+        if (ret) {
+            error_setg(errp, "rdma migration: error registering %d control!\n",
+                                                            idx);
+            goto err_rdma_source_init;
+        }
+    }
+
+    qemu_rdma_remote_ram_blocks_init(rdma);
+    return 0;
+
+err_rdma_source_init:
+    qemu_rdma_cleanup(rdma);
+    return -1;
+}
+
+static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
+{
+    RDMAControlHeader head;
+    RDMACapabilities cap = {
+                                .version = RDMA_CONTROL_VERSION_CURRENT,
+                                .flags = 0,
+                           };
+    struct rdma_conn_param conn_param = { .initiator_depth = 2,
+                                          .retry_count = 5,
+                                          .private_data = &cap,
+                                          .private_data_len = sizeof(cap),
+                                        };
+    struct rdma_cm_event *cm_event;
+    int ret;
+    int idx = 0;
+    int x;
+
+    /*
+     * Only negotiate the capability with destination if the user
+     * on the source first requested the capability.
+     */
+    if (rdma->chunk_register_destination) {
+        DPRINTF("Server dynamic registration requested.\n");
+        cap.flags |= RDMA_CAPABILITY_CHUNK_REGISTER;
+    }
+
+    caps_to_network(&cap);
+
+    ret = rdma_connect(rdma->cm_id, &conn_param);
+    if (ret) {
+        perror("rdma_connect");
+        error_setg(errp, "rdma migration: error connecting!\n");
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = 0;
+        goto err_rdma_source_connect;
+    }
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        perror("rdma_get_cm_event after rdma_connect");
+        error_setg(errp, "rdma migration: error connecting!\n");
+        rdma_ack_cm_event(cm_event);
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = 0;
+        goto err_rdma_source_connect;
+    }
+
+    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
+        perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
+        error_setg(errp, "rdma migration: error connecting!\n");
+        rdma_ack_cm_event(cm_event);
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = 0;
+        goto err_rdma_source_connect;
+    }
+
+    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
+    network_to_caps(&cap);
+
+    /*
+     * Verify that the *requested* capabilities are supported by the destination
+     * and disable them otherwise.
+     */
+    if (!(cap.flags & RDMA_CAPABILITY_CHUNK_REGISTER) &&
+                                rdma->chunk_register_destination) {
+        fprintf(stderr, "Server cannot support dynamic registration. Will disable\n");
+        rdma->chunk_register_destination = false;
+    }
+
+    DPRINTF("Chunk registration %s\n",
+        rdma->chunk_register_destination ? "enabled" : "disabled");
+
+    rdma_ack_cm_event(cm_event);
+
+    ret = qemu_rdma_post_recv_control(rdma, idx + 1);
+    if (ret) {
+        error_setg(errp, "rdma migration: error posting first control recv!\n");
+        goto err_rdma_source_connect;
+    }
+
+    ret = qemu_rdma_post_recv_control(rdma, idx);
+    if (ret) {
+        error_setg(errp, "rdma migration: error posting second control recv!\n");
+        goto err_rdma_source_connect;
+    }
+
+    ret = qemu_rdma_exchange_get_response(rdma,
+                                &head, RDMA_CONTROL_RAM_BLOCKS, idx + 1);
+
+    if (ret < 0) {
+        error_setg(errp, "rdma migration: error receiving remote info!\n");
+        goto err_rdma_source_connect;
+    }
+
+    qemu_rdma_move_header(rdma, idx + 1, &head);
+    memcpy(rdma->remote_ram_blocks.remote_area, 
+                rdma->wr_data[idx + 1].control_curr, head.len);
+
+    ret = qemu_rdma_process_remote_ram_blocks(&rdma->local_ram_blocks, 
+                &rdma->remote_ram_blocks,
+                (head.len / sizeof(RDMARemoteBlock)));
+    if (ret) {
+        error_setg(errp, "rdma migration: error processing"
+                        " remote ram blocks!\n");
+        goto err_rdma_source_connect;
+    }
+
+    if (rdma->chunk_register_destination) {
+        for (x = 0; x < rdma->local_ram_blocks.num_blocks; x++) {
+            RDMALocalBlock *block = &(rdma->local_ram_blocks.block[x]);
+            int num_chunks = ram_chunk_count(block);
+            /* allocate memory to store remote rkeys */
+            block->remote_keys = g_malloc0(num_chunks * sizeof(uint32_t));
+        }
+    }
+    rdma->control_ready_expected = 1;
+    rdma->num_signaled_send = 0;
+    return 0;
+
+err_rdma_source_connect:
+    qemu_rdma_cleanup(rdma);
+    return -1;
+}
+
+static int qemu_rdma_dest_init(RDMAContext *rdma)
+{
+    int ret = -EINVAL, idx;
+    struct sockaddr_in sin;
+    struct rdma_cm_id *listen_id;
+    char ip[40] = "unknown";
+
+    for (idx = 0; idx < RDMA_CONTROL_MAX_WR; idx++) {
+        rdma->wr_data[idx].control_len = 0;
+        rdma->wr_data[idx].control_curr = NULL;
+    }
+
+    if (rdma->host == NULL) {
+        fprintf(stderr, "Error: RDMA host is not set!");
+        rdma->error_state = -EINVAL;
+        return -1;
+    }
+    /* create CM channel */
+    rdma->channel = rdma_create_event_channel();
+    if (!rdma->channel) {
+        fprintf(stderr, "Error: could not create rdma event channel");
+        rdma->error_state = -EINVAL;
+        return -1;
+    }
+
+    /* create CM id */
+    ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
+    if (ret) {
+        fprintf(stderr, "Error: could not create cm_id!");
+        goto err_dest_init_create_listen_id;
+    }
+
+    memset(&sin, 0, sizeof(sin));
+    sin.sin_family = AF_INET;
+    sin.sin_port = htons(rdma->port);
+
+    if (rdma->host && strcmp("", rdma->host)) {
+        struct hostent *dest_addr;
+        dest_addr = gethostbyname(rdma->host);
+        if (!dest_addr) {
+            fprintf(stderr, "Error: migration could not gethostbyname!");
+            ret = -EINVAL;
+            goto err_dest_init_bind_addr;
+        }
+        memcpy(&sin.sin_addr.s_addr, dest_addr->h_addr,
+                dest_addr->h_length);
+        inet_ntop(AF_INET, dest_addr->h_addr, ip, sizeof ip);
+    } else {
+        sin.sin_addr.s_addr = INADDR_ANY;
+    }
+
+    DPRINTF("%s => %s\n", rdma->host, ip);
+
+    ret = rdma_bind_addr(listen_id, (struct sockaddr *)&sin);
+    if (ret) {
+        fprintf(stderr, "Error: could not rdma_bind_addr!");
+        goto err_dest_init_bind_addr;
+    }
+
+    rdma->listen_id = listen_id;
+    if (listen_id->verbs) {
+        rdma->verbs = listen_id->verbs;
+    }
+    qemu_rdma_dump_id("dest_init", rdma->verbs);
+    qemu_rdma_dump_gid("dest_init", listen_id);
+    return 0;
+
+err_dest_init_bind_addr:
+    rdma_destroy_id(listen_id);
+err_dest_init_create_listen_id:
+    rdma_destroy_event_channel(rdma->channel);
+    rdma->channel = NULL;
+    rdma->error_state = ret;
+    return ret;
+
+}
+
+static int qemu_rdma_dest_prepare(RDMAContext *rdma)
+{
+    int ret;
+    int idx;
+
+    if (!rdma->verbs) {
+        fprintf(stderr, "rdma migration: no verbs context!");
+        return 0;
+    }
+
+    ret = qemu_rdma_alloc_pd_cq(rdma);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error allocating pd and cq!");
+        goto err_rdma_dest_prepare;
+    }
+
+    ret = qemu_rdma_init_ram_blocks(&rdma->local_ram_blocks);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error initializing ram blocks!");
+        goto err_rdma_dest_prepare;
+    }
+
+    qemu_rdma_remote_ram_blocks_init(rdma);
+
+    /* Extra one for the send buffer */
+    for (idx = 0; idx < (RDMA_CONTROL_MAX_WR + 1); idx++) {
+        ret = qemu_rdma_reg_control(rdma, idx);
+        if (ret) {
+            fprintf(stderr, "rdma migration: error registering %d control!",
+                        idx);
+            goto err_rdma_dest_prepare;
+        }
+    }
+
+    ret = rdma_listen(rdma->listen_id, 5);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error listening on socket!");
+        goto err_rdma_dest_prepare;
+    }
+
+    return 0;
+
+err_rdma_dest_prepare:
+    qemu_rdma_cleanup(rdma);
+    return -1;
+}
+
+static void *qemu_rdma_data_init(const char *host_port)
+{
+    RDMAContext *rdma = NULL;
+    InetSocketAddress *addr;
+
+    if (host_port) {
+        rdma = g_malloc0(sizeof(RDMAContext));
+        memset(rdma, 0, sizeof(RDMAContext));
+        rdma->current_index = -1;
+        rdma->current_chunk = -1;
+
+        addr = inet_parse(host_port, NULL);
+        if (addr != NULL) {
+            rdma->port = atoi(addr->port);
+            rdma->host = g_strdup(addr->host);
+        } else {
+            fprintf(stderr, "bad RDMA migration address '%s'", host_port);
+            g_free(rdma);
+            return NULL;
+        }
+    }
+
+    return rdma;
+}
+
+/*
+ * QEMUFile interface to the control channel.
+ * SEND messages for control only.
+ * pc.ram is handled with regular RDMA messages.
+ */
+static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
+                                int64_t pos, int size)
+{
+    QEMUFileRDMA *r = opaque;
+    QEMUFile *f = r->file;
+    RDMAContext *rdma = r->rdma;
+    size_t remaining = size;
+    uint8_t * data = (void *) buf;
+    int ret;
+
+    CHECK_ERROR_STATE();
+
+    /*
+     * Push out any writes that
+     * we're queued up for pc.ram.
+     */
+    if (qemu_rdma_write_flush(f, rdma) < 0) {
+        rdma->error_state = -EIO;
+        return rdma->error_state;
+    }
+
+    while (remaining) {
+        RDMAControlHeader head;
+
+        r->len = MIN(remaining, RDMA_SEND_INCREMENT);
+        remaining -= r->len;
+
+        head.len = r->len;
+        head.type = RDMA_CONTROL_QEMU_FILE;
+
+        ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL);
+
+        if (ret < 0) {
+            rdma->error_state = ret;
+            return ret;
+        }
+
+        data += r->len;
+    }
+
+    return size;
+}
+
+static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
+                             int size, int idx)
+{
+    size_t len = 0;
+
+    if (rdma->wr_data[idx].control_len) {
+        DPRINTF("RDMA %" PRId64 " of %d bytes already in buffer\n",
+                    rdma->wr_data[idx].control_len, size);
+
+        len = MIN(size, rdma->wr_data[idx].control_len);
+        memcpy(buf, rdma->wr_data[idx].control_curr, len);
+        rdma->wr_data[idx].control_curr += len;
+        rdma->wr_data[idx].control_len -= len;
+    }
+
+    return len;
+}
+
+/*
+ * QEMUFile interface to the control channel.
+ * RDMA links don't use bytestreams, so we have to
+ * return bytes to QEMUFile opportunistically.
+ */
+static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
+                                int64_t pos, int size)
+{
+    QEMUFileRDMA *r = opaque;
+    RDMAContext *rdma = r->rdma;
+    RDMAControlHeader head;
+    int ret = 0;
+
+    CHECK_ERROR_STATE();
+
+    /*
+     * First, we hold on to the last SEND message we
+     * were given and dish out the bytes until we run
+     * out of bytes.
+     */
+    r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
+    if (r->len) {
+        return r->len;
+    }
+
+    /*
+     * Once we run out, we block and wait for another
+     * SEND message to arrive.
+     */
+    ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
+
+    if (ret < 0) {
+        rdma->error_state = ret;
+        return ret;
+    }
+
+    /*
+     * SEND was received with new bytes, now try again.
+     */
+    return qemu_rdma_fill(r->rdma, buf, size, 0);
+}
+
+/*
+ * Block until all the outstanding chunks have been delivered by the hardware.
+ */
+static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
+{
+    int ret;
+
+    if (qemu_rdma_write_flush(f, rdma) < 0) {
+        return -EIO;
+    }
+
+    while (rdma->num_signaled_send) {
+        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
+        if (ret < 0) {
+            fprintf(stderr, "rdma migration: complete polling error!\n");
+            return -EIO;
+        }
+    }
+
+    return 0;
+}
+
+static int qemu_rdma_close(void *opaque)
+{
+    QEMUFileRDMA *r = opaque;
+    if (r->rdma) {
+        qemu_rdma_cleanup(r->rdma);
+        g_free(r->rdma);
+    }
+    g_free(r);
+    return 0;
+}
+
+static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
+                   ram_addr_t block_offset, ram_addr_t offset, size_t size)
+{
+    ram_addr_t current_addr = block_offset + offset;
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+    int ret;
+
+    CHECK_ERROR_STATE();
+
+    qemu_fflush(f);
+
+    /*
+     * Add this page to the current 'chunk'. If the chunk
+     * is full, or the page doen't belong to the current chunk,
+     * an actual RDMA write will occur and a new chunk will be formed.
+     */
+    ret = qemu_rdma_write(f, rdma, current_addr, size);
+    if (ret < 0) {
+        rdma->error_state = ret;
+        fprintf(stderr, "rdma migration: write error! %d\n", ret);
+        return ret;
+    }
+
+    /*
+     * Drain the Completion Queue if possible, but do not block,
+     * just poll.
+     *
+     * If nothing to poll, the end of the iteration will do this
+     * again to make sure we don't overflow the request queue.
+     */
+    while (1) {
+        int ret = qemu_rdma_poll(rdma);
+        if (ret == RDMA_WRID_NONE) {
+            break;
+        }
+        if (ret < 0) {
+            rdma->error_state = ret;
+            fprintf(stderr, "rdma migration: polling error! %d\n", ret);
+            return ret;
+        }
+    }
+
+    return size;
+}
+
+static int qemu_rdma_accept(RDMAContext *rdma)
+{
+    RDMAControlHeader head = { .len = rdma->local_ram_blocks.num_blocks *
+                                        sizeof(RDMARemoteBlock),
+                               .type = RDMA_CONTROL_RAM_BLOCKS,
+                               .repeat = 1,
+                             };
+    RDMACapabilities cap;
+    struct rdma_conn_param conn_param = {
+                                            .responder_resources = 2,
+                                            .private_data = &cap,
+                                            .private_data_len = sizeof(cap),
+                                         };
+    struct rdma_cm_event *cm_event;
+    struct ibv_context *verbs;
+    int ret = -EINVAL;
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        goto err_rdma_dest_wait;
+    }
+
+    if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
+        rdma_ack_cm_event(cm_event);
+        goto err_rdma_dest_wait;
+    }
+
+    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
+
+    network_to_caps(&cap);
+
+    if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
+            fprintf(stderr, "Unknown source RDMA version: %d, bailing...\n",
+                            cap.version);
+            rdma_ack_cm_event(cm_event);
+            goto err_rdma_dest_wait;
+    }
+
+    /*
+     * Response with only the capabilities this version of QEMU knows about.
+     */
+    cap.flags &= known_capabilities;
+
+    /*
+     * Enable the ones that we do know about.
+     * Add other checks here as new ones are introduced.
+     */
+    if (cap.flags & RDMA_CAPABILITY_CHUNK_REGISTER) {
+        rdma->chunk_register_destination = true;
+    }
+
+    rdma->cm_id = cm_event->id;
+    verbs = cm_event->id->verbs;
+
+    rdma_ack_cm_event(cm_event);
+
+    DPRINTF("Chunk registration %s\n",
+        rdma->chunk_register_destination ? "enabled" : "disabled");
+
+    caps_to_network(&cap);
+
+    DPRINTF("verbs context after listen: %p\n", verbs);
+
+    if (!rdma->verbs) {
+        rdma->verbs = verbs;
+        ret = qemu_rdma_dest_prepare(rdma);
+        if (ret) {
+            fprintf(stderr, "rdma migration: error preparing dest!\n");
+            goto err_rdma_dest_wait;
+        }
+    } else if (rdma->verbs != verbs) {
+            fprintf(stderr, "ibv context not matching %p, %p!\n",
+                    rdma->verbs, verbs);
+            goto err_rdma_dest_wait;
+    }
+
+    qemu_set_fd_handler2(rdma->channel->fd, NULL, NULL, NULL, NULL);
+
+    ret = qemu_rdma_alloc_qp(rdma);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error allocating qp!\n");
+        goto err_rdma_dest_wait;
+    }
+
+    ret = rdma_accept(rdma->cm_id, &conn_param);
+    if (ret) {
+        fprintf(stderr, "rdma_accept returns %d!\n", ret);
+        goto err_rdma_dest_wait;
+    }
+
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+    if (ret) {
+        fprintf(stderr, "rdma_accept get_cm_event failed %d!\n", ret);
+        goto err_rdma_dest_wait;
+    }
+
+    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
+        fprintf(stderr, "rdma_accept not event established!\n");
+        rdma_ack_cm_event(cm_event);
+        goto err_rdma_dest_wait;
+    }
+
+    rdma_ack_cm_event(cm_event);
+
+    ret = qemu_rdma_post_recv_control(rdma, 0);
+    if (ret) {
+        fprintf(stderr, "rdma migration: error posting second control recv!\n");
+        goto err_rdma_dest_wait;
+    }
+
+    if (!rdma->chunk_register_destination) {
+        ret = qemu_rdma_reg_whole_ram_blocks(rdma, &rdma->local_ram_blocks);
+        if (ret) {
+            fprintf(stderr, "rdma migration: error dest "
+                            "registering ram blocks!\n");
+            goto err_rdma_dest_wait;
+        }
+    }
+
+    qemu_rdma_copy_to_remote_ram_blocks(rdma,
+            &rdma->local_ram_blocks, &rdma->remote_ram_blocks);
+
+    ret = qemu_rdma_post_send_control(rdma,
+            (uint8_t *) rdma->remote_ram_blocks.remote_area, &head);
+
+    if (ret < 0) {
+        fprintf(stderr, "rdma migration: error sending remote info!\n");
+        goto err_rdma_dest_wait;
+    }
+
+    qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
+
+    return 0;
+
+err_rdma_dest_wait:
+    rdma->error_state = ret;
+    qemu_rdma_cleanup(rdma);
+    return ret;
+}
+                                       
+/*
+ * During each iteration of the migration, we listen for instructions
+ * by the primary VM to perform dynamic page registrations before they
+ * can perform RDMA operations.
+ *
+ * We respond with the 'rkey'.
+ *
+ * Keep doing this until the primary tells us to stop.
+ */
+static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
+                                         uint64_t flags)
+{
+    RDMAControlHeader resp = { .len = sizeof(RDMARegisterResult),
+                               .type = RDMA_CONTROL_REGISTER_RESULT,
+                               .repeat = 0,
+                             };
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+    RDMAControlHeader head;
+    RDMARegister *reg, *registers;
+    RDMACompress *comp;
+    RDMARegisterResult *reg_result;
+    static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
+    RDMALocalBlock *block;
+    void *host_addr;
+    int ret = 0;
+    int idx = 0;
+    int count = 0;
+
+    CHECK_ERROR_STATE();
+
+    do {
+        DPRINTF("Waiting for next registration %d...\n", flags);
+
+        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
+
+        if (ret < 0) {
+            break;
+        }
+
+        if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
+            fprintf(stderr, "Too many requests in this message (%d). Bailing.\n",
+                head.repeat);
+            ret = -EIO;
+            break;
+        }
+
+        switch (head.type) {
+        case RDMA_CONTROL_COMPRESS:
+            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
+
+            DPRINTF("Zapping zero chunk: %" PRId64
+                " bytes, index %d, offset %" PRId64 "\n",
+                comp->length, comp->block_idx, comp->offset);
+            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
+            block = &(rdma->local_ram_blocks.block[comp->block_idx]);
+
+            host_addr = block->local_host_addr +
+                            (comp->offset - block->offset);
+
+            ram_handle_compressed(host_addr, comp->value, comp->length);
+            break;
+        case RDMA_CONTROL_REGISTER_FINISHED:
+            DPRINTF("Current registrations complete.\n");
+            goto out;
+        case RDMA_CONTROL_REGISTER_REQUEST:
+            DPRINTF("There are %d registration requests\n", head.repeat);
+
+            resp.repeat = head.repeat;
+            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
+
+            for (count = 0; count < head.repeat; count++) {
+                reg = &registers[count];
+                reg_result = &results[count];
+
+                DPRINTF("Registration request (%d): %d"
+                    " bytes, index %d, offset %" PRId64 "\n",
+                    count, reg->len, reg->current_index, reg->offset);
+
+                block = &(rdma->local_ram_blocks.block[reg->current_index]);
+                host_addr = (block->local_host_addr +
+                            (reg->offset - block->offset));
+                if (qemu_rdma_register_and_get_keys(rdma, block,
+                            (uint8_t *)host_addr, NULL, &reg_result->rkey)) {
+                    fprintf(stderr, "cannot get rkey!\n");
+                    ret = -EINVAL;
+                    goto out;
+                }
+
+                DPRINTF("Registered rkey for this request: %x\n",
+                                reg_result->rkey);
+            }
+
+            ret = qemu_rdma_post_send_control(rdma,
+                            (uint8_t *) results, &resp);
+
+            if (ret < 0) {
+                fprintf(stderr, "Failed to send control buffer!\n");
+                goto out;
+            }
+            break;
+        case RDMA_CONTROL_REGISTER_RESULT:
+            fprintf(stderr, "Invalid RESULT message at dest.\n");
+            ret = -EIO;
+            goto out;
+        default:
+            fprintf(stderr, "Unknown control message %s\n",
+                                control_desc[head.type]);
+            ret = -EIO;
+            goto out;
+        }
+    } while (1);
+out:
+    if(ret < 0) {
+        rdma->error_state = ret;
+    }
+    return ret;
+}
+
+static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
+                                        uint64_t flags)
+{
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+
+    CHECK_ERROR_STATE();
+
+    DPRINTF("start section: %" PRIu64 "\n", flags);
+    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
+    qemu_fflush(f);
+    return 0;
+}
+
+/*
+ * Inform dest that dynamic registrations are done for now.
+ * First, flush writes, if any.
+ */
+static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
+                                       uint64_t flags)
+{
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+    RDMAControlHeader head = { .len = 0,
+                               .type = RDMA_CONTROL_REGISTER_FINISHED,
+                               .repeat = 1,
+                             };
+
+    CHECK_ERROR_STATE();
+
+    qemu_fflush(f);
+    int ret = qemu_rdma_drain_cq(f, rdma);
+
+    if (ret >= 0) {
+        DPRINTF("Sending registration finish %" PRIu64 "...\n", flags);
+
+        ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL);
+    }
+
+    if (ret < 0) {
+        rdma->error_state = ret;
+    }
+
+    return ret;
+}
+
+static int qemu_rdma_get_fd(void *opaque) {
+    QEMUFileRDMA *rfile = opaque;
+    RDMAContext *rdma = rfile->rdma;
+
+    return rdma->comp_channel->fd;
+}
+
+const QEMUFileOps rdma_read_ops = {
+    .get_buffer    = qemu_rdma_get_buffer,
+    .get_fd        = qemu_rdma_get_fd,
+    .close         = qemu_rdma_close,
+    .hook_ram_load = qemu_rdma_registration_handle,
+};
+
+const QEMUFileOps rdma_write_ops = {
+    .put_buffer           = qemu_rdma_put_buffer,
+    .close                = qemu_rdma_close,
+    .before_ram_iterate   = qemu_rdma_registration_start,
+    .after_ram_iterate    = qemu_rdma_registration_stop,
+    .save_page            = qemu_rdma_save_page,
+};
+
+static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
+{
+    QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA));
+
+    if (qemu_file_mode_is_not_valid(mode)) {
+        return NULL;
+    }
+
+    r->rdma = rdma;
+
+    if (mode[0] == 'w') {
+        r->file = qemu_fopen_ops(r, &rdma_write_ops);
+    } else {
+        r->file = qemu_fopen_ops(r, &rdma_read_ops);
+    }
+
+    return r->file;
+}
+
+static void rdma_accept_incoming_migration(void *opaque)
+{
+    RDMAContext *rdma = opaque;
+    int ret;
+    QEMUFile *f;
+
+    DPRINTF("Accepting rdma connection...\n");
+    ret = qemu_rdma_accept(rdma);
+
+    if (ret) {
+        fprintf(stderr, "RDMA Migration initialization failed!\n");
+        goto err;
+    }
+
+    DPRINTF("Accepted migration\n");
+
+    f = qemu_fopen_rdma(rdma, "rb");
+    if (f == NULL) {
+        fprintf(stderr, "could not qemu_fopen_rdma!\n");
+        goto err;
+    }
+
+    rdma->migration_started_on_destination = 1;
+    process_incoming_migration(f);
+    return;
+
+err:
+    qemu_rdma_cleanup(rdma);
+}
+
+void rdma_start_incoming_migration(const char *host_port, Error **errp)
+{
+    int ret;
+    RDMAContext *rdma;
+
+    DPRINTF("Starting RDMA-based incoming migration\n");
+    rdma = qemu_rdma_data_init(host_port);
+    if (rdma == NULL) {
+        goto err;
+    }
+
+    ret = qemu_rdma_dest_init(rdma);
+
+    if (ret) {
+        goto err;
+    }
+
+    DPRINTF("qemu_rdma_dest_init success\n");
+    ret = qemu_rdma_dest_prepare(rdma);
+
+    if (ret) {
+        goto err;
+    }
+
+    DPRINTF("qemu_rdma_dest_prepare success\n");
+
+    qemu_set_fd_handler2(rdma->channel->fd, NULL,
+                         rdma_accept_incoming_migration, NULL,
+                            (void *)(intptr_t) rdma);
+    return;
+err:
+    error_setg(errp, "error connecting using rdma!\n");
+
+    g_free(rdma);
+}
+
+void rdma_start_outgoing_migration(void *opaque,
+                            const char *host_port, Error **errp)
+{
+    MigrationState *s = opaque;
+    RDMAContext *rdma = qemu_rdma_data_init(host_port);
+    int ret;
+
+    if (rdma == NULL) {
+        goto err;
+    }
+
+    ret = qemu_rdma_source_init(rdma, NULL,
+        s->enabled_capabilities[MIGRATION_CAPABILITY_X_CHUNK_REGISTER_DESTINATION]);
+
+    if (ret) {
+        goto err;
+    }
+
+    DPRINTF("qemu_rdma_source_init success\n");
+    ret = qemu_rdma_connect(rdma, NULL);
+
+    if (ret) {
+        goto err;
+    }
+
+    DPRINTF("qemu_rdma_source_connect success\n");
+
+    s->file = qemu_fopen_rdma(rdma, "wb");
+    migrate_fd_connect(s);
+    return;
+err:
+    g_free(rdma);
+    migrate_fd_error(s);
+    error_setg(errp, "Error connecting using rdma! %d\n", ret);
+}
diff --git a/migration.c b/migration.c
index 5afd9b8..2f33914 100644
--- a/migration.c
+++ b/migration.c
@@ -78,6 +78,10 @@  void qemu_start_incoming_migration(const char *uri, Error **errp)
 
     if (strstart(uri, "tcp:", &p))
         tcp_start_incoming_migration(p, errp);
+#ifdef CONFIG_RDMA
+    else if (strstart(uri, "x-rdma:", &p))
+        rdma_start_incoming_migration(p, errp);
+#endif
 #if !defined(WIN32)
     else if (strstart(uri, "exec:", &p))
         exec_start_incoming_migration(p, errp);
@@ -406,6 +410,10 @@  void qmp_migrate(const char *uri, bool has_blk, bool blk,
 
     if (strstart(uri, "tcp:", &p)) {
         tcp_start_outgoing_migration(s, p, &local_err);
+#ifdef CONFIG_RDMA
+    } else if (strstart(uri, "x-rdma:", &p)) {
+        rdma_start_outgoing_migration(s, p, &local_err);
+#endif
 #if !defined(WIN32)
     } else if (strstart(uri, "exec:", &p)) {
         exec_start_outgoing_migration(s, p, &local_err);