diff mbox series

[net-next,10/18] tipc: introduce flow control for group broadcast messages

Message ID 1507816959-31787-11-git-send-email-jon.maloy@ericsson.com
State Changes Requested, archived
Delegated to: David Miller
Headers show
Series tipc: Introduce Communcation Group feature | expand

Commit Message

Jon Maloy Oct. 12, 2017, 2:02 p.m. UTC
We introduce an end-to-end flow control mechanism for group broadcast
messages. This ensures that no messages are ever lost because of
destination receive buffer overflow, with minimal impact on performance.
For now, the algorithm is based on the assumption that there is only one
active transmitter at any moment in time.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 net/tipc/group.c  | 148 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
 net/tipc/group.h  |  11 ++--
 net/tipc/msg.h    |   5 +-
 net/tipc/socket.c |  48 +++++++++++++-----
 4 files changed, 190 insertions(+), 22 deletions(-)
diff mbox series

Patch

diff --git a/net/tipc/group.c b/net/tipc/group.c
index fe2a76f..50dd7d1 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -46,6 +46,7 @@ 
 
 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
 #define ADV_IDLE ADV_UNIT
+#define ADV_ACTIVE (ADV_UNIT * 12)
 
 enum mbr_state {
 	MBR_QUARANTINED,
@@ -59,16 +60,22 @@  enum mbr_state {
 struct tipc_member {
 	struct rb_node tree_node;
 	struct list_head list;
+	struct list_head congested;
 	struct sk_buff *event_msg;
+	struct tipc_group *group;
 	u32 node;
 	u32 port;
 	u32 instance;
 	enum mbr_state state;
+	u16 advertised;
+	u16 window;
 	u16 bc_rcv_nxt;
+	bool usr_pending;
 };
 
 struct tipc_group {
 	struct rb_root members;
+	struct list_head congested;
 	struct tipc_nlist dests;
 	struct net *net;
 	int subid;
@@ -86,11 +93,24 @@  struct tipc_group {
 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
 				  int mtyp, struct sk_buff_head *xmitq);
 
+static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
+{
+	int mcnt = grp->member_cnt + 1;
+
+	/* Scale to bytes, considering worst-case truesize/msgsize ratio */
+	return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4;
+}
+
 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
 {
 	return grp->bc_snd_nxt;
 }
 
+static bool tipc_group_is_enabled(struct tipc_member *m)
+{
+	return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
+}
+
 static bool tipc_group_is_receiver(struct tipc_member *m)
 {
 	return m && m->state >= MBR_JOINED;
@@ -111,6 +131,7 @@  struct tipc_group *tipc_group_create(struct net *net, u32 portid,
 	if (!grp)
 		return NULL;
 	tipc_nlist_init(&grp->dests, tipc_own_addr(net));
+	INIT_LIST_HEAD(&grp->congested);
 	grp->members = RB_ROOT;
 	grp->net = net;
 	grp->portid = portid;
@@ -213,6 +234,8 @@  static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
 	if (!m)
 		return NULL;
 	INIT_LIST_HEAD(&m->list);
+	INIT_LIST_HEAD(&m->congested);
+	m->group = grp;
 	m->node = node;
 	m->port = port;
 	grp->member_cnt++;
@@ -233,6 +256,7 @@  static void tipc_group_delete_member(struct tipc_group *grp,
 	rb_erase(&m->tree_node, &grp->members);
 	grp->member_cnt--;
 	list_del_init(&m->list);
+	list_del_init(&m->congested);
 
 	/* If last member on a node, remove node from dest list */
 	if (!tipc_group_find_node(grp, m->node))
@@ -255,11 +279,59 @@  void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
 	*scope = grp->scope;
 }
 
-void tipc_group_update_bc_members(struct tipc_group *grp)
+void tipc_group_update_member(struct tipc_member *m, int len)
+{
+	struct tipc_group *grp = m->group;
+	struct tipc_member *_m, *tmp;
+
+	if (!tipc_group_is_enabled(m))
+		return;
+
+	m->window -= len;
+
+	if (m->window >= ADV_IDLE)
+		return;
+
+	if (!list_empty(&m->congested))
+		return;
+
+	/* Sort member into congested members' list */
+	list_for_each_entry_safe(_m, tmp, &grp->congested, congested) {
+		if (m->window > _m->window)
+			continue;
+		list_add_tail(&m->congested, &_m->congested);
+		return;
+	}
+	list_add_tail(&m->congested, &grp->congested);
+}
+
+void tipc_group_update_bc_members(struct tipc_group *grp, int len)
 {
+	struct tipc_member *m;
+	struct rb_node *n;
+
+	for (n = rb_first(&grp->members); n; n = rb_next(n)) {
+		m = container_of(n, struct tipc_member, tree_node);
+		if (tipc_group_is_enabled(m))
+			tipc_group_update_member(m, len);
+	}
 	grp->bc_snd_nxt++;
 }
 
+bool tipc_group_bc_cong(struct tipc_group *grp, int len)
+{
+	struct tipc_member *m;
+
+	if (list_empty(&grp->congested))
+		return false;
+
+	m = list_first_entry(&grp->congested, struct tipc_member, congested);
+	if (m->window >= len)
+		return false;
+
+	return true;
+}
+
 /* tipc_group_filter_msg() - determine if we should accept arriving message
  */
 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
@@ -302,11 +374,36 @@  void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
 	kfree_skb(skb);
 }
 
+void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
+			       u32 port, struct sk_buff_head *xmitq)
+{
+	struct tipc_member *m;
+
+	m = tipc_group_find_member(grp, node, port);
+	if (!m)
+		return;
+
+	m->advertised -= blks;
+
+	switch (m->state) {
+	case MBR_JOINED:
+		if (m->advertised <= (ADV_ACTIVE - ADV_UNIT))
+			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+		break;
+	case MBR_DISCOVERED:
+	case MBR_JOINING:
+	case MBR_LEAVING:
+	default:
+		break;
+	}
+}
+
 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
 				  int mtyp, struct sk_buff_head *xmitq)
 {
 	struct sk_buff *skb;
 	struct tipc_msg *hdr;
+	int adv = 0;
 
 	skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
 			      m->node, tipc_own_addr(grp->net),
@@ -314,14 +411,24 @@  static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
 	if (!skb)
 		return;
 
+	if (m->state == MBR_JOINED)
+		adv = ADV_ACTIVE - m->advertised;
+
 	hdr = buf_msg(skb);
-	if (mtyp == GRP_JOIN_MSG)
+
+	if (mtyp == GRP_JOIN_MSG) {
 		msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
+		msg_set_adv_win(hdr, adv);
+		m->advertised += adv;
+	} else if (mtyp == GRP_ADV_MSG) {
+		msg_set_adv_win(hdr, adv);
+		m->advertised += adv;
+	}
 	__skb_queue_tail(xmitq, skb);
 }
 
-void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
-			  struct sk_buff_head *inputq,
+void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
+			  struct tipc_msg *hdr, struct sk_buff_head *inputq,
 			  struct sk_buff_head *xmitq)
 {
 	struct tipc_member *m;
@@ -341,14 +448,22 @@  void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
 		if (!m)
 			return;
 		m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
+		m->window += msg_adv_win(hdr);
 
 		/* Wait until PUBLISH event is received */
 		if (m->state == MBR_DISCOVERED) {
 			m->state = MBR_JOINING;
 		} else if (m->state == MBR_PUBLISHED) {
 			m->state = MBR_JOINED;
+			*usr_wakeup = true;
+			m->usr_pending = false;
+			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
 			__skb_queue_tail(inputq, m->event_msg);
 		}
+		if (m->window < ADV_IDLE)
+			tipc_group_update_member(m, 0);
+		else
+			list_del_init(&m->congested);
 		return;
 	case GRP_LEAVE_MSG:
 		if (!m)
@@ -361,14 +476,28 @@  void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
 		}
 		/* Otherwise deliver already received WITHDRAW event */
 		__skb_queue_tail(inputq, m->event_msg);
+		*usr_wakeup = m->usr_pending;
 		tipc_group_delete_member(grp, m);
+		list_del_init(&m->congested);
+		return;
+	case GRP_ADV_MSG:
+		if (!m)
+			return;
+		m->window += msg_adv_win(hdr);
+		*usr_wakeup = m->usr_pending;
+		m->usr_pending = false;
+		list_del_init(&m->congested);
 		return;
 	default:
 		pr_warn("Received unknown GROUP_PROTO message\n");
 	}
 }
 
+/* tipc_group_member_evt() - receive and handle a member up/down event
+ */
 void tipc_group_member_evt(struct tipc_group *grp,
+			   bool *usr_wakeup,
+			   int *sk_rcvbuf,
 			   struct sk_buff *skb,
 			   struct sk_buff_head *inputq,
 			   struct sk_buff_head *xmitq)
@@ -416,16 +545,25 @@  void tipc_group_member_evt(struct tipc_group *grp,
 		} else {
 			__skb_queue_tail(inputq, skb);
 			m->state = MBR_JOINED;
+			*usr_wakeup = true;
+			m->usr_pending = false;
 		}
 		m->instance = instance;
 		TIPC_SKB_CB(skb)->orig_member = m->instance;
 		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
+		if (m->window < ADV_IDLE)
+			tipc_group_update_member(m, 0);
+		else
+			list_del_init(&m->congested);
 	} else if (event == TIPC_WITHDRAWN) {
 		if (!m)
 			goto drop;
 
 		TIPC_SKB_CB(skb)->orig_member = m->instance;
 
+		*usr_wakeup = m->usr_pending;
+		m->usr_pending = false;
+
 		/* Hold back event if more messages might be expected */
 		if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
 			m->event_msg = skb;
@@ -434,7 +572,9 @@  void tipc_group_member_evt(struct tipc_group *grp,
 			__skb_queue_tail(inputq, skb);
 			tipc_group_delete_member(grp, m);
 		}
+		list_del_init(&m->congested);
 	}
+	*sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
 	return;
 drop:
 	kfree_skb(skb);
diff --git a/net/tipc/group.h b/net/tipc/group.h
index 5d3f10d..0e2740e 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -52,15 +52,18 @@  void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
 void tipc_group_filter_msg(struct tipc_group *grp,
 			   struct sk_buff_head *inputq,
 			   struct sk_buff_head *xmitq);
-void tipc_group_member_evt(struct tipc_group *grp,
-			   struct sk_buff *skb,
+void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup,
+			   int *sk_rcvbuf, struct sk_buff *skb,
 			   struct sk_buff_head *inputq,
 			   struct sk_buff_head *xmitq);
-void tipc_group_proto_rcv(struct tipc_group *grp,
+void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,
 			  struct tipc_msg *hdr,
 			  struct sk_buff_head *inputq,
 			  struct sk_buff_head *xmitq);
-void tipc_group_update_bc_members(struct tipc_group *grp);
+void tipc_group_update_bc_members(struct tipc_group *grp, int len);
+bool tipc_group_bc_cong(struct tipc_group *grp, int len);
+void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
+			       u32 port, struct sk_buff_head *xmitq);
 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
 int tipc_group_size(struct tipc_group *grp);
 #endif
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 3384ee6..820720c 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -538,6 +538,7 @@  static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
  */
 #define GRP_JOIN_MSG         0
 #define GRP_LEAVE_MSG        1
+#define GRP_ADV_MSG          2
 
 /*
  * Word 1
@@ -790,12 +791,12 @@  static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 16, 0xffff, n);
 }
 
-static inline u32 msg_adv_win(struct tipc_msg *m)
+static inline u16 msg_adv_win(struct tipc_msg *m)
 {
 	return msg_bits(m, 9, 0, 0xffff);
 }
 
-static inline void msg_set_adv_win(struct tipc_msg *m, u32 n)
+static inline void msg_set_adv_win(struct tipc_msg *m, u16 n)
 {
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index ed0907c..d2f83fc9 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -196,6 +196,11 @@  static bool tsk_conn_cong(struct tipc_sock *tsk)
 	return tsk->snt_unacked > tsk->snd_win;
 }
 
+static u16 tsk_blocks(int len)
+{
+	return ((len / FLOWCTL_BLK_SZ) + 1);
+}
+
 /* tsk_blocks(): translate a buffer size in bytes to number of
  * advertisable blocks, taking into account the ratio truesize(len)/len
  * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
@@ -828,20 +833,22 @@  static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
 	struct tipc_nlist *dsts = tipc_group_dests(grp);
 	struct tipc_mc_method *method = &tsk->mc_method;
 	struct sk_buff_head pkts;
+	int blks = tsk_blocks(MCAST_H_SIZE + dlen);
 	int mtu = tipc_bcast_get_mtu(net);
 	int rc = -EHOSTUNREACH;
 
 	if (!dsts->local && !dsts->remote)
 		return -EHOSTUNREACH;
 
-	/* Block or return if any destination link is congested */
-	rc = tipc_wait_for_cond(sock, &timeout,	!tsk->cong_link_cnt);
+	/* Block or return if any destination link or member is congested */
+	rc = tipc_wait_for_cond(sock, &timeout,	!tsk->cong_link_cnt &&
+				!tipc_group_bc_cong(grp, blks));
 	if (unlikely(rc))
 		return rc;
 
 	/* Complete message header */
 	msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
-	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
+	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
 	msg_set_destport(hdr, 0);
 	msg_set_destnode(hdr, 0);
 	msg_set_nameinst(hdr, 0);
@@ -859,9 +866,8 @@  static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
 	if (unlikely(rc))
 		return rc;
 
-	/* Update broadcast sequence number */
-	tipc_group_update_bc_members(tsk->group);
-
+	/* Update broadcast sequence number and send windows */
+	tipc_group_update_bc_members(tsk->group, blks);
 	return dlen;
 }
 
@@ -1019,7 +1025,7 @@  static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
 		return -EMSGSIZE;
 
-	if (unlikely(grp))
+	if (unlikely(grp && !dest))
 		return tipc_send_group_bcast(sock, m, dlen, timeout);
 
 	if (unlikely(!dest)) {
@@ -1419,6 +1425,7 @@  static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct sk_buff *skb;
 	struct tipc_msg *hdr;
+	struct sk_buff_head xmitq;
 	bool connected = !tipc_sk_type_connectionless(sk);
 	bool grp_evt;
 	int rc, err, hlen, dlen, copy;
@@ -1435,8 +1442,8 @@  static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
 	}
 	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 
+	/* Step rcv queue to first msg with data or error; wait if necessary */
 	do {
-		/* Look at first msg in receive queue; wait if necessary */
 		rc = tipc_wait_for_rcvmsg(sock, &timeout);
 		if (unlikely(rc))
 			goto exit;
@@ -1484,12 +1491,21 @@  static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
 	if (unlikely(flags & MSG_PEEK))
 		goto exit;
 
+	/* Send group flow control advertisement when applicable */
+	if (tsk->group && msg_in_group(hdr) && !grp_evt) {
+		skb_queue_head_init(&xmitq);
+		tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
+					  msg_orignode(hdr), msg_origport(hdr),
+					  &xmitq);
+		tipc_node_distr_xmit(sock_net(sk), &xmitq);
+	}
+
 	tsk_advance_rx_queue(sk);
 
 	if (likely(!connected))
 		goto exit;
 
-	/* Send connection flow control ack when applicable */
+	/* Send connection flow control advertisement when applicable */
 	tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
 	if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
 		tipc_sk_send_ack(tsk);
@@ -1649,6 +1665,7 @@  static void tipc_sk_proto_rcv(struct sock *sk,
 	struct sk_buff *skb = __skb_dequeue(inputq);
 	struct tipc_msg *hdr = buf_msg(skb);
 	struct tipc_group *grp = tsk->group;
+	bool wakeup = false;
 
 	switch (msg_user(hdr)) {
 	case CONN_MANAGER:
@@ -1657,19 +1674,23 @@  static void tipc_sk_proto_rcv(struct sock *sk,
 	case SOCK_WAKEUP:
 		tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
 		tsk->cong_link_cnt--;
-		sk->sk_write_space(sk);
+		wakeup = true;
 		break;
 	case GROUP_PROTOCOL:
-		tipc_group_proto_rcv(grp, hdr, inputq, xmitq);
+		tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
 		break;
 	case TOP_SRV:
-		tipc_group_member_evt(tsk->group, skb, inputq, xmitq);
+		tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
+				      skb, inputq, xmitq);
 		skb = NULL;
 		break;
 	default:
 		break;
 	}
 
+	if (wakeup)
+		sk->sk_write_space(sk);
+
 	kfree_skb(skb);
 }
 
@@ -1784,6 +1805,9 @@  static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_msg *hdr = buf_msg(skb);
 
+	if (unlikely(msg_in_group(hdr)))
+		return sk->sk_rcvbuf;
+
 	if (unlikely(!msg_connected(hdr)))
 		return sk->sk_rcvbuf << msg_importance(hdr);