@@ -120,6 +120,12 @@ struct ovsdb_monitor_changes {
struct hmap rows;
int n_refs;
uint64_t transaction;
+
+ /* Save the mt->n_columns that is used when creating the changes.
+ * It can be different from the current mt->n_columns because
+ * mt->n_columns can be increased when there are condition changes
+ * from any of the clients sharing the dbmon. */
+ size_t n_columns;
};
/* A particular table being monitored. */
@@ -156,7 +162,8 @@ typedef struct json *
const struct ovsdb_monitor_session_condition * condition,
enum ovsdb_monitor_row_type row_type,
const void *,
- bool initial, unsigned long int *changed);
+ bool initial, unsigned long int *changed,
+ size_t n_columns);
static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
@@ -255,14 +262,15 @@ ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
return NULL;
}
-/* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
+/* Allocates an array of 'n_columns' ovsdb_datums and initializes them as
* copies of the data in 'row' drawn from the columns represented by
* mt->columns[]. Returns the array.
*
* If 'row' is NULL, returns NULL. */
static struct ovsdb_datum *
clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
- const struct ovsdb_row *row)
+ const struct ovsdb_row *row,
+ size_t n_columns)
{
struct ovsdb_datum *data;
size_t i;
@@ -271,8 +279,8 @@ clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
return NULL;
}
- data = xmalloc(mt->n_columns * sizeof *data);
- for (i = 0; i < mt->n_columns; i++) {
+ data = xmalloc(n_columns * sizeof *data);
+ for (i = 0; i < n_columns; i++) {
const struct ovsdb_column *c = mt->columns[i].column;
const struct ovsdb_datum *src = &row->fields[c->index];
struct ovsdb_datum *dst = &data[i];
@@ -283,16 +291,17 @@ clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
return data;
}
-/* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
+/* Replaces the n_columns ovsdb_datums in row[] by copies of the data from
* in 'row' drawn from the columns represented by mt->columns[]. */
static void
update_monitor_row_data(const struct ovsdb_monitor_table *mt,
const struct ovsdb_row *row,
- struct ovsdb_datum *data)
+ struct ovsdb_datum *data,
+ size_t n_columns)
{
size_t i;
- for (i = 0; i < mt->n_columns; i++) {
+ for (i = 0; i < n_columns; i++) {
const struct ovsdb_column *c = mt->columns[i].column;
const struct ovsdb_datum *src = &row->fields[c->index];
struct ovsdb_datum *dst = &data[i];
@@ -305,16 +314,17 @@ update_monitor_row_data(const struct ovsdb_monitor_table *mt,
}
}
-/* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
+/* Frees all of the n_columns ovsdb_datums in data[], using the types taken
* from mt->columns[], plus 'data' itself. */
static void
free_monitor_row_data(const struct ovsdb_monitor_table *mt,
- struct ovsdb_datum *data)
+ struct ovsdb_datum *data,
+ size_t n_columns)
{
if (data) {
size_t i;
- for (i = 0; i < mt->n_columns; i++) {
+ for (i = 0; i < n_columns; i++) {
const struct ovsdb_column *c = mt->columns[i].column;
ovsdb_datum_destroy(&data[i], &c->type);
@@ -326,11 +336,12 @@ free_monitor_row_data(const struct ovsdb_monitor_table *mt,
/* Frees 'row', which must have been created from 'mt'. */
static void
ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
- struct ovsdb_monitor_row *row)
+ struct ovsdb_monitor_row *row,
+ size_t n_columns)
{
if (row) {
- free_monitor_row_data(mt, row->old);
- free_monitor_row_data(mt, row->new);
+ free_monitor_row_data(mt, row->old, n_columns);
+ free_monitor_row_data(mt, row->new, n_columns);
free(row);
}
}
@@ -449,6 +460,7 @@ ovsdb_monitor_condition_add_columns(struct ovsdb_monitor *dbmon,
ovsdb_condition_get_columns(condition, &n_columns);
for (i = 0; i < n_columns; i++) {
+ //VLOG_WARN("dbmon %p, cond_add_column: %s, n_columns %"PRIuSIZE, dbmon, columns[i]->name, n_columns);
ovsdb_monitor_add_column(dbmon, table, columns[i],
OJMS_NONE, false);
}
@@ -492,6 +504,7 @@ ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
changes->transaction = next_txn;
changes->mt = mt;
changes->n_refs = 1;
+ changes->n_columns = mt->n_columns;
hmap_init(&changes->rows);
hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
@@ -522,7 +535,9 @@ ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
struct ovsdb_monitor_changes *changes =
ovsdb_monitor_table_find_changes(mt, transaction);
if (changes) {
+ //VLOG_WARN("untrack change: %"PRIu64", found changes %p, n_refs %d", transaction, changes, changes->n_refs);
if (--changes->n_refs == 0) {
+ //VLOG_WARN("untrack change: %"PRIu64", destroy changes %p, n_refs %d", transaction, changes, changes->n_refs);
hmap_remove(&mt->changes, &changes->hmap_node);
ovsdb_monitor_changes_destroy(changes);
}
@@ -540,8 +555,10 @@ ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
changes = ovsdb_monitor_table_find_changes(mt, transaction);
if (changes) {
changes->n_refs++;
+ //VLOG_WARN("track new change: %"PRIu64", found changes %p, n_refs %d", transaction, changes, changes->n_refs);
} else {
ovsdb_monitor_table_add_changes(mt, transaction);
+ //VLOG_WARN("track new change: %"PRIu64, transaction);
}
}
@@ -552,7 +569,7 @@ ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
hmap_remove(&changes->rows, &row->hmap_node);
- ovsdb_monitor_row_destroy(changes->mt, row);
+ ovsdb_monitor_row_destroy(changes->mt, row, changes->n_columns);
}
hmap_destroy(&changes->rows);
free(changes);
@@ -788,7 +805,8 @@ ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
const struct ovsdb_datum *old,
const struct ovsdb_datum *new,
enum ovsdb_monitor_selection type,
- unsigned long int *changed)
+ unsigned long int *changed,
+ size_t n_columns)
{
if (!(mt->select & type)) {
return true;
@@ -798,8 +816,8 @@ ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
size_t i, n_changes;
n_changes = 0;
- memset(changed, 0, bitmap_n_bytes(mt->n_columns));
- for (i = 0; i < mt->n_columns; i++) {
+ memset(changed, 0, bitmap_n_bytes(n_columns));
+ for (i = 0; i < n_columns; i++) {
const struct ovsdb_column *c = mt->columns[i].column;
size_t index = row_type == OVSDB_ROW ? c->index : i;
if (!ovsdb_datum_equals(&old[index], &new[index], &c->type)) {
@@ -825,14 +843,15 @@ ovsdb_monitor_row_skip_update(const struct ovsdb_monitor_table *mt,
* going to be used as part of an "update" notification.
*
* 'changed' must be a scratch buffer for internal use that is at least
- * bitmap_n_bytes(mt->n_columns) bytes long. */
+ * bitmap_n_bytes(n_columns) bytes long. */
static struct json *
ovsdb_monitor_compose_row_update(
const struct ovsdb_monitor_table *mt,
const struct ovsdb_monitor_session_condition *condition OVS_UNUSED,
enum ovsdb_monitor_row_type row_type OVS_UNUSED,
const void *_row,
- bool initial, unsigned long int *changed)
+ bool initial, unsigned long int *changed,
+ size_t n_columns OVS_UNUSED)
{
const struct ovsdb_monitor_row *row = _row;
enum ovsdb_monitor_selection type;
@@ -843,7 +862,8 @@ ovsdb_monitor_compose_row_update(
ovs_assert(row_type == OVSDB_MONITOR_ROW);
type = ovsdb_monitor_row_update_type(initial, row->old, row->new);
if (ovsdb_monitor_row_skip_update(mt, row_type, row->old,
- row->new, type, changed)) {
+ row->new, type, changed,
+ mt->n_columns)) {
return NULL;
}
@@ -891,14 +911,15 @@ ovsdb_monitor_compose_row_update(
* false if it is going to be used as part of an "update2" notification.
*
* 'changed' must be a scratch buffer for internal use that is at least
- * bitmap_n_bytes(mt->n_columns) bytes long. */
+ * bitmap_n_bytes(n_columns) bytes long. */
static struct json *
ovsdb_monitor_compose_row_update2(
const struct ovsdb_monitor_table *mt,
const struct ovsdb_monitor_session_condition *condition,
enum ovsdb_monitor_row_type row_type,
const void *_row,
- bool initial, unsigned long int *changed)
+ bool initial, unsigned long int *changed,
+ size_t n_columns)
{
enum ovsdb_monitor_selection type;
struct json *row_update2, *diff_json;
@@ -914,7 +935,8 @@ ovsdb_monitor_compose_row_update2(
type = ovsdb_monitor_row_update_type_condition(mt, condition, initial,
row_type, old, new);
- if (ovsdb_monitor_row_skip_update(mt, row_type, old, new, type, changed)) {
+ if (ovsdb_monitor_row_skip_update(mt, row_type, old, new, type, changed,
+ n_columns)) {
return NULL;
}
@@ -1032,7 +1054,7 @@ ovsdb_monitor_compose_update(
HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
struct json *row_json;
row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
- initial, changed);
+ initial, changed, changes->n_columns);
if (row_json) {
ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
&table_json, row_json,
@@ -1076,7 +1098,8 @@ ovsdb_monitor_compose_cond_change_update(
row_json = ovsdb_monitor_compose_row_update2(mt, condition,
OVSDB_ROW, row,
- false, changed);
+ false, changed,
+ mt->n_columns);
if (row_json) {
ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
&table_json, row_json,
@@ -1235,8 +1258,8 @@ ovsdb_monitor_changes_update(const struct ovsdb_row *old,
change = xzalloc(sizeof *change);
hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
change->uuid = *uuid;
- change->old = clone_monitor_row_data(mt, old);
- change->new = clone_monitor_row_data(mt, new);
+ change->old = clone_monitor_row_data(mt, old, changes->n_columns);
+ change->new = clone_monitor_row_data(mt, new, changes->n_columns);
} else {
if (new) {
if (!change->new) {
@@ -1275,12 +1298,14 @@ ovsdb_monitor_changes_update(const struct ovsdb_row *old,
* replication, the row carries the same UUID as the row
* just deleted.
*/
- change->new = clone_monitor_row_data(mt, new);
+ change->new = clone_monitor_row_data(mt, new,
+ changes->n_columns);
} else {
- update_monitor_row_data(mt, new, change->new);
+ update_monitor_row_data(mt, new, change->new,
+ changes->n_columns);
}
} else {
- free_monitor_row_data(mt, change->new);
+ free_monitor_row_data(mt, change->new, changes->n_columns);
change->new = NULL;
if (!change->old) {
@@ -1589,6 +1614,7 @@ ovsdb_monitor_commit(struct ovsdb_monitor *m, const struct ovsdb_txn *txn)
ovsdb_monitor_json_cache_flush(m);
break;
}
+ //VLOG_WARN("monitor commit: m_transactions: %"PRIu64, m->n_transactions);
}
void
@@ -589,3 +589,71 @@ row,action,name,number,_version
[[[["name","==","one"]]]],
[[[false]]],
[[[true]]]])
+
+
+AT_SETUP(monitor-cond-change with many sessions pending)
+AT_KEYWORDS([ovsdb server monitor monitor-cond negative])
+ordinal_schema > schema
+AT_CHECK([ovsdb-tool create db schema], [0], [stdout], [ignore])
+
+AT_CAPTURE_FILE([ovsdb-server-log])
+AT_CHECK([ovsdb-server --detach --no-chdir --pidfile --remote=punix:socket --log-file="`pwd`"/ovsdb-server-log db >/dev/null 2>&1])
+on_exit 'kill `cat ovsdb-server.pid`'
+for txn in m4_foreach([txn], [[[["ordinals",
+ {"op": "insert",
+ "table": "ordinals",
+ "row": {"number": 0, "name": "zero"}},
+ {"op": "insert",
+ "table": "ordinals",
+ "row": {"number": 1, "name": "one"}},
+ {"op": "insert",
+ "table": "ordinals",
+ "row": {"number": 2, "name": "two"}}]]]], ['txn' ]); do
+ AT_CHECK([ovsdb-client transact unix:socket "$txn"], [0], [ignore], [ignore])
+done
+
+# 1001 clients monitoring column "name" and with condition for "name" only.
+# The clients are created in a way that the 991th client will request condition
+# change, so that the chance is high that the condition change will be handled
+# before some pending changes are freed.
+
+cond='[[["name","==","ten"]]]'
+for i in `seq 1 990`; do
+ AT_CHECK([ovsdb-client -vjsonrpc --pidfile=ovsdb-client$i.pid --detach --no-chdir -d json monitor-cond --format=csv unix:socket ordinals $cond ordinals ["name"]], [0], [ignore], [ignore])
+done
+
+AT_CHECK([ovsdb-client -vjsonrpc --pidfile --detach --no-chdir -d json monitor-cond --format=csv unix:socket ordinals $cond ordinals ["name"] > output],
+ [0], [ignore], [ignore])
+
+for i in `seq 991 1000`; do
+ AT_CHECK([ovsdb-client -vjsonrpc --pidfile=ovsdb-client$i.pid --detach --no-chdir -d json monitor-cond --format=csv unix:socket ordinals $cond ordinals ["name"]], [0], [ignore], [ignore])
+done
+
+for txn in m4_foreach([txn], [[[["ordinals",
+ {"op": "insert",
+ "table": "ordinals",
+ "row": {"number": 10, "name": "ten"}}]]]], ['txn' ]); do
+ AT_CHECK([ovsdb-client transact unix:socket "$txn"], [0],
+ [ignore], [ignore], [kill `cat server-pid client-pid`])
+done
+
+# Change the condition so that a new column "number" is added to monitor table.
+cond='[[["number","==",1]]]'
+AT_CHECK([ovs-appctl -t ovsdb-client ovsdb-client/cond_change ordinals $cond], [0], [ignore], [ignore])
+
+# Give some time for the server to flush and free pending changes
+# (to crash, when n_columns is not handled properly)
+sleep 1
+
+AT_CHECK([ovsdb-client transact unix:socket '[["ordinals"]]'], [0],
+ [ignore], [ignore])
+AT_CHECK([ovs-appctl -t ovsdb-server -e exit], [0], [ignore], [ignore])
+OVS_WAIT_UNTIL([test ! -e ovsdb-server.pid && test ! -e ovsdb-client.pid])
+AT_CHECK([$PYTHON $srcdir/ovsdb-monitor-sort.py < output | uuidfilt], [0], [[row,action,name
+<0>,insert,"""ten"""
+
+row,action,name
+<0>,delete,
+<1>,insert,"""one"""
+]], [ignore])
+AT_CLEANUP