@@ -47,8 +47,18 @@ struct QmpState
int (*add_connection)(QmpState *s, QmpConnection *conn);
void (*del_connection)(QmpState *s, int global_handle, Error **errp);
void (*event)(QmpState *s, QObject *data);
+ int (*get_fd)(QmpState *s);
};
+typedef struct QmpSession
+{
+ JSONMessageParser parser;
+ QmpState state;
+ CharDriverState *chr;
+ int max_global_handle;
+ QTAILQ_HEAD(, QmpConnection) connections;
+} QmpSession;
+
static QTAILQ_HEAD(, QmpCommand) qmp_commands =
QTAILQ_HEAD_INITIALIZER(qmp_commands);
@@ -128,11 +138,16 @@ void qmp_state_add_connection(QmpState *sess, const char *event_name, QmpSignal
conn->global_handle = sess->add_connection(sess, conn);
}
-void qmp_put_event(QmpState *sess, int global_handle, Error **errp)
+void qmp_put_event(QmpState *sess, int64_t global_handle, Error **errp)
{
sess->del_connection(sess, global_handle, errp);
}
+int qmp_state_get_fd(QmpState *sess)
+{
+ return sess->get_fd(sess);
+}
+
void qmp_state_event(QmpConnection *conn, QObject *data)
{
QDict *event = qdict_new();
@@ -205,3 +220,230 @@ void qmp_signal_disconnect(QmpSignal *obj, int handle)
}
}
}
+
+static QObject *qmp_dispatch_err(QmpState *state, QList *tokens, Error **errp)
+{
+ const char *command;
+ QDict *args, *dict;
+ QObject *request;
+ QmpCommand *cmd;
+ QObject *ret = NULL;
+ Error *err = NULL;
+
+ request = json_parser_parse_err(tokens, NULL, &err);
+ if (request == NULL) {
+ if (err == NULL) {
+ error_set(errp, QERR_JSON_PARSE_ERROR, "no valid JSON object");
+ } else {
+ error_propagate(errp, err);
+ }
+ goto out;
+ }
+ if (qobject_type(request) != QTYPE_QDICT) {
+ error_set(errp, QERR_JSON_PARSE_ERROR, "request is not a dictionary");
+ goto out;
+ }
+
+ dict = qobject_to_qdict(request);
+ if (!qdict_haskey(dict, "execute")) {
+ error_set(errp, QERR_JSON_PARSE_ERROR, "no execute key");
+ goto out;
+ }
+
+ command = qdict_get_str(dict, "execute");
+ cmd = qmp_find_command(command);
+ if (cmd == NULL) {
+ error_set(errp, QERR_COMMAND_NOT_FOUND, command);
+ goto out;
+ }
+
+ if (!qdict_haskey(dict, "arguments")) {
+ args = qdict_new();
+ } else {
+ args = qdict_get_qdict(dict, "arguments");
+ QINCREF(args);
+ }
+
+ switch (cmd->type) {
+ case QCT_NORMAL:
+ cmd->fn(args, &ret, errp);
+ if (ret == NULL) {
+ ret = QOBJECT(qdict_new());
+ }
+ break;
+ case QCT_STATEFUL:
+ cmd->sfn(state, args, &ret, errp);
+ if (ret == NULL) {
+ ret = QOBJECT(qdict_new());
+ }
+ break;
+ case QCT_ASYNC: {
+ QmpCommandState *s = qemu_mallocz(sizeof(*s));
+ // FIXME save async commands and do something
+ // smart if disconnect occurs before completion
+ s->state = state;
+ s->tag = NULL;
+ if (qdict_haskey(dict, "tag")) {
+ s->tag = qdict_get(dict, "tag");
+ qobject_incref(s->tag);
+ }
+ cmd->afn(args, errp, s);
+ ret = NULL;
+ }
+ break;
+ }
+
+ QDECREF(args);
+
+out:
+ qobject_decref(request);
+
+ return ret;
+}
+
+static QObject *qmp_dispatch(QmpState *state, QList *tokens)
+{
+ Error *err = NULL;
+ QObject *ret;
+ QDict *rsp;
+
+ ret = qmp_dispatch_err(state, tokens, &err);
+
+ rsp = qdict_new();
+ if (err) {
+ qdict_put_obj(rsp, "error", error_get_qobject(err));
+ error_free(err);
+ } else if (ret) {
+ qdict_put_obj(rsp, "return", ret);
+ } else {
+ QDECREF(rsp);
+ return NULL;
+ }
+
+ return QOBJECT(rsp);
+}
+
+static void qmp_chr_parse(JSONMessageParser *parser, QList *tokens)
+{
+ QmpSession *s = container_of(parser, QmpSession, parser);
+ QObject *rsp;
+ QString *str;
+
+ rsp = qmp_dispatch(&s->state, tokens);
+
+ if (rsp) {
+ str = qobject_to_json(rsp);
+ qemu_chr_write(s->chr, (void *)str->string, str->length);
+ qemu_chr_write(s->chr, (void *)"\n", 1);
+
+ QDECREF(str);
+ qobject_decref(rsp);
+ }
+}
+
+static int qmp_chr_can_receive(void *opaque)
+{
+ return 1024;
+}
+
+static void qmp_chr_receive(void *opaque, const uint8_t *buf, int size)
+{
+ QmpSession *s = opaque;
+ json_message_parser_feed(&s->parser, (char *)buf, size);
+}
+
+static void qmp_chr_send_greeting(QmpSession *s)
+{
+ VersionInfo *info;
+ QObject *vers;
+ QObject *greeting;
+ QString *str;
+
+ info = qmp_query_version(NULL);
+ vers = qmp_marshal_type_VersionInfo(info);
+ qmp_free_version_info(info);
+
+ greeting = qobject_from_jsonf("{'QMP': {'version': %p, 'capabilities': []} }",
+ vers);
+ str = qobject_to_json(greeting);
+ qobject_decref(greeting);
+
+ qemu_chr_write(s->chr, (void *)str->string, str->length);
+ qemu_chr_write(s->chr, (void *)"\n", 1);
+ QDECREF(str);
+}
+
+static void qmp_chr_event(void *opaque, int event)
+{
+ QmpSession *s = opaque;
+ switch (event) {
+ case CHR_EVENT_OPENED:
+ // FIXME disconnect any connected signals including defaults
+ json_message_parser_init(&s->parser, qmp_chr_parse);
+ qmp_chr_send_greeting(s);
+ break;
+ case CHR_EVENT_CLOSED:
+ json_message_parser_flush(&s->parser);
+ break;
+ }
+}
+
+static int qmp_chr_add_connection(QmpState *state, QmpConnection *conn)
+{
+ QmpSession *s = container_of(state, QmpSession, state);
+
+ QTAILQ_INSERT_TAIL(&s->connections, conn, node);
+ return ++s->max_global_handle;
+}
+
+static void qmp_chr_send_event(QmpState *state, QObject *event)
+{
+ QmpSession *s = container_of(state, QmpSession, state);
+ QString *str;
+
+ str = qobject_to_json(event);
+ qemu_chr_write(s->chr, (void *)str->string, str->length);
+ qemu_chr_write(s->chr, (void *)"\n", 1);
+ QDECREF(str);
+}
+
+static void qmp_chr_del_connection(QmpState *state, int global_handle, Error **errp)
+{
+ QmpSession *s = container_of(state, QmpSession, state);
+ QmpConnection *conn;
+
+ QTAILQ_FOREACH(conn, &s->connections, node) {
+ if (conn->global_handle == global_handle) {
+ qmp_signal_disconnect(conn->signal, conn->handle);
+ QTAILQ_REMOVE(&s->connections, conn, node);
+ qemu_free(conn);
+ return;
+ }
+ }
+
+ error_set(errp, QERR_INVALID_PARAMETER_VALUE, "tag", "valid event handle");
+}
+
+static int qmp_chr_get_fd(QmpState *state)
+{
+ QmpSession *s = container_of(state, QmpSession, state);
+
+ return qemu_chr_get_msgfd(s->chr);
+}
+
+void qmp_init_chardev(CharDriverState *chr)
+{
+ QmpSession *s = qemu_mallocz(sizeof(*s));
+
+ s->chr = chr;
+ s->state.add_connection = qmp_chr_add_connection;
+ s->state.event = qmp_chr_send_event;
+ s->state.del_connection = qmp_chr_del_connection;
+ s->state.get_fd = qmp_chr_get_fd;
+
+ s->max_global_handle = 0;
+ QTAILQ_INIT(&s->connections);
+
+ qemu_chr_add_handlers(chr, qmp_chr_can_receive, qmp_chr_receive,
+ qmp_chr_event, s);
+}
@@ -60,9 +60,10 @@ int qmp_signal_connect(QmpSignal *obj, void *func, void *opaque);
void qmp_signal_disconnect(QmpSignal *obj, int handle);
void qmp_state_add_connection(QmpState *sess, const char *name, QmpSignal *obj, int handle, QmpConnection *conn);
-void qmp_put_event(QmpState *sess, int global_handle, Error **errp);
void qmp_state_event(QmpConnection *conn, QObject *data);
+int qmp_state_get_fd(QmpState *sess);
+
#define signal_init(obj) do { \
(obj)->signal = qmp_signal_init(); \
} while (0)
@@ -83,4 +84,6 @@ void qmp_state_event(QmpConnection *conn, QObject *data);
} \
} while(0)
+void qmp_init_chardev(CharDriverState *chr);
+
#endif
This will replace the current QMP server once all the functions are implemented. Signed-off-by: Anthony Liguori <aliguori@us.ibm.com> --- v1 -> v2 - support for get_fd - support for async commands - free request object on error path