diff mbox series

[10/32] aio: implement IOCB_CMD_POLL

Message ID 20180515194833.6906-11-hch@lst.de
State Not Applicable, archived
Delegated to: David Miller
Headers show
Series [01/32] fs: unexport poll_schedule_timeout | expand

Commit Message

Christoph Hellwig May 15, 2018, 7:48 p.m. UTC
Simple one-shot poll through the io_submit() interface.  To poll for
a file descriptor the application should submit an iocb of type
IOCB_CMD_POLL.  It will poll the fd for the events specified in the
the first 32 bits of the aio_buf field of the iocb.

Unlike poll or epoll without EPOLLONESHOT this interface always works
in one shot mode, that is once the iocb is completed, it will have to be
resubmitted.

Signed-off-by: Christoph Hellwig <hch@lst.de>
Acked-by: Jeff Moyer <jmoyer@redhat.com>
Reviewed-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Reviewed-by: Darrick J. Wong <darrick.wong@oracle.com>
---
 fs/aio.c                     | 92 +++++++++++++++++++++++++++++++++++-
 include/uapi/linux/aio_abi.h |  6 +--
 2 files changed, 93 insertions(+), 5 deletions(-)

Comments

Al Viro May 20, 2018, 5:32 a.m. UTC | #1
On Tue, May 15, 2018 at 09:48:11PM +0200, Christoph Hellwig wrote:
> +static ssize_t aio_poll(struct aio_kiocb *aiocb, struct iocb *iocb)
> +{
> +	struct kioctx *ctx = aiocb->ki_ctx;
> +	struct poll_iocb *req = &aiocb->poll;
> +	unsigned long flags;
> +	__poll_t mask;
> +
> +	/* reject any unknown events outside the normal event mask. */
> +	if ((u16)iocb->aio_buf != iocb->aio_buf)
> +		return -EINVAL;
> +	/* reject fields that are not defined for poll */
> +	if (iocb->aio_offset || iocb->aio_nbytes || iocb->aio_rw_flags)
> +		return -EINVAL;
> +
> +	req->events = demangle_poll(iocb->aio_buf) | POLLERR | POLLHUP;
> +	req->file = fget(iocb->aio_fildes);
> +	if (unlikely(!req->file))
> +		return -EBADF;
> +	if (!file_has_poll_mask(req->file))
> +		goto out_fail;
> +
> +	req->head = req->file->f_op->get_poll_head(req->file, req->events);
> +	if (!req->head)
> +		goto out_fail;
> +	if (IS_ERR(req->head)) {
> +		mask = EPOLLERR;
> +		goto done;
> +	}
> +
> +	init_waitqueue_func_entry(&req->wait, aio_poll_wake);
> +	aiocb->ki_cancel = aio_poll_cancel;
> +
> +	spin_lock_irqsave(&ctx->ctx_lock, flags);
> +	list_add_tail(&aiocb->ki_list, &ctx->delayed_cancel_reqs);
> +	spin_unlock(&ctx->ctx_lock);

... and io_cancel(2) comes, finds it and inhume^Wcompletes it, leaving us to...

> +	spin_lock(&req->head->lock);

... get buggered on attempt to dereference a pointer fetched from freed and
reused object.
Al Viro May 20, 2018, 7:33 a.m. UTC | #2
On Sun, May 20, 2018 at 06:32:25AM +0100, Al Viro wrote:
> > +	spin_lock_irqsave(&ctx->ctx_lock, flags);
> > +	list_add_tail(&aiocb->ki_list, &ctx->delayed_cancel_reqs);
> > +	spin_unlock(&ctx->ctx_lock);
> 
> ... and io_cancel(2) comes, finds it and inhume^Wcompletes it, leaving us to...
> 
> > +	spin_lock(&req->head->lock);
> 
> ... get buggered on attempt to dereference a pointer fetched from freed and
> reused object.

FWIW, how painful would it be to pull the following trick:
	* insert into wait queue under ->ctx_lock
	* have wakeup do schedule_work() with aio_complete() done from that
	* have ->ki_cancel() grab queue lock, remove from queue and use
the same schedule_work()

That way you'd get ->ki_cancel() with the same semantics as originally for
everything - "ask politely to finish ASAP", and called in the same locking
environment for everyone - under ->ctx_lock, that is.  queue lock nests
inside ->ctx_lock; no magical flags, etc.

The cost is schedule_work() for each async poll-related completion as you
have for fsync.  I don't know whether that's too costly or not; it certainly
simplifies the things, but whether it's OK performance-wise...

Comments?
Christoph Hellwig May 20, 2018, 5:32 p.m. UTC | #3
On Sun, May 20, 2018 at 08:33:39AM +0100, Al Viro wrote:
> > ... get buggered on attempt to dereference a pointer fetched from freed and
> > reused object.
> 
> FWIW, how painful would it be to pull the following trick:
> 	* insert into wait queue under ->ctx_lock
> 	* have wakeup do schedule_work() with aio_complete() done from that
> 	* have ->ki_cancel() grab queue lock, remove from queue and use
> the same schedule_work()
> 
> That way you'd get ->ki_cancel() with the same semantics as originally for
> everything - "ask politely to finish ASAP", and called in the same locking
> environment for everyone - under ->ctx_lock, that is.  queue lock nests
> inside ->ctx_lock; no magical flags, etc.
> 
> The cost is schedule_work() for each async poll-related completion as you
> have for fsync.  I don't know whether that's too costly or not; it certainly
> simplifies the things, but whether it's OK performance-wise...

I think it is doable:

	http://git.infradead.org/users/hch/vfs.git/commitdiff/c441130e405465268ea10c9ddd5639c155f779e8

downside is that sizeof(struct aio_kiocb) grows a bit.

For the completion performance we can use a spin_trylock to still avoid
the context switch for the common case:

	http://git.infradead.org/users/hch/vfs.git/commitdiff/6cc1827afbea87c52fe425cf533bfcf5f3308163
diff mbox series

Patch

diff --git a/fs/aio.c b/fs/aio.c
index 3afca506c7f0..f2c7674e2151 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -5,6 +5,7 @@ 
  *	Implements an efficient asynchronous io interface.
  *
  *	Copyright 2000, 2001, 2002 Red Hat, Inc.  All Rights Reserved.
+ *	Copyright 2018 Christoph Hellwig.
  *
  *	See ../COPYING for licensing terms.
  */
@@ -165,10 +166,18 @@  struct fsync_iocb {
 	bool			datasync;
 };
 
+struct poll_iocb {
+	struct file		*file;
+	__poll_t		events;
+	struct wait_queue_head	*head;
+	struct wait_queue_entry	wait;
+};
+
 struct aio_kiocb {
 	union {
 		struct kiocb		rw;
 		struct fsync_iocb	fsync;
+		struct poll_iocb	poll;
 	};
 
 	struct kioctx		*ki_ctx;
@@ -1569,7 +1578,6 @@  static int aio_fsync(struct fsync_iocb *req, struct iocb *iocb, bool datasync)
 	if (unlikely(iocb->aio_buf || iocb->aio_offset || iocb->aio_nbytes ||
 			iocb->aio_rw_flags))
 		return -EINVAL;
-
 	req->file = fget(iocb->aio_fildes);
 	if (unlikely(!req->file))
 		return -EBADF;
@@ -1584,6 +1592,86 @@  static int aio_fsync(struct fsync_iocb *req, struct iocb *iocb, bool datasync)
 	return -EIOCBQUEUED;
 }
 
+static int aio_poll_cancel(struct aio_kiocb *iocb)
+{
+	remove_wait_queue(iocb->poll.head, &iocb->poll.wait);
+	aio_complete(iocb, iocb->poll.file, 0, 0, AIO_COMPLETE_CANCEL);
+	return 0;
+}
+
+static int aio_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
+		void *key)
+{
+	struct poll_iocb *req = container_of(wait, struct poll_iocb, wait);
+	struct aio_kiocb *iocb = container_of(req, struct aio_kiocb, poll);
+	struct file *file = req->file;
+	__poll_t mask = key_to_poll(key);
+
+	assert_spin_locked(&req->head->lock);
+
+	/* for instances that support it check for an event match first: */
+	if (mask && !(mask & req->events))
+		return 0;
+
+	mask = file->f_op->poll_mask(file, req->events);
+	if (!mask)
+		return 0;
+
+	__remove_wait_queue(req->head, &req->wait);
+	aio_complete(iocb, req->file, mangle_poll(mask), 0, 0);
+	return 1;
+}
+
+static ssize_t aio_poll(struct aio_kiocb *aiocb, struct iocb *iocb)
+{
+	struct kioctx *ctx = aiocb->ki_ctx;
+	struct poll_iocb *req = &aiocb->poll;
+	unsigned long flags;
+	__poll_t mask;
+
+	/* reject any unknown events outside the normal event mask. */
+	if ((u16)iocb->aio_buf != iocb->aio_buf)
+		return -EINVAL;
+	/* reject fields that are not defined for poll */
+	if (iocb->aio_offset || iocb->aio_nbytes || iocb->aio_rw_flags)
+		return -EINVAL;
+
+	req->events = demangle_poll(iocb->aio_buf) | POLLERR | POLLHUP;
+	req->file = fget(iocb->aio_fildes);
+	if (unlikely(!req->file))
+		return -EBADF;
+	if (!file_has_poll_mask(req->file))
+		goto out_fail;
+
+	req->head = req->file->f_op->get_poll_head(req->file, req->events);
+	if (!req->head)
+		goto out_fail;
+	if (IS_ERR(req->head)) {
+		mask = EPOLLERR;
+		goto done;
+	}
+
+	init_waitqueue_func_entry(&req->wait, aio_poll_wake);
+	aiocb->ki_cancel = aio_poll_cancel;
+
+	spin_lock_irqsave(&ctx->ctx_lock, flags);
+	list_add_tail(&aiocb->ki_list, &ctx->delayed_cancel_reqs);
+	spin_unlock(&ctx->ctx_lock);
+
+	spin_lock(&req->head->lock);
+	mask = req->file->f_op->poll_mask(req->file, req->events);
+	if (!mask)
+		__add_wait_queue(req->head, &req->wait);
+	spin_unlock_irqrestore(&req->head->lock, flags);
+done:
+	if (mask)
+		aio_complete(aiocb, req->file, mangle_poll(mask), 0, 0);
+	return -EIOCBQUEUED;
+out_fail:
+	fput(req->file);
+	return -EINVAL; /* same as no support for IOCB_CMD_POLL */
+}
+
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 			 struct iocb *iocb, bool compat)
 {
@@ -1652,6 +1740,8 @@  static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 		break;
 	case IOCB_CMD_FDSYNC:
 		ret = aio_fsync(&req->fsync, iocb, true);
+	case IOCB_CMD_POLL:
+		ret = aio_poll(req, iocb);
 		break;
 	default:
 		pr_debug("invalid aio operation %d\n", iocb->aio_lio_opcode);
diff --git a/include/uapi/linux/aio_abi.h b/include/uapi/linux/aio_abi.h
index 2c0a3415beee..ed0185945bb2 100644
--- a/include/uapi/linux/aio_abi.h
+++ b/include/uapi/linux/aio_abi.h
@@ -39,10 +39,8 @@  enum {
 	IOCB_CMD_PWRITE = 1,
 	IOCB_CMD_FSYNC = 2,
 	IOCB_CMD_FDSYNC = 3,
-	/* These two are experimental.
-	 * IOCB_CMD_PREADX = 4,
-	 * IOCB_CMD_POLL = 5,
-	 */
+	/* 4 was the experimental IOCB_CMD_PREADX */
+	IOCB_CMD_POLL = 5,
 	IOCB_CMD_NOOP = 6,
 	IOCB_CMD_PREADV = 7,
 	IOCB_CMD_PWRITEV = 8,