@@ -286,6 +286,7 @@ struct dp_netdev_rxq {
pinned. OVS_CORE_UNSPEC if the
queue doesn't need to be pinned to a
particular core. */
+ struct dp_netdev_pmd_thread *pmd; /* pmd thread that will poll this queue. */
};
/* A port in a netdev-based datapath. */
@@ -301,6 +302,7 @@ struct dp_netdev_port {
struct ovs_mutex txq_used_mutex;
char *type; /* Port type as requested by user. */
char *rxq_affinity_list; /* Requested affinity of rx queues. */
+ bool need_reconfigure; /* True if we should reconfigure netdev. */
};
/* Contained by struct dp_netdev_flow's 'stats' member. */
@@ -503,7 +505,7 @@ struct dp_netdev_pmd_thread {
/* Queue id used by this pmd thread to send packets on all netdevs if
* XPS disabled for this netdev. All static_tx_qid's are unique and less
- * than 'ovs_numa_get_n_cores() + 1'. */
+ * than 'cmap_count(dp->poll_threads)'. */
const int static_tx_qid;
struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'. */
@@ -532,6 +534,9 @@ struct dp_netdev_pmd_thread {
* reporting to the user */
unsigned long long stats_zero[DP_N_STATS];
uint64_t cycles_zero[PMD_N_CYCLES];
+
+ /* Set to true if the pmd thread needs to be reloaded. */
+ bool need_reload;
};
/* Interface to netdev-based datapath. */
@@ -576,29 +581,26 @@ static void dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_set_nonpmd(struct dp_netdev *dp)
OVS_REQUIRES(dp->port_mutex);
+static void *pmd_thread_main(void *);
static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp,
unsigned core_id);
static struct dp_netdev_pmd_thread *
dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
-static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
-static void dp_netdev_stop_pmds(struct dp_netdev *dp);
-static void dp_netdev_start_pmds(struct dp_netdev *dp)
- OVS_REQUIRES(dp->port_mutex);
+static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd);
static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd);
-static void dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
- struct dp_netdev_port *port);
-static void dp_netdev_add_port_to_pmds(struct dp_netdev *dp,
- struct dp_netdev_port *port);
static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
- struct dp_netdev_port *port);
+ struct dp_netdev_port *port)
+ OVS_REQUIRES(pmd->port_mutex);
+static void dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct tx_port *tx)
+ OVS_REQUIRES(pmd->port_mutex);
static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev_rxq *rxq)
OVS_REQUIRES(pmd->port_mutex);
-static struct dp_netdev_pmd_thread *
-dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
-static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
- OVS_REQUIRES(dp->port_mutex);
-static void reconfigure_pmd_threads(struct dp_netdev *dp)
+static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct rxq_poll *poll)
+ OVS_REQUIRES(pmd->port_mutex);
+static void reconfigure_datapath(struct dp_netdev *dp)
OVS_REQUIRES(dp->port_mutex);
static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
@@ -1149,7 +1151,7 @@ dp_netdev_free(struct dp_netdev *dp)
do_del_port(dp, port);
}
ovs_mutex_unlock(&dp->port_mutex);
- dp_netdev_destroy_all_pmds(dp);
+ dp_netdev_destroy_all_pmds(dp, true);
cmap_destroy(&dp->poll_threads);
ovs_mutex_destroy(&dp->non_pmd_mutex);
@@ -1284,10 +1286,7 @@ port_create(const char *devname, const char *type,
struct dp_netdev_port *port;
enum netdev_flags flags;
struct netdev *netdev;
- int n_open_rxqs = 0;
- int n_cores = 0;
- int i, error;
- bool dynamic_txqs = false;
+ int error;
*portp = NULL;
@@ -1305,79 +1304,24 @@ port_create(const char *devname, const char *type,
goto out;
}
- if (netdev_is_pmd(netdev)) {
- n_cores = ovs_numa_get_n_cores();
-
- if (n_cores == OVS_CORE_UNSPEC) {
- VLOG_ERR("%s, cannot get cpu core info", devname);
- error = ENOENT;
- goto out;
- }
- /* There can only be ovs_numa_get_n_cores() pmd threads,
- * so creates a txq for each, and one extra for the non
- * pmd threads. */
- error = netdev_set_tx_multiq(netdev, n_cores + 1);
- if (error && (error != EOPNOTSUPP)) {
- VLOG_ERR("%s, cannot set multiq", devname);
- goto out;
- }
- }
-
- if (netdev_is_reconf_required(netdev)) {
- error = netdev_reconfigure(netdev);
- if (error) {
- goto out;
- }
- }
-
- if (netdev_is_pmd(netdev)) {
- if (netdev_n_txq(netdev) < n_cores + 1) {
- dynamic_txqs = true;
- }
+ error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
+ if (error) {
+ VLOG_ERR("%s: cannot set promisc flag", devname);
+ goto out;
}
port = xzalloc(sizeof *port);
port->port_no = port_no;
port->netdev = netdev;
- port->n_rxq = netdev_n_rxq(netdev);
- port->rxqs = xcalloc(port->n_rxq, sizeof *port->rxqs);
- port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
port->type = xstrdup(type);
- ovs_mutex_init(&port->txq_used_mutex);
- port->dynamic_txqs = dynamic_txqs;
-
- for (i = 0; i < port->n_rxq; i++) {
- port->rxqs[i].port = port;
- error = netdev_rxq_open(netdev, &port->rxqs[i].rx, i);
- if (error) {
- VLOG_ERR("%s: cannot receive packets on this network device (%s)",
- devname, ovs_strerror(errno));
- goto out_rxq_close;
- }
- port->rxqs[i].core_id = OVS_CORE_UNSPEC;
- n_open_rxqs++;
- }
-
- error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
- if (error) {
- goto out_rxq_close;
- }
port->sf = sf;
+ port->need_reconfigure = true;
+ ovs_mutex_init(&port->txq_used_mutex);
*portp = port;
return 0;
-out_rxq_close:
- for (i = 0; i < n_open_rxqs; i++) {
- netdev_rxq_close(port->rxqs[i].rx);
- }
- ovs_mutex_destroy(&port->txq_used_mutex);
- free(port->type);
- free(port->txq_used);
- free(port->rxqs);
- free(port);
-
out:
netdev_close(netdev);
return error;
@@ -1401,15 +1345,11 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
return error;
}
- if (netdev_is_pmd(port->netdev)) {
- dp_netdev_start_pmds(dp);
- }
-
- dp_netdev_add_port_to_pmds(dp, port);
-
hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
seq_change(dp->port_seq);
+ reconfigure_datapath(dp);
+
return 0;
}
@@ -1537,13 +1477,6 @@ get_port_by_name(struct dp_netdev *dp,
return ENODEV;
}
-static int
-get_n_pmd_threads(struct dp_netdev *dp)
-{
- /* There is one non pmd thread in dp->poll_threads */
- return cmap_count(&dp->poll_threads) - 1;
-}
-
/* Returns 'true' if there is a port with pmd netdev. */
static bool
has_pmd_port(struct dp_netdev *dp)
@@ -1560,7 +1493,6 @@ has_pmd_port(struct dp_netdev *dp)
return false;
}
-
static void
do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
OVS_REQUIRES(dp->port_mutex)
@@ -1568,14 +1500,7 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
hmap_remove(&dp->ports, &port->node);
seq_change(dp->port_seq);
- dp_netdev_del_port_from_all_pmds(dp, port);
-
- if (netdev_is_pmd(port->netdev)) {
- /* If there is no pmd netdev, delete the pmd threads */
- if (!has_pmd_port(dp)) {
- dp_netdev_stop_pmds(dp);
- }
- }
+ reconfigure_datapath(dp);
port_destroy(port);
}
@@ -2977,15 +2902,27 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
}
}
+static struct tx_port *
+tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
+{
+ struct tx_port *tx;
+
+ HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
+ if (tx->port->port_no == port_no) {
+ return tx;
+ }
+ }
+
+ return NULL;
+}
+
static int
port_reconfigure(struct dp_netdev_port *port)
{
struct netdev *netdev = port->netdev;
int i, err;
- if (!netdev_is_reconf_required(netdev)) {
- return 0;
- }
+ port->need_reconfigure = false;
/* Closes the existing 'rxq's. */
for (i = 0; i < port->n_rxq; i++) {
@@ -2995,11 +2932,13 @@ port_reconfigure(struct dp_netdev_port *port)
port->n_rxq = 0;
/* Allows 'netdev' to apply the pending configuration changes. */
- err = netdev_reconfigure(netdev);
- if (err && (err != EOPNOTSUPP)) {
- VLOG_ERR("Failed to set interface %s new configuration",
- netdev_get_name(netdev));
- return err;
+ if (netdev_is_reconf_required(netdev)) {
+ err = netdev_reconfigure(netdev);
+ if (err && (err != EOPNOTSUPP)) {
+ VLOG_ERR("Failed to set interface %s new configuration",
+ netdev_get_name(netdev));
+ return err;
+ }
}
/* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
port->rxqs = xrealloc(port->rxqs,
@@ -3023,42 +2962,379 @@ port_reconfigure(struct dp_netdev_port *port)
return 0;
}
+struct rr_numa_list {
+ struct hmap numas; /* Contains 'struct rr_numa' */
+};
+
+struct rr_numa {
+ struct hmap_node node;
+
+ int numa_id;
+
+ /* Non isolated pmds on numa node 'numa_id' */
+ struct dp_netdev_pmd_thread **pmds;
+ int n_pmds;
+
+ int cur_index;
+};
+
+static struct rr_numa *
+rr_numa_list_lookup(struct rr_numa_list *rr, int numa_id)
+{
+ struct rr_numa *numa;
+
+ HMAP_FOR_EACH_WITH_HASH (numa, node, hash_int(numa_id, 0), &rr->numas) {
+ if (numa->numa_id == numa_id) {
+ return numa;
+ }
+ }
+
+ return NULL;
+}
+
+static void
+rr_numa_list_populate(struct dp_netdev *dp, struct rr_numa_list *rr)
+{
+ struct dp_netdev_pmd_thread *pmd;
+ struct rr_numa *numa;
+
+ hmap_init(&rr->numas);
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->core_id == NON_PMD_CORE_ID || pmd->isolated) {
+ continue;
+ }
+
+ numa = rr_numa_list_lookup(rr, pmd->numa_id);
+ if (!numa) {
+ numa = xzalloc(sizeof *numa);
+ numa->numa_id = pmd->numa_id;
+ hmap_insert(&rr->numas, &numa->node, hash_int(pmd->numa_id, 0));
+ }
+ numa->n_pmds++;
+ numa->pmds = xrealloc(numa->pmds, numa->n_pmds * sizeof *numa->pmds);
+ numa->pmds[numa->n_pmds - 1] = pmd;
+ }
+}
+
+static struct dp_netdev_pmd_thread *
+rr_numa_get_pmd(struct rr_numa *numa)
+{
+ return numa->pmds[numa->cur_index++ % numa->n_pmds];
+}
+
+static void
+rr_numa_list_destroy(struct rr_numa_list *rr)
+{
+ struct rr_numa *numa;
+
+ HMAP_FOR_EACH_POP (numa, node, &rr->numas) {
+ free(numa->pmds);
+ free(numa);
+ }
+ hmap_destroy(&rr->numas);
+}
+
+/* Assign pmds to queues. If 'pinned' is true, assign pmds to pinned
+ * queues and mark the pmds as isolated. Otherwise, assign non isolated
+ * pmds to unpinned queues.
+ *
+ * The function doesn't touch the pmd threads, it just stores the assignment
+ * in the 'pmd' member of each rxq. */
+static void
+rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
+{
+ struct dp_netdev_port *port;
+ struct rr_numa_list rr;
+
+ rr_numa_list_populate(dp, &rr);
+
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ struct rr_numa *numa;
+ int numa_id;
+
+ if (!netdev_is_pmd(port->netdev)) {
+ continue;
+ }
+
+ numa_id = netdev_get_numa_id(port->netdev);
+ numa = rr_numa_list_lookup(&rr, numa_id);
+
+ for (int qid = 0; qid < port->n_rxq; qid++) {
+ struct dp_netdev_rxq *q = &port->rxqs[qid];
+
+ if (pinned && q->core_id != OVS_CORE_UNSPEC) {
+ struct dp_netdev_pmd_thread *pmd;
+
+ pmd = dp_netdev_get_pmd(dp, q->core_id);
+ if (!pmd) {
+ VLOG_WARN("There is no PMD thread on core %d. Queue "
+ "%d on port \'%s\' will not be polled.",
+ q->core_id, qid, netdev_get_name(port->netdev));
+ } else {
+ q->pmd = pmd;
+ pmd->isolated = true;
+ dp_netdev_pmd_unref(pmd);
+ }
+ } else if (!pinned && q->core_id == OVS_CORE_UNSPEC) {
+ if (!numa) {
+ VLOG_WARN("There's no available (non isolated) pmd thread "
+ "on numa node %d. Queue %d on port \'%s\' will "
+ "not be polled.",
+ numa_id, qid, netdev_get_name(port->netdev));
+ } else {
+ q->pmd = rr_numa_get_pmd(numa);
+ }
+ }
+ }
+ }
+
+ rr_numa_list_destroy(&rr);
+}
+
static void
reconfigure_pmd_threads(struct dp_netdev *dp)
OVS_REQUIRES(dp->port_mutex)
{
- struct dp_netdev_port *port, *next;
- int n_cores;
+ struct dp_netdev_pmd_thread *pmd;
+ struct ovs_numa_dump *pmd_cores;
+ bool changed = false;
+
+ /* The pmd threads should be started only if there's a pmd port in the
+ * datapath. If the user didn't provide any "pmd-cpu-mask", we start
+ * NR_PMD_THREADS per numa node. */
+ if (!has_pmd_port(dp)) {
+ pmd_cores = ovs_numa_dump_n_cores_per_numa(0);
+ } else if (dp->pmd_cmask && dp->pmd_cmask[0]) {
+ pmd_cores = ovs_numa_dump_cores_with_cmask(dp->pmd_cmask);
+ } else {
+ pmd_cores = ovs_numa_dump_n_cores_per_numa(NR_PMD_THREADS);
+ }
+
+ /* Check for changed configuration */
+ if (ovs_numa_dump_count(pmd_cores) != cmap_count(&dp->poll_threads) - 1) {
+ changed = true;
+ } else {
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->core_id != NON_PMD_CORE_ID
+ && !ovs_numa_dump_contains_core(pmd_cores,
+ pmd->numa_id,
+ pmd->core_id)) {
+ changed = true;
+ break;
+ }
+ }
+ }
+
+ /* Destroy the old and recreate the new pmd threads. We don't perform an
+ * incremental update because we would have to adjust 'static_tx_qid'. */
+ if (changed) {
+ struct ovs_numa_info_core *core;
+ struct ovs_numa_info_numa *numa;
+
+ /* Do not destroy the non pmd thread. */
+ dp_netdev_destroy_all_pmds(dp, false);
+ FOR_EACH_CORE_ON_DUMP (core, pmd_cores) {
+ struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
+
+ dp_netdev_configure_pmd(pmd, dp, core->core_id, core->numa_id);
+
+ pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+ }
+
+ /* Log the number of pmd threads per numa node. */
+ FOR_EACH_NUMA_ON_DUMP (numa, pmd_cores) {
+ VLOG_INFO("Created %"PRIuSIZE" pmd threads on numa node %d",
+ numa->n_cores, numa->numa_id);
+ }
+ }
+
+ ovs_numa_dump_destroy(pmd_cores);
+}
+
+static void
+reload_affected_pmds(struct dp_netdev *dp)
+{
+ struct dp_netdev_pmd_thread *pmd;
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->need_reload) {
+ dp_netdev_reload_pmd__(pmd);
+ pmd->need_reload = false;
+ }
+ }
+}
+
+static void
+pmd_remove_stale_ports(struct dp_netdev *dp,
+ struct dp_netdev_pmd_thread *pmd)
+ OVS_EXCLUDED(pmd->port_mutex)
+ OVS_REQUIRES(dp->port_mutex)
+{
+ struct rxq_poll *poll, *poll_next;
+ struct tx_port *tx, *tx_next;
+
+ ovs_mutex_lock(&pmd->port_mutex);
+ HMAP_FOR_EACH_SAFE (poll, poll_next, node, &pmd->poll_list) {
+ struct dp_netdev_port *port = poll->rxq->port;
+
+ if (port->need_reconfigure
+ || !hmap_contains(&dp->ports, &port->node)) {
+ dp_netdev_del_rxq_from_pmd(pmd, poll);
+ }
+ }
+ HMAP_FOR_EACH_SAFE (tx, tx_next, node, &pmd->tx_ports) {
+ struct dp_netdev_port *port = tx->port;
+
+ if (port->need_reconfigure
+ || !hmap_contains(&dp->ports, &port->node)) {
+ dp_netdev_del_port_tx_from_pmd(pmd, tx);
+ }
+ }
+ ovs_mutex_unlock(&pmd->port_mutex);
+}
+
+/* Must be called each time a port is added/removed or the cmask changes.
+ * This creates and destroys pmd threads, reconfigures ports, opens their
+ * rxqs and assigns all rxqs/txqs to pmd threads. */
+static void
+reconfigure_datapath(struct dp_netdev *dp)
+ OVS_REQUIRES(dp->port_mutex)
+{
+ struct dp_netdev_pmd_thread *pmd;
+ struct dp_netdev_port *port;
+ int wanted_txqs;
dp->last_reconfigure_seq = seq_read(dp->reconfigure_seq);
- dp_netdev_destroy_all_pmds(dp);
+ /* Step 1: Adjust the pmd threads based on the datapath ports, the cores
+ * on the system and the user configuration. */
+ reconfigure_pmd_threads(dp);
- /* Reconfigures the cpu mask. */
- ovs_numa_set_cpu_mask(dp->pmd_cmask);
+ wanted_txqs = cmap_count(&dp->poll_threads);
- n_cores = ovs_numa_get_n_cores();
- if (n_cores == OVS_CORE_UNSPEC) {
- VLOG_ERR("Cannot get cpu core info");
- return;
+ /* The number of pmd threads might have changed, or a port can be new:
+ * adjust the txqs. */
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ netdev_set_tx_multiq(port->netdev, wanted_txqs);
}
- HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {
+ /* Step 2: Remove from the pmd threads ports that have been removed or
+ * need reconfiguration. */
+
+ /* Check for all the ports that need reconfiguration. We cache this in
+ * 'port->need_reconfigure', because netdev_is_reconf_required() can change at
+ * any time. */
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ if (netdev_is_reconf_required(port->netdev)) {
+ port->need_reconfigure = true;
+ }
+ }
+
+ /* Remove from the pmd threads all the ports that have been deleted or
+ * need reconfiguration. */
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ pmd_remove_stale_ports(dp, pmd);
+ }
+
+ /* Reload affected pmd threads. We must wait for the pmd threads before
+ * reconfiguring the ports, because a port cannot be reconfigured while
+ * it's being used. */
+ reload_affected_pmds(dp);
+
+ /* Step 3: Reconfigure ports. */
+
+ /* We only reconfigure the ports that we determined above, because they're
+ * not being used by any pmd thread at the moment. If a port fails to
+ * reconfigure we remove it from the datapath. */
+ HMAP_FOR_EACH (port, node, &dp->ports) {
int err;
+ if (!port->need_reconfigure) {
+ continue;
+ }
+
err = port_reconfigure(port);
if (err) {
hmap_remove(&dp->ports, &port->node);
seq_change(dp->port_seq);
port_destroy(port);
} else {
- port->dynamic_txqs = netdev_n_txq(port->netdev) < n_cores + 1;
+ port->dynamic_txqs = netdev_n_txq(port->netdev) < wanted_txqs;
}
}
- /* Restores the non-pmd. */
- dp_netdev_set_nonpmd(dp);
- /* Restores all pmd threads. */
- dp_netdev_reset_pmd_threads(dp);
+
+ /* Step 4: Compute new rxq scheduling. We don't touch the pmd threads
+ * for now, we just update the 'pmd' pointer in each rxq to point to the
+ * wanted thread according to the scheduling policy. */
+
+ /* Reset all the pmd threads to non isolated. */
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ pmd->isolated = false;
+ }
+
+ /* Reset all the queues to unassigned */
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ for (int i = 0; i < port->n_rxq; i++) {
+ port->rxqs[i].pmd = NULL;
+ }
+ }
+
+ /* Add pinned queues and mark pmd threads isolated. */
+ rxq_scheduling(dp, true);
+
+ /* Add non-pinned queues. */
+ rxq_scheduling(dp, false);
+
+ /* Step 5: Remove queues not compliant with new scheduling. */
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ struct rxq_poll *poll, *poll_next;
+
+ ovs_mutex_lock(&pmd->port_mutex);
+ HMAP_FOR_EACH_SAFE (poll, poll_next, node, &pmd->poll_list) {
+ if (poll->rxq->pmd != pmd) {
+ dp_netdev_del_rxq_from_pmd(pmd, poll);
+ }
+ }
+ ovs_mutex_unlock(&pmd->port_mutex);
+ }
+
+ /* Reload affected pmd threads. We must wait for the pmd threads to remove
+ * the old queues before readding them, otherwise a queue can be polled by
+ * two threads at the same time. */
+ reload_affected_pmds(dp);
+
+ /* Step 6: Add queues from scheduling, if they're not there already. */
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ if (!netdev_is_pmd(port->netdev)) {
+ continue;
+ }
+
+ for (int qid = 0; qid < port->n_rxq; qid++) {
+ struct dp_netdev_rxq *q = &port->rxqs[qid];
+
+ if (q->pmd) {
+ ovs_mutex_lock(&q->pmd->port_mutex);
+ dp_netdev_add_rxq_to_pmd(q->pmd, q);
+ ovs_mutex_unlock(&q->pmd->port_mutex);
+ }
+ }
+ }
+
+ /* Add every port to the tx cache of every pmd thread, if it's not
+ * there already and if this pmd has at least one rxq to poll. */
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ ovs_mutex_lock(&pmd->port_mutex);
+ if (hmap_count(&pmd->poll_list) || pmd->core_id == NON_PMD_CORE_ID) {
+ HMAP_FOR_EACH (port, node, &dp->ports) {
+ dp_netdev_add_port_tx_to_pmd(pmd, port);
+ }
+ }
+ ovs_mutex_unlock(&pmd->port_mutex);
+ }
+
+ /* Reload affected pmd threads. */
+ reload_affected_pmds(dp);
}
/* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
@@ -3107,7 +3383,7 @@ dpif_netdev_run(struct dpif *dpif)
}
if (dp_netdev_is_reconf_required(dp) || ports_require_restart(dp)) {
- reconfigure_pmd_threads(dp);
+ reconfigure_datapath(dp);
}
ovs_mutex_unlock(&dp->port_mutex);
@@ -3357,16 +3633,9 @@ dp_netdev_set_nonpmd(struct dp_netdev *dp)
OVS_REQUIRES(dp->port_mutex)
{
struct dp_netdev_pmd_thread *non_pmd;
- struct dp_netdev_port *port;
non_pmd = xzalloc(sizeof *non_pmd);
dp_netdev_configure_pmd(non_pmd, dp, NON_PMD_CORE_ID, OVS_NUMA_UNSPEC);
-
- HMAP_FOR_EACH (port, node, &dp->ports) {
- dp_netdev_add_port_tx_to_pmd(non_pmd, port);
- }
-
- dp_netdev_reload_pmd__(non_pmd);
}
/* Caller must have valid pointer to 'pmd'. */
@@ -3412,10 +3681,9 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
pmd->dp = dp;
pmd->core_id = core_id;
pmd->numa_id = numa_id;
+ pmd->need_reload = false;
- *CONST_CAST(int *, &pmd->static_tx_qid) = (core_id == NON_PMD_CORE_ID)
- ? ovs_numa_get_n_cores()
- : get_n_pmd_threads(dp);
+ *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp->poll_threads);
ovs_refcount_init(&pmd->ref_cnt);
latch_init(&pmd->exit_latch);
@@ -3483,7 +3751,6 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
} else {
latch_set(&pmd->exit_latch);
dp_netdev_reload_pmd__(pmd);
- ovs_numa_unpin_core(pmd->core_id);
xpthread_join(pmd->thread, NULL);
}
@@ -3498,20 +3765,20 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
dp_netdev_pmd_unref(pmd);
}
-/* Destroys all pmd threads, but not the non pmd thread. */
+/* Destroys all pmd threads. If 'non_pmd' is true it also destroys the non pmd
+ * thread. */
static void
-dp_netdev_stop_pmds(struct dp_netdev *dp)
+dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd)
{
struct dp_netdev_pmd_thread *pmd;
struct dp_netdev_pmd_thread **pmd_list;
size_t k = 0, n_pmds;
- n_pmds = get_n_pmd_threads(dp);
+ n_pmds = cmap_count(&dp->poll_threads);
pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- /* We don't need to destroy the non pmd thread */
- if (pmd->core_id == NON_PMD_CORE_ID) {
+ if (!non_pmd && pmd->core_id == NON_PMD_CORE_ID) {
continue;
}
/* We cannot call dp_netdev_del_pmd(), since it alters
@@ -3527,32 +3794,6 @@ dp_netdev_stop_pmds(struct dp_netdev *dp)
free(pmd_list);
}
-/* Destroys all pmd threads, including the non pmd thread. */
-static void
-dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
-{
- struct dp_netdev_pmd_thread *pmd;
- struct dp_netdev_pmd_thread **pmd_list;
- size_t k = 0, n_pmds;
-
- n_pmds = cmap_count(&dp->poll_threads);
- pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
-
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- /* We cannot call dp_netdev_del_pmd(), since it alters
- * 'dp->poll_threads' (while we're iterating it) and it
- * might quiesce. */
- ovs_assert(k < n_pmds);
- pmd_list[k++] = pmd;
- }
-
- for (size_t i = 0; i < k; i++) {
- dp_netdev_del_pmd(dp, pmd_list[i]);
- }
-
- free(pmd_list);
-}
-
/* Deletes all rx queues from pmd->poll_list and all the ports from
* pmd->tx_ports. */
static void
@@ -3571,126 +3812,40 @@ dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd)
ovs_mutex_unlock(&pmd->port_mutex);
}
-static struct tx_port *
-tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
-{
- struct tx_port *tx;
-
- HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
- if (tx->port->port_no == port_no) {
- return tx;
- }
- }
-
- return NULL;
-}
-
-/* Deletes all rx queues of 'port' from 'poll_list', and the 'port' from
- * 'tx_ports' of 'pmd' thread. Returns true if 'port' was found in 'pmd'
- * (therefore a restart is required). */
-static bool
-dp_netdev_del_port_from_pmd__(struct dp_netdev_port *port,
- struct dp_netdev_pmd_thread *pmd)
-{
- struct rxq_poll *poll, *next;
- struct tx_port *tx;
- bool found = false;
-
- ovs_mutex_lock(&pmd->port_mutex);
- HMAP_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
- if (poll->rxq->port == port) {
- found = true;
- hmap_remove(&pmd->poll_list, &poll->node);
- free(poll);
- }
- }
-
- tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
- if (tx) {
- hmap_remove(&pmd->tx_ports, &tx->node);
- free(tx);
- found = true;
- }
- ovs_mutex_unlock(&pmd->port_mutex);
-
- return found;
-}
-
-/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
- * threads. The pmd threads that need to be restarted are inserted in
- * 'to_reload'. */
+/* Adds rx queue to poll_list of PMD thread, if it's not there already. */
static void
-dp_netdev_del_port_from_all_pmds__(struct dp_netdev *dp,
- struct dp_netdev_port *port,
- struct hmapx *to_reload)
+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct dp_netdev_rxq *rxq)
+ OVS_REQUIRES(pmd->port_mutex)
{
- struct dp_netdev_pmd_thread *pmd;
-
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- bool found;
-
- found = dp_netdev_del_port_from_pmd__(port, pmd);
+ int qid = netdev_rxq_get_queue_id(rxq->rx);
+ uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
+ struct rxq_poll *poll;
- if (found) {
- hmapx_add(to_reload, pmd);
+ HMAP_FOR_EACH_WITH_HASH (poll, node, hash, &pmd->poll_list) {
+ if (poll->rxq == rxq) {
+ /* 'rxq' is already polled by this thread. Do nothing. */
+ return;
}
}
-}
-
-/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
- * threads. Reloads the threads if needed. */
-static void
-dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
- struct dp_netdev_port *port)
-{
- struct dp_netdev_pmd_thread *pmd;
- struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
- struct hmapx_node *node;
-
- dp_netdev_del_port_from_all_pmds__(dp, port, &to_reload);
-
- HMAPX_FOR_EACH (node, &to_reload) {
- pmd = (struct dp_netdev_pmd_thread *) node->data;
- dp_netdev_reload_pmd__(pmd);
- }
-
- hmapx_destroy(&to_reload);
-}
+ poll = xmalloc(sizeof *poll);
+ poll->rxq = rxq;
+ hmap_insert(&pmd->poll_list, &poll->node, hash);
-/* Returns non-isolated PMD thread from this numa node with fewer
- * rx queues to poll. Returns NULL if there is no non-isolated PMD threads
- * on this numa node. Can be called safely only by main thread. */
-static struct dp_netdev_pmd_thread *
-dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
-{
- int min_cnt = -1;
- struct dp_netdev_pmd_thread *pmd, *res = NULL;
-
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- if (!pmd->isolated && pmd->numa_id == numa_id
- && (min_cnt > hmap_count(&pmd->poll_list) || res == NULL)) {
- min_cnt = hmap_count(&pmd->poll_list);
- res = pmd;
- }
- }
-
- return res;
+ pmd->need_reload = true;
}
-/* Adds rx queue to poll_list of PMD thread. */
+/* Delete 'poll' from poll_list of PMD thread. */
static void
-dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
- struct dp_netdev_rxq *rxq)
+dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct rxq_poll *poll)
OVS_REQUIRES(pmd->port_mutex)
{
- int qid = netdev_rxq_get_queue_id(rxq->rx);
- uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
- struct rxq_poll *poll;
+ hmap_remove(&pmd->poll_list, &poll->node);
+ free(poll);
- poll = xmalloc(sizeof *poll);
- poll->rxq = rxq;
- hmap_insert(&pmd->poll_list, &poll->node, hash);
+ pmd->need_reload = true;
}
/* Add 'port' to the tx port cache of 'pmd', which must be reloaded for the
@@ -3698,190 +3853,37 @@ dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
static void
dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev_port *port)
+ OVS_REQUIRES(pmd->port_mutex)
{
struct tx_port *tx;
+ tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
+ if (tx) {
+ /* 'port' is already on this thread tx cache. Do nothing. */
+ return;
+ }
+
tx = xzalloc(sizeof *tx);
tx->port = port;
tx->qid = -1;
- ovs_mutex_lock(&pmd->port_mutex);
hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
- ovs_mutex_unlock(&pmd->port_mutex);
-}
-
-/* Distribute all {pinned|non-pinned} rx queues of 'port' between PMD
- * threads in 'dp'. The pmd threads that need to be restarted are inserted
- * in 'to_reload'. PMD threads with pinned queues marked as isolated. */
-static void
-dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
- struct dp_netdev_port *port,
- struct hmapx *to_reload, bool pinned)
-{
- int numa_id = netdev_get_numa_id(port->netdev);
- struct dp_netdev_pmd_thread *pmd;
- int i;
-
- if (!netdev_is_pmd(port->netdev)) {
- return;
- }
-
- for (i = 0; i < port->n_rxq; i++) {
- if (pinned) {
- if (port->rxqs[i].core_id == OVS_CORE_UNSPEC) {
- continue;
- }
- pmd = dp_netdev_get_pmd(dp, port->rxqs[i].core_id);
- if (!pmd) {
- VLOG_WARN("There is no PMD thread on core %d. "
- "Queue %d on port \'%s\' will not be polled.",
- port->rxqs[i].core_id, i,
- netdev_get_name(port->netdev));
- continue;
- }
- pmd->isolated = true;
- dp_netdev_pmd_unref(pmd);
- } else {
- if (port->rxqs[i].core_id != OVS_CORE_UNSPEC) {
- continue;
- }
- pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
- if (!pmd) {
- VLOG_WARN("There's no available pmd thread on numa node %d",
- numa_id);
- break;
- }
- }
-
- ovs_mutex_lock(&pmd->port_mutex);
- dp_netdev_add_rxq_to_pmd(pmd, &port->rxqs[i]);
- ovs_mutex_unlock(&pmd->port_mutex);
-
- hmapx_add(to_reload, pmd);
- }
-}
-
-/* Distributes all non-pinned rx queues of 'port' between all PMD threads
- * in 'dp' and inserts 'port' in the PMD threads 'tx_ports'. The pmd threads
- * that need to be restarted are inserted in 'to_reload'. */
-static void
-dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port,
- struct hmapx *to_reload)
-{
- struct dp_netdev_pmd_thread *pmd;
-
- dp_netdev_add_port_rx_to_pmds(dp, port, to_reload, false);
-
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- dp_netdev_add_port_tx_to_pmd(pmd, port);
- hmapx_add(to_reload, pmd);
- }
+ pmd->need_reload = true;
}
-/* Distributes all non-pinned rx queues of 'port' between all PMD threads
- * in 'dp', inserts 'port' in the PMD threads 'tx_ports' and reloads them,
- * if needed. */
-static void
-dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
-{
- struct dp_netdev_pmd_thread *pmd;
- struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
- struct hmapx_node *node;
-
- dp_netdev_add_port_to_pmds__(dp, port, &to_reload);
-
- HMAPX_FOR_EACH (node, &to_reload) {
- pmd = (struct dp_netdev_pmd_thread *) node->data;
- dp_netdev_reload_pmd__(pmd);
- }
-
- hmapx_destroy(&to_reload);
-}
-
-static void
-dp_netdev_start_pmds_on_numa(struct dp_netdev *dp, int numa_id)
-{
- int can_have, n_unpinned, i;
-
- n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
- if (!n_unpinned) {
- VLOG_WARN("Cannot create pmd threads due to out of unpinned "
- "cores on numa node %d", numa_id);
- return;
- }
-
- /* If cpu mask is specified, uses all unpinned cores, otherwise
- * tries creating NR_PMD_THREADS pmd threads. */
- can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
- for (i = 0; i < can_have; i++) {
- unsigned core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
- struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
- struct dp_netdev_port *port;
-
- dp_netdev_configure_pmd(pmd, dp, core_id, numa_id);
-
- HMAP_FOR_EACH (port, node, &dp->ports) {
- dp_netdev_add_port_tx_to_pmd(pmd, port);
- }
-
- pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
- }
- VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
-}
-
-/* Starts pmd threads, if not already started. The function takes care of
- * filling the threads tx port cache. */
+/* Del 'tx' from the tx port cache of 'pmd', which must be reloaded for the
+ * changes to take effect. */
static void
-dp_netdev_start_pmds(struct dp_netdev *dp)
- OVS_REQUIRES(dp->port_mutex)
+dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct tx_port *tx)
+ OVS_REQUIRES(pmd->port_mutex)
{
- int n_pmds;
-
- n_pmds = get_n_pmd_threads(dp);
-
- /* If there are already pmd threads created for the datapath, do nothing.
- * Else, creates the pmd threads. */
- if (!n_pmds) {
- int n_numas = ovs_numa_get_n_numas();
-
- for (int numa_id = 0; numa_id < n_numas; numa_id++) {
- dp_netdev_start_pmds_on_numa(dp, numa_id);
- }
- }
+ hmap_remove(&pmd->tx_ports, &tx->node);
+ free(tx);
+ pmd->need_reload = true;
}
-
-/* Called after pmd threads config change. Restarts pmd threads with
- * new configuration. */
-static void
-dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
- OVS_REQUIRES(dp->port_mutex)
-{
- struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
- struct dp_netdev_pmd_thread *pmd;
- struct dp_netdev_port *port;
- struct hmapx_node *node;
-
- dp_netdev_start_pmds(dp);
- /* Distribute only pinned rx queues first to mark threads as isolated */
- HMAP_FOR_EACH (port, node, &dp->ports) {
- dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, true);
- }
-
- /* Distribute remaining non-pinned rx queues to non-isolated PMD threads. */
- HMAP_FOR_EACH (port, node, &dp->ports) {
- dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, false);
- }
-
- HMAPX_FOR_EACH (node, &to_reload) {
- pmd = (struct dp_netdev_pmd_thread *) node->data;
- dp_netdev_reload_pmd__(pmd);
- }
-
- hmapx_destroy(&to_reload);
-}
-
static char *
dpif_netdev_get_datapath_version(void)
{
@@ -4871,12 +4873,12 @@ dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED,
/* Remove port. */
hmap_remove(&dp->ports, &port->node);
- dp_netdev_del_port_from_all_pmds(dp, port);
+ reconfigure_datapath(dp);
/* Reinsert with new port number. */
port->port_no = port_no;
hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
- dp_netdev_add_port_to_pmds(dp, port);
+ reconfigure_datapath(dp);
seq_change(dp->port_seq);
unixctl_command_reply(conn, NULL);
@@ -614,8 +614,7 @@ p2 0 0 0
p2 1 0 0
])
-dnl During reconfiguration some packets will be dropped. This is expected
-OVS_VSWITCHD_STOP(["/dpif(monitor[[0-9]]\+)|WARN|dummy@ovs-dummy: execute [[0-9]]\+ failed/d"])
+OVS_VSWITCHD_STOP
AT_CLEANUP
AT_SETUP([PMD - dpctl])