[ovs-dev,v3,1/4] dpif-netdev: Output packet batching.

Message ID 1502379486-1568-2-git-send-email-i.maximets@samsung.com
State New
Headers show

Commit Message

Ilya Maximets Aug. 10, 2017, 3:38 p.m.
While processing incoming batch of packets they are scattered
across many per-flow batches and sent separately.

This becomes an issue while using more than a few flows.

For example if we have balanced-tcp OvS bonding with 2 ports
there will be 256 datapath internal flows for each dp_hash
pattern. This will lead to scattering of a single received
batch across all of those 256 per-flow batches and invoking
send for each packet separately. This behaviour greatly degrades
overall performance of netdev_send because of inability to use
advantages of vectorized transmit functions.
But the half (if 2 ports in bonding) of datapath flows will
have the same output actions. This means that we can collect
them in a single place back and send at once using single call
to netdev_send. This patch introduces per-port packet batch
for output packets for that purpose.

'output_pkts' batch is thread local and located in send port cache.

Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---
 lib/dpif-netdev.c | 104 ++++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 82 insertions(+), 22 deletions(-)

Comments

Gao Zhenyu Aug. 18, 2017, 2:30 a.m. | #1
Hi IIya,

   Thanks for working on it.
   This patch consumes dp_packet_batch_clone so I have a concern about the
performance. Could you please show some performance numbers with/without your
patch.

Thanks
Zhenyu Gao

2017-08-10 23:38 GMT+08:00 Ilya Maximets <i.maximets@samsung.com>:

> While processing incoming batch of packets they are scattered
> across many per-flow batches and sent separately.
>
> This becomes an issue while using more than a few flows.
>
> For example if we have balanced-tcp OvS bonding with 2 ports
> there will be 256 datapath internal flows for each dp_hash
> pattern. This will lead to scattering of a single recieved
> batch across all of that 256 per-flow batches and invoking
> send for each packet separately. This behaviour greatly degrades
> overall performance of netdev_send because of inability to use
> advantages of vectorized transmit functions.
> But the half (if 2 ports in bonding) of datapath flows will
> have the same output actions. This means that we can collect
> them in a single place back and send at once using single call
> to netdev_send. This patch introduces per-port packet batch
> for output packets for that purpose.
>
> 'output_pkts' batch is thread local and located in send port cache.
>
> Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
> ---
>  lib/dpif-netdev.c | 104 ++++++++++++++++++++++++++++++
> ++++++++++++------------
>  1 file changed, 82 insertions(+), 22 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index e2cd931..a2a25be 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -502,6 +502,7 @@ struct tx_port {
>      int qid;
>      long long last_used;
>      struct hmap_node node;
> +    struct dp_packet_batch output_pkts;
>  };
>
>  /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
> @@ -633,9 +634,10 @@ static void dp_netdev_execute_actions(struct
> dp_netdev_pmd_thread *pmd,
>                                        size_t actions_len,
>                                        long long now);
>  static void dp_netdev_input(struct dp_netdev_pmd_thread *,
> -                            struct dp_packet_batch *, odp_port_t port_no);
> +                            struct dp_packet_batch *, odp_port_t port_no,
> +                            long long now);
>  static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *,
> -                                  struct dp_packet_batch *);
> +                                  struct dp_packet_batch *, long long
> now);
>
>  static void dp_netdev_disable_upcall(struct dp_netdev *);
>  static void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
> @@ -667,6 +669,9 @@ static void dp_netdev_add_rxq_to_pmd(struct
> dp_netdev_pmd_thread *pmd,
>  static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
>                                         struct rxq_poll *poll)
>      OVS_REQUIRES(pmd->port_mutex);
> +static void
> +dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
> +                                   long long now);
>  static void reconfigure_datapath(struct dp_netdev *dp)
>      OVS_REQUIRES(dp->port_mutex);
>  static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
> @@ -2809,6 +2814,7 @@ dpif_netdev_execute(struct dpif *dpif, struct
> dpif_execute *execute)
>      struct dp_netdev *dp = get_dp_netdev(dpif);
>      struct dp_netdev_pmd_thread *pmd;
>      struct dp_packet_batch pp;
> +    long long now = time_msec();
>
>      if (dp_packet_size(execute->packet) < ETH_HEADER_LEN ||
>          dp_packet_size(execute->packet) > UINT16_MAX) {
> @@ -2851,8 +2857,8 @@ dpif_netdev_execute(struct dpif *dpif, struct
> dpif_execute *execute)
>
>      dp_packet_batch_init_packet(&pp, execute->packet);
>      dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
> -                              execute->actions, execute->actions_len,
> -                              time_msec());
> +                              execute->actions, execute->actions_len,
> now);
> +    dp_netdev_pmd_flush_output_packets(pmd, now);
>
>      if (pmd->core_id == NON_PMD_CORE_ID) {
>          ovs_mutex_unlock(&dp->non_pmd_mutex);
> @@ -3101,6 +3107,37 @@ cycles_count_intermediate(struct
> dp_netdev_pmd_thread *pmd,
>      non_atomic_ullong_add(&pmd->cycles.n[type], interval);
>  }
>
> +static void
> +dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
> +                                   struct tx_port *p, long long now)
> +{
> +    int tx_qid;
> +    bool dynamic_txqs;
> +
> +    dynamic_txqs = p->port->dynamic_txqs;
> +    if (dynamic_txqs) {
> +        tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
> +    } else {
> +        tx_qid = pmd->static_tx_qid;
> +    }
> +
> +    netdev_send(p->port->netdev, tx_qid, &p->output_pkts, true,
> dynamic_txqs);
> +    dp_packet_batch_init(&p->output_pkts);
> +}
> +
> +static void
> +dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
> +                                   long long now)
> +{
> +    struct tx_port *p;
> +
> +    HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
> +        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
> +            dp_netdev_pmd_flush_output_on_port(pmd, p, now);
> +        }
> +    }
> +}
> +
>  static int
>  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>                             struct netdev_rxq *rx,
> @@ -3113,10 +3150,13 @@ dp_netdev_process_rxq_port(struct
> dp_netdev_pmd_thread *pmd,
>      dp_packet_batch_init(&batch);
>      error = netdev_rxq_recv(rx, &batch);
>      if (!error) {
> +        long long now = time_msec();
> +
>          *recirc_depth_get() = 0;
>
>          batch_cnt = batch.count;
> -        dp_netdev_input(pmd, &batch, port_no);
> +        dp_netdev_input(pmd, &batch, port_no, now);
> +        dp_netdev_pmd_flush_output_packets(pmd, now);
>      } else if (error != EAGAIN && error != EOPNOTSUPP) {
>          static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>
> @@ -4481,6 +4521,7 @@ dp_netdev_add_port_tx_to_pmd(struct
> dp_netdev_pmd_thread *pmd,
>
>      tx->port = port;
>      tx->qid = -1;
> +    dp_packet_batch_init(&tx->output_pkts);
>
>      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_
> no));
>      pmd->need_reload = true;
> @@ -4901,7 +4942,8 @@ fast_path_processing(struct dp_netdev_pmd_thread
> *pmd,
>  static void
>  dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
>                    struct dp_packet_batch *packets,
> -                  bool md_is_valid, odp_port_t port_no)
> +                  bool md_is_valid, odp_port_t port_no,
> +                  long long now)
>  {
>      int cnt = packets->count;
>  #if !defined(__CHECKER__) && !defined(_WIN32)
> @@ -4913,7 +4955,6 @@ dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
>      OVS_ALIGNED_VAR(CACHE_LINE_SIZE)
>          struct netdev_flow_key keys[PKT_ARRAY_SIZE];
>      struct packet_batch_per_flow batches[PKT_ARRAY_SIZE];
> -    long long now = time_msec();
>      size_t n_batches;
>      odp_port_t in_port;
>
> @@ -4949,16 +4990,16 @@ dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
>  static void
>  dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
>                  struct dp_packet_batch *packets,
> -                odp_port_t port_no)
> +                odp_port_t port_no, long long now)
>  {
> -    dp_netdev_input__(pmd, packets, false, port_no);
> +    dp_netdev_input__(pmd, packets, false, port_no, now);
>  }
>
>  static void
>  dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd,
> -                      struct dp_packet_batch *packets)
> +                      struct dp_packet_batch *packets, long long now)
>  {
> -    dp_netdev_input__(pmd, packets, true, 0);
> +    dp_netdev_input__(pmd, packets, true, 0, now);
>  }
>
>  struct dp_netdev_execute_aux {
> @@ -5136,18 +5177,37 @@ dp_execute_cb(void *aux_, struct dp_packet_batch
> *packets_,
>      case OVS_ACTION_ATTR_OUTPUT:
>          p = pmd_send_port_cache_lookup(pmd, nl_attr_get_odp_port(a));
>          if (OVS_LIKELY(p)) {
> -            int tx_qid;
> -            bool dynamic_txqs;
> +            struct dp_packet *packet;
> +            struct dp_packet_batch out;
>
> -            dynamic_txqs = p->port->dynamic_txqs;
> -            if (dynamic_txqs) {
> -                tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
> -            } else {
> -                tx_qid = pmd->static_tx_qid;
> +            if (!may_steal) {
> +                dp_packet_batch_clone(&out, packets_);
> +                dp_packet_batch_reset_cutlen(packets_);
> +                packets_ = &out;
> +            }
> +            dp_packet_batch_apply_cutlen(packets_);
> +
> +#ifdef DPDK_NETDEV
> +            if (OVS_UNLIKELY(!dp_packet_batch_is_empty(&p->output_pkts)
> +                             && packets_->packets[0]->source
> +                                != p->output_pkts.packets[0]->source)) {
> +                /* XXX: netdev-dpdk assumes that all packets in a single
> +                 *      outptut batch has the same source. Flush here to
> +                 *      avoid memory access issues. */
> +                dp_netdev_pmd_flush_output_on_port(pmd, p, now);
> +            }
> +#endif
> +
> +            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
> +                       + dp_packet_batch_size(packets_) >
> NETDEV_MAX_BURST)) {
> +                /* Some packets was generated while input batch
> processing.
> +                 * Flush here to avoid overflow. */
> +                dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>              }
>
> -            netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
> -                        dynamic_txqs);
> +            DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
> +                dp_packet_batch_add(&p->output_pkts, packet);
> +            }
>              return;
>          }
>          break;
> @@ -5188,7 +5248,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch
> *packets_,
>                  }
>
>                  (*depth)++;
> -                dp_netdev_recirculate(pmd, packets_);
> +                dp_netdev_recirculate(pmd, packets_, now);
>                  (*depth)--;
>                  return;
>              }
> @@ -5253,7 +5313,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch
> *packets_,
>              }
>
>              (*depth)++;
> -            dp_netdev_recirculate(pmd, packets_);
> +            dp_netdev_recirculate(pmd, packets_, now);
>              (*depth)--;
>
>              return;
> --
> 2.7.4
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>
Bodireddy, Bhanuprakash Sept. 25, 2017, 3:07 p.m. | #2
Hi Ilya,

This series needs to be rebased.  Few comments below.

>While processing incoming batch of packets they are scattered across many
>per-flow batches and sent separately.
>
>This becomes an issue while using more than a few flows.
>
>For example if we have balanced-tcp OvS bonding with 2 ports there will be
>256 datapath internal flows for each dp_hash pattern. This will lead to
>scattering of a single recieved batch across all of that 256 per-flow batches and
>invoking send for each packet separately. This behaviour greatly degrades
>overall performance of netdev_send because of inability to use advantages of
>vectorized transmit functions.
>But the half (if 2 ports in bonding) of datapath flows will have the same output
>actions. This means that we can collect them in a single place back and send at
>once using single call to netdev_send. This patch introduces per-port packet
>batch for output packets for that purpose.
>
>'output_pkts' batch is thread local and located in send port cache.
>
>Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
>---
> lib/dpif-netdev.c | 104
>++++++++++++++++++++++++++++++++++++++++++------------
> 1 file changed, 82 insertions(+), 22 deletions(-)
>
>diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index e2cd931..a2a25be
>100644
>--- a/lib/dpif-netdev.c
>+++ b/lib/dpif-netdev.c
>@@ -502,6 +502,7 @@ struct tx_port {
>     int qid;
>     long long last_used;
>     struct hmap_node node;
>+    struct dp_packet_batch output_pkts;
> };
>
> /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
>@@ -633,9 +634,10 @@ static void dp_netdev_execute_actions(struct
>dp_netdev_pmd_thread *pmd,
>                                       size_t actions_len,
>                                       long long now);  static void dp_netdev_input(struct
>dp_netdev_pmd_thread *,
>-                            struct dp_packet_batch *, odp_port_t port_no);
>+                            struct dp_packet_batch *, odp_port_t port_no,
>+                            long long now);
> static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *,
>-                                  struct dp_packet_batch *);
>+                                  struct dp_packet_batch *, long long
>+ now);
>
> static void dp_netdev_disable_upcall(struct dp_netdev *);  static void
>dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd); @@ -
>667,6 +669,9 @@ static void dp_netdev_add_rxq_to_pmd(struct
>dp_netdev_pmd_thread *pmd,  static void
>dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
>                                        struct rxq_poll *poll)
>     OVS_REQUIRES(pmd->port_mutex);
>+static void
>+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread
>*pmd,
>+                                   long long now);
> static void reconfigure_datapath(struct dp_netdev *dp)
>     OVS_REQUIRES(dp->port_mutex);
> static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
>@@ -2809,6 +2814,7 @@ dpif_netdev_execute(struct dpif *dpif, struct
>dpif_execute *execute)
>     struct dp_netdev *dp = get_dp_netdev(dpif);
>     struct dp_netdev_pmd_thread *pmd;
>     struct dp_packet_batch pp;
>+    long long now = time_msec();

[BHANU] Calling time_msec() can be moved little down in this function, may be after the 'probe' check.

>
>     if (dp_packet_size(execute->packet) < ETH_HEADER_LEN ||
>         dp_packet_size(execute->packet) > UINT16_MAX) { @@ -2851,8 +2857,8
>@@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
>
>     dp_packet_batch_init_packet(&pp, execute->packet);
>     dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
>-                              execute->actions, execute->actions_len,
>-                              time_msec());
>+                              execute->actions, execute->actions_len, now);
>+    dp_netdev_pmd_flush_output_packets(pmd, now);

[BHANU] Is this code path mostly run in non-pmd thread context? I can only think of bfd case where the 
where all the above runs in monitoring thread(non-pmd) context.  

>
>     if (pmd->core_id == NON_PMD_CORE_ID) {
>         ovs_mutex_unlock(&dp->non_pmd_mutex);
>@@ -3101,6 +3107,37 @@ cycles_count_intermediate(struct
>dp_netdev_pmd_thread *pmd,
>     non_atomic_ullong_add(&pmd->cycles.n[type], interval);  }
>
>+static void
>+dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread
>*pmd,
>+                                   struct tx_port *p, long long now) {
>+    int tx_qid;
>+    bool dynamic_txqs;
>+
>+    dynamic_txqs = p->port->dynamic_txqs;
>+    if (dynamic_txqs) {
>+        tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
>+    } else {
>+        tx_qid = pmd->static_tx_qid;
>+    }
>+
>+    netdev_send(p->port->netdev, tx_qid, &p->output_pkts, true,
>dynamic_txqs);
>+    dp_packet_batch_init(&p->output_pkts);
>+}
>+
>+static void
>+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread
>*pmd,
>+                                   long long now) {
>+    struct tx_port *p;
>+
>+    HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
>+        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
>+            dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>+        }
>+    }
>+}
>+
> static int
> dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>                            struct netdev_rxq *rx, @@ -3113,10 +3150,13 @@
>dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>     dp_packet_batch_init(&batch);
>     error = netdev_rxq_recv(rx, &batch);
>     if (!error) {
>+        long long now = time_msec();
>+
>         *recirc_depth_get() = 0;
>
>         batch_cnt = batch.count;
>-        dp_netdev_input(pmd, &batch, port_no);
>+        dp_netdev_input(pmd, &batch, port_no, now);
>+        dp_netdev_pmd_flush_output_packets(pmd, now);
>     } else if (error != EAGAIN && error != EOPNOTSUPP) {
>         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>
>@@ -4481,6 +4521,7 @@ dp_netdev_add_port_tx_to_pmd(struct
>dp_netdev_pmd_thread *pmd,
>
>     tx->port = port;
>     tx->qid = -1;
>+    dp_packet_batch_init(&tx->output_pkts);
>
>     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port-
>>port_no));
>     pmd->need_reload = true;
>@@ -4901,7 +4942,8 @@ fast_path_processing(struct
>dp_netdev_pmd_thread *pmd,  static void  dp_netdev_input__(struct
>dp_netdev_pmd_thread *pmd,
>                   struct dp_packet_batch *packets,
>-                  bool md_is_valid, odp_port_t port_no)
>+                  bool md_is_valid, odp_port_t port_no,
>+                  long long now)
> {
>     int cnt = packets->count;
> #if !defined(__CHECKER__) && !defined(_WIN32) @@ -4913,7 +4955,6 @@
>dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
>     OVS_ALIGNED_VAR(CACHE_LINE_SIZE)
>         struct netdev_flow_key keys[PKT_ARRAY_SIZE];
>     struct packet_batch_per_flow batches[PKT_ARRAY_SIZE];
>-    long long now = time_msec();
>     size_t n_batches;
>     odp_port_t in_port;
>
>@@ -4949,16 +4990,16 @@ dp_netdev_input__(struct
>dp_netdev_pmd_thread *pmd,  static void  dp_netdev_input(struct
>dp_netdev_pmd_thread *pmd,
>                 struct dp_packet_batch *packets,
>-                odp_port_t port_no)
>+                odp_port_t port_no, long long now)
> {
>-    dp_netdev_input__(pmd, packets, false, port_no);
>+    dp_netdev_input__(pmd, packets, false, port_no, now);
> }
>
> static void
> dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd,
>-                      struct dp_packet_batch *packets)
>+                      struct dp_packet_batch *packets, long long now)
> {
>-    dp_netdev_input__(pmd, packets, true, 0);
>+    dp_netdev_input__(pmd, packets, true, 0, now);
> }
>
> struct dp_netdev_execute_aux {
>@@ -5136,18 +5177,37 @@ dp_execute_cb(void *aux_, struct
>dp_packet_batch *packets_,
>     case OVS_ACTION_ATTR_OUTPUT:
>         p = pmd_send_port_cache_lookup(pmd, nl_attr_get_odp_port(a));
>         if (OVS_LIKELY(p)) {
>-            int tx_qid;
>-            bool dynamic_txqs;
>+            struct dp_packet *packet;
>+            struct dp_packet_batch out;
>
>-            dynamic_txqs = p->port->dynamic_txqs;
>-            if (dynamic_txqs) {
>-                tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
>-            } else {
>-                tx_qid = pmd->static_tx_qid;
>+            if (!may_steal) {
>+                dp_packet_batch_clone(&out, packets_);
>+                dp_packet_batch_reset_cutlen(packets_);
>+                packets_ = &out;
>+            }

[BHANU]   The above change seems to be independent and is more about refactoring the code.
I see that  redundant dp_packet_batch_reset_cutlen()  in netdev_send() is removed in the 2/4.

>+            dp_packet_batch_apply_cutlen(packets_);

[BHANU]  Seems fine, as all the redundant calls are removed in 3/4. 

>+
>+#ifdef DPDK_NETDEV
>+            if (OVS_UNLIKELY(!dp_packet_batch_is_empty(&p->output_pkts)
>+                             && packets_->packets[0]->source
>+                                != p->output_pkts.packets[0]->source)) {
>+                /* XXX: netdev-dpdk assumes that all packets in a single
>+                 *      outptut batch has the same source. Flush here to

[BHANU]  Typo with 'output'.

>+                 *      avoid memory access issues. */
>+                dp_netdev_pmd_flush_output_on_port(pmd, p, now);

[BHANU] When would we hit this case?

>+            }
>+#endif
>+
>+            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
>+                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
>+                /* Some packets was generated while input batch processing.
>+                 * Flush here to avoid overflow. */
>+                dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>             }
>
>-            netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
>-                        dynamic_txqs);
>+            DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
>+                dp_packet_batch_add(&p->output_pkts, packet);
>+            }
>             return;
>         }
>         break;
>@@ -5188,7 +5248,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch
>*packets_,
>                 }
>
>                 (*depth)++;
>-                dp_netdev_recirculate(pmd, packets_);
>+                dp_netdev_recirculate(pmd, packets_, now);
>                 (*depth)--;
>                 return;
>             }
>@@ -5253,7 +5313,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch
>*packets_,
>             }
>
>             (*depth)++;
>-            dp_netdev_recirculate(pmd, packets_);
>+            dp_netdev_recirculate(pmd, packets_, now);
>             (*depth)--;
>
>             return;
>--
>2.7.4
Ilya Maximets Sept. 26, 2017, 2:25 p.m. | #3
On 25.09.2017 18:07, Bodireddy, Bhanuprakash wrote:
> Hi Ilya,
> 
> This series needs to be rebased.  Few comments below.

Hi. Thanks for review.

I just returned from vacation and starting working on this.

Comments inline.

Best regards, Ilya Maximets.

> 
>> While processing incoming batch of packets they are scattered across many
>> per-flow batches and sent separately.
>>
>> This becomes an issue while using more than a few flows.
>>
>> For example if we have balanced-tcp OvS bonding with 2 ports there will be
>> 256 datapath internal flows for each dp_hash pattern. This will lead to
>> scattering of a single recieved batch across all of that 256 per-flow batches and
>> invoking send for each packet separately. This behaviour greatly degrades
>> overall performance of netdev_send because of inability to use advantages of
>> vectorized transmit functions.
>> But the half (if 2 ports in bonding) of datapath flows will have the same output
>> actions. This means that we can collect them in a single place back and send at
>> once using single call to netdev_send. This patch introduces per-port packet
>> batch for output packets for that purpose.
>>
>> 'output_pkts' batch is thread local and located in send port cache.
>>
>> Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
>> ---
>> lib/dpif-netdev.c | 104
>> ++++++++++++++++++++++++++++++++++++++++++------------
>> 1 file changed, 82 insertions(+), 22 deletions(-)
>>
>> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index e2cd931..a2a25be
>> 100644
>> --- a/lib/dpif-netdev.c
>> +++ b/lib/dpif-netdev.c
>> @@ -502,6 +502,7 @@ struct tx_port {
>>     int qid;
>>     long long last_used;
>>     struct hmap_node node;
>> +    struct dp_packet_batch output_pkts;
>> };
>>
>> /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
>> @@ -633,9 +634,10 @@ static void dp_netdev_execute_actions(struct
>> dp_netdev_pmd_thread *pmd,
>>                                       size_t actions_len,
>>                                       long long now);  static void dp_netdev_input(struct
>> dp_netdev_pmd_thread *,
>> -                            struct dp_packet_batch *, odp_port_t port_no);
>> +                            struct dp_packet_batch *, odp_port_t port_no,
>> +                            long long now);
>> static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *,
>> -                                  struct dp_packet_batch *);
>> +                                  struct dp_packet_batch *, long long
>> + now);
>>
>> static void dp_netdev_disable_upcall(struct dp_netdev *);  static void
>> dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd); @@ -
>> 667,6 +669,9 @@ static void dp_netdev_add_rxq_to_pmd(struct
>> dp_netdev_pmd_thread *pmd,  static void
>> dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
>>                                        struct rxq_poll *poll)
>>     OVS_REQUIRES(pmd->port_mutex);
>> +static void
>> +dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread
>> *pmd,
>> +                                   long long now);
>> static void reconfigure_datapath(struct dp_netdev *dp)
>>     OVS_REQUIRES(dp->port_mutex);
>> static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
>> @@ -2809,6 +2814,7 @@ dpif_netdev_execute(struct dpif *dpif, struct
>> dpif_execute *execute)
>>     struct dp_netdev *dp = get_dp_netdev(dpif);
>>     struct dp_netdev_pmd_thread *pmd;
>>     struct dp_packet_batch pp;
>> +    long long now = time_msec();
> 
> [BHANU] Calling time_msec() can be moved little down in this function, may be after the 'probe' check.

Yes, sure. I'm going to follow suggestion from Jan and rebase this series
on top of other my patch https://patchwork.ozlabs.org/patch/800276/
(dpif-netdev: Keep latest measured time for PMD thread). Time update
moved closer to action executing in that patch.

I'll reply to Jan's mails additionally.

> 
>>
>>     if (dp_packet_size(execute->packet) < ETH_HEADER_LEN ||
>>         dp_packet_size(execute->packet) > UINT16_MAX) { @@ -2851,8 +2857,8
>> @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
>>
>>     dp_packet_batch_init_packet(&pp, execute->packet);
>>     dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
>> -                              execute->actions, execute->actions_len,
>> -                              time_msec());
>> +                              execute->actions, execute->actions_len, now);
>> +    dp_netdev_pmd_flush_output_packets(pmd, now);
> 
> [BHANU] Is this code path mostly run in non-pmd thread context? I can only think of bfd case where the 
> where all the above runs in monitoring thread(non-pmd) context.

Yes, I think so, but I'm not sure is there some weird cases.
Also, there is no explicit restrictions to run this from the PMD thread.

P.S. In my cases 'main' thread is the main consumer of this function for
     sending LACP messages.

> 
>>
>>     if (pmd->core_id == NON_PMD_CORE_ID) {
>>         ovs_mutex_unlock(&dp->non_pmd_mutex);
>> @@ -3101,6 +3107,37 @@ cycles_count_intermediate(struct
>> dp_netdev_pmd_thread *pmd,
>>     non_atomic_ullong_add(&pmd->cycles.n[type], interval);  }
>>
>> +static void
>> +dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread
>> *pmd,
>> +                                   struct tx_port *p, long long now) {
>> +    int tx_qid;
>> +    bool dynamic_txqs;
>> +
>> +    dynamic_txqs = p->port->dynamic_txqs;
>> +    if (dynamic_txqs) {
>> +        tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
>> +    } else {
>> +        tx_qid = pmd->static_tx_qid;
>> +    }
>> +
>> +    netdev_send(p->port->netdev, tx_qid, &p->output_pkts, true,
>> dynamic_txqs);
>> +    dp_packet_batch_init(&p->output_pkts);
>> +}
>> +
>> +static void
>> +dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread
>> *pmd,
>> +                                   long long now) {
>> +    struct tx_port *p;
>> +
>> +    HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
>> +        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
>> +            dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>> +        }
>> +    }
>> +}
>> +
>> static int
>> dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>>                            struct netdev_rxq *rx, @@ -3113,10 +3150,13 @@
>> dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>>     dp_packet_batch_init(&batch);
>>     error = netdev_rxq_recv(rx, &batch);
>>     if (!error) {
>> +        long long now = time_msec();
>> +
>>         *recirc_depth_get() = 0;
>>
>>         batch_cnt = batch.count;
>> -        dp_netdev_input(pmd, &batch, port_no);
>> +        dp_netdev_input(pmd, &batch, port_no, now);
>> +        dp_netdev_pmd_flush_output_packets(pmd, now);
>>     } else if (error != EAGAIN && error != EOPNOTSUPP) {
>>         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>>
>> @@ -4481,6 +4521,7 @@ dp_netdev_add_port_tx_to_pmd(struct
>> dp_netdev_pmd_thread *pmd,
>>
>>     tx->port = port;
>>     tx->qid = -1;
>> +    dp_packet_batch_init(&tx->output_pkts);
>>
>>     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port-
>>> port_no));
>>     pmd->need_reload = true;
>> @@ -4901,7 +4942,8 @@ fast_path_processing(struct
>> dp_netdev_pmd_thread *pmd,  static void  dp_netdev_input__(struct
>> dp_netdev_pmd_thread *pmd,
>>                   struct dp_packet_batch *packets,
>> -                  bool md_is_valid, odp_port_t port_no)
>> +                  bool md_is_valid, odp_port_t port_no,
>> +                  long long now)
>> {
>>     int cnt = packets->count;
>> #if !defined(__CHECKER__) && !defined(_WIN32) @@ -4913,7 +4955,6 @@
>> dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
>>     OVS_ALIGNED_VAR(CACHE_LINE_SIZE)
>>         struct netdev_flow_key keys[PKT_ARRAY_SIZE];
>>     struct packet_batch_per_flow batches[PKT_ARRAY_SIZE];
>> -    long long now = time_msec();
>>     size_t n_batches;
>>     odp_port_t in_port;
>>
>> @@ -4949,16 +4990,16 @@ dp_netdev_input__(struct
>> dp_netdev_pmd_thread *pmd,  static void  dp_netdev_input(struct
>> dp_netdev_pmd_thread *pmd,
>>                 struct dp_packet_batch *packets,
>> -                odp_port_t port_no)
>> +                odp_port_t port_no, long long now)
>> {
>> -    dp_netdev_input__(pmd, packets, false, port_no);
>> +    dp_netdev_input__(pmd, packets, false, port_no, now);
>> }
>>
>> static void
>> dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd,
>> -                      struct dp_packet_batch *packets)
>> +                      struct dp_packet_batch *packets, long long now)
>> {
>> -    dp_netdev_input__(pmd, packets, true, 0);
>> +    dp_netdev_input__(pmd, packets, true, 0, now);
>> }
>>
>> struct dp_netdev_execute_aux {
>> @@ -5136,18 +5177,37 @@ dp_execute_cb(void *aux_, struct
>> dp_packet_batch *packets_,
>>     case OVS_ACTION_ATTR_OUTPUT:
>>         p = pmd_send_port_cache_lookup(pmd, nl_attr_get_odp_port(a));
>>         if (OVS_LIKELY(p)) {
>> -            int tx_qid;
>> -            bool dynamic_txqs;
>> +            struct dp_packet *packet;
>> +            struct dp_packet_batch out;
>>
>> -            dynamic_txqs = p->port->dynamic_txqs;
>> -            if (dynamic_txqs) {
>> -                tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
>> -            } else {
>> -                tx_qid = pmd->static_tx_qid;
>> +            if (!may_steal) {
>> +                dp_packet_batch_clone(&out, packets_);
>> +                dp_packet_batch_reset_cutlen(packets_);
>> +                packets_ = &out;
>> +            }
> 
> [BHANU]   The above change seems to be independent and is more about refactoring the code.

It's not the refactoring because we must clone the batch if we're
not going to send it right now. Packets could be changed after that
and we must be sure that we will send the old version.

> I see that  redundant dp_packet_batch_reset_cutlen()  in netdev_send() is removed in the 2/4.
> 
>> +            dp_packet_batch_apply_cutlen(packets_);
> 
> [BHANU]  Seems fine, as all the redundant calls are removed in 3/4. 
> 
>> +
>> +#ifdef DPDK_NETDEV
>> +            if (OVS_UNLIKELY(!dp_packet_batch_is_empty(&p->output_pkts)
>> +                             && packets_->packets[0]->source
>> +                                != p->output_pkts.packets[0]->source)) {
>> +                /* XXX: netdev-dpdk assumes that all packets in a single
>> +                 *      outptut batch has the same source. Flush here to
> 
> [BHANU]  Typo with 'output'.

Thanks.

> 
>> +                 *      avoid memory access issues. */
>> +                dp_netdev_pmd_flush_output_on_port(pmd, p, now);
> 
> [BHANU] When would we hit this case?

Without time based batching we may have packets that matches different
rules. One packet can be cloned while processing and will have DPBUF_MALLOC
source while other will not be cloned after receiving and still will have
DPBUF_DPDK source. But netdev-dpdk checks the source only for the first packet
in batch and it may try to send malloced packet to dpdk port. This will
likely lead to segmentation fault.

With time based batching we may just have packets from different ports like
netdev-dpdk and netdev-linux.

> 
>> +            }
>> +#endif
>> +
>> +            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
>> +                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
>> +                /* Some packets was generated while input batch processing.
>> +                 * Flush here to avoid overflow. */
>> +                dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>>             }
>>
>> -            netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
>> -                        dynamic_txqs);
>> +            DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
>> +                dp_packet_batch_add(&p->output_pkts, packet);
>> +            }
>>             return;
>>         }
>>         break;
>> @@ -5188,7 +5248,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch
>> *packets_,
>>                 }
>>
>>                 (*depth)++;
>> -                dp_netdev_recirculate(pmd, packets_);
>> +                dp_netdev_recirculate(pmd, packets_, now);
>>                 (*depth)--;
>>                 return;
>>             }
>> @@ -5253,7 +5313,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch
>> *packets_,
>>             }
>>
>>             (*depth)++;
>> -            dp_netdev_recirculate(pmd, packets_);
>> +            dp_netdev_recirculate(pmd, packets_, now);
>>             (*depth)--;
>>
>>             return;
>> --
>> 2.7.4
> 
> 
> 
>

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index e2cd931..a2a25be 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -502,6 +502,7 @@  struct tx_port {
     int qid;
     long long last_used;
     struct hmap_node node;
+    struct dp_packet_batch output_pkts;
 };
 
 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
@@ -633,9 +634,10 @@  static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
                                       size_t actions_len,
                                       long long now);
 static void dp_netdev_input(struct dp_netdev_pmd_thread *,
-                            struct dp_packet_batch *, odp_port_t port_no);
+                            struct dp_packet_batch *, odp_port_t port_no,
+                            long long now);
 static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *,
-                                  struct dp_packet_batch *);
+                                  struct dp_packet_batch *, long long now);
 
 static void dp_netdev_disable_upcall(struct dp_netdev *);
 static void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
@@ -667,6 +669,9 @@  static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
 static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
                                        struct rxq_poll *poll)
     OVS_REQUIRES(pmd->port_mutex);
+static void
+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
+                                   long long now);
 static void reconfigure_datapath(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex);
 static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
@@ -2809,6 +2814,7 @@  dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_pmd_thread *pmd;
     struct dp_packet_batch pp;
+    long long now = time_msec();
 
     if (dp_packet_size(execute->packet) < ETH_HEADER_LEN ||
         dp_packet_size(execute->packet) > UINT16_MAX) {
@@ -2851,8 +2857,8 @@  dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
 
     dp_packet_batch_init_packet(&pp, execute->packet);
     dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
-                              execute->actions, execute->actions_len,
-                              time_msec());
+                              execute->actions, execute->actions_len, now);
+    dp_netdev_pmd_flush_output_packets(pmd, now);
 
     if (pmd->core_id == NON_PMD_CORE_ID) {
         ovs_mutex_unlock(&dp->non_pmd_mutex);
@@ -3101,6 +3107,37 @@  cycles_count_intermediate(struct dp_netdev_pmd_thread *pmd,
     non_atomic_ullong_add(&pmd->cycles.n[type], interval);
 }
 
+static void
+dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
+                                   struct tx_port *p, long long now)
+{
+    int tx_qid;
+    bool dynamic_txqs;
+
+    dynamic_txqs = p->port->dynamic_txqs;
+    if (dynamic_txqs) {
+        tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
+    } else {
+        tx_qid = pmd->static_tx_qid;
+    }
+
+    netdev_send(p->port->netdev, tx_qid, &p->output_pkts, true, dynamic_txqs);
+    dp_packet_batch_init(&p->output_pkts);
+}
+
+static void
+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
+                                   long long now)
+{
+    struct tx_port *p;
+
+    HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
+        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
+            dp_netdev_pmd_flush_output_on_port(pmd, p, now);
+        }
+    }
+}
+
 static int
 dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                            struct netdev_rxq *rx,
@@ -3113,10 +3150,13 @@  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
     dp_packet_batch_init(&batch);
     error = netdev_rxq_recv(rx, &batch);
     if (!error) {
+        long long now = time_msec();
+
         *recirc_depth_get() = 0;
 
         batch_cnt = batch.count;
-        dp_netdev_input(pmd, &batch, port_no);
+        dp_netdev_input(pmd, &batch, port_no, now);
+        dp_netdev_pmd_flush_output_packets(pmd, now);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
@@ -4481,6 +4521,7 @@  dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
 
     tx->port = port;
     tx->qid = -1;
+    dp_packet_batch_init(&tx->output_pkts);
 
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
     pmd->need_reload = true;
@@ -4901,7 +4942,8 @@  fast_path_processing(struct dp_netdev_pmd_thread *pmd,
 static void
 dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
                   struct dp_packet_batch *packets,
-                  bool md_is_valid, odp_port_t port_no)
+                  bool md_is_valid, odp_port_t port_no,
+                  long long now)
 {
     int cnt = packets->count;
 #if !defined(__CHECKER__) && !defined(_WIN32)
@@ -4913,7 +4955,6 @@  dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
     OVS_ALIGNED_VAR(CACHE_LINE_SIZE)
         struct netdev_flow_key keys[PKT_ARRAY_SIZE];
     struct packet_batch_per_flow batches[PKT_ARRAY_SIZE];
-    long long now = time_msec();
     size_t n_batches;
     odp_port_t in_port;
 
@@ -4949,16 +4990,16 @@  dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
 static void
 dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
                 struct dp_packet_batch *packets,
-                odp_port_t port_no)
+                odp_port_t port_no, long long now)
 {
-    dp_netdev_input__(pmd, packets, false, port_no);
+    dp_netdev_input__(pmd, packets, false, port_no, now);
 }
 
 static void
 dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd,
-                      struct dp_packet_batch *packets)
+                      struct dp_packet_batch *packets, long long now)
 {
-    dp_netdev_input__(pmd, packets, true, 0);
+    dp_netdev_input__(pmd, packets, true, 0, now);
 }
 
 struct dp_netdev_execute_aux {
@@ -5136,18 +5177,37 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
     case OVS_ACTION_ATTR_OUTPUT:
         p = pmd_send_port_cache_lookup(pmd, nl_attr_get_odp_port(a));
         if (OVS_LIKELY(p)) {
-            int tx_qid;
-            bool dynamic_txqs;
+            struct dp_packet *packet;
+            struct dp_packet_batch out;
 
-            dynamic_txqs = p->port->dynamic_txqs;
-            if (dynamic_txqs) {
-                tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
-            } else {
-                tx_qid = pmd->static_tx_qid;
+            if (!may_steal) {
+                dp_packet_batch_clone(&out, packets_);
+                dp_packet_batch_reset_cutlen(packets_);
+                packets_ = &out;
+            }
+            dp_packet_batch_apply_cutlen(packets_);
+
+#ifdef DPDK_NETDEV
+            if (OVS_UNLIKELY(!dp_packet_batch_is_empty(&p->output_pkts)
+                             && packets_->packets[0]->source
+                                != p->output_pkts.packets[0]->source)) {
+                /* XXX: netdev-dpdk assumes that all packets in a single
+                 *      outptut batch has the same source. Flush here to
+                 *      avoid memory access issues. */
+                dp_netdev_pmd_flush_output_on_port(pmd, p, now);
+            }
+#endif
+
+            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
+                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
+                /* Some packets was generated while input batch processing.
+                 * Flush here to avoid overflow. */
+                dp_netdev_pmd_flush_output_on_port(pmd, p, now);
             }
 
-            netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
-                        dynamic_txqs);
+            DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+                dp_packet_batch_add(&p->output_pkts, packet);
+            }
             return;
         }
         break;
@@ -5188,7 +5248,7 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 }
 
                 (*depth)++;
-                dp_netdev_recirculate(pmd, packets_);
+                dp_netdev_recirculate(pmd, packets_, now);
                 (*depth)--;
                 return;
             }
@@ -5253,7 +5313,7 @@  dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
             }
 
             (*depth)++;
-            dp_netdev_recirculate(pmd, packets_);
+            dp_netdev_recirculate(pmd, packets_, now);
             (*depth)--;
 
             return;