diff mbox

[ovs-dev] ovsdb-data: Add support for integer ranges in database commands

Message ID CAKw8BiicyY1h4NpFSJ4WJVCy_cFTUNKcM-3pVjBuLPUt6SHtpg@mail.gmail.com
State Changes Requested
Headers show

Commit Message

Lukasz Rzasik Dec. 5, 2016, 8:47 p.m. UTC
Adding / removing a range of integers to a column accepting a set of
integers requires enumarating all of the integers. This patch simplifies
it by introducing 'range' concept to the database commands. Two integers
separated by a hyphen represent an inclusive range.

The patch adds positive and negative tests for the new syntax.
The patch was tested by 'make check'. Covarage was tested by
'make check-lcov'.

Signed-off-by: Lukasz Rzasik <lukasz.rzasik@gmail.com>
Suggested-by: <my_ovs_discuss@yahoo.com>
Suggested-by: Ben Pfaff <blp@ovn.org>
---
 lib/db-ctl-base.c   |  10 +--
 lib/db-ctl-base.man |   4 +-
 lib/ovsdb-data.c    | 184
++++++++++++++++++++++++++++++++++++++--------------
 lib/ovsdb-data.h    |  11 +++-
 lib/util.c          |  34 +++++++++-
 lib/util.h          |   2 +
 tests/ovs-vsctl.at  |   2 +-
 tests/ovsdb-data.at |  43 +++++++++++-
 tests/test-ovsdb.c  |  16 ++++-
 9 files changed, 240 insertions(+), 66 deletions(-)

 }

Comments

Ben Pfaff Dec. 13, 2016, 12:17 a.m. UTC | #1
On Mon, Dec 05, 2016 at 09:47:47PM +0100, Łukasz Rząsik wrote:
> Adding / removing a range of integers to a column accepting a set of
> integers requires enumarating all of the integers. This patch simplifies
> it by introducing 'range' concept to the database commands. Two integers
> separated by a hyphen represent an inclusive range.
> 
> The patch adds positive and negative tests for the new syntax.
> The patch was tested by 'make check'. Covarage was tested by
> 'make check-lcov'.
> 
> Signed-off-by: Lukasz Rzasik <lukasz.rzasik@gmail.com>
> Suggested-by: <my_ovs_discuss@yahoo.com>
> Suggested-by: Ben Pfaff <blp@ovn.org>

Thanks for contributing to Open vSwitch!

This makes it easy for a user to explode OVS memory requirements.  For
example, if I just write "1-1000000000", it will use about 16 GB of
memory in ovs-vsctl (maybe double or triple that!), transfer I guess
about the same amount of data over the wire, and then write the same
amount to disk at the database server.  Do you have an idea of how to
avoid that kind of problem?

Please use 4-space indents consistently.  I see some 2-space indents.

Please use /* */ comments consistently.  I see some // comments.

Thanks,

Ben.
Lukasz Rzasik Dec. 14, 2016, 9:39 p.m. UTC | #2
Hi Ben,

Thank you for looking at the patch.

I was not worrying about this problem because I assumed it was already
there. A user could provide a very long list of integers and this would
affect memory in similar way. There is a limit for how long the list could
get, kernel's limit on size of arguments, so in reality the list would not
get that long but this is outside of OVS control.

Of course with the change it's much easier for a user to specify something
that will explode OVS memory.

I would propose three possible solutions, starting from the easiest and
simplest:
1. Specify an arbitrary limit for size of a range. If a user specifies a
range bigger than the limit, OVS will abort the command and inform the user
about the limit. Actually most of the columns in the database already have
a limit. Quickly looking through the schema I found only one without the
limit: cfm_remote_mpids in Interface table.
2. Internally split very big ranges into smaller ones and commit each range
separately.
3. Create a new struct, e.g. ovsdb_atom_range. Instead of adding every atom
separately to datum, add the struct, send it to the database and handle it
on the database side.

What do you think about it? Maybe I'm missing something? Do you have a
solution in mind?

Thanks,

Lucas

2016-12-13 1:17 GMT+01:00 Ben Pfaff <blp@ovn.org>:

> On Mon, Dec 05, 2016 at 09:47:47PM +0100, Łukasz Rząsik wrote:
> > Adding / removing a range of integers to a column accepting a set of
> > integers requires enumarating all of the integers. This patch simplifies
> > it by introducing 'range' concept to the database commands. Two integers
> > separated by a hyphen represent an inclusive range.
> >
> > The patch adds positive and negative tests for the new syntax.
> > The patch was tested by 'make check'. Covarage was tested by
> > 'make check-lcov'.
> >
> > Signed-off-by: Lukasz Rzasik <lukasz.rzasik@gmail.com>
> > Suggested-by: <my_ovs_discuss@yahoo.com>
> > Suggested-by: Ben Pfaff <blp@ovn.org>
>
> Thanks for contributing to Open vSwitch!
>
> This makes it easy for a user to explode OVS memory requirements.  For
> example, if I just write "1-1000000000", it will use about 16 GB of
> memory in ovs-vsctl (maybe double or triple that!), transfer I guess
> about the same amount of data over the wire, and then write the same
> amount to disk at the database server.  Do you have an idea of how to
> avoid that kind of problem?
>
> Please use 4-space indents consistently.  I see some 2-space indents.
>
> Please use /* */ comments consistently.  I see some // comments.
>
> Thanks,
>
> Ben.
>
Ben Pfaff Dec. 21, 2016, 11:16 p.m. UTC | #3
On Wed, Dec 14, 2016 at 10:39:35PM +0100, Łukasz Rząsik wrote:
> I was not worrying about this problem because I assumed it was already
> there. A user could provide a very long list of integers and this would
> affect memory in similar way. There is a limit for how long the list could
> get, kernel's limit on size of arguments, so in reality the list would not
> get that long but this is outside of OVS control.
> 
> Of course with the change it's much easier for a user to specify something
> that will explode OVS memory.

That's the issue.  Of course a user could provide a long list before,
but it was obvious to him that he was doing so.  Now, it's trivial and
can be done without intent.

> I would propose three possible solutions, starting from the easiest and
> simplest:
> 1. Specify an arbitrary limit for size of a range. If a user specifies a
> range bigger than the limit, OVS will abort the command and inform the user
> about the limit. Actually most of the columns in the database already have
> a limit. Quickly looking through the schema I found only one without the
> limit: cfm_remote_mpids in Interface table.
> 2. Internally split very big ranges into smaller ones and commit each range
> separately.
> 3. Create a new struct, e.g. ovsdb_atom_range. Instead of adding every atom
> separately to datum, add the struct, send it to the database and handle it
> on the database side.
> 
> What do you think about it? Maybe I'm missing something? Do you have a
> solution in mind?

I think a simple way to start out would be to introduce an arbitrary
4096-element maximum.  That covers the common networking use case of a
set of VLANs.
diff mbox

Patch

diff --git a/lib/db-ctl-base.c b/lib/db-ctl-base.c
index 02eb328..e32e945 100644
--- a/lib/db-ctl-base.c
+++ b/lib/db-ctl-base.c
@@ -677,7 +677,7 @@  is_condition_satisfied(const struct ctl_table_class
*table,
                       column->name);
         }

-        die_if_error(ovsdb_atom_from_string(&want_key, &column->type.key,
+        die_if_error(ovsdb_atom_from_string(&want_key, NULL,
&column->type.key,
                                             key_string, symtab));

         type.key = type.value;
@@ -823,7 +823,7 @@  cmd_get(struct ctl_context *ctx)
                           column->name);
             }

-            die_if_error(ovsdb_atom_from_string(&key,
+            die_if_error(ovsdb_atom_from_string(&key, NULL,
                                                 &column->type.key,
                                                 key_string, ctx->symtab));

@@ -1118,13 +1118,13 @@  set_column(const struct ctl_table_class *table,
                       column->name);
         }

-        die_if_error(ovsdb_atom_from_string(&key, &column->type.key,
+        die_if_error(ovsdb_atom_from_string(&key, NULL, &column->type.key,
                                             key_string, symtab));
-        die_if_error(ovsdb_atom_from_string(&value, &column->type.value,
+        die_if_error(ovsdb_atom_from_string(&value, NULL,
&column->type.value,
                                             value_string, symtab));

         ovsdb_datum_init_empty(&datum);
-        ovsdb_datum_add_unsafe(&datum, &key, &value, &column->type);
+        ovsdb_datum_add_unsafe(&datum, &key, &value, &column->type, NULL);

         ovsdb_atom_destroy(&key, column->type.key.type);
         ovsdb_atom_destroy(&value, column->type.value.type);
diff --git a/lib/db-ctl-base.man b/lib/db-ctl-base.man
index 7b30501..74d406c 100644
--- a/lib/db-ctl-base.man
+++ b/lib/db-ctl-base.man
@@ -29,7 +29,9 @@  single comma.  When multiple values are present,
duplicates are not
 allowed, and order is not important.  Conversely, some database
 columns can have an empty set of values, represented as \fB[]\fR, and
 square brackets may optionally enclose other non-empty sets or single
-values as well.
+values as well. For a column accepting a set of integers, database commands
+accept a range. A range is represented by two integers separated by
+\fB-\fR. A range is inclusive.
 .PP
 A few database columns are ``maps'' of key-value pairs, where the key
 and the value are each some fixed database type.  These are specified
diff --git a/lib/ovsdb-data.c b/lib/ovsdb-data.c
index 0dda73a..f89f9ce 100644
--- a/lib/ovsdb-data.c
+++ b/lib/ovsdb-data.c
@@ -305,6 +305,25 @@  ovsdb_symbol_referenced(struct ovsdb_symbol *symbol,
     }
 }

+static union ovsdb_atom *
+alloc_default_atoms(enum ovsdb_atomic_type type, size_t n)
+{
+    if (type != OVSDB_TYPE_VOID && n) {
+        union ovsdb_atom *atoms;
+        unsigned int i;
+
+        atoms = xmalloc(n * sizeof *atoms);
+        for (i = 0; i < n; i++) {
+            ovsdb_atom_init_default(&atoms[i], type);
+        }
+        return atoms;
+    } else {
+        /* Avoid wasting memory in the n == 0 case, because xmalloc(0) is
+         * treated as xmalloc(1). */
+        return NULL;
+    }
+}
+
 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
 ovsdb_atom_parse_uuid(struct uuid *uuid, const struct json *json,
                       struct ovsdb_symbol_table *symtab,
@@ -468,6 +487,7 @@  ovsdb_atom_to_json(const union ovsdb_atom *atom, enum
ovsdb_atomic_type type)

 static char *
 ovsdb_atom_from_string__(union ovsdb_atom *atom,
+                         union ovsdb_atom **range_end_atom,
                          const struct ovsdb_base_type *base, const char *s,
                          struct ovsdb_symbol_table *symtab)
 {
@@ -478,9 +498,20 @@  ovsdb_atom_from_string__(union ovsdb_atom *atom,
         OVS_NOT_REACHED();

     case OVSDB_TYPE_INTEGER: {
-        long long int integer;
-        if (!str_to_llong(s, 10, &integer)) {
-            return xasprintf("\"%s\" is not a valid integer", s);
+        long long int integer, end;
+        if (range_end_atom
+            && str_to_llong_range(s, 10, &integer, &end)) {
+            if (!ovsdb_atom_range_check(integer, end)) {
+                return xasprintf("\"%s\" is not a valid range. "
+                    "Range start has to be less than end.", s);
+            }
+            *range_end_atom = alloc_default_atoms(type, 1);
+            if (!(*range_end_atom)) {
+                return xasprintf("\"%s\" is not a valid range", s);
+            }
+            (*range_end_atom)->integer = end;
+        } else if (!str_to_llong(s, 10, &integer)) {
+            return xasprintf("\"%s\" is not a valid integer or range", s);
         }
         atom->integer = integer;
     }
@@ -549,10 +580,13 @@  ovsdb_atom_from_string__(union ovsdb_atom *atom,
     return NULL;
 }

-/* Initializes 'atom' to a value of type 'base' parsed from 's', which
takes
- * one of the following forms:
+/* Initializes 'atom' and optionally 'range_end_atom' to a value of type
'base'
+ * parsed from 's', which takes one of the following forms:
  *
- *      - OVSDB_TYPE_INTEGER: A decimal integer optionally preceded by a
sign.
+ *      - OVSDB_TYPE_INTEGER: A decimal integer optionally preceded by a
sign
+ *        or two decimal integers optionally preceded by a sign and
separated
+ *        by a hyphen, representing inclusive range of integers
+ *        ['atom', 'range_end_atom'].
  *
  *      - OVSDB_TYPE_REAL: A floating-point number in the format accepted
by
  *        strtod().
@@ -574,23 +608,59 @@  ovsdb_atom_from_string__(union ovsdb_atom *atom,
  * Returns a null pointer if successful, otherwise an error message
describing
  * the problem.  On failure, the contents of 'atom' are indeterminate.  The
  * caller is responsible for freeing the atom or the error.
+ *
+ * Does not attempt to parse range if 'range_end_atom' is a null pointer.
+ * Dynamically allocates ovdsb_atom and stores its address in
'*range_end_atom'
+ * if successfully parses range. Caller is responsible for deallocating
+ * the memory by calling 'ovsdb_atom_destroy' and then 'free' on the
address.
+ * Does not allocate memory and sets '*range_end_atom' to a null pointer
+ * if does not parse a range or fails for any reason.
  */
 char *
 ovsdb_atom_from_string(union ovsdb_atom *atom,
+                       union ovsdb_atom **range_end_atom,
                        const struct ovsdb_base_type *base, const char *s,
                        struct ovsdb_symbol_table *symtab)
 {
     struct ovsdb_error *error;
     char *msg;

-    msg = ovsdb_atom_from_string__(atom, base, s, symtab);
+    if (range_end_atom) {
+        *range_end_atom = NULL;
+    }
+
+    msg = ovsdb_atom_from_string__(atom, range_end_atom, base, s, symtab);
     if (msg) {
         return msg;
     }

     error = ovsdb_atom_check_constraints(atom, base);
+
+    if (!error && range_end_atom && *range_end_atom) {
+      // Check range constraints
+      if (base->enum_) {
+          // If enum, every atom in range has to be checked
+          union ovsdb_atom ai = *atom;
+          ++ai.integer;
+          while (!error) {
+              error = ovsdb_atom_check_constraints(&ai, base);
+              if (ai.integer == (*range_end_atom)->integer) {
+                  break;
+              }
+              ++ai.integer;
+          }
+      } else {
+          error = ovsdb_atom_check_constraints(*range_end_atom, base);
+      }
+    }
+
     if (error) {
         ovsdb_atom_destroy(atom, base->type);
+        if (range_end_atom && *range_end_atom) {
+            ovsdb_atom_destroy(*range_end_atom, base->type);
+            free(*range_end_atom);
+            *range_end_atom = NULL;
+        }
         msg = ovsdb_error_to_string(error);
         ovsdb_error_destroy(error);
     }
@@ -811,25 +881,6 @@  ovsdb_atom_check_constraints(const union ovsdb_atom
*atom,
     }
 }

-static union ovsdb_atom *
-alloc_default_atoms(enum ovsdb_atomic_type type, size_t n)
-{
-    if (type != OVSDB_TYPE_VOID && n) {
-        union ovsdb_atom *atoms;
-        unsigned int i;
-
-        atoms = xmalloc(n * sizeof *atoms);
-        for (i = 0; i < n; i++) {
-            ovsdb_atom_init_default(&atoms[i], type);
-        }
-        return atoms;
-    } else {
-        /* Avoid wasting memory in the n == 0 case, because xmalloc(0) is
-         * treated as xmalloc(1). */
-        return NULL;
-    }
-}
-
 /* Initializes 'datum' as an empty datum.  (An empty datum can be treated
as
  * any type.) */
 void
@@ -1336,13 +1387,15 @@  skip_spaces(const char *p)

 static char *
 parse_atom_token(const char **s, const struct ovsdb_base_type *base,
-                 union ovsdb_atom *atom, struct ovsdb_symbol_table *symtab)
+                 union ovsdb_atom *atom, union ovsdb_atom **range_end_atom,
+                 struct ovsdb_symbol_table *symtab)
 {
     char *token, *error;

     error = ovsdb_token_parse(s, &token);
     if (!error) {
-        error = ovsdb_atom_from_string(atom, base, token, symtab);
+        error = ovsdb_atom_from_string(atom, range_end_atom,
+                                       base, token, symtab);
         free(token);
     }
     return error;
@@ -1351,37 +1404,50 @@  parse_atom_token(const char **s, const struct
ovsdb_base_type *base,
 static char *
 parse_key_value(const char **s, const struct ovsdb_type *type,
                 union ovsdb_atom *key, union ovsdb_atom *value,
-                struct ovsdb_symbol_table *symtab)
+                struct ovsdb_symbol_table *symtab,
+                union ovsdb_atom **range_end_key)
 {
     const char *start = *s;
     char *error;

-    error = parse_atom_token(s, &type->key, key, symtab);
+    error = parse_atom_token(s, &type->key, key, range_end_key, symtab);
+
     if (!error && type->value.type != OVSDB_TYPE_VOID) {
         *s = skip_spaces(*s);
         if (**s == '=') {
             (*s)++;
             *s = skip_spaces(*s);
-            error = parse_atom_token(s, &type->value, value, symtab);
+            error = parse_atom_token(s, &type->value, value, NULL, symtab);
         } else {
             error = xasprintf("%s: syntax error at \"%c\" expecting \"=\"",
                               start, **s);
         }
         if (error) {
             ovsdb_atom_destroy(key, type->key.type);
+            if (range_end_key && *range_end_key) {
+                ovsdb_atom_destroy(*range_end_key, type->key.type);
+                free(*range_end_key);
+                *range_end_key = NULL;
+            }
         }
     }
     return error;
 }

 static void
-free_key_value(const struct ovsdb_type *type,
-               union ovsdb_atom *key, union ovsdb_atom *value)
+free_key_value_range(const struct ovsdb_type *type,
+                     union ovsdb_atom *key, union ovsdb_atom *value,
+                     union ovsdb_atom **range_end_atom)
 {
     ovsdb_atom_destroy(key, type->key.type);
     if (type->value.type != OVSDB_TYPE_VOID) {
         ovsdb_atom_destroy(value, type->value.type);
     }
+    if (range_end_atom && *range_end_atom) {
+        ovsdb_atom_destroy(*range_end_atom, type->key.type);
+        free(*range_end_atom);
+        *range_end_atom = NULL;
+    }
 }

 /* Initializes 'datum' as a datum of the given 'type', parsing its contents
@@ -1423,6 +1489,7 @@  ovsdb_datum_from_string(struct ovsdb_datum *datum,

     while (*p && *p != end_delim) {
         union ovsdb_atom key, value;
+        union ovsdb_atom *range_end_key = NULL;

         if (ovsdb_token_is_delim(*p)) {
             char *type_str = ovsdb_type_to_english(type);
@@ -1433,12 +1500,13 @@  ovsdb_datum_from_string(struct ovsdb_datum *datum,
         }

         /* Add to datum. */
-        error = parse_key_value(&p, type, &key, &value, symtab);
+        error = parse_key_value(&p, type, &key, &value,
+                                symtab, &range_end_key);
         if (error) {
             goto error;
         }
-        ovsdb_datum_add_unsafe(datum, &key, &value, type);
-        free_key_value(type, &key, &value);
+        ovsdb_datum_add_unsafe(datum, &key, &value, type, range_end_key);
+        free_key_value_range(type, &key, &value, &range_end_key);

         /* Skip optional white space and comma. */
         p = skip_spaces(p);
@@ -1760,11 +1828,16 @@  ovsdb_datum_remove_unsafe(struct ovsdb_datum
*datum, size_t idx,
 }

 /* Adds the element with the given 'key' and 'value' to 'datum', which must
- * have the specified 'type'.
+ * have the specified 'type'. Optionally if 'range_end_atom' is not
+ * a null pointer, adds a set of integers to 'datum' from inclusive
+ * range ['key', 'range_end_atom'].
  *
  * This function always allocates memory, so it is not an efficient way to
add
  * a number of elements to a datum.
  *
+ * When adding a range of integers, this function allocates the memory once
+ * for the whole range.
+ *
  * This function does not maintain ovsdb_datum invariants.  Use
  * ovsdb_datum_sort() to check and restore these invariants.  (But a datum
with
  * 0 or 1 elements cannot violate the invariants anyhow.) */
@@ -1772,15 +1845,26 @@  void
 ovsdb_datum_add_unsafe(struct ovsdb_datum *datum,
                        const union ovsdb_atom *key,
                        const union ovsdb_atom *value,
-                       const struct ovsdb_type *type)
+                       const struct ovsdb_type *type,
+                       const union ovsdb_atom *range_end_atom)
 {
-    size_t idx = datum->n++;
+    size_t idx = datum->n;
+    datum->n += range_end_atom ?
+                (range_end_atom->integer - key->integer + 1) : 1;
     datum->keys = xrealloc(datum->keys, datum->n * sizeof *datum->keys);
-    ovsdb_atom_clone(&datum->keys[idx], key, type->key.type);
-    if (type->value.type != OVSDB_TYPE_VOID) {
-        datum->values = xrealloc(datum->values,
-                                 datum->n * sizeof *datum->values);
-        ovsdb_atom_clone(&datum->values[idx], value, type->value.type);
+    if (range_end_atom
+        && ovsdb_atom_range_check(key->integer, range_end_atom->integer)) {
+        union ovsdb_atom ai;
+        for (ai = *key; ai.integer <= range_end_atom->integer;
++ai.integer) {
+            ovsdb_atom_clone(&datum->keys[idx++], &ai, type->key.type);
+        }
+    } else {
+        ovsdb_atom_clone(&datum->keys[idx], key, type->key.type);
+        if (type->value.type != OVSDB_TYPE_VOID) {
+            datum->values = xrealloc(datum->values,
+                                     datum->n * sizeof *datum->values);
+            ovsdb_atom_clone(&datum->values[idx], value, type->value.type);
+        }
     }
 }

@@ -1937,29 +2021,31 @@  ovsdb_datum_diff(struct ovsdb_datum *diff,
                                         type->key.type);
         if (c < 0) {
             ovsdb_datum_add_unsafe(diff, &old->keys[oi], &old->values[oi],
-                                   type);
+                                   type, NULL);
             oi++;
         } else if (c > 0) {
             ovsdb_datum_add_unsafe(diff, &new->keys[ni], &new->values[ni],
-                                   type);
+                                   type, NULL);
             ni++;
         } else {
             if (type->value.type != OVSDB_TYPE_VOID &&
                 ovsdb_atom_compare_3way(&old->values[oi], &new->values[ni],
                                         type->value.type)) {
                 ovsdb_datum_add_unsafe(diff, &new->keys[ni],
&new->values[ni],
-                                       type);
+                                       type, NULL);
             }
             oi++; ni++;
         }
     }

     for (; oi < old->n; oi++) {
-        ovsdb_datum_add_unsafe(diff, &old->keys[oi], &old->values[oi],
type);
+        ovsdb_datum_add_unsafe(diff, &old->keys[oi], &old->values[oi],
+                               type, NULL);
     }

     for (; ni < new->n; ni++) {
-        ovsdb_datum_add_unsafe(diff, &new->keys[ni], &new->values[ni],
type);
+        ovsdb_datum_add_unsafe(diff, &new->keys[ni], &new->values[ni],
+                               type, NULL);
     }
 }

diff --git a/lib/ovsdb-data.h b/lib/ovsdb-data.h
index ae2672e..28fd1f8 100644
--- a/lib/ovsdb-data.h
+++ b/lib/ovsdb-data.h
@@ -89,7 +89,7 @@  struct ovsdb_error *ovsdb_atom_from_json(union ovsdb_atom
*,
 struct json *ovsdb_atom_to_json(const union ovsdb_atom *,
                                 enum ovsdb_atomic_type);

-char *ovsdb_atom_from_string(union ovsdb_atom *,
+char *ovsdb_atom_from_string(union ovsdb_atom *, union ovsdb_atom **,
                              const struct ovsdb_base_type *, const char *,
                              struct ovsdb_symbol_table *)
     OVS_WARN_UNUSED_RESULT;
@@ -234,7 +234,8 @@  void ovsdb_datum_remove_unsafe(struct ovsdb_datum *,
size_t idx,
 void ovsdb_datum_add_unsafe(struct ovsdb_datum *,
                             const union ovsdb_atom *key,
                             const union ovsdb_atom *value,
-                            const struct ovsdb_type *);
+                            const struct ovsdb_type *,
+                            const union ovsdb_atom *range_end_atom);

 /* Type checking. */
 static inline bool
@@ -275,4 +276,10 @@  struct ovsdb_symbol *ovsdb_symbol_table_insert(struct
ovsdb_symbol_table *,
 char *ovsdb_token_parse(const char **, char **outp) OVS_WARN_UNUSED_RESULT;
 bool ovsdb_token_is_delim(unsigned char);

+static inline bool
+ovsdb_atom_range_check(long long begin, long long end)
+{
+    return begin < end ? true : false;
+}
+
 #endif /* ovsdb-data.h */
diff --git a/lib/util.c b/lib/util.c
index 4208aa8..4d10d70 100644
--- a/lib/util.c
+++ b/lib/util.c
@@ -640,11 +640,22 @@  str_to_long(const char *s, int base, long *li)
 bool
 str_to_llong(const char *s, int base, long long *x)
 {
-    int save_errno = errno;
     char *tail;
+    bool ok = str_to_llong_with_tail(s, &tail, base, x);
+    if (*tail != '\0') {
+        *x = 0;
+        return false;
+    }
+    return ok;
+}
+
+bool
+str_to_llong_with_tail(const char *s, char **tail, int base, long long *x)
+{
+    int save_errno = errno;
     errno = 0;
-    *x = strtoll(s, &tail, base);
-    if (errno == EINVAL || errno == ERANGE || tail == s || *tail != '\0') {
+    *x = strtoll(s, tail, base);
+    if (errno == EINVAL || errno == ERANGE || *tail == s) {
         errno = save_errno;
         *x = 0;
         return false;
@@ -668,6 +679,23 @@  str_to_uint(const char *s, int base, unsigned int *u)
     }
 }

+bool str_to_llong_range(const char *s, int base, long long *begin,
+                        long long *end)
+{
+    char *tail;
+    if (!str_to_llong_with_tail(s, &tail, base, begin) || *tail != '-') {
+        *begin = 0;
+        *end = 0;
+        return false;
+    }
+    if (!str_to_llong(++tail, base, end)) {
+        *begin = 0;
+        *end = 0;
+        return false;
+    }
+    return true;
+}
+
 /* Converts floating-point string 's' into a double.  If successful, stores
  * the double in '*d' and returns true; on failure, stores 0 in '*d' and
  * returns false.
diff --git a/lib/util.h b/lib/util.h
index 5ad25cb..aa38122 100644
--- a/lib/util.h
+++ b/lib/util.h
@@ -153,7 +153,9 @@  void ovs_hex_dump(FILE *, const void *, size_t,
uintptr_t offset, bool ascii);
 bool str_to_int(const char *, int base, int *);
 bool str_to_long(const char *, int base, long *);
 bool str_to_llong(const char *, int base, long long *);
+bool str_to_llong_with_tail(const char *, char **, int base, long long *);
 bool str_to_uint(const char *, int base, unsigned int *);
+bool str_to_llong_range(const char *, int base, long long *, long long *);

 bool ovs_scan(const char *s, const char *format, ...) OVS_SCANF_FORMAT(2,
3);
 bool ovs_scan_len(const char *s, int *n, const char *format, ...);
diff --git a/tests/ovs-vsctl.at b/tests/ovs-vsctl.at
index 9737589..1db8d7d 100644
--- a/tests/ovs-vsctl.at
+++ b/tests/ovs-vsctl.at
@@ -860,7 +860,7 @@  AT_CHECK([RUN_OVS_VSCTL([remove bridge br0 name br1])],
   [1], [], [ovs-vsctl: cannot modify read-only column name in table Bridge
 ], [OVS_VSCTL_CLEANUP])
 AT_CHECK([RUN_OVS_VSCTL([remove bridge br1 flood-vlans true])],
-  [1], [], [ovs-vsctl: "true" is not a valid integer
+  [1], [], [ovs-vsctl: "true" is not a valid integer or range
 ], [OVS_VSCTL_CLEANUP])
 AT_CHECK([RUN_OVS_VSCTL([clear bridge br1 name])],
   [1], [], [ovs-vsctl: cannot modify read-only column name in table Bridge
diff --git a/tests/ovsdb-data.at b/tests/ovsdb-data.at
index a8c092f..df80359 100644
--- a/tests/ovsdb-data.at
+++ b/tests/ovsdb-data.at
@@ -92,12 +92,20 @@  OVSDB_CHECK_POSITIVE([integer atom from string],
     '-1' \
     '+1000' \
     '9223372036854775807' \
-    '-9223372036854775808' ]],
+    '-9223372036854775808' \
+    '0-1000' \
+    '-1000-+1000' \
+    '-1000--10' \
+    '+10-+1000']],
   [0
 -1
 1000
 9223372036854775807
--9223372036854775808])
+-9223372036854775808
+0-1000
+-1000-1000
+-1000--10
+10-1000])

 OVSDB_CHECK_POSITIVE_CPY([real atom from JSON],
   [[parse-atoms '["real"]' \
@@ -258,6 +266,10 @@  OVSDB_CHECK_NEGATIVE([real not acceptable integer
string atom],
   [[parse-atom-strings '["integer"]' '0.5' ]],
   ["0.5" is not a valid integer])

+OVSDB_CHECK_NEGATIVE([inverted range is not acceptable integer string
atom],
+  [[parse-atom-strings '["integer"]' '10--10' ]],
+  ["10--10" is not a valid range. Range start has to be less than end.])
+
 OVSDB_CHECK_POSITIVE_CPY([string "true" not acceptable boolean JSON atom],
   [[parse-atoms '["boolean"]' '["true"]' ]],
   [syntax ""true"": syntax error: expected boolean])
@@ -323,6 +335,27 @@  constraint violation: 9 is not one of the allowed
values ([1, 6, 8, 10])
 10
 constraint violation: 11 is not one of the allowed values ([1, 6, 8,
10])]])

+OVSDB_CHECK_POSITIVE([integer atom enum from string],
+  [[parse-atom-strings '[{"type": "integer", "enum": ["set", [1, 6, 8, 10,
20, 21, 22, 23, 24, 25]]}]' \
+    '1' \
+    '6' \
+    '8' \
+    '10' \
+    '20-25']],
+  [[1
+6
+8
+10
+20-25]])
+
+OVSDB_CHECK_NEGATIVE([integer not in enum set from string],
+  [[parse-atom-strings '[{"type": "integer", "enum": ["set", [1, 6, 8,
10]]}]' '0' ]],
+  [[constraint violation: 0 is not one of the allowed values ([1, 6, 8,
10])]])
+
+OVSDB_CHECK_NEGATIVE([integer range not in enum set from string],
+  [[parse-atom-strings '[{"type": "integer", "enum": ["set", [1, 6, 8,
10]]}]' '8-10' ]],
+  [[constraint violation: 9 is not one of the allowed values ([1, 6, 8,
10])]])
+
 OVSDB_CHECK_POSITIVE_CPY([real atom enum],
   [[parse-atoms '[{"type": "real", "enum": ["set", [-1.5, 1.5]]}]' \
     '[-2]' \
@@ -590,12 +623,16 @@  OVSDB_CHECK_POSITIVE([string set of 0 or more
integers],
     '0, 1, 2' \
     '[0, 1,2, 3, 4, 5]' \
     '0, 1,2, 3,4, 5, 6, 7, 8' \
-    '[0, 1, 2, 3, 4,5, 6,7, 8, 9, 10]']],
+    '[0, 1, 2, 3, 4,5, 6,7, 8, 9, 10]' \
+    '0-8' \
+    '[0-10']]],
   [[[0]
 [0, 1]
 [0, 1, 2]
 [0, 1, 2, 3, 4, 5]
 [0, 1, 2, 3, 4, 5, 6, 7, 8]
+[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+[0, 1, 2, 3, 4, 5, 6, 7, 8]
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]])

 OVSDB_CHECK_POSITIVE_CPY([JSON set of 1 to 3 uuids],
diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c
index 6a7d467..cb332b7 100644
--- a/tests/test-ovsdb.c
+++ b/tests/test-ovsdb.c
@@ -554,17 +554,29 @@  do_parse_atom_strings(struct ovs_cmdl_context *ctx)
     json_destroy(json);

     for (i = 2; i < ctx->argc; i++) {
-        union ovsdb_atom atom;
+        union ovsdb_atom atom, *range_end_atom = NULL;
         struct ds out;

-        die_if_error(ovsdb_atom_from_string(&atom, &base, ctx->argv[i],
NULL));
+        die_if_error(ovsdb_atom_from_string(&atom, &range_end_atom, &base,
+                                            ctx->argv[i], NULL));

         ds_init(&out);
         ovsdb_atom_to_string(&atom, base.type, &out);
+        if (range_end_atom) {
+            struct ds range_end_ds;
+            ds_init(&range_end_ds);
+            ovsdb_atom_to_string(range_end_atom, base.type, &range_end_ds);
+            ds_put_char(&out, '-');
+            ds_put_cstr(&out, ds_cstr(&range_end_ds));;
+            ds_destroy(&range_end_ds);
+        }
         puts(ds_cstr(&out));
         ds_destroy(&out);

         ovsdb_atom_destroy(&atom, base.type);
+        if (range_end_atom) {
+            ovsdb_atom_destroy(range_end_atom, base.type);
+        }
     }
     ovsdb_base_type_destroy(&base);