
[mptcp-next] mptcp: keep track of receiver's advertised window

Message ID 20200823011559.28689-1-fw@strlen.de
State Superseded, archived
Delegated to: Florian Westphal
Series: [mptcp-next] mptcp: keep track of receiver's advertised window

Commit Message

Florian Westphal Aug. 23, 2020, 1:15 a.m. UTC
Before sending 'x' new bytes, also check that the new snd_una would
be within the permitted receive window.

Track the largest rcv_win seen on a subflow in the mptcp socket itself.
It's updated for every ACK that also contains a DSS ack.

When the window is exhausted, behave as if all subflow sockets are busy,
i.e. re-use the existing wait logic.

Signed-off-by: Florian Westphal <fw@strlen.de>
---
 net/mptcp/options.c  | 16 +++++++++++++---
 net/mptcp/protocol.c | 33 ++++++++++++++++++++++++++++++---
 net/mptcp/protocol.h |  1 +
 3 files changed, 44 insertions(+), 6 deletions(-)
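
In short, a condensed sketch of the check this patch adds on the
transmit path (mptcp_wnd_end() and before64() are taken from the diff
below; the surrounding code is trimmed for illustration, not a drop-in
implementation):

	/* End of the receiver's advertised window, as tracked in the msk:
	 * everything before snd_una + snd_wnd may still be sent.
	 */
	static u64 mptcp_wnd_end(const struct mptcp_sock *msk)
	{
		return atomic64_read(&msk->snd_una) + atomic_read(&msk->snd_wnd);
	}

	/* in mptcp_sendmsg_frag(), before carving a new fragment: */
	u64 window_end = mptcp_wnd_end(msk);

	if (!before64(write_seq + avail_size, window_end)) {
		int allowed_size = window_end - write_seq;

		if (allowed_size <= 0)
			return -EAGAIN;	/* window full: re-use the wait logic */

		avail_size = allowed_size;	/* clamp to the remaining window */
	}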

Comments

Mat Martineau Aug. 26, 2020, 12:10 a.m. UTC | #1
On Sun, 23 Aug 2020, Florian Westphal wrote:

> Before sending 'x' new bytes, also check that the new snd_una would
> be within the permitted receive window.
>
> Track the largest rcv_win seen on a subflow in the mptcp socket itself.
> It's updated for every ACK that also contains a DSS ack.
>
> When the window is exhausted, behave as if all subflow sockets are busy,
> i.e. re-use the existing wait logic.
>
> Signed-off-by: Florian Westphal <fw@strlen.de>
> ---
> net/mptcp/options.c  | 16 +++++++++++++---
> net/mptcp/protocol.c | 33 ++++++++++++++++++++++++++++++---
> net/mptcp/protocol.h |  1 +
> 3 files changed, 44 insertions(+), 6 deletions(-)
>
> diff --git a/net/mptcp/options.c b/net/mptcp/options.c
> index a52a05effac9..9ff2e85b398a 100644
> --- a/net/mptcp/options.c
> +++ b/net/mptcp/options.c
> @@ -784,11 +784,14 @@ static u64 expand_ack(u64 old_ack, u64 cur_ack, bool use_64bit)
> 	return cur_ack;
> }
>
> -static void update_una(struct mptcp_sock *msk,
> -		       struct mptcp_options_received *mp_opt)
> +static void ack_update_msk(struct mptcp_sock *msk,
> +			   const struct sock *ssk,
> +			   struct mptcp_options_received *mp_opt)
> {
> 	u64 new_snd_una, snd_una, old_snd_una = atomic64_read(&msk->snd_una);
> +	u32 sk_snd_wnd, old_sk_snd_wnd = atomic_read(&msk->snd_wnd);
> 	u64 write_seq = READ_ONCE(msk->write_seq);
> +	u32 ssk_snd_wnd = tcp_sk(ssk)->snd_wnd;
>
> 	/* avoid ack expansion on update conflict, to reduce the risk of
> 	 * wrongly expanding to a future ack sequence number, which is way
> @@ -800,6 +803,13 @@ static void update_una(struct mptcp_sock *msk,
> 	if (after64(new_snd_una, write_seq))
> 		new_snd_una = old_snd_una;
>
> +	while (unlikely(ssk_snd_wnd > old_sk_snd_wnd)) {
> +		sk_snd_wnd = old_sk_snd_wnd;
> +
> +		old_sk_snd_wnd = atomic_cmpxchg(&msk->snd_wnd, sk_snd_wnd,
> +					        ssk_snd_wnd);
> +	}
> +


It looks like this would ratchet up msk->snd_wnd based on the largest 
subflow snd_wnd seen in the life of the connection.

Section 3.3.5 in the RFC says to "only update its local receive window 
values when the largest sequence number allowed (i.e., DATA_ACK + receive 
window) increases on the receipt of a DATA_ACK". So that would compare 
new_snd_una + ssk_snd_wnd and old_snd_una + sk_snd_wnd, and it would be 
possible to have mismatched values for msk->snd_wnd and msk->snd_una if 
there were concurrent reads or writes.

What if this patch added msk->wnd_end instead of msk->snd_wnd? Then the 
msk window information used at transmit time would always be valid, and it 
doesn't really matter if the msk->snd_una update lags a little bit.
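
Roughly (hypothetical names -- wnd_end doesn't exist in this patch),
that update rule would look like:

	u64 new_wnd_end = new_snd_una + ssk_snd_wnd;
	u64 old_wnd_end = atomic64_read(&msk->wnd_end);

	/* only advance when DATA_ACK + window moves the limit forward */
	while (after64(new_wnd_end, old_wnd_end)) {
		u64 tmp = old_wnd_end;

		old_wnd_end = atomic64_cmpxchg(&msk->wnd_end, tmp,
					       new_wnd_end);
	}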


Mat



> 	while (after64(new_snd_una, old_snd_una)) {
> 		snd_una = old_snd_una;
> 		old_snd_una = atomic64_cmpxchg(&msk->snd_una, snd_una,
> @@ -900,7 +910,7 @@ void mptcp_incoming_options(struct sock *sk, struct sk_buff *skb,
> 	 * monodirectional flows will stuck
> 	 */
> 	if (mp_opt.use_ack)
> -		update_una(msk, &mp_opt);
> +		ack_update_msk(msk, sk, &mp_opt);
>
> 	/* Zero-data-length packets are dropped by the caller and not
> 	 * propagated to the MPTCP layer, so the skb extension does not
> diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
> index 4dd5d35a8f39..994704f85f4c 100644
> --- a/net/mptcp/protocol.c
> +++ b/net/mptcp/protocol.c
> @@ -55,6 +55,12 @@ static struct socket *__mptcp_nmpc_socket(const struct mptcp_sock *msk)
> 	return msk->subflow;
> }
>
> +/* Returns end sequence number of the receiver's advertised window */
> +static u64 mptcp_wnd_end(const struct mptcp_sock *msk)
> +{
> +	return atomic64_read(&msk->snd_una) + atomic_read(&msk->snd_wnd);
> +}
> +
> static bool mptcp_is_tcpsk(struct sock *sk)
> {
> 	struct socket *sock = sk->sk_socket;
> @@ -769,6 +775,9 @@ static bool mptcp_is_writeable(struct mptcp_sock *msk)
> 	if (!sk_stream_is_writeable((struct sock *)msk))
> 		return false;
>
> +	if (!before64(msk->write_seq, mptcp_wnd_end(msk)))
> +		return false;
> +
> 	mptcp_for_each_subflow(msk, subflow) {
> 		if (READ_ONCE(subflow->writable))
> 			return true;
> @@ -915,6 +924,17 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
> 	}
>
> 	if (!retransmission) {
> +		u64 window_end = mptcp_wnd_end(msk);
> +
> +		if (!before64(*write_seq + avail_size, window_end)) {
> +			int allowed_size = window_end - *write_seq;
> +
> +			if (allowed_size <= 0)
> +				return -EAGAIN;
> +
> +			avail_size = allowed_size;
> +		}
> +
> 		/* reuse tail pfrag, if possible, or carve a new one from the
> 		 * page allocator
> 		 */
> @@ -1070,7 +1090,8 @@ static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk,
> 	sock_owned_by_me((struct sock *)msk);
>
> 	*sndbuf = 0;
> -	if (!mptcp_ext_cache_refill(msk))
> +	if (!mptcp_ext_cache_refill(msk) ||
> +	    !before64(msk->write_seq, mptcp_wnd_end(msk)))
> 		return NULL;
>
> 	if (__mptcp_check_fallback(msk)) {
> @@ -1221,6 +1242,7 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
> 	tx_ok = msg_data_left(msg);
> 	while (tx_ok) {
> 		struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
> +		bool zero_win;
>
> 		ret = mptcp_sendmsg_frag(sk, ssk, msg, NULL, &timeo, &mss_now,
> 					 &size_goal);
> @@ -1233,6 +1255,8 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
> 			break;
> 		}
>
> +		zero_win = !before64(msk->write_seq, mptcp_wnd_end(msk));
> +
> 		/* burst can be negative, we will try move to the next subflow
> 		 * at selection time, if possible.
> 		 */
> @@ -1269,11 +1293,12 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
> 		 * tcp level only.  So, we must also check the MPTCP socket
> 		 * limits before we send more data.
> 		 */
> -		if (unlikely(!sk_stream_memory_free(sk))) {
> +		if (unlikely(!sk_stream_memory_free(sk) || zero_win)) {
> 			tcp_push(ssk, msg->msg_flags, mss_now,
> 				 tcp_sk(ssk)->nonagle, size_goal);
> 			mptcp_clean_una(sk);
> -			if (!sk_stream_memory_free(sk)) {
> +			zero_win = !before64(msk->write_seq, mptcp_wnd_end(msk));
> +			if (!sk_stream_memory_free(sk) || zero_win) {
> 				/* can't send more for now, need to wait for
> 				 * MPTCP-level ACKs from peer.
> 				 *
> @@ -2079,6 +2104,8 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk)
> 				      TCP_INIT_CWND * tp->advmss);
> 	if (msk->rcvq_space.space == 0)
> 		msk->rcvq_space.space = TCP_INIT_CWND * TCP_MSS_DEFAULT;
> +
> +	atomic_set(&msk->snd_wnd, tp->snd_wnd);
> }
>
> static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
> diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
> index 4b8a5308aeed..e3d0e1faea05 100644
> --- a/net/mptcp/protocol.h
> +++ b/net/mptcp/protocol.h
> @@ -200,6 +200,7 @@ struct mptcp_sock {
> 	u64		ack_seq;
> 	u64		rcv_data_fin_seq;
> 	struct sock	*last_snd;
> +	atomic_t	snd_wnd;
> 	int		snd_burst;
> 	atomic64_t	snd_una;
> 	unsigned long	timer_ival;
> -- 
> 2.26.2
> _______________________________________________
> mptcp mailing list -- mptcp@lists.01.org
> To unsubscribe send an email to mptcp-leave@lists.01.org
>

--
Mat Martineau
Intel

Florian Westphal Aug. 26, 2020, 6:44 p.m. UTC | #2
Mat Martineau <mathew.j.martineau@linux.intel.com> wrote:
> On Sun, 23 Aug 2020, Florian Westphal wrote:
> It looks like this would ratchet up msk->snd_wnd based on the largest
> subflow snd_wnd seen in the life of the connection.
> 
> Section 3.3.5 in the RFC says to "only update its local receive window
> values when the largest sequence number allowed (i.e., DATA_ACK + receive
> window) increases on the receipt of a DATA_ACK". So that would compare
> new_snd_una + ssk_snd_wnd and old_snd_una + sk_snd_wnd, and it would be
> possible to have mismatched values for msk->snd_wnd and msk->snd_una if
> there were concurrent reads or writes.
> 
> What if this patch added msk->wnd_end instead of msk->snd_wnd? Then the msk
> window information used at transmit time would always be valid, and it
> doesn't really matter if the msk->snd_una update lags a little bit.

Sounds like a good plan, I will look into it.  Thanks for reviewing!
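
For illustration, with a single msk->wnd_end (again a hypothetical
field, not part of this patch) the transmit-time helper would collapse
to one atomic read, so the limit can never pair a fresh snd_una with a
stale window or vice versa:

	static u64 mptcp_wnd_end(const struct mptcp_sock *msk)
	{
		/* snd_una may lag a bit; wnd_end alone bounds what we send */
		return atomic64_read(&msk->wnd_end);
	}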

Patch

diff --git a/net/mptcp/options.c b/net/mptcp/options.c
index a52a05effac9..9ff2e85b398a 100644
--- a/net/mptcp/options.c
+++ b/net/mptcp/options.c
@@ -784,11 +784,14 @@  static u64 expand_ack(u64 old_ack, u64 cur_ack, bool use_64bit)
 	return cur_ack;
 }
 
-static void update_una(struct mptcp_sock *msk,
-		       struct mptcp_options_received *mp_opt)
+static void ack_update_msk(struct mptcp_sock *msk,
+			   const struct sock *ssk,
+			   struct mptcp_options_received *mp_opt)
 {
 	u64 new_snd_una, snd_una, old_snd_una = atomic64_read(&msk->snd_una);
+	u32 sk_snd_wnd, old_sk_snd_wnd = atomic_read(&msk->snd_wnd);
 	u64 write_seq = READ_ONCE(msk->write_seq);
+	u32 ssk_snd_wnd = tcp_sk(ssk)->snd_wnd;
 
 	/* avoid ack expansion on update conflict, to reduce the risk of
 	 * wrongly expanding to a future ack sequence number, which is way
@@ -800,6 +803,13 @@  static void update_una(struct mptcp_sock *msk,
 	if (after64(new_snd_una, write_seq))
 		new_snd_una = old_snd_una;
 
+	while (unlikely(ssk_snd_wnd > old_sk_snd_wnd)) {
+		sk_snd_wnd = old_sk_snd_wnd;
+
+		old_sk_snd_wnd = atomic_cmpxchg(&msk->snd_wnd, sk_snd_wnd,
+					        ssk_snd_wnd);
+	}
+
 	while (after64(new_snd_una, old_snd_una)) {
 		snd_una = old_snd_una;
 		old_snd_una = atomic64_cmpxchg(&msk->snd_una, snd_una,
@@ -900,7 +910,7 @@  void mptcp_incoming_options(struct sock *sk, struct sk_buff *skb,
 	 * monodirectional flows will stuck
 	 */
 	if (mp_opt.use_ack)
-		update_una(msk, &mp_opt);
+		ack_update_msk(msk, sk, &mp_opt);
 
 	/* Zero-data-length packets are dropped by the caller and not
 	 * propagated to the MPTCP layer, so the skb extension does not
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 4dd5d35a8f39..994704f85f4c 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -55,6 +55,12 @@  static struct socket *__mptcp_nmpc_socket(const struct mptcp_sock *msk)
 	return msk->subflow;
 }
 
+/* Returns end sequence number of the receiver's advertised window */
+static u64 mptcp_wnd_end(const struct mptcp_sock *msk)
+{
+	return atomic64_read(&msk->snd_una) + atomic_read(&msk->snd_wnd);
+}
+
 static bool mptcp_is_tcpsk(struct sock *sk)
 {
 	struct socket *sock = sk->sk_socket;
@@ -769,6 +775,9 @@  static bool mptcp_is_writeable(struct mptcp_sock *msk)
 	if (!sk_stream_is_writeable((struct sock *)msk))
 		return false;
 
+	if (!before64(msk->write_seq, mptcp_wnd_end(msk)))
+		return false;
+
 	mptcp_for_each_subflow(msk, subflow) {
 		if (READ_ONCE(subflow->writable))
 			return true;
@@ -915,6 +924,17 @@  static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 	}
 
 	if (!retransmission) {
+		u64 window_end = mptcp_wnd_end(msk);
+
+		if (!before64(*write_seq + avail_size, window_end)) {
+			int allowed_size = window_end - *write_seq;
+
+			if (allowed_size <= 0)
+				return -EAGAIN;
+
+			avail_size = allowed_size;
+		}
+
 		/* reuse tail pfrag, if possible, or carve a new one from the
 		 * page allocator
 		 */
@@ -1070,7 +1090,8 @@  static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk,
 	sock_owned_by_me((struct sock *)msk);
 
 	*sndbuf = 0;
-	if (!mptcp_ext_cache_refill(msk))
+	if (!mptcp_ext_cache_refill(msk) ||
+	    !before64(msk->write_seq, mptcp_wnd_end(msk)))
 		return NULL;
 
 	if (__mptcp_check_fallback(msk)) {
@@ -1221,6 +1242,7 @@  static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 	tx_ok = msg_data_left(msg);
 	while (tx_ok) {
 		struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
+		bool zero_win;
 
 		ret = mptcp_sendmsg_frag(sk, ssk, msg, NULL, &timeo, &mss_now,
 					 &size_goal);
@@ -1233,6 +1255,8 @@  static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 			break;
 		}
 
+		zero_win = !before64(msk->write_seq, mptcp_wnd_end(msk));
+
 		/* burst can be negative, we will try move to the next subflow
 		 * at selection time, if possible.
 		 */
@@ -1269,11 +1293,12 @@  static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 		 * tcp level only.  So, we must also check the MPTCP socket
 		 * limits before we send more data.
 		 */
-		if (unlikely(!sk_stream_memory_free(sk))) {
+		if (unlikely(!sk_stream_memory_free(sk) || zero_win)) {
 			tcp_push(ssk, msg->msg_flags, mss_now,
 				 tcp_sk(ssk)->nonagle, size_goal);
 			mptcp_clean_una(sk);
-			if (!sk_stream_memory_free(sk)) {
+			zero_win = !before64(msk->write_seq, mptcp_wnd_end(msk));
+			if (!sk_stream_memory_free(sk) || zero_win) {
 				/* can't send more for now, need to wait for
 				 * MPTCP-level ACKs from peer.
 				 *
@@ -2079,6 +2104,8 @@  void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk)
 				      TCP_INIT_CWND * tp->advmss);
 	if (msk->rcvq_space.space == 0)
 		msk->rcvq_space.space = TCP_INIT_CWND * TCP_MSS_DEFAULT;
+
+	atomic_set(&msk->snd_wnd, tp->snd_wnd);
 }
 
 static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index 4b8a5308aeed..e3d0e1faea05 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -200,6 +200,7 @@  struct mptcp_sock {
 	u64		ack_seq;
 	u64		rcv_data_fin_seq;
 	struct sock	*last_snd;
+	atomic_t	snd_wnd;
 	int		snd_burst;
 	atomic64_t	snd_una;
 	unsigned long	timer_ival;