Patchwork [RFC,12/20] Introduce fault tolerant VM transaction QEMUFile and ft_mode.

login
register
mail settings
Submitter Yoshiaki Tamura
Date April 21, 2010, 5:57 a.m.
Message ID <1271829445-5328-13-git-send-email-tamura.yoshiaki@lab.ntt.co.jp>
Download mbox | patch
Permalink /patch/50620/
State New
Headers show

Comments

Yoshiaki Tamura - April 21, 2010, 5:57 a.m.
This code implements VM transaction protocol.  Like buffered_file, it
sits between savevm and migration layer.  With this architecture, VM
transaction protocol is implemented mostly independent from other
existing code.

Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp>
---
 Makefile.objs    |    1 +
 ft_transaction.c |  423 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 ft_transaction.h |   57 ++++++++
 migration.c      |    3 +
 4 files changed, 484 insertions(+), 0 deletions(-)
 create mode 100644 ft_transaction.c
 create mode 100644 ft_transaction.h

Patch

diff --git a/Makefile.objs b/Makefile.objs
index b73e2cb..4388fb3 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -78,6 +78,7 @@  common-obj-y += qemu-char.o savevm.o #aio.o
 common-obj-y += msmouse.o ps2.o
 common-obj-y += qdev.o qdev-properties.o
 common-obj-y += qemu-config.o block-migration.o
+common-obj-y += ft_transaction.o
 
 common-obj-$(CONFIG_BRLAPI) += baum.o
 common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o migration-fd.o
diff --git a/ft_transaction.c b/ft_transaction.c
new file mode 100644
index 0000000..d0cbc99
--- /dev/null
+++ b/ft_transaction.c
@@ -0,0 +1,423 @@ 
+/*
+ * Fault tolerant VM transaction QEMUFile
+ *
+ * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. 
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ * This source code is based on buffered_file.c.
+ * Copyright IBM, Corp. 2008
+ * Authors:
+ *  Anthony Liguori        <aliguori@us.ibm.com>
+ */
+
+#include "qemu-common.h"
+#include "hw/hw.h"
+#include "qemu-timer.h"
+#include "sysemu.h"
+#include "qemu-char.h"
+#include "ft_transaction.h"
+
+// #define DEBUG_FT_TRANSACTION
+
+typedef struct QEMUFileFtTranx
+{
+    FtTranxPutBufferFunc *put_buffer;
+    FtTranxPutVectorFunc *put_vector;
+    FtTranxGetBufferFunc *get_buffer;
+    FtTranxGetVectorFunc *get_vector;
+    FtTranxCloseFunc *close;
+    void *opaque;
+    QEMUFile *file;
+    int has_error;
+    int is_sender;
+    int buf_max_size;
+    enum QEMU_VM_TRANSACTION_STATE tranx_state;
+    uint16_t tranx_id;
+    uint32_t seq;
+} QEMUFileFtTranx;
+
+#define IO_BUF_SIZE 32768
+
+#ifdef DEBUG_FT_TRANSACTION
+#define dprintf(fmt, ...) \
+    do { printf("ft_transaction: " fmt, ## __VA_ARGS__); } while (0)
+#else
+#define dprintf(fmt, ...) \
+    do { } while (0)
+#endif
+
+static ssize_t ft_tranx_flush_buffer(void *opaque, void *buf, int size)
+{
+    QEMUFileFtTranx *s = opaque;
+    size_t offset = 0;
+    ssize_t len;
+
+    while (offset < size) {
+        len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - offset);
+
+        if (len <= 0) {
+            fprintf(stderr, "ft transaction flush buffer failed \n");
+            s->has_error = 1;
+            offset = -EINVAL;
+            break;
+        }
+
+        offset += len;
+    }
+
+    return offset;
+}
+
+static int ft_tranx_send_header(QEMUFileFtTranx *s)
+{
+    int ret = -1;
+
+    dprintf("send header %d\n", s->tranx_state);
+
+    ret = ft_tranx_flush_buffer(s, &s->tranx_state, sizeof(uint16_t));
+    if (ret < 0) {
+        goto out;
+    }
+    ret = ft_tranx_flush_buffer(s, &s->tranx_id, sizeof(uint16_t));    
+
+out:
+    return ret;
+}
+
+static int ft_tranx_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
+{
+    QEMUFileFtTranx *s = opaque;
+    ssize_t ret = -1;
+
+    if (s->has_error) {
+        fprintf(stderr, "flush when error, bailing\n");
+        return -EINVAL;
+    }
+
+    ret = ft_tranx_send_header(s);
+    if (ret < 0) {
+        goto out;
+    }
+
+    ret = ft_tranx_flush_buffer(s, &s->seq, sizeof(s->seq));
+    if (ret < 0) {
+        goto out;
+    }
+    s->seq++;
+
+    ret = ft_tranx_flush_buffer(s, &size, sizeof(uint32_t));
+    if (ret < 0) {
+        goto out;
+    }
+
+    ret = ft_tranx_flush_buffer(s, (uint8_t *)buf, size);
+
+out:
+    return ret;
+}
+
+static int ft_tranx_put_vector(void *opaque, struct iovec *vector, int64_t pos, int count)
+{
+    QEMUFileFtTranx *s = opaque;
+    ssize_t ret = -1;
+    int i;
+    uint32_t size = 0;
+
+    dprintf("putting %d vectors at %" PRId64 "\n", count, pos);
+
+    if (s->has_error) {
+        dprintf("put vector when error, bailing\n");
+        return -EINVAL;
+    }
+
+    ret = ft_tranx_send_header(s);
+    if (ret < 0) {
+        return ret;
+    }
+
+    ret = ft_tranx_flush_buffer(s, &s->seq, sizeof(s->seq));
+    if (ret < 0) {
+        return ret;
+    }
+    s->seq++;
+
+    for (i = 0; i < count; i++)
+        size += vector[i].iov_len;
+
+    ret = ft_tranx_flush_buffer(s, &size, sizeof(uint32_t));
+    if (ret < 0) {
+        return ret;
+    }
+
+    while (count > 0) {
+        /* 
+         * It will continue calling put_vector even if count > IOV_MAX.
+         */
+        ret = s->put_vector(s->opaque, vector,
+            ((count>IOV_MAX)?IOV_MAX:count));
+
+        if (ret <= 0) {
+            fprintf(stderr, "ft transaction putting vector\n");
+            s->has_error = 1;
+            return ret;
+        }
+
+        for (i = 0; i < count; i++) {
+            /* ret represents -(length of remaining data). */
+            ret -= vector[i].iov_len;
+            if (ret < 0) {
+                vector[i].iov_base += (ret + vector[i].iov_len);
+                vector[i].iov_len = -ret;
+                vector = &vector[i];
+                break;
+            }
+        }
+        count -= i;
+    }
+
+    return 0;
+}
+
+static inline int ft_tranx_fill_buffer(void *opaque, void *buf, int size)
+{
+    QEMUFileFtTranx *s = opaque;
+    size_t offset = 0;
+    ssize_t len;
+
+    while (ft_mode != FT_ERROR && offset < size) {
+        len = s->get_buffer(s->opaque, (uint8_t *)buf + offset,
+                            0, size - offset);
+        if (len <= 0) {
+            fprintf(stderr, "ft_tranx fill buffer failed\n");
+            s->has_error = 1;
+            return -EINVAL;
+        }
+        offset += len;
+    }
+    return 0;
+}
+
+/* return QEMU_VM_TRANSACTION type */
+static int ft_tranx_get_next(QEMUFileFtTranx *s)
+{
+    uint16_t header;
+    uint16_t tranx_id;
+
+    if ((ft_tranx_fill_buffer(s, &header, sizeof(header)) < 0) ||
+        (ft_tranx_fill_buffer(s, &tranx_id, sizeof(tranx_id)) < 0)) {
+        return QEMU_VM_TRANSACTION_CANCEL;
+    }
+
+    s->tranx_id = tranx_id;
+
+    return header;
+}
+
+static int ft_tranx_get_buffer(void *opaque, uint8_t *buf,
+                               int64_t pos, int size)
+{
+    QEMUFileFtTranx *s = opaque;
+    QEMUFile *f = s->file;
+    uint32_t payload_len;
+    int ret = -1, offset;
+
+    /* get transaction header*/
+    ret = ft_tranx_get_next(s);
+    switch (ret) {
+    case QEMU_VM_TRANSACTION_BEGIN:
+        for (offset = 0;;) {
+            ret = ft_tranx_get_next(s);
+            /* CONTINUE or COMMIT must come afer BEGIN */
+            if ((ret != QEMU_VM_TRANSACTION_CONTINUE) &&
+                (ret != QEMU_VM_TRANSACTION_COMMIT)) {
+                goto error_out;
+            }
+
+            if (ft_tranx_fill_buffer(s, &s->seq, sizeof(s->seq)) < 0) {
+                goto error_out;
+            }
+
+            if (ret == QEMU_VM_TRANSACTION_COMMIT) {
+                ret = offset;
+                dprintf("QEMU_VM_TRANSACTION_COMMIT %d\n", offset);
+                break;
+            }
+
+            if (ft_tranx_fill_buffer(s, &payload_len,
+                                     sizeof(payload_len)) < 0) {
+                goto error_out;
+            }
+  
+            /* Extend QEMUFile buf if there weren't enough space. */
+            if (payload_len > (s->buf_max_size - offset)) {
+                s->buf_max_size += (payload_len - (s->buf_max_size - offset));
+                buf = qemu_realloc_buffer(f, s->buf_max_size);
+            }
+
+            if (ft_tranx_fill_buffer(s, buf + offset, payload_len) < 0) {
+                goto error_out;
+            }
+            offset += payload_len;
+        }
+
+        s->tranx_state = QEMU_VM_TRANSACTION_ACK;
+        if (ft_tranx_send_header(s) < 0) {
+            goto error_out;
+        }
+        goto out;
+
+    case QEMU_VM_TRANSACTION_ATOMIC:
+        /* not implemented yet */
+        fprintf(stderr, "QEMU_VM_TRANSACTION_ATOMIC not implemented. %d\n",
+                ret);
+        goto error_out;
+
+    case QEMU_VM_TRANSACTION_CANCEL:
+        dprintf("ft transaction canceled %d\n", ret);
+        ret = -1;
+        ft_mode = FT_OFF;
+        goto out;
+
+    default:
+        fprintf(stderr, "unknown QEMU_VM_TRANSACTION_STATE %d\n", ret);
+    }
+
+error_out:
+    ret = -1;
+    ft_mode = FT_ERROR;
+out:
+    return ret;
+}
+
+static int ft_tranx_close(void *opaque)
+{
+    QEMUFileFtTranx *s = opaque;
+    int ret = -1;
+
+    dprintf("closing\n");
+    ret = s->close(s->opaque);
+    qemu_free(s);
+
+    return ret;
+}
+
+int qemu_ft_tranx_begin(void *opaque)
+{
+    QEMUFileFtTranx *s = opaque;
+    int ret = -1;
+    s->seq = 0;
+
+    if (!s->is_sender && s->tranx_state == QEMU_VM_TRANSACTION_INIT) {
+        /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction.  */
+        s->tranx_state = QEMU_VM_TRANSACTION_ACK;
+        ret = ft_tranx_send_header(s);
+        goto out;
+    } 
+
+    if (s->is_sender) {
+        if (s->tranx_state == QEMU_VM_TRANSACTION_INIT) {
+            ret = ft_tranx_get_next(s);
+            if (ret != QEMU_VM_TRANSACTION_ACK) {
+                fprintf(stderr, "ft_transaction receiving ack failed\n");
+                ret = -1;
+                goto out;
+            }
+        }
+
+        s->tranx_state = QEMU_VM_TRANSACTION_BEGIN;
+        if ((ret = ft_tranx_send_header(s)) < 0) {
+            goto out;
+        }
+
+        s->tranx_state = QEMU_VM_TRANSACTION_CONTINUE;
+        ret = 0;
+    }
+
+out:
+    return ret;
+}
+
+int qemu_ft_tranx_commit(void *opaque)
+{
+    QEMUFileFtTranx *s = opaque;
+    int ret = -1;
+
+    if (!s->is_sender) {
+        s->tranx_state = QEMU_VM_TRANSACTION_ACK;
+        ret = ft_tranx_send_header(s);
+    } else {
+        /* flush buf before sending COMMIT */
+        qemu_fflush(s->file);
+
+        s->tranx_state = QEMU_VM_TRANSACTION_COMMIT;
+        ret = ft_tranx_send_header(s);
+        if (ret < 0) {
+            return ret;
+        }
+
+        ret = ft_tranx_flush_buffer(s, &s->seq, sizeof(s->seq));
+        if (ret < 0) {
+            return ret;
+        }
+
+        /* FIX ME: can we remove this if statement? */
+        if (ret >= 0) {
+            ret = ft_tranx_get_next(s);
+            if (ret != QEMU_VM_TRANSACTION_ACK) {
+                fprintf(stderr, "ft_transaction receiving ack failed\n");
+                return -1;
+            }
+        }
+
+        s->tranx_id++;
+    }
+
+    return ret;
+}
+
+int qemu_ft_tranx_cancel(void *opaque)
+{
+    QEMUFileFtTranx *s = opaque;
+    int ret = -1;
+
+    if (s->is_sender) {
+        s->tranx_state = QEMU_VM_TRANSACTION_CANCEL;
+        if ((ret = ft_tranx_send_header(s)) < 0) {
+            fprintf(stderr, "ft cancel failed\n");
+        }
+    }
+    
+    return ret;
+}
+
+QEMUFile *qemu_fopen_ops_ft_tranx(void *opaque,
+                                  FtTranxPutBufferFunc *put_buffer,
+                                  FtTranxPutVectorFunc *put_vector,
+                                  FtTranxGetBufferFunc *get_buffer,
+                                  FtTranxGetVectorFunc *get_vector,
+                                  FtTranxCloseFunc *close,
+                                  int is_sender)
+{
+    QEMUFileFtTranx *s;
+
+    s = qemu_mallocz(sizeof(*s));
+
+    s->opaque = opaque;
+    s->put_buffer = put_buffer;
+    s->put_vector = put_vector;
+    s->get_buffer = get_buffer;
+    s->get_vector = get_vector;
+    s->close = close;
+    s->buf_max_size = IO_BUF_SIZE;
+    s->is_sender = is_sender;
+    s->tranx_id = 0;
+    s->seq = 0;
+
+    s->file = qemu_fopen_ops(s, ft_tranx_put_buffer, ft_tranx_put_vector,
+                             ft_tranx_get_buffer, NULL, ft_tranx_close,
+                             NULL, NULL, NULL);
+
+    return s->file;
+}
diff --git a/ft_transaction.h b/ft_transaction.h
new file mode 100644
index 0000000..3f7cbd2
--- /dev/null
+++ b/ft_transaction.h
@@ -0,0 +1,57 @@ 
+/*
+ * Fault tolerant VM transaction QEMUFile
+ *
+ * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. 
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ * This source code is based on buffered_file.h.
+ * Copyright IBM, Corp. 2008
+ * Authors:
+ *  Anthony Liguori        <aliguori@us.ibm.com>
+ */
+
+#ifndef QEMU_FT_TRANSACTION_FILE_H
+#define QEMU_FT_TRANSACTION_FILE_H
+
+#include "hw/hw.h"
+
+enum QEMU_VM_TRANSACTION_STATE {
+    QEMU_VM_TRANSACTION_INIT,
+    QEMU_VM_TRANSACTION_BEGIN,
+    QEMU_VM_TRANSACTION_CONTINUE,
+    QEMU_VM_TRANSACTION_COMMIT,
+    QEMU_VM_TRANSACTION_CANCEL,
+    QEMU_VM_TRANSACTION_ATOMIC,
+    QEMU_VM_TRANSACTION_ACK,
+    QEMU_VM_TRANSACTION_NACK,
+};
+
+enum FT_MODE {
+    FT_OFF,
+    FT_INIT,
+    FT_TRANSACTION,
+    FT_ERROR,
+};
+extern enum FT_MODE ft_mode;
+
+typedef ssize_t (FtTranxPutBufferFunc)(void *opaque, const void *data, size_t size);
+typedef ssize_t (FtTranxPutVectorFunc)(void *opaque, const struct iovec *iov, int iovcnt);
+typedef QEMUFileGetBufferFunc FtTranxGetBufferFunc;
+typedef QEMUFileGetVectorFunc FtTranxGetVectorFunc;
+typedef int (FtTranxCloseFunc)(void *opaque);
+
+int qemu_ft_tranx_begin(void *opaque);
+int qemu_ft_tranx_commit(void *opaque);
+int qemu_ft_tranx_cancel(void *opaque);
+
+QEMUFile *qemu_fopen_ops_ft_tranx(void *opaque, 
+                                  FtTranxPutBufferFunc *put_buffer,
+                                  FtTranxPutVectorFunc *put_vector,
+                                  FtTranxGetBufferFunc *get_buffer,
+                                  FtTranxGetVectorFunc *get_vector,
+                                  FtTranxCloseFunc *close,
+                                  int is_sender);
+
+#endif
diff --git a/migration.c b/migration.c
index 5d238f5..4eed0b7 100644
--- a/migration.c
+++ b/migration.c
@@ -15,6 +15,7 @@ 
 #include "migration.h"
 #include "monitor.h"
 #include "buffered_file.h"
+#include "ft_transaction.h"
 #include "sysemu.h"
 #include "block.h"
 #include "qemu_socket.h"
@@ -31,6 +32,8 @@ 
     do { } while (0)
 #endif
 
+enum FT_MODE ft_mode = FT_OFF;
+
 /* Migration speed throttling */
 static uint32_t max_throttle = (32 << 20);