[1/1] io-wq: split bounded and unbounded work into separate lists

Message ID 20220406161954.832191-2-tjaalton@ubuntu.com
State New
Series io_uring regression - lost write request

Commit Message

Timo Aaltonen April 6, 2022, 4:19 p.m. UTC
From: Jens Axboe <axboe@kernel.dk>

BugLink: https://bugs.launchpad.net/bugs/1952222

We've got a few issues that all boil down to the fact that we have one
list of pending work items, yet two different types of workers to
serve them. This causes some oddities around workers switching type and
even hashed work vs regular work on the same bounded list.

Just separate them out cleanly, similarly to how we already do
accounting of what is running. That provides a clean separation and
removes some corner cases that can cause stalls when handling IO
that is punted to io-wq.

Fixes: ecc53c48c13d ("io-wq: check max_worker limits if a worker transitions bound state")
Signed-off-by: Jens Axboe <axboe@kernel.dk>
(backported from commit f95dc207b93da9c88ddbb7741ec3730c6657b88e; minor adjustments)
Signed-off-by: Timo Aaltonen <timo.aaltonen@canonical.com>
---
 fs/io-wq.c | 158 +++++++++++++++++++++++------------------------------
 1 file changed, 69 insertions(+), 89 deletions(-)
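
For readers not steeped in io-wq internals, the sketch below is an illustrative,
userspace-only model of the core idea of this patch: each account
(bounded/unbounded) gets its own pending list and its own stalled bit, so a
stall in one class of work can no longer hide runnable work of the other class.
The names (acct, ACCT_BOUND, stalled) loosely mirror the patch; none of this is
the kernel code itself.

/*
 * Illustrative userspace sketch (not kernel code): one pending list and
 * one stalled flag per account, instead of a single shared list/flag.
 */
#include <stdbool.h>
#include <stdio.h>

enum { ACCT_BOUND, ACCT_UNBOUND, ACCT_NR };

struct work {
        int id;
        struct work *next;
};

struct acct {
        struct work *head, *tail;       /* per-account pending list */
        bool stalled;                   /* per-account "stalled on hash" bit */
};

/* Queue work on its own account's list; new work clears that account's stall. */
static void enqueue(struct acct *a, struct work *w)
{
        w->next = NULL;
        if (a->tail)
                a->tail->next = w;
        else
                a->head = w;
        a->tail = w;
        a->stalled = false;
}

/* Pop the next runnable work item, or NULL if this account is empty or stalled. */
static struct work *next_work(struct acct *a)
{
        struct work *w = a->head;

        if (a->stalled || !w)
                return NULL;
        a->head = w->next;
        if (!a->head)
                a->tail = NULL;
        return w;
}

int main(void)
{
        struct acct acct[ACCT_NR] = { 0 };
        struct work w1 = { .id = 1 }, w2 = { .id = 2 };

        enqueue(&acct[ACCT_BOUND], &w1);
        enqueue(&acct[ACCT_UNBOUND], &w2);

        /* Stalling bounded work no longer hides unbounded work. */
        acct[ACCT_BOUND].stalled = true;
        printf("bounded stalled, got %s\n",
               next_work(&acct[ACCT_BOUND]) ? "work" : "nothing");
        printf("unbounded still runs, got work %d\n",
               next_work(&acct[ACCT_UNBOUND])->id);
        return 0;
}

In the patch itself the same split lives in struct io_wqe_acct (a per-account
work_list plus IO_ACCT_STALLED_BIT in its flags), replacing the single
wqe->work_list and the shared IO_WQE_FLAG_STALLED.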

Comments

Stefan Bader April 13, 2022, 7:48 a.m. UTC | #1
On 06.04.22 18:19, Timo Aaltonen wrote:
> From: Jens Axboe <axboe@kernel.dk>
> 
> BugLink: https://bugs.launchpad.net/bugs/1952222
> 
> We've got a few issues that all boil down to the fact that we have one
> list of pending work items, yet two different types of workers to
> serve them. This causes some oddities around workers switching type and
> even hashed work vs regular work on the same bounded list.
> 
> Just separate them out cleanly, similarly to how we already do
> accounting of what is running. That provides a clean separation and
> removes some corner cases that can cause stalls when handling IO
> that is punted to io-wq.
> 
> Fixes: ecc53c48c13d ("io-wq: check max_worker limits if a worker transitions bound state")
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> (backported from commit f95dc207b93da9c88ddbb7741ec3730c6657b88e; minor adjustments)
> Signed-off-by: Timo Aaltonen <timo.aaltonen@canonical.com>
Acked-by: Stefan Bader <stefan.bader@canonical.com>
> ---

Assuming this is more or less tested via OEM and can be double-checked when it
lands in the 5.13 kernels.

-Stefan

Kleber Sacilotto de Souza April 13, 2022, 10:13 a.m. UTC | #2
On 06.04.22 18:19, Timo Aaltonen wrote:
> From: Jens Axboe <axboe@kernel.dk>
> 
> BugLink: https://bugs.launchpad.net/bugs/1952222
> 
> We've got a few issues that all boil down to the fact that we have one
> list of pending work items, yet two different types of workers to
> serve them. This causes some oddities around workers switching type and
> even hashed work vs regular work on the same bounded list.
> 
> Just separate them out cleanly, similarly to how we already do
> accounting of what is running. That provides a clean separation and
> removes some corner cases that can cause stalls when handling IO
> that is punted to io-wq.
> 
> Fixes: ecc53c48c13d ("io-wq: check max_worker limits if a worker transitions bound state")
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> (backported from commit f95dc207b93da9c88ddbb7741ec3730c6657b88e; minor adjustments)

To follow the pattern, this should be added as something like:

(backported from commit f95dc207b93da9c88ddbb7741ec3730c6657b88e)
[ tjaalton: minor adjustments ]

Which can be fixed when applying the patch.

> Signed-off-by: Timo Aaltonen <timo.aaltonen@canonical.com>

Acked-by: Kleber Sacilotto de Souza <kleber.souza@canonical.com>

Thanks

Patch

diff --git a/fs/io-wq.c b/fs/io-wq.c
index ba7aaf2b95d0..bded284a56d0 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -34,7 +34,7 @@  enum {
 };
 
 enum {
-	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
+	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
 };
 
 /*
@@ -73,25 +73,24 @@  struct io_wqe_acct {
 	unsigned max_workers;
 	int index;
 	atomic_t nr_running;
+	struct io_wq_work_list work_list;
+	unsigned long flags;
 };
 
 enum {
 	IO_WQ_ACCT_BOUND,
 	IO_WQ_ACCT_UNBOUND,
+	IO_WQ_ACCT_NR,
 };
 
 /*
  * Per-node worker thread pool
  */
 struct io_wqe {
-	struct {
-		raw_spinlock_t lock;
-		struct io_wq_work_list work_list;
-		unsigned flags;
-	} ____cacheline_aligned_in_smp;
+	raw_spinlock_t lock;
+	struct io_wqe_acct acct[2];
 
 	int node;
-	struct io_wqe_acct acct[2];
 
 	struct hlist_nulls_head free_list;
 	struct list_head all_list;
@@ -196,11 +195,10 @@  static void io_worker_exit(struct io_worker *worker)
 	do_exit(0);
 }
 
-static inline bool io_wqe_run_queue(struct io_wqe *wqe)
-	__must_hold(wqe->lock)
+static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
 {
-	if (!wq_list_empty(&wqe->work_list) &&
-	    !(wqe->flags & IO_WQE_FLAG_STALLED))
+	if (!wq_list_empty(&acct->work_list) &&
+	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
 		return true;
 	return false;
 }
@@ -209,7 +207,8 @@  static inline bool io_wqe_run_queue(struct io_wqe *wqe)
  * Check head of free list for an available worker. If one isn't available,
  * caller must create one.
  */
-static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
+static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
+					struct io_wqe_acct *acct)
 	__must_hold(RCU)
 {
 	struct hlist_nulls_node *n;
@@ -223,6 +222,10 @@  static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
 	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
 		if (!io_worker_get(worker))
 			continue;
+		if (io_wqe_get_acct(worker) != acct) {
+			io_worker_release(worker);
+			continue;
+		}
 		if (wake_up_process(worker->task)) {
 			io_worker_release(worker);
 			return true;
@@ -341,7 +344,7 @@  static void io_wqe_dec_running(struct io_worker *worker)
 	if (!(worker->flags & IO_WORKER_F_UP))
 		return;
 
-	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
+	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
 		atomic_inc(&acct->nr_running);
 		atomic_inc(&wqe->wq->worker_refs);
 		io_queue_worker_create(wqe, worker, acct);
@@ -356,29 +359,10 @@  static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
 			     struct io_wq_work *work)
 	__must_hold(wqe->lock)
 {
-	bool worker_bound, work_bound;
-
-	BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);
-
 	if (worker->flags & IO_WORKER_F_FREE) {
 		worker->flags &= ~IO_WORKER_F_FREE;
 		hlist_nulls_del_init_rcu(&worker->nulls_node);
 	}
-
-	/*
-	 * If worker is moving from bound to unbound (or vice versa), then
-	 * ensure we update the running accounting.
-	 */
-	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
-	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
-	if (worker_bound != work_bound) {
-		int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
-		io_wqe_dec_running(worker);
-		worker->flags ^= IO_WORKER_F_BOUND;
-		wqe->acct[index].nr_workers--;
-		wqe->acct[index ^ 1].nr_workers++;
-		io_wqe_inc_running(worker);
-	 }
 }
 
 /*
@@ -420,44 +404,23 @@  static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
 	return ret;
 }
 
-/*
- * We can always run the work if the worker is currently the same type as
- * the work (eg both are bound, or both are unbound). If they are not the
- * same, only allow it if incrementing the worker count would be allowed.
- */
-static bool io_worker_can_run_work(struct io_worker *worker,
-				   struct io_wq_work *work)
-{
-	struct io_wqe_acct *acct;
-
-	if (!(worker->flags & IO_WORKER_F_BOUND) !=
-	    !(work->flags & IO_WQ_WORK_UNBOUND))
-		return true;
-
-	/* not the same type, check if we'd go over the limit */
-	acct = io_work_get_acct(worker->wqe, work);
-	return acct->nr_workers < acct->max_workers;
-}
-
-static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
+static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
 					   struct io_worker *worker)
 	__must_hold(wqe->lock)
 {
 	struct io_wq_work_node *node, *prev;
 	struct io_wq_work *work, *tail;
 	unsigned int stall_hash = -1U;
+	struct io_wqe *wqe = worker->wqe;
 
-	wq_list_for_each(node, prev, &wqe->work_list) {
+	wq_list_for_each(node, prev, &acct->work_list) {
 		unsigned int hash;
 
 		work = container_of(node, struct io_wq_work, list);
 
-		if (!io_worker_can_run_work(worker, work))
-			break;
-
 		/* not hashed, can run anytime */
 		if (!io_wq_is_hashed(work)) {
-			wq_list_del(&wqe->work_list, node, prev);
+			wq_list_del(&acct->work_list, node, prev);
 			return work;
 		}
 
@@ -468,7 +431,7 @@  static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
 		/* hashed, can run if not already running */
 		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
 			wqe->hash_tail[hash] = NULL;
-			wq_list_cut(&wqe->work_list, &tail->list, prev);
+			wq_list_cut(&acct->work_list, &tail->list, prev);
 			return work;
 		}
 		if (stall_hash == -1U)
@@ -484,12 +447,12 @@  static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
 		 * Set this before dropping the lock to avoid racing with new
 		 * work being added and clearing the stalled bit.
 		 */
-		wqe->flags |= IO_WQE_FLAG_STALLED;
+		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
 		raw_spin_unlock(&wqe->lock);
 		unstalled = io_wait_on_hash(wqe, stall_hash);
 		raw_spin_lock(&wqe->lock);
 		if (unstalled) {
-			wqe->flags &= ~IO_WQE_FLAG_STALLED;
+			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
 			if (wq_has_sleeper(&wqe->wq->hash->wait))
 				wake_up(&wqe->wq->hash->wait);
 		}
@@ -526,6 +489,7 @@  static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
 static void io_worker_handle_work(struct io_worker *worker)
 	__releases(wqe->lock)
 {
+	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
 	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
@@ -540,7 +504,7 @@  static void io_worker_handle_work(struct io_worker *worker)
 		 * can't make progress, any work completion or insertion will
 		 * clear the stalled flag.
 		 */
-		work = io_get_next_work(wqe, worker);
+		work = io_get_next_work(acct, worker);
 		if (work)
 			__io_worker_busy(wqe, worker, work);
 
@@ -576,7 +540,7 @@  static void io_worker_handle_work(struct io_worker *worker)
 				/* serialize hash clear with wake_up() */
 				spin_lock_irq(&wq->hash->wait.lock);
 				clear_bit(hash, &wq->hash->map);
-				wqe->flags &= ~IO_WQE_FLAG_STALLED;
+				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
 				spin_unlock_irq(&wq->hash->wait.lock);
 				if (wq_has_sleeper(&wq->hash->wait))
 					wake_up(&wq->hash->wait);
@@ -595,6 +559,7 @@  static void io_worker_handle_work(struct io_worker *worker)
 static int io_wqe_worker(void *data)
 {
 	struct io_worker *worker = data;
+	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
 	char buf[TASK_COMM_LEN];
@@ -610,7 +575,7 @@  static int io_wqe_worker(void *data)
 		set_current_state(TASK_INTERRUPTIBLE);
 loop:
 		raw_spin_lock_irq(&wqe->lock);
-		if (io_wqe_run_queue(wqe)) {
+		if (io_acct_run_queue(acct)) {
 			io_worker_handle_work(worker);
 			goto loop;
 		}
@@ -636,7 +601,7 @@  static int io_wqe_worker(void *data)
 
 	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 		raw_spin_lock_irq(&wqe->lock);
-		if (!wq_list_empty(&wqe->work_list))
+		if (!wq_list_empty(&acct->work_list))
 			io_worker_handle_work(worker);
 		else
 			raw_spin_unlock_irq(&wqe->lock);
@@ -782,12 +747,13 @@  static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
 
 static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
 {
+	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
 	unsigned int hash;
 	struct io_wq_work *tail;
 
 	if (!io_wq_is_hashed(work)) {
 append:
-		wq_list_add_tail(&work->list, &wqe->work_list);
+		wq_list_add_tail(&work->list, &acct->work_list);
 		return;
 	}
 
@@ -797,7 +763,7 @@  static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
 	if (!tail)
 		goto append;
 
-	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
+	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
 }
 
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
@@ -819,10 +785,10 @@  static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 
 	raw_spin_lock_irqsave(&wqe->lock, flags);
 	io_wqe_insert_work(wqe, work);
-	wqe->flags &= ~IO_WQE_FLAG_STALLED;
+	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
 
 	rcu_read_lock();
-	do_create = !io_wqe_activate_free_worker(wqe);
+	do_create = !io_wqe_activate_free_worker(wqe, acct);
 	rcu_read_unlock();
 
 	raw_spin_unlock_irqrestore(&wqe->lock, flags);
@@ -875,6 +841,7 @@  static inline void io_wqe_remove_pending(struct io_wqe *wqe,
 					 struct io_wq_work *work,
 					 struct io_wq_work_node *prev)
 {
+	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
 	unsigned int hash = io_get_work_hash(work);
 	struct io_wq_work *prev_work = NULL;
 
@@ -886,7 +853,7 @@  static inline void io_wqe_remove_pending(struct io_wqe *wqe,
 		else
 			wqe->hash_tail[hash] = NULL;
 	}
-	wq_list_del(&wqe->work_list, &work->list, prev);
+	wq_list_del(&acct->work_list, &work->list, prev);
 }
 
 static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
@@ -895,22 +862,27 @@  static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
 	struct io_wq_work_node *node, *prev;
 	struct io_wq_work *work;
 	unsigned long flags;
+	int i;
 
 retry:
 	raw_spin_lock_irqsave(&wqe->lock, flags);
-	wq_list_for_each(node, prev, &wqe->work_list) {
-		work = container_of(node, struct io_wq_work, list);
-		if (!match->fn(work, match->data))
-			continue;
-		io_wqe_remove_pending(wqe, work, prev);
-		raw_spin_unlock_irqrestore(&wqe->lock, flags);
-		io_run_cancel(work, wqe);
-		match->nr_pending++;
-		if (!match->cancel_all)
-			return;
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
 
-		/* not safe to continue after unlock */
-		goto retry;
+		wq_list_for_each(node, prev, &acct->work_list) {
+			work = container_of(node, struct io_wq_work, list);
+			if (!match->fn(work, match->data))
+				continue;
+			io_wqe_remove_pending(wqe, work, prev);
+			raw_spin_unlock_irqrestore(&wqe->lock, flags);
+			io_run_cancel(work, wqe);
+			match->nr_pending++;
+			if (!match->cancel_all)
+				return;
+
+			/* not safe to continue after unlock */
+			goto retry;
+		}
 	}
 	raw_spin_unlock_irqrestore(&wqe->lock, flags);
 }
@@ -971,18 +943,24 @@  static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
 			    int sync, void *key)
 {
 	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
+	int i;
 
 	list_del_init(&wait->entry);
 
 	rcu_read_lock();
-	io_wqe_activate_free_worker(wqe);
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+		struct io_wqe_acct *acct = &wqe->acct[i];
+
+		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
+			io_wqe_activate_free_worker(wqe, acct);
+	}
 	rcu_read_unlock();
 	return 1;
 }
 
 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 {
-	int ret = -ENOMEM, node;
+	int ret = -ENOMEM, node, i;
 	struct io_wq *wq;
 
 	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
@@ -1019,18 +997,20 @@  struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 			goto err;
 		wq->wqes[node] = wqe;
 		wqe->node = alloc_node;
-		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
-		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
 		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
-		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
 		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
 					task_rlimit(current, RLIMIT_NPROC);
-		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
-		wqe->wait.func = io_wqe_hash_wake;
 		INIT_LIST_HEAD(&wqe->wait.entry);
+		wqe->wait.func = io_wqe_hash_wake;
+		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+			struct io_wqe_acct *acct = &wqe->acct[i];
+
+			acct->index = i;
+			atomic_set(&acct->nr_running, 0);
+			INIT_WQ_LIST(&acct->work_list);
+		}
 		wqe->wq = wq;
 		raw_spin_lock_init(&wqe->lock);
-		INIT_WQ_LIST(&wqe->work_list);
 		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
 		INIT_LIST_HEAD(&wqe->all_list);
 	}