@@ -23,22 +23,174 @@ do { fprintf(stdout, "colo: " fmt , ## __VA_ARGS__); } while (0)
#define DPRINTF(fmt, ...) do {} while (0)
#endif
+enum {
+ COLO_READY = 0x46,
+
+ /*
+ * Checkpoint synchronizing points.
+ *
+ * Primary Secondary
+ * NEW @
+ * Suspend
+ * SUSPENDED @
+ * Suspend&Save state
+ * SEND @
+ * Send state Receive state
+ * RECEIVED @
+ * Flush network Load state
+ * LOADED @
+ * Resume Resume
+ *
+ * Start Comparing
+ * NOTE:
+ * 1) '@' who sends the message
+ * 2) Every sync-point is synchronized by two sides with only
+ * one handshake(single direction) for low-latency.
+ * If more strict synchronization is required, a opposite direction
+ * sync-point should be added.
+ * 3) Since sync-points are single direction, the remote side may
+ * go forward a lot when this side just receives the sync-point.
+ */
+ COLO_CHECKPOINT_NEW,
+ COLO_CHECKPOINT_SUSPENDED,
+ COLO_CHECKPOINT_SEND,
+ COLO_CHECKPOINT_RECEIVED,
+ COLO_CHECKPOINT_LOADED,
+};
+
static QEMUBH *colo_bh;
static Coroutine *colo;
+
+/* colo checkpoint control helper */
+static int colo_ctl_put(QEMUFile *f, uint64_t request)
+{
+ int ret = 0;
+
+ qemu_put_be64(f, request);
+ qemu_fflush(f);
+
+ ret = qemu_file_get_error(f);
+
+ return ret;
+}
+
+static int colo_ctl_get_value(QEMUFile *f, uint64_t *value)
+{
+ int ret = 0;
+ uint64_t temp;
+
+ temp = qemu_get_be64(f);
+
+ ret = qemu_file_get_error(f);
+ if (ret < 0) {
+ return -1;
+ }
+
+ *value = temp;
+ return 0;
+}
+
+static int colo_ctl_get(QEMUFile *f, uint64_t require)
+{
+ int ret;
+ uint64_t value;
+
+ ret = colo_ctl_get_value(f, &value);
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (value != require) {
+ error_report("unexpected state! expected: %"PRIu64
+ ", received: %"PRIu64, require, value);
+ exit(1);
+ }
+
+ return ret;
+}
+
+static int do_colo_transaction(MigrationState *s, QEMUFile *control)
+{
+ int ret;
+
+ ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
+ if (ret < 0) {
+ goto out;
+ }
+
+ ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED);
+ if (ret < 0) {
+ goto out;
+ }
+
+ /* TODO: suspend and save vm state to colo buffer */
+
+ ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND);
+ if (ret < 0) {
+ goto out;
+ }
+
+ /* TODO: send vmstate to slave */
+
+ ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED);
+ if (ret < 0) {
+ goto out;
+ }
+ DPRINTF("got COLO_CHECKPOINT_RECEIVED\n");
+ ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED);
+ if (ret < 0) {
+ goto out;
+ }
+ DPRINTF("got COLO_CHECKPOINT_LOADED\n");
+
+ /* TODO: resume master */
+
+out:
+ return ret;
+}
+
static void *colo_thread(void *opaque)
{
MigrationState *s = opaque;
+ QEMUFile *colo_control = NULL;
+ int ret;
+
+ colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb");
+ if (!colo_control) {
+ error_report("Open colo_control failed!");
+ goto out;
+ }
+
+ /*
+ * Wait for slave finish loading vm states and enter COLO
+ * restore.
+ */
+ ret = colo_ctl_get(colo_control, COLO_READY);
+ if (ret < 0) {
+ goto out;
+ }
+ DPRINTF("get COLO_READY\n");
qemu_mutex_lock_iothread();
vm_start();
qemu_mutex_unlock_iothread();
DPRINTF("vm resume to run\n");
+ while (s->state == MIG_STATE_COLO) {
+ /* start a colo checkpoint */
+ if (do_colo_transaction(s, colo_control)) {
+ goto out;
+ }
+ }
- /*TODO: COLO checkpoint savevm loop*/
-
+out:
migrate_set_state(s, MIG_STATE_COLO, MIG_STATE_COMPLETED);
+
+ if (colo_control) {
+ qemu_fclose(colo_control);
+ }
+
qemu_mutex_lock_iothread();
qemu_bh_schedule(s->cleanup_bh);
qemu_mutex_unlock_iothread();
@@ -71,14 +223,87 @@ void colo_init_checkpointer(MigrationState *s)
qemu_bh_schedule(colo_bh);
}
+/*
+ * return:
+ * 0: start a checkpoint
+ * -1: some error happened, exit colo restore
+ */
+static int slave_wait_new_checkpoint(QEMUFile *f)
+{
+ int ret;
+ uint64_t cmd;
+
+ ret = colo_ctl_get_value(f, &cmd);
+ if (ret < 0) {
+ return -1;
+ }
+
+ switch (cmd) {
+ case COLO_CHECKPOINT_NEW:
+ return 0;
+ default:
+ return -1;
+ }
+}
+
void *colo_process_incoming_checkpoints(void *opaque)
{
+ struct colo_incoming *colo_in = opaque;
+ QEMUFile *f = colo_in->file;
+ int fd = qemu_get_fd(f);
+ QEMUFile *ctl = NULL;
+ int ret;
colo = qemu_coroutine_self();
assert(colo != NULL);
- /* TODO: COLO checkpoint restore loop */
+ ctl = qemu_fopen_socket(fd, "wb");
+ if (!ctl) {
+ error_report("Can't open incoming channel!");
+ goto out;
+ }
+ ret = colo_ctl_put(ctl, COLO_READY);
+ if (ret < 0) {
+ goto out;
+ }
+ /* TODO: in COLO mode, slave is runing, so start the vm */
+ while (true) {
+ if (slave_wait_new_checkpoint(f)) {
+ break;
+ }
+ /* TODO: suspend guest */
+ ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED);
+ if (ret < 0) {
+ goto out;
+ }
+
+ ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND);
+ if (ret < 0) {
+ goto out;
+ }
+ DPRINTF("Got COLO_CHECKPOINT_SEND\n");
+
+ /* TODO: read migration data into colo buffer */
+
+ ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED);
+ if (ret < 0) {
+ goto out;
+ }
+ DPRINTF("Recived vm state\n");
+
+ /* TODO: load vm state */
+
+ ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED);
+ if (ret < 0) {
+ goto out;
+ }
+}
+
+out:
colo = NULL;
+ if (ctl) {
+ qemu_fclose(ctl);
+ }
loadvm_exit_colo();
return NULL;