diff mbox

[RFC,4/4] Curling: the receiver

Message ID 1378784607-7398-5-git-send-email-junqing.wang@cs2c.com.cn
State New
Headers show

Commit Message

Jules Wang Sept. 10, 2013, 3:43 a.m. UTC
The receiver does migration loop until the migration connection is
lost. Then, it is started as a backup.

The receiver does not load vm state once a migration begins,
instead, it perfetches one whole migration data into a buffer,
then loads vm state from that buffer afterwards.

Signed-off-by: Jules Wang <junqing.wang@cs2c.com.cn>
---
 include/migration/qemu-file.h |   1 +
 include/sysemu/sysemu.h       |   1 +
 migration.c                   |  22 ++++--
 savevm.c                      | 154 ++++++++++++++++++++++++++++++++++++++++--
 4 files changed, 168 insertions(+), 10 deletions(-)

Comments

Jules Wang Sept. 11, 2013, 8:25 a.m. UTC | #1
hi,


At 2013-09-10 22:19:48,"Juan Quintela" <quintela@redhat.com> wrote:

>> @@ -112,13 +113,24 @@ static void process_incoming_migration_co(void *opaque)
>>  {
>>      QEMUFile *f = opaque;
>>      int ret;
>> +    int count = 0;
>>  
>> -    ret = qemu_loadvm_state(f);
>> -    qemu_fclose(f);
>> -    if (ret < 0) {
>> -        fprintf(stderr, "load of migration failed\n");
>> -        exit(EXIT_FAILURE);
>> +    if (ft_enabled()) {
>> +        while (qemu_loadvm_state_ft(f) >= 0) {
>> +            count++;
>> +            DPRINTF("incoming count %d\r", count);
>> +        }
>> +        qemu_fclose(f);
>> +        fprintf(stderr, "ft connection lost, launching self..\n");
>
>Obviously,  here we are needing something more that an fprintf,,  right?
>
>We are not checking either if it is one error.

Agree.

>> +    } else {
>> +        ret = qemu_loadvm_state(f);
>> +        qemu_fclose(f);
>> +        if (ret < 0) {
>> +            fprintf(stderr, "load of migration failed\n");
>> +            exit(EXIT_FAILURE);
>> +        }
>>      }
>> +    cpu_synchronize_all_post_init();
>>      qemu_announce_self();
>>      DPRINTF("successfully loaded vm state\n");
>>  
>> diff --git a/savevm.c b/savevm.c
>> index 6daf690..d5bf153 100644
>> --- a/savevm.c
>> +++ b/savevm.c
>> @@ -52,6 +52,8 @@
>>  #define ARP_PTYPE_IP 0x0800
>>  #define ARP_OP_REQUEST_REV 0x3
>>  
>> +#define PFB_SIZE 0x010000
>> +
>>  static int announce_self_create(uint8_t *buf,
>>  				uint8_t *mac_addr)
>>  {
>> @@ -135,6 +137,10 @@ struct QEMUFile {
>>      unsigned int iovcnt;
>>  
>>      int last_error;
>> +
>> +    uint8_t *pfb;   /* pfb -> PerFetch Buffer */
>
>s/PreFetch/Prefetcth/
>
>prefetch_buffer as name?  not used in so many places,  makes things
>clearer or more convoluted?  Other comments?
>

Agree.

>> +static int socket_get_prefetch_buffer(void *opaque, uint8_t *buf,
>> +                                      int64_t pos, int size)
>> +{
>> +    QEMUFile *f = opaque;
>> +
>> +    if (f->pfb_size - pos <= 0) {
>> +        return 0;
>> +    }
>> +
>> +    if (f->pfb_size - pos < size) {
>> +        size = f->pfb_size - pos;
>> +    }
>> +
>> +    memcpy(buf, f->pfb+pos, size);
>> +
>> +    return size;
>> +}
>> +
>> +
>>  static int socket_close(void *opaque)
>>  {
>>      QEMUFileSocket *s = opaque;
>> @@ -440,6 +465,7 @@ QEMUFile *qemu_fdopen(int fd, const char *mode)
>>  static const QEMUFileOps socket_read_ops = {
>>      .get_fd =     socket_get_fd,
>>      .get_buffer = socket_get_buffer,
>> +    .get_prefetch_buffer = socket_get_prefetch_buffer,
>>      .close =      socket_close
>>  };
>>  
>
>>      if (f->last_error) {
>>          ret = f->last_error;
>>      }
>> +
>> +    if (f->pfb) {
>> +        g_free(f->pfb);
>
>g_free(f->pfb);
>It already checks for NULL.

Got it.

>> +    }
>> +
>>      g_free(f);
>>      return ret;
>>  }
>> @@ -822,6 +853,14 @@ void qemu_put_byte(QEMUFile *f, int v)
>>  
>>  static void qemu_file_skip(QEMUFile *f, int size)
>>  {
>> +    if (f->pfb_index + size <= f->pfb_size) {
>> +        f->pfb_index += size;
>> +        return;
>> +    } else {
>> +        size -= f->pfb_size - f->pfb_index;
>> +        f->pfb_index = f->pfb_size;
>> +    }
>> +
>>      if (f->buf_index + size <= f->buf_size) {
>>          f->buf_index += size;
>>      }
>> @@ -831,6 +870,21 @@ static int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, int size, size_t offset)
>>  {
>>      int pending;
>>      int index;
>> +    int done;
>> +
>> +    if (f->ops->get_prefetch_buffer) {
>> +        if (f->pfb_index + offset < f->pfb_size) {
>> +            done = f->ops->get_prefetch_buffer(f, buf, f->pfb_index + offset,
>> +                                               size);
>> +            if (done == size) {
>> +                return size;
>> +            }
>> +            size -= done;
>> +            buf  += done;
>> +        } else {
>> +            offset -= f->pfb_size - f->pfb_index;
>> +        }
>> +    }
>>  
>>      assert(!qemu_file_is_writable(f));
>>  
>> @@ -875,7 +929,15 @@ int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size)
>>  
>>  static int qemu_peek_byte(QEMUFile *f, int offset)
>>  {
>> -    int index = f->buf_index + offset;
>> +    int index;
>> +
>> +    if (f->pfb_index + offset < f->pfb_size) {
>> +        return f->pfb[f->pfb_index + offset];
>> +    } else {
>> +        offset -= f->pfb_size - f->pfb_index;
>> +    }
>> +
>> +    index = f->buf_index + offset;
>>  
>>      assert(!qemu_file_is_writable(f));
>>  
>> @@ -1851,7 +1913,7 @@ void qemu_savevm_state_begin(QEMUFile *f,
>>          }
>>          se->ops->set_params(params, se->opaque);
>>      }
>> -    
>> +
>>      qemu_put_be32(f, QEMU_VM_FILE_MAGIC);
>>      qemu_put_be32(f, QEMU_VM_FILE_VERSION);
>>  
>> @@ -2294,8 +2356,6 @@ int qemu_loadvm_state(QEMUFile *f)
>>          }
>>      }
>>  
>> -    cpu_synchronize_all_post_init();
>> -
>>      ret = 0;
>>  
>>  out:
>> @@ -2311,6 +2371,89 @@ out:
>>      return ret;
>>  }
>>  
>> +int qemu_loadvm_state_ft(QEMUFile *f)
>> +{
>> +    int ret = 0;
>> +    int i   = 0;
>> +    int j   = 0;
>> +    int done = 0;
>> +    uint64_t size = 0;
>> +    uint64_t count = 0;
>> +    uint8_t *pfb = NULL;
>> +    uint8_t *buf = NULL;
>> +
>> +    uint64_t max_mem = last_ram_offset() * 1.5;
>> +
>> +    if (!f->ops->get_prefetch_buffer) {
>> +        fprintf(stderr, "Fault tolerant is not supported by this protocol.\n");
>> +        return EINVAL;
>> +    }
>> +
>> +    size = PFB_SIZE;
>> +    pfb = g_malloc(size);
>> +
>> +    while (true) {
>> +        if (count + TARGET_PAGE_SIZE >= size) {
>> +            if (size*2 > max_mem) {
>> +                fprintf(stderr, "qemu_loadvm_state_ft: warning:" \
>> +                       "Prefetch buffer becomes too large.\n" \
>> +                       "Fault tolerant is unstable when you see this,\n" \
>> +                       "please increase the bandwidth or increase " \
>> +                       "the max down time.\n");
>> +                break;
>> +            }
>> +            size = size * 2;
>> +            buf = g_try_realloc(pfb, size);
>> +            if (!buf) {
>> +                error_report("qemu_loadvm_state_ft: out of memory.\n");
>> +                g_free(pfb);
>> +                return ENOMEM;
>
>You are not handling this error in the caller.  Notice that qemu
>normally 

I am not sure what you mean.
I find my mistake that it should return -ENOMEM and -EINVAL.

>> +            }
>> +
>> +            pfb = buf;
>> +        }
>> +
>> +        done = qemu_get_buffer(f, pfb + count, TARGET_PAGE_SIZE);
>> +
>> +        ret = qemu_file_get_error(f);
>> +        if (ret != 0) {
>> +            g_free(pfb);
>> +            return ret;
>> +        }
>> +
>> +        buf = pfb + count;
>> +        count += done;
>> +        for (i = 0; i < done; i++) {
>> +            if (buf[i] != 0xfe) {
>> +                continue;
>> +            }
>> +            if (buf[i-1] != 0xCa) {
>> +                continue;
>> +            }
>> +            if (buf[i-2] != 0xed) {
>> +                continue;
>> +            }
>> +            if (buf[i-3] == 0xFe) {
>> +                goto out;
>> +            }
>
>Using consistent capitalation here?
>Better way to look for the signature?  

This code looks ugly, but runs fast. :)
And as we are looking for a better solution, this piece of code shall not
be kept in the final version of curling.

> Or,  what happens if it just
>happens that the data contains that magic constant?

THAT IS THE PROBLEM, ft will fail if that happens. I expect better and fast solutions. Any suggestions?
Besides, I have tried the checksum solution which is slow. :(
diff mbox

Patch

diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
index 0f757fb..f01ff10 100644
--- a/include/migration/qemu-file.h
+++ b/include/migration/qemu-file.h
@@ -92,6 +92,7 @@  typedef struct QEMUFileOps {
     QEMURamHookFunc *after_ram_iterate;
     QEMURamHookFunc *hook_ram_load;
     QEMURamSaveFunc *save_page;
+    QEMUFileGetBufferFunc *get_prefetch_buffer;
 } QEMUFileOps;
 
 QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops);
diff --git a/include/sysemu/sysemu.h b/include/sysemu/sysemu.h
index b1aa059..44f23d0 100644
--- a/include/sysemu/sysemu.h
+++ b/include/sysemu/sysemu.h
@@ -81,6 +81,7 @@  void qemu_savevm_state_complete(QEMUFile *f);
 void qemu_savevm_state_cancel(void);
 uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size);
 int qemu_loadvm_state(QEMUFile *f);
+int qemu_loadvm_state_ft(QEMUFile *f);
 
 /* SLIRP */
 void do_info_slirp(Monitor *mon);
diff --git a/migration.c b/migration.c
index d8a9b2d..9be22a4 100644
--- a/migration.c
+++ b/migration.c
@@ -19,6 +19,7 @@ 
 #include "monitor/monitor.h"
 #include "migration/qemu-file.h"
 #include "sysemu/sysemu.h"
+#include "sysemu/cpus.h"
 #include "block/block.h"
 #include "qemu/sockets.h"
 #include "migration/block.h"
@@ -112,13 +113,24 @@  static void process_incoming_migration_co(void *opaque)
 {
     QEMUFile *f = opaque;
     int ret;
+    int count = 0;
 
-    ret = qemu_loadvm_state(f);
-    qemu_fclose(f);
-    if (ret < 0) {
-        fprintf(stderr, "load of migration failed\n");
-        exit(EXIT_FAILURE);
+    if (ft_enabled()) {
+        while (qemu_loadvm_state_ft(f) >= 0) {
+            count++;
+            DPRINTF("incoming count %d\r", count);
+        }
+        qemu_fclose(f);
+        fprintf(stderr, "ft connection lost, launching self..\n");
+    } else {
+        ret = qemu_loadvm_state(f);
+        qemu_fclose(f);
+        if (ret < 0) {
+            fprintf(stderr, "load of migration failed\n");
+            exit(EXIT_FAILURE);
+        }
     }
+    cpu_synchronize_all_post_init();
     qemu_announce_self();
     DPRINTF("successfully loaded vm state\n");
 
diff --git a/savevm.c b/savevm.c
index 6daf690..d5bf153 100644
--- a/savevm.c
+++ b/savevm.c
@@ -52,6 +52,8 @@ 
 #define ARP_PTYPE_IP 0x0800
 #define ARP_OP_REQUEST_REV 0x3
 
+#define PFB_SIZE 0x010000
+
 static int announce_self_create(uint8_t *buf,
 				uint8_t *mac_addr)
 {
@@ -135,6 +137,10 @@  struct QEMUFile {
     unsigned int iovcnt;
 
     int last_error;
+
+    uint8_t *pfb;   /* pfb -> PerFetch Buffer */
+    uint64_t pfb_index;
+    uint64_t pfb_size;
 };
 
 typedef struct QEMUFileStdio
@@ -193,6 +199,25 @@  static int socket_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size)
     return len;
 }
 
+static int socket_get_prefetch_buffer(void *opaque, uint8_t *buf,
+                                      int64_t pos, int size)
+{
+    QEMUFile *f = opaque;
+
+    if (f->pfb_size - pos <= 0) {
+        return 0;
+    }
+
+    if (f->pfb_size - pos < size) {
+        size = f->pfb_size - pos;
+    }
+
+    memcpy(buf, f->pfb+pos, size);
+
+    return size;
+}
+
+
 static int socket_close(void *opaque)
 {
     QEMUFileSocket *s = opaque;
@@ -440,6 +465,7 @@  QEMUFile *qemu_fdopen(int fd, const char *mode)
 static const QEMUFileOps socket_read_ops = {
     .get_fd =     socket_get_fd,
     .get_buffer = socket_get_buffer,
+    .get_prefetch_buffer = socket_get_prefetch_buffer,
     .close =      socket_close
 };
 
@@ -493,7 +519,7 @@  QEMUFile *qemu_fopen(const char *filename, const char *mode)
     s->stdio_file = fopen(filename, mode);
     if (!s->stdio_file)
         goto fail;
-    
+
     if(mode[0] == 'w') {
         s->file = qemu_fopen_ops(s, &stdio_file_write_ops);
     } else {
@@ -739,6 +765,11 @@  int qemu_fclose(QEMUFile *f)
     if (f->last_error) {
         ret = f->last_error;
     }
+
+    if (f->pfb) {
+        g_free(f->pfb);
+    }
+
     g_free(f);
     return ret;
 }
@@ -822,6 +853,14 @@  void qemu_put_byte(QEMUFile *f, int v)
 
 static void qemu_file_skip(QEMUFile *f, int size)
 {
+    if (f->pfb_index + size <= f->pfb_size) {
+        f->pfb_index += size;
+        return;
+    } else {
+        size -= f->pfb_size - f->pfb_index;
+        f->pfb_index = f->pfb_size;
+    }
+
     if (f->buf_index + size <= f->buf_size) {
         f->buf_index += size;
     }
@@ -831,6 +870,21 @@  static int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, int size, size_t offset)
 {
     int pending;
     int index;
+    int done;
+
+    if (f->ops->get_prefetch_buffer) {
+        if (f->pfb_index + offset < f->pfb_size) {
+            done = f->ops->get_prefetch_buffer(f, buf, f->pfb_index + offset,
+                                               size);
+            if (done == size) {
+                return size;
+            }
+            size -= done;
+            buf  += done;
+        } else {
+            offset -= f->pfb_size - f->pfb_index;
+        }
+    }
 
     assert(!qemu_file_is_writable(f));
 
@@ -875,7 +929,15 @@  int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size)
 
 static int qemu_peek_byte(QEMUFile *f, int offset)
 {
-    int index = f->buf_index + offset;
+    int index;
+
+    if (f->pfb_index + offset < f->pfb_size) {
+        return f->pfb[f->pfb_index + offset];
+    } else {
+        offset -= f->pfb_size - f->pfb_index;
+    }
+
+    index = f->buf_index + offset;
 
     assert(!qemu_file_is_writable(f));
 
@@ -1851,7 +1913,7 @@  void qemu_savevm_state_begin(QEMUFile *f,
         }
         se->ops->set_params(params, se->opaque);
     }
-    
+
     qemu_put_be32(f, QEMU_VM_FILE_MAGIC);
     qemu_put_be32(f, QEMU_VM_FILE_VERSION);
 
@@ -2294,8 +2356,6 @@  int qemu_loadvm_state(QEMUFile *f)
         }
     }
 
-    cpu_synchronize_all_post_init();
-
     ret = 0;
 
 out:
@@ -2311,6 +2371,89 @@  out:
     return ret;
 }
 
+int qemu_loadvm_state_ft(QEMUFile *f)
+{
+    int ret = 0;
+    int i   = 0;
+    int j   = 0;
+    int done = 0;
+    uint64_t size = 0;
+    uint64_t count = 0;
+    uint8_t *pfb = NULL;
+    uint8_t *buf = NULL;
+
+    uint64_t max_mem = last_ram_offset() * 1.5;
+
+    if (!f->ops->get_prefetch_buffer) {
+        fprintf(stderr, "Fault tolerant is not supported by this protocol.\n");
+        return EINVAL;
+    }
+
+    size = PFB_SIZE;
+    pfb = g_malloc(size);
+
+    while (true) {
+        if (count + TARGET_PAGE_SIZE >= size) {
+            if (size*2 > max_mem) {
+                fprintf(stderr, "qemu_loadvm_state_ft: warning:" \
+                       "Prefetch buffer becomes too large.\n" \
+                       "Fault tolerant is unstable when you see this,\n" \
+                       "please increase the bandwidth or increase " \
+                       "the max down time.\n");
+                break;
+            }
+            size = size * 2;
+            buf = g_try_realloc(pfb, size);
+            if (!buf) {
+                error_report("qemu_loadvm_state_ft: out of memory.\n");
+                g_free(pfb);
+                return ENOMEM;
+            }
+
+            pfb = buf;
+        }
+
+        done = qemu_get_buffer(f, pfb + count, TARGET_PAGE_SIZE);
+
+        ret = qemu_file_get_error(f);
+        if (ret != 0) {
+            g_free(pfb);
+            return ret;
+        }
+
+        buf = pfb + count;
+        count += done;
+        for (i = 0; i < done; i++) {
+            if (buf[i] != 0xfe) {
+                continue;
+            }
+            if (buf[i-1] != 0xCa) {
+                continue;
+            }
+            if (buf[i-2] != 0xed) {
+                continue;
+            }
+            if (buf[i-3] == 0xFe) {
+                goto out;
+            }
+        }
+    }
+ out:
+    if (f->pfb) {
+        free(f->pfb);
+    }
+    f->pfb_size = count;
+    f->pfb_index = 0;
+    f->pfb = pfb;
+
+    ret = qemu_loadvm_state(f);
+
+    /* Skip magic number */
+    qemu_get_be32(f);
+
+    return ret;
+}
+
 static BlockDriverState *find_vmstate_bs(void)
 {
     BlockDriverState *bs = NULL;
@@ -2419,6 +2562,7 @@  void do_savevm(Monitor *mon, const QDict *qdict)
         goto the_end;
     }
     ret = qemu_savevm_state(f);
+    cpu_synchronize_all_post_init();
     vm_state_size = qemu_ftell(f);
     qemu_fclose(f);
     if (ret < 0) {