Patchwork [04/30] buffered_file: Move from using a timer to use a thread

login
register
mail settings
Submitter Juan Quintela
Date Oct. 18, 2012, 7:30 a.m.
Message ID <1350545426-23172-5-git-send-email-quintela@redhat.com>
Download mbox | patch
Permalink /patch/192207/
State New
Headers show

Comments

Juan Quintela - Oct. 18, 2012, 7:30 a.m.
We still protect everything except the wait with the iothread lock.
But we moved from a timer to a thread.  Steps one by one.

We also need to detect when we have finished with a variable "complete".

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 buffered_file.c | 58 +++++++++++++++++++++++++++++++++++----------------------
 1 file changed, 36 insertions(+), 22 deletions(-)
Paolo Bonzini - Oct. 18, 2012, 8:56 a.m.
Il 18/10/2012 09:30, Juan Quintela ha scritto:
> +        }
> +        if (s->bytes_xfer >= s->xfer_limit) {
> +            /* usleep expects microseconds */
> +            usleep((expire_time - current_time)*1000);
> +        }

g_usleep please.

Paolo
Orit Wasserman - Oct. 21, 2012, 12:09 p.m.
On 10/18/2012 09:30 AM, Juan Quintela wrote:
> We still protect everything except the wait with the iothread lock.
> But we moved from a timer to a thread.  Steps one by one.
> 
> We also need to detect when we have finished with a variable "complete".
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  buffered_file.c | 58 +++++++++++++++++++++++++++++++++++----------------------
>  1 file changed, 36 insertions(+), 22 deletions(-)
> 
> diff --git a/buffered_file.c b/buffered_file.c
> index ed92df1..4b90d54 100644
> --- a/buffered_file.c
> +++ b/buffered_file.c
> @@ -18,6 +18,7 @@
>  #include "qemu-timer.h"
>  #include "qemu-char.h"
>  #include "buffered_file.h"
> +#include "qemu-thread.h"
> 
>  //#define DEBUG_BUFFERED_FILE
> 
> @@ -31,7 +32,8 @@ typedef struct QEMUFileBuffered
>      uint8_t *buffer;
>      size_t buffer_size;
>      size_t buffer_capacity;
> -    QEMUTimer *timer;
> +    QemuThread thread;
> +    bool complete;
>  } QEMUFileBuffered;
> 
>  #ifdef DEBUG_BUFFERED_FILE
> @@ -160,11 +162,8 @@ static int buffered_close(void *opaque)
>      if (ret >= 0) {
>          ret = ret2;
>      }
> -    qemu_del_timer(s->timer);
> -    qemu_free_timer(s->timer);
> -    g_free(s->buffer);
> -    g_free(s);
> -
> +    ret = migrate_fd_close(s->migration_state);
> +    s->complete = true;
>      return ret;
>  }
> 
> @@ -215,23 +214,38 @@ static int64_t buffered_get_rate_limit(void *opaque)
>      return s->xfer_limit;
>  }
> 
> -static void buffered_rate_tick(void *opaque)
> +/* 10ms  xfer_limit is the limit that we should write each 10ms */
maybe 100, please fix the comment

Orit
> +#define BUFFER_DELAY 100
> +
> +static void *buffered_file_thread(void *opaque)
>  {
>      QEMUFileBuffered *s = opaque;
> +    int64_t expire_time = qemu_get_clock_ms(rt_clock) + BUFFER_DELAY;
> 
> -    if (qemu_file_get_error(s->file)) {
> -        buffered_close(s);
> -        return;
> -    }
> -
> -    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
> -
> -    if (s->freeze_output)
> -        return;
> -
> -    s->bytes_xfer = 0;
> +    while (true) {
> +        int64_t current_time = qemu_get_clock_ms(rt_clock);
> 
> -    buffered_put_buffer(s, NULL, 0, 0);
> +        if (s->complete) {
> +            break;
> +        }
> +        if (s->freeze_output) {
> +            continue;
> +        }
> +        if (current_time >= expire_time) {
> +            s->bytes_xfer = 0;
> +            expire_time = current_time + BUFFER_DELAY;
> +        }
> +        if (s->bytes_xfer >= s->xfer_limit) {
> +            /* usleep expects microseconds */
> +            usleep((expire_time - current_time)*1000);
> +        }
> +        qemu_mutex_lock_iothread();
> +        buffered_put_buffer(s, NULL, 0, 0);
> +        qemu_mutex_unlock_iothread();
> +    }
> +    g_free(s->buffer);
> +    g_free(s);
> +    return NULL;
>  }
> 
>  QEMUFile *qemu_fopen_ops_buffered(MigrationState *migration_state)
> @@ -242,15 +256,15 @@ QEMUFile *qemu_fopen_ops_buffered(MigrationState *migration_state)
> 
>      s->migration_state = migration_state;
>      s->xfer_limit = migration_state->bandwidth_limit / 10;
> +    s->complete = false;
> 
>      s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
>                               buffered_close, buffered_rate_limit,
>                               buffered_set_rate_limit,
>  			     buffered_get_rate_limit);
> 
> -    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
> -
> -    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
> +    qemu_thread_create(&s->thread, buffered_file_thread, s,
> +                       QEMU_THREAD_DETACHED);
> 
>      return s->file;
>  }
>
Paolo Bonzini - Nov. 12, 2012, 11:42 a.m.
Il 18/10/2012 09:30, Juan Quintela ha scritto:
> @@ -160,11 +162,8 @@ static int buffered_close(void *opaque)
>      if (ret >= 0) {
>          ret = ret2;
>      }
> -    qemu_del_timer(s->timer);
> -    qemu_free_timer(s->timer);
> -    g_free(s->buffer);
> -    g_free(s);
> -
> +    ret = migrate_fd_close(s->migration_state);
> +    s->complete = true;
>      return ret;

Just above these lines there is

    ret2 = migrate_fd_close(s->migration_state);

I believe the second call to migrate_fd_close should be removed.

Paolo

Patch

diff --git a/buffered_file.c b/buffered_file.c
index ed92df1..4b90d54 100644
--- a/buffered_file.c
+++ b/buffered_file.c
@@ -18,6 +18,7 @@ 
 #include "qemu-timer.h"
 #include "qemu-char.h"
 #include "buffered_file.h"
+#include "qemu-thread.h"

 //#define DEBUG_BUFFERED_FILE

@@ -31,7 +32,8 @@  typedef struct QEMUFileBuffered
     uint8_t *buffer;
     size_t buffer_size;
     size_t buffer_capacity;
-    QEMUTimer *timer;
+    QemuThread thread;
+    bool complete;
 } QEMUFileBuffered;

 #ifdef DEBUG_BUFFERED_FILE
@@ -160,11 +162,8 @@  static int buffered_close(void *opaque)
     if (ret >= 0) {
         ret = ret2;
     }
-    qemu_del_timer(s->timer);
-    qemu_free_timer(s->timer);
-    g_free(s->buffer);
-    g_free(s);
-
+    ret = migrate_fd_close(s->migration_state);
+    s->complete = true;
     return ret;
 }

@@ -215,23 +214,38 @@  static int64_t buffered_get_rate_limit(void *opaque)
     return s->xfer_limit;
 }

-static void buffered_rate_tick(void *opaque)
+/* 10ms  xfer_limit is the limit that we should write each 10ms */
+#define BUFFER_DELAY 100
+
+static void *buffered_file_thread(void *opaque)
 {
     QEMUFileBuffered *s = opaque;
+    int64_t expire_time = qemu_get_clock_ms(rt_clock) + BUFFER_DELAY;

-    if (qemu_file_get_error(s->file)) {
-        buffered_close(s);
-        return;
-    }
-
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
-
-    if (s->freeze_output)
-        return;
-
-    s->bytes_xfer = 0;
+    while (true) {
+        int64_t current_time = qemu_get_clock_ms(rt_clock);

-    buffered_put_buffer(s, NULL, 0, 0);
+        if (s->complete) {
+            break;
+        }
+        if (s->freeze_output) {
+            continue;
+        }
+        if (current_time >= expire_time) {
+            s->bytes_xfer = 0;
+            expire_time = current_time + BUFFER_DELAY;
+        }
+        if (s->bytes_xfer >= s->xfer_limit) {
+            /* usleep expects microseconds */
+            usleep((expire_time - current_time)*1000);
+        }
+        qemu_mutex_lock_iothread();
+        buffered_put_buffer(s, NULL, 0, 0);
+        qemu_mutex_unlock_iothread();
+    }
+    g_free(s->buffer);
+    g_free(s);
+    return NULL;
 }

 QEMUFile *qemu_fopen_ops_buffered(MigrationState *migration_state)
@@ -242,15 +256,15 @@  QEMUFile *qemu_fopen_ops_buffered(MigrationState *migration_state)

     s->migration_state = migration_state;
     s->xfer_limit = migration_state->bandwidth_limit / 10;
+    s->complete = false;

     s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
                              buffered_close, buffered_rate_limit,
                              buffered_set_rate_limit,
 			     buffered_get_rate_limit);

-    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
-
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    qemu_thread_create(&s->thread, buffered_file_thread, s,
+                       QEMU_THREAD_DETACHED);

     return s->file;
 }