From patchwork Tue Mar 29 21:30:08 2016 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Cabrera Vega, Mario Alberto" X-Patchwork-Id: 603155 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from archives.nicira.com (archives.nicira.com [96.126.127.54]) by ozlabs.org (Postfix) with ESMTP id 3qZP8n6Fy3z9sBf for ; Wed, 30 Mar 2016 08:31:05 +1100 (AEDT) Received: from archives.nicira.com (localhost [127.0.0.1]) by archives.nicira.com (Postfix) with ESMTP id D0CF210603; Tue, 29 Mar 2016 14:30:53 -0700 (PDT) X-Original-To: dev@openvswitch.org Delivered-To: dev@openvswitch.org Received: from mx3v3.cudamail.com (mx3.cudamail.com [64.34.241.5]) by archives.nicira.com (Postfix) with ESMTPS id C8D85105FB for ; Tue, 29 Mar 2016 14:30:50 -0700 (PDT) Received: from bar6.cudamail.com (localhost [127.0.0.1]) by mx3v3.cudamail.com (Postfix) with ESMTPS id 604121621AC for ; Tue, 29 Mar 2016 15:30:50 -0600 (MDT) X-ASG-Debug-ID: 1459287047-0b323745eb01840001-byXFYA Received: from mx3-pf2.cudamail.com ([192.168.14.1]) by bar6.cudamail.com with ESMTP id YvAg4GnqzEVm78xr (version=TLSv1 cipher=DHE-RSA-AES256-SHA bits=256 verify=NO) for ; Tue, 29 Mar 2016 15:30:47 -0600 (MDT) X-Barracuda-Envelope-From: mario.cabrera@hpe.com X-Barracuda-RBL-Trusted-Forwarder: 192.168.14.1 Received: from unknown (HELO g1t5424.austin.hp.com) (15.216.225.54) by mx3-pf2.cudamail.com with ESMTPS (DHE-RSA-AES256-SHA encrypted); 29 Mar 2016 21:30:46 -0000 Received-SPF: none (mx3-pf2.cudamail.com: domain at hpe.com does not designate permitted sender hosts) X-Barracuda-Apparent-Source-IP: 15.216.225.54 X-Barracuda-RBL-IP: 15.216.225.54 Received: from G2W6310.americas.hpqcorp.net (g2w6310.austin.hp.com [16.197.64.52]) (using TLSv1.2 with cipher AES256-SHA (256/256 bits)) (No client certificate requested) by g1t5424.austin.hp.com (Postfix) with ESMTPS id 9176944 for ; Tue, 29 Mar 2016 21:30:45 +0000 (UTC) Received: from G2W6310.americas.hpqcorp.net (2002:10ef:4076::10ef:4076) by G2W6310.americas.hpqcorp.net (2002:10ef:4076::10ef:4076) with Microsoft SMTP Server (TLS) id 15.0.1076.9; Tue, 29 Mar 2016 21:30:16 +0000 Received: from G4W6303.americas.hpqcorp.net (16.210.26.228) by G2W6310.americas.hpqcorp.net (16.197.64.52) with Microsoft SMTP Server (TLS) id 15.0.1076.9 via Frontend Transport; Tue, 29 Mar 2016 21:30:16 +0000 Received: from G4W3298.americas.hpqcorp.net ([169.254.4.51]) by G4W6303.americas.hpqcorp.net ([16.210.26.228]) with mapi id 14.03.0169.001; Tue, 29 Mar 2016 21:30:09 +0000 X-CudaMail-Envelope-Sender: mario.cabrera@hpe.com From: "Cabrera Vega, Mario Alberto" To: "dev@openvswitch.org" X-CudaMail-MID: CM-V2-328055445 X-CudaMail-DTE: 032916 X-CudaMail-Originating-IP: 15.216.225.54 Thread-Topic: [PATCH 2/4] ovsdb: Introduce OVSDB replication feature X-ASG-Orig-Subj: [##CM-V2-328055445##][PATCH 2/4] ovsdb: Introduce OVSDB replication feature Thread-Index: AdGKAO2R/p4e+py2RpmGvpKpAvh+nw== Date: Tue, 29 Mar 2016 21:30:08 +0000 Message-ID: Accept-Language: en-US Content-Language: en-US X-MS-Has-Attach: X-MS-TNEF-Correlator: x-originating-ip: [16.210.48.19] MIME-Version: 1.0 X-GBUdb-Analysis: 0, 15.216.225.54, Ugly c=0 p=0 Source New X-MessageSniffer-Rules: 0-0-0-32767-c X-Barracuda-Connect: UNKNOWN[192.168.14.1] X-Barracuda-Start-Time: 1459287047 X-Barracuda-Encrypted: DHE-RSA-AES256-SHA X-Barracuda-URL: https://web.cudamail.com:443/cgi-mod/mark.cgi X-Barracuda-BRTS-Status: 1 X-Virus-Scanned: by bsmtpd at cudamail.com X-Barracuda-Spam-Score: 0.10 X-Barracuda-Spam-Status: No, SCORE=0.10 using per-user scores of TAG_LEVEL=3.5 QUARANTINE_LEVEL=1000.0 KILL_LEVEL=4.0 tests=HTML_MESSAGE, RDNS_NONE X-Barracuda-Spam-Report: Code version 3.2, rules version 3.2.3.28278 Rule breakdown below pts rule name description ---- ---------------------- -------------------------------------------------- 0.00 HTML_MESSAGE BODY: HTML included in message 0.10 RDNS_NONE Delivered to trusted network by a host with no rDNS X-Content-Filtered-By: Mailman/MimeDel 2.1.16 Subject: [ovs-dev] [PATCH 2/4] ovsdb: Introduce OVSDB replication feature X-BeenThere: dev@openvswitch.org X-Mailman-Version: 2.1.16 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@openvswitch.org Sender: "dev" Replication is enabled by using the following option when starting the database server: --sync-from=server Where 'server' can take any form described in the ovsdb-client(1) manpage as an active connection. If this option is specified, the replication process is immediately started. Signed-off-by: Mario Cabrera --- ovsdb/automake.mk | 6 +- ovsdb/ovsdb-server.1.in | 3 + ovsdb/ovsdb-server.c | 46 ++-- ovsdb/replication-syn.man | 2 + ovsdb/replication.c | 597 ++++++++++++++++++++++++++++++++++++++++++++++ ovsdb/replication.h | 39 +++ ovsdb/replication.man | 8 + tests/ovsdb-server.at | 51 ++++ 8 files changed, 725 insertions(+), 27 deletions(-) create mode 100644 ovsdb/replication-syn.man create mode 100644 ovsdb/replication.c create mode 100644 ovsdb/replication.h create mode 100644 ovsdb/replication.man AT_CLEANUP]) EXECUTION_EXAMPLES + +AT_BANNER([OVSDB -- ovsdb-server replication (TCP IPv4 sockets)]) + +# OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS]) +# +# Creates two databases with the given SCHEMA, and starts an ovsdb-server on +# each database. +# Runs each of the TRANSACTIONS (which should be a quoted list of +# quoted strings) against one of the servers with ovsdb-client one at a +# time. The server replicates its database to the other ovsdb-server. +# +# Checks that the dump of both databases are the same. +# +# TITLE is provided to AT_SETUP and KEYWORDS to AT_KEYWORDS. +m4_define([OVSDB_CHECK_EXECUTION], + [AT_SETUP([$1]) + AT_KEYWORDS([ovsdb server tcp replication $5]) + $2 > schema + AT_CHECK([ovsdb-tool create db1 schema], [0], [stdout], [ignore]) + AT_CHECK([ovsdb-tool create db2 schema], [0], [stdout], [ignore]) + + AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server1.log --pidfile="`pwd`"/pid --remote=ptcp:0:127.0.0.1 --unixctl="`pwd`"/unixctl db1], [0], [ignore], [ignore]) + PARSE_LISTENING_PORT([ovsdb-server1.log], [TCP_PORT1]) + + AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server2.log --pidfile="`pwd`"/pid2 --remote=ptcp:0:127.0.0.1 --unixctl="`pwd`"/unixctl2 --sync-from=tcp:127.0.0.1:$TCP_PORT1 db2], [0], [ignore], [ignore]) + PARSE_LISTENING_PORT([ovsdb-server2.log], [TCP_PORT2]) + + m4_foreach([txn], [$3], + [AT_CHECK([ovsdb-client transact tcp:127.0.0.1:$TCP_PORT1 'txn'; sleep 2], [0], [stdout], [ignore], + [test ! -e pid || kill `cat pid`; test ! -e pid2 || kill `cat pid2`]) + ]) + + AT_CHECK([ovsdb-client dump tcp:127.0.0.1:$TCP_PORT1], [0], [stdout], [ignore], + [test ! -e pid || kill `cat pid`; test ! -e pid2 || kill `cat pid2`]) + cat stdout >> dump1 + AT_CHECK([ovsdb-client dump tcp:127.0.0.1:$TCP_PORT2], [0], [stdout], [ignore], + [test ! -e pid || kill `cat pid`; test ! -e pid2 || kill `cat pid2`]) + cat stdout >> dump2 + + AT_CHECK([diff dump1 dump2], [0], [], [ignore], + [test ! -e pid || kill `cat pid`; test ! -e pid2 || kill `cat pid2`]) + OVSDB_SERVER_SHUTDOWN + OVSDB_SERVER_SHUTDOWN2 + AT_CLEANUP]) + +EXECUTION_EXAMPLES -- 1.9.1 diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk index 7db6fea..099ed3c 100644 --- a/ovsdb/automake.mk +++ b/ovsdb/automake.mk @@ -24,6 +24,8 @@ ovsdb_libovsdb_la_SOURCES = \ ovsdb/monitor.h \ ovsdb/query.c \ ovsdb/query.h \ + ovsdb/replication.c \ + ovsdb/replication.h \ ovsdb/row.c \ ovsdb/row.h \ ovsdb/server.c \ @@ -42,7 +44,9 @@ pkgconfig_DATA += \ MAN_FRAGMENTS += \ ovsdb/remote-active.man \ - ovsdb/remote-passive.man + ovsdb/remote-passive.man \ + ovsdb/replication.man \ + ovsdb/replication-syn.man # ovsdb-tool bin_PROGRAMS += ovsdb/ovsdb-tool diff --git a/ovsdb/ovsdb-server.1.in b/ovsdb/ovsdb-server.1.in index 6c85729..1025ade 100644 --- a/ovsdb/ovsdb-server.1.in +++ b/ovsdb/ovsdb-server.1.in @@ -19,6 +19,7 @@ ovsdb\-server \- Open vSwitch database server .so lib/daemon-syn.man .so lib/service-syn.man .so lib/vlog-syn.man +.so ovsdb/replication-syn.man .so lib/ssl-syn.man .so lib/ssl-bootstrap-syn.man .so lib/ssl-peer-ca-cert-syn.man @@ -100,6 +101,8 @@ configured remotes. .so lib/service.man .SS "Logging Options" .so lib/vlog.man +.SS "Syncing Options" +.so ovsdb/replication.man .SS "Public Key Infrastructure Options" The options described below for configuring the SSL public key infrastructure accept a special syntax for obtaining their diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c index fa662b1..63dd209 100644 --- a/ovsdb/ovsdb-server.c +++ b/ovsdb/ovsdb-server.c @@ -42,6 +42,7 @@ #include "ovsdb-error.h" #include "poll-loop.h" #include "process.h" +#include "replication.h" #include "row.h" #include "simap.h" #include "shash.h" @@ -59,15 +60,7 @@ VLOG_DEFINE_THIS_MODULE(ovsdb_server); -struct db { - /* Initialized in main(). */ - char *filename; - struct ovsdb_file *file; - struct ovsdb *db; - - /* Only used by update_remote_status(). */ - struct ovsdb_txn *txn; -}; +struct db; /* SSL configuration. */ static char *private_key_file; @@ -75,6 +68,9 @@ static char *certificate_file; static char *ca_cert_file; static bool bootstrap_ca_cert; +/* Replication configuration. */ +static bool connect_to_remote_server; + static unixctl_cb_func ovsdb_server_exit; static unixctl_cb_func ovsdb_server_compact; static unixctl_cb_func ovsdb_server_reconnect; @@ -159,6 +155,10 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs, report_error_if_changed(reconfigure_ssl(all_dbs), &ssl_error); ovsdb_jsonrpc_server_run(jsonrpc); + if (connect_to_remote_server) { + replication_run(all_dbs); + } + SHASH_FOR_EACH(node, all_dbs) { struct db *db = node->data; ovsdb_trigger_run(db->db, time_msec()); @@ -170,9 +170,9 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs, } } - /* update Manager status(es) every 5 seconds */ + /* update Manager status(es) every 2.5 seconds */ if (time_msec() >= status_timer) { - status_timer = time_msec() + 5000; + status_timer = time_msec() + 2500; update_remote_status(jsonrpc, remotes, all_dbs); } @@ -350,6 +350,7 @@ main(int argc, char *argv[]) sset_destroy(&remotes); sset_destroy(&db_filenames); unixctl_server_destroy(unixctl); + disconnect_remote_server(); if (run_process && process_exited(run_process)) { int status = process_status(run_process); @@ -433,21 +434,6 @@ open_db(struct server_config *config, const char *filename) return error; } -static const struct db * -find_db(const struct shash *all_dbs, const char *db_name) -{ - struct shash_node *node; - - SHASH_FOR_EACH(node, all_dbs) { - struct db *db = node->data; - if (!strcmp(db->db->schema->name, db_name)) { - return db; - } - } - - return NULL; -} - static char * OVS_WARN_UNUSED_RESULT parse_db_column__(const struct shash *all_dbs, const char *name_, char *name, @@ -1278,6 +1264,7 @@ parse_options(int *argcp, char **argvp[], OPT_RUN, OPT_BOOTSTRAP_CA_CERT, OPT_PEER_CA_CERT, + OPT_SYNC_FROM, VLOG_OPTION_ENUMS, DAEMON_OPTION_ENUMS }; @@ -1296,6 +1283,7 @@ parse_options(int *argcp, char **argvp[], {"private-key", required_argument, NULL, 'p'}, {"certificate", required_argument, NULL, 'c'}, {"ca-cert", required_argument, NULL, 'C'}, + {"sync-from", required_argument, NULL, OPT_SYNC_FROM}, {NULL, 0, NULL, 0}, }; char *short_options = ovs_cmdl_long_options_to_short_options(long_options); @@ -1356,6 +1344,11 @@ parse_options(int *argcp, char **argvp[], stream_ssl_set_peer_ca_cert_file(optarg); break; + case OPT_SYNC_FROM: + set_remote_ovsdb_server(optarg); + connect_to_remote_server = true; + break; + case '?': exit(EXIT_FAILURE); @@ -1382,6 +1375,7 @@ usage(void) stream_usage("JSON-RPC", true, true, true); daemon_usage(); vlog_usage(); + replication_usage(); printf("\nOther options:\n" " --run COMMAND run COMMAND as subprocess then exit\n" " --unixctl=SOCKET override default control socket name\n" diff --git a/ovsdb/replication-syn.man b/ovsdb/replication-syn.man new file mode 100644 index 0000000..adfd7c2 --- /dev/null +++ b/ovsdb/replication-syn.man @@ -0,0 +1,2 @@ +.IP "Syncing options:" +[\fB\-\-sync\-from=\fIserver\fR] diff --git a/ovsdb/replication.c b/ovsdb/replication.c new file mode 100644 index 0000000..d9e609e --- /dev/null +++ b/ovsdb/replication.c @@ -0,0 +1,597 @@ +/* + * (c) Copyright 2016 Hewlett Packard Enterprise Development LP + * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "replication.h" + +#include "condition.h" +#include "json.h" +#include "jsonrpc.h" +#include "ovsdb.h" +#include "ovsdb-error.h" +#include "query.h" +#include "row.h" +#include "stream.h" +#include "sset.h" +#include "svec.h" +#include "table.h" +#include "transaction.h" + +static char *remote_ovsdb_server; +static struct jsonrpc *rpc; +static struct sset monitored_tables = SSET_INITIALIZER(&monitored_tables); +static bool reset_dbs = true; + +static struct jsonrpc *open_jsonrpc(const char *server); +static struct ovsdb_error *check_jsonrpc_error(int error, + struct jsonrpc_msg **reply_); +static void fetch_dbs(struct jsonrpc *rpc, struct svec *dbs); +static struct ovsdb_schema *fetch_schema(struct jsonrpc *rpc, + const char *database); + +static void send_monitor_requests(struct shash *all_dbs); +static void add_monitored_table(struct ovsdb_table_schema *table, + struct json *monitor_requests); + +static void get_initial_db_state(const struct db *database); +static void reset_database(struct ovsdb *db, struct ovsdb_txn *txn); +static struct ovsdb_error *reset_databases(struct shash *all_dbs); + +static void check_for_notifications(struct shash *all_dbs); +static void process_notification(struct json *table_updates, + struct ovsdb *database); +static struct ovsdb_error *process_table_update(struct json *table_update, + const char *table_name, + struct ovsdb *database, + struct ovsdb_txn *txn); + +static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn, + const char *uuid, + struct ovsdb_table *table, + struct json *new); +static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn, + const char *uuid, + struct ovsdb_table *table); +static struct ovsdb_error *execute_update(struct ovsdb_txn *txn, + const char *uuid, + struct ovsdb_table *table, + struct json *new); + +void +replication_run(struct shash *all_dbs) +{ + if (sset_is_empty(&monitored_tables) && remote_ovsdb_server) { + /* Reset local databases. */ + if (reset_dbs) { + struct ovsdb_error *error = reset_databases(all_dbs); + if (!error) { + reset_dbs = false; + } + /* In case of success reseting the databases, + * return in order to notify monitors. */ + return; + } + + /* Open JSON-RPC. */ + jsonrpc_close(rpc); + rpc = open_jsonrpc(remote_ovsdb_server); + if (!rpc) { + return; + } + + /* Send monitor requests. */ + send_monitor_requests(all_dbs); + } + if (!sset_is_empty(&monitored_tables)) { + check_for_notifications(all_dbs); + } +} + +void +set_remote_ovsdb_server(const char *remote_server) +{ + remote_ovsdb_server = remote_server ? strdup(remote_server) : NULL; +} + +void +disconnect_remote_server(void) +{ + jsonrpc_close(rpc); + sset_destroy(&monitored_tables); + + if (remote_ovsdb_server) { + free(remote_ovsdb_server); + remote_ovsdb_server = NULL; + } +} + +const struct db * +find_db(const struct shash *all_dbs, const char *db_name) +{ + struct shash_node *node; + + SHASH_FOR_EACH(node, all_dbs) { + struct db *db = node->data; + if (!strcmp(db->db->schema->name, db_name)) { + return db; + } + } + + return NULL; +} + +static struct ovsdb_error * +reset_databases(struct shash *all_dbs) +{ + struct shash_node *db_node; + struct ovsdb_error *error = NULL; + + SHASH_FOR_EACH(db_node, all_dbs) { + struct db *db = db_node->data; + struct ovsdb_txn *txn = ovsdb_txn_create(db->db); + reset_database(db->db, txn); + error = ovsdb_txn_commit(txn, false); + } + + return error; +} + +static void +reset_database(struct ovsdb *db, struct ovsdb_txn *txn) +{ + struct shash_node *table_node; + + SHASH_FOR_EACH(table_node, &db->tables) { + struct ovsdb_table *table = table_node->data; + struct ovsdb_row *row; + + HMAP_FOR_EACH (row, hmap_node, &table->rows) { + ovsdb_txn_row_delete(txn, row); + } + } +} + +static struct jsonrpc * +open_jsonrpc(const char *server) +{ + struct stream *stream; + int error; + + error = stream_open_block(jsonrpc_stream_open(server, &stream, + DSCP_DEFAULT), &stream); + + return error ? NULL : jsonrpc_open(stream); +} + +static struct ovsdb_error * +check_jsonrpc_error(int error, struct jsonrpc_msg **reply_) +{ + struct jsonrpc_msg *reply = *reply_; + + if (error) { + return ovsdb_error("transaction failed", + "transaction returned error %d", + error); + } + + if (reply->error) { + return ovsdb_error("transaction failed", + "transaction returned error: %s", + json_to_string(reply->error, 0)); + } + return NULL; +} + +static void +fetch_dbs(struct jsonrpc *rpc, struct svec *dbs) +{ + struct jsonrpc_msg *request, *reply; + struct ovsdb_error *error; + size_t i; + + request = jsonrpc_create_request("list_dbs", json_array_create_empty(), + NULL); + + error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply), + &reply); + if (error) { + ovsdb_error_assert(error); + return; + } + + if (reply->result->type != JSON_ARRAY) { + ovsdb_error_assert(ovsdb_error("list-dbs failed", + "list_dbs response is not array")); + return; + } + + for (i = 0; i < reply->result->u.array.n; i++) { + const struct json *name = reply->result->u.array.elems[i]; + + if (name->type != JSON_STRING) { + ovsdb_error_assert(ovsdb_error( + "list_dbs failed", + "list_dbs response %"PRIuSIZE" is not string", + i)); + } + svec_add(dbs, name->u.string); + } + jsonrpc_msg_destroy(reply); + svec_sort(dbs); +} + +static struct ovsdb_schema * +fetch_schema(struct jsonrpc *rpc, const char *database) +{ + struct jsonrpc_msg *request, *reply; + struct ovsdb_schema *schema; + struct ovsdb_error *error; + + request = jsonrpc_create_request("get_schema", + json_array_create_1( + json_string_create(database)), + NULL); + error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply), + &reply); + if (error) { + jsonrpc_msg_destroy(reply); + ovsdb_error_assert(error); + return NULL; + } + + error = ovsdb_schema_from_json(reply->result, &schema); + if (error) { + jsonrpc_msg_destroy(reply); + ovsdb_error_assert(error); + return NULL; + } + jsonrpc_msg_destroy(reply); + + return schema; +} + +static void +send_monitor_requests(struct shash *all_dbs) +{ + const char *db_name; + struct svec dbs; + size_t i; + + svec_init(&dbs); + fetch_dbs(rpc, &dbs); + SVEC_FOR_EACH (i, db_name, &dbs) { + const struct db *database = find_db(all_dbs, db_name); + + if (database) { + struct ovsdb_schema *local_schema, *remote_schema; + + local_schema = database->db->schema; + remote_schema = fetch_schema(rpc, db_name); + if (ovsdb_schema_equal(local_schema, remote_schema)) { + struct jsonrpc_msg *request; + struct json *monitor, *monitor_request; + + monitor_request = json_object_create(); + size_t n = shash_count(&local_schema->tables); + const struct shash_node **nodes = shash_sort( + &local_schema->tables); + + for (int j = 0; j < n; j++) { + struct ovsdb_table_schema *table = nodes[j]->data; + add_monitored_table(table, monitor_request); + } + free(nodes); + + /* Send monitor request. */ + monitor = json_array_create_3( + json_string_create(db_name), + json_string_create(db_name), + monitor_request); + request = jsonrpc_create_request("monitor", monitor, NULL); + jsonrpc_send(rpc, request); + get_initial_db_state(database); + } + ovsdb_schema_destroy(remote_schema); + } + } + svec_destroy(&dbs); +} + +static void +get_initial_db_state(const struct db *database) +{ + struct jsonrpc_msg *msg; + + jsonrpc_recv_block(rpc, &msg); + + if (msg->type == JSONRPC_REPLY) { + process_notification(msg->result, database->db); + } +} + +static void +add_monitored_table(struct ovsdb_table_schema *table, + struct json *monitor_request) +{ + struct json *monitor_request_array; + + sset_add(&monitored_tables, table->name); + + monitor_request_array = json_array_create_empty(); + json_array_add(monitor_request_array, json_object_create()); + + json_object_put(monitor_request, table->name, monitor_request_array); +} + +static void +check_for_notifications(struct shash *all_dbs) +{ + struct jsonrpc_msg *msg; + int error; + + error = jsonrpc_recv(rpc, &msg); + if (error == EAGAIN) { + return; + } else if (error) { + rpc = open_jsonrpc(remote_ovsdb_server); + if (!rpc) { + /* Remote server went down. */ + disconnect_remote_server(); + } + jsonrpc_msg_destroy(msg); + return; + } + if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) { + jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params), + msg->id)); + } else if (msg->type == JSONRPC_NOTIFY + && !strcmp(msg->method, "update")) { + struct json *params = msg->params; + if (params->type == JSON_ARRAY + && params->u.array.n == 2) { + char *db_name = params->u.array.elems[0]->u.string; + const struct db *database = find_db(all_dbs, db_name); + if (database) { + process_notification(params->u.array.elems[1], database->db); + } + } + } + jsonrpc_msg_destroy(msg); + jsonrpc_run(rpc); +} + +static void +process_notification(struct json *table_updates, struct ovsdb *database) +{ + struct ovsdb_error *error; + struct ovsdb_txn *txn; + + if (table_updates->type != JSON_OBJECT) { + sset_clear(&monitored_tables); + return; + } + + txn = ovsdb_txn_create(database); + error = NULL; + + /* Process each table update. */ + struct shash_node *node; + SHASH_FOR_EACH(node, json_object(table_updates)) { + struct json *table_update = node->data; + if (table_update) { + error = process_table_update(table_update, node->name, database, txn); + if (error) { + break; + } + } + } + + if (!error){ + /* Commit transaction. */ + error = ovsdb_txn_commit(txn, false); + if (error) { + ovsdb_error_assert(error); + sset_clear(&monitored_tables); + } + } else { + ovsdb_txn_abort(txn); + ovsdb_error_assert(error); + sset_clear(&monitored_tables); + } + + ovsdb_error_destroy(error); +} + +static struct ovsdb_error * +process_table_update(struct json *table_update, const char *table_name, + struct ovsdb *database, struct ovsdb_txn *txn) +{ + struct shash_node *node; + struct ovsdb_table *table; + struct ovsdb_error *error; + + if (table_update->type != JSON_OBJECT) { + error = ovsdb_error("Not a JSON object", + " for table is not object"); + } + + table = ovsdb_get_table(database, table_name); + error = NULL; + + SHASH_FOR_EACH (node, json_object(table_update)) { + struct json *row_update = node->data; + struct json *old, *new; + + if (row_update->type != JSON_OBJECT) { + error = ovsdb_error("NOt a JSON object", + " is not object"); + break; + } + old = shash_find_data(json_object(row_update), "old"); + new = shash_find_data(json_object(row_update), "new"); + + if (!old) { + error = execute_insert(txn, node->name, table, new); + } else{ + if (!new) { + error = execute_delete(txn, node->name, table); + } else { + error = execute_update(txn, node->name, table, new); + } + } + } + return error; +} + +static struct ovsdb_error * +execute_insert(struct ovsdb_txn *txn, const char *uuid, + struct ovsdb_table *table, struct json *json_row) +{ + struct ovsdb_row *row = NULL; + struct uuid row_uuid; + struct ovsdb_error *error; + + row = ovsdb_row_create(table); + error = ovsdb_row_from_json(row, json_row, NULL, NULL); + if (!error) { + /* Add UUID to row. */ + uuid_from_string(&row_uuid, uuid); + *ovsdb_row_get_uuid_rw(row) = row_uuid; + ovsdb_txn_row_insert(txn, row); + } else { + ovsdb_row_destroy(row); + } + + return error; +} + +struct delete_row_cbdata { + size_t n_matches; + const struct ovsdb_table *table; + struct ovsdb_txn *txn; +}; + +static bool +delete_row_cb(const struct ovsdb_row *row, void *dr_) +{ + struct delete_row_cbdata *dr = dr_; + + dr->n_matches++; + ovsdb_txn_row_delete(dr->txn, row); + + return true; +} + +static struct ovsdb_error * +execute_delete(struct ovsdb_txn *txn, const char *uuid, + struct ovsdb_table *table) +{ + const struct json *where; + struct ovsdb_error *error; + struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER; + char where_string[UUID_LEN+29]; + + if (!table) { + return OVSDB_BUG("null table"); + } + + snprintf(where_string, sizeof where_string, "%s%s%s", + "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]"); + + where = json_from_string(where_string); + error = ovsdb_condition_from_json(table->schema, where, NULL, &condition); + if (!error) { + struct delete_row_cbdata dr; + + dr.n_matches = 0; + dr.table = table; + dr.txn = txn; + ovsdb_query(table, &condition, delete_row_cb, &dr); + } + + ovsdb_condition_destroy(&condition); + return error; +} + +struct update_row_cbdata { + size_t n_matches; + struct ovsdb_txn *txn; + const struct ovsdb_row *row; + const struct ovsdb_column_set *columns; +}; + +static bool +update_row_cb(const struct ovsdb_row *row, void *ur_) +{ + struct update_row_cbdata *ur = ur_; + + ur->n_matches++; + if (!ovsdb_row_equal_columns(row, ur->row, ur->columns)) { + ovsdb_row_update_columns(ovsdb_txn_row_modify(ur->txn, row), + ur->row, ur->columns); + } + + return true; +} + +static struct ovsdb_error * +execute_update(struct ovsdb_txn *txn, const char *uuid, + struct ovsdb_table *table, struct json *json_row) +{ + struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER; + struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER; + struct update_row_cbdata ur; + struct ovsdb_row *row; + struct ovsdb_error *error; + const struct json *where; + char where_string[UUID_LEN+29]; + + snprintf(where_string, sizeof where_string, "%s%s%s", + "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]"); + where = json_from_string(where_string); + + row = ovsdb_row_create(table); + error = ovsdb_row_from_json(row, json_row, NULL, &columns); + if (!error) { + error = ovsdb_condition_from_json(table->schema, where, NULL, + &condition); + } + if (!error) { + ur.n_matches = 0; + ur.txn = txn; + ur.row = row; + ur.columns = &columns; + ovsdb_query(table, &condition, update_row_cb, &ur); + } + + ovsdb_row_destroy(row); + ovsdb_column_set_destroy(&columns); + ovsdb_condition_destroy(&condition); + + return error; +} + +void +replication_usage(void) +{ + printf("\n\ +Syncing options:\n\ + --sync-from=SERVER sync DATABASE from remote SERVER\n\ + --sync-exclude-tables=DB:TABLE,...\n\ + exclude the TABLE in DB from syncing\n"); +} diff --git a/ovsdb/replication.h b/ovsdb/replication.h new file mode 100644 index 0000000..f9b7d63 --- /dev/null +++ b/ovsdb/replication.h @@ -0,0 +1,39 @@ +/* + * (c) Copyright 2016 Hewlett Packard Enterprise Development LP + * Copyright (c) 2009, 2010, 2012, 2013 Nicira, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef REPLICATION_H +#define REPLICATION_H 1 + +#include "shash.h" + +struct db { + /* Initialized in main(). */ + char *filename; + struct ovsdb_file *file; + struct ovsdb *db; + + /* Only used by update_remote_status(). */ + struct ovsdb_txn *txn; +}; + +void replication_run(struct shash *dbs); +void set_remote_ovsdb_server(const char *remote_server); +void disconnect_remote_server(void); +const struct db *find_db(const struct shash *all_dbs, const char *db_name); +void replication_usage(void); + +#endif /* ovsdb/replication.h */ diff --git a/ovsdb/replication.man b/ovsdb/replication.man new file mode 100644 index 0000000..26420bc --- /dev/null +++ b/ovsdb/replication.man @@ -0,0 +1,8 @@ +The following options allow \fBovsdb\-server\fR to synchronize its databases +with another running \fBovsdb\-server\fR. +.TP +\fB\-\-sync\-from=\fIserver\fR +Causes \fBovsdb\-server\fR to synchronize its databases with the databases in +\fIserver\fR. Every transaction committed by \fIserver\fR will be replicated +to \fBovsdb\-server\fR. \fIserver\fR is an active connection method in one of +the forms documented in \fBovsdb\-client(1). diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at index c869d6f..81209af 100644 --- a/tests/ovsdb-server.at +++ b/tests/ovsdb-server.at @@ -5,6 +5,11 @@ m4_define([OVSDB_SERVER_SHUTDOWN], AT_CHECK([ovs-appctl -t "`pwd`"/unixctl -e exit], [0], [ignore], [ignore]) OVS_WAIT_WHILE([kill -0 `cat savepid`], [kill `cat savepid`])]) +m4_define([OVSDB_SERVER_SHUTDOWN2], + [cp pid2 savepid2 + AT_CHECK([ovs-appctl -t "`pwd`"/unixctl2 -e exit], [0], [ignore], [ignore]) + OVS_WAIT_WHILE([kill -0 `cat savepid2`], [kill `cat savepid2`])]) + # OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS]) # # Creates a database with the given SCHEMA, starts an ovsdb-server on @@ -963,3 +968,49 @@ m4_define([OVSDB_CHECK_EXECUTION],