@@ -37,6 +37,8 @@ enum netpolicy_traffic {
NETPOLICY_RXTX,
};
+#define NETPOLICY_INVALID_QUEUE -1
+#define NETPOLICY_INVALID_LOC NETPOLICY_INVALID_QUEUE
#define POLICY_NAME_LEN_MAX 64
extern const char *policy_name[];
@@ -81,11 +83,34 @@ struct netpolicy_info {
struct list_head obj_list[NETPOLICY_RXTX][NET_POLICY_MAX];
};
+/* Match key (protocol is carried separately in flow_type) used to program
+ * an ethtool RX classification rule for TCP/UDP over IPv4.
+ */
+struct netpolicy_tcpudpip4_spec {
+	/* source and destination host and port, network byte order */
+	__be32	ip4src;
+	__be32	ip4dst;
+	__be16	psrc;
+	__be16	pdst;
+};
+
+/* Per-protocol flow specifications; only TCP/UDP over IPv4 for now. */
+union netpolicy_flow_union {
+	struct netpolicy_tcpudpip4_spec		tcp_udp_ip4_spec;
+};
+
+/* Flow description used to build an ETHTOOL_SRXCLSRLINS request.
+ * flow_type takes the ethtool values TCP_V4_FLOW / UDP_V4_FLOW.
+ */
+struct netpolicy_flow_spec {
+	__u32	flow_type;
+	union netpolicy_flow_union	spec;
+};
+
struct netpolicy_instance {
	struct net_device	*dev;
	enum netpolicy_name	policy; /* required policy */
	void			*ptr;   /* pointers */
	struct task_struct	*task;
+	int			location;	/* ethtool rule location; NETPOLICY_INVALID_LOC if none installed */
+	atomic_t		rule_queue;	/* RX queue programmed by the rule; NETPOLICY_INVALID_QUEUE if none */
+	struct work_struct	fc_wk;		/* flow classification (rule install) work */
+	atomic_t		fc_wk_cnt;	/* 0/1 flag: flow classification work queued */
+	struct netpolicy_flow_spec	flow;	/* cached flow information for this instance */
+
};
struct netpolicy_cpu_load {
@@ -106,6 +131,7 @@ extern int netpolicy_register(struct netpolicy_instance *instance,
enum netpolicy_name policy);
extern void netpolicy_unregister(struct netpolicy_instance *instance);
extern int netpolicy_pick_queue(struct netpolicy_instance *instance, bool is_rx);
+extern void netpolicy_set_rules(struct netpolicy_instance *instance);
#else
static inline void update_netpolicy_sys_map(void)
{
@@ -124,6 +150,10 @@ static inline int netpolicy_pick_queue(struct netpolicy_instance *instance, bool
{
return 0;
}
+
+/* No-op stub when CONFIG_NETPOLICY is disabled. */
+static inline void netpolicy_set_rules(struct netpolicy_instance *instance)
+{
+}
#endif
#endif /*__LINUX_NETPOLICY_H*/
@@ -54,6 +54,8 @@ struct netpolicy_record {
static DEFINE_HASHTABLE(np_record_hash, 10);
static DEFINE_SPINLOCK(np_hashtable_lock);
+struct workqueue_struct *np_fc_wq;
+
static int netpolicy_get_dev_info(struct net_device *dev,
struct netpolicy_dev_info *d_info)
{
@@ -472,6 +474,90 @@ int netpolicy_pick_queue(struct netpolicy_instance *instance, bool is_rx)
}
EXPORT_SYMBOL(netpolicy_pick_queue);
+/**
+ * np_flow_rule_set() - Workqueue callback that installs an RX ntuple rule
+ * @wk: work_struct embedded in the owning netpolicy_instance (fc_wk)
+ *
+ * Picks an available RX queue for the instance and asks the driver (via the
+ * ethtool set_rxnfc hook) to steer the instance's flow to that queue.  On
+ * success any previously installed rule is deleted and the new rule's
+ * location and queue are recorded.  Only TCP/UDP over IPv4 is handled.
+ *
+ * Always clears fc_wk_cnt on exit so netpolicy_set_rules() can queue the
+ * work again.
+ */
+void np_flow_rule_set(struct work_struct *wk)
+{
+	struct netpolicy_instance *instance;
+	struct netpolicy_flow_spec *flow;
+	struct ethtool_rxnfc cmd;
+	struct net_device *dev;
+	int queue, ret;
+
+	/* container_of() on a valid work pointer cannot yield NULL, and
+	 * &instance->flow is never NULL either, so no checks are needed here.
+	 */
+	instance = container_of(wk, struct netpolicy_instance, fc_wk);
+	flow = &instance->flow;
+	dev = instance->dev;
+	if (WARN_ON(!dev))
+		goto done;
+
+	/* Check if ntuple is supported */
+	if (!dev->ethtool_ops->set_rxnfc)
+		goto done;
+
+	/* Only support TCP/UDP V4 by now */
+	if ((flow->flow_type != TCP_V4_FLOW) &&
+	    (flow->flow_type != UDP_V4_FLOW))
+		goto done;
+
+	queue = get_avail_queue(instance, true);
+	if (queue < 0)
+		goto done;
+
+	/* using ethtool flow-type to configure
+	 * Rx network flow classification options or rules
+	 * RX_CLS_LOC_ANY must be supported by the driver
+	 */
+	memset(&cmd, 0, sizeof(cmd));
+	cmd.cmd = ETHTOOL_SRXCLSRLINS;
+	cmd.fs.flow_type = flow->flow_type;
+	cmd.fs.h_u.tcp_ip4_spec.ip4src = flow->spec.tcp_udp_ip4_spec.ip4src;
+	cmd.fs.h_u.tcp_ip4_spec.psrc = flow->spec.tcp_udp_ip4_spec.psrc;
+	cmd.fs.h_u.tcp_ip4_spec.ip4dst = flow->spec.tcp_udp_ip4_spec.ip4dst;
+	cmd.fs.h_u.tcp_ip4_spec.pdst = flow->spec.tcp_udp_ip4_spec.pdst;
+	cmd.fs.ring_cookie = queue;
+	cmd.fs.location = RX_CLS_LOC_ANY;
+	rtnl_lock();
+	ret = dev->ethtool_ops->set_rxnfc(dev, &cmd);
+	rtnl_unlock();
+	if (ret < 0) {
+		pr_warn("Failed to set rules ret %d\n", ret);
+		atomic_set(&instance->rule_queue, NETPOLICY_INVALID_QUEUE);
+		goto done;
+	}
+
+	/* TODO: now one sk only has one rule */
+	if (instance->location != NETPOLICY_INVALID_LOC) {
+		/* delete the old rule */
+		struct ethtool_rxnfc del_cmd;
+
+		/* zero-init: the driver must not see stale stack data in
+		 * the unused fields of the delete request
+		 */
+		memset(&del_cmd, 0, sizeof(del_cmd));
+		del_cmd.cmd = ETHTOOL_SRXCLSRLDEL;
+		del_cmd.fs.location = instance->location;
+		rtnl_lock();
+		ret = dev->ethtool_ops->set_rxnfc(dev, &del_cmd);
+		rtnl_unlock();
+		if (ret < 0)
+			pr_warn("Failed to delete rules ret %d\n", ret);
+	}
+
+	/* record rule location */
+	instance->location = cmd.fs.location;
+	atomic_set(&instance->rule_queue, queue);
+done:
+	atomic_set(&instance->fc_wk_cnt, 0);
+}
+
+/* Initialize the rule-tracking state of a freshly registered instance:
+ * no rule installed yet, no queue selected, no work pending.
+ */
+static void init_instance(struct netpolicy_instance *instance)
+{
+	instance->location = NETPOLICY_INVALID_LOC;
+	atomic_set(&instance->rule_queue, NETPOLICY_INVALID_QUEUE);
+	atomic_set(&instance->fc_wk_cnt, 0);
+	INIT_WORK(&instance->fc_wk, np_flow_rule_set);
+}
+
/**
* netpolicy_register() - Register per socket/task policy request
* @instance: NET policy per socket/task instance info
@@ -516,6 +602,7 @@ int netpolicy_register(struct netpolicy_instance *instance,
}
kfree(new);
} else {
+ init_instance(instance);
new->ptr_id = ptr_id;
new->dev = instance->dev;
new->policy = policy;
@@ -538,8 +625,23 @@ EXPORT_SYMBOL(netpolicy_register);
*/
void netpolicy_unregister(struct netpolicy_instance *instance)
{
- struct netpolicy_record *record;
unsigned long ptr_id = (uintptr_t)instance->ptr;
+ struct net_device *dev = instance->dev;
+ struct netpolicy_record *record;
+
+ cancel_work_sync(&instance->fc_wk);
+ /* remove FD rules */
+ if (dev && instance->location != NETPOLICY_INVALID_LOC) {
+ struct ethtool_rxnfc del_cmd;
+
+ del_cmd.cmd = ETHTOOL_SRXCLSRLDEL;
+ del_cmd.fs.location = instance->location;
+ rtnl_lock();
+ dev->ethtool_ops->set_rxnfc(dev, &del_cmd);
+ rtnl_unlock();
+ instance->location = NETPOLICY_INVALID_LOC;
+ atomic_set(&instance->rule_queue, NETPOLICY_INVALID_QUEUE);
+ }
spin_lock_bh(&np_hashtable_lock);
/* del from hash table */
@@ -555,6 +657,28 @@ void netpolicy_unregister(struct netpolicy_instance *instance)
}
EXPORT_SYMBOL(netpolicy_unregister);
+/**
+ * netpolicy_set_rules() - Configure Rx network flow classification rules
+ * @instance: NET policy per socket/task instance info
+ *
+ * This function intends to configure Rx network flow classification rules
+ * according to ip and port information. The configuration is performed
+ * asynchronously from a work queue, so that it does not slow down the
+ * connection path.
+ *
+ * Currently, it only supports TCP and UDP V4. Other protocols will be
+ * supported later.
+ *
+ */
+void netpolicy_set_rules(struct netpolicy_instance *instance)
+{
+	/* There should be only one work to run at the same time;
+	 * fc_wk_cnt is reset to 0 by the work callback when it finishes.
+	 */
+	if (!atomic_cmpxchg(&instance->fc_wk_cnt, 0, 1)) {
+		instance->task = current;
+		queue_work(np_fc_wq, &instance->fc_wk);
+	}
+}
+EXPORT_SYMBOL(netpolicy_set_rules);
+
const char *policy_name[NET_POLICY_MAX] = {
"NONE",
"CPU",
@@ -1255,6 +1379,10 @@ static int __init netpolicy_init(void)
{
int ret;
+ np_fc_wq = create_workqueue("np_fc");
+ if (!np_fc_wq)
+ return -ENOMEM;
+
ret = register_pernet_subsys(&netpolicy_net_ops);
if (!ret)
register_netdevice_notifier(&netpolicy_dev_notf);
@@ -1268,6 +1396,8 @@ static int __init netpolicy_init(void)
static void __exit netpolicy_exit(void)
{
+ destroy_workqueue(np_fc_wq);
+
unregister_netdevice_notifier(&netpolicy_dev_notf);
unregister_pernet_subsys(&netpolicy_net_ops);
@@ -754,6 +754,71 @@ ssize_t inet_sendpage(struct socket *sock, struct page *page, int offset,
}
EXPORT_SYMBOL(inet_sendpage);
+/* Keep the instance's cached flow spec in sync with @sk and, when the
+ * flow or its chosen queue changed, trigger asynchronous rule setup.
+ * Only AF_INET TCP (SOCK_STREAM) and UDP (SOCK_DGRAM) are handled.
+ *
+ * NOTE(review): instance->flow is written here and read from the fc_wk
+ * workqueue callback without explicit locking — confirm this race is
+ * benign (worst case a stale rule) before extending this code.
+ */
+static void sock_netpolicy_manage_flow(struct sock *sk, struct msghdr *msg)
+{
+#ifdef CONFIG_NETPOLICY
+	struct netpolicy_instance *instance;
+	struct netpolicy_flow_spec *flow;
+	bool change = false;
+	int queue;
+
+	instance = netpolicy_find_instance(sk);
+	if (!instance)
+		return;
+
+	if (!instance->dev)
+		return;
+
+	flow = &instance->flow;
+	/* TODO: need to change here and add more protocol support */
+	if (sk->sk_family != AF_INET)
+		return;
+	if ((sk->sk_protocol == IPPROTO_TCP) &&
+	    (sk->sk_type == SOCK_STREAM)) {
+		/* The RX rule matches the peer as source and the local
+		 * address/port as destination.
+		 */
+		if ((flow->flow_type != TCP_V4_FLOW) ||
+		    (flow->spec.tcp_udp_ip4_spec.ip4src != sk->sk_daddr) ||
+		    (flow->spec.tcp_udp_ip4_spec.psrc != sk->sk_dport) ||
+		    (flow->spec.tcp_udp_ip4_spec.ip4dst != sk->sk_rcv_saddr) ||
+		    (flow->spec.tcp_udp_ip4_spec.pdst != htons(sk->sk_num)))
+			change = true;
+		if (change) {
+			flow->flow_type = TCP_V4_FLOW;
+			flow->spec.tcp_udp_ip4_spec.ip4src = sk->sk_daddr;
+			flow->spec.tcp_udp_ip4_spec.psrc = sk->sk_dport;
+			flow->spec.tcp_udp_ip4_spec.ip4dst = sk->sk_rcv_saddr;
+			flow->spec.tcp_udp_ip4_spec.pdst = htons(sk->sk_num);
+		}
+	} else if ((sk->sk_protocol == IPPROTO_UDP) &&
+		   (sk->sk_type == SOCK_DGRAM)) {
+		/* For UDP the peer is taken from the datagram's msg_name,
+		 * which is only valid after recvmsg processed the message.
+		 */
+		DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
+
+		if (!sin || !sin->sin_addr.s_addr || !sin->sin_port)
+			return;
+		if ((flow->flow_type != UDP_V4_FLOW) ||
+		    (flow->spec.tcp_udp_ip4_spec.ip4src != sin->sin_addr.s_addr) ||
+		    (flow->spec.tcp_udp_ip4_spec.psrc != sin->sin_port) ||
+		    (flow->spec.tcp_udp_ip4_spec.ip4dst != sk->sk_rcv_saddr) ||
+		    (flow->spec.tcp_udp_ip4_spec.pdst != htons(sk->sk_num)))
+			change = true;
+		if (change) {
+			flow->flow_type = UDP_V4_FLOW;
+			flow->spec.tcp_udp_ip4_spec.ip4src = sin->sin_addr.s_addr;
+			flow->spec.tcp_udp_ip4_spec.psrc = sin->sin_port;
+			flow->spec.tcp_udp_ip4_spec.ip4dst = sk->sk_rcv_saddr;
+			flow->spec.tcp_udp_ip4_spec.pdst = htons(sk->sk_num);
+		}
+	} else {
+		return;
+	}
+
+	/* Re-install the rule when the picked queue no longer matches the
+	 * one programmed by the current rule, or when the flow changed.
+	 */
+	queue = netpolicy_pick_queue(instance, true);
+	if (queue < 0)
+		return;
+	if ((queue != atomic_read(&instance->rule_queue)) || change)
+		netpolicy_set_rules(instance);
+#endif
+}
+
int inet_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
int flags)
{
@@ -767,6 +832,12 @@ int inet_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
flags & ~MSG_DONTWAIT, &addr_len);
if (err >= 0)
msg->msg_namelen = addr_len;
+
+ /* The dev info, src address and port information for UDP
+ * can only be retrieved after processing the msg.
+ */
+ sock_netpolicy_manage_flow(sk, msg);
+
return err;
}
EXPORT_SYMBOL(inet_recvmsg);
@@ -1786,6 +1786,10 @@ int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable,
if (sk) {
int ret;
+#ifdef CONFIG_NETPOLICY
+ /* Record dev info before it's discarded in udp_queue_rcv_skb */
+ sk->sk_netpolicy.dev = skb->dev;
+#endif
if (inet_get_convert_csum(sk) && uh->check && !IS_UDPLITE(sk))
skb_checksum_try_convert(skb, IPPROTO_UDP, uh->check,
inet_compute_pseudo);