diff mbox

[ovs-dev,RFC,v2,4/4] dpif-netdev: Time based output batching.

Message ID 1501082468-22006-5-git-send-email-i.maximets@samsung.com
State Superseded
Headers show

Commit Message

Ilya Maximets July 26, 2017, 3:21 p.m. UTC
This allows to collect packets from more than one RX burst
and send them together with a configurable maximum latency.

'other_config:output-max-latency' can be used to configure
time that a packet can wait in output batch for sending.

Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---

millisecon granularity is used for now. Can be easily switched to use
microseconds instead.

 lib/dpif-netdev.c    | 97 +++++++++++++++++++++++++++++++++++++++++++---------
 vswitchd/vswitch.xml | 15 ++++++++
 2 files changed, 95 insertions(+), 17 deletions(-)

Comments

Darrell Ball July 28, 2017, 7:20 a.m. UTC | #1
I have not tested yet

However, I would have expected something max latency config. to be specific to netdev-dpdk port types

This type of code also seems to intersect with present and future QoS considerations in netdev-dpdk



-----Original Message-----
From: Ilya Maximets <i.maximets@samsung.com>
Date: Wednesday, July 26, 2017 at 8:21 AM
To: "ovs-dev@openvswitch.org" <ovs-dev@openvswitch.org>, Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
Cc: Heetae Ahn <heetae82.ahn@samsung.com>, Ben Pfaff <blp@ovn.org>, Antonio Fischetti <antonio.fischetti@intel.com>, Eelco Chaudron <echaudro@redhat.com>, Ciara Loftus <ciara.loftus@intel.com>, Kevin Traynor <ktraynor@redhat.com>, Darrell Ball <dball@vmware.com>, Ilya Maximets <i.maximets@samsung.com>
Subject: [PATCH RFC v2 4/4] dpif-netdev: Time based output batching.

    This allows to collect packets from more than one RX burst
    and send them together with a configurable maximum latency.
    
    'other_config:output-max-latency' can be used to configure
    time that a packet can wait in output batch for sending.
    
    Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
    ---
    
    millisecon granularity is used for now. Can be easily switched to use
    microseconds instead.
    
     lib/dpif-netdev.c    | 97 +++++++++++++++++++++++++++++++++++++++++++---------
     vswitchd/vswitch.xml | 15 ++++++++
     2 files changed, 95 insertions(+), 17 deletions(-)
    
    diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
    index 07c7dad..e5f8a3d 100644
    --- a/lib/dpif-netdev.c
    +++ b/lib/dpif-netdev.c
    @@ -84,6 +84,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
     #define MAX_RECIRC_DEPTH 5
     DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
     
    +/* Use instant packet send by default. */
    +#define DEFAULT_OUTPUT_MAX_LATENCY 0
    +
     /* Configuration parameters. */
     enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
     enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
    @@ -261,6 +264,9 @@ struct dp_netdev {
         struct hmap ports;
         struct seq *port_seq;       /* Incremented whenever a port changes. */
     
    +    /* The time that a packet can wait in output batch for sending. */
    +    atomic_uint32_t output_max_latency;
    +
         /* Meters. */
         struct ovs_mutex meter_locks[N_METER_LOCKS];
         struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
    @@ -498,6 +504,7 @@ struct tx_port {
         int qid;
         long long last_used;
         struct hmap_node node;
    +    long long output_time;
         struct dp_packet_batch output_pkts;
     };
     
    @@ -570,6 +577,9 @@ struct dp_netdev_pmd_thread {
          * than 'cmap_count(dp->poll_threads)'. */
         const int static_tx_qid;
     
    +    /* Number of filled output batches. */
    +    int n_output_batches;
    +
         struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
         /* List of rx queues to poll. */
         struct hmap poll_list OVS_GUARDED;
    @@ -663,9 +673,9 @@ static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
     static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
                                            struct rxq_poll *poll)
         OVS_REQUIRES(pmd->port_mutex);
    -static void
    +static int
     dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
    -                                   long long now);
    +                                   long long now, bool force);
     static void reconfigure_datapath(struct dp_netdev *dp)
         OVS_REQUIRES(dp->port_mutex);
     static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
    @@ -1188,6 +1198,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
         conntrack_init(&dp->conntrack);
     
         atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
    +    atomic_init(&dp->output_max_latency, DEFAULT_OUTPUT_MAX_LATENCY);
     
         cmap_init(&dp->poll_threads);
         ovs_mutex_init_recursive(&dp->non_pmd_mutex);
    @@ -2843,7 +2854,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
         dp_packet_batch_init_packet(&pp, execute->packet);
         dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
                                   execute->actions, execute->actions_len, now);
    -    dp_netdev_pmd_flush_output_packets(pmd, now);
    +    dp_netdev_pmd_flush_output_packets(pmd, now, true);
     
         if (pmd->core_id == NON_PMD_CORE_ID) {
             ovs_mutex_unlock(&dp->non_pmd_mutex);
    @@ -2892,6 +2903,16 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
             smap_get_ullong(other_config, "emc-insert-inv-prob",
                             DEFAULT_EM_FLOW_INSERT_INV_PROB);
         uint32_t insert_min, cur_min;
    +    uint32_t output_max_latency, cur_max_latency;
    +
    +    output_max_latency = smap_get_int(other_config, "output-max-latency",
    +                                      DEFAULT_OUTPUT_MAX_LATENCY);
    +    atomic_read_relaxed(&dp->output_max_latency, &cur_max_latency);
    +    if (output_max_latency != cur_max_latency) {
    +        atomic_store_relaxed(&dp->output_max_latency, output_max_latency);
    +        VLOG_INFO("Output maximum latency set to %"PRIu32" ms",
    +                  output_max_latency);
    +    }
     
         if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
             free(dp->pmd_cmask);
    @@ -3092,11 +3113,12 @@ cycles_count_intermediate(struct dp_netdev_pmd_thread *pmd,
         non_atomic_ullong_add(&pmd->cycles.n[type], interval);
     }
     
    -static void
    +static int
     dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
                                        struct tx_port *p, long long now)
     {
         int tx_qid;
    +    int output_cnt;
         bool dynamic_txqs;
     
         dynamic_txqs = p->port->dynamic_txqs;
    @@ -3106,21 +3128,39 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
             tx_qid = pmd->static_tx_qid;
         }
     
    +    output_cnt = dp_packet_batch_size(&p->output_pkts);
         netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
         dp_packet_batch_init(&p->output_pkts);
    +
    +    if (output_cnt) {
    +        ovs_assert(pmd->n_output_batches > 0);
    +        pmd->n_output_batches--;
    +    }
    +    return output_cnt;
     }
     
    -static void
    +static int
     dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
    -                                   long long now)
    +                                   long long now, bool force)
     {
         struct tx_port *p;
    +    int output_cnt = 0;
    +
    +    if (!pmd->n_output_batches) {
    +        return 0;
    +    }
    +
    +    if (!now) {
    +        now = time_msec();
    +    }
     
         HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
    -        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
    -            dp_netdev_pmd_flush_output_on_port(pmd, p, now);
    +        if (!dp_packet_batch_is_empty(&p->output_pkts)
    +            && (force || p->output_time <= now)) {
    +            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p, now);
             }
         }
    +    return output_cnt;
     }
     
     static int
    @@ -3130,7 +3170,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
     {
         struct dp_packet_batch batch;
         int error;
    -    int batch_cnt = 0;
    +    int batch_cnt = 0, output_cnt = 0;
     
         dp_packet_batch_init(&batch);
         error = netdev_rxq_recv(rx, &batch);
    @@ -3141,7 +3181,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
     
             batch_cnt = batch.count;
             dp_netdev_input(pmd, &batch, port_no, now);
    -        dp_netdev_pmd_flush_output_packets(pmd, now);
    +        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, now, false);
         } else if (error != EAGAIN && error != EOPNOTSUPP) {
             static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
     
    @@ -3149,7 +3189,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                         netdev_rxq_get_name(rx), ovs_strerror(error));
         }
     
    -    return batch_cnt;
    +    return batch_cnt + output_cnt;
     }
     
     static struct tx_port *
    @@ -3685,6 +3725,8 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
     {
         struct tx_port *tx_port_cached;
     
    +    /* Flush all the queued packets. */
    +    dp_netdev_pmd_flush_output_packets(pmd, 0, true);
         /* Free all used tx queue ids. */
         dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
     
    @@ -3759,7 +3801,6 @@ pmd_thread_main(void *f_)
         bool exiting;
         int poll_cnt;
         int i;
    -    int process_packets = 0;
     
         poll_list = NULL;
     
    @@ -3788,8 +3829,10 @@ reload:
     
         cycles_count_start(pmd);
         for (;;) {
    +        int process_packets = 0;
    +
             for (i = 0; i < poll_cnt; i++) {
    -            process_packets =
    +            process_packets +=
                     dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
                                                poll_list[i].port_no);
                 cycles_count_intermediate(pmd,
    @@ -3797,6 +3840,16 @@ reload:
                                                           : PMD_CYCLES_IDLE);
             }
     
    +        if (!process_packets) {
    +            /* We didn't receive anything in the process loop.
    +             * Check if we need to send something. */
    +            process_packets = dp_netdev_pmd_flush_output_packets(pmd,
    +                                                                 0, false);
    +            cycles_count_intermediate(pmd,
    +                                      process_packets ? PMD_CYCLES_PROCESSING
    +                                                      : PMD_CYCLES_IDLE);
    +        }
    +
             if (lc++ > 1024) {
                 bool reload;
     
    @@ -4234,6 +4287,8 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
         pmd->numa_id = numa_id;
         pmd->need_reload = false;
     
    +    pmd->n_output_batches = 0;
    +
         *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp->poll_threads);
     
         ovs_refcount_init(&pmd->ref_cnt);
    @@ -4418,6 +4473,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
     
         tx->port = port;
         tx->qid = -1;
    +    tx->output_time = 0LL;
         dp_packet_batch_init(&tx->output_pkts);
     
         hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
    @@ -5094,11 +5150,18 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                     dp_netdev_pmd_flush_output_on_port(pmd, p, now);
                 }
     #endif
    +            if (dp_packet_batch_is_empty(&p->output_pkts)) {
    +                uint32_t cur_max_latency;
    +
    +                atomic_read_relaxed(&dp->output_max_latency, &cur_max_latency);
    +                p->output_time = now + cur_max_latency;
     
    -            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
    -                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
    -                /* Some packets was generated while input batch processing.
    -                 * Flush here to avoid overflow. */
    +                if (dp_packet_batch_size(packets_)) {
    +                    pmd->n_output_batches++;
    +                }
    +            } else if (dp_packet_batch_size(&p->output_pkts)
    +                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
    +                /* Flush here to avoid overflow. */
                     dp_netdev_pmd_flush_output_on_port(pmd, p, now);
                 }
     
    diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
    index 074535b..23930f0 100644
    --- a/vswitchd/vswitch.xml
    +++ b/vswitchd/vswitch.xml
    @@ -344,6 +344,21 @@
             </p>
           </column>
     
    +      <column name="other_config" key="output-max-latency"
    +              type='{"type": "integer", "minInteger": 0, "maxInteger": 1000}'>
    +        <p>
    +          Specifies the time in milliseconds that a packet can wait in output
    +          batch for sending i.e. amount of time that packet can spend in an
    +          intermediate output queue before sending to netdev.
    +          This option can be used to configure balance between throughput
    +          and latency. Lower values decreases latency while higher values
    +          may be useful to achieve higher performance.
    +        </p>
    +        <p>
    +          Defaults to 0 i.e. instant packet sending (latency optimized).
    +        </p>
    +      </column>
    +
           <column name="other_config" key="n-handler-threads"
                   type='{"type": "integer", "minInteger": 1}'>
             <p>
    -- 
    2.7.4
Ilya Maximets July 31, 2017, 3:37 p.m. UTC | #2
On 28.07.2017 10:20, Darrell Ball wrote:
> I have not tested yet
> 
> However, I would have expected something max latency config. to be specific to netdev-dpdk port types

IMHO, if we can make it generic, we must make it generic. Making of this
functionality netdev-dpdk specific will brake ability to test it using
unit tests. As the change is complex and has a lot of pitfalls like
possible packet stucks and possible latency issues, this code should be
covered by unit tests to simplify the support and modifications.
(And it's already partly covered because it is generic. And I fixed many
minor issues while developing through unit test failures.)

In the future this can be used also to improve performance of netdev-linux
by replacing sendmsg() with batched sendmmsg(). This should significantly
increase performance of flood actions while MACs are not learned yet in
action NORMAL.

> This type of code also seems to intersect with present and future QoS considerations in netdev-dpdk

Maybe, but there are also some related features in mail-list like rx queue
prioritization which are implemented in generic way on dpif-netdev layer.

> 
> -----Original Message-----
> From: Ilya Maximets <i.maximets@samsung.com>
> Date: Wednesday, July 26, 2017 at 8:21 AM
> To: "ovs-dev@openvswitch.org" <ovs-dev@openvswitch.org>, Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
> Cc: Heetae Ahn <heetae82.ahn@samsung.com>, Ben Pfaff <blp@ovn.org>, Antonio Fischetti <antonio.fischetti@intel.com>, Eelco Chaudron <echaudro@redhat.com>, Ciara Loftus <ciara.loftus@intel.com>, Kevin Traynor <ktraynor@redhat.com>, Darrell Ball <dball@vmware.com>, Ilya Maximets <i.maximets@samsung.com>
> Subject: [PATCH RFC v2 4/4] dpif-netdev: Time based output batching.
> 
>     This allows to collect packets from more than one RX burst
>     and send them together with a configurable maximum latency.
>     
>     'other_config:output-max-latency' can be used to configure
>     time that a packet can wait in output batch for sending.
>     
>     Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
>     ---
>     
>     millisecon granularity is used for now. Can be easily switched to use
>     microseconds instead.
>     
>      lib/dpif-netdev.c    | 97 +++++++++++++++++++++++++++++++++++++++++++---------
>      vswitchd/vswitch.xml | 15 ++++++++
>      2 files changed, 95 insertions(+), 17 deletions(-)
>     
>     diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>     index 07c7dad..e5f8a3d 100644
>     --- a/lib/dpif-netdev.c
>     +++ b/lib/dpif-netdev.c
>     @@ -84,6 +84,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
>      #define MAX_RECIRC_DEPTH 5
>      DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
>      
>     +/* Use instant packet send by default. */
>     +#define DEFAULT_OUTPUT_MAX_LATENCY 0
>     +
>      /* Configuration parameters. */
>      enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
>      enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
>     @@ -261,6 +264,9 @@ struct dp_netdev {
>          struct hmap ports;
>          struct seq *port_seq;       /* Incremented whenever a port changes. */
>      
>     +    /* The time that a packet can wait in output batch for sending. */
>     +    atomic_uint32_t output_max_latency;
>     +
>          /* Meters. */
>          struct ovs_mutex meter_locks[N_METER_LOCKS];
>          struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
>     @@ -498,6 +504,7 @@ struct tx_port {
>          int qid;
>          long long last_used;
>          struct hmap_node node;
>     +    long long output_time;
>          struct dp_packet_batch output_pkts;
>      };
>      
>     @@ -570,6 +577,9 @@ struct dp_netdev_pmd_thread {
>           * than 'cmap_count(dp->poll_threads)'. */
>          const int static_tx_qid;
>      
>     +    /* Number of filled output batches. */
>     +    int n_output_batches;
>     +
>          struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
>          /* List of rx queues to poll. */
>          struct hmap poll_list OVS_GUARDED;
>     @@ -663,9 +673,9 @@ static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>      static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
>                                             struct rxq_poll *poll)
>          OVS_REQUIRES(pmd->port_mutex);
>     -static void
>     +static int
>      dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
>     -                                   long long now);
>     +                                   long long now, bool force);
>      static void reconfigure_datapath(struct dp_netdev *dp)
>          OVS_REQUIRES(dp->port_mutex);
>      static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
>     @@ -1188,6 +1198,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
>          conntrack_init(&dp->conntrack);
>      
>          atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
>     +    atomic_init(&dp->output_max_latency, DEFAULT_OUTPUT_MAX_LATENCY);
>      
>          cmap_init(&dp->poll_threads);
>          ovs_mutex_init_recursive(&dp->non_pmd_mutex);
>     @@ -2843,7 +2854,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
>          dp_packet_batch_init_packet(&pp, execute->packet);
>          dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
>                                    execute->actions, execute->actions_len, now);
>     -    dp_netdev_pmd_flush_output_packets(pmd, now);
>     +    dp_netdev_pmd_flush_output_packets(pmd, now, true);
>      
>          if (pmd->core_id == NON_PMD_CORE_ID) {
>              ovs_mutex_unlock(&dp->non_pmd_mutex);
>     @@ -2892,6 +2903,16 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
>              smap_get_ullong(other_config, "emc-insert-inv-prob",
>                              DEFAULT_EM_FLOW_INSERT_INV_PROB);
>          uint32_t insert_min, cur_min;
>     +    uint32_t output_max_latency, cur_max_latency;
>     +
>     +    output_max_latency = smap_get_int(other_config, "output-max-latency",
>     +                                      DEFAULT_OUTPUT_MAX_LATENCY);
>     +    atomic_read_relaxed(&dp->output_max_latency, &cur_max_latency);
>     +    if (output_max_latency != cur_max_latency) {
>     +        atomic_store_relaxed(&dp->output_max_latency, output_max_latency);
>     +        VLOG_INFO("Output maximum latency set to %"PRIu32" ms",
>     +                  output_max_latency);
>     +    }
>      
>          if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
>              free(dp->pmd_cmask);
>     @@ -3092,11 +3113,12 @@ cycles_count_intermediate(struct dp_netdev_pmd_thread *pmd,
>          non_atomic_ullong_add(&pmd->cycles.n[type], interval);
>      }
>      
>     -static void
>     +static int
>      dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
>                                         struct tx_port *p, long long now)
>      {
>          int tx_qid;
>     +    int output_cnt;
>          bool dynamic_txqs;
>      
>          dynamic_txqs = p->port->dynamic_txqs;
>     @@ -3106,21 +3128,39 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
>              tx_qid = pmd->static_tx_qid;
>          }
>      
>     +    output_cnt = dp_packet_batch_size(&p->output_pkts);
>          netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
>          dp_packet_batch_init(&p->output_pkts);
>     +
>     +    if (output_cnt) {
>     +        ovs_assert(pmd->n_output_batches > 0);
>     +        pmd->n_output_batches--;
>     +    }
>     +    return output_cnt;
>      }
>      
>     -static void
>     +static int
>      dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
>     -                                   long long now)
>     +                                   long long now, bool force)
>      {
>          struct tx_port *p;
>     +    int output_cnt = 0;
>     +
>     +    if (!pmd->n_output_batches) {
>     +        return 0;
>     +    }
>     +
>     +    if (!now) {
>     +        now = time_msec();
>     +    }
>      
>          HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
>     -        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
>     -            dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>     +        if (!dp_packet_batch_is_empty(&p->output_pkts)
>     +            && (force || p->output_time <= now)) {
>     +            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>              }
>          }
>     +    return output_cnt;
>      }
>      
>      static int
>     @@ -3130,7 +3170,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>      {
>          struct dp_packet_batch batch;
>          int error;
>     -    int batch_cnt = 0;
>     +    int batch_cnt = 0, output_cnt = 0;
>      
>          dp_packet_batch_init(&batch);
>          error = netdev_rxq_recv(rx, &batch);
>     @@ -3141,7 +3181,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>      
>              batch_cnt = batch.count;
>              dp_netdev_input(pmd, &batch, port_no, now);
>     -        dp_netdev_pmd_flush_output_packets(pmd, now);
>     +        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, now, false);
>          } else if (error != EAGAIN && error != EOPNOTSUPP) {
>              static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>      
>     @@ -3149,7 +3189,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>                          netdev_rxq_get_name(rx), ovs_strerror(error));
>          }
>      
>     -    return batch_cnt;
>     +    return batch_cnt + output_cnt;
>      }
>      
>      static struct tx_port *
>     @@ -3685,6 +3725,8 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
>      {
>          struct tx_port *tx_port_cached;
>      
>     +    /* Flush all the queued packets. */
>     +    dp_netdev_pmd_flush_output_packets(pmd, 0, true);
>          /* Free all used tx queue ids. */
>          dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
>      
>     @@ -3759,7 +3801,6 @@ pmd_thread_main(void *f_)
>          bool exiting;
>          int poll_cnt;
>          int i;
>     -    int process_packets = 0;
>      
>          poll_list = NULL;
>      
>     @@ -3788,8 +3829,10 @@ reload:
>      
>          cycles_count_start(pmd);
>          for (;;) {
>     +        int process_packets = 0;
>     +
>              for (i = 0; i < poll_cnt; i++) {
>     -            process_packets =
>     +            process_packets +=
>                      dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
>                                                 poll_list[i].port_no);
>                  cycles_count_intermediate(pmd,
>     @@ -3797,6 +3840,16 @@ reload:
>                                                            : PMD_CYCLES_IDLE);
>              }
>      
>     +        if (!process_packets) {
>     +            /* We didn't receive anything in the process loop.
>     +             * Check if we need to send something. */
>     +            process_packets = dp_netdev_pmd_flush_output_packets(pmd,
>     +                                                                 0, false);
>     +            cycles_count_intermediate(pmd,
>     +                                      process_packets ? PMD_CYCLES_PROCESSING
>     +                                                      : PMD_CYCLES_IDLE);
>     +        }
>     +
>              if (lc++ > 1024) {
>                  bool reload;
>      
>     @@ -4234,6 +4287,8 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>          pmd->numa_id = numa_id;
>          pmd->need_reload = false;
>      
>     +    pmd->n_output_batches = 0;
>     +
>          *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp->poll_threads);
>      
>          ovs_refcount_init(&pmd->ref_cnt);
>     @@ -4418,6 +4473,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
>      
>          tx->port = port;
>          tx->qid = -1;
>     +    tx->output_time = 0LL;
>          dp_packet_batch_init(&tx->output_pkts);
>      
>          hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
>     @@ -5094,11 +5150,18 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
>                      dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>                  }
>      #endif
>     +            if (dp_packet_batch_is_empty(&p->output_pkts)) {
>     +                uint32_t cur_max_latency;
>     +
>     +                atomic_read_relaxed(&dp->output_max_latency, &cur_max_latency);
>     +                p->output_time = now + cur_max_latency;
>      
>     -            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
>     -                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
>     -                /* Some packets was generated while input batch processing.
>     -                 * Flush here to avoid overflow. */
>     +                if (dp_packet_batch_size(packets_)) {
>     +                    pmd->n_output_batches++;
>     +                }
>     +            } else if (dp_packet_batch_size(&p->output_pkts)
>     +                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
>     +                /* Flush here to avoid overflow. */
>                      dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>                  }
>      
>     diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
>     index 074535b..23930f0 100644
>     --- a/vswitchd/vswitch.xml
>     +++ b/vswitchd/vswitch.xml
>     @@ -344,6 +344,21 @@
>              </p>
>            </column>
>      
>     +      <column name="other_config" key="output-max-latency"
>     +              type='{"type": "integer", "minInteger": 0, "maxInteger": 1000}'>
>     +        <p>
>     +          Specifies the time in milliseconds that a packet can wait in output
>     +          batch for sending i.e. amount of time that packet can spend in an
>     +          intermediate output queue before sending to netdev.
>     +          This option can be used to configure balance between throughput
>     +          and latency. Lower values decreases latency while higher values
>     +          may be useful to achieve higher performance.
>     +        </p>
>     +        <p>
>     +          Defaults to 0 i.e. instant packet sending (latency optimized).
>     +        </p>
>     +      </column>
>     +
>            <column name="other_config" key="n-handler-threads"
>                    type='{"type": "integer", "minInteger": 1}'>
>              <p>
>     -- 
>     2.7.4
>     
>     
>
Darrell Ball July 31, 2017, 5:53 p.m. UTC | #3
-----Original Message-----
From: Ilya Maximets <i.maximets@samsung.com>
Date: Monday, July 31, 2017 at 8:37 AM
To: Darrell Ball <dball@vmware.com>, "ovs-dev@openvswitch.org" <ovs-dev@openvswitch.org>, Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
Cc: Heetae Ahn <heetae82.ahn@samsung.com>, Ben Pfaff <blp@ovn.org>, Antonio Fischetti <antonio.fischetti@intel.com>, Eelco Chaudron <echaudro@redhat.com>, Ciara Loftus <ciara.loftus@intel.com>, Kevin Traynor <ktraynor@redhat.com>
Subject: Re: [PATCH RFC v2 4/4] dpif-netdev: Time based output batching.

    On 28.07.2017 10:20, Darrell Ball wrote:
    > I have not tested yet
    > 
    > However, I would have expected something max latency config. to be specific to netdev-dpdk port types
    
    IMHO, if we can make it generic, we must make it generic.

[Darrell]
The first question I ask myself is -  is this functionality intrinsically generic or
is it not ?
It is clearly not and trying to make it artificially so would do the following:

1) We end up designing something the wrong way where it partially works.
2) Breaks other features present and future that really do intersect.
 

 Making of this
    functionality netdev-dpdk specific will brake ability to test it using
    unit tests. As the change is complex and has a lot of pitfalls like
    possible packet stucks and possible latency issues, this code should be
    covered by unit tests to simplify the support and modifications.
    (And it's already partly covered because it is generic. And I fixed many
    minor issues while developing through unit test failures.)

[Darrell]
Most of dpdk is not tested by our unit tests because it cannot be simulated well at the
moment. This is orthogonal to the basic question however.

    
    In the future this can be used also to improve performance of netdev-linux
    by replacing sendmsg() with batched sendmmsg(). This should significantly
    increase performance of flood actions while MACs are not learned yet in
    action NORMAL.
    
    > This type of code also seems to intersect with present and future QoS considerations in netdev-dpdk
    
    Maybe, but there are also some related features in mail-list like rx queue
    prioritization which are implemented in generic way on dpif-netdev layer.
    
    > 
    > -----Original Message-----
    > From: Ilya Maximets <i.maximets@samsung.com>
    > Date: Wednesday, July 26, 2017 at 8:21 AM
    > To: "ovs-dev@openvswitch.org" <ovs-dev@openvswitch.org>, Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
    > Cc: Heetae Ahn <heetae82.ahn@samsung.com>, Ben Pfaff <blp@ovn.org>, Antonio Fischetti <antonio.fischetti@intel.com>, Eelco Chaudron <echaudro@redhat.com>, Ciara Loftus <ciara.loftus@intel.com>, Kevin Traynor <ktraynor@redhat.com>, Darrell Ball <dball@vmware.com>, Ilya Maximets <i.maximets@samsung.com>
    > Subject: [PATCH RFC v2 4/4] dpif-netdev: Time based output batching.
    > 
    >     This allows to collect packets from more than one RX burst
    >     and send them together with a configurable maximum latency.
    >     
    >     'other_config:output-max-latency' can be used to configure
    >     time that a packet can wait in output batch for sending.
    >     
    >     Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
    >     ---
    >     
    >     millisecon granularity is used for now. Can be easily switched to use
    >     microseconds instead.
    >     
    >      lib/dpif-netdev.c    | 97 +++++++++++++++++++++++++++++++++++++++++++---------
    >      vswitchd/vswitch.xml | 15 ++++++++
    >      2 files changed, 95 insertions(+), 17 deletions(-)
    >     
    >     diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
    >     index 07c7dad..e5f8a3d 100644
    >     --- a/lib/dpif-netdev.c
    >     +++ b/lib/dpif-netdev.c
    >     @@ -84,6 +84,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
    >      #define MAX_RECIRC_DEPTH 5
    >      DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
    >      
    >     +/* Use instant packet send by default. */
    >     +#define DEFAULT_OUTPUT_MAX_LATENCY 0
    >     +
    >      /* Configuration parameters. */
    >      enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
    >      enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
    >     @@ -261,6 +264,9 @@ struct dp_netdev {
    >          struct hmap ports;
    >          struct seq *port_seq;       /* Incremented whenever a port changes. */
    >      
    >     +    /* The time that a packet can wait in output batch for sending. */
    >     +    atomic_uint32_t output_max_latency;
    >     +
    >          /* Meters. */
    >          struct ovs_mutex meter_locks[N_METER_LOCKS];
    >          struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
    >     @@ -498,6 +504,7 @@ struct tx_port {
    >          int qid;
    >          long long last_used;
    >          struct hmap_node node;
    >     +    long long output_time;
    >          struct dp_packet_batch output_pkts;
    >      };
    >      
    >     @@ -570,6 +577,9 @@ struct dp_netdev_pmd_thread {
    >           * than 'cmap_count(dp->poll_threads)'. */
    >          const int static_tx_qid;
    >      
    >     +    /* Number of filled output batches. */
    >     +    int n_output_batches;
    >     +
    >          struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
    >          /* List of rx queues to poll. */
    >          struct hmap poll_list OVS_GUARDED;
    >     @@ -663,9 +673,9 @@ static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
    >      static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
    >                                             struct rxq_poll *poll)
    >          OVS_REQUIRES(pmd->port_mutex);
    >     -static void
    >     +static int
    >      dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
    >     -                                   long long now);
    >     +                                   long long now, bool force);
    >      static void reconfigure_datapath(struct dp_netdev *dp)
    >          OVS_REQUIRES(dp->port_mutex);
    >      static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
    >     @@ -1188,6 +1198,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
    >          conntrack_init(&dp->conntrack);
    >      
    >          atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
    >     +    atomic_init(&dp->output_max_latency, DEFAULT_OUTPUT_MAX_LATENCY);
    >      
    >          cmap_init(&dp->poll_threads);
    >          ovs_mutex_init_recursive(&dp->non_pmd_mutex);
    >     @@ -2843,7 +2854,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
    >          dp_packet_batch_init_packet(&pp, execute->packet);
    >          dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
    >                                    execute->actions, execute->actions_len, now);
    >     -    dp_netdev_pmd_flush_output_packets(pmd, now);
    >     +    dp_netdev_pmd_flush_output_packets(pmd, now, true);
    >      
    >          if (pmd->core_id == NON_PMD_CORE_ID) {
    >              ovs_mutex_unlock(&dp->non_pmd_mutex);
    >     @@ -2892,6 +2903,16 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
    >              smap_get_ullong(other_config, "emc-insert-inv-prob",
    >                              DEFAULT_EM_FLOW_INSERT_INV_PROB);
    >          uint32_t insert_min, cur_min;
    >     +    uint32_t output_max_latency, cur_max_latency;
    >     +
    >     +    output_max_latency = smap_get_int(other_config, "output-max-latency",
    >     +                                      DEFAULT_OUTPUT_MAX_LATENCY);
    >     +    atomic_read_relaxed(&dp->output_max_latency, &cur_max_latency);
    >     +    if (output_max_latency != cur_max_latency) {
    >     +        atomic_store_relaxed(&dp->output_max_latency, output_max_latency);
    >     +        VLOG_INFO("Output maximum latency set to %"PRIu32" ms",
    >     +                  output_max_latency);
    >     +    }
    >      
    >          if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
    >              free(dp->pmd_cmask);
    >     @@ -3092,11 +3113,12 @@ cycles_count_intermediate(struct dp_netdev_pmd_thread *pmd,
    >          non_atomic_ullong_add(&pmd->cycles.n[type], interval);
    >      }
    >      
    >     -static void
    >     +static int
    >      dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
    >                                         struct tx_port *p, long long now)
    >      {
    >          int tx_qid;
    >     +    int output_cnt;
    >          bool dynamic_txqs;
    >      
    >          dynamic_txqs = p->port->dynamic_txqs;
    >     @@ -3106,21 +3128,39 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
    >              tx_qid = pmd->static_tx_qid;
    >          }
    >      
    >     +    output_cnt = dp_packet_batch_size(&p->output_pkts);
    >          netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
    >          dp_packet_batch_init(&p->output_pkts);
    >     +
    >     +    if (output_cnt) {
    >     +        ovs_assert(pmd->n_output_batches > 0);
    >     +        pmd->n_output_batches--;
    >     +    }
    >     +    return output_cnt;
    >      }
    >      
    >     -static void
    >     +static int
    >      dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
    >     -                                   long long now)
    >     +                                   long long now, bool force)
    >      {
    >          struct tx_port *p;
    >     +    int output_cnt = 0;
    >     +
    >     +    if (!pmd->n_output_batches) {
    >     +        return 0;
    >     +    }
    >     +
    >     +    if (!now) {
    >     +        now = time_msec();
    >     +    }
    >      
    >          HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
    >     -        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
    >     -            dp_netdev_pmd_flush_output_on_port(pmd, p, now);
    >     +        if (!dp_packet_batch_is_empty(&p->output_pkts)
    >     +            && (force || p->output_time <= now)) {
    >     +            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p, now);
    >              }
    >          }
    >     +    return output_cnt;
    >      }
    >      
    >      static int
    >     @@ -3130,7 +3170,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
    >      {
    >          struct dp_packet_batch batch;
    >          int error;
    >     -    int batch_cnt = 0;
    >     +    int batch_cnt = 0, output_cnt = 0;
    >      
    >          dp_packet_batch_init(&batch);
    >          error = netdev_rxq_recv(rx, &batch);
    >     @@ -3141,7 +3181,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
    >      
    >              batch_cnt = batch.count;
    >              dp_netdev_input(pmd, &batch, port_no, now);
    >     -        dp_netdev_pmd_flush_output_packets(pmd, now);
    >     +        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, now, false);
    >          } else if (error != EAGAIN && error != EOPNOTSUPP) {
    >              static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
    >      
    >     @@ -3149,7 +3189,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
    >                          netdev_rxq_get_name(rx), ovs_strerror(error));
    >          }
    >      
    >     -    return batch_cnt;
    >     +    return batch_cnt + output_cnt;
    >      }
    >      
    >      static struct tx_port *
    >     @@ -3685,6 +3725,8 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
    >      {
    >          struct tx_port *tx_port_cached;
    >      
    >     +    /* Flush all the queued packets. */
    >     +    dp_netdev_pmd_flush_output_packets(pmd, 0, true);
    >          /* Free all used tx queue ids. */
    >          dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
    >      
    >     @@ -3759,7 +3801,6 @@ pmd_thread_main(void *f_)
    >          bool exiting;
    >          int poll_cnt;
    >          int i;
    >     -    int process_packets = 0;
    >      
    >          poll_list = NULL;
    >      
    >     @@ -3788,8 +3829,10 @@ reload:
    >      
    >          cycles_count_start(pmd);
    >          for (;;) {
    >     +        int process_packets = 0;
    >     +
    >              for (i = 0; i < poll_cnt; i++) {
    >     -            process_packets =
    >     +            process_packets +=
    >                      dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
    >                                                 poll_list[i].port_no);
    >                  cycles_count_intermediate(pmd,
    >     @@ -3797,6 +3840,16 @@ reload:
    >                                                            : PMD_CYCLES_IDLE);
    >              }
    >      
    >     +        if (!process_packets) {
    >     +            /* We didn't receive anything in the process loop.
    >     +             * Check if we need to send something. */
    >     +            process_packets = dp_netdev_pmd_flush_output_packets(pmd,
    >     +                                                                 0, false);
    >     +            cycles_count_intermediate(pmd,
    >     +                                      process_packets ? PMD_CYCLES_PROCESSING
    >     +                                                      : PMD_CYCLES_IDLE);
    >     +        }
    >     +
    >              if (lc++ > 1024) {
    >                  bool reload;
    >      
    >     @@ -4234,6 +4287,8 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
    >          pmd->numa_id = numa_id;
    >          pmd->need_reload = false;
    >      
    >     +    pmd->n_output_batches = 0;
    >     +
    >          *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp->poll_threads);
    >      
    >          ovs_refcount_init(&pmd->ref_cnt);
    >     @@ -4418,6 +4473,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
    >      
    >          tx->port = port;
    >          tx->qid = -1;
    >     +    tx->output_time = 0LL;
    >          dp_packet_batch_init(&tx->output_pkts);
    >      
    >          hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
    >     @@ -5094,11 +5150,18 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
    >                      dp_netdev_pmd_flush_output_on_port(pmd, p, now);
    >                  }
    >      #endif
    >     +            if (dp_packet_batch_is_empty(&p->output_pkts)) {
    >     +                uint32_t cur_max_latency;
    >     +
    >     +                atomic_read_relaxed(&dp->output_max_latency, &cur_max_latency);
    >     +                p->output_time = now + cur_max_latency;
    >      
    >     -            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
    >     -                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
    >     -                /* Some packets was generated while input batch processing.
    >     -                 * Flush here to avoid overflow. */
    >     +                if (dp_packet_batch_size(packets_)) {
    >     +                    pmd->n_output_batches++;
    >     +                }
    >     +            } else if (dp_packet_batch_size(&p->output_pkts)
    >     +                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
    >     +                /* Flush here to avoid overflow. */
    >                      dp_netdev_pmd_flush_output_on_port(pmd, p, now);
    >                  }
    >      
    >     diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
    >     index 074535b..23930f0 100644
    >     --- a/vswitchd/vswitch.xml
    >     +++ b/vswitchd/vswitch.xml
    >     @@ -344,6 +344,21 @@
    >              </p>
    >            </column>
    >      
    >     +      <column name="other_config" key="output-max-latency"
    >     +              type='{"type": "integer", "minInteger": 0, "maxInteger": 1000}'>
    >     +        <p>
    >     +          Specifies the time in milliseconds that a packet can wait in output
    >     +          batch for sending i.e. amount of time that packet can spend in an
    >     +          intermediate output queue before sending to netdev.
    >     +          This option can be used to configure balance between throughput
    >     +          and latency. Lower values decreases latency while higher values
    >     +          may be useful to achieve higher performance.
    >     +        </p>
    >     +        <p>
    >     +          Defaults to 0 i.e. instant packet sending (latency optimized).
    >     +        </p>
    >     +      </column>
    >     +
    >            <column name="other_config" key="n-handler-threads"
    >                    type='{"type": "integer", "minInteger": 1}'>
    >              <p>
    >     -- 
    >     2.7.4
    >     
    >     
    >
Bodireddy, Bhanuprakash Aug. 1, 2017, 3:45 p.m. UTC | #4
>    On 28.07.2017 10:20, Darrell Ball wrote:
>    > I have not tested yet
>    >
>    > However, I would have expected something max latency config. to be
>specific to netdev-dpdk port types
>
>    IMHO, if we can make it generic, we must make it generic.
>
>[Darrell]
>The first question I ask myself is -  is this functionality intrinsically generic or is
>it not ?
>It is clearly not and trying to make it artificially so would do the following:
>
>1) We end up designing something the wrong way where it partially works.
>2) Breaks other features present and future that really do intersect.
>
>
> Making of this
>    functionality netdev-dpdk specific will brake ability to test it using
>    unit tests. As the change is complex and has a lot of pitfalls like
>    possible packet stucks and possible latency issues, this code should be
>    covered by unit tests to simplify the support and modifications.
>    (And it's already partly covered because it is generic. And I fixed many
>    minor issues while developing through unit test failures.)
>
>[Darrell]
>Most of dpdk is not tested by our unit tests because it cannot be simulated
>well at the moment. This is orthogonal to the basic question however.

Darrell is right and the unit tests we have currently don't test DPDK datapath well. 
So having this changes in netdev layer shouldn't  impact the unit tests much. 

While I share your other concern that changes in netdev layer will be little complex and slightly
painful for future code changes, this max latency config  introduced in dpif layer may not hold good to
different port types and users may potentially introduce conflicting changes in netdev layer in future to
suit their use cases.
 
>
>
>    In the future this can be used also to improve performance of netdev-linux
>    by replacing sendmsg() with batched sendmmsg(). This should significantly
>    increase performance of flood actions while MACs are not learned yet in
>    action NORMAL.
>
>    > This type of code also seems to intersect with present and future QoS
>considerations in netdev-dpdk

>
>    Maybe, but there are also some related features in mail-list like rx queue
>    prioritization which are implemented in generic way on dpif-netdev layer.

If you are referring to rxq prioritization work by Billy (https://mail.openvswitch.org/pipermail/ovs-dev/2017-July/336001.html),
this feature is more implemented in netdev layer with very minimal updates to dpif layer. 

BTW,  dp_execute_cb()  is getting cluttered with this patch. 

- Bhanuprakash.

>
>    >
>    > -----Original Message-----
>    > From: Ilya Maximets <i.maximets@samsung.com>
>    > Date: Wednesday, July 26, 2017 at 8:21 AM
>    > To: "ovs-dev@openvswitch.org" <ovs-dev@openvswitch.org>,
>Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
>    > Cc: Heetae Ahn <heetae82.ahn@samsung.com>, Ben Pfaff
><blp@ovn.org>, Antonio Fischetti <antonio.fischetti@intel.com>, Eelco
>Chaudron <echaudro@redhat.com>, Ciara Loftus <ciara.loftus@intel.com>,
>Kevin Traynor <ktraynor@redhat.com>, Darrell Ball <dball@vmware.com>,
>Ilya Maximets <i.maximets@samsung.com>
>    > Subject: [PATCH RFC v2 4/4] dpif-netdev: Time based output batching.
>    >
>    >     This allows to collect packets from more than one RX burst
>    >     and send them together with a configurable maximum latency.
>    >
>    >     'other_config:output-max-latency' can be used to configure
>    >     time that a packet can wait in output batch for sending.
>    >
>    >     Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
>    >     ---
>    >
>    >     millisecon granularity is used for now. Can be easily switched to use
>    >     microseconds instead.
>    >
>    >      lib/dpif-netdev.c    | 97
>+++++++++++++++++++++++++++++++++++++++++++---------
>    >      vswitchd/vswitch.xml | 15 ++++++++
>    >      2 files changed, 95 insertions(+), 17 deletions(-)
>    >
>    >     diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>    >     index 07c7dad..e5f8a3d 100644
>    >     --- a/lib/dpif-netdev.c
>    >     +++ b/lib/dpif-netdev.c
>    >     @@ -84,6 +84,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
>    >      #define MAX_RECIRC_DEPTH 5
>    >      DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
>    >
>    >     +/* Use instant packet send by default. */
>    >     +#define DEFAULT_OUTPUT_MAX_LATENCY 0
>    >     +
>    >      /* Configuration parameters. */
>    >      enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow
>table. */
>    >      enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
>    >     @@ -261,6 +264,9 @@ struct dp_netdev {
>    >          struct hmap ports;
>    >          struct seq *port_seq;       /* Incremented whenever a port changes.
>*/
>    >
>    >     +    /* The time that a packet can wait in output batch for sending. */
>    >     +    atomic_uint32_t output_max_latency;
>    >     +
>    >          /* Meters. */
>    >          struct ovs_mutex meter_locks[N_METER_LOCKS];
>    >          struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
>    >     @@ -498,6 +504,7 @@ struct tx_port {
>    >          int qid;
>    >          long long last_used;
>    >          struct hmap_node node;
>    >     +    long long output_time;
>    >          struct dp_packet_batch output_pkts;
>    >      };
>    >
>    >     @@ -570,6 +577,9 @@ struct dp_netdev_pmd_thread {
>    >           * than 'cmap_count(dp->poll_threads)'. */
>    >          const int static_tx_qid;
>    >
>    >     +    /* Number of filled output batches. */
>    >     +    int n_output_batches;
>    >     +
>    >          struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'.
>*/
>    >          /* List of rx queues to poll. */
>    >          struct hmap poll_list OVS_GUARDED;
>    >     @@ -663,9 +673,9 @@ static void dp_netdev_add_rxq_to_pmd(struct
>dp_netdev_pmd_thread *pmd,
>    >      static void dp_netdev_del_rxq_from_pmd(struct
>dp_netdev_pmd_thread *pmd,
>    >                                             struct rxq_poll *poll)
>    >          OVS_REQUIRES(pmd->port_mutex);
>    >     -static void
>    >     +static int
>    >      dp_netdev_pmd_flush_output_packets(struct
>dp_netdev_pmd_thread *pmd,
>    >     -                                   long long now);
>    >     +                                   long long now, bool force);
>    >      static void reconfigure_datapath(struct dp_netdev *dp)
>    >          OVS_REQUIRES(dp->port_mutex);
>    >      static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread
>*pmd);
>    >     @@ -1188,6 +1198,7 @@ create_dp_netdev(const char *name, const
>struct dpif_class *class,
>    >          conntrack_init(&dp->conntrack);
>    >
>    >          atomic_init(&dp->emc_insert_min,
>DEFAULT_EM_FLOW_INSERT_MIN);
>    >     +    atomic_init(&dp->output_max_latency,
>DEFAULT_OUTPUT_MAX_LATENCY);
>    >
>    >          cmap_init(&dp->poll_threads);
>    >          ovs_mutex_init_recursive(&dp->non_pmd_mutex);
>    >     @@ -2843,7 +2854,7 @@ dpif_netdev_execute(struct dpif *dpif, struct
>dpif_execute *execute)
>    >          dp_packet_batch_init_packet(&pp, execute->packet);
>    >          dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
>    >                                    execute->actions, execute->actions_len, now);
>    >     -    dp_netdev_pmd_flush_output_packets(pmd, now);
>    >     +    dp_netdev_pmd_flush_output_packets(pmd, now, true);
>    >
>    >          if (pmd->core_id == NON_PMD_CORE_ID) {
>    >              ovs_mutex_unlock(&dp->non_pmd_mutex);
>    >     @@ -2892,6 +2903,16 @@ dpif_netdev_set_config(struct dpif *dpif,
>const struct smap *other_config)
>    >              smap_get_ullong(other_config, "emc-insert-inv-prob",
>    >                              DEFAULT_EM_FLOW_INSERT_INV_PROB);
>    >          uint32_t insert_min, cur_min;
>    >     +    uint32_t output_max_latency, cur_max_latency;
>    >     +
>    >     +    output_max_latency = smap_get_int(other_config, "output-max-
>latency",
>    >     +                                      DEFAULT_OUTPUT_MAX_LATENCY);
>    >     +    atomic_read_relaxed(&dp->output_max_latency,
>&cur_max_latency);
>    >     +    if (output_max_latency != cur_max_latency) {
>    >     +        atomic_store_relaxed(&dp->output_max_latency,
>output_max_latency);
>    >     +        VLOG_INFO("Output maximum latency set to %"PRIu32" ms",
>    >     +                  output_max_latency);
>    >     +    }
>    >
>    >          if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
>    >              free(dp->pmd_cmask);
>    >     @@ -3092,11 +3113,12 @@ cycles_count_intermediate(struct
>dp_netdev_pmd_thread *pmd,
>    >          non_atomic_ullong_add(&pmd->cycles.n[type], interval);
>    >      }
>    >
>    >     -static void
>    >     +static int
>    >      dp_netdev_pmd_flush_output_on_port(struct
>dp_netdev_pmd_thread *pmd,
>    >                                         struct tx_port *p, long long now)
>    >      {
>    >          int tx_qid;
>    >     +    int output_cnt;
>    >          bool dynamic_txqs;
>    >
>    >          dynamic_txqs = p->port->dynamic_txqs;
>    >     @@ -3106,21 +3128,39 @@
>dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread
>*pmd,
>    >              tx_qid = pmd->static_tx_qid;
>    >          }
>    >
>    >     +    output_cnt = dp_packet_batch_size(&p->output_pkts);
>    >          netdev_send(p->port->netdev, tx_qid, &p->output_pkts,
>dynamic_txqs);
>    >          dp_packet_batch_init(&p->output_pkts);
>    >     +
>    >     +    if (output_cnt) {
>    >     +        ovs_assert(pmd->n_output_batches > 0);
>    >     +        pmd->n_output_batches--;
>    >     +    }
>    >     +    return output_cnt;
>    >      }
>    >
>    >     -static void
>    >     +static int
>    >      dp_netdev_pmd_flush_output_packets(struct
>dp_netdev_pmd_thread *pmd,
>    >     -                                   long long now)
>    >     +                                   long long now, bool force)
>    >      {
>    >          struct tx_port *p;
>    >     +    int output_cnt = 0;
>    >     +
>    >     +    if (!pmd->n_output_batches) {
>    >     +        return 0;
>    >     +    }
>    >     +
>    >     +    if (!now) {
>    >     +        now = time_msec();
>    >     +    }
>    >
>    >          HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
>    >     -        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
>    >     -            dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>    >     +        if (!dp_packet_batch_is_empty(&p->output_pkts)
>    >     +            && (force || p->output_time <= now)) {
>    >     +            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p,
>now);
>    >              }
>    >          }
>    >     +    return output_cnt;
>    >      }
>    >
>    >      static int
>    >     @@ -3130,7 +3170,7 @@ dp_netdev_process_rxq_port(struct
>dp_netdev_pmd_thread *pmd,
>    >      {
>    >          struct dp_packet_batch batch;
>    >          int error;
>    >     -    int batch_cnt = 0;
>    >     +    int batch_cnt = 0, output_cnt = 0;
>    >
>    >          dp_packet_batch_init(&batch);
>    >          error = netdev_rxq_recv(rx, &batch);
>    >     @@ -3141,7 +3181,7 @@ dp_netdev_process_rxq_port(struct
>dp_netdev_pmd_thread *pmd,
>    >
>    >              batch_cnt = batch.count;
>    >              dp_netdev_input(pmd, &batch, port_no, now);
>    >     -        dp_netdev_pmd_flush_output_packets(pmd, now);
>    >     +        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, now,
>false);
>    >          } else if (error != EAGAIN && error != EOPNOTSUPP) {
>    >              static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>    >
>    >     @@ -3149,7 +3189,7 @@ dp_netdev_process_rxq_port(struct
>dp_netdev_pmd_thread *pmd,
>    >                          netdev_rxq_get_name(rx), ovs_strerror(error));
>    >          }
>    >
>    >     -    return batch_cnt;
>    >     +    return batch_cnt + output_cnt;
>    >      }
>    >
>    >      static struct tx_port *
>    >     @@ -3685,6 +3725,8 @@ pmd_free_cached_ports(struct
>dp_netdev_pmd_thread *pmd)
>    >      {
>    >          struct tx_port *tx_port_cached;
>    >
>    >     +    /* Flush all the queued packets. */
>    >     +    dp_netdev_pmd_flush_output_packets(pmd, 0, true);
>    >          /* Free all used tx queue ids. */
>    >          dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
>    >
>    >     @@ -3759,7 +3801,6 @@ pmd_thread_main(void *f_)
>    >          bool exiting;
>    >          int poll_cnt;
>    >          int i;
>    >     -    int process_packets = 0;
>    >
>    >          poll_list = NULL;
>    >
>    >     @@ -3788,8 +3829,10 @@ reload:
>    >
>    >          cycles_count_start(pmd);
>    >          for (;;) {
>    >     +        int process_packets = 0;
>    >     +
>    >              for (i = 0; i < poll_cnt; i++) {
>    >     -            process_packets =
>    >     +            process_packets +=
>    >                      dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
>    >                                                 poll_list[i].port_no);
>    >                  cycles_count_intermediate(pmd,
>    >     @@ -3797,6 +3840,16 @@ reload:
>    >                                                            : PMD_CYCLES_IDLE);
>    >              }
>    >
>    >     +        if (!process_packets) {
>    >     +            /* We didn't receive anything in the process loop.
>    >     +             * Check if we need to send something. */
>    >     +            process_packets =
>dp_netdev_pmd_flush_output_packets(pmd,
>    >     +                                                                 0, false);
>    >     +            cycles_count_intermediate(pmd,
>    >     +                                      process_packets ? PMD_CYCLES_PROCESSING
>    >     +                                                      : PMD_CYCLES_IDLE);
>    >     +        }
>    >     +
>    >              if (lc++ > 1024) {
>    >                  bool reload;
>    >
>    >     @@ -4234,6 +4287,8 @@ dp_netdev_configure_pmd(struct
>dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>    >          pmd->numa_id = numa_id;
>    >          pmd->need_reload = false;
>    >
>    >     +    pmd->n_output_batches = 0;
>    >     +
>    >          *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp-
>>poll_threads);
>    >
>    >          ovs_refcount_init(&pmd->ref_cnt);
>    >     @@ -4418,6 +4473,7 @@ dp_netdev_add_port_tx_to_pmd(struct
>dp_netdev_pmd_thread *pmd,
>    >
>    >          tx->port = port;
>    >          tx->qid = -1;
>    >     +    tx->output_time = 0LL;
>    >          dp_packet_batch_init(&tx->output_pkts);
>    >
>    >          hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port-
>>port_no));
>    >     @@ -5094,11 +5150,18 @@ dp_execute_cb(void *aux_, struct
>dp_packet_batch *packets_,
>    >                      dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>    >                  }
>    >      #endif
>    >     +            if (dp_packet_batch_is_empty(&p->output_pkts)) {
>    >     +                uint32_t cur_max_latency;
>    >     +
>    >     +                atomic_read_relaxed(&dp->output_max_latency,
>&cur_max_latency);
>    >     +                p->output_time = now + cur_max_latency;
>    >
>    >     -            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
>    >     -                       + dp_packet_batch_size(packets_) >
>NETDEV_MAX_BURST)) {
>    >     -                /* Some packets was generated while input batch processing.
>    >     -                 * Flush here to avoid overflow. */
>    >     +                if (dp_packet_batch_size(packets_)) {
>    >     +                    pmd->n_output_batches++;
>    >     +                }
>    >     +            } else if (dp_packet_batch_size(&p->output_pkts)
>    >     +                       + dp_packet_batch_size(packets_) >
>NETDEV_MAX_BURST) {
>    >     +                /* Flush here to avoid overflow. */
>    >                      dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>    >                  }
>    >
>    >     diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
>    >     index 074535b..23930f0 100644
>    >     --- a/vswitchd/vswitch.xml
>    >     +++ b/vswitchd/vswitch.xml
>    >     @@ -344,6 +344,21 @@
>    >              </p>
>    >            </column>
>    >
>    >     +      <column name="other_config" key="output-max-latency"
>    >     +              type='{"type": "integer", "minInteger": 0, "maxInteger": 1000}'>
>    >     +        <p>
>    >     +          Specifies the time in milliseconds that a packet can wait in output
>    >     +          batch for sending i.e. amount of time that packet can spend in an
>    >     +          intermediate output queue before sending to netdev.
>    >     +          This option can be used to configure balance between throughput
>    >     +          and latency. Lower values decreases latency while higher values
>    >     +          may be useful to achieve higher performance.
>    >     +        </p>
>    >     +        <p>
>    >     +          Defaults to 0 i.e. instant packet sending (latency optimized).
>    >     +        </p>
>    >     +      </column>
>    >     +
>    >            <column name="other_config" key="n-handler-threads"
>    >                    type='{"type": "integer", "minInteger": 1}'>
>    >              <p>
>    >     --
>    >     2.7.4
>    >
>    >
>    >
>
Ilya Maximets Aug. 2, 2017, 9:45 a.m. UTC | #5
On 01.08.2017 18:45, Bodireddy, Bhanuprakash wrote:
>>    On 28.07.2017 10:20, Darrell Ball wrote:
>>    > I have not tested yet
>>    >
>>    > However, I would have expected something max latency config. to be
>> specific to netdev-dpdk port types
>>
>>    IMHO, if we can make it generic, we must make it generic.
>>
>> [Darrell]
>> The first question I ask myself is -  is this functionality intrinsically generic or is
>> it not ?
>> It is clearly not

It's yours opinion. Do not give it as a well-known axiom.

>> and trying to make it artificially so would do the following:
>>
>> 1) We end up designing something the wrong way where it partially works.
>> 2) Breaks other features present and future that really do intersect.

The same applicable to netdev restricted features. Additionally we will have
to implement such functionality for each netdev separately if needed and
feature generic features that will intersect will be blocked due to one
particular netdev implementation. It's not clear what is worse.

In addition,
Such latency configuration can't be implemented without support from dpif.
This leads to the question:
Do we need specific for only one netdev code in generic dpif? We already have
too much dpdk specific code there.
Following your logic there should be separate DPDK datapath in OvS as it was
in earlier implementations from Intel.

>>
>> Making of this
>>    functionality netdev-dpdk specific will brake ability to test it using
>>    unit tests. As the change is complex and has a lot of pitfalls like
>>    possible packet stucks and possible latency issues, this code should be
>>    covered by unit tests to simplify the support and modifications.
>>    (And it's already partly covered because it is generic. And I fixed many
>>    minor issues while developing through unit test failures.)
>>
>> [Darrell]
>> Most of dpdk is not tested by our unit tests because it cannot be simulated
>> well at the moment.

It's not the argument to not test new features.

> This is orthogonal to the basic question however.
> 
> Darrell is right and the unit tests we have currently don't test DPDK datapath well. 
> So having this changes in netdev layer shouldn't  impact the unit tests much.

Yes, and that is the issue. IMHO, if we can do it generic and test as all other
generic code, we should do it. There is no dependencies from dpdk library.
This actually means that functionality can be tested in current environment.
But making it netdev-specific will block this ability.
 
> While I share your other concern that changes in netdev layer will be little complex and slightly
> painful for future code changes, this max latency config  introduced in dpif layer may not hold good to
> different port types and users may potentially introduce conflicting changes in netdev layer in future to
> suit their use cases.
>  
>>
>>
>>    In the future this can be used also to improve performance of netdev-linux
>>    by replacing sendmsg() with batched sendmmsg(). This should significantly
>>    increase performance of flood actions while MACs are not learned yet in
>>    action NORMAL.
>>
>>    > This type of code also seems to intersect with present and future QoS
>> considerations in netdev-dpdk
> 
>>
>>    Maybe, but there are also some related features in mail-list like rx queue
>>    prioritization which are implemented in generic way on dpif-netdev layer.
> 
> If you are referring to rxq prioritization work by Billy (https://mail.openvswitch.org/pipermail/ovs-dev/2017-July/336001.html),
> this feature is more implemented in netdev layer with very minimal updates to dpif layer. 
> 
> BTW,  dp_execute_cb()  is getting cluttered with this patch. 
> 
> - Bhanuprakash.
> 
>>
>>    >
>>    > -----Original Message-----
>>    > From: Ilya Maximets <i.maximets@samsung.com>
>>    > Date: Wednesday, July 26, 2017 at 8:21 AM
>>    > To: "ovs-dev@openvswitch.org" <ovs-dev@openvswitch.org>,
>> Bhanuprakash Bodireddy <bhanuprakash.bodireddy@intel.com>
>>    > Cc: Heetae Ahn <heetae82.ahn@samsung.com>, Ben Pfaff
>> <blp@ovn.org>, Antonio Fischetti <antonio.fischetti@intel.com>, Eelco
>> Chaudron <echaudro@redhat.com>, Ciara Loftus <ciara.loftus@intel.com>,
>> Kevin Traynor <ktraynor@redhat.com>, Darrell Ball <dball@vmware.com>,
>> Ilya Maximets <i.maximets@samsung.com>
>>    > Subject: [PATCH RFC v2 4/4] dpif-netdev: Time based output batching.
>>    >
>>    >     This allows to collect packets from more than one RX burst
>>    >     and send them together with a configurable maximum latency.
>>    >
>>    >     'other_config:output-max-latency' can be used to configure
>>    >     time that a packet can wait in output batch for sending.
>>    >
>>    >     Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
>>    >     ---
>>    >
>>    >     millisecon granularity is used for now. Can be easily switched to use
>>    >     microseconds instead.
>>    >
>>    >      lib/dpif-netdev.c    | 97
>> +++++++++++++++++++++++++++++++++++++++++++---------
>>    >      vswitchd/vswitch.xml | 15 ++++++++
>>    >      2 files changed, 95 insertions(+), 17 deletions(-)
>>    >
>>    >     diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>>    >     index 07c7dad..e5f8a3d 100644
>>    >     --- a/lib/dpif-netdev.c
>>    >     +++ b/lib/dpif-netdev.c
>>    >     @@ -84,6 +84,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
>>    >      #define MAX_RECIRC_DEPTH 5
>>    >      DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
>>    >
>>    >     +/* Use instant packet send by default. */
>>    >     +#define DEFAULT_OUTPUT_MAX_LATENCY 0
>>    >     +
>>    >      /* Configuration parameters. */
>>    >      enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow
>> table. */
>>    >      enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
>>    >     @@ -261,6 +264,9 @@ struct dp_netdev {
>>    >          struct hmap ports;
>>    >          struct seq *port_seq;       /* Incremented whenever a port changes.
>> */
>>    >
>>    >     +    /* The time that a packet can wait in output batch for sending. */
>>    >     +    atomic_uint32_t output_max_latency;
>>    >     +
>>    >          /* Meters. */
>>    >          struct ovs_mutex meter_locks[N_METER_LOCKS];
>>    >          struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
>>    >     @@ -498,6 +504,7 @@ struct tx_port {
>>    >          int qid;
>>    >          long long last_used;
>>    >          struct hmap_node node;
>>    >     +    long long output_time;
>>    >          struct dp_packet_batch output_pkts;
>>    >      };
>>    >
>>    >     @@ -570,6 +577,9 @@ struct dp_netdev_pmd_thread {
>>    >           * than 'cmap_count(dp->poll_threads)'. */
>>    >          const int static_tx_qid;
>>    >
>>    >     +    /* Number of filled output batches. */
>>    >     +    int n_output_batches;
>>    >     +
>>    >          struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'.
>> */
>>    >          /* List of rx queues to poll. */
>>    >          struct hmap poll_list OVS_GUARDED;
>>    >     @@ -663,9 +673,9 @@ static void dp_netdev_add_rxq_to_pmd(struct
>> dp_netdev_pmd_thread *pmd,
>>    >      static void dp_netdev_del_rxq_from_pmd(struct
>> dp_netdev_pmd_thread *pmd,
>>    >                                             struct rxq_poll *poll)
>>    >          OVS_REQUIRES(pmd->port_mutex);
>>    >     -static void
>>    >     +static int
>>    >      dp_netdev_pmd_flush_output_packets(struct
>> dp_netdev_pmd_thread *pmd,
>>    >     -                                   long long now);
>>    >     +                                   long long now, bool force);
>>    >      static void reconfigure_datapath(struct dp_netdev *dp)
>>    >          OVS_REQUIRES(dp->port_mutex);
>>    >      static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread
>> *pmd);
>>    >     @@ -1188,6 +1198,7 @@ create_dp_netdev(const char *name, const
>> struct dpif_class *class,
>>    >          conntrack_init(&dp->conntrack);
>>    >
>>    >          atomic_init(&dp->emc_insert_min,
>> DEFAULT_EM_FLOW_INSERT_MIN);
>>    >     +    atomic_init(&dp->output_max_latency,
>> DEFAULT_OUTPUT_MAX_LATENCY);
>>    >
>>    >          cmap_init(&dp->poll_threads);
>>    >          ovs_mutex_init_recursive(&dp->non_pmd_mutex);
>>    >     @@ -2843,7 +2854,7 @@ dpif_netdev_execute(struct dpif *dpif, struct
>> dpif_execute *execute)
>>    >          dp_packet_batch_init_packet(&pp, execute->packet);
>>    >          dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
>>    >                                    execute->actions, execute->actions_len, now);
>>    >     -    dp_netdev_pmd_flush_output_packets(pmd, now);
>>    >     +    dp_netdev_pmd_flush_output_packets(pmd, now, true);
>>    >
>>    >          if (pmd->core_id == NON_PMD_CORE_ID) {
>>    >              ovs_mutex_unlock(&dp->non_pmd_mutex);
>>    >     @@ -2892,6 +2903,16 @@ dpif_netdev_set_config(struct dpif *dpif,
>> const struct smap *other_config)
>>    >              smap_get_ullong(other_config, "emc-insert-inv-prob",
>>    >                              DEFAULT_EM_FLOW_INSERT_INV_PROB);
>>    >          uint32_t insert_min, cur_min;
>>    >     +    uint32_t output_max_latency, cur_max_latency;
>>    >     +
>>    >     +    output_max_latency = smap_get_int(other_config, "output-max-
>> latency",
>>    >     +                                      DEFAULT_OUTPUT_MAX_LATENCY);
>>    >     +    atomic_read_relaxed(&dp->output_max_latency,
>> &cur_max_latency);
>>    >     +    if (output_max_latency != cur_max_latency) {
>>    >     +        atomic_store_relaxed(&dp->output_max_latency,
>> output_max_latency);
>>    >     +        VLOG_INFO("Output maximum latency set to %"PRIu32" ms",
>>    >     +                  output_max_latency);
>>    >     +    }
>>    >
>>    >          if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
>>    >              free(dp->pmd_cmask);
>>    >     @@ -3092,11 +3113,12 @@ cycles_count_intermediate(struct
>> dp_netdev_pmd_thread *pmd,
>>    >          non_atomic_ullong_add(&pmd->cycles.n[type], interval);
>>    >      }
>>    >
>>    >     -static void
>>    >     +static int
>>    >      dp_netdev_pmd_flush_output_on_port(struct
>> dp_netdev_pmd_thread *pmd,
>>    >                                         struct tx_port *p, long long now)
>>    >      {
>>    >          int tx_qid;
>>    >     +    int output_cnt;
>>    >          bool dynamic_txqs;
>>    >
>>    >          dynamic_txqs = p->port->dynamic_txqs;
>>    >     @@ -3106,21 +3128,39 @@
>> dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread
>> *pmd,
>>    >              tx_qid = pmd->static_tx_qid;
>>    >          }
>>    >
>>    >     +    output_cnt = dp_packet_batch_size(&p->output_pkts);
>>    >          netdev_send(p->port->netdev, tx_qid, &p->output_pkts,
>> dynamic_txqs);
>>    >          dp_packet_batch_init(&p->output_pkts);
>>    >     +
>>    >     +    if (output_cnt) {
>>    >     +        ovs_assert(pmd->n_output_batches > 0);
>>    >     +        pmd->n_output_batches--;
>>    >     +    }
>>    >     +    return output_cnt;
>>    >      }
>>    >
>>    >     -static void
>>    >     +static int
>>    >      dp_netdev_pmd_flush_output_packets(struct
>> dp_netdev_pmd_thread *pmd,
>>    >     -                                   long long now)
>>    >     +                                   long long now, bool force)
>>    >      {
>>    >          struct tx_port *p;
>>    >     +    int output_cnt = 0;
>>    >     +
>>    >     +    if (!pmd->n_output_batches) {
>>    >     +        return 0;
>>    >     +    }
>>    >     +
>>    >     +    if (!now) {
>>    >     +        now = time_msec();
>>    >     +    }
>>    >
>>    >          HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
>>    >     -        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
>>    >     -            dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>>    >     +        if (!dp_packet_batch_is_empty(&p->output_pkts)
>>    >     +            && (force || p->output_time <= now)) {
>>    >     +            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p,
>> now);
>>    >              }
>>    >          }
>>    >     +    return output_cnt;
>>    >      }
>>    >
>>    >      static int
>>    >     @@ -3130,7 +3170,7 @@ dp_netdev_process_rxq_port(struct
>> dp_netdev_pmd_thread *pmd,
>>    >      {
>>    >          struct dp_packet_batch batch;
>>    >          int error;
>>    >     -    int batch_cnt = 0;
>>    >     +    int batch_cnt = 0, output_cnt = 0;
>>    >
>>    >          dp_packet_batch_init(&batch);
>>    >          error = netdev_rxq_recv(rx, &batch);
>>    >     @@ -3141,7 +3181,7 @@ dp_netdev_process_rxq_port(struct
>> dp_netdev_pmd_thread *pmd,
>>    >
>>    >              batch_cnt = batch.count;
>>    >              dp_netdev_input(pmd, &batch, port_no, now);
>>    >     -        dp_netdev_pmd_flush_output_packets(pmd, now);
>>    >     +        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, now,
>> false);
>>    >          } else if (error != EAGAIN && error != EOPNOTSUPP) {
>>    >              static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>>    >
>>    >     @@ -3149,7 +3189,7 @@ dp_netdev_process_rxq_port(struct
>> dp_netdev_pmd_thread *pmd,
>>    >                          netdev_rxq_get_name(rx), ovs_strerror(error));
>>    >          }
>>    >
>>    >     -    return batch_cnt;
>>    >     +    return batch_cnt + output_cnt;
>>    >      }
>>    >
>>    >      static struct tx_port *
>>    >     @@ -3685,6 +3725,8 @@ pmd_free_cached_ports(struct
>> dp_netdev_pmd_thread *pmd)
>>    >      {
>>    >          struct tx_port *tx_port_cached;
>>    >
>>    >     +    /* Flush all the queued packets. */
>>    >     +    dp_netdev_pmd_flush_output_packets(pmd, 0, true);
>>    >          /* Free all used tx queue ids. */
>>    >          dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
>>    >
>>    >     @@ -3759,7 +3801,6 @@ pmd_thread_main(void *f_)
>>    >          bool exiting;
>>    >          int poll_cnt;
>>    >          int i;
>>    >     -    int process_packets = 0;
>>    >
>>    >          poll_list = NULL;
>>    >
>>    >     @@ -3788,8 +3829,10 @@ reload:
>>    >
>>    >          cycles_count_start(pmd);
>>    >          for (;;) {
>>    >     +        int process_packets = 0;
>>    >     +
>>    >              for (i = 0; i < poll_cnt; i++) {
>>    >     -            process_packets =
>>    >     +            process_packets +=
>>    >                      dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
>>    >                                                 poll_list[i].port_no);
>>    >                  cycles_count_intermediate(pmd,
>>    >     @@ -3797,6 +3840,16 @@ reload:
>>    >                                                            : PMD_CYCLES_IDLE);
>>    >              }
>>    >
>>    >     +        if (!process_packets) {
>>    >     +            /* We didn't receive anything in the process loop.
>>    >     +             * Check if we need to send something. */
>>    >     +            process_packets =
>> dp_netdev_pmd_flush_output_packets(pmd,
>>    >     +                                                                 0, false);
>>    >     +            cycles_count_intermediate(pmd,
>>    >     +                                      process_packets ? PMD_CYCLES_PROCESSING
>>    >     +                                                      : PMD_CYCLES_IDLE);
>>    >     +        }
>>    >     +
>>    >              if (lc++ > 1024) {
>>    >                  bool reload;
>>    >
>>    >     @@ -4234,6 +4287,8 @@ dp_netdev_configure_pmd(struct
>> dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>>    >          pmd->numa_id = numa_id;
>>    >          pmd->need_reload = false;
>>    >
>>    >     +    pmd->n_output_batches = 0;
>>    >     +
>>    >          *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp-
>>> poll_threads);
>>    >
>>    >          ovs_refcount_init(&pmd->ref_cnt);
>>    >     @@ -4418,6 +4473,7 @@ dp_netdev_add_port_tx_to_pmd(struct
>> dp_netdev_pmd_thread *pmd,
>>    >
>>    >          tx->port = port;
>>    >          tx->qid = -1;
>>    >     +    tx->output_time = 0LL;
>>    >          dp_packet_batch_init(&tx->output_pkts);
>>    >
>>    >          hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port-
>>> port_no));
>>    >     @@ -5094,11 +5150,18 @@ dp_execute_cb(void *aux_, struct
>> dp_packet_batch *packets_,
>>    >                      dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>>    >                  }
>>    >      #endif
>>    >     +            if (dp_packet_batch_is_empty(&p->output_pkts)) {
>>    >     +                uint32_t cur_max_latency;
>>    >     +
>>    >     +                atomic_read_relaxed(&dp->output_max_latency,
>> &cur_max_latency);
>>    >     +                p->output_time = now + cur_max_latency;
>>    >
>>    >     -            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
>>    >     -                       + dp_packet_batch_size(packets_) >
>> NETDEV_MAX_BURST)) {
>>    >     -                /* Some packets was generated while input batch processing.
>>    >     -                 * Flush here to avoid overflow. */
>>    >     +                if (dp_packet_batch_size(packets_)) {
>>    >     +                    pmd->n_output_batches++;
>>    >     +                }
>>    >     +            } else if (dp_packet_batch_size(&p->output_pkts)
>>    >     +                       + dp_packet_batch_size(packets_) >
>> NETDEV_MAX_BURST) {
>>    >     +                /* Flush here to avoid overflow. */
>>    >                      dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>>    >                  }
>>    >
>>    >     diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
>>    >     index 074535b..23930f0 100644
>>    >     --- a/vswitchd/vswitch.xml
>>    >     +++ b/vswitchd/vswitch.xml
>>    >     @@ -344,6 +344,21 @@
>>    >              </p>
>>    >            </column>
>>    >
>>    >     +      <column name="other_config" key="output-max-latency"
>>    >     +              type='{"type": "integer", "minInteger": 0, "maxInteger": 1000}'>
>>    >     +        <p>
>>    >     +          Specifies the time in milliseconds that a packet can wait in output
>>    >     +          batch for sending i.e. amount of time that packet can spend in an
>>    >     +          intermediate output queue before sending to netdev.
>>    >     +          This option can be used to configure balance between throughput
>>    >     +          and latency. Lower values decreases latency while higher values
>>    >     +          may be useful to achieve higher performance.
>>    >     +        </p>
>>    >     +        <p>
>>    >     +          Defaults to 0 i.e. instant packet sending (latency optimized).
>>    >     +        </p>
>>    >     +      </column>
>>    >     +
>>    >            <column name="other_config" key="n-handler-threads"
>>    >                    type='{"type": "integer", "minInteger": 1}'>
>>    >              <p>
>>    >     --
>>    >     2.7.4
>>    >
>>    >
>>    >
>>
>
diff mbox

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 07c7dad..e5f8a3d 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -84,6 +84,9 @@  VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 #define MAX_RECIRC_DEPTH 5
 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
 
+/* Use instant packet send by default. */
+#define DEFAULT_OUTPUT_MAX_LATENCY 0
+
 /* Configuration parameters. */
 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
 enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
@@ -261,6 +264,9 @@  struct dp_netdev {
     struct hmap ports;
     struct seq *port_seq;       /* Incremented whenever a port changes. */
 
+    /* The time that a packet can wait in output batch for sending. */
+    atomic_uint32_t output_max_latency;
+
     /* Meters. */
     struct ovs_mutex meter_locks[N_METER_LOCKS];
     struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
@@ -498,6 +504,7 @@  struct tx_port {
     int qid;
     long long last_used;
     struct hmap_node node;
+    long long output_time;
     struct dp_packet_batch output_pkts;
 };
 
@@ -570,6 +577,9 @@  struct dp_netdev_pmd_thread {
      * than 'cmap_count(dp->poll_threads)'. */
     const int static_tx_qid;
 
+    /* Number of filled output batches. */
+    int n_output_batches;
+
     struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
     /* List of rx queues to poll. */
     struct hmap poll_list OVS_GUARDED;
@@ -663,9 +673,9 @@  static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
 static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
                                        struct rxq_poll *poll)
     OVS_REQUIRES(pmd->port_mutex);
-static void
+static int
 dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
-                                   long long now);
+                                   long long now, bool force);
 static void reconfigure_datapath(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex);
 static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
@@ -1188,6 +1198,7 @@  create_dp_netdev(const char *name, const struct dpif_class *class,
     conntrack_init(&dp->conntrack);
 
     atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
+    atomic_init(&dp->output_max_latency, DEFAULT_OUTPUT_MAX_LATENCY);
 
     cmap_init(&dp->poll_threads);
     ovs_mutex_init_recursive(&dp->non_pmd_mutex);
@@ -2843,7 +2854,7 @@  dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     dp_packet_batch_init_packet(&pp, execute->packet);
     dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
                               execute->actions, execute->actions_len, now);
-    dp_netdev_pmd_flush_output_packets(pmd, now);
+    dp_netdev_pmd_flush_output_packets(pmd, now, true);
 
     if (pmd->core_id == NON_PMD_CORE_ID) {
         ovs_mutex_unlock(&dp->non_pmd_mutex);
@@ -2892,6 +2903,16 @@  dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
         smap_get_ullong(other_config, "emc-insert-inv-prob",
                         DEFAULT_EM_FLOW_INSERT_INV_PROB);
     uint32_t insert_min, cur_min;
+    uint32_t output_max_latency, cur_max_latency;
+
+    output_max_latency = smap_get_int(other_config, "output-max-latency",
+                                      DEFAULT_OUTPUT_MAX_LATENCY);
+    atomic_read_relaxed(&dp->output_max_latency, &cur_max_latency);
+    if (output_max_latency != cur_max_latency) {
+        atomic_store_relaxed(&dp->output_max_latency, output_max_latency);
+        VLOG_INFO("Output maximum latency set to %"PRIu32" ms",
+                  output_max_latency);
+    }
 
     if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
         free(dp->pmd_cmask);
@@ -3092,11 +3113,12 @@  cycles_count_intermediate(struct dp_netdev_pmd_thread *pmd,
     non_atomic_ullong_add(&pmd->cycles.n[type], interval);
 }
 
-static void
+static int
 dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
                                    struct tx_port *p, long long now)
 {
     int tx_qid;
+    int output_cnt;
     bool dynamic_txqs;
 
     dynamic_txqs = p->port->dynamic_txqs;
@@ -3106,21 +3128,39 @@  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
         tx_qid = pmd->static_tx_qid;
     }
 
+    output_cnt = dp_packet_batch_size(&p->output_pkts);
     netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
     dp_packet_batch_init(&p->output_pkts);
+
+    if (output_cnt) {
+        ovs_assert(pmd->n_output_batches > 0);
+        pmd->n_output_batches--;
+    }
+    return output_cnt;
 }
 
-static void
+static int
 dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
-                                   long long now)
+                                   long long now, bool force)
 {
     struct tx_port *p;
+    int output_cnt = 0;
+
+    if (!pmd->n_output_batches) {
+        return 0;
+    }
+
+    if (!now) {
+        now = time_msec();
+    }
 
     HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
-        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
-            dp_netdev_pmd_flush_output_on_port(pmd, p, now);
+        if (!dp_packet_batch_is_empty(&p->output_pkts)
+            && (force || p->output_time <= now)) {
+            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p, now);
         }
     }
+    return output_cnt;
 }
 
 static int
@@ -3130,7 +3170,7 @@  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
 {
     struct dp_packet_batch batch;
     int error;
-    int batch_cnt = 0;
+    int batch_cnt = 0, output_cnt = 0;
 
     dp_packet_batch_init(&batch);
     error = netdev_rxq_recv(rx, &batch);
@@ -3141,7 +3181,7 @@  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
 
         batch_cnt = batch.count;
         dp_netdev_input(pmd, &batch, port_no, now);
-        dp_netdev_pmd_flush_output_packets(pmd, now);
+        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, now, false);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
@@ -3149,7 +3189,7 @@  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                     netdev_rxq_get_name(rx), ovs_strerror(error));
     }
 
-    return batch_cnt;
+    return batch_cnt + output_cnt;
 }
 
 static struct tx_port *
@@ -3685,6 +3725,8 @@  pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
 {
     struct tx_port *tx_port_cached;
 
+    /* Flush all the queued packets. */
+    dp_netdev_pmd_flush_output_packets(pmd, 0, true);
     /* Free all used tx queue ids. */
     dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
 
@@ -3759,7 +3801,6 @@  pmd_thread_main(void *f_)
     bool exiting;
     int poll_cnt;
     int i;
-    int process_packets = 0;
 
     poll_list = NULL;
 
@@ -3788,8 +3829,10 @@  reload:
 
     cycles_count_start(pmd);
     for (;;) {
+        int process_packets = 0;
+
         for (i = 0; i < poll_cnt; i++) {
-            process_packets =
+            process_packets +=
                 dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
                                            poll_list[i].port_no);
             cycles_count_intermediate(pmd,
@@ -3797,6 +3840,16 @@  reload:
                                                       : PMD_CYCLES_IDLE);
         }
 
+        if (!process_packets) {
+            /* We didn't receive anything in the process loop.
+             * Check if we need to send something. */
+            process_packets = dp_netdev_pmd_flush_output_packets(pmd,
+                                                                 0, false);
+            cycles_count_intermediate(pmd,
+                                      process_packets ? PMD_CYCLES_PROCESSING
+                                                      : PMD_CYCLES_IDLE);
+        }
+
         if (lc++ > 1024) {
             bool reload;
 
@@ -4234,6 +4287,8 @@  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->numa_id = numa_id;
     pmd->need_reload = false;
 
+    pmd->n_output_batches = 0;
+
     *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp->poll_threads);
 
     ovs_refcount_init(&pmd->ref_cnt);
@@ -4418,6 +4473,7 @@  dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
 
     tx->port = port;
     tx->qid = -1;
+    tx->output_time = 0LL;
     dp_packet_batch_init(&tx->output_pkts);
 
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
@@ -5094,11 +5150,18 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 dp_netdev_pmd_flush_output_on_port(pmd, p, now);
             }
 #endif
+            if (dp_packet_batch_is_empty(&p->output_pkts)) {
+                uint32_t cur_max_latency;
+
+                atomic_read_relaxed(&dp->output_max_latency, &cur_max_latency);
+                p->output_time = now + cur_max_latency;
 
-            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
-                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
-                /* Some packets was generated while input batch processing.
-                 * Flush here to avoid overflow. */
+                if (dp_packet_batch_size(packets_)) {
+                    pmd->n_output_batches++;
+                }
+            } else if (dp_packet_batch_size(&p->output_pkts)
+                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
+                /* Flush here to avoid overflow. */
                 dp_netdev_pmd_flush_output_on_port(pmd, p, now);
             }
 
diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
index 074535b..23930f0 100644
--- a/vswitchd/vswitch.xml
+++ b/vswitchd/vswitch.xml
@@ -344,6 +344,21 @@ 
         </p>
       </column>
 
+      <column name="other_config" key="output-max-latency"
+              type='{"type": "integer", "minInteger": 0, "maxInteger": 1000}'>
+        <p>
+          Specifies the time in milliseconds that a packet can wait in output
+          batch for sending i.e. amount of time that packet can spend in an
+          intermediate output queue before sending to netdev.
+          This option can be used to configure balance between throughput
+          and latency. Lower values decreases latency while higher values
+          may be useful to achieve higher performance.
+        </p>
+        <p>
+          Defaults to 0 i.e. instant packet sending (latency optimized).
+        </p>
+      </column>
+
       <column name="other_config" key="n-handler-threads"
               type='{"type": "integer", "minInteger": 1}'>
         <p>