diff mbox series

[ovs-dev,v2,1/2] ovsdb-data: Optimize union of sets.

Message ID 20210917171739.1060303-2-i.maximets@ovn.org
State Changes Requested
Headers show
Series ovsdb: Optimize set operations. | expand

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed

Commit Message

Ilya Maximets Sept. 17, 2021, 5:17 p.m. UTC
Current algorithm of ovsdb_datum_union looks like this:

  for-each atom in b:
      if not bin_search(a, atom):
          push(a, clone(atom))
  quicksort(a)

So, the complexity looks like this:

   Nb * log2(Na)   +    Nb     +   (Na + Nb) * log2(Na + Nb)
   Comparisons        clones       Comparisons for quicksort
   for search

ovsdb_datum_union() is heavily used in database transactions while
new element is added to a set.  For example, if new logical switch
port is added to a logical switch in OVN.  This is a very common
use case where CMS adds one new port to an existing switch that
already has, let's say, 100 ports.  For this case ovsdb-server will
have to perform:

   1 * log2(100)  + 1 clone + 101 * log2(101)
   Comparisons                Comparisons for
   for search                   quicksort.
       ~7           1            ~707
   Roughly 714 comparisons of atoms and 1 clone.

Since binary search can give us position, where new atom should go
(it's the 'low' index after the search completion) for free, the
logic can be re-worked like this:

  copied = 0
  for-each atom in b:
      desired_position = bin_search(a, atom)
      push(result, a[ copied : desired_position - 1 ])
      copied = desired_position
      push(result, clone(atom))
  push(result, a[ copied : Na ])
  swap(a, result)

Complexity of this schema:

   Nb * log2(Na)   +    Nb     +         Na
   Comparisons        clones       memory copy on push
   for search

'swap' is just a swap of a few pointers.  'push' is not a 'clone',
but a simple memory copy of 'union ovsdb_atom'.

In general, this schema substitutes complexity of a quicksort
with complexity of a memory copy of Na atom structures, where we're
not even copying strings that these atoms are pointing to.

Complexity in the example above goes down from 714 comparisons
to 7 comparisons and memcpy of 100 * sizeof (union ovsdb_atom) bytes.

General complexity of a memory copy should always be lower than
complexity of a quicksort, especially because these copies usually
performed in bulk, so this new schema should work faster for any input.

All in all, this change allows to execute several times more
transactions per second for transactions that adds new entries to sets.

Alternatively, union can be implemented as a linear merge of two
sorted arrays, but this will result in O(Na) comparisons, which
is more than Nb * log2(Na) in common case, since Na is usually
far bigger than Nb.  Linear merge will also mean per-atom memory
copies instead of copying in bulk.

'replace' functionality of ovsdb_datum_union() had no users, so it
just removed.  But it can easily be added back if needed in the future.

Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
---
 lib/db-ctl-base.c | 10 +++---
 lib/ovsdb-data.c  | 84 ++++++++++++++++++++++++++++++++---------------
 lib/ovsdb-data.h  |  6 ++--
 lib/ovsdb-idl.c   |  8 ++---
 ovsdb/mutation.c  |  2 +-
 vswitchd/bridge.c |  9 +++--
 6 files changed, 77 insertions(+), 42 deletions(-)

Comments

Mark Gray Sept. 21, 2021, 10:34 a.m. UTC | #1
On 17/09/2021 18:17, Ilya Maximets wrote:
> Current algorithm of ovsdb_datum_union looks like this:
> 
>   for-each atom in b:
>       if not bin_search(a, atom):
>           push(a, clone(atom))
>   quicksort(a)
> 
> So, the complexity looks like this:
> 
>    Nb * log2(Na)   +    Nb     +   (Na + Nb) * log2(Na + Nb)
>    Comparisons        clones       Comparisons for quicksort
>    for search
> 
> ovsdb_datum_union() is heavily used in database transactions while
> new element is added to a set.  For example, if new logical switch
> port is added to a logical switch in OVN.  This is a very common
> use case where CMS adds one new port to an existing switch that
> already has, let's say, 100 ports.  For this case ovsdb-server will
> have to perform:
> 
>    1 * log2(100)  + 1 clone + 101 * log2(101)
>    Comparisons                Comparisons for
>    for search                   quicksort.
>        ~7           1            ~707
>    Roughly 714 comparisons of atoms and 1 clone.
> 
> Since binary search can give us position, where new atom should go
> (it's the 'low' index after the search completion) for free, the
> logic can be re-worked like this:
> 
>   copied = 0
>   for-each atom in b:
>       desired_position = bin_search(a, atom)
>       push(result, a[ copied : desired_position - 1 ])
>       copied = desired_position
>       push(result, clone(atom))
>   push(result, a[ copied : Na ])
>   swap(a, result)
> 
> Complexity of this schema:
> 
>    Nb * log2(Na)   +    Nb     +         Na
>    Comparisons        clones       memory copy on push
>    for search
> 
> 'swap' is just a swap of a few pointers.  'push' is not a 'clone',
> but a simple memory copy of 'union ovsdb_atom'.
> 
> In general, this schema substitutes complexity of a quicksort
> with complexity of a memory copy of Na atom structures, where we're
> not even copying strings that these atoms are pointing to.
> 
> Complexity in the example above goes down from 714 comparisons
> to 7 comparisons and memcpy of 100 * sizeof (union ovsdb_atom) bytes.
> 
> General complexity of a memory copy should always be lower than
> complexity of a quicksort, especially because these copies usually
> performed in bulk, so this new schema should work faster for any input.
> 
> All in all, this change allows to execute several times more
> transactions per second for transactions that adds new entries to sets.
> 
> Alternatively, union can be implemented as a linear merge of two
> sorted arrays, but this will result in O(Na) comparisons, which
> is more than Nb * log2(Na) in common case, since Na is usually
> far bigger than Nb.  Linear merge will also mean per-atom memory
> copies instead of copying in bulk.
> 
> 'replace' functionality of ovsdb_datum_union() had no users, so it
> just removed.  But it can easily be added back if needed in the future.
> 
> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
> ---
>  lib/db-ctl-base.c | 10 +++---
>  lib/ovsdb-data.c  | 84 ++++++++++++++++++++++++++++++++---------------
>  lib/ovsdb-data.h  |  6 ++--
>  lib/ovsdb-idl.c   |  8 ++---
>  ovsdb/mutation.c  |  2 +-
>  vswitchd/bridge.c |  9 +++--
>  6 files changed, 77 insertions(+), 42 deletions(-)
> 
> diff --git a/lib/db-ctl-base.c b/lib/db-ctl-base.c
> index 77cc76a9f..f69868702 100644
> --- a/lib/db-ctl-base.c
> +++ b/lib/db-ctl-base.c
> @@ -321,7 +321,7 @@ get_row_by_id(struct ctl_context *ctx,
>              const union ovsdb_atom key_atom
>                  = { .string = CONST_CAST(char *, id->key) };
>              unsigned int i = ovsdb_datum_find_key(datum, &key_atom,
> -                                                  OVSDB_TYPE_STRING);
> +                                                  OVSDB_TYPE_STRING, NULL);
>              name = i == UINT_MAX ? NULL : &datum->values[i];
>          }
>          if (!name) {
> @@ -820,7 +820,7 @@ check_condition(const struct ovsdb_idl_table_class *table,
>          }
>  
>          idx = ovsdb_datum_find_key(have_datum,
> -                                   &want_key, column->type.key.type);
> +                                   &want_key, column->type.key.type, NULL);
>          if (idx == UINT_MAX && !is_set_operator(operator)) {
>              retval = false;
>          } else {
> @@ -993,7 +993,7 @@ cmd_get(struct ctl_context *ctx)
>              }
>  
>              idx = ovsdb_datum_find_key(datum, &key,
> -                                       column->type.key.type);
> +                                       column->type.key.type, NULL);
>              if (idx == UINT_MAX) {
>                  if (must_exist) {
>                      ctl_error(
> @@ -1375,7 +1375,7 @@ set_column(const struct ovsdb_idl_table_class *table,
>          ovsdb_atom_destroy(&value, column->type.value.type);
>  
>          ovsdb_datum_union(&datum, ovsdb_idl_read(row, column),
> -                          &column->type, false);
> +                          &column->type);
>          ovsdb_idl_txn_verify(row, column);
>          ovsdb_idl_txn_write(row, column, &datum);
>      } else {
> @@ -1514,7 +1514,7 @@ cmd_add(struct ctl_context *ctx)
>              ovsdb_datum_destroy(&old, &column->type);
>              return;
>          }
> -        ovsdb_datum_union(&old, &add, type, false);
> +        ovsdb_datum_union(&old, &add, type);
>          ovsdb_datum_destroy(&add, type);
>      }
>      if (old.n > type->n_max) {
> diff --git a/lib/ovsdb-data.c b/lib/ovsdb-data.c
> index c145f5ad9..11bf95fed 100644
> --- a/lib/ovsdb-data.c
> +++ b/lib/ovsdb-data.c
> @@ -799,7 +799,8 @@ ovsdb_atom_check_constraints(const union ovsdb_atom *atom,
>                               const struct ovsdb_base_type *base)
>  {
>      if (base->enum_
> -        && ovsdb_datum_find_key(base->enum_, atom, base->type) == UINT_MAX) {
> +        && ovsdb_datum_find_key(base->enum_, atom,
> +                                base->type, NULL) == UINT_MAX) {
>          struct ovsdb_error *error;
>          struct ds actual = DS_EMPTY_INITIALIZER;
>          struct ds valid = DS_EMPTY_INITIALIZER;
> @@ -1787,11 +1788,13 @@ ovsdb_datum_compare_3way(const struct ovsdb_datum *a,
>  /* If 'key' is one of the keys in 'datum', returns its index within 'datum',
>   * otherwise UINT_MAX.  'key.type' must be the type of the atoms stored in the
>   * 'keys' array in 'datum'.
> + * If 'key' is not found, index where it should have been returned in '*pos'.
>   */
>  unsigned int
>  ovsdb_datum_find_key(const struct ovsdb_datum *datum,
>                       const union ovsdb_atom *key,
> -                     enum ovsdb_atomic_type key_type)
> +                     enum ovsdb_atomic_type key_type,
> +                     unsigned int *pos)

bit of a nit but the interface seems a bit awkward now because both the
return code and the parameter can return the position. How about
something like the following. It returns 'true' or 'false' depending on
whether or not the key is found and 'pos' will return the actual
position for the 'true' case or the insertion position for the 'false'
case. At least to me, this seems more natural:

bool
ovsdb_datum_find_key(const struct ovsdb_datum *datum,
                     const union ovsdb_atom *key,
                     enum ovsdb_atomic_type key_type)
                     enum ovsdb_atomic_type key_type,
                     unsigned int *pos)


>  {
>      unsigned int low = 0;
>      unsigned int high = datum->n;
> @@ -1806,6 +1809,9 @@ ovsdb_datum_find_key(const struct ovsdb_datum *datum,
>              return idx;
>          }
>      }
> +    if (pos) {
> +        *pos = low;
> +    }
>      return UINT_MAX;
>  }
>  
> @@ -1821,7 +1827,7 @@ ovsdb_datum_find_key_value(const struct ovsdb_datum *datum,
>                             const union ovsdb_atom *value,
>                             enum ovsdb_atomic_type value_type)
>  {
> -    unsigned int idx = ovsdb_datum_find_key(datum, key, key_type);
> +    unsigned int idx = ovsdb_datum_find_key(datum, key, key_type, NULL);
>      if (idx != UINT_MAX
>          && value_type != OVSDB_TYPE_VOID
>          && !ovsdb_atom_equals(&datum->values[idx], value, value_type)) {
> @@ -1948,38 +1954,64 @@ ovsdb_datum_add_unsafe(struct ovsdb_datum *datum,
>      }
>  }
>  
> +/* Adds 'n' atoms starting from index 'start_idx' from 'src' to the end of
> + * 'dst'.  'dst' should have enough memory allocated to hold the additional
> + * 'n' atoms.  Atoms are not cloned, i.e. 'dst' will reference the same data.
> + * Caller also should take care of the result being sorted. */
> +static void
> +ovsdb_datum_push_unsafe(struct ovsdb_datum *dst,
> +                        const struct ovsdb_datum *src,
> +                        unsigned int start_idx, unsigned int n,
> +                        const struct ovsdb_type *type)
> +{
> +    memcpy(&dst->keys[dst->n], &src->keys[start_idx], n * sizeof src->keys[0]);
> +    if (type->value.type != OVSDB_TYPE_VOID) {
> +        memcpy(&dst->values[dst->n], &src->values[start_idx],
> +               n * sizeof src->values[0]);
> +    }
> +    dst->n += n;
> +}
> +
>  void
>  ovsdb_datum_union(struct ovsdb_datum *a, const struct ovsdb_datum *b,
> -                  const struct ovsdb_type *type, bool replace)
> +                  const struct ovsdb_type *type)
>  {
> -    unsigned int n;
> -    size_t bi;
> +    struct ovsdb_datum result;
> +    unsigned int copied, pos;
> +    size_t ai, bi;
>  
> -    n = a->n;
> -    for (bi = 0; bi < b->n; bi++) {
> -        unsigned int ai;
> +    ovsdb_datum_init_empty(&result);
> +    ovsdb_datum_reallocate(&result, type, a->n + b->n);

If there are duplicates in a and b, this will be bigger than the actual
number of elements. I guess its not an issue (other than wasted space)?

>  
> -        ai = ovsdb_datum_find_key(a, &b->keys[bi], type->key.type);
> -        if (ai == UINT_MAX) {
> -            if (n == a->n) {
> -                ovsdb_datum_reallocate(a, type, a->n + (b->n - bi));
> -            }
> -            ovsdb_atom_clone(&a->keys[n], &b->keys[bi], type->key.type);
> -            if (type->value.type != OVSDB_TYPE_VOID) {
> -                ovsdb_atom_clone(&a->values[n], &b->values[bi],
> -                                 type->value.type);
> -            }
> -            n++;
> -        } else if (replace && type->value.type != OVSDB_TYPE_VOID) {
> -            ovsdb_atom_destroy(&a->values[ai], type->value.type);
> -            ovsdb_atom_clone(&a->values[ai], &b->values[bi],
> +    copied = 0;
> +    for (bi = 0; bi < b->n; bi++) {
> +        ai = ovsdb_datum_find_key(a, &b->keys[bi], type->key.type, &pos);
> +        if (ai != UINT_MAX) {
> +            /* Atom with the same key already exists. */
> +            continue;
> +        }
> +        if (pos > copied) {
> +            /* Need to copy some atoms from 'a' first. */
> +            ovsdb_datum_push_unsafe(&result, a, copied, pos - copied, type);
> +            copied = pos;
> +        }
> +        /* Inserting new atom from 'b'. */
> +        ovsdb_atom_clone(&result.keys[result.n], &b->keys[bi], type->key.type);
> +        if (type->value.type != OVSDB_TYPE_VOID) {
> +            ovsdb_atom_clone(&result.values[result.n], &b->values[bi],
>                               type->value.type);
>          }
> +        result.n++;
>      }
> -    if (n != a->n) {
> -        a->n = n;
> -        ovs_assert(!ovsdb_datum_sort(a, type->key.type));
> +    if (a->n > copied) {
> +        /* Copying remaining atoms. */
> +        ovsdb_datum_push_unsafe(&result, a, copied, a->n - copied, type);
>      }
> +    /* All atoms are copied now. */
> +    a->n = 0;
> +
> +    ovsdb_datum_swap(&result, a);
> +    ovsdb_datum_destroy(&result, type);
>  }
>  
>  void
> diff --git a/lib/ovsdb-data.h b/lib/ovsdb-data.h
> index c5a80ee39..a580cd2bd 100644
> --- a/lib/ovsdb-data.h
> +++ b/lib/ovsdb-data.h
> @@ -211,7 +211,8 @@ bool ovsdb_datum_equals(const struct ovsdb_datum *,
>  /* Search. */
>  unsigned int ovsdb_datum_find_key(const struct ovsdb_datum *,
>                                    const union ovsdb_atom *key,
> -                                  enum ovsdb_atomic_type key_type);
> +                                  enum ovsdb_atomic_type key_type,
> +                                  unsigned int *pos);
>  unsigned int ovsdb_datum_find_key_value(const struct ovsdb_datum *,
>                                          const union ovsdb_atom *key,
>                                          enum ovsdb_atomic_type key_type,
> @@ -227,8 +228,7 @@ bool ovsdb_datum_excludes_all(const struct ovsdb_datum *,
>                                const struct ovsdb_type *);
>  void ovsdb_datum_union(struct ovsdb_datum *,
>                         const struct ovsdb_datum *,
> -                       const struct ovsdb_type *,
> -                       bool replace);
> +                       const struct ovsdb_type *);
>  void ovsdb_datum_subtract(struct ovsdb_datum *a,
>                            const struct ovsdb_type *a_type,
>                            const struct ovsdb_datum *b,
> diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c
> index d3caccec8..d9b0993f6 100644
> --- a/lib/ovsdb-idl.c
> +++ b/lib/ovsdb-idl.c
> @@ -2842,7 +2842,7 @@ ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *row,
>                      new_datum = map_op_datum(map_op);
>                      pos = ovsdb_datum_find_key(old_datum,
>                                                 &new_datum->keys[0],
> -                                               key_type);
> +                                               key_type, NULL);
>                      if (ovsdb_atom_equals(&new_datum->values[0],
>                                            &old_datum->values[pos],
>                                            value_type)) {
> @@ -2854,7 +2854,7 @@ ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *row,
>                      unsigned int pos;
>                      pos = ovsdb_datum_find_key(old_datum,
>                                                 &map_op_datum(map_op)->keys[0],
> -                                               key_type);
> +                                               key_type, NULL);
>                      if (pos == UINT_MAX) {
>                          /* No key to delete.  Move on to next update. */
>                          VLOG_WARN("Trying to delete a key that doesn't "
> @@ -2953,7 +2953,7 @@ ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *row,
>                      unsigned int pos;
>                      pos = ovsdb_datum_find_key(old_datum,
>                                                 &set_op_datum(set_op)->keys[0],
> -                                               key_type);
> +                                               key_type, NULL);
>                      if (pos == UINT_MAX) {
>                          /* No key to delete.  Move on to next update. */
>                          VLOG_WARN("Trying to delete a key that doesn't "
> @@ -4131,7 +4131,7 @@ ovsdb_idl_txn_write_partial_map(const struct ovsdb_idl_row *row_,
>      /* Find out if this is an insert or an update. */
>      key_type = column->type.key.type;
>      old_datum = ovsdb_idl_read(row, column);
> -    pos = ovsdb_datum_find_key(old_datum, &datum->keys[0], key_type);
> +    pos = ovsdb_datum_find_key(old_datum, &datum->keys[0], key_type, NULL);
>      op_type = pos == UINT_MAX ? MAP_OP_INSERT : MAP_OP_UPDATE;
>  
>      ovsdb_idl_txn_add_map_op(row, column, datum, op_type);
> diff --git a/ovsdb/mutation.c b/ovsdb/mutation.c
> index 56edc5f00..03d1c3499 100644
> --- a/ovsdb/mutation.c
> +++ b/ovsdb/mutation.c
> @@ -383,7 +383,7 @@ ovsdb_mutation_set_execute(struct ovsdb_row *row,
>              break;
>  
>          case OVSDB_M_INSERT:
> -            ovsdb_datum_union(dst, arg, dst_type, false);
> +            ovsdb_datum_union(dst, arg, dst_type);
>              error = ovsdb_mutation_check_count(dst, dst_type);
>              break;
>  
> diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
> index cb7c5cb76..c7a7e55e5 100644
> --- a/vswitchd/bridge.c
> +++ b/vswitchd/bridge.c
> @@ -4229,7 +4229,8 @@ bridge_configure_aa(struct bridge *br)
>          union ovsdb_atom atom;
>  
>          atom.integer = m->isid;
> -        if (ovsdb_datum_find_key(mc, &atom, OVSDB_TYPE_INTEGER) == UINT_MAX) {
> +        if (ovsdb_datum_find_key(mc, &atom,
> +                                 OVSDB_TYPE_INTEGER, NULL) == UINT_MAX) {
>              VLOG_INFO("Deleting isid=%"PRIu32", vlan=%"PRIu16,
>                        m->isid, m->vlan);
>              bridge_aa_mapping_destroy(m);
> @@ -4826,7 +4827,8 @@ queue_ids_include(const struct ovsdb_datum *queues, int64_t target)
>      union ovsdb_atom atom;
>  
>      atom.integer = target;
> -    return ovsdb_datum_find_key(queues, &atom, OVSDB_TYPE_INTEGER) != UINT_MAX;
> +    return ovsdb_datum_find_key(queues, &atom,
> +                                OVSDB_TYPE_INTEGER, NULL) != UINT_MAX;
>  }
>  
>  static void
> @@ -5020,7 +5022,8 @@ bridge_configure_mirrors(struct bridge *br)
>          union ovsdb_atom atom;
>  
>          atom.uuid = m->uuid;
> -        if (ovsdb_datum_find_key(mc, &atom, OVSDB_TYPE_UUID) == UINT_MAX) {
> +        if (ovsdb_datum_find_key(mc, &atom,
> +                                 OVSDB_TYPE_UUID, NULL) == UINT_MAX) {
>              mirror_destroy(m);
>          }
>      }
>
Ilya Maximets Sept. 22, 2021, 8:03 p.m. UTC | #2
On 9/21/21 12:34, Mark Gray wrote:
> On 17/09/2021 18:17, Ilya Maximets wrote:
>> Current algorithm of ovsdb_datum_union looks like this:
>>
>>   for-each atom in b:
>>       if not bin_search(a, atom):
>>           push(a, clone(atom))
>>   quicksort(a)
>>
>> So, the complexity looks like this:
>>
>>    Nb * log2(Na)   +    Nb     +   (Na + Nb) * log2(Na + Nb)
>>    Comparisons        clones       Comparisons for quicksort
>>    for search
>>
>> ovsdb_datum_union() is heavily used in database transactions while
>> new element is added to a set.  For example, if new logical switch
>> port is added to a logical switch in OVN.  This is a very common
>> use case where CMS adds one new port to an existing switch that
>> already has, let's say, 100 ports.  For this case ovsdb-server will
>> have to perform:
>>
>>    1 * log2(100)  + 1 clone + 101 * log2(101)
>>    Comparisons                Comparisons for
>>    for search                   quicksort.
>>        ~7           1            ~707
>>    Roughly 714 comparisons of atoms and 1 clone.
>>
>> Since binary search can give us position, where new atom should go
>> (it's the 'low' index after the search completion) for free, the
>> logic can be re-worked like this:
>>
>>   copied = 0
>>   for-each atom in b:
>>       desired_position = bin_search(a, atom)
>>       push(result, a[ copied : desired_position - 1 ])
>>       copied = desired_position
>>       push(result, clone(atom))
>>   push(result, a[ copied : Na ])
>>   swap(a, result)
>>
>> Complexity of this schema:
>>
>>    Nb * log2(Na)   +    Nb     +         Na
>>    Comparisons        clones       memory copy on push
>>    for search
>>
>> 'swap' is just a swap of a few pointers.  'push' is not a 'clone',
>> but a simple memory copy of 'union ovsdb_atom'.
>>
>> In general, this schema substitutes complexity of a quicksort
>> with complexity of a memory copy of Na atom structures, where we're
>> not even copying strings that these atoms are pointing to.
>>
>> Complexity in the example above goes down from 714 comparisons
>> to 7 comparisons and memcpy of 100 * sizeof (union ovsdb_atom) bytes.
>>
>> General complexity of a memory copy should always be lower than
>> complexity of a quicksort, especially because these copies usually
>> performed in bulk, so this new schema should work faster for any input.
>>
>> All in all, this change allows to execute several times more
>> transactions per second for transactions that adds new entries to sets.
>>
>> Alternatively, union can be implemented as a linear merge of two
>> sorted arrays, but this will result in O(Na) comparisons, which
>> is more than Nb * log2(Na) in common case, since Na is usually
>> far bigger than Nb.  Linear merge will also mean per-atom memory
>> copies instead of copying in bulk.
>>
>> 'replace' functionality of ovsdb_datum_union() had no users, so it
>> just removed.  But it can easily be added back if needed in the future.
>>
>> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
>> ---
>>  lib/db-ctl-base.c | 10 +++---
>>  lib/ovsdb-data.c  | 84 ++++++++++++++++++++++++++++++++---------------
>>  lib/ovsdb-data.h  |  6 ++--
>>  lib/ovsdb-idl.c   |  8 ++---
>>  ovsdb/mutation.c  |  2 +-
>>  vswitchd/bridge.c |  9 +++--
>>  6 files changed, 77 insertions(+), 42 deletions(-)
>>
>> diff --git a/lib/db-ctl-base.c b/lib/db-ctl-base.c
>> index 77cc76a9f..f69868702 100644
>> --- a/lib/db-ctl-base.c
>> +++ b/lib/db-ctl-base.c
>> @@ -321,7 +321,7 @@ get_row_by_id(struct ctl_context *ctx,
>>              const union ovsdb_atom key_atom
>>                  = { .string = CONST_CAST(char *, id->key) };
>>              unsigned int i = ovsdb_datum_find_key(datum, &key_atom,
>> -                                                  OVSDB_TYPE_STRING);
>> +                                                  OVSDB_TYPE_STRING, NULL);
>>              name = i == UINT_MAX ? NULL : &datum->values[i];
>>          }
>>          if (!name) {
>> @@ -820,7 +820,7 @@ check_condition(const struct ovsdb_idl_table_class *table,
>>          }
>>  
>>          idx = ovsdb_datum_find_key(have_datum,
>> -                                   &want_key, column->type.key.type);
>> +                                   &want_key, column->type.key.type, NULL);
>>          if (idx == UINT_MAX && !is_set_operator(operator)) {
>>              retval = false;
>>          } else {
>> @@ -993,7 +993,7 @@ cmd_get(struct ctl_context *ctx)
>>              }
>>  
>>              idx = ovsdb_datum_find_key(datum, &key,
>> -                                       column->type.key.type);
>> +                                       column->type.key.type, NULL);
>>              if (idx == UINT_MAX) {
>>                  if (must_exist) {
>>                      ctl_error(
>> @@ -1375,7 +1375,7 @@ set_column(const struct ovsdb_idl_table_class *table,
>>          ovsdb_atom_destroy(&value, column->type.value.type);
>>  
>>          ovsdb_datum_union(&datum, ovsdb_idl_read(row, column),
>> -                          &column->type, false);
>> +                          &column->type);
>>          ovsdb_idl_txn_verify(row, column);
>>          ovsdb_idl_txn_write(row, column, &datum);
>>      } else {
>> @@ -1514,7 +1514,7 @@ cmd_add(struct ctl_context *ctx)
>>              ovsdb_datum_destroy(&old, &column->type);
>>              return;
>>          }
>> -        ovsdb_datum_union(&old, &add, type, false);
>> +        ovsdb_datum_union(&old, &add, type);
>>          ovsdb_datum_destroy(&add, type);
>>      }
>>      if (old.n > type->n_max) {
>> diff --git a/lib/ovsdb-data.c b/lib/ovsdb-data.c
>> index c145f5ad9..11bf95fed 100644
>> --- a/lib/ovsdb-data.c
>> +++ b/lib/ovsdb-data.c
>> @@ -799,7 +799,8 @@ ovsdb_atom_check_constraints(const union ovsdb_atom *atom,
>>                               const struct ovsdb_base_type *base)
>>  {
>>      if (base->enum_
>> -        && ovsdb_datum_find_key(base->enum_, atom, base->type) == UINT_MAX) {
>> +        && ovsdb_datum_find_key(base->enum_, atom,
>> +                                base->type, NULL) == UINT_MAX) {
>>          struct ovsdb_error *error;
>>          struct ds actual = DS_EMPTY_INITIALIZER;
>>          struct ds valid = DS_EMPTY_INITIALIZER;
>> @@ -1787,11 +1788,13 @@ ovsdb_datum_compare_3way(const struct ovsdb_datum *a,
>>  /* If 'key' is one of the keys in 'datum', returns its index within 'datum',
>>   * otherwise UINT_MAX.  'key.type' must be the type of the atoms stored in the
>>   * 'keys' array in 'datum'.
>> + * If 'key' is not found, index where it should have been returned in '*pos'.
>>   */
>>  unsigned int
>>  ovsdb_datum_find_key(const struct ovsdb_datum *datum,
>>                       const union ovsdb_atom *key,
>> -                     enum ovsdb_atomic_type key_type)
>> +                     enum ovsdb_atomic_type key_type,
>> +                     unsigned int *pos)
> 
> bit of a nit but the interface seems a bit awkward now because both the
> return code and the parameter can return the position. How about
> something like the following. It returns 'true' or 'false' depending on
> whether or not the key is found and 'pos' will return the actual
> position for the 'true' case or the insertion position for the 'false'
> case. At least to me, this seems more natural:
> 
> bool
> ovsdb_datum_find_key(const struct ovsdb_datum *datum,
>                      const union ovsdb_atom *key,
>                      enum ovsdb_atomic_type key_type)
>                      enum ovsdb_atomic_type key_type,
>                      unsigned int *pos)

Yes.  That make sense.  I'll update the patch.

> 
> 
>>  {
>>      unsigned int low = 0;
>>      unsigned int high = datum->n;
>> @@ -1806,6 +1809,9 @@ ovsdb_datum_find_key(const struct ovsdb_datum *datum,
>>              return idx;
>>          }
>>      }
>> +    if (pos) {
>> +        *pos = low;
>> +    }
>>      return UINT_MAX;
>>  }
>>  
>> @@ -1821,7 +1827,7 @@ ovsdb_datum_find_key_value(const struct ovsdb_datum *datum,
>>                             const union ovsdb_atom *value,
>>                             enum ovsdb_atomic_type value_type)
>>  {
>> -    unsigned int idx = ovsdb_datum_find_key(datum, key, key_type);
>> +    unsigned int idx = ovsdb_datum_find_key(datum, key, key_type, NULL);
>>      if (idx != UINT_MAX
>>          && value_type != OVSDB_TYPE_VOID
>>          && !ovsdb_atom_equals(&datum->values[idx], value, value_type)) {
>> @@ -1948,38 +1954,64 @@ ovsdb_datum_add_unsafe(struct ovsdb_datum *datum,
>>      }
>>  }
>>  
>> +/* Adds 'n' atoms starting from index 'start_idx' from 'src' to the end of
>> + * 'dst'.  'dst' should have enough memory allocated to hold the additional
>> + * 'n' atoms.  Atoms are not cloned, i.e. 'dst' will reference the same data.
>> + * Caller also should take care of the result being sorted. */
>> +static void
>> +ovsdb_datum_push_unsafe(struct ovsdb_datum *dst,
>> +                        const struct ovsdb_datum *src,
>> +                        unsigned int start_idx, unsigned int n,
>> +                        const struct ovsdb_type *type)
>> +{
>> +    memcpy(&dst->keys[dst->n], &src->keys[start_idx], n * sizeof src->keys[0]);
>> +    if (type->value.type != OVSDB_TYPE_VOID) {
>> +        memcpy(&dst->values[dst->n], &src->values[start_idx],
>> +               n * sizeof src->values[0]);
>> +    }
>> +    dst->n += n;
>> +}
>> +
>>  void
>>  ovsdb_datum_union(struct ovsdb_datum *a, const struct ovsdb_datum *b,
>> -                  const struct ovsdb_type *type, bool replace)
>> +                  const struct ovsdb_type *type)
>>  {
>> -    unsigned int n;
>> -    size_t bi;
>> +    struct ovsdb_datum result;
>> +    unsigned int copied, pos;
>> +    size_t ai, bi;
>>  
>> -    n = a->n;
>> -    for (bi = 0; bi < b->n; bi++) {
>> -        unsigned int ai;
>> +    ovsdb_datum_init_empty(&result);
>> +    ovsdb_datum_reallocate(&result, type, a->n + b->n);
> 
> If there are duplicates in a and b, this will be bigger than the actual
> number of elements. I guess its not an issue (other than wasted space)?

Yes.  It's just a few extra allocated bytes in most cases.  Should not
be a big issue.

> 
>>  
>> -        ai = ovsdb_datum_find_key(a, &b->keys[bi], type->key.type);
>> -        if (ai == UINT_MAX) {
>> -            if (n == a->n) {
>> -                ovsdb_datum_reallocate(a, type, a->n + (b->n - bi));
>> -            }
>> -            ovsdb_atom_clone(&a->keys[n], &b->keys[bi], type->key.type);
>> -            if (type->value.type != OVSDB_TYPE_VOID) {
>> -                ovsdb_atom_clone(&a->values[n], &b->values[bi],
>> -                                 type->value.type);
>> -            }
>> -            n++;
>> -        } else if (replace && type->value.type != OVSDB_TYPE_VOID) {
>> -            ovsdb_atom_destroy(&a->values[ai], type->value.type);
>> -            ovsdb_atom_clone(&a->values[ai], &b->values[bi],
>> +    copied = 0;
>> +    for (bi = 0; bi < b->n; bi++) {
>> +        ai = ovsdb_datum_find_key(a, &b->keys[bi], type->key.type, &pos);
>> +        if (ai != UINT_MAX) {
>> +            /* Atom with the same key already exists. */
>> +            continue;
>> +        }
>> +        if (pos > copied) {
>> +            /* Need to copy some atoms from 'a' first. */
>> +            ovsdb_datum_push_unsafe(&result, a, copied, pos - copied, type);
>> +            copied = pos;
>> +        }
>> +        /* Inserting new atom from 'b'. */
>> +        ovsdb_atom_clone(&result.keys[result.n], &b->keys[bi], type->key.type);
>> +        if (type->value.type != OVSDB_TYPE_VOID) {
>> +            ovsdb_atom_clone(&result.values[result.n], &b->values[bi],
>>                               type->value.type);
>>          }
>> +        result.n++;
>>      }
>> -    if (n != a->n) {
>> -        a->n = n;
>> -        ovs_assert(!ovsdb_datum_sort(a, type->key.type));
>> +    if (a->n > copied) {
>> +        /* Copying remaining atoms. */
>> +        ovsdb_datum_push_unsafe(&result, a, copied, a->n - copied, type);
>>      }
>> +    /* All atoms are copied now. */
>> +    a->n = 0;
>> +
>> +    ovsdb_datum_swap(&result, a);
>> +    ovsdb_datum_destroy(&result, type);
>>  }
>>  
>>  void
>> diff --git a/lib/ovsdb-data.h b/lib/ovsdb-data.h
>> index c5a80ee39..a580cd2bd 100644
>> --- a/lib/ovsdb-data.h
>> +++ b/lib/ovsdb-data.h
>> @@ -211,7 +211,8 @@ bool ovsdb_datum_equals(const struct ovsdb_datum *,
>>  /* Search. */
>>  unsigned int ovsdb_datum_find_key(const struct ovsdb_datum *,
>>                                    const union ovsdb_atom *key,
>> -                                  enum ovsdb_atomic_type key_type);
>> +                                  enum ovsdb_atomic_type key_type,
>> +                                  unsigned int *pos);
>>  unsigned int ovsdb_datum_find_key_value(const struct ovsdb_datum *,
>>                                          const union ovsdb_atom *key,
>>                                          enum ovsdb_atomic_type key_type,
>> @@ -227,8 +228,7 @@ bool ovsdb_datum_excludes_all(const struct ovsdb_datum *,
>>                                const struct ovsdb_type *);
>>  void ovsdb_datum_union(struct ovsdb_datum *,
>>                         const struct ovsdb_datum *,
>> -                       const struct ovsdb_type *,
>> -                       bool replace);
>> +                       const struct ovsdb_type *);
>>  void ovsdb_datum_subtract(struct ovsdb_datum *a,
>>                            const struct ovsdb_type *a_type,
>>                            const struct ovsdb_datum *b,
>> diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c
>> index d3caccec8..d9b0993f6 100644
>> --- a/lib/ovsdb-idl.c
>> +++ b/lib/ovsdb-idl.c
>> @@ -2842,7 +2842,7 @@ ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *row,
>>                      new_datum = map_op_datum(map_op);
>>                      pos = ovsdb_datum_find_key(old_datum,
>>                                                 &new_datum->keys[0],
>> -                                               key_type);
>> +                                               key_type, NULL);
>>                      if (ovsdb_atom_equals(&new_datum->values[0],
>>                                            &old_datum->values[pos],
>>                                            value_type)) {
>> @@ -2854,7 +2854,7 @@ ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *row,
>>                      unsigned int pos;
>>                      pos = ovsdb_datum_find_key(old_datum,
>>                                                 &map_op_datum(map_op)->keys[0],
>> -                                               key_type);
>> +                                               key_type, NULL);
>>                      if (pos == UINT_MAX) {
>>                          /* No key to delete.  Move on to next update. */
>>                          VLOG_WARN("Trying to delete a key that doesn't "
>> @@ -2953,7 +2953,7 @@ ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *row,
>>                      unsigned int pos;
>>                      pos = ovsdb_datum_find_key(old_datum,
>>                                                 &set_op_datum(set_op)->keys[0],
>> -                                               key_type);
>> +                                               key_type, NULL);
>>                      if (pos == UINT_MAX) {
>>                          /* No key to delete.  Move on to next update. */
>>                          VLOG_WARN("Trying to delete a key that doesn't "
>> @@ -4131,7 +4131,7 @@ ovsdb_idl_txn_write_partial_map(const struct ovsdb_idl_row *row_,
>>      /* Find out if this is an insert or an update. */
>>      key_type = column->type.key.type;
>>      old_datum = ovsdb_idl_read(row, column);
>> -    pos = ovsdb_datum_find_key(old_datum, &datum->keys[0], key_type);
>> +    pos = ovsdb_datum_find_key(old_datum, &datum->keys[0], key_type, NULL);
>>      op_type = pos == UINT_MAX ? MAP_OP_INSERT : MAP_OP_UPDATE;
>>  
>>      ovsdb_idl_txn_add_map_op(row, column, datum, op_type);
>> diff --git a/ovsdb/mutation.c b/ovsdb/mutation.c
>> index 56edc5f00..03d1c3499 100644
>> --- a/ovsdb/mutation.c
>> +++ b/ovsdb/mutation.c
>> @@ -383,7 +383,7 @@ ovsdb_mutation_set_execute(struct ovsdb_row *row,
>>              break;
>>  
>>          case OVSDB_M_INSERT:
>> -            ovsdb_datum_union(dst, arg, dst_type, false);
>> +            ovsdb_datum_union(dst, arg, dst_type);
>>              error = ovsdb_mutation_check_count(dst, dst_type);
>>              break;
>>  
>> diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
>> index cb7c5cb76..c7a7e55e5 100644
>> --- a/vswitchd/bridge.c
>> +++ b/vswitchd/bridge.c
>> @@ -4229,7 +4229,8 @@ bridge_configure_aa(struct bridge *br)
>>          union ovsdb_atom atom;
>>  
>>          atom.integer = m->isid;
>> -        if (ovsdb_datum_find_key(mc, &atom, OVSDB_TYPE_INTEGER) == UINT_MAX) {
>> +        if (ovsdb_datum_find_key(mc, &atom,
>> +                                 OVSDB_TYPE_INTEGER, NULL) == UINT_MAX) {
>>              VLOG_INFO("Deleting isid=%"PRIu32", vlan=%"PRIu16,
>>                        m->isid, m->vlan);
>>              bridge_aa_mapping_destroy(m);
>> @@ -4826,7 +4827,8 @@ queue_ids_include(const struct ovsdb_datum *queues, int64_t target)
>>      union ovsdb_atom atom;
>>  
>>      atom.integer = target;
>> -    return ovsdb_datum_find_key(queues, &atom, OVSDB_TYPE_INTEGER) != UINT_MAX;
>> +    return ovsdb_datum_find_key(queues, &atom,
>> +                                OVSDB_TYPE_INTEGER, NULL) != UINT_MAX;
>>  }
>>  
>>  static void
>> @@ -5020,7 +5022,8 @@ bridge_configure_mirrors(struct bridge *br)
>>          union ovsdb_atom atom;
>>  
>>          atom.uuid = m->uuid;
>> -        if (ovsdb_datum_find_key(mc, &atom, OVSDB_TYPE_UUID) == UINT_MAX) {
>> +        if (ovsdb_datum_find_key(mc, &atom,
>> +                                 OVSDB_TYPE_UUID, NULL) == UINT_MAX) {
>>              mirror_destroy(m);
>>          }
>>      }
>>
>
diff mbox series

Patch

diff --git a/lib/db-ctl-base.c b/lib/db-ctl-base.c
index 77cc76a9f..f69868702 100644
--- a/lib/db-ctl-base.c
+++ b/lib/db-ctl-base.c
@@ -321,7 +321,7 @@  get_row_by_id(struct ctl_context *ctx,
             const union ovsdb_atom key_atom
                 = { .string = CONST_CAST(char *, id->key) };
             unsigned int i = ovsdb_datum_find_key(datum, &key_atom,
-                                                  OVSDB_TYPE_STRING);
+                                                  OVSDB_TYPE_STRING, NULL);
             name = i == UINT_MAX ? NULL : &datum->values[i];
         }
         if (!name) {
@@ -820,7 +820,7 @@  check_condition(const struct ovsdb_idl_table_class *table,
         }
 
         idx = ovsdb_datum_find_key(have_datum,
-                                   &want_key, column->type.key.type);
+                                   &want_key, column->type.key.type, NULL);
         if (idx == UINT_MAX && !is_set_operator(operator)) {
             retval = false;
         } else {
@@ -993,7 +993,7 @@  cmd_get(struct ctl_context *ctx)
             }
 
             idx = ovsdb_datum_find_key(datum, &key,
-                                       column->type.key.type);
+                                       column->type.key.type, NULL);
             if (idx == UINT_MAX) {
                 if (must_exist) {
                     ctl_error(
@@ -1375,7 +1375,7 @@  set_column(const struct ovsdb_idl_table_class *table,
         ovsdb_atom_destroy(&value, column->type.value.type);
 
         ovsdb_datum_union(&datum, ovsdb_idl_read(row, column),
-                          &column->type, false);
+                          &column->type);
         ovsdb_idl_txn_verify(row, column);
         ovsdb_idl_txn_write(row, column, &datum);
     } else {
@@ -1514,7 +1514,7 @@  cmd_add(struct ctl_context *ctx)
             ovsdb_datum_destroy(&old, &column->type);
             return;
         }
-        ovsdb_datum_union(&old, &add, type, false);
+        ovsdb_datum_union(&old, &add, type);
         ovsdb_datum_destroy(&add, type);
     }
     if (old.n > type->n_max) {
diff --git a/lib/ovsdb-data.c b/lib/ovsdb-data.c
index c145f5ad9..11bf95fed 100644
--- a/lib/ovsdb-data.c
+++ b/lib/ovsdb-data.c
@@ -799,7 +799,8 @@  ovsdb_atom_check_constraints(const union ovsdb_atom *atom,
                              const struct ovsdb_base_type *base)
 {
     if (base->enum_
-        && ovsdb_datum_find_key(base->enum_, atom, base->type) == UINT_MAX) {
+        && ovsdb_datum_find_key(base->enum_, atom,
+                                base->type, NULL) == UINT_MAX) {
         struct ovsdb_error *error;
         struct ds actual = DS_EMPTY_INITIALIZER;
         struct ds valid = DS_EMPTY_INITIALIZER;
@@ -1787,11 +1788,13 @@  ovsdb_datum_compare_3way(const struct ovsdb_datum *a,
 /* If 'key' is one of the keys in 'datum', returns its index within 'datum',
  * otherwise UINT_MAX.  'key.type' must be the type of the atoms stored in the
  * 'keys' array in 'datum'.
+ * If 'key' is not found, index where it should have been returned in '*pos'.
  */
 unsigned int
 ovsdb_datum_find_key(const struct ovsdb_datum *datum,
                      const union ovsdb_atom *key,
-                     enum ovsdb_atomic_type key_type)
+                     enum ovsdb_atomic_type key_type,
+                     unsigned int *pos)
 {
     unsigned int low = 0;
     unsigned int high = datum->n;
@@ -1806,6 +1809,9 @@  ovsdb_datum_find_key(const struct ovsdb_datum *datum,
             return idx;
         }
     }
+    if (pos) {
+        *pos = low;
+    }
     return UINT_MAX;
 }
 
@@ -1821,7 +1827,7 @@  ovsdb_datum_find_key_value(const struct ovsdb_datum *datum,
                            const union ovsdb_atom *value,
                            enum ovsdb_atomic_type value_type)
 {
-    unsigned int idx = ovsdb_datum_find_key(datum, key, key_type);
+    unsigned int idx = ovsdb_datum_find_key(datum, key, key_type, NULL);
     if (idx != UINT_MAX
         && value_type != OVSDB_TYPE_VOID
         && !ovsdb_atom_equals(&datum->values[idx], value, value_type)) {
@@ -1948,38 +1954,64 @@  ovsdb_datum_add_unsafe(struct ovsdb_datum *datum,
     }
 }
 
+/* Adds 'n' atoms starting from index 'start_idx' from 'src' to the end of
+ * 'dst'.  'dst' should have enough memory allocated to hold the additional
+ * 'n' atoms.  Atoms are not cloned, i.e. 'dst' will reference the same data.
+ * Caller also should take care of the result being sorted. */
+static void
+ovsdb_datum_push_unsafe(struct ovsdb_datum *dst,
+                        const struct ovsdb_datum *src,
+                        unsigned int start_idx, unsigned int n,
+                        const struct ovsdb_type *type)
+{
+    memcpy(&dst->keys[dst->n], &src->keys[start_idx], n * sizeof src->keys[0]);
+    if (type->value.type != OVSDB_TYPE_VOID) {
+        memcpy(&dst->values[dst->n], &src->values[start_idx],
+               n * sizeof src->values[0]);
+    }
+    dst->n += n;
+}
+
 void
 ovsdb_datum_union(struct ovsdb_datum *a, const struct ovsdb_datum *b,
-                  const struct ovsdb_type *type, bool replace)
+                  const struct ovsdb_type *type)
 {
-    unsigned int n;
-    size_t bi;
+    struct ovsdb_datum result;
+    unsigned int copied, pos;
+    size_t ai, bi;
 
-    n = a->n;
-    for (bi = 0; bi < b->n; bi++) {
-        unsigned int ai;
+    ovsdb_datum_init_empty(&result);
+    ovsdb_datum_reallocate(&result, type, a->n + b->n);
 
-        ai = ovsdb_datum_find_key(a, &b->keys[bi], type->key.type);
-        if (ai == UINT_MAX) {
-            if (n == a->n) {
-                ovsdb_datum_reallocate(a, type, a->n + (b->n - bi));
-            }
-            ovsdb_atom_clone(&a->keys[n], &b->keys[bi], type->key.type);
-            if (type->value.type != OVSDB_TYPE_VOID) {
-                ovsdb_atom_clone(&a->values[n], &b->values[bi],
-                                 type->value.type);
-            }
-            n++;
-        } else if (replace && type->value.type != OVSDB_TYPE_VOID) {
-            ovsdb_atom_destroy(&a->values[ai], type->value.type);
-            ovsdb_atom_clone(&a->values[ai], &b->values[bi],
+    copied = 0;
+    for (bi = 0; bi < b->n; bi++) {
+        ai = ovsdb_datum_find_key(a, &b->keys[bi], type->key.type, &pos);
+        if (ai != UINT_MAX) {
+            /* Atom with the same key already exists. */
+            continue;
+        }
+        if (pos > copied) {
+            /* Need to copy some atoms from 'a' first. */
+            ovsdb_datum_push_unsafe(&result, a, copied, pos - copied, type);
+            copied = pos;
+        }
+        /* Inserting new atom from 'b'. */
+        ovsdb_atom_clone(&result.keys[result.n], &b->keys[bi], type->key.type);
+        if (type->value.type != OVSDB_TYPE_VOID) {
+            ovsdb_atom_clone(&result.values[result.n], &b->values[bi],
                              type->value.type);
         }
+        result.n++;
     }
-    if (n != a->n) {
-        a->n = n;
-        ovs_assert(!ovsdb_datum_sort(a, type->key.type));
+    if (a->n > copied) {
+        /* Copying remaining atoms. */
+        ovsdb_datum_push_unsafe(&result, a, copied, a->n - copied, type);
     }
+    /* All atoms are copied now. */
+    a->n = 0;
+
+    ovsdb_datum_swap(&result, a);
+    ovsdb_datum_destroy(&result, type);
 }
 
 void
diff --git a/lib/ovsdb-data.h b/lib/ovsdb-data.h
index c5a80ee39..a580cd2bd 100644
--- a/lib/ovsdb-data.h
+++ b/lib/ovsdb-data.h
@@ -211,7 +211,8 @@  bool ovsdb_datum_equals(const struct ovsdb_datum *,
 /* Search. */
 unsigned int ovsdb_datum_find_key(const struct ovsdb_datum *,
                                   const union ovsdb_atom *key,
-                                  enum ovsdb_atomic_type key_type);
+                                  enum ovsdb_atomic_type key_type,
+                                  unsigned int *pos);
 unsigned int ovsdb_datum_find_key_value(const struct ovsdb_datum *,
                                         const union ovsdb_atom *key,
                                         enum ovsdb_atomic_type key_type,
@@ -227,8 +228,7 @@  bool ovsdb_datum_excludes_all(const struct ovsdb_datum *,
                               const struct ovsdb_type *);
 void ovsdb_datum_union(struct ovsdb_datum *,
                        const struct ovsdb_datum *,
-                       const struct ovsdb_type *,
-                       bool replace);
+                       const struct ovsdb_type *);
 void ovsdb_datum_subtract(struct ovsdb_datum *a,
                           const struct ovsdb_type *a_type,
                           const struct ovsdb_datum *b,
diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c
index d3caccec8..d9b0993f6 100644
--- a/lib/ovsdb-idl.c
+++ b/lib/ovsdb-idl.c
@@ -2842,7 +2842,7 @@  ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *row,
                     new_datum = map_op_datum(map_op);
                     pos = ovsdb_datum_find_key(old_datum,
                                                &new_datum->keys[0],
-                                               key_type);
+                                               key_type, NULL);
                     if (ovsdb_atom_equals(&new_datum->values[0],
                                           &old_datum->values[pos],
                                           value_type)) {
@@ -2854,7 +2854,7 @@  ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *row,
                     unsigned int pos;
                     pos = ovsdb_datum_find_key(old_datum,
                                                &map_op_datum(map_op)->keys[0],
-                                               key_type);
+                                               key_type, NULL);
                     if (pos == UINT_MAX) {
                         /* No key to delete.  Move on to next update. */
                         VLOG_WARN("Trying to delete a key that doesn't "
@@ -2953,7 +2953,7 @@  ovsdb_idl_txn_extract_mutations(struct ovsdb_idl_row *row,
                     unsigned int pos;
                     pos = ovsdb_datum_find_key(old_datum,
                                                &set_op_datum(set_op)->keys[0],
-                                               key_type);
+                                               key_type, NULL);
                     if (pos == UINT_MAX) {
                         /* No key to delete.  Move on to next update. */
                         VLOG_WARN("Trying to delete a key that doesn't "
@@ -4131,7 +4131,7 @@  ovsdb_idl_txn_write_partial_map(const struct ovsdb_idl_row *row_,
     /* Find out if this is an insert or an update. */
     key_type = column->type.key.type;
     old_datum = ovsdb_idl_read(row, column);
-    pos = ovsdb_datum_find_key(old_datum, &datum->keys[0], key_type);
+    pos = ovsdb_datum_find_key(old_datum, &datum->keys[0], key_type, NULL);
     op_type = pos == UINT_MAX ? MAP_OP_INSERT : MAP_OP_UPDATE;
 
     ovsdb_idl_txn_add_map_op(row, column, datum, op_type);
diff --git a/ovsdb/mutation.c b/ovsdb/mutation.c
index 56edc5f00..03d1c3499 100644
--- a/ovsdb/mutation.c
+++ b/ovsdb/mutation.c
@@ -383,7 +383,7 @@  ovsdb_mutation_set_execute(struct ovsdb_row *row,
             break;
 
         case OVSDB_M_INSERT:
-            ovsdb_datum_union(dst, arg, dst_type, false);
+            ovsdb_datum_union(dst, arg, dst_type);
             error = ovsdb_mutation_check_count(dst, dst_type);
             break;
 
diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
index cb7c5cb76..c7a7e55e5 100644
--- a/vswitchd/bridge.c
+++ b/vswitchd/bridge.c
@@ -4229,7 +4229,8 @@  bridge_configure_aa(struct bridge *br)
         union ovsdb_atom atom;
 
         atom.integer = m->isid;
-        if (ovsdb_datum_find_key(mc, &atom, OVSDB_TYPE_INTEGER) == UINT_MAX) {
+        if (ovsdb_datum_find_key(mc, &atom,
+                                 OVSDB_TYPE_INTEGER, NULL) == UINT_MAX) {
             VLOG_INFO("Deleting isid=%"PRIu32", vlan=%"PRIu16,
                       m->isid, m->vlan);
             bridge_aa_mapping_destroy(m);
@@ -4826,7 +4827,8 @@  queue_ids_include(const struct ovsdb_datum *queues, int64_t target)
     union ovsdb_atom atom;
 
     atom.integer = target;
-    return ovsdb_datum_find_key(queues, &atom, OVSDB_TYPE_INTEGER) != UINT_MAX;
+    return ovsdb_datum_find_key(queues, &atom,
+                                OVSDB_TYPE_INTEGER, NULL) != UINT_MAX;
 }
 
 static void
@@ -5020,7 +5022,8 @@  bridge_configure_mirrors(struct bridge *br)
         union ovsdb_atom atom;
 
         atom.uuid = m->uuid;
-        if (ovsdb_datum_find_key(mc, &atom, OVSDB_TYPE_UUID) == UINT_MAX) {
+        if (ovsdb_datum_find_key(mc, &atom,
+                                 OVSDB_TYPE_UUID, NULL) == UINT_MAX) {
             mirror_destroy(m);
         }
     }