diff mbox

[07/18] Introduce fault tolerant VM transaction QEMUFile and ft_mode.

Message ID 1298468927-19193-8-git-send-email-tamura.yoshiaki@lab.ntt.co.jp
State New
Headers show

Commit Message

Yoshiaki Tamura Feb. 23, 2011, 1:48 p.m. UTC
This code implements VM transaction protocol.  Like buffered_file, it
sits between savevm and migration layer.  With this architecture, VM
transaction protocol is implemented mostly independent from other
existing code.

Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp>
---
 Makefile.objs   |    1 +
 ft_trans_file.c |  624 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 ft_trans_file.h |   72 +++++++
 migration.c     |    3 +
 trace-events    |   15 ++
 5 files changed, 715 insertions(+), 0 deletions(-)
 create mode 100644 ft_trans_file.c
 create mode 100644 ft_trans_file.h

Comments

Juan Quintela Feb. 23, 2011, 10:16 p.m. UTC | #1
Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> wrote:
> This code implements VM transaction protocol.  Like buffered_file, it
> sits between savevm and migration layer.  With this architecture, VM
> transaction protocol is implemented mostly independent from other
> existing code.

Could you explain what is the difference with buffered_file.c?
I am fixing problems on buffered_file, and having something that copies
lot of code from there makes me nervous.

> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, size_t size);

Can we get some sharing here?
typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);

There are not so much types for a write function that the 1st element is
one opaque :p

Later, Juan.
Yoshiaki Tamura Feb. 24, 2011, 3:54 a.m. UTC | #2
Juan Quintela wrote:
> Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp>  wrote:
>> This code implements VM transaction protocol.  Like buffered_file, it
>> sits between savevm and migration layer.  With this architecture, VM
>> transaction protocol is implemented mostly independent from other
>> existing code.
>
> Could you explain what is the difference with buffered_file.c?
> I am fixing problems on buffered_file, and having something that copies
> lot of code from there makes me nervous.

The objective is different:

buffered_file buffers data for transmission control.
ft_trans_file adds headers to the stream, and controls the transaction between 
sender and receiver.

Although ft_trans_file sometimes buffers date, but it's not the main objective.
If you're fixing the problems on buffered_file, I'll keep eyes on them.

>> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, size_t size);
>
> Can we get some sharing here?
> typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
>
> There are not so much types for a write function that the 1st element is
> one opaque :p

You're right, but I want to keep ft_trans_file independent of buffered_file at 
this point.  Once Kemari gets merged, I'm happy to work with you to fix the 
problems on buffered_file and ft_trans_file, and refactoring them.

Thanks,

Yoshi

>
> Later, Juan.
>
Juan Quintela Feb. 24, 2011, 9:30 a.m. UTC | #3
[ trimming cc to kvm & qemu lists]

Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> wrote:
> Juan Quintela wrote:
>> Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp>  wrote:
>>> This code implements VM transaction protocol.  Like buffered_file, it
>>> sits between savevm and migration layer.  With this architecture, VM
>>> transaction protocol is implemented mostly independent from other
>>> existing code.
>>
>> Could you explain what is the difference with buffered_file.c?
>> I am fixing problems on buffered_file, and having something that copies
>> lot of code from there makes me nervous.
>
> The objective is different:
>
> buffered_file buffers data for transmission control.
> ft_trans_file adds headers to the stream, and controls the transaction
> between sender and receiver.
>
> Although ft_trans_file sometimes buffers date, but it's not the main objective.
> If you're fixing the problems on buffered_file, I'll keep eyes on them.
>
>>> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, size_t size);
>>
>> Can we get some sharing here?
>> typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
>>
>> There are not so much types for a write function that the 1st element is
>> one opaque :p
>
> You're right, but I want to keep ft_trans_file independent of
> buffered_file at this point.  Once Kemari gets merged, I'm happy to
> work with you to fix the problems on buffered_file and ft_trans_file,
> and refactoring them.

My goal is getting its own thread for migration on 0.15, that
basically means that we can do rm buffered_file.c.  I guess that
something similar could happen for kemari.

But for now, this is just the start + handwaving, once I start doing the
work I will told you.

Later, Juan.
Yoshiaki Tamura Feb. 24, 2011, 9:44 a.m. UTC | #4
2011/2/24 Juan Quintela <quintela@redhat.com>:
>
> [ trimming cc to kvm & qemu lists]
>
> Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> wrote:
>> Juan Quintela wrote:
>>> Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp>  wrote:
>>>> This code implements VM transaction protocol.  Like buffered_file, it
>>>> sits between savevm and migration layer.  With this architecture, VM
>>>> transaction protocol is implemented mostly independent from other
>>>> existing code.
>>>
>>> Could you explain what is the difference with buffered_file.c?
>>> I am fixing problems on buffered_file, and having something that copies
>>> lot of code from there makes me nervous.
>>
>> The objective is different:
>>
>> buffered_file buffers data for transmission control.
>> ft_trans_file adds headers to the stream, and controls the transaction
>> between sender and receiver.
>>
>> Although ft_trans_file sometimes buffers date, but it's not the main objective.
>> If you're fixing the problems on buffered_file, I'll keep eyes on them.
>>
>>>> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, size_t size);
>>>
>>> Can we get some sharing here?
>>> typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
>>>
>>> There are not so much types for a write function that the 1st element is
>>> one opaque :p
>>
>> You're right, but I want to keep ft_trans_file independent of
>> buffered_file at this point.  Once Kemari gets merged, I'm happy to
>> work with you to fix the problems on buffered_file and ft_trans_file,
>> and refactoring them.
>
> My goal is getting its own thread for migration on 0.15, that
> basically means that we can do rm buffered_file.c.  I guess that
> something similar could happen for kemari.

That means both gets initiated by it's own thread, not like
current poll based.  I'm still skeptical whether Anthony agrees,
but I'll keep it in my mind.

> But for now, this is just the start + handwaving, once I start doing the
> work I will told you.

Yes, please.

Yoshi

>
> Later, Juan.
> --
> To unsubscribe from this list: send the line "unsubscribe kvm" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>
ya su March 9, 2011, 8:01 a.m. UTC | #5
Juan:

     It's especailly important for ft to be a standalone thread, as it
may cause monitor to  be blocked by network problems.  what's your
schedule, maybe I can help some.

Yoshi:

     in the following code:

+
+    s->file = qemu_fopen_ops(s, ft_trans_put_buffer, ft_trans_get_buffer,
+                             ft_trans_close, ft_trans_rate_limit,
+                             ft_trans_set_rate_limit, NULL);
+
+    return s->file;
+}

    I think you should register ft_trans_get_rate_limit function,
otherwise it will not transfer any block data at stage 2 in
block_save_live function:

    if (stage == 2) {
        /* control the rate of transfer */
        while ((block_mig_state.submitted +
                block_mig_state.read_done) * BLOCK_SIZE <
               qemu_file_get_rate_limit(f)) {

     qemu_file_get_rate_limit will return 0, then it will not proceed
to copy dirty block data.

     FYI.

Green.


2011/2/24 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>:
> 2011/2/24 Juan Quintela <quintela@redhat.com>:
>>
>> [ trimming cc to kvm & qemu lists]
>>
>> Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> wrote:
>>> Juan Quintela wrote:
>>>> Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp>  wrote:
>>>>> This code implements VM transaction protocol.  Like buffered_file, it
>>>>> sits between savevm and migration layer.  With this architecture, VM
>>>>> transaction protocol is implemented mostly independent from other
>>>>> existing code.
>>>>
>>>> Could you explain what is the difference with buffered_file.c?
>>>> I am fixing problems on buffered_file, and having something that copies
>>>> lot of code from there makes me nervous.
>>>
>>> The objective is different:
>>>
>>> buffered_file buffers data for transmission control.
>>> ft_trans_file adds headers to the stream, and controls the transaction
>>> between sender and receiver.
>>>
>>> Although ft_trans_file sometimes buffers date, but it's not the main objective.
>>> If you're fixing the problems on buffered_file, I'll keep eyes on them.
>>>
>>>>> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, size_t size);
>>>>
>>>> Can we get some sharing here?
>>>> typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
>>>>
>>>> There are not so much types for a write function that the 1st element is
>>>> one opaque :p
>>>
>>> You're right, but I want to keep ft_trans_file independent of
>>> buffered_file at this point.  Once Kemari gets merged, I'm happy to
>>> work with you to fix the problems on buffered_file and ft_trans_file,
>>> and refactoring them.
>>
>> My goal is getting its own thread for migration on 0.15, that
>> basically means that we can do rm buffered_file.c.  I guess that
>> something similar could happen for kemari.
>
> That means both gets initiated by it's own thread, not like
> current poll based.  I'm still skeptical whether Anthony agrees,
> but I'll keep it in my mind.
>
>> But for now, this is just the start + handwaving, once I start doing the
>> work I will told you.
>
> Yes, please.
>
> Yoshi
>
>>
>> Later, Juan.
>> --
>> To unsubscribe from this list: send the line "unsubscribe kvm" in
>> the body of a message to majordomo@vger.kernel.org
>> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>>
> --
> To unsubscribe from this list: send the line "unsubscribe kvm" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>
diff mbox

Patch

diff --git a/Makefile.objs b/Makefile.objs
index c144df1..8856160 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -100,6 +100,7 @@  common-obj-y += msmouse.o ps2.o
 common-obj-y += qdev.o qdev-properties.o
 common-obj-y += block-migration.o
 common-obj-y += pflib.o
+common-obj-y += ft_trans_file.o
 
 common-obj-$(CONFIG_BRLAPI) += baum.o
 common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o migration-fd.o
diff --git a/ft_trans_file.c b/ft_trans_file.c
new file mode 100644
index 0000000..2b42b95
--- /dev/null
+++ b/ft_trans_file.c
@@ -0,0 +1,624 @@ 
+/*
+ * Fault tolerant VM transaction QEMUFile
+ *
+ * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ * This source code is based on buffered_file.c.
+ * Copyright IBM, Corp. 2008
+ * Authors:
+ *  Anthony Liguori        <aliguori@us.ibm.com>
+ */
+
+#include "qemu-common.h"
+#include "qemu-error.h"
+#include "hw/hw.h"
+#include "qemu-timer.h"
+#include "sysemu.h"
+#include "qemu-char.h"
+#include "trace.h"
+#include "ft_trans_file.h"
+
+typedef struct FtTransHdr
+{
+    uint16_t cmd;
+    uint16_t id;
+    uint32_t seq;
+    uint32_t payload_len;
+} FtTransHdr;
+
+typedef struct QEMUFileFtTrans
+{
+    FtTransPutBufferFunc *put_buffer;
+    FtTransGetBufferFunc *get_buffer;
+    FtTransPutReadyFunc *put_ready;
+    FtTransGetReadyFunc *get_ready;
+    FtTransWaitForUnfreezeFunc *wait_for_unfreeze;
+    FtTransCloseFunc *close;
+    void *opaque;
+    QEMUFile *file;
+
+    enum QEMU_VM_TRANSACTION_STATE state;
+    uint32_t seq;
+    uint16_t id;
+
+    int has_error;
+
+    bool freeze_output;
+    bool freeze_input;
+    bool rate_limit;
+    bool is_sender;
+    bool is_payload;
+
+    uint8_t *buf;
+    size_t buf_max_size;
+    size_t put_offset;
+    size_t get_offset;
+
+    FtTransHdr header;
+    size_t header_offset;
+} QEMUFileFtTrans;
+
+#define IO_BUF_SIZE 32768
+
+static void ft_trans_append(QEMUFileFtTrans *s,
+                            const uint8_t *buf, size_t size)
+{
+    if (size > (s->buf_max_size - s->put_offset)) {
+        trace_ft_trans_realloc(s->buf_max_size, size + 1024);
+        s->buf_max_size += size + 1024;
+        s->buf = qemu_realloc(s->buf, s->buf_max_size);
+    }
+
+    trace_ft_trans_append(size);
+    memcpy(s->buf + s->put_offset, buf, size);
+    s->put_offset += size;
+}
+
+static void ft_trans_flush(QEMUFileFtTrans *s)
+{
+    size_t offset = 0;
+
+    if (s->has_error) {
+        error_report("flush when error %d, bailing", s->has_error);
+        return;
+    }
+
+    while (offset < s->put_offset) {
+        ssize_t ret;
+
+        ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset - offset);
+        if (ret == -EAGAIN) {
+            break;
+        }
+
+        if (ret <= 0) {
+            error_report("error flushing data, %s", strerror(errno));
+            s->has_error = FT_TRANS_ERR_FLUSH;
+            break;
+        } else {
+            offset += ret;
+        }
+    }
+
+    trace_ft_trans_flush(offset, s->put_offset);
+    memmove(s->buf, s->buf + offset, s->put_offset - offset);
+    s->put_offset -= offset;
+    s->freeze_output = !!s->put_offset;
+}
+
+static ssize_t ft_trans_put(void *opaque, void *buf, int size)
+{
+    QEMUFileFtTrans *s = opaque;
+    size_t offset = 0;
+    ssize_t len;
+
+    /* flush buffered data before putting next */
+    if (s->put_offset) {
+        ft_trans_flush(s);
+    }
+
+    while (!s->freeze_output && offset < size) {
+        len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - offset);
+
+        if (len == -EAGAIN) {
+            trace_ft_trans_freeze_output();
+            s->freeze_output = 1;
+            break;
+        }
+
+        if (len <= 0) {
+            error_report("putting data failed, %s", strerror(errno));
+            s->has_error = 1;
+            offset = -EINVAL;
+            break;
+        }
+
+        offset += len;
+    }
+
+    if (s->freeze_output) {
+        ft_trans_append(s, buf + offset, size - offset);
+        offset = size;
+    }
+
+    return offset;
+}
+
+static int ft_trans_send_header(QEMUFileFtTrans *s,
+                                enum QEMU_VM_TRANSACTION_STATE state,
+                                uint32_t payload_len)
+{
+    int ret;
+    FtTransHdr *hdr = &s->header;
+
+    trace_ft_trans_send_header(state);
+
+    hdr->cmd = s->state = state;
+    hdr->id = s->id;
+    hdr->seq = s->seq;
+    hdr->payload_len = payload_len;
+
+    ret = ft_trans_put(s, hdr, sizeof(*hdr));
+    if (ret < 0) {
+        error_report("send header failed");
+        s->has_error = FT_TRANS_ERR_SEND_HDR;
+    }
+
+    return ret;
+}
+
+static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
+{
+    QEMUFileFtTrans *s = opaque;
+    ssize_t ret;
+
+    trace_ft_trans_put_buffer(size, pos);
+
+    if (s->has_error) {
+        error_report("put_buffer when error %d, bailing", s->has_error);
+        return -EINVAL;
+    }
+
+    /* assuming qemu_file_put_notify() is calling */
+    if (pos == 0 && size == 0) {
+        trace_ft_trans_put_ready();
+        ft_trans_flush(s);
+
+        if (!s->freeze_output) {
+            trace_ft_trans_cb(s->put_ready);
+            ret = s->put_ready();
+        }
+
+        ret = 0;
+        goto out;
+    }
+
+    ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size);
+    if (ret < 0) {
+        goto out;
+    }
+
+    ret = ft_trans_put(s, (uint8_t *)buf, size);
+    if (ret < 0) {
+        error_report("send palyload failed");
+        s->has_error = FT_TRANS_ERR_SEND_PAYLOAD;
+        goto out;
+    }
+
+    s->seq++;
+
+out:
+    return ret;
+}
+
+static int ft_trans_fill_buffer(void *opaque, void *buf, int size)
+{
+    QEMUFileFtTrans *s = opaque;
+    size_t offset = 0;
+    ssize_t len;
+
+    s->freeze_input = 0;
+
+    while (offset < size) {
+        len = s->get_buffer(s->opaque, (uint8_t *)buf + offset,
+                            0, size - offset);
+        if (len == -EAGAIN) {
+            trace_ft_trans_freeze_input();
+            s->freeze_input = 1;
+            break;
+        }
+
+        if (len <= 0) {
+            error_report("fill buffer failed, %s", strerror(errno));
+            s->has_error = 1;
+            return -EINVAL;
+        }
+
+        offset += len;
+    }
+
+    return offset;
+}
+
+static int ft_trans_recv_header(QEMUFileFtTrans *s)
+{
+    int ret;
+    char *buf = (char *)&s->header + s->header_offset;
+
+    ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) - s->header_offset);
+    if (ret < 0) {
+        error_report("recv header failed");
+        s->has_error = FT_TRANS_ERR_RECV_HDR;
+        goto out;
+    }
+
+    s->header_offset += ret;
+    if (s->header_offset == sizeof(FtTransHdr)) {
+        trace_ft_trans_recv_header(s->header.cmd);
+        s->state = s->header.cmd;
+        s->header_offset = 0;
+
+        if (!s->is_sender) {
+            s->id = s->header.id;
+            s->seq = s->header.seq;
+        }
+    }
+
+out:
+    return ret;
+}
+
+static int ft_trans_recv_payload(QEMUFileFtTrans *s)
+{
+    QEMUFile *f = s->file;
+    int ret = -1;
+
+    /* extend QEMUFile buf if there weren't enough space */
+    if (s->header.payload_len > (s->buf_max_size - s->get_offset)) {
+        s->buf_max_size += (s->header.payload_len -
+                            (s->buf_max_size - s->get_offset));
+        s->buf = qemu_realloc_buffer(f, s->buf_max_size);
+    }
+
+    ret = ft_trans_fill_buffer(s, s->buf + s->get_offset,
+                               s->header.payload_len);
+    if (ret < 0) {
+        error_report("recv payload failed");
+        s->has_error = FT_TRANS_ERR_RECV_PAYLOAD;
+        goto out;
+    }
+
+    trace_ft_trans_recv_payload(ret, s->header.payload_len, s->get_offset);
+
+    s->header.payload_len -= ret;
+    s->get_offset += ret;
+    s->is_payload = !!s->header.payload_len;
+
+out:
+    return ret;
+}
+
+static int ft_trans_recv(QEMUFileFtTrans *s)
+{
+    int ret;
+
+    /* get payload and return */
+    if (s->is_payload) {
+        ret = ft_trans_recv_payload(s);
+        goto out;
+    }
+
+    ret = ft_trans_recv_header(s);
+    if (ret < 0 || s->freeze_input) {
+        goto out;
+    }
+
+    switch (s->state) {
+    case QEMU_VM_TRANSACTION_BEGIN:
+        /* CONTINUE or COMMIT should come shortly */
+        s->is_payload = 0;
+        break;
+
+    case QEMU_VM_TRANSACTION_CONTINUE:
+        /* get payload */
+        s->is_payload = 1;
+        break;
+
+    case QEMU_VM_TRANSACTION_COMMIT:
+        ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
+        if (ret < 0) {
+            goto out;
+        }
+
+        trace_ft_trans_cb(s->get_ready);
+        ret = s->get_ready(s->opaque);
+        if (ret < 0) {
+            goto out;
+        }
+
+        qemu_clear_buffer(s->file);
+        s->get_offset = 0;
+        s->is_payload = 0;
+
+        break;
+
+    case QEMU_VM_TRANSACTION_ATOMIC:
+        /* not implemented yet */
+        error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d",
+                ret);
+        break;
+
+    case QEMU_VM_TRANSACTION_CANCEL:
+        /* return -EINVAL until migrate cancel on recevier side is supported */
+        ret = -EINVAL;
+        break;
+
+    default:
+        error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret);
+        s->has_error = FT_TRANS_ERR_STATE_INVALID;
+        ret = -EINVAL;
+    }
+
+out:
+    return ret;
+}
+
+static int ft_trans_get_buffer(void *opaque, uint8_t *buf,
+                               int64_t pos, int size)
+{
+    QEMUFileFtTrans *s = opaque;
+    int ret;
+
+    if (s->has_error) {
+        error_report("get_buffer when error %d, bailing", s->has_error);
+        return -EINVAL;
+    }
+
+    /* assuming qemu_file_get_notify() is calling */
+    if (pos == 0 && size == 0) {
+        trace_ft_trans_get_ready();
+        s->freeze_input = 0;
+
+        /* sender should be waiting for ACK */
+        if (s->is_sender) {
+            ret = ft_trans_recv_header(s);
+            if (s->freeze_input) {
+                ret = 0;
+                goto out;
+            }
+            if (ret < 0) {
+                error_report("recv ack failed");
+                goto out;
+            }
+
+            if (s->state != QEMU_VM_TRANSACTION_ACK) {
+                error_report("recv invalid state %d", s->state);
+                s->has_error = FT_TRANS_ERR_STATE_INVALID;
+                ret = -EINVAL;
+                goto out;
+            }
+
+            trace_ft_trans_cb(s->get_ready);
+            ret = s->get_ready(s->opaque);
+            if (ret < 0) {
+                goto out;
+            }
+
+            /* proceed trans id */
+            s->id++;
+
+            return 0;
+        }
+
+        /* set QEMUFile buf at beginning */
+        if (!s->buf) {
+            s->buf = buf;
+        }
+
+        ret = ft_trans_recv(s);
+        goto out;
+    }
+
+    ret = s->get_offset;
+
+out:
+    return ret;
+}
+
+static int ft_trans_close(void *opaque)
+{
+    QEMUFileFtTrans *s = opaque;
+    int ret;
+
+    trace_ft_trans_close();
+    ret = s->close(s->opaque);
+    if (s->is_sender) {
+        qemu_free(s->buf);
+    }
+    qemu_free(s);
+
+    return ret;
+}
+
+static int ft_trans_rate_limit(void *opaque)
+{
+    QEMUFileFtTrans *s = opaque;
+
+    if (s->has_error) {
+        return 0;
+    }
+
+    if (s->rate_limit && s->freeze_output) {
+        return 1;
+    }
+
+    return 0;
+}
+
+static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate)
+{
+    QEMUFileFtTrans *s = opaque;
+
+    if (s->has_error) {
+        goto out;
+    }
+
+    s->rate_limit = !!new_rate;
+
+out:
+    return s->rate_limit;
+}
+
+int ft_trans_begin(void *opaque)
+{
+    QEMUFileFtTrans *s = opaque;
+    int ret;
+    s->seq = 0;
+
+    /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */
+    if (!s->is_sender) {
+        if (s->state != QEMU_VM_TRANSACTION_INIT) {
+            error_report("invalid state %d", s->state);
+            s->has_error = FT_TRANS_ERR_STATE_INVALID;
+            ret = -EINVAL;
+        }
+
+        ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
+        goto out;
+    }
+
+    /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */
+    if (s->state == QEMU_VM_TRANSACTION_INIT) {
+retry:
+        ret = ft_trans_recv_header(s);
+        if (s->freeze_input) {
+            goto retry;
+        }
+        if (ret < 0) {
+            error_report("recv ack failed");
+            goto out;
+        }
+
+        if (s->state != QEMU_VM_TRANSACTION_ACK) {
+            error_report("recv invalid state %d", s->state);
+            s->has_error = FT_TRANS_ERR_STATE_INVALID;
+            ret = -EINVAL;
+            goto out;
+        }
+    }
+
+    ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0);
+    if (ret < 0) {
+        goto out;
+    }
+
+    s->state = QEMU_VM_TRANSACTION_CONTINUE;
+
+out:
+    return ret;
+}
+
+int ft_trans_commit(void *opaque)
+{
+    QEMUFileFtTrans *s = opaque;
+    int ret;
+
+    if (!s->is_sender) {
+        ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
+        goto out;
+    }
+
+    /* sender should flush buf before sending COMMIT */
+    qemu_fflush(s->file);
+
+    ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0);
+    if (ret < 0) {
+        goto out;
+    }
+
+    while (!s->has_error && s->put_offset) {
+        ft_trans_flush(s);
+        if (s->freeze_output) {
+            s->wait_for_unfreeze(s);
+        }
+    }
+
+    if (s->has_error) {
+        ret = -EINVAL;
+        goto out;
+    }
+
+    ret = ft_trans_recv_header(s);
+    if (s->freeze_input) {
+        ret = -EAGAIN;
+        goto out;
+    }
+    if (ret < 0) {
+        error_report("recv ack failed");
+        goto out;
+    }
+
+    if (s->state != QEMU_VM_TRANSACTION_ACK) {
+        error_report("recv invalid state %d", s->state);
+        s->has_error = FT_TRANS_ERR_STATE_INVALID;
+        ret = -EINVAL;
+        goto out;
+    }
+
+    s->id++;
+    ret = 0;
+
+out:
+    return ret;
+}
+
+int ft_trans_cancel(void *opaque)
+{
+    QEMUFileFtTrans *s = opaque;
+
+    /* invalid until migrate cancel on recevier side is supported */
+    if (!s->is_sender) {
+        return -EINVAL;
+    }
+
+    return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0);
+}
+
+QEMUFile *qemu_fopen_ops_ft_trans(void *opaque,
+                                  FtTransPutBufferFunc *put_buffer,
+                                  FtTransGetBufferFunc *get_buffer,
+                                  FtTransPutReadyFunc *put_ready,
+                                  FtTransGetReadyFunc *get_ready,
+                                  FtTransWaitForUnfreezeFunc *wait_for_unfreeze,
+                                  FtTransCloseFunc *close,
+                                  bool is_sender)
+{
+    QEMUFileFtTrans *s;
+
+    s = qemu_mallocz(sizeof(*s));
+
+    s->opaque = opaque;
+    s->put_buffer = put_buffer;
+    s->get_buffer = get_buffer;
+    s->put_ready = put_ready;
+    s->get_ready = get_ready;
+    s->wait_for_unfreeze = wait_for_unfreeze;
+    s->close = close;
+    s->is_sender = is_sender;
+    s->id = 0;
+    s->seq = 0;
+    s->rate_limit = 1;
+
+    if (!s->is_sender) {
+        s->buf_max_size = IO_BUF_SIZE;
+    }
+
+    s->file = qemu_fopen_ops(s, ft_trans_put_buffer, ft_trans_get_buffer,
+                             ft_trans_close, ft_trans_rate_limit,
+                             ft_trans_set_rate_limit, NULL);
+
+    return s->file;
+}
diff --git a/ft_trans_file.h b/ft_trans_file.h
new file mode 100644
index 0000000..5ca6b53
--- /dev/null
+++ b/ft_trans_file.h
@@ -0,0 +1,72 @@ 
+/*
+ * Fault tolerant VM transaction QEMUFile
+ *
+ * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ * This source code is based on buffered_file.h.
+ * Copyright IBM, Corp. 2008
+ * Authors:
+ *  Anthony Liguori        <aliguori@us.ibm.com>
+ */
+
+#ifndef QEMU_FT_TRANSACTION_FILE_H
+#define QEMU_FT_TRANSACTION_FILE_H
+
+#include "hw/hw.h"
+
+enum QEMU_VM_TRANSACTION_STATE {
+    QEMU_VM_TRANSACTION_NACK = -1,
+    QEMU_VM_TRANSACTION_INIT,
+    QEMU_VM_TRANSACTION_BEGIN,
+    QEMU_VM_TRANSACTION_CONTINUE,
+    QEMU_VM_TRANSACTION_COMMIT,
+    QEMU_VM_TRANSACTION_CANCEL,
+    QEMU_VM_TRANSACTION_ATOMIC,
+    QEMU_VM_TRANSACTION_ACK,
+};
+
+enum FT_MODE {
+    FT_ERROR = -1,
+    FT_OFF,
+    FT_INIT,
+    FT_TRANSACTION_BEGIN,
+    FT_TRANSACTION_ITER,
+    FT_TRANSACTION_COMMIT,
+    FT_TRANSACTION_ATOMIC,
+    FT_TRANSACTION_RECV,
+};
+extern enum FT_MODE ft_mode;
+
+#define FT_TRANS_ERR_UNKNOWN       0x01 /* Unknown error */
+#define FT_TRANS_ERR_SEND_HDR      0x02 /* Send header failed */
+#define FT_TRANS_ERR_RECV_HDR      0x03 /* Recv header failed */
+#define FT_TRANS_ERR_SEND_PAYLOAD  0x04 /* Send payload failed */
+#define FT_TRANS_ERR_RECV_PAYLOAD  0x05 /* Recv payload failed */
+#define FT_TRANS_ERR_FLUSH         0x06 /* Flush buffered data failed */
+#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */
+
+typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, size_t size);
+typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t pos, size_t size);
+typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec *iov, int iovcnt);
+typedef int (FtTransPutReadyFunc)(void);
+typedef int (FtTransGetReadyFunc)(void *opaque);
+typedef void (FtTransWaitForUnfreezeFunc)(void *opaque);
+typedef int (FtTransCloseFunc)(void *opaque);
+
+int ft_trans_begin(void *opaque);
+int ft_trans_commit(void *opaque);
+int ft_trans_cancel(void *opaque);
+
+QEMUFile *qemu_fopen_ops_ft_trans(void *opaque,
+                                  FtTransPutBufferFunc *put_buffer,
+                                  FtTransGetBufferFunc *get_buffer,
+                                  FtTransPutReadyFunc *put_ready,
+                                  FtTransGetReadyFunc *get_ready,
+                                  FtTransWaitForUnfreezeFunc *wait_for_unfreeze,
+                                  FtTransCloseFunc *close,
+                                  bool is_sender);
+
+#endif
diff --git a/migration.c b/migration.c
index bd51fef..3be3554 100644
--- a/migration.c
+++ b/migration.c
@@ -15,6 +15,7 @@ 
 #include "migration.h"
 #include "monitor.h"
 #include "buffered_file.h"
+#include "ft_trans_file.h"
 #include "sysemu.h"
 #include "block.h"
 #include "qemu_socket.h"
@@ -31,6 +32,8 @@ 
     do { } while (0)
 #endif
 
+enum FT_MODE ft_mode = FT_OFF;
+
 /* Migration speed throttling */
 static int64_t max_throttle = (32 << 20);
 
diff --git a/trace-events b/trace-events
index e6138ea..50ac840 100644
--- a/trace-events
+++ b/trace-events
@@ -254,3 +254,18 @@  disable spice_vmc_write(ssize_t out, int len) "spice wrottn %lu of requested %zd
 disable spice_vmc_read(int bytes, int len) "spice read %lu of requested %zd"
 disable spice_vmc_register_interface(void *scd) "spice vmc registered interface %p"
 disable spice_vmc_unregister_interface(void *scd) "spice vmc unregistered interface %p"
+
+# ft_trans_file.c
+disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing buffer from %zu by %zu"
+disable ft_trans_append(size_t size) "buffering %zu bytes"
+disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu bytes"
+disable ft_trans_send_header(uint16_t cmd) "send header %d"
+disable ft_trans_recv_header(uint16_t cmd) "recv header %d"
+disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d bytes at %"PRId64""
+disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total) "recv %d of %d total %d"
+disable ft_trans_close(void) "closing"
+disable ft_trans_freeze_output(void) "backend not ready, freezing output"
+disable ft_trans_freeze_input(void) "backend not ready, freezing input"
+disable ft_trans_put_ready(void) "file is ready to put"
+disable ft_trans_get_ready(void) "file is ready to get"
+disable ft_trans_cb(void *cb) "callback %p"