Patchwork [RFC,v1,07/12] qmp proxy: core code for proxying qmp requests to guest

login
register
mail settings
Submitter Michael Roth
Date March 25, 2011, 7:47 p.m.
Message ID <1301082479-4058-8-git-send-email-mdroth@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/88424/
State New
Headers show

Comments

Michael Roth - March 25, 2011, 7:47 p.m.
This provides a QmpProxy class, 1 instance of which is shared by all QMP
servers/sessions to send/receive QMP requests/responses between QEMU and
the QEMU guest agent.

A single qmp_proxy_send_request() is the only interface currently needed
by a QMP session, QAPI/QMP's existing async support handles all the work
of doing callbacks and routing responses to the proper session.

Currently the class requires a path to a listening socket that either
corresponds to the chardev that the guest agent is communicating
through, or a local socket so we can communicate with a host-side
"guest" agent for testing purposes.

A subsequent patch will introduce a new chardev that sets up the
socket chardev and initializes the QmpProxy instance to abstract this
away from the user. Unifying this with local "guest" agent support may
not be feasible, so another command-line option may be needed support
host-side-only testing.

Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
---
 qmp-core.c       |    8 ++
 qmp-core.h       |    7 +-
 qmp-proxy-core.h |   20 ++++
 qmp-proxy.c      |  335 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 vl.c             |    1 +
 5 files changed, 365 insertions(+), 6 deletions(-)
 create mode 100644 qmp-proxy-core.h
 create mode 100644 qmp-proxy.c
Anthony Liguori - March 25, 2011, 9:27 p.m.
On 03/25/2011 02:47 PM, Michael Roth wrote:
> This provides a QmpProxy class, 1 instance of which is shared by all QMP
> servers/sessions to send/receive QMP requests/responses between QEMU and
> the QEMU guest agent.
>
> A single qmp_proxy_send_request() is the only interface currently needed
> by a QMP session, QAPI/QMP's existing async support handles all the work
> of doing callbacks and routing responses to the proper session.
>
> Currently the class requires a path to a listening socket that either
> corresponds to the chardev that the guest agent is communicating
> through, or a local socket so we can communicate with a host-side
> "guest" agent for testing purposes.
>
> A subsequent patch will introduce a new chardev that sets up the
> socket chardev and initializes the QmpProxy instance to abstract this
> away from the user. Unifying this with local "guest" agent support may
> not be feasible, so another command-line option may be needed support
> host-side-only testing.
>
> Signed-off-by: Michael Roth<mdroth@linux.vnet.ibm.com>
> ---
>   qmp-core.c       |    8 ++
>   qmp-core.h       |    7 +-
>   qmp-proxy-core.h |   20 ++++
>   qmp-proxy.c      |  335 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
>   vl.c             |    1 +
>   5 files changed, 365 insertions(+), 6 deletions(-)
>   create mode 100644 qmp-proxy-core.h
>   create mode 100644 qmp-proxy.c
>
> diff --git a/qmp-core.c b/qmp-core.c
> index 9f3d182..dab50a1 100644
> --- a/qmp-core.c
> +++ b/qmp-core.c
> @@ -937,7 +937,15 @@ void qmp_async_complete_command(QmpCommandState *cmd, QObject *retval, Error *er
>       qemu_free(cmd);
>   }
>
> +extern QmpProxy *qmp_proxy_default;
> +
>   void qmp_guest_dispatch(const char *name, const QDict *args, Error **errp,
>                           QmpGuestCompletionFunc *cb, void *opaque)
>   {
> +    if (!qmp_proxy_default) {
> +        /* TODO: should set errp here */
> +        fprintf(stderr, "qmp proxy: no guest proxy found\n");
> +        return;
> +    }
> +    qmp_proxy_send_request(qmp_proxy_default, name, args, errp, cb, opaque);
>   }
> diff --git a/qmp-core.h b/qmp-core.h
> index b676020..114d290 100644
> --- a/qmp-core.h
> +++ b/qmp-core.h
> @@ -4,6 +4,7 @@
>   #include "monitor.h"
>   #include "qmp-marshal-types.h"
>   #include "error_int.h"
> +#include "qmp-proxy-core.h"
>
>   struct QmpCommandState
>   {
> @@ -85,11 +86,5 @@ int qmp_state_get_fd(QmpState *sess);
>       }                                                        \
>   } while(0)
>
> -typedef void (QmpGuestCompletionFunc)(void *opaque, QObject *ret_data, Error *err);
> -
> -void qmp_guest_dispatch(const char *name, const QDict *args, Error **errp,
> -                        QmpGuestCompletionFunc *cb, void *opaque);
> -
> -
>   #endif
>
> diff --git a/qmp-proxy-core.h b/qmp-proxy-core.h
> new file mode 100644
> index 0000000..47ac85d
> --- /dev/null
> +++ b/qmp-proxy-core.h
> @@ -0,0 +1,20 @@
> +#ifndef QMP_PROXY_CORE_H
> +#define QMP_PROXY_CORE_H
> +
> +#define QMP_PROXY_PATH_DEFAULT "/tmp/qmp-proxy.sock"
> +
> +typedef void (QmpGuestCompletionFunc)(void *opaque, QObject *ret_data,
> +                                      Error *err);
> +
> +void qmp_guest_dispatch(const char *name, const QDict *args, Error **errp,
> +                        QmpGuestCompletionFunc *cb, void *opaque);
> +
> +typedef struct QmpProxy QmpProxy;
> +
> +void qmp_proxy_send_request(QmpProxy *p, const char *name,
> +                            const QDict *args, Error **errp,
> +                            QmpGuestCompletionFunc *cb, void *opaque);
> +QmpProxy *qmp_proxy_new(const char *channel_path);
> +void qmp_proxy_close(QmpProxy *p);
> +
> +#endif
> diff --git a/qmp-proxy.c b/qmp-proxy.c
> new file mode 100644
> index 0000000..eaa6e6e
> --- /dev/null
> +++ b/qmp-proxy.c
> @@ -0,0 +1,335 @@
> +/*
> + * QMP definitions for communicating with guest agent
> + *
> + * Copyright IBM Corp. 2011
> + *
> + * Authors:
> + *  Michael Roth<mdroth@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include "qmp.h"
> +#include "qmp-core.h"
> +#include "qemu-queue.h"
> +#include "json-parser.h"
> +#include "json-streamer.h"
> +#include "qemu_socket.h"
> +
> +#define QMP_SENTINEL 0xFF
> +
> +typedef struct QmpProxyRequest {
> +    const char *name;
> +    const QDict *args;
> +    QmpGuestCompletionFunc *cb;
> +    void *opaque;
> +    QString *json;
> +    QTAILQ_ENTRY(QmpProxyRequest) entry;
> +} QmpProxyRequest;
> +
> +typedef struct QmpProxyWriteState {
> +    QmpProxyRequest *current_request;
> +    const char *buf;
> +    size_t size;
> +    size_t pos;
> +    bool use_sentinel;
> +} QmpProxyWriteState;
> +
> +typedef struct QmpProxyReadState {
> +    char *buf;
> +    size_t size;
> +    size_t pos;
> +} QmpProxyReadState;

I suspect you should use GStrings for both the rx and tx buffers.  See 
the QmpUnixSession for an example.

> +struct QmpProxy {
> +    int fd;
> +    const char *path;
> +    QmpProxyWriteState write_state;
> +    QmpProxyReadState read_state;
> +    JSONMessageParser parser;
> +    QTAILQ_HEAD(, QmpProxyRequest) sent_requests;
> +    QTAILQ_HEAD(, QmpProxyRequest) queued_requests;
> +    QString *xport_event;
> +    QString *xport_event_sending;
> +};
> +
> +static void qmp_proxy_read_handler(void *opaque);
> +static void qmp_proxy_write_handler(void *opaque);
> +
> +static int qmp_proxy_cancel_request(QmpProxy *p, QmpProxyRequest *r)
> +{
> +    if (r->name) {
> +        if (r->cb) {
> +            r->cb(r->opaque, NULL, NULL);
> +        }
> +    }
> +
> +    return 0;
> +}
> +
> +static int qmp_proxy_cancel_all(QmpProxy *p)
> +{
> +    QmpProxyRequest *r, *tmp;
> +    QTAILQ_FOREACH_SAFE(r,&p->queued_requests, entry, tmp) {
> +        qmp_proxy_cancel_request(p, r);
> +        QTAILQ_REMOVE(&p->queued_requests, r, entry);
> +    }
> +    QTAILQ_FOREACH_SAFE(r,&p->sent_requests, entry, tmp) {
> +        qmp_proxy_cancel_request(p, r);
> +        QTAILQ_REMOVE(&p->queued_requests, r, entry);
> +    }
> +
> +    return 0;
> +}
> +
> +static void qmp_proxy_send_host_ack(QmpProxy *p, int session_id)
> +{
> +    QDict *evt = qdict_new();
> +
> +    /* only the last ack matters, nuke any outstanding ones. need to rethink
> +     * this approach if a host->guest reset event is added
> +     */
> +    if (p->xport_event) {
> +        QDECREF(p->xport_event);
> +    }
> +
> +    qdict_put_obj(evt, "_xport_event", QOBJECT(qstring_from_str("host_ack")));
> +    qdict_put_obj(evt, "_xport_arg_sid", QOBJECT(qint_from_int(session_id)));

I don't quite follow what this is doing.

> +    p->xport_event = qobject_to_json(QOBJECT(evt));
> +
> +    qemu_set_fd_handler(p->fd, qmp_proxy_read_handler,
> +                        qmp_proxy_write_handler, p);
> +}
> +
> +static void qmp_proxy_process_event(JSONMessageParser *parser, QList *tokens)
> +{
> +    QmpProxy *p = container_of(parser, QmpProxy, parser);
> +    QmpProxyRequest *r;
> +    QObject *obj;
> +    QDict *qdict;
> +    Error *err = NULL;
> +    const char *cmd;
> +    int session_id;
> +
> +    fprintf(stderr, "qmp proxy: called\n");
> +    obj = json_parser_parse_err(tokens, NULL,&err);
> +    if (!obj) {
> +        fprintf(stderr, "qmp proxy: failed to parse\n");
> +        return;
> +    } else {
> +        fprintf(stderr, "qmp proxy: parse successful\n");
> +        qdict = qobject_to_qdict(obj);
> +    }
> +
> +    /* check for transport-only commands/events */
> +    if (qdict_haskey(qdict, "_xport_event")) {
> +        cmd = qdict_get_try_str(qdict, "_xport_event");
> +        if (cmd&&  strcmp(cmd, "guest_init") == 0) {
> +            /* reset outstanding requests, then send an ack with the
> +             * session id they passed us
> +             */
> +            session_id = qdict_get_try_int(qdict, "_xport_arg_sid", 0);
> +            if (!session_id) {
> +                fprintf(stderr, "received invalid guest_init event\n");
> +            }
> +            qmp_proxy_cancel_all(p);
> +            qmp_proxy_send_host_ack(p, session_id);
> +
> +            return;
> +        }
> +    } else if (qdict_haskey(qdict, "return")) {
> +        fprintf(stderr, "received return\n");
> +        r = QTAILQ_FIRST(&p->sent_requests);
> +        if (!r) {
> +            fprintf(stderr, "received return, but no request queued\n");
> +            return;
> +        }
> +        /* XXX: can't assume type here */
> +        fprintf(stderr, "recieved response for cmd: %s\nreturn: %s\n",
> +                r->name, qstring_get_str(qobject_to_json(QOBJECT(qdict))));
> +        /* TODO: we don't know what kind of qtype the return value is, what
> +         * really need is for the qmp async callbacks to handle demarshalling
> +         * for us, for now we just pass the whole response up the stack, which
> +         * means everything except commands with no return value, like
> +         * guest-ping, will result in errors reported to the client
> +         */
> +        r->cb(r->opaque, QOBJECT(qdict), NULL);
> +        QTAILQ_REMOVE(&p->sent_requests, r, entry);
> +        fprintf(stderr, "done handling response\n");
> +    } else {
> +        fprintf(stderr, "received invalid payload format\n");
> +    }
> +}
> +
> +static void qmp_proxy_read_handler(void *opaque)
> +{
> +    QmpProxy *p = opaque;
> +    char buf[4096];
> +    int ret;
> +
> +    do {
> +        ret = read(p->fd, buf, 4096);
> +        if (ret == -1) {
> +            if (errno != EAGAIN&&  errno != EINTR) {
> +                fprintf(stderr, "qmp proxy: error reading request: %s",
> +                        strerror(errno));
> +            }
> +            return;
> +        } else if (ret == 0) {
> +            /* TODO: is this recoverable? should only happen for hot-unplug
> +             * in the chardev case, but for testing via a local guest agent
> +             * we may want to do some special handling...
> +             */
> +            fprintf(stderr, "qmp proxy: connection closed unexpectedly");
> +            qmp_proxy_cancel_all(p);
> +            qemu_set_fd_handler(p->fd, NULL, NULL, p);
> +            return;
> +        }
> +        buf[ret] = 0;

You have a buffer overflow here.

> +        json_message_parser_feed(&p->parser, (char *)buf, ret);

cast is unnecessary.

> +    } while (ret>  0);
> +}
> +
> +static void qmp_proxy_write_handler(void *opaque)
> +{
> +    QmpProxy *p = opaque;
> +    QmpProxyWriteState s = p->write_state;
> +    QmpProxyRequest *r;
> +    char sentinel = QMP_SENTINEL;
> +    int ret;
> +
> +send_another:

Better to avoid being clever here and have a separate send_once function 
and then a loop() that calls send_once.

Regards,

Anthony Liguori
Michael Roth - March 25, 2011, 9:56 p.m.
On 03/25/2011 04:27 PM, Anthony Liguori wrote:
> On 03/25/2011 02:47 PM, Michael Roth wrote:
>> This provides a QmpProxy class, 1 instance of which is shared by all QMP
>> servers/sessions to send/receive QMP requests/responses between QEMU and
>> the QEMU guest agent.
>>
>> A single qmp_proxy_send_request() is the only interface currently needed
>> by a QMP session, QAPI/QMP's existing async support handles all the work
>> of doing callbacks and routing responses to the proper session.
>>
>> Currently the class requires a path to a listening socket that either
>> corresponds to the chardev that the guest agent is communicating
>> through, or a local socket so we can communicate with a host-side
>> "guest" agent for testing purposes.
>>
>> A subsequent patch will introduce a new chardev that sets up the
>> socket chardev and initializes the QmpProxy instance to abstract this
>> away from the user. Unifying this with local "guest" agent support may
>> not be feasible, so another command-line option may be needed support
>> host-side-only testing.
>>
>> Signed-off-by: Michael Roth<mdroth@linux.vnet.ibm.com>
>> ---
>> qmp-core.c | 8 ++
>> qmp-core.h | 7 +-
>> qmp-proxy-core.h | 20 ++++
>> qmp-proxy.c | 335 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
>> vl.c | 1 +
>> 5 files changed, 365 insertions(+), 6 deletions(-)
>> create mode 100644 qmp-proxy-core.h
>> create mode 100644 qmp-proxy.c
>>
>> diff --git a/qmp-core.c b/qmp-core.c
>> index 9f3d182..dab50a1 100644
>> --- a/qmp-core.c
>> +++ b/qmp-core.c
>> @@ -937,7 +937,15 @@ void qmp_async_complete_command(QmpCommandState
>> *cmd, QObject *retval, Error *er
>> qemu_free(cmd);
>> }
>>
>> +extern QmpProxy *qmp_proxy_default;
>> +
>> void qmp_guest_dispatch(const char *name, const QDict *args, Error
>> **errp,
>> QmpGuestCompletionFunc *cb, void *opaque)
>> {
>> + if (!qmp_proxy_default) {
>> + /* TODO: should set errp here */
>> + fprintf(stderr, "qmp proxy: no guest proxy found\n");
>> + return;
>> + }
>> + qmp_proxy_send_request(qmp_proxy_default, name, args, errp, cb,
>> opaque);
>> }
>> diff --git a/qmp-core.h b/qmp-core.h
>> index b676020..114d290 100644
>> --- a/qmp-core.h
>> +++ b/qmp-core.h
>> @@ -4,6 +4,7 @@
>> #include "monitor.h"
>> #include "qmp-marshal-types.h"
>> #include "error_int.h"
>> +#include "qmp-proxy-core.h"
>>
>> struct QmpCommandState
>> {
>> @@ -85,11 +86,5 @@ int qmp_state_get_fd(QmpState *sess);
>> } \
>> } while(0)
>>
>> -typedef void (QmpGuestCompletionFunc)(void *opaque, QObject
>> *ret_data, Error *err);
>> -
>> -void qmp_guest_dispatch(const char *name, const QDict *args, Error
>> **errp,
>> - QmpGuestCompletionFunc *cb, void *opaque);
>> -
>> -
>> #endif
>>
>> diff --git a/qmp-proxy-core.h b/qmp-proxy-core.h
>> new file mode 100644
>> index 0000000..47ac85d
>> --- /dev/null
>> +++ b/qmp-proxy-core.h
>> @@ -0,0 +1,20 @@
>> +#ifndef QMP_PROXY_CORE_H
>> +#define QMP_PROXY_CORE_H
>> +
>> +#define QMP_PROXY_PATH_DEFAULT "/tmp/qmp-proxy.sock"
>> +
>> +typedef void (QmpGuestCompletionFunc)(void *opaque, QObject *ret_data,
>> + Error *err);
>> +
>> +void qmp_guest_dispatch(const char *name, const QDict *args, Error
>> **errp,
>> + QmpGuestCompletionFunc *cb, void *opaque);
>> +
>> +typedef struct QmpProxy QmpProxy;
>> +
>> +void qmp_proxy_send_request(QmpProxy *p, const char *name,
>> + const QDict *args, Error **errp,
>> + QmpGuestCompletionFunc *cb, void *opaque);
>> +QmpProxy *qmp_proxy_new(const char *channel_path);
>> +void qmp_proxy_close(QmpProxy *p);
>> +
>> +#endif
>> diff --git a/qmp-proxy.c b/qmp-proxy.c
>> new file mode 100644
>> index 0000000..eaa6e6e
>> --- /dev/null
>> +++ b/qmp-proxy.c
>> @@ -0,0 +1,335 @@
>> +/*
>> + * QMP definitions for communicating with guest agent
>> + *
>> + * Copyright IBM Corp. 2011
>> + *
>> + * Authors:
>> + * Michael Roth<mdroth@linux.vnet.ibm.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>> later.
>> + * See the COPYING file in the top-level directory.
>> + *
>> + */
>> +
>> +#include "qmp.h"
>> +#include "qmp-core.h"
>> +#include "qemu-queue.h"
>> +#include "json-parser.h"
>> +#include "json-streamer.h"
>> +#include "qemu_socket.h"
>> +
>> +#define QMP_SENTINEL 0xFF
>> +
>> +typedef struct QmpProxyRequest {
>> + const char *name;
>> + const QDict *args;
>> + QmpGuestCompletionFunc *cb;
>> + void *opaque;
>> + QString *json;
>> + QTAILQ_ENTRY(QmpProxyRequest) entry;
>> +} QmpProxyRequest;
>> +
>> +typedef struct QmpProxyWriteState {
>> + QmpProxyRequest *current_request;
>> + const char *buf;
>> + size_t size;
>> + size_t pos;
>> + bool use_sentinel;
>> +} QmpProxyWriteState;
>> +
>> +typedef struct QmpProxyReadState {
>> + char *buf;
>> + size_t size;
>> + size_t pos;
>> +} QmpProxyReadState;
>
> I suspect you should use GStrings for both the rx and tx buffers. See
> the QmpUnixSession for an example.
>
>> +struct QmpProxy {
>> + int fd;
>> + const char *path;
>> + QmpProxyWriteState write_state;
>> + QmpProxyReadState read_state;
>> + JSONMessageParser parser;
>> + QTAILQ_HEAD(, QmpProxyRequest) sent_requests;
>> + QTAILQ_HEAD(, QmpProxyRequest) queued_requests;
>> + QString *xport_event;
>> + QString *xport_event_sending;
>> +};
>> +
>> +static void qmp_proxy_read_handler(void *opaque);
>> +static void qmp_proxy_write_handler(void *opaque);
>> +
>> +static int qmp_proxy_cancel_request(QmpProxy *p, QmpProxyRequest *r)
>> +{
>> + if (r->name) {
>> + if (r->cb) {
>> + r->cb(r->opaque, NULL, NULL);
>> + }
>> + }
>> +
>> + return 0;
>> +}
>> +
>> +static int qmp_proxy_cancel_all(QmpProxy *p)
>> +{
>> + QmpProxyRequest *r, *tmp;
>> + QTAILQ_FOREACH_SAFE(r,&p->queued_requests, entry, tmp) {
>> + qmp_proxy_cancel_request(p, r);
>> + QTAILQ_REMOVE(&p->queued_requests, r, entry);
>> + }
>> + QTAILQ_FOREACH_SAFE(r,&p->sent_requests, entry, tmp) {
>> + qmp_proxy_cancel_request(p, r);
>> + QTAILQ_REMOVE(&p->queued_requests, r, entry);
>> + }
>> +
>> + return 0;
>> +}
>> +
>> +static void qmp_proxy_send_host_ack(QmpProxy *p, int session_id)
>> +{
>> + QDict *evt = qdict_new();
>> +
>> + /* only the last ack matters, nuke any outstanding ones. need to
>> rethink
>> + * this approach if a host->guest reset event is added
>> + */
>> + if (p->xport_event) {
>> + QDECREF(p->xport_event);
>> + }
>> +
>> + qdict_put_obj(evt, "_xport_event",
>> QOBJECT(qstring_from_str("host_ack")));
>> + qdict_put_obj(evt, "_xport_arg_sid",
>> QOBJECT(qint_from_int(session_id)));
>
> I don't quite follow what this is doing.
>

That's for the session negotiation so we can reset state when the guest 
agent restarts. The sequence is:

guest -> host
{ "_xport_event": "guest_init", "_xport_arg_sid": <random session id> }

host -> guest
{ "_xport_event": "host_ack", "_xport_arg_sid": <received session id> }

Guest will ignore anything it gets until it sees an ack with the proper 
session id, host will cancel outstanding requests when it receives a 
guest init. If there's already an event waiting to be sent we clobber it 
since for the above exchange only the most recent event we sent matters.

We send them as json objects, but they get handled at transport level 
and terminate there.
Anthony Liguori - March 28, 2011, 7:05 p.m.
On 03/25/2011 04:56 PM, Michael Roth wrote:
>> I don't quite follow what this is doing.
>>
>
>
> That's for the session negotiation so we can reset state when the 
> guest agent restarts. The sequence is:
>
> guest -> host
> { "_xport_event": "guest_init", "_xport_arg_sid": <random session id> }
>
> host -> guest
> { "_xport_event": "host_ack", "_xport_arg_sid": <received session id> }
>
> Guest will ignore anything it gets until it sees an ack with the 
> proper session id, host will cancel outstanding requests when it 
> receives a guest init. If there's already an event waiting to be sent 
> we clobber it since for the above exchange only the most recent event 
> we sent matters.
>
> We send them as json objects, but they get handled at transport level 
> and terminate there.

Doesn't an invalid UTF-8 sequence provide the same functionality?

Regards,

Anthony Liguori
Michael Roth - March 28, 2011, 7:57 p.m.
On 03/28/2011 02:05 PM, Anthony Liguori wrote:
> On 03/25/2011 04:56 PM, Michael Roth wrote:
>>> I don't quite follow what this is doing.
>>>
>>
>>
>> That's for the session negotiation so we can reset state when the
>> guest agent restarts. The sequence is:
>>
>> guest -> host
>> { "_xport_event": "guest_init", "_xport_arg_sid": <random session id> }
>>
>> host -> guest
>> { "_xport_event": "host_ack", "_xport_arg_sid": <received session id> }
>>
>> Guest will ignore anything it gets until it sees an ack with the
>> proper session id, host will cancel outstanding requests when it
>> receives a guest init. If there's already an event waiting to be sent
>> we clobber it since for the above exchange only the most recent event
>> we sent matters.
>>
>> We send them as json objects, but they get handled at transport level
>> and terminate there.
>
> Doesn't an invalid UTF-8 sequence provide the same functionality?

We do use the invalid UTF-8 sequence here to some extent, but just to 
ensure the xport events are delivered intact. All xport events are 
preceded with 0xFF to make sure they delivered. After negotiation we 
assume everything is clear for standard json streaming.

We could use an invalid UTF-8 sequence by itself, but it would need to 
be paired with some mechanism to clear the channel, otherwise multiple 
guest agent restarts could result in an agent reading a "stale" ack from 
the host and proceeding to service canceled/invalid requests.

I think, at least with virtio-serial, a close() on the chardev would 
clear the channel...so that might work. That may not be the case with 
isa-serial though. Pairing the init/ack sequence with a session id just 
seemed a little more robust.

>
> Regards,
>
> Anthony Liguori
>
>

Patch

diff --git a/qmp-core.c b/qmp-core.c
index 9f3d182..dab50a1 100644
--- a/qmp-core.c
+++ b/qmp-core.c
@@ -937,7 +937,15 @@  void qmp_async_complete_command(QmpCommandState *cmd, QObject *retval, Error *er
     qemu_free(cmd);
 }
 
+extern QmpProxy *qmp_proxy_default;
+
 void qmp_guest_dispatch(const char *name, const QDict *args, Error **errp,
                         QmpGuestCompletionFunc *cb, void *opaque)
 {
+    if (!qmp_proxy_default) {
+        /* TODO: should set errp here */
+        fprintf(stderr, "qmp proxy: no guest proxy found\n");
+        return;
+    }
+    qmp_proxy_send_request(qmp_proxy_default, name, args, errp, cb, opaque);
 }
diff --git a/qmp-core.h b/qmp-core.h
index b676020..114d290 100644
--- a/qmp-core.h
+++ b/qmp-core.h
@@ -4,6 +4,7 @@ 
 #include "monitor.h"
 #include "qmp-marshal-types.h"
 #include "error_int.h"
+#include "qmp-proxy-core.h"
 
 struct QmpCommandState
 {
@@ -85,11 +86,5 @@  int qmp_state_get_fd(QmpState *sess);
     }                                                        \
 } while(0)
 
-typedef void (QmpGuestCompletionFunc)(void *opaque, QObject *ret_data, Error *err);
-
-void qmp_guest_dispatch(const char *name, const QDict *args, Error **errp,
-                        QmpGuestCompletionFunc *cb, void *opaque);
-
-
 #endif
 
diff --git a/qmp-proxy-core.h b/qmp-proxy-core.h
new file mode 100644
index 0000000..47ac85d
--- /dev/null
+++ b/qmp-proxy-core.h
@@ -0,0 +1,20 @@ 
+#ifndef QMP_PROXY_CORE_H
+#define QMP_PROXY_CORE_H
+
+#define QMP_PROXY_PATH_DEFAULT "/tmp/qmp-proxy.sock"
+
+typedef void (QmpGuestCompletionFunc)(void *opaque, QObject *ret_data,
+                                      Error *err);
+
+void qmp_guest_dispatch(const char *name, const QDict *args, Error **errp,
+                        QmpGuestCompletionFunc *cb, void *opaque);
+
+typedef struct QmpProxy QmpProxy;
+
+void qmp_proxy_send_request(QmpProxy *p, const char *name,
+                            const QDict *args, Error **errp,
+                            QmpGuestCompletionFunc *cb, void *opaque);
+QmpProxy *qmp_proxy_new(const char *channel_path);
+void qmp_proxy_close(QmpProxy *p);
+
+#endif
diff --git a/qmp-proxy.c b/qmp-proxy.c
new file mode 100644
index 0000000..eaa6e6e
--- /dev/null
+++ b/qmp-proxy.c
@@ -0,0 +1,335 @@ 
+/*
+ * QMP definitions for communicating with guest agent
+ *
+ * Copyright IBM Corp. 2011
+ *
+ * Authors:
+ *  Michael Roth      <mdroth@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qmp.h"
+#include "qmp-core.h"
+#include "qemu-queue.h"
+#include "json-parser.h"
+#include "json-streamer.h"
+#include "qemu_socket.h"
+
+#define QMP_SENTINEL 0xFF
+
+typedef struct QmpProxyRequest {
+    const char *name;
+    const QDict *args;
+    QmpGuestCompletionFunc *cb;
+    void *opaque;
+    QString *json;
+    QTAILQ_ENTRY(QmpProxyRequest) entry;
+} QmpProxyRequest;
+
+typedef struct QmpProxyWriteState {
+    QmpProxyRequest *current_request;
+    const char *buf;
+    size_t size;
+    size_t pos;
+    bool use_sentinel;
+} QmpProxyWriteState;
+
+typedef struct QmpProxyReadState {
+    char *buf;
+    size_t size;
+    size_t pos;
+} QmpProxyReadState;
+
+struct QmpProxy {
+    int fd;
+    const char *path;
+    QmpProxyWriteState write_state;
+    QmpProxyReadState read_state;
+    JSONMessageParser parser;
+    QTAILQ_HEAD(, QmpProxyRequest) sent_requests;
+    QTAILQ_HEAD(, QmpProxyRequest) queued_requests;
+    QString *xport_event;
+    QString *xport_event_sending;
+};
+
+static void qmp_proxy_read_handler(void *opaque);
+static void qmp_proxy_write_handler(void *opaque);
+
+static int qmp_proxy_cancel_request(QmpProxy *p, QmpProxyRequest *r)
+{
+    if (r->name) {
+        if (r->cb) {
+            r->cb(r->opaque, NULL, NULL);
+        }
+    }
+
+    return 0;
+}
+
+static int qmp_proxy_cancel_all(QmpProxy *p)
+{
+    QmpProxyRequest *r, *tmp;
+    QTAILQ_FOREACH_SAFE(r, &p->queued_requests, entry, tmp) {
+        qmp_proxy_cancel_request(p, r);
+        QTAILQ_REMOVE(&p->queued_requests, r, entry);
+    }
+    QTAILQ_FOREACH_SAFE(r, &p->sent_requests, entry, tmp) {
+        qmp_proxy_cancel_request(p, r);
+        QTAILQ_REMOVE(&p->queued_requests, r, entry);
+    }
+
+    return 0;
+}
+
+static void qmp_proxy_send_host_ack(QmpProxy *p, int session_id)
+{
+    QDict *evt = qdict_new();
+
+    /* only the last ack matters, nuke any outstanding ones. need to rethink
+     * this approach if a host->guest reset event is added
+     */
+    if (p->xport_event) {
+        QDECREF(p->xport_event);
+    }
+
+    qdict_put_obj(evt, "_xport_event", QOBJECT(qstring_from_str("host_ack")));
+    qdict_put_obj(evt, "_xport_arg_sid", QOBJECT(qint_from_int(session_id)));
+
+    p->xport_event = qobject_to_json(QOBJECT(evt));
+
+    qemu_set_fd_handler(p->fd, qmp_proxy_read_handler,
+                        qmp_proxy_write_handler, p);
+}
+
+static void qmp_proxy_process_event(JSONMessageParser *parser, QList *tokens)
+{
+    QmpProxy *p = container_of(parser, QmpProxy, parser);
+    QmpProxyRequest *r;
+    QObject *obj;
+    QDict *qdict;
+    Error *err = NULL;
+    const char *cmd;
+    int session_id;
+
+    fprintf(stderr, "qmp proxy: called\n");
+    obj = json_parser_parse_err(tokens, NULL, &err);
+    if (!obj) {
+        fprintf(stderr, "qmp proxy: failed to parse\n");
+        return;
+    } else {
+        fprintf(stderr, "qmp proxy: parse successful\n");
+        qdict = qobject_to_qdict(obj);
+    }
+
+    /* check for transport-only commands/events */
+    if (qdict_haskey(qdict, "_xport_event")) {
+        cmd = qdict_get_try_str(qdict, "_xport_event");
+        if (cmd && strcmp(cmd, "guest_init") == 0) {
+            /* reset outstanding requests, then send an ack with the
+             * session id they passed us
+             */
+            session_id = qdict_get_try_int(qdict, "_xport_arg_sid", 0);
+            if (!session_id) {
+                fprintf(stderr, "received invalid guest_init event\n");
+            }
+            qmp_proxy_cancel_all(p);
+            qmp_proxy_send_host_ack(p, session_id);
+
+            return;
+        }
+    } else if (qdict_haskey(qdict, "return")) {
+        fprintf(stderr, "received return\n");
+        r = QTAILQ_FIRST(&p->sent_requests);
+        if (!r) {
+            fprintf(stderr, "received return, but no request queued\n");
+            return;
+        }
+        /* XXX: can't assume type here */
+        fprintf(stderr, "recieved response for cmd: %s\nreturn: %s\n",
+                r->name, qstring_get_str(qobject_to_json(QOBJECT(qdict))));
+        /* TODO: we don't know what kind of qtype the return value is, what
+         * really need is for the qmp async callbacks to handle demarshalling
+         * for us, for now we just pass the whole response up the stack, which
+         * means everything except commands with no return value, like
+         * guest-ping, will result in errors reported to the client
+         */ 
+        r->cb(r->opaque, QOBJECT(qdict), NULL);
+        QTAILQ_REMOVE(&p->sent_requests, r, entry);
+        fprintf(stderr, "done handling response\n");
+    } else {
+        fprintf(stderr, "received invalid payload format\n");
+    }
+}
+
+static void qmp_proxy_read_handler(void *opaque)
+{
+    QmpProxy *p = opaque;
+    char buf[4096];
+    int ret;
+
+    do {
+        ret = read(p->fd, buf, 4096);
+        if (ret == -1) {
+            if (errno != EAGAIN && errno != EINTR) {
+                fprintf(stderr, "qmp proxy: error reading request: %s",
+                        strerror(errno));
+            }
+            return;
+        } else if (ret == 0) {
+            /* TODO: is this recoverable? should only happen for hot-unplug
+             * in the chardev case, but for testing via a local guest agent
+             * we may want to do some special handling...
+             */
+            fprintf(stderr, "qmp proxy: connection closed unexpectedly");
+            qmp_proxy_cancel_all(p);
+            qemu_set_fd_handler(p->fd, NULL, NULL, p);
+            return;
+        }
+        buf[ret] = 0;
+        json_message_parser_feed(&p->parser, (char *)buf, ret);
+    } while (ret > 0);
+}
+
+static void qmp_proxy_write_handler(void *opaque)
+{
+    QmpProxy *p = opaque;
+    QmpProxyWriteState s = p->write_state;
+    QmpProxyRequest *r;
+    char sentinel = QMP_SENTINEL;
+    int ret;
+
+send_another:
+    if (p->xport_event) {
+        s.current_request = NULL;
+        if (p->xport_event_sending) {
+            QDECREF(p->xport_event_sending);
+        }
+        p->xport_event_sending = p->xport_event;
+        p->xport_event = NULL;
+        s.buf = qstring_get_str(p->xport_event_sending);
+        s.pos = 0;
+        s.size = strlen(s.buf);
+        s.use_sentinel = true;
+    } else if (!s.current_request) {
+        r = QTAILQ_FIRST(&p->queued_requests);
+        if (r == NULL) {
+            /* no more requests to send for now */
+            qemu_set_fd_handler(p->fd, qmp_proxy_read_handler, NULL, p);
+            return;
+        }
+        s.current_request = r;
+        s.buf = qstring_get_str(s.current_request->json);
+        s.pos = 0;
+        s.size = strlen(s.buf);
+        s.use_sentinel = false;
+    }
+
+    while (s.pos < s.size) {
+        if (s.use_sentinel) {
+            ret = write(p->fd, &sentinel, 1);
+        } else {
+            ret = write(p->fd, s.buf + s.pos, s.size - s.pos);
+        }
+        if (ret == -1) {
+            if (errno != EAGAIN && errno != EINTR) {
+                fprintf(stderr, "qmp proxy: error sending payload");
+            }
+            return;
+        } else if (ret == 0) {
+            /* TODO: is this recoverable? should only happen for hot-unplug
+             * in the chardev case, but for testing via a local guest agent
+             * we may want to do some special handling...
+             */
+            fprintf(stderr, "qmp proxy: connection closed unexpectedly");
+            qmp_proxy_cancel_all(p);
+            qemu_set_fd_handler(p->fd, NULL, NULL, p);
+            return;
+        }
+        if (s.use_sentinel) {
+            s.use_sentinel = false;
+        } else {
+            s.pos += ret;
+        }
+    }
+
+    /* done with this request. send another if there is one */
+    if (s.current_request) {
+        QTAILQ_REMOVE(&p->queued_requests, s.current_request, entry);
+        QTAILQ_INSERT_TAIL(&p->sent_requests, s.current_request, entry);
+        s.current_request = NULL;
+    } else if (p->xport_event) {
+        QDECREF(p->xport_event);
+        p->xport_event = NULL;
+    }
+    s.use_sentinel = false;
+    goto send_another;
+}
+
+void qmp_proxy_send_request(QmpProxy *p, const char *name,
+                            const QDict *args, Error **errp,
+                            QmpGuestCompletionFunc *cb, void *opaque)
+{
+    QmpProxyRequest *r = qemu_mallocz(sizeof(QmpProxyRequest));
+    QDict *payload = qdict_new();
+
+    /* TODO: don't really need to hold on to name/args after encoding */
+    r->name = name;
+    r->args = args;
+    r->cb = cb;
+    r->opaque = opaque;
+    QTAILQ_INSERT_TAIL(&p->queued_requests, r, entry);
+
+    qdict_put_obj(payload, "execute", QOBJECT(qstring_from_str(r->name)));
+    /* TODO: casting a const so we can add it to our dictionary. bad. */
+    qdict_put_obj(payload, "arguments", QOBJECT((QDict *)args));
+
+    r->json = qobject_to_json(QOBJECT((QDict *)payload));
+    if (!r->json) {
+        QDECREF(r->json);
+        goto out_bad;
+    }
+
+    qemu_set_fd_handler(p->fd, qmp_proxy_read_handler,
+                        qmp_proxy_write_handler, p);
+out_bad:
+    return;
+}
+
+QmpProxy *qmp_proxy_new(const char *channel_path)
+{
+    QmpProxy *p = qemu_mallocz(sizeof(QmpProxy));
+    QemuOpts *opts;
+    int fd;
+
+    /* connect to guest agent channel */
+    opts = qemu_opts_create(qemu_find_opts("chardev", NULL), NULL, 0, NULL);
+    qemu_opt_set(opts, "path", channel_path, NULL);
+    fd = unix_connect_opts(opts);
+    if (fd == -1) {
+        qemu_opts_del(opts);
+        fprintf(stderr, "qmp proxy: error opening channel: %s",
+                strerror(errno));
+        return NULL;
+    }
+    qemu_opts_del(opts);
+    socket_set_nonblock(fd);
+
+    p->fd = fd;
+    json_message_parser_init(&p->parser, qmp_proxy_process_event);
+    QTAILQ_INIT(&p->queued_requests);
+    QTAILQ_INIT(&p->sent_requests);
+    qemu_set_fd_handler(p->fd, qmp_proxy_read_handler, NULL, p);
+
+    return p;
+}
+
+void qmp_proxy_close(QmpProxy *p)
+{
+    qmp_proxy_cancel_all(p);
+    close(p->fd);
+    unlink(p->path);
+    qemu_free(p);
+}
diff --git a/vl.c b/vl.c
index 3fdc7cc..e8c49ef 100644
--- a/vl.c
+++ b/vl.c
@@ -231,6 +231,7 @@  int ctrl_grab = 0;
 unsigned int nb_prom_envs = 0;
 const char *prom_envs[MAX_PROM_ENVS];
 int boot_menu;
+QmpProxy *qmp_proxy_default;
 
 ShutdownEvent qemu_shutdown_event;
 ResetEvent qemu_reset_event;