diff mbox

[22/41] migration: yay, buffering is gone

Message ID 1360950433-17106-23-git-send-email-pbonzini@redhat.com
State New
Headers show

Commit Message

Paolo Bonzini Feb. 15, 2013, 5:46 p.m. UTC
Buffering was needed because blocking writes could take a long time
and starve other threads seeking to grab the big QEMU mutex.

Now that all writes (except within _complete callbacks) are done
outside the big QEMU mutex, we do not need buffering at all.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/migration/migration.h |    3 --
 migration.c                   |   78 ++++++++++------------------------------
 savevm.c                      |    1 +
 3 files changed, 21 insertions(+), 61 deletions(-)

Comments

Orit Wasserman Feb. 19, 2013, 3:23 p.m. UTC | #1
On 02/15/2013 07:46 PM, Paolo Bonzini wrote:
> Buffering was needed because blocking writes could take a long time
> and starve other threads seeking to grab the big QEMU mutex.
> 
> Now that all writes (except within _complete callbacks) are done
> outside the big QEMU mutex, we do not need buffering at all.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  include/migration/migration.h |    3 --
>  migration.c                   |   78 ++++++++++------------------------------
>  savevm.c                      |    1 +
>  3 files changed, 21 insertions(+), 61 deletions(-)
> 
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index d78bbbb..172ef95 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -34,9 +34,6 @@ struct MigrationState
>      int64_t bandwidth_limit;
>      size_t bytes_xfer;
>      size_t xfer_limit;
> -    uint8_t *buffer;
> -    size_t buffer_size;
> -    size_t buffer_capacity;
>      QemuThread thread;
>      QEMUBH *cleanup_bh;
>  
> diff --git a/migration.c b/migration.c
> index d6a7dff..1f6fbdc 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -503,73 +503,41 @@ int64_t migrate_xbzrle_cache_size(void)
>  
>  /* migration thread support */
>  
> -
> -static void buffered_flush(MigrationState *s)
> -{
> -    size_t offset = 0;
> -    ssize_t ret = 0;
> -
> -    DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
> -
> -    if (qemu_file_get_error(s->file)) {
> -        s->buffer_size = 0;
> -        return;
> -    }
> -    qemu_fflush(s->file);
> -
> -    while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
> -        size_t to_send = MIN(s->buffer_size - offset, s->xfer_limit - s->bytes_xfer);
> -        ret = migrate_fd_put_buffer(s, s->buffer + offset, to_send);
> -        if (ret <= 0) {
> -            DPRINTF("error flushing data, %zd\n", ret);
> -            break;
> -        } else {
> -            DPRINTF("flushed %zd byte(s)\n", ret);
> -            offset += ret;
> -            s->bytes_xfer += ret;
> -        }
> -    }
> -
> -    DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
> -    memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
> -    s->buffer_size -= offset;
> -
> -    if (ret < 0) {
> -        qemu_file_set_error(s->file, ret);
> -    }
> -}
> -
>  static int buffered_put_buffer(void *opaque, const uint8_t *buf,
>                                 int64_t pos, int size)
>  {
>      MigrationState *s = opaque;
> -    ssize_t error;
> +    ssize_t ret;
> +    size_t sent;
>  
>      DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
>  
> -    error = qemu_file_get_error(s->file);
> -    if (error) {
> +    ret = qemu_file_get_error(s->file);
> +    if (ret) {
>          DPRINTF("flush when error, bailing: %s\n", strerror(-error));
you need to replace error with ret here.
> -        return error;
> +        return ret;
>      }
>  
>      if (size <= 0) {
>          return size;
>      }
>  
> -    if (size > (s->buffer_capacity - s->buffer_size)) {
> -        DPRINTF("increasing buffer capacity from %zu by %zu\n",
> -                s->buffer_capacity, size + 1024);
> -
> -        s->buffer_capacity += size + 1024;
> -
> -        s->buffer = g_realloc(s->buffer, s->buffer_capacity);
> +    sent = 0;
> +    while (size) {
> +        ret = migrate_fd_put_buffer(s, buf, size);
> +        if (ret <= 0) {
> +            DPRINTF("error flushing data, %zd\n", ret);
> +            return ret;
> +        } else {
> +            DPRINTF("flushed %zd byte(s)\n", ret);
> +            sent += ret;
> +            buf += ret;
> +            size -= ret;
> +            s->bytes_xfer += ret;
> +        }
>      }
>  
> -    memcpy(s->buffer + s->buffer_size, buf, size);
> -    s->buffer_size += size;
> -
> -    return size;
> +    return sent;
>  }
>  
>  static int buffered_close(void *opaque)
> @@ -691,10 +659,9 @@ static void *buffered_file_thread(void *opaque)
>              /* usleep expects microseconds */
>              g_usleep((initial_time + BUFFER_DELAY - current_time)*1000);
>          }
> -        buffered_flush(s);
>          if (qemu_file_get_error(s->file)) {
>              __sync_val_compare_and_swap(&s->state, MIG_STATE_ACTIVE, MIG_STATE_ERROR);
> -        } else if (last_round && s->buffer_size == 0) {
> +        } else if (last_round) {
>              __sync_val_compare_and_swap(&s->state, MIG_STATE_ACTIVE, MIG_STATE_COMPLETED);
>          }
>      }
> @@ -714,7 +681,6 @@ static void *buffered_file_thread(void *opaque)
>      qemu_bh_schedule(s->cleanup_bh);
>      qemu_mutex_unlock_iothread();
>  
> -    g_free(s->buffer);
>      return NULL;
>  }
>  
> @@ -731,10 +697,6 @@ void migrate_fd_connect(MigrationState *s)
>  {
>      s->state = MIG_STATE_ACTIVE;
>      s->bytes_xfer = 0;
> -    s->buffer = NULL;
> -    s->buffer_size = 0;
> -    s->buffer_capacity = 0;
> -
>      s->xfer_limit = s->bandwidth_limit / XFER_LIMIT_RATIO;
>  
>      s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s);
> diff --git a/savevm.c b/savevm.c
> index 7c7774e..ce10295 100644
> --- a/savevm.c
> +++ b/savevm.c
> @@ -1724,6 +1724,7 @@ void qemu_savevm_state_complete(QEMUFile *f)
>      }
>  
>      qemu_put_byte(f, QEMU_VM_EOF);
> +    qemu_fflush(f);
>  }
>  
>  uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size)
> 
At last :)
Reviewed-by: Orit Wasserman <owasserm@redhat.com>
Juan Quintela Feb. 22, 2013, 11:11 a.m. UTC | #2
Paolo Bonzini <pbonzini@redhat.com> wrote:
> Buffering was needed because blocking writes could take a long time
> and starve other threads seeking to grab the big QEMU mutex.
>
> Now that all writes (except within _complete callbacks) are done
> outside the big QEMU mutex, we do not need buffering at all.
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>

Reviewed-by: Juan Quintela <quintela@redhat.com>

DEBUG error change was already found by Orit.
diff mbox

Patch

diff --git a/include/migration/migration.h b/include/migration/migration.h
index d78bbbb..172ef95 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -34,9 +34,6 @@  struct MigrationState
     int64_t bandwidth_limit;
     size_t bytes_xfer;
     size_t xfer_limit;
-    uint8_t *buffer;
-    size_t buffer_size;
-    size_t buffer_capacity;
     QemuThread thread;
     QEMUBH *cleanup_bh;
 
diff --git a/migration.c b/migration.c
index d6a7dff..1f6fbdc 100644
--- a/migration.c
+++ b/migration.c
@@ -503,73 +503,41 @@  int64_t migrate_xbzrle_cache_size(void)
 
 /* migration thread support */
 
-
-static void buffered_flush(MigrationState *s)
-{
-    size_t offset = 0;
-    ssize_t ret = 0;
-
-    DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
-
-    if (qemu_file_get_error(s->file)) {
-        s->buffer_size = 0;
-        return;
-    }
-    qemu_fflush(s->file);
-
-    while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
-        size_t to_send = MIN(s->buffer_size - offset, s->xfer_limit - s->bytes_xfer);
-        ret = migrate_fd_put_buffer(s, s->buffer + offset, to_send);
-        if (ret <= 0) {
-            DPRINTF("error flushing data, %zd\n", ret);
-            break;
-        } else {
-            DPRINTF("flushed %zd byte(s)\n", ret);
-            offset += ret;
-            s->bytes_xfer += ret;
-        }
-    }
-
-    DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
-    memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
-    s->buffer_size -= offset;
-
-    if (ret < 0) {
-        qemu_file_set_error(s->file, ret);
-    }
-}
-
 static int buffered_put_buffer(void *opaque, const uint8_t *buf,
                                int64_t pos, int size)
 {
     MigrationState *s = opaque;
-    ssize_t error;
+    ssize_t ret;
+    size_t sent;
 
     DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
 
-    error = qemu_file_get_error(s->file);
-    if (error) {
+    ret = qemu_file_get_error(s->file);
+    if (ret) {
         DPRINTF("flush when error, bailing: %s\n", strerror(-error));
-        return error;
+        return ret;
     }
 
     if (size <= 0) {
         return size;
     }
 
-    if (size > (s->buffer_capacity - s->buffer_size)) {
-        DPRINTF("increasing buffer capacity from %zu by %zu\n",
-                s->buffer_capacity, size + 1024);
-
-        s->buffer_capacity += size + 1024;
-
-        s->buffer = g_realloc(s->buffer, s->buffer_capacity);
+    sent = 0;
+    while (size) {
+        ret = migrate_fd_put_buffer(s, buf, size);
+        if (ret <= 0) {
+            DPRINTF("error flushing data, %zd\n", ret);
+            return ret;
+        } else {
+            DPRINTF("flushed %zd byte(s)\n", ret);
+            sent += ret;
+            buf += ret;
+            size -= ret;
+            s->bytes_xfer += ret;
+        }
     }
 
-    memcpy(s->buffer + s->buffer_size, buf, size);
-    s->buffer_size += size;
-
-    return size;
+    return sent;
 }
 
 static int buffered_close(void *opaque)
@@ -691,10 +659,9 @@  static void *buffered_file_thread(void *opaque)
             /* usleep expects microseconds */
             g_usleep((initial_time + BUFFER_DELAY - current_time)*1000);
         }
-        buffered_flush(s);
         if (qemu_file_get_error(s->file)) {
             __sync_val_compare_and_swap(&s->state, MIG_STATE_ACTIVE, MIG_STATE_ERROR);
-        } else if (last_round && s->buffer_size == 0) {
+        } else if (last_round) {
             __sync_val_compare_and_swap(&s->state, MIG_STATE_ACTIVE, MIG_STATE_COMPLETED);
         }
     }
@@ -714,7 +681,6 @@  static void *buffered_file_thread(void *opaque)
     qemu_bh_schedule(s->cleanup_bh);
     qemu_mutex_unlock_iothread();
 
-    g_free(s->buffer);
     return NULL;
 }
 
@@ -731,10 +697,6 @@  void migrate_fd_connect(MigrationState *s)
 {
     s->state = MIG_STATE_ACTIVE;
     s->bytes_xfer = 0;
-    s->buffer = NULL;
-    s->buffer_size = 0;
-    s->buffer_capacity = 0;
-
     s->xfer_limit = s->bandwidth_limit / XFER_LIMIT_RATIO;
 
     s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s);
diff --git a/savevm.c b/savevm.c
index 7c7774e..ce10295 100644
--- a/savevm.c
+++ b/savevm.c
@@ -1724,6 +1724,7 @@  void qemu_savevm_state_complete(QEMUFile *f)
     }
 
     qemu_put_byte(f, QEMU_VM_EOF);
+    qemu_fflush(f);
 }
 
 uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size)