[v3,06/10] powerpc/opal: Rework the opal-async interface

Message ID 20170712042304.19745-7-cyrilbur@gmail.com
State New
Headers show

Commit Message

Cyril Bur July 12, 2017, 4:23 a.m.
Future work will add an opal_async_wait_response_interruptible()
which will call wait_event_interruptible(). This work requires extra
token state to be tracked as wait_event_interruptible() can return and
the caller could release the token before OPAL responds.

Currently token state is tracked with two bitfields which are 64 bits
big but may not need to be as OPAL informs Linux how many async tokens
there are. It also uses an array indexed by token to store response
messages for each token.

The bitfields make it difficult to add more state and also provide a
hard maximum as to how many tokens there can be - it is possible that
OPAL will inform Linux that there are more than 64 tokens.

Rather than add a bitfield to track the extra state, rework the
internals slightly.

Signed-off-by: Cyril Bur <cyrilbur@gmail.com>
---
 arch/powerpc/platforms/powernv/opal-async.c | 97 ++++++++++++++++-------------
 1 file changed, 53 insertions(+), 44 deletions(-)

Comments

Balbir Singh July 17, 2017, 11:30 a.m. | #1
On Wed, 2017-07-12 at 14:23 +1000, Cyril Bur wrote:
> Future work will add an opal_async_wait_response_interruptible()
> which will call wait_event_interruptible(). This work requires extra
> token state to be tracked as wait_event_interruptible() can return and
> the caller could release the token before OPAL responds.
> 
> Currently token state is tracked with two bitfields which are 64 bits
> big but may not need to be as OPAL informs Linux how many async tokens
> there are. It also uses an array indexed by token to store response
> messages for each token.
> 
> The bitfields make it difficult to add more state and also provide a
> hard maximum as to how many tokens there can be - it is possible that
> OPAL will inform Linux that there are more than 64 tokens.
> 
> Rather than add a bitfield to track the extra state, rework the
> internals slightly.
> 
> Signed-off-by: Cyril Bur <cyrilbur@gmail.com>
> ---
>  arch/powerpc/platforms/powernv/opal-async.c | 97 ++++++++++++++++-------------
>  1 file changed, 53 insertions(+), 44 deletions(-)
> 
> diff --git a/arch/powerpc/platforms/powernv/opal-async.c b/arch/powerpc/platforms/powernv/opal-async.c
> index 1d56ac9da347..d692372a0363 100644
> --- a/arch/powerpc/platforms/powernv/opal-async.c
> +++ b/arch/powerpc/platforms/powernv/opal-async.c
> @@ -1,7 +1,7 @@
>  /*
>   * PowerNV OPAL asynchronous completion interfaces
>   *
> - * Copyright 2013 IBM Corp.
> + * Copyright 2013-2017 IBM Corp.
>   *
>   * This program is free software; you can redistribute it and/or
>   * modify it under the terms of the GNU General Public License
> @@ -23,40 +23,46 @@
>  #include <asm/machdep.h>
>  #include <asm/opal.h>
>  
> -#define N_ASYNC_COMPLETIONS	64
> +enum opal_async_token_state {
> +	ASYNC_TOKEN_FREE,
> +	ASYNC_TOKEN_ALLOCATED,
> +	ASYNC_TOKEN_COMPLETED
> +};

Are these states mutually exclusive? Does _COMPLETED imply that it is also
_ALLOCATED? ALLOCATED and FREE are confusing, I would use IN_USE and NOT_IN_USE
for tokens. If these are mutually exclusive then you can use IN_USE and !IN_USE

> +
> +struct opal_async_token {
> +	enum opal_async_token_state state;
> +	struct opal_msg response;
> +};
>  
> -static DECLARE_BITMAP(opal_async_complete_map, N_ASYNC_COMPLETIONS) = {~0UL};
> -static DECLARE_BITMAP(opal_async_token_map, N_ASYNC_COMPLETIONS);
>  static DECLARE_WAIT_QUEUE_HEAD(opal_async_wait);
>  static DEFINE_SPINLOCK(opal_async_comp_lock);
>  static struct semaphore opal_async_sem;
> -static struct opal_msg *opal_async_responses;
>  static unsigned int opal_max_async_tokens;
> +static struct opal_async_token *opal_async_tokens;
>  
>  static int __opal_async_get_token(void)
>  {
>  	unsigned long flags;
>  	int token;
>  
> -	spin_lock_irqsave(&opal_async_comp_lock, flags);
> -	token = find_first_bit(opal_async_complete_map, opal_max_async_tokens);
> -	if (token >= opal_max_async_tokens) {
> -		token = -EBUSY;
> -		goto out;
> -	}
> -
> -	if (__test_and_set_bit(token, opal_async_token_map)) {
> -		token = -EBUSY;
> -		goto out;
> +	for (token = 0; token < opal_max_async_tokens; token++) {
> +		spin_lock_irqsave(&opal_async_comp_lock, flags);

Why is the spin lock inside the for loop? If the last token is free, the
number of times we'll take and release a lock is extensive, why are we
doing it this way?

> +		if (opal_async_tokens[token].state == ASYNC_TOKEN_FREE) {
> +			opal_async_tokens[token].state = ASYNC_TOKEN_ALLOCATED;
> +			spin_unlock_irqrestore(&opal_async_comp_lock, flags);
> +			return token;
> +		}
> +		spin_unlock_irqrestore(&opal_async_comp_lock, flags);
>  	}
>  
> -	__clear_bit(token, opal_async_complete_map);
> -
> -out:
> -	spin_unlock_irqrestore(&opal_async_comp_lock, flags);
> -	return token;
> +	return -EBUSY;
>  }
>  
> +/*
> + * Note: If the returned token is used in an opal call and opal returns
> + * OPAL_ASYNC_COMPLETION you MUST opal_async_wait_response() before
> + * calling another other opal_async_* function
> + */
>  int opal_async_get_token_interruptible(void)
>  {
>  	int token;
> @@ -76,6 +82,7 @@ EXPORT_SYMBOL_GPL(opal_async_get_token_interruptible);
>  static int __opal_async_release_token(int token)
>  {
>  	unsigned long flags;
> +	int rc;
>  
>  	if (token < 0 || token >= opal_max_async_tokens) {
>  		pr_err("%s: Passed token is out of range, token %d\n",
> @@ -84,11 +91,18 @@ static int __opal_async_release_token(int token)
>  	}
>  
>  	spin_lock_irqsave(&opal_async_comp_lock, flags);
> -	__set_bit(token, opal_async_complete_map);
> -	__clear_bit(token, opal_async_token_map);
> +	switch (opal_async_tokens[token].state) {
> +	case ASYNC_TOKEN_COMPLETED:
> +	case ASYNC_TOKEN_ALLOCATED:
> +		opal_async_tokens[token].state = ASYNC_TOKEN_FREE;

So we can go from

_COMPLETED | _ALLOCATED to _FREE on release_token, why would be release
an _ALLOCATED token, in the case the callback is not really called?

> +		rc = 0;
> +		break;
> +	default:
> +		rc = 1;
> +	}
>  	spin_unlock_irqrestore(&opal_async_comp_lock, flags);
>  
> -	return 0;
> +	return rc;
>  }
>  
>  int opal_async_release_token(int token)
> @@ -96,12 +110,10 @@ int opal_async_release_token(int token)
>  	int ret;
>  
>  	ret = __opal_async_release_token(token);
> -	if (ret)
> -		return ret;
> -
> -	up(&opal_async_sem);
> +	if (!ret)
> +		up(&opal_async_sem);

So we up the semaphore only if we made a transition and freed the token, right?
What happens otherwise?

>  
> -	return 0;
> +	return ret;
>  }
>  EXPORT_SYMBOL_GPL(opal_async_release_token);
>  
> @@ -122,13 +134,15 @@ int opal_async_wait_response(uint64_t token, struct opal_msg *msg)
>  	 * functional.
>  	 */
>  	opal_wake_poller();
> -	wait_event(opal_async_wait, test_bit(token, opal_async_complete_map));
> -	memcpy(msg, &opal_async_responses[token], sizeof(*msg));
> +	wait_event(opal_async_wait, opal_async_tokens[token].state
> +			== ASYNC_TOKEN_COMPLETED);

Since wait_event is a macro, I'd recommend parenthesis around the second
argument. I think there is also an inbuilt assumption that the barriers
in schedule() called by wait_event() will make the write to the token
state visible.

> +	memcpy(msg, &opal_async_tokens[token].response, sizeof(*msg));
>  
>  	return 0;
>  }
>  EXPORT_SYMBOL_GPL(opal_async_wait_response);
>  
> +/* Called from interrupt context */
>  static int opal_async_comp_event(struct notifier_block *nb,
>  		unsigned long msg_type, void *msg)
>  {
> @@ -140,9 +154,9 @@ static int opal_async_comp_event(struct notifier_block *nb,
>  		return 0;
>  
>  	token = be64_to_cpu(comp_msg->params[0]);
> -	memcpy(&opal_async_responses[token], comp_msg, sizeof(*comp_msg));
> +	memcpy(&opal_async_tokens[token].response, comp_msg, sizeof(*comp_msg));
>  	spin_lock_irqsave(&opal_async_comp_lock, flags);
> -	__set_bit(token, opal_async_complete_map);
> +	opal_async_tokens[token].state = ASYNC_TOKEN_COMPLETED;
>  	spin_unlock_irqrestore(&opal_async_comp_lock, flags);
>  
>  	wake_up(&opal_async_wait);
> @@ -178,24 +192,19 @@ int __init opal_async_comp_init(void)
>  	}
>  
>  	opal_max_async_tokens = be32_to_cpup(async);
> -	if (opal_max_async_tokens > N_ASYNC_COMPLETIONS)
> -		opal_max_async_tokens = N_ASYNC_COMPLETIONS;
> +	opal_async_tokens = kcalloc(opal_max_async_tokens,
> +			sizeof(*opal_async_tokens), GFP_KERNEL);
> +	if (!opal_async_tokens) {
> +		err = -ENOMEM;
> +		goto out_opal_node;
> +	}
>  
>  	err = opal_message_notifier_register(OPAL_MSG_ASYNC_COMP,
>  			&opal_async_comp_nb);
>  	if (err) {
>  		pr_err("%s: Can't register OPAL event notifier (%d)\n",
>  				__func__, err);
> -		goto out_opal_node;
> -	}
> -
> -	opal_async_responses = kzalloc(
> -			sizeof(*opal_async_responses) * opal_max_async_tokens,
> -			GFP_KERNEL);
> -	if (!opal_async_responses) {
> -		pr_err("%s: Out of memory, failed to do asynchronous "
> -				"completion init\n", __func__);
> -		err = -ENOMEM;
> +		kfree(opal_async_tokens);
>  		goto out_opal_node;
>  	}
>  

Balbir Singh.
Cyril Bur July 18, 2017, 12:40 a.m. | #2
On Mon, 2017-07-17 at 21:30 +1000, Balbir Singh wrote:
> On Wed, 2017-07-12 at 14:23 +1000, Cyril Bur wrote:
> > Future work will add an opal_async_wait_response_interruptible()
> > which will call wait_event_interruptible(). This work requires extra
> > token state to be tracked as wait_event_interruptible() can return and
> > the caller could release the token before OPAL responds.
> > 
> > Currently token state is tracked with two bitfields which are 64 bits
> > big but may not need to be as OPAL informs Linux how many async tokens
> > there are. It also uses an array indexed by token to store response
> > messages for each token.
> > 
> > The bitfields make it difficult to add more state and also provide a
> > hard maximum as to how many tokens there can be - it is possible that
> > OPAL will inform Linux that there are more than 64 tokens.
> > 
> > Rather than add a bitfield to track the extra state, rework the
> > internals slightly.
> > 
> > Signed-off-by: Cyril Bur <cyrilbur@gmail.com>
> > ---
> >  arch/powerpc/platforms/powernv/opal-async.c | 97 ++++++++++++++++-------------
> >  1 file changed, 53 insertions(+), 44 deletions(-)
> > 
> > diff --git a/arch/powerpc/platforms/powernv/opal-async.c b/arch/powerpc/platforms/powernv/opal-async.c
> > index 1d56ac9da347..d692372a0363 100644
> > --- a/arch/powerpc/platforms/powernv/opal-async.c
> > +++ b/arch/powerpc/platforms/powernv/opal-async.c
> > @@ -1,7 +1,7 @@
> >  /*
> >   * PowerNV OPAL asynchronous completion interfaces
> >   *
> > - * Copyright 2013 IBM Corp.
> > + * Copyright 2013-2017 IBM Corp.
> >   *
> >   * This program is free software; you can redistribute it and/or
> >   * modify it under the terms of the GNU General Public License
> > @@ -23,40 +23,46 @@
> >  #include <asm/machdep.h>
> >  #include <asm/opal.h>
> >  
> > -#define N_ASYNC_COMPLETIONS	64
> > +enum opal_async_token_state {
> > +	ASYNC_TOKEN_FREE,
> > +	ASYNC_TOKEN_ALLOCATED,
> > +	ASYNC_TOKEN_COMPLETED
> > +};
> 
> Are these states mutually exclusive? Does _COMPLETED imply that it is also
> _ALLOCATED? 

Yes

> ALLOCATED and FREE are confusing, I would use IN_USE and NOT_IN_USE
> for tokens. If these are mutually exclusive then you can use IN_USE and !IN_USE
> 

Perhaps instead of _FREE it could be _UNALLOCATED ?

> > +
> > +struct opal_async_token {
> > +	enum opal_async_token_state state;
> > +	struct opal_msg response;
> > +};
> >  
> > -static DECLARE_BITMAP(opal_async_complete_map, N_ASYNC_COMPLETIONS) = {~0UL};
> > -static DECLARE_BITMAP(opal_async_token_map, N_ASYNC_COMPLETIONS);
> >  static DECLARE_WAIT_QUEUE_HEAD(opal_async_wait);
> >  static DEFINE_SPINLOCK(opal_async_comp_lock);
> >  static struct semaphore opal_async_sem;
> > -static struct opal_msg *opal_async_responses;
> >  static unsigned int opal_max_async_tokens;
> > +static struct opal_async_token *opal_async_tokens;
> >  
> >  static int __opal_async_get_token(void)
> >  {
> >  	unsigned long flags;
> >  	int token;
> >  
> > -	spin_lock_irqsave(&opal_async_comp_lock, flags);
> > -	token = find_first_bit(opal_async_complete_map, opal_max_async_tokens);
> > -	if (token >= opal_max_async_tokens) {
> > -		token = -EBUSY;
> > -		goto out;
> > -	}
> > -
> > -	if (__test_and_set_bit(token, opal_async_token_map)) {
> > -		token = -EBUSY;
> > -		goto out;
> > +	for (token = 0; token < opal_max_async_tokens; token++) {
> > +		spin_lock_irqsave(&opal_async_comp_lock, flags);
> 
> Why is the spin lock inside the for loop? If the last token is free, the
> number of times we'll take and release a lock is extensive, why are we
> doing it this way?
> 

Otherwise we might hold the lock for quite some time. At the moment I
think it isn't a bit deal since OPAL gives 8 but there is current work
to increase that number and while it seems the number might only grow
to 16, for a while it was looking like it might grow more.

In a previous iteration I had a check inside the loop but outside the
lock for if (token == ASYNC_TOKEN_FREE) which would then proceed to
take the lock, check again and mark it allocated...

Or I could put the lock around the loop, I'm not attached to any
particular approach.

> > +		if (opal_async_tokens[token].state == ASYNC_TOKEN_FREE) {
> > +			opal_async_tokens[token].state = ASYNC_TOKEN_ALLOCATED;
> > +			spin_unlock_irqrestore(&opal_async_comp_lock, flags);
> > +			return token;
> > +		}
> > +		spin_unlock_irqrestore(&opal_async_comp_lock, flags);
> >  	}
> >  
> > -	__clear_bit(token, opal_async_complete_map);
> > -
> > -out:
> > -	spin_unlock_irqrestore(&opal_async_comp_lock, flags);
> > -	return token;
> > +	return -EBUSY;
> >  }
> >  
> > +/*
> > + * Note: If the returned token is used in an opal call and opal returns
> > + * OPAL_ASYNC_COMPLETION you MUST opal_async_wait_response() before
> > + * calling another other opal_async_* function
> > + */
> >  int opal_async_get_token_interruptible(void)
> >  {
> >  	int token;
> > @@ -76,6 +82,7 @@ EXPORT_SYMBOL_GPL(opal_async_get_token_interruptible);
> >  static int __opal_async_release_token(int token)
> >  {
> >  	unsigned long flags;
> > +	int rc;
> >  
> >  	if (token < 0 || token >= opal_max_async_tokens) {
> >  		pr_err("%s: Passed token is out of range, token %d\n",
> > @@ -84,11 +91,18 @@ static int __opal_async_release_token(int token)
> >  	}
> >  
> >  	spin_lock_irqsave(&opal_async_comp_lock, flags);
> > -	__set_bit(token, opal_async_complete_map);
> > -	__clear_bit(token, opal_async_token_map);
> > +	switch (opal_async_tokens[token].state) {
> > +	case ASYNC_TOKEN_COMPLETED:
> > +	case ASYNC_TOKEN_ALLOCATED:
> > +		opal_async_tokens[token].state = ASYNC_TOKEN_FREE;
> 
> So we can go from
> 
> _COMPLETED | _ALLOCATED to _FREE on release_token, why would be release
> an _ALLOCATED token, in the case the callback is not really called?
> 

If the OPAL call fails, the caller can either choose to retry the OPAL
call or just abandon, if it abandons then it must free the token (which
will never complete since the OPAL call failed).

> > +		rc = 0;
> > +		break;
> > +	default:
> > +		rc = 1;
> > +	}
> >  	spin_unlock_irqrestore(&opal_async_comp_lock, flags);
> >  
> > -	return 0;
> > +	return rc;
> >  }
> >  
> >  int opal_async_release_token(int token)
> > @@ -96,12 +110,10 @@ int opal_async_release_token(int token)
> >  	int ret;
> >  
> >  	ret = __opal_async_release_token(token);
> > -	if (ret)
> > -		return ret;
> > -
> > -	up(&opal_async_sem);
> > +	if (!ret)
> > +		up(&opal_async_sem);
> 
> So we up the semaphore only if we made a transition and freed the token, right?
> What happens otherwise?
> 
> >  
> > -	return 0;
> > +	return ret;
> >  }
> >  EXPORT_SYMBOL_GPL(opal_async_release_token);
> >  
> > @@ -122,13 +134,15 @@ int opal_async_wait_response(uint64_t token, struct opal_msg *msg)
> >  	 * functional.
> >  	 */
> >  	opal_wake_poller();
> > -	wait_event(opal_async_wait, test_bit(token, opal_async_complete_map));
> > -	memcpy(msg, &opal_async_responses[token], sizeof(*msg));
> > +	wait_event(opal_async_wait, opal_async_tokens[token].state
> > +			== ASYNC_TOKEN_COMPLETED);
> 
> Since wait_event is a macro, I'd recommend parenthesis around the second
> argument. I think there is also an inbuilt assumption that the barriers
> in schedule() called by wait_event() will make the write to the token
> state visible.

Yes good point.

> 
> > +	memcpy(msg, &opal_async_tokens[token].response, sizeof(*msg));
> >  
> >  	return 0;
> >  }
> >  EXPORT_SYMBOL_GPL(opal_async_wait_response);
> >  
> > +/* Called from interrupt context */
> >  static int opal_async_comp_event(struct notifier_block *nb,
> >  		unsigned long msg_type, void *msg)
> >  {
> > @@ -140,9 +154,9 @@ static int opal_async_comp_event(struct notifier_block *nb,
> >  		return 0;
> >  
> >  	token = be64_to_cpu(comp_msg->params[0]);
> > -	memcpy(&opal_async_responses[token], comp_msg, sizeof(*comp_msg));
> > +	memcpy(&opal_async_tokens[token].response, comp_msg, sizeof(*comp_msg));
> >  	spin_lock_irqsave(&opal_async_comp_lock, flags);
> > -	__set_bit(token, opal_async_complete_map);
> > +	opal_async_tokens[token].state = ASYNC_TOKEN_COMPLETED;
> >  	spin_unlock_irqrestore(&opal_async_comp_lock, flags);
> >  
> >  	wake_up(&opal_async_wait);
> > @@ -178,24 +192,19 @@ int __init opal_async_comp_init(void)
> >  	}
> >  
> >  	opal_max_async_tokens = be32_to_cpup(async);
> > -	if (opal_max_async_tokens > N_ASYNC_COMPLETIONS)
> > -		opal_max_async_tokens = N_ASYNC_COMPLETIONS;
> > +	opal_async_tokens = kcalloc(opal_max_async_tokens,
> > +			sizeof(*opal_async_tokens), GFP_KERNEL);
> > +	if (!opal_async_tokens) {
> > +		err = -ENOMEM;
> > +		goto out_opal_node;
> > +	}
> >  
> >  	err = opal_message_notifier_register(OPAL_MSG_ASYNC_COMP,
> >  			&opal_async_comp_nb);
> >  	if (err) {
> >  		pr_err("%s: Can't register OPAL event notifier (%d)\n",
> >  				__func__, err);
> > -		goto out_opal_node;
> > -	}
> > -
> > -	opal_async_responses = kzalloc(
> > -			sizeof(*opal_async_responses) * opal_max_async_tokens,
> > -			GFP_KERNEL);
> > -	if (!opal_async_responses) {
> > -		pr_err("%s: Out of memory, failed to do asynchronous "
> > -				"completion init\n", __func__);
> > -		err = -ENOMEM;
> > +		kfree(opal_async_tokens);
> >  		goto out_opal_node;
> >  	}
> >  
> 
> Balbir Singh.
>
Michael Ellerman July 18, 2017, 3:20 a.m. | #3
Cyril Bur <cyrilbur@gmail.com> writes:
> On Mon, 2017-07-17 at 21:30 +1000, Balbir Singh wrote:
>> On Wed, 2017-07-12 at 14:23 +1000, Cyril Bur wrote:
>> >  static int __opal_async_get_token(void)
>> >  {
>> >  	unsigned long flags;
>> >  	int token;
>> >  
>> > -	spin_lock_irqsave(&opal_async_comp_lock, flags);
>> > -	token = find_first_bit(opal_async_complete_map, opal_max_async_tokens);
>> > -	if (token >= opal_max_async_tokens) {
>> > -		token = -EBUSY;
>> > -		goto out;
>> > -	}
>> > -
>> > -	if (__test_and_set_bit(token, opal_async_token_map)) {
>> > -		token = -EBUSY;
>> > -		goto out;
>> > +	for (token = 0; token < opal_max_async_tokens; token++) {
>> > +		spin_lock_irqsave(&opal_async_comp_lock, flags);
>> 
>> Why is the spin lock inside the for loop? If the last token is free, the
>> number of times we'll take and release a lock is extensive, why are we
>> doing it this way?
>
> Otherwise we might hold the lock for quite some time.

No we won't. A loop over a few 10s or 100s of tokens is not going to
take long, compared to the overhead of taking the lock every iteration
through the loop.

If the linear search with the lock held is a bottle neck, then you can
keep a hint variable which tracks the last freed token and start
searching from there.

cheers

Patch

diff --git a/arch/powerpc/platforms/powernv/opal-async.c b/arch/powerpc/platforms/powernv/opal-async.c
index 1d56ac9da347..d692372a0363 100644
--- a/arch/powerpc/platforms/powernv/opal-async.c
+++ b/arch/powerpc/platforms/powernv/opal-async.c
@@ -1,7 +1,7 @@ 
 /*
  * PowerNV OPAL asynchronous completion interfaces
  *
- * Copyright 2013 IBM Corp.
+ * Copyright 2013-2017 IBM Corp.
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
@@ -23,40 +23,46 @@ 
 #include <asm/machdep.h>
 #include <asm/opal.h>
 
-#define N_ASYNC_COMPLETIONS	64
+enum opal_async_token_state {
+	ASYNC_TOKEN_FREE,
+	ASYNC_TOKEN_ALLOCATED,
+	ASYNC_TOKEN_COMPLETED
+};
+
+struct opal_async_token {
+	enum opal_async_token_state state;
+	struct opal_msg response;
+};
 
-static DECLARE_BITMAP(opal_async_complete_map, N_ASYNC_COMPLETIONS) = {~0UL};
-static DECLARE_BITMAP(opal_async_token_map, N_ASYNC_COMPLETIONS);
 static DECLARE_WAIT_QUEUE_HEAD(opal_async_wait);
 static DEFINE_SPINLOCK(opal_async_comp_lock);
 static struct semaphore opal_async_sem;
-static struct opal_msg *opal_async_responses;
 static unsigned int opal_max_async_tokens;
+static struct opal_async_token *opal_async_tokens;
 
 static int __opal_async_get_token(void)
 {
 	unsigned long flags;
 	int token;
 
-	spin_lock_irqsave(&opal_async_comp_lock, flags);
-	token = find_first_bit(opal_async_complete_map, opal_max_async_tokens);
-	if (token >= opal_max_async_tokens) {
-		token = -EBUSY;
-		goto out;
-	}
-
-	if (__test_and_set_bit(token, opal_async_token_map)) {
-		token = -EBUSY;
-		goto out;
+	for (token = 0; token < opal_max_async_tokens; token++) {
+		spin_lock_irqsave(&opal_async_comp_lock, flags);
+		if (opal_async_tokens[token].state == ASYNC_TOKEN_FREE) {
+			opal_async_tokens[token].state = ASYNC_TOKEN_ALLOCATED;
+			spin_unlock_irqrestore(&opal_async_comp_lock, flags);
+			return token;
+		}
+		spin_unlock_irqrestore(&opal_async_comp_lock, flags);
 	}
 
-	__clear_bit(token, opal_async_complete_map);
-
-out:
-	spin_unlock_irqrestore(&opal_async_comp_lock, flags);
-	return token;
+	return -EBUSY;
 }
 
+/*
+ * Note: If the returned token is used in an opal call and opal returns
+ * OPAL_ASYNC_COMPLETION you MUST opal_async_wait_response() before
+ * calling another other opal_async_* function
+ */
 int opal_async_get_token_interruptible(void)
 {
 	int token;
@@ -76,6 +82,7 @@  EXPORT_SYMBOL_GPL(opal_async_get_token_interruptible);
 static int __opal_async_release_token(int token)
 {
 	unsigned long flags;
+	int rc;
 
 	if (token < 0 || token >= opal_max_async_tokens) {
 		pr_err("%s: Passed token is out of range, token %d\n",
@@ -84,11 +91,18 @@  static int __opal_async_release_token(int token)
 	}
 
 	spin_lock_irqsave(&opal_async_comp_lock, flags);
-	__set_bit(token, opal_async_complete_map);
-	__clear_bit(token, opal_async_token_map);
+	switch (opal_async_tokens[token].state) {
+	case ASYNC_TOKEN_COMPLETED:
+	case ASYNC_TOKEN_ALLOCATED:
+		opal_async_tokens[token].state = ASYNC_TOKEN_FREE;
+		rc = 0;
+		break;
+	default:
+		rc = 1;
+	}
 	spin_unlock_irqrestore(&opal_async_comp_lock, flags);
 
-	return 0;
+	return rc;
 }
 
 int opal_async_release_token(int token)
@@ -96,12 +110,10 @@  int opal_async_release_token(int token)
 	int ret;
 
 	ret = __opal_async_release_token(token);
-	if (ret)
-		return ret;
-
-	up(&opal_async_sem);
+	if (!ret)
+		up(&opal_async_sem);
 
-	return 0;
+	return ret;
 }
 EXPORT_SYMBOL_GPL(opal_async_release_token);
 
@@ -122,13 +134,15 @@  int opal_async_wait_response(uint64_t token, struct opal_msg *msg)
 	 * functional.
 	 */
 	opal_wake_poller();
-	wait_event(opal_async_wait, test_bit(token, opal_async_complete_map));
-	memcpy(msg, &opal_async_responses[token], sizeof(*msg));
+	wait_event(opal_async_wait, opal_async_tokens[token].state
+			== ASYNC_TOKEN_COMPLETED);
+	memcpy(msg, &opal_async_tokens[token].response, sizeof(*msg));
 
 	return 0;
 }
 EXPORT_SYMBOL_GPL(opal_async_wait_response);
 
+/* Called from interrupt context */
 static int opal_async_comp_event(struct notifier_block *nb,
 		unsigned long msg_type, void *msg)
 {
@@ -140,9 +154,9 @@  static int opal_async_comp_event(struct notifier_block *nb,
 		return 0;
 
 	token = be64_to_cpu(comp_msg->params[0]);
-	memcpy(&opal_async_responses[token], comp_msg, sizeof(*comp_msg));
+	memcpy(&opal_async_tokens[token].response, comp_msg, sizeof(*comp_msg));
 	spin_lock_irqsave(&opal_async_comp_lock, flags);
-	__set_bit(token, opal_async_complete_map);
+	opal_async_tokens[token].state = ASYNC_TOKEN_COMPLETED;
 	spin_unlock_irqrestore(&opal_async_comp_lock, flags);
 
 	wake_up(&opal_async_wait);
@@ -178,24 +192,19 @@  int __init opal_async_comp_init(void)
 	}
 
 	opal_max_async_tokens = be32_to_cpup(async);
-	if (opal_max_async_tokens > N_ASYNC_COMPLETIONS)
-		opal_max_async_tokens = N_ASYNC_COMPLETIONS;
+	opal_async_tokens = kcalloc(opal_max_async_tokens,
+			sizeof(*opal_async_tokens), GFP_KERNEL);
+	if (!opal_async_tokens) {
+		err = -ENOMEM;
+		goto out_opal_node;
+	}
 
 	err = opal_message_notifier_register(OPAL_MSG_ASYNC_COMP,
 			&opal_async_comp_nb);
 	if (err) {
 		pr_err("%s: Can't register OPAL event notifier (%d)\n",
 				__func__, err);
-		goto out_opal_node;
-	}
-
-	opal_async_responses = kzalloc(
-			sizeof(*opal_async_responses) * opal_max_async_tokens,
-			GFP_KERNEL);
-	if (!opal_async_responses) {
-		pr_err("%s: Out of memory, failed to do asynchronous "
-				"completion init\n", __func__);
-		err = -ENOMEM;
+		kfree(opal_async_tokens);
 		goto out_opal_node;
 	}