diff mbox

[RFC,v5,05/14] net: port socket to GSource

Message ID 1366944455-14239-6-git-send-email-qemulist@gmail.com
State New
Headers show

Commit Message

pingfan liu April 26, 2013, 2:47 a.m. UTC
From: Liu Ping Fan <pingfank@linux.vnet.ibm.com>

Port NetSocketState onto NetClientSource. The only thing specail is that
owning to the socket's state machine changes, we need to change the handler.
We implement that by destroy the old NetClientSource and attach a new one
with NetSocketState.

Signed-off-by: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
---
 net/socket.c |  196 +++++++++++++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 159 insertions(+), 37 deletions(-)

Comments

Stefan Hajnoczi April 26, 2013, 9:48 a.m. UTC | #1
On Fri, Apr 26, 2013 at 10:47:26AM +0800, Liu Ping Fan wrote:
> @@ -141,6 +134,59 @@ static ssize_t net_socket_receive_dgram(NetClientState *nc, const uint8_t *buf,
>      return ret;
>  }
>  
> +static gushort socket_connecting_readable(void *opaque)
> +{
> +    return G_IO_IN;
> +}
> +
> +static gushort socket_listen_readable(void *opaque)
> +{
> +    /* listen only handle in-req, no err */
> +    return G_IO_IN;

From the accept(2) man page:

"Linux accept() (and accept4()) passes already-pending network errors on
the new socket as an error code from accept()."

So we must handle errors from accept(2), please use G_IO_IN | G_IO_HUP |
G_IO_ERR.

> +static gushort socket_establish_readable(void *opaque)
> +{
> +    NetSocketState *s = opaque;
> +
> +    /* rely on net_socket_send to handle err */
> +    if (s->read_poll && net_socket_can_send(s)) {
> +        return G_IO_IN|G_IO_HUP|G_IO_ERR;
> +    }
> +    return G_IO_HUP|G_IO_ERR;
> +}

This new function always monitors G_IO_HUP | G_IO_ERR.  The old code
only monitored it when read_poll == true and net_socket_can_send() ==
true.

Please preserve semantics.

> +static gushort socket_establish_writable(void *opaque)
> +{
> +    NetSocketState *s = opaque;
> +
> +    if (s->write_poll) {
> +        return G_IO_OUT;

Errors/hang up?

> @@ -440,9 +529,20 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
>      s->listen_fd = -1;
>  
>      if (is_connected) {
> -        net_socket_connect(s);
> +        assert(!s->nsrc);
> +        s->nsrc = event_source_new(fd, net_socket_establish_handler, s);
> +        s->nsrc->readable = socket_establish_readable;
> +        s->nsrc->writable = socket_establish_writable;
> +        nc->info->bind_ctx(nc, NULL);
> +        net_socket_read_poll(s, true);
> +        net_socket_write_poll(s, true);
>      } else {
> -        qemu_set_fd_handler(s->fd, NULL, net_socket_connect, s);
> +        assert(!s->nsrc);
> +        s->nsrc = event_source_new(fd, net_socket_connect_handler, s);
> +        s->nsrc->readable = socket_connecting_readable;

The original code wants writeable, not readable.

> +static gboolean net_socket_listen_handler(gpointer data)
> +{
> +    EventGSource *nsrc = data;
> +    NetSocketState *s = nsrc->opaque;
>      struct sockaddr_in saddr;
>      socklen_t len;
>      int fd;
>  
> -    for(;;) {
> -        len = sizeof(saddr);
> -        fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
> -        if (fd < 0 && errno != EINTR) {
> -            return;
> -        } else if (fd >= 0) {
> -            qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
> -            break;
> -        }
> +    len = sizeof(saddr);
> +    fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
> +    if (fd < 0 && errno != EINTR) {
> +        return false;
>      }

This breaks the code when accept(2) is interrupted by a signal and we
get -1, errno == EINTR.  Why did you remove the loop?
pingfan liu April 27, 2013, 7:09 a.m. UTC | #2
On Fri, Apr 26, 2013 at 5:48 PM, Stefan Hajnoczi <stefanha@redhat.com> wrote:
> On Fri, Apr 26, 2013 at 10:47:26AM +0800, Liu Ping Fan wrote:
>> @@ -141,6 +134,59 @@ static ssize_t net_socket_receive_dgram(NetClientState *nc, const uint8_t *buf,
>>      return ret;
>>  }
>>
>> +static gushort socket_connecting_readable(void *opaque)
>> +{
>> +    return G_IO_IN;
>> +}
>> +
>> +static gushort socket_listen_readable(void *opaque)
>> +{
>> +    /* listen only handle in-req, no err */
>> +    return G_IO_IN;
>
> From the accept(2) man page:
>
> "Linux accept() (and accept4()) passes already-pending network errors on
> the new socket as an error code from accept()."
>
> So we must handle errors from accept(2), please use G_IO_IN | G_IO_HUP |
> G_IO_ERR.
>
Here, we handle listen(2), not accept(2)
>> +static gushort socket_establish_readable(void *opaque)
>> +{
>> +    NetSocketState *s = opaque;
>> +
>> +    /* rely on net_socket_send to handle err */
>> +    if (s->read_poll && net_socket_can_send(s)) {
>> +        return G_IO_IN|G_IO_HUP|G_IO_ERR;
>> +    }
>> +    return G_IO_HUP|G_IO_ERR;
>> +}
>
> This new function always monitors G_IO_HUP | G_IO_ERR.  The old code
> only monitored it when read_poll == true and net_socket_can_send() ==
> true.
>
> Please preserve semantics.
>
But the only the code in net_socket_send() will handle the err
condition. See the code behind "/* end of connection */".  And I think
it is safely to handle err, even when the peer is not ready to
receive.

>> +static gushort socket_establish_writable(void *opaque)
>> +{
>> +    NetSocketState *s = opaque;
>> +
>> +    if (s->write_poll) {
>> +        return G_IO_OUT;
>
> Errors/hang up?
>
As explained above, net_socket_writable() does not handle the err
condition. But maybe we need the qemu_flush_queued_packets() in it?

>> @@ -440,9 +529,20 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
>>      s->listen_fd = -1;
>>
>>      if (is_connected) {
>> -        net_socket_connect(s);
>> +        assert(!s->nsrc);
>> +        s->nsrc = event_source_new(fd, net_socket_establish_handler, s);
>> +        s->nsrc->readable = socket_establish_readable;
>> +        s->nsrc->writable = socket_establish_writable;
>> +        nc->info->bind_ctx(nc, NULL);
>> +        net_socket_read_poll(s, true);
>> +        net_socket_write_poll(s, true);
>>      } else {
>> -        qemu_set_fd_handler(s->fd, NULL, net_socket_connect, s);
>> +        assert(!s->nsrc);
>> +        s->nsrc = event_source_new(fd, net_socket_connect_handler, s);
>> +        s->nsrc->readable = socket_connecting_readable;
>
> The original code wants writeable, not readable.
>
Will fix it.
>> +static gboolean net_socket_listen_handler(gpointer data)
>> +{
>> +    EventGSource *nsrc = data;
>> +    NetSocketState *s = nsrc->opaque;
>>      struct sockaddr_in saddr;
>>      socklen_t len;
>>      int fd;
>>
>> -    for(;;) {
>> -        len = sizeof(saddr);
>> -        fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
>> -        if (fd < 0 && errno != EINTR) {
>> -            return;
>> -        } else if (fd >= 0) {
>> -            qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
>> -            break;
>> -        }
>> +    len = sizeof(saddr);
>> +    fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
>> +    if (fd < 0 && errno != EINTR) {
>> +        return false;
>>      }
>
> This breaks the code when accept(2) is interrupted by a signal and we
> get -1, errno == EINTR.  Why did you remove the loop?
Oh, will fix it.

Thanks,
Pingfan
Stefan Hajnoczi April 29, 2013, 8:21 a.m. UTC | #3
On Sat, Apr 27, 2013 at 03:09:10PM +0800, liu ping fan wrote:
> On Fri, Apr 26, 2013 at 5:48 PM, Stefan Hajnoczi <stefanha@redhat.com> wrote:
> > On Fri, Apr 26, 2013 at 10:47:26AM +0800, Liu Ping Fan wrote:
> >> @@ -141,6 +134,59 @@ static ssize_t net_socket_receive_dgram(NetClientState *nc, const uint8_t *buf,
> >>      return ret;
> >>  }
> >>
> >> +static gushort socket_connecting_readable(void *opaque)
> >> +{
> >> +    return G_IO_IN;
> >> +}
> >> +
> >> +static gushort socket_listen_readable(void *opaque)
> >> +{
> >> +    /* listen only handle in-req, no err */
> >> +    return G_IO_IN;
> >
> > From the accept(2) man page:
> >
> > "Linux accept() (and accept4()) passes already-pending network errors on
> > the new socket as an error code from accept()."
> >
> > So we must handle errors from accept(2), please use G_IO_IN | G_IO_HUP |
> > G_IO_ERR.
> >
> Here, we handle listen(2), not accept(2)

Look again, the handler invokes accept(2).  listen(2) was called to put
the socket into the listening state but now we are monitoring for
accept.

> >> +static gushort socket_establish_readable(void *opaque)
> >> +{
> >> +    NetSocketState *s = opaque;
> >> +
> >> +    /* rely on net_socket_send to handle err */
> >> +    if (s->read_poll && net_socket_can_send(s)) {
> >> +        return G_IO_IN|G_IO_HUP|G_IO_ERR;
> >> +    }
> >> +    return G_IO_HUP|G_IO_ERR;
> >> +}
> >
> > This new function always monitors G_IO_HUP | G_IO_ERR.  The old code
> > only monitored it when read_poll == true and net_socket_can_send() ==
> > true.
> >
> > Please preserve semantics.
> >
> But the only the code in net_socket_send() will handle the err
> condition. See the code behind "/* end of connection */".  And I think
> it is safely to handle err, even when the peer is not ready to
> receive.
> 
> >> +static gushort socket_establish_writable(void *opaque)
> >> +{
> >> +    NetSocketState *s = opaque;
> >> +
> >> +    if (s->write_poll) {
> >> +        return G_IO_OUT;
> >
> > Errors/hang up?
> >
> As explained above, net_socket_writable() does not handle the err
> condition. But maybe we need the qemu_flush_queued_packets() in it?

net_socket_receive() does handle send(2) errors but it does so
differently from net_socket_send().  It fails the packet and resets
->send_index.

The change you made doesn't really solve this because an error could
still happen right when QEMU calls send(2) and therefore not be
processed by net_socket_send().

Changing the send/receive error handling is something that could be done
carefully in a separate patch.  But please preserve semantics in
conversion patches - it makes them easy to review and merge.

As explained above, I'm not convinced that the change you made is
useful.

Stefan
diff mbox

Patch

diff --git a/net/socket.c b/net/socket.c
index 396dc8c..abdb809 100644
--- a/net/socket.c
+++ b/net/socket.c
@@ -31,6 +31,8 @@ 
 #include "qemu/option.h"
 #include "qemu/sockets.h"
 #include "qemu/iov.h"
+#include "util/event_gsource.h"
+
 
 typedef struct NetSocketState {
     NetClientState nc;
@@ -42,13 +44,15 @@  typedef struct NetSocketState {
     unsigned int send_index;      /* number of bytes sent (only SOCK_STREAM) */
     uint8_t buf[4096];
     struct sockaddr_in dgram_dst; /* contains inet host and port destination iff connectionless (SOCK_DGRAM) */
-    IOHandler *send_fn;           /* differs between SOCK_STREAM/SOCK_DGRAM */
     bool read_poll;               /* waiting to receive data? */
     bool write_poll;              /* waiting to transmit data? */
+    EventGSource *nsrc;
 } NetSocketState;
 
-static void net_socket_accept(void *opaque);
 static void net_socket_writable(void *opaque);
+static gboolean net_socket_listen_handler(gpointer data);
+static gboolean net_socket_establish_handler(gpointer data);
+
 
 /* Only read packets from socket when peer can receive them */
 static int net_socket_can_send(void *opaque)
@@ -58,25 +62,14 @@  static int net_socket_can_send(void *opaque)
     return qemu_can_send_packet(&s->nc);
 }
 
-static void net_socket_update_fd_handler(NetSocketState *s)
-{
-    qemu_set_fd_handler2(s->fd,
-                         s->read_poll  ? net_socket_can_send : NULL,
-                         s->read_poll  ? s->send_fn : NULL,
-                         s->write_poll ? net_socket_writable : NULL,
-                         s);
-}
-
 static void net_socket_read_poll(NetSocketState *s, bool enable)
 {
     s->read_poll = enable;
-    net_socket_update_fd_handler(s);
 }
 
 static void net_socket_write_poll(NetSocketState *s, bool enable)
 {
     s->write_poll = enable;
-    net_socket_update_fd_handler(s);
 }
 
 static void net_socket_writable(void *opaque)
@@ -141,6 +134,59 @@  static ssize_t net_socket_receive_dgram(NetClientState *nc, const uint8_t *buf,
     return ret;
 }
 
+static gushort socket_connecting_readable(void *opaque)
+{
+    return G_IO_IN;
+}
+
+static gushort socket_listen_readable(void *opaque)
+{
+    /* listen only handle in-req, no err */
+    return G_IO_IN;
+}
+
+static gushort socket_establish_readable(void *opaque)
+{
+    NetSocketState *s = opaque;
+
+    /* rely on net_socket_send to handle err */
+    if (s->read_poll && net_socket_can_send(s)) {
+        return G_IO_IN|G_IO_HUP|G_IO_ERR;
+    }
+    return G_IO_HUP|G_IO_ERR;
+}
+
+static gushort socket_establish_writable(void *opaque)
+{
+    NetSocketState *s = opaque;
+
+    if (s->write_poll) {
+        return G_IO_OUT;
+    }
+    return 0;
+}
+
+static gushort socket_dgram_readable(void *opaque)
+{
+    NetSocketState *s = opaque;
+
+    /* rely on net_socket_send_dgram to handle err */
+    if (s->read_poll && net_socket_can_send(s)) {
+        return G_IO_IN|G_IO_ERR;
+    }
+    return G_IO_ERR;
+}
+
+static gushort socket_dgram_writable(void *opaque)
+{
+    NetSocketState *s = opaque;
+
+    if (s->write_poll) {
+        return G_IO_OUT;
+    }
+    return 0;
+}
+
 static void net_socket_send(void *opaque)
 {
     NetSocketState *s = opaque;
@@ -160,7 +206,11 @@  static void net_socket_send(void *opaque)
         net_socket_read_poll(s, false);
         net_socket_write_poll(s, false);
         if (s->listen_fd != -1) {
-            qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s);
+            event_source_release(s->nsrc);
+            s->nsrc = event_source_new(s->listen_fd, net_socket_listen_handler,
+                                s);
+            s->nsrc->readable = socket_listen_readable;
+            s->nc.info->bind_ctx(&s->nc, NULL);
         }
         closesocket(s->fd);
 
@@ -231,6 +281,8 @@  static void net_socket_send_dgram(void *opaque)
         /* end of connection */
         net_socket_read_poll(s, false);
         net_socket_write_poll(s, false);
+        /* for dgram err, removing it */
+        g_source_remove_poll(&s->nsrc->source, &s->nsrc->gfd);
         return;
     }
     qemu_send_packet(&s->nc, s->buf, size);
@@ -331,6 +383,14 @@  static void net_socket_cleanup(NetClientState *nc)
         closesocket(s->listen_fd);
         s->listen_fd = -1;
     }
+    event_source_release(s->nsrc);
+}
+
+static void net_socket_bind_ctx(NetClientState *nc, GMainContext *ctx)
+{
+    NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc);
+
+    g_source_attach(&s->nsrc->source, ctx);
 }
 
 static NetClientInfo net_dgram_socket_info = {
@@ -338,8 +398,23 @@  static NetClientInfo net_dgram_socket_info = {
     .size = sizeof(NetSocketState),
     .receive = net_socket_receive_dgram,
     .cleanup = net_socket_cleanup,
+    .bind_ctx = net_socket_bind_ctx,
 };
 
+static gboolean net_socket_dgram_handler(gpointer data)
+{
+    EventGSource *nsrc = (EventGSource *)data;
+    NetSocketState *s = nsrc->opaque;
+
+    /* for err, unregister the handler */
+    if (nsrc->gfd.revents & (G_IO_IN|G_IO_ERR)) {
+        net_socket_send_dgram(s);
+    } else {
+        net_socket_writable(s);
+    }
+    return true;
+}
+
 static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
                                                 const char *model,
                                                 const char *name,
@@ -393,8 +468,12 @@  static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
 
     s->fd = fd;
     s->listen_fd = -1;
-    s->send_fn = net_socket_send_dgram;
+    s->nsrc = event_source_new(fd, net_socket_dgram_handler, s);
+    s->nsrc->readable = socket_dgram_readable;
+    s->nsrc->writable = socket_dgram_writable;
+    nc->info->bind_ctx(nc, NULL);
     net_socket_read_poll(s, true);
+    net_socket_write_poll(s, true);
 
     /* mcast: save bound address as dst */
     if (is_connected) {
@@ -408,20 +487,30 @@  err:
     return NULL;
 }
 
-static void net_socket_connect(void *opaque)
-{
-    NetSocketState *s = opaque;
-    s->send_fn = net_socket_send;
-    net_socket_read_poll(s, true);
-}
-
 static NetClientInfo net_socket_info = {
     .type = NET_CLIENT_OPTIONS_KIND_SOCKET,
     .size = sizeof(NetSocketState),
     .receive = net_socket_receive,
     .cleanup = net_socket_cleanup,
+    .bind_ctx = net_socket_bind_ctx,
 };
 
+static gboolean net_socket_connect_handler(gpointer data)
+{
+    EventGSource *nsrc = data;
+    NetSocketState *s = nsrc->opaque;
+
+    event_source_release(s->nsrc);
+    s->nsrc = event_source_new(s->fd, net_socket_establish_handler, s);
+    s->nsrc->readable = socket_establish_readable;
+    s->nsrc->writable = socket_establish_writable;
+    s->nc.info->bind_ctx(&s->nc, NULL);
+    net_socket_read_poll(s, true);
+    net_socket_write_poll(s, true);
+
+    return true;
+}
+
 static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
                                                  const char *model,
                                                  const char *name,
@@ -440,9 +529,20 @@  static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
     s->listen_fd = -1;
 
     if (is_connected) {
-        net_socket_connect(s);
+        assert(!s->nsrc);
+        s->nsrc = event_source_new(fd, net_socket_establish_handler, s);
+        s->nsrc->readable = socket_establish_readable;
+        s->nsrc->writable = socket_establish_writable;
+        nc->info->bind_ctx(nc, NULL);
+        net_socket_read_poll(s, true);
+        net_socket_write_poll(s, true);
     } else {
-        qemu_set_fd_handler(s->fd, NULL, net_socket_connect, s);
+        assert(!s->nsrc);
+        s->nsrc = event_source_new(fd, net_socket_connect_handler, s);
+        s->nsrc->readable = socket_connecting_readable;
+        nc->info->bind_ctx(nc, NULL);
+        net_socket_read_poll(s, true);
+        net_socket_write_poll(s, false);
     }
     return s;
 }
@@ -473,30 +573,49 @@  static NetSocketState *net_socket_fd_init(NetClientState *peer,
     return NULL;
 }
 
-static void net_socket_accept(void *opaque)
+static gboolean net_socket_establish_handler(gpointer data)
 {
-    NetSocketState *s = opaque;
+    EventGSource *nsrc = (EventGSource *)data;
+    NetSocketState *s = nsrc->opaque;
+
+    /* for err case, resort to the logic in net_socket_send to recover */
+    if (nsrc->gfd.revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) {
+        net_socket_send(s);
+    }
+    if ((nsrc->gfd.revents & G_IO_OUT)) {
+        net_socket_writable(s);
+    }
+    return true;
+}
+
+static gboolean net_socket_listen_handler(gpointer data)
+{
+    EventGSource *nsrc = data;
+    NetSocketState *s = nsrc->opaque;
     struct sockaddr_in saddr;
     socklen_t len;
     int fd;
 
-    for(;;) {
-        len = sizeof(saddr);
-        fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
-        if (fd < 0 && errno != EINTR) {
-            return;
-        } else if (fd >= 0) {
-            qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
-            break;
-        }
+    len = sizeof(saddr);
+    fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
+    if (fd < 0 && errno != EINTR) {
+        return false;
     }
 
     s->fd = fd;
     s->nc.link_down = false;
-    net_socket_connect(s);
+    /* prevent more than one connect req */
+    event_source_release(s->nsrc);
+    s->nsrc = event_source_new(fd, net_socket_establish_handler, s);
+    s->nsrc->readable = socket_establish_readable;
+    s->nsrc->writable = socket_establish_writable;
+    s->nc.info->bind_ctx(&s->nc, NULL);
+    net_socket_read_poll(s, true);
     snprintf(s->nc.info_str, sizeof(s->nc.info_str),
              "socket: connection from %s:%d",
              inet_ntoa(saddr.sin_addr), ntohs(saddr.sin_port));
+
+    return true;
 }
 
 static int net_socket_listen_init(NetClientState *peer,
@@ -542,7 +661,10 @@  static int net_socket_listen_init(NetClientState *peer,
     s->listen_fd = fd;
     s->nc.link_down = true;
 
-    qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s);
+    s->nsrc = event_source_new(fd, net_socket_listen_handler, s);
+    s->nsrc->readable = socket_listen_readable;
+    nc->info->bind_ctx(nc, NULL);
+
     return 0;
 }