diff mbox series

[V2,net-next,2/3] rds: deliver zerocopy completion notification with data

Message ID e3be990c88dbac833852ab9b430035a568acd929.1519399473.git.sowmini.varadhan@oracle.com
State Changes Requested, archived
Delegated to: David Miller
Headers show
Series RDS: optimized notification for zerocopy completion | expand

Commit Message

Sowmini Varadhan Feb. 23, 2018, 10:08 p.m. UTC
This commit is an optimization of the commit 01883eda72bd
("rds: support for zcopy completion notification") for PF_RDS sockets.

RDS applications are predominantly request-response transactions, so
it is more efficient to reduce the number of system calls and have
zerocopy completion notification delivered as ancillary data on the
POLLIN channel.

Cookies are passed up as ancillary data (at level SOL_RDS) in a
struct rds_zcopy_cookies when the returned value of recvmsg() is
greater than, or equal to, 0. A max of RDS_MAX_ZCOOKIES may be passed
with each message.

This commit removes support for zerocopy completion notification on
MSG_ERRQUEUE for PF_RDS sockets.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
---
v2: remove sk_error_queue path; lots of cautionary checks in rds_recvmsg_zcookie()
    and callers to make sure we don't remove cookies from the queue and then
    fail to pass them up to the caller

 include/uapi/linux/errqueue.h |    2 --
 include/uapi/linux/rds.h      |    7 +++++++
 net/rds/af_rds.c              |    7 +++++--
 net/rds/message.c             |   35 +++++++++++++----------------------
 net/rds/rds.h                 |    2 ++
 net/rds/recv.c                |   34 +++++++++++++++++++++++++++++++++-
 6 files changed, 60 insertions(+), 27 deletions(-)

Comments

Willem de Bruijn Feb. 25, 2018, 3:56 p.m. UTC | #1
On Fri, Feb 23, 2018 at 5:08 PM, Sowmini Varadhan
<sowmini.varadhan@oracle.com> wrote:
> This commit is an optimization of the commit 01883eda72bd
> ("rds: support for zcopy completion notification") for PF_RDS sockets.
>
> RDS applications are predominantly request-response transactions, so
> it is more efficient to reduce the number of system calls and have
> zerocopy completion notification delivered as ancillary data on the
> POLLIN channel.
>
> Cookies are passed up as ancillary data (at level SOL_RDS) in a
> struct rds_zcopy_cookies when the returned value of recvmsg() is
> greater than, or equal to, 0. A max of RDS_MAX_ZCOOKIES may be passed
> with each message.
>
> This commit removes support for zerocopy completion notification on
> MSG_ERRQUEUE for PF_RDS sockets.
>
> Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
> ---

> diff --git a/net/rds/message.c b/net/rds/message.c
> index 6518345..2e8bdaf 100644
> --- a/net/rds/message.c
> +++ b/net/rds/message.c
> @@ -58,32 +58,26 @@ void rds_message_addref(struct rds_message *rm)
>
>  static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
>  {
> -       struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
> -       int ncookies;
> -       u32 *ptr;
> +       struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb;

please add a bounds check (not necessarily right here)

  BUILD_BUG_ON(sizeof(*ck) > sizeof(skb->cb));

>  static void rds_rm_zerocopy_callback(struct rds_sock *rs,
>                                      struct rds_znotifier *znotif)
>  {
> -       struct sock *sk = rds_rs_to_sk(rs);
>         struct sk_buff *skb, *tail;
> -       struct sock_exterr_skb *serr;
>         unsigned long flags;
>         struct sk_buff_head *q;
>         u32 cookie = znotif->z_cookie;
> +       struct rds_zcopy_cookies *ck;
>
> -       q = &sk->sk_error_queue;
> +       q = &rs->rs_zcookie_queue;
>         spin_lock_irqsave(&q->lock, flags);
>         tail = skb_peek_tail(q);
>
> @@ -91,22 +85,19 @@ static void rds_rm_zerocopy_callback(struct rds_sock *rs,
>                 spin_unlock_irqrestore(&q->lock, flags);
>                 mm_unaccount_pinned_pages(&znotif->z_mmp);
>                 consume_skb(rds_skb_from_znotifier(znotif));
> -               sk->sk_error_report(sk);
> +               /* caller should wake up POLLIN */

sk->sk_data_ready(sk);

> @@ -362,8 +354,7 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
>                 int total_copied = 0;
>                 struct sk_buff *skb;
>
> -               skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
> -                               GFP_KERNEL);
> +               skb = alloc_skb(0, GFP_KERNEL);

Without the error queue, the struct no longer needs to be an skb,
per se. Converting to a different struct with list_head is definitely
a longer patch. But kmalloc will be cheaper than alloc_skb.
Perhaps something to try (as separate follow-on work).

> +static int rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
> +{
> +       struct sk_buff *skb;
> +       struct sk_buff_head *q = &rs->rs_zcookie_queue;
> +       struct rds_zcopy_cookies *done;
> +       int ret;
> +
> +       if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) || !skb_peek(q))
> +               return 0;

Racy read?

> +
> +       if (!msg->msg_control ||

I'd move this first, so that the cookie queue need not even be probed
in the common case.

> +           msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
> +               return 0;

if caller does not satisfy the contract on controllen size, can be
more explicit and return an error.

> +
> +       skb = skb_dequeue(q);
> +       if (!skb)
> +               return 0;
> +       done = (struct rds_zcopy_cookies *)skb->cb;
> +       ret = done->num;

done->num is guaranteed to be >= 1, so ret is not strictly needed.

> +       if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
> +                    done)) {
> +               skb_queue_head(q, skb);
> +               ret = 0;
> +       } else {
> +               consume_skb(skb);
> +       }
> +       return ret;
> +}
> +
>  int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
>                 int msg_flags)
>  {
> @@ -586,6 +615,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
>         int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
>         DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
>         struct rds_incoming *inc = NULL;
> +       int ncookies;
>
>         /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
>         timeo = sock_rcvtimeo(sk, nonblock);
> @@ -611,7 +641,8 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
>
>                 if (!rds_next_incoming(rs, &inc)) {
>                         if (nonblock) {
> -                               ret = -EAGAIN;
> +                               ncookies = rds_recvmsg_zcookie(rs, msg);
> +                               ret = ncookies ? 0 : -EAGAIN;
>                                 break;
>                         }
>
> @@ -660,6 +691,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
>                         ret = -EFAULT;
>                         goto out;
>                 }
> +               ncookies = rds_recvmsg_zcookie(rs, msg);

return value not used. can make rds_recvmsg_zcookie return a bool for
the non-blocking case.

>
>                 rds_stats_inc(s_recv_delivered);
>
> --
> 1.7.1
Sowmini Varadhan Feb. 25, 2018, 4:20 p.m. UTC | #2
On (02/25/18 10:56), Willem de Bruijn wrote:
> > @@ -91,22 +85,19 @@ static void rds_rm_zerocopy_callback(struct rds_sock *rs,
> >                 spin_unlock_irqrestore(&q->lock, flags);
> >                 mm_unaccount_pinned_pages(&znotif->z_mmp);
> >                 consume_skb(rds_skb_from_znotifier(znotif));
> > -               sk->sk_error_report(sk);
> > +               /* caller should wake up POLLIN */
> 
> sk->sk_data_ready(sk);

yes, this was my first thought, but everything else in rds
is calling rds_wake_sk_sleep (this is even done in
rds_recv_incoming(), which actually queues up the data), 
so I chose to align with that model (and call this in the caller 
of rds_rm_zerocopy_callback())

> Without the error queue, the struct no longer needs to be an skb,
> per se. Converting to a different struct with list_head is definitely
> a longer patch. But kmalloc will be cheaper than alloc_skb.
> Perhaps something to try (as separate follow-on work).

right, I was thinking along these exact lines as well,
and was already planning a follow-up.

> > +       if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) || !skb_peek(q))
> > +               return 0;
> 
> Racy read?

Can you elaborate? I only put the skb_peek to quickly
bail for sockets that are not using zerocopy at all- 
if you race against something that's queuing data, and 
miss it on the peek, the next read/recv should find it.
Am I missing some race?


> 
> > +
> > +       if (!msg->msg_control ||
> 
> I'd move this first, so that the cookie queue need not even be probed
> in the common case.

you mean before the check for SOCK_ZEROCOPY?

> > +           msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
> > +               return 0;
> 
> if caller does not satisfy the contract on controllen size, can be
> more explicit and return an error.

if SOCK_ZEROCOPY has been set, but the recv did not specify a cmsghdr,
you mean?

> > +               ncookies = rds_recvmsg_zcookie(rs, msg);

Will take care of the remaining comments in V3.
Willem de Bruijn Feb. 25, 2018, 5:36 p.m. UTC | #3
On Sun, Feb 25, 2018 at 11:20 AM, Sowmini Varadhan
<sowmini.varadhan@oracle.com> wrote:
> On (02/25/18 10:56), Willem de Bruijn wrote:
>> > @@ -91,22 +85,19 @@ static void rds_rm_zerocopy_callback(struct rds_sock *rs,
>> >                 spin_unlock_irqrestore(&q->lock, flags);
>> >                 mm_unaccount_pinned_pages(&znotif->z_mmp);
>> >                 consume_skb(rds_skb_from_znotifier(znotif));
>> > -               sk->sk_error_report(sk);
>> > +               /* caller should wake up POLLIN */
>>
>> sk->sk_data_ready(sk);
>
> yes, this was my first thought, but everything else in rds
> is calling rds_wake_sk_sleep (this is even done in
> rds_recv_incoming(), which actually queues up the data),
> so I chose to align with that model (and call this in the caller
> of rds_rm_zerocopy_callback()

Ah, understood. Perhaps say "wakes" instead of "should wake".
I mistakenly read this as a todo.

>> Without the error queue, the struct no longer needs to be an skb,
>> per se. Converting to a different struct with list_head is definitely
>> a longer patch. But kmalloc will be cheaper than alloc_skb.
>> Perhaps something to try (as separate follow-on work).
>
> right, I was thinking along these exact lines as well,
> and was already planning a follow-up.
>
>> > +       if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) || !skb_peek(q))
>> > +               return 0;
>>
>> Racy read?
>
> Can you elaborate? I only put the skb_peek to quickly
> bail for sockets that are not using zerocopy at all-
> if you race against something that's queuing data, and
> miss it on the peek, the next read/recv should find it.
> Am I missing some race?

It's a lockless access. But intentionally so, then. You're right, as long as
the subsequent skb_dequeue handles the case where the queue is
empty, it seems okay to optimistically probe lockless first.

>>
>> > +
>> > +       if (!msg->msg_control ||
>>
>> I'd move this first, so that the cookie queue need not even be probed
>> in the common case.
>
> you mean before the check for SOCK_ZEROCOPY?

Yes

>> > +           msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
>> > +               return 0;
>>
>> if caller does not satisfy the contract on controllen size, can be
>> more explicit and return an error.
>
> if SOCK_ZEROCOPY has been set, but the recv did not specify a cmsghdr,
> you mean?

I mean if SOCK_ZEROCOPY has been set and the caller calls recvmsg
with a control buffer, but one that is too small to handle zerocopy cookie
notifications.

>> > +               ncookies = rds_recvmsg_zcookie(rs, msg);
>
> Will take care of the remaining comments in V3.
diff mbox series

Patch

diff --git a/include/uapi/linux/errqueue.h b/include/uapi/linux/errqueue.h
index 28812ed..dc64cfa 100644
--- a/include/uapi/linux/errqueue.h
+++ b/include/uapi/linux/errqueue.h
@@ -20,13 +20,11 @@  struct sock_extended_err {
 #define SO_EE_ORIGIN_ICMP6	3
 #define SO_EE_ORIGIN_TXSTATUS	4
 #define SO_EE_ORIGIN_ZEROCOPY	5
-#define SO_EE_ORIGIN_ZCOOKIE	6
 #define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS
 
 #define SO_EE_OFFENDER(ee)	((struct sockaddr*)((ee)+1))
 
 #define SO_EE_CODE_ZEROCOPY_COPIED	1
-#define	SO_EE_ORIGIN_MAX_ZCOOKIES	8
 
 /**
  *	struct scm_timestamping - timestamps exposed through cmsg
diff --git a/include/uapi/linux/rds.h b/include/uapi/linux/rds.h
index 12e3bca..a66b213 100644
--- a/include/uapi/linux/rds.h
+++ b/include/uapi/linux/rds.h
@@ -104,6 +104,7 @@ 
 #define RDS_CMSG_MASKED_ATOMIC_CSWP	9
 #define RDS_CMSG_RXPATH_LATENCY		11
 #define	RDS_CMSG_ZCOPY_COOKIE		12
+#define	RDS_CMSG_ZCOPY_COMPLETION	13
 
 #define RDS_INFO_FIRST			10000
 #define RDS_INFO_COUNTERS		10000
@@ -317,6 +318,12 @@  struct rds_rdma_notify {
 #define RDS_RDMA_DROPPED	3
 #define RDS_RDMA_OTHER_ERROR	4
 
+#define	RDS_MAX_ZCOOKIES	8
+struct rds_zcopy_cookies {
+	__u32 num;
+	__u32 cookies[RDS_MAX_ZCOOKIES];
+};
+
 /*
  * Common set of flags for all RDMA related structs
  */
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index a937f18..f712610 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -77,6 +77,7 @@  static int rds_release(struct socket *sock)
 	rds_send_drop_to(rs, NULL);
 	rds_rdma_drop_keys(rs);
 	rds_notify_queue_get(rs, NULL);
+	__skb_queue_purge(&rs->rs_zcookie_queue);
 
 	spin_lock_bh(&rds_sock_lock);
 	list_del_init(&rs->rs_item);
@@ -144,7 +145,7 @@  static int rds_getname(struct socket *sock, struct sockaddr *uaddr,
  *  -	to signal that a previously congested destination may have become
  *	uncongested
  *  -	A notification has been queued to the socket (this can be a congestion
- *	update, or a RDMA completion).
+ *	update, or a RDMA completion, or a MSG_ZEROCOPY completion).
  *
  * EPOLLOUT is asserted if there is room on the send queue. This does not mean
  * however, that the next sendmsg() call will succeed. If the application tries
@@ -178,7 +179,8 @@  static __poll_t rds_poll(struct file *file, struct socket *sock,
 		spin_unlock(&rs->rs_lock);
 	}
 	if (!list_empty(&rs->rs_recv_queue) ||
-	    !list_empty(&rs->rs_notify_queue))
+	    !list_empty(&rs->rs_notify_queue) ||
+	    !skb_queue_empty(&rs->rs_zcookie_queue))
 		mask |= (EPOLLIN | EPOLLRDNORM);
 	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
 		mask |= (EPOLLOUT | EPOLLWRNORM);
@@ -513,6 +515,7 @@  static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
 	INIT_LIST_HEAD(&rs->rs_recv_queue);
 	INIT_LIST_HEAD(&rs->rs_notify_queue);
 	INIT_LIST_HEAD(&rs->rs_cong_list);
+	skb_queue_head_init(&rs->rs_zcookie_queue);
 	spin_lock_init(&rs->rs_rdma_lock);
 	rs->rs_rdma_keys = RB_ROOT;
 	rs->rs_rx_traces = 0;
diff --git a/net/rds/message.c b/net/rds/message.c
index 6518345..2e8bdaf 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -58,32 +58,26 @@  void rds_message_addref(struct rds_message *rm)
 
 static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
 {
-	struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
-	int ncookies;
-	u32 *ptr;
+	struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb;
+	int ncookies = ck->num;
 
-	if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE)
+	if (ncookies == RDS_MAX_ZCOOKIES)
 		return false;
-	ncookies = serr->ee.ee_data;
-	if (ncookies == SO_EE_ORIGIN_MAX_ZCOOKIES)
-		return false;
-	ptr = skb_put(skb, sizeof(u32));
-	*ptr = cookie;
-	serr->ee.ee_data = ++ncookies;
+	ck->cookies[ncookies] = cookie;
+	ck->num =  ++ncookies;
 	return true;
 }
 
 static void rds_rm_zerocopy_callback(struct rds_sock *rs,
 				     struct rds_znotifier *znotif)
 {
-	struct sock *sk = rds_rs_to_sk(rs);
 	struct sk_buff *skb, *tail;
-	struct sock_exterr_skb *serr;
 	unsigned long flags;
 	struct sk_buff_head *q;
 	u32 cookie = znotif->z_cookie;
+	struct rds_zcopy_cookies *ck;
 
-	q = &sk->sk_error_queue;
+	q = &rs->rs_zcookie_queue;
 	spin_lock_irqsave(&q->lock, flags);
 	tail = skb_peek_tail(q);
 
@@ -91,22 +85,19 @@  static void rds_rm_zerocopy_callback(struct rds_sock *rs,
 		spin_unlock_irqrestore(&q->lock, flags);
 		mm_unaccount_pinned_pages(&znotif->z_mmp);
 		consume_skb(rds_skb_from_znotifier(znotif));
-		sk->sk_error_report(sk);
+		/* caller should wake up POLLIN */
 		return;
 	}
 
 	skb = rds_skb_from_znotifier(znotif);
-	serr = SKB_EXT_ERR(skb);
-	memset(&serr->ee, 0, sizeof(serr->ee));
-	serr->ee.ee_errno = 0;
-	serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
-	serr->ee.ee_info = 0;
+	ck = (struct rds_zcopy_cookies *)skb->cb;
+	memset(ck, 0, sizeof(*ck));
 	WARN_ON(!skb_zcookie_add(skb, cookie));
 
 	__skb_queue_tail(q, skb);
 
 	spin_unlock_irqrestore(&q->lock, flags);
-	sk->sk_error_report(sk);
+	/* caller should wake up POLLIN */
 
 	mm_unaccount_pinned_pages(&znotif->z_mmp);
 }
@@ -129,6 +120,7 @@  static void rds_message_purge(struct rds_message *rm)
 		if (rm->data.op_mmp_znotifier) {
 			zcopy = true;
 			rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
+			rds_wake_sk_sleep(rs);
 			rm->data.op_mmp_znotifier = NULL;
 		}
 		sock_put(rds_rs_to_sk(rs));
@@ -362,8 +354,7 @@  int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
 		int total_copied = 0;
 		struct sk_buff *skb;
 
-		skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
-				GFP_KERNEL);
+		skb = alloc_skb(0, GFP_KERNEL);
 		if (!skb)
 			return -ENOMEM;
 		rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 31cd388..33b1635 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -603,6 +603,8 @@  struct rds_sock {
 	/* Socket receive path trace points*/
 	u8			rs_rx_traces;
 	u8			rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
+
+	struct sk_buff_head	rs_zcookie_queue;
 };
 
 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
diff --git a/net/rds/recv.c b/net/rds/recv.c
index b080961..9f1bf55 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -577,6 +577,35 @@  static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
 	return ret;
 }
 
+static int rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
+{
+	struct sk_buff *skb;
+	struct sk_buff_head *q = &rs->rs_zcookie_queue;
+	struct rds_zcopy_cookies *done;
+	int ret;
+
+	if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) || !skb_peek(q))
+		return 0;
+
+	if (!msg->msg_control ||
+	    msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
+		return 0;
+
+	skb = skb_dequeue(q);
+	if (!skb)
+		return 0;
+	done = (struct rds_zcopy_cookies *)skb->cb;
+	ret = done->num;
+	if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
+		     done)) {
+		skb_queue_head(q, skb);
+		ret = 0;
+	} else {
+		consume_skb(skb);
+	}
+	return ret;
+}
+
 int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
 		int msg_flags)
 {
@@ -586,6 +615,7 @@  int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
 	int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
 	DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
 	struct rds_incoming *inc = NULL;
+	int ncookies;
 
 	/* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
 	timeo = sock_rcvtimeo(sk, nonblock);
@@ -611,7 +641,8 @@  int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
 
 		if (!rds_next_incoming(rs, &inc)) {
 			if (nonblock) {
-				ret = -EAGAIN;
+				ncookies = rds_recvmsg_zcookie(rs, msg);
+				ret = ncookies ? 0 : -EAGAIN;
 				break;
 			}
 
@@ -660,6 +691,7 @@  int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
 			ret = -EFAULT;
 			goto out;
 		}
+		ncookies = rds_recvmsg_zcookie(rs, msg);
 
 		rds_stats_inc(s_recv_delivered);