Patchwork [RFC,RESEND,v1,06/15] virtproxy: add read handler for communication channel

login
register
mail settings
Submitter Michael Roth
Date Nov. 3, 2010, 3:28 p.m.
Message ID <1288798090-7127-7-git-send-email-mdroth@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/70017/
State New
Headers show

Comments

Michael Roth - Nov. 3, 2010, 3:28 p.m.
Handle data coming in over the channel as VPPackets: Process control
messages and forward data from remote client/server connections to the
appropriate server/client FD on our end.

Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
---
 virtproxy.c |   83 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 83 insertions(+), 0 deletions(-)
Adam Litke - Nov. 3, 2010, 11:38 p.m.
On Wed, 2010-11-03 at 10:28 -0500, Michael Roth wrote:
> Handle data coming in over the channel as VPPackets: Process control
> messages and forward data from remote client/server connections to the
> appropriate server/client FD on our end.
> 
> Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
> ---
>  virtproxy.c |   83 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  1 files changed, 83 insertions(+), 0 deletions(-)
> 
> diff --git a/virtproxy.c b/virtproxy.c
> index 20532c2..c9c3022 100644
> --- a/virtproxy.c
> +++ b/virtproxy.c
> @@ -33,6 +33,7 @@
>  #define VP_SERVICE_ID_LEN 32    /* max length of service id string */
>  #define VP_PKT_DATA_LEN 1024    /* max proxied bytes per VPPacket */
>  #define VP_CONN_DATA_LEN 1024   /* max bytes conns can send at a time */
> +#define VP_CHAN_DATA_LEN 4096   /* max bytes channel can send at a time */
>  #define VP_MAGIC 0x1F374059
> 
>  /* listening fd, one for each service we're forwarding to remote end */
> @@ -150,6 +151,8 @@ static QemuOptsList vp_socket_opts = {
>      },
>  };
> 
> +static void vp_channel_read(void *opaque);

Try to get rid of these forward declarations.  If you really need them,
consider adding a virtproxy-internal.h.


> @@ -230,3 +233,83 @@ static void vp_channel_accept(void *opaque)
>      /* dont accept anymore connections until channel_fd is closed */
>      vp_set_fd_handler(drv->listen_fd, NULL, NULL, NULL);
>  }
> +
> +/* read handler for communication channel
> + *
> + * de-multiplexes data coming in over the channel. for control messages
> + * we process them here, for data destined for a service or client we
> + * send it to the appropriate FD.
> + */
> +static void vp_channel_read(void *opaque)
> +{
> +    VPDriver *drv = opaque;
> +    VPPacket pkt;
> +    int count, ret, buf_offset;
> +    char buf[VP_CHAN_DATA_LEN];
> +    char *pkt_ptr, *buf_ptr;
> +
> +    TRACE("called with opaque: %p", drv);
> +
> +    count = read(drv->channel_fd, buf, sizeof(buf));
> +
> +    if (count == -1) {
> +        LOG("read() failed: %s", strerror(errno));
> +        return;
> +    } else if (count == 0) {
> +        /* TODO: channel closed, this probably shouldn't happen for guest-side
> +         * serial/virtio-serial connections, but need to confirm and consider
> +         * what should happen in this case. as it stands this virtproxy instance
> +         * is basically defunct at this point, same goes for "client" instances
> +         * of virtproxy where the remote end has hung-up.
> +         */
> +        LOG("channel connection closed");
> +        vp_set_fd_handler(drv->channel_fd, NULL, NULL, drv);
> +        drv->channel_fd = -1;
> +        if (drv->listen_fd) {
> +            vp_set_fd_handler(drv->listen_fd, vp_channel_accept, NULL, drv);
> +        }
> +        /* TODO: should close/remove/delete all existing VPConns here */

Looks like you have a little work TODO here still.  Perhaps you could
add some reset/init functions that would make handling this easier.  The
ability to reset state at both the channel and VPConn levels should give
you the ability to handle errors more gracefully in other places where
you currently just log them.

> +    }
> +
> +    if (drv->buflen + count >= sizeof(VPPacket)) {
> +        TRACE("initial packet, drv->buflen: %d", drv->buflen);
> +        pkt_ptr = (char *)&pkt;
> +        memcpy(pkt_ptr, drv->buf, drv->buflen);

Can drv->buflen ever be > sizeof(VPPacket) ?  You might consider adding
an assert(drv->buflen < sizeof(VPPacket)) unless it's mathematically
impossible.

> +        pkt_ptr += drv->buflen;
> +        memcpy(pkt_ptr, buf, sizeof(VPPacket) - drv->buflen);
> +        /* handle first packet */
> +        ret = vp_handle_packet(drv, &pkt);
> +        if (ret != 0) {
> +            LOG("error handling packet");
> +        }
> +        /* handle the rest of the buffer */
> +        buf_offset = sizeof(VPPacket) - drv->buflen;
> +        drv->buflen = 0;
> +        buf_ptr = buf + buf_offset;
> +        count -= buf_offset;
> +        while (count > 0) {
> +            if (count >= sizeof(VPPacket)) {
> +                /* handle full packet */
> +                TRACE("additional packet, drv->buflen: %d", drv->buflen);
> +                memcpy((void *)&pkt, buf_ptr, sizeof(VPPacket));
> +                ret = vp_handle_packet(drv, &pkt);
> +                if (ret != 0) {
> +                    LOG("error handling packet");
> +                }
> +                count -= sizeof(VPPacket);
> +                buf_ptr += sizeof(VPPacket);
> +            } else {
> +                /* buffer the remainder */
> +                TRACE("buffering packet");
> +                memcpy(drv->buf, buf_ptr, count);
> +                drv->buflen = count;
> +                break;
> +            }
> +        }
> +    } else {
> +        /* haven't got a full VPPacket yet, buffer for later */
> +        buf_ptr = drv->buf + drv->buflen;
> +        memcpy(buf_ptr, buf, count);
> +        drv->buflen += count;

With this algorithm you can hold some data hostage indefinitely.  You
either need to send out smaller packets of whatever data you receive or
maybe have a timer that periodically fires to flush drv->buf.  Any
thoughts on what you plan to do here?
Michael Roth - Nov. 4, 2010, 5 p.m.
On 11/03/2010 06:38 PM, Adam Litke wrote:
> On Wed, 2010-11-03 at 10:28 -0500, Michael Roth wrote:
>> Handle data coming in over the channel as VPPackets: Process control
>> messages and forward data from remote client/server connections to the
>> appropriate server/client FD on our end.
>>
>> Signed-off-by: Michael Roth<mdroth@linux.vnet.ibm.com>
>> ---
>>   virtproxy.c |   83 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   1 files changed, 83 insertions(+), 0 deletions(-)
>>
>> diff --git a/virtproxy.c b/virtproxy.c
>> index 20532c2..c9c3022 100644
>> --- a/virtproxy.c
>> +++ b/virtproxy.c
>> @@ -33,6 +33,7 @@
>>   #define VP_SERVICE_ID_LEN 32    /* max length of service id string */
>>   #define VP_PKT_DATA_LEN 1024    /* max proxied bytes per VPPacket */
>>   #define VP_CONN_DATA_LEN 1024   /* max bytes conns can send at a time */
>> +#define VP_CHAN_DATA_LEN 4096   /* max bytes channel can send at a time */
>>   #define VP_MAGIC 0x1F374059
>>
>>   /* listening fd, one for each service we're forwarding to remote end */
>> @@ -150,6 +151,8 @@ static QemuOptsList vp_socket_opts = {
>>       },
>>   };
>>
>> +static void vp_channel_read(void *opaque);
>
> Try to get rid of these forward declarations.  If you really need them,
> consider adding a virtproxy-internal.h.
>
>
>> @@ -230,3 +233,83 @@ static void vp_channel_accept(void *opaque)
>>       /* dont accept anymore connections until channel_fd is closed */
>>       vp_set_fd_handler(drv->listen_fd, NULL, NULL, NULL);
>>   }
>> +
>> +/* read handler for communication channel
>> + *
>> + * de-multiplexes data coming in over the channel. for control messages
>> + * we process them here, for data destined for a service or client we
>> + * send it to the appropriate FD.
>> + */
>> +static void vp_channel_read(void *opaque)
>> +{
>> +    VPDriver *drv = opaque;
>> +    VPPacket pkt;
>> +    int count, ret, buf_offset;
>> +    char buf[VP_CHAN_DATA_LEN];
>> +    char *pkt_ptr, *buf_ptr;
>> +
>> +    TRACE("called with opaque: %p", drv);
>> +
>> +    count = read(drv->channel_fd, buf, sizeof(buf));
>> +
>> +    if (count == -1) {
>> +        LOG("read() failed: %s", strerror(errno));
>> +        return;
>> +    } else if (count == 0) {
>> +        /* TODO: channel closed, this probably shouldn't happen for guest-side
>> +         * serial/virtio-serial connections, but need to confirm and consider
>> +         * what should happen in this case. as it stands this virtproxy instance
>> +         * is basically defunct at this point, same goes for "client" instances
>> +         * of virtproxy where the remote end has hung-up.
>> +         */
>> +        LOG("channel connection closed");
>> +        vp_set_fd_handler(drv->channel_fd, NULL, NULL, drv);
>> +        drv->channel_fd = -1;
>> +        if (drv->listen_fd) {
>> +            vp_set_fd_handler(drv->listen_fd, vp_channel_accept, NULL, drv);
>> +        }
>> +        /* TODO: should close/remove/delete all existing VPConns here */
>
> Looks like you have a little work TODO here still.  Perhaps you could
> add some reset/init functions that would make handling this easier.  The
> ability to reset state at both the channel and VPConn levels should give
> you the ability to handle errors more gracefully in other places where
> you currently just log them.
>

Yah, definitely. Planning on going back through these cases soon and 
handling cleanup more properly.

>> +    }
>> +
>> +    if (drv->buflen + count>= sizeof(VPPacket)) {
>> +        TRACE("initial packet, drv->buflen: %d", drv->buflen);
>> +        pkt_ptr = (char *)&pkt;
>> +        memcpy(pkt_ptr, drv->buf, drv->buflen);
>
> Can drv->buflen ever be>  sizeof(VPPacket) ?  You might consider adding
> an assert(drv->buflen<  sizeof(VPPacket)) unless it's mathematically
> impossible.
>

It shouldn't be possible. There are 2 situations where we set 
drv->buflen, and those situations are basically (in vp_channel_read):

1)

while (count > 0) {
     if (count >= sizeof(VPPacket)) {
         //handle packet
         count -= sizeof(VPPacket);
     } else {
         //buffer leftovers
         drv->buflen = count; // must be < sizeof(VPPacket) to get here
     }
}

2)

if (drv->buflen + count >= sizeof(VPPacket)) {
     //handle buffered/read packet data
} else {
     //buffer leftovers
     drv->buflen += count; // must be < sizeof(VPPacket)
}

>> +        pkt_ptr += drv->buflen;
>> +        memcpy(pkt_ptr, buf, sizeof(VPPacket) - drv->buflen);
>> +        /* handle first packet */
>> +        ret = vp_handle_packet(drv,&pkt);
>> +        if (ret != 0) {
>> +            LOG("error handling packet");
>> +        }
>> +        /* handle the rest of the buffer */
>> +        buf_offset = sizeof(VPPacket) - drv->buflen;
>> +        drv->buflen = 0;
>> +        buf_ptr = buf + buf_offset;
>> +        count -= buf_offset;
>> +        while (count>  0) {
>> +            if (count>= sizeof(VPPacket)) {
>> +                /* handle full packet */
>> +                TRACE("additional packet, drv->buflen: %d", drv->buflen);
>> +                memcpy((void *)&pkt, buf_ptr, sizeof(VPPacket));
>> +                ret = vp_handle_packet(drv,&pkt);
>> +                if (ret != 0) {
>> +                    LOG("error handling packet");
>> +                }
>> +                count -= sizeof(VPPacket);
>> +                buf_ptr += sizeof(VPPacket);
>> +            } else {
>> +                /* buffer the remainder */
>> +                TRACE("buffering packet");
>> +                memcpy(drv->buf, buf_ptr, count);
>> +                drv->buflen = count;
>> +                break;
>> +            }
>> +        }
>> +    } else {
>> +        /* haven't got a full VPPacket yet, buffer for later */
>> +        buf_ptr = drv->buf + drv->buflen;
>> +        memcpy(buf_ptr, buf, count);
>> +        drv->buflen += count;
>
> With this algorithm you can hold some data hostage indefinitely.  You
> either need to send out smaller packets of whatever data you receive or
> maybe have a timer that periodically fires to flush drv->buf.  Any
> thoughts on what you plan to do here?
>

All data is written to the channel in set-size packets 
(sizeof(VPPacket)), so if we only read a partial packet it's presumably 
because the other is either still sending, or it's sitting in a buffer 
in the underlying device (isa-serial/virtio-serial/etc). So assuming 
things eventually get flushed we *shouldn't* have an issue with this.

...that said...I'm currently debugging a issue over virtio-serial where 
data doesn't seem like it's getting flushed automatically so I'll 
definitely be looking for problem areas like this in the code :)

Might there be some subtlety about how virtio handles kicks/flushes that 
I might be missing? An example is I forward an ssh session over the 
channel to the guest's ssh server, do reads (top -d .001 for instance), 
let it run for a while, then start typing. What'll happen is a character 
echo won't get written to my terminal till some number of additional 
characters are typed. i.e.

my input:    echo "hello there this is a test"<enter>
my terminal: echo "he
my input:    echo "more"<enter>
my terminal: echo "hello there th

It doesn't behave like this initially though, and I haven't seen this 
with net or isa-serial.

Thought I'd ask...I'll keep looking in the meantime.

Patch

diff --git a/virtproxy.c b/virtproxy.c
index 20532c2..c9c3022 100644
--- a/virtproxy.c
+++ b/virtproxy.c
@@ -33,6 +33,7 @@ 
 #define VP_SERVICE_ID_LEN 32    /* max length of service id string */
 #define VP_PKT_DATA_LEN 1024    /* max proxied bytes per VPPacket */
 #define VP_CONN_DATA_LEN 1024   /* max bytes conns can send at a time */
+#define VP_CHAN_DATA_LEN 4096   /* max bytes channel can send at a time */
 #define VP_MAGIC 0x1F374059
 
 /* listening fd, one for each service we're forwarding to remote end */
@@ -150,6 +151,8 @@  static QemuOptsList vp_socket_opts = {
     },
 };
 
+static void vp_channel_read(void *opaque);
+
 /* get VPConn by fd, "client" denotes whether to look for client or server */
 static VPConn *get_conn(const VPDriver *drv, int fd, bool client)
 {
@@ -230,3 +233,83 @@  static void vp_channel_accept(void *opaque)
     /* dont accept anymore connections until channel_fd is closed */
     vp_set_fd_handler(drv->listen_fd, NULL, NULL, NULL);
 }
+
+/* read handler for communication channel
+ *
+ * de-multiplexes data coming in over the channel. for control messages
+ * we process them here, for data destined for a service or client we
+ * send it to the appropriate FD.
+ */
+static void vp_channel_read(void *opaque)
+{
+    VPDriver *drv = opaque;
+    VPPacket pkt;
+    int count, ret, buf_offset;
+    char buf[VP_CHAN_DATA_LEN];
+    char *pkt_ptr, *buf_ptr;
+
+    TRACE("called with opaque: %p", drv);
+
+    count = read(drv->channel_fd, buf, sizeof(buf));
+
+    if (count == -1) {
+        LOG("read() failed: %s", strerror(errno));
+        return;
+    } else if (count == 0) {
+        /* TODO: channel closed, this probably shouldn't happen for guest-side
+         * serial/virtio-serial connections, but need to confirm and consider
+         * what should happen in this case. as it stands this virtproxy instance
+         * is basically defunct at this point, same goes for "client" instances
+         * of virtproxy where the remote end has hung-up.
+         */
+        LOG("channel connection closed");
+        vp_set_fd_handler(drv->channel_fd, NULL, NULL, drv);
+        drv->channel_fd = -1;
+        if (drv->listen_fd) {
+            vp_set_fd_handler(drv->listen_fd, vp_channel_accept, NULL, drv);
+        }
+        /* TODO: should close/remove/delete all existing VPConns here */
+    }
+
+    if (drv->buflen + count >= sizeof(VPPacket)) {
+        TRACE("initial packet, drv->buflen: %d", drv->buflen);
+        pkt_ptr = (char *)&pkt;
+        memcpy(pkt_ptr, drv->buf, drv->buflen);
+        pkt_ptr += drv->buflen;
+        memcpy(pkt_ptr, buf, sizeof(VPPacket) - drv->buflen);
+        /* handle first packet */
+        ret = vp_handle_packet(drv, &pkt);
+        if (ret != 0) {
+            LOG("error handling packet");
+        }
+        /* handle the rest of the buffer */
+        buf_offset = sizeof(VPPacket) - drv->buflen;
+        drv->buflen = 0;
+        buf_ptr = buf + buf_offset;
+        count -= buf_offset;
+        while (count > 0) {
+            if (count >= sizeof(VPPacket)) {
+                /* handle full packet */
+                TRACE("additional packet, drv->buflen: %d", drv->buflen);
+                memcpy((void *)&pkt, buf_ptr, sizeof(VPPacket));
+                ret = vp_handle_packet(drv, &pkt);
+                if (ret != 0) {
+                    LOG("error handling packet");
+                }
+                count -= sizeof(VPPacket);
+                buf_ptr += sizeof(VPPacket);
+            } else {
+                /* buffer the remainder */
+                TRACE("buffering packet");
+                memcpy(drv->buf, buf_ptr, count);
+                drv->buflen = count;
+                break;
+            }
+        }
+    } else {
+        /* haven't got a full VPPacket yet, buffer for later */
+        buf_ptr = drv->buf + drv->buflen;
+        memcpy(buf_ptr, buf, count);
+        drv->buflen += count;
+    }
+}