
[v3,2/3] QIOChannelSocket: Implement io_async_write & io_async_flush

Message ID 20210922222423.644444-3-leobras@redhat.com
State New
Series QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd

Commit Message

Leonardo Bras Sept. 22, 2021, 10:24 p.m. UTC
Implement the new optional callbacks io_async_write and io_async_flush on
QIOChannelSocket, but enable them only when the MSG_ZEROCOPY feature is
available in the host kernel and TCP sockets are used.

qio_channel_socket_writev() contents were moved to a helper function
__qio_channel_socket_writev() which accepts an extra 'flags' argument.
This helper function is used to implement qio_channel_socket_writev(), with
flags = 0, keeping its behavior unchanged, and
qio_channel_socket_async_writev(), with flags = MSG_ZEROCOPY.

qio_channel_socket_async_flush() was implemented by reading the socket's error
queue, which will have information on MSG_ZEROCOPY send completion.
There is no need to worry about re-sending packets if any error happens, as
MSG_ZEROCOPY only works with TCP, which will retransmit if any error occurs.

Notes on using async_write():
- As MSG_ZEROCOPY tells the kernel to use the same user buffer to avoid copying,
some caution is necessary to avoid overwriting any buffer before it's sent.
If something like this happens, a newer version of the buffer may be sent instead.
- If this is a problem, it's recommended to use async_flush() before freeing or
re-using the buffer.
- When using MSG_ZEROCOPY, the buffer memory will be locked, so it may require
a larger locked-memory limit than is usually available to non-root users.
- If the required amount of locked memory is not available, it falls back to
buffer-copying behavior and synchronous sending.
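
A minimal sketch of the intended call pattern (assuming the
qio_channel_async_writev()/qio_channel_async_flush() wrappers introduced
earlier in this series; names and error handling are illustrative only):

    struct iovec iov = { .iov_base = buf, .iov_len = len };
    Error *err = NULL;

    /* Queue the send; the kernel may still read 'buf' after this returns */
    qio_channel_async_writev(ioc, &iov, 1, NULL, 0, &err);

    /* Wait for zerocopy completions before freeing or reusing 'buf' */
    qio_channel_async_flush(ioc, &err);
    g_free(buf);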

Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 include/io/channel-socket.h |   2 +
 include/io/channel.h        |   1 +
 io/channel-socket.c         | 176 ++++++++++++++++++++++++++++++++++--
 3 files changed, 169 insertions(+), 10 deletions(-)

Comments

Daniel P. Berrangé Sept. 24, 2021, 5:38 p.m. UTC | #1
On Wed, Sep 22, 2021 at 07:24:22PM -0300, Leonardo Bras wrote:
> Implement the new optional callbacks io_async_write and io_async_flush on
> QIOChannelSocket, but enable them only when the MSG_ZEROCOPY feature is
> available in the host kernel and TCP sockets are used.
> 
> qio_channel_socket_writev() contents were moved to a helper function
> __qio_channel_socket_writev() which accepts an extra 'flags' argument.
> This helper function is used to implement qio_channel_socket_writev(), with
> flags = 0, keeping its behavior unchanged, and
> qio_channel_socket_async_writev(), with flags = MSG_ZEROCOPY.
> 
> qio_channel_socket_async_flush() was implemented by reading the socket's error
> queue, which will have information on MSG_ZEROCOPY send completion.
> There is no need to worry about re-sending packets if any error happens, as
> MSG_ZEROCOPY only works with TCP, which will retransmit if any error occurs.
> 
> Notes on using async_write():
> - As MSG_ZEROCOPY tells the kernel to use the same user buffer to avoid copying,
> some caution is necessary to avoid overwriting any buffer before it's sent.
> If something like this happens, a newer version of the buffer may be sent instead.
> - If this is a problem, it's recommended to use async_flush() before freeing or
> re-using the buffer.
> - When using MSG_ZEROCOPY, the buffer memory will be locked, so it may require
> a larger locked-memory limit than is usually available to non-root users.
> - If the required amount of locked memory is not available, it falls back to
> buffer-copying behavior and synchronous sending.
> 
> Signed-off-by: Leonardo Bras <leobras@redhat.com>
> ---
>  include/io/channel-socket.h |   2 +
>  include/io/channel.h        |   1 +
>  io/channel-socket.c         | 176 ++++++++++++++++++++++++++++++++++--
>  3 files changed, 169 insertions(+), 10 deletions(-)
> 
> diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h
> index e747e63514..4d1be0637a 100644
> --- a/include/io/channel-socket.h
> +++ b/include/io/channel-socket.h
> @@ -47,6 +47,8 @@ struct QIOChannelSocket {
>      socklen_t localAddrLen;
>      struct sockaddr_storage remoteAddr;
>      socklen_t remoteAddrLen;
> +    ssize_t async_queued;
> +    ssize_t async_sent;
>  };
>  
>  
> diff --git a/include/io/channel.h b/include/io/channel.h
> index 74f2e3ae8a..611bb2ea26 100644
> --- a/include/io/channel.h
> +++ b/include/io/channel.h
> @@ -31,6 +31,7 @@ OBJECT_DECLARE_TYPE(QIOChannel, QIOChannelClass,
>  
>  
>  #define QIO_CHANNEL_ERR_BLOCK -2
> +#define QIO_CHANNEL_ERR_NOBUFS -3
>  
>  typedef enum QIOChannelFeature QIOChannelFeature;
>  
> diff --git a/io/channel-socket.c b/io/channel-socket.c
> index 606ec97cf7..c67832d0bb 100644
> --- a/io/channel-socket.c
> +++ b/io/channel-socket.c
> @@ -26,9 +26,23 @@
>  #include "io/channel-watch.h"
>  #include "trace.h"
>  #include "qapi/clone-visitor.h"
> +#ifdef CONFIG_LINUX
> +#include <linux/errqueue.h>
> +#include <poll.h>
> +#endif
>  
>  #define SOCKET_MAX_FDS 16
>  
> +static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
> +                                               const struct iovec *iov,
> +                                               size_t niov,
> +                                               int *fds,
> +                                               size_t nfds,
> +                                               Error **errp);
> +
> +static void qio_channel_socket_async_flush(QIOChannel *ioc,
> +                                           Error **errp);
> +
>  SocketAddress *
>  qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
>                                       Error **errp)
> @@ -55,6 +69,8 @@ qio_channel_socket_new(void)
>  
>      sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
>      sioc->fd = -1;
> +    sioc->async_queued = 0;
> +    sioc->async_sent = 0;
>  
>      ioc = QIO_CHANNEL(sioc);
>      qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
> @@ -140,6 +156,7 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
>                                      Error **errp)
>  {
>      int fd;
> +    int ret, v = 1;
>  
>      trace_qio_channel_socket_connect_sync(ioc, addr);
>      fd = socket_connect(addr, errp);
> @@ -154,6 +171,19 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
>          return -1;
>      }
>  
> +#ifdef CONFIG_LINUX
> +    if (addr->type != SOCKET_ADDRESS_TYPE_INET) {
> +        return 0;
> +    }
> +
> +    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
> +    if (ret >= 0) {
> +        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> +        klass->io_async_writev = qio_channel_socket_async_writev;
> +        klass->io_async_flush = qio_channel_socket_async_flush;
> +    }
> +#endif

This is not right - the async APIs should not be tied 1:1 to ZEROCOPY
usage - we should have them take a flag to request ZEROCOPY behaviour.
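
(For illustration, one possible shape for such a flag-based API - the flag
name below is hypothetical, not something this patch defines:)

    /* Hypothetical: the caller opts into zerocopy per write */
    #define QIO_CHANNEL_WRITE_FLAG_ZERO_COPY (1 << 0)

    ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
                                            const struct iovec *iov,
                                            size_t niov,
                                            int *fds,
                                            size_t nfds,
                                            int flags,
                                            Error **errp);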

> +
>      return 0;
>  }
>  
> @@ -520,12 +550,13 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
>      return ret;
>  }
>  
> -static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
> -                                         const struct iovec *iov,
> -                                         size_t niov,
> -                                         int *fds,
> -                                         size_t nfds,
> -                                         Error **errp)
> +static ssize_t __qio_channel_socket_writev(QIOChannel *ioc,
> +                                           const struct iovec *iov,
> +                                           size_t niov,
> +                                           int *fds,
> +                                           size_t nfds,
> +                                           int flags,
> +                                           Error **errp)
>  {
>      QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
>      ssize_t ret;
> @@ -558,20 +589,145 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
>      }
>  
>   retry:
> -    ret = sendmsg(sioc->fd, &msg, 0);
> +    ret = sendmsg(sioc->fd, &msg, flags);
>      if (ret <= 0) {
> -        if (errno == EAGAIN) {
> +        switch (errno) {
> +        case EAGAIN:
>              return QIO_CHANNEL_ERR_BLOCK;
> -        }
> -        if (errno == EINTR) {
> +        case EINTR:
>              goto retry;
> +        case ENOBUFS:
> +            return QIO_CHANNEL_ERR_NOBUFS;
>          }
> +
>          error_setg_errno(errp, errno,
>                           "Unable to write to socket");
>          return -1;
>      }
>      return ret;
>  }
> +
> +static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
> +                                         const struct iovec *iov,
> +                                         size_t niov,
> +                                         int *fds,
> +                                         size_t nfds,
> +                                         Error **errp)
> +{
> +    return __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, 0, errp);
> +}
> +
> +static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
> +                                               const struct iovec *iov,
> +                                               size_t niov,
> +                                               int *fds,
> +                                               size_t nfds,
> +                                               Error **errp)
> +{
> +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> +    ssize_t ret;
> +
> +    sioc->async_queued++;
> +
> +    ret = __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, MSG_ZEROCOPY,
> +                                       errp);
> +    if (ret == QIO_CHANNEL_ERR_NOBUFS) {
> +        /*
> +         * Not enough locked memory available to the process.
> +         * Fallback to default sync callback.
> +         */
> +
> +        if (errp && *errp) {
> +            warn_reportf_err(*errp,
> +                             "Process can't lock enough memory for using MSG_ZEROCOPY,"
> +                             "falling back to non-zerocopy");

This is not nice, as it hides what is likely a mis-configuration error.
If someone asked for zerocopy, we should honour that or report an
error back.
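
(Sketching that, the ENOBUFS path could simply surface the error instead of
falling back - illustrative only:)

    if (ret == QIO_CHANNEL_ERR_NOBUFS) {
        error_setg(errp, "Process can't lock enough memory for MSG_ZEROCOPY; "
                   "consider raising the locked-memory limit");
        return -1;
    }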

> +        }
> +
> +        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> +        klass->io_async_writev = NULL;
> +        klass->io_async_flush = NULL;

Clearing the flush callback is wrong. We might have pending async
writes that haven't been processed yet, and the lack of buffers
may be a transient problem just caused by a backlog of writes.

> +
> +        /* Re-send current buffer */
> +        ret = qio_channel_socket_writev(ioc, iov, niov, fds, nfds, errp);
> +    }
> +
> +    return ret;
> +}
> +
> +
> +static void qio_channel_socket_async_flush(QIOChannel *ioc,
> +                                           Error **errp)
> +{
> +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> +    struct msghdr msg = {};
> +    struct pollfd pfd;
> +    struct sock_extended_err *serr;
> +    struct cmsghdr *cm;
> +    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];

Err  sizeof(int) * SOCKET_MAX_FDS   doesn't look right. This
buffer needs to hold 'struct sock_extended_err' instances,
not 'int', and SOCKET_MAX_FDS is an unrelated limit.
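
(A control buffer sized for the zerocopy notification would look more like
the sketch below, mirroring the kernel's MSG_ZEROCOPY documentation:)

    /* Space for one IP_RECVERR cmsg carrying a struct sock_extended_err */
    char control[CMSG_SPACE(sizeof(struct sock_extended_err))];

    memset(control, 0, sizeof(control));
    msg.msg_control = control;
    msg.msg_controllen = sizeof(control);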

> +    int ret;
> +
> +    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
> +    msg.msg_control = control;
> +    msg.msg_controllen = sizeof(control);
> +
> +    while (sioc->async_sent < sioc->async_queued) {
> +        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
> +        if (ret < 0) {
> +            if (errno == EAGAIN) {
> +                /* Nothing on errqueue, wait */
> +                pfd.fd = sioc->fd;
> +                pfd.events = 0;
> +                ret = poll(&pfd, 1, 250);
> +                if (ret == 0) {
> +                    /*
> +                     * Timeout : After 250ms without receiving any zerocopy
> +                     * notification, consider all data as sent.
> +                     */

This feels very dubious indeed. If some caller needs a guarantee that the
data was successfully sent, merely waiting 250ms is not going to be reliable
enough. 

A regular non-async + non-zerocopy write will wait as long as is needed
unless SO_SNDTIMEO has been set on the socket.

At the very least the timeout ought to be a parameter passed in, and the
return value should indicate whether it timed out, or report how many
pending writes still aren't processed, so the caller can decide whether
to call flush again.
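
(A sketch of that suggestion - names and types are illustrative only:)

    /*
     * Illustrative only: wait up to 'timeout_ms' for zerocopy completions
     * and return the number of async writes still pending (0 = flushed).
     */
    ssize_t qio_channel_socket_async_flush(QIOChannel *ioc,
                                           int timeout_ms,
                                           Error **errp);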

> +                    break;
> +                } else if (ret < 0 ||
> +                           (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
> +                    error_setg_errno(errp, errno,
> +                                     "Poll error");
> +                    break;
> +                } else {
> +                    continue;
> +                }
> +            }
> +            if (errno == EINTR) {
> +                continue;
> +            }
> +
> +            error_setg_errno(errp, errno,
> +                             "Unable to read errqueue");
> +            break;
> +        }
> +
> +        cm = CMSG_FIRSTHDR(&msg);
> +        if (cm->cmsg_level != SOL_IP &&
> +            cm->cmsg_type != IP_RECVERR) {
> +            error_setg_errno(errp, EPROTOTYPE,
> +                             "Wrong cmsg in errqueue");
> +            break;
> +        }
> +
> +        serr = (void *) CMSG_DATA(cm);
> +        if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
> +            error_setg_errno(errp, serr->ee_errno,
> +                             "Error on socket");
> +            break;
> +        }
> +        if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
> +            error_setg_errno(errp, serr->ee_origin,
> +                             "Error not from zerocopy");
> +            break;
> +        }
> +
> +        /* No errors, count sent ids*/
> +        sioc->async_sent += serr->ee_data - serr->ee_info + 1;
> +    }
> +}
> +
> +
>  #else /* WIN32 */
>  static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
>                                          const struct iovec *iov,

Regards,
Daniel
Peter Xu Sept. 28, 2021, 10:45 p.m. UTC | #2
On Wed, Sep 22, 2021 at 07:24:22PM -0300, Leonardo Bras wrote:
> +static void qio_channel_socket_async_flush(QIOChannel *ioc,
> +                                           Error **errp)
> +{
> +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> +    struct msghdr msg = {};
> +    struct pollfd pfd;
> +    struct sock_extended_err *serr;
> +    struct cmsghdr *cm;
> +    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
> +    int ret;
> +
> +    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
> +    msg.msg_control = control;
> +    msg.msg_controllen = sizeof(control);
> +
> +    while (sioc->async_sent < sioc->async_queued) {
> +        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
> +        if (ret < 0) {
> +            if (errno == EAGAIN) {
> +                /* Nothing on errqueue, wait */
> +                pfd.fd = sioc->fd;
> +                pfd.events = 0;
> +                ret = poll(&pfd, 1, 250);
> +                if (ret == 0) {
> +                    /*
> +                     * Timeout : After 250ms without receiving any zerocopy
> +                     * notification, consider all data as sent.
> +                     */
> +                    break;

After a timeout, we'll break the while loop and continue parsing an invalid
msg [1].  Is that what we want?

Also, I don't think we can return the flush() even if timed out - iiuc we
should keep polling until we have async_sent==async_queued.  It depends on how
we define flush(): if it's "when this function returns all data is sent", then
we should keep polling, and afaict this is what we want here right now.
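
(i.e. the EAGAIN path would need to retry recvmsg() rather than fall
through - a sketch, not the final code:)

    ret = poll(&pfd, 1, -1);    /* block until the errqueue is readable */
    if (ret < 0 && errno != EINTR) {
        error_setg_errno(errp, errno, "Poll error");
        break;
    }
    continue;    /* retry recvmsg(); never parse a msg we never received */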

> +                } else if (ret < 0 ||
> +                           (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
> +                    error_setg_errno(errp, errno,
> +                                     "Poll error");
> +                    break;
> +                } else {
> +                    continue;
> +                }
> +            }
> +            if (errno == EINTR) {
> +                continue;
> +            }
> +
> +            error_setg_errno(errp, errno,
> +                             "Unable to read errqueue");
> +            break;
> +        }
> +
> +        cm = CMSG_FIRSTHDR(&msg);

[1]

> +        if (cm->cmsg_level != SOL_IP &&
> +            cm->cmsg_type != IP_RECVERR) {
> +            error_setg_errno(errp, EPROTOTYPE,
> +                             "Wrong cmsg in errqueue");
> +            break;
> +        }
Leonardo Bras Sept. 29, 2021, 7:32 p.m. UTC | #3
Hello Daniel,

On Fri, Sep 24, 2021 at 2:38 PM Daniel P. Berrangé <berrange@redhat.com> wrote:
[...]
> > @@ -154,6 +171,19 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
> >          return -1;
> >      }
> >
> > +#ifdef CONFIG_LINUX
> > +    if (addr->type != SOCKET_ADDRESS_TYPE_INET) {
> > +        return 0;
> > +    }
> > +
> > +    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
> > +    if (ret >= 0) {
> > +        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> > +        klass->io_async_writev = qio_channel_socket_async_writev;
> > +        klass->io_async_flush = qio_channel_socket_async_flush;
> > +    }
> > +#endif
>
> This is not right - the async APIs should not be tied 1:1 to ZEROCOPY
> usage - we should have them take a flag to request ZEROCOPY behaviour.

I agree, but I am not aware of how to do asynchronous send in a socket
without MSG_ZEROCOPY.

I mean, I know of the non-blocking send, but I am not sure how it
checks if everything was sent (i.e. the flush part).
Would it also be using the ERRQUEUE for that?

What would you suggest?

>
> > +
> >      return 0;
> >  }
> >
> > @@ -520,12 +550,13 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
> >      return ret;
> >  }
> >
> > -static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
> > -                                         const struct iovec *iov,
> > -                                         size_t niov,
> > -                                         int *fds,
> > -                                         size_t nfds,
> > -                                         Error **errp)
> > +static ssize_t __qio_channel_socket_writev(QIOChannel *ioc,
> > +                                           const struct iovec *iov,
> > +                                           size_t niov,
> > +                                           int *fds,
> > +                                           size_t nfds,
> > +                                           int flags,
> > +                                           Error **errp)
> >  {
> >      QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> >      ssize_t ret;
> > @@ -558,20 +589,145 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
> >      }
> >
> >   retry:
> > -    ret = sendmsg(sioc->fd, &msg, 0);
> > +    ret = sendmsg(sioc->fd, &msg, flags);
> >      if (ret <= 0) {
> > -        if (errno == EAGAIN) {
> > +        switch (errno) {
> > +        case EAGAIN:
> >              return QIO_CHANNEL_ERR_BLOCK;
> > -        }
> > -        if (errno == EINTR) {
> > +        case EINTR:
> >              goto retry;
> > +        case ENOBUFS:
> > +            return QIO_CHANNEL_ERR_NOBUFS;
> >          }
> > +
> >          error_setg_errno(errp, errno,
> >                           "Unable to write to socket");
> >          return -1;
> >      }
> >      return ret;
> >  }
> > +
> > +static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
> > +                                         const struct iovec *iov,
> > +                                         size_t niov,
> > +                                         int *fds,
> > +                                         size_t nfds,
> > +                                         Error **errp)
> > +{
> > +    return __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, 0, errp);
> > +}
> > +
> > +static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
> > +                                               const struct iovec *iov,
> > +                                               size_t niov,
> > +                                               int *fds,
> > +                                               size_t nfds,
> > +                                               Error **errp)
> > +{
> > +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> > +    ssize_t ret;
> > +
> > +    sioc->async_queued++;
> > +
> > +    ret = __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, MSG_ZEROCOPY,
> > +                                       errp);
> > +    if (ret == QIO_CHANNEL_ERR_NOBUFS) {
> > +        /*
> > +         * Not enough locked memory available to the process.
> > +         * Fallback to default sync callback.
> > +         */
> > +
> > +        if (errp && *errp) {
> > +            warn_reportf_err(*errp,
> > +                             "Process can't lock enough memory for using MSG_ZEROCOPY,"
> > +                             "falling back to non-zerocopy");
>
> This is not nice, as it hides what is likely a mis-configuration error.
> If someone asked for zerocopy, we should honour that or report an
> error back.

Yeah, that makes sense to me.
Thank you for pointing that out.

>
> > +        }
> > +
> > +        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> > +        klass->io_async_writev = NULL;
> > +        klass->io_async_flush = NULL;
>
> Clearing the flush callback is wrong. We might have pending async
> writes that haven't been processed yet, and the lack of buffers
> may be a transient problem just caused by a backlog of writes.

I agree that it's wrong.
But I think it will go away anyway if we implement ZEROCOPY as
a feature instead of async, and avoid falling back to writev when async is
not available.


>
> > +
> > +        /* Re-send current buffer */
> > +        ret = qio_channel_socket_writev(ioc, iov, niov, fds, nfds, errp);
> > +    }
> > +
> > +    return ret;
> > +}
> > +
> > +
> > +static void qio_channel_socket_async_flush(QIOChannel *ioc,
> > +                                           Error **errp)
> > +{
> > +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> > +    struct msghdr msg = {};
> > +    struct pollfd pfd;
> > +    struct sock_extended_err *serr;
> > +    struct cmsghdr *cm;
> > +    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
>
> Err  sizeof(int) * SOCKET_MAX_FDS   doesn't look right. This
> buffer needs to hold 'struct sock_extended_err' instances,
> not 'int', and SOCKET_MAX_FDS is an unrelated limit.

That was a bad mistake:
I got it by reusing code from the functions above, and it got past the review I
did before sending the patch.
Sorry about that.

>
> > +    int ret;
> > +
> > +    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
> > +    msg.msg_control = control;
> > +    msg.msg_controllen = sizeof(control);
> > +
> > +    while (sioc->async_sent < sioc->async_queued) {
> > +        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
> > +        if (ret < 0) {
> > +            if (errno == EAGAIN) {
> > +                /* Nothing on errqueue, wait */
> > +                pfd.fd = sioc->fd;
> > +                pfd.events = 0;
> > +                ret = poll(&pfd, 1, 250);
> > +                if (ret == 0) {
> > +                    /*
> > +                     * Timeout : After 250ms without receiving any zerocopy
> > +                     * notification, consider all data as sent.
> > +                     */
>
> This feels very dubious indeed. If some caller needs a guarantee that the
> data was successfully sent, merely waiting 250ms is not going to be reliable
> enough.

That makes sense.
I added this part because at some point while debugging I got an infinite
loop here
(I think it was somehow missing some notifications).

>
> A regular non-async + non-zerocopy write will wait as long as is needed
> unless SO_SNDTIMEO has been set on the socket.

So it would be ok to let it loop here?
Maybe the timeout could be just long enough to keep the CPU from getting
stuck here.

>
> At the very least the timeout ought to be a parameter passed in, and the
> return value should indicate whether it timed out, or report how many
> pending writes still aren't processed, so the caller can decide whether
> to call flush again.

That also makes sense to me.

>
> > +                    break;
> > +                } else if (ret < 0 ||
> > +                           (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
> > +                    error_setg_errno(errp, errno,
> > +                                     "Poll error");
> > +                    break;
> > +                } else {
> > +                    continue;
> > +                }
> > +            }
> > +            if (errno == EINTR) {
> > +                continue;
> > +            }
> > +
> > +            error_setg_errno(errp, errno,
> > +                             "Unable to read errqueue");
> > +            break;
> > +        }
> > +
> > +        cm = CMSG_FIRSTHDR(&msg);
> > +        if (cm->cmsg_level != SOL_IP &&
> > +            cm->cmsg_type != IP_RECVERR) {
> > +            error_setg_errno(errp, EPROTOTYPE,
> > +                             "Wrong cmsg in errqueue");
> > +            break;
> > +        }
> > +
> > +        serr = (void *) CMSG_DATA(cm);
> > +        if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
> > +            error_setg_errno(errp, serr->ee_errno,
> > +                             "Error on socket");
> > +            break;
> > +        }
> > +        if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
> > +            error_setg_errno(errp, serr->ee_origin,
> > +                             "Error not from zerocopy");
> > +            break;
> > +        }
> > +
> > +        /* No errors, count sent ids*/
> > +        sioc->async_sent += serr->ee_data - serr->ee_info + 1;
> > +    }
> > +}
> > +
> > +
> >  #else /* WIN32 */
> >  static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
> >                                          const struct iovec *iov,
>
> Regards,
> Daniel
> --
> |: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
> |: https://libvirt.org         -o-            https://fstop138.berrange.com :|
> |: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|
>

Thanks Daniel,

Best regards,
Leonardo Bras
Leonardo Bras Sept. 29, 2021, 7:36 p.m. UTC | #4
On Tue, Sep 28, 2021 at 7:45 PM Peter Xu <peterx@redhat.com> wrote:
>
> On Wed, Sep 22, 2021 at 07:24:22PM -0300, Leonardo Bras wrote:
> > +static void qio_channel_socket_async_flush(QIOChannel *ioc,
> > +                                           Error **errp)
> > +{
> > +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> > +    struct msghdr msg = {};
> > +    struct pollfd pfd;
> > +    struct sock_extended_err *serr;
> > +    struct cmsghdr *cm;
> > +    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
> > +    int ret;
> > +
> > +    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
> > +    msg.msg_control = control;
> > +    msg.msg_controllen = sizeof(control);
> > +
> > +    while (sioc->async_sent < sioc->async_queued) {
> > +        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
> > +        if (ret < 0) {
> > +            if (errno == EAGAIN) {
> > +                /* Nothing on errqueue, wait */
> > +                pfd.fd = sioc->fd;
> > +                pfd.events = 0;
> > +                ret = poll(&pfd, 1, 250);
> > +                if (ret == 0) {
> > +                    /*
> > +                     * Timeout : After 250ms without receiving any zerocopy
> > +                     * notification, consider all data as sent.
> > +                     */
> > +                    break;
>
> After a timeout, we'll break the while loop and continue parsing an invalid
> msg [1].  Is that what we want?

No, the point here was returning from flush if this (long) timeout
happened, as in
"if asso long has passed, there must be no pending send", which I
agree is quite bad,
but it was all I could think to avoid an infinite loop here if
something goes wrong.

>
> Also, I don't think we can return the flush() even if timed out - iiuc we
> should keep polling until we have async_sent==async_queued.  It depends on how
> we define flush(): if it's "when this function returns all data is sent", then
> we should keep polling, and afaict this is what we want here right now.

Yeah, I agree.
That is the correct way to deal with this.

>
> > +                } else if (ret < 0 ||
> > +                           (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
> > +                    error_setg_errno(errp, errno,
> > +                                     "Poll error");
> > +                    break;
> > +                } else {
> > +                    continue;
> > +                }
> > +            }
> > +            if (errno == EINTR) {
> > +                continue;
> > +            }
> > +
> > +            error_setg_errno(errp, errno,
> > +                             "Unable to read errqueue");
> > +            break;
> > +        }
> > +
> > +        cm = CMSG_FIRSTHDR(&msg);
>
> [1]
>
> > +        if (cm->cmsg_level != SOL_IP &&
> > +            cm->cmsg_type != IP_RECVERR) {
> > +            error_setg_errno(errp, EPROTOTYPE,
> > +                             "Wrong cmsg in errqueue");
> > +            break;
> > +        }
>
> --
> Peter Xu
>

Best regards,
Leonardo
Peter Xu Sept. 29, 2021, 7:58 p.m. UTC | #5
On Wed, Sep 29, 2021 at 04:36:10PM -0300, Leonardo Bras Soares Passos wrote:
> On Tue, Sep 28, 2021 at 7:45 PM Peter Xu <peterx@redhat.com> wrote:
> >
> > On Wed, Sep 22, 2021 at 07:24:22PM -0300, Leonardo Bras wrote:
> > > +static void qio_channel_socket_async_flush(QIOChannel *ioc,
> > > +                                           Error **errp)
> > > +{
> > > +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> > > +    struct msghdr msg = {};
> > > +    struct pollfd pfd;
> > > +    struct sock_extended_err *serr;
> > > +    struct cmsghdr *cm;
> > > +    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
> > > +    int ret;
> > > +
> > > +    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
> > > +    msg.msg_control = control;
> > > +    msg.msg_controllen = sizeof(control);
> > > +
> > > +    while (sioc->async_sent < sioc->async_queued) {
> > > +        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
> > > +        if (ret < 0) {
> > > +            if (errno == EAGAIN) {
> > > +                /* Nothing on errqueue, wait */
> > > +                pfd.fd = sioc->fd;
> > > +                pfd.events = 0;
> > > +                ret = poll(&pfd, 1, 250);
> > > +                if (ret == 0) {
> > > +                    /*
> > > +                     * Timeout : After 250ms without receiving any zerocopy
> > > +                     * notification, consider all data as sent.
> > > +                     */
> > > +                    break;
> >
> > After a timeout, we'll break the while loop and continue parsing an invalid
> > msg [1].  Is that what we want?
> 
> No, the point here was returning from flush if this (long) timeout
> happened, as in
> "if asso long has passed, there must be no pending send", which I
> agree is quite bad,
> but it was all I could think to avoid an infinite loop here if
> something goes wrong.

IMHO it's the same as when we write() to a socket whose buffer is always full:
we'll simply block there until it has some space.  I don't know what we can do
here besides looping on the timeout - we shouldn't eat all the cpu, but
we should still wait?
Daniel P. Berrangé Sept. 30, 2021, 8:39 a.m. UTC | #6
On Wed, Sep 29, 2021 at 04:32:12PM -0300, Leonardo Bras Soares Passos wrote:
> Hello Daniel,
> 
> On Fri, Sep 24, 2021 at 2:38 PM Daniel P. Berrangé <berrange@redhat.com> wrote:
> [...]
> > > @@ -154,6 +171,19 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
> > >          return -1;
> > >      }
> > >
> > > +#ifdef CONFIG_LINUX
> > > +    if (addr->type != SOCKET_ADDRESS_TYPE_INET) {
> > > +        return 0;
> > > +    }
> > > +
> > > +    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
> > > +    if (ret >= 0) {
> > > +        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> > > +        klass->io_async_writev = qio_channel_socket_async_writev;
> > > +        klass->io_async_flush = qio_channel_socket_async_flush;
> > > +    }
> > > +#endif
> >
> > This is not right - the async APIs should not be tied 1:1 to ZEROCOPY
> > usage - we should have them take a flag to request ZEROCOPY behaviour.
> 
> I agree, but I am not aware of how to do asynchronous send in a socket
> without MSG_ZEROCOPY.
> 
> I mean, I know of the non-blocking send, but I am not sure how it
> checks if everything was sent (i.e. the flush part).
> Would it also be using the ERRQUEUE for that?
> 
> What would you suggest?

Yeah, there isn't any really. I guess I'm anticipating a future that
probably won't exist.  Let's just call the callbacks 'io_write_zerocopy'
and 'io_flush_zerocopy' and ignore the flag.


Regards,
Daniel
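
(With that rename, the QIOChannelClass callbacks would end up looking
something like this - a sketch of the suggested naming, with signatures
following the io_async_* callbacks from this series:)

    ssize_t (*io_write_zerocopy)(QIOChannel *ioc,
                                 const struct iovec *iov,
                                 size_t niov,
                                 int *fds,
                                 size_t nfds,
                                 Error **errp);
    void (*io_flush_zerocopy)(QIOChannel *ioc,
                              Error **errp);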

Patch

diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h
index e747e63514..4d1be0637a 100644
--- a/include/io/channel-socket.h
+++ b/include/io/channel-socket.h
@@ -47,6 +47,8 @@  struct QIOChannelSocket {
     socklen_t localAddrLen;
     struct sockaddr_storage remoteAddr;
     socklen_t remoteAddrLen;
+    ssize_t async_queued;
+    ssize_t async_sent;
 };
 
 
diff --git a/include/io/channel.h b/include/io/channel.h
index 74f2e3ae8a..611bb2ea26 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -31,6 +31,7 @@  OBJECT_DECLARE_TYPE(QIOChannel, QIOChannelClass,
 
 
 #define QIO_CHANNEL_ERR_BLOCK -2
+#define QIO_CHANNEL_ERR_NOBUFS -3
 
 typedef enum QIOChannelFeature QIOChannelFeature;
 
diff --git a/io/channel-socket.c b/io/channel-socket.c
index 606ec97cf7..c67832d0bb 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -26,9 +26,23 @@ 
 #include "io/channel-watch.h"
 #include "trace.h"
 #include "qapi/clone-visitor.h"
+#ifdef CONFIG_LINUX
+#include <linux/errqueue.h>
+#include <poll.h>
+#endif
 
 #define SOCKET_MAX_FDS 16
 
+static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
+                                               const struct iovec *iov,
+                                               size_t niov,
+                                               int *fds,
+                                               size_t nfds,
+                                               Error **errp);
+
+static void qio_channel_socket_async_flush(QIOChannel *ioc,
+                                           Error **errp);
+
 SocketAddress *
 qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
                                      Error **errp)
@@ -55,6 +69,8 @@  qio_channel_socket_new(void)
 
     sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
     sioc->fd = -1;
+    sioc->async_queued = 0;
+    sioc->async_sent = 0;
 
     ioc = QIO_CHANNEL(sioc);
     qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
@@ -140,6 +156,7 @@  int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
                                     Error **errp)
 {
     int fd;
+    int ret, v = 1;
 
     trace_qio_channel_socket_connect_sync(ioc, addr);
     fd = socket_connect(addr, errp);
@@ -154,6 +171,19 @@  int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
         return -1;
     }
 
+#ifdef CONFIG_LINUX
+    if (addr->type != SOCKET_ADDRESS_TYPE_INET) {
+        return 0;
+    }
+
+    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
+    if (ret >= 0) {
+        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+        klass->io_async_writev = qio_channel_socket_async_writev;
+        klass->io_async_flush = qio_channel_socket_async_flush;
+    }
+#endif
+
     return 0;
 }
 
@@ -520,12 +550,13 @@  static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
     return ret;
 }
 
-static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
-                                         const struct iovec *iov,
-                                         size_t niov,
-                                         int *fds,
-                                         size_t nfds,
-                                         Error **errp)
+static ssize_t __qio_channel_socket_writev(QIOChannel *ioc,
+                                           const struct iovec *iov,
+                                           size_t niov,
+                                           int *fds,
+                                           size_t nfds,
+                                           int flags,
+                                           Error **errp)
 {
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
     ssize_t ret;
@@ -558,20 +589,145 @@  static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
     }
 
  retry:
-    ret = sendmsg(sioc->fd, &msg, 0);
+    ret = sendmsg(sioc->fd, &msg, flags);
     if (ret <= 0) {
-        if (errno == EAGAIN) {
+        switch (errno) {
+        case EAGAIN:
             return QIO_CHANNEL_ERR_BLOCK;
-        }
-        if (errno == EINTR) {
+        case EINTR:
             goto retry;
+        case ENOBUFS:
+            return QIO_CHANNEL_ERR_NOBUFS;
         }
+
         error_setg_errno(errp, errno,
                          "Unable to write to socket");
         return -1;
     }
     return ret;
 }
+
+static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
+                                         const struct iovec *iov,
+                                         size_t niov,
+                                         int *fds,
+                                         size_t nfds,
+                                         Error **errp)
+{
+    return __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, 0, errp);
+}
+
+static ssize_t qio_channel_socket_async_writev(QIOChannel *ioc,
+                                               const struct iovec *iov,
+                                               size_t niov,
+                                               int *fds,
+                                               size_t nfds,
+                                               Error **errp)
+{
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+    ssize_t ret;
+
+    sioc->async_queued++;
+
+    ret = __qio_channel_socket_writev(ioc, iov, niov, fds, nfds, MSG_ZEROCOPY,
+                                       errp);
+    if (ret == QIO_CHANNEL_ERR_NOBUFS) {
+        /*
+         * Not enough locked memory available to the process.
+         * Fallback to default sync callback.
+         */
+
+        if (errp && *errp) {
+            warn_reportf_err(*errp,
+                             "Process can't lock enough memory for using MSG_ZEROCOPY,"
+                             "falling back to non-zerocopy");
+        }
+
+        QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+        klass->io_async_writev = NULL;
+        klass->io_async_flush = NULL;
+
+        /* Re-send current buffer */
+        ret = qio_channel_socket_writev(ioc, iov, niov, fds, nfds, errp);
+    }
+
+    return ret;
+}
+
+
+static void qio_channel_socket_async_flush(QIOChannel *ioc,
+                                           Error **errp)
+{
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+    struct msghdr msg = {};
+    struct pollfd pfd;
+    struct sock_extended_err *serr;
+    struct cmsghdr *cm;
+    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
+    int ret;
+
+    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
+    msg.msg_control = control;
+    msg.msg_controllen = sizeof(control);
+
+    while (sioc->async_sent < sioc->async_queued) {
+        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
+        if (ret < 0) {
+            if (errno == EAGAIN) {
+                /* Nothing on errqueue, wait */
+                pfd.fd = sioc->fd;
+                pfd.events = 0;
+                ret = poll(&pfd, 1, 250);
+                if (ret == 0) {
+                    /*
+                     * Timeout : After 250ms without receiving any zerocopy
+                     * notification, consider all data as sent.
+                     */
+                    break;
+                } else if (ret < 0 ||
+                           (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
+                    error_setg_errno(errp, errno,
+                                     "Poll error");
+                    break;
+                } else {
+                    continue;
+                }
+            }
+            if (errno == EINTR) {
+                continue;
+            }
+
+            error_setg_errno(errp, errno,
+                             "Unable to read errqueue");
+            break;
+        }
+
+        cm = CMSG_FIRSTHDR(&msg);
+        if (cm->cmsg_level != SOL_IP &&
+            cm->cmsg_type != IP_RECVERR) {
+            error_setg_errno(errp, EPROTOTYPE,
+                             "Wrong cmsg in errqueue");
+            break;
+        }
+
+        serr = (void *) CMSG_DATA(cm);
+        if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
+            error_setg_errno(errp, serr->ee_errno,
+                             "Error on socket");
+            break;
+        }
+        if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
+            error_setg_errno(errp, serr->ee_origin,
+                             "Error not from zerocopy");
+            break;
+        }
+
+        /* No errors, count sent ids*/
+        sioc->async_sent += serr->ee_data - serr->ee_info + 1;
+    }
+}
+
+
 #else /* WIN32 */
 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
                                         const struct iovec *iov,