Patchwork Convert ram_list to RCU DQ V4,2

Submitter Mike D. Day
Date Sept. 9, 2013, 3:13 p.m.
Message ID <1378739597-24681-1-git-send-email-ncmike@ncultra.org>
Permalink /patch/273583/
State New

Comments

Mike D. Day - Sept. 9, 2013, 3:13 p.m.
Changes from V4.1:

* Correct memory barriers for ram_list globals.

Changes from V4:

* rebased on https://github.com/bonzini/qemu/tree/rcu
  commit 965f3b2aac93bca6df50c86fb17a06b3c856fa30

Changes from V3:

* now passes virt-test -t qemu
* uses call_rcu instead of call_rcu1
* completely removed the ram_list mutex and its locking functions
* cleaned up some comments

Changes from V2:

* added rcu reclaim function to free ram blocks
* reduced the number of comments
* made rcu locking policy consistent for different caller scenarios
* rcu updates to ram_block now are protected only by the iothread mutex

Changes from V1:

* Omitted locks and rcu critical sections within some functions that
  read or write the ram_list but are called in a protected context
  (the caller holds the iothread lock, the ram_list mutex, or an rcu
  critical section).

Allow "unlocked" reads of the ram_list by converting it to an
RCU-enabled DQ (doubly-linked queue). Most readers of the list no
longer need to hold the list mutex.

The ram_list now uses a QLIST instead of a QTAILQ. The difference is
minimal.

This patch has been built and make-checked for the x86_64, ppc64,
s390x, and arm targets. It has been tested under KVM with virt-test
(-t qemu) and passes 15/15 migration tests.

To apply this patch, base it on Paolo Bonzini's rcu tree and
also apply the RCU DQ patch (below).

https://github.com/bonzini/qemu/tree/rcu
http://article.gmane.org/gmane.comp.emulators.qemu/230159/

Signed-off-by: Mike Day <ncmike@ncultra.org>
---
 arch_init.c            | 107 ++++++++++++++++++-------------
 exec.c                 | 167 +++++++++++++++++++++++++++----------------------
 include/exec/cpu-all.h |  13 ++--
 3 files changed, 161 insertions(+), 126 deletions(-)
Paolo Bonzini - Sept. 9, 2013, 4:21 p.m.
On 09/09/2013 17:13, Mike Day wrote:

> @@ -601,12 +608,22 @@ static void reset_ram_globals(void)
>      last_seen_block = NULL;
>      last_sent_block = NULL;
>      last_offset = 0;
> -    last_version = ram_list.version;
>      ram_bulk_stage = true;
> +    smp_wmb();
> +    last_version = ram_list.version;

This barrier is still not needed.

Can you take a look at my rcu branch?

I pushed there the conversion of mru_block to RCU (based on 4.1), and
clarified a bit more the locking conventions.  Basically we have:

- functions called under iothread lock (e.g. qemu_ram_alloc)

- functions called under RCU or iothread lock (e.g. qemu_get_ram_ptr)

- functions called under RCU or iothread lock, or while holding a
reference to the MemoryRegion that is looked up again (e.g.
qemu_ram_addr_from_host).

The difference between the second and third group is simply that the
second will not call rcu_read_lock()/rcu_read_unlock(), the third will.

Does it make sense?  Should we simplify it by always calling
rcu_read_lock()/rcu_read_unlock(), which removes the second group?

Paolo

>  }
>  
>  #define MAX_WAIT 50 /* ms, half buffered_file limit */
>  
> +
> +/* ram_save_* functions each  implement a long-running RCU critical
> + * section. When rcu-reclaims in the code start to become numerous
> + * it will be necessary to reduce the granularity of these critical
> + * sections.
> + *
> + * (ram_save_setup, ram_save_iterate, and ram_save_complete.)
> + */
> +
>  static int ram_save_setup(QEMUFile *f, void *opaque)
>  {
>      RAMBlock *block;
> @@ -632,7 +649,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
>      }
>  
>      qemu_mutex_lock_iothread();
> -    qemu_mutex_lock_ramlist();
> +    rcu_read_lock();
>      bytes_transferred = 0;
>      reset_ram_globals();
>  
> @@ -642,13 +659,13 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
>  
>      qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
>  
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          qemu_put_byte(f, strlen(block->idstr));
>          qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
>          qemu_put_be64(f, block->length);
>      }
>  
> -    qemu_mutex_unlock_ramlist();
> +    rcu_read_unlock();
>  
>      ram_control_before_iterate(f, RAM_CONTROL_SETUP);
>      ram_control_after_iterate(f, RAM_CONTROL_SETUP);
> @@ -658,6 +675,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
>      return 0;
>  }
>  
> +
>  static int ram_save_iterate(QEMUFile *f, void *opaque)
>  {
>      int ret;
> @@ -665,16 +683,14 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>      int64_t t0;
>      int total_sent = 0;
>  
> -    qemu_mutex_lock_ramlist();
> -
> -    if (ram_list.version != last_version) {
> +    if (atomic_rcu_read(&ram_list.version) != last_version) {
>          reset_ram_globals();
>      }
>  
>      ram_control_before_iterate(f, RAM_CONTROL_ROUND);
> -
>      t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
>      i = 0;
> +    rcu_read_lock();
>      while ((ret = qemu_file_rate_limit(f)) == 0) {
>          int bytes_sent;
>  
> @@ -701,9 +717,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          }
>          i++;
>      }
> -
> -    qemu_mutex_unlock_ramlist();
> -
> +    rcu_read_unlock();
>      /*
>       * Must occur before EOS (or any QEMUFile operation)
>       * because of RDMA protocol.
> @@ -718,19 +732,19 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
>      total_sent += 8;
>      bytes_transferred += total_sent;
> -
>      return total_sent;
>  }
>  
>  static int ram_save_complete(QEMUFile *f, void *opaque)
>  {
> -    qemu_mutex_lock_ramlist();
> +
>      migration_bitmap_sync();
>  
>      ram_control_before_iterate(f, RAM_CONTROL_FINISH);
>  
>      /* try transferring iterative blocks of memory */
>  
> +    rcu_read_lock();
>      /* flush all remaining blocks regardless of rate limiting */
>      while (true) {
>          int bytes_sent;
> @@ -742,11 +756,10 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          }
>          bytes_transferred += bytes_sent;
>      }
> -
> +    rcu_read_unlock();
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();
>  
> -    qemu_mutex_unlock_ramlist();
>      qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
>  
>      return 0;
> @@ -808,6 +821,9 @@ static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
>      return rc;
>  }
>  
> +/* Must be called from within a rcu critical section.
> + * Returns a pointer from within the RCU-protected ram_list.
> + */
>  static inline void *host_from_stream_offset(QEMUFile *f,
>                                              ram_addr_t offset,
>                                              int flags)
> @@ -829,9 +845,10 @@ static inline void *host_from_stream_offset(QEMUFile *f,
>      qemu_get_buffer(f, (uint8_t *)id, len);
>      id[len] = 0;
>  
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> -        if (!strncmp(id, block->idstr, sizeof(id)))
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
> +        if (!strncmp(id, block->idstr, sizeof(id))) {
>              return memory_region_get_ram_ptr(block->mr) + offset;
> +        }
>      }
>  
>      fprintf(stderr, "Can't find block %s!\n", id);
> @@ -868,7 +885,12 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>      if (version_id < 4 || version_id > 4) {
>          return -EINVAL;
>      }
> -
> +    /* this implements a long-running RCU critical section.
> +     * When rcu reclaims in the code start to become numerous
> +     * it will be necessary to reduce the granularity of this critical
> +     * section.
> +     */
> +    rcu_read_lock();
>      do {
>          addr = qemu_get_be64(f);
>  
> @@ -890,21 +912,19 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>                      qemu_get_buffer(f, (uint8_t *)id, len);
>                      id[len] = 0;
>                      length = qemu_get_be64(f);
> -
> -                    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +                    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>                          if (!strncmp(id, block->idstr, sizeof(id))) {
>                              if (block->length != length) {
>                                  fprintf(stderr,
>                                          "Length mismatch: %s: " RAM_ADDR_FMT
>                                          " in != " RAM_ADDR_FMT "\n", id, length,
>                                          block->length);
> -                                ret =  -EINVAL;
> +                                ret = -EINVAL;
>                                  goto done;
>                              }
>                              break;
>                          }
>                      }
> -
>                      if (!block) {
>                          fprintf(stderr, "Unknown ramblock \"%s\", cannot "
>                                  "accept migration\n", id);
> @@ -917,30 +937,30 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>              }
>          }
>  
> +        /* Call host_from_stream_offset while holding an rcu read lock.
> +         * It returns a pointer from within the rcu-protected ram_list.
> +         */
>          if (flags & RAM_SAVE_FLAG_COMPRESS) {
> -            void *host;
>              uint8_t ch;
> -
> -            host = host_from_stream_offset(f, addr, flags);
> +            void *host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
> -                return -EINVAL;
> +                ret = -EINVAL;
> +                goto done;
>              }
> -
>              ch = qemu_get_byte(f);
>              ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
>          } else if (flags & RAM_SAVE_FLAG_PAGE) {
> -            void *host;
> -
> -            host = host_from_stream_offset(f, addr, flags);
> +            void *host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
> -                return -EINVAL;
> +                ret = -EINVAL;
> +                goto done;
>              }
> -
>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>          } else if (flags & RAM_SAVE_FLAG_XBZRLE) {
>              void *host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
> -                return -EINVAL;
> +                ret = -EINVAL;
> +                goto done;
>              }
>  
>              if (load_xbzrle(f, addr, host) < 0) {
> @@ -958,6 +978,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>      } while (!(flags & RAM_SAVE_FLAG_EOS));
>  
>  done:
> +    rcu_read_unlock();
>      DPRINTF("Completed load of VM with exit code %d seq iteration "
>              "%" PRIu64 "\n", ret, seq_iter);
>      return ret;
> @@ -1126,8 +1147,8 @@ void do_acpitable_option(const QemuOpts *opts)
>  
>      acpi_table_add(opts, &err);
>      if (err) {
> -        error_report("Wrong acpi table provided: %s",
> -                     error_get_pretty(err));
> +        fprintf(stderr, "Wrong acpi table provided: %s\n",
> +                error_get_pretty(err));
>          error_free(err);
>          exit(1);
>      }
> diff --git a/exec.c b/exec.c
> index c0780ca..fbf3b2b 100644
> --- a/exec.c
> +++ b/exec.c
> @@ -46,7 +46,7 @@
>  #endif
>  #include "exec/cpu-all.h"
>  #include "qemu/tls.h"
> -
> +#include "qemu/rcu_queue.h"
>  #include "exec/cputlb.h"
>  #include "translate-all.h"
>  
> @@ -57,7 +57,10 @@
>  #if !defined(CONFIG_USER_ONLY)
>  static int in_migration;
>  
> -RAMList ram_list = { .blocks = QTAILQ_HEAD_INITIALIZER(ram_list.blocks) };
> +/* ram_list is read under rcu_read_lock()/rcu_read_unlock().  Writes
> + * are protected by a lock, currently the iothread lock.
> + */
> +RAMList ram_list = { .blocks = QLIST_HEAD_INITIALIZER(ram_list.blocks) };
>  
>  static MemoryRegion *system_memory;
>  static MemoryRegion *system_io;
> @@ -329,7 +332,6 @@ void cpu_exec_init_all(void)
>  {
>      tls_alloc_current_cpu_var();
>  #if !defined(CONFIG_USER_ONLY)
> -    qemu_mutex_init(&ram_list.mutex);
>      memory_map_init();
>      io_mem_init();
>  #endif
> @@ -885,7 +887,7 @@ static void mem_add(MemoryListener *listener, MemoryRegionSection *section)
>          now = remain;
>          if (int128_lt(remain.size, page_size)) {
>              register_subpage(d, &now);
> -        } else if (remain.offset_within_address_space & ~TARGET_PAGE_MASK) {
> +        } else if (remain.offset_within_region & ~TARGET_PAGE_MASK) {
>              now.size = page_size;
>              register_subpage(d, &now);
>          } else {
> @@ -901,16 +903,6 @@ void qemu_flush_coalesced_mmio_buffer(void)
>          kvm_flush_coalesced_mmio_buffer();
>  }
>  
> -void qemu_mutex_lock_ramlist(void)
> -{
> -    qemu_mutex_lock(&ram_list.mutex);
> -}
> -
> -void qemu_mutex_unlock_ramlist(void)
> -{
> -    qemu_mutex_unlock(&ram_list.mutex);
> -}
> -
>  #if defined(__linux__) && !defined(TARGET_S390X)
>  
>  #include <sys/vfs.h>
> @@ -1021,17 +1013,24 @@ static ram_addr_t find_ram_offset(ram_addr_t size)
>      RAMBlock *block, *next_block;
>      ram_addr_t offset = RAM_ADDR_MAX, mingap = RAM_ADDR_MAX;
>  
> +    /* ram_list must be protected by a mutex (for writes), or
> +     * an rcu critical section (for reads). Currently this code
> +     * is called with the iothread lock held. If that changes,
> +     * make sure to protect ram_list with an rcu critical section.
> +    */
> +
>      assert(size != 0); /* it would hand out same offset multiple times */
>  
> -    if (QTAILQ_EMPTY(&ram_list.blocks))
> +    if (QLIST_EMPTY_RCU(&ram_list.blocks)) {
>          return 0;
> +    }
>  
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          ram_addr_t end, next = RAM_ADDR_MAX;
>  
>          end = block->offset + block->length;
>  
> -        QTAILQ_FOREACH(next_block, &ram_list.blocks, next) {
> +        QLIST_FOREACH_RCU(next_block, &ram_list.blocks, next) {
>              if (next_block->offset >= end) {
>                  next = MIN(next, next_block->offset);
>              }
> @@ -1056,9 +1055,11 @@ ram_addr_t last_ram_offset(void)
>      RAMBlock *block;
>      ram_addr_t last = 0;
>  
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next)
> +    rcu_read_lock();
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          last = MAX(last, block->offset + block->length);
> -
> +    }
> +    rcu_read_unlock();
>      return last;
>  }
>  
> @@ -1083,7 +1084,12 @@ void qemu_ram_set_idstr(ram_addr_t addr, const char *name, DeviceState *dev)
>      RAMBlock *new_block, *block;
>  
>      new_block = NULL;
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +
> +    /* Assumes that the iothread lock is taken ... if that changes,
> +     * add an rcu_read_lock()/unlock pair when traversing the
> +     * ram list
> +     */
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          if (block->offset == addr) {
>              new_block = block;
>              break;
> @@ -1102,15 +1108,13 @@ void qemu_ram_set_idstr(ram_addr_t addr, const char *name, DeviceState *dev)
>      pstrcat(new_block->idstr, sizeof(new_block->idstr), name);
>  
>      /* This assumes the iothread lock is taken here too.  */
> -    qemu_mutex_lock_ramlist();
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          if (block != new_block && !strcmp(block->idstr, new_block->idstr)) {
>              fprintf(stderr, "RAMBlock \"%s\" already registered, abort!\n",
>                      new_block->idstr);
>              abort();
>          }
>      }
> -    qemu_mutex_unlock_ramlist();
>  }
>  
>  static int memory_try_enable_merging(void *addr, size_t len)
> @@ -1123,16 +1127,16 @@ static int memory_try_enable_merging(void *addr, size_t len)
>      return qemu_madvise(addr, len, QEMU_MADV_MERGEABLE);
>  }
>  
> +/* Called with the iothread lock being held */
> +
>  ram_addr_t qemu_ram_alloc_from_ptr(ram_addr_t size, void *host,
>                                     MemoryRegion *mr)
>  {
> -    RAMBlock *block, *new_block;
> +    RAMBlock *block, *new_block, *last_block = 0;
>  
>      size = TARGET_PAGE_ALIGN(size);
>      new_block = g_malloc0(sizeof(*new_block));
>  
> -    /* This assumes the iothread lock is taken here too.  */
> -    qemu_mutex_lock_ramlist();
>      new_block->mr = mr;
>      new_block->offset = find_ram_offset(size);
>      if (host) {
> @@ -1165,21 +1169,24 @@ ram_addr_t qemu_ram_alloc_from_ptr(ram_addr_t size, void *host,
>      new_block->length = size;
>  
>      /* Keep the list sorted from biggest to smallest block.  */
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
> +        last_block = block;
>          if (block->length < new_block->length) {
>              break;
>          }
>      }
>      if (block) {
> -        QTAILQ_INSERT_BEFORE(block, new_block, next);
> +        QLIST_INSERT_BEFORE_RCU(block, new_block, next);
>      } else {
> -        QTAILQ_INSERT_TAIL(&ram_list.blocks, new_block, next);
> +        if (last_block) {
> +            QLIST_INSERT_AFTER_RCU(last_block, new_block, next);
> +        } else { /* list is empty */
> +            QLIST_INSERT_HEAD_RCU(&ram_list.blocks, new_block, next);
> +        }
>      }
>      ram_list.mru_block = NULL;
> -
> +    smp_wmb();
>      ram_list.version++;
> -    qemu_mutex_unlock_ramlist();
> -
>      ram_list.phys_dirty = g_realloc(ram_list.phys_dirty,
>                                         last_ram_offset() >> TARGET_PAGE_BITS);
>      memset(ram_list.phys_dirty + (new_block->offset >> TARGET_PAGE_BITS),
> @@ -1204,30 +1211,29 @@ void qemu_ram_free_from_ptr(ram_addr_t addr)
>  {
>      RAMBlock *block;
>  
> -    /* This assumes the iothread lock is taken here too.  */
> -    qemu_mutex_lock_ramlist();
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    /* This assumes the iothread lock is taken here too. */
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          if (addr == block->offset) {
> -            QTAILQ_REMOVE(&ram_list.blocks, block, next);
> +            QLIST_REMOVE_RCU(block, next);
>              ram_list.mru_block = NULL;
> +            smp_wmb();
>              ram_list.version++;
> -            g_free(block);
> +            call_rcu(block, (void (*)(struct RAMBlock *))g_free, rcu);
>              break;
>          }
>      }
> -    qemu_mutex_unlock_ramlist();
>  }
>  
> +/* called with the iothread lock held */
>  void qemu_ram_free(ram_addr_t addr)
>  {
>      RAMBlock *block;
>  
> -    /* This assumes the iothread lock is taken here too.  */
> -    qemu_mutex_lock_ramlist();
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          if (addr == block->offset) {
> -            QTAILQ_REMOVE(&ram_list.blocks, block, next);
> +            QLIST_REMOVE_RCU(block, next);
>              ram_list.mru_block = NULL;
> +            smp_wmb();
>              ram_list.version++;
>              if (block->flags & RAM_PREALLOC_MASK) {
>                  ;
> @@ -1249,12 +1255,10 @@ void qemu_ram_free(ram_addr_t addr)
>                      qemu_anon_ram_free(block->host, block->length);
>                  }
>              }
> -            g_free(block);
> +            call_rcu(block, (void (*)(struct RAMBlock *))g_free, rcu);
>              break;
>          }
>      }
> -    qemu_mutex_unlock_ramlist();
> -
>  }
>  
>  #ifndef _WIN32
> @@ -1265,7 +1269,7 @@ void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
>      int flags;
>      void *area, *vaddr;
>  
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          offset = addr - block->offset;
>          if (offset < block->length) {
>              vaddr = block->host + offset;
> @@ -1313,7 +1317,6 @@ void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
>                  memory_try_enable_merging(vaddr, length);
>                  qemu_ram_setup_dump(vaddr, length);
>              }
> -            return;
>          }
>      }
>  }
> @@ -1323,23 +1326,21 @@ static RAMBlock *qemu_get_ram_block(ram_addr_t addr)
>  {
>      RAMBlock *block;
>  
> -    /* The list is protected by the iothread lock here.  */
> -    block = ram_list.mru_block;
> +    /* This assumes the iothread lock is taken here too. */
> +
> +    block = atomic_rcu_read(&ram_list.mru_block);
>      if (block && addr - block->offset < block->length) {
> -        goto found;
> +        return block;
>      }
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          if (addr - block->offset < block->length) {
> -            goto found;
> +            atomic_rcu_set(&ram_list.mru_block, block);
> +            return block;
>          }
>      }
>  
>      fprintf(stderr, "Bad ram offset %" PRIx64 "\n", (uint64_t)addr);
>      abort();
> -
> -found:
> -    ram_list.mru_block = block;
> -    return block;
>  }
>  
>  /* Return a host pointer to ram allocated with qemu_ram_alloc.
> @@ -1378,8 +1379,8 @@ static void *qemu_safe_ram_ptr(ram_addr_t addr)
>  {
>      RAMBlock *block;
>  
> -    /* The list is protected by the iothread lock here.  */
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    /* This assumes the iothread lock is taken here too. */
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          if (addr - block->offset < block->length) {
>              if (xen_enabled()) {
>                  /* We need to check if the requested address is in the RAM
> @@ -1399,14 +1400,19 @@ static void *qemu_safe_ram_ptr(ram_addr_t addr)
>  
>      fprintf(stderr, "Bad ram offset %" PRIx64 "\n", (uint64_t)addr);
>      abort();
> -
>      return NULL;
>  }
>  
>  /* Return a host pointer to guest's ram. Similar to qemu_get_ram_ptr
> - * but takes a size argument */
> + * but takes a size argument
> + *
> + * The returned pointer is not protected by RCU so the caller
> + * must have other means of protecting the pointer, such as a
> + * reference.
> + * */
>  static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
>  {
> +    void *ptr = NULL;
>      if (*size == 0) {
>          return NULL;
>      }
> @@ -1414,12 +1420,14 @@ static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
>          return xen_map_cache(addr, *size, 1);
>      } else {
>          RAMBlock *block;
> -
> -        QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +        rcu_read_lock();
> +        QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>              if (addr - block->offset < block->length) {
>                  if (addr - block->offset + *size > block->length)
>                      *size = block->length - addr + block->offset;
> -                return block->host + (addr - block->offset);
> +                ptr = block->host + (addr - block->offset);
> +                rcu_read_unlock();
> +                return ptr;
>              }
>          }
>  
> @@ -1429,37 +1437,43 @@ static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
>  }
>  
>  /* Some of the softmmu routines need to translate from a host pointer
> -   (typically a TLB entry) back to a ram offset.  */
> + * (typically a TLB entry) back to a ram offset.
> + *
> + * Note that the returned MemoryRegion is not RCU-protected.
> + */
>  MemoryRegion *qemu_ram_addr_from_host(void *ptr, ram_addr_t *ram_addr)
>  {
>      RAMBlock *block;
>      uint8_t *host = ptr;
> +    MemoryRegion *mr = NULL;
>  
>      if (xen_enabled()) {
>          *ram_addr = xen_ram_addr_from_mapcache(ptr);
>          return qemu_get_ram_block(*ram_addr)->mr;
>      }
> -
> -    block = ram_list.mru_block;
> +    rcu_read_lock();
> +    block = atomic_rcu_read(&ram_list.mru_block);
>      if (block && block->host && host - block->host < block->length) {
> -        goto found;
> +        *ram_addr = block->offset + (host - block->host);
> +        mr = block->mr;
> +        goto unlock_out;
>      }
>  
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          /* This case append when the block is not mapped. */
>          if (block->host == NULL) {
>              continue;
>          }
>          if (host - block->host < block->length) {
> -            goto found;
> +            *ram_addr = block->offset + (host - block->host);
> +            mr = block->mr;
> +            goto unlock_out;
>          }
>      }
>  
> -    return NULL;
> -
> -found:
> -    *ram_addr = block->offset + (host - block->host);
> -    return block->mr;
> +unlock_out:
> +    rcu_read_unlock();
> +    return mr;
>  }
>  
>  static void notdirty_mem_write(void *opaque, hwaddr ram_addr,
> @@ -2709,9 +2723,10 @@ bool cpu_physical_memory_is_io(hwaddr phys_addr)
>  void qemu_ram_foreach_block(RAMBlockIterFunc func, void *opaque)
>  {
>      RAMBlock *block;
> -
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    rcu_read_lock();
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          func(block->host, block->offset, block->length, opaque);
>      }
> +    rcu_read_unlock();
>  }
>  #endif
> diff --git a/include/exec/cpu-all.h b/include/exec/cpu-all.h
> index e088089..e68b86b 100644
> --- a/include/exec/cpu-all.h
> +++ b/include/exec/cpu-all.h
> @@ -24,6 +24,7 @@
>  #include "qemu/thread.h"
>  #include "qom/cpu.h"
>  #include "qemu/tls.h"
> +#include "qemu/rcu.h"
>  
>  /* some important defines:
>   *
> @@ -448,28 +449,26 @@ extern ram_addr_t ram_size;
>  #define RAM_PREALLOC_MASK   (1 << 0)
>  
>  typedef struct RAMBlock {
> +    struct rcu_head rcu;
>      struct MemoryRegion *mr;
>      uint8_t *host;
>      ram_addr_t offset;
>      ram_addr_t length;
>      uint32_t flags;
>      char idstr[256];
> -    /* Reads can take either the iothread or the ramlist lock.
> -     * Writes must take both locks.
> -     */
> -    QTAILQ_ENTRY(RAMBlock) next;
> +    /* Writes must hold the iothread lock */
> +    QLIST_ENTRY(RAMBlock) next;
>  #if defined(__linux__) && !defined(TARGET_S390X)
>      int fd;
>  #endif
>  } RAMBlock;
>  
>  typedef struct RAMList {
> -    QemuMutex mutex;
>      /* Protected by the iothread lock.  */
>      uint8_t *phys_dirty;
>      RAMBlock *mru_block;
> -    /* Protected by the ramlist lock.  */
> -    QTAILQ_HEAD(, RAMBlock) blocks;
> +    /* RCU-enabled, writes protected by the iothread lock. */
> +    QLIST_HEAD(, RAMBlock) blocks;
>      uint32_t version;
>  } RAMList;
>  extern RAMList ram_list;
>
Mike D. Day - Sept. 9, 2013, 6:18 p.m.
On Mon, Sep 9, 2013 at 12:21 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
>
> > @@ -601,12 +608,22 @@ static void reset_ram_globals(void)
> >      last_seen_block = NULL;
> >      last_sent_block = NULL;
> >      last_offset = 0;
> > -    last_version = ram_list.version;
> >      ram_bulk_stage = true;
> > +    smp_wmb();
> > +    last_version = ram_list.version;
>
> This barrier is still not needed.

Yes, I agree, because of the calling context. It is brittle, though:
reset_ram_globals is a static function that may be called differently in
the future (or by new code at a different call site). The need for a
barrier could change, and that would be opaque to the internals of the
reset function. The globals are also writable elsewhere in the file. This
needs more organization, but I agree that should be a discrete patch.

> Can you take a look at my rcu branch?

The comments clarify to me why writing to last_mru does not need a write
barrier, thank you.

> I pushed there the conversion of mru_block to RCU (based on 4.1), and
> clarified a bit more the locking conventions.  Basically we have:
>
> - functions called under iothread lock (e.g. qemu_ram_alloc)
>
> - functions called under RCU or iothread lock (e.g. qemu_get_ram_ptr)
>
> - functions called under RCU or iothread lock, or while holding a
> reference to the MemoryRegion that is looked up again (e.g.
> qemu_ram_addr_from_host).
>
> The difference between the second and third group is simply that the
> second will not call rcu_read_lock()/rcu_read_unlock(), the third will.
>
> Does it make sense?  Should we simplify it by always calling
> rcu_read_lock()/rcu_read_unlock(), which removes the second group?

I think the benefits of simplification and future reliability are greater
than the performance cost of the rcu_read_lock. The latter should be
something we verify, though.

Thank you!

Mike

Patch

diff --git a/arch_init.c b/arch_init.c
index 0471cd5..e24d29e 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -49,6 +49,7 @@ 
 #include "trace.h"
 #include "exec/cpu-all.h"
 #include "hw/acpi/acpi.h"
+#include "qemu/rcu_queue.h"
 
 #ifdef DEBUG_ARCH_INIT
 #define DPRINTF(fmt, ...) \
@@ -399,7 +400,8 @@  static void migration_bitmap_sync(void)
     trace_migration_bitmap_sync_start();
     address_space_sync_dirty_bitmap(&address_space_memory);
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    rcu_read_lock();
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         for (addr = 0; addr < block->length; addr += TARGET_PAGE_SIZE) {
             if (memory_region_test_and_clear_dirty(block->mr,
                                                    addr, TARGET_PAGE_SIZE,
@@ -408,6 +410,8 @@  static void migration_bitmap_sync(void)
             }
         }
     }
+    rcu_read_unlock();
+
     trace_migration_bitmap_sync_end(migration_dirty_pages
                                     - num_dirty_pages_init);
     num_dirty_pages_period += migration_dirty_pages - num_dirty_pages_init;
@@ -447,6 +451,8 @@  static void migration_bitmap_sync(void)
  *
  * Returns:  The number of bytes written.
  *           0 means no dirty pages
+ *
+ * assumes that the caller has rcu-locked the ram_list
  */
 
 static int ram_save_block(QEMUFile *f, bool last_stage)
@@ -458,8 +464,9 @@  static int ram_save_block(QEMUFile *f, bool last_stage)
     MemoryRegion *mr;
     ram_addr_t current_addr;
 
+
     if (!block)
-        block = QTAILQ_FIRST(&ram_list.blocks);
+        block = QLIST_FIRST_RCU(&ram_list.blocks);
 
     while (true) {
         mr = block->mr;
@@ -470,9 +477,9 @@  static int ram_save_block(QEMUFile *f, bool last_stage)
         }
         if (offset >= block->length) {
             offset = 0;
-            block = QTAILQ_NEXT(block, next);
+            block = QLIST_NEXT_RCU(block, next);
             if (!block) {
-                block = QTAILQ_FIRST(&ram_list.blocks);
+                block = QLIST_FIRST_RCU(&ram_list.blocks);
                 complete_round = true;
                 ram_bulk_stage = false;
             }
@@ -527,9 +534,9 @@  static int ram_save_block(QEMUFile *f, bool last_stage)
             }
         }
     }
+
     last_seen_block = block;
     last_offset = offset;
-
     return bytes_sent;
 }
 
@@ -566,10 +573,10 @@  uint64_t ram_bytes_total(void)
 {
     RAMBlock *block;
     uint64_t total = 0;
-
-    QTAILQ_FOREACH(block, &ram_list.blocks, next)
+    rcu_read_lock();
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next)
         total += block->length;
-
+    rcu_read_unlock();
     return total;
 }
 
@@ -601,12 +608,22 @@  static void reset_ram_globals(void)
     last_seen_block = NULL;
     last_sent_block = NULL;
     last_offset = 0;
-    last_version = ram_list.version;
     ram_bulk_stage = true;
+    smp_wmb();
+    last_version = ram_list.version;
 }
 
 #define MAX_WAIT 50 /* ms, half buffered_file limit */
 
+
+/* The ram_save_* functions (ram_save_setup, ram_save_iterate, and
+ * ram_save_complete) each implement a long-running RCU critical
+ * section.  When RCU reclaims in the code become numerous, it will
+ * be necessary to reduce the granularity of these critical sections.
+ */
+
 static int ram_save_setup(QEMUFile *f, void *opaque)
 {
     RAMBlock *block;
@@ -632,7 +649,7 @@  static int ram_save_setup(QEMUFile *f, void *opaque)
     }
 
     qemu_mutex_lock_iothread();
-    qemu_mutex_lock_ramlist();
+    rcu_read_lock();
     bytes_transferred = 0;
     reset_ram_globals();
 
@@ -642,13 +659,13 @@  static int ram_save_setup(QEMUFile *f, void *opaque)
 
     qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         qemu_put_byte(f, strlen(block->idstr));
         qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
         qemu_put_be64(f, block->length);
     }
 
-    qemu_mutex_unlock_ramlist();
+    rcu_read_unlock();
 
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
@@ -658,6 +675,7 @@  static int ram_save_setup(QEMUFile *f, void *opaque)
     return 0;
 }
 
+
 static int ram_save_iterate(QEMUFile *f, void *opaque)
 {
     int ret;
@@ -665,16 +683,14 @@  static int ram_save_iterate(QEMUFile *f, void *opaque)
     int64_t t0;
     int total_sent = 0;
 
-    qemu_mutex_lock_ramlist();
-
-    if (ram_list.version != last_version) {
+    if (atomic_rcu_read(&ram_list.version) != last_version) {
         reset_ram_globals();
     }
 
     ram_control_before_iterate(f, RAM_CONTROL_ROUND);
-
     t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
     i = 0;
+    rcu_read_lock();
     while ((ret = qemu_file_rate_limit(f)) == 0) {
         int bytes_sent;
 
@@ -701,9 +717,7 @@  static int ram_save_iterate(QEMUFile *f, void *opaque)
         }
         i++;
     }
-
-    qemu_mutex_unlock_ramlist();
-
+    rcu_read_unlock();
     /*
      * Must occur before EOS (or any QEMUFile operation)
      * because of RDMA protocol.
@@ -718,19 +732,19 @@  static int ram_save_iterate(QEMUFile *f, void *opaque)
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
     total_sent += 8;
     bytes_transferred += total_sent;
-
     return total_sent;
 }
 
 static int ram_save_complete(QEMUFile *f, void *opaque)
 {
-    qemu_mutex_lock_ramlist();
+
     migration_bitmap_sync();
 
     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
 
     /* try transferring iterative blocks of memory */
 
+    rcu_read_lock();
     /* flush all remaining blocks regardless of rate limiting */
     while (true) {
         int bytes_sent;
@@ -742,11 +756,10 @@  static int ram_save_complete(QEMUFile *f, void *opaque)
         }
         bytes_transferred += bytes_sent;
     }
-
+    rcu_read_unlock();
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();
 
-    qemu_mutex_unlock_ramlist();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
     return 0;
@@ -808,6 +821,9 @@  static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
     return rc;
 }
 
+/* Must be called from within an RCU critical section.
+ * Returns a pointer from within the RCU-protected ram_list.
+ */
 static inline void *host_from_stream_offset(QEMUFile *f,
                                             ram_addr_t offset,
                                             int flags)
@@ -829,9 +845,10 @@  static inline void *host_from_stream_offset(QEMUFile *f,
     qemu_get_buffer(f, (uint8_t *)id, len);
     id[len] = 0;
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
-        if (!strncmp(id, block->idstr, sizeof(id)))
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
+        if (!strncmp(id, block->idstr, sizeof(id))) {
             return memory_region_get_ram_ptr(block->mr) + offset;
+        }
     }
 
     fprintf(stderr, "Can't find block %s!\n", id);
@@ -868,7 +885,12 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
     if (version_id < 4 || version_id > 4) {
         return -EINVAL;
     }
-
+    /* This implements a long-running RCU critical section.  When RCU
+     * reclaims in the code become numerous, it will be necessary to
+     * reduce the granularity of this critical section.
+     */
+    rcu_read_lock();
     do {
         addr = qemu_get_be64(f);
 
@@ -890,21 +912,19 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
                     qemu_get_buffer(f, (uint8_t *)id, len);
                     id[len] = 0;
                     length = qemu_get_be64(f);
-
-                    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+                    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
                         if (!strncmp(id, block->idstr, sizeof(id))) {
                             if (block->length != length) {
                                 fprintf(stderr,
                                         "Length mismatch: %s: " RAM_ADDR_FMT
                                         " in != " RAM_ADDR_FMT "\n", id, length,
                                         block->length);
-                                ret =  -EINVAL;
+                                ret = -EINVAL;
                                 goto done;
                             }
                             break;
                         }
                     }
-
                     if (!block) {
                         fprintf(stderr, "Unknown ramblock \"%s\", cannot "
                                 "accept migration\n", id);
@@ -917,30 +937,30 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
             }
         }
 
+        /* Call host_from_stream_offset while holding an RCU read lock.
+         * It returns a pointer from within the RCU-protected ram_list.
+         */
         if (flags & RAM_SAVE_FLAG_COMPRESS) {
-            void *host;
             uint8_t ch;
-
-            host = host_from_stream_offset(f, addr, flags);
+            void *host = host_from_stream_offset(f, addr, flags);
             if (!host) {
-                return -EINVAL;
+                ret = -EINVAL;
+                goto done;
             }
-
             ch = qemu_get_byte(f);
             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
         } else if (flags & RAM_SAVE_FLAG_PAGE) {
-            void *host;
-
-            host = host_from_stream_offset(f, addr, flags);
+            void *host = host_from_stream_offset(f, addr, flags);
             if (!host) {
-                return -EINVAL;
+                ret = -EINVAL;
+                goto done;
             }
-
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
         } else if (flags & RAM_SAVE_FLAG_XBZRLE) {
             void *host = host_from_stream_offset(f, addr, flags);
             if (!host) {
-                return -EINVAL;
+                ret = -EINVAL;
+                goto done;
             }
 
             if (load_xbzrle(f, addr, host) < 0) {
@@ -958,6 +978,7 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
     } while (!(flags & RAM_SAVE_FLAG_EOS));
 
 done:
+    rcu_read_unlock();
     DPRINTF("Completed load of VM with exit code %d seq iteration "
             "%" PRIu64 "\n", ret, seq_iter);
     return ret;
@@ -1126,8 +1147,8 @@  void do_acpitable_option(const QemuOpts *opts)
 
     acpi_table_add(opts, &err);
     if (err) {
-        error_report("Wrong acpi table provided: %s",
-                     error_get_pretty(err));
+        fprintf(stderr, "Wrong acpi table provided: %s\n",
+                error_get_pretty(err));
         error_free(err);
         exit(1);
     }
diff --git a/exec.c b/exec.c
index c0780ca..fbf3b2b 100644
--- a/exec.c
+++ b/exec.c
@@ -46,7 +46,7 @@ 
 #endif
 #include "exec/cpu-all.h"
 #include "qemu/tls.h"
-
+#include "qemu/rcu_queue.h"
 #include "exec/cputlb.h"
 #include "translate-all.h"
 
@@ -57,7 +57,10 @@ 
 #if !defined(CONFIG_USER_ONLY)
 static int in_migration;
 
-RAMList ram_list = { .blocks = QTAILQ_HEAD_INITIALIZER(ram_list.blocks) };
+/* ram_list is read under rcu_read_lock()/rcu_read_unlock().  Writes
+ * are protected by a lock, currently the iothread lock.
+ */
+RAMList ram_list = { .blocks = QLIST_HEAD_INITIALIZER(ram_list.blocks) };
 
 static MemoryRegion *system_memory;
 static MemoryRegion *system_io;
@@ -329,7 +332,6 @@  void cpu_exec_init_all(void)
 {
     tls_alloc_current_cpu_var();
 #if !defined(CONFIG_USER_ONLY)
-    qemu_mutex_init(&ram_list.mutex);
     memory_map_init();
     io_mem_init();
 #endif
@@ -885,7 +887,7 @@  static void mem_add(MemoryListener *listener, MemoryRegionSection *section)
         now = remain;
         if (int128_lt(remain.size, page_size)) {
             register_subpage(d, &now);
-        } else if (remain.offset_within_address_space & ~TARGET_PAGE_MASK) {
+        } else if (remain.offset_within_region & ~TARGET_PAGE_MASK) {
             now.size = page_size;
             register_subpage(d, &now);
         } else {
@@ -901,16 +903,6 @@  void qemu_flush_coalesced_mmio_buffer(void)
         kvm_flush_coalesced_mmio_buffer();
 }
 
-void qemu_mutex_lock_ramlist(void)
-{
-    qemu_mutex_lock(&ram_list.mutex);
-}
-
-void qemu_mutex_unlock_ramlist(void)
-{
-    qemu_mutex_unlock(&ram_list.mutex);
-}
-
 #if defined(__linux__) && !defined(TARGET_S390X)
 
 #include <sys/vfs.h>
@@ -1021,17 +1013,24 @@  static ram_addr_t find_ram_offset(ram_addr_t size)
     RAMBlock *block, *next_block;
     ram_addr_t offset = RAM_ADDR_MAX, mingap = RAM_ADDR_MAX;
 
+    /* The ram_list must be protected by a mutex (for writes) or an
+     * RCU critical section (for reads).  Currently this code is
+     * called with the iothread lock held.  If that changes, protect
+     * the ram_list with an RCU critical section.
+     */
+
     assert(size != 0); /* it would hand out same offset multiple times */
 
-    if (QTAILQ_EMPTY(&ram_list.blocks))
+    if (QLIST_EMPTY_RCU(&ram_list.blocks)) {
         return 0;
+    }
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         ram_addr_t end, next = RAM_ADDR_MAX;
 
         end = block->offset + block->length;
 
-        QTAILQ_FOREACH(next_block, &ram_list.blocks, next) {
+        QLIST_FOREACH_RCU(next_block, &ram_list.blocks, next) {
             if (next_block->offset >= end) {
                 next = MIN(next, next_block->offset);
             }
@@ -1056,9 +1055,11 @@  ram_addr_t last_ram_offset(void)
     RAMBlock *block;
     ram_addr_t last = 0;
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next)
+    rcu_read_lock();
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         last = MAX(last, block->offset + block->length);
-
+    }
+    rcu_read_unlock();
     return last;
 }
 
@@ -1083,7 +1084,12 @@  void qemu_ram_set_idstr(ram_addr_t addr, const char *name, DeviceState *dev)
     RAMBlock *new_block, *block;
 
     new_block = NULL;
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+
+    /* Assumes that the iothread lock is held.  If that changes, add
+     * an rcu_read_lock()/rcu_read_unlock() pair when traversing the
+     * ram_list.
+     */
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (block->offset == addr) {
             new_block = block;
             break;
@@ -1102,15 +1108,13 @@  void qemu_ram_set_idstr(ram_addr_t addr, const char *name, DeviceState *dev)
     pstrcat(new_block->idstr, sizeof(new_block->idstr), name);
 
     /* This assumes the iothread lock is taken here too.  */
-    qemu_mutex_lock_ramlist();
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (block != new_block && !strcmp(block->idstr, new_block->idstr)) {
             fprintf(stderr, "RAMBlock \"%s\" already registered, abort!\n",
                     new_block->idstr);
             abort();
         }
     }
-    qemu_mutex_unlock_ramlist();
 }
 
 static int memory_try_enable_merging(void *addr, size_t len)
@@ -1123,16 +1127,16 @@  static int memory_try_enable_merging(void *addr, size_t len)
     return qemu_madvise(addr, len, QEMU_MADV_MERGEABLE);
 }
 
+/* Called with the iothread lock held. */
+
 ram_addr_t qemu_ram_alloc_from_ptr(ram_addr_t size, void *host,
                                    MemoryRegion *mr)
 {
-    RAMBlock *block, *new_block;
+    RAMBlock *block, *new_block, *last_block = NULL;
 
     size = TARGET_PAGE_ALIGN(size);
     new_block = g_malloc0(sizeof(*new_block));
 
-    /* This assumes the iothread lock is taken here too.  */
-    qemu_mutex_lock_ramlist();
     new_block->mr = mr;
     new_block->offset = find_ram_offset(size);
     if (host) {
@@ -1165,21 +1169,24 @@  ram_addr_t qemu_ram_alloc_from_ptr(ram_addr_t size, void *host,
     new_block->length = size;
 
     /* Keep the list sorted from biggest to smallest block.  */
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
+        last_block = block;
         if (block->length < new_block->length) {
             break;
         }
     }
     if (block) {
-        QTAILQ_INSERT_BEFORE(block, new_block, next);
+        QLIST_INSERT_BEFORE_RCU(block, new_block, next);
     } else {
-        QTAILQ_INSERT_TAIL(&ram_list.blocks, new_block, next);
+        if (last_block) {
+            QLIST_INSERT_AFTER_RCU(last_block, new_block, next);
+        } else { /* list is empty */
+            QLIST_INSERT_HEAD_RCU(&ram_list.blocks, new_block, next);
+        }
     }
     ram_list.mru_block = NULL;
-
+    smp_wmb();
     ram_list.version++;
-    qemu_mutex_unlock_ramlist();
-
     ram_list.phys_dirty = g_realloc(ram_list.phys_dirty,
                                        last_ram_offset() >> TARGET_PAGE_BITS);
     memset(ram_list.phys_dirty + (new_block->offset >> TARGET_PAGE_BITS),
@@ -1204,30 +1211,29 @@  void qemu_ram_free_from_ptr(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    /* This assumes the iothread lock is taken here too.  */
-    qemu_mutex_lock_ramlist();
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    /* This assumes the iothread lock is taken here too. */
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (addr == block->offset) {
-            QTAILQ_REMOVE(&ram_list.blocks, block, next);
+            QLIST_REMOVE_RCU(block, next);
             ram_list.mru_block = NULL;
+            smp_wmb();
             ram_list.version++;
-            g_free(block);
+            call_rcu(block, (void (*)(struct RAMBlock *))g_free, rcu);
             break;
         }
     }
-    qemu_mutex_unlock_ramlist();
 }
 
+/* called with the iothread lock held */
 void qemu_ram_free(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    /* This assumes the iothread lock is taken here too.  */
-    qemu_mutex_lock_ramlist();
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (addr == block->offset) {
-            QTAILQ_REMOVE(&ram_list.blocks, block, next);
+            QLIST_REMOVE_RCU(block, next);
             ram_list.mru_block = NULL;
+            smp_wmb();
             ram_list.version++;
             if (block->flags & RAM_PREALLOC_MASK) {
                 ;
@@ -1249,12 +1255,10 @@  void qemu_ram_free(ram_addr_t addr)
                     qemu_anon_ram_free(block->host, block->length);
                 }
             }
-            g_free(block);
+            call_rcu(block, (void (*)(struct RAMBlock *))g_free, rcu);
             break;
         }
     }
-    qemu_mutex_unlock_ramlist();
-
 }
 
 #ifndef _WIN32
@@ -1265,7 +1269,7 @@  void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
     int flags;
     void *area, *vaddr;
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         offset = addr - block->offset;
         if (offset < block->length) {
             vaddr = block->host + offset;
@@ -1313,7 +1317,6 @@  void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
                 memory_try_enable_merging(vaddr, length);
                 qemu_ram_setup_dump(vaddr, length);
             }
-            return;
         }
     }
 }
@@ -1323,23 +1326,21 @@  static RAMBlock *qemu_get_ram_block(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    /* The list is protected by the iothread lock here.  */
-    block = ram_list.mru_block;
+    /* This assumes the iothread lock is taken here too. */
+
+    block = atomic_rcu_read(&ram_list.mru_block);
     if (block && addr - block->offset < block->length) {
-        goto found;
+        return block;
     }
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (addr - block->offset < block->length) {
-            goto found;
+            atomic_rcu_set(&ram_list.mru_block, block);
+            return block;
         }
     }
 
     fprintf(stderr, "Bad ram offset %" PRIx64 "\n", (uint64_t)addr);
     abort();
-
-found:
-    ram_list.mru_block = block;
-    return block;
 }
 
 /* Return a host pointer to ram allocated with qemu_ram_alloc.
@@ -1378,8 +1379,8 @@  static void *qemu_safe_ram_ptr(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    /* The list is protected by the iothread lock here.  */
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    /* This assumes the iothread lock is taken here too. */
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (addr - block->offset < block->length) {
             if (xen_enabled()) {
                 /* We need to check if the requested address is in the RAM
@@ -1399,14 +1400,19 @@  static void *qemu_safe_ram_ptr(ram_addr_t addr)
 
     fprintf(stderr, "Bad ram offset %" PRIx64 "\n", (uint64_t)addr);
     abort();
-
     return NULL;
 }
 
 /* Return a host pointer to guest's ram. Similar to qemu_get_ram_ptr
- * but takes a size argument */
+ * but takes a size argument
+ *
+ * The returned pointer is not protected by RCU so the caller
+ * must have other means of protecting the pointer, such as a
+ * reference.
+ */
 static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
 {
+    void *ptr = NULL;
     if (*size == 0) {
         return NULL;
     }
@@ -1414,12 +1420,14 @@  static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
         return xen_map_cache(addr, *size, 1);
     } else {
         RAMBlock *block;
-
-        QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+        rcu_read_lock();
+        QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
             if (addr - block->offset < block->length) {
                 if (addr - block->offset + *size > block->length)
                     *size = block->length - addr + block->offset;
-                return block->host + (addr - block->offset);
+                ptr = block->host + (addr - block->offset);
+                rcu_read_unlock();
+                return ptr;
             }
         }
 
@@ -1429,37 +1437,43 @@  static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
 }
 
 /* Some of the softmmu routines need to translate from a host pointer
-   (typically a TLB entry) back to a ram offset.  */
+ * (typically a TLB entry) back to a ram offset.
+ *
+ * Note that the returned MemoryRegion is not RCU-protected.
+ */
 MemoryRegion *qemu_ram_addr_from_host(void *ptr, ram_addr_t *ram_addr)
 {
     RAMBlock *block;
     uint8_t *host = ptr;
+    MemoryRegion *mr = NULL;
 
     if (xen_enabled()) {
         *ram_addr = xen_ram_addr_from_mapcache(ptr);
         return qemu_get_ram_block(*ram_addr)->mr;
     }
-
-    block = ram_list.mru_block;
+    rcu_read_lock();
+    block = atomic_rcu_read(&ram_list.mru_block);
     if (block && block->host && host - block->host < block->length) {
-        goto found;
+        *ram_addr = block->offset + (host - block->host);
+        mr = block->mr;
+        goto unlock_out;
     }
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         /* This case append when the block is not mapped. */
         if (block->host == NULL) {
             continue;
         }
         if (host - block->host < block->length) {
-            goto found;
+            *ram_addr = block->offset + (host - block->host);
+            mr = block->mr;
+            goto unlock_out;
         }
     }
 
-    return NULL;
-
-found:
-    *ram_addr = block->offset + (host - block->host);
-    return block->mr;
+unlock_out:
+    rcu_read_unlock();
+    return mr;
 }
 
 static void notdirty_mem_write(void *opaque, hwaddr ram_addr,
@@ -2709,9 +2723,10 @@  bool cpu_physical_memory_is_io(hwaddr phys_addr)
 void qemu_ram_foreach_block(RAMBlockIterFunc func, void *opaque)
 {
     RAMBlock *block;
-
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    rcu_read_lock();
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         func(block->host, block->offset, block->length, opaque);
     }
+    rcu_read_unlock();
 }
 #endif
diff --git a/include/exec/cpu-all.h b/include/exec/cpu-all.h
index e088089..e68b86b 100644
--- a/include/exec/cpu-all.h
+++ b/include/exec/cpu-all.h
@@ -24,6 +24,7 @@ 
 #include "qemu/thread.h"
 #include "qom/cpu.h"
 #include "qemu/tls.h"
+#include "qemu/rcu.h"
 
 /* some important defines:
  *
@@ -448,28 +449,26 @@  extern ram_addr_t ram_size;
 #define RAM_PREALLOC_MASK   (1 << 0)
 
 typedef struct RAMBlock {
+    struct rcu_head rcu;
     struct MemoryRegion *mr;
     uint8_t *host;
     ram_addr_t offset;
     ram_addr_t length;
     uint32_t flags;
     char idstr[256];
-    /* Reads can take either the iothread or the ramlist lock.
-     * Writes must take both locks.
-     */
-    QTAILQ_ENTRY(RAMBlock) next;
+    /* Writes must hold the iothread lock */
+    QLIST_ENTRY(RAMBlock) next;
 #if defined(__linux__) && !defined(TARGET_S390X)
     int fd;
 #endif
 } RAMBlock;
 
 typedef struct RAMList {
-    QemuMutex mutex;
     /* Protected by the iothread lock.  */
     uint8_t *phys_dirty;
     RAMBlock *mru_block;
-    /* Protected by the ramlist lock.  */
-    QTAILQ_HEAD(, RAMBlock) blocks;
+    /* RCU-enabled, writes protected by the iothread lock. */
+    QLIST_HEAD(, RAMBlock) blocks;
     uint32_t version;
 } RAMList;
 extern RAMList ram_list;