[07/30] aio: add delayed cancel support

Message ID 20180329203328.3248-8-hch@lst.de
State Not Applicable, archived
Delegated to: David Miller
Series [01/30] fs: unexport poll_schedule_timeout

Commit Message

Christoph Hellwig March 29, 2018, 8:33 p.m. UTC
The upcoming aio poll support would like to be able to complete the
iocb inline from the cancellation context, but that would cause a
double lock of ctx_lock with the current locking scheme.  Move the
cancellation outside the context lock to avoid this reversal, which
suits the existing USB gadget users just fine as well (in fact both
unconditionally disable irqs and thus seem broken without this
change).

To make this safe, aio_complete needs to check whether a given call
should complete the iocb.  If it did not, the caller must not release
any other resources.
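
The resulting caller pattern, as a minimal sketch modeled on the
aio_complete_rw() hunk below (example_complete() is an illustrative
name, not part of the patch):

	static void example_complete(struct aio_kiocb *iocb, long res, long res2)
	{
		/* Take the file reference before completing: once
		 * aio_complete() returns true the iocb may be freed. */
		struct file *file = iocb->rw.ki_filp;

		if (aio_complete(iocb, res, res2, 0))
			fput(file);
		/* else a cancellation owns the iocb; release nothing */
	}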

Signed-off-by: Christoph Hellwig <hch@lst.de>
---
 fs/aio.c | 60 ++++++++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 44 insertions(+), 16 deletions(-)

Comments

Al Viro March 29, 2018, 10:35 p.m. UTC | #1
On Thu, Mar 29, 2018 at 10:33:05PM +0200, Christoph Hellwig wrote:
> The upcoming aio poll support would like to be able to complete the
> iocb inline from the cancellation context, but that would cause a
> double lock of ctx_lock with the current locking scheme.  Move the
> cancellation outside the context lock to avoid this reversal, which
> suits the existing USB gadget users just fine as well (in fact both
> unconditionally disable irqs and thus seem broken without this
> change).
> 
> To make this safe, aio_complete needs to check whether a given call
> should complete the iocb.  If it did not, the caller must not release
> any other resources.

Uh-oh...  What happens to existing users of kiocb_set_cancel_fn() now?
AFAICS, those guys will *not* get aio_kiocb freed at all in case of
io_cancel(2).  Look: we mark them with AIO_IOCB_CANCELLED and
call whatever ->ki_cancel() the driver has set.  Later the damn
thing calls ->ki_complete() (i.e. aio_complete_rw()), which calls
aio_complete(iocb, res, res2, 0) and gets false.  Nothing's freed,
struct file is leaked.
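
Traced against the patch below (an illustrative flow, not code from
the patch):

	io_cancel(2)
	  -> sets AIO_IOCB_CANCELLED, drops ctx_lock
	  -> ->ki_cancel(&kiocb->rw)        /* gadget dequeues the I/O */
	     ... driver completion fires later ...
	  -> ->ki_complete(), i.e. aio_complete_rw()
	     -> aio_complete(iocb, res, res2, 0)
	        /* AIO_IOCB_CANCELLED set, AIO_COMPLETE_CANCEL clear */
	        -> returns false, frees nothing
	     -> fput(file) skipped          /* struct file leaked */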

Frankly, the more I look at that, the less I like what you've done
with the ->ki_cancel() overloading.  In the regular case it's just
accelerating the call of ->ki_complete(), which will do the freeing.
Here you have ->ki_cancel() free the damn thing, with the resulting
need to play silly buggers with the locking, the freeing logic in
aio_complete(), etc.
Christoph Hellwig March 30, 2018, 7:14 a.m. UTC | #2
On Thu, Mar 29, 2018 at 11:35:00PM +0100, Al Viro wrote:
> Uh-oh...  What happens to existing users of kiocb_set_cancel_fn() now?
> AFAICS, those guys will *not* get aio_kiocb freed at all in case of
> io_cancel(2).  Look: we mark them with AIO_IOCB_CANCELLED and
> call whatever ->ki_cancel() the driver has set.  Later the damn
> thing calls ->ki_complete() (i.e. aio_complete_rw()), which calls
> aio_complete(iocb, res, res2, 0) and gets false.  Nothing's freed,
> struct file is leaked.

True, we'd need an aio_complete call from it.

> Frankly, the more I look at that, the less I like what you've done
> with the ->ki_cancel() overloading.  In the regular case it's just
> accelerating the call of ->ki_complete(), which will do the freeing.
> Here you have ->ki_cancel() free the damn thing, with the resulting
> need to play silly buggers with the locking, the freeing logic in
> aio_complete(), etc.

I don't really like it all that much either, but I also think the
current model is pretty broken - being called under a spinlock with
irqs disabled is not even what the current users expect.  A second
issue with the existing ki_cancel is that kiocb_set_cancel_fn operates
on a kiocb but expects it to be embedded in an aio_kiocb, which might
not always be the case, although in-kernel I/O is unlikely to be used
on them.  And based on all of this I bet the gadget aio cancel path is
basically untested.
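
Concretely, the mismatch is in this helper (quoted from the patch
below; the comment is added here):

	void kiocb_set_cancel_fn(struct kiocb *iocb, kiocb_cancel_fn *cancel)
	{
		/* Only valid when iocb really is the ->rw member of an
		 * aio_kiocb; an in-kernel caller passing its own kiocb
		 * would get a bogus pointer out of this container_of(). */
		return __kiocb_set_cancel_fn(container_of(iocb, struct aio_kiocb, rw),
				cancel);
	}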

Anyway, I guess splitting the poll direct cancel out in a way that
doesn't overload ->ki_cancel might be a good idea.  It is all inside
aio.c, so simply switching on the opcode, similar to the submission
path, might be one option (roughly sketched below); having separate
methods would be another.  Moving ki_cancel to the kiocb would also
solve the above mismatch issue.
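
A minimal sketch of that opcode switch in io_cancel(2), assuming a
ki_opcode field on the aio_kiocb and an aio_poll_cancel() helper,
neither of which exists in this patch (IOCB_CMD_POLL stands in for
the opcode the poll patches in this series would add):

	if (kiocb) {
		switch (kiocb->ki_opcode) {		/* hypothetical field */
		case IOCB_CMD_POLL:
			/* complete inline, no driver callback involved */
			ret = aio_poll_cancel(kiocb);	/* hypothetical helper */
			break;
		default:
			ret = kiocb->ki_cancel(&kiocb->rw);
			break;
		}
	}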

Patch

diff --git a/fs/aio.c b/fs/aio.c
index c724f1429176..2406644e1ecc 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -177,6 +177,9 @@  struct aio_kiocb {
 	struct list_head	ki_list;	/* the aio core uses this
 						 * for cancellation */
 
+	unsigned int		flags;		/* protected by ctx->ctx_lock */
+#define AIO_IOCB_CANCELLED	(1 << 0)
+
 	/*
 	 * If the aio_resfd field of the userspace iocb is not zero,
 	 * this is the underlying eventfd context to deliver events to.
@@ -543,9 +546,9 @@  static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
 #define AIO_EVENTS_FIRST_PAGE	((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
 #define AIO_EVENTS_OFFSET	(AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
 
-void kiocb_set_cancel_fn(struct kiocb *iocb, kiocb_cancel_fn *cancel)
+static void __kiocb_set_cancel_fn(struct aio_kiocb *req,
+		kiocb_cancel_fn *cancel)
 {
-	struct aio_kiocb *req = container_of(iocb, struct aio_kiocb, rw);
 	struct kioctx *ctx = req->ki_ctx;
 	unsigned long flags;
 
@@ -557,6 +560,12 @@  void kiocb_set_cancel_fn(struct kiocb *iocb, kiocb_cancel_fn *cancel)
 	req->ki_cancel = cancel;
 	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
 }
+
+void kiocb_set_cancel_fn(struct kiocb *iocb, kiocb_cancel_fn *cancel)
+{
+	return __kiocb_set_cancel_fn(container_of(iocb, struct aio_kiocb, rw),
+			cancel);
+}
 EXPORT_SYMBOL(kiocb_set_cancel_fn);
 
 static void free_ioctx(struct work_struct *work)
@@ -593,18 +602,23 @@  static void free_ioctx_users(struct percpu_ref *ref)
 {
 	struct kioctx *ctx = container_of(ref, struct kioctx, users);
 	struct aio_kiocb *req;
+	LIST_HEAD(list);
 
 	spin_lock_irq(&ctx->ctx_lock);
-
 	while (!list_empty(&ctx->active_reqs)) {
 		req = list_first_entry(&ctx->active_reqs,
 				       struct aio_kiocb, ki_list);
+		req->flags |= AIO_IOCB_CANCELLED;
+		list_move_tail(&req->ki_list, &list);
+	}
+	spin_unlock_irq(&ctx->ctx_lock);
+
+	while (!list_empty(&list)) {
+		req = list_first_entry(&list, struct aio_kiocb, ki_list);
 		list_del_init(&req->ki_list);
 		req->ki_cancel(&req->rw);
 	}
 
-	spin_unlock_irq(&ctx->ctx_lock);
-
 	percpu_ref_kill(&ctx->reqs);
 	percpu_ref_put(&ctx->reqs);
 }
@@ -1040,22 +1054,30 @@  static struct kioctx *lookup_ioctx(unsigned long ctx_id)
 	return ret;
 }
 
+#define AIO_COMPLETE_CANCEL	(1 << 0)
+
 /* aio_complete
  *	Called when the io request on the given iocb is complete.
  */
-static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
+static bool aio_complete(struct aio_kiocb *iocb, long res, long res2,
+		unsigned complete_flags)
 {
 	struct kioctx	*ctx = iocb->ki_ctx;
 	struct aio_ring	*ring;
 	struct io_event	*ev_page, *event;
 	unsigned tail, pos, head;
-	unsigned long	flags;
-
-	if (!list_empty_careful(&iocb->ki_list)) {
-		unsigned long flags;
+	unsigned long flags;
 
+	if (iocb->ki_cancel) {
 		spin_lock_irqsave(&ctx->ctx_lock, flags);
-		list_del(&iocb->ki_list);
+		if (!(complete_flags & AIO_COMPLETE_CANCEL) &&
+		    (iocb->flags & AIO_IOCB_CANCELLED)) {
+			spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+			return false;
+		}
+
+		if (!list_empty(&iocb->ki_list))
+			list_del(&iocb->ki_list);
 		spin_unlock_irqrestore(&ctx->ctx_lock, flags);
 	}
 
@@ -1131,6 +1153,7 @@  static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
 		wake_up(&ctx->wait);
 
 	percpu_ref_put(&ctx->reqs);
+	return true;
 }
 
 /* aio_read_events_ring
@@ -1379,6 +1402,7 @@  SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
 static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
 {
 	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, rw);
+	struct file *file = kiocb->ki_filp;
 
 	if (kiocb->ki_flags & IOCB_WRITE) {
 		struct inode *inode = file_inode(kiocb->ki_filp);
@@ -1392,8 +1416,8 @@  static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
 		file_end_write(kiocb->ki_filp);
 	}
 
-	fput(kiocb->ki_filp);
-	aio_complete(iocb, res, res2);
+	if (aio_complete(iocb, res, res2, 0))
+		fput(file);
 }
 
 static int aio_prep_rw(struct kiocb *req, struct iocb *iocb)
@@ -1536,11 +1560,13 @@  static ssize_t aio_write(struct kiocb *req, struct iocb *iocb, bool vectored,
 static void aio_fsync_work(struct work_struct *work)
 {
 	struct fsync_iocb *req = container_of(work, struct fsync_iocb, work);
+	struct aio_kiocb *iocb = container_of(req, struct aio_kiocb, fsync);
+	struct file *file = req->file;
 	int ret;
 
 	ret = vfs_fsync(req->file, req->datasync);
-	fput(req->file);
-	aio_complete(container_of(req, struct aio_kiocb, fsync), ret, 0);
+	if (aio_complete(iocb, ret, 0, 0))
+		fput(file);
 }
 
 static int aio_fsync(struct fsync_iocb *req, struct iocb *iocb, bool datasync)
@@ -1816,11 +1842,13 @@  SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
 	spin_lock_irq(&ctx->ctx_lock);
 	kiocb = lookup_kiocb(ctx, iocb, key);
 	if (kiocb) {
+		kiocb->flags |= AIO_IOCB_CANCELLED;
 		list_del_init(&kiocb->ki_list);
-		ret = kiocb->ki_cancel(&kiocb->rw);
 	}
 	spin_unlock_irq(&ctx->ctx_lock);
 
+	if (kiocb)
+		ret = kiocb->ki_cancel(&kiocb->rw);
 	if (!ret) {
 		/*
 		 * The result argument is no longer used - the io_event is