Patchwork TX_RING and packet mmap

login
register
mail settings
Submitter Johann Baudy
Date April 2, 2009, 7:48 p.m.
Message ID <1238701718.5669.26.camel@bender>
Download mbox | patch
Permalink /patch/25538/
State Changes Requested
Delegated to: David Miller
Headers show

Comments

Johann Baudy - April 2, 2009, 7:48 p.m.
From: Johann Baudy <johann.baudy@gnu-log.net>

New packet socket feature that makes packet socket more efficient for transmission.
- It reduces number of system call through a PACKET_TX_RING mechanism, based on PACKET_RX_RING (Circular buffer allocated in kernel space which is mmapped from user space).
- It minimizes CPU copy using fragmented SKB (almost zero copy).

Signed-off-by: Johann Baudy <johann.baudy@gnu-log.net>

--

As I start to receive few mails from people that are now using this feature, I would like to discuss with you if it is possible to integrate this patch in the kernel under CONFIG_PACKET_MMAP feature flag.
More details (example of using, patch 2.4,2.6 ...) can be found at:  http://wiki.ipxwarzone.com/index.php5?title=Linux_packet_mmap
This feature is based on PACKET_RX_RING mechanism. I've implemented a parallel mechanism for TX that can send multiple packets to device in one system call thanks to mmapped memory.
It uses skb destructor and mmapped memory (status flag) to indicate user that packet has been sent and buffer is now free for new sending. 
It also adds some abstraction functions/structures to prevent code redundancy.

 Documentation/networking/packet_mmap.txt |  139 +++++++-
 include/linux/if_packet.h                |    2 +
 net/packet/af_packet.c                   |  561 ++++++++++++++++++++++++------
 3 files changed, 579 insertions(+), 123 deletions(-)



--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Herbert Xu - April 7, 2009, 7:26 a.m.
On Thu, Apr 02, 2009 at 07:48:38PM +0000, Johann Baudy wrote:
>
> +static void tpacket_destruct_skb(struct sk_buff *skb)
> +{
> +	struct packet_sock *po = pkt_sk(skb->sk);
> +	void * ph;
> +	
> +	BUG_ON(skb == NULL);
> +	
> +	if (likely(po->tx_ring.pg_vec)) {
> +		ph = packet_lookup_frame(po, &po->tx_ring, skb->mark, TP_STATUS_COPY);

What if something modifies skb->mark after it's sent? For now the
only thing I can think of is a tc netfilter action.  So you could
also solve the problem by removing that feature since IMHO it is
pretty dodgy :)

Cheers,
jamal - April 7, 2009, 12:48 p.m.
On Tue, 2009-04-07 at 15:26 +0800, Herbert Xu wrote:

> What if something modifies skb->mark after it's sent? 
> For now the only thing I can think of is a tc netfilter action.  So you could
> also solve the problem by removing that feature since IMHO it is
> pretty dodgy :)

I see it as being non-trivial even with tc/mark. What did you
have in mind? 
If it was possible to have a different skb->cookie (other than
skb->mark, sort of "global cb") which the sender sets and 
the kernel never touches then this would be easy. The skb->destructor
can then be sure that it was the original skb in the lookup. Such a
field could serve other purposes like notify user space that the packet
has been really sent out (eg in the case of tun). But that would require
a new field on the skb.

[For a fleeting latte-moment there i thought i had a clever idea: I
wanted to say that the numeric (32/64 bit) allocated address of the skb
may be useful for the cookie but i think that could change in the skb's
kernel lifetime (skb_expand etc).]

cheers,
jamal

--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Herbert Xu - April 7, 2009, 1:04 p.m.
On Tue, Apr 07, 2009 at 08:48:40AM -0400, jamal wrote:
>
> I see it as being non-trivial even with tc/mark. What did you
> have in mind? 

Well through tc ipt actions you can invoke arbitrary netfilter
targets, including MARK --set-mark.

In fact I've never really liked tc ipt actions since it doesn't
do any sanity checking on the packet before invoking the target
while normally netfitler does quite a bit of sanity checking.

Cheers,
jamal - April 7, 2009, 1:47 p.m.
On Tue, 2009-04-07 at 21:04 +0800, Herbert Xu wrote:

> Well through tc ipt actions you can invoke arbitrary netfilter
> targets, including MARK --set-mark.

I thought you were saying this issue can be solved by 
tc/mark but you meant tc/mark is one culprit;->
Solvable by making sure tc/mark is not used in the system
you are running an app using this feature.

But the superset of what Johann wants seems to be a common 
issue: identifying _precisely_ when the skb sent out is done 
with - in his case, so he can use some field to lookup a table.
Other times i have seen people ask for it is for tun/tap 
notification in user space when a packet sent actually went out.

> In fact I've never really liked tc ipt actions since it doesn't
> do any sanity checking on the packet before invoking the target
> while normally netfitler does quite a bit of sanity checking.

more could be done in user space if you really need it.

cheers,
jamal

--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Herbert Xu - April 7, 2009, 2:17 p.m.
On Tue, Apr 07, 2009 at 09:47:47AM -0400, jamal wrote:
> On Tue, 2009-04-07 at 21:04 +0800, Herbert Xu wrote:
> 
> > Well through tc ipt actions you can invoke arbitrary netfilter
> > targets, including MARK --set-mark.
> 
> I thought you were saying this issue can be solved by 
> tc/mark but you meant tc/mark is one culprit;->
> Solvable by making sure tc/mark is not used in the system
> you are running an app using this feature.

Actually it's not just the tc ipt action.  If the target device
is a bridge then that too can invoke netfilter.  On the other hand
this is yet another feature that I've never really liked :)

Cheers,
Johann Baudy - April 7, 2009, 2:40 p.m.
Hi All,
Many thanks for your replies!

Indeed, I've tried to find a solution to store data in skb (valid
until destructor).

I've chosen this solution according to previous email on this subject:
http://lists.openwall.net/netdev/2008/11/11/93

I thought skb->mark was not used anymore at this level.
So I've used it to forward buffer index.

Do you think this solution is not acceptable?

Thanks,
Johann
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
jamal - April 7, 2009, 8:56 p.m.
On Tue, 2009-04-07 at 16:38 +0200, Johann Baudy wrote:
> Hi All, 
> Many thanks for your replies!
> 
> Indeed, I've tried to find a solution to store data in skb (valid
> until destructor).

Makes sense for what you are trying to do (or someone else a long
while back who wanted to notify user space of a sent skb).
Any skb metadata can mutate along its path. Actually even
if you used a field off the skb->data it too could be changed 
somewhere along the path before destructor is invoked.
There maybe a "hard way" to achieve your goal: use the address
of the skb to derive your index; i am not 100% sure if your destructor
will always be called (check skb_expand() for example).

cheers,
jamal

--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Evgeniy Polyakov - April 7, 2009, 9:11 p.m.
On Tue, Apr 07, 2009 at 04:56:40PM -0400, jamal (hadi@cyberus.ca) wrote:
> Makes sense for what you are trying to do (or someone else a long
> while back who wanted to notify user space of a sent skb).
> Any skb metadata can mutate along its path. Actually even
> if you used a field off the skb->data it too could be changed 
> somewhere along the path before destructor is invoked.
> There maybe a "hard way" to achieve your goal: use the address
> of the skb to derive your index; i am not 100% sure if your destructor
> will always be called (check skb_expand() for example).

It should, I actually do not see any sending path which does not invoke
original skb destructor with the new data. It does not change the fact
though, that effectively any other skb field can be modified during skb
lifecycle no matter at which level it was allocated.

Having a data pointer as an index could work though, especially it looks
promising for fragments placed in own pages.
Herbert Xu - April 8, 2009, 6:51 a.m.
On Tue, Apr 07, 2009 at 04:38:01PM +0200, Johann Baudy wrote:
>
> Do you think this solution is not acceptable?

Not unless you can figure out a way to prevent netfilter from
being run on your packet.  The two hurdles you have to overcome
are act_ipt and bridge netfilter.

Cheers,
Johann Baudy - April 8, 2009, 9:06 p.m.
When you said "skb address", did you mean skb pointer value or
fragments data address?

If fragment data address, I can use first fragment page address to
deduce header address (in destructor)
As there is a constant offset between header address and data address.

However I'm afraid of issues once it is linearized.
Is there a sending path that will lose fragments info?

Best regards,
Johann
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Johann Baudy - April 12, 2009, 10:27 a.m.
Hi Evgeniy,

> It should, I actually do not see any sending path which does not invoke
> original skb destructor with the new data. It does not change the fact
> though, that effectively any other skb field can be modified during skb
> lifecycle no matter at which level it was allocated.
> Having a data pointer as an index could work though, especially it looks
> promising for fragments placed in own pages.

Do you mean that skb_shinfo(skb)->frags[i].page will be valid until
destructor? even if linearized, cloned ....?

Thanks in advance for your help,
Johann
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Evgeniy Polyakov - April 12, 2009, 10:32 a.m.
Hi Johann.
Sorry for long reply.

On Sun, Apr 12, 2009 at 12:27:04PM +0200, Johann Baudy (johann.baudy@gnu-log.net) wrote:
> Do you mean that skb_shinfo(skb)->frags[i].page will be valid until
> destructor? even if linearized, cloned ....?

Page itself will live until destruction time, when skb is linearized
page is freed, so your own destructor will find that given page is no
longer used, and appropriate memory area can be overwritten.

When skb is cloned, page is untouched, only shared info reference
counter is increased.
Johann Baudy - April 12, 2009, 11:23 a.m.
Thanks Evgeniy,

> Page itself will live until destruction time, when skb is linearized
> page is freed, so your own destructor will find that given page is no
> longer used, and appropriate memory area can be overwritten.
Ok, so if my understanding is correct, fragment data is copied, frags
info (page, offset, size ...) of  skb_shinfo(skb) remain available and
skb->data_len is equal to 0 (to tag it as linear).
Then I can use address of skb_shinfo(n)->frags[0].page to derive my
header whatever sending path.
Is it correct?

Best regards,
Johann
Evgeniy Polyakov - April 12, 2009, 2:24 p.m.
On Sun, Apr 12, 2009 at 01:23:30PM +0200, Johann Baudy (johann.baudy@gnu-log.net) wrote:
> > Page itself will live until destruction time, when skb is linearized
> > page is freed, so your own destructor will find that given page is no
> > longer used, and appropriate memory area can be overwritten.
> Ok, so if my understanding is correct, fragment data is copied, frags
> info (page, offset, size ...) of  skb_shinfo(skb) remain available and
> skb->data_len is equal to 0 (to tag it as linear).
> Then I can use address of skb_shinfo(n)->frags[0].page to derive my
> header whatever sending path.

No, I was wrong, linearization may silently put pages without calling a
destructor.
Johann Baudy - April 12, 2009, 7:27 p.m.
I've seen that during skb_linearize(), skb_shinfo(skb)->frags[i].page
is not lost even if pages has been put. So I can get back original
frag address from this info in skb destructor. Can I rely on
skb_shinfo(skb)->frags[i].page on more complicated sending paths ... ?

Thanks in advance,
Johann





On Sun, Apr 12, 2009 at 4:24 PM, Evgeniy Polyakov <zbr@ioremap.net> wrote:
> On Sun, Apr 12, 2009 at 01:23:30PM +0200, Johann Baudy (johann.baudy@gnu-log.net) wrote:
>> > Page itself will live until destruction time, when skb is linearized
>> > page is freed, so your own destructor will find that given page is no
>> > longer used, and appropriate memory area can be overwritten.
>> Ok, so if my understanding is correct, fragment data is copied, frags
>> info (page, offset, size ...) of  skb_shinfo(skb) remain available and
>> skb->data_len is equal to 0 (to tag it as linear).
>> Then I can use address of skb_shinfo(n)->frags[0].page to derive my
>> header whatever sending path.
>
> No, I was wrong, linearization may silently put pages without calling a
> destructor.
>
> --
>        Evgeniy Polyakov
>
Evgeniy Polyakov - April 12, 2009, 7:52 p.m.
On Sun, Apr 12, 2009 at 09:27:37PM +0200, Johann Baudy (johann.baudy@gnu-log.net) wrote:
> I've seen that during skb_linearize(), skb_shinfo(skb)->frags[i].page
> is not lost even if pages has been put. So I can get back original
> frag address from this info in skb destructor. Can I rely on
> skb_shinfo(skb)->frags[i].page on more complicated sending paths ... ?

skb_shinfo(skb)->frags[i] can be overwritten, for example in
__pskb_pull_tail() at pull_pages: label.
Johann Baudy - April 12, 2009, 8:30 p.m.
Where do you see that skb_shinfo(skb)->frags[i].page (not other
fields) can be overwritten ?

Thanks,
Johann

On Sun, Apr 12, 2009 at 9:52 PM, Evgeniy Polyakov <zbr@ioremap.net> wrote:
> On Sun, Apr 12, 2009 at 09:27:37PM +0200, Johann Baudy (johann.baudy@gnu-log.net) wrote:
>> I've seen that during skb_linearize(), skb_shinfo(skb)->frags[i].page
>> is not lost even if pages has been put. So I can get back original
>> frag address from this info in skb destructor. Can I rely on
>> skb_shinfo(skb)->frags[i].page on more complicated sending paths ... ?
>
> skb_shinfo(skb)->frags[i] can be overwritten, for example in
> __pskb_pull_tail() at pull_pages: label.
>
> --
>        Evgeniy Polyakov
>
Evgeniy Polyakov - April 12, 2009, 8:53 p.m.
On Sun, Apr 12, 2009 at 10:30:34PM +0200, Johann Baudy (johann.baudy@gnu-log.net) wrote:
> Where do you see that skb_shinfo(skb)->frags[i].page (not other
> fields) can be overwritten ?

pull_pages:
	eat = delta;
	k = 0;
	for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
		if (skb_shinfo(skb)->frags[i].size <= eat) {
			put_page(skb_shinfo(skb)->frags[i].page);
			eat -= skb_shinfo(skb)->frags[i].size;
		} else {
			skb_shinfo(skb)->frags[k] = skb_shinfo(skb)->frags[i];
Johann Baudy - April 12, 2009, 11:31 p.m.
Thanks Evgeniy,

So if my understanding is correct, there is no way to get original
fragment address in destructor using skb fragment page/data.

Then,
I can't use fragments due to skb_linearize
I can't hide pointer into skb data due to skb_copy.
I can't rely on other fields of skb.
IMHO, using skb pointer requires too much cpu resources (parsing
headers to identify the right buffer...)

So what can I do except using a new field?
What do you think about adding a new field that is always linked to destructor?
I mean adding a generic new field skb->destructor_arg.

Currently if someone want to change destructor, it stores old
destructor before substitution; and executes it at the end of new
destructor. (ex: dev_gso_skb_destructor(struct sk_buff *skb))

Can we just add same mechanism for a new argument?
If someone needs destructor_arg, it saves the old value somewhere; and
restores it before calling old destructor (in the new destructor).
This way everybody can forward data to his destructor properly.

 Is it conceivable?

Thanks for your help,
Johann
Evgeniy Polyakov - April 15, 2009, 7:10 a.m.
Hi Johann.

On Mon, Apr 13, 2009 at 01:31:14AM +0200, Johann Baudy (johann.baudy@gnu-log.net) wrote:
> So if my understanding is correct, there is no way to get original
> fragment address in destructor using skb fragment page/data.
> 
> Then,
> I can't use fragments due to skb_linearize
> I can't hide pointer into skb data due to skb_copy.
> I can't rely on other fields of skb.
> IMHO, using skb pointer requires too much cpu resources (parsing
> headers to identify the right buffer...)
> 
> So what can I do except using a new field?
> What do you think about adding a new field that is always linked to destructor?
> I mean adding a generic new field skb->destructor_arg.

It can work but do not place it into skb itself, but into shared area,
which is slab allocated and does not suffer from size restrictions.
You can also add shared-info destructor, which could be used by other
Johann Baudy - April 15, 2009, 1:14 p.m.
Thanks for your reply, Evgeniy,

> It can work but do not place it into skb itself, but into shared area,
> which is slab allocated and does not suffer from size restrictions.
Ok

> You can also add shared-info destructor, which could be used by other
Kind of:
struct skb_shared_info {
         atomic_t        dataref;
....
+void                    (*destructor)(struct sk_buff *skb, void *
destructor_arg);
+void * destructor_arg;
};

Should I call this new destructor from skb_release_data() before
kfree(skb->head) ?
or in  skb_release_head_state()? close to current destructor call

Best regards,
Johann
Evgeniy Polyakov - April 16, 2009, 11:16 a.m.
On Wed, Apr 15, 2009 at 03:14:22PM +0200, Johann Baudy (johann.baudy@gnu-log.net) wrote:
> +void                    (*destructor)(struct sk_buff *skb, void *
> destructor_arg);
> +void * destructor_arg;
> };
> 
> Should I call this new destructor from skb_release_data() before
> kfree(skb->head) ?
> or in  skb_release_head_state()? close to current destructor call

I think it should only be called when shared area is about to be freed
and not when attached skb is freed. Thus you can put your pages into
fraglist and reuse them after the last reference to the shared area is
gone.
Johann Baudy - April 18, 2009, 9:38 p.m.
> I think it should only be called when shared area is about to be freed
> and not when attached skb is freed.
During skb_linearize(). shared info is memcopied and freed in
pskb_expand_head() if not enough memory is available in current head.
Then, Do you expect this new destructor to be called twice? (during
skb_linearize() and kfree_skb())

>Thus you can put your pages into
> fraglist and reuse them after the last reference to the shared area is
> gone.

If my understanding is correct, skb can be linearized without calling
skb_release_data() (if enough space is available in head). Hence, In
this case, I will not have access to original frags[] from
skb_shared_info destructor.

Thanks again,
Johann
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Evgeniy Polyakov - April 21, 2009, 9:41 a.m.
Hi Johann.

On Sat, Apr 18, 2009 at 11:38:48PM +0200, Johann Baudy (johann.baudy@gnu-log.net) wrote:
> > I think it should only be called when shared area is about to be freed
> > and not when attached skb is freed.
> During skb_linearize(). shared info is memcopied and freed in
> pskb_expand_head() if not enough memory is available in current head.
> Then, Do you expect this new destructor to be called twice? (during
> skb_linearize() and kfree_skb())

It cals skb_release_data() which should invoke shared area destructor
before being freed.

> >Thus you can put your pages into
> > fraglist and reuse them after the last reference to the shared area is
> > gone.
> 
> If my understanding is correct, skb can be linearized without calling
> skb_release_data() (if enough space is available in head). Hence, In
> this case, I will not have access to original frags[] from
> skb_shared_info destructor.

Yes, that's right.
Johann Baudy - April 21, 2009, 1:16 p.m.
Hi Evgeniy,
> It cals skb_release_data() which should invoke shared area destructor
> before being freed.

Yes but in this case, it will be called twice, as shared info (that
contains destructor) is memcopied to new expanded head.
shared info destructor will be excuted two times on "same" data
(during skb_expand_head() and during kfree_skb())
Actually, I want to know if this behavior is intended with a new
shared info destructor.

Thanks
Evgeniy Polyakov - April 21, 2009, 1:56 p.m.
On Tue, Apr 21, 2009 at 03:16:49PM +0200, Johann Baudy (johann.baudy@gnu-log.net) wrote:
> Yes but in this case, it will be called twice, as shared info (that
> contains destructor) is memcopied to new expanded head.
> shared info destructor will be excuted two times on "same" data
> (during skb_expand_head() and during kfree_skb())
> Actually, I want to know if this behavior is intended with a new
> shared info destructor.

It depends on your task... You can always store a pointer in the
tree/hash and check it in the destructor, you can play some games in the
expand helper not to call destructor: like messing with the reference
counter and reusing the old area, or silently freeing old area without
destruction invocation (presumably with some new helper).

Moreover you can allocate skb so that no matter what but it could not be
reused by the underlying layers, so it could be fully copied. In this
case usual destructor is enough.
Christoph Lameter - April 21, 2009, 3:36 p.m.
On Thu, 2 Apr 2009, Johann Baudy wrote:

> +++ Transmission process
> +Those defines are also used for transmission:
> +
> +     #define TP_STATUS_KERNEL        0 // Frame is available
> +     #define TP_STATUS_USER          1 // Frame will be sent on next send()
> +     #define TP_STATUS_COPY          2 // Frame is currently in transmission
> +
> +First, the kernel initializes all frames to TP_STATUS_KERNEL. To send a packet,
> +the user fills a data buffer of an available frame, sets tp_len to current
> +data buffer size and sets its status field to TP_STATUS_USER. This can be done
> +on multiple frames. Once the user is ready to transmit, it calls send().
> +Then all buffers with status equal to TP_STATUS_USER are forwarded to the
> +network device. The kernel updates each status of sent frames with
> +TP_STATUS_COPY until the end of transfer.
> +At the end of each transfer, buffer status returns to TP_STATUS_KERNEL.

Could you clean then states up a bit to reflect what they actually mean?

TP_STATUS_AVAILABLE		=> Frame is available
TP_STATUS_SEND_REQUEST		=> Frame waits for sending
TP_STATUS_SENDING		=> Frame is being sent.


Also can you ensure that send() continues to send if I concurrently set
the status to TP_STATUS_SEND_REQUEST from another thread? How it is
serialized anyways? Status is an atomic value? Or do you rely on status
only being modified while send() is running?


--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Johann Baudy - April 21, 2009, 8:13 p.m.
Hi Christoph,

> Could you clean then states up a bit to reflect what they actually mean?
>
> TP_STATUS_AVAILABLE             => Frame is available
> TP_STATUS_SEND_REQUEST          => Frame waits for sending
> TP_STATUS_SENDING               => Frame is being sent.

Ok

> Also can you ensure that send() continues to send if I concurrently set
> the status to TP_STATUS_SEND_REQUEST from another thread? How it is
> serialized anyways? Status is an atomic value? Or do you rely on status
> only being modified while send() is running?

TP_STATUS_KERNEL => TP_STATUS_SEND_REQUEST: only performed by user.
TP_STATUS_SEND_REQUEST => TP_STATUS_SENDING  only performed by kernel
TP_STATUS_SENDING => TP_STATUS_KERNEL  only performed by kernel.

Only one thread is allowed to change status values from user space.
This way, you can take advantage of smp. One thread is filling the
buffer changing status  from TP_STATUS_KERNEL to
TP_STATUS_SEND_REQUEST , another is calling send() in loop
(MSG_DONTBLOCK option can be used).

You can also perform filling and send() sequentially

An example can be found at:
http://wiki.gnu-log.net/index.php5?title=Linux_packet_mmap
It can customize almost all parameters, use multi_thread, use DGRAM ...

Thanks,
Johann
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Christoph Lameter - April 21, 2009, 8:42 p.m.
On Tue, 21 Apr 2009, Johann Baudy wrote:

> > Also can you ensure that send() continues to send if I concurrently set
> > the status to TP_STATUS_SEND_REQUEST from another thread? How it is
> > serialized anyways? Status is an atomic value? Or do you rely on status
> > only being modified while send() is running?
>
> TP_STATUS_KERNEL => TP_STATUS_SEND_REQUEST: only performed by user.
> TP_STATUS_SEND_REQUEST => TP_STATUS_SENDING  only performed by kernel
> TP_STATUS_SENDING => TP_STATUS_KERNEL  only performed by kernel.
>
> Only one thread is allowed to change status values from user space.

Duh. So I cannot concurrently operate with multiple threads on the
structure.

The kernel synchronizes with itself via the socket?

> This way, you can take advantage of smp. One thread is filling the
> buffer changing status  from TP_STATUS_KERNEL to
> TP_STATUS_SEND_REQUEST , another is calling send() in loop
> (MSG_DONTBLOCK option can be used).

Ah ok.

--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Christoph Lameter - April 21, 2009, 8:43 p.m.
On Tue, 21 Apr 2009, Johann Baudy wrote:

> >> Could you clean then states up a bit to reflect what they actually mean?
> >>
> >> TP_STATUS_AVAILABLE             => Frame is available
> >> TP_STATUS_SEND_REQUEST          => Frame waits for sending
> >> TP_STATUS_SENDING               => Frame is being sent.
> >
> > Ok
>
> Do you suggest to replace previous TP_STATUS_KERNEL with TP_STATUS_AVAILABLE?
> or Add a new status?

Well yes. But I guess you know best what to call them. Just make them
more decriptive of their function.
Johann Baudy - April 21, 2009, 8:46 p.m.
>
>> Could you clean then states up a bit to reflect what they actually mean?
>>
>> TP_STATUS_AVAILABLE             => Frame is available
>> TP_STATUS_SEND_REQUEST          => Frame waits for sending
>> TP_STATUS_SENDING               => Frame is being sent.
>
> Ok

Do you suggest to replace previous TP_STATUS_KERNEL with TP_STATUS_AVAILABLE?
or Add a new status?

Thanks in advance,
Johann
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Johann Baudy - April 21, 2009, 9 p.m.
> Duh. So I cannot concurrently operate with multiple threads on the
> structure.
Except if you can ensure that multiple threads will not use the same buffer.

> The kernel synchronizes with itself via the socket?
Not sure to understand?
Both below rules + cache/mem flushing synchronize User and Kernel spaces
TP_STATUS_KERNEL => TP_STATUS_SEND_REQUEST: only performed by user.
TP_STATUS_SENDING => TP_STATUS_KERNEL  only performed by kernel.
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Christoph Lameter - April 21, 2009, 9 p.m.
On Tue, 21 Apr 2009, Johann Baudy wrote:

> > Duh. So I cannot concurrently operate with multiple threads on the
> > structure.
> Except if you can ensure that multiple threads will not use the same buffer.

Hmmm... Ok.

> > The kernel synchronizes with itself via the socket?
> Not sure to understand?

Doesnt the kernel need some way to synchronize the two
transitions made by the kernel? From "send request" to "sending" and then
to "available"? Otherwise the kernel may set the request to "available"
before the change from "send request" to "sending" is complete.
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Johann Baudy - April 21, 2009, 9:05 p.m.
> Well yes. But I guess you know best what to call them. Just make them
> more decriptive of their function.

Yes, I understand, but I'm just afraid of backward compatibility with
libraries that are currently using packet_mmap for RX ring. If I
rename those TP_STATUS_KERNEL into TP_STATUS_AVAILABLE. I will break
this compatibility.
(for compilation).
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Christoph Lameter - April 21, 2009, 9:08 p.m.
On Tue, 21 Apr 2009, Johann Baudy wrote:

> > Well yes. But I guess you know best what to call them. Just make them
> > more decriptive of their function.
>
> Yes, I understand, but I'm just afraid of backward compatibility with
> libraries that are currently using packet_mmap for RX ring. If I
> rename those TP_STATUS_KERNEL into TP_STATUS_AVAILABLE. I will break
> this compatibility.
> (for compilation).

You can do

#define TP_STATUS_AVAILABLE TP_STATUS_KERNEL

--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Johann Baudy - April 21, 2009, 9:13 p.m.
> Doesnt the kernel need some way to synchronize the two
> transitions made by the kernel? From "send request" to "sending" and then
> to "available"? Otherwise the kernel may set the request to "available"
> before the change from "send request" to "sending" is complete.

This synchronization is performed through skb mechanism.
"send request" to "sending" is done before submitting skb to device.
"sending" to "available" will be performed in skb destructor.
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Patch

diff --git a/Documentation/networking/packet_mmap.txt b/Documentation/networking/packet_mmap.txt
index 07c53d5..43e4a49 100644
--- a/Documentation/networking/packet_mmap.txt
+++ b/Documentation/networking/packet_mmap.txt
@@ -4,16 +4,18 @@ 
 
 This file documents the CONFIG_PACKET_MMAP option available with the PACKET
 socket interface on 2.4 and 2.6 kernels. This type of sockets is used for 
-capture network traffic with utilities like tcpdump or any other that uses 
-the libpcap library. 
-
-You can find the latest version of this document at
+capture network traffic with utilities like tcpdump or any other that needs
+raw access to network interface.
 
+You can find the latest version of this document at:
     http://pusa.uv.es/~ulisses/packet_mmap/
 
-Please send me your comments to
+Howto can be found at:
+    http://wiki.gnu-log.net (packet_mmap)
 
+Please send your comments to
     Ulisses Alonso Camaró <uaca@i.hate.spam.alumni.uv.es>
+    Johann Baudy <johann.baudy@gnu-log.net>
 
 -------------------------------------------------------------------------------
 + Why use PACKET_MMAP
@@ -25,19 +27,24 @@  to capture each packet, it requires two if you want to get packet's
 timestamp (like libpcap always does).
 
 In the other hand PACKET_MMAP is very efficient. PACKET_MMAP provides a size 
-configurable circular buffer mapped in user space. This way reading packets just 
-needs to wait for them, most of the time there is no need to issue a single 
-system call. By using a shared buffer between the kernel and the user 
-also has the benefit of minimizing packet copies.
-
-It's fine to use PACKET_MMAP to improve the performance of the capture process, 
-but it isn't everything. At least, if you are capturing at high speeds (this 
-is relative to the cpu speed), you should check if the device driver of your 
-network interface card supports some sort of interrupt load mitigation or 
-(even better) if it supports NAPI, also make sure it is enabled.
+configurable circular buffer mapped in user space that can be used to either
+send or receive packets. This way reading packets just needs to wait for them,
+most of the time there is no need to issue a single system call. Concerning
+transmission, multiple packets can be sent through one system call to get the
+highest bandwidth.
+By using a shared buffer between the kernel and the user also has the benefit
+of minimizing packet copies.
+
+It's fine to use PACKET_MMAP to improve the performance of the capture and
+transmission process, but it isn't everything. At least, if you are capturing
+at high speeds (this is relative to the cpu speed), you should check if the
+device driver of your network interface card supports some sort of interrupt
+load mitigation or (even better) if it supports NAPI, also make sure it is
+enabled. For transmission, check the MTU (Maximum Transmission Unit) used and
+supported by devices of your network.
 
 --------------------------------------------------------------------------------
-+ How to use CONFIG_PACKET_MMAP
++ How to use CONFIG_PACKET_MMAP to improve capture process
 --------------------------------------------------------------------------------
 
 From the user standpoint, you should use the higher level libpcap library, which
@@ -57,7 +64,7 @@  the low level details or want to improve libpcap by including PACKET_MMAP
 support.
 
 --------------------------------------------------------------------------------
-+ How to use CONFIG_PACKET_MMAP directly
++ How to use CONFIG_PACKET_MMAP directly to improve capture process
 --------------------------------------------------------------------------------
 
 From the system calls stand point, the use of PACKET_MMAP involves
@@ -66,6 +73,7 @@  the following process:
 
 [setup]     socket() -------> creation of the capture socket
             setsockopt() ---> allocation of the circular buffer (ring)
+                              option: PACKET_RX_RING
             mmap() ---------> mapping of the allocated buffer to the
                               user process
 
@@ -97,13 +105,75 @@  also the mapping of the circular buffer in the user process and
 the use of this buffer.
 
 --------------------------------------------------------------------------------
++ How to use CONFIG_PACKET_MMAP directly to improve transmission process
+--------------------------------------------------------------------------------
+Transmission process is similar to capture as shown below.
+
+[setup]          socket() -------> creation of the transmission socket
+                 setsockopt() ---> allocation of the circular buffer (ring)
+                                   option: PACKET_TX_RING
+                 bind() ---------> bind transmission socket with a network interface
+                 mmap() ---------> mapping of the allocated buffer to the
+                                   user process
+
+[transmission]   poll() ---------> wait for free packets (optional)
+                 send() ---------> send all packets that are set as ready in
+                                   the ring
+                                   The flag MSG_DONTWAIT can be used to return
+                                   before end of transfer.
+
+[shutdown]  close() --------> destruction of the transmission socket and
+                              deallocation of all associated resources.
+
+Binding the socket to your network interface is mandatory (with zero copy) to
+know the header size of frames used in the circular buffer.
+
+As capture, each frame contains two parts:
+
+ --------------------
+| struct tpacket_hdr | Header. It contains the status of
+|                    | of this frame
+|--------------------|
+| data buffer        |
+.                    .  Data that will be sent over the network interface.
+.                    .
+ --------------------
+
+ bind() associates the socket to your network interface thanks to
+ sll_ifindex parameter of struct sockaddr_ll.
+
+ Initialization example:
+
+ struct sockaddr_ll my_addr;
+ struct ifreq s_ifr;
+ ...
+
+ strncpy (s_ifr.ifr_name, "eth0", sizeof(s_ifr.ifr_name));
+
+ /* get interface index of eth0 */
+ ioctl(this->socket, SIOCGIFINDEX, &s_ifr);
+
+ /* fill sockaddr_ll struct to prepare binding */
+ my_addr.sll_family = AF_PACKET;
+ my_addr.sll_protocol = ETH_P_ALL;
+ my_addr.sll_ifindex =  s_ifr.ifr_ifindex;
+
+ /* bind socket to eth0 */
+ bind(this->socket, (struct sockaddr *)&my_addr, sizeof(struct sockaddr_ll));
+
+ A complete tutorial is available at: http://wiki.gnu-log.net/
+
+--------------------------------------------------------------------------------
 + PACKET_MMAP settings
 --------------------------------------------------------------------------------
 
 
 To setup PACKET_MMAP from user level code is done with a call like
 
+ - Capture process
      setsockopt(fd, SOL_PACKET, PACKET_RX_RING, (void *) &req, sizeof(req))
+ - Transmission process
+     setsockopt(fd, SOL_PACKET, PACKET_TX_RING, (void *) &req, sizeof(req))
 
 The most significant argument in the previous call is the req parameter, 
 this parameter must to have the following structure:
@@ -117,11 +187,11 @@  this parameter must to have the following structure:
     };
 
 This structure is defined in /usr/include/linux/if_packet.h and establishes a 
-circular buffer (ring) of unswappable memory mapped in the capture process. 
+circular buffer (ring) of unswappable memory.
 Being mapped in the capture process allows reading the captured frames and 
 related meta-information like timestamps without requiring a system call.
 
-Captured frames are grouped in blocks. Each block is a physically contiguous 
+Frames are grouped in blocks. Each block is a physically contiguous
 region of memory and holds tp_block_size/tp_frame_size frames. The total number 
 of blocks is tp_block_nr. Note that tp_frame_nr is a redundant parameter because
 
@@ -336,6 +406,7 @@  struct tpacket_hdr). If this field is 0 means that the frame is ready
 to be used for the kernel, If not, there is a frame the user can read 
 and the following flags apply:
 
++++ Capture process:
      from include/linux/if_packet.h
 
      #define TP_STATUS_COPY          2 
@@ -391,6 +462,36 @@  packets are in the ring:
 It doesn't incur in a race condition to first check the status value and 
 then poll for frames.
 
+
+++ Transmission process
+Those defines are also used for transmission:
+
+     #define TP_STATUS_KERNEL        0 // Frame is available
+     #define TP_STATUS_USER          1 // Frame will be sent on next send()
+     #define TP_STATUS_COPY          2 // Frame is currently in transmission
+
+First, the kernel initializes all frames to TP_STATUS_KERNEL. To send a packet,
+the user fills a data buffer of an available frame, sets tp_len to current
+data buffer size and sets its status field to TP_STATUS_USER. This can be done
+on multiple frames. Once the user is ready to transmit, it calls send().
+Then all buffers with status equal to TP_STATUS_USER are forwarded to the
+network device. The kernel updates each status of sent frames with
+TP_STATUS_COPY until the end of transfer.
+At the end of each transfer, buffer status returns to TP_STATUS_KERNEL.
+
+    header->tp_len = in_i_size;
+    header->tp_status = TP_STATUS_USER;
+    retval = send(this->socket, NULL, 0, 0);
+
+The user can also use poll() to check if a buffer is available:
+(status == TP_STATUS_KERNEL)
+
+    struct pollfd pfd;
+    pfd.fd = fd;
+    pfd.revents = 0;
+    pfd.events = POLLOUT;
+    retval = poll(&pfd, 1, timeout);
+
 --------------------------------------------------------------------------------
 + THANKS
 --------------------------------------------------------------------------------
diff --git a/include/linux/if_packet.h b/include/linux/if_packet.h
index 18db066..a5d2f9a 100644
--- a/include/linux/if_packet.h
+++ b/include/linux/if_packet.h
@@ -46,6 +46,8 @@  struct sockaddr_ll
 #define PACKET_VERSION			10
 #define PACKET_HDRLEN			11
 #define PACKET_RESERVE			12
+#define PACKET_TX_RING			13
+#define PACKET_LOSS			14
 
 struct tpacket_stats
 {
diff --git a/net/packet/af_packet.c b/net/packet/af_packet.c
index 1fc4a78..e239696 100644
--- a/net/packet/af_packet.c
+++ b/net/packet/af_packet.c
@@ -157,7 +157,25 @@  struct packet_mreq_max
 };
 
 #ifdef CONFIG_PACKET_MMAP
-static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing);
+static int packet_set_ring(struct sock *sk, struct tpacket_req *req,
+		int closing, int tx_ring);
+
+struct packet_ring_buffer {
+	char *			*pg_vec;
+	unsigned int		head;
+	unsigned int		frames_per_block;
+	unsigned int		frame_size;
+	unsigned int		frame_max;
+
+	unsigned int		pg_vec_order;
+	unsigned int		pg_vec_pages;
+	unsigned int		pg_vec_len;
+
+	atomic_t		pending;
+};
+
+struct packet_sock;
+static int tpacket_snd(struct packet_sock *po, struct msghdr *msg);
 #endif
 
 static void packet_flush_mclist(struct sock *sk);
@@ -167,11 +185,8 @@  struct packet_sock {
 	struct sock		sk;
 	struct tpacket_stats	stats;
 #ifdef CONFIG_PACKET_MMAP
-	char *			*pg_vec;
-	unsigned int		head;
-	unsigned int            frames_per_block;
-	unsigned int		frame_size;
-	unsigned int		frame_max;
+	struct packet_ring_buffer	rx_ring;
+	struct packet_ring_buffer	tx_ring;
 	int			copy_thresh;
 #endif
 	struct packet_type	prot_hook;
@@ -185,12 +200,10 @@  struct packet_sock {
 	struct packet_mclist	*mclist;
 #ifdef CONFIG_PACKET_MMAP
 	atomic_t		mapped;
-	unsigned int            pg_vec_order;
-	unsigned int		pg_vec_pages;
-	unsigned int		pg_vec_len;
 	enum tpacket_versions	tp_version;
 	unsigned int		tp_hdrlen;
 	unsigned int		tp_reserve;
+	unsigned int		tp_loss:1;
 #endif
 };
 
@@ -206,36 +219,33 @@  struct packet_skb_cb {
 
 #ifdef CONFIG_PACKET_MMAP
 
-static void *packet_lookup_frame(struct packet_sock *po, unsigned int position,
-				 int status)
+static void __packet_set_status(struct packet_sock *po, void *frame, int status)
 {
-	unsigned int pg_vec_pos, frame_offset;
 	union {
 		struct tpacket_hdr *h1;
 		struct tpacket2_hdr *h2;
 		void *raw;
 	} h;
 
-	pg_vec_pos = position / po->frames_per_block;
-	frame_offset = position % po->frames_per_block;
-
-	h.raw = po->pg_vec[pg_vec_pos] + (frame_offset * po->frame_size);
+	h.raw = frame;
 	switch (po->tp_version) {
 	case TPACKET_V1:
-		if (status != (h.h1->tp_status ? TP_STATUS_USER :
-						TP_STATUS_KERNEL))
-			return NULL;
+		h.h1->tp_status = status;
+		flush_dcache_page(virt_to_page(&h.h1->tp_status));
 		break;
 	case TPACKET_V2:
-		if (status != (h.h2->tp_status ? TP_STATUS_USER :
-						TP_STATUS_KERNEL))
-			return NULL;
+		h.h2->tp_status = status;
+		flush_dcache_page(virt_to_page(&h.h2->tp_status));
 		break;
+	default:
+		printk(KERN_ERR "TPACKET version not supported\n");
+		BUG();
 	}
-	return h.raw;
+	
+	smp_wmb();
 }
 
-static void __packet_set_status(struct packet_sock *po, void *frame, int status)
+static int __packet_get_status(struct packet_sock *po, void *frame)
 {
 	union {
 		struct tpacket_hdr *h1;
@@ -243,16 +253,73 @@  static void __packet_set_status(struct packet_sock *po, void *frame, int status)
 		void *raw;
 	} h;
 
+	smp_rmb();
+
 	h.raw = frame;
 	switch (po->tp_version) {
 	case TPACKET_V1:
-		h.h1->tp_status = status;
-		break;
+		flush_dcache_page(virt_to_page(&h.h1->tp_status));
+		return h.h1->tp_status;
 	case TPACKET_V2:
-		h.h2->tp_status = status;
-		break;
+		flush_dcache_page(virt_to_page(&h.h2->tp_status));
+		return h.h2->tp_status;
+	default:
+		printk(KERN_ERR "TPACKET version not supported\n");
+		BUG();
+		return 0;
 	}
 }
+
+static void *packet_lookup_frame(struct packet_sock *po,
+		struct packet_ring_buffer *rb,
+		unsigned int position,
+		int status)
+{
+	unsigned int pg_vec_pos, frame_offset;
+	union {
+		struct tpacket_hdr *h1;
+		struct tpacket2_hdr *h2;
+		void *raw;
+	} h;
+
+	pg_vec_pos = position / rb->frames_per_block;
+	frame_offset = position % rb->frames_per_block;
+
+	h.raw = rb->pg_vec[pg_vec_pos] + (frame_offset * rb->frame_size);
+
+	if( status != __packet_get_status(po, h.raw) )
+		return NULL;
+
+	return h.raw;
+}
+
+static inline void *packet_current_rx_frame(struct packet_sock *po, int status)
+{
+	return packet_lookup_frame(po, &po->rx_ring, po->rx_ring.head, status);
+}
+
+static inline void *packet_current_tx_frame(struct packet_sock *po, int status)
+{
+	return packet_lookup_frame(po, &po->tx_ring, po->tx_ring.head, status);
+}
+
+static inline void *packet_previous_rx_frame(struct packet_sock *po, int status)
+{
+	unsigned int previous = po->rx_ring.head ? po->rx_ring.head - 1 : po->rx_ring.frame_max;
+	return packet_lookup_frame(po, &po->rx_ring, previous, status);
+}
+
+static inline void *packet_previous_tx_frame(struct packet_sock *po, int status)
+{
+	unsigned int previous = po->tx_ring.head ? po->tx_ring.head - 1 : po->tx_ring.frame_max;
+	return packet_lookup_frame(po, &po->tx_ring, previous, status);
+}
+
+static inline void packet_increment_head(struct packet_ring_buffer *buff)
+{
+	buff->head = buff->head != buff->frame_max ? buff->head+1 : 0;
+}
+
 #endif
 
 static inline struct packet_sock *pkt_sk(struct sock *sk)
@@ -648,7 +715,7 @@  static int tpacket_rcv(struct sk_buff *skb, struct net_device *dev, struct packe
 		macoff = netoff - maclen;
 	}
 
-	if (macoff + snaplen > po->frame_size) {
+	if (macoff + snaplen > po->rx_ring.frame_size) {
 		if (po->copy_thresh &&
 		    atomic_read(&sk->sk_rmem_alloc) + skb->truesize <
 		    (unsigned)sk->sk_rcvbuf) {
@@ -661,16 +728,16 @@  static int tpacket_rcv(struct sk_buff *skb, struct net_device *dev, struct packe
 			if (copy_skb)
 				skb_set_owner_r(copy_skb, sk);
 		}
-		snaplen = po->frame_size - macoff;
+		snaplen = po->rx_ring.frame_size - macoff;
 		if ((int)snaplen < 0)
 			snaplen = 0;
 	}
 
 	spin_lock(&sk->sk_receive_queue.lock);
-	h.raw = packet_lookup_frame(po, po->head, TP_STATUS_KERNEL);
+	h.raw = packet_current_rx_frame(po, TP_STATUS_KERNEL);
 	if (!h.raw)
 		goto ring_is_full;
-	po->head = po->head != po->frame_max ? po->head+1 : 0;
+	packet_increment_head(&po->rx_ring);
 	po->stats.tp_packets++;
 	if (copy_skb) {
 		status |= TP_STATUS_COPY;
@@ -727,7 +794,6 @@  static int tpacket_rcv(struct sk_buff *skb, struct net_device *dev, struct packe
 
 	__packet_set_status(po, h.raw, status);
 	smp_mb();
-
 	{
 		struct page *p_start, *p_end;
 		u8 *h_end = h.raw + macoff + snaplen - 1;
@@ -761,10 +827,238 @@  ring_is_full:
 	goto drop_n_restore;
 }
 
-#endif
+static void tpacket_destruct_skb(struct sk_buff *skb)
+{
+	struct packet_sock *po = pkt_sk(skb->sk);
+	void * ph;
+	
+	BUG_ON(skb == NULL);
+	
+	if (likely(po->tx_ring.pg_vec)) {
+		ph = packet_lookup_frame(po, &po->tx_ring, skb->mark, TP_STATUS_COPY);
+		BUG_ON(atomic_read(&po->tx_ring.pending) == 0);
+		atomic_dec(&po->tx_ring.pending);
+		__packet_set_status(po, ph, TP_STATUS_KERNEL);
+	}
+	
+	sock_wfree(skb);
+}
+
+static int tpacket_fill_skb(struct packet_sock *po, struct sk_buff * skb, void * frame,
+		struct net_device *dev, int size_max, __be16 proto,
+		unsigned char * addr)
+{
+	union {
+		struct tpacket_hdr *h1;
+		struct tpacket2_hdr *h2;
+		void *raw;
+	} ph;
+	int to_write, offset, len, tp_len, nr_frags, len_max;
+	struct socket *sock = po->sk.sk_socket;
+	struct page *page;
+	void *data;
+	int err;
 
+	ph.raw = frame;
 
-static int packet_sendmsg(struct kiocb *iocb, struct socket *sock,
+	skb->protocol = proto;
+	skb->dev = dev;
+	skb->priority = po->sk.sk_priority;
+	skb->mark = po->tx_ring.head;
+
+	switch(po->tp_version) {
+	case TPACKET_V2:
+		tp_len = ph.h2->tp_len;
+		break;
+	default:
+		tp_len = ph.h1->tp_len;
+		break;
+	}
+	if (unlikely(tp_len > size_max)) {
+		printk(KERN_ERR "packet size is too long (%d > %d)\n",
+				tp_len, size_max);
+		return -EMSGSIZE;
+	}
+
+	skb_reserve(skb, LL_RESERVED_SPACE(dev));
+	skb_reset_network_header(skb);
+
+	data = ph.raw + po->tp_hdrlen - sizeof(struct sockaddr_ll);
+	to_write = tp_len;
+
+	if (sock->type == SOCK_DGRAM) {
+		err = dev_hard_header(skb, dev, ntohs(proto), addr,
+				NULL, tp_len);
+		if (unlikely(err < 0))
+			return -EINVAL;
+	} else if (dev->hard_header_len ) {
+		/* net device doesn't like empty head */
+		if(unlikely(tp_len <= dev->hard_header_len)) {
+			printk(KERN_ERR "packet size is too short "
+					"(%d < %d)\n", tp_len,
+					dev->hard_header_len);
+			return -EINVAL;
+		}
+
+		skb_push(skb, dev->hard_header_len);
+		err = skb_store_bits(skb, 0, data,
+				dev->hard_header_len);
+		if (unlikely(err))
+			return err;
+
+		data += dev->hard_header_len;
+		to_write -= dev->hard_header_len;
+	}
+
+	err = -EFAULT;
+	page = virt_to_page(data);
+	offset = offset_in_page(data);
+	len_max = PAGE_SIZE - offset;
+	len = ((to_write > len_max) ? len_max : to_write);
+
+	skb->data_len = to_write;
+	skb->len += to_write;
+	skb->truesize += to_write;
+	atomic_add(to_write, &po->sk.sk_wmem_alloc);
+
+	while ( likely(to_write) ) {
+		nr_frags = skb_shinfo(skb)->nr_frags;
+
+		if(unlikely(nr_frags >= MAX_SKB_FRAGS)) {
+			printk(KERN_ERR "Packet exceed the number "
+					"of skb frags(%lu)\n",
+					MAX_SKB_FRAGS);
+			return -EFAULT;
+		}
+
+		flush_dcache_page(page);
+		get_page(page);
+		skb_fill_page_desc(skb,
+				nr_frags,
+				page++, offset, len);
+		to_write -= len;
+		offset = 0;
+		len_max = PAGE_SIZE;
+		len = ((to_write > len_max) ? len_max : to_write);
+	}
+
+	return tp_len;
+}
+
+static int tpacket_snd(struct packet_sock *po, struct msghdr *msg)
+{
+	struct socket *sock;
+	struct sk_buff *skb;
+	struct net_device *dev;
+	__be16 proto;
+	int ifindex, err, reserve = 0;
+	void * ph;
+	struct sockaddr_ll *saddr=(struct sockaddr_ll *)msg->msg_name;
+	int tp_len, size_max;
+	unsigned char *addr;
+	int len_sum = 0;
+	int status = 0;
+
+	sock = po->sk.sk_socket;
+
+	mutex_lock(&po->pg_vec_lock);
+
+	err = -EBUSY;
+	if (saddr == NULL) {
+		ifindex	= po->ifindex;
+		proto	= po->num;
+		addr	= NULL;
+	} else {
+		err = -EINVAL;
+		if (msg->msg_namelen < sizeof(struct sockaddr_ll))
+			goto out;
+		if (msg->msg_namelen < (saddr->sll_halen + offsetof(struct sockaddr_ll, sll_addr)))
+			goto out;
+		ifindex	= saddr->sll_ifindex;
+		proto	= saddr->sll_protocol;
+		addr	= saddr->sll_addr;
+	}
+
+	dev = dev_get_by_index(sock_net(&po->sk), ifindex);
+	err = -ENXIO;
+	if (unlikely(dev == NULL))
+		goto out;
+
+	reserve = dev->hard_header_len;
+
+	err = -ENETDOWN;
+	if (unlikely(!(dev->flags & IFF_UP)))
+		goto out_put;
+
+	size_max = po->tx_ring.frame_size - sizeof(struct skb_shared_info)
+		- po->tp_hdrlen - LL_ALLOCATED_SPACE(dev) - sizeof(struct sockaddr_ll);
+
+	if (size_max > dev->mtu + reserve)
+		size_max = dev->mtu + reserve;
+
+	do
+	{
+		ph = packet_current_tx_frame(po, TP_STATUS_USER);
+		if(unlikely(ph == NULL)) {
+			schedule();
+			continue;
+		}
+
+		status = TP_STATUS_USER;
+		skb = sock_alloc_send_skb(&po->sk, LL_ALLOCATED_SPACE(dev) + sizeof(struct sockaddr_ll),
+				0, &err);
+		if (unlikely(skb == NULL))
+			goto out_status;
+
+		tp_len = tpacket_fill_skb(po, skb, ph, dev, size_max, proto,
+				addr);
+		if(unlikely(tp_len < 0)) {
+			if(po->tp_loss) {
+				__packet_set_status(po, ph, TP_STATUS_KERNEL);
+				packet_increment_head(&po->tx_ring);
+				kfree_skb(skb);
+				continue;
+			} else {
+				status = TP_STATUS_LOSING;
+				err = tp_len;
+				goto out_status;
+			}
+		}
+
+		skb->destructor = tpacket_destruct_skb;
+		__packet_set_status(po, ph, TP_STATUS_COPY);
+		atomic_inc(&po->tx_ring.pending);
+
+		status = TP_STATUS_USER;
+		err = dev_queue_xmit(skb);
+		if (unlikely(err > 0 && (err = net_xmit_errno(err)) != 0))
+			goto out_xmit;
+		packet_increment_head(&po->tx_ring);
+		len_sum += tp_len;
+	}
+	while(likely(((ph != NULL)
+			|| ((!(msg->msg_flags & MSG_DONTWAIT))
+				 && (atomic_read(&po->tx_ring.pending))))
+		    ));
+
+	err = len_sum;
+	goto out_put;
+
+out_xmit:
+	skb->destructor = sock_wfree;
+	atomic_dec(&po->tx_ring.pending);
+out_status:
+	__packet_set_status(po, ph, status);
+	kfree_skb(skb);
+out_put:
+	dev_put(dev);
+out:
+	mutex_unlock(&po->pg_vec_lock);
+	return err;
+}
+#endif
+
+static int packet_snd(struct socket *sock,
 			  struct msghdr *msg, size_t len)
 {
 	struct sock *sk = sock->sk;
@@ -855,6 +1149,19 @@  out:
 	return err;
 }
 
+static int packet_sendmsg(struct kiocb *iocb, struct socket *sock,
+		struct msghdr *msg, size_t len)
+{
+#ifdef CONFIG_PACKET_MMAP
+	struct sock *sk = sock->sk;
+	struct packet_sock *po = pkt_sk(sk);
+	if (po->tx_ring.pg_vec)
+		return tpacket_snd(po, msg);
+	else
+#endif
+		return packet_snd(sock, msg, len);
+}
+
 /*
  *	Close a PACKET socket. This is fairly simple. We immediately go
  *	to 'closed' state and remove our protocol entry in the device list.
@@ -894,10 +1201,15 @@  static int packet_release(struct socket *sock)
 	packet_flush_mclist(sk);
 
 #ifdef CONFIG_PACKET_MMAP
-	if (po->pg_vec) {
+	{
 		struct tpacket_req req;
 		memset(&req, 0, sizeof(req));
-		packet_set_ring(sk, &req, 1);
+
+		if (po->rx_ring.pg_vec)
+			packet_set_ring(sk, &req, 1, 0);
+
+		if (po->tx_ring.pg_vec)
+			packet_set_ring(sk, &req, 1, 1);
 	}
 #endif
 
@@ -1416,6 +1728,7 @@  packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 #ifdef CONFIG_PACKET_MMAP
 	case PACKET_RX_RING:
+	case PACKET_TX_RING:
 	{
 		struct tpacket_req req;
 
@@ -1423,7 +1736,7 @@  packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 			return -EINVAL;
 		if (copy_from_user(&req,optval,sizeof(req)))
 			return -EFAULT;
-		return packet_set_ring(sk, &req, 0);
+		return packet_set_ring(sk, &req, 0, optname == PACKET_TX_RING);
 	}
 	case PACKET_COPY_THRESH:
 	{
@@ -1443,7 +1756,7 @@  packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 		if (optlen != sizeof(val))
 			return -EINVAL;
-		if (po->pg_vec)
+		if (po->rx_ring.pg_vec || po->tx_ring.pg_vec)
 			return -EBUSY;
 		if (copy_from_user(&val, optval, sizeof(val)))
 			return -EFAULT;
@@ -1462,13 +1775,26 @@  packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 		if (optlen != sizeof(val))
 			return -EINVAL;
-		if (po->pg_vec)
+		if (po->rx_ring.pg_vec || po->tx_ring.pg_vec)
 			return -EBUSY;
 		if (copy_from_user(&val, optval, sizeof(val)))
 			return -EFAULT;
 		po->tp_reserve = val;
 		return 0;
 	}
+	case PACKET_LOSS:
+	{
+		unsigned int val;
+
+		if (optlen != sizeof(val))
+			return -EINVAL;
+		if (po->rx_ring.pg_vec || po->tx_ring.pg_vec)
+			return -EBUSY;
+		if (copy_from_user(&val, optval, sizeof(val)))
+			return -EFAULT;
+		po->tp_loss = !!val;
+		return 0;
+	}
 #endif
 	case PACKET_AUXDATA:
 	{
@@ -1574,6 +1900,12 @@  static int packet_getsockopt(struct socket *sock, int level, int optname,
 		val = po->tp_reserve;
 		data = &val;
 		break;
+	case PACKET_LOSS:
+		if (len > sizeof(unsigned int))
+			len = sizeof(unsigned int);
+		val = po->tp_loss;
+		data = &val;
+		break;
 #endif
 	default:
 		return -ENOPROTOOPT;
@@ -1706,13 +2038,17 @@  static unsigned int packet_poll(struct file * file, struct socket *sock,
 	unsigned int mask = datagram_poll(file, sock, wait);
 
 	spin_lock_bh(&sk->sk_receive_queue.lock);
-	if (po->pg_vec) {
-		unsigned last = po->head ? po->head-1 : po->frame_max;
-
-		if (packet_lookup_frame(po, last, TP_STATUS_USER))
+	if (po->rx_ring.pg_vec) {
+		if (packet_previous_rx_frame(po, TP_STATUS_USER))
 			mask |= POLLIN | POLLRDNORM;
 	}
 	spin_unlock_bh(&sk->sk_receive_queue.lock);
+	spin_lock_bh(&sk->sk_write_queue.lock);
+	if (po->tx_ring.pg_vec) {
+		if (packet_current_tx_frame(po, TP_STATUS_KERNEL))
+			mask |= POLLOUT | POLLWRNORM;
+	}
+	spin_unlock_bh(&sk->sk_write_queue.lock);
 	return mask;
 }
 
@@ -1788,21 +2124,32 @@  out_free_pgvec:
 	goto out;
 }
 
-static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing)
+static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing, int tx_ring)
 {
 	char **pg_vec = NULL;
 	struct packet_sock *po = pkt_sk(sk);
 	int was_running, order = 0;
+	struct packet_ring_buffer *rb;
+	struct sk_buff_head *rb_queue;
 	__be16 num;
-	int err = 0;
+	int err;
 
-	if (req->tp_block_nr) {
-		int i;
+	rb = tx_ring ? &po->tx_ring : &po->rx_ring;
+	rb_queue = tx_ring ? &sk->sk_write_queue : &sk->sk_receive_queue;
 
-		/* Sanity tests and some calculations */
+	err = -EBUSY;
+	if(!closing) {
+		if (atomic_read(&po->mapped))
+			goto out;
+		if (atomic_read(&rb->pending))
+			goto out;
+	}
 
-		if (unlikely(po->pg_vec))
-			return -EBUSY;
+	if (req->tp_block_nr) {
+		/* Sanity tests and some calculations */
+		err = -EBUSY;
+		if (unlikely(rb->pg_vec))
+			goto out;
 
 		switch (po->tp_version) {
 		case TPACKET_V1:
@@ -1813,42 +2160,35 @@  static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 			break;
 		}
 
+		err = -EINVAL;
 		if (unlikely((int)req->tp_block_size <= 0))
-			return -EINVAL;
+			goto out;
 		if (unlikely(req->tp_block_size & (PAGE_SIZE - 1)))
-			return -EINVAL;
+			goto out;
 		if (unlikely(req->tp_frame_size < po->tp_hdrlen +
-						  po->tp_reserve))
-			return -EINVAL;
+					po->tp_reserve))
+			goto out;
 		if (unlikely(req->tp_frame_size & (TPACKET_ALIGNMENT - 1)))
-			return -EINVAL;
+			goto out;
 
-		po->frames_per_block = req->tp_block_size/req->tp_frame_size;
-		if (unlikely(po->frames_per_block <= 0))
-			return -EINVAL;
-		if (unlikely((po->frames_per_block * req->tp_block_nr) !=
-			     req->tp_frame_nr))
-			return -EINVAL;
+		rb->frames_per_block = req->tp_block_size/req->tp_frame_size;
+		if (unlikely(rb->frames_per_block <= 0))
+			goto out;
+		if (unlikely((rb->frames_per_block * req->tp_block_nr) !=
+					req->tp_frame_nr))
+			goto out;
 
 		err = -ENOMEM;
 		order = get_order(req->tp_block_size);
 		pg_vec = alloc_pg_vec(req, order);
 		if (unlikely(!pg_vec))
 			goto out;
-
-		for (i = 0; i < req->tp_block_nr; i++) {
-			void *ptr = pg_vec[i];
-			int k;
-
-			for (k = 0; k < po->frames_per_block; k++) {
-				__packet_set_status(po, ptr, TP_STATUS_KERNEL);
-				ptr += req->tp_frame_size;
-			}
-		}
-		/* Done */
-	} else {
+	}
+	/* Done */
+	else {
+		err = -EINVAL;
 		if (unlikely(req->tp_frame_nr))
-			return -EINVAL;
+			goto out;
 	}
 
 	lock_sock(sk);
@@ -1872,20 +2212,19 @@  static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 	if (closing || atomic_read(&po->mapped) == 0) {
 		err = 0;
 #define XC(a, b) ({ __typeof__ ((a)) __t; __t = (a); (a) = (b); __t; })
-
-		spin_lock_bh(&sk->sk_receive_queue.lock);
-		pg_vec = XC(po->pg_vec, pg_vec);
-		po->frame_max = (req->tp_frame_nr - 1);
-		po->head = 0;
-		po->frame_size = req->tp_frame_size;
-		spin_unlock_bh(&sk->sk_receive_queue.lock);
-
-		order = XC(po->pg_vec_order, order);
-		req->tp_block_nr = XC(po->pg_vec_len, req->tp_block_nr);
-
-		po->pg_vec_pages = req->tp_block_size/PAGE_SIZE;
-		po->prot_hook.func = po->pg_vec ? tpacket_rcv : packet_rcv;
-		skb_queue_purge(&sk->sk_receive_queue);
+		spin_lock_bh(&rb_queue->lock);
+		pg_vec = XC(rb->pg_vec, pg_vec);
+		rb->frame_max = (req->tp_frame_nr - 1);
+		rb->head = 0;
+		rb->frame_size = req->tp_frame_size;
+		spin_unlock_bh(&rb_queue->lock);
+
+		order = XC(rb->pg_vec_order, order);
+		req->tp_block_nr = XC(rb->pg_vec_len, req->tp_block_nr);
+
+		rb->pg_vec_pages = req->tp_block_size/PAGE_SIZE;
+		po->prot_hook.func = (po->rx_ring.pg_vec) ? tpacket_rcv : packet_rcv;
+		skb_queue_purge(rb_queue);
 #undef XC
 		if (atomic_read(&po->mapped))
 			printk(KERN_DEBUG "packet_mmap: vma is busy: %d\n", atomic_read(&po->mapped));
@@ -1913,7 +2252,8 @@  static int packet_mmap(struct file *file, struct socket *sock, struct vm_area_st
 {
 	struct sock *sk = sock->sk;
 	struct packet_sock *po = pkt_sk(sk);
-	unsigned long size;
+	unsigned long size, expected_size;
+	struct packet_ring_buffer *rb;
 	unsigned long start;
 	int err = -EINVAL;
 	int i;
@@ -1921,26 +2261,39 @@  static int packet_mmap(struct file *file, struct socket *sock, struct vm_area_st
 	if (vma->vm_pgoff)
 		return -EINVAL;
 
-	size = vma->vm_end - vma->vm_start;
-
 	mutex_lock(&po->pg_vec_lock);
-	if (po->pg_vec == NULL)
+
+	expected_size = 0;
+	for(rb = &po->rx_ring; rb <= &po->tx_ring; rb++) {
+		if (rb->pg_vec)
+			expected_size += rb->pg_vec_len * rb->pg_vec_pages * PAGE_SIZE;
+	}
+
+	if (expected_size == 0)
 		goto out;
-	if (size != po->pg_vec_len*po->pg_vec_pages*PAGE_SIZE)
+
+	size = vma->vm_end - vma->vm_start;
+	if (size != expected_size)
 		goto out;
 
 	start = vma->vm_start;
-	for (i = 0; i < po->pg_vec_len; i++) {
-		struct page *page = virt_to_page(po->pg_vec[i]);
-		int pg_num;
-
-		for (pg_num = 0; pg_num < po->pg_vec_pages; pg_num++, page++) {
-			err = vm_insert_page(vma, start, page);
-			if (unlikely(err))
-				goto out;
-			start += PAGE_SIZE;
+	for(rb = &po->rx_ring; rb <= &po->tx_ring; rb++) {
+		if (rb->pg_vec == NULL)
+			continue;
+
+		for (i = 0; i < rb->pg_vec_len; i++) {
+			struct page *page = virt_to_page(rb->pg_vec[i]);
+			int pg_num;
+
+			for (pg_num = 0; pg_num < rb->pg_vec_pages; pg_num++, page++) {
+				err = vm_insert_page(vma, start, page);
+				if (unlikely(err))
+					goto out;
+				start += PAGE_SIZE;
+			}
 		}
-	}
+	}	
+	
 	atomic_inc(&po->mapped);
 	vma->vm_ops = &packet_mmap_ops;
 	err = 0;