@@ -15,6 +15,7 @@
#define QAPI_QMP_DISPATCH_H
#include "qemu/queue.h"
+#include "qemu/thread.h"
#include "qapi/qmp/json-streamer.h"
typedef struct QmpReturn QmpReturn;
@@ -48,11 +49,14 @@ struct QmpSession {
QmpDispatch *dispatch_cb;
QmpDispatchReturn *return_cb;
QmpCommandList *cmds;
+ QemuMutex pending_lock;
+ QTAILQ_HEAD(, QmpReturn) pending;
};
struct QmpReturn {
QmpSession *session;
QDict *rsp;
+ QTAILQ_ENTRY(QmpReturn) entry;
};
/*
@@ -31,11 +31,24 @@ QmpReturn *qmp_return_new(QmpSession *session, const QDict *req)
qdict_put_obj(qret->rsp, "id", id);
}
+ qemu_mutex_lock(&session->pending_lock);
+ QTAILQ_INSERT_TAIL(&session->pending, qret, entry);
+ qemu_mutex_unlock(&session->pending_lock);
+
return qret;
}
void qmp_return_free(QmpReturn *qret)
{
+ QmpSession *session = qret->session;
+
+ if (session) {
+ qemu_mutex_lock(&session->pending_lock);
+ }
+ QTAILQ_REMOVE(&session->pending, qret, entry);
+ if (session) {
+ qemu_mutex_unlock(&session->pending_lock);
+ }
QDECREF(qret->rsp);
g_free(qret);
}
@@ -43,7 +56,9 @@ void qmp_return_free(QmpReturn *qret)
void qmp_return(QmpReturn *qret, QObject *rsp)
{
qdict_put_obj(qret->rsp, "return", rsp ?: QOBJECT(qdict_new()));
- qret->session->return_cb(qret->session, qret->rsp);
+ if (qret->session) {
+ qret->session->return_cb(qret->session, qret->rsp);
+ }
qmp_return_free(qret);
}
@@ -55,7 +70,9 @@ void qmp_return_error(QmpReturn *qret, Error *err)
qdict_put_str(qdict, "desc", error_get_pretty(err));
qdict_put_obj(qret->rsp, "error", QOBJECT(qdict));
error_free(err);
- qret->session->return_cb(qret->session, qret->rsp);
+ if (qret->session) {
+ qret->session->return_cb(qret->session, qret->rsp);
+ }
qmp_return_free(qret);
}
@@ -220,16 +237,27 @@ void qmp_session_init(QmpSession *session,
session->cmds = cmds;
session->dispatch_cb = dispatch_cb;
session->return_cb = return_cb;
+ qemu_mutex_init(&session->pending_lock);
+ QTAILQ_INIT(&session->pending);
}
void qmp_session_destroy(QmpSession *session)
{
+ QmpReturn *ret, *next;
+
if (!session->return_cb) {
return;
}
+ qemu_mutex_lock(&session->pending_lock);
+ QTAILQ_FOREACH_SAFE(ret, &session->pending, entry, next) {
+ ret->session = NULL;
+ QTAILQ_REMOVE(&session->pending, ret, entry);
+ }
+ qemu_mutex_unlock(&session->pending_lock);
session->cmds = NULL;
session->dispatch_cb = NULL;
session->return_cb = NULL;
json_message_parser_destroy(&session->parser);
+ qemu_mutex_destroy(&session->pending_lock);
}
The following commit will introduce asynchronous commands. Let's keep the client aware of the pending commands, so we can do interesting things like order the replies, or cancel pending operations when the client is gone. The queue needs a lock, since QmpSession may be used from multiple threads. Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com> --- include/qapi/qmp/dispatch.h | 4 ++++ qapi/qmp-dispatch.c | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 34 insertions(+), 2 deletions(-)