diff mbox series

[v3,27/38] QmpSession: keep a queue of pending commands

Message ID 20180326150916.9602-28-marcandre.lureau@redhat.com
State New
Headers show
Series RFC: monitor: add asynchronous command type | expand

Commit Message

Marc-André Lureau March 26, 2018, 3:09 p.m. UTC
The following commit will introduce asynchronous commands. Let's keep
the client aware of the pending commands, so we can do interesting
things like order the replies, or cancel pending operations when the
client is gone.

The queue needs a lock, since QmpSession may be used from multiple
threads.

Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com>
---
 include/qapi/qmp/dispatch.h |  4 ++++
 qapi/qmp-dispatch.c         | 32 ++++++++++++++++++++++++++++++--
 2 files changed, 34 insertions(+), 2 deletions(-)
diff mbox series

Patch

diff --git a/include/qapi/qmp/dispatch.h b/include/qapi/qmp/dispatch.h
index c5ac3bd41e..94a272a5fb 100644
--- a/include/qapi/qmp/dispatch.h
+++ b/include/qapi/qmp/dispatch.h
@@ -15,6 +15,7 @@ 
 #define QAPI_QMP_DISPATCH_H
 
 #include "qemu/queue.h"
+#include "qemu/thread.h"
 #include "qapi/qmp/json-streamer.h"
 
 typedef struct QmpReturn QmpReturn;
@@ -48,11 +49,14 @@  struct QmpSession {
     QmpDispatch *dispatch_cb;
     QmpDispatchReturn *return_cb;
     QmpCommandList *cmds;
+    QemuMutex pending_lock;
+    QTAILQ_HEAD(, QmpReturn) pending;
 };
 
 struct QmpReturn {
     QmpSession *session;
     QDict *rsp;
+    QTAILQ_ENTRY(QmpReturn) entry;
 };
 
 /*
diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
index 4a73cf88b3..2c162642cb 100644
--- a/qapi/qmp-dispatch.c
+++ b/qapi/qmp-dispatch.c
@@ -31,11 +31,24 @@  QmpReturn *qmp_return_new(QmpSession *session, const QDict *req)
         qdict_put_obj(qret->rsp, "id", id);
     }
 
+    qemu_mutex_lock(&session->pending_lock);
+    QTAILQ_INSERT_TAIL(&session->pending, qret, entry);
+    qemu_mutex_unlock(&session->pending_lock);
+
     return qret;
 }
 
 void qmp_return_free(QmpReturn *qret)
 {
+    QmpSession *session = qret->session;
+
+    if (session) {
+        qemu_mutex_lock(&session->pending_lock);
+    }
+    QTAILQ_REMOVE(&session->pending, qret, entry);
+    if (session) {
+        qemu_mutex_unlock(&session->pending_lock);
+    }
     QDECREF(qret->rsp);
     g_free(qret);
 }
@@ -43,7 +56,9 @@  void qmp_return_free(QmpReturn *qret)
 void qmp_return(QmpReturn *qret, QObject *rsp)
 {
     qdict_put_obj(qret->rsp, "return", rsp ?: QOBJECT(qdict_new()));
-    qret->session->return_cb(qret->session, qret->rsp);
+    if (qret->session) {
+        qret->session->return_cb(qret->session, qret->rsp);
+    }
     qmp_return_free(qret);
 }
 
@@ -55,7 +70,9 @@  void qmp_return_error(QmpReturn *qret, Error *err)
     qdict_put_str(qdict, "desc", error_get_pretty(err));
     qdict_put_obj(qret->rsp, "error", QOBJECT(qdict));
     error_free(err);
-    qret->session->return_cb(qret->session, qret->rsp);
+    if (qret->session) {
+        qret->session->return_cb(qret->session, qret->rsp);
+    }
     qmp_return_free(qret);
 }
 
@@ -220,16 +237,27 @@  void qmp_session_init(QmpSession *session,
     session->cmds = cmds;
     session->dispatch_cb = dispatch_cb;
     session->return_cb = return_cb;
+    qemu_mutex_init(&session->pending_lock);
+    QTAILQ_INIT(&session->pending);
 }
 
 void qmp_session_destroy(QmpSession *session)
 {
+    QmpReturn *ret, *next;
+
     if (!session->return_cb) {
         return;
     }
 
+    qemu_mutex_lock(&session->pending_lock);
+    QTAILQ_FOREACH_SAFE(ret, &session->pending, entry, next) {
+        ret->session = NULL;
+        QTAILQ_REMOVE(&session->pending, ret, entry);
+    }
+    qemu_mutex_unlock(&session->pending_lock);
     session->cmds = NULL;
     session->dispatch_cb = NULL;
     session->return_cb = NULL;
     json_message_parser_destroy(&session->parser);
+    qemu_mutex_destroy(&session->pending_lock);
 }