diff mbox series

[v2,4/5] migration: implement bi-directional RDMA QIOChannel

Message ID 1524666934-8064-5-git-send-email-lidongchen@tencent.com
State New
Headers show
Series Enable postcopy RDMA live migration | expand

Commit Message

858585 jemmy April 25, 2018, 2:35 p.m. UTC
This patch implements a bi-directional RDMA QIOChannel. Because different
threads may access the RDMA QIOChannel concurrently, this patch uses RCU to protect it.

Signed-off-by: Lidong Chen <lidongchen@tencent.com>
---
 migration/rdma.c | 162 +++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 146 insertions(+), 16 deletions(-)

Comments

Dr. David Alan Gilbert April 26, 2018, 5:36 p.m. UTC | #1
* Lidong Chen (jemmy858585@gmail.com) wrote:
> This patch implements bi-directional RDMA QIOChannel. Because different
> threads may access RDMAQIOChannel concurrently, this patch use RCU to protect it.
> 
> Signed-off-by: Lidong Chen <lidongchen@tencent.com>

I'm a bit confused by this.

I can see it's adding RCU to protect the rdma structures against
deletion from multiple threads; that I'm OK with in principle; is that
the only locking we need? (I guess the two directions are actually
separate RDMAContext's so maybe).

But is there nothing else to make the QIOChannel bidirectional?

Also, a lot seems dependent on listen_id, can you explain how that's
being used.

Finally, I don't think you have anywhere that destroys the new mutex you
added.

Dave
P.S. Please cc Daniel Berrange on this series, since it's so much
IOChannel stuff.

> ---
>  migration/rdma.c | 162 +++++++++++++++++++++++++++++++++++++++++++++++++------
>  1 file changed, 146 insertions(+), 16 deletions(-)
> 
> diff --git a/migration/rdma.c b/migration/rdma.c
> index f5c1d02..0652224 100644
> --- a/migration/rdma.c
> +++ b/migration/rdma.c
> @@ -86,6 +86,7 @@ static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
>                                  " to abort!"); \
>                  rdma->error_reported = 1; \
>              } \
> +            rcu_read_unlock(); \
>              return rdma->error_state; \
>          } \
>      } while (0)
> @@ -405,6 +406,7 @@ struct QIOChannelRDMA {
>      RDMAContext *rdma;
>      QEMUFile *file;
>      bool blocking; /* XXX we don't actually honour this yet */
> +    QemuMutex lock;
>  };
>  
>  /*
> @@ -2635,12 +2637,29 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>  {
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>      QEMUFile *f = rioc->file;
> -    RDMAContext *rdma = rioc->rdma;
> +    RDMAContext *rdma;
>      int ret;
>      ssize_t done = 0;
>      size_t i;
>      size_t len = 0;
>  
> +    rcu_read_lock();
> +    rdma = atomic_rcu_read(&rioc->rdma);
> +
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
> +    if (rdma->listen_id) {
> +        rdma = rdma->return_path;
> +    }
> +
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
>      CHECK_ERROR_STATE();
>  
>      /*
> @@ -2650,6 +2669,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>      ret = qemu_rdma_write_flush(f, rdma);
>      if (ret < 0) {
>          rdma->error_state = ret;
> +        rcu_read_unlock();
>          return ret;
>      }
>  
> @@ -2669,6 +2689,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>  
>              if (ret < 0) {
>                  rdma->error_state = ret;
> +                rcu_read_unlock();
>                  return ret;
>              }
>  
> @@ -2677,6 +2698,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>          }
>      }
>  
> +    rcu_read_unlock();
>      return done;
>  }
>  
> @@ -2710,12 +2732,29 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>                                        Error **errp)
>  {
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> -    RDMAContext *rdma = rioc->rdma;
> +    RDMAContext *rdma;
>      RDMAControlHeader head;
>      int ret = 0;
>      ssize_t i;
>      size_t done = 0;
>  
> +    rcu_read_lock();
> +    rdma = atomic_rcu_read(&rioc->rdma);
> +
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
> +    if (!rdma->listen_id) {
> +        rdma = rdma->return_path;
> +    }
> +
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
>      CHECK_ERROR_STATE();
>  
>      for (i = 0; i < niov; i++) {
> @@ -2727,7 +2766,7 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>           * were given and dish out the bytes until we run
>           * out of bytes.
>           */
> -        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
> +        ret = qemu_rdma_fill(rdma, data, want, 0);
>          done += ret;
>          want -= ret;
>          /* Got what we needed, so go to next iovec */
> @@ -2749,25 +2788,28 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>  
>          if (ret < 0) {
>              rdma->error_state = ret;
> +            rcu_read_unlock();
>              return ret;
>          }
>  
>          /*
>           * SEND was received with new bytes, now try again.
>           */
> -        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
> +        ret = qemu_rdma_fill(rdma, data, want, 0);
>          done += ret;
>          want -= ret;
>  
>          /* Still didn't get enough, so lets just return */
>          if (want) {
>              if (done == 0) {
> +                rcu_read_unlock();
>                  return QIO_CHANNEL_ERR_BLOCK;
>              } else {
>                  break;
>              }
>          }
>      }
> +    rcu_read_unlock();
>      return done;
>  }
>  
> @@ -2823,6 +2865,16 @@ qio_channel_rdma_source_prepare(GSource *source,
>      GIOCondition cond = 0;
>      *timeout = -1;
>  
> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
> +       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
> +        rdma = rdma->return_path;
> +    }
> +
> +    if (!rdma) {
> +        error_report("RDMAContext is NULL when prepare Gsource");
> +        return FALSE;
> +    }
> +
>      if (rdma->wr_data[0].control_len) {
>          cond |= G_IO_IN;
>      }
> @@ -2838,6 +2890,16 @@ qio_channel_rdma_source_check(GSource *source)
>      RDMAContext *rdma = rsource->rioc->rdma;
>      GIOCondition cond = 0;
>  
> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
> +       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
> +        rdma = rdma->return_path;
> +    }
> +
> +    if (!rdma) {
> +        error_report("RDMAContext is NULL when check Gsource");
> +        return FALSE;
> +    }
> +
>      if (rdma->wr_data[0].control_len) {
>          cond |= G_IO_IN;
>      }
> @@ -2856,6 +2918,16 @@ qio_channel_rdma_source_dispatch(GSource *source,
>      RDMAContext *rdma = rsource->rioc->rdma;
>      GIOCondition cond = 0;
>  
> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
> +       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
> +        rdma = rdma->return_path;
> +    }
> +
> +    if (!rdma) {
> +        error_report("RDMAContext is NULL when dispatch Gsource");
> +        return FALSE;
> +    }
> +
>      if (rdma->wr_data[0].control_len) {
>          cond |= G_IO_IN;
>      }
> @@ -2905,15 +2977,29 @@ static int qio_channel_rdma_close(QIOChannel *ioc,
>                                    Error **errp)
>  {
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> +    RDMAContext *rdma;
>      trace_qemu_rdma_close();
> -    if (rioc->rdma) {
> -        if (!rioc->rdma->error_state) {
> -            rioc->rdma->error_state = qemu_file_get_error(rioc->file);
> -        }
> -        qemu_rdma_cleanup(rioc->rdma);
> -        g_free(rioc->rdma);
> -        rioc->rdma = NULL;
> +
> +    qemu_mutex_lock(&rioc->lock);
> +    rdma = rioc->rdma;
> +    if (!rdma) {
> +        qemu_mutex_unlock(&rioc->lock);
> +        return 0;
> +    }
> +    atomic_rcu_set(&rioc->rdma, NULL);
> +    qemu_mutex_unlock(&rioc->lock);
> +
> +    if (!rdma->error_state) {
> +        rdma->error_state = qemu_file_get_error(rioc->file);
> +    }
> +    qemu_rdma_cleanup(rdma);
> +
> +    if (rdma->return_path) {
> +        qemu_rdma_cleanup(rdma->return_path);
> +        g_free(rdma->return_path);
>      }
> +
> +    g_free(rdma);
>      return 0;
>  }
>  
> @@ -2956,12 +3042,21 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
>                                    size_t size, uint64_t *bytes_sent)
>  {
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> -    RDMAContext *rdma = rioc->rdma;
> +    RDMAContext *rdma;
>      int ret;
>  
> +    rcu_read_lock();
> +    rdma = atomic_rcu_read(&rioc->rdma);
> +
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
>      CHECK_ERROR_STATE();
>  
>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
> +        rcu_read_unlock();
>          return RAM_SAVE_CONTROL_NOT_SUPP;
>      }
>  
> @@ -3046,9 +3141,11 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
>          }
>      }
>  
> +    rcu_read_unlock();
>      return RAM_SAVE_CONTROL_DELAYED;
>  err:
>      rdma->error_state = ret;
> +    rcu_read_unlock();
>      return ret;
>  }
>  
> @@ -3224,8 +3321,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
>      RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
>                                   .repeat = 1 };
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> -    RDMAContext *rdma = rioc->rdma;
> -    RDMALocalBlocks *local = &rdma->local_ram_blocks;
> +    RDMAContext *rdma;
> +    RDMALocalBlocks *local;
>      RDMAControlHeader head;
>      RDMARegister *reg, *registers;
>      RDMACompress *comp;
> @@ -3238,8 +3335,17 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
>      int count = 0;
>      int i = 0;
>  
> +    rcu_read_lock();
> +    rdma = atomic_rcu_read(&rioc->rdma);
> +
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
>      CHECK_ERROR_STATE();
>  
> +    local = &rdma->local_ram_blocks;
>      do {
>          trace_qemu_rdma_registration_handle_wait();
>  
> @@ -3469,6 +3575,7 @@ out:
>      if (ret < 0) {
>          rdma->error_state = ret;
>      }
> +    rcu_read_unlock();
>      return ret;
>  }
>  
> @@ -3525,11 +3632,19 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
>                                          uint64_t flags, void *data)
>  {
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> -    RDMAContext *rdma = rioc->rdma;
> +    RDMAContext *rdma;
> +
> +    rcu_read_lock();
> +    rdma = atomic_rcu_read(&rioc->rdma);
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
>  
>      CHECK_ERROR_STATE();
>  
>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
> +        rcu_read_unlock();
>          return 0;
>      }
>  
> @@ -3537,6 +3652,7 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
>      qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
>      qemu_fflush(f);
>  
> +    rcu_read_unlock();
>      return 0;
>  }
>  
> @@ -3549,13 +3665,21 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>  {
>      Error *local_err = NULL, **errp = &local_err;
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
> -    RDMAContext *rdma = rioc->rdma;
> +    RDMAContext *rdma;
>      RDMAControlHeader head = { .len = 0, .repeat = 1 };
>      int ret = 0;
>  
> +    rcu_read_lock();
> +    rdma = atomic_rcu_read(&rioc->rdma);
> +    if (!rdma) {
> +        rcu_read_unlock();
> +        return -EIO;
> +    }
> +
>      CHECK_ERROR_STATE();
>  
>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
> +        rcu_read_unlock();
>          return 0;
>      }
>  
> @@ -3587,6 +3711,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>                      qemu_rdma_reg_whole_ram_blocks : NULL);
>          if (ret < 0) {
>              ERROR(errp, "receiving remote info!");
> +            rcu_read_unlock();
>              return ret;
>          }
>  
> @@ -3610,6 +3735,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>                          "not identical on both the source and destination.",
>                          local->nb_blocks, nb_dest_blocks);
>              rdma->error_state = -EINVAL;
> +            rcu_read_unlock();
>              return -EINVAL;
>          }
>  
> @@ -3626,6 +3752,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>                              local->block[i].length,
>                              rdma->dest_blocks[i].length);
>                  rdma->error_state = -EINVAL;
> +                rcu_read_unlock();
>                  return -EINVAL;
>              }
>              local->block[i].remote_host_addr =
> @@ -3643,9 +3770,11 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>          goto err;
>      }
>  
> +    rcu_read_unlock();
>      return 0;
>  err:
>      rdma->error_state = ret;
> +    rcu_read_unlock();
>      return ret;
>  }
>  
> @@ -3707,6 +3836,7 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
>  
>      rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
>      rioc->rdma = rdma;
> +    qemu_mutex_init(&rioc->lock);
>  
>      if (mode[0] == 'w') {
>          rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
> -- 
> 1.8.3.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
858585 jemmy April 27, 2018, 7:56 a.m. UTC | #2
On Fri, Apr 27, 2018 at 1:36 AM, Dr. David Alan Gilbert
<dgilbert@redhat.com> wrote:
> * Lidong Chen (jemmy858585@gmail.com) wrote:
>> This patch implements bi-directional RDMA QIOChannel. Because different
>> threads may access RDMAQIOChannel concurrently, this patch use RCU to protect it.
>>
>> Signed-off-by: Lidong Chen <lidongchen@tencent.com>
>
> I'm a bit confused by this.
>
> I can see it's adding RCU to protect the rdma structures against
> deletion from multiple threads; that I'm OK with in principal; is that
> the only locking we need? (I guess the two directions are actually
> separate RDMAContext's so maybe).

The qio_channel_rdma_close may be invoked by the migration thread and the
return path thread
concurrently, so I use a mutex to protect it.

If one thread invoke qio_channel_rdma_writev, another thread invokes
qio_channel_rdma_readv,
two threads will use separate RDMAContext, so it does not need a lock.

If two threads invoke qio_channel_rdma_writev concurrently, a lock will
be needed to protect it.
But I find the source qemu migration thread only invokes
qio_channel_rdma_writev, and the return path
thread only invokes qio_channel_rdma_readv.

The destination qemu only invokes qio_channel_rdma_readv in the main
thread before postcopy, or in the
listen thread after postcopy.

The destination qemu has already protected it by using
qemu_mutex_lock(&mis->rp_mutex) when writing data to
the source qemu.

But should we use qemu_mutex_lock to protect qio_channel_rdma_writev
and qio_channel_rdma_readv,
to guard against some future change invoking qio_channel_rdma_writev or
qio_channel_rdma_readv concurrently?

>
> But is there nothing else to make the QIOChannel bidirectional?
>
> Also, a lot seems dependent on listen_id, can you explain how that's
> being used.

The destination qemu is the server side, so listen_id is not zero. The
source qemu is the client side,
so listen_id is zero.
I use listen_id to determine whether qemu is the destination or the source.

For the destination qemu, if it writes data to the source, it needs to use
the return_path rdma, like this:
    if (rdma->listen_id) {
        rdma = rdma->return_path;
    }

For the source qemu, if it reads data from the destination, it also needs
to use the return_path rdma.
    if (!rdma->listen_id) {
        rdma = rdma->return_path;
    }

>
> Finally, I don't think you have anywhere that destroys the new mutex you
> added.
I will fix this next version.

>
> Dave
> P.S. Please cc Daniel Berrange on this series, since it's so much
> IOChannel stuff.
>
>> ---
>>  migration/rdma.c | 162 +++++++++++++++++++++++++++++++++++++++++++++++++------
>>  1 file changed, 146 insertions(+), 16 deletions(-)
>>
>> diff --git a/migration/rdma.c b/migration/rdma.c
>> index f5c1d02..0652224 100644
>> --- a/migration/rdma.c
>> +++ b/migration/rdma.c
>> @@ -86,6 +86,7 @@ static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
>>                                  " to abort!"); \
>>                  rdma->error_reported = 1; \
>>              } \
>> +            rcu_read_unlock(); \
>>              return rdma->error_state; \
>>          } \
>>      } while (0)
>> @@ -405,6 +406,7 @@ struct QIOChannelRDMA {
>>      RDMAContext *rdma;
>>      QEMUFile *file;
>>      bool blocking; /* XXX we don't actually honour this yet */
>> +    QemuMutex lock;
>>  };
>>
>>  /*
>> @@ -2635,12 +2637,29 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>>      QEMUFile *f = rioc->file;
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>>      int ret;
>>      ssize_t done = 0;
>>      size_t i;
>>      size_t len = 0;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>> +    if (rdma->listen_id) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>>      /*
>> @@ -2650,6 +2669,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>>      ret = qemu_rdma_write_flush(f, rdma);
>>      if (ret < 0) {
>>          rdma->error_state = ret;
>> +        rcu_read_unlock();
>>          return ret;
>>      }
>>
>> @@ -2669,6 +2689,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>>
>>              if (ret < 0) {
>>                  rdma->error_state = ret;
>> +                rcu_read_unlock();
>>                  return ret;
>>              }
>>
>> @@ -2677,6 +2698,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
>>          }
>>      }
>>
>> +    rcu_read_unlock();
>>      return done;
>>  }
>>
>> @@ -2710,12 +2732,29 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>>                                        Error **errp)
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>>      RDMAControlHeader head;
>>      int ret = 0;
>>      ssize_t i;
>>      size_t done = 0;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>> +    if (!rdma->listen_id) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>>      for (i = 0; i < niov; i++) {
>> @@ -2727,7 +2766,7 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>>           * were given and dish out the bytes until we run
>>           * out of bytes.
>>           */
>> -        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
>> +        ret = qemu_rdma_fill(rdma, data, want, 0);
>>          done += ret;
>>          want -= ret;
>>          /* Got what we needed, so go to next iovec */
>> @@ -2749,25 +2788,28 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
>>
>>          if (ret < 0) {
>>              rdma->error_state = ret;
>> +            rcu_read_unlock();
>>              return ret;
>>          }
>>
>>          /*
>>           * SEND was received with new bytes, now try again.
>>           */
>> -        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
>> +        ret = qemu_rdma_fill(rdma, data, want, 0);
>>          done += ret;
>>          want -= ret;
>>
>>          /* Still didn't get enough, so lets just return */
>>          if (want) {
>>              if (done == 0) {
>> +                rcu_read_unlock();
>>                  return QIO_CHANNEL_ERR_BLOCK;
>>              } else {
>>                  break;
>>              }
>>          }
>>      }
>> +    rcu_read_unlock();
>>      return done;
>>  }
>>
>> @@ -2823,6 +2865,16 @@ qio_channel_rdma_source_prepare(GSource *source,
>>      GIOCondition cond = 0;
>>      *timeout = -1;
>>
>> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
>> +       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        error_report("RDMAContext is NULL when prepare Gsource");
>> +        return FALSE;
>> +    }
>> +
>>      if (rdma->wr_data[0].control_len) {
>>          cond |= G_IO_IN;
>>      }
>> @@ -2838,6 +2890,16 @@ qio_channel_rdma_source_check(GSource *source)
>>      RDMAContext *rdma = rsource->rioc->rdma;
>>      GIOCondition cond = 0;
>>
>> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
>> +       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        error_report("RDMAContext is NULL when check Gsource");
>> +        return FALSE;
>> +    }
>> +
>>      if (rdma->wr_data[0].control_len) {
>>          cond |= G_IO_IN;
>>      }
>> @@ -2856,6 +2918,16 @@ qio_channel_rdma_source_dispatch(GSource *source,
>>      RDMAContext *rdma = rsource->rioc->rdma;
>>      GIOCondition cond = 0;
>>
>> +    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
>> +       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
>> +        rdma = rdma->return_path;
>> +    }
>> +
>> +    if (!rdma) {
>> +        error_report("RDMAContext is NULL when dispatch Gsource");
>> +        return FALSE;
>> +    }
>> +
>>      if (rdma->wr_data[0].control_len) {
>>          cond |= G_IO_IN;
>>      }
>> @@ -2905,15 +2977,29 @@ static int qio_channel_rdma_close(QIOChannel *ioc,
>>                                    Error **errp)
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>> +    RDMAContext *rdma;
>>      trace_qemu_rdma_close();
>> -    if (rioc->rdma) {
>> -        if (!rioc->rdma->error_state) {
>> -            rioc->rdma->error_state = qemu_file_get_error(rioc->file);
>> -        }
>> -        qemu_rdma_cleanup(rioc->rdma);
>> -        g_free(rioc->rdma);
>> -        rioc->rdma = NULL;
>> +
>> +    qemu_mutex_lock(&rioc->lock);
>> +    rdma = rioc->rdma;
>> +    if (!rdma) {
>> +        qemu_mutex_unlock(&rioc->lock);
>> +        return 0;
>> +    }
>> +    atomic_rcu_set(&rioc->rdma, NULL);
>> +    qemu_mutex_unlock(&rioc->lock);
>> +
>> +    if (!rdma->error_state) {
>> +        rdma->error_state = qemu_file_get_error(rioc->file);
>> +    }
>> +    qemu_rdma_cleanup(rdma);
>> +
>> +    if (rdma->return_path) {
>> +        qemu_rdma_cleanup(rdma->return_path);
>> +        g_free(rdma->return_path);
>>      }
>> +
>> +    g_free(rdma);
>>      return 0;
>>  }
>>
>> @@ -2956,12 +3042,21 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
>>                                    size_t size, uint64_t *bytes_sent)
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>>      int ret;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
>> +        rcu_read_unlock();
>>          return RAM_SAVE_CONTROL_NOT_SUPP;
>>      }
>>
>> @@ -3046,9 +3141,11 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
>>          }
>>      }
>>
>> +    rcu_read_unlock();
>>      return RAM_SAVE_CONTROL_DELAYED;
>>  err:
>>      rdma->error_state = ret;
>> +    rcu_read_unlock();
>>      return ret;
>>  }
>>
>> @@ -3224,8 +3321,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
>>      RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
>>                                   .repeat = 1 };
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> -    RDMAContext *rdma = rioc->rdma;
>> -    RDMALocalBlocks *local = &rdma->local_ram_blocks;
>> +    RDMAContext *rdma;
>> +    RDMALocalBlocks *local;
>>      RDMAControlHeader head;
>>      RDMARegister *reg, *registers;
>>      RDMACompress *comp;
>> @@ -3238,8 +3335,17 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
>>      int count = 0;
>>      int i = 0;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>> +    local = &rdma->local_ram_blocks;
>>      do {
>>          trace_qemu_rdma_registration_handle_wait();
>>
>> @@ -3469,6 +3575,7 @@ out:
>>      if (ret < 0) {
>>          rdma->error_state = ret;
>>      }
>> +    rcu_read_unlock();
>>      return ret;
>>  }
>>
>> @@ -3525,11 +3632,19 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
>>                                          uint64_t flags, void *data)
>>  {
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>> +
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>>
>>      CHECK_ERROR_STATE();
>>
>>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
>> +        rcu_read_unlock();
>>          return 0;
>>      }
>>
>> @@ -3537,6 +3652,7 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
>>      qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
>>      qemu_fflush(f);
>>
>> +    rcu_read_unlock();
>>      return 0;
>>  }
>>
>> @@ -3549,13 +3665,21 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>>  {
>>      Error *local_err = NULL, **errp = &local_err;
>>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
>> -    RDMAContext *rdma = rioc->rdma;
>> +    RDMAContext *rdma;
>>      RDMAControlHeader head = { .len = 0, .repeat = 1 };
>>      int ret = 0;
>>
>> +    rcu_read_lock();
>> +    rdma = atomic_rcu_read(&rioc->rdma);
>> +    if (!rdma) {
>> +        rcu_read_unlock();
>> +        return -EIO;
>> +    }
>> +
>>      CHECK_ERROR_STATE();
>>
>>      if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
>> +        rcu_read_unlock();
>>          return 0;
>>      }
>>
>> @@ -3587,6 +3711,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>>                      qemu_rdma_reg_whole_ram_blocks : NULL);
>>          if (ret < 0) {
>>              ERROR(errp, "receiving remote info!");
>> +            rcu_read_unlock();
>>              return ret;
>>          }
>>
>> @@ -3610,6 +3735,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>>                          "not identical on both the source and destination.",
>>                          local->nb_blocks, nb_dest_blocks);
>>              rdma->error_state = -EINVAL;
>> +            rcu_read_unlock();
>>              return -EINVAL;
>>          }
>>
>> @@ -3626,6 +3752,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>>                              local->block[i].length,
>>                              rdma->dest_blocks[i].length);
>>                  rdma->error_state = -EINVAL;
>> +                rcu_read_unlock();
>>                  return -EINVAL;
>>              }
>>              local->block[i].remote_host_addr =
>> @@ -3643,9 +3770,11 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>>          goto err;
>>      }
>>
>> +    rcu_read_unlock();
>>      return 0;
>>  err:
>>      rdma->error_state = ret;
>> +    rcu_read_unlock();
>>      return ret;
>>  }
>>
>> @@ -3707,6 +3836,7 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
>>
>>      rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
>>      rioc->rdma = rdma;
>> +    qemu_mutex_init(&rioc->lock);
>>
>>      if (mode[0] == 'w') {
>>          rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
>> --
>> 1.8.3.1
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Daniel P. Berrangé April 27, 2018, 9:16 a.m. UTC | #3
On Fri, Apr 27, 2018 at 03:56:38PM +0800, 858585 jemmy wrote:
> On Fri, Apr 27, 2018 at 1:36 AM, Dr. David Alan Gilbert
> <dgilbert@redhat.com> wrote:
> > * Lidong Chen (jemmy858585@gmail.com) wrote:
> >> This patch implements bi-directional RDMA QIOChannel. Because different
> >> threads may access RDMAQIOChannel concurrently, this patch use RCU to protect it.
> >>
> >> Signed-off-by: Lidong Chen <lidongchen@tencent.com>
> >
> > I'm a bit confused by this.
> >
> > I can see it's adding RCU to protect the rdma structures against
> > deletion from multiple threads; that I'm OK with in principal; is that
> > the only locking we need? (I guess the two directions are actually
> > separate RDMAContext's so maybe).
> 
> The qio_channel_rdma_close maybe invoked by migration thread and
> return path thread
> concurrently, so I use a mutex to protect it.

Hmm, that is not good - concurrent threads calling close must not be
allowed to happen even with non-RDMA I/O channels.

For example, with the QIOChannelSocket, one thread can call close
which sets the fd = -1, another thread can race with this and either
end up calling close again on the same FD or calling close on -1.
Either way the second thread will get an error from close() when
it should have skipped the close() and returned success. Perhaps
migration gets lucky and this doesn't result in it being marked
as failed, but it is still not good.

So only one thread should be calling close().

> If one thread invoke qio_channel_rdma_writev, another thread invokes
> qio_channel_rdma_readv,
> two threads will use separate RDMAContext, so it does not need a lock.
> 
> If two threads invoke qio_channel_rdma_writev concurrently, it will
> need a lock to protect.
> but I find source qemu migration thread only invoke
> qio_channel_rdma_writev, the return path
> thread only invokes qio_channel_rdma_readv.

QIOChannel impls are only intended to cope with a single thread doing
I/O in each direction. If you have two threads needing to read, or
two threads needing to write, the layer above should provide locking
to ensure correct ordering of I/O operations.

> The destination qemu only invoked qio_channel_rdma_readv by main
> thread before postcopy and or
> listen thread after postcopy.
> 
> The destination qemu have already protected it by using
> qemu_mutex_lock(&mis->rp_mutex) when writing data to
> source qemu.
> 
> But should we use qemu_mutex_lock to protect qio_channel_rdma_writev
> and qio_channel_rdma_readv?
> to avoid some change in future invoke qio_channel_rdma_writev or
> qio_channel_rdma_readv concurrently?

> 
> >
> > But is there nothing else to make the QIOChannel bidirectional?
> >
> > Also, a lot seems dependent on listen_id, can you explain how that's
> > being used.
> 
> The destination qemu is server side, so listen_id is not zero. the
> source qemu is client side,
> the listen_id is zero.
> I use listen_id to determine whether qemu is destination or source.
> 
> for the destination qemu, if write data to source, it need use the
> return_path rdma, like this:
>     if (rdma->listen_id) {
>         rdma = rdma->return_path;
>     }
> 
> for the source qemu, if read data from destination, it also need use
> the return_path rdma.
>     if (!rdma->listen_id) {
>         rdma = rdma->return_path;
>     }

This feels unnecessarily complex to me. Why not just change the QIOChannelRDMA
struct, so that it has 2 RDMA context pointers eg

struct QIOChannelRDMA {
    QIOChannel parent;
    RDMAContext *rdmain;
    RDMAContext *rdmaout;
    QEMUFile *file;
    bool blocking; /* XXX we don't actually honour this yet */
};



Regards,
Daniel
858585 jemmy April 28, 2018, 4:16 a.m. UTC | #4
On Fri, Apr 27, 2018 at 5:16 PM, Daniel P. Berrangé <berrange@redhat.com> wrote:
> On Fri, Apr 27, 2018 at 03:56:38PM +0800, 858585 jemmy wrote:
>> On Fri, Apr 27, 2018 at 1:36 AM, Dr. David Alan Gilbert
>> <dgilbert@redhat.com> wrote:
>> > * Lidong Chen (jemmy858585@gmail.com) wrote:
>> >> This patch implements bi-directional RDMA QIOChannel. Because different
>> >> threads may access RDMAQIOChannel concurrently, this patch use RCU to protect it.
>> >>
>> >> Signed-off-by: Lidong Chen <lidongchen@tencent.com>
>> >
>> > I'm a bit confused by this.
>> >
>> > I can see it's adding RCU to protect the rdma structures against
>> > deletion from multiple threads; that I'm OK with in principal; is that
>> > the only locking we need? (I guess the two directions are actually
>> > separate RDMAContext's so maybe).
>>
>> The qio_channel_rdma_close maybe invoked by migration thread and
>> return path thread
>> concurrently, so I use a mutex to protect it.
>
> Hmm, that is not good - concurrent threads calling close must not be
> allowed to happen even with non-RDMA I/O chanels.
>
> For example, with the QIOChannelSocket, one thread can call close
> which sets the fd = -1, another thread can race with this and either
> end up calling close again on the same FD or calling close on -1.
> Either way the second thread will get an error from close() when
> it should have skipped the close() and returned success. Perhaps
> migration gets lucky and this doesn't result in it being marked
> as failed, but it is still not good.
>
> So only one thread should be calling close().

For live migration, the source qemu invokes qemu_fclose in different
threads, including the main thread,
the migration thread, and the return path thread.
destination qemu invokes qemu_fclose in main thread, listen thread and
COLO incoming thread.

so I prefer to add a lock for QEMUFile struct, like this:

int qemu_fclose(QEMUFile *f)
{
    int ret;
    qemu_fflush(f);
    ret = qemu_file_get_error(f);

    if (f->ops->close) {
+        qemu_mutex_lock(&f->lock);
        int ret2 = f->ops->close(f->opaque);
+       qemu_mutex_unlock(&f->lock);
        if (ret >= 0) {
            ret = ret2;
        }
    }
    /* If any error was spotted before closing, we should report it
     * instead of the close() return value.
     */
    if (f->last_error) {
        ret = f->last_error;
    }
    g_free(f);
    trace_qemu_file_fclose();
    return ret;
}

Any suggestion?

>
>> If one thread invoke qio_channel_rdma_writev, another thread invokes
>> qio_channel_rdma_readv,
>> two threads will use separate RDMAContext, so it does not need a lock.
>>
>> If two threads invoke qio_channel_rdma_writev concurrently, it will
>> need a lock to protect.
>> but I find source qemu migration thread only invoke
>> qio_channel_rdma_writev, the return path
>> thread only invokes qio_channel_rdma_readv.
>
> QIOChannel impls are only intended to cope with a single thread doing
> I/O in each direction. If you have two threads needing to read, or
> two threads needing to write, the layer above should provide locking
> to ensure correct ordering  of I/O oprations.

Yes, so I think RCU is enough; we do not need any more locks.

>
>> The destination qemu only invoked qio_channel_rdma_readv by main
>> thread before postcopy and or
>> listen thread after postcopy.
>>
>> The destination qemu have already protected it by using
>> qemu_mutex_lock(&mis->rp_mutex) when writing data to
>> source qemu.
>>
>> But should we use qemu_mutex_lock to protect qio_channel_rdma_writev
>> and qio_channel_rdma_readv?
>> to avoid some change in future invoke qio_channel_rdma_writev or
>> qio_channel_rdma_readv concurrently?
>
>>
>> >
>> > But is there nothing else to make the QIOChannel bidirectional?
>> >
>> > Also, a lot seems dependent on listen_id, can you explain how that's
>> > being used.
>>
>> The destination qemu is server side, so listen_id is not zero. the
>> source qemu is client side,
>> the listen_id is zero.
>> I use listen_id to determine whether qemu is destination or source.
>>
>> for the destination qemu, if write data to source, it need use the
>> return_path rdma, like this:
>>     if (rdma->listen_id) {
>>         rdma = rdma->return_path;
>>     }
>>
>> for the source qemu, if read data from destination, it also need use
>> the return_path rdma.
>>     if (!rdma->listen_id) {
>>         rdma = rdma->return_path;
>>     }
>
> This feels uncessarily complex to me. Why not just change QIOCHannelRMDA
> struct, so that it has 2 RDMA context pointers eg
>
> struct QIOChannelRDMA {
>     QIOChannel parent;
>     RDMAContext *rdmain;
>     RDMAContext *rdmaout;
>     QEMUFile *file;
>     bool blocking; /* XXX we don't actually honour this yet */
> };

The reason is that some functions, like qemu_rdma_accept and
rdma_start_outgoing_migration, take an RDMAContext as a parameter.
That makes it easier to implement the return path.

so I should add some comment to the code.

>
>
>
> Regards,
> Daniel
> --
> |: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
> |: https://libvirt.org         -o-            https://fstop138.berrange.com :|
> |: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|
Daniel P. Berrangé April 30, 2018, 9:18 a.m. UTC | #5
On Sat, Apr 28, 2018 at 12:16:36PM +0800, 858585 jemmy wrote:
> On Fri, Apr 27, 2018 at 5:16 PM, Daniel P. Berrangé <berrange@redhat.com> wrote:
> so I prefer to add a lock for QEMUFile struct, like this:
> 
> int qemu_fclose(QEMUFile *f)
> {
>     int ret;
>     qemu_fflush(f);
>     ret = qemu_file_get_error(f);
> 
>     if (f->ops->close) {
> +        qemu_mutex_lock(&f->lock);
>         int ret2 = f->ops->close(f->opaque);
> +       qemu_mutex_unlock(&f->lock);
>         if (ret >= 0) {
>             ret = ret2;
>         }
>     }

That looks ok


> >> > But is there nothing else to make the QIOChannel bidirectional?
> >> >
> >> > Also, a lot seems dependent on listen_id, can you explain how that's
> >> > being used.
> >>
> >> The destination qemu is server side, so listen_id is not zero. the
> >> source qemu is client side,
> >> the listen_id is zero.
> >> I use listen_id to determine whether qemu is destination or source.
> >>
> >> for the destination qemu, if write data to source, it need use the
> >> return_path rdma, like this:
> >>     if (rdma->listen_id) {
> >>         rdma = rdma->return_path;
> >>     }
> >>
> >> for the source qemu, if read data from destination, it also need use
> >> the return_path rdma.
> >>     if (!rdma->listen_id) {
> >>         rdma = rdma->return_path;
> >>     }
> >
> > This feels uncessarily complex to me. Why not just change QIOCHannelRMDA
> > struct, so that it has 2 RDMA context pointers eg
> >
> > struct QIOChannelRDMA {
> >     QIOChannel parent;
> >     RDMAContext *rdmain;
> >     RDMAContext *rdmaout;
> >     QEMUFile *file;
> >     bool blocking; /* XXX we don't actually honour this yet */
> > };
> 
> The reason is the parameter of some function is RDMAContext.
> like qemu_rdma_accept, rdma_start_outgoing_migration.
> It's easier to implement return path.

I don't see that as an issue - qemu_fopen_rdma() function should be able
to setup a pair of RDMAContext pointers when it creates the QIOChannelRDMA
struct. It just has to request the return path from the single RDMAContext
it gets passed, and then set that on the QIOChannelRDMA struct.

Regards,
Daniel
diff mbox series

Patch

diff --git a/migration/rdma.c b/migration/rdma.c
index f5c1d02..0652224 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -86,6 +86,7 @@  static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
                                 " to abort!"); \
                 rdma->error_reported = 1; \
             } \
+            rcu_read_unlock(); \
             return rdma->error_state; \
         } \
     } while (0)
@@ -405,6 +406,7 @@  struct QIOChannelRDMA {
     RDMAContext *rdma;
     QEMUFile *file;
     bool blocking; /* XXX we don't actually honour this yet */
+    QemuMutex lock;
 };
 
 /*
@@ -2635,12 +2637,29 @@  static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
 {
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
     QEMUFile *f = rioc->file;
-    RDMAContext *rdma = rioc->rdma;
+    RDMAContext *rdma;
     int ret;
     ssize_t done = 0;
     size_t i;
     size_t len = 0;
 
+    rcu_read_lock();
+    rdma = atomic_rcu_read(&rioc->rdma);
+
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
+    if (rdma->listen_id) {
+        rdma = rdma->return_path;
+    }
+
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
     CHECK_ERROR_STATE();
 
     /*
@@ -2650,6 +2669,7 @@  static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
     ret = qemu_rdma_write_flush(f, rdma);
     if (ret < 0) {
         rdma->error_state = ret;
+        rcu_read_unlock();
         return ret;
     }
 
@@ -2669,6 +2689,7 @@  static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
 
             if (ret < 0) {
                 rdma->error_state = ret;
+                rcu_read_unlock();
                 return ret;
             }
 
@@ -2677,6 +2698,7 @@  static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
         }
     }
 
+    rcu_read_unlock();
     return done;
 }
 
@@ -2710,12 +2732,29 @@  static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
                                       Error **errp)
 {
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
-    RDMAContext *rdma = rioc->rdma;
+    RDMAContext *rdma;
     RDMAControlHeader head;
     int ret = 0;
     ssize_t i;
     size_t done = 0;
 
+    rcu_read_lock();
+    rdma = atomic_rcu_read(&rioc->rdma);
+
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
+    if (!rdma->listen_id) {
+        rdma = rdma->return_path;
+    }
+
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
     CHECK_ERROR_STATE();
 
     for (i = 0; i < niov; i++) {
@@ -2727,7 +2766,7 @@  static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
          * were given and dish out the bytes until we run
          * out of bytes.
          */
-        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+        ret = qemu_rdma_fill(rdma, data, want, 0);
         done += ret;
         want -= ret;
         /* Got what we needed, so go to next iovec */
@@ -2749,25 +2788,28 @@  static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
 
         if (ret < 0) {
             rdma->error_state = ret;
+            rcu_read_unlock();
             return ret;
         }
 
         /*
          * SEND was received with new bytes, now try again.
          */
-        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+        ret = qemu_rdma_fill(rdma, data, want, 0);
         done += ret;
         want -= ret;
 
         /* Still didn't get enough, so lets just return */
         if (want) {
             if (done == 0) {
+                rcu_read_unlock();
                 return QIO_CHANNEL_ERR_BLOCK;
             } else {
                 break;
             }
         }
     }
+    rcu_read_unlock();
     return done;
 }
 
@@ -2823,6 +2865,16 @@  qio_channel_rdma_source_prepare(GSource *source,
     GIOCondition cond = 0;
     *timeout = -1;
 
+    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
+       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
+        rdma = rdma->return_path;
+    }
+
+    if (!rdma) {
+        error_report("RDMAContext is NULL when prepare Gsource");
+        return FALSE;
+    }
+
     if (rdma->wr_data[0].control_len) {
         cond |= G_IO_IN;
     }
@@ -2838,6 +2890,16 @@  qio_channel_rdma_source_check(GSource *source)
     RDMAContext *rdma = rsource->rioc->rdma;
     GIOCondition cond = 0;
 
+    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
+       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
+        rdma = rdma->return_path;
+    }
+
+    if (!rdma) {
+        error_report("RDMAContext is NULL when check Gsource");
+        return FALSE;
+    }
+
     if (rdma->wr_data[0].control_len) {
         cond |= G_IO_IN;
     }
@@ -2856,6 +2918,16 @@  qio_channel_rdma_source_dispatch(GSource *source,
     RDMAContext *rdma = rsource->rioc->rdma;
     GIOCondition cond = 0;
 
+    if ((rdma->listen_id && rsource->condition == G_IO_OUT) ||
+       (!rdma->listen_id && rsource->condition == G_IO_IN)) {
+        rdma = rdma->return_path;
+    }
+
+    if (!rdma) {
+        error_report("RDMAContext is NULL when dispatch Gsource");
+        return FALSE;
+    }
+
     if (rdma->wr_data[0].control_len) {
         cond |= G_IO_IN;
     }
@@ -2905,15 +2977,29 @@  static int qio_channel_rdma_close(QIOChannel *ioc,
                                   Error **errp)
 {
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    RDMAContext *rdma;
     trace_qemu_rdma_close();
-    if (rioc->rdma) {
-        if (!rioc->rdma->error_state) {
-            rioc->rdma->error_state = qemu_file_get_error(rioc->file);
-        }
-        qemu_rdma_cleanup(rioc->rdma);
-        g_free(rioc->rdma);
-        rioc->rdma = NULL;
+
+    qemu_mutex_lock(&rioc->lock);
+    rdma = rioc->rdma;
+    if (!rdma) {
+        qemu_mutex_unlock(&rioc->lock);
+        return 0;
+    }
+    atomic_rcu_set(&rioc->rdma, NULL);
+    qemu_mutex_unlock(&rioc->lock);
+
+    if (!rdma->error_state) {
+        rdma->error_state = qemu_file_get_error(rioc->file);
+    }
+    qemu_rdma_cleanup(rdma);
+
+    if (rdma->return_path) {
+        qemu_rdma_cleanup(rdma->return_path);
+        g_free(rdma->return_path);
     }
+
+    g_free(rdma);
     return 0;
 }
 
@@ -2956,12 +3042,21 @@  static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
                                   size_t size, uint64_t *bytes_sent)
 {
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
-    RDMAContext *rdma = rioc->rdma;
+    RDMAContext *rdma;
     int ret;
 
+    rcu_read_lock();
+    rdma = atomic_rcu_read(&rioc->rdma);
+
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
     CHECK_ERROR_STATE();
 
     if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+        rcu_read_unlock();
         return RAM_SAVE_CONTROL_NOT_SUPP;
     }
 
@@ -3046,9 +3141,11 @@  static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
         }
     }
 
+    rcu_read_unlock();
     return RAM_SAVE_CONTROL_DELAYED;
 err:
     rdma->error_state = ret;
+    rcu_read_unlock();
     return ret;
 }
 
@@ -3224,8 +3321,8 @@  static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
                                  .repeat = 1 };
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
-    RDMAContext *rdma = rioc->rdma;
-    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+    RDMAContext *rdma;
+    RDMALocalBlocks *local;
     RDMAControlHeader head;
     RDMARegister *reg, *registers;
     RDMACompress *comp;
@@ -3238,8 +3335,17 @@  static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
     int count = 0;
     int i = 0;
 
+    rcu_read_lock();
+    rdma = atomic_rcu_read(&rioc->rdma);
+
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
     CHECK_ERROR_STATE();
 
+    local = &rdma->local_ram_blocks;
     do {
         trace_qemu_rdma_registration_handle_wait();
 
@@ -3469,6 +3575,7 @@  out:
     if (ret < 0) {
         rdma->error_state = ret;
     }
+    rcu_read_unlock();
     return ret;
 }
 
@@ -3525,11 +3632,19 @@  static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
                                         uint64_t flags, void *data)
 {
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
-    RDMAContext *rdma = rioc->rdma;
+    RDMAContext *rdma;
+
+    rcu_read_lock();
+    rdma = atomic_rcu_read(&rioc->rdma);
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
 
     CHECK_ERROR_STATE();
 
     if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+        rcu_read_unlock();
         return 0;
     }
 
@@ -3537,6 +3652,7 @@  static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
     qemu_fflush(f);
 
+    rcu_read_unlock();
     return 0;
 }
 
@@ -3549,13 +3665,21 @@  static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
 {
     Error *local_err = NULL, **errp = &local_err;
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
-    RDMAContext *rdma = rioc->rdma;
+    RDMAContext *rdma;
     RDMAControlHeader head = { .len = 0, .repeat = 1 };
     int ret = 0;
 
+    rcu_read_lock();
+    rdma = atomic_rcu_read(&rioc->rdma);
+    if (!rdma) {
+        rcu_read_unlock();
+        return -EIO;
+    }
+
     CHECK_ERROR_STATE();
 
     if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+        rcu_read_unlock();
         return 0;
     }
 
@@ -3587,6 +3711,7 @@  static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
                     qemu_rdma_reg_whole_ram_blocks : NULL);
         if (ret < 0) {
             ERROR(errp, "receiving remote info!");
+            rcu_read_unlock();
             return ret;
         }
 
@@ -3610,6 +3735,7 @@  static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
                         "not identical on both the source and destination.",
                         local->nb_blocks, nb_dest_blocks);
             rdma->error_state = -EINVAL;
+            rcu_read_unlock();
             return -EINVAL;
         }
 
@@ -3626,6 +3752,7 @@  static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
                             local->block[i].length,
                             rdma->dest_blocks[i].length);
                 rdma->error_state = -EINVAL;
+                rcu_read_unlock();
                 return -EINVAL;
             }
             local->block[i].remote_host_addr =
@@ -3643,9 +3770,11 @@  static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
         goto err;
     }
 
+    rcu_read_unlock();
     return 0;
 err:
     rdma->error_state = ret;
+    rcu_read_unlock();
     return ret;
 }
 
@@ -3707,6 +3836,7 @@  static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
 
     rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
     rioc->rdma = rdma;
+    qemu_mutex_init(&rioc->lock);
 
     if (mode[0] == 'w') {
         rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));