diff mbox

Packet socket: mmapped IO: PACKET_TX_RING

Message ID 1225100005.29750.46.camel@localhost
State Changes Requested, archived
Delegated to: David Miller
Headers show

Commit Message

Johann Baudy Oct. 27, 2008, 9:33 a.m. UTC
New packet socket feature that makes packet socket more efficient for transmission.
- It reduces number of system call through a PACKET_TX_RING mechanism, based on PACKET_RX_RING (Circular buffer allocated in kernel space which is mmapped from user space).
- It minimizes CPU copy using fragmented SKB.

Signed-off-by: Johann Baudy <johann.baudy@gnu-log.net>

--
Hi All,

Before going on submitting process, I would like to get your opinion on those below points:

1#:
In order to make such feature, each packet header of the circular buffer can be set to one of those three statuses:
- TP_STATUS_USER: packet buffer needs to be transmitted. 
On this status, Kernel: 
 - Allocates a new skb
 - Attaches packet buffer pages 
 - Changes skb destructor 
 - Stores packet header pointer or index into skb 
 - Changes the packet header status to TP_STATUS_COPY;
 - Sends it to device.

- TP_STATUS_COPY: packet transmission is ongoing
On this status, the skb destructor (called during skb release)
 - Gets packet header pointer linked to skb pointer;
 - Changes the packet header status to TP_STATUS_KERNEL;

- TP_STATUS_KERNEL: packet buffer has been transmitted and buffer is ready for user.

As you can see, this skb destructor needs packet header pointer related to sk_buff pointer to change header status. 
I've first used skb->cb (control buffer) to forward this parameter to destructor. 
Unfortunately, this one seems to be overwritten in packet queue mechanism. So I've finally chosen skb->mark to store buffer index.

Can I use skb->mark in such condition?
If not, what is the best solution:
- new field in sk_buff struct?
- a sk_buff pointer array [NB_FRAME] (one index matches to one skb pointer)? (parsing is not really good for cpu load)
- other ?

2#:
Do I need to protect send() procedure with some locks? 
Especially when changing status from TP_STATUS_USER to TP_STATUS_COPY to prevent kernel from sending a packet buffer twice? 
(If two send() are called from different threads In SMP for example)
Or is it implicit?

Thanks in advance,
Johann Baudy

--
  Documentation/networking/packet_mmap.txt |  132 ++++++++--
 include/linux/if_packet.h                |    3 +-
 net/packet/af_packet.c                   |  422 +++++++++++++++++++++++++-----
 3 files changed, 470 insertions(+), 87 deletions(-)



--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Comments

David Miller Oct. 28, 2008, 10:44 p.m. UTC | #1
From: Johann Baudy <johaahn@gmail.com>
Date: Mon, 27 Oct 2008 10:33:25 +0100

> @@ -100,7 +101,7 @@ struct tpacket2_hdr
>  enum tpacket_versions
>  {
>  	TPACKET_V1,
> -	TPACKET_V2,
> +	TPACKET_V2
>  };
>  
>  /*

Please don't do this, the comma there is intentional.

If a future patch adds a new version, the diff will be the
addition of one new line, instead of one changed line and one
new line.
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/Documentation/networking/packet_mmap.txt b/Documentation/networking/packet_mmap.txt
index 07c53d5..32f5c33 100644
--- a/Documentation/networking/packet_mmap.txt
+++ b/Documentation/networking/packet_mmap.txt
@@ -4,8 +4,8 @@ 
 
 This file documents the CONFIG_PACKET_MMAP option available with the PACKET
 socket interface on 2.4 and 2.6 kernels. This type of sockets is used for 
-capture network traffic with utilities like tcpdump or any other that uses 
-the libpcap library. 
+capture network traffic with utilities like tcpdump or any other that needs
+raw acces to network interface.
 
 You can find the latest version of this document at
 
@@ -14,6 +14,7 @@  You can find the latest version of this document at
 Please send me your comments to
 
     Ulisses Alonso CamarĂ³ <uaca@i.hate.spam.alumni.uv.es>
+    Johann Baudy <johann.baudy@gnu-log.net> (TX RING)
 
 -------------------------------------------------------------------------------
 + Why use PACKET_MMAP
@@ -25,19 +26,24 @@  to capture each packet, it requires two if you want to get packet's
 timestamp (like libpcap always does).
 
 In the other hand PACKET_MMAP is very efficient. PACKET_MMAP provides a size 
-configurable circular buffer mapped in user space. This way reading packets just 
-needs to wait for them, most of the time there is no need to issue a single 
-system call. By using a shared buffer between the kernel and the user 
-also has the benefit of minimizing packet copies.
-
-It's fine to use PACKET_MMAP to improve the performance of the capture process, 
-but it isn't everything. At least, if you are capturing at high speeds (this 
-is relative to the cpu speed), you should check if the device driver of your 
-network interface card supports some sort of interrupt load mitigation or 
-(even better) if it supports NAPI, also make sure it is enabled.
+configurable circular buffer mapped in user space that can be used to either
+send or receive packets. This way reading packets just needs to wait for them,
+most of the time there is no need to issue a single system call. Concerning
+transmission, multiple packets can be sent through one system call to get the
+highest bandwidth.
+By using a shared buffer between the kernel and the user also has the benefit
+of minimizing packet copies.
+
+It's fine to use PACKET_MMAP to improve the performance of the capture and
+transmission process, but it isn't everything. At least, if you are capturing
+at high speeds (this is relative to the cpu speed), you should check if the
+device driver of your network interface card supports some sort of interrupt
+load mitigation or (even better) if it supports NAPI, also make sure it is
+enabled. For transmission, check the MTU (Maximum Transmission Unit) used and
+supported by devices of your network.
 
 --------------------------------------------------------------------------------
-+ How to use CONFIG_PACKET_MMAP
++ How to use CONFIG_PACKET_MMAP to improve capture process
 --------------------------------------------------------------------------------
 
 From the user standpoint, you should use the higher level libpcap library, which
@@ -57,7 +63,7 @@  the low level details or want to improve libpcap by including PACKET_MMAP
 support.
 
 --------------------------------------------------------------------------------
-+ How to use CONFIG_PACKET_MMAP directly
++ How to use CONFIG_PACKET_MMAP directly to improve capture porcess
 --------------------------------------------------------------------------------
 
 From the system calls stand point, the use of PACKET_MMAP involves
@@ -66,6 +72,7 @@  the following process:
 
 [setup]     socket() -------> creation of the capture socket
             setsockopt() ---> allocation of the circular buffer (ring)
+                              option: PACKET_RX_RING
             mmap() ---------> mapping of the allocated buffer to the
                               user process
 
@@ -97,13 +104,75 @@  also the mapping of the circular buffer in the user process and
 the use of this buffer.
 
 --------------------------------------------------------------------------------
++ How to use CONFIG_PACKET_MMAP directly to improve transmission process
+--------------------------------------------------------------------------------
+Transmission process is similar to capture as shown below.
+
+[setup]          socket() -------> creation of the transmission socket
+                 setsockopt() ---> allocation of the circular buffer (ring)
+                                   option: PACKET_TX_RING
+                 bind() ---------> bind transmission socket with a network interface
+                 mmap() ---------> mapping of the allocated buffer to the
+                                   user process
+
+[transmission]   poll() ---------> wait for free packets (optional)
+                 send() ---------> send all packets that are set as ready in
+                                   the ring
+                                   The flag MSG_DONTWAIT can be used to return
+                                   before end of transfer.
+
+[shutdown]  close() --------> destruction of the transmission socket and
+                              deallocation of all associated resources.
+
+Binding the socket to your network interface is mandatory (with zero copy) to
+know the header size of frames used in the circular buffer.
+
+As capture, each frame contains two parts:
+
+ --------------------
+| struct tpacket_hdr | Header. It contains the status of
+|                    | of this frame
+|--------------------|
+| data buffer        |
+.                    .  Data that will be sent over the network interface.
+.                    .
+ --------------------
+
+ bind() associates the socket to your network interface thanks to
+ sll_ifindex parameter of struct sockaddr_ll.
+
+ Initialization example:
+
+ struct sockaddr_ll my_addr;
+ struct ifreq s_ifr;
+ ...
+
+ strncpy (s_ifr.ifr_name, "eth0", sizeof(s_ifr.ifr_name));
+
+ /* get interface index of eth0 */
+ ioctl(this->socket, SIOCGIFINDEX, &s_ifr);
+
+ /* fill sockaddr_ll struct to prepare binding */
+ my_addr.sll_family = AF_PACKET;
+ my_addr.sll_protocol = ETH_P_ALL;
+ my_addr.sll_ifindex =  s_ifr.ifr_ifindex;
+
+ /* bind socket to eth0 */
+ bind(this->socket, (struct sockaddr *)&my_addr, sizeof(struct sockaddr_ll));
+
+ A complete tutorial is available at: http://wiki.gnu-log.net/
+
+--------------------------------------------------------------------------------
 + PACKET_MMAP settings
 --------------------------------------------------------------------------------
 
 
 To setup PACKET_MMAP from user level code is done with a call like
 
+ - Capture process
      setsockopt(fd, SOL_PACKET, PACKET_RX_RING, (void *) &req, sizeof(req))
+ - Transmission process
+     setsockopt(fd, SOL_PACKET, PACKET_TX_RING, (void *) &req, sizeof(req))
 
 The most significant argument in the previous call is the req parameter, 
 this parameter must to have the following structure:
@@ -117,11 +186,11 @@  this parameter must to have the following structure:
     };
 
 This structure is defined in /usr/include/linux/if_packet.h and establishes a 
-circular buffer (ring) of unswappable memory mapped in the capture process. 
+circular buffer (ring) of unswappable memory.
 Being mapped in the capture process allows reading the captured frames and 
 related meta-information like timestamps without requiring a system call.
 
-Captured frames are grouped in blocks. Each block is a physically contiguous 
+Frames are grouped in blocks. Each block is a physically contiguous
 region of memory and holds tp_block_size/tp_frame_size frames. The total number 
 of blocks is tp_block_nr. Note that tp_frame_nr is a redundant parameter because
 
@@ -336,6 +405,7 @@  struct tpacket_hdr). If this field is 0 means that the frame is ready
 to be used for the kernel, If not, there is a frame the user can read 
 and the following flags apply:
 
++++ Capture process:
      from include/linux/if_packet.h
 
      #define TP_STATUS_COPY          2 
@@ -391,6 +461,36 @@  packets are in the ring:
 It doesn't incur in a race condition to first check the status value and 
 then poll for frames.
 
+
+++ Transmission process
+Those defines are also used for transmission:
+
+     #define TP_STATUS_KERNEL        0 // Frame is available
+     #define TP_STATUS_USER          1 // Frame will be sent on next send()
+     #define TP_STATUS_COPY          2 // Frame is currently in transmission
+
+First, the kernel initializes all frames to TP_STATUS_KERNEL. To send a packet,
+the user fills a data buffer of an available frame, sets tp_len to current
+data buffer size and sets its status field to TP_STATUS_USER. This can be done
+on multiple frames. Once the user is ready to transmit, it calls send().
+Then all buffers with status equal to TP_STATUS_USER are forwarded to the
+network device. The kernel updates each status of sent frames with
+TP_STATUS_COPY until the end of transfer.
+At the end of each transfer, buffer status returns to TP_STATUS_KERNEL.
+
+    header->tp_len = in_i_size;
+    header->tp_status = TP_STATUS_USER;
+    retval = send(this->socket, NULL, 0, 0);
+
+The user can also use poll() to check if a buffer is available:
+(status == TP_STATUS_KERNEL)
+
+    struct pollfd pfd;
+    pfd.fd = fd;
+    pfd.revents = 0;
+    pfd.events = POLLOUT;
+    retval = poll(&pfd, 1, timeout);
+
 --------------------------------------------------------------------------------
 + THANKS
 --------------------------------------------------------------------------------
diff --git a/include/linux/if_packet.h b/include/linux/if_packet.h
index 18db066..f6a247a 100644
--- a/include/linux/if_packet.h
+++ b/include/linux/if_packet.h
@@ -46,6 +46,7 @@  struct sockaddr_ll
 #define PACKET_VERSION			10
 #define PACKET_HDRLEN			11
 #define PACKET_RESERVE			12
+#define PACKET_TX_RING			13
 
 struct tpacket_stats
 {
@@ -100,7 +101,7 @@  struct tpacket2_hdr
 enum tpacket_versions
 {
 	TPACKET_V1,
-	TPACKET_V2,
+	TPACKET_V2
 };
 
 /*
diff --git a/net/packet/af_packet.c b/net/packet/af_packet.c
index c718e7e..c52462a 100644
--- a/net/packet/af_packet.c
+++ b/net/packet/af_packet.c
@@ -156,7 +156,23 @@  struct packet_mreq_max
 };
 
 #ifdef CONFIG_PACKET_MMAP
-static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing);
+static int packet_set_ring(struct sock *sk, struct tpacket_req *req,
+		int closing, int tx_ring);
+
+struct packet_ring_buffer {
+	char *			*pg_vec;
+	unsigned int		head;
+	unsigned int		frames_per_block;
+	unsigned int		frame_size;
+	unsigned int		frame_max;
+
+	unsigned int		pg_vec_order;
+	unsigned int		pg_vec_pages;
+	unsigned int		pg_vec_len;
+};
+
+struct packet_sock;
+static int tpacket_snd(struct packet_sock *po, struct msghdr *msg);
 #endif
 
 static void packet_flush_mclist(struct sock *sk);
@@ -166,12 +182,10 @@  struct packet_sock {
 	struct sock		sk;
 	struct tpacket_stats	stats;
 #ifdef CONFIG_PACKET_MMAP
-	char *			*pg_vec;
-	unsigned int		head;
-	unsigned int            frames_per_block;
-	unsigned int		frame_size;
-	unsigned int		frame_max;
+	struct packet_ring_buffer	rx_ring;
+	struct packet_ring_buffer	tx_ring;
 	int			copy_thresh;
+	atomic_t		tx_pending_skb;
 #endif
 	struct packet_type	prot_hook;
 	spinlock_t		bind_lock;
@@ -183,9 +197,6 @@  struct packet_sock {
 	struct packet_mclist	*mclist;
 #ifdef CONFIG_PACKET_MMAP
 	atomic_t		mapped;
-	unsigned int            pg_vec_order;
-	unsigned int		pg_vec_pages;
-	unsigned int		pg_vec_len;
 	enum tpacket_versions	tp_version;
 	unsigned int		tp_hdrlen;
 	unsigned int		tp_reserve;
@@ -204,8 +215,10 @@  struct packet_skb_cb {
 
 #ifdef CONFIG_PACKET_MMAP
 
-static void *packet_lookup_frame(struct packet_sock *po, unsigned int position,
-				 int status)
+static void *packet_lookup_frame(struct packet_sock *po,
+		struct packet_ring_buffer *buff,
+		unsigned int position,
+		int status)
 {
 	unsigned int pg_vec_pos, frame_offset;
 	union {
@@ -214,25 +227,50 @@  static void *packet_lookup_frame(struct packet_sock *po, unsigned int position,
 		void *raw;
 	} h;
 
-	pg_vec_pos = position / po->frames_per_block;
-	frame_offset = position % po->frames_per_block;
+	pg_vec_pos = position / buff->frames_per_block;
+	frame_offset = position % buff->frames_per_block;
 
-	h.raw = po->pg_vec[pg_vec_pos] + (frame_offset * po->frame_size);
+	h.raw = buff->pg_vec[pg_vec_pos] + (frame_offset * buff->frame_size);
 	switch (po->tp_version) {
 	case TPACKET_V1:
-		if (status != h.h1->tp_status ? TP_STATUS_USER :
-						TP_STATUS_KERNEL)
+		if (status != h.h1->tp_status)
 			return NULL;
 		break;
 	case TPACKET_V2:
-		if (status != h.h2->tp_status ? TP_STATUS_USER :
-						TP_STATUS_KERNEL)
+		if (status != h.h2->tp_status)
 			return NULL;
 		break;
 	}
 	return h.raw;
 }
 
+static inline void *packet_current_rx_frame(struct packet_sock *po, int status)
+{
+	return packet_lookup_frame(po, &po->rx_ring, po->rx_ring.head, status);
+}
+
+static inline void *packet_current_tx_frame(struct packet_sock *po, int status)
+{
+	return packet_lookup_frame(po, &po->tx_ring, po->tx_ring.head, status);
+}
+
+static inline void *packet_previous_rx_frame(struct packet_sock *po, int status)
+{
+	unsigned int previous = po->rx_ring.head ? po->rx_ring.head - 1 : po->rx_ring.frame_max;
+	return packet_lookup_frame(po, &po->rx_ring, previous, status);
+}
+
+static inline void *packet_previous_tx_frame(struct packet_sock *po, int status)
+{
+	unsigned int previous = po->tx_ring.head ? po->tx_ring.head - 1 : po->tx_ring.frame_max;
+	return packet_lookup_frame(po, &po->tx_ring, previous, status);
+}
+
+static inline void packet_increment_head(struct packet_ring_buffer *buff)
+{
+	buff->head = buff->head != buff->frame_max ? buff->head+1 : 0;
+}
+
 static void __packet_set_status(struct packet_sock *po, void *frame, int status)
 {
 	union {
@@ -646,7 +684,7 @@  static int tpacket_rcv(struct sk_buff *skb, struct net_device *dev, struct packe
 		macoff = netoff - maclen;
 	}
 
-	if (macoff + snaplen > po->frame_size) {
+	if (macoff + snaplen > po->rx_ring.frame_size) {
 		if (po->copy_thresh &&
 		    atomic_read(&sk->sk_rmem_alloc) + skb->truesize <
 		    (unsigned)sk->sk_rcvbuf) {
@@ -659,16 +697,16 @@  static int tpacket_rcv(struct sk_buff *skb, struct net_device *dev, struct packe
 			if (copy_skb)
 				skb_set_owner_r(copy_skb, sk);
 		}
-		snaplen = po->frame_size - macoff;
+		snaplen = po->rx_ring.frame_size - macoff;
 		if ((int)snaplen < 0)
 			snaplen = 0;
 	}
 
 	spin_lock(&sk->sk_receive_queue.lock);
-	h.raw = packet_lookup_frame(po, po->head, TP_STATUS_KERNEL);
+	h.raw = packet_current_rx_frame(po, TP_STATUS_KERNEL);
 	if (!h.raw)
 		goto ring_is_full;
-	po->head = po->head != po->frame_max ? po->head+1 : 0;
+	packet_increment_head(&po->rx_ring);
 	po->stats.tp_packets++;
 	if (copy_skb) {
 		status |= TP_STATUS_COPY;
@@ -759,10 +797,212 @@  ring_is_full:
 	goto drop_n_restore;
 }
 
-#endif
+static void tpacket_destruct_skb(struct sk_buff *skb)
+{
+	struct packet_sock *po = pkt_sk(skb->sk);
+	void * ph;
 
+	BUG_ON(skb == NULL);
+	ph = packet_lookup_frame( po, &po->tx_ring, skb->mark, TP_STATUS_COPY);
 
-static int packet_sendmsg(struct kiocb *iocb, struct socket *sock,
+	BUG_ON(ph == NULL);
+	BUG_ON(atomic_read(&po->tx_pending_skb) == 0);
+
+	atomic_dec(&po->tx_pending_skb);
+	__packet_set_status(po, ph, TP_STATUS_KERNEL);
+
+	sock_wfree(skb);
+}
+
+static int tpacket_fill_skb(struct packet_sock *po, struct sk_buff * skb, void * frame,
+		struct net_device *dev, int size_max, __be16 proto,
+		unsigned char * addr)
+{
+	union {
+		struct tpacket_hdr *h1;
+		struct tpacket2_hdr *h2;
+		void *raw;
+	} ph;
+	int to_write, offset, len, tp_len;
+	struct socket *sock = po->sk.sk_socket;
+	struct page *page;
+	void *data;
+	int err;
+
+
+	ph.raw = frame;
+
+	skb->protocol = proto;
+	skb->dev = dev;
+	skb->priority = po->sk.sk_priority;
+	skb->destructor = tpacket_destruct_skb;
+	skb->mark = po->tx_ring.head;
+
+	switch(po->tp_version) {
+	case TPACKET_V2:
+		tp_len = ph.h1->tp_len;
+		break;
+	default:
+		tp_len = ph.h1->tp_len;
+		break;
+	}
+
+	if (unlikely(tp_len > size_max)) {
+		printk(KERN_ERR "packet size is too long (%d > %d)\n",
+				tp_len, size_max);
+		return -EMSGSIZE;
+	}
+
+	skb_reserve(skb, LL_RESERVED_SPACE(dev));
+	data = ph.raw + po->tp_hdrlen;
+
+	if (sock->type == SOCK_DGRAM) {
+		err = dev_hard_header(skb, dev, ntohs(proto), addr,
+				NULL, tp_len);
+		if (unlikely(err < 0))
+			return -EINVAL;
+	} else if (dev->hard_header_len ) {
+		/* net device doesn't like empty head */
+		if(unlikely(tp_len <= dev->hard_header_len)) {
+			printk(KERN_ERR "packet size is too short "
+					"(%d < %d)\n", tp_len,
+					dev->hard_header_len);
+			return -EINVAL;
+		}
+
+		skb_push(skb, dev->hard_header_len);
+		err = skb_store_bits(skb, 0, data,
+				dev->hard_header_len);
+		if (unlikely(err))
+			return err;
+	}
+
+	err = -EFAULT;
+	to_write = tp_len - dev->hard_header_len;
+	data += dev->hard_header_len;
+	page = virt_to_page(data);
+	len = ((to_write > PAGE_SIZE) ? PAGE_SIZE : to_write);
+
+	offset = (int)((long)data & (~PAGE_MASK));
+	len -= offset;
+
+	skb->data_len = to_write;
+	skb->len += to_write;
+
+	while ( likely(to_write) ) {
+		get_page(page);
+		skb_fill_page_desc(skb,
+				skb_shinfo(skb)->nr_frags,
+				page++, offset, len);
+		to_write -= len;
+		len = (to_write > PAGE_SIZE) ? PAGE_SIZE : to_write;
+		offset = 0;
+	}
+
+
+	return tp_len;
+}
+
+static int tpacket_snd(struct packet_sock *po, struct msghdr *msg)
+{
+	struct socket *sock;
+	struct sk_buff *skb;
+	struct net_device *dev;
+	__be16 proto;
+	int ifindex, err, reserve = 0;
+	void * ph;
+	struct sockaddr_ll *saddr=(struct sockaddr_ll *)msg->msg_name;
+	int tp_len, size_max;
+	unsigned char *addr;
+	int len_sum = 0;
+
+	BUG_ON(po == NULL);
+	sock = po->sk.sk_socket;
+
+	if (saddr == NULL) {
+		ifindex	= po->ifindex;
+		proto	= po->num;
+		addr	= NULL;
+	} else {
+		err = -EINVAL;
+		if (msg->msg_namelen < sizeof(struct sockaddr_ll))
+			goto out;
+		if (msg->msg_namelen < (saddr->sll_halen + offsetof(struct sockaddr_ll, sll_addr)))
+			goto out;
+		ifindex	= saddr->sll_ifindex;
+		proto	= saddr->sll_protocol;
+		addr	= saddr->sll_addr;
+	}
+
+	dev = dev_get_by_index(sock_net(&po->sk), ifindex);
+	err = -ENXIO;
+	if (unlikely(dev == NULL))
+		goto out;
+
+	err = -EINVAL;
+	if (unlikely(sock->type != SOCK_RAW))
+		goto out_unlock;
+
+	reserve = dev->hard_header_len;
+
+	err = -ENETDOWN;
+	if (unlikely(!(dev->flags & IFF_UP)))
+		goto out_unlock;
+
+	size_max = po->tx_ring.frame_size - sizeof(struct skb_shared_info)
+		- po->tp_hdrlen - LL_ALLOCATED_SPACE(dev);
+
+	if (size_max > dev->mtu + reserve)
+		size_max = dev->mtu + reserve;
+
+
+	do
+	{
+		ph = packet_current_tx_frame(po, TP_STATUS_USER);
+		if(unlikely(ph == NULL))
+			continue;
+
+		skb = sock_alloc_send_skb(&po->sk, LL_ALLOCATED_SPACE(dev),
+				msg->msg_flags & MSG_DONTWAIT, &err);
+		if (unlikely(skb == NULL))
+			goto out_unlock;
+
+		__packet_set_status(po, ph, TP_STATUS_COPY);
+		atomic_inc(&po->tx_pending_skb);
+
+		tp_len = tpacket_fill_skb(po, skb, ph, dev, size_max, proto,
+				addr);
+		if(unlikely(tp_len < 0)) {
+			err = tp_len;
+			goto out_free;
+		}
+
+		err = dev_queue_xmit(skb);
+		if (unlikely(err > 0 && (err = net_xmit_errno(err)) != 0))
+			goto out_free;
+
+		packet_increment_head(&po->tx_ring);
+		len_sum += tp_len;
+	}
+	while(likely((ph != NULL)
+				|| ((!(msg->msg_flags & MSG_DONTWAIT))
+					&& atomic_read(&po->tx_pending_skb))));
+
+	err = len_sum;
+	goto out_unlock;
+
+out_free:
+	__packet_set_status(po, ph, TP_STATUS_USER);
+	atomic_dec(&po->tx_pending_skb);
+	kfree_skb(skb);
+out_unlock:
+	dev_put(dev);
+out:
+	return err;
+}
+#endif
+
+static int packet_snd(struct socket *sock,
 			  struct msghdr *msg, size_t len)
 {
 	struct sock *sk = sock->sk;
@@ -853,6 +1093,19 @@  out:
 	return err;
 }
 
+static int packet_sendmsg(struct kiocb *iocb, struct socket *sock,
+		struct msghdr *msg, size_t len)
+{
+#ifdef CONFIG_PACKET_MMAP
+	struct sock *sk = sock->sk;
+	struct packet_sock *po = pkt_sk(sk);
+	if (po->tx_ring.pg_vec)
+		return tpacket_snd(po, msg);
+	else
+#endif
+		return packet_snd(sock, msg, len);
+}
+
 /*
  *	Close a PACKET socket. This is fairly simple. We immediately go
  *	to 'closed' state and remove our protocol entry in the device list.
@@ -891,10 +1144,13 @@  static int packet_release(struct socket *sock)
 	packet_flush_mclist(sk);
 
 #ifdef CONFIG_PACKET_MMAP
-	if (po->pg_vec) {
+	{
 		struct tpacket_req req;
 		memset(&req, 0, sizeof(req));
-		packet_set_ring(sk, &req, 1);
+		if (po->rx_ring.pg_vec)
+			packet_set_ring(sk, &req, 1, 0);
+		if (po->tx_ring.pg_vec)
+			packet_set_ring(sk, &req, 1, 1);
 	}
 #endif
 
@@ -1411,6 +1667,7 @@  packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 #ifdef CONFIG_PACKET_MMAP
 	case PACKET_RX_RING:
+	case PACKET_TX_RING:
 	{
 		struct tpacket_req req;
 
@@ -1418,7 +1675,7 @@  packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 			return -EINVAL;
 		if (copy_from_user(&req,optval,sizeof(req)))
 			return -EFAULT;
-		return packet_set_ring(sk, &req, 0);
+		return packet_set_ring(sk, &req, 0, optname == PACKET_TX_RING);
 	}
 	case PACKET_COPY_THRESH:
 	{
@@ -1438,7 +1695,7 @@  packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 		if (optlen != sizeof(val))
 			return -EINVAL;
-		if (po->pg_vec)
+		if (po->rx_ring.pg_vec || po->tx_ring.pg_vec)
 			return -EBUSY;
 		if (copy_from_user(&val, optval, sizeof(val)))
 			return -EFAULT;
@@ -1457,7 +1714,7 @@  packet_setsockopt(struct socket *sock, int level, int optname, char __user *optv
 
 		if (optlen != sizeof(val))
 			return -EINVAL;
-		if (po->pg_vec)
+		if (po->rx_ring.pg_vec || po->tx_ring.pg_vec)
 			return -EBUSY;
 		if (copy_from_user(&val, optval, sizeof(val)))
 			return -EFAULT;
@@ -1701,13 +1958,17 @@  static unsigned int packet_poll(struct file * file, struct socket *sock,
 	unsigned int mask = datagram_poll(file, sock, wait);
 
 	spin_lock_bh(&sk->sk_receive_queue.lock);
-	if (po->pg_vec) {
-		unsigned last = po->head ? po->head-1 : po->frame_max;
-
-		if (packet_lookup_frame(po, last, TP_STATUS_USER))
+	if (po->rx_ring.pg_vec) {
+		if (packet_previous_rx_frame(po, TP_STATUS_USER))
 			mask |= POLLIN | POLLRDNORM;
 	}
 	spin_unlock_bh(&sk->sk_receive_queue.lock);
+	spin_lock_bh(&sk->sk_write_queue.lock);
+	if (po->tx_ring.pg_vec) {
+		if (packet_current_tx_frame(po, TP_STATUS_KERNEL))
+			mask |= POLLOUT | POLLWRNORM;
+	}
+	spin_unlock_bh(&sk->sk_write_queue.lock);
 	return mask;
 }
 
@@ -1783,20 +2044,24 @@  out_free_pgvec:
 	goto out;
 }
 
-static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing)
+static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing, int tx_ring)
 {
 	char **pg_vec = NULL;
 	struct packet_sock *po = pkt_sk(sk);
 	int was_running, order = 0;
+	struct packet_ring_buffer *rb;
+	struct sk_buff_head *rb_queue;
 	__be16 num;
 	int err = 0;
 
+	rb = tx_ring ? &po->tx_ring : &po->rx_ring;
+	rb_queue = tx_ring ? &sk->sk_write_queue : &sk->sk_receive_queue;
+
 	if (req->tp_block_nr) {
 		int i;
 
 		/* Sanity tests and some calculations */
-
-		if (unlikely(po->pg_vec))
+		if (unlikely(rb->pg_vec))
 			return -EBUSY;
 
 		switch (po->tp_version) {
@@ -1813,16 +2078,16 @@  static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 		if (unlikely(req->tp_block_size & (PAGE_SIZE - 1)))
 			return -EINVAL;
 		if (unlikely(req->tp_frame_size < po->tp_hdrlen +
-						  po->tp_reserve))
+					po->tp_reserve))
 			return -EINVAL;
 		if (unlikely(req->tp_frame_size & (TPACKET_ALIGNMENT - 1)))
 			return -EINVAL;
 
-		po->frames_per_block = req->tp_block_size/req->tp_frame_size;
-		if (unlikely(po->frames_per_block <= 0))
+		rb->frames_per_block = req->tp_block_size/req->tp_frame_size;
+		if (unlikely(rb->frames_per_block <= 0))
 			return -EINVAL;
-		if (unlikely((po->frames_per_block * req->tp_block_nr) !=
-			     req->tp_frame_nr))
+		if (unlikely((rb->frames_per_block * req->tp_block_nr) !=
+					req->tp_frame_nr))
 			return -EINVAL;
 
 		err = -ENOMEM;
@@ -1835,17 +2100,19 @@  static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 			void *ptr = pg_vec[i];
 			int k;
 
-			for (k = 0; k < po->frames_per_block; k++) {
+			for (k = 0; k < rb->frames_per_block; k++) {
 				__packet_set_status(po, ptr, TP_STATUS_KERNEL);
 				ptr += req->tp_frame_size;
 			}
 		}
-		/* Done */
-	} else {
+	}
+	/* Done */
+	else {
 		if (unlikely(req->tp_frame_nr))
 			return -EINVAL;
 	}
 
+
 	lock_sock(sk);
 
 	/* Detach socket from network */
@@ -1866,20 +2133,19 @@  static int packet_set_ring(struct sock *sk, struct tpacket_req *req, int closing
 	if (closing || atomic_read(&po->mapped) == 0) {
 		err = 0;
 #define XC(a, b) ({ __typeof__ ((a)) __t; __t = (a); (a) = (b); __t; })
-
-		spin_lock_bh(&sk->sk_receive_queue.lock);
-		pg_vec = XC(po->pg_vec, pg_vec);
-		po->frame_max = (req->tp_frame_nr - 1);
-		po->head = 0;
-		po->frame_size = req->tp_frame_size;
-		spin_unlock_bh(&sk->sk_receive_queue.lock);
-
-		order = XC(po->pg_vec_order, order);
-		req->tp_block_nr = XC(po->pg_vec_len, req->tp_block_nr);
-
-		po->pg_vec_pages = req->tp_block_size/PAGE_SIZE;
-		po->prot_hook.func = po->pg_vec ? tpacket_rcv : packet_rcv;
-		skb_queue_purge(&sk->sk_receive_queue);
+		spin_lock_bh(&rb_queue->lock);
+		pg_vec = XC(rb->pg_vec, pg_vec);
+		rb->frame_max = (req->tp_frame_nr - 1);
+		rb->head = 0;
+		rb->frame_size = req->tp_frame_size;
+		spin_unlock_bh(&rb_queue->lock);
+
+		order = XC(rb->pg_vec_order, order);
+		req->tp_block_nr = XC(rb->pg_vec_len, req->tp_block_nr);
+
+		rb->pg_vec_pages = req->tp_block_size/PAGE_SIZE;
+		po->prot_hook.func = (po->rx_ring.pg_vec) ? tpacket_rcv : packet_rcv;
+		skb_queue_purge(rb_queue);
 #undef XC
 		if (atomic_read(&po->mapped))
 			printk(KERN_DEBUG "packet_mmap: vma is busy: %d\n", atomic_read(&po->mapped));
@@ -1906,7 +2172,8 @@  static int packet_mmap(struct file *file, struct socket *sock, struct vm_area_st
 {
 	struct sock *sk = sock->sk;
 	struct packet_sock *po = pkt_sk(sk);
-	unsigned long size;
+	unsigned long size, expected_size;
+	struct packet_ring_buffer *rb;
 	unsigned long start;
 	int err = -EINVAL;
 	int i;
@@ -1917,23 +2184,38 @@  static int packet_mmap(struct file *file, struct socket *sock, struct vm_area_st
 	size = vma->vm_end - vma->vm_start;
 
 	lock_sock(sk);
-	if (po->pg_vec == NULL)
+
+	expected_size = 0;
+	if (po->rx_ring.pg_vec)
+		expected_size += po->rx_ring.pg_vec_len * po->rx_ring.pg_vec_pages * PAGE_SIZE;
+	if (po->tx_ring.pg_vec)
+		expected_size += po->tx_ring.pg_vec_len * po->tx_ring.pg_vec_pages * PAGE_SIZE;
+
+	if (expected_size == 0)
 		goto out;
-	if (size != po->pg_vec_len*po->pg_vec_pages*PAGE_SIZE)
+
+	if (size != expected_size)
 		goto out;
 
 	start = vma->vm_start;
-	for (i = 0; i < po->pg_vec_len; i++) {
-		struct page *page = virt_to_page(po->pg_vec[i]);
-		int pg_num;
-
-		for (pg_num = 0; pg_num < po->pg_vec_pages; pg_num++, page++) {
-			err = vm_insert_page(vma, start, page);
-			if (unlikely(err))
-				goto out;
-			start += PAGE_SIZE;
+
+	for(rb = &po->rx_ring; rb <= &po->tx_ring; rb++) {
+		if (rb->pg_vec == NULL)
+			continue;
+
+		for (i = 0; i < rb->pg_vec_len; i++) {
+			struct page *page = virt_to_page(rb->pg_vec[i]);
+			int pg_num;
+
+			for (pg_num = 0; pg_num < rb->pg_vec_pages; pg_num++, page++) {
+				err = vm_insert_page(vma, start, page);
+				if (unlikely(err))
+					goto out;
+				start += PAGE_SIZE;
+			}
 		}
 	}
+
 	atomic_inc(&po->mapped);
 	vma->vm_ops = &packet_mmap_ops;
 	err = 0;