[ovs-dev,1/5] dpif-netdev: Rework rxq scheduling code.

Message ID 20210604211856.915563-2-ktraynor@redhat.com
State New
Series Rxq scheduling updates.

Commit Message

Kevin Traynor June 4, 2021, 9:18 p.m. UTC
This reworks the current rxq scheduling code to break it into more
generic and reusable pieces.

The behaviour does not change from a user perspective, except that the
logs are updated to be more consistent.

From an implementation view, there are some changes made with a view to
adding functionality and enabling reuse in later patches.

The high level reusable functions added in this patch are:
- Generate a list of current numas and pmds
- Perform rxq scheduling into the list
- Effect the rxq scheduling assignments so they are used

The rxq scheduling is updated to handle both pinned and non-pinned rxqs
in the same call.
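
For reference, the new top-level entry point in the patch below composes
these pieces as follows:

    static void
    rxq_scheduling(struct dp_netdev *dp) OVS_REQUIRES(dp->port_mutex)
    {
        struct sched_numa_list *numa_list;
        bool algo = dp->pmd_rxq_assign_cyc;

        numa_list = xzalloc(sizeof *numa_list);

        sched_numa_list_populate(numa_list, dp);
        sched_numa_list_schedule(numa_list, dp, algo, VLL_INFO);
        sched_numa_list_put_in_place(numa_list);

        sched_numa_list_free_entries(numa_list);
        free(numa_list);
    }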

Signed-off-by: Kevin Traynor <ktraynor@redhat.com>
---
 lib/dpif-netdev.c | 538 ++++++++++++++++++++++++++++++++++++++--------
 tests/pmd.at      |   2 +-
 2 files changed, 446 insertions(+), 94 deletions(-)

Comments

Pai G, Sunil June 22, 2021, 7:02 p.m. UTC | #1
Hey Kevin,

Patch looks good to me.

Builds fine, and all test cases here http://patchwork.ozlabs.org/project/openvswitch/patch/20210316154532.127858-1-ktraynor@redhat.com/ pass as well.

Some minor nits inline:

<snipped>
 
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 650e67ab3..57d23e112 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -5006,4 +5006,211 @@ rr_numa_list_destroy(struct rr_numa_list *rr)
>  }
> 
> +struct sched_numa_list {
> +    struct hmap numas;  /* Contains 'struct sched_numa'. */
> +};
> +
> +/* Meta data for out-of-place pmd rxq assignments. */
> +struct sched_pmd {
> +    /* Associated PMD thread. */
> +    struct dp_netdev_pmd_thread *pmd;
> +    uint64_t pmd_proc_cycles;

Is there a purpose to storing pmd_proc_cycles? Just curious.

> +    struct dp_netdev_rxq **rxqs;
> +    unsigned n_rxq;
> +    bool isolated;
> +};
> +
> +struct sched_numa {
> +    struct hmap_node node;
> +    int numa_id;
> +    /* PMDs on numa node. */
> +    struct sched_pmd *pmds;
> +    /* Num of PMDs on numa node. */
> +    unsigned n_pmds;
> +    /* Num of isolated PMDs on numa node. */
> +    unsigned n_iso;

'iso' is a bit cryptic; maybe something like n_isolated would be better?


> +    int rr_cur_index;
> +    bool rr_idx_inc;
> +};
> +

<snipped>

> +static unsigned
> +sched_get_numa_pmd_noniso(struct sched_numa *numa)
> +{
> +    if (numa->n_pmds > numa->n_iso) {
> +        return numa->n_pmds - numa->n_iso;
> +    }
> +    return 0;
> +}
> +
>  /* Sort Rx Queues by the processing cycles they are consuming. */
>  static int
> @@ -5037,22 +5244,106 @@ compare_rxq_cycles(const void *a, const void *b)
>  }
> 
> -/* Assign pmds to queues.  If 'pinned' is true, assign pmds to pinned
> - * queues and marks the pmds as isolated.  Otherwise, assign non isolated
> - * pmds to unpinned queues.
> +/*
> + * Returns the next pmd from the numa node.
>   *
> - * The function doesn't touch the pmd threads, it just stores the assignment
> - * in the 'pmd' member of each rxq. */
> + * If 'updown' is 'true' it will alternate between selecting the next pmd in
> + * either an up or down walk, switching between up/down when the first or last
> + * core is reached. e.g. 1,2,3,3,2,1,1,2...
> + *
> + * If 'updown' is 'false' it will select the next pmd wrapping around when
> + * last core reached. e.g. 1,2,3,1,2,3,1,2...
> + */
> +static struct sched_pmd *
> +get_rr_pmd(struct sched_numa *numa, bool updown)
> +{
> +    int numa_idx = numa->rr_cur_index;
> +
> +    if (numa->rr_idx_inc == true) {
> +        /* Incrementing through list of pmds. */
> +        if (numa->rr_cur_index == numa->n_pmds - 1) {
> +            /* Reached the last pmd. */
> +            if (updown) {
> +                numa->rr_idx_inc = false;
> +            } else {
> +                numa->rr_cur_index = 0;
> +            }
> +        } else {
> +            numa->rr_cur_index++;
> +        }
> +    } else {
> +        /* Decrementing through list of pmds. */
> +        if (numa->rr_cur_index == 0) {
> +            /* Reached the first pmd. */
> +            numa->rr_idx_inc = true;
> +        } else {
> +            numa->rr_cur_index--;
> +        }
> +    }
> +    return &numa->pmds[numa_idx];
> +}
> +
> +static struct sched_pmd *
> +get_available_rr_pmd(struct sched_numa *numa, bool updown)
> +{
> +    struct sched_pmd *sched_pmd = NULL;
> +
> +    /* get_rr_pmd() may return duplicate PMDs before all PMDs have been
> +     * returned depending on updown. Extend the number of calls to ensure all
> +     * PMDs can be checked. */
> +    for (unsigned i = 0; i < numa->n_pmds * 2; i++) {
> +        sched_pmd = get_rr_pmd(numa, updown);
> +        if (!sched_pmd->isolated) {
> +            break;
> +        }
> +        sched_pmd = NULL;
> +    }
> +    return sched_pmd;
> +}
> +
> +static struct sched_pmd *
> +get_next_pmd(struct sched_numa *numa, bool algo)
> +{
> +    return get_available_rr_pmd(numa, algo);
> +}

Just checking:
if algo is cycles-based, then updown = true; otherwise use roundrobin?

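For context: in the patch, rxq_scheduling() sets 'bool algo =
dp->pmd_rxq_assign_cyc', and get_next_pmd() forwards it to
get_available_rr_pmd() as 'updown', so cycles-based assignment uses the
up/down walk and roundrobin wraps around. A standalone sketch (illustration
only, not part of the patch) that reproduces the index walk of get_rr_pmd()
for n_pmds = 3:

    #include <stdbool.h>
    #include <stdio.h>

    int
    main(void)
    {
        for (int updown = 0; updown <= 1; updown++) {
            int cur = 0;        /* rr_cur_index */
            bool inc = true;    /* rr_idx_inc */

            printf("updown=%d:", updown);
            for (int i = 0; i < 8; i++) {
                int idx = cur;

                if (inc) {
                    if (cur == 3 - 1) {
                        /* Reached the last pmd. */
                        if (updown) {
                            inc = false;
                        } else {
                            cur = 0;
                        }
                    } else {
                        cur++;
                    }
                } else if (cur == 0) {
                    /* Reached the first pmd. */
                    inc = true;
                } else {
                    cur--;
                }
                printf(" %d", idx);
            }
            printf("\n");
        }
        return 0;
    }

    /* Prints:
     * updown=0: 0 1 2 0 1 2 0 1
     * updown=1: 0 1 2 2 1 0 0 1
     */
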
<snipped>

> +
>  static void
> -rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
> +sched_numa_list_schedule(struct sched_numa_list *numa_list,
> +                         struct dp_netdev *dp,
> +                         bool algo,

algo could be renamed to something like is_rxq_cyc/is_round_robin? A bit
verbose, but algo seems a bit generic.
It seems to be used throughout the patch.

> +                         enum vlog_level level)
> +    OVS_REQUIRES(dp->port_mutex)

<snipped>
David Marchand June 23, 2021, 1:04 p.m. UTC | #2
On Fri, Jun 4, 2021 at 11:19 PM Kevin Traynor <ktraynor@redhat.com> wrote:
>
> This reworks the current rxq scheduling code to break it into more
> generic and reusable pieces.
>
> The behaviour does not change from a user perspective, except that the
> logs are updated to be more consistent.
>
> From an implementation view, there are some changes made with a view to
> adding functionality and enabling reuse in later patches.
>
> The high level reusable functions added in this patch are:
> - Generate a list of current numas and pmds
> - Perform rxq scheduling into the list
> - Effect the rxq scheduling assignments so they are used
>
> The rxq scheduling is updated to handle both pinned and non-pinned rxqs
> in the same call.

As a global comment, I prefer consistency for function names.
I put suggestions inline, you can take these as simple nits.


This patch prepares for the arrival of a new scheduling algorithm, but
uses a single boolean flag.
I would avoid this temporary boolean and introduce the enum (used later
in this series).

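A rough sketch of what that could look like (the enum name and values here
are assumptions, not taken from the series):

    /* Hypothetical sketch only; names are not from the series. */
    enum sched_assignment_type {
        SCHED_ROUNDROBIN,   /* Spread rxqs round-robin across pmds. */
        SCHED_CYCLES,       /* Sort rxqs by measured cycles first. */
    };

with 'enum sched_assignment_type algo' replacing 'bool algo' in the
function signatures.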

>
> Signed-off-by: Kevin Traynor <ktraynor@redhat.com>
> ---
>  lib/dpif-netdev.c | 538 ++++++++++++++++++++++++++++++++++++++--------
>  tests/pmd.at      |   2 +-
>  2 files changed, 446 insertions(+), 94 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 650e67ab3..57d23e112 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -5006,4 +5006,211 @@ rr_numa_list_destroy(struct rr_numa_list *rr)
>  }
>
> +struct sched_numa_list {
> +    struct hmap numas;  /* Contains 'struct sched_numa'. */
> +};
> +
> +/* Meta data for out-of-place pmd rxq assignments. */
> +struct sched_pmd {
> +    /* Associated PMD thread. */
> +    struct dp_netdev_pmd_thread *pmd;
> +    uint64_t pmd_proc_cycles;
> +    struct dp_netdev_rxq **rxqs;
> +    unsigned n_rxq;
> +    bool isolated;
> +};

sched_pmd objects are associated in a unique fashion to a sched_numa object.
Having a back pointer to sched_numa in the sched_pmd object removes
the need for sched_numa_list_find_numa().


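A minimal sketch of that suggestion (assuming no other changes):

    struct sched_pmd {
        struct sched_numa *numa;  /* Back pointer to the owning numa. */
        struct dp_netdev_pmd_thread *pmd;
        uint64_t pmd_proc_cycles;
        struct dp_netdev_rxq **rxqs;
        unsigned n_rxq;
        bool isolated;
    };

One caveat: numa->pmds is grown with xrealloc() while populating, so the
back pointers would have to be filled in (or refreshed) once the array
stops moving.
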
> +
> +struct sched_numa {
> +    struct hmap_node node;
> +    int numa_id;
> +    /* PMDs on numa node. */
> +    struct sched_pmd *pmds;
> +    /* Num of PMDs on numa node. */
> +    unsigned n_pmds;
> +    /* Num of isolated PMDs on numa node. */
> +    unsigned n_iso;
> +    int rr_cur_index;
> +    bool rr_idx_inc;
> +};
> +
> +static size_t
> +sched_numa_list_count(struct sched_numa_list *numa_list)
> +{
> +    return hmap_count(&numa_list->numas);
> +}
> +
> +static struct sched_numa *
> +sched_numa_list_next(struct sched_numa_list *numa_list,
> +                     const struct sched_numa *numa)
> +{
> +    struct hmap_node *node = NULL;
> +
> +    if (numa) {
> +        node = hmap_next(&numa_list->numas, &numa->node);
> +    }
> +    if (!node) {
> +        node = hmap_first(&numa_list->numas);
> +    }
> +
> +    return (node) ? CONTAINER_OF(node, struct sched_numa, node) : NULL;
> +}
> +
> +static struct sched_numa *
> +sched_numa_list_lookup(struct sched_numa_list * numa_list, int numa_id)

Nit: no space *numa_list


> +{
> +    struct sched_numa *numa;
> +
> +    HMAP_FOR_EACH_WITH_HASH (numa, node, hash_int(numa_id, 0),
> +                             &numa_list->numas) {
> +        if (numa->numa_id == numa_id) {
> +            return numa;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +/* Populate numas and pmds on those numas */

Nit: missing a '.'.


> +static void
> +sched_numa_list_populate(struct sched_numa_list *numa_list,
> +                         struct dp_netdev *dp)
> +{
> +    struct dp_netdev_pmd_thread *pmd;

Nit: missing a blank line after var definition.


> +    hmap_init(&numa_list->numas);
> +
> +    /* For each pmd on this datapath. */
> +    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> +        struct sched_numa *numa;
> +        struct sched_pmd *sched_pmd;
> +        if (pmd->core_id == NON_PMD_CORE_ID) {
> +            continue;
> +        }
> +
> +        /* Get the numa of the PMD. */
> +        numa = sched_numa_list_lookup(numa_list, pmd->numa_id);
> +        /* Create a new numa node for it if not already created */

Nit: missing a '.'.


> +        if (!numa) {
> +            numa = xzalloc(sizeof *numa);
> +            numa->numa_id = pmd->numa_id;
> +            hmap_insert(&numa_list->numas, &numa->node,
> +                        hash_int(pmd->numa_id, 0));
> +        }
> +
> +        /* Create a sched_pmd on this numa for the pmd. */
> +        numa->n_pmds++;
> +        numa->pmds = xrealloc(numa->pmds, numa->n_pmds * sizeof *numa->pmds);
> +        sched_pmd = &numa->pmds[numa->n_pmds - 1];
> +        memset(sched_pmd ,0, sizeof *sched_pmd);

Nit: should be sched_pmd, 0,


> +        sched_pmd->pmd = pmd;
> +        /* At least one pmd is present so initialize curr_idx and idx_inc. */
> +        numa->rr_cur_index = 0;
> +        numa->rr_idx_inc = true;
> +    }
> +}
> +
> +static void
> +sched_numa_list_free_entries(struct sched_numa_list *numa_list)
> +{
> +    struct sched_numa *numa;
> +
> +    HMAP_FOR_EACH_POP (numa, node, &numa_list->numas) {
> +        for (unsigned i = 0; i < numa->n_pmds; i++) {
> +            struct sched_pmd *sched_pmd;
> +
> +            sched_pmd = &numa->pmds[i];
> +            sched_pmd->n_rxq = 0;
> +            free(sched_pmd->rxqs);
> +        }
> +        numa->n_pmds = 0;
> +        free(numa->pmds);
> +    }

Are we leaking numa objects?

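If so, a minimal fix sketch (the numa structs are xzalloc'd in
sched_numa_list_populate() and not freed in this loop):

    HMAP_FOR_EACH_POP (numa, node, &numa_list->numas) {
        for (unsigned i = 0; i < numa->n_pmds; i++) {
            free(numa->pmds[i].rxqs);
        }
        free(numa->pmds);
        free(numa);   /* Previously missing: the numa struct itself. */
    }
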

> +    hmap_destroy(&numa_list->numas);
> +}
> +
> +static struct sched_pmd *
> +find_sched_pmd_by_pmd(struct sched_numa_list *numa_list,
> +                      struct dp_netdev_pmd_thread *pmd)

Nit: it returns a sched_pmd object, I would go with sched_pmd_find_by_pmd().


> +{
> +    struct sched_numa *numa;
> +
> +    HMAP_FOR_EACH (numa, node, &numa_list->numas) {
> +        for (unsigned i = 0; i < numa->n_pmds; i++) {
> +            struct sched_pmd *sched_pmd;
> +
> +            sched_pmd = &numa->pmds[i];
> +            if (pmd == sched_pmd->pmd) {
> +                return sched_pmd;
> +            }
> +        }
> +    }
> +    return NULL;
> +}
> +
> +static struct sched_numa *
> +sched_numa_list_find_numa(struct sched_numa_list *numa_list,
> +                           struct sched_pmd *sched_pmd)

As commented around the sched_pmd struct definition, this function can
be removed if sched_pmd has a reference to sched_numa.


> +{
> +    struct sched_numa *numa;
> +
> +    HMAP_FOR_EACH (numa, node, &numa_list->numas) {
> +        for (unsigned i = 0; i < numa->n_pmds; i++) {
> +            struct sched_pmd *numa_sched_pmd;
> +
> +            numa_sched_pmd = &numa->pmds[i];
> +            if (numa_sched_pmd == sched_pmd) {
> +                return numa;
> +            }
> +        }
> +    }
> +    return NULL;
> +}
> +
> +static void
> +sched_add_rxq_to_sched_pmd(struct sched_pmd *sched_pmd,
> +                           struct dp_netdev_rxq *rxq, uint64_t cycles)

Nit: name could be sched_pmd_add_rxq().


> +{
> +    /* As sched_pmd is allocated outside this fn. better to not assume
> +     * rxq is initialized to NULL. */
> +    if (sched_pmd->n_rxq == 0) {
> +        sched_pmd->rxqs = xmalloc(sizeof *sched_pmd->rxqs);
> +    } else {
> +        sched_pmd->rxqs = xrealloc(sched_pmd->rxqs, (sched_pmd->n_rxq + 1) *
> +                                                    sizeof *sched_pmd->rxqs);
> +    }
> +
> +    sched_pmd->rxqs[sched_pmd->n_rxq++] = rxq;
> +    sched_pmd->pmd_proc_cycles += cycles;
> +}
> +
> +static void
> +sched_numa_list_put_in_place(struct sched_numa_list *numa_list)
> +{
> +    struct sched_numa *numa;
> +
> +    /* For each numa */
> +    HMAP_FOR_EACH (numa, node, &numa_list->numas) {
> +        /* For each pmd */
> +        for (int i = 0; i < numa->n_pmds; i++) {
> +            struct sched_pmd *sched_pmd;
> +
> +            sched_pmd = &numa->pmds[i];
> +            sched_pmd->pmd->isolated = sched_pmd->isolated;
> +            /* For each rxq. */
> +            for (unsigned k = 0; k < sched_pmd->n_rxq; k++) {
> +                /* Store the new pmd from the out of place sched_numa_list
> +                 * struct to the dp_netdev_rxq struct */
> +                sched_pmd->rxqs[k]->pmd = sched_pmd->pmd;
> +            }
> +        }
> +    }
> +}
> +
> +static unsigned
> +sched_get_numa_pmd_noniso(struct sched_numa *numa)

Nit: we are not getting a pmd from this function, sched_numa_noniso_pmd_count()?


> +{
> +    if (numa->n_pmds > numa->n_iso) {
> +        return numa->n_pmds - numa->n_iso;
> +    }
> +    return 0;
> +}
> +
>  /* Sort Rx Queues by the processing cycles they are consuming. */
>  static int
> @@ -5037,22 +5244,106 @@ compare_rxq_cycles(const void *a, const void *b)
>  }
>
> -/* Assign pmds to queues.  If 'pinned' is true, assign pmds to pinned
> - * queues and marks the pmds as isolated.  Otherwise, assign non isolated
> - * pmds to unpinned queues.
> +/*
> + * Returns the next pmd from the numa node.
>   *
> - * The function doesn't touch the pmd threads, it just stores the assignment
> - * in the 'pmd' member of each rxq. */
> + * If 'updown' is 'true' it will alternate between selecting the next pmd in
> + * either an up or down walk, switching between up/down when the first or last
> + * core is reached. e.g. 1,2,3,3,2,1,1,2...
> + *
> + * If 'updown' is 'false' it will select the next pmd wrapping around when
> + * last core reached. e.g. 1,2,3,1,2,3,1,2...
> + */
> +static struct sched_pmd *
> +get_rr_pmd(struct sched_numa *numa, bool updown)

Nit: sched_pmd_next_rr().


> +{
> +    int numa_idx = numa->rr_cur_index;
> +
> +    if (numa->rr_idx_inc == true) {
> +        /* Incrementing through list of pmds. */
> +        if (numa->rr_cur_index == numa->n_pmds - 1) {
> +            /* Reached the last pmd. */
> +            if (updown) {
> +                numa->rr_idx_inc = false;
> +            } else {
> +                numa->rr_cur_index = 0;
> +            }
> +        } else {
> +            numa->rr_cur_index++;
> +        }
> +    } else {
> +        /* Decrementing through list of pmds. */
> +        if (numa->rr_cur_index == 0) {
> +            /* Reached the first pmd. */
> +            numa->rr_idx_inc = true;
> +        } else {
> +            numa->rr_cur_index--;
> +        }
> +    }
> +    return &numa->pmds[numa_idx];
> +}
> +
> +static struct sched_pmd *
> +get_available_rr_pmd(struct sched_numa *numa, bool updown)

Nit: sched_pmd_next_noniso_rr().


> +{
> +    struct sched_pmd *sched_pmd = NULL;
> +
> +    /* get_rr_pmd() may return duplicate PMDs before all PMDs have been
> +     * returned depending on updown. Extend the number of calls to ensure all
> +     * PMDs can be checked. */
> +    for (unsigned i = 0; i < numa->n_pmds * 2; i++) {
> +        sched_pmd = get_rr_pmd(numa, updown);
> +        if (!sched_pmd->isolated) {
> +            break;
> +        }
> +        sched_pmd = NULL;
> +    }
> +    return sched_pmd;
> +}
> +
> +static struct sched_pmd *
> +get_next_pmd(struct sched_numa *numa, bool algo)

Nit: sched_pmd_next().


> +{
> +    return get_available_rr_pmd(numa, algo);
> +}
> +
> +static const char *
> +get_assignment_type_string(bool algo)
> +{
> +    if (algo == false) {
> +        return "roundrobin";
> +    }
> +    return "cycles";
> +}
> +
> +#define MAX_RXQ_CYC_STRLEN (INT_STRLEN(uint64_t) + 40)

I am not sure where this 40 comes from.
Is this "big enough in most cases"?

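A rough count: the literal text in get_rxq_cyc_log()'s format string
(" (measured processing cycles " plus ").") is 31 characters, so about
32 bytes including the NUL terminator are needed on top of the digits;
'+ 40' would then appear to leave roughly 8 bytes of headroom.
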

> +
> +static bool
> +get_rxq_cyc_log(char *a, bool algo, uint64_t cycles)
> +{
> +    int ret = 0;
> +
> +    if (algo) {
> +        ret = snprintf(a, MAX_RXQ_CYC_STRLEN,
> +                 " (measured processing cycles %"PRIu64").",

The final "." must be removed since it is not consistent with the !algo case.


> +                 cycles);
> +    }
> +    return ret > 0;
> +}

We can do a[0] = '\0' when !algo and on error, and return 'a' unconditionally.
This way, callers don't need to check this function succeeded.

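A sketch of that variant (same constant and format assumed, with the final
'.' also dropped per the comment above):

    /* Sketch of the suggestion: always leave a usable string in 'a'
     * and return it, so callers can pass the result straight to VLOG. */
    static char *
    get_rxq_cyc_log(char *a, bool algo, uint64_t cycles)
    {
        int ret = 0;

        if (algo) {
            ret = snprintf(a, MAX_RXQ_CYC_STRLEN,
                           " (measured processing cycles %"PRIu64")",
                           cycles);
        }
        if (ret <= 0) {
            /* !algo case or snprintf() error: leave an empty string. */
            a[0] = '\0';
        }
        return a;
    }
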


> +
>  static void
> -rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
> +sched_numa_list_schedule(struct sched_numa_list *numa_list,
> +                         struct dp_netdev *dp,
> +                         bool algo,
> +                         enum vlog_level level)
> +    OVS_REQUIRES(dp->port_mutex)
>  {
>      struct dp_netdev_port *port;
> -    struct rr_numa_list rr;
> -    struct rr_numa *non_local_numa = NULL;
>      struct dp_netdev_rxq ** rxqs = NULL;

Nit: should be **rxqs (no space).

> -    int n_rxqs = 0;
> -    struct rr_numa *numa = NULL;
> -    int numa_id;
> -    bool assign_cyc = dp->pmd_rxq_assign_cyc;
> +    struct sched_numa *last_cross_numa;
> +    unsigned n_rxqs = 0;
> +    bool start_logged = false;
> +    size_t n_numa;
>
> +    /* For each port. */
>      HMAP_FOR_EACH (port, node, &dp->ports) {
>          if (!netdev_is_pmd(port->netdev)) {
> @@ -5060,48 +5351,68 @@ rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
>          }
>
> +        /* For each rxq on the port. */
>          for (int qid = 0; qid < port->n_rxq; qid++) {
> -            struct dp_netdev_rxq *q = &port->rxqs[qid];
> +            struct dp_netdev_rxq *rxq = &port->rxqs[qid];
>
> -            if (pinned && q->core_id != OVS_CORE_UNSPEC) {
> -                struct dp_netdev_pmd_thread *pmd;
> +            rxqs = xrealloc(rxqs, (n_rxqs + 1) * sizeof *rxqs);
> +            rxqs[n_rxqs++] = rxq;

We might as well only store unpinned queues in rxqs (maybe rename this
var) in this first loop... (see comment in second loop)

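A sketch of that (only unpinned queues are deferred to the second loop):

    /* Check if this rxq is pinned. */
    if (rxq->core_id != OVS_CORE_UNSPEC) {
        /* ... pin it now, as in the patch ... */
    } else {
        /* Defer unpinned rxqs to the scheduling loop below. */
        rxqs = xrealloc(rxqs, (n_rxqs + 1) * sizeof *rxqs);
        rxqs[n_rxqs++] = rxq;
    }
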

>
> -                pmd = dp_netdev_get_pmd(dp, q->core_id);
> -                if (!pmd) {
> -                    VLOG_WARN("There is no PMD thread on core %d. Queue "
> -                              "%d on port \'%s\' will not be polled.",
> -                              q->core_id, qid, netdev_get_name(port->netdev));
> -                } else {
> -                    q->pmd = pmd;
> -                    pmd->isolated = true;
> -                    VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
> -                              "rx queue %d.", pmd->core_id, pmd->numa_id,
> -                              netdev_rxq_get_name(q->rx),
> -                              netdev_rxq_get_queue_id(q->rx));
> -                    dp_netdev_pmd_unref(pmd);
> -                }
> -            } else if (!pinned && q->core_id == OVS_CORE_UNSPEC) {
> +            if (algo == true) {
>                  uint64_t cycle_hist = 0;
>
> -                if (n_rxqs == 0) {
> -                    rxqs = xmalloc(sizeof *rxqs);
> -                } else {
> -                    rxqs = xrealloc(rxqs, sizeof *rxqs * (n_rxqs + 1));
> +                /* Sum the queue intervals and store the cycle history. */
> +                for (unsigned i = 0; i < PMD_RXQ_INTERVAL_MAX; i++) {
> +                    cycle_hist += dp_netdev_rxq_get_intrvl_cycles(rxq, i);
>                  }
> +                dp_netdev_rxq_set_cycles(rxq, RXQ_CYCLES_PROC_HIST,
> +                                         cycle_hist);
> +            }
>
> -                if (assign_cyc) {
> -                    /* Sum the queue intervals and store the cycle history. */
> -                    for (unsigned i = 0; i < PMD_RXQ_INTERVAL_MAX; i++) {
> -                        cycle_hist += dp_netdev_rxq_get_intrvl_cycles(q, i);
> -                    }
> -                    dp_netdev_rxq_set_cycles(q, RXQ_CYCLES_PROC_HIST,
> -                                             cycle_hist);
> +            /* Check if this rxq is pinned. */
> +            if (rxq->core_id != OVS_CORE_UNSPEC) {
> +                struct sched_pmd *sched_pmd = NULL;

No need to initialize to NULL.


> +                struct dp_netdev_pmd_thread *pmd;
> +                struct sched_numa *numa;
> +                uint64_t proc_cycles;
> +                char rxq_cyc_log[MAX_RXQ_CYC_STRLEN];
> +
> +
> +                /* This rxq should be pinned, pin it now. */
> +                pmd = dp_netdev_get_pmd(dp, rxq->core_id);
> +                sched_pmd = find_sched_pmd_by_pmd(numa_list, pmd);
> +                if (!sched_pmd) {
> +                    /* Cannot find the PMD.  Cannot pin this rxq. */
> +                    VLOG(level == VLL_DBG ? VLL_DBG : VLL_WARN,
> +                            "Core %2u cannot be pinned with "
> +                            "port \'%s\' rx queue %d. Use pmd-cpu-mask to "
> +                            "enable a pmd on core %u.",
> +                            rxq->core_id,
> +                            netdev_rxq_get_name(rxq->rx),
> +                            netdev_rxq_get_queue_id(rxq->rx),
> +                            rxq->core_id);
> +                    continue;
> +                }
> +                /* Mark PMD as isolated if not done already. */
> +                if (sched_pmd->isolated == false) {
> +                    sched_pmd->isolated = true;
> +                    numa = sched_numa_list_find_numa(numa_list,
> +                                                     sched_pmd);
> +                    numa->n_iso++;
>                  }
> -                /* Store the queue. */
> -                rxqs[n_rxqs++] = q;
> +                proc_cycles = dp_netdev_rxq_get_cycles(rxq,
> +                                                       RXQ_CYCLES_PROC_HIST);
> +                VLOG(level, "Core %2u on numa node %d is pinned with "
> +                            "port \'%s\' rx queue %d.%s",

rx queue %d.%s -> rx queue %d%s. (once get_rxq_cyc_log() does not put
a final '.').



> +                            sched_pmd->pmd->core_id, sched_pmd->pmd->numa_id,
> +                            netdev_rxq_get_name(rxq->rx),
> +                            netdev_rxq_get_queue_id(rxq->rx),
> +                            get_rxq_cyc_log(rxq_cyc_log, algo, proc_cycles)
> +                               ? rxq_cyc_log : "");
> +                sched_add_rxq_to_sched_pmd(sched_pmd, rxq, proc_cycles);
>              }
>          }
>      }
>
> -    if (n_rxqs > 1 && assign_cyc) {
> +    if (n_rxqs > 1 && algo) {
>          /* Sort the queues in order of the processing cycles
>           * they consumed during their last pmd interval. */
> @@ -5109,54 +5420,100 @@ rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
>      }
>
> -    rr_numa_list_populate(dp, &rr);
> -    /* Assign the sorted queues to pmds in round robin. */
> -    for (int i = 0; i < n_rxqs; i++) {
> -        numa_id = netdev_get_numa_id(rxqs[i]->port->netdev);
> -        numa = rr_numa_list_lookup(&rr, numa_id);
> -        if (!numa) {
> -            /* There are no pmds on the queue's local NUMA node.
> -               Round robin on the NUMA nodes that do have pmds. */
> -            non_local_numa = rr_numa_list_next(&rr, non_local_numa);
> -            if (!non_local_numa) {
> -                VLOG_ERR("There is no available (non-isolated) pmd "
> -                         "thread for port \'%s\' queue %d. This queue "
> -                         "will not be polled. Is pmd-cpu-mask set to "
> -                         "zero? Or are all PMDs isolated to other "
> -                         "queues?", netdev_rxq_get_name(rxqs[i]->rx),
> -                         netdev_rxq_get_queue_id(rxqs[i]->rx));
> -                continue;
> +    last_cross_numa = NULL;
> +    n_numa = sched_numa_list_count(numa_list);
> +    for (unsigned i = 0; i < n_rxqs; i++) {
> +        struct dp_netdev_rxq *rxq = rxqs[i];
> +        struct sched_pmd *sched_pmd;
> +        struct sched_numa *numa;
> +        int numa_id;
> +        uint64_t proc_cycles;
> +        char rxq_cyc_log[MAX_RXQ_CYC_STRLEN];
> +
> +        if (rxq->core_id != OVS_CORE_UNSPEC) {
> +            continue;
> +        }

(Continuing the comment on the rxqs array)... this way there is no need
for an "is rxq pinned?" check here.


> +
> +        if (start_logged == false && level != VLL_DBG) {
> +            VLOG(level, "Performing pmd to rx queue assignment using %s "
> +                        "algorithm.", get_assignment_type_string(algo));
> +            start_logged = true;
> +        }
> +
> +        /* Store the cycles for this rxq as we will log these later. */
> +        proc_cycles = dp_netdev_rxq_get_cycles(rxq, RXQ_CYCLES_PROC_HIST);
> +        /* Select the numa that should be used for this rxq. */
> +        numa_id = netdev_get_numa_id(rxq->port->netdev);
> +        numa = sched_numa_list_lookup(numa_list, numa_id);
> +
> +        /* Ensure that there is at least one non-isolated pmd on that numa. */
> +        if (numa && !sched_get_numa_pmd_noniso(numa)) {
> +            numa = NULL;
> +        }

I think this block above can be covered with the check below.

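i.e. something like:

    numa = sched_numa_list_lookup(numa_list, numa_id);
    if (!numa || !sched_get_numa_pmd_noniso(numa)) {
        /* ... existing cross-numa fallback, as in the patch ... */
    }

since a non-NULL local numa with no non-isolated pmds already enters the
fallback path.
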

> +
> +        if (!numa || !sched_get_numa_pmd_noniso(numa)) {
> +            /* Find any numa with available pmds. */
> +            for (int k = 0; k < n_numa; k++) {

Super nit: previous index is 'i', I would expect 'j'.


> +                numa = sched_numa_list_next(numa_list, last_cross_numa);
> +                if (sched_get_numa_pmd_noniso(numa)) {
> +                    break;
> +                }
> +                last_cross_numa = numa;
> +                numa = NULL;
>              }
> -            rxqs[i]->pmd = rr_numa_get_pmd(non_local_numa, assign_cyc);
> -            VLOG_WARN("There's no available (non-isolated) pmd thread "
> -                      "on numa node %d. Queue %d on port \'%s\' will "
> -                      "be assigned to the pmd on core %d "
> -                      "(numa node %d). Expect reduced performance.",
> -                      numa_id, netdev_rxq_get_queue_id(rxqs[i]->rx),
> -                      netdev_rxq_get_name(rxqs[i]->rx),
> -                      rxqs[i]->pmd->core_id, rxqs[i]->pmd->numa_id);
> -        } else {
> -            rxqs[i]->pmd = rr_numa_get_pmd(numa, assign_cyc);
> -            if (assign_cyc) {
> -                VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
> -                          "rx queue %d "
> -                          "(measured processing cycles %"PRIu64").",
> -                          rxqs[i]->pmd->core_id, numa_id,
> -                          netdev_rxq_get_name(rxqs[i]->rx),
> -                          netdev_rxq_get_queue_id(rxqs[i]->rx),
> -                          dp_netdev_rxq_get_cycles(rxqs[i],
> -                                                   RXQ_CYCLES_PROC_HIST));
> -            } else {
> -                VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
> -                          "rx queue %d.", rxqs[i]->pmd->core_id, numa_id,
> -                          netdev_rxq_get_name(rxqs[i]->rx),
> -                          netdev_rxq_get_queue_id(rxqs[i]->rx));
> +        }
> +        if (numa && numa->numa_id != numa_id) {
> +            VLOG(level, "There's no available (non-isolated) pmd thread "
> +                        "on numa node %d. Port \'%s\' rx queue %d will "
> +                        "be assigned to a pmd on numa node %d. "
> +                        "This may lead to reduced performance.",
> +                        numa_id, netdev_rxq_get_name(rxq->rx),
> +                        netdev_rxq_get_queue_id(rxq->rx), numa->numa_id);
> +        }
> +
> +        sched_pmd = NULL;
> +        if (numa) {
> +            /* Select the PMD that should be used for this rxq. */
> +            sched_pmd = get_next_pmd(numa, algo);
> +            if (sched_pmd) {
> +                VLOG(level, "Core %2u on numa node %d assigned port \'%s\' "
> +                            "rx queue %d.%s",

%d%s.


> +                            sched_pmd->pmd->core_id, sched_pmd->pmd->numa_id,
> +                            netdev_rxq_get_name(rxq->rx),
> +                            netdev_rxq_get_queue_id(rxq->rx),
> +                            get_rxq_cyc_log(rxq_cyc_log, algo, proc_cycles)
> +                                ? rxq_cyc_log : "");
> +                sched_add_rxq_to_sched_pmd(sched_pmd, rxq, proc_cycles);
>              }
>          }
> +        if (!sched_pmd) {
> +            VLOG(level == VLL_DBG ? level : VLL_WARN,
> +                    "No non-isolated pmd on any numa available for "
> +                    "port \'%s\' rx queue %d.%s "

%d%s.



> +                    "This rx queue will not be polled.",
> +                    netdev_rxq_get_name(rxq->rx),
> +                    netdev_rxq_get_queue_id(rxq->rx),
> +                    get_rxq_cyc_log(rxq_cyc_log, algo, proc_cycles)
> +                        ? rxq_cyc_log :  "");
> +        }
>      }
> -
> -    rr_numa_list_destroy(&rr);
>      free(rxqs);
>  }
>
> +static void
> +rxq_scheduling(struct dp_netdev *dp) OVS_REQUIRES(dp->port_mutex)
> +{
> +    struct sched_numa_list *numa_list;
> +    bool algo = dp->pmd_rxq_assign_cyc;
> +
> +    numa_list = xzalloc(sizeof *numa_list);
> +
> +    sched_numa_list_populate(numa_list, dp);
> +    sched_numa_list_schedule(numa_list, dp, algo, VLL_INFO);
> +    sched_numa_list_put_in_place(numa_list);
> +
> +    sched_numa_list_free_entries(numa_list);
> +    free(numa_list);
> +}
> +
>  static void
>  reload_affected_pmds(struct dp_netdev *dp)
> @@ -5406,10 +5763,5 @@ reconfigure_datapath(struct dp_netdev *dp)
>          }
>      }
> -
> -    /* Add pinned queues and mark pmd threads isolated. */
> -    rxq_scheduling(dp, true);
> -
> -    /* Add non-pinned queues. */
> -    rxq_scheduling(dp, false);
> +    rxq_scheduling(dp);
>
>      /* Step 5: Remove queues not compliant with new scheduling. */
> diff --git a/tests/pmd.at b/tests/pmd.at
> index cc5371d5a..78105bf45 100644
> --- a/tests/pmd.at
> +++ b/tests/pmd.at
> @@ -580,5 +580,5 @@ p1 3 0 2
>  ])
>
> -OVS_VSWITCHD_STOP(["/dpif_netdev|WARN|There is no PMD thread on core/d"])
> +OVS_VSWITCHD_STOP(["/cannot be pinned with port/d"])
>  AT_CLEANUP
>
> --
> 2.31.1
>

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 650e67ab3..57d23e112 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -5006,4 +5006,211 @@  rr_numa_list_destroy(struct rr_numa_list *rr)
 }
 
+struct sched_numa_list {
+    struct hmap numas;  /* Contains 'struct sched_numa'. */
+};
+
+/* Meta data for out-of-place pmd rxq assignments. */
+struct sched_pmd {
+    /* Associated PMD thread. */
+    struct dp_netdev_pmd_thread *pmd;
+    uint64_t pmd_proc_cycles;
+    struct dp_netdev_rxq **rxqs;
+    unsigned n_rxq;
+    bool isolated;
+};
+
+struct sched_numa {
+    struct hmap_node node;
+    int numa_id;
+    /* PMDs on numa node. */
+    struct sched_pmd *pmds;
+    /* Num of PMDs on numa node. */
+    unsigned n_pmds;
+    /* Num of isolated PMDs on numa node. */
+    unsigned n_iso;
+    int rr_cur_index;
+    bool rr_idx_inc;
+};
+
+static size_t
+sched_numa_list_count(struct sched_numa_list *numa_list)
+{
+    return hmap_count(&numa_list->numas);
+}
+
+static struct sched_numa *
+sched_numa_list_next(struct sched_numa_list *numa_list,
+                     const struct sched_numa *numa)
+{
+    struct hmap_node *node = NULL;
+
+    if (numa) {
+        node = hmap_next(&numa_list->numas, &numa->node);
+    }
+    if (!node) {
+        node = hmap_first(&numa_list->numas);
+    }
+
+    return (node) ? CONTAINER_OF(node, struct sched_numa, node) : NULL;
+}
+
+static struct sched_numa *
+sched_numa_list_lookup(struct sched_numa_list * numa_list, int numa_id)
+{
+    struct sched_numa *numa;
+
+    HMAP_FOR_EACH_WITH_HASH (numa, node, hash_int(numa_id, 0),
+                             &numa_list->numas) {
+        if (numa->numa_id == numa_id) {
+            return numa;
+        }
+    }
+    return NULL;
+}
+
+/* Populate numas and pmds on those numas */
+static void
+sched_numa_list_populate(struct sched_numa_list *numa_list,
+                         struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    hmap_init(&numa_list->numas);
+
+    /* For each pmd on this datapath. */
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        struct sched_numa *numa;
+        struct sched_pmd *sched_pmd;
+        if (pmd->core_id == NON_PMD_CORE_ID) {
+            continue;
+        }
+
+        /* Get the numa of the PMD. */
+        numa = sched_numa_list_lookup(numa_list, pmd->numa_id);
+        /* Create a new numa node for it if not already created */
+        if (!numa) {
+            numa = xzalloc(sizeof *numa);
+            numa->numa_id = pmd->numa_id;
+            hmap_insert(&numa_list->numas, &numa->node,
+                        hash_int(pmd->numa_id, 0));
+        }
+
+        /* Create a sched_pmd on this numa for the pmd. */
+        numa->n_pmds++;
+        numa->pmds = xrealloc(numa->pmds, numa->n_pmds * sizeof *numa->pmds);
+        sched_pmd = &numa->pmds[numa->n_pmds - 1];
+        memset(sched_pmd ,0, sizeof *sched_pmd);
+        sched_pmd->pmd = pmd;
+        /* At least one pmd is present so initialize curr_idx and idx_inc. */
+        numa->rr_cur_index = 0;
+        numa->rr_idx_inc = true;
+    }
+}
+
+static void
+sched_numa_list_free_entries(struct sched_numa_list *numa_list)
+{
+    struct sched_numa *numa;
+
+    HMAP_FOR_EACH_POP (numa, node, &numa_list->numas) {
+        for (unsigned i = 0; i < numa->n_pmds; i++) {
+            struct sched_pmd *sched_pmd;
+
+            sched_pmd = &numa->pmds[i];
+            sched_pmd->n_rxq = 0;
+            free(sched_pmd->rxqs);
+        }
+        numa->n_pmds = 0;
+        free(numa->pmds);
+    }
+    hmap_destroy(&numa_list->numas);
+}
+
+static struct sched_pmd *
+find_sched_pmd_by_pmd(struct sched_numa_list *numa_list,
+                      struct dp_netdev_pmd_thread *pmd)
+{
+    struct sched_numa *numa;
+
+    HMAP_FOR_EACH (numa, node, &numa_list->numas) {
+        for (unsigned i = 0; i < numa->n_pmds; i++) {
+            struct sched_pmd *sched_pmd;
+
+            sched_pmd = &numa->pmds[i];
+            if (pmd == sched_pmd->pmd) {
+                return sched_pmd;
+            }
+        }
+    }
+    return NULL;
+}
+
+static struct sched_numa *
+sched_numa_list_find_numa(struct sched_numa_list *numa_list,
+                           struct sched_pmd *sched_pmd)
+{
+    struct sched_numa *numa;
+
+    HMAP_FOR_EACH (numa, node, &numa_list->numas) {
+        for (unsigned i = 0; i < numa->n_pmds; i++) {
+            struct sched_pmd *numa_sched_pmd;
+
+            numa_sched_pmd = &numa->pmds[i];
+            if (numa_sched_pmd == sched_pmd) {
+                return numa;
+            }
+        }
+    }
+    return NULL;
+}
+
+static void
+sched_add_rxq_to_sched_pmd(struct sched_pmd *sched_pmd,
+                           struct dp_netdev_rxq *rxq, uint64_t cycles)
+{
+    /* As sched_pmd is allocated outside this fn. better to not assume
+     * rxq is initialized to NULL. */
+    if (sched_pmd->n_rxq == 0) {
+        sched_pmd->rxqs = xmalloc(sizeof *sched_pmd->rxqs);
+    } else {
+        sched_pmd->rxqs = xrealloc(sched_pmd->rxqs, (sched_pmd->n_rxq + 1) *
+                                                    sizeof *sched_pmd->rxqs);
+    }
+
+    sched_pmd->rxqs[sched_pmd->n_rxq++] = rxq;
+    sched_pmd->pmd_proc_cycles += cycles;
+}
+
+static void
+sched_numa_list_put_in_place(struct sched_numa_list *numa_list)
+{
+    struct sched_numa *numa;
+
+    /* For each numa */
+    HMAP_FOR_EACH (numa, node, &numa_list->numas) {
+        /* For each pmd */
+        for (int i = 0; i < numa->n_pmds; i++) {
+            struct sched_pmd *sched_pmd;
+
+            sched_pmd = &numa->pmds[i];
+            sched_pmd->pmd->isolated = sched_pmd->isolated;
+            /* For each rxq. */
+            for (unsigned k = 0; k < sched_pmd->n_rxq; k++) {
+                /* Store the new pmd from the out of place sched_numa_list
+                 * struct to the dp_netdev_rxq struct */
+                sched_pmd->rxqs[k]->pmd = sched_pmd->pmd;
+            }
+        }
+    }
+}
+
+static unsigned
+sched_get_numa_pmd_noniso(struct sched_numa *numa)
+{
+    if (numa->n_pmds > numa->n_iso) {
+        return numa->n_pmds - numa->n_iso;
+    }
+    return 0;
+}
+
 /* Sort Rx Queues by the processing cycles they are consuming. */
 static int
@@ -5037,22 +5244,106 @@  compare_rxq_cycles(const void *a, const void *b)
 }
 
-/* Assign pmds to queues.  If 'pinned' is true, assign pmds to pinned
- * queues and marks the pmds as isolated.  Otherwise, assign non isolated
- * pmds to unpinned queues.
+/*
+ * Returns the next pmd from the numa node.
  *
- * The function doesn't touch the pmd threads, it just stores the assignment
- * in the 'pmd' member of each rxq. */
+ * If 'updown' is 'true' it will alternate between selecting the next pmd in
+ * either an up or down walk, switching between up/down when the first or last
+ * core is reached. e.g. 1,2,3,3,2,1,1,2...
+ *
+ * If 'updown' is 'false' it will select the next pmd wrapping around when
+ * last core reached. e.g. 1,2,3,1,2,3,1,2...
+ */
+static struct sched_pmd *
+get_rr_pmd(struct sched_numa *numa, bool updown)
+{
+    int numa_idx = numa->rr_cur_index;
+
+    if (numa->rr_idx_inc == true) {
+        /* Incrementing through list of pmds. */
+        if (numa->rr_cur_index == numa->n_pmds - 1) {
+            /* Reached the last pmd. */
+            if (updown) {
+                numa->rr_idx_inc = false;
+            } else {
+                numa->rr_cur_index = 0;
+            }
+        } else {
+            numa->rr_cur_index++;
+        }
+    } else {
+        /* Decrementing through list of pmds. */
+        if (numa->rr_cur_index == 0) {
+            /* Reached the first pmd. */
+            numa->rr_idx_inc = true;
+        } else {
+            numa->rr_cur_index--;
+        }
+    }
+    return &numa->pmds[numa_idx];
+}
+
+static struct sched_pmd *
+get_available_rr_pmd(struct sched_numa *numa, bool updown)
+{
+    struct sched_pmd *sched_pmd = NULL;
+
+    /* get_rr_pmd() may return duplicate PMDs before all PMDs have been
+     * returned depending on updown. Extend the number of calls to ensure all
+     * PMDs can be checked. */
+    for (unsigned i = 0; i < numa->n_pmds * 2; i++) {
+        sched_pmd = get_rr_pmd(numa, updown);
+        if (!sched_pmd->isolated) {
+            break;
+        }
+        sched_pmd = NULL;
+    }
+    return sched_pmd;
+}
+
+static struct sched_pmd *
+get_next_pmd(struct sched_numa *numa, bool algo)
+{
+    return get_available_rr_pmd(numa, algo);
+}
+
+static const char *
+get_assignment_type_string(bool algo)
+{
+    if (algo == false) {
+        return "roundrobin";
+    }
+    return "cycles";
+}
+
+#define MAX_RXQ_CYC_STRLEN (INT_STRLEN(uint64_t) + 40)
+
+static bool
+get_rxq_cyc_log(char *a, bool algo, uint64_t cycles)
+{
+    int ret = 0;
+
+    if (algo) {
+        ret = snprintf(a, MAX_RXQ_CYC_STRLEN,
+                 " (measured processing cycles %"PRIu64").",
+                 cycles);
+    }
+    return ret > 0;
+}
+
 static void
-rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
+sched_numa_list_schedule(struct sched_numa_list *numa_list,
+                         struct dp_netdev *dp,
+                         bool algo,
+                         enum vlog_level level)
+    OVS_REQUIRES(dp->port_mutex)
 {
     struct dp_netdev_port *port;
-    struct rr_numa_list rr;
-    struct rr_numa *non_local_numa = NULL;
     struct dp_netdev_rxq ** rxqs = NULL;
-    int n_rxqs = 0;
-    struct rr_numa *numa = NULL;
-    int numa_id;
-    bool assign_cyc = dp->pmd_rxq_assign_cyc;
+    struct sched_numa *last_cross_numa;
+    unsigned n_rxqs = 0;
+    bool start_logged = false;
+    size_t n_numa;
 
+    /* For each port. */
     HMAP_FOR_EACH (port, node, &dp->ports) {
         if (!netdev_is_pmd(port->netdev)) {
@@ -5060,48 +5351,68 @@  rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
         }
 
+        /* For each rxq on the port. */
         for (int qid = 0; qid < port->n_rxq; qid++) {
-            struct dp_netdev_rxq *q = &port->rxqs[qid];
+            struct dp_netdev_rxq *rxq = &port->rxqs[qid];
 
-            if (pinned && q->core_id != OVS_CORE_UNSPEC) {
-                struct dp_netdev_pmd_thread *pmd;
+            rxqs = xrealloc(rxqs, (n_rxqs + 1) * sizeof *rxqs);
+            rxqs[n_rxqs++] = rxq;
 
-                pmd = dp_netdev_get_pmd(dp, q->core_id);
-                if (!pmd) {
-                    VLOG_WARN("There is no PMD thread on core %d. Queue "
-                              "%d on port \'%s\' will not be polled.",
-                              q->core_id, qid, netdev_get_name(port->netdev));
-                } else {
-                    q->pmd = pmd;
-                    pmd->isolated = true;
-                    VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
-                              "rx queue %d.", pmd->core_id, pmd->numa_id,
-                              netdev_rxq_get_name(q->rx),
-                              netdev_rxq_get_queue_id(q->rx));
-                    dp_netdev_pmd_unref(pmd);
-                }
-            } else if (!pinned && q->core_id == OVS_CORE_UNSPEC) {
+            if (algo == true) {
                 uint64_t cycle_hist = 0;
 
-                if (n_rxqs == 0) {
-                    rxqs = xmalloc(sizeof *rxqs);
-                } else {
-                    rxqs = xrealloc(rxqs, sizeof *rxqs * (n_rxqs + 1));
+                /* Sum the queue intervals and store the cycle history. */
+                for (unsigned i = 0; i < PMD_RXQ_INTERVAL_MAX; i++) {
+                    cycle_hist += dp_netdev_rxq_get_intrvl_cycles(rxq, i);
                 }
+                dp_netdev_rxq_set_cycles(rxq, RXQ_CYCLES_PROC_HIST,
+                                         cycle_hist);
+            }
 
-                if (assign_cyc) {
-                    /* Sum the queue intervals and store the cycle history. */
-                    for (unsigned i = 0; i < PMD_RXQ_INTERVAL_MAX; i++) {
-                        cycle_hist += dp_netdev_rxq_get_intrvl_cycles(q, i);
-                    }
-                    dp_netdev_rxq_set_cycles(q, RXQ_CYCLES_PROC_HIST,
-                                             cycle_hist);
+            /* Check if this rxq is pinned. */
+            if (rxq->core_id != OVS_CORE_UNSPEC) {
+                struct sched_pmd *sched_pmd = NULL;
+                struct dp_netdev_pmd_thread *pmd;
+                struct sched_numa *numa;
+                uint64_t proc_cycles;
+                char rxq_cyc_log[MAX_RXQ_CYC_STRLEN];
+
+
+                /* This rxq should be pinned, pin it now. */
+                pmd = dp_netdev_get_pmd(dp, rxq->core_id);
+                sched_pmd = find_sched_pmd_by_pmd(numa_list, pmd);
+                if (!sched_pmd) {
+                    /* Cannot find the PMD.  Cannot pin this rxq. */
+                    VLOG(level == VLL_DBG ? VLL_DBG : VLL_WARN,
+                            "Core %2u cannot be pinned with "
+                            "port \'%s\' rx queue %d. Use pmd-cpu-mask to "
+                            "enable a pmd on core %u.",
+                            rxq->core_id,
+                            netdev_rxq_get_name(rxq->rx),
+                            netdev_rxq_get_queue_id(rxq->rx),
+                            rxq->core_id);
+                    continue;
+                }
+                /* Mark PMD as isolated if not done already. */
+                if (sched_pmd->isolated == false) {
+                    sched_pmd->isolated = true;
+                    numa = sched_numa_list_find_numa(numa_list,
+                                                     sched_pmd);
+                    numa->n_iso++;
                 }
-                /* Store the queue. */
-                rxqs[n_rxqs++] = q;
+                proc_cycles = dp_netdev_rxq_get_cycles(rxq,
+                                                       RXQ_CYCLES_PROC_HIST);
+                VLOG(level, "Core %2u on numa node %d is pinned with "
+                            "port \'%s\' rx queue %d.%s",
+                            sched_pmd->pmd->core_id, sched_pmd->pmd->numa_id,
+                            netdev_rxq_get_name(rxq->rx),
+                            netdev_rxq_get_queue_id(rxq->rx),
+                            get_rxq_cyc_log(rxq_cyc_log, algo, proc_cycles)
+                               ? rxq_cyc_log : "");
+                sched_add_rxq_to_sched_pmd(sched_pmd, rxq, proc_cycles);
             }
         }
     }
 
-    if (n_rxqs > 1 && assign_cyc) {
+    if (n_rxqs > 1 && algo) {
         /* Sort the queues in order of the processing cycles
          * they consumed during their last pmd interval. */
@@ -5109,54 +5420,100 @@  rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
     }
 
-    rr_numa_list_populate(dp, &rr);
-    /* Assign the sorted queues to pmds in round robin. */
-    for (int i = 0; i < n_rxqs; i++) {
-        numa_id = netdev_get_numa_id(rxqs[i]->port->netdev);
-        numa = rr_numa_list_lookup(&rr, numa_id);
-        if (!numa) {
-            /* There are no pmds on the queue's local NUMA node.
-               Round robin on the NUMA nodes that do have pmds. */
-            non_local_numa = rr_numa_list_next(&rr, non_local_numa);
-            if (!non_local_numa) {
-                VLOG_ERR("There is no available (non-isolated) pmd "
-                         "thread for port \'%s\' queue %d. This queue "
-                         "will not be polled. Is pmd-cpu-mask set to "
-                         "zero? Or are all PMDs isolated to other "
-                         "queues?", netdev_rxq_get_name(rxqs[i]->rx),
-                         netdev_rxq_get_queue_id(rxqs[i]->rx));
-                continue;
+    last_cross_numa = NULL;
+    n_numa = sched_numa_list_count(numa_list);
+    for (unsigned i = 0; i < n_rxqs; i++) {
+        struct dp_netdev_rxq *rxq = rxqs[i];
+        struct sched_pmd *sched_pmd;
+        struct sched_numa *numa;
+        int numa_id;
+        uint64_t proc_cycles;
+        char rxq_cyc_log[MAX_RXQ_CYC_STRLEN];
+
+        if (rxq->core_id != OVS_CORE_UNSPEC) {
+            continue;
+        }
+
+        if (start_logged == false && level != VLL_DBG) {
+            VLOG(level, "Performing pmd to rx queue assignment using %s "
+                        "algorithm.", get_assignment_type_string(algo));
+            start_logged = true;
+        }
+
+        /* Store the cycles for this rxq as we will log these later. */
+        proc_cycles = dp_netdev_rxq_get_cycles(rxq, RXQ_CYCLES_PROC_HIST);
+        /* Select the numa that should be used for this rxq. */
+        numa_id = netdev_get_numa_id(rxq->port->netdev);
+        numa = sched_numa_list_lookup(numa_list, numa_id);
+
+        /* Ensure that there is at least one non-isolated pmd on that numa. */
+        if (numa && !sched_get_numa_pmd_noniso(numa)) {
+            numa = NULL;
+        }
+
+        if (!numa || !sched_get_numa_pmd_noniso(numa)) {
+            /* Find any numa with available pmds. */
+            for (int k = 0; k < n_numa; k++) {
+                numa = sched_numa_list_next(numa_list, last_cross_numa);
+                if (sched_get_numa_pmd_noniso(numa)) {
+                    break;
+                }
+                last_cross_numa = numa;
+                numa = NULL;
             }
-            rxqs[i]->pmd = rr_numa_get_pmd(non_local_numa, assign_cyc);
-            VLOG_WARN("There's no available (non-isolated) pmd thread "
-                      "on numa node %d. Queue %d on port \'%s\' will "
-                      "be assigned to the pmd on core %d "
-                      "(numa node %d). Expect reduced performance.",
-                      numa_id, netdev_rxq_get_queue_id(rxqs[i]->rx),
-                      netdev_rxq_get_name(rxqs[i]->rx),
-                      rxqs[i]->pmd->core_id, rxqs[i]->pmd->numa_id);
-        } else {
-            rxqs[i]->pmd = rr_numa_get_pmd(numa, assign_cyc);
-            if (assign_cyc) {
-                VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
-                          "rx queue %d "
-                          "(measured processing cycles %"PRIu64").",
-                          rxqs[i]->pmd->core_id, numa_id,
-                          netdev_rxq_get_name(rxqs[i]->rx),
-                          netdev_rxq_get_queue_id(rxqs[i]->rx),
-                          dp_netdev_rxq_get_cycles(rxqs[i],
-                                                   RXQ_CYCLES_PROC_HIST));
-            } else {
-                VLOG_INFO("Core %d on numa node %d assigned port \'%s\' "
-                          "rx queue %d.", rxqs[i]->pmd->core_id, numa_id,
-                          netdev_rxq_get_name(rxqs[i]->rx),
-                          netdev_rxq_get_queue_id(rxqs[i]->rx));
+        }
+        if (numa && numa->numa_id != numa_id) {
+            VLOG(level, "There's no available (non-isolated) pmd thread "
+                        "on numa node %d. Port \'%s\' rx queue %d will "
+                        "be assigned to a pmd on numa node %d. "
+                        "This may lead to reduced performance.",
+                        numa_id, netdev_rxq_get_name(rxq->rx),
+                        netdev_rxq_get_queue_id(rxq->rx), numa->numa_id);
+        }
+
+        sched_pmd = NULL;
+        if (numa) {
+            /* Select the PMD that should be used for this rxq. */
+            sched_pmd = get_next_pmd(numa, algo);
+            if (sched_pmd) {
+                VLOG(level, "Core %2u on numa node %d assigned port \'%s\' "
+                            "rx queue %d.%s",
+                            sched_pmd->pmd->core_id, sched_pmd->pmd->numa_id,
+                            netdev_rxq_get_name(rxq->rx),
+                            netdev_rxq_get_queue_id(rxq->rx),
+                            get_rxq_cyc_log(rxq_cyc_log, algo, proc_cycles)
+                                ? rxq_cyc_log : "");
+                sched_add_rxq_to_sched_pmd(sched_pmd, rxq, proc_cycles);
             }
         }
+        if (!sched_pmd) {
+            VLOG(level == VLL_DBG ? level : VLL_WARN,
+                    "No non-isolated pmd on any numa available for "
+                    "port \'%s\' rx queue %d.%s "
+                    "This rx queue will not be polled.",
+                    netdev_rxq_get_name(rxq->rx),
+                    netdev_rxq_get_queue_id(rxq->rx),
+                    get_rxq_cyc_log(rxq_cyc_log, algo, proc_cycles)
+                        ? rxq_cyc_log :  "");
+        }
     }
-
-    rr_numa_list_destroy(&rr);
     free(rxqs);
 }
 
+static void
+rxq_scheduling(struct dp_netdev *dp) OVS_REQUIRES(dp->port_mutex)
+{
+    struct sched_numa_list *numa_list;
+    bool algo = dp->pmd_rxq_assign_cyc;
+
+    numa_list = xzalloc(sizeof *numa_list);
+
+    sched_numa_list_populate(numa_list, dp);
+    sched_numa_list_schedule(numa_list, dp, algo, VLL_INFO);
+    sched_numa_list_put_in_place(numa_list);
+
+    sched_numa_list_free_entries(numa_list);
+    free(numa_list);
+}
+
 static void
 reload_affected_pmds(struct dp_netdev *dp)
@@ -5406,10 +5763,5 @@  reconfigure_datapath(struct dp_netdev *dp)
         }
     }
-
-    /* Add pinned queues and mark pmd threads isolated. */
-    rxq_scheduling(dp, true);
-
-    /* Add non-pinned queues. */
-    rxq_scheduling(dp, false);
+    rxq_scheduling(dp);
 
     /* Step 5: Remove queues not compliant with new scheduling. */
diff --git a/tests/pmd.at b/tests/pmd.at
index cc5371d5a..78105bf45 100644
--- a/tests/pmd.at
+++ b/tests/pmd.at
@@ -580,5 +580,5 @@  p1 3 0 2
 ])
 
-OVS_VSWITCHD_STOP(["/dpif_netdev|WARN|There is no PMD thread on core/d"])
+OVS_VSWITCHD_STOP(["/cannot be pinned with port/d"])
 AT_CLEANUP