Message ID | 1289870175-14880-8-git-send-email-mdroth@linux.vnet.ibm.com |
---|---|
State | New |
On 11/15/2010 07:16 PM, Michael Roth wrote:
> Handle data coming in over the channel as VPPackets: Process control
> messages and forward data from remote client/server connections to the
> appropriate server/client FD on our end. We also provide here a helper
> function to process a stream of packets from the channel.
>
> Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
> ---
>  virtproxy.c | 96 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  virtproxy.h | 3 ++
>  2 files changed, 99 insertions(+), 0 deletions(-)
>
> diff --git a/virtproxy.c b/virtproxy.c
> index 770b57b..091a223 100644
> --- a/virtproxy.c
> +++ b/virtproxy.c
> @@ -33,6 +33,7 @@
>  #define VP_SERVICE_ID_LEN 32 /* max length of service id string */
>  #define VP_PKT_DATA_LEN 1024 /* max proxied bytes per VPPacket */
>  #define VP_CONN_DATA_LEN 1024 /* max bytes conns can send at a time */
> +#define VP_CHAN_DATA_LEN 4096 /* max bytes channel can send at a time */
>  #define VP_MAGIC 0x1F374059
>
>  /* listening fd, one for each service we're forwarding to remote end */
> @@ -152,6 +153,8 @@ static QemuOptsList vp_socket_opts = {
>      },
>  };
>
> +static void vp_channel_read(void *opaque);
> +
>  static int vp_channel_send_all(VPDriver *drv, uint8_t *buf, int count)
>  {
>      int ret;
> @@ -263,3 +266,96 @@ static void vp_channel_accept(void *opaque)
>      /* dont accept anymore connections until channel_fd is closed */
>      vp_set_fd_handler(drv->listen_fd, NULL, NULL, NULL);
>  }
> +
> +/* process a stream of packets coming in from the channel */
> +int vp_handle_packet_buf(VPDriver *drv, const void *buf, int count)
> +{
> +    VPPacket pkt;
> +    int ret, buf_offset;
> +    char *pkt_ptr;
> +    const char *buf_ptr;
> +
> +    if (drv->buflen + count >= sizeof(VPPacket)) {
> +        TRACE("initial packet, drv->buflen: %d", drv->buflen);
> +        pkt_ptr = (char *)&pkt;
> +        memcpy(pkt_ptr, drv->buf, drv->buflen);
> +        pkt_ptr += drv->buflen;
> +        memcpy(pkt_ptr, buf, sizeof(VPPacket) - drv->buflen);
> +        /* handle first packet */
> +        ret = vp_handle_packet(drv, &pkt);
> +        if (ret != 0) {
> +            LOG("error handling packet");
> +        }
> +        /* handle the rest of the buffer */
> +        buf_offset = sizeof(VPPacket) - drv->buflen;
> +        drv->buflen = 0;
> +        buf_ptr = buf + buf_offset;
> +        count -= buf_offset;
> +        while (count > 0) {
> +            if (count >= sizeof(VPPacket)) {
> +                /* handle full packet */
> +                TRACE("additional packet, drv->buflen: %d", drv->buflen);
> +                memcpy((void *)&pkt, buf_ptr, sizeof(VPPacket));
> +                ret = vp_handle_packet(drv, &pkt);
> +                if (ret != 0) {
> +                    LOG("error handling packet");
> +                }
> +                count -= sizeof(VPPacket);
> +                buf_ptr += sizeof(VPPacket);
> +            } else {
> +                /* buffer the remainder */
> +                TRACE("buffering packet, drv->buflen: %d", drv->buflen);
> +                memcpy(drv->buf, buf_ptr, count);
> +                drv->buflen = count;
> +                break;
> +            }
> +        }
> +    } else {
> +        /* haven't got a full VPPacket yet, buffer for later */
> +        TRACE("buffering packet, drv->buflen: %d", drv->buflen);
> +        memcpy(drv->buf + drv->buflen, buf, count);
> +        drv->buflen += count;
> +    }
> +    return 0;
> +}
> +
> +/* read handler for communication channel
> + *
> + * de-multiplexes data coming in over the channel. for control messages
> + * we process them here, for data destined for a service or client we
> + * send it to the appropriate FD.
> + */
> +static void vp_channel_read(void *opaque)
> +{
> +    VPDriver *drv = opaque;
> +    int count, ret;
> +    char buf[VP_CHAN_DATA_LEN];
> +
> +    TRACE("called with opaque: %p", drv);
> +
> +    count = read(drv->channel_fd, buf, sizeof(buf));
> +
> +    if (count == -1) {
>

I think you notice this later in your series but this is the first
indication that something is fundamentally wrong.

In a tool like this, you should be checking for EAGAIN everywhere and
handling it gracefully but you assume you have synchronous fds everywhere.

This creates the following scenario:

In the host we have process Alice and process Mary.
They both communicate with process Bob and process Joe on the guest
respectively through virtproxy.

Bob and Joe send packets to virtproxy, and virtproxy combines the packets
into a single channel which it then sends to QEMU for QEMU to decode and
send back out to Alice and Mary. Classic mux/demux.

Bob's communication with Alice is totally independent of Mary's
communication with Joe.

However, because you multiplex synchronously, if virtproxy tries to send a
packet from Mary to Joe and the socket buffer between virtproxy and Joe is
full, then virtproxy blocks. This means if Alice tries to send a message to
Bob, the fact that Joe is not responding blocks their communication,
creating an artificial dependency between the two communication channels.

It's not just as simple as being async and queueing, because you can't
queue in memory indefinitely, so at some point in time you simply have to
stop reading packets.

Now imagine that Joe's socket buffer is full because he's talking to Bob
through some other mechanism. Joe being blocked on Bob is no problem
because Bob has nothing to do with Alice and Joe's communication stream.
Eventually, Bob will get Joe what he needs and that will unblock Joe and
let Alice's communication with Joe continue.

Now imagine that Bob is blocking Joe because he's waiting on a message from
Mary. This should be okay, Mary's communication with Bob *should* be
independent of anything and as soon as Mary sends the message to Bob, Bob
can talk to Joe, and Joe can talk to Mary.

Except, in virtproxy, the fact that Alice cannot talk to Joe blocks Mary
from talking to Bob, which creates a deadlock.

To be honest, there's no simple solution. This is a classic queuing
problem. You need some form of congestion control to fix this. That means
virtproxy needs to be able to say to Alice that Joe is not ready to
communicate right now and then let her know when she can resume
communication. That frees up the shared resource for Mary and Bob to use.
The fundamental problem here is that virtproxy is too generic. It's trying
to tackle too hard of a problem. This isn't a problem with virtio-serial
because virtio-serial has a dedicated queue for each port, which allows each
port to have its own back-off mechanism.

You can eliminate this problem by doing the following:

1) Have virtagent use two virtio-serial ports.

2) Have virtagent multiplex on its own when given a single port. Yes, the
problem still presents itself but you've limited the communication scope,
which means you can practically avoid any deadlocks. You only have two peers
in the channel: qemu and virtagent. Their communication involves the
following:

QEMU->virtagent RPC
- QEMU wants to send an RPC request. Until this is entirely completed, it
  will never allow another request to be sent
- virtagent is waiting to receive an RPC request, it gets a packet and sees
  that it's a request
- virtagent processes the request, and sends back a response
- QEMU receives response, processes

virtagent->QEMU RPC
- Same as above with roles reversed

The only thing you need to handle is if QEMU tries to send a request to
virtagent while virtagent simultaneously tries to send QEMU a request. This
is simple to handle though because you are allowing one RPC request to be
queued on either end at a given time. This is really the key to success: by
the nature of the communication, we can demultiplex into finite buffers.

Regards,

Anthony Liguori
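The EAGAIN handling and per-stream back-off asked for in the review above could be sketched roughly as follows. This is only an illustration under stated assumptions, not code from the series: `SketchConn`, `sketch_set_nonblocking()`, `sketch_conn_flush()`, `sketch_conn_send()`, the queue size, and the half-full pause threshold are all hypothetical names and values. The idea is that a write to a non-blocking fd never stalls the demultiplexer; unwritten bytes sit in a bounded per-connection queue, and the `paused` flag marks the point where the remote peer would have to be told to stop sending for this one stream.

```c
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#define SKETCH_QUEUE_LEN 4096   /* hypothetical per-connection bound */

/* hypothetical per-connection state, not the VPConn from the patch */
typedef struct SketchConn {
    int fd;                        /* non-blocking destination fd */
    char queue[SKETCH_QUEUE_LEN];  /* accepted but not yet written */
    size_t queued;
    int paused;                    /* 1: sender should be told to back off */
} SketchConn;

/* put an fd into non-blocking mode; returns 0 on success, -1 on error */
static int sketch_set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* write as much queued data as the fd accepts right now.
 * returns 0 if the queue drained or the fd would block, -1 on hard error */
static int sketch_conn_flush(SketchConn *c)
{
    while (c->queued > 0) {
        ssize_t n = write(c->fd, c->queue, c->queued);
        if (n == -1) {
            if (errno == EINTR) {
                continue;
            }
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                return 0;   /* try again when the fd becomes writable */
            }
            return -1;
        }
        memmove(c->queue, c->queue + n, c->queued - (size_t)n);
        c->queued -= (size_t)n;
    }
    return 0;
}

/* queue data for one connection without ever blocking the caller.
 * returns -1 if the bounded queue would overflow or the write failed */
static int sketch_conn_send(SketchConn *c, const void *buf, size_t len)
{
    if (len > SKETCH_QUEUE_LEN - c->queued) {
        return -1;          /* peer kept sending past the bound */
    }
    memcpy(c->queue + c->queued, buf, len);
    c->queued += len;
    if (sketch_conn_flush(c) == -1) {
        return -1;
    }
    /* assumed policy: ask the peer to pause once the queue is half full */
    c->paused = c->queued > SKETCH_QUEUE_LEN / 2;
    return 0;
}
```

A caller would register a write handler that invokes `sketch_conn_flush()` when the fd becomes writable and, on a change of `paused`, send whatever pause/resume control packet the protocol defines. That control packet is exactly the extra machinery the review says a generic proxy would need.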
On 11/16/2010 05:17 PM, Anthony Liguori wrote:
> Except, in virtproxy, the fact that Alice cannot talk to Joe blocks Mary
> from talking to Bob which creates a dead lock.
>
> To be honest, there's no simple solution. This is a classic queuing
> problem. You need some form of congestion control to fix this. That
> means virtproxy needs to be able to say to Alice that Joe is not ready
> to communicate right now and then let her know when she can resume
> communication. That frees up the shared resource for Mary and Bob to use.
>
> The fundamental problem here is that virtproxy is too generic. It's
> trying to tackle too hard of a problem. This isn't a problem with
> virtio-serial because virtio-serial has a dedicated queue for each port
> which allows each port to have it's own back off mechanism.

So, after a series of half-written rebuttals and working over the proposed
scenarios with the solutions I'd had in mind, I think I've finally reached
the same conclusion.

The potential for deadlocks can be traced to the use, or re-implementation,
of qemu-char.c's send_all(). My thought had been that since we make
assumptions there that these send_all()'s to processes will eventually
complete, we can make similar assumptions in virtproxy. And if this is not
an acceptable assumption, the solution would be a general one that would
benefit chardevs in general, rather than something virtproxy-specific:
mainly, an async/non-blocking alternative to send_all().

But you're right... whereas with normal chardevs, we have the option of
throttling/polling for adequately-sized buffers at the device/virtio level,
with virtproxy we'd need to do this for individual communication streams,
which would require additional control packets. That, or we'd have to start
dropping packets bound for a process when the corresponding write handler's
queue was exhausted, which is not something we necessarily have to resort to
when using a dedicated channel for a stream.
Probably a bit too much beyond the scope of virtagent... virtproxy was
supposed to make things *easier* :)

>
> You can eliminate this problem by doing the following:
>
> 1) Have virtagent use two virtio-serial ports.
>

How bad is this from an end-user/deployment perspective? My first thought
would be a shortcut invocation for virtio-serial guests:

    ./qemu -enable-virtagent

which would be equivalent to:

    ./qemu -chardev virtagent-client \
           -chardev virtagent-server \
           -device virtio-serial \
           -device virtserialport,chardev=virtagent-client,name=org.qemu.virtagent-client \
           -device virtserialport,chardev=virtagent-server,name=org.qemu.virtagent-server

And for alternative channels they'd need to do the verbose invocation, which
I think works out well since we can't guess a good index for, say,
isa-serial, or if we could, we'd still need some way to convey what it is so
they can go start the agent accordingly.

And implementing virtagent-* as a simple chardev wrapper for -chardev socket
lets us set up default paths for the client/server sockets that QEMU talks
to in order to send/receive RPCs, as well as chardev IDs and invoking
virtagent init routines. Potentially we can key into events to infer when
the guest is connected to the other end as well.

Guest agent invocation would then be, for virtio-serial guests:

    ./virtagent

which would be equivalent to:

    ./virtagent --client virtio-serial:/dev/virtio-ports/org.qemu.virtagent-client \
                --server virtio-serial:/dev/virtio-ports/org.qemu.virtagent-server

And for alternative channels, i.e. isa-serial, they'd have to do the manual
invocation and point to the proper serial ports.

It's flexible, and the virtio-serial case is dead simple. The major drawback
is the potential scarcity of isa-serial ports: 2 out of 4 might be too much
to ask for. And I think that, with a ubiquitous agent, sticking with
isa-serial to be safe might be a common deployment strategy for a lot of
environments.

> 2) Have virtagent multiplex on it's own when given a single port. Yes,
> the problem still presents itself but you've limited the communication
> scope which means you can practical avoid any deadlocks. You only have
> two peers in the channel: qemu and virtagent. There communication
> involves the following:
>
> QEMU->virtagent RPC
> - QEMU wants to send an RPC request. Until this is entirely completed,
> it will never allow another request to be sent
> - virtagent is waiting to receive an RPC request, it gets a packet and
> sees that it's a request
> - virtagent processes the request, and sends back a response
> - QEMU receives response, processes
>
> virtagent->QEMU RPC
> - Same as above with roles reversed
>
> The only thing you need to handle is if QEMU tries to send a request to
> virtagent while virtagent simultaneous tries to send QEMU a request.
> This is simple to handle though because you are allowing one RPC request
> to be queued on either end at a given time. This is really the key to
> success, by the nature of the communication, we can demultiplex into
> finite buffers.

I'll look into this a bit more. But I'm hesitant to revisit a multiplexing
solution at this late a stage unless 2 isa-serial ports seem exceedingly
prohibitive. If the design seems simple enough I will try to work it into
the next RFC, but I think 1) might be more feasible at this point. And we
can still potentially work in 2) in a later release.

>
> Regards,
>
> Anthony Liguori

Thanks!

-Mike
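The single-port scheme quoted above, where each side allows only one outstanding RPC request, might be sketched as below. It is a minimal illustration under assumptions: `SketchMsg`, `SketchEndpoint`, `sketch_begin_request()`, `sketch_demux()` and `sketch_finish_peer_request()` are hypothetical names, and the message layout is invented for the example rather than taken from virtagent. The point it tries to show is that two fixed slots per endpoint are enough, one for the peer's pending request and one for the response to our own request, so incoming traffic can be demultiplexed into finite buffers even when both sides send a request at the same time.

```c
#include <stddef.h>

#define SKETCH_PAYLOAD_LEN 256   /* hypothetical maximum payload size */

typedef enum {
    SKETCH_MSG_REQUEST,
    SKETCH_MSG_RESPONSE
} SketchMsgType;

/* hypothetical wire message, invented for this sketch */
typedef struct SketchMsg {
    SketchMsgType type;
    size_t len;
    char payload[SKETCH_PAYLOAD_LEN];
} SketchMsg;

/* one endpoint (QEMU or the guest agent): exactly two finite slots */
typedef struct SketchEndpoint {
    int request_in_flight;   /* we sent a request and await its response */
    int have_peer_request;   /* peer_request holds an unprocessed request */
    int have_response;       /* response holds the reply to our request */
    SketchMsg peer_request;
    SketchMsg response;
} SketchEndpoint;

/* start an outgoing request; refuses while one is still outstanding */
static int sketch_begin_request(SketchEndpoint *ep)
{
    if (ep->request_in_flight) {
        return -1;
    }
    ep->request_in_flight = 1;
    ep->have_response = 0;
    return 0;
}

/* sort one incoming message into its slot.
 * returns -1 when the peer violates the one-outstanding-request rule */
static int sketch_demux(SketchEndpoint *ep, const SketchMsg *msg)
{
    if (msg->len > SKETCH_PAYLOAD_LEN) {
        return -1;
    }
    switch (msg->type) {
    case SKETCH_MSG_REQUEST:
        if (ep->have_peer_request) {
            return -1;   /* previous request not yet answered */
        }
        ep->peer_request = *msg;
        ep->have_peer_request = 1;
        return 0;
    case SKETCH_MSG_RESPONSE:
        if (!ep->request_in_flight) {
            return -1;   /* response to nothing we asked */
        }
        ep->response = *msg;
        ep->have_response = 1;
        ep->request_in_flight = 0;
        return 0;
    }
    return -1;
}

/* call after the response to the peer's request has been sent */
static void sketch_finish_peer_request(SketchEndpoint *ep)
{
    ep->have_peer_request = 0;
}
```

In the simultaneous-request case, an endpoint that has called `sketch_begin_request()` can still accept the peer's request into `peer_request`, answer it, and then receive its own response, without either side needing more than these two slots.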
As a side-note to the larger control flow questions:

Why not read() into &drv->buf[drv->buflen] and use it directly with
vp_handle_packet()? I don't think we need memcpy or temporary buffers since
we already have a buffer in drv. The extra buffers and memcpys here make the
code more complex.

Stefan
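Stefan's suggestion might look roughly like the sketch below. It is not the patch's code: `SketchPacket`, `SketchDriver`, `sketch_handle_packet()` and `sketch_channel_read()` are hypothetical stand-ins, since the real `VPPacket` and `VPDriver` layouts are not shown in this thread. The driver buffer is declared as a union over whole packets so that complete packets can be handed to the handler in place without alignment problems; only the trailing partial packet, if any, is moved to the front of the buffer.

```c
#include <errno.h>
#include <stddef.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#define SKETCH_PKTS_PER_READ 4   /* hypothetical buffer depth */

/* hypothetical fixed-size packet standing in for VPPacket */
typedef struct SketchPacket {
    unsigned int magic;
    char data[60];
} SketchPacket;

/* hypothetical driver state standing in for VPDriver */
typedef struct SketchDriver {
    union {
        SketchPacket pkts[SKETCH_PKTS_PER_READ];
        char bytes[SKETCH_PKTS_PER_READ * sizeof(SketchPacket)];
    } buf;
    size_t buflen;   /* valid bytes at the start of buf */
    int handled;     /* packets processed so far, for illustration */
} SketchDriver;

/* placeholder for the real per-packet handler */
static int sketch_handle_packet(SketchDriver *drv, const SketchPacket *pkt)
{
    (void)pkt;
    drv->handled++;
    return 0;
}

/* read straight into the driver buffer and process packets in place.
 * returns read()'s result: >0 bytes read, 0 on EOF, -1 on error
 * (errno may be EAGAIN on a non-blocking fd; the caller decides) */
static ssize_t sketch_channel_read(SketchDriver *drv, int fd)
{
    ssize_t n;
    size_t full, used, i;

    do {
        n = read(fd, drv->buf.bytes + drv->buflen,
                 sizeof(drv->buf.bytes) - drv->buflen);
    } while (n == -1 && errno == EINTR);
    if (n <= 0) {
        return n;
    }
    drv->buflen += (size_t)n;

    /* hand every complete packet to the handler without copying it */
    full = drv->buflen / sizeof(SketchPacket);
    for (i = 0; i < full; i++) {
        sketch_handle_packet(drv, &drv->buf.pkts[i]);
    }

    /* keep only the partial trailing packet, moved to the front */
    used = full * sizeof(SketchPacket);
    memmove(drv->buf.bytes, drv->buf.bytes + used, drv->buflen - used);
    drv->buflen -= used;
    return n;
}
```

Because fewer than `sizeof(SketchPacket)` bytes remain buffered after each call, there is always room for the next read. One `memmove()` of the tail is still needed in this variant; avoiding it entirely would take a ring buffer, which is beyond what the review comment asks for.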
diff --git a/virtproxy.c b/virtproxy.c
index 770b57b..091a223 100644
--- a/virtproxy.c
+++ b/virtproxy.c
@@ -33,6 +33,7 @@
 #define VP_SERVICE_ID_LEN 32 /* max length of service id string */
 #define VP_PKT_DATA_LEN 1024 /* max proxied bytes per VPPacket */
 #define VP_CONN_DATA_LEN 1024 /* max bytes conns can send at a time */
+#define VP_CHAN_DATA_LEN 4096 /* max bytes channel can send at a time */
 #define VP_MAGIC 0x1F374059
 
 /* listening fd, one for each service we're forwarding to remote end */
@@ -152,6 +153,8 @@ static QemuOptsList vp_socket_opts = {
     },
 };
 
+static void vp_channel_read(void *opaque);
+
 static int vp_channel_send_all(VPDriver *drv, uint8_t *buf, int count)
 {
     int ret;
@@ -263,3 +266,96 @@ static void vp_channel_accept(void *opaque)
     /* dont accept anymore connections until channel_fd is closed */
     vp_set_fd_handler(drv->listen_fd, NULL, NULL, NULL);
 }
+
+/* process a stream of packets coming in from the channel */
+int vp_handle_packet_buf(VPDriver *drv, const void *buf, int count)
+{
+    VPPacket pkt;
+    int ret, buf_offset;
+    char *pkt_ptr;
+    const char *buf_ptr;
+
+    if (drv->buflen + count >= sizeof(VPPacket)) {
+        TRACE("initial packet, drv->buflen: %d", drv->buflen);
+        pkt_ptr = (char *)&pkt;
+        memcpy(pkt_ptr, drv->buf, drv->buflen);
+        pkt_ptr += drv->buflen;
+        memcpy(pkt_ptr, buf, sizeof(VPPacket) - drv->buflen);
+        /* handle first packet */
+        ret = vp_handle_packet(drv, &pkt);
+        if (ret != 0) {
+            LOG("error handling packet");
+        }
+        /* handle the rest of the buffer */
+        buf_offset = sizeof(VPPacket) - drv->buflen;
+        drv->buflen = 0;
+        buf_ptr = buf + buf_offset;
+        count -= buf_offset;
+        while (count > 0) {
+            if (count >= sizeof(VPPacket)) {
+                /* handle full packet */
+                TRACE("additional packet, drv->buflen: %d", drv->buflen);
+                memcpy((void *)&pkt, buf_ptr, sizeof(VPPacket));
+                ret = vp_handle_packet(drv, &pkt);
+                if (ret != 0) {
+                    LOG("error handling packet");
+                }
+                count -= sizeof(VPPacket);
+                buf_ptr += sizeof(VPPacket);
+            } else {
+                /* buffer the remainder */
+                TRACE("buffering packet, drv->buflen: %d", drv->buflen);
+                memcpy(drv->buf, buf_ptr, count);
+                drv->buflen = count;
+                break;
+            }
+        }
+    } else {
+        /* haven't got a full VPPacket yet, buffer for later */
+        TRACE("buffering packet, drv->buflen: %d", drv->buflen);
+        memcpy(drv->buf + drv->buflen, buf, count);
+        drv->buflen += count;
+    }
+    return 0;
+}
+
+/* read handler for communication channel
+ *
+ * de-multiplexes data coming in over the channel. for control messages
+ * we process them here, for data destined for a service or client we
+ * send it to the appropriate FD.
+ */
+static void vp_channel_read(void *opaque)
+{
+    VPDriver *drv = opaque;
+    int count, ret;
+    char buf[VP_CHAN_DATA_LEN];
+
+    TRACE("called with opaque: %p", drv);
+
+    count = read(drv->channel_fd, buf, sizeof(buf));
+
+    if (count == -1) {
+        LOG("read() failed: %s", strerror(errno));
+        return;
+    } else if (count == 0) {
+        /* TODO: channel closed, this probably shouldn't happen for guest-side
+         * serial/virtio-serial connections, but need to confirm and consider
+         * what should happen in this case. as it stands this virtproxy instance
+         * is basically defunct at this point, same goes for "client" instances
+         * of virtproxy where the remote end has hung-up.
+         */
+        LOG("channel connection closed");
+        vp_set_fd_handler(drv->channel_fd, NULL, NULL, drv);
+        drv->channel_fd = -1;
+        if (drv->listen_fd) {
+            vp_set_fd_handler(drv->listen_fd, vp_channel_accept, NULL, drv);
+        }
+        /* TODO: should close/remove/delete all existing VPConns here */
+    }
+
+    ret = vp_handle_packet_buf(drv, buf, count);
+    if (ret != 0) {
+        LOG("error handling packet stream");
+    }
+}
diff --git a/virtproxy.h b/virtproxy.h
index 1a5e56a..8fa0142 100644
--- a/virtproxy.h
+++ b/virtproxy.h
@@ -32,4 +32,7 @@ int vp_set_fd_handler(int fd,
                       void *opaque);
 void vp_chr_read(CharDriverState *s, uint8_t *buf, int len);
 
+/* virtproxy interface */
+int vp_handle_packet_buf(VPDriver *drv, const void *buf, int count);
+
 #endif /* VIRTPROXY_H */
Handle data coming in over the channel as VPPackets: Process control
messages and forward data from remote client/server connections to the
appropriate server/client FD on our end. We also provide here a helper
function to process a stream of packets from the channel.

Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
---
 virtproxy.c | 96 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 virtproxy.h | 3 ++
 2 files changed, 99 insertions(+), 0 deletions(-)