diff mbox series

[2/2] nbd/server: Quiesce coroutines on context switch

Message ID 20201204165347.73542-3-slp@redhat.com
State New
Headers show
Series [1/2] virtio-blk: Acquire context while switching them on dataplane start | expand

Commit Message

Sergio Lopez Dec. 4, 2020, 4:53 p.m. UTC
When switching between AIO contexts we need to me make sure that both
recv_coroutine and send_coroutine are not scheduled to run. Otherwise,
QEMU may crash while attaching the new context with an error like
this one:

aio_co_schedule: Co-routine was already scheduled in 'aio_co_schedule'

To achieve this we need a local implementation of
'qio_channel_readv_all_eof' named 'nbd_read_eof' (a trick already done
by 'nbd/client.c') that allows us to interrupt the operation and to
know when recv_coroutine is yielding.

With this in place, we delegate detaching the AIO context to the
owning context with a BH ('nbd_aio_detach_bh') scheduled using
'aio_wait_bh_oneshot'. This BH signals that we need to quiesce the
channel by setting 'client->quiescing' to 'true', and either waits for
the coroutine to finish using AIO_WAIT_WHILE or, if it's yielding in
'nbd_read_eof', actively enters the coroutine to interrupt it.

RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1900326
Signed-off-by: Sergio Lopez <slp@redhat.com>
---
 nbd/server.c | 120 +++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 106 insertions(+), 14 deletions(-)

Comments

Eric Blake Dec. 4, 2020, 6:39 p.m. UTC | #1
On 12/4/20 10:53 AM, Sergio Lopez wrote:
> When switching between AIO contexts we need to me make sure that both
> recv_coroutine and send_coroutine are not scheduled to run. Otherwise,
> QEMU may crash while attaching the new context with an error like
> this one:
> 
> aio_co_schedule: Co-routine was already scheduled in 'aio_co_schedule'
> 
> To achieve this we need a local implementation of
> 'qio_channel_readv_all_eof' named 'nbd_read_eof' (a trick already done
> by 'nbd/client.c') that allows us to interrupt the operation and to
> know when recv_coroutine is yielding.
> 
> With this in place, we delegate detaching the AIO context to the
> owning context with a BH ('nbd_aio_detach_bh') scheduled using
> 'aio_wait_bh_oneshot'. This BH signals that we need to quiesce the
> channel by setting 'client->quiescing' to 'true', and either waits for
> the coroutine to finish using AIO_WAIT_WHILE or, if it's yielding in
> 'nbd_read_eof', actively enters the coroutine to interrupt it.
> 
> RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1900326
> Signed-off-by: Sergio Lopez <slp@redhat.com>
> ---
>  nbd/server.c | 120 +++++++++++++++++++++++++++++++++++++++++++++------
>  1 file changed, 106 insertions(+), 14 deletions(-)

A complex patch, so I'd appreciate a second set of eyes.

> 
> diff --git a/nbd/server.c b/nbd/server.c
> index 613ed2634a..7229f487d2 100644
> --- a/nbd/server.c
> +++ b/nbd/server.c
> @@ -132,6 +132,9 @@ struct NBDClient {
>      CoMutex send_lock;
>      Coroutine *send_coroutine;
>  
> +    bool read_yielding;
> +    bool quiescing;

Will either of these fields need to be accessed atomically once the
'yank' code is added, or are we still safe with direct access because
coroutines are not multithreaded?

> +
>      QTAILQ_ENTRY(NBDClient) next;
>      int nb_requests;
>      bool closing;
> @@ -1352,14 +1355,60 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
>      return 0;
>  }
>  
> -static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
> +/* nbd_read_eof
> + * Tries to read @size bytes from @ioc. This is a local implementation of
> + * qio_channel_readv_all_eof. We have it here because we need it to be
> + * interruptible and to know when the coroutine is yielding.
> + * Returns 1 on success
> + *         0 on eof, when no data was read (errp is not set)
> + *         negative errno on failure (errp is set)
> + */
> +static inline int coroutine_fn
> +nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
> +{
> +    bool partial = false;
> +
> +    assert(size);
> +    while (size > 0) {
> +        struct iovec iov = { .iov_base = buffer, .iov_len = size };
> +        ssize_t len;
> +
> +        len = qio_channel_readv(client->ioc, &iov, 1, errp);
> +        if (len == QIO_CHANNEL_ERR_BLOCK) {
> +            client->read_yielding = true;
> +            qio_channel_yield(client->ioc, G_IO_IN);
> +            client->read_yielding = false;

nbd/client.c:nbd_read_eof() uses bdrv_dec/inc_in_flight instead of
read_yielding...

> +            if (client->quiescing) {
> +                return -EAGAIN;
> +            }

and the quiescing check is new; otherwise, these two functions look
identical.  Having two static functions with the same name makes gdb a
bit more annoying (which one of the two did you want your breakpoint
on?).  Is there any way we could write this code only once in
nbd/common.c for reuse by both client and server?  But I can live with
it as written.

> @@ -2151,20 +2223,23 @@ static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
>  
>  /* nbd_co_receive_request
>   * Collect a client request. Return 0 if request looks valid, -EIO to drop
> - * connection right away, and any other negative value to report an error to
> - * the client (although the caller may still need to disconnect after reporting
> - * the error).
> + * connection right away, -EAGAIN to indicate we were interrupted and the
> + * channel should be quiesced, and any other negative value to report an error
> + * to the client (although the caller may still need to disconnect after
> + * reporting the error).
>   */
>  static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
>                                    Error **errp)
>  {
>      NBDClient *client = req->client;
>      int valid_flags;
> +    int ret;
>  
>      g_assert(qemu_in_coroutine());
>      assert(client->recv_coroutine == qemu_coroutine_self());
> -    if (nbd_receive_request(client->ioc, request, errp) < 0) {
> -        return -EIO;
> +    ret = nbd_receive_request(client, request, errp);
> +    if (ret < 0) {
> +        return  ret;

Why the double space?

The old code slams to EIO, you preserve errors.  Is that going to bite
us by causing us to see a different errno leaked through?

>      }
>  
>      trace_nbd_co_receive_request_decode_type(request->handle, request->type,
> @@ -2507,6 +2582,17 @@ static coroutine_fn void nbd_trip(void *opaque)
>          return;
>      }
>  
> +    if (client->quiescing) {
> +        /*
> +         * We're switching between AIO contexts. Don't attempt to receive a new
> +         * request and kick the main context which may be waiting for us.

s/request/request,/

> +         */
> +        nbd_client_put(client);
> +        client->recv_coroutine = NULL;
> +        aio_wait_kick();
> +        return;
> +    }
> +
>      req = nbd_request_get(client);
>      ret = nbd_co_receive_request(req, &request, &local_err);
>      client->recv_coroutine = NULL;
> @@ -2519,6 +2605,11 @@ static coroutine_fn void nbd_trip(void *opaque)
>          goto done;
>      }
>  
> +    if (ret == -EAGAIN) {
> +        assert(client->quiescing);
> +        goto done;
> +    }
> +
>      nbd_client_receive_next_request(client);
>      if (ret == -EIO) {
>          goto disconnect;
> @@ -2565,7 +2656,8 @@ disconnect:
>  
>  static void nbd_client_receive_next_request(NBDClient *client)
>  {
> -    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
> +    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
> +        !client->quiescing) {
>          nbd_client_get(client);
>          client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
>          aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);
> 

Overall looks okay to me,
Reviewed-by: Eric Blake <eblake@redhat.com>
Sergio Lopez Dec. 9, 2020, 5:31 p.m. UTC | #2
On Fri, Dec 04, 2020 at 12:39:07PM -0600, Eric Blake wrote:
> On 12/4/20 10:53 AM, Sergio Lopez wrote:
> > When switching between AIO contexts we need to me make sure that both
> > recv_coroutine and send_coroutine are not scheduled to run. Otherwise,
> > QEMU may crash while attaching the new context with an error like
> > this one:
> > 
> > aio_co_schedule: Co-routine was already scheduled in 'aio_co_schedule'
> > 
> > To achieve this we need a local implementation of
> > 'qio_channel_readv_all_eof' named 'nbd_read_eof' (a trick already done
> > by 'nbd/client.c') that allows us to interrupt the operation and to
> > know when recv_coroutine is yielding.
> > 
> > With this in place, we delegate detaching the AIO context to the
> > owning context with a BH ('nbd_aio_detach_bh') scheduled using
> > 'aio_wait_bh_oneshot'. This BH signals that we need to quiesce the
> > channel by setting 'client->quiescing' to 'true', and either waits for
> > the coroutine to finish using AIO_WAIT_WHILE or, if it's yielding in
> > 'nbd_read_eof', actively enters the coroutine to interrupt it.
> > 
> > RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1900326
> > Signed-off-by: Sergio Lopez <slp@redhat.com>
> > ---
> >  nbd/server.c | 120 +++++++++++++++++++++++++++++++++++++++++++++------
> >  1 file changed, 106 insertions(+), 14 deletions(-)
> 
> A complex patch, so I'd appreciate a second set of eyes.
> 
> > 
> > diff --git a/nbd/server.c b/nbd/server.c
> > index 613ed2634a..7229f487d2 100644
> > --- a/nbd/server.c
> > +++ b/nbd/server.c
> > @@ -132,6 +132,9 @@ struct NBDClient {
> >      CoMutex send_lock;
> >      Coroutine *send_coroutine;
> >  
> > +    bool read_yielding;
> > +    bool quiescing;
> 
> Will either of these fields need to be accessed atomically once the
> 'yank' code is added, or are we still safe with direct access because
> coroutines are not multithreaded?

Yes, those are only accessed from coroutines, which will be scheduled
on the same thread.

> > +
> >      QTAILQ_ENTRY(NBDClient) next;
> >      int nb_requests;
> >      bool closing;
> > @@ -1352,14 +1355,60 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
> >      return 0;
> >  }
> >  
> > -static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
> > +/* nbd_read_eof
> > + * Tries to read @size bytes from @ioc. This is a local implementation of
> > + * qio_channel_readv_all_eof. We have it here because we need it to be
> > + * interruptible and to know when the coroutine is yielding.
> > + * Returns 1 on success
> > + *         0 on eof, when no data was read (errp is not set)
> > + *         negative errno on failure (errp is set)
> > + */
> > +static inline int coroutine_fn
> > +nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
> > +{
> > +    bool partial = false;
> > +
> > +    assert(size);
> > +    while (size > 0) {
> > +        struct iovec iov = { .iov_base = buffer, .iov_len = size };
> > +        ssize_t len;
> > +
> > +        len = qio_channel_readv(client->ioc, &iov, 1, errp);
> > +        if (len == QIO_CHANNEL_ERR_BLOCK) {
> > +            client->read_yielding = true;
> > +            qio_channel_yield(client->ioc, G_IO_IN);
> > +            client->read_yielding = false;
> 
> nbd/client.c:nbd_read_eof() uses bdrv_dec/inc_in_flight instead of
> read_yielding...
> 
> > +            if (client->quiescing) {
> > +                return -EAGAIN;
> > +            }
> 
> and the quiescing check is new; otherwise, these two functions look
> identical.  Having two static functions with the same name makes gdb a
> bit more annoying (which one of the two did you want your breakpoint
> on?).  Is there any way we could write this code only once in
> nbd/common.c for reuse by both client and server?  But I can live with
> it as written.

I'm not happy with this either, but on the first implementation I've
tried to come up with a unique function for both use cases, and it
looked terrible.

We can easily use a different name, though.

> > @@ -2151,20 +2223,23 @@ static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
> >  
> >  /* nbd_co_receive_request
> >   * Collect a client request. Return 0 if request looks valid, -EIO to drop
> > - * connection right away, and any other negative value to report an error to
> > - * the client (although the caller may still need to disconnect after reporting
> > - * the error).
> > + * connection right away, -EAGAIN to indicate we were interrupted and the
> > + * channel should be quiesced, and any other negative value to report an error
> > + * to the client (although the caller may still need to disconnect after
> > + * reporting the error).
> >   */
> >  static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
> >                                    Error **errp)
> >  {
> >      NBDClient *client = req->client;
> >      int valid_flags;
> > +    int ret;
> >  
> >      g_assert(qemu_in_coroutine());
> >      assert(client->recv_coroutine == qemu_coroutine_self());
> > -    if (nbd_receive_request(client->ioc, request, errp) < 0) {
> > -        return -EIO;
> > +    ret = nbd_receive_request(client, request, errp);
> > +    if (ret < 0) {
> > +        return  ret;
> 
> Why the double space?

Ouch, copy/paste mistake.

> The old code slams to EIO, you preserve errors.  Is that going to bite
> us by causing us to see a different errno leaked through?

When reading from the channel, nbd_read_eof hides the actual errno
behind EIO, so the only actual difference is that, with this change,
nbd_receive_request may send this error...

    if (magic != NBD_REQUEST_MAGIC) {
        error_setg(errp, "invalid magic (got 0x%" PRIx32 ")", magic);
        return -EINVAL;
    }

... to the client (via server.c:2624), which I think is the right
thing to do.

> >      }
> >  
> >      trace_nbd_co_receive_request_decode_type(request->handle, request->type,
> > @@ -2507,6 +2582,17 @@ static coroutine_fn void nbd_trip(void *opaque)
> >          return;
> >      }
> >  
> > +    if (client->quiescing) {
> > +        /*
> > +         * We're switching between AIO contexts. Don't attempt to receive a new
> > +         * request and kick the main context which may be waiting for us.
> 
> s/request/request,/

Thanks, will fix this comment.

> > +         */
> > +        nbd_client_put(client);
> > +        client->recv_coroutine = NULL;
> > +        aio_wait_kick();
> > +        return;
> > +    }
> > +
> >      req = nbd_request_get(client);
> >      ret = nbd_co_receive_request(req, &request, &local_err);
> >      client->recv_coroutine = NULL;
> > @@ -2519,6 +2605,11 @@ static coroutine_fn void nbd_trip(void *opaque)
> >          goto done;
> >      }
> >  
> > +    if (ret == -EAGAIN) {
> > +        assert(client->quiescing);
> > +        goto done;
> > +    }
> > +
> >      nbd_client_receive_next_request(client);
> >      if (ret == -EIO) {
> >          goto disconnect;
> > @@ -2565,7 +2656,8 @@ disconnect:
> >  
> >  static void nbd_client_receive_next_request(NBDClient *client)
> >  {
> > -    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
> > +    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
> > +        !client->quiescing) {
> >          nbd_client_get(client);
> >          client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
> >          aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);
> > 
> 
> Overall looks okay to me,
> Reviewed-by: Eric Blake <eblake@redhat.com>
> 
> -- 
> Eric Blake, Principal Software Engineer
> Red Hat, Inc.           +1-919-301-3226
> Virtualization:  qemu.org | libvirt.org
>
diff mbox series

Patch

diff --git a/nbd/server.c b/nbd/server.c
index 613ed2634a..7229f487d2 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -132,6 +132,9 @@  struct NBDClient {
     CoMutex send_lock;
     Coroutine *send_coroutine;
 
+    bool read_yielding;
+    bool quiescing;
+
     QTAILQ_ENTRY(NBDClient) next;
     int nb_requests;
     bool closing;
@@ -1352,14 +1355,60 @@  static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
     return 0;
 }
 
-static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
+/* nbd_read_eof
+ * Tries to read @size bytes from @ioc. This is a local implementation of
+ * qio_channel_readv_all_eof. We have it here because we need it to be
+ * interruptible and to know when the coroutine is yielding.
+ * Returns 1 on success
+ *         0 on eof, when no data was read (errp is not set)
+ *         negative errno on failure (errp is set)
+ */
+static inline int coroutine_fn
+nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
+{
+    bool partial = false;
+
+    assert(size);
+    while (size > 0) {
+        struct iovec iov = { .iov_base = buffer, .iov_len = size };
+        ssize_t len;
+
+        len = qio_channel_readv(client->ioc, &iov, 1, errp);
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            client->read_yielding = true;
+            qio_channel_yield(client->ioc, G_IO_IN);
+            client->read_yielding = false;
+            if (client->quiescing) {
+                return -EAGAIN;
+            }
+            continue;
+        } else if (len < 0) {
+            return -EIO;
+        } else if (len == 0) {
+            if (partial) {
+                error_setg(errp,
+                           "Unexpected end-of-file before all bytes were read");
+                return -EIO;
+            } else {
+                return 0;
+            }
+        }
+
+        partial = true;
+        size -= len;
+        buffer = (uint8_t *) buffer + len;
+    }
+    return 1;
+}
+
+static int nbd_receive_request(NBDClient *client, NBDRequest *request,
                                Error **errp)
 {
     uint8_t buf[NBD_REQUEST_SIZE];
     uint32_t magic;
     int ret;
 
-    ret = nbd_read(ioc, buf, sizeof(buf), "request", errp);
+    ret = nbd_read_eof(client, buf, sizeof(buf), errp);
     if (ret < 0) {
         return ret;
     }
@@ -1480,11 +1529,37 @@  static void blk_aio_attached(AioContext *ctx, void *opaque)
 
     QTAILQ_FOREACH(client, &exp->clients, next) {
         qio_channel_attach_aio_context(client->ioc, ctx);
+
+        assert(client->recv_coroutine == NULL);
+        assert(client->send_coroutine == NULL);
+
+        if (client->quiescing) {
+            client->quiescing = false;
+            nbd_client_receive_next_request(client);
+        }
+    }
+}
+
+static void nbd_aio_detach_bh(void *opaque)
+{
+    NBDExport *exp = opaque;
+    NBDClient *client;
+
+    QTAILQ_FOREACH(client, &exp->clients, next) {
+        qio_channel_detach_aio_context(client->ioc);
+        client->quiescing = true;
+
         if (client->recv_coroutine) {
-            aio_co_schedule(ctx, client->recv_coroutine);
+            if (client->read_yielding) {
+                qemu_aio_coroutine_enter(exp->common.ctx,
+                                         client->recv_coroutine);
+            } else {
+                AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine != NULL);
+            }
         }
+
         if (client->send_coroutine) {
-            aio_co_schedule(ctx, client->send_coroutine);
+            AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL);
         }
     }
 }
@@ -1492,13 +1567,10 @@  static void blk_aio_attached(AioContext *ctx, void *opaque)
 static void blk_aio_detach(void *opaque)
 {
     NBDExport *exp = opaque;
-    NBDClient *client;
 
     trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
 
-    QTAILQ_FOREACH(client, &exp->clients, next) {
-        qio_channel_detach_aio_context(client->ioc);
-    }
+    aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp);
 
     exp->common.ctx = NULL;
 }
@@ -2151,20 +2223,23 @@  static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
 
 /* nbd_co_receive_request
  * Collect a client request. Return 0 if request looks valid, -EIO to drop
- * connection right away, and any other negative value to report an error to
- * the client (although the caller may still need to disconnect after reporting
- * the error).
+ * connection right away, -EAGAIN to indicate we were interrupted and the
+ * channel should be quiesced, and any other negative value to report an error
+ * to the client (although the caller may still need to disconnect after
+ * reporting the error).
  */
 static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
                                   Error **errp)
 {
     NBDClient *client = req->client;
     int valid_flags;
+    int ret;
 
     g_assert(qemu_in_coroutine());
     assert(client->recv_coroutine == qemu_coroutine_self());
-    if (nbd_receive_request(client->ioc, request, errp) < 0) {
-        return -EIO;
+    ret = nbd_receive_request(client, request, errp);
+    if (ret < 0) {
+        return  ret;
     }
 
     trace_nbd_co_receive_request_decode_type(request->handle, request->type,
@@ -2507,6 +2582,17 @@  static coroutine_fn void nbd_trip(void *opaque)
         return;
     }
 
+    if (client->quiescing) {
+        /*
+         * We're switching between AIO contexts. Don't attempt to receive a new
+         * request and kick the main context which may be waiting for us.
+         */
+        nbd_client_put(client);
+        client->recv_coroutine = NULL;
+        aio_wait_kick();
+        return;
+    }
+
     req = nbd_request_get(client);
     ret = nbd_co_receive_request(req, &request, &local_err);
     client->recv_coroutine = NULL;
@@ -2519,6 +2605,11 @@  static coroutine_fn void nbd_trip(void *opaque)
         goto done;
     }
 
+    if (ret == -EAGAIN) {
+        assert(client->quiescing);
+        goto done;
+    }
+
     nbd_client_receive_next_request(client);
     if (ret == -EIO) {
         goto disconnect;
@@ -2565,7 +2656,8 @@  disconnect:
 
 static void nbd_client_receive_next_request(NBDClient *client)
 {
-    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
+    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
+        !client->quiescing) {
         nbd_client_get(client);
         client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
         aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);