diff mbox

[09/16] migration: Start of multiple fd work

Message ID 20170313124434.1043-10-quintela@redhat.com
State New
Headers show

Commit Message

Juan Quintela March 13, 2017, 12:44 p.m. UTC
We create new channels for each new thread created. We only send through
them a character to be sure that we are creating the channels in the
right order.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Split SocketArgs into incoming and outgoing args

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/migration/migration.h |  7 +++++
 migration/ram.c               | 35 ++++++++++++++++++++++
 migration/socket.c            | 67 +++++++++++++++++++++++++++++++++++++++++--
 3 files changed, 106 insertions(+), 3 deletions(-)

Comments

Daniel P. Berrangé March 13, 2017, 4:41 p.m. UTC | #1
On Mon, Mar 13, 2017 at 01:44:27PM +0100, Juan Quintela wrote:
> We create new channels for each new thread created. We only send through
> them a character to be sure that we are creating the channels in the
> right order.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> Split SocketArgs into incoming and outgoing args
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  include/migration/migration.h |  7 +++++
>  migration/ram.c               | 35 ++++++++++++++++++++++
>  migration/socket.c            | 67 +++++++++++++++++++++++++++++++++++++++++--
>  3 files changed, 106 insertions(+), 3 deletions(-)
> 

> diff --git a/migration/socket.c b/migration/socket.c
> index 13966f1..58a16b5 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -24,6 +24,65 @@
>  #include "io/channel-socket.h"
>  #include "trace.h"
>  
> +struct SocketIncomingArgs {
> +    QIOChannelSocket *ioc;
> +} incoming_args;
> +
> +QIOChannel *socket_recv_channel_create(void)
> +{
> +    QIOChannelSocket *sioc;
> +    Error *err = NULL;
> +
> +    sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(incoming_args.ioc),
> +                                     &err);
> +    if (!sioc) {
> +        error_report("could not accept migration connection (%s)",
> +                     error_get_pretty(err));
> +        return NULL;
> +    }
> +    return QIO_CHANNEL(sioc);
> +}
> +
> +int socket_recv_channel_destroy(QIOChannel *recv)
> +{
> +    /* Remove channel */
> +    object_unref(OBJECT(send));
> +    return 0;
> +}
> +
> +/* we have created all the recv channels, we can close the main one */
> +int socket_recv_channel_close_listening(void)
> +{
> +    /* Close listening socket as its no longer needed */
> +    qio_channel_close(QIO_CHANNEL(incoming_args.ioc), NULL);
> +    return 0;
> +}
> +
> +struct SocketOutgoingArgs {
> +    SocketAddress *saddr;
> +    Error **errp;
> +} outgoing_args;
> +
> +QIOChannel *socket_send_channel_create(void)
> +{
> +    QIOChannelSocket *sioc = qio_channel_socket_new();
> +
> +    qio_channel_socket_connect_sync(sioc, outgoing_args.saddr,
> +                                    outgoing_args.errp);
> +    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
> +    return QIO_CHANNEL(sioc);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> +    /* Remove channel */
> +    object_unref(OBJECT(send));
> +    if (outgoing_args.saddr) {
> +        qapi_free_SocketAddress(outgoing_args.saddr);
> +        outgoing_args.saddr = NULL;
> +    }
> +    return 0;
> +}
>  
>  static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
>  {
> @@ -97,6 +156,10 @@ static void socket_start_outgoing_migration(MigrationState *s,
>      struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
>  
>      data->s = s;
> +
> +    outgoing_args.saddr = saddr;
> +    outgoing_args.errp = errp;
> +
>      if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
>          data->hostname = g_strdup(saddr->u.inet.data->host);
>      }
> @@ -107,7 +170,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
>                                       socket_outgoing_migration,
>                                       data,
>                                       socket_connect_data_free);
> -    qapi_free_SocketAddress(saddr);
>  }
>  
>  void tcp_start_outgoing_migration(MigrationState *s,
> @@ -154,8 +216,6 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
>      object_unref(OBJECT(sioc));
>  
>  out:
> -    /* Close listening socket as its no longer needed */
> -    qio_channel_close(ioc, NULL);
>      return FALSE; /* unregister */
>  }
>  
> @@ -164,6 +224,7 @@ static void socket_start_incoming_migration(SocketAddress *saddr,
>                                              Error **errp)
>  {
>      QIOChannelSocket *listen_ioc = qio_channel_socket_new();
> +    incoming_args.ioc = listen_ioc;
>  
>      qio_channel_set_name(QIO_CHANNEL(listen_ioc),
>                           "migration-socket-listener");

I still don't really like any of the changes in this file. We've now got
two sets of methods which connect to a remote host and two sets of methods
which accept incoming clients. I've got to think there's a better way to
refactor the existing code, such that we don't need two sets of methods
for the same actions


Regards,
Daniel
Juan Quintela March 13, 2017, 4:58 p.m. UTC | #2
"Daniel P. Berrange" <berrange@redhat.com> wrote:
> On Mon, Mar 13, 2017 at 01:44:27PM +0100, Juan Quintela wrote:
>> We create new channels for each new thread created. We only send through
>> them a character to be sure that we are creating the channels in the
>> right order.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> 
>> --
>> Split SocketArgs into incoming and outgoing args
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  include/migration/migration.h |  7 +++++
>>  migration/ram.c               | 35 ++++++++++++++++++++++
>>  migration/socket.c            | 67 +++++++++++++++++++++++++++++++++++++++++--
>>  3 files changed, 106 insertions(+), 3 deletions(-)
>> 
>
>> diff --git a/migration/socket.c b/migration/socket.c
>> index 13966f1..58a16b5 100644
>> --- a/migration/socket.c
>> +++ b/migration/socket.c
>> @@ -24,6 +24,65 @@
>>  #include "io/channel-socket.h"
>>  #include "trace.h"
>>  
>> +struct SocketIncomingArgs {
>> +    QIOChannelSocket *ioc;
>> +} incoming_args;
>> +
>> +QIOChannel *socket_recv_channel_create(void)
>> +{
>> +    QIOChannelSocket *sioc;
>> +    Error *err = NULL;
>> +
>> +    sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(incoming_args.ioc),
>> +                                     &err);
>> +    if (!sioc) {
>> +        error_report("could not accept migration connection (%s)",
>> +                     error_get_pretty(err));
>> +        return NULL;
>> +    }
>> +    return QIO_CHANNEL(sioc);
>> +}
>> +
>> +int socket_recv_channel_destroy(QIOChannel *recv)
>> +{
>> +    /* Remove channel */
>> +    object_unref(OBJECT(send));
>> +    return 0;
>> +}
>> +
>> +/* we have created all the recv channels, we can close the main one */
>> +int socket_recv_channel_close_listening(void)
>> +{
>> +    /* Close listening socket as its no longer needed */
>> +    qio_channel_close(QIO_CHANNEL(incoming_args.ioc), NULL);
>> +    return 0;
>> +}
>> +
>> +struct SocketOutgoingArgs {
>> +    SocketAddress *saddr;
>> +    Error **errp;
>> +} outgoing_args;
>> +
>> +QIOChannel *socket_send_channel_create(void)
>> +{
>> +    QIOChannelSocket *sioc = qio_channel_socket_new();
>> +
>> +    qio_channel_socket_connect_sync(sioc, outgoing_args.saddr,
>> +                                    outgoing_args.errp);
>> +    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
>> +    return QIO_CHANNEL(sioc);
>> +}
>> +
>> +int socket_send_channel_destroy(QIOChannel *send)
>> +{
>> +    /* Remove channel */
>> +    object_unref(OBJECT(send));
>> +    if (outgoing_args.saddr) {
>> +        qapi_free_SocketAddress(outgoing_args.saddr);
>> +        outgoing_args.saddr = NULL;
>> +    }
>> +    return 0;
>> +}
>>  
>>  static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
>>  {
>> @@ -97,6 +156,10 @@ static void socket_start_outgoing_migration(MigrationState *s,
>>      struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
>>  
>>      data->s = s;
>> +
>> +    outgoing_args.saddr = saddr;
>> +    outgoing_args.errp = errp;
>> +
>>      if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
>>          data->hostname = g_strdup(saddr->u.inet.data->host);
>>      }
>> @@ -107,7 +170,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
>>                                       socket_outgoing_migration,
>>                                       data,
>>                                       socket_connect_data_free);
>> -    qapi_free_SocketAddress(saddr);
>>  }
>>  
>>  void tcp_start_outgoing_migration(MigrationState *s,
>> @@ -154,8 +216,6 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
>>      object_unref(OBJECT(sioc));
>>  
>>  out:
>> -    /* Close listening socket as its no longer needed */
>> -    qio_channel_close(ioc, NULL);
>>      return FALSE; /* unregister */

*HERE*


>>  }
>>  
>> @@ -164,6 +224,7 @@ static void socket_start_incoming_migration(SocketAddress *saddr,
>>                                              Error **errp)
>>  {
>>      QIOChannelSocket *listen_ioc = qio_channel_socket_new();
>> +    incoming_args.ioc = listen_ioc;
>>  
>>      qio_channel_set_name(QIO_CHANNEL(listen_ioc),
>>                           "migration-socket-listener");
>
> I still don't really like any of the changes in this file. We've now got
> two sets of methods which connect to a remote host and two sets of methods
> which accept incoming clients. I've got to think there's a better way to
> refactor the existing code, such that we don't need two sets of methods
> for the same actions

I am open to suggestions, basically we want to be able to:
- open one + n channels
- be sure that we got the same id on both sides of the connection.

You suggested on the previous iteration that I changed the FALSE in
*HERE* for TRUE, but I was not able to:
- make sure that we have opened n sockets before we continue with
  migration
- making sure that we got same id numbers in both sides, that is doable,
  just add a new id field
- right now I open a channel, and wait for the other side to open it
  before open the following one.  I can do things in parallel, but
  locking is going to be "interesting".

So, as said, I don't really care how we open the channels, I am totally
open to suggestions.  Looking at the current code, this is the best way
that I have been able to think of.

Later, Juan.
Daniel P. Berrangé March 14, 2017, 10:34 a.m. UTC | #3
On Mon, Mar 13, 2017 at 05:58:06PM +0100, Juan Quintela wrote:
> "Daniel P. Berrange" <berrange@redhat.com> wrote:
> > On Mon, Mar 13, 2017 at 01:44:27PM +0100, Juan Quintela wrote:
> >> We create new channels for each new thread created. We only send through
> >> them a character to be sure that we are creating the channels in the
> >> right order.
> >> 
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> >> 
> >> --
> >> Split SocketArgs into incoming and outgoing args
> >> 
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> >> ---
> >>  include/migration/migration.h |  7 +++++
> >>  migration/ram.c               | 35 ++++++++++++++++++++++
> >>  migration/socket.c            | 67 +++++++++++++++++++++++++++++++++++++++++--
> >>  3 files changed, 106 insertions(+), 3 deletions(-)
> >> 
> >
> >> diff --git a/migration/socket.c b/migration/socket.c
> >> index 13966f1..58a16b5 100644
> >> --- a/migration/socket.c
> >> +++ b/migration/socket.c
> >> @@ -24,6 +24,65 @@
> >>  #include "io/channel-socket.h"
> >>  #include "trace.h"
> >>  
> >> +struct SocketIncomingArgs {
> >> +    QIOChannelSocket *ioc;
> >> +} incoming_args;
> >> +
> >> +QIOChannel *socket_recv_channel_create(void)
> >> +{
> >> +    QIOChannelSocket *sioc;
> >> +    Error *err = NULL;
> >> +
> >> +    sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(incoming_args.ioc),
> >> +                                     &err);
> >> +    if (!sioc) {
> >> +        error_report("could not accept migration connection (%s)",
> >> +                     error_get_pretty(err));
> >> +        return NULL;
> >> +    }
> >> +    return QIO_CHANNEL(sioc);
> >> +}
> >> +
> >> +int socket_recv_channel_destroy(QIOChannel *recv)
> >> +{
> >> +    /* Remove channel */
> >> +    object_unref(OBJECT(send));
> >> +    return 0;
> >> +}
> >> +
> >> +/* we have created all the recv channels, we can close the main one */
> >> +int socket_recv_channel_close_listening(void)
> >> +{
> >> +    /* Close listening socket as its no longer needed */
> >> +    qio_channel_close(QIO_CHANNEL(incoming_args.ioc), NULL);
> >> +    return 0;
> >> +}
> >> +
> >> +struct SocketOutgoingArgs {
> >> +    SocketAddress *saddr;
> >> +    Error **errp;
> >> +} outgoing_args;
> >> +
> >> +QIOChannel *socket_send_channel_create(void)
> >> +{
> >> +    QIOChannelSocket *sioc = qio_channel_socket_new();
> >> +
> >> +    qio_channel_socket_connect_sync(sioc, outgoing_args.saddr,
> >> +                                    outgoing_args.errp);
> >> +    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
> >> +    return QIO_CHANNEL(sioc);
> >> +}
> >> +
> >> +int socket_send_channel_destroy(QIOChannel *send)
> >> +{
> >> +    /* Remove channel */
> >> +    object_unref(OBJECT(send));
> >> +    if (outgoing_args.saddr) {
> >> +        qapi_free_SocketAddress(outgoing_args.saddr);
> >> +        outgoing_args.saddr = NULL;
> >> +    }
> >> +    return 0;
> >> +}
> >>  
> >>  static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
> >>  {
> >> @@ -97,6 +156,10 @@ static void socket_start_outgoing_migration(MigrationState *s,
> >>      struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
> >>  
> >>      data->s = s;
> >> +
> >> +    outgoing_args.saddr = saddr;
> >> +    outgoing_args.errp = errp;
> >> +
> >>      if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
> >>          data->hostname = g_strdup(saddr->u.inet.data->host);
> >>      }
> >> @@ -107,7 +170,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
> >>                                       socket_outgoing_migration,
> >>                                       data,
> >>                                       socket_connect_data_free);
> >> -    qapi_free_SocketAddress(saddr);
> >>  }
> >>  
> >>  void tcp_start_outgoing_migration(MigrationState *s,
> >> @@ -154,8 +216,6 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
> >>      object_unref(OBJECT(sioc));
> >>  
> >>  out:
> >> -    /* Close listening socket as its no longer needed */
> >> -    qio_channel_close(ioc, NULL);
> >>      return FALSE; /* unregister */
> 
> *HERE*
> 
> 
> >>  }
> >>  
> >> @@ -164,6 +224,7 @@ static void socket_start_incoming_migration(SocketAddress *saddr,
> >>                                              Error **errp)
> >>  {
> >>      QIOChannelSocket *listen_ioc = qio_channel_socket_new();
> >> +    incoming_args.ioc = listen_ioc;
> >>  
> >>      qio_channel_set_name(QIO_CHANNEL(listen_ioc),
> >>                           "migration-socket-listener");
> >
> > I still don't really like any of the changes in this file. We've now got
> > two sets of methods which connect to a remote host and two sets of methods
> > which accept incoming clients. I've got to think there's a better way to
> > refactor the existing code, such that we don't need two sets of methods
> > for the same actions
> 
> I am open to suggestions, basically we want to be able to:
> - open one + n channels
> - be sure that we got the same id on both sides of the connection.
> 
> You suggested on the previous iteration that I changed the FALSE in
> *HERE* for TRUE, but I was not able to:
> - make sure that we have opened n sockets before we continue with
>   migration
> - making sure that we got same id numbers in both sides, that is doable,
>   just add a new id field
> - right now I open a channel, and wait for the other side to open it
>   before open the following one.  I can do things in parallel, but
>   locking is going to be "interesting".
> 
> So, as said, I don't really care how we open the channels, I am totally
> open to suggestions.  Looking at the current code, this is the best way
> that I have been able to think of.

I think the key problem in the current design is that you delay the opening
of the extra socket channels. To be able to remove most of this duplication,
I think you need to open all the channels at once right at the start.


IOW, in qmp_migrate() instead of calling tcp_start_outgoing_migration()
just once, use a loop to call it N times (where N == number of threads).

Now this method is asynchronous, and eventually triggers a call to
migration_channel_connect() when the connection actually succeeds.
You will need to change migration_channel_connect() so that it can be
called multiple times. migration_channel_connect() should count how
many channels have been opened, and only start the migration once all
of them are open.

The incoming side is a little different - in qemu_start_incoming_migration()
you only need call tcp_start_incoming_migration() once. In the
socket_accept_incoming_migration() method though, you need to change the
'return FALSE' to 'return TRUE', so that it continues to accept multiple
incoming clients. The socket_start_outgoing_migration()method needs to again
count the number of channels that have been opened so far, and only start
the actual migration once the right number are open.


By doing all this opening of channels upfront, you'll also make it much
easier to support the other migration protocols - in particular 'fd'
protocol needs to be extended so that libvirt can pass in multiple FDs
in the monitor command at once. The 'exec' protocol should also be
able to trivially support this by simply launching the command multiple
times.

Regards,
Daniel
Juan Quintela March 14, 2017, 12:32 p.m. UTC | #4
"Daniel P. Berrange" <berrange@redhat.com> wrote:
> On Mon, Mar 13, 2017 at 05:58:06PM +0100, Juan Quintela wrote:
>> "Daniel P. Berrange" <berrange@redhat.com> wrote:
>> > On Mon, Mar 13, 2017 at 01:44:27PM +0100, Juan Quintela wrote:
>> >
>> > I still don't really like any of the changes in this file. We've now got
>> > two sets of methods which connect to a remote host and two sets of methods
>> > which accept incoming clients. I've got to think there's a better way to
>> > refactor the existing code, such that we don't need two sets of methods
>> > for the same actions
>> 
>> I am open to suggestions, basically we want to be able to:
>> - open one + n channels
>> - be sure that we got the same id on both sides of the connection.
>> 
>> You suggested on the previous iteration that I changed the FALSE in
>> *HERE* for TRUE, but I was not able to:
>> - make sure that we have opened n sockets before we continue with
>>   migration
>> - making sure that we got same id numbers in both sides, that is doable,
>>   just add a new id field
>> - right now I open a channel, and wait for the other side to open it
>>   before open the following one.  I can do things in parallel, but
>>   locking is going to be "interesting".
>> 
>> So, as said, I don't really care how we open the channels, I am totally
>> open to suggestions.  Looking at the current code, this is the best way
>> that I have been able to think of.
>
> I think the key problem in the current design is that you delay the opening
> of the extra socket channels. To be able to remove most of this duplication,
> I think you need to open all the channels at once right at the start.
>
>
> IOW, in qmp_migrate() instead of calling tcp_start_outgoing_migration()
> just once, use a loop to call it N times (where N == number of threads).
>
> Now this method is asynchronous, and eventually triggers a call to
> migration_channel_connect() when the connection actually succeeds.
> You will need to change migration_channel_connect() so that it can be
> called multiple times. migration_channel_connect() should count how
> many channels have been opened, and only start the migration once all
> of them are open.
>
> The incoming side is a little different - in qemu_start_incoming_migration()
> you only need call tcp_start_incoming_migration() once. In the
> socket_accept_incoming_migration() method though, you need to change the
> 'return FALSE' to 'return TRUE', so that it continues to accept multiple
> incoming clients. The socket_start_outgoing_migration()method needs to again
> count the number of channels that have been opened so far, and only start
> the actual migration once the right number are open.
>
>
> By doing all this opening of channels upfront, you'll also make it much
> easier to support the other migration protocols - in particular 'fd'
> protocol needs to be extended so that libvirt can pass in multiple FDs
> in the monitor command at once. The 'exec' protocol should also be
> able to trivially support this by simply launching the command multiple
> times.

Ok. Thanks.  Will look into this.

Later, Juan.
diff mbox

Patch

diff --git a/include/migration/migration.h b/include/migration/migration.h
index e8b9fcb..cbb049d 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -23,6 +23,7 @@ 
 #include "exec/cpu-common.h"
 #include "qemu/coroutine_int.h"
 #include "qom/object.h"
+#include "io/channel.h"
 
 #define QEMU_VM_FILE_MAGIC           0x5145564d
 #define QEMU_VM_FILE_VERSION_COMPAT  0x00000002
@@ -235,6 +236,12 @@  void tcp_start_incoming_migration(const char *host_port, Error **errp);
 
 void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp);
 
+QIOChannel *socket_recv_channel_create(void);
+int socket_recv_channel_destroy(QIOChannel *recv);
+int socket_recv_channel_close_listening(void);
+QIOChannel *socket_send_channel_create(void);
+int socket_send_channel_destroy(QIOChannel *send);
+
 void unix_start_incoming_migration(const char *path, Error **errp);
 
 void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp);
diff --git a/migration/ram.c b/migration/ram.c
index ee32fa8..7833e6f 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -387,7 +387,9 @@  void migrate_compress_threads_create(void)
 struct MultiFDSendParams {
     int id;
     QemuThread thread;
+    QIOChannel *c;
     QemuSemaphore sem;
+    QemuSemaphore init;
     QemuMutex mutex;
     bool quit;
 };
@@ -427,6 +429,8 @@  void migrate_multifd_send_threads_join(void)
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->init);
+        socket_send_channel_destroy(p->c);
     }
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
@@ -438,6 +442,11 @@  static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
 
+    char start = 's';
+
+    qio_channel_write(p->c, &start, 1, &error_abort);
+    qemu_sem_post(&p->init);
+
     while (true) {
         qemu_mutex_lock(&p->mutex);
         if (p->quit) {
@@ -468,12 +477,20 @@  int migrate_multifd_send_threads_create(void)
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
+        qemu_sem_init(&p->init, 0);
         p->quit = false;
         p->id = i;
+        p->c = socket_send_channel_create();
+        if (!p->c) {
+            error_report("Error creating a send channel");
+            migrate_multifd_send_threads_join();
+            return -1;
+        }
         snprintf(thread_name, 15, "multifd_send_%d", i);
         qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p,
                            QEMU_THREAD_JOINABLE);
         multifd_send_state->count++;
+        qemu_sem_wait(&p->init);
     }
     return 0;
 }
@@ -481,6 +498,8 @@  int migrate_multifd_send_threads_create(void)
 struct MultiFDRecvParams {
     int id;
     QemuThread thread;
+    QIOChannel *c;
+    QemuSemaphore init;
     QemuSemaphore sem;
     QemuMutex mutex;
     bool quit;
@@ -521,6 +540,8 @@  void migrate_multifd_recv_threads_join(void)
         qemu_thread_join(&p->thread);
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->init);
+        socket_send_channel_destroy(multifd_recv_state->params[i].c);
     }
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
@@ -531,6 +552,10 @@  void migrate_multifd_recv_threads_join(void)
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
+    char start;
+
+    qio_channel_read(p->c, &start, 1, &error_abort);
+    qemu_sem_post(&p->init);
 
     while (true) {
         qemu_mutex_lock(&p->mutex);
@@ -561,12 +586,22 @@  int migrate_multifd_recv_threads_create(void)
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
+        qemu_sem_init(&p->init, 0);
         p->quit = false;
         p->id = i;
+        p->c = socket_recv_channel_create();
+
+        if (!p->c) {
+            error_report("Error creating a recv channel");
+            migrate_multifd_recv_threads_join();
+            return -1;
+        }
         qemu_thread_create(&p->thread, "multifd_recv", multifd_recv_thread, p,
                            QEMU_THREAD_JOINABLE);
         multifd_recv_state->count++;
+        qemu_sem_wait(&p->init);
     }
+    socket_recv_channel_close_listening();
     return 0;
 }
 
diff --git a/migration/socket.c b/migration/socket.c
index 13966f1..58a16b5 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -24,6 +24,65 @@ 
 #include "io/channel-socket.h"
 #include "trace.h"
 
+struct SocketIncomingArgs {
+    QIOChannelSocket *ioc;
+} incoming_args;
+
+QIOChannel *socket_recv_channel_create(void)
+{
+    QIOChannelSocket *sioc;
+    Error *err = NULL;
+
+    sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(incoming_args.ioc),
+                                     &err);
+    if (!sioc) {
+        error_report("could not accept migration connection (%s)",
+                     error_get_pretty(err));
+        return NULL;
+    }
+    return QIO_CHANNEL(sioc);
+}
+
+int socket_recv_channel_destroy(QIOChannel *recv)
+{
+    /* Remove channel */
+    object_unref(OBJECT(send));
+    return 0;
+}
+
+/* we have created all the recv channels, we can close the main one */
+int socket_recv_channel_close_listening(void)
+{
+    /* Close listening socket as its no longer needed */
+    qio_channel_close(QIO_CHANNEL(incoming_args.ioc), NULL);
+    return 0;
+}
+
+struct SocketOutgoingArgs {
+    SocketAddress *saddr;
+    Error **errp;
+} outgoing_args;
+
+QIOChannel *socket_send_channel_create(void)
+{
+    QIOChannelSocket *sioc = qio_channel_socket_new();
+
+    qio_channel_socket_connect_sync(sioc, outgoing_args.saddr,
+                                    outgoing_args.errp);
+    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
+    return QIO_CHANNEL(sioc);
+}
+
+int socket_send_channel_destroy(QIOChannel *send)
+{
+    /* Remove channel */
+    object_unref(OBJECT(send));
+    if (outgoing_args.saddr) {
+        qapi_free_SocketAddress(outgoing_args.saddr);
+        outgoing_args.saddr = NULL;
+    }
+    return 0;
+}
 
 static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
 {
@@ -97,6 +156,10 @@  static void socket_start_outgoing_migration(MigrationState *s,
     struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
 
     data->s = s;
+
+    outgoing_args.saddr = saddr;
+    outgoing_args.errp = errp;
+
     if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
         data->hostname = g_strdup(saddr->u.inet.data->host);
     }
@@ -107,7 +170,6 @@  static void socket_start_outgoing_migration(MigrationState *s,
                                      socket_outgoing_migration,
                                      data,
                                      socket_connect_data_free);
-    qapi_free_SocketAddress(saddr);
 }
 
 void tcp_start_outgoing_migration(MigrationState *s,
@@ -154,8 +216,6 @@  static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
     object_unref(OBJECT(sioc));
 
 out:
-    /* Close listening socket as its no longer needed */
-    qio_channel_close(ioc, NULL);
     return FALSE; /* unregister */
 }
 
@@ -164,6 +224,7 @@  static void socket_start_incoming_migration(SocketAddress *saddr,
                                             Error **errp)
 {
     QIOChannelSocket *listen_ioc = qio_channel_socket_new();
+    incoming_args.ioc = listen_ioc;
 
     qio_channel_set_name(QIO_CHANNEL(listen_ioc),
                          "migration-socket-listener");