
[ovs-dev,v12,11/16] dpif-netdev/dpcls-avx512: Enable 16 block processing.

Message ID 20210517140434.59555-12-cian.ferriter@intel.com
State Changes Requested
Headers show
Series DPIF Framework + Optimizations | expand

Commit Message

Cian Ferriter May 17, 2021, 2:04 p.m. UTC
From: Harry van Haaren <harry.van.haaren@intel.com>

This commit implements larger subtable searches in AVX512. A limitation
of the previous implementation was that at most 8 blocks of miniflow
data could be matched on (so a subtable with 8 blocks was handled
in AVX512, but 9 blocks or more would fall back to the scalar/generic
lookup). This patch removes that limitation, allowing up to 16 blocks
of subtable data to be matched on.

From an implementation perspective, the key to enabling 16 blocks
over 8 blocks was to do the bitmask calculation up front, and then use
the pre-calculated bitmasks for 2x passes of the "blocks gather"
routine. The bitmasks need to be shifted for k-mask usage in the
upper (8-15) block range, but that is relatively trivial. This also
helps should expanding to 24 blocks be desired in the future.

The implementation of the 2nd iteration to handle > 8 blocks is
behind a conditional branch which checks the total number of bits.
This helps the specialized versions of the function that have a
miniflow fingerprint of 8 blocks or fewer, as the code can be
statically stripped out of those functions. Specialized functions
that do require more than 8 blocks will have the branch removed and
unconditionally execute the 2nd blocks gather routine.

Lastly, the _any() flavour will keep the conditional branch, and the
branch predictor may mispredict occasionally, but per burst it will
likely get most packets correct (particularly towards the middle
and end of a burst).

The code has been run with unit tests under autovalidation and
passes all cases, and unit test coverage has been checked to
ensure the 16 block code paths are executing.

Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>

---

v9: Fixup post 2.15 rebase on NEWS
v8: Add NEWS entry
---
 NEWS                                   |   1 +
 lib/dpif-netdev-lookup-avx512-gather.c | 203 ++++++++++++++++++-------
 2 files changed, 147 insertions(+), 57 deletions(-)

Comments

Stokes, Ian June 9, 2021, 12:03 p.m. UTC | #1

Hi Harry/Cian, thanks for the patch.

I looked through it over the past few days. There is a lot here in terms of masking/shifting and taking advantage of the intrinsics to accomplish that.
The logic seemed OK to my mind, but I didn't run something like GDB to check it line by line.

I think key to this is what you've called out above: although this is the AVX512-specific implementation, the unit tests confirm it matches what is expected from the scalar implementation.

Although the implementation differs, the end results are the same. The performance of the two methods will differ for sure, but that's to be expected.

With that in mind, and given that this has been tested for months with different traffic patterns, I have confidence in this patch.


A few more comments inline below.

> Signed-off-by: Harry van Haaren <harry.van.haaren@intel.com>
> 
> ---
> 
> v9: Fixup post 2.15 rebase on NEWS
> v8: Add NEWS entry
> ---
>  NEWS                                   |   1 +
>  lib/dpif-netdev-lookup-avx512-gather.c | 203 ++++++++++++++++++-------
>  2 files changed, 147 insertions(+), 57 deletions(-)
> 
> diff --git a/NEWS b/NEWS
> index 5e847a95e..6a07cd362 100644
> --- a/NEWS
> +++ b/NEWS
> @@ -11,6 +11,7 @@ Post-v2.15.0
>       * Add avx512 implementation of dpif which can process non recirculated
>         packets. It supports partial HWOL, EMC, SMC and DPCLS lookups.
>       * Add commands to get and set the dpif implementations.
> +     * Enable AVX512 optimized DPCLS to search subtables with larger miniflows.
>     - ovs-ctl:
>       * New option '--no-record-hostname' to disable hostname configuration
>         in ovsdb on startup.
> diff --git a/lib/dpif-netdev-lookup-avx512-gather.c b/lib/dpif-netdev-lookup-avx512-gather.c
> index 8fc1cdfa5..1bfabdcb1 100644
> --- a/lib/dpif-netdev-lookup-avx512-gather.c
> +++ b/lib/dpif-netdev-lookup-avx512-gather.c
> @@ -34,7 +34,21 @@
>   * AVX512 code at a time.
>   */
>  #define NUM_U64_IN_ZMM_REG (8)
> -#define BLOCKS_CACHE_SIZE (NETDEV_MAX_BURST * NUM_U64_IN_ZMM_REG)
> +
> +/* This implementation of AVX512 gather allows up to 16 blocks of MF data to be
> + * present in the blocks_cache, hence the multiply by 2 in the blocks count.
> + */
> +#define MF_BLOCKS_PER_PACKET (NUM_U64_IN_ZMM_REG * 2)
> +
> +/* Blocks cache size is the maximum number of miniflow blocks that this
> + * implementation of lookup can handle.
> + */
> +#define BLOCKS_CACHE_SIZE (NETDEV_MAX_BURST * MF_BLOCKS_PER_PACKET)
> +
> +/* The gather instruction can handle a scale for the size of the items to
> + * gather. For uint64_t data, this scale is 8.
> + */
> +#define GATHER_SCALE_8 (8)
> 
> 
>  VLOG_DEFINE_THIS_MODULE(dpif_lookup_avx512_gather);
> @@ -69,22 +83,83 @@ netdev_rule_matches_key(const struct dpcls_rule *rule,
>  {
>      const uint64_t *keyp = miniflow_get_values(&rule->flow.mf);
>      const uint64_t *maskp = miniflow_get_values(&rule->mask->mf);
> -    const uint32_t lane_mask = (1 << mf_bits_total) - 1;
> +    const uint32_t lane_mask = (1ULL << mf_bits_total) - 1;
> 
>      /* Always load a full cache line from blocks_cache. Other loads must be
>       * trimmed to the amount of data required for mf_bits_total blocks.
>       */
> -    __m512i v_blocks = _mm512_loadu_si512(&block_cache[0]);
> -    __m512i v_mask   = _mm512_maskz_loadu_epi64(lane_mask, &maskp[0]);
> -    __m512i v_key    = _mm512_maskz_loadu_epi64(lane_mask, &keyp[0]);
> +    uint32_t res_mask;
> +
> +    {

Just curious as to the need for the brace above and the brace below on this code block?
Is this a specific requirement for operations with _mm512 calls?

> +        __m512i v_blocks = _mm512_loadu_si512(&block_cache[0]);
> +        __m512i v_mask   = _mm512_maskz_loadu_epi64(lane_mask, &maskp[0]);
> +        __m512i v_key    = _mm512_maskz_loadu_epi64(lane_mask, &keyp[0]);
> +        __m512i v_data = _mm512_and_si512(v_blocks, v_mask);
> +        res_mask = _mm512_mask_cmpeq_epi64_mask(lane_mask, v_data, v_key);
> +    }
> 
> -    __m512i v_data = _mm512_and_si512(v_blocks, v_mask);
> -    uint32_t res_mask = _mm512_mask_cmpeq_epi64_mask(lane_mask, v_data, v_key);
> +    if (mf_bits_total > 8) {
> +        uint32_t lane_mask_gt8 = lane_mask >> 8;
> +        __m512i v_blocks = _mm512_loadu_si512(&block_cache[8]);
> +        __m512i v_mask   = _mm512_maskz_loadu_epi64(lane_mask_gt8,
> &maskp[8]);
> +        __m512i v_key    = _mm512_maskz_loadu_epi64(lane_mask_gt8, &keyp[8]);
> +        __m512i v_data = _mm512_and_si512(v_blocks, v_mask);
> +        uint32_t c = _mm512_mask_cmpeq_epi64_mask(lane_mask_gt8, v_data,
> +                                                  v_key);
> +        res_mask |= (c << 8);
> +    }
> 
> -    /* returns 1 assuming result of SIMD compare is all blocks. */
> +    /* returns 1 assuming result of SIMD compare is all blocks matching. */
>      return res_mask == lane_mask;
>  }
> 
> +/* Takes u0 and u1 inputs, and gathers the next 8 blocks to be stored
> + * contigously into the blocks cache. Note that the pointers and bitmasks
> + * passed into this function must be incremented for handling next 8 blocks.
> + */
> +static inline ALWAYS_INLINE __m512i
> +avx512_blocks_gather(__m512i v_u0, /* reg of u64 of all u0 bits */
> +                     __m512i v_u1, /* reg of u64 of all u1 bits */
> +                     const uint64_t *pkt_blocks, /* ptr pkt blocks to load */
> +                     const void *tbl_blocks,     /* ptr to blocks in tbl */
> +                     const void *tbl_mf_masks,   /* ptr to subtable mf masks */
> +                     __mmask64 u1_bcast_msk,      /* mask of u1 lanes */
> +                     const uint64_t pkt_mf_u0_pop, /* num bits in u0 of pkt */
> +                     __mmask64 zero_mask, /* maskz if pkt not have mf bit */
Typo in comment above, maskz -> masks.

> +                     __mmask64 u64_lanes_mask) /* total lane count to use */

Not sure about the format of the comment above; would it be better to explain each parameter in a comment preceding the function?
As regards OVS coding standards, I have not seen an example where arguments and comments are mixed like this.

> +{
> +        /* Suggest to compiler to load tbl blocks ahead of gather(). */
> +        __m512i v_tbl_blocks = _mm512_maskz_loadu_epi64(u64_lanes_mask,
> +                                                        tbl_blocks);
> +
> +        /* Blend u0 and u1 bits together for these 8 blocks. */
> +        __m512i v_pkt_bits = _mm512_mask_blend_epi64(u1_bcast_msk, v_u0, v_u1);
> +
> +        /* Load pre-created tbl miniflow bitmasks, bitwise AND with them. */
> +        __m512i v_tbl_masks = _mm512_maskz_loadu_epi64(u64_lanes_mask,
> +                                                      tbl_mf_masks);
> +        __m512i v_masks = _mm512_and_si512(v_pkt_bits, v_tbl_masks);
> +
> +        /* Manual AVX512 popcount for u64 lanes. */
> +        __m512i v_popcnts = _mm512_popcnt_epi64_manual(v_masks);

Can I ask what's specific about the manual count? I could find intrinsics guides for the other functions above, but not for the manual count here. Is there a performance hit associated with a manual count?

> +
> +        /* Add popcounts and offset for u1 bits. */
> +        __m512i v_idx_u0_offset = _mm512_maskz_set1_epi64(u1_bcast_msk,
> +                                                          pkt_mf_u0_pop);

Above, is the assumption that the popcount for u0 will be the same as u1?
Is there a case where they would not be?

> +        __m512i v_indexes = _mm512_add_epi64(v_popcnts, v_idx_u0_offset);
> +
> +        /* Gather u64 blocks from packet miniflow. */
> +        __m512i v_zeros = _mm512_setzero_si512();
> +        __m512i v_blocks = _mm512_mask_i64gather_epi64(v_zeros, u64_lanes_mask,
> +                                                       v_indexes, pkt_blocks,
> +                                                       GATHER_SCALE_8);
> +
> +        /* Mask pkt blocks with subtable blocks, k-mask to zero lanes. */
> +        __m512i v_masked_blocks = _mm512_maskz_and_epi64(zero_mask, v_blocks,
> +                                                         v_tbl_blocks);
> +        return v_masked_blocks;
> +}
> +
>  static inline uint32_t ALWAYS_INLINE
>  avx512_lookup_impl(struct dpcls_subtable *subtable,
>                     uint32_t keys_map,
> @@ -94,76 +169,86 @@ avx512_lookup_impl(struct dpcls_subtable *subtable,
>                     const uint32_t bit_count_u1)
>  {
>      OVS_ALIGNED_VAR(CACHE_LINE_SIZE)uint64_t block_cache[BLOCKS_CACHE_SIZE];
> -
> -    const uint32_t bit_count_total = bit_count_u0 + bit_count_u1;
> -    int i;
>      uint32_t hashes[NETDEV_MAX_BURST];
> +
Whitespace added here; this should probably have been done in the previous patch rather than fixed here.


BR
Ian
>      const uint32_t n_pkts = __builtin_popcountll(keys_map);
>      ovs_assert(NETDEV_MAX_BURST >= n_pkts);
> 
> +    const uint32_t bit_count_total = bit_count_u0 + bit_count_u1;
> +    const uint64_t bit_count_total_mask = (1ULL << bit_count_total) - 1;
> +
>      const uint64_t tbl_u0 = subtable->mask.mf.map.bits[0];
>      const uint64_t tbl_u1 = subtable->mask.mf.map.bits[1];
> 
> -    /* Load subtable blocks for masking later. */
>      const uint64_t *tbl_blocks = miniflow_get_values(&subtable->mask.mf);
> -    const __m512i v_tbl_blocks = _mm512_loadu_si512(&tbl_blocks[0]);
> -
> -    /* Load pre-created subtable masks for each block in subtable. */
> -    const __mmask8 bit_count_total_mask = (1 << bit_count_total) - 1;
> -    const __m512i v_mf_masks = _mm512_maskz_loadu_epi64(bit_count_total_mask,
> -                                                        subtable->mf_masks);
> +    const uint64_t *tbl_mf_masks = subtable->mf_masks;
> 
> +    int i;
>      ULLONG_FOR_EACH_1 (i, keys_map) {
> +        /* Create mask register with packet-specific u0 offset.
> +         * Note that as 16 blocks can be handled in total, the width of the
> +         * mask register must be >=16.
> +         */
>          const uint64_t pkt_mf_u0_bits = keys[i]->mf.map.bits[0];
>          const uint64_t pkt_mf_u0_pop = __builtin_popcountll(pkt_mf_u0_bits);
> -
> -        /* Pre-create register with *PER PACKET* u0 offset. */
> -        const __mmask8 u1_bcast_mask = (UINT8_MAX << bit_count_u0);
> -        const __m512i v_idx_u0_offset = _mm512_maskz_set1_epi64(u1_bcast_mask,
> -                                                                pkt_mf_u0_pop);
> +        const __mmask64 u1_bcast_mask = (UINT64_MAX << bit_count_u0);
> 
>          /* Broadcast u0, u1 bitmasks to 8x u64 lanes. */
> -        __m512i v_u0 = _mm512_set1_epi64(pkt_mf_u0_bits);
> -        __m512i v_pkt_bits = _mm512_mask_set1_epi64(v_u0, u1_bcast_mask,
> -                                         keys[i]->mf.map.bits[1]);
> -
> -        /* Bitmask by pre-created masks. */
> -        __m512i v_masks = _mm512_and_si512(v_pkt_bits, v_mf_masks);
> -
> -        /* Manual AVX512 popcount for u64 lanes. */
> -        __m512i v_popcnts = _mm512_popcnt_epi64_manual(v_masks);
> -
> -        /* Offset popcounts for u1 with pre-created offset register. */
> -        __m512i v_indexes = _mm512_add_epi64(v_popcnts, v_idx_u0_offset);
> -
> -        /* Gather u64 blocks from packet miniflow. */
> -        const __m512i v_zeros = _mm512_setzero_si512();
> -        const void *pkt_data = miniflow_get_values(&keys[i]->mf);
> -        __m512i v_all_blocks = _mm512_mask_i64gather_epi64(v_zeros,
> -                                   bit_count_total_mask, v_indexes,
> -                                   pkt_data, 8);
> +        __m512i v_u0 = _mm512_set1_epi64(keys[i]->mf.map.bits[0]);
> +        __m512i v_u1 = _mm512_set1_epi64(keys[i]->mf.map.bits[1]);
> 
>          /* Zero out bits that pkt doesn't have:
>           * - 2x pext() to extract bits from packet miniflow as needed by TBL
>           * - Shift u1 over by bit_count of u0, OR to create zero bitmask
>           */
> -         uint64_t u0_to_zero = _pext_u64(keys[i]->mf.map.bits[0], tbl_u0);
> -         uint64_t u1_to_zero = _pext_u64(keys[i]->mf.map.bits[1], tbl_u1);
> -         uint64_t zero_mask = (u1_to_zero << bit_count_u0) | u0_to_zero;
> -
> -        /* Mask blocks using AND with subtable blocks, use k-mask to zero
> -         * where lanes as required for this packet.
> -         */
> -        __m512i v_masked_blocks = _mm512_maskz_and_epi64(zero_mask,
> -                                                v_all_blocks, v_tbl_blocks);
> +        uint64_t u0_to_zero = _pext_u64(keys[i]->mf.map.bits[0], tbl_u0);
> +        uint64_t u1_to_zero = _pext_u64(keys[i]->mf.map.bits[1], tbl_u1);
> +        const uint64_t zero_mask_wip = (u1_to_zero << bit_count_u0) |
> +                                       u0_to_zero;
> +        const uint64_t zero_mask = zero_mask_wip & bit_count_total_mask;
> +
> +        /* Get ptr to packet data blocks. */
> +        const uint64_t *pkt_blocks = miniflow_get_values(&keys[i]->mf);
> +
> +        /* Store first 8 blocks cache, full cache line aligned. */
> +        __m512i v_blocks = avx512_blocks_gather(v_u0, v_u1,
> +                                                &pkt_blocks[0],
> +                                                &tbl_blocks[0],
> +                                                &tbl_mf_masks[0],
> +                                                u1_bcast_mask,
> +                                                pkt_mf_u0_pop,
> +                                                zero_mask,
> +                                                bit_count_total_mask);
> +        _mm512_storeu_si512(&block_cache[i * MF_BLOCKS_PER_PACKET], v_blocks);
> +
> +        if (bit_count_total > 8) {
> +            /* Shift masks over by 8.
> +             * Pkt blocks pointer remains 0, it is incremented by popcount.
> +             * Move tbl and mf masks pointers forward.
> +             * Increase offsets by 8.
> +             * Re-run same gather code.
> +             */
> +            uint64_t zero_mask_gt8 = (zero_mask >> 8);
> +            uint64_t u1_bcast_mask_gt8 = (u1_bcast_mask >> 8);
> +            uint64_t bit_count_gt8_mask = bit_count_total_mask >> 8;
> +
> +            __m512i v_blocks_gt8 = avx512_blocks_gather(v_u0, v_u1,
> +                                                    &pkt_blocks[0],
> +                                                    &tbl_blocks[8],
> +                                                    &tbl_mf_masks[8],
> +                                                    u1_bcast_mask_gt8,
> +                                                    pkt_mf_u0_pop,
> +                                                    zero_mask_gt8,
> +                                                    bit_count_gt8_mask);
> +            _mm512_storeu_si512(&block_cache[(i * MF_BLOCKS_PER_PACKET) + 8],
> +                                v_blocks_gt8);
> +        }
> 
> -        /* Store to blocks cache, full cache line aligned. */
> -        _mm512_storeu_si512(&block_cache[i * 8], v_masked_blocks);
>      }
> 
>      /* Hash the now linearized blocks of packet metadata. */
>      ULLONG_FOR_EACH_1 (i, keys_map) {
> -        uint64_t *block_ptr = &block_cache[i * 8];
> +        uint64_t *block_ptr = &block_cache[i * MF_BLOCKS_PER_PACKET];
>          uint32_t hash = hash_add_words64(0, block_ptr, bit_count_total);
>          hashes[i] = hash_finish(hash, bit_count_total * 8);
>      }
> @@ -183,7 +268,7 @@ avx512_lookup_impl(struct dpcls_subtable *subtable,
>          struct dpcls_rule *rule;
> 
>          CMAP_NODE_FOR_EACH (rule, cmap_node, nodes[i]) {
> -            const uint32_t cidx = i * 8;
> +            const uint32_t cidx = i * MF_BLOCKS_PER_PACKET;
>              uint32_t match = netdev_rule_matches_key(rule, bit_count_total,
>                                                       &block_cache[cidx]);
>              if (OVS_LIKELY(match)) {
> @@ -220,7 +305,7 @@ DECLARE_OPTIMIZED_LOOKUP_FUNCTION(4, 0)
> 
>  /* Check if a specialized function is valid for the required subtable. */
>  #define CHECK_LOOKUP_FUNCTION(U0, U1)                                         \
> -    ovs_assert((U0 + U1) <= NUM_U64_IN_ZMM_REG);                              \
> +    ovs_assert((U0 + U1) <= (NUM_U64_IN_ZMM_REG * 2));                        \
>      if (!f && u0_bits == U0 && u1_bits == U1) {                               \
>          f = dpcls_avx512_gather_mf_##U0##_##U1;                               \
>      }
> @@ -250,7 +335,11 @@ dpcls_subtable_avx512_gather_probe(uint32_t u0_bits, uint32_t u1_bits)
>      CHECK_LOOKUP_FUNCTION(4, 1);
>      CHECK_LOOKUP_FUNCTION(4, 0);
> 
> -    if (!f && (u0_bits + u1_bits) < NUM_U64_IN_ZMM_REG) {
> +    /* Check if the _any looping version of the code can perform this miniflow
> +     * lookup. Performance gain may be less pronounced due to non-specialized
> +     * hashing, however there is usually a good performance win overall.
> +     */
> +    if (!f && (u0_bits + u1_bits) < (NUM_U64_IN_ZMM_REG * 2)) {
>          f = dpcls_avx512_gather_mf_any;
>          VLOG_INFO("Using avx512_gather_mf_any for subtable (%d,%d)\n",
>                    u0_bits, u1_bits);
> --
> 2.31.1
> 
diff mbox series

Patch

diff --git a/NEWS b/NEWS
index 5e847a95e..6a07cd362 100644
--- a/NEWS
+++ b/NEWS
@@ -11,6 +11,7 @@  Post-v2.15.0
      * Add avx512 implementation of dpif which can process non recirculated
        packets. It supports partial HWOL, EMC, SMC and DPCLS lookups.
      * Add commands to get and set the dpif implementations.
+     * Enable AVX512 optimized DPCLS to search subtables with larger miniflows.
    - ovs-ctl:
      * New option '--no-record-hostname' to disable hostname configuration
        in ovsdb on startup.
diff --git a/lib/dpif-netdev-lookup-avx512-gather.c b/lib/dpif-netdev-lookup-avx512-gather.c
index 8fc1cdfa5..1bfabdcb1 100644
--- a/lib/dpif-netdev-lookup-avx512-gather.c
+++ b/lib/dpif-netdev-lookup-avx512-gather.c
@@ -34,7 +34,21 @@ 
  * AVX512 code at a time.
  */
 #define NUM_U64_IN_ZMM_REG (8)
-#define BLOCKS_CACHE_SIZE (NETDEV_MAX_BURST * NUM_U64_IN_ZMM_REG)
+
+/* This implementation of AVX512 gather allows up to 16 blocks of MF data to be
+ * present in the blocks_cache, hence the multiply by 2 in the blocks count.
+ */
+#define MF_BLOCKS_PER_PACKET (NUM_U64_IN_ZMM_REG * 2)
+
+/* Blocks cache size is the maximum number of miniflow blocks that this
+ * implementation of lookup can handle.
+ */
+#define BLOCKS_CACHE_SIZE (NETDEV_MAX_BURST * MF_BLOCKS_PER_PACKET)
+
+/* The gather instruction can handle a scale for the size of the items to
+ * gather. For uint64_t data, this scale is 8.
+ */
+#define GATHER_SCALE_8 (8)
 
 
 VLOG_DEFINE_THIS_MODULE(dpif_lookup_avx512_gather);
@@ -69,22 +83,83 @@  netdev_rule_matches_key(const struct dpcls_rule *rule,
 {
     const uint64_t *keyp = miniflow_get_values(&rule->flow.mf);
     const uint64_t *maskp = miniflow_get_values(&rule->mask->mf);
-    const uint32_t lane_mask = (1 << mf_bits_total) - 1;
+    const uint32_t lane_mask = (1ULL << mf_bits_total) - 1;
 
     /* Always load a full cache line from blocks_cache. Other loads must be
      * trimmed to the amount of data required for mf_bits_total blocks.
      */
-    __m512i v_blocks = _mm512_loadu_si512(&block_cache[0]);
-    __m512i v_mask   = _mm512_maskz_loadu_epi64(lane_mask, &maskp[0]);
-    __m512i v_key    = _mm512_maskz_loadu_epi64(lane_mask, &keyp[0]);
+    uint32_t res_mask;
+
+    {
+        __m512i v_blocks = _mm512_loadu_si512(&block_cache[0]);
+        __m512i v_mask   = _mm512_maskz_loadu_epi64(lane_mask, &maskp[0]);
+        __m512i v_key    = _mm512_maskz_loadu_epi64(lane_mask, &keyp[0]);
+        __m512i v_data = _mm512_and_si512(v_blocks, v_mask);
+        res_mask = _mm512_mask_cmpeq_epi64_mask(lane_mask, v_data, v_key);
+    }
 
-    __m512i v_data = _mm512_and_si512(v_blocks, v_mask);
-    uint32_t res_mask = _mm512_mask_cmpeq_epi64_mask(lane_mask, v_data, v_key);
+    if (mf_bits_total > 8) {
+        uint32_t lane_mask_gt8 = lane_mask >> 8;
+        __m512i v_blocks = _mm512_loadu_si512(&block_cache[8]);
+        __m512i v_mask   = _mm512_maskz_loadu_epi64(lane_mask_gt8, &maskp[8]);
+        __m512i v_key    = _mm512_maskz_loadu_epi64(lane_mask_gt8, &keyp[8]);
+        __m512i v_data = _mm512_and_si512(v_blocks, v_mask);
+        uint32_t c = _mm512_mask_cmpeq_epi64_mask(lane_mask_gt8, v_data,
+                                                  v_key);
+        res_mask |= (c << 8);
+    }
 
-    /* returns 1 assuming result of SIMD compare is all blocks. */
+    /* returns 1 assuming result of SIMD compare is all blocks matching. */
     return res_mask == lane_mask;
 }
 
+/* Takes u0 and u1 inputs, and gathers the next 8 blocks to be stored
+ * contigously into the blocks cache. Note that the pointers and bitmasks
+ * passed into this function must be incremented for handling next 8 blocks.
+ */
+static inline ALWAYS_INLINE __m512i
+avx512_blocks_gather(__m512i v_u0, /* reg of u64 of all u0 bits */
+                     __m512i v_u1, /* reg of u64 of all u1 bits */
+                     const uint64_t *pkt_blocks, /* ptr pkt blocks to load */
+                     const void *tbl_blocks,     /* ptr to blocks in tbl */
+                     const void *tbl_mf_masks,   /* ptr to subtable mf masks */
+                     __mmask64 u1_bcast_msk,      /* mask of u1 lanes */
+                     const uint64_t pkt_mf_u0_pop, /* num bits in u0 of pkt */
+                     __mmask64 zero_mask, /* maskz if pkt not have mf bit */
+                     __mmask64 u64_lanes_mask) /* total lane count to use */
+{
+        /* Suggest to compiler to load tbl blocks ahead of gather(). */
+        __m512i v_tbl_blocks = _mm512_maskz_loadu_epi64(u64_lanes_mask,
+                                                        tbl_blocks);
+
+        /* Blend u0 and u1 bits together for these 8 blocks. */
+        __m512i v_pkt_bits = _mm512_mask_blend_epi64(u1_bcast_msk, v_u0, v_u1);
+
+        /* Load pre-created tbl miniflow bitmasks, bitwise AND with them. */
+        __m512i v_tbl_masks = _mm512_maskz_loadu_epi64(u64_lanes_mask,
+                                                      tbl_mf_masks);
+        __m512i v_masks = _mm512_and_si512(v_pkt_bits, v_tbl_masks);
+
+        /* Manual AVX512 popcount for u64 lanes. */
+        __m512i v_popcnts = _mm512_popcnt_epi64_manual(v_masks);
+
+        /* Add popcounts and offset for u1 bits. */
+        __m512i v_idx_u0_offset = _mm512_maskz_set1_epi64(u1_bcast_msk,
+                                                          pkt_mf_u0_pop);
+        __m512i v_indexes = _mm512_add_epi64(v_popcnts, v_idx_u0_offset);
+
+        /* Gather u64 blocks from packet miniflow. */
+        __m512i v_zeros = _mm512_setzero_si512();
+        __m512i v_blocks = _mm512_mask_i64gather_epi64(v_zeros, u64_lanes_mask,
+                                                       v_indexes, pkt_blocks,
+                                                       GATHER_SCALE_8);
+
+        /* Mask pkt blocks with subtable blocks, k-mask to zero lanes. */
+        __m512i v_masked_blocks = _mm512_maskz_and_epi64(zero_mask, v_blocks,
+                                                         v_tbl_blocks);
+        return v_masked_blocks;
+}
+
 static inline uint32_t ALWAYS_INLINE
 avx512_lookup_impl(struct dpcls_subtable *subtable,
                    uint32_t keys_map,
@@ -94,76 +169,86 @@  avx512_lookup_impl(struct dpcls_subtable *subtable,
                    const uint32_t bit_count_u1)
 {
     OVS_ALIGNED_VAR(CACHE_LINE_SIZE)uint64_t block_cache[BLOCKS_CACHE_SIZE];
-
-    const uint32_t bit_count_total = bit_count_u0 + bit_count_u1;
-    int i;
     uint32_t hashes[NETDEV_MAX_BURST];
+
     const uint32_t n_pkts = __builtin_popcountll(keys_map);
     ovs_assert(NETDEV_MAX_BURST >= n_pkts);
 
+    const uint32_t bit_count_total = bit_count_u0 + bit_count_u1;
+    const uint64_t bit_count_total_mask = (1ULL << bit_count_total) - 1;
+
     const uint64_t tbl_u0 = subtable->mask.mf.map.bits[0];
     const uint64_t tbl_u1 = subtable->mask.mf.map.bits[1];
 
-    /* Load subtable blocks for masking later. */
     const uint64_t *tbl_blocks = miniflow_get_values(&subtable->mask.mf);
-    const __m512i v_tbl_blocks = _mm512_loadu_si512(&tbl_blocks[0]);
-
-    /* Load pre-created subtable masks for each block in subtable. */
-    const __mmask8 bit_count_total_mask = (1 << bit_count_total) - 1;
-    const __m512i v_mf_masks = _mm512_maskz_loadu_epi64(bit_count_total_mask,
-                                                        subtable->mf_masks);
+    const uint64_t *tbl_mf_masks = subtable->mf_masks;
 
+    int i;
     ULLONG_FOR_EACH_1 (i, keys_map) {
+        /* Create mask register with packet-specific u0 offset.
+         * Note that as 16 blocks can be handled in total, the width of the
+         * mask register must be >=16.
+         */
         const uint64_t pkt_mf_u0_bits = keys[i]->mf.map.bits[0];
         const uint64_t pkt_mf_u0_pop = __builtin_popcountll(pkt_mf_u0_bits);
-
-        /* Pre-create register with *PER PACKET* u0 offset. */
-        const __mmask8 u1_bcast_mask = (UINT8_MAX << bit_count_u0);
-        const __m512i v_idx_u0_offset = _mm512_maskz_set1_epi64(u1_bcast_mask,
-                                                                pkt_mf_u0_pop);
+        const __mmask64 u1_bcast_mask = (UINT64_MAX << bit_count_u0);
 
         /* Broadcast u0, u1 bitmasks to 8x u64 lanes. */
-        __m512i v_u0 = _mm512_set1_epi64(pkt_mf_u0_bits);
-        __m512i v_pkt_bits = _mm512_mask_set1_epi64(v_u0, u1_bcast_mask,
-                                         keys[i]->mf.map.bits[1]);
-
-        /* Bitmask by pre-created masks. */
-        __m512i v_masks = _mm512_and_si512(v_pkt_bits, v_mf_masks);
-
-        /* Manual AVX512 popcount for u64 lanes. */
-        __m512i v_popcnts = _mm512_popcnt_epi64_manual(v_masks);
-
-        /* Offset popcounts for u1 with pre-created offset register. */
-        __m512i v_indexes = _mm512_add_epi64(v_popcnts, v_idx_u0_offset);
-
-        /* Gather u64 blocks from packet miniflow. */
-        const __m512i v_zeros = _mm512_setzero_si512();
-        const void *pkt_data = miniflow_get_values(&keys[i]->mf);
-        __m512i v_all_blocks = _mm512_mask_i64gather_epi64(v_zeros,
-                                   bit_count_total_mask, v_indexes,
-                                   pkt_data, 8);
+        __m512i v_u0 = _mm512_set1_epi64(keys[i]->mf.map.bits[0]);
+        __m512i v_u1 = _mm512_set1_epi64(keys[i]->mf.map.bits[1]);
 
         /* Zero out bits that pkt doesn't have:
          * - 2x pext() to extract bits from packet miniflow as needed by TBL
          * - Shift u1 over by bit_count of u0, OR to create zero bitmask
          */
-         uint64_t u0_to_zero = _pext_u64(keys[i]->mf.map.bits[0], tbl_u0);
-         uint64_t u1_to_zero = _pext_u64(keys[i]->mf.map.bits[1], tbl_u1);
-         uint64_t zero_mask = (u1_to_zero << bit_count_u0) | u0_to_zero;
-
-        /* Mask blocks using AND with subtable blocks, use k-mask to zero
-         * where lanes as required for this packet.
-         */
-        __m512i v_masked_blocks = _mm512_maskz_and_epi64(zero_mask,
-                                                v_all_blocks, v_tbl_blocks);
+        uint64_t u0_to_zero = _pext_u64(keys[i]->mf.map.bits[0], tbl_u0);
+        uint64_t u1_to_zero = _pext_u64(keys[i]->mf.map.bits[1], tbl_u1);
+        const uint64_t zero_mask_wip = (u1_to_zero << bit_count_u0) |
+                                       u0_to_zero;
+        const uint64_t zero_mask = zero_mask_wip & bit_count_total_mask;
+
+        /* Get ptr to packet data blocks. */
+        const uint64_t *pkt_blocks = miniflow_get_values(&keys[i]->mf);
+
+        /* Store first 8 blocks cache, full cache line aligned. */
+        __m512i v_blocks = avx512_blocks_gather(v_u0, v_u1,
+                                                &pkt_blocks[0],
+                                                &tbl_blocks[0],
+                                                &tbl_mf_masks[0],
+                                                u1_bcast_mask,
+                                                pkt_mf_u0_pop,
+                                                zero_mask,
+                                                bit_count_total_mask);
+        _mm512_storeu_si512(&block_cache[i * MF_BLOCKS_PER_PACKET], v_blocks);
+
+        if (bit_count_total > 8) {
+            /* Shift the masks over by 8 for the upper lane range.
+             * The pkt blocks pointer stays at index 0, as the gather
+             * indexes are derived from popcounts. The tbl and mf mask
+             * pointers move forward by 8 blocks, and the same gather
+             * routine is re-run for blocks 8-15.
+             */
+            uint64_t zero_mask_gt8 = (zero_mask >> 8);
+            uint64_t u1_bcast_mask_gt8 = (u1_bcast_mask >> 8);
+            uint64_t bit_count_gt8_mask = bit_count_total_mask >> 8;
+
+            __m512i v_blocks_gt8 = avx512_blocks_gather(v_u0, v_u1,
+                                                    &pkt_blocks[0],
+                                                    &tbl_blocks[8],
+                                                    &tbl_mf_masks[8],
+                                                    u1_bcast_mask_gt8,
+                                                    pkt_mf_u0_pop,
+                                                    zero_mask_gt8,
+                                                    bit_count_gt8_mask);
+            _mm512_storeu_si512(&block_cache[(i * MF_BLOCKS_PER_PACKET) + 8],
+                                v_blocks_gt8);
+        }
 
-        /* Store to blocks cache, full cache line aligned. */
-        _mm512_storeu_si512(&block_cache[i * 8], v_masked_blocks);
     }
 
     /* Hash the now linearized blocks of packet metadata. */
     ULLONG_FOR_EACH_1 (i, keys_map) {
-        uint64_t *block_ptr = &block_cache[i * 8];
+        uint64_t *block_ptr = &block_cache[i * MF_BLOCKS_PER_PACKET];
         uint32_t hash = hash_add_words64(0, block_ptr, bit_count_total);
         hashes[i] = hash_finish(hash, bit_count_total * 8);
     }
@@ -183,7 +268,7 @@  avx512_lookup_impl(struct dpcls_subtable *subtable,
         struct dpcls_rule *rule;
 
         CMAP_NODE_FOR_EACH (rule, cmap_node, nodes[i]) {
-            const uint32_t cidx = i * 8;
+            const uint32_t cidx = i * MF_BLOCKS_PER_PACKET;
             uint32_t match = netdev_rule_matches_key(rule, bit_count_total,
                                                      &block_cache[cidx]);
             if (OVS_LIKELY(match)) {
@@ -220,7 +305,7 @@  DECLARE_OPTIMIZED_LOOKUP_FUNCTION(4, 0)
 
 /* Check if a specialized function is valid for the required subtable. */
 #define CHECK_LOOKUP_FUNCTION(U0, U1)                                         \
-    ovs_assert((U0 + U1) <= NUM_U64_IN_ZMM_REG);                              \
+    ovs_assert((U0 + U1) <= (NUM_U64_IN_ZMM_REG * 2));                        \
     if (!f && u0_bits == U0 && u1_bits == U1) {                               \
         f = dpcls_avx512_gather_mf_##U0##_##U1;                               \
     }
@@ -250,7 +335,11 @@  dpcls_subtable_avx512_gather_probe(uint32_t u0_bits, uint32_t u1_bits)
     CHECK_LOOKUP_FUNCTION(4, 1);
     CHECK_LOOKUP_FUNCTION(4, 0);
 
-    if (!f && (u0_bits + u1_bits) < NUM_U64_IN_ZMM_REG) {
+    /* Check if the _any looping version of the code can perform this
+     * miniflow lookup. The performance gain may be less pronounced due to
+     * the non-specialized hashing; however, there is usually a good
+     * performance win overall.
+     */
+    if (!f && (u0_bits + u1_bits) < (NUM_U64_IN_ZMM_REG * 2)) {
         f = dpcls_avx512_gather_mf_any;
         VLOG_INFO("Using avx512_gather_mf_any for subtable (%d,%d)\n",
                   u0_bits, u1_bits);