Patchwork [RFC,v6,04/23] virtagent: common code for managing client/server rpc jobs

login
register
mail settings
Submitter Michael Roth
Date Jan. 17, 2011, 1:14 p.m.
Message ID <1295270117-24760-5-git-send-email-mdroth@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/79166/
State New
Headers show

Comments

Michael Roth - Jan. 17, 2011, 1:14 p.m.
This implements a simple state machine to manage client/server rpc
jobs being multiplexed over a single channel.

A client job consists of sending an rpc request, reading an
rpc response, then making the appropriate callbacks. We allow one
client job to be processed at a time, which will make the following
state transitions:

VA_CLIENT_IDLE -> VA_CLIENT_SEND (job queued, send channel open)
VA_CLIENT_SEND -> VA_CLIENT_WAIT (request sent, awaiting response)
VA_CLIENT_WAIT -> VA_CLIENT_IDLE (response received, callbacks made)

A server job consists of receiving an rpc request, generating a
response, then sending the response. We expect to receive one server
request at a time due to the 1 at a time restriction for client jobs.
Server jobs make the following transitions:

VA_SERVER_IDLE -> VA_SERVER_WAIT (received/executed request, send
channel busy, response deferred)
VA_SERVER_IDLE -> VA_SERVER_SEND (received/executed request, send
channel open, sending response)
VA_SERVER_WAIT -> VA_SERVER_SEND (send channel now open, sending
response)
VA_SERVER_SEND -> VA_SERVER_IDLE (response sent)

Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
---
 virtagent-common.c |  613 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 virtagent-common.h |   71 ++++++
 2 files changed, 684 insertions(+), 0 deletions(-)
 create mode 100644 virtagent-common.c
 create mode 100644 virtagent-common.h

Patch

diff --git a/virtagent-common.c b/virtagent-common.c
new file mode 100644
index 0000000..c487252
--- /dev/null
+++ b/virtagent-common.c
@@ -0,0 +1,613 @@ 
+/*
+ * virtagent - common host/guest RPC functions
+ *
+ * Copyright IBM Corp. 2010
+ *
+ * Authors:
+ *  Adam Litke        <aglitke@linux.vnet.ibm.com>
+ *  Michael Roth      <mdroth@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "virtagent-common.h"
+
+/* one outbound rpc: request to send, response buffer, and completion hooks */
+typedef struct VAClientJob {
+    char client_tag[64];        /* tag echoed back to match response to request */
+    xmlrpc_mem_block *req_data; /* serialized rpc request to transmit */
+    char *resp_data;            /* response content, filled in when read */
+    size_t resp_data_len;
+    VAClientCallback *cb;       /* invoked once the response (or failure) is known */
+    QTAILQ_ENTRY(VAClientJob) next;
+    /* for use by QMP functions */
+    MonitorCompletion *mon_cb;
+    void *mon_data;
+} VAClientJob;
+
+/* one inbound rpc: request already executed, response waiting to be sent */
+typedef struct VAServerJob {
+    char client_tag[64];        /* tag copied from the request, echoed in response */
+    xmlrpc_mem_block *resp_data; /* serialized rpc response to transmit */
+    char *req_data;             /* original request content */
+    size_t req_data_len;
+    void *opaque;
+    QTAILQ_ENTRY(VAServerJob) next;
+} VAServerJob;
+
+/* lifecycle status of an http send/read operation */
+enum va_http_status {
+    VA_HTTP_STATUS_NEW,
+    VA_HTTP_STATUS_OK,
+    VA_HTTP_STATUS_ERROR,
+};
+
+/* whether an http message carries an rpc request or an rpc response.
+ * fix: the original declaration also defined an unused file-scope
+ * variable `va_http_type` as a side effect of the trailing declarator;
+ * it is referenced nowhere in this file, so drop it.
+ */
+enum va_http_type {
+    VA_HTTP_TYPE_UNKNOWN = 1,
+    VA_HTTP_TYPE_REQUEST,
+    VA_HTTP_TYPE_RESPONSE,
+};
+
+/* called when an http send completes (successfully or not) */
+typedef void (VAHTSendCallback)(enum va_http_status http_status,
+                                const char *content, size_t content_len);
+/* called when a full http message has been read off the channel */
+typedef void (VAHTReadCallback)(enum va_http_status http_status,
+                                const char *content, size_t content_len,
+                                const char client_tag[64],
+                                enum va_http_type http_type);
+/* progress state for one in-flight http send or read over the channel */
+typedef struct VAHTState {
+    enum {
+        VA_SEND_START,
+        VA_SEND_HDR,
+        VA_SEND_BODY,
+        VA_READ_START,
+        VA_READ_HDR,
+        VA_READ_BODY,
+    } state;
+    char hdr[VA_HDR_LEN_MAX];   /* raw http header being sent/parsed */
+    char hdr_client_tag[64];    /* client tag carried in the header */
+    size_t hdr_len;
+    size_t hdr_pos;             /* bytes of header sent/read so far */
+    char *content;              /* message body */
+    size_t content_len;
+    size_t content_pos;         /* bytes of body sent/read so far */
+    VAHTSendCallback *send_cb;  /* used by the send path only */
+    VAHTReadCallback *read_cb;  /* used by the read path only */
+    enum va_http_type http_type;
+} VAHTState;
+
+/* global virtagent state: one channel multiplexing client and server rpcs */
+typedef struct VAState {
+    bool is_host;               /* host-side vs guest-side instance */
+    const char *channel_method; /* "unix-connect" or "virtio-serial" */
+    const char *channel_path;
+    int fd;                     /* channel fd, set by va_connect() */
+    QEMUTimer *client_timer;    /* timeout for an in-flight client job */
+    QEMUTimer *server_timer;    /* timeout for an in-flight server response */
+    enum va_client_state {
+        VA_CLIENT_IDLE = 0,
+        VA_CLIENT_SEND,     /* sending rpc request */
+        VA_CLIENT_WAIT,     /* waiting for rpc response */
+    } client_state;
+    enum va_server_state {
+        VA_SERVER_IDLE = 0,
+        VA_SERVER_WAIT,     /* waiting to send rpc response */
+        VA_SERVER_SEND,     /* sending rpc response */
+    } server_state;
+    VAClientData client_data;
+    VAServerData server_data;
+    int client_job_count;       /* jobs queued, bounded by VA_CLIENT_JOBS_MAX */
+    int client_jobs_in_flight;  /* requests sent but not yet answered */
+    QTAILQ_HEAD(, VAClientJob) client_jobs;
+    int server_job_count;       /* jobs queued, bounded by VA_SERVER_JOBS_MAX */
+    QTAILQ_HEAD(, VAServerJob) server_jobs;
+    /* for use by async send/read handlers for fd */
+    VAHTState send_state;
+    VAHTState read_state;
+} VAState;
+
+/* singleton instance state; only one virtagent channel may exist */
+static VAState *va_state;
+
+/* forward declarations */
+static bool va_set_client_state(enum va_client_state client_state);
+static VAServerJob *va_pop_server_job(void);
+static VAClientJob *va_pop_client_job(void);
+static int va_kick(void);
+static int va_connect(void);
+static void va_http_read_handler(void *opaque);
+static void va_http_read_handler_reset(void);
+
+/* peek at the client job at the head of the queue (NULL if queue empty) */
+static VAClientJob *va_current_client_job(void)
+{
+    VAClientJob *head;
+
+    TRACE("called");
+    head = QTAILQ_FIRST(&va_state->client_jobs);
+    return head;
+}
+
+/* tear down all queued client/server rpc work and return both state
+ * machines to idle. called on timeout or fatal channel error.
+ */
+static void va_cancel_jobs(void)
+{
+    VAClientJob *cj, *cj_tmp;
+    VAServerJob *sj, *sj_tmp;
+
+    TRACE("called");
+    /* reset read handler, and cancel any current sends */
+    va_http_read_handler_reset();
+    qemu_set_fd_handler(va_state->fd, va_http_read_handler, NULL, NULL);
+
+    /* cancel/remove any queued client jobs */
+    QTAILQ_FOREACH_SAFE(cj, &va_state->client_jobs, next, cj_tmp) {
+        /* issue cb with failure notification */
+        cj->cb(NULL, 0, cj->mon_cb, cj->mon_data);
+        QTAILQ_REMOVE(&va_state->client_jobs, cj, next);
+        /* fix: jobs were removed but never freed, leaking one VAClientJob
+         * per cancelled job. NOTE(review): ownership of cj->req_data
+         * (xmlrpc_mem_block) is not visible from this file -- confirm
+         * whether it must also be released here.
+         */
+        qemu_free(cj);
+    }
+    va_state->client_job_count = 0;
+    va_state->client_jobs_in_flight = 0;
+
+    /* cancel/remove any queued server jobs */
+    QTAILQ_FOREACH_SAFE(sj, &va_state->server_jobs, next, sj_tmp) {
+        QTAILQ_REMOVE(&va_state->server_jobs, sj, next);
+        qemu_free(sj);  /* fix: plug VAServerJob leak on cancellation */
+    }
+    va_state->server_job_count = 0;
+
+    va_state->client_state = VA_CLIENT_IDLE;
+    va_state->server_state = VA_SERVER_IDLE;
+}
+
+/* QEMUTimer callback shared by client and server timers: a job took too
+ * long, so abort everything and reset to a known-idle state
+ */
+static void va_global_timeout(void *opaque)
+{
+    LOG("time out while handling a client job or sending RPC response");
+    va_cancel_jobs();
+}
+
+/* (re)arm the client job timeout, @interval ms from now */
+static void va_set_client_timeout(int interval)
+{
+    qemu_mod_timer(va_state->client_timer,
+                   qemu_get_clock(rt_clock) + interval);
+}
+
+/* disarm the client job timeout */
+static void va_unset_client_timeout(void)
+{
+    qemu_del_timer(va_state->client_timer);
+}
+
+/* (re)arm the server response timeout, @interval ms from now */
+static void va_set_server_timeout(int interval)
+{
+    qemu_mod_timer(va_state->server_timer,
+                   qemu_get_clock(rt_clock) + interval);
+}
+
+/* disarm the server response timeout */
+static void va_unset_server_timeout(void)
+{
+    qemu_del_timer(va_state->server_timer);
+}
+
+/***********************************************************/
+/* functions for starting/managing client/server rpc jobs */
+
+/* arm the async send handler to transmit @server_job's rpc response.
+ * returns 0; the actual send progresses via va_http_send_handler.
+ */
+static int va_send_server_response(VAServerJob *server_job)
+{
+    VAHTState http_state;
+    TRACE("called");
+    http_state.content = XMLRPC_MEMBLOCK_CONTENTS(char, server_job->resp_data);
+    TRACE("sending response: %s", http_state.content);
+    http_state.content_len = XMLRPC_MEMBLOCK_SIZE(char,
+                                                  server_job->resp_data);
+    http_state.content_pos = 0;
+    http_state.hdr_pos = 0;
+    pstrcpy(http_state.hdr_client_tag, 64, server_job->client_tag);
+    http_state.state = VA_SEND_START;
+    http_state.send_cb = va_server_send_cb;
+    va_http_hdr_init(&http_state, VA_HTTP_TYPE_RESPONSE);
+    /* NOTE(review): http_state is copied wholesale while hdr/hdr_len/read_cb
+     * were never set here -- presumably va_http_hdr_init fills hdr/hdr_len;
+     * confirm the send path never reads the remaining fields.
+     */
+    va_state->send_state = http_state;
+    qemu_set_fd_handler(va_state->fd, va_http_read_handler,
+                        va_http_send_handler, NULL);
+    return 0;
+}
+
+/* arm the async send handler to transmit @client_job's rpc request.
+ * returns 0; the actual send progresses via va_http_send_handler.
+ */
+static int va_send_client_request(VAClientJob *client_job)
+{
+    VAHTState s;
+
+    TRACE("called");
+    s.state = VA_SEND_START;
+    s.send_cb = va_client_send_cb;
+    s.content = XMLRPC_MEMBLOCK_CONTENTS(char, client_job->req_data);
+    s.content_len = XMLRPC_MEMBLOCK_SIZE(char, client_job->req_data);
+    s.content_pos = 0;
+    s.hdr_pos = 0;
+    pstrcpy(s.hdr_client_tag, 64, client_job->client_tag);
+    TRACE("sending request: %s", s.content);
+    va_http_hdr_init(&s, VA_HTTP_TYPE_REQUEST);
+    va_state->send_state = s;
+    qemu_set_fd_handler(va_state->fd, va_http_read_handler,
+                        va_http_send_handler, NULL);
+    return 0;
+}
+
+/* validate the requested client state transition, then apply it.
+ * returns false only for an unknown target state; an illegal transition
+ * from a known state trips an assert instead.
+ */
+static bool va_set_client_state(enum va_client_state client_state)
+{
+    enum va_client_state cur = va_state->client_state;
+
+    TRACE("setting client state to %d", client_state);
+    switch (client_state) {
+    case VA_CLIENT_IDLE:
+        /* reachable from idle (no-op) or after a response completes */
+        assert(cur == VA_CLIENT_IDLE || cur == VA_CLIENT_WAIT);
+        break;
+    case VA_CLIENT_SEND:
+        assert(cur == VA_CLIENT_IDLE);
+        break;
+    case VA_CLIENT_WAIT:
+        assert(cur == VA_CLIENT_SEND);
+        break;
+    default:
+        LOG("invalid client state");
+        return false;
+    }
+    va_state->client_state = client_state;
+    return true;
+}
+
+/* do some sanity checks before setting server state. returns false only
+ * for an unknown target state; an illegal transition from a known state
+ * trips an assert instead.
+ */
+static bool va_set_server_state(enum va_server_state server_state)
+{
+    TRACE("setting server state to %d", server_state);
+    switch (server_state) {
+    case VA_SERVER_IDLE:
+        /* reachable from idle (no-op) or once a response has been sent */
+        assert(va_state->server_state == VA_SERVER_IDLE ||
+               va_state->server_state == VA_SERVER_SEND);
+        break;
+    case VA_SERVER_WAIT:
+        assert(va_state->server_state == VA_SERVER_IDLE);
+        break;
+    case VA_SERVER_SEND:
+        /* send may start immediately, or after waiting for the channel */
+        assert(va_state->server_state == VA_SERVER_IDLE ||
+               va_state->server_state == VA_SERVER_WAIT);
+        break;
+    default:
+        LOG("invalid server state");
+        return false;
+    }
+    va_state->server_state = server_state;
+    return true;
+}
+
+/* xmit the next client/server job. for the client this entails sending
+ * a request to the remote server. for the server this entails sending a
+ * response to the remote client
+ *
+ * currently we only do one client job or one server job at a time. for
+ * situations where we start a client job but receive a server job (remote
+ * rpc request) we go ahead and handle the server job before returning to
+ * handling the client job. TODO: there is potential for pipelining
+ * requests/responses for more efficient use of the channel.
+ *
+ * in all cases, we can only kick off client requests or server responses
+ * when the send side of the channel is not being used
+ */
+static int va_kick(void)
+{
+    VAServerJob *server_job;
+    VAClientJob *client_job;
+    int ret;
+    bool ok;
+
+    TRACE("called");
+
+    /* handle server jobs first */
+    if (QTAILQ_EMPTY(&va_state->server_jobs)) {
+        /* fix: state transitions were previously performed inside
+         * assert(); with NDEBUG the assert (and the transition) would
+         * be compiled out, wedging the state machine. perform the
+         * transition unconditionally and assert only the result.
+         */
+        ok = va_set_server_state(VA_SERVER_IDLE);
+        assert(ok);
+    } else {
+        TRACE("handling server job queue");
+        if (va_state->client_state == VA_CLIENT_SEND) {
+            TRACE("send channel busy, deferring till available");
+            ok = va_set_server_state(VA_SERVER_WAIT);
+            assert(ok);
+            goto out;
+        }
+        if (va_state->server_state == VA_SERVER_SEND) {
+            TRACE("current server job already sending");
+            goto out;
+        }
+        TRACE("send server response");
+        server_job = QTAILQ_FIRST(&va_state->server_jobs);
+
+        /* set up the send handler for the response */
+        ret = va_send_server_response(server_job);
+        if (ret != 0) {
+            LOG("error setting up send handler for server response");
+            goto out_bad;
+        }
+        ok = va_set_server_state(VA_SERVER_SEND);
+        assert(ok);
+        va_set_server_timeout(VA_SERVER_TIMEOUT_MS);
+        goto out;
+    }
+
+    /* handle client jobs if nothing to do for server */
+    if (QTAILQ_EMPTY(&va_state->client_jobs)) {
+        ok = va_set_client_state(VA_CLIENT_IDLE);
+        assert(ok);
+    } else {
+        TRACE("handling client job queue");
+        /* TODO: this limits the ability to pipeline. modify this logic
+         * and update state machine accordingly
+         */
+        if (va_state->client_state != VA_CLIENT_IDLE) {
+            TRACE("client job in progress, returning");
+            goto out;
+        }
+
+        /* We know the other end cannot queue up more than VA_SERVER_JOBS_MAX
+         * before it will begin dropping jobs/data to avoid unbounded memory
+         * utilization, so don't try to send more than this many jobs at a time.
+         * In the future we should obtain the actual value of the other end's
+         * VA_SERVER_JOBS_MAX via an introspection call of some sort in case
+         * this value changes in the future.
+         *
+         * XXX: this won't be relevant until the state machine is modified to
+         * allow pipelining requests.
+         */
+        if (va_state->client_jobs_in_flight >= VA_SERVER_JOBS_MAX) {
+            TRACE("too many client jobs in flight, returning");
+            goto out;
+        }
+        TRACE("sending new client request");
+        client_job = QTAILQ_FIRST(&va_state->client_jobs);
+        /* set up the send handler for the request, then put it on the
+         * wait queue till response is read
+         */
+        ret = va_send_client_request(client_job);
+        if (ret != 0) {
+            LOG("error setting up send handler for client request");
+            goto out_bad;
+        }
+        ok = va_set_client_state(VA_CLIENT_SEND);
+        assert(ok);
+        va_state->client_jobs_in_flight++;
+        va_set_client_timeout(VA_CLIENT_TIMEOUT_MS);
+    }
+
+out:
+    return 0;
+out_bad:
+    return ret;
+}
+
+/* push new client job onto the queue, then kick the state machine.
+ * returns -ENOBUFS if the queue is full (caller retains ownership of
+ * @client_job in that case), or va_kick()'s result.
+ */
+static int va_push_client_job(VAClientJob *client_job)
+{
+    TRACE("called");
+    assert(client_job != NULL);
+    if (va_state->client_job_count >= VA_CLIENT_JOBS_MAX) {
+        LOG("client job queue limit exceeded");
+        return -ENOBUFS;
+    }
+    QTAILQ_INSERT_TAIL(&va_state->client_jobs, client_job, next);
+    va_state->client_job_count++;
+
+    return va_kick();
+}
+
+/* pop client job off queue. this should only be done when we're done with
+ * both sending the request and receiving the response. ownership of the
+ * returned job (possibly NULL) passes to the caller.
+ */
+static VAClientJob *va_pop_client_job(void)
+{
+    VAClientJob *client_job = va_current_client_job();
+    bool ok;
+
+    TRACE("called");
+    if (client_job != NULL) {
+        QTAILQ_REMOVE(&va_state->client_jobs, client_job, next);
+        va_state->client_job_count--;
+        /* fix: the state transition was inside assert(); under NDEBUG it
+         * would be compiled out entirely. transition first, assert after.
+         */
+        ok = va_set_client_state(VA_CLIENT_IDLE);
+        assert(ok);
+    }
+    return client_job;
+}
+
+/* queue a server job and kick the state machine. returns -ENOBUFS when
+ * the queue is already full, otherwise va_kick()'s result.
+ */
+static int va_push_server_job(VAServerJob *server_job)
+{
+    TRACE("called");
+    if (va_state->server_job_count >= VA_SERVER_JOBS_MAX) {
+        LOG("server job queue limit exceeded");
+        return -ENOBUFS;
+    }
+    va_state->server_job_count++;
+    QTAILQ_INSERT_TAIL(&va_state->server_jobs, server_job, next);
+    return va_kick();
+}
+
+/* pop server job off queue. this should only be done when we're ready to
+ * send the rpc response back to the remote client. ownership of the
+ * returned job (possibly NULL) passes to the caller.
+ */
+static VAServerJob *va_pop_server_job(void)
+{
+    VAServerJob *server_job = QTAILQ_FIRST(&va_state->server_jobs);
+    bool ok;
+
+    TRACE("called");
+    if (server_job != NULL) {
+        QTAILQ_REMOVE(&va_state->server_jobs, server_job, next);
+        va_state->server_job_count--;
+        /* fix: the state transition was inside assert(); under NDEBUG it
+         * would be compiled out entirely. transition first, assert after.
+         * (also moved the opening brace to its own line per QEMU style.)
+         */
+        ok = va_set_server_state(VA_SERVER_IDLE);
+        assert(ok);
+    }
+
+    return server_job;
+}
+
+/* allocate and initialize a client job. caller owns the returned job;
+ * @req_data ownership transfers to the job on success.
+ */
+static VAClientJob *va_client_job_new(xmlrpc_mem_block *req_data,
+                                      VAClientCallback *cb,
+                                      MonitorCompletion *mon_cb,
+                                      void *mon_data)
+{
+    VAClientJob *cj = qemu_mallocz(sizeof(VAClientJob));
+    TRACE("called");
+    cj->req_data = req_data;
+    cj->cb = cb;
+    cj->mon_cb = mon_cb;
+    cj->mon_data = mon_data;
+    /* TODO: use uuid's, or something akin */
+    /* fix: use the bounded pstrcpy like the rest of this file rather than
+     * an unbounded strcpy into the fixed 64-byte tag buffer
+     */
+    pstrcpy(cj->client_tag, sizeof(cj->client_tag), "testtag");
+
+    return cj;
+}
+
+/* allocate and initialize a server job. caller owns the returned job;
+ * @resp_data ownership transfers to the job on success.
+ */
+static VAServerJob *va_server_job_new(xmlrpc_mem_block *resp_data,
+                                      const char client_tag[64])
+{
+    VAServerJob *sj = qemu_mallocz(sizeof(VAServerJob));
+    TRACE("called");
+    sj->resp_data = resp_data;
+    pstrcpy(sj->client_tag, 64, client_tag);
+
+    return sj;
+}
+
+/* create new client job and then put it on the queue. this can be
+ * called externally from virtagent. Since there can only be one virtagent
+ * instance we access state via an object-scoped global rather than pass
+ * it around.
+ *
+ * if this is successful virtagent will handle cleanup of req_xml after
+ * making the appropriate callbacks, otherwise caller should handle it
+ */
+int va_client_job_add(xmlrpc_mem_block *req_xml, VAClientCallback *cb,
+                      MonitorCompletion *mon_cb, void *mon_data)
+{
+    int ret;
+    VAClientJob *client_job;
+    TRACE("called");
+
+    client_job = va_client_job_new(req_xml, cb, mon_cb, mon_data);
+    if (client_job == NULL) {
+        return -EINVAL;
+    }
+
+    ret = va_push_client_job(client_job);
+    if (ret != 0) {
+        /* fix: ret is a negative errno value; strerror() expects the
+         * positive errno, so negate it before formatting
+         */
+        LOG("error adding client to queue: %s", strerror(-ret));
+        qemu_free(client_job);
+        return ret;
+    }
+
+    return 0;
+}
+
+/* create new server job and then put it on the queue in wait state.
+ * returns 0 on success, negative errno on failure (in which case
+ * @resp_xml remains owned by the caller).
+ */
+int va_server_job_add(xmlrpc_mem_block *resp_xml, const char client_tag[64])
+{
+    VAServerJob *server_job;
+    int ret;
+    TRACE("called");
+
+    server_job = va_server_job_new(resp_xml, client_tag);
+    assert(server_job != NULL);
+    /* fix: the push can fail with -ENOBUFS when the queue is full; the
+     * original ignored the return value, leaking the job and reporting
+     * success to the caller
+     */
+    ret = va_push_server_job(server_job);
+    if (ret != 0) {
+        LOG("error adding server job to queue: %s", strerror(-ret));
+        qemu_free(server_job);
+        return ret;
+    }
+    return 0;
+}
+
+/* open/connect the rpc channel per va_state->channel_method and store the
+ * resulting fd in va_state->fd. returns 0 on success, negative errno on
+ * failure.
+ */
+static int va_connect(void)
+{
+    QemuOpts *opts;
+    int fd, ret = 0;
+
+    TRACE("called");
+    if (va_state->channel_method == NULL) {
+        LOG("no channel method specified");
+        return -EINVAL;
+    }
+    if (va_state->channel_path == NULL) {
+        LOG("no channel path specified");
+        return -EINVAL;
+    }
+
+    if (strcmp(va_state->channel_method, "unix-connect") == 0) {
+        /* host end: connect to a unix domain socket */
+        TRACE("connecting to %s", va_state->channel_path);
+        opts = qemu_opts_create(qemu_find_opts("chardev"), NULL, 0);
+        qemu_opt_set(opts, "path", va_state->channel_path);
+        fd = unix_connect_opts(opts);
+        if (fd == -1) {
+            qemu_opts_del(opts);
+            LOG("error opening channel: %s", strerror(errno));
+            return -errno;
+        }
+        qemu_opts_del(opts);
+        socket_set_nonblock(fd);
+    } else if (strcmp(va_state->channel_method, "virtio-serial") == 0) {
+        /* guest end: open the virtio-serial port device */
+        if (va_state->is_host) {
+            LOG("specified channel method not available for host");
+            return -EINVAL;
+        }
+        if (va_state->channel_path == NULL) {
+            va_state->channel_path = VA_GUEST_PATH_VIRTIO_DEFAULT;
+        }
+        TRACE("opening %s", va_state->channel_path);
+        fd = qemu_open(va_state->channel_path, O_RDWR);
+        if (fd == -1) {
+            LOG("error opening channel: %s", strerror(errno));
+            return -errno;
+        }
+        ret = fcntl(fd, F_GETFL);
+        if (ret < 0) {
+            /* fix: save errno before close() can clobber it, and don't
+             * leak the freshly-opened fd on the error path
+             */
+            ret = -errno;
+            LOG("error getting channel flags: %s", strerror(-ret));
+            close(fd);
+            return ret;
+        }
+        ret = fcntl(fd, F_SETFL, ret | O_ASYNC);
+        if (ret < 0) {
+            ret = -errno;
+            LOG("error setting channel flags: %s", strerror(-ret));
+            close(fd);  /* fix: don't leak the fd on error */
+            return ret;
+        }
+    } else {
+        LOG("invalid channel method");
+        return -EINVAL;
+    }
+
+    va_state->fd = fd;
+    return 0;
+}
+
+/* initialize the single virtagent instance: set up client/server state,
+ * connect the channel, and start listening. returns 0 on success,
+ * negative errno on failure. may only be called once.
+ */
+int va_init(VAContext ctx)
+{
+    VAState *s;
+    int ret;
+
+    TRACE("called");
+    if (va_state) {
+        LOG("virtagent already initialized");
+        return -EPERM;
+    }
+
+    s = qemu_mallocz(sizeof(VAState));
+
+    ret = va_server_init(&s->server_data, ctx.is_host);
+    if (ret) {
+        LOG("error initializing virtagent server");
+        goto out_bad;
+    }
+    ret = va_client_init(&s->client_data);
+    if (ret) {
+        LOG("error initializing virtagent client");
+        goto out_bad;
+    }
+
+    s->client_timer = qemu_new_timer(rt_clock, va_global_timeout, NULL);
+    s->server_timer = qemu_new_timer(rt_clock, va_global_timeout, NULL);
+    s->client_state = VA_CLIENT_IDLE;
+    s->client_job_count = 0;
+    s->client_jobs_in_flight = 0;
+    s->server_state = VA_SERVER_IDLE;
+    s->server_job_count = 0;
+    QTAILQ_INIT(&s->client_jobs);
+    QTAILQ_INIT(&s->server_jobs);
+    s->read_state.state = VA_READ_START;
+    s->read_state.read_cb = va_http_read_cb;
+    s->channel_method = ctx.channel_method;
+    s->channel_path = ctx.channel_path;
+    s->is_host = ctx.is_host;
+    va_state = s;
+
+    /* connect to our end of the channel */
+    ret = va_connect();
+    if (ret) {
+        LOG("error connecting to channel");
+        goto out_bad;
+    }
+
+    /* start listening for requests/responses */
+    qemu_set_fd_handler(va_state->fd, va_http_read_handler, NULL, NULL);
+
+    if (!va_state->is_host) {
+        /* tell the host the agent is running */
+        va_send_hello();
+    }
+
+    return 0;
+out_bad:
+    /* fix: free timers if they were created (s is zeroed by qemu_mallocz,
+     * so they are NULL on the earlier failure paths)
+     */
+    if (s->client_timer) {
+        qemu_free_timer(s->client_timer);
+    }
+    if (s->server_timer) {
+        qemu_free_timer(s->server_timer);
+    }
+    qemu_free(s);
+    /* fix: if va_connect() failed, va_state was already pointing at s;
+     * leaving it set would make the global dangle after the free and
+     * block any retry of va_init()
+     */
+    va_state = NULL;
+    return ret;
+}
diff --git a/virtagent-common.h b/virtagent-common.h
new file mode 100644
index 0000000..568df5a
--- /dev/null
+++ b/virtagent-common.h
@@ -0,0 +1,71 @@ 
+/*
+ * virt-agent - host/guest RPC client functions
+ *
+ * Copyright IBM Corp. 2010
+ *
+ * Authors:
+ *  Adam Litke        <aglitke@linux.vnet.ibm.com>
+ *  Michael Roth      <mdroth@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+#ifndef VIRTAGENT_COMMON_H
+#define VIRTAGENT_COMMON_H
+
+#include <xmlrpc-c/base.h>
+#include <xmlrpc-c/client.h>
+#include <xmlrpc-c/server.h>
+#include "qemu-common.h"
+#include "qemu_socket.h"
+#include "qemu-timer.h"
+#include "monitor.h"
+#include "virtagent-server.h"
+#include "virtagent.h"
+
+/* comment out to silence per-call TRACE output */
+#define DEBUG_VA
+
+#ifdef DEBUG_VA
+/* verbose tracing with file/function/line prefix; compiled out when
+ * DEBUG_VA is not defined
+ */
+#define TRACE(msg, ...) do { \
+    fprintf(stderr, "%s:%s():L%d: " msg "\n", \
+            __FILE__, __FUNCTION__, __LINE__, ## __VA_ARGS__); \
+} while(0)
+#else
+#define TRACE(msg, ...) \
+    do { } while (0)
+#endif
+
+/* unconditional error/warning logging to stderr */
+#define LOG(msg, ...) do { \
+    fprintf(stderr, "%s:%s(): " msg "\n", \
+            __FILE__, __FUNCTION__, ## __VA_ARGS__); \
+} while(0)
+
+#define VA_VERSION "1.0"
+#define EOL "\r\n"
+
+/* fix: parenthesize arithmetic macro bodies so they expand safely inside
+ * larger expressions (e.g. `x / VA_CONTENT_LEN_MAX` or
+ * `n * VA_SERVER_TIMEOUT_MS` would otherwise misparse)
+ */
+#define VA_HDR_LEN_MAX 4096 /* http header limit */
+#define VA_CONTENT_LEN_MAX (2*1024*1024) /* rpc/http send limit */
+#define VA_CLIENT_JOBS_MAX 5 /* max client rpcs we can queue */
+#define VA_SERVER_JOBS_MAX 5 /* max server rpcs we can queue */
+#define VA_SERVER_TIMEOUT_MS (5 * 1000)
+#define VA_CLIENT_TIMEOUT_MS (5 * 1000)
+
+/* channel configuration handed to va_init() */
+typedef struct VAContext {
+    bool is_host;               /* host-side vs guest-side instance */
+    const char *channel_method; /* "unix-connect" or "virtio-serial" */
+    const char *channel_path;   /* socket path or virtio-serial device path */
+} VAContext;
+
+/* completion status of a client/server rpc job */
+enum va_job_status {
+    VA_JOB_STATUS_PENDING = 0,
+    VA_JOB_STATUS_OK,
+    VA_JOB_STATUS_ERROR,
+    VA_JOB_STATUS_CANCELLED,
+};
+
+/* public entry points; see virtagent-common.c for contracts */
+int va_init(VAContext ctx);
+int va_client_job_add(xmlrpc_mem_block *req_xml, VAClientCallback *cb,
+                      MonitorCompletion *mon_cb, void *mon_data);
+int va_server_job_add(xmlrpc_mem_block *resp_xml, const char client_tag[64]);
+#endif /* VIRTAGENT_COMMON_H */