From patchwork Thu Feb 28 17:15:16 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Han Zhou X-Patchwork-Id: 1049632 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: ozlabs.org; spf=pass (mailfrom) smtp.mailfrom=openvswitch.org (client-ip=140.211.169.12; helo=mail.linuxfoundation.org; envelope-from=ovs-dev-bounces@openvswitch.org; receiver=) Authentication-Results: ozlabs.org; dmarc=fail (p=none dis=none) header.from=gmail.com Authentication-Results: ozlabs.org; dkim=fail reason="signature verification failed" (2048-bit key; unprotected) header.d=gmail.com header.i=@gmail.com header.b="ciEGF2SL"; dkim-atps=neutral Received: from mail.linuxfoundation.org (mail.linuxfoundation.org [140.211.169.12]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 449KCx4kMvz9s4V for ; Fri, 1 Mar 2019 04:24:13 +1100 (AEDT) Received: from mail.linux-foundation.org (localhost [127.0.0.1]) by mail.linuxfoundation.org (Postfix) with ESMTP id 28573AB7B; Thu, 28 Feb 2019 17:23:07 +0000 (UTC) X-Original-To: dev@openvswitch.org Delivered-To: ovs-dev@mail.linuxfoundation.org Received: from smtp1.linuxfoundation.org (smtp1.linux-foundation.org [172.17.192.35]) by mail.linuxfoundation.org (Postfix) with ESMTPS id 38283AAF8 for ; Thu, 28 Feb 2019 17:15:38 +0000 (UTC) X-Greylist: whitelisted by SQLgrey-1.7.6 Received: from mail-pf1-f195.google.com (mail-pf1-f195.google.com [209.85.210.195]) by smtp1.linuxfoundation.org (Postfix) with ESMTPS id 5A65782F for ; Thu, 28 Feb 2019 17:15:36 +0000 (UTC) Received: by mail-pf1-f195.google.com with SMTP id g6so10008496pfh.13 for ; Thu, 28 Feb 2019 09:15:36 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=dKDbMUikwPsOb+ph2gxTtuys+UB8v+NFczPPriWGHkU=; b=ciEGF2SLU3QzbsYK3e8LBGz9OPjvQr5I23WXw2nAYEA392rM1xLhQmMH2P17VCdWSp lQD4uKvoMj0VwMPUfnRXL3xs0JaxrKxsjJxA92DF5WwqTrWUKxaNcfMQWI4LUqZm20Mu r6ibIx2gW/ZVjJgYUfPLeBz3IqbVp5A3xCllakcqSTsQz5P4q/4/TBFoVWov6H0I2wG+ bp5pIHjau7INlbKGVB2ndMRyM38uXXbz/sb5I195BjhP7RP62hF1SRELr4InE2HP/98F J441O7NRIBDDvqT4KIH8zpUkXgTSarmw1CbRU3TWSbmRmYs2BY3SjtWzi7VcgpfTu98p NNMw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=dKDbMUikwPsOb+ph2gxTtuys+UB8v+NFczPPriWGHkU=; b=kreEMHiI0U8vGp5/S2N6v25OcTG+pPI7lFGiUPj502/T6zLWrKRGOTtgm19xnNdxDa 3VWIudJLPlgBxQZRImDvb40dRkg5NT83sRxk6suMn0X1mhDIdz5maTODRcdq4PvKSRnx NCmIXjx2S7JqZLKY6qYMiAoAO2XiXwix0e7VxrONfZYufelToAvIO3xdJfFKb7eO2sz2 TVu9Y2l7cd4ny++75ipBEAgtOmtFlpBi9WLwZWQs/HTTD7c2Ed8xHPp1kN+P/EYwRE6s qU0XQeEe8sAMB5Rq4ZTZ72LwCrLMncXr3fe1xdrIT+jKkdhKSJ/7FkNR/mURZI5wZEsZ lyzw== X-Gm-Message-State: APjAAAWIoZFPY6lNTs+Hoe9Kr0utKGAQcll7fUqYTjKctnS+u5OYiASp 06qUHyGwWgMl+G7ahJM3K8VlOGow X-Google-Smtp-Source: APXvYqxXrdXqpedrJJV0A3DrTTLW9UnlYqosKhMUcFzvDxTTvpDGpoyiypAkmZ/LHjhrvNbqhKR/9w== X-Received: by 2002:a63:5b43:: with SMTP id l3mr201139pgm.298.1551374135329; Thu, 28 Feb 2019 09:15:35 -0800 (PST) Received: from localhost.localdomain.localdomain (c-76-21-108-74.hsd1.ca.comcast.net. [76.21.108.74]) by smtp.gmail.com with ESMTPSA id a4sm36972372pga.52.2019.02.28.09.15.34 (version=TLS1_2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Thu, 28 Feb 2019 09:15:34 -0800 (PST) From: Han Zhou X-Google-Original-From: Han Zhou To: dev@openvswitch.org Date: Thu, 28 Feb 2019 09:15:16 -0800 Message-Id: <1551374120-44287-2-git-send-email-hzhou8@ebay.com> X-Mailer: git-send-email 2.1.0 In-Reply-To: <1551374120-44287-1-git-send-email-hzhou8@ebay.com> References: <1551374120-44287-1-git-send-email-hzhou8@ebay.com> X-Spam-Status: No, score=-2.0 required=5.0 tests=BAYES_00,DKIM_SIGNED, DKIM_VALID, DKIM_VALID_AU, FREEMAIL_FROM, RCVD_IN_DNSWL_NONE autolearn=ham version=3.3.1 X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on smtp1.linux-foundation.org Subject: [ovs-dev] [PATCH v4 1/5] ovsdb-monitor: Refactor ovsdb monitor implementation. X-BeenThere: ovs-dev@openvswitch.org X-Mailman-Version: 2.1.12 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , MIME-Version: 1.0 Sender: ovs-dev-bounces@openvswitch.org Errors-To: ovs-dev-bounces@openvswitch.org From: Han Zhou Current ovsdb monitor maintains its own transaction version through an incremental integer and use it to identify changes starting from different version, and also use it to figure out if each set of changes should be flushed. In particular, it uses number 0 to represent that the change set contains all data for initial client population. It is a smart way but it prevents further extension of the monitoring mechanism to support future use case for clients to request changes starting from a given history point. This patch refactors the structures so that change sets are referenced directly through the pointer. It uses additional members such as init_change_set, new_change_set to indicates the specific change set explicitely, instead of through calculated version numbers based on implicite rules. At the same time, this patch provides better encapsulation for change set (composed of data in a list of tables), while still allowing traversing across change sets for a given table. Signed-off-by: Han Zhou --- ovsdb/jsonrpc-server.c | 18 +-- ovsdb/monitor.c | 417 ++++++++++++++++++++++++++----------------------- ovsdb/monitor.h | 10 +- 3 files changed, 236 insertions(+), 209 deletions(-) diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c index 77f15d9..f9b7c27 100644 --- a/ovsdb/jsonrpc-server.c +++ b/ovsdb/jsonrpc-server.c @@ -1216,8 +1216,7 @@ struct ovsdb_jsonrpc_monitor { struct ovsdb *db; struct json *monitor_id; struct ovsdb_monitor *dbmon; - uint64_t unflushed; /* The first transaction that has not been - flushed to the jsonrpc remote client. */ + struct ovsdb_monitor_change_set *change_set; enum ovsdb_monitor_version version; struct ovsdb_monitor_session_condition *condition;/* Session's condition */ }; @@ -1389,7 +1388,6 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db, if (version == OVSDB_MONITOR_V2) { m->condition = ovsdb_monitor_session_condition_create(); } - m->unflushed = 0; m->version = version; hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0)); m->monitor_id = json_clone(monitor_id); @@ -1436,7 +1434,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db, dbmon = ovsdb_monitor_add(m->dbmon); if (dbmon != m->dbmon) { /* Found an exisiting dbmon, reuse the current one. */ - ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed); + ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, NULL); ovsdb_monitor_add_jsonrpc_monitor(dbmon, m); m->dbmon = dbmon; } @@ -1446,7 +1444,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db, ovsdb_monitor_condition_bind(m->dbmon, m->condition); } - ovsdb_monitor_get_initial(m->dbmon); + ovsdb_monitor_get_initial(m->dbmon, &m->change_set); json = ovsdb_jsonrpc_monitor_compose_update(m, true); json = json ? json : json_object_create(); return jsonrpc_create_reply(json, request_id); @@ -1578,7 +1576,7 @@ ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s, struct json *update_json; update_json = ovsdb_monitor_get_update(m->dbmon, false, true, - m->condition, m->version, &m->unflushed); + m->condition, m->version, &m->change_set); if (update_json) { struct jsonrpc_msg *msg; struct json *p; @@ -1648,12 +1646,12 @@ ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m, bool initial) { - if (!ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) { + if (!ovsdb_monitor_needs_flush(m->dbmon, m->change_set)) { return NULL; } return ovsdb_monitor_get_update(m->dbmon, initial, false, - m->condition, m->version, &m->unflushed); + m->condition, m->version, &m->change_set); } static bool @@ -1662,7 +1660,7 @@ ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s) struct ovsdb_jsonrpc_monitor *m; HMAP_FOR_EACH (m, node, &s->monitors) { - if (ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) { + if (ovsdb_monitor_needs_flush(m->dbmon, m->change_set)) { return true; } } @@ -1686,7 +1684,7 @@ ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor *m, json_destroy(m->monitor_id); hmap_remove(&m->session->monitors, &m->node); - ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed); + ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->change_set); ovsdb_monitor_session_condition_destroy(m->condition); free(m); } diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c index 1175e28..03be5dc 100644 --- a/ovsdb/monitor.c +++ b/ovsdb/monitor.c @@ -69,17 +69,31 @@ struct ovsdb_monitor { struct shash tables; /* Holds "struct ovsdb_monitor_table"s. */ struct ovs_list jsonrpc_monitors; /* Contains "jsonrpc_monitor_node"s. */ struct ovsdb *db; - uint64_t n_transactions; /* Count number of committed transactions. */ - struct hmap_node hmap_node; /* Elements within ovsdb_monitors. */ - struct hmap json_cache; /* Contains "ovsdb_monitor_json_cache_node"s.*/ + + /* Contains "ovsdb_monitor_change_set". Each change set contains changes + * from some start point up to the latest committed transaction. There can + * be different change sets for the same struct ovsdb_monitor because there + * are different clients pending on changes starting from different points. + * The different change sets are maintained as a list. */ + struct ovs_list change_sets; + + /* The new change set that is to be populated for future transactions. */ + struct ovsdb_monitor_change_set *new_change_set; + + /* The change set that starts from the first transaction of the DB, which + * is used for populating the initial data for new clients. */ + struct ovsdb_monitor_change_set *init_change_set; + + struct hmap_node hmap_node; /* Elements within ovsdb_monitors. */ + struct hmap json_cache; /* Contains "ovsdb_monitor_json_cache_node"s.*/ }; -/* A json object of updates between 'from_txn' and 'dbmon->n_transactions' - * inclusive. */ +/* A json object of updates for the ovsdb_monitor_change_set and the given + * monitor version. */ struct ovsdb_monitor_json_cache_node { struct hmap_node hmap_node; /* Elements in json cache. */ enum ovsdb_monitor_version version; - uint64_t from_txn; + struct uuid change_set_uuid; struct json *json; /* Null, or a cloned of json */ }; @@ -97,29 +111,48 @@ struct ovsdb_monitor_column { /* A row that has changed in a monitored table. */ struct ovsdb_monitor_row { - struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */ + struct hmap_node hmap_node; /* In ovsdb_monitor_change_set_for_table. */ struct uuid uuid; /* UUID of row that changed. */ struct ovsdb_datum *old; /* Old data, NULL for an inserted row. */ struct ovsdb_datum *new; /* New data, NULL for a deleted row. */ }; -/* Contains 'struct ovsdb_monitor_row's for rows that have been - * updated but not yet flushed to all the jsonrpc connection. - * - * 'n_refs' represent the number of jsonrpc connections that have - * not received updates. Generate the update for the last jsonprc - * connection will also destroy the whole "struct ovsdb_monitor_changes" - * object. +/* Contains a set of changes that are not yet flushed to all the jsonrpc + * connections. * - * 'transaction' stores the first update's transaction id. - * */ -struct ovsdb_monitor_changes { - struct hmap_node hmap_node; /* Element in ovsdb_monitor_tables' changes - hmap. */ + * 'n_refs' represent the number of jsonrpc connections that depend on this + * change set (have not received updates). Generate the update for the last + * jsonprc connection will also destroy the whole "struct + * ovsdb_monitor_change_set" object. + */ +struct ovsdb_monitor_change_set { + /* Element in change_sets of ovsdb_monitor. */ + struct ovs_list list_node; + + /* Internally generated uuid that identifies this data structure. */ + struct uuid uuid; + + /* Contains struct ovsdb_monitor_change_set_for_table. */ + struct ovs_list change_set_for_tables; + + int n_refs; +}; + +/* Contains 'struct ovsdb_monitor_row's for rows in a specific table + * of struct ovsdb_monitor_change_set. It can also be searched from + * member 'change_sets' of struct ovsdb_monitor_table. */ +struct ovsdb_monitor_change_set_for_table { + /* Element in ovsdb_monitor_tables' change_sets list. */ + struct ovs_list list_in_mt; + + /* Element in ovsdb_monitor_change_sets' change_set_for_tables list. */ + struct ovs_list list_in_change_set; + struct ovsdb_monitor_table *mt; + struct ovsdb_monitor_change_set *mcs; + + /* Contains struct ovsdb_monitor_row. */ struct hmap rows; - int n_refs; - uint64_t transaction; /* Save the mt->n_columns that is used when creating the changes. * It can be different from the current mt->n_columns because @@ -147,8 +180,8 @@ struct ovsdb_monitor_table { * ovsdb_monitor_row. It is used for condition evaluation. */ unsigned int *columns_index_map; - /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */ - struct hmap changes; + /* Contains 'ovsdb_monitor_change_set_for_table'. */ + struct ovs_list change_sets; }; enum ovsdb_monitor_row_type { @@ -166,36 +199,30 @@ typedef struct json * size_t n_columns); static void ovsdb_monitor_destroy(struct ovsdb_monitor *); -static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes( - struct ovsdb_monitor_table *, uint64_t next_txn); -static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes( - struct ovsdb_monitor_table *, uint64_t unflushed); -static void ovsdb_monitor_changes_destroy( - struct ovsdb_monitor_changes *); -static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *, - uint64_t unflushed); +static struct ovsdb_monitor_change_set * ovsdb_monitor_add_change_set( + struct ovsdb_monitor *, bool init_only); +static void ovsdb_monitor_change_set_destroy( + struct ovsdb_monitor_change_set *); +static void ovsdb_monitor_track_new_change_set(struct ovsdb_monitor *); static uint32_t -json_cache_hash(enum ovsdb_monitor_version version, uint64_t from_txn) +json_cache_hash(enum ovsdb_monitor_version version, + struct ovsdb_monitor_change_set *change_set) { - uint32_t hash; - - hash = hash_uint64(version); - hash = hash_uint64_basis(from_txn, hash); - - return hash; + return hash_uint64_basis(version, uuid_hash(&change_set->uuid)); } static struct ovsdb_monitor_json_cache_node * ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon, enum ovsdb_monitor_version version, - uint64_t from_txn) + struct ovsdb_monitor_change_set *change_set) { struct ovsdb_monitor_json_cache_node *node; - uint32_t hash = json_cache_hash(version, from_txn); + uint32_t hash = json_cache_hash(version, change_set); HMAP_FOR_EACH_WITH_HASH(node, hmap_node, hash, &dbmon->json_cache) { - if (node->from_txn == from_txn && node->version == version) { + if (uuid_equals(&node->change_set_uuid, &change_set->uuid) && + node->version == version) { return node; } } @@ -206,15 +233,16 @@ ovsdb_monitor_json_cache_search(const struct ovsdb_monitor *dbmon, static void ovsdb_monitor_json_cache_insert(struct ovsdb_monitor *dbmon, enum ovsdb_monitor_version version, - uint64_t from_txn, struct json *json) + struct ovsdb_monitor_change_set *change_set, + struct json *json) { struct ovsdb_monitor_json_cache_node *node; - uint32_t hash = json_cache_hash(version, from_txn); + uint32_t hash = json_cache_hash(version, change_set); node = xmalloc(sizeof *node); node->version = version; - node->from_txn = from_txn; + node->change_set_uuid = change_set->uuid; node->json = json ? json_clone(json) : NULL; hmap_insert(&dbmon->json_cache, &node->hmap_node, hash); @@ -231,6 +259,23 @@ ovsdb_monitor_json_cache_flush(struct ovsdb_monitor *dbmon) } } +/* Free all versions of json cache for a given change_set.*/ +static void +ovsdb_monitor_json_cache_destroy(struct ovsdb_monitor *dbmon, + struct ovsdb_monitor_change_set *change_set) +{ + enum ovsdb_monitor_version v; + for (v = OVSDB_MONITOR_V1; v < OVSDB_MONITOR_VERSION_MAX; v++) { + struct ovsdb_monitor_json_cache_node *node + = ovsdb_monitor_json_cache_search(dbmon, v, change_set); + if (node) { + hmap_remove(&dbmon->json_cache, &node->hmap_node); + json_destroy(node->json); + free(node); + } + } +} + static int compare_ovsdb_monitor_column(const void *a_, const void *b_) { @@ -248,8 +293,9 @@ compare_ovsdb_monitor_column(const void *a_, const void *b_) /* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the * given 'uuid', or NULL if there is no such row. */ static struct ovsdb_monitor_row * -ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes, - const struct uuid *uuid) +ovsdb_monitor_changes_row_find( + const struct ovsdb_monitor_change_set_for_table *changes, + const struct uuid *uuid) { struct ovsdb_monitor_row *row; @@ -386,7 +432,7 @@ ovsdb_monitor_create(struct ovsdb *db, ovs_list_push_back(&db->monitors, &dbmon->list_node); ovs_list_init(&dbmon->jsonrpc_monitors); dbmon->db = db; - dbmon->n_transactions = 0; + ovs_list_init(&dbmon->change_sets); shash_init(&dbmon->tables); hmap_node_nullify(&dbmon->hmap_node); hmap_init(&dbmon->json_cache); @@ -406,7 +452,7 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m, mt = xzalloc(sizeof *mt); mt->table = table; shash_add(&m->tables, table->schema->name, mt); - hmap_init(&mt->changes); + ovs_list_init(&mt->change_sets); mt->columns_index_map = xmalloc(sizeof *mt->columns_index_map * n_columns); for (i = 0; i < n_columns; i++) { @@ -492,82 +538,85 @@ ovsdb_monitor_table_exists(struct ovsdb_monitor *m, return shash_find_data(&m->tables, table->schema->name); } -static struct ovsdb_monitor_changes * -ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt, - uint64_t next_txn) +static struct ovsdb_monitor_change_set * +ovsdb_monitor_add_change_set(struct ovsdb_monitor *dbmon, + bool init_only) { - struct ovsdb_monitor_changes *changes; - - changes = xzalloc(sizeof *changes); - - changes->transaction = next_txn; - changes->mt = mt; - changes->n_refs = 1; - changes->n_columns = mt->n_columns; - hmap_init(&changes->rows); - hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn)); + struct ovsdb_monitor_change_set *change_set = xzalloc(sizeof *change_set); + change_set->uuid = uuid_random(); + ovs_list_push_back(&(dbmon->change_sets), &change_set->list_node); + ovs_list_init(&change_set->change_set_for_tables); + change_set->n_refs = 1; - return changes; -}; - -static struct ovsdb_monitor_changes * -ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt, - uint64_t transaction) -{ - struct ovsdb_monitor_changes *changes; - size_t hash = hash_uint64(transaction); - - HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) { - if (changes->transaction == transaction) { - return changes; + struct shash_node *node; + SHASH_FOR_EACH (node, &dbmon->tables) { + struct ovsdb_monitor_table *mt = node->data; + if (!init_only || (mt->select & OJMS_INITIAL)) { + struct ovsdb_monitor_change_set_for_table *mcst = + xzalloc(sizeof *mcst); + mcst->mt = mt; + mcst->n_columns = mt->n_columns; + mcst->mcs = change_set; + hmap_init(&mcst->rows); + ovs_list_push_back(&mt->change_sets, &mcst->list_in_mt); + ovs_list_push_back(&change_set->change_set_for_tables, + &mcst->list_in_change_set); } } - return NULL; -} + return change_set; +}; -/* Stop currently tracking changes to table 'mt' since 'transaction'. */ static void -ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt, - uint64_t transaction) +ovsdb_monitor_untrack_change_set(struct ovsdb_monitor *dbmon, + struct ovsdb_monitor_change_set *mcs) { - struct ovsdb_monitor_changes *changes = - ovsdb_monitor_table_find_changes(mt, transaction); - if (changes) { - if (--changes->n_refs == 0) { - hmap_remove(&mt->changes, &changes->hmap_node); - ovsdb_monitor_changes_destroy(changes); + ovs_assert(mcs); + if (--mcs->n_refs == 0) { + if (mcs == dbmon->init_change_set) { + dbmon->init_change_set = NULL; + } else if (mcs == dbmon->new_change_set) { + dbmon->new_change_set = NULL; } + ovsdb_monitor_json_cache_destroy(dbmon, mcs); + ovsdb_monitor_change_set_destroy(mcs); } } -/* Start tracking changes to table 'mt' begins from 'transaction' inclusive. - */ static void -ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt, - uint64_t transaction) +ovsdb_monitor_track_new_change_set(struct ovsdb_monitor *dbmon) { - struct ovsdb_monitor_changes *changes; + struct ovsdb_monitor_change_set *change_set = dbmon->new_change_set; - changes = ovsdb_monitor_table_find_changes(mt, transaction); - if (changes) { - changes->n_refs++; + if (change_set) { + change_set->n_refs++; } else { - ovsdb_monitor_table_add_changes(mt, transaction); + change_set = ovsdb_monitor_add_change_set(dbmon, false); + dbmon->new_change_set = change_set; } } static void -ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes) +ovsdb_monitor_change_set_destroy(struct ovsdb_monitor_change_set *mcs) { - struct ovsdb_monitor_row *row, *next; + ovs_list_remove(&mcs->list_node); - HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) { - hmap_remove(&changes->rows, &row->hmap_node); - ovsdb_monitor_row_destroy(changes->mt, row, changes->n_columns); + struct ovsdb_monitor_change_set_for_table *mcst, *next_mcst; + LIST_FOR_EACH_SAFE (mcst, next_mcst, list_in_change_set, + &mcs->change_set_for_tables) { + ovs_list_remove(&mcst->list_in_change_set); + ovs_list_remove(&mcst->list_in_mt); + + struct ovsdb_monitor_row *row, *next; + HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) { + hmap_remove(&mcst->rows, &row->hmap_node); + ovsdb_monitor_row_destroy(mcst->mt, row, mcst->n_columns); + } + hmap_destroy(&mcst->rows); + + free(mcst); } - hmap_destroy(&changes->rows); - free(changes); + free(mcs); } static enum ovsdb_monitor_selection @@ -1025,31 +1074,25 @@ ovsdb_monitor_add_json_row(struct json **json, const char *table_name, static struct json* ovsdb_monitor_compose_update( struct ovsdb_monitor *dbmon, - bool initial, uint64_t transaction, + bool initial, struct ovsdb_monitor_change_set *mcs, const struct ovsdb_monitor_session_condition *condition, compose_row_update_cb_func row_update) { - struct shash_node *node; struct json *json; size_t max_columns = ovsdb_monitor_max_columns(dbmon); unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns)); json = NULL; - SHASH_FOR_EACH (node, &dbmon->tables) { - struct ovsdb_monitor_table *mt = node->data; + struct ovsdb_monitor_change_set_for_table *mcst; + LIST_FOR_EACH (mcst, list_in_change_set, &mcs->change_set_for_tables) { struct ovsdb_monitor_row *row, *next; - struct ovsdb_monitor_changes *changes; struct json *table_json = NULL; + struct ovsdb_monitor_table *mt = mcst->mt; - changes = ovsdb_monitor_table_find_changes(mt, transaction); - if (!changes) { - continue; - } - - HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) { + HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) { struct json *row_json; row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row, - initial, changed, changes->n_columns); + initial, changed, mcst->n_columns); if (row_json) { ovsdb_monitor_add_json_row(&json, mt->table->schema->name, &table_json, row_json, @@ -1109,8 +1152,9 @@ ovsdb_monitor_compose_cond_change_update( } /* Returns JSON for a object (as described in RFC 7047) - * for all the outstanding changes within 'monitor' that starts from - * '*unflushed'. + * for all the outstanding changes in dbmon that are tracked by the change set + * *p_mcs. + * * If cond_updated is true all rows in the db that match conditions will be * sent. * @@ -1123,39 +1167,37 @@ ovsdb_monitor_get_update( bool initial, bool cond_updated, struct ovsdb_monitor_session_condition *condition, enum ovsdb_monitor_version version, - uint64_t *unflushed_) + struct ovsdb_monitor_change_set **p_mcs) { struct ovsdb_monitor_json_cache_node *cache_node = NULL; - struct shash_node *node; struct json *json; - const uint64_t unflushed = *unflushed_; - const uint64_t next_unflushed = dbmon->n_transactions + 1; + struct ovsdb_monitor_change_set *mcs = *p_mcs; - ovs_assert(cond_updated ? unflushed == next_unflushed : true); + ovs_assert(cond_updated ? mcs == dbmon->new_change_set : true); /* Return a clone of cached json if one exists. Otherwise, * generate a new one and add it to the cache. */ if (!condition || (!condition->conditional && !cond_updated)) { cache_node = ovsdb_monitor_json_cache_search(dbmon, version, - unflushed); + mcs); } if (cache_node) { json = cache_node->json ? json_clone(cache_node->json) : NULL; } else { if (version == OVSDB_MONITOR_V1) { json = - ovsdb_monitor_compose_update(dbmon, initial, unflushed, + ovsdb_monitor_compose_update(dbmon, initial, mcs, condition, ovsdb_monitor_compose_row_update); } else { ovs_assert(version == OVSDB_MONITOR_V2); if (!cond_updated) { - json = ovsdb_monitor_compose_update(dbmon, initial, unflushed, + json = ovsdb_monitor_compose_update(dbmon, initial, mcs, condition, ovsdb_monitor_compose_row_update2); if (!condition || !condition->conditional) { - ovsdb_monitor_json_cache_insert(dbmon, version, unflushed, + ovsdb_monitor_json_cache_insert(dbmon, version, mcs, json); } } else { @@ -1167,24 +1209,20 @@ ovsdb_monitor_get_update( } } - /* Maintain transaction id of 'changes'. */ - SHASH_FOR_EACH (node, &dbmon->tables) { - struct ovsdb_monitor_table *mt = node->data; - - ovsdb_monitor_table_untrack_changes(mt, unflushed); - ovsdb_monitor_table_track_changes(mt, next_unflushed); - } - *unflushed_ = next_unflushed; + /* Maintain tracking change set. */ + ovsdb_monitor_untrack_change_set(dbmon, mcs); + ovsdb_monitor_track_new_change_set(dbmon); + *p_mcs = dbmon->new_change_set; return json; } bool ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon, - uint64_t next_transaction) + struct ovsdb_monitor_change_set *change_set) { - ovs_assert(next_transaction <= dbmon->n_transactions + 1); - return (next_transaction <= dbmon->n_transactions); + ovs_assert(change_set); + return (change_set != dbmon->new_change_set); } void @@ -1243,18 +1281,18 @@ static void ovsdb_monitor_changes_update(const struct ovsdb_row *old, const struct ovsdb_row *new, const struct ovsdb_monitor_table *mt, - struct ovsdb_monitor_changes *changes) + struct ovsdb_monitor_change_set_for_table *mcst) { const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old); struct ovsdb_monitor_row *change; - change = ovsdb_monitor_changes_row_find(changes, uuid); + change = ovsdb_monitor_changes_row_find(mcst, uuid); if (!change) { change = xzalloc(sizeof *change); - hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid)); + hmap_insert(&mcst->rows, &change->hmap_node, uuid_hash(uuid)); change->uuid = *uuid; - change->old = clone_monitor_row_data(mt, old, changes->n_columns); - change->new = clone_monitor_row_data(mt, new, changes->n_columns); + change->old = clone_monitor_row_data(mt, old, mcst->n_columns); + change->new = clone_monitor_row_data(mt, new, mcst->n_columns); } else { if (new) { if (!change->new) { @@ -1293,19 +1331,17 @@ ovsdb_monitor_changes_update(const struct ovsdb_row *old, * replication, the row carries the same UUID as the row * just deleted. */ - change->new = clone_monitor_row_data(mt, new, - changes->n_columns); + change->new = clone_monitor_row_data(mt, new, mcst->n_columns); } else { - update_monitor_row_data(mt, new, change->new, - changes->n_columns); + update_monitor_row_data(mt, new, change->new, mcst->n_columns); } } else { - free_monitor_row_data(mt, change->new, changes->n_columns); + free_monitor_row_data(mt, change->new, mcst->n_columns); change->new = NULL; if (!change->old) { /* This row was added then deleted. Forget about it. */ - hmap_remove(&changes->rows, &change->hmap_node); + hmap_remove(&mcst->rows, &change->hmap_node); free(change); } } @@ -1363,7 +1399,7 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old, const struct ovsdb_monitor *m = aux->monitor; struct ovsdb_table *table = new ? new->table : old->table; struct ovsdb_monitor_table *mt; - struct ovsdb_monitor_changes *changes; + struct ovsdb_monitor_change_set_for_table *mcst; if (!aux->mt || table != aux->mt->table) { aux->mt = shash_find_data(&m->tables, table->schema->name); @@ -1380,9 +1416,9 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old, enum ovsdb_monitor_changes_efficacy efficacy = ovsdb_monitor_changes_classify(type, mt, changed); - HMAP_FOR_EACH(changes, hmap_node, &mt->changes) { - if (efficacy > OVSDB_CHANGES_NO_EFFECT) { - ovsdb_monitor_changes_update(old, new, mt, changes); + if (efficacy > OVSDB_CHANGES_NO_EFFECT) { + LIST_FOR_EACH (mcst, list_in_mt, &mt->change_sets) { + ovsdb_monitor_changes_update(old, new, mt, mcst); } } if (aux->efficacy < efficacy) { @@ -1393,34 +1429,35 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old, } void -ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon) +ovsdb_monitor_get_initial(struct ovsdb_monitor *dbmon, + struct ovsdb_monitor_change_set **p_mcs) { - struct shash_node *node; - - SHASH_FOR_EACH (node, &dbmon->tables) { - struct ovsdb_monitor_table *mt = node->data; - - if (mt->select & OJMS_INITIAL) { - struct ovsdb_row *row; - struct ovsdb_monitor_changes *changes; - - changes = ovsdb_monitor_table_find_changes(mt, 0); - if (!changes) { - changes = ovsdb_monitor_table_add_changes(mt, 0); - HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) { - ovsdb_monitor_changes_update(NULL, row, mt, changes); + if (!dbmon->init_change_set) { + struct ovsdb_monitor_change_set *change_set = + ovsdb_monitor_add_change_set(dbmon, true); + dbmon->init_change_set = change_set; + + struct ovsdb_monitor_change_set_for_table *mcst; + LIST_FOR_EACH (mcst, list_in_change_set, + &change_set->change_set_for_tables) { + if (mcst->mt->select & OJMS_INITIAL) { + struct ovsdb_row *row; + HMAP_FOR_EACH (row, hmap_node, &mcst->mt->table->rows) { + ovsdb_monitor_changes_update(NULL, row, mcst->mt, mcst); } - } else { - changes->n_refs++; } } + } else { + dbmon->init_change_set->n_refs++; } + + *p_mcs = dbmon->init_change_set; } void ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon, struct ovsdb_jsonrpc_monitor *jsonrpc_monitor, - uint64_t unflushed) + struct ovsdb_monitor_change_set *change_set) { struct jsonrpc_monitor_node *jm; @@ -1433,10 +1470,8 @@ ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon, LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) { if (jm->jsonrpc_monitor == jsonrpc_monitor) { /* Release the tracked changes. */ - struct shash_node *node; - SHASH_FOR_EACH (node, &dbmon->tables) { - struct ovsdb_monitor_table *mt = node->data; - ovsdb_monitor_table_untrack_changes(mt, unflushed); + if (change_set) { + ovsdb_monitor_untrack_change_set(dbmon, change_set); } ovs_list_remove(&jm->node); free(jm); @@ -1567,15 +1602,14 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon) ovsdb_monitor_json_cache_flush(dbmon); hmap_destroy(&dbmon->json_cache); + struct ovsdb_monitor_change_set *cs, *cs_next; + LIST_FOR_EACH_SAFE (cs, cs_next, list_node, &dbmon->change_sets) { + ovsdb_monitor_change_set_destroy(cs); + } + SHASH_FOR_EACH (node, &dbmon->tables) { struct ovsdb_monitor_table *mt = node->data; - struct ovsdb_monitor_changes *changes, *next; - - HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) { - hmap_remove(&mt->changes, &changes->hmap_node); - ovsdb_monitor_changes_destroy(changes); - } - hmap_destroy(&mt->changes); + ovs_assert(ovs_list_is_empty(&mt->change_sets)); free(mt->columns); free(mt->columns_index_map); free(mt); @@ -1590,24 +1624,17 @@ ovsdb_monitor_commit(struct ovsdb_monitor *m, const struct ovsdb_txn *txn) struct ovsdb_monitor_aux aux; ovsdb_monitor_init_aux(&aux, m); - /* Update ovsdb_monitor's transaction number for - * each transaction, before calling ovsdb_monitor_change_cb(). */ - m->n_transactions++; ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux); - switch(aux.efficacy) { - case OVSDB_CHANGES_NO_EFFECT: - /* The transaction is ignored by the monitor. - * Roll back the 'n_transactions' as if the transaction - * has never happened. */ - m->n_transactions--; - break; - case OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE: - /* Nothing. */ - break; - case OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE: - ovsdb_monitor_json_cache_flush(m); - break; + if (aux.efficacy > OVSDB_CHANGES_NO_EFFECT) { + /* The transaction is has impact to the monitor. + * Reset new_change_set, so that a new change set will be + * created for future trackings. */ + m->new_change_set = NULL; + + if (aux.efficacy == OVSDB_CHANGES_REQUIRE_EXTERNAL_UPDATE) { + ovsdb_monitor_json_cache_flush(m); + } } } diff --git a/ovsdb/monitor.h b/ovsdb/monitor.h index 9bc0613..112c672 100644 --- a/ovsdb/monitor.h +++ b/ovsdb/monitor.h @@ -56,9 +56,10 @@ struct ovsdb_monitor *ovsdb_monitor_add(struct ovsdb_monitor *); void ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *, struct ovsdb_jsonrpc_monitor *); +struct ovsdb_monitor_change_set; void ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *, struct ovsdb_jsonrpc_monitor *, - uint64_t unflushed); + struct ovsdb_monitor_change_set *); void ovsdb_monitor_add_table(struct ovsdb_monitor *, const struct ovsdb_table *); @@ -77,16 +78,17 @@ struct json *ovsdb_monitor_get_update(struct ovsdb_monitor *, bool cond_updated, struct ovsdb_monitor_session_condition *, enum ovsdb_monitor_version, - uint64_t *unflushed_transaction); + struct ovsdb_monitor_change_set **p_mcs); void ovsdb_monitor_table_add_select(struct ovsdb_monitor *, const struct ovsdb_table *, enum ovsdb_monitor_selection); bool ovsdb_monitor_needs_flush(struct ovsdb_monitor *, - uint64_t next_transaction); + struct ovsdb_monitor_change_set *); -void ovsdb_monitor_get_initial(const struct ovsdb_monitor *); +void ovsdb_monitor_get_initial(struct ovsdb_monitor *, + struct ovsdb_monitor_change_set **); void ovsdb_monitor_get_memory_usage(struct simap *);