[ovs-dev,v4,6/7] dpif-netdev: Time based output batching.

Message ID 1507215962-17692-7-git-send-email-i.maximets@samsung.com
State New
Series
  • Output packet batching.

Commit Message

Ilya Maximets Oct. 5, 2017, 3:06 p.m.
This allows collecting packets from more than one RX burst and sending
them together, with a configurable interval.

'other_config:tx-flush-interval' can be used to configure the time that
a packet may wait in an output batch before it is sent.

dpif-netdev is switched to microsecond resolution for time measurement
to ensure the desired resolution of 'tx-flush-interval'.

Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---
 lib/dpif-netdev.c    | 141 ++++++++++++++++++++++++++++++++++++++++-----------
 vswitchd/vswitch.xml |  16 ++++++
 2 files changed, 127 insertions(+), 30 deletions(-)
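The interval is taken from the Open_vSwitch table's other_config column, so it
can be tuned at runtime.  As a usage sketch (the value of 50 microseconds is an
arbitrary example, not a recommendation):

    ovs-vsctl set Open_vSwitch . other_config:tx-flush-interval=50

lets packets wait in an output batch for up to 50 us, while the default of 0
keeps the previous behaviour of flushing every output batch as soon as the
input batch that produced it has been processed.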

Comments

Eelco Chaudron Oct. 11, 2017, 9:18 a.m. | #1
On 05/10/17 17:06, Ilya Maximets wrote:
> This allows to collect packets from more than one RX burst
> and send them together with a configurable intervals.
>
> 'other_config:tx-flush-interval' can be used to configure
> time that a packet can wait in output batch for sending.
>
> dpif-netdev turned to microsecond resolution for time
> measuring to ensure desired resolution of 'tx-flush-interval'.
>
> Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---8<--- ... --->8---
> @@ -2999,6 +3012,16 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
>           smap_get_ullong(other_config, "emc-insert-inv-prob",
>                           DEFAULT_EM_FLOW_INSERT_INV_PROB);
>       uint32_t insert_min, cur_min;
> +    uint32_t tx_flush_interval, cur_tx_flush_interval;
> +
> +    tx_flush_interval = smap_get_int(other_config, "tx-flush-interval",
> +                                     DEFAULT_TX_FLUSH_INTERVAL);
> +    atomic_read_relaxed(&dp->tx_flush_interval, &cur_tx_flush_interval);
> +    if (tx_flush_interval != cur_tx_flush_interval) {
> +        atomic_store_relaxed(&dp->tx_flush_interval, tx_flush_interval);
> +        VLOG_INFO("Flushing interval for tx queues set to %"PRIu32" us",
> +                  tx_flush_interval);
> +    }
I was wondering whether it could be an issue that a newly configured value
might not take effect until the previously configured flush timeout has
expired. I was going to suggest storing p->last_flush_time rather than
p->flush_time, but looking at it again, with the maximum value being 1 second,
it should not be a problem...
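
The behaviour in question comes from the deadline only being refreshed when a
batch is actually flushed; in the patch, roughly:

    /* In dp_netdev_pmd_flush_output_on_port(), after netdev_send(): */
    atomic_read_relaxed(&pmd->dp->tx_flush_interval, &tx_flush_interval);
    p->flush_time = pmd->ctx.now + tx_flush_interval;

so an already queued batch keeps the deadline computed with the old interval,
and a changed setting only applies from the next flush onwards.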

All looks good to me

Acked-by: Eelco Chaudron <echaudro@redhat.com>

---8<--- ... --->8---
Bodireddy, Bhanuprakash Oct. 13, 2017, 2:33 p.m. | #2
>This allows to collect packets from more than one RX burst and send them
>together with a configurable intervals.
>
>'other_config:tx-flush-interval' can be used to configure time that a packet
>can wait in output batch for sending.
>
>dpif-netdev turned to microsecond resolution for time measuring to ensure
>desired resolution of 'tx-flush-interval'.
>
>Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---8<--- ... --->8---
>@@ -3237,11 +3260,12 @@ dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx)
>     return processing_cycles;
> }
>
>-static void
>+static int
> dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
>                                    struct tx_port *p)
> {
>     int tx_qid;
>+    int output_cnt;
>     bool dynamic_txqs;
>
>     dynamic_txqs = p->port->dynamic_txqs;
>@@ -3251,20 +3275,41 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
>         tx_qid = pmd->static_tx_qid;
>     }
>
>+    output_cnt = dp_packet_batch_size(&p->output_pkts);
>     netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
>     dp_packet_batch_init(&p->output_pkts);
>+
>+    if (output_cnt) {

[BHANU] Reading the output batch size and having this extra check above seem
redundant, as we only get here when !dp_packet_batch_is_empty() and the caller
is dp_netdev_pmd_flush_output_packets().

Even for the other case where this API is called directly (from dp_execute_cb),
the check here seems redundant, as it is already done at the caller.

- Bhanuprakash.
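
For reference, the guards at the two call sites mentioned above look like this
in the patch:

    /* dp_netdev_pmd_flush_output_packets(): flush only non-empty batches
     * whose deadline has passed (or all of them, when forced). */
    if (!dp_packet_batch_is_empty(&p->output_pkts)
        && (force || pmd->ctx.now >= p->flush_time)) {
        output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p);
    }

    /* dp_execute_cb(): flush early only when adding the new packets would
     * overflow the batch, which implies it already holds some packets. */
    if (dp_packet_batch_size(&p->output_pkts)
        + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
        dp_netdev_pmd_flush_output_on_port(pmd, p);
    }

In both cases the batch is non-empty by the time the helper runs, which is the
redundancy in question.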


Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 166b73a..3ddb711 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -85,6 +85,9 @@  VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 #define MAX_RECIRC_DEPTH 5
 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
 
+/* Use instant packet send by default. */
+#define DEFAULT_TX_FLUSH_INTERVAL 0
+
 /* Configuration parameters. */
 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
 enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
@@ -178,12 +181,13 @@  struct emc_cache {
 
 /* Simple non-wildcarding single-priority classifier. */
 
-/* Time in ms between successive optimizations of the dpcls subtable vector */
-#define DPCLS_OPTIMIZATION_INTERVAL 1000
+/* Time in microseconds between successive optimizations of the dpcls
+ * subtable vector */
+#define DPCLS_OPTIMIZATION_INTERVAL 1000000LL
 
-/* Time in ms of the interval in which rxq processing cycles used in
- * rxq to pmd assignments is measured and stored. */
-#define PMD_RXQ_INTERVAL_LEN 10000
+/* Time in microseconds of the interval in which rxq processing cycles used
+ * in rxq to pmd assignments is measured and stored. */
+#define PMD_RXQ_INTERVAL_LEN 10000000LL
 
 /* Number of intervals for which cycles are stored
  * and used during rxq to pmd assignment. */
@@ -270,6 +274,9 @@  struct dp_netdev {
     struct hmap ports;
     struct seq *port_seq;       /* Incremented whenever a port changes. */
 
+    /* The time that a packet can wait in output batch for sending. */
+    atomic_uint32_t tx_flush_interval;
+
     /* Meters. */
     struct ovs_mutex meter_locks[N_METER_LOCKS];
     struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
@@ -356,7 +363,7 @@  enum rxq_cycles_counter_type {
     RXQ_N_CYCLES
 };
 
-#define XPS_TIMEOUT_MS 500LL
+#define XPS_TIMEOUT 500000LL    /* In microseconds. */
 
 /* Contained by struct dp_netdev_port's 'rxqs' member.  */
 struct dp_netdev_rxq {
@@ -526,6 +533,7 @@  struct tx_port {
     int qid;
     long long last_used;
     struct hmap_node node;
+    long long flush_time;
     struct dp_packet_batch output_pkts;
 };
 
@@ -614,6 +622,9 @@  struct dp_netdev_pmd_thread {
      * than 'cmap_count(dp->poll_threads)'. */
     uint32_t static_tx_qid;
 
+    /* Number of filled output batches. */
+    int n_output_batches;
+
     struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
     /* List of rx queues to poll. */
     struct hmap poll_list OVS_GUARDED;
@@ -707,8 +718,9 @@  static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
 static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
                                        struct rxq_poll *poll)
     OVS_REQUIRES(pmd->port_mutex);
-static void
-dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd);
+static int
+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
+                                   bool force);
 
 static void reconfigure_datapath(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex);
@@ -783,7 +795,7 @@  emc_cache_slow_sweep(struct emc_cache *flow_cache)
 static inline void
 pmd_thread_ctx_time_update(struct dp_netdev_pmd_thread *pmd)
 {
-    pmd->ctx.now = time_msec();
+    pmd->ctx.now = time_usec();
 }
 
 /* Returns true if 'dpif' is a netdev or dummy dpif, false otherwise. */
@@ -1283,6 +1295,7 @@  create_dp_netdev(const char *name, const struct dpif_class *class,
     conntrack_init(&dp->conntrack);
 
     atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
+    atomic_init(&dp->tx_flush_interval, DEFAULT_TX_FLUSH_INTERVAL);
 
     cmap_init(&dp->poll_threads);
 
@@ -2950,7 +2963,7 @@  dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     dp_packet_batch_init_packet(&pp, execute->packet);
     dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
                               execute->actions, execute->actions_len);
-    dp_netdev_pmd_flush_output_packets(pmd);
+    dp_netdev_pmd_flush_output_packets(pmd, true);
 
     if (pmd->core_id == NON_PMD_CORE_ID) {
         ovs_mutex_unlock(&dp->non_pmd_mutex);
@@ -2999,6 +3012,16 @@  dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
         smap_get_ullong(other_config, "emc-insert-inv-prob",
                         DEFAULT_EM_FLOW_INSERT_INV_PROB);
     uint32_t insert_min, cur_min;
+    uint32_t tx_flush_interval, cur_tx_flush_interval;
+
+    tx_flush_interval = smap_get_int(other_config, "tx-flush-interval",
+                                     DEFAULT_TX_FLUSH_INTERVAL);
+    atomic_read_relaxed(&dp->tx_flush_interval, &cur_tx_flush_interval);
+    if (tx_flush_interval != cur_tx_flush_interval) {
+        atomic_store_relaxed(&dp->tx_flush_interval, tx_flush_interval);
+        VLOG_INFO("Flushing interval for tx queues set to %"PRIu32" us",
+                  tx_flush_interval);
+    }
 
     if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
         free(dp->pmd_cmask);
@@ -3237,11 +3260,12 @@  dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx)
     return processing_cycles;
 }
 
-static void
+static int
 dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
                                    struct tx_port *p)
 {
     int tx_qid;
+    int output_cnt;
     bool dynamic_txqs;
 
     dynamic_txqs = p->port->dynamic_txqs;
@@ -3251,20 +3275,41 @@  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
         tx_qid = pmd->static_tx_qid;
     }
 
+    output_cnt = dp_packet_batch_size(&p->output_pkts);
     netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
     dp_packet_batch_init(&p->output_pkts);
+
+    if (output_cnt) {
+        uint32_t tx_flush_interval;
+
+        /* Update time of the next flush. */
+        atomic_read_relaxed(&pmd->dp->tx_flush_interval, &tx_flush_interval);
+        p->flush_time = pmd->ctx.now + tx_flush_interval;
+
+        ovs_assert(pmd->n_output_batches > 0);
+        pmd->n_output_batches--;
+    }
+    return output_cnt;
 }
 
-static void
-dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd)
+static int
+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
+                                   bool force)
 {
     struct tx_port *p;
+    int output_cnt = 0;
+
+    if (!pmd->n_output_batches) {
+        return 0;
+    }
 
     HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
-        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
-            dp_netdev_pmd_flush_output_on_port(pmd, p);
+        if (!dp_packet_batch_is_empty(&p->output_pkts)
+            && (force || pmd->ctx.now >= p->flush_time)) {
+            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p);
         }
     }
+    return output_cnt;
 }
 
 static int
@@ -3274,7 +3319,7 @@  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
 {
     struct dp_packet_batch batch;
     int error;
-    int batch_cnt = 0;
+    int batch_cnt = 0, output_cnt = 0;
 
     dp_packet_batch_init(&batch);
     error = netdev_rxq_recv(rx, &batch);
@@ -3284,7 +3329,7 @@  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
 
         batch_cnt = batch.count;
         dp_netdev_input(pmd, &batch, port_no);
-        dp_netdev_pmd_flush_output_packets(pmd);
+        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
@@ -3292,7 +3337,7 @@  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                     netdev_rxq_get_name(rx), ovs_strerror(error));
     }
 
-    return batch_cnt;
+    return batch_cnt + output_cnt;
 }
 
 static struct tx_port *
@@ -3904,7 +3949,8 @@  dpif_netdev_run(struct dpif *dpif)
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_pmd_thread *non_pmd;
     uint64_t new_tnl_seq;
-    int process_packets = 0;
+    int process_packets;
+    bool need_to_flush = true;
 
     ovs_mutex_lock(&dp->port_mutex);
     non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
@@ -3924,9 +3970,22 @@  dpif_netdev_run(struct dpif *dpif)
                                               process_packets
                                               ? PMD_CYCLES_PROCESSING
                                               : PMD_CYCLES_IDLE);
+                    if (process_packets) {
+                        need_to_flush = false;
+                    }
                 }
             }
         }
+        if (need_to_flush) {
+            /* We didn't receive anything in the process loop.
+             * Check if we need to send something. */
+            process_packets = dp_netdev_pmd_flush_output_packets(non_pmd,
+                                                                 false);
+            cycles_count_intermediate(non_pmd, NULL, process_packets
+                                                     ? PMD_CYCLES_PROCESSING
+                                                     : PMD_CYCLES_IDLE);
+        }
+
         cycles_count_end(non_pmd, PMD_CYCLES_IDLE);
         pmd_thread_ctx_time_update(non_pmd);
         dpif_netdev_xps_revalidate_pmd(non_pmd, false);
@@ -3979,6 +4038,8 @@  pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
 {
     struct tx_port *tx_port_cached;
 
+    /* Flush all the queued packets. */
+    dp_netdev_pmd_flush_output_packets(pmd, true);
     /* Free all used tx queue ids. */
     dpif_netdev_xps_revalidate_pmd(pmd, true);
 
@@ -4077,7 +4138,6 @@  pmd_thread_main(void *f_)
     bool exiting;
     int poll_cnt;
     int i;
-    int process_packets = 0;
 
     poll_list = NULL;
 
@@ -4107,6 +4167,9 @@  reload:
 
     cycles_count_start(pmd);
     for (;;) {
+        int process_packets;
+        bool need_to_flush = true;
+
         for (i = 0; i < poll_cnt; i++) {
             process_packets =
                 dp_netdev_process_rxq_port(pmd, poll_list[i].rxq->rx,
@@ -4114,6 +4177,18 @@  reload:
             cycles_count_intermediate(pmd, poll_list[i].rxq,
                                       process_packets ? PMD_CYCLES_PROCESSING
                                                       : PMD_CYCLES_IDLE);
+            if (process_packets) {
+                need_to_flush = false;
+            }
+        }
+
+        if (need_to_flush) {
+            /* We didn't receive anything in the process loop.
+             * Check if we need to send something. */
+            process_packets = dp_netdev_pmd_flush_output_packets(pmd, false);
+            cycles_count_intermediate(pmd, NULL,
+                                      process_packets ? PMD_CYCLES_PROCESSING
+                                                      : PMD_CYCLES_IDLE);
         }
 
         if (lc++ > 1024) {
@@ -4207,7 +4282,7 @@  dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
     memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate);
 
     /* All packets will hit the meter at the same time. */
-    long_delta_t = (now - meter->used); /* msec */
+    long_delta_t = (now - meter->used) / 1000; /* msec */
 
     /* Make sure delta_t will not be too large, so that bucket will not
      * wrap around below. */
@@ -4363,7 +4438,7 @@  dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id *meter_id,
         meter->flags = config->flags;
         meter->n_bands = config->n_bands;
         meter->max_delta_t = 0;
-        meter->used = time_msec();
+        meter->used = time_usec();
 
         /* set up bands */
         for (i = 0; i < config->n_bands; ++i) {
@@ -4561,6 +4636,7 @@  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->core_id = core_id;
     pmd->numa_id = numa_id;
     pmd->need_reload = false;
+    pmd->n_output_batches = 0;
 
     ovs_refcount_init(&pmd->ref_cnt);
     latch_init(&pmd->exit_latch);
@@ -4748,6 +4824,7 @@  dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
 
     tx->port = port;
     tx->qid = -1;
+    tx->flush_time = 0LL;
     dp_packet_batch_init(&tx->output_pkts);
 
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
@@ -4911,7 +4988,7 @@  packet_batch_per_flow_execute(struct packet_batch_per_flow *batch,
     struct dp_netdev_flow *flow = batch->flow;
 
     dp_netdev_flow_used(flow, batch->array.count, batch->byte_count,
-                        batch->tcp_flags, pmd->ctx.now);
+                        batch->tcp_flags, pmd->ctx.now / 1000);
 
     actions = dp_netdev_flow_get_actions(flow);
 
@@ -5286,7 +5363,7 @@  dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
             continue;
         }
         interval = pmd->ctx.now - tx->last_used;
-        if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) {
+        if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT)) {
             port = tx->port;
             ovs_mutex_lock(&port->txq_used_mutex);
             port->txq_used[tx->qid]--;
@@ -5307,7 +5384,7 @@  dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
     interval = pmd->ctx.now - tx->last_used;
     tx->last_used = pmd->ctx.now;
 
-    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) {
+    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT)) {
         return tx->qid;
     }
 
@@ -5439,12 +5516,16 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 dp_netdev_pmd_flush_output_on_port(pmd, p);
             }
 #endif
-            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
-                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
-                /* Some packets was generated while input batch processing.
-                 * Flush here to avoid overflow. */
+            if (dp_packet_batch_size(&p->output_pkts)
+                + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
+                /* Flush here to avoid overflow. */
                 dp_netdev_pmd_flush_output_on_port(pmd, p);
             }
+
+            if (dp_packet_batch_is_empty(&p->output_pkts)) {
+                pmd->n_output_batches++;
+            }
+
             DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
                 dp_packet_batch_add(&p->output_pkts, packet);
             }
@@ -5685,7 +5766,7 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
 
         conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, force,
                           commit, zone, setmark, setlabel, helper,
-                          nat_action_info_ref, pmd->ctx.now);
+                          nat_action_info_ref, pmd->ctx.now / 1000);
         break;
     }
 
diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
index 074535b..b13b0fa 100644
--- a/vswitchd/vswitch.xml
+++ b/vswitchd/vswitch.xml
@@ -344,6 +344,22 @@ 
         </p>
       </column>
 
+      <column name="other_config" key="tx-flush-interval"
+              type='{"type": "integer",
+                     "minInteger": 0, "maxInteger": 1000000}'>
+        <p>
+          Specifies the time in microseconds that a packet can wait in output
+          batch for sending i.e. amount of time that packet can spend in an
+          intermediate output queue before sending to netdev.
+          This option can be used to configure balance between throughput
+          and latency. Lower values decreases latency while higher values
+          may be useful to achieve higher performance.
+        </p>
+        <p>
+          Defaults to 0 i.e. instant packet sending (latency optimized).
+        </p>
+      </column>
+
       <column name="other_config" key="n-handler-threads"
               type='{"type": "integer", "minInteger": 1}'>
         <p>