diff mbox

[v5,09/17] migration: Start of multiple fd work

Message ID 20170717134238.1966-10-quintela@redhat.com
State New
Headers show

Commit Message

Juan Quintela July 17, 2017, 1:42 p.m. UTC
We create new channels for each new thread created. We only send through
them a character to be sure that we are creating the channels in the
right order.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Split SocketArgs into incoming and outgoing args

Use UUID's on the initial message, so we are sure we are connecting to
the right channel.

Remove init semaphore.  Now that we use uuids on the init message, we
know that this is our channel.

Fix recv socket destwroy, we were destroying send channels.
This was very interesting, because we were using an unreferred object
without problems.

Move to struct of pointers
init channel sooner.
split recv thread creation.
listen on main thread
---
 migration/migration.c |   7 ++-
 migration/ram.c       | 118 ++++++++++++++++++++++++++++++++++++++++++--------
 migration/ram.h       |   2 +
 migration/socket.c    |  38 ++++++++++++++--
 migration/socket.h    |  10 +++++
 5 files changed, 152 insertions(+), 23 deletions(-)

Comments

Daniel P. Berrangé July 19, 2017, 1:56 p.m. UTC | #1
On Mon, Jul 17, 2017 at 03:42:30PM +0200, Juan Quintela wrote:
> We create new channels for each new thread created. We only send through
> them a character to be sure that we are creating the channels in the
> right order.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> Split SocketArgs into incoming and outgoing args
> 
> Use UUID's on the initial message, so we are sure we are connecting to
> the right channel.
> 
> Remove init semaphore.  Now that we use uuids on the init message, we
> know that this is our channel.
> 
> Fix recv socket destwroy, we were destroying send channels.
> This was very interesting, because we were using an unreferred object
> without problems.
> 
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> ---
>  migration/migration.c |   7 ++-
>  migration/ram.c       | 118 ++++++++++++++++++++++++++++++++++++++++++--------
>  migration/ram.h       |   2 +
>  migration/socket.c    |  38 ++++++++++++++--
>  migration/socket.h    |  10 +++++
>  5 files changed, 152 insertions(+), 23 deletions(-)
> 


> diff --git a/migration/ram.c b/migration/ram.c
> index 8e87533..b80f511 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -408,11 +413,38 @@ void multifd_save_cleanup(void)
>      multifd_send_state = NULL;
>  }
>  
> +/* Default uuid for multifd when qemu is not started with uuid */
> +static char multifd_uuid[] = "5c49fd7e-af88-4a07-b6e8-091fd696ad40";
> +/* strlen(multifd) + '-' + <channel id> + '-' +  UUID_FMT + '\0' */
> +#define MULTIFD_UUID_MSG (7 + 1 + 3 + 1 + UUID_FMT_LEN + 1)

> +
>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
> +    char string[MULTIFD_UUID_MSG];
> +    char *string_uuid;
> +    int res;
> +    bool exit = false;
>  
> -    while (true) {
> +    if (qemu_uuid_set) {
> +        string_uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> +    } else {
> +        string_uuid = g_strdup(multifd_uuid);
> +    }
> +    res = snprintf(string, MULTIFD_UUID_MSG, "%s multifd %03d",
> +                   string_uuid, p->id);

Just use  g_strdup_printf() here and avoid the error prone
logically for calculating the "correct"  buffer size.

> +    g_free(string_uuid);
> +
> +    /* -1 due to the wonders of '\0' accounting */
> +    if (res != (MULTIFD_UUID_MSG - 1)) {
> +        error_report("Multifd UUID message '%s' is not of right length",
> +            string);
> +        exit = true;
> +    } else {
> +        qio_channel_write(p->c, string, MULTIFD_UUID_MSG, &error_abort);

Ewwww, you can't have QEMU abort when there's an I/O error on the
a file descriptor. It needs to fail the migration cleanly.

> +    }
> +
> +    while (!exit) {
>          qemu_mutex_lock(&p->mutex);
>          if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);

> +gboolean multifd_new_channel(QIOChannel *ioc)
> +{
> +    int thread_count = migrate_multifd_threads();
> +    MultiFDRecvParams *p = g_new0(MultiFDRecvParams, 1);
> +    MigrationState *s = migrate_get_current();
> +    char string[MULTIFD_UUID_MSG];
> +    char string_uuid[UUID_FMT_LEN];
> +    char *uuid;
> +    int id;
> +
> +    qio_channel_read(ioc, string, sizeof(string), &error_abort);

Again, we can't abort QEMU on I/O errors

> +    sscanf(string, "%s multifd %03d", string_uuid, &id);
> +
> +    if (qemu_uuid_set) {
> +        uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> +    } else {
> +        uuid = g_strdup(multifd_uuid);
> +    }
> +    if (strcmp(string_uuid, uuid)) {
> +        error_report("multifd: received uuid '%s' and expected uuid '%s'",
> +                     string_uuid, uuid);
> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> +                          MIGRATION_STATUS_FAILED);
> +        terminate_multifd_recv_threads();
> +        return FALSE;
> +    }
> +    g_free(uuid);
> +
> +    if (multifd_recv_state->params[id] != NULL) {
> +        error_report("multifd: received id '%d' is already setup'", id);
> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> +                          MIGRATION_STATUS_FAILED);
> +        terminate_multifd_recv_threads();
> +        return FALSE;
> +    }
> +    qemu_mutex_init(&p->mutex);
> +    qemu_sem_init(&p->sem, 0);
> +    p->quit = false;
> +    p->id = id;
> +    p->c = ioc;
> +    atomic_set(&multifd_recv_state->params[id], p);
> +    qemu_thread_create(&p->thread, "multifd_recv", multifd_recv_thread, p,
> +                       QEMU_THREAD_JOINABLE);
> +    multifd_recv_state->count++;
> +
> +    /* We need to return FALSE for the last channel */
> +    if (multifd_recv_state->count == thread_count) {
> +        return FALSE;
> +    } else {
> +        return TRUE;
> +    }
> +}
> +

> diff --git a/migration/socket.c b/migration/socket.c
> index 6195596..32a6b39 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -26,6 +26,38 @@
>  #include "io/channel-socket.h"
>  #include "trace.h"
>  
> +int socket_recv_channel_destroy(QIOChannel *recv)
> +{
> +    /* Remove channel */
> +    object_unref(OBJECT(recv));
> +    return 0;
> +}
> +
> +struct SocketOutgoingArgs {
> +    SocketAddress *saddr;
> +    Error **errp;
> +} outgoing_args;
> +
> +QIOChannel *socket_send_channel_create(void)
> +{
> +    QIOChannelSocket *sioc = qio_channel_socket_new();
> +
> +    qio_channel_socket_connect_sync(sioc, outgoing_args.saddr,
> +                                    outgoing_args.errp);

This is going to block the caller, which means if someonee
calls migrate_cancel it won't be possible to cleanup
any threads stuck in this connect call. It is preferrable
to use connect_async, and return the sioc immediately. THis
lets the callce close the sioc to cancel the connect attempt.

> +    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
> +    return QIO_CHANNEL(sioc);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> +    /* Remove channel */
> +    object_unref(OBJECT(send));
> +    if (outgoing_args.saddr) {
> +        qapi_free_SocketAddress(outgoing_args.saddr);
> +        outgoing_args.saddr = NULL;
> +    }
> +    return 0;
> +}
>  
>  static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
>  {
> @@ -96,6 +128,9 @@ static void socket_start_outgoing_migration(MigrationState *s,
>      struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
>  
>      data->s = s;
> +    outgoing_args.saddr = saddr;
> +    outgoing_args.errp = errp;

If socket_start_outgoing_migration() is called multiple times, then
we're going to leak saddr.

Also 'errp' is pointing to stack memory in the caller, so you're
saving a pointer to a stack frame that will no longer be valid
once this method returns. So that doesn't look safe to me.

> +
>      if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
>          data->hostname = g_strdup(saddr->u.inet.host);
>      }
> @@ -106,7 +141,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
>                                       socket_outgoing_migration,
>                                       data,
>                                       socket_connect_data_free);
> -    qapi_free_SocketAddress(saddr);
>  }
>  
>  void tcp_start_outgoing_migration(MigrationState *s,
> @@ -151,8 +185,6 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
>  
>      qio_channel_set_name(QIO_CHANNEL(sioc), "migration-socket-incoming");
>      result = migration_channel_process_incoming(QIO_CHANNEL(sioc));
> -    object_unref(OBJECT(sioc));
> -
>  out:
>      if (result == FALSE) {
>          /* Close listening socket as its no longer needed */
> diff --git a/migration/socket.h b/migration/socket.h
> index 6b91e9d..dabce0e 100644
> --- a/migration/socket.h
> +++ b/migration/socket.h
> @@ -16,6 +16,16 @@
>  
>  #ifndef QEMU_MIGRATION_SOCKET_H
>  #define QEMU_MIGRATION_SOCKET_H
> +
> +#include "io/channel.h"
> +
> +QIOChannel *socket_recv_channel_create(void);
> +int socket_recv_channel_destroy(QIOChannel *recv);
> +
> +QIOChannel *socket_send_channel_create(void);
> +
> +int socket_send_channel_destroy(QIOChannel *send);
> +
>  void tcp_start_incoming_migration(const char *host_port, Error **errp);
>  
>  void tcp_start_outgoing_migration(MigrationState *s, const char *host_port,
> -- 
> 2.9.4
> 

Regards,
Daniel
Dr. David Alan Gilbert July 19, 2017, 5:35 p.m. UTC | #2
* Juan Quintela (quintela@redhat.com) wrote:
> We create new channels for each new thread created. We only send through
> them a character to be sure that we are creating the channels in the
> right order.

That text is out of date isn't it?

> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> Split SocketArgs into incoming and outgoing args
> 
> Use UUID's on the initial message, so we are sure we are connecting to
> the right channel.
> 
> Remove init semaphore.  Now that we use uuids on the init message, we
> know that this is our channel.
> 
> Fix recv socket destwroy, we were destroying send channels.
> This was very interesting, because we were using an unreferred object
> without problems.
> 
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> ---
>  migration/migration.c |   7 ++-
>  migration/ram.c       | 118 ++++++++++++++++++++++++++++++++++++++++++--------
>  migration/ram.h       |   2 +
>  migration/socket.c    |  38 ++++++++++++++--
>  migration/socket.h    |  10 +++++
>  5 files changed, 152 insertions(+), 23 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index b81c498..e1c79d5 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -389,8 +389,13 @@ gboolean migration_ioc_process_incoming(QIOChannel *ioc)
>          QEMUFile *f = qemu_fopen_channel_input(ioc);
>          mis->from_src_file = f;
>          migration_fd_process_incoming(f);
> +        if (!migrate_use_multifd()) {
> +            return FALSE;
> +        } else {
> +            return TRUE;
> +        }
>      }
> -    return FALSE; /* unregister */
> +    return multifd_new_channel(ioc);
>  }
>  
>  /*
> diff --git a/migration/ram.c b/migration/ram.c
> index 8e87533..b80f511 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -36,6 +36,7 @@
>  #include "xbzrle.h"
>  #include "ram.h"
>  #include "migration.h"
> +#include "socket.h"
>  #include "migration/register.h"
>  #include "migration/misc.h"
>  #include "qemu-file.h"
> @@ -46,6 +47,8 @@
>  #include "exec/ram_addr.h"
>  #include "qemu/rcu_queue.h"
>  #include "migration/colo.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>  
>  /***********************************************************/
>  /* ram save/restore */
> @@ -361,6 +364,7 @@ static void compress_threads_save_setup(void)
>  struct MultiFDSendParams {
>      uint8_t id;
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
>      bool quit;
> @@ -401,6 +405,7 @@ void multifd_save_cleanup(void)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        socket_send_channel_destroy(p->c);
>      }
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
> @@ -408,11 +413,38 @@ void multifd_save_cleanup(void)
>      multifd_send_state = NULL;
>  }
>  
> +/* Default uuid for multifd when qemu is not started with uuid */
> +static char multifd_uuid[] = "5c49fd7e-af88-4a07-b6e8-091fd696ad40";
> +/* strlen(multifd) + '-' + <channel id> + '-' +  UUID_FMT + '\0' */
> +#define MULTIFD_UUID_MSG (7 + 1 + 3 + 1 + UUID_FMT_LEN + 1)
> +
>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
> +    char string[MULTIFD_UUID_MSG];
> +    char *string_uuid;
> +    int res;
> +    bool exit = false;
>  
> -    while (true) {
> +    if (qemu_uuid_set) {
> +        string_uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> +    } else {
> +        string_uuid = g_strdup(multifd_uuid);
> +    }
> +    res = snprintf(string, MULTIFD_UUID_MSG, "%s multifd %03d",
> +                   string_uuid, p->id);
> +    g_free(string_uuid);
> +
> +    /* -1 due to the wonders of '\0' accounting */
> +    if (res != (MULTIFD_UUID_MSG - 1)) {
> +        error_report("Multifd UUID message '%s' is not of right length",
> +            string);
> +        exit = true;
> +    } else {
> +        qio_channel_write(p->c, string, MULTIFD_UUID_MSG, &error_abort);
> +    }
> +
> +    while (!exit) {
>          qemu_mutex_lock(&p->mutex);
>          if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);
> @@ -445,6 +477,12 @@ int multifd_save_setup(void)
>          qemu_sem_init(&p->sem, 0);
>          p->quit = false;
>          p->id = i;
> +        p->c = socket_send_channel_create();
> +        if (!p->c) {
> +            error_report("Error creating a send channel");
> +            multifd_save_cleanup();
> +            return -1;
> +        }
>          snprintf(thread_name, sizeof(thread_name), "multifdsend_%d", i);
>          qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p,
>                             QEMU_THREAD_JOINABLE);
> @@ -456,6 +494,7 @@ int multifd_save_setup(void)
>  struct MultiFDRecvParams {
>      uint8_t id;
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
>      bool quit;
> @@ -463,7 +502,7 @@ struct MultiFDRecvParams {
>  typedef struct MultiFDRecvParams MultiFDRecvParams;
>  
>  struct {
> -    MultiFDRecvParams *params;
> +    MultiFDRecvParams **params;

Probably want to push that upto where you added that struct?

>      /* number of created threads */
>      int count;
>  } *multifd_recv_state;
> @@ -473,7 +512,7 @@ static void terminate_multifd_recv_threads(void)
>      int i;
>  
>      for (i = 0; i < multifd_recv_state->count; i++) {
> -        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +        MultiFDRecvParams *p = multifd_recv_state->params[i];
>  
>          qemu_mutex_lock(&p->mutex);
>          p->quit = true;
> @@ -491,11 +530,13 @@ void multifd_load_cleanup(void)
>      }
>      terminate_multifd_recv_threads();
>      for (i = 0; i < multifd_recv_state->count; i++) {
> -        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +        MultiFDRecvParams *p = multifd_recv_state->params[i];
>  
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        socket_recv_channel_destroy(p->c);
> +        g_free(p);
>      }
>      g_free(multifd_recv_state->params);
>      multifd_recv_state->params = NULL;
> @@ -520,31 +561,70 @@ static void *multifd_recv_thread(void *opaque)
>      return NULL;
>  }
>  
> +gboolean multifd_new_channel(QIOChannel *ioc)
> +{
> +    int thread_count = migrate_multifd_threads();
> +    MultiFDRecvParams *p = g_new0(MultiFDRecvParams, 1);
> +    MigrationState *s = migrate_get_current();
> +    char string[MULTIFD_UUID_MSG];
> +    char string_uuid[UUID_FMT_LEN];
> +    char *uuid;
> +    int id;
> +
> +    qio_channel_read(ioc, string, sizeof(string), &error_abort);
> +    sscanf(string, "%s multifd %03d", string_uuid, &id);
> +
> +    if (qemu_uuid_set) {
> +        uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> +    } else {
> +        uuid = g_strdup(multifd_uuid);
> +    }
> +    if (strcmp(string_uuid, uuid)) {
> +        error_report("multifd: received uuid '%s' and expected uuid '%s'",
> +                     string_uuid, uuid);

probably worth adding the channel id as well so we can see
when it fails.

> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> +                          MIGRATION_STATUS_FAILED);
> +        terminate_multifd_recv_threads();
> +        return FALSE;
> +    }
> +    g_free(uuid);
> +
> +    if (multifd_recv_state->params[id] != NULL) {
> +        error_report("multifd: received id '%d' is already setup'", id);
> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> +                          MIGRATION_STATUS_FAILED);
> +        terminate_multifd_recv_threads();
> +        return FALSE;
> +    }
> +    qemu_mutex_init(&p->mutex);
> +    qemu_sem_init(&p->sem, 0);
> +    p->quit = false;
> +    p->id = id;
> +    p->c = ioc;
> +    atomic_set(&multifd_recv_state->params[id], p);

Can you explain why this is quite so careful about ordering ? Is there
something that could look at params or try and take the mutex before
the count is incremented?

I think it's safe to do:
 p->quit = false;
 p->id = id;
 p->c = ioc;
 &multifd_recv_state->params[id] = p;
 qemu_sem_init(&p->sem, 0);
 qemu_mutex_init(&p->mutex);
 qemu_thread_create(...)
 atomic_inc(&multifd_recv_state->count);    <-- I'm not sure if this
 needs to be atomic

> +    qemu_thread_create(&p->thread, "multifd_recv", multifd_recv_thread, p,
> +                       QEMU_THREAD_JOINABLE);

You've lost the nice numbered thread names you had created in the
previous version of this that you're removing.

> +    multifd_recv_state->count++;
> +
> +    /* We need to return FALSE for the last channel */
> +    if (multifd_recv_state->count == thread_count) {
> +        return FALSE;
> +    } else {
> +        return TRUE;
> +    }

return multifd_recv_state->count != thread_count;   ?

> +}
> +
>  int multifd_load_setup(void)
>  {
>      int thread_count;
> -    uint8_t i;
>  
>      if (!migrate_use_multifd()) {
>          return 0;
>      }
>      thread_count = migrate_multifd_threads();
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
> -    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> +    multifd_recv_state->params = g_new0(MultiFDRecvParams *, thread_count);
>      multifd_recv_state->count = 0;
> -    for (i = 0; i < thread_count; i++) {
> -        char thread_name[16];
> -        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -
> -        qemu_mutex_init(&p->mutex);
> -        qemu_sem_init(&p->sem, 0);
> -        p->quit = false;
> -        p->id = i;
> -        snprintf(thread_name, sizeof(thread_name), "multifdrecv_%d", i);
> -        qemu_thread_create(&p->thread, thread_name, multifd_recv_thread, p,
> -                           QEMU_THREAD_JOINABLE);
> -        multifd_recv_state->count++;
> -    }
>      return 0;
>  }
>  
> diff --git a/migration/ram.h b/migration/ram.h
> index 93c2bb4..9413544 100644
> --- a/migration/ram.h
> +++ b/migration/ram.h
> @@ -31,6 +31,7 @@
>  
>  #include "qemu-common.h"
>  #include "exec/cpu-common.h"
> +#include "io/channel.h"
>  
>  extern MigrationStats ram_counters;
>  extern XBZRLECacheStats xbzrle_counters;
> @@ -43,6 +44,7 @@ int multifd_save_setup(void);
>  void multifd_save_cleanup(void);
>  int multifd_load_setup(void);
>  void multifd_load_cleanup(void);
> +gboolean multifd_new_channel(QIOChannel *ioc);
>  
>  uint64_t ram_pagesize_summary(void);
>  int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
> diff --git a/migration/socket.c b/migration/socket.c
> index 6195596..32a6b39 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -26,6 +26,38 @@
>  #include "io/channel-socket.h"
>  #include "trace.h"
>  
> +int socket_recv_channel_destroy(QIOChannel *recv)
> +{
> +    /* Remove channel */
> +    object_unref(OBJECT(recv));
> +    return 0;
> +}
> +
> +struct SocketOutgoingArgs {
> +    SocketAddress *saddr;
> +    Error **errp;
> +} outgoing_args;
> +
> +QIOChannel *socket_send_channel_create(void)
> +{
> +    QIOChannelSocket *sioc = qio_channel_socket_new();
> +
> +    qio_channel_socket_connect_sync(sioc, outgoing_args.saddr,
> +                                    outgoing_args.errp);
> +    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
> +    return QIO_CHANNEL(sioc);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> +    /* Remove channel */
> +    object_unref(OBJECT(send));
> +    if (outgoing_args.saddr) {
> +        qapi_free_SocketAddress(outgoing_args.saddr);
> +        outgoing_args.saddr = NULL;
> +    }
> +    return 0;
> +}
>  
>  static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
>  {
> @@ -96,6 +128,9 @@ static void socket_start_outgoing_migration(MigrationState *s,
>      struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
>  
>      data->s = s;
> +    outgoing_args.saddr = saddr;
> +    outgoing_args.errp = errp;
> +
>      if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
>          data->hostname = g_strdup(saddr->u.inet.host);
>      }
> @@ -106,7 +141,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
>                                       socket_outgoing_migration,
>                                       data,
>                                       socket_connect_data_free);
> -    qapi_free_SocketAddress(saddr);
>  }
>  
>  void tcp_start_outgoing_migration(MigrationState *s,
> @@ -151,8 +185,6 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
>  
>      qio_channel_set_name(QIO_CHANNEL(sioc), "migration-socket-incoming");
>      result = migration_channel_process_incoming(QIO_CHANNEL(sioc));
> -    object_unref(OBJECT(sioc));
> -
>  out:
>      if (result == FALSE) {
>          /* Close listening socket as its no longer needed */
> diff --git a/migration/socket.h b/migration/socket.h
> index 6b91e9d..dabce0e 100644
> --- a/migration/socket.h
> +++ b/migration/socket.h
> @@ -16,6 +16,16 @@
>  
>  #ifndef QEMU_MIGRATION_SOCKET_H
>  #define QEMU_MIGRATION_SOCKET_H
> +
> +#include "io/channel.h"
> +
> +QIOChannel *socket_recv_channel_create(void);
> +int socket_recv_channel_destroy(QIOChannel *recv);
> +
> +QIOChannel *socket_send_channel_create(void);
> +
> +int socket_send_channel_destroy(QIOChannel *send);
> +
>  void tcp_start_incoming_migration(const char *host_port, Error **errp);
>  
>  void tcp_start_outgoing_migration(MigrationState *s, const char *host_port,
> -- 
> 2.9.4

Dave

--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Peter Xu July 20, 2017, 9:34 a.m. UTC | #3
On Mon, Jul 17, 2017 at 03:42:30PM +0200, Juan Quintela wrote:

[...]

>  int multifd_load_setup(void)
>  {
>      int thread_count;
> -    uint8_t i;
>  
>      if (!migrate_use_multifd()) {
>          return 0;
>      }
>      thread_count = migrate_multifd_threads();
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
> -    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> +    multifd_recv_state->params = g_new0(MultiFDRecvParams *, thread_count);
>      multifd_recv_state->count = 0;
> -    for (i = 0; i < thread_count; i++) {
> -        char thread_name[16];
> -        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -
> -        qemu_mutex_init(&p->mutex);
> -        qemu_sem_init(&p->sem, 0);
> -        p->quit = false;
> -        p->id = i;
> -        snprintf(thread_name, sizeof(thread_name), "multifdrecv_%d", i);
> -        qemu_thread_create(&p->thread, thread_name, multifd_recv_thread, p,
> -                           QEMU_THREAD_JOINABLE);
> -        multifd_recv_state->count++;
> -    }

Could I ask why we explicitly switched from MultiFDRecvParams[] array
into a pointer array? Can we still use the old array?  Thanks,
Juan Quintela Aug. 8, 2017, 9:19 a.m. UTC | #4
Peter Xu <peterx@redhat.com> wrote:
> On Mon, Jul 17, 2017 at 03:42:30PM +0200, Juan Quintela wrote:
>
> [...]
>
>>  int multifd_load_setup(void)
>>  {
>>      int thread_count;
>> -    uint8_t i;
>>  
>>      if (!migrate_use_multifd()) {
>>          return 0;
>>      }
>>      thread_count = migrate_multifd_threads();
>>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>> -    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>> +    multifd_recv_state->params = g_new0(MultiFDRecvParams *, thread_count);
>>      multifd_recv_state->count = 0;
>> -    for (i = 0; i < thread_count; i++) {
>> -        char thread_name[16];
>> -        MultiFDRecvParams *p = &multifd_recv_state->params[i];
>> -
>> -        qemu_mutex_init(&p->mutex);
>> -        qemu_sem_init(&p->sem, 0);
>> -        p->quit = false;
>> -        p->id = i;
>> -        snprintf(thread_name, sizeof(thread_name), "multifdrecv_%d", i);
>> -        qemu_thread_create(&p->thread, thread_name, multifd_recv_thread, p,
>> -                           QEMU_THREAD_JOINABLE);
>> -        multifd_recv_state->count++;
>> -    }
>
> Could I ask why we explicitly switched from MultiFDRecvParams[] array
> into a pointer array? Can we still use the old array?  Thanks,

Now, we could receive the channels out of order (the wonders of
networking).  So, we have two options that I can see:

* Add interesting global locking to be able to modify inplace (I know
  that it should be safe, but yet).
* Create a new struct in the new connection, and then atomically switch
  the pointer to the right instruction.

I can assure you that the second one makes it much more easier to detect
when you use the "channel" before you have fully created it O:-)

Later, Juan.
Juan Quintela Aug. 8, 2017, 9:35 a.m. UTC | #5
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We create new channels for each new thread created. We only send through
>> them a character to be sure that we are creating the channels in the
>> right order.
>
> That text is out of date isn't it?

oops, fixed.


>> +gboolean multifd_new_channel(QIOChannel *ioc)
>> +{
>> +    int thread_count = migrate_multifd_threads();
>> +    MultiFDRecvParams *p = g_new0(MultiFDRecvParams, 1);
>> +    MigrationState *s = migrate_get_current();
>> +    char string[MULTIFD_UUID_MSG];
>> +    char string_uuid[UUID_FMT_LEN];
>> +    char *uuid;
>> +    int id;
>> +
>> +    qio_channel_read(ioc, string, sizeof(string), &error_abort);
>> +    sscanf(string, "%s multifd %03d", string_uuid, &id);
>> +
>> +    if (qemu_uuid_set) {
>> +        uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
>> +    } else {
>> +        uuid = g_strdup(multifd_uuid);
>> +    }
>> +    if (strcmp(string_uuid, uuid)) {
>> +        error_report("multifd: received uuid '%s' and expected uuid '%s'",
>> +                     string_uuid, uuid);
>
> probably worth adding the channel id as well so we can see
> when it fails.

Done.

>> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
>> +                          MIGRATION_STATUS_FAILED);
>> +        terminate_multifd_recv_threads();
>> +        return FALSE;
>> +    }
>> +    g_free(uuid);
>> +
>> +    if (multifd_recv_state->params[id] != NULL) {
>> +        error_report("multifd: received id '%d' is already setup'", id);
>> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
>> +                          MIGRATION_STATUS_FAILED);
>> +        terminate_multifd_recv_threads();
>> +        return FALSE;
>> +    }
>> +    qemu_mutex_init(&p->mutex);
>> +    qemu_sem_init(&p->sem, 0);
>> +    p->quit = false;
>> +    p->id = id;
>> +    p->c = ioc;
>> +    atomic_set(&multifd_recv_state->params[id], p);
>
> Can you explain why this is quite so careful about ordering ? Is there
> something that could look at params or try and take the mutex before
> the count is incremented?

what happened to me in the middle stages of the patches (yes, doing
asynchronously was painful) was that:

I created the threads (at the beggining I did the
multifd_recv_state->params[id] == p inside the thread, that makes things
really, really racy.  I *think* that now we could probably do this
as you state.



> I think it's safe to do:
>  p->quit = false;
>  p->id = id;
>  p->c = ioc;
>  &multifd_recv_state->params[id] = p;
>  qemu_sem_init(&p->sem, 0);
>  qemu_mutex_init(&p->mutex);
>  qemu_thread_create(...)
>  atomic_inc(&multifd_recv_state->count);    <-- I'm not sure if this
>  needs to be atomic

We only change it on the main thread, so it should be enough.  The split
that I want to do is:

we do the listen asynchronously
when something arrives, we just read it (main thread)
we then read <uuid> <string> <arguments>
and then after checking that uuid is right, we call whatever function we
have for "string", in our case "multifd", with <arguments> as one string
parameters.

This should make it easier to create new "channels" for other purposes.
So far so good.

But then it appears what are the responsabilities, At the beggining, I
read the string on the reception thread for that channel, that created a
race because I received the 1st message for that channel before the
channel was fully created (yes, it only happened sometimes, easy to
understand after debugging).  This is the main reason that I changed to
an array of pointers to structs instead of one array of structs.

Then, I had to ve very careful to know when I had created all the
channels threads, because otherwise I ended having races left and right.

I will try to test the ordering that you suggested.

>> +    qemu_thread_create(&p->thread, "multifd_recv", multifd_recv_thread, p,
>> +                       QEMU_THREAD_JOINABLE);
>
> You've lost the nice numbered thread names you had created in the
> previous version of this that you're removing.

I could get them back, but they really were not showing at gdb, where do
they show? ps?

>> +    multifd_recv_state->count++;
>> +
>> +    /* We need to return FALSE for the last channel */
>> +    if (multifd_recv_state->count == thread_count) {
>> +        return FALSE;
>> +    } else {
>> +        return TRUE;
>> +    }
>
> return multifd_recv_state->count != thread_count;   ?

For other reasons I change this functions and now they use a different
way of setting/checking if we have finished.  Look at the new series.

I didn't do as you said because I feel it weird that we return a bool
when we expert a gboolean, but .....

Thanks, Juan.
Dr. David Alan Gilbert Aug. 8, 2017, 9:54 a.m. UTC | #6
* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> We create new channels for each new thread created. We only send through
> >> them a character to be sure that we are creating the channels in the
> >> right order.
> >
> > That text is out of date isn't it?
> 
> oops, fixed.
> 
> 
> >> +gboolean multifd_new_channel(QIOChannel *ioc)
> >> +{
> >> +    int thread_count = migrate_multifd_threads();
> >> +    MultiFDRecvParams *p = g_new0(MultiFDRecvParams, 1);
> >> +    MigrationState *s = migrate_get_current();
> >> +    char string[MULTIFD_UUID_MSG];
> >> +    char string_uuid[UUID_FMT_LEN];
> >> +    char *uuid;
> >> +    int id;
> >> +
> >> +    qio_channel_read(ioc, string, sizeof(string), &error_abort);
> >> +    sscanf(string, "%s multifd %03d", string_uuid, &id);
> >> +
> >> +    if (qemu_uuid_set) {
> >> +        uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> >> +    } else {
> >> +        uuid = g_strdup(multifd_uuid);
> >> +    }
> >> +    if (strcmp(string_uuid, uuid)) {
> >> +        error_report("multifd: received uuid '%s' and expected uuid '%s'",
> >> +                     string_uuid, uuid);
> >
> > probably worth adding the channel id as well so we can see
> > when it fails.
> 
> Done.
> 
> >> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> >> +                          MIGRATION_STATUS_FAILED);
> >> +        terminate_multifd_recv_threads();
> >> +        return FALSE;
> >> +    }
> >> +    g_free(uuid);
> >> +
> >> +    if (multifd_recv_state->params[id] != NULL) {
> >> +        error_report("multifd: received id '%d' is already setup'", id);
> >> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> >> +                          MIGRATION_STATUS_FAILED);
> >> +        terminate_multifd_recv_threads();
> >> +        return FALSE;
> >> +    }
> >> +    qemu_mutex_init(&p->mutex);
> >> +    qemu_sem_init(&p->sem, 0);
> >> +    p->quit = false;
> >> +    p->id = id;
> >> +    p->c = ioc;
> >> +    atomic_set(&multifd_recv_state->params[id], p);
> >
> > Can you explain why this is quite so careful about ordering ? Is there
> > something that could look at params or try and take the mutex before
> > the count is incremented?
> 
> what happened to me in the middle stages of the patches (yes, doing
> asynchronously was painful) was that:
> 
> I created the threads (at the beggining I did the
> multifd_recv_state->params[id] == p inside the thread, that makes things
> really, really racy.  I *think* that now we could probably do this
> as you state.
> 
> 
> 
> > I think it's safe to do:
> >  p->quit = false;
> >  p->id = id;
> >  p->c = ioc;
> >  &multifd_recv_state->params[id] = p;
> >  qemu_sem_init(&p->sem, 0);
> >  qemu_mutex_init(&p->mutex);
> >  qemu_thread_create(...)
> >  atomic_inc(&multifd_recv_state->count);    <-- I'm not sure if this
> >  needs to be atomic
> 
> We only change it on the main thread, so it should be enough.  The split
> that I want to do is:
> 
> we do the listen asynchronously
> when something arrives, we just read it (main thread)
> we then read <uuid> <string> <arguments>
> and then after checking that uuid is right, we call whatever function we
> have for "string", in our case "multifd", with <arguments> as one string
> parameters.
> 
> This should make it easier to create new "channels" for other purposes.
> So far so good.
> 
> But then it appears what are the responsabilities, At the beggining, I
> read the string on the reception thread for that channel, that created a
> race because I received the 1st message for that channel before the
> channel was fully created (yes, it only happened sometimes, easy to
> understand after debugging).  This is the main reason that I changed to
> an array of pointers to structs instead of one array of structs.
> 
> Then, I had to ve very careful to know when I had created all the
> channels threads, because otherwise I ended having races left and right.
> 
> I will try to test the ordering that you suggested.
> 
> >> +    qemu_thread_create(&p->thread, "multifd_recv", multifd_recv_thread, p,
> >> +                       QEMU_THREAD_JOINABLE);
> >
> > You've lost the nice numbered thread names you had created in the
> > previous version of this that you're removing.
> 
> I could get them back, but they really were not showing at gdb, where do
> they show? ps?

If you start qemu with -name debug-threads=on they show up in gdb's
info threads
also in top (hit H) and ps if you turn on the right optioa (H as well?)n.

> >> +    multifd_recv_state->count++;
> >> +
> >> +    /* We need to return FALSE for the last channel */
> >> +    if (multifd_recv_state->count == thread_count) {
> >> +        return FALSE;
> >> +    } else {
> >> +        return TRUE;
> >> +    }
> >
> > return multifd_recv_state->count != thread_count;   ?
> 
> For other reasons I change this functions and now they use a different
> way of setting/checking if we have finished.  Look at the new series.
> 
> I didn't do as you said because I feel it weird that we return a bool
> when we expert a gboolean, but .....

I hope & believe they're defined as compatible:
  https://people.gnome.org/~desrt/glib-docs/glib-Standard-Macros.html#TRUE:CAPS

Dave
> Thanks, Juan.
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Peter Xu Aug. 9, 2017, 8:08 a.m. UTC | #7
On Tue, Aug 08, 2017 at 11:19:35AM +0200, Juan Quintela wrote:
> Peter Xu <peterx@redhat.com> wrote:
> > On Mon, Jul 17, 2017 at 03:42:30PM +0200, Juan Quintela wrote:
> >
> > [...]
> >
> >>  int multifd_load_setup(void)
> >>  {
> >>      int thread_count;
> >> -    uint8_t i;
> >>  
> >>      if (!migrate_use_multifd()) {
> >>          return 0;
> >>      }
> >>      thread_count = migrate_multifd_threads();
> >>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
> >> -    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> >> +    multifd_recv_state->params = g_new0(MultiFDRecvParams *, thread_count);
> >>      multifd_recv_state->count = 0;
> >> -    for (i = 0; i < thread_count; i++) {
> >> -        char thread_name[16];
> >> -        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> >> -
> >> -        qemu_mutex_init(&p->mutex);
> >> -        qemu_sem_init(&p->sem, 0);
> >> -        p->quit = false;
> >> -        p->id = i;
> >> -        snprintf(thread_name, sizeof(thread_name), "multifdrecv_%d", i);
> >> -        qemu_thread_create(&p->thread, thread_name, multifd_recv_thread, p,
> >> -                           QEMU_THREAD_JOINABLE);
> >> -        multifd_recv_state->count++;
> >> -    }
> >
> > Could I ask why we explicitly switched from MultiFDRecvParams[] array
> > into a pointer array? Can we still use the old array?  Thanks,
> 
> Now, we could receive the channels out of order (the wonders of
> networking).  So, we have two options that I can see:
> 
> * Add interesting global locking to be able to modify inplace (I know
>   that it should be safe, but yet).
> * Create a new struct in the new connection, and then atomically switch
>   the pointer to the right instruction.
> 
> I can assure you that the second one makes it much more easier to detect
> when you use the "channel" before you have fully created it O:-)

Oh, so it's possible that we start to recv pages even if the recv
channel has not yet been established...

Then would current code be problematic? Like in multifd_recv_page() we
have:

static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
{
    ...
    p = multifd_recv_state->params[fd_num];
    qemu_sem_wait(&p->ready);
    ...
}

Here can p==NULL if channel is not ready yet?

(If so, I think a static array makes more sense...)

Thanks,
Juan Quintela Aug. 9, 2017, 11:12 a.m. UTC | #8
Peter Xu <peterx@redhat.com> wrote:
> On Tue, Aug 08, 2017 at 11:19:35AM +0200, Juan Quintela wrote:
>> Peter Xu <peterx@redhat.com> wrote:
>> > On Mon, Jul 17, 2017 at 03:42:30PM +0200, Juan Quintela wrote:
>> >
>> > [...]
>> >
>> >>  int multifd_load_setup(void)
>> >>  {
>> >>      int thread_count;
>> >> -    uint8_t i;
>> >>  
>> >>      if (!migrate_use_multifd()) {
>> >>          return 0;
>> >>      }
>> >>      thread_count = migrate_multifd_threads();
>> >>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>> >> -    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>> >> +    multifd_recv_state->params = g_new0(MultiFDRecvParams *, thread_count);
>> >>      multifd_recv_state->count = 0;
>> >> -    for (i = 0; i < thread_count; i++) {
>> >> -        char thread_name[16];
>> >> -        MultiFDRecvParams *p = &multifd_recv_state->params[i];
>> >> -
>> >> -        qemu_mutex_init(&p->mutex);
>> >> -        qemu_sem_init(&p->sem, 0);
>> >> -        p->quit = false;
>> >> -        p->id = i;
>> >> -        snprintf(thread_name, sizeof(thread_name), "multifdrecv_%d", i);
>> >> -        qemu_thread_create(&p->thread, thread_name, multifd_recv_thread, p,
>> >> -                           QEMU_THREAD_JOINABLE);
>> >> -        multifd_recv_state->count++;
>> >> -    }
>> >
>> > Could I ask why we explicitly switched from MultiFDRecvParams[] array
>> > into a pointer array? Can we still use the old array?  Thanks,
>> 
>> Now, we could receive the channels out of order (the wonders of
>> networking).  So, we have two options that I can see:
>> 
>> * Add interesting global locking to be able to modify inplace (I know
>>   that it should be safe, but yet).
>> * Create a new struct in the new connection, and then atomically switch
>>   the pointer to the right instruction.
>> 
>> I can assure you that the second one makes it much more easier to detect
>> when you use the "channel" before you have fully created it O:-)
>
> Oh, so it's possible that we start to recv pages even if the recv
> channel has not yet been established...
>
> Then would current code be problematic? Like in multifd_recv_page() we
> have:
>
> static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
> {
>     ...
>     p = multifd_recv_state->params[fd_num];
>     qemu_sem_wait(&p->ready);
>     ...
> }
>
> Here can p==NULL if channel is not ready yet?
>
> (If so, I think a static array makes more sense...)

Yeap.  If we make an error (and believe me that I did), we  get a "nice"
segmentation fault, where we can see what fd_num is.  Otherwise we
receive a hang qemu.  I know what I preffer O:-)

Later, Juan.
diff mbox

Patch

diff --git a/migration/migration.c b/migration/migration.c
index b81c498..e1c79d5 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -389,8 +389,13 @@  gboolean migration_ioc_process_incoming(QIOChannel *ioc)
         QEMUFile *f = qemu_fopen_channel_input(ioc);
         mis->from_src_file = f;
         migration_fd_process_incoming(f);
+        if (!migrate_use_multifd()) {
+            return FALSE;
+        } else {
+            return TRUE;
+        }
     }
-    return FALSE; /* unregister */
+    return multifd_new_channel(ioc);
 }
 
 /*
diff --git a/migration/ram.c b/migration/ram.c
index 8e87533..b80f511 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -36,6 +36,7 @@ 
 #include "xbzrle.h"
 #include "ram.h"
 #include "migration.h"
+#include "socket.h"
 #include "migration/register.h"
 #include "migration/misc.h"
 #include "qemu-file.h"
@@ -46,6 +47,8 @@ 
 #include "exec/ram_addr.h"
 #include "qemu/rcu_queue.h"
 #include "migration/colo.h"
+#include "sysemu/sysemu.h"
+#include "qemu/uuid.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -361,6 +364,7 @@  static void compress_threads_save_setup(void)
 struct MultiFDSendParams {
     uint8_t id;
     QemuThread thread;
+    QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
     bool quit;
@@ -401,6 +405,7 @@  void multifd_save_cleanup(void)
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        socket_send_channel_destroy(p->c);
     }
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
@@ -408,11 +413,38 @@  void multifd_save_cleanup(void)
     multifd_send_state = NULL;
 }
 
+/* Default uuid for multifd when qemu is not started with uuid */
+static char multifd_uuid[] = "5c49fd7e-af88-4a07-b6e8-091fd696ad40";
+/* strlen(multifd) + '-' + <channel id> + '-' +  UUID_FMT + '\0' */
+#define MULTIFD_UUID_MSG (7 + 1 + 3 + 1 + UUID_FMT_LEN + 1)
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
+    char string[MULTIFD_UUID_MSG];
+    char *string_uuid;
+    int res;
+    bool exit = false;
 
-    while (true) {
+    if (qemu_uuid_set) {
+        string_uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
+    } else {
+        string_uuid = g_strdup(multifd_uuid);
+    }
+    res = snprintf(string, MULTIFD_UUID_MSG, "%s multifd %03d",
+                   string_uuid, p->id);
+    g_free(string_uuid);
+
+    /* -1 due to the wonders of '\0' accounting */
+    if (res != (MULTIFD_UUID_MSG - 1)) {
+        error_report("Multifd UUID message '%s' is not of right length",
+            string);
+        exit = true;
+    } else {
+        qio_channel_write(p->c, string, MULTIFD_UUID_MSG, &error_abort);
+    }
+
+    while (!exit) {
         qemu_mutex_lock(&p->mutex);
         if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
@@ -445,6 +477,12 @@  int multifd_save_setup(void)
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
         p->id = i;
+        p->c = socket_send_channel_create();
+        if (!p->c) {
+            error_report("Error creating a send channel");
+            multifd_save_cleanup();
+            return -1;
+        }
         snprintf(thread_name, sizeof(thread_name), "multifdsend_%d", i);
         qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p,
                            QEMU_THREAD_JOINABLE);
@@ -456,6 +494,7 @@  int multifd_save_setup(void)
 struct MultiFDRecvParams {
     uint8_t id;
     QemuThread thread;
+    QIOChannel *c;
     QemuSemaphore sem;
     QemuMutex mutex;
     bool quit;
@@ -463,7 +502,7 @@  struct MultiFDRecvParams {
 typedef struct MultiFDRecvParams MultiFDRecvParams;
 
 struct {
-    MultiFDRecvParams *params;
+    MultiFDRecvParams **params;
     /* number of created threads */
     int count;
 } *multifd_recv_state;
@@ -473,7 +512,7 @@  static void terminate_multifd_recv_threads(void)
     int i;
 
     for (i = 0; i < multifd_recv_state->count; i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+        MultiFDRecvParams *p = multifd_recv_state->params[i];
 
         qemu_mutex_lock(&p->mutex);
         p->quit = true;
@@ -491,11 +530,13 @@  void multifd_load_cleanup(void)
     }
     terminate_multifd_recv_threads();
     for (i = 0; i < multifd_recv_state->count; i++) {
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+        MultiFDRecvParams *p = multifd_recv_state->params[i];
 
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        socket_recv_channel_destroy(p->c);
+        g_free(p);
     }
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
@@ -520,31 +561,70 @@  static void *multifd_recv_thread(void *opaque)
     return NULL;
 }
 
+gboolean multifd_new_channel(QIOChannel *ioc)
+{
+    int thread_count = migrate_multifd_threads();
+    MultiFDRecvParams *p = g_new0(MultiFDRecvParams, 1);
+    MigrationState *s = migrate_get_current();
+    char string[MULTIFD_UUID_MSG];
+    char string_uuid[UUID_FMT_LEN];
+    char *uuid;
+    int id;
+
+    qio_channel_read(ioc, string, sizeof(string), &error_abort);
+    sscanf(string, "%s multifd %03d", string_uuid, &id);
+
+    if (qemu_uuid_set) {
+        uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
+    } else {
+        uuid = g_strdup(multifd_uuid);
+    }
+    if (strcmp(string_uuid, uuid)) {
+        error_report("multifd: received uuid '%s' and expected uuid '%s'",
+                     string_uuid, uuid);
+        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
+                          MIGRATION_STATUS_FAILED);
+        terminate_multifd_recv_threads();
+        return FALSE;
+    }
+    g_free(uuid);
+
+    if (multifd_recv_state->params[id] != NULL) {
+        error_report("multifd: received id '%d' is already setup'", id);
+        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
+                          MIGRATION_STATUS_FAILED);
+        terminate_multifd_recv_threads();
+        return FALSE;
+    }
+    qemu_mutex_init(&p->mutex);
+    qemu_sem_init(&p->sem, 0);
+    p->quit = false;
+    p->id = id;
+    p->c = ioc;
+    atomic_set(&multifd_recv_state->params[id], p);
+    qemu_thread_create(&p->thread, "multifd_recv", multifd_recv_thread, p,
+                       QEMU_THREAD_JOINABLE);
+    multifd_recv_state->count++;
+
+    /* We need to return FALSE for the last channel */
+    if (multifd_recv_state->count == thread_count) {
+        return FALSE;
+    } else {
+        return TRUE;
+    }
+}
+
 int multifd_load_setup(void)
 {
     int thread_count;
-    uint8_t i;
 
     if (!migrate_use_multifd()) {
         return 0;
     }
     thread_count = migrate_multifd_threads();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
-    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
+    multifd_recv_state->params = g_new0(MultiFDRecvParams *, thread_count);
     multifd_recv_state->count = 0;
-    for (i = 0; i < thread_count; i++) {
-        char thread_name[16];
-        MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
-        qemu_mutex_init(&p->mutex);
-        qemu_sem_init(&p->sem, 0);
-        p->quit = false;
-        p->id = i;
-        snprintf(thread_name, sizeof(thread_name), "multifdrecv_%d", i);
-        qemu_thread_create(&p->thread, thread_name, multifd_recv_thread, p,
-                           QEMU_THREAD_JOINABLE);
-        multifd_recv_state->count++;
-    }
     return 0;
 }
 
diff --git a/migration/ram.h b/migration/ram.h
index 93c2bb4..9413544 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -31,6 +31,7 @@ 
 
 #include "qemu-common.h"
 #include "exec/cpu-common.h"
+#include "io/channel.h"
 
 extern MigrationStats ram_counters;
 extern XBZRLECacheStats xbzrle_counters;
@@ -43,6 +44,7 @@  int multifd_save_setup(void);
 void multifd_save_cleanup(void);
 int multifd_load_setup(void);
 void multifd_load_cleanup(void);
+gboolean multifd_new_channel(QIOChannel *ioc);
 
 uint64_t ram_pagesize_summary(void);
 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
diff --git a/migration/socket.c b/migration/socket.c
index 6195596..32a6b39 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -26,6 +26,38 @@ 
 #include "io/channel-socket.h"
 #include "trace.h"
 
+int socket_recv_channel_destroy(QIOChannel *recv)
+{
+    /* Remove channel */
+    object_unref(OBJECT(recv));
+    return 0;
+}
+
+struct SocketOutgoingArgs {
+    SocketAddress *saddr;
+    Error **errp;
+} outgoing_args;
+
+QIOChannel *socket_send_channel_create(void)
+{
+    QIOChannelSocket *sioc = qio_channel_socket_new();
+
+    qio_channel_socket_connect_sync(sioc, outgoing_args.saddr,
+                                    outgoing_args.errp);
+    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
+    return QIO_CHANNEL(sioc);
+}
+
+int socket_send_channel_destroy(QIOChannel *send)
+{
+    /* Remove channel */
+    object_unref(OBJECT(send));
+    if (outgoing_args.saddr) {
+        qapi_free_SocketAddress(outgoing_args.saddr);
+        outgoing_args.saddr = NULL;
+    }
+    return 0;
+}
 
 static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
 {
@@ -96,6 +128,9 @@  static void socket_start_outgoing_migration(MigrationState *s,
     struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
 
     data->s = s;
+    outgoing_args.saddr = saddr;
+    outgoing_args.errp = errp;
+
     if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
         data->hostname = g_strdup(saddr->u.inet.host);
     }
@@ -106,7 +141,6 @@  static void socket_start_outgoing_migration(MigrationState *s,
                                      socket_outgoing_migration,
                                      data,
                                      socket_connect_data_free);
-    qapi_free_SocketAddress(saddr);
 }
 
 void tcp_start_outgoing_migration(MigrationState *s,
@@ -151,8 +185,6 @@  static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
 
     qio_channel_set_name(QIO_CHANNEL(sioc), "migration-socket-incoming");
     result = migration_channel_process_incoming(QIO_CHANNEL(sioc));
-    object_unref(OBJECT(sioc));
-
 out:
     if (result == FALSE) {
         /* Close listening socket as its no longer needed */
diff --git a/migration/socket.h b/migration/socket.h
index 6b91e9d..dabce0e 100644
--- a/migration/socket.h
+++ b/migration/socket.h
@@ -16,6 +16,16 @@ 
 
 #ifndef QEMU_MIGRATION_SOCKET_H
 #define QEMU_MIGRATION_SOCKET_H
+
+#include "io/channel.h"
+
+QIOChannel *socket_recv_channel_create(void);
+int socket_recv_channel_destroy(QIOChannel *recv);
+
+QIOChannel *socket_send_channel_create(void);
+
+int socket_send_channel_destroy(QIOChannel *send);
+
 void tcp_start_incoming_migration(const char *host_port, Error **errp);
 
 void tcp_start_outgoing_migration(MigrationState *s, const char *host_port,