
[bpf-next,05/16] bpf: create tcp_bpf_ulp allowing BPF to monitor socket TX/RX data

Message ID 20180305195122.6612.78322.stgit@john-Precision-Tower-5810
State Changes Requested, archived
Delegated to: BPF Maintainers
Series bpf,sockmap: sendmsg/sendfile ULP

Commit Message

John Fastabend March 5, 2018, 7:51 p.m. UTC
This implements a BPF ULP layer to allow policy enforcement and
monitoring at the socket layer. To support this, a new program type,
BPF_PROG_TYPE_SK_MSG, is used to run the policy at the sendmsg/sendpage
hooks. The policy is attached to sockets via a sockmap, using a new
program attach type, BPF_SK_MSG_VERDICT.

As with previous sockmap usage, when a sock is added to a sockmap via
a map update and the map has a BPF_SK_MSG_VERDICT program attached,
the BPF ULP layer is created on the socket and the attached
BPF_PROG_TYPE_SK_MSG program is run for every msg in the sendmsg case
and for every page/offset in the sendpage case.

BPF_PROG_TYPE_SK_MSG Semantics/API:

BPF_PROG_TYPE_SK_MSG supports only two return codes, SK_PASS and
SK_DROP. Returning SK_DROP frees the copied data in the sendmsg case
and leaves the data untouched in the sendpage case; both cases return
-EACCES to the user. Returning SK_PASS allows the msg to be sent.

In the sendmsg case data is copied into kernel space buffers before
running the BPF program. In the sendpage case data is never copied,
so users may change the data after the BPF program runs. (A flag will
be added shortly for policies that require the copy to always be
performed.)

The verdict from the BPF_PROG_TYPE_SK_MSG program applies to the entire
msg in the sendmsg() case and to the entire page/offset in the sendpage
case. This avoids ambiguity about how to handle mixed return codes in
the sendmsg case. The readable/writable data provided to the program in
the sendmsg case may not be the entire message; in fact, for large sends
this is likely the case. The data range that can be read is part of the
sk_msg_md structure because, similar to the tc cls_bpf case, the data is
stored in a scatter gather list. Future work will address this
shortcoming and allow users to pull in more data if needed (similar to
TC BPF).

The helper msg_redirect_map() can be used to select the socket to send
the data on. It is used similarly to the existing redirect use cases
and allows a policy to redirect msgs.
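
For illustration, a minimal verdict program on the BPF side might look
roughly like the sketch below. This is modeled on the selftests style
and is not part of this patch: the map definition macro, section name,
toy policy and the bpf_msg_redirect_map() wrapper around the
msg_redirect_map helper are assumptions of the sketch.

  #include <linux/bpf.h>
  #include "bpf_helpers.h"

  // hypothetical sockmap holding redirect targets
  struct bpf_map_def SEC("maps") my_sock_map = {
          .type        = BPF_MAP_TYPE_SOCKMAP,
          .key_size    = sizeof(int),
          .value_size  = sizeof(int),
          .max_entries = 20,
  };

  SEC("sk_msg")
  int msg_prog(struct sk_msg_md *msg)
  {
          void *data_end = (void *)(long)msg->data_end;
          void *data = (void *)(long)msg->data;

          // Bounds check before reading; only part of a large msg may
          // be visible here (first scatterlist element in sendmsg).
          if (data + 1 > data_end)
                  return SK_PASS;

          // Toy policy: drop anything that does not start with 'G'
          // (e.g. an HTTP GET), otherwise redirect the msg to the
          // socket stored at index 0 of the map.
          if (*(char *)data != 'G')
                  return SK_DROP;

          return bpf_msg_redirect_map(msg, &my_sock_map, 0, 0);
  }

  char _license[] SEC("license") = "GPL";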

Pseudo code simple example:

The basic logic to attach a program to a socket is as follows,

  // load the programs
  bpf_prog_load(SOCKMAP_TCP_MSG_PROG, BPF_PROG_TYPE_SK_MSG,
		&obj, &msg_prog);

  // lookup the sockmap
  bpf_map_msg = bpf_object__find_map_by_name(obj, "my_sock_map");

  // get fd for sockmap
  map_fd_msg = bpf_map__fd(bpf_map_msg);

  // attach program to sockmap
  bpf_prog_attach(msg_prog, map_fd_msg, BPF_SK_MSG_VERDICT, 0);

Adding sockets to the map is done in the normal way,

  // Add a socket 'fd' to sockmap at location 'i'
  bpf_map_update_elem(map_fd_msg, &i, &fd, BPF_ANY);

After the above any socket attached to "my_sock_map", in this case
'fd', will run the BPF msg verdict program (msg_prog) on every
sendmsg and sendpage system call.

For a complete example see BPF selftests or sockmap samples.

Implementation notes:

It seemed simplest, to me at least, to use a refcnt to ensure the
psock is not lost across the sendmsg copy into the sg list, the BPF
program running on the data in sg_data, and the final pass to the TCP
stack. Performance testing may show a better method that avoids the
refcnt cost, but for now use the simpler method.

Another item that will come after basic support is in place is
support for the MSG_MORE flag. At the moment we call sendpage even if
the MSG_MORE flag is set. An enhancement would be to collect the pages
into a larger scatterlist and pass it down the stack. Note that
bpf_tcp_sendmsg() could support this with some additional state saved
across sendmsg calls. I built the code to support this without having
to do refactoring work. Other features TBD include ZEROCOPY and
TCP_RECV_QUEUE/TCP_NO_QUEUE support. These will follow the initial
series shortly.

Future work could improve size limits on the scatterlist rings used
here. Currently, we use MAX_SKB_FRAGS simply because this was being
used already in the TLS case. Future work could extend the kernel sk
APIs to tune this depending on workload. This is a trade-off
between memory usage and throughput performance.

Signed-off-by: John Fastabend <john.fastabend@gmail.com>
---
 include/linux/bpf.h       |    1 
 include/linux/bpf_types.h |    1 
 include/linux/filter.h    |   14 +
 include/uapi/linux/bpf.h  |   28 ++
 kernel/bpf/sockmap.c      |  517 ++++++++++++++++++++++++++++++++++++++++++++-
 kernel/bpf/syscall.c      |   14 +
 kernel/bpf/verifier.c     |    5 
 net/core/filter.c         |  106 +++++++++
 8 files changed, 668 insertions(+), 18 deletions(-)

Comments

David Miller March 5, 2018, 9:40 p.m. UTC | #1
From: John Fastabend <john.fastabend@gmail.com>
Date: Mon, 05 Mar 2018 11:51:22 -0800

> BPF_PROG_TYPE_SK_MSG supports only two return codes SK_PASS and
> SK_DROP. Returning SK_DROP free's the copied data in the sendmsg
> case and in the sendpage case leaves the data untouched. Both cases
> return -EACESS to the user. Returning SK_PASS will allow the msg to
> be sent.
> 
> In the sendmsg case data is copied into kernel space buffers before
> running the BPF program. In the sendpage case data is never copied.
> The implication being users may change data after BPF programs run in
> the sendpage case. (A flag will be added to always copy shortly
> if the copy must always be performed).

I don't see how the sendpage case can be right.

The user can asynchronously change the page contents whenever they
want, and if the BPF program runs on the old contents then the verdict
is not for what actually ends up being sent on the socket.

There is really no way to cheaply freeze the page contents other than
to make a copy.
John Fastabend March 5, 2018, 10:53 p.m. UTC | #2
On 03/05/2018 01:40 PM, David Miller wrote:
> From: John Fastabend <john.fastabend@gmail.com>
> Date: Mon, 05 Mar 2018 11:51:22 -0800
> 
>> BPF_PROG_TYPE_SK_MSG supports only two return codes SK_PASS and
>> SK_DROP. Returning SK_DROP free's the copied data in the sendmsg
>> case and in the sendpage case leaves the data untouched. Both cases
>> return -EACESS to the user. Returning SK_PASS will allow the msg to
>> be sent.
>>
>> In the sendmsg case data is copied into kernel space buffers before
>> running the BPF program. In the sendpage case data is never copied.
>> The implication being users may change data after BPF programs run in
>> the sendpage case. (A flag will be added to always copy shortly
>> if the copy must always be performed).
> 
> I don't see how the sendpage case can be right.
> 
> The user can asynchronously change the page contents whenever they
> want, and if the BPF program runs on the old contents then the verdict
> is not for what actually ends up being sent on the socket.
> 
> There is really no way to cheaply freeze the page contents other than
> to make a copy.
> 

Right, so we have two cases. The first is that we are not trying to
protect against malicious users but merely to monitor the connection.
This case is primarily for L7 statistics, for example the number of
bytes sent to URL foo. If users (in a real program, not something
malicious) are changing data mid-sendfile() this is really buggy
anyway; there is no way to know when/if the data is being copied lower
in the stack. Even worse would be if it changed a msg header, such as
the HTTP or Kafka header; I don't see how such a program would work
reliably at all. Some of my L7 monitoring BPF programs fall into this
category.

The second case is that we want to implement a strict policy, for
example never allow user 'bar' to send to URL foo. In the current
patches this would be vulnerable to async data changes. I was planning
a follow-up patch to this series adding an "always copy" flag, which
handles the asynchronous case by always copying the data when the BPF
policy cannot tolerate the user changing data mid-send. Another class
of BPF programs I have falls into this bucket.

However, the performance cost of the copy can be significant, so
allowing the BPF policy to decide which mode it requires seems best to
me. I decided to make the default no-copy to mirror the existing
sendpage() semantics and then to add the flag later. The flag support
is not in this series simply because I wanted to get the base support
in first.

Make sense? The default could be to copy sendpage data and then a flag
could be added to allow skipping the copy. But I prefer the current
defaults.

Thanks,
John
David Miller March 6, 2018, 5:42 a.m. UTC | #3
From: John Fastabend <john.fastabend@gmail.com>
Date: Mon, 5 Mar 2018 14:53:08 -0800

> I decided to make the default no-copy to mirror the existing
> sendpage() semantics and then to add the flag later. The flag
> support is not in this series simply because I wanted to get the
> base support in first.

What existing sendpage semantics are you referring to?
John Fastabend March 6, 2018, 6:22 a.m. UTC | #4
On 03/05/2018 09:42 PM, David Miller wrote:
> From: John Fastabend <john.fastabend@gmail.com>
> Date: Mon, 5 Mar 2018 14:53:08 -0800
> 
>> I decided to make the default no-copy to mirror the existing
>> sendpage() semantics and then to add the flag later. The flag
>> support is not in this series simply because I wanted to get the
>> base support in first.
> 
> What existing sendpage semantics are you referring to?
> 

All I meant by this is that if an application uses the sendfile()
call, there is no good way to know when/if the kernel side will copy
or xmit the data. So a reliable user space application will need to
only modify the data if it "knows" there are no outstanding sends
in-flight. If we assume applications follow this, then it is OK to
avoid the copy. Of course this is not good enough for security, but it
works for monitoring/statistics (my use case 1).

By keeping existing sendpage semantics I just meant that applications
should already follow the above.
David Miller March 6, 2018, 6:42 a.m. UTC | #5
From: John Fastabend <john.fastabend@gmail.com>
Date: Mon, 5 Mar 2018 22:22:21 -0800

> All I meant by this is if an application uses sendfile() call
> there is no good way to know when/if the kernel side will copy or
> xmit the  data. So a reliable user space application will need to
> only modify the data if it "knows" there are no outstanding sends
> in-flight. So if we assume applications follow this then it
> is OK to avoid the copy. Of course this is not good enough for
> security, but for monitoring/statistics (my use case 1 it works).

For an application implementing a networking file system, it's pretty
legitimate for file contents to change before the page gets DMA'd to
the networking card.

And that's perfectly fine, and we arrange everything such that this
will work properly.

The card checksums what ends up being DMA'd so nothing from the
networking side is broken.

So this assumption you mention really does not hold.

There needs to be some feedback from the BPF program that parses the
packet.  This way it can say, "I need at least X more bytes before I
can generate a verdict".  And you keep copying more and more bytes
into a linear buffer and calling the parser over and over until it can
generate a full verdict or you run out of networking data.
John Fastabend March 6, 2018, 7:06 a.m. UTC | #6
On 03/05/2018 10:42 PM, David Miller wrote:
> From: John Fastabend <john.fastabend@gmail.com>
> Date: Mon, 5 Mar 2018 22:22:21 -0800
> 
>> All I meant by this is if an application uses sendfile() call
>> there is no good way to know when/if the kernel side will copy or
>> xmit the  data. So a reliable user space application will need to
>> only modify the data if it "knows" there are no outstanding sends
>> in-flight. So if we assume applications follow this then it
>> is OK to avoid the copy. Of course this is not good enough for
>> security, but for monitoring/statistics (my use case 1 it works).
> 
> For an application implementing a networking file system, it's pretty
> legitimate for file contents to change before the page gets DMA's to
> the networking card.
> 

Still there are useful BPF programs that can tolerate this. So I
would prefer to allow BPF programs to operate in the no-copy mode if
wanted. It doesn't have to be the default though, as it currently is.
An L7 load balancer is a good example of this.

> And that's perfectly fine, and we everything such that this will work
> properly.
> 
> The card checksums what ends up being DMA'd so nothing from the
> networking side is broken.

Assuming the card has checksum support, correct? Which is why we have
SKBTX_SHARED_FRAG checked in skb_has_shared_frag() and the checksum
helpers called by the drivers when they do not support the protocol
being used. So probably an OK assumption if using supported protocols
and hardware? Perhaps in general folks just use normal protocols and
hardware, so it works.

> 
> So this assumption you mention really does not hold.
> 

OK.

> There needs to be some feedback from the BPF program that parses the
> packet.  This way it can say, "I need at least X more bytes before I
> can generate a verdict".  And you keep copying more and more bytes
> into a linear buffer and calling the parser over and over until it can
> generate a full verdict or you run out of networking data.
> 

So the "I need at least X more bytes" is the msg_cork_bytes() in patch
7. I could handle the sendpage case the same as I handle the sendmsg
case and copy the data into the buffer until N bytes are received. I
had planned to add this mode in a follow up series but could add it in
this series so we have all the pieces in one submission.

Although I used a scatterlist instead of a linear buffer. I was
planning to add a helper to pull in next sg list item if needed
rather than try to allocate a large linear block up front.
David Miller March 6, 2018, 3:47 p.m. UTC | #7
From: John Fastabend <john.fastabend@gmail.com>
Date: Mon, 5 Mar 2018 23:06:01 -0800

> On 03/05/2018 10:42 PM, David Miller wrote:
>> From: John Fastabend <john.fastabend@gmail.com>
>> Date: Mon, 5 Mar 2018 22:22:21 -0800
>> 
>>> All I meant by this is if an application uses sendfile() call
>>> there is no good way to know when/if the kernel side will copy or
>>> xmit the  data. So a reliable user space application will need to
>>> only modify the data if it "knows" there are no outstanding sends
>>> in-flight. So if we assume applications follow this then it
>>> is OK to avoid the copy. Of course this is not good enough for
>>> security, but for monitoring/statistics (my use case 1 it works).
>> 
>> For an application implementing a networking file system, it's pretty
>> legitimate for file contents to change before the page gets DMA's to
>> the networking card.
>> 
> 
> Still there are useful BPF programs that can tolerate this. So I
> would prefer to allow BPF programs to operate in the no-copy mode
> if wanted. It doesn't have to be the default though as it currently
> is. A l7 load balancer is a good example of this.

Maybe I'd be ok if it were not the default.  But do you really want to
expose a potential attack vector, even if the app gets to choose and
say "I'm ok"?

>> And that's perfectly fine, and we everything such that this will work
>> properly.
>> 
>> The card checksums what ends up being DMA'd so nothing from the
>> networking side is broken.
> 
> Assuming the card has checksum support correct? Which is why we have
> the SKBTX_SHARED_FRAG checked in skb_has_shared_frag() and the checksum
> helpers called by the drivers when they do not support the protocol
> being used. So probably OK assumption if using supported protocols and
> hardware? Perhaps in general folks just use normal protocols and
> hardware so it works.

If the hardware doesn't support the checksums, we linearize the SKB
(therefore obtain a snapshot of the data), and checksum.  Exactly what
would happen if the hardware did the checksum.

So OK in that case too.

We always guarantee that you will always get a correct checksum on
outgoing packets, even if you modify the page contents meanwhile.

> So the "I need at least X more bytes" is the msg_cork_bytes() in patch
> 7. I could handle the sendpage case the same as I handle the sendmsg
> case and copy the data into the buffer until N bytes are received. I
> had planned to add this mode in a follow up series but could add it in
> this series so we have all the pieces in one submission.
> 
> Although I used a scatterlist instead of a linear buffer. I was
> planning to add a helper to pull in next sg list item if needed
> rather than try to allocate a large linear block up front.

For non-deep packet inspection cases this re-running of the parser case
will probably not trigger at all.
John Fastabend March 6, 2018, 6:18 p.m. UTC | #8
On 03/06/2018 07:47 AM, David Miller wrote:
> From: John Fastabend <john.fastabend@gmail.com>
> Date: Mon, 5 Mar 2018 23:06:01 -0800
> 
>> On 03/05/2018 10:42 PM, David Miller wrote:
>>> From: John Fastabend <john.fastabend@gmail.com>
>>> Date: Mon, 5 Mar 2018 22:22:21 -0800
>>>
>>>> All I meant by this is if an application uses sendfile() call
>>>> there is no good way to know when/if the kernel side will copy or
>>>> xmit the  data. So a reliable user space application will need to
>>>> only modify the data if it "knows" there are no outstanding sends
>>>> in-flight. So if we assume applications follow this then it
>>>> is OK to avoid the copy. Of course this is not good enough for
>>>> security, but for monitoring/statistics (my use case 1 it works).
>>>
>>> For an application implementing a networking file system, it's pretty
>>> legitimate for file contents to change before the page gets DMA's to
>>> the networking card.
>>>
>>
>> Still there are useful BPF programs that can tolerate this. So I
>> would prefer to allow BPF programs to operate in the no-copy mode
>> if wanted. It doesn't have to be the default though as it currently
>> is. A l7 load balancer is a good example of this.
> 
> Maybe I'd be ok if it were not the default.  But do you really want to
> expose a potential attack vector, even if the app gets to choose and
> say "I'm ok"?
> 

Yes, because I have use cases where I don't need to read the data, but
have already "approved" the data. One example: applications like nginx
can serve static HTTP data. Just reading over the code, what they do
when sendfile is enabled is a sendmsg call with the header. We want to
enforce the policy on the header; then we know the next N bytes are OK.
Nginx will then send the payload over the sendfile syscall. Because we
already know the data is good from the initial sendmsg call, the next N
bytes can get the verdict SK_PASS without even touching the data. If we
do a copy in this case we see significant performance degradation.

The other use case is the L7 load balancer mentioned above. If we are
using RR policies or some other heuristic, it is also fine if the user
modifies the payload after the BPF verdict. A malicious user could
rewrite the header and try to game the load balancer, but the BPF
program can always just dev/null (SK_DROP) the application when it
detects this. This also assumes the load balancer is using the header
for its heuristic; some interesting heuristics may not use the header
at all.
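
To make the nginx flow concrete, a rough sketch of the BPF side could
look like the following. This is illustrative only: HDR_BYTES, the
constants and the my_header_is_allowed() check are placeholders, and
bpf_msg_apply_bytes() stands in for an apply-bytes style helper added
elsewhere in this series (the cork/pull-data helpers discussed later
in the thread would make the header check more robust).

  #define HDR_BYTES     128     // assumed header size, sketch only
  #define PAYLOAD_BYTES 65536   // bytes trusted once the header is OK

  SEC("sk_msg")
  int nginx_policy(struct sk_msg_md *msg)
  {
          void *data_end = (void *)(long)msg->data_end;
          void *data = (void *)(long)msg->data;

          // nginx sends the header in its own sendmsg() call; if it
          // is not fully visible here, reject it.
          if (data + HDR_BYTES > data_end)
                  return SK_DROP;

          if (!my_header_is_allowed(data, data_end))  // policy, not shown
                  return SK_DROP;

          // Header approved: apply this verdict to the next
          // PAYLOAD_BYTES so the sendfile() pages that follow are
          // never read or copied by the program.
          bpf_msg_apply_bytes(msg, PAYLOAD_BYTES);
          return SK_PASS;
  }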

>>> And that's perfectly fine, and we everything such that this will work
>>> properly.
>>>
>>> The card checksums what ends up being DMA'd so nothing from the
>>> networking side is broken.
>>
>> Assuming the card has checksum support correct? Which is why we have
>> the SKBTX_SHARED_FRAG checked in skb_has_shared_frag() and the checksum
>> helpers called by the drivers when they do not support the protocol
>> being used. So probably OK assumption if using supported protocols and
>> hardware? Perhaps in general folks just use normal protocols and
>> hardware so it works.
> 
> If the hardware doesn't support the checksums, we linearize the SKB
> (therefore obtain a snapshot of the data), and checksum.  Exactly what
> would happen if the hardware did the checksum.
> 
> So OK in that case too.
> 
> We always guarantee that you will always get a correct checksum on
> outgoing packets, even if you modify the page contents meanwhile.
> 

Agreed the checksum is correct, but the user doesn't know if the
linearize happened while it was modifying the data, potentially
creating data with a partial update. Because the user modifying the
data doesn't block the linearize operation in the kernel, and vice
versa, the linearize operation can happen in parallel with the
user-side data modification. So maybe I'm still missing something, but
it seems the data can be in some unknown state on the wire.

Either way, though, I think it's fine to make the default sendpage
hook do the copy. A flag to avoid the copy can be added later to
resolve my use cases above. I'll code this up in a v2 today/tomorrow.

>> So the "I need at least X more bytes" is the msg_cork_bytes() in patch
>> 7. I could handle the sendpage case the same as I handle the sendmsg
>> case and copy the data into the buffer until N bytes are received. I
>> had planned to add this mode in a follow up series but could add it in
>> this series so we have all the pieces in one submission.
>>
>> Although I used a scatterlist instead of a linear buffer. I was
>> planning to add a helper to pull in next sg list item if needed
>> rather than try to allocate a large linear block up front.
> 
> For non-deep packet inspection cases this re-running of the parser case
> will probably not trigger at all.
> 

Agreed, it's mostly there to handle cases where the sendmsg call only
sent part of an application (Kafka, HTTP, etc.) header. This can
happen if the user is sending multiple messages in a single
sendmsg/sendfile call. But, yeah, I see it rarely in practice; it's
mostly there for completeness and to handle these edge cases.
John Fastabend March 7, 2018, 3:25 a.m. UTC | #9
On 03/06/2018 10:18 AM, John Fastabend wrote:
> On 03/06/2018 07:47 AM, David Miller wrote:
>> From: John Fastabend <john.fastabend@gmail.com>
>> Date: Mon, 5 Mar 2018 23:06:01 -0800
>>
>>> On 03/05/2018 10:42 PM, David Miller wrote:
>>>> From: John Fastabend <john.fastabend@gmail.com>
>>>> Date: Mon, 5 Mar 2018 22:22:21 -0800
>>>>
>>>>> All I meant by this is if an application uses sendfile() call
>>>>> there is no good way to know when/if the kernel side will copy or
>>>>> xmit the  data. So a reliable user space application will need to
>>>>> only modify the data if it "knows" there are no outstanding sends
>>>>> in-flight. So if we assume applications follow this then it
>>>>> is OK to avoid the copy. Of course this is not good enough for
>>>>> security, but for monitoring/statistics (my use case 1 it works).
>>>>
>>>> For an application implementing a networking file system, it's pretty
>>>> legitimate for file contents to change before the page gets DMA's to
>>>> the networking card.
>>>>
>>>
>>> Still there are useful BPF programs that can tolerate this. So I
>>> would prefer to allow BPF programs to operate in the no-copy mode
>>> if wanted. It doesn't have to be the default though as it currently
>>> is. A l7 load balancer is a good example of this.
>>
>> Maybe I'd be ok if it were not the default.  But do you really want to
>> expose a potential attack vector, even if the app gets to choose and
>> say "I'm ok"?
>>
> 
> Yes, because I have use cases where I don't need to read the data, but
> have already "approved" the data. One example applications like
> nginx can serve static http data. Just reading over the code what they
> do, when sendfile is enabled, is a sendmsg call with the header. We want
> to enforce the policy on the header. Then we know the next N bytes are
> OK. Nginx will then send the payload over sendfile syscall. We already
> know the data is good from initial sendmsg call the next N bytes can
> get the verdict SK_PASS without even touching the data. If we do a
> copy in this case we see significant performance degradation.
> 
> The other use case is the L7 load balancer mentioned above. If we are
> using RR policies or some other heuristic if the user modifies the
> payload after the BPF verdict that is also fine. A malicious user
> could rewrite the header and try to game the load balancer but the
> BPF program can always just dev/null (SK_DROP) the application when
> it detects this. This also assumes the load balancer is using the
> header for its heuristic some interesting heuristics may not use
> the header at all.
> 
>>>> And that's perfectly fine, and we everything such that this will work
>>>> properly.
>>>>
>>>> The card checksums what ends up being DMA'd so nothing from the
>>>> networking side is broken.
>>>
>>> Assuming the card has checksum support correct? Which is why we have
>>> the SKBTX_SHARED_FRAG checked in skb_has_shared_frag() and the checksum
>>> helpers called by the drivers when they do not support the protocol
>>> being used. So probably OK assumption if using supported protocols and
>>> hardware? Perhaps in general folks just use normal protocols and
>>> hardware so it works.
>>
>> If the hardware doesn't support the checksums, we linearize the SKB
>> (therefore obtain a snapshot of the data), and checksum.  Exactly what
>> would happen if the hardware did the checksum.
>>
>> So OK in that case too.
>>
>> We always guarantee that you will always get a correct checksum on
>> outgoing packets, even if you modify the page contents meanwhile.
>>
> 
> Agreed the checksum is correct, but the user doesn't know if the linearize
> happened while it was modifying the data, potentially creating data with
> a partial update. Because the user modifying the data doesn't block the
> linearize operation in the kernel and vice versa the linearize operation
> can happen in parallel with the user side data modification. So maybe
> I'm still missing something but it seems the data can be in some unknown
> state on the wire.
> 
> Either way though I think its fine to make the default sendpage hook do
> the copy. A flag to avoid the copy can be added later to resolve my use
> cases above. I'll code this up in a v2 today/tomorrow.

Hi,

Thought about this a bit more and chatted with Daniel a bit. I think
a better solution is to set data_start = data_end = 0 by default in the
sendpage case. This will disallow any read/writes into the sendpage
data. Then if the user needs to read/write data we can use a helper
bpf_sk_msg_pull_data(start_byte, end_byte) which can pull the data into a
linear buffer as needed. This will ensure any user writes will not
change data after the BPF verdict (your concern). Also it will minimize
the amount of data that needs to be copied (my concern). In some of my
use cases where no data is needed we can simply not use the helper. Then
on the sendmsg side we can continue to set the (data_start, data_end)
pointers to the first scatterlist element. But, also use this helper to
set the data pointers past the first scatterlist element if needed. So
if someone wants to read past the first 4k bytes on a large send for
example this can be done with the above helper. BPF programs just
need to check the (start, end) data pointers and can be oblivious to
whether the program is being invoked by a call from sendpage or sendmsg.

I think this is a fairly elegant solution. Finally we can further
optimize later with a flag if needed to cover the case where we
want to read lots of bytes but _not_ do the copy. We can debate
the usefulness of this later with actual perf data.

All this has the added bonus that all I need is another patch on
top to add the helper. Pseudo code might look like this,

my_bpf_prog(struct sk_msg_md *msg) {
	void *data_end = msg->data_end;
	void *data_start = msg->data_start;

	need = PARSE_BYTES;

	// ensure user actually sent full header
	if (msg->size < PARSE_BYTES) {
		bpf_msg_cork(PARSE_BYTES);
		return SK_DROP;
	}

	/* ensure we can read full header, if this is a
	 * sendmsg system call AND PARSE_BYTES are all in
	 * the first scatterlist elem this is a no-op.
	 * If this is a sendpage call will put PARSE_BYTES
	 * in a psock buffer to avoid user modifications.
	 */
	if (data_end - data_start < PARSE_BYTES) {
		err = bpf_sk_msg_pull_data(0, PARSE_BYTES, flags);
		if (err)
			return SK_DROP;
	}

	// we have the full header parse it now
	verdict = my_bpf_header_parser(msg);
	return verdict;
}

Future optimization can work with a prologue to pull in bytes
more efficiently. And for what it's worth, I found a couple of bugs
in the error path of the sendpage hook that I can fix in the v2 as well.

What do you think?

@Daniel, sound more or less like what you were thinking?

> 
>>> So the "I need at least X more bytes" is the msg_cork_bytes() in patch
>>> 7. I could handle the sendpage case the same as I handle the sendmsg
>>> case and copy the data into the buffer until N bytes are received. I
>>> had planned to add this mode in a follow up series but could add it in
>>> this series so we have all the pieces in one submission.
>>>
>>> Although I used a scatterlist instead of a linear buffer. I was
>>> planning to add a helper to pull in next sg list item if needed
>>> rather than try to allocate a large linear block up front.
>>
>> For non-deep packet inspection cases this re-running of the parser case
>> will probably not trigger at all.
>>
> 
> Agreed, its mostly there to handle cases where the sendmsg call
> only sent part of a application (kafka, http, etc) header. This can
> happen if user is sending multiple messages in a single sendmsg/sendfile
> call. But, yeah I see it rarely in practice its mostly there for
> completeness and to handle these edge cases.
>
David Miller March 7, 2018, 4:41 a.m. UTC | #10
From: John Fastabend <john.fastabend@gmail.com>
Date: Tue, 6 Mar 2018 19:25:01 -0800

> What do you think? 

Sounds good from your description, I can't wait to see it :-)
Daniel Borkmann March 7, 2018, 1:03 p.m. UTC | #11
On 03/07/2018 04:25 AM, John Fastabend wrote:
[...]
> Thought about this a bit more and chatted with Daniel a bit. I think
> a better solution is to set data_start = data_end = 0 by default in the
> sendpage case. This will disallow any read/writes into the sendpage
> data. Then if the user needs to read/write data we can use a helper
> bpf_sk_msg_pull_data(start_byte, end_byte) which can pull the data into a
> linear buffer as needed. This will ensure any user writes will not
> change data after the BPF verdict (your concern). Also it will minimize
> the amount of data that needs to be copied (my concern). In some of my
> use cases where no data is needed we can simple not use the helper. Then
> on the sendmsg side we can continue to set the (data_start, data_end)
> pointers to the first scatterlist element. But, also use this helper to
> set the data pointers past the first scatterlist element if needed. So
> if someone wants to read past the first 4k bytes on a large send for
> example this can be done with the above helper. BPF programs just
> need to check (start,end) data pointers and can be oblivious to
> if the program is being invoked by a call from sendpage or sendmsg.
> 
> I think this is a fairly elegant solution. Finally we can further
> optimize later with a flag if needed to cover the case where we
> want to read lots of bytes but _not_ do the copy. We can debate
> the usefulness of this later with actual perf data.
> 
> All this has the added bonus that all I need is another patch on
> top to add the helper. Pseudo code might look like this,
> 
> my_bpf_prog(struct sk_msg_md *msg) {
> 	void *data_end = msg->data_end;
> 	void *data_start = msg->data_start;
> 
> 	need = PARSE_BYTES;
> 
> 	// ensure user actually sent full header
> 	if (msg->size < PARSE_BYTES) {
> 		bpf_msg_cork(PARSE_BYTES);
> 		return SK_DROP;
> 	}
> 
> 	/* ensure we can read full header, if this is a
> 	 * sendmsg system call AND PARSE_BYTES are all in
> 	 * the first scatterlist elem this is a no-op.
> 	 * If this is a sendpage call will put PARSE_BYTES
> 	 * in a psock buffer to avoid user modifications.
> 	 */
> 	if (data_end - data_start < PARSE_BYTES) {

I think it might need to look like 'data_start + PARSE_BYTES > data_end'
for the verifier to recognize it (unless LLVM generates code that way).

> 		err = bpf_sk_msg_pull_data(0, PARSE_BYTES, flags);
> 		if (err)
> 			return SK_DROP;

Above should be:

		if (unlikely(err || data_start + PARSE_BYTES > data_end))
			return SK_DROP;

Here, for the successful case, you need to recheck since the data
pointers were invalidated by the helper call. For the very first case,
bpf_sk_msg_pull_data() could potentially be called unconditionally at
prog start, since you start out with 0 len anyway, basically right
after the msg->size test.

> 	}
> 
> 	// we have the full header parse it now
> 	verdict = my_bpf_header_parser(msg);
> 	return verdict;
> }
> 
> Future optimization can work with prologue to pull in bytes
> more efficiently. And for what its worth I found a couple bugs
> in the error path of the sendpage hook I can fix in the v2 as well.
> 
> What do you think? 
> 
> @Daniel, sound more or less like what you were thinking?

Yes, absolutely what I was thinking.

We have exactly the same logic in tc/BPF today for the case when the
direct packet access test fails and we want to pull in skb data from
the non-linear area; in that case we can just call
bpf_skb_pull_data(skb, len) and redo the test to access it privately
after that.

Thanks,
Daniel
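
Folding Daniel's two corrections back into John's pseudo code gives
roughly the fragment below. This is still pseudo code: PARSE_BYTES,
my_bpf_header_parser(), bpf_msg_cork() and bpf_sk_msg_pull_data()
follow the naming used in the thread, the ctx-first helper signature
and flags argument are assumptions, and the field names follow the
patch's sk_msg_md (data/data_end).

my_bpf_prog(struct sk_msg_md *msg) {
        void *data_end = msg->data_end;
        void *data_start = msg->data;

        // ensure user actually sent the full header
        if (msg->size < PARSE_BYTES) {
                bpf_msg_cork(PARSE_BYTES);
                return SK_DROP;
        }

        // verifier-friendly form of the length test (Daniel's first point)
        if (data_start + PARSE_BYTES > data_end) {
                err = bpf_sk_msg_pull_data(msg, 0, PARSE_BYTES, flags);

                // the helper invalidates the data pointers, so reload
                // and recheck before reading (Daniel's second point)
                data_end = msg->data_end;
                data_start = msg->data;
                if (err || data_start + PARSE_BYTES > data_end)
                        return SK_DROP;
        }

        // we have the full header and it is stable, parse it now
        verdict = my_bpf_header_parser(msg);
        return verdict;
}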

Patch

diff --git a/include/linux/bpf.h b/include/linux/bpf.h
index 66df387..819229c 100644
--- a/include/linux/bpf.h
+++ b/include/linux/bpf.h
@@ -21,6 +21,7 @@ 
 struct perf_event;
 struct bpf_prog;
 struct bpf_map;
+struct sock;
 
 /* map is generic key/value storage optionally accesible by eBPF programs */
 struct bpf_map_ops {
diff --git a/include/linux/bpf_types.h b/include/linux/bpf_types.h
index 19b8349..5e2e8a4 100644
--- a/include/linux/bpf_types.h
+++ b/include/linux/bpf_types.h
@@ -13,6 +13,7 @@ 
 BPF_PROG_TYPE(BPF_PROG_TYPE_LWT_XMIT, lwt_xmit)
 BPF_PROG_TYPE(BPF_PROG_TYPE_SOCK_OPS, sock_ops)
 BPF_PROG_TYPE(BPF_PROG_TYPE_SK_SKB, sk_skb)
+BPF_PROG_TYPE(BPF_PROG_TYPE_SK_MSG, sk_msg)
 #endif
 #ifdef CONFIG_BPF_EVENTS
 BPF_PROG_TYPE(BPF_PROG_TYPE_KPROBE, kprobe)
diff --git a/include/linux/filter.h b/include/linux/filter.h
index fdb691b..15c663e 100644
--- a/include/linux/filter.h
+++ b/include/linux/filter.h
@@ -507,6 +507,19 @@  struct xdp_buff {
 	struct xdp_rxq_info *rxq;
 };
 
+struct sk_msg_buff {
+	void *data;
+	void *data_end;
+	int sg_start;
+	int sg_curr;
+	int sg_end;
+	int sg_size;
+	struct scatterlist sg_data[MAX_SKB_FRAGS];
+	__u32 key;
+	__u32 flags;
+	struct bpf_map *map;
+};
+
 /* Compute the linear packet data range [data, data_end) which
  * will be accessed by various program types (cls_bpf, act_bpf,
  * lwt, ...). Subsystems allowing direct data access must (!)
@@ -771,6 +784,7 @@  int xdp_do_redirect(struct net_device *dev,
 void bpf_warn_invalid_xdp_action(u32 act);
 
 struct sock *do_sk_redirect_map(struct sk_buff *skb);
+struct sock *do_msg_redirect_map(struct sk_msg_buff *md);
 
 #ifdef CONFIG_BPF_JIT
 extern int bpf_jit_enable;
diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
index 2a66769..b8275f0 100644
--- a/include/uapi/linux/bpf.h
+++ b/include/uapi/linux/bpf.h
@@ -133,6 +133,7 @@  enum bpf_prog_type {
 	BPF_PROG_TYPE_SOCK_OPS,
 	BPF_PROG_TYPE_SK_SKB,
 	BPF_PROG_TYPE_CGROUP_DEVICE,
+	BPF_PROG_TYPE_SK_MSG,
 };
 
 enum bpf_attach_type {
@@ -143,6 +144,7 @@  enum bpf_attach_type {
 	BPF_SK_SKB_STREAM_PARSER,
 	BPF_SK_SKB_STREAM_VERDICT,
 	BPF_CGROUP_DEVICE,
+	BPF_SK_MSG_VERDICT,
 	__MAX_BPF_ATTACH_TYPE
 };
 
@@ -696,6 +698,15 @@  enum bpf_attach_type {
  * int bpf_override_return(pt_regs, rc)
  *	@pt_regs: pointer to struct pt_regs
  *	@rc: the return value to set
+ *
+ * int bpf_msg_redirect_map(map, key, flags)
+ *     Redirect msg to a sock in map using key as a lookup key for the
+ *     sock in map.
+ *     @map: pointer to sockmap
+ *     @key: key to lookup sock in map
+ *     @flags: reserved for future use
+ *     Return: SK_PASS
+ *
  */
 #define __BPF_FUNC_MAPPER(FN)		\
 	FN(unspec),			\
@@ -757,7 +768,8 @@  enum bpf_attach_type {
 	FN(perf_prog_read_value),	\
 	FN(getsockopt),			\
 	FN(override_return),		\
-	FN(sock_ops_cb_flags_set),
+	FN(sock_ops_cb_flags_set),	\
+	FN(msg_redirect_map),
 
 /* integer value in 'imm' field of BPF_CALL instruction selects which helper
  * function eBPF program intends to call
@@ -920,6 +932,20 @@  enum sk_action {
 	SK_PASS,
 };
 
+/* User return codes for SK_MSG prog type. */
+enum sk_msg_action {
+	SK_MSG_DROP = 0,
+	SK_MSG_PASS,
+};
+
+/* user accessible metadata for SK_MSG packet hook, new fields must
+ * be added to the end of this structure
+ */
+struct sk_msg_md {
+	__u32 data;
+	__u32 data_end;
+};
+
 #define BPF_TAG_SIZE	8
 
 struct bpf_prog_info {
diff --git a/kernel/bpf/sockmap.c b/kernel/bpf/sockmap.c
index 051b2242..0fd5556 100644
--- a/kernel/bpf/sockmap.c
+++ b/kernel/bpf/sockmap.c
@@ -38,6 +38,7 @@ 
 #include <linux/skbuff.h>
 #include <linux/workqueue.h>
 #include <linux/list.h>
+#include <linux/mm.h>
 #include <net/strparser.h>
 #include <net/tcp.h>
 
@@ -47,6 +48,7 @@ 
 struct bpf_stab {
 	struct bpf_map map;
 	struct sock **sock_map;
+	struct bpf_prog *bpf_tx_msg;
 	struct bpf_prog *bpf_parse;
 	struct bpf_prog *bpf_verdict;
 };
@@ -74,6 +76,7 @@  struct smap_psock {
 	struct sk_buff *save_skb;
 
 	struct strparser strp;
+	struct bpf_prog *bpf_tx_msg;
 	struct bpf_prog *bpf_parse;
 	struct bpf_prog *bpf_verdict;
 	struct list_head maps;
@@ -91,6 +94,11 @@  struct smap_psock {
 	void (*save_write_space)(struct sock *sk);
 };
 
+static void smap_release_sock(struct smap_psock *psock, struct sock *sock);
+static int bpf_tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size);
+static int bpf_tcp_sendpage(struct sock *sk, struct page *page,
+			    int offset, size_t size, int flags);
+
 static inline struct smap_psock *smap_psock_sk(const struct sock *sk)
 {
 	return rcu_dereference_sk_user_data(sk);
@@ -115,6 +123,12 @@  static int bpf_tcp_init(struct sock *sk)
 
 	psock->save_close = sk->sk_prot->close;
 	psock->sk_proto = sk->sk_prot;
+
+	if (psock->bpf_tx_msg) {
+		tcp_bpf_proto.sendmsg = bpf_tcp_sendmsg;
+		tcp_bpf_proto.sendpage = bpf_tcp_sendpage;
+	}
+
 	sk->sk_prot = &tcp_bpf_proto;
 	rcu_read_unlock();
 	return 0;
@@ -174,6 +188,7 @@  enum __sk_action {
 	__SK_DROP = 0,
 	__SK_PASS,
 	__SK_REDIRECT,
+	__SK_NONE,
 };
 
 static struct tcp_ulp_ops bpf_tcp_ulp_ops __read_mostly = {
@@ -185,10 +200,459 @@  enum __sk_action {
 	.release	= bpf_tcp_release,
 };
 
+static int memcopy_from_iter(struct sock *sk,
+			     struct sk_msg_buff *md,
+			     struct iov_iter *from, int bytes)
+{
+	struct scatterlist *sg = md->sg_data;
+	int i = md->sg_curr, rc = 0;
+
+	do {
+		int copy;
+		char *to;
+
+		copy = sg[i].length;
+		to = sg_virt(&sg[i]);
+
+		if (sk->sk_route_caps & NETIF_F_NOCACHE_COPY)
+			rc = copy_from_iter_nocache(to, copy, from);
+		else
+			rc = copy_from_iter(to, copy, from);
+
+		if (rc != copy) {
+			rc = -EFAULT;
+			goto out;
+		}
+
+		bytes -= copy;
+		if (!bytes)
+			break;
+
+		if (++i == MAX_SKB_FRAGS)
+			i = 0;
+	} while (i != md->sg_end);
+out:
+	md->sg_curr = i;
+	return rc;
+}
+
+static int bpf_tcp_push(struct sock *sk,
+			struct smap_psock *psock, struct sk_msg_buff *md,
+			int flags, bool uncharge)
+{
+	struct scatterlist *sg;
+	int offset, ret = 0;
+	struct page *p;
+	size_t size;
+
+	while (1) {
+		sg = md->sg_data + md->sg_start;
+		size = sg->length;
+		offset = sg->offset;
+
+		tcp_rate_check_app_limited(sk);
+		p = sg_page(sg);
+retry:
+		ret = do_tcp_sendpages(sk, p, offset, size, flags);
+		if (ret != size) {
+			if (ret > 0) {
+				size -= ret;
+				offset += ret;
+				if (uncharge)
+					sk_mem_uncharge(sk, ret);
+				goto retry;
+			}
+
+			sg->length = size;
+			sg->offset = offset;
+			return ret;
+		}
+
+		put_page(p);
+		sg->offset += ret;
+		sg->length -= ret;
+		if (uncharge)
+			sk_mem_uncharge(sk, ret);
+
+		if (!sg->length) {
+			put_page(p);
+			md->sg_start++;
+			if (md->sg_start == MAX_SKB_FRAGS)
+				md->sg_start = 0;
+			memset(sg, 0, sizeof(*sg));
+		}
+
+		if (md->sg_start == md->sg_end)
+			break;
+	}
+	return 0;
+}
+
+static inline void bpf_compute_data_pointers_sg(struct sk_msg_buff *md)
+{
+	struct scatterlist *sg = md->sg_data + md->sg_start;
+
+	md->data = sg_virt(sg);
+	md->data_end = md->data + sg->length;
+}
+
+static void return_mem_sg(struct sock *sk, struct sk_msg_buff *md)
+{
+	struct scatterlist *sg = md->sg_data;
+	int i;
+
+	i = md->sg_start;
+	do {
+		sk_mem_uncharge(sk, sg[i].length);
+
+		i++;
+		if (i == MAX_SKB_FRAGS)
+			i = 0;
+	} while (i != md->sg_end);
+}
+
+static int free_sg(struct sock *sk, int start, struct sk_msg_buff *md)
+{
+	struct scatterlist *sg = md->sg_data;
+	int i = start, free = 0;
+
+	while (sg[i].length) {
+		free += sg[i].length;
+		sk_mem_uncharge(sk, sg[i].length);
+		put_page(sg_page(&sg[i]));
+		sg[i].length = 0;
+		sg[i].page_link = 0;
+		sg[i].offset = 0;
+		i++;
+
+		if (i == MAX_SKB_FRAGS)
+			i = 0;
+	}
+
+	return free;
+}
+
+static int free_start_sg(struct sock *sk, struct sk_msg_buff *md)
+{
+	int free = free_sg(sk, md->sg_start, md);
+
+	md->sg_start = md->sg_end;
+	return free;
+}
+
+static int free_curr_sg(struct sock *sk, struct sk_msg_buff *md)
+{
+	return free_sg(sk, md->sg_curr, md);
+}
+
+static int bpf_map_msg_verdict(int _rc, struct sk_msg_buff *md)
+{
+	return ((_rc == SK_PASS) ?
+	       (md->map ? __SK_REDIRECT : __SK_PASS) :
+	       __SK_DROP);
+}
+
+static unsigned int smap_do_tx_msg(struct sock *sk,
+				   struct smap_psock *psock,
+				   struct sk_msg_buff *md)
+{
+	struct bpf_prog *prog;
+	unsigned int rc, _rc;
+
+	preempt_disable();
+	rcu_read_lock();
+
+	/* If the policy was removed mid-send then default to 'accept' */
+	prog = READ_ONCE(psock->bpf_tx_msg);
+	if (unlikely(!prog)) {
+		_rc = SK_PASS;
+		goto verdict;
+	}
+
+	bpf_compute_data_pointers_sg(md);
+	rc = (*prog->bpf_func)(md, prog->insnsi);
+
+	/* Moving return codes from UAPI namespace into internal namespace */
+	_rc = bpf_map_msg_verdict(rc, md);
+verdict:
+	rcu_read_unlock();
+	preempt_enable();
+
+	return _rc;
+}
+
+static int bpf_tcp_sendmsg_do_redirect(struct sk_msg_buff *md,
+				       int flags)
+{
+	struct smap_psock *psock;
+	struct scatterlist *sg;
+	int i, err, free = 0;
+	struct sock *sk;
+
+	sg = md->sg_data;
+
+	rcu_read_lock();
+	sk = do_msg_redirect_map(md);
+	if (unlikely(!sk))
+		goto out_rcu;
+
+	psock = smap_psock_sk(sk);
+	if (unlikely(!psock))
+		goto out_rcu;
+
+	if (!refcount_inc_not_zero(&psock->refcnt))
+		goto out_rcu;
+
+	rcu_read_unlock();
+	lock_sock(sk);
+	err = bpf_tcp_push(sk, psock, md, flags, false);
+	release_sock(sk);
+	smap_release_sock(psock, sk);
+	if (unlikely(err))
+		goto out;
+	return 0;
+out_rcu:
+	rcu_read_unlock();
+out:
+	i = md->sg_start;
+	while (sg[i].length) {
+		free += sg[i].length;
+		put_page(sg_page(&sg[i]));
+		sg[i].length = 0;
+		i++;
+		if (i == MAX_SKB_FRAGS)
+			i = 0;
+	}
+	return free;
+}
+
+static inline void bpf_md_init(struct sk_msg_buff *md)
+{
+	md->sg_size = 0;
+}
+
+static int bpf_tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size)
+{
+	int flags = msg->msg_flags | MSG_NO_SHARED_FRAGS;
+	int err = 0, eval = __SK_NONE;
+	struct sk_msg_buff md = {0};
+	unsigned int sg_copy = 0;
+	struct smap_psock *psock;
+	size_t copy, copied = 0;
+	struct scatterlist *sg;
+	long timeo;
+
+	/* Its possible a sock event or user removed the psock _but_ the ops
+	 * have not been reprogrammed yet so we get here. In this case fallback
+	 * to tcp_sendmsg. Note this only works because we _only_ ever allow
+	 * a single ULP there is no hierarchy here.
+	 */
+	rcu_read_lock();
+	psock = smap_psock_sk(sk);
+	if (unlikely(!psock)) {
+		rcu_read_unlock();
+		return tcp_sendmsg(sk, msg, size);
+	}
+
+	/* Increment the psock refcnt to ensure its not released while sending a
+	 * message. Required because sk lookup and bpf programs are used in
+	 * separate rcu critical sections. Its OK if we lose the map entry
+	 * but we can't lose the sock reference, possible when the refcnt hits
+	 * zero and garbage collection calls sock_put().
+	 */
+	if (!refcount_inc_not_zero(&psock->refcnt)) {
+		rcu_read_unlock();
+		return tcp_sendmsg(sk, msg, size);
+	}
+
+	sg = md.sg_data;
+	sg_init_table(sg, MAX_SKB_FRAGS);
+	rcu_read_unlock();
+
+	lock_sock(sk);
+	timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
+
+	md.sg_size = 0;
+
+	while (msg_data_left(msg)) {
+		if (sk->sk_err) {
+			err = sk->sk_err;
+			goto out_err;
+		}
+
+		copy = msg_data_left(msg);
+		if (!sk_stream_memory_free(sk))
+			goto wait_for_sndbuf;
+
+		md.sg_curr = md.sg_end;
+		err = sk_alloc_sg(sk, copy, sg,
+				  md.sg_start, &md.sg_end, &sg_copy,
+				  md.sg_end);
+		if (err) {
+			if (err != -ENOSPC)
+				goto wait_for_memory;
+			copy = sg_copy;
+		}
+
+		err = memcopy_from_iter(sk, &md, &msg->msg_iter, copy);
+		if (err < 0) {
+			free_curr_sg(sk, &md);
+			goto out_err;
+		}
+
+		copied += copy;
+		sg_copy = 0;
+		/* If msg is larger than MAX_SKB_FRAGS we can send multiple
+		 * scatterlists per msg. However BPF decisions apply to the
+		 * entire msg.
+		 */
+		if (eval == __SK_NONE)
+			eval = smap_do_tx_msg(sk, psock, &md);
+
+		switch (eval) {
+		case __SK_PASS:
+			err = bpf_tcp_push(sk, psock, &md, flags, true);
+			if (unlikely(err)) {
+				copied -= free_start_sg(sk, &md);
+				goto out_err;
+			}
+			break;
+		case __SK_REDIRECT:
+			return_mem_sg(sk, &md);
+			release_sock(sk);
+			err = bpf_tcp_sendmsg_do_redirect(&md, flags);
+			if (unlikely(err)) {
+				copied -= err;
+				goto out_redir;
+			}
+			lock_sock(sk);
+			break;
+		case __SK_DROP:
+		default:
+			copied -= free_start_sg(sk, &md);
+			goto out_err;
+		}
+
+		bpf_md_init(&md);
+		continue;
+wait_for_sndbuf:
+		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+wait_for_memory:
+		err = sk_stream_wait_memory(sk, &timeo);
+		if (err)
+			goto out_err;
+	}
+out_err:
+	bpf_md_init(&md);
+	if (err < 0)
+		err = sk_stream_error(sk, msg->msg_flags, err);
+	release_sock(sk);
+out_redir:
+	smap_release_sock(psock, sk);
+	return copied ? copied : err;
+}
+
+static int bpf_tcp_sendpage_do_redirect(struct page *page, int offset,
+					size_t size, int flags,
+					struct sk_msg_buff *md)
+{
+	struct smap_psock *psock;
+	struct sock *sk;
+	int rc;
+
+	rcu_read_lock();
+	sk = do_msg_redirect_map(md);
+	if (unlikely(!sk))
+		goto out_rcu;
+
+	psock = smap_psock_sk(sk);
+	if (unlikely(!psock))
+		goto out_rcu;
+
+	if (!refcount_inc_not_zero(&psock->refcnt))
+		goto out_rcu;
+
+	rcu_read_unlock();
+
+	lock_sock(sk);
+	rc = tcp_sendpage_locked(sk, page, offset, size, flags);
+	release_sock(sk);
+
+	smap_release_sock(psock, sk);
+	return rc;
+out_rcu:
+	rcu_read_unlock();
+	return -EINVAL;
+}
+
+static int bpf_tcp_sendpage(struct sock *sk, struct page *page,
+			    int offset, size_t size, int flags)
+{
+	struct sk_msg_buff md = {0};
+	struct smap_psock *psock;
+	int rc, _rc = __SK_PASS;
+	struct bpf_prog *prog;
+
+	preempt_disable();
+	rcu_read_lock();
+	psock = smap_psock_sk(sk);
+	if (unlikely(!psock))
+		goto verdict;
+
+	/* If the policy was removed mid-send then default to 'accept' */
+	prog = READ_ONCE(psock->bpf_tx_msg);
+	if (unlikely(!prog))
+		goto verdict;
+
+	/* Calculate pkt data pointers and run BPF program */
+	md.data = page_address(page) + offset;
+	md.data_end = md.data + size;
+	_rc = (*prog->bpf_func)(&md, prog->insnsi);
+
+verdict:
+	rcu_read_unlock();
+	preempt_enable();
+
+	/* Moving return codes from UAPI namespace into internal namespace */
+	rc = bpf_map_msg_verdict(_rc, &md);
+
+	switch (rc) {
+	case __SK_PASS:
+		lock_sock(sk);
+		rc = tcp_sendpage_locked(sk, page, offset, size, flags);
+		release_sock(sk);
+		break;
+	case __SK_REDIRECT:
+		rc = bpf_tcp_sendpage_do_redirect(page, offset, size, flags,
+						  &md);
+		break;
+	case __SK_DROP:
+	default:
+		rc = -EACCES;
+	}
+
+	return rc;
+}
+
+static void bpf_tcp_msg_add(struct smap_psock *psock,
+			    struct sock *sk,
+			    struct bpf_prog *tx_msg)
+{
+	struct bpf_prog *orig_tx_msg;
+
+	orig_tx_msg = xchg(&psock->bpf_tx_msg, tx_msg);
+	if (orig_tx_msg)
+		bpf_prog_put(orig_tx_msg);
+}
+
 static int bpf_tcp_ulp_register(void)
 {
 	tcp_bpf_proto = tcp_prot;
 	tcp_bpf_proto.close = bpf_tcp_close;
+	/* Once BPF TX ULP is registered it is never unregistered. It
+	 * will be in the ULP list for the lifetime of the system. Doing
+	 * duplicate registers is not a problem.
+	 */
 	return tcp_register_ulp(&bpf_tcp_ulp_ops);
 }
 
@@ -412,7 +876,6 @@  static int smap_parse_func_strparser(struct strparser *strp,
 	return rc;
 }
 
-
 static int smap_read_sock_done(struct strparser *strp, int err)
 {
 	return err;
@@ -482,6 +945,8 @@  static void smap_gc_work(struct work_struct *w)
 		bpf_prog_put(psock->bpf_parse);
 	if (psock->bpf_verdict)
 		bpf_prog_put(psock->bpf_verdict);
+	if (psock->bpf_tx_msg)
+		bpf_prog_put(psock->bpf_tx_msg);
 
 	list_for_each_entry_safe(e, tmp, &psock->maps, list) {
 		list_del(&e->list);
@@ -668,8 +1133,6 @@  static int sock_map_delete_elem(struct bpf_map *map, void *key)
 	if (!psock)
 		goto out;
 
-	if (psock->bpf_parse)
-		smap_stop_sock(psock, sock);
 	smap_list_remove(psock, &stab->sock_map[k]);
 	smap_release_sock(psock, sock);
 out:
@@ -711,10 +1174,11 @@  static int sock_map_ctx_update_elem(struct bpf_sock_ops_kern *skops,
 {
 	struct bpf_stab *stab = container_of(map, struct bpf_stab, map);
 	struct smap_psock_map_entry *e = NULL;
-	struct bpf_prog *verdict, *parse;
+	struct bpf_prog *verdict, *parse, *tx_msg;
 	struct sock *osock, *sock;
 	struct smap_psock *psock;
 	u32 i = *(u32 *)key;
+	bool new = false;
 	int err;
 
 	if (unlikely(flags > BPF_EXIST))
@@ -737,6 +1201,7 @@  static int sock_map_ctx_update_elem(struct bpf_sock_ops_kern *skops,
 	 */
 	verdict = READ_ONCE(stab->bpf_verdict);
 	parse = READ_ONCE(stab->bpf_parse);
+	tx_msg = READ_ONCE(stab->bpf_tx_msg);
 
 	if (parse && verdict) {
 		/* bpf prog refcnt may be zero if a concurrent attach operation
@@ -755,6 +1220,17 @@  static int sock_map_ctx_update_elem(struct bpf_sock_ops_kern *skops,
 		}
 	}
 
+	if (tx_msg) {
+		tx_msg = bpf_prog_inc_not_zero(stab->bpf_tx_msg);
+		if (IS_ERR(tx_msg)) {
+			if (verdict)
+				bpf_prog_put(verdict);
+			if (parse)
+				bpf_prog_put(parse);
+			return PTR_ERR(tx_msg);
+		}
+	}
+
 	write_lock_bh(&sock->sk_callback_lock);
 	psock = smap_psock_sk(sock);
 
@@ -769,7 +1245,14 @@  static int sock_map_ctx_update_elem(struct bpf_sock_ops_kern *skops,
 			err = -EBUSY;
 			goto out_progs;
 		}
-		refcount_inc(&psock->refcnt);
+		if (READ_ONCE(psock->bpf_tx_msg) && tx_msg) {
+			err = -EBUSY;
+			goto out_progs;
+		}
+		if (!refcount_inc_not_zero(&psock->refcnt)) {
+			err = -EAGAIN;
+			goto out_progs;
+		}
 	} else {
 		psock = smap_init_psock(sock, stab);
 		if (IS_ERR(psock)) {
@@ -777,11 +1260,8 @@  static int sock_map_ctx_update_elem(struct bpf_sock_ops_kern *skops,
 			goto out_progs;
 		}
 
-		err = tcp_set_ulp_id(sock, TCP_ULP_BPF);
-		if (err)
-			goto out_progs;
-
 		set_bit(SMAP_TX_RUNNING, &psock->state);
+		new = true;
 	}
 
 	e = kzalloc(sizeof(*e), GFP_ATOMIC | __GFP_NOWARN);
@@ -794,6 +1274,14 @@  static int sock_map_ctx_update_elem(struct bpf_sock_ops_kern *skops,
 	/* 3. At this point we have a reference to a valid psock that is
 	 * running. Attach any BPF programs needed.
 	 */
+	if (tx_msg)
+		bpf_tcp_msg_add(psock, sock, tx_msg);
+	if (new) {
+		err = tcp_set_ulp_id(sock, TCP_ULP_BPF);
+		if (err)
+			goto out_free;
+	}
+
 	if (parse && verdict && !psock->strp_enabled) {
 		err = smap_init_sock(psock, sock);
 		if (err)
@@ -815,8 +1303,6 @@  static int sock_map_ctx_update_elem(struct bpf_sock_ops_kern *skops,
 		struct smap_psock *opsock = smap_psock_sk(osock);
 
 		write_lock_bh(&osock->sk_callback_lock);
-		if (osock != sock && parse)
-			smap_stop_sock(opsock, osock);
 		smap_list_remove(opsock, &stab->sock_map[i]);
 		smap_release_sock(opsock, osock);
 		write_unlock_bh(&osock->sk_callback_lock);
@@ -829,6 +1315,8 @@  static int sock_map_ctx_update_elem(struct bpf_sock_ops_kern *skops,
 		bpf_prog_put(verdict);
 	if (parse)
 		bpf_prog_put(parse);
+	if (tx_msg)
+		bpf_prog_put(tx_msg);
 	write_unlock_bh(&sock->sk_callback_lock);
 	kfree(e);
 	return err;
@@ -843,6 +1331,9 @@  int sock_map_prog(struct bpf_map *map, struct bpf_prog *prog, u32 type)
 		return -EINVAL;
 
 	switch (type) {
+	case BPF_SK_MSG_VERDICT:
+		orig = xchg(&stab->bpf_tx_msg, prog);
+		break;
 	case BPF_SK_SKB_STREAM_PARSER:
 		orig = xchg(&stab->bpf_parse, prog);
 		break;
@@ -904,6 +1395,10 @@  static void sock_map_release(struct bpf_map *map, struct file *map_file)
 	orig = xchg(&stab->bpf_verdict, NULL);
 	if (orig)
 		bpf_prog_put(orig);
+
+	orig = xchg(&stab->bpf_tx_msg, NULL);
+	if (orig)
+		bpf_prog_put(orig);
 }
 
 const struct bpf_map_ops sock_map_ops = {
diff --git a/kernel/bpf/syscall.c b/kernel/bpf/syscall.c
index e24aa32..3aeb4ea 100644
--- a/kernel/bpf/syscall.c
+++ b/kernel/bpf/syscall.c
@@ -1315,7 +1315,8 @@  static int bpf_obj_get(const union bpf_attr *attr)
 
 #define BPF_PROG_ATTACH_LAST_FIELD attach_flags
 
-static int sockmap_get_from_fd(const union bpf_attr *attr, bool attach)
+static int sockmap_get_from_fd(const union bpf_attr *attr,
+			       int type, bool attach)
 {
 	struct bpf_prog *prog = NULL;
 	int ufd = attr->target_fd;
@@ -1329,8 +1330,7 @@  static int sockmap_get_from_fd(const union bpf_attr *attr, bool attach)
 		return PTR_ERR(map);
 
 	if (attach) {
-		prog = bpf_prog_get_type(attr->attach_bpf_fd,
-					 BPF_PROG_TYPE_SK_SKB);
+		prog = bpf_prog_get_type(attr->attach_bpf_fd, type);
 		if (IS_ERR(prog)) {
 			fdput(f);
 			return PTR_ERR(prog);
@@ -1382,9 +1382,11 @@  static int bpf_prog_attach(const union bpf_attr *attr)
 	case BPF_CGROUP_DEVICE:
 		ptype = BPF_PROG_TYPE_CGROUP_DEVICE;
 		break;
+	case BPF_SK_MSG_VERDICT:
+		return sockmap_get_from_fd(attr, BPF_PROG_TYPE_SK_MSG, true);
 	case BPF_SK_SKB_STREAM_PARSER:
 	case BPF_SK_SKB_STREAM_VERDICT:
-		return sockmap_get_from_fd(attr, true);
+		return sockmap_get_from_fd(attr, BPF_PROG_TYPE_SK_SKB, true);
 	default:
 		return -EINVAL;
 	}
@@ -1437,9 +1439,11 @@  static int bpf_prog_detach(const union bpf_attr *attr)
 	case BPF_CGROUP_DEVICE:
 		ptype = BPF_PROG_TYPE_CGROUP_DEVICE;
 		break;
+	case BPF_SK_MSG_VERDICT:
+		return sockmap_get_from_fd(attr, BPF_PROG_TYPE_SK_MSG, false);
 	case BPF_SK_SKB_STREAM_PARSER:
 	case BPF_SK_SKB_STREAM_VERDICT:
-		return sockmap_get_from_fd(attr, false);
+		return sockmap_get_from_fd(attr, BPF_PROG_TYPE_SK_SKB, false);
 	default:
 		return -EINVAL;
 	}
diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
index 3c74b16..3d14059 100644
--- a/kernel/bpf/verifier.c
+++ b/kernel/bpf/verifier.c
@@ -1248,6 +1248,7 @@  static bool may_access_direct_pkt_data(struct bpf_verifier_env *env,
 	case BPF_PROG_TYPE_XDP:
 	case BPF_PROG_TYPE_LWT_XMIT:
 	case BPF_PROG_TYPE_SK_SKB:
+	case BPF_PROG_TYPE_SK_MSG:
 		if (meta)
 			return meta->pkt_access;
 
@@ -2062,7 +2063,8 @@  static int check_map_func_compatibility(struct bpf_verifier_env *env,
 	case BPF_MAP_TYPE_SOCKMAP:
 		if (func_id != BPF_FUNC_sk_redirect_map &&
 		    func_id != BPF_FUNC_sock_map_update &&
-		    func_id != BPF_FUNC_map_delete_elem)
+		    func_id != BPF_FUNC_map_delete_elem &&
+		    func_id != BPF_FUNC_msg_redirect_map)
 			goto error;
 		break;
 	default:
@@ -2100,6 +2102,7 @@  static int check_map_func_compatibility(struct bpf_verifier_env *env,
 			goto error;
 		break;
 	case BPF_FUNC_sk_redirect_map:
+	case BPF_FUNC_msg_redirect_map:
 		if (map->map_type != BPF_MAP_TYPE_SOCKMAP)
 			goto error;
 		break;
diff --git a/net/core/filter.c b/net/core/filter.c
index 33edfa8..314c311 100644
--- a/net/core/filter.c
+++ b/net/core/filter.c
@@ -1890,6 +1890,44 @@  struct sock *do_sk_redirect_map(struct sk_buff *skb)
 	.arg4_type      = ARG_ANYTHING,
 };
 
+BPF_CALL_4(bpf_msg_redirect_map, struct sk_msg_buff *, msg,
+	   struct bpf_map *, map, u32, key, u64, flags)
+{
+	/* If user passes invalid input drop the packet. */
+	if (unlikely(flags))
+		return SK_DROP;
+
+	msg->key = key;
+	msg->flags = flags;
+	msg->map = map;
+
+	return SK_PASS;
+}
+
+struct sock *do_msg_redirect_map(struct sk_msg_buff *msg)
+{
+	struct sock *sk = NULL;
+
+	if (msg->map) {
+		sk = __sock_map_lookup_elem(msg->map, msg->key);
+
+		msg->key = 0;
+		msg->map = NULL;
+	}
+
+	return sk;
+}
+
+static const struct bpf_func_proto bpf_msg_redirect_map_proto = {
+	.func           = bpf_msg_redirect_map,
+	.gpl_only       = false,
+	.ret_type       = RET_INTEGER,
+	.arg1_type	= ARG_PTR_TO_CTX,
+	.arg2_type      = ARG_CONST_MAP_PTR,
+	.arg3_type      = ARG_ANYTHING,
+	.arg4_type      = ARG_ANYTHING,
+};
+
 BPF_CALL_1(bpf_get_cgroup_classid, const struct sk_buff *, skb)
 {
 	return task_get_classid(skb);
@@ -3591,6 +3629,16 @@  static unsigned long bpf_xdp_copy(void *dst_buff, const void *src_buff,
 	}
 }
 
+static const struct bpf_func_proto *sk_msg_func_proto(enum bpf_func_id func_id)
+{
+	switch (func_id) {
+	case BPF_FUNC_msg_redirect_map:
+		return &bpf_msg_redirect_map_proto;
+	default:
+		return bpf_base_func_proto(func_id);
+	}
+}
+
 static const struct bpf_func_proto *sk_skb_func_proto(enum bpf_func_id func_id)
 {
 	switch (func_id) {
@@ -3980,6 +4028,32 @@  static bool sk_skb_is_valid_access(int off, int size,
 	return bpf_skb_is_valid_access(off, size, type, info);
 }
 
+static bool sk_msg_is_valid_access(int off, int size,
+				   enum bpf_access_type type,
+				   struct bpf_insn_access_aux *info)
+{
+	if (type == BPF_WRITE)
+		return false;
+
+	switch (off) {
+	case offsetof(struct sk_msg_md, data):
+		info->reg_type = PTR_TO_PACKET;
+		break;
+	case offsetof(struct sk_msg_md, data_end):
+		info->reg_type = PTR_TO_PACKET_END;
+		break;
+	}
+
+	if (off < 0 || off >= sizeof(struct sk_msg_md))
+		return false;
+	if (off % size != 0)
+		return false;
+	if (size != sizeof(__u32))
+		return false;
+
+	return true;
+}
+
 static u32 bpf_convert_ctx_access(enum bpf_access_type type,
 				  const struct bpf_insn *si,
 				  struct bpf_insn *insn_buf,
@@ -4778,6 +4852,29 @@  static u32 sk_skb_convert_ctx_access(enum bpf_access_type type,
 	return insn - insn_buf;
 }
 
+static u32 sk_msg_convert_ctx_access(enum bpf_access_type type,
+				     const struct bpf_insn *si,
+				     struct bpf_insn *insn_buf,
+				     struct bpf_prog *prog, u32 *target_size)
+{
+	struct bpf_insn *insn = insn_buf;
+
+	switch (si->off) {
+	case offsetof(struct sk_msg_md, data):
+		*insn++ = BPF_LDX_MEM(BPF_FIELD_SIZEOF(struct sk_msg_buff, data),
+				      si->dst_reg, si->src_reg,
+				      offsetof(struct sk_msg_buff, data));
+		break;
+	case offsetof(struct sk_msg_md, data_end):
+		*insn++ = BPF_LDX_MEM(BPF_FIELD_SIZEOF(struct sk_msg_buff, data_end),
+				      si->dst_reg, si->src_reg,
+				      offsetof(struct sk_msg_buff, data_end));
+		break;
+	}
+
+	return insn - insn_buf;
+}
+
 const struct bpf_verifier_ops sk_filter_verifier_ops = {
 	.get_func_proto		= sk_filter_func_proto,
 	.is_valid_access	= sk_filter_is_valid_access,
@@ -4868,6 +4965,15 @@  static u32 sk_skb_convert_ctx_access(enum bpf_access_type type,
 const struct bpf_prog_ops sk_skb_prog_ops = {
 };
 
+const struct bpf_verifier_ops sk_msg_verifier_ops = {
+	.get_func_proto		= sk_msg_func_proto,
+	.is_valid_access	= sk_msg_is_valid_access,
+	.convert_ctx_access	= sk_msg_convert_ctx_access,
+};
+
+const struct bpf_prog_ops sk_msg_prog_ops = {
+};
+
 int sk_detach_filter(struct sock *sk)
 {
 	int ret = -ENOENT;