diff mbox series

[net-next,3/5] tipc: add trace_events for tipc socket

Message ID 20181219021800.21662-4-tuong.t.lien@dektech.com.au
State Accepted, archived
Delegated to: David Miller
Headers show
Series tipc: tracepoints and trace_events in TIPC | expand

Commit Message

Tuong Lien Dec. 19, 2018, 2:17 a.m. UTC
The commit adds the new trace_events for TIPC socket object:

trace_tipc_sk_create()
trace_tipc_sk_poll()
trace_tipc_sk_sendmsg()
trace_tipc_sk_sendmcast()
trace_tipc_sk_sendstream()
trace_tipc_sk_filter_rcv()
trace_tipc_sk_advance_rx()
trace_tipc_sk_rej_msg()
trace_tipc_sk_drop_msg()
trace_tipc_sk_release()
trace_tipc_sk_shutdown()
trace_tipc_sk_overlimit1()
trace_tipc_sk_overlimit2()

Also, enables the traces for the following cases:
- When user creates a TIPC socket;
- When user calls poll() on TIPC socket;
- When user sends a dgram/mcast/stream message.
- When a message is put into the socket 'sk_receive_queue';
- When a message is released from the socket 'sk_receive_queue';
- When a message is rejected (e.g. due to no port, invalid, etc.);
- When a message is dropped (e.g. due to wrong message type);
- When socket is released;
- When socket is shutdown;
- When socket rcvq's allocation is overlimit (> 90%);
- When socket rcvq + bklq's allocation is overlimit (> 90%);
- When the 'TIPC_ERR_OVERLOAD/2' issue happens;

Note:
a) All the socket traces are designed to be able to trace on a specific
socket by either using the 'event filtering' feature on a known socket
'portid' value or the sysctl file:

/proc/sys/net/tipc/sk_filter

The file determines a 'tuple' for what socket should be traced:

(portid, sock type, name type, name lower, name upper)

where:
+ 'portid' is the socket portid generated at socket creating, can be
found in the trace outputs or the 'tipc socket list' command printouts;
+ 'sock type' is the socket type (1 = SOCK_TREAM, ...);
+ 'name type', 'name lower' and 'name upper' are the service name being
connected to or published by the socket.

Value '0' means 'ANY', the default tuple value is (0, 0, 0, 0, 0) i.e.
the traces happen for every sockets with no filter.

b) The 'tipc_sk_overlimit1/2' event is also a conditional trace_event
which happens when the socket receive queue (and backlog queue) is
about to be overloaded, when the queue allocation is > 90%. Then, when
the trace is enabled, the last skbs leading to the TIPC_ERR_OVERLOAD/2
issue can be traced.

The trace event is designed as an 'upper watermark' notification that
the other traces (e.g. 'tipc_sk_advance_rx' vs 'tipc_sk_filter_rcv') or
actions can be triggerred in the meanwhile to see what is going on with
the socket queue.

In addition, the 'trace_tipc_sk_dump()' is also placed at the
'TIPC_ERR_OVERLOAD/2' case, so the socket and last skb can be dumped
for post-analysis.

Acked-by: Ying Xue <ying.xue@windriver.com>
Tested-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
---
 net/tipc/socket.c | 137 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
 net/tipc/socket.h |   2 +
 net/tipc/sysctl.c |   8 ++++
 net/tipc/trace.c  |   6 +++
 net/tipc/trace.h  |  32 +++++++++++--
 5 files changed, 176 insertions(+), 9 deletions(-)
diff mbox series

Patch

diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index b6b2a94eb54e..291d6bbe85f4 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -234,6 +234,7 @@  static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
  */
 static void tsk_advance_rx_queue(struct sock *sk)
 {
+	trace_tipc_sk_advance_rx(sk, NULL, TIPC_DUMP_SK_RCVQ, " ");
 	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 }
 
@@ -248,6 +249,7 @@  static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
 	if (!tipc_msg_reverse(onode, &skb, err))
 		return;
 
+	trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, "@sk_respond!");
 	dnode = msg_destnode(buf_msg(skb));
 	selector = msg_origport(buf_msg(skb));
 	tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
@@ -483,6 +485,7 @@  static int tipc_sk_create(struct net *net, struct socket *sock,
 			tsk_set_unreliable(tsk, true);
 	}
 
+	trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
 	return 0;
 }
 
@@ -572,6 +575,7 @@  static int tipc_release(struct socket *sock)
 	tsk = tipc_sk(sk);
 	lock_sock(sk);
 
+	trace_tipc_sk_release(sk, NULL, TIPC_DUMP_ALL, " ");
 	__tipc_shutdown(sock, TIPC_ERR_NO_PORT);
 	sk->sk_shutdown = SHUTDOWN_MASK;
 	tipc_sk_leave(tsk);
@@ -719,6 +723,7 @@  static __poll_t tipc_poll(struct file *file, struct socket *sock,
 	__poll_t revents = 0;
 
 	sock_poll_wait(file, sock, wait);
+	trace_tipc_sk_poll(sk, NULL, TIPC_DUMP_ALL, " ");
 
 	if (sk->sk_shutdown & RCV_SHUTDOWN)
 		revents |= EPOLLRDHUP | EPOLLIN | EPOLLRDNORM;
@@ -805,9 +810,12 @@  static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 	rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
 
 	/* Send message if build was successful */
-	if (unlikely(rc == dlen))
+	if (unlikely(rc == dlen)) {
+		trace_tipc_sk_sendmcast(sk, skb_peek(&pkts),
+					TIPC_DUMP_SK_SNDQ, " ");
 		rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
 				     &tsk->cong_link_cnt);
+	}
 
 	tipc_nlist_purge(&dsts);
 
@@ -1209,8 +1217,10 @@  static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
 	bool conn_cong;
 
 	/* Ignore if connection cannot be validated: */
-	if (!tsk_peer_msg(tsk, hdr))
+	if (!tsk_peer_msg(tsk, hdr)) {
+		trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, "@proto_rcv!");
 		goto exit;
+	}
 
 	if (unlikely(msg_errcode(hdr))) {
 		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
@@ -1378,6 +1388,7 @@  static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 	if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue)))
 		return -ENOMEM;
 
+	trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
 	rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
 	if (unlikely(rc == -ELINKCONG)) {
 		tipc_dest_push(clinks, dnode, 0);
@@ -1455,6 +1466,8 @@  static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 		if (unlikely(rc != send))
 			break;
 
+		trace_tipc_sk_sendstream(sk, skb_peek(&pkts),
+					 TIPC_DUMP_SK_SNDQ, " ");
 		rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
 		if (unlikely(rc == -ELINKCONG)) {
 			tsk->cong_link_cnt = 1;
@@ -2129,6 +2142,7 @@  static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
 	struct sk_buff_head inputq;
 	int limit, err = TIPC_OK;
 
+	trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, " ");
 	TIPC_SKB_CB(skb)->bytes_read = 0;
 	__skb_queue_head_init(&inputq);
 	__skb_queue_tail(&inputq, skb);
@@ -2148,17 +2162,25 @@  static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
 		    (!grp && msg_in_group(hdr)))
 			err = TIPC_ERR_NO_PORT;
 		else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) {
+			trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL,
+					   "err_overload2!");
 			atomic_inc(&sk->sk_drops);
 			err = TIPC_ERR_OVERLOAD;
 		}
 
 		if (unlikely(err)) {
-			tipc_skb_reject(net, err, skb, xmitq);
+			if (tipc_msg_reverse(tipc_own_addr(net), &skb, err)) {
+				trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE,
+						      "@filter_rcv!");
+				__skb_queue_tail(xmitq, skb);
+			}
 			err = TIPC_OK;
 			continue;
 		}
 		__skb_queue_tail(&sk->sk_receive_queue, skb);
 		skb_set_owner_r(skb, sk);
+		trace_tipc_sk_overlimit2(sk, skb, TIPC_DUMP_ALL,
+					 "rcvq >90% allocated!");
 		sk->sk_data_ready(sk);
 	}
 }
@@ -2224,14 +2246,21 @@  static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
 		if (!sk->sk_backlog.len)
 			atomic_set(dcnt, 0);
 		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
-		if (likely(!sk_add_backlog(sk, skb, lim)))
+		if (likely(!sk_add_backlog(sk, skb, lim))) {
+			trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL,
+						 "bklg & rcvq >90% allocated!");
 			continue;
+		}
 
+		trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!");
 		/* Overload => reject message back to sender */
 		onode = tipc_own_addr(sock_net(sk));
 		atomic_inc(&sk->sk_drops);
-		if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
+		if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD)) {
+			trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_ALL,
+					      "@sk_enqueue!");
 			__skb_queue_tail(xmitq, skb);
+		}
 		break;
 	}
 }
@@ -2280,6 +2309,8 @@  void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 		/* Prepare for message rejection */
 		if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
 			continue;
+
+		trace_tipc_sk_rej_msg(NULL, skb, TIPC_DUMP_NONE, "@sk_rcv!");
 xmit:
 		dnode = msg_destnode(buf_msg(skb));
 		tipc_node_xmit_skb(net, skb, dnode, dport);
@@ -2553,6 +2584,7 @@  static int tipc_shutdown(struct socket *sock, int how)
 
 	lock_sock(sk);
 
+	trace_tipc_sk_shutdown(sk, NULL, TIPC_DUMP_ALL, " ");
 	__tipc_shutdown(sock, TIPC_CONN_SHUTDOWN);
 	sk->sk_shutdown = SEND_SHUTDOWN;
 
@@ -3566,12 +3598,107 @@  int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
 	return skb->len;
 }
 
+/**
+ * tipc_sk_filtering - check if a socket should be traced
+ * @sk: the socket to be examined
+ * @sysctl_tipc_sk_filter[]: the socket tuple for filtering,
+ *  (portid, sock type, name type, name lower, name upper)
+ *
+ * Returns true if the socket meets the socket tuple data
+ * (value 0 = 'any') or when there is no tuple set (all = 0),
+ * otherwise false
+ */
+bool tipc_sk_filtering(struct sock *sk)
+{
+	struct tipc_sock *tsk;
+	struct publication *p;
+	u32 _port, _sktype, _type, _lower, _upper;
+	u32 type = 0, lower = 0, upper = 0;
+
+	if (!sk)
+		return true;
+
+	tsk = tipc_sk(sk);
+
+	_port = sysctl_tipc_sk_filter[0];
+	_sktype = sysctl_tipc_sk_filter[1];
+	_type = sysctl_tipc_sk_filter[2];
+	_lower = sysctl_tipc_sk_filter[3];
+	_upper = sysctl_tipc_sk_filter[4];
+
+	if (!_port && !_sktype && !_type && !_lower && !_upper)
+		return true;
+
+	if (_port)
+		return (_port == tsk->portid);
+
+	if (_sktype && _sktype != sk->sk_type)
+		return false;
+
+	if (tsk->published) {
+		p = list_first_entry_or_null(&tsk->publications,
+					     struct publication, binding_sock);
+		if (p) {
+			type = p->type;
+			lower = p->lower;
+			upper = p->upper;
+		}
+	}
+
+	if (!tipc_sk_type_connectionless(sk)) {
+		type = tsk->conn_type;
+		lower = tsk->conn_instance;
+		upper = tsk->conn_instance;
+	}
+
+	if ((_type && _type != type) || (_lower && _lower != lower) ||
+	    (_upper && _upper != upper))
+		return false;
+
+	return true;
+}
+
 u32 tipc_sock_get_portid(struct sock *sk)
 {
 	return (sk) ? (tipc_sk(sk))->portid : 0;
 }
 
 /**
+ * tipc_sk_overlimit1 - check if socket rx queue is about to be overloaded,
+ *			both the rcv and backlog queues are considered
+ * @sk: tipc sk to be checked
+ * @skb: tipc msg to be checked
+ *
+ * Returns true if the socket rx queue allocation is > 90%, otherwise false
+ */
+
+bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb)
+{
+	atomic_t *dcnt = &tipc_sk(sk)->dupl_rcvcnt;
+	unsigned int lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
+	unsigned int qsize = sk->sk_backlog.len + sk_rmem_alloc_get(sk);
+
+	return (qsize > lim * 90 / 100);
+}
+
+/**
+ * tipc_sk_overlimit2 - check if socket rx queue is about to be overloaded,
+ *			only the rcv queue is considered
+ * @sk: tipc sk to be checked
+ * @skb: tipc msg to be checked
+ *
+ * Returns true if the socket rx queue allocation is > 90%, otherwise false
+ */
+
+bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb)
+{
+	unsigned int lim = rcvbuf_limit(sk, skb);
+	unsigned int qsize = sk_rmem_alloc_get(sk);
+
+	return (qsize > lim * 90 / 100);
+}
+
+/**
  * tipc_sk_dump - dump TIPC socket
  * @sk: tipc sk to be dumped
  * @dqueues: bitmask to decide if any socket queue to be dumped?
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 07e36545b696..235b9679acee 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -72,5 +72,7 @@  int tipc_dump_start(struct netlink_callback *cb);
 int __tipc_dump_start(struct netlink_callback *cb, struct net *net);
 int tipc_dump_done(struct netlink_callback *cb);
 u32 tipc_sock_get_portid(struct sock *sk);
+bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb);
+bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb);
 
 #endif
diff --git a/net/tipc/sysctl.c b/net/tipc/sysctl.c
index 1a779b1e8510..3481e4906bd6 100644
--- a/net/tipc/sysctl.c
+++ b/net/tipc/sysctl.c
@@ -34,6 +34,7 @@ 
  */
 
 #include "core.h"
+#include "trace.h"
 
 #include <linux/sysctl.h>
 
@@ -54,6 +55,13 @@  static struct ctl_table tipc_table[] = {
 		.mode		= 0644,
 		.proc_handler	= proc_dointvec,
 	},
+	{
+		.procname       = "sk_filter",
+		.data           = &sysctl_tipc_sk_filter,
+		.maxlen         = sizeof(sysctl_tipc_sk_filter),
+		.mode           = 0644,
+		.proc_handler   = proc_doulongvec_minmax,
+	},
 	{}
 };
 
diff --git a/net/tipc/trace.c b/net/tipc/trace.c
index 846196f0e810..964823841efe 100644
--- a/net/tipc/trace.c
+++ b/net/tipc/trace.c
@@ -37,6 +37,12 @@ 
 #include "trace.h"
 
 /**
+ * socket tuples for filtering in socket traces:
+ * (portid, sock type, name type, name lower, name upper)
+ */
+unsigned long sysctl_tipc_sk_filter[5] __read_mostly = {0, };
+
+/**
  * tipc_skb_dump - dump TIPC skb data
  * @skb: skb to be dumped
  * @more: dump more?
diff --git a/net/tipc/trace.h b/net/tipc/trace.h
index 535c8958651f..ebbfcd14627e 100644
--- a/net/tipc/trace.h
+++ b/net/tipc/trace.h
@@ -113,11 +113,14 @@  enum {
 			{(0xcbe),	"SYNCH_BEGIN_EVT"		},\
 			{(0xcee),	"SYNCH_END_EVT"			})
 
+extern unsigned long sysctl_tipc_sk_filter[5] __read_mostly;
+
 int tipc_skb_dump(struct sk_buff *skb, bool more, char *buf);
 int tipc_list_dump(struct sk_buff_head *list, bool more, char *buf);
 int tipc_sk_dump(struct sock *sk, u16 dqueues, char *buf);
 int tipc_link_dump(struct tipc_link *l, u16 dqueues, char *buf);
 int tipc_node_dump(struct tipc_node *n, bool more, char *buf);
+bool tipc_sk_filtering(struct sock *sk);
 
 DECLARE_EVENT_CLASS(tipc_skb_class,
 
@@ -199,12 +202,33 @@  DECLARE_EVENT_CLASS(tipc_sk_class,
 		  __get_str(skb_buf), __get_str(buf))
 );
 
-#define DEFINE_SK_EVENT(name) \
-DEFINE_EVENT(tipc_sk_class, name, \
+#define DEFINE_SK_EVENT_FILTER(name) \
+DEFINE_EVENT_CONDITION(tipc_sk_class, name, \
+	TP_PROTO(struct sock *sk, struct sk_buff *skb, u16 dqueues, \
+		 const char *header), \
+	TP_ARGS(sk, skb, dqueues, header), \
+	TP_CONDITION(tipc_sk_filtering(sk)))
+DEFINE_SK_EVENT_FILTER(tipc_sk_dump);
+DEFINE_SK_EVENT_FILTER(tipc_sk_create);
+DEFINE_SK_EVENT_FILTER(tipc_sk_sendmcast);
+DEFINE_SK_EVENT_FILTER(tipc_sk_sendmsg);
+DEFINE_SK_EVENT_FILTER(tipc_sk_sendstream);
+DEFINE_SK_EVENT_FILTER(tipc_sk_poll);
+DEFINE_SK_EVENT_FILTER(tipc_sk_filter_rcv);
+DEFINE_SK_EVENT_FILTER(tipc_sk_advance_rx);
+DEFINE_SK_EVENT_FILTER(tipc_sk_rej_msg);
+DEFINE_SK_EVENT_FILTER(tipc_sk_drop_msg);
+DEFINE_SK_EVENT_FILTER(tipc_sk_release);
+DEFINE_SK_EVENT_FILTER(tipc_sk_shutdown);
+
+#define DEFINE_SK_EVENT_FILTER_COND(name, cond) \
+DEFINE_EVENT_CONDITION(tipc_sk_class, name, \
 	TP_PROTO(struct sock *sk, struct sk_buff *skb, u16 dqueues, \
 		 const char *header), \
-	TP_ARGS(sk, skb, dqueues, header))
-DEFINE_SK_EVENT(tipc_sk_dump);
+	TP_ARGS(sk, skb, dqueues, header), \
+	TP_CONDITION(tipc_sk_filtering(sk) && (cond)))
+DEFINE_SK_EVENT_FILTER_COND(tipc_sk_overlimit1, tipc_sk_overlimit1(sk, skb));
+DEFINE_SK_EVENT_FILTER_COND(tipc_sk_overlimit2, tipc_sk_overlimit2(sk, skb));
 
 DECLARE_EVENT_CLASS(tipc_link_class,