
rfs: Receive Flow Steering

Message ID alpine.DEB.1.00.1004012045560.4252@pokey.mtv.corp.google.com
State Changes Requested, archived
Delegated to: David Miller
Headers show

Commit Message

Tom Herbert April 2, 2010, 3:59 a.m. UTC
This patch implements receive flow steering (RFS).  RFS steers received packets for layer 3 and 4 processing to the CPU where the application for the corresponding flow is running.  RFS is an extension of Receive Packet Steering (RPS).

The basic idea of RFS is that when an application calls recvmsg (or sendmsg), the CPU the application is running on is stored in a hash table indexed by the connection's rxhash, which is kept in the socket structure.  The rxhash is passed in skbs received on the connection from netif_receive_skb.  For each received packet, the associated rxhash is used to look up the CPU in the hash table; if a valid CPU is set, the packet is steered to that CPU using the RPS mechanisms.
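A minimal userspace sketch of that table (sizes, names, and the NO_CPU sentinel are illustrative, not the kernel's):

```c
#include <assert.h>

#define TABLE_SIZE 256u           /* power of two, so hash & mask indexes it */
#define NO_CPU     0xFFFFu        /* sentinel: no CPU recorded for this flow */

static unsigned short ents[TABLE_SIZE];
static const unsigned mask = TABLE_SIZE - 1;

static void table_init(void)
{
	for (unsigned i = 0; i < TABLE_SIZE; i++)
		ents[i] = NO_CPU;
}

/* Called from the recvmsg/sendmsg path: remember where the app runs. */
static void record_flow(unsigned rxhash, unsigned short cpu)
{
	if (rxhash)               /* hash 0 means "no hash computed" */
		ents[rxhash & mask] = cpu;
}

/* Called from the receive path: which CPU does this flow want? */
static unsigned short desired_cpu(unsigned rxhash)
{
	return rxhash ? ents[rxhash & mask] : NO_CPU;
}
```

Different flows can collide on the same entry; the table only stores a hint, so a collision costs locality, not correctness.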

The problem with this simple approach is that it could introduce out-of-order (OOO) packets.  If threads are thrashing around CPUs, or multiple threads are trying to read from the same socket, a quickly changing CPU value in the hash table could cause rampant OOO packets; we consider this a non-starter.

To avoid OOO packets, this solution implements two types of hash tables: rps_sock_flow_table and rps_dev_flow_table.

rps_sock_flow_table is a global hash table.  Each entry is just a CPU number, and it is populated in recvmsg and sendmsg as described above.  This table contains the "desired" CPUs for flows.

rps_dev_flow_table is specific to each device queue.  Each entry contains a CPU and a tail queue counter.  The CPU is the "current" CPU for a matching flow.  The tail queue counter holds the value of the tail counter of the associated CPU's backlog queue at the time of the last enqueue for a flow matching the entry.

Each backlog queue has a queue head counter which is incremented on dequeue, and so a queue tail counter is computed as queue head count + queue length.  When a packet is enqueued on a backlog queue, the current value of the queue tail counter is saved in the hash entry of the rps_dev_flow_table.
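A toy model of these counters (plain C, not the kernel's structures; names assumed):

```c
#include <assert.h>

struct backlog {
	unsigned head;        /* incremented on every dequeue */
	unsigned len;         /* current queue length */
};

struct dev_flow {
	unsigned short cpu;   /* "current" CPU for the flow */
	unsigned last_qtail;  /* tail counter saved at last enqueue */
};

/* tail = head + length: the position the next enqueued packet takes */
static unsigned queue_tail(const struct backlog *q)
{
	return q->head + q->len;
}

static void enqueue(struct backlog *q, struct dev_flow *fe)
{
	q->len++;
	fe->last_qtail = queue_tail(q); /* flow drains once head reaches this */
}

static void dequeue(struct backlog *q)
{
	assert(q->len > 0);
	q->len--;
	q->head++;
}
```

Once head has advanced to last_qtail, every packet this flow enqueued on that CPU has been processed, which is exactly the condition used below for safely moving the flow.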

And now the trick: when selecting the CPU for RPS (get_rps_cpu), the rps_sock_flow_table and the rps_dev_flow table for the RX queue are consulted.  When the desired CPU for the flow (found in rps_sock_flow_table) does not match the current CPU (found in the rps_dev_flow table), the current CPU is changed to the desired CPU if one of the following is true:

- The current CPU is unset (equal to NR_CPUS)
- Current CPU is offline
- The current CPU's queue head counter >= queue tail counter in the rps_dev_flow table.  This checks if the queue tail has advanced beyond the last packet that was enqueued using this table entry.  This guarantees that all packets queued using this entry have been dequeued, thus preserving in order delivery.
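The decision above can be sketched as a small predicate (a userspace model under assumed names, not the patch's code; head is the current CPU's backlog head counter, last_qtail the value saved in the rps_dev_flow entry):

```c
#include <assert.h>
#include <stdbool.h>

#define NO_CPU 0xFFFFu  /* illustrative sentinel; the patch uses NR_CPUS */

/* Return true if the flow may be moved off cur_cpu without risking
 * out-of-order delivery. */
static bool may_move_flow(unsigned short cur_cpu, bool cur_online,
			  unsigned head, unsigned last_qtail)
{
	if (cur_cpu == NO_CPU)
		return true;                 /* current CPU unset */
	if (!cur_online)
		return true;                 /* current CPU offline */
	/* every packet enqueued through this entry has been dequeued */
	return (int)(head - last_qtail) >= 0;
}
```

The signed subtraction keeps the comparison correct when the counters wrap around.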

Making each queue have its own rps_dev_flow table has two advantages: 1) The tail queue counters will be written on each receive, so keeping the table local to the interrupting CPU is good for locality.  2) It allows lockless access to the table: the CPU number and queue tail counter need to be accessed together under mutual exclusion from netif_receive_skb; we assume this is only called from the device's napi_poll, which is non-reentrant.

This patch implements RFS for TCP, although it should be usable for other flow-oriented protocols.  The patch makes some minor modifications in TCP and inet: in tcp_v4_rcv the rxhash is saved in the inet_sk; in inet_sendmsg and inet_recvmsg (an added function) the current CPU is recorded in rps_sock_flow_table, indexed by the saved rxhash.

There are two configuration parameters for RFS.  The "rps_flow_entries" kernel init parameter sets the number of entries in rps_sock_flow_table; the per-rxqueue sysfs entry "rps_flow_cnt" contains the number of entries in the rps_dev_flow table for that rxqueue.  Both are rounded up to a power of two.
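Rounding to a power of two lets the tables be indexed with hash & (size - 1) instead of a modulo.  The rounding itself might look like this simplified take on the kernel's roundup_pow_of_two() (the n == 0 behavior here is a choice for the sketch):

```c
#include <assert.h>

/* Round n up to the nearest power of two (returns 1 for n == 0). */
static unsigned roundup_pow_of_two(unsigned n)
{
	unsigned p = 1;

	while (p < n)
		p <<= 1;
	return p;
}
```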

The obvious benefit of RFS (over just RPS) is that it achieves CPU locality between the receive processing for a flow and the application's processing; this can result in increased performance (higher pps, lower latency).

The benefits of RFS are dependent on cache hierarchy, application load, and other factors.  On simple benchmarks, we don't necessarily see improvement and sometimes see degradation.  However, for more complex benchmarks and for applications where cache pressure is much higher this technique seems to perform very well.

Below are some benchmark results which show the potential benefit of this patch.  The netperf test has 500 instances of the netperf TCP_RR test with 1-byte requests and responses.  The RPC test is a request/response test similar in structure to the netperf RR test, with 100 threads on each host, but it does more work in userspace than netperf.

e1000e on 8 core Intel
   No RFS or RPS		104K tps at 30% CPU
   No RFS (best RPS config):    290K tps at 63% CPU
   RFS				303K tps at 61% CPU

RPC test		tps	CPU%	50/90/99% usec latency	StdDev
   No RFS or RPS	103K	48%	757/900/3185		4472.35
   RPS only:		174K	73%	415/993/2468		491.66
   RFS			223K	73%	379/651/1382		315.61
   
Signed-off-by: Tom Herbert <therbert@google.com>
---
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Comments

Changli Gao April 2, 2010, 5:04 a.m. UTC | #1
On Fri, Apr 2, 2010 at 11:59 AM, Tom Herbert <therbert@google.com> wrote:
> @@ -714,6 +716,8 @@ int inet_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
>  {
>        struct sock *sk = sock->sk;
>
> +       inet_rps_record_flow(sk);
> +
>        /* We may need to bind the socket. */
>        if (!inet_sk(sk)->inet_num && inet_autobind(sk))
>                return -EAGAIN;
> @@ -722,12 +726,13 @@ int inet_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
>  }
>  EXPORT_SYMBOL(inet_sendmsg);
>
> -
>  static ssize_t inet_sendpage(struct socket *sock, struct page *page, int offset,
>                             size_t size, int flags)
>  {
>        struct sock *sk = sock->sk;
>
> +       inet_rps_record_flow(sk);
> +
>        /* We may need to bind the socket. */
>        if (!inet_sk(sk)->inet_num && inet_autobind(sk))
>                return -EAGAIN;
> @@ -737,6 +742,22 @@ static ssize_t inet_sendpage(struct socket *sock, struct page *page, int offset,
>        return sock_no_sendpage(sock, page, offset, size, flags);
>  }
>

For sending packets, how about letting the sender compute the rxhash of
the packets from the other side if the rxhash of the socket hasn't been
set yet? It is better for client applications.

For routers and bridges the current RPS can work well, but not for
server or client applications. So I propose a new socket option to get
the RPS CPU of the packets received on a socket. It may be like this:

int cpu;
getsockopt(sock, SOL_SOCKET, SO_RPSCPU, &cpu, sizeof(cpu));

As Tom's patch does, the rxhash is recorded in the socket. When the call
above is made, rps_map is looked up to find the RPS CPU for that hash.
Once we get the CPU of the current connection, a TCP server can
dispatch the new connection to the processes which run on that CPU.
The server code will be like this:

fd = accept(fd, NULL, NULL);
getsockopt(fd, SOL_SOCKET, SO_RPSCPU, &cpu, sizeof(cpu));
asyncq_enqueue(work_queue[cpu], fd);

For a client program, the rxhash can be obtained after the first packet of
the connection is sent. So the client code will be:

fd = connect(fd, &addr, addr_len);
getsockopt(fd, SOL_SOCKET, SO_RPSCPU, &cpu, sizeof(cpu));
asyncq_enqueue(work_queue[cpu], fd);

I do think this idea is easier to understand. I'll cook up a patch later
if it is welcome.
Eric Dumazet April 2, 2010, 7:29 a.m. UTC | #2
On Friday, April 2, 2010 at 13:04 +0800, Changli Gao wrote:

> for sending packets, how about letting sender compute the rxhash of
> the packets from the other side if the rxhash of socket hasn't been
> set yet. I is better for client applications.
> 



> For router and bridge, the current RPS can work well, but not for
> server or client applications. So I propose a new socket option to get
> the rps cpu of the packets received on a socket. It may be like this:
> 


Your claim that RPS is not good for applications is wrong; our test
results show an improvement as is. Maybe your applications don't scale,
because of bad habits or colliding heuristics, I don't know.

> int cpu;
> getsockopt(sock, SOL_SOCKET, SO_RPSCPU, &cpu, sizeof(cpu));
> 
> As Tom's patch did, rxhash is recorded in socket. When the call above
> is made, rps_map is looked up to find the RPSCPU for that hash. Once
> we get the cpu of the current connection, for a TCP server, it can
> dispatch the new connection to the processes which run on that CPU.
> the server code will be like this:
> 
> fd = accpet(fd, NULL, NULL);
> getsockopt(fd, SOL_SOCKET, SO_RPSCPU, &cpu, sizeof(cpu));
> asyncq_enqueue(work_queue[cpu], fd);
> 
> For a client program, the rxhash can be got after the first packet of
> the connection is sent. So the client code will be:
> 
> fd = connect(fd, &addr, addr_len);
> getsockopt(fd, SOL_SOCKET, SO_RPSCPU, &cpu, sizeof(cpu));
> asyncq_enqueue(work_queue[cpu], fd);
> 
> I do think this idea is easier to understood. I'll cook a patch later
> if it is welcomed.
> 

The whole point of Herbert's patches is that you don't need to change
applications and put complex logic in them that knows the exact machine
topology.

Your suggestion is very complex, because you must bind each thread to a
particular CPU, and that is pretty bad for many reasons. We should allow
thread migrations, because the scheduler or the admin knows better than
the application.

Application writers should rely on standard kernel mechanisms and
schedulers, because an application has a limited point of view of what
really happens on the machine.




Eric Dumazet April 2, 2010, 7:58 a.m. UTC | #3
On Thursday, April 1, 2010 at 20:59 -0700, Tom Herbert wrote:

> +static void rps_dev_flow_table_release_work(struct work_struct *work)
> +{
> +	struct rps_dev_flow_table *table = container_of(work,
> +	    struct rps_dev_flow_table, free_work);
> +
> +	vfree(table);
> +}
> +
> +static void rps_dev_flow_table_release(struct rcu_head *rcu)
> +{
> +	struct rps_dev_flow_table *table = container_of(rcu,
> +	    struct rps_dev_flow_table, rcu);
> +
> +	INIT_WORK(&table->free_work, rps_dev_flow_table_release_work);
> +	schedule_work(&table->free_work);
> +}
> +
> +ssize_t store_rps_dev_flow_table_cnt(struct netdev_rx_queue *queue,
> +				     struct rx_queue_attribute *attr,
> +				     const char *buf, size_t len)
> +{

....

> +	if (old_table)
> +		call_rcu(&old_table->rcu, rps_dev_flow_table_release);
> +
> +	return len;
> +}
> +

Instead of this complex (yet fine) logic, you might try:

if (old_table) {
	synchronize_rcu();
	vfree(old_table);
}

(and remove free_work from struct rps_dev_flow_table)

We don't write this file that often; we can afford the synchronize_rcu()
here...



Eric Dumazet April 2, 2010, 8:35 a.m. UTC | #4
On Thursday, April 1, 2010 at 20:59 -0700, Tom Herbert wrote:

> e1000e on 8 core Intel
>    No RFS or RPS		104K tps at 30% CPU
>    No RFS (best RPS config):    290K tps at 63% CPU
>    RFS				303K tps at 61% CPU
> 
> RPC test		tps	CPU%	50/90/99% usec latency	StdDev
>    No RFS or RPS	103K	48%	757/900/3185		4472.35
>    RPS only:		174K	73%	415/993/2468		491.66
>    RFS			223K	73%	379/651/1382		315.61
>    
> Signed-off-by: Tom Herbert <therbert@google.com>
> ---
> diff --git a/include/linux/netdevice.h b/include/linux/netdevice.h
> index b5670ab..ea6ebca 100644
> --- a/include/linux/netdevice.h
> +++ b/include/linux/netdevice.h
> @@ -543,14 +543,72 @@ struct rps_map {
>  };
>  #define RPS_MAP_SIZE(_num) (sizeof(struct rps_map) + (_num * sizeof(u16)))
>  
> +/*
> + * The rps_dev_flow structure contains the mapping of a flow to a CPU and the
> + * tail pointer for that CPU's input queue at the time of last enqueue.
> + */
> +struct rps_dev_flow {
> +	u16 cpu;

please use a 32-bit quantity; it is faster on many arches and avoids a
16-bit hole.

> +	unsigned int last_qtail;
> +};
> +
> +/*
> + * The rps_dev_flow_table structure contains a table of flow mappings.
> + */
> +struct rps_dev_flow_table {
> +	unsigned int mask;
> +	struct rcu_head rcu;
> +	struct work_struct free_work;
> +	struct rps_dev_flow flows[0];
> +};
> +#define RPS_DEV_FLOW_TABLE_SIZE(_num) (sizeof(struct rps_dev_flow_table) + \
> +    (_num * sizeof(struct rps_dev_flow)))
> +
> +/*
> + * The rps_sock_flow_table contains mappings of flows to the last CPU
> + * on which they were processed by the application (set in recvmsg).
> + */
> +struct rps_sock_flow_table {
> +	unsigned int mask;
> +	u16 *ents;
> +};
> +
> +#define RPS_NO_CPU NR_CPUS

using 0xFFFF might be faster

> +
> +static inline void rps_set_sock_flow(struct rps_sock_flow_table *table,
> +				     u32 hash, int cpu)
> +{
> +	if (table->ents && hash) {
> +		unsigned int index = hash & table->mask;
> +
> +		if (table->ents[index] != cpu)
> +			table->ents[index] = cpu;
> +	}
> +}
> +
> +static inline void rps_record_sock_flow(struct rps_sock_flow_table *table,
> +					u32 hash)
> +{

Hmm, so rps_record_sock_flow() is always called from non-preemptible
contexts?

> +	rps_set_sock_flow(table, hash, smp_processor_id());
> +}
> +
> +static inline void rps_reset_sock_flow(struct rps_sock_flow_table *table,
> +				       u32 hash)
> +{
> +	rps_set_sock_flow(table, hash, RPS_NO_CPU);
> +}
> +

Could you respin your patch against the latest net-next-2.6? It doesn't
apply after the latest RPS commit from Changli.

Thanks !


Changli Gao April 2, 2010, 10:58 a.m. UTC | #5
On Fri, Apr 2, 2010 at 3:29 PM, Eric Dumazet <eric.dumazet@gmail.com> wrote:
> On Friday, April 2, 2010 at 13:04 +0800, Changli Gao wrote:
>
>
> Your claim of RPS being not good for applications is wrong, our test
> results show an improvement as is. Maybe your applications dont scale,
> because of bad habits, or collidings heuristics, I dont know.
>

I didn't mean RPS isn't good for applications. I meant that the
performance improvement for applications isn't as large as for firewalls.
In other words, we can do much better than the current RPS, as
RFS does.

>
> Whole point of Herbert patches is you dont need to change applications
> and put complex logic in them, knowing exact machine topology.
>
> Your suggestion is very complex, because you must bind each thread on a
> particular cpu, and this is pretty bad for many reasons. We should allow
> thread migrations, because scheduler or admin know better than the
> application.
>
> Application writers should rely on standard kernel mechanisms, and
> schedulers, because an application have a limited point of view of what
> really happens on the machine.
>

Yes, it is more complex. Some high-performance servers use the
event-driven model, such as memcached, nginx and lighttpd. This model
has high performance on UP without doubt, and on SMP they usually use
one individual epoll fd for each core/CPU, and the acceptor dispatches
work among these epoll fds. This programming model is popular, and it
bypasses the system scheduler. I think the socket option SO_RPSCPU can
help this kind of application work better, so why not do that?
Compatibility with other Unixes isn't a good reason; for high-performance
applications there are always lots of OS-specific features in use. For
example: epoll vs kqueue, TCP defer accept vs accept filters.
Eric Dumazet April 2, 2010, 12:01 p.m. UTC | #6
On Friday, April 2, 2010 at 18:58 +0800, Changli Gao wrote:

> Yes, it is more complex. Some high performance server use the
> event-driven model, such as memcached, nginx and lighttpd. This model
> has high performance on UP with no doubt, and on SMP they usually use
> one individual epoll fd for each Core/CPU, and the acceptor dispatches
> works among these epoll fds. This program model is popular, and  it
> bypass the  system scheduler. I think the socket option SO_RPSCPU can
> help this kind of applications work better, why not do that?
> Compatility with other Unixes isn't a good cause, for high performance
> applications, there are always lots of OS special features used. For
> example: epoll vs kqueue, tcp defer accept vs accept filter.
> 
> 

This dispatching in userland is a poor workaround even if it is
popular (because people try to write portable applications); the hard
work is already done, and it increases latencies and bus traffic.

For short work items, that is too expensive.

If you really want to speed up memcached/DNS-server-like apps, you might
add a generic mechanism in the kernel to split the queues of an
_individual_ socket.

Aka multiqueue capabilities at the socket level. Combined with multiqueue
devices or RPS, this can be great.


That is, an application tells the kernel how many queues incoming UDP
frames for a given port can be dispatched to (the number of worker
threads). No more contention, and this can be done regardless of RPS/RFS.

A UDP frame comes in and is stored on the appropriate sub-queue (which can
be a mapping given by the current CPU number). Wake up the thread that is
likely running on the same CPU.

Same for outgoing frames (answers). You might split the sk_wmemalloc
accounting to make sure several CPUs can concurrently use the same UDP
socket to send their frames.



Eric Dumazet April 2, 2010, 12:37 p.m. UTC | #7
> > +static inline void rps_record_sock_flow(struct rps_sock_flow_table *table,
> > +					u32 hash)
> > +{
> 
> Hmm, so rps_record_sock_flow() is always called from non preemptable
> contextes ?
> 
> > +	rps_set_sock_flow(table, hash, smp_processor_id());
> > +}
> > +

I had to change this to :

static inline void rps_record_sock_flow(struct rps_sock_flow_table *table, u32 hash)
{
	/* we only give a hint, preemption can change our cpu under us */
	rps_set_sock_flow(table, hash, raw_smp_processor_id());
}



Changli Gao April 2, 2010, 1:45 p.m. UTC | #8
On Fri, Apr 2, 2010 at 8:01 PM, Eric Dumazet <eric.dumazet@gmail.com> wrote:
> On Friday, April 2, 2010 at 18:58 +0800, Changli Gao wrote:
>
> This dispatch things in UserLand is a poor workaround even if its
> popular (because people try to code portable applications), the hard
> work is already done, this increases latencies and bus traffic.
>
> For short works, that is too expensive.

I'll write a sample program to verify it. For a TCP program, I think it
won't be too expensive.

>
> If you really want to speedup memcached/DNS_server like apps, you might
> add a generic mechanism in kernel to split queues of _individual_
> socket.
>
> Aka multiqueue capabilities at socket level. Combined to multiqueue
> devices or RPS, this can be great.
>
>
> That is, an application tells kernel in how many queues incoming UDP
> frames for a given port can be dispatched (number of worker threads)
> No more contention, and this can be done regardless of RPS/RFS.
>
> UDP frame comes in, and is stored on the appropriate sub-queue (can be a
> mapping given by current cpu number). Wakeup the thread that is likely
> running on same cpu.
>
> Same for outgoing frames (answers). You might split the sk_wmemalloc
> thing to make sure several cpus can concurrently use same UDP socket to
> send their frames.
>

Yeah. It is much like another idea of mine: selective wakeup. Always try
to wake up the sleeping process which is likely running on the same CPU.
Rick Jones April 2, 2010, 5:01 p.m. UTC | #9
Eric Dumazet wrote:
> 
> Your claim of RPS being not good for applications is wrong, our test
> results show an improvement as is. Maybe your applications dont scale,
> because of bad habits, or collidings heuristics, I dont know.

The progression in HP-UX was IPS (10.20) (aka RPS) then TOPS (11.0) (aka RFS). 
We found that IPS was great for single-flow-per-thread-of-execution stuff and 
that TOPS was better for multiple-flow-per-thread-of-execution stuff.  It was 
long enough ago now that I can safely say for one system-level benchmark not 
known to be a "networking" benchmark, and without a massive kernel component, 
TOPS was a 10% win.  Not too shabby.

It wasn't that IPS wasn't good in its context - just that TOPS was even better.

We also preferred the concept of the scheduler giving networking clues as to 
where to process an application's packets rather than networking trying to tell 
the scheduler.  There was some discussion of out of order worries, but we were 
willing to trust to the basic soundness of the scheduler - if it was moving 
threads around willy nilly at a rate able to cause big packet reordering it had 
fundamental problems that would have to be addressed anyway.  And while it may 
be incendiary to point this out :)  I suspect (without concrete data :) that 
bonding mode 0 is a much, Much, MUCH larger source of out-of-order traffic than 
any plausible scheduler thrashing.

happy benchmarking,

rick jones
Rick Jones April 2, 2010, 6:25 p.m. UTC | #10
Tom Herbert wrote:
> 
> 
> On Fri, Apr 2, 2010 at 10:01 AM, Rick Jones <rick.jones2@hp.com 
> <mailto:rick.jones2@hp.com>> wrote:
> 
>     Eric Dumazet wrote:
> 
> 
>         Your claim of RPS being not good for applications is wrong, our test
>         results show an improvement as is. Maybe your applications dont
>         scale,
>         because of bad habits, or collidings heuristics, I dont know.
> 
> 
>     The progression in HP-UX was IPS (10.20) (aka RPS) then TOPS (11.0)
>     (aka RFS). We found that IPS was great for
>     single-flow-per-thread-of-execution stuff and that TOPS was better
>     for multiple-flow-per-thread-of-execution stuff.  It was long enough
>     ago now that I can safely say for one system-level benchmark not
>     known to be a "networking" benchmark, and without a massive kernel
>     component, TOPS was a 10% win.  Not too shabby.
> 
>     It wasn't that IPS wasn't good in its context - just that TOPS was
>     even better.
> 
> I would assume that with IPS threads would migrate to where packets were 
> being delivered thus giving the same sort of locality TOPS was 
> providing?  That would work great without any other constraints 
> (multiple flows per thread, thread CPU bindings, etc.).

Well... that depended - at the time, and still, we were and are also encouraging 
users and app designers to make copious use of processor/locality affinity (SMP 
and NUMA going back far longer in the RISC et al space than the x86 space).  So, 
it was and is entirely possible that the application thread of execution is 
hard-bound to a specific core/locality.  Also, I do not recall if HP-UX was as 
aggressive about waking a process/thread on the processor from which the wake-up 
came vs on the processor on which it last ran.

>     We also preferred the concept of the scheduler giving networking
>     clues as to where to process an application's packets rather than
>     networking trying to tell the scheduler.  There was some discussion
>     of out of order worries, but we were willing to trust to the basic
>     soundness of the scheduler - if it was moving threads around willy
>     nilly at a rate able to cause big packet reordering it had
>     fundamental problems that would have to be addressed anyway.
> 
> 
> I also think scheduler leading networking, like in RPS,  is generally 
> more scalable.  As for OOO packets, I've spent way too much time trying 
> to convince the bean-counters that a small number of them aren't 
> problematic :-), in the end it's just easier to not introduce new 
> mechanisms that will cause them!

So long as it doesn't drive you to produce new mechanisms heavier than they 
would have otherwise been.

The irony in the case of HP-UX IPS was that it was put in place in response to 
the severe out of order packet problems in HP-UX in 10.X before 10.20 - there 
were multiple netisr processes and only one netisr queue.  The other little 
tweak that came along in 10.20 with IPS, was inaddition to having a per 
processor (well, per core in today's parlance) netisr queue, the netisr would 
grab the entire queue under the one spinlock and work off of that.  That was 
nice because the code path became more efficient under load - more packets 
processed per spinlock/unlock pair.

happy benchmarking,

rick jones
Changli Gao April 8, 2010, 1:37 a.m. UTC | #11
On Sat, Apr 3, 2010 at 2:25 AM, Rick Jones <rick.jones2@hp.com> wrote:
> Tom Herbert wrote:
>>    The progression in HP-UX was IPS (10.20) (aka RPS) then TOPS (11.0)
>>    (aka RFS). We found that IPS was great for
>>    single-flow-per-thread-of-execution stuff and that TOPS was better
>>    for multiple-flow-per-thread-of-execution stuff.  It was long enough
>>    ago now that I can safely say for one system-level benchmark not
>>    known to be a "networking" benchmark, and without a massive kernel
>>    component, TOPS was a 10% win.  Not too shabby.
>>
>>    It wasn't that IPS wasn't good in its context - just that TOPS was
>>    even better.
>>
>> I would assume that with IPS threads would migrate to where packets were
>> being delivered thus giving the same sort of locality TOPS was providing?
>>  That would work great without any other constraints (multiple flows per
>> thread, thread CPU bindings, etc.).
>
> Well... that depended - at the time, and still, we were and are also
> encouraging users and app designers to make copious use of
> processor/locality affinity (SMP and NUMA going back far longer in the RISC
> et al space than the x86 space).  So, it was and is entirely possible that
> the application thread of execution is hard-bound to a specific
> core/locality.  Also, I do not recall if HP-UX was as aggressive about
> waking a process/thread on the processor from which the wake-up came vs on
> the processor on which it last ran.
>

Maybe RPS should work against processes, not processors. For packet
forwarding, the process is the net_rx softirq.

>>    We also preferred the concept of the scheduler giving networking
>>    clues as to where to process an application's packets rather than
>>    networking trying to tell the scheduler.  There was some discussion
>>    of out of order worries, but we were willing to trust to the basic
>>    soundness of the scheduler - if it was moving threads around willy
>>    nilly at a rate able to cause big packet reordering it had
>>    fundamental problems that would have to be addressed anyway.
>>
>>
>> I also think scheduler leading networking, like in RPS,  is generally more
>> scalable.  As for OOO packets, I've spent way too much time trying to
>> convince the bean-counters that a small number of them aren't problematic
>> :-), in the end it's just easier to not introduce new mechanisms that will
>> cause them!
>
> So long as it doesn't drive you to produce new mechanisms heavier than they
> would have otherwise been.
>
> The irony in the case of HP-UX IPS was that it was put in place in response
> to the severe out of order packet problems in HP-UX in 10.X before 10.20 -
> there were multiple netisr processes and only one netisr queue.  The other
> little tweak that came along in 10.20 with IPS, was in addition to having a
> per processor (well, per core in today's parlance) netisr queue, the netisr
> would grab the entire queue under the one spinlock and work off of that.
>  That was nice because the code path became more efficient under load - more
> packets processed per spinlock/unlock pair.
>

RPS dispatches packets fairly among all the permitted CPUs, in order
to take full advantage of all the CPU power. The assumption is that the
CPU cycles each CPU gives to packet processing are the same, but that
isn't always true once the scheduler is mixed in. In this case,
scheduler-leading networking is a good choice. Maybe we should make
softirqs threaded under the control of the scheduler, with the number of
softirq threads specified by users. By default, the number of softirq
threads would be the same as the number of CPUs, and each thread would
bind to a specific CPU, to keep the current behavior. If the other tasks
aren't dispatched evenly among the CPUs, the system administrator may
increase the number of softirq threads and dissolve the thread binding;
then there will be enough schedulable softirq threads for the scheduler.
Oh, and maybe then there is no need for weighted packet dispatching in RPS.

Patch

diff --git a/include/linux/netdevice.h b/include/linux/netdevice.h
index b5670ab..ea6ebca 100644
--- a/include/linux/netdevice.h
+++ b/include/linux/netdevice.h
@@ -543,14 +543,72 @@  struct rps_map {
 };
 #define RPS_MAP_SIZE(_num) (sizeof(struct rps_map) + (_num * sizeof(u16)))
 
+/*
+ * The rps_dev_flow structure contains the mapping of a flow to a CPU and the
+ * tail pointer for that CPU's input queue at the time of last enqueue.
+ */
+struct rps_dev_flow {
+	u16 cpu;
+	unsigned int last_qtail;
+};
+
+/*
+ * The rps_dev_flow_table structure contains a table of flow mappings.
+ */
+struct rps_dev_flow_table {
+	unsigned int mask;
+	struct rcu_head rcu;
+	struct work_struct free_work;
+	struct rps_dev_flow flows[0];
+};
+#define RPS_DEV_FLOW_TABLE_SIZE(_num) (sizeof(struct rps_dev_flow_table) + \
+    (_num * sizeof(struct rps_dev_flow)))
+
+/*
+ * The rps_sock_flow_table contains mappings of flows to the last CPU
+ * on which they were processed by the application (set in recvmsg).
+ */
+struct rps_sock_flow_table {
+	unsigned int mask;
+	u16 *ents;
+};
+
+#define RPS_NO_CPU NR_CPUS
+
+static inline void rps_set_sock_flow(struct rps_sock_flow_table *table,
+				     u32 hash, int cpu)
+{
+	if (table->ents && hash) {
+		unsigned int index = hash & table->mask;
+
+		if (table->ents[index] != cpu)
+			table->ents[index] = cpu;
+	}
+}
+
+static inline void rps_record_sock_flow(struct rps_sock_flow_table *table,
+					u32 hash)
+{
+	rps_set_sock_flow(table, hash, smp_processor_id());
+}
+
+static inline void rps_reset_sock_flow(struct rps_sock_flow_table *table,
+				       u32 hash)
+{
+	rps_set_sock_flow(table, hash, RPS_NO_CPU);
+}
+
+extern struct rps_sock_flow_table rps_sock_flow_table;
+
 /* This structure contains an instance of an RX queue. */
 struct netdev_rx_queue {
 	struct rps_map *rps_map;
+	struct rps_dev_flow_table *rps_flow_table;
 	struct kobject kobj;
 	struct netdev_rx_queue *first;
 	atomic_t count;
 } ____cacheline_aligned_in_smp;
-#endif
+#endif /* CONFIG_RPS */
 
 /*
  * This structure defines the management hooks for network devices.
@@ -1346,8 +1404,9 @@  struct softnet_data {
 	struct sk_buff		*completion_queue;
 
 	/* Elements below can be accessed between CPUs for RPS */
-#ifdef CONFIG_SMP
+#ifdef CONFIG_RPS
 	struct call_single_data	csd ____cacheline_aligned_in_smp;
+	unsigned int		input_queue_head;
 #endif
 	struct sk_buff_head	input_pkt_queue;
 	struct napi_struct	backlog;
diff --git a/include/net/inet_sock.h b/include/net/inet_sock.h
index 83fd344..daa2666 100644
--- a/include/net/inet_sock.h
+++ b/include/net/inet_sock.h
@@ -21,6 +21,7 @@ 
 #include <linux/string.h>
 #include <linux/types.h>
 #include <linux/jhash.h>
+#include <linux/netdevice.h>
 
 #include <net/flow.h>
 #include <net/sock.h>
@@ -101,6 +102,7 @@  struct rtable;
  * @uc_ttl - Unicast TTL
  * @inet_sport - Source port
  * @inet_id - ID counter for DF pkts
+ * @rxhash - flow hash received from netif layer
  * @tos - TOS
  * @mc_ttl - Multicasting TTL
  * @is_icsk - is this an inet_connection_sock?
@@ -124,6 +126,9 @@  struct inet_sock {
 	__u16			cmsg_flags;
 	__be16			inet_sport;
 	__u16			inet_id;
+#ifdef CONFIG_RPS
+	__u32			rxhash;
+#endif
 
 	struct ip_options	*opt;
 	__u8			tos;
@@ -219,4 +224,17 @@  static inline __u8 inet_sk_flowi_flags(const struct sock *sk)
 	return inet_sk(sk)->transparent ? FLOWI_FLAG_ANYSRC : 0;
 }
 
+static inline void inet_rps_record_flow(const struct sock *sk)
+{
+#ifdef CONFIG_RPS
+	rps_record_sock_flow(&rps_sock_flow_table, inet_sk(sk)->rxhash);
+#endif
+}
+
+static inline void inet_rps_reset_flow(const struct sock *sk)
+{
+#ifdef CONFIG_RPS
+	rps_reset_sock_flow(&rps_sock_flow_table, inet_sk(sk)->rxhash);
+#endif
+}
 #endif	/* _INET_SOCK_H */
diff --git a/net/core/dev.c b/net/core/dev.c
index 887aa84..94fd5c6 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -129,6 +129,7 @@ 
 #include <linux/jhash.h>
 #include <linux/random.h>
 #include <trace/events/napi.h>
+#include <linux/bootmem.h>
 
 #include "net-sysfs.h"
 
@@ -2178,21 +2179,29 @@  int weight_p __read_mostly = 64;            /* old backlog weight */
 DEFINE_PER_CPU(struct netif_rx_stats, netdev_rx_stat) = { 0, };
 
 #ifdef CONFIG_RPS
+/* One global table that all flow-based protocols share. */
+struct rps_sock_flow_table rps_sock_flow_table;
+EXPORT_SYMBOL(rps_sock_flow_table);
+
 /*
  * get_rps_cpu is called from netif_receive_skb and returns the target
  * CPU from the RPS map of the receiving queue for a given skb.
+ * rcu_read_lock must be held on entry.
  */
-static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb)
+static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,
+		       struct rps_dev_flow **rflowp)
 {
 	struct ipv6hdr *ip6;
 	struct iphdr *ip;
 	struct netdev_rx_queue *rxqueue;
 	struct rps_map *map;
+	struct rps_dev_flow_table *flow_table;
 	int cpu = -1;
 	u8 ip_proto;
+	u16 tcpu;
 	u32 addr1, addr2, ports, ihl;
 
-	rcu_read_lock();
+	*rflowp = NULL;
 
 	if (skb_rx_queue_recorded(skb)) {
 		u16 index = skb_get_rx_queue(skb);
@@ -2208,7 +2217,7 @@  static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb)
 	} else
 		rxqueue = dev->_rx;
 
-	if (!rxqueue->rps_map)
+	if (!rxqueue->rps_map && !rxqueue->rps_flow_table)
 		goto done;
 
 	if (skb->rxhash)
@@ -2260,9 +2269,47 @@  static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb)
 		skb->rxhash = 1;
 
 got_hash:
+	flow_table = rcu_dereference(rxqueue->rps_flow_table);
+	if (flow_table && rps_sock_flow_table.ents) {
+		u16 next_cpu;
+		struct rps_dev_flow *rflow;
+
+		rflow = &flow_table->flows[skb->rxhash & flow_table->mask];
+		tcpu = rflow->cpu;
+
+		next_cpu = rps_sock_flow_table.ents[skb->rxhash &
+		    rps_sock_flow_table.mask];
+
+		/*
+		 * If the desired CPU (where the last recvmsg or sendmsg was
+		 * done) differs from the current CPU (the one in the rx-queue
+		 * flow table entry), switch if one of the following holds:
+		 *   - Current CPU is unset (equal to RPS_NO_CPU).
+		 *   - Current CPU is offline.
+		 *   - The current CPU's queue head counter has advanced past
+		 *     the tail counter recorded at the flow's last enqueue.
+		 *     This guarantees that all previous packets for the flow
+		 *     have been dequeued, thus preserving in-order delivery.
+		 */
+		if (unlikely(tcpu != next_cpu) &&
+		    (tcpu == RPS_NO_CPU || !cpu_online(tcpu) ||
+		     ((int)(per_cpu(softnet_data, tcpu).input_queue_head -
+		      rflow->last_qtail)) >= 0)) {
+			tcpu = rflow->cpu = next_cpu;
+			if (tcpu != RPS_NO_CPU)
+				rflow->last_qtail = per_cpu(softnet_data,
+				    tcpu).input_queue_head;
+		}
+		if (tcpu != RPS_NO_CPU && cpu_online(tcpu)) {
+			*rflowp = rflow;
+			cpu = tcpu;
+			goto done;
+		}
+	}
+
 	map = rcu_dereference(rxqueue->rps_map);
 	if (map) {
-		u16 tcpu = map->cpus[((u64) skb->rxhash * map->len) >> 32];
+		tcpu = map->cpus[((u64) skb->rxhash * map->len) >> 32];
 
 		if (cpu_online(tcpu)) {
 			cpu = tcpu;
@@ -2271,7 +2318,6 @@  got_hash:
 	}
 
 done:
-	rcu_read_unlock();
 	return cpu;
 }
 
@@ -2297,13 +2343,14 @@  static void trigger_softirq(void *data)
 	__napi_schedule(&queue->backlog);
 	__get_cpu_var(netdev_rx_stat).received_rps++;
 }
-#endif /* CONFIG_SMP */
+#endif /* CONFIG_RPS */
 
 /*
  * enqueue_to_backlog is called to queue an skb to a per CPU backlog
  * queue (may be a remote CPU queue).
  */
-static int enqueue_to_backlog(struct sk_buff *skb, int cpu)
+static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
+			      unsigned int *qtail)
 {
 	struct softnet_data *queue;
 	unsigned long flags;
@@ -2318,6 +2365,10 @@  static int enqueue_to_backlog(struct sk_buff *skb, int cpu)
 		if (queue->input_pkt_queue.qlen) {
 enqueue:
 			__skb_queue_tail(&queue->input_pkt_queue, skb);
+#ifdef CONFIG_RPS
+			*qtail = queue->input_queue_head +
+			    queue->input_pkt_queue.qlen;
+#endif
 			spin_unlock_irqrestore(&queue->input_pkt_queue.lock,
 			    flags);
 			return NET_RX_SUCCESS;
@@ -2367,7 +2418,11 @@  enqueue:
 
 int netif_rx(struct sk_buff *skb)
 {
-	int cpu;
+	unsigned int qtail;
+#ifdef CONFIG_RPS
+	struct rps_dev_flow *rflow;
+	int cpu, err;
+#endif
 
 	/* if netpoll wants it, pretend we never saw it */
 	if (netpoll_rx(skb))
@@ -2377,14 +2432,24 @@  int netif_rx(struct sk_buff *skb)
 		net_timestamp(skb);
 
 #ifdef CONFIG_RPS
-	cpu = get_rps_cpu(skb->dev, skb);
+	rcu_read_lock();
+
+	cpu = get_rps_cpu(skb->dev, skb, &rflow);
 	if (cpu < 0)
 		cpu = smp_processor_id();
+
+	err = enqueue_to_backlog(skb, cpu, &qtail);
+
+	if (rflow)
+		rflow->last_qtail = qtail;
+
+	rcu_read_unlock();
+
+	return err;
 #else
-	cpu = smp_processor_id();
+	return enqueue_to_backlog(skb, smp_processor_id(), &qtail);
 #endif
 
-	return enqueue_to_backlog(skb, cpu);
 }
 EXPORT_SYMBOL(netif_rx);
 
@@ -2751,14 +2816,26 @@  out:
 int netif_receive_skb(struct sk_buff *skb)
 {
 #ifdef CONFIG_RPS
-	int cpu;
+	struct rps_dev_flow *rflow;
+	int cpu, err;
+
+	rcu_read_lock();
 
-	cpu = get_rps_cpu(skb->dev, skb);
+	cpu = get_rps_cpu(skb->dev, skb, &rflow);
 
 	if (cpu < 0)
-		return __netif_receive_skb(skb);
-	else
-		return enqueue_to_backlog(skb, cpu);
+		err = __netif_receive_skb(skb);
+	else {
+		unsigned int qtail;
+
+		err = enqueue_to_backlog(skb, cpu, &qtail);
+		if (rflow)
+			rflow->last_qtail = qtail;
+	}
+
+	rcu_read_unlock();
+
+	return err;
 #else
 	return __netif_receive_skb(skb);
 #endif
@@ -2777,6 +2854,9 @@  static void flush_backlog(struct net_device *dev, int cpu)
 		if (skb->dev == dev) {
 			__skb_unlink(skb, &queue->input_pkt_queue);
 			kfree_skb(skb);
+#ifdef CONFIG_RPS
+			queue->input_queue_head++;
+#endif
 		}
 	spin_unlock_irqrestore(&queue->input_pkt_queue.lock, flags);
 }
@@ -3098,6 +3178,9 @@  static int process_backlog(struct napi_struct *napi, int quota)
 			spin_unlock_irq(&queue->input_pkt_queue.lock);
 			break;
 		}
+#ifdef CONFIG_RPS
+		queue->input_queue_head++;
+#endif
 		spin_unlock_irq(&queue->input_pkt_queue.lock);
 
 		__netif_receive_skb(skb);
@@ -6021,8 +6104,12 @@  static int dev_cpu_callback(struct notifier_block *nfb,
 	local_irq_enable();
 
 	/* Process offline CPU's input_pkt_queue */
-	while ((skb = __skb_dequeue(&oldsd->input_pkt_queue)))
+	while ((skb = __skb_dequeue(&oldsd->input_pkt_queue))) {
 		netif_rx(skb);
+#ifdef CONFIG_RPS
+		oldsd->input_queue_head++;
+#endif
+	}
 
 	return NOTIFY_OK;
 }
@@ -6203,6 +6290,42 @@  static struct pernet_operations __net_initdata default_device_ops = {
 	.exit_batch = default_device_exit_batch,
 };
 
+
+#ifdef CONFIG_RPS
+static __initdata unsigned long rps_sock_flow_entries;
+
+static int __init set_rps_sock_flow_entries(char *str)
+{
+	if (str)
+		rps_sock_flow_entries = simple_strtoul(str, &str, 0);
+
+	return 0;
+}
+
+__setup("rps_flow_entries=", set_rps_sock_flow_entries);
+
+static int alloc_rps_sock_flow_entries(void)
+{
+	unsigned int i, hash_size;
+
+	if (!rps_sock_flow_entries)
+		return 0;
+
+	rps_sock_flow_table.ents =
+	    alloc_large_system_hash("RPS flow table", sizeof(u16),
+	    rps_sock_flow_entries, 0, 0, &hash_size, NULL, 0);
+	hash_size = 1 << hash_size;
+	rps_sock_flow_table.mask = hash_size - 1;
+	for (i = 0; i < hash_size; i++)
+		rps_sock_flow_table.ents[i] = RPS_NO_CPU;
+
+	printk(KERN_INFO "RPS: flow table configured with %u entries\n",
+	    hash_size);
+
+	return 0;
+}
+#endif
+
 /*
  *	Initialize the DEV module. At boot time this walks the device list and
  *	unhooks any devices that fail to initialise (normally hardware not
@@ -6223,6 +6346,11 @@  static int __init net_dev_init(void)
 	if (dev_proc_init())
 		goto out;
 
+#ifdef CONFIG_RPS
+	if (alloc_rps_sock_flow_entries())
+		goto out;
+#endif
+
 	if (netdev_kobject_init())
 		goto out;
 
diff --git a/net/core/net-sysfs.c b/net/core/net-sysfs.c
index 1e7fdd6..3039e98 100644
--- a/net/core/net-sysfs.c
+++ b/net/core/net-sysfs.c
@@ -600,22 +600,106 @@  ssize_t store_rps_map(struct netdev_rx_queue *queue,
 	return len;
 }
 
+
+static ssize_t show_rps_dev_flow_table_cnt(struct netdev_rx_queue *queue,
+					   struct rx_queue_attribute *attr,
+					   char *buf)
+{
+	struct rps_dev_flow_table *flow_table;
+	unsigned int val = 0;
+
+	rcu_read_lock();
+	flow_table = rcu_dereference(queue->rps_flow_table);
+	if (flow_table)
+		val = flow_table->mask + 1;
+	rcu_read_unlock();
+
+	return sprintf(buf, "%u\n", val);
+}
+
+static void rps_dev_flow_table_release_work(struct work_struct *work)
+{
+	struct rps_dev_flow_table *table = container_of(work,
+	    struct rps_dev_flow_table, free_work);
+
+	vfree(table);
+}
+
+static void rps_dev_flow_table_release(struct rcu_head *rcu)
+{
+	struct rps_dev_flow_table *table = container_of(rcu,
+	    struct rps_dev_flow_table, rcu);
+
+	INIT_WORK(&table->free_work, rps_dev_flow_table_release_work);
+	schedule_work(&table->free_work);
+}
+
+ssize_t store_rps_dev_flow_table_cnt(struct netdev_rx_queue *queue,
+				     struct rx_queue_attribute *attr,
+				     const char *buf, size_t len)
+{
+	unsigned int count;
+	char *endp;
+	struct rps_dev_flow_table *table, *old_table;
+	static DEFINE_SPINLOCK(rps_dev_flow_lock);
+
+	if (!capable(CAP_NET_ADMIN))
+		return -EPERM;
+
+	count = simple_strtoul(buf, &endp, 0);
+	if (endp == buf)
+		return -EINVAL;
+
+	if (count) {
+		int i;
+
+		count = roundup_pow_of_two(count);
+		table = vmalloc(RPS_DEV_FLOW_TABLE_SIZE(count));
+		if (!table)
+			return -ENOMEM;
+
+		table->mask = count - 1;
+		for (i = 0; i < count; i++)
+			table->flows[i].cpu = RPS_NO_CPU;
+	} else
+		table = NULL;
+
+	spin_lock(&rps_dev_flow_lock);
+	old_table = queue->rps_flow_table;
+	rcu_assign_pointer(queue->rps_flow_table, table);
+	spin_unlock(&rps_dev_flow_lock);
+
+	if (old_table)
+		call_rcu(&old_table->rcu, rps_dev_flow_table_release);
+
+	return len;
+}
+
 static struct rx_queue_attribute rps_cpus_attribute =
 	__ATTR(rps_cpus, S_IRUGO | S_IWUSR, show_rps_map, store_rps_map);
 
+
+static struct rx_queue_attribute rps_dev_flow_table_cnt_attribute =
+	__ATTR(rps_flow_cnt, S_IRUGO | S_IWUSR,
+	    show_rps_dev_flow_table_cnt, store_rps_dev_flow_table_cnt);
+
 static struct attribute *rx_queue_default_attrs[] = {
 	&rps_cpus_attribute.attr,
+	&rps_dev_flow_table_cnt_attribute.attr,
 	NULL
 };
 
 static void rx_queue_release(struct kobject *kobj)
 {
 	struct netdev_rx_queue *queue = to_rx_queue(kobj);
-	struct rps_map *map = queue->rps_map;
 	struct netdev_rx_queue *first = queue->first;
 
-	if (map)
-		call_rcu(&map->rcu, rps_map_release);
+	if (queue->rps_map)
+		call_rcu(&queue->rps_map->rcu, rps_map_release);
+
+	if (queue->rps_flow_table)
+		call_rcu(&queue->rps_flow_table->rcu,
+		    rps_dev_flow_table_release);
 
 	if (atomic_dec_and_test(&first->count))
 		kfree(first);
diff --git a/net/ipv4/af_inet.c b/net/ipv4/af_inet.c
index 55e1190..eb6155a 100644
--- a/net/ipv4/af_inet.c
+++ b/net/ipv4/af_inet.c
@@ -418,6 +418,8 @@  int inet_release(struct socket *sock)
 	if (sk) {
 		long timeout;
 
+		inet_rps_reset_flow(sk);
+
 		/* Applications forget to leave groups before exiting */
 		ip_mc_drop_socket(sk);
 
@@ -714,6 +716,8 @@  int inet_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 {
 	struct sock *sk = sock->sk;
 
+	inet_rps_record_flow(sk);
+
 	/* We may need to bind the socket. */
 	if (!inet_sk(sk)->inet_num && inet_autobind(sk))
 		return -EAGAIN;
@@ -722,12 +726,13 @@  int inet_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 }
 EXPORT_SYMBOL(inet_sendmsg);
 
-
 static ssize_t inet_sendpage(struct socket *sock, struct page *page, int offset,
 			     size_t size, int flags)
 {
 	struct sock *sk = sock->sk;
 
+	inet_rps_record_flow(sk);
+
 	/* We may need to bind the socket. */
 	if (!inet_sk(sk)->inet_num && inet_autobind(sk))
 		return -EAGAIN;
@@ -737,6 +742,22 @@  static ssize_t inet_sendpage(struct socket *sock, struct page *page, int offset,
 	return sock_no_sendpage(sock, page, offset, size, flags);
 }
 
+int inet_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
+		 size_t size, int flags)
+{
+	struct sock *sk = sock->sk;
+	int addr_len = 0;
+	int err;
+
+	inet_rps_record_flow(sk);
+
+	err = sk->sk_prot->recvmsg(iocb, sk, msg, size, flags & MSG_DONTWAIT,
+				   flags & ~MSG_DONTWAIT, &addr_len);
+	if (err >= 0)
+		msg->msg_namelen = addr_len;
+	return err;
+}
+EXPORT_SYMBOL(inet_recvmsg);
 
 int inet_shutdown(struct socket *sock, int how)
 {
@@ -866,7 +887,7 @@  const struct proto_ops inet_stream_ops = {
 	.setsockopt	   = sock_common_setsockopt,
 	.getsockopt	   = sock_common_getsockopt,
 	.sendmsg	   = tcp_sendmsg,
-	.recvmsg	   = sock_common_recvmsg,
+	.recvmsg	   = inet_recvmsg,
 	.mmap		   = sock_no_mmap,
 	.sendpage	   = tcp_sendpage,
 	.splice_read	   = tcp_splice_read,
@@ -893,7 +914,7 @@  const struct proto_ops inet_dgram_ops = {
 	.setsockopt	   = sock_common_setsockopt,
 	.getsockopt	   = sock_common_getsockopt,
 	.sendmsg	   = inet_sendmsg,
-	.recvmsg	   = sock_common_recvmsg,
+	.recvmsg	   = inet_recvmsg,
 	.mmap		   = sock_no_mmap,
 	.sendpage	   = inet_sendpage,
 #ifdef CONFIG_COMPAT
@@ -923,7 +944,7 @@  static const struct proto_ops inet_sockraw_ops = {
 	.setsockopt	   = sock_common_setsockopt,
 	.getsockopt	   = sock_common_getsockopt,
 	.sendmsg	   = inet_sendmsg,
-	.recvmsg	   = sock_common_recvmsg,
+	.recvmsg	   = inet_recvmsg,
 	.mmap		   = sock_no_mmap,
 	.sendpage	   = inet_sendpage,
 #ifdef CONFIG_COMPAT
diff --git a/net/ipv4/tcp_ipv4.c b/net/ipv4/tcp_ipv4.c
index f4df5f9..3060c17 100644
--- a/net/ipv4/tcp_ipv4.c
+++ b/net/ipv4/tcp_ipv4.c
@@ -1674,6 +1674,10 @@  process:
 
 	skb->dev = NULL;
 
+#ifdef CONFIG_RPS
+	inet_sk(sk)->rxhash = skb->rxhash;
+#endif
+
 	bh_lock_sock_nested(sk);
 	ret = 0;
 	if (!sock_owned_by_user(sk)) {