[v3] eliminate mutex in fast path of __register_frame

Message ID 21a63ae5-1f97-e084-afeb-c74e5cba137d@in.tum.de
State New

Commit Message

Thomas Neumann June 26, 2022, 9:13 a.m. UTC
NOTE: A stress test program and a detailed walkthrough that breaks this
patch into manageable parts can be found here:
https://databasearchitects.blogspot.com/2022/06/making-unwinding-through-jit-ed-code.html

The __register_frame/__deregister_frame functions are used to register
unwinding frames from JITed code in a sorted list. That list itself
is protected by object_mutex, which leads to terrible performance
in multi-threaded code and is somewhat expensive even if single-threaded.
There was already a fast-path that avoided taking the mutex if no
frame was registered at all.

This commit eliminates both the mutex and the sorted list from
the atomic fast path, and replaces it with a btree that uses
optimistic lock coupling during lookup. This allows for fully parallel
unwinding and is essential to scale exception handling to large
core counts.
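
For illustration only: the per-node validation pattern that the lookup
fast path relies on boils down to the following sketch (the names are
made up for this example and do not match unwind-dw2-btree.h exactly):

  #include <stdint.h>

  struct node
  {
    uintptr_t version;    /* Even = unlocked, odd = writer active.  */
    uintptr_t key, value; /* Example payload.  */
  };

  /* Read the payload without writing any shared state; retry whenever
     a concurrent writer might have interfered.  */
  static uintptr_t
  lookup (struct node *n, uintptr_t key)
  {
  restart:;
    /* "Lock" optimistically: remember the version, fail on writers.  */
    uintptr_t v = __atomic_load_n (&n->version, __ATOMIC_SEQ_CST);
    if (v & 1)
      goto restart;

    /* Read the payload into locals using relaxed atomic loads.  */
    uintptr_t k = __atomic_load_n (&n->key, __ATOMIC_RELAXED);
    uintptr_t val = __atomic_load_n (&n->value, __ATOMIC_RELAXED);

    /* Validate: trust the reads above only if the version is still
       unchanged, i.e. no writer ran in between.  */
    __atomic_thread_fence (__ATOMIC_ACQUIRE);
    if (__atomic_load_n (&n->version, __ATOMIC_SEQ_CST) != v)
      goto restart;

    return k == key ? val : 0;
  }

A writer sets the lock bit before modifying a node and bumps the version
when unlocking, so readers either fail to acquire the optimistic lock or
fail the validation and retry.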

Changes since v2:
- fix contention problem during unlocking

libgcc/ChangeLog:

        * unwind-dw2-fde.c (release_registered_frames): Cleanup at shutdown.
        (__register_frame_info_table_bases): Use btree in atomic fast path.
        (__deregister_frame_info_bases): Likewise.
        (_Unwind_Find_FDE): Likewise.
        (base_from_object): Make parameter const.
        (get_pc_range_from_fdes): Compute PC range for lookup.
        (get_pc_range): Likewise.
        * unwind-dw2-fde.h (last_fde): Make parameter const.
        * unwind-dw2-btree.h: New file.
---
 libgcc/unwind-dw2-btree.h | 953 ++++++++++++++++++++++++++++++++++++++
 libgcc/unwind-dw2-fde.c   | 194 ++++++--
 libgcc/unwind-dw2-fde.h   |   2 +-
 3 files changed, 1113 insertions(+), 36 deletions(-)
 create mode 100644 libgcc/unwind-dw2-btree.h

Comments

Jason Merrill Sept. 12, 2022, 4:04 p.m. UTC | #1
On 6/26/22 05:13, Thomas Neumann via Gcc-patches wrote:
> NOTE: A stress test program and a detailed walkthrough that breaks this
> patch into manageable parts can be found here:
> https://databasearchitects.blogspot.com/2022/06/making-unwinding-through-jit-ed-code.html
> 
> The __register_frame/__deregister_frame functions are used to register
> unwinding frames from JITed code in a sorted list. That list itself
> is protected by object_mutex, which leads to terrible performance
> in multi-threaded code and is somewhat expensive even if single-threaded.
> There was already a fast-path that avoided taking the mutex if no
> frame was registered at all.
> 
> This commit eliminates both the mutex and the sorted list from
> the atomic fast path, and replaces it with a btree that uses
> optimistic lock coupling during lookup. This allows for fully parallel
> unwinding and is essential to scale exception handling to large
> core counts.
> 
> Changes since v2:
> - fix contention problem during unlocking

Thanks!  The blog entries are very helpful for the review.

One general minor issue: Please add a period at the end of most comments.

> libgcc/ChangeLog:
> 
>          * unwind-dw2-fde.c (release_registered_frames): Cleanup at shutdown.
>          (__register_frame_info_table_bases): Use btree in atomic fast path.
>          (__deregister_frame_info_bases): Likewise.
>          (_Unwind_Find_FDE): Likewise.
>          (base_from_object): Make parameter const.
>          (get_pc_range_from_fdes): Compute PC range for lookup.
>          (get_pc_range): Likewise.
>          * unwind-dw2-fde.h (last_fde): Make parameter const.
>          * unwind-dw2-btree.h: New file.
> ---
>   libgcc/unwind-dw2-btree.h | 953 ++++++++++++++++++++++++++++++++++++++
>   libgcc/unwind-dw2-fde.c   | 194 ++++++--
>   libgcc/unwind-dw2-fde.h   |   2 +-
>   3 files changed, 1113 insertions(+), 36 deletions(-)
>   create mode 100644 libgcc/unwind-dw2-btree.h
> 
> diff --git a/libgcc/unwind-dw2-btree.h b/libgcc/unwind-dw2-btree.h
> new file mode 100644
> index 00000000000..3b2b6871b46
> --- /dev/null
> +++ b/libgcc/unwind-dw2-btree.h
> @@ -0,0 +1,953 @@
> +/* Lock-free btree for manually registered unwind frames  */
> +/* Copyright (C) 2022 Free Software Foundation, Inc.
> +   Contributed by Thomas Neumann
> +
> +This file is part of GCC.
> +
> +GCC is free software; you can redistribute it and/or modify it under
> +the terms of the GNU General Public License as published by the Free
> +Software Foundation; either version 3, or (at your option) any later
> +version.
> +
> +GCC is distributed in the hope that it will be useful, but WITHOUT ANY
> +WARRANTY; without even the implied warranty of MERCHANTABILITY or
> +FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
> +for more details.
> +
> +Under Section 7 of GPL version 3, you are granted additional
> +permissions described in the GCC Runtime Library Exception, version
> +3.1, as published by the Free Software Foundation.
> +
> +You should have received a copy of the GNU General Public License and
> +a copy of the GCC Runtime Library Exception along with this program;
> +see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
> +<http://www.gnu.org/licenses/>.  */
> +
> +#ifndef GCC_UNWIND_DW2_BTREE_H
> +#define GCC_UNWIND_DW2_BTREE_H
> +
> +#include <stdbool.h>
> +
> +// Common logic for version locks
> +struct version_lock
> +{
> +  // The lock itself. The lowest bit indicates an exclusive lock,
> +  // the second bit indicates waiting threads. All other bits are
> +  // used as counter to recognize changes.
> +  // Overflows are okay here, we must only prevent overflow to the
> +  // same value within one lock_optimistic/validate
> +  // range. Even on 32 bit platforms that would require 1 billion
> +  // frame registrations within the time span of a few assembler
> +  // instructions.
> +  uintptr_t version_lock;
> +};
> +
> +#ifdef __GTHREAD_HAS_COND
> +// We should never get contention within the tree as it rarely changes.
> +// But if we ever do get contention we use these for waiting
> +static __gthread_mutex_t version_lock_mutex = __GTHREAD_MUTEX_INIT;
> +static __gthread_cond_t version_lock_cond = __GTHREAD_COND_INIT;
> +#endif
> +
> +// Initialize in locked state
> +static inline void
> +version_lock_initialize_locked_exclusive (struct version_lock *vl)
> +{
> +  vl->version_lock = 1;
> +}
> +
> +// Try to lock the node exclusive
> +static inline bool
> +version_lock_try_lock_exclusive (struct version_lock *vl)
> +{
> +  uintptr_t state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
> +  if (state & 1)
> +    return false;
> +  return __atomic_compare_exchange_n (&(vl->version_lock), &state, state | 1,
> +				      false, __ATOMIC_SEQ_CST,
> +				      __ATOMIC_SEQ_CST);
> +}
> +
> +// Lock the node exclusive, blocking as needed
> +static void
> +version_lock_lock_exclusive (struct version_lock *vl)
> +{
> +#ifndef __GTHREAD_HAS_COND
> +restart:
> +#endif
> +
> +  // We should virtually never get contention here, as frame
> +  // changes are rare
> +  uintptr_t state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
> +  if (!(state & 1))
> +    {
> +      if (__atomic_compare_exchange_n (&(vl->version_lock), &state, state | 1,
> +				       false, __ATOMIC_SEQ_CST,
> +				       __ATOMIC_SEQ_CST))
> +	return;
> +    }
> +
> +    // We did get contention, wait properly
> +#ifdef __GTHREAD_HAS_COND
> +  __gthread_mutex_lock (&version_lock_mutex);
> +  state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
> +  while (true)
> +    {
> +      // Check if the lock is still held
> +      if (!(state & 1))
> +	{
> +	  if (__atomic_compare_exchange_n (&(vl->version_lock), &state,
> +					   state | 1, false, __ATOMIC_SEQ_CST,
> +					   __ATOMIC_SEQ_CST))
> +	    {
> +	      __gthread_mutex_unlock (&version_lock_mutex);
> +	      return;
> +	    }
> +	  else
> +	    {
> +	      continue;
> +	    }
> +	}
> +
> +      // Register waiting thread
> +      if (!(state & 2))
> +	{
> +	  if (!__atomic_compare_exchange_n (&(vl->version_lock), &state,
> +					    state | 2, false, __ATOMIC_SEQ_CST,
> +					    __ATOMIC_SEQ_CST))
> +	    continue;
> +	}
> +
> +      // And sleep
> +      __gthread_cond_wait (&version_lock_cond, &version_lock_mutex);
> +      state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
> +    }
> +#else
> +  // Spin if we do not have condition variables available
> +  // We expect no contention here, spinning should be okay
> +  goto restart;
> +#endif
> +}
> +
> +// Release a locked node and increase the version lock
> +static void
> +version_lock_unlock_exclusive (struct version_lock *vl)
> +{
> +  // increase version, reset exclusive lock bits
> +  uintptr_t state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
> +  uintptr_t ns = (state + 4) & (~((uintptr_t) 3));
> +  state = __atomic_exchange_n (&(vl->version_lock), ns, __ATOMIC_SEQ_CST);
> +
> +#ifdef __GTHREAD_HAS_COND
> +  if (state & 2)
> +    {
> +      // Wake up waiting threads. This should be extremely rare.
> +      __gthread_mutex_lock (&version_lock_mutex);
> +      __gthread_cond_broadcast (&version_lock_cond);
> +      __gthread_mutex_unlock (&version_lock_mutex);
> +    }
> +#endif
> +}
> +
> +// Acquire an optimistic "lock". Note that this does not lock at all, it
> +// only allows for validation later
> +static inline bool
> +version_lock_lock_optimistic (const struct version_lock *vl, uintptr_t *lock)
> +{
> +  uintptr_t state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
> +  *lock = state;
> +
> +  // Acquiring the lock fails when there is currently an exclusive lock
> +  return !(state & 1);
> +}
> +
> +// Validate a previously acquire lock

...previously acquired "lock"

> +static inline bool
> +version_lock_validate (const struct version_lock *vl, uintptr_t lock)
> +{
> +  // Prevent the reordering of non-atomic loads behind the atomic load.
> +  // Hans Boehm, Can Seqlocks Get Along with Programming Language Memory
> +  // Models?, Section 4.
> +  __atomic_thread_fence (__ATOMIC_ACQUIRE);
> +
> +  // Check that the node is still in the same state
> +  uintptr_t state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
> +  return (state == lock);
> +}
> +
> +// The largest possible separator value
> +static const uintptr_t max_separator = ~((uintptr_t) (0));
> +
> +struct btree_node;
> +
> +// Inner entry. The child tree contains all entries <= separator
> +struct inner_entry
> +{
> +  uintptr_t separator;
> +  struct btree_node *child;
> +};
> +
> +// Leaf entry. Stores an object entry
> +struct leaf_entry
> +{
> +  uintptr_t base, size;
> +  struct object *ob;
> +};
> +
> +// node types
> +enum node_type
> +{
> +  btree_node_inner,
> +  btree_node_leaf,
> +  btree_node_free
> +};
> +
> +// Node sizes. Chosen such that the result size is roughly 256 bytes
> +#define max_fanout_inner 15
> +#define max_fanout_leaf 10
> +
> +// A btree node
> +struct btree_node
> +{
> +  // The version lock used for optimistic lock coupling
> +  struct version_lock version_lock;
> +  // The number of entries
> +  unsigned entry_count;
> +  // The type
> +  enum node_type type;
> +  // The payload
> +  union
> +  {
> +    // The inner nodes have fence keys, i.e., the right-most entry includes a
> +    // separator
> +    struct inner_entry children[max_fanout_inner];
> +    struct leaf_entry entries[max_fanout_leaf];
> +  } content;
> +};
> +
> +// Is an inner node?
> +static inline bool
> +btree_node_is_inner (const struct btree_node *n)
> +{
> +  return n->type == btree_node_inner;
> +}
> +
> +// Is a leaf node?
> +static inline bool
> +btree_node_is_leaf (const struct btree_node *n)
> +{
> +  return n->type == btree_node_leaf;
> +}
> +
> +// Should the node be merged?
> +static inline bool
> +btree_node_needs_merge (const struct btree_node *n)
> +{
> +  return n->entry_count < (btree_node_is_inner (n) ? (max_fanout_inner / 2)
> +						   : (max_fanout_leaf / 2));
> +}
> +
> +// Get the fence key for inner nodes
> +static inline uintptr_t
> +btree_node_get_fence_key (const struct btree_node *n)
> +{
> +  // For inner nodes we just return our right-most entry
> +  return n->content.children[n->entry_count - 1].separator;
> +}
> +
> +// Find the position for a slot in an inner node
> +static unsigned
> +btree_node_find_inner_slot (const struct btree_node *n, uintptr_t value)
> +{
> +  for (unsigned index = 0, ec = n->entry_count; index != ec; ++index)
> +    if (n->content.children[index].separator >= value)
> +      return index;
> +  return n->entry_count;
> +}
> +
> +// Find the position for a slot in a leaf node
> +static unsigned
> +btree_node_find_leaf_slot (const struct btree_node *n, uintptr_t value)
> +{
> +  for (unsigned index = 0, ec = n->entry_count; index != ec; ++index)
> +    if (n->content.entries[index].base + n->content.entries[index].size > value)
> +      return index;
> +  return n->entry_count;
> +}
> +
> +// Try to lock the node exclusive
> +static inline bool
> +btree_node_try_lock_exclusive (struct btree_node *n)
> +{
> +  return version_lock_try_lock_exclusive (&(n->version_lock));
> +}
> +
> +// Lock the node exclusive, blocking as needed
> +static inline void
> +btree_node_lock_exclusive (struct btree_node *n)
> +{
> +  version_lock_lock_exclusive (&(n->version_lock));
> +}
> +
> +// Release a locked node and increase the version lock
> +static inline void
> +btree_node_unlock_exclusive (struct btree_node *n)
> +{
> +  version_lock_unlock_exclusive (&(n->version_lock));
> +}
> +
> +// Acquire an optimistic "lock". Note that this does not lock at all, it
> +// only allows for validation later
> +static inline bool
> +btree_node_lock_optimistic (const struct btree_node *n, uintptr_t *lock)
> +{
> +  return version_lock_lock_optimistic (&(n->version_lock), lock);
> +}
> +
> +// Validate a previously acquire lock
> +static inline bool
> +btree_node_validate (const struct btree_node *n, uintptr_t lock)
> +{
> +  return version_lock_validate (&(n->version_lock), lock);
> +}
> +
> +// Insert a new separator after splitting
> +static void
> +btree_node_update_separator_after_split (struct btree_node *n,
> +					 uintptr_t old_separator,
> +					 uintptr_t new_separator,
> +					 struct btree_node *new_right)
> +{
> +  unsigned slot = btree_node_find_inner_slot (n, old_separator);
> +  for (unsigned index = n->entry_count; index > slot; --index)
> +    n->content.children[index] = n->content.children[index - 1];
> +  n->content.children[slot].separator = new_separator;
> +  n->content.children[slot + 1].child = new_right;
> +  n->entry_count++;
> +}
> +
> +// A btree. Suitable for static initialization, all members are zero at the
> +// beginning
> +struct btree
> +{
> +  // The root of the btree
> +  struct btree_node *root;
> +  // The free list of released node
> +  struct btree_node *free_list;
> +  // The version lock used to protect the root
> +  struct version_lock root_lock;
> +};
> +
> +// Initialize a btree. Not actually used, just for exposition
> +static inline void
> +btree_init (struct btree *t)
> +{
> +  t->root = NULL;
> +  t->free_list = NULL;
> +  t->root_lock.version_lock = 0;
> +};
> +
> +static void
> +btree_release_tree_recursively (struct btree *t, struct btree_node *n);
> +
> +// Destroy a tree and release all nodes
> +static void
> +btree_destroy (struct btree *t)
> +{
> +  // Disable the mechanism before cleaning up
> +  struct btree_node *old_root
> +    = __atomic_exchange_n (&(t->root), NULL, __ATOMIC_SEQ_CST);
> +  if (old_root)
> +    btree_release_tree_recursively (t, old_root);
> +
> +  // Release all free nodes
> +  while (t->free_list)
> +    {
> +      struct btree_node *next = t->free_list->content.children[0].child;
> +      free (t->free_list);
> +      t->free_list = next;
> +    }
> +}
> +
> +// Allocate a node. This node will be returned in locked exclusive state
> +static struct btree_node *
> +btree_allocate_node (struct btree *t, bool inner)
> +{
> +  while (true)
> +    {
> +      // Try the free list first
> +      struct btree_node *next_free
> +	= __atomic_load_n (&(t->free_list), __ATOMIC_SEQ_CST);
> +      if (next_free)
> +	{
> +	  if (!btree_node_try_lock_exclusive (next_free))
> +	    continue;
> +	  // The node might no longer be free, check that again after acquiring
> +	  // the exclusive lock
> +	  if (next_free->type == btree_node_free)
> +	    {
> +	      struct btree_node *ex = next_free;
> +	      if (__atomic_compare_exchange_n (
> +		    &(t->free_list), &ex, next_free->content.children[0].child,
> +		    false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
> +		{
> +		  next_free->entry_count = 0;
> +		  next_free->type = inner ? btree_node_inner : btree_node_leaf;
> +		  return next_free;
> +		}
> +	    }
> +	  btree_node_unlock_exclusive (next_free);
> +	  continue;
> +	}
> +
> +      // No free node available, allocate a new one
> +      struct btree_node *new_node
> +	= (struct btree_node *) (malloc (sizeof (struct btree_node)));
> +      version_lock_initialize_locked_exclusive (
> +	&(new_node->version_lock)); // initialize the node in locked state
> +      new_node->entry_count = 0;
> +      new_node->type = inner ? btree_node_inner : btree_node_leaf;
> +      return new_node;
> +    }
> +}
> +
> +// Release a node. This node must be currently locked exclusively and will
> +// be placed in the free list
> +static void
> +btree_release_node (struct btree *t, struct btree_node *node)
> +{
> +  // We cannot release the memory immediately because there might still be
> +  // concurrent readers on that node. Put it in the free list instead
> +  node->type = btree_node_free;
> +  struct btree_node *next_free
> +    = __atomic_load_n (&(t->free_list), __ATOMIC_SEQ_CST);
> +  do
> +    {
> +      node->content.children[0].child = next_free;
> +  } while (!__atomic_compare_exchange_n (&(t->free_list), &next_free, node,
> +					 false, __ATOMIC_SEQ_CST,
> +					 __ATOMIC_SEQ_CST));
> +  btree_node_unlock_exclusive (node);
> +}
> +
> +// Recursively release a tree. The btree is by design very shallow, thus
> +// we can risk recursion here
> +static void
> +btree_release_tree_recursively (struct btree *t, struct btree_node *node)
> +{
> +  btree_node_lock_exclusive (node);
> +  if (btree_node_is_inner (node))
> +    {
> +      for (unsigned index = 0; index < node->entry_count; ++index)
> +	btree_release_tree_recursively (t, node->content.children[index].child);
> +    }
> +  btree_release_node (t, node);
> +}
> +
> +// Check if we are splitting the root
> +static void
> +btree_handle_root_split (struct btree *t, struct btree_node **node,
> +			 struct btree_node **parent)
> +{
> +  // We want to keep the root pointer stable to allow for contention
> +  // free reads. Thus, we split the root by first moving the content
> +  // of the root node to a new node, and then split that new node
> +  if (!*parent)
> +    {
> +      // Allocate a new node, this guarantees us that we will have a parent
> +      // afterwards
> +      struct btree_node *new_node
> +	= btree_allocate_node (t, btree_node_is_inner (*node));
> +      struct btree_node *old_node = *node;
> +      new_node->entry_count = old_node->entry_count;
> +      new_node->content = old_node->content;
> +      old_node->content.children[0].separator = max_separator;
> +      old_node->content.children[0].child = new_node;
> +      old_node->entry_count = 1;
> +      old_node->type = btree_node_inner;
> +
> +      *parent = old_node;
> +      *node = new_node;
> +    }
> +}
> +
> +// Split an inner node
> +static void
> +btree_split_inner (struct btree *t, struct btree_node **inner,
> +		   struct btree_node **parent, uintptr_t target)
> +{
> +  // Check for the root
> +  btree_handle_root_split (t, inner, parent);
> +
> +  // Create two inner node
> +  uintptr_t right_fence = btree_node_get_fence_key (*inner);
> +  struct btree_node *left_inner = *inner;
> +  struct btree_node *right_inner = btree_allocate_node (t, true);
> +  unsigned split = left_inner->entry_count / 2;
> +  right_inner->entry_count = left_inner->entry_count - split;
> +  for (unsigned index = 0; index < right_inner->entry_count; ++index)
> +    right_inner->content.children[index]
> +      = left_inner->content.children[split + index];
> +  left_inner->entry_count = split;
> +  uintptr_t left_fence = btree_node_get_fence_key (left_inner);
> +  btree_node_update_separator_after_split (*parent, right_fence, left_fence,
> +					   right_inner);
> +  if (target <= left_fence)
> +    {
> +      *inner = left_inner;
> +      btree_node_unlock_exclusive (right_inner);
> +    }
> +  else
> +    {
> +      *inner = right_inner;
> +      btree_node_unlock_exclusive (left_inner);
> +    }
> +}
> +
> +// Split a leaf node
> +static void
> +btree_split_leaf (struct btree *t, struct btree_node **leaf,
> +		  struct btree_node **parent, uintptr_t fence, uintptr_t target)
> +{
> +  // Check for the root
> +  btree_handle_root_split (t, leaf, parent);
> +
> +  // Create two leaf node
> +  uintptr_t right_fence = fence;
> +  struct btree_node *left_leaf = *leaf;
> +  struct btree_node *right_leaf = btree_allocate_node (t, false);
> +  unsigned split = left_leaf->entry_count / 2;
> +  right_leaf->entry_count = left_leaf->entry_count - split;
> +  for (unsigned index = 0; index != right_leaf->entry_count; ++index)
> +    right_leaf->content.entries[index]
> +      = left_leaf->content.entries[split + index];
> +  left_leaf->entry_count = split;
> +  uintptr_t left_fence = right_leaf->content.entries[0].base - 1;
> +  btree_node_update_separator_after_split (*parent, right_fence, left_fence,
> +					   right_leaf);
> +  if (target <= left_fence)
> +    {
> +      *leaf = left_leaf;
> +      btree_node_unlock_exclusive (right_leaf);
> +    }
> +  else
> +    {
> +      *leaf = right_leaf;
> +      btree_node_unlock_exclusive (left_leaf);
> +    }
> +}
> +
> +// Merge (or balance) child nodes
> +static struct btree_node *
> +btree_merge_node (struct btree *t, unsigned child_slot,
> +		  struct btree_node *parent, uintptr_t target)
> +{
> +  // Choose the emptiest neighbor and lock both. The target child is already
> +  // locked
> +  unsigned left_slot;
> +  struct btree_node *left_node, *right_node;
> +  if ((child_slot == 0)
> +      || (((child_slot + 1) < parent->entry_count)
> +	  && (parent->content.children[child_slot + 1].child->entry_count
> +	      < parent->content.children[child_slot - 1].child->entry_count)))
> +    {
> +      left_slot = child_slot;
> +      left_node = parent->content.children[left_slot].child;
> +      right_node = parent->content.children[left_slot + 1].child;
> +      btree_node_lock_exclusive (right_node);
> +    }
> +  else
> +    {
> +      left_slot = child_slot - 1;
> +      left_node = parent->content.children[left_slot].child;
> +      right_node = parent->content.children[left_slot + 1].child;
> +      btree_node_lock_exclusive (left_node);
> +    }
> +
> +  // Can we merge both nodes into one node?
> +  unsigned total_count = left_node->entry_count + right_node->entry_count;
> +  unsigned max_count
> +    = btree_node_is_inner (left_node) ? max_fanout_inner : max_fanout_leaf;
> +  if (total_count <= max_count)
> +    {
> +      // Merge into the parent?
> +      if (parent->entry_count == 2)
> +	{
> +	  // Merge children into parent. This can only happen at the root
> +	  if (btree_node_is_inner (left_node))
> +	    {
> +	      for (unsigned index = 0; index != left_node->entry_count; ++index)
> +		parent->content.children[index]
> +		  = left_node->content.children[index];
> +	      for (unsigned index = 0; index != right_node->entry_count;
> +		   ++index)
> +		parent->content.children[index + left_node->entry_count]
> +		  = right_node->content.children[index];
> +	    }
> +	  else
> +	    {
> +	      parent->type = btree_node_leaf;
> +	      for (unsigned index = 0; index != left_node->entry_count; ++index)
> +		parent->content.entries[index]
> +		  = left_node->content.entries[index];
> +	      for (unsigned index = 0; index != right_node->entry_count;
> +		   ++index)
> +		parent->content.entries[index + left_node->entry_count]
> +		  = right_node->content.entries[index];
> +	    }
> +	  parent->entry_count = total_count;
> +	  btree_release_node (t, left_node);
> +	  btree_release_node (t, right_node);
> +	  return parent;
> +	}
> +      else
> +	{
> +	  // Regular merge
> +	  if (btree_node_is_inner (left_node))
> +	    {
> +	      for (unsigned index = 0; index != right_node->entry_count;
> +		   ++index)
> +		left_node->content.children[left_node->entry_count++]
> +		  = right_node->content.children[index];
> +	    }
> +	  else
> +	    {
> +	      for (unsigned index = 0; index != right_node->entry_count;
> +		   ++index)
> +		left_node->content.entries[left_node->entry_count++]
> +		  = right_node->content.entries[index];
> +	    }
> +	  parent->content.children[left_slot].separator
> +	    = parent->content.children[left_slot + 1].separator;
> +	  for (unsigned index = left_slot + 1; index + 1 < parent->entry_count;
> +	       ++index)
> +	    parent->content.children[index]
> +	      = parent->content.children[index + 1];
> +	  parent->entry_count--;
> +	  btree_release_node (t, right_node);
> +	  btree_node_unlock_exclusive (parent);
> +	  return left_node;
> +	}
> +    }
> +
> +  // No merge possible, rebalance instead
> +  if (left_node->entry_count > right_node->entry_count)
> +    {
> +      // Shift from left to right
> +      unsigned to_shift
> +	= (left_node->entry_count - right_node->entry_count) / 2;
> +      if (btree_node_is_inner (left_node))
> +	{
> +	  for (unsigned index = 0; index != right_node->entry_count; ++index)
> +	    {
> +	      unsigned pos = right_node->entry_count - 1 - index;
> +	      right_node->content.children[pos + to_shift]
> +		= right_node->content.children[pos];
> +	    }
> +	  for (unsigned index = 0; index != to_shift; ++index)
> +	    right_node->content.children[index]
> +	      = left_node->content
> +		  .children[left_node->entry_count - to_shift + index];
> +	}
> +      else
> +	{
> +	  for (unsigned index = 0; index != right_node->entry_count; ++index)
> +	    {
> +	      unsigned pos = right_node->entry_count - 1 - index;
> +	      right_node->content.entries[pos + to_shift]
> +		= right_node->content.entries[pos];
> +	    }
> +	  for (unsigned index = 0; index != to_shift; ++index)
> +	    right_node->content.entries[index]
> +	      = left_node->content
> +		  .entries[left_node->entry_count - to_shift + index];
> +	}
> +      left_node->entry_count -= to_shift;
> +      right_node->entry_count += to_shift;
> +    }
> +  else
> +    {
> +      // Shift from right to left
> +      unsigned to_shift
> +	= (right_node->entry_count - left_node->entry_count) / 2;
> +      if (btree_node_is_inner (left_node))
> +	{
> +	  for (unsigned index = 0; index != to_shift; ++index)
> +	    left_node->content.children[left_node->entry_count + index]
> +	      = right_node->content.children[index];
> +	  for (unsigned index = 0; index != right_node->entry_count - to_shift;
> +	       ++index)
> +	    right_node->content.children[index]
> +	      = right_node->content.children[index + to_shift];
> +	}
> +      else
> +	{
> +	  for (unsigned index = 0; index != to_shift; ++index)
> +	    left_node->content.entries[left_node->entry_count + index]
> +	      = right_node->content.entries[index];
> +	  for (unsigned index = 0; index != right_node->entry_count - to_shift;
> +	       ++index)
> +	    right_node->content.entries[index]
> +	      = right_node->content.entries[index + to_shift];
> +	}
> +      left_node->entry_count += to_shift;
> +      right_node->entry_count -= to_shift;
> +    }
> +  uintptr_t left_fence;
> +  if (btree_node_is_leaf (left_node))
> +    {
> +      left_fence = right_node->content.entries[0].base - 1;
> +    }
> +  else
> +    {
> +      left_fence = btree_node_get_fence_key (left_node);
> +    }
> +  parent->content.children[left_slot].separator = left_fence;
> +  btree_node_unlock_exclusive (parent);
> +  if (target <= left_fence)
> +    {
> +      btree_node_unlock_exclusive (right_node);
> +      return left_node;
> +    }
> +  else
> +    {
> +      btree_node_unlock_exclusive (left_node);
> +      return right_node;
> +    }
> +}
> +
> +// Insert an entry
> +static bool
> +btree_insert (struct btree *t, uintptr_t base, uintptr_t size,
> +	      struct object *ob)
> +{
> +  // Sanity check
> +  if (!size)
> +    return false;
> +
> +  // Access the root
> +  struct btree_node *iter, *parent = NULL;
> +  {
> +    version_lock_lock_exclusive (&(t->root_lock));
> +    iter = t->root;
> +    if (iter)
> +      {
> +	btree_node_lock_exclusive (iter);
> +      }
> +    else
> +      {
> +	t->root = iter = btree_allocate_node (t, false);
> +      }
> +    version_lock_unlock_exclusive (&(t->root_lock));
> +  }
> +
> +  // Walk down the btree with classic lock coupling and eager splits.
> +  // Strictly speaking this is not performance optimal, we could use
> +  // optimistic lock coupling until we hit a node that has to be modified.
> +  // But that is more difficult to implement and frame registration is
> +  // rare anyway, we use simple locking for now
> +
> +  uintptr_t fence = max_separator;
> +  while (btree_node_is_inner (iter))
> +    {
> +      // Use eager splits to avoid lock coupling up
> +      if (iter->entry_count == max_fanout_inner)
> +	btree_split_inner (t, &iter, &parent, base);
> +
> +      unsigned slot = btree_node_find_inner_slot (iter, base);
> +      if (parent)
> +	btree_node_unlock_exclusive (parent);
> +      parent = iter;
> +      fence = iter->content.children[slot].separator;
> +      iter = iter->content.children[slot].child;
> +      btree_node_lock_exclusive (iter);
> +    }
> +
> +  // Make sure we have space
> +  if (iter->entry_count == max_fanout_leaf)
> +    btree_split_leaf (t, &iter, &parent, fence, base);
> +  if (parent)
> +    btree_node_unlock_exclusive (parent);
> +
> +  // Insert in node
> +  unsigned slot = btree_node_find_leaf_slot (iter, base);
> +  if ((slot < iter->entry_count) && (iter->content.entries[slot].base == base))
> +    {
> +      // duplicate entry, this should never happen
> +      btree_node_unlock_exclusive (iter);
> +      return false;
> +    }
> +  for (unsigned index = iter->entry_count; index > slot; --index)
> +    iter->content.entries[index] = iter->content.entries[index - 1];
> +  struct leaf_entry *e = &(iter->content.entries[slot]);
> +  e->base = base;
> +  e->size = size;
> +  e->ob = ob;
> +  iter->entry_count++;
> +  btree_node_unlock_exclusive (iter);
> +  return true;
> +}
> +
> +// Remove an entry
> +static struct object *
> +btree_remove (struct btree *t, uintptr_t base)
> +{
> +  // Access the root
> +  version_lock_lock_exclusive (&(t->root_lock));
> +  struct btree_node *iter = t->root;
> +  if (iter)
> +    btree_node_lock_exclusive (iter);
> +  version_lock_unlock_exclusive (&(t->root_lock));
> +  if (!iter)
> +    return NULL;
> +
> +  // Same strategy as with insert, walk down with lock coupling and
> +  // merge eagerly
> +  while (btree_node_is_inner (iter))
> +    {
> +      unsigned slot = btree_node_find_inner_slot (iter, base);
> +      struct btree_node *next = iter->content.children[slot].child;
> +      btree_node_lock_exclusive (next);
> +      if (btree_node_needs_merge (next))
> +	{
> +	  // Use eager merges to avoid lock coupling up
> +	  iter = btree_merge_node (t, slot, iter, base);
> +	}
> +      else
> +	{
> +	  btree_node_unlock_exclusive (iter);
> +	  iter = next;
> +	}
> +    }
> +
> +  // Remove existing entry
> +  unsigned slot = btree_node_find_leaf_slot (iter, base);
> +  if ((slot >= iter->entry_count) || (iter->content.entries[slot].base != base))
> +    {
> +      // not found, this should never happen
> +      btree_node_unlock_exclusive (iter);
> +      return NULL;
> +    }
> +  struct object *ob = iter->content.entries[slot].ob;
> +  for (unsigned index = slot; index + 1 < iter->entry_count; ++index)
> +    iter->content.entries[index] = iter->content.entries[index + 1];
> +  iter->entry_count--;
> +  btree_node_unlock_exclusive (iter);
> +  return ob;
> +}
> +
> +// Find the corresponding entry for the given address
> +static struct object *
> +btree_lookup (const struct btree *t, uintptr_t target_addr)
> +{
> +  // Within this function many loads are relaxed atomic loads.
> +  // Use a macro to keep the code reasonable
> +#define RLOAD(x) __atomic_load_n (&(x), __ATOMIC_RELAXED)
> +
> +  // For targets where unwind info is usually not registered through these
> +  // APIs anymore, avoid any sequential consistent atomics.
> +  // Use relaxed MO here, it is up to the app to ensure that the library
> +  // loading/initialization happens-before using that library in other
> +  // threads (in particular unwinding with that library's functions
> +  // appearing in the backtraces).  Calling that library's functions
> +  // without waiting for the library to initialize would be racy.
> +  if (__builtin_expect (!RLOAD (t->root), 1))
> +    return NULL;
> +
> +  // The unwinding tables are mostly static, they only change when
> +  // frames are added or removed. This makes it extremely unlikely that they
> +  // change during a given unwinding sequence. Thus, we optimize for the
> +  // contention free case and use optimistic lock coupling. This does not
> +  // require any writes to shared state, instead we validate every read. It is
> +  // important that we do not trust any value that we have read until we call
> +  // validate again. Data can change at arbitrary points in time, thus we always
> +  // copy something into a local variable and validate again before acting on
> +  // the read. In the unlikely event that we encounter a concurrent change we
> +  // simply restart and try again.
> +
> +restart:
> +  struct btree_node *iter;
> +  uintptr_t lock;
> +  {
> +    // Accessing the root node requires defending against concurrent pointer
> +    // changes Thus we couple rootLock -> lock on root node -> validate rootLock

Can you elaborate a bit more in the comment on why it's necessary and 
sufficient to validate the containing node after "locking" the child node?

> +    if (!version_lock_lock_optimistic (&(t->root_lock), &lock))
> +      goto restart;
> +    iter = RLOAD (t->root);
> +    if (!version_lock_validate (&(t->root_lock), lock))
> +      goto restart;
> +    if (!iter)
> +      return NULL;
> +    uintptr_t child_lock;
> +    if ((!btree_node_lock_optimistic (iter, &child_lock))
> +	|| (!version_lock_validate (&(t->root_lock), lock)))
> +      goto restart;
> +    lock = child_lock;
> +  }
> +
> +  // Now we can walk down towards the right leaf node
> +  while (true)
> +    {
> +      enum node_type type = RLOAD (iter->type);
> +      unsigned entry_count = RLOAD (iter->entry_count);
> +      if (!btree_node_validate (iter, lock))
> +	goto restart;
> +      if (!entry_count)
> +	return NULL;
> +
> +      if (type == btree_node_inner)
> +	{
> +	  // We cannot call find_inner_slot here because we need (relaxed)
> +	  // atomic reads here
> +	  unsigned slot = 0;
> +	  while (
> +	    ((slot + 1) < entry_count)
> +	    && (RLOAD (iter->content.children[slot].separator) < target_addr))
> +	    ++slot;
> +	  struct btree_node *child = RLOAD (iter->content.children[slot].child);
> +	  if (!btree_node_validate (iter, lock))
> +	    goto restart;
> +
> +	  // The node content can change at any point in time, thus we must
> +	  // interleave parent and child checks
> +	  uintptr_t child_lock;
> +	  if (!btree_node_lock_optimistic (child, &child_lock))
> +	    goto restart;
> +	  if (!btree_node_validate (iter, lock))
> +	    goto restart; // make sure we still point to the correct node after
> +			  // acquiring the optimistic lock
> +
> +	  // Go down
> +	  iter = child;
> +	  lock = child_lock;
> +	}
> +      else
> +	{
> +	  // We cannot call find_leaf_slot here because we need (relaxed)
> +	  // atomic reads here
> +	  unsigned slot = 0;
> +	  while (((slot + 1) < entry_count)
> +		 && (RLOAD (iter->content.entries[slot].base)
> +		       + RLOAD (iter->content.entries[slot].size)
> +		     <= target_addr))
> +	    ++slot;
> +	  struct leaf_entry entry;
> +	  entry.base = RLOAD (iter->content.entries[slot].base);
> +	  entry.size = RLOAD (iter->content.entries[slot].size);
> +	  entry.ob = RLOAD (iter->content.entries[slot].ob);
> +	  if (!btree_node_validate (iter, lock))
> +	    goto restart;
> +
> +	  // Check if we have a hit
> +	  if ((entry.base <= target_addr)
> +	      && (target_addr < entry.base + entry.size))
> +	    {
> +	      return entry.ob;
> +	    }
> +	  return NULL;
> +	}
> +    }
> +#undef RLOAD
> +}
> +
> +#endif /* unwind-dw2-btree.h */
> diff --git a/libgcc/unwind-dw2-fde.c b/libgcc/unwind-dw2-fde.c
> index 8ee55be5675..d546b9e4c43 100644
> --- a/libgcc/unwind-dw2-fde.c
> +++ b/libgcc/unwind-dw2-fde.c
> @@ -42,15 +42,34 @@ see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
>   #endif
>   #endif
>   
> +#ifdef ATOMIC_FDE_FAST_PATH
> +#include "unwind-dw2-btree.h"
> +
> +static struct btree registered_frames;
> +
> +static void
> +release_registered_frames (void) __attribute__ ((destructor (110)));
> +static void
> +release_registered_frames (void)
> +{
> +  /* Release the b-tree and all frames. Frame releases that happen later are
> +   * silently ignored */
> +  btree_destroy (&registered_frames);
> +}
> +
> +static void
> +get_pc_range (const struct object *ob, uintptr_t *range);
> +static void
> +init_object (struct object *ob);
> +
> +#else
> +
>   /* The unseen_objects list contains objects that have been registered
>      but not yet categorized in any way.  The seen_objects list has had
>      its pc_begin and count fields initialized at minimum, and is sorted
>      by decreasing value of pc_begin.  */
>   static struct object *unseen_objects;
>   static struct object *seen_objects;
> -#ifdef ATOMIC_FDE_FAST_PATH
> -static int any_objects_registered;
> -#endif
>   
>   #ifdef __GTHREAD_MUTEX_INIT
>   static __gthread_mutex_t object_mutex = __GTHREAD_MUTEX_INIT;
> @@ -78,6 +97,7 @@ init_object_mutex_once (void)
>   static __gthread_mutex_t object_mutex;
>   #endif
>   #endif
> +#endif
>   
>   /* Called from crtbegin.o to register the unwind info for an object.  */
>   
> @@ -99,23 +119,23 @@ __register_frame_info_bases (const void *begin, struct object *ob,
>     ob->fde_end = NULL;
>   #endif
>   
> +#ifdef ATOMIC_FDE_FAST_PATH
> +  // Initialize  eagerly to avoid locking later
> +  init_object (ob);
> +
> +  // And register the frame
> +  uintptr_t range[2];
> +  get_pc_range (ob, range);
> +  btree_insert (&registered_frames, range[0], range[1] - range[0], ob);
> +#else
>     init_object_mutex_once ();
>     __gthread_mutex_lock (&object_mutex);
>   
>     ob->next = unseen_objects;
>     unseen_objects = ob;
> -#ifdef ATOMIC_FDE_FAST_PATH
> -  /* Set flag that at least one library has registered FDEs.
> -     Use relaxed MO here, it is up to the app to ensure that the library
> -     loading/initialization happens-before using that library in other
> -     threads (in particular unwinding with that library's functions
> -     appearing in the backtraces).  Calling that library's functions
> -     without waiting for the library to initialize would be racy.  */
> -  if (!any_objects_registered)
> -    __atomic_store_n (&any_objects_registered, 1, __ATOMIC_RELAXED);
> -#endif
>   
>     __gthread_mutex_unlock (&object_mutex);
> +#endif
>   }
>   
>   void
> @@ -153,23 +173,23 @@ __register_frame_info_table_bases (void *begin, struct object *ob,
>     ob->s.b.from_array = 1;
>     ob->s.b.encoding = DW_EH_PE_omit;
>   
> +#ifdef ATOMIC_FDE_FAST_PATH
> +  // Initialize  eagerly to avoid locking later
> +  init_object (ob);
> +
> +  // And register the frame
> +  uintptr_t range[2];
> +  get_pc_range (ob, range);
> +  btree_insert (&registered_frames, range[0], range[1] - range[0], ob);
> +#else
>     init_object_mutex_once ();
>     __gthread_mutex_lock (&object_mutex);
>   
>     ob->next = unseen_objects;
>     unseen_objects = ob;
> -#ifdef ATOMIC_FDE_FAST_PATH
> -  /* Set flag that at least one library has registered FDEs.
> -     Use relaxed MO here, it is up to the app to ensure that the library
> -     loading/initialization happens-before using that library in other
> -     threads (in particular unwinding with that library's functions
> -     appearing in the backtraces).  Calling that library's functions
> -     without waiting for the library to initialize would be racy.  */
> -  if (!any_objects_registered)
> -    __atomic_store_n (&any_objects_registered, 1, __ATOMIC_RELAXED);
> -#endif
>   
>     __gthread_mutex_unlock (&object_mutex);
> +#endif
>   }
>   
>   void
> @@ -200,16 +220,33 @@ __register_frame_table (void *begin)
>   void *
>   __deregister_frame_info_bases (const void *begin)
>   {
> -  struct object **p;
>     struct object *ob = 0;
>   
>     /* If .eh_frame is empty, we haven't registered.  */
>     if ((const uword *) begin == 0 || *(const uword *) begin == 0)
>       return ob;
>   
> +#ifdef ATOMIC_FDE_FAST_PATH
> +  // Find the corresponding PC range
> +  struct object lookupob;
> +  lookupob.tbase = 0;
> +  lookupob.dbase = 0;
> +  lookupob.u.single = begin;
> +  lookupob.s.i = 0;
> +  lookupob.s.b.encoding = DW_EH_PE_omit;
> +#ifdef DWARF2_OBJECT_END_PTR_EXTENSION
> +  lookupob.fde_end = NULL;
> +#endif
> +  uintptr_t range[2];
> +  get_pc_range (&lookupob, range);
> +
> +  // And remove
> +  ob = btree_remove (&registered_frames, range[0]);
> +#else
>     init_object_mutex_once ();
>     __gthread_mutex_lock (&object_mutex);
>   
> +  struct object **p;
>     for (p = &unseen_objects; *p ; p = &(*p)->next)
>       if ((*p)->u.single == begin)
>         {
> @@ -241,6 +278,8 @@ __deregister_frame_info_bases (const void *begin)
>   
>    out:
>     __gthread_mutex_unlock (&object_mutex);
> +#endif
> +
>     gcc_assert (ob);
>     return (void *) ob;
>   }
> @@ -264,7 +303,7 @@ __deregister_frame (void *begin)
>      instead of an _Unwind_Context.  */
>   
>   static _Unwind_Ptr
> -base_from_object (unsigned char encoding, struct object *ob)
> +base_from_object (unsigned char encoding, const struct object *ob)
>   {
>     if (encoding == DW_EH_PE_omit)
>       return 0;
> @@ -821,6 +860,91 @@ init_object (struct object* ob)
>     ob->s.b.sorted = 1;
>   }
>   
> +#ifdef ATOMIC_FDE_FAST_PATH
> +/* Get the PC range from FDEs. The code is very similar to
> +   classify_object_over_fdes and should be kept in sync with
> +   that. The main difference is that classify_object_over_fdes
> +   modifies the object, which we cannot do here */

Can we factor this out of classify_object_over_fdes and just leave the 
modifications in that function?  Trying to keep functions this size in 
sync often fails.

> +static void
> +get_pc_range_from_fdes (const struct object *ob, const fde *this_fde,
> +			uintptr_t *range)
> +{
> +  const struct dwarf_cie *last_cie = 0;
> +  int encoding = DW_EH_PE_absptr;
> +  _Unwind_Ptr base = 0;
> +
> +  for (; !last_fde (ob, this_fde); this_fde = next_fde (this_fde))
> +    {
> +      const struct dwarf_cie *this_cie;
> +      _Unwind_Ptr mask, pc_begin, pc_range;
> +
> +      /* Skip CIEs.  */
> +      if (this_fde->CIE_delta == 0)
> +	continue;
> +
> +      this_cie = get_cie (this_fde);
> +      if (this_cie != last_cie)
> +	{
> +	  last_cie = this_cie;
> +	  encoding = get_cie_encoding (this_cie);
> +	  base = base_from_object (encoding, ob);
> +	}
> +
> +      const unsigned char *p;
> +      p = read_encoded_value_with_base (encoding, base, this_fde->pc_begin,
> +					&pc_begin);
> +      read_encoded_value_with_base (encoding & 0x0F, 0, p, &pc_range);
> +
> +      /* Take care to ignore link-once functions that were removed.
> +	 In these cases, the function address will be NULL, but if
> +	 the encoding is smaller than a pointer a true NULL may not
> +	 be representable.  Assume 0 in the representable bits is NULL.  */
> +      mask = size_of_encoded_value (encoding);
> +      if (mask < sizeof (void *))
> +	mask = (((_Unwind_Ptr) 1) << (mask << 3)) - 1;
> +      else
> +	mask = -1;
> +      if ((pc_begin & mask) == 0)
> +	continue;
> +
> +      _Unwind_Ptr pc_end = pc_begin + pc_range;
> +      if ((!range[0]) && (!range[1]))
> +	{
> +	  range[0] = pc_begin;
> +	  range[1] = pc_end;
> +	}
> +      else
> +	{
> +	  if (pc_begin < range[0])
> +	    range[0] = pc_begin;
> +	  if (pc_end > range[1])
> +	    range[1] = pc_end;
> +	}
> +    }
> +}
> +
> +/* Get the PC range for lookup */
> +static void
> +get_pc_range (const struct object *ob, uintptr_t *range)
> +{
> +  range[0] = range[1] = 0;
> +  if (ob->s.b.sorted)
> +    {
> +      get_pc_range_from_fdes (ob, ob->u.sort->orig_data, range);
> +    }
> +  else if (ob->s.b.from_array)
> +    {
> +      fde **p = ob->u.array;
> +      for (; *p; ++p)
> +	get_pc_range_from_fdes (ob, *p, range);
> +    }
> +  else
> +    {
> +      get_pc_range_from_fdes (ob, ob->u.single, range);
> +    }
> +}
> +#endif
> +
>   /* A linear search through a set of FDEs for the given PC.  This is
>      used when there was insufficient memory to allocate and sort an
>      array.  */
> @@ -985,6 +1109,9 @@ binary_search_mixed_encoding_fdes (struct object *ob, void *pc)
>   static const fde *
>   search_object (struct object* ob, void *pc)
>   {
> +  /* The fast path initializes objects eagerly to avoid locking.
> +   * On the slow path we initialize them now */
> +#ifndef ATOMIC_FDE_FAST_PATH
>     /* If the data hasn't been sorted, try to do this now.  We may have
>        more memory available than last time we tried.  */
>     if (! ob->s.b.sorted)
> @@ -997,6 +1124,7 @@ search_object (struct object* ob, void *pc)
>         if (pc < ob->pc_begin)
>   	return NULL;
>       }
> +#endif
>   
>     if (ob->s.b.sorted)
>       {
> @@ -1033,17 +1161,12 @@ _Unwind_Find_FDE (void *pc, struct dwarf_eh_bases *bases)
>     const fde *f = NULL;
>   
>   #ifdef ATOMIC_FDE_FAST_PATH
> -  /* For targets where unwind info is usually not registered through these
> -     APIs anymore, avoid taking a global lock.
> -     Use relaxed MO here, it is up to the app to ensure that the library
> -     loading/initialization happens-before using that library in other
> -     threads (in particular unwinding with that library's functions
> -     appearing in the backtraces).  Calling that library's functions
> -     without waiting for the library to initialize would be racy.  */
> -  if (__builtin_expect (!__atomic_load_n (&any_objects_registered,
> -					  __ATOMIC_RELAXED), 1))
> +  ob = btree_lookup (&registered_frames, (uintptr_t) pc);
> +  if (!ob)
>       return NULL;
> -#endif
> +
> +  f = search_object (ob, pc);
> +#else
>   
>     init_object_mutex_once ();
>     __gthread_mutex_lock (&object_mutex);
> @@ -1081,6 +1204,7 @@ _Unwind_Find_FDE (void *pc, struct dwarf_eh_bases *bases)
>   
>    fini:
>     __gthread_mutex_unlock (&object_mutex);
> +#endif
>   
>     if (f)
>       {
> diff --git a/libgcc/unwind-dw2-fde.h b/libgcc/unwind-dw2-fde.h
> index 8a011c358b4..77c2caa4f5a 100644
> --- a/libgcc/unwind-dw2-fde.h
> +++ b/libgcc/unwind-dw2-fde.h
> @@ -166,7 +166,7 @@ next_fde (const fde *f)
>   extern const fde * _Unwind_Find_FDE (void *, struct dwarf_eh_bases *);
>   
>   static inline int
> -last_fde (struct object *obj __attribute__ ((__unused__)), const fde *f)
> +last_fde (const struct object *obj __attribute__ ((__unused__)), const fde *f)
>   {
>   #ifdef DWARF2_OBJECT_END_PTR_EXTENSION
>     return f == (const fde *) obj->fde_end || f->length == 0;
Thomas Neumann Sept. 12, 2022, 6:03 p.m. UTC | #2
Thanks for your feedback. I will update the patch in the next few days, 
addressing the comments and reorganizing classify_object_over_fdes. 
Concerning your question:

>> +
>> +restart:
>> +  struct btree_node *iter;
>> +  uintptr_t lock;
>> +  {
>> +    // Accessing the root node requires defending against concurrent pointer
>> +    // changes Thus we couple rootLock -> lock on root node -> validate rootLock
> 
> Can you elaborate a bit more in the comment on why it's necessary and 
> sufficient to validate the containing node after "locking" the child node?
> 
>> +    if (!version_lock_lock_optimistic (&(t->root_lock), &lock))
>> +      goto restart;
>> +    iter = RLOAD (t->root);
>> +    if (!version_lock_validate (&(t->root_lock), lock))
>> +      goto restart;
>> +    if (!iter)
>> +      return NULL;
>> +    uintptr_t child_lock;
>> +    if ((!btree_node_lock_optimistic (iter, &child_lock))
>> +    || (!version_lock_validate (&(t->root_lock), lock)))
>> +      goto restart;
>> +    lock = child_lock;
>> +  }
>> +
>> +  // Now we can walk down towards the right leaf node

We are reading here without explicit locks, i.e., memory can change 
behind our back at any time. Thus, before we act on a value that we have 
read, we must check if that value is still valid.
The invariants that we rely upon are:
I1) A node pointer will always either be a nullptr or point to a b-tree 
node. The node might have been logically deleted, but the memory will 
always remain valid due to our free list.
I2) The root pointer is only modified by a thread that holds an 
exclusive lock on root_lock.
I3) validate calls fail a) if somebody else currently holds the lock, or 
b) if the version value has changed since locking, which happens when 
somebody releases an exclusive lock.

Using that, the code does the following steps:

1. We lock root_lock optimistically. In case of an exclusive lock we 
restart immediately, otherwise we remember the current version.
2. We load the root pointer and store it in iter. iter might contain 
arbitrary garbage due to races.
3. We validate the root_lock, and restart in case of errors. Now we know 
that a) nobody has locked the root exclusively or modified it since step 
1 (I3), and b) iter indeed points to the node that corresponds to the 
version that we have read in step 1 (due to I2).

At this point we know that iter is indeed the right root pointer. If it 
is a nullptr we stop here. Otherwise we continue with

4. We lock the root node itself (iter) optimistically, storing the 
validated version in child_lock. We need that lock to detect changes to 
the content of the root; the root_lock only protects the pointer to the 
root. Loading the lock is safe due to I1. As our tree can change at any 
point in time, we need another validate call on root_lock to make sure 
that we still have the right root pointer.


At this point we have both a pointer to the root node and an optimistic 
lock that allows us to detect changes to the content of the root. Thus, 
we can safely navigate down; any changes to the b-tree that could affect 
us, e.g., node splits in the root, will be detected when validating the 
lock.
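
For reference, each step down the tree in btree_lookup then applies the 
same hand-over-hand validation. Condensed from the patch (RLOAD is the 
relaxed-load macro defined in btree_lookup), one inner-node step is:

   // slot was found via relaxed loads of the separators (see the patch)
   struct btree_node *child = RLOAD (iter->content.children[slot].child);
   if (!btree_node_validate (iter, lock))   // the child pointer we read is valid
     goto restart;
   uintptr_t child_lock;
   if (!btree_node_lock_optimistic (child, &child_lock))
     goto restart;
   if (!btree_node_validate (iter, lock))   // iter unchanged since locking child
     goto restart;
   iter = child;                            // hand over: descend one level
   lock = child_lock;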


I hope that answered both the necessary and the sufficient part. As a 
general rule, you must always use the pair relaxed-atomic-read + 
validate to avoid data races. Acting on any value that you have not 
validated is unsafe, as you could get races.

Best

Thomas

Patch

diff --git a/libgcc/unwind-dw2-btree.h b/libgcc/unwind-dw2-btree.h
new file mode 100644
index 00000000000..3b2b6871b46
--- /dev/null
+++ b/libgcc/unwind-dw2-btree.h
@@ -0,0 +1,953 @@ 
+/* Lock-free btree for manually registered unwind frames  */
+/* Copyright (C) 2022 Free Software Foundation, Inc.
+   Contributed by Thomas Neumann
+
+This file is part of GCC.
+
+GCC is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 3, or (at your option) any later
+version.
+
+GCC is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+for more details.
+
+Under Section 7 of GPL version 3, you are granted additional
+permissions described in the GCC Runtime Library Exception, version
+3.1, as published by the Free Software Foundation.
+
+You should have received a copy of the GNU General Public License and
+a copy of the GCC Runtime Library Exception along with this program;
+see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
+<http://www.gnu.org/licenses/>.  */
+
+#ifndef GCC_UNWIND_DW2_BTREE_H
+#define GCC_UNWIND_DW2_BTREE_H
+
+#include <stdbool.h>
+
+// Common logic for version locks
+struct version_lock
+{
+  // The lock itself. The lowest bit indicates an exclusive lock,
+  // the second bit indicates waiting threads. All other bits are
+  // used as counter to recognize changes.
+  // Overflows are okay here, we must only prevent overflow to the
+  // same value within one lock_optimistic/validate
+  // range. Even on 32 bit platforms that would require 1 billion
+  // frame registrations within the time span of a few assembler
+  // instructions.
+  uintptr_t version_lock;
+};
+
+#ifdef __GTHREAD_HAS_COND
+// We should never get contention within the tree as it rarely changes.
+// But if we ever do get contention we use these for waiting
+static __gthread_mutex_t version_lock_mutex = __GTHREAD_MUTEX_INIT;
+static __gthread_cond_t version_lock_cond = __GTHREAD_COND_INIT;
+#endif
+
+// Initialize in locked state
+static inline void
+version_lock_initialize_locked_exclusive (struct version_lock *vl)
+{
+  vl->version_lock = 1;
+}
+
+// Try to lock the node exclusive
+static inline bool
+version_lock_try_lock_exclusive (struct version_lock *vl)
+{
+  uintptr_t state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
+  if (state & 1)
+    return false;
+  return __atomic_compare_exchange_n (&(vl->version_lock), &state, state | 1,
+				      false, __ATOMIC_SEQ_CST,
+				      __ATOMIC_SEQ_CST);
+}
+
+// Lock the node exclusive, blocking as needed
+static void
+version_lock_lock_exclusive (struct version_lock *vl)
+{
+#ifndef __GTHREAD_HAS_COND
+restart:
+#endif
+
+  // We should virtually never get contention here, as frame
+  // changes are rare
+  uintptr_t state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
+  if (!(state & 1))
+    {
+      if (__atomic_compare_exchange_n (&(vl->version_lock), &state, state | 1,
+				       false, __ATOMIC_SEQ_CST,
+				       __ATOMIC_SEQ_CST))
+	return;
+    }
+
+    // We did get contention, wait properly
+#ifdef __GTHREAD_HAS_COND
+  __gthread_mutex_lock (&version_lock_mutex);
+  state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
+  while (true)
+    {
+      // Check if the lock is still held
+      if (!(state & 1))
+	{
+	  if (__atomic_compare_exchange_n (&(vl->version_lock), &state,
+					   state | 1, false, __ATOMIC_SEQ_CST,
+					   __ATOMIC_SEQ_CST))
+	    {
+	      __gthread_mutex_unlock (&version_lock_mutex);
+	      return;
+	    }
+	  else
+	    {
+	      continue;
+	    }
+	}
+
+      // Register waiting thread
+      if (!(state & 2))
+	{
+	  if (!__atomic_compare_exchange_n (&(vl->version_lock), &state,
+					    state | 2, false, __ATOMIC_SEQ_CST,
+					    __ATOMIC_SEQ_CST))
+	    continue;
+	}
+
+      // And sleep
+      __gthread_cond_wait (&version_lock_cond, &version_lock_mutex);
+      state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
+    }
+#else
+  // Spin if we do not have condition variables available
+  // We expect no contention here, spinning should be okay
+  goto restart;
+#endif
+}
+
+// Release a locked node and increase the version lock
+static void
+version_lock_unlock_exclusive (struct version_lock *vl)
+{
+  // increase version, reset exclusive lock bits
+  uintptr_t state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
+  uintptr_t ns = (state + 4) & (~((uintptr_t) 3));
+  state = __atomic_exchange_n (&(vl->version_lock), ns, __ATOMIC_SEQ_CST);
+
+#ifdef __GTHREAD_HAS_COND
+  if (state & 2)
+    {
+      // Wake up waiting threads. This should be extremely rare.
+      __gthread_mutex_lock (&version_lock_mutex);
+      __gthread_cond_broadcast (&version_lock_cond);
+      __gthread_mutex_unlock (&version_lock_mutex);
+    }
+#endif
+}
+
+// Acquire an optimistic "lock". Note that this does not lock at all, it
+// only allows for validation later
+static inline bool
+version_lock_lock_optimistic (const struct version_lock *vl, uintptr_t *lock)
+{
+  uintptr_t state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
+  *lock = state;
+
+  // Acquiring the lock fails when there is currently an exclusive lock
+  return !(state & 1);
+}
+
+// Validate a previously acquire lock
+static inline bool
+version_lock_validate (const struct version_lock *vl, uintptr_t lock)
+{
+  // Prevent the reordering of non-atomic loads behind the atomic load.
+  // Hans Boehm, Can Seqlocks Get Along with Programming Language Memory
+  // Models?, Section 4.
+  __atomic_thread_fence (__ATOMIC_ACQUIRE);
+
+  // Check that the node is still in the same state
+  uintptr_t state = __atomic_load_n (&(vl->version_lock), __ATOMIC_SEQ_CST);
+  return (state == lock);
+}
+
+// The largest possible separator value
+static const uintptr_t max_separator = ~((uintptr_t) (0));
+
+struct btree_node;
+
+// Inner entry. The child tree contains all entries <= separator
+struct inner_entry
+{
+  uintptr_t separator;
+  struct btree_node *child;
+};
+
+// Leaf entry. Stores an object entry
+struct leaf_entry
+{
+  uintptr_t base, size;
+  struct object *ob;
+};
+
+// Node types
+enum node_type
+{
+  btree_node_inner,
+  btree_node_leaf,
+  btree_node_free
+};
+
+// Node sizes. Chosen such that the resulting node size is roughly 256 bytes
+#define max_fanout_inner 15
+#define max_fanout_leaf 10
+
+// A btree node
+struct btree_node
+{
+  // The version lock used for optimistic lock coupling
+  struct version_lock version_lock;
+  // The number of entries
+  unsigned entry_count;
+  // The type
+  enum node_type type;
+  // The payload
+  union
+  {
+    // The inner nodes have fence keys, i.e., the right-most entry includes a
+    // separator
+    struct inner_entry children[max_fanout_inner];
+    struct leaf_entry entries[max_fanout_leaf];
+  } content;
+};
+
+// Is an inner node?
+static inline bool
+btree_node_is_inner (const struct btree_node *n)
+{
+  return n->type == btree_node_inner;
+}
+
+// Is a leaf node?
+static inline bool
+btree_node_is_leaf (const struct btree_node *n)
+{
+  return n->type == btree_node_leaf;
+}
+
+// Should the node be merged?
+static inline bool
+btree_node_needs_merge (const struct btree_node *n)
+{
+  return n->entry_count < (btree_node_is_inner (n) ? (max_fanout_inner / 2)
+						   : (max_fanout_leaf / 2));
+}
+
+// Get the fence key for inner nodes
+static inline uintptr_t
+btree_node_get_fence_key (const struct btree_node *n)
+{
+  // For inner nodes we just return our right-most entry
+  return n->content.children[n->entry_count - 1].separator;
+}
+
+// Find the position for a slot in an inner node
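+// Returns the index of the first entry whose separator is >= value, or
+// entry_count if no such entry exists.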
+static unsigned
+btree_node_find_inner_slot (const struct btree_node *n, uintptr_t value)
+{
+  for (unsigned index = 0, ec = n->entry_count; index != ec; ++index)
+    if (n->content.children[index].separator >= value)
+      return index;
+  return n->entry_count;
+}
+
+// Find the position for a slot in a leaf node
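+// Returns the index of the first entry whose range ends after value
+// (i.e., base + size > value), or entry_count if no such entry exists.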
+static unsigned
+btree_node_find_leaf_slot (const struct btree_node *n, uintptr_t value)
+{
+  for (unsigned index = 0, ec = n->entry_count; index != ec; ++index)
+    if (n->content.entries[index].base + n->content.entries[index].size > value)
+      return index;
+  return n->entry_count;
+}
+
+// Try to lock the node exclusive
+static inline bool
+btree_node_try_lock_exclusive (struct btree_node *n)
+{
+  return version_lock_try_lock_exclusive (&(n->version_lock));
+}
+
+// Lock the node exclusive, blocking as needed
+static inline void
+btree_node_lock_exclusive (struct btree_node *n)
+{
+  version_lock_lock_exclusive (&(n->version_lock));
+}
+
+// Release a locked node and increase the version lock
+static inline void
+btree_node_unlock_exclusive (struct btree_node *n)
+{
+  version_lock_unlock_exclusive (&(n->version_lock));
+}
+
+// Acquire an optimistic "lock". Note that this does not lock at all; it
+// only allows for validation later.
+static inline bool
+btree_node_lock_optimistic (const struct btree_node *n, uintptr_t *lock)
+{
+  return version_lock_lock_optimistic (&(n->version_lock), lock);
+}
+
+// Validate a previously acquired lock
+static inline bool
+btree_node_validate (const struct btree_node *n, uintptr_t lock)
+{
+  return version_lock_validate (&(n->version_lock), lock);
+}
+
+// Insert a new separator after splitting
+static void
+btree_node_update_separator_after_split (struct btree_node *n,
+					 uintptr_t old_separator,
+					 uintptr_t new_separator,
+					 struct btree_node *new_right)
+{
+  unsigned slot = btree_node_find_inner_slot (n, old_separator);
+  for (unsigned index = n->entry_count; index > slot; --index)
+    n->content.children[index] = n->content.children[index - 1];
+  n->content.children[slot].separator = new_separator;
+  n->content.children[slot + 1].child = new_right;
+  n->entry_count++;
+}
+
+// A btree. Suitable for static initialization; all members are zero at
+// the beginning.
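+// This is what allows the registered_frames btree in unwind-dw2-fde.c to
+// be a plain static object without an explicit initializer.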
+struct btree
+{
+  // The root of the btree
+  struct btree_node *root;
+  // The free list of released nodes
+  struct btree_node *free_list;
+  // The version lock used to protect the root
+  struct version_lock root_lock;
+};
+
+// Initialize a btree. Not actually used, just for exposition
+static inline void
+btree_init (struct btree *t)
+{
+  t->root = NULL;
+  t->free_list = NULL;
+  t->root_lock.version_lock = 0;
+}
+
+static void
+btree_release_tree_recursively (struct btree *t, struct btree_node *n);
+
+// Destroy a tree and release all nodes
+static void
+btree_destroy (struct btree *t)
+{
+  // Disable the mechanism before cleaning up
+  struct btree_node *old_root
+    = __atomic_exchange_n (&(t->root), NULL, __ATOMIC_SEQ_CST);
+  if (old_root)
+    btree_release_tree_recursively (t, old_root);
+
+  // Release all free nodes
+  while (t->free_list)
+    {
+      struct btree_node *next = t->free_list->content.children[0].child;
+      free (t->free_list);
+      t->free_list = next;
+    }
+}
+
+// Allocate a node. This node will be returned in locked exclusive state
+static struct btree_node *
+btree_allocate_node (struct btree *t, bool inner)
+{
+  while (true)
+    {
+      // Try the free list first
+      struct btree_node *next_free
+	= __atomic_load_n (&(t->free_list), __ATOMIC_SEQ_CST);
+      if (next_free)
+	{
+	  if (!btree_node_try_lock_exclusive (next_free))
+	    continue;
+	  // The node might no longer be free, check that again after acquiring
+	  // the exclusive lock
+	  if (next_free->type == btree_node_free)
+	    {
+	      struct btree_node *ex = next_free;
+	      if (__atomic_compare_exchange_n (
+		    &(t->free_list), &ex, next_free->content.children[0].child,
+		    false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
+		{
+		  next_free->entry_count = 0;
+		  next_free->type = inner ? btree_node_inner : btree_node_leaf;
+		  return next_free;
+		}
+	    }
+	  btree_node_unlock_exclusive (next_free);
+	  continue;
+	}
+
+      // No free node available, allocate a new one
+      struct btree_node *new_node
+	= (struct btree_node *) (malloc (sizeof (struct btree_node)));
+      version_lock_initialize_locked_exclusive (
+	&(new_node->version_lock)); // initialize the node in locked state
+      new_node->entry_count = 0;
+      new_node->type = inner ? btree_node_inner : btree_node_leaf;
+      return new_node;
+    }
+}
+
+// Release a node. This node must be currently locked exclusively and will
+// be placed in the free list
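+// The node is marked btree_node_free so that btree_allocate_node can
+// verify that it is still free after acquiring its lock.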
+static void
+btree_release_node (struct btree *t, struct btree_node *node)
+{
+  // We cannot release the memory immediately because there might still be
+  // concurrent readers on that node. Put it in the free list instead
+  node->type = btree_node_free;
+  struct btree_node *next_free
+    = __atomic_load_n (&(t->free_list), __ATOMIC_SEQ_CST);
+  do
+    {
+      node->content.children[0].child = next_free;
+  } while (!__atomic_compare_exchange_n (&(t->free_list), &next_free, node,
+					 false, __ATOMIC_SEQ_CST,
+					 __ATOMIC_SEQ_CST));
+  btree_node_unlock_exclusive (node);
+}
+
+// Recursively release a tree. The btree is by design very shallow, thus
+// we can risk recursion here
+static void
+btree_release_tree_recursively (struct btree *t, struct btree_node *node)
+{
+  btree_node_lock_exclusive (node);
+  if (btree_node_is_inner (node))
+    {
+      for (unsigned index = 0; index < node->entry_count; ++index)
+	btree_release_tree_recursively (t, node->content.children[index].child);
+    }
+  btree_release_node (t, node);
+}
+
+// Check if we are splitting the root
+static void
+btree_handle_root_split (struct btree *t, struct btree_node **node,
+			 struct btree_node **parent)
+{
+  // We want to keep the root pointer stable to allow for contention-free
+  // reads. Thus, we split the root by first moving the content of the
+  // root node to a new node, and then splitting that new node.
+  if (!*parent)
+    {
+      // Allocate a new node; this guarantees that we will have a parent
+      // afterwards.
+      struct btree_node *new_node
+	= btree_allocate_node (t, btree_node_is_inner (*node));
+      struct btree_node *old_node = *node;
+      new_node->entry_count = old_node->entry_count;
+      new_node->content = old_node->content;
+      old_node->content.children[0].separator = max_separator;
+      old_node->content.children[0].child = new_node;
+      old_node->entry_count = 1;
+      old_node->type = btree_node_inner;
+
+      *parent = old_node;
+      *node = new_node;
+    }
+}
+
+// Split an inner node
+static void
+btree_split_inner (struct btree *t, struct btree_node **inner,
+		   struct btree_node **parent, uintptr_t target)
+{
+  // Check for the root
+  btree_handle_root_split (t, inner, parent);
+
+  // Create two inner nodes
+  uintptr_t right_fence = btree_node_get_fence_key (*inner);
+  struct btree_node *left_inner = *inner;
+  struct btree_node *right_inner = btree_allocate_node (t, true);
+  unsigned split = left_inner->entry_count / 2;
+  right_inner->entry_count = left_inner->entry_count - split;
+  for (unsigned index = 0; index < right_inner->entry_count; ++index)
+    right_inner->content.children[index]
+      = left_inner->content.children[split + index];
+  left_inner->entry_count = split;
+  uintptr_t left_fence = btree_node_get_fence_key (left_inner);
+  btree_node_update_separator_after_split (*parent, right_fence, left_fence,
+					   right_inner);
+  if (target <= left_fence)
+    {
+      *inner = left_inner;
+      btree_node_unlock_exclusive (right_inner);
+    }
+  else
+    {
+      *inner = right_inner;
+      btree_node_unlock_exclusive (left_inner);
+    }
+}
+
+// Split a leaf node
+static void
+btree_split_leaf (struct btree *t, struct btree_node **leaf,
+		  struct btree_node **parent, uintptr_t fence, uintptr_t target)
+{
+  // Check for the root
+  btree_handle_root_split (t, leaf, parent);
+
+  // Create two leaf nodes
+  uintptr_t right_fence = fence;
+  struct btree_node *left_leaf = *leaf;
+  struct btree_node *right_leaf = btree_allocate_node (t, false);
+  unsigned split = left_leaf->entry_count / 2;
+  right_leaf->entry_count = left_leaf->entry_count - split;
+  for (unsigned index = 0; index != right_leaf->entry_count; ++index)
+    right_leaf->content.entries[index]
+      = left_leaf->content.entries[split + index];
+  left_leaf->entry_count = split;
+  uintptr_t left_fence = right_leaf->content.entries[0].base - 1;
+  btree_node_update_separator_after_split (*parent, right_fence, left_fence,
+					   right_leaf);
+  if (target <= left_fence)
+    {
+      *leaf = left_leaf;
+      btree_node_unlock_exclusive (right_leaf);
+    }
+  else
+    {
+      *leaf = right_leaf;
+      btree_node_unlock_exclusive (left_leaf);
+    }
+}
+
+// Merge (or balance) child nodes
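+// Returns the locked node that covers target afterwards; all other nodes
+// involved are unlocked or moved to the free list.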
+static struct btree_node *
+btree_merge_node (struct btree *t, unsigned child_slot,
+		  struct btree_node *parent, uintptr_t target)
+{
+  // Choose the emptiest neighbor and lock both. The target child is already
+  // locked
+  unsigned left_slot;
+  struct btree_node *left_node, *right_node;
+  if ((child_slot == 0)
+      || (((child_slot + 1) < parent->entry_count)
+	  && (parent->content.children[child_slot + 1].child->entry_count
+	      < parent->content.children[child_slot - 1].child->entry_count)))
+    {
+      left_slot = child_slot;
+      left_node = parent->content.children[left_slot].child;
+      right_node = parent->content.children[left_slot + 1].child;
+      btree_node_lock_exclusive (right_node);
+    }
+  else
+    {
+      left_slot = child_slot - 1;
+      left_node = parent->content.children[left_slot].child;
+      right_node = parent->content.children[left_slot + 1].child;
+      btree_node_lock_exclusive (left_node);
+    }
+
+  // Can we merge both nodes into one node?
+  unsigned total_count = left_node->entry_count + right_node->entry_count;
+  unsigned max_count
+    = btree_node_is_inner (left_node) ? max_fanout_inner : max_fanout_leaf;
+  if (total_count <= max_count)
+    {
+      // Merge into the parent?
+      if (parent->entry_count == 2)
+	{
+	  // Merge children into parent. This can only happen at the root
+	  if (btree_node_is_inner (left_node))
+	    {
+	      for (unsigned index = 0; index != left_node->entry_count; ++index)
+		parent->content.children[index]
+		  = left_node->content.children[index];
+	      for (unsigned index = 0; index != right_node->entry_count;
+		   ++index)
+		parent->content.children[index + left_node->entry_count]
+		  = right_node->content.children[index];
+	    }
+	  else
+	    {
+	      parent->type = btree_node_leaf;
+	      for (unsigned index = 0; index != left_node->entry_count; ++index)
+		parent->content.entries[index]
+		  = left_node->content.entries[index];
+	      for (unsigned index = 0; index != right_node->entry_count;
+		   ++index)
+		parent->content.entries[index + left_node->entry_count]
+		  = right_node->content.entries[index];
+	    }
+	  parent->entry_count = total_count;
+	  btree_release_node (t, left_node);
+	  btree_release_node (t, right_node);
+	  return parent;
+	}
+      else
+	{
+	  // Regular merge
+	  if (btree_node_is_inner (left_node))
+	    {
+	      for (unsigned index = 0; index != right_node->entry_count;
+		   ++index)
+		left_node->content.children[left_node->entry_count++]
+		  = right_node->content.children[index];
+	    }
+	  else
+	    {
+	      for (unsigned index = 0; index != right_node->entry_count;
+		   ++index)
+		left_node->content.entries[left_node->entry_count++]
+		  = right_node->content.entries[index];
+	    }
+	  parent->content.children[left_slot].separator
+	    = parent->content.children[left_slot + 1].separator;
+	  for (unsigned index = left_slot + 1; index + 1 < parent->entry_count;
+	       ++index)
+	    parent->content.children[index]
+	      = parent->content.children[index + 1];
+	  parent->entry_count--;
+	  btree_release_node (t, right_node);
+	  btree_node_unlock_exclusive (parent);
+	  return left_node;
+	}
+    }
+
+  // No merge possible; rebalance instead
+  if (left_node->entry_count > right_node->entry_count)
+    {
+      // Shift from left to right
+      unsigned to_shift
+	= (left_node->entry_count - right_node->entry_count) / 2;
+      if (btree_node_is_inner (left_node))
+	{
+	  for (unsigned index = 0; index != right_node->entry_count; ++index)
+	    {
+	      unsigned pos = right_node->entry_count - 1 - index;
+	      right_node->content.children[pos + to_shift]
+		= right_node->content.children[pos];
+	    }
+	  for (unsigned index = 0; index != to_shift; ++index)
+	    right_node->content.children[index]
+	      = left_node->content
+		  .children[left_node->entry_count - to_shift + index];
+	}
+      else
+	{
+	  for (unsigned index = 0; index != right_node->entry_count; ++index)
+	    {
+	      unsigned pos = right_node->entry_count - 1 - index;
+	      right_node->content.entries[pos + to_shift]
+		= right_node->content.entries[pos];
+	    }
+	  for (unsigned index = 0; index != to_shift; ++index)
+	    right_node->content.entries[index]
+	      = left_node->content
+		  .entries[left_node->entry_count - to_shift + index];
+	}
+      left_node->entry_count -= to_shift;
+      right_node->entry_count += to_shift;
+    }
+  else
+    {
+      // Shift from right to left
+      unsigned to_shift
+	= (right_node->entry_count - left_node->entry_count) / 2;
+      if (btree_node_is_inner (left_node))
+	{
+	  for (unsigned index = 0; index != to_shift; ++index)
+	    left_node->content.children[left_node->entry_count + index]
+	      = right_node->content.children[index];
+	  for (unsigned index = 0; index != right_node->entry_count - to_shift;
+	       ++index)
+	    right_node->content.children[index]
+	      = right_node->content.children[index + to_shift];
+	}
+      else
+	{
+	  for (unsigned index = 0; index != to_shift; ++index)
+	    left_node->content.entries[left_node->entry_count + index]
+	      = right_node->content.entries[index];
+	  for (unsigned index = 0; index != right_node->entry_count - to_shift;
+	       ++index)
+	    right_node->content.entries[index]
+	      = right_node->content.entries[index + to_shift];
+	}
+      left_node->entry_count += to_shift;
+      right_node->entry_count -= to_shift;
+    }
+  uintptr_t left_fence;
+  if (btree_node_is_leaf (left_node))
+    {
+      left_fence = right_node->content.entries[0].base - 1;
+    }
+  else
+    {
+      left_fence = btree_node_get_fence_key (left_node);
+    }
+  parent->content.children[left_slot].separator = left_fence;
+  btree_node_unlock_exclusive (parent);
+  if (target <= left_fence)
+    {
+      btree_node_unlock_exclusive (right_node);
+      return left_node;
+    }
+  else
+    {
+      btree_node_unlock_exclusive (left_node);
+      return right_node;
+    }
+}
+
+// Insert an entry
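+// Returns false if the frame is empty (size 0) or an entry with the same
+// base is already registered.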
+static bool
+btree_insert (struct btree *t, uintptr_t base, uintptr_t size,
+	      struct object *ob)
+{
+  // Sanity check
+  if (!size)
+    return false;
+
+  // Access the root
+  struct btree_node *iter, *parent = NULL;
+  {
+    version_lock_lock_exclusive (&(t->root_lock));
+    iter = t->root;
+    if (iter)
+      {
+	btree_node_lock_exclusive (iter);
+      }
+    else
+      {
+	t->root = iter = btree_allocate_node (t, false);
+      }
+    version_lock_unlock_exclusive (&(t->root_lock));
+  }
+
+  // Walk down the btree with classic lock coupling and eager splits.
+  // Strictly speaking this is not optimal for performance: we could use
+  // optimistic lock coupling until we hit a node that has to be modified.
+  // But that is more difficult to implement, and frame registration is
+  // rare anyway, so we use simple locking for now.
+
+  uintptr_t fence = max_separator;
+  while (btree_node_is_inner (iter))
+    {
+      // Use eager splits to avoid lock coupling up
+      if (iter->entry_count == max_fanout_inner)
+	btree_split_inner (t, &iter, &parent, base);
+
+      unsigned slot = btree_node_find_inner_slot (iter, base);
+      if (parent)
+	btree_node_unlock_exclusive (parent);
+      parent = iter;
+      fence = iter->content.children[slot].separator;
+      iter = iter->content.children[slot].child;
+      btree_node_lock_exclusive (iter);
+    }
+
+  // Make sure we have space
+  if (iter->entry_count == max_fanout_leaf)
+    btree_split_leaf (t, &iter, &parent, fence, base);
+  if (parent)
+    btree_node_unlock_exclusive (parent);
+
+  // Insert in node
+  unsigned slot = btree_node_find_leaf_slot (iter, base);
+  if ((slot < iter->entry_count) && (iter->content.entries[slot].base == base))
+    {
+      // Duplicate entry; this should never happen
+      btree_node_unlock_exclusive (iter);
+      return false;
+    }
+  for (unsigned index = iter->entry_count; index > slot; --index)
+    iter->content.entries[index] = iter->content.entries[index - 1];
+  struct leaf_entry *e = &(iter->content.entries[slot]);
+  e->base = base;
+  e->size = size;
+  e->ob = ob;
+  iter->entry_count++;
+  btree_node_unlock_exclusive (iter);
+  return true;
+}
+
+// Remove an entry
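+// Returns the object that was registered for base, or NULL if there is
+// no such entry.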
+static struct object *
+btree_remove (struct btree *t, uintptr_t base)
+{
+  // Access the root
+  version_lock_lock_exclusive (&(t->root_lock));
+  struct btree_node *iter = t->root;
+  if (iter)
+    btree_node_lock_exclusive (iter);
+  version_lock_unlock_exclusive (&(t->root_lock));
+  if (!iter)
+    return NULL;
+
+  // Same strategy as with insert: walk down with lock coupling and
+  // merge eagerly.
+  while (btree_node_is_inner (iter))
+    {
+      unsigned slot = btree_node_find_inner_slot (iter, base);
+      struct btree_node *next = iter->content.children[slot].child;
+      btree_node_lock_exclusive (next);
+      if (btree_node_needs_merge (next))
+	{
+	  // Use eager merges to avoid lock coupling up
+	  iter = btree_merge_node (t, slot, iter, base);
+	}
+      else
+	{
+	  btree_node_unlock_exclusive (iter);
+	  iter = next;
+	}
+    }
+
+  // Remove existing entry
+  unsigned slot = btree_node_find_leaf_slot (iter, base);
+  if ((slot >= iter->entry_count) || (iter->content.entries[slot].base != base))
+    {
+      // Not found; this should never happen
+      btree_node_unlock_exclusive (iter);
+      return NULL;
+    }
+  struct object *ob = iter->content.entries[slot].ob;
+  for (unsigned index = slot; index + 1 < iter->entry_count; ++index)
+    iter->content.entries[index] = iter->content.entries[index + 1];
+  iter->entry_count--;
+  btree_node_unlock_exclusive (iter);
+  return ob;
+}
+
+// Find the corresponding entry for the given address
+static struct object *
+btree_lookup (const struct btree *t, uintptr_t target_addr)
+{
+  // Within this function many loads are relaxed atomic loads.
+  // Use a macro to keep the code reasonable
+#define RLOAD(x) __atomic_load_n (&(x), __ATOMIC_RELAXED)
+
+  // For targets where unwind info is usually not registered through these
+  // APIs anymore, avoid any sequentially consistent atomics.
+  // Use relaxed MO here, it is up to the app to ensure that the library
+  // loading/initialization happens-before using that library in other
+  // threads (in particular unwinding with that library's functions
+  // appearing in the backtraces).  Calling that library's functions
+  // without waiting for the library to initialize would be racy.
+  if (__builtin_expect (!RLOAD (t->root), 1))
+    return NULL;
+
+  // The unwinding tables are mostly static, they only change when
+  // frames are added or removed. This makes it extremely unlikely that they
+  // change during a given unwinding sequence. Thus, we optimize for the
+  // contention free case and use optimistic lock coupling. This does not
+  // require any writes to shared state, instead we validate every read. It is
+  // important that we do not trust any value that we have read until we call
+  // validate again. Data can change at arbitrary points in time, thus we always
+  // copy something into a local variable and validate again before acting on
+  // the read. In the unlikely event that we encounter a concurrent change we
+  // simply restart and try again.
+
+restart:
+  struct btree_node *iter;
+  uintptr_t lock;
+  {
+    // Accessing the root node requires defending against concurrent pointer
+    // changes. Thus we couple root_lock -> lock on root -> validate root_lock.
+    if (!version_lock_lock_optimistic (&(t->root_lock), &lock))
+      goto restart;
+    iter = RLOAD (t->root);
+    if (!version_lock_validate (&(t->root_lock), lock))
+      goto restart;
+    if (!iter)
+      return NULL;
+    uintptr_t child_lock;
+    if ((!btree_node_lock_optimistic (iter, &child_lock))
+	|| (!version_lock_validate (&(t->root_lock), lock)))
+      goto restart;
+    lock = child_lock;
+  }
+
+  // Now we can walk down towards the right leaf node
+  while (true)
+    {
+      enum node_type type = RLOAD (iter->type);
+      unsigned entry_count = RLOAD (iter->entry_count);
+      if (!btree_node_validate (iter, lock))
+	goto restart;
+      if (!entry_count)
+	return NULL;
+
+      if (type == btree_node_inner)
+	{
+	  // We cannot call find_inner_slot here because we need (relaxed)
+	  // atomic reads.
+	  unsigned slot = 0;
+	  while (
+	    ((slot + 1) < entry_count)
+	    && (RLOAD (iter->content.children[slot].separator) < target_addr))
+	    ++slot;
+	  struct btree_node *child = RLOAD (iter->content.children[slot].child);
+	  if (!btree_node_validate (iter, lock))
+	    goto restart;
+
+	  // The node content can change at any point in time, thus we must
+	  // interleave parent and child checks
+	  uintptr_t child_lock;
+	  if (!btree_node_lock_optimistic (child, &child_lock))
+	    goto restart;
+	  if (!btree_node_validate (iter, lock))
+	    goto restart; // make sure we still point to the correct node after
+			  // acquiring the optimistic lock
+
+	  // Go down
+	  iter = child;
+	  lock = child_lock;
+	}
+      else
+	{
+	  // We cannot call find_leaf_slot here because we need (relaxed)
+	  // atomic reads.
+	  unsigned slot = 0;
+	  while (((slot + 1) < entry_count)
+		 && (RLOAD (iter->content.entries[slot].base)
+		       + RLOAD (iter->content.entries[slot].size)
+		     <= target_addr))
+	    ++slot;
+	  struct leaf_entry entry;
+	  entry.base = RLOAD (iter->content.entries[slot].base);
+	  entry.size = RLOAD (iter->content.entries[slot].size);
+	  entry.ob = RLOAD (iter->content.entries[slot].ob);
+	  if (!btree_node_validate (iter, lock))
+	    goto restart;
+
+	  // Check if we have a hit
+	  if ((entry.base <= target_addr)
+	      && (target_addr < entry.base + entry.size))
+	    {
+	      return entry.ob;
+	    }
+	  return NULL;
+	}
+    }
+#undef RLOAD
+}
+
+#endif /* unwind-dw2-btree.h */
diff --git a/libgcc/unwind-dw2-fde.c b/libgcc/unwind-dw2-fde.c
index 8ee55be5675..d546b9e4c43 100644
--- a/libgcc/unwind-dw2-fde.c
+++ b/libgcc/unwind-dw2-fde.c
@@ -42,15 +42,34 @@  see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
 #endif
 #endif
 
+#ifdef ATOMIC_FDE_FAST_PATH
+#include "unwind-dw2-btree.h"
+
+static struct btree registered_frames;
+
+static void
+release_registered_frames (void) __attribute__ ((destructor (110)));
+static void
+release_registered_frames (void)
+{
+  /* Release the b-tree and all frames.  Frame releases that happen later
+     are silently ignored.  */
+  btree_destroy (&registered_frames);
+}
+
+static void
+get_pc_range (const struct object *ob, uintptr_t *range);
+static void
+init_object (struct object *ob);
+
+#else
+
 /* The unseen_objects list contains objects that have been registered
    but not yet categorized in any way.  The seen_objects list has had
    its pc_begin and count fields initialized at minimum, and is sorted
    by decreasing value of pc_begin.  */
 static struct object *unseen_objects;
 static struct object *seen_objects;
-#ifdef ATOMIC_FDE_FAST_PATH
-static int any_objects_registered;
-#endif
 
 #ifdef __GTHREAD_MUTEX_INIT
 static __gthread_mutex_t object_mutex = __GTHREAD_MUTEX_INIT;
@@ -78,6 +97,7 @@  init_object_mutex_once (void)
 static __gthread_mutex_t object_mutex;
 #endif
 #endif
+#endif
 
 /* Called from crtbegin.o to register the unwind info for an object.  */
 
@@ -99,23 +119,23 @@  __register_frame_info_bases (const void *begin, struct object *ob,
   ob->fde_end = NULL;
 #endif
 
+#ifdef ATOMIC_FDE_FAST_PATH
+  // Initialize eagerly to avoid locking later
+  init_object (ob);
+
+  // And register the frame
+  uintptr_t range[2];
+  get_pc_range (ob, range);
+  btree_insert (&registered_frames, range[0], range[1] - range[0], ob);
+#else
   init_object_mutex_once ();
   __gthread_mutex_lock (&object_mutex);
 
   ob->next = unseen_objects;
   unseen_objects = ob;
-#ifdef ATOMIC_FDE_FAST_PATH
-  /* Set flag that at least one library has registered FDEs.
-     Use relaxed MO here, it is up to the app to ensure that the library
-     loading/initialization happens-before using that library in other
-     threads (in particular unwinding with that library's functions
-     appearing in the backtraces).  Calling that library's functions
-     without waiting for the library to initialize would be racy.  */
-  if (!any_objects_registered)
-    __atomic_store_n (&any_objects_registered, 1, __ATOMIC_RELAXED);
-#endif
 
   __gthread_mutex_unlock (&object_mutex);
+#endif
 }
 
 void
@@ -153,23 +173,23 @@  __register_frame_info_table_bases (void *begin, struct object *ob,
   ob->s.b.from_array = 1;
   ob->s.b.encoding = DW_EH_PE_omit;
 
+#ifdef ATOMIC_FDE_FAST_PATH
+  // Initialize eagerly to avoid locking later
+  init_object (ob);
+
+  // And register the frame
+  uintptr_t range[2];
+  get_pc_range (ob, range);
+  btree_insert (&registered_frames, range[0], range[1] - range[0], ob);
+#else
   init_object_mutex_once ();
   __gthread_mutex_lock (&object_mutex);
 
   ob->next = unseen_objects;
   unseen_objects = ob;
-#ifdef ATOMIC_FDE_FAST_PATH
-  /* Set flag that at least one library has registered FDEs.
-     Use relaxed MO here, it is up to the app to ensure that the library
-     loading/initialization happens-before using that library in other
-     threads (in particular unwinding with that library's functions
-     appearing in the backtraces).  Calling that library's functions
-     without waiting for the library to initialize would be racy.  */
-  if (!any_objects_registered)
-    __atomic_store_n (&any_objects_registered, 1, __ATOMIC_RELAXED);
-#endif
 
   __gthread_mutex_unlock (&object_mutex);
+#endif
 }
 
 void
@@ -200,16 +220,33 @@  __register_frame_table (void *begin)
 void *
 __deregister_frame_info_bases (const void *begin)
 {
-  struct object **p;
   struct object *ob = 0;
 
   /* If .eh_frame is empty, we haven't registered.  */
   if ((const uword *) begin == 0 || *(const uword *) begin == 0)
     return ob;
 
+#ifdef ATOMIC_FDE_FAST_PATH
+  // Find the corresponding PC range
+  struct object lookupob;
+  lookupob.tbase = 0;
+  lookupob.dbase = 0;
+  lookupob.u.single = begin;
+  lookupob.s.i = 0;
+  lookupob.s.b.encoding = DW_EH_PE_omit;
+#ifdef DWARF2_OBJECT_END_PTR_EXTENSION
+  lookupob.fde_end = NULL;
+#endif
+  uintptr_t range[2];
+  get_pc_range (&lookupob, range);
+
+  // And remove
+  ob = btree_remove (&registered_frames, range[0]);
+#else
   init_object_mutex_once ();
   __gthread_mutex_lock (&object_mutex);
 
+  struct object **p;
   for (p = &unseen_objects; *p ; p = &(*p)->next)
     if ((*p)->u.single == begin)
       {
@@ -241,6 +278,8 @@  __deregister_frame_info_bases (const void *begin)
 
  out:
   __gthread_mutex_unlock (&object_mutex);
+#endif
+
   gcc_assert (ob);
   return (void *) ob;
 }
@@ -264,7 +303,7 @@  __deregister_frame (void *begin)
    instead of an _Unwind_Context.  */
 
 static _Unwind_Ptr
-base_from_object (unsigned char encoding, struct object *ob)
+base_from_object (unsigned char encoding, const struct object *ob)
 {
   if (encoding == DW_EH_PE_omit)
     return 0;
@@ -821,6 +860,91 @@  init_object (struct object* ob)
   ob->s.b.sorted = 1;
 }
 
+#ifdef ATOMIC_FDE_FAST_PATH
+/* Get the PC range from FDEs. The code is very similar to
+   classify_object_over_fdes and should be kept in sync with
+   that. The main difference is that classify_object_over_fdes
+   modifies the object, which we cannot do here.  */
+static void
+get_pc_range_from_fdes (const struct object *ob, const fde *this_fde,
+			uintptr_t *range)
+{
+  const struct dwarf_cie *last_cie = 0;
+  int encoding = DW_EH_PE_absptr;
+  _Unwind_Ptr base = 0;
+
+  for (; !last_fde (ob, this_fde); this_fde = next_fde (this_fde))
+    {
+      const struct dwarf_cie *this_cie;
+      _Unwind_Ptr mask, pc_begin, pc_range;
+
+      /* Skip CIEs.  */
+      if (this_fde->CIE_delta == 0)
+	continue;
+
+      this_cie = get_cie (this_fde);
+      if (this_cie != last_cie)
+	{
+	  last_cie = this_cie;
+	  encoding = get_cie_encoding (this_cie);
+	  base = base_from_object (encoding, ob);
+	}
+
+      const unsigned char *p;
+      p = read_encoded_value_with_base (encoding, base, this_fde->pc_begin,
+					&pc_begin);
+      read_encoded_value_with_base (encoding & 0x0F, 0, p, &pc_range);
+
+      /* Take care to ignore link-once functions that were removed.
+	 In these cases, the function address will be NULL, but if
+	 the encoding is smaller than a pointer a true NULL may not
+	 be representable.  Assume 0 in the representable bits is NULL.  */
+      mask = size_of_encoded_value (encoding);
+      if (mask < sizeof (void *))
+	mask = (((_Unwind_Ptr) 1) << (mask << 3)) - 1;
+      else
+	mask = -1;
+      if ((pc_begin & mask) == 0)
+	continue;
+
+      _Unwind_Ptr pc_end = pc_begin + pc_range;
+      if ((!range[0]) && (!range[1]))
+	{
+	  range[0] = pc_begin;
+	  range[1] = pc_end;
+	}
+      else
+	{
+	  if (pc_begin < range[0])
+	    range[0] = pc_begin;
+	  if (pc_end > range[1])
+	    range[1] = pc_end;
+	}
+    }
+}
+
+/* Get the PC range for lookup.  */
+static void
+get_pc_range (const struct object *ob, uintptr_t *range)
+{
+  range[0] = range[1] = 0;
+  if (ob->s.b.sorted)
+    {
+      get_pc_range_from_fdes (ob, ob->u.sort->orig_data, range);
+    }
+  else if (ob->s.b.from_array)
+    {
+      fde **p = ob->u.array;
+      for (; *p; ++p)
+	get_pc_range_from_fdes (ob, *p, range);
+    }
+  else
+    {
+      get_pc_range_from_fdes (ob, ob->u.single, range);
+    }
+}
+#endif
+
 /* A linear search through a set of FDEs for the given PC.  This is
    used when there was insufficient memory to allocate and sort an
    array.  */
@@ -985,6 +1109,9 @@  binary_search_mixed_encoding_fdes (struct object *ob, void *pc)
 static const fde *
 search_object (struct object* ob, void *pc)
 {
+  /* The fast path initializes objects eagerly to avoid locking.
+     On the slow path we initialize them now.  */
+#ifndef ATOMIC_FDE_FAST_PATH
   /* If the data hasn't been sorted, try to do this now.  We may have
      more memory available than last time we tried.  */
   if (! ob->s.b.sorted)
@@ -997,6 +1124,7 @@  search_object (struct object* ob, void *pc)
       if (pc < ob->pc_begin)
 	return NULL;
     }
+#endif
 
   if (ob->s.b.sorted)
     {
@@ -1033,17 +1161,12 @@  _Unwind_Find_FDE (void *pc, struct dwarf_eh_bases *bases)
   const fde *f = NULL;
 
 #ifdef ATOMIC_FDE_FAST_PATH
-  /* For targets where unwind info is usually not registered through these
-     APIs anymore, avoid taking a global lock.
-     Use relaxed MO here, it is up to the app to ensure that the library
-     loading/initialization happens-before using that library in other
-     threads (in particular unwinding with that library's functions
-     appearing in the backtraces).  Calling that library's functions
-     without waiting for the library to initialize would be racy.  */
-  if (__builtin_expect (!__atomic_load_n (&any_objects_registered,
-					  __ATOMIC_RELAXED), 1))
+  ob = btree_lookup (&registered_frames, (uintptr_t) pc);
+  if (!ob)
     return NULL;
-#endif
+
+  f = search_object (ob, pc);
+#else
 
   init_object_mutex_once ();
   __gthread_mutex_lock (&object_mutex);
@@ -1081,6 +1204,7 @@  _Unwind_Find_FDE (void *pc, struct dwarf_eh_bases *bases)
 
  fini:
   __gthread_mutex_unlock (&object_mutex);
+#endif
 
   if (f)
     {
diff --git a/libgcc/unwind-dw2-fde.h b/libgcc/unwind-dw2-fde.h
index 8a011c358b4..77c2caa4f5a 100644
--- a/libgcc/unwind-dw2-fde.h
+++ b/libgcc/unwind-dw2-fde.h
@@ -166,7 +166,7 @@  next_fde (const fde *f)
 extern const fde * _Unwind_Find_FDE (void *, struct dwarf_eh_bases *);
 
 static inline int
-last_fde (struct object *obj __attribute__ ((__unused__)), const fde *f)
+last_fde (const struct object *obj __attribute__ ((__unused__)), const fde *f)
 {
 #ifdef DWARF2_OBJECT_END_PTR_EXTENSION
   return f == (const fde *) obj->fde_end || f->length == 0;