@@ -498,6 +498,7 @@ struct tx_port {
int qid;
long long last_used;
struct hmap_node node;
+ struct dp_packet_batch output_pkts;
};
/* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate
@@ -629,9 +630,10 @@ static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
size_t actions_len,
long long now);
static void dp_netdev_input(struct dp_netdev_pmd_thread *,
- struct dp_packet_batch *, odp_port_t port_no);
+ struct dp_packet_batch *, odp_port_t port_no,
+ long long now);
static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *,
- struct dp_packet_batch *);
+ struct dp_packet_batch *, long long now);
static void dp_netdev_disable_upcall(struct dp_netdev *);
static void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
@@ -661,6 +663,9 @@ static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
struct rxq_poll *poll)
OVS_REQUIRES(pmd->port_mutex);
+static void
+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
+ long long now);
static void reconfigure_datapath(struct dp_netdev *dp)
OVS_REQUIRES(dp->port_mutex);
static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
@@ -2794,6 +2799,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_pmd_thread *pmd;
struct dp_packet_batch pp;
+ long long now = time_msec();
if (dp_packet_size(execute->packet) < ETH_HEADER_LEN ||
dp_packet_size(execute->packet) > UINT16_MAX) {
@@ -2836,8 +2842,8 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
dp_packet_batch_init_packet(&pp, execute->packet);
dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
- execute->actions, execute->actions_len,
- time_msec());
+ execute->actions, execute->actions_len, now);
+ dp_netdev_pmd_flush_output_packets(pmd, now);
if (pmd->core_id == NON_PMD_CORE_ID) {
ovs_mutex_unlock(&dp->non_pmd_mutex);
@@ -3086,6 +3092,37 @@ cycles_count_intermediate(struct dp_netdev_pmd_thread *pmd,
non_atomic_ullong_add(&pmd->cycles.n[type], interval);
}
+static void
+dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
+ struct tx_port *p, long long now)
+{
+ int tx_qid;
+ bool dynamic_txqs;
+
+ dynamic_txqs = p->port->dynamic_txqs;
+ if (dynamic_txqs) {
+ tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
+ } else {
+ tx_qid = pmd->static_tx_qid;
+ }
+
+ netdev_send(p->port->netdev, tx_qid, &p->output_pkts, true, dynamic_txqs);
+ dp_packet_batch_init(&p->output_pkts);
+}
+
+static void
+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
+ long long now)
+{
+ struct tx_port *p;
+
+ HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
+ if (!dp_packet_batch_is_empty(&p->output_pkts)) {
+ dp_netdev_pmd_flush_output_on_port(pmd, p, now);
+ }
+ }
+}
+
static int
dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
struct netdev_rxq *rx,
@@ -3098,10 +3135,13 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
dp_packet_batch_init(&batch);
error = netdev_rxq_recv(rx, &batch);
if (!error) {
+ long long now = time_msec();
+
*recirc_depth_get() = 0;
batch_cnt = batch.count;
- dp_netdev_input(pmd, &batch, port_no);
+ dp_netdev_input(pmd, &batch, port_no, now);
+ dp_netdev_pmd_flush_output_packets(pmd, now);
} else if (error != EAGAIN && error != EOPNOTSUPP) {
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
@@ -4378,6 +4418,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
tx->port = port;
tx->qid = -1;
+ dp_packet_batch_init(&tx->output_pkts);
hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
pmd->need_reload = true;
@@ -4798,7 +4839,8 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
static void
dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
struct dp_packet_batch *packets,
- bool md_is_valid, odp_port_t port_no)
+ bool md_is_valid, odp_port_t port_no,
+ long long now)
{
int cnt = packets->count;
#if !defined(__CHECKER__) && !defined(_WIN32)
@@ -4810,7 +4852,6 @@ dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
OVS_ALIGNED_VAR(CACHE_LINE_SIZE)
struct netdev_flow_key keys[PKT_ARRAY_SIZE];
struct packet_batch_per_flow batches[PKT_ARRAY_SIZE];
- long long now = time_msec();
size_t n_batches;
odp_port_t in_port;
@@ -4846,16 +4887,16 @@ dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
static void
dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
struct dp_packet_batch *packets,
- odp_port_t port_no)
+ odp_port_t port_no, long long now)
{
- dp_netdev_input__(pmd, packets, false, port_no);
+ dp_netdev_input__(pmd, packets, false, port_no, now);
}
static void
dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd,
- struct dp_packet_batch *packets)
+ struct dp_packet_batch *packets, long long now)
{
- dp_netdev_input__(pmd, packets, true, 0);
+ dp_netdev_input__(pmd, packets, true, 0, now);
}
struct dp_netdev_execute_aux {
@@ -5033,18 +5074,37 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
case OVS_ACTION_ATTR_OUTPUT:
p = pmd_send_port_cache_lookup(pmd, nl_attr_get_odp_port(a));
if (OVS_LIKELY(p)) {
- int tx_qid;
- bool dynamic_txqs;
+ struct dp_packet *packet;
+ struct dp_packet_batch out;
- dynamic_txqs = p->port->dynamic_txqs;
- if (dynamic_txqs) {
- tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
- } else {
- tx_qid = pmd->static_tx_qid;
+ if (!may_steal) {
+ dp_packet_batch_clone(&out, packets_);
+ dp_packet_batch_reset_cutlen(packets_);
+ packets_ = &out;
+ }
+ dp_packet_batch_apply_cutlen(packets_);
+
+#ifdef DPDK_NETDEV
+ if (OVS_UNLIKELY(!dp_packet_batch_is_empty(&p->output_pkts)
+ && packets_->packets[0]->source
+ != p->output_pkts.packets[0]->source)) {
+ /* XXX: netdev-dpdk assumes that all packets in a single
+ * output batch have the same source. Flush here to
+ * avoid memory access issues. */
+ dp_netdev_pmd_flush_output_on_port(pmd, p, now);
+ }
+#endif
+
+ if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
+ + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
+ /* Some packets were generated while processing the input batch.
+ * Flush here to avoid overflow. */
+ dp_netdev_pmd_flush_output_on_port(pmd, p, now);
}
- netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
- dynamic_txqs);
+ DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
+ dp_packet_batch_add(&p->output_pkts, packet);
+ }
return;
}
break;
@@ -5085,7 +5145,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
}
(*depth)++;
- dp_netdev_recirculate(pmd, packets_);
+ dp_netdev_recirculate(pmd, packets_, now);
(*depth)--;
return;
}
@@ -5150,7 +5210,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
}
(*depth)++;
- dp_netdev_recirculate(pmd, packets_);
+ dp_netdev_recirculate(pmd, packets_, now);
(*depth)--;
return;
While processing an incoming batch of packets, they are scattered across many per-flow batches and sent separately. This becomes an issue when using more than a few flows.

For example, with balanced-tcp OvS bonding over 2 ports there will be 256 datapath internal flows for each dp_hash pattern. A single received batch is then scattered across all 256 of those per-flow batches, and send is invoked for each packet separately. This behaviour greatly degrades the overall performance of netdev_send because it cannot take advantage of vectorized transmit functions.

However, half of the datapath flows (with 2 ports in the bond) will have the same output actions. This means we can collect those packets back in a single place and send them at once using a single call to netdev_send. This patch introduces a per-port batch for output packets for that purpose. The 'output_pkts' batch is thread-local and located in the send port cache.

Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---
 lib/dpif-netdev.c | 104 ++++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 82 insertions(+), 22 deletions(-)
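For reviewers, here is a minimal standalone sketch of the batching idea. The types 'struct pkt' and 'struct out_port', the helpers, and the MAX_BURST stand-in for NETDEV_MAX_BURST are simplified assumptions for illustration only, not the actual dpif-netdev structures; it shows why queuing per output port turns N netdev_send calls into one burst.

/* Illustrative only: simplified stand-ins for the dpif-netdev types. */
#include <stdio.h>

#define MAX_BURST 32                /* stand-in for NETDEV_MAX_BURST. */

struct pkt {
    int id;
};

struct out_port {
    const char *name;
    struct pkt *batch[MAX_BURST];   /* Per-port output batch. */
    int count;
};

/* One send call per accumulated batch instead of one per packet. */
static void
flush_port(struct out_port *p)
{
    if (p->count) {
        printf("send(%s): burst of %d packets\n", p->name, p->count);
        p->count = 0;
    }
}

/* OUTPUT action: queue the packet, flushing first if the batch is full. */
static void
output_pkt(struct out_port *p, struct pkt *pkt)
{
    if (p->count == MAX_BURST) {
        flush_port(p);
    }
    p->batch[p->count++] = pkt;
}

int
main(void)
{
    struct out_port port = { .name = "dpdk0", .count = 0 };
    struct pkt pkts[48];
    int i;

    /* Packets from many different per-flow batches that all target the
     * same output port accumulate in one place... */
    for (i = 0; i < 48; i++) {
        pkts[i].id = i;
        output_pkt(&port, &pkts[i]);
    }

    /* ...and are flushed once per received batch, as
     * dp_netdev_pmd_flush_output_packets() does in the patch. */
    flush_port(&port);
    return 0;
}

As in the patch itself, the flush happens once per received batch (from dp_netdev_process_rxq_port), so no packet is delayed longer than the processing of one input batch.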