Patchwork [v3,11/35] savevm, buffered_file: introduce method to drain buffer of buffered file

login
register
mail settings
Submitter Isaku Yamahata
Date Oct. 30, 2012, 8:32 a.m.
Message ID <9a05f47e22158c1b84940af2429a71e91d47df36.1351582535.git.yamahata@valinux.co.jp>
Download mbox | patch
Permalink /patch/195355/
State New
Headers show

Comments

Isaku Yamahata - Oct. 30, 2012, 8:32 a.m.
Introduce a new method to drain the buffer of QEMUBufferedFile.
When postcopy migration, buffer size can increase unboundedly.
To keep the buffer size reasonably small, introduce the method to
wait for buffer to drain.
Detect unfreeze output by select too, not only by timer, thus pending data
can be sent quickly.

Signed-off-by: Isaku Yamahata <yamahata@valinux.co.jp>
---
 buffered_file.c |   59 +++++++++++++++++++++++++++++++++++++++++++++----------
 buffered_file.h |    1 +
 qemu-file.h     |    1 +
 savevm.c        |    7 +++++++
 4 files changed, 58 insertions(+), 10 deletions(-)

Patch

diff --git a/buffered_file.c b/buffered_file.c
index ed92df1..275d504 100644
--- a/buffered_file.c
+++ b/buffered_file.c
@@ -26,12 +26,14 @@  typedef struct QEMUFileBuffered
     MigrationState *migration_state;
     QEMUFile *file;
     int freeze_output;
+    bool no_limit;
     size_t bytes_xfer;
     size_t xfer_limit;
     uint8_t *buffer;
     size_t buffer_size;
     size_t buffer_capacity;
     QEMUTimer *timer;
+    int unfreeze_fd;
 } QEMUFileBuffered;
 
 #ifdef DEBUG_BUFFERED_FILE
@@ -42,6 +44,16 @@  typedef struct QEMUFileBuffered
     do { } while (0)
 #endif
 
+static ssize_t buffered_flush(QEMUFileBuffered *s);
+
+static void buffered_unfreeze(void *opaque)
+{
+    QEMUFileBuffered *s = opaque;
+    qemu_set_fd_handler(s->unfreeze_fd, NULL, NULL, NULL);
+    s->freeze_output = 0;
+    buffered_flush(s);
+}
+
 static void buffered_append(QEMUFileBuffered *s,
                             const uint8_t *buf, size_t size)
 {
@@ -65,7 +77,8 @@  static ssize_t buffered_flush(QEMUFileBuffered *s)
 
     DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
 
-    while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
+    while ((s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) ||
+           s->no_limit) {
 
         ret = migrate_fd_put_buffer(s->migration_state, s->buffer + offset,
                                     s->buffer_size - offset);
@@ -73,6 +86,15 @@  static ssize_t buffered_flush(QEMUFileBuffered *s)
             DPRINTF("backend not ready, freezing\n");
             ret = 0;
             s->freeze_output = 1;
+            if (!s->no_limit) {
+                if (s->unfreeze_fd == -1) {
+                    s->unfreeze_fd = dup(s->migration_state->fd);
+                }
+                if (s->unfreeze_fd >= 0) {
+                    qemu_set_fd_handler(s->unfreeze_fd,
+                                        NULL, buffered_unfreeze, s);
+                }
+            }
             break;
         }
 
@@ -113,7 +135,7 @@  static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in
     s->freeze_output = 0;
 
     if (size > 0) {
-        DPRINTF("buffering %d bytes\n", size - offset);
+        DPRINTF("buffering %d bytes\n", size);
         buffered_append(s, buf, size);
     }
 
@@ -134,17 +156,11 @@  static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in
     return size;
 }
 
-static int buffered_close(void *opaque)
+static void buffered_drain(QEMUFileBuffered *s)
 {
-    QEMUFileBuffered *s = opaque;
-    ssize_t ret = 0;
-    int ret2;
-
-    DPRINTF("closing\n");
-
     s->xfer_limit = INT_MAX;
     while (!qemu_file_get_error(s->file) && s->buffer_size) {
-        ret = buffered_flush(s);
+        ssize_t ret = buffered_flush(s);
         if (ret < 0) {
             break;
         }
@@ -153,13 +169,27 @@  static int buffered_close(void *opaque)
             if (ret < 0) {
                 break;
             }
+            s->freeze_output = 0;
         }
     }
+}
+
+static int buffered_close(void *opaque)
+{
+    QEMUFileBuffered *s = opaque;
+    ssize_t ret = 0;
+    int ret2;
 
+    DPRINTF("closing\n");
+
+    buffered_drain(s);
     ret2 = migrate_fd_close(s->migration_state);
     if (ret >= 0) {
         ret = ret2;
     }
+    if (s->unfreeze_fd >= 0) {
+        close(s->unfreeze_fd);
+    }
     qemu_del_timer(s->timer);
     qemu_free_timer(s->timer);
     g_free(s->buffer);
@@ -242,6 +272,7 @@  QEMUFile *qemu_fopen_ops_buffered(MigrationState *migration_state)
 
     s->migration_state = migration_state;
     s->xfer_limit = migration_state->bandwidth_limit / 10;
+    s->unfreeze_fd = -1;
 
     s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
                              buffered_close, buffered_rate_limit,
@@ -254,3 +285,11 @@  QEMUFile *qemu_fopen_ops_buffered(MigrationState *migration_state)
 
     return s->file;
 }
+
+void qemu_buffered_file_drain_buffer(void *buffered_file)
+{
+    QEMUFileBuffered *s = buffered_file;
+    s->no_limit = true;
+    buffered_drain(s);
+    s->no_limit = false;
+}
diff --git a/buffered_file.h b/buffered_file.h
index ef010fe..be714a7 100644
--- a/buffered_file.h
+++ b/buffered_file.h
@@ -18,5 +18,6 @@ 
 #include "migration.h"
 
 QEMUFile *qemu_fopen_ops_buffered(MigrationState *migration_state);
+void qemu_buffered_file_drain_buffer(void *buffered_file);
 
 #endif
diff --git a/qemu-file.h b/qemu-file.h
index 452efcd..8074df1 100644
--- a/qemu-file.h
+++ b/qemu-file.h
@@ -76,6 +76,7 @@  typedef struct QEMUFileBuf QEMUFileBuf;
 QEMUFileBuf *qemu_fopen_buf_write(void);
 QEMUFile *qemu_fopen_buf_read(uint8_t *buf, size_t size);
 int qemu_file_fd(QEMUFile *f);
+void qemu_buffered_file_drain(QEMUFile *f);
 int qemu_fclose(QEMUFile *f);
 int qemu_fflush(QEMUFile *f);
 void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size);
diff --git a/savevm.c b/savevm.c
index 7e55dce..93c51ab 100644
--- a/savevm.c
+++ b/savevm.c
@@ -86,6 +86,7 @@ 
 #include "memory.h"
 #include "qmp-commands.h"
 #include "trace.h"
+#include "buffered_file.h"
 
 #define SELF_ANNOUNCE_ROUNDS 5
 
@@ -558,6 +559,12 @@  int qemu_fflush(QEMUFile *f)
     return ret;
 }
 
+void qemu_buffered_file_drain(QEMUFile *f)
+{
+    qemu_fflush(f);
+    qemu_buffered_file_drain_buffer(f->opaque);
+}
+
 static void qemu_fill_buffer(QEMUFile *f)
 {
     int len;