| Message ID | 20090806114421.19208.39374.sendpatchset@localhost.localdomain |
|---|---|
| State | Accepted, archived |
| Delegated to | David Miller |
From: Krishna Kumar <krkumar2@in.ibm.com>
Date: Thu, 06 Aug 2009 17:14:21 +0530

> -
>  static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
>  {
>  	q->gso_skb = skb;
>  	q->qstats.requeues++;
> +	q->q.qlen++;	/* it's still part of the queue */
>  	__netif_schedule(q);
>
>  	return 0;

I'm dubious about this new qlen accounting scheme, does it handle
the non-bypass case properly?  Can you explain how it works
exactly?

--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
On Thu, 06 Aug 2009 17:14:21 +0530
Krishna Kumar <krkumar2@in.ibm.com> wrote:

> +static inline int qdisc_qlen(struct Qdisc *q)
> +{
> +	return q->q.qlen;
> +}
> +

Why?
This kind of one-line accessor decreases the code clarity.
Hi Dave,

David Miller <davem@davemloft.net> wrote on 08/07/2009 02:11:26 AM:

> >  static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
> >  {
> >  	q->gso_skb = skb;
> >  	q->qstats.requeues++;
> > +	q->q.qlen++;	/* it's still part of the queue */
> >  	__netif_schedule(q);
> >
> >  	return 0;
>
> I'm dubious about this new qlen accounting scheme, does it handle
> the non-bypass case properly?  Can you explain how it works
> exactly?

Adding one to qlen handles the non-bypass case correctly. On the next
non-bypass (or bypass) xmit, dequeue_skb will remove gso_skb and
decrement the counter. If no more xmits are done, qdisc_reset will
also fix it up. I got this idea by extending one of Jarek's patches:

commit 61c9eaf90081cbe6dc4f389e0056bff76eca19ec
Author: Jarek Poplawski <jarkao2@gmail.com>
Date:   Wed Nov 5 16:02:34 2008 -0800

    pkt_sched: Fix qdisc len in qdisc_peek_dequeued()

Thanks,

- KK
Hi Stephen,

Stephen Hemminger <shemminger@vyatta.com> wrote on 08/07/2009 02:19:46 AM:

> > +static inline int qdisc_qlen(struct Qdisc *q)
> > +{
> > +	return q->q.qlen;
> > +}
> > +
>
> Why?
> This kind of oneline accessor decreases the code clarity.

qdisc_qlen() was previously defined in sch_generic.c. I moved it to the
header file so that it can be used from dev.c (where there is no code
using q.qlen today), so it is not new code.

Thanks,

- KK
From: Krishna Kumar2 <krkumar2@in.ibm.com>
Date: Fri, 7 Aug 2009 08:26:04 +0530

> David Miller <davem@davemloft.net> wrote on 08/07/2009 02:11:26 AM:
>
>> I'm dubious about this new qlen accounting scheme, does it handle
>> the non-bypass case properly?  Can you explain how it works
>> exactly?
>
> Adding one to qlen will handle non-bypass correctly. On the next
> non-bypass (or a bypass) xmit, dequeue_skb will remove gso_skb and
> decrement the counter. If no more xmits are done, qdisc_reset will
> also fix it. I got this idea by extending one of Jarek's patches:

Aha, I see.  Thanks for the explanation.

Ok, I'll let this rip... let's see what falls out of it :-)

Will apply this to net-next-2.6 right now.
On Thu, 2009-08-06 at 17:14 +0530, Krishna Kumar wrote:

Hello,

> From: Krishna Kumar <krkumar2@in.ibm.com>
>
> dev_queue_xmit enqueues an skb and calls qdisc_run, which dequeues
> the skb and xmits it. In most cases, the skb that is enqueued is the
> same one that is dequeued (unless the queue gets stopped, or multiple
> cpus write to the same queue and race with qdisc_run). For default
> qdiscs, we can remove the redundant enqueue/dequeue and simply xmit
> the skb, since the default qdisc is work-conserving.

Sorry, old stuff, but I just noticed this.

One side effect of this patch is that tc now shows a lot of requeues
for slow interfaces (a slow DSL link in my case), because
sch_direct_xmit is called even if the device stopped its queue.

My first reflex was to investigate for a misbehaving driver (returning
TX_BUSY), but start_xmit is not even called; sch_direct_xmit notices
that the queue is stopped and just does a dequeue/requeue.

Since we're talking about slow interfaces, this has no impact on
performance, but the requeue counter incrementing is just a bit scary.

Shouldn't we check for a stopped queue earlier to avoid this?

Regards,
Hi Maxime,

Maxime Bizon <mbizon@freebox.fr> wrote on 02/01/2010 03:22:20 PM:

> Sorry old stuff, but I just noticed this.
>
> One side effect of this patch is that tc now shows a lot of requeue for
> slow interfaces (slow dsl link in my case), because sch_direct_xmit is
> called even if device stopped its queue.

sch_direct_xmit can be called from dev_queue_xmit for a stopped device
only if the device xmitted the previous skb, stopped the queue, and
returned OK. The next sch_direct_xmit will then dequeue and requeue the
new skb. But from that point on, sch_direct_xmit is never called from
dev_queue_xmit, since qdisc_qlen(q) > 0. Instead, dev_queue_xmit
enqueues the skb and calls qdisc_restart, which calls dequeue_skb.
dequeue_skb finds there is an earlier gso_skb, but since the device is
stopped, it returns NULL. Hence there should be only one requeue (and
dequeue) per stop.

BTW, I don't think this patch changes the earlier behavior. The old
code would have done the same thing.

> My first reflex was too investigate for a misbehaving driver (returning
> TX_BUSY), but start_xmit is not even called, sch_direct_xmit notices
> that queue is stopped, and just does a dequeue/requeue.

As I explained, this should happen only once per stop event. Could you
tell me which driver is having this problem? Is it waking up too early?
E.g., it might be stopping when the hw tx descriptor ring is full but
waking up when only a few slots open up, and those get filled up
immediately on fast systems. Then you would see a lot of requeues.

> Since we're talking about slow interfaces, this has no impact on
> performance, the requeue counter incrementing is just a bit scary.
>
> Shouldn't we check for stopped queue earlier to avoid this ?

dev_queue_xmit checks for qdisc_qlen == 0 before calling
sch_direct_xmit, which covers every stopped-queue case except the first
one. Also, it is not possible to check for a stopped queue earlier in
qdisc_restart() due to the multiqueue implementation.

Please let me know if you find something different from what I have
described.

thanks,

- KK
On Mon, 2010-02-01 at 19:12 +0530, Krishna Kumar2 wrote:

Hi,

> sch_direct_xmit can be called from dev_queue_xmit for a
> stopped device only if the device had xmit the previous
> skb, stopped the device and returned OK. Then the next

Yes, that's what happens in my case.

> BTW, I don't think this patch would change the earlier
> behavior. The old code would have done the same thing.

Oh, my mistake then. I always thought the requeue counter indicated
that the driver had returned TX_BUSY (and failed to stop the queue
beforehand), which is not polite.

If the current behavior is the expected one, then there is no problem
here.

> As I explained, this should happen only once per stop event.
> Could you tell which driver is having this problem? Is it
> waking up too early, eg, it might be stopping when the hw tx
> descriptor is full but waking up when a few slots open up,
> and those will get filled up immediately on fast systems.
> Then you will see a lot of requeue's.

This is an atm driver which is not mainlined. The system is not that
fast, but the link is (very) slow (< 1 Mbit/s). The driver wakes the
queue once the hardware queue is no longer full (at least one free tx
desc), and the free slot gets filled immediately since the link is
slow.

Thanks for explaining!
```diff
diff -ruNp org2/include/net/pkt_sched.h new6/include/net/pkt_sched.h
--- org2/include/net/pkt_sched.h	2009-08-04 11:41:21.000000000 +0530
+++ new6/include/net/pkt_sched.h	2009-08-05 22:24:41.000000000 +0530
@@ -87,6 +87,9 @@ extern struct qdisc_rate_table *qdisc_ge
 extern void qdisc_put_rtab(struct qdisc_rate_table *tab);
 extern void qdisc_put_stab(struct qdisc_size_table *tab);
 extern void qdisc_warn_nonwc(char *txt, struct Qdisc *qdisc);
+extern int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
+			   struct net_device *dev, struct netdev_queue *txq,
+			   spinlock_t *root_lock);
 
 extern void __qdisc_run(struct Qdisc *q);
 
diff -ruNp org2/include/net/sch_generic.h new6/include/net/sch_generic.h
--- org2/include/net/sch_generic.h	2009-03-24 08:54:16.000000000 +0530
+++ new6/include/net/sch_generic.h	2009-08-05 22:26:40.000000000 +0530
@@ -45,6 +45,7 @@ struct Qdisc
 #define TCQ_F_BUILTIN		1
 #define TCQ_F_THROTTLED		2
 #define TCQ_F_INGRESS		4
+#define TCQ_F_CAN_BYPASS	8
 #define TCQ_F_WARN_NONWC	(1 << 16)
 	int			padded;
 	struct Qdisc_ops	*ops;
@@ -182,6 +183,11 @@ struct qdisc_skb_cb {
 	char			data[];
 };
 
+static inline int qdisc_qlen(struct Qdisc *q)
+{
+	return q->q.qlen;
+}
+
 static inline struct qdisc_skb_cb *qdisc_skb_cb(struct sk_buff *skb)
 {
 	return (struct qdisc_skb_cb *)skb->cb;
@@ -387,13 +393,18 @@ static inline int qdisc_enqueue_root(str
 	return qdisc_enqueue(skb, sch) & NET_XMIT_MASK;
 }
 
+static inline void __qdisc_update_bstats(struct Qdisc *sch, unsigned int len)
+{
+	sch->bstats.bytes += len;
+	sch->bstats.packets++;
+}
+
 static inline int __qdisc_enqueue_tail(struct sk_buff *skb, struct Qdisc *sch,
 				       struct sk_buff_head *list)
 {
 	__skb_queue_tail(list, skb);
 	sch->qstats.backlog += qdisc_pkt_len(skb);
-	sch->bstats.bytes += qdisc_pkt_len(skb);
-	sch->bstats.packets++;
+	__qdisc_update_bstats(sch, qdisc_pkt_len(skb));
 
 	return NET_XMIT_SUCCESS;
 }
 
diff -ruNp org2/net/core/dev.c new6/net/core/dev.c
--- org2/net/core/dev.c	2009-07-27 09:08:24.000000000 +0530
+++ new6/net/core/dev.c	2009-08-06 11:34:21.000000000 +0530
@@ -1786,6 +1786,40 @@ static struct netdev_queue *dev_pick_tx(
 	return netdev_get_tx_queue(dev, queue_index);
 }
 
+static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
+				 struct net_device *dev,
+				 struct netdev_queue *txq)
+{
+	spinlock_t *root_lock = qdisc_lock(q);
+	int rc;
+
+	spin_lock(root_lock);
+	if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
+		kfree_skb(skb);
+		rc = NET_XMIT_DROP;
+	} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
+		   !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
+		/*
+		 * This is a work-conserving queue; there are no old skbs
+		 * waiting to be sent out; and the qdisc is not running -
+		 * xmit the skb directly.
+		 */
+		__qdisc_update_bstats(q, skb->len);
+		if (sch_direct_xmit(skb, q, dev, txq, root_lock))
+			__qdisc_run(q);
+		else
+			clear_bit(__QDISC_STATE_RUNNING, &q->state);
+
+		rc = NET_XMIT_SUCCESS;
+	} else {
+		rc = qdisc_enqueue_root(skb, q);
+		qdisc_run(q);
+	}
+	spin_unlock(root_lock);
+
+	return rc;
+}
+
 /**
  *	dev_queue_xmit - transmit a buffer
  *	@skb: buffer to transmit
@@ -1859,19 +1893,7 @@ gso:
 		skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
 #endif
 	if (q->enqueue) {
-		spinlock_t *root_lock = qdisc_lock(q);
-
-		spin_lock(root_lock);
-
-		if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
-			kfree_skb(skb);
-			rc = NET_XMIT_DROP;
-		} else {
-			rc = qdisc_enqueue_root(skb, q);
-			qdisc_run(q);
-		}
-		spin_unlock(root_lock);
-
+		rc = __dev_xmit_skb(skb, q, dev, txq);
 		goto out;
 	}
 
diff -ruNp org2/net/sched/sch_generic.c new6/net/sched/sch_generic.c
--- org2/net/sched/sch_generic.c	2009-05-25 07:48:07.000000000 +0530
+++ new6/net/sched/sch_generic.c	2009-08-05 22:24:41.000000000 +0530
@@ -37,15 +37,11 @@
  *   - updates to tree and tree walking are only done under the rtnl mutex.
  */
 
-static inline int qdisc_qlen(struct Qdisc *q)
-{
-	return q->q.qlen;
-}
-
 static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
 {
 	q->gso_skb = skb;
 	q->qstats.requeues++;
+	q->q.qlen++;	/* it's still part of the queue */
 	__netif_schedule(q);
 
 	return 0;
@@ -61,9 +57,11 @@ static inline struct sk_buff *dequeue_sk
 
 		/* check the reason of requeuing without tx lock first */
 		txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
-		if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq))
+		if (!netif_tx_queue_stopped(txq) &&
+		    !netif_tx_queue_frozen(txq)) {
 			q->gso_skb = NULL;
-		else
+			q->q.qlen--;
+		} else
 			skb = NULL;
 	} else {
 		skb = q->dequeue(q);
@@ -103,44 +101,23 @@ static inline int handle_dev_cpu_collisi
 }
 
 /*
- * NOTE: Called under qdisc_lock(q) with locally disabled BH.
- *
- * __QDISC_STATE_RUNNING guarantees only one CPU can process
- * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
- * this queue.
- *
- * netif_tx_lock serializes accesses to device driver.
- *
- * qdisc_lock(q) and netif_tx_lock are mutually exclusive,
- * if one is grabbed, another must be free.
- *
- * Note, that this procedure can be called by a watchdog timer
+ * Transmit one skb, and handle the return status as required. Holding the
+ * __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this
+ * function.
  *
  * Returns to the caller:
  *				0  - queue is empty or throttled.
  *				>0 - queue is not empty.
- *
  */
-static inline int qdisc_restart(struct Qdisc *q)
+int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
+		    struct net_device *dev, struct netdev_queue *txq,
+		    spinlock_t *root_lock)
 {
-	struct netdev_queue *txq;
 	int ret = NETDEV_TX_BUSY;
-	struct net_device *dev;
-	spinlock_t *root_lock;
-	struct sk_buff *skb;
-
-	/* Dequeue packet */
-	if (unlikely((skb = dequeue_skb(q)) == NULL))
-		return 0;
-
-	root_lock = qdisc_lock(q);
 
 	/* And release qdisc */
 	spin_unlock(root_lock);
 
-	dev = qdisc_dev(q);
-	txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
-
 	HARD_TX_LOCK(dev, txq, smp_processor_id());
 	if (!netif_tx_queue_stopped(txq) &&
 	    !netif_tx_queue_frozen(txq))
@@ -177,6 +154,44 @@ static inline int qdisc_restart(struct Q
 	return ret;
 }
 
+/*
+ * NOTE: Called under qdisc_lock(q) with locally disabled BH.
+ *
+ * __QDISC_STATE_RUNNING guarantees only one CPU can process
+ * this qdisc at a time. qdisc_lock(q) serializes queue accesses for
+ * this queue.
+ *
+ * netif_tx_lock serializes accesses to device driver.
+ *
+ * qdisc_lock(q) and netif_tx_lock are mutually exclusive,
+ * if one is grabbed, another must be free.
+ *
+ * Note, that this procedure can be called by a watchdog timer
+ *
+ * Returns to the caller:
+ *				0  - queue is empty or throttled.
+ *				>0 - queue is not empty.
+ *
+ */
+static inline int qdisc_restart(struct Qdisc *q)
+{
+	struct netdev_queue *txq;
+	struct net_device *dev;
+	spinlock_t *root_lock;
+	struct sk_buff *skb;
+
+	/* Dequeue packet */
+	skb = dequeue_skb(q);
+	if (unlikely(!skb))
+		return 0;
+
+	root_lock = qdisc_lock(q);
+	dev = qdisc_dev(q);
+	txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
+
+	return sch_direct_xmit(skb, q, dev, txq, root_lock);
+}
+
 void __qdisc_run(struct Qdisc *q)
 {
 	unsigned long start_time = jiffies;
@@ -547,8 +562,11 @@ void qdisc_reset(struct Qdisc *qdisc)
 	if (ops->reset)
 		ops->reset(qdisc);
 
-	kfree_skb(qdisc->gso_skb);
-	qdisc->gso_skb = NULL;
+	if (qdisc->gso_skb) {
+		kfree_skb(qdisc->gso_skb);
+		qdisc->gso_skb = NULL;
+		qdisc->q.qlen = 0;
+	}
 }
 EXPORT_SYMBOL(qdisc_reset);
 
@@ -605,6 +623,9 @@ static void attach_one_default_qdisc(str
 			printk(KERN_INFO "%s: activation failed\n", dev->name);
 			return;
 		}
+
+		/* Can by-pass the queue discipline for default qdisc */
+		qdisc->flags |= TCQ_F_CAN_BYPASS;
 	} else {
 		qdisc = &noqueue_qdisc;
 	}
```