diff mbox

[RFC,v7,04/16] virtagent: bi-directional RPC handling logic

Message ID 1299528642-23631-5-git-send-email-mdroth@linux.vnet.ibm.com
State New
Headers show

Commit Message

Michael Roth March 7, 2011, 8:10 p.m. UTC
This implements the state machine/logic used to manage
send/receive/execute phases of RPCs we send or receive. It does so using
a set of abstract methods we implement with the application and
transport level code which will follow.

Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
---
 virtagent-manager.c |  326 +++++++++++++++++++++++++++++++++++++++++++++++++++
 virtagent-manager.h |  130 ++++++++++++++++++++
 2 files changed, 456 insertions(+), 0 deletions(-)
 create mode 100644 virtagent-manager.c
 create mode 100644 virtagent-manager.h

Comments

Adam Litke March 7, 2011, 9:24 p.m. UTC | #1
On Mon, 2011-03-07 at 14:10 -0600, Michael Roth wrote:
> This implements the state machine/logic used to manage
> send/receive/execute phases of RPCs we send or receive. It does so using
> a set of abstract methods we implement with the application and
> transport level code which will follow.
> 
> Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
> ---
>  virtagent-manager.c |  326 +++++++++++++++++++++++++++++++++++++++++++++++++++
>  virtagent-manager.h |  130 ++++++++++++++++++++
>  2 files changed, 456 insertions(+), 0 deletions(-)
>  create mode 100644 virtagent-manager.c
>  create mode 100644 virtagent-manager.h
> 
> diff --git a/virtagent-manager.c b/virtagent-manager.c
> new file mode 100644
> index 0000000..51d26a3
> --- /dev/null
> +++ b/virtagent-manager.c
> @@ -0,0 +1,326 @@
> +/*
> + * virtagent - job queue management
> + *
> + * Copyright IBM Corp. 2011
> + *
> + * Authors:
> + *  Michael Roth      <mdroth@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include "virtagent-common.h"
> +
> +typedef struct VAServerJob {
> +    char tag[64];
> +    void *opaque;
> +    VAServerJobOps ops;
> +    QTAILQ_ENTRY(VAServerJob) next;
> +    enum {
> +        VA_SERVER_JOB_STATE_NEW = 0,
> +        VA_SERVER_JOB_STATE_BUSY,
> +        VA_SERVER_JOB_STATE_EXECUTED,
> +        VA_SERVER_JOB_STATE_SENT,
> +        VA_SERVER_JOB_STATE_DONE,
> +    } state;
> +} VAServerJob;
> +
> +typedef struct VAClientJob {
> +    char tag[64];
> +    void *opaque;
> +    void *resp_opaque;
> +    VAClientJobOps ops;
> +    QTAILQ_ENTRY(VAClientJob) next;
> +    enum {
> +        VA_CLIENT_JOB_STATE_NEW = 0,
> +        VA_CLIENT_JOB_STATE_BUSY,
> +        VA_CLIENT_JOB_STATE_SENT,
> +        VA_CLIENT_JOB_STATE_READ,
> +        VA_CLIENT_JOB_STATE_DONE,
> +    } state;
> +} VAClientJob;
> +
> +#define SEND_COUNT_MAX 1
> +#define EXECUTE_COUNT_MAX 4

It's not immediately clear what the difference between SEND_COUNT_MAX
and EXECUTE_COUNT_MAX is.  Some comments would help.  Also, will the
code work if these numbers are changed?  If not, a note about what
someone needs to look at when changing these would seem appropriate.

> +
> +struct VAManager {
> +    int send_count; /* sends in flight */
> +    int execute_count; /* number of jobs currently executing */
> +    QTAILQ_HEAD(, VAServerJob) server_jobs;
> +    QTAILQ_HEAD(, VAClientJob) client_jobs;
> +};
> +
> +/* server job operations/helpers */
> +
> +static VAServerJob *va_server_job_by_tag(VAManager *m, const char *tag)
> +{
> +    VAServerJob *j;
> +    QTAILQ_FOREACH(j, &m->server_jobs, next) {
> +        if (strcmp(j->tag, tag) == 0) {
> +            return j;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +int va_server_job_add(VAManager *m, const char *tag, void *opaque,
> +                      VAServerJobOps ops)
> +{
> +    VAServerJob *j = qemu_mallocz(sizeof(VAServerJob));
> +    TRACE("called");

Qemu has a good tracing infrastructure.  If this trace point is
useful enough to keep around, it should try to use that.  If it's not
that important, I'd remove it entirely.  I believe this has been flagged
in an earlier RFC too.

> +    j->state = VA_SERVER_JOB_STATE_NEW;
> +    j->ops = ops;
> +    j->opaque = opaque;
> +    memset(j->tag, 0, 64);
> +    pstrcpy(j->tag, 63, tag);

Magic numbers.  Should use something like #define TAG_LEN 64

> +    QTAILQ_INSERT_TAIL(&m->server_jobs, j, next);
> +    va_kick(m);
> +    return 0;
> +}
> +
> +static void va_server_job_execute(VAServerJob *j)
> +{
> +    TRACE("called");
> +    j->state = VA_SERVER_JOB_STATE_BUSY;
> +    j->ops.execute(j->opaque, j->tag);
> +}
> +
> +/* TODO: need a way to pass information back */
> +void va_server_job_execute_done(VAManager *m, const char *tag)
> +{
> +    VAServerJob *j = va_server_job_by_tag(m, tag);
> +    TRACE("called");
> +    if (!j) {
> +        LOG("server job with tag \"%s\" not found", tag);
> +        return;
> +    }
> +    j->state = VA_SERVER_JOB_STATE_EXECUTED;
> +    va_kick(m);
> +}
> +
> +static void va_server_job_send(VAServerJob *j)
> +{
> +    TRACE("called");
> +    j->state = VA_SERVER_JOB_STATE_BUSY;
> +    j->ops.send(j->opaque, j->tag);
> +}
> +
> +void va_server_job_send_done(VAManager *m, const char *tag)
> +{
> +    VAServerJob *j = va_server_job_by_tag(m, tag);
> +    TRACE("called");
> +    if (!j) {
> +        LOG("server job with tag \"%s\" not found", tag);
> +        return;
> +    }
> +    j->state = VA_SERVER_JOB_STATE_SENT;
> +    m->send_count--;
> +    va_kick(m);
> +}
> +
> +static void va_server_job_callback(VAServerJob *j)
> +{
> +    TRACE("called");
> +    j->state = VA_SERVER_JOB_STATE_BUSY;
> +    if (j->ops.callback) {
> +        j->ops.callback(j->opaque, j->tag);
> +    }
> +    j->state = VA_SERVER_JOB_STATE_DONE;
> +}
> +
> +void va_server_job_cancel(VAManager *m, const char *tag)
> +{
> +    VAServerJob *j = va_server_job_by_tag(m, tag);
> +    TRACE("called");
> +    if (!j) {
> +        LOG("server job with tag \"%s\" not found", tag);
> +        return;
> +    }
> +    /* TODO: need to decrement sends/execs in flight appropriately */
> +    /* make callback and move to done state, kick() will handle cleanup */
> +    va_server_job_callback(j);
> +    va_kick(m);
> +}
> +
> +/* client job operations */
> +
> +static VAClientJob *va_client_job_by_tag(VAManager *m, const char *tag)
> +{
> +    VAClientJob *j;
> +    QTAILQ_FOREACH(j, &m->client_jobs, next) {
> +        if (strcmp(j->tag, tag) == 0) {
> +            return j;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +int va_client_job_add(VAManager *m, const char *tag, void *opaque,
> +                      VAClientJobOps ops)
> +{
> +    VAClientJob *j = qemu_mallocz(sizeof(VAClientJob));
> +    TRACE("called");
> +    j->ops = ops;
> +    j->opaque = opaque;
> +    memset(j->tag, 0, 64);
> +    pstrcpy(j->tag, 63, tag);
> +    QTAILQ_INSERT_TAIL(&m->client_jobs, j, next);
> +    va_kick(m);
> +    return 0;
> +}
> +
> +static void va_client_job_send(VAClientJob *j)
> +{
> +    TRACE("called");
> +    j->state = VA_CLIENT_JOB_STATE_BUSY;
> +    j->ops.send(j->opaque, j->tag);
> +}
> +
> +void va_client_job_send_done(VAManager *m, const char *tag)
> +{
> +    VAClientJob *j = va_client_job_by_tag(m, tag);
> +    TRACE("called");
> +    if (!j) {
> +        LOG("client job with tag \"%s\" not found", tag);
> +        return;
> +    }
> +    j->state = VA_CLIENT_JOB_STATE_SENT;
> +    m->send_count--;
> +    va_kick(m);
> +}
> +
> +void va_client_job_read_done(VAManager *m, const char *tag, void *resp)
> +{
> +    VAClientJob *j = va_client_job_by_tag(m, tag);
> +    TRACE("called");
> +    if (!j) {
> +        LOG("client job with tag \"%s\" not found", tag);
> +        return;
> +    }
> +    j->state = VA_CLIENT_JOB_STATE_READ;
> +    j->resp_opaque = resp;
> +    va_kick(m);
> +}
> +
> +static void va_client_job_callback(VAClientJob *j)
> +{
> +    TRACE("called");
> +    j->state = VA_CLIENT_JOB_STATE_BUSY;
> +    if (j->ops.callback) {
> +        j->ops.callback(j->opaque, j->resp_opaque, j->tag);
> +    }
> +    j->state = VA_CLIENT_JOB_STATE_DONE;
> +}
> +
> +void va_client_job_cancel(VAManager *m, const char *tag)
> +{
> +    VAClientJob *j = va_client_job_by_tag(m, tag);
> +    TRACE("called");
> +    if (!j) {
> +        LOG("client job with tag \"%s\" not found", tag);
> +        return;
> +    }
> +    /* TODO: need to decrement sends/execs in flight appropriately */
> +    /* make callback and move to done state, kick() will handle cleanup */
> +    va_client_job_callback(j);
> +    va_kick(m);
> +}
> +
> +/* general management functions */
> +
> +VAManager *va_manager_new(void)
> +{
> +    VAManager *m = qemu_mallocz(sizeof(VAManager));
> +    QTAILQ_INIT(&m->client_jobs);
> +    QTAILQ_INIT(&m->server_jobs);
> +    return m;
> +}
> +
> +static void va_process_server_job(VAManager *m, VAServerJob *sj)
> +{
> +    switch (sj->state) {
> +        case VA_SERVER_JOB_STATE_NEW:
> +            TRACE("marker");
> +            va_server_job_execute(sj);
> +            break;
> +        case VA_SERVER_JOB_STATE_EXECUTED:
> +            TRACE("marker");
> +            if (m->send_count < SEND_COUNT_MAX) {
> +                TRACE("marker");
> +                va_server_job_send(sj);
> +                m->send_count++;
> +            }
> +            break;
> +        case VA_SERVER_JOB_STATE_SENT:
> +            TRACE("marker");
> +            va_server_job_callback(sj);
> +            break;
> +        case VA_SERVER_JOB_STATE_BUSY:
> +            TRACE("marker, server job currently busy");
> +            break;
> +        case VA_SERVER_JOB_STATE_DONE:
> +            TRACE("marker");
> +            QTAILQ_REMOVE(&m->server_jobs, sj, next);
> +            break;
> +        default:
> +            LOG("error, unknown server job state");
> +            break;
> +    }
> +}
> +
> +static void va_process_client_job(VAManager *m, VAClientJob *cj)
> +{
> +    switch (cj->state) {
> +        case VA_CLIENT_JOB_STATE_NEW:
> +            TRACE("marker");
> +            if (m->send_count < SEND_COUNT_MAX) {
> +                TRACE("marker");
> +                va_client_job_send(cj);
> +                m->send_count++;
> +            }
> +            break;
> +        case VA_CLIENT_JOB_STATE_SENT:
> +            TRACE("marker");
> +            //nothing to do here, awaiting read_done()
> +            break;
> +        case VA_CLIENT_JOB_STATE_READ:
> +            TRACE("marker");
> +            va_client_job_callback(cj);
> +            break;
> +        case VA_CLIENT_JOB_STATE_DONE:
> +            TRACE("marker");
> +            QTAILQ_REMOVE(&m->client_jobs, cj, next);
> +            break;
> +        case VA_CLIENT_JOB_STATE_BUSY:
> +            TRACE("marker, client job currently busy");
> +            break;
> +        default:
> +            LOG("error, unknown client job state");
> +            break;
> +    }
> +}
> +
> +void va_kick(VAManager *m)
> +{
> +    VAServerJob *sj, *sj_tmp;
> +    VAClientJob *cj, *cj_tmp;
> +
> +    TRACE("called");
> +    TRACE("send_count: %u, execute_count: %u", m->send_count, m->execute_count);
> +
> +    /* TODO: make sure there is no starvation of jobs/operations here */
> +
> +    /* look for any work to be done among pending server jobs */
> +    QTAILQ_FOREACH_SAFE(sj, &m->server_jobs, next, sj_tmp) {
> +        TRACE("marker, server tag: %s", sj->tag);
> +        va_process_server_job(m, sj);
> +    }
> +
> +    /* look for work to be done among pending client jobs */
> +    QTAILQ_FOREACH_SAFE(cj, &m->client_jobs, next, cj_tmp) {
> +        TRACE("marker, client tag: %s", cj->tag);
> +        va_process_client_job(m, cj);
> +    }
> +}
> diff --git a/virtagent-manager.h b/virtagent-manager.h
> new file mode 100644
> index 0000000..7b463fb
> --- /dev/null
> +++ b/virtagent-manager.h
> @@ -0,0 +1,130 @@
> +#ifndef VIRTAGENT_MANAGER_H
> +#define VIRTAGENT_MANAGER_H
> +
> +#include "qemu-common.h"
> +#include "qemu-queue.h"
> +
> +/*
> + * Protocol Overview:
> + *
> + * The virtagent protocol depends on a state machine to manage communication
> + * over a single connection stream, currently a virtio or isa serial channel.
> + * The basic characterization of the work being done is that clients
> + * send/handle client jobs locally, which are then read/handled remotely as
> + * server jobs. A client job consists of a request which is sent, and a
> + * response which is eventually recieved. A server job consists of a request
> + * which is recieved from the other end, and a response which is sent back.

"i before e, except after c ..." (I misspell receive all the time too).

> + * 
> + * Server jobs are given priority over client jobs, i.e. if we send a client
> + * job (our request) and recieve a server job (their request), rather than
> + * await a response to the client job, we immediately begin processing the
> + * server job and then send back the response. This prevents us from being
> + * deadlocked in a situation where both sides have sent a client job and are
> + * awaiting the response before handling the other side's client job.
> + *
> + * Multiple in-flight requests are supported, but high request rates can
> + * potentially starve out the other side's client jobs / requests, so we'll
> + * behaved participants should periodically backoff on high request rates, or
> + * limit themselves to 1 request at a time (anything more than 1 can still
> + * potentionally remove any window for the other end to service it's own
> + * client jobs, since we can begin sending the next request before it begins
> + * send the response for the 2nd).
> + * 
> + * On a related note, in the future, bidirectional user/session-level guest
> + * agents may also be supported via a forwarding service made available
> + * through the system-level guest agent. In this case it is up to the
> + * system-level agent to handle forwarding requests in such a way that we
> + * don't starve the host-side service out sheerly by having too many
> + * sessions/users trying to send RPCs at a constant rate. This would be
> + * supported through this job Manager via an additional "forwarder" job type.
> + *
> + * To encapsulate some of this logic, we define here a "Manager" class, which
> + * provides an abstract interface to a state machine which handles most of
> + * the above logic transparently to the transport/application-level code.
> + * This also makes it possible to utilize alternative
> + * transport/application-level protocols in the future.
> + *
> + */
> +
> +/*
> + * Two types of jobs are generated from various components of virtagent.
> + * Each job type has a priority, and a set of prioritized functions as well.
> + *
> + * The read handler generates new server jobs as it recieves requests from
> + * the channel. Server jobs make progress through the following operations.
> + *
> + * EXECUTE->EXECUTE_DONE->SEND->SEND_DONE
> + *
> + * EXECUTE (provided by user, manager calls)
> + * When server jobs are added, eventually (as execution slots become
> + * available) an execute() will be called to begin executing the job. An
> + * error value will be returned if there is no room in the queue for another
> + * server job.
> + *
> + * EXECUTE_DONE (provided by manager, user calls)
> + * As server jobs complete, execute_completed() is called to update execution
> + * status of that job (failure/success), inject the payload, and kick off the
> + * next operation.
> + *
> + * SEND (provided by user, manager calls)
> + * Eventually the send() operation is made. This will cause the send handler
> + * to begin sending the response.
> + *
> + * SEND_DONE (provided by manager, user calls)
> + * Upon completion of that send, the send_completed() operation will be
> + * called. This will free up the job, and kick off the next operation.
> + */

Very helpful protocol overview.  Thanks for adding this.

> +typedef int (va_job_op)(void *opaque, const char *tag);
> +typedef struct VAServerJobOps {
> +    va_job_op *execute;
> +    va_job_op *send;
> +    va_job_op *callback;
> +} VAServerJobOps;
> +
> +/*
> + * The client component generates new client jobs as they're made by
> + * virtagent in response to monitored events or user-issued commands.
> + * Client jobs progress via the following operations.
> + *
> + * SEND->SEND_DONE->READ_DONE
> + * 
> + * SEND (provided by user, called by manager)
> + * After client jobs are added, send() will eventually be called to queue
> + * the job up for xmit over the channel.
> + *
> + * SEND_DONE (provided by manager, called by user)
> + * Upon completion of the send, send_completed() should be called with
> + * failure/success indication.
> + *
> + * READ_DONE (provided by manager, called by user)
> + * When a response for the request is read back via the transport layer,
> + * read_done() will be called by the user to indicate success/failure,
> + * inject the response, and make the associated callback.
> + */
> +typedef int (va_client_job_cb)(void *opaque, void *resp_opaque,
> +                               const char *tag);
> +typedef struct VAClientJobOps {
> +    va_job_op *send;
> +    va_client_job_cb *callback;
> +} VAClientJobOps;
> +
> +typedef struct VAManager VAManager;
> +
> +VAManager *va_manager_new(void);
> +void va_kick(VAManager *m);
> +
> +/* interfaces for server jobs */
> +int va_server_job_add(VAManager *m, const char *tag, void *opaque,
> +                      VAServerJobOps ops);
> +void va_server_job_execute_done(VAManager *m, const char *tag);
> +void va_server_job_send_done(VAManager *m, const char *tag);
> +void va_server_job_cancel(VAManager *m, const char *tag);
> +
> +/* interfaces for client jobs */
> +int va_client_job_add(VAManager *m, const char *tag, void *opaque,
> +                      VAClientJobOps ops);
> +void va_client_job_cancel(VAManager *m, const char *tag);
> +void va_client_job_send_done(VAManager *m, const char *tag);
> +void va_client_job_read_done(VAManager *m, const char *tag, void *resp);
> +
> +#endif /* VIRTAGENT_MANAGER_H */
Michael Roth March 7, 2011, 10:35 p.m. UTC | #2
On 03/07/2011 03:24 PM, Adam Litke wrote:
> On Mon, 2011-03-07 at 14:10 -0600, Michael Roth wrote:
>> This implements the state machine/logic used to manage
>> send/receive/execute phases of RPCs we send or receive. It does so using
>> a set of abstract methods we implement with the application and
>> transport level code which will follow.
>>
>> Signed-off-by: Michael Roth<mdroth@linux.vnet.ibm.com>
>> ---
>>   virtagent-manager.c |  326 +++++++++++++++++++++++++++++++++++++++++++++++++++
>>   virtagent-manager.h |  130 ++++++++++++++++++++
>>   2 files changed, 456 insertions(+), 0 deletions(-)
>>   create mode 100644 virtagent-manager.c
>>   create mode 100644 virtagent-manager.h
>>
>> diff --git a/virtagent-manager.c b/virtagent-manager.c
>> new file mode 100644
>> index 0000000..51d26a3
>> --- /dev/null
>> +++ b/virtagent-manager.c
>> @@ -0,0 +1,326 @@
>> +/*
>> + * virtagent - job queue management
>> + *
>> + * Copyright IBM Corp. 2011
>> + *
>> + * Authors:
>> + *  Michael Roth<mdroth@linux.vnet.ibm.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
>> + * See the COPYING file in the top-level directory.
>> + *
>> + */
>> +
>> +#include "virtagent-common.h"
>> +
>> +typedef struct VAServerJob {
>> +    char tag[64];
>> +    void *opaque;
>> +    VAServerJobOps ops;
>> +    QTAILQ_ENTRY(VAServerJob) next;
>> +    enum {
>> +        VA_SERVER_JOB_STATE_NEW = 0,
>> +        VA_SERVER_JOB_STATE_BUSY,
>> +        VA_SERVER_JOB_STATE_EXECUTED,
>> +        VA_SERVER_JOB_STATE_SENT,
>> +        VA_SERVER_JOB_STATE_DONE,
>> +    } state;
>> +} VAServerJob;
>> +
>> +typedef struct VAClientJob {
>> +    char tag[64];
>> +    void *opaque;
>> +    void *resp_opaque;
>> +    VAClientJobOps ops;
>> +    QTAILQ_ENTRY(VAClientJob) next;
>> +    enum {
>> +        VA_CLIENT_JOB_STATE_NEW = 0,
>> +        VA_CLIENT_JOB_STATE_BUSY,
>> +        VA_CLIENT_JOB_STATE_SENT,
>> +        VA_CLIENT_JOB_STATE_READ,
>> +        VA_CLIENT_JOB_STATE_DONE,
>> +    } state;
>> +} VAClientJob;
>> +
>> +#define SEND_COUNT_MAX 1
>> +#define EXECUTE_COUNT_MAX 4
>
> It's not immediately clear what the difference between SEND_COUNT_MAX
> and EXECUTE_COUNT_MAX is.  Some comments would help.  Also, will the
> code work if these numbers are changed?  If not, a note about what
> someone needs to look at when changing these would seem appropriate.
>


Basically the SEND_COUNT_MAX is the number of RPCs the client can have 
in flight at a time. EXECUTE_COUNT_MAX is the number of jobs the server 
can execute concurrently/asynchronously (execute as in actually do the 
"execute corresponding RPC" phase of a server job's lifecycle).

These should be tweakable without much side-effect. These aren't 
currently that important since a monitor tends to limit us to 1 RPC at a 
time, and the guest agent doesn't make any substantial use of 
guest->host RPCs atm, so SEND_COUNT_MAX has little impact.

We don't currently execute RPCs concurrently/asynchronously either, so 
EXECUTE_COUNT_MAX doesn't do much. But when threaded RPC execution is 
re-implemented this will come back into play. I'll make sure to add some 
comments on this.

>> +
>> +struct VAManager {
>> +    int send_count; /* sends in flight */
>> +    int execute_count; /* number of jobs currently executing */
>> +    QTAILQ_HEAD(, VAServerJob) server_jobs;
>> +    QTAILQ_HEAD(, VAClientJob) client_jobs;
>> +};
>> +
>> +/* server job operations/helpers */
>> +
>> +static VAServerJob *va_server_job_by_tag(VAManager *m, const char *tag)
>> +{
>> +    VAServerJob *j;
>> +    QTAILQ_FOREACH(j,&m->server_jobs, next) {
>> +        if (strcmp(j->tag, tag) == 0) {
>> +            return j;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>> +int va_server_job_add(VAManager *m, const char *tag, void *opaque,
>> +                      VAServerJobOps ops)
>> +{
>> +    VAServerJob *j = qemu_mallocz(sizeof(VAServerJob));
>> +    TRACE("called");
>
> Qemu has a good tracing infrastructure.  If this is trace point is
> useful enough to keep around, it should try to use that.  If it's not
> that important, I'd remove it entirely.  I believe this has been flagged
> in an earlier RFC too.

These are really just to aid in development. I plan on NOOPing these via 
the DEBUG_VA flag before merge. Can also remove them if it's too nasty. 
Only a very small subset of these would be useful for the trace 
facility, I'll have a better idea of which ones once I stop relying on 
the TRACE() stuff.

>
>> +    j->state = VA_SERVER_JOB_STATE_NEW;
>> +    j->ops = ops;
>> +    j->opaque = opaque;
>> +    memset(j->tag, 0, 64);
>> +    pstrcpy(j->tag, 63, tag);
>
> Magic numbers.  Should use something like #define TAG_LEN 64
>
>> +    QTAILQ_INSERT_TAIL(&m->server_jobs, j, next);
>> +    va_kick(m);
>> +    return 0;
>> +}
>> +
>> +static void va_server_job_execute(VAServerJob *j)
>> +{
>> +    TRACE("called");
>> +    j->state = VA_SERVER_JOB_STATE_BUSY;
>> +    j->ops.execute(j->opaque, j->tag);
>> +}
>> +
>> +/* TODO: need a way to pass information back */
>> +void va_server_job_execute_done(VAManager *m, const char *tag)
>> +{
>> +    VAServerJob *j = va_server_job_by_tag(m, tag);
>> +    TRACE("called");
>> +    if (!j) {
>> +        LOG("server job with tag \"%s\" not found", tag);
>> +        return;
>> +    }
>> +    j->state = VA_SERVER_JOB_STATE_EXECUTED;
>> +    va_kick(m);
>> +}
>> +
>> +static void va_server_job_send(VAServerJob *j)
>> +{
>> +    TRACE("called");
>> +    j->state = VA_SERVER_JOB_STATE_BUSY;
>> +    j->ops.send(j->opaque, j->tag);
>> +}
>> +
>> +void va_server_job_send_done(VAManager *m, const char *tag)
>> +{
>> +    VAServerJob *j = va_server_job_by_tag(m, tag);
>> +    TRACE("called");
>> +    if (!j) {
>> +        LOG("server job with tag \"%s\" not found", tag);
>> +        return;
>> +    }
>> +    j->state = VA_SERVER_JOB_STATE_SENT;
>> +    m->send_count--;
>> +    va_kick(m);
>> +}
>> +
>> +static void va_server_job_callback(VAServerJob *j)
>> +{
>> +    TRACE("called");
>> +    j->state = VA_SERVER_JOB_STATE_BUSY;
>> +    if (j->ops.callback) {
>> +        j->ops.callback(j->opaque, j->tag);
>> +    }
>> +    j->state = VA_SERVER_JOB_STATE_DONE;
>> +}
>> +
>> +void va_server_job_cancel(VAManager *m, const char *tag)
>> +{
>> +    VAServerJob *j = va_server_job_by_tag(m, tag);
>> +    TRACE("called");
>> +    if (!j) {
>> +        LOG("server job with tag \"%s\" not found", tag);
>> +        return;
>> +    }
>> +    /* TODO: need to decrement sends/execs in flight appropriately */
>> +    /* make callback and move to done state, kick() will handle cleanup */
>> +    va_server_job_callback(j);
>> +    va_kick(m);
>> +}
>> +
>> +/* client job operations */
>> +
>> +static VAClientJob *va_client_job_by_tag(VAManager *m, const char *tag)
>> +{
>> +    VAClientJob *j;
>> +    QTAILQ_FOREACH(j,&m->client_jobs, next) {
>> +        if (strcmp(j->tag, tag) == 0) {
>> +            return j;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>> +int va_client_job_add(VAManager *m, const char *tag, void *opaque,
>> +                      VAClientJobOps ops)
>> +{
>> +    VAClientJob *j = qemu_mallocz(sizeof(VAClientJob));
>> +    TRACE("called");
>> +    j->ops = ops;
>> +    j->opaque = opaque;
>> +    memset(j->tag, 0, 64);
>> +    pstrcpy(j->tag, 63, tag);
>> +    QTAILQ_INSERT_TAIL(&m->client_jobs, j, next);
>> +    va_kick(m);
>> +    return 0;
>> +}
>> +
>> +static void va_client_job_send(VAClientJob *j)
>> +{
>> +    TRACE("called");
>> +    j->state = VA_CLIENT_JOB_STATE_BUSY;
>> +    j->ops.send(j->opaque, j->tag);
>> +}
>> +
>> +void va_client_job_send_done(VAManager *m, const char *tag)
>> +{
>> +    VAClientJob *j = va_client_job_by_tag(m, tag);
>> +    TRACE("called");
>> +    if (!j) {
>> +        LOG("client job with tag \"%s\" not found", tag);
>> +        return;
>> +    }
>> +    j->state = VA_CLIENT_JOB_STATE_SENT;
>> +    m->send_count--;
>> +    va_kick(m);
>> +}
>> +
>> +void va_client_job_read_done(VAManager *m, const char *tag, void *resp)
>> +{
>> +    VAClientJob *j = va_client_job_by_tag(m, tag);
>> +    TRACE("called");
>> +    if (!j) {
>> +        LOG("client job with tag \"%s\" not found", tag);
>> +        return;
>> +    }
>> +    j->state = VA_CLIENT_JOB_STATE_READ;
>> +    j->resp_opaque = resp;
>> +    va_kick(m);
>> +}
>> +
>> +static void va_client_job_callback(VAClientJob *j)
>> +{
>> +    TRACE("called");
>> +    j->state = VA_CLIENT_JOB_STATE_BUSY;
>> +    if (j->ops.callback) {
>> +        j->ops.callback(j->opaque, j->resp_opaque, j->tag);
>> +    }
>> +    j->state = VA_CLIENT_JOB_STATE_DONE;
>> +}
>> +
>> +void va_client_job_cancel(VAManager *m, const char *tag)
>> +{
>> +    VAClientJob *j = va_client_job_by_tag(m, tag);
>> +    TRACE("called");
>> +    if (!j) {
>> +        LOG("client job with tag \"%s\" not found", tag);
>> +        return;
>> +    }
>> +    /* TODO: need to decrement sends/execs in flight appropriately */
>> +    /* make callback and move to done state, kick() will handle cleanup */
>> +    va_client_job_callback(j);
>> +    va_kick(m);
>> +}
>> +
>> +/* general management functions */
>> +
>> +VAManager *va_manager_new(void)
>> +{
>> +    VAManager *m = qemu_mallocz(sizeof(VAManager));
>> +    QTAILQ_INIT(&m->client_jobs);
>> +    QTAILQ_INIT(&m->server_jobs);
>> +    return m;
>> +}
>> +
>> +static void va_process_server_job(VAManager *m, VAServerJob *sj)
>> +{
>> +    switch (sj->state) {
>> +        case VA_SERVER_JOB_STATE_NEW:
>> +            TRACE("marker");
>> +            va_server_job_execute(sj);
>> +            break;
>> +        case VA_SERVER_JOB_STATE_EXECUTED:
>> +            TRACE("marker");
>> +            if (m->send_count<  SEND_COUNT_MAX) {
>> +                TRACE("marker");
>> +                va_server_job_send(sj);
>> +                m->send_count++;
>> +            }
>> +            break;
>> +        case VA_SERVER_JOB_STATE_SENT:
>> +            TRACE("marker");
>> +            va_server_job_callback(sj);
>> +            break;
>> +        case VA_SERVER_JOB_STATE_BUSY:
>> +            TRACE("marker, server job currently busy");
>> +            break;
>> +        case VA_SERVER_JOB_STATE_DONE:
>> +            TRACE("marker");
>> +            QTAILQ_REMOVE(&m->server_jobs, sj, next);
>> +            break;
>> +        default:
>> +            LOG("error, unknown server job state");
>> +            break;
>> +    }
>> +}
>> +
>> +static void va_process_client_job(VAManager *m, VAClientJob *cj)
>> +{
>> +    switch (cj->state) {
>> +        case VA_CLIENT_JOB_STATE_NEW:
>> +            TRACE("marker");
>> +            if (m->send_count<  SEND_COUNT_MAX) {
>> +                TRACE("marker");
>> +                va_client_job_send(cj);
>> +                m->send_count++;
>> +            }
>> +            break;
>> +        case VA_CLIENT_JOB_STATE_SENT:
>> +            TRACE("marker");
>> +            //nothing to do here, awaiting read_done()
>> +            break;
>> +        case VA_CLIENT_JOB_STATE_READ:
>> +            TRACE("marker");
>> +            va_client_job_callback(cj);
>> +            break;
>> +        case VA_CLIENT_JOB_STATE_DONE:
>> +            TRACE("marker");
>> +            QTAILQ_REMOVE(&m->client_jobs, cj, next);
>> +            break;
>> +        case VA_CLIENT_JOB_STATE_BUSY:
>> +            TRACE("marker, client job currently busy");
>> +            break;
>> +        default:
>> +            LOG("error, unknown client job state");
>> +            break;
>> +    }
>> +}
>> +
>> +void va_kick(VAManager *m)
>> +{
>> +    VAServerJob *sj, *sj_tmp;
>> +    VAClientJob *cj, *cj_tmp;
>> +
>> +    TRACE("called");
>> +    TRACE("send_count: %u, execute_count: %u", m->send_count, m->execute_count);
>> +
>> +    /* TODO: make sure there is no starvation of jobs/operations here */
>> +
>> +    /* look for any work to be done among pending server jobs */
>> +    QTAILQ_FOREACH_SAFE(sj,&m->server_jobs, next, sj_tmp) {
>> +        TRACE("marker, server tag: %s", sj->tag);
>> +        va_process_server_job(m, sj);
>> +    }
>> +
>> +    /* look for work to be done among pending client jobs */
>> +    QTAILQ_FOREACH_SAFE(cj,&m->client_jobs, next, cj_tmp) {
>> +        TRACE("marker, client tag: %s", cj->tag);
>> +        va_process_client_job(m, cj);
>> +    }
>> +}
>> diff --git a/virtagent-manager.h b/virtagent-manager.h
>> new file mode 100644
>> index 0000000..7b463fb
>> --- /dev/null
>> +++ b/virtagent-manager.h
>> @@ -0,0 +1,130 @@
>> +#ifndef VIRTAGENT_MANAGER_H
>> +#define VIRTAGENT_MANAGER_H
>> +
>> +#include "qemu-common.h"
>> +#include "qemu-queue.h"
>> +
>> +/*
>> + * Protocol Overview:
>> + *
>> + * The virtagent protocol depends on a state machine to manage communication
>> + * over a single connection stream, currently a virtio or isa serial channel.
>> + * The basic characterization of the work being done is that clients
>> + * send/handle client jobs locally, which are then read/handled remotely as
>> + * server jobs. A client job consists of a request which is sent, and a
>> + * response which is eventually recieved. A server job consists of a request
>> + * which is recieved from the other end, and a response which is sent back.
>
> "i before e, except after c ..." (I misspell receive all the time too).
>

TIL about vim's spell check feature :)

>> + *
>> + * Server jobs are given priority over client jobs, i.e. if we send a client
>> + * job (our request) and recieve a server job (their request), rather than
>> + * await a response to the client job, we immediately begin processing the
>> + * server job and then send back the response. This prevents us from being
>> + * deadlocked in a situation where both sides have sent a client job and are
>> + * awaiting the response before handling the other side's client job.
>> + *
>> + * Multiple in-flight requests are supported, but high request rates can
>> + * potentially starve out the other side's client jobs / requests, so we'll
>> + * behaved participants should periodically backoff on high request rates, or
>> + * limit themselves to 1 request at a time (anything more than 1 can still
>> + * potentionally remove any window for the other end to service it's own
>> + * client jobs, since we can begin sending the next request before it begins
>> + * send the response for the 2nd).
>> + *
>> + * On a related note, in the future, bidirectional user/session-level guest
>> + * agents may also be supported via a forwarding service made available
>> + * through the system-level guest agent. In this case it is up to the
>> + * system-level agent to handle forwarding requests in such a way that we
>> + * don't starve the host-side service out sheerly by having too many
>> + * sessions/users trying to send RPCs at a constant rate. This would be
>> + * supported through this job Manager via an additional "forwarder" job type.
>> + *
>> + * To encapsulate some of this logic, we define here a "Manager" class, which
>> + * provides an abstract interface to a state machine which handles most of
>> + * the above logic transparently to the transport/application-level code.
>> + * This also makes it possible to utilize alternative
>> + * transport/application-level protocols in the future.
>> + *
>> + */
>> +
>> +/*
>> + * Two types of jobs are generated from various components of virtagent.
>> + * Each job type has a priority, and a set of prioritized functions as well.
>> + *
>> + * The read handler generates new server jobs as it recieves requests from
>> + * the channel. Server jobs make progress through the following operations.
>> + *
>> + * EXECUTE->EXECUTE_DONE->SEND->SEND_DONE
>> + *
>> + * EXECUTE (provided by user, manager calls)
>> + * When server jobs are added, eventually (as execution slots become
>> + * available) an execute() will be called to begin executing the job. An
>> + * error value will be returned if there is no room in the queue for another
>> + * server job.
>> + *
>> + * EXECUTE_DONE (provided by manager, user calls)
>> + * As server jobs complete, execute_completed() is called to update execution
>> + * status of that job (failure/success), inject the payload, and kick off the
>> + * next operation.
>> + *
>> + * SEND (provided by user, manager calls)
>> + * Eventually the send() operation is made. This will cause the send handler
>> + * to begin sending the response.
>> + *
>> + * SEND_DONE (provided by manager, user calls)
>> + * Upon completion of that send, the send_completed() operation will be
>> + * called. This will free up the job, and kick off the next operation.
>> + */
>
> Very helpful protocol overview.  Thanks for adding this.
>
>> +typedef int (va_job_op)(void *opaque, const char *tag);
>> +typedef struct VAServerJobOps {
>> +    va_job_op *execute;
>> +    va_job_op *send;
>> +    va_job_op *callback;
>> +} VAServerJobOps;
>> +
>> +/*
>> + * The client component generates new client jobs as they're made by
>> + * virtagent in response to monitored events or user-issued commands.
>> + * Client jobs progress via the following operations.
>> + *
>> + * SEND->SEND_DONE->READ_DONE
>> + *
>> + * SEND (provided by user, called by manager)
>> + * After client jobs are added, send() will eventually be called to queue
>> + * the job up for xmit over the channel.
>> + *
>> + * SEND_DONE (provided by manager, called by user)
>> + * Upon completion of the send, send_completed() should be called with
>> + * failure/success indication.
>> + *
>> + * READ_DONE (provided by manager, called by user)
>> + * When a response for the request is read back via the transport layer,
>> + * read_done() will be called by the user to indicate success/failure,
>> + * inject the response, and make the associated callback.
>> + */
>> +typedef int (va_client_job_cb)(void *opaque, void *resp_opaque,
>> +                               const char *tag);
>> +typedef struct VAClientJobOps {
>> +    va_job_op *send;
>> +    va_client_job_cb *callback;
>> +} VAClientJobOps;
>> +
>> +typedef struct VAManager VAManager;
>> +
>> +VAManager *va_manager_new(void);
>> +void va_kick(VAManager *m);
>> +
>> +/* interfaces for server jobs */
>> +int va_server_job_add(VAManager *m, const char *tag, void *opaque,
>> +                      VAServerJobOps ops);
>> +void va_server_job_execute_done(VAManager *m, const char *tag);
>> +void va_server_job_send_done(VAManager *m, const char *tag);
>> +void va_server_job_cancel(VAManager *m, const char *tag);
>> +
>> +/* interfaces for client jobs */
>> +int va_client_job_add(VAManager *m, const char *tag, void *opaque,
>> +                      VAClientJobOps ops);
>> +void va_client_job_cancel(VAManager *m, const char *tag);
>> +void va_client_job_send_done(VAManager *m, const char *tag);
>> +void va_client_job_read_done(VAManager *m, const char *tag, void *resp);
>> +
>> +#endif /* VIRTAGENT_MANAGER_H */
>
diff mbox

Patch

diff --git a/virtagent-manager.c b/virtagent-manager.c
new file mode 100644
index 0000000..51d26a3
--- /dev/null
+++ b/virtagent-manager.c
@@ -0,0 +1,326 @@ 
+/*
+ * virtagent - job queue management
+ *
+ * Copyright IBM Corp. 2011
+ *
+ * Authors:
+ *  Michael Roth      <mdroth@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "virtagent-common.h"
+
+typedef struct VAServerJob {
+    char tag[64];
+    void *opaque;
+    VAServerJobOps ops;
+    QTAILQ_ENTRY(VAServerJob) next;
+    enum {
+        VA_SERVER_JOB_STATE_NEW = 0,
+        VA_SERVER_JOB_STATE_BUSY,
+        VA_SERVER_JOB_STATE_EXECUTED,
+        VA_SERVER_JOB_STATE_SENT,
+        VA_SERVER_JOB_STATE_DONE,
+    } state;
+} VAServerJob;
+
+typedef struct VAClientJob {
+    char tag[64];
+    void *opaque;
+    void *resp_opaque;
+    VAClientJobOps ops;
+    QTAILQ_ENTRY(VAClientJob) next;
+    enum {
+        VA_CLIENT_JOB_STATE_NEW = 0,
+        VA_CLIENT_JOB_STATE_BUSY,
+        VA_CLIENT_JOB_STATE_SENT,
+        VA_CLIENT_JOB_STATE_READ,
+        VA_CLIENT_JOB_STATE_DONE,
+    } state;
+} VAClientJob;
+
+#define SEND_COUNT_MAX 1
+#define EXECUTE_COUNT_MAX 4
+
+struct VAManager {
+    int send_count; /* sends in flight */
+    int execute_count; /* number of jobs currently executing */
+    QTAILQ_HEAD(, VAServerJob) server_jobs;
+    QTAILQ_HEAD(, VAClientJob) client_jobs;
+};
+
+/* server job operations/helpers */
+
+static VAServerJob *va_server_job_by_tag(VAManager *m, const char *tag)
+{
+    VAServerJob *j;
+    QTAILQ_FOREACH(j, &m->server_jobs, next) {
+        if (strcmp(j->tag, tag) == 0) {
+            return j;
+        }
+    }
+    return NULL;
+}
+
+int va_server_job_add(VAManager *m, const char *tag, void *opaque,
+                      VAServerJobOps ops)
+{
+    VAServerJob *j = qemu_mallocz(sizeof(VAServerJob));
+    TRACE("called");
+    j->state = VA_SERVER_JOB_STATE_NEW;
+    j->ops = ops;
+    j->opaque = opaque;
+    memset(j->tag, 0, 64);
+    pstrcpy(j->tag, 63, tag);
+    QTAILQ_INSERT_TAIL(&m->server_jobs, j, next);
+    va_kick(m);
+    return 0;
+}
+
+static void va_server_job_execute(VAServerJob *j)
+{
+    TRACE("called");
+    j->state = VA_SERVER_JOB_STATE_BUSY;
+    j->ops.execute(j->opaque, j->tag);
+}
+
+/* TODO: need a way to pass information back */
+void va_server_job_execute_done(VAManager *m, const char *tag)
+{
+    VAServerJob *j = va_server_job_by_tag(m, tag);
+    TRACE("called");
+    if (!j) {
+        LOG("server job with tag \"%s\" not found", tag);
+        return;
+    }
+    j->state = VA_SERVER_JOB_STATE_EXECUTED;
+    va_kick(m);
+}
+
+static void va_server_job_send(VAServerJob *j)
+{
+    TRACE("called");
+    j->state = VA_SERVER_JOB_STATE_BUSY;
+    j->ops.send(j->opaque, j->tag);
+}
+
+void va_server_job_send_done(VAManager *m, const char *tag)
+{
+    VAServerJob *j = va_server_job_by_tag(m, tag);
+    TRACE("called");
+    if (!j) {
+        LOG("server job with tag \"%s\" not found", tag);
+        return;
+    }
+    j->state = VA_SERVER_JOB_STATE_SENT;
+    m->send_count--;
+    va_kick(m);
+}
+
+static void va_server_job_callback(VAServerJob *j)
+{
+    TRACE("called");
+    j->state = VA_SERVER_JOB_STATE_BUSY;
+    if (j->ops.callback) {
+        j->ops.callback(j->opaque, j->tag);
+    }
+    j->state = VA_SERVER_JOB_STATE_DONE;
+}
+
+void va_server_job_cancel(VAManager *m, const char *tag)
+{
+    VAServerJob *j = va_server_job_by_tag(m, tag);
+    TRACE("called");
+    if (!j) {
+        LOG("server job with tag \"%s\" not found", tag);
+        return;
+    }
+    /* TODO: need to decrement sends/execs in flight appropriately */
+    /* make callback and move to done state, kick() will handle cleanup */
+    va_server_job_callback(j);
+    va_kick(m);
+}
+
+/* client job operations */
+
+static VAClientJob *va_client_job_by_tag(VAManager *m, const char *tag)
+{
+    VAClientJob *j;
+    QTAILQ_FOREACH(j, &m->client_jobs, next) {
+        if (strcmp(j->tag, tag) == 0) {
+            return j;
+        }
+    }
+    return NULL;
+}
+
+int va_client_job_add(VAManager *m, const char *tag, void *opaque,
+                      VAClientJobOps ops)
+{
+    VAClientJob *j = qemu_mallocz(sizeof(VAClientJob));
+    TRACE("called");
+    j->ops = ops;
+    j->opaque = opaque;
+    memset(j->tag, 0, 64);
+    pstrcpy(j->tag, 63, tag);
+    QTAILQ_INSERT_TAIL(&m->client_jobs, j, next);
+    va_kick(m);
+    return 0;
+}
+
+static void va_client_job_send(VAClientJob *j)
+{
+    TRACE("called");
+    j->state = VA_CLIENT_JOB_STATE_BUSY;
+    j->ops.send(j->opaque, j->tag);
+}
+
+void va_client_job_send_done(VAManager *m, const char *tag)
+{
+    VAClientJob *j = va_client_job_by_tag(m, tag);
+    TRACE("called");
+    if (!j) {
+        LOG("client job with tag \"%s\" not found", tag);
+        return;
+    }
+    j->state = VA_CLIENT_JOB_STATE_SENT;
+    m->send_count--;
+    va_kick(m);
+}
+
+void va_client_job_read_done(VAManager *m, const char *tag, void *resp)
+{
+    VAClientJob *j = va_client_job_by_tag(m, tag);
+    TRACE("called");
+    if (!j) {
+        LOG("client job with tag \"%s\" not found", tag);
+        return;
+    }
+    j->state = VA_CLIENT_JOB_STATE_READ;
+    j->resp_opaque = resp;
+    va_kick(m);
+}
+
+static void va_client_job_callback(VAClientJob *j)
+{
+    TRACE("called");
+    j->state = VA_CLIENT_JOB_STATE_BUSY;
+    if (j->ops.callback) {
+        j->ops.callback(j->opaque, j->resp_opaque, j->tag);
+    }
+    j->state = VA_CLIENT_JOB_STATE_DONE;
+}
+
+void va_client_job_cancel(VAManager *m, const char *tag)
+{
+    VAClientJob *j = va_client_job_by_tag(m, tag);
+    TRACE("called");
+    if (!j) {
+        LOG("client job with tag \"%s\" not found", tag);
+        return;
+    }
+    /* TODO: need to decrement sends/execs in flight appropriately */
+    /* make callback and move to done state, kick() will handle cleanup */
+    va_client_job_callback(j);
+    va_kick(m);
+}
+
+/* general management functions */
+
+VAManager *va_manager_new(void)
+{
+    VAManager *m = qemu_mallocz(sizeof(VAManager));
+    QTAILQ_INIT(&m->client_jobs);
+    QTAILQ_INIT(&m->server_jobs);
+    return m;
+}
+
+static void va_process_server_job(VAManager *m, VAServerJob *sj)
+{
+    switch (sj->state) {
+        case VA_SERVER_JOB_STATE_NEW:
+            TRACE("marker");
+            va_server_job_execute(sj);
+            break;
+        case VA_SERVER_JOB_STATE_EXECUTED:
+            TRACE("marker");
+            if (m->send_count < SEND_COUNT_MAX) {
+                TRACE("marker");
+                va_server_job_send(sj);
+                m->send_count++;
+            }
+            break;
+        case VA_SERVER_JOB_STATE_SENT:
+            TRACE("marker");
+            va_server_job_callback(sj);
+            break;
+        case VA_SERVER_JOB_STATE_BUSY:
+            TRACE("marker, server job currently busy");
+            break;
+        case VA_SERVER_JOB_STATE_DONE:
+            TRACE("marker");
+            QTAILQ_REMOVE(&m->server_jobs, sj, next);
+            break;
+        default:
+            LOG("error, unknown server job state");
+            break;
+    }
+}
+
+static void va_process_client_job(VAManager *m, VAClientJob *cj)
+{
+    switch (cj->state) {
+        case VA_CLIENT_JOB_STATE_NEW:
+            TRACE("marker");
+            if (m->send_count < SEND_COUNT_MAX) {
+                TRACE("marker");
+                va_client_job_send(cj);
+                m->send_count++;
+            }
+            break;
+        case VA_CLIENT_JOB_STATE_SENT:
+            TRACE("marker");
+            //nothing to do here, awaiting read_done()
+            break;
+        case VA_CLIENT_JOB_STATE_READ:
+            TRACE("marker");
+            va_client_job_callback(cj);
+            break;
+        case VA_CLIENT_JOB_STATE_DONE:
+            TRACE("marker");
+            QTAILQ_REMOVE(&m->client_jobs, cj, next);
+            break;
+        case VA_CLIENT_JOB_STATE_BUSY:
+            TRACE("marker, client job currently busy");
+            break;
+        default:
+            LOG("error, unknown client job state");
+            break;
+    }
+}
+
+void va_kick(VAManager *m)
+{
+    VAServerJob *sj, *sj_tmp;
+    VAClientJob *cj, *cj_tmp;
+
+    TRACE("called");
+    TRACE("send_count: %u, execute_count: %u", m->send_count, m->execute_count);
+
+    /* TODO: make sure there is no starvation of jobs/operations here */
+
+    /* look for any work to be done among pending server jobs */
+    QTAILQ_FOREACH_SAFE(sj, &m->server_jobs, next, sj_tmp) {
+        TRACE("marker, server tag: %s", sj->tag);
+        va_process_server_job(m, sj);
+    }
+
+    /* look for work to be done among pending client jobs */
+    QTAILQ_FOREACH_SAFE(cj, &m->client_jobs, next, cj_tmp) {
+        TRACE("marker, client tag: %s", cj->tag);
+        va_process_client_job(m, cj);
+    }
+}
diff --git a/virtagent-manager.h b/virtagent-manager.h
new file mode 100644
index 0000000..7b463fb
--- /dev/null
+++ b/virtagent-manager.h
@@ -0,0 +1,130 @@ 
+#ifndef VIRTAGENT_MANAGER_H
+#define VIRTAGENT_MANAGER_H
+
+#include "qemu-common.h"
+#include "qemu-queue.h"
+
+/*
+ * Protocol Overview:
+ *
+ * The virtagent protocol depends on a state machine to manage communication
+ * over a single connection stream, currently a virtio or isa serial channel.
+ * The basic characterization of the work being done is that clients
+ * send/handle client jobs locally, which are then read/handled remotely as
+ * server jobs. A client job consists of a request which is sent, and a
+ * response which is eventually received. A server job consists of a request
+ * which is received from the other end, and a response which is sent back.
+ * 
+ * Server jobs are given priority over client jobs, i.e. if we send a client
+ * job (our request) and receive a server job (their request), rather than
+ * await a response to the client job, we immediately begin processing the
+ * server job and then send back the response. This prevents us from being
+ * deadlocked in a situation where both sides have sent a client job and are
+ * awaiting the response before handling the other side's client job.
+ *
+ * Multiple in-flight requests are supported, but high request rates can
+ * potentially starve out the other side's client jobs / requests, so
+ * well-behaved participants should periodically back off on high request
+ * rates, or limit themselves to 1 request at a time (anything more than 1
+ * can still potentially remove any window for the other end to service its
+ * own client jobs, since we can begin sending the next request before it
+ * begins sending the response for the 2nd).
+ * 
+ * On a related note, in the future, bidirectional user/session-level guest
+ * agents may also be supported via a forwarding service made available
+ * through the system-level guest agent. In this case it is up to the
+ * system-level agent to handle forwarding requests in such a way that we
+ * don't starve the host-side service out simply by having too many
+ * sessions/users trying to send RPCs at a constant rate. This would be
+ * supported through this job Manager via an additional "forwarder" job type.
+ *
+ * To encapsulate some of this logic, we define here a "Manager" class, which
+ * provides an abstract interface to a state machine which handles most of
+ * the above logic transparently to the transport/application-level code.
+ * This also makes it possible to utilize alternative
+ * transport/application-level protocols in the future.
+ *
+ */
+
+/*
+ * Two types of jobs are generated from various components of virtagent.
+ * Each job type has a priority, and a set of prioritized functions as well.
+ *
+ * The read handler generates new server jobs as it receives requests from
+ * the channel. Server jobs make progress through the following operations.
+ *
+ * EXECUTE->EXECUTE_DONE->SEND->SEND_DONE
+ *
+ * EXECUTE (provided by user, manager calls)
+ * When server jobs are added, eventually (as execution slots become
+ * available) an execute() will be called to begin executing the job. An
+ * error value will be returned if there is no room in the queue for another
+ * server job.
+ *
+ * EXECUTE_DONE (provided by manager, user calls)
+ * As server jobs complete, execute_completed() is called to update execution
+ * status of that job (failure/success), inject the payload, and kick off the
+ * next operation.
+ *
+ * SEND (provided by user, manager calls)
+ * Eventually the send() operation is made. This will cause the send handler
+ * to begin sending the response.
+ *
+ * SEND_DONE (provided by manager, user calls)
+ * Upon completion of that send, the send_completed() operation will be
+ * called. This will free up the job, and kick off the next operation.
+ */
+typedef int (va_job_op)(void *opaque, const char *tag);
+typedef struct VAServerJobOps {
+    va_job_op *execute;
+    va_job_op *send;
+    va_job_op *callback;
+} VAServerJobOps;
+
+/*
+ * The client component generates new client jobs as they're made by
+ * virtagent in response to monitored events or user-issued commands.
+ * Client jobs progress via the following operations.
+ *
+ * SEND->SEND_DONE->READ_DONE
+ * 
+ * SEND (provided by user, called by manager)
+ * After client jobs are added, send() will eventually be called to queue
+ * the job up for xmit over the channel.
+ *
+ * SEND_DONE (provided by manager, called by user)
+ * Upon completion of the send, send_completed() should be called with
+ * failure/success indication.
+ *
+ * READ_DONE (provided by manager, called by user)
+ * When a response for the request is read back via the transport layer,
+ * read_done() will be called by the user to indicate success/failure,
+ * inject the response, and make the associated callback.
+ */
+typedef int (va_client_job_cb)(void *opaque, void *resp_opaque,
+                               const char *tag);
+typedef struct VAClientJobOps {
+    va_job_op *send;
+    va_client_job_cb *callback;
+} VAClientJobOps;
+
+typedef struct VAManager VAManager;
+
+VAManager *va_manager_new(void);
+void va_kick(VAManager *m);
+
+/* interfaces for server jobs */
+int va_server_job_add(VAManager *m, const char *tag, void *opaque,
+                      VAServerJobOps ops);
+void va_server_job_execute_done(VAManager *m, const char *tag);
+void va_server_job_send_done(VAManager *m, const char *tag);
+void va_server_job_cancel(VAManager *m, const char *tag);
+
+/* interfaces for client jobs */
+int va_client_job_add(VAManager *m, const char *tag, void *opaque,
+                      VAClientJobOps ops);
+void va_client_job_cancel(VAManager *m, const char *tag);
+void va_client_job_send_done(VAManager *m, const char *tag);
+void va_client_job_read_done(VAManager *m, const char *tag, void *resp);
+
+#endif /* VIRTAGENT_MANAGER_H */