diff mbox series

[ovs-dev,01/15] log: Add async commit support.

Message ID 20180101051640.13043-1-blp@ovn.org
State Accepted
Delegated to: Justin Pettit
Headers show
Series [ovs-dev,01/15] log: Add async commit support. | expand

Commit Message

Ben Pfaff Jan. 1, 2018, 5:16 a.m. UTC
The OVSDB log code has always had the ability to commit the log to disk and
wait for the commit to finish.  This patch introduces a new feature that
allows the client to start a commit in the background and then to determine
asynchronously that the commit has completed.  This will be especially
useful later for the distributed database feature.

Signed-off-by: Ben Pfaff <blp@ovn.org>
---
 ovsdb/file.c       |   4 +-
 ovsdb/log.c        | 152 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 ovsdb/log.h        |   7 ++-
 ovsdb/ovsdb-tool.c |   2 +-
 tests/test-ovsdb.c |   2 +-
 5 files changed, 158 insertions(+), 9 deletions(-)

Comments

Yifeng Sun Jan. 11, 2018, 12:04 a.m. UTC | #1
I read through lib/seq.c to learn how this patch works.
It looks good to me, but I'm not very confident about it.

Reviewed-by: Yifeng Sun <pkusunyifeng@gmail.com>

On Sun, Dec 31, 2017 at 9:16 PM, Ben Pfaff <blp@ovn.org> wrote:

> The OVSDB log code has always had the ability to commit the log to disk and
> wait for the commit to finish.  This patch introduces a new feature that
> allows the client to start a commit in the background and then to determine
> asynchronously that the commit has completed.  This will be especially
> useful later for the distributed database feature.
>
> Signed-off-by: Ben Pfaff <blp@ovn.org>
> ---
>  ovsdb/file.c       |   4 +-
>  ovsdb/log.c        | 152 +++++++++++++++++++++++++++++++++++++++++++++++++++--
>  ovsdb/log.h        |   7 ++-
>  ovsdb/ovsdb-tool.c |   2 +-
>  tests/test-ovsdb.c |   2 +-
>  5 files changed, 158 insertions(+), 9 deletions(-)
>
> diff --git a/ovsdb/file.c b/ovsdb/file.c
> index 90c2b9d20a9a..fdd5f8e35a44 100644
> --- a/ovsdb/file.c
> +++ b/ovsdb/file.c
> @@ -661,7 +661,7 @@ ovsdb_file_compact(struct ovsdb_file *file)
>
>      /* Commit the old version, so that we can be assured that we'll
> eventually
>       * have either the old or the new version. */
> -    error = ovsdb_log_commit(file->log);
> +    error = ovsdb_log_commit_block(file->log);
>      if (error) {
>          goto exit;
>      }
> @@ -857,7 +857,7 @@ ovsdb_file_txn_commit(struct json *json, const char
> *comment,
>      }
>
>      if (durable) {
> -        error = ovsdb_log_commit(log);
> +        error = ovsdb_log_commit_block(log);
>          if (error) {
>              return ovsdb_wrap_error(error, "committing transaction
> failed");
>          }
> diff --git a/ovsdb/log.c b/ovsdb/log.c
> index 0f8dafa30a8f..cc4bc2c6243e 100644
> --- a/ovsdb/log.c
> +++ b/ovsdb/log.c
> @@ -24,12 +24,17 @@
>  #include <sys/stat.h>
>  #include <unistd.h>
>
> +#include "lockfile.h"
>  #include "openvswitch/dynamic-string.h"
>  #include "openvswitch/json.h"
>  #include "openvswitch/vlog.h"
> -#include "lockfile.h"
> -#include "ovsdb.h"
> +#include "ovs-atomic.h"
> +#include "ovs-rcu.h"
> +#include "ovs-thread.h"
>  #include "ovsdb-error.h"
> +#include "ovsdb.h"
> +#include "openvswitch/poll-loop.h"
> +#include "seq.h"
>  #include "sha1.h"
>  #include "socket-util.h"
>  #include "transaction.h"
> @@ -78,6 +83,7 @@ struct ovsdb_log {
>      struct lockfile *lockfile;
>      FILE *stream;
>      off_t base;
> +    struct afsync *afsync;
>  };
>
>  /* Whether the OS supports renaming open files.
> @@ -95,6 +101,9 @@ static bool parse_header(char *header, const char
> **magicp,
>                           uint8_t sha1[SHA1_DIGEST_SIZE]);
>  static bool is_magic_ok(const char *needle, const char *haystack);
>
> +static struct afsync *afsync_create(int fd, uint64_t initial_ticket);
> +static uint64_t afsync_destroy(struct afsync *);
> +
>  /* Attempts to open 'name' with the specified 'open_mode'.  On success,
> stores
>   * the new log into '*filep' and returns NULL; otherwise returns NULL and
>   * stores NULL into '*filep'.
> @@ -269,6 +278,7 @@ ovsdb_log_open(const char *name, const char *magic,
>      file->prev_offset = 0;
>      file->offset = 0;
>      file->base = 0;
> +    file->afsync = NULL;
>      *filep = file;
>      return NULL;
>
> @@ -308,6 +318,7 @@ ovsdb_log_close(struct ovsdb_log *file)
>  {
>      if (file) {
>          ovsdb_error_destroy(file->error);
> +        afsync_destroy(file->afsync);
>          free(file->name);
>          free(file->display_name);
>          free(file->magic);
> @@ -634,8 +645,10 @@ ovsdb_log_write(struct ovsdb_log *file, const struct
> json *json)
>      return NULL;
>  }
>
> +/* Attempts to commit 'file' to disk.  Waits for the commit to succeed or
> fail.
> + * Returns NULL if successful, otherwise the error that occurred. */
>  struct ovsdb_error *
> -ovsdb_log_commit(struct ovsdb_log *file)
> +ovsdb_log_commit_block(struct ovsdb_log *file)
>  {
>      if (file->stream && fsync(fileno(file->stream))) {
>          return ovsdb_io_error(errno, "%s: fsync failed",
> file->display_name);
> @@ -740,7 +753,7 @@ ovsdb_rename(const char *old, const char *new)
>  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>  ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new)
>  {
> -    struct ovsdb_error *error = ovsdb_log_commit(new);
> +    struct ovsdb_error *error = ovsdb_log_commit_block(new);
>      if (error) {
>          ovsdb_log_replace_abort(new);
>          return error;
> @@ -812,6 +825,10 @@ ovsdb_log_replace_commit(struct ovsdb_log *old,
> struct ovsdb_log *new)
>      ovsdb_error_destroy(old->error);
>      old->error = NULL;
>      /* prev_offset only matters for OVSDB_LOG_READ. */
> +    if (old->afsync) {
> +        uint64_t ticket = afsync_destroy(old->afsync);
> +        old->afsync = afsync_create(fileno(old->stream), ticket + 1);
> +    }
>      old->offset = new->offset;
>      /* Keep old->name. */
>      free(old->magic);
> @@ -844,3 +861,130 @@ ovsdb_log_disable_renaming_open_files(void)
>  {
>      rename_open_files = false;
>  }
> +
> +struct afsync {
> +    pthread_t thread;
> +    atomic_uint64_t cur, next;
> +    struct seq *request, *complete;
> +    int fd;
> +};
> +
> +static void *
> +afsync_thread(void *afsync_)
> +{
> +    struct afsync *afsync = afsync_;
> +    uint64_t cur = 0;
> +    for (;;) {
> +        ovsrcu_quiesce_start();
> +
> +        uint64_t request_seq = seq_read(afsync->request);
> +
> +        uint64_t next;
> +        atomic_read_explicit(&afsync->next, &next, memory_order_acquire);
> +        if (next == UINT64_MAX) {
> +            break;
> +        }
> +
> +        if (cur != next && afsync->fd != -1) {
> +            int error = fsync(afsync->fd) ? errno : 0;
> +            if (!error) {
> +                cur = next;
> +                atomic_store_explicit(&afsync->cur, cur,
> memory_order_release);
> +                seq_change(afsync->complete);
> +            } else {
> +                VLOG_WARN("fsync failed (%s)", ovs_strerror(error));
> +            }
> +        }
> +
> +        seq_wait(afsync->request, request_seq);
> +        poll_block();
> +    }
> +    return NULL;
> +}
> +
> +static struct afsync *
> +afsync_create(int fd, uint64_t initial_ticket)
> +{
> +    struct afsync *afsync = xzalloc(sizeof *afsync);
> +    atomic_init(&afsync->cur, initial_ticket);
> +    atomic_init(&afsync->next, initial_ticket);
> +    afsync->request = seq_create();
> +    afsync->complete = seq_create();
> +    afsync->thread = ovs_thread_create("log_fsync", afsync_thread,
> afsync);
> +    afsync->fd = fd;
> +    return afsync;
> +}
> +
> +static uint64_t
> +afsync_destroy(struct afsync *afsync)
> +{
> +    if (!afsync) {
> +        return 0;
> +    }
> +
> +    uint64_t next;
> +    atomic_read(&afsync->next, &next);
> +    atomic_store(&afsync->next, UINT64_MAX);
> +    seq_change(afsync->request);
> +    xpthread_join(afsync->thread, NULL);
> +
> +    seq_destroy(afsync->request);
> +    seq_destroy(afsync->complete);
> +
> +    free(afsync);
> +
> +    return next;
> +}
> +
> +static struct afsync *
> +ovsdb_log_get_afsync(struct ovsdb_log *log)
> +{
> +    if (!log->afsync) {
> +        log->afsync = afsync_create(log->stream ? fileno(log->stream) :
> -1, 0);
> +    }
> +    return log->afsync;
> +}
> +
> +/* Starts committing 'log' to disk.  Returns a ticket that can be passed
> to
> + * ovsdb_log_commit_wait() or compared against the return value of
> + * ovsdb_log_commit_progress() later. */
> +uint64_t
> +ovsdb_log_commit_start(struct ovsdb_log *log)
> +{
> +    struct afsync *afsync = ovsdb_log_get_afsync(log);
> +
> +    uint64_t orig;
> +    atomic_add_explicit(&afsync->next, 1, &orig, memory_order_acq_rel);
> +
> +    seq_change(afsync->request);
> +
> +    return orig + 1;
> +}
> +
> +/* Returns a ticket value that represents the current progress of commits
> to
> + * 'log'.  Suppose that some call to ovsdb_log_commit_start() returns X
> and any
> + * call ovsdb_log_commit_progress() returns Y, for the same 'log'.  Then
> commit
> + * X is complete if and only if X <= Y. */
> +uint64_t
> +ovsdb_log_commit_progress(struct ovsdb_log *log)
> +{
> +    struct afsync *afsync = ovsdb_log_get_afsync(log);
> +    uint64_t cur;
> +    atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire);
> +    return cur;
> +}
> +
> +/* Causes poll_block() to wake up if and when
> ovsdb_log_commit_progress(log)
> + * would return at least 'goal'. */
> +void
> +ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal)
> +{
> +    struct afsync *afsync = ovsdb_log_get_afsync(log);
> +    uint64_t complete = seq_read(afsync->complete);
> +    uint64_t cur = ovsdb_log_commit_progress(log);
> +    if (cur < goal) {
> +        seq_wait(afsync->complete, complete);
> +    } else {
> +        poll_immediate_wake();
> +    }
> +}
> diff --git a/ovsdb/log.h b/ovsdb/log.h
> index 18900fa50f44..bd0396f27ea8 100644
> --- a/ovsdb/log.h
> +++ b/ovsdb/log.h
> @@ -35,6 +35,7 @@
>   * that compacting is advised.
>   */
>
> +#include <stdint.h>
>  #include <sys/types.h>
>  #include "compiler.h"
>
> @@ -70,7 +71,11 @@ void ovsdb_log_compose_record(const struct json *,
> const char *magic,
>
>  struct ovsdb_error *ovsdb_log_write(struct ovsdb_log *, const struct json
> *)
>      OVS_WARN_UNUSED_RESULT;
> -struct ovsdb_error *ovsdb_log_commit(struct ovsdb_log *)
> +
> +uint64_t ovsdb_log_commit_start(struct ovsdb_log *);
> +uint64_t ovsdb_log_commit_progress(struct ovsdb_log *);
> +void ovsdb_log_commit_wait(struct ovsdb_log *, uint64_t);
> +struct ovsdb_error *ovsdb_log_commit_block(struct ovsdb_log *)
>      OVS_WARN_UNUSED_RESULT;
>
>  void ovsdb_log_mark_base(struct ovsdb_log *);
> diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c
> index 4343e3ce5b22..cec64152f079 100644
> --- a/ovsdb/ovsdb-tool.c
> +++ b/ovsdb/ovsdb-tool.c
> @@ -222,7 +222,7 @@ do_create(struct ovs_cmdl_context *ctx)
>      check_ovsdb_error(ovsdb_log_open(db_file_name, OVSDB_MAGIC,
>                                       OVSDB_LOG_CREATE_EXCL, -1, &log));
>      check_ovsdb_error(ovsdb_log_write(log, json));
> -    check_ovsdb_error(ovsdb_log_commit(log));
> +    check_ovsdb_error(ovsdb_log_commit_block(log));
>      ovsdb_log_close(log);
>
>      json_destroy(json);
> diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c
> index 6b2cde863aba..c0c5a4df51af 100644
> --- a/tests/test-ovsdb.c
> +++ b/tests/test-ovsdb.c
> @@ -380,7 +380,7 @@ do_log_io(struct ovs_cmdl_context *ctx)
>              error = ovsdb_log_write(target, json);
>              json_destroy(json);
>          } else if (!strcmp(command, "commit")) {
> -            error = ovsdb_log_commit(target);
> +            error = ovsdb_log_commit_block(target);
>          } else if (!strcmp(command, "replace_start")) {
>              ovs_assert(!replacement);
>              error = ovsdb_log_replace_start(log, &replacement);
> --
> 2.10.2
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>
Ben Pfaff Jan. 11, 2018, 12:21 a.m. UTC | #2
Thanks for the review.

The "seq" code introduces a somewhat unusual concept.  I may have
invented it (I'm not sure), but it's probably a variation on a
more standard idea.  I'd like it to be easy for people to understand.
Are there any questions I could help answer?  Maybe I could
answer them directly, or maybe I could update the documentation in seq.h
(which tries to be comprehensive, but perhaps isn't).

On Wed, Jan 10, 2018 at 04:04:53PM -0800, Yifeng Sun wrote:
> I read through lib/seq.c to learn how this patch works.
> It looks good to me, but I'm not very confident about it.
> 
> Reviewed-by: Yifeng Sun <pkusunyifeng@gmail.com>
> 
> On Sun, Dec 31, 2017 at 9:16 PM, Ben Pfaff <blp@ovn.org> wrote:
> 
> > The OVSDB log code has always had the ability to commit the log to disk and
> > wait for the commit to finish.  This patch introduces a new feature that
> > allows the client to start a commit in the background and then to determine
> > asynchronously that the commit has completed.  This will be especially
> > useful later for the distributed database feature.
> >
> > Signed-off-by: Ben Pfaff <blp@ovn.org>
> > ---
> >  ovsdb/file.c       |   4 +-
> >  ovsdb/log.c        | 152 +++++++++++++++++++++++++++++++++++++++++++++++++++--
> >  ovsdb/log.h        |   7 ++-
> >  ovsdb/ovsdb-tool.c |   2 +-
> >  tests/test-ovsdb.c |   2 +-
> >  5 files changed, 158 insertions(+), 9 deletions(-)
> >
> > diff --git a/ovsdb/file.c b/ovsdb/file.c
> > index 90c2b9d20a9a..fdd5f8e35a44 100644
> > --- a/ovsdb/file.c
> > +++ b/ovsdb/file.c
> > @@ -661,7 +661,7 @@ ovsdb_file_compact(struct ovsdb_file *file)
> >
> >      /* Commit the old version, so that we can be assured that we'll
> > eventually
> >       * have either the old or the new version. */
> > -    error = ovsdb_log_commit(file->log);
> > +    error = ovsdb_log_commit_block(file->log);
> >      if (error) {
> >          goto exit;
> >      }
> > @@ -857,7 +857,7 @@ ovsdb_file_txn_commit(struct json *json, const char
> > *comment,
> >      }
> >
> >      if (durable) {
> > -        error = ovsdb_log_commit(log);
> > +        error = ovsdb_log_commit_block(log);
> >          if (error) {
> >              return ovsdb_wrap_error(error, "committing transaction
> > failed");
> >          }
> > diff --git a/ovsdb/log.c b/ovsdb/log.c
> > index 0f8dafa30a8f..cc4bc2c6243e 100644
> > --- a/ovsdb/log.c
> > +++ b/ovsdb/log.c
> > @@ -24,12 +24,17 @@
> >  #include <sys/stat.h>
> >  #include <unistd.h>
> >
> > +#include "lockfile.h"
> >  #include "openvswitch/dynamic-string.h"
> >  #include "openvswitch/json.h"
> >  #include "openvswitch/vlog.h"
> > -#include "lockfile.h"
> > -#include "ovsdb.h"
> > +#include "ovs-atomic.h"
> > +#include "ovs-rcu.h"
> > +#include "ovs-thread.h"
> >  #include "ovsdb-error.h"
> > +#include "ovsdb.h"
> > +#include "openvswitch/poll-loop.h"
> > +#include "seq.h"
> >  #include "sha1.h"
> >  #include "socket-util.h"
> >  #include "transaction.h"
> > @@ -78,6 +83,7 @@ struct ovsdb_log {
> >      struct lockfile *lockfile;
> >      FILE *stream;
> >      off_t base;
> > +    struct afsync *afsync;
> >  };
> >
> >  /* Whether the OS supports renaming open files.
> > @@ -95,6 +101,9 @@ static bool parse_header(char *header, const char
> > **magicp,
> >                           uint8_t sha1[SHA1_DIGEST_SIZE]);
> >  static bool is_magic_ok(const char *needle, const char *haystack);
> >
> > +static struct afsync *afsync_create(int fd, uint64_t initial_ticket);
> > +static uint64_t afsync_destroy(struct afsync *);
> > +
> >  /* Attempts to open 'name' with the specified 'open_mode'.  On success,
> > stores
> >   * the new log into '*filep' and returns NULL; otherwise returns NULL and
> >   * stores NULL into '*filep'.
> > @@ -269,6 +278,7 @@ ovsdb_log_open(const char *name, const char *magic,
> >      file->prev_offset = 0;
> >      file->offset = 0;
> >      file->base = 0;
> > +    file->afsync = NULL;
> >      *filep = file;
> >      return NULL;
> >
> > @@ -308,6 +318,7 @@ ovsdb_log_close(struct ovsdb_log *file)
> >  {
> >      if (file) {
> >          ovsdb_error_destroy(file->error);
> > +        afsync_destroy(file->afsync);
> >          free(file->name);
> >          free(file->display_name);
> >          free(file->magic);
> > @@ -634,8 +645,10 @@ ovsdb_log_write(struct ovsdb_log *file, const struct
> > json *json)
> >      return NULL;
> >  }
> >
> > +/* Attempts to commit 'file' to disk.  Waits for the commit to succeed or
> > fail.
> > + * Returns NULL if successful, otherwise the error that occurred. */
> >  struct ovsdb_error *
> > -ovsdb_log_commit(struct ovsdb_log *file)
> > +ovsdb_log_commit_block(struct ovsdb_log *file)
> >  {
> >      if (file->stream && fsync(fileno(file->stream))) {
> >          return ovsdb_io_error(errno, "%s: fsync failed",
> > file->display_name);
> > @@ -740,7 +753,7 @@ ovsdb_rename(const char *old, const char *new)
> >  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
> >  ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new)
> >  {
> > -    struct ovsdb_error *error = ovsdb_log_commit(new);
> > +    struct ovsdb_error *error = ovsdb_log_commit_block(new);
> >      if (error) {
> >          ovsdb_log_replace_abort(new);
> >          return error;
> > @@ -812,6 +825,10 @@ ovsdb_log_replace_commit(struct ovsdb_log *old,
> > struct ovsdb_log *new)
> >      ovsdb_error_destroy(old->error);
> >      old->error = NULL;
> >      /* prev_offset only matters for OVSDB_LOG_READ. */
> > +    if (old->afsync) {
> > +        uint64_t ticket = afsync_destroy(old->afsync);
> > +        old->afsync = afsync_create(fileno(old->stream), ticket + 1);
> > +    }
> >      old->offset = new->offset;
> >      /* Keep old->name. */
> >      free(old->magic);
> > @@ -844,3 +861,130 @@ ovsdb_log_disable_renaming_open_files(void)
> >  {
> >      rename_open_files = false;
> >  }
> > +
> > +struct afsync {
> > +    pthread_t thread;
> > +    atomic_uint64_t cur, next;
> > +    struct seq *request, *complete;
> > +    int fd;
> > +};
> > +
> > +static void *
> > +afsync_thread(void *afsync_)
> > +{
> > +    struct afsync *afsync = afsync_;
> > +    uint64_t cur = 0;
> > +    for (;;) {
> > +        ovsrcu_quiesce_start();
> > +
> > +        uint64_t request_seq = seq_read(afsync->request);
> > +
> > +        uint64_t next;
> > +        atomic_read_explicit(&afsync->next, &next, memory_order_acquire);
> > +        if (next == UINT64_MAX) {
> > +            break;
> > +        }
> > +
> > +        if (cur != next && afsync->fd != -1) {
> > +            int error = fsync(afsync->fd) ? errno : 0;
> > +            if (!error) {
> > +                cur = next;
> > +                atomic_store_explicit(&afsync->cur, cur,
> > memory_order_release);
> > +                seq_change(afsync->complete);
> > +            } else {
> > +                VLOG_WARN("fsync failed (%s)", ovs_strerror(error));
> > +            }
> > +        }
> > +
> > +        seq_wait(afsync->request, request_seq);
> > +        poll_block();
> > +    }
> > +    return NULL;
> > +}
> > +
> > +static struct afsync *
> > +afsync_create(int fd, uint64_t initial_ticket)
> > +{
> > +    struct afsync *afsync = xzalloc(sizeof *afsync);
> > +    atomic_init(&afsync->cur, initial_ticket);
> > +    atomic_init(&afsync->next, initial_ticket);
> > +    afsync->request = seq_create();
> > +    afsync->complete = seq_create();
> > +    afsync->thread = ovs_thread_create("log_fsync", afsync_thread,
> > afsync);
> > +    afsync->fd = fd;
> > +    return afsync;
> > +}
> > +
> > +static uint64_t
> > +afsync_destroy(struct afsync *afsync)
> > +{
> > +    if (!afsync) {
> > +        return 0;
> > +    }
> > +
> > +    uint64_t next;
> > +    atomic_read(&afsync->next, &next);
> > +    atomic_store(&afsync->next, UINT64_MAX);
> > +    seq_change(afsync->request);
> > +    xpthread_join(afsync->thread, NULL);
> > +
> > +    seq_destroy(afsync->request);
> > +    seq_destroy(afsync->complete);
> > +
> > +    free(afsync);
> > +
> > +    return next;
> > +}
> > +
> > +static struct afsync *
> > +ovsdb_log_get_afsync(struct ovsdb_log *log)
> > +{
> > +    if (!log->afsync) {
> > +        log->afsync = afsync_create(log->stream ? fileno(log->stream) :
> > -1, 0);
> > +    }
> > +    return log->afsync;
> > +}
> > +
> > +/* Starts committing 'log' to disk.  Returns a ticket that can be passed
> > to
> > + * ovsdb_log_commit_wait() or compared against the return value of
> > + * ovsdb_log_commit_progress() later. */
> > +uint64_t
> > +ovsdb_log_commit_start(struct ovsdb_log *log)
> > +{
> > +    struct afsync *afsync = ovsdb_log_get_afsync(log);
> > +
> > +    uint64_t orig;
> > +    atomic_add_explicit(&afsync->next, 1, &orig, memory_order_acq_rel);
> > +
> > +    seq_change(afsync->request);
> > +
> > +    return orig + 1;
> > +}
> > +
> > +/* Returns a ticket value that represents the current progress of commits
> > to
> > + * 'log'.  Suppose that some call to ovsdb_log_commit_start() returns X
> > and any
> > + * call ovsdb_log_commit_progress() returns Y, for the same 'log'.  Then
> > commit
> > + * X is complete if and only if X <= Y. */
> > +uint64_t
> > +ovsdb_log_commit_progress(struct ovsdb_log *log)
> > +{
> > +    struct afsync *afsync = ovsdb_log_get_afsync(log);
> > +    uint64_t cur;
> > +    atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire);
> > +    return cur;
> > +}
> > +
> > +/* Causes poll_block() to wake up if and when
> > ovsdb_log_commit_progress(log)
> > + * would return at least 'goal'. */
> > +void
> > +ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal)
> > +{
> > +    struct afsync *afsync = ovsdb_log_get_afsync(log);
> > +    uint64_t complete = seq_read(afsync->complete);
> > +    uint64_t cur = ovsdb_log_commit_progress(log);
> > +    if (cur < goal) {
> > +        seq_wait(afsync->complete, complete);
> > +    } else {
> > +        poll_immediate_wake();
> > +    }
> > +}
> > diff --git a/ovsdb/log.h b/ovsdb/log.h
> > index 18900fa50f44..bd0396f27ea8 100644
> > --- a/ovsdb/log.h
> > +++ b/ovsdb/log.h
> > @@ -35,6 +35,7 @@
> >   * that compacting is advised.
> >   */
> >
> > +#include <stdint.h>
> >  #include <sys/types.h>
> >  #include "compiler.h"
> >
> > @@ -70,7 +71,11 @@ void ovsdb_log_compose_record(const struct json *,
> > const char *magic,
> >
> >  struct ovsdb_error *ovsdb_log_write(struct ovsdb_log *, const struct json
> > *)
> >      OVS_WARN_UNUSED_RESULT;
> > -struct ovsdb_error *ovsdb_log_commit(struct ovsdb_log *)
> > +
> > +uint64_t ovsdb_log_commit_start(struct ovsdb_log *);
> > +uint64_t ovsdb_log_commit_progress(struct ovsdb_log *);
> > +void ovsdb_log_commit_wait(struct ovsdb_log *, uint64_t);
> > +struct ovsdb_error *ovsdb_log_commit_block(struct ovsdb_log *)
> >      OVS_WARN_UNUSED_RESULT;
> >
> >  void ovsdb_log_mark_base(struct ovsdb_log *);
> > diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c
> > index 4343e3ce5b22..cec64152f079 100644
> > --- a/ovsdb/ovsdb-tool.c
> > +++ b/ovsdb/ovsdb-tool.c
> > @@ -222,7 +222,7 @@ do_create(struct ovs_cmdl_context *ctx)
> >      check_ovsdb_error(ovsdb_log_open(db_file_name, OVSDB_MAGIC,
> >                                       OVSDB_LOG_CREATE_EXCL, -1, &log));
> >      check_ovsdb_error(ovsdb_log_write(log, json));
> > -    check_ovsdb_error(ovsdb_log_commit(log));
> > +    check_ovsdb_error(ovsdb_log_commit_block(log));
> >      ovsdb_log_close(log);
> >
> >      json_destroy(json);
> > diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c
> > index 6b2cde863aba..c0c5a4df51af 100644
> > --- a/tests/test-ovsdb.c
> > +++ b/tests/test-ovsdb.c
> > @@ -380,7 +380,7 @@ do_log_io(struct ovs_cmdl_context *ctx)
> >              error = ovsdb_log_write(target, json);
> >              json_destroy(json);
> >          } else if (!strcmp(command, "commit")) {
> > -            error = ovsdb_log_commit(target);
> > +            error = ovsdb_log_commit_block(target);
> >          } else if (!strcmp(command, "replace_start")) {
> >              ovs_assert(!replacement);
> >              error = ovsdb_log_replace_start(log, &replacement);
> > --
> > 2.10.2
> >
> > _______________________________________________
> > dev mailing list
> > dev@openvswitch.org
> > https://mail.openvswitch.org/mailman/listinfo/ovs-dev
> >
Yifeng Sun Jan. 11, 2018, 10:45 p.m. UTC | #3
After further study, I understand the "seq" code and think it is well-designed.
Thank you for this patch and for your previous reply.

This patch looks good to me.

Reviewed-by: Yifeng Sun <pkusunyifeng@gmail.com>

On Wed, Jan 10, 2018 at 4:21 PM, Ben Pfaff <blp@ovn.org> wrote:

> Thanks for the review.
>
> The "seq" code introduces a somewhat unusual concept.  I may have
> invented it (I'm not sure), but it's probably a variation on a
> more standard idea.  I'd like it to be easy for people to understand.
> Are there any questions I could help answer?  Maybe I could
> answer them directly, or maybe I could update the documentation in seq.h
> (which tries to be comprehensive, but perhaps isn't).
>
> On Wed, Jan 10, 2018 at 04:04:53PM -0800, Yifeng Sun wrote:
> > I read through lib/seq.c to learn how this patch works.
> > It looks good to me, but I'm not very confident about it.
> >
> > Reviewed-by: Yifeng Sun <pkusunyifeng@gmail.com>
> >
> > On Sun, Dec 31, 2017 at 9:16 PM, Ben Pfaff <blp@ovn.org> wrote:
> >
> > > The OVSDB log code has always had the ability to commit the log to
> disk and
> > > wait for the commit to finish.  This patch introduces a new feature
> that
> > > allows the client to start a commit in the background and then to
> determine
> > > asynchronously that the commit has completed.  This will be especially
> > > useful later for the distributed database feature.
> > >
> > > Signed-off-by: Ben Pfaff <blp@ovn.org>
> > > ---
> > >  ovsdb/file.c       |   4 +-
> > >  ovsdb/log.c        | 152 +++++++++++++++++++++++++++++++++++++++++++++++++++--
> > >  ovsdb/log.h        |   7 ++-
> > >  ovsdb/ovsdb-tool.c |   2 +-
> > >  tests/test-ovsdb.c |   2 +-
> > >  5 files changed, 158 insertions(+), 9 deletions(-)
> > >
> > > diff --git a/ovsdb/file.c b/ovsdb/file.c
> > > index 90c2b9d20a9a..fdd5f8e35a44 100644
> > > --- a/ovsdb/file.c
> > > +++ b/ovsdb/file.c
> > > @@ -661,7 +661,7 @@ ovsdb_file_compact(struct ovsdb_file *file)
> > >
> > >      /* Commit the old version, so that we can be assured that we'll
> > > eventually
> > >       * have either the old or the new version. */
> > > -    error = ovsdb_log_commit(file->log);
> > > +    error = ovsdb_log_commit_block(file->log);
> > >      if (error) {
> > >          goto exit;
> > >      }
> > > @@ -857,7 +857,7 @@ ovsdb_file_txn_commit(struct json *json, const char
> > > *comment,
> > >      }
> > >
> > >      if (durable) {
> > > -        error = ovsdb_log_commit(log);
> > > +        error = ovsdb_log_commit_block(log);
> > >          if (error) {
> > >              return ovsdb_wrap_error(error, "committing transaction
> > > failed");
> > >          }
> > > diff --git a/ovsdb/log.c b/ovsdb/log.c
> > > index 0f8dafa30a8f..cc4bc2c6243e 100644
> > > --- a/ovsdb/log.c
> > > +++ b/ovsdb/log.c
> > > @@ -24,12 +24,17 @@
> > >  #include <sys/stat.h>
> > >  #include <unistd.h>
> > >
> > > +#include "lockfile.h"
> > >  #include "openvswitch/dynamic-string.h"
> > >  #include "openvswitch/json.h"
> > >  #include "openvswitch/vlog.h"
> > > -#include "lockfile.h"
> > > -#include "ovsdb.h"
> > > +#include "ovs-atomic.h"
> > > +#include "ovs-rcu.h"
> > > +#include "ovs-thread.h"
> > >  #include "ovsdb-error.h"
> > > +#include "ovsdb.h"
> > > +#include "openvswitch/poll-loop.h"
> > > +#include "seq.h"
> > >  #include "sha1.h"
> > >  #include "socket-util.h"
> > >  #include "transaction.h"
> > > @@ -78,6 +83,7 @@ struct ovsdb_log {
> > >      struct lockfile *lockfile;
> > >      FILE *stream;
> > >      off_t base;
> > > +    struct afsync *afsync;
> > >  };
> > >
> > >  /* Whether the OS supports renaming open files.
> > > @@ -95,6 +101,9 @@ static bool parse_header(char *header, const char
> > > **magicp,
> > >                           uint8_t sha1[SHA1_DIGEST_SIZE]);
> > >  static bool is_magic_ok(const char *needle, const char *haystack);
> > >
> > > +static struct afsync *afsync_create(int fd, uint64_t initial_ticket);
> > > +static uint64_t afsync_destroy(struct afsync *);
> > > +
> > >  /* Attempts to open 'name' with the specified 'open_mode'.  On
> success,
> > > stores
> > >   * the new log into '*filep' and returns NULL; otherwise returns NULL
> and
> > >   * stores NULL into '*filep'.
> > > @@ -269,6 +278,7 @@ ovsdb_log_open(const char *name, const char *magic,
> > >      file->prev_offset = 0;
> > >      file->offset = 0;
> > >      file->base = 0;
> > > +    file->afsync = NULL;
> > >      *filep = file;
> > >      return NULL;
> > >
> > > @@ -308,6 +318,7 @@ ovsdb_log_close(struct ovsdb_log *file)
> > >  {
> > >      if (file) {
> > >          ovsdb_error_destroy(file->error);
> > > +        afsync_destroy(file->afsync);
> > >          free(file->name);
> > >          free(file->display_name);
> > >          free(file->magic);
> > > @@ -634,8 +645,10 @@ ovsdb_log_write(struct ovsdb_log *file, const
> struct
> > > json *json)
> > >      return NULL;
> > >  }
> > >
> > > +/* Attempts to commit 'file' to disk.  Waits for the commit to
> succeed or
> > > fail.
> > > + * Returns NULL if successful, otherwise the error that occurred. */
> > >  struct ovsdb_error *
> > > -ovsdb_log_commit(struct ovsdb_log *file)
> > > +ovsdb_log_commit_block(struct ovsdb_log *file)
> > >  {
> > >      if (file->stream && fsync(fileno(file->stream))) {
> > >          return ovsdb_io_error(errno, "%s: fsync failed",
> > > file->display_name);
> > > @@ -740,7 +753,7 @@ ovsdb_rename(const char *old, const char *new)
> > >  struct ovsdb_error * OVS_WARN_UNUSED_RESULT
> > >  ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log
> *new)
> > >  {
> > > -    struct ovsdb_error *error = ovsdb_log_commit(new);
> > > +    struct ovsdb_error *error = ovsdb_log_commit_block(new);
> > >      if (error) {
> > >          ovsdb_log_replace_abort(new);
> > >          return error;
> > > @@ -812,6 +825,10 @@ ovsdb_log_replace_commit(struct ovsdb_log *old,
> > > struct ovsdb_log *new)
> > >      ovsdb_error_destroy(old->error);
> > >      old->error = NULL;
> > >      /* prev_offset only matters for OVSDB_LOG_READ. */
> > > +    if (old->afsync) {
> > > +        uint64_t ticket = afsync_destroy(old->afsync);
> > > +        old->afsync = afsync_create(fileno(old->stream), ticket + 1);
> > > +    }
> > >      old->offset = new->offset;
> > >      /* Keep old->name. */
> > >      free(old->magic);
> > > @@ -844,3 +861,130 @@ ovsdb_log_disable_renaming_open_files(void)
> > >  {
> > >      rename_open_files = false;
> > >  }
> > > +
> > > +struct afsync {
> > > +    pthread_t thread;
> > > +    atomic_uint64_t cur, next;
> > > +    struct seq *request, *complete;
> > > +    int fd;
> > > +};
> > > +
> > > +static void *
> > > +afsync_thread(void *afsync_)
> > > +{
> > > +    struct afsync *afsync = afsync_;
> > > +    uint64_t cur = 0;
> > > +    for (;;) {
> > > +        ovsrcu_quiesce_start();
> > > +
> > > +        uint64_t request_seq = seq_read(afsync->request);
> > > +
> > > +        uint64_t next;
> > > +        atomic_read_explicit(&afsync->next, &next,
> memory_order_acquire);
> > > +        if (next == UINT64_MAX) {
> > > +            break;
> > > +        }
> > > +
> > > +        if (cur != next && afsync->fd != -1) {
> > > +            int error = fsync(afsync->fd) ? errno : 0;
> > > +            if (!error) {
> > > +                cur = next;
> > > +                atomic_store_explicit(&afsync->cur, cur,
> > > memory_order_release);
> > > +                seq_change(afsync->complete);
> > > +            } else {
> > > +                VLOG_WARN("fsync failed (%s)", ovs_strerror(error));
> > > +            }
> > > +        }
> > > +
> > > +        seq_wait(afsync->request, request_seq);
> > > +        poll_block();
> > > +    }
> > > +    return NULL;
> > > +}
> > > +
> > > +static struct afsync *
> > > +afsync_create(int fd, uint64_t initial_ticket)
> > > +{
> > > +    struct afsync *afsync = xzalloc(sizeof *afsync);
> > > +    atomic_init(&afsync->cur, initial_ticket);
> > > +    atomic_init(&afsync->next, initial_ticket);
> > > +    afsync->request = seq_create();
> > > +    afsync->complete = seq_create();
> > > +    afsync->thread = ovs_thread_create("log_fsync", afsync_thread,
> > > afsync);
> > > +    afsync->fd = fd;
> > > +    return afsync;
> > > +}
> > > +
> > > +static uint64_t
> > > +afsync_destroy(struct afsync *afsync)
> > > +{
> > > +    if (!afsync) {
> > > +        return 0;
> > > +    }
> > > +
> > > +    uint64_t next;
> > > +    atomic_read(&afsync->next, &next);
> > > +    atomic_store(&afsync->next, UINT64_MAX);
> > > +    seq_change(afsync->request);
> > > +    xpthread_join(afsync->thread, NULL);
> > > +
> > > +    seq_destroy(afsync->request);
> > > +    seq_destroy(afsync->complete);
> > > +
> > > +    free(afsync);
> > > +
> > > +    return next;
> > > +}
> > > +
> > > +static struct afsync *
> > > +ovsdb_log_get_afsync(struct ovsdb_log *log)
> > > +{
> > > +    if (!log->afsync) {
> > > +        log->afsync = afsync_create(log->stream ? fileno(log->stream)
> :
> > > -1, 0);
> > > +    }
> > > +    return log->afsync;
> > > +}
> > > +
> > > +/* Starts committing 'log' to disk.  Returns a ticket that can be
> passed
> > > to
> > > + * ovsdb_log_commit_wait() or compared against the return value of
> > > + * ovsdb_log_commit_progress() later. */
> > > +uint64_t
> > > +ovsdb_log_commit_start(struct ovsdb_log *log)
> > > +{
> > > +    struct afsync *afsync = ovsdb_log_get_afsync(log);
> > > +
> > > +    uint64_t orig;
> > > +    atomic_add_explicit(&afsync->next, 1, &orig,
> memory_order_acq_rel);
> > > +
> > > +    seq_change(afsync->request);
> > > +
> > > +    return orig + 1;
> > > +}
> > > +
> > > +/* Returns a ticket value that represents the current progress of
> commits
> > > to
> > > + * 'log'.  Suppose that some call to ovsdb_log_commit_start() returns
> X
> > > and any
> > > + * call ovsdb_log_commit_progress() returns Y, for the same 'log'.
> Then
> > > commit
> > > + * X is complete if and only if X <= Y. */
> > > +uint64_t
> > > +ovsdb_log_commit_progress(struct ovsdb_log *log)
> > > +{
> > > +    struct afsync *afsync = ovsdb_log_get_afsync(log);
> > > +    uint64_t cur;
> > > +    atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire);
> > > +    return cur;
> > > +}
> > > +
> > > +/* Causes poll_block() to wake up if and when
> > > ovsdb_log_commit_progress(log)
> > > + * would return at least 'goal'. */
> > > +void
> > > +ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal)
> > > +{
> > > +    struct afsync *afsync = ovsdb_log_get_afsync(log);
> > > +    uint64_t complete = seq_read(afsync->complete);
> > > +    uint64_t cur = ovsdb_log_commit_progress(log);
> > > +    if (cur < goal) {
> > > +        seq_wait(afsync->complete, complete);
> > > +    } else {
> > > +        poll_immediate_wake();
> > > +    }
> > > +}
> > > diff --git a/ovsdb/log.h b/ovsdb/log.h
> > > index 18900fa50f44..bd0396f27ea8 100644
> > > --- a/ovsdb/log.h
> > > +++ b/ovsdb/log.h
> > > @@ -35,6 +35,7 @@
> > >   * that compacting is advised.
> > >   */
> > >
> > > +#include <stdint.h>
> > >  #include <sys/types.h>
> > >  #include "compiler.h"
> > >
> > > @@ -70,7 +71,11 @@ void ovsdb_log_compose_record(const struct json *,
> > > const char *magic,
> > >
> > >  struct ovsdb_error *ovsdb_log_write(struct ovsdb_log *, const struct
> json
> > > *)
> > >      OVS_WARN_UNUSED_RESULT;
> > > -struct ovsdb_error *ovsdb_log_commit(struct ovsdb_log *)
> > > +
> > > +uint64_t ovsdb_log_commit_start(struct ovsdb_log *);
> > > +uint64_t ovsdb_log_commit_progress(struct ovsdb_log *);
> > > +void ovsdb_log_commit_wait(struct ovsdb_log *, uint64_t);
> > > +struct ovsdb_error *ovsdb_log_commit_block(struct ovsdb_log *)
> > >      OVS_WARN_UNUSED_RESULT;
> > >
> > >  void ovsdb_log_mark_base(struct ovsdb_log *);
> > > diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c
> > > index 4343e3ce5b22..cec64152f079 100644
> > > --- a/ovsdb/ovsdb-tool.c
> > > +++ b/ovsdb/ovsdb-tool.c
> > > @@ -222,7 +222,7 @@ do_create(struct ovs_cmdl_context *ctx)
> > >      check_ovsdb_error(ovsdb_log_open(db_file_name, OVSDB_MAGIC,
> > >                                       OVSDB_LOG_CREATE_EXCL, -1,
> &log));
> > >      check_ovsdb_error(ovsdb_log_write(log, json));
> > > -    check_ovsdb_error(ovsdb_log_commit(log));
> > > +    check_ovsdb_error(ovsdb_log_commit_block(log));
> > >      ovsdb_log_close(log);
> > >
> > >      json_destroy(json);
> > > diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c
> > > index 6b2cde863aba..c0c5a4df51af 100644
> > > --- a/tests/test-ovsdb.c
> > > +++ b/tests/test-ovsdb.c
> > > @@ -380,7 +380,7 @@ do_log_io(struct ovs_cmdl_context *ctx)
> > >              error = ovsdb_log_write(target, json);
> > >              json_destroy(json);
> > >          } else if (!strcmp(command, "commit")) {
> > > -            error = ovsdb_log_commit(target);
> > > +            error = ovsdb_log_commit_block(target);
> > >          } else if (!strcmp(command, "replace_start")) {
> > >              ovs_assert(!replacement);
> > >              error = ovsdb_log_replace_start(log, &replacement);
> > > --
> > > 2.10.2
> > >
> > > _______________________________________________
> > > dev mailing list
> > > dev@openvswitch.org
> > > https://mail.openvswitch.org/mailman/listinfo/ovs-dev
> > >
>
diff mbox series

Patch

diff --git a/ovsdb/file.c b/ovsdb/file.c
index 90c2b9d20a9a..fdd5f8e35a44 100644
--- a/ovsdb/file.c
+++ b/ovsdb/file.c
@@ -661,7 +661,7 @@  ovsdb_file_compact(struct ovsdb_file *file)
 
     /* Commit the old version, so that we can be assured that we'll eventually
      * have either the old or the new version. */
-    error = ovsdb_log_commit(file->log);
+    error = ovsdb_log_commit_block(file->log);
     if (error) {
         goto exit;
     }
@@ -857,7 +857,7 @@  ovsdb_file_txn_commit(struct json *json, const char *comment,
     }
 
     if (durable) {
-        error = ovsdb_log_commit(log);
+        error = ovsdb_log_commit_block(log);
         if (error) {
             return ovsdb_wrap_error(error, "committing transaction failed");
         }
diff --git a/ovsdb/log.c b/ovsdb/log.c
index 0f8dafa30a8f..cc4bc2c6243e 100644
--- a/ovsdb/log.c
+++ b/ovsdb/log.c
@@ -24,12 +24,17 @@ 
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "lockfile.h"
 #include "openvswitch/dynamic-string.h"
 #include "openvswitch/json.h"
 #include "openvswitch/vlog.h"
-#include "lockfile.h"
-#include "ovsdb.h"
+#include "ovs-atomic.h"
+#include "ovs-rcu.h"
+#include "ovs-thread.h"
 #include "ovsdb-error.h"
+#include "ovsdb.h"
+#include "openvswitch/poll-loop.h"
+#include "seq.h"
 #include "sha1.h"
 #include "socket-util.h"
 #include "transaction.h"
@@ -78,6 +83,7 @@  struct ovsdb_log {
     struct lockfile *lockfile;
     FILE *stream;
     off_t base;
+    struct afsync *afsync;
 };
 
 /* Whether the OS supports renaming open files.
@@ -95,6 +101,9 @@  static bool parse_header(char *header, const char **magicp,
                          uint8_t sha1[SHA1_DIGEST_SIZE]);
 static bool is_magic_ok(const char *needle, const char *haystack);
 
+static struct afsync *afsync_create(int fd, uint64_t initial_ticket);
+static uint64_t afsync_destroy(struct afsync *);
+
 /* Attempts to open 'name' with the specified 'open_mode'.  On success, stores
  * the new log into '*filep' and returns NULL; otherwise returns NULL and
  * stores NULL into '*filep'.
@@ -269,6 +278,7 @@  ovsdb_log_open(const char *name, const char *magic,
     file->prev_offset = 0;
     file->offset = 0;
     file->base = 0;
+    file->afsync = NULL;
     *filep = file;
     return NULL;
 
@@ -308,6 +318,7 @@  ovsdb_log_close(struct ovsdb_log *file)
 {
     if (file) {
         ovsdb_error_destroy(file->error);
+        afsync_destroy(file->afsync);
         free(file->name);
         free(file->display_name);
         free(file->magic);
@@ -634,8 +645,10 @@  ovsdb_log_write(struct ovsdb_log *file, const struct json *json)
     return NULL;
 }
 
+/* Attempts to commit 'file' to disk.  Waits for the commit to succeed or fail.
+ * Returns NULL if successful, otherwise the error that occurred. */
 struct ovsdb_error *
-ovsdb_log_commit(struct ovsdb_log *file)
+ovsdb_log_commit_block(struct ovsdb_log *file)
 {
     if (file->stream && fsync(fileno(file->stream))) {
         return ovsdb_io_error(errno, "%s: fsync failed", file->display_name);
@@ -740,7 +753,7 @@  ovsdb_rename(const char *old, const char *new)
 struct ovsdb_error * OVS_WARN_UNUSED_RESULT
 ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new)
 {
-    struct ovsdb_error *error = ovsdb_log_commit(new);
+    struct ovsdb_error *error = ovsdb_log_commit_block(new);
     if (error) {
         ovsdb_log_replace_abort(new);
         return error;
@@ -812,6 +825,10 @@  ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new)
     ovsdb_error_destroy(old->error);
     old->error = NULL;
     /* prev_offset only matters for OVSDB_LOG_READ. */
+    if (old->afsync) {
+        uint64_t ticket = afsync_destroy(old->afsync);
+        old->afsync = afsync_create(fileno(old->stream), ticket + 1);
+    }
     old->offset = new->offset;
     /* Keep old->name. */
     free(old->magic);
@@ -844,3 +861,173 @@  ovsdb_log_disable_renaming_open_files(void)
{
     rename_open_files = false;
}
+
+/* Asynchronous fsync support.
+ *
+ * An afsync owns a helper thread that fsync()s a file descriptor on request.
+ * Commits are identified by increasing "tickets". */
+struct afsync {
+    pthread_t thread;           /* Runs afsync_thread(). */
+    atomic_uint64_t cur;        /* Newest ticket known to be on disk. */
+    atomic_uint64_t next;       /* Newest ticket requested, or UINT64_MAX to
+                                 * tell the helper thread to exit. */
+    struct seq *request;        /* Changed to wake up the helper thread. */
+    struct seq *complete;       /* Changed whenever 'cur' advances. */
+    int fd;                     /* File to fsync(), or -1 if there is none. */
+};
+
+/* Body of the helper thread for 'afsync_'.  Whenever the newest requested
+ * ticket differs from the newest completed one, fsync()s 'afsync->fd' (if
+ * there is one), then publishes the requested ticket in 'afsync->cur' and
+ * changes 'afsync->complete'.  Exits when 'afsync->next' becomes UINT64_MAX.
+ *
+ * If fsync() fails, logs a warning and leaves 'cur' unchanged; the fsync is
+ * not retried until 'afsync->request' changes again. */
+static void *
+afsync_thread(void *afsync_)
+{
+    struct afsync *afsync = afsync_;
+
+    /* Start from the ticket given to afsync_create().  Starting from 0 would
+     * make an afsync recreated with a nonzero ticket (as in
+     * ovsdb_log_replace_commit()) do a pointless fsync() at startup. */
+    uint64_t cur;
+    atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire);
+
+    for (;;) {
+        ovsrcu_quiesce_start();
+
+        /* Read the seqno before 'next', so that a request made after 'next'
+         * is read still wakes up the seq_wait() below. */
+        uint64_t request_seq = seq_read(afsync->request);
+
+        uint64_t next;
+        atomic_read_explicit(&afsync->next, &next, memory_order_acquire);
+        if (next == UINT64_MAX) {
+            break;
+        }
+
+        if (cur != next) {
+            /* Without a file there is nothing to flush, so the commit
+             * succeeds trivially, as in ovsdb_log_commit_block().  Skipping
+             * it instead would leave ovsdb_log_commit_wait() callers waiting
+             * forever. */
+            int error = (afsync->fd != -1 && fsync(afsync->fd)) ? errno : 0;
+            if (!error) {
+                cur = next;
+                atomic_store_explicit(&afsync->cur, cur, memory_order_release);
+                seq_change(afsync->complete);
+            } else {
+                VLOG_WARN("fsync failed (%s)", ovs_strerror(error));
+            }
+        }
+
+        seq_wait(afsync->request, request_seq);
+        poll_block();
+    }
+    return NULL;
+}
+
+/* Creates and returns a new afsync for 'fd', with its requested and completed
+ * tickets both set to 'initial_ticket', and starts its helper thread.  'fd'
+ * may be -1 if there is no file, in which case commits complete without an
+ * fsync(). */
+static struct afsync *
+afsync_create(int fd, uint64_t initial_ticket)
+{
+    struct afsync *afsync = xzalloc(sizeof *afsync);
+    atomic_init(&afsync->cur, initial_ticket);
+    atomic_init(&afsync->next, initial_ticket);
+    afsync->request = seq_create();
+    afsync->complete = seq_create();
+
+    /* Initialize everything afsync_thread() reads before starting it.
+     * Assigning 'fd' afterward would race with the new thread, which could
+     * then fsync() the 0 that xzalloc() left in 'fd'. */
+    afsync->fd = fd;
+    afsync->thread = ovs_thread_create("log_fsync", afsync_thread, afsync);
+    return afsync;
+}
+
+/* Tells 'afsync''s helper thread to exit, waits for it to do so, and frees
+ * 'afsync'.  Does not close the file descriptor.  Returns the newest ticket
+ * that had been requested, or 0 if 'afsync' is null. */
+static uint64_t
+afsync_destroy(struct afsync *afsync)
+{
+    if (!afsync) {
+        return 0;
+    }
+
+    uint64_t next;
+    atomic_read(&afsync->next, &next);
+
+    /* UINT64_MAX is the exit signal that afsync_thread() checks for. */
+    atomic_store(&afsync->next, UINT64_MAX);
+    seq_change(afsync->request);
+    xpthread_join(afsync->thread, NULL);
+
+    seq_destroy(afsync->request);
+    seq_destroy(afsync->complete);
+
+    free(afsync);
+
+    return next;
+}
+
+/* Returns 'log''s afsync, creating it and its thread on first use. */
+static struct afsync *
+ovsdb_log_get_afsync(struct ovsdb_log *log)
+{
+    if (!log->afsync) {
+        log->afsync = afsync_create(log->stream ? fileno(log->stream) : -1, 0);
+    }
+    return log->afsync;
+}
+
+/* Starts committing 'log' to disk, without waiting for the commit to finish.
+ * Returns a ticket that can be passed to ovsdb_log_commit_wait() or compared
+ * against the return value of ovsdb_log_commit_progress() later. */
+uint64_t
+ovsdb_log_commit_start(struct ovsdb_log *log)
+{
+    struct afsync *afsync = ovsdb_log_get_afsync(log);
+
+    uint64_t orig;
+    atomic_add_explicit(&afsync->next, 1, &orig, memory_order_acq_rel);
+
+    seq_change(afsync->request);
+
+    return orig + 1;
+}
+
+/* Returns a ticket value that represents the current progress of commits to
+ * 'log'.  Suppose that some call to ovsdb_log_commit_start() returns X and any
+ * call to ovsdb_log_commit_progress() returns Y, for the same 'log'.  Then
+ * commit X is complete if and only if X <= Y. */
+uint64_t
+ovsdb_log_commit_progress(struct ovsdb_log *log)
+{
+    struct afsync *afsync = ovsdb_log_get_afsync(log);
+    uint64_t cur;
+    atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire);
+    return cur;
+}
+
+/* Causes poll_block() to wake up if and when ovsdb_log_commit_progress(log)
+ * would return at least 'goal'. */
+void
+ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal)
+{
+    struct afsync *afsync = ovsdb_log_get_afsync(log);
+
+    /* Read the seqno before checking progress, so that a commit completing
+     * in between still wakes up poll_block(). */
+    uint64_t complete = seq_read(afsync->complete);
+    uint64_t cur = ovsdb_log_commit_progress(log);
+    if (cur < goal) {
+        seq_wait(afsync->complete, complete);
+    } else {
+        poll_immediate_wake();
+    }
+}
diff --git a/ovsdb/log.h b/ovsdb/log.h
index 18900fa50f44..bd0396f27ea8 100644
--- a/ovsdb/log.h
+++ b/ovsdb/log.h
@@ -35,6 +35,7 @@ 
  * that compacting is advised.
  */
 
+#include <stdint.h>
 #include <sys/types.h>
 #include "compiler.h"
 
@@ -70,7 +71,11 @@  void ovsdb_log_compose_record(const struct json *, const char *magic,
 
 struct ovsdb_error *ovsdb_log_write(struct ovsdb_log *, const struct json *)
     OVS_WARN_UNUSED_RESULT;
-struct ovsdb_error *ovsdb_log_commit(struct ovsdb_log *)
+
+uint64_t ovsdb_log_commit_start(struct ovsdb_log *);
+uint64_t ovsdb_log_commit_progress(struct ovsdb_log *);
+void ovsdb_log_commit_wait(struct ovsdb_log *, uint64_t);
+struct ovsdb_error *ovsdb_log_commit_block(struct ovsdb_log *)
     OVS_WARN_UNUSED_RESULT;
 
 void ovsdb_log_mark_base(struct ovsdb_log *);
diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c
index 4343e3ce5b22..cec64152f079 100644
--- a/ovsdb/ovsdb-tool.c
+++ b/ovsdb/ovsdb-tool.c
@@ -222,7 +222,7 @@  do_create(struct ovs_cmdl_context *ctx)
     check_ovsdb_error(ovsdb_log_open(db_file_name, OVSDB_MAGIC,
                                      OVSDB_LOG_CREATE_EXCL, -1, &log));
     check_ovsdb_error(ovsdb_log_write(log, json));
-    check_ovsdb_error(ovsdb_log_commit(log));
+    check_ovsdb_error(ovsdb_log_commit_block(log));
     ovsdb_log_close(log);
 
     json_destroy(json);
diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c
index 6b2cde863aba..c0c5a4df51af 100644
--- a/tests/test-ovsdb.c
+++ b/tests/test-ovsdb.c
@@ -380,7 +380,7 @@  do_log_io(struct ovs_cmdl_context *ctx)
             error = ovsdb_log_write(target, json);
             json_destroy(json);
         } else if (!strcmp(command, "commit")) {
-            error = ovsdb_log_commit(target);
+            error = ovsdb_log_commit_block(target);
         } else if (!strcmp(command, "replace_start")) {
             ovs_assert(!replacement);
             error = ovsdb_log_replace_start(log, &replacement);