diff mbox

[RFC,v5,04/21] virtagent: transport definitions and job callbacks

Message ID 1291399402-20366-5-git-send-email-mdroth@linux.vnet.ibm.com
State New
Headers show

Commit Message

Michael Roth Dec. 3, 2010, 6:03 p.m. UTC
Async read/send handlers for managing RPC/HTTP request/responses. The
read handler runs continuously and calls into client or server RPC
callbacks based on the HTTP header. The send handlers are started up to
send RPC client requests/server responses.

Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
---
 virtagent-common.c |  332 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 332 insertions(+), 0 deletions(-)

Comments

Adam Litke Dec. 6, 2010, 10:02 p.m. UTC | #1
On Fri, 2010-12-03 at 12:03 -0600, Michael Roth wrote:
> +static void va_http_send_handler(void *opaque)
> +{
> +    VAHTState *s = &va_state->send_state;
> +    enum va_http_status http_status;
> +    int fd = va_state->fd;
> +    int ret;
> +
> +    TRACE("called, fd: %d", fd);
> +
> +    switch (s->state) {

Why is there a VA_SEND_START state when it always falls through to
VA_SEND_HDR?  Is there any difference between these?

> +    case VA_SEND_START:
> +        s->state = VA_SEND_HDR;
> +    case VA_SEND_HDR:
> +        do {
> +            ret = write(fd, s->hdr + s->hdr_pos, s->hdr_len - s->hdr_pos);
> +            if (ret <= 0) {
> +                break;
> +            }
> +            s->hdr_pos += ret;
> +        } while (s->hdr_pos < s->hdr_len);
> +        if (ret == -1) {
> +            if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
> +                return;
> +            } else {
> +                LOG("error writing header: %s", strerror(errno));
> +                goto out_bad;
> +            }
> +        } else if (ret == 0) {
> +            LOG("connected closed unexpectedly");
> +            goto out_bad;
> +        } else {
> +            s->state = VA_SEND_BODY;
> +        }
> +    case VA_SEND_BODY:
> +        do {
> +            ret = write(fd, s->content + s->content_pos,
> +                        s->content_len - s->content_pos);
> +            if (ret <= 0) {
> +                break;
> +            }
> +            s->content_pos += ret;
> +        } while (s->content_pos < s->content_len);
> +        if (ret == -1) {
> +            if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
> +                return;
> +            } else {
> +                LOG("error writing content: %s", strerror(errno));
> +                goto out_bad;
> +            }
> +        } else if (ret == 0) {
> +            LOG("connected closed unexpectedly");
> +            goto out_bad;
> +        } else {
> +            http_status = VA_HTTP_STATUS_OK;
> +            goto out;
> +        }
> +    default:
> +        LOG("unknown state");
> +        goto out_bad;
> +    }
> +
> +out_bad:
> +    http_status = VA_HTTP_STATUS_ERROR;
> +out:
> +    s->send_cb(http_status, s->content, s->content_len);
> +    qemu_set_fd_handler(fd, va_http_read_handler, NULL, NULL);
> +}
Michael Roth Dec. 6, 2010, 10:34 p.m. UTC | #2
On 12/06/2010 04:02 PM, Adam Litke wrote:
> On Fri, 2010-12-03 at 12:03 -0600, Michael Roth wrote:
>> +static void va_http_send_handler(void *opaque)
>> +{
>> +    VAHTState *s =&va_state->send_state;
>> +    enum va_http_status http_status;
>> +    int fd = va_state->fd;
>> +    int ret;
>> +
>> +    TRACE("called, fd: %d", fd);
>> +
>> +    switch (s->state) {
>
> Why is there a VA_SEND_START state when it always falls through to
> VA_SEND_HDR?  Is there any difference between these?
>

Nope, not at the moment. I'll stick with just _HDR for now, but if we 
ever need to do some initialization or anything before we start 
sending/reading that's what this would be for.

>> +    case VA_SEND_START:
>> +        s->state = VA_SEND_HDR;
>> +    case VA_SEND_HDR:
>> +        do {
>> +            ret = write(fd, s->hdr + s->hdr_pos, s->hdr_len - s->hdr_pos);
>> +            if (ret<= 0) {
>> +                break;
>> +            }
>> +            s->hdr_pos += ret;
>> +        } while (s->hdr_pos<  s->hdr_len);
>> +        if (ret == -1) {
>> +            if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
>> +                return;
>> +            } else {
>> +                LOG("error writing header: %s", strerror(errno));
>> +                goto out_bad;
>> +            }
>> +        } else if (ret == 0) {
>> +            LOG("connected closed unexpectedly");
>> +            goto out_bad;
>> +        } else {
>> +            s->state = VA_SEND_BODY;
>> +        }
>> +    case VA_SEND_BODY:
>> +        do {
>> +            ret = write(fd, s->content + s->content_pos,
>> +                        s->content_len - s->content_pos);
>> +            if (ret<= 0) {
>> +                break;
>> +            }
>> +            s->content_pos += ret;
>> +        } while (s->content_pos<  s->content_len);
>> +        if (ret == -1) {
>> +            if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
>> +                return;
>> +            } else {
>> +                LOG("error writing content: %s", strerror(errno));
>> +                goto out_bad;
>> +            }
>> +        } else if (ret == 0) {
>> +            LOG("connected closed unexpectedly");
>> +            goto out_bad;
>> +        } else {
>> +            http_status = VA_HTTP_STATUS_OK;
>> +            goto out;
>> +        }
>> +    default:
>> +        LOG("unknown state");
>> +        goto out_bad;
>> +    }
>> +
>> +out_bad:
>> +    http_status = VA_HTTP_STATUS_ERROR;
>> +out:
>> +    s->send_cb(http_status, s->content, s->content_len);
>> +    qemu_set_fd_handler(fd, va_http_read_handler, NULL, NULL);
>> +}
>
Jes Sorensen Dec. 7, 2010, 1:44 p.m. UTC | #3
On 12/03/10 19:03, Michael Roth wrote:
> +static void va_server_read_cb(const char *content, size_t content_len)
> +{
> +    xmlrpc_mem_block *resp_xml;
> +    VAServerData *server_data = &va_state->server_data;
> +    int ret;
> +
> +    TRACE("called");
> +    resp_xml = xmlrpc_registry_process_call(&server_data->env,
> +                                            server_data->registry,
> +                                            NULL, content, content_len);
> +    if (resp_xml == NULL) {
> +        LOG("error processing RPC request");
> +        goto out_bad;
> +    }
> +
> +    ret = va_server_job_add(resp_xml);
> +    if (ret != 0) {
> +        LOG("error adding server job: %s", strerror(ret));
> +    }
> +
> +    return;
> +out_bad:
> +    /* TODO: should reset state here */
> +    return;

Looks like some missing error handling is needed here?

> +static void va_rpc_parse_hdr(VAHTState *s)
> +{
> +    int i, line_pos = 0;
> +    bool first_line = true;
> +    char line_buf[4096];

In 03/21 you defined VA_HDR_LEN_MAX to 4096, here you hard code the
value .... sounds like something begging to go wrong.

> +static int va_end_of_header(char *buf, int end_pos)
> +{
> +    return !strncmp(buf+(end_pos-2), "\n\r\n", 3);
> +}

Maybe I am missing something here, but it looks like you do a strncmp to
a char that is one past the end of the buffer, or? If this is
intentional, please document it.

All this http parsing code leaves the question open why you do it
manually, instead of relying on a library?

Cheers,
Jes
Michael Roth Dec. 7, 2010, 5:19 p.m. UTC | #4
On 12/07/2010 07:44 AM, Jes Sorensen wrote:
> On 12/03/10 19:03, Michael Roth wrote:
>> +static void va_server_read_cb(const char *content, size_t content_len)
>> +{
>> +    xmlrpc_mem_block *resp_xml;
>> +    VAServerData *server_data =&va_state->server_data;
>> +    int ret;
>> +
>> +    TRACE("called");
>> +    resp_xml = xmlrpc_registry_process_call(&server_data->env,
>> +                                            server_data->registry,
>> +                                            NULL, content, content_len);
>> +    if (resp_xml == NULL) {
>> +        LOG("error processing RPC request");
>> +        goto out_bad;
>> +    }
>> +
>> +    ret = va_server_job_add(resp_xml);
>> +    if (ret != 0) {
>> +        LOG("error adding server job: %s", strerror(ret));
>> +    }
>> +
>> +    return;
>> +out_bad:
>> +    /* TODO: should reset state here */
>> +    return;
>
> Looks like some missing error handling is needed here?
>
>> +static void va_rpc_parse_hdr(VAHTState *s)
>> +{
>> +    int i, line_pos = 0;
>> +    bool first_line = true;
>> +    char line_buf[4096];
>
> In 03/21 you defined VA_HDR_LEN_MAX to 4096, here you hard code the
> value .... sounds like something begging to go wrong.
>
>> +static int va_end_of_header(char *buf, int end_pos)
>> +{
>> +    return !strncmp(buf+(end_pos-2), "\n\r\n", 3);
>> +}
>
> Maybe I am missing something here, but it looks like you do a strncmp to
> a char that is one past the end of the buffer, or? If this is
> intentional, please document it.
>

buf+end_pos points to the last char we read (rather than being an offset 
to the current position). So it stops comparing when it reaches 
buf+end_pos (buf=0 + end_pos=2 implies 3 characters)

For some reason this confused the hell out of me when I looked over it 
again as well. Alternatively I can do:

static int va_end_of_header(char *buf, int end_pos)
{
     return !strncmp(buf+(end_pos-2), "\n\r\n", 3);
}
...
va_end_of_header(s->hdr, s->hdr_pos - 1)

->

static int va_end_of_header(char *buf, int cur_pos)
{
     return !strncmp(buf+(cur_pos-3), "\n\r\n", 3);
}
...
va_end_of_header(s->hdr, s->hdr_pos);

It does seem easier to parse...

> All this http parsing code leaves the question open why you do it
> manually, instead of relying on a library?
>

Something like libcurl? At some point we didn't attempt to use libraries 
provide by xmlrpc-c (which uses libcurl for http transport) for the 
client and server. The problem there is that libcurl really wants and 
tcp socket read and write from, whereas we need to support tcp/unix 
sockets on the host side and isa/virtio serial ports on the guest side.

Even assuming we could hook in wrappers for these other types of 
sockets/channels, there's also the added complexity since dropping 
virtproxy of multiplexing HTTP/RPCs using a single stream, whereas 
something like libcurl would, understandably, assume it has a dedicated 
stream to read/write from. So we wouldn't really save any work or code, 
unfortunately.

> Cheers,
> Jes
>
Jes Sorensen Dec. 8, 2010, 7:16 p.m. UTC | #5
On 12/07/10 18:19, Michael Roth wrote:
> On 12/07/2010 07:44 AM, Jes Sorensen wrote:
>>> +static int va_end_of_header(char *buf, int end_pos)
>>> +{
>>> +    return !strncmp(buf+(end_pos-2), "\n\r\n", 3);
>>> +}
>>
>> Maybe I am missing something here, but it looks like you do a strncmp to
>> a char that is one past the end of the buffer, or? If this is
>> intentional, please document it.
>>
> 
> buf+end_pos points to the last char we read (rather than being an offset
> to the current position). So it stops comparing when it reaches
> buf+end_pos (buf=0 + end_pos=2 implies 3 characters)
> 
> For some reason this confused the hell out of me when I looked over it
> again as well. Alternatively I can do:
> 
> static int va_end_of_header(char *buf, int end_pos)
> {
>     return !strncmp(buf+(end_pos-2), "\n\r\n", 3);
> }
> ...
> va_end_of_header(s->hdr, s->hdr_pos - 1)
> 
> ->
> 
> static int va_end_of_header(char *buf, int cur_pos)
> {
>     return !strncmp(buf+(cur_pos-3), "\n\r\n", 3);
> }
> ...
> va_end_of_header(s->hdr, s->hdr_pos);
> 
> It does seem easier to parse...

I would prefer this, somewhat easier to parse.

>> All this http parsing code leaves the question open why you do it
>> manually, instead of relying on a library? 
> Something like libcurl? At some point we didn't attempt to use libraries
> provide by xmlrpc-c (which uses libcurl for http transport) for the
> client and server. The problem there is that libcurl really wants and
> tcp socket read and write from, whereas we need to support tcp/unix
> sockets on the host side and isa/virtio serial ports on the guest side.
> 
> Even assuming we could hook in wrappers for these other types of
> sockets/channels, there's also the added complexity since dropping
> virtproxy of multiplexing HTTP/RPCs using a single stream, whereas
> something like libcurl would, understandably, assume it has a dedicated
> stream to read/write from. So we wouldn't really save any work or code,
> unfortunately.

I guess I am just a little worried that we end up with errors in the
code that could have been solved by using a maintainer http library, but
if it isn't feasible I guess not.

Cheers,
Jes
diff mbox

Patch

diff --git a/virtagent-common.c b/virtagent-common.c
index 45f9d9f..1ed2b55 100644
--- a/virtagent-common.c
+++ b/virtagent-common.c
@@ -89,6 +89,12 @@  typedef struct VAState {
 
 static VAState *va_state;
 
+static bool va_set_client_state(enum va_client_state client_state);
+static VAServerJob *va_pop_server_job(void);
+static VAClientJob *va_pop_client_job(void);
+static int va_server_job_add(xmlrpc_mem_block *resp_xml);
+static int va_kick(void);
+
 static VAClientJob *va_current_client_job(void)
 {
     TRACE("called");
@@ -96,6 +102,332 @@  static VAClientJob *va_current_client_job(void)
 }
 
 /***********************************************************/
+/* callbacks for read/send handlers */
+
+static void va_client_send_cb(enum va_http_status http_status,
+                              const char *content, size_t content_len)
+{
+    VAClientJob *client_job = va_current_client_job();
+
+    TRACE("called");
+    assert(client_job != NULL);
+
+    if (http_status != VA_HTTP_STATUS_OK) {
+        /* TODO: we should reset everything at this point...guest/host will
+         * be out of whack with each other since there's no way to let the
+         * other know job failed (server or client job) if the send channel
+         * is down. But how do we induce the other side to do the same?
+         */
+        LOG("error sending http request");
+    }
+
+    /* request sent ok. free up request xml, then move to
+     * wait (for response) state
+     */
+    XMLRPC_MEMBLOCK_FREE(char, client_job->req_data);
+    assert(va_set_client_state(VA_CLIENT_WAIT));
+}
+
+static void va_server_send_cb(enum va_http_status http_status,
+                              const char *content, size_t content_len)
+{
+    VAServerJob *server_job = va_pop_server_job();
+
+    TRACE("called");
+    assert(server_job != NULL);
+
+    if (http_status != VA_HTTP_STATUS_OK) {
+        /* TODO: we should reset everything at this point...guest/host will
+         * be out of whack with each other since there's no way to let the
+         * other know job failed (server or client job) if the send channel
+         * is down
+         */
+        LOG("error sending http response");
+        return;
+    }
+
+    /* response sent ok, cleanup server job and kick off the next one */
+    XMLRPC_MEMBLOCK_FREE(char, server_job->resp_data);
+    qemu_free(server_job);
+    va_kick();
+}
+
+static void va_client_read_cb(const char *content, size_t content_len)
+{
+    VAClientJob *client_job;
+
+    client_job = va_pop_client_job();
+    assert(client_job != NULL);
+TRACE("marker");
+    client_job->cb(content, content_len, client_job->mon_cb,
+                   client_job->mon_data);
+    va_kick();
+}
+
+static void va_server_read_cb(const char *content, size_t content_len)
+{
+    xmlrpc_mem_block *resp_xml;
+    VAServerData *server_data = &va_state->server_data;
+    int ret;
+
+    TRACE("called");
+    resp_xml = xmlrpc_registry_process_call(&server_data->env,
+                                            server_data->registry,
+                                            NULL, content, content_len);
+    if (resp_xml == NULL) {
+        LOG("error processing RPC request");
+        goto out_bad;
+    }
+
+    ret = va_server_job_add(resp_xml);
+    if (ret != 0) {
+        LOG("error adding server job: %s", strerror(ret));
+    }
+
+    return;
+out_bad:
+    /* TODO: should reset state here */
+    return;
+}
+
+static void va_http_read_cb(enum va_http_status http_status,
+                            const char *content, size_t content_len, bool is_request)
+{
+    TRACE("called");
+    if (http_status != VA_HTTP_STATUS_OK) {
+        LOG("error reading http %s", is_request ? "request" : "response");
+        content = NULL;
+    }
+
+    if (is_request) {
+        va_server_read_cb(content, content_len);
+    } else {
+        va_client_read_cb(content, content_len);
+    }
+
+    return;
+}
+
+/***********************************************************/
+/* utility functions for handling http calls */
+
+#define VA_HTTP_REQUEST 1
+#define VA_HTTP_RESPONSE 2
+
+static void va_http_hdr_init(VAHTState *s, int request_type) {
+    const char *preamble;
+
+    TRACE("called");
+    /* essentially ignored in the context of virtagent, but might as well */
+    if (request_type == VA_HTTP_REQUEST) {
+        preamble = "POST /RPC2 HTTP/1.1";
+    } else if (request_type == VA_HTTP_RESPONSE) {
+        preamble = "HTTP/1.1 200 OK";
+    } else {
+        s->hdr_len = 0;
+        return;
+    }
+    s->hdr_len = sprintf(s->hdr,
+                         "%s" EOL
+                         "Content-Type: text/xml" EOL
+                         "Content-Length: %u" EOL EOL,
+                         preamble,
+                         (uint32_t)s->content_len);
+}
+
+static void va_rpc_parse_hdr(VAHTState *s)
+{
+    int i, line_pos = 0;
+    bool first_line = true;
+    char line_buf[4096];
+
+    TRACE("called");
+
+    for (i = 0; i < VA_HDR_LEN_MAX; ++i) {
+        if (s->hdr[i] != '\n') {
+            /* read line */
+            line_buf[line_pos++] = s->hdr[i];
+        } else {
+            /* process line */
+            if (first_line) {
+                s->is_request = (strncmp(line_buf, "POST", 4) == 0) ?
+                                true : false;
+                first_line = false;
+            }
+            if (strncmp(line_buf, "Content-Length: ", 16) == 0) {
+                s->content_len = atoi(&line_buf[16]);
+                return;
+            }
+            line_pos = 0;
+        }
+    }
+}
+
+static int va_end_of_header(char *buf, int end_pos)
+{
+    return !strncmp(buf+(end_pos-2), "\n\r\n", 3);
+}
+
+/***********************************************************/
+/* read/send handlers */
+
+static void va_http_read_handler(void *opaque)
+{
+    VAHTState *s = &va_state->read_state;
+    enum va_http_status http_status;
+    int fd = va_state->fd;
+    int ret;
+
+    TRACE("called with opaque: %p", opaque);
+
+    switch (s->state) {
+    case VA_READ_START:
+        s->state = VA_READ_HDR;
+    case VA_READ_HDR:
+        while((ret = read(fd, s->hdr + s->hdr_pos, 1)) > 0
+              && s->hdr_pos < VA_HDR_LEN_MAX) {
+            s->hdr_pos += ret;
+            if (va_end_of_header(s->hdr, s->hdr_pos - 1)) {
+                break;
+            }
+        }
+        if (ret == -1) {
+            if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+                return;
+            } else {
+                LOG("error reading connection: %s", strerror(errno));
+                goto out_bad;
+            }
+        } else if (ret == 0) {
+            LOG("connected closed unexpectedly");
+            goto out_bad;
+        } else if (s->hdr_pos >= VA_HDR_LEN_MAX) {
+            LOG("http header too long");
+            goto out_bad;
+        } else {
+            s->content_len = -1;
+            va_rpc_parse_hdr(s);
+            if (s->content_len == -1) {
+                LOG("malformed http header");
+                goto out_bad;
+            } else if (s->content_len > VA_CONTENT_LEN_MAX) {
+                LOG("http content length too long");
+                goto out_bad;
+            }
+            s->content = qemu_mallocz(s->content_len);
+            s->state = VA_READ_BODY;
+        }
+    case VA_READ_BODY:
+        while(s->content_pos < s->content_len) {
+            ret = read(fd, s->content + s->content_pos,
+                       s->content_len - s->content_pos);
+            if (ret == -1) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK
+                    || errno == EINTR) {
+                    return;
+                } else {
+                    LOG("error reading connection: %s", strerror(errno));
+                    goto out_bad;
+                }
+            } else if (ret == 0) {
+                LOG("connection closed unexpectedly:"
+                    " read %u bytes, expected %u bytes",
+                    (unsigned int)s->content_pos, (unsigned int)s->content_len);
+                goto out_bad;
+            }
+            s->content_pos += ret;
+        }
+
+        http_status = VA_HTTP_STATUS_OK;
+        goto out;
+    default:
+        LOG("unknown state");
+        goto out_bad;
+    }
+
+out_bad:
+    http_status = VA_HTTP_STATUS_ERROR;
+out:
+    /* handle the response or request we just read */
+    s->read_cb(http_status, s->content, s->content_len, s->is_request);
+    /* restart read handler */
+    s->state = VA_READ_START;
+    s->hdr_pos = 0;
+    s->content_len = 0;
+    s->content_pos = 0;
+    qemu_free(s->content);
+    http_status = VA_HTTP_STATUS_NEW;
+}
+
+static void va_http_send_handler(void *opaque)
+{
+    VAHTState *s = &va_state->send_state;
+    enum va_http_status http_status;
+    int fd = va_state->fd;
+    int ret;
+
+    TRACE("called, fd: %d", fd);
+
+    switch (s->state) {
+    case VA_SEND_START:
+        s->state = VA_SEND_HDR;
+    case VA_SEND_HDR:
+        do {
+            ret = write(fd, s->hdr + s->hdr_pos, s->hdr_len - s->hdr_pos);
+            if (ret <= 0) {
+                break;
+            }
+            s->hdr_pos += ret;
+        } while (s->hdr_pos < s->hdr_len);
+        if (ret == -1) {
+            if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+                return;
+            } else {
+                LOG("error writing header: %s", strerror(errno));
+                goto out_bad;
+            }
+        } else if (ret == 0) {
+            LOG("connected closed unexpectedly");
+            goto out_bad;
+        } else {
+            s->state = VA_SEND_BODY;
+        }
+    case VA_SEND_BODY:
+        do {
+            ret = write(fd, s->content + s->content_pos,
+                        s->content_len - s->content_pos);
+            if (ret <= 0) {
+                break;
+            }
+            s->content_pos += ret;
+        } while (s->content_pos < s->content_len);
+        if (ret == -1) {
+            if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+                return;
+            } else {
+                LOG("error writing content: %s", strerror(errno));
+                goto out_bad;
+            }
+        } else if (ret == 0) {
+            LOG("connected closed unexpectedly");
+            goto out_bad;
+        } else {
+            http_status = VA_HTTP_STATUS_OK;
+            goto out;
+        }
+    default:
+        LOG("unknown state");
+        goto out_bad;
+    }
+
+out_bad:
+    http_status = VA_HTTP_STATUS_ERROR;
+out:
+    s->send_cb(http_status, s->content, s->content_len);
+    qemu_set_fd_handler(fd, va_http_read_handler, NULL, NULL);
+}
+
+/***********************************************************/
 /* functions for starting/managing client/server rpc jobs */
 
 static int va_send_server_response(VAServerJob *server_job)