[RFC,3/5] nbd: Use aio_set_fd_handler2()

Message ID 1401561792-13410-4-git-send-email-mreitz@redhat.com
State New

Commit Message

Max Reitz May 31, 2014, 6:43 p.m. UTC
Instead of using the main loop function qemu_set_fd_handler2(), use the
AIO function in the context of the exported BDS.

Signed-off-by: Max Reitz <mreitz@redhat.com>
---
 nbd.c | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

Comments

Stefan Hajnoczi June 4, 2014, 12:37 p.m. UTC | #1
On Sat, May 31, 2014 at 08:43:10PM +0200, Max Reitz wrote:
> Instead of using the main loop function qemu_set_fd_handler2(), use the
> AIO function in the context of the exported BDS.

Managing fd handlers shouldn't be necessary at the NBD code level.  The
current NBD code hasn't been fully converted to coroutines.

This email explains how the NBD code can be fully converted to
coroutines.  It should simplify the code and reduce the chance of bugs.
Whether you want to actually do the conversion is up to you, since it's
somewhat orthogonal to the purpose of this patch series.

The point of coroutines is that blocking operations like send/recv on a
socket should look like regular blocking calls.  Let coroutines handle
the event loop housekeeping (registering fd handlers, callbacks).  Only
use aio explicitly when concurrency is needed.

Here is how I would structure NBD using coroutines:

1 coroutine per connection to receive NBD commands and submit I/O
requests:

def nbd_server_receive_co(conn):
    while True:
        req = nbd_read_req(conn)
        if req is None:
            break
        if req.type == NBD_READ:
            bdrv_aio_readv(bs, ...)
        elif req.type == NBD_WRITE:
            ...

Notice that bdrv_aio_*() is used since we want concurrent I/O requests.

1 coroutine per connection to send NBD replies:

def nbd_server_send_co(conn):
    while True:
        while conn.send_queue:
            resp = conn.send_queue.pop()
            nbd_write_resp(conn, resp)
        qemu_coroutine_yield()

And finally the bdrv_aio_*() callback to put responses on to the send
queue:

def nbd_server_aio_cb(conn):
    resp = NBDResponse(...)
    conn.send_queue.push(resp)
    conn.send_co.enter()

Why is this design cleaner?  Because NBD code doesn't have to worry
about fd handlers.  It uses straightforward coroutine send/recv for
socket I/O inside nbd_read_req() and nbd_write_resp().  It's easy to see
that only one coroutine receives from the socket and that only one
coroutine writes to the socket.

Stefan
Paolo Bonzini June 4, 2014, 6:02 p.m. UTC | #2
On 04/06/2014 14:37, Stefan Hajnoczi wrote:
> Why is this design cleaner?  Because NBD code doesn't have to worry
> about fd handlers.  It uses straightforward coroutine send/recv for
> socket I/O inside nbd_read_req() and nbd_write_resp().  It's easy to see
> that only one coroutine receives from the socket and that only one
> coroutine writes to the socket.

I don't understand how this would work without managing fd handlers.

- If you don't want to receive anything (because you reached the maximum 
queue depth), and the client sends something, the read handler will busy 
wait.  The current code solves it with can_read; you could do it by 
enabling or disabling the read handler as you need, but the idea is the 
same.

- If you're not sending anything, a socket that has a write handler will 
have POLLOUT continuously signaled and again you'd busy wait.  Since 
there is no can_write, nbd.c instead enables/disables the write handler 
around nbd_co_send_reply.
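
(As a self-contained illustration of the POLLOUT point, independent of any
QEMU API: with a write handler permanently registered, a level-triggered
poll() never blocks, even when there is nothing to send.)

    #include <poll.h>
    #include <stdio.h>
    #include <sys/socket.h>

    int main(void)
    {
        int fds[2];

        if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
            return 1;
        }

        struct pollfd pfd = { .fd = fds[0], .events = POLLIN | POLLOUT };
        int wakeups = 0;

        /* Nothing is queued for sending, but POLLOUT is level-triggered:
         * a connected socket with free buffer space is always writable,
         * so poll() returns immediately on every iteration instead of
         * blocking, i.e. the loop busy-waits. */
        while (wakeups < 5) {
            if (poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLOUT)) {
                wakeups++;
            }
        }
        printf("POLLOUT reported %d times with nothing to send\n", wakeups);
        return 0;
    }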

Removing can_read from the NBD server is simpler than Max's patch 2, so 
that's what I suggest.

The NBD server code uses a coroutine per request.  Using one coroutine 
per direction is nice, but doesn't avert the need to manage fd handlers.

Basically, it boils down to select(2) being level-triggered.  If it were 
edge-triggered, of course you could have other bugs but you probably 
could write network code without managing fd handlers.

Paolo
Stefan Hajnoczi June 5, 2014, 8:12 a.m. UTC | #3
On Wed, Jun 04, 2014 at 08:02:06PM +0200, Paolo Bonzini wrote:
> On 04/06/2014 14:37, Stefan Hajnoczi wrote:
> >Why is this design cleaner?  Because NBD code doesn't have to worry
> >about fd handlers.  It uses straightforward coroutine send/recv for
> >socket I/O inside nbd_read_req() and nbd_write_resp().  It's easy to see
> >that only one coroutine receives from the socket and that only one
> >coroutine writes to the socket.
> 
> I don't understand how this would work without managing fd handlers.

fd handlers still need to be managed, but not by NBD code.  They must be
managed by coroutine recv/send utility functions.  In other words, fd
handlers are used locally, not globally.

def co_recv(fd, buf):
    while True:
        nbytes = recv(fd, buf, len(buf))
        if nbytes == -1:
            if errno == EINTR:
                continue
            if errno == EAGAIN or errno == EWOULDBLOCK:
                aio_set_fd_read_handler(fd, co_recv_cb)
                qemu_coroutine_yield()
                aio_set_fd_read_handler(fd, NULL)
                continue
        return nbytes

The send function is similar.

This does require an extension to the AioContext API.  We need to be
able to modify the read/write callback independently without clobbering
the other callback.  This way full duplex I/O is possible.
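
Roughly the kind of prototypes I mean (purely illustrative, nothing like
this exists in the AioContext API yet):

    /* Hypothetical: each call updates only one direction's callback and
     * leaves the other untouched, so a recv coroutine and a send coroutine
     * can manage the same fd independently (full duplex). */
    void aio_set_fd_read_handler(AioContext *ctx, int fd,
                                 IOHandler *io_read, void *opaque);
    void aio_set_fd_write_handler(AioContext *ctx, int fd,
                                  IOHandler *io_write, void *opaque);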

> - If you don't want to receive anything (because you reached the maximum
> queue depth), and the client sends something, the read handler will busy
> wait.  The current code solves it with can_read; you could do it by enabling
> or disabling the read handler as you need, but the idea is the same.
> 
> - If you're not sending anything, a socket that has a write handler will
> have POLLOUT continuously signaled and again you'd busy wait.  Since there
> is no can_write, nbd.c instead enables/disables the write handler around
> nbd_co_send_reply.

You only install an fd handler when you want to read/write.  This does
mean that the request coroutine needs to be woken up by the response
coroutine if we were at maximum queue depth.
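
A rough sketch of that wake-up, modelled on the nb_requests accounting in
nbd_request_put(); the function name and the recv_coroutine field are only
illustrative:

    /* Hypothetical: called from the response path once a reply has been
     * sent.  Dropping back below the queue-depth limit kicks the receive
     * coroutine so it resumes reading requests from the socket. */
    static void nbd_reply_sent(NBDClient *client)
    {
        if (client->nb_requests-- == MAX_NBD_REQUESTS &&
            client->recv_coroutine) {
            qemu_coroutine_enter(client->recv_coroutine, NULL);
        }
    }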

Stefan
Paolo Bonzini June 5, 2014, 9:27 a.m. UTC | #4
On 05/06/2014 10:12, Stefan Hajnoczi wrote:
> On Wed, Jun 04, 2014 at 08:02:06PM +0200, Paolo Bonzini wrote:
>> On 04/06/2014 14:37, Stefan Hajnoczi wrote:
>>> Why is this design cleaner?  Because NBD code doesn't have to worry
>>> about fd handlers.  It uses straightforward coroutine send/recv for
>>> socket I/O inside nbd_read_req() and nbd_write_resp().  It's easy to see
>>> that only one coroutine receives from the socket and that only one
>>> coroutine writes to the socket.
>>
>> I don't understand how this would work without managing fd handlers.
>
> fd handlers still need to be managed, but not by NBD code.  They must be
> managed by coroutine recv/send utility functions.  In other words, fd
> handlers are used locally, not globally.
>
> def co_recv(fd, buf):
>     while True:
>         nbytes = recv(fd, buf, len(buf))
>         if nbytes == -1:
>             if errno == EINTR:
>                 continue
>             if errno == EAGAIN or errno == EWOULDBLOCK:
>                 aio_set_fd_read_handler(fd, co_recv_cb)
>                 qemu_coroutine_yield()
>                 aio_set_fd_read_handler(fd, NULL)
>                 continue
>         return nbytes
>
> The send function is similar.

I see what you mean now---this however is not how qemu_co_recv and 
qemu_co_send work.  NBD uses them, but they require the caller to set up 
the handlers manually.

Paolo
Stefan Hajnoczi June 5, 2014, 1:32 p.m. UTC | #5
On Thu, Jun 05, 2014 at 11:27:49AM +0200, Paolo Bonzini wrote:
> On 05/06/2014 10:12, Stefan Hajnoczi wrote:
> >On Wed, Jun 04, 2014 at 08:02:06PM +0200, Paolo Bonzini wrote:
> >>On 04/06/2014 14:37, Stefan Hajnoczi wrote:
> >>>Why is this design cleaner?  Because NBD code doesn't have to worry
> >>>about fd handlers.  It uses straightforward coroutine send/recv for
> >>>socket I/O inside nbd_read_req() and nbd_write_resp().  It's easy to see
> >>>that only one coroutine receives from the socket and that only one
> >>>coroutine writes to the socket.
> >>
> >>I don't understand how this would work without managing fd handlers.
> >
> >fd handlers still need to be managed, but not by NBD code.  They must be
> >managed by coroutine recv/send utility functions.  In other words, fd
> >handlers are used locally, not globally.
> >
> >def co_recv(fd, buf):
> >    while True:
> >        nbytes = recv(fd, buf, len(buf))
> >        if nbytes == -1:
> >            if errno == EINTR:
> >                continue
> >            if errno == EAGAIN or errno == EWOULDBLOCK:
> >                aio_set_fd_read_handler(fd, co_recv_cb)
> >                qemu_coroutine_yield()
> >                aio_set_fd_read_handler(fd, NULL)
> >                continue
> >        return nbytes
> >
> >The send function is similar.
> 
> I see what you mean now---this however is not how qemu_co_recv and
> qemu_co_send work.  NBD uses them, but they require the caller to set up the
> handlers manually.

Yep, and I think that's not a clean way to program since it forgoes the
advantage of coroutines: functions that look just like their blocking
equivalents.  I think we should hide the fd handler management so
coroutine socket users don't need to worry about it.

Stefan
Max Reitz June 5, 2014, 6:18 p.m. UTC | #6
On 04.06.2014 14:37, Stefan Hajnoczi wrote:
> On Sat, May 31, 2014 at 08:43:10PM +0200, Max Reitz wrote:
>> Instead of using the main loop function qemu_set_fd_handler2(), use the
>> AIO function in the context of the exported BDS.
> Managing fd handlers shouldn't be necessary at the NBD code level.  The
> current NBD code hasn't been fully converted to coroutines.
>
> This email explains how the NBD code can be fully converted to
> coroutines.  It should simplify the code and reduce the chance of bugs.
> Whether you want to actually do the conversion is up to you, since it's
> somewhat orthogonal to the purpose of this patch series.
>
> The point of coroutines is that blocking operations like send/recv on a
> socket should look like regular blocking calls.  Let coroutines handle
> the event loop housekeeping (registering fd handlers, callbacks).  Only
> use aio explicitly when concurrency is needed.
>
> Here is how I would structure NBD using coroutines:
>
> 1 coroutine per connection to receive NBD commands and submit I/O
> requests:
>
> def nbd_server_receive_co(conn):
>      while True:
>          req = nbd_read_req(conn)
>          if req is None:
>              break
>          if req.type == NBD_READ:
>              bdrv_aio_readv(bs, ...)
>          elif req.type == NBD_WRITE:
>              ...
>
> Notice that bdrv_aio_*() is used since we want concurrent I/O requests.
>
> 1 coroutine per connection to send NBD replies:
>
> def nbd_server_send_co(conn):
>      while True:
>          while conn.send_queue:
>              resp = conn.send_queue.pop()
>              nbd_write_resp(conn, resp)
>          qemu_coroutine_yield()
>
> And finally the bdrv_aio_*() callback to put responses on to the send
> queue:
>
> def nbd_server_aio_cb(conn):
>      resp = NBDResponse(...)
>      conn.send_queue.push(resp)
>      conn.send_co.enter()
>
> Why is this design cleaner?  Because NBD code doesn't have to worry
> about fd handlers.  It uses straightforward coroutine send/recv for
> socket I/O inside nbd_read_req() and nbd_write_resp().  It's easy to see
> that only one coroutine receives from the socket and that only one
> coroutine writes to the socket.

Yes, this sounds better. I'll take a look into it and see how far I can get.

Max
Paolo Bonzini June 6, 2014, 7:44 a.m. UTC | #7
On 05/06/2014 20:18, Max Reitz wrote:
>>
>> Why is this design cleaner?  Because NBD code doesn't have to worry
>> about fd handlers.  It uses straightforward coroutine send/recv for
>> socket I/O inside nbd_read_req() and nbd_write_resp().  It's easy to see
>> that only one coroutine receives from the socket and that only one
>> coroutine writes to the socket.
>
> Yes, this sounds better. I'll take a look into it and see how far I can
> get.

But it doesn't solve any problem, and requires rethinking the 
aio_set_fd_handler API.  I suggest you just refactor all calls to 
qemu_set_fd_handler2 into a function like

	qemu_set_fd_handler2(fd, NULL,
			     nbd_can_read(client) ? nbd_read : NULL,
			     client->send_coroutine ?
					nbd_restart_write : NULL,
			     opaque);

and call this function every time the result of nbd_can_read() changes.
Then you can switch to aio_set_fd_handler easily.
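
For concreteness, wrapped up as a helper (the name is just a placeholder):

    /* Sketch: one place that derives both handlers from the client's
     * current state.  Call it wherever nbd_can_read() or send_coroutine
     * may have changed. */
    static void nbd_update_fd_handler(NBDClient *client)
    {
        qemu_set_fd_handler2(client->sock, NULL,
                             nbd_can_read(client) ? nbd_read : NULL,
                             client->send_coroutine ? nbd_restart_write : NULL,
                             client);
    }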

Paolo
Max Reitz June 7, 2014, 7:27 p.m. UTC | #8
On 06.06.2014 09:44, Paolo Bonzini wrote:
> On 05/06/2014 20:18, Max Reitz wrote:
>>>
>>> Why is this design cleaner?  Because NBD code doesn't have to worry
>>> about fd handlers.  It uses straightforward coroutine send/recv for
>>> socket I/O inside nbd_read_req() and nbd_write_resp().  It's easy to 
>>> see
>>> that only one coroutine receives from the socket and that only one
>>> coroutine writes to the socket.
>>
>> Yes, this sounds better. I'll take a look into it and see how far I can
>> get.
>
> But it doesn't solve any problem, and requires rethinking the 
> aio_set_fd_handler API.

It might clean things up. :-)

On the other hand, you are right, it's probably too complicated right now.

> I suggest you just refactor all calls to qemu_set_fd_handler2 into a 
> function like
>
>     qemu_set_fd_handler2(fd, NULL,
>                  nbd_can_read(client) ? nbd_read : NULL,
>                  client->send_coroutine ?
>                     nbd_restart_write : NULL,
>                  opaque);
>
> and call this function every time the result of nbd_can_read() 
> changes.  Then you can switch to aio_set_fd_handler easily.

Seems good, I'll do that.

Max
Stefan Hajnoczi June 9, 2014, 1:35 p.m. UTC | #9
On Sat, Jun 07, 2014 at 09:27:26PM +0200, Max Reitz wrote:
> On 06.06.2014 09:44, Paolo Bonzini wrote:
> >On 05/06/2014 20:18, Max Reitz wrote:
> >>>
> >>>Why is this design cleaner?  Because NBD code doesn't have to worry
> >>>about fd handlers.  It uses straightforward coroutine send/recv for
> >>>socket I/O inside nbd_read_req() and nbd_write_resp().  It's easy to
> >>>see
> >>>that only one coroutine receives from the socket and that only one
> >>>coroutine writes to the socket.
> >>
> >>Yes, this sounds better. I'll take a look into it and see how far I can
> >>get.
> >
> >But it doesn't solve any problem, and requires rethinking the
> >aio_set_fd_handler API.
> 
> It might clean things up. :-)
> 
> On the other hand, you are right, it's probably too complicated right now.
> 
> >I suggest you just refactor all calls to qemu_set_fd_handler2 into a
> >function like
> >
> >    qemu_set_fd_handler2(fd, NULL,
> >                 nbd_can_read(client) ? nbd_read : NULL,
> >                 client->send_coroutine ?
> >                    nbd_restart_write : NULL,
> >                 opaque);
> >
> >and call this function every time the result of nbd_can_read() changes.
> >Then you can switch to aio_set_fd_handler easily.
> 
> Seems good, I'll do that.

Sounds fine to me.  Restructuring NBD can be done later, if desirable.

Stefan

Patch

diff --git a/nbd.c b/nbd.c
index 0787cba..803b3d7 100644
--- a/nbd.c
+++ b/nbd.c
@@ -18,6 +18,7 @@ 
 
 #include "block/nbd.h"
 #include "block/block.h"
+#include "block/block_int.h"
 
 #include "block/coroutine.h"
 
@@ -100,6 +101,8 @@  struct NBDExport {
     uint32_t nbdflags;
     QTAILQ_HEAD(, NBDClient) clients;
     QTAILQ_ENTRY(NBDExport) next;
+
+    AioContext *ctx;
 };
 
 static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
@@ -744,7 +747,10 @@  void nbd_client_put(NBDClient *client)
          */
         assert(client->closing);
 
-        qemu_set_fd_handler2(client->sock, NULL, NULL, NULL, NULL);
+        if (client->exp) {
+            aio_set_fd_handler2(client->exp->ctx, client->sock,
+                                NULL, NULL, NULL, NULL);
+        }
         close(client->sock);
         client->sock = -1;
         if (client->exp) {
@@ -797,7 +803,7 @@  static void nbd_request_put(NBDRequest *req)
     g_slice_free(NBDRequest, req);
 
     if (client->nb_requests-- == MAX_NBD_REQUESTS) {
-        qemu_notify_event();
+        aio_notify(client->exp->ctx);
     }
     nbd_client_put(client);
 }
@@ -814,6 +820,7 @@  NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset,
     exp->nbdflags = nbdflags;
     exp->size = size == -1 ? bdrv_getlength(bs) : size;
     exp->close = close;
+    exp->ctx = bdrv_get_aio_context(bs);
     bdrv_ref(bs);
     return exp;
 }
@@ -917,8 +924,8 @@  static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
     ssize_t rc, ret;
 
     qemu_co_mutex_lock(&client->send_lock);
-    qemu_set_fd_handler2(csock, nbd_can_read, nbd_read,
-                         nbd_restart_write, client);
+    aio_set_fd_handler2(client->exp->ctx, csock,
+                        nbd_can_read, nbd_read, nbd_restart_write, client);
     client->send_coroutine = qemu_coroutine_self();
 
     if (!len) {
@@ -936,7 +943,8 @@  static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
     }
 
     client->send_coroutine = NULL;
-    qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
+    aio_set_fd_handler2(client->exp->ctx, csock,
+                        nbd_can_read, nbd_read, NULL, client);
     qemu_co_mutex_unlock(&client->send_lock);
     return rc;
 }
@@ -1179,7 +1187,8 @@  NBDClient *nbd_client_new(NBDExport *exp, int csock,
     }
     client->close = close;
     qemu_co_mutex_init(&client->send_lock);
-    qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
+    aio_set_fd_handler2(client->exp->ctx, csock,
+                        nbd_can_read, nbd_read, NULL, client);
 
     if (exp) {
         QTAILQ_INSERT_TAIL(&exp->clients, client, next);