diff mbox

[RFC,2/6] monitor: Simplify event throttling

Message ID 1443189647-11417-3-git-send-email-armbru@redhat.com
State New
Headers show

Commit Message

Markus Armbruster Sept. 25, 2015, 2 p.m. UTC
The event throttling state machine is hard to understand.  I'm not
sure it's entirely correct.  Rewrite it in a more straightforward
manner:

State 1: No event sent recently (less than evconf->rate ns ago)

    Invariant: evstate->timer is not pending, evstate->qdict is null

    On event: send event, arm timer, goto state 2

State 2: Event sent recently, no additional event being delayed

    Invariant: evstate->timer is pending, evstate->qdict is null

    On event: store it in evstate->qdict, goto state 3

    On timer: goto state 1

State 3: Event sent recently, additional event being delayed

    Invariant: evstate->timer is pending, evstate->qdict is non-null

    On event: store it in evstate->qdict, goto state 3

    On timer: send evstate->qdict, clear evstate->qdict,
              arm timer, goto state 2

FIXME update trace-event (and recompile the world)

Signed-off-by: Markus Armbruster <armbru@redhat.com>
---
 monitor.c | 70 +++++++++++++++++++++++++++++++--------------------------------
 1 file changed, 35 insertions(+), 35 deletions(-)

Comments

Eric Blake Sept. 25, 2015, 4:42 p.m. UTC | #1
On 09/25/2015 08:00 AM, Markus Armbruster wrote:
> The event throttling state machine is hard to understand.  I'm not
> sure it's entirely correct.  Rewrite it in a more straightforward
> manner:
> 
> State 1: No event sent recently (less than evconf->rate ns ago)
> 
>     Invariant: evstate->timer is not pending, evstate->qdict is null
> 
>     On event: send event, arm timer, goto state 2
> 
> State 2: Event sent recently, no additional event being delayed
> 
>     Invariant: evstate->timer is pending, evstate->qdict is null
> 
>     On event: store it in evstate->qdict, goto state 3
> 
>     On timer: goto state 1
> 
> State 3: Event sent recently, additional event being delayed
> 
>     Invariant: evstate->timer is pending, evstate->qdict is non-null
> 
>     On event: store it in evstate->qdict, goto state 3
> 
>     On timer: send evstate->qdict, clear evstate->qdict,
>               arm timer, goto state 2
> 

Makes sense for what throttling is supposed to be.

> FIXME update trace-event (and recompile the world)

Obviously something you'd fold into a v2, so I'll defer R-b until seeing
it.  But I like the approach of this patch.

> 
> Signed-off-by: Markus Armbruster <armbru@redhat.com>
> ---
>  monitor.c | 70 +++++++++++++++++++++++++++++++--------------------------------
>  1 file changed, 35 insertions(+), 35 deletions(-)
> 

>      if (!evstate->rate) {
> +        /* Unthrottled event */
>          monitor_qapi_event_emit(event, qdict);
> -        evstate->last = now;
>      } else {
> -        int64_t delta = now - evstate->last;
> -        if (evstate->qdict ||
> -            delta < evstate->rate) {
> -            /* If there's an existing event pending, replace
> -             * it with the new event, otherwise schedule a
> -             * timer for delayed emission
> +        if (timer_pending(evstate->timer)) {

Possible in states 2 and 3...

> +            /*
> +             * Timer is pending for (at least) evstate->rate ns after
> +             * last send.  Store event for sending when timer fires,
> +             * replacing a prior stored event if any.
>               */
> -            if (evstate->qdict) {
> -                QDECREF(evstate->qdict);
> -            } else {
> -                int64_t then = evstate->last + evstate->rate;
> -                timer_mod_ns(evstate->timer, then);
> -            }
> +            QDECREF(evstate->qdict);

no-op in state 2, otherwise replaces the old pending event in state 3

>              evstate->qdict = qdict;
>              QINCREF(evstate->qdict);

either way, we are now in state 3 awaiting the next event or timer.

>          } else {

we are in state 1...

> +            /*
> +             * Last send was (at least) evstate->rate ns ago.
> +             * Send immediately, and arm the timer to call
> +             * monitor_qapi_event_handler() in evstate->rate ns.  Any
> +             * events arriving before then will be delayed until then.
> +             */
> +            int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
> +
>              monitor_qapi_event_emit(event, qdict);
> -            evstate->last = now;
> +            timer_mod_ns(evstate->timer, now + evstate->rate);

and now in state 2.

>          }
>      }
> +
>      qemu_mutex_unlock(&monitor_lock);
>  }
>  
>  /*
> - * The callback invoked by QemuTimer when a delayed
> - * event is ready to be emitted
> + * This function runs evstate->rate ns after sending a throttled
> + * event.
> + * If another event has since been stored, send it.
>   */
>  static void monitor_qapi_event_handler(void *opaque)
>  {

We get here in either state 2 or 3...

>      MonitorQAPIEventState *evstate = opaque;
> -    int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
>  
>      trace_monitor_protocol_event_handler(evstate->event,
>                                           evstate->qdict,
> -                                         evstate->last,
> -                                         now);
> +                                         0, 0); /* FIXME drop */
>      qemu_mutex_lock(&monitor_lock);
> +
>      if (evstate->qdict) {

state 3, so send it...

> +        int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
> +
>          monitor_qapi_event_emit(evstate->event, evstate->qdict);
>          QDECREF(evstate->qdict);
>          evstate->qdict = NULL;
> +        timer_mod_ns(evstate->timer, now + evstate->rate);

...and rearm to go back to state 2

>      }
> -    evstate->last = now;
> +

...otherwise, we didn't rearm, so we go back to state 1

>      qemu_mutex_unlock(&monitor_lock);
>  }
>  

Matches the state logic called out in the commit message.

> @@ -539,20 +542,17 @@ static void
>  monitor_qapi_event_throttle(QAPIEvent event, int64_t rate)
>  {
>      MonitorQAPIEventState *evstate;
> +
>      assert(event < QAPI_EVENT_MAX);
> -
> -    evstate = &(monitor_qapi_event_state[event]);
> -
> +    evstate = &monitor_qapi_event_state[event];
>      trace_monitor_protocol_event_throttle(event, rate);
>      evstate->event = event;
>      assert(rate * SCALE_MS <= INT64_MAX);
>      evstate->rate = rate * SCALE_MS;
> -    evstate->last = 0;
>      evstate->qdict = NULL;
> -    evstate->timer = timer_new(QEMU_CLOCK_REALTIME,
> -                               SCALE_MS,
> -                               monitor_qapi_event_handler,
> -                               evstate);
> +    evstate->timer = timer_new_ns(QEMU_CLOCK_REALTIME,
> +                                  monitor_qapi_event_handler,
> +                                  evstate);

Is this a separate cleanup?  It looks like you're fixing code style, and
then switching from timer_new() to timer_new_ns(), but it's not obvious
from just this context whether you still have the right scale (and I
didn't go chase documentation).  It wasn't mentioned in the commit
message, and seems unrelated to the new state machine.
Markus Armbruster Sept. 28, 2015, 8:14 a.m. UTC | #2
Eric Blake <eblake@redhat.com> writes:

> On 09/25/2015 08:00 AM, Markus Armbruster wrote:
>> The event throttling state machine is hard to understand.  I'm not
>> sure it's entirely correct.  Rewrite it in a more straightforward
>> manner:
>> 
>> State 1: No event sent recently (less than evconf->rate ns ago)
>> 
>>     Invariant: evstate->timer is not pending, evstate->qdict is null
>> 
>>     On event: send event, arm timer, goto state 2
>> 
>> State 2: Event sent recently, no additional event being delayed
>> 
>>     Invariant: evstate->timer is pending, evstate->qdict is null
>> 
>>     On event: store it in evstate->qdict, goto state 3
>> 
>>     On timer: goto state 1
>> 
>> State 3: Event sent recently, additional event being delayed
>> 
>>     Invariant: evstate->timer is pending, evstate->qdict is non-null
>> 
>>     On event: store it in evstate->qdict, goto state 3
>> 
>>     On timer: send evstate->qdict, clear evstate->qdict,
>>               arm timer, goto state 2
>> 
>
> Makes sense for what throttling is supposed to be.
>
>> FIXME update trace-event (and recompile the world)
>
> Obviously something you'd fold into a v2, so I'll defer R-b until seeing
> it.  But I like the approach of this patch.
>
>> 
>> Signed-off-by: Markus Armbruster <armbru@redhat.com>
>> ---
>>  monitor.c | 70 +++++++++++++++++++++++++++++++--------------------------------
>>  1 file changed, 35 insertions(+), 35 deletions(-)
>> 
>
>>      if (!evstate->rate) {
>> +        /* Unthrottled event */
>>          monitor_qapi_event_emit(event, qdict);
>> -        evstate->last = now;
>>      } else {
>> -        int64_t delta = now - evstate->last;
>> -        if (evstate->qdict ||
>> -            delta < evstate->rate) {
>> -            /* If there's an existing event pending, replace
>> -             * it with the new event, otherwise schedule a
>> -             * timer for delayed emission
>> +        if (timer_pending(evstate->timer)) {
>
> Possible in states 2 and 3...
>
>> +            /*
>> +             * Timer is pending for (at least) evstate->rate ns after
>> +             * last send.  Store event for sending when timer fires,
>> +             * replacing a prior stored event if any.
>>               */
>> -            if (evstate->qdict) {
>> -                QDECREF(evstate->qdict);
>> -            } else {
>> -                int64_t then = evstate->last + evstate->rate;
>> -                timer_mod_ns(evstate->timer, then);
>> -            }
>> +            QDECREF(evstate->qdict);
>
> no-op in state 2, otherwise replaces the old pending event in state 3
>
>>              evstate->qdict = qdict;
>>              QINCREF(evstate->qdict);
>
> either way, we are now in state 3 awaiting the next event or timer.
>
>>          } else {
>
> we are in state 1...
>
>> +            /*
>> +             * Last send was (at least) evstate->rate ns ago.
>> +             * Send immediately, and arm the timer to call
>> +             * monitor_qapi_event_handler() in evstate->rate ns.  Any
>> +             * events arriving before then will be delayed until then.
>> +             */
>> +            int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
>> +
>>              monitor_qapi_event_emit(event, qdict);
>> -            evstate->last = now;
>> +            timer_mod_ns(evstate->timer, now + evstate->rate);
>
> and now in state 2.
>
>>          }
>>      }
>> +
>>      qemu_mutex_unlock(&monitor_lock);
>>  }
>>  
>>  /*
>> - * The callback invoked by QemuTimer when a delayed
>> - * event is ready to be emitted
>> + * This function runs evstate->rate ns after sending a throttled
>> + * event.
>> + * If another event has since been stored, send it.
>>   */
>>  static void monitor_qapi_event_handler(void *opaque)
>>  {
>
> We get here in either state 2 or 3...
>
>>      MonitorQAPIEventState *evstate = opaque;
>> -    int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
>>  
>>      trace_monitor_protocol_event_handler(evstate->event,
>>                                           evstate->qdict,
>> -                                         evstate->last,
>> -                                         now);
>> +                                         0, 0); /* FIXME drop */
>>      qemu_mutex_lock(&monitor_lock);
>> +
>>      if (evstate->qdict) {
>
> state 3, so send it...
>
>> +        int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
>> +
>>          monitor_qapi_event_emit(evstate->event, evstate->qdict);
>>          QDECREF(evstate->qdict);
>>          evstate->qdict = NULL;
>> +        timer_mod_ns(evstate->timer, now + evstate->rate);
>
> ...and rearm to go back to state 2
>
>>      }
>> -    evstate->last = now;
>> +
>
> ...otherwise, we didn't rearm, so we go back to state 1
>
>>      qemu_mutex_unlock(&monitor_lock);
>>  }
>>  
>
> Matches the state logic called out in the commit message.
>
>> @@ -539,20 +542,17 @@ static void
>>  monitor_qapi_event_throttle(QAPIEvent event, int64_t rate)
>>  {
>>      MonitorQAPIEventState *evstate;
>> +
>>      assert(event < QAPI_EVENT_MAX);
>> -
>> -    evstate = &(monitor_qapi_event_state[event]);
>> -
>> +    evstate = &monitor_qapi_event_state[event];
>>      trace_monitor_protocol_event_throttle(event, rate);
>>      evstate->event = event;
>>      assert(rate * SCALE_MS <= INT64_MAX);
>>      evstate->rate = rate * SCALE_MS;
>> -    evstate->last = 0;
>>      evstate->qdict = NULL;
>> -    evstate->timer = timer_new(QEMU_CLOCK_REALTIME,
>> -                               SCALE_MS,
>> -                               monitor_qapi_event_handler,
>> -                               evstate);
>> +    evstate->timer = timer_new_ns(QEMU_CLOCK_REALTIME,
>> +                                  monitor_qapi_event_handler,
>> +                                  evstate);
>
> Is this a separate cleanup?  It looks like you're fixing code style, and
> then switching from timer_new() to timer_new_ns(), but it's not obvious
> from just this context whether you still have the right scale (and I
> didn't go chase documentation).  It wasn't mentioned in the commit
> message, and seems unrelated to the new state machine.

I'll split it off and explain it in the commit message.

Thanks!
diff mbox

Patch

diff --git a/monitor.c b/monitor.c
index 2cbb2d9..56c9dec 100644
--- a/monitor.c
+++ b/monitor.c
@@ -183,7 +183,6 @@  typedef struct {
 typedef struct MonitorQAPIEventState {
     QAPIEvent event;    /* Event being tracked */
     int64_t rate;       /* Minimum time (in ns) between two events */
-    int64_t last;       /* QEMU_CLOCK_REALTIME value at last emission */
     QEMUTimer *timer;   /* Timer for handling delayed events */
     QDict *qdict;       /* Event pending delayed dispatch */
 } MonitorQAPIEventState;
@@ -465,65 +464,69 @@  static void
 monitor_qapi_event_queue(QAPIEvent event, QDict *qdict, Error **errp)
 {
     MonitorQAPIEventState *evstate;
-    assert(event < QAPI_EVENT_MAX);
-    int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
 
-    evstate = &(monitor_qapi_event_state[event]);
+    assert(event < QAPI_EVENT_MAX);
+    evstate = &monitor_qapi_event_state[event];
     trace_monitor_protocol_event_queue(event,
                                        qdict,
                                        evstate->rate,
-                                       evstate->last,
-                                       now);
+                                       0, 0); /* FIXME drop */
 
-    /* Rate limit of 0 indicates no throttling */
     qemu_mutex_lock(&monitor_lock);
+
     if (!evstate->rate) {
+        /* Unthrottled event */
         monitor_qapi_event_emit(event, qdict);
-        evstate->last = now;
     } else {
-        int64_t delta = now - evstate->last;
-        if (evstate->qdict ||
-            delta < evstate->rate) {
-            /* If there's an existing event pending, replace
-             * it with the new event, otherwise schedule a
-             * timer for delayed emission
+        if (timer_pending(evstate->timer)) {
+            /*
+             * Timer is pending for (at least) evstate->rate ns after
+             * last send.  Store event for sending when timer fires,
+             * replacing a prior stored event if any.
              */
-            if (evstate->qdict) {
-                QDECREF(evstate->qdict);
-            } else {
-                int64_t then = evstate->last + evstate->rate;
-                timer_mod_ns(evstate->timer, then);
-            }
+            QDECREF(evstate->qdict);
             evstate->qdict = qdict;
             QINCREF(evstate->qdict);
         } else {
+            /*
+             * Last send was (at least) evstate->rate ns ago.
+             * Send immediately, and arm the timer to call
+             * monitor_qapi_event_handler() in evstate->rate ns.  Any
+             * events arriving before then will be delayed until then.
+             */
+            int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
+
             monitor_qapi_event_emit(event, qdict);
-            evstate->last = now;
+            timer_mod_ns(evstate->timer, now + evstate->rate);
         }
     }
+
     qemu_mutex_unlock(&monitor_lock);
 }
 
 /*
- * The callback invoked by QemuTimer when a delayed
- * event is ready to be emitted
+ * This function runs evstate->rate ns after sending a throttled
+ * event.
+ * If another event has since been stored, send it.
  */
 static void monitor_qapi_event_handler(void *opaque)
 {
     MonitorQAPIEventState *evstate = opaque;
-    int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
 
     trace_monitor_protocol_event_handler(evstate->event,
                                          evstate->qdict,
-                                         evstate->last,
-                                         now);
+                                         0, 0); /* FIXME drop */
     qemu_mutex_lock(&monitor_lock);
+
     if (evstate->qdict) {
+        int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
+
         monitor_qapi_event_emit(evstate->event, evstate->qdict);
         QDECREF(evstate->qdict);
         evstate->qdict = NULL;
+        timer_mod_ns(evstate->timer, now + evstate->rate);
     }
-    evstate->last = now;
+
     qemu_mutex_unlock(&monitor_lock);
 }
 
@@ -539,20 +542,17 @@  static void
 monitor_qapi_event_throttle(QAPIEvent event, int64_t rate)
 {
     MonitorQAPIEventState *evstate;
+
     assert(event < QAPI_EVENT_MAX);
-
-    evstate = &(monitor_qapi_event_state[event]);
-
+    evstate = &monitor_qapi_event_state[event];
     trace_monitor_protocol_event_throttle(event, rate);
     evstate->event = event;
     assert(rate * SCALE_MS <= INT64_MAX);
     evstate->rate = rate * SCALE_MS;
-    evstate->last = 0;
     evstate->qdict = NULL;
-    evstate->timer = timer_new(QEMU_CLOCK_REALTIME,
-                               SCALE_MS,
-                               monitor_qapi_event_handler,
-                               evstate);
+    evstate->timer = timer_new_ns(QEMU_CLOCK_REALTIME,
+                                  monitor_qapi_event_handler,
+                                  evstate);
 }
 
 static void monitor_qapi_event_init(void)