diff mbox

[v2,09/43] Return path: Send responses from destination to source

Message ID 1407767399-3030-10-git-send-email-dgilbert@redhat.com
State New
Headers show

Commit Message

Dr. David Alan Gilbert Aug. 11, 2014, 2:29 p.m. UTC
From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>

Add migrate_send_rp_message to send a message from destination to source along the return path.
  (It uses a mutex to let it be called from multiple threads)
Add migrate_send_rp_shut to send a 'shut' message to indicate
  the destination is finished with the RP.
Add migrate_send_rp_ack to send an 'ack' message
  Use it in the CMD_REQACK handler

Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 include/migration/migration.h | 18 ++++++++++++++++++
 migration.c                   | 41 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 59 insertions(+)
diff mbox

Patch

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 173775b..12e640d 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -40,6 +40,13 @@  struct MigrationParams {
     bool shared;
 };
 
+/* Commands sent on the return path from destination to source*/
+enum mig_rpcomm_cmd {
+    MIG_RPCOMM_INVALID = 0,  /* Must be 0 */
+    MIG_RPCOMM_SHUT,         /* sibling will not send any more RP messages */
+    MIG_RPCOMM_ACK,          /* data (seq: be32 ) */
+    MIG_RPCOMM_AFTERLASTVALID
+};
 typedef struct MigrationState MigrationState;
 
 /* State for the incoming migration */
@@ -47,6 +54,7 @@  struct MigrationIncomingState {
     QEMUFile *file;
 
     QEMUFile *return_path;
+    QemuMutex      rp_mutex;    /* We send replies from multiple threads */
 };
 
 MigrationIncomingState *migration_incoming_get_current(void);
@@ -168,6 +176,16 @@  int64_t migrate_xbzrle_cache_size(void);
 
 int64_t xbzrle_cache_resize(int64_t new_size);
 
+/* Sending on the return path - generic and then for each message type */
+void migrate_send_rp_message(MigrationIncomingState *mis,
+                             enum mig_rpcomm_cmd cmd,
+                             uint16_t len, uint8_t *data);
+void migrate_send_rp_shut(MigrationIncomingState *mis,
+                          uint32_t value);
+void migrate_send_rp_ack(MigrationIncomingState *mis,
+                         uint32_t value);
+
+
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_load_hook(QEMUFile *f, uint64_t flags);
diff --git a/migration.c b/migration.c
index c203958..3e4e120 100644
--- a/migration.c
+++ b/migration.c
@@ -90,6 +90,7 @@  MigrationIncomingState *migration_incoming_state_init(QEMUFile* f)
 {
     mis_current = g_malloc0(sizeof(MigrationIncomingState));
     mis_current->file = f;
+    qemu_mutex_init(&mis_current->rp_mutex);
 
     return mis_current;
 }
@@ -100,6 +101,46 @@  void migration_incoming_state_destroy(void)
     mis_current = NULL;
 }
 
+/* Send a message on the return channel back to the source
+ * of the migration.
+ */
+void migrate_send_rp_message(MigrationIncomingState *mis,
+                             enum mig_rpcomm_cmd cmd,
+                             uint16_t len, uint8_t *data)
+{
+    DPRINTF("migrate_send_rp_message: cmd=%d, len=%d\n", (int)cmd, len);
+    qemu_mutex_lock(&mis->rp_mutex);
+    qemu_put_be16(mis->return_path, (unsigned int)cmd);
+    qemu_put_be16(mis->return_path, len);
+    qemu_put_buffer(mis->return_path, data, len);
+    qemu_fflush(mis->return_path);
+    qemu_mutex_unlock(&mis->rp_mutex);
+}
+
+/*
+ * Send a 'SHUT' message on the return channel with the given value
+ * to indicate that we've finished with the RP.  None-0 value indicates
+ * error.
+ */
+void migrate_send_rp_shut(MigrationIncomingState *mis,
+                          uint32_t value)
+{
+    uint32_t buf;
+
+    buf = cpu_to_be32(value);
+    migrate_send_rp_message(mis, MIG_RPCOMM_SHUT, 4, (uint8_t *)&buf);
+}
+
+/* Send an 'ACK' message on the return channel with the given value */
+void migrate_send_rp_ack(MigrationIncomingState *mis,
+                         uint32_t value)
+{
+    uint32_t buf;
+
+    buf = cpu_to_be32(value);
+    migrate_send_rp_message(mis, MIG_RPCOMM_ACK, 4, (uint8_t *)&buf);
+}
+
 void qemu_start_incoming_migration(const char *uri, Error **errp)
 {
     const char *p;