
[RFC,v3,23/27] qmp: isolate responses into io thread

Message ID 20171106094643.14881-24-peterx@redhat.com
State New
Series [RFC,v3,01/27] char-io: fix possible race on IOWatchPoll

Commit Message

Peter Xu Nov. 6, 2017, 9:46 a.m. UTC
For monitors that have the IO thread enabled, we offload the response
procedure to the IO thread.  The main reason is that chardev is not
thread safe, so we need to do all the read/write IO in the same
thread.  For use_io_thr=true monitors, that thread is the IO thread.

We do this isolation in a similar pattern to what we did for the
request queue: we first create one response queue for each monitor,
then, instead of replying directly in the main thread, we queue the
responses and kick the IO thread to do the rest of the job for us.
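
In outline, the pattern looks like this (a condensed, illustrative
sketch using the names introduced by the patch below, not a verbatim
copy of it):

    /* Producer: called from whichever thread emits the response. */
    static void monitor_json_emitter(Monitor *mon, QObject *data)
    {
        if (mon->use_io_thr) {
            qobject_incref(data);         /* responder drops this ref */
            qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
            g_queue_push_tail(mon->qmp.qmp_responses, data);
            qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
            qemu_bh_schedule(mon_global.qmp_respond_bh); /* kick IO thread */
        } else {
            /* No IO thread: we are in the main thread, emit directly. */
            monitor_json_emitter_raw(mon, data);
        }
    }

    /* Consumer: bottom half scheduled on the monitor IO thread. */
    static void monitor_qmp_bh_responder(void *opaque)
    {
        QMPResponse response;

        while ((response = monitor_qmp_response_pop_one()).data) {
            monitor_json_emitter_raw(response.mon, response.data);
            qobject_decref(response.data);
        }
    }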

Signed-off-by: Peter Xu <peterx@redhat.com>
---
 monitor.c | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 89 insertions(+), 1 deletion(-)

Comments

Fam Zheng Nov. 7, 2017, 7:57 a.m. UTC | #1
On Mon, 11/06 17:46, Peter Xu wrote:
> @@ -4294,6 +4366,11 @@ static GMainContext *monitor_io_context_get(void)
>      return iothread_get_g_main_context(mon_global.mon_iothread);
>  }
>  
> +static AioContext *monitor_aio_context_get(void)
> +{
> +    return iothread_get_aio_context(mon_global.mon_iothread);
> +}
> +

This hunk fits better in patch 10, I think?

>  static void monitor_iothread_init(void)
>  {
>      mon_global.mon_iothread = iothread_create("monitor_iothread",

Fam
Peter Xu Nov. 8, 2017, 7:31 a.m. UTC | #2
On Tue, Nov 07, 2017 at 03:57:08PM +0800, Fam Zheng wrote:
> On Mon, 11/06 17:46, Peter Xu wrote:
> > @@ -4294,6 +4366,11 @@ static GMainContext *monitor_io_context_get(void)
> >      return iothread_get_g_main_context(mon_global.mon_iothread);
> >  }
> >  
> > +static AioContext *monitor_aio_context_get(void)
> > +{
> > +    return iothread_get_aio_context(mon_global.mon_iothread);
> > +}
> > +
> 
> This hunk fits better in patch 10, I think?

This function is only used in the current patch, so I think it's fine
to put it here.  But sure, I can move it into patch 10 as well.

Thanks,

> 
> >  static void monitor_iothread_init(void)
> >  {
> >      mon_global.mon_iothread = iothread_create("monitor_iothread",
> 
> Fam
Fam Zheng Nov. 8, 2017, 11:16 a.m. UTC | #3
On Wed, 11/08 15:31, Peter Xu wrote:
> On Tue, Nov 07, 2017 at 03:57:08PM +0800, Fam Zheng wrote:
> > On Mon, 11/06 17:46, Peter Xu wrote:
> > > @@ -4294,6 +4366,11 @@ static GMainContext *monitor_io_context_get(void)
> > >      return iothread_get_g_main_context(mon_global.mon_iothread);
> > >  }
> > >  
> > > +static AioContext *monitor_aio_context_get(void)
> > > +{
> > > +    return iothread_get_aio_context(mon_global.mon_iothread);
> > > +}
> > > +
> > 
> > This hunk fits better in patch 10, I think?
> 
> This function is only used in the current patch, so I think it's fine
> to put it here.  But sure, I can move it into patch 10 as well.

OK, so it's static and you want to keep the compiler happy. That's fine then.

Fam
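
(For context, the concern above is the unused-function warning: since
monitor_aio_context_get() is static, introducing it in patch 10 before
any caller exists would make the build complain.  With GCC the
diagnostic looks roughly like the following, line number illustrative:

    monitor.c:4369:20: warning: 'monitor_aio_context_get' defined but not used [-Wunused-function]

Keeping the helper in the same patch as its only caller avoids that.)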

Patch

diff --git a/monitor.c b/monitor.c
index 45d293469a..a2550d79a8 100644
--- a/monitor.c
+++ b/monitor.c
@@ -172,6 +172,8 @@  typedef struct {
     QemuMutex qmp_queue_lock;
     /* Input queue that holds all the parsed QMP requests */
     GQueue *qmp_requests;
+    /* Output queue contains all the QMP responses in order */
+    GQueue *qmp_responses;
 } MonitorQMP;
 
 /*
@@ -219,6 +221,8 @@  struct MonitorGlobal {
     IOThread *mon_iothread;
     /* Bottom half to dispatch the requests received from IO thread */
     QEMUBH *qmp_dispatcher_bh;
+    /* Bottom half to deliver the responses back to clients */
+    QEMUBH *qmp_respond_bh;
 };
 
 static struct MonitorGlobal mon_global;
@@ -395,7 +399,8 @@  int monitor_fprintf(FILE *stream, const char *fmt, ...)
     return 0;
 }
 
-static void monitor_json_emitter(Monitor *mon, const QObject *data)
+static void monitor_json_emitter_raw(Monitor *mon,
+                                     QObject *data)
 {
     QString *json;
 
@@ -409,6 +414,71 @@  static void monitor_json_emitter(Monitor *mon, const QObject *data)
     QDECREF(json);
 }
 
+static void monitor_json_emitter(Monitor *mon, QObject *data)
+{
+    if (mon->use_io_thr) {
+        /*
+         * If using IO thread, we need to queue the item so that IO
+         * thread will do the rest for us.  Take refcount so that
+         * caller won't free the data (which will be finally freed in
+         * responder thread).
+         */
+        qobject_incref(data);
+        qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
+        g_queue_push_tail(mon->qmp.qmp_responses, (void *)data);
+        qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
+        qemu_bh_schedule(mon_global.qmp_respond_bh);
+    } else {
+        /*
+         * If not using monitor IO thread, then we are in main thread.
+         * Do the emission right away.
+         */
+        monitor_json_emitter_raw(mon, data);
+    }
+}
+
+struct QMPResponse {
+    Monitor *mon;
+    QObject *data;
+};
+typedef struct QMPResponse QMPResponse;
+
+/*
+ * Return one QMPResponse.  The response is only valid if
+ * response.data is not NULL.
+ */
+static QMPResponse monitor_qmp_response_pop_one(void)
+{
+    Monitor *mon;
+    QObject *data = NULL;
+
+    qemu_mutex_lock(&monitor_lock);
+    QTAILQ_FOREACH(mon, &mon_list, entry) {
+        qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
+        data = g_queue_pop_head(mon->qmp.qmp_responses);
+        qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
+        if (data) {
+            break;
+        }
+    }
+    qemu_mutex_unlock(&monitor_lock);
+    return (QMPResponse) { .mon = mon, .data = data };
+}
+
+static void monitor_qmp_bh_responder(void *opaque)
+{
+    QMPResponse response;
+
+    while (true) {
+        response = monitor_qmp_response_pop_one();
+        if (!response.data) {
+            break;
+        }
+        monitor_json_emitter_raw(response.mon, response.data);
+        qobject_decref(response.data);
+    }
+}
+
 static MonitorQAPIEventConf monitor_qapi_event_conf[QAPI_EVENT__MAX] = {
     /* Limit guest-triggerable events to 1 per second */
     [QAPI_EVENT_RTC_CHANGE]        = { 1000 * SCALE_MS },
@@ -595,6 +665,7 @@  static void monitor_data_init(Monitor *mon, bool skip_flush,
     mon->skip_flush = skip_flush;
     mon->use_io_thr = use_io_thr;
     mon->qmp.qmp_requests = g_queue_new();
+    mon->qmp.qmp_responses = g_queue_new();
 }
 
 static void monitor_data_destroy(Monitor *mon)
@@ -609,6 +680,7 @@  static void monitor_data_destroy(Monitor *mon)
     qemu_mutex_destroy(&mon->out_lock);
     qemu_mutex_destroy(&mon->qmp.qmp_queue_lock);
     g_queue_free(mon->qmp.qmp_requests);
+    g_queue_free(mon->qmp.qmp_responses);
 }
 
 char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index,
@@ -4294,6 +4366,11 @@  static GMainContext *monitor_io_context_get(void)
     return iothread_get_g_main_context(mon_global.mon_iothread);
 }
 
+static AioContext *monitor_aio_context_get(void)
+{
+    return iothread_get_aio_context(mon_global.mon_iothread);
+}
+
 static void monitor_iothread_init(void)
 {
     mon_global.mon_iothread = iothread_create("monitor_iothread",
@@ -4312,6 +4389,15 @@  static void monitor_iothread_init(void)
     mon_global.qmp_dispatcher_bh = aio_bh_new(qemu_get_aio_context(),
                                               monitor_qmp_bh_dispatcher,
                                               NULL);
+
+    /*
+     * Unlike the dispatcher BH, this must be run on the monitor IO
+     * thread, so that monitors that are using IO thread will make
+     * sure read/write operations are all done on the IO thread.
+     */
+    mon_global.qmp_respond_bh = aio_bh_new(monitor_aio_context_get(),
+                                           monitor_qmp_bh_responder,
+                                           NULL);
 }
 
 void monitor_init_globals(void)
@@ -4435,6 +4521,8 @@  void monitor_cleanup(void)
     /* QEMUBHs needs to be deleted before destroying the IOThread. */
     qemu_bh_delete(mon_global.qmp_dispatcher_bh);
     mon_global.qmp_dispatcher_bh = NULL;
+    qemu_bh_delete(mon_global.qmp_respond_bh);
+    mon_global.qmp_respond_bh = NULL;
 
     iothread_destroy(mon_global.mon_iothread);
     mon_global.mon_iothread = NULL;