commit 1543313a3b590e6422f5d547cabd6662e0a6f538
Author: Torvald Riegel <triegel@redhat.com>
Date: Sun Nov 16 12:07:22 2014 +0100
[WIP] Optimize synchronization in std::future if futexes are available.
@@ -83,6 +83,7 @@ bits_headers = \
${bits_srcdir}/allocated_ptr.h \
${bits_srcdir}/allocator.h \
${bits_srcdir}/atomic_base.h \
+ ${bits_srcdir}/atomic_futex.h \
${bits_srcdir}/basic_ios.h \
${bits_srcdir}/basic_ios.tcc \
${bits_srcdir}/basic_string.h \
@@ -350,6 +350,7 @@ bits_headers = \
${bits_srcdir}/allocated_ptr.h \
${bits_srcdir}/allocator.h \
${bits_srcdir}/atomic_base.h \
+ ${bits_srcdir}/atomic_futex.h \
${bits_srcdir}/basic_ios.h \
${bits_srcdir}/basic_ios.tcc \
${bits_srcdir}/basic_string.h \
new file mode 100644
@@ -0,0 +1,175 @@
+// -*- C++ -*- header.
+
+// Copyright (C) 2014 Free Software Foundation, Inc.
+//
+// This file is part of the GNU ISO C++ Library. This library is free
+// software; you can redistribute it and/or modify it under the
+// terms of the GNU General Public License as published by the
+// Free Software Foundation; either version 3, or (at your option)
+// any later version.
+
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// Under Section 7 of GPL version 3, you are granted additional
+// permissions described in the GCC Runtime Library Exception, version
+// 3.1, as published by the Free Software Foundation.
+
+// You should have received a copy of the GNU General Public License and
+// a copy of the GCC Runtime Library Exception along with this program;
+// see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
+// <http://www.gnu.org/licenses/>.
+
+/** @file bits/atomic_futex.h
+ * This is an internal header file, included by other library headers.
+ * Do not attempt to use it directly.
+ */
+
+#ifndef _GLIBCXX_ATOMIC_FUTEX_H
+#define _GLIBCXX_ATOMIC_FUTEX_H 1
+
+#pragma GCC system_header
+
+#include <bits/c++config.h>
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+
+#ifndef _GLIBCXX_ALWAYS_INLINE
+#define _GLIBCXX_ALWAYS_INLINE inline __attribute__((always_inline))
+#endif
+
+namespace std _GLIBCXX_VISIBILITY(default)
+{
+_GLIBCXX_BEGIN_NAMESPACE_VERSION
+
+//#ifdef _GLIBCXX_USE_FUTEX
+  // Non-template base providing the futex syscall wrappers (defined in
+  // futex.cc — see the sources Makefile hunk) so that they are not
+  // duplicated for every __atomic_futex_unsigned instantiation.
+  struct __atomic_futex_unsigned_base
+  {
+    // Blocks while *addr == val, optionally bounded by s + ns.
+    // Returns false iff a timeout occurred.
+    static bool
+    futex_wait_for(unsigned *addr, unsigned val, bool has_timeout,
+      chrono::seconds s, chrono::nanoseconds ns);
+
+    // Wakes all threads blocked in futex_wait_for on addr.
+    // This is static because it can be executed after the object has been
+    // destroyed.
+    static void futex_notify_all(unsigned* addr);
+  };
+
+  template <unsigned _Waiter_bit = 0x80000000>
+  struct __atomic_futex_unsigned : public __atomic_futex_unsigned_base
+  {
+    // XXX We expect this to be lock-free.  The futex protocol needs the
+    // payload at offset 0: futex_wait_for/futex_notify_all operate on the
+    // address of _M_data, so it must be the first (and only) data member.
+    atomic<unsigned> _M_data;
+
+    __atomic_futex_unsigned(unsigned data) : _M_data(data)
+    { }
+
+    // Returns the payload, i.e. the current value with _Waiter_bit cleared.
+    _GLIBCXX_ALWAYS_INLINE unsigned
+    load(memory_order mo)
+    {
+      return _M_data.load(mo) & ~_Waiter_bit;
+    }
+
+    // If a timeout occurs, returns a current value after the timeout;
+    // otherwise, returns the operand's value if equal is true or a different
+    // value if equal is false.
+    // The assumed value is the caller's assumption about the current value
+    // when making the call.
+    unsigned
+    load_and_test_for_slow(unsigned assumed, unsigned operand, bool equal,
+	memory_order mo, bool has_timeout,
+	chrono::seconds s, chrono::nanoseconds ns)
+    {
+      for (;;)
+	{
+	  // Don't bother checking the value again because we expect the
+	  // caller to have done it recently.
+	  // memory_order_relaxed is sufficient because we can rely on just
+	  // the modification order (store_notify_all uses an atomic RMW
+	  // operation too), and the futex syscalls synchronize between
+	  // themselves.
+	  _M_data.fetch_or(_Waiter_bit, memory_order_relaxed);
+	  bool ret = futex_wait_for((unsigned*)(void*)&_M_data,
+				    assumed | _Waiter_bit, has_timeout, s, ns);
+	  // Fetch the current value after waiting (load() clears
+	  // _Waiter_bit).
+	  assumed = load(mo);
+	  if (!ret || ((operand == assumed) == equal))
+	    return assumed;
+	  // TODO adapt wait time
+	}
+    }
+
+    // Blocks until the payload differs from val; returns the new payload.
+    _GLIBCXX_ALWAYS_INLINE unsigned
+    load_when_not_equal(unsigned val, memory_order mo)
+    {
+      unsigned i = load(mo);
+      // load() already masked out _Waiter_bit.  Note: the WIP version had
+      // a bare "return;" here, which is ill-formed in a non-void function.
+      if (i != val)
+	return i;
+      // TODO Spin-wait first.
+      // chrono::duration's converting constructor is explicit, so spell
+      // out the zero durations rather than passing a plain 0.
+      return load_and_test_for_slow(i, val, false, mo, false,
+				    chrono::seconds(0),
+				    chrono::nanoseconds(0));
+    }
+
+    // Blocks until the payload equals val.
+    _GLIBCXX_ALWAYS_INLINE void
+    load_when_equal(unsigned val, memory_order mo)
+    {
+      unsigned i = load(mo);
+      if (i == val)
+	return;
+      // TODO Spin-wait first.
+      load_and_test_for_slow(i, val, true, mo, false,
+			     chrono::seconds(0), chrono::nanoseconds(0));
+    }
+
+    // Returns true iff the payload equaled val within rel_time.
+    template<typename _Rep, typename _Period>
+    _GLIBCXX_ALWAYS_INLINE bool
+    load_when_equal_for(unsigned val, memory_order mo,
+	const chrono::duration<_Rep, _Period>& rel_time)
+    {
+      unsigned i = load(mo);
+      if (i == val)
+	return true;
+      // TODO Spin-wait first.  Ignore effect on timeout.
+      auto s = chrono::duration_cast<chrono::seconds>(rel_time);
+      auto ns = chrono::duration_cast<chrono::nanoseconds>(rel_time - s);
+      // Forward the timeout to the slow path; the WIP version passed
+      // (false, 0, 0), dropping s/ns and blocking without any timeout.
+      i = load_and_test_for_slow(i, val, true, mo, true, s, ns);
+      return i == val;
+    }
+
+    // Returns true iff the payload equaled val before abs_time.
+    template<typename _Clock, typename _Duration>
+    _GLIBCXX_ALWAYS_INLINE bool
+    load_when_equal_until(unsigned val, memory_order mo,
+	const chrono::time_point<_Clock, _Duration>& abs_time)
+    {
+      unsigned i = load(mo);
+      if (i == val)
+	return true;
+      // TODO Spin-wait first.  Ignore effect on timeout.
+      // Subtract now() of the caller's clock: subtracting
+      // steady_clock::now() (as the WIP version did) does not compile for
+      // any other _Clock.  NOTE(review): waiting on the derived relative
+      // time still ignores later adjustments of non-steady clocks — TODO.
+      auto rel_time = abs_time - _Clock::now();
+      auto s = chrono::duration_cast<chrono::seconds>(rel_time);
+      auto ns = chrono::duration_cast<chrono::nanoseconds>(rel_time - s);
+      i = load_and_test_for_slow(i, val, true, mo, true, s, ns);
+      return i == val;
+    }
+
+    // Stores val (which must not contain _Waiter_bit) and wakes all
+    // waiters iff any thread may have been waiting.
+    _GLIBCXX_ALWAYS_INLINE void
+    store_notify_all(unsigned val, memory_order mo)
+    {
+      // Take the address before the exchange; futex_notify_all is static
+      // precisely so it may run after a woken waiter destroys *this.
+      // (The WIP version declared this as void*, but void* does not
+      // convert implicitly to unsigned* in C++.)
+      unsigned* futex = (unsigned *)(void *)&_M_data;
+      if (_M_data.exchange(val, mo) & _Waiter_bit)
+	futex_notify_all(futex);
+    }
+  };
+
+//#else
+// struct __atomic_futex
+// {
+// int _M_data;
+// mutex _M_mutex;
+// condition_variable _M_condvar;
+// };
+//#endif
+
+_GLIBCXX_END_NAMESPACE_VERSION
+} // namespace std
+
+#endif
@@ -41,6 +41,7 @@
#include <condition_variable>
#include <system_error>
#include <atomic>
+#include <bits/atomic_futex.h>
#include <bits/functexcept.h>
#include <bits/unique_ptr.h>
#include <bits/shared_ptr.h>
@@ -291,14 +292,19 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
{
typedef _Ptr<_Result_base> _Ptr_type;
+ enum _Status {
+ not_ready,
+ ready
+ };
+
_Ptr_type _M_result;
- mutex _M_mutex;
- condition_variable _M_cond;
+ __atomic_futex_unsigned<> _M_status;
atomic_flag _M_retrieved;
once_flag _M_once;
public:
- _State_baseV2() noexcept : _M_result(), _M_retrieved(ATOMIC_FLAG_INIT)
+ _State_baseV2() noexcept : _M_result(), _M_retrieved(ATOMIC_FLAG_INIT),
+ _M_status(_Status::not_ready)
{ }
_State_baseV2(const _State_baseV2&) = delete;
_State_baseV2& operator=(const _State_baseV2&) = delete;
@@ -308,8 +314,9 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
       wait()
       {
 	_M_complete_async();
-	unique_lock<mutex> __lock(_M_mutex);
-	_M_cond.wait(__lock, [&] { return _M_ready(); });
+	// Acquire memory order makes sure this load synchronizes with the
+	// release store done in store_notify_all by the thread that made
+	// the future ready, so reading *_M_result below is safe.
+	_M_status.load_when_equal(_Status::ready, memory_order_acquire);
 	return *_M_result;
       }
@@ -317,15 +324,23 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
future_status
wait_for(const chrono::duration<_Rep, _Period>& __rel)
{
- unique_lock<mutex> __lock(_M_mutex);
- if (_M_ready())
+ _Status _s = _M_status.load(memory_order_acquire);
+ if (_s == _Status::ready)
return future_status::ready;
- if (_M_has_deferred())
+ if (_M_is_deferred_future())
return future_status::deferred;
- if (_M_cond.wait_for(__lock, __rel, [&] { return _M_ready(); }))
+ if (_M_status.load_when_equal_for(_Status::ready,
+ memory_order_acquire, __rel))
{
// _GLIBCXX_RESOLVE_LIB_DEFECTS
// 2100. timed waiting functions must also join
+ // This call is a no-op by default except on an async future,
+ // in which case the async thread is joined. It's also not a
+ // no-op for a deferred future, but such a future will never
+ // reach this point because it returns future_status::deferred
+ // instead of waiting for the future to become ready (see
+ // above). Async futures synchronize in this call, so we need
+ // no further synchronization here.
_M_complete_async();
return future_status::ready;
}
@@ -336,15 +351,18 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
future_status
wait_until(const chrono::time_point<_Clock, _Duration>& __abs)
{
- unique_lock<mutex> __lock(_M_mutex);
- if (_M_ready())
+ // Use atomics + futex loop (or synchronic)
+ _Status _s = _M_status.load(memory_order_acquire);
+ if (_s == _Status::ready)
return future_status::ready;
- if (_M_has_deferred())
+ if (_M_is_deferred_future())
return future_status::deferred;
- if (_M_cond.wait_until(__lock, __abs, [&] { return _M_ready(); }))
+ if (_M_status.load_when_equal_until(_Status::ready,
+ memory_order_acquire, __abs))
{
// _GLIBCXX_RESOLVE_LIB_DEFECTS
// 2100. timed waiting functions must also join
+ // See wait_for(...).
_M_complete_async();
return future_status::ready;
}
@@ -354,14 +372,12 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
       void
       _M_set_result(function<_Ptr_type()> __res, bool __ignore_failure = false)
       {
-	unique_lock<mutex> __lock(_M_mutex, defer_lock);
+	// Set to true by _M_do_set iff it actually ran, i.e. iff the
+	// promise had not been satisfied before this call.
+	bool __did_set = false;
 	// all calls to this function are serialized,
 	// side-effects of invoking __res only happen once
 	call_once(_M_once, &_State_baseV2::_M_do_set, this,
-		  std::__addressof(__res), std::__addressof(__lock));
-	if (__lock.owns_lock())
-	  _M_cond.notify_all();
-	else if (!__ignore_failure)
+		  std::__addressof(__res), std::__addressof(__did_set));
+	if (!__did_set && !__ignore_failure)
 	  __throw_future_error(int(future_errc::promise_already_satisfied));
       }
@@ -372,11 +388,9 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
{
error_code __ec(make_error_code(future_errc::broken_promise));
__res->_M_error = make_exception_ptr(future_error(__ec));
- {
- lock_guard<mutex> __lock(_M_mutex);
- _M_result.swap(__res);
- }
- _M_cond.notify_all();
+ // Must not be called concurrently with set_result.
+ _M_result.swap(__res);
+ _M_status.store_notify_all(_Status::ready, memory_order_release);
}
}
@@ -466,21 +480,22 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
private:
       void
-      _M_do_set(function<_Ptr_type()>* __f, unique_lock<mutex>* __lock)
+      _M_do_set(function<_Ptr_type()>* __f, bool* __did_set)
       {
-	_Ptr_type __res = (*__f)(); // do not hold lock while running setter
-	__lock->lock();
+	_Ptr_type __res = (*__f)();
+	// Record that this invocation actually ran: when call_once skips
+	// the callable because the promise was already satisfied,
+	// *__did_set stays false and _M_set_result reports
+	// promise_already_satisfied.  If (*__f)() threw, we never get here.
+	*__did_set = true;
 	_M_result.swap(__res);
+	// Publish the result (release store, so waiters' acquire loads
+	// synchronize with it) and wake all futex waiters.
+	_M_status.store_notify_all(_Status::ready, memory_order_release);
       }
- bool _M_ready() const noexcept { return static_cast<bool>(_M_result); }
-
// Wait for completion of async function.
virtual void _M_complete_async() { }
- // Return true if state contains a deferred function.
- // Caller must own _M_mutex.
- virtual bool _M_has_deferred() const { return false; }
+ // Return true if the future contains a deferred function.
+ virtual bool _M_is_deferred_future() const { return false; }
};
#ifdef _GLIBCXX_ASYNC_ABI_COMPAT
@@ -1458,7 +1473,7 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
}
virtual bool
- _M_has_deferred() const { return static_cast<bool>(_M_result); }
+ _M_is_deferred_future() const { return true; }
};
class __future_base::_Async_state_commonV2
@@ -53,6 +53,7 @@ sources = \
debug.cc \
functexcept.cc \
functional.cc \
+ futex.cc \
future.cc \
hash_c++0x.cc \
hashtable_c++0x.cc \
@@ -70,7 +70,7 @@ libc__11convenience_la_LIBADD =
@ENABLE_CXX11_ABI_TRUE@am__objects_1 = cxx11-ios_failure.lo
am__objects_2 = ctype_configure_char.lo ctype_members.lo
am__objects_3 = chrono.lo condition_variable.lo ctype.lo debug.lo \
- functexcept.lo functional.lo future.lo hash_c++0x.lo \
+ functexcept.lo functional.lo futex.lo future.lo hash_c++0x.lo \
hashtable_c++0x.lo ios.lo limits.lo mutex.lo placeholders.lo \
random.lo regex.lo shared_ptr.lo snprintf_lite.lo \
system_error.lo thread.lo $(am__objects_1) $(am__objects_2)
@@ -334,6 +334,7 @@ sources = \
debug.cc \
functexcept.cc \
functional.cc \
+ futex.cc \
future.cc \
hash_c++0x.cc \
hashtable_c++0x.cc \
new file mode 100644
@@ -0,0 +1,72 @@
+// futex -*- C++ -*-
+
+// Copyright (C) 2008-2014 Free Software Foundation, Inc.
+//
+// This file is part of the GNU ISO C++ Library. This library is free
+// software; you can redistribute it and/or modify it under the
+// terms of the GNU General Public License as published by the
+// Free Software Foundation; either version 3, or (at your option)
+// any later version.
+
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// Under Section 7 of GPL version 3, you are granted additional
+// permissions described in the GCC Runtime Library Exception, version
+// 3.1, as published by the Free Software Foundation.
+
+// You should have received a copy of the GNU General Public License and
+// a copy of the GCC Runtime Library Exception along with this program;
+// see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
+// <http://www.gnu.org/licenses/>.
+
+#include <bits/c++config.h>
+#include <bits/atomic_futex.h>
+#include <bits/gthr.h>
+#if defined(__GTHREADS) && defined(__GTHREAD_HAS_COND) \
+  && (ATOMIC_INT_LOCK_FREE > 1) && defined(_GLIBCXX_HAVE_LINUX_FUTEX)
+# include <cerrno>
+# include <climits>
+# include <syscall.h>
+# include <unistd.h>
+# define _GLIBCXX_USE_FUTEX
+# define _GLIBCXX_FUTEX_WAIT 0
+# define _GLIBCXX_FUTEX_WAKE 1
+#endif
+
+namespace std //_GLIBCXX_VISIBILITY(default)
+{
+//_GLIBCXX_BEGIN_NAMESPACE_VERSION
+
+  // Blocks while *addr == val (FUTEX_WAIT), optionally bounded by
+  // __s + __ns.  Returns false iff a timeout occurred; spurious wakeups
+  // (EAGAIN because the value changed, EINTR) report true and leave the
+  // caller to re-check the value.
+  bool
+  __atomic_futex_unsigned_base::futex_wait_for(unsigned *addr, unsigned val,
+      bool has_timeout, chrono::seconds __s, chrono::nanoseconds __ns)
+  {
+    if (!has_timeout)
+      {
+	// Pass an explicit null timeout pointer.  The WIP version omitted
+	// the argument, so the kernel would read an indeterminate pointer
+	// for FUTEX_WAIT's timeout.
+	syscall (SYS_futex, addr, _GLIBCXX_FUTEX_WAIT, val, nullptr);
+	return true;
+      }
+    else
+      {
+	// If we already timed out, don't execute the futex operation.
+	// (The WIP version returned ETIMEDOUT here, i.e. true, inverting
+	// the contract.)
+	if (__ns.count() < 0 || __s.count() < 0)
+	  return false;
+	__gthread_time_t ts =
+	  {
+	    static_cast<std::time_t>(__s.count()),
+	    static_cast<long>(__ns.count())
+	  };
+	// syscall returns -1 on failure and reports the error in errno;
+	// the WIP version compared the return value itself to ETIMEDOUT.
+	if (syscall (SYS_futex, addr, _GLIBCXX_FUTEX_WAIT, val, &ts) == -1)
+	  return errno != ETIMEDOUT;
+	return true;
+      }
+  }
+
+  // Wakes all threads blocked in futex_wait_for on addr; INT_MAX asks the
+  // kernel to wake every waiter.
+  void
+  __atomic_futex_unsigned_base::futex_notify_all(unsigned* addr)
+  {
+    syscall (SYS_futex, addr, _GLIBCXX_FUTEX_WAKE, INT_MAX);
+  }
+
+}