[ovs-dev,RFC,v2,3/5] ovsdb: add support for role-based access controls

Submitted by Lance Richardson on April 13, 2017, 3 p.m.

Details

Message ID 20170413150026.394-4-lrichard@redhat.com
State Superseded
Headers show

Commit Message

Lance Richardson April 13, 2017, 3 p.m.
Add suport for ovsdb RBAC (role-based access control). This includes:

   - Support for new "--rbac <table>" command-line option to
     ovsdb-server, to specify the RBAC roles table to be used.

     This table has one row per role, with each row having a
     "name" column (role name) and a "permissions" column (map of
     table name to UUID of row in separate permission table.) The
     permission table has one row per access control configuration,
     with columns:
          "name" - name of table to which this row applies
          "authorization" - set of column names and column:key pairs
                            to be compared against client ID to
                            determine authorization status
          "insert_delete" - boolean, true if insertions and
                            authorized deletions are allowed.
          "update"        - Set of columns and column:key pairs for
                            which authorized updates are allowed.
   - Support for a new "role" column in the remote configuration
     table.
   - Logic for applying the RBAC role and permission tables, in
     combination with session role and client id, to determine
     whether operations modifying database contents should be
     permitted.

Signed-off-by: Lance Richardson <lrichard@redhat.com>
---
v2:
 - Added ovsdb_perm_error() to format permission error strings.
 - Re-implemented RBAC enforcing code to apply RBAC rules
   to entire delete/update/mutate operation before applying
   any changes to avoid partial updates.
 - Implemented basic unit tests for RBAC enforcement.

 lib/jsonrpc.c          |  10 ++
 lib/jsonrpc.h          |   2 +
 lib/ovsdb-error.c      |  13 ++
 lib/ovsdb-error.h      |   4 +
 ovsdb/automake.mk      |   2 +
 ovsdb/execution.c      |  41 ++++-
 ovsdb/jsonrpc-server.c |   6 +-
 ovsdb/jsonrpc-server.h |   1 +
 ovsdb/mutation.c       |   2 +
 ovsdb/mutation.h       |   5 +-
 ovsdb/ovsdb-server.c   |  70 +++++++-
 ovsdb/ovsdb-tool.c     |   2 +-
 ovsdb/ovsdb-util.c     |  39 +++++
 ovsdb/ovsdb-util.h     |   3 +
 ovsdb/ovsdb.h          |   1 +
 ovsdb/rbac.c           | 449 +++++++++++++++++++++++++++++++++++++++++++++++++
 ovsdb/rbac.h           |  36 ++++
 ovsdb/trigger.c        |   8 +-
 ovsdb/trigger.h        |   5 +-
 tests/automake.mk      |   1 +
 tests/ovsdb-rbac.at    | 253 ++++++++++++++++++++++++++++
 tests/ovsdb.at         |   1 +
 tests/test-ovsdb.c     |   5 +-
 23 files changed, 945 insertions(+), 14 deletions(-)
 create mode 100644 ovsdb/rbac.c
 create mode 100644 ovsdb/rbac.h
 create mode 100644 tests/ovsdb-rbac.at

Patch hide | download patch | download mbox

diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index a0ade9c..2fae057 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -1005,6 +1005,16 @@  jsonrpc_session_get_name(const struct jsonrpc_session *s)
     return reconnect_get_name(s->reconnect);
 }
 
+const char *
+jsonrpc_session_get_id(const struct jsonrpc_session *s)
+{
+    if (s->rpc && s->rpc->stream) {
+        return stream_get_peer_id(s->rpc->stream);
+    } else {
+        return NULL;
+    }
+}
+
 /* Always takes ownership of 'msg', regardless of success. */
 int
 jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
index 982017a..6a82954 100644
--- a/lib/jsonrpc.h
+++ b/lib/jsonrpc.h
@@ -130,5 +130,7 @@  void jsonrpc_session_set_probe_interval(struct jsonrpc_session *,
                                         int probe_interval);
 void jsonrpc_session_set_dscp(struct jsonrpc_session *,
                               uint8_t dscp);
+const char *jsonrpc_session_get_id(const struct jsonrpc_session *);
+
 
 #endif /* jsonrpc.h */
diff --git a/lib/ovsdb-error.c b/lib/ovsdb-error.c
index dfa4249..6331668 100644
--- a/lib/ovsdb-error.c
+++ b/lib/ovsdb-error.c
@@ -167,6 +167,19 @@  ovsdb_internal_error(struct ovsdb_error *inner_error,
     return error;
 }
 
+struct ovsdb_error *ovsdb_perm_error(const char *details, ...)
+{
+    struct ovsdb_error *error;
+    va_list args;
+
+    va_start(args, details);
+    error = ovsdb_error_valist("permission error", details, args);
+    va_end(args);
+
+    return error;
+}
+
+
 void
 ovsdb_error_destroy(struct ovsdb_error *error)
 {
diff --git a/lib/ovsdb-error.h b/lib/ovsdb-error.h
index 2bc259a..da91b74 100644
--- a/lib/ovsdb-error.h
+++ b/lib/ovsdb-error.h
@@ -41,6 +41,10 @@  struct ovsdb_error *ovsdb_internal_error(struct ovsdb_error *error,
     OVS_PRINTF_FORMAT(4, 5)
     OVS_WARN_UNUSED_RESULT;
 
+struct ovsdb_error *ovsdb_perm_error(const char *details, ...)
+    OVS_PRINTF_FORMAT(1, 2)
+    OVS_WARN_UNUSED_RESULT;
+
 /* Returns a pointer to an ovsdb_error that represents an internal error for
  * the current file name and line number with MSG as the associated message.
  * The caller is responsible for freeing the internal error. */
diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk
index c218bf5..ac0f741 100644
--- a/ovsdb/automake.mk
+++ b/ovsdb/automake.mk
@@ -24,6 +24,8 @@  ovsdb_libovsdb_la_SOURCES = \
 	ovsdb/monitor.h \
 	ovsdb/query.c \
 	ovsdb/query.h \
+	ovsdb/rbac.c \
+	ovsdb/rbac.h \
 	ovsdb/replication.c \
 	ovsdb/replication.h \
 	ovsdb/row.c \
diff --git a/ovsdb/execution.c b/ovsdb/execution.c
index e2d320e..62d740e 100644
--- a/ovsdb/execution.c
+++ b/ovsdb/execution.c
@@ -32,6 +32,7 @@ 
 #include "table.h"
 #include "timeval.h"
 #include "transaction.h"
+#include "rbac.h"
 
 struct ovsdb_execution {
     struct ovsdb *db;
@@ -39,6 +40,8 @@  struct ovsdb_execution {
     struct ovsdb_txn *txn;
     struct ovsdb_symbol_table *symtab;
     bool durable;
+    const char *role;
+    const char *id;
 
     /* Triggers. */
     long long int elapsed_msec;
@@ -97,6 +100,7 @@  lookup_executor(const char *name, bool *read_only)
 struct json *
 ovsdb_execute(struct ovsdb *db, const struct ovsdb_session *session,
               const struct json *params, bool read_only,
+              const char *role, const char *id,
               long long int elapsed_msec, long long int *timeout_msec)
 {
     struct ovsdb_execution x;
@@ -126,6 +130,8 @@  ovsdb_execute(struct ovsdb *db, const struct ovsdb_session *session,
     x.txn = ovsdb_txn_create(db);
     x.symtab = ovsdb_symbol_table_create();
     x.durable = false;
+    x.role = role;
+    x.id = id;
     x.elapsed_msec = elapsed_msec;
     x.timeout_msec = LLONG_MAX;
     results = NULL;
@@ -305,6 +311,12 @@  ovsdb_execute_insert(struct ovsdb_execution *x, struct ovsdb_parser *parser,
         return error;
     }
 
+    if (!ovsdb_rbac_insert(table, x->role, x->id)) {
+        return ovsdb_perm_error("RBAC rules for client \"%s\" role \"%s\" "
+                                "prohibit row insertion into table \"%s\".",
+                                x->id, x->role, table->schema->name);
+    }
+
     if (uuid_name) {
         struct ovsdb_symbol *symbol;
 
@@ -410,6 +422,8 @@  struct update_row_cbdata {
     struct ovsdb_txn *txn;
     const struct ovsdb_row *row;
     const struct ovsdb_column_set *columns;
+    const char *role;
+    const char *id;
 };
 
 static bool
@@ -470,7 +484,14 @@  ovsdb_execute_update(struct ovsdb_execution *x, struct ovsdb_parser *parser,
         ur.txn = x->txn;
         ur.row = row;
         ur.columns = &columns;
-        ovsdb_query(table, &condition, update_row_cb, &ur);
+        if (ovsdb_rbac_update(table, &columns, &condition, x->role, x->id)) {
+            ovsdb_query(table, &condition, update_row_cb, &ur);
+        } else {
+            error = ovsdb_perm_error("RBAC rules for client \"%s\" role "
+                                     "\"%s\" prohibit modification of "
+                                     "table \"%s\".",
+                                     x->id, x->role, table->schema->name);
+        }
         json_object_put(result, "count", json_integer_create(ur.n_matches));
     }
 
@@ -529,7 +550,14 @@  ovsdb_execute_mutate(struct ovsdb_execution *x, struct ovsdb_parser *parser,
         mr.txn = x->txn;
         mr.mutations = &mutations;
         mr.error = &error;
-        ovsdb_query(table, &condition, mutate_row_cb, &mr);
+        if (ovsdb_rbac_mutate(table, &mutations, &condition, x->role, x->id)) {
+            ovsdb_query(table, &condition, mutate_row_cb, &mr);
+        } else {
+            error = ovsdb_perm_error("RBAC rules for client \"%s\" role "
+                                     "\"%s\" prohibit mutate operation on "
+                                     "table \"%s\".",
+                                     x->id, x->role, table->schema->name);
+        }
         json_object_put(result, "count", json_integer_create(mr.n_matches));
     }
 
@@ -579,8 +607,15 @@  ovsdb_execute_delete(struct ovsdb_execution *x, struct ovsdb_parser *parser,
         dr.n_matches = 0;
         dr.table = table;
         dr.txn = x->txn;
-        ovsdb_query(table, &condition, delete_row_cb, &dr);
 
+        if (ovsdb_rbac_delete(table, &condition, x->role, x->id)) {
+            ovsdb_query(table, &condition, delete_row_cb, &dr);
+        } else {
+            error = ovsdb_perm_error("RBAC rules for client \"%s\" role "
+                                     "\"%s\" prohibit row deletion from "
+                                     "table \"%s\".",
+                                     x->id, x->role, table->schema->name);
+        }
         json_object_put(result, "count", json_integer_create(dr.n_matches));
     }
 
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 1ba6bb3..6edfe24 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -128,6 +128,7 @@  struct ovsdb_jsonrpc_remote {
     struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
     uint8_t dscp;
     bool read_only;
+    char *role;
 };
 
 static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
@@ -270,6 +271,7 @@  ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
     ovs_list_init(&remote->sessions);
     remote->dscp = options->dscp;
     remote->read_only = options->read_only;
+    remote->role = options->role ? xstrdup(options->role) : NULL;
     shash_add(&svr->remotes, name, remote);
 
     if (!listener) {
@@ -287,6 +289,7 @@  ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
     ovsdb_jsonrpc_session_close_all(remote);
     pstream_close(remote->listener);
     shash_delete(&remote->server->remotes, node);
+    free(remote->role);
     free(remote);
 }
 
@@ -1038,7 +1041,8 @@  ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
     /* Insert into trigger table. */
     t = xmalloc(sizeof *t);
     ovsdb_trigger_init(&s->up, db, &t->trigger, params, time_msec(),
-                       s->read_only);
+                       s->read_only, s->remote->role,
+                       jsonrpc_session_get_id(s->js));
     t->id = id;
     hmap_insert(&s->triggers, &t->hmap_node, hash);
 
diff --git a/ovsdb/jsonrpc-server.h b/ovsdb/jsonrpc-server.h
index 3cacbb6..1add327 100644
--- a/ovsdb/jsonrpc-server.h
+++ b/ovsdb/jsonrpc-server.h
@@ -37,6 +37,7 @@  struct ovsdb_jsonrpc_options {
     int probe_interval;         /* Max idle time before probing, in msec. */
     bool read_only;             /* Only read-only transactions are allowed. */
     int dscp;                   /* Dscp value for manager connections */
+    char *role;                 /* Role, for role-based access controls */
 };
 struct ovsdb_jsonrpc_options *
 ovsdb_jsonrpc_default_options(const char *target);
diff --git a/ovsdb/mutation.c b/ovsdb/mutation.c
index e5d192e..26d2d01 100644
--- a/ovsdb/mutation.c
+++ b/ovsdb/mutation.c
@@ -29,6 +29,8 @@ 
 
 #include "table.h"
 #include "util.h"
+#include "condition.h"
+#include "rbac.h"
 
 struct ovsdb_error *
 ovsdb_mutator_from_string(const char *name, enum ovsdb_mutator *mutator)
diff --git a/ovsdb/mutation.h b/ovsdb/mutation.h
index 7566ef1..3babf4c 100644
--- a/ovsdb/mutation.h
+++ b/ovsdb/mutation.h
@@ -51,6 +51,8 @@  struct ovsdb_mutation {
     const struct ovsdb_column *column;
     struct ovsdb_datum arg;
     struct ovsdb_type type;
+    const char *role;
+    const char *id;
 };
 
 struct ovsdb_mutation_set {
@@ -67,6 +69,7 @@  struct ovsdb_error *ovsdb_mutation_set_from_json(
 struct json *ovsdb_mutation_set_to_json(const struct ovsdb_mutation_set *);
 void ovsdb_mutation_set_destroy(struct ovsdb_mutation_set *);
 struct ovsdb_error *ovsdb_mutation_set_execute(
-    struct ovsdb_row *, const struct ovsdb_mutation_set *) OVS_WARN_UNUSED_RESULT;
+    struct ovsdb_row *, const struct ovsdb_mutation_set *)
+    OVS_WARN_UNUSED_RESULT;
 
 #endif /* ovsdb/mutation.h */
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index 50c3555..33603e2 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -58,6 +58,7 @@ 
 #include "perf-counter.h"
 #include "ovsdb-util.h"
 #include "openvswitch/vlog.h"
+#include "rbac.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_server);
 
@@ -116,9 +117,14 @@  static void close_db(struct db *db);
 static void parse_options(int *argc, char **argvp[],
                           struct sset *remotes, char **unixctl_pathp,
                           char **run_command, char **sync_from,
-                          char **sync_exclude, bool *is_backup);
+                          char **sync_exclude, bool *is_backup,
+                          char **rbac);
 OVS_NO_RETURN static void usage(void);
 
+static char *ovsdb_parse_rbac(const struct shash *all_dbs,
+                              char *rbac,
+                              const struct ovsdb_table **rbac_roles_table);
+
 static char *reconfigure_remotes(struct ovsdb_jsonrpc_server *,
                                  const struct shash *all_dbs,
                                  struct sset *remotes);
@@ -259,6 +265,7 @@  main(int argc, char *argv[])
     struct sset remotes, db_filenames;
     char *sync_from, *sync_exclude;
     bool is_backup;
+    char *rbac;
     const char *db_filename;
     struct process *run_process;
     bool exiting;
@@ -269,6 +276,7 @@  main(int argc, char *argv[])
     struct shash_node *node, *next;
     char *error;
     int i;
+    const struct ovsdb_table *rbac_roles_table = NULL;
 
     ovs_cmdl_proctitle_init(argc, argv);
     set_program_name(argv[0]);
@@ -278,7 +286,7 @@  main(int argc, char *argv[])
 
     bool active = false;
     parse_options(&argc, &argv, &remotes, &unixctl_path, &run_command,
-                  &sync_from, &sync_exclude, &active);
+                  &sync_from, &sync_exclude, &active, &rbac);
     is_backup = sync_from && !active;
 
     daemon_become_new_user(false);
@@ -431,6 +439,13 @@  main(int argc, char *argv[])
         ovsdb_replication_init(sync_from, sync_exclude, &all_dbs, server_uuid);
     }
 
+    error = ovsdb_parse_rbac(&all_dbs, rbac, &rbac_roles_table);
+    if (error) {
+        ovs_fatal(0, "%s", error);
+    } else {
+        ovsdb_rbac_set_roles(rbac_roles_table);
+    }
+
     main_loop(jsonrpc, &all_dbs, unixctl, &remotes, run_process, &exiting,
               &is_backup);
 
@@ -660,6 +675,41 @@  query_db_string(const struct shash *all_dbs, const char *name,
     }
 }
 
+static char *
+ovsdb_parse_rbac(const struct shash *all_dbs, char *rbac,
+                 const struct ovsdb_table **rbac_roles_table)
+{
+    const char *db_name, *table_name;
+    const struct ovsdb_table *table;
+    const struct db *db;
+    char *save_ptr = NULL;
+
+    if (!rbac) {
+        return NULL;
+    }
+
+    strtok_r(rbac, ":", &save_ptr); /* "db:" */
+    db_name = strtok_r(NULL, ",", &save_ptr);
+    table_name = strtok_r(NULL, ",", &save_ptr);
+    if (!db_name || !table_name) {
+        return xasprintf("\"%s\": invalid syntax", rbac);
+    }
+
+    db = shash_find_data(all_dbs, db_name);
+    if (!db) {
+        return xasprintf("\"%s\": no database named %s", rbac, db_name);
+    }
+
+    table = ovsdb_get_table(db->db, table_name);
+    if (!table) {
+        return xasprintf("\"%s\": no table named %s", rbac, table_name);
+    }
+
+    *rbac_roles_table = table;
+    free(rbac);
+    return NULL;
+}
+
 static struct ovsdb_jsonrpc_options *
 add_remote(struct shash *remotes, const char *target)
 {
@@ -683,7 +733,7 @@  add_manager_options(struct shash *remotes, const struct ovsdb_row *row)
     struct ovsdb_jsonrpc_options *options;
     long long int max_backoff, probe_interval;
     bool read_only;
-    const char *target, *dscp_string;
+    const char *target, *dscp_string, *role;
 
     if (!ovsdb_util_read_string_column(row, "target", &target) || !target) {
         VLOG_INFO_RL(&rl, "Table `%s' has missing or invalid `target' column",
@@ -702,6 +752,10 @@  add_manager_options(struct shash *remotes, const struct ovsdb_row *row)
     if (ovsdb_util_read_bool_column(row, "read_only", &read_only)) {
         options->read_only = read_only;
     }
+    free(options->role);
+    if (ovsdb_util_read_string_column(row, "role", &role) && role) {
+        options->role = xstrdup(role); /* FIXME: free role */
+    }
 
     options->dscp = DSCP_DEFAULT;
     dscp_string = ovsdb_util_read_map_string_column(row, "other_config",
@@ -1339,7 +1393,8 @@  ovsdb_server_get_sync_status(struct unixctl_conn *conn, int argc OVS_UNUSED,
 static void
 parse_options(int *argcp, char **argvp[],
               struct sset *remotes, char **unixctl_pathp, char **run_command,
-              char **sync_from, char **sync_exclude, bool *active)
+              char **sync_from, char **sync_exclude, bool *active,
+              char **rbac)
 {
     enum {
         OPT_REMOTE = UCHAR_MAX + 1,
@@ -1350,6 +1405,7 @@  parse_options(int *argcp, char **argvp[],
         OPT_SYNC_FROM,
         OPT_SYNC_EXCLUDE,
         OPT_ACTIVE,
+        OPT_RBAC,
         VLOG_OPTION_ENUMS,
         DAEMON_OPTION_ENUMS,
         SSL_OPTION_ENUMS,
@@ -1370,6 +1426,7 @@  parse_options(int *argcp, char **argvp[],
         {"sync-from",   required_argument, NULL, OPT_SYNC_FROM},
         {"sync-exclude-tables", required_argument, NULL, OPT_SYNC_EXCLUDE},
         {"active", no_argument, NULL, OPT_ACTIVE},
+        {"rbac", required_argument, NULL, OPT_RBAC},
         {NULL, 0, NULL, 0},
     };
     char *short_options = ovs_cmdl_long_options_to_short_options(long_options);
@@ -1378,6 +1435,7 @@  parse_options(int *argcp, char **argvp[],
 
     *sync_from = NULL;
     *sync_exclude = NULL;
+    *rbac = NULL;
     sset_init(remotes);
     for (;;) {
         int c;
@@ -1456,6 +1514,10 @@  parse_options(int *argcp, char **argvp[],
             *active = true;
             break;
 
+        case OPT_RBAC:
+            *rbac = xstrdup(optarg);
+            break;
+
         case '?':
             exit(EXIT_FAILURE);
 
diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c
index 8d7e76a..fb8eb1e 100644
--- a/ovsdb/ovsdb-tool.c
+++ b/ovsdb/ovsdb-tool.c
@@ -366,7 +366,7 @@  transact(bool read_only, int argc, char *argv[])
     check_ovsdb_error(ovsdb_file_open(db_file_name, read_only, &db, NULL));
 
     request = parse_json(transaction);
-    result = ovsdb_execute(db, NULL, request, false, 0, NULL);
+    result = ovsdb_execute(db, NULL, request, false, NULL, NULL, 0, NULL);
     json_destroy(request);
 
     print_and_free_json(result);
diff --git a/ovsdb/ovsdb-util.c b/ovsdb/ovsdb-util.c
index 647f5df..3700b0d 100644
--- a/ovsdb/ovsdb-util.c
+++ b/ovsdb/ovsdb-util.c
@@ -14,6 +14,7 @@ 
  */
 
 #include <config.h>
+
 #include "row.h"
 #include "sset.h"
 #include "table.h"
@@ -88,6 +89,44 @@  ovsdb_util_read_map_string_column(const struct ovsdb_row *row,
     return atom_value ? atom_value->string : NULL;
 }
 
+/* Read string-uuid key-values from a map.  Returns the row associated with
+ * 'key', if found, or NULL */
+const struct ovsdb_row *
+ovsdb_util_read_map_string_uuid_column(const struct ovsdb_row *row,
+                                       const char *column_name,
+                                       const char *key)
+{
+    union ovsdb_atom *atom_key = NULL, *atom_value = NULL;
+    const struct ovsdb_table *ref_table;
+    const struct ovsdb_column *column;
+    const struct ovsdb_datum *datum;
+    size_t i;
+
+    column = ovsdb_table_schema_get_column(row->table->schema, column_name);
+    if (!column ||
+        column->type.key.type != OVSDB_TYPE_STRING ||
+        column->type.value.type != OVSDB_TYPE_UUID) {
+        return NULL;
+    }
+
+    datum = &row->fields[column->index];
+
+    for (i = 0; i < datum->n; i++) {
+        atom_key = &datum->keys[i];
+        if (!strcmp(atom_key->string, key)) {
+            atom_value = &datum->values[i];
+            break;
+        }
+    }
+
+    if (!atom_value) {
+        return NULL;
+    }
+    ref_table = column->type.value.u.uuid.refTable;
+    return ovsdb_table_get_row(ref_table, &atom_value->uuid);
+}
+
+
 const union ovsdb_atom *
 ovsdb_util_read_column(const struct ovsdb_row *row, const char *column_name,
                        enum ovsdb_atomic_type type)
diff --git a/ovsdb/ovsdb-util.h b/ovsdb/ovsdb-util.h
index effbec8..55bdbe6 100644
--- a/ovsdb/ovsdb-util.h
+++ b/ovsdb/ovsdb-util.h
@@ -25,6 +25,9 @@  struct ovsdb_datum *ovsdb_util_get_datum(struct ovsdb_row *row,
 const char *ovsdb_util_read_map_string_column(const struct ovsdb_row *row,
                                               const char *column_name,
                                               const char *key);
+const struct ovsdb_row *ovsdb_util_read_map_string_uuid_column(const struct ovsdb_row *r,
+                                                    const char *column_name,
+                                                    const char *key);
 const union ovsdb_atom *ovsdb_util_read_column(const struct ovsdb_row *row,
                                                const char *column_name,
                                                enum ovsdb_atomic_type type);
diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
index fc45c80..793f829 100644
--- a/ovsdb/ovsdb.h
+++ b/ovsdb/ovsdb.h
@@ -73,6 +73,7 @@  struct ovsdb_table *ovsdb_get_table(const struct ovsdb *, const char *);
 
 struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *,
                            const struct json *params, bool read_only,
+                           const char *role, const char *id,
                            long long int elapsed_msec,
                            long long int *timeout_msec);
 
diff --git a/ovsdb/rbac.c b/ovsdb/rbac.c
new file mode 100644
index 0000000..b6aaaa8
--- /dev/null
+++ b/ovsdb/rbac.c
@@ -0,0 +1,449 @@ 
+/*
+ * Copyright (c) 2017 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <config.h>
+#include <limits.h>
+
+#include "column.h"
+#include "condition.h"
+#include "file.h"
+#include "mutation.h"
+#include "ovsdb-data.h"
+#include "ovsdb-error.h"
+#include "ovsdb-parser.h"
+#include "ovsdb.h"
+#include "query.h"
+#include "row.h"
+#include "server.h"
+#include "table.h"
+#include "timeval.h"
+#include "transaction.h"
+#include "ovsdb-util.h"
+#include "condition.h"
+#include "rbac.h"
+#include "openvswitch/vlog.h"
+
+VLOG_DEFINE_THIS_MODULE(ovsdb_rbac);
+
+/* FIXME: this needs to be per-db, not global */
+const struct ovsdb_table *rbac_roles_table;
+
+void
+ovsdb_rbac_set_roles(const struct ovsdb_table *table)
+{
+    rbac_roles_table = table;
+}
+
+static const struct ovsdb_row *
+ovsdb_find_row_by_string_key(const struct ovsdb_table *table,
+                             const char *column_name,
+                             const char *key)
+{
+    const struct ovsdb_column *column;
+    const struct ovsdb_row *row;
+
+    column = ovsdb_table_schema_get_column(table->schema, column_name);
+
+    if (column) {
+        HMAP_FOR_EACH (row, hmap_node, &table->rows) {
+            const struct ovsdb_datum *datum;
+            size_t i;
+
+            datum = &row->fields[column->index];
+            for (i = 0; i < datum->n; i++) {
+                if (datum->keys[i].string[0] &&
+                    !strcmp(key, datum->keys[i].string)) {
+                    return row;
+                }
+            }
+        }
+    }
+    return NULL;
+}
+
+static const struct ovsdb_row *
+ovsdb_rbac_lookup_perms(const char *role, const char *table)
+{
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+    const struct ovsdb_row *role_row, *perm_row;
+    const struct ovsdb_column *column;
+
+    /* Lookup role in roles table */
+    role_row = ovsdb_find_row_by_string_key(rbac_roles_table, "name", role);
+    if (!role_row) {
+        VLOG_INFO_RL(&rl, "rbac: role \"%s\" not found in rbac roles table",
+                     role);
+        return NULL;
+    }
+
+    /* Find row in permissions column for table from "permissions" column */
+    column = ovsdb_table_schema_get_column(role_row->table->schema,
+                                           "permissions");
+    if (!column) {
+        VLOG_INFO_RL(&rl, "rbac: \"permissions\" column not present in rbac "
+                  "roles table");
+        return NULL;
+    }
+    perm_row = ovsdb_util_read_map_string_uuid_column(role_row, "permissions",
+                                                      table);
+
+    return perm_row;
+}
+
+static bool
+ovsdb_rbac_authorized(const struct ovsdb_row *perms,
+                      const char *id,
+                      const struct ovsdb_row *row)
+{
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+    const struct ovsdb_datum *datum;
+    size_t i;
+
+    datum = ovsdb_util_get_datum(CONST_CAST(struct ovsdb_row *, perms),
+                                 "authorization",
+                                 OVSDB_TYPE_STRING, OVSDB_TYPE_VOID, UINT_MAX);
+
+    if (!datum) {
+        VLOG_INFO_RL(&rl, "rbac: error reading authorization column");
+        return false;
+    }
+
+    for (i = 0; i < datum->n; i++) {
+        char *name = datum->keys[i].string;
+        const char *value = NULL;
+        bool is_map;
+
+        if (name[0] == '\0') {
+            /* empty string means all are authorized */
+            return true;
+        }
+
+        is_map = strchr(name, ':') != NULL;
+
+        if (is_map) {
+            char *tmp = xstrdup(name);
+            char *col_name, *key, *save_ptr = NULL;
+            col_name = strtok_r(name, ":", &save_ptr);
+            key = strtok_r(NULL, ":", &save_ptr);
+
+            if (col_name && key) {
+                value = ovsdb_util_read_map_string_column(row, col_name, key);
+            }
+            free(tmp);
+        } else {
+            ovsdb_util_read_string_column(row, name, &value);
+        }
+        if (value && !strcmp(value, id)) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+bool
+ovsdb_rbac_insert(const struct ovsdb_table *table,
+                  const char *role, const char *id)
+{
+    const struct ovsdb_table_schema *ts = table->schema;
+    const struct ovsdb_row *perms;
+    bool insdel;
+
+    if (!rbac_roles_table || !role || *role == '\0') {
+        return true;
+    }
+
+    if (!id) {
+        goto denied;
+    }
+
+    perms = ovsdb_rbac_lookup_perms(role, ts->name);
+
+    if (!perms) {
+        goto denied;
+    }
+
+    if (!ovsdb_util_read_bool_column(perms, "insert_delete", &insdel)) {
+        return false;
+    }
+
+    if (insdel) {
+        return true;
+    }
+
+denied:
+    return false;
+}
+
+struct rbac_delete_cbdata {
+    const struct ovsdb_table *table;
+    const struct ovsdb_row *perms;
+    const char *role;
+    const char *id;
+    bool permitted;
+};
+
+static bool
+rbac_delete_cb(const struct ovsdb_row *row, void *rd_)
+{
+    struct rbac_delete_cbdata *rd = rd_;
+    bool insdel;
+
+    if (!ovsdb_rbac_authorized(rd->perms, rd->id, row)) {
+        goto denied;
+    }
+
+    if (!ovsdb_util_read_bool_column(rd->perms, "insert_delete", &insdel)) {
+        goto denied;
+    }
+
+    if (!insdel) {
+        goto denied;
+    }
+    return true;
+
+denied:
+    rd->permitted = false;
+    return false;
+}
+
+bool
+ovsdb_rbac_delete(struct ovsdb_table *table,
+                  struct ovsdb_condition *condition,
+                  const char *role, const char *id)
+{
+    const struct ovsdb_table_schema *ts = table->schema;
+    const struct ovsdb_row *perms;
+    struct rbac_delete_cbdata rd;
+
+    if (!rbac_roles_table || !role || *role == '\0') {
+        return true;
+    }
+    if (!id) {
+        goto denied;
+    }
+
+    perms = ovsdb_rbac_lookup_perms(role, ts->name);
+
+    if (!perms) {
+        goto denied;
+    }
+
+    rd.permitted = true;
+    rd.perms = perms;
+    rd.table = table;
+    rd.role = role;
+    rd.id = id;
+
+    ovsdb_query(table, condition, rbac_delete_cb, &rd);
+
+    if (rd.permitted) {
+        return true;
+    }
+
+denied:
+    return false;
+}
+
+struct rbac_update_cbdata {
+    const struct ovsdb_table *table;
+    const struct ovsdb_column_set *columns; /* columns to be modified */
+    const struct ovsdb_datum *modifiable; /* modifiable column names */
+    const struct ovsdb_row *perms;
+    const char *role;
+    const char *id;
+    bool permitted;
+};
+
+static bool
+rbac_column_modification_permitted(const struct ovsdb_column *column,
+                                   const struct ovsdb_datum *modifiable)
+{
+    size_t i;
+
+    for (i = 0; i < modifiable->n; i++) {
+        char *name = modifiable->keys[i].string;
+
+        if (!strcmp(name, column->name)) {
+            return true;
+        }
+    }
+    return false;
+}
+
+static bool
+rbac_update_cb(const struct ovsdb_row *row, void *ru_)
+{
+    struct rbac_update_cbdata *ru = ru_;
+    size_t i;
+
+    if (!ovsdb_rbac_authorized(ru->perms, ru->id, row)) {
+        goto denied;
+    }
+
+    for (i = 0; i < ru->columns->n_columns; i++) {
+        const struct ovsdb_column *column = ru->columns->columns[i];
+
+        if (!rbac_column_modification_permitted(column, ru->modifiable)) {
+            goto denied;
+        }
+    }
+    return true;
+
+denied:
+    ru->permitted = false;
+    return false;
+}
+
+bool
+ovsdb_rbac_update(struct ovsdb_table *table,
+                  struct ovsdb_column_set *columns,
+                  struct ovsdb_condition *condition,
+                  const char *role, const char *id)
+{
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+    const struct ovsdb_table_schema *ts = table->schema;
+    const struct ovsdb_datum *datum;
+    const struct ovsdb_row *perms;
+    struct rbac_update_cbdata ru;
+
+    if (!rbac_roles_table || !role || *role == '\0') {
+        return true;
+    }
+    if (!id) {
+        goto denied;
+    }
+
+    perms = ovsdb_rbac_lookup_perms(role, ts->name);
+
+    if (!perms) {
+        goto denied;
+    }
+
+    datum = ovsdb_util_get_datum(CONST_CAST(struct ovsdb_row *, perms),
+                                 "update",
+                                 OVSDB_TYPE_STRING, OVSDB_TYPE_VOID, UINT_MAX);
+
+    if (!datum) {
+        VLOG_INFO_RL(&rl, "ovsdb_rbac_update: could not read \"update\" "
+                     "column");
+        goto denied;
+    }
+
+    ru.table = table;
+    ru.columns = columns;
+    ru.role = role;
+    ru.id = id;
+    ru.perms = perms;
+    ru.modifiable = datum;
+    ru.permitted = true;
+
+    ovsdb_query(table, condition, rbac_update_cb, &ru);
+
+    if (ru.permitted) {
+        return true;
+    }
+
+denied:
+    return false;
+}
+
+struct rbac_mutate_cbdata {
+    const struct ovsdb_table *table;
+    const struct ovsdb_mutation_set *mutations; /* columns to be mutated */
+    const struct ovsdb_datum *modifiable; /* modifiable column names */
+    const struct ovsdb_row *perms;
+    const char *role;
+    const char *id;
+    bool permitted;
+};
+
+static bool
+rbac_mutate_cb(const struct ovsdb_row *row, void *rm_)
+{
+    struct rbac_mutate_cbdata *rm = rm_;
+    size_t i;
+
+    if (!ovsdb_rbac_authorized(rm->perms, rm->id, row)) {
+        goto denied;
+    }
+
+    for (i = 0; i < rm->mutations->n_mutations; i++) {
+        const struct ovsdb_column *column = rm->mutations->mutations[i].column;
+
+        if (!rbac_column_modification_permitted(column, rm->modifiable)) {
+            goto denied;
+        }
+    }
+
+    return true;
+
+denied:
+    rm->permitted = false;
+    return false;
+}
+
+bool
+ovsdb_rbac_mutate(struct ovsdb_table *table,
+                  struct ovsdb_mutation_set *mutations,
+                  struct ovsdb_condition *condition,
+                  const char *role, const char *id)
+{
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+    const struct ovsdb_table_schema *ts = table->schema;
+    const struct ovsdb_datum *datum;
+    const struct ovsdb_row *perms;
+    struct rbac_mutate_cbdata rm;
+
+    if (!rbac_roles_table || !role || *role == '\0') {
+        return true;
+    }
+    if (!id) {
+        goto denied;
+    }
+
+    perms = ovsdb_rbac_lookup_perms(role, ts->name);
+
+    if (!perms) {
+        goto denied;
+    }
+
+    datum = ovsdb_util_get_datum(CONST_CAST(struct ovsdb_row *, perms),
+                                 "update",
+                                 OVSDB_TYPE_STRING, OVSDB_TYPE_VOID, UINT_MAX);
+
+    if (!datum) {
+        VLOG_INFO_RL(&rl, "ovsdb_rbac_mutate: could not read \"update\" "
+                     "column");
+        goto denied;
+    }
+
+    rm.table = table;
+    rm.mutations = mutations;
+    rm.role = role;
+    rm.id = id;
+    rm.perms = perms;
+    rm.modifiable = datum;
+    rm.permitted = true;
+
+    ovsdb_query(table, condition, rbac_mutate_cb, &rm);
+
+    if (rm.permitted) {
+        return true;
+    }
+
+denied:
+    return false;
+}
diff --git a/ovsdb/rbac.h b/ovsdb/rbac.h
new file mode 100644
index 0000000..b5bb9e0
--- /dev/null
+++ b/ovsdb/rbac.h
@@ -0,0 +1,36 @@ 
+/*
+ * Copyright (c) 2017 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVSDB_RBAC_H
+#define OVSDB_RBAC_H 1
+struct ovsdb_mutation_set;
+
+bool ovsdb_rbac_insert(const struct ovsdb_table *,
+                       const char *, const char *);
+bool ovsdb_rbac_delete(struct ovsdb_table *table,
+                       struct ovsdb_condition *condition,
+                       const char *role, const char *id);
+bool ovsdb_rbac_update(struct ovsdb_table *table,
+                       struct ovsdb_column_set *columns,
+                       struct ovsdb_condition *condition,
+                       const char *role, const char *id);
+bool ovsdb_rbac_mutate(struct ovsdb_table *table,
+                       struct ovsdb_mutation_set *mutations,
+                       struct ovsdb_condition *condition,
+                       const char *role, const char *id);
+void ovsdb_rbac_set_roles(const struct ovsdb_table *);
+
+#endif /* ovsdb/rbac.h */
diff --git a/ovsdb/trigger.c b/ovsdb/trigger.c
index a859983..94b95ea 100644
--- a/ovsdb/trigger.c
+++ b/ovsdb/trigger.c
@@ -32,7 +32,8 @@  void
 ovsdb_trigger_init(struct ovsdb_session *session, struct ovsdb *db,
                    struct ovsdb_trigger *trigger,
                    struct json *request, long long int now,
-                   bool read_only)
+                   bool read_only, const char *role,
+                   const char *id)
 {
     trigger->session = session;
     trigger->db = db;
@@ -42,6 +43,8 @@  ovsdb_trigger_init(struct ovsdb_session *session, struct ovsdb *db,
     trigger->created = now;
     trigger->timeout_msec = LLONG_MAX;
     trigger->read_only = read_only;
+    trigger->role = role ? xstrdup(role): NULL;
+    trigger->id = id ? xstrdup(id): NULL;
     ovsdb_trigger_try(trigger, now);
 }
 
@@ -51,6 +54,8 @@  ovsdb_trigger_destroy(struct ovsdb_trigger *trigger)
     ovs_list_remove(&trigger->node);
     json_destroy(trigger->request);
     json_destroy(trigger->result);
+    free(trigger->role);
+    free(trigger->id);
 }
 
 bool
@@ -114,6 +119,7 @@  ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
 {
     t->result = ovsdb_execute(t->db, t->session,
                               t->request, t->read_only,
+                              t->role, t->id,
                               now - t->created, &t->timeout_msec);
     if (t->result) {
         ovsdb_trigger_complete(t);
diff --git a/ovsdb/trigger.h b/ovsdb/trigger.h
index c8474a4..90246a4 100644
--- a/ovsdb/trigger.h
+++ b/ovsdb/trigger.h
@@ -30,12 +30,15 @@  struct ovsdb_trigger {
     long long int created;      /* Time created. */
     long long int timeout_msec; /* Max wait duration. */
     bool read_only;             /* Database is in read only mode. */
+    char *role;                 /* Role, for role-based access controls. */
+    char *id;                   /* ID, for role-based access controls. */
 };
 
 void ovsdb_trigger_init(struct ovsdb_session *, struct ovsdb *,
                         struct ovsdb_trigger *,
                         struct json *request, long long int now,
-                        bool read_only);
+                        bool read_only, const char *role,
+                        const char *id);
 void ovsdb_trigger_destroy(struct ovsdb_trigger *);
 
 bool ovsdb_trigger_is_complete(const struct ovsdb_trigger *);
diff --git a/tests/automake.mk b/tests/automake.mk
index c6bd120..0c28c47 100644
--- a/tests/automake.mk
+++ b/tests/automake.mk
@@ -82,6 +82,7 @@  TESTSUITE_AT = \
 	tests/ovsdb-monitor.at \
 	tests/ovsdb-idl.at \
 	tests/ovsdb-lock.at \
+	tests/ovsdb-rbac.at \
 	tests/ovs-vsctl.at \
 	tests/ovs-xapi-sync.at \
 	tests/stp.at \
diff --git a/tests/ovsdb-rbac.at b/tests/ovsdb-rbac.at
new file mode 100644
index 0000000..84b2db1
--- /dev/null
+++ b/tests/ovsdb-rbac.at
@@ -0,0 +1,253 @@ 
+AT_BANNER([OVSDB -- ovsdb-server rbac])
+
+AT_SETUP([ovsdb-server/rbac 2])
+AT_KEYWORDS([ovsdb server rbac])
+AT_SKIP_IF([test "$HAVE_OPENSSL" = no])
+
+RBAC_PKIDIR="$(pwd)"
+RBAC_PKI="sh $abs_top_srcdir/utilities/ovs-pki.in --dir=$RBAC_PKIDIR/pki --log=$RBAC_PKIDIR/rbac-pki.log"
+$RBAC_PKI -B 1024 init
+$RBAC_PKI -B 1024 req+sign ovsdb-server switch
+$RBAC_PKI -B 1024 req+sign client-1 switch
+$RBAC_PKI -B 1024 req+sign client-2 switch
+
+AT_DATA([schema],
+  [[{"name": "mydb",
+     "tables": {
+       "Root": {
+         "columns": {
+           "connections": {
+             "type": {
+               "key": {"type": "uuid", "refTable": "Connection"},
+               "min": 0,
+               "max": "unlimited"}}},
+          "isRoot": true},
+       "Connection": {
+         "columns": {
+           "target": {
+             "type": "string"},
+           "role": {
+             "type": "string"}}},
+        "RBAC_Role": {
+            "columns": {
+                "name": {"type": "string"},
+                "permissions": {
+                    "type": {"key": {"type": "string"},
+                             "value": {"type": "uuid",
+                                       "refTable": "RBAC_Permission",
+                                       "refType": "weak"},
+                                     "min": 0, "max": "unlimited"}}},
+            "isRoot": true},
+        "RBAC_Permission": {
+            "columns": {
+                "table": {"type": "string"},
+                "authorization": {"type": {"key": "string",
+                                           "min": 0,
+                                           "max": "unlimited"}},
+                "insert_delete": {"type": "boolean"},
+                "update" : {"type": {"key": "string",
+                                     "min": 0,
+                                     "max": "unlimited"}}},
+            "isRoot": true},
+       "fixed_colors": {
+         "columns": {
+           "name": {"type": "string"},
+           "value": {"type": "integer"}},
+         "indexes": [["name"]],
+         "isRoot": true},
+       "user_colors": {
+         "columns": {
+           "creator": {"type": "string"},
+           "name": {"type": "string"},
+           "value": {"type": "integer"}},
+         "indexes": [["name"]],
+         "isRoot": true}
+    },
+     "version": "5.1.3",
+     "cksum": "12345678 9"
+}
+]])
+
+AT_CHECK([ovsdb-tool create db schema], [0], [ignore], [ignore])
+AT_CHECK(
+  [[ovsdb-tool transact db \
+     '["mydb",
+       {"op": "insert",
+        "table": "Root",
+        "row": {
+          "connections": ["set", [["named-uuid", "x"]]]}},
+       {"op": "insert",
+        "table": "Connection",
+        "uuid-name": "x",
+        "row": {"target": "pssl:0:127.0.0.1",
+                "role": "testrole"}},
+       {"op": "insert",
+        "table": "fixed_colors",
+        "row": {"name": "red",
+                "value": '16711680'}},
+       {"op": "insert",
+        "table": "RBAC_Role",
+        "row": {"name": "testrole",
+                "permissions": ["map", [["user_colors", ["named-uuid", "y"]]]]}},
+       {"op": "insert",
+        "table": "RBAC_Permission",
+        "uuid-name": "y",
+        "row": {"authorization": "creator",
+                "insert_delete": true,
+                "table": "fixed_colors",
+                "update": ["set", ["name", "value"]]}}
+]']], [0], [ignore], [ignore])
+
+AT_CHECK([ovsdb-server --log-file --detach --no-chdir --pidfile --remote=db:mydb,Root,connections \
+        --private-key=$RBAC_PKIDIR/ovsdb-server-privkey.pem \
+        --certificate=$RBAC_PKIDIR/ovsdb-server-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        --rbac=db:mydb,RBAC_Role \
+        db], [0], [ignore], [ignore])
+PARSE_LISTENING_PORT([ovsdb-server.log], [SSL_PORT])
+
+# Test 1:
+# Attempt to insert a row into the "fixed_colors" table.  This should
+# fail as there are no permissions for role "testrole" for this table.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-1-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-1-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "insert",
+          "table": "fixed_colors",
+          "row": {"name": "chartreuse", "value": '8388352'}}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"details":"RBAC rules for client \"client-1\" role \"testrole\" prohibit row insertion into table \"fixed_colors\".","error":"permission error"}]]
+], [ignore])
+
+# Test 2:
+# Attempt to insert a row into the "user_colors" table.  This should
+# succeed since role "testrole" has permissions for this table that
+# allow row insertion.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-1-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-1-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "insert",
+          "table": "user_colors",
+          "row": {"creator": "client-1", "name": "chartreuse", "value": '8388352'}}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"uuid":["uuid","<0>"]}]]
+], [ignore])
+
+# Test 3:
+# Attempt to update a column in the "user_colors" table.  This should
+# succeed since role "testrole" has permissions for this table that
+# allow update of the "value" column when ID is equal to the value in
+# the "creator" column.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-1-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-1-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "update",
+          "table": "user_colors",
+          "where": [["name", "==", "chartreuse"]],
+          "row": {"value": '8388353'}}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"count":1}]]
+], [ignore])
+
+# Test 4:
+# Attempt to update a column in the "user_colors" table.  Same as
+# previous test, but with a different client ID. This should fail
+# the RBAC authorization test because "client-2" does not match the
+# "creator" column for this row.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-2-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-2-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "update",
+          "table": "user_colors",
+          "where": [["name", "==", "chartreuse"]],
+          "row": {"value": '8388354'}}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"details":"RBAC rules for client \"client-2\" role \"testrole\" prohibit modification of table \"user_colors\".","error":"permission error"}]]
+], [ignore])
+
+# Test 5:
+# Attempt to mutate a column in the "user_colors" table.  This should
+# succeed since role "testrole" has permissions for this table that
+# allow update of the "value" column when ID is equal to the value in
+# the "creator" column.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-1-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-1-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "mutate",
+          "table": "user_colors",
+          "where": [["name", "==", "chartreuse"]],
+          "mutations": [["value", "+=", '10']]}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"count":1}]]
+], [ignore])
+
+# Test 6:
+# Attempt to mutate a column in the "user_colors" table.  Same as
+# previous test, but with a different client ID. This should fail
+# the RBAC authorization test because "client-2" does not match the
+# "creator" column for this row.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-2-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-2-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "mutate",
+          "table": "user_colors",
+          "where": [["name", "==", "chartreuse"]],
+          "mutations": [["value", "+=", '10']]}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"details":"RBAC rules for client \"client-2\" role \"testrole\" prohibit mutate operation on table \"user_colors\".","error":"permission error"}]]
+], [ignore])
+
+# Test 7:
+# Attempt to delete a row from the "user_colors" table. This should fail
+# the RBAC authorization test because "client-2" does not match the
+# "creator" column for this row.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-2-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-2-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "delete",
+          "table": "user_colors",
+          "where": [["name", "==", "chartreuse"]]}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"details":"RBAC rules for client \"client-2\" role \"testrole\" prohibit row deletion from table \"user_colors\".","error":"permission error"}]]
+], [ignore])
+
+# Test 8:
+# Attempt to delete a row from the "user_colors" table. This should pass
+# the RBAC authorization test because "client-1" does matches the
+# "creator" column for this row.
+AT_CHECK([ovsdb-client transact ssl:127.0.0.1:$SSL_PORT \
+        --private-key=$RBAC_PKIDIR/client-1-privkey.pem \
+        --certificate=$RBAC_PKIDIR/client-1-cert.pem \
+        --ca-cert=$RBAC_PKIDIR/pki/switchca/cacert.pem \
+        ['["mydb",
+         {"op": "delete",
+          "table": "user_colors",
+          "where": [["name", "==", "chartreuse"]]}
+         ]']], [0], [stdout], [ignore])
+cat stdout >> output
+AT_CHECK([${PERL} $srcdir/uuidfilt.pl stdout], [0], [[[{"count":1}]]
+], [ignore])
+
+OVSDB_SERVER_SHUTDOWN
+AT_CLEANUP
diff --git a/tests/ovsdb.at b/tests/ovsdb.at
index fe617a5..8a389b8 100644
--- a/tests/ovsdb.at
+++ b/tests/ovsdb.at
@@ -148,3 +148,4 @@  m4_include([tests/ovsdb-server.at])
 m4_include([tests/ovsdb-monitor.at])
 m4_include([tests/ovsdb-idl.at])
 m4_include([tests/ovsdb-lock.at])
+m4_include([tests/ovsdb-rbac.at])
diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c
index 09e4f0d..f8cf1cd 100644
--- a/tests/test-ovsdb.c
+++ b/tests/test-ovsdb.c
@@ -1435,7 +1435,7 @@  do_execute__(struct ovs_cmdl_context *ctx, bool ro)
         char *s;
 
         params = parse_json(ctx->argv[i]);
-        result = ovsdb_execute(db, NULL, params, ro,  0, NULL);
+        result = ovsdb_execute(db, NULL, params, ro,  NULL, NULL, 0, NULL);
         s = json_to_string(result, JSSF_SORT);
         printf("%s\n", s);
         free(s);
@@ -1513,7 +1513,8 @@  do_trigger(struct ovs_cmdl_context *ctx)
             json_destroy(params);
         } else {
             struct test_trigger *t = xmalloc(sizeof *t);
-            ovsdb_trigger_init(&session, db, &t->trigger, params, now, false);
+            ovsdb_trigger_init(&session, db, &t->trigger, params, now, false,
+                               NULL, NULL);
             t->number = number++;
             if (ovsdb_trigger_is_complete(&t->trigger)) {
                 do_trigger_dump(t, now, "immediate");