From patchwork Sun Aug 23 01:15:59 2020
X-Patchwork-Submitter: Florian Westphal <fw@strlen.de>
X-Patchwork-Id: 1349790
From: Florian Westphal <fw@strlen.de>
To: mptcp@lists.01.org
Cc: pabeni@redhat.com, Florian Westphal <fw@strlen.de>
Date: Sun, 23 Aug 2020 03:15:59 +0200
Message-Id: <20200823011559.28689-1-fw@strlen.de>
Subject: [MPTCP] [PATCH mptcp-next] mptcp: keep track of receiver's advertised window

Before sending 'x' new bytes, also check that the new snd_una would be
within the permitted receive window.

Track the largest rcv_win seen on a subflow in the mptcp socket itself.
It is updated for every ACK that also contains a DSS ack.

When the window is exhausted, behave as if all subflow sockets are busy,
i.e. re-use the existing wait logic.
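For illustration, a minimal userspace sketch of the grow-only window
update that ack_update_msk() performs below. It is not part of the
patch: C11 atomics stand in for the kernel's atomic_t/atomic_cmpxchg(),
and msk_snd_wnd/update_snd_wnd are made-up names.

  #include <stdatomic.h>
  #include <stdint.h>
  #include <stdio.h>

  static _Atomic uint32_t msk_snd_wnd;  /* stand-in for msk->snd_wnd */

  static void update_snd_wnd(uint32_t ssk_snd_wnd)
  {
      uint32_t old = atomic_load(&msk_snd_wnd);

      /* Retry until the tracked window is at least ssk_snd_wnd or a
       * concurrent updater published a larger value; a smaller
       * advertisement never shrinks the tracked window.
       */
      while (ssk_snd_wnd > old) {
          if (atomic_compare_exchange_weak(&msk_snd_wnd, &old,
                                           ssk_snd_wnd))
              break;
          /* on failure, 'old' was reloaded with the current value */
      }
  }

  int main(void)
  {
      update_snd_wnd(65535);
      update_snd_wnd(32768);  /* smaller window: no update */
      printf("snd_wnd=%u\n", atomic_load(&msk_snd_wnd)); /* 65535 */
      return 0;
  }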
Signed-off-by: Florian Westphal <fw@strlen.de>
---
 net/mptcp/options.c  | 16 +++++++++++++---
 net/mptcp/protocol.c | 33 ++++++++++++++++++++++++++++++---
 net/mptcp/protocol.h |  1 +
 3 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/net/mptcp/options.c b/net/mptcp/options.c
index a52a05effac9..9ff2e85b398a 100644
--- a/net/mptcp/options.c
+++ b/net/mptcp/options.c
@@ -784,11 +784,14 @@ static u64 expand_ack(u64 old_ack, u64 cur_ack, bool use_64bit)
 	return cur_ack;
 }
 
-static void update_una(struct mptcp_sock *msk,
-		       struct mptcp_options_received *mp_opt)
+static void ack_update_msk(struct mptcp_sock *msk,
+			   const struct sock *ssk,
+			   struct mptcp_options_received *mp_opt)
 {
 	u64 new_snd_una, snd_una, old_snd_una = atomic64_read(&msk->snd_una);
+	u32 sk_snd_wnd, old_sk_snd_wnd = atomic_read(&msk->snd_wnd);
 	u64 write_seq = READ_ONCE(msk->write_seq);
+	u32 ssk_snd_wnd = tcp_sk(ssk)->snd_wnd;
 
 	/* avoid ack expansion on update conflict, to reduce the risk of
 	 * wrongly expanding to a future ack sequence number, which is way
@@ -800,6 +803,13 @@ static void update_una(struct mptcp_sock *msk,
 	if (after64(new_snd_una, write_seq))
 		new_snd_una = old_snd_una;
 
+	while (unlikely(ssk_snd_wnd > old_sk_snd_wnd)) {
+		sk_snd_wnd = old_sk_snd_wnd;
+
+		old_sk_snd_wnd = atomic_cmpxchg(&msk->snd_wnd, sk_snd_wnd,
+						ssk_snd_wnd);
+	}
+
 	while (after64(new_snd_una, old_snd_una)) {
 		snd_una = old_snd_una;
 		old_snd_una = atomic64_cmpxchg(&msk->snd_una, snd_una,
@@ -900,7 +910,7 @@ void mptcp_incoming_options(struct sock *sk, struct sk_buff *skb,
 	 * monodirectional flows will stuck
 	 */
 	if (mp_opt.use_ack)
-		update_una(msk, &mp_opt);
+		ack_update_msk(msk, sk, &mp_opt);
 
 	/* Zero-data-length packets are dropped by the caller and not
 	 * propagated to the MPTCP layer, so the skb extension does not
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 4dd5d35a8f39..994704f85f4c 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -55,6 +55,12 @@ static struct socket *__mptcp_nmpc_socket(const struct mptcp_sock *msk)
 	return msk->subflow;
 }
 
+/* Returns end sequence number of the receiver's advertised window */
+static u64 mptcp_wnd_end(const struct mptcp_sock *msk)
+{
+	return atomic64_read(&msk->snd_una) + atomic_read(&msk->snd_wnd);
+}
+
 static bool mptcp_is_tcpsk(struct sock *sk)
 {
 	struct socket *sock = sk->sk_socket;
@@ -769,6 +775,9 @@ static bool mptcp_is_writeable(struct mptcp_sock *msk)
 	if (!sk_stream_is_writeable((struct sock *)msk))
 		return false;
 
+	if (!before64(msk->write_seq, mptcp_wnd_end(msk)))
+		return false;
+
 	mptcp_for_each_subflow(msk, subflow) {
 		if (READ_ONCE(subflow->writable))
 			return true;
@@ -915,6 +924,17 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 	}
 
 	if (!retransmission) {
+		u64 window_end = mptcp_wnd_end(msk);
+
+		if (!before64(*write_seq + avail_size, window_end)) {
+			int allowed_size = window_end - *write_seq;
+
+			if (allowed_size <= 0)
+				return -EAGAIN;
+
+			avail_size = allowed_size;
+		}
+
 		/* reuse tail pfrag, if possible, or carve a new one from the
 		 * page allocator
 		 */
@@ -1070,7 +1090,8 @@ static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk,
 	sock_owned_by_me((struct sock *)msk);
 
 	*sndbuf = 0;
-	if (!mptcp_ext_cache_refill(msk))
+	if (!mptcp_ext_cache_refill(msk) ||
+	    !before64(msk->write_seq, mptcp_wnd_end(msk)))
 		return NULL;
 
 	if (__mptcp_check_fallback(msk)) {
@@ -1221,6 +1242,7 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 	tx_ok = msg_data_left(msg);
 	while (tx_ok) {
 		struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
+		bool zero_win;
 
 		ret = mptcp_sendmsg_frag(sk, ssk, msg, NULL, &timeo, &mss_now,
 					 &size_goal);
@@ -1233,6 +1255,8 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 			break;
 		}
 
+		zero_win = !before64(msk->write_seq, mptcp_wnd_end(msk));
+
 		/* burst can be negative, we will try move to the next subflow
 		 * at selection time, if possible.
 		 */
@@ -1269,11 +1293,12 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 		 * tcp level only. So, we must also check the MPTCP socket
 		 * limits before we send more data.
 		 */
-		if (unlikely(!sk_stream_memory_free(sk))) {
+		if (unlikely(!sk_stream_memory_free(sk) || zero_win)) {
 			tcp_push(ssk, msg->msg_flags, mss_now,
 				 tcp_sk(ssk)->nonagle, size_goal);
 			mptcp_clean_una(sk);
-			if (!sk_stream_memory_free(sk)) {
+			zero_win = !before64(msk->write_seq, mptcp_wnd_end(msk));
+			if (!sk_stream_memory_free(sk) || zero_win) {
 				/* can't send more for now, need to wait for
 				 * MPTCP-level ACKs from peer.
 				 *
@@ -2079,6 +2104,8 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk)
 				      TCP_INIT_CWND * tp->advmss);
 	if (msk->rcvq_space.space == 0)
 		msk->rcvq_space.space = TCP_INIT_CWND * TCP_MSS_DEFAULT;
+
+	atomic_set(&msk->snd_wnd, tp->snd_wnd);
 }
 
 static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index 4b8a5308aeed..e3d0e1faea05 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -200,6 +200,7 @@ struct mptcp_sock {
 	u64		ack_seq;
 	u64		rcv_data_fin_seq;
 	struct sock	*last_snd;
+	atomic_t	snd_wnd;
 	int		snd_burst;
 	atomic64_t	snd_una;
 	unsigned long	timer_ival;
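For reference, a standalone sketch of the zero-window test added at the
MPTCP level: the right edge of the send window is snd_una + snd_wnd, and
new data may be sent only while write_seq is before that edge under
wrap-safe 64-bit comparison. before64() is re-implemented here after the
kernel macro; the helper name wnd_end() and the sample values are made
up for illustration.

  #include <stdbool.h>
  #include <stdint.h>
  #include <stdio.h>

  /* wrap-safe "seq1 < seq2" over the 64-bit sequence space */
  static bool before64(uint64_t seq1, uint64_t seq2)
  {
      return (int64_t)(seq1 - seq2) < 0;
  }

  /* mirrors mptcp_wnd_end(): first sequence number past the window */
  static uint64_t wnd_end(uint64_t snd_una, uint32_t snd_wnd)
  {
      return snd_una + snd_wnd;
  }

  int main(void)
  {
      uint64_t snd_una = 1000;
      uint32_t snd_wnd = 65535;
      uint64_t write_seq = snd_una + snd_wnd;  /* window exhausted */

      /* Same shape as the checks in mptcp_sendmsg(): once write_seq
       * reaches snd_una + snd_wnd, act as if every subflow were busy
       * and wait for a DSS ack to advance snd_una.
       */
      if (!before64(write_seq, wnd_end(snd_una, snd_wnd)))
          printf("zero window: wait for MPTCP-level ACK\n");

      return 0;
  }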