
[ovs-dev,2/2] ovsdb: Prepare snapshot JSON in a separate thread.

Message ID 20220630233407.623049-3-i.maximets@ovn.org
State Accepted
Commit 3cd2cbd684e023682d04dd11d2640b53e4725790
Series ovsdb: datum lazy-copy + parallel snapshot preparation.

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed
ovsrobot/intel-ovs-compilation success test: success

Commit Message

Ilya Maximets June 30, 2022, 11:34 p.m. UTC
Conversion of the database data into a JSON object, serialization
of that object, and its destruction are the heaviest operations
during database compaction.  If these operations are moved to a
separate thread, the main thread can continue processing database
requests in the meantime.

With this change, the compaction is split into 3 phases:

1. Initialization:
   - Create a copy of the database.
   - Remember the current database index.
   - Start a separate thread to convert the copy of the database
     into a serialized JSON object.

2. Wait:
   - Continue normal operation until the compaction thread is done.
   - Meanwhile, the compaction thread:
     * Convert the database copy to JSON.
     * Serialize the resulting JSON.
     * Destroy the original JSON object.

3. Finish:
   - Destroy the database copy.
   - Take the snapshot created by the thread.
   - Write it to disk.

The key to making this scheme fast is the ability to create a
shallow copy of the database.  This doesn't take much time,
allowing the thread to do most of the work.

The database copy is created and destroyed only by the main thread,
so there is no need for synchronization.

This solution reduces the time the main thread is blocked by
compaction by 80-90%.  For example, in ovn-heater tests with a
120-node density-heavy scenario, where compaction normally takes
5-6 seconds at the end of a test, measured compaction times were
all below 1 second with the change applied.  Also, note that these
measured times are the sum of phases 1 and 3, so actual poll
intervals are about half a second in this case.

This is only implemented for raft storage for now.  The
implementation for standalone databases can be added later by using
a file offset as the database index and copying newly added changes
from the old file to the new one during ovsdb_log_replace().

Reported-at: https://bugzilla.redhat.com/2069108
Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
---
 ovsdb/ovsdb-server.c |  18 +++++-
 ovsdb/ovsdb.c        | 143 +++++++++++++++++++++++++++++++++++++++----
 ovsdb/ovsdb.h        |  24 ++++++++
 ovsdb/raft.c         |   8 ++-
 ovsdb/raft.h         |   3 +-
 ovsdb/row.c          |  17 +++++
 ovsdb/row.h          |   1 +
 ovsdb/storage.c      |  11 ++--
 ovsdb/storage.h      |   3 +-
 9 files changed, 204 insertions(+), 24 deletions(-)

Comments

Dumitru Ceara July 1, 2022, 3:31 p.m. UTC | #1
On 7/1/22 01:34, Ilya Maximets wrote:
> Conversion of the database data into JSON object, serialization
> and destruction of that object are the most heavy operations
> during the database compaction.  If these operations are moved
> to a separate thread, the main thread can continue processing
> database requests in the meantime.
> 

Hi Ilya,

> With this change, the compaction is split in 3 phases:
> 
> 1. Initialization:
>    - Create a copy of the database.
>    - Remember current database index.
>    - Start a separate thread to convert a copy of the database
>      into serialized JSON object.
> 
> 2. Wait:
>    - Continue normal operation until compaction thread is done.
>    - Meanwhile, compaction thread:
>      * Convert database copy to JSON.
>      * Serialize resulted JSON.
>      * Destroy original JSON object.
> 
> 3. Finish:
>    - Destroy the database copy.
>    - Take the snapshot created by the thread.
>    - Write on disk.
> 

The approach sounds good to me.

> The key for this schema to be fast is the ability to create
> a shallow copy of the database.  This doesn't take too much
> time allowing the thread to do most of work.
> 
> Database copy is created and destroyed only by the main thread,
> so there is no need for synchronization.
> 
> Such solution allows to reduce the time main thread is blocked
> by compaction by 80-90%.  For example, in ovn-heater tests
> with 120 node density-heavy scenario, where compaction normally
> takes 5-6 seconds at the end of a test, measured compaction
> times was all below 1 second with the change applied.  Also,
> note that these measured times are the sum of phases 1 and 3,
> so actual poll intervals are about half a second in this case.

Nice!

> 
> Only implemented for raft storage for now.  The implementation
> for standalone databases can be added later by using a file
> offset as a database index and copying newly added changes
> from the old file to a new one during ovsdb_log_replace().
> 
> Reported-at: https://bugzilla.redhat.com/2069108
> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
> ---
>  ovsdb/ovsdb-server.c |  18 +++++-
>  ovsdb/ovsdb.c        | 143 +++++++++++++++++++++++++++++++++++++++----
>  ovsdb/ovsdb.h        |  24 ++++++++
>  ovsdb/raft.c         |   8 ++-
>  ovsdb/raft.h         |   3 +-
>  ovsdb/row.c          |  17 +++++
>  ovsdb/row.h          |   1 +
>  ovsdb/storage.c      |  11 ++--
>  ovsdb/storage.h      |   3 +-
>  9 files changed, 204 insertions(+), 24 deletions(-)
> 
> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
> index 5549b4e3a..eae2f6679 100644
> --- a/ovsdb/ovsdb-server.c
> +++ b/ovsdb/ovsdb-server.c
> @@ -252,7 +252,9 @@ main_loop(struct server_config *config,
>                  remove_db(config, node,
>                            xasprintf("removing database %s because storage "
>                                      "disconnected permanently", node->name));
> -            } else if (ovsdb_storage_should_snapshot(db->db->storage)) {
> +            } else if (!ovsdb_snapshot_in_progress(db->db)
> +                       && (ovsdb_storage_should_snapshot(db->db->storage) ||
> +                           ovsdb_snapshot_ready(db->db))) {
>                  log_and_free_error(ovsdb_snapshot(db->db, trim_memory));
>              }
>          }
> @@ -287,6 +289,7 @@ main_loop(struct server_config *config,
>              ovsdb_trigger_wait(db->db, time_msec());
>              ovsdb_storage_wait(db->db->storage);
>              ovsdb_storage_read_wait(db->db->storage);
> +            ovsdb_snapshot_wait(db->db);
>          }
>          if (run_process) {
>              process_wait(run_process);
> @@ -1552,11 +1555,20 @@ ovsdb_server_compact(struct unixctl_conn *conn, int argc,
>              ? !strcmp(node->name, db_name)
>              : node->name[0] != '_') {
>              if (db->db) {
> +                struct ovsdb_error *error = NULL;
> +
>                  VLOG_INFO("compacting %s database by user request",
>                            node->name);
>  
> -                struct ovsdb_error *error = ovsdb_snapshot(db->db,
> -                                                           trim_memory);
> +                error = ovsdb_snapshot(db->db, trim_memory);
> +                if (!error && ovsdb_snapshot_in_progress(db->db)) {
> +                    while (ovsdb_snapshot_in_progress(db->db)) {
> +                        ovsdb_snapshot_wait(db->db);
> +                        poll_block();
> +                    }
> +                    error = ovsdb_snapshot(db->db, trim_memory);
> +                }
> +
>                  if (error) {
>                      char *s = ovsdb_error_to_string(error);
>                      ds_put_format(&reply, "%s\n", s);
> diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
> index 91b4a01af..8cbefbe3d 100644
> --- a/ovsdb/ovsdb.c
> +++ b/ovsdb/ovsdb.c
> @@ -25,9 +25,13 @@
>  #include "file.h"
>  #include "monitor.h"
>  #include "openvswitch/json.h"
> +#include "openvswitch/poll-loop.h"
> +#include "ovs-thread.h"
>  #include "ovsdb-error.h"
>  #include "ovsdb-parser.h"
>  #include "ovsdb-types.h"
> +#include "row.h"
> +#include "seq.h"
>  #include "simap.h"
>  #include "storage.h"
>  #include "table.h"
> @@ -461,6 +465,21 @@ ovsdb_destroy(struct ovsdb *db)
>      if (db) {
>          struct shash_node *node;
>  
> +        /* Need to wait for compaction thread to finish the work. */
> +        while (ovsdb_snapshot_in_progress(db)) {
> +            ovsdb_snapshot_wait(db);
> +            poll_block();
> +        }
> +        if (ovsdb_snapshot_ready(db)) {
> +            struct ovsdb_error *error = ovsdb_snapshot(db, false);
> +
> +            if (error) {
> +                char *s = ovsdb_error_to_string_free(error);
> +                VLOG_INFO("%s: %s", db->name, s);
> +                free(s);

Nit: this is very similar to log_and_free_error().

For now the rest of the patch looks good to me.  I still want to test it
some more though.

Thanks,
Dumitru
Ilya Maximets July 1, 2022, 3:45 p.m. UTC | #2
On 7/1/22 17:31, Dumitru Ceara wrote:
> On 7/1/22 01:34, Ilya Maximets wrote:
>> Conversion of the database data into JSON object, serialization
>> and destruction of that object are the most heavy operations
>> during the database compaction.  If these operations are moved
>> to a separate thread, the main thread can continue processing
>> database requests in the meantime.
>>
> 
> Hi Ilya,
> 
>> With this change, the compaction is split in 3 phases:
>>
>> 1. Initialization:
>>    - Create a copy of the database.
>>    - Remember current database index.
>>    - Start a separate thread to convert a copy of the database
>>      into serialized JSON object.
>>
>> 2. Wait:
>>    - Continue normal operation until compaction thread is done.
>>    - Meanwhile, compaction thread:
>>      * Convert database copy to JSON.
>>      * Serialize resulted JSON.
>>      * Destroy original JSON object.
>>
>> 3. Finish:
>>    - Destroy the database copy.
>>    - Take the snapshot created by the thread.
>>    - Write on disk.
>>
> 
> The approach sounds good to me.
> 
>> The key for this schema to be fast is the ability to create
>> a shallow copy of the database.  This doesn't take too much
>> time allowing the thread to do most of work.
>>
>> Database copy is created and destroyed only by the main thread,
>> so there is no need for synchronization.
>>
>> Such solution allows to reduce the time main thread is blocked
>> by compaction by 80-90%.  For example, in ovn-heater tests
>> with 120 node density-heavy scenario, where compaction normally
>> takes 5-6 seconds at the end of a test, measured compaction
>> times was all below 1 second with the change applied.  Also,
>> note that these measured times are the sum of phases 1 and 3,
>> so actual poll intervals are about half a second in this case.
> 
> Nice!
> 
>>
>> Only implemented for raft storage for now.  The implementation
>> for standalone databases can be added later by using a file
>> offset as a database index and copying newly added changes
>> from the old file to a new one during ovsdb_log_replace().
>>
>> Reported-at: https://bugzilla.redhat.com/2069108
>> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
>> ---
>>  ovsdb/ovsdb-server.c |  18 +++++-
>>  ovsdb/ovsdb.c        | 143 +++++++++++++++++++++++++++++++++++++++----
>>  ovsdb/ovsdb.h        |  24 ++++++++
>>  ovsdb/raft.c         |   8 ++-
>>  ovsdb/raft.h         |   3 +-
>>  ovsdb/row.c          |  17 +++++
>>  ovsdb/row.h          |   1 +
>>  ovsdb/storage.c      |  11 ++--
>>  ovsdb/storage.h      |   3 +-
>>  9 files changed, 204 insertions(+), 24 deletions(-)
>>
>> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
>> index 5549b4e3a..eae2f6679 100644
>> --- a/ovsdb/ovsdb-server.c
>> +++ b/ovsdb/ovsdb-server.c
>> @@ -252,7 +252,9 @@ main_loop(struct server_config *config,
>>                  remove_db(config, node,
>>                            xasprintf("removing database %s because storage "
>>                                      "disconnected permanently", node->name));
>> -            } else if (ovsdb_storage_should_snapshot(db->db->storage)) {
>> +            } else if (!ovsdb_snapshot_in_progress(db->db)
>> +                       && (ovsdb_storage_should_snapshot(db->db->storage) ||
>> +                           ovsdb_snapshot_ready(db->db))) {
>>                  log_and_free_error(ovsdb_snapshot(db->db, trim_memory));
>>              }
>>          }
>> @@ -287,6 +289,7 @@ main_loop(struct server_config *config,
>>              ovsdb_trigger_wait(db->db, time_msec());
>>              ovsdb_storage_wait(db->db->storage);
>>              ovsdb_storage_read_wait(db->db->storage);
>> +            ovsdb_snapshot_wait(db->db);
>>          }
>>          if (run_process) {
>>              process_wait(run_process);
>> @@ -1552,11 +1555,20 @@ ovsdb_server_compact(struct unixctl_conn *conn, int argc,
>>              ? !strcmp(node->name, db_name)
>>              : node->name[0] != '_') {
>>              if (db->db) {
>> +                struct ovsdb_error *error = NULL;
>> +
>>                  VLOG_INFO("compacting %s database by user request",
>>                            node->name);
>>  
>> -                struct ovsdb_error *error = ovsdb_snapshot(db->db,
>> -                                                           trim_memory);
>> +                error = ovsdb_snapshot(db->db, trim_memory);
>> +                if (!error && ovsdb_snapshot_in_progress(db->db)) {
>> +                    while (ovsdb_snapshot_in_progress(db->db)) {
>> +                        ovsdb_snapshot_wait(db->db);
>> +                        poll_block();
>> +                    }
>> +                    error = ovsdb_snapshot(db->db, trim_memory);
>> +                }
>> +
>>                  if (error) {
>>                      char *s = ovsdb_error_to_string(error);
>>                      ds_put_format(&reply, "%s\n", s);
>> diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
>> index 91b4a01af..8cbefbe3d 100644
>> --- a/ovsdb/ovsdb.c
>> +++ b/ovsdb/ovsdb.c
>> @@ -25,9 +25,13 @@
>>  #include "file.h"
>>  #include "monitor.h"
>>  #include "openvswitch/json.h"
>> +#include "openvswitch/poll-loop.h"
>> +#include "ovs-thread.h"
>>  #include "ovsdb-error.h"
>>  #include "ovsdb-parser.h"
>>  #include "ovsdb-types.h"
>> +#include "row.h"
>> +#include "seq.h"
>>  #include "simap.h"
>>  #include "storage.h"
>>  #include "table.h"
>> @@ -461,6 +465,21 @@ ovsdb_destroy(struct ovsdb *db)
>>      if (db) {
>>          struct shash_node *node;
>>  
>> +        /* Need to wait for compaction thread to finish the work. */
>> +        while (ovsdb_snapshot_in_progress(db)) {
>> +            ovsdb_snapshot_wait(db);
>> +            poll_block();
>> +        }
>> +        if (ovsdb_snapshot_ready(db)) {
>> +            struct ovsdb_error *error = ovsdb_snapshot(db, false);
>> +
>> +            if (error) {
>> +                char *s = ovsdb_error_to_string_free(error);
>> +                VLOG_INFO("%s: %s", db->name, s);
>> +                free(s);
> 
> Nit: this is very similar to log_and_free_error().

That's because I copied this piece from there. :)
I wasn't sure if it's worth renaming and moving the
function to some common header.  It seems simple
enough to just re-implement.  Also, we can print
a bit more information, like the database name, if we're
not limited to only having an error message.

> 
> For now the rest of the patch looks good to me.

Thanks for review!

> I still want to test it some more though.

Sure.

> 
> Thanks,
> Dumitru
>
Dumitru Ceara July 11, 2022, 3:46 p.m. UTC | #3
On 7/1/22 01:34, Ilya Maximets wrote:
> Conversion of the database data into JSON object, serialization
> and destruction of that object are the most heavy operations
> during the database compaction.  If these operations are moved
> to a separate thread, the main thread can continue processing
> database requests in the meantime.
> 
> With this change, the compaction is split in 3 phases:
> 
> 1. Initialization:
>    - Create a copy of the database.
>    - Remember current database index.
>    - Start a separate thread to convert a copy of the database
>      into serialized JSON object.
> 
> 2. Wait:
>    - Continue normal operation until compaction thread is done.
>    - Meanwhile, compaction thread:
>      * Convert database copy to JSON.
>      * Serialize resulted JSON.
>      * Destroy original JSON object.
> 
> 3. Finish:
>    - Destroy the database copy.
>    - Take the snapshot created by the thread.
>    - Write on disk.
> 
> The key for this schema to be fast is the ability to create
> a shallow copy of the database.  This doesn't take too much
> time allowing the thread to do most of work.
> 
> Database copy is created and destroyed only by the main thread,
> so there is no need for synchronization.
> 
> Such solution allows to reduce the time main thread is blocked
> by compaction by 80-90%.  For example, in ovn-heater tests
> with 120 node density-heavy scenario, where compaction normally
> takes 5-6 seconds at the end of a test, measured compaction
> times was all below 1 second with the change applied.  Also,
> note that these measured times are the sum of phases 1 and 3,
> so actual poll intervals are about half a second in this case.
> 
> Only implemented for raft storage for now.  The implementation
> for standalone databases can be added later by using a file
> offset as a database index and copying newly added changes
> from the old file to a new one during ovsdb_log_replace().
> 

Let's add a TODO item, what do you think?

Aside from this I have a few minor comments/questions below.
Nothing that can't be fixed at apply time if needed.  The rest
looks good to me, thanks!

Acked-by: Dumitru Ceara <dceara@redhat.com>

> Reported-at: https://bugzilla.redhat.com/2069108
> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
> ---
>  ovsdb/ovsdb-server.c |  18 +++++-
>  ovsdb/ovsdb.c        | 143 +++++++++++++++++++++++++++++++++++++++----
>  ovsdb/ovsdb.h        |  24 ++++++++
>  ovsdb/raft.c         |   8 ++-
>  ovsdb/raft.h         |   3 +-
>  ovsdb/row.c          |  17 +++++
>  ovsdb/row.h          |   1 +
>  ovsdb/storage.c      |  11 ++--
>  ovsdb/storage.h      |   3 +-
>  9 files changed, 204 insertions(+), 24 deletions(-)
> 
> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
> index 5549b4e3a..eae2f6679 100644
> --- a/ovsdb/ovsdb-server.c
> +++ b/ovsdb/ovsdb-server.c
> @@ -252,7 +252,9 @@ main_loop(struct server_config *config,
>                  remove_db(config, node,
>                            xasprintf("removing database %s because storage "
>                                      "disconnected permanently", node->name));
> -            } else if (ovsdb_storage_should_snapshot(db->db->storage)) {
> +            } else if (!ovsdb_snapshot_in_progress(db->db)
> +                       && (ovsdb_storage_should_snapshot(db->db->storage) ||
> +                           ovsdb_snapshot_ready(db->db))) {
>                  log_and_free_error(ovsdb_snapshot(db->db, trim_memory));
>              }
>          }
> @@ -287,6 +289,7 @@ main_loop(struct server_config *config,
>              ovsdb_trigger_wait(db->db, time_msec());
>              ovsdb_storage_wait(db->db->storage);
>              ovsdb_storage_read_wait(db->db->storage);
> +            ovsdb_snapshot_wait(db->db);
>          }
>          if (run_process) {
>              process_wait(run_process);
> @@ -1552,11 +1555,20 @@ ovsdb_server_compact(struct unixctl_conn *conn, int argc,
>              ? !strcmp(node->name, db_name)
>              : node->name[0] != '_') {
>              if (db->db) {
> +                struct ovsdb_error *error = NULL;
> +
>                  VLOG_INFO("compacting %s database by user request",
>                            node->name);
>  
> -                struct ovsdb_error *error = ovsdb_snapshot(db->db,
> -                                                           trim_memory);
> +                error = ovsdb_snapshot(db->db, trim_memory);
> +                if (!error && ovsdb_snapshot_in_progress(db->db)) {
> +                    while (ovsdb_snapshot_in_progress(db->db)) {
> +                        ovsdb_snapshot_wait(db->db);
> +                        poll_block();
> +                    }

This is not worse than before in the sense that, if snapshots take long,
the appctl to "ovsdb-server/compact" will block everything in the main
thread, potentially causing raft and other timeouts to fire.

Can we improve this now that snapshots happen in a background thread?
Maybe worth a TODO for a follow up patch, what do you think?

> +                    error = ovsdb_snapshot(db->db, trim_memory);
> +                }
> +
>                  if (error) {
>                      char *s = ovsdb_error_to_string(error);
>                      ds_put_format(&reply, "%s\n", s);
> diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
> index 91b4a01af..8cbefbe3d 100644
> --- a/ovsdb/ovsdb.c
> +++ b/ovsdb/ovsdb.c
> @@ -25,9 +25,13 @@
>  #include "file.h"
>  #include "monitor.h"
>  #include "openvswitch/json.h"
> +#include "openvswitch/poll-loop.h"
> +#include "ovs-thread.h"
>  #include "ovsdb-error.h"
>  #include "ovsdb-parser.h"
>  #include "ovsdb-types.h"
> +#include "row.h"
> +#include "seq.h"
>  #include "simap.h"
>  #include "storage.h"
>  #include "table.h"
> @@ -461,6 +465,21 @@ ovsdb_destroy(struct ovsdb *db)
>      if (db) {
>          struct shash_node *node;
>  
> +        /* Need to wait for compaction thread to finish the work. */
> +        while (ovsdb_snapshot_in_progress(db)) {
> +            ovsdb_snapshot_wait(db);
> +            poll_block();
> +        }
> +        if (ovsdb_snapshot_ready(db)) {
> +            struct ovsdb_error *error = ovsdb_snapshot(db, false);
> +
> +            if (error) {
> +                char *s = ovsdb_error_to_string_free(error);
> +                VLOG_INFO("%s: %s", db->name, s);
> +                free(s);
> +            }
> +        }
> +
>          /* Close the log. */
>          ovsdb_storage_close(db->storage);
>  
> @@ -535,20 +554,119 @@ ovsdb_get_table(const struct ovsdb *db, const char *name)
>      return shash_find_data(&db->tables, name);
>  }
>  
> +static struct ovsdb *
> +ovsdb_clone_data(const struct ovsdb *db)
> +{
> +    struct ovsdb *new = ovsdb_create(ovsdb_schema_clone(db->schema), NULL);
> +
> +    struct shash_node *node;
> +    SHASH_FOR_EACH (node, &db->tables) {
> +        struct ovsdb_table *table = node->data;
> +        struct ovsdb_table *new_table = shash_find_data(&new->tables,
> +                                                        node->name);
> +        struct ovsdb_row *row, *new_row;
> +
> +        hmap_reserve(&new_table->rows, hmap_count(&table->rows));
> +        HMAP_FOR_EACH (row, hmap_node, &table->rows) {
> +            new_row = ovsdb_row_datum_clone(row);
> +            hmap_insert(&new_table->rows, &new_row->hmap_node,
> +                        ovsdb_row_hash(new_row));
> +        }
> +    }
> +
> +    return new;
> +}
> +
> +static void *
> +compaction_thread(void *aux)
> +{
> +    struct ovsdb_compaction_state *state = aux;
> +    uint64_t start_time = time_msec();
> +    struct json *data;
> +
> +    VLOG_DBG("%s: Compaction thread started.", state->db->name);
> +    data = ovsdb_to_txn_json(state->db, "compacting database online");
> +    state->data = json_serialized_object_create(data);
> +    json_destroy(data);
> +
> +    state->thread_time = time_msec() - start_time;
> +
> +    VLOG_DBG("%s: Compaction thread finished in %"PRIu64" ms.",
> +             state->db->name, state->thread_time);
> +    seq_change(state->done);
> +    return NULL;
> +}
> +
> +void
> +ovsdb_snapshot_wait(struct ovsdb *db)
> +{
> +    if (db->snap_state) {
> +        seq_wait(db->snap_state->done, db->snap_state->seqno);
> +    }
> +}
> +
> +bool
> +ovsdb_snapshot_in_progress(struct ovsdb *db)
> +{
> +    return db->snap_state &&
> +           seq_read(db->snap_state->done) == db->snap_state->seqno;
> +}
> +
> +bool
> +ovsdb_snapshot_ready(struct ovsdb *db)
> +{
> +    return db->snap_state &&
> +           seq_read(db->snap_state->done) != db->snap_state->seqno;
> +}
> +
>  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>  ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
>  {
> -    if (!db->storage) {
> +    if (!db->storage || ovsdb_snapshot_in_progress(db)) {
>          return NULL;
>      }
>  
> +    uint64_t applied_index = ovsdb_storage_get_applied_index(db->storage);
>      uint64_t elapsed, start_time = time_msec();
> -    struct json *schema = ovsdb_schema_to_json(db->schema);
> -    struct json *data = ovsdb_to_txn_json(db, "compacting database online");
> -    struct ovsdb_error *error = ovsdb_storage_store_snapshot(db->storage,
> -                                                             schema, data);
> -    json_destroy(schema);
> -    json_destroy(data);
> +    struct ovsdb_compaction_state *state;
> +
> +    if (!applied_index) {
> +        /* Parallel compaction is not supported for standalone databases. */


Nit: It might be a bit too much but I think I'd add an API like
ovsdb_storage_supports_parallel_compaction() instead of relying on
checking the applied_index value.  It's a detail of the parallel
compaction implementation that it needs the applied_index to work.

If you go this way, something like the following (maybe with a better
name) should do:

bool
ovsdb_storage_supports_parallel_compaction(const struct ovsdb_storage *storage)
{
    return storage->raft != NULL;
}

> +        state = xzalloc(sizeof *state);
> +        state->data = ovsdb_to_txn_json(db, "compacting database online");
> +        state->schema = ovsdb_schema_to_json(db->schema);
> +    } else if (ovsdb_snapshot_ready(db)) {
> +        xpthread_join(db->snap_state->thread, NULL);
> +
> +        state = db->snap_state;
> +        db->snap_state = NULL;
> +
> +        ovsdb_destroy(state->db);
> +        seq_destroy(state->done);
> +    } else {
> +        /* Creating a thread. */
> +        ovs_assert(!db->snap_state);
> +        state = xzalloc(sizeof *state);
> +
> +        state->db = ovsdb_clone_data(db);
> +        state->schema = ovsdb_schema_to_json(db->schema);
> +        state->applied_index = applied_index;
> +        state->done = seq_create();
> +        state->seqno = seq_read(state->done);
> +        state->thread = ovs_thread_create("compaction",
> +                                          compaction_thread, state);
> +        state->init_time = time_msec() - start_time;
> +
> +        db->snap_state = state;
> +        return NULL;
> +    }
> +
> +    struct ovsdb_error *error;
> +
> +    error = ovsdb_storage_store_snapshot(db->storage, state->schema,
> +                                         state->data, state->applied_index);
> +    json_destroy(state->schema);
> +    json_destroy(state->data);
>  
>  #if HAVE_DECL_MALLOC_TRIM
>      if (!error && trim_memory) {
> @@ -557,10 +675,13 @@ ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
>  #endif
>  
>      elapsed = time_msec() - start_time;
> -    if (elapsed > 1000) {
> -        VLOG_INFO("%s: Database compaction took %"PRIu64"ms",
> -                  db->name, elapsed);
> -    }
> +    VLOG(elapsed > 1000 ? VLL_INFO : VLL_DBG,
> +         "%s: Database compaction took %"PRIu64"ms "
> +         "(init: %"PRIu64"ms, write: %"PRIu64"ms, thread: %"PRIu64"ms)",
> +         db->name, elapsed + state->init_time,
> +         state->init_time, elapsed, state->thread_time);
> +
> +    free(state);
>      return error;
>  }
>  
> diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
> index ec2d235ec..2f77821e0 100644
> --- a/ovsdb/ovsdb.h
> +++ b/ovsdb/ovsdb.h
> @@ -72,6 +72,24 @@ struct ovsdb_txn_history_node {
>      struct ovsdb_txn *txn;
>  };
>  
> +struct ovsdb_compaction_state {
> +    pthread_t thread;          /* Thread handle. */
> +
> +    struct ovsdb *db;          /* Copy of a database data to compact. */
> +
> +    struct json *data;         /* 'db' as a serialized json. */
> +    struct json *schema;       /* 'db' schema json. */
> +    uint64_t applied_index;    /* Last applied index reported by the storage
> +                                * at the moment of a database copy. */
> +
> +    /* Completion signaling. */
> +    struct seq *done;
> +    uint64_t seqno;
> +
> +    uint64_t init_time;        /* Time spent by the main thread preparing. */
> +    uint64_t thread_time;      /* Time spent for compaction by the thread. */
> +};
> +
>  struct ovsdb {
>      char *name;
>      struct ovsdb_schema *schema;
> @@ -101,6 +119,9 @@ struct ovsdb {
>      struct ovs_list txn_forward_new;
>      /* Hash map for transactions that are already sent and waits for reply. */
>      struct hmap txn_forward_sent;
> +
> +    /* Database compaction. */
> +    struct ovsdb_compaction_state *snap_state;
>  };
>  
>  struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *);
> @@ -124,6 +145,9 @@ struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *,
>  
>  struct ovsdb_error *ovsdb_snapshot(struct ovsdb *, bool trim_memory)
>      OVS_WARN_UNUSED_RESULT;
> +void ovsdb_snapshot_wait(struct ovsdb *);
> +bool ovsdb_snapshot_in_progress(struct ovsdb *);
> +bool ovsdb_snapshot_ready(struct ovsdb *);
>  
>  void ovsdb_replace(struct ovsdb *dst, struct ovsdb *src);
>  
> diff --git a/ovsdb/raft.c b/ovsdb/raft.c
> index 856d083f2..b2c21e70f 100644
> --- a/ovsdb/raft.c
> +++ b/ovsdb/raft.c
> @@ -4295,7 +4295,8 @@ raft_notify_snapshot_recommended(struct raft *raft)
>   * only valuable to call it if raft_get_log_length() is significant and
>   * especially if raft_grew_lots() returns true. */
>  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
> -raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
> +raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data,
> +                    uint64_t applied_index)
>  {
>      if (raft->joining) {
>          return ovsdb_error(NULL,
> @@ -4311,11 +4312,12 @@ raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
>                             "cannot store a snapshot following failure");
>      }
>  
> -    if (raft->last_applied < raft->log_start) {
> +    uint64_t new_log_start = applied_index ? applied_index + 1
> +                                           : raft->last_applied + 1;

I tried to figure it out but I'm not sure.  Is the following scenario
possible?

a. snapshot operation starts, we stored applied_index in db->snap_state
b. a (broken?) server sends an append request:
   raft_handle_append_request():
     raft_handle_append_entries():
       raft_truncate(raft, log_index)

Can 'log_index < raft->commit_index' ever be true?

Furthermore, can 'log_index < raft->last_applied' ever be true?

c. snapshot operation finishes:
   ovsdb_storage_store_snapshot():
     raft_store_snapshot(.., db->snap_state->applied_index):

Would we be potentially skipping entries that are not in the snapshot?

Anyhow, it's not related to this patch, but if that's the case maybe
we should add a check in raft_handle_append_entries() to only
raft_truncate() if log_index > raft->commit_index.

> +    if (new_log_start <= raft->log_start) {
>          return ovsdb_error(NULL, "not storing a duplicate snapshot");
>      }
>  
> -    uint64_t new_log_start = raft->last_applied + 1;
>      struct raft_entry new_snapshot = {
>          .term = raft_get_term(raft, new_log_start - 1),
>          .eid = *raft_get_eid(raft, new_log_start - 1),
> diff --git a/ovsdb/raft.h b/ovsdb/raft.h
> index 599bc0ae8..403ed3dd7 100644
> --- a/ovsdb/raft.h
> +++ b/ovsdb/raft.h
> @@ -180,7 +180,8 @@ uint64_t raft_get_log_length(const struct raft *);
>  bool raft_may_snapshot(const struct raft *);
>  void raft_notify_snapshot_recommended(struct raft *);
>  struct ovsdb_error *raft_store_snapshot(struct raft *,
> -                                        const struct json *new_snapshot)
> +                                        const struct json *new_snapshot,
> +                                        uint64_t applied_index)
>      OVS_WARN_UNUSED_RESULT;
>  
>  /* Cluster management. */
> diff --git a/ovsdb/row.c b/ovsdb/row.c
> index fd50c7e7b..3f0bb8acf 100644
> --- a/ovsdb/row.c
> +++ b/ovsdb/row.c
> @@ -155,6 +155,23 @@ ovsdb_row_clone(const struct ovsdb_row *old)
>      return new;
>  }
>  
> +struct ovsdb_row *
> +ovsdb_row_datum_clone(const struct ovsdb_row *old)
> +{
> +    const struct ovsdb_table *table = old->table;
> +    const struct shash_node *node;
> +    struct ovsdb_row *new;
> +
> +    new = allocate_row(table);
> +    SHASH_FOR_EACH (node, &table->schema->columns) {
> +        const struct ovsdb_column *column = node->data;
> +        ovsdb_datum_clone(&new->fields[column->index],
> +                          &old->fields[column->index]);
> +    }
> +    return new;
> +}
> +
> +
>  /* The caller is responsible for ensuring that 'row' has been removed from its
>   * table and that it is not participating in a transaction. */
>  void
> diff --git a/ovsdb/row.h b/ovsdb/row.h
> index 4d3c17afc..ff91288fe 100644
> --- a/ovsdb/row.h
> +++ b/ovsdb/row.h
> @@ -93,6 +93,7 @@ void ovsdb_weak_ref_destroy(struct ovsdb_weak_ref *);
>  
>  struct ovsdb_row *ovsdb_row_create(const struct ovsdb_table *);
>  struct ovsdb_row *ovsdb_row_clone(const struct ovsdb_row *);
> +struct ovsdb_row *ovsdb_row_datum_clone(const struct ovsdb_row *);
>  void ovsdb_row_destroy(struct ovsdb_row *);
>  
>  uint32_t ovsdb_row_hash_columns(const struct ovsdb_row *,
> diff --git a/ovsdb/storage.c b/ovsdb/storage.c
> index d4984be25..e8f95ce64 100644
> --- a/ovsdb/storage.c
> +++ b/ovsdb/storage.c
> @@ -576,7 +576,7 @@ ovsdb_storage_should_snapshot(struct ovsdb_storage *storage)
>  static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>  ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
>                                 const struct json *schema,
> -                               const struct json *data)
> +                               const struct json *data, uint64_t index)
>  {
>      if (storage->raft) {
>          struct json *entries = json_array_create_empty();
> @@ -587,7 +587,7 @@ ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
>              json_array_add(entries, json_clone(data));
>          }
>          struct ovsdb_error *error = raft_store_snapshot(storage->raft,
> -                                                        entries);
> +                                                        entries, index);
>          json_destroy(entries);
>          return error;
>      } else if (storage->log) {
> @@ -611,10 +611,11 @@ ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
>  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>  ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
>                               const struct json *schema,
> -                             const struct json *data)
> +                             const struct json *data, uint64_t index)
>  {
>      struct ovsdb_error *error = ovsdb_storage_store_snapshot__(storage,
> -                                                               schema, data);
> +                                                               schema, data,
> +                                                               index);
>      bool retry_quickly = error != NULL;
>      schedule_next_snapshot(storage, retry_quickly);
>      return error;
> @@ -638,7 +639,7 @@ ovsdb_storage_write_schema_change(struct ovsdb_storage *storage,
>                                            prereq, &result);
>          json_destroy(txn_json);
>      } else if (storage->log) {
> -        w->error = ovsdb_storage_store_snapshot__(storage, schema, data);
> +        w->error = ovsdb_storage_store_snapshot__(storage, schema, data, 0);
>      } else {
>          /* When 'error' and 'command' are both null, it indicates that the
>           * command is complete.  This is fine since this unbacked storage drops
> diff --git a/ovsdb/storage.h b/ovsdb/storage.h
> index ff026b77f..a1fdaa564 100644
> --- a/ovsdb/storage.h
> +++ b/ovsdb/storage.h
> @@ -79,7 +79,8 @@ void ovsdb_write_destroy(struct ovsdb_write *);
>  bool ovsdb_storage_should_snapshot(struct ovsdb_storage *);
>  struct ovsdb_error *ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
>                                                   const struct json *schema,
> -                                                 const struct json *snapshot)
> +                                                 const struct json *snapshot,
> +                                                 uint64_t applied_index)
>      OVS_WARN_UNUSED_RESULT;
>  
>  struct ovsdb_write *ovsdb_storage_write_schema_change(
Dumitru Ceara July 11, 2022, 3:46 p.m. UTC | #4
On 7/1/22 17:45, Ilya Maximets wrote:
> On 7/1/22 17:31, Dumitru Ceara wrote:
>> On 7/1/22 01:34, Ilya Maximets wrote:
>>> Conversion of the database data into JSON object, serialization
>>> and destruction of that object are the most heavy operations
>>> during the database compaction.  If these operations are moved
>>> to a separate thread, the main thread can continue processing
>>> database requests in the meantime.
>>>
>>
>> Hi Ilya,
>>
>>> With this change, the compaction is split in 3 phases:
>>>
>>> 1. Initialization:
>>>    - Create a copy of the database.
>>>    - Remember current database index.
>>>    - Start a separate thread to convert a copy of the database
>>>      into serialized JSON object.
>>>
>>> 2. Wait:
>>>    - Continue normal operation until compaction thread is done.
>>>    - Meanwhile, compaction thread:
>>>      * Convert database copy to JSON.
>>>      * Serialize resulted JSON.
>>>      * Destroy original JSON object.
>>>
>>> 3. Finish:
>>>    - Destroy the database copy.
>>>    - Take the snapshot created by the thread.
>>>    - Write on disk.
>>>
>>
>> The approach sounds good to me.
>>
>>> The key for this schema to be fast is the ability to create
>>> a shallow copy of the database.  This doesn't take too much
>>> time allowing the thread to do most of work.
>>>
>>> Database copy is created and destroyed only by the main thread,
>>> so there is no need for synchronization.
>>>
>>> Such solution allows to reduce the time main thread is blocked
>>> by compaction by 80-90%.  For example, in ovn-heater tests
>>> with 120 node density-heavy scenario, where compaction normally
>>> takes 5-6 seconds at the end of a test, measured compaction
>>> times was all below 1 second with the change applied.  Also,
>>> note that these measured times are the sum of phases 1 and 3,
>>> so actual poll intervals are about half a second in this case.
>>
>> Nice!
>>
>>>
>>> Only implemented for raft storage for now.  The implementation
>>> for standalone databases can be added later by using a file
>>> offset as a database index and copying newly added changes
>>> from the old file to a new one during ovsdb_log_replace().
>>>
>>> Reported-at: https://bugzilla.redhat.com/2069108
>>> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
>>> ---
>>>  ovsdb/ovsdb-server.c |  18 +++++-
>>>  ovsdb/ovsdb.c        | 143 +++++++++++++++++++++++++++++++++++++++----
>>>  ovsdb/ovsdb.h        |  24 ++++++++
>>>  ovsdb/raft.c         |   8 ++-
>>>  ovsdb/raft.h         |   3 +-
>>>  ovsdb/row.c          |  17 +++++
>>>  ovsdb/row.h          |   1 +
>>>  ovsdb/storage.c      |  11 ++--
>>>  ovsdb/storage.h      |   3 +-
>>>  9 files changed, 204 insertions(+), 24 deletions(-)
>>>
>>> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
>>> index 5549b4e3a..eae2f6679 100644
>>> --- a/ovsdb/ovsdb-server.c
>>> +++ b/ovsdb/ovsdb-server.c
>>> @@ -252,7 +252,9 @@ main_loop(struct server_config *config,
>>>                  remove_db(config, node,
>>>                            xasprintf("removing database %s because storage "
>>>                                      "disconnected permanently", node->name));
>>> -            } else if (ovsdb_storage_should_snapshot(db->db->storage)) {
>>> +            } else if (!ovsdb_snapshot_in_progress(db->db)
>>> +                       && (ovsdb_storage_should_snapshot(db->db->storage) ||
>>> +                           ovsdb_snapshot_ready(db->db))) {
>>>                  log_and_free_error(ovsdb_snapshot(db->db, trim_memory));
>>>              }
>>>          }
>>> @@ -287,6 +289,7 @@ main_loop(struct server_config *config,
>>>              ovsdb_trigger_wait(db->db, time_msec());
>>>              ovsdb_storage_wait(db->db->storage);
>>>              ovsdb_storage_read_wait(db->db->storage);
>>> +            ovsdb_snapshot_wait(db->db);
>>>          }
>>>          if (run_process) {
>>>              process_wait(run_process);
>>> @@ -1552,11 +1555,20 @@ ovsdb_server_compact(struct unixctl_conn *conn, int argc,
>>>              ? !strcmp(node->name, db_name)
>>>              : node->name[0] != '_') {
>>>              if (db->db) {
>>> +                struct ovsdb_error *error = NULL;
>>> +
>>>                  VLOG_INFO("compacting %s database by user request",
>>>                            node->name);
>>>  
>>> -                struct ovsdb_error *error = ovsdb_snapshot(db->db,
>>> -                                                           trim_memory);
>>> +                error = ovsdb_snapshot(db->db, trim_memory);
>>> +                if (!error && ovsdb_snapshot_in_progress(db->db)) {
>>> +                    while (ovsdb_snapshot_in_progress(db->db)) {
>>> +                        ovsdb_snapshot_wait(db->db);
>>> +                        poll_block();
>>> +                    }
>>> +                    error = ovsdb_snapshot(db->db, trim_memory);
>>> +                }
>>> +
>>>                  if (error) {
>>>                      char *s = ovsdb_error_to_string(error);
>>>                      ds_put_format(&reply, "%s\n", s);
>>> diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
>>> index 91b4a01af..8cbefbe3d 100644
>>> --- a/ovsdb/ovsdb.c
>>> +++ b/ovsdb/ovsdb.c
>>> @@ -25,9 +25,13 @@
>>>  #include "file.h"
>>>  #include "monitor.h"
>>>  #include "openvswitch/json.h"
>>> +#include "openvswitch/poll-loop.h"
>>> +#include "ovs-thread.h"
>>>  #include "ovsdb-error.h"
>>>  #include "ovsdb-parser.h"
>>>  #include "ovsdb-types.h"
>>> +#include "row.h"
>>> +#include "seq.h"
>>>  #include "simap.h"
>>>  #include "storage.h"
>>>  #include "table.h"
>>> @@ -461,6 +465,21 @@ ovsdb_destroy(struct ovsdb *db)
>>>      if (db) {
>>>          struct shash_node *node;
>>>  
>>> +        /* Need to wait for compaction thread to finish the work. */
>>> +        while (ovsdb_snapshot_in_progress(db)) {
>>> +            ovsdb_snapshot_wait(db);
>>> +            poll_block();
>>> +        }
>>> +        if (ovsdb_snapshot_ready(db)) {
>>> +            struct ovsdb_error *error = ovsdb_snapshot(db, false);
>>> +
>>> +            if (error) {
>>> +                char *s = ovsdb_error_to_string_free(error);
>>> +                VLOG_INFO("%s: %s", db->name, s);
>>> +                free(s);
>>
>> Nit: this is very similar to log_and_free_error().
> 
> That's because I copied this piece from there. :)

I didn't want to make it sound like that.  Or, at least, not explicitly. :)

> I wasn't sure if it worth re-naming and moving the
> function to some common header.  It seems simple
> enough to just re-implement.  Also, we can print
> a bit more information like a database name if not
> limited by only having an error message.

> 

Ok, let's leave it as-is.

>>
>> For now the rest of the patch looks good to me.
> 
> Thanks for review!
> 
>> I still want to test it some more though.
> 
> Sure.
> 

I left a few more comments on the original patch.

>>
>> Thanks,
>> Dumitru
>>
>
Ilya Maximets July 13, 2022, 7:49 p.m. UTC | #5
On 7/11/22 17:46, Dumitru Ceara wrote:
> On 7/1/22 01:34, Ilya Maximets wrote:
>> Conversion of the database data into JSON object, serialization
>> and destruction of that object are the most heavy operations
>> during the database compaction.  If these operations are moved
>> to a separate thread, the main thread can continue processing
>> database requests in the meantime.
>>
>> With this change, the compaction is split in 3 phases:
>>
>> 1. Initialization:
>>    - Create a copy of the database.
>>    - Remember current database index.
>>    - Start a separate thread to convert a copy of the database
>>      into serialized JSON object.
>>
>> 2. Wait:
>>    - Continue normal operation until compaction thread is done.
>>    - Meanwhile, compaction thread:
>>      * Convert database copy to JSON.
>>      * Serialize resulted JSON.
>>      * Destroy original JSON object.
>>
>> 3. Finish:
>>    - Destroy the database copy.
>>    - Take the snapshot created by the thread.
>>    - Write on disk.
>>
>> The key for this schema to be fast is the ability to create
>> a shallow copy of the database.  This doesn't take too much
>> time allowing the thread to do most of work.
>>
>> Database copy is created and destroyed only by the main thread,
>> so there is no need for synchronization.
>>
>> Such solution allows to reduce the time main thread is blocked
>> by compaction by 80-90%.  For example, in ovn-heater tests
>> with 120 node density-heavy scenario, where compaction normally
>> takes 5-6 seconds at the end of a test, measured compaction
>> times was all below 1 second with the change applied.  Also,
>> note that these measured times are the sum of phases 1 and 3,
>> so actual poll intervals are about half a second in this case.
>>
>> Only implemented for raft storage for now.  The implementation
>> for standalone databases can be added later by using a file
>> offset as a database index and copying newly added changes
>> from the old file to a new one during ovsdb_log_replace().
>>
> 
> Let's add a TODO item, what do you think?

Sure.  Though the TODO file needs some rework first.  I'll send
a separate patch to update the TODO file with some cleanup and
all the new items I can remember.

> 
> Aside from this I have a few minor comments/questions below.
> Nothing that can't be fixed at apply time if needed.  The rest
> looks good to me, thanks!
> 
> Acked-by: Dumitru Ceara <dceara@redhat.com>

Thanks!  I didn't change anything for now in this patch. :)  See
replies below.  I made changes we discussed for the first patch.

With that, I applied the set to master.

best regards, Ilya Maximets.

> 
>> Reported-at: https://bugzilla.redhat.com/2069108
>> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
>> ---
>>  ovsdb/ovsdb-server.c |  18 +++++-
>>  ovsdb/ovsdb.c        | 143 +++++++++++++++++++++++++++++++++++++++----
>>  ovsdb/ovsdb.h        |  24 ++++++++
>>  ovsdb/raft.c         |   8 ++-
>>  ovsdb/raft.h         |   3 +-
>>  ovsdb/row.c          |  17 +++++
>>  ovsdb/row.h          |   1 +
>>  ovsdb/storage.c      |  11 ++--
>>  ovsdb/storage.h      |   3 +-
>>  9 files changed, 204 insertions(+), 24 deletions(-)
>>
>> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
>> index 5549b4e3a..eae2f6679 100644
>> --- a/ovsdb/ovsdb-server.c
>> +++ b/ovsdb/ovsdb-server.c
>> @@ -252,7 +252,9 @@ main_loop(struct server_config *config,
>>                  remove_db(config, node,
>>                            xasprintf("removing database %s because storage "
>>                                      "disconnected permanently", node->name));
>> -            } else if (ovsdb_storage_should_snapshot(db->db->storage)) {
>> +            } else if (!ovsdb_snapshot_in_progress(db->db)
>> +                       && (ovsdb_storage_should_snapshot(db->db->storage) ||
>> +                           ovsdb_snapshot_ready(db->db))) {
>>                  log_and_free_error(ovsdb_snapshot(db->db, trim_memory));
>>              }
>>          }
>> @@ -287,6 +289,7 @@ main_loop(struct server_config *config,
>>              ovsdb_trigger_wait(db->db, time_msec());
>>              ovsdb_storage_wait(db->db->storage);
>>              ovsdb_storage_read_wait(db->db->storage);
>> +            ovsdb_snapshot_wait(db->db);
>>          }
>>          if (run_process) {
>>              process_wait(run_process);
>> @@ -1552,11 +1555,20 @@ ovsdb_server_compact(struct unixctl_conn *conn, int argc,
>>              ? !strcmp(node->name, db_name)
>>              : node->name[0] != '_') {
>>              if (db->db) {
>> +                struct ovsdb_error *error = NULL;
>> +
>>                  VLOG_INFO("compacting %s database by user request",
>>                            node->name);
>>  
>> -                struct ovsdb_error *error = ovsdb_snapshot(db->db,
>> -                                                           trim_memory);
>> +                error = ovsdb_snapshot(db->db, trim_memory);
>> +                if (!error && ovsdb_snapshot_in_progress(db->db)) {
>> +                    while (ovsdb_snapshot_in_progress(db->db)) {
>> +                        ovsdb_snapshot_wait(db->db);
>> +                        poll_block();
>> +                    }
> 
> This is not worse than before in the sense that, if snapshots take long,
> the appctl to "ovsdb-server/compact" will block everything in the main
> thread, potentially causing raft and other timeouts to fire.

I thought about this while implementing.  And there is a way to do that,
but it wasn't very clean, so I didn't want to spend a lot of time on it.

> 
> Can we improve this now that snapshots happen in a background thread?
> Maybe worth a TODO for a follow up patch, what do you think?

Sure, will add a TODO item.  It's a good thing to implement.

> 
>> +                    error = ovsdb_snapshot(db->db, trim_memory);
>> +                }
>> +
>>                  if (error) {
>>                      char *s = ovsdb_error_to_string(error);
>>                      ds_put_format(&reply, "%s\n", s);
>> diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
>> index 91b4a01af..8cbefbe3d 100644
>> --- a/ovsdb/ovsdb.c
>> +++ b/ovsdb/ovsdb.c
>> @@ -25,9 +25,13 @@
>>  #include "file.h"
>>  #include "monitor.h"
>>  #include "openvswitch/json.h"
>> +#include "openvswitch/poll-loop.h"
>> +#include "ovs-thread.h"
>>  #include "ovsdb-error.h"
>>  #include "ovsdb-parser.h"
>>  #include "ovsdb-types.h"
>> +#include "row.h"
>> +#include "seq.h"
>>  #include "simap.h"
>>  #include "storage.h"
>>  #include "table.h"
>> @@ -461,6 +465,21 @@ ovsdb_destroy(struct ovsdb *db)
>>      if (db) {
>>          struct shash_node *node;
>>  
>> +        /* Need to wait for compaction thread to finish the work. */
>> +        while (ovsdb_snapshot_in_progress(db)) {
>> +            ovsdb_snapshot_wait(db);
>> +            poll_block();
>> +        }
>> +        if (ovsdb_snapshot_ready(db)) {
>> +            struct ovsdb_error *error = ovsdb_snapshot(db, false);
>> +
>> +            if (error) {
>> +                char *s = ovsdb_error_to_string_free(error);
>> +                VLOG_INFO("%s: %s", db->name, s);
>> +                free(s);
>> +            }
>> +        }
>> +
>>          /* Close the log. */
>>          ovsdb_storage_close(db->storage);
>>  
>> @@ -535,20 +554,119 @@ ovsdb_get_table(const struct ovsdb *db, const char *name)
>>      return shash_find_data(&db->tables, name);
>>  }
>>  
>> +static struct ovsdb *
>> +ovsdb_clone_data(const struct ovsdb *db)
>> +{
>> +    struct ovsdb *new = ovsdb_create(ovsdb_schema_clone(db->schema), NULL);
>> +
>> +    struct shash_node *node;
>> +    SHASH_FOR_EACH (node, &db->tables) {
>> +        struct ovsdb_table *table = node->data;
>> +        struct ovsdb_table *new_table = shash_find_data(&new->tables,
>> +                                                        node->name);
>> +        struct ovsdb_row *row, *new_row;
>> +
>> +        hmap_reserve(&new_table->rows, hmap_count(&table->rows));
>> +        HMAP_FOR_EACH (row, hmap_node, &table->rows) {
>> +            new_row = ovsdb_row_datum_clone(row);
>> +            hmap_insert(&new_table->rows, &new_row->hmap_node,
>> +                        ovsdb_row_hash(new_row));
>> +        }
>> +    }
>> +
>> +    return new;
>> +}
>> +
>> +static void *
>> +compaction_thread(void *aux)
>> +{
>> +    struct ovsdb_compaction_state *state = aux;
>> +    uint64_t start_time = time_msec();
>> +    struct json *data;
>> +
>> +    VLOG_DBG("%s: Compaction thread started.", state->db->name);
>> +    data = ovsdb_to_txn_json(state->db, "compacting database online");
>> +    state->data = json_serialized_object_create(data);
>> +    json_destroy(data);
>> +
>> +    state->thread_time = time_msec() - start_time;
>> +
>> +    VLOG_DBG("%s: Compaction thread finished in %"PRIu64" ms.",
>> +             state->db->name, state->thread_time);
>> +    seq_change(state->done);
>> +    return NULL;
>> +}
>> +
>> +void
>> +ovsdb_snapshot_wait(struct ovsdb *db)
>> +{
>> +    if (db->snap_state) {
>> +        seq_wait(db->snap_state->done, db->snap_state->seqno);
>> +    }
>> +}
>> +
>> +bool
>> +ovsdb_snapshot_in_progress(struct ovsdb *db)
>> +{
>> +    return db->snap_state &&
>> +           seq_read(db->snap_state->done) == db->snap_state->seqno;
>> +}
>> +
>> +bool
>> +ovsdb_snapshot_ready(struct ovsdb *db)
>> +{
>> +    return db->snap_state &&
>> +           seq_read(db->snap_state->done) != db->snap_state->seqno;
>> +}
>> +
>>  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>>  ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
>>  {
>> -    if (!db->storage) {
>> +    if (!db->storage || ovsdb_snapshot_in_progress(db)) {
>>          return NULL;
>>      }
>>  
>> +    uint64_t applied_index = ovsdb_storage_get_applied_index(db->storage);
>>      uint64_t elapsed, start_time = time_msec();
>> -    struct json *schema = ovsdb_schema_to_json(db->schema);
>> -    struct json *data = ovsdb_to_txn_json(db, "compacting database online");
>> -    struct ovsdb_error *error = ovsdb_storage_store_snapshot(db->storage,
>> -                                                             schema, data);
>> -    json_destroy(schema);
>> -    json_destroy(data);
>> +    struct ovsdb_compaction_state *state;
>> +
>> +    if (!applied_index) {
>> +        /* Parallel compaction is not supported for standalone databases. */
> 
> 
> Nit: It might be a bit too much but I think I'd add an API like
> ovsdb_storage_supports_parallel_compaction() instead of relying on
> checking the applied_index value.  It's a detail of the parallel
> compaction implementation that it needs the applied_index to work.
> 
> If you go this way, something like the following (maybe with a better
> name) should do:
> 
> bool
> ovsdb_storage_supports_parallel_compaction(const struct ovsdb_storage *storage)
> {
>     return storage->raft != NULL;
> }

Yeah.  You're right that it's not a very clean solution.  Though the
transaction history already depends on this call with similar
semantics.  So, I think, it's OK to keep it as-is for now.

We probably need a better storage abstraction than we have now
to make it clean.

> 
>> +        state = xzalloc(sizeof *state);
>> +        state->data = ovsdb_to_txn_json(db, "compacting database online");
>> +        state->schema = ovsdb_schema_to_json(db->schema);
>> +    } else if (ovsdb_snapshot_ready(db)) {
>> +        xpthread_join(db->snap_state->thread, NULL);
>> +
>> +        state = db->snap_state;
>> +        db->snap_state = NULL;
>> +
>> +        ovsdb_destroy(state->db);
>> +        seq_destroy(state->done);
>> +    } else {
>> +        /* Creating a thread. */
>> +        ovs_assert(!db->snap_state);
>> +        state = xzalloc(sizeof *state);
>> +
>> +        state->db = ovsdb_clone_data(db);
>> +        state->schema = ovsdb_schema_to_json(db->schema);
>> +        state->applied_index = applied_index;
>> +        state->done = seq_create();
>> +        state->seqno = seq_read(state->done);
>> +        state->thread = ovs_thread_create("compaction",
>> +                                          compaction_thread, state);
>> +        state->init_time = time_msec() - start_time;
>> +
>> +        db->snap_state = state;
>> +        return NULL;
>> +    }
>> +
>> +    struct ovsdb_error *error;
>> +
>> +    error = ovsdb_storage_store_snapshot(db->storage, state->schema,
>> +                                         state->data, state->applied_index);
>> +    json_destroy(state->schema);
>> +    json_destroy(state->data);
>>  
>>  #if HAVE_DECL_MALLOC_TRIM
>>      if (!error && trim_memory) {
>> @@ -557,10 +675,13 @@ ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
>>  #endif
>>  
>>      elapsed = time_msec() - start_time;
>> -    if (elapsed > 1000) {
>> -        VLOG_INFO("%s: Database compaction took %"PRIu64"ms",
>> -                  db->name, elapsed);
>> -    }
>> +    VLOG(elapsed > 1000 ? VLL_INFO : VLL_DBG,
>> +         "%s: Database compaction took %"PRIu64"ms "
>> +         "(init: %"PRIu64"ms, write: %"PRIu64"ms, thread: %"PRIu64"ms)",
>> +         db->name, elapsed + state->init_time,
>> +         state->init_time, elapsed, state->thread_time);
>> +
>> +    free(state);
>>      return error;
>>  }
>>  
>> diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
>> index ec2d235ec..2f77821e0 100644
>> --- a/ovsdb/ovsdb.h
>> +++ b/ovsdb/ovsdb.h
>> @@ -72,6 +72,24 @@ struct ovsdb_txn_history_node {
>>      struct ovsdb_txn *txn;
>>  };
>>  
>> +struct ovsdb_compaction_state {
>> +    pthread_t thread;          /* Thread handle. */
>> +
>> +    struct ovsdb *db;          /* Copy of a database data to compact. */
>> +
>> +    struct json *data;         /* 'db' as a serialized json. */
>> +    struct json *schema;       /* 'db' schema json. */
>> +    uint64_t applied_index;    /* Last applied index reported by the storage
>> +                                * at the moment of a database copy. */
>> +
>> +    /* Completion signaling. */
>> +    struct seq *done;
>> +    uint64_t seqno;
>> +
>> +    uint64_t init_time;        /* Time spent by the main thread preparing. */
>> +    uint64_t thread_time;      /* Time spent for compaction by the thread. */
>> +};
>> +
>>  struct ovsdb {
>>      char *name;
>>      struct ovsdb_schema *schema;
>> @@ -101,6 +119,9 @@ struct ovsdb {
>>      struct ovs_list txn_forward_new;
>>      /* Hash map for transactions that are already sent and waits for reply. */
>>      struct hmap txn_forward_sent;
>> +
>> +    /* Database compaction. */
>> +    struct ovsdb_compaction_state *snap_state;
>>  };
>>  
>>  struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *);
>> @@ -124,6 +145,9 @@ struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *,
>>  
>>  struct ovsdb_error *ovsdb_snapshot(struct ovsdb *, bool trim_memory)
>>      OVS_WARN_UNUSED_RESULT;
>> +void ovsdb_snapshot_wait(struct ovsdb *);
>> +bool ovsdb_snapshot_in_progress(struct ovsdb *);
>> +bool ovsdb_snapshot_ready(struct ovsdb *);
>>  
>>  void ovsdb_replace(struct ovsdb *dst, struct ovsdb *src);
>>  
>> diff --git a/ovsdb/raft.c b/ovsdb/raft.c
>> index 856d083f2..b2c21e70f 100644
>> --- a/ovsdb/raft.c
>> +++ b/ovsdb/raft.c
>> @@ -4295,7 +4295,8 @@ raft_notify_snapshot_recommended(struct raft *raft)
>>   * only valuable to call it if raft_get_log_length() is significant and
>>   * especially if raft_grew_lots() returns true. */
>>  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>> -raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
>> +raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data,
>> +                    uint64_t applied_index)
>>  {
>>      if (raft->joining) {
>>          return ovsdb_error(NULL,
>> @@ -4311,11 +4312,12 @@ raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
>>                             "cannot store a snapshot following failure");
>>      }
>>  
>> -    if (raft->last_applied < raft->log_start) {
>> +    uint64_t new_log_start = applied_index ? applied_index + 1
>> +                                           : raft->last_applied + 1;
> 
> I tried to figure it out but I'm not sure.  Is the following scenario
> possible?
> 
> a. snapshot operation starts, we stored applied_index in db->snap_state
> b. a (broken?) server sends an append request:
>    raft_handle_append_request():
>      raft_handle_append_entries():
>        raft_truncate(raft, log_index)
> 
> Can 'log_index < raft->commit_index' ever be true?
> 
> Furthermore, can 'log_index < raft->last_applied' ever be true?
> 
> c. snapshot operation finishes:
>    ovsdb_storage_store_snapshot():
>      raft_store_snapshot(.., db->snap_state->applied_index):
> 
> Would we be potentially skipping entries that are not in the snapshot?
> 
> Anyhow, it's not related to this patch, but if that's the case maybe
> we should add a check in raft_handle_append_entries() to only
> raft_truncate() if log_index > raft->commit_index.

I think the database currently heavily depends on the fact that
committed entries are never rolled back.  It would be a very
serious problem if that could happen in practice.  And, yes,
if it can happen, it can happen even without compaction
involved, so it's not a concern for the current patch set.

I need to think more about whether this condition is actually possible.

> 
>> +    if (new_log_start <= raft->log_start) {
>>          return ovsdb_error(NULL, "not storing a duplicate snapshot");
>>      }
>>  
>> -    uint64_t new_log_start = raft->last_applied + 1;
>>      struct raft_entry new_snapshot = {
>>          .term = raft_get_term(raft, new_log_start - 1),
>>          .eid = *raft_get_eid(raft, new_log_start - 1),
>> diff --git a/ovsdb/raft.h b/ovsdb/raft.h
>> index 599bc0ae8..403ed3dd7 100644
>> --- a/ovsdb/raft.h
>> +++ b/ovsdb/raft.h
>> @@ -180,7 +180,8 @@ uint64_t raft_get_log_length(const struct raft *);
>>  bool raft_may_snapshot(const struct raft *);
>>  void raft_notify_snapshot_recommended(struct raft *);
>>  struct ovsdb_error *raft_store_snapshot(struct raft *,
>> -                                        const struct json *new_snapshot)
>> +                                        const struct json *new_snapshot,
Dumitru Ceara July 14, 2022, 7:49 a.m. UTC | #6
On 7/13/22 21:49, Ilya Maximets wrote:
>> Aside from this I have a few minor comments/questions below.
>> Nothing that can't be fixed at apply time if needed.  The rest
>> looks good to me, thanks!
>>
>> Acked-by: Dumitru Ceara <dceara@redhat.com>
> Thanks!  I didn't change anything for now in this patch. :)  See
> replies below.  I made changes we discussed for the first patch.
> 
> With that, I applied the set to master.
> 

Fine by me, thanks!

Patch

diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index 5549b4e3a..eae2f6679 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -252,7 +252,9 @@  main_loop(struct server_config *config,
                 remove_db(config, node,
                           xasprintf("removing database %s because storage "
                                     "disconnected permanently", node->name));
-            } else if (ovsdb_storage_should_snapshot(db->db->storage)) {
+            } else if (!ovsdb_snapshot_in_progress(db->db)
+                       && (ovsdb_storage_should_snapshot(db->db->storage) ||
+                           ovsdb_snapshot_ready(db->db))) {
                 log_and_free_error(ovsdb_snapshot(db->db, trim_memory));
             }
         }
@@ -287,6 +289,7 @@  main_loop(struct server_config *config,
             ovsdb_trigger_wait(db->db, time_msec());
             ovsdb_storage_wait(db->db->storage);
             ovsdb_storage_read_wait(db->db->storage);
+            ovsdb_snapshot_wait(db->db);
         }
         if (run_process) {
             process_wait(run_process);
@@ -1552,11 +1555,20 @@  ovsdb_server_compact(struct unixctl_conn *conn, int argc,
             ? !strcmp(node->name, db_name)
             : node->name[0] != '_') {
             if (db->db) {
+                struct ovsdb_error *error = NULL;
+
                 VLOG_INFO("compacting %s database by user request",
                           node->name);
 
-                struct ovsdb_error *error = ovsdb_snapshot(db->db,
-                                                           trim_memory);
+                error = ovsdb_snapshot(db->db, trim_memory);
+                if (!error && ovsdb_snapshot_in_progress(db->db)) {
+                    while (ovsdb_snapshot_in_progress(db->db)) {
+                        ovsdb_snapshot_wait(db->db);
+                        poll_block();
+                    }
+                    error = ovsdb_snapshot(db->db, trim_memory);
+                }
+
                 if (error) {
                     char *s = ovsdb_error_to_string(error);
                     ds_put_format(&reply, "%s\n", s);
diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
index 91b4a01af..8cbefbe3d 100644
--- a/ovsdb/ovsdb.c
+++ b/ovsdb/ovsdb.c
@@ -25,9 +25,13 @@ 
 #include "file.h"
 #include "monitor.h"
 #include "openvswitch/json.h"
+#include "openvswitch/poll-loop.h"
+#include "ovs-thread.h"
 #include "ovsdb-error.h"
 #include "ovsdb-parser.h"
 #include "ovsdb-types.h"
+#include "row.h"
+#include "seq.h"
 #include "simap.h"
 #include "storage.h"
 #include "table.h"
@@ -461,6 +465,21 @@  ovsdb_destroy(struct ovsdb *db)
     if (db) {
         struct shash_node *node;
 
+        /* Need to wait for compaction thread to finish the work. */
+        while (ovsdb_snapshot_in_progress(db)) {
+            ovsdb_snapshot_wait(db);
+            poll_block();
+        }
+        if (ovsdb_snapshot_ready(db)) {
+            struct ovsdb_error *error = ovsdb_snapshot(db, false);
+
+            if (error) {
+                char *s = ovsdb_error_to_string_free(error);
+                VLOG_INFO("%s: %s", db->name, s);
+                free(s);
+            }
+        }
+
         /* Close the log. */
         ovsdb_storage_close(db->storage);
 
@@ -535,20 +554,119 @@  ovsdb_get_table(const struct ovsdb *db, const char *name)
     return shash_find_data(&db->tables, name);
 }
 
+static struct ovsdb *
+ovsdb_clone_data(const struct ovsdb *db)
+{
+    struct ovsdb *new = ovsdb_create(ovsdb_schema_clone(db->schema), NULL);
+
+    struct shash_node *node;
+    SHASH_FOR_EACH (node, &db->tables) {
+        struct ovsdb_table *table = node->data;
+        struct ovsdb_table *new_table = shash_find_data(&new->tables,
+                                                        node->name);
+        struct ovsdb_row *row, *new_row;
+
+        hmap_reserve(&new_table->rows, hmap_count(&table->rows));
+        HMAP_FOR_EACH (row, hmap_node, &table->rows) {
+            new_row = ovsdb_row_datum_clone(row);
+            hmap_insert(&new_table->rows, &new_row->hmap_node,
+                        ovsdb_row_hash(new_row));
+        }
+    }
+
+    return new;
+}
+
+static void *
+compaction_thread(void *aux)
+{
+    struct ovsdb_compaction_state *state = aux;
+    uint64_t start_time = time_msec();
+    struct json *data;
+
+    VLOG_DBG("%s: Compaction thread started.", state->db->name);
+    data = ovsdb_to_txn_json(state->db, "compacting database online");
+    state->data = json_serialized_object_create(data);
+    json_destroy(data);
+
+    state->thread_time = time_msec() - start_time;
+
+    VLOG_DBG("%s: Compaction thread finished in %"PRIu64" ms.",
+             state->db->name, state->thread_time);
+    seq_change(state->done);
+    return NULL;
+}
+
+void
+ovsdb_snapshot_wait(struct ovsdb *db)
+{
+    if (db->snap_state) {
+        seq_wait(db->snap_state->done, db->snap_state->seqno);
+    }
+}
+
+bool
+ovsdb_snapshot_in_progress(struct ovsdb *db)
+{
+    return db->snap_state &&
+           seq_read(db->snap_state->done) == db->snap_state->seqno;
+}
+
+bool
+ovsdb_snapshot_ready(struct ovsdb *db)
+{
+    return db->snap_state &&
+           seq_read(db->snap_state->done) != db->snap_state->seqno;
+}
+
 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
 ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
 {
-    if (!db->storage) {
+    if (!db->storage || ovsdb_snapshot_in_progress(db)) {
         return NULL;
     }
 
+    uint64_t applied_index = ovsdb_storage_get_applied_index(db->storage);
     uint64_t elapsed, start_time = time_msec();
-    struct json *schema = ovsdb_schema_to_json(db->schema);
-    struct json *data = ovsdb_to_txn_json(db, "compacting database online");
-    struct ovsdb_error *error = ovsdb_storage_store_snapshot(db->storage,
-                                                             schema, data);
-    json_destroy(schema);
-    json_destroy(data);
+    struct ovsdb_compaction_state *state;
+
+    if (!applied_index) {
+        /* Parallel compaction is not supported for standalone databases. */
+        state = xzalloc(sizeof *state);
+        state->data = ovsdb_to_txn_json(db, "compacting database online");
+        state->schema = ovsdb_schema_to_json(db->schema);
+    } else if (ovsdb_snapshot_ready(db)) {
+        xpthread_join(db->snap_state->thread, NULL);
+
+        state = db->snap_state;
+        db->snap_state = NULL;
+
+        ovsdb_destroy(state->db);
+        seq_destroy(state->done);
+    } else {
+        /* Creating a thread. */
+        ovs_assert(!db->snap_state);
+        state = xzalloc(sizeof *state);
+
+        state->db = ovsdb_clone_data(db);
+        state->schema = ovsdb_schema_to_json(db->schema);
+        state->applied_index = applied_index;
+        state->done = seq_create();
+        state->seqno = seq_read(state->done);
+        state->thread = ovs_thread_create("compaction",
+                                          compaction_thread, state);
+        state->init_time = time_msec() - start_time;
+
+        db->snap_state = state;
+        return NULL;
+    }
+
+    struct ovsdb_error *error;
+
+    error = ovsdb_storage_store_snapshot(db->storage, state->schema,
+                                         state->data, state->applied_index);
+    json_destroy(state->schema);
+    json_destroy(state->data);
 
 #if HAVE_DECL_MALLOC_TRIM
     if (!error && trim_memory) {
@@ -557,10 +675,13 @@  ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
 #endif
 
     elapsed = time_msec() - start_time;
-    if (elapsed > 1000) {
-        VLOG_INFO("%s: Database compaction took %"PRIu64"ms",
-                  db->name, elapsed);
-    }
+    VLOG(elapsed > 1000 ? VLL_INFO : VLL_DBG,
+         "%s: Database compaction took %"PRIu64"ms "
+         "(init: %"PRIu64"ms, write: %"PRIu64"ms, thread: %"PRIu64"ms)",
+         db->name, elapsed + state->init_time,
+         state->init_time, elapsed, state->thread_time);
+
+    free(state);
     return error;
 }
 
diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
index ec2d235ec..2f77821e0 100644
--- a/ovsdb/ovsdb.h
+++ b/ovsdb/ovsdb.h
@@ -72,6 +72,24 @@  struct ovsdb_txn_history_node {
     struct ovsdb_txn *txn;
 };
 
+struct ovsdb_compaction_state {
+    pthread_t thread;          /* Thread handle. */
+
+    struct ovsdb *db;          /* Copy of a database data to compact. */
+
+    struct json *data;         /* 'db' as a serialized json. */
+    struct json *schema;       /* 'db' schema json. */
+    uint64_t applied_index;    /* Last applied index reported by the storage
+                                * at the moment of a database copy. */
+
+    /* Completion signaling. */
+    struct seq *done;
+    uint64_t seqno;
+
+    uint64_t init_time;        /* Time spent by the main thread preparing. */
+    uint64_t thread_time;      /* Time spent for compaction by the thread. */
+};
+
 struct ovsdb {
     char *name;
     struct ovsdb_schema *schema;
@@ -101,6 +119,9 @@  struct ovsdb {
     struct ovs_list txn_forward_new;
     /* Hash map for transactions that are already sent and waits for reply. */
     struct hmap txn_forward_sent;
+
+    /* Database compaction. */
+    struct ovsdb_compaction_state *snap_state;
 };
 
 struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *);
@@ -124,6 +145,9 @@  struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *,
 
 struct ovsdb_error *ovsdb_snapshot(struct ovsdb *, bool trim_memory)
     OVS_WARN_UNUSED_RESULT;
+void ovsdb_snapshot_wait(struct ovsdb *);
+bool ovsdb_snapshot_in_progress(struct ovsdb *);
+bool ovsdb_snapshot_ready(struct ovsdb *);
 
 void ovsdb_replace(struct ovsdb *dst, struct ovsdb *src);
 
diff --git a/ovsdb/raft.c b/ovsdb/raft.c
index 856d083f2..b2c21e70f 100644
--- a/ovsdb/raft.c
+++ b/ovsdb/raft.c
@@ -4295,7 +4295,8 @@  raft_notify_snapshot_recommended(struct raft *raft)
  * only valuable to call it if raft_get_log_length() is significant and
  * especially if raft_grew_lots() returns true. */
 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
-raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
+raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data,
+                    uint64_t applied_index)
 {
     if (raft->joining) {
         return ovsdb_error(NULL,
@@ -4311,11 +4312,12 @@  raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
                            "cannot store a snapshot following failure");
     }
 
-    if (raft->last_applied < raft->log_start) {
+    uint64_t new_log_start = applied_index ? applied_index + 1
+                                           : raft->last_applied + 1;
+    if (new_log_start <= raft->log_start) {
         return ovsdb_error(NULL, "not storing a duplicate snapshot");
     }
 
-    uint64_t new_log_start = raft->last_applied + 1;
     struct raft_entry new_snapshot = {
         .term = raft_get_term(raft, new_log_start - 1),
         .eid = *raft_get_eid(raft, new_log_start - 1),
diff --git a/ovsdb/raft.h b/ovsdb/raft.h
index 599bc0ae8..403ed3dd7 100644
--- a/ovsdb/raft.h
+++ b/ovsdb/raft.h
@@ -180,7 +180,8 @@  uint64_t raft_get_log_length(const struct raft *);
 bool raft_may_snapshot(const struct raft *);
 void raft_notify_snapshot_recommended(struct raft *);
 struct ovsdb_error *raft_store_snapshot(struct raft *,
-                                        const struct json *new_snapshot)
+                                        const struct json *new_snapshot,
+                                        uint64_t applied_index)
     OVS_WARN_UNUSED_RESULT;
 
 /* Cluster management. */
diff --git a/ovsdb/row.c b/ovsdb/row.c
index fd50c7e7b..3f0bb8acf 100644
--- a/ovsdb/row.c
+++ b/ovsdb/row.c
@@ -155,6 +155,23 @@  ovsdb_row_clone(const struct ovsdb_row *old)
     return new;
 }
 
+struct ovsdb_row *
+ovsdb_row_datum_clone(const struct ovsdb_row *old)
+{
+    const struct ovsdb_table *table = old->table;
+    const struct shash_node *node;
+    struct ovsdb_row *new;
+
+    new = allocate_row(table);
+    SHASH_FOR_EACH (node, &table->schema->columns) {
+        const struct ovsdb_column *column = node->data;
+        ovsdb_datum_clone(&new->fields[column->index],
+                          &old->fields[column->index]);
+    }
+    return new;
+}
+
+
 /* The caller is responsible for ensuring that 'row' has been removed from its
  * table and that it is not participating in a transaction. */
 void
diff --git a/ovsdb/row.h b/ovsdb/row.h
index 4d3c17afc..ff91288fe 100644
--- a/ovsdb/row.h
+++ b/ovsdb/row.h
@@ -93,6 +93,7 @@  void ovsdb_weak_ref_destroy(struct ovsdb_weak_ref *);
 
 struct ovsdb_row *ovsdb_row_create(const struct ovsdb_table *);
 struct ovsdb_row *ovsdb_row_clone(const struct ovsdb_row *);
+struct ovsdb_row *ovsdb_row_datum_clone(const struct ovsdb_row *);
 void ovsdb_row_destroy(struct ovsdb_row *);
 
 uint32_t ovsdb_row_hash_columns(const struct ovsdb_row *,
diff --git a/ovsdb/storage.c b/ovsdb/storage.c
index d4984be25..e8f95ce64 100644
--- a/ovsdb/storage.c
+++ b/ovsdb/storage.c
@@ -576,7 +576,7 @@  ovsdb_storage_should_snapshot(struct ovsdb_storage *storage)
 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
 ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
                                const struct json *schema,
-                               const struct json *data)
+                               const struct json *data, uint64_t index)
 {
     if (storage->raft) {
         struct json *entries = json_array_create_empty();
@@ -587,7 +587,7 @@  ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
             json_array_add(entries, json_clone(data));
         }
         struct ovsdb_error *error = raft_store_snapshot(storage->raft,
-                                                        entries);
+                                                        entries, index);
         json_destroy(entries);
         return error;
     } else if (storage->log) {
@@ -611,10 +611,11 @@  ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
 ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
                              const struct json *schema,
-                             const struct json *data)
+                             const struct json *data, uint64_t index)
 {
     struct ovsdb_error *error = ovsdb_storage_store_snapshot__(storage,
-                                                               schema, data);
+                                                               schema, data,
+                                                               index);
     bool retry_quickly = error != NULL;
     schedule_next_snapshot(storage, retry_quickly);
     return error;
@@ -638,7 +639,7 @@  ovsdb_storage_write_schema_change(struct ovsdb_storage *storage,
                                           prereq, &result);
         json_destroy(txn_json);
     } else if (storage->log) {
-        w->error = ovsdb_storage_store_snapshot__(storage, schema, data);
+        w->error = ovsdb_storage_store_snapshot__(storage, schema, data, 0);
     } else {
         /* When 'error' and 'command' are both null, it indicates that the
          * command is complete.  This is fine since this unbacked storage drops
diff --git a/ovsdb/storage.h b/ovsdb/storage.h
index ff026b77f..a1fdaa564 100644
--- a/ovsdb/storage.h
+++ b/ovsdb/storage.h
@@ -79,7 +79,8 @@  void ovsdb_write_destroy(struct ovsdb_write *);
 bool ovsdb_storage_should_snapshot(struct ovsdb_storage *);
 struct ovsdb_error *ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
                                                  const struct json *schema,
-                                                 const struct json *snapshot)
+                                                 const struct json *snapshot,
+                                                 uint64_t applied_index)
     OVS_WARN_UNUSED_RESULT;
 
 struct ovsdb_write *ovsdb_storage_write_schema_change(
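The control flow the patch adds to `ovsdb_snapshot()` — spawn a worker over a copy of the data, keep the main loop running, and collect the serialized result once the worker signals completion via a `seq` object — can be sketched outside of OVS. The following is a minimal, self-contained model, not the patch's actual code: the names (`compaction_state`, `run_compaction`) are hypothetical, an `atomic_bool` stands in for OVS's `seq_change()`/`seq_read()` pair, summing an array stands in for JSON serialization, and the main thread's busy-poll stands in for `ovsdb_snapshot_ready()` checks inside the poll loop.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical, simplified model of the patch's three phases:
 *   1. init: hand a copy of the data to a worker thread,
 *   2. wait: main thread keeps working, polling a completion flag,
 *   3. finish: join the worker and take its prepared result. */

struct compaction_state {
    pthread_t thread;
    long data[100];      /* stands in for the shallow database copy */
    long result;         /* stands in for the serialized snapshot JSON */
    atomic_bool done;    /* stands in for seq_change()/seq_read() */
};

static void *
compaction_thread(void *aux)
{
    struct compaction_state *state = aux;
    long sum = 0;

    /* "Convert to JSON and serialize" the copy, off the main thread. */
    for (int i = 0; i < 100; i++) {
        sum += state->data[i];
    }
    state->result = sum;

    /* Like seq_change(state->done): tell the main thread we're ready. */
    atomic_store(&state->done, true);
    return NULL;
}

long
run_compaction(void)
{
    struct compaction_state state;

    /* Phase 1: "clone" the database and start the worker. */
    for (int i = 0; i < 100; i++) {
        state.data[i] = i;
    }
    atomic_init(&state.done, false);
    pthread_create(&state.thread, NULL, compaction_thread, &state);

    /* Phase 2: the real server keeps handling requests here and only
     * re-enters ovsdb_snapshot() once ovsdb_snapshot_ready() is true;
     * this sketch just polls the flag. */
    while (!atomic_load(&state.done)) {
        /* ... serve database requests ... */
    }

    /* Phase 3: collect the prepared snapshot and write it to storage. */
    pthread_join(state.thread, NULL);
    return state.result;
}
```

The worker never touches the live database, and the copy is created and destroyed only by the main thread, which is why the patch needs no locking beyond the completion signal.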