diff mbox

Fix libgomp semaphores

Message ID 20111127225343.GA7827@bubble.grove.modra.org
State New
Headers show

Commit Message

Alan Modra Nov. 27, 2011, 10:53 p.m. UTC
On Fri, Nov 25, 2011 at 08:38:39AM +0100, Jakub Jelinek wrote:
> Furthermore, I'd prefer if the patch could be split into smaller parts,
> e.g. for bisecting purposes.  One patch would do the mutex changes
> to use new atomics, remove extra mutex.h headers and start using 0/1/-1
> instead of 0/1/2.  And another patch would rewrite the semaphores.

As requested, this is the semaphore rewrite.  The code is modelleled
on the existing mutex implementation, using a single int to hold 31
bits of count info and a wait flag bit.  Like the mutexes, these
semaphores have the nice property that no syscalls are made unless
threads are waiting on the semaphore, and if we reach the contended
case, there is an orderly transition back to uncontended.  Also, I
believe this implementation uses the bare minimum of synchronization
instructions.

Unlike the old semaphore implementation, I do not try to wake multiple
threads in the sem_post_slow futex_wake call.  That's because it is
necessary to wake further threads if possible on exiting from
sem_wait_slow, and given that is necessary, then waking multiple
threads just leads to unnecessary futex_wake syscalls.  You can see
why the wake in sem_wait_slow is necessary by considering a semaphore
set up to allow two threads to work on some job.  If there were four
threads trying to get the semaphore, two would succeed, A and B, and
two, C and D, end up waiting in gomp_set_wait_slow.  When A finishes
its job, it will call gomp_sem_post to increment the semaphore, see
and clear the wait flag, and call futex_wake on one thread.  If B also
finishes while this is happening, but before C has woken, then B's
call to gomp_sem_post may not see the wait flag set, leaving D asleep.

Incidentally, the regression I mentioned in
http://gcc.gnu.org/ml/gcc-patches/2011-11/msg02393.html wasn't real.
(I'd been testing for failures with a script that repeatedly hammered
selected libgomp tests with
while LD_LIBRARY_PATH=blahblah ./sometest; do :; done
because a single gcc testsuite run often doesn't catch locking
failures.  The regression was due to a typo on LD_LIBRARY_PATH..)

Bootstrapped and regression tested powerpc-linux.  OK to apply?

	PR libgomp/51249
	* config/linux/sem.h: Rewrite.
	* config/linux/sem.c: Rewrite.

Comments

Jakub Jelinek Nov. 28, 2011, 4:23 p.m. UTC | #1
On Mon, Nov 28, 2011 at 09:23:43AM +1030, Alan Modra wrote:
> +  int count = *sem;
> +
> +  while ((count & 0x7fffffff) != 0)
> +    {
> +      int oldval = count;
> +      __atomic_compare_exchange_4 (sem, &oldval, count - 1,
> +				   false, MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
> +      if (__builtin_expect (oldval == count, 1))
> +	return;

Why aren't you instead testing the return value of
__atomic_compare_exchange_4 (here and in other cases)?


	Jakub
Alan Modra Nov. 28, 2011, 10:16 p.m. UTC | #2
On Mon, Nov 28, 2011 at 05:23:37PM +0100, Jakub Jelinek wrote:
> On Mon, Nov 28, 2011 at 09:23:43AM +1030, Alan Modra wrote:
> > +  int count = *sem;
> > +
> > +  while ((count & 0x7fffffff) != 0)
> > +    {
> > +      int oldval = count;
> > +      __atomic_compare_exchange_4 (sem, &oldval, count - 1,
> > +				   false, MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
> > +      if (__builtin_expect (oldval == count, 1))
> > +	return;
> 
> Why aren't you instead testing the return value of
> __atomic_compare_exchange_4 (here and in other cases)?

If you use the return value on powerpc, you find that requires two
load immediate instructions (loading zero and one), and a compare
against zero.  That makes three fewer instructions as written, because
the oldval == count comparison has already been done inside the atomic
sequence.  I'd expect fewer on most other architectures unless they
happen to have a compare and exchange instruction that sets condition
bits (ie. Intel).  Even on Intel the way I've written the code
shouldn't take more instructions with a properly written cmpxchg rtl
description.  Does it?

Hmm, I suppose you could argue that powerpc and others ought to not
generate those three extra instructions when using the return value.
I'll see about fixing powerpc.
Richard Henderson Nov. 28, 2011, 11:05 p.m. UTC | #3
On 11/28/2011 02:16 PM, Alan Modra wrote:
> Hmm, I suppose you could argue that powerpc and others ought to not
> generate those three extra instructions when using the return value.
> I'll see about fixing powerpc.

True.  For weak, the value *should* always be used (otherwise the user program is broken).

However, we can do better by considering the value to be stored in CR0.  We perform the comparison in the loop, which sets CR0 == EQ (true) or NE (false); CR0 is set again by the STWCX insn in the same fashion.  So the

      /* ??? It's either this or an unlikely jump over (set bool 1).  */
      x = gen_rtx_EQ (SImode, cond, const0_rtx);
      emit_insn (gen_rtx_SET (VOIDmode, boolval, x));

code currently emitted by rs6000_expand_atomic_compare_and_swap is sufficient, and merely needs to be adjusted so that it is computed after label2, and other settings to boolval removed.

This may be sufficient for better stong compare-and-swap as well.


r~
Richard Henderson Nov. 29, 2011, midnight UTC | #4
On 11/27/2011 02:53 PM, Alan Modra wrote:
> +enum memmodel
> +{
> +  MEMMODEL_RELAXED = 0,
> +  MEMMODEL_CONSUME = 1,
> +  MEMMODEL_ACQUIRE = 2,
> +  MEMMODEL_RELEASE = 3,
> +  MEMMODEL_ACQ_REL = 4,
> +  MEMMODEL_SEQ_CST = 5,
> +  MEMMODEL_LAST = 6
> +};

This should probably go to libgomp.h.

>  /* This is a Linux specific implementation of a semaphore synchronization
>     mechanism for libgomp.  This type is private to the library.  This 
> +   counting semaphore implementation uses atomic instructions and the
> +   futex syscall, and a single 32-bit int to store semaphore state.
> +   The low 31 bits are the count, the top bit is a flag set when some
> +   threads may be waiting.  */

I think we'll get better code generation on a number of targets if we make the low bit the waiting  bit, and the higher bits the count.  This we do (count & 1) and (count + 2) instead of larger constants needing to be generated.  Not changed below...

> +static inline void
> +gomp_sem_wait (gomp_sem_t *sem)
>  {
> +  int count = *sem;
> +
> +  while ((count & 0x7fffffff) != 0)
> +    {
> +      int oldval = count;
> +      __atomic_compare_exchange_4 (sem, &oldval, count - 1,
> +				   false, MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
> +      if (__builtin_expect (oldval == count, 1))
> +	return;
> +      count = oldval;
> +    }

I'd really prefer not to hard-code any sizes at all.  Let's change the explicit _4 to _n everywhere.

The loop above doesn't actually make sense to me.  If the compare-and-swap succeeds, then oldval == count - 1.  Which doesn't seem to be what you're testing at all.

Perhaps this entire function is better written as

{
  int count = *sem;
  do
    {
      if (__builtin_expect (count & 0x80000000u, 0)
        {
          gomp_sem_wait_slow (sem, count);
          return;
        }
    }
  while (!__atomic_compare_exchange_n (sem, &count, count - 1, true,
				       MEMMODEL_ACQUIRE, MEMMODEL_RELAXED));
}

> +gomp_sem_post (gomp_sem_t *sem)
>  {
> +  int count = *sem;
> +
> +  while (1)
> +    {
> +      int oldval = count;
> +      __atomic_compare_exchange_4 (sem, &oldval, (count + 1) & 0x7fffffff,
> +				   false, MEMMODEL_RELEASE, MEMMODEL_RELAXED);
> +      if (__builtin_expect (oldval == count, 1))
> +	break;

Again, this equality doesn't make sense.  Further, you aren't testing for the
high bit set inside the loop, nor are you preserving the high bit.

Perhaps better written as

{
  int count = *sem;
  do
    {
      if (__builtin_expect (count & 0x80000000u, 0))
	{
	  gomp_sem_post_slow (sem, count);
	  return;
	}
      /* We can't post more than 2**31-1 times.  */
      assert (count < 0x7fffffff);
    }
  while (!__atomic_compare_exchange_n (sem, &count, count + 1, true,
				       MEMMODEL_RELEASE, MEMMODEL_RELAXED));
}

... All that said, I don't see how we can ever clear the wait bit once its set?  Are we better off splitting the 32-bit value into two 16-bit fields for value+waiters?  Or similarly splitting a 64-bit value?  That way we can at least update both fields atomically, and we ought never lose a waiter.


r~
Alan Modra Nov. 29, 2011, 12:52 a.m. UTC | #5
On Mon, Nov 28, 2011 at 04:00:10PM -0800, Richard Henderson wrote:
> On 11/27/2011 02:53 PM, Alan Modra wrote:
> > +enum memmodel
> > +{
> > +  MEMMODEL_RELAXED = 0,
> > +  MEMMODEL_CONSUME = 1,
> > +  MEMMODEL_ACQUIRE = 2,
> > +  MEMMODEL_RELEASE = 3,
> > +  MEMMODEL_ACQ_REL = 4,
> > +  MEMMODEL_SEQ_CST = 5,
> > +  MEMMODEL_LAST = 6
> > +};
> 
> This should probably go to libgomp.h.

I wondered about that.

> >  /* This is a Linux specific implementation of a semaphore synchronization
> >     mechanism for libgomp.  This type is private to the library.  This 
> > +   counting semaphore implementation uses atomic instructions and the
> > +   futex syscall, and a single 32-bit int to store semaphore state.
> > +   The low 31 bits are the count, the top bit is a flag set when some
> > +   threads may be waiting.  */
> 
> I think we'll get better code generation on a number of targets if we make the low bit the waiting  bit, and the higher bits the count.  This we do (count & 1) and (count + 2) instead of larger constants needing to be generated.  Not changed below...

Funny, that's the way I wrote this code at first, then went for the
wait bit as the sign bit because you can test > 0 in one place where
you want "not waiting and count non-zero".

> > +static inline void
> > +gomp_sem_wait (gomp_sem_t *sem)
> >  {
> > +  int count = *sem;
> > +
> > +  while ((count & 0x7fffffff) != 0)
> > +    {
> > +      int oldval = count;
> > +      __atomic_compare_exchange_4 (sem, &oldval, count - 1,
> > +				   false, MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
> > +      if (__builtin_expect (oldval == count, 1))
> > +	return;
> > +      count = oldval;
> > +    }
> 
> I'd really prefer not to hard-code any sizes at all.  Let's change the explicit _4 to _n everywhere.

OK.

> The loop above doesn't actually make sense to me.  If the compare-and-swap succeeds, then oldval == count - 1.  Which doesn't seem to be what you're testing at all.

Huh?  If it succeeds, oldval == count and we return.

> Perhaps this entire function is better written as
> 
> {
>   int count = *sem;
>   do
>     {
>       if (__builtin_expect (count & 0x80000000u, 0)
>         {
>           gomp_sem_wait_slow (sem, count);
>           return;
>         }
>     }
>   while (!__atomic_compare_exchange_n (sem, &count, count - 1, true,
> 				       MEMMODEL_ACQUIRE, MEMMODEL_RELAXED));
> }

No, we don't need the slow path if we have *sem & 0x7fffffff non-zero.

> 
> > +gomp_sem_post (gomp_sem_t *sem)
> >  {
> > +  int count = *sem;
> > +
> > +  while (1)
> > +    {
> > +      int oldval = count;
> > +      __atomic_compare_exchange_4 (sem, &oldval, (count + 1) & 0x7fffffff,
> > +				   false, MEMMODEL_RELEASE, MEMMODEL_RELAXED);
> > +      if (__builtin_expect (oldval == count, 1))
> > +	break;
> 
> Again, this equality doesn't make sense.  Further, you aren't testing for the
> high bit set inside the loop, nor are you preserving the high bit.

See above about the equality.  We don't want to preserve the wait bit
here.  Otherwise we'd never go back to the uncontended state, which
answers

> ... All that said, I don't see how we can ever clear the wait bit
> once its set?

this question.

> Are we better off splitting the 32-bit value into two 16-bit fields for value+waiters?  Or similarly splitting a 64-bit value?  That way we can at least update both fields atomically, and we ought never lose a waiter.

Other splits of the field are certainly possible, but of course
restrict the max number, and you'd need the fancy futex op_wait.
Some targets don't have 64-bit atomics, so we can't really go that
way.  Yes, if I did keep track of number of waiters we'd save one
unnecessary futex_wake call, but I'm quite confident no waiters will
be lost just using a flag.
diff mbox

Patch

Index: libgomp/config/linux/sem.h
===================================================================
--- libgomp/config/linux/sem.h	(revision 181718)
+++ libgomp/config/linux/sem.h	(working copy)
@@ -24,34 +24,74 @@ 
 
 /* This is a Linux specific implementation of a semaphore synchronization
    mechanism for libgomp.  This type is private to the library.  This 
-   implementation uses atomic instructions and the futex syscall.  */
+   counting semaphore implementation uses atomic instructions and the
+   futex syscall, and a single 32-bit int to store semaphore state.
+   The low 31 bits are the count, the top bit is a flag set when some
+   threads may be waiting.  */
 
 #ifndef GOMP_SEM_H
 #define GOMP_SEM_H 1
 
 typedef int gomp_sem_t;
 
-static inline void gomp_sem_init (gomp_sem_t *sem, int value)
+enum memmodel
+{
+  MEMMODEL_RELAXED = 0,
+  MEMMODEL_CONSUME = 1,
+  MEMMODEL_ACQUIRE = 2,
+  MEMMODEL_RELEASE = 3,
+  MEMMODEL_ACQ_REL = 4,
+  MEMMODEL_SEQ_CST = 5,
+  MEMMODEL_LAST = 6
+};
+
+extern void gomp_sem_wait_slow (gomp_sem_t *, int);
+extern void gomp_sem_post_slow (gomp_sem_t *);
+
+static inline void
+gomp_sem_init (gomp_sem_t *sem, int value)
 {
   *sem = value;
 }
 
-extern void gomp_sem_wait_slow (gomp_sem_t *);
-static inline void gomp_sem_wait (gomp_sem_t *sem)
+static inline void
+gomp_sem_destroy (gomp_sem_t *sem)
 {
-  if (!__sync_bool_compare_and_swap (sem, 1, 0))
-    gomp_sem_wait_slow (sem);
 }
 
-extern void gomp_sem_post_slow (gomp_sem_t *);
-static inline void gomp_sem_post (gomp_sem_t *sem)
+static inline void
+gomp_sem_wait (gomp_sem_t *sem)
 {
-  if (!__sync_bool_compare_and_swap (sem, 0, 1))
-    gomp_sem_post_slow (sem);
+  int count = *sem;
+
+  while ((count & 0x7fffffff) != 0)
+    {
+      int oldval = count;
+      __atomic_compare_exchange_4 (sem, &oldval, count - 1,
+				   false, MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
+      if (__builtin_expect (oldval == count, 1))
+	return;
+      count = oldval;
+    }
+  gomp_sem_wait_slow (sem, count);
 }
 
-static inline void gomp_sem_destroy (gomp_sem_t *sem)
+static inline void
+gomp_sem_post (gomp_sem_t *sem)
 {
-}
+  int count = *sem;
+
+  while (1)
+    {
+      int oldval = count;
+      __atomic_compare_exchange_4 (sem, &oldval, (count + 1) & 0x7fffffff,
+				   false, MEMMODEL_RELEASE, MEMMODEL_RELAXED);
+      if (__builtin_expect (oldval == count, 1))
+	break;
+      count = oldval;
+    }
 
+  if (__builtin_expect (count & 0x80000000, 0))
+    gomp_sem_post_slow (sem);
+}
 #endif /* GOMP_SEM_H */
Index: libgomp/config/linux/sem.c
===================================================================
--- libgomp/config/linux/sem.c	(revision 181718)
+++ libgomp/config/linux/sem.c	(working copy)
@@ -28,34 +28,69 @@ 
 
 #include "wait.h"
 
-
 void
-gomp_sem_wait_slow (gomp_sem_t *sem)
+gomp_sem_wait_slow (gomp_sem_t *sem, int count)
 {
-  while (1)
+  int oldval, newval;
+
+  /* First loop spins a while.  */
+  while (count == 0)
     {
-      int val = __sync_val_compare_and_swap (sem, 0, -1);
-      if (val > 0)
+      if (do_spin (sem, 0))
+	{
+	  /* Spin timeout, nothing changed.  Set waiting flag.  */
+	  oldval = 0;
+	  __atomic_compare_exchange_4 (sem, &oldval, 0x80000000, false,
+				       MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
+	  count = oldval;
+	  if (oldval == 0)
+	    {
+	      futex_wait (sem, 0x80000000);
+	      count = *sem;
+	    }
+	  break;
+	}
+      /* Something changed.  If positive, we're good to go.  */
+      else if ((count = *sem) > 0)
 	{
-	  if (__sync_bool_compare_and_swap (sem, val, val - 1))
+	  oldval = count;
+	  __atomic_compare_exchange_4 (sem, &oldval, count - 1, false,
+				       MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
+	  if (oldval == count)
 	    return;
+	  count = oldval;
 	}
-      do_wait (sem, -1);
+    }
+
+  /* Second loop waits until semaphore is posted.  We always exit this
+     loop with wait flag set, so next post will awaken a thread.  */
+  while (1)
+    {
+      oldval = count;
+      newval = 0x80000000;
+      if ((count & 0x7fffffff) != 0)
+	newval |= count - 1;
+      __atomic_compare_exchange_4 (sem, &oldval, newval, false,
+				   MEMMODEL_ACQUIRE, MEMMODEL_RELAXED);
+      if (oldval == count)
+	{
+	  if ((count & 0x7fffffff) != 0)
+	    {
+	      /* If we can wake more threads, do so now.  */
+	      if ((count & 0x7fffffff) > 1)
+		gomp_sem_post_slow (sem);
+	      break;
+	    }
+	  do_wait (sem, 0x80000000);
+	  count = *sem;
+	}
+      else
+	count = oldval;
     }
 }
 
 void
 gomp_sem_post_slow (gomp_sem_t *sem)
 {
-  int old, tmp = *sem, wake;
-
-  do
-    {
-      old = tmp;
-      wake = old > 0 ? old + 1 : 1;
-      tmp = __sync_val_compare_and_swap (sem, old, wake);
-    }
-  while (old != tmp);
-
-  futex_wake (sem, wake);
+  futex_wake (sem, 1);
 }