diff mbox

[net-next,v2] tipc: add peer removal functionality

Message ID 1452607696-24216-1-git-send-email-richard.alpe@ericsson.com
State Deferred, archived
Delegated to: David Miller
Headers show

Commit Message

Richard Alpe Jan. 12, 2016, 2:08 p.m. UTC
Add TIPC_NL_PEER_REMOVE netlink command. This command can remove
an offline peer node from the internal data structures.

This will be supported by the tipc user space tool in iproute2.

Signed-off-by: Richard Alpe <richard.alpe@ericsson.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
---
 include/uapi/linux/tipc_netlink.h |  1 +
 net/tipc/net.c                    |  4 +--
 net/tipc/net.h                    |  2 ++
 net/tipc/netlink.c                |  5 ++++
 net/tipc/node.c                   | 60 +++++++++++++++++++++++++++++++++++----
 net/tipc/node.h                   |  3 +-
 6 files changed, 66 insertions(+), 9 deletions(-)

Comments

David Miller Jan. 12, 2016, 8:31 p.m. UTC | #1
From: Richard Alpe <richard.alpe@ericsson.com>
Date: Tue, 12 Jan 2016 15:08:16 +0100

> Add TIPC_NL_PEER_REMOVE netlink command. This command can remove
> an offline peer node from the internal data structures.
> 
> This will be supported by the tipc user space tool in iproute2.
> 
> Signed-off-by: Richard Alpe <richard.alpe@ericsson.com>
> Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
> Acked-by: Ying Xue <ying.xue@windriver.com>

Please see:

	http://marc.info/?l=linux-netdev&m=145248145925834&w=2
Jon Maloy Feb. 10, 2016, 7:43 p.m. UTC | #2
Hi Jason,
I believe you are right, although the real problem is not what you think it is.
I can see at least two interesting scenarios here,  of which I believe you are 
alluding to the first one below. However, I don't think the problem is the
way Richard is using node_stop(), but rather with node_stop() itself. 
See below.

Scenario 1:
--------------

CPU 1                                CPU2
---------                             --------
1: node_timeout()                                                                                     //refcnt == 1
2:     node_get()                                                                                             //refcnt -> 2
3:                                       node_stop()
4:                                           del_timer()                                                       // expired timer returns 0
6:                                           node_put()                                                      // refcnt -> 1
6:                                                                          node_find()
7:                                                                               node_get()                   //refcnt -> 2
8:    node_put()                                                                                               //refcnt -> 1
9:                                                                                node_put()                  //refcnt -> 0
10:                                                                                    node_kref_release()
11:                                                                                       node_delete()
12:                                                                                             hlist_del_rcu()
13:                                                                                             kfree(node)

There are several possible variations on this theme, e.g., with multiple
parallel node_find() calls, but I don't think  they will normally lead to
a crash. Even though  node_find() sometimes may return the reference
to a node that was meant to be deleted, it will find the node in a state
where it will just decide to leave it again, where after the node will be
deleted.


The second scenario is more problematic:

Scenario 2:
-------------
CPU 1                                             CPU2
---------                                         --------
1: node_timeout()                                                          //refcnt == 1
2:                                                  node_stop()
3:                                                           del_timer()          // expired timer returns 0
4:                                                           node_put()          //refcnt -> 0
5:                                                                 node_kref_release()
6:                                                                        node_delete()
7:                                                                                kfree(node)
8:     tipc_node_readlock(node)
9:   BOOM!

This one explains a crash I have seen occasionally when I am deleting a huge
 number (>100) of name spaces interconnected with TIPC. I don't know why I
 have been using del_timer() instead of del_timer_sync() here, which would
have been the  natural choice. I probably believed that our reference counter
handling would make it safe, which it obviously isn't.

Regards
///jon



> -----Original Message-----
> From: jason [mailto:huzhijiang@gmail.com]
> Sent: Tuesday, 26 January, 2016 02:37
> To: Richard Alpe
> Cc: netdev@vger.kernel.org; tipc-discussion@lists.sourceforge.net
> Subject: Re: [tipc-discussion] [PATCH net-next v2] tipc: add peer removal
> functionality
> 
> Hi,
> 
> On Jan 12, 2016 10:09 PM, "Richard Alpe" <richard.alpe@ericsson.com>
> wrote:
> >
> > Add TIPC_NL_PEER_REMOVE netlink command. This command can remove
> an
> > offline peer node from the internal data structures.
> >
> > This will be supported by the tipc user space tool in iproute2.
> >
> > Signed-off-by: Richard Alpe <richard.alpe@ericsson.com>
> > Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
> > Acked-by: Ying Xue <ying.xue@windriver.com>
> > ---
> >  include/uapi/linux/tipc_netlink.h |  1 +
> >  net/tipc/net.c                    |  4 +--
> >  net/tipc/net.h                    |  2 ++
> >  net/tipc/netlink.c                |  5 ++++
> >  net/tipc/node.c                   | 60
> +++++++++++++++++++++++++++++++++++----
> >  net/tipc/node.h                   |  3 +-
> >  6 files changed, 66 insertions(+), 9 deletions(-)
> >
> > diff --git a/include/uapi/linux/tipc_netlink.h
> b/include/uapi/linux/tipc_netlink.h
> > index d4c8f14..25eb645 100644
> > --- a/include/uapi/linux/tipc_netlink.h
> > +++ b/include/uapi/linux/tipc_netlink.h
> > @@ -56,6 +56,7 @@ enum {
> >         TIPC_NL_NET_GET,
> >         TIPC_NL_NET_SET,
> >         TIPC_NL_NAME_TABLE_GET,
> > +       TIPC_NL_PEER_REMOVE,
> >
> >         __TIPC_NL_CMD_MAX,
> >         TIPC_NL_CMD_MAX = __TIPC_NL_CMD_MAX - 1 diff --git
> > a/net/tipc/net.c b/net/tipc/net.c index 77bf911..042d4ce 100644
> > --- a/net/tipc/net.c
> > +++ b/net/tipc/net.c
> > @@ -42,7 +42,7 @@
> >  #include "node.h"
> >  #include "bcast.h"
> >
> > -static const struct nla_policy tipc_nl_net_policy[TIPC_NLA_NET_MAX +
> > 1]
> = {
> > +const struct nla_policy tipc_nl_net_policy[TIPC_NLA_NET_MAX + 1] = {
> >         [TIPC_NLA_NET_UNSPEC]   = { .type = NLA_UNSPEC },
> >         [TIPC_NLA_NET_ID]       = { .type = NLA_U32 }
> >  };
> > @@ -139,7 +139,7 @@ void tipc_net_stop(struct net *net)
> >                               tn->own_addr);
> >         rtnl_lock();
> >         tipc_bearer_stop(net);
> > -       tipc_node_stop(net);
> > +       tipc_node_stop_net(net);
> >         rtnl_unlock();
> >
> >         pr_info("Left network mode\n"); diff --git a/net/tipc/net.h
> > b/net/tipc/net.h index 77a7a11..c7c2549 100644
> > --- a/net/tipc/net.h
> > +++ b/net/tipc/net.h
> > @@ -39,6 +39,8 @@
> >
> >  #include <net/genetlink.h>
> >
> > +extern const struct nla_policy tipc_nl_net_policy[];
> > +
> >  int tipc_net_start(struct net *net, u32 addr);
> >
> >  void tipc_net_stop(struct net *net);
> > diff --git a/net/tipc/netlink.c b/net/tipc/netlink.c index
> > 8975b01..9019df8 100644
> > --- a/net/tipc/netlink.c
> > +++ b/net/tipc/netlink.c
> > @@ -145,6 +145,11 @@ static const struct genl_ops tipc_genl_v2_ops[] = {
> >                 .cmd    = TIPC_NL_NAME_TABLE_GET,
> >                 .dumpit = tipc_nl_name_table_dump,
> >                 .policy = tipc_nl_policy,
> > +       },
> > +       {
> > +               .cmd    = TIPC_NL_PEER_REMOVE,
> > +               .doit   = tipc_nl_peer_rm,
> > +               .policy = tipc_nl_policy,
> >         }
> >  };
> >
> > diff --git a/net/tipc/node.c b/net/tipc/node.c index fa97d96..988cdd9
> > 100644
> > --- a/net/tipc/node.c
> > +++ b/net/tipc/node.c
> > @@ -399,17 +399,21 @@ static void tipc_node_delete(struct tipc_node
> *node)
> >         kfree_rcu(node, rcu);
> >  }
> >
> > -void tipc_node_stop(struct net *net)
> > +static void tipc_node_stop(struct tipc_node *node) {
> > +       if (del_timer(&node->timer))
> > +               tipc_node_put(node);
> > +       tipc_node_put(node);
> > +}
> > +
> > +void tipc_node_stop_net(struct net *net)
> >  {
> >         struct tipc_net *tn = net_generic(net, tipc_net_id);
> >         struct tipc_node *node, *t_node;
> >
> >         spin_lock_bh(&tn->node_list_lock);
> > -       list_for_each_entry_safe(node, t_node, &tn->node_list, list) {
> > -               if (del_timer(&node->timer))
> > -                       tipc_node_put(node);
> > -               tipc_node_put(node);
> > -       }
> > +       list_for_each_entry_safe(node, t_node, &tn->node_list, list)
> > +               tipc_node_stop(node);
> >         spin_unlock_bh(&tn->node_list_lock);
> >  }
> >
> > @@ -1529,6 +1533,50 @@ discard:
> >         kfree_skb(skb);
> >  }
> >
> > +int tipc_nl_peer_rm(struct sk_buff *skb, struct genl_info *info) {
> > +       int err;
> > +       u32 addr;
> > +       struct net *net = sock_net(skb->sk);
> > +       struct nlattr *attrs[TIPC_NLA_NET_MAX + 1];
> > +       struct tipc_net *tn = net_generic(net, tipc_net_id);
> > +       struct tipc_node *peer;
> > +
> > +       /* We identify the peer by its net */
> > +       if (!info->attrs[TIPC_NLA_NET])
> > +               return -EINVAL;
> > +
> > +       err = nla_parse_nested(attrs, TIPC_NLA_NET_MAX,
> > +                              info->attrs[TIPC_NLA_NET],
> > +                              tipc_nl_net_policy);
> > +       if (err)
> > +               return err;
> > +
> > +       if (!attrs[TIPC_NLA_NET_ADDR])
> > +               return -EINVAL;
> > +
> > +       addr = nla_get_u32(attrs[TIPC_NLA_NET_ADDR]);
> > +
> > +       spin_lock_bh(&tn->node_list_lock);
> > +       list_for_each_entry_rcu(peer, &tn->node_list, list) {
> > +               if (peer->addr != addr)
> > +                       continue;
> > +
> > +               if (peer->state == SELF_DOWN_PEER_DOWN ||
> > +                   peer->state == SELF_DOWN_PEER_LEAVING) {
> > +                       tipc_node_stop(peer);
> 
> I think here is not safe to do tipc_node_stop(). Because here we are not sure
> that node's refcount  really can drop to zero,thus, not sure
> tipc_node_delete() can be execute in a critical region which protected by
> node_list_lock spin lock. This may result in tipc_create_node() to return a to-
> be-deleted node which is not in the node list any more.
> > +
> > +                       spin_unlock_bh(&tn->node_list_lock);
> > +                       return 0;
> > +               }
> > +               spin_unlock_bh(&tn->node_list_lock);
> > +               return -EBUSY;
> > +       }
> > +       spin_unlock_bh(&tn->node_list_lock);
> > +
> > +       return -ENXIO;
> > +}
> > +
> >  int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback
> > *cb)  {
> >         int err;
> > diff --git a/net/tipc/node.h b/net/tipc/node.h index f39d9d0..8dfb6ba
> > 100644
> > --- a/net/tipc/node.h
> > +++ b/net/tipc/node.h
> > @@ -51,7 +51,7 @@ enum {
> >  #define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH  #define
> > INVALID_BEARER_ID -1
> >
> > -void tipc_node_stop(struct net *net);
> > +void tipc_node_stop_net(struct net *net);
> >  void tipc_node_check_dest(struct net *net, u32 onode,
> >                           struct tipc_bearer *bearer,
> >                           u16 capabilities, u32 signature, @@ -75,5
> > +75,6 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct
> netlink_callback *cb);
> >  int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct
> > genl_info
> *info);
> >  int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info
> > *info);  int tipc_nl_node_set_link(struct sk_buff *skb, struct
> > genl_info *info);
> > +int tipc_nl_peer_rm(struct sk_buff *skb, struct genl_info *info);
> >
> >  #endif
> > --
> > 2.1.4
> >
> >
> >
> ------------------------------------------------------------------------------
> > Site24x7 APM Insight: Get Deep Visibility into Application Performance
> > APM + Mobile APM + RUM: Monitor 3 App instances at just $35/Month
> > Monitor end-to-end web transactions and take corrective actions now
> > Troubleshoot faster and improve end-user experience. Signup Now!
> > http://pubads.g.doubleclick.net/gampad/clk?id=267308311&iu=/4140
> > _______________________________________________
> > tipc-discussion mailing list
> > tipc-discussion@lists.sourceforge.net
> > https://lists.sourceforge.net/lists/listinfo/tipc-discussion
> ------------------------------------------------------------------------------
> Site24x7 APM Insight: Get Deep Visibility into Application Performance APM
> + Mobile APM + RUM: Monitor 3 App instances at just $35/Month Monitor
> end-to-end web transactions and take corrective actions now Troubleshoot
> faster and improve end-user experience. Signup Now!
> http://pubads.g.doubleclick.net/gampad/clk?id=267308311&iu=/4140
> _______________________________________________
> tipc-discussion mailing list
> tipc-discussion@lists.sourceforge.net
> https://lists.sourceforge.net/lists/listinfo/tipc-discussion
Jon Maloy Feb. 10, 2016, 8:22 p.m. UTC | #3
There is of course a CPU3 involved in the first scenario. Just forgot to add it to the header.

///jon


> -----Original Message-----
> From: netdev-owner@vger.kernel.org [mailto:netdev-
> owner@vger.kernel.org] On Behalf Of Jon Maloy
> Sent: Wednesday, 10 February, 2016 14:44
> To: jason; Richard Alpe
> Cc: netdev@vger.kernel.org; tipc-discussion@lists.sourceforge.net
> Subject: RE: [tipc-discussion] [PATCH net-next v2] tipc: add peer removal
> functionality
> 
> Hi Jason,
> I believe you are right, although the real problem is not what you think it is.
> I can see at least two interesting scenarios here,  of which I believe you are
> alluding to the first one below. However, I don't think the problem is the way
> Richard is using node_stop(), but rather with node_stop() itself.
> See below.
> 
> Scenario 1:
> --------------
> 
> CPU 1                                CPU2
> ---------                             --------
> 1: node_timeout()                                                                                     //refcnt == 1
> 2:     node_get()                                                                                             //refcnt -> 2
> 3:                                       node_stop()
> 4:                                           del_timer()                                                       // expired
> timer returns 0
> 6:                                           node_put()                                                      // refcnt -> 1
> 6:                                                                          node_find()
> 7:                                                                               node_get()                   //refcnt -> 2
> 8:    node_put()                                                                                               //refcnt -> 1
> 9:                                                                                node_put()                  //refcnt -> 0
> 10:                                                                                    node_kref_release()
> 11:                                                                                       node_delete()
> 12:                                                                                             hlist_del_rcu()
> 13:                                                                                             kfree(node)
> 
> There are several possible variations on this theme, e.g., with multiple
> parallel node_find() calls, but I don't think  they will normally lead to a crash.
> Even though  node_find() sometimes may return the reference to a node
> that was meant to be deleted, it will find the node in a state where it will just
> decide to leave it again, where after the node will be deleted.
> 
> 
> The second scenario is more problematic:
> 
> Scenario 2:
> -------------
> CPU 1                                             CPU2
> ---------                                         --------
> 1: node_timeout()                                                          //refcnt == 1
> 2:                                                  node_stop()
> 3:                                                           del_timer()          // expired timer returns 0
> 4:                                                           node_put()          //refcnt -> 0
> 5:                                                                 node_kref_release()
> 6:                                                                        node_delete()
> 7:                                                                                kfree(node)
> 8:     tipc_node_readlock(node)
> 9:   BOOM!
> 
> This one explains a crash I have seen occasionally when I am deleting a huge
> number (>100) of name spaces interconnected with TIPC. I don't know why I
> have been using del_timer() instead of del_timer_sync() here, which would
> have been the  natural choice. I probably believed that our reference
> counter handling would make it safe, which it obviously isn't.
> 
> Regards
> ///jon
> 
> 
> 
> > -----Original Message-----
> > From: jason [mailto:huzhijiang@gmail.com]
> > Sent: Tuesday, 26 January, 2016 02:37
> > To: Richard Alpe
> > Cc: netdev@vger.kernel.org; tipc-discussion@lists.sourceforge.net
> > Subject: Re: [tipc-discussion] [PATCH net-next v2] tipc: add peer
> > removal functionality
> >
> > Hi,
> >
> > On Jan 12, 2016 10:09 PM, "Richard Alpe" <richard.alpe@ericsson.com>
> > wrote:
> > >
> > > Add TIPC_NL_PEER_REMOVE netlink command. This command can
> remove
> > an
> > > offline peer node from the internal data structures.
> > >
> > > This will be supported by the tipc user space tool in iproute2.
> > >
> > > Signed-off-by: Richard Alpe <richard.alpe@ericsson.com>
> > > Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
> > > Acked-by: Ying Xue <ying.xue@windriver.com>
> > > ---
> > >  include/uapi/linux/tipc_netlink.h |  1 +
> > >  net/tipc/net.c                    |  4 +--
> > >  net/tipc/net.h                    |  2 ++
> > >  net/tipc/netlink.c                |  5 ++++
> > >  net/tipc/node.c                   | 60
> > +++++++++++++++++++++++++++++++++++----
> > >  net/tipc/node.h                   |  3 +-
> > >  6 files changed, 66 insertions(+), 9 deletions(-)
> > >
> > > diff --git a/include/uapi/linux/tipc_netlink.h
> > b/include/uapi/linux/tipc_netlink.h
> > > index d4c8f14..25eb645 100644
> > > --- a/include/uapi/linux/tipc_netlink.h
> > > +++ b/include/uapi/linux/tipc_netlink.h
> > > @@ -56,6 +56,7 @@ enum {
> > >         TIPC_NL_NET_GET,
> > >         TIPC_NL_NET_SET,
> > >         TIPC_NL_NAME_TABLE_GET,
> > > +       TIPC_NL_PEER_REMOVE,
> > >
> > >         __TIPC_NL_CMD_MAX,
> > >         TIPC_NL_CMD_MAX = __TIPC_NL_CMD_MAX - 1 diff --git
> > > a/net/tipc/net.c b/net/tipc/net.c index 77bf911..042d4ce 100644
> > > --- a/net/tipc/net.c
> > > +++ b/net/tipc/net.c
> > > @@ -42,7 +42,7 @@
> > >  #include "node.h"
> > >  #include "bcast.h"
> > >
> > > -static const struct nla_policy tipc_nl_net_policy[TIPC_NLA_NET_MAX
> > > + 1]
> > = {
> > > +const struct nla_policy tipc_nl_net_policy[TIPC_NLA_NET_MAX + 1] =
> > > +{
> > >         [TIPC_NLA_NET_UNSPEC]   = { .type = NLA_UNSPEC },
> > >         [TIPC_NLA_NET_ID]       = { .type = NLA_U32 }
> > >  };
> > > @@ -139,7 +139,7 @@ void tipc_net_stop(struct net *net)
> > >                               tn->own_addr);
> > >         rtnl_lock();
> > >         tipc_bearer_stop(net);
> > > -       tipc_node_stop(net);
> > > +       tipc_node_stop_net(net);
> > >         rtnl_unlock();
> > >
> > >         pr_info("Left network mode\n"); diff --git a/net/tipc/net.h
> > > b/net/tipc/net.h index 77a7a11..c7c2549 100644
> > > --- a/net/tipc/net.h
> > > +++ b/net/tipc/net.h
> > > @@ -39,6 +39,8 @@
> > >
> > >  #include <net/genetlink.h>
> > >
> > > +extern const struct nla_policy tipc_nl_net_policy[];
> > > +
> > >  int tipc_net_start(struct net *net, u32 addr);
> > >
> > >  void tipc_net_stop(struct net *net); diff --git
> > > a/net/tipc/netlink.c b/net/tipc/netlink.c index
> > > 8975b01..9019df8 100644
> > > --- a/net/tipc/netlink.c
> > > +++ b/net/tipc/netlink.c
> > > @@ -145,6 +145,11 @@ static const struct genl_ops tipc_genl_v2_ops[] =
> {
> > >                 .cmd    = TIPC_NL_NAME_TABLE_GET,
> > >                 .dumpit = tipc_nl_name_table_dump,
> > >                 .policy = tipc_nl_policy,
> > > +       },
> > > +       {
> > > +               .cmd    = TIPC_NL_PEER_REMOVE,
> > > +               .doit   = tipc_nl_peer_rm,
> > > +               .policy = tipc_nl_policy,
> > >         }
> > >  };
> > >
> > > diff --git a/net/tipc/node.c b/net/tipc/node.c index
> > > fa97d96..988cdd9
> > > 100644
> > > --- a/net/tipc/node.c
> > > +++ b/net/tipc/node.c
> > > @@ -399,17 +399,21 @@ static void tipc_node_delete(struct tipc_node
> > *node)
> > >         kfree_rcu(node, rcu);
> > >  }
> > >
> > > -void tipc_node_stop(struct net *net)
> > > +static void tipc_node_stop(struct tipc_node *node) {
> > > +       if (del_timer(&node->timer))
> > > +               tipc_node_put(node);
> > > +       tipc_node_put(node);
> > > +}
> > > +
> > > +void tipc_node_stop_net(struct net *net)
> > >  {
> > >         struct tipc_net *tn = net_generic(net, tipc_net_id);
> > >         struct tipc_node *node, *t_node;
> > >
> > >         spin_lock_bh(&tn->node_list_lock);
> > > -       list_for_each_entry_safe(node, t_node, &tn->node_list, list) {
> > > -               if (del_timer(&node->timer))
> > > -                       tipc_node_put(node);
> > > -               tipc_node_put(node);
> > > -       }
> > > +       list_for_each_entry_safe(node, t_node, &tn->node_list, list)
> > > +               tipc_node_stop(node);
> > >         spin_unlock_bh(&tn->node_list_lock);
> > >  }
> > >
> > > @@ -1529,6 +1533,50 @@ discard:
> > >         kfree_skb(skb);
> > >  }
> > >
> > > +int tipc_nl_peer_rm(struct sk_buff *skb, struct genl_info *info) {
> > > +       int err;
> > > +       u32 addr;
> > > +       struct net *net = sock_net(skb->sk);
> > > +       struct nlattr *attrs[TIPC_NLA_NET_MAX + 1];
> > > +       struct tipc_net *tn = net_generic(net, tipc_net_id);
> > > +       struct tipc_node *peer;
> > > +
> > > +       /* We identify the peer by its net */
> > > +       if (!info->attrs[TIPC_NLA_NET])
> > > +               return -EINVAL;
> > > +
> > > +       err = nla_parse_nested(attrs, TIPC_NLA_NET_MAX,
> > > +                              info->attrs[TIPC_NLA_NET],
> > > +                              tipc_nl_net_policy);
> > > +       if (err)
> > > +               return err;
> > > +
> > > +       if (!attrs[TIPC_NLA_NET_ADDR])
> > > +               return -EINVAL;
> > > +
> > > +       addr = nla_get_u32(attrs[TIPC_NLA_NET_ADDR]);
> > > +
> > > +       spin_lock_bh(&tn->node_list_lock);
> > > +       list_for_each_entry_rcu(peer, &tn->node_list, list) {
> > > +               if (peer->addr != addr)
> > > +                       continue;
> > > +
> > > +               if (peer->state == SELF_DOWN_PEER_DOWN ||
> > > +                   peer->state == SELF_DOWN_PEER_LEAVING) {
> > > +                       tipc_node_stop(peer);
> >
> > I think here is not safe to do tipc_node_stop(). Because here we are
> > not sure that node's refcount  really can drop to zero,thus, not sure
> > tipc_node_delete() can be execute in a critical region which protected
> > by node_list_lock spin lock. This may result in tipc_create_node() to
> > return a to- be-deleted node which is not in the node list any more.
> > > +
> > > +                       spin_unlock_bh(&tn->node_list_lock);
> > > +                       return 0;
> > > +               }
> > > +               spin_unlock_bh(&tn->node_list_lock);
> > > +               return -EBUSY;
> > > +       }
> > > +       spin_unlock_bh(&tn->node_list_lock);
> > > +
> > > +       return -ENXIO;
> > > +}
> > > +
> > >  int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback
> > > *cb)  {
> > >         int err;
> > > diff --git a/net/tipc/node.h b/net/tipc/node.h index
> > > f39d9d0..8dfb6ba
> > > 100644
> > > --- a/net/tipc/node.h
> > > +++ b/net/tipc/node.h
> > > @@ -51,7 +51,7 @@ enum {
> > >  #define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH  #define
> > > INVALID_BEARER_ID -1
> > >
> > > -void tipc_node_stop(struct net *net);
> > > +void tipc_node_stop_net(struct net *net);
> > >  void tipc_node_check_dest(struct net *net, u32 onode,
> > >                           struct tipc_bearer *bearer,
> > >                           u16 capabilities, u32 signature, @@ -75,5
> > > +75,6 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct
> > netlink_callback *cb);
> > >  int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct
> > > genl_info
> > *info);
> > >  int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info
> > > *info);  int tipc_nl_node_set_link(struct sk_buff *skb, struct
> > > genl_info *info);
> > > +int tipc_nl_peer_rm(struct sk_buff *skb, struct genl_info *info);
> > >
> > >  #endif
> > > --
> > > 2.1.4
> > >
> > >
> > >
> > ----------------------------------------------------------------------
> > --------
> > > Site24x7 APM Insight: Get Deep Visibility into Application
> > > Performance APM + Mobile APM + RUM: Monitor 3 App instances at just
> > > $35/Month Monitor end-to-end web transactions and take corrective
> > > actions now Troubleshoot faster and improve end-user experience.
> Signup Now!
> > > http://pubads.g.doubleclick.net/gampad/clk?id=267308311&iu=/4140
> > > _______________________________________________
> > > tipc-discussion mailing list
> > > tipc-discussion@lists.sourceforge.net
> > > https://lists.sourceforge.net/lists/listinfo/tipc-discussion
> > ----------------------------------------------------------------------
> > --------
> > Site24x7 APM Insight: Get Deep Visibility into Application Performance
> > APM
> > + Mobile APM + RUM: Monitor 3 App instances at just $35/Month Monitor
> > end-to-end web transactions and take corrective actions now
> > Troubleshoot faster and improve end-user experience. Signup Now!
> > http://pubads.g.doubleclick.net/gampad/clk?id=267308311&iu=/4140
> > _______________________________________________
> > tipc-discussion mailing list
> > tipc-discussion@lists.sourceforge.net
> > https://lists.sourceforge.net/lists/listinfo/tipc-discussion
Jon Maloy Feb. 10, 2016, 10:39 p.m. UTC | #4
> -----Original Message-----

> From: jason [mailto:huzhijiang@gmail.com]

> Sent: Wednesday, 10 February, 2016 15:34

> To: Jon Maloy

> Cc: netdev@vger.kernel.org; tipc-discussion@lists.sourceforge.net; Richard

> Alpe

> Subject: RE: [tipc-discussion] [PATCH net-next v2] tipc: add peer removal

> functionality

> 

> Hi Jon,

> I think the refcount use case here looks like the same as a example in

> https://www.kernel.org/doc/Documentation/kref.txt (the last one) in which

> we should use kref_get_unless_zero() instead of kref_get() and use node

> list lock immediately around list_del_rcu(). In this way it can resolve the

> problem that referncing a to-be-freed node outside of rcu protection as well

> as the list protection problem which I realize is the real problem in my first

> mail I should have pointed out (thus, list modification operations may taking

> place concurrently without lock protection). So, it worth for us to swith to

> that refcounting model in kref.txt.

> 

> But the example however, did not show how to do removal especially how

> to interact with the timer for each object(node in our case) :(.


I wonder if we cannot do this very simple.  node_stop() is called from two
locations inside node.c, both protected by the node list spinlock. We could 
just make sure that the removal from the node list is properly done within
the lock scope, i.e., we move the two calls list_del_rcu() and hlist_del_rcu()
from node_delete() to node_stop(). That way, nobody can find the node
instance after the list lock is released, and we should be safe. The actual
freeing of the instance can still be done as we do now, in node_delete()
when kref goes to zero.

Unfortunately I realized that my scenario 2 was wrong. kref is 2 when the 
timer is running, not 1 as I claimed. So now I don't understand what is causing
the crash I am seeing, and I don't see any connection to the above problem
either. (It is really the timer function that is accessing a freed node instance
during the shutdown phase.) Can you see anything that I cannot see?

///jon

> 

> On Feb 11, 2016 3:43 AM, "Jon Maloy" <jon.maloy@ericsson.com> wrote:

> 

> 

> 	Hi Jason,

> 	I believe you are right, although the real problem is not what you

> think it is.

> 	I can see at least two interesting scenarios here,  of which I believe

> you are

> 	alluding to the first one below. However, I don't think the problem is

> the

> 	way Richard is using node_stop(), but rather with node_stop() itself.

> 	See below.

> 

> 	Scenario 1:

> 	--------------

> 

> 	CPU 1                                CPU2

> 	---------                             --------

> 	1: node_timeout()                                                                                     //refcnt

> == 1

> 	2:     node_get()

> //refcnt -> 2

> 	3:                                       node_stop()

> 	4:                                           del_timer()                                                       //

> expired timer returns 0

> 	6:                                           node_put()                                                      //

> refcnt -> 1

> 	6:                                                                          node_find()

> 	7:                                                                               node_get()

> //refcnt -> 2

> 	8:    node_put()

> //refcnt -> 1

> 	9:                                                                                node_put()

> //refcnt -> 0

> 	10:                                                                                    node_kref_release()

> 	11:                                                                                       node_delete()

> 	12:                                                                                             hlist_del_rcu()

> 	13:                                                                                             kfree(node)

> 

> 	There are several possible variations on this theme, e.g., with

> multiple

> 	parallel node_find() calls, but I don't think  they will normally lead to

> 	a crash. Even though  node_find() sometimes may return the

> reference

> 	to a node that was meant to be deleted, it will find the node in a

> state

> 	where it will just decide to leave it again, where after the node will

> be

> 	deleted.

> 

> 

> 	The second scenario is more problematic:

> 

> 	Scenario 2:

> 	-------------

> 	CPU 1                                             CPU2

> 	---------                                         --------

> 	1: node_timeout()                                                          //refcnt == 1

> 	2:                                                  node_stop()

> 	3:                                                           del_timer()          // expired timer

> returns 0

> 	4:                                                           node_put()          //refcnt -> 0

> 	5:                                                                 node_kref_release()

> 	6:                                                                        node_delete()

> 	7:                                                                                kfree(node)

> 	8:     tipc_node_readlock(node)

> 	9:   BOOM!

> 

> 	This one explains a crash I have seen occasionally when I am deleting

> a huge

> 	 number (>100) of name spaces interconnected with TIPC. I don't

> know why I

> 	 have been using del_timer() instead of del_timer_sync() here,

> which would

> 	have been the  natural choice. I probably believed that our reference

> counter

> 	handling would make it safe, which it obviously isn't.

> 

> 	Regards

> 	///jon

> 

> 

> 

> 	> -----Original Message-----

> 	> From: jason [mailto:huzhijiang@gmail.com]

> 	> Sent: Tuesday, 26 January, 2016 02:37

> 	> To: Richard Alpe

> 	> Cc: netdev@vger.kernel.org; tipc-discussion@lists.sourceforge.net

> 	> Subject: Re: [tipc-discussion] [PATCH net-next v2] tipc: add peer

> removal

> 	> functionality

> 	>

> 	> Hi,

> 	>

> 	> On Jan 12, 2016 10:09 PM, "Richard Alpe"

> <richard.alpe@ericsson.com>

> 	> wrote:

> 	> >

> 	> > Add TIPC_NL_PEER_REMOVE netlink command. This command

> can remove

> 	> an

> 	> > offline peer node from the internal data structures.

> 	> >

> 	> > This will be supported by the tipc user space tool in iproute2.

> 	> >

> 	> > Signed-off-by: Richard Alpe <richard.alpe@ericsson.com>

> 	> > Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>

> 	> > Acked-by: Ying Xue <ying.xue@windriver.com>

> 	> > ---

> 	> >  include/uapi/linux/tipc_netlink.h |  1 +

> 	> >  net/tipc/net.c                    |  4 +--

> 	> >  net/tipc/net.h                    |  2 ++

> 	> >  net/tipc/netlink.c                |  5 ++++

> 	> >  net/tipc/node.c                   | 60

> 	> +++++++++++++++++++++++++++++++++++----

> 	> >  net/tipc/node.h                   |  3 +-

> 	> >  6 files changed, 66 insertions(+), 9 deletions(-)

> 	> >

> 	> > diff --git a/include/uapi/linux/tipc_netlink.h

> 	> b/include/uapi/linux/tipc_netlink.h

> 	> > index d4c8f14..25eb645 100644

> 	> > --- a/include/uapi/linux/tipc_netlink.h

> 	> > +++ b/include/uapi/linux/tipc_netlink.h

> 	> > @@ -56,6 +56,7 @@ enum {

> 	> >         TIPC_NL_NET_GET,

> 	> >         TIPC_NL_NET_SET,

> 	> >         TIPC_NL_NAME_TABLE_GET,

> 	> > +       TIPC_NL_PEER_REMOVE,

> 	> >

> 	> >         __TIPC_NL_CMD_MAX,

> 	> >         TIPC_NL_CMD_MAX = __TIPC_NL_CMD_MAX - 1 diff --git

> 	> > a/net/tipc/net.c b/net/tipc/net.c index 77bf911..042d4ce 100644

> 	> > --- a/net/tipc/net.c

> 	> > +++ b/net/tipc/net.c

> 	> > @@ -42,7 +42,7 @@

> 	> >  #include "node.h"

> 	> >  #include "bcast.h"

> 	> >

> 	> > -static const struct nla_policy

> tipc_nl_net_policy[TIPC_NLA_NET_MAX +

> 	> > 1]

> 	> = {

> 	> > +const struct nla_policy tipc_nl_net_policy[TIPC_NLA_NET_MAX

> + 1] = {

> 	> >         [TIPC_NLA_NET_UNSPEC]   = { .type = NLA_UNSPEC },

> 	> >         [TIPC_NLA_NET_ID]       = { .type = NLA_U32 }

> 	> >  };

> 	> > @@ -139,7 +139,7 @@ void tipc_net_stop(struct net *net)

> 	> >                               tn->own_addr);

> 	> >         rtnl_lock();

> 	> >         tipc_bearer_stop(net);

> 	> > -       tipc_node_stop(net);

> 	> > +       tipc_node_stop_net(net);

> 	> >         rtnl_unlock();

> 	> >

> 	> >         pr_info("Left network mode\n"); diff --git a/net/tipc/net.h

> 	> > b/net/tipc/net.h index 77a7a11..c7c2549 100644

> 	> > --- a/net/tipc/net.h

> 	> > +++ b/net/tipc/net.h

> 	> > @@ -39,6 +39,8 @@

> 	> >

> 	> >  #include <net/genetlink.h>

> 	> >

> 	> > +extern const struct nla_policy tipc_nl_net_policy[];

> 	> > +

> 	> >  int tipc_net_start(struct net *net, u32 addr);

> 	> >

> 	> >  void tipc_net_stop(struct net *net);

> 	> > diff --git a/net/tipc/netlink.c b/net/tipc/netlink.c index

> 	> > 8975b01..9019df8 100644

> 	> > --- a/net/tipc/netlink.c

> 	> > +++ b/net/tipc/netlink.c

> 	> > @@ -145,6 +145,11 @@ static const struct genl_ops

> tipc_genl_v2_ops[] = {

> 	> >                 .cmd    = TIPC_NL_NAME_TABLE_GET,

> 	> >                 .dumpit = tipc_nl_name_table_dump,

> 	> >                 .policy = tipc_nl_policy,

> 	> > +       },

> 	> > +       {

> 	> > +               .cmd    = TIPC_NL_PEER_REMOVE,

> 	> > +               .doit   = tipc_nl_peer_rm,

> 	> > +               .policy = tipc_nl_policy,

> 	> >         }

> 	> >  };

> 	> >

> 	> > diff --git a/net/tipc/node.c b/net/tipc/node.c index

> fa97d96..988cdd9

> 	> > 100644

> 	> > --- a/net/tipc/node.c

> 	> > +++ b/net/tipc/node.c

> 	> > @@ -399,17 +399,21 @@ static void tipc_node_delete(struct

> tipc_node

> 	> *node)

> 	> >         kfree_rcu(node, rcu);

> 	> >  }

> 	> >

> 	> > -void tipc_node_stop(struct net *net)

> 	> > +static void tipc_node_stop(struct tipc_node *node) {

> 	> > +       if (del_timer(&node->timer))

> 	> > +               tipc_node_put(node);

> 	> > +       tipc_node_put(node);

> 	> > +}

> 	> > +

> 	> > +void tipc_node_stop_net(struct net *net)

> 	> >  {

> 	> >         struct tipc_net *tn = net_generic(net, tipc_net_id);

> 	> >         struct tipc_node *node, *t_node;

> 	> >

> 	> >         spin_lock_bh(&tn->node_list_lock);

> 	> > -       list_for_each_entry_safe(node, t_node, &tn->node_list, list)

> {

> 	> > -               if (del_timer(&node->timer))

> 	> > -                       tipc_node_put(node);

> 	> > -               tipc_node_put(node);

> 	> > -       }

> 	> > +       list_for_each_entry_safe(node, t_node, &tn->node_list,

> list)

> 	> > +               tipc_node_stop(node);

> 	> >         spin_unlock_bh(&tn->node_list_lock);

> 	> >  }

> 	> >

> 	> > @@ -1529,6 +1533,50 @@ discard:

> 	> >         kfree_skb(skb);

> 	> >  }

> 	> >

> 	> > +int tipc_nl_peer_rm(struct sk_buff *skb, struct genl_info *info)

> {

> 	> > +       int err;

> 	> > +       u32 addr;

> 	> > +       struct net *net = sock_net(skb->sk);

> 	> > +       struct nlattr *attrs[TIPC_NLA_NET_MAX + 1];

> 	> > +       struct tipc_net *tn = net_generic(net, tipc_net_id);

> 	> > +       struct tipc_node *peer;

> 	> > +

> 	> > +       /* We identify the peer by its net */

> 	> > +       if (!info->attrs[TIPC_NLA_NET])

> 	> > +               return -EINVAL;

> 	> > +

> 	> > +       err = nla_parse_nested(attrs, TIPC_NLA_NET_MAX,

> 	> > +                              info->attrs[TIPC_NLA_NET],

> 	> > +                              tipc_nl_net_policy);

> 	> > +       if (err)

> 	> > +               return err;

> 	> > +

> 	> > +       if (!attrs[TIPC_NLA_NET_ADDR])

> 	> > +               return -EINVAL;

> 	> > +

> 	> > +       addr = nla_get_u32(attrs[TIPC_NLA_NET_ADDR]);

> 	> > +

> 	> > +       spin_lock_bh(&tn->node_list_lock);

> 	> > +       list_for_each_entry_rcu(peer, &tn->node_list, list) {

> 	> > +               if (peer->addr != addr)

> 	> > +                       continue;

> 	> > +

> 	> > +               if (peer->state == SELF_DOWN_PEER_DOWN ||

> 	> > +                   peer->state == SELF_DOWN_PEER_LEAVING) {

> 	> > +                       tipc_node_stop(peer);

> 	>

> 	> I think here is not safe to do tipc_node_stop(). Because here we

> are not sure

> 	> that node's refcount  really can drop to zero,thus, not sure

> 	> tipc_node_delete() can be execute in a critical region which

> protected by

> 	> node_list_lock spin lock. This may result in tipc_create_node() to

> return a to-

> 	> be-deleted node which is not in the node list any more.

> 	> > +

> 	> > +                       spin_unlock_bh(&tn->node_list_lock);

> 	> > +                       return 0;

> 	> > +               }

> 	> > +               spin_unlock_bh(&tn->node_list_lock);

> 	> > +               return -EBUSY;

> 	> > +       }

> 	> > +       spin_unlock_bh(&tn->node_list_lock);

> 	> > +

> 	> > +       return -ENXIO;

> 	> > +}

> 	> > +

> 	> >  int tipc_nl_node_dump(struct sk_buff *skb, struct

> netlink_callback

> 	> > *cb)  {

> 	> >         int err;

> 	> > diff --git a/net/tipc/node.h b/net/tipc/node.h index

> f39d9d0..8dfb6ba

> 	> > 100644

> 	> > --- a/net/tipc/node.h

> 	> > +++ b/net/tipc/node.h

> 	> > @@ -51,7 +51,7 @@ enum {

> 	> >  #define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH  #define

> 	> > INVALID_BEARER_ID -1

> 	> >

> 	> > -void tipc_node_stop(struct net *net);

> 	> > +void tipc_node_stop_net(struct net *net);

> 	> >  void tipc_node_check_dest(struct net *net, u32 onode,

> 	> >                           struct tipc_bearer *bearer,

> 	> >                           u16 capabilities, u32 signature, @@ -75,5

> 	> > +75,6 @@ int tipc_nl_node_dump_link(struct sk_buff *skb,

> struct

> 	> netlink_callback *cb);

> 	> >  int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct

> 	> > genl_info

> 	> *info);

> 	> >  int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info

> 	> > *info);  int tipc_nl_node_set_link(struct sk_buff *skb, struct

> 	> > genl_info *info);

> 	> > +int tipc_nl_peer_rm(struct sk_buff *skb, struct genl_info *info);

> 	> >

> 	> >  #endif

> 	> > --

> 	> > 2.1.4

> 	> >

> 	> >

> 	> >

> 	> ------------------------------------------------------------------------------

> 	> > Site24x7 APM Insight: Get Deep Visibility into Application

> Performance

> 	> > APM + Mobile APM + RUM: Monitor 3 App instances at just

> $35/Month

> 	> > Monitor end-to-end web transactions and take corrective actions

> now

> 	> > Troubleshoot faster and improve end-user experience. Signup

> Now!

> 	> >

> http://pubads.g.doubleclick.net/gampad/clk?id=267308311&iu=/4140

> 	> > _______________________________________________

> 	> > tipc-discussion mailing list

> 	> > tipc-discussion@lists.sourceforge.net

> 	> > https://lists.sourceforge.net/lists/listinfo/tipc-discussion

> 	> ------------------------------------------------------------------------------

> 	> Site24x7 APM Insight: Get Deep Visibility into Application

> Performance APM

> 	> + Mobile APM + RUM: Monitor 3 App instances at just $35/Month

> Monitor

> 	> end-to-end web transactions and take corrective actions now

> Troubleshoot

> 	> faster and improve end-user experience. Signup Now!

> 	>

> http://pubads.g.doubleclick.net/gampad/clk?id=267308311&iu=/4140

> 	> _______________________________________________

> 	> tipc-discussion mailing list

> 	> tipc-discussion@lists.sourceforge.net

> 	> https://lists.sourceforge.net/lists/listinfo/tipc-discussion

>
diff mbox

Patch

diff --git a/include/uapi/linux/tipc_netlink.h b/include/uapi/linux/tipc_netlink.h
index d4c8f14..25eb645 100644
--- a/include/uapi/linux/tipc_netlink.h
+++ b/include/uapi/linux/tipc_netlink.h
@@ -56,6 +56,7 @@  enum {
 	TIPC_NL_NET_GET,
 	TIPC_NL_NET_SET,
 	TIPC_NL_NAME_TABLE_GET,
+	TIPC_NL_PEER_REMOVE,
 
 	__TIPC_NL_CMD_MAX,
 	TIPC_NL_CMD_MAX = __TIPC_NL_CMD_MAX - 1
diff --git a/net/tipc/net.c b/net/tipc/net.c
index 77bf911..042d4ce 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -42,7 +42,7 @@ 
 #include "node.h"
 #include "bcast.h"
 
-static const struct nla_policy tipc_nl_net_policy[TIPC_NLA_NET_MAX + 1] = {
+const struct nla_policy tipc_nl_net_policy[TIPC_NLA_NET_MAX + 1] = {
 	[TIPC_NLA_NET_UNSPEC]	= { .type = NLA_UNSPEC },
 	[TIPC_NLA_NET_ID]	= { .type = NLA_U32 }
 };
@@ -139,7 +139,7 @@  void tipc_net_stop(struct net *net)
 			      tn->own_addr);
 	rtnl_lock();
 	tipc_bearer_stop(net);
-	tipc_node_stop(net);
+	tipc_node_stop_net(net);
 	rtnl_unlock();
 
 	pr_info("Left network mode\n");
diff --git a/net/tipc/net.h b/net/tipc/net.h
index 77a7a11..c7c2549 100644
--- a/net/tipc/net.h
+++ b/net/tipc/net.h
@@ -39,6 +39,8 @@ 
 
 #include <net/genetlink.h>
 
+extern const struct nla_policy tipc_nl_net_policy[];
+
 int tipc_net_start(struct net *net, u32 addr);
 
 void tipc_net_stop(struct net *net);
diff --git a/net/tipc/netlink.c b/net/tipc/netlink.c
index 8975b01..9019df8 100644
--- a/net/tipc/netlink.c
+++ b/net/tipc/netlink.c
@@ -145,6 +145,11 @@  static const struct genl_ops tipc_genl_v2_ops[] = {
 		.cmd	= TIPC_NL_NAME_TABLE_GET,
 		.dumpit	= tipc_nl_name_table_dump,
 		.policy = tipc_nl_policy,
+	},
+	{
+		.cmd	= TIPC_NL_PEER_REMOVE,
+		.doit	= tipc_nl_peer_rm,
+		.policy = tipc_nl_policy,
 	}
 };
 
diff --git a/net/tipc/node.c b/net/tipc/node.c
index fa97d96..988cdd9 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -399,17 +399,21 @@  static void tipc_node_delete(struct tipc_node *node)
 	kfree_rcu(node, rcu);
 }
 
-void tipc_node_stop(struct net *net)
+static void tipc_node_stop(struct tipc_node *node)
+{
+	if (del_timer(&node->timer))
+		tipc_node_put(node);
+	tipc_node_put(node);
+}
+
+void tipc_node_stop_net(struct net *net)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_node *node, *t_node;
 
 	spin_lock_bh(&tn->node_list_lock);
-	list_for_each_entry_safe(node, t_node, &tn->node_list, list) {
-		if (del_timer(&node->timer))
-			tipc_node_put(node);
-		tipc_node_put(node);
-	}
+	list_for_each_entry_safe(node, t_node, &tn->node_list, list)
+		tipc_node_stop(node);
 	spin_unlock_bh(&tn->node_list_lock);
 }
 
@@ -1529,6 +1533,50 @@  discard:
 	kfree_skb(skb);
 }
 
+int tipc_nl_peer_rm(struct sk_buff *skb, struct genl_info *info)
+{
+	int err;
+	u32 addr;
+	struct net *net = sock_net(skb->sk);
+	struct nlattr *attrs[TIPC_NLA_NET_MAX + 1];
+	struct tipc_net *tn = net_generic(net, tipc_net_id);
+	struct tipc_node *peer;
+
+	/* We identify the peer by its net */
+	if (!info->attrs[TIPC_NLA_NET])
+		return -EINVAL;
+
+	err = nla_parse_nested(attrs, TIPC_NLA_NET_MAX,
+			       info->attrs[TIPC_NLA_NET],
+			       tipc_nl_net_policy);
+	if (err)
+		return err;
+
+	if (!attrs[TIPC_NLA_NET_ADDR])
+		return -EINVAL;
+
+	addr = nla_get_u32(attrs[TIPC_NLA_NET_ADDR]);
+
+	spin_lock_bh(&tn->node_list_lock);
+	list_for_each_entry_rcu(peer, &tn->node_list, list) {
+		if (peer->addr != addr)
+			continue;
+
+		if (peer->state == SELF_DOWN_PEER_DOWN ||
+		    peer->state == SELF_DOWN_PEER_LEAVING) {
+			tipc_node_stop(peer);
+
+			spin_unlock_bh(&tn->node_list_lock);
+			return 0;
+		}
+		spin_unlock_bh(&tn->node_list_lock);
+		return -EBUSY;
+	}
+	spin_unlock_bh(&tn->node_list_lock);
+
+	return -ENXIO;
+}
+
 int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
 {
 	int err;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index f39d9d0..8dfb6ba 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -51,7 +51,7 @@  enum {
 #define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH
 #define INVALID_BEARER_ID -1
 
-void tipc_node_stop(struct net *net);
+void tipc_node_stop_net(struct net *net);
 void tipc_node_check_dest(struct net *net, u32 onode,
 			  struct tipc_bearer *bearer,
 			  u16 capabilities, u32 signature,
@@ -75,5 +75,6 @@  int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb);
 int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info);
 int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info *info);
 int tipc_nl_node_set_link(struct sk_buff *skb, struct genl_info *info);
+int tipc_nl_peer_rm(struct sk_buff *skb, struct genl_info *info);
 
 #endif