[ovs-dev,v4,2/5] netdev-dpdk: Add netdev_dpdk_vhost_txq_flush function.

Message ID 1502211976-76937-3-git-send-email-bhanuprakash.bodireddy@intel.com
State Rejected
Delegated to: Darrell Ball

Commit Message

Bodireddy, Bhanuprakash Aug. 8, 2017, 5:06 p.m. UTC
Add netdev_dpdk_vhost_txq_flush(), which flushes packets queued on vHost User
port queues. Also add netdev_dpdk_vhost_tx_burst(), which uses
rte_vhost_enqueue_burst() to enqueue a burst of packets on vHost User ports.

Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
Signed-off-by: Antonio Fischetti <antonio.fischetti@intel.com>
Co-authored-by: Antonio Fischetti <antonio.fischetti@intel.com>
Acked-by: Eelco Chaudron <echaudro@redhat.com>
---
 lib/netdev-dpdk.c | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 108 insertions(+), 4 deletions(-)

Comments

Ilya Maximets Aug. 9, 2017, 8:06 a.m. UTC | #1
Not a full review.
One comment inline.

> Add netdev_dpdk_vhost_txq_flush(), that flushes packets on vHost User
> port queues. Also add netdev_dpdk_vhost_tx_burst() function that
> uses rte_vhost_enqueue_burst() to enqueue burst of packets on vHost User
> ports.
> 
> Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy at intel.com>
> Signed-off-by: Antonio Fischetti <antonio.fischetti at intel.com>
> Co-authored-by: Antonio Fischetti <antonio.fischetti at intel.com>
> Acked-by: Eelco Chaudron <echaudro at redhat.com>
> ---
>  lib/netdev-dpdk.c | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 108 insertions(+), 4 deletions(-)
> 
> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> index 50d6b29..d3892fe 100644
> --- a/lib/netdev-dpdk.c
> +++ b/lib/netdev-dpdk.c
> @@ -327,12 +327,22 @@ struct dpdk_tx_queue {
>                                      * pmd threads (see 'concurrent_txq'). */
>      int map;                       /* Mapping of configured vhost-user queues
>                                      * to enabled by guest. */
> -    int dpdk_pkt_cnt;              /* Number of buffered packets waiting to
> +    union {
> +        int dpdk_pkt_cnt;          /* Number of buffered packets waiting to
>                                        be sent on DPDK tx queue. */
> -    struct rte_mbuf *dpdk_burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
> +        int vhost_pkt_cnt;         /* Number of buffered packets waiting to
> +                                      be sent on vhost port. */
> +    };
> +
> +    union {
> +        struct rte_mbuf *dpdk_burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
>                                     /* Intermediate queue where packets can
>                                      * be buffered to amortize the cost of MMIO
>                                      * writes. */
> +        struct dp_packet *vhost_burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
> +                                   /* Intermediate queue where packets can
> +                                    * be buffered for vhost ports. */
> +    };
>  };
>  
>  /* dpdk has no way to remove dpdk ring ethernet devices
> @@ -1756,6 +1766,88 @@ netdev_dpdk_vhost_update_tx_counters(struct netdev_stats *stats,
>      }
>  }
>  
> +static int
> +netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid)
> +{
> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> +    struct rte_mbuf **cur_pkts = (struct rte_mbuf **)txq->vhost_burst_pkts;
> +
> +    int tx_vid = netdev_dpdk_get_vid(dev);
> +    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
> +    uint32_t sent = 0;
> +    uint32_t retries = 0;
> +    uint32_t sum, total_pkts;
> +
> +    total_pkts = sum = txq->vhost_pkt_cnt;
> +    do {
> +        uint32_t ret;
> +        ret = rte_vhost_enqueue_burst(tx_vid, tx_qid, &cur_pkts[sent], sum);
> +        if (OVS_UNLIKELY(!ret)) {
> +            /* No packets enqueued - do not retry. */
> +            break;
> +        } else {
> +            /* Packet have been sent. */
> +            sent += ret;
> +
> +            /* 'sum' packet have to be retransmitted. */
> +            sum -= ret;
> +        }
> +    } while (sum && (retries++ < VHOST_ENQ_RETRY_NUM));
> +
> +    for (int i = 0; i < total_pkts; i++) {
> +        dp_packet_delete(txq->vhost_burst_pkts[i]);
> +    }
> +
> +    /* Reset pkt count. */
> +    txq->vhost_pkt_cnt = 0;
> +
> +    /* 'sum' refers to packets dropped. */
> +    return sum;
> +}
> +
> +/* Flush the txq if there are any packets available. */
> +static int
> +netdev_dpdk_vhost_txq_flush(struct netdev *netdev, int qid,
> +                            bool concurrent_txq OVS_UNUSED)
> +{
> +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
> +    struct dpdk_tx_queue *txq;
> +
> +    qid = dev->tx_q[qid % netdev->n_txq].map;
> +
> +    /* The qid may be disabled in the guest and has been set to
> +     * OVS_VHOST_QUEUE_DISABLED.
> +     */
> +    if (OVS_UNLIKELY(qid < 0)) {
> +        return 0;
> +    }
> +
> +    txq = &dev->tx_q[qid];
> +    /* Increment the drop count and free the memory. */
> +    if (OVS_UNLIKELY(!is_vhost_running(dev) ||
> +                     !(dev->flags & NETDEV_UP))) {
> +
> +        if (txq->vhost_pkt_cnt) {
> +            rte_spinlock_lock(&dev->stats_lock);
> +            dev->stats.tx_dropped+= txq->vhost_pkt_cnt;
> +            rte_spinlock_unlock(&dev->stats_lock);
> +
> +            for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
> +                dp_packet_delete(txq->vhost_burst_pkts[i]);

Spinlock (tx_lock) must be held here to avoid queue and mempool breakage.

> +            }
> +            txq->vhost_pkt_cnt = 0;
> +        }
> +    }
> +
> +    if (OVS_LIKELY(txq->vhost_pkt_cnt)) {
> +        rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
> +        netdev_dpdk_vhost_tx_burst(dev, qid);
> +        rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
> +    }
> +
> +    return 0;
> +}
> +
>  static void
>  __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>                           struct dp_packet **pkts, int cnt)
> @@ -2799,6 +2891,17 @@ vring_state_changed(int vid, uint16_t queue_id, int enable)
>              if (enable) {
>                  dev->tx_q[qid].map = qid;
>              } else {
> +                /* If the queue is disabled in the guest, the corresponding qid
> +                 * map shall be set to OVS_VHOST_QUEUE_DISABLED(-2).
> +                 *
> +                 * The packets that were queued in 'qid' could be potentially
> +                 * stuck and needs to be dropped.
> +                 *
> +                 * XXX: The queues may be already disabled in the guest so
> +                 * flush function in this case only helps in updating stats
> +                 * and freeing memory.
> +                 */
> +                netdev_dpdk_vhost_txq_flush(&dev->up, qid, 0);
>                  dev->tx_q[qid].map = OVS_VHOST_QUEUE_DISABLED;
>              }
>              netdev_dpdk_remap_txqs(dev);
> @@ -3471,7 +3574,8 @@ static const struct netdev_class dpdk_vhost_class =
>          NULL,
>          netdev_dpdk_vhost_reconfigure,
>          netdev_dpdk_vhost_rxq_recv,
> -        NULL);
> +        netdev_dpdk_vhost_txq_flush);
> +
>  static const struct netdev_class dpdk_vhost_client_class =
>      NETDEV_DPDK_CLASS(
>          "dpdkvhostuserclient",
> @@ -3487,7 +3591,7 @@ static const struct netdev_class dpdk_vhost_client_class =
>          NULL,
>          netdev_dpdk_vhost_client_reconfigure,
>          netdev_dpdk_vhost_rxq_recv,
> -        NULL);
> +        netdev_dpdk_vhost_txq_flush);
>  
>  void
>  netdev_dpdk_register(void)
> -- 
> 2.4.11
Ilya Maximets Aug. 9, 2017, 10:03 a.m. UTC | #2
One more comment inline.

On 09.08.2017 11:06, Ilya Maximets wrote:
> Not a full review.
> One comment inline.
> 
>> Add netdev_dpdk_vhost_txq_flush(), that flushes packets on vHost User
>> port queues. Also add netdev_dpdk_vhost_tx_burst() function that
>> uses rte_vhost_enqueue_burst() to enqueue burst of packets on vHost User
>> ports.
>>
>> Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy at intel.com>
>> Signed-off-by: Antonio Fischetti <antonio.fischetti at intel.com>
>> Co-authored-by: Antonio Fischetti <antonio.fischetti at intel.com>
>> Acked-by: Eelco Chaudron <echaudro at redhat.com>
>> ---
>>  lib/netdev-dpdk.c | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
>>  1 file changed, 108 insertions(+), 4 deletions(-)
>>
>> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
>> index 50d6b29..d3892fe 100644
>> --- a/lib/netdev-dpdk.c
>> +++ b/lib/netdev-dpdk.c
>> @@ -327,12 +327,22 @@ struct dpdk_tx_queue {
>>                                      * pmd threads (see 'concurrent_txq'). */
>>      int map;                       /* Mapping of configured vhost-user queues
>>                                      * to enabled by guest. */
>> -    int dpdk_pkt_cnt;              /* Number of buffered packets waiting to
>> +    union {
>> +        int dpdk_pkt_cnt;          /* Number of buffered packets waiting to
>>                                        be sent on DPDK tx queue. */
>> -    struct rte_mbuf *dpdk_burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
>> +        int vhost_pkt_cnt;         /* Number of buffered packets waiting to
>> +                                      be sent on vhost port. */
>> +    };
>> +
>> +    union {
>> +        struct rte_mbuf *dpdk_burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
>>                                     /* Intermediate queue where packets can
>>                                      * be buffered to amortize the cost of MMIO
>>                                      * writes. */
>> +        struct dp_packet *vhost_burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
>> +                                   /* Intermediate queue where packets can
>> +                                    * be buffered for vhost ports. */
>> +    };
>>  };
>>  
>>  /* dpdk has no way to remove dpdk ring ethernet devices
>> @@ -1756,6 +1766,88 @@ netdev_dpdk_vhost_update_tx_counters(struct netdev_stats *stats,
>>      }
>>  }
>>  
>> +static int
>> +netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid)
>> +{
>> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>> +    struct rte_mbuf **cur_pkts = (struct rte_mbuf **)txq->vhost_burst_pkts;
>> +
>> +    int tx_vid = netdev_dpdk_get_vid(dev);
>> +    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
>> +    uint32_t sent = 0;
>> +    uint32_t retries = 0;
>> +    uint32_t sum, total_pkts;
>> +
>> +    total_pkts = sum = txq->vhost_pkt_cnt;
>> +    do {
>> +        uint32_t ret;
>> +        ret = rte_vhost_enqueue_burst(tx_vid, tx_qid, &cur_pkts[sent], sum);
>> +        if (OVS_UNLIKELY(!ret)) {
>> +            /* No packets enqueued - do not retry. */
>> +            break;
>> +        } else {
>> +            /* Packet have been sent. */
>> +            sent += ret;
>> +
>> +            /* 'sum' packet have to be retransmitted. */
>> +            sum -= ret;
>> +        }
>> +    } while (sum && (retries++ < VHOST_ENQ_RETRY_NUM));
>> +
>> +    for (int i = 0; i < total_pkts; i++) {
>> +        dp_packet_delete(txq->vhost_burst_pkts[i]);
>> +    }
>> +
>> +    /* Reset pkt count. */
>> +    txq->vhost_pkt_cnt = 0;
>> +
>> +    /* 'sum' refers to packets dropped. */
>> +    return sum;
>> +}
>> +
>> +/* Flush the txq if there are any packets available. */
>> +static int
>> +netdev_dpdk_vhost_txq_flush(struct netdev *netdev, int qid,
>> +                            bool concurrent_txq OVS_UNUSED)
>> +{
>> +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
>> +    struct dpdk_tx_queue *txq;
>> +
>> +    qid = dev->tx_q[qid % netdev->n_txq].map;
>> +
>> +    /* The qid may be disabled in the guest and has been set to
>> +     * OVS_VHOST_QUEUE_DISABLED.
>> +     */
>> +    if (OVS_UNLIKELY(qid < 0)) {
>> +        return 0;
>> +    }
>> +
>> +    txq = &dev->tx_q[qid];
>> +    /* Increment the drop count and free the memory. */
>> +    if (OVS_UNLIKELY(!is_vhost_running(dev) ||
>> +                     !(dev->flags & NETDEV_UP))) {
>> +
>> +        if (txq->vhost_pkt_cnt) {
>> +            rte_spinlock_lock(&dev->stats_lock);
>> +            dev->stats.tx_dropped+= txq->vhost_pkt_cnt;
>> +            rte_spinlock_unlock(&dev->stats_lock);
>> +
>> +            for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
>> +                dp_packet_delete(txq->vhost_burst_pkts[i]);
> 
> Spinlock (tx_lock) must be held here to avoid queue and mempool breakage.
> 
>> +            }
>> +            txq->vhost_pkt_cnt = 0;
>> +        }
>> +    }
>> +
>> +    if (OVS_LIKELY(txq->vhost_pkt_cnt)) {
>> +        rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
>> +        netdev_dpdk_vhost_tx_burst(dev, qid);
>> +        rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>>  static void
>>  __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>>                           struct dp_packet **pkts, int cnt)
>> @@ -2799,6 +2891,17 @@ vring_state_changed(int vid, uint16_t queue_id, int enable)
>>              if (enable) {
>>                  dev->tx_q[qid].map = qid;

Flushing is required here too, because we're possibly enabling a previously remapped queue.

>>              } else {
>> +                /* If the queue is disabled in the guest, the corresponding qid
>> +                 * map shall be set to OVS_VHOST_QUEUE_DISABLED(-2).
>> +                 *
>> +                 * The packets that were queued in 'qid' could be potentially
>> +                 * stuck and needs to be dropped.
>> +                 *
>> +                 * XXX: The queues may be already disabled in the guest so
>> +                 * flush function in this case only helps in updating stats
>> +                 * and freeing memory.
>> +                 */
>> +                netdev_dpdk_vhost_txq_flush(&dev->up, qid, 0);
>>                  dev->tx_q[qid].map = OVS_VHOST_QUEUE_DISABLED;
>>              }
>>              netdev_dpdk_remap_txqs(dev);

'netdev_dpdk_remap_txqs()', actually, is able to change the mapping for all the
queues disabled in the guest. So, we need to flush all of them while remapping,
somewhere inside that function.
One other thing is that there is a race window between the flush and the
mapping update where another thread is able to enqueue more packets into the
just-flushed queue. The order of operations should be changed, or both of them
should be done under the same tx_lock. I think it's required to make the
tx_q[].map field atomic to fix the race condition, because the send function
takes the 'map' and then locks the corresponding queue. It wasn't an issue
before, because in case of a race the packets were just dropped on the attempt
to send to a disabled queue, but with this patch applied they will be enqueued
to the intermediate queue and stuck there.

>> @@ -3471,7 +3574,8 @@ static const struct netdev_class dpdk_vhost_class =
>>          NULL,
>>          netdev_dpdk_vhost_reconfigure,
>>          netdev_dpdk_vhost_rxq_recv,
>> -        NULL);
>> +        netdev_dpdk_vhost_txq_flush);
>> +
>>  static const struct netdev_class dpdk_vhost_client_class =
>>      NETDEV_DPDK_CLASS(
>>          "dpdkvhostuserclient",
>> @@ -3487,7 +3591,7 @@ static const struct netdev_class dpdk_vhost_client_class =
>>          NULL,
>>          netdev_dpdk_vhost_client_reconfigure,
>>          netdev_dpdk_vhost_rxq_recv,
>> -        NULL);
>> +        netdev_dpdk_vhost_txq_flush);
>>  
>>  void
>>  netdev_dpdk_register(void)
>> -- 
>> 2.4.11
Ilya Maximets Aug. 9, 2017, 12:22 p.m. UTC | #3
On 09.08.2017 13:03, Ilya Maximets wrote:
> One more comment inline.
> 
> On 09.08.2017 11:06, Ilya Maximets wrote:
>> Not a full review.
>> One comment inline.
>>
>>> Add netdev_dpdk_vhost_txq_flush(), that flushes packets on vHost User
>>> port queues. Also add netdev_dpdk_vhost_tx_burst() function that
>>> uses rte_vhost_enqueue_burst() to enqueue burst of packets on vHost User
>>> ports.
>>>
>>> Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy at intel.com>
>>> Signed-off-by: Antonio Fischetti <antonio.fischetti at intel.com>
>>> Co-authored-by: Antonio Fischetti <antonio.fischetti at intel.com>
>>> Acked-by: Eelco Chaudron <echaudro at redhat.com>
>>> ---
>>>  lib/netdev-dpdk.c | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
>>>  1 file changed, 108 insertions(+), 4 deletions(-)
>>>
>>> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
>>> index 50d6b29..d3892fe 100644
>>> --- a/lib/netdev-dpdk.c
>>> +++ b/lib/netdev-dpdk.c
>>> @@ -327,12 +327,22 @@ struct dpdk_tx_queue {
>>>                                      * pmd threads (see 'concurrent_txq'). */
>>>      int map;                       /* Mapping of configured vhost-user queues
>>>                                      * to enabled by guest. */
>>> -    int dpdk_pkt_cnt;              /* Number of buffered packets waiting to
>>> +    union {
>>> +        int dpdk_pkt_cnt;          /* Number of buffered packets waiting to
>>>                                        be sent on DPDK tx queue. */
>>> -    struct rte_mbuf *dpdk_burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
>>> +        int vhost_pkt_cnt;         /* Number of buffered packets waiting to
>>> +                                      be sent on vhost port. */
>>> +    };
>>> +
>>> +    union {
>>> +        struct rte_mbuf *dpdk_burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
>>>                                     /* Intermediate queue where packets can
>>>                                      * be buffered to amortize the cost of MMIO
>>>                                      * writes. */
>>> +        struct dp_packet *vhost_burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
>>> +                                   /* Intermediate queue where packets can
>>> +                                    * be buffered for vhost ports. */
>>> +    };
>>>  };
>>>  
>>>  /* dpdk has no way to remove dpdk ring ethernet devices
>>> @@ -1756,6 +1766,88 @@ netdev_dpdk_vhost_update_tx_counters(struct netdev_stats *stats,
>>>      }
>>>  }
>>>  
>>> +static int
>>> +netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid)
>>> +{
>>> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>>> +    struct rte_mbuf **cur_pkts = (struct rte_mbuf **)txq->vhost_burst_pkts;
>>> +
>>> +    int tx_vid = netdev_dpdk_get_vid(dev);
>>> +    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
>>> +    uint32_t sent = 0;
>>> +    uint32_t retries = 0;
>>> +    uint32_t sum, total_pkts;
>>> +
>>> +    total_pkts = sum = txq->vhost_pkt_cnt;
>>> +    do {
>>> +        uint32_t ret;
>>> +        ret = rte_vhost_enqueue_burst(tx_vid, tx_qid, &cur_pkts[sent], sum);
>>> +        if (OVS_UNLIKELY(!ret)) {
>>> +            /* No packets enqueued - do not retry. */
>>> +            break;
>>> +        } else {
>>> +            /* Packet have been sent. */
>>> +            sent += ret;
>>> +
>>> +            /* 'sum' packet have to be retransmitted. */
>>> +            sum -= ret;
>>> +        }
>>> +    } while (sum && (retries++ < VHOST_ENQ_RETRY_NUM));
>>> +
>>> +    for (int i = 0; i < total_pkts; i++) {
>>> +        dp_packet_delete(txq->vhost_burst_pkts[i]);
>>> +    }
>>> +
>>> +    /* Reset pkt count. */
>>> +    txq->vhost_pkt_cnt = 0;
>>> +
>>> +    /* 'sum' refers to packets dropped. */
>>> +    return sum;
>>> +}
>>> +
>>> +/* Flush the txq if there are any packets available. */
>>> +static int
>>> +netdev_dpdk_vhost_txq_flush(struct netdev *netdev, int qid,
>>> +                            bool concurrent_txq OVS_UNUSED)
>>> +{
>>> +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
>>> +    struct dpdk_tx_queue *txq;
>>> +
>>> +    qid = dev->tx_q[qid % netdev->n_txq].map;
>>> +
>>> +    /* The qid may be disabled in the guest and has been set to
>>> +     * OVS_VHOST_QUEUE_DISABLED.
>>> +     */
>>> +    if (OVS_UNLIKELY(qid < 0)) {
>>> +        return 0;
>>> +    }
>>> +
>>> +    txq = &dev->tx_q[qid];
>>> +    /* Increment the drop count and free the memory. */
>>> +    if (OVS_UNLIKELY(!is_vhost_running(dev) ||
>>> +                     !(dev->flags & NETDEV_UP))) {
>>> +
>>> +        if (txq->vhost_pkt_cnt) {
>>> +            rte_spinlock_lock(&dev->stats_lock);
>>> +            dev->stats.tx_dropped+= txq->vhost_pkt_cnt;
>>> +            rte_spinlock_unlock(&dev->stats_lock);
>>> +
>>> +            for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
>>> +                dp_packet_delete(txq->vhost_burst_pkts[i]);
>>
>> Spinlock (tx_lock) must be held here to avoid queue and mempool breakage.
>>
>>> +            }
>>> +            txq->vhost_pkt_cnt = 0;
>>> +        }
>>> +    }
>>> +
>>> +    if (OVS_LIKELY(txq->vhost_pkt_cnt)) {
>>> +        rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
>>> +        netdev_dpdk_vhost_tx_burst(dev, qid);
>>> +        rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
>>> +    }
>>> +
>>> +    return 0;
>>> +}
>>> +
>>>  static void
>>>  __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>>>                           struct dp_packet **pkts, int cnt)
>>> @@ -2799,6 +2891,17 @@ vring_state_changed(int vid, uint16_t queue_id, int enable)
>>>              if (enable) {
>>>                  dev->tx_q[qid].map = qid;
> 
> Here flushing required too because we're possibly enabling previously remapped queue.
> 
>>>              } else {
>>> +                /* If the queue is disabled in the guest, the corresponding qid
>>> +                 * map shall be set to OVS_VHOST_QUEUE_DISABLED(-2).
>>> +                 *
>>> +                 * The packets that were queued in 'qid' could be potentially
>>> +                 * stuck and needs to be dropped.
>>> +                 *
>>> +                 * XXX: The queues may be already disabled in the guest so
>>> +                 * flush function in this case only helps in updating stats
>>> +                 * and freeing memory.
>>> +                 */
>>> +                netdev_dpdk_vhost_txq_flush(&dev->up, qid, 0);
>>>                  dev->tx_q[qid].map = OVS_VHOST_QUEUE_DISABLED;
>>>              }
>>>              netdev_dpdk_remap_txqs(dev);
> 
> 'netdev_dpdk_remap_txqs()', actually, is able to change mapping for all the
> disabled in guest queues. So, we need to flush all of them while remapping
> somewhere inside the function.
> One other thing is that there is a race window between flush and
> mapping update where another process able to enqueue more packets in just
> flushed queue. The order of operations should be changed, or both of them
> should be done under the same tx_lock. I think, it's required to make
> tx_q[].map field atomic to fix the race condition, because send function
> takes the 'map' and then locks the corresponding queue. It wasn't an issue
> before, because packets in case of race was just dropped on attempt to send
> to disabled queue, but with this patch applied they will be enqueued to the
> intermediate queue and stuck there.

Making 'map' atomic will not help. To solve the race we should make
'reading the map + enqueue' an atomic operation protected by a spinlock.
Like this:

vhost_send:
----------------------------------------------------------------
    qid = qid % netdev->n_txq;
    rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
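    /* Holding tx_q[qid].tx_lock keeps the mapping stable: the remap path
     * below takes the same lock before changing 'map'. */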

    mapped_qid = dev->tx_q[qid].map;

    if (qid != mapped_qid) {
        rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
    }

    tx_enqueue(mapped_qid, pkts, cnt);

    if (qid != mapped_qid) {
        rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
    }

    rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
----------------------------------------------------------------

txq remapping inside 'netdev_dpdk_remap_txqs()' or 'vring_state_changed()':
----------------------------------------------------------------
    qid - queue we need to remap.
    new_qid - queue we need to remap to.

    rte_spinlock_lock(&dev->tx_q[qid].tx_lock);

    mapped_qid = dev->tx_q[qid].map;
    if (qid != mapped_qid) {
        rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
    }

    tx_flush(mapped_qid)
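    /* tx_q[qid].tx_lock is held across the flush and the map update below,
     * so nothing sent via 'qid' can land in the old queue in between. */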

    if (qid != mapped_qid) {
        rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
    }

    dev->tx_q[qid].map = new_qid;

    rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
----------------------------------------------------------------

The above scheme should work without races, but it looks kind of ugly and
requires taking an additional spinlock on each send.

P.S. Sorry for talking to myself. I just wanted to share my thoughts.

> 
>>> @@ -3471,7 +3574,8 @@ static const struct netdev_class dpdk_vhost_class =
>>>          NULL,
>>>          netdev_dpdk_vhost_reconfigure,
>>>          netdev_dpdk_vhost_rxq_recv,
>>> -        NULL);
>>> +        netdev_dpdk_vhost_txq_flush);
>>> +
>>>  static const struct netdev_class dpdk_vhost_client_class =
>>>      NETDEV_DPDK_CLASS(
>>>          "dpdkvhostuserclient",
>>> @@ -3487,7 +3591,7 @@ static const struct netdev_class dpdk_vhost_client_class =
>>>          NULL,
>>>          netdev_dpdk_vhost_client_reconfigure,
>>>          netdev_dpdk_vhost_rxq_recv,
>>> -        NULL);
>>> +        netdev_dpdk_vhost_txq_flush);
>>>  
>>>  void
>>>  netdev_dpdk_register(void)
>>> -- 
>>> 2.4.11
Bodireddy, Bhanuprakash Aug. 9, 2017, 12:35 p.m. UTC | #4
>>
>> +static int
>> +netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid) {
>> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>> +    struct rte_mbuf **cur_pkts = (struct rte_mbuf
>> +**)txq->vhost_burst_pkts;
>> +
>> +    int tx_vid = netdev_dpdk_get_vid(dev);
>> +    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
>> +    uint32_t sent = 0;
>> +    uint32_t retries = 0;
>> +    uint32_t sum, total_pkts;
>> +
>> +    total_pkts = sum = txq->vhost_pkt_cnt;
>> +    do {
>> +        uint32_t ret;
>> +        ret = rte_vhost_enqueue_burst(tx_vid, tx_qid, &cur_pkts[sent],
>sum);
>> +        if (OVS_UNLIKELY(!ret)) {
>> +            /* No packets enqueued - do not retry. */
>> +            break;
>> +        } else {
>> +            /* Packet have been sent. */
>> +            sent += ret;
>> +
>> +            /* 'sum' packet have to be retransmitted. */
>> +            sum -= ret;
>> +        }
>> +    } while (sum && (retries++ < VHOST_ENQ_RETRY_NUM));
>> +
>> +    for (int i = 0; i < total_pkts; i++) {
>> +        dp_packet_delete(txq->vhost_burst_pkts[i]);
>> +    }
>> +
>> +    /* Reset pkt count. */
>> +    txq->vhost_pkt_cnt = 0;
>> +
>> +    /* 'sum' refers to packets dropped. */
>> +    return sum;
>> +}
>> +
>> +/* Flush the txq if there are any packets available. */ static int
>> +netdev_dpdk_vhost_txq_flush(struct netdev *netdev, int qid,
>> +                            bool concurrent_txq OVS_UNUSED) {
>> +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
>> +    struct dpdk_tx_queue *txq;
>> +
>> +    qid = dev->tx_q[qid % netdev->n_txq].map;
>> +
>> +    /* The qid may be disabled in the guest and has been set to
>> +     * OVS_VHOST_QUEUE_DISABLED.
>> +     */
>> +    if (OVS_UNLIKELY(qid < 0)) {
>> +        return 0;
>> +    }
>> +
>> +    txq = &dev->tx_q[qid];
>> +    /* Increment the drop count and free the memory. */
>> +    if (OVS_UNLIKELY(!is_vhost_running(dev) ||
>> +                     !(dev->flags & NETDEV_UP))) {
>> +
>> +        if (txq->vhost_pkt_cnt) {
>> +            rte_spinlock_lock(&dev->stats_lock);
>> +            dev->stats.tx_dropped+= txq->vhost_pkt_cnt;
>> +            rte_spinlock_unlock(&dev->stats_lock);
>> +
>> +            for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
>> +                dp_packet_delete(txq->vhost_burst_pkts[i]);
>
>Spinlock (tx_lock) must be held here to avoid queue and mempool breakage.

I think you are right. tx_lock should be acquired when freeing the packets.

---------------------------------------------------------------------------
    rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
    for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
        dp_packet_delete(txq->vhost_burst_pkts[i]);
    }
    rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);

- Bhanuprakash
Bodireddy, Bhanuprakash Aug. 9, 2017, 12:46 p.m. UTC | #5
>enable)
>>>>              if (enable) {
>>>>                  dev->tx_q[qid].map = qid;
>>
>> Here flushing required too because we're possibly enabling previously
>remapped queue.
>>
>>>>              } else {
>>>> +                /* If the queue is disabled in the guest, the corresponding qid
>>>> +                 * map shall be set to OVS_VHOST_QUEUE_DISABLED(-2).
>>>> +                 *
>>>> +                 * The packets that were queued in 'qid' could be potentially
>>>> +                 * stuck and needs to be dropped.
>>>> +                 *
>>>> +                 * XXX: The queues may be already disabled in the guest so
>>>> +                 * flush function in this case only helps in updating stats
>>>> +                 * and freeing memory.
>>>> +                 */
>>>> +                netdev_dpdk_vhost_txq_flush(&dev->up, qid, 0);
>>>>                  dev->tx_q[qid].map = OVS_VHOST_QUEUE_DISABLED;
>>>>              }
>>>>              netdev_dpdk_remap_txqs(dev);
>>
>> 'netdev_dpdk_remap_txqs()', actually, is able to change mapping for
>> all the disabled in guest queues. So, we need to flush all of them
>> while remapping somewhere inside the function.
>> One other thing is that there is a race window between flush and
>> mapping update where another process able to enqueue more packets in
>> just flushed queue. The order of operations should be changed, or both
>> of them should be done under the same tx_lock. I think, it's required
>> to make tx_q[].map field atomic to fix the race condition, because
>> send function takes the 'map' and then locks the corresponding queue.
>> It wasn't an issue before, because packets in case of race was just
>> dropped on attempt to send to disabled queue, but with this patch
>> applied they will be enqueued to the intermediate queue and stuck there.
>
>Making 'map' atomic will not help. To solve the race we should make 'reading
>of map + enqueue' an atomic operation by some spinlock.
>Like this:
>
>vhost_send:
>----------------------------------------------------------------
>    qid = qid % netdev->n_txq;
>    rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
>
>    mapped_qid = dev->tx_q[qid].map;
>
>    if (qid != mapped_qid) {
>        rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
>    }
>
>    tx_enqueue(mapped_qid, pkts, cnt);
>
>    if (qid != mapped_qid) {
>        rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
>    }
>
>    rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
>----------------------------------------------------------------
>
>txq remapping inside 'netdev_dpdk_remap_txqs()' or
>'vring_state_changed()':
>----------------------------------------------------------------
>    qid - queue we need to remap.
>    new_qid - queue we need to remap to.
>
>    rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
>
>    mapped_qid = dev->tx_q[qid].map;
>    if (qid != mapped_qid) {
>        rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
>    }
>
>    tx_flush(mapped_qid)
>
>    if (qid != mapped_qid) {
>        rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
>    }
>
>    dev->tx_q[qid].map = new_qid;
>
>    rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
>----------------------------------------------------------------
>
>Above schema should work without races, but looks kind of ugly and requires
>taking of additional spinlock on each send.
>
>P.S. Sorry for talking with myself. Just want to share my thoughts.

Hi Ilya,

Thanks for reviewing the patches and providing inputs.
I went through your comments for this patch (2/5) and agree with the suggestions.
Meanwhile, I will go through the changes above and get back to you.

Bhanuprakash.
Ilya Maximets Aug. 9, 2017, 12:49 p.m. UTC | #6
On 09.08.2017 15:35, Bodireddy, Bhanuprakash wrote:
>>>
>>> +static int
>>> +netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid) {
>>> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>>> +    struct rte_mbuf **cur_pkts = (struct rte_mbuf
>>> +**)txq->vhost_burst_pkts;
>>> +
>>> +    int tx_vid = netdev_dpdk_get_vid(dev);
>>> +    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
>>> +    uint32_t sent = 0;
>>> +    uint32_t retries = 0;
>>> +    uint32_t sum, total_pkts;
>>> +
>>> +    total_pkts = sum = txq->vhost_pkt_cnt;
>>> +    do {
>>> +        uint32_t ret;
>>> +        ret = rte_vhost_enqueue_burst(tx_vid, tx_qid, &cur_pkts[sent],
>> sum);
>>> +        if (OVS_UNLIKELY(!ret)) {
>>> +            /* No packets enqueued - do not retry. */
>>> +            break;
>>> +        } else {
>>> +            /* Packet have been sent. */
>>> +            sent += ret;
>>> +
>>> +            /* 'sum' packet have to be retransmitted. */
>>> +            sum -= ret;
>>> +        }
>>> +    } while (sum && (retries++ < VHOST_ENQ_RETRY_NUM));
>>> +
>>> +    for (int i = 0; i < total_pkts; i++) {
>>> +        dp_packet_delete(txq->vhost_burst_pkts[i]);
>>> +    }
>>> +
>>> +    /* Reset pkt count. */
>>> +    txq->vhost_pkt_cnt = 0;
>>> +
>>> +    /* 'sum' refers to packets dropped. */
>>> +    return sum;
>>> +}
>>> +
>>> +/* Flush the txq if there are any packets available. */ static int
>>> +netdev_dpdk_vhost_txq_flush(struct netdev *netdev, int qid,
>>> +                            bool concurrent_txq OVS_UNUSED) {
>>> +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
>>> +    struct dpdk_tx_queue *txq;
>>> +
>>> +    qid = dev->tx_q[qid % netdev->n_txq].map;
>>> +
>>> +    /* The qid may be disabled in the guest and has been set to
>>> +     * OVS_VHOST_QUEUE_DISABLED.
>>> +     */
>>> +    if (OVS_UNLIKELY(qid < 0)) {
>>> +        return 0;
>>> +    }
>>> +
>>> +    txq = &dev->tx_q[qid];
>>> +    /* Increment the drop count and free the memory. */
>>> +    if (OVS_UNLIKELY(!is_vhost_running(dev) ||
>>> +                     !(dev->flags & NETDEV_UP))) {
>>> +
>>> +        if (txq->vhost_pkt_cnt) {
>>> +            rte_spinlock_lock(&dev->stats_lock);
>>> +            dev->stats.tx_dropped+= txq->vhost_pkt_cnt;
>>> +            rte_spinlock_unlock(&dev->stats_lock);
>>> +
>>> +            for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
>>> +                dp_packet_delete(txq->vhost_burst_pkts[i]);
>>
>> Spinlock (tx_lock) must be held here to avoid queue and mempool breakage.
> 
> I think you are right. tx_lock might be acquired for freeing the packets.

I think that reads and updates of 'vhost_pkt_cnt' should also be protected to avoid races.
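
For example (just a sketch, keeping the counter update in the same critical
section as the frees):

    rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
    for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
        dp_packet_delete(txq->vhost_burst_pkts[i]);
    }
    txq->vhost_pkt_cnt = 0;
    rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);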

> ---------------------------------------------------------------------------
>     rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
>     for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
>              dp_packet_delete(txq->vhost_burst_pkts[i]);
>     }
>     rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
> 
> - Bhanuprakash
>
Bodireddy, Bhanuprakash Aug. 10, 2017, 6:52 p.m. UTC | #7
>>
>>>>              } else {
>>>> +                /* If the queue is disabled in the guest, the corresponding qid
>>>> +                 * map shall be set to OVS_VHOST_QUEUE_DISABLED(-2).
>>>> +                 *
>>>> +                 * The packets that were queued in 'qid' could be potentially
>>>> +                 * stuck and needs to be dropped.
>>>> +                 *
>>>> +                 * XXX: The queues may be already disabled in the guest so
>>>> +                 * flush function in this case only helps in updating stats
>>>> +                 * and freeing memory.
>>>> +                 */
>>>> +                netdev_dpdk_vhost_txq_flush(&dev->up, qid, 0);
>>>>                  dev->tx_q[qid].map = OVS_VHOST_QUEUE_DISABLED;
>>>>              }
>>>>              netdev_dpdk_remap_txqs(dev);
>>
>> 'netdev_dpdk_remap_txqs()', actually, is able to change mapping for
>> all the disabled in guest queues. So, we need to flush all of them
>> while remapping somewhere inside the function.
>> One other thing is that there is a race window between flush and
>> mapping update where another process able to enqueue more packets in
>> just flushed queue. The order of operations should be changed, or both
>> of them should be done under the same tx_lock. I think, it's required
>> to make tx_q[].map field atomic to fix the race condition, because
>> send function takes the 'map' and then locks the corresponding queue.
>> It wasn't an issue before, because packets in case of race was just
>> dropped on attempt to send to disabled queue, but with this patch
>> applied they will be enqueued to the intermediate queue and stuck there.
>
>Making 'map' atomic will not help. To solve the race we should make 'reading
>of map + enqueue' an atomic operation by some spinlock.
>Like this:
>
>vhost_send:
>----------------------------------------------------------------
>    qid = qid % netdev->n_txq;
>    rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
>
>    mapped_qid = dev->tx_q[qid].map;
>
>    if (qid != mapped_qid) {
>        rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
>    }
>
>    tx_enqueue(mapped_qid, pkts, cnt);
>
>    if (qid != mapped_qid) {
>        rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
>    }
>
>    rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
>----------------------------------------------------------------
>
>txq remapping inside 'netdev_dpdk_remap_txqs()' or
>'vring_state_changed()':
>----------------------------------------------------------------
>    qid - queue we need to remap.
>    new_qid - queue we need to remap to.
>
>    rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
>
>    mapped_qid = dev->tx_q[qid].map;
>    if (qid != mapped_qid) {
>        rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
>    }
>
>    tx_flush(mapped_qid)
>
>    if (qid != mapped_qid) {
>        rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
>    }
>
>    dev->tx_q[qid].map = new_qid;
>
>    rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
>----------------------------------------------------------------
>
>Above schema should work without races, but looks kind of ugly and requires
>taking of additional spinlock on each send.
>
>P.S. Sorry for talking with myself. Just want to share my thoughts.

Hi Ilya,

Can you please review the changes below, based on what you suggested above?
As the problem only happens when the queues are enabled/disabled in the guest,
I did some preliminary testing with the changes below by sending traffic into the VM
while enabling and disabling the queues inside the guest at the same time.

Vhost_send()
---------------------------------------------------------------------------------
    qid = qid % netdev->n_txq;

    /* Acquire tx_lock before reading tx_q[qid].map and enqueueing packets.
     * tx_q[].map gets updated in vring_state_changed() when vrings are
     * enabled/disabled in the guest. */
    rte_spinlock_lock(&dev->tx_q[qid].tx_lock);

    mapped_qid = dev->tx_q[qid].map;
    if (OVS_UNLIKELY(qid != mapped_qid)) {
        rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
    }

    if (OVS_UNLIKELY(!is_vhost_running(dev) || mapped_qid < 0
                     || !(dev->flags & NETDEV_UP))) {
        rte_spinlock_lock(&dev->stats_lock);
        dev->stats.tx_dropped+= cnt;
        rte_spinlock_unlock(&dev->stats_lock);

        for (i = 0; i < total_pkts; i++) {
            dp_packet_delete(pkts[i]);
        }

        if (OVS_UNLIKELY(qid != mapped_qid)) {
            rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
        }
        rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);

        return;
    }

    cnt = netdev_dpdk_filter_packet_len(dev, cur_pkts, cnt);
    /* Check if QoS has been configured for the netdev. */
    cnt = netdev_dpdk_qos_run(dev, cur_pkts, cnt);
    dropped = total_pkts - cnt;

    int idx = 0;
    struct dpdk_tx_queue *txq = &dev->tx_q[mapped_qid];
    while (idx < cnt) {
        txq->vhost_burst_pkts[txq->vhost_pkt_cnt++] = pkts[idx++];

        if (txq->vhost_pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) {
            dropped += netdev_dpdk_vhost_tx_burst(dev, mapped_qid);
        }
    }

    if (OVS_UNLIKELY(qid != mapped_qid)) {
        rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
    }

    rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);

    rte_spinlock_lock(&dev->stats_lock);
    netdev_dpdk_vhost_update_tx_counters(&dev->stats, pkts, total_pkts,
                                         dropped);
    rte_spinlock_unlock(&dev->stats_lock);
-------------------------------------------------------------------------------------------------------


Vring_state_changed().

As tx_q[].map should be updated atomically, and it is updated both in vring_state_changed()
and netdev_dpdk_remap_txqs(), I made the updates in vring_state_changed().

------------------------------------------------------------------------------------------------------
LIST_FOR_EACH (dev, list_node, &dpdk_list) {
        ovs_mutex_lock(&dev->mutex);
        if (strncmp(ifname, dev->vhost_id, IF_NAME_SZ) == 0) {
            int mapped_qid;

            /* Acquire tx_lock as the dpdk_vhost_send() function will
             * read the tx_q[qid].map and lock the corresponding queue. */
            rte_spinlock_lock(&dev->tx_q[qid].tx_lock);

            mapped_qid = dev->tx_q[qid].map;
            if (OVS_UNLIKELY(qid != mapped_qid)) {
                rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
            }

            netdev_dpdk_vhost_txq_flush(&dev->up, mapped_qid, 0);

            if (enable) {
                dev->tx_q[qid].map = qid;
            } else {
                dev->tx_q[qid].map = OVS_VHOST_QUEUE_DISABLED;
            }

            netdev_dpdk_remap_txqs(dev);

            if (OVS_UNLIKELY(qid != mapped_qid)) {
                rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
            }
            rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);

            exists = true;
            ovs_mutex_unlock(&dev->mutex);
            break;
        }
        ovs_mutex_unlock(&dev->mutex);
    }
----------------------------------------------------------------------------------------------

Regards,
Bhanuprakash.
Bodireddy, Bhanuprakash Aug. 11, 2017, 1:11 p.m. UTC | #8
>On 09.08.2017 15:35, Bodireddy, Bhanuprakash wrote:
>>>>
>>>> +static int
>>>> +netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid) {
>>>> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>>>> +    struct rte_mbuf **cur_pkts = (struct rte_mbuf
>>>> +**)txq->vhost_burst_pkts;
>>>> +
>>>> +    int tx_vid = netdev_dpdk_get_vid(dev);
>>>> +    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
>>>> +    uint32_t sent = 0;
>>>> +    uint32_t retries = 0;
>>>> +    uint32_t sum, total_pkts;
>>>> +
>>>> +    total_pkts = sum = txq->vhost_pkt_cnt;
>>>> +    do {
>>>> +        uint32_t ret;
>>>> +        ret = rte_vhost_enqueue_burst(tx_vid, tx_qid,
>>>> + &cur_pkts[sent],
>>> sum);
>>>> +        if (OVS_UNLIKELY(!ret)) {
>>>> +            /* No packets enqueued - do not retry. */
>>>> +            break;
>>>> +        } else {
>>>> +            /* Packet have been sent. */
>>>> +            sent += ret;
>>>> +
>>>> +            /* 'sum' packet have to be retransmitted. */
>>>> +            sum -= ret;
>>>> +        }
>>>> +    } while (sum && (retries++ < VHOST_ENQ_RETRY_NUM));
>>>> +
>>>> +    for (int i = 0; i < total_pkts; i++) {
>>>> +        dp_packet_delete(txq->vhost_burst_pkts[i]);
>>>> +    }
>>>> +
>>>> +    /* Reset pkt count. */
>>>> +    txq->vhost_pkt_cnt = 0;
>>>> +
>>>> +    /* 'sum' refers to packets dropped. */
>>>> +    return sum;
>>>> +}
>>>> +
>>>> +/* Flush the txq if there are any packets available. */ static int
>>>> +netdev_dpdk_vhost_txq_flush(struct netdev *netdev, int qid,
>>>> +                            bool concurrent_txq OVS_UNUSED) {
>>>> +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
>>>> +    struct dpdk_tx_queue *txq;
>>>> +
>>>> +    qid = dev->tx_q[qid % netdev->n_txq].map;
>>>> +
>>>> +    /* The qid may be disabled in the guest and has been set to
>>>> +     * OVS_VHOST_QUEUE_DISABLED.
>>>> +     */
>>>> +    if (OVS_UNLIKELY(qid < 0)) {
>>>> +        return 0;
>>>> +    }
>>>> +
>>>> +    txq = &dev->tx_q[qid];
>>>> +    /* Increment the drop count and free the memory. */
>>>> +    if (OVS_UNLIKELY(!is_vhost_running(dev) ||
>>>> +                     !(dev->flags & NETDEV_UP))) {
>>>> +
>>>> +        if (txq->vhost_pkt_cnt) {
>>>> +            rte_spinlock_lock(&dev->stats_lock);
>>>> +            dev->stats.tx_dropped+= txq->vhost_pkt_cnt;
>>>> +            rte_spinlock_unlock(&dev->stats_lock);
>>>> +
>>>> +            for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
>>>> +                dp_packet_delete(txq->vhost_burst_pkts[i]);
>>>
>>> Spinlock (tx_lock) must be held here to avoid queue and mempool
>breakage.
>>
>> I think you are right. tx_lock might be acquired for freeing the packets.
>
>I think that 'vhost_pkt_cnt' reads and updates also should be protected to
>avoid races.

From the discussion in the thread https://mail.openvswitch.org/pipermail/ovs-dev/2017-August/337133.html,
we are going to acquire tx_lock for updating the map and flushing the queue inside vring_state_changed().

That triggers a deadlock in the flushing function, as the same lock would be acquired again inside netdev_dpdk_vhost_txq_flush().
The same problem applies to freeing the memory and protecting the updates to vhost_pkt_cnt.

    if (OVS_LIKELY(txq->vhost_pkt_cnt)) {
        rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
        netdev_dpdk_vhost_tx_burst(dev, qid);
        rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
    }

As the problem is triggered when the guest queues are enabled/disabled, with a small race window where packets can get enqueued into the queue just after the flush and before the map value is updated in the callback (vring_state_changed()), how about this?

Technically, as the queues are disabled, there is no point in flushing the packets, so let's free the packets and reset txq->vhost_pkt_cnt in vring_state_changed() itself instead of calling flush().

vring_state_changed().
------------------------------------------------------
rte_spinlock_lock(&dev->tx_q[qid].tx_lock);

mapped_qid = dev->tx_q[qid].map;
if (OVS_UNLIKELY(qid != mapped_qid)) {
    rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
}

if (enable) {
    dev->tx_q[qid].map = qid;
} else {
    struct dpdk_tx_queue *txq = &dev->tx_q[qid];

    if (txq->vhost_pkt_cnt) {
        rte_spinlock_lock(&dev->stats_lock);
        dev->stats.tx_dropped += txq->vhost_pkt_cnt;
        rte_spinlock_unlock(&dev->stats_lock);

        for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
            dp_packet_delete(txq->vhost_burst_pkts[i]);
        }
        txq->vhost_pkt_cnt = 0;
    }

    dev->tx_q[qid].map = OVS_VHOST_QUEUE_DISABLED;
}
-------------------------------------------------------------------------

Regards,
Bhanuprakash.

>
>> ---------------------------------------------------------------------------
>>     rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
>>     for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
>>              dp_packet_delete(txq->vhost_burst_pkts[i]);
>>     }
>>     rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
>>
>> - Bhanuprakash
>>
Ilya Maximets Aug. 11, 2017, 1:32 p.m. UTC | #9
On 10.08.2017 21:52, Bodireddy, Bhanuprakash wrote:
>>>
>>>>>              } else {
>>>>> +                /* If the queue is disabled in the guest, the corresponding qid
>>>>> +                 * map shall be set to OVS_VHOST_QUEUE_DISABLED(-2).
>>>>> +                 *
>>>>> +                 * The packets that were queued in 'qid' could be potentially
>>>>> +                 * stuck and needs to be dropped.
>>>>> +                 *
>>>>> +                 * XXX: The queues may be already disabled in the guest so
>>>>> +                 * flush function in this case only helps in updating stats
>>>>> +                 * and freeing memory.
>>>>> +                 */
>>>>> +                netdev_dpdk_vhost_txq_flush(&dev->up, qid, 0);
>>>>>                  dev->tx_q[qid].map = OVS_VHOST_QUEUE_DISABLED;
>>>>>              }
>>>>>              netdev_dpdk_remap_txqs(dev);
>>>
>>> 'netdev_dpdk_remap_txqs()', actually, is able to change mapping for
>>> all the disabled in guest queues. So, we need to flush all of them
>>> while remapping somewhere inside the function.
>>> One other thing is that there is a race window between flush and
>>> mapping update where another process able to enqueue more packets in
>>> just flushed queue. The order of operations should be changed, or both
>>> of them should be done under the same tx_lock. I think, it's required
>>> to make tx_q[].map field atomic to fix the race condition, because
>>> send function takes the 'map' and then locks the corresponding queue.
>>> It wasn't an issue before, because packets in case of race was just
>>> dropped on attempt to send to disabled queue, but with this patch
>>> applied they will be enqueued to the intermediate queue and stuck there.
>>
>> Making 'map' atomic will not help. To solve the race we should make 'reading
>> of map + enqueue' an atomic operation by some spinlock.
>> Like this:
>>
>> vhost_send:
>> ----------------------------------------------------------------
>>    qid = qid % netdev->n_txq;
>>    rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
>>
>>    mapped_qid = dev->tx_q[qid].map;
>>
>>    if (qid != mapped_qid) {
>>        rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
>>    }
>>
>>    tx_enqueue(mapped_qid, pkts, cnt);
>>
>>    if (qid != mapped_qid) {
>>        rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
>>    }
>>
>>    rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
>> ----------------------------------------------------------------
>>
>> txq remapping inside 'netdev_dpdk_remap_txqs()' or
>> 'vring_state_changed()':
>> ----------------------------------------------------------------
>>    qid - queue we need to remap.
>>    new_qid - queue we need to remap to.
>>
>>    rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
>>
>>    mapped_qid = dev->tx_q[qid].map;
>>    if (qid != mapped_qid) {
>>        rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
>>    }
>>
>>    tx_flush(mapped_qid)
>>
>>    if (qid != mapped_qid) {
>>        rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
>>    }
>>
>>    dev->tx_q[qid].map = new_qid;
>>
>>    rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
>> ----------------------------------------------------------------
>>
>> Above schema should work without races, but looks kind of ugly and requires
>> taking of additional spinlock on each send.
>>
>> P.S. Sorry for talking with myself. Just want to share my thoughts.
> 
> Hi Ilya,
> 
> Can you please review the below changes based on what you suggested above. 
> As the problem only happens when the queues are enabled/disabled in the guest, 
> I did some  preliminary testing with the below changes by sending some traffic in to the VM
> and enabling and disabling the queues inside the guest the same time. 
> 
> Vhost_send()
> ---------------------------------------------------------------------------------
>     qid = qid % netdev->n_txq;
> 
>     /* Acquire tx_lock before reading tx_q[qid].map and enqueueing packets.
>      * tx_q[].map gets updated in vring_state_changed() when vrings are
>      * enabled/disabled in the guest. */
>     rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
> 
>     mapped_qid = dev->tx_q[qid].map;
>     if (OVS_UNLIKELY(qid != mapped_qid)) {
>         rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
>     }
> 
>     if (OVS_UNLIKELY(!is_vhost_running(dev) || mapped_qid < 0
>                      || !(dev->flags & NETDEV_UP))) {
>         rte_spinlock_lock(&dev->stats_lock);
>         dev->stats.tx_dropped+= cnt;
>         rte_spinlock_unlock(&dev->stats_lock);
> 
>         for (i = 0; i < total_pkts; i++) {
>             dp_packet_delete(pkts[i]);
>         }
> 
>         if (OVS_UNLIKELY(qid != mapped_qid)) {
>             rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
>         }
>         rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
> 
>         return;
>     }
> 
>     cnt = netdev_dpdk_filter_packet_len(dev, cur_pkts, cnt);
>     /* Check has QoS has been configured for the netdev */
>     cnt = netdev_dpdk_qos_run(dev, cur_pkts, cnt);
>     dropped = total_pkts - cnt;
> 
>     int idx = 0;
>     struct dpdk_tx_queue *txq = &dev->tx_q[mapped_qid];
>     while (idx < cnt) {
>         txq->vhost_burst_pkts[txq->vhost_pkt_cnt++] = pkts[idx++];
> 
>         if (txq->vhost_pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) {
>             dropped += netdev_dpdk_vhost_tx_burst(dev, mapped_qid);
>         }
>     }
> 
>     if (OVS_UNLIKELY(qid != mapped_qid)) {
>         rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
>     }
> 
>     rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
> 
>     rte_spinlock_lock(&dev->stats_lock);
>     netdev_dpdk_vhost_update_tx_counters(&dev->stats, pkts, total_pkts,
>                                          dropped);
>     rte_spinlock_unlock(&dev->stats_lock);
> -------------------------------------------------------------------------------------------------------
> 
> 
> Vring_state_changed().
> 
> As t_q[].map should be atomic and is updated both in vring_state_changed and netdev_dpdk_remap_txqs(),
>  I made updates to vring_state_changed(). 
> 
> ------------------------------------------------------------------------------------------------------
> LIST_FOR_EACH (dev, list_node, &dpdk_list) {
>         ovs_mutex_lock(&dev->mutex);
>         if (strncmp(ifname, dev->vhost_id, IF_NAME_SZ) == 0) {
>             int mapped_qid;
> 
>             /* Acquire tx_lock as the dpdk_vhost_send() function will
>              * read the tx_q[qid].map and lock the corresponding queue. */
>             rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
> 
>             mapped_qid = dev->tx_q[qid].map;
>             if (OVS_UNLIKELY(qid != mapped_qid)) {
>                 rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
>             }
> 
>             netdev_dpdk_vhost_txq_flush(&dev->up, mapped_qid, 0);
> 
>             if (enable) {
>                 dev->tx_q[qid].map = qid;
>             } else {
>                 dev->tx_q[qid].map = OVS_VHOST_QUEUE_DISABLED;
>             }
> 
>             netdev_dpdk_remap_txqs(dev);

At this point netdev_dpdk_remap_txqs() is able to remap other disabled queues,
and we need to flush them too, I guess.
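
Something along these lines inside the remap loop, perhaps (just a sketch;
'i' is the queue being remapped and 'new_qid' stands for whatever mapping the
function computes for it):

    rte_spinlock_lock(&dev->tx_q[i].tx_lock);

    int old_qid = dev->tx_q[i].map;
    if (old_qid >= 0 && old_qid != new_qid) {
        rte_spinlock_lock(&dev->tx_q[old_qid].tx_lock);
        /* Flush packets buffered on the old target before the mapping
         * changes, so they don't get stuck there. */
        netdev_dpdk_vhost_tx_burst(dev, old_qid);
        rte_spinlock_unlock(&dev->tx_q[old_qid].tx_lock);
    }
    dev->tx_q[i].map = new_qid;

    rte_spinlock_unlock(&dev->tx_q[i].tx_lock);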

> 
>             if (OVS_UNLIKELY(qid != mapped_qid)) {
>                 rte_spinlock_unlock(&dev->tx_q[mapped_qid].tx_lock);
>             }
>             rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
> 
>             exists = true;
>             ovs_mutex_unlock(&dev->mutex);
>             break;
>         }
>         ovs_mutex_unlock(&dev->mutex);
>     }
> ----------------------------------------------------------------------------------------------
> 
> Regards,
> Bhanuprakash.
>
Ilya Maximets Aug. 11, 2017, 1:34 p.m. UTC | #10
On 11.08.2017 16:11, Bodireddy, Bhanuprakash wrote:
>> On 09.08.2017 15:35, Bodireddy, Bhanuprakash wrote:
>>>>>
>>>>> +static int
>>>>> +netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid) {
>>>>> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>>>>> +    struct rte_mbuf **cur_pkts = (struct rte_mbuf
>>>>> +**)txq->vhost_burst_pkts;
>>>>> +
>>>>> +    int tx_vid = netdev_dpdk_get_vid(dev);
>>>>> +    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
>>>>> +    uint32_t sent = 0;
>>>>> +    uint32_t retries = 0;
>>>>> +    uint32_t sum, total_pkts;
>>>>> +
>>>>> +    total_pkts = sum = txq->vhost_pkt_cnt;
>>>>> +    do {
>>>>> +        uint32_t ret;
>>>>> +        ret = rte_vhost_enqueue_burst(tx_vid, tx_qid,
>>>>> + &cur_pkts[sent],
>>>> sum);
>>>>> +        if (OVS_UNLIKELY(!ret)) {
>>>>> +            /* No packets enqueued - do not retry. */
>>>>> +            break;
>>>>> +        } else {
>>>>> +            /* Packet have been sent. */
>>>>> +            sent += ret;
>>>>> +
>>>>> +            /* 'sum' packet have to be retransmitted. */
>>>>> +            sum -= ret;
>>>>> +        }
>>>>> +    } while (sum && (retries++ < VHOST_ENQ_RETRY_NUM));
>>>>> +
>>>>> +    for (int i = 0; i < total_pkts; i++) {
>>>>> +        dp_packet_delete(txq->vhost_burst_pkts[i]);
>>>>> +    }
>>>>> +
>>>>> +    /* Reset pkt count. */
>>>>> +    txq->vhost_pkt_cnt = 0;
>>>>> +
>>>>> +    /* 'sum' refers to packets dropped. */
>>>>> +    return sum;
>>>>> +}
>>>>> +
>>>>> +/* Flush the txq if there are any packets available. */ static int
>>>>> +netdev_dpdk_vhost_txq_flush(struct netdev *netdev, int qid,
>>>>> +                            bool concurrent_txq OVS_UNUSED) {
>>>>> +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
>>>>> +    struct dpdk_tx_queue *txq;
>>>>> +
>>>>> +    qid = dev->tx_q[qid % netdev->n_txq].map;
>>>>> +
>>>>> +    /* The qid may be disabled in the guest and has been set to
>>>>> +     * OVS_VHOST_QUEUE_DISABLED.
>>>>> +     */
>>>>> +    if (OVS_UNLIKELY(qid < 0)) {
>>>>> +        return 0;
>>>>> +    }
>>>>> +
>>>>> +    txq = &dev->tx_q[qid];
>>>>> +    /* Increment the drop count and free the memory. */
>>>>> +    if (OVS_UNLIKELY(!is_vhost_running(dev) ||
>>>>> +                     !(dev->flags & NETDEV_UP))) {
>>>>> +
>>>>> +        if (txq->vhost_pkt_cnt) {
>>>>> +            rte_spinlock_lock(&dev->stats_lock);
>>>>> +            dev->stats.tx_dropped+= txq->vhost_pkt_cnt;
>>>>> +            rte_spinlock_unlock(&dev->stats_lock);
>>>>> +
>>>>> +            for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
>>>>> +                dp_packet_delete(txq->vhost_burst_pkts[i]);
>>>>
>>>> Spinlock (tx_lock) must be held here to avoid queue and mempool
>> breakage.
>>>
>>> I think you are right. tx_lock might be acquired for freeing the packets.
>>
>> I think that 'vhost_pkt_cnt' reads and updates should also be protected to
>> avoid races.
> 
> From the discussion in the thread https://mail.openvswitch.org/pipermail/ovs-dev/2017-August/337133.html,
> we are going to acquire tx_lock for updating the map and flushing the queue inside vring_state_changed().
> 
> That triggers a deadlock in the flush path, as netdev_dpdk_vhost_txq_flush() acquires the same lock again.
> The same problem applies to freeing the memory and protecting the updates to vhost_pkt_cnt.
> 
>     if (OVS_LIKELY(txq->vhost_pkt_cnt)) {
>         rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
>         netdev_dpdk_vhost_tx_burst(dev, qid);
>         rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
>     }
> 
> As the problem is triggered when the guest queues are enabled/disabled, with a small race window where packets can get enqueued into the queue just after the flush and before the map value is updated in the callback function (vring_state_changed()), how about this?
> 
> Technically, as the queues are disabled, there is no point in flushing the packets, so let's free the packets and reset txq->vhost_pkt_cnt in vring_state_changed() itself instead of calling flush().

Technically, the enabling case should also be handled, because while enabling
we're remapping the queue and, in some specific cases, I guess, the old
queue may no longer be used by the threads after remapping; one possible way
to handle that is sketched below.
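
A rough, untested sketch of handling the enable branch too, mirroring the
disable branch in the snippet below; draining via netdev_dpdk_vhost_tx_burst()
while both tx_locks are held, and leaving out the tx_dropped accounting, are
assumptions on my side, not part of the posted patch:

    if (enable) {
        struct dpdk_tx_queue *old_txq = &dev->tx_q[mapped_qid];

        /* Drain whatever is still buffered on the queue this qid used to
         * be mapped to, since no pmd thread may pick it up after the remap.
         * netdev_dpdk_vhost_tx_burst() frees the dp_packets after the
         * enqueue attempt and resets vhost_pkt_cnt. */
        if (qid != mapped_qid && old_txq->vhost_pkt_cnt) {
            netdev_dpdk_vhost_tx_burst(dev, mapped_qid);
        }
        dev->tx_q[qid].map = qid;
    }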

> 
> vring_state_changed().
> ------------------------------------------------------
> rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
> 
> mapped_qid = dev->tx_q[qid].map;
> if (OVS_UNLIKELY(qid != mapped_qid)) {
>     rte_spinlock_lock(&dev->tx_q[mapped_qid].tx_lock);
> }
> 
> if (enable) {
>     dev->tx_q[qid].map = qid;
> } else {
>     struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>     if (txq->vhost_pkt_cnt) {
>         rte_spinlock_lock(&dev->stats_lock);
>         dev->stats.tx_dropped += txq->vhost_pkt_cnt;
>         rte_spinlock_unlock(&dev->stats_lock);
> 
>         for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
>             dp_packet_delete(txq->vhost_burst_pkts[i]);
>         }
>         txq->vhost_pkt_cnt = 0;
>     }
> 
>     dev->tx_q[qid].map = OVS_VHOST_QUEUE_DISABLED;
> }
> -------------------------------------------------------------------------
> 
> Regards,
> Bhanuprakash.
> 
>>
>>> ---------------------------------------------------------------------------
>>>     rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
>>>     for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
>>>              dp_packet_delete(txq->vhost_burst_pkts[i]);
>>>     }
>>>     rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
>>>
>>> - Bhanuprakash
>>>
diff mbox

Patch

diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 50d6b29..d3892fe 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -327,12 +327,22 @@  struct dpdk_tx_queue {
                                     * pmd threads (see 'concurrent_txq'). */
     int map;                       /* Mapping of configured vhost-user queues
                                     * to enabled by guest. */
-    int dpdk_pkt_cnt;              /* Number of buffered packets waiting to
+    union {
+        int dpdk_pkt_cnt;          /* Number of buffered packets waiting to
                                       be sent on DPDK tx queue. */
-    struct rte_mbuf *dpdk_burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
+        int vhost_pkt_cnt;         /* Number of buffered packets waiting to
+                                      be sent on vhost port. */
+    };
+
+    union {
+        struct rte_mbuf *dpdk_burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
                                    /* Intermediate queue where packets can
                                     * be buffered to amortize the cost of MMIO
                                     * writes. */
+        struct dp_packet *vhost_burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
+                                   /* Intermediate queue where packets can
+                                    * be buffered for vhost ports. */
+    };
 };
 
 /* dpdk has no way to remove dpdk ring ethernet devices
@@ -1756,6 +1766,88 @@  netdev_dpdk_vhost_update_tx_counters(struct netdev_stats *stats,
     }
 }
 
+static int
+netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid)
+{
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
+    struct rte_mbuf **cur_pkts = (struct rte_mbuf **)txq->vhost_burst_pkts;
+
+    int tx_vid = netdev_dpdk_get_vid(dev);
+    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
+    uint32_t sent = 0;
+    uint32_t retries = 0;
+    uint32_t sum, total_pkts;
+
+    total_pkts = sum = txq->vhost_pkt_cnt;
+    do {
+        uint32_t ret;
+        ret = rte_vhost_enqueue_burst(tx_vid, tx_qid, &cur_pkts[sent], sum);
+        if (OVS_UNLIKELY(!ret)) {
+            /* No packets enqueued - do not retry. */
+            break;
+        } else {
+            /* Packets have been sent. */
+            sent += ret;
+
+            /* 'sum' packets have to be retransmitted. */
+            sum -= ret;
+        }
+    } while (sum && (retries++ < VHOST_ENQ_RETRY_NUM));
+
+    for (int i = 0; i < total_pkts; i++) {
+        dp_packet_delete(txq->vhost_burst_pkts[i]);
+    }
+
+    /* Reset pkt count. */
+    txq->vhost_pkt_cnt = 0;
+
+    /* 'sum' refers to packets dropped. */
+    return sum;
+}
+
+/* Flush the txq if there are any packets available. */
+static int
+netdev_dpdk_vhost_txq_flush(struct netdev *netdev, int qid,
+                            bool concurrent_txq OVS_UNUSED)
+{
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    struct dpdk_tx_queue *txq;
+
+    qid = dev->tx_q[qid % netdev->n_txq].map;
+
+    /* The qid may be disabled in the guest and has been set to
+     * OVS_VHOST_QUEUE_DISABLED.
+     */
+    if (OVS_UNLIKELY(qid < 0)) {
+        return 0;
+    }
+
+    txq = &dev->tx_q[qid];
+    /* Increment the drop count and free the memory. */
+    if (OVS_UNLIKELY(!is_vhost_running(dev) ||
+                     !(dev->flags & NETDEV_UP))) {
+
+        if (txq->vhost_pkt_cnt) {
+            rte_spinlock_lock(&dev->stats_lock);
+            dev->stats.tx_dropped+= txq->vhost_pkt_cnt;
+            rte_spinlock_unlock(&dev->stats_lock);
+
+            for (int i = 0; i < txq->vhost_pkt_cnt; i++) {
+                dp_packet_delete(txq->vhost_burst_pkts[i]);
+            }
+            txq->vhost_pkt_cnt = 0;
+        }
+    }
+
+    if (OVS_LIKELY(txq->vhost_pkt_cnt)) {
+        rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
+        netdev_dpdk_vhost_tx_burst(dev, qid);
+        rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
+    }
+
+    return 0;
+}
+
 static void
 __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
                          struct dp_packet **pkts, int cnt)
@@ -2799,6 +2891,17 @@  vring_state_changed(int vid, uint16_t queue_id, int enable)
             if (enable) {
                 dev->tx_q[qid].map = qid;
             } else {
+                /* If the queue is disabled in the guest, the corresponding qid
+                 * map shall be set to OVS_VHOST_QUEUE_DISABLED(-2).
+                 *
+                 * The packets that were queued in 'qid' could potentially be
+                 * stuck and need to be dropped.
+                 *
+                 * XXX: The queues may be already disabled in the guest so
+                 * flush function in this case only helps in updating stats
+                 * and freeing memory.
+                 */
+                netdev_dpdk_vhost_txq_flush(&dev->up, qid, 0);
                 dev->tx_q[qid].map = OVS_VHOST_QUEUE_DISABLED;
             }
             netdev_dpdk_remap_txqs(dev);
@@ -3471,7 +3574,8 @@  static const struct netdev_class dpdk_vhost_class =
         NULL,
         netdev_dpdk_vhost_reconfigure,
         netdev_dpdk_vhost_rxq_recv,
-        NULL);
+        netdev_dpdk_vhost_txq_flush);
+
 static const struct netdev_class dpdk_vhost_client_class =
     NETDEV_DPDK_CLASS(
         "dpdkvhostuserclient",
@@ -3487,7 +3591,7 @@  static const struct netdev_class dpdk_vhost_client_class =
         NULL,
         netdev_dpdk_vhost_client_reconfigure,
         netdev_dpdk_vhost_rxq_recv,
-        NULL);
+        netdev_dpdk_vhost_txq_flush);
 
 void
 netdev_dpdk_register(void)