diff mbox series

[for-2.12,2/8] qmp: cleanup qmp queues properly

Message ID 20180326063901.27425-3-peterx@redhat.com
State New
Headers show
Series Monitor: some oob related patches (fixes, new param, tests) | expand

Commit Message

Peter Xu March 26, 2018, 6:38 a.m. UTC
Marc-André Lureau reported that we can have this happen:

1. client1 connects, send command C1
2. client1 disconnects before getting response for C1
3. client2 connects, who might receive response of C1

However client2 should not receive remaining responses for client1.

Basically, we should clean up the request/response queue elements when:

- before a session established
- after a session is closed
- before destroying the queues

Some helpers are introduced to achieve that.  We need to make sure we're
with the lock when operating on those queues.

Reported-by: Marc-André Lureau <marcandre.lureau@redhat.com>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
 monitor.c | 79 +++++++++++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 59 insertions(+), 20 deletions(-)

Comments

Marc-André Lureau March 26, 2018, 8:44 a.m. UTC | #1
On Mon, Mar 26, 2018 at 8:38 AM, Peter Xu <peterx@redhat.com> wrote:
> Marc-André Lureau reported that we can have this happen:
>
> 1. client1 connects, send command C1
> 2. client1 disconnects before getting response for C1
> 3. client2 connects, who might receive response of C1
>
> However client2 should not receive remaining responses for client1.
>
> Basically, we should clean up the request/response queue elements when:
>
> - before a session established
> - after a session is closed
> - before destroying the queues
>
> Some helpers are introduced to achieve that.  We need to make sure we're
> with the lock when operating on those queues.
>
> Reported-by: Marc-André Lureau <marcandre.lureau@redhat.com>
> Signed-off-by: Peter Xu <peterx@redhat.com>

Reviewed-by: Marc-André Lureau <marcandre.lureau@redhat.com>


> ---
>  monitor.c | 79 +++++++++++++++++++++++++++++++++++++++++++++++----------------
>  1 file changed, 59 insertions(+), 20 deletions(-)
>
> diff --git a/monitor.c b/monitor.c
> index 849fa23bf9..eba98df9da 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -234,6 +234,22 @@ static struct {
>      QEMUBH *qmp_respond_bh;
>  } mon_global;
>
> +struct QMPRequest {
> +    /* Owner of the request */
> +    Monitor *mon;
> +    /* "id" field of the request */
> +    QObject *id;
> +    /* Request object to be handled */
> +    QObject *req;
> +    /*
> +     * Whether we need to resume the monitor afterward.  This flag is
> +     * used to emulate the old QMP server behavior that the current
> +     * command must be completed before execution of the next one.
> +     */
> +    bool need_resume;
> +};
> +typedef struct QMPRequest QMPRequest;
> +
>  /* QMP checker flags */
>  #define QMP_ACCEPT_UNKNOWNS 1
>
> @@ -310,6 +326,43 @@ int monitor_read_password(Monitor *mon, ReadLineFunc *readline_func,
>      }
>  }
>
> +static void qmp_request_free(QMPRequest *req)
> +{
> +    qobject_decref(req->id);
> +    qobject_decref(req->req);
> +    g_free(req);
> +}
> +
> +static void qmp_response_free(QObject *obj)
> +{
> +    qobject_decref(obj);
> +}
> +
> +/* Must with the mon->qmp.qmp_queue_lock held */
> +static void monitor_qmp_cleanup_req_queue_locked(Monitor *mon)
> +{
> +    while (!g_queue_is_empty(mon->qmp.qmp_requests)) {
> +        qmp_request_free(g_queue_pop_head(mon->qmp.qmp_requests));
> +    }
> +}
> +
> +/* Must with the mon->qmp.qmp_queue_lock held */
> +static void monitor_qmp_cleanup_resp_queue_locked(Monitor *mon)
> +{
> +    while (!g_queue_is_empty(mon->qmp.qmp_responses)) {
> +        qmp_response_free(g_queue_pop_head(mon->qmp.qmp_responses));
> +    }
> +}
> +
> +static void monitor_qmp_cleanup_queues(Monitor *mon)
> +{
> +    qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
> +    monitor_qmp_cleanup_req_queue_locked(mon);
> +    monitor_qmp_cleanup_resp_queue_locked(mon);
> +    qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
> +}
> +
> +
>  static void monitor_flush_locked(Monitor *mon);
>
>  static gboolean monitor_unblocked(GIOChannel *chan, GIOCondition cond,
> @@ -497,7 +550,7 @@ static void monitor_qmp_bh_responder(void *opaque)
>              break;
>          }
>          monitor_json_emitter_raw(response.mon, response.data);
> -        qobject_decref(response.data);
> +        qmp_response_free(response.data);
>      }
>  }
>
> @@ -701,6 +754,8 @@ static void monitor_data_destroy(Monitor *mon)
>      QDECREF(mon->outbuf);
>      qemu_mutex_destroy(&mon->out_lock);
>      qemu_mutex_destroy(&mon->qmp.qmp_queue_lock);
> +    monitor_qmp_cleanup_req_queue_locked(mon);
> +    monitor_qmp_cleanup_resp_queue_locked(mon);
>      g_queue_free(mon->qmp.qmp_requests);
>      g_queue_free(mon->qmp.qmp_responses);
>  }
> @@ -4009,22 +4064,6 @@ static void monitor_qmp_respond(Monitor *mon, QObject *rsp,
>      qobject_decref(rsp);
>  }
>
> -struct QMPRequest {
> -    /* Owner of the request */
> -    Monitor *mon;
> -    /* "id" field of the request */
> -    QObject *id;
> -    /* Request object to be handled */
> -    QObject *req;
> -    /*
> -     * Whether we need to resume the monitor afterward.  This flag is
> -     * used to emulate the old QMP server behavior that the current
> -     * command must be completed before execution of the next one.
> -     */
> -    bool need_resume;
> -};
> -typedef struct QMPRequest QMPRequest;
> -
>  /*
>   * Dispatch one single QMP request. The function will free the req_obj
>   * and objects inside it before return.
> @@ -4191,9 +4230,7 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
>              qapi_event_send_command_dropped(id,
>                                              COMMAND_DROP_REASON_QUEUE_FULL,
>                                              &error_abort);
> -            qobject_decref(id);
> -            qobject_decref(req);
> -            g_free(req_obj);
> +            qmp_request_free(req_obj);
>              return;
>          }
>      }
> @@ -4327,6 +4364,7 @@ static void monitor_qmp_event(void *opaque, int event)
>
>      switch (event) {
>      case CHR_EVENT_OPENED:
> +        monitor_qmp_cleanup_queues(mon);
>          mon->qmp.commands = &qmp_cap_negotiation_commands;
>          monitor_qmp_caps_reset(mon);
>          data = get_qmp_greeting(mon);
> @@ -4335,6 +4373,7 @@ static void monitor_qmp_event(void *opaque, int event)
>          mon_refcount++;
>          break;
>      case CHR_EVENT_CLOSED:
> +        monitor_qmp_cleanup_queues(mon);
>          json_message_parser_destroy(&mon->qmp.parser);
>          json_message_parser_init(&mon->qmp.parser, handle_qmp_command);
>          mon_refcount--;
> --
> 2.14.3
>
Eric Blake March 26, 2018, 7:42 p.m. UTC | #2
On 03/26/2018 01:38 AM, Peter Xu wrote:
> Marc-André Lureau reported that we can have this happen:
> 
> 1. client1 connects, send command C1
> 2. client1 disconnects before getting response for C1
> 3. client2 connects, who might receive response of C1
> 
> However client2 should not receive remaining responses for client1.
> 
> Basically, we should clean up the request/response queue elements when:
> 
> - before a session established

Why here? [1]

> - after a session is closed
> - before destroying the queues
> 
> Some helpers are introduced to achieve that.  We need to make sure we're
> with the lock when operating on those queues.
> 

It would also be helpful to mention that the patch includes code motion 
to declare struct QMPRequest earlier in the file.

> Reported-by: Marc-André Lureau <marcandre.lureau@redhat.com>
> Signed-off-by: Peter Xu <peterx@redhat.com>
> ---
>   monitor.c | 79 +++++++++++++++++++++++++++++++++++++++++++++++----------------
>   1 file changed, 59 insertions(+), 20 deletions(-)
> 

> +static void qmp_request_free(QMPRequest *req)
> +{
> +    qobject_decref(req->id);
> +    qobject_decref(req->req);
> +    g_free(req);
> +}
> +
> +static void qmp_response_free(QObject *obj)
> +{
> +    qobject_decref(obj);
> +}

Why do we need this function?  Unless you plan to add to it in later 
patches, I'd rather just inline things and directly call 
qobject_decref() at the callsites...

> +
> +/* Must with the mon->qmp.qmp_queue_lock held */
> +static void monitor_qmp_cleanup_req_queue_locked(Monitor *mon)
> +{
> +    while (!g_queue_is_empty(mon->qmp.qmp_requests)) {
> +        qmp_request_free(g_queue_pop_head(mon->qmp.qmp_requests));
> +    }
> +}
> +
> +/* Must with the mon->qmp.qmp_queue_lock held */
> +static void monitor_qmp_cleanup_resp_queue_locked(Monitor *mon)
> +{
> +    while (!g_queue_is_empty(mon->qmp.qmp_responses)) {
> +        qmp_response_free(g_queue_pop_head(mon->qmp.qmp_responses));

...here,

> +    }
> +}
> +
> +static void monitor_qmp_cleanup_queues(Monitor *mon)
> +{
> +    qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
> +    monitor_qmp_cleanup_req_queue_locked(mon);
> +    monitor_qmp_cleanup_resp_queue_locked(mon);
> +    qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
> +}
> +
> +
>   static void monitor_flush_locked(Monitor *mon);
>   
>   static gboolean monitor_unblocked(GIOChannel *chan, GIOCondition cond,
> @@ -497,7 +550,7 @@ static void monitor_qmp_bh_responder(void *opaque)
>               break;
>           }
>           monitor_json_emitter_raw(response.mon, response.data);
> -        qobject_decref(response.data);
> +        qmp_response_free(response.data);

and here.

> @@ -4327,6 +4364,7 @@ static void monitor_qmp_event(void *opaque, int event)
>   
>       switch (event) {
>       case CHR_EVENT_OPENED:
> +        monitor_qmp_cleanup_queues(mon);

[1] How would something be queued to need cleanup at this point, if we 
already start with a clean queue before the first monitor, and if all 
monitor close actions clean the queue?

>           mon->qmp.commands = &qmp_cap_negotiation_commands;
>           monitor_qmp_caps_reset(mon);
>           data = get_qmp_greeting(mon);
> @@ -4335,6 +4373,7 @@ static void monitor_qmp_event(void *opaque, int event)
>           mon_refcount++;
>           break;
>       case CHR_EVENT_CLOSED:
> +        monitor_qmp_cleanup_queues(mon);
>           json_message_parser_destroy(&mon->qmp.parser);
>           json_message_parser_init(&mon->qmp.parser, handle_qmp_command);
>           mon_refcount--;
>
Peter Xu March 27, 2018, 2:30 a.m. UTC | #3
On Mon, Mar 26, 2018 at 02:42:23PM -0500, Eric Blake wrote:
> On 03/26/2018 01:38 AM, Peter Xu wrote:
> > Marc-André Lureau reported that we can have this happen:
> > 
> > 1. client1 connects, send command C1
> > 2. client1 disconnects before getting response for C1
> > 3. client2 connects, who might receive response of C1
> > 
> > However client2 should not receive remaining responses for client1.
> > 
> > Basically, we should clean up the request/response queue elements when:
> > 
> > - before a session established
> 
> Why here? [1]

Yes it can be omitted. We can do it either here or closing, the only
difference should be that when added here there's more possibility
that the pending commands (requests from disconnected clients) be
executed rather than dropped.

Here I did the cleanup on both places.  Drop any of them would be fine
too.

> 
> > - after a session is closed
> > - before destroying the queues
> > 
> > Some helpers are introduced to achieve that.  We need to make sure we're
> > with the lock when operating on those queues.
> > 
> 
> It would also be helpful to mention that the patch includes code motion to
> declare struct QMPRequest earlier in the file.
> 
> > Reported-by: Marc-André Lureau <marcandre.lureau@redhat.com>
> > Signed-off-by: Peter Xu <peterx@redhat.com>
> > ---
> >   monitor.c | 79 +++++++++++++++++++++++++++++++++++++++++++++++----------------
> >   1 file changed, 59 insertions(+), 20 deletions(-)
> > 
> 
> > +static void qmp_request_free(QMPRequest *req)
> > +{
> > +    qobject_decref(req->id);
> > +    qobject_decref(req->req);
> > +    g_free(req);
> > +}
> > +
> > +static void qmp_response_free(QObject *obj)
> > +{
> > +    qobject_decref(obj);
> > +}
> 
> Why do we need this function?  Unless you plan to add to it in later
> patches, I'd rather just inline things and directly call qobject_decref() at
> the callsites...

Yes we can omit that.

> 
> > +
> > +/* Must with the mon->qmp.qmp_queue_lock held */
> > +static void monitor_qmp_cleanup_req_queue_locked(Monitor *mon)
> > +{
> > +    while (!g_queue_is_empty(mon->qmp.qmp_requests)) {
> > +        qmp_request_free(g_queue_pop_head(mon->qmp.qmp_requests));
> > +    }
> > +}
> > +
> > +/* Must with the mon->qmp.qmp_queue_lock held */
> > +static void monitor_qmp_cleanup_resp_queue_locked(Monitor *mon)
> > +{
> > +    while (!g_queue_is_empty(mon->qmp.qmp_responses)) {
> > +        qmp_response_free(g_queue_pop_head(mon->qmp.qmp_responses));
> 
> ...here,
> 
> > +    }
> > +}
> > +
> > +static void monitor_qmp_cleanup_queues(Monitor *mon)
> > +{
> > +    qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
> > +    monitor_qmp_cleanup_req_queue_locked(mon);
> > +    monitor_qmp_cleanup_resp_queue_locked(mon);
> > +    qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
> > +}
> > +
> > +
> >   static void monitor_flush_locked(Monitor *mon);
> >   static gboolean monitor_unblocked(GIOChannel *chan, GIOCondition cond,
> > @@ -497,7 +550,7 @@ static void monitor_qmp_bh_responder(void *opaque)
> >               break;
> >           }
> >           monitor_json_emitter_raw(response.mon, response.data);
> > -        qobject_decref(response.data);
> > +        qmp_response_free(response.data);
> 
> and here.
> 
> > @@ -4327,6 +4364,7 @@ static void monitor_qmp_event(void *opaque, int event)
> >       switch (event) {
> >       case CHR_EVENT_OPENED:
> > +        monitor_qmp_cleanup_queues(mon);
> 
> [1] How would something be queued to need cleanup at this point, if we
> already start with a clean queue before the first monitor, and if all
> monitor close actions clean the queue?

(answered above)

Thanks,
diff mbox series

Patch

diff --git a/monitor.c b/monitor.c
index 849fa23bf9..eba98df9da 100644
--- a/monitor.c
+++ b/monitor.c
@@ -234,6 +234,22 @@  static struct {
     QEMUBH *qmp_respond_bh;
 } mon_global;
 
+struct QMPRequest {
+    /* Owner of the request */
+    Monitor *mon;
+    /* "id" field of the request */
+    QObject *id;
+    /* Request object to be handled */
+    QObject *req;
+    /*
+     * Whether we need to resume the monitor afterward.  This flag is
+     * used to emulate the old QMP server behavior that the current
+     * command must be completed before execution of the next one.
+     */
+    bool need_resume;
+};
+typedef struct QMPRequest QMPRequest;
+
 /* QMP checker flags */
 #define QMP_ACCEPT_UNKNOWNS 1
 
@@ -310,6 +326,43 @@  int monitor_read_password(Monitor *mon, ReadLineFunc *readline_func,
     }
 }
 
+static void qmp_request_free(QMPRequest *req)
+{
+    qobject_decref(req->id);
+    qobject_decref(req->req);
+    g_free(req);
+}
+
+static void qmp_response_free(QObject *obj)
+{
+    qobject_decref(obj);
+}
+
+/* Must with the mon->qmp.qmp_queue_lock held */
+static void monitor_qmp_cleanup_req_queue_locked(Monitor *mon)
+{
+    while (!g_queue_is_empty(mon->qmp.qmp_requests)) {
+        qmp_request_free(g_queue_pop_head(mon->qmp.qmp_requests));
+    }
+}
+
+/* Must with the mon->qmp.qmp_queue_lock held */
+static void monitor_qmp_cleanup_resp_queue_locked(Monitor *mon)
+{
+    while (!g_queue_is_empty(mon->qmp.qmp_responses)) {
+        qmp_response_free(g_queue_pop_head(mon->qmp.qmp_responses));
+    }
+}
+
+static void monitor_qmp_cleanup_queues(Monitor *mon)
+{
+    qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
+    monitor_qmp_cleanup_req_queue_locked(mon);
+    monitor_qmp_cleanup_resp_queue_locked(mon);
+    qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
+}
+
+
 static void monitor_flush_locked(Monitor *mon);
 
 static gboolean monitor_unblocked(GIOChannel *chan, GIOCondition cond,
@@ -497,7 +550,7 @@  static void monitor_qmp_bh_responder(void *opaque)
             break;
         }
         monitor_json_emitter_raw(response.mon, response.data);
-        qobject_decref(response.data);
+        qmp_response_free(response.data);
     }
 }
 
@@ -701,6 +754,8 @@  static void monitor_data_destroy(Monitor *mon)
     QDECREF(mon->outbuf);
     qemu_mutex_destroy(&mon->out_lock);
     qemu_mutex_destroy(&mon->qmp.qmp_queue_lock);
+    monitor_qmp_cleanup_req_queue_locked(mon);
+    monitor_qmp_cleanup_resp_queue_locked(mon);
     g_queue_free(mon->qmp.qmp_requests);
     g_queue_free(mon->qmp.qmp_responses);
 }
@@ -4009,22 +4064,6 @@  static void monitor_qmp_respond(Monitor *mon, QObject *rsp,
     qobject_decref(rsp);
 }
 
-struct QMPRequest {
-    /* Owner of the request */
-    Monitor *mon;
-    /* "id" field of the request */
-    QObject *id;
-    /* Request object to be handled */
-    QObject *req;
-    /*
-     * Whether we need to resume the monitor afterward.  This flag is
-     * used to emulate the old QMP server behavior that the current
-     * command must be completed before execution of the next one.
-     */
-    bool need_resume;
-};
-typedef struct QMPRequest QMPRequest;
-
 /*
  * Dispatch one single QMP request. The function will free the req_obj
  * and objects inside it before return.
@@ -4191,9 +4230,7 @@  static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens)
             qapi_event_send_command_dropped(id,
                                             COMMAND_DROP_REASON_QUEUE_FULL,
                                             &error_abort);
-            qobject_decref(id);
-            qobject_decref(req);
-            g_free(req_obj);
+            qmp_request_free(req_obj);
             return;
         }
     }
@@ -4327,6 +4364,7 @@  static void monitor_qmp_event(void *opaque, int event)
 
     switch (event) {
     case CHR_EVENT_OPENED:
+        monitor_qmp_cleanup_queues(mon);
         mon->qmp.commands = &qmp_cap_negotiation_commands;
         monitor_qmp_caps_reset(mon);
         data = get_qmp_greeting(mon);
@@ -4335,6 +4373,7 @@  static void monitor_qmp_event(void *opaque, int event)
         mon_refcount++;
         break;
     case CHR_EVENT_CLOSED:
+        monitor_qmp_cleanup_queues(mon);
         json_message_parser_destroy(&mon->qmp.parser);
         json_message_parser_init(&mon->qmp.parser, handle_qmp_command);
         mon_refcount--;