Patchwork [RFC,v5,03/21] virtagent: common code for managing client/server rpc jobs

login
register
mail settings
Submitter Michael Roth
Date Dec. 3, 2010, 6:03 p.m.
Message ID <1291399402-20366-4-git-send-email-mdroth@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/74176/
State New
Headers show

Comments

Michael Roth - Dec. 3, 2010, 6:03 p.m.
This implements a simple state machine to manage client/server rpc
jobs being multiplexed over a single channel.

A client job consists of sending an rpc request, reading an
rpc response, then making the appropriate callbacks. We allow one
client job to be processed at a time, which will make the following
state transitions:

VA_CLIENT_IDLE -> VA_CLIENT_SEND (job queued, send channel open)
VA_CLIENT_SEND -> VA_CLIENT_WAIT (request sent, awaiting response)
VA_CLIENT_WAIT -> VA_CLIENT_IDLE (response recieved, callbacks made)

A server job consists of recieving an rpc request, generating a
response, then sending the response. We expect to receive one server
request at a time due to the 1 at a time restriction for client jobs.
Server jobs make the following transitions:

VA_SERVER_IDLE -> VA_SERVER_WAIT (recieved/executed request, send
channel busy, response deferred)
VA_SERVER_IDLE -> VA_SERVER_SEND (recieved/executed request, send
channel open, sending response)
VA_SERVER_WAIT -> VA_SERVER_SEND (send channel now open, sending
response)
VA_SERVER_SEND -> VA_SERVER_IDLE (response sent)

Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
---
 virtagent-common.c |  427 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 virtagent-common.h |   66 ++++++++
 2 files changed, 493 insertions(+), 0 deletions(-)
 create mode 100644 virtagent-common.c
 create mode 100644 virtagent-common.h
Adam Litke - Dec. 6, 2010, 9:54 p.m.
On Fri, 2010-12-03 at 12:03 -0600, Michael Roth wrote:
> +/* create new client job and then put it on the queue. this can be
> + * called externally from virtagent. Since there can only be one virtagent
> + * instance we access state via an object-scoped global rather than pass
> + * it around.
> + *
> + * if this is successful virtagent will handle cleanup of req_xml after
> + * making the appropriate callbacks, otherwise callee should handle it
> + */

Explain please. Do you mean caller should handle it? Are you trying to
say that this function, when successful, "steals" the reference to
req_xml?

> +int va_client_job_add(xmlrpc_mem_block *req_xml, VAClientCallback *cb,
> +                      MonitorCompletion *mon_cb, void *mon_data)
> +{
> +    int ret;
> +    VAClientJob *client_job;
> +    TRACE("called");
> +
> +    client_job = va_client_job_new(req_xml, cb, mon_cb, mon_data);
> +    if (client_job == NULL) {
> +        return -EINVAL;
> +    }
> +
> +    ret = va_push_client_job(client_job);
> +    if (ret != 0) {
> +        LOG("error adding client to queue: %s", strerror(ret));
> +        qemu_free(client_job);
> +        return ret;
> +    }
> +
> +    return 0;
> +}
Adam Litke - Dec. 6, 2010, 9:57 p.m.
On Fri, 2010-12-03 at 12:03 -0600, Michael Roth wrote: 
> +/* create new client job and then put it on the queue. this can be
> + * called externally from virtagent. Since there can only be one virtagent
> + * instance we access state via an object-scoped global rather than pass
> + * it around.
> + *
> + * if this is successful virtagent will handle cleanup of req_xml after
> + * making the appropriate callbacks, otherwise callee should handle it
> + */

Explain please. Do you mean caller should handle it? Are you trying to
say that this function, when successful, "steals" the reference to
req_xml?

> +int va_client_job_add(xmlrpc_mem_block *req_xml, VAClientCallback *cb,
> +                      MonitorCompletion *mon_cb, void *mon_data)
> +{
> +    int ret;
> +    VAClientJob *client_job;
> +    TRACE("called");
> +
> +    client_job = va_client_job_new(req_xml, cb, mon_cb, mon_data);
> +    if (client_job == NULL) {
> +        return -EINVAL;
> +    }
> +
> +    ret = va_push_client_job(client_job);
> +    if (ret != 0) {
> +        LOG("error adding client to queue: %s", strerror(ret));
> +        qemu_free(client_job);
> +        return ret;
> +    }
> +
> +    return 0;
> +}
> +
> +/* create new server job and then put it on the queue in wait state
> + * this should only be called from within our read handler callback
> + */

Since this function is only 4 lines and has only one valid call site.
perhaps its better to fold it directly into the read handler callback.

> +static int va_server_job_add(xmlrpc_mem_block *resp_xml)
> +{
> +    VAServerJob *server_job;
> +    TRACE("called");
> +
> +    server_job = va_server_job_new(resp_xml);
> +    assert(server_job != NULL);
> +    va_push_server_job(server_job);
> +    return 0;
> +}
Michael Roth - Dec. 6, 2010, 10:15 p.m.
On 12/06/2010 03:54 PM, Adam Litke wrote:
> On Fri, 2010-12-03 at 12:03 -0600, Michael Roth wrote:
>> +/* create new client job and then put it on the queue. this can be
>> + * called externally from virtagent. Since there can only be one virtagent
>> + * instance we access state via an object-scoped global rather than pass
>> + * it around.
>> + *
>> + * if this is successful virtagent will handle cleanup of req_xml after
>> + * making the appropriate callbacks, otherwise callee should handle it
>> + */
>
> Explain please. Do you mean caller should handle it? Are you trying to
> say that this function, when successful, "steals" the reference to
> req_xml?
>

Yup, should be caller. And yes, cleanup duty gets taken over if the call 
succeeds (after transmitting the request we have no need for the 
req_xml, so it didn't seem to make sense to carry it around just so the 
caller can free it when it gets it's call later on)

>> +int va_client_job_add(xmlrpc_mem_block *req_xml, VAClientCallback *cb,
>> +                      MonitorCompletion *mon_cb, void *mon_data)
>> +{
>> +    int ret;
>> +    VAClientJob *client_job;
>> +    TRACE("called");
>> +
>> +    client_job = va_client_job_new(req_xml, cb, mon_cb, mon_data);
>> +    if (client_job == NULL) {
>> +        return -EINVAL;
>> +    }
>> +
>> +    ret = va_push_client_job(client_job);
>> +    if (ret != 0) {
>> +        LOG("error adding client to queue: %s", strerror(ret));
>> +        qemu_free(client_job);
>> +        return ret;
>> +    }
>> +
>> +    return 0;
>> +}
>
Michael Roth - Dec. 6, 2010, 10:24 p.m.
On 12/06/2010 03:57 PM, Adam Litke wrote:
> On Fri, 2010-12-03 at 12:03 -0600, Michael Roth wrote:
>> +/* create new client job and then put it on the queue. this can be
>> + * called externally from virtagent. Since there can only be one virtagent
>> + * instance we access state via an object-scoped global rather than pass
>> + * it around.
>> + *
>> + * if this is successful virtagent will handle cleanup of req_xml after
>> + * making the appropriate callbacks, otherwise callee should handle it
>> + */
>
> Explain please. Do you mean caller should handle it? Are you trying to
> say that this function, when successful, "steals" the reference to
> req_xml?
>
>> +int va_client_job_add(xmlrpc_mem_block *req_xml, VAClientCallback *cb,
>> +                      MonitorCompletion *mon_cb, void *mon_data)
>> +{
>> +    int ret;
>> +    VAClientJob *client_job;
>> +    TRACE("called");
>> +
>> +    client_job = va_client_job_new(req_xml, cb, mon_cb, mon_data);
>> +    if (client_job == NULL) {
>> +        return -EINVAL;
>> +    }
>> +
>> +    ret = va_push_client_job(client_job);
>> +    if (ret != 0) {
>> +        LOG("error adding client to queue: %s", strerror(ret));
>> +        qemu_free(client_job);
>> +        return ret;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +/* create new server job and then put it on the queue in wait state
>> + * this should only be called from within our read handler callback
>> + */
>
> Since this function is only 4 lines and has only one valid call site.
> perhaps its better to fold it directly into the read handler callback.
>
>> +static int va_server_job_add(xmlrpc_mem_block *resp_xml)
>> +{
>> +    VAServerJob *server_job;
>> +    TRACE("called");
>> +
>> +    server_job = va_server_job_new(resp_xml);
>> +    assert(server_job != NULL);
>> +    va_push_server_job(server_job);
>> +    return 0;
>> +}
>

What I was mainly shooting for was to have the entry-points for adding 
client and server jobs be clear and somewhat similar. I've actually 
moved the read handler callback body into 
virtagent-server.c:va_do_server_rpc() since then. So client jobs get 
added by hmp/qmp->virtagent:va_do_rpc()->va_push_client_job() and server 
jobs by read 
handler->virtagent-server.c:va_do_server_rpc()->va_push_server_job().
Jes Sorensen - Dec. 7, 2010, 1:38 p.m.
On 12/03/10 19:03, Michael Roth wrote:
> This implements a simple state machine to manage client/server rpc
> jobs being multiplexed over a single channel.
> 
> A client job consists of sending an rpc request, reading an
> rpc response, then making the appropriate callbacks. We allow one
> client job to be processed at a time, which will make the following
> state transitions:
> 
> VA_CLIENT_IDLE -> VA_CLIENT_SEND (job queued, send channel open)
> VA_CLIENT_SEND -> VA_CLIENT_WAIT (request sent, awaiting response)
> VA_CLIENT_WAIT -> VA_CLIENT_IDLE (response recieved, callbacks made)
> 
> A server job consists of recieving an rpc request, generating a
> response, then sending the response. We expect to receive one server
> request at a time due to the 1 at a time restriction for client jobs.
> Server jobs make the following transitions:
> 
> VA_SERVER_IDLE -> VA_SERVER_WAIT (recieved/executed request, send
> channel busy, response deferred)
> VA_SERVER_IDLE -> VA_SERVER_SEND (recieved/executed request, send
> channel open, sending response)
> VA_SERVER_WAIT -> VA_SERVER_SEND (send channel now open, sending
> response)
> VA_SERVER_SEND -> VA_SERVER_IDLE (response sent)
> 
> Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>

As mentioned before, I really don't understand why this is part of QEMU,
the guest agent really should be able to run totally outside of QEMU.

> +
> +#define DEBUG_VA
> +
> +#ifdef DEBUG_VA
> +#define TRACE(msg, ...) do { \
> +    fprintf(stderr, "%s:%s():L%d: " msg "\n", \
> +            __FILE__, __FUNCTION__, __LINE__, ## __VA_ARGS__); \
> +} while(0)
> +#else
> +#define TRACE(msg, ...) \
> +    do { } while (0)
> +#endif
> +
> +#define LOG(msg, ...) do { \
> +    fprintf(stderr, "%s:%s(): " msg "\n", \
> +            __FILE__, __FUNCTION__, ## __VA_ARGS__); \
> +} while(0)

This must be like the 217th copy of these functions, could you please
use some of the code that is already in the tree, and make it generic if
needed.

> +
> +#define VERSION "1.0"
> +#define EOL "\r\n"
> +
> +#define VA_HDR_LEN_MAX 4096 /* http header limit */
> +#define VA_CONTENT_LEN_MAX 2*1024*1024 /* rpc/http send limit */
> +#define VA_CLIENT_JOBS_MAX 5 /* max client rpcs we can queue */
> +#define VA_SERVER_JOBS_MAX 1 /* max server rpcs we can queue */

As mentioned last time, please make this stuff configurable and not hard
coded.

Cheers,
Jes
Michael Roth - Dec. 7, 2010, 3:02 p.m.
On 12/07/2010 07:38 AM, Jes Sorensen wrote:
> On 12/03/10 19:03, Michael Roth wrote:
>> This implements a simple state machine to manage client/server rpc
>> jobs being multiplexed over a single channel.
>>
>> A client job consists of sending an rpc request, reading an
>> rpc response, then making the appropriate callbacks. We allow one
>> client job to be processed at a time, which will make the following
>> state transitions:
>>
>> VA_CLIENT_IDLE ->  VA_CLIENT_SEND (job queued, send channel open)
>> VA_CLIENT_SEND ->  VA_CLIENT_WAIT (request sent, awaiting response)
>> VA_CLIENT_WAIT ->  VA_CLIENT_IDLE (response recieved, callbacks made)
>>
>> A server job consists of recieving an rpc request, generating a
>> response, then sending the response. We expect to receive one server
>> request at a time due to the 1 at a time restriction for client jobs.
>> Server jobs make the following transitions:
>>
>> VA_SERVER_IDLE ->  VA_SERVER_WAIT (recieved/executed request, send
>> channel busy, response deferred)
>> VA_SERVER_IDLE ->  VA_SERVER_SEND (recieved/executed request, send
>> channel open, sending response)
>> VA_SERVER_WAIT ->  VA_SERVER_SEND (send channel now open, sending
>> response)
>> VA_SERVER_SEND ->  VA_SERVER_IDLE (response sent)
>>
>> Signed-off-by: Michael Roth<mdroth@linux.vnet.ibm.com>
>
> As mentioned before, I really don't understand why this is part of QEMU,
> the guest agent really should be able to run totally outside of QEMU.
>
>> +
>> +#define DEBUG_VA
>> +
>> +#ifdef DEBUG_VA
>> +#define TRACE(msg, ...) do { \
>> +    fprintf(stderr, "%s:%s():L%d: " msg "\n", \
>> +            __FILE__, __FUNCTION__, __LINE__, ## __VA_ARGS__); \
>> +} while(0)
>> +#else
>> +#define TRACE(msg, ...) \
>> +    do { } while (0)
>> +#endif
>> +
>> +#define LOG(msg, ...) do { \
>> +    fprintf(stderr, "%s:%s(): " msg "\n", \
>> +            __FILE__, __FUNCTION__, ## __VA_ARGS__); \
>> +} while(0)
>
> This must be like the 217th copy of these functions, could you please
> use some of the code that is already in the tree, and make it generic if
> needed.
>
>> +
>> +#define VERSION "1.0"
>> +#define EOL "\r\n"
>> +
>> +#define VA_HDR_LEN_MAX 4096 /* http header limit */
>> +#define VA_CONTENT_LEN_MAX 2*1024*1024 /* rpc/http send limit */
>> +#define VA_CLIENT_JOBS_MAX 5 /* max client rpcs we can queue */
>> +#define VA_SERVER_JOBS_MAX 1 /* max server rpcs we can queue */
>
> As mentioned last time, please make this stuff configurable and not hard
> coded.
>

Yup, definitely on the TODO. Should be in the next round.

> Cheers,
> Jes
>

Patch

diff --git a/virtagent-common.c b/virtagent-common.c
new file mode 100644
index 0000000..45f9d9f
--- /dev/null
+++ b/virtagent-common.c
@@ -0,0 +1,427 @@ 
+/*
+ * virtagent - common host/guest RPC functions
+ *
+ * Copyright IBM Corp. 2010
+ *
+ * Authors:
+ *  Adam Litke        <aglitke@linux.vnet.ibm.com>
+ *  Michael Roth      <mdroth@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "virtagent-common.h"
+
+typedef struct VAClientJob {
+    xmlrpc_mem_block *req_data;
+    char *resp_data;
+    size_t resp_data_len;
+    VAClientCallback *cb;
+    QTAILQ_ENTRY(VAClientJob) next;
+    /* for use by QMP functions */
+    MonitorCompletion *mon_cb;
+    void *mon_data;
+} VAClientJob;
+
+typedef struct VAServerJob {
+    xmlrpc_mem_block *resp_data;
+    char *req_data;
+    size_t req_data_len;
+    void *opaque;
+    QTAILQ_ENTRY(VAServerJob) next;
+} VAServerJob;
+
+enum va_http_status {
+    VA_HTTP_STATUS_NEW,
+    VA_HTTP_STATUS_OK,
+    VA_HTTP_STATUS_ERROR,
+};
+
+typedef void (VAHTSendCallback)(enum va_http_status http_status,
+                                const char *content, size_t content_len);
+typedef void (VAHTReadCallback)(enum va_http_status http_status,
+                                const char *content, size_t content_len,
+                                bool is_request);
+typedef struct VAHTState {
+    enum {
+        VA_SEND_START,
+        VA_SEND_HDR,
+        VA_SEND_BODY,
+        VA_READ_START,
+        VA_READ_HDR,
+        VA_READ_BODY,
+    } state;
+    char hdr[1024];
+    size_t hdr_len;
+    size_t hdr_pos;
+    char *content;
+    size_t content_len;
+    size_t content_pos;
+    VAHTSendCallback *send_cb;
+    VAHTReadCallback *read_cb;
+    bool is_request;
+} VAHTState;
+
+typedef struct VAState {
+    int fd;
+    enum va_client_state {
+        VA_CLIENT_IDLE = 0,
+        VA_CLIENT_SEND,     /* sending rpc request */
+        VA_CLIENT_WAIT,     /* waiting for rpc response */
+    } client_state;
+    enum va_server_state {
+        VA_SERVER_IDLE = 0,
+        VA_SERVER_WAIT,     /* waiting to send rpc response */
+        VA_SERVER_SEND,     /* sending rpc response */
+    } server_state;
+    VAClientData client_data;
+    VAServerData server_data;
+    int client_job_count;
+    QTAILQ_HEAD(, VAClientJob) client_jobs;
+    int server_job_count;
+    QTAILQ_HEAD(, VAServerJob) server_jobs;
+    /* for use by async send/read handlers for fd */
+    VAHTState send_state;
+    VAHTState read_state;
+} VAState;
+
+static VAState *va_state;
+
+static VAClientJob *va_current_client_job(void)
+{
+    TRACE("called");
+    return QTAILQ_FIRST(&va_state->client_jobs);
+}
+
+/***********************************************************/
+/* functions for starting/managing client/server rpc jobs */
+
+static int va_send_server_response(VAServerJob *server_job)
+{
+    VAHTState http_state;
+    TRACE("called");
+    http_state.content = XMLRPC_MEMBLOCK_CONTENTS(char, server_job->resp_data);
+    TRACE("server response: %s", http_state.content);
+    http_state.content_len = XMLRPC_MEMBLOCK_SIZE(char,
+                                                  server_job->resp_data);
+    http_state.content_pos = 0;
+    http_state.hdr_pos = 0;
+    http_state.state = VA_SEND_START;
+    http_state.send_cb = va_server_send_cb;
+    va_http_hdr_init(&http_state, VA_HTTP_RESPONSE);
+    va_state->send_state = http_state;
+    qemu_set_fd_handler(va_state->fd, va_http_read_handler,
+                      va_http_send_handler, NULL);
+    return 0;
+}
+
+static int va_send_client_request(VAClientJob *client_job)
+{
+    VAHTState http_state;
+    TRACE("called");
+    http_state.content = XMLRPC_MEMBLOCK_CONTENTS(char, client_job->req_data);
+    TRACE("client request: %s", http_state.content);
+    http_state.content_len = XMLRPC_MEMBLOCK_SIZE(char,
+                                                  client_job->req_data);
+    http_state.content_pos = 0;
+    http_state.hdr_pos = 0;
+    http_state.state = VA_SEND_START;
+    http_state.send_cb = va_client_send_cb;
+    va_http_hdr_init(&http_state, VA_HTTP_REQUEST);
+    va_state->send_state = http_state;
+    qemu_set_fd_handler(va_state->fd, va_http_send_handler,
+                      va_http_send_handler, NULL);
+    return 0;
+}
+
+/* do some sanity checks before setting client state */
+static bool va_set_client_state(enum va_client_state client_state)
+{
+    TRACE("setting client state to %d", client_state);
+    switch (client_state) {
+    case VA_CLIENT_IDLE:
+        assert(va_state->client_state == VA_CLIENT_IDLE ||
+               va_state->client_state == VA_CLIENT_WAIT);
+        break;
+    case VA_CLIENT_SEND:
+        assert(va_state->client_state == VA_CLIENT_IDLE);
+        break;
+    case VA_CLIENT_WAIT:
+        assert(va_state->client_state == VA_CLIENT_SEND);
+        break;
+    default:
+        LOG("invalid client state");
+        return false;
+    }
+    va_state->client_state = client_state;
+    return true;
+}
+
+/* do some sanity checks before setting server state */
+static bool va_set_server_state(enum va_server_state server_state)
+{
+    TRACE("setting server state to %d", server_state);
+    switch (server_state) {
+    case VA_SERVER_IDLE:
+        assert(va_state->server_state == VA_SERVER_IDLE ||
+               va_state->server_state == VA_SERVER_SEND);
+        break;
+    case VA_SERVER_WAIT:
+        assert(va_state->server_state == VA_SERVER_IDLE);
+        break;
+    case VA_SERVER_SEND:
+        assert(va_state->server_state == VA_SERVER_IDLE ||
+               va_state->server_state == VA_SERVER_WAIT);
+        break;
+    default:
+        LOG("invalid server state");
+        return false;
+    }
+    va_state->server_state = server_state;
+    return true;
+}
+
+/* xmit the next client/server job. for the client this entails sending
+ * a request to the remote server. for the server this entails sending a
+ * response to the remote client
+ *
+ * currently we only do one client job or one server job at a time. for
+ * situations where we start a client job but recieve a server job (remote
+ * rpc request) we go ahead and handle the server job before returning to
+ * handling the client job. TODO: there is potential for pipelining
+ * requests/responses for more efficient use of the channel.
+ *
+ * in all cases, we can only kick off client requests or server responses 
+ * when the send side of the channel is not being used
+ */
+static int va_kick(void)
+{
+    VAServerJob *server_job;
+    VAClientJob *client_job;
+    int ret;
+
+    TRACE("called");
+
+    /* handle server jobs first */
+    if (QTAILQ_EMPTY(&va_state->server_jobs)) {
+        assert(va_set_server_state(VA_SERVER_IDLE));
+    } else {
+        TRACE("handling server job queue");
+        if (va_state->client_state == VA_CLIENT_SEND) {
+            TRACE("send channel busy, deferring till available");
+            assert(va_set_server_state(VA_SERVER_WAIT));
+            goto out;
+        }
+        TRACE("send server response");
+        server_job = QTAILQ_FIRST(&va_state->server_jobs);
+
+        /* set up the send handler for the response */
+        ret = va_send_server_response(server_job);
+        if (ret != 0) {
+            LOG("error setting up send handler for server response");
+            goto out_bad;
+        }
+        assert(va_set_server_state(VA_SERVER_SEND));
+        goto out;
+    }
+
+    /* handle client jobs if nothing to do for server */
+    if (QTAILQ_EMPTY(&va_state->client_jobs)) {
+        assert(va_set_client_state(VA_CLIENT_IDLE));
+    } else {
+        TRACE("handling client job queue");
+        if (va_state->client_state != VA_CLIENT_IDLE) {
+            TRACE("client job in progress, returning");
+            goto out;
+        }
+
+        TRACE("sending new client request");
+        client_job = QTAILQ_FIRST(&va_state->client_jobs);
+        /* set up the send handler for the request, then put it on the
+         * wait queue till response is read
+         */
+        ret = va_send_client_request(client_job);
+        if (ret != 0) {
+            LOG("error setting up sendhandler for client request");
+            goto out_bad;
+        }
+        assert(va_set_client_state(VA_CLIENT_SEND));
+    }
+
+out:
+    return 0;
+out_bad:
+    return ret;
+}
+
+/* push new client job onto queue, */
+static int va_push_client_job(VAClientJob *client_job)
+{
+    TRACE("called");
+    assert(client_job != NULL);
+    if (va_state->client_job_count >= VA_CLIENT_JOBS_MAX) {
+        LOG("client job queue limit exceeded");
+        return -ENOBUFS;
+    }
+    QTAILQ_INSERT_TAIL(&va_state->client_jobs, client_job, next);
+    va_state->client_job_count++;
+
+    return va_kick();
+}
+
+/* pop client job off queue. this should only be done when we're done with
+ * both sending the request and recieving the response
+ */
+static VAClientJob *va_pop_client_job(void)
+{
+    VAClientJob *client_job = va_current_client_job();
+    TRACE("called");
+    if (client_job != NULL) {
+        QTAILQ_REMOVE(&va_state->client_jobs, client_job, next);
+        va_state->client_job_count--;
+        assert(va_set_client_state(VA_CLIENT_IDLE));
+    }
+    return client_job;
+}
+
+/* push new server job onto the queue */
+static int va_push_server_job(VAServerJob *server_job)
+{
+    TRACE("called");
+    if (va_state->server_job_count >= VA_SERVER_JOBS_MAX) {
+        LOG("server job queue limit exceeded");
+        return -ENOBUFS;
+    }
+    QTAILQ_INSERT_TAIL(&va_state->server_jobs, server_job, next);
+    va_state->server_job_count++;
+    return va_kick();
+}
+
+/* pop server job off queue. this should only be done when we're ready to
+ * send the rpc response back to the remote client
+ */
+static VAServerJob *va_pop_server_job(void) {
+    VAServerJob *server_job = QTAILQ_FIRST(&va_state->server_jobs);
+    TRACE("called");
+    if (server_job != NULL) {
+        QTAILQ_REMOVE(&va_state->server_jobs, server_job, next);
+        va_state->server_job_count--;
+        assert(va_set_server_state(VA_SERVER_IDLE));
+    }
+
+    return server_job;
+}
+
+static VAClientJob *va_client_job_new(xmlrpc_mem_block *req_data,
+                                      VAClientCallback *cb,
+                                      MonitorCompletion *mon_cb,
+                                      void *mon_data)
+{
+    VAClientJob *cj = qemu_mallocz(sizeof(VAClientJob));
+    TRACE("called");
+    cj->req_data = req_data;
+    cj->cb = cb;
+    cj->mon_cb = mon_cb;
+    cj->mon_data = mon_data;
+
+    return cj;
+}
+
+static VAServerJob *va_server_job_new(xmlrpc_mem_block *resp_data)
+{
+    VAServerJob *sj = qemu_mallocz(sizeof(VAServerJob));
+    TRACE("called");
+    sj->resp_data = resp_data;
+
+    return sj;
+}
+
+/* create new client job and then put it on the queue. this can be
+ * called externally from virtagent. Since there can only be one virtagent
+ * instance we access state via an object-scoped global rather than pass
+ * it around.
+ *
+ * if this is successful virtagent will handle cleanup of req_xml after
+ * making the appropriate callbacks, otherwise callee should handle it
+ */
+int va_client_job_add(xmlrpc_mem_block *req_xml, VAClientCallback *cb,
+                      MonitorCompletion *mon_cb, void *mon_data)
+{
+    int ret;
+    VAClientJob *client_job;
+    TRACE("called");
+
+    client_job = va_client_job_new(req_xml, cb, mon_cb, mon_data);
+    if (client_job == NULL) {
+        return -EINVAL;
+    }
+
+    ret = va_push_client_job(client_job);
+    if (ret != 0) {
+        LOG("error adding client to queue: %s", strerror(ret));
+        qemu_free(client_job);
+        return ret;
+    }
+
+    return 0;
+}
+
+/* create new server job and then put it on the queue in wait state
+ * this should only be called from within our read handler callback
+ */
+static int va_server_job_add(xmlrpc_mem_block *resp_xml)
+{
+    VAServerJob *server_job;
+    TRACE("called");
+
+    server_job = va_server_job_new(resp_xml);
+    assert(server_job != NULL);
+    va_push_server_job(server_job);
+    return 0;
+}
+
+
+int va_init(enum va_ctx ctx, int fd)
+{
+    VAState *s;
+    int ret;
+    bool is_host = (ctx == VA_CTX_HOST) ? true : false;
+
+    TRACE("called");
+    if (va_state) {
+        LOG("virtagent already initialized");
+        return -EPERM;
+    }
+
+    s = qemu_mallocz(sizeof(VAState));
+
+    ret = va_server_init(&s->server_data, is_host);
+    if (ret) {
+        LOG("error initializing virtagent server");
+        goto out_bad;
+    }
+    ret = va_client_init(&s->client_data);
+    if (ret) {
+        LOG("error initializing virtagent client");
+        goto out_bad;
+    }
+
+    s->client_state = VA_CLIENT_IDLE;
+    s->server_state = VA_SERVER_IDLE;
+    QTAILQ_INIT(&s->client_jobs);
+    QTAILQ_INIT(&s->server_jobs);
+    s->read_state.state = VA_READ_START;
+    s->read_state.read_cb = va_http_read_cb;
+    s->fd = fd;
+    va_state = s;
+
+    /* start listening for requests/responses */
+    qemu_set_fd_handler(va_state->fd, va_http_read_handler, NULL, NULL);
+
+    return 0;
+out_bad:
+    qemu_free(s);
+    return ret;
+}
diff --git a/virtagent-common.h b/virtagent-common.h
new file mode 100644
index 0000000..c0ada60
--- /dev/null
+++ b/virtagent-common.h
@@ -0,0 +1,66 @@ 
+/*
+ * virt-agent - host/guest RPC client functions
+ *
+ * Copyright IBM Corp. 2010
+ *
+ * Authors:
+ *  Adam Litke        <aglitke@linux.vnet.ibm.com>
+ *  Michael Roth      <mdroth@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+#ifndef VIRTAGENT_COMMON_H
+#define VIRTAGENT_COMMON_H
+
+#include <xmlrpc-c/base.h>
+#include <xmlrpc-c/client.h>
+#include <xmlrpc-c/server.h>
+#include "qemu-common.h"
+#include "qemu_socket.h"
+#include "monitor.h"
+#include "virtagent-server.h"
+#include "virtagent.h"
+
+#define DEBUG_VA
+
+#ifdef DEBUG_VA
+#define TRACE(msg, ...) do { \
+    fprintf(stderr, "%s:%s():L%d: " msg "\n", \
+            __FILE__, __FUNCTION__, __LINE__, ## __VA_ARGS__); \
+} while(0)
+#else
+#define TRACE(msg, ...) \
+    do { } while (0)
+#endif
+
+#define LOG(msg, ...) do { \
+    fprintf(stderr, "%s:%s(): " msg "\n", \
+            __FILE__, __FUNCTION__, ## __VA_ARGS__); \
+} while(0)
+
+#define VERSION "1.0"
+#define EOL "\r\n"
+
+#define VA_HDR_LEN_MAX 4096 /* http header limit */
+#define VA_CONTENT_LEN_MAX 2*1024*1024 /* rpc/http send limit */
+#define VA_CLIENT_JOBS_MAX 5 /* max client rpcs we can queue */
+#define VA_SERVER_JOBS_MAX 1 /* max server rpcs we can queue */
+
+enum va_ctx {
+    VA_CTX_HOST,
+    VA_CTX_GUEST,
+};
+
+enum va_job_status {
+    VA_JOB_STATUS_PENDING = 0,
+    VA_JOB_STATUS_OK,
+    VA_JOB_STATUS_ERROR,
+    VA_JOB_STATUS_CANCELLED,
+};
+
+int va_init(enum va_ctx ctx, int fd);
+int va_client_job_add(xmlrpc_mem_block *req_xml, VAClientCallback *cb,
+                      MonitorCompletion *mon_cb, void *mon_data);
+#endif /* VIRTAGENT_COMMON_H */