[net-next,12/15] mptcp: recvmsg() can drain data from multiple subflows
diff mbox series

Message ID 20191214060417.2870-13-mathew.j.martineau@linux.intel.com
State Changes Requested
Delegated to: David Miller
Headers show
Series
  • Multipath TCP part 2: Single subflow
Related show

Commit Message

Mat Martineau Dec. 14, 2019, 6:04 a.m. UTC
From: Paolo Abeni <pabeni@redhat.com>

With the previous patch in place, the msk can detect which subflow
has the current map with a simple walk, let's update the main
loop to always select the 'current' subflow. The exit conditions now
closely mirror tcp_recvmsg() to get expected timeout and signal
behavior.

Co-developed-by: Peter Krystad <peter.krystad@linux.intel.com>
Signed-off-by: Peter Krystad <peter.krystad@linux.intel.com>
Co-developed-by: Davide Caratti <dcaratti@redhat.com>
Signed-off-by: Davide Caratti <dcaratti@redhat.com>
Co-developed-by: Matthieu Baerts <matthieu.baerts@tessares.net>
Signed-off-by: Matthieu Baerts <matthieu.baerts@tessares.net>
Co-developed-by: Mat Martineau <mathew.j.martineau@linux.intel.com>
Signed-off-by: Mat Martineau <mathew.j.martineau@linux.intel.com>
Co-developed-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Signed-off-by: Mat Martineau <mathew.j.martineau@linux.intel.com>
---
 net/mptcp/protocol.c | 178 ++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 168 insertions(+), 10 deletions(-)

Patch
diff mbox series

diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 44e6f0d1070c..fd89075bd1c9 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -9,6 +9,8 @@ 
 #include <linux/kernel.h>
 #include <linux/module.h>
 #include <linux/netdevice.h>
+#include <linux/sched/signal.h>
+#include <linux/atomic.h>
 #include <net/sock.h>
 #include <net/inet_common.h>
 #include <net/inet_hashtables.h>
@@ -106,6 +108,21 @@  static bool mptcp_ext_cache_refill(struct mptcp_sock *msk)
 	return !!msk->cached_ext;
 }
 
+static struct sock *mptcp_subflow_recv_lookup(const struct mptcp_sock *msk)
+{
+	struct sock *sk = (struct sock *)msk;
+	struct mptcp_subflow_context *subflow;
+
+	sock_owned_by_me(sk);
+
+	mptcp_for_each_subflow(msk, subflow) {
+		if (subflow->data_avail)
+			return mptcp_subflow_tcp_sock(subflow);
+	}
+
+	return NULL;
+}
+
 static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 			      struct msghdr *msg, long *timeo)
 {
@@ -271,13 +288,37 @@  int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb,
 	return copy_len;
 }
 
+static void mptcp_wait_data(struct sock *sk, long *timeo)
+{
+	DEFINE_WAIT_FUNC(wait, woken_wake_function);
+	struct mptcp_sock *msk = mptcp_sk(sk);
+
+	add_wait_queue(sk_sleep(sk), &wait);
+	sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+
+	sk_wait_event(sk, timeo,
+		      test_and_clear_bit(MPTCP_DATA_READY, &msk->flags), &wait);
+
+	sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+	remove_wait_queue(sk_sleep(sk), &wait);
+}
+
 static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 			 int nonblock, int flags, int *addr_len)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
+	struct mptcp_subflow_context *subflow;
+	bool more_data_avail = false;
+	struct mptcp_read_arg arg;
+	read_descriptor_t desc;
+	bool wait_data = false;
 	struct socket *ssock;
+	struct tcp_sock *tp;
+	bool done = false;
 	struct sock *ssk;
 	int copied = 0;
+	int target;
+	long timeo;
 
 	lock_sock(sk);
 	ssock = __mptcp_tcp_fallback(msk);
@@ -289,16 +330,124 @@  static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 		return copied;
 	}
 
-	ssk = mptcp_subflow_get(msk);
-	if (!ssk) {
-		release_sock(sk);
-		return -ENOTCONN;
+	arg.msg = msg;
+	desc.arg.data = &arg;
+	desc.error = 0;
+
+	timeo = sock_rcvtimeo(sk, nonblock);
+
+	len = min_t(size_t, len, INT_MAX);
+	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
+
+	while (!done) {
+		u32 map_remaining;
+		int bytes_read;
+
+		ssk = mptcp_subflow_recv_lookup(msk);
+		pr_debug("msk=%p ssk=%p", msk, ssk);
+		if (!ssk)
+			goto wait_for_data;
+
+		subflow = mptcp_subflow_ctx(ssk);
+		tp = tcp_sk(ssk);
+
+		lock_sock(ssk);
+		do {
+			/* try to read as much data as available */
+			map_remaining = subflow->map_data_len -
+					mptcp_subflow_get_map_offset(subflow);
+			desc.count = min_t(size_t, len - copied, map_remaining);
+			pr_debug("reading %zu bytes, copied %d", desc.count,
+				 copied);
+			bytes_read = tcp_read_sock(ssk, &desc,
+						   mptcp_read_actor);
+			if (bytes_read < 0) {
+				if (!copied)
+					copied = bytes_read;
+				done = true;
+				goto next;
+			}
+
+			pr_debug("msk ack_seq=%llx -> %llx", msk->ack_seq,
+				 msk->ack_seq + bytes_read);
+			msk->ack_seq += bytes_read;
+			copied += bytes_read;
+			if (copied >= len) {
+				done = true;
+				goto next;
+			}
+			if (tp->urg_data && tp->urg_seq == tp->copied_seq) {
+				pr_err("Urgent data present, cannot proceed");
+				done = true;
+				goto next;
+			}
+next:
+			more_data_avail = mptcp_subflow_data_available(ssk);
+		} while (more_data_avail && !done);
+		release_sock(ssk);
+		continue;
+
+wait_for_data:
+		more_data_avail = false;
+
+		/* only the master socket status is relevant here. The exit
+		 * conditions mirror closely tcp_recvmsg()
+		 */
+		if (copied >= target)
+			break;
+
+		if (copied) {
+			if (sk->sk_err ||
+			    sk->sk_state == TCP_CLOSE ||
+			    (sk->sk_shutdown & RCV_SHUTDOWN) ||
+			    !timeo ||
+			    signal_pending(current))
+				break;
+		} else {
+			if (sk->sk_err) {
+				copied = sock_error(sk);
+				break;
+			}
+
+			if (sk->sk_shutdown & RCV_SHUTDOWN)
+				break;
+
+			if (sk->sk_state == TCP_CLOSE) {
+				copied = -ENOTCONN;
+				break;
+			}
+
+			if (!timeo) {
+				copied = -EAGAIN;
+				break;
+			}
+
+			if (signal_pending(current)) {
+				copied = sock_intr_errno(timeo);
+				break;
+			}
+		}
+
+		pr_debug("block timeout %ld", timeo);
+		wait_data = true;
+		mptcp_wait_data(sk, &timeo);
 	}
 
-	copied = sock_recvmsg(ssk->sk_socket, msg, flags);
+	if (more_data_avail) {
+		if (!test_bit(MPTCP_DATA_READY, &msk->flags))
+			set_bit(MPTCP_DATA_READY, &msk->flags);
+	} else if (!wait_data) {
+		clear_bit(MPTCP_DATA_READY, &msk->flags);
 
-	release_sock(sk);
+		/* .. race-breaker: ssk might get new data after last
+		 * data_available() returns false.
+		 */
+		ssk = mptcp_subflow_recv_lookup(msk);
+		if (unlikely(ssk))
+			set_bit(MPTCP_DATA_READY, &msk->flags);
+	}
 
+	release_sock(sk);
 	return copied;
 }
 
@@ -459,10 +608,6 @@  static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
 		msk->write_seq = subflow->idsn + 1;
 		ack_seq++;
 		msk->ack_seq = ack_seq;
-		subflow->map_seq = ack_seq;
-		subflow->map_subflow_seq = 1;
-		subflow->rel_write_seq = 1;
-		subflow->tcp_sock = ssk;
 		newsk = new_mptcp_sock;
 		mptcp_copy_inaddrs(newsk, ssk);
 		list_add(&subflow->node, &msk->conn_list);
@@ -474,6 +619,19 @@  static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
 		bh_unlock_sock(new_mptcp_sock);
 		local_bh_enable();
 		release_sock(sk);
+
+		/* the subflow can already receive packet, avoid racing with
+		 * the receive path and process the pending ones
+		 */
+		lock_sock(ssk);
+		subflow->map_seq = ack_seq;
+		subflow->map_subflow_seq = 1;
+		subflow->rel_write_seq = 1;
+		subflow->tcp_sock = ssk;
+		subflow->conn = new_mptcp_sock;
+		if (unlikely(!skb_queue_empty(&ssk->sk_receive_queue)))
+			mptcp_subflow_data_available(ssk);
+		release_sock(ssk);
 	} else {
 		tcp_sk(newsk)->is_mptcp = 0;
 	}