Message ID | 20201204165347.73542-3-slp@redhat.com |
---|---|
State | New |
Headers | show |
Series | [1/2] virtio-blk: Acquire context while switching them on dataplane start | expand |
On 12/4/20 10:53 AM, Sergio Lopez wrote: > When switching between AIO contexts we need to me make sure that both > recv_coroutine and send_coroutine are not scheduled to run. Otherwise, > QEMU may crash while attaching the new context with an error like > this one: > > aio_co_schedule: Co-routine was already scheduled in 'aio_co_schedule' > > To achieve this we need a local implementation of > 'qio_channel_readv_all_eof' named 'nbd_read_eof' (a trick already done > by 'nbd/client.c') that allows us to interrupt the operation and to > know when recv_coroutine is yielding. > > With this in place, we delegate detaching the AIO context to the > owning context with a BH ('nbd_aio_detach_bh') scheduled using > 'aio_wait_bh_oneshot'. This BH signals that we need to quiesce the > channel by setting 'client->quiescing' to 'true', and either waits for > the coroutine to finish using AIO_WAIT_WHILE or, if it's yielding in > 'nbd_read_eof', actively enters the coroutine to interrupt it. > > RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1900326 > Signed-off-by: Sergio Lopez <slp@redhat.com> > --- > nbd/server.c | 120 +++++++++++++++++++++++++++++++++++++++++++++------ > 1 file changed, 106 insertions(+), 14 deletions(-) A complex patch, so I'd appreciate a second set of eyes. > > diff --git a/nbd/server.c b/nbd/server.c > index 613ed2634a..7229f487d2 100644 > --- a/nbd/server.c > +++ b/nbd/server.c > @@ -132,6 +132,9 @@ struct NBDClient { > CoMutex send_lock; > Coroutine *send_coroutine; > > + bool read_yielding; > + bool quiescing; Will either of these fields need to be accessed atomically once the 'yank' code is added, or are we still safe with direct access because coroutines are not multithreaded? > + > QTAILQ_ENTRY(NBDClient) next; > int nb_requests; > bool closing; > @@ -1352,14 +1355,60 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp) > return 0; > } > > -static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request, > +/* nbd_read_eof > + * Tries to read @size bytes from @ioc. This is a local implementation of > + * qio_channel_readv_all_eof. We have it here because we need it to be > + * interruptible and to know when the coroutine is yielding. > + * Returns 1 on success > + * 0 on eof, when no data was read (errp is not set) > + * negative errno on failure (errp is set) > + */ > +static inline int coroutine_fn > +nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp) > +{ > + bool partial = false; > + > + assert(size); > + while (size > 0) { > + struct iovec iov = { .iov_base = buffer, .iov_len = size }; > + ssize_t len; > + > + len = qio_channel_readv(client->ioc, &iov, 1, errp); > + if (len == QIO_CHANNEL_ERR_BLOCK) { > + client->read_yielding = true; > + qio_channel_yield(client->ioc, G_IO_IN); > + client->read_yielding = false; nbd/client.c:nbd_read_eof() uses bdrv_dec/inc_in_flight instead of read_yielding... > + if (client->quiescing) { > + return -EAGAIN; > + } and the quiescing check is new; otherwise, these two functions look identical. Having two static functions with the same name makes gdb a bit more annoying (which one of the two did you want your breakpoint on?). Is there any way we could write this code only once in nbd/common.c for reuse by both client and server? But I can live with it as written. > @@ -2151,20 +2223,23 @@ static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle, > > /* nbd_co_receive_request > * Collect a client request. Return 0 if request looks valid, -EIO to drop > - * connection right away, and any other negative value to report an error to > - * the client (although the caller may still need to disconnect after reporting > - * the error). > + * connection right away, -EAGAIN to indicate we were interrupted and the > + * channel should be quiesced, and any other negative value to report an error > + * to the client (although the caller may still need to disconnect after > + * reporting the error). > */ > static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request, > Error **errp) > { > NBDClient *client = req->client; > int valid_flags; > + int ret; > > g_assert(qemu_in_coroutine()); > assert(client->recv_coroutine == qemu_coroutine_self()); > - if (nbd_receive_request(client->ioc, request, errp) < 0) { > - return -EIO; > + ret = nbd_receive_request(client, request, errp); > + if (ret < 0) { > + return ret; Why the double space? The old code slams to EIO, you preserve errors. Is that going to bite us by causing us to see a different errno leaked through? > } > > trace_nbd_co_receive_request_decode_type(request->handle, request->type, > @@ -2507,6 +2582,17 @@ static coroutine_fn void nbd_trip(void *opaque) > return; > } > > + if (client->quiescing) { > + /* > + * We're switching between AIO contexts. Don't attempt to receive a new > + * request and kick the main context which may be waiting for us. s/request/request,/ > + */ > + nbd_client_put(client); > + client->recv_coroutine = NULL; > + aio_wait_kick(); > + return; > + } > + > req = nbd_request_get(client); > ret = nbd_co_receive_request(req, &request, &local_err); > client->recv_coroutine = NULL; > @@ -2519,6 +2605,11 @@ static coroutine_fn void nbd_trip(void *opaque) > goto done; > } > > + if (ret == -EAGAIN) { > + assert(client->quiescing); > + goto done; > + } > + > nbd_client_receive_next_request(client); > if (ret == -EIO) { > goto disconnect; > @@ -2565,7 +2656,8 @@ disconnect: > > static void nbd_client_receive_next_request(NBDClient *client) > { > - if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) { > + if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS && > + !client->quiescing) { > nbd_client_get(client); > client->recv_coroutine = qemu_coroutine_create(nbd_trip, client); > aio_co_schedule(client->exp->common.ctx, client->recv_coroutine); > Overall looks okay to me, Reviewed-by: Eric Blake <eblake@redhat.com>
On Fri, Dec 04, 2020 at 12:39:07PM -0600, Eric Blake wrote: > On 12/4/20 10:53 AM, Sergio Lopez wrote: > > When switching between AIO contexts we need to me make sure that both > > recv_coroutine and send_coroutine are not scheduled to run. Otherwise, > > QEMU may crash while attaching the new context with an error like > > this one: > > > > aio_co_schedule: Co-routine was already scheduled in 'aio_co_schedule' > > > > To achieve this we need a local implementation of > > 'qio_channel_readv_all_eof' named 'nbd_read_eof' (a trick already done > > by 'nbd/client.c') that allows us to interrupt the operation and to > > know when recv_coroutine is yielding. > > > > With this in place, we delegate detaching the AIO context to the > > owning context with a BH ('nbd_aio_detach_bh') scheduled using > > 'aio_wait_bh_oneshot'. This BH signals that we need to quiesce the > > channel by setting 'client->quiescing' to 'true', and either waits for > > the coroutine to finish using AIO_WAIT_WHILE or, if it's yielding in > > 'nbd_read_eof', actively enters the coroutine to interrupt it. > > > > RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1900326 > > Signed-off-by: Sergio Lopez <slp@redhat.com> > > --- > > nbd/server.c | 120 +++++++++++++++++++++++++++++++++++++++++++++------ > > 1 file changed, 106 insertions(+), 14 deletions(-) > > A complex patch, so I'd appreciate a second set of eyes. > > > > > diff --git a/nbd/server.c b/nbd/server.c > > index 613ed2634a..7229f487d2 100644 > > --- a/nbd/server.c > > +++ b/nbd/server.c > > @@ -132,6 +132,9 @@ struct NBDClient { > > CoMutex send_lock; > > Coroutine *send_coroutine; > > > > + bool read_yielding; > > + bool quiescing; > > Will either of these fields need to be accessed atomically once the > 'yank' code is added, or are we still safe with direct access because > coroutines are not multithreaded? Yes, those are only accessed from coroutines, which will be scheduled on the same thread. > > + > > QTAILQ_ENTRY(NBDClient) next; > > int nb_requests; > > bool closing; > > @@ -1352,14 +1355,60 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp) > > return 0; > > } > > > > -static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request, > > +/* nbd_read_eof > > + * Tries to read @size bytes from @ioc. This is a local implementation of > > + * qio_channel_readv_all_eof. We have it here because we need it to be > > + * interruptible and to know when the coroutine is yielding. > > + * Returns 1 on success > > + * 0 on eof, when no data was read (errp is not set) > > + * negative errno on failure (errp is set) > > + */ > > +static inline int coroutine_fn > > +nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp) > > +{ > > + bool partial = false; > > + > > + assert(size); > > + while (size > 0) { > > + struct iovec iov = { .iov_base = buffer, .iov_len = size }; > > + ssize_t len; > > + > > + len = qio_channel_readv(client->ioc, &iov, 1, errp); > > + if (len == QIO_CHANNEL_ERR_BLOCK) { > > + client->read_yielding = true; > > + qio_channel_yield(client->ioc, G_IO_IN); > > + client->read_yielding = false; > > nbd/client.c:nbd_read_eof() uses bdrv_dec/inc_in_flight instead of > read_yielding... > > > + if (client->quiescing) { > > + return -EAGAIN; > > + } > > and the quiescing check is new; otherwise, these two functions look > identical. Having two static functions with the same name makes gdb a > bit more annoying (which one of the two did you want your breakpoint > on?). Is there any way we could write this code only once in > nbd/common.c for reuse by both client and server? But I can live with > it as written. I'm not happy with this either, but on the first implementation I've tried to come up with a unique function for both use cases, and it looked terrible. We can easily use a different name, though. > > @@ -2151,20 +2223,23 @@ static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle, > > > > /* nbd_co_receive_request > > * Collect a client request. Return 0 if request looks valid, -EIO to drop > > - * connection right away, and any other negative value to report an error to > > - * the client (although the caller may still need to disconnect after reporting > > - * the error). > > + * connection right away, -EAGAIN to indicate we were interrupted and the > > + * channel should be quiesced, and any other negative value to report an error > > + * to the client (although the caller may still need to disconnect after > > + * reporting the error). > > */ > > static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request, > > Error **errp) > > { > > NBDClient *client = req->client; > > int valid_flags; > > + int ret; > > > > g_assert(qemu_in_coroutine()); > > assert(client->recv_coroutine == qemu_coroutine_self()); > > - if (nbd_receive_request(client->ioc, request, errp) < 0) { > > - return -EIO; > > + ret = nbd_receive_request(client, request, errp); > > + if (ret < 0) { > > + return ret; > > Why the double space? Ouch, copy/paste mistake. > The old code slams to EIO, you preserve errors. Is that going to bite > us by causing us to see a different errno leaked through? When reading from the channel, nbd_read_eof hides the actual errno behind EIO, so the only actual difference is that, with this change, nbd_receive_request may send this error... if (magic != NBD_REQUEST_MAGIC) { error_setg(errp, "invalid magic (got 0x%" PRIx32 ")", magic); return -EINVAL; } ... to the client (via server.c:2624), which I think is the right thing to do. > > } > > > > trace_nbd_co_receive_request_decode_type(request->handle, request->type, > > @@ -2507,6 +2582,17 @@ static coroutine_fn void nbd_trip(void *opaque) > > return; > > } > > > > + if (client->quiescing) { > > + /* > > + * We're switching between AIO contexts. Don't attempt to receive a new > > + * request and kick the main context which may be waiting for us. > > s/request/request,/ Thanks, will fix this comment. > > + */ > > + nbd_client_put(client); > > + client->recv_coroutine = NULL; > > + aio_wait_kick(); > > + return; > > + } > > + > > req = nbd_request_get(client); > > ret = nbd_co_receive_request(req, &request, &local_err); > > client->recv_coroutine = NULL; > > @@ -2519,6 +2605,11 @@ static coroutine_fn void nbd_trip(void *opaque) > > goto done; > > } > > > > + if (ret == -EAGAIN) { > > + assert(client->quiescing); > > + goto done; > > + } > > + > > nbd_client_receive_next_request(client); > > if (ret == -EIO) { > > goto disconnect; > > @@ -2565,7 +2656,8 @@ disconnect: > > > > static void nbd_client_receive_next_request(NBDClient *client) > > { > > - if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) { > > + if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS && > > + !client->quiescing) { > > nbd_client_get(client); > > client->recv_coroutine = qemu_coroutine_create(nbd_trip, client); > > aio_co_schedule(client->exp->common.ctx, client->recv_coroutine); > > > > Overall looks okay to me, > Reviewed-by: Eric Blake <eblake@redhat.com> > > -- > Eric Blake, Principal Software Engineer > Red Hat, Inc. +1-919-301-3226 > Virtualization: qemu.org | libvirt.org >
diff --git a/nbd/server.c b/nbd/server.c index 613ed2634a..7229f487d2 100644 --- a/nbd/server.c +++ b/nbd/server.c @@ -132,6 +132,9 @@ struct NBDClient { CoMutex send_lock; Coroutine *send_coroutine; + bool read_yielding; + bool quiescing; + QTAILQ_ENTRY(NBDClient) next; int nb_requests; bool closing; @@ -1352,14 +1355,60 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp) return 0; } -static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request, +/* nbd_read_eof + * Tries to read @size bytes from @ioc. This is a local implementation of + * qio_channel_readv_all_eof. We have it here because we need it to be + * interruptible and to know when the coroutine is yielding. + * Returns 1 on success + * 0 on eof, when no data was read (errp is not set) + * negative errno on failure (errp is set) + */ +static inline int coroutine_fn +nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp) +{ + bool partial = false; + + assert(size); + while (size > 0) { + struct iovec iov = { .iov_base = buffer, .iov_len = size }; + ssize_t len; + + len = qio_channel_readv(client->ioc, &iov, 1, errp); + if (len == QIO_CHANNEL_ERR_BLOCK) { + client->read_yielding = true; + qio_channel_yield(client->ioc, G_IO_IN); + client->read_yielding = false; + if (client->quiescing) { + return -EAGAIN; + } + continue; + } else if (len < 0) { + return -EIO; + } else if (len == 0) { + if (partial) { + error_setg(errp, + "Unexpected end-of-file before all bytes were read"); + return -EIO; + } else { + return 0; + } + } + + partial = true; + size -= len; + buffer = (uint8_t *) buffer + len; + } + return 1; +} + +static int nbd_receive_request(NBDClient *client, NBDRequest *request, Error **errp) { uint8_t buf[NBD_REQUEST_SIZE]; uint32_t magic; int ret; - ret = nbd_read(ioc, buf, sizeof(buf), "request", errp); + ret = nbd_read_eof(client, buf, sizeof(buf), errp); if (ret < 0) { return ret; } @@ -1480,11 +1529,37 @@ static void blk_aio_attached(AioContext *ctx, void *opaque) QTAILQ_FOREACH(client, &exp->clients, next) { qio_channel_attach_aio_context(client->ioc, ctx); + + assert(client->recv_coroutine == NULL); + assert(client->send_coroutine == NULL); + + if (client->quiescing) { + client->quiescing = false; + nbd_client_receive_next_request(client); + } + } +} + +static void nbd_aio_detach_bh(void *opaque) +{ + NBDExport *exp = opaque; + NBDClient *client; + + QTAILQ_FOREACH(client, &exp->clients, next) { + qio_channel_detach_aio_context(client->ioc); + client->quiescing = true; + if (client->recv_coroutine) { - aio_co_schedule(ctx, client->recv_coroutine); + if (client->read_yielding) { + qemu_aio_coroutine_enter(exp->common.ctx, + client->recv_coroutine); + } else { + AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine != NULL); + } } + if (client->send_coroutine) { - aio_co_schedule(ctx, client->send_coroutine); + AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL); } } } @@ -1492,13 +1567,10 @@ static void blk_aio_attached(AioContext *ctx, void *opaque) static void blk_aio_detach(void *opaque) { NBDExport *exp = opaque; - NBDClient *client; trace_nbd_blk_aio_detach(exp->name, exp->common.ctx); - QTAILQ_FOREACH(client, &exp->clients, next) { - qio_channel_detach_aio_context(client->ioc); - } + aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp); exp->common.ctx = NULL; } @@ -2151,20 +2223,23 @@ static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle, /* nbd_co_receive_request * Collect a client request. Return 0 if request looks valid, -EIO to drop - * connection right away, and any other negative value to report an error to - * the client (although the caller may still need to disconnect after reporting - * the error). + * connection right away, -EAGAIN to indicate we were interrupted and the + * channel should be quiesced, and any other negative value to report an error + * to the client (although the caller may still need to disconnect after + * reporting the error). */ static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request, Error **errp) { NBDClient *client = req->client; int valid_flags; + int ret; g_assert(qemu_in_coroutine()); assert(client->recv_coroutine == qemu_coroutine_self()); - if (nbd_receive_request(client->ioc, request, errp) < 0) { - return -EIO; + ret = nbd_receive_request(client, request, errp); + if (ret < 0) { + return ret; } trace_nbd_co_receive_request_decode_type(request->handle, request->type, @@ -2507,6 +2582,17 @@ static coroutine_fn void nbd_trip(void *opaque) return; } + if (client->quiescing) { + /* + * We're switching between AIO contexts. Don't attempt to receive a new + * request and kick the main context which may be waiting for us. + */ + nbd_client_put(client); + client->recv_coroutine = NULL; + aio_wait_kick(); + return; + } + req = nbd_request_get(client); ret = nbd_co_receive_request(req, &request, &local_err); client->recv_coroutine = NULL; @@ -2519,6 +2605,11 @@ static coroutine_fn void nbd_trip(void *opaque) goto done; } + if (ret == -EAGAIN) { + assert(client->quiescing); + goto done; + } + nbd_client_receive_next_request(client); if (ret == -EIO) { goto disconnect; @@ -2565,7 +2656,8 @@ disconnect: static void nbd_client_receive_next_request(NBDClient *client) { - if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) { + if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS && + !client->quiescing) { nbd_client_get(client); client->recv_coroutine = qemu_coroutine_create(nbd_trip, client); aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);
When switching between AIO contexts we need to me make sure that both recv_coroutine and send_coroutine are not scheduled to run. Otherwise, QEMU may crash while attaching the new context with an error like this one: aio_co_schedule: Co-routine was already scheduled in 'aio_co_schedule' To achieve this we need a local implementation of 'qio_channel_readv_all_eof' named 'nbd_read_eof' (a trick already done by 'nbd/client.c') that allows us to interrupt the operation and to know when recv_coroutine is yielding. With this in place, we delegate detaching the AIO context to the owning context with a BH ('nbd_aio_detach_bh') scheduled using 'aio_wait_bh_oneshot'. This BH signals that we need to quiesce the channel by setting 'client->quiescing' to 'true', and either waits for the coroutine to finish using AIO_WAIT_WHILE or, if it's yielding in 'nbd_read_eof', actively enters the coroutine to interrupt it. RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1900326 Signed-off-by: Sergio Lopez <slp@redhat.com> --- nbd/server.c | 120 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 106 insertions(+), 14 deletions(-)