diff mbox series

[net-next,3/3] tipc: smooth change between replicast and broadcast

Message ID 20190219113054.13517-3-hoang.h.le@dektech.com.au
State Changes Requested
Delegated to: David Miller
Headers show
Series [net-next,1/3] tipc: support broadcast/replicast configurable for bc-link | expand

Commit Message

Hoang Huu Le Feb. 19, 2019, 11:30 a.m. UTC
Currently, a multicast stream may start out using replicast, because
there are few destinations, and then it should ideally switch to
L2/broadcast IGMP/multicast when the number of destinations grows beyond
a certain limit. The opposite should happen when the number decreases
below the limit.

To eliminate the risk of message reordering caused by method change,
a sending socket must stick to a previously selected method until it
enters an idle period of 5 seconds. Means there is a 5 seconds pause
in the traffic from the sender socket.

If the sender never makes such a pause, the method will never change,
and transmission may become very inefficient as the cluster grows.

With this commit, we allow such a switch between replicast and
broadcast without any need for a traffic pause.

Solution is to send a dummy message with only the header, also with
the SYN bit set, via broadcast or replicast. For the data message,
the SYN bit is set and sending via replicast or broadcast (inverse
method with dummy).

Then, at receiving side any messages follow first SYN bit message
(data or dummy message), they will be held in deferred queue until
another pair (dummy or data message) arrived in other link.

Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Hoang Le <hoang.h.le@dektech.com.au>
---
 net/tipc/bcast.c  | 165 +++++++++++++++++++++++++++++++++++++++++++++-
 net/tipc/bcast.h  |   5 ++
 net/tipc/msg.h    |  10 +++
 net/tipc/socket.c |   5 ++
 4 files changed, 184 insertions(+), 1 deletion(-)

Comments

David Miller Feb. 22, 2019, midnight UTC | #1
From: Hoang Le <hoang.h.le@dektech.com.au>
Date: Tue, 19 Feb 2019 18:30:54 +0700

> +static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
> +				struct tipc_mc_method *method,
> +				struct tipc_nlist *dests,
> +				u16 *cong_link_cnt)
> +{
> +	struct sk_buff_head tmpq;
> +	struct sk_buff *_skb;
> +	struct tipc_msg *hdr, *_hdr;

Reverse christmas tree please.

> @@ -300,6 +372,9 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
>  		    u16 *cong_link_cnt)
>  {
>  	struct sk_buff_head inputq, localq;
> +	struct sk_buff *skb;
> +	struct tipc_msg *hdr;
> +	bool rcast = method->rcast;
>  	int rc = 0;

Likewise.

Thanks.
diff mbox series

Patch

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 12b59268bdd6..de11a036821e 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -220,9 +220,24 @@  static void tipc_bcast_select_xmit_method(struct net *net, int dests,
 	}
 	/* Can current method be changed ? */
 	method->expires = jiffies + TIPC_METHOD_EXPIRE;
-	if (method->mandatory || time_before(jiffies, exp))
+	if (method->mandatory)
 		return;
 
+	if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
+	    time_before(jiffies, exp))
+		return;
+
+	/* Configuration as force 'broadcast' method */
+	if (bb->force_bcast) {
+		method->rcast = false;
+		return;
+	}
+	/* Configuration as force 'replicast' method */
+	if (bb->force_rcast) {
+		method->rcast = true;
+		return;
+	}
+	/* Configuration as 'autoselect' or default method */
 	/* Determine method to use now */
 	method->rcast = dests <= bb->bc_threshold;
 }
@@ -285,6 +300,63 @@  static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
 	return 0;
 }
 
+/* tipc_mcast_send_sync - deliver a dummy message with SYN bit
+ * @net: the applicable net namespace
+ * @skb: socket buffer to copy
+ * @method: send method to be used
+ * @dests: destination nodes for message.
+ * @cong_link_cnt: returns number of encountered congested destination links
+ * Returns 0 if success, otherwise errno
+ */
+static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
+				struct tipc_mc_method *method,
+				struct tipc_nlist *dests,
+				u16 *cong_link_cnt)
+{
+	struct sk_buff_head tmpq;
+	struct sk_buff *_skb;
+	struct tipc_msg *hdr, *_hdr;
+
+	/* Is a cluster supporting with new capabilities ? */
+	if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL))
+		return 0;
+
+	hdr = buf_msg(skb);
+	if (msg_user(hdr) == MSG_FRAGMENTER)
+		hdr = msg_get_wrapped(hdr);
+	if (msg_type(hdr) != TIPC_MCAST_MSG)
+		return 0;
+
+	/* Allocate dummy message */
+	_skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
+	if (!skb)
+		return -ENOMEM;
+
+	/* Preparing for 'synching' header */
+	msg_set_syn(hdr, 1);
+
+	/* Copy skb's header into a dummy header */
+	skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
+	skb_orphan(_skb);
+
+	/* Reverse method for dummy message */
+	_hdr = buf_msg(_skb);
+	msg_set_size(_hdr, MCAST_H_SIZE);
+	msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
+
+	skb_queue_head_init(&tmpq);
+	__skb_queue_tail(&tmpq, _skb);
+	if (method->rcast)
+		tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
+	else
+		tipc_rcast_xmit(net, &tmpq, dests, cong_link_cnt);
+
+	/* This queue should normally be empty by now */
+	__skb_queue_purge(&tmpq);
+
+	return 0;
+}
+
 /* tipc_mcast_xmit - deliver message to indicated destination nodes
  *                   and to identified node local sockets
  * @net: the applicable net namespace
@@ -300,6 +372,9 @@  int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
 		    u16 *cong_link_cnt)
 {
 	struct sk_buff_head inputq, localq;
+	struct sk_buff *skb;
+	struct tipc_msg *hdr;
+	bool rcast = method->rcast;
 	int rc = 0;
 
 	skb_queue_head_init(&inputq);
@@ -313,6 +388,18 @@  int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
 	/* Send according to determined transmit method */
 	if (dests->remote) {
 		tipc_bcast_select_xmit_method(net, dests->remote, method);
+
+		skb = skb_peek(pkts);
+		hdr = buf_msg(skb);
+		if (msg_user(hdr) == MSG_FRAGMENTER)
+			hdr = msg_get_wrapped(hdr);
+		msg_set_is_rcast(hdr, method->rcast);
+
+		/* Switch method ? */
+		if (rcast != method->rcast)
+			tipc_mcast_send_sync(net, skb, method,
+					     dests, cong_link_cnt);
+
 		if (method->rcast)
 			rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
 		else
@@ -672,3 +759,79 @@  u32 tipc_bcast_get_broadcast_ratio(struct net *net)
 
 	return bb->rc_ratio;
 }
+
+void tipc_mcast_filter_msg(struct sk_buff_head *defq,
+			   struct sk_buff_head *inputq)
+{
+	struct sk_buff *skb, *_skb, *tmp;
+	struct tipc_msg *hdr, *_hdr;
+	bool match = false;
+	u32 node, port;
+
+	skb = skb_peek(inputq);
+	hdr = buf_msg(skb);
+
+	if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
+		return;
+
+	node = msg_orignode(hdr);
+	port = msg_origport(hdr);
+
+	/* Has the twin SYN message already arrived ? */
+	skb_queue_walk(defq, _skb) {
+		_hdr = buf_msg(_skb);
+		if (msg_orignode(_hdr) != node)
+			continue;
+		if (msg_origport(_hdr) != port)
+			continue;
+		match = true;
+		break;
+	}
+
+	if (!match) {
+		if (!msg_is_syn(hdr))
+			return;
+		__skb_dequeue(inputq);
+		__skb_queue_tail(defq, skb);
+		return;
+	}
+
+	/* Deliver non-SYN message from other link, otherwise queue it */
+	if (!msg_is_syn(hdr)) {
+		if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
+			return;
+		__skb_dequeue(inputq);
+		__skb_queue_tail(defq, skb);
+		return;
+	}
+
+	/* Queue non-SYN/SYN message from same link */
+	if (msg_is_rcast(hdr) == msg_is_rcast(_hdr)) {
+		__skb_dequeue(inputq);
+		__skb_queue_tail(defq, skb);
+		return;
+	}
+
+	/* Matching SYN messages => return the one with data, if any */
+	__skb_unlink(_skb, defq);
+	if (msg_data_sz(hdr)) {
+		kfree_skb(_skb);
+	} else {
+		__skb_dequeue(inputq);
+		kfree_skb(skb);
+		__skb_queue_tail(inputq, _skb);
+	}
+
+	/* Deliver subsequent non-SYN messages from same peer */
+	skb_queue_walk_safe(defq, _skb, tmp) {
+		_hdr = buf_msg(_skb);
+		if (msg_orignode(_hdr) != node)
+			continue;
+		if (msg_origport(_hdr) != port)
+			continue;
+		if (msg_is_syn(_hdr))
+			break;
+		__skb_unlink(_skb, defq);
+		__skb_queue_tail(inputq, _skb);
+	}
+}
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 37c55e7347a5..484bde289d3a 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -67,11 +67,13 @@  void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
 /* Cookie to be used between socket and broadcast layer
  * @rcast: replicast (instead of broadcast) was used at previous xmit
  * @mandatory: broadcast/replicast indication was set by user
+ * @deferredq: defer queue to make message in order
  * @expires: re-evaluate non-mandatory transmit method if we are past this
  */
 struct tipc_mc_method {
 	bool rcast;
 	bool mandatory;
+	struct sk_buff_head deferredq;
 	unsigned long expires;
 };
 
@@ -99,6 +101,9 @@  int tipc_bclink_reset_stats(struct net *net);
 u32 tipc_bcast_get_broadcast_mode(struct net *net);
 u32 tipc_bcast_get_broadcast_ratio(struct net *net);
 
+void tipc_mcast_filter_msg(struct sk_buff_head *defq,
+			   struct sk_buff_head *inputq);
+
 static inline void tipc_bcast_lock(struct net *net)
 {
 	spin_lock_bh(&tipc_net(net)->bclock);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index d7e4b8b93f9d..528ba9241acc 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -257,6 +257,16 @@  static inline void msg_set_src_droppable(struct tipc_msg *m, u32 d)
 	msg_set_bits(m, 0, 18, 1, d);
 }
 
+static inline bool msg_is_rcast(struct tipc_msg *m)
+{
+	return msg_bits(m, 0, 18, 0x1);
+}
+
+static inline void msg_set_is_rcast(struct tipc_msg *m, bool d)
+{
+	msg_set_bits(m, 0, 18, 0x1, d);
+}
+
 static inline void msg_set_size(struct tipc_msg *m, u32 sz)
 {
 	m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 8fc5acd4820d..de83eb1e718e 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -483,6 +483,7 @@  static int tipc_sk_create(struct net *net, struct socket *sock,
 		tsk_set_unreturnable(tsk, true);
 		if (sock->type == SOCK_DGRAM)
 			tsk_set_unreliable(tsk, true);
+		__skb_queue_head_init(&tsk->mc_method.deferredq);
 	}
 
 	trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
@@ -580,6 +581,7 @@  static int tipc_release(struct socket *sock)
 	sk->sk_shutdown = SHUTDOWN_MASK;
 	tipc_sk_leave(tsk);
 	tipc_sk_withdraw(tsk, 0, NULL);
+	__skb_queue_purge(&tsk->mc_method.deferredq);
 	sk_stop_timer(sk, &sk->sk_timer);
 	tipc_sk_remove(tsk);
 
@@ -2157,6 +2159,9 @@  static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
 	if (unlikely(grp))
 		tipc_group_filter_msg(grp, &inputq, xmitq);
 
+	if (msg_type(hdr) == TIPC_MCAST_MSG)
+		tipc_mcast_filter_msg(&tsk->mc_method.deferredq, &inputq);
+
 	/* Validate and add to receive buffer if there is space */
 	while ((skb = __skb_dequeue(&inputq))) {
 		hdr = buf_msg(skb);