Message ID | 1366944455-14239-6-git-send-email-qemulist@gmail.com |
---|---|
State | New |
Headers | show |
On Fri, Apr 26, 2013 at 10:47:26AM +0800, Liu Ping Fan wrote: > @@ -141,6 +134,59 @@ static ssize_t net_socket_receive_dgram(NetClientState *nc, const uint8_t *buf, > return ret; > } > > +static gushort socket_connecting_readable(void *opaque) > +{ > + return G_IO_IN; > +} > + > +static gushort socket_listen_readable(void *opaque) > +{ > + /* listen only handle in-req, no err */ > + return G_IO_IN; From the accept(2) man page: "Linux accept() (and accept4()) passes already-pending network errors on the new socket as an error code from accept()." So we must handle errors from accept(2), please use G_IO_IN | G_IO_HUP | G_IO_ERR. > +static gushort socket_establish_readable(void *opaque) > +{ > + NetSocketState *s = opaque; > + > + /* rely on net_socket_send to handle err */ > + if (s->read_poll && net_socket_can_send(s)) { > + return G_IO_IN|G_IO_HUP|G_IO_ERR; > + } > + return G_IO_HUP|G_IO_ERR; > +} This new function always monitors G_IO_HUP | G_IO_ERR. The old code only monitored it when read_poll == true and net_socket_can_send() == true. Please preserve semantics. > +static gushort socket_establish_writable(void *opaque) > +{ > + NetSocketState *s = opaque; > + > + if (s->write_poll) { > + return G_IO_OUT; Errors/hang up? > @@ -440,9 +529,20 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer, > s->listen_fd = -1; > > if (is_connected) { > - net_socket_connect(s); > + assert(!s->nsrc); > + s->nsrc = event_source_new(fd, net_socket_establish_handler, s); > + s->nsrc->readable = socket_establish_readable; > + s->nsrc->writable = socket_establish_writable; > + nc->info->bind_ctx(nc, NULL); > + net_socket_read_poll(s, true); > + net_socket_write_poll(s, true); > } else { > - qemu_set_fd_handler(s->fd, NULL, net_socket_connect, s); > + assert(!s->nsrc); > + s->nsrc = event_source_new(fd, net_socket_connect_handler, s); > + s->nsrc->readable = socket_connecting_readable; The original code wants writeable, not readable. > +static gboolean net_socket_listen_handler(gpointer data) > +{ > + EventGSource *nsrc = data; > + NetSocketState *s = nsrc->opaque; > struct sockaddr_in saddr; > socklen_t len; > int fd; > > - for(;;) { > - len = sizeof(saddr); > - fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len); > - if (fd < 0 && errno != EINTR) { > - return; > - } else if (fd >= 0) { > - qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL); > - break; > - } > + len = sizeof(saddr); > + fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len); > + if (fd < 0 && errno != EINTR) { > + return false; > } This breaks the code when accept(2) is interrupted by a signal and we get -1, errno == EINTR. Why did you remove the loop?
On Fri, Apr 26, 2013 at 5:48 PM, Stefan Hajnoczi <stefanha@redhat.com> wrote: > On Fri, Apr 26, 2013 at 10:47:26AM +0800, Liu Ping Fan wrote: >> @@ -141,6 +134,59 @@ static ssize_t net_socket_receive_dgram(NetClientState *nc, const uint8_t *buf, >> return ret; >> } >> >> +static gushort socket_connecting_readable(void *opaque) >> +{ >> + return G_IO_IN; >> +} >> + >> +static gushort socket_listen_readable(void *opaque) >> +{ >> + /* listen only handle in-req, no err */ >> + return G_IO_IN; > > From the accept(2) man page: > > "Linux accept() (and accept4()) passes already-pending network errors on > the new socket as an error code from accept()." > > So we must handle errors from accept(2), please use G_IO_IN | G_IO_HUP | > G_IO_ERR. > Here, we handle listen(2), not accept(2) >> +static gushort socket_establish_readable(void *opaque) >> +{ >> + NetSocketState *s = opaque; >> + >> + /* rely on net_socket_send to handle err */ >> + if (s->read_poll && net_socket_can_send(s)) { >> + return G_IO_IN|G_IO_HUP|G_IO_ERR; >> + } >> + return G_IO_HUP|G_IO_ERR; >> +} > > This new function always monitors G_IO_HUP | G_IO_ERR. The old code > only monitored it when read_poll == true and net_socket_can_send() == > true. > > Please preserve semantics. > But the only the code in net_socket_send() will handle the err condition. See the code behind "/* end of connection */". And I think it is safely to handle err, even when the peer is not ready to receive. >> +static gushort socket_establish_writable(void *opaque) >> +{ >> + NetSocketState *s = opaque; >> + >> + if (s->write_poll) { >> + return G_IO_OUT; > > Errors/hang up? > As explained above, net_socket_writable() does not handle the err condition. But maybe we need the qemu_flush_queued_packets() in it? >> @@ -440,9 +529,20 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer, >> s->listen_fd = -1; >> >> if (is_connected) { >> - net_socket_connect(s); >> + assert(!s->nsrc); >> + s->nsrc = event_source_new(fd, net_socket_establish_handler, s); >> + s->nsrc->readable = socket_establish_readable; >> + s->nsrc->writable = socket_establish_writable; >> + nc->info->bind_ctx(nc, NULL); >> + net_socket_read_poll(s, true); >> + net_socket_write_poll(s, true); >> } else { >> - qemu_set_fd_handler(s->fd, NULL, net_socket_connect, s); >> + assert(!s->nsrc); >> + s->nsrc = event_source_new(fd, net_socket_connect_handler, s); >> + s->nsrc->readable = socket_connecting_readable; > > The original code wants writeable, not readable. > Will fix it. >> +static gboolean net_socket_listen_handler(gpointer data) >> +{ >> + EventGSource *nsrc = data; >> + NetSocketState *s = nsrc->opaque; >> struct sockaddr_in saddr; >> socklen_t len; >> int fd; >> >> - for(;;) { >> - len = sizeof(saddr); >> - fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len); >> - if (fd < 0 && errno != EINTR) { >> - return; >> - } else if (fd >= 0) { >> - qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL); >> - break; >> - } >> + len = sizeof(saddr); >> + fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len); >> + if (fd < 0 && errno != EINTR) { >> + return false; >> } > > This breaks the code when accept(2) is interrupted by a signal and we > get -1, errno == EINTR. Why did you remove the loop? Oh, will fix it. Thanks, Pingfan
On Sat, Apr 27, 2013 at 03:09:10PM +0800, liu ping fan wrote: > On Fri, Apr 26, 2013 at 5:48 PM, Stefan Hajnoczi <stefanha@redhat.com> wrote: > > On Fri, Apr 26, 2013 at 10:47:26AM +0800, Liu Ping Fan wrote: > >> @@ -141,6 +134,59 @@ static ssize_t net_socket_receive_dgram(NetClientState *nc, const uint8_t *buf, > >> return ret; > >> } > >> > >> +static gushort socket_connecting_readable(void *opaque) > >> +{ > >> + return G_IO_IN; > >> +} > >> + > >> +static gushort socket_listen_readable(void *opaque) > >> +{ > >> + /* listen only handle in-req, no err */ > >> + return G_IO_IN; > > > > From the accept(2) man page: > > > > "Linux accept() (and accept4()) passes already-pending network errors on > > the new socket as an error code from accept()." > > > > So we must handle errors from accept(2), please use G_IO_IN | G_IO_HUP | > > G_IO_ERR. > > > Here, we handle listen(2), not accept(2) Look again, the handler invokes accept(2). listen(2) was called to put the socket into the listening state but now we are monitoring for accept. > >> +static gushort socket_establish_readable(void *opaque) > >> +{ > >> + NetSocketState *s = opaque; > >> + > >> + /* rely on net_socket_send to handle err */ > >> + if (s->read_poll && net_socket_can_send(s)) { > >> + return G_IO_IN|G_IO_HUP|G_IO_ERR; > >> + } > >> + return G_IO_HUP|G_IO_ERR; > >> +} > > > > This new function always monitors G_IO_HUP | G_IO_ERR. The old code > > only monitored it when read_poll == true and net_socket_can_send() == > > true. > > > > Please preserve semantics. > > > But the only the code in net_socket_send() will handle the err > condition. See the code behind "/* end of connection */". And I think > it is safely to handle err, even when the peer is not ready to > receive. > > >> +static gushort socket_establish_writable(void *opaque) > >> +{ > >> + NetSocketState *s = opaque; > >> + > >> + if (s->write_poll) { > >> + return G_IO_OUT; > > > > Errors/hang up? > > > As explained above, net_socket_writable() does not handle the err > condition. But maybe we need the qemu_flush_queued_packets() in it? net_socket_receive() does handle send(2) errors but it does so differently from net_socket_send(). It fails the packet and resets ->send_index. The change you made doesn't really solve this because an error could still happen right when QEMU calls send(2) and therefore not be processed by net_socket_send(). Changing the send/receive error handling is something that could be done carefully in a separate patch. But please preserve semantics in conversion patches - it makes them easy to review and merge. As explained above, I'm not convinced that the change you made is useful. Stefan
diff --git a/net/socket.c b/net/socket.c index 396dc8c..abdb809 100644 --- a/net/socket.c +++ b/net/socket.c @@ -31,6 +31,8 @@ #include "qemu/option.h" #include "qemu/sockets.h" #include "qemu/iov.h" +#include "util/event_gsource.h" + typedef struct NetSocketState { NetClientState nc; @@ -42,13 +44,15 @@ typedef struct NetSocketState { unsigned int send_index; /* number of bytes sent (only SOCK_STREAM) */ uint8_t buf[4096]; struct sockaddr_in dgram_dst; /* contains inet host and port destination iff connectionless (SOCK_DGRAM) */ - IOHandler *send_fn; /* differs between SOCK_STREAM/SOCK_DGRAM */ bool read_poll; /* waiting to receive data? */ bool write_poll; /* waiting to transmit data? */ + EventGSource *nsrc; } NetSocketState; -static void net_socket_accept(void *opaque); static void net_socket_writable(void *opaque); +static gboolean net_socket_listen_handler(gpointer data); +static gboolean net_socket_establish_handler(gpointer data); + /* Only read packets from socket when peer can receive them */ static int net_socket_can_send(void *opaque) @@ -58,25 +62,14 @@ static int net_socket_can_send(void *opaque) return qemu_can_send_packet(&s->nc); } -static void net_socket_update_fd_handler(NetSocketState *s) -{ - qemu_set_fd_handler2(s->fd, - s->read_poll ? net_socket_can_send : NULL, - s->read_poll ? s->send_fn : NULL, - s->write_poll ? net_socket_writable : NULL, - s); -} - static void net_socket_read_poll(NetSocketState *s, bool enable) { s->read_poll = enable; - net_socket_update_fd_handler(s); } static void net_socket_write_poll(NetSocketState *s, bool enable) { s->write_poll = enable; - net_socket_update_fd_handler(s); } static void net_socket_writable(void *opaque) @@ -141,6 +134,59 @@ static ssize_t net_socket_receive_dgram(NetClientState *nc, const uint8_t *buf, return ret; } +static gushort socket_connecting_readable(void *opaque) +{ + return G_IO_IN; +} + +static gushort socket_listen_readable(void *opaque) +{ + /* listen only handle in-req, no err */ + return G_IO_IN; +} + +static gushort socket_establish_readable(void *opaque) +{ + NetSocketState *s = opaque; + + /* rely on net_socket_send to handle err */ + if (s->read_poll && net_socket_can_send(s)) { + return G_IO_IN|G_IO_HUP|G_IO_ERR; + } + return G_IO_HUP|G_IO_ERR; +} + +static gushort socket_establish_writable(void *opaque) +{ + NetSocketState *s = opaque; + + if (s->write_poll) { + return G_IO_OUT; + } + return 0; +} + +static gushort socket_dgram_readable(void *opaque) +{ + NetSocketState *s = opaque; + + /* rely on net_socket_send_dgram to handle err */ + if (s->read_poll && net_socket_can_send(s)) { + return G_IO_IN|G_IO_ERR; + } + return G_IO_ERR; +} + +static gushort socket_dgram_writable(void *opaque) +{ + NetSocketState *s = opaque; + + if (s->write_poll) { + return G_IO_OUT; + } + return 0; +} + static void net_socket_send(void *opaque) { NetSocketState *s = opaque; @@ -160,7 +206,11 @@ static void net_socket_send(void *opaque) net_socket_read_poll(s, false); net_socket_write_poll(s, false); if (s->listen_fd != -1) { - qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s); + event_source_release(s->nsrc); + s->nsrc = event_source_new(s->listen_fd, net_socket_listen_handler, + s); + s->nsrc->readable = socket_listen_readable; + s->nc.info->bind_ctx(&s->nc, NULL); } closesocket(s->fd); @@ -231,6 +281,8 @@ static void net_socket_send_dgram(void *opaque) /* end of connection */ net_socket_read_poll(s, false); net_socket_write_poll(s, false); + /* for dgram err, removing it */ + g_source_remove_poll(&s->nsrc->source, &s->nsrc->gfd); return; } qemu_send_packet(&s->nc, s->buf, size); @@ -331,6 +383,14 @@ static void net_socket_cleanup(NetClientState *nc) closesocket(s->listen_fd); s->listen_fd = -1; } + event_source_release(s->nsrc); +} + +static void net_socket_bind_ctx(NetClientState *nc, GMainContext *ctx) +{ + NetSocketState *s = DO_UPCAST(NetSocketState, nc, nc); + + g_source_attach(&s->nsrc->source, ctx); } static NetClientInfo net_dgram_socket_info = { @@ -338,8 +398,23 @@ static NetClientInfo net_dgram_socket_info = { .size = sizeof(NetSocketState), .receive = net_socket_receive_dgram, .cleanup = net_socket_cleanup, + .bind_ctx = net_socket_bind_ctx, }; +static gboolean net_socket_dgram_handler(gpointer data) +{ + EventGSource *nsrc = (EventGSource *)data; + NetSocketState *s = nsrc->opaque; + + /* for err, unregister the handler */ + if (nsrc->gfd.revents & (G_IO_IN|G_IO_ERR)) { + net_socket_send_dgram(s); + } else { + net_socket_writable(s); + } + return true; +} + static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer, const char *model, const char *name, @@ -393,8 +468,12 @@ static NetSocketState *net_socket_fd_init_dgram(NetClientState *peer, s->fd = fd; s->listen_fd = -1; - s->send_fn = net_socket_send_dgram; + s->nsrc = event_source_new(fd, net_socket_dgram_handler, s); + s->nsrc->readable = socket_dgram_readable; + s->nsrc->writable = socket_dgram_writable; + nc->info->bind_ctx(nc, NULL); net_socket_read_poll(s, true); + net_socket_write_poll(s, true); /* mcast: save bound address as dst */ if (is_connected) { @@ -408,20 +487,30 @@ err: return NULL; } -static void net_socket_connect(void *opaque) -{ - NetSocketState *s = opaque; - s->send_fn = net_socket_send; - net_socket_read_poll(s, true); -} - static NetClientInfo net_socket_info = { .type = NET_CLIENT_OPTIONS_KIND_SOCKET, .size = sizeof(NetSocketState), .receive = net_socket_receive, .cleanup = net_socket_cleanup, + .bind_ctx = net_socket_bind_ctx, }; +static gboolean net_socket_connect_handler(gpointer data) +{ + EventGSource *nsrc = data; + NetSocketState *s = nsrc->opaque; + + event_source_release(s->nsrc); + s->nsrc = event_source_new(s->fd, net_socket_establish_handler, s); + s->nsrc->readable = socket_establish_readable; + s->nsrc->writable = socket_establish_writable; + s->nc.info->bind_ctx(&s->nc, NULL); + net_socket_read_poll(s, true); + net_socket_write_poll(s, true); + + return true; +} + static NetSocketState *net_socket_fd_init_stream(NetClientState *peer, const char *model, const char *name, @@ -440,9 +529,20 @@ static NetSocketState *net_socket_fd_init_stream(NetClientState *peer, s->listen_fd = -1; if (is_connected) { - net_socket_connect(s); + assert(!s->nsrc); + s->nsrc = event_source_new(fd, net_socket_establish_handler, s); + s->nsrc->readable = socket_establish_readable; + s->nsrc->writable = socket_establish_writable; + nc->info->bind_ctx(nc, NULL); + net_socket_read_poll(s, true); + net_socket_write_poll(s, true); } else { - qemu_set_fd_handler(s->fd, NULL, net_socket_connect, s); + assert(!s->nsrc); + s->nsrc = event_source_new(fd, net_socket_connect_handler, s); + s->nsrc->readable = socket_connecting_readable; + nc->info->bind_ctx(nc, NULL); + net_socket_read_poll(s, true); + net_socket_write_poll(s, false); } return s; } @@ -473,30 +573,49 @@ static NetSocketState *net_socket_fd_init(NetClientState *peer, return NULL; } -static void net_socket_accept(void *opaque) +static gboolean net_socket_establish_handler(gpointer data) { - NetSocketState *s = opaque; + EventGSource *nsrc = (EventGSource *)data; + NetSocketState *s = nsrc->opaque; + + /* for err case, resort to the logic in net_socket_send to recover */ + if (nsrc->gfd.revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) { + net_socket_send(s); + } + if ((nsrc->gfd.revents & G_IO_OUT)) { + net_socket_writable(s); + } + return true; +} + +static gboolean net_socket_listen_handler(gpointer data) +{ + EventGSource *nsrc = data; + NetSocketState *s = nsrc->opaque; struct sockaddr_in saddr; socklen_t len; int fd; - for(;;) { - len = sizeof(saddr); - fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len); - if (fd < 0 && errno != EINTR) { - return; - } else if (fd >= 0) { - qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL); - break; - } + len = sizeof(saddr); + fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len); + if (fd < 0 && errno != EINTR) { + return false; } s->fd = fd; s->nc.link_down = false; - net_socket_connect(s); + /* prevent more than one connect req */ + event_source_release(s->nsrc); + s->nsrc = event_source_new(fd, net_socket_establish_handler, s); + s->nsrc->readable = socket_establish_readable; + s->nsrc->writable = socket_establish_writable; + s->nc.info->bind_ctx(&s->nc, NULL); + net_socket_read_poll(s, true); snprintf(s->nc.info_str, sizeof(s->nc.info_str), "socket: connection from %s:%d", inet_ntoa(saddr.sin_addr), ntohs(saddr.sin_port)); + + return true; } static int net_socket_listen_init(NetClientState *peer, @@ -542,7 +661,10 @@ static int net_socket_listen_init(NetClientState *peer, s->listen_fd = fd; s->nc.link_down = true; - qemu_set_fd_handler(s->listen_fd, net_socket_accept, NULL, s); + s->nsrc = event_source_new(fd, net_socket_listen_handler, s); + s->nsrc->readable = socket_listen_readable; + nc->info->bind_ctx(nc, NULL); + return 0; }