From patchwork Thu Feb 15 09:40:42 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Jon Maloy X-Patchwork-Id: 873716 X-Patchwork-Delegate: davem@davemloft.net Return-Path: X-Original-To: patchwork-incoming@ozlabs.org Delivered-To: patchwork-incoming@ozlabs.org Authentication-Results: ozlabs.org; spf=none (mailfrom) smtp.mailfrom=vger.kernel.org (client-ip=209.132.180.67; helo=vger.kernel.org; envelope-from=netdev-owner@vger.kernel.org; receiver=) Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (1024-bit key; unprotected) header.d=ericsson.com header.i=@ericsson.com header.b="g+/TIED9"; dkim-atps=neutral Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by ozlabs.org (Postfix) with ESMTP id 3zhrrf2wGcz9t2f for ; Thu, 15 Feb 2018 20:41:38 +1100 (AEDT) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1755223AbeBOJlf (ORCPT ); Thu, 15 Feb 2018 04:41:35 -0500 Received: from sessmg23.ericsson.net ([193.180.251.45]:47314 "EHLO sessmg23.ericsson.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1755159AbeBOJld (ORCPT ); Thu, 15 Feb 2018 04:41:33 -0500 DKIM-Signature: v=1; a=rsa-sha256; d=ericsson.com; s=mailgw201801; c=relaxed/simple; q=dns/txt; i=@ericsson.com; t=1518687691; h=From:Sender:Reply-To:Subject:Date:Message-ID:To:CC:MIME-Version:Content-Type: Content-Transfer-Encoding:Content-ID:Content-Description:Resent-Date:Resent-From: Resent-Sender:Resent-To:Resent-Cc:Resent-Message-ID:In-Reply-To:References:List-Id: List-Help:List-Unsubscribe:List-Subscribe:List-Post:List-Owner:List-Archive; bh=OirXz97IGS0VaZtqC1aIMeBpB3xR4qyh/tJAXy1tIa8=; b=g+/TIED909unyK6sECwVBse62Bhl2kuLScU5tO4E5+zDoAjSrqbaCnEK2PQO6Ph/ EEdGaNa6HSYRF/AzX2JuzXVB4YOS7aiWLJO8hb96H+iI/8seKaKIicA2LQ3NClDD 3J8/Jn9ZxpkdBSAkPQeC/7YhFcpsAZhbNncmcqh//CM=; X-AuditID: c1b4fb2d-4b1ff70000005540-02-5a8555cb54b1 Received: from ESESSHC011.ericsson.se (Unknown_Domain [153.88.183.51]) by sessmg23.ericsson.net (Symantec Mail Security) with SMTP id 3D.E8.21824.BC5558A5; Thu, 15 Feb 2018 10:41:31 +0100 (CET) Received: from tipsy.lab.linux.ericsson.se (10.35.28.120) by ESESSHC011.ericsson.se (153.88.183.51) with Microsoft SMTP Server (TLS) id 14.3.352.0; Thu, 15 Feb 2018 10:41:30 +0100 From: Jon Maloy To: , CC: , , , , , , Subject: [net-next 01/10] tipc: remove redundant code in topology server Date: Thu, 15 Feb 2018 10:40:42 +0100 Message-ID: <1518687651-26561-2-git-send-email-jon.maloy@ericsson.com> X-Mailer: git-send-email 2.1.4 In-Reply-To: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> References: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> MIME-Version: 1.0 X-Originating-IP: [10.35.28.120] X-Brightmail-Tracker: H4sIAAAAAAAAA+NgFjrHLMWRmVeSWpSXmKPExsUyM2K7se7p0NYog9+fZSxuNPQwW8w538Ji 8fbVLHaLYwvELLacz7K40n6W3eLx9evMDuweW1beZPJ4d4XNY/eCz0wenzfJeazfspUpgDWK yyYlNSezLLVI3y6BK2Pl4z2MBeu1K97f+cXawPhFqYuRk0NCwETiecdz9i5GLg4hgcOMEvv/ TmWDcLYzSkz/epwRpIpNQEPi5bQOMFtEwFji1cpOJpAiZoFPjBItK5+xgSSEBTwk1r3+yQ5i swioStze+JYZxOYVcJPYs2M9I8Q6OYnzx3+CxTkF3CVWtfxjBbGFgGpeL3nDBFEvKHFy5hMW EJtZQELi4IsXzBA1yhJzP0xjgpijIPFtZjfTBEaBWUhaZiFpWcDItIpRtDi1uDg33chYL7Uo M7m4OD9PLy+1ZBMjMJQPbvmtu4Nx9WvHQ4wCHIxKPLycIa1RQqyJZcWVuYcYJTiYlUR4b9sB hXhTEiurUovy44tKc1KLDzFKc7AoifOe9OSNEhJITyxJzU5NLUgtgskycXBKNTBadlmK/Of4 f27pHLd664krz7Oc/LRRXDVtbhrT/nuzSpRYJbZ4R4e69X83Llz2cubyyR+Od05+c+bz3ns5 61SaEhq3Ga4JZj/jljCHuSJMZvu6WsHMaa1e5tLzJt08X7Uw5uf9jHwmTyHW9aVfZ8Qz93BU nA3jsF6cJDyXY8V52/MN/6caZfAqsRRnJBpqMRcVJwIAj3yPDGECAAA= Sender: netdev-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: netdev@vger.kernel.org The socket handling in the topology server is unnecessarily generic. It is prepared to handle both SOCK_RDM, SOCK_DGRAM and SOCK_STREAM type sockets, as well as the only socket type which is really used, SOCK_SEQPACKET. We now remove this redundant code to make the code more readable. Acked-by: Ying Xue Signed-off-by: Jon Maloy --- net/tipc/server.c | 36 +++++++----------------------------- net/tipc/server.h | 4 +--- net/tipc/subscr.c | 4 +--- 3 files changed, 9 insertions(+), 35 deletions(-) diff --git a/net/tipc/server.c b/net/tipc/server.c index df0c563..04a6dd9 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -82,7 +82,6 @@ struct tipc_conn { struct outqueue_entry { struct list_head list; struct kvec iov; - struct sockaddr_tipc dest; }; static void tipc_recv_work(struct work_struct *work); @@ -93,7 +92,6 @@ static void tipc_conn_kref_release(struct kref *kref) { struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); struct tipc_server *s = con->server; - struct sockaddr_tipc *saddr = s->saddr; struct socket *sock = con->sock; struct sock *sk; @@ -103,8 +101,6 @@ static void tipc_conn_kref_release(struct kref *kref) __module_get(sock->ops->owner); __module_get(sk->sk_prot_creator->owner); } - saddr->scope = -TIPC_NODE_SCOPE; - kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr)); sock_release(sock); con->sock = NULL; } @@ -325,36 +321,24 @@ static struct socket *tipc_create_listen_sock(struct tipc_conn *con) { struct tipc_server *s = con->server; struct socket *sock = NULL; + int imp = TIPC_CRITICAL_IMPORTANCE; int ret; ret = sock_create_kern(s->net, AF_TIPC, SOCK_SEQPACKET, 0, &sock); if (ret < 0) return NULL; ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE, - (char *)&s->imp, sizeof(s->imp)); + (char *)&imp, sizeof(imp)); if (ret < 0) goto create_err; ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr)); if (ret < 0) goto create_err; - switch (s->type) { - case SOCK_STREAM: - case SOCK_SEQPACKET: - con->rx_action = tipc_accept_from_sock; - - ret = kernel_listen(sock, 0); - if (ret < 0) - goto create_err; - break; - case SOCK_DGRAM: - case SOCK_RDM: - con->rx_action = tipc_receive_from_sock; - break; - default: - pr_err("Unknown socket type %d\n", s->type); + con->rx_action = tipc_accept_from_sock; + ret = kernel_listen(sock, 0); + if (ret < 0) goto create_err; - } /* As server's listening socket owner and creator is the same module, * we have to decrease TIPC module reference count to guarantee that @@ -444,7 +428,7 @@ static void tipc_clean_outqueues(struct tipc_conn *con) } int tipc_conn_sendmsg(struct tipc_server *s, int conid, - struct sockaddr_tipc *addr, void *data, size_t len) + void *data, size_t len) { struct outqueue_entry *e; struct tipc_conn *con; @@ -464,9 +448,6 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid, return -ENOMEM; } - if (addr) - memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc)); - spin_lock_bh(&con->outqueue_lock); list_add_tail(&e->list, &con->outqueue); spin_unlock_bh(&con->outqueue_lock); @@ -575,10 +556,6 @@ static void tipc_send_to_sock(struct tipc_conn *con) if (con->sock) { memset(&msg, 0, sizeof(msg)); msg.msg_flags = MSG_DONTWAIT; - if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { - msg.msg_name = &e->dest; - msg.msg_namelen = sizeof(struct sockaddr_tipc); - } ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, e->iov.iov_len); if (ret == -EWOULDBLOCK || ret == 0) { @@ -591,6 +568,7 @@ static void tipc_send_to_sock(struct tipc_conn *con) evt = e->iov.iov_base; tipc_send_kern_top_evt(s->net, evt); } + /* Don't starve users filling buffers */ if (++count >= MAX_SEND_MSG_COUNT) { cond_resched(); diff --git a/net/tipc/server.h b/net/tipc/server.h index 64df751..434736d 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -79,12 +79,10 @@ struct tipc_server { void *buf, size_t len); struct sockaddr_tipc *saddr; char name[TIPC_SERVER_NAME_LEN]; - int imp; - int type; }; int tipc_conn_sendmsg(struct tipc_server *s, int conid, - struct sockaddr_tipc *addr, void *data, size_t len); + void *data, size_t len); bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, u32 upper, u32 filter, int *conid); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 68e2647..eaef826 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -81,7 +81,7 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub, sub->evt.found_upper = htohl(found_upper, sub->swap); sub->evt.port.ref = htohl(port_ref, sub->swap); sub->evt.port.node = htohl(node, sub->swap); - tipc_conn_sendmsg(tn->topsrv, subscriber->conid, NULL, + tipc_conn_sendmsg(tn->topsrv, subscriber->conid, msg_sect.iov_base, msg_sect.iov_len); } @@ -375,8 +375,6 @@ int tipc_topsrv_start(struct net *net) } topsrv->net = net; topsrv->saddr = saddr; - topsrv->imp = TIPC_CRITICAL_IMPORTANCE; - topsrv->type = SOCK_SEQPACKET; topsrv->max_rcvbuf_size = sizeof(struct tipc_subscr); topsrv->tipc_conn_recvmsg = tipc_subscrb_rcv_cb; topsrv->tipc_conn_new = tipc_subscrb_connect_cb; From patchwork Thu Feb 15 09:40:43 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Jon Maloy X-Patchwork-Id: 873717 X-Patchwork-Delegate: davem@davemloft.net Return-Path: X-Original-To: patchwork-incoming@ozlabs.org Delivered-To: patchwork-incoming@ozlabs.org Authentication-Results: ozlabs.org; spf=none (mailfrom) smtp.mailfrom=vger.kernel.org (client-ip=209.132.180.67; helo=vger.kernel.org; envelope-from=netdev-owner@vger.kernel.org; receiver=) Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (1024-bit key; unprotected) header.d=ericsson.com header.i=@ericsson.com header.b="LzD9m3w4"; dkim-atps=neutral Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by ozlabs.org (Postfix) with ESMTP id 3zhrsB2D3sz9t2f for ; Thu, 15 Feb 2018 20:42:06 +1100 (AEDT) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1755270AbeBOJmD (ORCPT ); Thu, 15 Feb 2018 04:42:03 -0500 Received: from sessmg22.ericsson.net ([193.180.251.58]:56065 "EHLO sessmg22.ericsson.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1755174AbeBOJmB (ORCPT ); Thu, 15 Feb 2018 04:42:01 -0500 DKIM-Signature: v=1; a=rsa-sha256; d=ericsson.com; s=mailgw201801; c=relaxed/simple; q=dns/txt; i=@ericsson.com; t=1518687719; h=From:Sender:Reply-To:Subject:Date:Message-ID:To:CC:MIME-Version:Content-Type: Content-Transfer-Encoding:Content-ID:Content-Description:Resent-Date:Resent-From: Resent-Sender:Resent-To:Resent-Cc:Resent-Message-ID:In-Reply-To:References:List-Id: List-Help:List-Unsubscribe:List-Subscribe:List-Post:List-Owner:List-Archive; bh=MwnCzYprRGQEWthcQl5pQ/T/CZnG7+919o9cMkkr7sc=; b=LzD9m3w43WmAwFbcEsPMOAZEY8Q2S4/ygjIfNV4YQ/vHi3YdsFW5EpBQ3c87ThB6 WbjXmUpM5C5e4X0l+p/nNqs9bVLcuT92uhbzGgF/FFBb5FXPHxLILHuOKqwuHBqI X6mU+gTgzUBGQwx3Od3QGHLO/IyF0CrIbbOA97GnXbs=; X-AuditID: c1b4fb3a-35fff700000067b4-25-5a8555e7ce43 Received: from ESESSHC011.ericsson.se (Unknown_Domain [153.88.183.51]) by sessmg22.ericsson.net (Symantec Mail Security) with SMTP id 71.5F.26548.7E5558A5; Thu, 15 Feb 2018 10:41:59 +0100 (CET) Received: from tipsy.lab.linux.ericsson.se (10.35.28.120) by ESESSHC011.ericsson.se (153.88.183.51) with Microsoft SMTP Server (TLS) id 14.3.352.0; Thu, 15 Feb 2018 10:41:58 +0100 From: Jon Maloy To: , CC: , , , , , , Subject: [net-next 02/10] tipc: remove unnecessary function pointers Date: Thu, 15 Feb 2018 10:40:43 +0100 Message-ID: <1518687651-26561-3-git-send-email-jon.maloy@ericsson.com> X-Mailer: git-send-email 2.1.4 In-Reply-To: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> References: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> MIME-Version: 1.0 X-Originating-IP: [10.35.28.120] X-Brightmail-Tracker: H4sIAAAAAAAAA+NgFjrHLMWRmVeSWpSXmKPExsUyM2K7se7z0NYogz0fzSxuNPQwW8w538Ji 8fbVLHaLYwvELLacz7K40n6W3eLx9evMDuweW1beZPJ4d4XNY/eCz0wenzfJeazfspUpgDWK yyYlNSezLLVI3y6BK+Ph7fPMBZesK/qWtrI2MG4x7GLk5JAQMJFYvriVsYuRi0NI4DCjRPez JlYIZzujxN/j29lBqtgENCReTutgBLFFBIwlXq3sZAIpYhb4xCjRsvIZG0hCWMBFov/HMxYQ m0VAVeLWxJ1gcV4BN4mrbftZIdbJSZw//pMZxOYUcJdY1fIPLC4EVPN6yRsmiHpBiZMzn4DN YRaQkDj44gUzRI2yxNwP05gg5ihIfJvZzTSBUWAWkpZZSFoWMDKtYhQtTi0uzk03MtJLLcpM Li7Oz9PLSy3ZxAgM5YNbflvtYDz43PEQowAHoxIP79uQ1igh1sSy4srcQ4wSHMxKIry37YBC vCmJlVWpRfnxRaU5qcWHGKU5WJTEeZ3SLKKEBNITS1KzU1MLUotgskwcnFINjLWN7nf8WN+/ MxMRUJv/Smxh8+XH238um75ea3u58yGbB1218YF7tJJOr9568XP7hoLuNQ8nLH67Vz7yr7P1 DTuRnUv2uu4TnRX27GWl2AGlROWtfzf6nBd/2SrEqJ2xWSN5OcfOtwtPGwiITbnAKWI643mw S5DZlCbmCL3Pn245nRGaGpx04JASS3FGoqEWc1FxIgBxVbOIYQIAAA== Sender: netdev-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: netdev@vger.kernel.org Interaction between the functionality in server.c and subscr.c is done via function pointers installed in struct server. This makes the code harder to follow, and doesn't serve any obvious purpose. Here, we replace the function pointers with direct function calls. Acked-by: Ying Xue Signed-off-by: Jon Maloy --- net/tipc/server.c | 21 ++++++++++----------- net/tipc/server.h | 5 ----- net/tipc/subscr.c | 27 ++++++--------------------- net/tipc/subscr.h | 4 ++++ 4 files changed, 20 insertions(+), 37 deletions(-) diff --git a/net/tipc/server.c b/net/tipc/server.c index 04a6dd9..8aa2a33 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -33,6 +33,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include "subscr.h" #include "server.h" #include "core.h" #include "socket.h" @@ -182,7 +183,6 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) static void tipc_close_conn(struct tipc_conn *con) { - struct tipc_server *s = con->server; struct sock *sk = con->sock->sk; bool disconnect = false; @@ -191,7 +191,7 @@ static void tipc_close_conn(struct tipc_conn *con) if (disconnect) { sk->sk_user_data = NULL; if (con->conid) - s->tipc_conn_release(con->conid, con->usr_data); + tipc_subscrb_delete(con->usr_data); } write_unlock_bh(&sk->sk_callback_lock); @@ -240,7 +240,6 @@ static int tipc_receive_from_sock(struct tipc_conn *con) { struct tipc_server *s = con->server; struct sock *sk = con->sock->sk; - struct sockaddr_tipc addr; struct msghdr msg = {}; struct kvec iov; void *buf; @@ -254,7 +253,7 @@ static int tipc_receive_from_sock(struct tipc_conn *con) iov.iov_base = buf; iov.iov_len = s->max_rcvbuf_size; - msg.msg_name = &addr; + msg.msg_name = NULL; iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len); ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT); if (ret <= 0) { @@ -264,8 +263,8 @@ static int tipc_receive_from_sock(struct tipc_conn *con) read_lock_bh(&sk->sk_callback_lock); if (test_bit(CF_CONNECTED, &con->flags)) - ret = s->tipc_conn_recvmsg(sock_net(con->sock->sk), con->conid, - &addr, con->usr_data, buf, ret); + ret = tipc_subscrb_rcv(sock_net(con->sock->sk), con->conid, + con->usr_data, buf, ret); read_unlock_bh(&sk->sk_callback_lock); kmem_cache_free(s->rcvbuf_cache, buf); if (ret < 0) @@ -284,7 +283,6 @@ static int tipc_receive_from_sock(struct tipc_conn *con) static int tipc_accept_from_sock(struct tipc_conn *con) { - struct tipc_server *s = con->server; struct socket *sock = con->sock; struct socket *newsock; struct tipc_conn *newcon; @@ -305,7 +303,8 @@ static int tipc_accept_from_sock(struct tipc_conn *con) tipc_register_callbacks(newsock, newcon); /* Notify that new connection is incoming */ - newcon->usr_data = s->tipc_conn_new(newcon->conid); + newcon->usr_data = tipc_subscrb_create(newcon->conid); + if (!newcon->usr_data) { sock_release(newsock); conn_put(newcon); @@ -489,7 +488,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, *conid = con->conid; s = con->server; - scbr = s->tipc_conn_new(*conid); + scbr = tipc_subscrb_create(*conid); if (!scbr) { conn_put(con); return false; @@ -497,7 +496,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, con->usr_data = scbr; con->sock = NULL; - s->tipc_conn_recvmsg(net, *conid, NULL, scbr, &sub, sizeof(sub)); + tipc_subscrb_rcv(net, *conid, scbr, &sub, sizeof(sub)); return true; } @@ -513,7 +512,7 @@ void tipc_topsrv_kern_unsubscr(struct net *net, int conid) test_and_clear_bit(CF_CONNECTED, &con->flags); srv = con->server; if (con->conid) - srv->tipc_conn_release(con->conid, con->usr_data); + tipc_subscrb_delete(con->usr_data); conn_put(con); conn_put(con); } diff --git a/net/tipc/server.h b/net/tipc/server.h index 434736d..b4b83bd 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -72,11 +72,6 @@ struct tipc_server { struct workqueue_struct *rcv_wq; struct workqueue_struct *send_wq; int max_rcvbuf_size; - void *(*tipc_conn_new)(int conid); - void (*tipc_conn_release)(int conid, void *usr_data); - int (*tipc_conn_recvmsg)(struct net *net, int conid, - struct sockaddr_tipc *addr, void *usr_data, - void *buf, size_t len); struct sockaddr_tipc *saddr; char name[TIPC_SERVER_NAME_LEN]; }; diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index eaef826..b86fbbf 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -220,7 +220,7 @@ static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber, spin_unlock_bh(&subscriber->lock); } -static struct tipc_subscriber *tipc_subscrb_create(int conid) +struct tipc_subscriber *tipc_subscrb_create(int conid) { struct tipc_subscriber *subscriber; @@ -237,7 +237,7 @@ static struct tipc_subscriber *tipc_subscrb_create(int conid) return subscriber; } -static void tipc_subscrb_delete(struct tipc_subscriber *subscriber) +void tipc_subscrb_delete(struct tipc_subscriber *subscriber) { tipc_subscrb_subscrp_delete(subscriber, NULL); tipc_subscrb_put(subscriber); @@ -315,16 +315,10 @@ static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, return 0; } -/* Handle one termination request for the subscriber */ -static void tipc_subscrb_release_cb(int conid, void *usr_data) -{ - tipc_subscrb_delete((struct tipc_subscriber *)usr_data); -} - -/* Handle one request to create a new subscription for the subscriber */ -static int tipc_subscrb_rcv_cb(struct net *net, int conid, - struct sockaddr_tipc *addr, void *usr_data, - void *buf, size_t len) +/* Handle one request to create a new subscription for the subscriber + */ +int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data, + void *buf, size_t len) { struct tipc_subscriber *subscriber = usr_data; struct tipc_subscr *s = (struct tipc_subscr *)buf; @@ -345,12 +339,6 @@ static int tipc_subscrb_rcv_cb(struct net *net, int conid, return tipc_subscrp_subscribe(net, s, subscriber, swap, status); } -/* Handle one request to establish a new subscriber */ -static void *tipc_subscrb_connect_cb(int conid) -{ - return (void *)tipc_subscrb_create(conid); -} - int tipc_topsrv_start(struct net *net) { struct tipc_net *tn = net_generic(net, tipc_net_id); @@ -376,9 +364,6 @@ int tipc_topsrv_start(struct net *net) topsrv->net = net; topsrv->saddr = saddr; topsrv->max_rcvbuf_size = sizeof(struct tipc_subscr); - topsrv->tipc_conn_recvmsg = tipc_subscrb_rcv_cb; - topsrv->tipc_conn_new = tipc_subscrb_connect_cb; - topsrv->tipc_conn_release = tipc_subscrb_release_cb; strncpy(topsrv->name, name, strlen(name) + 1); tn->topsrv = topsrv; diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index f3edca7..a736f29 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -67,6 +67,10 @@ struct tipc_subscription { struct tipc_event evt; }; +struct tipc_subscriber *tipc_subscrb_create(int conid); +void tipc_subscrb_delete(struct tipc_subscriber *subscriber); +int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data, + void *buf, size_t len); int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, u32 found_upper); void tipc_subscrp_report_overlap(struct tipc_subscription *sub, From patchwork Thu Feb 15 09:40:44 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Jon Maloy X-Patchwork-Id: 873718 X-Patchwork-Delegate: davem@davemloft.net Return-Path: X-Original-To: patchwork-incoming@ozlabs.org Delivered-To: patchwork-incoming@ozlabs.org Authentication-Results: ozlabs.org; spf=none (mailfrom) smtp.mailfrom=vger.kernel.org (client-ip=209.132.180.67; helo=vger.kernel.org; envelope-from=netdev-owner@vger.kernel.org; receiver=) Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (1024-bit key; unprotected) header.d=ericsson.com header.i=@ericsson.com header.b="Bf4lkHVP"; dkim-atps=neutral Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by ozlabs.org (Postfix) with ESMTP id 3zhrsp5Chdz9t2f for ; Thu, 15 Feb 2018 20:42:38 +1100 (AEDT) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1755253AbeBOJmf (ORCPT ); Thu, 15 Feb 2018 04:42:35 -0500 Received: from sessmg23.ericsson.net ([193.180.251.45]:53856 "EHLO sessmg23.ericsson.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1755165AbeBOJmb (ORCPT ); Thu, 15 Feb 2018 04:42:31 -0500 DKIM-Signature: v=1; a=rsa-sha256; d=ericsson.com; s=mailgw201801; c=relaxed/simple; q=dns/txt; i=@ericsson.com; t=1518687750; h=From:Sender:Reply-To:Subject:Date:Message-ID:To:CC:MIME-Version:Content-Type: Content-Transfer-Encoding:Content-ID:Content-Description:Resent-Date:Resent-From: Resent-Sender:Resent-To:Resent-Cc:Resent-Message-ID:In-Reply-To:References:List-Id: List-Help:List-Unsubscribe:List-Subscribe:List-Post:List-Owner:List-Archive; bh=e6qzJNAFh0uGGY6GGTXGPz0b8nzSG1E0NVExXCkFOuI=; b=Bf4lkHVP1wYh3NBV38NOPq34cYbhENDLT+kp5Dn7DnYN+VpdTBHER4lhfC8dtfHO dOZayFN7YlZQPN0DX4Bb4Wo18IJykcwlj0XkJiRN0fZVXpIolIrw08Y+LCBGV6qa i6otmm2qoCNuiPnDffbHCehrrAZGKgAF8JhGi7z1rgo=; X-AuditID: c1b4fb2d-499ff70000005540-20-5a8556066ca9 Received: from ESESSHC011.ericsson.se (Unknown_Domain [153.88.183.51]) by sessmg23.ericsson.net (Symantec Mail Security) with SMTP id 26.29.21824.606558A5; Thu, 15 Feb 2018 10:42:30 +0100 (CET) Received: from tipsy.lab.linux.ericsson.se (10.35.28.120) by ESESSHC011.ericsson.se (153.88.183.51) with Microsoft SMTP Server (TLS) id 14.3.352.0; Thu, 15 Feb 2018 10:42:29 +0100 From: Jon Maloy To: , CC: , , , , , , Subject: [net-next 03/10] tipc: eliminate struct tipc_subscriber Date: Thu, 15 Feb 2018 10:40:44 +0100 Message-ID: <1518687651-26561-4-git-send-email-jon.maloy@ericsson.com> X-Mailer: git-send-email 2.1.4 In-Reply-To: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> References: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> MIME-Version: 1.0 X-Originating-IP: [10.35.28.120] X-Brightmail-Tracker: H4sIAAAAAAAAA+NgFrrALMWRmVeSWpSXmKPExsUyM2K7sS5bWGuUwf55whY3GnqYLeacb2Gx ePtqFrvFsQViFlvOZ1lcaT/LbvH4+nVmB3aPLStvMnm8u8LmsXvBZyaPz5vkPNZv2coUwBrF ZZOSmpNZllqkb5fAldHU0stccGsPY8WHZ63MDYxfZzJ2MXJySAiYSFz528HWxcjFISRwmFFi 85RHjBDOdkaJy89vglWxCWhIvJzWAWaLCBhLvFrZyQRSxCzwiVGiZeUzNpCEsICDxLVlD8GK WARUJTY9/QMW5xVwk3h05z07xDo5ifPHfzKD2JwC7hKrWv6xgthCQDWvl7xhgqgXlDg58wkL iM0sICFx8MULZogaZYm5H6YxQcxRkPg2s5tpAqPALCQts5C0LGBkWsUoWpxaXJybbmSsl1qU mVxcnJ+nl5dasokRGM4Ht/zW3cG4+rXjIUYBDkYlHl7OkNYoIdbEsuLK3EOMEhzMSiK8t+2A QrwpiZVVqUX58UWlOanFhxilOViUxHlPevJGCQmkJ5akZqemFqQWwWSZODilGhhN1M6rZbr8 cfLKvr5mtuquFS6TZmixXpw414/fTW1jGnddWMHFlAwhUwUmx/MBBy4Y+iwPt82Q2cFuxNyy OO5kk88RVQ21NMNfuh612pmyfRZbzMV0rq2bvpXnjsePatuP2Svm7qkK7Lv2oZfV7cqkqxx7 bn79acga3mrrdyv9zsZTfk2K1UosxRmJhlrMRcWJAPy+bLRjAgAA Sender: netdev-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: netdev@vger.kernel.org It is unnecessary to keep two structures, struct tipc_conn and struct tipc_subscriber, with a one-to-one relationship and still with different life cycles. The fact that the two often run in different contexts, and still may access each other via direct pointers constitutes an additional hazard, something we have experienced at several occasions, and still see happening. We have identified at least two remaining problems that are easier to fix if we simplify the topology server data structure somewhat. - When there is a race between a subscription up/down event and a timeout event, it is fully possible that the former might be delivered after the latter, leading to confusion for the receiver. - The function tipc_subcrp_timeout() is executing in interrupt context, while the following call chain is at least theoretically possible: tipc_subscrp_timeout() tipc_subscrp_send_event() tipc_conn_sendmsg() conn_put() tipc_conn_kref_release() sock_release(sock) I.e., we end up calling a function that might try to sleep in interrupt context. To eliminate this, we need to ensure that the tipc_conn structure and the socket, as well as the subscription instances, only are deleted in work queue context, i.e., after the timeout event really has been sent out. We now remove this unnecessary complexity, by merging data and functionality of the subscriber structure into struct tipc_conn and the associated file server.c. We thereafter add a spinlock and a new 'inactive' state to the subscription structure. Using those, both problems described above can be easily solved. Acked-by: Ying Xue Signed-off-by: Jon Maloy --- net/tipc/server.c | 161 ++++++++++++++++++++++++++++++++------------------ net/tipc/server.h | 2 +- net/tipc/subscr.c | 173 ++++++++++-------------------------------------------- net/tipc/subscr.h | 17 +++--- 4 files changed, 146 insertions(+), 207 deletions(-) diff --git a/net/tipc/server.c b/net/tipc/server.c index 8aa2a33..b8268c0 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -2,6 +2,7 @@ * net/tipc/server.c: TIPC server infrastructure * * Copyright (c) 2012-2013, Wind River Systems + * Copyright (c) 2017, Ericsson AB * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -57,12 +58,13 @@ * @sock: socket handler associated with connection * @flags: indicates connection state * @server: pointer to connected server + * @sub_list: lsit to all pertaing subscriptions + * @sub_lock: lock protecting the subscription list + * @outqueue_lock: control access to the outqueue * @rwork: receive work item - * @usr_data: user-specified field * @rx_action: what to do when connection socket is active * @outqueue: pointer to first outbound message in queue * @outqueue_lock: control access to the outqueue - * @outqueue: list of connection objects for its server * @swork: send work item */ struct tipc_conn { @@ -71,9 +73,10 @@ struct tipc_conn { struct socket *sock; unsigned long flags; struct tipc_server *server; + struct list_head sub_list; + spinlock_t sub_lock; /* for subscription list */ struct work_struct rwork; int (*rx_action) (struct tipc_conn *con); - void *usr_data; struct list_head outqueue; spinlock_t outqueue_lock; struct work_struct swork; @@ -81,6 +84,7 @@ struct tipc_conn { /* An entry waiting to be sent */ struct outqueue_entry { + u32 evt; struct list_head list; struct kvec iov; }; @@ -89,18 +93,33 @@ static void tipc_recv_work(struct work_struct *work); static void tipc_send_work(struct work_struct *work); static void tipc_clean_outqueues(struct tipc_conn *con); +static bool connected(struct tipc_conn *con) +{ + return con && test_bit(CF_CONNECTED, &con->flags); +} + +/** + * htohl - convert value to endianness used by destination + * @in: value to convert + * @swap: non-zero if endianness must be reversed + * + * Returns converted value + */ +static u32 htohl(u32 in, int swap) +{ + return swap ? swab32(in) : in; +} + static void tipc_conn_kref_release(struct kref *kref) { struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); struct tipc_server *s = con->server; struct socket *sock = con->sock; - struct sock *sk; if (sock) { - sk = sock->sk; if (test_bit(CF_SERVER, &con->flags)) { __module_get(sock->ops->owner); - __module_get(sk->sk_prot_creator->owner); + __module_get(sock->sk->sk_prot_creator->owner); } sock_release(sock); con->sock = NULL; @@ -129,11 +148,8 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) spin_lock_bh(&s->idr_lock); con = idr_find(&s->conn_idr, conid); - if (con) { - if (!test_bit(CF_CONNECTED, &con->flags) || - !kref_get_unless_zero(&con->kref)) - con = NULL; - } + if (!connected(con) || !kref_get_unless_zero(&con->kref)) + con = NULL; spin_unlock_bh(&s->idr_lock); return con; } @@ -144,7 +160,7 @@ static void sock_data_ready(struct sock *sk) read_lock_bh(&sk->sk_callback_lock); con = sock2con(sk); - if (con && test_bit(CF_CONNECTED, &con->flags)) { + if (connected(con)) { conn_get(con); if (!queue_work(con->server->rcv_wq, &con->rwork)) conn_put(con); @@ -158,7 +174,7 @@ static void sock_write_space(struct sock *sk) read_lock_bh(&sk->sk_callback_lock); con = sock2con(sk); - if (con && test_bit(CF_CONNECTED, &con->flags)) { + if (connected(con)) { conn_get(con); if (!queue_work(con->server->send_wq, &con->swork)) conn_put(con); @@ -181,6 +197,24 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) write_unlock_bh(&sk->sk_callback_lock); } +/* tipc_con_delete_sub - delete a specific or all subscriptions + * for a given subscriber + */ +static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) +{ + struct list_head *sub_list = &con->sub_list; + struct tipc_subscription *sub, *tmp; + + spin_lock_bh(&con->sub_lock); + list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) { + if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) + tipc_sub_delete(sub); + else if (s) + break; + } + spin_unlock_bh(&con->sub_lock); +} + static void tipc_close_conn(struct tipc_conn *con) { struct sock *sk = con->sock->sk; @@ -188,10 +222,11 @@ static void tipc_close_conn(struct tipc_conn *con) write_lock_bh(&sk->sk_callback_lock); disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags); + if (disconnect) { sk->sk_user_data = NULL; if (con->conid) - tipc_subscrb_delete(con->usr_data); + tipc_con_delete_sub(con, NULL); } write_unlock_bh(&sk->sk_callback_lock); @@ -215,7 +250,9 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) kref_init(&con->kref); INIT_LIST_HEAD(&con->outqueue); + INIT_LIST_HEAD(&con->sub_list); spin_lock_init(&con->outqueue_lock); + spin_lock_init(&con->sub_lock); INIT_WORK(&con->swork, tipc_send_work); INIT_WORK(&con->rwork, tipc_recv_work); @@ -236,6 +273,35 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) return con; } +int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con, + void *buf, size_t len) +{ + struct tipc_subscr *s = (struct tipc_subscr *)buf; + struct tipc_subscription *sub; + bool status; + int swap; + + /* Determine subscriber's endianness */ + swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | + TIPC_SUB_CANCEL)); + + /* Detect & process a subscription cancellation request */ + if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { + s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); + tipc_con_delete_sub(con, s); + return 0; + } + status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap)); + sub = tipc_subscrp_subscribe(net, s, conid, swap, status); + if (!sub) + return -1; + + spin_lock_bh(&con->sub_lock); + list_add(&sub->subscrp_list, &con->sub_list); + spin_unlock_bh(&con->sub_lock); + return 0; +} + static int tipc_receive_from_sock(struct tipc_conn *con) { struct tipc_server *s = con->server; @@ -262,9 +328,7 @@ static int tipc_receive_from_sock(struct tipc_conn *con) } read_lock_bh(&sk->sk_callback_lock); - if (test_bit(CF_CONNECTED, &con->flags)) - ret = tipc_subscrb_rcv(sock_net(con->sock->sk), con->conid, - con->usr_data, buf, ret); + ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret); read_unlock_bh(&sk->sk_callback_lock); kmem_cache_free(s->rcvbuf_cache, buf); if (ret < 0) @@ -302,15 +366,6 @@ static int tipc_accept_from_sock(struct tipc_conn *con) newcon->rx_action = tipc_receive_from_sock; tipc_register_callbacks(newsock, newcon); - /* Notify that new connection is incoming */ - newcon->usr_data = tipc_subscrb_create(newcon->conid); - - if (!newcon->usr_data) { - sock_release(newsock); - conn_put(newcon); - return -ENOMEM; - } - /* Wake up receive process in case of 'SYN+' message */ newsock->sk->sk_data_ready(newsock->sk); return ret; @@ -427,7 +482,7 @@ static void tipc_clean_outqueues(struct tipc_conn *con) } int tipc_conn_sendmsg(struct tipc_server *s, int conid, - void *data, size_t len) + u32 evt, void *data, size_t len) { struct outqueue_entry *e; struct tipc_conn *con; @@ -436,7 +491,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid, if (!con) return -EINVAL; - if (!test_bit(CF_CONNECTED, &con->flags)) { + if (!connected(con)) { conn_put(con); return 0; } @@ -446,7 +501,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid, conn_put(con); return -ENOMEM; } - + e->evt = evt; spin_lock_bh(&con->outqueue_lock); list_add_tail(&e->list, &con->outqueue); spin_unlock_bh(&con->outqueue_lock); @@ -470,10 +525,9 @@ void tipc_conn_terminate(struct tipc_server *s, int conid) bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, u32 upper, u32 filter, int *conid) { - struct tipc_subscriber *scbr; struct tipc_subscr sub; - struct tipc_server *s; struct tipc_conn *con; + int rc; sub.seq.type = type; sub.seq.lower = lower; @@ -487,32 +541,23 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, return false; *conid = con->conid; - s = con->server; - scbr = tipc_subscrb_create(*conid); - if (!scbr) { - conn_put(con); - return false; - } - - con->usr_data = scbr; con->sock = NULL; - tipc_subscrb_rcv(net, *conid, scbr, &sub, sizeof(sub)); - return true; + rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub)); + if (rc < 0) + tipc_close_conn(con); + return !rc; } void tipc_topsrv_kern_unsubscr(struct net *net, int conid) { struct tipc_conn *con; - struct tipc_server *srv; con = tipc_conn_lookup(tipc_topsrv(net), conid); if (!con) return; test_and_clear_bit(CF_CONNECTED, &con->flags); - srv = con->server; - if (con->conid) - tipc_subscrb_delete(con->usr_data); + tipc_con_delete_sub(con, NULL); conn_put(con); conn_put(con); } @@ -537,7 +582,8 @@ static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt) static void tipc_send_to_sock(struct tipc_conn *con) { - struct tipc_server *s = con->server; + struct list_head *queue = &con->outqueue; + struct tipc_server *srv = con->server; struct outqueue_entry *e; struct tipc_event *evt; struct msghdr msg; @@ -545,16 +591,20 @@ static void tipc_send_to_sock(struct tipc_conn *con) int ret; spin_lock_bh(&con->outqueue_lock); - while (test_bit(CF_CONNECTED, &con->flags)) { - e = list_entry(con->outqueue.next, struct outqueue_entry, list); - if ((struct list_head *) e == &con->outqueue) - break; + + while (!list_empty(queue)) { + e = list_first_entry(queue, struct outqueue_entry, list); spin_unlock_bh(&con->outqueue_lock); + if (e->evt == TIPC_SUBSCR_TIMEOUT) { + evt = (struct tipc_event *)e->iov.iov_base; + tipc_con_delete_sub(con, &evt->s); + } + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT; + if (con->sock) { - memset(&msg, 0, sizeof(msg)); - msg.msg_flags = MSG_DONTWAIT; ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, e->iov.iov_len); if (ret == -EWOULDBLOCK || ret == 0) { @@ -565,7 +615,7 @@ static void tipc_send_to_sock(struct tipc_conn *con) } } else { evt = e->iov.iov_base; - tipc_send_kern_top_evt(s->net, evt); + tipc_send_kern_top_evt(srv->net, evt); } /* Don't starve users filling buffers */ @@ -573,7 +623,6 @@ static void tipc_send_to_sock(struct tipc_conn *con) cond_resched(); count = 0; } - spin_lock_bh(&con->outqueue_lock); list_del(&e->list); tipc_free_entry(e); @@ -591,7 +640,7 @@ static void tipc_recv_work(struct work_struct *work) struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); int count = 0; - while (test_bit(CF_CONNECTED, &con->flags)) { + while (connected(con)) { if (con->rx_action(con)) break; @@ -608,7 +657,7 @@ static void tipc_send_work(struct work_struct *work) { struct tipc_conn *con = container_of(work, struct tipc_conn, swork); - if (test_bit(CF_CONNECTED, &con->flags)) + if (connected(con)) tipc_send_to_sock(con); conn_put(con); diff --git a/net/tipc/server.h b/net/tipc/server.h index b4b83bd..fcc7232 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -77,7 +77,7 @@ struct tipc_server { }; int tipc_conn_sendmsg(struct tipc_server *s, int conid, - void *data, size_t len); + u32 evt, void *data, size_t len); bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, u32 upper, u32 filter, int *conid); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index b86fbbf..c6de145 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -1,7 +1,7 @@ /* * net/tipc/subscr.c: TIPC network topology service * - * Copyright (c) 2000-2006, Ericsson AB + * Copyright (c) 2000-2017, Ericsson AB * Copyright (c) 2005-2007, 2010-2013, Wind River Systems * All rights reserved. * @@ -39,22 +39,6 @@ #include "subscr.h" /** - * struct tipc_subscriber - TIPC network topology subscriber - * @kref: reference counter to tipc_subscription object - * @conid: connection identifier to server connecting to subscriber - * @lock: control access to subscriber - * @subscrp_list: list of subscription objects for this subscriber - */ -struct tipc_subscriber { - struct kref kref; - int conid; - spinlock_t lock; - struct list_head subscrp_list; -}; - -static void tipc_subscrb_put(struct tipc_subscriber *subscriber); - -/** * htohl - convert value to endianness used by destination * @in: value to convert * @swap: non-zero if endianness must be reversed @@ -71,9 +55,10 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub, u32 event, u32 port_ref, u32 node) { struct tipc_net *tn = net_generic(sub->net, tipc_net_id); - struct tipc_subscriber *subscriber = sub->subscriber; struct kvec msg_sect; + if (sub->inactive) + return; msg_sect.iov_base = (void *)&sub->evt; msg_sect.iov_len = sizeof(struct tipc_event); sub->evt.event = htohl(event, sub->swap); @@ -81,7 +66,7 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub, sub->evt.found_upper = htohl(found_upper, sub->swap); sub->evt.port.ref = htohl(port_ref, sub->swap); sub->evt.port.node = htohl(node, sub->swap); - tipc_conn_sendmsg(tn->topsrv, subscriber->conid, + tipc_conn_sendmsg(tn->topsrv, sub->conid, event, msg_sect.iov_base, msg_sect.iov_len); } @@ -132,41 +117,22 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, return; if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE) return; - - tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref, - node); + spin_lock(&sub->lock); + tipc_subscrp_send_event(sub, found_lower, found_upper, + event, port_ref, node); + spin_unlock(&sub->lock); } static void tipc_subscrp_timeout(struct timer_list *t) { struct tipc_subscription *sub = from_timer(sub, t, timer); - struct tipc_subscriber *subscriber = sub->subscriber; - - spin_lock_bh(&subscriber->lock); - tipc_nametbl_unsubscribe(sub); - list_del(&sub->subscrp_list); - spin_unlock_bh(&subscriber->lock); + struct tipc_subscr *s = &sub->evt.s; - /* Notify subscriber of timeout */ - tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, + spin_lock(&sub->lock); + tipc_subscrp_send_event(sub, s->seq.lower, s->seq.upper, TIPC_SUBSCR_TIMEOUT, 0, 0); - - tipc_subscrp_put(sub); -} - -static void tipc_subscrb_kref_release(struct kref *kref) -{ - kfree(container_of(kref,struct tipc_subscriber, kref)); -} - -static void tipc_subscrb_put(struct tipc_subscriber *subscriber) -{ - kref_put(&subscriber->kref, tipc_subscrb_kref_release); -} - -static void tipc_subscrb_get(struct tipc_subscriber *subscriber) -{ - kref_get(&subscriber->kref); + sub->inactive = true; + spin_unlock(&sub->lock); } static void tipc_subscrp_kref_release(struct kref *kref) @@ -175,11 +141,9 @@ static void tipc_subscrp_kref_release(struct kref *kref) struct tipc_subscription, kref); struct tipc_net *tn = net_generic(sub->net, tipc_net_id); - struct tipc_subscriber *subscriber = sub->subscriber; atomic_dec(&tn->subscription_count); kfree(sub); - tipc_subscrb_put(subscriber); } void tipc_subscrp_put(struct tipc_subscription *subscription) @@ -192,68 +156,9 @@ void tipc_subscrp_get(struct tipc_subscription *subscription) kref_get(&subscription->kref); } -/* tipc_subscrb_subscrp_delete - delete a specific subscription or all - * subscriptions for a given subscriber. - */ -static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber, - struct tipc_subscr *s) -{ - struct list_head *subscription_list = &subscriber->subscrp_list; - struct tipc_subscription *sub, *temp; - u32 timeout; - - spin_lock_bh(&subscriber->lock); - list_for_each_entry_safe(sub, temp, subscription_list, subscrp_list) { - if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) - continue; - - timeout = htohl(sub->evt.s.timeout, sub->swap); - if (timeout == TIPC_WAIT_FOREVER || del_timer(&sub->timer)) { - tipc_nametbl_unsubscribe(sub); - list_del(&sub->subscrp_list); - tipc_subscrp_put(sub); - } - - if (s) - break; - } - spin_unlock_bh(&subscriber->lock); -} - -struct tipc_subscriber *tipc_subscrb_create(int conid) -{ - struct tipc_subscriber *subscriber; - - subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC); - if (!subscriber) { - pr_warn("Subscriber rejected, no memory\n"); - return NULL; - } - INIT_LIST_HEAD(&subscriber->subscrp_list); - kref_init(&subscriber->kref); - subscriber->conid = conid; - spin_lock_init(&subscriber->lock); - - return subscriber; -} - -void tipc_subscrb_delete(struct tipc_subscriber *subscriber) -{ - tipc_subscrb_subscrp_delete(subscriber, NULL); - tipc_subscrb_put(subscriber); -} - -static void tipc_subscrp_cancel(struct tipc_subscr *s, - struct tipc_subscriber *subscriber) -{ - tipc_subscrb_get(subscriber); - tipc_subscrb_subscrp_delete(subscriber, s); - tipc_subscrb_put(subscriber); -} - static struct tipc_subscription *tipc_subscrp_create(struct net *net, struct tipc_subscr *s, - int swap) + int conid, bool swap) { struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_subscription *sub; @@ -275,6 +180,8 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, /* Initialize subscription object */ sub->net = net; + sub->conid = conid; + sub->inactive = false; if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) || (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) { pr_warn("Subscription rejected, illegal request\n"); @@ -284,59 +191,39 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, sub->swap = swap; memcpy(&sub->evt.s, s, sizeof(*s)); + spin_lock_init(&sub->lock); atomic_inc(&tn->subscription_count); kref_init(&sub->kref); return sub; } -static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, - struct tipc_subscriber *subscriber, int swap, - bool status) +struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, + struct tipc_subscr *s, + int conid, bool swap, + bool status) { struct tipc_subscription *sub = NULL; u32 timeout; - sub = tipc_subscrp_create(net, s, swap); + sub = tipc_subscrp_create(net, s, conid, swap); if (!sub) - return -1; + return NULL; - spin_lock_bh(&subscriber->lock); - list_add(&sub->subscrp_list, &subscriber->subscrp_list); - sub->subscriber = subscriber; tipc_nametbl_subscribe(sub, status); - tipc_subscrb_get(subscriber); - spin_unlock_bh(&subscriber->lock); - timer_setup(&sub->timer, tipc_subscrp_timeout, 0); timeout = htohl(sub->evt.s.timeout, swap); - if (timeout != TIPC_WAIT_FOREVER) mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); - return 0; + return sub; } -/* Handle one request to create a new subscription for the subscriber - */ -int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data, - void *buf, size_t len) +void tipc_sub_delete(struct tipc_subscription *sub) { - struct tipc_subscriber *subscriber = usr_data; - struct tipc_subscr *s = (struct tipc_subscr *)buf; - bool status; - int swap; - - /* Determine subscriber's endianness */ - swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | - TIPC_SUB_CANCEL)); - - /* Detect & process a subscription cancellation request */ - if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { - s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); - tipc_subscrp_cancel(s, subscriber); - return 0; - } - status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap)); - return tipc_subscrp_subscribe(net, s, subscriber, swap, status); + tipc_nametbl_unsubscribe(sub); + if (sub->evt.s.timeout != TIPC_WAIT_FOREVER) + del_timer_sync(&sub->timer); + list_del(&sub->subscrp_list); + tipc_subscrp_put(sub); } int tipc_topsrv_start(struct net *net) diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index a736f29..cfd0cb3 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -43,7 +43,7 @@ #define TIPC_MAX_PUBLICATIONS 65535 struct tipc_subscription; -struct tipc_subscriber; +struct tipc_conn; /** * struct tipc_subscription - TIPC network topology subscription object @@ -58,19 +58,22 @@ struct tipc_subscriber; */ struct tipc_subscription { struct kref kref; - struct tipc_subscriber *subscriber; struct net *net; struct timer_list timer; struct list_head nameseq_list; struct list_head subscrp_list; - int swap; struct tipc_event evt; + int conid; + bool swap; + bool inactive; + spinlock_t lock; /* serialize up/down and timer events */ }; -struct tipc_subscriber *tipc_subscrb_create(int conid); -void tipc_subscrb_delete(struct tipc_subscriber *subscriber); -int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data, - void *buf, size_t len); +struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, + struct tipc_subscr *s, + int conid, bool swap, + bool status); +void tipc_sub_delete(struct tipc_subscription *sub); int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, u32 found_upper); void tipc_subscrp_report_overlap(struct tipc_subscription *sub, From patchwork Thu Feb 15 09:40:45 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Jon Maloy X-Patchwork-Id: 873719 X-Patchwork-Delegate: davem@davemloft.net Return-Path: X-Original-To: patchwork-incoming@ozlabs.org Delivered-To: patchwork-incoming@ozlabs.org Authentication-Results: ozlabs.org; spf=none (mailfrom) smtp.mailfrom=vger.kernel.org (client-ip=209.132.180.67; helo=vger.kernel.org; envelope-from=netdev-owner@vger.kernel.org; receiver=) Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (1024-bit key; unprotected) header.d=ericsson.com header.i=@ericsson.com header.b="TrXSsDr/"; dkim-atps=neutral Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by ozlabs.org (Postfix) with ESMTP id 3zhrtH5jvfz9t2f for ; Thu, 15 Feb 2018 20:43:03 +1100 (AEDT) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1755300AbeBOJm6 (ORCPT ); Thu, 15 Feb 2018 04:42:58 -0500 Received: from sessmg23.ericsson.net ([193.180.251.45]:52406 "EHLO sessmg23.ericsson.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1755192AbeBOJm4 (ORCPT ); Thu, 15 Feb 2018 04:42:56 -0500 DKIM-Signature: v=1; a=rsa-sha256; d=ericsson.com; s=mailgw201801; c=relaxed/simple; q=dns/txt; i=@ericsson.com; t=1518687775; h=From:Sender:Reply-To:Subject:Date:Message-ID:To:CC:MIME-Version:Content-Type: Content-Transfer-Encoding:Content-ID:Content-Description:Resent-Date:Resent-From: Resent-Sender:Resent-To:Resent-Cc:Resent-Message-ID:In-Reply-To:References:List-Id: List-Help:List-Unsubscribe:List-Subscribe:List-Post:List-Owner:List-Archive; bh=c7ZyEeeckfWGzyo0aEZZ6X79RO80kdh36y5ewnGqMoE=; b=TrXSsDr/5SaEfcg/MgQNWixWhUsR7qWy113Rw5ZC7j1RNSQDjPaQKgRPBLaivZFX D7Mz4Z4FDaehvJJKpWpSqZB9/Is94c7QR+MwdHfweRGhbBfHN2rW2tKKz3A/Afm1 0BQs61FOJ/sqASQE8QibMDZz9M9WzmjInb2qzN5jFFk=; X-AuditID: c1b4fb2d-499ff70000005540-a8-5a85561fd761 Received: from ESESSHC011.ericsson.se (Unknown_Domain [153.88.183.51]) by sessmg23.ericsson.net (Symantec Mail Security) with SMTP id AF.39.21824.F16558A5; Thu, 15 Feb 2018 10:42:55 +0100 (CET) Received: from tipsy.lab.linux.ericsson.se (10.35.28.120) by ESESSHC011.ericsson.se (153.88.183.51) with Microsoft SMTP Server (TLS) id 14.3.352.0; Thu, 15 Feb 2018 10:42:54 +0100 From: Jon Maloy To: , CC: , , , , , , Subject: [net-next 04/10] tipc: simplify interaction between subscription and topology connection Date: Thu, 15 Feb 2018 10:40:45 +0100 Message-ID: <1518687651-26561-5-git-send-email-jon.maloy@ericsson.com> X-Mailer: git-send-email 2.1.4 In-Reply-To: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> References: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> MIME-Version: 1.0 X-Originating-IP: [10.35.28.120] X-Brightmail-Tracker: H4sIAAAAAAAAA+NgFjrPLMWRmVeSWpSXmKPExsUyM2K7sa58WGuUwb9TchY3GnqYLeacb2Gx ePtqFrvFsQViFlvOZ1lcaT/LbvH4+nVmB3aPLStvMnm8u8LmsXvBZyaPz5vkPNZv2coUwBrF ZZOSmpNZllqkb5fAlfHh/yqWgiMNjBUf901ja2A8kt7FyMkhIWAicbfnJ2MXIxeHkMBhRonN LQuYIJztjBLbHt9gAqliE9CQeDmtgxHEFhEwlni1shOsiFngE6NEy8pnbCAJYYFEifY1Z8Bs FgFViXttG8CaeQXcJP5t62WBWCcncf74T2YQm1PAXWJVyz9WEFsIqOb1kjdQ9YISJ2c+Aatn FpCQOPjiBTNEjbLE3A/TmCDmKEh8m9nNNIFRYBaSlllIWhYwMq1iFC1OLS7OTTcy1kstykwu Ls7P08tLLdnECAzmg1t+6+5gXP3a8RCjAAejEg8vZ0hrlBBrYllxZe4hRgkOZiUR3tt2QCHe lMTKqtSi/Pii0pzU4kOM0hwsSuK8Jz15o4QE0hNLUrNTUwtSi2CyTBycUg2MOQU/5x2O3FFS 6LxzgugNrqU8TlnBDTcqpFsusx9/5bvEM27RzlWqQafW5K6Y/DxBhZGp/MSu4iNme5qvuLH2 eHpe/phTfT5X1Gdd/mrBs56233+JvH2TPanTr8d8itmF1S7m0lNYLutMDk/JSvleXjbBaa3e Xpf7X/tv/z3z9WyLSeUp4f/rlViKMxINtZiLihMB5jhK12ICAAA= Sender: netdev-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: netdev@vger.kernel.org The message transmission and reception in the topology server is more generic than is currently necessary. By basing the funtionality on the fact that we only send items of type struct tipc_event and always receive items of struct tipc_subcr we can make several simplifications, and also get rid of some unnecessary dynamic memory allocations. Acked-by: Ying Xue Signed-off-by: Jon Maloy --- net/tipc/name_table.c | 10 +-- net/tipc/server.c | 170 +++++++++++++++++--------------------------------- net/tipc/server.h | 12 +--- net/tipc/subscr.c | 40 ++++++------ net/tipc/subscr.h | 5 +- 5 files changed, 88 insertions(+), 149 deletions(-) diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index ed0457c..c0ca7be 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -810,14 +810,15 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref, */ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status) { - struct tipc_net *tn = net_generic(s->net, tipc_net_id); + struct tipc_server *srv = s->server; + struct tipc_net *tn = tipc_net(srv->net); u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); int index = hash(type); struct name_seq *seq; struct tipc_name_seq ns; spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(s->net, type); + seq = nametbl_find_seq(srv->net, type); if (!seq) seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); if (seq) { @@ -837,12 +838,13 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status) */ void tipc_nametbl_unsubscribe(struct tipc_subscription *s) { - struct tipc_net *tn = net_generic(s->net, tipc_net_id); + struct tipc_server *srv = s->server; + struct tipc_net *tn = tipc_net(srv->net); struct name_seq *seq; u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(s->net, type); + seq = nametbl_find_seq(srv->net, type); if (seq != NULL) { spin_lock_bh(&seq->lock); list_del_init(&s->nameseq_list); diff --git a/net/tipc/server.c b/net/tipc/server.c index b8268c0..7933fb9 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -84,9 +84,9 @@ struct tipc_conn { /* An entry waiting to be sent */ struct outqueue_entry { - u32 evt; + bool inactive; + struct tipc_event evt; struct list_head list; - struct kvec iov; }; static void tipc_recv_work(struct work_struct *work); @@ -154,6 +154,9 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) return con; } +/* sock_data_ready - interrupt callback indicating the socket has data to read + * The queued job is launched in tipc_recv_from_sock() + */ static void sock_data_ready(struct sock *sk) { struct tipc_conn *con; @@ -168,6 +171,10 @@ static void sock_data_ready(struct sock *sk) read_unlock_bh(&sk->sk_callback_lock); } +/* sock_write_space - interrupt callback after a sendmsg EAGAIN + * Indicates that there now is more is space in the send buffer + * The queued job is launched in tipc_send_to_sock() + */ static void sock_write_space(struct sock *sk) { struct tipc_conn *con; @@ -273,10 +280,10 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) return con; } -int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con, - void *buf, size_t len) +static int tipc_con_rcv_sub(struct tipc_server *srv, + struct tipc_conn *con, + struct tipc_subscr *s) { - struct tipc_subscr *s = (struct tipc_subscr *)buf; struct tipc_subscription *sub; bool status; int swap; @@ -292,7 +299,7 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con, return 0; } status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap)); - sub = tipc_subscrp_subscribe(net, s, conid, swap, status); + sub = tipc_subscrp_subscribe(srv, s, con->conid, swap, status); if (!sub) return -1; @@ -304,43 +311,27 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con, static int tipc_receive_from_sock(struct tipc_conn *con) { - struct tipc_server *s = con->server; + struct tipc_server *srv = con->server; struct sock *sk = con->sock->sk; struct msghdr msg = {}; + struct tipc_subscr s; struct kvec iov; - void *buf; int ret; - buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC); - if (!buf) { - ret = -ENOMEM; - goto out_close; - } - - iov.iov_base = buf; - iov.iov_len = s->max_rcvbuf_size; + iov.iov_base = &s; + iov.iov_len = sizeof(s); msg.msg_name = NULL; iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len); ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT); - if (ret <= 0) { - kmem_cache_free(s->rcvbuf_cache, buf); - goto out_close; + if (ret == -EWOULDBLOCK) + return -EWOULDBLOCK; + if (ret > 0) { + read_lock_bh(&sk->sk_callback_lock); + ret = tipc_con_rcv_sub(srv, con, &s); + read_unlock_bh(&sk->sk_callback_lock); } - - read_lock_bh(&sk->sk_callback_lock); - ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret); - read_unlock_bh(&sk->sk_callback_lock); - kmem_cache_free(s->rcvbuf_cache, buf); if (ret < 0) - tipc_conn_terminate(s, con->conid); - return ret; - -out_close: - if (ret != -EWOULDBLOCK) tipc_close_conn(con); - else if (ret == 0) - /* Don't return success if we really got EOF */ - ret = -EAGAIN; return ret; } @@ -442,33 +433,6 @@ static int tipc_open_listening_sock(struct tipc_server *s) return 0; } -static struct outqueue_entry *tipc_alloc_entry(void *data, int len) -{ - struct outqueue_entry *entry; - void *buf; - - entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC); - if (!entry) - return NULL; - - buf = kmemdup(data, len, GFP_ATOMIC); - if (!buf) { - kfree(entry); - return NULL; - } - - entry->iov.iov_base = buf; - entry->iov.iov_len = len; - - return entry; -} - -static void tipc_free_entry(struct outqueue_entry *e) -{ - kfree(e->iov.iov_base); - kfree(e); -} - static void tipc_clean_outqueues(struct tipc_conn *con) { struct outqueue_entry *e, *safe; @@ -476,50 +440,40 @@ static void tipc_clean_outqueues(struct tipc_conn *con) spin_lock_bh(&con->outqueue_lock); list_for_each_entry_safe(e, safe, &con->outqueue, list) { list_del(&e->list); - tipc_free_entry(e); + kfree(e); } spin_unlock_bh(&con->outqueue_lock); } -int tipc_conn_sendmsg(struct tipc_server *s, int conid, - u32 evt, void *data, size_t len) +/* tipc_conn_queue_evt - interrupt level call from a subscription instance + * The queued job is launched in tipc_send_to_sock() + */ +void tipc_conn_queue_evt(struct tipc_server *s, int conid, + u32 event, struct tipc_event *evt) { struct outqueue_entry *e; struct tipc_conn *con; con = tipc_conn_lookup(s, conid); if (!con) - return -EINVAL; + return; - if (!connected(con)) { - conn_put(con); - return 0; - } + if (!connected(con)) + goto err; - e = tipc_alloc_entry(data, len); - if (!e) { - conn_put(con); - return -ENOMEM; - } - e->evt = evt; + e = kmalloc(sizeof(*e), GFP_ATOMIC); + if (!e) + goto err; + e->inactive = (event == TIPC_SUBSCR_TIMEOUT); + memcpy(&e->evt, evt, sizeof(*evt)); spin_lock_bh(&con->outqueue_lock); list_add_tail(&e->list, &con->outqueue); spin_unlock_bh(&con->outqueue_lock); - if (!queue_work(s->send_wq, &con->swork)) - conn_put(con); - return 0; -} - -void tipc_conn_terminate(struct tipc_server *s, int conid) -{ - struct tipc_conn *con; - - con = tipc_conn_lookup(s, conid); - if (con) { - tipc_close_conn(con); - conn_put(con); - } + if (queue_work(s->send_wq, &con->swork)) + return; +err: + conn_put(con); } bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, @@ -542,7 +496,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, *conid = con->conid; con->sock = NULL; - rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub)); + rc = tipc_con_rcv_sub(tipc_topsrv(net), con, &sub); if (rc < 0) tipc_close_conn(con); return !rc; @@ -587,6 +541,7 @@ static void tipc_send_to_sock(struct tipc_conn *con) struct outqueue_entry *e; struct tipc_event *evt; struct msghdr msg; + struct kvec iov; int count = 0; int ret; @@ -594,27 +549,28 @@ static void tipc_send_to_sock(struct tipc_conn *con) while (!list_empty(queue)) { e = list_first_entry(queue, struct outqueue_entry, list); - + evt = &e->evt; spin_unlock_bh(&con->outqueue_lock); - if (e->evt == TIPC_SUBSCR_TIMEOUT) { - evt = (struct tipc_event *)e->iov.iov_base; + if (e->inactive) tipc_con_delete_sub(con, &evt->s); - } + memset(&msg, 0, sizeof(msg)); msg.msg_flags = MSG_DONTWAIT; + iov.iov_base = evt; + iov.iov_len = sizeof(*evt); + msg.msg_name = NULL; if (con->sock) { - ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, - e->iov.iov_len); + ret = kernel_sendmsg(con->sock, &msg, &iov, + 1, sizeof(*evt)); if (ret == -EWOULDBLOCK || ret == 0) { cond_resched(); goto out; } else if (ret < 0) { - goto send_err; + goto err; } } else { - evt = e->iov.iov_base; tipc_send_kern_top_evt(srv->net, evt); } @@ -625,13 +581,12 @@ static void tipc_send_to_sock(struct tipc_conn *con) } spin_lock_bh(&con->outqueue_lock); list_del(&e->list); - tipc_free_entry(e); + kfree(e); } spin_unlock_bh(&con->outqueue_lock); out: return; - -send_err: +err: tipc_close_conn(con); } @@ -695,22 +650,14 @@ int tipc_server_start(struct tipc_server *s) idr_init(&s->conn_idr); s->idr_in_use = 0; - s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size, - 0, SLAB_HWCACHE_ALIGN, NULL); - if (!s->rcvbuf_cache) - return -ENOMEM; - ret = tipc_work_start(s); - if (ret < 0) { - kmem_cache_destroy(s->rcvbuf_cache); + if (ret < 0) return ret; - } + ret = tipc_open_listening_sock(s); - if (ret < 0) { + if (ret < 0) tipc_work_stop(s); - kmem_cache_destroy(s->rcvbuf_cache); - return ret; - } + return ret; } @@ -731,6 +678,5 @@ void tipc_server_stop(struct tipc_server *s) spin_unlock_bh(&s->idr_lock); tipc_work_stop(s); - kmem_cache_destroy(s->rcvbuf_cache); idr_destroy(&s->conn_idr); } diff --git a/net/tipc/server.h b/net/tipc/server.h index fcc7232..2de8709 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -36,6 +36,7 @@ #ifndef _TIPC_SERVER_H #define _TIPC_SERVER_H +#include "core.h" #include #include #include @@ -68,7 +69,6 @@ struct tipc_server { spinlock_t idr_lock; int idr_in_use; struct net *net; - struct kmem_cache *rcvbuf_cache; struct workqueue_struct *rcv_wq; struct workqueue_struct *send_wq; int max_rcvbuf_size; @@ -76,19 +76,13 @@ struct tipc_server { char name[TIPC_SERVER_NAME_LEN]; }; -int tipc_conn_sendmsg(struct tipc_server *s, int conid, - u32 evt, void *data, size_t len); +void tipc_conn_queue_evt(struct tipc_server *s, int conid, + u32 event, struct tipc_event *evt); bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, u32 upper, u32 filter, int *conid); void tipc_topsrv_kern_unsubscr(struct net *net, int conid); -/** - * tipc_conn_terminate - terminate connection with server - * - * Note: Must call it in process context since it might sleep - */ -void tipc_conn_terminate(struct tipc_server *s, int conid); int tipc_server_start(struct tipc_server *s); void tipc_server_stop(struct tipc_server *s); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index c6de145..c3e0b924 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -52,22 +52,19 @@ static u32 htohl(u32 in, int swap) static void tipc_subscrp_send_event(struct tipc_subscription *sub, u32 found_lower, u32 found_upper, - u32 event, u32 port_ref, u32 node) + u32 event, u32 port, u32 node) { - struct tipc_net *tn = net_generic(sub->net, tipc_net_id); - struct kvec msg_sect; + struct tipc_event *evt = &sub->evt; + bool swap = sub->swap; if (sub->inactive) return; - msg_sect.iov_base = (void *)&sub->evt; - msg_sect.iov_len = sizeof(struct tipc_event); - sub->evt.event = htohl(event, sub->swap); - sub->evt.found_lower = htohl(found_lower, sub->swap); - sub->evt.found_upper = htohl(found_upper, sub->swap); - sub->evt.port.ref = htohl(port_ref, sub->swap); - sub->evt.port.node = htohl(node, sub->swap); - tipc_conn_sendmsg(tn->topsrv, sub->conid, event, - msg_sect.iov_base, msg_sect.iov_len); + evt->event = htohl(event, swap); + evt->found_lower = htohl(found_lower, swap); + evt->found_upper = htohl(found_upper, swap); + evt->port.ref = htohl(port, swap); + evt->port.node = htohl(node, swap); + tipc_conn_queue_evt(sub->server, sub->conid, event, evt); } /** @@ -137,10 +134,11 @@ static void tipc_subscrp_timeout(struct timer_list *t) static void tipc_subscrp_kref_release(struct kref *kref) { - struct tipc_subscription *sub = container_of(kref, - struct tipc_subscription, - kref); - struct tipc_net *tn = net_generic(sub->net, tipc_net_id); + struct tipc_subscription *sub; + struct tipc_net *tn; + + sub = container_of(kref, struct tipc_subscription, kref); + tn = tipc_net(sub->server->net); atomic_dec(&tn->subscription_count); kfree(sub); @@ -156,11 +154,11 @@ void tipc_subscrp_get(struct tipc_subscription *subscription) kref_get(&subscription->kref); } -static struct tipc_subscription *tipc_subscrp_create(struct net *net, +static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv, struct tipc_subscr *s, int conid, bool swap) { - struct tipc_net *tn = net_generic(net, tipc_net_id); + struct tipc_net *tn = tipc_net(srv->net); struct tipc_subscription *sub; u32 filter = htohl(s->filter, swap); @@ -179,7 +177,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, } /* Initialize subscription object */ - sub->net = net; + sub->server = srv; sub->conid = conid; sub->inactive = false; if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) || @@ -197,7 +195,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, return sub; } -struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, +struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, struct tipc_subscr *s, int conid, bool swap, bool status) @@ -205,7 +203,7 @@ struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, struct tipc_subscription *sub = NULL; u32 timeout; - sub = tipc_subscrp_create(net, s, conid, swap); + sub = tipc_subscrp_create(srv, s, conid, swap); if (!sub) return NULL; diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index cfd0cb3..64ffd32 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -49,7 +49,6 @@ struct tipc_conn; * struct tipc_subscription - TIPC network topology subscription object * @subscriber: pointer to its subscriber * @seq: name sequence associated with subscription - * @net: point to network namespace * @timer: timer governing subscription duration (optional) * @nameseq_list: adjacent subscriptions in name sequence's subscription list * @subscrp_list: adjacent subscriptions in subscriber's subscription list @@ -58,7 +57,7 @@ struct tipc_conn; */ struct tipc_subscription { struct kref kref; - struct net *net; + struct tipc_server *server; struct timer_list timer; struct list_head nameseq_list; struct list_head subscrp_list; @@ -69,7 +68,7 @@ struct tipc_subscription { spinlock_t lock; /* serialize up/down and timer events */ }; -struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, +struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, struct tipc_subscr *s, int conid, bool swap, bool status); From patchwork Thu Feb 15 09:40:46 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Jon Maloy X-Patchwork-Id: 873721 X-Patchwork-Delegate: davem@davemloft.net Return-Path: X-Original-To: patchwork-incoming@ozlabs.org Delivered-To: patchwork-incoming@ozlabs.org Authentication-Results: ozlabs.org; spf=none (mailfrom) smtp.mailfrom=vger.kernel.org (client-ip=209.132.180.67; helo=vger.kernel.org; envelope-from=netdev-owner@vger.kernel.org; receiver=) Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (1024-bit key; unprotected) header.d=ericsson.com header.i=@ericsson.com header.b="heg00RZW"; dkim-atps=neutral Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by ozlabs.org (Postfix) with ESMTP id 3zhrtf2TpCz9t2f for ; Thu, 15 Feb 2018 20:43:22 +1100 (AEDT) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1755302AbeBOJnT (ORCPT ); Thu, 15 Feb 2018 04:43:19 -0500 Received: from sesbmg22.ericsson.net ([193.180.251.48]:53644 "EHLO sesbmg22.ericsson.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1755186AbeBOJnR (ORCPT ); Thu, 15 Feb 2018 04:43:17 -0500 DKIM-Signature: v=1; a=rsa-sha256; d=ericsson.com; s=mailgw201801; c=relaxed/simple; q=dns/txt; i=@ericsson.com; t=1518687796; h=From:Sender:Reply-To:Subject:Date:Message-ID:To:CC:MIME-Version:Content-Type: Content-Transfer-Encoding:Content-ID:Content-Description:Resent-Date:Resent-From: Resent-Sender:Resent-To:Resent-Cc:Resent-Message-ID:In-Reply-To:References:List-Id: List-Help:List-Unsubscribe:List-Subscribe:List-Post:List-Owner:List-Archive; bh=bFJ/NKOMEy3QjfoWMTXvVbw6HJCfIHv5a4C2P1ZIuuk=; b=heg00RZW6PLQC188Ya/UVI9IrEjeIe4uOcbj/zt4BLKdAxBYe9vr/XbmkvCi27fS /blD8K1iZAImRxf+ViiCF31aCSm8On9XsYtwBogP+E+O4kV0emBGH+oPqHfU5gNe 6mtLr/H3lAmgC171EVrJAzd8Hg6Pn6ewPXZ6k2Xc83c=; X-AuditID: c1b4fb30-399ff70000004778-a0-5a8556346530 Received: from ESESSHC011.ericsson.se (Unknown_Domain [153.88.183.51]) by sesbmg22.ericsson.net (Symantec Mail Security) with SMTP id 08.A8.18296.436558A5; Thu, 15 Feb 2018 10:43:16 +0100 (CET) Received: from tipsy.lab.linux.ericsson.se (10.35.28.120) by ESESSHC011.ericsson.se (153.88.183.51) with Microsoft SMTP Server (TLS) id 14.3.352.0; Thu, 15 Feb 2018 10:43:15 +0100 From: Jon Maloy To: , CC: , , , , , , Subject: [net-next 05/10] tipc: simplify endianness handling in topology subscriber Date: Thu, 15 Feb 2018 10:40:46 +0100 Message-ID: <1518687651-26561-6-git-send-email-jon.maloy@ericsson.com> X-Mailer: git-send-email 2.1.4 In-Reply-To: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> References: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> MIME-Version: 1.0 X-Originating-IP: [10.35.28.120] X-Brightmail-Tracker: H4sIAAAAAAAAA+NgFjrDLMWRmVeSWpSXmKPExsUyM2K7sa5JWGuUwbFtghY3GnqYLeacb2Gx ePtqFrvFsQViFlvOZ1lcaT/LbvH4+nVmB3aPLStvMnm8u8LmsXvBZyaPz5vkPNZv2coUwBrF ZZOSmpNZllqkb5fAlfHnQztbwbeiihMTbzA2MN6I7WLk5JAQMJFo//WGvYuRi0NI4DCjxOO3 N4AcDiBnO6NEhxJIDZuAhsTLaR2MILaIgLHEq5WdTCD1zAKfGCVaVj5jA0kICwRLbDo4gR3E ZhFQlZj59A4TiM0r4CbxvH83C8QyOYnzx38yg9icAu4Sq1r+sYLYQkA1r5e8gaoXlDg58wlY PbOAhMTBFy+YIWqUJeZ+mMYEMUdB4tvMbqYJjAKzkLTMQtKygJFpFaNocWpxUm66kZFealFm cnFxfp5eXmrJJkZgGB/c8ttgB+PL546HGAU4GJV4eK1DWqOEWBPLiitzDzFKcDArifDetgMK 8aYkVlalFuXHF5XmpBYfYpTmYFES5z3pyRslJJCeWJKanZpakFoEk2Xi4JRqYGRLmC6+PfvU kjXaUTeCjJIYJ73jvxdar9OWz9t4gOfbj4lsWjyvBZ0Ntj+dk3fjuIGJR4D5/Ml1Zkr3LXhT 2f1zfh+u6dp/gO2V9P5tiX86XaaeTbOxTYmd+2/N0vIT/w9EeD6Iif1iy38rLu9chtXtXnZ2 y+uP1Y9rP5x9Wif82YyPQUErzyixFGckGmoxFxUnAgCvxg/GXwIAAA== Sender: netdev-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: netdev@vger.kernel.org Because of the requirement for total distribution transparency, users send subscriptions and receive topology events in their own host format. It is up to the topology server to determine this format and do the correct conversions to and from its own host format when needed. Until now, this has been handled in a rather non-transparent way inside the topology server and subscriber code, leading to unnecessary complexity when creating subscriptions and issuing events. We now improve this situation by adding two new macros, tipc_sub_read() and tipc_evt_write(). Both those functions calculate the need for conversion internally before performing their respective operations. Hence, all handling of such conversions become transparent to the rest of the code. Acked-by: Ying Xue Signed-off-by: Jon Maloy --- net/tipc/name_table.c | 42 +++++++++++++++----------- net/tipc/name_table.h | 2 +- net/tipc/server.c | 25 ++-------------- net/tipc/subscr.c | 83 +++++++++++++++++++-------------------------------- net/tipc/subscr.h | 36 ++++++++++++++++------ 5 files changed, 86 insertions(+), 102 deletions(-) diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index c0ca7be..2fbd0a2 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -412,18 +412,22 @@ static struct publication *tipc_nameseq_remove_publ(struct net *net, * sequence overlapping with the requested sequence */ static void tipc_nameseq_subscribe(struct name_seq *nseq, - struct tipc_subscription *s, - bool status) + struct tipc_subscription *sub) { struct sub_seq *sseq = nseq->sseqs; struct tipc_name_seq ns; + struct tipc_subscr *s = &sub->evt.s; + bool no_status; - tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns); + ns.type = tipc_sub_read(s, seq.type); + ns.lower = tipc_sub_read(s, seq.lower); + ns.upper = tipc_sub_read(s, seq.upper); + no_status = tipc_sub_read(s, filter) & TIPC_SUB_NO_STATUS; - tipc_subscrp_get(s); - list_add(&s->nameseq_list, &nseq->subscriptions); + tipc_subscrp_get(sub); + list_add(&sub->nameseq_list, &nseq->subscriptions); - if (!status || !sseq) + if (no_status || !sseq) return; while (sseq != &nseq->sseqs[nseq->first_free]) { @@ -433,7 +437,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq, int must_report = 1; list_for_each_entry(crs, &info->zone_list, zone_list) { - tipc_subscrp_report_overlap(s, sseq->lower, + tipc_subscrp_report_overlap(sub, sseq->lower, sseq->upper, TIPC_PUBLISHED, crs->ref, crs->node, @@ -808,11 +812,12 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref, /** * tipc_nametbl_subscribe - add a subscription object to the name table */ -void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status) +void tipc_nametbl_subscribe(struct tipc_subscription *sub) { - struct tipc_server *srv = s->server; + struct tipc_server *srv = sub->server; struct tipc_net *tn = tipc_net(srv->net); - u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); + struct tipc_subscr *s = &sub->evt.s; + u32 type = tipc_sub_read(s, seq.type); int index = hash(type); struct name_seq *seq; struct tipc_name_seq ns; @@ -823,10 +828,12 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status) seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); if (seq) { spin_lock_bh(&seq->lock); - tipc_nameseq_subscribe(seq, s, status); + tipc_nameseq_subscribe(seq, sub); spin_unlock_bh(&seq->lock); } else { - tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns); + ns.type = tipc_sub_read(s, seq.type); + ns.lower = tipc_sub_read(s, seq.lower); + ns.upper = tipc_sub_read(s, seq.upper); pr_warn("Failed to create subscription for {%u,%u,%u}\n", ns.type, ns.lower, ns.upper); } @@ -836,19 +843,20 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status) /** * tipc_nametbl_unsubscribe - remove a subscription object from name table */ -void tipc_nametbl_unsubscribe(struct tipc_subscription *s) +void tipc_nametbl_unsubscribe(struct tipc_subscription *sub) { - struct tipc_server *srv = s->server; + struct tipc_server *srv = sub->server; + struct tipc_subscr *s = &sub->evt.s; struct tipc_net *tn = tipc_net(srv->net); struct name_seq *seq; - u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); + u32 type = tipc_sub_read(s, seq.type); spin_lock_bh(&tn->nametbl_lock); seq = nametbl_find_seq(srv->net, type); if (seq != NULL) { spin_lock_bh(&seq->lock); - list_del_init(&s->nameseq_list); - tipc_subscrp_put(s); + list_del_init(&sub->nameseq_list); + tipc_subscrp_put(sub); if (!seq->first_free && list_empty(&seq->subscriptions)) { hlist_del_init_rcu(&seq->ns_list); kfree(seq->sseqs); diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index f56e7cb..1765260 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -120,7 +120,7 @@ struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type, struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, u32 lower, u32 node, u32 ref, u32 key); -void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status); +void tipc_nametbl_subscribe(struct tipc_subscription *s); void tipc_nametbl_unsubscribe(struct tipc_subscription *s); int tipc_nametbl_init(struct net *net); void tipc_nametbl_stop(struct net *net); diff --git a/net/tipc/server.c b/net/tipc/server.c index 7933fb9..5d231fa 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -98,18 +98,6 @@ static bool connected(struct tipc_conn *con) return con && test_bit(CF_CONNECTED, &con->flags); } -/** - * htohl - convert value to endianness used by destination - * @in: value to convert - * @swap: non-zero if endianness must be reversed - * - * Returns converted value - */ -static u32 htohl(u32 in, int swap) -{ - return swap ? swab32(in) : in; -} - static void tipc_conn_kref_release(struct kref *kref) { struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); @@ -285,21 +273,12 @@ static int tipc_con_rcv_sub(struct tipc_server *srv, struct tipc_subscr *s) { struct tipc_subscription *sub; - bool status; - int swap; - - /* Determine subscriber's endianness */ - swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | - TIPC_SUB_CANCEL)); - /* Detect & process a subscription cancellation request */ - if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { - s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); + if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) { tipc_con_delete_sub(con, s); return 0; } - status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap)); - sub = tipc_subscrp_subscribe(srv, s, con->conid, swap, status); + sub = tipc_subscrp_subscribe(srv, s, con->conid); if (!sub) return -1; diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index c3e0b924..406b09f 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -38,32 +38,19 @@ #include "name_table.h" #include "subscr.h" -/** - * htohl - convert value to endianness used by destination - * @in: value to convert - * @swap: non-zero if endianness must be reversed - * - * Returns converted value - */ -static u32 htohl(u32 in, int swap) -{ - return swap ? swab32(in) : in; -} - static void tipc_subscrp_send_event(struct tipc_subscription *sub, u32 found_lower, u32 found_upper, u32 event, u32 port, u32 node) { struct tipc_event *evt = &sub->evt; - bool swap = sub->swap; if (sub->inactive) return; - evt->event = htohl(event, swap); - evt->found_lower = htohl(found_lower, swap); - evt->found_upper = htohl(found_upper, swap); - evt->port.ref = htohl(port, swap); - evt->port.node = htohl(node, swap); + tipc_evt_write(evt, event, event); + tipc_evt_write(evt, found_lower, found_lower); + tipc_evt_write(evt, found_upper, found_upper); + tipc_evt_write(evt, port.ref, port); + tipc_evt_write(evt, port.node, node); tipc_conn_queue_evt(sub->server, sub->conid, event, evt); } @@ -85,29 +72,22 @@ int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, return 1; } -u32 tipc_subscrp_convert_seq_type(u32 type, int swap) +void tipc_subscrp_report_overlap(struct tipc_subscription *sub, + u32 found_lower, u32 found_upper, + u32 event, u32 port, u32 node, + u32 scope, int must) { - return htohl(type, swap); -} - -void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap, - struct tipc_name_seq *out) -{ - out->type = htohl(in->type, swap); - out->lower = htohl(in->lower, swap); - out->upper = htohl(in->upper, swap); -} - -void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, - u32 found_upper, u32 event, u32 port_ref, - u32 node, u32 scope, int must) -{ - u32 filter = htohl(sub->evt.s.filter, sub->swap); struct tipc_name_seq seq; + struct tipc_subscr *s = &sub->evt.s; + u32 filter = tipc_sub_read(s, filter); + + seq.type = tipc_sub_read(s, seq.type); + seq.lower = tipc_sub_read(s, seq.lower); + seq.upper = tipc_sub_read(s, seq.upper); - tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq); if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper)) return; + if (!must && !(filter & TIPC_SUB_PORTS)) return; if (filter & TIPC_SUB_CLUSTER_SCOPE && scope == TIPC_NODE_SCOPE) @@ -116,7 +96,7 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, return; spin_lock(&sub->lock); tipc_subscrp_send_event(sub, found_lower, found_upper, - event, port_ref, node); + event, port, node); spin_unlock(&sub->lock); } @@ -156,11 +136,11 @@ void tipc_subscrp_get(struct tipc_subscription *subscription) static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv, struct tipc_subscr *s, - int conid, bool swap) + int conid) { struct tipc_net *tn = tipc_net(srv->net); struct tipc_subscription *sub; - u32 filter = htohl(s->filter, swap); + u32 filter = tipc_sub_read(s, filter); /* Refuse subscription if global limit exceeded */ if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { @@ -177,39 +157,38 @@ static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv, } /* Initialize subscription object */ + if (filter & TIPC_SUB_PORTS && filter & TIPC_SUB_SERVICE) + goto err; + if (tipc_sub_read(s, seq.lower) > tipc_sub_read(s, seq.upper)) + goto err; sub->server = srv; sub->conid = conid; sub->inactive = false; - if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) || - (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) { - pr_warn("Subscription rejected, illegal request\n"); - kfree(sub); - return NULL; - } - - sub->swap = swap; memcpy(&sub->evt.s, s, sizeof(*s)); spin_lock_init(&sub->lock); atomic_inc(&tn->subscription_count); kref_init(&sub->kref); return sub; +err: + pr_warn("Subscription rejected, illegal request\n"); + kfree(sub); + return NULL; } struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, struct tipc_subscr *s, - int conid, bool swap, - bool status) + int conid) { struct tipc_subscription *sub = NULL; u32 timeout; - sub = tipc_subscrp_create(srv, s, conid, swap); + sub = tipc_subscrp_create(srv, s, conid); if (!sub) return NULL; - tipc_nametbl_subscribe(sub, status); + tipc_nametbl_subscribe(sub); timer_setup(&sub->timer, tipc_subscrp_timeout, 0); - timeout = htohl(sub->evt.s.timeout, swap); + timeout = tipc_sub_read(&sub->evt.s, timeout); if (timeout != TIPC_WAIT_FOREVER) mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); return sub; diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 64ffd32..db80e41 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -52,7 +52,6 @@ struct tipc_conn; * @timer: timer governing subscription duration (optional) * @nameseq_list: adjacent subscriptions in name sequence's subscription list * @subscrp_list: adjacent subscriptions in subscriber's subscription list - * @swap: indicates if subscriber uses opposite endianness in its messages * @evt: template for events generated by subscription */ struct tipc_subscription { @@ -63,28 +62,47 @@ struct tipc_subscription { struct list_head subscrp_list; struct tipc_event evt; int conid; - bool swap; bool inactive; spinlock_t lock; /* serialize up/down and timer events */ }; struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, struct tipc_subscr *s, - int conid, bool swap, - bool status); + int conid); void tipc_sub_delete(struct tipc_subscription *sub); + int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, u32 found_upper); void tipc_subscrp_report_overlap(struct tipc_subscription *sub, - u32 found_lower, u32 found_upper, u32 event, - u32 port_ref, u32 node, u32 scope, int must); -void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap, - struct tipc_name_seq *out); -u32 tipc_subscrp_convert_seq_type(u32 type, int swap); + u32 found_lower, u32 found_upper, + u32 event, u32 port, u32 node, + u32 scope, int must); int tipc_topsrv_start(struct net *net); void tipc_topsrv_stop(struct net *net); void tipc_subscrp_put(struct tipc_subscription *subscription); void tipc_subscrp_get(struct tipc_subscription *subscription); +#define TIPC_FILTER_MASK (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | TIPC_SUB_CANCEL) + +/* tipc_sub_read - return field_ of struct sub_ in host endian format + */ +#define tipc_sub_read(sub_, field_) \ + ({ \ + struct tipc_subscr *sub__ = sub_; \ + u32 val__ = (sub__)->field_; \ + int swap_ = !((sub__)->filter & TIPC_FILTER_MASK); \ + (swap_ ? swab32(val__) : val__); \ + }) + +/* tipc_evt_write - write val_ to field_ of struct evt_ in user endian format + */ +#define tipc_evt_write(evt_, field_, val_) \ + ({ \ + struct tipc_event *evt__ = evt_; \ + u32 val__ = val_; \ + int swap_ = !((evt__)->s.filter & (TIPC_FILTER_MASK)); \ + (evt__)->field_ = swap_ ? swab32(val__) : val__; \ + }) + #endif From patchwork Thu Feb 15 09:40:47 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Jon Maloy X-Patchwork-Id: 873722 X-Patchwork-Delegate: davem@davemloft.net Return-Path: X-Original-To: patchwork-incoming@ozlabs.org Delivered-To: patchwork-incoming@ozlabs.org Authentication-Results: ozlabs.org; spf=none (mailfrom) smtp.mailfrom=vger.kernel.org (client-ip=209.132.180.67; helo=vger.kernel.org; envelope-from=netdev-owner@vger.kernel.org; receiver=) Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (1024-bit key; unprotected) header.d=ericsson.com header.i=@ericsson.com header.b="QRn474T2"; dkim-atps=neutral Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by ozlabs.org (Postfix) with ESMTP id 3zhrv81b7rz9t2f for ; Thu, 15 Feb 2018 20:43:48 +1100 (AEDT) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1755250AbeBOJnp (ORCPT ); Thu, 15 Feb 2018 04:43:45 -0500 Received: from sesbmg23.ericsson.net ([193.180.251.37]:42786 "EHLO sesbmg23.ericsson.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1755186AbeBOJnn (ORCPT ); Thu, 15 Feb 2018 04:43:43 -0500 DKIM-Signature: v=1; a=rsa-sha256; d=ericsson.com; s=mailgw201801; c=relaxed/simple; q=dns/txt; i=@ericsson.com; t=1518687822; h=From:Sender:Reply-To:Subject:Date:Message-ID:To:CC:MIME-Version:Content-Type: Content-Transfer-Encoding:Content-ID:Content-Description:Resent-Date:Resent-From: Resent-Sender:Resent-To:Resent-Cc:Resent-Message-ID:In-Reply-To:References:List-Id: List-Help:List-Unsubscribe:List-Subscribe:List-Post:List-Owner:List-Archive; bh=G0mxhu8p41ZwK2Dfn278tnEEzPbj0VDJhkOwjMB6eh0=; b=QRn474T20r8vPlJM4zFFhfjNXNtb3vuok2N1O+A7vjIDjV2NwZwokHpmr0MZvzZ/ QT8O0Jy60zH/wJJLd5LpqxNNZKs0u+6EgiKzgt7a1eFY1pPV6PY5BcyQt0xHGXYL AD6PGDAaTAQUNiFVyYQjAV9v8HFEaZO3OHOY0qPlpaI=; X-AuditID: c1b4fb25-859119c00000341b-ee-5a85564da7c2 Received: from ESESSHC011.ericsson.se (Unknown_Domain [153.88.183.51]) by sesbmg23.ericsson.net (Symantec Mail Security) with SMTP id 77.E1.13339.D46558A5; Thu, 15 Feb 2018 10:43:42 +0100 (CET) Received: from tipsy.lab.linux.ericsson.se (10.35.28.120) by ESESSHC011.ericsson.se (153.88.183.51) with Microsoft SMTP Server (TLS) id 14.3.352.0; Thu, 15 Feb 2018 10:43:40 +0100 From: Jon Maloy To: , CC: , , , , , , Subject: [net-next 06/10] tipc: collapse subscription creation functions Date: Thu, 15 Feb 2018 10:40:47 +0100 Message-ID: <1518687651-26561-7-git-send-email-jon.maloy@ericsson.com> X-Mailer: git-send-email 2.1.4 In-Reply-To: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> References: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> MIME-Version: 1.0 X-Originating-IP: [10.35.28.120] X-Brightmail-Tracker: H4sIAAAAAAAAA+NgFjrNLMWRmVeSWpSXmKPExsUyM2K7sa5fWGuUwd7frBY3GnqYLeacb2Gx ePtqFrvFsQViFlvOZ1lcaT/LbvH4+nVmB3aPLStvMnm8u8LmsXvBZyaPz5vkPNZv2coUwBrF ZZOSmpNZllqkb5fAlTHx2QKmgi06FVPeL2RpYJyn0sXIySEhYCKxYOd+5i5GLg4hgcOMEvNO f2IFSQgJbGeU2LOECcRmE9CQeDmtgxHEFhEwlni1spMJpIFZ4BOjRMvKZ2wgCWEBD4mJa5cB TeLgYBFQldi+XALE5BVwk2hZJAuxS07i/PGfzCA2p4C7xKqWf1Cr3CReL3kDtopXQFDi5Mwn LCA2s4CExMEXL5ghapQl5n6YxgQxR0Hi28xupgmMArOQtMxC0rKAkWkVo2hxanFSbrqRsV5q UWZycXF+nl5easkmRmAQH9zyW3UH4+U3jocYBTgYlXh4J4e2RgmxJpYVV+YeYpTgYFYS4b1t BxTiTUmsrEotyo8vKs1JLT7EKM3BoiTOe9KTN0pIID2xJDU7NbUgtQgmy8TBKdXAKJgWKjJF P68jNmle0kHbJ6+LN1+5vLElKjjvQ2lc+pLXbTtblc13LnGzMMjwX5kxTSibuTnq9K7q9dW7 pvIpdFv8c3/Bdv7MxZT8qL6z4cum2r5pirHaKFb4+c9f1s7DuxvdPObtWXNiP+Nzjg7TfSGL p2+R0+kx9hePOzE7a+62xea+39+fVmIpzkg01GIuKk4EAPXVutteAgAA Sender: netdev-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: netdev@vger.kernel.org After the previous changes it becomes logical to collapse the two-level creation of subscription instances into one. We do that here. We also rename the creation and deletion functions for more consistency. Acked-by: Ying Xue Signed-off-by: Jon Maloy --- net/tipc/server.c | 4 ++-- net/tipc/server.h | 1 + net/tipc/subscr.c | 46 ++++++++++++---------------------------------- net/tipc/subscr.h | 14 +++++++------- 4 files changed, 22 insertions(+), 43 deletions(-) diff --git a/net/tipc/server.c b/net/tipc/server.c index 5d231fa..6a18b10 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -203,7 +203,7 @@ static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) spin_lock_bh(&con->sub_lock); list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) { if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) - tipc_sub_delete(sub); + tipc_sub_unsubscribe(sub); else if (s) break; } @@ -278,7 +278,7 @@ static int tipc_con_rcv_sub(struct tipc_server *srv, tipc_con_delete_sub(con, s); return 0; } - sub = tipc_subscrp_subscribe(srv, s, con->conid); + sub = tipc_sub_subscribe(srv, s, con->conid); if (!sub) return -1; diff --git a/net/tipc/server.h b/net/tipc/server.h index 2de8709..995b795 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -2,6 +2,7 @@ * net/tipc/server.h: Include file for TIPC server code * * Copyright (c) 2012-2013, Wind River Systems + * Copyright (c) 2017, Ericsson AB * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 406b09f..8d37b61 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -134,33 +134,29 @@ void tipc_subscrp_get(struct tipc_subscription *subscription) kref_get(&subscription->kref); } -static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv, - struct tipc_subscr *s, - int conid) +struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, + struct tipc_subscr *s, + int conid) { struct tipc_net *tn = tipc_net(srv->net); struct tipc_subscription *sub; u32 filter = tipc_sub_read(s, filter); + u32 timeout; - /* Refuse subscription if global limit exceeded */ - if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { - pr_warn("Subscription rejected, limit reached (%u)\n", - TIPC_MAX_SUBSCRIPTIONS); + if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) { + pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR); + return NULL; + } + if ((filter & TIPC_SUB_PORTS && filter & TIPC_SUB_SERVICE) || + (tipc_sub_read(s, seq.lower) > tipc_sub_read(s, seq.upper))) { + pr_warn("Subscription rejected, illegal request\n"); return NULL; } - - /* Allocate subscription object */ sub = kmalloc(sizeof(*sub), GFP_ATOMIC); if (!sub) { pr_warn("Subscription rejected, no memory\n"); return NULL; } - - /* Initialize subscription object */ - if (filter & TIPC_SUB_PORTS && filter & TIPC_SUB_SERVICE) - goto err; - if (tipc_sub_read(s, seq.lower) > tipc_sub_read(s, seq.upper)) - goto err; sub->server = srv; sub->conid = conid; sub->inactive = false; @@ -168,24 +164,6 @@ static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv, spin_lock_init(&sub->lock); atomic_inc(&tn->subscription_count); kref_init(&sub->kref); - return sub; -err: - pr_warn("Subscription rejected, illegal request\n"); - kfree(sub); - return NULL; -} - -struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, - struct tipc_subscr *s, - int conid) -{ - struct tipc_subscription *sub = NULL; - u32 timeout; - - sub = tipc_subscrp_create(srv, s, conid); - if (!sub) - return NULL; - tipc_nametbl_subscribe(sub); timer_setup(&sub->timer, tipc_subscrp_timeout, 0); timeout = tipc_sub_read(&sub->evt.s, timeout); @@ -194,7 +172,7 @@ struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, return sub; } -void tipc_sub_delete(struct tipc_subscription *sub) +void tipc_sub_unsubscribe(struct tipc_subscription *sub) { tipc_nametbl_unsubscribe(sub); if (sub->evt.s.timeout != TIPC_WAIT_FOREVER) diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index db80e41..2d35f10 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -1,7 +1,7 @@ /* * net/tipc/subscr.h: Include file for TIPC network topology service * - * Copyright (c) 2003-2006, Ericsson AB + * Copyright (c) 2003-2017, Ericsson AB * Copyright (c) 2005-2007, 2012-2013, Wind River Systems * All rights reserved. * @@ -39,8 +39,8 @@ #include "server.h" -#define TIPC_MAX_SUBSCRIPTIONS 65535 -#define TIPC_MAX_PUBLICATIONS 65535 +#define TIPC_MAX_SUBSCR 65535 +#define TIPC_MAX_PUBLICATIONS 65535 struct tipc_subscription; struct tipc_conn; @@ -66,10 +66,10 @@ struct tipc_subscription { spinlock_t lock; /* serialize up/down and timer events */ }; -struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv, - struct tipc_subscr *s, - int conid); -void tipc_sub_delete(struct tipc_subscription *sub); +struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, + struct tipc_subscr *s, + int conid); +void tipc_sub_unsubscribe(struct tipc_subscription *sub); int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, u32 found_upper); From patchwork Thu Feb 15 09:40:48 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Jon Maloy X-Patchwork-Id: 873723 X-Patchwork-Delegate: davem@davemloft.net Return-Path: X-Original-To: patchwork-incoming@ozlabs.org Delivered-To: patchwork-incoming@ozlabs.org Authentication-Results: ozlabs.org; spf=none (mailfrom) smtp.mailfrom=vger.kernel.org (client-ip=209.132.180.67; helo=vger.kernel.org; envelope-from=netdev-owner@vger.kernel.org; receiver=) Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (1024-bit key; unprotected) header.d=ericsson.com header.i=@ericsson.com header.b="VhmbE4y0"; dkim-atps=neutral Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by ozlabs.org (Postfix) with ESMTP id 3zhrvg6Bpdz9t2x for ; Thu, 15 Feb 2018 20:44:15 +1100 (AEDT) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1755280AbeBOJoM (ORCPT ); Thu, 15 Feb 2018 04:44:12 -0500 Received: from sesbmg22.ericsson.net ([193.180.251.48]:50858 "EHLO sesbmg22.ericsson.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1755248AbeBOJoK (ORCPT ); Thu, 15 Feb 2018 04:44:10 -0500 DKIM-Signature: v=1; a=rsa-sha256; d=ericsson.com; s=mailgw201801; c=relaxed/simple; q=dns/txt; i=@ericsson.com; t=1518687849; h=From:Sender:Reply-To:Subject:Date:Message-ID:To:CC:MIME-Version:Content-Type: Content-Transfer-Encoding:Content-ID:Content-Description:Resent-Date:Resent-From: Resent-Sender:Resent-To:Resent-Cc:Resent-Message-ID:In-Reply-To:References:List-Id: List-Help:List-Unsubscribe:List-Subscribe:List-Post:List-Owner:List-Archive; bh=I5YjGjhQDcSq0RsKg2TzF9KUnK13P6bs3YNE9yZs35o=; b=VhmbE4y04HOQAs6xlo+n1NQhIgvzn/Vh9dqCtwLDE7Nm9pYCZRwPTJKK+QLiPZdP Up1/6NlEYZx96wml9FX5C1eNgLET/ZGQU2+k+dYLpxXeiUw1Y7R1cYsatZZ7rKWV 99YSOUeRQujTfysycRHqSwKBpOfy1POPuaRtBwVgGf8=; X-AuditID: c1b4fb30-799639c000004778-d3-5a855669cfdc Received: from ESESSHC011.ericsson.se (Unknown_Domain [153.88.183.51]) by sesbmg22.ericsson.net (Symantec Mail Security) with SMTP id 7A.D8.18296.966558A5; Thu, 15 Feb 2018 10:44:09 +0100 (CET) Received: from tipsy.lab.linux.ericsson.se (10.35.28.120) by ESESSHC011.ericsson.se (153.88.183.51) with Microsoft SMTP Server (TLS) id 14.3.352.0; Thu, 15 Feb 2018 10:44:07 +0100 From: Jon Maloy To: , CC: , , , , , , Subject: [net-next 07/10] tipc: some prefix changes Date: Thu, 15 Feb 2018 10:40:48 +0100 Message-ID: <1518687651-26561-8-git-send-email-jon.maloy@ericsson.com> X-Mailer: git-send-email 2.1.4 In-Reply-To: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> References: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> MIME-Version: 1.0 X-Originating-IP: [10.35.28.120] X-Brightmail-Tracker: H4sIAAAAAAAAA+NgFjrLLMWRmVeSWpSXmKPExsUyM2K7sW5mWGuUwZobohY3GnqYLeacb2Gx ePtqFrvFsQViFlvOZ1lcaT/LbvH4+nVmB3aPLStvMnm8u8LmsXvBZyaPz5vkPNZv2coUwBrF ZZOSmpNZllqkb5fAlfHgiUvBzYiKD83dbA2MNz26GDk5JARMJLYcOcTSxcjFISRwmFFixZLD jBDOdkaJh50TWUCq2AQ0JF5O62AEsUUEjCVerexkAiliFvjEKNGy8hkbSEIYKHFu+09WEJtF QFVi1a9DYA28Am4Sn2a+YIZYJydx/vhPMJtTwF1iVcs/sHohoJrXS94wQdQLSpyc+QRsMbOA hMTBFxC9QgLKEnM/TGOCmKMg8W1mN9MERoFZSFpmIWlZwMi0ilG0OLU4KTfdyEgvtSgzubg4 P08vL7VkEyMwkA9u+W2wg/Hlc8dDjAIcjEo8vNYhrVFCrIllxZW5hxglOJiVRHhv2wGFeFMS K6tSi/Lji0pzUosPMUpzsCiJ85705I0SEkhPLEnNTk0tSC2CyTJxcEo1MKZ4rEw3XsJiPfXP G5OS67/vKvO0deq4Pylle3bdzkvSTFp8e/b1qG06c3ZuePuev+DZ9FSnT6+4NjVLLfd3FMlN VvgSlvnOsMt77yKl/SkO9tZLyv+dufq/Mis/vad2hkPXHo/Zj+fPZ4rIWlqxrfiCf4jozJmv tdzcvJWOTXki4yQsJL9nuhJLcUaioRZzUXEiAFp2rF5gAgAA Sender: netdev-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: netdev@vger.kernel.org Since we now have removed struct tipc_subscriber from the code, and only struct tipc_subscription remains, there is no longer need for long and awkward prefixes to distinguish between their pertaining functions. We now change all tipc_subscrp_* prefixes to tipc_sub_*. This is a purely cosmetic change. Acked-by: Ying Xue Signed-off-by: Jon Maloy --- net/tipc/name_table.c | 33 ++++++++++++++++---------------- net/tipc/server.c | 5 ++--- net/tipc/subscr.c | 52 +++++++++++++++++++++++++-------------------------- net/tipc/subscr.h | 20 ++++++++++---------- 4 files changed, 54 insertions(+), 56 deletions(-) diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 2fbd0a2..b234b7e 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -326,10 +326,10 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net, /* Any subscriptions waiting for notification? */ list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { - tipc_subscrp_report_overlap(s, publ->lower, publ->upper, - TIPC_PUBLISHED, publ->ref, - publ->node, publ->scope, - created_subseq); + tipc_sub_report_overlap(s, publ->lower, publ->upper, + TIPC_PUBLISHED, publ->ref, + publ->node, publ->scope, + created_subseq); } return publ; } @@ -397,10 +397,9 @@ static struct publication *tipc_nameseq_remove_publ(struct net *net, /* Notify any waiting subscriptions */ list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { - tipc_subscrp_report_overlap(s, publ->lower, publ->upper, - TIPC_WITHDRAWN, publ->ref, - publ->node, publ->scope, - removed_subseq); + tipc_sub_report_overlap(s, publ->lower, publ->upper, + TIPC_WITHDRAWN, publ->ref, publ->node, + publ->scope, removed_subseq); } return publ; @@ -424,25 +423,25 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq, ns.upper = tipc_sub_read(s, seq.upper); no_status = tipc_sub_read(s, filter) & TIPC_SUB_NO_STATUS; - tipc_subscrp_get(sub); + tipc_sub_get(sub); list_add(&sub->nameseq_list, &nseq->subscriptions); if (no_status || !sseq) return; while (sseq != &nseq->sseqs[nseq->first_free]) { - if (tipc_subscrp_check_overlap(&ns, sseq->lower, sseq->upper)) { + if (tipc_sub_check_overlap(&ns, sseq->lower, sseq->upper)) { struct publication *crs; struct name_info *info = sseq->info; int must_report = 1; list_for_each_entry(crs, &info->zone_list, zone_list) { - tipc_subscrp_report_overlap(sub, sseq->lower, - sseq->upper, - TIPC_PUBLISHED, - crs->ref, crs->node, - crs->scope, - must_report); + tipc_sub_report_overlap(sub, sseq->lower, + sseq->upper, + TIPC_PUBLISHED, + crs->ref, crs->node, + crs->scope, + must_report); must_report = 0; } } @@ -856,7 +855,7 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *sub) if (seq != NULL) { spin_lock_bh(&seq->lock); list_del_init(&sub->nameseq_list); - tipc_subscrp_put(sub); + tipc_sub_put(sub); if (!seq->first_free && list_empty(&seq->subscriptions)) { hlist_del_init_rcu(&seq->ns_list); kfree(seq->sseqs); diff --git a/net/tipc/server.c b/net/tipc/server.c index 6a18b10..a5c112e 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -201,7 +201,7 @@ static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) struct tipc_subscription *sub, *tmp; spin_lock_bh(&con->sub_lock); - list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) { + list_for_each_entry_safe(sub, tmp, sub_list, sub_list) { if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) tipc_sub_unsubscribe(sub); else if (s) @@ -281,9 +281,8 @@ static int tipc_con_rcv_sub(struct tipc_server *srv, sub = tipc_sub_subscribe(srv, s, con->conid); if (!sub) return -1; - spin_lock_bh(&con->sub_lock); - list_add(&sub->subscrp_list, &con->sub_list); + list_add(&sub->sub_list, &con->sub_list); spin_unlock_bh(&con->sub_lock); return 0; } diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 8d37b61..3be1e4b 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -38,9 +38,9 @@ #include "name_table.h" #include "subscr.h" -static void tipc_subscrp_send_event(struct tipc_subscription *sub, - u32 found_lower, u32 found_upper, - u32 event, u32 port, u32 node) +static void tipc_sub_send_event(struct tipc_subscription *sub, + u32 found_lower, u32 found_upper, + u32 event, u32 port, u32 node) { struct tipc_event *evt = &sub->evt; @@ -55,13 +55,13 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub, } /** - * tipc_subscrp_check_overlap - test for subscription overlap with the + * tipc_sub_check_overlap - test for subscription overlap with the * given values * * Returns 1 if there is overlap, otherwise 0. */ -int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, - u32 found_upper) +int tipc_sub_check_overlap(struct tipc_name_seq *seq, u32 found_lower, + u32 found_upper) { if (found_lower < seq->lower) found_lower = seq->lower; @@ -72,20 +72,20 @@ int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, return 1; } -void tipc_subscrp_report_overlap(struct tipc_subscription *sub, - u32 found_lower, u32 found_upper, - u32 event, u32 port, u32 node, - u32 scope, int must) +void tipc_sub_report_overlap(struct tipc_subscription *sub, + u32 found_lower, u32 found_upper, + u32 event, u32 port, u32 node, + u32 scope, int must) { - struct tipc_name_seq seq; struct tipc_subscr *s = &sub->evt.s; u32 filter = tipc_sub_read(s, filter); + struct tipc_name_seq seq; seq.type = tipc_sub_read(s, seq.type); seq.lower = tipc_sub_read(s, seq.lower); seq.upper = tipc_sub_read(s, seq.upper); - if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper)) + if (!tipc_sub_check_overlap(&seq, found_lower, found_upper)) return; if (!must && !(filter & TIPC_SUB_PORTS)) @@ -95,24 +95,24 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE) return; spin_lock(&sub->lock); - tipc_subscrp_send_event(sub, found_lower, found_upper, - event, port, node); + tipc_sub_send_event(sub, found_lower, found_upper, + event, port, node); spin_unlock(&sub->lock); } -static void tipc_subscrp_timeout(struct timer_list *t) +static void tipc_sub_timeout(struct timer_list *t) { struct tipc_subscription *sub = from_timer(sub, t, timer); struct tipc_subscr *s = &sub->evt.s; spin_lock(&sub->lock); - tipc_subscrp_send_event(sub, s->seq.lower, s->seq.upper, - TIPC_SUBSCR_TIMEOUT, 0, 0); + tipc_sub_send_event(sub, s->seq.lower, s->seq.upper, + TIPC_SUBSCR_TIMEOUT, 0, 0); sub->inactive = true; spin_unlock(&sub->lock); } -static void tipc_subscrp_kref_release(struct kref *kref) +static void tipc_sub_kref_release(struct kref *kref) { struct tipc_subscription *sub; struct tipc_net *tn; @@ -124,12 +124,12 @@ static void tipc_subscrp_kref_release(struct kref *kref) kfree(sub); } -void tipc_subscrp_put(struct tipc_subscription *subscription) +void tipc_sub_put(struct tipc_subscription *subscription) { - kref_put(&subscription->kref, tipc_subscrp_kref_release); + kref_put(&subscription->kref, tipc_sub_kref_release); } -void tipc_subscrp_get(struct tipc_subscription *subscription) +void tipc_sub_get(struct tipc_subscription *subscription) { kref_get(&subscription->kref); } @@ -139,8 +139,8 @@ struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, int conid) { struct tipc_net *tn = tipc_net(srv->net); - struct tipc_subscription *sub; u32 filter = tipc_sub_read(s, filter); + struct tipc_subscription *sub; u32 timeout; if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) { @@ -165,7 +165,7 @@ struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, atomic_inc(&tn->subscription_count); kref_init(&sub->kref); tipc_nametbl_subscribe(sub); - timer_setup(&sub->timer, tipc_subscrp_timeout, 0); + timer_setup(&sub->timer, tipc_sub_timeout, 0); timeout = tipc_sub_read(&sub->evt.s, timeout); if (timeout != TIPC_WAIT_FOREVER) mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); @@ -177,16 +177,16 @@ void tipc_sub_unsubscribe(struct tipc_subscription *sub) tipc_nametbl_unsubscribe(sub); if (sub->evt.s.timeout != TIPC_WAIT_FOREVER) del_timer_sync(&sub->timer); - list_del(&sub->subscrp_list); - tipc_subscrp_put(sub); + list_del(&sub->sub_list); + tipc_sub_put(sub); } int tipc_topsrv_start(struct net *net) { struct tipc_net *tn = net_generic(net, tipc_net_id); const char name[] = "topology_server"; - struct tipc_server *topsrv; struct sockaddr_tipc *saddr; + struct tipc_server *topsrv; saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC); if (!saddr) diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 2d35f10..7209328 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -51,7 +51,7 @@ struct tipc_conn; * @seq: name sequence associated with subscription * @timer: timer governing subscription duration (optional) * @nameseq_list: adjacent subscriptions in name sequence's subscription list - * @subscrp_list: adjacent subscriptions in subscriber's subscription list + * @sub_list: adjacent subscriptions in subscriber's subscription list * @evt: template for events generated by subscription */ struct tipc_subscription { @@ -59,7 +59,7 @@ struct tipc_subscription { struct tipc_server *server; struct timer_list timer; struct list_head nameseq_list; - struct list_head subscrp_list; + struct list_head sub_list; struct tipc_event evt; int conid; bool inactive; @@ -71,17 +71,17 @@ struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, int conid); void tipc_sub_unsubscribe(struct tipc_subscription *sub); -int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, - u32 found_upper); -void tipc_subscrp_report_overlap(struct tipc_subscription *sub, - u32 found_lower, u32 found_upper, - u32 event, u32 port, u32 node, - u32 scope, int must); +int tipc_sub_check_overlap(struct tipc_name_seq *seq, u32 found_lower, + u32 found_upper); +void tipc_sub_report_overlap(struct tipc_subscription *sub, + u32 found_lower, u32 found_upper, + u32 event, u32 port, u32 node, + u32 scope, int must); int tipc_topsrv_start(struct net *net); void tipc_topsrv_stop(struct net *net); -void tipc_subscrp_put(struct tipc_subscription *subscription); -void tipc_subscrp_get(struct tipc_subscription *subscription); +void tipc_sub_put(struct tipc_subscription *subscription); +void tipc_sub_get(struct tipc_subscription *subscription); #define TIPC_FILTER_MASK (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | TIPC_SUB_CANCEL) From patchwork Thu Feb 15 09:40:49 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Jon Maloy X-Patchwork-Id: 873724 X-Patchwork-Delegate: davem@davemloft.net Return-Path: X-Original-To: patchwork-incoming@ozlabs.org Delivered-To: patchwork-incoming@ozlabs.org Authentication-Results: ozlabs.org; spf=none (mailfrom) smtp.mailfrom=vger.kernel.org (client-ip=209.132.180.67; helo=vger.kernel.org; envelope-from=netdev-owner@vger.kernel.org; receiver=) Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (1024-bit key; unprotected) header.d=ericsson.com header.i=@ericsson.com header.b="bU2g+biv"; dkim-atps=neutral Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by ozlabs.org (Postfix) with ESMTP id 3zhrvj5qKvz9t2f for ; Thu, 15 Feb 2018 20:44:17 +1100 (AEDT) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1755304AbeBOJoO (ORCPT ); Thu, 15 Feb 2018 04:44:14 -0500 Received: from sesbmg22.ericsson.net ([193.180.251.48]:50858 "EHLO sesbmg22.ericsson.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1755186AbeBOJoM (ORCPT ); Thu, 15 Feb 2018 04:44:12 -0500 DKIM-Signature: v=1; a=rsa-sha256; d=ericsson.com; s=mailgw201801; c=relaxed/simple; q=dns/txt; i=@ericsson.com; t=1518687849; h=From:Sender:Reply-To:Subject:Date:Message-ID:To:CC:MIME-Version:Content-Type: Content-Transfer-Encoding:Content-ID:Content-Description:Resent-Date:Resent-From: Resent-Sender:Resent-To:Resent-Cc:Resent-Message-ID:In-Reply-To:References:List-Id: List-Help:List-Unsubscribe:List-Subscribe:List-Post:List-Owner:List-Archive; bh=RS/GuIb706Tiatkqt1s2qbWnIfTgkC2jax40gv6EpqY=; b=bU2g+bivYrSVhhwYcajmaVAlb3kgZF2m0QbVWVwQdPg6RvjXEHLqosi92bVgjr4G eIgtFVJl4GO4bKcqmUEBLpsrN4K2R4H7vY1mNtQHBHSp1jc1bwAehIiyivpxrjKc LQJAn4jGm+DYeyP+N2kdfwJTkZ43SgKOiqmabm93DWg=; X-AuditID: c1b4fb30-799639c000004778-d6-5a855669cd37 Received: from ESESSHC011.ericsson.se (Unknown_Domain [153.88.183.51]) by sesbmg22.ericsson.net (Symantec Mail Security) with SMTP id 1B.D8.18296.966558A5; Thu, 15 Feb 2018 10:44:09 +0100 (CET) Received: from tipsy.lab.linux.ericsson.se (10.35.28.120) by ESESSHC011.ericsson.se (153.88.183.51) with Microsoft SMTP Server (TLS) id 14.3.352.0; Thu, 15 Feb 2018 10:44:08 +0100 From: Jon Maloy To: , CC: , , , , , , Subject: [net-next 08/10] tipc: make struct tipc_server private for server.c Date: Thu, 15 Feb 2018 10:40:49 +0100 Message-ID: <1518687651-26561-9-git-send-email-jon.maloy@ericsson.com> X-Mailer: git-send-email 2.1.4 In-Reply-To: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> References: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> MIME-Version: 1.0 X-Originating-IP: [10.35.28.120] X-Brightmail-Tracker: H4sIAAAAAAAAA+NgFjrHLMWRmVeSWpSXmKPExsUyM2K7sW5mWGuUwcmZyhY3GnqYLeacb2Gx ePtqFrvFsQViFlvOZ1lcaT/LbvH4+nVmB3aPLStvMnm8u8LmsXvBZyaPz5vkPNZv2coUwBrF ZZOSmpNZllqkb5fAlfGwq5ex4EBpxYyXHUwNjB8Suhg5OSQETCQeNjxg7GLk4hASOMwo0Tp3 OiuEs51RYsO27awgVWwCGhIvp3UwgtgiAsYSr1Z2MoEUMQt8YpRoWfmMDSQhLOAjsWLvA2YQ m0VAVWLisW6wBl4BN4nH5zpYINbJSZw//hOshlPAXWJVyz+wBUJANa+XvGGCqBeUODnzCVg9 s4CExMEXL5ghapQl5n6YxgQxR0Hi28xupgmMArOQtMxC0rKAkWkVo2hxanFSbrqRkV5qUWZy cXF+nl5easkmRmAoH9zy22AH48vnjocYBTgYlXh4rUNao4RYE8uKK3MPMUpwMCuJ8N62Awrx piRWVqUW5ccXleakFh9ilOZgURLnPenJGyUkkJ5YkpqdmlqQWgSTZeLglGpgnCJ6KvLpjaUv /K6vkJK0ZT52TmWJ8Y4oj8qjgUypc9U7Oe+kvSut+JfO2b68JsUjXNSXb+UdFvWl0+qFvikv bJZPN3ff36ja9qhm36dJF08k/f8ql/Wj0el1Q0jYafc5DKZT5pZf9c6P5+OYfIIhJnXuvhOa Fy/w9Ao9yrb6d7l2UgGjwZsjSizFGYmGWsxFxYkAGDkSF2ECAAA= Sender: netdev-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: netdev@vger.kernel.org In order to narrow the interface and dependencies between the topology server and the subscription/binding table functionality we move struct tipc_server inside the file server.c. This requires some code adaptations in other files, but those are mostly minor. The most important change is that we have to move the start/stop functions for the topology server to server.c, where they logically belong anyway. Acked-by: Ying Xue Signed-off-by: Jon Maloy --- net/tipc/name_table.c | 10 ++-- net/tipc/server.c | 124 ++++++++++++++++++++++++++++++++++++++++---------- net/tipc/server.h | 36 +-------------- net/tipc/subscr.c | 64 ++------------------------ net/tipc/subscr.h | 4 +- 5 files changed, 110 insertions(+), 128 deletions(-) diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index b234b7e..e01c9c6 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -813,8 +813,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref, */ void tipc_nametbl_subscribe(struct tipc_subscription *sub) { - struct tipc_server *srv = sub->server; - struct tipc_net *tn = tipc_net(srv->net); + struct tipc_net *tn = tipc_net(sub->net); struct tipc_subscr *s = &sub->evt.s; u32 type = tipc_sub_read(s, seq.type); int index = hash(type); @@ -822,7 +821,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *sub) struct tipc_name_seq ns; spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(srv->net, type); + seq = nametbl_find_seq(sub->net, type); if (!seq) seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); if (seq) { @@ -844,14 +843,13 @@ void tipc_nametbl_subscribe(struct tipc_subscription *sub) */ void tipc_nametbl_unsubscribe(struct tipc_subscription *sub) { - struct tipc_server *srv = sub->server; struct tipc_subscr *s = &sub->evt.s; - struct tipc_net *tn = tipc_net(srv->net); + struct tipc_net *tn = tipc_net(sub->net); struct name_seq *seq; u32 type = tipc_sub_read(s, seq.type); spin_lock_bh(&tn->nametbl_lock); - seq = nametbl_find_seq(srv->net, type); + seq = nametbl_find_seq(sub->net, type); if (seq != NULL) { spin_lock_bh(&seq->lock); list_del_init(&sub->nameseq_list); diff --git a/net/tipc/server.c b/net/tipc/server.c index a5c112e..0abbdd6 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -49,7 +49,37 @@ #define CF_CONNECTED 1 #define CF_SERVER 2 -#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data) +#define TIPC_SERVER_NAME_LEN 32 + +/** + * struct tipc_server - TIPC server structure + * @conn_idr: identifier set of connection + * @idr_lock: protect the connection identifier set + * @idr_in_use: amount of allocated identifier entry + * @net: network namspace instance + * @rcvbuf_cache: memory cache of server receive buffer + * @rcv_wq: receive workqueue + * @send_wq: send workqueue + * @max_rcvbuf_size: maximum permitted receive message length + * @tipc_conn_new: callback will be called when new connection is incoming + * @tipc_conn_release: callback will be called before releasing the connection + * @tipc_conn_recvmsg: callback will be called when message arrives + * @saddr: TIPC server address + * @name: server name + * @imp: message importance + * @type: socket type + */ +struct tipc_server { + struct idr conn_idr; + spinlock_t idr_lock; /* for idr list */ + int idr_in_use; + struct net *net; + struct workqueue_struct *rcv_wq; + struct workqueue_struct *send_wq; + int max_rcvbuf_size; + struct sockaddr_tipc *saddr; + char name[TIPC_SERVER_NAME_LEN]; +}; /** * struct tipc_conn - TIPC connection structure @@ -93,6 +123,11 @@ static void tipc_recv_work(struct work_struct *work); static void tipc_send_work(struct work_struct *work); static void tipc_clean_outqueues(struct tipc_conn *con); +static struct tipc_conn *sock2con(struct sock *sk) +{ + return sk->sk_user_data; +} + static bool connected(struct tipc_conn *con) { return con && test_bit(CF_CONNECTED, &con->flags); @@ -198,14 +233,17 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) { struct list_head *sub_list = &con->sub_list; + struct tipc_net *tn = tipc_net(con->server->net); struct tipc_subscription *sub, *tmp; spin_lock_bh(&con->sub_lock); list_for_each_entry_safe(sub, tmp, sub_list, sub_list) { - if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) + if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) { tipc_sub_unsubscribe(sub); - else if (s) + atomic_dec(&tn->subscription_count); + } else if (s) { break; + } } spin_unlock_bh(&con->sub_lock); } @@ -220,8 +258,7 @@ static void tipc_close_conn(struct tipc_conn *con) if (disconnect) { sk->sk_user_data = NULL; - if (con->conid) - tipc_con_delete_sub(con, NULL); + tipc_con_delete_sub(con, NULL); } write_unlock_bh(&sk->sk_callback_lock); @@ -272,15 +309,21 @@ static int tipc_con_rcv_sub(struct tipc_server *srv, struct tipc_conn *con, struct tipc_subscr *s) { + struct tipc_net *tn = tipc_net(srv->net); struct tipc_subscription *sub; if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) { tipc_con_delete_sub(con, s); return 0; } - sub = tipc_sub_subscribe(srv, s, con->conid); + if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) { + pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR); + return -1; + } + sub = tipc_sub_subscribe(srv->net, s, con->conid); if (!sub) return -1; + atomic_inc(&tn->subscription_count); spin_lock_bh(&con->sub_lock); list_add(&sub->sub_list, &con->sub_list); spin_unlock_bh(&con->sub_lock); @@ -426,13 +469,14 @@ static void tipc_clean_outqueues(struct tipc_conn *con) /* tipc_conn_queue_evt - interrupt level call from a subscription instance * The queued job is launched in tipc_send_to_sock() */ -void tipc_conn_queue_evt(struct tipc_server *s, int conid, +void tipc_conn_queue_evt(struct net *net, int conid, u32 event, struct tipc_event *evt) { + struct tipc_server *srv = tipc_topsrv(net); struct outqueue_entry *e; struct tipc_conn *con; - con = tipc_conn_lookup(s, conid); + con = tipc_conn_lookup(srv, conid); if (!con) return; @@ -448,7 +492,7 @@ void tipc_conn_queue_evt(struct tipc_server *s, int conid, list_add_tail(&e->list, &con->outqueue); spin_unlock_bh(&con->outqueue_lock); - if (queue_work(s->send_wq, &con->swork)) + if (queue_work(srv->send_wq, &con->swork)) return; err: conn_put(con); @@ -620,41 +664,71 @@ static int tipc_work_start(struct tipc_server *s) return 0; } -int tipc_server_start(struct tipc_server *s) +int tipc_topsrv_start(struct net *net) { + struct tipc_net *tn = tipc_net(net); + const char name[] = "topology_server"; + struct sockaddr_tipc *saddr; + struct tipc_server *srv; int ret; - spin_lock_init(&s->idr_lock); - idr_init(&s->conn_idr); - s->idr_in_use = 0; + saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC); + if (!saddr) + return -ENOMEM; + saddr->family = AF_TIPC; + saddr->addrtype = TIPC_ADDR_NAMESEQ; + saddr->addr.nameseq.type = TIPC_TOP_SRV; + saddr->addr.nameseq.lower = TIPC_TOP_SRV; + saddr->addr.nameseq.upper = TIPC_TOP_SRV; + saddr->scope = TIPC_NODE_SCOPE; + + srv = kzalloc(sizeof(*srv), GFP_ATOMIC); + if (!srv) { + kfree(saddr); + return -ENOMEM; + } + srv->net = net; + srv->saddr = saddr; + srv->max_rcvbuf_size = sizeof(struct tipc_subscr); + + strncpy(srv->name, name, strlen(name) + 1); + tn->topsrv = srv; + atomic_set(&tn->subscription_count, 0); + + spin_lock_init(&srv->idr_lock); + idr_init(&srv->conn_idr); + srv->idr_in_use = 0; - ret = tipc_work_start(s); + ret = tipc_work_start(srv); if (ret < 0) return ret; - ret = tipc_open_listening_sock(s); + ret = tipc_open_listening_sock(srv); if (ret < 0) - tipc_work_stop(s); + tipc_work_stop(srv); return ret; } -void tipc_server_stop(struct tipc_server *s) +void tipc_topsrv_stop(struct net *net) { + struct tipc_server *srv = tipc_topsrv(net); struct tipc_conn *con; int id; - spin_lock_bh(&s->idr_lock); - for (id = 0; s->idr_in_use; id++) { - con = idr_find(&s->conn_idr, id); + spin_lock_bh(&srv->idr_lock); + for (id = 0; srv->idr_in_use; id++) { + con = idr_find(&srv->conn_idr, id); if (con) { - spin_unlock_bh(&s->idr_lock); + spin_unlock_bh(&srv->idr_lock); tipc_close_conn(con); - spin_lock_bh(&s->idr_lock); + spin_lock_bh(&srv->idr_lock); } } - spin_unlock_bh(&s->idr_lock); + spin_unlock_bh(&srv->idr_lock); - tipc_work_stop(s); - idr_destroy(&s->conn_idr); + tipc_work_stop(srv); + idr_destroy(&srv->conn_idr); + kfree(srv->saddr); + kfree(srv); } diff --git a/net/tipc/server.h b/net/tipc/server.h index 995b795..ce93b6d 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -47,45 +47,11 @@ #define TIPC_SUB_NODE_SCOPE 0x40 #define TIPC_SUB_NO_STATUS 0x80 -/** - * struct tipc_server - TIPC server structure - * @conn_idr: identifier set of connection - * @idr_lock: protect the connection identifier set - * @idr_in_use: amount of allocated identifier entry - * @net: network namspace instance - * @rcvbuf_cache: memory cache of server receive buffer - * @rcv_wq: receive workqueue - * @send_wq: send workqueue - * @max_rcvbuf_size: maximum permitted receive message length - * @tipc_conn_new: callback will be called when new connection is incoming - * @tipc_conn_release: callback will be called before releasing the connection - * @tipc_conn_recvmsg: callback will be called when message arrives - * @saddr: TIPC server address - * @name: server name - * @imp: message importance - * @type: socket type - */ -struct tipc_server { - struct idr conn_idr; - spinlock_t idr_lock; - int idr_in_use; - struct net *net; - struct workqueue_struct *rcv_wq; - struct workqueue_struct *send_wq; - int max_rcvbuf_size; - struct sockaddr_tipc *saddr; - char name[TIPC_SERVER_NAME_LEN]; -}; - -void tipc_conn_queue_evt(struct tipc_server *s, int conid, +void tipc_conn_queue_evt(struct net *net, int conid, u32 event, struct tipc_event *evt); bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, u32 upper, u32 filter, int *conid); void tipc_topsrv_kern_unsubscr(struct net *net, int conid); -int tipc_server_start(struct tipc_server *s); - -void tipc_server_stop(struct tipc_server *s); - #endif diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 3be1e4b..c814656 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -51,7 +51,7 @@ static void tipc_sub_send_event(struct tipc_subscription *sub, tipc_evt_write(evt, found_upper, found_upper); tipc_evt_write(evt, port.ref, port); tipc_evt_write(evt, port.node, node); - tipc_conn_queue_evt(sub->server, sub->conid, event, evt); + tipc_conn_queue_evt(sub->net, sub->conid, event, evt); } /** @@ -114,14 +114,7 @@ static void tipc_sub_timeout(struct timer_list *t) static void tipc_sub_kref_release(struct kref *kref) { - struct tipc_subscription *sub; - struct tipc_net *tn; - - sub = container_of(kref, struct tipc_subscription, kref); - tn = tipc_net(sub->server->net); - - atomic_dec(&tn->subscription_count); - kfree(sub); + kfree(container_of(kref, struct tipc_subscription, kref)); } void tipc_sub_put(struct tipc_subscription *subscription) @@ -134,19 +127,14 @@ void tipc_sub_get(struct tipc_subscription *subscription) kref_get(&subscription->kref); } -struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, +struct tipc_subscription *tipc_sub_subscribe(struct net *net, struct tipc_subscr *s, int conid) { - struct tipc_net *tn = tipc_net(srv->net); u32 filter = tipc_sub_read(s, filter); struct tipc_subscription *sub; u32 timeout; - if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) { - pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR); - return NULL; - } if ((filter & TIPC_SUB_PORTS && filter & TIPC_SUB_SERVICE) || (tipc_sub_read(s, seq.lower) > tipc_sub_read(s, seq.upper))) { pr_warn("Subscription rejected, illegal request\n"); @@ -157,12 +145,11 @@ struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, pr_warn("Subscription rejected, no memory\n"); return NULL; } - sub->server = srv; + sub->net = net; sub->conid = conid; sub->inactive = false; memcpy(&sub->evt.s, s, sizeof(*s)); spin_lock_init(&sub->lock); - atomic_inc(&tn->subscription_count); kref_init(&sub->kref); tipc_nametbl_subscribe(sub); timer_setup(&sub->timer, tipc_sub_timeout, 0); @@ -180,46 +167,3 @@ void tipc_sub_unsubscribe(struct tipc_subscription *sub) list_del(&sub->sub_list); tipc_sub_put(sub); } - -int tipc_topsrv_start(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - const char name[] = "topology_server"; - struct sockaddr_tipc *saddr; - struct tipc_server *topsrv; - - saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC); - if (!saddr) - return -ENOMEM; - saddr->family = AF_TIPC; - saddr->addrtype = TIPC_ADDR_NAMESEQ; - saddr->addr.nameseq.type = TIPC_TOP_SRV; - saddr->addr.nameseq.lower = TIPC_TOP_SRV; - saddr->addr.nameseq.upper = TIPC_TOP_SRV; - saddr->scope = TIPC_NODE_SCOPE; - - topsrv = kzalloc(sizeof(*topsrv), GFP_ATOMIC); - if (!topsrv) { - kfree(saddr); - return -ENOMEM; - } - topsrv->net = net; - topsrv->saddr = saddr; - topsrv->max_rcvbuf_size = sizeof(struct tipc_subscr); - - strncpy(topsrv->name, name, strlen(name) + 1); - tn->topsrv = topsrv; - atomic_set(&tn->subscription_count, 0); - - return tipc_server_start(topsrv); -} - -void tipc_topsrv_stop(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_server *topsrv = tn->topsrv; - - tipc_server_stop(topsrv); - kfree(topsrv->saddr); - kfree(topsrv); -} diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 7209328..82ba61a 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -56,7 +56,7 @@ struct tipc_conn; */ struct tipc_subscription { struct kref kref; - struct tipc_server *server; + struct net *net; struct timer_list timer; struct list_head nameseq_list; struct list_head sub_list; @@ -66,7 +66,7 @@ struct tipc_subscription { spinlock_t lock; /* serialize up/down and timer events */ }; -struct tipc_subscription *tipc_sub_subscribe(struct tipc_server *srv, +struct tipc_subscription *tipc_sub_subscribe(struct net *net, struct tipc_subscr *s, int conid); void tipc_sub_unsubscribe(struct tipc_subscription *sub); From patchwork Thu Feb 15 09:40:50 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Jon Maloy X-Patchwork-Id: 873725 X-Patchwork-Delegate: davem@davemloft.net Return-Path: X-Original-To: patchwork-incoming@ozlabs.org Delivered-To: patchwork-incoming@ozlabs.org Authentication-Results: ozlabs.org; spf=none (mailfrom) smtp.mailfrom=vger.kernel.org (client-ip=209.132.180.67; helo=vger.kernel.org; envelope-from=netdev-owner@vger.kernel.org; receiver=) Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (1024-bit key; unprotected) header.d=ericsson.com header.i=@ericsson.com header.b="WtrnLi73"; dkim-atps=neutral Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by ozlabs.org (Postfix) with ESMTP id 3zhrvm6pywz9t2f for ; Thu, 15 Feb 2018 20:44:20 +1100 (AEDT) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1755309AbeBOJoS (ORCPT ); Thu, 15 Feb 2018 04:44:18 -0500 Received: from sesbmg22.ericsson.net ([193.180.251.48]:50858 "EHLO sesbmg22.ericsson.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1755248AbeBOJoO (ORCPT ); Thu, 15 Feb 2018 04:44:14 -0500 DKIM-Signature: v=1; a=rsa-sha256; d=ericsson.com; s=mailgw201801; c=relaxed/simple; q=dns/txt; i=@ericsson.com; t=1518687849; h=From:Sender:Reply-To:Subject:Date:Message-ID:To:CC:MIME-Version:Content-Type: Content-Transfer-Encoding:Content-ID:Content-Description:Resent-Date:Resent-From: Resent-Sender:Resent-To:Resent-Cc:Resent-Message-ID:In-Reply-To:References:List-Id: List-Help:List-Unsubscribe:List-Subscribe:List-Post:List-Owner:List-Archive; bh=rJE2UgJ5w/uM+VNHTsKftrZ4T/uKy7tEx+Q25NdR+6s=; b=WtrnLi73lyVJtitOxCatqACguFrHrvp/Sie52Fvoq8o0s4AEkW9JHjadjo5dtAlp T78b0oGQ0lRNDgInpz+gZ2sk1jVbSjXHjK/YkBu/jCDTKVmJjeXhYnrEiAn+oIzi 6ZQ3CNm2QuVllOLTvOQFmGINIGIZ3sIpO886HRbJtI0=; X-AuditID: c1b4fb30-799639c000004778-d8-5a855669bf88 Received: from ESESSHC011.ericsson.se (Unknown_Domain [153.88.183.51]) by sesbmg22.ericsson.net (Symantec Mail Security) with SMTP id EB.D8.18296.966558A5; Thu, 15 Feb 2018 10:44:09 +0100 (CET) Received: from tipsy.lab.linux.ericsson.se (10.35.28.120) by ESESSHC011.ericsson.se (153.88.183.51) with Microsoft SMTP Server (TLS) id 14.3.352.0; Thu, 15 Feb 2018 10:44:08 +0100 From: Jon Maloy To: , CC: , , , , , , Subject: [net-next 09/10] tipc: separate topology server listener socket from subcsriber sockets Date: Thu, 15 Feb 2018 10:40:50 +0100 Message-ID: <1518687651-26561-10-git-send-email-jon.maloy@ericsson.com> X-Mailer: git-send-email 2.1.4 In-Reply-To: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> References: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> MIME-Version: 1.0 X-Originating-IP: [10.35.28.120] X-Brightmail-Tracker: H4sIAAAAAAAAA+NgFjrLLMWRmVeSWpSXmKPExsUyM2K7sW5mWGuUwan5hhY3GnqYLeacb2Gx ePtqFrvFsQViFlvOZ1lcaT/LbvH4+nVmB3aPLStvMnm8u8LmsXvBZyaPz5vkPNZv2coUwBrF ZZOSmpNZllqkb5fAlfF342+mgrelFQefb2VrYNwR38XIySEhYCJxePs3dhBbSOAwo8T3b5Vd jFxA9nZGiQ3btrOCJNgENCReTutgBLFFBIwlXq3sZAIpYhb4xCjRsvIZG0hCWCBB4vWS9Uwg NouAqsS9HTPApvIKuEvM+X6HGWKbnMT54z/BbE6g+KqWf6wQm92Aet8wQdQLSpyc+YQFxGYW kJA4+OIFM0SNssTcD9OYIOYoSHyb2c00gVFgFpKWWUhaFjAyrWIULU4tTspNNzLSSy3KTC4u zs/Ty0st2cQIDOSDW34b7GB8+dzxEKMAB6MSD691SGuUEGtiWXFl7iFGCQ5mJRHe23ZAId6U xMqq1KL8+KLSnNTiQ4zSHCxK4rwnPXmjhATSE0tSs1NTC1KLYLJMHJxSDYx2RyapbDb5nrvs 9x7xjzrnbWbffPG4j+eAWtOBi7Nn6TTf2Bwo1bm0eHmXrtpDDvbnf6qKOr4KsTI1/k6fn8Ny /dfeswfstU+cdwuLFJ5f/FSr6de1aQyly707z2he7T6lrm9795syE/vij0+2NItN/W3cL2t8 1Sxo4ooexaryyMgM7VSRJE4lluKMREMt5qLiRAA/biP0YAIAAA== Sender: netdev-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: netdev@vger.kernel.org We move the listener socket to struct tipc_server and give it its own work item. This makes it easier to follow the code, and entails some simplifications in the reception code in subscriber sockets. Acked-by: Ying Xue Signed-off-by: Jon Maloy --- net/tipc/server.c | 328 ++++++++++++++++++++++++------------------------------ 1 file changed, 147 insertions(+), 181 deletions(-) diff --git a/net/tipc/server.c b/net/tipc/server.c index 0abbdd6..0e351e8 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -64,7 +64,6 @@ * @tipc_conn_new: callback will be called when new connection is incoming * @tipc_conn_release: callback will be called before releasing the connection * @tipc_conn_recvmsg: callback will be called when message arrives - * @saddr: TIPC server address * @name: server name * @imp: message importance * @type: socket type @@ -74,10 +73,11 @@ struct tipc_server { spinlock_t idr_lock; /* for idr list */ int idr_in_use; struct net *net; + struct work_struct awork; struct workqueue_struct *rcv_wq; struct workqueue_struct *send_wq; int max_rcvbuf_size; - struct sockaddr_tipc *saddr; + struct socket *listener; char name[TIPC_SERVER_NAME_LEN]; }; @@ -106,7 +106,6 @@ struct tipc_conn { struct list_head sub_list; spinlock_t sub_lock; /* for subscription list */ struct work_struct rwork; - int (*rx_action) (struct tipc_conn *con); struct list_head outqueue; spinlock_t outqueue_lock; struct work_struct swork; @@ -121,12 +120,6 @@ struct outqueue_entry { static void tipc_recv_work(struct work_struct *work); static void tipc_send_work(struct work_struct *work); -static void tipc_clean_outqueues(struct tipc_conn *con); - -static struct tipc_conn *sock2con(struct sock *sk) -{ - return sk->sk_user_data; -} static bool connected(struct tipc_conn *con) { @@ -137,21 +130,21 @@ static void tipc_conn_kref_release(struct kref *kref) { struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); struct tipc_server *s = con->server; - struct socket *sock = con->sock; + struct outqueue_entry *e, *safe; - if (sock) { - if (test_bit(CF_SERVER, &con->flags)) { - __module_get(sock->ops->owner); - __module_get(sock->sk->sk_prot_creator->owner); - } - sock_release(sock); - con->sock = NULL; - } spin_lock_bh(&s->idr_lock); idr_remove(&s->conn_idr, con->conid); s->idr_in_use--; spin_unlock_bh(&s->idr_lock); - tipc_clean_outqueues(con); + if (con->sock) + sock_release(con->sock); + + spin_lock_bh(&con->outqueue_lock); + list_for_each_entry_safe(e, safe, &con->outqueue, list) { + list_del(&e->list); + kfree(e); + } + spin_unlock_bh(&con->outqueue_lock); kfree(con); } @@ -178,14 +171,14 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) } /* sock_data_ready - interrupt callback indicating the socket has data to read - * The queued job is launched in tipc_recv_from_sock() + * The queued work is launched into tipc_recv_work()->tipc_recv_from_sock() */ static void sock_data_ready(struct sock *sk) { struct tipc_conn *con; read_lock_bh(&sk->sk_callback_lock); - con = sock2con(sk); + con = sk->sk_user_data; if (connected(con)) { conn_get(con); if (!queue_work(con->server->rcv_wq, &con->rwork)) @@ -195,15 +188,15 @@ static void sock_data_ready(struct sock *sk) } /* sock_write_space - interrupt callback after a sendmsg EAGAIN - * Indicates that there now is more is space in the send buffer - * The queued job is launched in tipc_send_to_sock() + * Indicates that there now is more space in the send buffer + * The queued work is launched into tipc_send_work()->tipc_send_to_sock() */ static void sock_write_space(struct sock *sk) { struct tipc_conn *con; read_lock_bh(&sk->sk_callback_lock); - con = sock2con(sk); + con = sk->sk_user_data; if (connected(con)) { conn_get(con); if (!queue_work(con->server->send_wq, &con->swork)) @@ -212,23 +205,8 @@ static void sock_write_space(struct sock *sk) read_unlock_bh(&sk->sk_callback_lock); } -static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) -{ - struct sock *sk = sock->sk; - - write_lock_bh(&sk->sk_callback_lock); - - sk->sk_data_ready = sock_data_ready; - sk->sk_write_space = sock_write_space; - sk->sk_user_data = con; - - con->sock = sock; - - write_unlock_bh(&sk->sk_callback_lock); -} - /* tipc_con_delete_sub - delete a specific or all subscriptions - * for a given subscriber + * for a given subscriber */ static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) { @@ -268,6 +246,7 @@ static void tipc_close_conn(struct tipc_conn *con) /* Don't flush pending works, -just let them expire */ kernel_sock_shutdown(con->sock, SHUT_RDWR); + conn_put(con); } @@ -357,117 +336,8 @@ static int tipc_receive_from_sock(struct tipc_conn *con) return ret; } -static int tipc_accept_from_sock(struct tipc_conn *con) -{ - struct socket *sock = con->sock; - struct socket *newsock; - struct tipc_conn *newcon; - int ret; - - ret = kernel_accept(sock, &newsock, O_NONBLOCK); - if (ret < 0) - return ret; - - newcon = tipc_alloc_conn(con->server); - if (IS_ERR(newcon)) { - ret = PTR_ERR(newcon); - sock_release(newsock); - return ret; - } - - newcon->rx_action = tipc_receive_from_sock; - tipc_register_callbacks(newsock, newcon); - - /* Wake up receive process in case of 'SYN+' message */ - newsock->sk->sk_data_ready(newsock->sk); - return ret; -} - -static struct socket *tipc_create_listen_sock(struct tipc_conn *con) -{ - struct tipc_server *s = con->server; - struct socket *sock = NULL; - int imp = TIPC_CRITICAL_IMPORTANCE; - int ret; - - ret = sock_create_kern(s->net, AF_TIPC, SOCK_SEQPACKET, 0, &sock); - if (ret < 0) - return NULL; - ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE, - (char *)&imp, sizeof(imp)); - if (ret < 0) - goto create_err; - ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr)); - if (ret < 0) - goto create_err; - - con->rx_action = tipc_accept_from_sock; - ret = kernel_listen(sock, 0); - if (ret < 0) - goto create_err; - - /* As server's listening socket owner and creator is the same module, - * we have to decrease TIPC module reference count to guarantee that - * it remains zero after the server socket is created, otherwise, - * executing "rmmod" command is unable to make TIPC module deleted - * after TIPC module is inserted successfully. - * - * However, the reference count is ever increased twice in - * sock_create_kern(): one is to increase the reference count of owner - * of TIPC socket's proto_ops struct; another is to increment the - * reference count of owner of TIPC proto struct. Therefore, we must - * decrement the module reference count twice to ensure that it keeps - * zero after server's listening socket is created. Of course, we - * must bump the module reference count twice as well before the socket - * is closed. - */ - module_put(sock->ops->owner); - module_put(sock->sk->sk_prot_creator->owner); - set_bit(CF_SERVER, &con->flags); - - return sock; - -create_err: - kernel_sock_shutdown(sock, SHUT_RDWR); - sock_release(sock); - return NULL; -} - -static int tipc_open_listening_sock(struct tipc_server *s) -{ - struct socket *sock; - struct tipc_conn *con; - - con = tipc_alloc_conn(s); - if (IS_ERR(con)) - return PTR_ERR(con); - - sock = tipc_create_listen_sock(con); - if (!sock) { - idr_remove(&s->conn_idr, con->conid); - s->idr_in_use--; - kfree(con); - return -EINVAL; - } - - tipc_register_callbacks(sock, con); - return 0; -} - -static void tipc_clean_outqueues(struct tipc_conn *con) -{ - struct outqueue_entry *e, *safe; - - spin_lock_bh(&con->outqueue_lock); - list_for_each_entry_safe(e, safe, &con->outqueue, list) { - list_del(&e->list); - kfree(e); - } - spin_unlock_bh(&con->outqueue_lock); -} - -/* tipc_conn_queue_evt - interrupt level call from a subscription instance - * The queued job is launched in tipc_send_to_sock() +/* tipc_conn_queue_evt() - interrupt level call from a subscription instance + * The queued work is launched into tipc_send_work()->tipc_send_to_sock() */ void tipc_conn_queue_evt(struct net *net, int conid, u32 event, struct tipc_event *evt) @@ -588,9 +458,9 @@ static void tipc_send_to_sock(struct tipc_conn *con) 1, sizeof(*evt)); if (ret == -EWOULDBLOCK || ret == 0) { cond_resched(); - goto out; + return; } else if (ret < 0) { - goto err; + return tipc_close_conn(con); } } else { tipc_send_kern_top_evt(srv->net, evt); @@ -606,10 +476,6 @@ static void tipc_send_to_sock(struct tipc_conn *con) kfree(e); } spin_unlock_bh(&con->outqueue_lock); -out: - return; -err: - tipc_close_conn(con); } static void tipc_recv_work(struct work_struct *work) @@ -618,7 +484,7 @@ static void tipc_recv_work(struct work_struct *work) int count = 0; while (connected(con)) { - if (con->rx_action(con)) + if (tipc_receive_from_sock(con)) break; /* Don't flood Rx machine */ @@ -640,10 +506,113 @@ static void tipc_send_work(struct work_struct *work) conn_put(con); } -static void tipc_work_stop(struct tipc_server *s) +static void tipc_accept_from_sock(struct work_struct *work) { - destroy_workqueue(s->rcv_wq); - destroy_workqueue(s->send_wq); + struct tipc_server *srv = container_of(work, struct tipc_server, awork); + struct socket *lsock = srv->listener; + struct socket *newsock; + struct tipc_conn *con; + struct sock *newsk; + int ret; + + while (1) { + ret = kernel_accept(lsock, &newsock, O_NONBLOCK); + if (ret < 0) + return; + con = tipc_alloc_conn(srv); + if (IS_ERR(con)) { + ret = PTR_ERR(con); + sock_release(newsock); + return; + } + /* Register callbacks */ + newsk = newsock->sk; + write_lock_bh(&newsk->sk_callback_lock); + newsk->sk_data_ready = sock_data_ready; + newsk->sk_write_space = sock_write_space; + newsk->sk_user_data = con; + con->sock = newsock; + write_unlock_bh(&newsk->sk_callback_lock); + + /* Wake up receive process in case of 'SYN+' message */ + newsk->sk_data_ready(newsk); + } +} + +/* listener_sock_data_ready - interrupt callback indicating new connection + * The queued job is launched into tipc_accept_from_sock() + */ +static void listener_sock_data_ready(struct sock *sk) +{ + struct tipc_server *srv; + + read_lock_bh(&sk->sk_callback_lock); + srv = sk->sk_user_data; + if (srv->listener) + queue_work(srv->rcv_wq, &srv->awork); + read_unlock_bh(&sk->sk_callback_lock); +} + +static int tipc_create_listener_sock(struct tipc_server *srv) +{ + int imp = TIPC_CRITICAL_IMPORTANCE; + struct socket *lsock = NULL; + struct sockaddr_tipc saddr; + struct sock *sk; + int rc; + + rc = sock_create_kern(srv->net, AF_TIPC, SOCK_SEQPACKET, 0, &lsock); + if (rc < 0) + return rc; + + srv->listener = lsock; + sk = lsock->sk; + write_lock_bh(&sk->sk_callback_lock); + sk->sk_data_ready = listener_sock_data_ready; + sk->sk_user_data = srv; + write_unlock_bh(&sk->sk_callback_lock); + + rc = kernel_setsockopt(lsock, SOL_TIPC, TIPC_IMPORTANCE, + (char *)&imp, sizeof(imp)); + if (rc < 0) + goto err; + + saddr.family = AF_TIPC; + saddr.addrtype = TIPC_ADDR_NAMESEQ; + saddr.addr.nameseq.type = TIPC_TOP_SRV; + saddr.addr.nameseq.lower = TIPC_TOP_SRV; + saddr.addr.nameseq.upper = TIPC_TOP_SRV; + saddr.scope = TIPC_NODE_SCOPE; + + rc = kernel_bind(lsock, (struct sockaddr *)&saddr, sizeof(saddr)); + if (rc < 0) + goto err; + rc = kernel_listen(lsock, 0); + if (rc < 0) + goto err; + + /* As server's listening socket owner and creator is the same module, + * we have to decrease TIPC module reference count to guarantee that + * it remains zero after the server socket is created, otherwise, + * executing "rmmod" command is unable to make TIPC module deleted + * after TIPC module is inserted successfully. + * + * However, the reference count is ever increased twice in + * sock_create_kern(): one is to increase the reference count of owner + * of TIPC socket's proto_ops struct; another is to increment the + * reference count of owner of TIPC proto struct. Therefore, we must + * decrement the module reference count twice to ensure that it keeps + * zero after server's listening socket is created. Of course, we + * must bump the module reference count twice as well before the socket + * is closed. + */ + module_put(lsock->ops->owner); + module_put(sk->sk_prot_creator->owner); + + return 0; +err: + sock_release(lsock); + return -EINVAL; } static int tipc_work_start(struct tipc_server *s) @@ -664,32 +633,26 @@ static int tipc_work_start(struct tipc_server *s) return 0; } +static void tipc_work_stop(struct tipc_server *s) +{ + destroy_workqueue(s->rcv_wq); + destroy_workqueue(s->send_wq); +} + int tipc_topsrv_start(struct net *net) { struct tipc_net *tn = tipc_net(net); const char name[] = "topology_server"; - struct sockaddr_tipc *saddr; struct tipc_server *srv; int ret; - saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC); - if (!saddr) - return -ENOMEM; - saddr->family = AF_TIPC; - saddr->addrtype = TIPC_ADDR_NAMESEQ; - saddr->addr.nameseq.type = TIPC_TOP_SRV; - saddr->addr.nameseq.lower = TIPC_TOP_SRV; - saddr->addr.nameseq.upper = TIPC_TOP_SRV; - saddr->scope = TIPC_NODE_SCOPE; - srv = kzalloc(sizeof(*srv), GFP_ATOMIC); - if (!srv) { - kfree(saddr); + if (!srv) return -ENOMEM; - } - srv->net = net; - srv->saddr = saddr; - srv->max_rcvbuf_size = sizeof(struct tipc_subscr); + + srv->net = net; + srv->max_rcvbuf_size = sizeof(struct tipc_subscr); + INIT_WORK(&srv->awork, tipc_accept_from_sock); strncpy(srv->name, name, strlen(name) + 1); tn->topsrv = srv; @@ -703,7 +666,7 @@ int tipc_topsrv_start(struct net *net) if (ret < 0) return ret; - ret = tipc_open_listening_sock(srv); + ret = tipc_create_listener_sock(srv); if (ret < 0) tipc_work_stop(srv); @@ -713,6 +676,7 @@ int tipc_topsrv_start(struct net *net) void tipc_topsrv_stop(struct net *net) { struct tipc_server *srv = tipc_topsrv(net); + struct socket *lsock = srv->listener; struct tipc_conn *con; int id; @@ -725,10 +689,12 @@ void tipc_topsrv_stop(struct net *net) spin_lock_bh(&srv->idr_lock); } } + __module_get(lsock->ops->owner); + __module_get(lsock->sk->sk_prot_creator->owner); + sock_release(lsock); + srv->listener = NULL; spin_unlock_bh(&srv->idr_lock); - tipc_work_stop(srv); idr_destroy(&srv->conn_idr); - kfree(srv->saddr); kfree(srv); } From patchwork Thu Feb 15 09:40:51 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Jon Maloy X-Patchwork-Id: 873726 X-Patchwork-Delegate: davem@davemloft.net Return-Path: X-Original-To: patchwork-incoming@ozlabs.org Delivered-To: patchwork-incoming@ozlabs.org Authentication-Results: ozlabs.org; spf=none (mailfrom) smtp.mailfrom=vger.kernel.org (client-ip=209.132.180.67; helo=vger.kernel.org; envelope-from=netdev-owner@vger.kernel.org; receiver=) Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (1024-bit key; unprotected) header.d=ericsson.com header.i=@ericsson.com header.b="CMCwKkjb"; dkim-atps=neutral Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by ozlabs.org (Postfix) with ESMTP id 3zhrvy0jzTz9t2f for ; Thu, 15 Feb 2018 20:44:30 +1100 (AEDT) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1755317AbeBOJoZ (ORCPT ); Thu, 15 Feb 2018 04:44:25 -0500 Received: from sesbmg22.ericsson.net ([193.180.251.48]:50858 "EHLO sesbmg22.ericsson.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1755186AbeBOJoQ (ORCPT ); Thu, 15 Feb 2018 04:44:16 -0500 DKIM-Signature: v=1; a=rsa-sha256; d=ericsson.com; s=mailgw201801; c=relaxed/simple; q=dns/txt; i=@ericsson.com; t=1518687850; h=From:Sender:Reply-To:Subject:Date:Message-ID:To:CC:MIME-Version:Content-Type: Content-Transfer-Encoding:Content-ID:Content-Description:Resent-Date:Resent-From: Resent-Sender:Resent-To:Resent-Cc:Resent-Message-ID:In-Reply-To:References:List-Id: List-Help:List-Unsubscribe:List-Subscribe:List-Post:List-Owner:List-Archive; bh=BVQkqONkkpBdQXXB8RrF6hQkIY96lTuy0Jg8grEBnRw=; b=CMCwKkjbG2Qd66DHv2VkV0BVs9+3ndvY/QfwK+p9QpZjrAPkpwzhzJRPZuqkH6z0 K8grfcehzpUxmc8uPv2+NP2v2p57Ecldexe67xe7xVFvbKh/tzFyl/erE4+jr3kg 6vxlIlQkd9sBDNyT105wM0+Xso5cag7w2lxdKjVVnco=; X-AuditID: c1b4fb30-799639c000004778-de-5a85566acf5f Received: from ESESSHC011.ericsson.se (Unknown_Domain [153.88.183.51]) by sesbmg22.ericsson.net (Symantec Mail Security) with SMTP id BD.D8.18296.A66558A5; Thu, 15 Feb 2018 10:44:10 +0100 (CET) Received: from tipsy.lab.linux.ericsson.se (10.35.28.120) by ESESSHC011.ericsson.se (153.88.183.51) with Microsoft SMTP Server (TLS) id 14.3.352.0; Thu, 15 Feb 2018 10:44:08 +0100 From: Jon Maloy To: , CC: , , , , , , Subject: [net-next 10/10] tipc: rename tipc_server to tipc_topsrv Date: Thu, 15 Feb 2018 10:40:51 +0100 Message-ID: <1518687651-26561-11-git-send-email-jon.maloy@ericsson.com> X-Mailer: git-send-email 2.1.4 In-Reply-To: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> References: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com> MIME-Version: 1.0 X-Originating-IP: [10.35.28.120] X-Brightmail-Tracker: H4sIAAAAAAAAA+NgFrrALMWRmVeSWpSXmKPExsUyM2K7sW5WWGuUwaHfghY3GnqYLeacb2Gx ePtqFrvFsQViFlvOZ1lcaT/LbvH4+nVmB3aPLStvMnm8u8LmsXvBZyaPz5vkPNZv2coUwBrF ZZOSmpNZllqkb5fAlXFy7zT2gk//mSreLf3O1sB4fidTFyMnh4SAicSK1pNsXYxcHEIChxkl WvfsYoZwtjNKbNi2nRWkik1AQ+LltA5GEFtEwFji1cpOJpAiZoFPjBItK5+xgSSEBRwl/vx+ DmazCKhK7Dr4hgXE5hVwl/h/6QEjxDo5ifPHfzKD2JxA8VUt/8AWCAm4Sbxe8oYJol5Q4uTM J2C9zAISEgdfvGCGqFGWmPthGtTZChLfZnYzTWAUmIWkZRaSlgWMTKsYRYtTi5Ny042M9FKL MpOLi/Pz9PJSSzYxAsP54JbfBjsYXz53PMQowMGoxMNrHdIaJcSaWFZcmXuIUYKDWUmE97Yd UIg3JbGyKrUoP76oNCe1+BCjNAeLkjjvSU/eKCGB9MSS1OzU1ILUIpgsEwenVAPjkrJFO9zq 5v+9afJv510r0Zta2x065q8T/9MqprWpwtfv8Ay/ctt69gSrzvVbPr5ckeil5Lpp+98uL0fp o6Znd0w/+fPnKc657V1Z0i+ONG2Xt99yrP691KK4w30xc8OnXb73M/hC0Onbp5vs27g3BxUK za35q+n5bdI9L4cVK9u8hB58FOKaocRSnJFoqMVcVJwIAP5BZFJjAgAA Sender: netdev-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: netdev@vger.kernel.org We rename struct tipc_server to struct tipc_topsrv. This reflect its now specialized role as topology server. Accoringly, we change or add function prefixes to make it clearer which functionality those belong to. There are no functional changes in this commit. Acked-by: Ying.Xue Signed-off-by: Jon Maloy --- net/tipc/Makefile | 2 +- net/tipc/core.h | 6 +- net/tipc/group.c | 2 +- net/tipc/server.c | 700 ----------------------------------------------------- net/tipc/server.h | 57 ----- net/tipc/subscr.c | 2 +- net/tipc/subscr.h | 2 +- net/tipc/topsrv.c | 702 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ net/tipc/topsrv.h | 54 +++++ 9 files changed, 763 insertions(+), 764 deletions(-) delete mode 100644 net/tipc/server.c delete mode 100644 net/tipc/server.h create mode 100644 net/tipc/topsrv.c create mode 100644 net/tipc/topsrv.h diff --git a/net/tipc/Makefile b/net/tipc/Makefile index 37bb0bf..1edb719 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -9,7 +9,7 @@ tipc-y += addr.o bcast.o bearer.o \ core.o link.o discover.o msg.o \ name_distr.o subscr.o monitor.o name_table.o net.o \ netlink.o netlink_compat.o node.o socket.o eth_media.o \ - server.o socket.o group.o + topsrv.o socket.o group.o tipc-$(CONFIG_TIPC_MEDIA_UDP) += udp_media.o tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o diff --git a/net/tipc/core.h b/net/tipc/core.h index 20b21af..ff8b071 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -64,7 +64,7 @@ struct tipc_bearer; struct tipc_bc_base; struct tipc_link; struct tipc_name_table; -struct tipc_server; +struct tipc_topsrv; struct tipc_monitor; #define TIPC_MOD_VER "2.0.0" @@ -112,7 +112,7 @@ struct tipc_net { struct list_head dist_queue; /* Topology subscription server */ - struct tipc_server *topsrv; + struct tipc_topsrv *topsrv; atomic_t subscription_count; }; @@ -131,7 +131,7 @@ static inline struct list_head *tipc_nodes(struct net *net) return &tipc_net(net)->node_list; } -static inline struct tipc_server *tipc_topsrv(struct net *net) +static inline struct tipc_topsrv *tipc_topsrv(struct net *net) { return tipc_net(net)->topsrv; } diff --git a/net/tipc/group.c b/net/tipc/group.c index 122162a..03086cc 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -37,7 +37,7 @@ #include "addr.h" #include "group.h" #include "bcast.h" -#include "server.h" +#include "topsrv.h" #include "msg.h" #include "socket.h" #include "node.h" diff --git a/net/tipc/server.c b/net/tipc/server.c deleted file mode 100644 index 0e351e8..0000000 --- a/net/tipc/server.c +++ /dev/null @@ -1,700 +0,0 @@ -/* - * net/tipc/server.c: TIPC server infrastructure - * - * Copyright (c) 2012-2013, Wind River Systems - * Copyright (c) 2017, Ericsson AB - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "subscr.h" -#include "server.h" -#include "core.h" -#include "socket.h" -#include "addr.h" -#include "msg.h" -#include -#include - -/* Number of messages to send before rescheduling */ -#define MAX_SEND_MSG_COUNT 25 -#define MAX_RECV_MSG_COUNT 25 -#define CF_CONNECTED 1 -#define CF_SERVER 2 - -#define TIPC_SERVER_NAME_LEN 32 - -/** - * struct tipc_server - TIPC server structure - * @conn_idr: identifier set of connection - * @idr_lock: protect the connection identifier set - * @idr_in_use: amount of allocated identifier entry - * @net: network namspace instance - * @rcvbuf_cache: memory cache of server receive buffer - * @rcv_wq: receive workqueue - * @send_wq: send workqueue - * @max_rcvbuf_size: maximum permitted receive message length - * @tipc_conn_new: callback will be called when new connection is incoming - * @tipc_conn_release: callback will be called before releasing the connection - * @tipc_conn_recvmsg: callback will be called when message arrives - * @name: server name - * @imp: message importance - * @type: socket type - */ -struct tipc_server { - struct idr conn_idr; - spinlock_t idr_lock; /* for idr list */ - int idr_in_use; - struct net *net; - struct work_struct awork; - struct workqueue_struct *rcv_wq; - struct workqueue_struct *send_wq; - int max_rcvbuf_size; - struct socket *listener; - char name[TIPC_SERVER_NAME_LEN]; -}; - -/** - * struct tipc_conn - TIPC connection structure - * @kref: reference counter to connection object - * @conid: connection identifier - * @sock: socket handler associated with connection - * @flags: indicates connection state - * @server: pointer to connected server - * @sub_list: lsit to all pertaing subscriptions - * @sub_lock: lock protecting the subscription list - * @outqueue_lock: control access to the outqueue - * @rwork: receive work item - * @rx_action: what to do when connection socket is active - * @outqueue: pointer to first outbound message in queue - * @outqueue_lock: control access to the outqueue - * @swork: send work item - */ -struct tipc_conn { - struct kref kref; - int conid; - struct socket *sock; - unsigned long flags; - struct tipc_server *server; - struct list_head sub_list; - spinlock_t sub_lock; /* for subscription list */ - struct work_struct rwork; - struct list_head outqueue; - spinlock_t outqueue_lock; - struct work_struct swork; -}; - -/* An entry waiting to be sent */ -struct outqueue_entry { - bool inactive; - struct tipc_event evt; - struct list_head list; -}; - -static void tipc_recv_work(struct work_struct *work); -static void tipc_send_work(struct work_struct *work); - -static bool connected(struct tipc_conn *con) -{ - return con && test_bit(CF_CONNECTED, &con->flags); -} - -static void tipc_conn_kref_release(struct kref *kref) -{ - struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); - struct tipc_server *s = con->server; - struct outqueue_entry *e, *safe; - - spin_lock_bh(&s->idr_lock); - idr_remove(&s->conn_idr, con->conid); - s->idr_in_use--; - spin_unlock_bh(&s->idr_lock); - if (con->sock) - sock_release(con->sock); - - spin_lock_bh(&con->outqueue_lock); - list_for_each_entry_safe(e, safe, &con->outqueue, list) { - list_del(&e->list); - kfree(e); - } - spin_unlock_bh(&con->outqueue_lock); - kfree(con); -} - -static void conn_put(struct tipc_conn *con) -{ - kref_put(&con->kref, tipc_conn_kref_release); -} - -static void conn_get(struct tipc_conn *con) -{ - kref_get(&con->kref); -} - -static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) -{ - struct tipc_conn *con; - - spin_lock_bh(&s->idr_lock); - con = idr_find(&s->conn_idr, conid); - if (!connected(con) || !kref_get_unless_zero(&con->kref)) - con = NULL; - spin_unlock_bh(&s->idr_lock); - return con; -} - -/* sock_data_ready - interrupt callback indicating the socket has data to read - * The queued work is launched into tipc_recv_work()->tipc_recv_from_sock() - */ -static void sock_data_ready(struct sock *sk) -{ - struct tipc_conn *con; - - read_lock_bh(&sk->sk_callback_lock); - con = sk->sk_user_data; - if (connected(con)) { - conn_get(con); - if (!queue_work(con->server->rcv_wq, &con->rwork)) - conn_put(con); - } - read_unlock_bh(&sk->sk_callback_lock); -} - -/* sock_write_space - interrupt callback after a sendmsg EAGAIN - * Indicates that there now is more space in the send buffer - * The queued work is launched into tipc_send_work()->tipc_send_to_sock() - */ -static void sock_write_space(struct sock *sk) -{ - struct tipc_conn *con; - - read_lock_bh(&sk->sk_callback_lock); - con = sk->sk_user_data; - if (connected(con)) { - conn_get(con); - if (!queue_work(con->server->send_wq, &con->swork)) - conn_put(con); - } - read_unlock_bh(&sk->sk_callback_lock); -} - -/* tipc_con_delete_sub - delete a specific or all subscriptions - * for a given subscriber - */ -static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) -{ - struct list_head *sub_list = &con->sub_list; - struct tipc_net *tn = tipc_net(con->server->net); - struct tipc_subscription *sub, *tmp; - - spin_lock_bh(&con->sub_lock); - list_for_each_entry_safe(sub, tmp, sub_list, sub_list) { - if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) { - tipc_sub_unsubscribe(sub); - atomic_dec(&tn->subscription_count); - } else if (s) { - break; - } - } - spin_unlock_bh(&con->sub_lock); -} - -static void tipc_close_conn(struct tipc_conn *con) -{ - struct sock *sk = con->sock->sk; - bool disconnect = false; - - write_lock_bh(&sk->sk_callback_lock); - disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags); - - if (disconnect) { - sk->sk_user_data = NULL; - tipc_con_delete_sub(con, NULL); - } - write_unlock_bh(&sk->sk_callback_lock); - - /* Handle concurrent calls from sending and receiving threads */ - if (!disconnect) - return; - - /* Don't flush pending works, -just let them expire */ - kernel_sock_shutdown(con->sock, SHUT_RDWR); - - conn_put(con); -} - -static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) -{ - struct tipc_conn *con; - int ret; - - con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC); - if (!con) - return ERR_PTR(-ENOMEM); - - kref_init(&con->kref); - INIT_LIST_HEAD(&con->outqueue); - INIT_LIST_HEAD(&con->sub_list); - spin_lock_init(&con->outqueue_lock); - spin_lock_init(&con->sub_lock); - INIT_WORK(&con->swork, tipc_send_work); - INIT_WORK(&con->rwork, tipc_recv_work); - - spin_lock_bh(&s->idr_lock); - ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC); - if (ret < 0) { - kfree(con); - spin_unlock_bh(&s->idr_lock); - return ERR_PTR(-ENOMEM); - } - con->conid = ret; - s->idr_in_use++; - spin_unlock_bh(&s->idr_lock); - - set_bit(CF_CONNECTED, &con->flags); - con->server = s; - - return con; -} - -static int tipc_con_rcv_sub(struct tipc_server *srv, - struct tipc_conn *con, - struct tipc_subscr *s) -{ - struct tipc_net *tn = tipc_net(srv->net); - struct tipc_subscription *sub; - - if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) { - tipc_con_delete_sub(con, s); - return 0; - } - if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) { - pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR); - return -1; - } - sub = tipc_sub_subscribe(srv->net, s, con->conid); - if (!sub) - return -1; - atomic_inc(&tn->subscription_count); - spin_lock_bh(&con->sub_lock); - list_add(&sub->sub_list, &con->sub_list); - spin_unlock_bh(&con->sub_lock); - return 0; -} - -static int tipc_receive_from_sock(struct tipc_conn *con) -{ - struct tipc_server *srv = con->server; - struct sock *sk = con->sock->sk; - struct msghdr msg = {}; - struct tipc_subscr s; - struct kvec iov; - int ret; - - iov.iov_base = &s; - iov.iov_len = sizeof(s); - msg.msg_name = NULL; - iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len); - ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT); - if (ret == -EWOULDBLOCK) - return -EWOULDBLOCK; - if (ret > 0) { - read_lock_bh(&sk->sk_callback_lock); - ret = tipc_con_rcv_sub(srv, con, &s); - read_unlock_bh(&sk->sk_callback_lock); - } - if (ret < 0) - tipc_close_conn(con); - - return ret; -} - -/* tipc_conn_queue_evt() - interrupt level call from a subscription instance - * The queued work is launched into tipc_send_work()->tipc_send_to_sock() - */ -void tipc_conn_queue_evt(struct net *net, int conid, - u32 event, struct tipc_event *evt) -{ - struct tipc_server *srv = tipc_topsrv(net); - struct outqueue_entry *e; - struct tipc_conn *con; - - con = tipc_conn_lookup(srv, conid); - if (!con) - return; - - if (!connected(con)) - goto err; - - e = kmalloc(sizeof(*e), GFP_ATOMIC); - if (!e) - goto err; - e->inactive = (event == TIPC_SUBSCR_TIMEOUT); - memcpy(&e->evt, evt, sizeof(*evt)); - spin_lock_bh(&con->outqueue_lock); - list_add_tail(&e->list, &con->outqueue); - spin_unlock_bh(&con->outqueue_lock); - - if (queue_work(srv->send_wq, &con->swork)) - return; -err: - conn_put(con); -} - -bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, - u32 upper, u32 filter, int *conid) -{ - struct tipc_subscr sub; - struct tipc_conn *con; - int rc; - - sub.seq.type = type; - sub.seq.lower = lower; - sub.seq.upper = upper; - sub.timeout = TIPC_WAIT_FOREVER; - sub.filter = filter; - *(u32 *)&sub.usr_handle = port; - - con = tipc_alloc_conn(tipc_topsrv(net)); - if (IS_ERR(con)) - return false; - - *conid = con->conid; - con->sock = NULL; - rc = tipc_con_rcv_sub(tipc_topsrv(net), con, &sub); - if (rc < 0) - tipc_close_conn(con); - return !rc; -} - -void tipc_topsrv_kern_unsubscr(struct net *net, int conid) -{ - struct tipc_conn *con; - - con = tipc_conn_lookup(tipc_topsrv(net), conid); - if (!con) - return; - - test_and_clear_bit(CF_CONNECTED, &con->flags); - tipc_con_delete_sub(con, NULL); - conn_put(con); - conn_put(con); -} - -static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt) -{ - u32 port = *(u32 *)&evt->s.usr_handle; - u32 self = tipc_own_addr(net); - struct sk_buff_head evtq; - struct sk_buff *skb; - - skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt), - self, self, port, port, 0); - if (!skb) - return; - msg_set_dest_droppable(buf_msg(skb), true); - memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt)); - skb_queue_head_init(&evtq); - __skb_queue_tail(&evtq, skb); - tipc_sk_rcv(net, &evtq); -} - -static void tipc_send_to_sock(struct tipc_conn *con) -{ - struct list_head *queue = &con->outqueue; - struct tipc_server *srv = con->server; - struct outqueue_entry *e; - struct tipc_event *evt; - struct msghdr msg; - struct kvec iov; - int count = 0; - int ret; - - spin_lock_bh(&con->outqueue_lock); - - while (!list_empty(queue)) { - e = list_first_entry(queue, struct outqueue_entry, list); - evt = &e->evt; - spin_unlock_bh(&con->outqueue_lock); - - if (e->inactive) - tipc_con_delete_sub(con, &evt->s); - - memset(&msg, 0, sizeof(msg)); - msg.msg_flags = MSG_DONTWAIT; - iov.iov_base = evt; - iov.iov_len = sizeof(*evt); - msg.msg_name = NULL; - - if (con->sock) { - ret = kernel_sendmsg(con->sock, &msg, &iov, - 1, sizeof(*evt)); - if (ret == -EWOULDBLOCK || ret == 0) { - cond_resched(); - return; - } else if (ret < 0) { - return tipc_close_conn(con); - } - } else { - tipc_send_kern_top_evt(srv->net, evt); - } - - /* Don't starve users filling buffers */ - if (++count >= MAX_SEND_MSG_COUNT) { - cond_resched(); - count = 0; - } - spin_lock_bh(&con->outqueue_lock); - list_del(&e->list); - kfree(e); - } - spin_unlock_bh(&con->outqueue_lock); -} - -static void tipc_recv_work(struct work_struct *work) -{ - struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); - int count = 0; - - while (connected(con)) { - if (tipc_receive_from_sock(con)) - break; - - /* Don't flood Rx machine */ - if (++count >= MAX_RECV_MSG_COUNT) { - cond_resched(); - count = 0; - } - } - conn_put(con); -} - -static void tipc_send_work(struct work_struct *work) -{ - struct tipc_conn *con = container_of(work, struct tipc_conn, swork); - - if (connected(con)) - tipc_send_to_sock(con); - - conn_put(con); -} - -static void tipc_accept_from_sock(struct work_struct *work) -{ - struct tipc_server *srv = container_of(work, struct tipc_server, awork); - struct socket *lsock = srv->listener; - struct socket *newsock; - struct tipc_conn *con; - struct sock *newsk; - int ret; - - while (1) { - ret = kernel_accept(lsock, &newsock, O_NONBLOCK); - if (ret < 0) - return; - con = tipc_alloc_conn(srv); - if (IS_ERR(con)) { - ret = PTR_ERR(con); - sock_release(newsock); - return; - } - /* Register callbacks */ - newsk = newsock->sk; - write_lock_bh(&newsk->sk_callback_lock); - newsk->sk_data_ready = sock_data_ready; - newsk->sk_write_space = sock_write_space; - newsk->sk_user_data = con; - con->sock = newsock; - write_unlock_bh(&newsk->sk_callback_lock); - - /* Wake up receive process in case of 'SYN+' message */ - newsk->sk_data_ready(newsk); - } -} - -/* listener_sock_data_ready - interrupt callback indicating new connection - * The queued job is launched into tipc_accept_from_sock() - */ -static void listener_sock_data_ready(struct sock *sk) -{ - struct tipc_server *srv; - - read_lock_bh(&sk->sk_callback_lock); - srv = sk->sk_user_data; - if (srv->listener) - queue_work(srv->rcv_wq, &srv->awork); - read_unlock_bh(&sk->sk_callback_lock); -} - -static int tipc_create_listener_sock(struct tipc_server *srv) -{ - int imp = TIPC_CRITICAL_IMPORTANCE; - struct socket *lsock = NULL; - struct sockaddr_tipc saddr; - struct sock *sk; - int rc; - - rc = sock_create_kern(srv->net, AF_TIPC, SOCK_SEQPACKET, 0, &lsock); - if (rc < 0) - return rc; - - srv->listener = lsock; - sk = lsock->sk; - write_lock_bh(&sk->sk_callback_lock); - sk->sk_data_ready = listener_sock_data_ready; - sk->sk_user_data = srv; - write_unlock_bh(&sk->sk_callback_lock); - - rc = kernel_setsockopt(lsock, SOL_TIPC, TIPC_IMPORTANCE, - (char *)&imp, sizeof(imp)); - if (rc < 0) - goto err; - - saddr.family = AF_TIPC; - saddr.addrtype = TIPC_ADDR_NAMESEQ; - saddr.addr.nameseq.type = TIPC_TOP_SRV; - saddr.addr.nameseq.lower = TIPC_TOP_SRV; - saddr.addr.nameseq.upper = TIPC_TOP_SRV; - saddr.scope = TIPC_NODE_SCOPE; - - rc = kernel_bind(lsock, (struct sockaddr *)&saddr, sizeof(saddr)); - if (rc < 0) - goto err; - rc = kernel_listen(lsock, 0); - if (rc < 0) - goto err; - - /* As server's listening socket owner and creator is the same module, - * we have to decrease TIPC module reference count to guarantee that - * it remains zero after the server socket is created, otherwise, - * executing "rmmod" command is unable to make TIPC module deleted - * after TIPC module is inserted successfully. - * - * However, the reference count is ever increased twice in - * sock_create_kern(): one is to increase the reference count of owner - * of TIPC socket's proto_ops struct; another is to increment the - * reference count of owner of TIPC proto struct. Therefore, we must - * decrement the module reference count twice to ensure that it keeps - * zero after server's listening socket is created. Of course, we - * must bump the module reference count twice as well before the socket - * is closed. - */ - module_put(lsock->ops->owner); - module_put(sk->sk_prot_creator->owner); - - return 0; -err: - sock_release(lsock); - return -EINVAL; -} - -static int tipc_work_start(struct tipc_server *s) -{ - s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0); - if (!s->rcv_wq) { - pr_err("can't start tipc receive workqueue\n"); - return -ENOMEM; - } - - s->send_wq = alloc_ordered_workqueue("tipc_send", 0); - if (!s->send_wq) { - pr_err("can't start tipc send workqueue\n"); - destroy_workqueue(s->rcv_wq); - return -ENOMEM; - } - - return 0; -} - -static void tipc_work_stop(struct tipc_server *s) -{ - destroy_workqueue(s->rcv_wq); - destroy_workqueue(s->send_wq); -} - -int tipc_topsrv_start(struct net *net) -{ - struct tipc_net *tn = tipc_net(net); - const char name[] = "topology_server"; - struct tipc_server *srv; - int ret; - - srv = kzalloc(sizeof(*srv), GFP_ATOMIC); - if (!srv) - return -ENOMEM; - - srv->net = net; - srv->max_rcvbuf_size = sizeof(struct tipc_subscr); - INIT_WORK(&srv->awork, tipc_accept_from_sock); - - strncpy(srv->name, name, strlen(name) + 1); - tn->topsrv = srv; - atomic_set(&tn->subscription_count, 0); - - spin_lock_init(&srv->idr_lock); - idr_init(&srv->conn_idr); - srv->idr_in_use = 0; - - ret = tipc_work_start(srv); - if (ret < 0) - return ret; - - ret = tipc_create_listener_sock(srv); - if (ret < 0) - tipc_work_stop(srv); - - return ret; -} - -void tipc_topsrv_stop(struct net *net) -{ - struct tipc_server *srv = tipc_topsrv(net); - struct socket *lsock = srv->listener; - struct tipc_conn *con; - int id; - - spin_lock_bh(&srv->idr_lock); - for (id = 0; srv->idr_in_use; id++) { - con = idr_find(&srv->conn_idr, id); - if (con) { - spin_unlock_bh(&srv->idr_lock); - tipc_close_conn(con); - spin_lock_bh(&srv->idr_lock); - } - } - __module_get(lsock->ops->owner); - __module_get(lsock->sk->sk_prot_creator->owner); - sock_release(lsock); - srv->listener = NULL; - spin_unlock_bh(&srv->idr_lock); - tipc_work_stop(srv); - idr_destroy(&srv->conn_idr); - kfree(srv); -} diff --git a/net/tipc/server.h b/net/tipc/server.h deleted file mode 100644 index ce93b6d..0000000 --- a/net/tipc/server.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * net/tipc/server.h: Include file for TIPC server code - * - * Copyright (c) 2012-2013, Wind River Systems - * Copyright (c) 2017, Ericsson AB - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the names of the copyright holders nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * Alternatively, this software may be distributed under the terms of the - * GNU General Public License ("GPL") version 2 as published by the Free - * Software Foundation. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _TIPC_SERVER_H -#define _TIPC_SERVER_H - -#include "core.h" -#include -#include -#include - -#define TIPC_SERVER_NAME_LEN 32 -#define TIPC_SUB_CLUSTER_SCOPE 0x20 -#define TIPC_SUB_NODE_SCOPE 0x40 -#define TIPC_SUB_NO_STATUS 0x80 - -void tipc_conn_queue_evt(struct net *net, int conid, - u32 event, struct tipc_event *evt); - -bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, - u32 upper, u32 filter, int *conid); -void tipc_topsrv_kern_unsubscr(struct net *net, int conid); - -#endif diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index c814656..6925a98 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -51,7 +51,7 @@ static void tipc_sub_send_event(struct tipc_subscription *sub, tipc_evt_write(evt, found_upper, found_upper); tipc_evt_write(evt, port.ref, port); tipc_evt_write(evt, port.node, node); - tipc_conn_queue_evt(sub->net, sub->conid, event, evt); + tipc_topsrv_queue_evt(sub->net, sub->conid, event, evt); } /** diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 82ba61a..8b2d22b 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -37,7 +37,7 @@ #ifndef _TIPC_SUBSCR_H #define _TIPC_SUBSCR_H -#include "server.h" +#include "topsrv.h" #define TIPC_MAX_SUBSCR 65535 #define TIPC_MAX_PUBLICATIONS 65535 diff --git a/net/tipc/topsrv.c b/net/tipc/topsrv.c new file mode 100644 index 0000000..02013e0 --- /dev/null +++ b/net/tipc/topsrv.c @@ -0,0 +1,702 @@ +/* + * net/tipc/server.c: TIPC server infrastructure + * + * Copyright (c) 2012-2013, Wind River Systems + * Copyright (c) 2017-2018, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "subscr.h" +#include "topsrv.h" +#include "core.h" +#include "socket.h" +#include "addr.h" +#include "msg.h" +#include +#include + +/* Number of messages to send before rescheduling */ +#define MAX_SEND_MSG_COUNT 25 +#define MAX_RECV_MSG_COUNT 25 +#define CF_CONNECTED 1 +#define CF_SERVER 2 + +#define TIPC_SERVER_NAME_LEN 32 + +/** + * struct tipc_topsrv - TIPC server structure + * @conn_idr: identifier set of connection + * @idr_lock: protect the connection identifier set + * @idr_in_use: amount of allocated identifier entry + * @net: network namspace instance + * @rcvbuf_cache: memory cache of server receive buffer + * @rcv_wq: receive workqueue + * @send_wq: send workqueue + * @max_rcvbuf_size: maximum permitted receive message length + * @tipc_conn_new: callback will be called when new connection is incoming + * @tipc_conn_release: callback will be called before releasing the connection + * @tipc_conn_recvmsg: callback will be called when message arrives + * @name: server name + * @imp: message importance + * @type: socket type + */ +struct tipc_topsrv { + struct idr conn_idr; + spinlock_t idr_lock; /* for idr list */ + int idr_in_use; + struct net *net; + struct work_struct awork; + struct workqueue_struct *rcv_wq; + struct workqueue_struct *send_wq; + int max_rcvbuf_size; + struct socket *listener; + char name[TIPC_SERVER_NAME_LEN]; +}; + +/** + * struct tipc_conn - TIPC connection structure + * @kref: reference counter to connection object + * @conid: connection identifier + * @sock: socket handler associated with connection + * @flags: indicates connection state + * @server: pointer to connected server + * @sub_list: lsit to all pertaing subscriptions + * @sub_lock: lock protecting the subscription list + * @outqueue_lock: control access to the outqueue + * @rwork: receive work item + * @rx_action: what to do when connection socket is active + * @outqueue: pointer to first outbound message in queue + * @outqueue_lock: control access to the outqueue + * @swork: send work item + */ +struct tipc_conn { + struct kref kref; + int conid; + struct socket *sock; + unsigned long flags; + struct tipc_topsrv *server; + struct list_head sub_list; + spinlock_t sub_lock; /* for subscription list */ + struct work_struct rwork; + struct list_head outqueue; + spinlock_t outqueue_lock; /* for outqueue */ + struct work_struct swork; +}; + +/* An entry waiting to be sent */ +struct outqueue_entry { + bool inactive; + struct tipc_event evt; + struct list_head list; +}; + +static void tipc_conn_recv_work(struct work_struct *work); +static void tipc_conn_send_work(struct work_struct *work); +static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt); +static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s); + +static bool connected(struct tipc_conn *con) +{ + return con && test_bit(CF_CONNECTED, &con->flags); +} + +static void tipc_conn_kref_release(struct kref *kref) +{ + struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); + struct tipc_topsrv *s = con->server; + struct outqueue_entry *e, *safe; + + spin_lock_bh(&s->idr_lock); + idr_remove(&s->conn_idr, con->conid); + s->idr_in_use--; + spin_unlock_bh(&s->idr_lock); + if (con->sock) + sock_release(con->sock); + + spin_lock_bh(&con->outqueue_lock); + list_for_each_entry_safe(e, safe, &con->outqueue, list) { + list_del(&e->list); + kfree(e); + } + spin_unlock_bh(&con->outqueue_lock); + kfree(con); +} + +static void conn_put(struct tipc_conn *con) +{ + kref_put(&con->kref, tipc_conn_kref_release); +} + +static void conn_get(struct tipc_conn *con) +{ + kref_get(&con->kref); +} + +static void tipc_conn_close(struct tipc_conn *con) +{ + struct sock *sk = con->sock->sk; + bool disconnect = false; + + write_lock_bh(&sk->sk_callback_lock); + disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags); + + if (disconnect) { + sk->sk_user_data = NULL; + tipc_conn_delete_sub(con, NULL); + } + write_unlock_bh(&sk->sk_callback_lock); + + /* Handle concurrent calls from sending and receiving threads */ + if (!disconnect) + return; + + /* Don't flush pending works, -just let them expire */ + kernel_sock_shutdown(con->sock, SHUT_RDWR); + + conn_put(con); +} + +static struct tipc_conn *tipc_conn_alloc(struct tipc_topsrv *s) +{ + struct tipc_conn *con; + int ret; + + con = kzalloc(sizeof(*con), GFP_ATOMIC); + if (!con) + return ERR_PTR(-ENOMEM); + + kref_init(&con->kref); + INIT_LIST_HEAD(&con->outqueue); + INIT_LIST_HEAD(&con->sub_list); + spin_lock_init(&con->outqueue_lock); + spin_lock_init(&con->sub_lock); + INIT_WORK(&con->swork, tipc_conn_send_work); + INIT_WORK(&con->rwork, tipc_conn_recv_work); + + spin_lock_bh(&s->idr_lock); + ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC); + if (ret < 0) { + kfree(con); + spin_unlock_bh(&s->idr_lock); + return ERR_PTR(-ENOMEM); + } + con->conid = ret; + s->idr_in_use++; + spin_unlock_bh(&s->idr_lock); + + set_bit(CF_CONNECTED, &con->flags); + con->server = s; + + return con; +} + +static struct tipc_conn *tipc_conn_lookup(struct tipc_topsrv *s, int conid) +{ + struct tipc_conn *con; + + spin_lock_bh(&s->idr_lock); + con = idr_find(&s->conn_idr, conid); + if (!connected(con) || !kref_get_unless_zero(&con->kref)) + con = NULL; + spin_unlock_bh(&s->idr_lock); + return con; +} + +/* tipc_conn_delete_sub - delete a specific or all subscriptions + * for a given subscriber + */ +static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) +{ + struct tipc_net *tn = tipc_net(con->server->net); + struct list_head *sub_list = &con->sub_list; + struct tipc_subscription *sub, *tmp; + + spin_lock_bh(&con->sub_lock); + list_for_each_entry_safe(sub, tmp, sub_list, sub_list) { + if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) { + tipc_sub_unsubscribe(sub); + atomic_dec(&tn->subscription_count); + } else if (s) { + break; + } + } + spin_unlock_bh(&con->sub_lock); +} + +static void tipc_conn_send_to_sock(struct tipc_conn *con) +{ + struct list_head *queue = &con->outqueue; + struct tipc_topsrv *srv = con->server; + struct outqueue_entry *e; + struct tipc_event *evt; + struct msghdr msg; + struct kvec iov; + int count = 0; + int ret; + + spin_lock_bh(&con->outqueue_lock); + + while (!list_empty(queue)) { + e = list_first_entry(queue, struct outqueue_entry, list); + evt = &e->evt; + spin_unlock_bh(&con->outqueue_lock); + + if (e->inactive) + tipc_conn_delete_sub(con, &evt->s); + + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT; + iov.iov_base = evt; + iov.iov_len = sizeof(*evt); + msg.msg_name = NULL; + + if (con->sock) { + ret = kernel_sendmsg(con->sock, &msg, &iov, + 1, sizeof(*evt)); + if (ret == -EWOULDBLOCK || ret == 0) { + cond_resched(); + return; + } else if (ret < 0) { + return tipc_conn_close(con); + } + } else { + tipc_topsrv_kern_evt(srv->net, evt); + } + + /* Don't starve users filling buffers */ + if (++count >= MAX_SEND_MSG_COUNT) { + cond_resched(); + count = 0; + } + spin_lock_bh(&con->outqueue_lock); + list_del(&e->list); + kfree(e); + } + spin_unlock_bh(&con->outqueue_lock); +} + +static void tipc_conn_send_work(struct work_struct *work) +{ + struct tipc_conn *con = container_of(work, struct tipc_conn, swork); + + if (connected(con)) + tipc_conn_send_to_sock(con); + + conn_put(con); +} + +/* tipc_conn_queue_evt() - interrupt level call from a subscription instance + * The queued work is launched into tipc_send_work()->tipc_send_to_sock() + */ +void tipc_topsrv_queue_evt(struct net *net, int conid, + u32 event, struct tipc_event *evt) +{ + struct tipc_topsrv *srv = tipc_topsrv(net); + struct outqueue_entry *e; + struct tipc_conn *con; + + con = tipc_conn_lookup(srv, conid); + if (!con) + return; + + if (!connected(con)) + goto err; + + e = kmalloc(sizeof(*e), GFP_ATOMIC); + if (!e) + goto err; + e->inactive = (event == TIPC_SUBSCR_TIMEOUT); + memcpy(&e->evt, evt, sizeof(*evt)); + spin_lock_bh(&con->outqueue_lock); + list_add_tail(&e->list, &con->outqueue); + spin_unlock_bh(&con->outqueue_lock); + + if (queue_work(srv->send_wq, &con->swork)) + return; +err: + conn_put(con); +} + +/* tipc_conn_write_space - interrupt callback after a sendmsg EAGAIN + * Indicates that there now is more space in the send buffer + * The queued work is launched into tipc_send_work()->tipc_conn_send_to_sock() + */ +static void tipc_conn_write_space(struct sock *sk) +{ + struct tipc_conn *con; + + read_lock_bh(&sk->sk_callback_lock); + con = sk->sk_user_data; + if (connected(con)) { + conn_get(con); + if (!queue_work(con->server->send_wq, &con->swork)) + conn_put(con); + } + read_unlock_bh(&sk->sk_callback_lock); +} + +static int tipc_conn_rcv_sub(struct tipc_topsrv *srv, + struct tipc_conn *con, + struct tipc_subscr *s) +{ + struct tipc_net *tn = tipc_net(srv->net); + struct tipc_subscription *sub; + + if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) { + tipc_conn_delete_sub(con, s); + return 0; + } + if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCR) { + pr_warn("Subscription rejected, max (%u)\n", TIPC_MAX_SUBSCR); + return -1; + } + sub = tipc_sub_subscribe(srv->net, s, con->conid); + if (!sub) + return -1; + atomic_inc(&tn->subscription_count); + spin_lock_bh(&con->sub_lock); + list_add(&sub->sub_list, &con->sub_list); + spin_unlock_bh(&con->sub_lock); + return 0; +} + +static int tipc_conn_rcv_from_sock(struct tipc_conn *con) +{ + struct tipc_topsrv *srv = con->server; + struct sock *sk = con->sock->sk; + struct msghdr msg = {}; + struct tipc_subscr s; + struct kvec iov; + int ret; + + iov.iov_base = &s; + iov.iov_len = sizeof(s); + msg.msg_name = NULL; + iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len); + ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT); + if (ret == -EWOULDBLOCK) + return -EWOULDBLOCK; + if (ret > 0) { + read_lock_bh(&sk->sk_callback_lock); + ret = tipc_conn_rcv_sub(srv, con, &s); + read_unlock_bh(&sk->sk_callback_lock); + } + if (ret < 0) + tipc_conn_close(con); + + return ret; +} + +static void tipc_conn_recv_work(struct work_struct *work) +{ + struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); + int count = 0; + + while (connected(con)) { + if (tipc_conn_rcv_from_sock(con)) + break; + + /* Don't flood Rx machine */ + if (++count >= MAX_RECV_MSG_COUNT) { + cond_resched(); + count = 0; + } + } + conn_put(con); +} + +/* tipc_conn_data_ready - interrupt callback indicating the socket has data + * The queued work is launched into tipc_recv_work()->tipc_conn_rcv_from_sock() + */ +static void tipc_conn_data_ready(struct sock *sk) +{ + struct tipc_conn *con; + + read_lock_bh(&sk->sk_callback_lock); + con = sk->sk_user_data; + if (connected(con)) { + conn_get(con); + if (!queue_work(con->server->rcv_wq, &con->rwork)) + conn_put(con); + } + read_unlock_bh(&sk->sk_callback_lock); +} + +static void tipc_topsrv_accept(struct work_struct *work) +{ + struct tipc_topsrv *srv = container_of(work, struct tipc_topsrv, awork); + struct socket *lsock = srv->listener; + struct socket *newsock; + struct tipc_conn *con; + struct sock *newsk; + int ret; + + while (1) { + ret = kernel_accept(lsock, &newsock, O_NONBLOCK); + if (ret < 0) + return; + con = tipc_conn_alloc(srv); + if (IS_ERR(con)) { + ret = PTR_ERR(con); + sock_release(newsock); + return; + } + /* Register callbacks */ + newsk = newsock->sk; + write_lock_bh(&newsk->sk_callback_lock); + newsk->sk_data_ready = tipc_conn_data_ready; + newsk->sk_write_space = tipc_conn_write_space; + newsk->sk_user_data = con; + con->sock = newsock; + write_unlock_bh(&newsk->sk_callback_lock); + + /* Wake up receive process in case of 'SYN+' message */ + newsk->sk_data_ready(newsk); + } +} + +/* tipc_toprsv_listener_data_ready - interrupt callback with connection request + * The queued job is launched into tipc_topsrv_accept() + */ +static void tipc_topsrv_listener_data_ready(struct sock *sk) +{ + struct tipc_topsrv *srv; + + read_lock_bh(&sk->sk_callback_lock); + srv = sk->sk_user_data; + if (srv->listener) + queue_work(srv->rcv_wq, &srv->awork); + read_unlock_bh(&sk->sk_callback_lock); +} + +static int tipc_topsrv_create_listener(struct tipc_topsrv *srv) +{ + int imp = TIPC_CRITICAL_IMPORTANCE; + struct socket *lsock = NULL; + struct sockaddr_tipc saddr; + struct sock *sk; + int rc; + + rc = sock_create_kern(srv->net, AF_TIPC, SOCK_SEQPACKET, 0, &lsock); + if (rc < 0) + return rc; + + srv->listener = lsock; + sk = lsock->sk; + write_lock_bh(&sk->sk_callback_lock); + sk->sk_data_ready = tipc_topsrv_listener_data_ready; + sk->sk_user_data = srv; + write_unlock_bh(&sk->sk_callback_lock); + + rc = kernel_setsockopt(lsock, SOL_TIPC, TIPC_IMPORTANCE, + (char *)&imp, sizeof(imp)); + if (rc < 0) + goto err; + + saddr.family = AF_TIPC; + saddr.addrtype = TIPC_ADDR_NAMESEQ; + saddr.addr.nameseq.type = TIPC_TOP_SRV; + saddr.addr.nameseq.lower = TIPC_TOP_SRV; + saddr.addr.nameseq.upper = TIPC_TOP_SRV; + saddr.scope = TIPC_NODE_SCOPE; + + rc = kernel_bind(lsock, (struct sockaddr *)&saddr, sizeof(saddr)); + if (rc < 0) + goto err; + rc = kernel_listen(lsock, 0); + if (rc < 0) + goto err; + + /* As server's listening socket owner and creator is the same module, + * we have to decrease TIPC module reference count to guarantee that + * it remains zero after the server socket is created, otherwise, + * executing "rmmod" command is unable to make TIPC module deleted + * after TIPC module is inserted successfully. + * + * However, the reference count is ever increased twice in + * sock_create_kern(): one is to increase the reference count of owner + * of TIPC socket's proto_ops struct; another is to increment the + * reference count of owner of TIPC proto struct. Therefore, we must + * decrement the module reference count twice to ensure that it keeps + * zero after server's listening socket is created. Of course, we + * must bump the module reference count twice as well before the socket + * is closed. + */ + module_put(lsock->ops->owner); + module_put(sk->sk_prot_creator->owner); + + return 0; +err: + sock_release(lsock); + return -EINVAL; +} + +bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, + u32 upper, u32 filter, int *conid) +{ + struct tipc_subscr sub; + struct tipc_conn *con; + int rc; + + sub.seq.type = type; + sub.seq.lower = lower; + sub.seq.upper = upper; + sub.timeout = TIPC_WAIT_FOREVER; + sub.filter = filter; + *(u32 *)&sub.usr_handle = port; + + con = tipc_conn_alloc(tipc_topsrv(net)); + if (IS_ERR(con)) + return false; + + *conid = con->conid; + con->sock = NULL; + rc = tipc_conn_rcv_sub(tipc_topsrv(net), con, &sub); + if (rc < 0) + tipc_conn_close(con); + return !rc; +} + +void tipc_topsrv_kern_unsubscr(struct net *net, int conid) +{ + struct tipc_conn *con; + + con = tipc_conn_lookup(tipc_topsrv(net), conid); + if (!con) + return; + + test_and_clear_bit(CF_CONNECTED, &con->flags); + tipc_conn_delete_sub(con, NULL); + conn_put(con); + conn_put(con); +} + +static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt) +{ + u32 port = *(u32 *)&evt->s.usr_handle; + u32 self = tipc_own_addr(net); + struct sk_buff_head evtq; + struct sk_buff *skb; + + skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt), + self, self, port, port, 0); + if (!skb) + return; + msg_set_dest_droppable(buf_msg(skb), true); + memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt)); + skb_queue_head_init(&evtq); + __skb_queue_tail(&evtq, skb); + tipc_sk_rcv(net, &evtq); +} + +static int tipc_topsrv_work_start(struct tipc_topsrv *s) +{ + s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0); + if (!s->rcv_wq) { + pr_err("can't start tipc receive workqueue\n"); + return -ENOMEM; + } + + s->send_wq = alloc_ordered_workqueue("tipc_send", 0); + if (!s->send_wq) { + pr_err("can't start tipc send workqueue\n"); + destroy_workqueue(s->rcv_wq); + return -ENOMEM; + } + + return 0; +} + +static void tipc_topsrv_work_stop(struct tipc_topsrv *s) +{ + destroy_workqueue(s->rcv_wq); + destroy_workqueue(s->send_wq); +} + +int tipc_topsrv_start(struct net *net) +{ + struct tipc_net *tn = tipc_net(net); + const char name[] = "topology_server"; + struct tipc_topsrv *srv; + int ret; + + srv = kzalloc(sizeof(*srv), GFP_ATOMIC); + if (!srv) + return -ENOMEM; + + srv->net = net; + srv->max_rcvbuf_size = sizeof(struct tipc_subscr); + INIT_WORK(&srv->awork, tipc_topsrv_accept); + + strncpy(srv->name, name, strlen(name) + 1); + tn->topsrv = srv; + atomic_set(&tn->subscription_count, 0); + + spin_lock_init(&srv->idr_lock); + idr_init(&srv->conn_idr); + srv->idr_in_use = 0; + + ret = tipc_topsrv_work_start(srv); + if (ret < 0) + return ret; + + ret = tipc_topsrv_create_listener(srv); + if (ret < 0) + tipc_topsrv_work_stop(srv); + + return ret; +} + +void tipc_topsrv_stop(struct net *net) +{ + struct tipc_topsrv *srv = tipc_topsrv(net); + struct socket *lsock = srv->listener; + struct tipc_conn *con; + int id; + + spin_lock_bh(&srv->idr_lock); + for (id = 0; srv->idr_in_use; id++) { + con = idr_find(&srv->conn_idr, id); + if (con) { + spin_unlock_bh(&srv->idr_lock); + tipc_conn_close(con); + spin_lock_bh(&srv->idr_lock); + } + } + __module_get(lsock->ops->owner); + __module_get(lsock->sk->sk_prot_creator->owner); + sock_release(lsock); + srv->listener = NULL; + spin_unlock_bh(&srv->idr_lock); + tipc_topsrv_work_stop(srv); + idr_destroy(&srv->conn_idr); + kfree(srv); +} diff --git a/net/tipc/topsrv.h b/net/tipc/topsrv.h new file mode 100644 index 0000000..c7ea712 --- /dev/null +++ b/net/tipc/topsrv.h @@ -0,0 +1,54 @@ +/* + * net/tipc/server.h: Include file for TIPC server code + * + * Copyright (c) 2012-2013, Wind River Systems + * Copyright (c) 2017, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _TIPC_SERVER_H +#define _TIPC_SERVER_H + +#include "core.h" + +#define TIPC_SERVER_NAME_LEN 32 +#define TIPC_SUB_CLUSTER_SCOPE 0x20 +#define TIPC_SUB_NODE_SCOPE 0x40 +#define TIPC_SUB_NO_STATUS 0x80 + +void tipc_topsrv_queue_evt(struct net *net, int conid, + u32 event, struct tipc_event *evt); + +bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, + u32 upper, u32 filter, int *conid); +void tipc_topsrv_kern_unsubscr(struct net *net, int conid); + +#endif