[RFC] Convert ram_list to RCU DQ V2

Message ID 1377878817-4513-1-git-send-email-ncmike@ncultra.org
State New

Commit Message

Mike D. Day Aug. 30, 2013, 4:06 p.m. UTC
Changes from V1:

* Omitted locks or rcu critical sections within some functions that
  read or write the ram_list but are called in a protected context
  (the caller holds the iothread lock, the ram_list mutex, or an rcu
  critical section).

Allow "unlocked" reads of the ram_list by using an RCU-enabled
DQ. Most readers of the list no longer require holding the list mutex.
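
For illustration, an unlocked read now follows the usual RCU pattern
(a minimal sketch; do_something() stands in for the real loop body):

    RAMBlock *block;

    rcu_read_lock();
    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
        /* block cannot be reclaimed before rcu_read_unlock() */
        do_something(block);
    }
    rcu_read_unlock();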

The ram_list now uses a QLIST instead of a QTAILQ. The difference is
minimal.

This patch has been built and make-checked for the x86_64, ppc64,
s390x, and arm targets. It has not been tested further than that at
this point.

To apply this patch, you must base it on Paolo Bonzini's rcu tree and
also apply the RCU DQ patch (below).

https://github.com/bonzini/qemu/tree/rcu
http://article.gmane.org/gmane.comp.emulators.qemu/230159/

Signed-off-by: Mike Day <ncmike@ncultra.org>
---
 arch_init.c               |  80 +++++++++++++++-----------
 exec.c                    | 142 ++++++++++++++++++++++++++++++----------------
 hw/9pfs/virtio-9p-synth.c |   2 +-
 include/exec/cpu-all.h    |   4 +-
 include/qemu/rcu_queue.h  |   8 +++
 5 files changed, 151 insertions(+), 85 deletions(-)

Comments

Paolo Bonzini Aug. 30, 2013, 4:38 p.m. UTC | #1
On 30/08/2013 18:06, Mike D. Day wrote:
> Changes from V1:
> 
> * Omitted locks or rcu critical sections within some functions that
>   read or write the ram_list but are called in a protected context
>   (the caller holds the iothread lock, the ram_list mutex, or an rcu
>   critical section).
> 
> Allow "unlocked" reads of the ram_list by using an RCU-enabled
> DQ. Most readers of the list no longer require holding the list mutex.
> 
> The ram_list now uses a QLIST instead of a QTAILQ. The difference is
> minimal.
> 
> This patch has been built and make-checked for the x86_64, ppc64,
> s390x, and arm targets. It has not been tested further than that at
> this point.
> 
> To apply this patch, you must base it on Paolo Bonzini's rcu tree and
> also apply the RCU DQ patch (below).
> 
> https://github.com/bonzini/qemu/tree/rcu
> http://article.gmane.org/gmane.comp.emulators.qemu/230159/
> 
> Signed-off-by: Mike Day <ncmike@ncultra.org>

Thanks.  I moved the include/qemu/rcu_queue.h part to the previous patch
and applied it.

ram_save_complete is called with the iothread lock held, so you can get
rid of the ramlist mutex completely.

> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    /* This assumes the iothread lock or the ram_list mutex is taken.
> +     * If that changes, accesses to ram_list need to be protected
> +     * by a mutex (writes) or an rcu read lock (reads)
> +     */
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          for (addr = 0; addr < block->length; addr += TARGET_PAGE_SIZE) {
>              if (memory_region_test_and_clear_dirty(block->mr,
>                                                     addr, TARGET_PAGE_SIZE,

Perhaps rcu_read_lock/unlock unconditionally is simpler here, since it's
cheap?
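
I.e., something like this (a sketch):

    rcu_read_lock();
    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
        for (addr = 0; addr < block->length; addr += TARGET_PAGE_SIZE) {
            /* ... test-and-clear dirty, as above ... */
        }
    }
    rcu_read_unlock();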

> @@ -815,6 +822,10 @@ static inline void *host_from_stream_offset(QEMUFile *f,
>      char id[256];
>      uint8_t len;
>  
> +    /* Must be called from within an rcu critical section.
> +     * Returns a pointer from within the RCU-protected ram_list.
> +     */
> +
>      if (flags & RAM_SAVE_FLAG_CONTINUE) {
>          if (!block) {
>              fprintf(stderr, "Ack, bad migration stream!\n");

Please put the comment before the "static inline void
*host_from_stream_offset" line.

> @@ -828,9 +839,10 @@ static inline void *host_from_stream_offset(QEMUFile *f,
>      qemu_get_buffer(f, (uint8_t *)id, len);
>      id[len] = 0;
>  
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> -        if (!strncmp(id, block->idstr, sizeof(id)))
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
> +        if (!strncmp(id, block->idstr, sizeof(id))) {
>              return memory_region_get_ram_ptr(block->mr) + offset;
> +        }
>      }
>  
>      fprintf(stderr, "Can't find block %s!\n", id);
> @@ -867,7 +879,12 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>      if (version_id < 4 || version_id > 4) {
>          return -EINVAL;
>      }
> -
> +    /* This implements a long-running RCU critical section.
> +     * When RCU reclamation calls in the code become numerous,
> +     * it will be necessary to reduce the granularity of this critical
> +     * section.
> +     */

Please add the same comment (and a rcu_read_lock/unlock pair replacing
the ramlist mutex) in ram_save_iterate, too.
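
I.e., roughly this (a sketch of the requested change, abbreviated):

    static int ram_save_iterate(QEMUFile *f, void *opaque)
    {
        ...
        /* This implements a long-running RCU critical section.
         * When RCU reclamation calls in the code become numerous,
         * it will be necessary to reduce the granularity of this
         * critical section.
         */
        rcu_read_lock();
        if (ram_list.version != last_version) {
            reset_ram_globals();
        }
        /* ... the send loop, which calls ram_save_block() ... */
        rcu_read_unlock();
        ...
    }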

> diff --git a/exec.c b/exec.c
> index 5eebcc1..d1132da 100644
> --- a/exec.c
> +++ b/exec.c
> @@ -46,7 +46,7 @@
>  #endif
>  #include "exec/cpu-all.h"
>  #include "qemu/tls.h"
> -
> +#include "qemu/rcu_queue.h"
>  #include "exec/cputlb.h"
>  #include "translate-all.h"
>  
> @@ -57,7 +57,7 @@
>  #if !defined(CONFIG_USER_ONLY)
>  static int in_migration;
>  
> -RAMList ram_list = { .blocks = QTAILQ_HEAD_INITIALIZER(ram_list.blocks) };
> +RAMList ram_list = { .blocks = QLIST_HEAD_INITIALIZER(ram_list.blocks) };
>  
>  static MemoryRegion *system_memory;
>  static MemoryRegion *system_io;
> @@ -1021,17 +1021,24 @@ static ram_addr_t find_ram_offset(ram_addr_t size)
>      RAMBlock *block, *next_block;
>      ram_addr_t offset = RAM_ADDR_MAX, mingap = RAM_ADDR_MAX;
>  
> +    /* ram_list must be protected by a mutex (for writes), or

s/a mutex/the iothread lock/

> +     * an rcu critical section (for reads). Currently this code
> +     * is called with the iothread lock held. If that changes,
> +     * make sure to protect ram_list with an rcu critical section.
> +     */

Right---also because find_ram_offset returns a value that is within the
RCU-protected list.

> @@ -1126,13 +1138,18 @@ static int memory_try_enable_merging(void *addr, size_t len)
>  ram_addr_t qemu_ram_alloc_from_ptr(ram_addr_t size, void *host,
>                                     MemoryRegion *mr)
>  {
> -    RAMBlock *block, *new_block;
> +    RAMBlock *block, *new_block, *last_block = NULL;
>  
>      size = TARGET_PAGE_ALIGN(size);
>      new_block = g_malloc0(sizeof(*new_block));
>  
> -    /* This assumes the iothread lock is taken here too.  */
> -    qemu_mutex_lock_ramlist();
> +    /* ram_list needs to be protected by the ram_list mutex
> +     * for writes, and by an rcu critical section for reads.
> +     * Currently this function is called with the iothread lock
> +     * being held, so we can forgo protecting the ram_list.
> +     * When that changes, acquire the ram_list mutex before
> +     * sorting and writing the list below.
> +     */

Please remove other references to the ram_list mutex---in practice, the
ram_list is protected by the BQL.  The practice of using the BQL for the
write side is not going to go away anytime soon, so we might as well
make the code simpler and codify it.

>      new_block->mr = mr;
>      new_block->offset = find_ram_offset(size);
>      if (host) {
> @@ -1164,21 +1181,28 @@ ram_addr_t qemu_ram_alloc_from_ptr(ram_addr_t size, void *host,
>      }
>      new_block->length = size;
>  
> +    /* When this code is called without the iothread lock, protect
> +     * the ram_list here by acquiring its mutex.
> +     */
>      /* Keep the list sorted from biggest to smallest block.  */
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
> +        last_block = block;
>          if (block->length < new_block->length) {
>              break;
>          }
>      }
>      if (block) {
> -        QTAILQ_INSERT_BEFORE(block, new_block, next);
> +        QLIST_INSERT_BEFORE_RCU(block, new_block, next);
>      } else {
> -        QTAILQ_INSERT_TAIL(&ram_list.blocks, new_block, next);
> +        if (last_block) {
> +            QLIST_INSERT_AFTER_RCU(last_block, new_block, next);
> +        } else { /* list is empty */
> +            QLIST_INSERT_HEAD_RCU(&ram_list.blocks, new_block, next);
> +        }
>      }
>      ram_list.mru_block = NULL;
>  
>      ram_list.version++;
> -    qemu_mutex_unlock_ramlist();
>  
>      ram_list.phys_dirty = g_realloc(ram_list.phys_dirty,
>                                         last_ram_offset() >> TARGET_PAGE_BITS);
> @@ -1204,29 +1228,32 @@ void qemu_ram_free_from_ptr(ram_addr_t addr)
>  {
>      RAMBlock *block;
>  
> -    /* This assumes the iothread lock is taken here too.  */
> -    qemu_mutex_lock_ramlist();
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    /* This assumes the iothread lock is taken here too.
> +     * When this code is called without the iothread lock, protect
> +     * the ram_list here by acquiring its mutex.
> +     */
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          if (addr == block->offset) {
> -            QTAILQ_REMOVE(&ram_list.blocks, block, next);
> +            QLIST_REMOVE_RCU(block, next);
>              ram_list.mru_block = NULL;
>              ram_list.version++;
>              g_free(block);
>              break;
>          }
>      }
> -    qemu_mutex_unlock_ramlist();
>  }
>  
>  void qemu_ram_free(ram_addr_t addr)
>  {
>      RAMBlock *block;
>  
> -    /* This assumes the iothread lock is taken here too.  */
> -    qemu_mutex_lock_ramlist();
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    /* This assumes the iothread lock is taken here too.
> +     * If that changes, accesses to ram_list need to be protected
> +     * by a mutex (writes) or an rcu read lock (reads)
> +     */

Same here.  Also, wherever a loop writes to the ram_list, there's no
need to mention RCU.  It's already evident from the code's usage of
QLIST_FOREACH_RCU.  So:

- read-side with rcu_read_lock -> no comment

- read-side without rcu_read_lock, returns RCU-protected value (which
includes ram_addr_t) -> comment that caller must hold RCU lock or
iothread mutex (see the sketch after this list)

- read-side without rcu_read_lock, does not return RCU-protected value
-> don't do it, just use rcu_read_lock :)

- write-side -> no comment (but please make sure the policy is
documented where ram_list is declared)
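
For the second case, the comment could read something like this
(a sketch, using host_from_stream_offset as the example):

    /* Called from within an rcu critical section, or with the iothread
     * lock held.  Returns a pointer into the RCU-protected ram_list,
     * so the caller must keep that protection until it is done with
     * the returned value.
     */
    static inline void *host_from_stream_offset(QEMUFile *f,
                                                ram_addr_t offset,
                                                int flags);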


> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          if (addr == block->offset) {
> -            QTAILQ_REMOVE(&ram_list.blocks, block, next);
> +            QLIST_REMOVE_RCU(block, next);
>              ram_list.mru_block = NULL;
>              ram_list.version++;
>              if (block->flags & RAM_PREALLOC_MASK) {

...

            g_free(block);

This should be changed to call_rcu.
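
Something along these lines (a sketch; it assumes RAMBlock grows a
struct rcu_head member, here named rcu):

    static void reclaim_ramblock(RAMBlock *block)
    {
        /* Runs only after all pre-existing readers have left their
         * critical sections.
         */
        g_free(block);
    }

    ...
            QLIST_REMOVE_RCU(block, next);
            ram_list.mru_block = NULL;
            ram_list.version++;
            call_rcu(block, reclaim_ramblock, rcu);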

> @@ -1253,8 +1280,6 @@ void qemu_ram_free(ram_addr_t addr)
>              break;
>          }
>      }
> -    qemu_mutex_unlock_ramlist();
> -
>  }
>  
>  #ifndef _WIN32
> @@ -1265,7 +1290,8 @@ void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
>      int flags;
>      void *area, *vaddr;
>  
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    rcu_read_lock();

This is a write side, so no need to use rcu_read_lock.

(Note that I'm not commenting on all the occurrences).

> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          offset = addr - block->offset;
>          if (offset < block->length) {
>              vaddr = block->host + offset;
> @@ -1313,9 +1339,11 @@ void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
>                  memory_try_enable_merging(vaddr, length);
>                  qemu_ram_setup_dump(vaddr, length);
>              }
> -            return;
> +            goto unlock_out;
>          }
>      }
> +unlock_out:
> +    rcu_read_unlock();
>  }
>  #endif /* !_WIN32 */
>  
> @@ -1323,12 +1351,15 @@ static RAMBlock *qemu_get_ram_block(ram_addr_t addr)
>  {
>      RAMBlock *block;
>  
> -    /* The list is protected by the iothread lock here.  */
> +    /* This assumes the iothread lock is taken here too.
> +     * If that changes, accesses to ram_list need to be protected
> +     * by a mutex (writes) or an rcu read lock (reads)
> +     */
>      block = ram_list.mru_block;
>      if (block && addr - block->offset < block->length) {
>          goto found;
>      }
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          if (addr - block->offset < block->length) {
>              goto found;
>          }
> @@ -1378,8 +1409,11 @@ static void *qemu_safe_ram_ptr(ram_addr_t addr)
>  {
>      RAMBlock *block;
>  
> -    /* The list is protected by the iothread lock here.  */
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    /* This assumes the iothread lock is taken here too.
> +     * If that changes, accesses to ram_list need to be protected
> +     * by a mutex (writes) or an rcu read lock (reads)
> +     */
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          if (addr - block->offset < block->length) {
>              if (xen_enabled()) {
>                  /* We need to check if the requested address is in the RAM
> @@ -1399,7 +1433,6 @@ static void *qemu_safe_ram_ptr(ram_addr_t addr)
>  
>      fprintf(stderr, "Bad ram offset %" PRIx64 "\n", (uint64_t)addr);
>      abort();
> -
>      return NULL;
>  }
>  
> @@ -1407,6 +1440,7 @@ static void *qemu_safe_ram_ptr(ram_addr_t addr)
>   * but takes a size argument */
>  static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
>  {
> +    void *ptr = NULL;
>      if (*size == 0) {
>          return NULL;
>      }
> @@ -1414,18 +1448,22 @@ static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
>          return xen_map_cache(addr, *size, 1);
>      } else {
>          RAMBlock *block;
> -
> -        QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +        rcu_read_lock();
> +        QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>              if (addr - block->offset < block->length) {
>                  if (addr - block->offset + *size > block->length)
>                      *size = block->length - addr + block->offset;
> -                return block->host + (addr - block->offset);
> +                ptr = block->host + (addr - block->offset);
> +                goto unlock_out;
>              }
>          }
>  
>          fprintf(stderr, "Bad ram offset %" PRIx64 "\n", (uint64_t)addr);
>          abort();
>      }
> +unlock_out:
> +    rcu_read_unlock();
> +    return ptr;
>  }

This returns an RCU-protected value.  However, it's okay to include the
function in a "small" critical section instead of involving the caller,
because the caller is ensuring that the returned block doesn't disappear
(through memory_region_ref/unref).  Perhaps you can add a comment.
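
E.g. (a sketch of the callers' pattern; mr, raddr and len stand in for
the callers' variables):

    memory_region_ref(mr);                   /* pin the block's region */
    ptr = qemu_ram_ptr_length(raddr, &len);  /* short RCU section inside */
    /* ptr remains valid here: the reference keeps the block alive
     * even after the internal rcu_read_unlock().
     */
    memory_region_unref(mr);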

>  /* Some of the softmmu routines need to translate from a host pointer
> @@ -1434,32 +1472,35 @@ MemoryRegion *qemu_ram_addr_from_host(void *ptr, ram_addr_t *ram_addr)
>  {
>      RAMBlock *block;
>      uint8_t *host = ptr;
> +    MemoryRegion *mr = NULL;
>  
>      if (xen_enabled()) {
>          *ram_addr = xen_ram_addr_from_mapcache(ptr);
>          return qemu_get_ram_block(*ram_addr)->mr;
>      }
> -
> +    rcu_read_lock();
>      block = ram_list.mru_block;
>      if (block && block->host && host - block->host < block->length) {
> -        goto found;
> +        *ram_addr = block->offset + (host - block->host);
> +        mr = block->mr;
> +        goto unlock_out;
>      }
>  
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          /* This case append when the block is not mapped. */
>          if (block->host == NULL) {
>              continue;
>          }
>          if (host - block->host < block->length) {
> -            goto found;
> +            *ram_addr = block->offset + (host - block->host);
> +            mr = block->mr;
> +            goto unlock_out;
>          }
>      }
>  
> -    return NULL;
> -
> -found:
> -    *ram_addr = block->offset + (host - block->host);
> -    return block->mr;
> +unlock_out:
> +    rcu_read_unlock();
> +    return mr;

Same here.  Perhaps add a comment that the caller must either already
have a reference to mr (as is the case for address_space_unmap) or be
holding the iothread mutex.

In general, a good comment is IMO one that occurs once or twice.  A bad
comment is one that is cut-and-pasted all over the place.

Paolo

>  }
>  
>  static void notdirty_mem_write(void *opaque, hwaddr ram_addr,
> @@ -2709,9 +2750,10 @@ bool cpu_physical_memory_is_io(hwaddr phys_addr)
>  void qemu_ram_foreach_block(RAMBlockIterFunc func, void *opaque)
>  {
>      RAMBlock *block;
> -
> -    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
> +    rcu_read_lock();
> +    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
>          func(block->host, block->offset, block->length, opaque);
>      }
> +    rcu_read_unlock();
>  }
>  #endif
> diff --git a/hw/9pfs/virtio-9p-synth.c b/hw/9pfs/virtio-9p-synth.c
> index fdfea21..c2efaca 100644
> --- a/hw/9pfs/virtio-9p-synth.c
> +++ b/hw/9pfs/virtio-9p-synth.c
> @@ -18,7 +18,7 @@
>  #include "fsdev/qemu-fsdev.h"
>  #include "virtio-9p-synth.h"
>  #include "qemu/rcu.h"
> -
> +#include "qemu/rcu_queue.h"
>  #include <sys/stat.h>
>  
>  /* Root node for synth file system */
> diff --git a/include/exec/cpu-all.h b/include/exec/cpu-all.h
> index e088089..9cd8a30 100644
> --- a/include/exec/cpu-all.h
> +++ b/include/exec/cpu-all.h
> @@ -457,7 +457,7 @@ typedef struct RAMBlock {
>      /* Reads can take either the iothread or the ramlist lock.
>       * Writes must take both locks.
>       */
> -    QTAILQ_ENTRY(RAMBlock) next;
> +    QLIST_ENTRY(RAMBlock) next;
>  #if defined(__linux__) && !defined(TARGET_S390X)
>      int fd;
>  #endif
> @@ -469,7 +469,7 @@ typedef struct RAMList {
>      uint8_t *phys_dirty;
>      RAMBlock *mru_block;
>      /* Protected by the ramlist lock.  */
> -    QTAILQ_HEAD(, RAMBlock) blocks;
> +    QLIST_HEAD(, RAMBlock) blocks;
>      uint32_t version;
>  } RAMList;
>  extern RAMList ram_list;
> diff --git a/include/qemu/rcu_queue.h b/include/qemu/rcu_queue.h
> index e2b8ba5..d159850 100644
> --- a/include/qemu/rcu_queue.h
> +++ b/include/qemu/rcu_queue.h
> @@ -37,6 +37,14 @@
>  extern "C" {
>  #endif
>  
> +
> +/*
> + * List access methods.
> + */
> +#define QLIST_EMPTY_RCU(head) (atomic_rcu_read(&(head)->lh_first) == NULL)
> +#define QLIST_FIRST_RCU(head) (atomic_rcu_read(&(head)->lh_first))
> +#define QLIST_NEXT_RCU(elm, field) (atomic_rcu_read(&(elm)->field.le_next))
> +
>  /*
>   * List functions.
>   */
>
Mike D. Day Sept. 3, 2013, 1:56 p.m. UTC | #2
On Fri, Aug 30, 2013 at 12:38 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
>
> > @@ -867,7 +879,12 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
> >      if (version_id < 4 || version_id > 4) {
> >          return -EINVAL;
> >      }
> > -
> > +    /* This implements a long-running RCU critical section.
> > +     * When RCU reclamation calls in the code become numerous,
> > +     * it will be necessary to reduce the granularity of this critical
> > +     * section.
> > +     */
>
> Please add the same comment (and a rcu_read_lock/unlock pair replacing
> the ramlist mutex) in ram_save_iterate, too.

Just double checking on this particular change. In practice ram_save
manipulates the ram_list indirectly through ram_save_block. But I'm
assuming you want this change because of the ram state info that persists
between calls to ram_save (ram_list version in particular). Also, there is
potential for the callback functions ram_control_*_iterate to manipulate
the ram_list.

I'm adding the rcu_read_lock/unlock pair in ram_load. It will be recursive
with the same calls in ram_save_block, but as you pointed out this is low
overhead.
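
For reference, rcu_read_lock in the rcu tree nests by bumping a
per-thread depth counter, so the recursion should be harmless.  A
sketch of what happens:

    rcu_read_lock();      /* outer call: depth 0 -> 1, reader enters  */
    rcu_read_lock();      /* inner call in ram_save_block: depth 2    */
    rcu_read_unlock();    /* depth 1, still inside a critical section */
    rcu_read_unlock();    /* depth 0, reader actually exits           */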

With this change in my working code, ram_control_*_iterate are called from
within an rcu critical section.

Mike
Paolo Bonzini Sept. 3, 2013, 2:09 p.m. UTC | #3
On 03/09/2013 15:56, Mike Day wrote:
>> > +    /* This implements a long-running RCU critical section.
>> > +     * When RCU reclamation calls in the code become numerous,
>> > +     * it will be necessary to reduce the granularity of this critical
>> > +     * section.
>> > +     */
>>
>> Please add the same comment (and a rcu_read_lock/unlock pair replacing
>> the ramlist mutex) in ram_save_iterate, too.
> 
> Just double checking on this particular change. In practice ram_save
> manipulates the ram_list indirectly through ram_save_block. But I'm
> assuming you want this change because of the ram state info that
> persists between calls to ram_save (ram_list version in particular).

ram_list.version is not really a problem, but last_seen_block has to
persist across ram_save_block calls.

> Also, there is potential for the callback functions
> ram_control_*_iterate to manipulate the ram_list.

I think that's not possible right now (and they could use
rcu_read_lock/unlock as well).

Paolo
Mike D. Day Sept. 3, 2013, 2:19 p.m. UTC | #4
On Tue, Sep 3, 2013 at 10:09 AM, Paolo Bonzini <pbonzini@redhat.com> wrote:
>
> Il 03/09/2013 15:56, Mike Day ha scritto:
> >> > +    /* This implements a long-running RCU critical section.
> >> > +     * When RCU reclamation calls in the code become numerous,
> >> > +     * it will be necessary to reduce the granularity of this critical
> >> > +     * section.
> >> > +     */
> >>
> >> Please add the same comment (and a rcu_read_lock/unlock pair replacing
> >> the ramlist mutex) in ram_save_iterate, too.
> >
> > Just double checking on this particular change. In practice ram_save
> > manipulates the ram_list indirectly through ram_save_block. But I'm
> > assuming you want this change because of the ram state info that
> > persists between calls to ram_save (ram_list version in particular).
>
> ram_list.version is not really a problem, but last_seen_block has to
> persist across ram_save_block calls.

Got it. That's a subtle point.

> > Also, there is potential for the callback functions
> > ram_control_*_iterate to manipulate the ram_list.
>
> I think that's not possible right now (and they could use
> rcu_read_lock/unlock as well).

Yeah. So how about we say for now that the rcu critical section status upon
entry to the ram_control_*_iterate functions is undefined? I'll make some
updates.

Mike
Paolo Bonzini Sept. 3, 2013, 2:21 p.m. UTC | #5
On 03/09/2013 16:19, Mike Day wrote:
> 
> Yeah. So how about we say for now that the rcu critical section status
> upon entry to the ram_control_*_iterate functions is undefined? I'll
> make some updates.

Sure.

Paolo
Patch

diff --git a/arch_init.c b/arch_init.c
index 68a7ab7..3f4d676 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -49,6 +49,7 @@ 
 #include "trace.h"
 #include "exec/cpu-all.h"
 #include "hw/acpi/acpi.h"
+#include "qemu/rcu_queue.h"
 
 #ifdef DEBUG_ARCH_INIT
 #define DPRINTF(fmt, ...) \
@@ -398,7 +399,11 @@  static void migration_bitmap_sync(void)
     trace_migration_bitmap_sync_start();
     address_space_sync_dirty_bitmap(&address_space_memory);
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    /* This assumes the iothread lock or the ram_list mutex is taken.
+     * If that changes, accesses to ram_list need to be protected
+     * by a mutex (writes) or an rcu read lock (reads)
+     */
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         for (addr = 0; addr < block->length; addr += TARGET_PAGE_SIZE) {
             if (memory_region_test_and_clear_dirty(block->mr,
                                                    addr, TARGET_PAGE_SIZE,
@@ -457,8 +462,13 @@  static int ram_save_block(QEMUFile *f, bool last_stage)
     MemoryRegion *mr;
     ram_addr_t current_addr;
 
+    /* Sometimes called with the ram_list mutex held (ram_save_complete),
+     * sometimes called WITHOUT the ram_list mutex held (ram_save_iterate).
+     * Protect ram_list with an rcu critical section.
+     */
+    rcu_read_lock();
     if (!block)
-        block = QTAILQ_FIRST(&ram_list.blocks);
+        block = QLIST_FIRST_RCU(&ram_list.blocks);
 
     while (true) {
         mr = block->mr;
@@ -469,9 +479,9 @@  static int ram_save_block(QEMUFile *f, bool last_stage)
         }
         if (offset >= block->length) {
             offset = 0;
-            block = QTAILQ_NEXT(block, next);
+            block = QLIST_NEXT_RCU(block, next);
             if (!block) {
-                block = QTAILQ_FIRST(&ram_list.blocks);
+                block = QLIST_FIRST_RCU(&ram_list.blocks);
                 complete_round = true;
                 ram_bulk_stage = false;
             }
@@ -526,6 +536,7 @@  static int ram_save_block(QEMUFile *f, bool last_stage)
             }
         }
     }
+    rcu_read_unlock();
     last_seen_block = block;
     last_offset = offset;
 
@@ -565,10 +576,10 @@  uint64_t ram_bytes_total(void)
 {
     RAMBlock *block;
     uint64_t total = 0;
-
-    QTAILQ_FOREACH(block, &ram_list.blocks, next)
+    rcu_read_lock();
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next)
         total += block->length;
-
+    rcu_read_unlock();
     return total;
 }
 
@@ -631,7 +642,7 @@  static int ram_save_setup(QEMUFile *f, void *opaque)
     }
 
     qemu_mutex_lock_iothread();
-    qemu_mutex_lock_ramlist();
+    rcu_read_lock();
     bytes_transferred = 0;
     reset_ram_globals();
 
@@ -641,13 +652,13 @@  static int ram_save_setup(QEMUFile *f, void *opaque)
 
     qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         qemu_put_byte(f, strlen(block->idstr));
         qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
         qemu_put_be64(f, block->length);
     }
 
-    qemu_mutex_unlock_ramlist();
+    rcu_read_unlock();
 
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
@@ -664,8 +675,6 @@  static int ram_save_iterate(QEMUFile *f, void *opaque)
     int64_t t0;
     int total_sent = 0;
 
-    qemu_mutex_lock_ramlist();
-
     if (ram_list.version != last_version) {
         reset_ram_globals();
     }
@@ -701,8 +710,6 @@  static int ram_save_iterate(QEMUFile *f, void *opaque)
         i++;
     }
 
-    qemu_mutex_unlock_ramlist();
-
     /*
      * Must occur before EOS (or any QEMUFile operation)
      * because of RDMA protocol.
@@ -815,6 +822,10 @@  static inline void *host_from_stream_offset(QEMUFile *f,
     char id[256];
     uint8_t len;
 
+    /* Must be called from within an rcu critical section.
+     * Returns a pointer from within the RCU-protected ram_list.
+     */
+
     if (flags & RAM_SAVE_FLAG_CONTINUE) {
         if (!block) {
             fprintf(stderr, "Ack, bad migration stream!\n");
@@ -828,9 +839,10 @@  static inline void *host_from_stream_offset(QEMUFile *f,
     qemu_get_buffer(f, (uint8_t *)id, len);
     id[len] = 0;
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
-        if (!strncmp(id, block->idstr, sizeof(id)))
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
+        if (!strncmp(id, block->idstr, sizeof(id))) {
             return memory_region_get_ram_ptr(block->mr) + offset;
+        }
     }
 
     fprintf(stderr, "Can't find block %s!\n", id);
@@ -867,7 +879,12 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
     if (version_id < 4 || version_id > 4) {
         return -EINVAL;
     }
-
+    /* This implements a long-running RCU critical section.
+     * When RCU reclamation calls in the code become numerous,
+     * it will be necessary to reduce the granularity of this critical
+     * section.
+     */
+    rcu_read_lock();
     do {
         addr = qemu_get_be64(f);
 
@@ -889,21 +906,19 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
                     qemu_get_buffer(f, (uint8_t *)id, len);
                     id[len] = 0;
                     length = qemu_get_be64(f);
-
-                    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+                    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
                         if (!strncmp(id, block->idstr, sizeof(id))) {
                             if (block->length != length) {
                                 fprintf(stderr,
                                         "Length mismatch: %s: " RAM_ADDR_FMT
                                         " in != " RAM_ADDR_FMT "\n", id, length,
                                         block->length);
-                                ret =  -EINVAL;
+                                ret = -EINVAL;
                                 goto done;
                             }
                             break;
                         }
                     }
-
                     if (!block) {
                         fprintf(stderr, "Unknown ramblock \"%s\", cannot "
                                 "accept migration\n", id);
@@ -916,30 +931,30 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
             }
         }
 
+        /* Call host_from_stream_offset while holding an rcu read lock.
+         * It returns a pointer from within the rcu-protected ram_list.
+         */
         if (flags & RAM_SAVE_FLAG_COMPRESS) {
-            void *host;
             uint8_t ch;
-
-            host = host_from_stream_offset(f, addr, flags);
+            void *host = host_from_stream_offset(f, addr, flags);
             if (!host) {
-                return -EINVAL;
+                ret = -EINVAL;
+                goto done;
             }
-
             ch = qemu_get_byte(f);
             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
         } else if (flags & RAM_SAVE_FLAG_PAGE) {
-            void *host;
-
-            host = host_from_stream_offset(f, addr, flags);
+            void *host = host_from_stream_offset(f, addr, flags);
             if (!host) {
-                return -EINVAL;
+                ret = -EINVAL;
+                goto done;
             }
-
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
         } else if (flags & RAM_SAVE_FLAG_XBZRLE) {
             void *host = host_from_stream_offset(f, addr, flags);
             if (!host) {
-                return -EINVAL;
+                ret = -EINVAL;
+                goto done;
             }
 
             if (load_xbzrle(f, addr, host) < 0) {
@@ -957,6 +972,7 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
     } while (!(flags & RAM_SAVE_FLAG_EOS));
 
 done:
+    rcu_read_unlock();
     DPRINTF("Completed load of VM with exit code %d seq iteration "
             "%" PRIu64 "\n", ret, seq_iter);
     return ret;
diff --git a/exec.c b/exec.c
index 5eebcc1..d1132da 100644
--- a/exec.c
+++ b/exec.c
@@ -46,7 +46,7 @@ 
 #endif
 #include "exec/cpu-all.h"
 #include "qemu/tls.h"
-
+#include "qemu/rcu_queue.h"
 #include "exec/cputlb.h"
 #include "translate-all.h"
 
@@ -57,7 +57,7 @@ 
 #if !defined(CONFIG_USER_ONLY)
 static int in_migration;
 
-RAMList ram_list = { .blocks = QTAILQ_HEAD_INITIALIZER(ram_list.blocks) };
+RAMList ram_list = { .blocks = QLIST_HEAD_INITIALIZER(ram_list.blocks) };
 
 static MemoryRegion *system_memory;
 static MemoryRegion *system_io;
@@ -1021,17 +1021,24 @@  static ram_addr_t find_ram_offset(ram_addr_t size)
     RAMBlock *block, *next_block;
     ram_addr_t offset = RAM_ADDR_MAX, mingap = RAM_ADDR_MAX;
 
+    /* ram_list must be protected by a mutex (for writes), or
+     * an rcu critical section (for reads). Currently this code
+     * is called with the iothread lock held. If that changes,
+     * make sure to protect ram_list with an rcu critical section.
+     */
+
     assert(size != 0); /* it would hand out same offset multiple times */
 
-    if (QTAILQ_EMPTY(&ram_list.blocks))
+    if (QLIST_EMPTY_RCU(&ram_list.blocks)) {
         return 0;
+    }
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         ram_addr_t end, next = RAM_ADDR_MAX;
 
         end = block->offset + block->length;
 
-        QTAILQ_FOREACH(next_block, &ram_list.blocks, next) {
+        QLIST_FOREACH_RCU(next_block, &ram_list.blocks, next) {
             if (next_block->offset >= end) {
                 next = MIN(next, next_block->offset);
             }
@@ -1056,9 +1063,11 @@  ram_addr_t last_ram_offset(void)
     RAMBlock *block;
     ram_addr_t last = 0;
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next)
+    rcu_read_lock();
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         last = MAX(last, block->offset + block->length);
-
+    }
+    rcu_read_unlock();
     return last;
 }
 
@@ -1083,7 +1092,12 @@  void qemu_ram_set_idstr(ram_addr_t addr, const char *name, DeviceState *dev)
     RAMBlock *new_block, *block;
 
     new_block = NULL;
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+
+    /* Assumes that the iothread lock is taken ... if that changes,
+     * add an rcu_read_lock()/unlock pair when traversing the
+     * ram list
+     */
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (block->offset == addr) {
             new_block = block;
             break;
@@ -1102,15 +1116,13 @@  void qemu_ram_set_idstr(ram_addr_t addr, const char *name, DeviceState *dev)
     pstrcat(new_block->idstr, sizeof(new_block->idstr), name);
 
     /* This assumes the iothread lock is taken here too.  */
-    qemu_mutex_lock_ramlist();
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (block != new_block && !strcmp(block->idstr, new_block->idstr)) {
             fprintf(stderr, "RAMBlock \"%s\" already registered, abort!\n",
                     new_block->idstr);
             abort();
         }
     }
-    qemu_mutex_unlock_ramlist();
 }
 
 static int memory_try_enable_merging(void *addr, size_t len)
@@ -1126,13 +1138,18 @@  static int memory_try_enable_merging(void *addr, size_t len)
 ram_addr_t qemu_ram_alloc_from_ptr(ram_addr_t size, void *host,
                                    MemoryRegion *mr)
 {
-    RAMBlock *block, *new_block;
+    RAMBlock *block, *new_block, *last_block = NULL;
 
     size = TARGET_PAGE_ALIGN(size);
     new_block = g_malloc0(sizeof(*new_block));
 
-    /* This assumes the iothread lock is taken here too.  */
-    qemu_mutex_lock_ramlist();
+    /* ram_list needs to be protected by the ram_list mutex
+     * for writes, and by an rcu critical section for reads.
+     * Currently this function is called with the iothread lock
+     * being held, so we can forgo protecting the ram_list.
+     * When that changes, acquire the ram_list mutex before
+     * sorting and writing the list below.
+     */
     new_block->mr = mr;
     new_block->offset = find_ram_offset(size);
     if (host) {
@@ -1164,21 +1181,28 @@  ram_addr_t qemu_ram_alloc_from_ptr(ram_addr_t size, void *host,
     }
     new_block->length = size;
 
+    /* When this code is called without the iothread lock, protect
+     * the ram_list here by acquiring its mutex.
+     */
     /* Keep the list sorted from biggest to smallest block.  */
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
+        last_block = block;
         if (block->length < new_block->length) {
             break;
         }
     }
     if (block) {
-        QTAILQ_INSERT_BEFORE(block, new_block, next);
+        QLIST_INSERT_BEFORE_RCU(block, new_block, next);
     } else {
-        QTAILQ_INSERT_TAIL(&ram_list.blocks, new_block, next);
+        if (last_block) {
+            QLIST_INSERT_AFTER_RCU(last_block, new_block, next);
+        } else { /* list is empty */
+            QLIST_INSERT_HEAD_RCU(&ram_list.blocks, new_block, next);
+        }
     }
     ram_list.mru_block = NULL;
 
     ram_list.version++;
-    qemu_mutex_unlock_ramlist();
 
     ram_list.phys_dirty = g_realloc(ram_list.phys_dirty,
                                        last_ram_offset() >> TARGET_PAGE_BITS);
@@ -1204,29 +1228,32 @@  void qemu_ram_free_from_ptr(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    /* This assumes the iothread lock is taken here too.  */
-    qemu_mutex_lock_ramlist();
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    /* This assumes the iothread lock is taken here too.
+     * When this code is called without the iothread lock, protect
+     * the ram_list here by acquiring its mutex.
+     */
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (addr == block->offset) {
-            QTAILQ_REMOVE(&ram_list.blocks, block, next);
+            QLIST_REMOVE_RCU(block, next);
             ram_list.mru_block = NULL;
             ram_list.version++;
             g_free(block);
             break;
         }
     }
-    qemu_mutex_unlock_ramlist();
 }
 
 void qemu_ram_free(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    /* This assumes the iothread lock is taken here too.  */
-    qemu_mutex_lock_ramlist();
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    /* This assumes the iothread lock is taken here too.
+     * If that changes, accesses to ram_list need to be protected
+     * by a mutex (writes) or an rcu read lock (reads)
+     */
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (addr == block->offset) {
-            QTAILQ_REMOVE(&ram_list.blocks, block, next);
+            QLIST_REMOVE_RCU(block, next);
             ram_list.mru_block = NULL;
             ram_list.version++;
             if (block->flags & RAM_PREALLOC_MASK) {
@@ -1253,8 +1280,6 @@  void qemu_ram_free(ram_addr_t addr)
             break;
         }
     }
-    qemu_mutex_unlock_ramlist();
-
 }
 
 #ifndef _WIN32
@@ -1265,7 +1290,8 @@  void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
     int flags;
     void *area, *vaddr;
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    rcu_read_lock();
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         offset = addr - block->offset;
         if (offset < block->length) {
             vaddr = block->host + offset;
@@ -1313,9 +1339,11 @@  void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
                 memory_try_enable_merging(vaddr, length);
                 qemu_ram_setup_dump(vaddr, length);
             }
-            return;
+            goto unlock_out;
         }
     }
+unlock_out:
+    rcu_read_unlock();
 }
 #endif /* !_WIN32 */
 
@@ -1323,12 +1351,15 @@  static RAMBlock *qemu_get_ram_block(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    /* The list is protected by the iothread lock here.  */
+    /* This assumes the iothread lock is taken here too.
+     * If that changes, accesses to ram_list need to be protected
+     * by a mutex (writes) or an rcu read lock (reads)
+     */
     block = ram_list.mru_block;
     if (block && addr - block->offset < block->length) {
         goto found;
     }
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (addr - block->offset < block->length) {
             goto found;
         }
@@ -1378,8 +1409,11 @@  static void *qemu_safe_ram_ptr(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    /* The list is protected by the iothread lock here.  */
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    /* This assumes the iothread lock is taken here too.
+     * If that changes, accesses to ram_list need to be protected
+     * by a mutex (writes) or an rcu read lock (reads)
+     */
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         if (addr - block->offset < block->length) {
             if (xen_enabled()) {
                 /* We need to check if the requested address is in the RAM
@@ -1399,7 +1433,6 @@  static void *qemu_safe_ram_ptr(ram_addr_t addr)
 
     fprintf(stderr, "Bad ram offset %" PRIx64 "\n", (uint64_t)addr);
     abort();
-
     return NULL;
 }
 
@@ -1407,6 +1440,7 @@  static void *qemu_safe_ram_ptr(ram_addr_t addr)
  * but takes a size argument */
 static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
 {
+    void *ptr = NULL;
     if (*size == 0) {
         return NULL;
     }
@@ -1414,18 +1448,22 @@  static void *qemu_ram_ptr_length(ram_addr_t addr, hwaddr *size)
         return xen_map_cache(addr, *size, 1);
     } else {
         RAMBlock *block;
-
-        QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+        rcu_read_lock();
+        QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
             if (addr - block->offset < block->length) {
                 if (addr - block->offset + *size > block->length)
                     *size = block->length - addr + block->offset;
-                return block->host + (addr - block->offset);
+                ptr = block->host + (addr - block->offset);
+                goto unlock_out;
             }
         }
 
         fprintf(stderr, "Bad ram offset %" PRIx64 "\n", (uint64_t)addr);
         abort();
     }
+unlock_out:
+    rcu_read_unlock();
+    return ptr;
 }
 
 /* Some of the softmmu routines need to translate from a host pointer
@@ -1434,32 +1472,35 @@  MemoryRegion *qemu_ram_addr_from_host(void *ptr, ram_addr_t *ram_addr)
 {
     RAMBlock *block;
     uint8_t *host = ptr;
+    MemoryRegion *mr = NULL;
 
     if (xen_enabled()) {
         *ram_addr = xen_ram_addr_from_mapcache(ptr);
         return qemu_get_ram_block(*ram_addr)->mr;
     }
-
+    rcu_read_lock();
     block = ram_list.mru_block;
     if (block && block->host && host - block->host < block->length) {
-        goto found;
+        *ram_addr = block->offset + (host - block->host);
+        mr = block->mr;
+        goto unlock_out;
     }
 
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         /* This case append when the block is not mapped. */
         if (block->host == NULL) {
             continue;
         }
         if (host - block->host < block->length) {
-            goto found;
+            *ram_addr = block->offset + (host - block->host);
+            mr = block->mr;
+            goto unlock_out;
         }
     }
 
-    return NULL;
-
-found:
-    *ram_addr = block->offset + (host - block->host);
-    return block->mr;
+unlock_out:
+    rcu_read_unlock();
+    return mr;
 }
 
 static void notdirty_mem_write(void *opaque, hwaddr ram_addr,
@@ -2709,9 +2750,10 @@  bool cpu_physical_memory_is_io(hwaddr phys_addr)
 void qemu_ram_foreach_block(RAMBlockIterFunc func, void *opaque)
 {
     RAMBlock *block;
-
-    QTAILQ_FOREACH(block, &ram_list.blocks, next) {
+    rcu_read_lock();
+    QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
         func(block->host, block->offset, block->length, opaque);
     }
+    rcu_read_unlock();
 }
 #endif
diff --git a/hw/9pfs/virtio-9p-synth.c b/hw/9pfs/virtio-9p-synth.c
index fdfea21..c2efaca 100644
--- a/hw/9pfs/virtio-9p-synth.c
+++ b/hw/9pfs/virtio-9p-synth.c
@@ -18,7 +18,7 @@ 
 #include "fsdev/qemu-fsdev.h"
 #include "virtio-9p-synth.h"
 #include "qemu/rcu.h"
-
+#include "qemu/rcu_queue.h"
 #include <sys/stat.h>
 
 /* Root node for synth file system */
diff --git a/include/exec/cpu-all.h b/include/exec/cpu-all.h
index e088089..9cd8a30 100644
--- a/include/exec/cpu-all.h
+++ b/include/exec/cpu-all.h
@@ -457,7 +457,7 @@  typedef struct RAMBlock {
     /* Reads can take either the iothread or the ramlist lock.
      * Writes must take both locks.
      */
-    QTAILQ_ENTRY(RAMBlock) next;
+    QLIST_ENTRY(RAMBlock) next;
 #if defined(__linux__) && !defined(TARGET_S390X)
     int fd;
 #endif
@@ -469,7 +469,7 @@  typedef struct RAMList {
     uint8_t *phys_dirty;
     RAMBlock *mru_block;
     /* Protected by the ramlist lock.  */
-    QTAILQ_HEAD(, RAMBlock) blocks;
+    QLIST_HEAD(, RAMBlock) blocks;
     uint32_t version;
 } RAMList;
 extern RAMList ram_list;
diff --git a/include/qemu/rcu_queue.h b/include/qemu/rcu_queue.h
index e2b8ba5..d159850 100644
--- a/include/qemu/rcu_queue.h
+++ b/include/qemu/rcu_queue.h
@@ -37,6 +37,14 @@ 
 extern "C" {
 #endif
 
+
+/*
+ * List access methods.
+ */
+#define QLIST_EMPTY_RCU(head) (atomic_rcu_read(&(head)->lh_first) == NULL)
+#define QLIST_FIRST_RCU(head) (atomic_rcu_read(&(head)->lh_first))
+#define QLIST_NEXT_RCU(elm, field) (atomic_rcu_read(&(elm)->field.le_next))
+
 /*
  * List functions.
  */