diff mbox

[RFC,v4,06/15] net: port socket to GSource

Message ID 1366187964-14265-7-git-send-email-qemulist@gmail.com
State New
Headers show

Commit Message

pingfan liu April 17, 2013, 8:39 a.m. UTC
From: Liu Ping Fan <pingfank@linux.vnet.ibm.com>

Port NetSocketState onto NetClientSource. The only thing specail is that
owning to the socket's state machine changes, we need to change the handler.
We implement that by destroy the old NetClientSource and attach a new one
with NetSocketState.

Signed-off-by: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
---
 net/socket.c |  158 ++++++++++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 122 insertions(+), 36 deletions(-)

Comments

Stefan Hajnoczi April 18, 2013, 2:34 p.m. UTC | #1
On Wed, Apr 17, 2013 at 04:39:15PM +0800, Liu Ping Fan wrote:
> @@ -160,7 +154,13 @@ static void net_socket_send(void *opaque)
>          net_socket_read_poll(s, false);
>          net_socket_write_poll(s, false);
>          if (s->listen_fd != -1) {
> -            qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s);
> +            nsrc = s->nsrc;
> +            new_nsrc = event_source_new(s->listen_fd, net_socket_listen_handler,
> +                                s);
> +            s->nsrc = new_nsrc;
> +            new_nsrc->gfd.events = G_IO_IN;
> +            g_source_destroy(&nsrc->source);
> +            s->nc.info->bind_ctx(&s->nc, NULL);

The following is equivalent:

event_source_release(s->nsrc);
s->nsrc = event_source_new(s->listen_fd, net_socket_listen_handler, s);
s->nc.info->bind_ctx(&s->nc, NULL);

Then new_nsrc/nsrc can be dropped and the nsrc memory leak is avoided.

Note that gfd.events = G_IO_IN does not get used since prepare()
overwrites gfd.events.  Please drop and make sure read_poll == true.

I'm a little worried that we're lacking G_IO_HUP | G_IO_ERR.  Perhaps
disconnect and network errors will be ignored.

>          }
>          closesocket(s->fd);
>  
> @@ -331,6 +331,14 @@ static void net_socket_cleanup(NetClientState *nc)
>          closesocket(s->listen_fd);
>          s->listen_fd = -1;
>      }
> +    event_source_release(s->nsrc);
> +}
> +
> +static void net_socket_bind_ctx(NetClientState *nc, GMainContext *ctx)
> +{
> +    NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc);
> +
> +    g_source_attach(&s->nsrc->source, ctx);
>  }
>  
>  static NetClientInfo net_dgram_socket_info = {
> @@ -338,8 +346,22 @@ static NetClientInfo net_dgram_socket_info = {
>      .size = sizeof(NetSocketState),
>      .receive = net_socket_receive_dgram,
>      .cleanup = net_socket_cleanup,
> +    .bind_ctx = net_socket_bind_ctx,
>  };
>  
> +static gboolean net_socket_dgram_handler(gpointer data)
> +{
> +    EventGSource *nsrc = (EventGSource *)data;
> +    NetSocketState *s = nsrc->opaque;
> +
> +    if (nsrc->gfd.revents & G_IO_IN) {
> +        net_socket_send_dgram(s);
> +    } else {
> +        net_socket_writable(s);
> +    }
> +    return true;
> +}
> +
>  static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
>                                                  const char *model,
>                                                  const char *name,
> @@ -350,6 +372,7 @@ static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
>      socklen_t saddr_len;
>      NetClientState *nc;
>      NetSocketState *s;
> +    EventGSource *nsrc;
>  
>      /* fd passed: multicast: "learn" dgram_dst address from bound address and save it
>       * Because this may be "shared" socket from a "master" process, datagrams would be recv()
> @@ -393,7 +416,10 @@ static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
>  
>      s->fd = fd;
>      s->listen_fd = -1;
> -    s->send_fn = net_socket_send_dgram;
> +    nsrc = event_source_new(fd, net_socket_dgram_handler, s);
> +    s->nsrc = nsrc;
> +    nsrc->gfd.events = G_IO_IN|G_IO_OUT;

Please drop.

> +    nc->info->bind_ctx(nc, NULL);
>      net_socket_read_poll(s, true);
>  
>      /* mcast: save bound address as dst */
> @@ -408,20 +434,28 @@ err:
>      return NULL;
>  }
>  
> -static void net_socket_connect(void *opaque)
> -{
> -    NetSocketState *s = opaque;
> -    s->send_fn = net_socket_send;
> -    net_socket_read_poll(s, true);
> -}
> -
>  static NetClientInfo net_socket_info = {
>      .type = NET_CLIENT_OPTIONS_KIND_SOCKET,
>      .size = sizeof(NetSocketState),
>      .receive = net_socket_receive,
>      .cleanup = net_socket_cleanup,
> +    .bind_ctx = net_socket_bind_ctx,
>  };
>  
> +static gboolean net_socket_connect_handler(gpointer data)
> +{
> +    EventGSource *new_nsrc, *nsrc = data;
> +    NetSocketState *s = nsrc->opaque;
> +
> +    new_nsrc = event_source_new(s->fd, net_socket_establish_handler, s);
> +    s->nsrc = new_nsrc;
> +    new_nsrc->gfd.events = G_IO_IN|G_IO_OUT;

Please drop.

> +    g_source_destroy(&nsrc->source);
> +    s->nc.info->bind_ctx(&s->nc, NULL);
> +
> +    return true;
> +}
> +
>  static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
>                                                   const char *model,
>                                                   const char *name,
> @@ -429,6 +463,7 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
>  {
>      NetClientState *nc;
>      NetSocketState *s;
> +    EventGSource *nsrc;
>  
>      nc = qemu_new_net_client(&net_socket_info, peer, model, name);
>  
> @@ -440,9 +475,16 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
>      s->listen_fd = -1;
>  
>      if (is_connected) {
> -        net_socket_connect(s);
> +        nsrc = event_source_new(fd, net_socket_establish_handler, s);
> +        s->nsrc = nsrc;
> +        nsrc->gfd.events = G_IO_IN|G_IO_OUT;

Please drop.

> +        nc->info->bind_ctx(nc, NULL);
>      } else {
> -        qemu_set_fd_handler(s->fd, NULL, net_socket_connect, s);
> +        nsrc = event_source_new(fd, net_socket_connect_handler, s);
> +        s->nsrc = nsrc;
> +        nsrc->gfd.events = G_IO_IN;

Please drop.

> +        nc->info->bind_ctx(nc, NULL);
> +
>      }
>      return s;
>  }
> @@ -473,30 +515,69 @@ static NetSocketState *net_socket_fd_init(NetClientState *peer,
>      return NULL;
>  }
>  
> -static void net_socket_accept(void *opaque)
> +static gboolean net_socket_establish_handler(gpointer data)
> +{
> +    EventGSource *nsrc = (EventGSource *)data;
> +    NetSocketState *s = nsrc->opaque;
> +
> +    if (nsrc->gfd.revents & G_IO_IN) {
> +        net_socket_send(s);
> +    } else {
> +        net_socket_writable(s);
> +    }
> +    return true;
> +}
> +
> +static bool readable(void *opaque)
>  {
>      NetSocketState *s = opaque;
> +
> +    if (s->read_poll && net_socket_can_send(s)) {
> +        return true;
> +    }
> +    return false;
> +}
> +
> +static bool writable(void *opaque)
> +{
> +    NetSocketState *s = opaque;
> +
> +    if (s->write_poll) {
> +        return true;
> +    }
> +    return false;
> +}
> +
> +static gboolean net_socket_listen_handler(gpointer data)
> +{
> +    EventGSource *new_nsrc, *nsrc = data;
> +    NetSocketState *s = nsrc->opaque;
>      struct sockaddr_in saddr;
>      socklen_t len;
>      int fd;
>  
> -    for(;;) {
> -        len = sizeof(saddr);
> -        fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
> -        if (fd < 0 && errno != EINTR) {
> -            return;
> -        } else if (fd >= 0) {
> -            qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
> -            break;
> -        }
> +    len = sizeof(saddr);
> +    fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
> +    if (fd < 0 && errno != EINTR) {
> +        return false;
>      }
>  
>      s->fd = fd;
>      s->nc.link_down = false;
> -    net_socket_connect(s);
> +    new_nsrc = event_source_new(fd, net_socket_establish_handler, s);
> +    s->nsrc = new_nsrc;
> +    new_nsrc->gfd.events = G_IO_IN|G_IO_OUT;

Please drop.

> +    new_nsrc->readable = readable;
> +    new_nsrc->writable = writable;
> +    /* prevent more than one connect req */
> +    g_source_destroy(&nsrc->source);
> +    s->nc.info->bind_ctx(&s->nc, NULL);
> +    net_socket_read_poll(s, true);
>      snprintf(s->nc.info_str, sizeof(s->nc.info_str),
>               "socket: connection from %s:%d",
>               inet_ntoa(saddr.sin_addr), ntohs(saddr.sin_port));
> +
> +    return true;
>  }
>  
>  static int net_socket_listen_init(NetClientState *peer,
> @@ -508,6 +589,7 @@ static int net_socket_listen_init(NetClientState *peer,
>      NetSocketState *s;
>      struct sockaddr_in saddr;
>      int fd, val, ret;
> +    EventGSource *nsrc;
>  
>      if (parse_host_port(&saddr, host_str) < 0)
>          return -1;
> @@ -542,7 +624,11 @@ static int net_socket_listen_init(NetClientState *peer,
>      s->listen_fd = fd;
>      s->nc.link_down = true;
>  
> -    qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s);
> +    nsrc = event_source_new(fd, net_socket_listen_handler, s);
> +    s->nsrc = nsrc;
> +    nsrc->gfd.events = G_IO_IN;

Please drop.
pingfan liu April 19, 2013, 5:58 a.m. UTC | #2
On Thu, Apr 18, 2013 at 10:34 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> On Wed, Apr 17, 2013 at 04:39:15PM +0800, Liu Ping Fan wrote:
>> @@ -160,7 +154,13 @@ static void net_socket_send(void *opaque)
>>          net_socket_read_poll(s, false);
>>          net_socket_write_poll(s, false);
>>          if (s->listen_fd != -1) {
>> -            qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s);
>> +            nsrc = s->nsrc;
>> +            new_nsrc = event_source_new(s->listen_fd, net_socket_listen_handler,
>> +                                s);
>> +            s->nsrc = new_nsrc;
>> +            new_nsrc->gfd.events = G_IO_IN;
>> +            g_source_destroy(&nsrc->source);
>> +            s->nc.info->bind_ctx(&s->nc, NULL);
>
> The following is equivalent:
>
> event_source_release(s->nsrc);
> s->nsrc = event_source_new(s->listen_fd, net_socket_listen_handler, s);
> s->nc.info->bind_ctx(&s->nc, NULL);
>
> Then new_nsrc/nsrc can be dropped and the nsrc memory leak is avoided.
>
Apply here and the following same issues.
> Note that gfd.events = G_IO_IN does not get used since prepare()
> overwrites gfd.events.  Please drop and make sure read_poll == true.
>
Apply,
> I'm a little worried that we're lacking G_IO_HUP | G_IO_ERR.  Perhaps
> disconnect and network errors will be ignored.
>
NetSocketState can do limited things about these situation, perhaps,
implement net_socket_can_receive() to export such message to frontend
?

Pingfan
>>          }
>>          closesocket(s->fd);
>>
>> @@ -331,6 +331,14 @@ static void net_socket_cleanup(NetClientState *nc)
>>          closesocket(s->listen_fd);
>>          s->listen_fd = -1;
>>      }
>> +    event_source_release(s->nsrc);
>> +}
>> +
>> +static void net_socket_bind_ctx(NetClientState *nc, GMainContext *ctx)
>> +{
>> +    NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc);
>> +
>> +    g_source_attach(&s->nsrc->source, ctx);
>>  }
>>
>>  static NetClientInfo net_dgram_socket_info = {
>> @@ -338,8 +346,22 @@ static NetClientInfo net_dgram_socket_info = {
>>      .size = sizeof(NetSocketState),
>>      .receive = net_socket_receive_dgram,
>>      .cleanup = net_socket_cleanup,
>> +    .bind_ctx = net_socket_bind_ctx,
>>  };
>>
>> +static gboolean net_socket_dgram_handler(gpointer data)
>> +{
>> +    EventGSource *nsrc = (EventGSource *)data;
>> +    NetSocketState *s = nsrc->opaque;
>> +
>> +    if (nsrc->gfd.revents & G_IO_IN) {
>> +        net_socket_send_dgram(s);
>> +    } else {
>> +        net_socket_writable(s);
>> +    }
>> +    return true;
>> +}
>> +
>>  static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
>>                                                  const char *model,
>>                                                  const char *name,
>> @@ -350,6 +372,7 @@ static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
>>      socklen_t saddr_len;
>>      NetClientState *nc;
>>      NetSocketState *s;
>> +    EventGSource *nsrc;
>>
>>      /* fd passed: multicast: "learn" dgram_dst address from bound address and save it
>>       * Because this may be "shared" socket from a "master" process, datagrams would be recv()
>> @@ -393,7 +416,10 @@ static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
>>
>>      s->fd = fd;
>>      s->listen_fd = -1;
>> -    s->send_fn = net_socket_send_dgram;
>> +    nsrc = event_source_new(fd, net_socket_dgram_handler, s);
>> +    s->nsrc = nsrc;
>> +    nsrc->gfd.events = G_IO_IN|G_IO_OUT;
>
> Please drop.
>
>> +    nc->info->bind_ctx(nc, NULL);
>>      net_socket_read_poll(s, true);
>>
>>      /* mcast: save bound address as dst */
>> @@ -408,20 +434,28 @@ err:
>>      return NULL;
>>  }
>>
>> -static void net_socket_connect(void *opaque)
>> -{
>> -    NetSocketState *s = opaque;
>> -    s->send_fn = net_socket_send;
>> -    net_socket_read_poll(s, true);
>> -}
>> -
>>  static NetClientInfo net_socket_info = {
>>      .type = NET_CLIENT_OPTIONS_KIND_SOCKET,
>>      .size = sizeof(NetSocketState),
>>      .receive = net_socket_receive,
>>      .cleanup = net_socket_cleanup,
>> +    .bind_ctx = net_socket_bind_ctx,
>>  };
>>
>> +static gboolean net_socket_connect_handler(gpointer data)
>> +{
>> +    EventGSource *new_nsrc, *nsrc = data;
>> +    NetSocketState *s = nsrc->opaque;
>> +
>> +    new_nsrc = event_source_new(s->fd, net_socket_establish_handler, s);
>> +    s->nsrc = new_nsrc;
>> +    new_nsrc->gfd.events = G_IO_IN|G_IO_OUT;
>
> Please drop.
>
>> +    g_source_destroy(&nsrc->source);
>> +    s->nc.info->bind_ctx(&s->nc, NULL);
>> +
>> +    return true;
>> +}
>> +
>>  static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
>>                                                   const char *model,
>>                                                   const char *name,
>> @@ -429,6 +463,7 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
>>  {
>>      NetClientState *nc;
>>      NetSocketState *s;
>> +    EventGSource *nsrc;
>>
>>      nc = qemu_new_net_client(&net_socket_info, peer, model, name);
>>
>> @@ -440,9 +475,16 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
>>      s->listen_fd = -1;
>>
>>      if (is_connected) {
>> -        net_socket_connect(s);
>> +        nsrc = event_source_new(fd, net_socket_establish_handler, s);
>> +        s->nsrc = nsrc;
>> +        nsrc->gfd.events = G_IO_IN|G_IO_OUT;
>
> Please drop.
>
>> +        nc->info->bind_ctx(nc, NULL);
>>      } else {
>> -        qemu_set_fd_handler(s->fd, NULL, net_socket_connect, s);
>> +        nsrc = event_source_new(fd, net_socket_connect_handler, s);
>> +        s->nsrc = nsrc;
>> +        nsrc->gfd.events = G_IO_IN;
>
> Please drop.
>
>> +        nc->info->bind_ctx(nc, NULL);
>> +
>>      }
>>      return s;
>>  }
>> @@ -473,30 +515,69 @@ static NetSocketState *net_socket_fd_init(NetClientState *peer,
>>      return NULL;
>>  }
>>
>> -static void net_socket_accept(void *opaque)
>> +static gboolean net_socket_establish_handler(gpointer data)
>> +{
>> +    EventGSource *nsrc = (EventGSource *)data;
>> +    NetSocketState *s = nsrc->opaque;
>> +
>> +    if (nsrc->gfd.revents & G_IO_IN) {
>> +        net_socket_send(s);
>> +    } else {
>> +        net_socket_writable(s);
>> +    }
>> +    return true;
>> +}
>> +
>> +static bool readable(void *opaque)
>>  {
>>      NetSocketState *s = opaque;
>> +
>> +    if (s->read_poll && net_socket_can_send(s)) {
>> +        return true;
>> +    }
>> +    return false;
>> +}
>> +
>> +static bool writable(void *opaque)
>> +{
>> +    NetSocketState *s = opaque;
>> +
>> +    if (s->write_poll) {
>> +        return true;
>> +    }
>> +    return false;
>> +}
>> +
>> +static gboolean net_socket_listen_handler(gpointer data)
>> +{
>> +    EventGSource *new_nsrc, *nsrc = data;
>> +    NetSocketState *s = nsrc->opaque;
>>      struct sockaddr_in saddr;
>>      socklen_t len;
>>      int fd;
>>
>> -    for(;;) {
>> -        len = sizeof(saddr);
>> -        fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
>> -        if (fd < 0 && errno != EINTR) {
>> -            return;
>> -        } else if (fd >= 0) {
>> -            qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
>> -            break;
>> -        }
>> +    len = sizeof(saddr);
>> +    fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
>> +    if (fd < 0 && errno != EINTR) {
>> +        return false;
>>      }
>>
>>      s->fd = fd;
>>      s->nc.link_down = false;
>> -    net_socket_connect(s);
>> +    new_nsrc = event_source_new(fd, net_socket_establish_handler, s);
>> +    s->nsrc = new_nsrc;
>> +    new_nsrc->gfd.events = G_IO_IN|G_IO_OUT;
>
> Please drop.
>
>> +    new_nsrc->readable = readable;
>> +    new_nsrc->writable = writable;
>> +    /* prevent more than one connect req */
>> +    g_source_destroy(&nsrc->source);
>> +    s->nc.info->bind_ctx(&s->nc, NULL);
>> +    net_socket_read_poll(s, true);
>>      snprintf(s->nc.info_str, sizeof(s->nc.info_str),
>>               "socket: connection from %s:%d",
>>               inet_ntoa(saddr.sin_addr), ntohs(saddr.sin_port));
>> +
>> +    return true;
>>  }
>>
>>  static int net_socket_listen_init(NetClientState *peer,
>> @@ -508,6 +589,7 @@ static int net_socket_listen_init(NetClientState *peer,
>>      NetSocketState *s;
>>      struct sockaddr_in saddr;
>>      int fd, val, ret;
>> +    EventGSource *nsrc;
>>
>>      if (parse_host_port(&saddr, host_str) < 0)
>>          return -1;
>> @@ -542,7 +624,11 @@ static int net_socket_listen_init(NetClientState *peer,
>>      s->listen_fd = fd;
>>      s->nc.link_down = true;
>>
>> -    qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s);
>> +    nsrc = event_source_new(fd, net_socket_listen_handler, s);
>> +    s->nsrc = nsrc;
>> +    nsrc->gfd.events = G_IO_IN;
>
> Please drop.
Stefan Hajnoczi April 19, 2013, 12:03 p.m. UTC | #3
On Fri, Apr 19, 2013 at 01:58:40PM +0800, liu ping fan wrote:
> On Thu, Apr 18, 2013 at 10:34 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> > On Wed, Apr 17, 2013 at 04:39:15PM +0800, Liu Ping Fan wrote:
> > I'm a little worried that we're lacking G_IO_HUP | G_IO_ERR.  Perhaps
> > disconnect and network errors will be ignored.
> >
> NetSocketState can do limited things about these situation, perhaps,
> implement net_socket_can_receive() to export such message to frontend
> ?

net/socket.c uses G_IO_HUP | G_IO_ERR today, see
iohandler.c:qemu_iohandler_fill().

This patch *stops* using them.  My question is: why stop and is it
correct?

Stefan
pingfan liu April 22, 2013, 7:52 a.m. UTC | #4
On Fri, Apr 19, 2013 at 8:03 PM, Stefan Hajnoczi <stefanha@redhat.com> wrote:
> On Fri, Apr 19, 2013 at 01:58:40PM +0800, liu ping fan wrote:
>> On Thu, Apr 18, 2013 at 10:34 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>> > On Wed, Apr 17, 2013 at 04:39:15PM +0800, Liu Ping Fan wrote:
>> > I'm a little worried that we're lacking G_IO_HUP | G_IO_ERR.  Perhaps
>> > disconnect and network errors will be ignored.
>> >
>> NetSocketState can do limited things about these situation, perhaps,
>> implement net_socket_can_receive() to export such message to frontend
>> ?
>
> net/socket.c uses G_IO_HUP | G_IO_ERR today, see
> iohandler.c:qemu_iohandler_fill().
>
> This patch *stops* using them.  My question is: why stop and is it
> correct?
>
No, it is not.  It leaves dead file descriptors unhandled.  Will fix
up in next version

> Stefan
diff mbox

Patch

diff --git a/net/socket.c b/net/socket.c
index 396dc8c..bdd5dc0 100644
--- a/net/socket.c
+++ b/net/socket.c
@@ -31,6 +31,8 @@ 
 #include "qemu/option.h"
 #include "qemu/sockets.h"
 #include "qemu/iov.h"
+#include "util/event_gsource.h"
+
 
 typedef struct NetSocketState {
     NetClientState nc;
@@ -42,13 +44,15 @@  typedef struct NetSocketState {
     unsigned int send_index;      /* number of bytes sent (only SOCK_STREAM) */
     uint8_t buf[4096];
     struct sockaddr_in dgram_dst; /* contains inet host and port destination iff connectionless (SOCK_DGRAM) */
-    IOHandler *send_fn;           /* differs between SOCK_STREAM/SOCK_DGRAM */
     bool read_poll;               /* waiting to receive data? */
     bool write_poll;              /* waiting to transmit data? */
+    EventGSource *nsrc;
 } NetSocketState;
 
-static void net_socket_accept(void *opaque);
 static void net_socket_writable(void *opaque);
+static gboolean net_socket_listen_handler(gpointer data);
+static gboolean net_socket_establish_handler(gpointer data);
+
 
 /* Only read packets from socket when peer can receive them */
 static int net_socket_can_send(void *opaque)
@@ -58,25 +62,14 @@  static int net_socket_can_send(void *opaque)
     return qemu_can_send_packet(&s->nc);
 }
 
-static void net_socket_update_fd_handler(NetSocketState *s)
-{
-    qemu_set_fd_handler2(s->fd,
-                         s->read_poll  ? net_socket_can_send : NULL,
-                         s->read_poll  ? s->send_fn : NULL,
-                         s->write_poll ? net_socket_writable : NULL,
-                         s);
-}
-
 static void net_socket_read_poll(NetSocketState *s, bool enable)
 {
     s->read_poll = enable;
-    net_socket_update_fd_handler(s);
 }
 
 static void net_socket_write_poll(NetSocketState *s, bool enable)
 {
     s->write_poll = enable;
-    net_socket_update_fd_handler(s);
 }
 
 static void net_socket_writable(void *opaque)
@@ -148,6 +141,7 @@  static void net_socket_send(void *opaque)
     unsigned l;
     uint8_t buf1[4096];
     const uint8_t *buf;
+    EventGSource *new_nsrc, *nsrc;
 
     size = qemu_recv(s->fd, buf1, sizeof(buf1), 0);
     if (size < 0) {
@@ -160,7 +154,13 @@  static void net_socket_send(void *opaque)
         net_socket_read_poll(s, false);
         net_socket_write_poll(s, false);
         if (s->listen_fd != -1) {
-            qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s);
+            nsrc = s->nsrc;
+            new_nsrc = event_source_new(s->listen_fd, net_socket_listen_handler,
+                                s);
+            s->nsrc = new_nsrc;
+            new_nsrc->gfd.events = G_IO_IN;
+            g_source_destroy(&nsrc->source);
+            s->nc.info->bind_ctx(&s->nc, NULL);
         }
         closesocket(s->fd);
 
@@ -331,6 +331,14 @@  static void net_socket_cleanup(NetClientState *nc)
         closesocket(s->listen_fd);
         s->listen_fd = -1;
     }
+    event_source_release(s->nsrc);
+}
+
+static void net_socket_bind_ctx(NetClientState *nc, GMainContext *ctx)
+{
+    NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc);
+
+    g_source_attach(&s->nsrc->source, ctx);
 }
 
 static NetClientInfo net_dgram_socket_info = {
@@ -338,8 +346,22 @@  static NetClientInfo net_dgram_socket_info = {
     .size = sizeof(NetSocketState),
     .receive = net_socket_receive_dgram,
     .cleanup = net_socket_cleanup,
+    .bind_ctx = net_socket_bind_ctx,
 };
 
+static gboolean net_socket_dgram_handler(gpointer data)
+{
+    EventGSource *nsrc = (EventGSource *)data;
+    NetSocketState *s = nsrc->opaque;
+
+    if (nsrc->gfd.revents & G_IO_IN) {
+        net_socket_send_dgram(s);
+    } else {
+        net_socket_writable(s);
+    }
+    return true;
+}
+
 static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
                                                 const char *model,
                                                 const char *name,
@@ -350,6 +372,7 @@  static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
     socklen_t saddr_len;
     NetClientState *nc;
     NetSocketState *s;
+    EventGSource *nsrc;
 
     /* fd passed: multicast: "learn" dgram_dst address from bound address and save it
      * Because this may be "shared" socket from a "master" process, datagrams would be recv()
@@ -393,7 +416,10 @@  static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer,
 
     s->fd = fd;
     s->listen_fd = -1;
-    s->send_fn = net_socket_send_dgram;
+    nsrc = event_source_new(fd, net_socket_dgram_handler, s);
+    s->nsrc = nsrc;
+    nsrc->gfd.events = G_IO_IN|G_IO_OUT;
+    nc->info->bind_ctx(nc, NULL);
     net_socket_read_poll(s, true);
 
     /* mcast: save bound address as dst */
@@ -408,20 +434,28 @@  err:
     return NULL;
 }
 
-static void net_socket_connect(void *opaque)
-{
-    NetSocketState *s = opaque;
-    s->send_fn = net_socket_send;
-    net_socket_read_poll(s, true);
-}
-
 static NetClientInfo net_socket_info = {
     .type = NET_CLIENT_OPTIONS_KIND_SOCKET,
     .size = sizeof(NetSocketState),
     .receive = net_socket_receive,
     .cleanup = net_socket_cleanup,
+    .bind_ctx = net_socket_bind_ctx,
 };
 
+static gboolean net_socket_connect_handler(gpointer data)
+{
+    EventGSource *new_nsrc, *nsrc = data;
+    NetSocketState *s = nsrc->opaque;
+
+    new_nsrc = event_source_new(s->fd, net_socket_establish_handler, s);
+    s->nsrc = new_nsrc;
+    new_nsrc->gfd.events = G_IO_IN|G_IO_OUT;
+    g_source_destroy(&nsrc->source);
+    s->nc.info->bind_ctx(&s->nc, NULL);
+
+    return true;
+}
+
 static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
                                                  const char *model,
                                                  const char *name,
@@ -429,6 +463,7 @@  static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
 {
     NetClientState *nc;
     NetSocketState *s;
+    EventGSource *nsrc;
 
     nc = qemu_new_net_client(&net_socket_info, peer, model, name);
 
@@ -440,9 +475,16 @@  static NetSocketState *net_socket_fd_init_stream(NetClientState *peer,
     s->listen_fd = -1;
 
     if (is_connected) {
-        net_socket_connect(s);
+        nsrc = event_source_new(fd, net_socket_establish_handler, s);
+        s->nsrc = nsrc;
+        nsrc->gfd.events = G_IO_IN|G_IO_OUT;
+        nc->info->bind_ctx(nc, NULL);
     } else {
-        qemu_set_fd_handler(s->fd, NULL, net_socket_connect, s);
+        nsrc = event_source_new(fd, net_socket_connect_handler, s);
+        s->nsrc = nsrc;
+        nsrc->gfd.events = G_IO_IN;
+        nc->info->bind_ctx(nc, NULL);
+
     }
     return s;
 }
@@ -473,30 +515,69 @@  static NetSocketState *net_socket_fd_init(NetClientState *peer,
     return NULL;
 }
 
-static void net_socket_accept(void *opaque)
+static gboolean net_socket_establish_handler(gpointer data)
+{
+    EventGSource *nsrc = (EventGSource *)data;
+    NetSocketState *s = nsrc->opaque;
+
+    if (nsrc->gfd.revents & G_IO_IN) {
+        net_socket_send(s);
+    } else {
+        net_socket_writable(s);
+    }
+    return true;
+}
+
+static bool readable(void *opaque)
 {
     NetSocketState *s = opaque;
+
+    if (s->read_poll && net_socket_can_send(s)) {
+        return true;
+    }
+    return false;
+}
+
+static bool writable(void *opaque)
+{
+    NetSocketState *s = opaque;
+
+    if (s->write_poll) {
+        return true;
+    }
+    return false;
+}
+
+static gboolean net_socket_listen_handler(gpointer data)
+{
+    EventGSource *new_nsrc, *nsrc = data;
+    NetSocketState *s = nsrc->opaque;
     struct sockaddr_in saddr;
     socklen_t len;
     int fd;
 
-    for(;;) {
-        len = sizeof(saddr);
-        fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
-        if (fd < 0 && errno != EINTR) {
-            return;
-        } else if (fd >= 0) {
-            qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
-            break;
-        }
+    len = sizeof(saddr);
+    fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
+    if (fd < 0 && errno != EINTR) {
+        return false;
     }
 
     s->fd = fd;
     s->nc.link_down = false;
-    net_socket_connect(s);
+    new_nsrc = event_source_new(fd, net_socket_establish_handler, s);
+    s->nsrc = new_nsrc;
+    new_nsrc->gfd.events = G_IO_IN|G_IO_OUT;
+    new_nsrc->readable = readable;
+    new_nsrc->writable = writable;
+    /* prevent more than one connect req */
+    g_source_destroy(&nsrc->source);
+    s->nc.info->bind_ctx(&s->nc, NULL);
+    net_socket_read_poll(s, true);
     snprintf(s->nc.info_str, sizeof(s->nc.info_str),
              "socket: connection from %s:%d",
              inet_ntoa(saddr.sin_addr), ntohs(saddr.sin_port));
+
+    return true;
 }
 
 static int net_socket_listen_init(NetClientState *peer,
@@ -508,6 +589,7 @@  static int net_socket_listen_init(NetClientState *peer,
     NetSocketState *s;
     struct sockaddr_in saddr;
     int fd, val, ret;
+    EventGSource *nsrc;
 
     if (parse_host_port(&saddr, host_str) < 0)
         return -1;
@@ -542,7 +624,11 @@  static int net_socket_listen_init(NetClientState *peer,
     s->listen_fd = fd;
     s->nc.link_down = true;
 
-    qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s);
+    nsrc = event_source_new(fd, net_socket_listen_handler, s);
+    s->nsrc = nsrc;
+    nsrc->gfd.events = G_IO_IN;
+    nc->info->bind_ctx(nc, NULL);
+
     return 0;
 }