diff mbox

[ovs-dev,v4] dpif-netdev: proper tx queue id

Message ID 1441194265-21074-1-git-send-email-i.maximets@samsung.com
State Changes Requested
Headers show

Commit Message

Ilya Maximets Sept. 2, 2015, 11:44 a.m. UTC
Currently tx_qid is equal to pmd->core_id. This leads to unexpected
behavior if pmd-cpu-mask does not match '/(0*)(1|3|7)?(f*)/',
e.g. if core_ids are not sequential, or don't start from 0, or both.

Example:
	starting 2 pmd threads with 1 port, 2 rxqs per port,
	pmd-cpu-mask = 00000014 and let dev->real_n_txq = 2

	In that case pmd_1->tx_qid = 2, pmd_2->tx_qid = 4 and
	txq_needs_locking = true (if the device doesn't have
	ovs_numa_get_n_cores()+1 queues).

	In that case, after truncating in netdev_dpdk_send__():
		'qid = qid % dev->real_n_txq;'
	pmd_1: qid = 2 % 2 = 0
	pmd_2: qid = 4 % 2 = 0

	So, both threads will call dpdk_queue_pkts() with the same qid = 0.
	This is unexpected behavior if there are 2 tx queues in the device.
	Queue #1 will not be used and both threads will lock queue #0
	on each send.

Fix that by introducing a per-pmd-thread hash map 'tx_queues', which
stores all tx queues available to that pmd thread, with port_no as
the key (hash). All tx_qid-s will be unique per port and sequential,
to prevent the unexpected mapping described above after truncation.

The implemented infrastructure can be used in the future to choose
between all tx queues available to that pmd thread.

Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
---
version 4:
	* fixed distribution of tx queues if multiqueue is not supported

version 3:
	* fixed failing of unit tests by adding tx queues of non
	  pmd devices to non pmd thread. (they haven't been used by any thread)
	* pmd_flush_tx_queues --> dp_netdev_pmd_detach_tx_queues
	* function names changed to dp_netdev_*
	* dp_netdev_pmd_lookup_txq now looks by port_no.
	* removed unnecessary dp_netdev_lookup_port in dp_execute_cb
	  for OVS_ACTION_ATTR_OUTPUT.
	* refactoring

 lib/dpif-netdev.c | 160 +++++++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 139 insertions(+), 21 deletions(-)

Comments

Ilya Maximets Sept. 10, 2015, 6:52 a.m. UTC | #1
Ping.

On 02.09.2015 14:44, Ilya Maximets wrote:
> Currently tx_qid is equal to pmd->core_id. This leads to unexpected
> behavior if pmd-cpu-mask different from '/(0*)(1|3|7)?(f*)/',
> e.g. if core_ids are not sequential, or doesn't start from 0, or both.
> 
> Example:
> 	starting 2 pmd threads with 1 port, 2 rxqs per port,
> 	pmd-cpu-mask = 00000014 and let dev->real_n_txq = 2
> 
> 	It that case pmd_1->tx_qid = 2, pmd_2->tx_qid = 4 and
> 	txq_needs_locking = true (if device hasn't ovs_numa_get_n_cores()+1
> 	queues).
> 
> 	In that case, after truncating in netdev_dpdk_send__():
> 		'qid = qid % dev->real_n_txq;'
> 	pmd_1: qid = 2 % 2 = 0
> 	pmd_2: qid = 4 % 2 = 0
> 
> 	So, both threads will call dpdk_queue_pkts() with same qid = 0.
> 	This is unexpected behavior if there is 2 tx queues in device.
> 	Queue #1 will not be used and both threads will lock queue #0
> 	on each send.
> 
> Fix that by introducing per pmd thread hash map 'tx_queues', where will
> be stored all available tx queues for that pmd thread with
> port_no as a key(hash). All tx_qid-s will be unique per port and
> sequential to prevent described unexpected mapping after truncating.
> 
> Implemented infrastructure can be used in the future to choose
> between all tx queues available for that pmd thread.
> 
> Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
> ---
> version 4:
> 	* fixed distribution of tx queues if multiqueue is not supported
> 
> version 3:
> 	* fixed failing of unit tests by adding tx queues of non
> 	  pmd devices to non pmd thread. (they haven't been used by any thread)
> 	* pmd_flush_tx_queues --> dp_netdev_pmd_detach_tx_queues
> 	* function names changed to dp_netdev_*
> 	* dp_netdev_pmd_lookup_txq now looks by port_no.
> 	* removed unnecessary dp_netdev_lookup_port in dp_execute_cb
> 	  for OVS_ACTION_ATTR_OUTPUT.
> 	* refactoring
> 
>  lib/dpif-netdev.c | 160 +++++++++++++++++++++++++++++++++++++++++++++++-------
>  1 file changed, 139 insertions(+), 21 deletions(-)
> 
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index cf5b064..d430bc9 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -371,6 +371,13 @@ struct dp_netdev_pmd_cycles {
>      atomic_ullong n[PMD_N_CYCLES];
>  };
>  
> +struct dp_netdev_pmd_txq {
> +    struct cmap_node node;        /* In owning dp_netdev_pmd_thread's */
> +                                  /* 'tx_queues'. */
> +    struct dp_netdev_port *port;
> +    int tx_qid;
> +};
> +
>  /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
>   * the performance overhead of interrupt processing.  Therefore netdev can
>   * not implement rx-wait for these devices.  dpif-netdev needs to poll
> @@ -426,8 +433,8 @@ struct dp_netdev_pmd_thread {
>                                      /* threads on same numa node. */
>      unsigned core_id;               /* CPU core id of this pmd thread. */
>      int numa_id;                    /* numa node id of this pmd thread. */
> -    int tx_qid;                     /* Queue id used by this pmd thread to
> -                                     * send packets on all netdevs */
> +    struct cmap tx_queues;          /* Queue ids used by this pmd thread to
> +                                     * send packets to ports */
>  
>      /* Only a pmd thread can write on its own 'cycles' and 'stats'.
>       * The main thread keeps 'stats_zero' and 'cycles_zero' as base
> @@ -469,6 +476,15 @@ static void dp_netdev_input(struct dp_netdev_pmd_thread *,
>  
>  static void dp_netdev_disable_upcall(struct dp_netdev *);
>  void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
> +static void dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd);
> +static void dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
> +                                  struct dp_netdev_port *port, int queue_id);
> +static void dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
> +                                  struct dp_netdev_pmd_txq *txq);
> +static void dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd);
> +static struct dp_netdev_pmd_txq *
> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
> +                         odp_port_t port_no);
>  static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
>                                      struct dp_netdev *dp, int index,
>                                      unsigned core_id, int numa_id);
> @@ -1050,6 +1066,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
>      struct netdev_saved_flags *sf;
>      struct dp_netdev_port *port;
>      struct netdev *netdev;
> +    struct dp_netdev_pmd_thread *non_pmd;
>      enum netdev_flags flags;
>      const char *open_type;
>      int error;
> @@ -1126,6 +1143,11 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
>      ovs_refcount_init(&port->ref_cnt);
>      cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
>  
> +    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
> +    if (non_pmd) {
> +        dp_netdev_pmd_add_txq(non_pmd, port, ovs_numa_get_n_cores());
> +        dp_netdev_pmd_unref(non_pmd);
> +    }
>      if (netdev_is_pmd(netdev)) {
>          dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
>          dp_netdev_reload_pmds(dp);
> @@ -1307,8 +1329,21 @@ static void
>  do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
>      OVS_REQUIRES(dp->port_mutex)
>  {
> +    struct dp_netdev_pmd_thread *non_pmd;
> +
>      cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
>      seq_change(dp->port_seq);
> +
> +    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
> +    if (non_pmd) {
> +        /* There is only one txq for each port for non pmd thread */
> +        struct dp_netdev_pmd_txq *txq;
> +        txq = dp_netdev_pmd_lookup_txq(non_pmd, port->port_no);
> +        if (OVS_LIKELY(txq))
> +            dp_netdev_pmd_del_txq(non_pmd, txq);
> +        dp_netdev_pmd_unref(non_pmd);
> +    }
> +
>      if (netdev_is_pmd(port->netdev)) {
>          int numa_id = netdev_get_numa_id(port->netdev);
>  
> @@ -2578,6 +2613,80 @@ dpif_netdev_wait(struct dpif *dpif)
>      seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
>  }
>  
> +static void
> +dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
> +                      struct dp_netdev_port *port, int queue_id)
> +{
> +    if (port_try_ref(port)) {
> +        struct dp_netdev_pmd_txq * txq = xmalloc(sizeof *txq);
> +        txq->port = port;
> +        txq->tx_qid = queue_id;
> +        cmap_insert(&pmd->tx_queues, &txq->node,
> +                        hash_port_no(port->port_no));
> +    }
> +}
> +
> +/* Configures tx_queues for non pmd thread. */
> +static void
> +dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd)
> +{
> +    if (!cmap_is_empty(&pmd->tx_queues))
> +        dp_netdev_pmd_detach_tx_queues(pmd);
> +
> +    struct dp_netdev_port *port;
> +    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
> +        dp_netdev_pmd_add_txq(pmd, port, ovs_numa_get_n_cores());
> +    }
> +}
> +
> +static void
> +dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
> +                      struct dp_netdev_pmd_txq *txq)
> +{
> +    cmap_remove(&pmd->tx_queues, &txq->node,
> +                hash_port_no(txq->port->port_no));
> +    port_unref(txq->port);
> +    free(txq);
> +}
> +
> +/* Removes all queues from 'tx_queues' of pmd thread. */
> +static void
> +dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd)
> +{
> +    struct dp_netdev_pmd_txq *txq;
> +
> +    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
> +        dp_netdev_pmd_del_txq(pmd, txq);
> +    }
> +}
> +
> +static void OVS_UNUSED
> +dp_netdev_pmd_tx_queues_print(struct dp_netdev_pmd_thread *pmd)
> +{
> +    struct dp_netdev_pmd_txq *txq;
> +
> +    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
> +        VLOG_INFO("Core_id: %d, Port: %s, tx_qid: %d\n",
> +                   pmd->core_id, netdev_get_name(txq->port->netdev),
> +                   txq->tx_qid);
> +    }
> +}
> +
> +static struct dp_netdev_pmd_txq *
> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
> +                         odp_port_t port_no)
> +{
> +    struct dp_netdev_pmd_txq *txq;
> +
> +    CMAP_FOR_EACH_WITH_HASH (txq, node, hash_port_no(port_no),
> +                             &pmd->tx_queues) {
> +        if (txq->port->port_no == port_no) {
> +            return txq;
> +        }
> +    }
> +    return NULL;
> +}
> +
>  struct rxq_poll {
>      struct dp_netdev_port *port;
>      struct netdev_rxq *rx;
> @@ -2589,16 +2698,19 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>  {
>      struct rxq_poll *poll_list = *ppoll_list;
>      struct dp_netdev_port *port;
> -    int n_pmds_on_numa, index, i;
> +    int n_pmds_on_numa, rx_index, tx_index, i, n_txq;
>  
>      /* Simple scheduler for netdev rx polling. */
> +    dp_netdev_pmd_detach_tx_queues(pmd);
> +
>      for (i = 0; i < poll_cnt; i++) {
>          port_unref(poll_list[i].port);
>      }
>  
>      poll_cnt = 0;
>      n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
> -    index = 0;
> +    rx_index = 0;
> +    tx_index = 0;
>  
>      CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
>          /* Calls port_try_ref() to prevent the main thread
> @@ -2609,7 +2721,7 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>                  int i;
>  
>                  for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
> -                    if ((index % n_pmds_on_numa) == pmd->index) {
> +                    if ((rx_index % n_pmds_on_numa) == pmd->index) {
>                          poll_list = xrealloc(poll_list,
>                                          sizeof *poll_list * (poll_cnt + 1));
>  
> @@ -2618,7 +2730,16 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>                          poll_list[poll_cnt].rx = port->rxq[i];
>                          poll_cnt++;
>                      }
> -                    index++;
> +                    rx_index++;
> +                }
> +
> +                n_txq = netdev_n_txq(port->netdev);
> +                /* Last queue reserved for non pmd threads */
> +                n_txq = n_txq == 1 ? 1 : n_txq - 1;
> +                for (i = 0; i < n_txq; i++) {
> +                    if ((tx_index % n_pmds_on_numa) == pmd->index || n_txq == 1)
> +                        dp_netdev_pmd_add_txq(pmd, port, i);
> +                    tx_index++;
>                  }
>              }
>              /* Unrefs the port_try_ref(). */
> @@ -2689,6 +2810,8 @@ reload:
>          goto reload;
>      }
>  
> +    dp_netdev_pmd_detach_tx_queues(pmd);
> +
>      for (i = 0; i < poll_cnt; i++) {
>           port_unref(poll_list[i].port);
>      }
> @@ -2802,16 +2925,6 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos)
>      return next;
>  }
>  
> -static int
> -core_id_to_qid(unsigned core_id)
> -{
> -    if (core_id != NON_PMD_CORE_ID) {
> -        return core_id;
> -    } else {
> -        return ovs_numa_get_n_cores();
> -    }
> -}
> -
>  /* Configures the 'pmd' based on the input argument. */
>  static void
>  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
> @@ -2820,7 +2933,6 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>      pmd->dp = dp;
>      pmd->index = index;
>      pmd->core_id = core_id;
> -    pmd->tx_qid = core_id_to_qid(core_id);
>      pmd->numa_id = numa_id;
>  
>      ovs_refcount_init(&pmd->ref_cnt);
> @@ -2831,9 +2943,11 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>      ovs_mutex_init(&pmd->flow_mutex);
>      dpcls_init(&pmd->cls);
>      cmap_init(&pmd->flow_table);
> -    /* init the 'flow_cache' since there is no
> +    cmap_init(&pmd->tx_queues);
> +    /* init the 'flow_cache' and 'tx_queues' since there is no
>       * actual thread created for NON_PMD_CORE_ID. */
>      if (core_id == NON_PMD_CORE_ID) {
> +        dp_netdev_configure_non_pmd_txqs(pmd);
>          emc_cache_init(&pmd->flow_cache);
>      }
>      cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
> @@ -2846,6 +2960,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
>      dp_netdev_pmd_flow_flush(pmd);
>      dpcls_destroy(&pmd->cls);
>      cmap_destroy(&pmd->flow_table);
> +    cmap_destroy(&pmd->tx_queues);
>      ovs_mutex_destroy(&pmd->flow_mutex);
>      latch_destroy(&pmd->exit_latch);
>      xpthread_cond_destroy(&pmd->cond);
> @@ -2862,6 +2977,7 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
>       * no actual thread uninit it for NON_PMD_CORE_ID. */
>      if (pmd->core_id == NON_PMD_CORE_ID) {
>          emc_cache_uninit(&pmd->flow_cache);
> +        dp_netdev_pmd_detach_tx_queues(pmd);
>      } else {
>          latch_set(&pmd->exit_latch);
>          dp_netdev_reload_pmd__(pmd);
> @@ -3471,13 +3587,15 @@ dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
>      struct dp_netdev *dp = pmd->dp;
>      int type = nl_attr_type(a);
>      struct dp_netdev_port *p;
> +    struct dp_netdev_pmd_txq *txq;
>      int i;
>  
>      switch ((enum ovs_action_attr)type) {
>      case OVS_ACTION_ATTR_OUTPUT:
> -        p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
> -        if (OVS_LIKELY(p)) {
> -            netdev_send(p->netdev, pmd->tx_qid, packets, cnt, may_steal);
> +        txq = dp_netdev_pmd_lookup_txq(pmd, u32_to_odp(nl_attr_get_u32(a)));
> +        if (OVS_LIKELY(txq)) {
> +            netdev_send(txq->port->netdev, txq->tx_qid,
> +                        packets, cnt, may_steal);
>              return;
>          }
>          break;
>
Daniele Di Proietto Sept. 10, 2015, 6:03 p.m. UTC | #2
Sorry for the delay.

There's still one problem with this patch:

when a non-DPDK port is added to the datapath, its txqs are not
added to the pmd threads.

Can you confirm the issue?

Thanks

On 10/09/2015 07:52, "Ilya Maximets" <i.maximets@samsung.com> wrote:

>Ping.
>
>On 02.09.2015 14:44, Ilya Maximets wrote:
>> Currently tx_qid is equal to pmd->core_id. This leads to unexpected
>> behavior if pmd-cpu-mask different from '/(0*)(1|3|7)?(f*)/',
>> e.g. if core_ids are not sequential, or doesn't start from 0, or both.
>> 
>> Example:
>> 	starting 2 pmd threads with 1 port, 2 rxqs per port,
>> 	pmd-cpu-mask = 00000014 and let dev->real_n_txq = 2
>> 
>> 	It that case pmd_1->tx_qid = 2, pmd_2->tx_qid = 4 and
>> 	txq_needs_locking = true (if device hasn't ovs_numa_get_n_cores()+1
>> 	queues).
>> 
>> 	In that case, after truncating in netdev_dpdk_send__():
>> 		'qid = qid % dev->real_n_txq;'
>> 	pmd_1: qid = 2 % 2 = 0
>> 	pmd_2: qid = 4 % 2 = 0
>> 
>> 	So, both threads will call dpdk_queue_pkts() with same qid = 0.
>> 	This is unexpected behavior if there is 2 tx queues in device.
>> 	Queue #1 will not be used and both threads will lock queue #0
>> 	on each send.
>> 
>> Fix that by introducing per pmd thread hash map 'tx_queues', where will
>> be stored all available tx queues for that pmd thread with
>> port_no as a key(hash). All tx_qid-s will be unique per port and
>> sequential to prevent described unexpected mapping after truncating.
>> 
>> Implemented infrastructure can be used in the future to choose
>> between all tx queues available for that pmd thread.
>> 
>> Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
>> ---
>> version 4:
>> 	* fixed distribution of tx queues if multiqueue is not supported
>> 
>> version 3:
>> 	* fixed failing of unit tests by adding tx queues of non
>> 	  pmd devices to non pmd thread. (they haven't been used by any thread)
>> 	* pmd_flush_tx_queues --> dp_netdev_pmd_detach_tx_queues
>> 	* function names changed to dp_netdev_*
>> 	* dp_netdev_pmd_lookup_txq now looks by port_no.
>> 	* removed unnecessary dp_netdev_lookup_port in dp_execute_cb
>> 	  for OVS_ACTION_ATTR_OUTPUT.
>> 	* refactoring
>> 
>>  lib/dpif-netdev.c | 160
>>+++++++++++++++++++++++++++++++++++++++++++++++-------
>>  1 file changed, 139 insertions(+), 21 deletions(-)
>> 
>> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>> index cf5b064..d430bc9 100644
>> --- a/lib/dpif-netdev.c
>> +++ b/lib/dpif-netdev.c
>> @@ -371,6 +371,13 @@ struct dp_netdev_pmd_cycles {
>>      atomic_ullong n[PMD_N_CYCLES];
>>  };
>>  
>> +struct dp_netdev_pmd_txq {
>> +    struct cmap_node node;        /* In owning dp_netdev_pmd_thread's
>>*/
>> +                                  /* 'tx_queues'. */
>> +    struct dp_netdev_port *port;
>> +    int tx_qid;
>> +};
>> +
>>  /* PMD: Poll modes drivers.  PMD accesses devices via polling to
>>eliminate
>>   * the performance overhead of interrupt processing.  Therefore netdev
>>can
>>   * not implement rx-wait for these devices.  dpif-netdev needs to poll
>> @@ -426,8 +433,8 @@ struct dp_netdev_pmd_thread {
>>                                      /* threads on same numa node. */
>>      unsigned core_id;               /* CPU core id of this pmd thread.
>>*/
>>      int numa_id;                    /* numa node id of this pmd
>>thread. */
>> -    int tx_qid;                     /* Queue id used by this pmd
>>thread to
>> -                                     * send packets on all netdevs */
>> +    struct cmap tx_queues;          /* Queue ids used by this pmd
>>thread to
>> +                                     * send packets to ports */
>>  
>>      /* Only a pmd thread can write on its own 'cycles' and 'stats'.
>>       * The main thread keeps 'stats_zero' and 'cycles_zero' as base
>> @@ -469,6 +476,15 @@ static void dp_netdev_input(struct
>>dp_netdev_pmd_thread *,
>>  
>>  static void dp_netdev_disable_upcall(struct dp_netdev *);
>>  void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
>> +static void dp_netdev_configure_non_pmd_txqs(struct
>>dp_netdev_pmd_thread *pmd);
>> +static void dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
>> +                                  struct dp_netdev_port *port, int
>>queue_id);
>> +static void dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
>> +                                  struct dp_netdev_pmd_txq *txq);
>> +static void dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread
>>*pmd);
>> +static struct dp_netdev_pmd_txq *
>> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
>> +                         odp_port_t port_no);
>>  static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
>>                                      struct dp_netdev *dp, int index,
>>                                      unsigned core_id, int numa_id);
>> @@ -1050,6 +1066,7 @@ do_add_port(struct dp_netdev *dp, const char
>>*devname, const char *type,
>>      struct netdev_saved_flags *sf;
>>      struct dp_netdev_port *port;
>>      struct netdev *netdev;
>> +    struct dp_netdev_pmd_thread *non_pmd;
>>      enum netdev_flags flags;
>>      const char *open_type;
>>      int error;
>> @@ -1126,6 +1143,11 @@ do_add_port(struct dp_netdev *dp, const char
>>*devname, const char *type,
>>      ovs_refcount_init(&port->ref_cnt);
>>      cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
>>  
>> +    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
>> +    if (non_pmd) {
>> +        dp_netdev_pmd_add_txq(non_pmd, port, ovs_numa_get_n_cores());
>> +        dp_netdev_pmd_unref(non_pmd);
>> +    }
>>      if (netdev_is_pmd(netdev)) {
>>          dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
>>          dp_netdev_reload_pmds(dp);
>> @@ -1307,8 +1329,21 @@ static void
>>  do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
>>      OVS_REQUIRES(dp->port_mutex)
>>  {
>> +    struct dp_netdev_pmd_thread *non_pmd;
>> +
>>      cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
>>      seq_change(dp->port_seq);
>> +
>> +    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
>> +    if (non_pmd) {
>> +        /* There is only one txq for each port for non pmd thread */
>> +        struct dp_netdev_pmd_txq *txq;
>> +        txq = dp_netdev_pmd_lookup_txq(non_pmd, port->port_no);
>> +        if (OVS_LIKELY(txq))
>> +            dp_netdev_pmd_del_txq(non_pmd, txq);
>> +        dp_netdev_pmd_unref(non_pmd);
>> +    }
>> +
>>      if (netdev_is_pmd(port->netdev)) {
>>          int numa_id = netdev_get_numa_id(port->netdev);
>>  
>> @@ -2578,6 +2613,80 @@ dpif_netdev_wait(struct dpif *dpif)
>>      seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
>>  }
>>  
>> +static void
>> +dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
>> +                      struct dp_netdev_port *port, int queue_id)
>> +{
>> +    if (port_try_ref(port)) {
>> +        struct dp_netdev_pmd_txq * txq = xmalloc(sizeof *txq);
>> +        txq->port = port;
>> +        txq->tx_qid = queue_id;
>> +        cmap_insert(&pmd->tx_queues, &txq->node,
>> +                        hash_port_no(port->port_no));
>> +    }
>> +}
>> +
>> +/* Configures tx_queues for non pmd thread. */
>> +static void
>> +dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd)
>> +{
>> +    if (!cmap_is_empty(&pmd->tx_queues))
>> +        dp_netdev_pmd_detach_tx_queues(pmd);
>> +
>> +    struct dp_netdev_port *port;
>> +    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
>> +        dp_netdev_pmd_add_txq(pmd, port, ovs_numa_get_n_cores());
>> +    }
>> +}
>> +
>> +static void
>> +dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
>> +                      struct dp_netdev_pmd_txq *txq)
>> +{
>> +    cmap_remove(&pmd->tx_queues, &txq->node,
>> +                hash_port_no(txq->port->port_no));
>> +    port_unref(txq->port);
>> +    free(txq);
>> +}
>> +
>> +/* Removes all queues from 'tx_queues' of pmd thread. */
>> +static void
>> +dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd)
>> +{
>> +    struct dp_netdev_pmd_txq *txq;
>> +
>> +    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
>> +        dp_netdev_pmd_del_txq(pmd, txq);
>> +    }
>> +}
>> +
>> +static void OVS_UNUSED
>> +dp_netdev_pmd_tx_queues_print(struct dp_netdev_pmd_thread *pmd)
>> +{
>> +    struct dp_netdev_pmd_txq *txq;
>> +
>> +    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
>> +        VLOG_INFO("Core_id: %d, Port: %s, tx_qid: %d\n",
>> +                   pmd->core_id, netdev_get_name(txq->port->netdev),
>> +                   txq->tx_qid);
>> +    }
>> +}
>> +
>> +static struct dp_netdev_pmd_txq *
>> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
>> +                         odp_port_t port_no)
>> +{
>> +    struct dp_netdev_pmd_txq *txq;
>> +
>> +    CMAP_FOR_EACH_WITH_HASH (txq, node, hash_port_no(port_no),
>> +                             &pmd->tx_queues) {
>> +        if (txq->port->port_no == port_no) {
>> +            return txq;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>>  struct rxq_poll {
>>      struct dp_netdev_port *port;
>>      struct netdev_rxq *rx;
>> @@ -2589,16 +2698,19 @@ pmd_load_queues(struct dp_netdev_pmd_thread
>>*pmd,
>>  {
>>      struct rxq_poll *poll_list = *ppoll_list;
>>      struct dp_netdev_port *port;
>> -    int n_pmds_on_numa, index, i;
>> +    int n_pmds_on_numa, rx_index, tx_index, i, n_txq;
>>  
>>      /* Simple scheduler for netdev rx polling. */
>> +    dp_netdev_pmd_detach_tx_queues(pmd);
>> +
>>      for (i = 0; i < poll_cnt; i++) {
>>          port_unref(poll_list[i].port);
>>      }
>>  
>>      poll_cnt = 0;
>>      n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
>> -    index = 0;
>> +    rx_index = 0;
>> +    tx_index = 0;
>>  
>>      CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
>>          /* Calls port_try_ref() to prevent the main thread
>> @@ -2609,7 +2721,7 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>>                  int i;
>>  
>>                  for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
>> -                    if ((index % n_pmds_on_numa) == pmd->index) {
>> +                    if ((rx_index % n_pmds_on_numa) == pmd->index) {
>>                          poll_list = xrealloc(poll_list,
>>                                          sizeof *poll_list * (poll_cnt
>>+ 1));
>>  
>> @@ -2618,7 +2730,16 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>>                          poll_list[poll_cnt].rx = port->rxq[i];
>>                          poll_cnt++;
>>                      }
>> -                    index++;
>> +                    rx_index++;
>> +                }
>> +
>> +                n_txq = netdev_n_txq(port->netdev);
>> +                /* Last queue reserved for non pmd threads */
>> +                n_txq = n_txq == 1 ? 1 : n_txq - 1;
>> +                for (i = 0; i < n_txq; i++) {
>> +                    if ((tx_index % n_pmds_on_numa) == pmd->index ||
>>n_txq == 1)
>> +                        dp_netdev_pmd_add_txq(pmd, port, i);
>> +                    tx_index++;
>>                  }
>>              }
>>              /* Unrefs the port_try_ref(). */
>> @@ -2689,6 +2810,8 @@ reload:
>>          goto reload;
>>      }
>>  
>> +    dp_netdev_pmd_detach_tx_queues(pmd);
>> +
>>      for (i = 0; i < poll_cnt; i++) {
>>           port_unref(poll_list[i].port);
>>      }
>> @@ -2802,16 +2925,6 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp,
>>struct cmap_position *pos)
>>      return next;
>>  }
>>  
>> -static int
>> -core_id_to_qid(unsigned core_id)
>> -{
>> -    if (core_id != NON_PMD_CORE_ID) {
>> -        return core_id;
>> -    } else {
>> -        return ovs_numa_get_n_cores();
>> -    }
>> -}
>> -
>>  /* Configures the 'pmd' based on the input argument. */
>>  static void
>>  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct
>>dp_netdev *dp,
>> @@ -2820,7 +2933,6 @@ dp_netdev_configure_pmd(struct
>>dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>>      pmd->dp = dp;
>>      pmd->index = index;
>>      pmd->core_id = core_id;
>> -    pmd->tx_qid = core_id_to_qid(core_id);
>>      pmd->numa_id = numa_id;
>>  
>>      ovs_refcount_init(&pmd->ref_cnt);
>> @@ -2831,9 +2943,11 @@ dp_netdev_configure_pmd(struct
>>dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>>      ovs_mutex_init(&pmd->flow_mutex);
>>      dpcls_init(&pmd->cls);
>>      cmap_init(&pmd->flow_table);
>> -    /* init the 'flow_cache' since there is no
>> +    cmap_init(&pmd->tx_queues);
>> +    /* init the 'flow_cache' and 'tx_queues' since there is no
>>       * actual thread created for NON_PMD_CORE_ID. */
>>      if (core_id == NON_PMD_CORE_ID) {
>> +        dp_netdev_configure_non_pmd_txqs(pmd);
>>          emc_cache_init(&pmd->flow_cache);
>>      }
>>      cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *,
>>&pmd->node),
>> @@ -2846,6 +2960,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread
>>*pmd)
>>      dp_netdev_pmd_flow_flush(pmd);
>>      dpcls_destroy(&pmd->cls);
>>      cmap_destroy(&pmd->flow_table);
>> +    cmap_destroy(&pmd->tx_queues);
>>      ovs_mutex_destroy(&pmd->flow_mutex);
>>      latch_destroy(&pmd->exit_latch);
>>      xpthread_cond_destroy(&pmd->cond);
>> @@ -2862,6 +2977,7 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct
>>dp_netdev_pmd_thread *pmd)
>>       * no actual thread uninit it for NON_PMD_CORE_ID. */
>>      if (pmd->core_id == NON_PMD_CORE_ID) {
>>          emc_cache_uninit(&pmd->flow_cache);
>> +        dp_netdev_pmd_detach_tx_queues(pmd);
>>      } else {
>>          latch_set(&pmd->exit_latch);
>>          dp_netdev_reload_pmd__(pmd);
>> @@ -3471,13 +3587,15 @@ dp_execute_cb(void *aux_, struct dp_packet
>>**packets, int cnt,
>>      struct dp_netdev *dp = pmd->dp;
>>      int type = nl_attr_type(a);
>>      struct dp_netdev_port *p;
>> +    struct dp_netdev_pmd_txq *txq;
>>      int i;
>>  
>>      switch ((enum ovs_action_attr)type) {
>>      case OVS_ACTION_ATTR_OUTPUT:
>> -        p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
>> -        if (OVS_LIKELY(p)) {
>> -            netdev_send(p->netdev, pmd->tx_qid, packets, cnt,
>>may_steal);
>> +        txq = dp_netdev_pmd_lookup_txq(pmd,
>>u32_to_odp(nl_attr_get_u32(a)));
>> +        if (OVS_LIKELY(txq)) {
>> +            netdev_send(txq->port->netdev, txq->tx_qid,
>> +                        packets, cnt, may_steal);
>>              return;
>>          }
>>          break;
>>
Ilya Maximets Sept. 11, 2015, 11:40 a.m. UTC | #3
Thanks. Fixed.
New version here: http://openvswitch.org/pipermail/dev/2015-September/059895.html

Best regards, Ilya Maximets.

On 10.09.2015 21:03, Daniele Di Proietto wrote:
> Sorry for the delay.
> 
> There's still one problem with this patch:
> 
> when a non-DPDK port is added to the datapath, its txqs are not
> added to the pmd threads.
> 
> Can you confirm the issue?
> 
> Thanks
> 
> On 10/09/2015 07:52, "Ilya Maximets" <i.maximets@samsung.com> wrote:
> 
>> Ping.
>>
>> On 02.09.2015 14:44, Ilya Maximets wrote:
>>> Currently tx_qid is equal to pmd->core_id. This leads to unexpected
>>> behavior if pmd-cpu-mask different from '/(0*)(1|3|7)?(f*)/',
>>> e.g. if core_ids are not sequential, or doesn't start from 0, or both.
>>>
>>> Example:
>>> 	starting 2 pmd threads with 1 port, 2 rxqs per port,
>>> 	pmd-cpu-mask = 00000014 and let dev->real_n_txq = 2
>>>
>>> 	It that case pmd_1->tx_qid = 2, pmd_2->tx_qid = 4 and
>>> 	txq_needs_locking = true (if device hasn't ovs_numa_get_n_cores()+1
>>> 	queues).
>>>
>>> 	In that case, after truncating in netdev_dpdk_send__():
>>> 		'qid = qid % dev->real_n_txq;'
>>> 	pmd_1: qid = 2 % 2 = 0
>>> 	pmd_2: qid = 4 % 2 = 0
>>>
>>> 	So, both threads will call dpdk_queue_pkts() with same qid = 0.
>>> 	This is unexpected behavior if there is 2 tx queues in device.
>>> 	Queue #1 will not be used and both threads will lock queue #0
>>> 	on each send.
>>>
>>> Fix that by introducing a per-pmd-thread hash map 'tx_queues', which
>>> stores all tx queues available to that pmd thread, with
>>> port_no as the key (hash). All tx_qid-s will be unique per port and
>>> sequential, to prevent the unexpected mapping described above.
>>>
>>> Implemented infrastructure can be used in the future to choose
>>> between all tx queues available for that pmd thread.
>>>
>>> Signed-off-by: Ilya Maximets <i.maximets@samsung.com>
>>> ---
>>> version 4:
>>> 	* fixed distribution of tx queues if multiqueue is not supported
>>>
>>> version 3:
>>> 	* fixed failing of unit tests by adding tx queues of non
>>> 	  pmd devices to non pmd thread. (they haven't been used by any thread)
>>> 	* pmd_flush_tx_queues --> dp_netdev_pmd_detach_tx_queues
>>> 	* function names changed to dp_netdev_*
>>> 	* dp_netdev_pmd_lookup_txq now looks by port_no.
>>> 	* removed unnecessary dp_netdev_lookup_port in dp_execute_cb
>>> 	  for OVS_ACTION_ATTR_OUTPUT.
>>> 	* refactoring
>>>
>>>  lib/dpif-netdev.c | 160
>>> +++++++++++++++++++++++++++++++++++++++++++++++-------
>>>  1 file changed, 139 insertions(+), 21 deletions(-)
>>>
>>> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>>> index cf5b064..d430bc9 100644
>>> --- a/lib/dpif-netdev.c
>>> +++ b/lib/dpif-netdev.c
>>> @@ -371,6 +371,13 @@ struct dp_netdev_pmd_cycles {
>>>      atomic_ullong n[PMD_N_CYCLES];
>>>  };
>>>  
>>> +struct dp_netdev_pmd_txq {
>>> +    struct cmap_node node;        /* In owning dp_netdev_pmd_thread's
>>> */
>>> +                                  /* 'tx_queues'. */
>>> +    struct dp_netdev_port *port;
>>> +    int tx_qid;
>>> +};
>>> +
>>>  /* PMD: Poll modes drivers.  PMD accesses devices via polling to
>>> eliminate
>>>   * the performance overhead of interrupt processing.  Therefore netdev
>>> can
>>>   * not implement rx-wait for these devices.  dpif-netdev needs to poll
>>> @@ -426,8 +433,8 @@ struct dp_netdev_pmd_thread {
>>>                                      /* threads on same numa node. */
>>>      unsigned core_id;               /* CPU core id of this pmd thread.
>>> */
>>>      int numa_id;                    /* numa node id of this pmd
>>> thread. */
>>> -    int tx_qid;                     /* Queue id used by this pmd
>>> thread to
>>> -                                     * send packets on all netdevs */
>>> +    struct cmap tx_queues;          /* Queue ids used by this pmd
>>> thread to
>>> +                                     * send packets to ports */
>>>  
>>>      /* Only a pmd thread can write on its own 'cycles' and 'stats'.
>>>       * The main thread keeps 'stats_zero' and 'cycles_zero' as base
>>> @@ -469,6 +476,15 @@ static void dp_netdev_input(struct
>>> dp_netdev_pmd_thread *,
>>>  
>>>  static void dp_netdev_disable_upcall(struct dp_netdev *);
>>>  void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
>>> +static void dp_netdev_configure_non_pmd_txqs(struct
>>> dp_netdev_pmd_thread *pmd);
>>> +static void dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
>>> +                                  struct dp_netdev_port *port, int
>>> queue_id);
>>> +static void dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
>>> +                                  struct dp_netdev_pmd_txq *txq);
>>> +static void dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread
>>> *pmd);
>>> +static struct dp_netdev_pmd_txq *
>>> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
>>> +                         odp_port_t port_no);
>>>  static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
>>>                                      struct dp_netdev *dp, int index,
>>>                                      unsigned core_id, int numa_id);
>>> @@ -1050,6 +1066,7 @@ do_add_port(struct dp_netdev *dp, const char
>>> *devname, const char *type,
>>>      struct netdev_saved_flags *sf;
>>>      struct dp_netdev_port *port;
>>>      struct netdev *netdev;
>>> +    struct dp_netdev_pmd_thread *non_pmd;
>>>      enum netdev_flags flags;
>>>      const char *open_type;
>>>      int error;
>>> @@ -1126,6 +1143,11 @@ do_add_port(struct dp_netdev *dp, const char
>>> *devname, const char *type,
>>>      ovs_refcount_init(&port->ref_cnt);
>>>      cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
>>>  
>>> +    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
>>> +    if (non_pmd) {
>>> +        dp_netdev_pmd_add_txq(non_pmd, port, ovs_numa_get_n_cores());
>>> +        dp_netdev_pmd_unref(non_pmd);
>>> +    }
>>>      if (netdev_is_pmd(netdev)) {
>>>          dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
>>>          dp_netdev_reload_pmds(dp);
>>> @@ -1307,8 +1329,21 @@ static void
>>>  do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
>>>      OVS_REQUIRES(dp->port_mutex)
>>>  {
>>> +    struct dp_netdev_pmd_thread *non_pmd;
>>> +
>>>      cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
>>>      seq_change(dp->port_seq);
>>> +
>>> +    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
>>> +    if (non_pmd) {
>>> +        /* There is only one txq for each port for non pmd thread */
>>> +        struct dp_netdev_pmd_txq *txq;
>>> +        txq = dp_netdev_pmd_lookup_txq(non_pmd, port->port_no);
>>> +        if (OVS_LIKELY(txq))
>>> +            dp_netdev_pmd_del_txq(non_pmd, txq);
>>> +        dp_netdev_pmd_unref(non_pmd);
>>> +    }
>>> +
>>>      if (netdev_is_pmd(port->netdev)) {
>>>          int numa_id = netdev_get_numa_id(port->netdev);
>>>  
>>> @@ -2578,6 +2613,80 @@ dpif_netdev_wait(struct dpif *dpif)
>>>      seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
>>>  }
>>>  
>>> +static void
>>> +dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
>>> +                      struct dp_netdev_port *port, int queue_id)
>>> +{
>>> +    if (port_try_ref(port)) {
>>> +        struct dp_netdev_pmd_txq * txq = xmalloc(sizeof *txq);
>>> +        txq->port = port;
>>> +        txq->tx_qid = queue_id;
>>> +        cmap_insert(&pmd->tx_queues, &txq->node,
>>> +                        hash_port_no(port->port_no));
>>> +    }
>>> +}
>>> +
>>> +/* Configures tx_queues for non pmd thread. */
>>> +static void
>>> +dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd)
>>> +{
>>> +    if (!cmap_is_empty(&pmd->tx_queues))
>>> +        dp_netdev_pmd_detach_tx_queues(pmd);
>>> +
>>> +    struct dp_netdev_port *port;
>>> +    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
>>> +        dp_netdev_pmd_add_txq(pmd, port, ovs_numa_get_n_cores());
>>> +    }
>>> +}
>>> +
>>> +static void
>>> +dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
>>> +                      struct dp_netdev_pmd_txq *txq)
>>> +{
>>> +    cmap_remove(&pmd->tx_queues, &txq->node,
>>> +                hash_port_no(txq->port->port_no));
>>> +    port_unref(txq->port);
>>> +    free(txq);
>>> +}
>>> +
>>> +/* Removes all queues from 'tx_queues' of pmd thread. */
>>> +static void
>>> +dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd)
>>> +{
>>> +    struct dp_netdev_pmd_txq *txq;
>>> +
>>> +    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
>>> +        dp_netdev_pmd_del_txq(pmd, txq);
>>> +    }
>>> +}
>>> +
>>> +static void OVS_UNUSED
>>> +dp_netdev_pmd_tx_queues_print(struct dp_netdev_pmd_thread *pmd)
>>> +{
>>> +    struct dp_netdev_pmd_txq *txq;
>>> +
>>> +    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
>>> +        VLOG_INFO("Core_id: %d, Port: %s, tx_qid: %d\n",
>>> +                   pmd->core_id, netdev_get_name(txq->port->netdev),
>>> +                   txq->tx_qid);
>>> +    }
>>> +}
>>> +
>>> +static struct dp_netdev_pmd_txq *
>>> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
>>> +                         odp_port_t port_no)
>>> +{
>>> +    struct dp_netdev_pmd_txq *txq;
>>> +
>>> +    CMAP_FOR_EACH_WITH_HASH (txq, node, hash_port_no(port_no),
>>> +                             &pmd->tx_queues) {
>>> +        if (txq->port->port_no == port_no) {
>>> +            return txq;
>>> +        }
>>> +    }
>>> +    return NULL;
>>> +}
>>> +
>>>  struct rxq_poll {
>>>      struct dp_netdev_port *port;
>>>      struct netdev_rxq *rx;
>>> @@ -2589,16 +2698,19 @@ pmd_load_queues(struct dp_netdev_pmd_thread
>>> *pmd,
>>>  {
>>>      struct rxq_poll *poll_list = *ppoll_list;
>>>      struct dp_netdev_port *port;
>>> -    int n_pmds_on_numa, index, i;
>>> +    int n_pmds_on_numa, rx_index, tx_index, i, n_txq;
>>>  
>>>      /* Simple scheduler for netdev rx polling. */
>>> +    dp_netdev_pmd_detach_tx_queues(pmd);
>>> +
>>>      for (i = 0; i < poll_cnt; i++) {
>>>          port_unref(poll_list[i].port);
>>>      }
>>>  
>>>      poll_cnt = 0;
>>>      n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
>>> -    index = 0;
>>> +    rx_index = 0;
>>> +    tx_index = 0;
>>>  
>>>      CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
>>>          /* Calls port_try_ref() to prevent the main thread
>>> @@ -2609,7 +2721,7 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>>>                  int i;
>>>  
>>>                  for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
>>> -                    if ((index % n_pmds_on_numa) == pmd->index) {
>>> +                    if ((rx_index % n_pmds_on_numa) == pmd->index) {
>>>                          poll_list = xrealloc(poll_list,
>>>                                          sizeof *poll_list * (poll_cnt
>>> + 1));
>>>  
>>> @@ -2618,7 +2730,16 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>>>                          poll_list[poll_cnt].rx = port->rxq[i];
>>>                          poll_cnt++;
>>>                      }
>>> -                    index++;
>>> +                    rx_index++;
>>> +                }
>>> +
>>> +                n_txq = netdev_n_txq(port->netdev);
>>> +                /* Last queue reserved for non pmd threads */
>>> +                n_txq = n_txq == 1 ? 1 : n_txq - 1;
>>> +                for (i = 0; i < n_txq; i++) {
>>> +                    if ((tx_index % n_pmds_on_numa) == pmd->index ||
>>> n_txq == 1)
>>> +                        dp_netdev_pmd_add_txq(pmd, port, i);
>>> +                    tx_index++;
>>>                  }
>>>              }
>>>              /* Unrefs the port_try_ref(). */
>>> @@ -2689,6 +2810,8 @@ reload:
>>>          goto reload;
>>>      }
>>>  
>>> +    dp_netdev_pmd_detach_tx_queues(pmd);
>>> +
>>>      for (i = 0; i < poll_cnt; i++) {
>>>           port_unref(poll_list[i].port);
>>>      }
>>> @@ -2802,16 +2925,6 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp,
>>> struct cmap_position *pos)
>>>      return next;
>>>  }
>>>  
>>> -static int
>>> -core_id_to_qid(unsigned core_id)
>>> -{
>>> -    if (core_id != NON_PMD_CORE_ID) {
>>> -        return core_id;
>>> -    } else {
>>> -        return ovs_numa_get_n_cores();
>>> -    }
>>> -}
>>> -
>>>  /* Configures the 'pmd' based on the input argument. */
>>>  static void
>>>  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct
>>> dp_netdev *dp,
>>> @@ -2820,7 +2933,6 @@ dp_netdev_configure_pmd(struct
>>> dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>>>      pmd->dp = dp;
>>>      pmd->index = index;
>>>      pmd->core_id = core_id;
>>> -    pmd->tx_qid = core_id_to_qid(core_id);
>>>      pmd->numa_id = numa_id;
>>>  
>>>      ovs_refcount_init(&pmd->ref_cnt);
>>> @@ -2831,9 +2943,11 @@ dp_netdev_configure_pmd(struct
>>> dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>>>      ovs_mutex_init(&pmd->flow_mutex);
>>>      dpcls_init(&pmd->cls);
>>>      cmap_init(&pmd->flow_table);
>>> -    /* init the 'flow_cache' since there is no
>>> +    cmap_init(&pmd->tx_queues);
>>> +    /* init the 'flow_cache' and 'tx_queues' since there is no
>>>       * actual thread created for NON_PMD_CORE_ID. */
>>>      if (core_id == NON_PMD_CORE_ID) {
>>> +        dp_netdev_configure_non_pmd_txqs(pmd);
>>>          emc_cache_init(&pmd->flow_cache);
>>>      }
>>>      cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *,
>>> &pmd->node),
>>> @@ -2846,6 +2960,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread
>>> *pmd)
>>>      dp_netdev_pmd_flow_flush(pmd);
>>>      dpcls_destroy(&pmd->cls);
>>>      cmap_destroy(&pmd->flow_table);
>>> +    cmap_destroy(&pmd->tx_queues);
>>>      ovs_mutex_destroy(&pmd->flow_mutex);
>>>      latch_destroy(&pmd->exit_latch);
>>>      xpthread_cond_destroy(&pmd->cond);
>>> @@ -2862,6 +2977,7 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct
>>> dp_netdev_pmd_thread *pmd)
>>>       * no actual thread uninit it for NON_PMD_CORE_ID. */
>>>      if (pmd->core_id == NON_PMD_CORE_ID) {
>>>          emc_cache_uninit(&pmd->flow_cache);
>>> +        dp_netdev_pmd_detach_tx_queues(pmd);
>>>      } else {
>>>          latch_set(&pmd->exit_latch);
>>>          dp_netdev_reload_pmd__(pmd);
>>> @@ -3471,13 +3587,15 @@ dp_execute_cb(void *aux_, struct dp_packet
>>> **packets, int cnt,
>>>      struct dp_netdev *dp = pmd->dp;
>>>      int type = nl_attr_type(a);
>>>      struct dp_netdev_port *p;
>>> +    struct dp_netdev_pmd_txq *txq;
>>>      int i;
>>>  
>>>      switch ((enum ovs_action_attr)type) {
>>>      case OVS_ACTION_ATTR_OUTPUT:
>>> -        p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
>>> -        if (OVS_LIKELY(p)) {
>>> -            netdev_send(p->netdev, pmd->tx_qid, packets, cnt,
>>> may_steal);
>>> +        txq = dp_netdev_pmd_lookup_txq(pmd,
>>> u32_to_odp(nl_attr_get_u32(a)));
>>> +        if (OVS_LIKELY(txq)) {
>>> +            netdev_send(txq->port->netdev, txq->tx_qid,
>>> +                        packets, cnt, may_steal);
>>>              return;
>>>          }
>>>          break;
>>>
> 
>
diff mbox

Patch

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index cf5b064..d430bc9 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -371,6 +371,13 @@  struct dp_netdev_pmd_cycles {
     atomic_ullong n[PMD_N_CYCLES];
 };
 
+struct dp_netdev_pmd_txq {
+    struct cmap_node node;        /* In owning dp_netdev_pmd_thread's */
+                                  /* 'tx_queues'. */
+    struct dp_netdev_port *port;
+    int tx_qid;
+};
+
 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
  * the performance overhead of interrupt processing.  Therefore netdev can
  * not implement rx-wait for these devices.  dpif-netdev needs to poll
@@ -426,8 +433,8 @@  struct dp_netdev_pmd_thread {
                                     /* threads on same numa node. */
     unsigned core_id;               /* CPU core id of this pmd thread. */
     int numa_id;                    /* numa node id of this pmd thread. */
-    int tx_qid;                     /* Queue id used by this pmd thread to
-                                     * send packets on all netdevs */
+    struct cmap tx_queues;          /* Queue ids used by this pmd thread to
+                                     * send packets to ports */
 
     /* Only a pmd thread can write on its own 'cycles' and 'stats'.
      * The main thread keeps 'stats_zero' and 'cycles_zero' as base
@@ -469,6 +476,15 @@  static void dp_netdev_input(struct dp_netdev_pmd_thread *,
 
 static void dp_netdev_disable_upcall(struct dp_netdev *);
 void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
+static void dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd);
+static void dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
+                                  struct dp_netdev_port *port, int queue_id);
+static void dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
+                                  struct dp_netdev_pmd_txq *txq);
+static void dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd);
+static struct dp_netdev_pmd_txq *
+dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
+                         odp_port_t port_no);
 static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
                                     struct dp_netdev *dp, int index,
                                     unsigned core_id, int numa_id);
@@ -1050,6 +1066,7 @@  do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     struct netdev_saved_flags *sf;
     struct dp_netdev_port *port;
     struct netdev *netdev;
+    struct dp_netdev_pmd_thread *non_pmd;
     enum netdev_flags flags;
     const char *open_type;
     int error;
@@ -1126,6 +1143,11 @@  do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     ovs_refcount_init(&port->ref_cnt);
     cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
 
+    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
+    if (non_pmd) {
+        dp_netdev_pmd_add_txq(non_pmd, port, ovs_numa_get_n_cores());
+        dp_netdev_pmd_unref(non_pmd);
+    }
     if (netdev_is_pmd(netdev)) {
         dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
         dp_netdev_reload_pmds(dp);
@@ -1307,8 +1329,21 @@  static void
 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     OVS_REQUIRES(dp->port_mutex)
 {
+    struct dp_netdev_pmd_thread *non_pmd;
+
     cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
     seq_change(dp->port_seq);
+
+    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
+    if (non_pmd) {
+        /* There is only one txq for each port for non pmd thread */
+        struct dp_netdev_pmd_txq *txq;
+        txq = dp_netdev_pmd_lookup_txq(non_pmd, port->port_no);
+        if (OVS_LIKELY(txq))
+            dp_netdev_pmd_del_txq(non_pmd, txq);
+        dp_netdev_pmd_unref(non_pmd);
+    }
+
     if (netdev_is_pmd(port->netdev)) {
         int numa_id = netdev_get_numa_id(port->netdev);
 
@@ -2578,6 +2613,80 @@  dpif_netdev_wait(struct dpif *dpif)
     seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
 }
 
+static void
+dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
+                      struct dp_netdev_port *port, int queue_id)
+{
+    if (port_try_ref(port)) {
+        struct dp_netdev_pmd_txq * txq = xmalloc(sizeof *txq);
+        txq->port = port;
+        txq->tx_qid = queue_id;
+        cmap_insert(&pmd->tx_queues, &txq->node,
+                        hash_port_no(port->port_no));
+    }
+}
+
+/* Configures tx_queues for non pmd thread. */
+static void
+dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd)
+{
+    if (!cmap_is_empty(&pmd->tx_queues))
+        dp_netdev_pmd_detach_tx_queues(pmd);
+
+    struct dp_netdev_port *port;
+    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
+        dp_netdev_pmd_add_txq(pmd, port, ovs_numa_get_n_cores());
+    }
+}
+
+static void
+dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
+                      struct dp_netdev_pmd_txq *txq)
+{
+    cmap_remove(&pmd->tx_queues, &txq->node,
+                hash_port_no(txq->port->port_no));
+    port_unref(txq->port);
+    free(txq);
+}
+
+/* Removes all queues from 'tx_queues' of pmd thread. */
+static void
+dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd)
+{
+    struct dp_netdev_pmd_txq *txq;
+
+    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
+        dp_netdev_pmd_del_txq(pmd, txq);
+    }
+}
+
+static void OVS_UNUSED
+dp_netdev_pmd_tx_queues_print(struct dp_netdev_pmd_thread *pmd)
+{
+    struct dp_netdev_pmd_txq *txq;
+
+    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
+        VLOG_INFO("Core_id: %d, Port: %s, tx_qid: %d\n",
+                   pmd->core_id, netdev_get_name(txq->port->netdev),
+                   txq->tx_qid);
+    }
+}
+
+static struct dp_netdev_pmd_txq *
+dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
+                         odp_port_t port_no)
+{
+    struct dp_netdev_pmd_txq *txq;
+
+    CMAP_FOR_EACH_WITH_HASH (txq, node, hash_port_no(port_no),
+                             &pmd->tx_queues) {
+        if (txq->port->port_no == port_no) {
+            return txq;
+        }
+    }
+    return NULL;
+}
+
 struct rxq_poll {
     struct dp_netdev_port *port;
     struct netdev_rxq *rx;
@@ -2589,16 +2698,19 @@  pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
 {
     struct rxq_poll *poll_list = *ppoll_list;
     struct dp_netdev_port *port;
-    int n_pmds_on_numa, index, i;
+    int n_pmds_on_numa, rx_index, tx_index, i, n_txq;
 
     /* Simple scheduler for netdev rx polling. */
+    dp_netdev_pmd_detach_tx_queues(pmd);
+
     for (i = 0; i < poll_cnt; i++) {
         port_unref(poll_list[i].port);
     }
 
     poll_cnt = 0;
     n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
-    index = 0;
+    rx_index = 0;
+    tx_index = 0;
 
     CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
         /* Calls port_try_ref() to prevent the main thread
@@ -2609,7 +2721,7 @@  pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
                 int i;
 
                 for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                    if ((index % n_pmds_on_numa) == pmd->index) {
+                    if ((rx_index % n_pmds_on_numa) == pmd->index) {
                         poll_list = xrealloc(poll_list,
                                         sizeof *poll_list * (poll_cnt + 1));
 
@@ -2618,7 +2730,16 @@  pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
                         poll_list[poll_cnt].rx = port->rxq[i];
                         poll_cnt++;
                     }
-                    index++;
+                    rx_index++;
+                }
+
+                n_txq = netdev_n_txq(port->netdev);
+                /* Last queue reserved for non pmd threads */
+                n_txq = n_txq == 1 ? 1 : n_txq - 1;
+                for (i = 0; i < n_txq; i++) {
+                    if ((tx_index % n_pmds_on_numa) == pmd->index || n_txq == 1)
+                        dp_netdev_pmd_add_txq(pmd, port, i);
+                    tx_index++;
                 }
             }
             /* Unrefs the port_try_ref(). */
@@ -2689,6 +2810,8 @@  reload:
         goto reload;
     }
 
+    dp_netdev_pmd_detach_tx_queues(pmd);
+
     for (i = 0; i < poll_cnt; i++) {
          port_unref(poll_list[i].port);
     }
@@ -2802,16 +2925,6 @@  dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos)
     return next;
 }
 
-static int
-core_id_to_qid(unsigned core_id)
-{
-    if (core_id != NON_PMD_CORE_ID) {
-        return core_id;
-    } else {
-        return ovs_numa_get_n_cores();
-    }
-}
-
 /* Configures the 'pmd' based on the input argument. */
 static void
 dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
@@ -2820,7 +2933,6 @@  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->dp = dp;
     pmd->index = index;
     pmd->core_id = core_id;
-    pmd->tx_qid = core_id_to_qid(core_id);
     pmd->numa_id = numa_id;
 
     ovs_refcount_init(&pmd->ref_cnt);
@@ -2831,9 +2943,11 @@  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     ovs_mutex_init(&pmd->flow_mutex);
     dpcls_init(&pmd->cls);
     cmap_init(&pmd->flow_table);
-    /* init the 'flow_cache' since there is no
+    cmap_init(&pmd->tx_queues);
+    /* init the 'flow_cache' and 'tx_queues' since there is no
      * actual thread created for NON_PMD_CORE_ID. */
     if (core_id == NON_PMD_CORE_ID) {
+        dp_netdev_configure_non_pmd_txqs(pmd);
         emc_cache_init(&pmd->flow_cache);
     }
     cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
@@ -2846,6 +2960,7 @@  dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
     dp_netdev_pmd_flow_flush(pmd);
     dpcls_destroy(&pmd->cls);
     cmap_destroy(&pmd->flow_table);
+    cmap_destroy(&pmd->tx_queues);
     ovs_mutex_destroy(&pmd->flow_mutex);
     latch_destroy(&pmd->exit_latch);
     xpthread_cond_destroy(&pmd->cond);
@@ -2862,6 +2977,7 @@  dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
      * no actual thread uninit it for NON_PMD_CORE_ID. */
     if (pmd->core_id == NON_PMD_CORE_ID) {
         emc_cache_uninit(&pmd->flow_cache);
+        dp_netdev_pmd_detach_tx_queues(pmd);
     } else {
         latch_set(&pmd->exit_latch);
         dp_netdev_reload_pmd__(pmd);
@@ -3471,13 +3587,15 @@  dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
     struct dp_netdev *dp = pmd->dp;
     int type = nl_attr_type(a);
     struct dp_netdev_port *p;
+    struct dp_netdev_pmd_txq *txq;
     int i;
 
     switch ((enum ovs_action_attr)type) {
     case OVS_ACTION_ATTR_OUTPUT:
-        p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
-        if (OVS_LIKELY(p)) {
-            netdev_send(p->netdev, pmd->tx_qid, packets, cnt, may_steal);
+        txq = dp_netdev_pmd_lookup_txq(pmd, u32_to_odp(nl_attr_get_u32(a)));
+        if (OVS_LIKELY(txq)) {
+            netdev_send(txq->port->netdev, txq->tx_qid,
+                        packets, cnt, may_steal);
             return;
         }
         break;