diff mbox

[net-next,12/13] tipc: introduce node contact FSM

Message ID 1437080071-6686-13-git-send-email-jon.maloy@ericsson.com
State Accepted, archived
Delegated to: David Miller
Headers show

Commit Message

Jon Maloy July 16, 2015, 8:54 p.m. UTC
The logics for determining when a node is permitted to establish
and maintain contact with its peer node becomes non-trivial in the
presence of multiple parallel links that may come and go independently.

A known failure scenario is that one endpoint registers both its links
to the peer lost, cleans up it binding table, and prepares for a table
update once contact is re-establihed, while the other endpoint may
see its links reset and re-established one by one, hence seeing
no need to re-synchronize the binding table. To avoid this, a node
must not allow re-establishing contact until it has confirmation that
even the peer has lost both links.

Currently, the mechanism for handling this consists of setting and
resetting two state flags from different locations in the code. This
solution is hard to understand and maintain. A closer analysis even
reveals that it is not completely safe.

In this commit we do instead introduce an FSM that keeps track of
the conditions for when the node can establish and maintain links.
It has six states and four events, and is strictly based on explicit
knowledge about the own node's and the peer node's contact states.
Only events leading to state change are shown as edges in the figure
below.

                             +--------------+
                             | SELF_UP/     |
           +---------------->| PEER_COMING  |-----------------+
    SELF_  |                 +--------------+                 |PEER_
    ESTBL_ |                        |                         |ESTBL_
    CONTACT|      SELF_LOST_CONTACT |                         |CONTACT
           |                        v                         |
           |                 +--------------+                 |
           |      PEER_      | SELF_DOWN/   |     SELF_       |
           |      LOST_   +--| PEER_LEAVING |<--+ LOST_       v
+-------------+   CONTACT |  +--------------+   | CONTACT  +-----------+
| SELF_DOWN/  |<----------+                     +----------| SELF_UP/  |
| PEER_DOWN   |<----------+                     +----------| PEER_UP   |
+-------------+   SELF_   |  +--------------+   | PEER_    +-----------+
           |      LOST_   +--| SELF_LEAVING/|<--+ LOST_       A
           |      CONTACT    | PEER_DOWN    |     CONTACT     |
           |                 +--------------+                 |
           |                         A                        |
    PEER_  |       PEER_LOST_CONTACT |                        |SELF_
    ESTBL_ |                         |                        |ESTBL_
    CONTACT|                 +--------------+                 |CONTACT
           +---------------->| PEER_UP/     |-----------------+
                             | SELF_COMING  |
                             +--------------+

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/link.c |  74 ++++++++++++++------------------
 net/tipc/msg.h  |   7 +++
 net/tipc/node.c | 130 +++++++++++++++++++++++++++++++++++++++++++++++++++++---
 net/tipc/node.h |  28 +++++++++---
 4 files changed, 185 insertions(+), 54 deletions(-)
diff mbox

Patch

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 5b4609b..eaccf45 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -911,9 +911,13 @@  static void link_retransmit_failure(struct tipc_link *l_ptr,
 
 	if (l_ptr->addr) {
 		/* Handle failure on standard link */
-		link_print(l_ptr, "Resetting link\n");
+		link_print(l_ptr, "Resetting link ");
+		pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
+			msg_user(msg), msg_type(msg), msg_size(msg),
+			msg_errcode(msg));
+		pr_info("sqno %u, prev: %x, src: %x\n",
+			msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
 		tipc_link_reset(l_ptr);
-
 	} else {
 		/* Handle failure on broadcast link */
 		struct tipc_node *n_ptr;
@@ -1067,15 +1071,8 @@  void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 		if (unlikely(!l_ptr))
 			goto unlock;
 
-		/* Verify that communication with node is currently allowed */
-		if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
-		    msg_user(msg) == LINK_PROTOCOL &&
-		    (msg_type(msg) == RESET_MSG ||
-		    msg_type(msg) == ACTIVATE_MSG) &&
-		    !msg_redundant_link(msg))
-			n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
-
-		if (tipc_node_blocked(n_ptr))
+		/* Is reception of this pkt permitted at the moment ? */
+		if (!tipc_node_filter_skb(n_ptr, msg))
 			goto unlock;
 
 		/* Validate message sequence number info */
@@ -1371,15 +1368,6 @@  static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
 			if (less_eq(msg_session(msg), l_ptr->peer_session))
 				break; /* duplicate or old reset: ignore */
 		}
-
-		if (!msg_redundant_link(msg) && (link_working(l_ptr) ||
-						 link_probing(l_ptr))) {
-			/* peer has lost contact -- don't allow peer's links
-			 * to reactivate before we recognize loss & clean up
-			 */
-			l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
-		}
-
 		link_state_event(l_ptr, RESET_MSG);
 
 		/* fall thru' */
@@ -1408,6 +1396,8 @@  static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
 		l_ptr->peer_session = msg_session(msg);
 		l_ptr->peer_bearer_id = msg_bearer_id(msg);
 
+		if (!msg_peer_is_up(msg))
+			tipc_node_fsm_evt(l_ptr->owner, PEER_LOST_CONTACT_EVT);
 		if (msg_type(msg) == ACTIVATE_MSG)
 			link_state_event(l_ptr, ACTIVATE_MSG);
 		break;
@@ -1419,11 +1409,11 @@  static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
 
 		if (msg_linkprio(msg) &&
 		    (msg_linkprio(msg) != l_ptr->priority)) {
-			pr_debug("%s<%s>, priority change %u->%u\n",
-				 link_rst_msg, l_ptr->name,
-				 l_ptr->priority, msg_linkprio(msg));
+			pr_info("%s<%s>, priority change %u->%u\n",
+				link_rst_msg, l_ptr->name,
+				l_ptr->priority, msg_linkprio(msg));
 			l_ptr->priority = msg_linkprio(msg);
-			tipc_link_reset(l_ptr); /* Enforce change to take effect */
+			tipc_link_reset(l_ptr);
 			break;
 		}
 
@@ -1446,15 +1436,18 @@  static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
 			tipc_bclink_update_link_state(l_ptr->owner,
 						      msg_last_bcast(msg));
 
-		if (rec_gap || (msg_probe(msg))) {
+		if (rec_gap || (msg_probe(msg)))
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0,
 					     rec_gap, 0, 0);
-		}
+
 		if (msg_seq_gap(msg)) {
 			l_ptr->stats.recv_nacks++;
 			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
 					     msg_seq_gap(msg));
 		}
+		if (tipc_link_is_up(l_ptr))
+			tipc_node_fsm_evt(l_ptr->owner,
+					  PEER_ESTABL_CONTACT_EVT);
 		break;
 	}
 exit:
@@ -1478,10 +1471,6 @@  static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 	if (l->exec_mode == TIPC_LINK_BLOCKED)
 		return;
 
-	/* Abort non-RESET send if communication with node is prohibited */
-	if ((tipc_node_blocked(l->owner)) && (mtyp != RESET_MSG))
-		return;
-
 	msg_set_type(hdr, mtyp);
 	msg_set_net_plane(hdr, l->net_plane);
 	msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
@@ -1799,27 +1788,28 @@  static void link_reset_statistics(struct tipc_link *l_ptr)
 	l_ptr->stats.recv_info = l_ptr->rcv_nxt;
 }
 
-static void link_print(struct tipc_link *l_ptr, const char *str)
+static void link_print(struct tipc_link *l, const char *str)
 {
-	struct tipc_net *tn = net_generic(l_ptr->owner->net, tipc_net_id);
-	struct tipc_bearer *b_ptr;
+	struct sk_buff *hskb = skb_peek(&l->transmq);
+	u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt;
+	u16 tail = l->snd_nxt - 1;
 
-	rcu_read_lock();
-	b_ptr = rcu_dereference_rtnl(tn->bearer_list[l_ptr->bearer_id]);
-	if (b_ptr)
-		pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
-	rcu_read_unlock();
+	pr_info("%s Link <%s>:", str, l->name);
 
-	if (link_probing(l_ptr))
+	if (link_probing(l))
 		pr_cont(":P\n");
-	else if (link_establishing(l_ptr))
+	else if (link_establishing(l))
 		pr_cont(":E\n");
-	else if (link_resetting(l_ptr))
+	else if (link_resetting(l))
 		pr_cont(":R\n");
-	else if (link_working(l_ptr))
+	else if (link_working(l))
 		pr_cont(":W\n");
 	else
 		pr_cont("\n");
+
+	pr_info("XMTQ: %u [%u-%u], BKLGQ: %u, SNDNX: %u, RCVNX: %u\n",
+		skb_queue_len(&l->transmq), head, tail,
+		skb_queue_len(&l->backlogq), l->snd_nxt, l->rcv_nxt);
 }
 
 /* Parse and validate nested (link) properties valid for media, bearer and link
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 19c45fb..4dc66d9 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -766,6 +766,13 @@  static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
+static inline bool msg_peer_is_up(struct tipc_msg *m)
+{
+	if (likely(msg_user(m) != LINK_PROTOCOL) || (msg_type(m) == STATE_MSG))
+		return true;
+	return msg_redundant_link(m);
+}
+
 struct sk_buff *tipc_buf_acquire(u32 size);
 bool tipc_msg_validate(struct sk_buff *skb);
 bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 77effb2..9dbbb5d 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -141,7 +141,7 @@  struct tipc_node *tipc_node_create(struct net *net, u32 addr)
 			break;
 	}
 	list_add_tail_rcu(&n_ptr->list, &temp_node->list);
-	n_ptr->action_flags = TIPC_WAIT_PEER_LINKS_DOWN;
+	n_ptr->state = SELF_DOWN_PEER_DOWN;
 	n_ptr->signature = INVALID_NODE_SIG;
 	n_ptr->active_links[0] = INVALID_BEARER_ID;
 	n_ptr->active_links[1] = INVALID_BEARER_ID;
@@ -421,8 +421,131 @@  void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
 	}
 }
 
+/* tipc_node_fsm_evt - node finite state machine
+ * Determines when contact is allowed with peer node
+ */
+void tipc_node_fsm_evt(struct tipc_node *n, int evt)
+{
+	int state = n->state;
+
+	switch (state) {
+	case SELF_DOWN_PEER_DOWN:
+		switch (evt) {
+		case SELF_ESTABL_CONTACT_EVT:
+			state = SELF_UP_PEER_COMING;
+			break;
+		case PEER_ESTABL_CONTACT_EVT:
+			state = SELF_COMING_PEER_UP;
+			break;
+		case SELF_LOST_CONTACT_EVT:
+		case PEER_LOST_CONTACT_EVT:
+			break;
+		default:
+			pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+		}
+		break;
+	case SELF_UP_PEER_UP:
+		switch (evt) {
+		case SELF_LOST_CONTACT_EVT:
+			state = SELF_DOWN_PEER_LEAVING;
+			break;
+		case PEER_LOST_CONTACT_EVT:
+			state = SELF_LEAVING_PEER_DOWN;
+			break;
+		case SELF_ESTABL_CONTACT_EVT:
+		case PEER_ESTABL_CONTACT_EVT:
+			break;
+		default:
+			pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+		}
+		break;
+	case SELF_DOWN_PEER_LEAVING:
+		switch (evt) {
+		case PEER_LOST_CONTACT_EVT:
+			state = SELF_DOWN_PEER_DOWN;
+			break;
+		case SELF_ESTABL_CONTACT_EVT:
+		case PEER_ESTABL_CONTACT_EVT:
+		case SELF_LOST_CONTACT_EVT:
+			break;
+		default:
+			pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+		}
+		break;
+	case SELF_UP_PEER_COMING:
+		switch (evt) {
+		case PEER_ESTABL_CONTACT_EVT:
+			state = SELF_UP_PEER_UP;
+			break;
+		case SELF_LOST_CONTACT_EVT:
+			state = SELF_DOWN_PEER_LEAVING;
+			break;
+		case SELF_ESTABL_CONTACT_EVT:
+		case PEER_LOST_CONTACT_EVT:
+			break;
+		default:
+			pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+		}
+		break;
+	case SELF_COMING_PEER_UP:
+		switch (evt) {
+		case SELF_ESTABL_CONTACT_EVT:
+			state = SELF_UP_PEER_UP;
+			break;
+		case PEER_LOST_CONTACT_EVT:
+			state = SELF_LEAVING_PEER_DOWN;
+			break;
+		case SELF_LOST_CONTACT_EVT:
+		case PEER_ESTABL_CONTACT_EVT:
+			break;
+		default:
+			pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+		}
+		break;
+	case SELF_LEAVING_PEER_DOWN:
+		switch (evt) {
+		case SELF_LOST_CONTACT_EVT:
+			state = SELF_DOWN_PEER_DOWN;
+			break;
+		case SELF_ESTABL_CONTACT_EVT:
+		case PEER_ESTABL_CONTACT_EVT:
+		case PEER_LOST_CONTACT_EVT:
+			break;
+		default:
+			pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+		}
+		break;
+	default:
+		pr_err("Unknown node fsm state %x\n", state);
+		break;
+	}
+
+	n->state = state;
+}
+
+bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_msg *hdr)
+{
+	int state = n->state;
+
+	if (likely(state == SELF_UP_PEER_UP))
+		return true;
+	if (state == SELF_DOWN_PEER_DOWN)
+		return true;
+	if (state == SELF_UP_PEER_COMING)
+		return true;
+	if (state == SELF_COMING_PEER_UP)
+		return true;
+	if (state == SELF_LEAVING_PEER_DOWN)
+		return false;
+	if (state == SELF_DOWN_PEER_LEAVING)
+		if (!msg_peer_is_up(hdr))
+			return true;
+	return false;
+}
+
 static void node_established_contact(struct tipc_node *n_ptr)
 {
+	tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT);
 	n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;
 	n_ptr->bclink.oos_state = 0;
 	n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net);
@@ -468,11 +591,8 @@  static void node_lost_contact(struct tipc_node *n_ptr)
 		l_ptr->failover_skb = NULL;
 		tipc_link_reset_fragments(l_ptr);
 	}
-
-	n_ptr->action_flags &= ~TIPC_WAIT_OWN_LINKS_DOWN;
-
 	/* Prevent re-contact with node until cleanup is done */
-	n_ptr->action_flags |= TIPC_WAIT_PEER_LINKS_DOWN;
+	tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT);
 
 	/* Notify publications from this node */
 	n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 2d56344..270256e 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -47,6 +47,24 @@ 
 
 #define INVALID_BEARER_ID -1
 
+/* Node FSM states and events:
+ */
+enum {
+	SELF_DOWN_PEER_DOWN    = 0xdd,
+	SELF_UP_PEER_UP        = 0xaa,
+	SELF_DOWN_PEER_LEAVING = 0xd1,
+	SELF_UP_PEER_COMING    = 0xac,
+	SELF_COMING_PEER_UP    = 0xca,
+	SELF_LEAVING_PEER_DOWN = 0x1d,
+};
+
+enum {
+	SELF_ESTABL_CONTACT_EVT = 0xec,
+	SELF_LOST_CONTACT_EVT   = 0x1c,
+	PEER_ESTABL_CONTACT_EVT = 0xfec,
+	PEER_LOST_CONTACT_EVT   = 0xf1c
+};
+
 /* Flags used to take different actions according to flag type
  * TIPC_WAIT_PEER_LINKS_DOWN: wait to see that peer's links are down
  * TIPC_WAIT_OWN_LINKS_DOWN: wait until peer node is declared down
@@ -56,8 +74,6 @@ 
  */
 enum {
 	TIPC_MSG_EVT                    = 1,
-	TIPC_WAIT_PEER_LINKS_DOWN	= (1 << 1),
-	TIPC_WAIT_OWN_LINKS_DOWN	= (1 << 2),
 	TIPC_NOTIFY_NODE_DOWN		= (1 << 3),
 	TIPC_NOTIFY_NODE_UP		= (1 << 4),
 	TIPC_WAKEUP_BCAST_USERS		= (1 << 5),
@@ -133,6 +149,7 @@  struct tipc_node {
 	int action_flags;
 	struct tipc_node_bclink bclink;
 	struct list_head list;
+	int state;
 	int link_cnt;
 	u16 working_links;
 	u16 capabilities;
@@ -176,11 +193,8 @@  static inline void tipc_node_lock(struct tipc_node *node)
 	spin_lock_bh(&node->lock);
 }
 
-static inline bool tipc_node_blocked(struct tipc_node *node)
-{
-	return (node->action_flags & (TIPC_WAIT_PEER_LINKS_DOWN |
-		TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN));
-}
+void tipc_node_fsm_evt(struct tipc_node *n, int evt);
+bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_msg *hdr);
 
 static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)
 {