[ovs-dev,v4,6/7] dpif-netdev: Time based output batching.

Message ID 1507215962-17692-7-git-send-email-i.maximets@samsung.com
State Superseded
Series Output packet batching.

Commit Message

Ilya Maximets Oct. 5, 2017, 3:06 p.m. UTC
This allows collecting packets from more than one RX burst
and sending them together at a configurable interval.

'other_config:tx-flush-interval' can be used to configure the
time that a packet can wait in the output batch before sending.

dpif-netdev switched to microsecond resolution for time
measurement to ensure the desired resolution of 'tx-flush-interval'.

Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---
 lib/dpif-netdev.c    | 141 ++++++++++++++++++++++++++++++++++++++++-----------
 vswitchd/vswitch.xml |  16 ++++++
 2 files changed, 127 insertions(+), 30 deletions(-)
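
For illustration only (this is commentary, not part of the posted patch): the new
option would typically be set with 'ovs-vsctl set Open_vSwitch .
other_config:tx-flush-interval=50', and the batching decision it controls boils
down to the self-contained sketch below. All names and values here are made up;
the real logic lives in dp_netdev_pmd_flush_output_on_port() and
dp_netdev_pmd_flush_output_packets().

#include <stdio.h>

#define NETDEV_MAX_BURST 32              /* burst size, as in dpif-netdev */

/* Hypothetical stand-in for the patch's 'struct tx_port'. */
struct fake_tx_port {
    int n_queued;                        /* packets in the output batch */
    long long flush_time;                /* absolute deadline in microseconds */
};

static void
flush(struct fake_tx_port *p, long long now, long long tx_flush_interval)
{
    printf("flushed %d packets at t=%lld us\n", p->n_queued, now);
    p->n_queued = 0;
    p->flush_time = now + tx_flush_interval;   /* deadline for the next batch */
}

int
main(void)
{
    struct fake_tx_port p = { .n_queued = 0, .flush_time = 0 };
    const long long tx_flush_interval = 50;    /* us, as if tx-flush-interval=50 */

    for (long long now = 0; now <= 190; now += 10) {
        p.n_queued += 3;                       /* pretend an rx burst was processed */
        if (p.n_queued >= NETDEV_MAX_BURST) {
            flush(&p, now, tx_flush_interval); /* batch full: send immediately */
        } else if (now >= p.flush_time) {
            flush(&p, now, tx_flush_interval); /* deadline reached: time-based flush */
        }
    }
    if (p.n_queued) {
        flush(&p, 200, tx_flush_interval);     /* forced flush, like 'force == true' */
    }
    return 0;
}

With tx-flush-interval=0 (the default), the deadline is never in the future, so
every burst is flushed immediately, matching the previous instant-send behaviour.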

Comments

Eelco Chaudron Oct. 11, 2017, 9:18 a.m. UTC | #1
On 05/10/17 17:06, Ilya Maximets wrote:
> This allows to collect packets from more than one RX burst
> and send them together with a configurable intervals.
>
> 'other_config:tx-flush-interval' can be used to configure
> time that a packet can wait in output batch for sending.
>
> dpif-netdev turned to microsecond resolution for time
> measuring to ensure desired resolution of 'tx-flush-interval'.
>
> Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---8<--- ... --->8---
> @@ -2999,6 +3012,16 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
>           smap_get_ullong(other_config, "emc-insert-inv-prob",
>                           DEFAULT_EM_FLOW_INSERT_INV_PROB);
>       uint32_t insert_min, cur_min;
> +    uint32_t tx_flush_interval, cur_tx_flush_interval;
> +
> +    tx_flush_interval = smap_get_int(other_config, "tx-flush-interval",
> +                                     DEFAULT_TX_FLUSH_INTERVAL);
> +    atomic_read_relaxed(&dp->tx_flush_interval, &cur_tx_flush_interval);
> +    if (tx_flush_interval != cur_tx_flush_interval) {
> +        atomic_store_relaxed(&dp->tx_flush_interval, tx_flush_interval);
> +        VLOG_INFO("Flushing interval for tx queues set to %"PRIu32" us",
> +                  tx_flush_interval);
> +    }
I was wondering if it would be an issue that the newly configured value might
not take effect until the previously configured flush timeout has expired.
I wanted to suggest maybe storing p->last_flush_time rather than p->flush_time.
But looking at it again, with the maximum value being 1 second, it should
not be a problem...
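
To make this concrete with hypothetical numbers (a sketch, not code from the patch):

/* A batch was just flushed while tx-flush-interval was 1000000 us (the
 * maximum), so the port's next deadline was latched with the old value: */
p->flush_time = pmd->ctx.now + 1000000;

/* The interval is now lowered to 10 us: dp->tx_flush_interval changes
 * immediately, but p->flush_time does not, so packets queued on this port
 * may still wait up to one second; the shorter interval only applies from
 * the next flush onwards. */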

All looks good to me

Acked-by: Eelco Chaudron <echaudro@redhat.com>

---8<--- ... --->8---
Bodireddy, Bhanuprakash Oct. 13, 2017, 2:33 p.m. UTC | #2
>This allows to collect packets from more than one RX burst and send them
>together with a configurable intervals.
>
>'other_config:tx-flush-interval' can be used to configure time that a packet
>can wait in output batch for sending.
>
>dpif-netdev turned to microsecond resolution for time measuring to ensure
>desired resolution of 'tx-flush-interval'.
>
>Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---8<--- ... --->8---
>+    output_cnt = dp_packet_batch_size(&p->output_pkts);
>     netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
>     dp_packet_batch_init(&p->output_pkts);
>+
>+    if (output_cnt) {

[BHANU] Reading the output batch size and having this extra check above seems redundant, as
we only get here when !dp_packet_batch_is_empty() and the caller is dp_netdev_pmd_flush_output_packets().

Even for the other case where this API is called directly (from dp_execute_cb), the check here
seems redundant as it is already done at the caller.

- Bhanuprakash.

>+        uint32_t tx_flush_interval;
>+
>+        /* Update time of the next flush. */
>+        atomic_read_relaxed(&pmd->dp->tx_flush_interval,
>&tx_flush_interval);
>+        p->flush_time = pmd->ctx.now + tx_flush_interval;
>+
>+        ovs_assert(pmd->n_output_batches > 0);
>+        pmd->n_output_batches--;
>+    }
>+    return output_cnt;
> }
---8<--- ... --->8---
Ilya Maximets Oct. 27, 2017, 11:23 a.m. UTC | #3
On 13.10.2017 17:33, Bodireddy, Bhanuprakash wrote:
>> This allows to collect packets from more than one RX burst and send them
>> together with a configurable intervals.
>>
>> 'other_config:tx-flush-interval' can be used to configure time that a packet
>> can wait in output batch for sending.
>>
>> dpif-netdev turned to microsecond resolution for time measuring to ensure
>> desired resolution of 'tx-flush-interval'.
>>
>> Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---8<--- ... --->8---
>> +    output_cnt = dp_packet_batch_size(&p->output_pkts);
>>     netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
>>     dp_packet_batch_init(&p->output_pkts);
>> +
>> +    if (output_cnt) {
> 
> [BHANU] Reading the output batch size and having this extra check above seems redundant as 
> we will only come here when !dp_packet_batch_is_empty() and the caller is dp_netdev_pmd_flush_output_packets().  
> 
> Even for other cases where this API is called directly (from  dp_execute_cb), the checks here
> seems redundant as its checked at the caller. 
> 
> - Bhanuprakash.

You're right that we're calling dp_netdev_pmd_flush_output_on_port() only if
we have something to flush, but I think we need some kind of check anyway
to avoid issues in the future and to keep the function complete, because it
updates global variables assuming something was really sent.
If you don't like this extra check, maybe ovs_assert(output_cnt > 0);
would be suitable here. What do you think?
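
With that alternative, the tail of dp_netdev_pmd_flush_output_on_port() would
read roughly as follows (a sketch only, not a posted revision):

    output_cnt = dp_packet_batch_size(&p->output_pkts);
    ovs_assert(output_cnt > 0);      /* callers only flush non-empty batches */

    netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
    dp_packet_batch_init(&p->output_pkts);

    /* Update time of the next flush. */
    atomic_read_relaxed(&pmd->dp->tx_flush_interval, &tx_flush_interval);
    p->flush_time = pmd->ctx.now + tx_flush_interval;

    ovs_assert(pmd->n_output_batches > 0);
    pmd->n_output_batches--;
    return output_cnt;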

---8<--- ... --->8---

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 166b73a..3ddb711 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -85,6 +85,9 @@  VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 #define MAX_RECIRC_DEPTH 5
 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
 
+/* Use instant packet send by default. */
+#define DEFAULT_TX_FLUSH_INTERVAL 0
+
 /* Configuration parameters. */
 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
 enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
@@ -178,12 +181,13 @@  struct emc_cache {
 
 /* Simple non-wildcarding single-priority classifier. */
 
-/* Time in ms between successive optimizations of the dpcls subtable vector */
-#define DPCLS_OPTIMIZATION_INTERVAL 1000
+/* Time in microseconds between successive optimizations of the dpcls
+ * subtable vector */
+#define DPCLS_OPTIMIZATION_INTERVAL 1000000LL
 
-/* Time in ms of the interval in which rxq processing cycles used in
- * rxq to pmd assignments is measured and stored. */
-#define PMD_RXQ_INTERVAL_LEN 10000
+/* Time in microseconds of the interval in which rxq processing cycles used
+ * in rxq to pmd assignments is measured and stored. */
+#define PMD_RXQ_INTERVAL_LEN 10000000LL
 
 /* Number of intervals for which cycles are stored
  * and used during rxq to pmd assignment. */
@@ -270,6 +274,9 @@  struct dp_netdev {
     struct hmap ports;
     struct seq *port_seq;       /* Incremented whenever a port changes. */
 
+    /* The time that a packet can wait in output batch for sending. */
+    atomic_uint32_t tx_flush_interval;
+
     /* Meters. */
     struct ovs_mutex meter_locks[N_METER_LOCKS];
     struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
@@ -356,7 +363,7 @@  enum rxq_cycles_counter_type {
     RXQ_N_CYCLES
 };
 
-#define XPS_TIMEOUT_MS 500LL
+#define XPS_TIMEOUT 500000LL    /* In microseconds. */
 
 /* Contained by struct dp_netdev_port's 'rxqs' member.  */
 struct dp_netdev_rxq {
@@ -526,6 +533,7 @@  struct tx_port {
     int qid;
     long long last_used;
     struct hmap_node node;
+    long long flush_time;
     struct dp_packet_batch output_pkts;
 };
 
@@ -614,6 +622,9 @@  struct dp_netdev_pmd_thread {
      * than 'cmap_count(dp->poll_threads)'. */
     uint32_t static_tx_qid;
 
+    /* Number of filled output batches. */
+    int n_output_batches;
+
     struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
     /* List of rx queues to poll. */
     struct hmap poll_list OVS_GUARDED;
@@ -707,8 +718,9 @@  static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
 static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
                                        struct rxq_poll *poll)
     OVS_REQUIRES(pmd->port_mutex);
-static void
-dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd);
+static int
+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
+                                   bool force);
 
 static void reconfigure_datapath(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex);
@@ -783,7 +795,7 @@  emc_cache_slow_sweep(struct emc_cache *flow_cache)
 static inline void
 pmd_thread_ctx_time_update(struct dp_netdev_pmd_thread *pmd)
 {
-    pmd->ctx.now = time_msec();
+    pmd->ctx.now = time_usec();
 }
 
 /* Returns true if 'dpif' is a netdev or dummy dpif, false otherwise. */
@@ -1283,6 +1295,7 @@  create_dp_netdev(const char *name, const struct dpif_class *class,
     conntrack_init(&dp->conntrack);
 
     atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
+    atomic_init(&dp->tx_flush_interval, DEFAULT_TX_FLUSH_INTERVAL);
 
     cmap_init(&dp->poll_threads);
 
@@ -2950,7 +2963,7 @@  dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     dp_packet_batch_init_packet(&pp, execute->packet);
     dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
                               execute->actions, execute->actions_len);
-    dp_netdev_pmd_flush_output_packets(pmd);
+    dp_netdev_pmd_flush_output_packets(pmd, true);
 
     if (pmd->core_id == NON_PMD_CORE_ID) {
         ovs_mutex_unlock(&dp->non_pmd_mutex);
@@ -2999,6 +3012,16 @@  dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
         smap_get_ullong(other_config, "emc-insert-inv-prob",
                         DEFAULT_EM_FLOW_INSERT_INV_PROB);
     uint32_t insert_min, cur_min;
+    uint32_t tx_flush_interval, cur_tx_flush_interval;
+
+    tx_flush_interval = smap_get_int(other_config, "tx-flush-interval",
+                                     DEFAULT_TX_FLUSH_INTERVAL);
+    atomic_read_relaxed(&dp->tx_flush_interval, &cur_tx_flush_interval);
+    if (tx_flush_interval != cur_tx_flush_interval) {
+        atomic_store_relaxed(&dp->tx_flush_interval, tx_flush_interval);
+        VLOG_INFO("Flushing interval for tx queues set to %"PRIu32" us",
+                  tx_flush_interval);
+    }
 
     if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
         free(dp->pmd_cmask);
@@ -3237,11 +3260,12 @@  dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx)
     return processing_cycles;
 }
 
-static void
+static int
 dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
                                    struct tx_port *p)
 {
     int tx_qid;
+    int output_cnt;
     bool dynamic_txqs;
 
     dynamic_txqs = p->port->dynamic_txqs;
@@ -3251,20 +3275,41 @@  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
         tx_qid = pmd->static_tx_qid;
     }
 
+    output_cnt = dp_packet_batch_size(&p->output_pkts);
     netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
     dp_packet_batch_init(&p->output_pkts);
+
+    if (output_cnt) {
+        uint32_t tx_flush_interval;
+
+        /* Update time of the next flush. */
+        atomic_read_relaxed(&pmd->dp->tx_flush_interval, &tx_flush_interval);
+        p->flush_time = pmd->ctx.now + tx_flush_interval;
+
+        ovs_assert(pmd->n_output_batches > 0);
+        pmd->n_output_batches--;
+    }
+    return output_cnt;
 }
 
-static void
-dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd)
+static int
+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
+                                   bool force)
 {
     struct tx_port *p;
+    int output_cnt = 0;
+
+    if (!pmd->n_output_batches) {
+        return 0;
+    }
 
     HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
-        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
-            dp_netdev_pmd_flush_output_on_port(pmd, p);
+        if (!dp_packet_batch_is_empty(&p->output_pkts)
+            && (force || pmd->ctx.now >= p->flush_time)) {
+            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p);
         }
     }
+    return output_cnt;
 }
 
 static int
@@ -3274,7 +3319,7 @@  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
 {
     struct dp_packet_batch batch;
     int error;
-    int batch_cnt = 0;
+    int batch_cnt = 0, output_cnt = 0;
 
     dp_packet_batch_init(&batch);
     error = netdev_rxq_recv(rx, &batch);
@@ -3284,7 +3329,7 @@  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
 
         batch_cnt = batch.count;
         dp_netdev_input(pmd, &batch, port_no);
-        dp_netdev_pmd_flush_output_packets(pmd);
+        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
@@ -3292,7 +3337,7 @@  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                     netdev_rxq_get_name(rx), ovs_strerror(error));
     }
 
-    return batch_cnt;
+    return batch_cnt + output_cnt;
 }
 
 static struct tx_port *
@@ -3904,7 +3949,8 @@  dpif_netdev_run(struct dpif *dpif)
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_pmd_thread *non_pmd;
     uint64_t new_tnl_seq;
-    int process_packets = 0;
+    int process_packets;
+    bool need_to_flush = true;
 
     ovs_mutex_lock(&dp->port_mutex);
     non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
@@ -3924,9 +3970,22 @@  dpif_netdev_run(struct dpif *dpif)
                                               process_packets
                                               ? PMD_CYCLES_PROCESSING
                                               : PMD_CYCLES_IDLE);
+                    if (process_packets) {
+                        need_to_flush = false;
+                    }
                 }
             }
         }
+        if (need_to_flush) {
+            /* We didn't receive anything in the process loop.
+             * Check if we need to send something. */
+            process_packets = dp_netdev_pmd_flush_output_packets(non_pmd,
+                                                                 false);
+            cycles_count_intermediate(non_pmd, NULL, process_packets
+                                                     ? PMD_CYCLES_PROCESSING
+                                                     : PMD_CYCLES_IDLE);
+        }
+
         cycles_count_end(non_pmd, PMD_CYCLES_IDLE);
         pmd_thread_ctx_time_update(non_pmd);
         dpif_netdev_xps_revalidate_pmd(non_pmd, false);
@@ -3979,6 +4038,8 @@  pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
 {
     struct tx_port *tx_port_cached;
 
+    /* Flush all the queued packets. */
+    dp_netdev_pmd_flush_output_packets(pmd, true);
     /* Free all used tx queue ids. */
     dpif_netdev_xps_revalidate_pmd(pmd, true);
 
@@ -4077,7 +4138,6 @@  pmd_thread_main(void *f_)
     bool exiting;
     int poll_cnt;
     int i;
-    int process_packets = 0;
 
     poll_list = NULL;
 
@@ -4107,6 +4167,9 @@  reload:
 
     cycles_count_start(pmd);
     for (;;) {
+        int process_packets;
+        bool need_to_flush = true;
+
         for (i = 0; i < poll_cnt; i++) {
             process_packets =
                 dp_netdev_process_rxq_port(pmd, poll_list[i].rxq->rx,
@@ -4114,6 +4177,18 @@  reload:
             cycles_count_intermediate(pmd, poll_list[i].rxq,
                                       process_packets ? PMD_CYCLES_PROCESSING
                                                       : PMD_CYCLES_IDLE);
+            if (process_packets) {
+                need_to_flush = false;
+            }
+        }
+
+        if (need_to_flush) {
+            /* We didn't receive anything in the process loop.
+             * Check if we need to send something. */
+            process_packets = dp_netdev_pmd_flush_output_packets(pmd, false);
+            cycles_count_intermediate(pmd, NULL,
+                                      process_packets ? PMD_CYCLES_PROCESSING
+                                                      : PMD_CYCLES_IDLE);
         }
 
         if (lc++ > 1024) {
@@ -4207,7 +4282,7 @@  dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
     memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate);
 
     /* All packets will hit the meter at the same time. */
-    long_delta_t = (now - meter->used); /* msec */
+    long_delta_t = (now - meter->used) / 1000; /* msec */
 
     /* Make sure delta_t will not be too large, so that bucket will not
      * wrap around below. */
@@ -4363,7 +4438,7 @@  dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id *meter_id,
         meter->flags = config->flags;
         meter->n_bands = config->n_bands;
         meter->max_delta_t = 0;
-        meter->used = time_msec();
+        meter->used = time_usec();
 
         /* set up bands */
         for (i = 0; i < config->n_bands; ++i) {
@@ -4561,6 +4636,7 @@  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->core_id = core_id;
     pmd->numa_id = numa_id;
     pmd->need_reload = false;
+    pmd->n_output_batches = 0;
 
     ovs_refcount_init(&pmd->ref_cnt);
     latch_init(&pmd->exit_latch);
@@ -4748,6 +4824,7 @@  dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
 
     tx->port = port;
     tx->qid = -1;
+    tx->flush_time = 0LL;
     dp_packet_batch_init(&tx->output_pkts);
 
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
@@ -4911,7 +4988,7 @@  packet_batch_per_flow_execute(struct packet_batch_per_flow *batch,
     struct dp_netdev_flow *flow = batch->flow;
 
     dp_netdev_flow_used(flow, batch->array.count, batch->byte_count,
-                        batch->tcp_flags, pmd->ctx.now);
+                        batch->tcp_flags, pmd->ctx.now / 1000);
 
     actions = dp_netdev_flow_get_actions(flow);
 
@@ -5286,7 +5363,7 @@  dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
             continue;
         }
         interval = pmd->ctx.now - tx->last_used;
-        if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) {
+        if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT)) {
             port = tx->port;
             ovs_mutex_lock(&port->txq_used_mutex);
             port->txq_used[tx->qid]--;
@@ -5307,7 +5384,7 @@  dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
     interval = pmd->ctx.now - tx->last_used;
     tx->last_used = pmd->ctx.now;
 
-    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) {
+    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT)) {
         return tx->qid;
     }
 
@@ -5439,12 +5516,16 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 dp_netdev_pmd_flush_output_on_port(pmd, p);
             }
 #endif
-            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
-                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
-                /* Some packets was generated while input batch processing.
-                 * Flush here to avoid overflow. */
+            if (dp_packet_batch_size(&p->output_pkts)
+                + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
+                /* Flush here to avoid overflow. */
                 dp_netdev_pmd_flush_output_on_port(pmd, p);
             }
+
+            if (dp_packet_batch_is_empty(&p->output_pkts)) {
+                pmd->n_output_batches++;
+            }
+
             DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
                 dp_packet_batch_add(&p->output_pkts, packet);
             }
@@ -5685,7 +5766,7 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
 
         conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, force,
                           commit, zone, setmark, setlabel, helper,
-                          nat_action_info_ref, pmd->ctx.now);
+                          nat_action_info_ref, pmd->ctx.now / 1000);
         break;
     }
 
diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
index 074535b..b13b0fa 100644
--- a/vswitchd/vswitch.xml
+++ b/vswitchd/vswitch.xml
@@ -344,6 +344,22 @@ 
         </p>
       </column>
 
+      <column name="other_config" key="tx-flush-interval"
+              type='{"type": "integer",
+                     "minInteger": 0, "maxInteger": 1000000}'>
+        <p>
+          Specifies the time in microseconds that a packet can wait in output
+          batch for sending i.e. amount of time that packet can spend in an
+          intermediate output queue before sending to netdev.
+          This option can be used to configure balance between throughput
+          and latency. Lower values decreases latency while higher values
+          may be useful to achieve higher performance.
+        </p>
+        <p>
+          Defaults to 0 i.e. instant packet sending (latency optimized).
+        </p>
+      </column>
+
       <column name="other_config" key="n-handler-threads"
               type='{"type": "integer", "minInteger": 1}'>
         <p>