| Submitter | Yoshiaki Tamura |
|---|---|
| Date | Feb. 10, 2011, 9:30 a.m. |
| Message ID | <1297330258-20494-8-git-send-email-tamura.yoshiaki@lab.ntt.co.jp> |
| Download | mbox | patch |
| Permalink | /patch/82587/ |
| State | New |
| Headers | show |
Comments
Yoshiaki:
I have one question about ram_save_live, during migration 3
stage(completation stage), it will call
cpu_physical_memory_set_dirty_tracking(0) to stop recording ram dirty pages.
at the end of migrate_ft_trans_connect function, it will invoke vm_start(),
at this time, cpu_physical_memory_set_dirty_tracking(1) is not called yet,
so there may have some ram pages not recorded when qemu_savevm_trans_begin
is called. I think you need calll
cpu_physical_memory_set_dirty_tracking(1) in migrate_ft_trans_connect
function, Am I right?
BR
Green.
2011/2/10 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
> This code implements VM transaction protocol. Like buffered_file, it
> sits between savevm and migration layer. With this architecture, VM
> transaction protocol is implemented mostly independent from other
> existing code.
>
> Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp>
> Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp>
> ---
> Makefile.objs | 1 +
> ft_trans_file.c | 624
> +++++++++++++++++++++++++++++++++++++++++++++++++++++++
> ft_trans_file.h | 72 +++++++
> migration.c | 3 +
> trace-events | 15 ++
> 5 files changed, 715 insertions(+), 0 deletions(-)
> create mode 100644 ft_trans_file.c
> create mode 100644 ft_trans_file.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 353b1a8..04148b5 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o
> common-obj-y += qdev.o qdev-properties.o
> common-obj-y += block-migration.o
> common-obj-y += pflib.o
> +common-obj-y += ft_trans_file.o
>
> common-obj-$(CONFIG_BRLAPI) += baum.o
> common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o
> migration-fd.o
> diff --git a/ft_trans_file.c b/ft_trans_file.c
> new file mode 100644
> index 0000000..2b42b95
> --- /dev/null
> +++ b/ft_trans_file.c
> @@ -0,0 +1,624 @@
> +/*
> + * Fault tolerant VM transaction QEMUFile
> + *
> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2. See
> + * the COPYING file in the top-level directory.
> + *
> + * This source code is based on buffered_file.c.
> + * Copyright IBM, Corp. 2008
> + * Authors:
> + * Anthony Liguori <aliguori@us.ibm.com>
> + */
> +
> +#include "qemu-common.h"
> +#include "qemu-error.h"
> +#include "hw/hw.h"
> +#include "qemu-timer.h"
> +#include "sysemu.h"
> +#include "qemu-char.h"
> +#include "trace.h"
> +#include "ft_trans_file.h"
> +
> +typedef struct FtTransHdr
> +{
> + uint16_t cmd;
> + uint16_t id;
> + uint32_t seq;
> + uint32_t payload_len;
> +} FtTransHdr;
> +
> +typedef struct QEMUFileFtTrans
> +{
> + FtTransPutBufferFunc *put_buffer;
> + FtTransGetBufferFunc *get_buffer;
> + FtTransPutReadyFunc *put_ready;
> + FtTransGetReadyFunc *get_ready;
> + FtTransWaitForUnfreezeFunc *wait_for_unfreeze;
> + FtTransCloseFunc *close;
> + void *opaque;
> + QEMUFile *file;
> +
> + enum QEMU_VM_TRANSACTION_STATE state;
> + uint32_t seq;
> + uint16_t id;
> +
> + int has_error;
> +
> + bool freeze_output;
> + bool freeze_input;
> + bool rate_limit;
> + bool is_sender;
> + bool is_payload;
> +
> + uint8_t *buf;
> + size_t buf_max_size;
> + size_t put_offset;
> + size_t get_offset;
> +
> + FtTransHdr header;
> + size_t header_offset;
> +} QEMUFileFtTrans;
> +
> +#define IO_BUF_SIZE 32768
> +
> +static void ft_trans_append(QEMUFileFtTrans *s,
> + const uint8_t *buf, size_t size)
> +{
> + if (size > (s->buf_max_size - s->put_offset)) {
> + trace_ft_trans_realloc(s->buf_max_size, size + 1024);
> + s->buf_max_size += size + 1024;
> + s->buf = qemu_realloc(s->buf, s->buf_max_size);
> + }
> +
> + trace_ft_trans_append(size);
> + memcpy(s->buf + s->put_offset, buf, size);
> + s->put_offset += size;
> +}
> +
> +static void ft_trans_flush(QEMUFileFtTrans *s)
> +{
> + size_t offset = 0;
> +
> + if (s->has_error) {
> + error_report("flush when error %d, bailing", s->has_error);
> + return;
> + }
> +
> + while (offset < s->put_offset) {
> + ssize_t ret;
> +
> + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset -
> offset);
> + if (ret == -EAGAIN) {
> + break;
> + }
> +
> + if (ret <= 0) {
> + error_report("error flushing data, %s", strerror(errno));
> + s->has_error = FT_TRANS_ERR_FLUSH;
> + break;
> + } else {
> + offset += ret;
> + }
> + }
> +
> + trace_ft_trans_flush(offset, s->put_offset);
> + memmove(s->buf, s->buf + offset, s->put_offset - offset);
> + s->put_offset -= offset;
> + s->freeze_output = !!s->put_offset;
> +}
> +
> +static ssize_t ft_trans_put(void *opaque, void *buf, int size)
> +{
> + QEMUFileFtTrans *s = opaque;
> + size_t offset = 0;
> + ssize_t len;
> +
> + /* flush buffered data before putting next */
> + if (s->put_offset) {
> + ft_trans_flush(s);
> + }
> +
> + while (!s->freeze_output && offset < size) {
> + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size -
> offset);
> +
> + if (len == -EAGAIN) {
> + trace_ft_trans_freeze_output();
> + s->freeze_output = 1;
> + break;
> + }
> +
> + if (len <= 0) {
> + error_report("putting data failed, %s", strerror(errno));
> + s->has_error = 1;
> + offset = -EINVAL;
> + break;
> + }
> +
> + offset += len;
> + }
> +
> + if (s->freeze_output) {
> + ft_trans_append(s, buf + offset, size - offset);
> + offset = size;
> + }
> +
> + return offset;
> +}
> +
> +static int ft_trans_send_header(QEMUFileFtTrans *s,
> + enum QEMU_VM_TRANSACTION_STATE state,
> + uint32_t payload_len)
> +{
> + int ret;
> + FtTransHdr *hdr = &s->header;
> +
> + trace_ft_trans_send_header(state);
> +
> + hdr->cmd = s->state = state;
> + hdr->id = s->id;
> + hdr->seq = s->seq;
> + hdr->payload_len = payload_len;
> +
> + ret = ft_trans_put(s, hdr, sizeof(*hdr));
> + if (ret < 0) {
> + error_report("send header failed");
> + s->has_error = FT_TRANS_ERR_SEND_HDR;
> + }
> +
> + return ret;
> +}
> +
> +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, int64_t
> pos, int size)
> +{
> + QEMUFileFtTrans *s = opaque;
> + ssize_t ret;
> +
> + trace_ft_trans_put_buffer(size, pos);
> +
> + if (s->has_error) {
> + error_report("put_buffer when error %d, bailing", s->has_error);
> + return -EINVAL;
> + }
> +
> + /* assuming qemu_file_put_notify() is calling */
> + if (pos == 0 && size == 0) {
> + trace_ft_trans_put_ready();
> + ft_trans_flush(s);
> +
> + if (!s->freeze_output) {
> + trace_ft_trans_cb(s->put_ready);
> + ret = s->put_ready();
> + }
> +
> + ret = 0;
> + goto out;
> + }
> +
> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size);
> + if (ret < 0) {
> + goto out;
> + }
> +
> + ret = ft_trans_put(s, (uint8_t *)buf, size);
> + if (ret < 0) {
> + error_report("send palyload failed");
> + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD;
> + goto out;
> + }
> +
> + s->seq++;
> +
> +out:
> + return ret;
> +}
> +
> +static int ft_trans_fill_buffer(void *opaque, void *buf, int size)
> +{
> + QEMUFileFtTrans *s = opaque;
> + size_t offset = 0;
> + ssize_t len;
> +
> + s->freeze_input = 0;
> +
> + while (offset < size) {
> + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset,
> + 0, size - offset);
> + if (len == -EAGAIN) {
> + trace_ft_trans_freeze_input();
> + s->freeze_input = 1;
> + break;
> + }
> +
> + if (len <= 0) {
> + error_report("fill buffer failed, %s", strerror(errno));
> + s->has_error = 1;
> + return -EINVAL;
> + }
> +
> + offset += len;
> + }
> +
> + return offset;
> +}
> +
> +static int ft_trans_recv_header(QEMUFileFtTrans *s)
> +{
> + int ret;
> + char *buf = (char *)&s->header + s->header_offset;
> +
> + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) -
> s->header_offset);
> + if (ret < 0) {
> + error_report("recv header failed");
> + s->has_error = FT_TRANS_ERR_RECV_HDR;
> + goto out;
> + }
> +
> + s->header_offset += ret;
> + if (s->header_offset == sizeof(FtTransHdr)) {
> + trace_ft_trans_recv_header(s->header.cmd);
> + s->state = s->header.cmd;
> + s->header_offset = 0;
> +
> + if (!s->is_sender) {
> + s->id = s->header.id;
> + s->seq = s->header.seq;
> + }
> + }
> +
> +out:
> + return ret;
> +}
> +
> +static int ft_trans_recv_payload(QEMUFileFtTrans *s)
> +{
> + QEMUFile *f = s->file;
> + int ret = -1;
> +
> + /* extend QEMUFile buf if there weren't enough space */
> + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) {
> + s->buf_max_size += (s->header.payload_len -
> + (s->buf_max_size - s->get_offset));
> + s->buf = qemu_realloc_buffer(f, s->buf_max_size);
> + }
> +
> + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset,
> + s->header.payload_len);
> + if (ret < 0) {
> + error_report("recv payload failed");
> + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD;
> + goto out;
> + }
> +
> + trace_ft_trans_recv_payload(ret, s->header.payload_len,
> s->get_offset);
> +
> + s->header.payload_len -= ret;
> + s->get_offset += ret;
> + s->is_payload = !!s->header.payload_len;
> +
> +out:
> + return ret;
> +}
> +
> +static int ft_trans_recv(QEMUFileFtTrans *s)
> +{
> + int ret;
> +
> + /* get payload and return */
> + if (s->is_payload) {
> + ret = ft_trans_recv_payload(s);
> + goto out;
> + }
> +
> + ret = ft_trans_recv_header(s);
> + if (ret < 0 || s->freeze_input) {
> + goto out;
> + }
> +
> + switch (s->state) {
> + case QEMU_VM_TRANSACTION_BEGIN:
> + /* CONTINUE or COMMIT should come shortly */
> + s->is_payload = 0;
> + break;
> +
> + case QEMU_VM_TRANSACTION_CONTINUE:
> + /* get payload */
> + s->is_payload = 1;
> + break;
> +
> + case QEMU_VM_TRANSACTION_COMMIT:
> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
> + if (ret < 0) {
> + goto out;
> + }
> +
> + trace_ft_trans_cb(s->get_ready);
> + ret = s->get_ready(s->opaque);
> + if (ret < 0) {
> + goto out;
> + }
> +
> + qemu_clear_buffer(s->file);
> + s->get_offset = 0;
> + s->is_payload = 0;
> +
> + break;
> +
> + case QEMU_VM_TRANSACTION_ATOMIC:
> + /* not implemented yet */
> + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d",
> + ret);
> + break;
> +
> + case QEMU_VM_TRANSACTION_CANCEL:
> + /* return -EINVAL until migrate cancel on recevier side is
> supported */
> + ret = -EINVAL;
> + break;
> +
> + default:
> + error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret);
> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> + ret = -EINVAL;
> + }
> +
> +out:
> + return ret;
> +}
> +
> +static int ft_trans_get_buffer(void *opaque, uint8_t *buf,
> + int64_t pos, int size)
> +{
> + QEMUFileFtTrans *s = opaque;
> + int ret;
> +
> + if (s->has_error) {
> + error_report("get_buffer when error %d, bailing", s->has_error);
> + return -EINVAL;
> + }
> +
> + /* assuming qemu_file_get_notify() is calling */
> + if (pos == 0 && size == 0) {
> + trace_ft_trans_get_ready();
> + s->freeze_input = 0;
> +
> + /* sender should be waiting for ACK */
> + if (s->is_sender) {
> + ret = ft_trans_recv_header(s);
> + if (s->freeze_input) {
> + ret = 0;
> + goto out;
> + }
> + if (ret < 0) {
> + error_report("recv ack failed");
> + goto out;
> + }
> +
> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
> + error_report("recv invalid state %d", s->state);
> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> + ret = -EINVAL;
> + goto out;
> + }
> +
> + trace_ft_trans_cb(s->get_ready);
> + ret = s->get_ready(s->opaque);
> + if (ret < 0) {
> + goto out;
> + }
> +
> + /* proceed trans id */
> + s->id++;
> +
> + return 0;
> + }
> +
> + /* set QEMUFile buf at beginning */
> + if (!s->buf) {
> + s->buf = buf;
> + }
> +
> + ret = ft_trans_recv(s);
> + goto out;
> + }
> +
> + ret = s->get_offset;
> +
> +out:
> + return ret;
> +}
> +
> +static int ft_trans_close(void *opaque)
> +{
> + QEMUFileFtTrans *s = opaque;
> + int ret;
> +
> + trace_ft_trans_close();
> + ret = s->close(s->opaque);
> + if (s->is_sender) {
> + qemu_free(s->buf);
> + }
> + qemu_free(s);
> +
> + return ret;
> +}
> +
> +static int ft_trans_rate_limit(void *opaque)
> +{
> + QEMUFileFtTrans *s = opaque;
> +
> + if (s->has_error) {
> + return 0;
> + }
> +
> + if (s->rate_limit && s->freeze_output) {
> + return 1;
> + }
> +
> + return 0;
> +}
> +
> +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate)
> +{
> + QEMUFileFtTrans *s = opaque;
> +
> + if (s->has_error) {
> + goto out;
> + }
> +
> + s->rate_limit = !!new_rate;
> +
> +out:
> + return s->rate_limit;
> +}
> +
> +int ft_trans_begin(void *opaque)
> +{
> + QEMUFileFtTrans *s = opaque;
> + int ret;
> + s->seq = 0;
> +
> + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */
> + if (!s->is_sender) {
> + if (s->state != QEMU_VM_TRANSACTION_INIT) {
> + error_report("invalid state %d", s->state);
> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> + ret = -EINVAL;
> + }
> +
> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
> + goto out;
> + }
> +
> + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */
> + if (s->state == QEMU_VM_TRANSACTION_INIT) {
> +retry:
> + ret = ft_trans_recv_header(s);
> + if (s->freeze_input) {
> + goto retry;
> + }
> + if (ret < 0) {
> + error_report("recv ack failed");
> + goto out;
> + }
> +
> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
> + error_report("recv invalid state %d", s->state);
> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> + ret = -EINVAL;
> + goto out;
> + }
> + }
> +
> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0);
> + if (ret < 0) {
> + goto out;
> + }
> +
> + s->state = QEMU_VM_TRANSACTION_CONTINUE;
> +
> +out:
> + return ret;
> +}
> +
> +int ft_trans_commit(void *opaque)
> +{
> + QEMUFileFtTrans *s = opaque;
> + int ret;
> +
> + if (!s->is_sender) {
> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0);
> + goto out;
> + }
> +
> + /* sender should flush buf before sending COMMIT */
> + qemu_fflush(s->file);
> +
> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0);
> + if (ret < 0) {
> + goto out;
> + }
> +
> + while (!s->has_error && s->put_offset) {
> + ft_trans_flush(s);
> + if (s->freeze_output) {
> + s->wait_for_unfreeze(s);
> + }
> + }
> +
> + if (s->has_error) {
> + ret = -EINVAL;
> + goto out;
> + }
> +
> + ret = ft_trans_recv_header(s);
> + if (s->freeze_input) {
> + ret = -EAGAIN;
> + goto out;
> + }
> + if (ret < 0) {
> + error_report("recv ack failed");
> + goto out;
> + }
> +
> + if (s->state != QEMU_VM_TRANSACTION_ACK) {
> + error_report("recv invalid state %d", s->state);
> + s->has_error = FT_TRANS_ERR_STATE_INVALID;
> + ret = -EINVAL;
> + goto out;
> + }
> +
> + s->id++;
> + ret = 0;
> +
> +out:
> + return ret;
> +}
> +
> +int ft_trans_cancel(void *opaque)
> +{
> + QEMUFileFtTrans *s = opaque;
> +
> + /* invalid until migrate cancel on recevier side is supported */
> + if (!s->is_sender) {
> + return -EINVAL;
> + }
> +
> + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0);
> +}
> +
> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque,
> + FtTransPutBufferFunc *put_buffer,
> + FtTransGetBufferFunc *get_buffer,
> + FtTransPutReadyFunc *put_ready,
> + FtTransGetReadyFunc *get_ready,
> + FtTransWaitForUnfreezeFunc
> *wait_for_unfreeze,
> + FtTransCloseFunc *close,
> + bool is_sender)
> +{
> + QEMUFileFtTrans *s;
> +
> + s = qemu_mallocz(sizeof(*s));
> +
> + s->opaque = opaque;
> + s->put_buffer = put_buffer;
> + s->get_buffer = get_buffer;
> + s->put_ready = put_ready;
> + s->get_ready = get_ready;
> + s->wait_for_unfreeze = wait_for_unfreeze;
> + s->close = close;
> + s->is_sender = is_sender;
> + s->id = 0;
> + s->seq = 0;
> + s->rate_limit = 1;
> +
> + if (!s->is_sender) {
> + s->buf_max_size = IO_BUF_SIZE;
> + }
> +
> + s->file = qemu_fopen_ops(s, ft_trans_put_buffer, ft_trans_get_buffer,
> + ft_trans_close, ft_trans_rate_limit,
> + ft_trans_set_rate_limit, NULL);
> +
> + return s->file;
> +}
> diff --git a/ft_trans_file.h b/ft_trans_file.h
> new file mode 100644
> index 0000000..5ca6b53
> --- /dev/null
> +++ b/ft_trans_file.h
> @@ -0,0 +1,72 @@
> +/*
> + * Fault tolerant VM transaction QEMUFile
> + *
> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation.
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2. See
> + * the COPYING file in the top-level directory.
> + *
> + * This source code is based on buffered_file.h.
> + * Copyright IBM, Corp. 2008
> + * Authors:
> + * Anthony Liguori <aliguori@us.ibm.com>
> + */
> +
> +#ifndef QEMU_FT_TRANSACTION_FILE_H
> +#define QEMU_FT_TRANSACTION_FILE_H
> +
> +#include "hw/hw.h"
> +
> +enum QEMU_VM_TRANSACTION_STATE {
> + QEMU_VM_TRANSACTION_NACK = -1,
> + QEMU_VM_TRANSACTION_INIT,
> + QEMU_VM_TRANSACTION_BEGIN,
> + QEMU_VM_TRANSACTION_CONTINUE,
> + QEMU_VM_TRANSACTION_COMMIT,
> + QEMU_VM_TRANSACTION_CANCEL,
> + QEMU_VM_TRANSACTION_ATOMIC,
> + QEMU_VM_TRANSACTION_ACK,
> +};
> +
> +enum FT_MODE {
> + FT_ERROR = -1,
> + FT_OFF,
> + FT_INIT,
> + FT_TRANSACTION_BEGIN,
> + FT_TRANSACTION_ITER,
> + FT_TRANSACTION_COMMIT,
> + FT_TRANSACTION_ATOMIC,
> + FT_TRANSACTION_RECV,
> +};
> +extern enum FT_MODE ft_mode;
> +
> +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */
> +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */
> +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */
> +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */
> +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */
> +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed */
> +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */
> +
> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data,
> size_t size);
> +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t
> pos, size_t size);
> +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec
> *iov, int iovcnt);
> +typedef int (FtTransPutReadyFunc)(void);
> +typedef int (FtTransGetReadyFunc)(void *opaque);
> +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque);
> +typedef int (FtTransCloseFunc)(void *opaque);
> +
> +int ft_trans_begin(void *opaque);
> +int ft_trans_commit(void *opaque);
> +int ft_trans_cancel(void *opaque);
> +
> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque,
> + FtTransPutBufferFunc *put_buffer,
> + FtTransGetBufferFunc *get_buffer,
> + FtTransPutReadyFunc *put_ready,
> + FtTransGetReadyFunc *get_ready,
> + FtTransWaitForUnfreezeFunc
> *wait_for_unfreeze,
> + FtTransCloseFunc *close,
> + bool is_sender);
> +
> +#endif
> diff --git a/migration.c b/migration.c
> index dd3bf94..c5e0146 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -15,6 +15,7 @@
> #include "migration.h"
> #include "monitor.h"
> #include "buffered_file.h"
> +#include "ft_trans_file.h"
> #include "sysemu.h"
> #include "block.h"
> #include "qemu_socket.h"
> @@ -31,6 +32,8 @@
> do { } while (0)
> #endif
>
> +enum FT_MODE ft_mode = FT_OFF;
> +
> /* Migration speed throttling */
> static int64_t max_throttle = (32 << 20);
>
> diff --git a/trace-events b/trace-events
> index e6138ea..50ac840 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len) "spice
> wrottn %lu of requested %zd
> disable spice_vmc_read(int bytes, int len) "spice read %lu of requested
> %zd"
> disable spice_vmc_register_interface(void *scd) "spice vmc registered
> interface %p"
> disable spice_vmc_unregister_interface(void *scd) "spice vmc unregistered
> interface %p"
> +
> +# ft_trans_file.c
> +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing
> buffer from %zu by %zu"
> +disable ft_trans_append(size_t size) "buffering %zu bytes"
> +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu bytes"
> +disable ft_trans_send_header(uint16_t cmd) "send header %d"
> +disable ft_trans_recv_header(uint16_t cmd) "recv header %d"
> +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d bytes at
> %"PRId64""
> +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total)
> "recv %d of %d total %d"
> +disable ft_trans_close(void) "closing"
> +disable ft_trans_freeze_output(void) "backend not ready, freezing output"
> +disable ft_trans_freeze_input(void) "backend not ready, freezing input"
> +disable ft_trans_put_ready(void) "file is ready to put"
> +disable ft_trans_get_ready(void) "file is ready to get"
> +disable ft_trans_cb(void *cb) "callback %p"
> --
> 1.7.1.2
>
> --
> To unsubscribe from this list: send the line "unsubscribe kvm" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at http://vger.kernel.org/majordomo-info.html
>
Hi Green, 2011/2/21 ya su <suya94335@gmail.com>: > Yoshiaki: > > I have one question about ram_save_live, during migration 3 > stage(completation stage), it will call > cpu_physical_memory_set_dirty_tracking(0) to stop recording ram dirty pages. > at the end of migrate_ft_trans_connect function, it will invoke vm_start(), > at this time, cpu_physical_memory_set_dirty_tracking(1) is not called yet, > so there may have some ram pages not recorded when qemu_savevm_trans_begin > is called. I think you need calll > cpu_physical_memory_set_dirty_tracking(1) in migrate_ft_trans_connect > function, Am I right? Thank you for taking a look. When qemu_savevm_trans_begin is called for the first time, it calls ram_save_live with stage 1, that sends all pages and sets dirty tracking, so there won't be missing pages. Note that event-tap is turned on by then, meaning no outputs are sent before finishing the first transaction. I understand that this implementation is inefficient, and planning to introduce a new stage that is almost same as stage 3 but keeps dirty tracking in the future. Thanks, Yoshi > > BR > > Green. > > > 2011/2/10 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> >> >> This code implements VM transaction protocol. Like buffered_file, it >> sits between savevm and migration layer. With this architecture, VM >> transaction protocol is implemented mostly independent from other >> existing code. >> >> Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> >> Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp> >> --- >> Makefile.objs | 1 + >> ft_trans_file.c | 624 >> +++++++++++++++++++++++++++++++++++++++++++++++++++++++ >> ft_trans_file.h | 72 +++++++ >> migration.c | 3 + >> trace-events | 15 ++ >> 5 files changed, 715 insertions(+), 0 deletions(-) >> create mode 100644 ft_trans_file.c >> create mode 100644 ft_trans_file.h >> >> diff --git a/Makefile.objs b/Makefile.objs >> index 353b1a8..04148b5 100644 >> --- a/Makefile.objs >> +++ b/Makefile.objs >> @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o >> common-obj-y += qdev.o qdev-properties.o >> common-obj-y += block-migration.o >> common-obj-y += pflib.o >> +common-obj-y += ft_trans_file.o >> >> common-obj-$(CONFIG_BRLAPI) += baum.o >> common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o >> migration-fd.o >> diff --git a/ft_trans_file.c b/ft_trans_file.c >> new file mode 100644 >> index 0000000..2b42b95 >> --- /dev/null >> +++ b/ft_trans_file.c >> @@ -0,0 +1,624 @@ >> +/* >> + * Fault tolerant VM transaction QEMUFile >> + * >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. >> + * >> + * This work is licensed under the terms of the GNU GPL, version 2. See >> + * the COPYING file in the top-level directory. >> + * >> + * This source code is based on buffered_file.c. >> + * Copyright IBM, Corp. 2008 >> + * Authors: >> + * Anthony Liguori <aliguori@us.ibm.com> >> + */ >> + >> +#include "qemu-common.h" >> +#include "qemu-error.h" >> +#include "hw/hw.h" >> +#include "qemu-timer.h" >> +#include "sysemu.h" >> +#include "qemu-char.h" >> +#include "trace.h" >> +#include "ft_trans_file.h" >> + >> +typedef struct FtTransHdr >> +{ >> + uint16_t cmd; >> + uint16_t id; >> + uint32_t seq; >> + uint32_t payload_len; >> +} FtTransHdr; >> + >> +typedef struct QEMUFileFtTrans >> +{ >> + FtTransPutBufferFunc *put_buffer; >> + FtTransGetBufferFunc *get_buffer; >> + FtTransPutReadyFunc *put_ready; >> + FtTransGetReadyFunc *get_ready; >> + FtTransWaitForUnfreezeFunc *wait_for_unfreeze; >> + FtTransCloseFunc *close; >> + void *opaque; >> + QEMUFile *file; >> + >> + enum QEMU_VM_TRANSACTION_STATE state; >> + uint32_t seq; >> + uint16_t id; >> + >> + int has_error; >> + >> + bool freeze_output; >> + bool freeze_input; >> + bool rate_limit; >> + bool is_sender; >> + bool is_payload; >> + >> + uint8_t *buf; >> + size_t buf_max_size; >> + size_t put_offset; >> + size_t get_offset; >> + >> + FtTransHdr header; >> + size_t header_offset; >> +} QEMUFileFtTrans; >> + >> +#define IO_BUF_SIZE 32768 >> + >> +static void ft_trans_append(QEMUFileFtTrans *s, >> + const uint8_t *buf, size_t size) >> +{ >> + if (size > (s->buf_max_size - s->put_offset)) { >> + trace_ft_trans_realloc(s->buf_max_size, size + 1024); >> + s->buf_max_size += size + 1024; >> + s->buf = qemu_realloc(s->buf, s->buf_max_size); >> + } >> + >> + trace_ft_trans_append(size); >> + memcpy(s->buf + s->put_offset, buf, size); >> + s->put_offset += size; >> +} >> + >> +static void ft_trans_flush(QEMUFileFtTrans *s) >> +{ >> + size_t offset = 0; >> + >> + if (s->has_error) { >> + error_report("flush when error %d, bailing", s->has_error); >> + return; >> + } >> + >> + while (offset < s->put_offset) { >> + ssize_t ret; >> + >> + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset - >> offset); >> + if (ret == -EAGAIN) { >> + break; >> + } >> + >> + if (ret <= 0) { >> + error_report("error flushing data, %s", strerror(errno)); >> + s->has_error = FT_TRANS_ERR_FLUSH; >> + break; >> + } else { >> + offset += ret; >> + } >> + } >> + >> + trace_ft_trans_flush(offset, s->put_offset); >> + memmove(s->buf, s->buf + offset, s->put_offset - offset); >> + s->put_offset -= offset; >> + s->freeze_output = !!s->put_offset; >> +} >> + >> +static ssize_t ft_trans_put(void *opaque, void *buf, int size) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + size_t offset = 0; >> + ssize_t len; >> + >> + /* flush buffered data before putting next */ >> + if (s->put_offset) { >> + ft_trans_flush(s); >> + } >> + >> + while (!s->freeze_output && offset < size) { >> + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - >> offset); >> + >> + if (len == -EAGAIN) { >> + trace_ft_trans_freeze_output(); >> + s->freeze_output = 1; >> + break; >> + } >> + >> + if (len <= 0) { >> + error_report("putting data failed, %s", strerror(errno)); >> + s->has_error = 1; >> + offset = -EINVAL; >> + break; >> + } >> + >> + offset += len; >> + } >> + >> + if (s->freeze_output) { >> + ft_trans_append(s, buf + offset, size - offset); >> + offset = size; >> + } >> + >> + return offset; >> +} >> + >> +static int ft_trans_send_header(QEMUFileFtTrans *s, >> + enum QEMU_VM_TRANSACTION_STATE state, >> + uint32_t payload_len) >> +{ >> + int ret; >> + FtTransHdr *hdr = &s->header; >> + >> + trace_ft_trans_send_header(state); >> + >> + hdr->cmd = s->state = state; >> + hdr->id = s->id; >> + hdr->seq = s->seq; >> + hdr->payload_len = payload_len; >> + >> + ret = ft_trans_put(s, hdr, sizeof(*hdr)); >> + if (ret < 0) { >> + error_report("send header failed"); >> + s->has_error = FT_TRANS_ERR_SEND_HDR; >> + } >> + >> + return ret; >> +} >> + >> +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, int64_t >> pos, int size) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + ssize_t ret; >> + >> + trace_ft_trans_put_buffer(size, pos); >> + >> + if (s->has_error) { >> + error_report("put_buffer when error %d, bailing", s->has_error); >> + return -EINVAL; >> + } >> + >> + /* assuming qemu_file_put_notify() is calling */ >> + if (pos == 0 && size == 0) { >> + trace_ft_trans_put_ready(); >> + ft_trans_flush(s); >> + >> + if (!s->freeze_output) { >> + trace_ft_trans_cb(s->put_ready); >> + ret = s->put_ready(); >> + } >> + >> + ret = 0; >> + goto out; >> + } >> + >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size); >> + if (ret < 0) { >> + goto out; >> + } >> + >> + ret = ft_trans_put(s, (uint8_t *)buf, size); >> + if (ret < 0) { >> + error_report("send palyload failed"); >> + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD; >> + goto out; >> + } >> + >> + s->seq++; >> + >> +out: >> + return ret; >> +} >> + >> +static int ft_trans_fill_buffer(void *opaque, void *buf, int size) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + size_t offset = 0; >> + ssize_t len; >> + >> + s->freeze_input = 0; >> + >> + while (offset < size) { >> + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset, >> + 0, size - offset); >> + if (len == -EAGAIN) { >> + trace_ft_trans_freeze_input(); >> + s->freeze_input = 1; >> + break; >> + } >> + >> + if (len <= 0) { >> + error_report("fill buffer failed, %s", strerror(errno)); >> + s->has_error = 1; >> + return -EINVAL; >> + } >> + >> + offset += len; >> + } >> + >> + return offset; >> +} >> + >> +static int ft_trans_recv_header(QEMUFileFtTrans *s) >> +{ >> + int ret; >> + char *buf = (char *)&s->header + s->header_offset; >> + >> + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) - >> s->header_offset); >> + if (ret < 0) { >> + error_report("recv header failed"); >> + s->has_error = FT_TRANS_ERR_RECV_HDR; >> + goto out; >> + } >> + >> + s->header_offset += ret; >> + if (s->header_offset == sizeof(FtTransHdr)) { >> + trace_ft_trans_recv_header(s->header.cmd); >> + s->state = s->header.cmd; >> + s->header_offset = 0; >> + >> + if (!s->is_sender) { >> + s->id = s->header.id; >> + s->seq = s->header.seq; >> + } >> + } >> + >> +out: >> + return ret; >> +} >> + >> +static int ft_trans_recv_payload(QEMUFileFtTrans *s) >> +{ >> + QEMUFile *f = s->file; >> + int ret = -1; >> + >> + /* extend QEMUFile buf if there weren't enough space */ >> + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) { >> + s->buf_max_size += (s->header.payload_len - >> + (s->buf_max_size - s->get_offset)); >> + s->buf = qemu_realloc_buffer(f, s->buf_max_size); >> + } >> + >> + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset, >> + s->header.payload_len); >> + if (ret < 0) { >> + error_report("recv payload failed"); >> + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD; >> + goto out; >> + } >> + >> + trace_ft_trans_recv_payload(ret, s->header.payload_len, >> s->get_offset); >> + >> + s->header.payload_len -= ret; >> + s->get_offset += ret; >> + s->is_payload = !!s->header.payload_len; >> + >> +out: >> + return ret; >> +} >> + >> +static int ft_trans_recv(QEMUFileFtTrans *s) >> +{ >> + int ret; >> + >> + /* get payload and return */ >> + if (s->is_payload) { >> + ret = ft_trans_recv_payload(s); >> + goto out; >> + } >> + >> + ret = ft_trans_recv_header(s); >> + if (ret < 0 || s->freeze_input) { >> + goto out; >> + } >> + >> + switch (s->state) { >> + case QEMU_VM_TRANSACTION_BEGIN: >> + /* CONTINUE or COMMIT should come shortly */ >> + s->is_payload = 0; >> + break; >> + >> + case QEMU_VM_TRANSACTION_CONTINUE: >> + /* get payload */ >> + s->is_payload = 1; >> + break; >> + >> + case QEMU_VM_TRANSACTION_COMMIT: >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); >> + if (ret < 0) { >> + goto out; >> + } >> + >> + trace_ft_trans_cb(s->get_ready); >> + ret = s->get_ready(s->opaque); >> + if (ret < 0) { >> + goto out; >> + } >> + >> + qemu_clear_buffer(s->file); >> + s->get_offset = 0; >> + s->is_payload = 0; >> + >> + break; >> + >> + case QEMU_VM_TRANSACTION_ATOMIC: >> + /* not implemented yet */ >> + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d", >> + ret); >> + break; >> + >> + case QEMU_VM_TRANSACTION_CANCEL: >> + /* return -EINVAL until migrate cancel on recevier side is >> supported */ >> + ret = -EINVAL; >> + break; >> + >> + default: >> + error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret); >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> + ret = -EINVAL; >> + } >> + >> +out: >> + return ret; >> +} >> + >> +static int ft_trans_get_buffer(void *opaque, uint8_t *buf, >> + int64_t pos, int size) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + int ret; >> + >> + if (s->has_error) { >> + error_report("get_buffer when error %d, bailing", s->has_error); >> + return -EINVAL; >> + } >> + >> + /* assuming qemu_file_get_notify() is calling */ >> + if (pos == 0 && size == 0) { >> + trace_ft_trans_get_ready(); >> + s->freeze_input = 0; >> + >> + /* sender should be waiting for ACK */ >> + if (s->is_sender) { >> + ret = ft_trans_recv_header(s); >> + if (s->freeze_input) { >> + ret = 0; >> + goto out; >> + } >> + if (ret < 0) { >> + error_report("recv ack failed"); >> + goto out; >> + } >> + >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { >> + error_report("recv invalid state %d", s->state); >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> + ret = -EINVAL; >> + goto out; >> + } >> + >> + trace_ft_trans_cb(s->get_ready); >> + ret = s->get_ready(s->opaque); >> + if (ret < 0) { >> + goto out; >> + } >> + >> + /* proceed trans id */ >> + s->id++; >> + >> + return 0; >> + } >> + >> + /* set QEMUFile buf at beginning */ >> + if (!s->buf) { >> + s->buf = buf; >> + } >> + >> + ret = ft_trans_recv(s); >> + goto out; >> + } >> + >> + ret = s->get_offset; >> + >> +out: >> + return ret; >> +} >> + >> +static int ft_trans_close(void *opaque) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + int ret; >> + >> + trace_ft_trans_close(); >> + ret = s->close(s->opaque); >> + if (s->is_sender) { >> + qemu_free(s->buf); >> + } >> + qemu_free(s); >> + >> + return ret; >> +} >> + >> +static int ft_trans_rate_limit(void *opaque) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + >> + if (s->has_error) { >> + return 0; >> + } >> + >> + if (s->rate_limit && s->freeze_output) { >> + return 1; >> + } >> + >> + return 0; >> +} >> + >> +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + >> + if (s->has_error) { >> + goto out; >> + } >> + >> + s->rate_limit = !!new_rate; >> + >> +out: >> + return s->rate_limit; >> +} >> + >> +int ft_trans_begin(void *opaque) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + int ret; >> + s->seq = 0; >> + >> + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */ >> + if (!s->is_sender) { >> + if (s->state != QEMU_VM_TRANSACTION_INIT) { >> + error_report("invalid state %d", s->state); >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> + ret = -EINVAL; >> + } >> + >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); >> + goto out; >> + } >> + >> + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */ >> + if (s->state == QEMU_VM_TRANSACTION_INIT) { >> +retry: >> + ret = ft_trans_recv_header(s); >> + if (s->freeze_input) { >> + goto retry; >> + } >> + if (ret < 0) { >> + error_report("recv ack failed"); >> + goto out; >> + } >> + >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { >> + error_report("recv invalid state %d", s->state); >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> + ret = -EINVAL; >> + goto out; >> + } >> + } >> + >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0); >> + if (ret < 0) { >> + goto out; >> + } >> + >> + s->state = QEMU_VM_TRANSACTION_CONTINUE; >> + >> +out: >> + return ret; >> +} >> + >> +int ft_trans_commit(void *opaque) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + int ret; >> + >> + if (!s->is_sender) { >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); >> + goto out; >> + } >> + >> + /* sender should flush buf before sending COMMIT */ >> + qemu_fflush(s->file); >> + >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0); >> + if (ret < 0) { >> + goto out; >> + } >> + >> + while (!s->has_error && s->put_offset) { >> + ft_trans_flush(s); >> + if (s->freeze_output) { >> + s->wait_for_unfreeze(s); >> + } >> + } >> + >> + if (s->has_error) { >> + ret = -EINVAL; >> + goto out; >> + } >> + >> + ret = ft_trans_recv_header(s); >> + if (s->freeze_input) { >> + ret = -EAGAIN; >> + goto out; >> + } >> + if (ret < 0) { >> + error_report("recv ack failed"); >> + goto out; >> + } >> + >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { >> + error_report("recv invalid state %d", s->state); >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> + ret = -EINVAL; >> + goto out; >> + } >> + >> + s->id++; >> + ret = 0; >> + >> +out: >> + return ret; >> +} >> + >> +int ft_trans_cancel(void *opaque) >> +{ >> + QEMUFileFtTrans *s = opaque; >> + >> + /* invalid until migrate cancel on recevier side is supported */ >> + if (!s->is_sender) { >> + return -EINVAL; >> + } >> + >> + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0); >> +} >> + >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, >> + FtTransPutBufferFunc *put_buffer, >> + FtTransGetBufferFunc *get_buffer, >> + FtTransPutReadyFunc *put_ready, >> + FtTransGetReadyFunc *get_ready, >> + FtTransWaitForUnfreezeFunc >> *wait_for_unfreeze, >> + FtTransCloseFunc *close, >> + bool is_sender) >> +{ >> + QEMUFileFtTrans *s; >> + >> + s = qemu_mallocz(sizeof(*s)); >> + >> + s->opaque = opaque; >> + s->put_buffer = put_buffer; >> + s->get_buffer = get_buffer; >> + s->put_ready = put_ready; >> + s->get_ready = get_ready; >> + s->wait_for_unfreeze = wait_for_unfreeze; >> + s->close = close; >> + s->is_sender = is_sender; >> + s->id = 0; >> + s->seq = 0; >> + s->rate_limit = 1; >> + >> + if (!s->is_sender) { >> + s->buf_max_size = IO_BUF_SIZE; >> + } >> + >> + s->file = qemu_fopen_ops(s, ft_trans_put_buffer, ft_trans_get_buffer, >> + ft_trans_close, ft_trans_rate_limit, >> + ft_trans_set_rate_limit, NULL); >> + >> + return s->file; >> +} >> diff --git a/ft_trans_file.h b/ft_trans_file.h >> new file mode 100644 >> index 0000000..5ca6b53 >> --- /dev/null >> +++ b/ft_trans_file.h >> @@ -0,0 +1,72 @@ >> +/* >> + * Fault tolerant VM transaction QEMUFile >> + * >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. >> + * >> + * This work is licensed under the terms of the GNU GPL, version 2. See >> + * the COPYING file in the top-level directory. >> + * >> + * This source code is based on buffered_file.h. >> + * Copyright IBM, Corp. 2008 >> + * Authors: >> + * Anthony Liguori <aliguori@us.ibm.com> >> + */ >> + >> +#ifndef QEMU_FT_TRANSACTION_FILE_H >> +#define QEMU_FT_TRANSACTION_FILE_H >> + >> +#include "hw/hw.h" >> + >> +enum QEMU_VM_TRANSACTION_STATE { >> + QEMU_VM_TRANSACTION_NACK = -1, >> + QEMU_VM_TRANSACTION_INIT, >> + QEMU_VM_TRANSACTION_BEGIN, >> + QEMU_VM_TRANSACTION_CONTINUE, >> + QEMU_VM_TRANSACTION_COMMIT, >> + QEMU_VM_TRANSACTION_CANCEL, >> + QEMU_VM_TRANSACTION_ATOMIC, >> + QEMU_VM_TRANSACTION_ACK, >> +}; >> + >> +enum FT_MODE { >> + FT_ERROR = -1, >> + FT_OFF, >> + FT_INIT, >> + FT_TRANSACTION_BEGIN, >> + FT_TRANSACTION_ITER, >> + FT_TRANSACTION_COMMIT, >> + FT_TRANSACTION_ATOMIC, >> + FT_TRANSACTION_RECV, >> +}; >> +extern enum FT_MODE ft_mode; >> + >> +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */ >> +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */ >> +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */ >> +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */ >> +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */ >> +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed */ >> +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */ >> + >> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, >> size_t size); >> +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t >> pos, size_t size); >> +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec >> *iov, int iovcnt); >> +typedef int (FtTransPutReadyFunc)(void); >> +typedef int (FtTransGetReadyFunc)(void *opaque); >> +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque); >> +typedef int (FtTransCloseFunc)(void *opaque); >> + >> +int ft_trans_begin(void *opaque); >> +int ft_trans_commit(void *opaque); >> +int ft_trans_cancel(void *opaque); >> + >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, >> + FtTransPutBufferFunc *put_buffer, >> + FtTransGetBufferFunc *get_buffer, >> + FtTransPutReadyFunc *put_ready, >> + FtTransGetReadyFunc *get_ready, >> + FtTransWaitForUnfreezeFunc >> *wait_for_unfreeze, >> + FtTransCloseFunc *close, >> + bool is_sender); >> + >> +#endif >> diff --git a/migration.c b/migration.c >> index dd3bf94..c5e0146 100644 >> --- a/migration.c >> +++ b/migration.c >> @@ -15,6 +15,7 @@ >> #include "migration.h" >> #include "monitor.h" >> #include "buffered_file.h" >> +#include "ft_trans_file.h" >> #include "sysemu.h" >> #include "block.h" >> #include "qemu_socket.h" >> @@ -31,6 +32,8 @@ >> do { } while (0) >> #endif >> >> +enum FT_MODE ft_mode = FT_OFF; >> + >> /* Migration speed throttling */ >> static int64_t max_throttle = (32 << 20); >> >> diff --git a/trace-events b/trace-events >> index e6138ea..50ac840 100644 >> --- a/trace-events >> +++ b/trace-events >> @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len) "spice >> wrottn %lu of requested %zd >> disable spice_vmc_read(int bytes, int len) "spice read %lu of requested >> %zd" >> disable spice_vmc_register_interface(void *scd) "spice vmc registered >> interface %p" >> disable spice_vmc_unregister_interface(void *scd) "spice vmc unregistered >> interface %p" >> + >> +# ft_trans_file.c >> +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing >> buffer from %zu by %zu" >> +disable ft_trans_append(size_t size) "buffering %zu bytes" >> +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu >> bytes" >> +disable ft_trans_send_header(uint16_t cmd) "send header %d" >> +disable ft_trans_recv_header(uint16_t cmd) "recv header %d" >> +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d bytes >> at %"PRId64"" >> +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total) >> "recv %d of %d total %d" >> +disable ft_trans_close(void) "closing" >> +disable ft_trans_freeze_output(void) "backend not ready, freezing output" >> +disable ft_trans_freeze_input(void) "backend not ready, freezing input" >> +disable ft_trans_put_ready(void) "file is ready to put" >> +disable ft_trans_get_ready(void) "file is ready to get" >> +disable ft_trans_cb(void *cb) "callback %p" >> -- >> 1.7.1.2 >> >> -- >> To unsubscribe from this list: send the line "unsubscribe kvm" in >> the body of a message to majordomo@vger.kernel.org >> More majordomo info at http://vger.kernel.org/majordomo-info.html > >
Yoshi: thanks for your explaining. if you introduce a new stage as 3, I think stage 1 also need to change as it will mark all pages dirty. looking forward to your new patch update. Green. 2011/2/21 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> > Hi Green, > > 2011/2/21 ya su <suya94335@gmail.com>: > > Yoshiaki: > > > > I have one question about ram_save_live, during migration 3 > > stage(completation stage), it will call > > cpu_physical_memory_set_dirty_tracking(0) to stop recording ram dirty > pages. > > at the end of migrate_ft_trans_connect function, it will invoke > vm_start(), > > at this time, cpu_physical_memory_set_dirty_tracking(1) is not called > yet, > > so there may have some ram pages not recorded when > qemu_savevm_trans_begin > > is called. I think you need calll > > cpu_physical_memory_set_dirty_tracking(1) in migrate_ft_trans_connect > > function, Am I right? > > Thank you for taking a look. > When qemu_savevm_trans_begin is called for the first time, it > calls ram_save_live with stage 1, that sends all pages and sets > dirty tracking, so there won't be missing pages. Note that > event-tap is turned on by then, meaning no outputs are sent before > finishing the first transaction. I understand that this > implementation is inefficient, and planning to introduce a new > stage that is almost same as stage 3 but keeps dirty tracking in > the future. > > Thanks, > > Yoshi > > > > > BR > > > > Green. > > > > > > 2011/2/10 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> > >> > >> This code implements VM transaction protocol. Like buffered_file, it > >> sits between savevm and migration layer. With this architecture, VM > >> transaction protocol is implemented mostly independent from other > >> existing code. > >> > >> Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> > >> Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp> > >> --- > >> Makefile.objs | 1 + > >> ft_trans_file.c | 624 > >> +++++++++++++++++++++++++++++++++++++++++++++++++++++++ > >> ft_trans_file.h | 72 +++++++ > >> migration.c | 3 + > >> trace-events | 15 ++ > >> 5 files changed, 715 insertions(+), 0 deletions(-) > >> create mode 100644 ft_trans_file.c > >> create mode 100644 ft_trans_file.h > >> > >> diff --git a/Makefile.objs b/Makefile.objs > >> index 353b1a8..04148b5 100644 > >> --- a/Makefile.objs > >> +++ b/Makefile.objs > >> @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o > >> common-obj-y += qdev.o qdev-properties.o > >> common-obj-y += block-migration.o > >> common-obj-y += pflib.o > >> +common-obj-y += ft_trans_file.o > >> > >> common-obj-$(CONFIG_BRLAPI) += baum.o > >> common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o > >> migration-fd.o > >> diff --git a/ft_trans_file.c b/ft_trans_file.c > >> new file mode 100644 > >> index 0000000..2b42b95 > >> --- /dev/null > >> +++ b/ft_trans_file.c > >> @@ -0,0 +1,624 @@ > >> +/* > >> + * Fault tolerant VM transaction QEMUFile > >> + * > >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. > >> + * > >> + * This work is licensed under the terms of the GNU GPL, version 2. > See > >> + * the COPYING file in the top-level directory. > >> + * > >> + * This source code is based on buffered_file.c. > >> + * Copyright IBM, Corp. 2008 > >> + * Authors: > >> + * Anthony Liguori <aliguori@us.ibm.com> > >> + */ > >> + > >> +#include "qemu-common.h" > >> +#include "qemu-error.h" > >> +#include "hw/hw.h" > >> +#include "qemu-timer.h" > >> +#include "sysemu.h" > >> +#include "qemu-char.h" > >> +#include "trace.h" > >> +#include "ft_trans_file.h" > >> + > >> +typedef struct FtTransHdr > >> +{ > >> + uint16_t cmd; > >> + uint16_t id; > >> + uint32_t seq; > >> + uint32_t payload_len; > >> +} FtTransHdr; > >> + > >> +typedef struct QEMUFileFtTrans > >> +{ > >> + FtTransPutBufferFunc *put_buffer; > >> + FtTransGetBufferFunc *get_buffer; > >> + FtTransPutReadyFunc *put_ready; > >> + FtTransGetReadyFunc *get_ready; > >> + FtTransWaitForUnfreezeFunc *wait_for_unfreeze; > >> + FtTransCloseFunc *close; > >> + void *opaque; > >> + QEMUFile *file; > >> + > >> + enum QEMU_VM_TRANSACTION_STATE state; > >> + uint32_t seq; > >> + uint16_t id; > >> + > >> + int has_error; > >> + > >> + bool freeze_output; > >> + bool freeze_input; > >> + bool rate_limit; > >> + bool is_sender; > >> + bool is_payload; > >> + > >> + uint8_t *buf; > >> + size_t buf_max_size; > >> + size_t put_offset; > >> + size_t get_offset; > >> + > >> + FtTransHdr header; > >> + size_t header_offset; > >> +} QEMUFileFtTrans; > >> + > >> +#define IO_BUF_SIZE 32768 > >> + > >> +static void ft_trans_append(QEMUFileFtTrans *s, > >> + const uint8_t *buf, size_t size) > >> +{ > >> + if (size > (s->buf_max_size - s->put_offset)) { > >> + trace_ft_trans_realloc(s->buf_max_size, size + 1024); > >> + s->buf_max_size += size + 1024; > >> + s->buf = qemu_realloc(s->buf, s->buf_max_size); > >> + } > >> + > >> + trace_ft_trans_append(size); > >> + memcpy(s->buf + s->put_offset, buf, size); > >> + s->put_offset += size; > >> +} > >> + > >> +static void ft_trans_flush(QEMUFileFtTrans *s) > >> +{ > >> + size_t offset = 0; > >> + > >> + if (s->has_error) { > >> + error_report("flush when error %d, bailing", s->has_error); > >> + return; > >> + } > >> + > >> + while (offset < s->put_offset) { > >> + ssize_t ret; > >> + > >> + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset - > >> offset); > >> + if (ret == -EAGAIN) { > >> + break; > >> + } > >> + > >> + if (ret <= 0) { > >> + error_report("error flushing data, %s", strerror(errno)); > >> + s->has_error = FT_TRANS_ERR_FLUSH; > >> + break; > >> + } else { > >> + offset += ret; > >> + } > >> + } > >> + > >> + trace_ft_trans_flush(offset, s->put_offset); > >> + memmove(s->buf, s->buf + offset, s->put_offset - offset); > >> + s->put_offset -= offset; > >> + s->freeze_output = !!s->put_offset; > >> +} > >> + > >> +static ssize_t ft_trans_put(void *opaque, void *buf, int size) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + size_t offset = 0; > >> + ssize_t len; > >> + > >> + /* flush buffered data before putting next */ > >> + if (s->put_offset) { > >> + ft_trans_flush(s); > >> + } > >> + > >> + while (!s->freeze_output && offset < size) { > >> + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - > >> offset); > >> + > >> + if (len == -EAGAIN) { > >> + trace_ft_trans_freeze_output(); > >> + s->freeze_output = 1; > >> + break; > >> + } > >> + > >> + if (len <= 0) { > >> + error_report("putting data failed, %s", strerror(errno)); > >> + s->has_error = 1; > >> + offset = -EINVAL; > >> + break; > >> + } > >> + > >> + offset += len; > >> + } > >> + > >> + if (s->freeze_output) { > >> + ft_trans_append(s, buf + offset, size - offset); > >> + offset = size; > >> + } > >> + > >> + return offset; > >> +} > >> + > >> +static int ft_trans_send_header(QEMUFileFtTrans *s, > >> + enum QEMU_VM_TRANSACTION_STATE state, > >> + uint32_t payload_len) > >> +{ > >> + int ret; > >> + FtTransHdr *hdr = &s->header; > >> + > >> + trace_ft_trans_send_header(state); > >> + > >> + hdr->cmd = s->state = state; > >> + hdr->id = s->id; > >> + hdr->seq = s->seq; > >> + hdr->payload_len = payload_len; > >> + > >> + ret = ft_trans_put(s, hdr, sizeof(*hdr)); > >> + if (ret < 0) { > >> + error_report("send header failed"); > >> + s->has_error = FT_TRANS_ERR_SEND_HDR; > >> + } > >> + > >> + return ret; > >> +} > >> + > >> +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, > int64_t > >> pos, int size) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + ssize_t ret; > >> + > >> + trace_ft_trans_put_buffer(size, pos); > >> + > >> + if (s->has_error) { > >> + error_report("put_buffer when error %d, bailing", > s->has_error); > >> + return -EINVAL; > >> + } > >> + > >> + /* assuming qemu_file_put_notify() is calling */ > >> + if (pos == 0 && size == 0) { > >> + trace_ft_trans_put_ready(); > >> + ft_trans_flush(s); > >> + > >> + if (!s->freeze_output) { > >> + trace_ft_trans_cb(s->put_ready); > >> + ret = s->put_ready(); > >> + } > >> + > >> + ret = 0; > >> + goto out; > >> + } > >> + > >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size); > >> + if (ret < 0) { > >> + goto out; > >> + } > >> + > >> + ret = ft_trans_put(s, (uint8_t *)buf, size); > >> + if (ret < 0) { > >> + error_report("send palyload failed"); > >> + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD; > >> + goto out; > >> + } > >> + > >> + s->seq++; > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +static int ft_trans_fill_buffer(void *opaque, void *buf, int size) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + size_t offset = 0; > >> + ssize_t len; > >> + > >> + s->freeze_input = 0; > >> + > >> + while (offset < size) { > >> + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset, > >> + 0, size - offset); > >> + if (len == -EAGAIN) { > >> + trace_ft_trans_freeze_input(); > >> + s->freeze_input = 1; > >> + break; > >> + } > >> + > >> + if (len <= 0) { > >> + error_report("fill buffer failed, %s", strerror(errno)); > >> + s->has_error = 1; > >> + return -EINVAL; > >> + } > >> + > >> + offset += len; > >> + } > >> + > >> + return offset; > >> +} > >> + > >> +static int ft_trans_recv_header(QEMUFileFtTrans *s) > >> +{ > >> + int ret; > >> + char *buf = (char *)&s->header + s->header_offset; > >> + > >> + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) - > >> s->header_offset); > >> + if (ret < 0) { > >> + error_report("recv header failed"); > >> + s->has_error = FT_TRANS_ERR_RECV_HDR; > >> + goto out; > >> + } > >> + > >> + s->header_offset += ret; > >> + if (s->header_offset == sizeof(FtTransHdr)) { > >> + trace_ft_trans_recv_header(s->header.cmd); > >> + s->state = s->header.cmd; > >> + s->header_offset = 0; > >> + > >> + if (!s->is_sender) { > >> + s->id = s->header.id; > >> + s->seq = s->header.seq; > >> + } > >> + } > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +static int ft_trans_recv_payload(QEMUFileFtTrans *s) > >> +{ > >> + QEMUFile *f = s->file; > >> + int ret = -1; > >> + > >> + /* extend QEMUFile buf if there weren't enough space */ > >> + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) { > >> + s->buf_max_size += (s->header.payload_len - > >> + (s->buf_max_size - s->get_offset)); > >> + s->buf = qemu_realloc_buffer(f, s->buf_max_size); > >> + } > >> + > >> + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset, > >> + s->header.payload_len); > >> + if (ret < 0) { > >> + error_report("recv payload failed"); > >> + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD; > >> + goto out; > >> + } > >> + > >> + trace_ft_trans_recv_payload(ret, s->header.payload_len, > >> s->get_offset); > >> + > >> + s->header.payload_len -= ret; > >> + s->get_offset += ret; > >> + s->is_payload = !!s->header.payload_len; > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +static int ft_trans_recv(QEMUFileFtTrans *s) > >> +{ > >> + int ret; > >> + > >> + /* get payload and return */ > >> + if (s->is_payload) { > >> + ret = ft_trans_recv_payload(s); > >> + goto out; > >> + } > >> + > >> + ret = ft_trans_recv_header(s); > >> + if (ret < 0 || s->freeze_input) { > >> + goto out; > >> + } > >> + > >> + switch (s->state) { > >> + case QEMU_VM_TRANSACTION_BEGIN: > >> + /* CONTINUE or COMMIT should come shortly */ > >> + s->is_payload = 0; > >> + break; > >> + > >> + case QEMU_VM_TRANSACTION_CONTINUE: > >> + /* get payload */ > >> + s->is_payload = 1; > >> + break; > >> + > >> + case QEMU_VM_TRANSACTION_COMMIT: > >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); > >> + if (ret < 0) { > >> + goto out; > >> + } > >> + > >> + trace_ft_trans_cb(s->get_ready); > >> + ret = s->get_ready(s->opaque); > >> + if (ret < 0) { > >> + goto out; > >> + } > >> + > >> + qemu_clear_buffer(s->file); > >> + s->get_offset = 0; > >> + s->is_payload = 0; > >> + > >> + break; > >> + > >> + case QEMU_VM_TRANSACTION_ATOMIC: > >> + /* not implemented yet */ > >> + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d", > >> + ret); > >> + break; > >> + > >> + case QEMU_VM_TRANSACTION_CANCEL: > >> + /* return -EINVAL until migrate cancel on recevier side is > >> supported */ > >> + ret = -EINVAL; > >> + break; > >> + > >> + default: > >> + error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret); > >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; > >> + ret = -EINVAL; > >> + } > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +static int ft_trans_get_buffer(void *opaque, uint8_t *buf, > >> + int64_t pos, int size) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + int ret; > >> + > >> + if (s->has_error) { > >> + error_report("get_buffer when error %d, bailing", > s->has_error); > >> + return -EINVAL; > >> + } > >> + > >> + /* assuming qemu_file_get_notify() is calling */ > >> + if (pos == 0 && size == 0) { > >> + trace_ft_trans_get_ready(); > >> + s->freeze_input = 0; > >> + > >> + /* sender should be waiting for ACK */ > >> + if (s->is_sender) { > >> + ret = ft_trans_recv_header(s); > >> + if (s->freeze_input) { > >> + ret = 0; > >> + goto out; > >> + } > >> + if (ret < 0) { > >> + error_report("recv ack failed"); > >> + goto out; > >> + } > >> + > >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { > >> + error_report("recv invalid state %d", s->state); > >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; > >> + ret = -EINVAL; > >> + goto out; > >> + } > >> + > >> + trace_ft_trans_cb(s->get_ready); > >> + ret = s->get_ready(s->opaque); > >> + if (ret < 0) { > >> + goto out; > >> + } > >> + > >> + /* proceed trans id */ > >> + s->id++; > >> + > >> + return 0; > >> + } > >> + > >> + /* set QEMUFile buf at beginning */ > >> + if (!s->buf) { > >> + s->buf = buf; > >> + } > >> + > >> + ret = ft_trans_recv(s); > >> + goto out; > >> + } > >> + > >> + ret = s->get_offset; > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +static int ft_trans_close(void *opaque) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + int ret; > >> + > >> + trace_ft_trans_close(); > >> + ret = s->close(s->opaque); > >> + if (s->is_sender) { > >> + qemu_free(s->buf); > >> + } > >> + qemu_free(s); > >> + > >> + return ret; > >> +} > >> + > >> +static int ft_trans_rate_limit(void *opaque) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + > >> + if (s->has_error) { > >> + return 0; > >> + } > >> + > >> + if (s->rate_limit && s->freeze_output) { > >> + return 1; > >> + } > >> + > >> + return 0; > >> +} > >> + > >> +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + > >> + if (s->has_error) { > >> + goto out; > >> + } > >> + > >> + s->rate_limit = !!new_rate; > >> + > >> +out: > >> + return s->rate_limit; > >> +} > >> + > >> +int ft_trans_begin(void *opaque) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + int ret; > >> + s->seq = 0; > >> + > >> + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */ > >> + if (!s->is_sender) { > >> + if (s->state != QEMU_VM_TRANSACTION_INIT) { > >> + error_report("invalid state %d", s->state); > >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; > >> + ret = -EINVAL; > >> + } > >> + > >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); > >> + goto out; > >> + } > >> + > >> + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */ > >> + if (s->state == QEMU_VM_TRANSACTION_INIT) { > >> +retry: > >> + ret = ft_trans_recv_header(s); > >> + if (s->freeze_input) { > >> + goto retry; > >> + } > >> + if (ret < 0) { > >> + error_report("recv ack failed"); > >> + goto out; > >> + } > >> + > >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { > >> + error_report("recv invalid state %d", s->state); > >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; > >> + ret = -EINVAL; > >> + goto out; > >> + } > >> + } > >> + > >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0); > >> + if (ret < 0) { > >> + goto out; > >> + } > >> + > >> + s->state = QEMU_VM_TRANSACTION_CONTINUE; > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +int ft_trans_commit(void *opaque) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + int ret; > >> + > >> + if (!s->is_sender) { > >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); > >> + goto out; > >> + } > >> + > >> + /* sender should flush buf before sending COMMIT */ > >> + qemu_fflush(s->file); > >> + > >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0); > >> + if (ret < 0) { > >> + goto out; > >> + } > >> + > >> + while (!s->has_error && s->put_offset) { > >> + ft_trans_flush(s); > >> + if (s->freeze_output) { > >> + s->wait_for_unfreeze(s); > >> + } > >> + } > >> + > >> + if (s->has_error) { > >> + ret = -EINVAL; > >> + goto out; > >> + } > >> + > >> + ret = ft_trans_recv_header(s); > >> + if (s->freeze_input) { > >> + ret = -EAGAIN; > >> + goto out; > >> + } > >> + if (ret < 0) { > >> + error_report("recv ack failed"); > >> + goto out; > >> + } > >> + > >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { > >> + error_report("recv invalid state %d", s->state); > >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; > >> + ret = -EINVAL; > >> + goto out; > >> + } > >> + > >> + s->id++; > >> + ret = 0; > >> + > >> +out: > >> + return ret; > >> +} > >> + > >> +int ft_trans_cancel(void *opaque) > >> +{ > >> + QEMUFileFtTrans *s = opaque; > >> + > >> + /* invalid until migrate cancel on recevier side is supported */ > >> + if (!s->is_sender) { > >> + return -EINVAL; > >> + } > >> + > >> + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0); > >> +} > >> + > >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, > >> + FtTransPutBufferFunc *put_buffer, > >> + FtTransGetBufferFunc *get_buffer, > >> + FtTransPutReadyFunc *put_ready, > >> + FtTransGetReadyFunc *get_ready, > >> + FtTransWaitForUnfreezeFunc > >> *wait_for_unfreeze, > >> + FtTransCloseFunc *close, > >> + bool is_sender) > >> +{ > >> + QEMUFileFtTrans *s; > >> + > >> + s = qemu_mallocz(sizeof(*s)); > >> + > >> + s->opaque = opaque; > >> + s->put_buffer = put_buffer; > >> + s->get_buffer = get_buffer; > >> + s->put_ready = put_ready; > >> + s->get_ready = get_ready; > >> + s->wait_for_unfreeze = wait_for_unfreeze; > >> + s->close = close; > >> + s->is_sender = is_sender; > >> + s->id = 0; > >> + s->seq = 0; > >> + s->rate_limit = 1; > >> + > >> + if (!s->is_sender) { > >> + s->buf_max_size = IO_BUF_SIZE; > >> + } > >> + > >> + s->file = qemu_fopen_ops(s, ft_trans_put_buffer, > ft_trans_get_buffer, > >> + ft_trans_close, ft_trans_rate_limit, > >> + ft_trans_set_rate_limit, NULL); > >> + > >> + return s->file; > >> +} > >> diff --git a/ft_trans_file.h b/ft_trans_file.h > >> new file mode 100644 > >> index 0000000..5ca6b53 > >> --- /dev/null > >> +++ b/ft_trans_file.h > >> @@ -0,0 +1,72 @@ > >> +/* > >> + * Fault tolerant VM transaction QEMUFile > >> + * > >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. > >> + * > >> + * This work is licensed under the terms of the GNU GPL, version 2. > See > >> + * the COPYING file in the top-level directory. > >> + * > >> + * This source code is based on buffered_file.h. > >> + * Copyright IBM, Corp. 2008 > >> + * Authors: > >> + * Anthony Liguori <aliguori@us.ibm.com> > >> + */ > >> + > >> +#ifndef QEMU_FT_TRANSACTION_FILE_H > >> +#define QEMU_FT_TRANSACTION_FILE_H > >> + > >> +#include "hw/hw.h" > >> + > >> +enum QEMU_VM_TRANSACTION_STATE { > >> + QEMU_VM_TRANSACTION_NACK = -1, > >> + QEMU_VM_TRANSACTION_INIT, > >> + QEMU_VM_TRANSACTION_BEGIN, > >> + QEMU_VM_TRANSACTION_CONTINUE, > >> + QEMU_VM_TRANSACTION_COMMIT, > >> + QEMU_VM_TRANSACTION_CANCEL, > >> + QEMU_VM_TRANSACTION_ATOMIC, > >> + QEMU_VM_TRANSACTION_ACK, > >> +}; > >> + > >> +enum FT_MODE { > >> + FT_ERROR = -1, > >> + FT_OFF, > >> + FT_INIT, > >> + FT_TRANSACTION_BEGIN, > >> + FT_TRANSACTION_ITER, > >> + FT_TRANSACTION_COMMIT, > >> + FT_TRANSACTION_ATOMIC, > >> + FT_TRANSACTION_RECV, > >> +}; > >> +extern enum FT_MODE ft_mode; > >> + > >> +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */ > >> +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */ > >> +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */ > >> +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */ > >> +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */ > >> +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed > */ > >> +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */ > >> + > >> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, > >> size_t size); > >> +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t > >> pos, size_t size); > >> +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec > >> *iov, int iovcnt); > >> +typedef int (FtTransPutReadyFunc)(void); > >> +typedef int (FtTransGetReadyFunc)(void *opaque); > >> +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque); > >> +typedef int (FtTransCloseFunc)(void *opaque); > >> + > >> +int ft_trans_begin(void *opaque); > >> +int ft_trans_commit(void *opaque); > >> +int ft_trans_cancel(void *opaque); > >> + > >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, > >> + FtTransPutBufferFunc *put_buffer, > >> + FtTransGetBufferFunc *get_buffer, > >> + FtTransPutReadyFunc *put_ready, > >> + FtTransGetReadyFunc *get_ready, > >> + FtTransWaitForUnfreezeFunc > >> *wait_for_unfreeze, > >> + FtTransCloseFunc *close, > >> + bool is_sender); > >> + > >> +#endif > >> diff --git a/migration.c b/migration.c > >> index dd3bf94..c5e0146 100644 > >> --- a/migration.c > >> +++ b/migration.c > >> @@ -15,6 +15,7 @@ > >> #include "migration.h" > >> #include "monitor.h" > >> #include "buffered_file.h" > >> +#include "ft_trans_file.h" > >> #include "sysemu.h" > >> #include "block.h" > >> #include "qemu_socket.h" > >> @@ -31,6 +32,8 @@ > >> do { } while (0) > >> #endif > >> > >> +enum FT_MODE ft_mode = FT_OFF; > >> + > >> /* Migration speed throttling */ > >> static int64_t max_throttle = (32 << 20); > >> > >> diff --git a/trace-events b/trace-events > >> index e6138ea..50ac840 100644 > >> --- a/trace-events > >> +++ b/trace-events > >> @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len) > "spice > >> wrottn %lu of requested %zd > >> disable spice_vmc_read(int bytes, int len) "spice read %lu of requested > >> %zd" > >> disable spice_vmc_register_interface(void *scd) "spice vmc registered > >> interface %p" > >> disable spice_vmc_unregister_interface(void *scd) "spice vmc > unregistered > >> interface %p" > >> + > >> +# ft_trans_file.c > >> +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing > >> buffer from %zu by %zu" > >> +disable ft_trans_append(size_t size) "buffering %zu bytes" > >> +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu > >> bytes" > >> +disable ft_trans_send_header(uint16_t cmd) "send header %d" > >> +disable ft_trans_recv_header(uint16_t cmd) "recv header %d" > >> +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d bytes > >> at %"PRId64"" > >> +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total) > >> "recv %d of %d total %d" > >> +disable ft_trans_close(void) "closing" > >> +disable ft_trans_freeze_output(void) "backend not ready, freezing > output" > >> +disable ft_trans_freeze_input(void) "backend not ready, freezing input" > >> +disable ft_trans_put_ready(void) "file is ready to put" > >> +disable ft_trans_get_ready(void) "file is ready to get" > >> +disable ft_trans_cb(void *cb) "callback %p" > >> -- > >> 1.7.1.2 > >> > >> -- > >> To unsubscribe from this list: send the line "unsubscribe kvm" in > >> the body of a message to majordomo@vger.kernel.org > >> More majordomo info at http://vger.kernel.org/majordomo-info.html > > > > >
2011/2/23 ya su <suya94335@gmail.com>: > Yoshi: > > thanks for your explaining. > if you introduce a new stage as 3, I think stage 1 also need to change as > it will mark all pages dirty. > looking forward to your new patch update. Unless there're strong comments from others, I won't put it in this series though because I don't want to touch other components as much as possible this time. Yoshi > > Green. > > > 2011/2/21 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> >> >> Hi Green, >> >> 2011/2/21 ya su <suya94335@gmail.com>: >> > Yoshiaki: >> > >> > I have one question about ram_save_live, during migration 3 >> > stage(completation stage), it will call >> > cpu_physical_memory_set_dirty_tracking(0) to stop recording ram dirty >> > pages. >> > at the end of migrate_ft_trans_connect function, it will invoke >> > vm_start(), >> > at this time, cpu_physical_memory_set_dirty_tracking(1) is not called >> > yet, >> > so there may have some ram pages not recorded when >> > qemu_savevm_trans_begin >> > is called. I think you need calll >> > cpu_physical_memory_set_dirty_tracking(1) in migrate_ft_trans_connect >> > function, Am I right? >> >> Thank you for taking a look. >> When qemu_savevm_trans_begin is called for the first time, it >> calls ram_save_live with stage 1, that sends all pages and sets >> dirty tracking, so there won't be missing pages. Note that >> event-tap is turned on by then, meaning no outputs are sent before >> finishing the first transaction. I understand that this >> implementation is inefficient, and planning to introduce a new >> stage that is almost same as stage 3 but keeps dirty tracking in >> the future. >> >> Thanks, >> >> Yoshi >> >> > >> > BR >> > >> > Green. >> > >> > >> > 2011/2/10 Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> >> >> >> >> This code implements VM transaction protocol. Like buffered_file, it >> >> sits between savevm and migration layer. With this architecture, VM >> >> transaction protocol is implemented mostly independent from other >> >> existing code. >> >> >> >> Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> >> >> Signed-off-by: OHMURA Kei <ohmura.kei@lab.ntt.co.jp> >> >> --- >> >> Makefile.objs | 1 + >> >> ft_trans_file.c | 624 >> >> +++++++++++++++++++++++++++++++++++++++++++++++++++++++ >> >> ft_trans_file.h | 72 +++++++ >> >> migration.c | 3 + >> >> trace-events | 15 ++ >> >> 5 files changed, 715 insertions(+), 0 deletions(-) >> >> create mode 100644 ft_trans_file.c >> >> create mode 100644 ft_trans_file.h >> >> >> >> diff --git a/Makefile.objs b/Makefile.objs >> >> index 353b1a8..04148b5 100644 >> >> --- a/Makefile.objs >> >> +++ b/Makefile.objs >> >> @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o >> >> common-obj-y += qdev.o qdev-properties.o >> >> common-obj-y += block-migration.o >> >> common-obj-y += pflib.o >> >> +common-obj-y += ft_trans_file.o >> >> >> >> common-obj-$(CONFIG_BRLAPI) += baum.o >> >> common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o >> >> migration-fd.o >> >> diff --git a/ft_trans_file.c b/ft_trans_file.c >> >> new file mode 100644 >> >> index 0000000..2b42b95 >> >> --- /dev/null >> >> +++ b/ft_trans_file.c >> >> @@ -0,0 +1,624 @@ >> >> +/* >> >> + * Fault tolerant VM transaction QEMUFile >> >> + * >> >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. >> >> + * >> >> + * This work is licensed under the terms of the GNU GPL, version 2. >> >> See >> >> + * the COPYING file in the top-level directory. >> >> + * >> >> + * This source code is based on buffered_file.c. >> >> + * Copyright IBM, Corp. 2008 >> >> + * Authors: >> >> + * Anthony Liguori <aliguori@us.ibm.com> >> >> + */ >> >> + >> >> +#include "qemu-common.h" >> >> +#include "qemu-error.h" >> >> +#include "hw/hw.h" >> >> +#include "qemu-timer.h" >> >> +#include "sysemu.h" >> >> +#include "qemu-char.h" >> >> +#include "trace.h" >> >> +#include "ft_trans_file.h" >> >> + >> >> +typedef struct FtTransHdr >> >> +{ >> >> + uint16_t cmd; >> >> + uint16_t id; >> >> + uint32_t seq; >> >> + uint32_t payload_len; >> >> +} FtTransHdr; >> >> + >> >> +typedef struct QEMUFileFtTrans >> >> +{ >> >> + FtTransPutBufferFunc *put_buffer; >> >> + FtTransGetBufferFunc *get_buffer; >> >> + FtTransPutReadyFunc *put_ready; >> >> + FtTransGetReadyFunc *get_ready; >> >> + FtTransWaitForUnfreezeFunc *wait_for_unfreeze; >> >> + FtTransCloseFunc *close; >> >> + void *opaque; >> >> + QEMUFile *file; >> >> + >> >> + enum QEMU_VM_TRANSACTION_STATE state; >> >> + uint32_t seq; >> >> + uint16_t id; >> >> + >> >> + int has_error; >> >> + >> >> + bool freeze_output; >> >> + bool freeze_input; >> >> + bool rate_limit; >> >> + bool is_sender; >> >> + bool is_payload; >> >> + >> >> + uint8_t *buf; >> >> + size_t buf_max_size; >> >> + size_t put_offset; >> >> + size_t get_offset; >> >> + >> >> + FtTransHdr header; >> >> + size_t header_offset; >> >> +} QEMUFileFtTrans; >> >> + >> >> +#define IO_BUF_SIZE 32768 >> >> + >> >> +static void ft_trans_append(QEMUFileFtTrans *s, >> >> + const uint8_t *buf, size_t size) >> >> +{ >> >> + if (size > (s->buf_max_size - s->put_offset)) { >> >> + trace_ft_trans_realloc(s->buf_max_size, size + 1024); >> >> + s->buf_max_size += size + 1024; >> >> + s->buf = qemu_realloc(s->buf, s->buf_max_size); >> >> + } >> >> + >> >> + trace_ft_trans_append(size); >> >> + memcpy(s->buf + s->put_offset, buf, size); >> >> + s->put_offset += size; >> >> +} >> >> + >> >> +static void ft_trans_flush(QEMUFileFtTrans *s) >> >> +{ >> >> + size_t offset = 0; >> >> + >> >> + if (s->has_error) { >> >> + error_report("flush when error %d, bailing", s->has_error); >> >> + return; >> >> + } >> >> + >> >> + while (offset < s->put_offset) { >> >> + ssize_t ret; >> >> + >> >> + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset >> >> - >> >> offset); >> >> + if (ret == -EAGAIN) { >> >> + break; >> >> + } >> >> + >> >> + if (ret <= 0) { >> >> + error_report("error flushing data, %s", strerror(errno)); >> >> + s->has_error = FT_TRANS_ERR_FLUSH; >> >> + break; >> >> + } else { >> >> + offset += ret; >> >> + } >> >> + } >> >> + >> >> + trace_ft_trans_flush(offset, s->put_offset); >> >> + memmove(s->buf, s->buf + offset, s->put_offset - offset); >> >> + s->put_offset -= offset; >> >> + s->freeze_output = !!s->put_offset; >> >> +} >> >> + >> >> +static ssize_t ft_trans_put(void *opaque, void *buf, int size) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + size_t offset = 0; >> >> + ssize_t len; >> >> + >> >> + /* flush buffered data before putting next */ >> >> + if (s->put_offset) { >> >> + ft_trans_flush(s); >> >> + } >> >> + >> >> + while (!s->freeze_output && offset < size) { >> >> + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - >> >> offset); >> >> + >> >> + if (len == -EAGAIN) { >> >> + trace_ft_trans_freeze_output(); >> >> + s->freeze_output = 1; >> >> + break; >> >> + } >> >> + >> >> + if (len <= 0) { >> >> + error_report("putting data failed, %s", strerror(errno)); >> >> + s->has_error = 1; >> >> + offset = -EINVAL; >> >> + break; >> >> + } >> >> + >> >> + offset += len; >> >> + } >> >> + >> >> + if (s->freeze_output) { >> >> + ft_trans_append(s, buf + offset, size - offset); >> >> + offset = size; >> >> + } >> >> + >> >> + return offset; >> >> +} >> >> + >> >> +static int ft_trans_send_header(QEMUFileFtTrans *s, >> >> + enum QEMU_VM_TRANSACTION_STATE state, >> >> + uint32_t payload_len) >> >> +{ >> >> + int ret; >> >> + FtTransHdr *hdr = &s->header; >> >> + >> >> + trace_ft_trans_send_header(state); >> >> + >> >> + hdr->cmd = s->state = state; >> >> + hdr->id = s->id; >> >> + hdr->seq = s->seq; >> >> + hdr->payload_len = payload_len; >> >> + >> >> + ret = ft_trans_put(s, hdr, sizeof(*hdr)); >> >> + if (ret < 0) { >> >> + error_report("send header failed"); >> >> + s->has_error = FT_TRANS_ERR_SEND_HDR; >> >> + } >> >> + >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, >> >> int64_t >> >> pos, int size) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + ssize_t ret; >> >> + >> >> + trace_ft_trans_put_buffer(size, pos); >> >> + >> >> + if (s->has_error) { >> >> + error_report("put_buffer when error %d, bailing", >> >> s->has_error); >> >> + return -EINVAL; >> >> + } >> >> + >> >> + /* assuming qemu_file_put_notify() is calling */ >> >> + if (pos == 0 && size == 0) { >> >> + trace_ft_trans_put_ready(); >> >> + ft_trans_flush(s); >> >> + >> >> + if (!s->freeze_output) { >> >> + trace_ft_trans_cb(s->put_ready); >> >> + ret = s->put_ready(); >> >> + } >> >> + >> >> + ret = 0; >> >> + goto out; >> >> + } >> >> + >> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size); >> >> + if (ret < 0) { >> >> + goto out; >> >> + } >> >> + >> >> + ret = ft_trans_put(s, (uint8_t *)buf, size); >> >> + if (ret < 0) { >> >> + error_report("send palyload failed"); >> >> + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD; >> >> + goto out; >> >> + } >> >> + >> >> + s->seq++; >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_fill_buffer(void *opaque, void *buf, int size) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + size_t offset = 0; >> >> + ssize_t len; >> >> + >> >> + s->freeze_input = 0; >> >> + >> >> + while (offset < size) { >> >> + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset, >> >> + 0, size - offset); >> >> + if (len == -EAGAIN) { >> >> + trace_ft_trans_freeze_input(); >> >> + s->freeze_input = 1; >> >> + break; >> >> + } >> >> + >> >> + if (len <= 0) { >> >> + error_report("fill buffer failed, %s", strerror(errno)); >> >> + s->has_error = 1; >> >> + return -EINVAL; >> >> + } >> >> + >> >> + offset += len; >> >> + } >> >> + >> >> + return offset; >> >> +} >> >> + >> >> +static int ft_trans_recv_header(QEMUFileFtTrans *s) >> >> +{ >> >> + int ret; >> >> + char *buf = (char *)&s->header + s->header_offset; >> >> + >> >> + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) - >> >> s->header_offset); >> >> + if (ret < 0) { >> >> + error_report("recv header failed"); >> >> + s->has_error = FT_TRANS_ERR_RECV_HDR; >> >> + goto out; >> >> + } >> >> + >> >> + s->header_offset += ret; >> >> + if (s->header_offset == sizeof(FtTransHdr)) { >> >> + trace_ft_trans_recv_header(s->header.cmd); >> >> + s->state = s->header.cmd; >> >> + s->header_offset = 0; >> >> + >> >> + if (!s->is_sender) { >> >> + s->id = s->header.id; >> >> + s->seq = s->header.seq; >> >> + } >> >> + } >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_recv_payload(QEMUFileFtTrans *s) >> >> +{ >> >> + QEMUFile *f = s->file; >> >> + int ret = -1; >> >> + >> >> + /* extend QEMUFile buf if there weren't enough space */ >> >> + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) { >> >> + s->buf_max_size += (s->header.payload_len - >> >> + (s->buf_max_size - s->get_offset)); >> >> + s->buf = qemu_realloc_buffer(f, s->buf_max_size); >> >> + } >> >> + >> >> + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset, >> >> + s->header.payload_len); >> >> + if (ret < 0) { >> >> + error_report("recv payload failed"); >> >> + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD; >> >> + goto out; >> >> + } >> >> + >> >> + trace_ft_trans_recv_payload(ret, s->header.payload_len, >> >> s->get_offset); >> >> + >> >> + s->header.payload_len -= ret; >> >> + s->get_offset += ret; >> >> + s->is_payload = !!s->header.payload_len; >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_recv(QEMUFileFtTrans *s) >> >> +{ >> >> + int ret; >> >> + >> >> + /* get payload and return */ >> >> + if (s->is_payload) { >> >> + ret = ft_trans_recv_payload(s); >> >> + goto out; >> >> + } >> >> + >> >> + ret = ft_trans_recv_header(s); >> >> + if (ret < 0 || s->freeze_input) { >> >> + goto out; >> >> + } >> >> + >> >> + switch (s->state) { >> >> + case QEMU_VM_TRANSACTION_BEGIN: >> >> + /* CONTINUE or COMMIT should come shortly */ >> >> + s->is_payload = 0; >> >> + break; >> >> + >> >> + case QEMU_VM_TRANSACTION_CONTINUE: >> >> + /* get payload */ >> >> + s->is_payload = 1; >> >> + break; >> >> + >> >> + case QEMU_VM_TRANSACTION_COMMIT: >> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); >> >> + if (ret < 0) { >> >> + goto out; >> >> + } >> >> + >> >> + trace_ft_trans_cb(s->get_ready); >> >> + ret = s->get_ready(s->opaque); >> >> + if (ret < 0) { >> >> + goto out; >> >> + } >> >> + >> >> + qemu_clear_buffer(s->file); >> >> + s->get_offset = 0; >> >> + s->is_payload = 0; >> >> + >> >> + break; >> >> + >> >> + case QEMU_VM_TRANSACTION_ATOMIC: >> >> + /* not implemented yet */ >> >> + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d", >> >> + ret); >> >> + break; >> >> + >> >> + case QEMU_VM_TRANSACTION_CANCEL: >> >> + /* return -EINVAL until migrate cancel on recevier side is >> >> supported */ >> >> + ret = -EINVAL; >> >> + break; >> >> + >> >> + default: >> >> + error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret); >> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> >> + ret = -EINVAL; >> >> + } >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_get_buffer(void *opaque, uint8_t *buf, >> >> + int64_t pos, int size) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + int ret; >> >> + >> >> + if (s->has_error) { >> >> + error_report("get_buffer when error %d, bailing", >> >> s->has_error); >> >> + return -EINVAL; >> >> + } >> >> + >> >> + /* assuming qemu_file_get_notify() is calling */ >> >> + if (pos == 0 && size == 0) { >> >> + trace_ft_trans_get_ready(); >> >> + s->freeze_input = 0; >> >> + >> >> + /* sender should be waiting for ACK */ >> >> + if (s->is_sender) { >> >> + ret = ft_trans_recv_header(s); >> >> + if (s->freeze_input) { >> >> + ret = 0; >> >> + goto out; >> >> + } >> >> + if (ret < 0) { >> >> + error_report("recv ack failed"); >> >> + goto out; >> >> + } >> >> + >> >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { >> >> + error_report("recv invalid state %d", s->state); >> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> >> + ret = -EINVAL; >> >> + goto out; >> >> + } >> >> + >> >> + trace_ft_trans_cb(s->get_ready); >> >> + ret = s->get_ready(s->opaque); >> >> + if (ret < 0) { >> >> + goto out; >> >> + } >> >> + >> >> + /* proceed trans id */ >> >> + s->id++; >> >> + >> >> + return 0; >> >> + } >> >> + >> >> + /* set QEMUFile buf at beginning */ >> >> + if (!s->buf) { >> >> + s->buf = buf; >> >> + } >> >> + >> >> + ret = ft_trans_recv(s); >> >> + goto out; >> >> + } >> >> + >> >> + ret = s->get_offset; >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_close(void *opaque) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + int ret; >> >> + >> >> + trace_ft_trans_close(); >> >> + ret = s->close(s->opaque); >> >> + if (s->is_sender) { >> >> + qemu_free(s->buf); >> >> + } >> >> + qemu_free(s); >> >> + >> >> + return ret; >> >> +} >> >> + >> >> +static int ft_trans_rate_limit(void *opaque) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + >> >> + if (s->has_error) { >> >> + return 0; >> >> + } >> >> + >> >> + if (s->rate_limit && s->freeze_output) { >> >> + return 1; >> >> + } >> >> + >> >> + return 0; >> >> +} >> >> + >> >> +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + >> >> + if (s->has_error) { >> >> + goto out; >> >> + } >> >> + >> >> + s->rate_limit = !!new_rate; >> >> + >> >> +out: >> >> + return s->rate_limit; >> >> +} >> >> + >> >> +int ft_trans_begin(void *opaque) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + int ret; >> >> + s->seq = 0; >> >> + >> >> + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */ >> >> + if (!s->is_sender) { >> >> + if (s->state != QEMU_VM_TRANSACTION_INIT) { >> >> + error_report("invalid state %d", s->state); >> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> >> + ret = -EINVAL; >> >> + } >> >> + >> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); >> >> + goto out; >> >> + } >> >> + >> >> + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction >> >> */ >> >> + if (s->state == QEMU_VM_TRANSACTION_INIT) { >> >> +retry: >> >> + ret = ft_trans_recv_header(s); >> >> + if (s->freeze_input) { >> >> + goto retry; >> >> + } >> >> + if (ret < 0) { >> >> + error_report("recv ack failed"); >> >> + goto out; >> >> + } >> >> + >> >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { >> >> + error_report("recv invalid state %d", s->state); >> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> >> + ret = -EINVAL; >> >> + goto out; >> >> + } >> >> + } >> >> + >> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0); >> >> + if (ret < 0) { >> >> + goto out; >> >> + } >> >> + >> >> + s->state = QEMU_VM_TRANSACTION_CONTINUE; >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +int ft_trans_commit(void *opaque) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + int ret; >> >> + >> >> + if (!s->is_sender) { >> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); >> >> + goto out; >> >> + } >> >> + >> >> + /* sender should flush buf before sending COMMIT */ >> >> + qemu_fflush(s->file); >> >> + >> >> + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0); >> >> + if (ret < 0) { >> >> + goto out; >> >> + } >> >> + >> >> + while (!s->has_error && s->put_offset) { >> >> + ft_trans_flush(s); >> >> + if (s->freeze_output) { >> >> + s->wait_for_unfreeze(s); >> >> + } >> >> + } >> >> + >> >> + if (s->has_error) { >> >> + ret = -EINVAL; >> >> + goto out; >> >> + } >> >> + >> >> + ret = ft_trans_recv_header(s); >> >> + if (s->freeze_input) { >> >> + ret = -EAGAIN; >> >> + goto out; >> >> + } >> >> + if (ret < 0) { >> >> + error_report("recv ack failed"); >> >> + goto out; >> >> + } >> >> + >> >> + if (s->state != QEMU_VM_TRANSACTION_ACK) { >> >> + error_report("recv invalid state %d", s->state); >> >> + s->has_error = FT_TRANS_ERR_STATE_INVALID; >> >> + ret = -EINVAL; >> >> + goto out; >> >> + } >> >> + >> >> + s->id++; >> >> + ret = 0; >> >> + >> >> +out: >> >> + return ret; >> >> +} >> >> + >> >> +int ft_trans_cancel(void *opaque) >> >> +{ >> >> + QEMUFileFtTrans *s = opaque; >> >> + >> >> + /* invalid until migrate cancel on recevier side is supported */ >> >> + if (!s->is_sender) { >> >> + return -EINVAL; >> >> + } >> >> + >> >> + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0); >> >> +} >> >> + >> >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, >> >> + FtTransPutBufferFunc *put_buffer, >> >> + FtTransGetBufferFunc *get_buffer, >> >> + FtTransPutReadyFunc *put_ready, >> >> + FtTransGetReadyFunc *get_ready, >> >> + FtTransWaitForUnfreezeFunc >> >> *wait_for_unfreeze, >> >> + FtTransCloseFunc *close, >> >> + bool is_sender) >> >> +{ >> >> + QEMUFileFtTrans *s; >> >> + >> >> + s = qemu_mallocz(sizeof(*s)); >> >> + >> >> + s->opaque = opaque; >> >> + s->put_buffer = put_buffer; >> >> + s->get_buffer = get_buffer; >> >> + s->put_ready = put_ready; >> >> + s->get_ready = get_ready; >> >> + s->wait_for_unfreeze = wait_for_unfreeze; >> >> + s->close = close; >> >> + s->is_sender = is_sender; >> >> + s->id = 0; >> >> + s->seq = 0; >> >> + s->rate_limit = 1; >> >> + >> >> + if (!s->is_sender) { >> >> + s->buf_max_size = IO_BUF_SIZE; >> >> + } >> >> + >> >> + s->file = qemu_fopen_ops(s, ft_trans_put_buffer, >> >> ft_trans_get_buffer, >> >> + ft_trans_close, ft_trans_rate_limit, >> >> + ft_trans_set_rate_limit, NULL); >> >> + >> >> + return s->file; >> >> +} >> >> diff --git a/ft_trans_file.h b/ft_trans_file.h >> >> new file mode 100644 >> >> index 0000000..5ca6b53 >> >> --- /dev/null >> >> +++ b/ft_trans_file.h >> >> @@ -0,0 +1,72 @@ >> >> +/* >> >> + * Fault tolerant VM transaction QEMUFile >> >> + * >> >> + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. >> >> + * >> >> + * This work is licensed under the terms of the GNU GPL, version 2. >> >> See >> >> + * the COPYING file in the top-level directory. >> >> + * >> >> + * This source code is based on buffered_file.h. >> >> + * Copyright IBM, Corp. 2008 >> >> + * Authors: >> >> + * Anthony Liguori <aliguori@us.ibm.com> >> >> + */ >> >> + >> >> +#ifndef QEMU_FT_TRANSACTION_FILE_H >> >> +#define QEMU_FT_TRANSACTION_FILE_H >> >> + >> >> +#include "hw/hw.h" >> >> + >> >> +enum QEMU_VM_TRANSACTION_STATE { >> >> + QEMU_VM_TRANSACTION_NACK = -1, >> >> + QEMU_VM_TRANSACTION_INIT, >> >> + QEMU_VM_TRANSACTION_BEGIN, >> >> + QEMU_VM_TRANSACTION_CONTINUE, >> >> + QEMU_VM_TRANSACTION_COMMIT, >> >> + QEMU_VM_TRANSACTION_CANCEL, >> >> + QEMU_VM_TRANSACTION_ATOMIC, >> >> + QEMU_VM_TRANSACTION_ACK, >> >> +}; >> >> + >> >> +enum FT_MODE { >> >> + FT_ERROR = -1, >> >> + FT_OFF, >> >> + FT_INIT, >> >> + FT_TRANSACTION_BEGIN, >> >> + FT_TRANSACTION_ITER, >> >> + FT_TRANSACTION_COMMIT, >> >> + FT_TRANSACTION_ATOMIC, >> >> + FT_TRANSACTION_RECV, >> >> +}; >> >> +extern enum FT_MODE ft_mode; >> >> + >> >> +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */ >> >> +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */ >> >> +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */ >> >> +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */ >> >> +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */ >> >> +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed >> >> */ >> >> +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */ >> >> + >> >> +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, >> >> size_t size); >> >> +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t >> >> pos, size_t size); >> >> +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct >> >> iovec >> >> *iov, int iovcnt); >> >> +typedef int (FtTransPutReadyFunc)(void); >> >> +typedef int (FtTransGetReadyFunc)(void *opaque); >> >> +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque); >> >> +typedef int (FtTransCloseFunc)(void *opaque); >> >> + >> >> +int ft_trans_begin(void *opaque); >> >> +int ft_trans_commit(void *opaque); >> >> +int ft_trans_cancel(void *opaque); >> >> + >> >> +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, >> >> + FtTransPutBufferFunc *put_buffer, >> >> + FtTransGetBufferFunc *get_buffer, >> >> + FtTransPutReadyFunc *put_ready, >> >> + FtTransGetReadyFunc *get_ready, >> >> + FtTransWaitForUnfreezeFunc >> >> *wait_for_unfreeze, >> >> + FtTransCloseFunc *close, >> >> + bool is_sender); >> >> + >> >> +#endif >> >> diff --git a/migration.c b/migration.c >> >> index dd3bf94..c5e0146 100644 >> >> --- a/migration.c >> >> +++ b/migration.c >> >> @@ -15,6 +15,7 @@ >> >> #include "migration.h" >> >> #include "monitor.h" >> >> #include "buffered_file.h" >> >> +#include "ft_trans_file.h" >> >> #include "sysemu.h" >> >> #include "block.h" >> >> #include "qemu_socket.h" >> >> @@ -31,6 +32,8 @@ >> >> do { } while (0) >> >> #endif >> >> >> >> +enum FT_MODE ft_mode = FT_OFF; >> >> + >> >> /* Migration speed throttling */ >> >> static int64_t max_throttle = (32 << 20); >> >> >> >> diff --git a/trace-events b/trace-events >> >> index e6138ea..50ac840 100644 >> >> --- a/trace-events >> >> +++ b/trace-events >> >> @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len) >> >> "spice >> >> wrottn %lu of requested %zd >> >> disable spice_vmc_read(int bytes, int len) "spice read %lu of >> >> requested >> >> %zd" >> >> disable spice_vmc_register_interface(void *scd) "spice vmc registered >> >> interface %p" >> >> disable spice_vmc_unregister_interface(void *scd) "spice vmc >> >> unregistered >> >> interface %p" >> >> + >> >> +# ft_trans_file.c >> >> +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing >> >> buffer from %zu by %zu" >> >> +disable ft_trans_append(size_t size) "buffering %zu bytes" >> >> +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu >> >> bytes" >> >> +disable ft_trans_send_header(uint16_t cmd) "send header %d" >> >> +disable ft_trans_recv_header(uint16_t cmd) "recv header %d" >> >> +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d >> >> bytes >> >> at %"PRId64"" >> >> +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total) >> >> "recv %d of %d total %d" >> >> +disable ft_trans_close(void) "closing" >> >> +disable ft_trans_freeze_output(void) "backend not ready, freezing >> >> output" >> >> +disable ft_trans_freeze_input(void) "backend not ready, freezing >> >> input" >> >> +disable ft_trans_put_ready(void) "file is ready to put" >> >> +disable ft_trans_get_ready(void) "file is ready to get" >> >> +disable ft_trans_cb(void *cb) "callback %p" >> >> -- >> >> 1.7.1.2 >> >> >> >> -- >> >> To unsubscribe from this list: send the line "unsubscribe kvm" in >> >> the body of a message to majordomo@vger.kernel.org >> >> More majordomo info at http://vger.kernel.org/majordomo-info.html >> > >> > > >
Patch
diff --git a/Makefile.objs b/Makefile.objs index 353b1a8..04148b5 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o common-obj-y += qdev.o qdev-properties.o common-obj-y += block-migration.o common-obj-y += pflib.o +common-obj-y += ft_trans_file.o common-obj-$(CONFIG_BRLAPI) += baum.o common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o migration-fd.o diff --git a/ft_trans_file.c b/ft_trans_file.c new file mode 100644 index 0000000..2b42b95 --- /dev/null +++ b/ft_trans_file.c @@ -0,0 +1,624 @@ +/* + * Fault tolerant VM transaction QEMUFile + * + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * This source code is based on buffered_file.c. + * Copyright IBM, Corp. 2008 + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + */ + +#include "qemu-common.h" +#include "qemu-error.h" +#include "hw/hw.h" +#include "qemu-timer.h" +#include "sysemu.h" +#include "qemu-char.h" +#include "trace.h" +#include "ft_trans_file.h" + +typedef struct FtTransHdr +{ + uint16_t cmd; + uint16_t id; + uint32_t seq; + uint32_t payload_len; +} FtTransHdr; + +typedef struct QEMUFileFtTrans +{ + FtTransPutBufferFunc *put_buffer; + FtTransGetBufferFunc *get_buffer; + FtTransPutReadyFunc *put_ready; + FtTransGetReadyFunc *get_ready; + FtTransWaitForUnfreezeFunc *wait_for_unfreeze; + FtTransCloseFunc *close; + void *opaque; + QEMUFile *file; + + enum QEMU_VM_TRANSACTION_STATE state; + uint32_t seq; + uint16_t id; + + int has_error; + + bool freeze_output; + bool freeze_input; + bool rate_limit; + bool is_sender; + bool is_payload; + + uint8_t *buf; + size_t buf_max_size; + size_t put_offset; + size_t get_offset; + + FtTransHdr header; + size_t header_offset; +} QEMUFileFtTrans; + +#define IO_BUF_SIZE 32768 + +static void ft_trans_append(QEMUFileFtTrans *s, + const uint8_t *buf, size_t size) +{ + if (size > (s->buf_max_size - s->put_offset)) { + trace_ft_trans_realloc(s->buf_max_size, size + 1024); + s->buf_max_size += size + 1024; + s->buf = qemu_realloc(s->buf, s->buf_max_size); + } + + trace_ft_trans_append(size); + memcpy(s->buf + s->put_offset, buf, size); + s->put_offset += size; +} + +static void ft_trans_flush(QEMUFileFtTrans *s) +{ + size_t offset = 0; + + if (s->has_error) { + error_report("flush when error %d, bailing", s->has_error); + return; + } + + while (offset < s->put_offset) { + ssize_t ret; + + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset - offset); + if (ret == -EAGAIN) { + break; + } + + if (ret <= 0) { + error_report("error flushing data, %s", strerror(errno)); + s->has_error = FT_TRANS_ERR_FLUSH; + break; + } else { + offset += ret; + } + } + + trace_ft_trans_flush(offset, s->put_offset); + memmove(s->buf, s->buf + offset, s->put_offset - offset); + s->put_offset -= offset; + s->freeze_output = !!s->put_offset; +} + +static ssize_t ft_trans_put(void *opaque, void *buf, int size) +{ + QEMUFileFtTrans *s = opaque; + size_t offset = 0; + ssize_t len; + + /* flush buffered data before putting next */ + if (s->put_offset) { + ft_trans_flush(s); + } + + while (!s->freeze_output && offset < size) { + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - offset); + + if (len == -EAGAIN) { + trace_ft_trans_freeze_output(); + s->freeze_output = 1; + break; + } + + if (len <= 0) { + error_report("putting data failed, %s", strerror(errno)); + s->has_error = 1; + offset = -EINVAL; + break; + } + + offset += len; + } + + if (s->freeze_output) { + ft_trans_append(s, buf + offset, size - offset); + offset = size; + } + + return offset; +} + +static int ft_trans_send_header(QEMUFileFtTrans *s, + enum QEMU_VM_TRANSACTION_STATE state, + uint32_t payload_len) +{ + int ret; + FtTransHdr *hdr = &s->header; + + trace_ft_trans_send_header(state); + + hdr->cmd = s->state = state; + hdr->id = s->id; + hdr->seq = s->seq; + hdr->payload_len = payload_len; + + ret = ft_trans_put(s, hdr, sizeof(*hdr)); + if (ret < 0) { + error_report("send header failed"); + s->has_error = FT_TRANS_ERR_SEND_HDR; + } + + return ret; +} + +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size) +{ + QEMUFileFtTrans *s = opaque; + ssize_t ret; + + trace_ft_trans_put_buffer(size, pos); + + if (s->has_error) { + error_report("put_buffer when error %d, bailing", s->has_error); + return -EINVAL; + } + + /* assuming qemu_file_put_notify() is calling */ + if (pos == 0 && size == 0) { + trace_ft_trans_put_ready(); + ft_trans_flush(s); + + if (!s->freeze_output) { + trace_ft_trans_cb(s->put_ready); + ret = s->put_ready(); + } + + ret = 0; + goto out; + } + + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size); + if (ret < 0) { + goto out; + } + + ret = ft_trans_put(s, (uint8_t *)buf, size); + if (ret < 0) { + error_report("send palyload failed"); + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD; + goto out; + } + + s->seq++; + +out: + return ret; +} + +static int ft_trans_fill_buffer(void *opaque, void *buf, int size) +{ + QEMUFileFtTrans *s = opaque; + size_t offset = 0; + ssize_t len; + + s->freeze_input = 0; + + while (offset < size) { + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset, + 0, size - offset); + if (len == -EAGAIN) { + trace_ft_trans_freeze_input(); + s->freeze_input = 1; + break; + } + + if (len <= 0) { + error_report("fill buffer failed, %s", strerror(errno)); + s->has_error = 1; + return -EINVAL; + } + + offset += len; + } + + return offset; +} + +static int ft_trans_recv_header(QEMUFileFtTrans *s) +{ + int ret; + char *buf = (char *)&s->header + s->header_offset; + + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) - s->header_offset); + if (ret < 0) { + error_report("recv header failed"); + s->has_error = FT_TRANS_ERR_RECV_HDR; + goto out; + } + + s->header_offset += ret; + if (s->header_offset == sizeof(FtTransHdr)) { + trace_ft_trans_recv_header(s->header.cmd); + s->state = s->header.cmd; + s->header_offset = 0; + + if (!s->is_sender) { + s->id = s->header.id; + s->seq = s->header.seq; + } + } + +out: + return ret; +} + +static int ft_trans_recv_payload(QEMUFileFtTrans *s) +{ + QEMUFile *f = s->file; + int ret = -1; + + /* extend QEMUFile buf if there weren't enough space */ + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) { + s->buf_max_size += (s->header.payload_len - + (s->buf_max_size - s->get_offset)); + s->buf = qemu_realloc_buffer(f, s->buf_max_size); + } + + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset, + s->header.payload_len); + if (ret < 0) { + error_report("recv payload failed"); + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD; + goto out; + } + + trace_ft_trans_recv_payload(ret, s->header.payload_len, s->get_offset); + + s->header.payload_len -= ret; + s->get_offset += ret; + s->is_payload = !!s->header.payload_len; + +out: + return ret; +} + +static int ft_trans_recv(QEMUFileFtTrans *s) +{ + int ret; + + /* get payload and return */ + if (s->is_payload) { + ret = ft_trans_recv_payload(s); + goto out; + } + + ret = ft_trans_recv_header(s); + if (ret < 0 || s->freeze_input) { + goto out; + } + + switch (s->state) { + case QEMU_VM_TRANSACTION_BEGIN: + /* CONTINUE or COMMIT should come shortly */ + s->is_payload = 0; + break; + + case QEMU_VM_TRANSACTION_CONTINUE: + /* get payload */ + s->is_payload = 1; + break; + + case QEMU_VM_TRANSACTION_COMMIT: + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); + if (ret < 0) { + goto out; + } + + trace_ft_trans_cb(s->get_ready); + ret = s->get_ready(s->opaque); + if (ret < 0) { + goto out; + } + + qemu_clear_buffer(s->file); + s->get_offset = 0; + s->is_payload = 0; + + break; + + case QEMU_VM_TRANSACTION_ATOMIC: + /* not implemented yet */ + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d", + ret); + break; + + case QEMU_VM_TRANSACTION_CANCEL: + /* return -EINVAL until migrate cancel on recevier side is supported */ + ret = -EINVAL; + break; + + default: + error_report("unknown QEMU_VM_TRANSACTION_STATE %d", ret); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + } + +out: + return ret; +} + +static int ft_trans_get_buffer(void *opaque, uint8_t *buf, + int64_t pos, int size) +{ + QEMUFileFtTrans *s = opaque; + int ret; + + if (s->has_error) { + error_report("get_buffer when error %d, bailing", s->has_error); + return -EINVAL; + } + + /* assuming qemu_file_get_notify() is calling */ + if (pos == 0 && size == 0) { + trace_ft_trans_get_ready(); + s->freeze_input = 0; + + /* sender should be waiting for ACK */ + if (s->is_sender) { + ret = ft_trans_recv_header(s); + if (s->freeze_input) { + ret = 0; + goto out; + } + if (ret < 0) { + error_report("recv ack failed"); + goto out; + } + + if (s->state != QEMU_VM_TRANSACTION_ACK) { + error_report("recv invalid state %d", s->state); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + goto out; + } + + trace_ft_trans_cb(s->get_ready); + ret = s->get_ready(s->opaque); + if (ret < 0) { + goto out; + } + + /* proceed trans id */ + s->id++; + + return 0; + } + + /* set QEMUFile buf at beginning */ + if (!s->buf) { + s->buf = buf; + } + + ret = ft_trans_recv(s); + goto out; + } + + ret = s->get_offset; + +out: + return ret; +} + +static int ft_trans_close(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + int ret; + + trace_ft_trans_close(); + ret = s->close(s->opaque); + if (s->is_sender) { + qemu_free(s->buf); + } + qemu_free(s); + + return ret; +} + +static int ft_trans_rate_limit(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + + if (s->has_error) { + return 0; + } + + if (s->rate_limit && s->freeze_output) { + return 1; + } + + return 0; +} + +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate) +{ + QEMUFileFtTrans *s = opaque; + + if (s->has_error) { + goto out; + } + + s->rate_limit = !!new_rate; + +out: + return s->rate_limit; +} + +int ft_trans_begin(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + int ret; + s->seq = 0; + + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */ + if (!s->is_sender) { + if (s->state != QEMU_VM_TRANSACTION_INIT) { + error_report("invalid state %d", s->state); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + } + + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); + goto out; + } + + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */ + if (s->state == QEMU_VM_TRANSACTION_INIT) { +retry: + ret = ft_trans_recv_header(s); + if (s->freeze_input) { + goto retry; + } + if (ret < 0) { + error_report("recv ack failed"); + goto out; + } + + if (s->state != QEMU_VM_TRANSACTION_ACK) { + error_report("recv invalid state %d", s->state); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + goto out; + } + } + + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0); + if (ret < 0) { + goto out; + } + + s->state = QEMU_VM_TRANSACTION_CONTINUE; + +out: + return ret; +} + +int ft_trans_commit(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + int ret; + + if (!s->is_sender) { + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); + goto out; + } + + /* sender should flush buf before sending COMMIT */ + qemu_fflush(s->file); + + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0); + if (ret < 0) { + goto out; + } + + while (!s->has_error && s->put_offset) { + ft_trans_flush(s); + if (s->freeze_output) { + s->wait_for_unfreeze(s); + } + } + + if (s->has_error) { + ret = -EINVAL; + goto out; + } + + ret = ft_trans_recv_header(s); + if (s->freeze_input) { + ret = -EAGAIN; + goto out; + } + if (ret < 0) { + error_report("recv ack failed"); + goto out; + } + + if (s->state != QEMU_VM_TRANSACTION_ACK) { + error_report("recv invalid state %d", s->state); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + goto out; + } + + s->id++; + ret = 0; + +out: + return ret; +} + +int ft_trans_cancel(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + + /* invalid until migrate cancel on recevier side is supported */ + if (!s->is_sender) { + return -EINVAL; + } + + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0); +} + +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, + FtTransPutBufferFunc *put_buffer, + FtTransGetBufferFunc *get_buffer, + FtTransPutReadyFunc *put_ready, + FtTransGetReadyFunc *get_ready, + FtTransWaitForUnfreezeFunc *wait_for_unfreeze, + FtTransCloseFunc *close, + bool is_sender) +{ + QEMUFileFtTrans *s; + + s = qemu_mallocz(sizeof(*s)); + + s->opaque = opaque; + s->put_buffer = put_buffer; + s->get_buffer = get_buffer; + s->put_ready = put_ready; + s->get_ready = get_ready; + s->wait_for_unfreeze = wait_for_unfreeze; + s->close = close; + s->is_sender = is_sender; + s->id = 0; + s->seq = 0; + s->rate_limit = 1; + + if (!s->is_sender) { + s->buf_max_size = IO_BUF_SIZE; + } + + s->file = qemu_fopen_ops(s, ft_trans_put_buffer, ft_trans_get_buffer, + ft_trans_close, ft_trans_rate_limit, + ft_trans_set_rate_limit, NULL); + + return s->file; +} diff --git a/ft_trans_file.h b/ft_trans_file.h new file mode 100644 index 0000000..5ca6b53 --- /dev/null +++ b/ft_trans_file.h @@ -0,0 +1,72 @@ +/* + * Fault tolerant VM transaction QEMUFile + * + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * This source code is based on buffered_file.h. + * Copyright IBM, Corp. 2008 + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + */ + +#ifndef QEMU_FT_TRANSACTION_FILE_H +#define QEMU_FT_TRANSACTION_FILE_H + +#include "hw/hw.h" + +enum QEMU_VM_TRANSACTION_STATE { + QEMU_VM_TRANSACTION_NACK = -1, + QEMU_VM_TRANSACTION_INIT, + QEMU_VM_TRANSACTION_BEGIN, + QEMU_VM_TRANSACTION_CONTINUE, + QEMU_VM_TRANSACTION_COMMIT, + QEMU_VM_TRANSACTION_CANCEL, + QEMU_VM_TRANSACTION_ATOMIC, + QEMU_VM_TRANSACTION_ACK, +}; + +enum FT_MODE { + FT_ERROR = -1, + FT_OFF, + FT_INIT, + FT_TRANSACTION_BEGIN, + FT_TRANSACTION_ITER, + FT_TRANSACTION_COMMIT, + FT_TRANSACTION_ATOMIC, + FT_TRANSACTION_RECV, +}; +extern enum FT_MODE ft_mode; + +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */ +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */ +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */ +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */ +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */ +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed */ +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */ + +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, size_t size); +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t pos, size_t size); +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec *iov, int iovcnt); +typedef int (FtTransPutReadyFunc)(void); +typedef int (FtTransGetReadyFunc)(void *opaque); +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque); +typedef int (FtTransCloseFunc)(void *opaque); + +int ft_trans_begin(void *opaque); +int ft_trans_commit(void *opaque); +int ft_trans_cancel(void *opaque); + +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, + FtTransPutBufferFunc *put_buffer, + FtTransGetBufferFunc *get_buffer, + FtTransPutReadyFunc *put_ready, + FtTransGetReadyFunc *get_ready, + FtTransWaitForUnfreezeFunc *wait_for_unfreeze, + FtTransCloseFunc *close, + bool is_sender); + +#endif diff --git a/migration.c b/migration.c index dd3bf94..c5e0146 100644 --- a/migration.c +++ b/migration.c @@ -15,6 +15,7 @@ #include "migration.h" #include "monitor.h" #include "buffered_file.h" +#include "ft_trans_file.h" #include "sysemu.h" #include "block.h" #include "qemu_socket.h" @@ -31,6 +32,8 @@ do { } while (0) #endif +enum FT_MODE ft_mode = FT_OFF; + /* Migration speed throttling */ static int64_t max_throttle = (32 << 20); diff --git a/trace-events b/trace-events index e6138ea..50ac840 100644 --- a/trace-events +++ b/trace-events @@ -254,3 +254,18 @@ disable spice_vmc_write(ssize_t out, int len) "spice wrottn %lu of requested %zd disable spice_vmc_read(int bytes, int len) "spice read %lu of requested %zd" disable spice_vmc_register_interface(void *scd) "spice vmc registered interface %p" disable spice_vmc_unregister_interface(void *scd) "spice vmc unregistered interface %p" + +# ft_trans_file.c +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing buffer from %zu by %zu" +disable ft_trans_append(size_t size) "buffering %zu bytes" +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu bytes" +disable ft_trans_send_header(uint16_t cmd) "send header %d" +disable ft_trans_recv_header(uint16_t cmd) "recv header %d" +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d bytes at %"PRId64"" +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total) "recv %d of %d total %d" +disable ft_trans_close(void) "closing" +disable ft_trans_freeze_output(void) "backend not ready, freezing output" +disable ft_trans_freeze_input(void) "backend not ready, freezing input" +disable ft_trans_put_ready(void) "file is ready to put" +disable ft_trans_get_ready(void) "file is ready to get" +disable ft_trans_cb(void *cb) "callback %p"