[ovs-dev,RFC,v3,4/6] ovsdb: add support for role-based access controls

Submitted by Lance Richardson on April 19, 2017, 7:07 p.m.

Details

Message ID 20170419190700.4278-1-lrichard@redhat.com
State Superseded
Headers show

Commit Message

Lance Richardson April 19, 2017, 7:07 p.m.
Add suport for ovsdb RBAC (role-based access control). This includes:

   - Support for "RBAC_Role" table, db containing a table
     by this name will enable role-based access controls and
     use this table for RBAC role configuration.

     This table has one row per role, with each row having a
     "name" column (role name) and a "permissions" column (map of
     table name to UUID of row in separate permission table.) The
     permission table has one row per access control configuration,
     with columns:
          "name" - name of table to which this row applies
          "authorization" - set of column names and column:key pairs
                            to be compared against client ID to
                            determine authorization status
          "insert_delete" - boolean, true if insertions and
                            authorized deletions are allowed.
          "update"        - Set of columns and column:key pairs for
                            which authorized updates are allowed.
   - Support for a new "role" column in the remote configuration
     table.
   - Logic for applying the RBAC role and permission tables, in
     combination with session role and client id, to determine
     whether operations modifying database contents should be
     permitted.

Signed-off-by: Lance Richardson <lrichard@redhat.com>
---
v2:
 - Added ovsdb_perm_error() to format permission error strings.
 - Re-implemented RBAC enforcing code to apply RBAC rules
   to entire delete/update/mutate operation before applying
   any changes to avoid partial updates.
 - Implemented basic unit tests for RBAC enforcement.

v3: Eliminated "--rbac=" command-line option to ovsdb-server. Now
    the table named "RBAC_Role" (if present) will be used as the RBAC
    role table.

 lib/jsonrpc.c          |  10 ++
 lib/jsonrpc.h          |   2 +
 lib/ovsdb-error.c      |  13 ++
 lib/ovsdb-error.h      |   4 +
 ovsdb/automake.mk      |   2 +
 ovsdb/execution.c      |  43 ++++-
 ovsdb/jsonrpc-server.c |   6 +-
 ovsdb/jsonrpc-server.h |   1 +
 ovsdb/ovsdb-server.c   |   7 +-
 ovsdb/ovsdb-tool.c     |   2 +-
 ovsdb/ovsdb-util.c     |  38 +++++
 ovsdb/ovsdb-util.h     |   4 +
 ovsdb/ovsdb.c          |   3 +
 ovsdb/ovsdb.h          |   3 +
 ovsdb/rbac.c           | 443 +++++++++++++++++++++++++++++++++++++++++++++++++
 ovsdb/rbac.h           |  38 +++++
 ovsdb/trigger.c        |   8 +-
 ovsdb/trigger.h        |   5 +-
 tests/automake.mk      |   1 +
 tests/ovsdb-rbac.at    | 252 ++++++++++++++++++++++++++++
 tests/ovsdb.at         |   1 +
 tests/test-ovsdb.c     |   5 +-
 22 files changed, 881 insertions(+), 10 deletions(-)
 create mode 100644 ovsdb/rbac.c
 create mode 100644 ovsdb/rbac.h
 create mode 100644 tests/ovsdb-rbac.at

Comments

Lance Richardson April 19, 2017, 9:38 p.m.
----- Original Message -----
> From: "Lance Richardson" <lrichard@redhat.com>
>  
> +struct ovsdb_error *ovsdb_perm_error(const char *details, ...)
> +{
> +    struct ovsdb_error *error;
> +    va_list args;
> +
> +    va_start(args, details);
> +    error = ovsdb_error_valist("permission error", details, args);
> +    va_end(args);
> +
> +    return error;
> +}
> +
> +

I had been assuming that these errors would be logged by the client, but
this doesn't seem to be the case (other than in ovsdb-client, anyway).

At the moment, there are no logs produced when a transaction by ovn-controller
is denied due to RBAC checks. I'm wondering if this is something that should
be done by the idl/jsonrpc infrastructure or informational logs in ovsdb-server.
Ideally (I think) the client should be logging these, but it's not clear whether
there's any way currently for e.g. the ovn-controller idl loop to report
transaction errors.

Any suggestions for how to handle reporting RBAC failures or pointers to
existing examples where transaction failures are reported would be appreciated.

Thanks,

    Lance
Ben Pfaff April 24, 2017, 6:02 p.m.
On Wed, Apr 19, 2017 at 05:38:50PM -0400, Lance Richardson wrote:
> ----- Original Message -----
> > From: "Lance Richardson" <lrichard@redhat.com>
> >  
> > +struct ovsdb_error *ovsdb_perm_error(const char *details, ...)
> > +{
> > +    struct ovsdb_error *error;
> > +    va_list args;
> > +
> > +    va_start(args, details);
> > +    error = ovsdb_error_valist("permission error", details, args);
> > +    va_end(args);
> > +
> > +    return error;
> > +}
> > +
> > +
> 
> I had been assuming that these errors would be logged by the client, but
> this doesn't seem to be the case (other than in ovsdb-client, anyway).
> 
> At the moment, there are no logs produced when a transaction by ovn-controller
> is denied due to RBAC checks. I'm wondering if this is something that should
> be done by the idl/jsonrpc infrastructure or informational logs in ovsdb-server.
> Ideally (I think) the client should be logging these, but it's not clear whether
> there's any way currently for e.g. the ovn-controller idl loop to report
> transaction errors.
> 
> Any suggestions for how to handle reporting RBAC failures or pointers to
> existing examples where transaction failures are reported would be appreciated.

The ovsdb-idl client should probably log JSON-RPC errors in response to
transactions.  Until now, they've been pretty rare.  But, I would have
it log errors that it thinks it understands only at DBG level, if at
all.  I think that other errors, that it doesn't think it understands,
are already passed to ovsdb_idl_txn_set_error_json(), so probably that
would be a good point at which to log them.

Patch hide | download patch | download mbox

diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index a0ade9c..2fae057 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -1005,6 +1005,16 @@  jsonrpc_session_get_name(const struct jsonrpc_session *s)
     return reconnect_get_name(s->reconnect);
 }
 
+const char *
+jsonrpc_session_get_id(const struct jsonrpc_session *s)
+{
+    if (s->rpc && s->rpc->stream) {
+        return stream_get_peer_id(s->rpc->stream);
+    } else {
+        return NULL;
+    }
+}
+
 /* Always takes ownership of 'msg', regardless of success. */
 int
 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
index 982017a..6a82954 100644
--- a/lib/jsonrpc.h
+++ b/lib/jsonrpc.h
@@ -130,5 +130,7 @@  void jsonrpc_session_set_probe_interval(struct jsonrpc_session *,
                                         int probe_interval);
 void jsonrpc_session_set_dscp(struct jsonrpc_session *,
                               uint8_t dscp);
+const char *jsonrpc_session_get_id(const struct jsonrpc_session *);
+
 
 #endif /* jsonrpc.h */
diff --git a/lib/ovsdb-error.c b/lib/ovsdb-error.c
index dfa4249..6331668 100644
--- a/lib/ovsdb-error.c
+++ b/lib/ovsdb-error.c
@@ -167,6 +167,19 @@  ovsdb_internal_error(struct ovsdb_error *inner_error,
     return error;
 }
 
+struct ovsdb_error *ovsdb_perm_error(const char *details, ...)
+{
+    struct ovsdb_error *error;
+    va_list args;
+
+    va_start(args, details);
+    error = ovsdb_error_valist("permission error", details, args);
+    va_end(args);
+
+    return error;
+}
+
+
 void
 ovsdb_error_destroy(struct ovsdb_error *error)
 {
diff --git a/lib/ovsdb-error.h b/lib/ovsdb-error.h
index 2bc259a..da91b74 100644
--- a/lib/ovsdb-error.h
+++ b/lib/ovsdb-error.h
@@ -41,6 +41,10 @@  struct ovsdb_error *ovsdb_internal_error(struct ovsdb_error *error,
     OVS_PRINTF_FORMAT(4, 5)
     OVS_WARN_UNUSED_RESULT;
 
+struct ovsdb_error *ovsdb_perm_error(const char *details, ...)
+    OVS_PRINTF_FORMAT(1, 2)
+    OVS_WARN_UNUSED_RESULT;
+
 /* Returns a pointer to an ovsdb_error that represents an internal error for
  * the current file name and line number with MSG as the associated message.
  * The caller is responsible for freeing the internal error. */
diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk
index c218bf5..ac0f741 100644
--- a/ovsdb/automake.mk
+++ b/ovsdb/automake.mk
@@ -24,6 +24,8 @@  ovsdb_libovsdb_la_SOURCES = \
 	ovsdb/monitor.h \
 	ovsdb/query.c \
 	ovsdb/query.h \
+	ovsdb/rbac.c \
+	ovsdb/rbac.h \
 	ovsdb/replication.c \
 	ovsdb/replication.h \
 	ovsdb/row.c \
diff --git a/ovsdb/execution.c b/ovsdb/execution.c
index e2d320e..9f27457 100644
--- a/ovsdb/execution.c
+++ b/ovsdb/execution.c
@@ -32,6 +32,7 @@ 
 #include "table.h"
 #include "timeval.h"
 #include "transaction.h"
+#include "rbac.h"
 
 struct ovsdb_execution {
     struct ovsdb *db;
@@ -39,6 +40,8 @@  struct ovsdb_execution {
     struct ovsdb_txn *txn;
     struct ovsdb_symbol_table *symtab;
     bool durable;
+    const char *role;
+    const char *id;
 
     /* Triggers. */
     long long int elapsed_msec;
@@ -97,6 +100,7 @@  lookup_executor(const char *name, bool *read_only)
 struct json *
 ovsdb_execute(struct ovsdb *db, const struct ovsdb_session *session,
               const struct json *params, bool read_only,
+              const char *role, const char *id,
               long long int elapsed_msec, long long int *timeout_msec)
 {
     struct ovsdb_execution x;
@@ -126,6 +130,8 @@  ovsdb_execute(struct ovsdb *db, const struct ovsdb_session *session,
     x.txn = ovsdb_txn_create(db);
     x.symtab = ovsdb_symbol_table_create();
     x.durable = false;
+    x.role = role;
+    x.id = id;
     x.elapsed_msec = elapsed_msec;
     x.timeout_msec = LLONG_MAX;
     results = NULL;
@@ -305,6 +311,12 @@  ovsdb_execute_insert(struct ovsdb_execution *x, struct ovsdb_parser *parser,
         return error;
     }
 
+    if (!ovsdb_rbac_insert(x->db, table, x->role, x->id)) {
+        return ovsdb_perm_error("RBAC rules for client \"%s\" role \"%s\" "
+                                "prohibit row insertion into table \"%s\".",
+                                x->id, x->role, table->schema->name);
+    }
+
     if (uuid_name) {
         struct ovsdb_symbol *symbol;
 
@@ -410,6 +422,8 @@  struct update_row_cbdata {
     struct ovsdb_txn *txn;
     const struct ovsdb_row *row;
     const struct ovsdb_column_set *columns;
+    const char *role;
+    const char *id;
 };
 
 static bool
@@ -470,7 +484,15 @@  ovsdb_execute_update(struct ovsdb_execution *x, struct ovsdb_parser *parser,
         ur.txn = x->txn;
         ur.row = row;
         ur.columns = &columns;
-        ovsdb_query(table, &condition, update_row_cb, &ur);
+        if (ovsdb_rbac_update(x->db, table, &columns, &condition, x->role,
+                              x->id)) {
+            ovsdb_query(table, &condition, update_row_cb, &ur);
+        } else {
+            error = ovsdb_perm_error("RBAC rules for client \"%s\" role "
+                                     "\"%s\" prohibit modification of "
+                                     "table \"%s\".",
+                                     x->id, x->role, table->schema->name);
+        }
         json_object_put(result, "count", json_integer_create(ur.n_matches));
     }
 
@@ -529,7 +551,15 @@  ovsdb_execute_mutate(struct ovsdb_execution *x, struct ovsdb_parser *parser,
         mr.txn = x->txn;
         mr.mutations = &mutations;
         mr.error = &error;
-        ovsdb_query(table, &condition, mutate_row_cb, &mr);
+        if (ovsdb_rbac_mutate(x->db, table, &mutations, &condition, x->role,
+                              x->id)) {
+            ovsdb_query(table, &condition, mutate_row_cb, &mr);
+        } else {
+            error = ovsdb_perm_error("RBAC rules for client \"%s\" role "
+                                     "\"%s\" prohibit mutate operation on "
+                                     "table \"%s\".",
+                                     x->id, x->role, table->schema->name);
+        }
         json_object_put(result, "count", json_integer_create(mr.n_matches));
     }
 
@@ -579,8 +609,15 @@  ovsdb_execute_delete(struct ovsdb_execution *x, struct ovsdb_parser *parser,
         dr.n_matches = 0;
         dr.table = table;
         dr.txn = x->txn;
-        ovsdb_query(table, &condition, delete_row_cb, &dr);
 
+        if (ovsdb_rbac_delete(x->db, table, &condition, x->role, x->id)) {
+            ovsdb_query(table, &condition, delete_row_cb, &dr);
+        } else {
+            error = ovsdb_perm_error("RBAC rules for client \"%s\" role "
+                                     "\"%s\" prohibit row deletion from "
+                                     "table \"%s\".",
+                                     x->id, x->role, table->schema->name);
+        }
         json_object_put(result, "count", json_integer_create(dr.n_matches));
     }
 
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 1ba6bb3..6edfe24 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -128,6 +128,7 @@  struct ovsdb_jsonrpc_remote {
     struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
     uint8_t dscp;
     bool read_only;
+    char *role;
 };
 
 static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
@@ -270,6 +271,7 @@  ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
     ovs_list_init(&remote->sessions);
     remote->dscp = options->dscp;
     remote->read_only = options->read_only;
+    remote->role = options->role ? xstrdup(options->role) : NULL;
     shash_add(&svr->remotes, name, remote);
 
     if (!listener) {
@@ -287,6 +289,7 @@  ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
     ovsdb_jsonrpc_session_close_all(remote);
     pstream_close(remote->listener);
     shash_delete(&remote->server->remotes, node);
+    free(remote->role);
     free(remote);
 }
 
@@ -1038,7 +1041,8 @@  ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
     /* Insert into trigger table. */
     t = xmalloc(sizeof *t);
     ovsdb_trigger_init(&s->up, db, &t->trigger, params, time_msec(),
-                       s->read_only);
+                       s->read_only, s->remote->role,
+                       jsonrpc_session_get_id(s->js));
     t->id = id;
     hmap_insert(&s->triggers, &t->hmap_node, hash);
 
diff --git a/ovsdb/jsonrpc-server.h b/ovsdb/jsonrpc-server.h
index 3cacbb6..1add327 100644
--- a/ovsdb/jsonrpc-server.h
+++ b/ovsdb/jsonrpc-server.h
@@ -37,6 +37,7 @@  struct ovsdb_jsonrpc_options {
     int probe_interval;         /* Max idle time before probing, in msec. */
     bool read_only;             /* Only read-only transactions are allowed. */
     int dscp;                   /* Dscp value for manager connections */
+    char *role;                 /* Role, for role-based access controls */
 };
 struct ovsdb_jsonrpc_options *
 ovsdb_jsonrpc_default_options(const char *target);
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index 50c3555..32ee96c 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -683,7 +683,7 @@  add_manager_options(struct shash *remotes, const struct ovsdb_row *row)
     struct ovsdb_jsonrpc_options *options;
     long long int max_backoff, probe_interval;
     bool read_only;
-    const char *target, *dscp_string;
+    const char *target, *dscp_string, *role;
 
     if (!ovsdb_util_read_string_column(row, "target", &target) || !target) {
         VLOG_INFO_RL(&rl, "Table `%s' has missing or invalid `target' column",
@@ -702,6 +702,10 @@  add_manager_options(struct shash *remotes, const struct ovsdb_row *row)
     if (ovsdb_util_read_bool_column(row, "read_only", &read_only)) {
         options->read_only = read_only;
     }
+    free(options->role);
+    if (ovsdb_util_read_string_column(row, "role", &role) && role) {
+        options->role = xstrdup(role);
+    }
 
     options->dscp = DSCP_DEFAULT;
     dscp_string = ovsdb_util_read_map_string_column(row, "other_config",
@@ -1350,6 +1354,7 @@  parse_options(int *argcp, char **argvp[],
         OPT_SYNC_FROM,
         OPT_SYNC_EXCLUDE,
         OPT_ACTIVE,
+        OPT_RBAC,
         VLOG_OPTION_ENUMS,
         DAEMON_OPTION_ENUMS,
         SSL_OPTION_ENUMS,
diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c
index 8d7e76a..fb8eb1e 100644
--- a/ovsdb/ovsdb-tool.c
+++ b/ovsdb/ovsdb-tool.c
@@ -366,7 +366,7 @@  transact(bool read_only, int argc, char *argv[])
     check_ovsdb_error(ovsdb_file_open(db_file_name, read_only, &db, NULL));
 
     request = parse_json(transaction);
-    result = ovsdb_execute(db, NULL, request, false, 0, NULL);
+    result = ovsdb_execute(db, NULL, request, false, NULL, NULL, 0, NULL);
     json_destroy(request);
 
     print_and_free_json(result);
diff --git a/ovsdb/ovsdb-util.c b/ovsdb/ovsdb-util.c
index 647f5df..c26812b 100644
--- a/ovsdb/ovsdb-util.c
+++ b/ovsdb/ovsdb-util.c
@@ -88,6 +88,44 @@  ovsdb_util_read_map_string_column(const struct ovsdb_row *row,
     return atom_value ? atom_value->string : NULL;
 }
 
+/* Read string-uuid key-values from a map.  Returns the row associated with
+ * 'key', if found, or NULL */
+const struct ovsdb_row *
+ovsdb_util_read_map_string_uuid_column(const struct ovsdb_row *row,
+                                       const char *column_name,
+                                       const char *key)
+{
+    union ovsdb_atom *atom_key = NULL, *atom_value = NULL;
+    const struct ovsdb_table *ref_table;
+    const struct ovsdb_column *column;
+    const struct ovsdb_datum *datum;
+    size_t i;
+
+    column = ovsdb_table_schema_get_column(row->table->schema, column_name);
+    if (!column ||
+        column->type.key.type != OVSDB_TYPE_STRING ||
+        column->type.value.type != OVSDB_TYPE_UUID) {
+        return NULL;
+    }
+
+    datum = &row->fields[column->index];
+
+    for (i = 0; i < datum->n; i++) {
+        atom_key = &datum->keys[i];
+        if (!strcmp(atom_key->string, key)) {
+            atom_value = &datum->values[i];
+            break;
+        }
+    }
+
+    if (!atom_value) {
+        return NULL;
+    }
+    ref_table = column->type.value.u.uuid.refTable;
+    return ovsdb_table_get_row(ref_table, &atom_value->uuid);
+}
+
+
 const union ovsdb_atom *
 ovsdb_util_read_column(const struct ovsdb_row *row, const char *column_name,
                        enum ovsdb_atomic_type type)
diff --git a/ovsdb/ovsdb-util.h b/ovsdb/ovsdb-util.h
index effbec8..abd81ff 100644
--- a/ovsdb/ovsdb-util.h
+++ b/ovsdb/ovsdb-util.h
@@ -25,6 +25,10 @@  struct ovsdb_datum *ovsdb_util_get_datum(struct ovsdb_row *row,
 const char *ovsdb_util_read_map_string_column(const struct ovsdb_row *row,
                                               const char *column_name,
                                               const char *key);
+const struct ovsdb_row *ovsdb_util_read_map_string_uuid_column(
+                                                    const struct ovsdb_row *r,
+                                                    const char *column_name,
+                                                    const char *key);
 const union ovsdb_atom *ovsdb_util_read_column(const struct ovsdb_row *row,
                                                const char *column_name,
                                                enum ovsdb_atomic_type type);
diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
index 0391900..d8f441a 100644
--- a/ovsdb/ovsdb.c
+++ b/ovsdb/ovsdb.c
@@ -351,6 +351,9 @@  ovsdb_create(struct ovsdb_schema *schema)
         }
     }
 
+    /* Use RBAC roles table if present. */
+    db->rbac_role = ovsdb_get_table(db, "RBAC_Role");
+
     return db;
 }
 
diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
index fc45c80..89bbfa2 100644
--- a/ovsdb/ovsdb.h
+++ b/ovsdb/ovsdb.h
@@ -62,6 +62,8 @@  struct ovsdb {
     /* Triggers. */
     struct ovs_list triggers;   /* Contains "struct ovsdb_trigger"s. */
     bool run_triggers;
+
+    struct ovsdb_table *rbac_role;
 };
 
 struct ovsdb *ovsdb_create(struct ovsdb_schema *);
@@ -73,6 +75,7 @@  struct ovsdb_table *ovsdb_get_table(const struct ovsdb *, const char *);
 
 struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *,
                            const struct json *params, bool read_only,
+                           const char *role, const char *id,
                            long long int elapsed_msec,
                            long long int *timeout_msec);
 
diff --git a/ovsdb/rbac.c b/ovsdb/rbac.c
new file mode 100644
index 0000000..9ce75c8
--- /dev/null
+++ b/ovsdb/rbac.c
@@ -0,0 +1,443 @@ 
+/*
+ * Copyright (c) 2017 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <config.h>
+#include <limits.h>
+
+#include "column.h"
+#include "condition.h"
+#include "file.h"
+#include "mutation.h"
+#include "ovsdb-data.h"
+#include "ovsdb-error.h"
+#include "ovsdb-parser.h"
+#include "ovsdb.h"
+#include "query.h"
+#include "row.h"
+#include "server.h"
+#include "table.h"
+#include "timeval.h"
+#include "transaction.h"
+#include "ovsdb-util.h"
+#include "condition.h"
+#include "rbac.h"
+#include "openvswitch/vlog.h"
+
+VLOG_DEFINE_THIS_MODULE(ovsdb_rbac);
+
+static const struct ovsdb_row *
+ovsdb_find_row_by_string_key(const struct ovsdb_table *table,
+                             const char *column_name,
+                             const char *key)
+{
+    const struct ovsdb_column *column;
+    const struct ovsdb_row *row;
+
+    column = ovsdb_table_schema_get_column(table->schema, column_name);
+
+    if (column) {
+        HMAP_FOR_EACH (row, hmap_node, &table->rows) {
+            const struct ovsdb_datum *datum;
+            size_t i;
+
+            datum = &row->fields[column->index];
+            for (i = 0; i < datum->n; i++) {
+                if (datum->keys[i].string[0] &&
+                    !strcmp(key, datum->keys[i].string)) {
+                    return row;
+                }
+            }
+        }
+    }
+    return NULL;
+}
+
+static const struct ovsdb_row *
+ovsdb_rbac_lookup_perms(const struct ovsdb *db, const char *role,
+                        const char *table)
+{
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+    const struct ovsdb_row *role_row, *perm_row;
+    const struct ovsdb_column *column;
+
+    /* Lookup role in roles table */
+    role_row = ovsdb_find_row_by_string_key(db->rbac_role, "name", role);
+    if (!role_row) {
+        VLOG_INFO_RL(&rl, "rbac: role \"%s\" not found in rbac roles table",
+                     role);
+        return NULL;
+    }
+
+    /* Find row in permissions column for table from "permissions" column */
+    column = ovsdb_table_schema_get_column(role_row->table->schema,
+                                           "permissions");
+    if (!column) {
+        VLOG_INFO_RL(&rl, "rbac: \"permissions\" column not present in rbac "
+                  "roles table");
+        return NULL;
+    }
+    perm_row = ovsdb_util_read_map_string_uuid_column(role_row, "permissions",
+                                                      table);
+
+    return perm_row;
+}
+
+static bool
+ovsdb_rbac_authorized(const struct ovsdb_row *perms,
+                      const char *id,
+                      const struct ovsdb_row *row)
+{
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+    const struct ovsdb_datum *datum;
+    size_t i;
+
+    datum = ovsdb_util_get_datum(CONST_CAST(struct ovsdb_row *, perms),
+                                 "authorization",
+                                 OVSDB_TYPE_STRING, OVSDB_TYPE_VOID, UINT_MAX);
+
+    if (!datum) {
+        VLOG_INFO_RL(&rl, "rbac: error reading authorization column");
+        return false;
+    }
+
+    for (i = 0; i < datum->n; i++) {
+        char *name = datum->keys[i].string;
+        const char *value = NULL;
+        bool is_map;
+
+        if (name[0] == '\0') {
+            /* empty string means all are authorized */
+            return true;
+        }
+
+        is_map = strchr(name, ':') != NULL;
+
+        if (is_map) {
+            char *tmp = xstrdup(name);
+            char *col_name, *key, *save_ptr = NULL;
+            col_name = strtok_r(name, ":", &save_ptr);
+            key = strtok_r(NULL, ":", &save_ptr);
+
+            if (col_name && key) {
+                value = ovsdb_util_read_map_string_column(row, col_name, key);
+            }
+            free(tmp);
+        } else {
+            ovsdb_util_read_string_column(row, name, &value);
+        }
+        if (value && !strcmp(value, id)) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+bool
+ovsdb_rbac_insert(const struct ovsdb *db, const struct ovsdb_table *table,
+                  const char *role, const char *id)
+{
+    const struct ovsdb_table_schema *ts = table->schema;
+    const struct ovsdb_row *perms;
+    bool insdel;
+
+    if (!db->rbac_role || !role || *role == '\0') {
+        return true;
+    }
+
+    if (!id) {
+        goto denied;
+    }
+
+    perms = ovsdb_rbac_lookup_perms(db, role, ts->name);
+
+    if (!perms) {
+        goto denied;
+    }
+
+    if (!ovsdb_util_read_bool_column(perms, "insert_delete", &insdel)) {
+        return false;
+    }
+
+    if (insdel) {
+        return true;
+    }
+
+denied:
+    return false;
+}
+
+struct rbac_delete_cbdata {
+    const struct ovsdb_table *table;
+    const struct ovsdb_row *perms;
+    const char *role;
+    const char *id;
+    bool permitted;
+};
+
+static bool
+rbac_delete_cb(const struct ovsdb_row *row, void *rd_)
+{
+    struct rbac_delete_cbdata *rd = rd_;
+    bool insdel;
+
+    if (!ovsdb_rbac_authorized(rd->perms, rd->id, row)) {
+        goto denied;
+    }
+
+    if (!ovsdb_util_read_bool_column(rd->perms, "insert_delete", &insdel)) {
+        goto denied;
+    }
+
+    if (!insdel) {
+        goto denied;
+    }
+    return true;
+
+denied:
+    rd->permitted = false;
+    return false;
+}
+
+bool
+ovsdb_rbac_delete(const struct ovsdb *db, struct ovsdb_table *table,
+                  struct ovsdb_condition *condition,
+                  const char *role, const char *id)
+{
+    const struct ovsdb_table_schema *ts = table->schema;
+    const struct ovsdb_row *perms;
+    struct rbac_delete_cbdata rd;
+
+    if (!db->rbac_role || !role || *role == '\0') {
+        return true;
+    }
+    if (!id) {
+        goto denied;
+    }
+
+    perms = ovsdb_rbac_lookup_perms(db, role, ts->name);
+
+    if (!perms) {
+        goto denied;
+    }
+
+    rd.permitted = true;
+    rd.perms = perms;
+    rd.table = table;
+    rd.role = role;
+    rd.id = id;
+
+    ovsdb_query(table, condition, rbac_delete_cb, &rd);
+
+    if (rd.permitted) {
+        return true;
+    }
+
+denied:
+    return false;
+}
+
+struct rbac_update_cbdata {
+    const struct ovsdb_table *table;
+    const struct ovsdb_column_set *columns; /* columns to be modified */
+    const struct ovsdb_datum *modifiable; /* modifiable column names */
+    const struct ovsdb_row *perms;
+    const char *role;
+    const char *id;
+    bool permitted;
+};
+
+static bool
+rbac_column_modification_permitted(const struct ovsdb_column *column,
+                                   const struct ovsdb_datum *modifiable)
+{
+    size_t i;
+
+    for (i = 0; i < modifiable->n; i++) {
+        char *name = modifiable->keys[i].string;
+
+        if (!strcmp(name, column->name)) {
+            return true;
+        }
+    }
+    return false;
+}
+
+static bool
+rbac_update_cb(const struct ovsdb_row *row, void *ru_)
+{
+    struct rbac_update_cbdata *ru = ru_;
+    size_t i;
+
+    if (!ovsdb_rbac_authorized(ru->perms, ru->id, row)) {
+        goto denied;
+    }
+
+    for (i = 0; i < ru->columns->n_columns; i++) {
+        const struct ovsdb_column *column = ru->columns->columns[i];
+
+        if (!rbac_column_modification_permitted(column, ru->modifiable)) {
+            goto denied;
+        }
+    }
+    return true;
+
+denied:
+    ru->permitted = false;
+    return false;
+}
+
+bool
+ovsdb_rbac_update(const struct ovsdb *db,
+                  struct ovsdb_table *table,
+                  struct ovsdb_column_set *columns,
+                  struct ovsdb_condition *condition,
+                  const char *role, const char *id)
+{
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+    const struct ovsdb_table_schema *ts = table->schema;
+    const struct ovsdb_datum *datum;
+    const struct ovsdb_row *perms;
+    struct rbac_update_cbdata ru;
+
+    if (!db->rbac_role || !role || *role == '\0') {
+        return true;
+    }
+    if (!id) {
+        goto denied;
+    }
+
+    perms = ovsdb_rbac_lookup_perms(db, role, ts->name);
+
+    if (!perms) {
+        goto denied;
+    }
+
+    datum = ovsdb_util_get_datum(CONST_CAST(struct ovsdb_row *, perms),
+                                 "update",
+                                 OVSDB_TYPE_STRING, OVSDB_TYPE_VOID, UINT_MAX);
+
+    if (!datum) {
+        VLOG_INFO_RL(&rl, "ovsdb_rbac_update: could not read \"update\" "
+                     "column");
+        goto denied;
+    }
+
+    ru.table = table;
+    ru.columns = columns;
+    ru.role = role;
+    ru.id = id;
+    ru.perms = perms;
+    ru.modifiable = datum;
+    ru.permitted = true;
+
+    ovsdb_query(table, condition, rbac_update_cb, &ru);
+
+    if (ru.permitted) {
+        return true;
+    }
+
+denied:
+    return false;
+}
+
+struct rbac_mutate_cbdata {
+    const struct ovsdb_table *table;
+    const struct ovsdb_mutation_set *mutations; /* columns to be mutated */
+    const struct ovsdb_datum *modifiable; /* modifiable column names */
+    const struct ovsdb_row *perms;
+    const char *role;
+    const char *id;
+    bool permitted;
+};
+
+static bool
+rbac_mutate_cb(const struct ovsdb_row *row, void *rm_)
+{
+    struct rbac_mutate_cbdata *rm = rm_;
+    size_t i;
+
+    if (!ovsdb_rbac_authorized(rm->perms, rm->id, row)) {
+        goto denied;
+    }
+
+    for (i = 0; i < rm->mutations->n_mutations; i++) {
+        const struct ovsdb_column *column = rm->mutations->mutations[i].column;
+
+        if (!rbac_column_modification_permitted(column, rm->modifiable)) {
+            goto denied;
+        }
+    }
+
+    return true;
+
+denied:
+    rm->permitted = false;
+    return false;
+}
+
+bool
+ovsdb_rbac_mutate(const struct ovsdb *db,
+                  struct ovsdb_table *table,
+                  struct ovsdb_mutation_set *mutations,
+                  struct ovsdb_condition *condition,
+                  const char *role, const char *id)
+{
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+    const struct ovsdb_table_schema *ts = table->schema;
+    const struct ovsdb_datum *datum;
+    const struct ovsdb_row *perms;
+    struct rbac_mutate_cbdata rm;
+
+    if (!db->rbac_role || !role || *role == '\0') {
+        return true;
+    }
+    if (!id) {
+        goto denied;
+    }
+
+    perms = ovsdb_rbac_lookup_perms(db, role, ts->name);
+
+    if (!perms) {
+        goto denied;
+    }
+
+    datum = ovsdb_util_get_datum(CONST_CAST(struct ovsdb_row *, perms),
+                                 "update",
+                                 OVSDB_TYPE_STRING, OVSDB_TYPE_VOID, UINT_MAX);
+
+    if (!datum) {
+        VLOG_INFO_RL(&rl, "ovsdb_rbac_mutate: could not read \"update\" "
+                     "column");
+        goto denied;
+    }
+
+    rm.table = table;
+    rm.mutations = mutations;
+    rm.role = role;
+    rm.id = id;
+    rm.perms = perms;
+    rm.modifiable = datum;
+    rm.permitted = true;
+
+    ovsdb_query(table, condition, rbac_mutate_cb, &rm);
+
+    if (rm.permitted) {
+        return true;
+    }
+
+denied:
+    return false;
+}
diff --git a/ovsdb/rbac.h b/ovsdb/rbac.h
new file mode 100644
index 0000000..1c2664b
--- /dev/null
+++ b/ovsdb/rbac.h
@@ -0,0 +1,38 @@ 
+/*
+ * Copyright (c) 2017 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVSDB_RBAC_H
+#define OVSDB_RBAC_H 1
+struct ovsdb_mutation_set;
+
+bool ovsdb_rbac_insert(const struct ovsdb *,
+                       const struct ovsdb_table *,
+                       const char *, const char *);
+bool ovsdb_rbac_delete(const struct ovsdb *,
+                       struct ovsdb_table *table,
+                       struct ovsdb_condition *condition,
+                       const char *role, const char *id);
+bool ovsdb_rbac_update(const struct ovsdb *,
+                       struct ovsdb_table *table,
+                       struct ovsdb_column_set *columns,
+                       struct ovsdb_condition *condition,
+                       const char *role, const char *id);
+bool ovsdb_rbac_mutate(const struct ovsdb *,
+                       struct ovsdb_table *table,
+                       struct ovsdb_mutation_set *mutations,
+                       struct ovsdb_condition *condition,
+                       const char *role, const char *id);
+#endif /* ovsdb/rbac.h */
diff --git a/ovsdb/trigger.c b/ovsdb/trigger.c
index a859983..94b95ea 100644
--- a/ovsdb/trigger.c
+++ b/ovsdb/trigger.c
@@ -32,7 +32,8 @@  void
 ovsdb_trigger_init(struct ovsdb_session *session, struct ovsdb *db,
                    struct ovsdb_trigger *trigger,
                    struct json *request, long long int now,
-                   bool read_only)
+                   bool read_only, const char *role,
+                   const char *id)
 {
     trigger->session = session;
     trigger->db = db;
@@ -42,6 +43,8 @@  ovsdb_trigger_init(struct ovsdb_session *session, struct ovsdb *db,
     trigger->created = now;
     trigger->timeout_msec = LLONG_MAX;
     trigger->read_only = read_only;
+    trigger->role = role ? xstrdup(role): NULL;
+    trigger->id = id ? xstrdup(id): NULL;
     ovsdb_trigger_try(trigger, now);
 }
 
@@ -51,6 +54,8 @@  ovsdb_trigger_destroy(struct ovsdb_trigger *trigger)
     ovs_list_remove(&trigger->node);
     json_destroy(trigger->request);
     json_destroy(trigger->result);
+    free(trigger->role);
+    free(trigger->id);
 }
 
 bool
@@ -114,6 +119,7 @@  ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
 {
     t->result = ovsdb_execute(t->db, t->session,
                               t->request, t->read_only,
+                              t->role, t->id,
                               now - t->created, &t->timeout_msec);
     if (t->result) {
         ovsdb_trigger_complete(t);
diff --git a/ovsdb/trigger.h b/ovsdb/trigger.h
index c8474a4..90246a4 100644
--- a/ovsdb/trigger.h
+++ b/ovsdb/trigger.h
@@ -30,12 +30,15 @@  struct ovsdb_trigger {
     long long int created;      /* Time created. */
     long long int timeout_msec; /* Max wait duration. */
     bool read_only;             /* Database is in read only mode. */
+    char *role;                 /* Role, for role-based access controls. */
+    char *id;                   /* ID, for role-based access controls. */
 };
 
 void ovsdb_trigger_init(struct ovsdb_session *, struct ovsdb *,
                         struct ovsdb_trigger *,
                         struct json *request, long long int now,
-                        bool read_only);
+                        bool read_only, const char *role,
+                        const char *id);
 void ovsdb_trigger_destroy(struct ovsdb_trigger *);
 
 bool ovsdb_trigger_is_complete(const struct ovsdb_trigger *);
diff --git a/tests/automake.mk b/tests/automake.mk
index c6bd120..0c28c47 100644
--- a/tests/automake.mk
+++ b/tests/automake.mk
@@ -82,6 +82,7 @@  TESTSUITE_AT = \
 	tests/ovsdb-monitor.at \
 	tests/ovsdb-idl.at \
 	tests/ovsdb-lock.at \
+	tests/ovsdb-rbac.at \
 	tests/ovs-vsctl.at \
 	tests/ovs-xapi-sync.at \
 	tests/stp.at \
diff --git a/tests/ovsdb-rbac.at b/tests/ovsdb-rbac.at
new file mode 100644
index 0000000..50202cb
--- /dev/null
+++ b/tests/ovsdb-rbac.at
@@ -0,0 +1,252 @@ 
+AT_BANNER([OVSDB -- ovsdb-server rbac])
+
+AT_SETUP([ovsdb-server/rbac 2])
+AT_KEYWORDS([ovsdb server rbac])
+AT_SKIP_IF([test "$HAVE_OPENSSL" = no])
+
+RBAC_PKIDIR="$(pwd)"
+RBAC_PKI="sh $abs_top_srcdir/utilities/ovs-pki.in --dir=$RBAC_PKIDIR/pki --log=$RBAC_PKIDIR/rbac-pki.log"
+$RBAC_PKI -B 1024 init
+$RBAC_PKI -B 1024 req+sign ovsdb-server switch
+$RBAC_PKI -B 1024 -u req+sign client-1 switch
+$RBAC_PKI -B 1024 -u req+sign client-2 switch
+
+AT_DATA([schema],
+  [[{"name": "mydb",
+     "tables": {
+       "Root": {
+         "columns": {
+           "connections": {
+             "type": {
+               "key": {"type": "uuid", "refTable": "Connection"},
+               "min": 0,
+               "max": "unlimited"}}},
+          "isRoot": true},
+       "Connection": {
+         "columns": {
+           "target": {
+             "type": "string"},
+           "role": {
+             "type": "string"}}},
+        "RBAC_Role": {
+            "columns": {
+                "name": {"type": "string"},
+                "permissions": {
+                    "type": {"key": {"type": "string"},
+                             "value": {"type": "uuid",
+                                       "refTable": "RBAC_Permission",
+                                       "refType": "weak"},
+                                     "min": 0, "max": "unlimited"}}},
+            "isRoot": true},
+        "RBAC_Permission": {
+            "columns": {
+                "table": {"type": "string"},
+                "authorization": {"type": {"key": "string",
+                                           "min": 0,
+                                           "max": "unlimited"}},
+                "insert_delete": {"type": "boolean"},
+                "update" : {"type": {"key": "string",
+                                     "min": 0,
+                                     "max": "unlimited"}}},
+            "isRoot": true},
+       "fixed_colors": {
+         "columns": {
+           "name": {"type": "string"},
+           "value": {"type": "integer"}},
+         "indexes": [["name"]],
+         "isRoot": true},
+       "user_colors": {
+         "columns": {
+           "creator": {"type": "string"},
+           "name": {"type": "string"},
+           "value": {"type": "integer"}},
+         "indexes": [["name"]],
+         "isRoot": true}
+    },
+     "version": "5.1.3",
+     "cksum": "12345678 9"
+}
+]])
+
+AT_CHECK([ovsdb-tool create db schema], [0], [ignore], [ignore])
+AT_CHECK(
+  [[ovsdb-tool transact db \
+     '["mydb",
+       {"op": "insert",
+        "table": "Root",
+        "row": {
+          "connections": ["set", [["named-uuid", "x"]]]}},
+       {"op": "insert",
+        "table": "Connection",
+        "uuid-name": "x",
+        "row": {"target": "pssl:0:127.0.0.1",
+                "role": "testrole"}},
+       {"op": "insert",
+        "table": "fixed_colors",
+        "row": {"name": "red",
+                "value": '16711680'}},
+       {"op": "insert",
+        "table": "RBAC_Role",
+        "row": {"name": "testrole",
+                "permissions": ["map", [["user_colors", ["named-uuid", "y"]]]]}},
+       {"op": "insert",
+        "table": "RBAC_Permission",
+        "uuid-name": "y",
+        "row": {"authorization": "creator",
+                "insert_delete": true,
+                "table": "fixed_colors",
+                "update": ["set", ["name", "value"]]}}
+]']], [0], [ignore], [ignore])
+
+AT_CHECK([ovsdb-server --log-file --detach --no-chdir --pidfile --remote=db:mydb,Root,connections \
+        --private-key=$RBAC_PKIDIR/ovsdb-server-privkey.pem \
+        --certificate=$RBAC_PKIDIR/ovsdb-server-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        db], [0], [ignore], [ignore])
+PARSE_LISTENING_PORT([ovsdb-server.log], [SSL_PORT])
+
+# Test 1:
+# Attempt to insert a row into the "fixed_colors" table.  This should
+# fail as there are no permissions for role "testrole" for this table.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-1-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-1-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "insert",
+          "table": "fixed_colors",
+          "row": {"name": "chartreuse", "value": '8388352'}}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"details":"RBAC rules for client \"client-1\" role \"testrole\" prohibit row insertion into table \"fixed_colors\".","error":"permission error"}]]
+], [ignore])
+
+# Test 2:
+# Attempt to insert a row into the "user_colors" table.  This should
+# succeed since role "testrole" has permissions for this table that
+# allow row insertion.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-1-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-1-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "insert",
+          "table": "user_colors",
+          "row": {"creator": "client-1", "name": "chartreuse", "value": '8388352'}}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"uuid":["uuid","<0>"]}]]
+], [ignore])
+
+# Test 3:
+# Attempt to update a column in the "user_colors" table.  This should
+# succeed since role "testrole" has permissions for this table that
+# allow update of the "value" column when ID is equal to the value in
+# the "creator" column.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-1-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-1-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "update",
+          "table": "user_colors",
+          "where": [["name", "==", "chartreuse"]],
+          "row": {"value": '8388353'}}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"count":1}]]
+], [ignore])
+
+# Test 4:
+# Attempt to update a column in the "user_colors" table.  Same as
+# previous test, but with a different client ID. This should fail
+# the RBAC authorization test because "client-2" does not match the
+# "creator" column for this row.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-2-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-2-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "update",
+          "table": "user_colors",
+          "where": [["name", "==", "chartreuse"]],
+          "row": {"value": '8388354'}}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"details":"RBAC rules for client \"client-2\" role \"testrole\" prohibit modification of table \"user_colors\".","error":"permission error"}]]
+], [ignore])
+
+# Test 5:
+# Attempt to mutate a column in the "user_colors" table.  This should
+# succeed since role "testrole" has permissions for this table that
+# allow update of the "value" column when ID is equal to the value in
+# the "creator" column.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-1-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-1-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "mutate",
+          "table": "user_colors",
+          "where": [["name", "==", "chartreuse"]],
+          "mutations": [["value", "+=", '10']]}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"count":1}]]
+], [ignore])
+
+# Test 6:
+# Attempt to mutate a column in the "user_colors" table.  Same as
+# previous test, but with a different client ID. This should fail
+# the RBAC authorization test because "client-2" does not match the
+# "creator" column for this row.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-2-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-2-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "mutate",
+          "table": "user_colors",
+          "where": [["name", "==", "chartreuse"]],
+          "mutations": [["value", "+=", '10']]}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"details":"RBAC rules for client \"client-2\" role \"testrole\" prohibit mutate operation on table \"user_colors\".","error":"permission error"}]]
+], [ignore])
+
+# Test 7:
+# Attempt to delete a row from the "user_colors" table. This should fail
+# the RBAC authorization test because "client-2" does not match the
+# "creator" column for this row.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-2-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-2-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "delete",
+          "table": "user_colors",
+          "where": [["name", "==", "chartreuse"]]}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"details":"RBAC rules for client \"client-2\" role \"testrole\" prohibit row deletion from table \"user_colors\".","error":"permission error"}]]
+], [ignore])
+
+# Test 8:
+# Attempt to delete a row from the "user_colors" table. This should pass
+# the RBAC authorization test because "client-1" does matches the
+# "creator" column for this row.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-1-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-1-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "delete",
+          "table": "user_colors",
+          "where": [["name", "==", "chartreuse"]]}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"count":1}]]
+], [ignore])
+
+OVSDB_SERVER_SHUTDOWN
+AT_CLEANUP
diff --git a/tests/ovsdb.at b/tests/ovsdb.at
index fe617a5..8a389b8 100644
--- a/tests/ovsdb.at
+++ b/tests/ovsdb.at
@@ -148,3 +148,4 @@  m4_include([tests/ovsdb-server.at])
 m4_include([tests/ovsdb-monitor.at])
 m4_include([tests/ovsdb-idl.at])
 m4_include([tests/ovsdb-lock.at])
+m4_include([tests/ovsdb-rbac.at])
diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c
index 09e4f0d..f8cf1cd 100644
--- a/tests/test-ovsdb.c
+++ b/tests/test-ovsdb.c
@@ -1435,7 +1435,7 @@  do_execute__(struct ovs_cmdl_context *ctx, bool ro)
         char *s;
 
         params = parse_json(ctx->argv[i]);
-        result = ovsdb_execute(db, NULL, params, ro,  0, NULL);
+        result = ovsdb_execute(db, NULL, params, ro,  NULL, NULL, 0, NULL);
         s = json_to_string(result, JSSF_SORT);
         printf("%s\n", s);
         free(s);
@@ -1513,7 +1513,8 @@  do_trigger(struct ovs_cmdl_context *ctx)
             json_destroy(params);
         } else {
             struct test_trigger *t = xmalloc(sizeof *t);
-            ovsdb_trigger_init(&session, db, &t->trigger, params, now, false);
+            ovsdb_trigger_init(&session, db, &t->trigger, params, now, false,
+                               NULL, NULL);
             t->number = number++;
             if (ovsdb_trigger_is_complete(&t->trigger)) {
                 do_trigger_dump(t, now, "immediate");