
[ver2] Avoid enqueuing skb for default qdiscs

Message ID 20090806114421.19208.39374.sendpatchset@localhost.localdomain
State Accepted, archived
Delegated to: David Miller

Commit Message

Krishna Kumar Aug. 6, 2009, 11:44 a.m. UTC
From: Krishna Kumar <krkumar2@in.ibm.com>

dev_queue_xmit enqueues a skb and calls qdisc_run, which
dequeues the skb and xmits it. In most cases, the skb that
is enqueued is the same one that is dequeued (unless the
queue gets stopped, or multiple CPUs write to the same queue
and race with qdisc_run). For default qdiscs, we can remove
the redundant enqueue/dequeue and simply xmit the skb, since
the default qdisc is work-conserving.
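
In pseudocode, the fast path looks roughly like this (a simplified
sketch; the exact code is in __dev_xmit_skb in the patch below):

	if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
	    !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
		/* nothing queued, nobody else running: xmit directly */
		if (sch_direct_xmit(skb, q, dev, txq, qdisc_lock(q)))
			__qdisc_run(q);	/* more work left (e.g. a requeue) */
		else
			clear_bit(__QDISC_STATE_RUNNING, &q->state);
	} else {
		/* normal path: enqueue, then dequeue+xmit via qdisc_run */
		rc = qdisc_enqueue_root(skb, q);
		qdisc_run(q);
	}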

The patch uses a new flag - TCQ_F_CAN_BYPASS - to identify the
default fast queue. The controversial part of the patch is
incrementing qlen when a skb is requeued - this avoids extra
checks like the marked second line below:

+  } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
>>         !q->gso_skb &&
+          !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {

Results of 2 hours of testing with multiple netperf sessions (1,
2, 4, 8 and 12 sessions on a 4-CPU System-X). The BW numbers are
aggregate Mb/s across iterations, tested with this version on
System-X boxes with Chelsio 10 Gbps cards:

----------------------------------
Size |  ORG BW          NEW BW   |
----------------------------------
128K |  156964          159381   |
256K |  158650          162042   |
----------------------------------

Changes from ver1:

1. Move sch_direct_xmit declaration from sch_generic.h to
   pkt_sched.h
2. Update qdisc basic statistics for direct xmit path.
3. Set qlen to zero in qdisc_reset.
4. Changed some function names to more meaningful ones.

Thanks,

- KK

Signed-off-by: Krishna Kumar <krkumar2@in.ibm.com>
---
 include/net/pkt_sched.h   |    3 +
 include/net/sch_generic.h |   15 +++++
 net/core/dev.c            |   48 +++++++++++++-----
 net/sched/sch_generic.c   |   93 ++++++++++++++++++++++--------------
 4 files changed, 108 insertions(+), 51 deletions(-)


Comments

David Miller Aug. 6, 2009, 8:41 p.m. UTC | #1
From: Krishna Kumar <krkumar2@in.ibm.com>
Date: Thu, 06 Aug 2009 17:14:21 +0530

> -
>  static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
>  {
>  	q->gso_skb = skb;
>  	q->qstats.requeues++;
> +	q->q.qlen++;	/* it's still part of the queue */
>  	__netif_schedule(q);
>  
>  	return 0;

I'm dubious about this new qlen accounting scheme. Does it handle
the non-bypass case properly?  Can you explain exactly how it
works?
stephen hemminger Aug. 6, 2009, 8:49 p.m. UTC | #2
On Thu, 06 Aug 2009 17:14:21 +0530
Krishna Kumar <krkumar2@in.ibm.com> wrote:

> +static inline int qdisc_qlen(struct Qdisc *q)
> +{
> +	return q->q.qlen;
> +}
> +

Why?
This kind of one-line accessor decreases code clarity.
Krishna Kumar Aug. 7, 2009, 2:56 a.m. UTC | #3
Hi Dave,

David Miller <davem@davemloft.net> wrote on 08/07/2009 02:11:26 AM:

> >  static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
> >  {
> >     q->gso_skb = skb;
> >     q->qstats.requeues++;
> > +   q->q.qlen++;   /* it's still part of the queue */
> >     __netif_schedule(q);
> >
> >     return 0;
>
> I'm dubious about this new qlen accounting scheme, does it handle
> the non-bypass case properly?  Can you explain how it works
> exactly?

Adding one to qlen handles the non-bypass case correctly. On the
next non-bypass (or bypass) xmit, dequeue_skb will remove gso_skb
and decrement the counter. If no more xmits are done, qdisc_reset
will also fix it up. I got this idea by extending one of Jarek's
patches:

commit 61c9eaf90081cbe6dc4f389e0056bff76eca19ec
Author: Jarek Poplawski <jarkao2@gmail.com>
Date:   Wed Nov 5 16:02:34 2008 -0800
    pkt_sched: Fix qdisc len in qdisc_peek_dequeued()
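
To summarize, the counter stays balanced across all three paths (a
rough sketch collecting the relevant hunks from the patch):

	/* dev_requeue_skb: the parked skb still counts as queued */
	q->gso_skb = skb;
	q->q.qlen++;

	/* dequeue_skb: if the txq is neither stopped nor frozen,
	 * take the parked skb back and balance the increment */
	q->gso_skb = NULL;
	q->q.qlen--;

	/* qdisc_reset: no more xmits will happen, so drop the
	 * parked skb and clear the pending count */
	kfree_skb(qdisc->gso_skb);
	qdisc->gso_skb = NULL;
	qdisc->q.qlen = 0;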

Thanks,

- KK

Krishna Kumar Aug. 7, 2009, 3:01 a.m. UTC | #4
Hi Stephen,

Stephen Hemminger <shemminger@vyatta.com> wrote on 08/07/2009 02:19:46 AM:

> > +static inline int qdisc_qlen(struct Qdisc *q)
> > +{
> > +   return q->q.qlen;
> > +}
> > +
>
> Why?
> This kind of oneline accessor decreases the code clarity.

qdisc_qlen() was previously defined in sch_generic.c. I moved it
to the header file so that it can also be used in dev.c (which had
no code touching q.qlen before), so it is not new code.

Thanks,

- KK

David Miller Aug. 7, 2009, 3:08 a.m. UTC | #5
From: Krishna Kumar2 <krkumar2@in.ibm.com>
Date: Fri, 7 Aug 2009 08:26:04 +0530

> David Miller <davem@davemloft.net> wrote on 08/07/2009 02:11:26 AM:
> 
>> I'm dubious about this new qlen accounting scheme, does it handle
>> the non-bypass case properly?  Can you explain how it works
>> exactly?
> 
> Adding one to qlen will handle non-bypass correctly. On the next
> non-bypass (or a bypass) xmit, dequeue_skb will remove gso_skb and
> decrement the counter. If no more xmits are done, qdisc_reset will
> also fix it. I got this idea by extending one of Jarek's patches:

Aha, I see.  Thanks for the explanation.

Ok, I'll let this rip... let's see what falls out of it :-)
Will apply this to net-next-2.6 right now.
Maxime Bizon Feb. 1, 2010, 9:52 a.m. UTC | #6
On Thu, 2009-08-06 at 17:14 +0530, Krishna Kumar wrote:

Hello,

> From: Krishna Kumar <krkumar2@in.ibm.com>
> 
> dev_queue_xmit enqueues a skb and calls qdisc_run, which
> dequeues the skb and xmits it. In most cases, the skb that
> is enqueued is the same one that is dequeued (unless the
> queue gets stopped, or multiple CPUs write to the same queue
> and race with qdisc_run). For default qdiscs, we can remove
> the redundant enqueue/dequeue and simply xmit the skb, since
> the default qdisc is work-conserving.

Sorry to dig up old stuff, but I just noticed this.

One side effect of this patch is that tc now shows a lot of requeues
for slow interfaces (a slow DSL link in my case), because
sch_direct_xmit is called even if the device stopped its queue.

My first reflex was to investigate a misbehaving driver (returning
TX_BUSY), but start_xmit is not even called; sch_direct_xmit notices
that the queue is stopped and just does a dequeue/requeue.

Since we're talking about slow interfaces, this has no impact on
performance; the incrementing requeue counter is just a bit scary.

Shouldn't we check for a stopped queue earlier to avoid this?

Regards,
Krishna Kumar Feb. 1, 2010, 1:42 p.m. UTC | #7
Hi Maxime,

Maxime Bizon <mbizon@freebox.fr> wrote on 02/01/2010 03:22:20 PM:

> Sorry old stuff, but I just noticed this.
>
> One side effect of this patch is that tc now shows a lot of requeue for
> slow interfaces (slow dsl link in my case), because sch_direct_xmit is
> called even if device stopped its queue.

sch_direct_xmit can be called from dev_queue_xmit for a
stopped device only if the device had xmitted the previous
skb, stopped the queue and returned OK. The next
sch_direct_xmit will then dequeue and requeue the new skb.
But from then on, sch_direct_xmit is never called from
dev_queue_xmit since qdisc_qlen(q) > 0. Instead,
dev_queue_xmit enqueues the skb and calls qdisc_restart,
which calls dequeue_skb. dequeue_skb finds the earlier
gso_skb but, since the device is stopped, returns NULL.
Hence there should be only one requeue (and dequeue) per stop.
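
As a timeline (a sketch, assuming a single flow on one txq):

	/*
	 * 1. skb A: sch_direct_xmit -> driver xmits A, stops txq,
	 *    returns OK.
	 * 2. skb B: qlen == 0, so dev_queue_xmit calls sch_direct_xmit,
	 *    which sees the stopped txq and requeues B (qlen -> 1).
	 * 3. skb C, D, ...: qlen > 0, so they are enqueued instead;
	 *    dequeue_skb sees gso_skb with a stopped txq and returns
	 *    NULL.
	 *
	 * => exactly one requeue (and dequeue) per queue-stop event.
	 */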

BTW, I don't think this patch would change the earlier
behavior. The old code would have done the same thing.

> My first reflex was to investigate a misbehaving driver (returning
> TX_BUSY), but start_xmit is not even called; sch_direct_xmit notices
> that the queue is stopped and just does a dequeue/requeue.

As I explained, this should happen only once per stop event.
Could you tell me which driver is having this problem? Is it
waking up too early? E.g., it might stop when the hw tx
descriptor ring is full but wake up as soon as a few slots open
up, and those get filled immediately on fast systems. Then you
would see a lot of requeues.

> Since we're talking about slow interfaces, this has no impact on
> performance; the incrementing requeue counter is just a bit scary.
>
> Shouldn't we check for a stopped queue earlier to avoid this?

dev_queue_xmit checks for qdisc_qlen == 0 before calling
sch_direct_xmit, which is enough for every stopped-queue case
except the first one. Also, it is not possible to check for a
stopped queue earlier in qdisc_restart() due to the multiqueue
implementation.

Please let me know if you find something different from what
I have described.

thanks,

- KK

Maxime Bizon Feb. 1, 2010, 2:16 p.m. UTC | #8
On Mon, 2010-02-01 at 19:12 +0530, Krishna Kumar2 wrote:

Hi,

> sch_direct_xmit can be called from dev_queue_xmit for a
> stopped device only if the device had xmitted the previous
> skb, stopped the queue and returned OK. The next

Yes, that's what happens in my case.

> BTW, I don't think this patch would change the earlier
> behavior. The old code would have done the same thing.

Oh, my mistake then. I always thought the requeue counter indicated
that the driver returned TX_BUSY (after failing to stop the queue
earlier), which is not polite.

If the current behavior is the expected one, then there is no problem
here.

> As I explained, this should happen only once per stop event.
> Could you tell which driver is having this problem? Is it
> waking up too early, eg, it might be stopping when the hw tx
> descriptor is full but waking up when a few slots open up,
> and those will get filled up immediately on fast systems.
> Then you will see a lot of requeue's.

This is an ATM driver which is not mainlined. The system is not that
fast, but the link is (very) slow (< 1 Mbit/s).

The driver is waking up the queue once the hardware queue is not full
anymore (at least one free tx desc), and the free slot gets filled
immediately since the link is slow.

Thanks for explaining!

Patch

diff -ruNp org2/include/net/pkt_sched.h new6/include/net/pkt_sched.h
--- org2/include/net/pkt_sched.h	2009-08-04 11:41:21.000000000 +0530
+++ new6/include/net/pkt_sched.h	2009-08-05 22:24:41.000000000 +0530
@@ -87,6 +87,9 @@  extern struct qdisc_rate_table *qdisc_ge
 extern void qdisc_put_rtab(struct qdisc_rate_table *tab);
 extern void qdisc_put_stab(struct qdisc_size_table *tab);
 extern void qdisc_warn_nonwc(char *txt, struct Qdisc *qdisc);
+extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
+			   struct net_device *dev, struct netdev_queue *txq,
+			   spinlock_t *root_lock);
 
 extern void __qdisc_run(struct Qdisc *q);
 
diff -ruNp org2/include/net/sch_generic.h new6/include/net/sch_generic.h
--- org2/include/net/sch_generic.h	2009-03-24 08:54:16.000000000 +0530
+++ new6/include/net/sch_generic.h	2009-08-05 22:26:40.000000000 +0530
@@ -45,6 +45,7 @@  struct Qdisc
 #define TCQ_F_BUILTIN		1
 #define TCQ_F_THROTTLED		2
 #define TCQ_F_INGRESS		4
+#define TCQ_F_CAN_BYPASS	8
 #define TCQ_F_WARN_NONWC	(1 << 16)
 	int			padded;
 	struct Qdisc_ops	*ops;
@@ -182,6 +183,11 @@  struct qdisc_skb_cb {
 	char			data[];
 };
 
+static inline int qdisc_qlen(struct Qdisc *q)
+{
+	return q->q.qlen;
+}
+
 static inline struct qdisc_skb_cb *qdisc_skb_cb(struct sk_buff *skb)
 {
 	return (struct qdisc_skb_cb *)skb->cb;
@@ -387,13 +393,18 @@  static inline int qdisc_enqueue_root(str
 	return qdisc_enqueue(skb, sch) & NET_XMIT_MASK;
 }
 
+static inline void __qdisc_update_bstats(struct Qdisc *sch, unsigned int len)
+{
+	sch->bstats.bytes += len;
+	sch->bstats.packets++;
+}
+
 static inline int __qdisc_enqueue_tail(struct sk_buff *skb, struct Qdisc *sch,
 				       struct sk_buff_head *list)
 {
 	__skb_queue_tail(list, skb);
 	sch->qstats.backlog += qdisc_pkt_len(skb);
-	sch->bstats.bytes += qdisc_pkt_len(skb);
-	sch->bstats.packets++;
+	__qdisc_update_bstats(sch, qdisc_pkt_len(skb));
 
 	return NET_XMIT_SUCCESS;
 }
diff -ruNp org2/net/core/dev.c new6/net/core/dev.c
--- org2/net/core/dev.c	2009-07-27 09:08:24.000000000 +0530
+++ new6/net/core/dev.c	2009-08-06 11:34:21.000000000 +0530
@@ -1786,6 +1786,40 @@  static struct netdev_queue *dev_pick_tx(
 	return netdev_get_tx_queue(dev, queue_index);
 }
 
+static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
+				 struct net_device *dev,
+				 struct netdev_queue *txq)
+{
+	spinlock_t *root_lock = qdisc_lock(q);
+	int rc;
+
+	spin_lock(root_lock);
+	if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
+		kfree_skb(skb);
+		rc = NET_XMIT_DROP;
+	} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
+		   !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
+		/*
+		 * This is a work-conserving queue; there are no old skbs
+		 * waiting to be sent out; and the qdisc is not running -
+		 * xmit the skb directly.
+		 */
+		__qdisc_update_bstats(q, skb->len);
+		if (sch_direct_xmit(skb, q, dev, txq, root_lock))
+			__qdisc_run(q);
+		else
+			clear_bit(__QDISC_STATE_RUNNING, &q->state);
+
+		rc = NET_XMIT_SUCCESS;
+	} else {
+		rc = qdisc_enqueue_root(skb, q);
+		qdisc_run(q);
+	}
+	spin_unlock(root_lock);
+
+	return rc;
+}
+
 /**
  *	dev_queue_xmit - transmit a buffer
  *	@skb: buffer to transmit
@@ -1859,19 +1893,7 @@  gso:
 	skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
 #endif
 	if (q->enqueue) {
-		spinlock_t *root_lock = qdisc_lock(q);
-
-		spin_lock(root_lock);
-
-		if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
-			kfree_skb(skb);
-			rc = NET_XMIT_DROP;
-		} else {
-			rc = qdisc_enqueue_root(skb, q);
-			qdisc_run(q);
-		}
-		spin_unlock(root_lock);
-
+		rc = __dev_xmit_skb(skb, q, dev, txq);
 		goto out;
 	}
 
diff -ruNp org2/net/sched/sch_generic.c new6/net/sched/sch_generic.c
--- org2/net/sched/sch_generic.c	2009-05-25 07:48:07.000000000 +0530
+++ new6/net/sched/sch_generic.c	2009-08-05 22:24:41.000000000 +0530
@@ -37,15 +37,11 @@ 
  * - updates to tree and tree walking are only done under the rtnl mutex.
  */
 
-static inline int qdisc_qlen(struct Qdisc *q)
-{
-	return q->q.qlen;
-}
-
 static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
 {
 	q->gso_skb = skb;
 	q->qstats.requeues++;
+	q->q.qlen++;	/* it's still part of the queue */
 	__netif_schedule(q);
 
 	return 0;
@@ -61,9 +57,11 @@  static inline struct sk_buff *dequeue_sk
 
 		/* check the reason of requeuing without tx lock first */
 		txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
-		if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq))
+		if (!netif_tx_queue_stopped(txq) &&
+		    !netif_tx_queue_frozen(txq)) {
 			q->gso_skb = NULL;
-		else
+			q->q.qlen--;
+		} else
 			skb = NULL;
 	} else {
 		skb = q->dequeue(q);
@@ -103,44 +101,23 @@  static inline int handle_dev_cpu_collisi
 }
 
 /*
- * NOTE: Called under qdisc_lock(q) with locally disabled BH.
- *
- * __QDISC_STATE_RUNNING guarantees only one CPU can process
- * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
- * this queue.
- *
- *  netif_tx_lock serializes accesses to device driver.
- *
- *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
- *  if one is grabbed, another must be free.
- *
- * Note, that this procedure can be called by a watchdog timer
+ * Transmit one skb, and handle the return status as required. Holding the
+ * __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this
+ * function.
  *
  * Returns to the caller:
  *				0  - queue is empty or throttled.
  *				>0 - queue is not empty.
- *
  */
-static inline int qdisc_restart(struct Qdisc *q)
+int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
+		    struct net_device *dev, struct netdev_queue *txq,
+		    spinlock_t *root_lock)
 {
-	struct netdev_queue *txq;
 	int ret = NETDEV_TX_BUSY;
-	struct net_device *dev;
-	spinlock_t *root_lock;
-	struct sk_buff *skb;
-
-	/* Dequeue packet */
-	if (unlikely((skb = dequeue_skb(q)) == NULL))
-		return 0;
-
-	root_lock = qdisc_lock(q);
 
 	/* And release qdisc */
 	spin_unlock(root_lock);
 
-	dev = qdisc_dev(q);
-	txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
-
 	HARD_TX_LOCK(dev, txq, smp_processor_id());
 	if (!netif_tx_queue_stopped(txq) &&
 	    !netif_tx_queue_frozen(txq))
@@ -177,6 +154,44 @@  static inline int qdisc_restart(struct Q
 	return ret;
 }
 
+/*
+ * NOTE: Called under qdisc_lock(q) with locally disabled BH.
+ *
+ * __QDISC_STATE_RUNNING guarantees only one CPU can process
+ * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
+ * this queue.
+ *
+ *  netif_tx_lock serializes accesses to device driver.
+ *
+ *  qdisc_lock(q) and netif_tx_lock are mutually exclusive,
+ *  if one is grabbed, another must be free.
+ *
+ * Note, that this procedure can be called by a watchdog timer
+ *
+ * Returns to the caller:
+ *				0  - queue is empty or throttled.
+ *				>0 - queue is not empty.
+ *
+ */
+static inline int qdisc_restart(struct Qdisc *q)
+{
+	struct netdev_queue *txq;
+	struct net_device *dev;
+	spinlock_t *root_lock;
+	struct sk_buff *skb;
+
+	/* Dequeue packet */
+	skb = dequeue_skb(q);
+	if (unlikely(!skb))
+		return 0;
+
+	root_lock = qdisc_lock(q);
+	dev = qdisc_dev(q);
+	txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
+
+	return sch_direct_xmit(skb, q, dev, txq, root_lock);
+}
+
 void __qdisc_run(struct Qdisc *q)
 {
 	unsigned long start_time = jiffies;
@@ -547,8 +562,11 @@  void qdisc_reset(struct Qdisc *qdisc)
 	if (ops->reset)
 		ops->reset(qdisc);
 
-	kfree_skb(qdisc->gso_skb);
-	qdisc->gso_skb = NULL;
+	if (qdisc->gso_skb) {
+		kfree_skb(qdisc->gso_skb);
+		qdisc->gso_skb = NULL;
+		qdisc->q.qlen = 0;
+	}
 }
 EXPORT_SYMBOL(qdisc_reset);
 
@@ -605,6 +623,9 @@  static void attach_one_default_qdisc(str
 			printk(KERN_INFO "%s: activation failed\n", dev->name);
 			return;
 		}
+
+		/* Can by-pass the queue discipline for default qdisc */
+		qdisc->flags |= TCQ_F_CAN_BYPASS;
 	} else {
 		qdisc =  &noqueue_qdisc;
 	}