diff mbox series

[3/4] Establishing connection between any non-default source and destination pair

Message ID 20220609073305.142515-4-het.gala@nutanix.com
State New
Headers show
Series Multiple interface support on top of Multi-FD | expand

Commit Message

Het Gala June 9, 2022, 7:33 a.m. UTC
i) Binding of the socket to source ip address and port on the non-default
   interface has been implemented for multi-FD connection, which was not
   necessary earlier because the binding was on the default interface itself.

ii) Created an end to end connection between all multi-FD source and
    destination pairs.

Suggested-by: Manish Mishra <manish.mishra@nutanix.com>
Signed-off-by: Het Gala <het.gala@nutanix.com>
---
 chardev/char-socket.c               |  4 +-
 include/io/channel-socket.h         | 26 ++++++-----
 include/qemu/sockets.h              |  6 ++-
 io/channel-socket.c                 | 50 ++++++++++++++------
 migration/socket.c                  | 15 +++---
 nbd/client-connection.c             |  2 +-
 qemu-nbd.c                          |  4 +-
 scsi/pr-manager-helper.c            |  1 +
 tests/unit/test-char.c              |  8 ++--
 tests/unit/test-io-channel-socket.c |  4 +-
 tests/unit/test-util-sockets.c      | 16 +++----
 ui/input-barrier.c                  |  2 +-
 ui/vnc.c                            |  3 +-
 util/qemu-sockets.c                 | 71 ++++++++++++++++++++---------
 14 files changed, 135 insertions(+), 77 deletions(-)

Comments

Daniel P. Berrangé June 16, 2022, 5:39 p.m. UTC | #1
On Thu, Jun 09, 2022 at 07:33:04AM +0000, Het Gala wrote:
> i) Binding of the socket to source ip address and port on the non-default
>    interface has been implemented for multi-FD connection, which was not
>    necessary earlier because the binding was on the default interface itself.
> 
> ii) Created an end to end connection between all multi-FD source and
>     destination pairs.
> 
> Suggested-by: Manish Mishra <manish.mishra@nutanix.com>
> Signed-off-by: Het Gala <het.gala@nutanix.com>
> ---
>  chardev/char-socket.c               |  4 +-
>  include/io/channel-socket.h         | 26 ++++++-----
>  include/qemu/sockets.h              |  6 ++-
>  io/channel-socket.c                 | 50 ++++++++++++++------
>  migration/socket.c                  | 15 +++---
>  nbd/client-connection.c             |  2 +-
>  qemu-nbd.c                          |  4 +-
>  scsi/pr-manager-helper.c            |  1 +
>  tests/unit/test-char.c              |  8 ++--
>  tests/unit/test-io-channel-socket.c |  4 +-
>  tests/unit/test-util-sockets.c      | 16 +++----
>  ui/input-barrier.c                  |  2 +-
>  ui/vnc.c                            |  3 +-
>  util/qemu-sockets.c                 | 71 ++++++++++++++++++++---------
>  14 files changed, 135 insertions(+), 77 deletions(-)
> 
> diff --git a/chardev/char-socket.c b/chardev/char-socket.c
> index dc4e218eeb..f3725238c5 100644
> --- a/chardev/char-socket.c
> +++ b/chardev/char-socket.c
> @@ -932,7 +932,7 @@ static int tcp_chr_connect_client_sync(Chardev *chr, Error **errp)
>      QIOChannelSocket *sioc = qio_channel_socket_new();
>      tcp_chr_change_state(s, TCP_CHARDEV_STATE_CONNECTING);
>      tcp_chr_set_client_ioc_name(chr, sioc);
> -    if (qio_channel_socket_connect_sync(sioc, s->addr, errp) < 0) {
> +    if (qio_channel_socket_connect_sync(sioc, s->addr, NULL, errp) < 0) {
>          tcp_chr_change_state(s, TCP_CHARDEV_STATE_DISCONNECTED);
>          object_unref(OBJECT(sioc));
>          return -1;
> @@ -1120,7 +1120,7 @@ static void tcp_chr_connect_client_task(QIOTask *task,
>      SocketAddress *addr = opaque;
>      Error *err = NULL;
>  
> -    qio_channel_socket_connect_sync(ioc, addr, &err);
> +    qio_channel_socket_connect_sync(ioc, addr, NULL, &err);
>  
>      qio_task_set_error(task, err);
>  }
> diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h
> index 513c428fe4..59d5b1b349 100644
> --- a/include/io/channel-socket.h
> +++ b/include/io/channel-socket.h
> @@ -83,41 +83,45 @@ qio_channel_socket_new_fd(int fd,
>  /**
>   * qio_channel_socket_connect_sync:
>   * @ioc: the socket channel object
> - * @addr: the address to connect to
> + * @dst_addr: the destination address to connect to
> + * @src_addr: the source address to be connected
>   * @errp: pointer to a NULL-initialized error object
>   *
> - * Attempt to connect to the address @addr. This method
> - * will run in the foreground so the caller will not regain
> - * execution control until the connection is established or
> + * Attempt to connect to the address @dst_addr with @src_addr.
> + * This method will run in the foreground so the caller will not
> + * regain execution control until the connection is established or
>   * an error occurs.
>   */
>  int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
> -                                    SocketAddress *addr,
> +                                    SocketAddress *dst_addr,
> +                                    SocketAddress *src_addr,
>                                      Error **errp);
>  
>  /**
>   * qio_channel_socket_connect_async:
>   * @ioc: the socket channel object
> - * @addr: the address to connect to
> + * @dst_addr: the destination address to connect to
>   * @callback: the function to invoke on completion
>   * @opaque: user data to pass to @callback
>   * @destroy: the function to free @opaque
>   * @context: the context to run the async task. If %NULL, the default
>   *           context will be used.
> + * @src_addr: the source address to be connected
>   *
> - * Attempt to connect to the address @addr. This method
> - * will run in the background so the caller will regain
> + * Attempt to connect to the address @dst_addr with the @src_addr.
> + * This method will run in the background so the caller will regain
>   * execution control immediately. The function @callback
> - * will be invoked on completion or failure. The @addr
> + * will be invoked on completion or failure. The @dst_addr
>   * parameter will be copied, so may be freed as soon
>   * as this function returns without waiting for completion.
>   */
>  void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
> -                                      SocketAddress *addr,
> +                                      SocketAddress *dst_addr,
>                                        QIOTaskFunc callback,
>                                        gpointer opaque,
>                                        GDestroyNotify destroy,
> -                                      GMainContext *context);
> +                                      GMainContext *context,
> +                                      SocketAddress *src_addr);

Lets avoid modifying these two methods, and thus avoid
updating countless callers.

In common with typical pattern in QIO code, lets add
a second variant

  qio_channel_socket_connect_full
  qio_channel_socket_connect_full_async

which has the extra parameters, then make the existing
simpler methods call the new ones.

ie qio_channel_socket_connect should call
qio_channel_socket_connect_full, pasing NULL for the
src_addr.



> diff --git a/io/channel-socket.c b/io/channel-socket.c
> index dc9c165de1..f8746ad646 100644
> --- a/io/channel-socket.c
> +++ b/io/channel-socket.c
> @@ -36,6 +36,12 @@
>  
>  #define SOCKET_MAX_FDS 16
>  
> +struct SrcDestAddress {
> +    SocketAddress *dst_addr;
> +    SocketAddress *src_addr;
> +};

Nitpick, just call this   'struct ConnectData'.

>  SocketAddress *
>  qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
>                                       Error **errp)
> @@ -145,13 +151,14 @@ qio_channel_socket_new_fd(int fd,
>  
>  
>  int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
> -                                    SocketAddress *addr,
> +                                    SocketAddress *dst_addr,
> +                                    SocketAddress *src_addr,
>                                      Error **errp)
>  {
>      int fd;
>  
> -    trace_qio_channel_socket_connect_sync(ioc, addr);
> -    fd = socket_connect(addr, errp);
> +    trace_qio_channel_socket_connect_sync(ioc, dst_addr);
> +    fd = socket_connect(dst_addr, src_addr, errp);
>      if (fd < 0) {
>          trace_qio_channel_socket_connect_fail(ioc);
>          return -1;
> @@ -177,39 +184,56 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
>  }
>  
>  
> +static void qio_channel_socket_worker_free(gpointer opaque)
> +{
> +    struct SrcDestAddress *data = opaque;
> +    if (!data) {
> +        return;
> +    }
> +    qapi_free_SocketAddress(data->dst_addr);
> +    qapi_free_SocketAddress(data->src_addr);
> +    g_free(data);
> +}
> +
> +
>  static void qio_channel_socket_connect_worker(QIOTask *task,
>                                                gpointer opaque)
>  {
>      QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
> -    SocketAddress *addr = opaque;
> +    struct SrcDestAddress *data = opaque;
>      Error *err = NULL;
>  
> -    qio_channel_socket_connect_sync(ioc, addr, &err);
> +    qio_channel_socket_connect_sync(ioc, data->dst_addr, data->src_addr, &err);
>  
>      qio_task_set_error(task, err);
>  }
>  
>  
>  void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
> -                                      SocketAddress *addr,
> +                                      SocketAddress *dst_addr,
>                                        QIOTaskFunc callback,
>                                        gpointer opaque,
>                                        GDestroyNotify destroy,
> -                                      GMainContext *context)
> +                                      GMainContext *context,
> +                                      SocketAddress *src_addr)
>  {
>      QIOTask *task = qio_task_new(
>          OBJECT(ioc), callback, opaque, destroy);
> -    SocketAddress *addrCopy;
> -
> -    addrCopy = QAPI_CLONE(SocketAddress, addr);
> +    struct SrcDestAddress *data = g_new0(struct SrcDestAddress, 1);
>  
> +    data->dst_addr = QAPI_CLONE(SocketAddress, dst_addr);
> +    if (src_addr) {
> +        data->src_addr = QAPI_CLONE(SocketAddress, src_addr);
> +    } else {
> +        data->src_addr = NULL;
> +    }
>      /* socket_connect() does a non-blocking connect(), but it
>       * still blocks in DNS lookups, so we must use a thread */
> -    trace_qio_channel_socket_connect_async(ioc, addr);
> +    trace_qio_channel_socket_connect_async(ioc, dst_addr);
>      qio_task_run_in_thread(task,
>                             qio_channel_socket_connect_worker,
> -                           addrCopy,
> -                           (GDestroyNotify)qapi_free_SocketAddress,
> +                           data,
> +                           qio_channel_socket_worker_free,
>                             context);
>  }
>  
> diff --git a/migration/socket.c b/migration/socket.c
> index 21e0983df2..d0cb7cc6a6 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -47,7 +47,7 @@ void socket_send_channel_create(QIOTaskFunc f, void *data)
>  {
>      QIOChannelSocket *sioc = qio_channel_socket_new();
>      qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
> -                                     f, data, NULL, NULL);
> +                                     f, data, NULL, NULL, NULL);
>  }
>  
>  int socket_send_channel_destroy(QIOChannel *send)
> @@ -110,7 +110,7 @@ out:
>  
>  static void
>  socket_start_outgoing_migration_internal(MigrationState *s,
> -                                         SocketAddress *saddr,
> +                                         SocketAddress *dst_addr,
>                                           Error **errp)
>  {
>      QIOChannelSocket *sioc = qio_channel_socket_new();
> @@ -118,20 +118,17 @@ socket_start_outgoing_migration_internal(MigrationState *s,
>  
>      data->s = s;
>  
> -    /* in case previous migration leaked it */
> -    qapi_free_SocketAddress(outgoing_args.saddr);
> -    outgoing_args.saddr = saddr;
> -
> -    if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
> -        data->hostname = g_strdup(saddr->u.inet.host);
> +    if (dst_addr->type == SOCKET_ADDRESS_TYPE_INET) {
> +        data->hostname = g_strdup(dst_addr->u.inet.host);
>      }
>  
>      qio_channel_set_name(QIO_CHANNEL(sioc), "migration-socket-outgoing");
>      qio_channel_socket_connect_async(sioc,
> -                                     saddr,
> +                                     dst_addr,
>                                       socket_outgoing_migration,
>                                       data,
>                                       socket_connect_data_free,
> +                                     NULL,
>                                       NULL);
>  }
>  

> diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c
> index 13b5b197f9..bbe0dc0ee0 100644
> --- a/util/qemu-sockets.c
> +++ b/util/qemu-sockets.c
> @@ -226,7 +226,7 @@ static int inet_listen_saddr(InetSocketAddress *saddr,
>          return -1;
>      }
>  
> -    memset(&ai,0, sizeof(ai));
> +    memset(&ai,0,sizeof(ai));

Add whitespace before the '0', rather than removing it after.

>      ai.ai_flags = AI_PASSIVE;
>      if (saddr->has_numeric && saddr->numeric) {
>          ai.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV;
> @@ -282,8 +282,8 @@ static int inet_listen_saddr(InetSocketAddress *saddr,
>              e->ai_protocol = IPPROTO_MPTCP;
>          }
>  #endif
> -        getnameinfo((struct sockaddr*)e->ai_addr,e->ai_addrlen,
> -                        uaddr,INET6_ADDRSTRLEN,uport,32,
> +        getnameinfo((struct sockaddr *)e->ai_addr, e->ai_addrlen,
> +                        uaddr, INET6_ADDRSTRLEN, uport, 32,
>                          NI_NUMERICHOST | NI_NUMERICSERV);

Both thsi & the above whitespace changes should be a separate
patch from any functional changes


> @@ -371,8 +372,28 @@ static int inet_connect_addr(const InetSocketAddress *saddr,
>      }
>      socket_set_fast_reuse(sock);
>  
> +    /* to bind the socket if src_addr is available */
> +
> +    if (src_addr) {
> +        struct sockaddr_in servaddr;
> +
> +        /* bind to a specific interface in the internet domain */
> +        /* to make sure the sin_zero filed is cleared */
> +        memset(&servaddr, 0, sizeof(servaddr));
> +
> +        servaddr.sin_family = AF_INET;
> +        servaddr.sin_addr.s_addr = inet_addr(src_addr->host);

We can't assume src_addr is a raw IPv4 address.

THis needs to go through getaddrinfo in the caller like the
dst address has done.

For sanity, we should also validate that there isn't a mismatch
of IPv4 vs IPv6 for thte src & dst addrs.

> +        servaddr.sin_port = 0;



> +
> +        if (bind(sock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
> +            error_setg_errno(errp, errno, "Failed to bind socket");
> +            return -1;
> +        }
> +    }
> +
>      /* connect to peer */
>      do {
> +
>          rc = 0;
>          if (connect(sock, addr->ai_addr, addr->ai_addrlen) < 0) {
>              rc = -errno;
> @@ -380,8 +401,14 @@ static int inet_connect_addr(const InetSocketAddress *saddr,
>      } while (rc == -EINTR);
>  
>      if (rc < 0) {
> -        error_setg_errno(errp, errno, "Failed to connect to '%s:%s'",
> -                         saddr->host, saddr->port);
> +        if (src_addr) {
> +            error_setg_errno(errp, errno, "Failed to connect '%s:%s' to "
> +                             "'%s:%s'", dst_addr->host, dst_addr->port,
> +                             src_addr->host, src_addr->port);
> +        } else {
> +            error_setg_errno(errp, errno, "Failed to connect '%s:%s'",
> +                             dst_addr->host, dst_addr->port);
> +        }
>          closesocket(sock);
>          return -1;
>      }

> @@ -506,7 +534,7 @@ static int inet_dgram_saddr(InetSocketAddress *sraddr,
>      Error *err = NULL;
>  
>      /* lookup peer addr */
> -    memset(&ai,0, sizeof(ai));
> +    memset(&ai, 0, sizeof(ai));
>      ai.ai_flags = AI_CANONNAME | AI_V4MAPPED | AI_ADDRCONFIG;
>      ai.ai_family = inet_ai_family_from_address(sraddr, &err);
>      ai.ai_socktype = SOCK_DGRAM;
> @@ -533,7 +561,7 @@ static int inet_dgram_saddr(InetSocketAddress *sraddr,
>      }
>  
>      /* lookup local addr */
> -    memset(&ai,0, sizeof(ai));
> +    memset(&ai, 0, sizeof(ai));
>      ai.ai_flags = AI_PASSIVE;
>      ai.ai_family = peer->ai_family;
>      ai.ai_socktype = SOCK_DGRAM;


> @@ -574,7 +602,7 @@ static int inet_dgram_saddr(InetSocketAddress *sraddr,
>      }
>  
>      /* connect to peer */
> -    if (connect(sock,peer->ai_addr,peer->ai_addrlen) < 0) {
> +    if (connect(sock, peer->ai_addr, peer->ai_addrlen) < 0) {
>          error_setg_errno(errp, errno, "Failed to connect to '%s:%s'",
>                           addr, port);
>          goto err;

More whitespace changes for a separate patch




With regards,
Daniel
manish.mishra June 21, 2022, 4:09 p.m. UTC | #2
Hi Daniel, David,

Thank you so much for review on patches. I am posting this message on

behalf of Het. We wanted to get a early feedback so sorry if code was not

in best of shape. Het is currently on break intership break so does not have

access to nutanix mail, he will join in first week of july and will definately post

v2 on this by 2nd week of july.

thanks

Manish MIshra

On 16/06/22 11:09 pm, Daniel P. Berrangé wrote:
> On Thu, Jun 09, 2022 at 07:33:04AM +0000, Het Gala wrote:
>> i) Binding of the socket to source ip address and port on the non-default
>>     interface has been implemented for multi-FD connection, which was not
>>     necessary earlier because the binding was on the default interface itself.
>>
>> ii) Created an end to end connection between all multi-FD source and
>>      destination pairs.
>>
>> Suggested-by: Manish Mishra<manish.mishra@nutanix.com>
>> Signed-off-by: Het Gala<het.gala@nutanix.com>
>> ---
>>   chardev/char-socket.c               |  4 +-
>>   include/io/channel-socket.h         | 26 ++++++-----
>>   include/qemu/sockets.h              |  6 ++-
>>   io/channel-socket.c                 | 50 ++++++++++++++------
>>   migration/socket.c                  | 15 +++---
>>   nbd/client-connection.c             |  2 +-
>>   qemu-nbd.c                          |  4 +-
>>   scsi/pr-manager-helper.c            |  1 +
>>   tests/unit/test-char.c              |  8 ++--
>>   tests/unit/test-io-channel-socket.c |  4 +-
>>   tests/unit/test-util-sockets.c      | 16 +++----
>>   ui/input-barrier.c                  |  2 +-
>>   ui/vnc.c                            |  3 +-
>>   util/qemu-sockets.c                 | 71 ++++++++++++++++++++---------
>>   14 files changed, 135 insertions(+), 77 deletions(-)
>>
>> diff --git a/chardev/char-socket.c b/chardev/char-socket.c
>> index dc4e218eeb..f3725238c5 100644
>> --- a/chardev/char-socket.c
>> +++ b/chardev/char-socket.c
>> @@ -932,7 +932,7 @@ static int tcp_chr_connect_client_sync(Chardev *chr, Error **errp)
>>       QIOChannelSocket *sioc = qio_channel_socket_new();
>>       tcp_chr_change_state(s, TCP_CHARDEV_STATE_CONNECTING);
>>       tcp_chr_set_client_ioc_name(chr, sioc);
>> -    if (qio_channel_socket_connect_sync(sioc, s->addr, errp) < 0) {
>> +    if (qio_channel_socket_connect_sync(sioc, s->addr, NULL, errp) < 0) {
>>           tcp_chr_change_state(s, TCP_CHARDEV_STATE_DISCONNECTED);
>>           object_unref(OBJECT(sioc));
>>           return -1;
>> @@ -1120,7 +1120,7 @@ static void tcp_chr_connect_client_task(QIOTask *task,
>>       SocketAddress *addr = opaque;
>>       Error *err = NULL;
>>   
>> -    qio_channel_socket_connect_sync(ioc, addr, &err);
>> +    qio_channel_socket_connect_sync(ioc, addr, NULL, &err);
>>   
>>       qio_task_set_error(task, err);
>>   }
>> diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h
>> index 513c428fe4..59d5b1b349 100644
>> --- a/include/io/channel-socket.h
>> +++ b/include/io/channel-socket.h
>> @@ -83,41 +83,45 @@ qio_channel_socket_new_fd(int fd,
>>   /**
>>    * qio_channel_socket_connect_sync:
>>    * @ioc: the socket channel object
>> - * @addr: the address to connect to
>> + * @dst_addr: the destination address to connect to
>> + * @src_addr: the source address to be connected
>>    * @errp: pointer to a NULL-initialized error object
>>    *
>> - * Attempt to connect to the address @addr. This method
>> - * will run in the foreground so the caller will not regain
>> - * execution control until the connection is established or
>> + * Attempt to connect to the address @dst_addr with @src_addr.
>> + * This method will run in the foreground so the caller will not
>> + * regain execution control until the connection is established or
>>    * an error occurs.
>>    */
>>   int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
>> -                                    SocketAddress *addr,
>> +                                    SocketAddress *dst_addr,
>> +                                    SocketAddress *src_addr,
>>                                       Error **errp);
>>   
>>   /**
>>    * qio_channel_socket_connect_async:
>>    * @ioc: the socket channel object
>> - * @addr: the address to connect to
>> + * @dst_addr: the destination address to connect to
>>    * @callback: the function to invoke on completion
>>    * @opaque: user data to pass to @callback
>>    * @destroy: the function to free @opaque
>>    * @context: the context to run the async task. If %NULL, the default
>>    *           context will be used.
>> + * @src_addr: the source address to be connected
>>    *
>> - * Attempt to connect to the address @addr. This method
>> - * will run in the background so the caller will regain
>> + * Attempt to connect to the address @dst_addr with the @src_addr.
>> + * This method will run in the background so the caller will regain
>>    * execution control immediately. The function @callback
>> - * will be invoked on completion or failure. The @addr
>> + * will be invoked on completion or failure. The @dst_addr
>>    * parameter will be copied, so may be freed as soon
>>    * as this function returns without waiting for completion.
>>    */
>>   void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
>> -                                      SocketAddress *addr,
>> +                                      SocketAddress *dst_addr,
>>                                         QIOTaskFunc callback,
>>                                         gpointer opaque,
>>                                         GDestroyNotify destroy,
>> -                                      GMainContext *context);
>> +                                      GMainContext *context,
>> +                                      SocketAddress *src_addr);
> Lets avoid modifying these two methods, and thus avoid
> updating countless callers.
>
> In common with typical pattern in QIO code, lets add
> a second variant
>
>    qio_channel_socket_connect_full
>    qio_channel_socket_connect_full_async
>
> which has the extra parameters, then make the existing
> simpler methods call the new ones.
>
> ie qio_channel_socket_connect should call
> qio_channel_socket_connect_full, pasing NULL for the
> src_addr.
>
>
>
>> diff --git a/io/channel-socket.c b/io/channel-socket.c
>> index dc9c165de1..f8746ad646 100644
>> --- a/io/channel-socket.c
>> +++ b/io/channel-socket.c
>> @@ -36,6 +36,12 @@
>>   
>>   #define SOCKET_MAX_FDS 16
>>   
>> +struct SrcDestAddress {
>> +    SocketAddress *dst_addr;
>> +    SocketAddress *src_addr;
>> +};
> Nitpick, just call this   'struct ConnectData'.
>
>>   SocketAddress *
>>   qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
>>                                        Error **errp)
>> @@ -145,13 +151,14 @@ qio_channel_socket_new_fd(int fd,
>>   
>>   
>>   int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
>> -                                    SocketAddress *addr,
>> +                                    SocketAddress *dst_addr,
>> +                                    SocketAddress *src_addr,
>>                                       Error **errp)
>>   {
>>       int fd;
>>   
>> -    trace_qio_channel_socket_connect_sync(ioc, addr);
>> -    fd = socket_connect(addr, errp);
>> +    trace_qio_channel_socket_connect_sync(ioc, dst_addr);
>> +    fd = socket_connect(dst_addr, src_addr, errp);
>>       if (fd < 0) {
>>           trace_qio_channel_socket_connect_fail(ioc);
>>           return -1;
>> @@ -177,39 +184,56 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
>>   }
>>   
>>   
>> +static void qio_channel_socket_worker_free(gpointer opaque)
>> +{
>> +    struct SrcDestAddress *data = opaque;
>> +    if (!data) {
>> +        return;
>> +    }
>> +    qapi_free_SocketAddress(data->dst_addr);
>> +    qapi_free_SocketAddress(data->src_addr);
>> +    g_free(data);
>> +}
>> +
>> +
>>   static void qio_channel_socket_connect_worker(QIOTask *task,
>>                                                 gpointer opaque)
>>   {
>>       QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
>> -    SocketAddress *addr = opaque;
>> +    struct SrcDestAddress *data = opaque;
>>       Error *err = NULL;
>>   
>> -    qio_channel_socket_connect_sync(ioc, addr, &err);
>> +    qio_channel_socket_connect_sync(ioc, data->dst_addr, data->src_addr, &err);
>>   
>>       qio_task_set_error(task, err);
>>   }
>>   
>>   
>>   void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
>> -                                      SocketAddress *addr,
>> +                                      SocketAddress *dst_addr,
>>                                         QIOTaskFunc callback,
>>                                         gpointer opaque,
>>                                         GDestroyNotify destroy,
>> -                                      GMainContext *context)
>> +                                      GMainContext *context,
>> +                                      SocketAddress *src_addr)
>>   {
>>       QIOTask *task = qio_task_new(
>>           OBJECT(ioc), callback, opaque, destroy);
>> -    SocketAddress *addrCopy;
>> -
>> -    addrCopy = QAPI_CLONE(SocketAddress, addr);
>> +    struct SrcDestAddress *data = g_new0(struct SrcDestAddress, 1);
>>   
>> +    data->dst_addr = QAPI_CLONE(SocketAddress, dst_addr);
>> +    if (src_addr) {
>> +        data->src_addr = QAPI_CLONE(SocketAddress, src_addr);
>> +    } else {
>> +        data->src_addr = NULL;
>> +    }
>>       /* socket_connect() does a non-blocking connect(), but it
>>        * still blocks in DNS lookups, so we must use a thread */
>> -    trace_qio_channel_socket_connect_async(ioc, addr);
>> +    trace_qio_channel_socket_connect_async(ioc, dst_addr);
>>       qio_task_run_in_thread(task,
>>                              qio_channel_socket_connect_worker,
>> -                           addrCopy,
>> -                           (GDestroyNotify)qapi_free_SocketAddress,
>> +                           data,
>> +                           qio_channel_socket_worker_free,
>>                              context);
>>   }
>>   
>> diff --git a/migration/socket.c b/migration/socket.c
>> index 21e0983df2..d0cb7cc6a6 100644
>> --- a/migration/socket.c
>> +++ b/migration/socket.c
>> @@ -47,7 +47,7 @@ void socket_send_channel_create(QIOTaskFunc f, void *data)
>>   {
>>       QIOChannelSocket *sioc = qio_channel_socket_new();
>>       qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
>> -                                     f, data, NULL, NULL);
>> +                                     f, data, NULL, NULL, NULL);
>>   }
>>   
>>   int socket_send_channel_destroy(QIOChannel *send)
>> @@ -110,7 +110,7 @@ out:
>>   
>>   static void
>>   socket_start_outgoing_migration_internal(MigrationState *s,
>> -                                         SocketAddress *saddr,
>> +                                         SocketAddress *dst_addr,
>>                                            Error **errp)
>>   {
>>       QIOChannelSocket *sioc = qio_channel_socket_new();
>> @@ -118,20 +118,17 @@ socket_start_outgoing_migration_internal(MigrationState *s,
>>   
>>       data->s = s;
>>   
>> -    /* in case previous migration leaked it */
>> -    qapi_free_SocketAddress(outgoing_args.saddr);
>> -    outgoing_args.saddr = saddr;
>> -
>> -    if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
>> -        data->hostname = g_strdup(saddr->u.inet.host);
>> +    if (dst_addr->type == SOCKET_ADDRESS_TYPE_INET) {
>> +        data->hostname = g_strdup(dst_addr->u.inet.host);
>>       }
>>   
>>       qio_channel_set_name(QIO_CHANNEL(sioc), "migration-socket-outgoing");
>>       qio_channel_socket_connect_async(sioc,
>> -                                     saddr,
>> +                                     dst_addr,
>>                                        socket_outgoing_migration,
>>                                        data,
>>                                        socket_connect_data_free,
>> +                                     NULL,
>>                                        NULL);
>>   }
>>   
>> diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c
>> index 13b5b197f9..bbe0dc0ee0 100644
>> --- a/util/qemu-sockets.c
>> +++ b/util/qemu-sockets.c
>> @@ -226,7 +226,7 @@ static int inet_listen_saddr(InetSocketAddress *saddr,
>>           return -1;
>>       }
>>   
>> -    memset(&ai,0, sizeof(ai));
>> +    memset(&ai,0,sizeof(ai));
> Add whitespace before the '0', rather than removing it after.
>
>>       ai.ai_flags = AI_PASSIVE;
>>       if (saddr->has_numeric && saddr->numeric) {
>>           ai.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV;
>> @@ -282,8 +282,8 @@ static int inet_listen_saddr(InetSocketAddress *saddr,
>>               e->ai_protocol = IPPROTO_MPTCP;
>>           }
>>   #endif
>> -        getnameinfo((struct sockaddr*)e->ai_addr,e->ai_addrlen,
>> -                        uaddr,INET6_ADDRSTRLEN,uport,32,
>> +        getnameinfo((struct sockaddr *)e->ai_addr, e->ai_addrlen,
>> +                        uaddr, INET6_ADDRSTRLEN, uport, 32,
>>                           NI_NUMERICHOST | NI_NUMERICSERV);
> Both thsi & the above whitespace changes should be a separate
> patch from any functional changes
>
>
>> @@ -371,8 +372,28 @@ static int inet_connect_addr(const InetSocketAddress *saddr,
>>       }
>>       socket_set_fast_reuse(sock);
>>   
>> +    /* to bind the socket if src_addr is available */
>> +
>> +    if (src_addr) {
>> +        struct sockaddr_in servaddr;
>> +
>> +        /* bind to a specific interface in the internet domain */
>> +        /* to make sure the sin_zero filed is cleared */
>> +        memset(&servaddr, 0, sizeof(servaddr));
>> +
>> +        servaddr.sin_family = AF_INET;
>> +        servaddr.sin_addr.s_addr = inet_addr(src_addr->host);
> We can't assume src_addr is a raw IPv4 address.
>
> THis needs to go through getaddrinfo in the caller like the
> dst address has done.
>
> For sanity, we should also validate that there isn't a mismatch
> of IPv4 vs IPv6 for thte src & dst addrs.
>
>> +        servaddr.sin_port = 0;
>
>
>> +
>> +        if (bind(sock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
>> +            error_setg_errno(errp, errno, "Failed to bind socket");
>> +            return -1;
>> +        }
>> +    }
>> +
>>       /* connect to peer */
>>       do {
>> +
>>           rc = 0;
>>           if (connect(sock, addr->ai_addr, addr->ai_addrlen) < 0) {
>>               rc = -errno;
>> @@ -380,8 +401,14 @@ static int inet_connect_addr(const InetSocketAddress *saddr,
>>       } while (rc == -EINTR);
>>   
>>       if (rc < 0) {
>> -        error_setg_errno(errp, errno, "Failed to connect to '%s:%s'",
>> -                         saddr->host, saddr->port);
>> +        if (src_addr) {
>> +            error_setg_errno(errp, errno, "Failed to connect '%s:%s' to "
>> +                             "'%s:%s'", dst_addr->host, dst_addr->port,
>> +                             src_addr->host, src_addr->port);
>> +        } else {
>> +            error_setg_errno(errp, errno, "Failed to connect '%s:%s'",
>> +                             dst_addr->host, dst_addr->port);
>> +        }
>>           closesocket(sock);
>>           return -1;
>>       }
>> @@ -506,7 +534,7 @@ static int inet_dgram_saddr(InetSocketAddress *sraddr,
>>       Error *err = NULL;
>>   
>>       /* lookup peer addr */
>> -    memset(&ai,0, sizeof(ai));
>> +    memset(&ai, 0, sizeof(ai));
>>       ai.ai_flags = AI_CANONNAME | AI_V4MAPPED | AI_ADDRCONFIG;
>>       ai.ai_family = inet_ai_family_from_address(sraddr, &err);
>>       ai.ai_socktype = SOCK_DGRAM;
>> @@ -533,7 +561,7 @@ static int inet_dgram_saddr(InetSocketAddress *sraddr,
>>       }
>>   
>>       /* lookup local addr */
>> -    memset(&ai,0, sizeof(ai));
>> +    memset(&ai, 0, sizeof(ai));
>>       ai.ai_flags = AI_PASSIVE;
>>       ai.ai_family = peer->ai_family;
>>       ai.ai_socktype = SOCK_DGRAM;
>
>> @@ -574,7 +602,7 @@ static int inet_dgram_saddr(InetSocketAddress *sraddr,
>>       }
>>   
>>       /* connect to peer */
>> -    if (connect(sock,peer->ai_addr,peer->ai_addrlen) < 0) {
>> +    if (connect(sock, peer->ai_addr, peer->ai_addrlen) < 0) {
>>           error_setg_errno(errp, errno, "Failed to connect to '%s:%s'",
>>                            addr, port);
>>           goto err;
> More whitespace changes for a separate patch
>
>
>
>
> With regards,
> Daniel
Daniel P. Berrangé July 20, 2022, 6:52 a.m. UTC | #3
Re-adding the mailing list, please don't drop the list in
replies to discussions.

On Wed, Jul 20, 2022 at 02:08:23AM +0530, Het Gala wrote:
> 
> On 13/07/22 3:10 pm, Het Gala wrote:
> > 
> > On 16/06/22 11:09 pm, Daniel P. Berrangé wrote:
> > > On Thu, Jun 09, 2022 at 07:33:04AM +0000, Het Gala wrote:
> > > > i) Binding of the socket to source ip address and port on the
> > > > non-default
> > > >     interface has been implemented for multi-FD connection,
> > > > which was not
> > > >     necessary earlier because the binding was on the default
> > > > interface itself.
> > > > 
> > > > ii) Created an end to end connection between all multi-FD source and
> > > >      destination pairs.
> > > > 
> > > > Suggested-by: Manish Mishra <manish.mishra@nutanix.com>
> > > > Signed-off-by: Het Gala <het.gala@nutanix.com>
> > > > ---
> > > >   chardev/char-socket.c               |  4 +-
> > > >   include/io/channel-socket.h         | 26 ++++++-----
> > > >   include/qemu/sockets.h              |  6 ++-
> > > >   io/channel-socket.c                 | 50 ++++++++++++++------
> > > >   migration/socket.c                  | 15 +++---
> > > >   nbd/client-connection.c             |  2 +-
> > > >   qemu-nbd.c                          |  4 +-
> > > >   scsi/pr-manager-helper.c            |  1 +
> > > >   tests/unit/test-char.c              |  8 ++--
> > > >   tests/unit/test-io-channel-socket.c |  4 +-
> > > >   tests/unit/test-util-sockets.c      | 16 +++----
> > > >   ui/input-barrier.c                  |  2 +-
> > > >   ui/vnc.c                            |  3 +-
> > > >   util/qemu-sockets.c                 | 71
> > > > ++++++++++++++++++++---------
> > > >   14 files changed, 135 insertions(+), 77 deletions(-)
> > > > 
> > > > diff --git a/chardev/char-socket.c b/chardev/char-socket.c
> > > > index dc4e218eeb..f3725238c5 100644
> > > > --- a/chardev/char-socket.c
> > > > +++ b/chardev/char-socket.c
> > > > @@ -932,7 +932,7 @@ static int
> > > > tcp_chr_connect_client_sync(Chardev *chr, Error **errp)
> > > >       QIOChannelSocket *sioc = qio_channel_socket_new();
> > > >       tcp_chr_change_state(s, TCP_CHARDEV_STATE_CONNECTING);
> > > >       tcp_chr_set_client_ioc_name(chr, sioc);
> > > > -    if (qio_channel_socket_connect_sync(sioc, s->addr, errp) < 0) {
> > > > +    if (qio_channel_socket_connect_sync(sioc, s->addr, NULL,
> > > > errp) < 0) {
> > > >           tcp_chr_change_state(s, TCP_CHARDEV_STATE_DISCONNECTED);
> > > >           object_unref(OBJECT(sioc));
> > > >           return -1;
> > > > @@ -1120,7 +1120,7 @@ static void
> > > > tcp_chr_connect_client_task(QIOTask *task,
> > > >       SocketAddress *addr = opaque;
> > > >       Error *err = NULL;
> > > >   -    qio_channel_socket_connect_sync(ioc, addr, &err);
> > > > +    qio_channel_socket_connect_sync(ioc, addr, NULL, &err);
> > > >         qio_task_set_error(task, err);
> > > >   }
> > > > diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h
> > > > index 513c428fe4..59d5b1b349 100644
> > > > --- a/include/io/channel-socket.h
> > > > +++ b/include/io/channel-socket.h
> > > > @@ -83,41 +83,45 @@ qio_channel_socket_new_fd(int fd,
> > > >   /**
> > > >    * qio_channel_socket_connect_sync:
> > > >    * @ioc: the socket channel object
> > > > - * @addr: the address to connect to
> > > > + * @dst_addr: the destination address to connect to
> > > > + * @src_addr: the source address to be connected
> > > >    * @errp: pointer to a NULL-initialized error object
> > > >    *
> > > > - * Attempt to connect to the address @addr. This method
> > > > - * will run in the foreground so the caller will not regain
> > > > - * execution control until the connection is established or
> > > > + * Attempt to connect to the address @dst_addr with @src_addr.
> > > > + * This method will run in the foreground so the caller will not
> > > > + * regain execution control until the connection is established or
> > > >    * an error occurs.
> > > >    */
> > > >   int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
> > > > -                                    SocketAddress *addr,
> > > > +                                    SocketAddress *dst_addr,
> > > > +                                    SocketAddress *src_addr,
> > > >                                       Error **errp);
> > > >     /**
> > > >    * qio_channel_socket_connect_async:
> > > >    * @ioc: the socket channel object
> > > > - * @addr: the address to connect to
> > > > + * @dst_addr: the destination address to connect to
> > > >    * @callback: the function to invoke on completion
> > > >    * @opaque: user data to pass to @callback
> > > >    * @destroy: the function to free @opaque
> > > >    * @context: the context to run the async task. If %NULL, the default
> > > >    *           context will be used.
> > > > + * @src_addr: the source address to be connected
> > > >    *
> > > > - * Attempt to connect to the address @addr. This method
> > > > - * will run in the background so the caller will regain
> > > > + * Attempt to connect to the address @dst_addr with the @src_addr.
> > > > + * This method will run in the background so the caller will regain
> > > >    * execution control immediately. The function @callback
> > > > - * will be invoked on completion or failure. The @addr
> > > > + * will be invoked on completion or failure. The @dst_addr
> > > >    * parameter will be copied, so may be freed as soon
> > > >    * as this function returns without waiting for completion.
> > > >    */
> > > >   void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
> > > > -                                      SocketAddress *addr,
> > > > +                                      SocketAddress *dst_addr,
> > > >                                         QIOTaskFunc callback,
> > > >                                         gpointer opaque,
> > > >                                         GDestroyNotify destroy,
> > > > -                                      GMainContext *context);
> > > > +                                      GMainContext *context,
> > > > +                                      SocketAddress *src_addr);
> > > Lets avoid modifying these two methods, and thus avoid
> > > updating countless callers.
> > > 
> > > In common with typical pattern in QIO code, lets add
> > > a second variant
> > > 
> > >    qio_channel_socket_connect_full
> > >    qio_channel_socket_connect_full_async
> > > 
> > > which has the extra parameters, then make the existing
> > > simpler methods call the new ones.
> > > 
> > > ie qio_channel_socket_connect should call
> > > qio_channel_socket_connect_full, pasing NULL for the
> > > src_addr.
> > > 
> > > Thanks for the suggestion Daniel. Will modify the same structure as
> > 
> > suggested above in the v2 patchset.
> 
> > Hi Daniel. I agree with your suggestion here, but I have couple of doubts
> in implementing this type.
> 
> 1. You meant to say qio_channel_socket_connect_async calls ->
> qio_channel_socket_connect_all_async and the later function would have a
> extra parameter for src_addr as NULL right. But if you see this approach
> works well for connecting non-multifd channels where source uri is passed as
> NULL, but for multifd channels, as you see the function
> socket_send_channel_create also calls qio_channel_socket_connect_async, but
> this time instead of NULL, it should actually pass a src_addr parameter. So
> in my opion, whatever function multifd function is calling it should have
> extra parameter to pass src_addr.
> 
> 2. Same goes for qio_channel_socket_connect_sync func, for multifd path, it
> should be passed with src_addr instead of NULL.
> 
> 3. I agree, modifying these methods would lead to updating endless callers
> from test cases. But I don't see a better way that this at the moment. And
> out of the two methods, one method is called only for single unit test case
> in qemu.
> 
> We would love to have suggestions from your side Daniel.

Do not modify this existing method signature at all:

 int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
                                     SocketAddress *addr,
                                     Error **errp);

Only add a new method:

 int qio_channel_socket_connect_full_sync(QIOChannelSocket *ioc,
                                          SocketAddress *dst_addr,
                                          SocketAddress *src_addr,
                                          Error **errp);

Internally the former method calls the latter, assing NULL for
src_addr.

Externally, only the migration code needs to use the new method,
all the rest of QEMU code must remain unchanged calling the simpler
method.


With regards,
Daniel
diff mbox series

Patch

diff --git a/chardev/char-socket.c b/chardev/char-socket.c
index dc4e218eeb..f3725238c5 100644
--- a/chardev/char-socket.c
+++ b/chardev/char-socket.c
@@ -932,7 +932,7 @@  static int tcp_chr_connect_client_sync(Chardev *chr, Error **errp)
     QIOChannelSocket *sioc = qio_channel_socket_new();
     tcp_chr_change_state(s, TCP_CHARDEV_STATE_CONNECTING);
     tcp_chr_set_client_ioc_name(chr, sioc);
-    if (qio_channel_socket_connect_sync(sioc, s->addr, errp) < 0) {
+    if (qio_channel_socket_connect_sync(sioc, s->addr, NULL, errp) < 0) {
         tcp_chr_change_state(s, TCP_CHARDEV_STATE_DISCONNECTED);
         object_unref(OBJECT(sioc));
         return -1;
@@ -1120,7 +1120,7 @@  static void tcp_chr_connect_client_task(QIOTask *task,
     SocketAddress *addr = opaque;
     Error *err = NULL;
 
-    qio_channel_socket_connect_sync(ioc, addr, &err);
+    qio_channel_socket_connect_sync(ioc, addr, NULL, &err);
 
     qio_task_set_error(task, err);
 }
diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h
index 513c428fe4..59d5b1b349 100644
--- a/include/io/channel-socket.h
+++ b/include/io/channel-socket.h
@@ -83,41 +83,45 @@  qio_channel_socket_new_fd(int fd,
 /**
  * qio_channel_socket_connect_sync:
  * @ioc: the socket channel object
- * @addr: the address to connect to
+ * @dst_addr: the destination address to connect to
+ * @src_addr: the source address to be connected
  * @errp: pointer to a NULL-initialized error object
  *
- * Attempt to connect to the address @addr. This method
- * will run in the foreground so the caller will not regain
- * execution control until the connection is established or
+ * Attempt to connect to the address @dst_addr with @src_addr.
+ * This method will run in the foreground so the caller will not
+ * regain execution control until the connection is established or
  * an error occurs.
  */
 int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
-                                    SocketAddress *addr,
+                                    SocketAddress *dst_addr,
+                                    SocketAddress *src_addr,
                                     Error **errp);
 
 /**
  * qio_channel_socket_connect_async:
  * @ioc: the socket channel object
- * @addr: the address to connect to
+ * @dst_addr: the destination address to connect to
  * @callback: the function to invoke on completion
  * @opaque: user data to pass to @callback
  * @destroy: the function to free @opaque
  * @context: the context to run the async task. If %NULL, the default
  *           context will be used.
+ * @src_addr: the source address to be connected
  *
- * Attempt to connect to the address @addr. This method
- * will run in the background so the caller will regain
+ * Attempt to connect to the address @dst_addr with the @src_addr.
+ * This method will run in the background so the caller will regain
  * execution control immediately. The function @callback
- * will be invoked on completion or failure. The @addr
+ * will be invoked on completion or failure. The @dst_addr
  * parameter will be copied, so may be freed as soon
  * as this function returns without waiting for completion.
  */
 void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
-                                      SocketAddress *addr,
+                                      SocketAddress *dst_addr,
                                       QIOTaskFunc callback,
                                       gpointer opaque,
                                       GDestroyNotify destroy,
-                                      GMainContext *context);
+                                      GMainContext *context,
+                                      SocketAddress *src_addr);
 
 
 /**
diff --git a/include/qemu/sockets.h b/include/qemu/sockets.h
index 038faa157f..dc863c3df8 100644
--- a/include/qemu/sockets.h
+++ b/include/qemu/sockets.h
@@ -33,7 +33,8 @@  int inet_ai_family_from_address(InetSocketAddress *addr,
                                 Error **errp);
 int inet_parse(InetSocketAddress *addr, const char *str, Error **errp);
 int inet_connect(const char *str, Error **errp);
-int inet_connect_saddr(InetSocketAddress *saddr, Error **errp);
+int inet_connect_saddr(InetSocketAddress *dst_addr,
+                       InetSocketAddress *src_addr, Error **errp);
 
 NetworkAddressFamily inet_netfamily(int family);
 
@@ -41,7 +42,8 @@  int unix_listen(const char *path, Error **errp);
 int unix_connect(const char *path, Error **errp);
 
 SocketAddress *socket_parse(const char *str, Error **errp);
-int socket_connect(SocketAddress *addr, Error **errp);
+int socket_connect(SocketAddress *dst_addr, SocketAddress *src_addr,
+                   Error **errp);
 int socket_listen(SocketAddress *addr, int num, Error **errp);
 void socket_listen_cleanup(int fd, Error **errp);
 int socket_dgram(SocketAddress *remote, SocketAddress *local, Error **errp);
diff --git a/io/channel-socket.c b/io/channel-socket.c
index dc9c165de1..f8746ad646 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -36,6 +36,12 @@ 
 
 #define SOCKET_MAX_FDS 16
 
+struct SrcDestAddress {
+    SocketAddress *dst_addr;
+    SocketAddress *src_addr;
+};
+
+
 SocketAddress *
 qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
                                      Error **errp)
@@ -145,13 +151,14 @@  qio_channel_socket_new_fd(int fd,
 
 
 int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
-                                    SocketAddress *addr,
+                                    SocketAddress *dst_addr,
+                                    SocketAddress *src_addr,
                                     Error **errp)
 {
     int fd;
 
-    trace_qio_channel_socket_connect_sync(ioc, addr);
-    fd = socket_connect(addr, errp);
+    trace_qio_channel_socket_connect_sync(ioc, dst_addr);
+    fd = socket_connect(dst_addr, src_addr, errp);
     if (fd < 0) {
         trace_qio_channel_socket_connect_fail(ioc);
         return -1;
@@ -177,39 +184,56 @@  int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
 }
 
 
+static void qio_channel_socket_worker_free(gpointer opaque)
+{
+    struct SrcDestAddress *data = opaque;
+    if (!data) {
+        return;
+    }
+    qapi_free_SocketAddress(data->dst_addr);
+    qapi_free_SocketAddress(data->src_addr);
+    g_free(data);
+}
+
+
 static void qio_channel_socket_connect_worker(QIOTask *task,
                                               gpointer opaque)
 {
     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
-    SocketAddress *addr = opaque;
+    struct SrcDestAddress *data = opaque;
     Error *err = NULL;
 
-    qio_channel_socket_connect_sync(ioc, addr, &err);
+    qio_channel_socket_connect_sync(ioc, data->dst_addr, data->src_addr, &err);
 
     qio_task_set_error(task, err);
 }
 
 
 void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
-                                      SocketAddress *addr,
+                                      SocketAddress *dst_addr,
                                       QIOTaskFunc callback,
                                       gpointer opaque,
                                       GDestroyNotify destroy,
-                                      GMainContext *context)
+                                      GMainContext *context,
+                                      SocketAddress *src_addr)
 {
     QIOTask *task = qio_task_new(
         OBJECT(ioc), callback, opaque, destroy);
-    SocketAddress *addrCopy;
-
-    addrCopy = QAPI_CLONE(SocketAddress, addr);
+    struct SrcDestAddress *data = g_new0(struct SrcDestAddress, 1);
 
+    data->dst_addr = QAPI_CLONE(SocketAddress, dst_addr);
+    if (src_addr) {
+        data->src_addr = QAPI_CLONE(SocketAddress, src_addr);
+    } else {
+        data->src_addr = NULL;
+    }
     /* socket_connect() does a non-blocking connect(), but it
      * still blocks in DNS lookups, so we must use a thread */
-    trace_qio_channel_socket_connect_async(ioc, addr);
+    trace_qio_channel_socket_connect_async(ioc, dst_addr);
     qio_task_run_in_thread(task,
                            qio_channel_socket_connect_worker,
-                           addrCopy,
-                           (GDestroyNotify)qapi_free_SocketAddress,
+                           data,
+                           qio_channel_socket_worker_free,
                            context);
 }
 
diff --git a/migration/socket.c b/migration/socket.c
index 21e0983df2..d0cb7cc6a6 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -47,7 +47,7 @@  void socket_send_channel_create(QIOTaskFunc f, void *data)
 {
     QIOChannelSocket *sioc = qio_channel_socket_new();
     qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
-                                     f, data, NULL, NULL);
+                                     f, data, NULL, NULL, NULL);
 }
 
 int socket_send_channel_destroy(QIOChannel *send)
@@ -110,7 +110,7 @@  out:
 
 static void
 socket_start_outgoing_migration_internal(MigrationState *s,
-                                         SocketAddress *saddr,
+                                         SocketAddress *dst_addr,
                                          Error **errp)
 {
     QIOChannelSocket *sioc = qio_channel_socket_new();
@@ -118,20 +118,17 @@  socket_start_outgoing_migration_internal(MigrationState *s,
 
     data->s = s;
 
-    /* in case previous migration leaked it */
-    qapi_free_SocketAddress(outgoing_args.saddr);
-    outgoing_args.saddr = saddr;
-
-    if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
-        data->hostname = g_strdup(saddr->u.inet.host);
+    if (dst_addr->type == SOCKET_ADDRESS_TYPE_INET) {
+        data->hostname = g_strdup(dst_addr->u.inet.host);
     }
 
     qio_channel_set_name(QIO_CHANNEL(sioc), "migration-socket-outgoing");
     qio_channel_socket_connect_async(sioc,
-                                     saddr,
+                                     dst_addr,
                                      socket_outgoing_migration,
                                      data,
                                      socket_connect_data_free,
+                                     NULL,
                                      NULL);
 }
 
diff --git a/nbd/client-connection.c b/nbd/client-connection.c
index 2a632931c3..4bc505d26e 100644
--- a/nbd/client-connection.c
+++ b/nbd/client-connection.c
@@ -133,7 +133,7 @@  static int nbd_connect(QIOChannelSocket *sioc, SocketAddress *addr,
         *outioc = NULL;
     }
 
-    ret = qio_channel_socket_connect_sync(sioc, addr, errp);
+    ret = qio_channel_socket_connect_sync(sioc, addr, NULL, errp);
     if (ret < 0) {
         return ret;
     }
diff --git a/qemu-nbd.c b/qemu-nbd.c
index 0cd5aa6f02..fe178f271e 100644
--- a/qemu-nbd.c
+++ b/qemu-nbd.c
@@ -191,7 +191,7 @@  static int qemu_nbd_client_list(SocketAddress *saddr, QCryptoTLSCreds *tls,
     int i, j;
 
     sioc = qio_channel_socket_new();
-    if (qio_channel_socket_connect_sync(sioc, saddr, &err) < 0) {
+    if (qio_channel_socket_connect_sync(sioc, saddr, NULL, &err) < 0) {
         error_report_err(err);
         goto out;
     }
@@ -284,7 +284,7 @@  static void *nbd_client_thread(void *arg)
 
     sioc = qio_channel_socket_new();
     if (qio_channel_socket_connect_sync(sioc,
-                                        saddr,
+                                        saddr, NULL,
                                         &local_error) < 0) {
         error_report_err(local_error);
         goto out;
diff --git a/scsi/pr-manager-helper.c b/scsi/pr-manager-helper.c
index 3be52a98d5..6e3e00eadf 100644
--- a/scsi/pr-manager-helper.c
+++ b/scsi/pr-manager-helper.c
@@ -114,6 +114,7 @@  static int pr_manager_helper_initialize(PRManagerHelper *pr_mgr,
     qio_channel_set_name(QIO_CHANNEL(sioc), "pr-manager-helper");
     qio_channel_socket_connect_sync(sioc,
                                     &saddr,
+                                    NULL,
                                     &local_err);
     g_free(path);
     if (local_err) {
diff --git a/tests/unit/test-char.c b/tests/unit/test-char.c
index 5b3b48ebac..fd5f281777 100644
--- a/tests/unit/test-char.c
+++ b/tests/unit/test-char.c
@@ -691,7 +691,7 @@  char_socket_addr_to_opt_str(SocketAddress *addr, bool fd_pass,
         if (is_listen) {
             qio_channel_socket_listen_sync(ioc, addr, 1, &error_abort);
         } else {
-            qio_channel_socket_connect_sync(ioc, addr, &error_abort);
+            qio_channel_socket_connect_sync(ioc, addr, NULL, &error_abort);
         }
         fd = ioc->fd;
         ioc->fd = -1;
@@ -748,7 +748,7 @@  char_socket_server_client_thread(gpointer data)
     SocketAddress *addr = data;
     QIOChannelSocket *ioc = qio_channel_socket_new();
 
-    qio_channel_socket_connect_sync(ioc, addr, &error_abort);
+    qio_channel_socket_connect_sync(ioc, addr, NULL, &error_abort);
 
     char_socket_ping_pong(QIO_CHANNEL(ioc), &error_abort);
 
@@ -1147,7 +1147,7 @@  static void char_socket_server_two_clients_test(gconstpointer opaque)
                              &closed, NULL, true);
 
     ioc1 = qio_channel_socket_new();
-    qio_channel_socket_connect_sync(ioc1, addr, &error_abort);
+    qio_channel_socket_connect_sync(ioc1, addr, NULL, &error_abort);
     qemu_chr_wait_connected(chr, &error_abort);
 
     /* switch the chardev to another context */
@@ -1161,7 +1161,7 @@  static void char_socket_server_two_clients_test(gconstpointer opaque)
      * succeed immediately.
      */
     ioc2 = qio_channel_socket_new();
-    qio_channel_socket_connect_sync(ioc2, addr, &error_abort);
+    qio_channel_socket_connect_sync(ioc2, addr, NULL, &error_abort);
 
     object_unref(OBJECT(ioc1));
     /* The two connections should now be processed serially.  */
diff --git a/tests/unit/test-io-channel-socket.c b/tests/unit/test-io-channel-socket.c
index 6713886d02..bc33ce6956 100644
--- a/tests/unit/test-io-channel-socket.c
+++ b/tests/unit/test-io-channel-socket.c
@@ -72,7 +72,7 @@  static void test_io_channel_setup_sync(SocketAddress *listen_addr,
 
     *src = QIO_CHANNEL(qio_channel_socket_new());
     qio_channel_socket_connect_sync(
-        QIO_CHANNEL_SOCKET(*src), connect_addr, &error_abort);
+        QIO_CHANNEL_SOCKET(*src), connect_addr, NULL, &error_abort);
     qio_channel_set_delay(*src, false);
 
     qio_channel_wait(QIO_CHANNEL(lioc), G_IO_IN);
@@ -136,7 +136,7 @@  static void test_io_channel_setup_async(SocketAddress *listen_addr,
 
     qio_channel_socket_connect_async(
         QIO_CHANNEL_SOCKET(*src), connect_addr,
-        test_io_channel_complete, &data, NULL, NULL);
+        test_io_channel_complete, &data, NULL, NULL, NULL);
 
     g_main_loop_run(data.loop);
     g_main_context_iteration(g_main_context_default(), FALSE);
diff --git a/tests/unit/test-util-sockets.c b/tests/unit/test-util-sockets.c
index 63909ccb2b..aa26630045 100644
--- a/tests/unit/test-util-sockets.c
+++ b/tests/unit/test-util-sockets.c
@@ -89,7 +89,7 @@  static void test_socket_fd_pass_name_good(void)
     addr.type = SOCKET_ADDRESS_TYPE_FD;
     addr.u.fd.str = g_strdup(mon_fdname);
 
-    fd = socket_connect(&addr, &error_abort);
+    fd = socket_connect(&addr, NULL, &error_abort);
     g_assert_cmpint(fd, !=, -1);
     g_assert_cmpint(fd, !=, mon_fd);
     close(fd);
@@ -121,7 +121,7 @@  static void test_socket_fd_pass_name_bad(void)
     addr.type = SOCKET_ADDRESS_TYPE_FD;
     addr.u.fd.str = g_strdup(mon_fdname);
 
-    fd = socket_connect(&addr, &err);
+    fd = socket_connect(&addr, NULL, &err);
     g_assert_cmpint(fd, ==, -1);
     error_free_or_abort(&err);
 
@@ -148,7 +148,7 @@  static void test_socket_fd_pass_name_nomon(void)
     addr.type = SOCKET_ADDRESS_TYPE_FD;
     addr.u.fd.str = g_strdup("myfd");
 
-    fd = socket_connect(&addr, &err);
+    fd = socket_connect(&addr, NULL, &err);
     g_assert_cmpint(fd, ==, -1);
     error_free_or_abort(&err);
 
@@ -172,7 +172,7 @@  static void test_socket_fd_pass_num_good(void)
     addr.type = SOCKET_ADDRESS_TYPE_FD;
     addr.u.fd.str = g_strdup_printf("%d", sfd);
 
-    fd = socket_connect(&addr, &error_abort);
+    fd = socket_connect(&addr, NULL, &error_abort);
     g_assert_cmpint(fd, ==, sfd);
 
     fd = socket_listen(&addr, 1, &error_abort);
@@ -194,7 +194,7 @@  static void test_socket_fd_pass_num_bad(void)
     addr.type = SOCKET_ADDRESS_TYPE_FD;
     addr.u.fd.str = g_strdup_printf("%d", sfd);
 
-    fd = socket_connect(&addr, &err);
+    fd = socket_connect(&addr, NULL, &err);
     g_assert_cmpint(fd, ==, -1);
     error_free_or_abort(&err);
 
@@ -217,7 +217,7 @@  static void test_socket_fd_pass_num_nocli(void)
     addr.type = SOCKET_ADDRESS_TYPE_FD;
     addr.u.fd.str = g_strdup_printf("%d", STDOUT_FILENO);
 
-    fd = socket_connect(&addr, &err);
+    fd = socket_connect(&addr, NULL, &err);
     g_assert_cmpint(fd, ==, -1);
     error_free_or_abort(&err);
 
@@ -246,10 +246,10 @@  static gpointer unix_client_thread_func(gpointer user_data)
 
     for (i = 0; i < ABSTRACT_SOCKET_VARIANTS; i++) {
         if (row->expect_connect[i]) {
-            fd = socket_connect(row->client[i], &error_abort);
+            fd = socket_connect(row->client[i], NULL, &error_abort);
             g_assert_cmpint(fd, >=, 0);
         } else {
-            fd = socket_connect(row->client[i], &err);
+            fd = socket_connect(row->client[i], NULL, &err);
             g_assert_cmpint(fd, ==, -1);
             error_free_or_abort(&err);
         }
diff --git a/ui/input-barrier.c b/ui/input-barrier.c
index 2d57ca7079..ee78654e3b 100644
--- a/ui/input-barrier.c
+++ b/ui/input-barrier.c
@@ -506,7 +506,7 @@  static void input_barrier_complete(UserCreatable *uc, Error **errp)
     ib->sioc = qio_channel_socket_new();
     qio_channel_set_name(QIO_CHANNEL(ib->sioc), "barrier-client");
 
-    qio_channel_socket_connect_sync(ib->sioc, &ib->saddr, &local_err);
+    qio_channel_socket_connect_sync(ib->sioc, &ib->saddr, NULL, &local_err);
     if (local_err) {
         error_propagate(errp, local_err);
         return;
diff --git a/ui/vnc.c b/ui/vnc.c
index 6a05d06147..c3dd84026a 100644
--- a/ui/vnc.c
+++ b/ui/vnc.c
@@ -3931,7 +3931,8 @@  static int vnc_display_connect(VncDisplay *vd,
     vd->is_unix = saddr_list->value->type == SOCKET_ADDRESS_TYPE_UNIX;
     sioc = qio_channel_socket_new();
     qio_channel_set_name(QIO_CHANNEL(sioc), "vnc-reverse");
-    if (qio_channel_socket_connect_sync(sioc, saddr_list->value, errp) < 0) {
+    if (qio_channel_socket_connect_sync(sioc, saddr_list->value,
+                                        NULL, errp) < 0) {
         object_unref(OBJECT(sioc));
         return -1;
     }
diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c
index 13b5b197f9..bbe0dc0ee0 100644
--- a/util/qemu-sockets.c
+++ b/util/qemu-sockets.c
@@ -226,7 +226,7 @@  static int inet_listen_saddr(InetSocketAddress *saddr,
         return -1;
     }
 
-    memset(&ai,0, sizeof(ai));
+    memset(&ai,0,sizeof(ai));
     ai.ai_flags = AI_PASSIVE;
     if (saddr->has_numeric && saddr->numeric) {
         ai.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV;
@@ -282,8 +282,8 @@  static int inet_listen_saddr(InetSocketAddress *saddr,
             e->ai_protocol = IPPROTO_MPTCP;
         }
 #endif
-        getnameinfo((struct sockaddr*)e->ai_addr,e->ai_addrlen,
-                        uaddr,INET6_ADDRSTRLEN,uport,32,
+        getnameinfo((struct sockaddr *)e->ai_addr, e->ai_addrlen,
+                        uaddr, INET6_ADDRSTRLEN, uport, 32,
                         NI_NUMERICHOST | NI_NUMERICSERV);
 
         port_min = inet_getport(e);
@@ -358,7 +358,8 @@  listen_ok:
     ((rc) == -EINPROGRESS)
 #endif
 
-static int inet_connect_addr(const InetSocketAddress *saddr,
+static int inet_connect_addr(const InetSocketAddress *dst_addr,
+                             const InetSocketAddress *src_addr,
                              struct addrinfo *addr, Error **errp)
 {
     int sock, rc;
@@ -371,8 +372,28 @@  static int inet_connect_addr(const InetSocketAddress *saddr,
     }
     socket_set_fast_reuse(sock);
 
+    /* to bind the socket if src_addr is available */
+
+    if (src_addr) {
+        struct sockaddr_in servaddr;
+
+        /* bind to a specific interface in the internet domain */
+        /* to make sure the sin_zero filed is cleared */
+        memset(&servaddr, 0, sizeof(servaddr));
+
+        servaddr.sin_family = AF_INET;
+        servaddr.sin_addr.s_addr = inet_addr(src_addr->host);
+        servaddr.sin_port = 0;
+
+        if (bind(sock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
+            error_setg_errno(errp, errno, "Failed to bind socket");
+            return -1;
+        }
+    }
+
     /* connect to peer */
     do {
+
         rc = 0;
         if (connect(sock, addr->ai_addr, addr->ai_addrlen) < 0) {
             rc = -errno;
@@ -380,8 +401,14 @@  static int inet_connect_addr(const InetSocketAddress *saddr,
     } while (rc == -EINTR);
 
     if (rc < 0) {
-        error_setg_errno(errp, errno, "Failed to connect to '%s:%s'",
-                         saddr->host, saddr->port);
+        if (src_addr) {
+            error_setg_errno(errp, errno, "Failed to connect '%s:%s' to "
+                             "'%s:%s'", dst_addr->host, dst_addr->port,
+                             src_addr->host, src_addr->port);
+        } else {
+            error_setg_errno(errp, errno, "Failed to connect '%s:%s'",
+                             dst_addr->host, dst_addr->port);
+        }
         closesocket(sock);
         return -1;
     }
@@ -446,13 +473,14 @@  static struct addrinfo *inet_parse_connect_saddr(InetSocketAddress *saddr,
  *
  * Returns: -1 on error, file descriptor on success.
  */
-int inet_connect_saddr(InetSocketAddress *saddr, Error **errp)
+int inet_connect_saddr(InetSocketAddress *dst_addr,
+                       InetSocketAddress *src_addr, Error **errp)
 {
     Error *local_err = NULL;
     struct addrinfo *res, *e;
     int sock = -1;
 
-    res = inet_parse_connect_saddr(saddr, errp);
+    res = inet_parse_connect_saddr(dst_addr, errp);
     if (!res) {
         return -1;
     }
@@ -462,12 +490,12 @@  int inet_connect_saddr(InetSocketAddress *saddr, Error **errp)
         local_err = NULL;
 
 #ifdef HAVE_IPPROTO_MPTCP
-        if (saddr->has_mptcp && saddr->mptcp) {
+        if (dst_addr->has_mptcp && dst_addr->mptcp) {
             e->ai_protocol = IPPROTO_MPTCP;
         }
 #endif
 
-        sock = inet_connect_addr(saddr, e, &local_err);
+        sock = inet_connect_addr(dst_addr, src_addr, e, &local_err);
         if (sock >= 0) {
             break;
         }
@@ -480,7 +508,7 @@  int inet_connect_saddr(InetSocketAddress *saddr, Error **errp)
         return sock;
     }
 
-    if (saddr->keep_alive) {
+    if (dst_addr->keep_alive) {
         int val = 1;
         int ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
                              &val, sizeof(val));
@@ -506,7 +534,7 @@  static int inet_dgram_saddr(InetSocketAddress *sraddr,
     Error *err = NULL;
 
     /* lookup peer addr */
-    memset(&ai,0, sizeof(ai));
+    memset(&ai, 0, sizeof(ai));
     ai.ai_flags = AI_CANONNAME | AI_V4MAPPED | AI_ADDRCONFIG;
     ai.ai_family = inet_ai_family_from_address(sraddr, &err);
     ai.ai_socktype = SOCK_DGRAM;
@@ -533,7 +561,7 @@  static int inet_dgram_saddr(InetSocketAddress *sraddr,
     }
 
     /* lookup local addr */
-    memset(&ai,0, sizeof(ai));
+    memset(&ai, 0, sizeof(ai));
     ai.ai_flags = AI_PASSIVE;
     ai.ai_family = peer->ai_family;
     ai.ai_socktype = SOCK_DGRAM;
@@ -574,7 +602,7 @@  static int inet_dgram_saddr(InetSocketAddress *sraddr,
     }
 
     /* connect to peer */
-    if (connect(sock,peer->ai_addr,peer->ai_addrlen) < 0) {
+    if (connect(sock, peer->ai_addr, peer->ai_addrlen) < 0) {
         error_setg_errno(errp, errno, "Failed to connect to '%s:%s'",
                          addr, port);
         goto err;
@@ -727,7 +755,7 @@  int inet_connect(const char *str, Error **errp)
     InetSocketAddress *addr = g_new(InetSocketAddress, 1);
 
     if (!inet_parse(addr, str, errp)) {
-        sock = inet_connect_saddr(addr, errp);
+        sock = inet_connect_saddr(addr, NULL, errp);
     }
     qapi_free_InetSocketAddress(addr);
     return sock;
@@ -1182,25 +1210,26 @@  int socket_address_parse_named_fd(SocketAddress *addr, Error **errp)
     return 0;
 }
 
-int socket_connect(SocketAddress *addr, Error **errp)
+int socket_connect(SocketAddress *dst_addr, SocketAddress *src_addr, Error **errp)
 {
     int fd;
 
-    switch (addr->type) {
+    switch (dst_addr->type) {
     case SOCKET_ADDRESS_TYPE_INET:
-        fd = inet_connect_saddr(&addr->u.inet, errp);
+        fd = inet_connect_saddr(&dst_addr->u.inet, src_addr ?
+                                &src_addr->u.inet : NULL, errp);
         break;
 
     case SOCKET_ADDRESS_TYPE_UNIX:
-        fd = unix_connect_saddr(&addr->u.q_unix, errp);
+        fd = unix_connect_saddr(&dst_addr->u.q_unix, errp);
         break;
 
     case SOCKET_ADDRESS_TYPE_FD:
-        fd = socket_get_fd(addr->u.fd.str, errp);
+        fd = socket_get_fd(dst_addr->u.fd.str, errp);
         break;
 
     case SOCKET_ADDRESS_TYPE_VSOCK:
-        fd = vsock_connect_saddr(&addr->u.vsock, errp);
+        fd = vsock_connect_saddr(&dst_addr->u.vsock, errp);
         break;
 
     default: