
[-v4,5/7] locking, arch: Update spin_unlock_wait()

Message ID 20160602163425.GV3205@twins.programming.kicks-ass.net
State Awaiting Upstream
Delegated to: Pablo Neira

Commit Message

Peter Zijlstra June 2, 2016, 4:34 p.m. UTC
On Thu, Jun 02, 2016 at 04:44:24PM +0200, Peter Zijlstra wrote:
> On Thu, Jun 02, 2016 at 10:24:40PM +0800, Boqun Feng wrote:
> > On Thu, Jun 02, 2016 at 01:52:02PM +0200, Peter Zijlstra wrote:
> > About spin_unlock_wait() on ppc, I actually have a fix pending review:
> > 
> > http://lkml.kernel.org/r/1461130033-70898-1-git-send-email-boqun.feng@gmail.com
> 
> > that patch fixed a different problem when people want to pair a
> > spin_unlock_wait() with a spin_lock().
> 
> Argh, indeed, and I think qspinlock is still broken there :/ But my poor
> brain is about to give in for the day.

This 'replaces' commit:

  54cf809b9512 ("locking,qspinlock: Fix spin_is_locked() and spin_unlock_wait()")

and seems to still work with the test case from that thread while
getting rid of the extra barriers.

---
 include/asm-generic/qspinlock.h | 37 +++++++----------------------------
 kernel/locking/qspinlock.c      | 43 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 50 insertions(+), 30 deletions(-)


Comments

Will Deacon June 2, 2016, 5:57 p.m. UTC | #1
On Thu, Jun 02, 2016 at 06:34:25PM +0200, Peter Zijlstra wrote:
> On Thu, Jun 02, 2016 at 04:44:24PM +0200, Peter Zijlstra wrote:
> > On Thu, Jun 02, 2016 at 10:24:40PM +0800, Boqun Feng wrote:
> > > On Thu, Jun 02, 2016 at 01:52:02PM +0200, Peter Zijlstra wrote:
> > > About spin_unlock_wait() on ppc, I actually have a fix pending review:
> > > 
> > > http://lkml.kernel.org/r/1461130033-70898-1-git-send-email-boqun.feng@gmail.com
> > 
> > > that patch fixed a different problem when people want to pair a
> > > spin_unlock_wait() with a spin_lock().
> > 
> > Argh, indeed, and I think qspinlock is still broken there :/ But my poor
> > brain is about to give in for the day.
> 
> This 'replaces' commit:
> 
>   54cf809b9512 ("locking,qspinlock: Fix spin_is_locked() and spin_unlock_wait()")
> 
> and seems to still work with the test case from that thread while
> getting rid of the extra barriers.
> 
> ---
>  include/asm-generic/qspinlock.h | 37 +++++++----------------------------
>  kernel/locking/qspinlock.c      | 43 +++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 50 insertions(+), 30 deletions(-)
> 
> diff --git a/include/asm-generic/qspinlock.h b/include/asm-generic/qspinlock.h
> index 6bd05700d8c9..9e3dff16d5dc 100644
> --- a/include/asm-generic/qspinlock.h
> +++ b/include/asm-generic/qspinlock.h
> @@ -28,30 +28,13 @@
>   */
>  static __always_inline int queued_spin_is_locked(struct qspinlock *lock)
>  {
> -	/*
> -	 * queued_spin_lock_slowpath() can ACQUIRE the lock before
> -	 * issuing the unordered store that sets _Q_LOCKED_VAL.
> -	 *
> -	 * See both smp_cond_acquire() sites for more detail.
> -	 *
> -	 * This however means that in code like:
> -	 *
> -	 *   spin_lock(A)		spin_lock(B)
> -	 *   spin_unlock_wait(B)	spin_is_locked(A)
> -	 *   do_something()		do_something()
> +	/* 
> +	 * See queued_spin_unlock_wait().
>  	 *
> -	 * Both CPUs can end up running do_something() because the store
> -	 * setting _Q_LOCKED_VAL will pass through the loads in
> -	 * spin_unlock_wait() and/or spin_is_locked().
> -	 *
> -	 * Avoid this by issuing a full memory barrier between the spin_lock()
> -	 * and the loads in spin_unlock_wait() and spin_is_locked().
> -	 *
> -	 * Note that regular mutual exclusion doesn't care about this
> -	 * delayed store.
> +	 * Any !0 state indicates it is locked, even if _Q_LOCKED_VAL
> +	 * isn't immediately observable.
>  	 */
> -	smp_mb();
> -	return atomic_read(&lock->val) & _Q_LOCKED_MASK;
> +	return !!atomic_read(&lock->val);
>  }

I'm failing to keep up here :(

The fast-path code in queued_spin_lock is just an atomic_cmpxchg_acquire.
If that's built out of LL/SC instructions, then why don't we need a barrier
here in queued_spin_is_locked?

Or is the decision now that only spin_unlock_wait is required to enforce
this ordering?

Will
Peter Zijlstra June 2, 2016, 9:51 p.m. UTC | #2
On Thu, Jun 02, 2016 at 06:57:00PM +0100, Will Deacon wrote:
> > +++ b/include/asm-generic/qspinlock.h
> > @@ -28,30 +28,13 @@
> >   */
> >  static __always_inline int queued_spin_is_locked(struct qspinlock *lock)
> >  {
> > +	/* 
> > +	 * See queued_spin_unlock_wait().
> >  	 *
> > +	 * Any !0 state indicates it is locked, even if _Q_LOCKED_VAL
> > +	 * isn't immediately observable.
> >  	 */
> > -	smp_mb();
> > +	return !!atomic_read(&lock->val);
> >  }
> 
> I'm failing to keep up here :(
> 
> The fast-path code in queued_spin_lock is just an atomic_cmpxchg_acquire.
> If that's built out of LL/SC instructions, then why don't we need a barrier
> here in queued_spin_is_locked?
> 
> Or is the decision now that only spin_unlock_wait is required to enforce
> this ordering?

(warning: long, somewhat rambling, email)

You're talking about the smp_mb() that went missing?

So that wasn't the reason that smp_mb() existed..... but that makes the
atomic_foo_acquire() things somewhat hard to use, because I don't think
we want to unconditionally put the smp_mb() in there just in case.

Now, the normal atomic_foo_acquire() stuff uses smp_mb() as per
smp_mb__after_atomic(); it's just ARM64 and PPC that go all 'funny' and
need this extra barrier. Blergh. So let's shelve this issue for a bit.
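
For reference, the generic fallback (for architectures that only supply
the _relaxed form) builds the _acquire variant roughly like the sketch
below -- paraphrased from include/linux/atomic.h, so the exact details
may differ between kernel versions:

#define __atomic_op_acquire(op, args...)                                \
({                                                                      \
        typeof(op##_relaxed(args)) __ret = op##_relaxed(args);          \
        smp_mb__after_atomic();  /* full barrier on most architectures */\
        __ret;                                                          \
})

ARM64 and PPC instead provide their own native _acquire variants
(LDAXR / lwsync based), which is exactly how they end up weaker than
this smp_mb() based fallback.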

Let me write some text to hopefully explain where it did come from and
why I now removed it.


So the spin_is_locked() correctness issue comes from something like:

CPU0					CPU1

global_lock();				local_lock(i)
  spin_lock(&G)				  spin_lock(&L[i])
  for (i)				  if (!spin_is_locked(&G)) {
    spin_unlock_wait(&L[i]);		    smp_acquire__after_ctrl_dep();
					    return;
					  }
					  /* deal with fail */

Where it is important CPU1 sees G locked or CPU0 sees L[i] locked such
that there is exclusion between the two critical sections.

The load from spin_is_locked(&G) is constrained by the ACQUIRE from
spin_lock(&L[i]), and similarly the load(s) from spin_unlock_wait(&L[i])
are constrained by the ACQUIRE from spin_lock(&G).

Similarly, later stuff is constrained by the ACQUIRE from CTRL+RMB.


Given a simple (SC) test-and-set spinlock, the above is fairly
straightforward and 'correct', right?
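
For concreteness, the kind of fully ordered test-and-set lock assumed
here could be sketched in userspace C11 as below (toy code, not the
kernel implementation; every operation defaults to seq_cst and so acts
as a full barrier):

#include <stdatomic.h>
#include <stdbool.h>

/* Toy sequentially consistent test-and-set spinlock. */
typedef struct { atomic_int locked; } tas_lock;

void tas_spin_lock(tas_lock *l)
{
        while (atomic_exchange(&l->locked, 1))  /* seq_cst RMW: full barrier */
                ;
}

void tas_spin_unlock(tas_lock *l)
{
        atomic_store(&l->locked, 0);
}

bool tas_spin_is_locked(tas_lock *l)
{
        return atomic_load(&l->locked);
}

void tas_spin_unlock_wait(tas_lock *l)
{
        while (atomic_load(&l->locked))
                ;
}

With everything seq_cst, the crossed store-load pattern above cannot
have both sides miss the other's lock, which is what makes the scheme
work for this kind of lock.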


Now, the 'problem' with qspinlock is that one possible acquire path goes
like (there are more, but this is the easiest):

	smp_cond_acquire(!(atomic_read(&lock->val) & _Q_LOCKED_MASK));
	clear_pending_set_locked(lock);

And one possible implementation of clear_pending_set_locked() is:

	WRITE_ONCE(l->locked_pending, _Q_LOCKED_VAL);

IOW, we load-acquire the locked byte until it's cleared, at which point
we know the pending byte to be 1. Then we consider the lock owned by us
and issue a regular unordered store to flip the pending and locked
bytes.

Normal mutual exclusion is fine with this, no new pending can happen
until this store becomes visible at which time the locked byte is
visibly taken.

This unordered store however, can be delayed (store buffer) such that
the loads from spin_unlock_wait/spin_is_locked can pass up before it
(even on TSO arches).

_IF_ spin_unlock_wait/spin_is_locked only look at the locked byte, this
is a problem because at that point the crossed store-load pattern
becomes uncrossed and we lose our guarantee. That is, what used to be:

	[S] G.locked = 1	[S] L[i].locked = 1
	[MB]			[MB]
	[L] L[i].locked		[L] G.locked

becomes:

	[S] G.locked = 1	[S] L[i].locked = 1
	[L] L[i].locked		[L] G.locked

Which we can reorder at will and bad things follow.
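
That second shape is just the classic store-buffering pattern. A
userspace C11 demo of the difference (a sketch only; on real hardware
you may need many iterations before the relaxed version misbehaves):

#include <stdatomic.h>
#include <stdio.h>
#include <pthread.h>

static atomic_int X, Y;
static int r0, r1;

static void *cpu0(void *arg)
{
        (void)arg;
        atomic_store_explicit(&X, 1, memory_order_relaxed);   /* [S] G.locked = 1 */
        /* atomic_thread_fence(memory_order_seq_cst); */      /* the [MB] that fixes it */
        r0 = atomic_load_explicit(&Y, memory_order_relaxed);  /* [L] L[i].locked */
        return NULL;
}

static void *cpu1(void *arg)
{
        (void)arg;
        atomic_store_explicit(&Y, 1, memory_order_relaxed);   /* [S] L[i].locked = 1 */
        /* atomic_thread_fence(memory_order_seq_cst); */
        r1 = atomic_load_explicit(&X, memory_order_relaxed);  /* [L] G.locked */
        return NULL;
}

int main(void)
{
        for (int i = 0; i < 1000000; i++) {
                pthread_t a, b;

                atomic_store(&X, 0);
                atomic_store(&Y, 0);
                pthread_create(&a, NULL, cpu0, NULL);
                pthread_create(&b, NULL, cpu1, NULL);
                pthread_join(a, NULL);
                pthread_join(b, NULL);
                if (!r0 && !r1)
                        printf("both loads saw 0 at iteration %d\n", i);
        }
        return 0;
}

Without the fences the both-zero outcome is allowed (and observable);
with the seq_cst fences uncommented it is forbidden, which is the
ordering the smp_mb() below restored.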

The previous fix for this was to include an smp_mb() in both
spin_is_locked() and spin_unlock_wait() to restore that ordering.


So at this point spin_is_locked() looks like:

	smp_mb();
	while (atomic_read(&lock->val) & _Q_LOCKED_MASK)
		cpu_relax();


But for spin_unlock_wait() there is a second correctness issue, namely:

CPU0				CPU1

flag = set;
smp_mb();			spin_lock(&l)
spin_unlock_wait(&l);		if (flag)
				  /* bail */

				/* add to lockless list */
				spin_unlock(&l);

/* iterate lockless list */


Which ensures that CPU1 will stop adding bits to the list and CPU0 will
observe the last entry on the list (if spin_unlock_wait() had ACQUIRE
semantics etc..)

This, however, is still broken: nothing ensures CPU0 sees l.locked
before CPU1 tests flag.
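
Spelled out as (invented) code, the pattern the caller wants is roughly
the userspace sketch below, with a toy fully ordered lock; all the names
are made up, and the point of the breakage above is that qspinlock's old
spin_unlock_wait() did not provide the ordering this sketch gets for
free from seq_cst:

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct node { struct node *next; };

typedef struct { atomic_int locked; } toy_lock;

toy_lock l;
atomic_bool stop_flag;
_Atomic(struct node *) llist_head;

/* CPU1 side: take the lock, bail if the flag is set, otherwise publish. */
void producer(struct node *n)
{
        while (atomic_exchange(&l.locked, 1))           /* spin_lock(&l) */
                ;
        if (atomic_load(&stop_flag)) {                  /* if (flag) */
                atomic_store(&l.locked, 0);             /*   bail */
                return;
        }
        n->next = atomic_load(&llist_head);             /* add to lockless list */
        while (!atomic_compare_exchange_weak(&llist_head, &n->next, n))
                ;
        atomic_store(&l.locked, 0);                     /* spin_unlock(&l) */
}

/* CPU0 side: raise the flag, wait out the current lock holder, then iterate. */
struct node *consumer_drain(void)
{
        atomic_store(&stop_flag, true);                 /* flag = set */
        atomic_thread_fence(memory_order_seq_cst);      /* smp_mb() */
        while (atomic_load(&l.locked))                  /* spin_unlock_wait(&l) */
                ;
        return atomic_exchange(&llist_head, NULL);      /* iterate lockless list */
}

With the toy lock, either the producer sees the flag and bails, or its
list addition is visible to the drain; that is the guarantee the caller
is after.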

So while we fixed the first correctness case (global / local locking as
employed by ipc/sem and nf_conntrack) this is still very much broken.

My patch today rewrites spin_unlock_wait() and spin_is_locked() to rely
on more information to (hopefully -- I really need sleep) fix both.

The primary observation is that even though the l.locked store is
delayed, there has been a prior atomic operation on the lock word to
register the contending lock (in the above scenario, set the pending
byte, in the other paths, queue onto the tail word).

This means that any load passing the .locked byte store, must at least
observe that state.

Therefore, spin_is_locked() looks for any !0 state and is done. Even if
the locked byte is cleared, if any of the other fields are !0 it is in
fact locked.

spin_unlock_wait() is slightly more complex, it becomes:

 1) if the entire word is 0 -- we're unlocked, done
 2) if the locked byte is 0 (there must be other state, otherwise the
    previous case), wait until we see the locked byte.
 3) wait for the locked byte to be cleared

Which relies on the same observation, that even though we might not
observe the locked store, we must observe a store by the contending
lock.

This means the scenario above now becomes:

	[S] G.pending = 1		[S] L[i].pending = 1
	[MB]				[MB]
	[S] G.locked_pending = 1	[S] L[i].locked_pending = 1
	[L] L[i]			[L] G

And things are good again: we simply do not care whether the unordered
store is observed or not.
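
For reference, the three steps map onto a loop like the userspace C11
analogue below (not the kernel code itself -- that is in the patch at
the bottom -- and it assumes the qspinlock-style layout where the low
byte of the word is the locked byte):

#include <stdatomic.h>
#include <stdint.h>

#define Q_LOCKED_MASK  0xffu    /* low byte of the word is the locked byte */

void toy_unlock_wait(_Atomic uint32_t *lock)
{
        uint32_t val;

        for (;;) {
                val = atomic_load(lock);

                if (!val)                       /* 1) whole word 0: not locked, done */
                        goto done;

                if (val & Q_LOCKED_MASK)        /* 3) locked: wait for the unlock below */
                        break;

                /* 2) pending/tail set but locked byte still 0: keep
                 *    spinning until the delayed locked store shows up */
        }

        while (atomic_load(lock) & Q_LOCKED_MASK)
                ;
done:
        atomic_thread_fence(memory_order_acquire);  /* the kernel version ends with smp_rmb() */
}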

Make sense?

Will Deacon June 3, 2016, 12:47 p.m. UTC | #3
Hi Peter,

On Thu, Jun 02, 2016 at 11:51:19PM +0200, Peter Zijlstra wrote:
> On Thu, Jun 02, 2016 at 06:57:00PM +0100, Will Deacon wrote:
> > > +++ b/include/asm-generic/qspinlock.h
> > > @@ -28,30 +28,13 @@
> > >   */
> > >  static __always_inline int queued_spin_is_locked(struct qspinlock *lock)
> > >  {
> > > +	/* 
> > > +	 * See queued_spin_unlock_wait().
> > >  	 *
> > > +	 * Any !0 state indicates it is locked, even if _Q_LOCKED_VAL
> > > +	 * isn't immediately observable.
> > >  	 */
> > > -	smp_mb();
> > > +	return !!atomic_read(&lock->val);
> > >  }
> > 
> > I'm failing to keep up here :(
> > 
> > The fast-path code in queued_spin_lock is just an atomic_cmpxchg_acquire.
> > If that's built out of LL/SC instructions, then why don't we need a barrier
> > here in queued_spin_is_locked?
> > 
> > Or is the decision now that only spin_unlock_wait is required to enforce
> > this ordering?
> 
> (warning: long, somewhat rambling, email)

Thanks for taking the time to write this up. Comments inline.

> You're talking about the smp_mb() that went missing?

Right -- I think you still need it.

> So that wasn't the reason that smp_mb() existed..... but that makes the
> atomic_foo_acquire() things somewhat hard to use, because I don't think
> we want to unconditionally put the smp_mb() in there just in case.
> 
> Now, the normal atomic_foo_acquire() stuff uses smp_mb() as per
> smp_mb__after_atomic(), its just ARM64 and PPC that go all 'funny' and
> need this extra barrier. Blergh. So lets shelf this issue for a bit.

Hmm... I certainly plan to get qspinlock up and running for arm64 in the
near future, so I'm not keen on shelving it for very long.

> Let me write some text to hopefully explain where it did come from and
> why I now removed it.
> 
> 
> So the spin_is_locked() correctness issue comes from something like:
> 
> CPU0					CPU1
> 
> global_lock();				local_lock(i)
>   spin_lock(&G)				  spin_lock(&L[i])
>   for (i)				  if (!spin_is_locked(&G)) {
>     spin_unlock_wait(&L[i]);		    smp_acquire__after_ctrl_dep();
> 					    return;
> 					  }
> 					  /* deal with fail */
> 
> Where it is important CPU1 sees G locked or CPU0 sees L[i] locked such
> that there is exclusion between the two critical sections.

Yes, and there's also a version of this where CPU0 is using spin_is_locked
(see 51d7d5205d338 "powerpc: Add smp_mb() to arch_spin_is_locked()").
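
(For reference, that powerpc change is the same medicine; from memory it
boils down to roughly the below, so treat the exact code as approximate:

/* Roughly what 51d7d5205d338 does, paraphrased rather than verbatim: */
static inline int arch_spin_is_locked(arch_spinlock_t *lock)
{
        smp_mb();       /* order the prior lock acquisition against this load */
        return !arch_spin_value_unlocked(*lock);
}

)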

> The load from spin_is_locked(&G) is constrained by the ACQUIRE from
> spin_lock(&L[i]), and similarly the load(s) from spin_unlock_wait(&L[i])
> are constrained by the ACQUIRE from spin_lock(&G).
> 
> Similarly, later stuff is constrained by the ACQUIRE from CTRL+RMB.
> 
> Given a simple (SC) test-and-set spinlock the above is fairly straight
> forward and 'correct', right?

Well, the same issue that you want to shelve can manifest here with
test-and-set locks too, allowing the spin_is_locked(&G) to be speculated
before the spin_lock(&L[i]) has finished taking the lock on CPU1.

Even ignoring that, I'm not convinced this would work for test-and-set
locks without barriers in unlock_wait and is_locked. For example, a
cut-down version of your test looks like:

CPU0:		CPU1:
LOCK x		LOCK y
Read y		Read x

and you don't want the reads to both return "unlocked".

Even on x86, I think you need a fence here:

X86 lock
{
}
 P0                | P1                ;
 MOV EAX,$1        | MOV EAX,$1        ;
 LOCK XCHG [x],EAX | LOCK XCHG [y],EAX ;
 MOV EBX,[y]       | MOV EBX,[x]       ;
exists
(0:EAX=0 /\ 0:EBX=0 /\ 1:EAX=0 /\ 1:EBX=0)

is permitted by herd.
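
For poking at this on real hardware, a userspace rendering of the same
shape (a sketch; on x86 the seq_cst exchange compiles to LOCK XCHG and
the relaxed load to a plain MOV, so whether the "exists" line ever fires
is exactly the question):

#include <stdatomic.h>
#include <stdio.h>
#include <pthread.h>

static atomic_int x, y;
static int r0, r1;

static void *p0(void *arg)
{
        (void)arg;
        atomic_exchange(&x, 1);                                 /* LOCK XCHG [x],EAX */
        r0 = atomic_load_explicit(&y, memory_order_relaxed);    /* MOV EBX,[y]       */
        return NULL;
}

static void *p1(void *arg)
{
        (void)arg;
        atomic_exchange(&y, 1);                                 /* LOCK XCHG [y],EAX */
        r1 = atomic_load_explicit(&x, memory_order_relaxed);    /* MOV EBX,[x]       */
        return NULL;
}

int main(void)
{
        for (int i = 0; i < 1000000; i++) {
                pthread_t a, b;

                atomic_store(&x, 0);
                atomic_store(&y, 0);
                pthread_create(&a, NULL, p0, NULL);
                pthread_create(&b, NULL, p1, NULL);
                pthread_join(a, NULL);
                pthread_join(b, NULL);
                if (!r0 && !r1)
                        printf("exists clause hit at iteration %d\n", i);
        }
        return 0;
}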

> Now, the 'problem' with qspinlock is that one possible acquire path goes
> like (there are more, but this is the easiest):
> 
> 	smp_cond_acquire(!(atomic_read(&lock->val) & _Q_LOCKED_MASK));
> 	clear_pending_set_locked(lock);
> 
> And one possible implementation of clear_pending_set_locked() is:
> 
> 	WRITE_ONCE(l->locked_pending, _Q_LOCKED_VAL);
> 
> IOW, we load-acquire the locked byte until its cleared, at which point
> we know the pending byte to be 1. Then we consider the lock owned by us
> and issue a regular unordered store to flip the pending and locked
> bytes.
> 
> Normal mutual exclusion is fine with this, no new pending can happen
> until this store becomes visible at which time the locked byte is
> visibly taken.
> 
> This unordered store however, can be delayed (store buffer) such that
> the loads from spin_unlock_wait/spin_is_locked can pass up before it
> (even on TSO arches).

Right, and this is surprisingly similar to the LL/SC problem imo.

> _IF_ spin_unlock_wait/spin_is_locked only look at the locked byte, this
> is a problem because at that point the crossed store-load pattern
> becomes uncrossed and we loose our guarantee. That is, what used to be:
> 
> 	[S] G.locked = 1	[S] L[i].locked = 1
> 	[MB]			[MB]
> 	[L] L[i].locked		[L] G.locked
> 
> becomes:
> 
> 	[S] G.locked = 1	[S] L[i].locked = 1
> 	[L] L[i].locked		[L] G.locked
> 
> Which we can reorder at will and bad things follow.
> 
> The previous fix for this was to include an smp_mb() in both
> spin_is_locked() and spin_unlock_wait() to restore that ordering.
> 
> 
> So at this point spin_is_locked() looks like:
> 
> 	smp_mb();
> 	while (atomic_read(&lock->val) & _Q_LOCKED_MASK)
> 		cpu_relax();

I have something similar queued for arm64's ticket locks.

> But for spin_unlock_wait() there is a second correctness issue, namely:
> 
> CPU0				CPU1
> 
> flag = set;
> smp_mb();			spin_lock(&l)
> spin_unlock_wait(&l);		if (flag)
> 				  /* bail */
> 
> 				/* add to lockless list */
> 				spin_unlock(&l);
> 
> /* iterate lockless list */
> 
> 
> Which ensures that CPU1 will stop adding bits to the list and CPU0 will
> observe the last entry on the list (if spin_unlock_wait() had ACQUIRE
> semantics etc..)
> 
> This however, is still broken.. nothing ensures CPU0 sees l.locked
> before CPU1 tests flag.

Yup.

> So while we fixed the first correctness case (global / local locking as
> employed by ipc/sem and nf_conntrack) this is still very much broken.
> 
> My patch today rewrites spin_unlock_wait() and spin_is_locked() to rely
> on more information to (hopefully -- I really need sleep) fix both.
> 
> The primary observation is that even though the l.locked store is
> delayed, there has been a prior atomic operation on the lock word to
> register the contending lock (in the above scenario, set the pending
> byte, in the other paths, queue onto the tail word).
> 
> This means that any load passing the .locked byte store, must at least
> observe that state.

That's what I'm not sure about. Just because there was an atomic operation
writing that state, I don't think it means that it's visible to a normal
load.

Will
Peter Zijlstra June 3, 2016, 1:42 p.m. UTC | #4
On Fri, Jun 03, 2016 at 01:47:34PM +0100, Will Deacon wrote:
> Even on x86, I think you need a fence here:
> 
> X86 lock
> {
> }
>  P0                | P1                ;
>  MOV EAX,$1        | MOV EAX,$1        ;
>  LOCK XCHG [x],EAX | LOCK XCHG [y],EAX ;
>  MOV EBX,[y]       | MOV EBX,[x]       ;
> exists
> (0:EAX=0 /\ 0:EBX=0 /\ 1:EAX=0 /\ 1:EBX=0)
> 
> is permitted by herd.

I am puzzled.. this should not be. You say adding MFENCE after LOCK XCHG
makes it 'work', but we assume LOCK <op> is a full fence all over the
place.

I'm thinking herd is busted.

Anybody? hpa, Linus?
Peter Zijlstra June 3, 2016, 1:48 p.m. UTC | #5
On Fri, Jun 03, 2016 at 01:47:34PM +0100, Will Deacon wrote:
> > Now, the normal atomic_foo_acquire() stuff uses smp_mb() as per
> > smp_mb__after_atomic(), its just ARM64 and PPC that go all 'funny' and
> > need this extra barrier. Blergh. So lets shelf this issue for a bit.
> 
> Hmm... I certainly plan to get qspinlock up and running for arm64 in the
> near future, so I'm not keen on shelving it for very long.

Sure; so short term we could always have arm64/ppc specific versions of
these functions where the difference matters.

Alternatively we need to introduce yet another barrier like:

	smp_mb__after_acquire()

Or something like that, which is a no-op by default except for arm64 and
ppc.
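
Something like the (entirely hypothetical) sketch below; in practice it
would live in each architecture's asm/barrier.h rather than behind a
central #if:

/*
 * HYPOTHETICAL, illustration only -- this primitive does not exist.
 * It would upgrade a preceding ACQUIRE to a full barrier on the
 * architectures whose native acquire is weaker than smp_mb().
 */
#if defined(CONFIG_ARM64) || defined(CONFIG_PPC)
#define smp_mb__after_acquire()         smp_mb()
#else
#define smp_mb__after_acquire()         barrier()
#endif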

But I'm thinking nobody really wants more barrier primitives :/ (says he
who just added one).

> > This unordered store however, can be delayed (store buffer) such that
> > the loads from spin_unlock_wait/spin_is_locked can pass up before it
> > (even on TSO arches).
> 
> Right, and this is surprisingly similar to the LL/SC problem imo.

Yes and no. Yes, because it's an unordered store; no, because a
competing LL/SC cannot make it fail the store and retry, as is done in
your and Boqun's fancy solution.


Will Deacon June 3, 2016, 5:35 p.m. UTC | #6
On Fri, Jun 03, 2016 at 03:42:49PM +0200, Peter Zijlstra wrote:
> On Fri, Jun 03, 2016 at 01:47:34PM +0100, Will Deacon wrote:
> > Even on x86, I think you need a fence here:
> > 
> > X86 lock
> > {
> > }
> >  P0                | P1                ;
> >  MOV EAX,$1        | MOV EAX,$1        ;
> >  LOCK XCHG [x],EAX | LOCK XCHG [y],EAX ;
> >  MOV EBX,[y]       | MOV EBX,[x]       ;
> > exists
> > (0:EAX=0 /\ 0:EBX=0 /\ 1:EAX=0 /\ 1:EBX=0)
> > 
> > is permitted by herd.
> 
> I am puzzled.. this should not be. You say adding MFENCE after LOCK XCHG
> makes it 'work', but we assume LOCK <op> is a full fence all over the
> place.
> 
> I'm thinking herd is busted.

FWIW -- I spoke to the Herd authors and they confirmed that this is a
regression in herd (fixed in the bleeding edge version).

Will
Peter Zijlstra June 3, 2016, 7:13 p.m. UTC | #7
On Fri, Jun 03, 2016 at 06:35:37PM +0100, Will Deacon wrote:
> On Fri, Jun 03, 2016 at 03:42:49PM +0200, Peter Zijlstra wrote:
> > On Fri, Jun 03, 2016 at 01:47:34PM +0100, Will Deacon wrote:
> > > Even on x86, I think you need a fence here:
> > > 
> > > X86 lock
> > > {
> > > }
> > >  P0                | P1                ;
> > >  MOV EAX,$1        | MOV EAX,$1        ;
> > >  LOCK XCHG [x],EAX | LOCK XCHG [y],EAX ;
> > >  MOV EBX,[y]       | MOV EBX,[x]       ;
> > > exists
> > > (0:EAX=0 /\ 0:EBX=0 /\ 1:EAX=0 /\ 1:EBX=0)
> > > 
> > > is permitted by herd.
> > 
> > I am puzzled.. this should not be. You say adding MFENCE after LOCK XCHG
> > makes it 'work', but we assume LOCK <op> is a full fence all over the
> > place.
> > 
> > I'm thinking herd is busted.
> 
> FWIW -- I spoke to the Herd authors and they confirmed that this is a
> regression in herd (fixed in the bleeding edge version).

Good, means I'm not slowly going crazeh -- or at least, this isn't one
of the signs :-)

Patch

diff --git a/include/asm-generic/qspinlock.h b/include/asm-generic/qspinlock.h
index 6bd05700d8c9..9e3dff16d5dc 100644
--- a/include/asm-generic/qspinlock.h
+++ b/include/asm-generic/qspinlock.h
@@ -28,30 +28,13 @@ 
  */
 static __always_inline int queued_spin_is_locked(struct qspinlock *lock)
 {
-	/*
-	 * queued_spin_lock_slowpath() can ACQUIRE the lock before
-	 * issuing the unordered store that sets _Q_LOCKED_VAL.
-	 *
-	 * See both smp_cond_acquire() sites for more detail.
-	 *
-	 * This however means that in code like:
-	 *
-	 *   spin_lock(A)		spin_lock(B)
-	 *   spin_unlock_wait(B)	spin_is_locked(A)
-	 *   do_something()		do_something()
+	/* 
+	 * See queued_spin_unlock_wait().
 	 *
-	 * Both CPUs can end up running do_something() because the store
-	 * setting _Q_LOCKED_VAL will pass through the loads in
-	 * spin_unlock_wait() and/or spin_is_locked().
-	 *
-	 * Avoid this by issuing a full memory barrier between the spin_lock()
-	 * and the loads in spin_unlock_wait() and spin_is_locked().
-	 *
-	 * Note that regular mutual exclusion doesn't care about this
-	 * delayed store.
+	 * Any !0 state indicates it is locked, even if _Q_LOCKED_VAL
+	 * isn't immediately observable.
 	 */
-	smp_mb();
-	return atomic_read(&lock->val) & _Q_LOCKED_MASK;
+	return !!atomic_read(&lock->val);
 }
 
 /**
@@ -123,19 +106,13 @@  static __always_inline void queued_spin_unlock(struct qspinlock *lock)
 #endif
 
 /**
- * queued_spin_unlock_wait - wait until current lock holder releases the lock
+ * queued_spin_unlock_wait - wait until the _current_ lock holder releases the lock
  * @lock : Pointer to queued spinlock structure
  *
  * There is a very slight possibility of live-lock if the lockers keep coming
  * and the waiter is just unfortunate enough to not see any unlock state.
  */
-static inline void queued_spin_unlock_wait(struct qspinlock *lock)
-{
-	/* See queued_spin_is_locked() */
-	smp_mb();
-	while (atomic_read(&lock->val) & _Q_LOCKED_MASK)
-		cpu_relax();
-}
+void queued_spin_unlock_wait(struct qspinlock *lock);
 
 #ifndef virt_spin_lock
 static __always_inline bool virt_spin_lock(struct qspinlock *lock)
diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c
index ce2f75e32ae1..3a4e4b34584e 100644
--- a/kernel/locking/qspinlock.c
+++ b/kernel/locking/qspinlock.c
@@ -496,6 +496,49 @@  void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 EXPORT_SYMBOL(queued_spin_lock_slowpath);
 
 /*
+ * queued_spin_lock_slowpath() can ACQUIRE the lock before
+ * issuing an _unordered_ store to set _Q_LOCKED_VAL.
+ *
+ * This means that the store can be delayed, but no later than the
+ * store-release from the unlock. This means that simply observing
+ * _Q_LOCKED_VAL is not sufficient to determine if the lock is acquired.
+ *
+ * There are two sites that can issue the unordered store:
+ *
+ *  - clear_pending_set_locked(): *,1,0 -> *,0,1
+ *  - set_locked(): t,0,0 -> t,0,1 ; t != 0
+ *
+ * In both cases we have other !0 state that is observable before the
+ * lock store comes through. This means we can use that to wait for
+ * the lock store, and then wait for an unlock.
+ */
+void queued_spin_unlock_wait(struct qspinlock *lock)
+{
+	u32 val;
+
+	for (;;) {
+		val = atomic_read(&lock->val);
+
+		if (!val) /* not locked, we're done */
+			goto done;
+
+		if (val & _Q_LOCKED_MASK) /* locked, go wait for unlock */
+			break;
+
+		/* not locked, but pending, wait until we observe the lock */
+		cpu_relax();
+	}
+
+	/* any unlock is good */
+	while (atomic_read(&lock->val) & _Q_LOCKED_MASK)
+		cpu_relax();
+
+done:
+	smp_rmb();
+}
+EXPORT_SYMBOL(queued_spin_unlock_wait);
+
+/*
  * Generate the paravirt code for queued_spin_unlock_slowpath().
  */
 #if !defined(_GEN_PV_LOCK_SLOWPATH) && defined(CONFIG_PARAVIRT_SPINLOCKS)