diff mbox series

[net,2/2] rxrpc: Fix the data_ready handler

Message ID 153874699086.18195.6819270943388841145.stgit@warthog.procyon.org.uk
State Changes Requested, archived
Delegated to: David Miller
Headers show
Series rxrpc: Fixes | expand

Commit Message

David Howells Oct. 5, 2018, 1:43 p.m. UTC
Fix the rxrpc_data_ready() function to pick up all packets and to not miss
any.  There are two problems:

 (1) The sk_data_ready pointer on the UDP socket is set *after* it is
     bound.  This means that it's open for business before we're ready to
     dequeue packets and there's a tiny window exists in which a packet can
     sneak onto the receive queue, but we never know about it.

     Fix this by setting the pointers on the socket prior to binding it.

 (2) skb_recv_udp() will return an error (such as ENETUNREACH) if there was
     an error on the transmission side, even though we set the
     sk_error_report hook.  Because rxrpc_data_ready() returns immediately
     in such a case, it never actually removes its packet from the receive
     queue.

     Fix this by abstracting out the UDP dequeuing and checksumming into a
     separate function that keeps hammering on skb_recv_udp() until it
     returns -EAGAIN, passing the packets extracted to the remainder of the
     function.

and two potential problems:

 (3) It might be possible in some circumstances or in the future for
     packets to be being added to the UDP receive queue whilst rxrpc is
     running consuming them, so the data_ready() handler might get called
     less often than once per packet.

     Allow for this by fully draining the queue on each call as (2).

 (4) If a packet fails the checksum check, the code currently returns after
     discarding the packet without checking for more.

     Allow for this by fully draining the queue on each call as (2).

Fixes: 17926a79320a ("[AF_RXRPC]: Provide secure RxRPC sockets for use by userspace and kernel both")
Signed-off-by: David Howells <dhowells@redhat.com>
Acked-by: Paolo Abeni <pabeni@redhat.com>
---

 net/rxrpc/input.c        |   68 ++++++++++++++++++++++++++--------------------
 net/rxrpc/local_object.c |   11 ++++---
 2 files changed, 44 insertions(+), 35 deletions(-)

Comments

Eric Dumazet Oct. 5, 2018, 1:52 p.m. UTC | #1
On 10/05/2018 06:43 AM, David Howells wrote:
> Fix the rxrpc_data_ready() function to pick up all packets and to not miss
> any.  There are two problems:
> 


> +	for (;;) {
> +		skb = skb_recv_udp(udp_sk, 0, 1, &ret);
> +		if (!skb) {
> +			if (ret == -EAGAIN)
> +				return;
> +
> +			/* If there was a transmission failure, we get an error
> +			 * here that we need to ignore.
> +			 */
> +			_debug("UDP socket error %d", ret);
> +			continue;
> +		}
> +
> +		rxrpc_new_skb(skb, rxrpc_skb_rx_received);
> +
> +		/* we'll probably need to checksum it (didn't call sock_recvmsg) */
> +		if (skb_checksum_complete(skb)) {
> +			rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
> +			__UDP_INC_STATS(sock_net(udp_sk), UDP_MIB_INERRORS, 0);
> +			_debug("csum failed");
> +			continue;
> +		}
> +
> +		__UDP_INC_STATS(sock_net(udp_sk), UDP_MIB_INDATAGRAMS, 0);
> +
> +		rxrpc_input_packet(udp_sk, skb);
> +	}
> +}


This looks a potential infinite loop to me ?

If not, please add a comment explaining why there is no apparent limit.
David Howells Oct. 5, 2018, 2:18 p.m. UTC | #2
Eric Dumazet <eric.dumazet@gmail.com> wrote:

> This looks a potential infinite loop to me ?

I assume that you're talking about the case where the packets are coming in so
fast that rxrpc is processing them as fast as they're coming in - or failing
to keep up.  I'm not sure what's the best thing to do in that case.

If you're not talking about that case, shouldn't skb_recv_udp() eventually
empty the queue and return -EAGAIN, thereby ending the loop?

I have another patch (see attached) which I was going to submit to net-next,
but maybe I need to merge into this one, in which I take the packets from the
encap_rcv hook.

Paolo has a couple of technical points on it that I need to look at dealing
with, but it speeds things up a bit and gets rid of the loop entirely.

David
---
commit 3784732a15631345c6d33b58985a9c2f30a2047f
Author: David Howells <dhowells@redhat.com>
Date:   Thu Oct 4 11:10:51 2018 +0100

    rxrpc: Use the UDP encap_rcv hook
    
    Use the UDP encap_rcv hook to cut the bit out of the rxrpc packet reception
    in which a packet is placed onto the UDP receive queue and then immediately
    removed again by rxrpc.  Going via the queue in this manner seems like it
    should be unnecessary.
    
    This does, however, require the invention of a value to place in encap_type
    as that's one of the conditions to switch packets out to the encap_rcv
    hook.  Possibly the value doesn't actually matter for anything other than
    sockopts on the UDP socket, which aren't accessible outside of rxrpc
    anyway.
    
    This seems to cut a bit of time out of the time elapsed between each
    sk_buff being timestamped and turning up in rxrpc (the final number in the
    following trace excerpts).  I measured this by making the rxrpc_rx_packet
    trace point print the time elapsed between the skb being timestamped and
    the current time (in ns), e.g.:
    
            ... 424.278721: rxrpc_rx_packet: ...  ACK 25026
    
    So doing a 512MiB DIO read from my test server, with an unmodified kernel:
    
            N       min     max     sum             mean    stddev
            27605   2626    7581    7.83992e+07     2840.04 181.029
    
    and with the patch applied:
    
            N       min     max     sum             mean    stddev
            27547   1895    12165   6.77461e+07     2459.29 255.02
    
    Signed-off-by: David Howells <dhowells@redhat.com>

diff --git a/include/uapi/linux/udp.h b/include/uapi/linux/udp.h
index 09d00f8c442b..09502de447f5 100644
--- a/include/uapi/linux/udp.h
+++ b/include/uapi/linux/udp.h
@@ -40,5 +40,6 @@ struct udphdr {
 #define UDP_ENCAP_L2TPINUDP	3 /* rfc2661 */
 #define UDP_ENCAP_GTP0		4 /* GSM TS 09.60 */
 #define UDP_ENCAP_GTP1U		5 /* 3GPP TS 29.060 */
+#define UDP_ENCAP_RXRPC		6
 
 #endif /* _UAPI_LINUX_UDP_H */
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index b79d0413b5ed..b94f3bdc66ac 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -966,7 +966,7 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
 /*
  * input.c
  */
-void rxrpc_data_ready(struct sock *);
+int rxrpc_input_packet(struct sock *, struct sk_buff *);
 
 /*
  * insecure.c
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index ab90e60a5237..c0af023b77f4 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -1121,7 +1121,7 @@ int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
  * shut down and the local endpoint from going away, thus sk_user_data will not
  * be cleared until this function returns.
  */
-void rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
+int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 {
 	struct rxrpc_connection *conn;
 	struct rxrpc_channel *chan;
@@ -1136,6 +1136,24 @@ void rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 
 	_enter("%p", udp_sk);
 
+	if (skb->tstamp == 0)
+		skb->tstamp = ktime_get_real();
+
+	rxrpc_new_skb(skb, rxrpc_skb_rx_received);
+
+	_net("recv skb %p", skb);
+
+	if (sk_filter_trim_cap(udp_sk, skb, sizeof(struct udphdr))) {
+		__UDP_INC_STATS(sock_net(udp_sk), UDP_MIB_INERRORS, IS_UDPLITE(udp_sk));
+		atomic_inc(&udp_sk->sk_drops);
+		rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+		_leave(" [filtered]");
+		return 0;
+	}
+
+	udp_csum_pull_header(skb);
+	skb_orphan(skb);
+
 	/* The UDP protocol already released all skb resources;
 	 * we are free to add our own data there.
 	 */
@@ -1150,7 +1168,7 @@ void rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 		if ((lose++ & 7) == 7) {
 			trace_rxrpc_rx_lose(sp);
 			rxrpc_free_skb(skb, rxrpc_skb_rx_lost);
-			return;
+			return 0;
 		}
 	}
 
@@ -1334,7 +1352,7 @@ void rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 	rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 out:
 	trace_rxrpc_rx_done(0, 0);
-	return;
+	return 0;
 
 out_unlock:
 	rcu_read_unlock();
@@ -1373,38 +1391,5 @@ void rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 	trace_rxrpc_rx_done(skb->mark, skb->priority);
 	rxrpc_reject_packet(local, skb);
 	_leave(" [badmsg]");
-}
-
-void rxrpc_data_ready(struct sock *udp_sk)
-{
-	struct sk_buff *skb;
-	int ret;
-
-	for (;;) {
-		skb = skb_recv_udp(udp_sk, 0, 1, &ret);
-		if (!skb) {
-			if (ret == -EAGAIN)
-				return;
-
-			/* If there was a transmission failure, we get an error
-			 * here that we need to ignore.
-			 */
-			_debug("UDP socket error %d", ret);
-			continue;
-		}
-
-		rxrpc_new_skb(skb, rxrpc_skb_rx_received);
-
-		/* we'll probably need to checksum it (didn't call sock_recvmsg) */
-		if (skb_checksum_complete(skb)) {
-			rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
-			__UDP_INC_STATS(sock_net(udp_sk), UDP_MIB_INERRORS, 0);
-			_debug("csum failed");
-			continue;
-		}
-
-		__UDP_INC_STATS(sock_net(udp_sk), UDP_MIB_INDATAGRAMS, 0);
-
-		rxrpc_input_packet(udp_sk, skb);
-	}
+	return 0;
 }
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 30862f44c9f1..cad0691c2bb4 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -19,6 +19,7 @@
 #include <linux/ip.h>
 #include <linux/hashtable.h>
 #include <net/sock.h>
+#include <net/udp.h>
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
 
@@ -108,7 +109,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
  */
 static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
 {
-	struct sock *sock;
+	struct sock *usk;
 	int ret, opt;
 
 	_enter("%p{%d,%d}",
@@ -123,10 +124,26 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
 	}
 
 	/* set the socket up */
-	sock = local->socket->sk;
-	sock->sk_user_data	= local;
-	sock->sk_data_ready	= rxrpc_data_ready;
-	sock->sk_error_report	= rxrpc_error_report;
+	usk = local->socket->sk;
+	inet_sk(usk)->mc_loop = 0;
+
+	/* Enable CHECKSUM_UNNECESSARY to CHECKSUM_COMPLETE conversion */
+	inet_inc_convert_csum(usk);
+
+	rcu_assign_sk_user_data(usk, local);
+
+	udp_sk(usk)->encap_type = UDP_ENCAP_RXRPC;
+	udp_sk(usk)->encap_rcv = rxrpc_input_packet;
+	udp_sk(usk)->encap_destroy = NULL;
+	udp_sk(usk)->gro_receive = NULL;
+	udp_sk(usk)->gro_complete = NULL;
+
+	udp_encap_enable();
+#if IS_ENABLED(CONFIG_IPV6)
+	if (local->srx.transport.family == AF_INET6)
+		udpv6_encap_enable();
+#endif
+	usk->sk_error_report = rxrpc_error_report;
 
 	/* if a local address was supplied then bind it */
 	if (local->srx.transport_len > sizeof(sa_family_t)) {
Eric Dumazet Oct. 5, 2018, 4:07 p.m. UTC | #3
On 10/05/2018 07:18 AM, David Howells wrote:
> Eric Dumazet <eric.dumazet@gmail.com> wrote:
> 
>> This looks a potential infinite loop to me ?
> 
> I assume that you're talking about the case where the packets are coming in so
> fast that rxrpc is processing them as fast as they're coming in - or failing
> to keep up.  I'm not sure what's the best thing to do in that case.

Well, this is exactly why we do not write such loops :/

sk_data_ready is not meant to process packets, it is meant to signal
to another entity (preferably running in process context and thus with proper
schedule points, and not blocking BH) that there is data ready to be consumed.

Under DOS, it is possible multiple cpus will sk_data_ready in parallel.
David Howells Oct. 5, 2018, 4:33 p.m. UTC | #4
Eric Dumazet <eric.dumazet@gmail.com> wrote:

> sk_data_ready is not meant to process packets, it is meant to signal
> to another entity (preferably running in process context and thus with proper
> schedule points, and not blocking BH) that there is data ready to be consumed.

The issue is that I need to route the packets to the appropriate call, and the
BH appears to be the right place to do this, especially as I can quickly parse
and discard certain types of packet right there.

If I move all of this to process context then that adds extra context switches
between the routing process and the destination process.

> Under DOS, it is possible multiple cpus will sk_data_ready in parallel.

Ummm...  I've been led to believe that sk_data_ready will *not* be called in
parallel and that the code it calls can assume non-reentrancy.  Is this not
the case?

What about the patch I attached, whereby I use the encap_rcv() hook.  Do you
say that won't work?

David
Eric Dumazet Oct. 5, 2018, 5:44 p.m. UTC | #5
On 10/05/2018 09:33 AM, David Howells wrote:
> Eric Dumazet <eric.dumazet@gmail.com> wrote:
> 
>> sk_data_ready is not meant to process packets, it is meant to signal
>> to another entity (preferably running in process context and thus with proper
>> schedule points, and not blocking BH) that there is data ready to be consumed.
> 
> The issue is that I need to route the packets to the appropriate call, and the
> BH appears to be the right place to do this, especially as I can quickly parse
> and discard certain types of packet right there.
> 
> If I move all of this to process context then that adds extra context switches
> between the routing process and the destination process.
> 
>> Under DOS, it is possible multiple cpus will sk_data_ready in parallel.
> 
> Ummm...  I've been led to believe that sk_data_ready will *not* be called in
> parallel and that the code it calls can assume non-reentrancy.  Is this not
> the case.

Certainly not for UDP.

TCP input processing uses the socket lock, but UDP is fully parallel.

> 
> What about the patch I attached, whereby I use the encap_rcv() hook.  Do you
> say that won't work?

I am kind of busy today, will look at it next week.
diff mbox series

Patch

diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index c5af9955665b..c3114fa66c92 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -1121,7 +1121,7 @@  int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
  * shut down and the local endpoint from going away, thus sk_user_data will not
  * be cleared until this function returns.
  */
-void rxrpc_data_ready(struct sock *udp_sk)
+void rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 {
 	struct rxrpc_connection *conn;
 	struct rxrpc_channel *chan;
@@ -1130,39 +1130,11 @@  void rxrpc_data_ready(struct sock *udp_sk)
 	struct rxrpc_local *local = udp_sk->sk_user_data;
 	struct rxrpc_peer *peer = NULL;
 	struct rxrpc_sock *rx = NULL;
-	struct sk_buff *skb;
 	unsigned int channel;
-	int ret, skew = 0;
+	int skew = 0;
 
 	_enter("%p", udp_sk);
 
-	ASSERT(!irqs_disabled());
-
-	skb = skb_recv_udp(udp_sk, 0, 1, &ret);
-	if (!skb) {
-		if (ret == -EAGAIN)
-			return;
-		_debug("UDP socket error %d", ret);
-		return;
-	}
-
-	if (skb->tstamp == 0)
-		skb->tstamp = ktime_get_real();
-
-	rxrpc_new_skb(skb, rxrpc_skb_rx_received);
-
-	_net("recv skb %p", skb);
-
-	/* we'll probably need to checksum it (didn't call sock_recvmsg) */
-	if (skb_checksum_complete(skb)) {
-		rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
-		__UDP_INC_STATS(sock_net(udp_sk), UDP_MIB_INERRORS, 0);
-		_leave(" [CSUM failed]");
-		return;
-	}
-
-	__UDP_INC_STATS(sock_net(udp_sk), UDP_MIB_INDATAGRAMS, 0);
-
 	/* The UDP protocol already released all skb resources;
 	 * we are free to add our own data there.
 	 */
@@ -1181,6 +1153,8 @@  void rxrpc_data_ready(struct sock *udp_sk)
 		}
 	}
 
+	if (skb->tstamp == 0)
+		skb->tstamp = ktime_get_real();
 	trace_rxrpc_rx_packet(sp);
 
 	switch (sp->hdr.type) {
@@ -1398,3 +1372,37 @@  void rxrpc_data_ready(struct sock *udp_sk)
 	rxrpc_reject_packet(local, skb);
 	_leave(" [badmsg]");
 }
+
+void rxrpc_data_ready(struct sock *udp_sk)
+{
+	struct sk_buff *skb;
+	int ret;
+
+	for (;;) {
+		skb = skb_recv_udp(udp_sk, 0, 1, &ret);
+		if (!skb) {
+			if (ret == -EAGAIN)
+				return;
+
+			/* If there was a transmission failure, we get an error
+			 * here that we need to ignore.
+			 */
+			_debug("UDP socket error %d", ret);
+			continue;
+		}
+
+		rxrpc_new_skb(skb, rxrpc_skb_rx_received);
+
+		/* we'll probably need to checksum it (didn't call sock_recvmsg) */
+		if (skb_checksum_complete(skb)) {
+			rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+			__UDP_INC_STATS(sock_net(udp_sk), UDP_MIB_INERRORS, 0);
+			_debug("csum failed");
+			continue;
+		}
+
+		__UDP_INC_STATS(sock_net(udp_sk), UDP_MIB_INDATAGRAMS, 0);
+
+		rxrpc_input_packet(udp_sk, skb);
+	}
+}
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 94d234e9c685..30862f44c9f1 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -122,6 +122,12 @@  static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
 		return ret;
 	}
 
+	/* set the socket up */
+	sock = local->socket->sk;
+	sock->sk_user_data	= local;
+	sock->sk_data_ready	= rxrpc_data_ready;
+	sock->sk_error_report	= rxrpc_error_report;
+
 	/* if a local address was supplied then bind it */
 	if (local->srx.transport_len > sizeof(sa_family_t)) {
 		_debug("bind");
@@ -191,11 +197,6 @@  static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
 		BUG();
 	}
 
-	/* set the socket up */
-	sock = local->socket->sk;
-	sock->sk_user_data	= local;
-	sock->sk_data_ready	= rxrpc_data_ready;
-	sock->sk_error_report	= rxrpc_error_report;
 	_leave(" = 0");
 	return 0;