Message ID | 20180101051640.13043-1-blp@ovn.org |
---|---|
State | Accepted |
Delegated to: | Justin Pettit |
Headers | show |
Series | [ovs-dev,01/15] log: Add async commit support. | expand |
I read through lib/seq.c to learn how this patch works. Looks good to me, but I do not feel very confident. Reviewed-by: Yifeng Sun <pkusunyifeng@gmail.com> On Sun, Dec 31, 2017 at 9:16 PM, Ben Pfaff <blp@ovn.org> wrote: > The OVSDB log code has always had the ability to commit the log to disk and > wait for the commit to finish. This patch introduces a new feature that > allows the client to start a commit in the background and then to determine > asynchronously that the commit has completed. This will be especially > useful later for the distributed database feature. > > Signed-off-by: Ben Pfaff <blp@ovn.org> > --- > ovsdb/file.c | 4 +- > ovsdb/log.c | 152 ++++++++++++++++++++++++++++++ > +++++++++++++++++++++-- > ovsdb/log.h | 7 ++- > ovsdb/ovsdb-tool.c | 2 +- > tests/test-ovsdb.c | 2 +- > 5 files changed, 158 insertions(+), 9 deletions(-) > > diff --git a/ovsdb/file.c b/ovsdb/file.c > index 90c2b9d20a9a..fdd5f8e35a44 100644 > --- a/ovsdb/file.c > +++ b/ovsdb/file.c > @@ -661,7 +661,7 @@ ovsdb_file_compact(struct ovsdb_file *file) > > /* Commit the old version, so that we can be assured that we'll > eventually > * have either the old or the new version. 
*/ > - error = ovsdb_log_commit(file->log); > + error = ovsdb_log_commit_block(file->log); > if (error) { > goto exit; > } > @@ -857,7 +857,7 @@ ovsdb_file_txn_commit(struct json *json, const char > *comment, > } > > if (durable) { > - error = ovsdb_log_commit(log); > + error = ovsdb_log_commit_block(log); > if (error) { > return ovsdb_wrap_error(error, "committing transaction > failed"); > } > diff --git a/ovsdb/log.c b/ovsdb/log.c > index 0f8dafa30a8f..cc4bc2c6243e 100644 > --- a/ovsdb/log.c > +++ b/ovsdb/log.c > @@ -24,12 +24,17 @@ > #include <sys/stat.h> > #include <unistd.h> > > +#include "lockfile.h" > #include "openvswitch/dynamic-string.h" > #include "openvswitch/json.h" > #include "openvswitch/vlog.h" > -#include "lockfile.h" > -#include "ovsdb.h" > +#include "ovs-atomic.h" > +#include "ovs-rcu.h" > +#include "ovs-thread.h" > #include "ovsdb-error.h" > +#include "ovsdb.h" > +#include "openvswitch/poll-loop.h" > +#include "seq.h" > #include "sha1.h" > #include "socket-util.h" > #include "transaction.h" > @@ -78,6 +83,7 @@ struct ovsdb_log { > struct lockfile *lockfile; > FILE *stream; > off_t base; > + struct afsync *afsync; > }; > > /* Whether the OS supports renaming open files. > @@ -95,6 +101,9 @@ static bool parse_header(char *header, const char > **magicp, > uint8_t sha1[SHA1_DIGEST_SIZE]); > static bool is_magic_ok(const char *needle, const char *haystack); > > +static struct afsync *afsync_create(int fd, uint64_t initial_ticket); > +static uint64_t afsync_destroy(struct afsync *); > + > /* Attempts to open 'name' with the specified 'open_mode'. On success, > stores > * the new log into '*filep' and returns NULL; otherwise returns NULL and > * stores NULL into '*filep'. 
> @@ -269,6 +278,7 @@ ovsdb_log_open(const char *name, const char *magic, > file->prev_offset = 0; > file->offset = 0; > file->base = 0; > + file->afsync = NULL; > *filep = file; > return NULL; > > @@ -308,6 +318,7 @@ ovsdb_log_close(struct ovsdb_log *file) > { > if (file) { > ovsdb_error_destroy(file->error); > + afsync_destroy(file->afsync); > free(file->name); > free(file->display_name); > free(file->magic); > @@ -634,8 +645,10 @@ ovsdb_log_write(struct ovsdb_log *file, const struct > json *json) > return NULL; > } > > +/* Attempts to commit 'file' to disk. Waits for the commit to succeed or > fail. > + * Returns NULL if successful, otherwise the error that occurred. */ > struct ovsdb_error * > -ovsdb_log_commit(struct ovsdb_log *file) > +ovsdb_log_commit_block(struct ovsdb_log *file) > { > if (file->stream && fsync(fileno(file->stream))) { > return ovsdb_io_error(errno, "%s: fsync failed", > file->display_name); > @@ -740,7 +753,7 @@ ovsdb_rename(const char *old, const char *new) > struct ovsdb_error * OVS_WARN_UNUSED_RESULT > ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new) > { > - struct ovsdb_error *error = ovsdb_log_commit(new); > + struct ovsdb_error *error = ovsdb_log_commit_block(new); > if (error) { > ovsdb_log_replace_abort(new); > return error; > @@ -812,6 +825,10 @@ ovsdb_log_replace_commit(struct ovsdb_log *old, > struct ovsdb_log *new) > ovsdb_error_destroy(old->error); > old->error = NULL; > /* prev_offset only matters for OVSDB_LOG_READ. */ > + if (old->afsync) { > + uint64_t ticket = afsync_destroy(old->afsync); > + old->afsync = afsync_create(fileno(old->stream), ticket + 1); > + } > old->offset = new->offset; > /* Keep old->name. 
*/ > free(old->magic); > @@ -844,3 +861,130 @@ ovsdb_log_disable_renaming_open_files(void) > { > rename_open_files = false; > } > + > +struct afsync { > + pthread_t thread; > + atomic_uint64_t cur, next; > + struct seq *request, *complete; > + int fd; > +}; > + > +static void * > +afsync_thread(void *afsync_) > +{ > + struct afsync *afsync = afsync_; > + uint64_t cur = 0; > + for (;;) { > + ovsrcu_quiesce_start(); > + > + uint64_t request_seq = seq_read(afsync->request); > + > + uint64_t next; > + atomic_read_explicit(&afsync->next, &next, memory_order_acquire); > + if (next == UINT64_MAX) { > + break; > + } > + > + if (cur != next && afsync->fd != -1) { > + int error = fsync(afsync->fd) ? errno : 0; > + if (!error) { > + cur = next; > + atomic_store_explicit(&afsync->cur, cur, > memory_order_release); > + seq_change(afsync->complete); > + } else { > + VLOG_WARN("fsync failed (%s)", ovs_strerror(error)); > + } > + } > + > + seq_wait(afsync->request, request_seq); > + poll_block(); > + } > + return NULL; > +} > + > +static struct afsync * > +afsync_create(int fd, uint64_t initial_ticket) > +{ > + struct afsync *afsync = xzalloc(sizeof *afsync); > + atomic_init(&afsync->cur, initial_ticket); > + atomic_init(&afsync->next, initial_ticket); > + afsync->request = seq_create(); > + afsync->complete = seq_create(); > + afsync->thread = ovs_thread_create("log_fsync", afsync_thread, > afsync); > + afsync->fd = fd; > + return afsync; > +} > + > +static uint64_t > +afsync_destroy(struct afsync *afsync) > +{ > + if (!afsync) { > + return 0; > + } > + > + uint64_t next; > + atomic_read(&afsync->next, &next); > + atomic_store(&afsync->next, UINT64_MAX); > + seq_change(afsync->request); > + xpthread_join(afsync->thread, NULL); > + > + seq_destroy(afsync->request); > + seq_destroy(afsync->complete); > + > + free(afsync); > + > + return next; > +} > + > +static struct afsync * > +ovsdb_log_get_afsync(struct ovsdb_log *log) > +{ > + if (!log->afsync) { > + log->afsync = 
afsync_create(log->stream ? fileno(log->stream) : > -1, 0); > + } > + return log->afsync; > +} > + > +/* Starts committing 'log' to disk. Returns a ticket that can be passed > to > + * ovsdb_log_commit_wait() or compared against the return value of > + * ovsdb_log_commit_progress() later. */ > +uint64_t > +ovsdb_log_commit_start(struct ovsdb_log *log) > +{ > + struct afsync *afsync = ovsdb_log_get_afsync(log); > + > + uint64_t orig; > + atomic_add_explicit(&afsync->next, 1, &orig, memory_order_acq_rel); > + > + seq_change(afsync->request); > + > + return orig + 1; > +} > + > +/* Returns a ticket value that represents the current progress of commits > to > + * 'log'. Suppose that some call to ovsdb_log_commit_start() returns X > and any > + * call ovsdb_log_commit_progress() returns Y, for the same 'log'. Then > commit > + * X is complete if and only if X <= Y. */ > +uint64_t > +ovsdb_log_commit_progress(struct ovsdb_log *log) > +{ > + struct afsync *afsync = ovsdb_log_get_afsync(log); > + uint64_t cur; > + atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire); > + return cur; > +} > + > +/* Causes poll_block() to wake up if and when > ovsdb_log_commit_progress(log) > + * would return at least 'goal'. */ > +void > +ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal) > +{ > + struct afsync *afsync = ovsdb_log_get_afsync(log); > + uint64_t complete = seq_read(afsync->complete); > + uint64_t cur = ovsdb_log_commit_progress(log); > + if (cur < goal) { > + seq_wait(afsync->complete, complete); > + } else { > + poll_immediate_wake(); > + } > +} > diff --git a/ovsdb/log.h b/ovsdb/log.h > index 18900fa50f44..bd0396f27ea8 100644 > --- a/ovsdb/log.h > +++ b/ovsdb/log.h > @@ -35,6 +35,7 @@ > * that compacting is advised. 
> */ > > +#include <stdint.h> > #include <sys/types.h> > #include "compiler.h" > > @@ -70,7 +71,11 @@ void ovsdb_log_compose_record(const struct json *, > const char *magic, > > struct ovsdb_error *ovsdb_log_write(struct ovsdb_log *, const struct json > *) > OVS_WARN_UNUSED_RESULT; > -struct ovsdb_error *ovsdb_log_commit(struct ovsdb_log *) > + > +uint64_t ovsdb_log_commit_start(struct ovsdb_log *); > +uint64_t ovsdb_log_commit_progress(struct ovsdb_log *); > +void ovsdb_log_commit_wait(struct ovsdb_log *, uint64_t); > +struct ovsdb_error *ovsdb_log_commit_block(struct ovsdb_log *) > OVS_WARN_UNUSED_RESULT; > > void ovsdb_log_mark_base(struct ovsdb_log *); > diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c > index 4343e3ce5b22..cec64152f079 100644 > --- a/ovsdb/ovsdb-tool.c > +++ b/ovsdb/ovsdb-tool.c > @@ -222,7 +222,7 @@ do_create(struct ovs_cmdl_context *ctx) > check_ovsdb_error(ovsdb_log_open(db_file_name, OVSDB_MAGIC, > OVSDB_LOG_CREATE_EXCL, -1, &log)); > check_ovsdb_error(ovsdb_log_write(log, json)); > - check_ovsdb_error(ovsdb_log_commit(log)); > + check_ovsdb_error(ovsdb_log_commit_block(log)); > ovsdb_log_close(log); > > json_destroy(json); > diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c > index 6b2cde863aba..c0c5a4df51af 100644 > --- a/tests/test-ovsdb.c > +++ b/tests/test-ovsdb.c > @@ -380,7 +380,7 @@ do_log_io(struct ovs_cmdl_context *ctx) > error = ovsdb_log_write(target, json); > json_destroy(json); > } else if (!strcmp(command, "commit")) { > - error = ovsdb_log_commit(target); > + error = ovsdb_log_commit_block(target); > } else if (!strcmp(command, "replace_start")) { > ovs_assert(!replacement); > error = ovsdb_log_replace_start(log, &replacement); > -- > 2.10.2 > > _______________________________________________ > dev mailing list > dev@openvswitch.org > https://mail.openvswitch.org/mailman/listinfo/ovs-dev >
Thanks for the review. The "seq" code introduces kind of a weird concept. I might have invented it, not sure, but it's probably some kind of distortion of a more standard idea. I'd like it to be easy for people to understand. Are there any questions that I could help to answer? Maybe I could answer them directly, or maybe I could update the documentation in seq.h (which tries to be comprehensive but maybe it isn't?). On Wed, Jan 10, 2018 at 04:04:53PM -0800, Yifeng Sun wrote: > I read through lib/seq.c to learn how this patch works. > Looks good to me, but I feel not very confident. > > Reviewed-by: Yifeng Sun <pkusunyifeng@gmail.com> > > On Sun, Dec 31, 2017 at 9:16 PM, Ben Pfaff <blp@ovn.org> wrote: > > > The OVSDB log code has always had the ability to commit the log to disk and > > wait for the commit to finish. This patch introduces a new feature that > > allows the client to start a commit in the background and then to determine > > asynchronously that the commit has completed. This will be especially > > useful later for the distributed database feature. > > > > Signed-off-by: Ben Pfaff <blp@ovn.org> > > --- > > ovsdb/file.c | 4 +- > > ovsdb/log.c | 152 ++++++++++++++++++++++++++++++ > > +++++++++++++++++++++-- > > ovsdb/log.h | 7 ++- > > ovsdb/ovsdb-tool.c | 2 +- > > tests/test-ovsdb.c | 2 +- > > 5 files changed, 158 insertions(+), 9 deletions(-) > > > > diff --git a/ovsdb/file.c b/ovsdb/file.c > > index 90c2b9d20a9a..fdd5f8e35a44 100644 > > --- a/ovsdb/file.c > > +++ b/ovsdb/file.c > > @@ -661,7 +661,7 @@ ovsdb_file_compact(struct ovsdb_file *file) > > > > /* Commit the old version, so that we can be assured that we'll > > eventually > > * have either the old or the new version. 
*/ > > - error = ovsdb_log_commit(file->log); > > + error = ovsdb_log_commit_block(file->log); > > if (error) { > > goto exit; > > } > > @@ -857,7 +857,7 @@ ovsdb_file_txn_commit(struct json *json, const char > > *comment, > > } > > > > if (durable) { > > - error = ovsdb_log_commit(log); > > + error = ovsdb_log_commit_block(log); > > if (error) { > > return ovsdb_wrap_error(error, "committing transaction > > failed"); > > } > > diff --git a/ovsdb/log.c b/ovsdb/log.c > > index 0f8dafa30a8f..cc4bc2c6243e 100644 > > --- a/ovsdb/log.c > > +++ b/ovsdb/log.c > > @@ -24,12 +24,17 @@ > > #include <sys/stat.h> > > #include <unistd.h> > > > > +#include "lockfile.h" > > #include "openvswitch/dynamic-string.h" > > #include "openvswitch/json.h" > > #include "openvswitch/vlog.h" > > -#include "lockfile.h" > > -#include "ovsdb.h" > > +#include "ovs-atomic.h" > > +#include "ovs-rcu.h" > > +#include "ovs-thread.h" > > #include "ovsdb-error.h" > > +#include "ovsdb.h" > > +#include "openvswitch/poll-loop.h" > > +#include "seq.h" > > #include "sha1.h" > > #include "socket-util.h" > > #include "transaction.h" > > @@ -78,6 +83,7 @@ struct ovsdb_log { > > struct lockfile *lockfile; > > FILE *stream; > > off_t base; > > + struct afsync *afsync; > > }; > > > > /* Whether the OS supports renaming open files. > > @@ -95,6 +101,9 @@ static bool parse_header(char *header, const char > > **magicp, > > uint8_t sha1[SHA1_DIGEST_SIZE]); > > static bool is_magic_ok(const char *needle, const char *haystack); > > > > +static struct afsync *afsync_create(int fd, uint64_t initial_ticket); > > +static uint64_t afsync_destroy(struct afsync *); > > + > > /* Attempts to open 'name' with the specified 'open_mode'. On success, > > stores > > * the new log into '*filep' and returns NULL; otherwise returns NULL and > > * stores NULL into '*filep'. 
> > @@ -269,6 +278,7 @@ ovsdb_log_open(const char *name, const char *magic, > > file->prev_offset = 0; > > file->offset = 0; > > file->base = 0; > > + file->afsync = NULL; > > *filep = file; > > return NULL; > > > > @@ -308,6 +318,7 @@ ovsdb_log_close(struct ovsdb_log *file) > > { > > if (file) { > > ovsdb_error_destroy(file->error); > > + afsync_destroy(file->afsync); > > free(file->name); > > free(file->display_name); > > free(file->magic); > > @@ -634,8 +645,10 @@ ovsdb_log_write(struct ovsdb_log *file, const struct > > json *json) > > return NULL; > > } > > > > +/* Attempts to commit 'file' to disk. Waits for the commit to succeed or > > fail. > > + * Returns NULL if successful, otherwise the error that occurred. */ > > struct ovsdb_error * > > -ovsdb_log_commit(struct ovsdb_log *file) > > +ovsdb_log_commit_block(struct ovsdb_log *file) > > { > > if (file->stream && fsync(fileno(file->stream))) { > > return ovsdb_io_error(errno, "%s: fsync failed", > > file->display_name); > > @@ -740,7 +753,7 @@ ovsdb_rename(const char *old, const char *new) > > struct ovsdb_error * OVS_WARN_UNUSED_RESULT > > ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new) > > { > > - struct ovsdb_error *error = ovsdb_log_commit(new); > > + struct ovsdb_error *error = ovsdb_log_commit_block(new); > > if (error) { > > ovsdb_log_replace_abort(new); > > return error; > > @@ -812,6 +825,10 @@ ovsdb_log_replace_commit(struct ovsdb_log *old, > > struct ovsdb_log *new) > > ovsdb_error_destroy(old->error); > > old->error = NULL; > > /* prev_offset only matters for OVSDB_LOG_READ. */ > > + if (old->afsync) { > > + uint64_t ticket = afsync_destroy(old->afsync); > > + old->afsync = afsync_create(fileno(old->stream), ticket + 1); > > + } > > old->offset = new->offset; > > /* Keep old->name. 
*/ > > free(old->magic); > > @@ -844,3 +861,130 @@ ovsdb_log_disable_renaming_open_files(void) > > { > > rename_open_files = false; > > } > > + > > +struct afsync { > > + pthread_t thread; > > + atomic_uint64_t cur, next; > > + struct seq *request, *complete; > > + int fd; > > +}; > > + > > +static void * > > +afsync_thread(void *afsync_) > > +{ > > + struct afsync *afsync = afsync_; > > + uint64_t cur = 0; > > + for (;;) { > > + ovsrcu_quiesce_start(); > > + > > + uint64_t request_seq = seq_read(afsync->request); > > + > > + uint64_t next; > > + atomic_read_explicit(&afsync->next, &next, memory_order_acquire); > > + if (next == UINT64_MAX) { > > + break; > > + } > > + > > + if (cur != next && afsync->fd != -1) { > > + int error = fsync(afsync->fd) ? errno : 0; > > + if (!error) { > > + cur = next; > > + atomic_store_explicit(&afsync->cur, cur, > > memory_order_release); > > + seq_change(afsync->complete); > > + } else { > > + VLOG_WARN("fsync failed (%s)", ovs_strerror(error)); > > + } > > + } > > + > > + seq_wait(afsync->request, request_seq); > > + poll_block(); > > + } > > + return NULL; > > +} > > + > > +static struct afsync * > > +afsync_create(int fd, uint64_t initial_ticket) > > +{ > > + struct afsync *afsync = xzalloc(sizeof *afsync); > > + atomic_init(&afsync->cur, initial_ticket); > > + atomic_init(&afsync->next, initial_ticket); > > + afsync->request = seq_create(); > > + afsync->complete = seq_create(); > > + afsync->thread = ovs_thread_create("log_fsync", afsync_thread, > > afsync); > > + afsync->fd = fd; > > + return afsync; > > +} > > + > > +static uint64_t > > +afsync_destroy(struct afsync *afsync) > > +{ > > + if (!afsync) { > > + return 0; > > + } > > + > > + uint64_t next; > > + atomic_read(&afsync->next, &next); > > + atomic_store(&afsync->next, UINT64_MAX); > > + seq_change(afsync->request); > > + xpthread_join(afsync->thread, NULL); > > + > > + seq_destroy(afsync->request); > > + seq_destroy(afsync->complete); > > + > > + free(afsync); > > + 
> > + return next; > > +} > > + > > +static struct afsync * > > +ovsdb_log_get_afsync(struct ovsdb_log *log) > > +{ > > + if (!log->afsync) { > > + log->afsync = afsync_create(log->stream ? fileno(log->stream) : > > -1, 0); > > + } > > + return log->afsync; > > +} > > + > > +/* Starts committing 'log' to disk. Returns a ticket that can be passed > > to > > + * ovsdb_log_commit_wait() or compared against the return value of > > + * ovsdb_log_commit_progress() later. */ > > +uint64_t > > +ovsdb_log_commit_start(struct ovsdb_log *log) > > +{ > > + struct afsync *afsync = ovsdb_log_get_afsync(log); > > + > > + uint64_t orig; > > + atomic_add_explicit(&afsync->next, 1, &orig, memory_order_acq_rel); > > + > > + seq_change(afsync->request); > > + > > + return orig + 1; > > +} > > + > > +/* Returns a ticket value that represents the current progress of commits > > to > > + * 'log'. Suppose that some call to ovsdb_log_commit_start() returns X > > and any > > + * call ovsdb_log_commit_progress() returns Y, for the same 'log'. Then > > commit > > + * X is complete if and only if X <= Y. */ > > +uint64_t > > +ovsdb_log_commit_progress(struct ovsdb_log *log) > > +{ > > + struct afsync *afsync = ovsdb_log_get_afsync(log); > > + uint64_t cur; > > + atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire); > > + return cur; > > +} > > + > > +/* Causes poll_block() to wake up if and when > > ovsdb_log_commit_progress(log) > > + * would return at least 'goal'. 
*/ > > +void > > +ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal) > > +{ > > + struct afsync *afsync = ovsdb_log_get_afsync(log); > > + uint64_t complete = seq_read(afsync->complete); > > + uint64_t cur = ovsdb_log_commit_progress(log); > > + if (cur < goal) { > > + seq_wait(afsync->complete, complete); > > + } else { > > + poll_immediate_wake(); > > + } > > +} > > diff --git a/ovsdb/log.h b/ovsdb/log.h > > index 18900fa50f44..bd0396f27ea8 100644 > > --- a/ovsdb/log.h > > +++ b/ovsdb/log.h > > @@ -35,6 +35,7 @@ > > * that compacting is advised. > > */ > > > > +#include <stdint.h> > > #include <sys/types.h> > > #include "compiler.h" > > > > @@ -70,7 +71,11 @@ void ovsdb_log_compose_record(const struct json *, > > const char *magic, > > > > struct ovsdb_error *ovsdb_log_write(struct ovsdb_log *, const struct json > > *) > > OVS_WARN_UNUSED_RESULT; > > -struct ovsdb_error *ovsdb_log_commit(struct ovsdb_log *) > > + > > +uint64_t ovsdb_log_commit_start(struct ovsdb_log *); > > +uint64_t ovsdb_log_commit_progress(struct ovsdb_log *); > > +void ovsdb_log_commit_wait(struct ovsdb_log *, uint64_t); > > +struct ovsdb_error *ovsdb_log_commit_block(struct ovsdb_log *) > > OVS_WARN_UNUSED_RESULT; > > > > void ovsdb_log_mark_base(struct ovsdb_log *); > > diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c > > index 4343e3ce5b22..cec64152f079 100644 > > --- a/ovsdb/ovsdb-tool.c > > +++ b/ovsdb/ovsdb-tool.c > > @@ -222,7 +222,7 @@ do_create(struct ovs_cmdl_context *ctx) > > check_ovsdb_error(ovsdb_log_open(db_file_name, OVSDB_MAGIC, > > OVSDB_LOG_CREATE_EXCL, -1, &log)); > > check_ovsdb_error(ovsdb_log_write(log, json)); > > - check_ovsdb_error(ovsdb_log_commit(log)); > > + check_ovsdb_error(ovsdb_log_commit_block(log)); > > ovsdb_log_close(log); > > > > json_destroy(json); > > diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c > > index 6b2cde863aba..c0c5a4df51af 100644 > > --- a/tests/test-ovsdb.c > > +++ b/tests/test-ovsdb.c > > @@ -380,7 +380,7 @@ 
do_log_io(struct ovs_cmdl_context *ctx) > > error = ovsdb_log_write(target, json); > > json_destroy(json); > > } else if (!strcmp(command, "commit")) { > > - error = ovsdb_log_commit(target); > > + error = ovsdb_log_commit_block(target); > > } else if (!strcmp(command, "replace_start")) { > > ovs_assert(!replacement); > > error = ovsdb_log_replace_start(log, &replacement); > > -- > > 2.10.2 > > > > _______________________________________________ > > dev mailing list > > dev@openvswitch.org > > https://mail.openvswitch.org/mailman/listinfo/ovs-dev > >
After further study, I understand the "seq" code and think it is well-designed. Thank you for this patch and the previous reply. This patch looks good to me. Reviewed-by: Yifeng Sun <pkusunyifeng@gmail.com> On Wed, Jan 10, 2018 at 4:21 PM, Ben Pfaff <blp@ovn.org> wrote: > Thanks for the review. > > The "seq" code introduces kind of a weird concept. I might have > invented it, not sure, but it's probably some kind of distortion of a > more standard idea. I'd like it to be easy for people to understand. > Are there any questions that I could help to answer? Maybe I could > answer them directly, or maybe I could update the documentation in seq.h > (which tries to be comprehensive but maybe it isn't?). > > On Wed, Jan 10, 2018 at 04:04:53PM -0800, Yifeng Sun wrote: > > I read through lib/seq.c to learn how this patch works. > > Looks good to me, but I feel not very confident. > > > > Reviewed-by: Yifeng Sun <pkusunyifeng@gmail.com> > > > > On Sun, Dec 31, 2017 at 9:16 PM, Ben Pfaff <blp@ovn.org> wrote: > > > > > The OVSDB log code has always had the ability to commit the log to > disk and > > > wait for the commit to finish. This patch introduces a new feature > that > > > allows the client to start a commit in the background and then to > determine > > > asynchronously that the commit has completed. This will be especially > > > useful later for the distributed database feature. 
> > > > > > Signed-off-by: Ben Pfaff <blp@ovn.org> > > > --- > > > ovsdb/file.c | 4 +- > > > ovsdb/log.c | 152 ++++++++++++++++++++++++++++++ > > > +++++++++++++++++++++-- > > > ovsdb/log.h | 7 ++- > > > ovsdb/ovsdb-tool.c | 2 +- > > > tests/test-ovsdb.c | 2 +- > > > 5 files changed, 158 insertions(+), 9 deletions(-) > > > > > > diff --git a/ovsdb/file.c b/ovsdb/file.c > > > index 90c2b9d20a9a..fdd5f8e35a44 100644 > > > --- a/ovsdb/file.c > > > +++ b/ovsdb/file.c > > > @@ -661,7 +661,7 @@ ovsdb_file_compact(struct ovsdb_file *file) > > > > > > /* Commit the old version, so that we can be assured that we'll > > > eventually > > > * have either the old or the new version. */ > > > - error = ovsdb_log_commit(file->log); > > > + error = ovsdb_log_commit_block(file->log); > > > if (error) { > > > goto exit; > > > } > > > @@ -857,7 +857,7 @@ ovsdb_file_txn_commit(struct json *json, const char > > > *comment, > > > } > > > > > > if (durable) { > > > - error = ovsdb_log_commit(log); > > > + error = ovsdb_log_commit_block(log); > > > if (error) { > > > return ovsdb_wrap_error(error, "committing transaction > > > failed"); > > > } > > > diff --git a/ovsdb/log.c b/ovsdb/log.c > > > index 0f8dafa30a8f..cc4bc2c6243e 100644 > > > --- a/ovsdb/log.c > > > +++ b/ovsdb/log.c > > > @@ -24,12 +24,17 @@ > > > #include <sys/stat.h> > > > #include <unistd.h> > > > > > > +#include "lockfile.h" > > > #include "openvswitch/dynamic-string.h" > > > #include "openvswitch/json.h" > > > #include "openvswitch/vlog.h" > > > -#include "lockfile.h" > > > -#include "ovsdb.h" > > > +#include "ovs-atomic.h" > > > +#include "ovs-rcu.h" > > > +#include "ovs-thread.h" > > > #include "ovsdb-error.h" > > > +#include "ovsdb.h" > > > +#include "openvswitch/poll-loop.h" > > > +#include "seq.h" > > > #include "sha1.h" > > > #include "socket-util.h" > > > #include "transaction.h" > > > @@ -78,6 +83,7 @@ struct ovsdb_log { > > > struct lockfile *lockfile; > > > FILE *stream; > > > off_t base; > > > + struct 
afsync *afsync; > > > }; > > > > > > /* Whether the OS supports renaming open files. > > > @@ -95,6 +101,9 @@ static bool parse_header(char *header, const char > > > **magicp, > > > uint8_t sha1[SHA1_DIGEST_SIZE]); > > > static bool is_magic_ok(const char *needle, const char *haystack); > > > > > > +static struct afsync *afsync_create(int fd, uint64_t initial_ticket); > > > +static uint64_t afsync_destroy(struct afsync *); > > > + > > > /* Attempts to open 'name' with the specified 'open_mode'. On > success, > > > stores > > > * the new log into '*filep' and returns NULL; otherwise returns NULL > and > > > * stores NULL into '*filep'. > > > @@ -269,6 +278,7 @@ ovsdb_log_open(const char *name, const char *magic, > > > file->prev_offset = 0; > > > file->offset = 0; > > > file->base = 0; > > > + file->afsync = NULL; > > > *filep = file; > > > return NULL; > > > > > > @@ -308,6 +318,7 @@ ovsdb_log_close(struct ovsdb_log *file) > > > { > > > if (file) { > > > ovsdb_error_destroy(file->error); > > > + afsync_destroy(file->afsync); > > > free(file->name); > > > free(file->display_name); > > > free(file->magic); > > > @@ -634,8 +645,10 @@ ovsdb_log_write(struct ovsdb_log *file, const > struct > > > json *json) > > > return NULL; > > > } > > > > > > +/* Attempts to commit 'file' to disk. Waits for the commit to > succeed or > > > fail. > > > + * Returns NULL if successful, otherwise the error that occurred. 
*/ > > > struct ovsdb_error * > > > -ovsdb_log_commit(struct ovsdb_log *file) > > > +ovsdb_log_commit_block(struct ovsdb_log *file) > > > { > > > if (file->stream && fsync(fileno(file->stream))) { > > > return ovsdb_io_error(errno, "%s: fsync failed", > > > file->display_name); > > > @@ -740,7 +753,7 @@ ovsdb_rename(const char *old, const char *new) > > > struct ovsdb_error * OVS_WARN_UNUSED_RESULT > > > ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log > *new) > > > { > > > - struct ovsdb_error *error = ovsdb_log_commit(new); > > > + struct ovsdb_error *error = ovsdb_log_commit_block(new); > > > if (error) { > > > ovsdb_log_replace_abort(new); > > > return error; > > > @@ -812,6 +825,10 @@ ovsdb_log_replace_commit(struct ovsdb_log *old, > > > struct ovsdb_log *new) > > > ovsdb_error_destroy(old->error); > > > old->error = NULL; > > > /* prev_offset only matters for OVSDB_LOG_READ. */ > > > + if (old->afsync) { > > > + uint64_t ticket = afsync_destroy(old->afsync); > > > + old->afsync = afsync_create(fileno(old->stream), ticket + 1); > > > + } > > > old->offset = new->offset; > > > /* Keep old->name. */ > > > free(old->magic); > > > @@ -844,3 +861,130 @@ ovsdb_log_disable_renaming_open_files(void) > > > { > > > rename_open_files = false; > > > } > > > + > > > +struct afsync { > > > + pthread_t thread; > > > + atomic_uint64_t cur, next; > > > + struct seq *request, *complete; > > > + int fd; > > > +}; > > > + > > > +static void * > > > +afsync_thread(void *afsync_) > > > +{ > > > + struct afsync *afsync = afsync_; > > > + uint64_t cur = 0; > > > + for (;;) { > > > + ovsrcu_quiesce_start(); > > > + > > > + uint64_t request_seq = seq_read(afsync->request); > > > + > > > + uint64_t next; > > > + atomic_read_explicit(&afsync->next, &next, > memory_order_acquire); > > > + if (next == UINT64_MAX) { > > > + break; > > > + } > > > + > > > + if (cur != next && afsync->fd != -1) { > > > + int error = fsync(afsync->fd) ? 
errno : 0; > > > + if (!error) { > > > + cur = next; > > > + atomic_store_explicit(&afsync->cur, cur, > > > memory_order_release); > > > + seq_change(afsync->complete); > > > + } else { > > > + VLOG_WARN("fsync failed (%s)", ovs_strerror(error)); > > > + } > > > + } > > > + > > > + seq_wait(afsync->request, request_seq); > > > + poll_block(); > > > + } > > > + return NULL; > > > +} > > > + > > > +static struct afsync * > > > +afsync_create(int fd, uint64_t initial_ticket) > > > +{ > > > + struct afsync *afsync = xzalloc(sizeof *afsync); > > > + atomic_init(&afsync->cur, initial_ticket); > > > + atomic_init(&afsync->next, initial_ticket); > > > + afsync->request = seq_create(); > > > + afsync->complete = seq_create(); > > > + afsync->thread = ovs_thread_create("log_fsync", afsync_thread, > > > afsync); > > > + afsync->fd = fd; > > > + return afsync; > > > +} > > > + > > > +static uint64_t > > > +afsync_destroy(struct afsync *afsync) > > > +{ > > > + if (!afsync) { > > > + return 0; > > > + } > > > + > > > + uint64_t next; > > > + atomic_read(&afsync->next, &next); > > > + atomic_store(&afsync->next, UINT64_MAX); > > > + seq_change(afsync->request); > > > + xpthread_join(afsync->thread, NULL); > > > + > > > + seq_destroy(afsync->request); > > > + seq_destroy(afsync->complete); > > > + > > > + free(afsync); > > > + > > > + return next; > > > +} > > > + > > > +static struct afsync * > > > +ovsdb_log_get_afsync(struct ovsdb_log *log) > > > +{ > > > + if (!log->afsync) { > > > + log->afsync = afsync_create(log->stream ? fileno(log->stream) > : > > > -1, 0); > > > + } > > > + return log->afsync; > > > +} > > > + > > > +/* Starts committing 'log' to disk. Returns a ticket that can be > passed > > > to > > > + * ovsdb_log_commit_wait() or compared against the return value of > > > + * ovsdb_log_commit_progress() later. 
*/ > > > +uint64_t > > > +ovsdb_log_commit_start(struct ovsdb_log *log) > > > +{ > > > + struct afsync *afsync = ovsdb_log_get_afsync(log); > > > + > > > + uint64_t orig; > > > + atomic_add_explicit(&afsync->next, 1, &orig, > memory_order_acq_rel); > > > + > > > + seq_change(afsync->request); > > > + > > > + return orig + 1; > > > +} > > > + > > > +/* Returns a ticket value that represents the current progress of > commits > > > to > > > + * 'log'. Suppose that some call to ovsdb_log_commit_start() returns > X > > > and any > > > + * call ovsdb_log_commit_progress() returns Y, for the same 'log'. > Then > > > commit > > > + * X is complete if and only if X <= Y. */ > > > +uint64_t > > > +ovsdb_log_commit_progress(struct ovsdb_log *log) > > > +{ > > > + struct afsync *afsync = ovsdb_log_get_afsync(log); > > > + uint64_t cur; > > > + atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire); > > > + return cur; > > > +} > > > + > > > +/* Causes poll_block() to wake up if and when > > > ovsdb_log_commit_progress(log) > > > + * would return at least 'goal'. */ > > > +void > > > +ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal) > > > +{ > > > + struct afsync *afsync = ovsdb_log_get_afsync(log); > > > + uint64_t complete = seq_read(afsync->complete); > > > + uint64_t cur = ovsdb_log_commit_progress(log); > > > + if (cur < goal) { > > > + seq_wait(afsync->complete, complete); > > > + } else { > > > + poll_immediate_wake(); > > > + } > > > +} > > > diff --git a/ovsdb/log.h b/ovsdb/log.h > > > index 18900fa50f44..bd0396f27ea8 100644 > > > --- a/ovsdb/log.h > > > +++ b/ovsdb/log.h > > > @@ -35,6 +35,7 @@ > > > * that compacting is advised. 
> > > */ > > > > > > +#include <stdint.h> > > > #include <sys/types.h> > > > #include "compiler.h" > > > > > > @@ -70,7 +71,11 @@ void ovsdb_log_compose_record(const struct json *, > > > const char *magic, > > > > > > struct ovsdb_error *ovsdb_log_write(struct ovsdb_log *, const struct > json > > > *) > > > OVS_WARN_UNUSED_RESULT; > > > -struct ovsdb_error *ovsdb_log_commit(struct ovsdb_log *) > > > + > > > +uint64_t ovsdb_log_commit_start(struct ovsdb_log *); > > > +uint64_t ovsdb_log_commit_progress(struct ovsdb_log *); > > > +void ovsdb_log_commit_wait(struct ovsdb_log *, uint64_t); > > > +struct ovsdb_error *ovsdb_log_commit_block(struct ovsdb_log *) > > > OVS_WARN_UNUSED_RESULT; > > > > > > void ovsdb_log_mark_base(struct ovsdb_log *); > > > diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c > > > index 4343e3ce5b22..cec64152f079 100644 > > > --- a/ovsdb/ovsdb-tool.c > > > +++ b/ovsdb/ovsdb-tool.c > > > @@ -222,7 +222,7 @@ do_create(struct ovs_cmdl_context *ctx) > > > check_ovsdb_error(ovsdb_log_open(db_file_name, OVSDB_MAGIC, > > > OVSDB_LOG_CREATE_EXCL, -1, > &log)); > > > check_ovsdb_error(ovsdb_log_write(log, json)); > > > - check_ovsdb_error(ovsdb_log_commit(log)); > > > + check_ovsdb_error(ovsdb_log_commit_block(log)); > > > ovsdb_log_close(log); > > > > > > json_destroy(json); > > > diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c > > > index 6b2cde863aba..c0c5a4df51af 100644 > > > --- a/tests/test-ovsdb.c > > > +++ b/tests/test-ovsdb.c > > > @@ -380,7 +380,7 @@ do_log_io(struct ovs_cmdl_context *ctx) > > > error = ovsdb_log_write(target, json); > > > json_destroy(json); > > > } else if (!strcmp(command, "commit")) { > > > - error = ovsdb_log_commit(target); > > > + error = ovsdb_log_commit_block(target); > > > } else if (!strcmp(command, "replace_start")) { > > > ovs_assert(!replacement); > > > error = ovsdb_log_replace_start(log, &replacement); > > > -- > > > 2.10.2 > > > > > > _______________________________________________ > > > dev mailing 
list > > > dev@openvswitch.org > > > https://mail.openvswitch.org/mailman/listinfo/ovs-dev > > > >
diff --git a/ovsdb/file.c b/ovsdb/file.c index 90c2b9d20a9a..fdd5f8e35a44 100644 --- a/ovsdb/file.c +++ b/ovsdb/file.c @@ -661,7 +661,7 @@ ovsdb_file_compact(struct ovsdb_file *file) /* Commit the old version, so that we can be assured that we'll eventually * have either the old or the new version. */ - error = ovsdb_log_commit(file->log); + error = ovsdb_log_commit_block(file->log); if (error) { goto exit; } @@ -857,7 +857,7 @@ ovsdb_file_txn_commit(struct json *json, const char *comment, } if (durable) { - error = ovsdb_log_commit(log); + error = ovsdb_log_commit_block(log); if (error) { return ovsdb_wrap_error(error, "committing transaction failed"); } diff --git a/ovsdb/log.c b/ovsdb/log.c index 0f8dafa30a8f..cc4bc2c6243e 100644 --- a/ovsdb/log.c +++ b/ovsdb/log.c @@ -24,12 +24,17 @@ #include <sys/stat.h> #include <unistd.h> +#include "lockfile.h" #include "openvswitch/dynamic-string.h" #include "openvswitch/json.h" #include "openvswitch/vlog.h" -#include "lockfile.h" -#include "ovsdb.h" +#include "ovs-atomic.h" +#include "ovs-rcu.h" +#include "ovs-thread.h" #include "ovsdb-error.h" +#include "ovsdb.h" +#include "openvswitch/poll-loop.h" +#include "seq.h" #include "sha1.h" #include "socket-util.h" #include "transaction.h" @@ -78,6 +83,7 @@ struct ovsdb_log { struct lockfile *lockfile; FILE *stream; off_t base; + struct afsync *afsync; }; /* Whether the OS supports renaming open files. @@ -95,6 +101,9 @@ static bool parse_header(char *header, const char **magicp, uint8_t sha1[SHA1_DIGEST_SIZE]); static bool is_magic_ok(const char *needle, const char *haystack); +static struct afsync *afsync_create(int fd, uint64_t initial_ticket); +static uint64_t afsync_destroy(struct afsync *); + /* Attempts to open 'name' with the specified 'open_mode'. On success, stores * the new log into '*filep' and returns NULL; otherwise returns NULL and * stores NULL into '*filep'. 
@@ -269,6 +278,7 @@ ovsdb_log_open(const char *name, const char *magic, file->prev_offset = 0; file->offset = 0; file->base = 0; + file->afsync = NULL; *filep = file; return NULL; @@ -308,6 +318,7 @@ ovsdb_log_close(struct ovsdb_log *file) { if (file) { ovsdb_error_destroy(file->error); + afsync_destroy(file->afsync); free(file->name); free(file->display_name); free(file->magic); @@ -634,8 +645,10 @@ ovsdb_log_write(struct ovsdb_log *file, const struct json *json) return NULL; } +/* Attempts to commit 'file' to disk. Waits for the commit to succeed or fail. + * Returns NULL if successful, otherwise the error that occurred. */ struct ovsdb_error * -ovsdb_log_commit(struct ovsdb_log *file) +ovsdb_log_commit_block(struct ovsdb_log *file) { if (file->stream && fsync(fileno(file->stream))) { return ovsdb_io_error(errno, "%s: fsync failed", file->display_name); @@ -740,7 +753,7 @@ ovsdb_rename(const char *old, const char *new) struct ovsdb_error * OVS_WARN_UNUSED_RESULT ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new) { - struct ovsdb_error *error = ovsdb_log_commit(new); + struct ovsdb_error *error = ovsdb_log_commit_block(new); if (error) { ovsdb_log_replace_abort(new); return error; @@ -812,6 +825,10 @@ ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new) ovsdb_error_destroy(old->error); old->error = NULL; /* prev_offset only matters for OVSDB_LOG_READ. */ + if (old->afsync) { + uint64_t ticket = afsync_destroy(old->afsync); + old->afsync = afsync_create(fileno(old->stream), ticket + 1); + } old->offset = new->offset; /* Keep old->name. 
*/ free(old->magic); @@ -844,3 +861,130 @@ ovsdb_log_disable_renaming_open_files(void) { rename_open_files = false; } + +struct afsync { + pthread_t thread; + atomic_uint64_t cur, next; + struct seq *request, *complete; + int fd; +}; + +static void * +afsync_thread(void *afsync_) +{ + struct afsync *afsync = afsync_; + uint64_t cur = 0; + for (;;) { + ovsrcu_quiesce_start(); + + uint64_t request_seq = seq_read(afsync->request); + + uint64_t next; + atomic_read_explicit(&afsync->next, &next, memory_order_acquire); + if (next == UINT64_MAX) { + break; + } + + if (cur != next && afsync->fd != -1) { + int error = fsync(afsync->fd) ? errno : 0; + if (!error) { + cur = next; + atomic_store_explicit(&afsync->cur, cur, memory_order_release); + seq_change(afsync->complete); + } else { + VLOG_WARN("fsync failed (%s)", ovs_strerror(error)); + } + } + + seq_wait(afsync->request, request_seq); + poll_block(); + } + return NULL; +} + +static struct afsync * +afsync_create(int fd, uint64_t initial_ticket) +{ + struct afsync *afsync = xzalloc(sizeof *afsync); + atomic_init(&afsync->cur, initial_ticket); + atomic_init(&afsync->next, initial_ticket); + afsync->request = seq_create(); + afsync->complete = seq_create(); + afsync->thread = ovs_thread_create("log_fsync", afsync_thread, afsync); + afsync->fd = fd; + return afsync; +} + +static uint64_t +afsync_destroy(struct afsync *afsync) +{ + if (!afsync) { + return 0; + } + + uint64_t next; + atomic_read(&afsync->next, &next); + atomic_store(&afsync->next, UINT64_MAX); + seq_change(afsync->request); + xpthread_join(afsync->thread, NULL); + + seq_destroy(afsync->request); + seq_destroy(afsync->complete); + + free(afsync); + + return next; +} + +static struct afsync * +ovsdb_log_get_afsync(struct ovsdb_log *log) +{ + if (!log->afsync) { + log->afsync = afsync_create(log->stream ? fileno(log->stream) : -1, 0); + } + return log->afsync; +} + +/* Starts committing 'log' to disk. 
Returns a ticket that can be passed to + * ovsdb_log_commit_wait() or compared against the return value of + * ovsdb_log_commit_progress() later. */ +uint64_t +ovsdb_log_commit_start(struct ovsdb_log *log) +{ + struct afsync *afsync = ovsdb_log_get_afsync(log); + + uint64_t orig; + atomic_add_explicit(&afsync->next, 1, &orig, memory_order_acq_rel); + + seq_change(afsync->request); + + return orig + 1; +} + +/* Returns a ticket value that represents the current progress of commits to + * 'log'. Suppose that some call to ovsdb_log_commit_start() returns X and any + * call ovsdb_log_commit_progress() returns Y, for the same 'log'. Then commit + * X is complete if and only if X <= Y. */ +uint64_t +ovsdb_log_commit_progress(struct ovsdb_log *log) +{ + struct afsync *afsync = ovsdb_log_get_afsync(log); + uint64_t cur; + atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire); + return cur; +} + +/* Causes poll_block() to wake up if and when ovsdb_log_commit_progress(log) + * would return at least 'goal'. */ +void +ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal) +{ + struct afsync *afsync = ovsdb_log_get_afsync(log); + uint64_t complete = seq_read(afsync->complete); + uint64_t cur = ovsdb_log_commit_progress(log); + if (cur < goal) { + seq_wait(afsync->complete, complete); + } else { + poll_immediate_wake(); + } +} diff --git a/ovsdb/log.h b/ovsdb/log.h index 18900fa50f44..bd0396f27ea8 100644 --- a/ovsdb/log.h +++ b/ovsdb/log.h @@ -35,6 +35,7 @@ * that compacting is advised. 
*/ +#include <stdint.h> #include <sys/types.h> #include "compiler.h" @@ -70,7 +71,11 @@ void ovsdb_log_compose_record(const struct json *, const char *magic, struct ovsdb_error *ovsdb_log_write(struct ovsdb_log *, const struct json *) OVS_WARN_UNUSED_RESULT; -struct ovsdb_error *ovsdb_log_commit(struct ovsdb_log *) + +uint64_t ovsdb_log_commit_start(struct ovsdb_log *); +uint64_t ovsdb_log_commit_progress(struct ovsdb_log *); +void ovsdb_log_commit_wait(struct ovsdb_log *, uint64_t); +struct ovsdb_error *ovsdb_log_commit_block(struct ovsdb_log *) OVS_WARN_UNUSED_RESULT; void ovsdb_log_mark_base(struct ovsdb_log *); diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c index 4343e3ce5b22..cec64152f079 100644 --- a/ovsdb/ovsdb-tool.c +++ b/ovsdb/ovsdb-tool.c @@ -222,7 +222,7 @@ do_create(struct ovs_cmdl_context *ctx) check_ovsdb_error(ovsdb_log_open(db_file_name, OVSDB_MAGIC, OVSDB_LOG_CREATE_EXCL, -1, &log)); check_ovsdb_error(ovsdb_log_write(log, json)); - check_ovsdb_error(ovsdb_log_commit(log)); + check_ovsdb_error(ovsdb_log_commit_block(log)); ovsdb_log_close(log); json_destroy(json); diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c index 6b2cde863aba..c0c5a4df51af 100644 --- a/tests/test-ovsdb.c +++ b/tests/test-ovsdb.c @@ -380,7 +380,7 @@ do_log_io(struct ovs_cmdl_context *ctx) error = ovsdb_log_write(target, json); json_destroy(json); } else if (!strcmp(command, "commit")) { - error = ovsdb_log_commit(target); + error = ovsdb_log_commit_block(target); } else if (!strcmp(command, "replace_start")) { ovs_assert(!replacement); error = ovsdb_log_replace_start(log, &replacement);
The OVSDB log code has always had the ability to commit the log to disk and
wait for the commit to finish. This patch introduces a new feature that
allows the client to start a commit in the background and then to determine
asynchronously that the commit has completed. This will be especially
useful later for the distributed database feature.

Signed-off-by: Ben Pfaff <blp@ovn.org>
---
 ovsdb/file.c       |   4 +-
 ovsdb/log.c        | 152 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 ovsdb/log.h        |   7 ++-
 ovsdb/ovsdb-tool.c |   2 +-
 tests/test-ovsdb.c |   2 +-
 5 files changed, 158 insertions(+), 9 deletions(-)