Message ID | 1366187964-14265-7-git-send-email-qemulist@gmail.com |
---|---|
State | New |
Headers | show |
On Wed, Apr 17, 2013 at 04:39:15PM +0800, Liu Ping Fan wrote: > @@ -160,7 +154,13 @@ static void net_socket_send(void *opaque) > net_socket_read_poll(s, false); > net_socket_write_poll(s, false); > if (s->listen_fd != -1) { > - qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s); > + nsrc = s->nsrc; > + new_nsrc = event_source_new(s->listen_fd, net_socket_listen_handler, > + s); > + s->nsrc = new_nsrc; > + new_nsrc->gfd.events = G_IO_IN; > + g_source_destroy(&nsrc->source); > + s->nc.info->bind_ctx(&s->nc, NULL); The following is equivalent: event_source_release(s->nsrc); s->nsrc = event_source_new(s->listen_fd, net_socket_listen_handler, s); s->nc.info->bind_ctx(&s->nc, NULL); Then new_nsrc/nsrc can be dropped and the nsrc memory leak is avoided. Note that gfd.events = G_IO_IN does not get used since prepare() overwrites gfd.events. Please drop and make sure read_poll == true. I'm a little worried that we're lacking G_IO_HUP | G_IO_ERR. Perhaps disconnect and network errors will be ignored. 
> } > closesocket(s->fd); > > @@ -331,6 +331,14 @@ static void net_socket_cleanup(NetClientState *nc) > closesocket(s->listen_fd); > s->listen_fd = -1; > } > + event_source_release(s->nsrc); > +} > + > +static void net_socket_bind_ctx(NetClientState *nc, GMainContext *ctx) > +{ > + NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc); > + > + g_source_attach(&s->nsrc->source, ctx); > } > > static NetClientInfo net_dgram_socket_info = { > @@ -338,8 +346,22 @@ static NetClientInfo net_dgram_socket_info = { > .size = sizeof(NetSocketState), > .receive = net_socket_receive_dgram, > .cleanup = net_socket_cleanup, > + .bind_ctx = net_socket_bind_ctx, > }; > > +static gboolean net_socket_dgram_handler(gpointer data) > +{ > + EventGSource *nsrc = (EventGSource *)data; > + NetSocketState *s = nsrc->opaque; > + > + if (nsrc->gfd.revents & G_IO_IN) { > + net_socket_send_dgram(s); > + } else { > + net_socket_writable(s); > + } > + return true; > +} > + > static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer, > const char *model, > const char *name, > @@ -350,6 +372,7 @@ static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer, > socklen_t saddr_len; > NetClientState *nc; > NetSocketState *s; > + EventGSource *nsrc; > > /* fd passed: multicast: "learn" dgram_dst address from bound address and save it > * Because this may be "shared" socket from a "master" process, datagrams would be recv() > @@ -393,7 +416,10 @@ static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer, > > s->fd = fd; > s->listen_fd = -1; > - s->send_fn = net_socket_send_dgram; > + nsrc = event_source_new(fd, net_socket_dgram_handler, s); > + s->nsrc = nsrc; > + nsrc->gfd.events = G_IO_IN|G_IO_OUT; Please drop. 
> + nc->info->bind_ctx(nc, NULL); > net_socket_read_poll(s, true); > > /* mcast: save bound address as dst */ > @@ -408,20 +434,28 @@ err: > return NULL; > } > > -static void net_socket_connect(void *opaque) > -{ > - NetSocketState *s = opaque; > - s->send_fn = net_socket_send; > - net_socket_read_poll(s, true); > -} > - > static NetClientInfo net_socket_info = { > .type = NET_CLIENT_OPTIONS_KIND_SOCKET, > .size = sizeof(NetSocketState), > .receive = net_socket_receive, > .cleanup = net_socket_cleanup, > + .bind_ctx = net_socket_bind_ctx, > }; > > +static gboolean net_socket_connect_handler(gpointer data) > +{ > + EventGSource *new_nsrc, *nsrc = data; > + NetSocketState *s = nsrc->opaque; > + > + new_nsrc = event_source_new(s->fd, net_socket_establish_handler, s); > + s->nsrc = new_nsrc; > + new_nsrc->gfd.events = G_IO_IN|G_IO_OUT; Please drop. > + g_source_destroy(&nsrc->source); > + s->nc.info->bind_ctx(&s->nc, NULL); > + > + return true; > +} > + > static NetSocketState *net_socket_fd_init_stream(NetClientState *peer, > const char *model, > const char *name, > @@ -429,6 +463,7 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer, > { > NetClientState *nc; > NetSocketState *s; > + EventGSource *nsrc; > > nc = qemu_new_net_client(&net_socket_info, peer, model, name); > > @@ -440,9 +475,16 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer, > s->listen_fd = -1; > > if (is_connected) { > - net_socket_connect(s); > + nsrc = event_source_new(fd, net_socket_establish_handler, s); > + s->nsrc = nsrc; > + nsrc->gfd.events = G_IO_IN|G_IO_OUT; Please drop. > + nc->info->bind_ctx(nc, NULL); > } else { > - qemu_set_fd_handler(s->fd, NULL, net_socket_connect, s); > + nsrc = event_source_new(fd, net_socket_connect_handler, s); > + s->nsrc = nsrc; > + nsrc->gfd.events = G_IO_IN; Please drop. 
> + nc->info->bind_ctx(nc, NULL); > + > } > return s; > } > @@ -473,30 +515,69 @@ static NetSocketState *net_socket_fd_init(NetClientState *peer, > return NULL; > } > > -static void net_socket_accept(void *opaque) > +static gboolean net_socket_establish_handler(gpointer data) > +{ > + EventGSource *nsrc = (EventGSource *)data; > + NetSocketState *s = nsrc->opaque; > + > + if (nsrc->gfd.revents & G_IO_IN) { > + net_socket_send(s); > + } else { > + net_socket_writable(s); > + } > + return true; > +} > + > +static bool readable(void *opaque) > { > NetSocketState *s = opaque; > + > + if (s->read_poll && net_socket_can_send(s)) { > + return true; > + } > + return false; > +} > + > +static bool writable(void *opaque) > +{ > + NetSocketState *s = opaque; > + > + if (s->write_poll) { > + return true; > + } > + return false; > +} > + > +static gboolean net_socket_listen_handler(gpointer data) > +{ > + EventGSource *new_nsrc, *nsrc = data; > + NetSocketState *s = nsrc->opaque; > struct sockaddr_in saddr; > socklen_t len; > int fd; > > - for(;;) { > - len = sizeof(saddr); > - fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len); > - if (fd < 0 && errno != EINTR) { > - return; > - } else if (fd >= 0) { > - qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL); > - break; > - } > + len = sizeof(saddr); > + fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len); > + if (fd < 0 && errno != EINTR) { > + return false; > } > > s->fd = fd; > s->nc.link_down = false; > - net_socket_connect(s); > + new_nsrc = event_source_new(fd, net_socket_establish_handler, s); > + s->nsrc = new_nsrc; > + new_nsrc->gfd.events = G_IO_IN|G_IO_OUT; Please drop. 
> + new_nsrc->readable = readable; > + new_nsrc->writable = writable; > + /* prevent more than one connect req */ > + g_source_destroy(&nsrc->source); > + s->nc.info->bind_ctx(&s->nc, NULL); > + net_socket_read_poll(s, true); > snprintf(s->nc.info_str, sizeof(s->nc.info_str), > "socket: connection from %s:%d", > inet_ntoa(saddr.sin_addr), ntohs(saddr.sin_port)); > + > + return true; > } > > static int net_socket_listen_init(NetClientState *peer, > @@ -508,6 +589,7 @@ static int net_socket_listen_init(NetClientState *peer, > NetSocketState *s; > struct sockaddr_in saddr; > int fd, val, ret; > + EventGSource *nsrc; > > if (parse_host_port(&saddr, host_str) < 0) > return -1; > @@ -542,7 +624,11 @@ static int net_socket_listen_init(NetClientState *peer, > s->listen_fd = fd; > s->nc.link_down = true; > > - qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s); > + nsrc = event_source_new(fd, net_socket_listen_handler, s); > + s->nsrc = nsrc; > + nsrc->gfd.events = G_IO_IN; Please drop.
On Thu, Apr 18, 2013 at 10:34 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote: > On Wed, Apr 17, 2013 at 04:39:15PM +0800, Liu Ping Fan wrote: >> @@ -160,7 +154,13 @@ static void net_socket_send(void *opaque) >> net_socket_read_poll(s, false); >> net_socket_write_poll(s, false); >> if (s->listen_fd != -1) { >> - qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s); >> + nsrc = s->nsrc; >> + new_nsrc = event_source_new(s->listen_fd, net_socket_listen_handler, >> + s); >> + s->nsrc = new_nsrc; >> + new_nsrc->gfd.events = G_IO_IN; >> + g_source_destroy(&nsrc->source); >> + s->nc.info->bind_ctx(&s->nc, NULL); > > The following is equivalent: > > event_source_release(s->nsrc); > s->nsrc = event_source_new(s->listen_fd, net_socket_listen_handler, s); > s->nc.info->bind_ctx(&s->nc, NULL); > > Then new_nsrc/nsrc can be dropped and the nsrc memory leak is avoided. > Apply here and the following same issues. > Note that gfd.events = G_IO_IN does not get used since prepare() > overwrites gfd.events. Please drop and make sure read_poll == true. > Apply, > I'm a little worried that we're lacking G_IO_HUP | G_IO_ERR. Perhaps > disconnect and network errors will be ignored. > NetSocketState can do limited things about these situation, perhaps, implement net_socket_can_receive() to export such message to frontend ? 
Pingfan >> } >> closesocket(s->fd); >> >> @@ -331,6 +331,14 @@ static void net_socket_cleanup(NetClientState *nc) >> closesocket(s->listen_fd); >> s->listen_fd = -1; >> } >> + event_source_release(s->nsrc); >> +} >> + >> +static void net_socket_bind_ctx(NetClientState *nc, GMainContext *ctx) >> +{ >> + NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc); >> + >> + g_source_attach(&s->nsrc->source, ctx); >> } >> >> static NetClientInfo net_dgram_socket_info = { >> @@ -338,8 +346,22 @@ static NetClientInfo net_dgram_socket_info = { >> .size = sizeof(NetSocketState), >> .receive = net_socket_receive_dgram, >> .cleanup = net_socket_cleanup, >> + .bind_ctx = net_socket_bind_ctx, >> }; >> >> +static gboolean net_socket_dgram_handler(gpointer data) >> +{ >> + EventGSource *nsrc = (EventGSource *)data; >> + NetSocketState *s = nsrc->opaque; >> + >> + if (nsrc->gfd.revents & G_IO_IN) { >> + net_socket_send_dgram(s); >> + } else { >> + net_socket_writable(s); >> + } >> + return true; >> +} >> + >> static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer, >> const char *model, >> const char *name, >> @@ -350,6 +372,7 @@ static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer, >> socklen_t saddr_len; >> NetClientState *nc; >> NetSocketState *s; >> + EventGSource *nsrc; >> >> /* fd passed: multicast: "learn" dgram_dst address from bound address and save it >> * Because this may be "shared" socket from a "master" process, datagrams would be recv() >> @@ -393,7 +416,10 @@ static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer, >> >> s->fd = fd; >> s->listen_fd = -1; >> - s->send_fn = net_socket_send_dgram; >> + nsrc = event_source_new(fd, net_socket_dgram_handler, s); >> + s->nsrc = nsrc; >> + nsrc->gfd.events = G_IO_IN|G_IO_OUT; > > Please drop. 
> >> + nc->info->bind_ctx(nc, NULL); >> net_socket_read_poll(s, true); >> >> /* mcast: save bound address as dst */ >> @@ -408,20 +434,28 @@ err: >> return NULL; >> } >> >> -static void net_socket_connect(void *opaque) >> -{ >> - NetSocketState *s = opaque; >> - s->send_fn = net_socket_send; >> - net_socket_read_poll(s, true); >> -} >> - >> static NetClientInfo net_socket_info = { >> .type = NET_CLIENT_OPTIONS_KIND_SOCKET, >> .size = sizeof(NetSocketState), >> .receive = net_socket_receive, >> .cleanup = net_socket_cleanup, >> + .bind_ctx = net_socket_bind_ctx, >> }; >> >> +static gboolean net_socket_connect_handler(gpointer data) >> +{ >> + EventGSource *new_nsrc, *nsrc = data; >> + NetSocketState *s = nsrc->opaque; >> + >> + new_nsrc = event_source_new(s->fd, net_socket_establish_handler, s); >> + s->nsrc = new_nsrc; >> + new_nsrc->gfd.events = G_IO_IN|G_IO_OUT; > > Please drop. > >> + g_source_destroy(&nsrc->source); >> + s->nc.info->bind_ctx(&s->nc, NULL); >> + >> + return true; >> +} >> + >> static NetSocketState *net_socket_fd_init_stream(NetClientState *peer, >> const char *model, >> const char *name, >> @@ -429,6 +463,7 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer, >> { >> NetClientState *nc; >> NetSocketState *s; >> + EventGSource *nsrc; >> >> nc = qemu_new_net_client(&net_socket_info, peer, model, name); >> >> @@ -440,9 +475,16 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer, >> s->listen_fd = -1; >> >> if (is_connected) { >> - net_socket_connect(s); >> + nsrc = event_source_new(fd, net_socket_establish_handler, s); >> + s->nsrc = nsrc; >> + nsrc->gfd.events = G_IO_IN|G_IO_OUT; > > Please drop. > >> + nc->info->bind_ctx(nc, NULL); >> } else { >> - qemu_set_fd_handler(s->fd, NULL, net_socket_connect, s); >> + nsrc = event_source_new(fd, net_socket_connect_handler, s); >> + s->nsrc = nsrc; >> + nsrc->gfd.events = G_IO_IN; > > Please drop. 
> >> + nc->info->bind_ctx(nc, NULL); >> + >> } >> return s; >> } >> @@ -473,30 +515,69 @@ static NetSocketState *net_socket_fd_init(NetClientState *peer, >> return NULL; >> } >> >> -static void net_socket_accept(void *opaque) >> +static gboolean net_socket_establish_handler(gpointer data) >> +{ >> + EventGSource *nsrc = (EventGSource *)data; >> + NetSocketState *s = nsrc->opaque; >> + >> + if (nsrc->gfd.revents & G_IO_IN) { >> + net_socket_send(s); >> + } else { >> + net_socket_writable(s); >> + } >> + return true; >> +} >> + >> +static bool readable(void *opaque) >> { >> NetSocketState *s = opaque; >> + >> + if (s->read_poll && net_socket_can_send(s)) { >> + return true; >> + } >> + return false; >> +} >> + >> +static bool writable(void *opaque) >> +{ >> + NetSocketState *s = opaque; >> + >> + if (s->write_poll) { >> + return true; >> + } >> + return false; >> +} >> + >> +static gboolean net_socket_listen_handler(gpointer data) >> +{ >> + EventGSource *new_nsrc, *nsrc = data; >> + NetSocketState *s = nsrc->opaque; >> struct sockaddr_in saddr; >> socklen_t len; >> int fd; >> >> - for(;;) { >> - len = sizeof(saddr); >> - fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len); >> - if (fd < 0 && errno != EINTR) { >> - return; >> - } else if (fd >= 0) { >> - qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL); >> - break; >> - } >> + len = sizeof(saddr); >> + fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len); >> + if (fd < 0 && errno != EINTR) { >> + return false; >> } >> >> s->fd = fd; >> s->nc.link_down = false; >> - net_socket_connect(s); >> + new_nsrc = event_source_new(fd, net_socket_establish_handler, s); >> + s->nsrc = new_nsrc; >> + new_nsrc->gfd.events = G_IO_IN|G_IO_OUT; > > Please drop. 
> >> + new_nsrc->readable = readable; >> + new_nsrc->writable = writable; >> + /* prevent more than one connect req */ >> + g_source_destroy(&nsrc->source); >> + s->nc.info->bind_ctx(&s->nc, NULL); >> + net_socket_read_poll(s, true); >> snprintf(s->nc.info_str, sizeof(s->nc.info_str), >> "socket: connection from %s:%d", >> inet_ntoa(saddr.sin_addr), ntohs(saddr.sin_port)); >> + >> + return true; >> } >> >> static int net_socket_listen_init(NetClientState *peer, >> @@ -508,6 +589,7 @@ static int net_socket_listen_init(NetClientState *peer, >> NetSocketState *s; >> struct sockaddr_in saddr; >> int fd, val, ret; >> + EventGSource *nsrc; >> >> if (parse_host_port(&saddr, host_str) < 0) >> return -1; >> @@ -542,7 +624,11 @@ static int net_socket_listen_init(NetClientState *peer, >> s->listen_fd = fd; >> s->nc.link_down = true; >> >> - qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s); >> + nsrc = event_source_new(fd, net_socket_listen_handler, s); >> + s->nsrc = nsrc; >> + nsrc->gfd.events = G_IO_IN; > > Please drop.
On Fri, Apr 19, 2013 at 01:58:40PM +0800, liu ping fan wrote: > On Thu, Apr 18, 2013 at 10:34 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote: > > On Wed, Apr 17, 2013 at 04:39:15PM +0800, Liu Ping Fan wrote: > > I'm a little worried that we're lacking G_IO_HUP | G_IO_ERR. Perhaps > > disconnect and network errors will be ignored. > > > NetSocketState can do limited things about these situation, perhaps, > implement net_socket_can_receive() to export such message to frontend > ? net/socket.c uses G_IO_HUP | G_IO_ERR today, see iohandler.c:qemu_iohandler_fill(). This patch *stops* using them. My question is: why stop and is it correct? Stefan
On Fri, Apr 19, 2013 at 8:03 PM, Stefan Hajnoczi <stefanha@redhat.com> wrote: > On Fri, Apr 19, 2013 at 01:58:40PM +0800, liu ping fan wrote: >> On Thu, Apr 18, 2013 at 10:34 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote: >> > On Wed, Apr 17, 2013 at 04:39:15PM +0800, Liu Ping Fan wrote: >> > I'm a little worried that we're lacking G_IO_HUP | G_IO_ERR. Perhaps >> > disconnect and network errors will be ignored. >> > >> NetSocketState can do limited things about these situation, perhaps, >> implement net_socket_can_receive() to export such message to frontend >> ? > > net/socket.c uses G_IO_HUP | G_IO_ERR today, see > iohandler.c:qemu_iohandler_fill(). > > This patch *stops* using them. My question is: why stop and is it > correct? > No, it is not correct: dropping G_IO_HUP | G_IO_ERR leaves dead file descriptors unhandled. I will fix this in the next version. > Stefan
diff --git a/net/socket.c b/net/socket.c index 396dc8c..bdd5dc0 100644 --- a/net/socket.c +++ b/net/socket.c @@ -31,6 +31,8 @@ #include "qemu/option.h" #include "qemu/sockets.h" #include "qemu/iov.h" +#include "util/event_gsource.h" + typedef struct NetSocketState { NetClientState nc; @@ -42,13 +44,15 @@ typedef struct NetSocketState { unsigned int send_index; /* number of bytes sent (only SOCK_STREAM) */ uint8_t buf[4096]; struct sockaddr_in dgram_dst; /* contains inet host and port destination iff connectionless (SOCK_DGRAM) */ - IOHandler *send_fn; /* differs between SOCK_STREAM/SOCK_DGRAM */ bool read_poll; /* waiting to receive data? */ bool write_poll; /* waiting to transmit data? */ + EventGSource *nsrc; } NetSocketState; -static void net_socket_accept(void *opaque); static void net_socket_writable(void *opaque); +static gboolean net_socket_listen_handler(gpointer data); +static gboolean net_socket_establish_handler(gpointer data); + /* Only read packets from socket when peer can receive them */ static int net_socket_can_send(void *opaque) @@ -58,25 +62,14 @@ static int net_socket_can_send(void *opaque) return qemu_can_send_packet(&s->nc); } -static void net_socket_update_fd_handler(NetSocketState *s) -{ - qemu_set_fd_handler2(s->fd, - s->read_poll ? net_socket_can_send : NULL, - s->read_poll ? s->send_fn : NULL, - s->write_poll ? 
net_socket_writable : NULL, - s); -} - static void net_socket_read_poll(NetSocketState *s, bool enable) { s->read_poll = enable; - net_socket_update_fd_handler(s); } static void net_socket_write_poll(NetSocketState *s, bool enable) { s->write_poll = enable; - net_socket_update_fd_handler(s); } static void net_socket_writable(void *opaque) @@ -148,6 +141,7 @@ static void net_socket_send(void *opaque) unsigned l; uint8_t buf1[4096]; const uint8_t *buf; + EventGSource *new_nsrc, *nsrc; size = qemu_recv(s->fd, buf1, sizeof(buf1), 0); if (size < 0) { @@ -160,7 +154,13 @@ static void net_socket_send(void *opaque) net_socket_read_poll(s, false); net_socket_write_poll(s, false); if (s->listen_fd != -1) { - qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s); + nsrc = s->nsrc; + new_nsrc = event_source_new(s->listen_fd, net_socket_listen_handler, + s); + s->nsrc = new_nsrc; + new_nsrc->gfd.events = G_IO_IN; + g_source_destroy(&nsrc->source); + s->nc.info->bind_ctx(&s->nc, NULL); } closesocket(s->fd); @@ -331,6 +331,14 @@ static void net_socket_cleanup(NetClientState *nc) closesocket(s->listen_fd); s->listen_fd = -1; } + event_source_release(s->nsrc); +} + +static void net_socket_bind_ctx(NetClientState *nc, GMainContext *ctx) +{ + NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc); + + g_source_attach(&s->nsrc->source, ctx); } static NetClientInfo net_dgram_socket_info = { @@ -338,8 +346,22 @@ static NetClientInfo net_dgram_socket_info = { .size = sizeof(NetSocketState), .receive = net_socket_receive_dgram, .cleanup = net_socket_cleanup, + .bind_ctx = net_socket_bind_ctx, }; +static gboolean net_socket_dgram_handler(gpointer data) +{ + EventGSource *nsrc = (EventGSource *)data; + NetSocketState *s = nsrc->opaque; + + if (nsrc->gfd.revents & G_IO_IN) { + net_socket_send_dgram(s); + } else { + net_socket_writable(s); + } + return true; +} + static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer, const char *model, const char *name, @@ -350,6 +372,7 
@@ static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer, socklen_t saddr_len; NetClientState *nc; NetSocketState *s; + EventGSource *nsrc; /* fd passed: multicast: "learn" dgram_dst address from bound address and save it * Because this may be "shared" socket from a "master" process, datagrams would be recv() @@ -393,7 +416,10 @@ static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer, s->fd = fd; s->listen_fd = -1; - s->send_fn = net_socket_send_dgram; + nsrc = event_source_new(fd, net_socket_dgram_handler, s); + s->nsrc = nsrc; + nsrc->gfd.events = G_IO_IN|G_IO_OUT; + nc->info->bind_ctx(nc, NULL); net_socket_read_poll(s, true); /* mcast: save bound address as dst */ @@ -408,20 +434,28 @@ err: return NULL; } -static void net_socket_connect(void *opaque) -{ - NetSocketState *s = opaque; - s->send_fn = net_socket_send; - net_socket_read_poll(s, true); -} - static NetClientInfo net_socket_info = { .type = NET_CLIENT_OPTIONS_KIND_SOCKET, .size = sizeof(NetSocketState), .receive = net_socket_receive, .cleanup = net_socket_cleanup, + .bind_ctx = net_socket_bind_ctx, }; +static gboolean net_socket_connect_handler(gpointer data) +{ + EventGSource *new_nsrc, *nsrc = data; + NetSocketState *s = nsrc->opaque; + + new_nsrc = event_source_new(s->fd, net_socket_establish_handler, s); + s->nsrc = new_nsrc; + new_nsrc->gfd.events = G_IO_IN|G_IO_OUT; + g_source_destroy(&nsrc->source); + s->nc.info->bind_ctx(&s->nc, NULL); + + return true; +} + static NetSocketState *net_socket_fd_init_stream(NetClientState *peer, const char *model, const char *name, @@ -429,6 +463,7 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer, { NetClientState *nc; NetSocketState *s; + EventGSource *nsrc; nc = qemu_new_net_client(&net_socket_info, peer, model, name); @@ -440,9 +475,16 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer, s->listen_fd = -1; if (is_connected) { - net_socket_connect(s); + nsrc = event_source_new(fd, 
net_socket_establish_handler, s); + s->nsrc = nsrc; + nsrc->gfd.events = G_IO_IN|G_IO_OUT; + nc->info->bind_ctx(nc, NULL); } else { - qemu_set_fd_handler(s->fd, NULL, net_socket_connect, s); + nsrc = event_source_new(fd, net_socket_connect_handler, s); + s->nsrc = nsrc; + nsrc->gfd.events = G_IO_IN; + nc->info->bind_ctx(nc, NULL); + } return s; } @@ -473,30 +515,69 @@ static NetSocketState *net_socket_fd_init(NetClientState *peer, return NULL; } -static void net_socket_accept(void *opaque) +static gboolean net_socket_establish_handler(gpointer data) +{ + EventGSource *nsrc = (EventGSource *)data; + NetSocketState *s = nsrc->opaque; + + if (nsrc->gfd.revents & G_IO_IN) { + net_socket_send(s); + } else { + net_socket_writable(s); + } + return true; +} + +static bool readable(void *opaque) { NetSocketState *s = opaque; + + if (s->read_poll && net_socket_can_send(s)) { + return true; + } + return false; +} + +static bool writable(void *opaque) +{ + NetSocketState *s = opaque; + + if (s->write_poll) { + return true; + } + return false; +} + +static gboolean net_socket_listen_handler(gpointer data) +{ + EventGSource *new_nsrc, *nsrc = data; + NetSocketState *s = nsrc->opaque; struct sockaddr_in saddr; socklen_t len; int fd; - for(;;) { - len = sizeof(saddr); - fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len); - if (fd < 0 && errno != EINTR) { - return; - } else if (fd >= 0) { - qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL); - break; - } + len = sizeof(saddr); + fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len); + if (fd < 0 && errno != EINTR) { + return false; } s->fd = fd; s->nc.link_down = false; - net_socket_connect(s); + new_nsrc = event_source_new(fd, net_socket_establish_handler, s); + s->nsrc = new_nsrc; + new_nsrc->gfd.events = G_IO_IN|G_IO_OUT; + new_nsrc->readable = readable; + new_nsrc->writable = writable; + /* prevent more than one connect req */ + g_source_destroy(&nsrc->source); + s->nc.info->bind_ctx(&s->nc, NULL); + 
net_socket_read_poll(s, true); snprintf(s->nc.info_str, sizeof(s->nc.info_str), "socket: connection from %s:%d", inet_ntoa(saddr.sin_addr), ntohs(saddr.sin_port)); + + return true; } static int net_socket_listen_init(NetClientState *peer, @@ -508,6 +589,7 @@ static int net_socket_listen_init(NetClientState *peer, NetSocketState *s; struct sockaddr_in saddr; int fd, val, ret; + EventGSource *nsrc; if (parse_host_port(&saddr, host_str) < 0) return -1; @@ -542,7 +624,11 @@ static int net_socket_listen_init(NetClientState *peer, s->listen_fd = fd; s->nc.link_down = true; - qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s); + nsrc = event_source_new(fd, net_socket_listen_handler, s); + s->nsrc = nsrc; + nsrc->gfd.events = G_IO_IN; + nc->info->bind_ctx(nc, NULL); + return 0; }