[ovs-dev,RFC,2/2] ingress scheduling: Provide per interface ingress priority

Message ID 1518814401-21311-3-git-send-email-billy.o.mahony@intel.com
State Superseded
Delegated to: Ian Stokes
Headers show
Series
  • Ingress Scheduling
Related show

Commit Message

O Mahony, Billy Feb. 16, 2018, 8:53 p.m.
Allow configuration to specify an ingress priority for interfaces.
Modify ovs-netdev datapath to act on this configuration so that packets
on interfaces with a higher priority will tend be processed ahead of
packets on lower priority interfaces.  This protects traffic on higher
priority interfaces from loss and latency as PMDs get overloaded.

Signed-off-by: Billy O'Mahony <billy.o.mahony@intel.com>
---
 include/openvswitch/ofp-parse.h |  3 ++
 lib/dpif-netdev.c               | 47 +++++++++++++++++++++---------
 lib/netdev-bsd.c                |  1 +
 lib/netdev-dpdk.c               | 64 +++++++++++++++++++++++++++++++++++++++--
 lib/netdev-dummy.c              |  1 +
 lib/netdev-linux.c              |  1 +
 lib/netdev-provider.h           | 11 ++++++-
 lib/netdev-vport.c              |  1 +
 lib/netdev.c                    | 23 +++++++++++++++
 lib/netdev.h                    |  2 ++
 vswitchd/bridge.c               |  2 ++
 11 files changed, 140 insertions(+), 16 deletions(-)

Comments

Ilya Maximets Feb. 20, 2018, 3:09 p.m. | #1
Not a full review.
Two general comments inline.

> Allow configuration to specify an ingress priority for interfaces.
> Modify ovs-netdev datapath to act on this configuration so that packets
> on interfaces with a higher priority will tend be processed ahead of
> packets on lower priority interfaces.  This protects traffic on higher
> priority interfaces from loss and latency as PMDs get overloaded.
> 
> Signed-off-by: Billy O'Mahony <billy.o.mahony at intel.com>
> ---
>  include/openvswitch/ofp-parse.h |  3 ++
>  lib/dpif-netdev.c               | 47 +++++++++++++++++++++---------
>  lib/netdev-bsd.c                |  1 +
>  lib/netdev-dpdk.c               | 64 +++++++++++++++++++++++++++++++++++++++--
>  lib/netdev-dummy.c              |  1 +
>  lib/netdev-linux.c              |  1 +
>  lib/netdev-provider.h           | 11 ++++++-
>  lib/netdev-vport.c              |  1 +
>  lib/netdev.c                    | 23 +++++++++++++++
>  lib/netdev.h                    |  2 ++
>  vswitchd/bridge.c               |  2 ++
>  11 files changed, 140 insertions(+), 16 deletions(-)
> 
> diff --git a/include/openvswitch/ofp-parse.h b/include/openvswitch/ofp-parse.h
> index 3fdd468..d77ab8f 100644
> --- a/include/openvswitch/ofp-parse.h
> +++ b/include/openvswitch/ofp-parse.h
> @@ -33,6 +33,9 @@ extern "C" {
>  struct match;
>  struct mf_field;
>  struct ofputil_port_map;
> +struct tun_table;
> +struct flow_wildcards;
> +struct ofputil_port_map;
>  
>  struct ofp_protocol {
>      const char *name;
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index d49c986..89d8229 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -42,6 +42,7 @@
>  #include "dpif.h"
>  #include "dpif-netdev-perf.h"
>  #include "dpif-provider.h"
> +#include "netdev-provider.h"
>  #include "dummy.h"
>  #include "fat-rwlock.h"
>  #include "flow.h"
> @@ -487,6 +488,7 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *);
>  struct polled_queue {
>      struct dp_netdev_rxq *rxq;
>      odp_port_t port_no;
> +    uint8_t priority;
>  };
>  
>  /* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
> @@ -626,6 +628,12 @@ struct dpif_netdev {
>      uint64_t last_port_seq;
>  };
>  
> +static void
> +dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
> +                           struct dp_netdev_rxq *rxq,
> +                           odp_port_t port_no,
> +                           unsigned int *rxd_cnt,
> +                           unsigned int *txd_cnt);
>  static int get_port_by_number(struct dp_netdev *dp, odp_port_t port_no,
>                                struct dp_netdev_port **portp)
>      OVS_REQUIRES(dp->port_mutex);
> @@ -3259,15 +3267,16 @@ dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
>      return output_cnt;
>  }
>  
> -static int
> +static void
>  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>                             struct dp_netdev_rxq *rxq,
> -                           odp_port_t port_no)
> +                           odp_port_t port_no,
> +                           unsigned int *rxd_cnt,
> +                           unsigned int *txd_cnt)
>  {
>      struct dp_packet_batch batch;
>      struct cycle_timer timer;
>      int error;
> -    int batch_cnt = 0, output_cnt = 0;
>      uint64_t cycles;
>  
>      /* Measure duration for polling and processing rx burst. */
> @@ -3279,17 +3288,17 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>      error = netdev_rxq_recv(rxq->rx, &batch);
>      if (!error) {
>          /* At least one packet received. */
> +        *rxd_cnt = batch.count;
>          *recirc_depth_get() = 0;
>          pmd_thread_ctx_time_update(pmd);
>  
> -        batch_cnt = batch.count;
>          dp_netdev_input(pmd, &batch, port_no);
>  
>          /* Assign processing cycles to rx queue. */
>          cycles = cycle_timer_stop(&pmd->perf_stats, &timer);
>          dp_netdev_rxq_add_cycles(rxq, RXQ_CYCLES_PROC_CURR, cycles);
>  
> -        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
> +        *txd_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
>      } else {
>          /* Discard cycles. */
>          cycle_timer_stop(&pmd->perf_stats, &timer);
> @@ -3299,11 +3308,11 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>              VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
>                      netdev_rxq_get_name(rxq->rx), ovs_strerror(error));
>          }
> +        *txd_cnt = 0;
>      }
>  
>      pmd->ctx.last_rxq = NULL;
>  
> -    return batch_cnt + output_cnt;
>  }
>  
>  static struct tx_port *
> @@ -3935,11 +3944,16 @@ dpif_netdev_run(struct dpif *dpif)
>          HMAP_FOR_EACH (port, node, &dp->ports) {
>              if (!netdev_is_pmd(port->netdev)) {
>                  int i;
> +                unsigned int rxd_cnt;
> +                unsigned int txd_cnt;
>  
>                  for (i = 0; i < port->n_rxq; i++) {
> -                    if (dp_netdev_process_rxq_port(non_pmd,
> -                                                   &port->rxqs[i],
> -                                                   port->port_no)) {
> +                    dp_netdev_process_rxq_port(non_pmd,
> +                                               &port->rxqs[i],
> +                                               port->port_no,
> +                                               &rxd_cnt,
> +                                               &txd_cnt);
> +                    if (rxd_cnt) {
>                          need_to_flush = false;
>                      }
>                  }
> @@ -4083,6 +4097,7 @@ pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
>      HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
>          poll_list[i].rxq = poll->rxq;
>          poll_list[i].port_no = poll->rxq->port->port_no;
> +        poll_list[i].priority = poll->rxq->rx->netdev->ingress_prio;
>          i++;
>      }
>  
> @@ -4104,7 +4119,6 @@ pmd_thread_main(void *f_)
>      bool exiting;
>      int poll_cnt;
>      int i;
> -    int process_packets = 0;
>  
>      poll_list = NULL;
>  
> @@ -4142,10 +4156,17 @@ reload:
>  
>          pmd_perf_start_iteration(s);
>          for (i = 0; i < poll_cnt; i++) {
> -            process_packets =
> +            unsigned int priority_max_reads = 1 + poll_list[i].priority * 10;
> +            unsigned int rxd_cnt;
> +            unsigned int txd_cnt;
> +
> +            do {
>                  dp_netdev_process_rxq_port(pmd, poll_list[i].rxq,
> -                                           poll_list[i].port_no);
> -            iter_packets += process_packets;
> +                                           poll_list[i].port_no,
> +                                           &rxd_cnt, &txd_cnt);
> +                iter_packets = iter_packets + rxd_cnt + txd_cnt;
> +                priority_max_reads--;
> +            } while (rxd_cnt >= NETDEV_MAX_BURST && priority_max_reads);
>          }
>  
>          if (!iter_packets) {
> diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
> index 05974c1..ddfbdf2 100644
> --- a/lib/netdev-bsd.c
> +++ b/lib/netdev-bsd.c
> @@ -1506,6 +1506,7 @@ netdev_bsd_update_flags(struct netdev *netdev_, enum netdev_flags off,
>      netdev_bsd_get_etheraddr,                        \
>      netdev_bsd_get_mtu,                              \
>      NULL, /* set_mtu */                              \
> +    NULL, /* set_ingress_sched */                    \
>      netdev_bsd_get_ifindex,                          \
>      netdev_bsd_get_carrier,                          \
>      NULL, /* get_carrier_resets */                   \
> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> index 94fb163..a17d021 100644
> --- a/lib/netdev-dpdk.c
> +++ b/lib/netdev-dpdk.c
> @@ -39,6 +39,8 @@
>  #include <rte_vhost.h>
>  #include <rte_version.h>
>  
> +#include <openvswitch/ofp-parse.h>
> +#include <openvswitch/ofp-util.h>
>  #include "dirs.h"
>  #include "dp-packet.h"
>  #include "dpdk.h"
> @@ -88,6 +90,7 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
>                                               + sizeof(struct dp_packet) \
>                                               + RTE_PKTMBUF_HEADROOM),   \
>                                               RTE_CACHE_LINE_SIZE)
> +#define MAX_PORT_PRIO               3
>  #define NETDEV_DPDK_MBUF_ALIGN      1024
>  #define NETDEV_DPDK_MAX_PKT_LEN     9728
>  
> @@ -442,6 +445,10 @@ struct netdev_dpdk {
>          int rte_xstats_ids_size;
>          uint64_t *rte_xstats_ids;
>      );
> +
> +    /* Ingress Scheduling config & state. */
> +    uint8_t ingress_prio;
> +    bool ingress_sched_changed;

Above variables must be protected somehow.
Possibly, by the 'dev->mutex'.

>  };
>  
>  struct netdev_rxq_dpdk {
> @@ -774,6 +781,13 @@ dpdk_eth_flow_ctrl_setup(struct netdev_dpdk *dev) OVS_REQUIRES(dev->mutex)
>      }
>  }
>  
> +static void
> +dpdk_apply_port_prioritization(struct netdev_dpdk *dev)
> +{
> +    dev->up.ingress_prio = dev->ingress_prio;
> +    dev->ingress_sched_changed = false;
> +}
> +
>  static int
>  dpdk_eth_dev_init(struct netdev_dpdk *dev)
>      OVS_REQUIRES(dev->mutex)
> @@ -808,6 +822,8 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev)
>          return -diag;
>      }
>  
> +    dpdk_apply_port_prioritization(dev);
> +
>      diag = rte_eth_dev_start(dev->port_id);
>      if (diag) {
>          VLOG_ERR("Interface %s start error: %s", dev->up.name,
> @@ -914,6 +930,8 @@ common_construct(struct netdev *netdev, dpdk_port_t port_no,
>      dev->requested_rxq_size = NIC_PORT_DEFAULT_RXQ_SIZE;
>      dev->requested_txq_size = NIC_PORT_DEFAULT_TXQ_SIZE;
>  
> +    dev->ingress_prio = 0;
> +    dev->ingress_sched_changed = false;
>      /* Initialize the flow control to NULL */
>      memset(&dev->fc_conf, 0, sizeof dev->fc_conf);
>  
> @@ -2213,6 +2231,43 @@ netdev_dpdk_set_mtu(struct netdev *netdev, int mtu)
>  }
>  
>  static int
> +netdev_dpdk_set_ingress_sched(struct netdev *netdev,
> +                              const struct smap *ingress_sched_smap)
> +{

I see nothing DPDK-specific here.
Can we just move this code to 'netdev_set_ingress_sched()' and provide
ingres scheduling for all the port types at once?
There are people here in mail-list trying to implement netdev-netmap.
It'll be cool if they will have this functionality out-of-the-box without
copying this code.

> +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
> +    char *mallocd_err_str;
> +    uint8_t port_prio;
> +    const char *port_prio_str = smap_get(ingress_sched_smap, "port_prio");
> +
> +    if (port_prio_str) {
> +        mallocd_err_str = str_to_u8(port_prio_str, "port_prio",
> +                                    &port_prio);
> +        if (mallocd_err_str) {
> +            VLOG_ERR ("%s while parsing Interface ingress_sched:prio for"
> +                      " '%s'", mallocd_err_str, netdev->name);
> +            free(mallocd_err_str);
> +            mallocd_err_str = NULL;
> +            return EINVAL;
> +        }
> +    } else {
> +        port_prio = 0;
> +    }
> +
> +    if (port_prio > MAX_PORT_PRIO) {
> +        port_prio = MAX_PORT_PRIO;
> +        VLOG_WARN ("Requested port_prio for '%s' exceeds max. Limiting to %d.",
> +                   netdev->name, MAX_PORT_PRIO);
> +    }
> +
> +    if (port_prio != dev->ingress_prio) {
> +        dev->ingress_prio = port_prio;
> +        dev->ingress_sched_changed = true;
> +        netdev_request_reconfigure(netdev);
> +    }
> +    return 0;
> +}
> +
> +static int
>  netdev_dpdk_get_carrier(const struct netdev *netdev, bool *carrier);
>  
>  static int
> @@ -3548,9 +3603,9 @@ netdev_dpdk_reconfigure(struct netdev *netdev)
>          && dev->mtu == dev->requested_mtu
>          && dev->rxq_size == dev->requested_rxq_size
>          && dev->txq_size == dev->requested_txq_size
> -        && dev->socket_id == dev->requested_socket_id) {
> +        && dev->socket_id == dev->requested_socket_id
> +        && !dev->ingress_sched_changed) {
>          /* Reconfiguration is unnecessary */
> -
>          goto out;
>      }
>  
> @@ -3637,6 +3692,10 @@ netdev_dpdk_vhost_client_reconfigure(struct netdev *netdev)
>  
>      ovs_mutex_lock(&dev->mutex);
>  
> +    if (dev->ingress_sched_changed) {
> +        dpdk_apply_port_prioritization(dev);
> +    }
> +
>      /* Configure vHost client mode if requested and if the following criteria
>       * are met:
>       *  1. Device hasn't been registered yet.
> @@ -3742,6 +3801,7 @@ unlock:
>      netdev_dpdk_get_etheraddr,                                \
>      netdev_dpdk_get_mtu,                                      \
>      netdev_dpdk_set_mtu,                                      \
> +    netdev_dpdk_set_ingress_sched,                            \
>      netdev_dpdk_get_ifindex,                                  \
>      GET_CARRIER,                                              \
>      netdev_dpdk_get_carrier_resets,                           \
> diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
> index 0d05759..d2ae6e3 100644
> --- a/lib/netdev-dummy.c
> +++ b/lib/netdev-dummy.c
> @@ -1415,6 +1415,7 @@ netdev_dummy_update_flags(struct netdev *netdev_,
>      netdev_dummy_get_etheraddr,                                 \
>      netdev_dummy_get_mtu,                                       \
>      netdev_dummy_set_mtu,                                       \
> +    NULL,                       /* set_ingress_sched */         \
>      netdev_dummy_get_ifindex,                                   \
>      NULL,                       /* get_carrier */               \
>      NULL,                       /* get_carrier_resets */        \
> diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
> index 4e0473c..997dc24 100644
> --- a/lib/netdev-linux.c
> +++ b/lib/netdev-linux.c
> @@ -2867,6 +2867,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
>      netdev_linux_get_etheraddr,                                 \
>      netdev_linux_get_mtu,                                       \
>      netdev_linux_set_mtu,                                       \
> +    NULL,                       /* set_ingress_sched */         \
>      netdev_linux_get_ifindex,                                   \
>      netdev_linux_get_carrier,                                   \
>      netdev_linux_get_carrier_resets,                            \
> diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
> index 25bd671..9f018e9 100644
> --- a/lib/netdev-provider.h
> +++ b/lib/netdev-provider.h
> @@ -78,7 +78,8 @@ struct netdev {
>       * modify them. */
>      int n_txq;
>      int n_rxq;
> -    struct shash_node *node;            /* Pointer to element in global map. */
> +    uint8_t ingress_prio;             /* 0 lowest to 3 highest. Default 0. */
> +    struct shash_node *node;          /* Pointer to element in global map. */
>      struct ovs_list saved_flags_list; /* Contains "struct netdev_saved_flags". */
>  };
>  
> @@ -412,6 +413,14 @@ struct netdev_class {
>       * null if it would always return EOPNOTSUPP. */
>      int (*set_mtu)(struct netdev *netdev, int mtu);
>  
> +    /* Sets 'netdev''s ingress scheduling policy.
> +     *
> +     * If 'netdev' does not support the specified policy then this function
> +     * should return EOPNOTSUPP.  This function may be set to null if it would
> +     * always return EOPNOTSUPP. */
> +    int (*set_ingress_sched)(struct netdev *netdev,
> +         const struct smap *ingress_sched_smap);
> +
>      /* Returns the ifindex of 'netdev', if successful, as a positive number.
>       * On failure, returns a negative errno value.
>       *
> diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
> index 52aa12d..b77b28f 100644
> --- a/lib/netdev-vport.c
> +++ b/lib/netdev-vport.c
> @@ -902,6 +902,7 @@ netdev_vport_get_ifindex(const struct netdev *netdev_)
>      netdev_vport_get_etheraddr,                             \
>      NULL,                       /* get_mtu */               \
>      NULL,                       /* set_mtu */               \
> +    NULL,                       /* set_ingress_sched */     \
>      GET_IFINDEX,                                            \
>      NULL,                       /* get_carrier */           \
>      NULL,                       /* get_carrier_resets */    \
> diff --git a/lib/netdev.c b/lib/netdev.c
> index be05dc6..2354723 100644
> --- a/lib/netdev.c
> +++ b/lib/netdev.c
> @@ -418,6 +418,7 @@ netdev_open(const char *name, const char *type, struct netdev **netdevp)
>                  /* By default enable one tx and rx queue per netdev. */
>                  netdev->n_txq = netdev->netdev_class->send ? 1 : 0;
>                  netdev->n_rxq = netdev->netdev_class->rxq_alloc ? 1 : 0;
> +                netdev->ingress_prio = 0;
>  
>                  ovs_list_init(&netdev->saved_flags_list);
>  
> @@ -977,6 +978,28 @@ netdev_mtu_is_user_config(struct netdev *netdev)
>      return netdev->mtu_user_config;
>  }
>  
> +/* Sets the Ingress Scheduling policy of 'netdev'.
> + *
> + * If successful, returns 0.  Returns EOPNOTSUPP if 'netdev' does not support
> + * the specified policy */
> +int
> +netdev_set_ingress_sched(struct netdev *netdev,
> +                         const struct smap *ingress_sched_smap)
> +{
> +    const struct netdev_class *class = netdev->netdev_class;
> +    int error;
> +
> +    error = class->set_ingress_sched ?
> +        class->set_ingress_sched(netdev, ingress_sched_smap) : EOPNOTSUPP;
> +    if (error && error != EOPNOTSUPP) {
> +        VLOG_DBG_RL(&rl, "failed to set ingress scheduling for network " \
> +                    "device %s: %s",
> +                    netdev_get_name(netdev), ovs_strerror(error));
> +    }
> +
> +    return error;
> +}
> +
>  /* Returns the ifindex of 'netdev', if successful, as a positive number.  On
>   * failure, returns a negative errno value.
>   *
> diff --git a/lib/netdev.h b/lib/netdev.h
> index ff1b604..d49ba91 100644
> --- a/lib/netdev.h
> +++ b/lib/netdev.h
> @@ -164,6 +164,8 @@ int netdev_get_mtu(const struct netdev *, int *mtup);
>  int netdev_set_mtu(struct netdev *, int mtu);
>  void netdev_mtu_user_config(struct netdev *, bool);
>  bool netdev_mtu_is_user_config(struct netdev *);
> +int netdev_set_ingress_sched(struct netdev *,
> +                             const struct smap *ingress_sched_smap);
>  int netdev_get_ifindex(const struct netdev *);
>  int netdev_set_tx_multiq(struct netdev *, unsigned int n_txq);
>  enum netdev_pt_mode netdev_get_pt_mode(const struct netdev *);
> diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
> index cf9c79f..7b86452 100644
> --- a/vswitchd/bridge.c
> +++ b/vswitchd/bridge.c
> @@ -830,6 +830,7 @@ bridge_delete_or_reconfigure_ports(struct bridge *br)
>          }
>  
>          iface_set_netdev_mtu(iface->cfg, iface->netdev);
> +        netdev_set_ingress_sched(iface->netdev, &iface->cfg->ingress_sched);
>  
>          /* If the requested OpenFlow port for 'iface' changed, and it's not
>           * already the correct port, then we might want to temporarily delete
> @@ -1793,6 +1794,7 @@ iface_do_create(const struct bridge *br,
>      }
>  
>      iface_set_netdev_mtu(iface_cfg, netdev);
> +    netdev_set_ingress_sched(netdev, &iface_cfg->ingress_sched);
>  
>      *ofp_portp = iface_pick_ofport(iface_cfg);
>      error = ofproto_port_add(br->ofproto, netdev, ofp_portp);
> -- 
> 2.7.4

Patch

diff --git a/include/openvswitch/ofp-parse.h b/include/openvswitch/ofp-parse.h
index 3fdd468..d77ab8f 100644
--- a/include/openvswitch/ofp-parse.h
+++ b/include/openvswitch/ofp-parse.h
@@ -33,6 +33,9 @@  extern "C" {
 struct match;
 struct mf_field;
 struct ofputil_port_map;
+struct tun_table;
+struct flow_wildcards;
+struct ofputil_port_map;
 
 struct ofp_protocol {
     const char *name;
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index d49c986..89d8229 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -42,6 +42,7 @@ 
 #include "dpif.h"
 #include "dpif-netdev-perf.h"
 #include "dpif-provider.h"
+#include "netdev-provider.h"
 #include "dummy.h"
 #include "fat-rwlock.h"
 #include "flow.h"
@@ -487,6 +488,7 @@  static void dp_netdev_actions_free(struct dp_netdev_actions *);
 struct polled_queue {
     struct dp_netdev_rxq *rxq;
     odp_port_t port_no;
+    uint8_t priority;
 };
 
 /* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
@@ -626,6 +628,12 @@  struct dpif_netdev {
     uint64_t last_port_seq;
 };
 
+static void
+dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
+                           struct dp_netdev_rxq *rxq,
+                           odp_port_t port_no,
+                           unsigned int *rxd_cnt,
+                           unsigned int *txd_cnt);
 static int get_port_by_number(struct dp_netdev *dp, odp_port_t port_no,
                               struct dp_netdev_port **portp)
     OVS_REQUIRES(dp->port_mutex);
@@ -3259,15 +3267,16 @@  dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
     return output_cnt;
 }
 
-static int
+static void
 dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                            struct dp_netdev_rxq *rxq,
-                           odp_port_t port_no)
+                           odp_port_t port_no,
+                           unsigned int *rxd_cnt,
+                           unsigned int *txd_cnt)
 {
     struct dp_packet_batch batch;
     struct cycle_timer timer;
     int error;
-    int batch_cnt = 0, output_cnt = 0;
     uint64_t cycles;
 
     /* Measure duration for polling and processing rx burst. */
@@ -3279,17 +3288,17 @@  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
     error = netdev_rxq_recv(rxq->rx, &batch);
     if (!error) {
         /* At least one packet received. */
+        *rxd_cnt = batch.count;
         *recirc_depth_get() = 0;
         pmd_thread_ctx_time_update(pmd);
 
-        batch_cnt = batch.count;
         dp_netdev_input(pmd, &batch, port_no);
 
         /* Assign processing cycles to rx queue. */
         cycles = cycle_timer_stop(&pmd->perf_stats, &timer);
         dp_netdev_rxq_add_cycles(rxq, RXQ_CYCLES_PROC_CURR, cycles);
 
-        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
+        *txd_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
     } else {
         /* Discard cycles. */
         cycle_timer_stop(&pmd->perf_stats, &timer);
@@ -3299,11 +3308,11 @@  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
             VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
                     netdev_rxq_get_name(rxq->rx), ovs_strerror(error));
         }
+        *txd_cnt = 0;
     }
 
     pmd->ctx.last_rxq = NULL;
 
-    return batch_cnt + output_cnt;
 }
 
 static struct tx_port *
@@ -3935,11 +3944,16 @@  dpif_netdev_run(struct dpif *dpif)
         HMAP_FOR_EACH (port, node, &dp->ports) {
             if (!netdev_is_pmd(port->netdev)) {
                 int i;
+                unsigned int rxd_cnt;
+                unsigned int txd_cnt;
 
                 for (i = 0; i < port->n_rxq; i++) {
-                    if (dp_netdev_process_rxq_port(non_pmd,
-                                                   &port->rxqs[i],
-                                                   port->port_no)) {
+                    dp_netdev_process_rxq_port(non_pmd,
+                                               &port->rxqs[i],
+                                               port->port_no,
+                                               &rxd_cnt,
+                                               &txd_cnt);
+                    if (rxd_cnt) {
                         need_to_flush = false;
                     }
                 }
@@ -4083,6 +4097,7 @@  pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
     HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
         poll_list[i].rxq = poll->rxq;
         poll_list[i].port_no = poll->rxq->port->port_no;
+        poll_list[i].priority = poll->rxq->rx->netdev->ingress_prio;
         i++;
     }
 
@@ -4104,7 +4119,6 @@  pmd_thread_main(void *f_)
     bool exiting;
     int poll_cnt;
     int i;
-    int process_packets = 0;
 
     poll_list = NULL;
 
@@ -4142,10 +4156,17 @@  reload:
 
         pmd_perf_start_iteration(s);
         for (i = 0; i < poll_cnt; i++) {
-            process_packets =
+            unsigned int priority_max_reads = 1 + poll_list[i].priority * 10;
+            unsigned int rxd_cnt;
+            unsigned int txd_cnt;
+
+            do {
                 dp_netdev_process_rxq_port(pmd, poll_list[i].rxq,
-                                           poll_list[i].port_no);
-            iter_packets += process_packets;
+                                           poll_list[i].port_no,
+                                           &rxd_cnt, &txd_cnt);
+                iter_packets = iter_packets + rxd_cnt + txd_cnt;
+                priority_max_reads--;
+            } while (rxd_cnt >= NETDEV_MAX_BURST && priority_max_reads);
         }
 
         if (!iter_packets) {
diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
index 05974c1..ddfbdf2 100644
--- a/lib/netdev-bsd.c
+++ b/lib/netdev-bsd.c
@@ -1506,6 +1506,7 @@  netdev_bsd_update_flags(struct netdev *netdev_, enum netdev_flags off,
     netdev_bsd_get_etheraddr,                        \
     netdev_bsd_get_mtu,                              \
     NULL, /* set_mtu */                              \
+    NULL, /* set_ingress_sched */                    \
     netdev_bsd_get_ifindex,                          \
     netdev_bsd_get_carrier,                          \
     NULL, /* get_carrier_resets */                   \
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 94fb163..a17d021 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -39,6 +39,8 @@ 
 #include <rte_vhost.h>
 #include <rte_version.h>
 
+#include <openvswitch/ofp-parse.h>
+#include <openvswitch/ofp-util.h>
 #include "dirs.h"
 #include "dp-packet.h"
 #include "dpdk.h"
@@ -88,6 +90,7 @@  static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
                                              + sizeof(struct dp_packet) \
                                              + RTE_PKTMBUF_HEADROOM),   \
                                              RTE_CACHE_LINE_SIZE)
+#define MAX_PORT_PRIO               3
 #define NETDEV_DPDK_MBUF_ALIGN      1024
 #define NETDEV_DPDK_MAX_PKT_LEN     9728
 
@@ -442,6 +445,10 @@  struct netdev_dpdk {
         int rte_xstats_ids_size;
         uint64_t *rte_xstats_ids;
     );
+
+    /* Ingress Scheduling config & state. */
+    uint8_t ingress_prio;
+    bool ingress_sched_changed;
 };
 
 struct netdev_rxq_dpdk {
@@ -774,6 +781,13 @@  dpdk_eth_flow_ctrl_setup(struct netdev_dpdk *dev) OVS_REQUIRES(dev->mutex)
     }
 }
 
+static void
+dpdk_apply_port_prioritization(struct netdev_dpdk *dev)
+{
+    dev->up.ingress_prio = dev->ingress_prio;
+    dev->ingress_sched_changed = false;
+}
+
 static int
 dpdk_eth_dev_init(struct netdev_dpdk *dev)
     OVS_REQUIRES(dev->mutex)
@@ -808,6 +822,8 @@  dpdk_eth_dev_init(struct netdev_dpdk *dev)
         return -diag;
     }
 
+    dpdk_apply_port_prioritization(dev);
+
     diag = rte_eth_dev_start(dev->port_id);
     if (diag) {
         VLOG_ERR("Interface %s start error: %s", dev->up.name,
@@ -914,6 +930,8 @@  common_construct(struct netdev *netdev, dpdk_port_t port_no,
     dev->requested_rxq_size = NIC_PORT_DEFAULT_RXQ_SIZE;
     dev->requested_txq_size = NIC_PORT_DEFAULT_TXQ_SIZE;
 
+    dev->ingress_prio = 0;
+    dev->ingress_sched_changed = false;
     /* Initialize the flow control to NULL */
     memset(&dev->fc_conf, 0, sizeof dev->fc_conf);
 
@@ -2213,6 +2231,43 @@  netdev_dpdk_set_mtu(struct netdev *netdev, int mtu)
 }
 
 static int
+netdev_dpdk_set_ingress_sched(struct netdev *netdev,
+                              const struct smap *ingress_sched_smap)
+{
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    char *mallocd_err_str;
+    uint8_t port_prio;
+    const char *port_prio_str = smap_get(ingress_sched_smap, "port_prio");
+
+    if (port_prio_str) {
+        mallocd_err_str = str_to_u8(port_prio_str, "port_prio",
+                                    &port_prio);
+        if (mallocd_err_str) {
+            VLOG_ERR ("%s while parsing Interface ingress_sched:prio for"
+                      " '%s'", mallocd_err_str, netdev->name);
+            free(mallocd_err_str);
+            mallocd_err_str = NULL;
+            return EINVAL;
+        }
+    } else {
+        port_prio = 0;
+    }
+
+    if (port_prio > MAX_PORT_PRIO) {
+        port_prio = MAX_PORT_PRIO;
+        VLOG_WARN ("Requested port_prio for '%s' exceeds max. Limiting to %d.",
+                   netdev->name, MAX_PORT_PRIO);
+    }
+
+    if (port_prio != dev->ingress_prio) {
+        dev->ingress_prio = port_prio;
+        dev->ingress_sched_changed = true;
+        netdev_request_reconfigure(netdev);
+    }
+    return 0;
+}
+
+static int
 netdev_dpdk_get_carrier(const struct netdev *netdev, bool *carrier);
 
 static int
@@ -3548,9 +3603,9 @@  netdev_dpdk_reconfigure(struct netdev *netdev)
         && dev->mtu == dev->requested_mtu
         && dev->rxq_size == dev->requested_rxq_size
         && dev->txq_size == dev->requested_txq_size
-        && dev->socket_id == dev->requested_socket_id) {
+        && dev->socket_id == dev->requested_socket_id
+        && !dev->ingress_sched_changed) {
         /* Reconfiguration is unnecessary */
-
         goto out;
     }
 
@@ -3637,6 +3692,10 @@  netdev_dpdk_vhost_client_reconfigure(struct netdev *netdev)
 
     ovs_mutex_lock(&dev->mutex);
 
+    if (dev->ingress_sched_changed) {
+        dpdk_apply_port_prioritization(dev);
+    }
+
     /* Configure vHost client mode if requested and if the following criteria
      * are met:
      *  1. Device hasn't been registered yet.
@@ -3742,6 +3801,7 @@  unlock:
     netdev_dpdk_get_etheraddr,                                \
     netdev_dpdk_get_mtu,                                      \
     netdev_dpdk_set_mtu,                                      \
+    netdev_dpdk_set_ingress_sched,                            \
     netdev_dpdk_get_ifindex,                                  \
     GET_CARRIER,                                              \
     netdev_dpdk_get_carrier_resets,                           \
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index 0d05759..d2ae6e3 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -1415,6 +1415,7 @@  netdev_dummy_update_flags(struct netdev *netdev_,
     netdev_dummy_get_etheraddr,                                 \
     netdev_dummy_get_mtu,                                       \
     netdev_dummy_set_mtu,                                       \
+    NULL,                       /* set_ingress_sched */         \
     netdev_dummy_get_ifindex,                                   \
     NULL,                       /* get_carrier */               \
     NULL,                       /* get_carrier_resets */        \
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index 4e0473c..997dc24 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -2867,6 +2867,7 @@  netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
     netdev_linux_get_etheraddr,                                 \
     netdev_linux_get_mtu,                                       \
     netdev_linux_set_mtu,                                       \
+    NULL,                       /* set_ingress_sched */         \
     netdev_linux_get_ifindex,                                   \
     netdev_linux_get_carrier,                                   \
     netdev_linux_get_carrier_resets,                            \
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index 25bd671..9f018e9 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -78,7 +78,8 @@  struct netdev {
      * modify them. */
     int n_txq;
     int n_rxq;
-    struct shash_node *node;            /* Pointer to element in global map. */
+    uint8_t ingress_prio;             /* 0 lowest to 3 highest. Default 0. */
+    struct shash_node *node;          /* Pointer to element in global map. */
     struct ovs_list saved_flags_list; /* Contains "struct netdev_saved_flags". */
 };
 
@@ -412,6 +413,14 @@  struct netdev_class {
      * null if it would always return EOPNOTSUPP. */
     int (*set_mtu)(struct netdev *netdev, int mtu);
 
+    /* Sets 'netdev''s ingress scheduling policy.
+     *
+     * If 'netdev' does not support the specified policy then this function
+     * should return EOPNOTSUPP.  This function may be set to null if it would
+     * always return EOPNOTSUPP. */
+    int (*set_ingress_sched)(struct netdev *netdev,
+         const struct smap *ingress_sched_smap);
+
     /* Returns the ifindex of 'netdev', if successful, as a positive number.
      * On failure, returns a negative errno value.
      *
diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
index 52aa12d..b77b28f 100644
--- a/lib/netdev-vport.c
+++ b/lib/netdev-vport.c
@@ -902,6 +902,7 @@  netdev_vport_get_ifindex(const struct netdev *netdev_)
     netdev_vport_get_etheraddr,                             \
     NULL,                       /* get_mtu */               \
     NULL,                       /* set_mtu */               \
+    NULL,                       /* set_ingress_sched */     \
     GET_IFINDEX,                                            \
     NULL,                       /* get_carrier */           \
     NULL,                       /* get_carrier_resets */    \
diff --git a/lib/netdev.c b/lib/netdev.c
index be05dc6..2354723 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -418,6 +418,7 @@  netdev_open(const char *name, const char *type, struct netdev **netdevp)
                 /* By default enable one tx and rx queue per netdev. */
                 netdev->n_txq = netdev->netdev_class->send ? 1 : 0;
                 netdev->n_rxq = netdev->netdev_class->rxq_alloc ? 1 : 0;
+                netdev->ingress_prio = 0;
 
                 ovs_list_init(&netdev->saved_flags_list);
 
@@ -977,6 +978,28 @@  netdev_mtu_is_user_config(struct netdev *netdev)
     return netdev->mtu_user_config;
 }
 
+/* Sets the Ingress Scheduling policy of 'netdev'.
+ *
+ * If successful, returns 0.  Returns EOPNOTSUPP if 'netdev' does not support
+ * the specified policy */
+int
+netdev_set_ingress_sched(struct netdev *netdev,
+                         const struct smap *ingress_sched_smap)
+{
+    const struct netdev_class *class = netdev->netdev_class;
+    int error;
+
+    error = class->set_ingress_sched ?
+        class->set_ingress_sched(netdev, ingress_sched_smap) : EOPNOTSUPP;
+    if (error && error != EOPNOTSUPP) {
+        VLOG_DBG_RL(&rl, "failed to set ingress scheduling for network " \
+                    "device %s: %s",
+                    netdev_get_name(netdev), ovs_strerror(error));
+    }
+
+    return error;
+}
+
 /* Returns the ifindex of 'netdev', if successful, as a positive number.  On
  * failure, returns a negative errno value.
  *
diff --git a/lib/netdev.h b/lib/netdev.h
index ff1b604..d49ba91 100644
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -164,6 +164,8 @@  int netdev_get_mtu(const struct netdev *, int *mtup);
 int netdev_set_mtu(struct netdev *, int mtu);
 void netdev_mtu_user_config(struct netdev *, bool);
 bool netdev_mtu_is_user_config(struct netdev *);
+int netdev_set_ingress_sched(struct netdev *,
+                             const struct smap *ingress_sched_smap);
 int netdev_get_ifindex(const struct netdev *);
 int netdev_set_tx_multiq(struct netdev *, unsigned int n_txq);
 enum netdev_pt_mode netdev_get_pt_mode(const struct netdev *);
diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
index cf9c79f..7b86452 100644
--- a/vswitchd/bridge.c
+++ b/vswitchd/bridge.c
@@ -830,6 +830,7 @@  bridge_delete_or_reconfigure_ports(struct bridge *br)
         }
 
         iface_set_netdev_mtu(iface->cfg, iface->netdev);
+        netdev_set_ingress_sched(iface->netdev, &iface->cfg->ingress_sched);
 
         /* If the requested OpenFlow port for 'iface' changed, and it's not
          * already the correct port, then we might want to temporarily delete
@@ -1793,6 +1794,7 @@  iface_do_create(const struct bridge *br,
     }
 
     iface_set_netdev_mtu(iface_cfg, netdev);
+    netdev_set_ingress_sched(netdev, &iface_cfg->ingress_sched);
 
     *ofp_portp = iface_pick_ofport(iface_cfg);
     error = ofproto_port_add(br->ofproto, netdev, ofp_portp);