From patchwork Mon Jan 1 05:16:26 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Ben Pfaff X-Patchwork-Id: 854284 X-Patchwork-Delegate: jpettit@nicira.com Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: ozlabs.org; spf=pass (mailfrom) smtp.mailfrom=openvswitch.org (client-ip=140.211.169.12; helo=mail.linuxfoundation.org; envelope-from=ovs-dev-bounces@openvswitch.org; receiver=) Received: from mail.linuxfoundation.org (mail.linuxfoundation.org [140.211.169.12]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 3z955y0bfSz9t84 for ; Mon, 1 Jan 2018 16:16:53 +1100 (AEDT) Received: from mail.linux-foundation.org (localhost [127.0.0.1]) by mail.linuxfoundation.org (Postfix) with ESMTP id 8AFBABC6; Mon, 1 Jan 2018 05:16:50 +0000 (UTC) X-Original-To: dev@openvswitch.org Delivered-To: ovs-dev@mail.linuxfoundation.org Received: from smtp1.linuxfoundation.org (smtp1.linux-foundation.org [172.17.192.35]) by mail.linuxfoundation.org (Postfix) with ESMTPS id 67A40BC3 for ; Mon, 1 Jan 2018 05:16:49 +0000 (UTC) X-Greylist: domain auto-whitelisted by SQLgrey-1.7.6 Received: from relay2-d.mail.gandi.net (relay2-d.mail.gandi.net [217.70.183.194]) by smtp1.linuxfoundation.org (Postfix) with ESMTPS id 6B70114B for ; Mon, 1 Jan 2018 05:16:48 +0000 (UTC) X-Originating-IP: 173.228.112.64 Received: from sigabrt.gateway.sonic.net (173-228-112-64.dsl.dynamic.fusionbroadband.com [173.228.112.64]) (Authenticated sender: blp@ovn.org) by relay2-d.mail.gandi.net (Postfix) with ESMTPSA id CE4FDC5A44; Mon, 1 Jan 2018 06:16:45 +0100 (CET) From: Ben Pfaff To: dev@openvswitch.org Date: Sun, 31 Dec 2017 21:16:26 -0800 Message-Id: <20180101051640.13043-1-blp@ovn.org> X-Mailer: git-send-email 2.10.2 X-Spam-Status: No, score=-2.6 required=5.0 tests=BAYES_00, 
RCVD_IN_DNSWL_LOW autolearn=ham version=3.3.1 X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on smtp1.linux-foundation.org Cc: Ben Pfaff Subject: [ovs-dev] [PATCH 01/15] log: Add async commit support. X-BeenThere: ovs-dev@openvswitch.org X-Mailman-Version: 2.1.12 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , MIME-Version: 1.0 Sender: ovs-dev-bounces@openvswitch.org Errors-To: ovs-dev-bounces@openvswitch.org The OVSDB log code has always had the ability to commit the log to disk and wait for the commit to finish. This patch introduces a new feature that allows the client to start a commit in the background and then to determine asynchronously that the commit has completed. This will be especially useful later for the distributed database feature. Signed-off-by: Ben Pfaff Reviewed-by: Yifeng Sun Reviewed-by: Yifeng Sun --- ovsdb/file.c | 4 +- ovsdb/log.c | 152 +++++++++++++++++++++++++++++++++++++++++++++++++++-- ovsdb/log.h | 7 ++- ovsdb/ovsdb-tool.c | 2 +- tests/test-ovsdb.c | 2 +- 5 files changed, 158 insertions(+), 9 deletions(-) diff --git a/ovsdb/file.c b/ovsdb/file.c index 90c2b9d20a9a..fdd5f8e35a44 100644 --- a/ovsdb/file.c +++ b/ovsdb/file.c @@ -661,7 +661,7 @@ ovsdb_file_compact(struct ovsdb_file *file) /* Commit the old version, so that we can be assured that we'll eventually * have either the old or the new version. 
*/ - error = ovsdb_log_commit(file->log); + error = ovsdb_log_commit_block(file->log); if (error) { goto exit; } @@ -857,7 +857,7 @@ ovsdb_file_txn_commit(struct json *json, const char *comment, } if (durable) { - error = ovsdb_log_commit(log); + error = ovsdb_log_commit_block(log); if (error) { return ovsdb_wrap_error(error, "committing transaction failed"); } diff --git a/ovsdb/log.c b/ovsdb/log.c index 0f8dafa30a8f..cc4bc2c6243e 100644 --- a/ovsdb/log.c +++ b/ovsdb/log.c @@ -24,12 +24,17 @@ #include #include +#include "lockfile.h" #include "openvswitch/dynamic-string.h" #include "openvswitch/json.h" #include "openvswitch/vlog.h" -#include "lockfile.h" -#include "ovsdb.h" +#include "ovs-atomic.h" +#include "ovs-rcu.h" +#include "ovs-thread.h" #include "ovsdb-error.h" +#include "ovsdb.h" +#include "openvswitch/poll-loop.h" +#include "seq.h" #include "sha1.h" #include "socket-util.h" #include "transaction.h" @@ -78,6 +83,7 @@ struct ovsdb_log { struct lockfile *lockfile; FILE *stream; off_t base; + struct afsync *afsync; }; /* Whether the OS supports renaming open files. @@ -95,6 +101,9 @@ static bool parse_header(char *header, const char **magicp, uint8_t sha1[SHA1_DIGEST_SIZE]); static bool is_magic_ok(const char *needle, const char *haystack); +static struct afsync *afsync_create(int fd, uint64_t initial_ticket); +static uint64_t afsync_destroy(struct afsync *); + /* Attempts to open 'name' with the specified 'open_mode'. On success, stores * the new log into '*filep' and returns NULL; otherwise returns NULL and * stores NULL into '*filep'. 
@@ -269,6 +278,7 @@ ovsdb_log_open(const char *name, const char *magic, file->prev_offset = 0; file->offset = 0; file->base = 0; + file->afsync = NULL; *filep = file; return NULL; @@ -308,6 +318,7 @@ ovsdb_log_close(struct ovsdb_log *file) { if (file) { ovsdb_error_destroy(file->error); + afsync_destroy(file->afsync); free(file->name); free(file->display_name); free(file->magic); @@ -634,8 +645,10 @@ ovsdb_log_write(struct ovsdb_log *file, const struct json *json) return NULL; } +/* Attempts to commit 'file' to disk. Waits for the commit to succeed or fail. + * Returns NULL if successful, otherwise the error that occurred. */ struct ovsdb_error * -ovsdb_log_commit(struct ovsdb_log *file) +ovsdb_log_commit_block(struct ovsdb_log *file) { if (file->stream && fsync(fileno(file->stream))) { return ovsdb_io_error(errno, "%s: fsync failed", file->display_name); @@ -740,7 +753,7 @@ ovsdb_rename(const char *old, const char *new) struct ovsdb_error * OVS_WARN_UNUSED_RESULT ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new) { - struct ovsdb_error *error = ovsdb_log_commit(new); + struct ovsdb_error *error = ovsdb_log_commit_block(new); if (error) { ovsdb_log_replace_abort(new); return error; @@ -812,6 +825,10 @@ ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new) ovsdb_error_destroy(old->error); old->error = NULL; /* prev_offset only matters for OVSDB_LOG_READ. */ + if (old->afsync) { + uint64_t ticket = afsync_destroy(old->afsync); + old->afsync = afsync_create(fileno(old->stream), ticket + 1); + } old->offset = new->offset; /* Keep old->name. 
*/ free(old->magic); @@ -844,3 +861,130 @@ ovsdb_log_disable_renaming_open_files(void) { rename_open_files = false; } + +struct afsync { + pthread_t thread; /* Runs afsync_thread(). */ + atomic_uint64_t cur, next; /* Tickets: last synced, last requested. */ + struct seq *request, *complete; /* Changed to wake thread, waiters. */ + int fd; /* Descriptor to fsync(), or -1 for none. */ +}; + +static void * +afsync_thread(void *afsync_) +{ + struct afsync *afsync = afsync_; + uint64_t cur = 0; + for (;;) { + ovsrcu_quiesce_start(); + + uint64_t request_seq = seq_read(afsync->request); + + uint64_t next; + atomic_read_explicit(&afsync->next, &next, memory_order_acquire); + if (next == UINT64_MAX) { /* Set by afsync_destroy() to stop us. */ + break; + } + + if (cur != next && afsync->fd != -1) { + int error = fsync(afsync->fd) ? errno : 0; + if (!error) { + cur = next; + atomic_store_explicit(&afsync->cur, cur, memory_order_release); + seq_change(afsync->complete); + } else { + VLOG_WARN("fsync failed (%s)", ovs_strerror(error)); + } + } + + seq_wait(afsync->request, request_seq); + poll_block(); + } + return NULL; +} + +static struct afsync * +afsync_create(int fd, uint64_t initial_ticket) +{ + struct afsync *afsync = xzalloc(sizeof *afsync); + atomic_init(&afsync->cur, initial_ticket); + atomic_init(&afsync->next, initial_ticket); + afsync->request = seq_create(); + afsync->complete = seq_create(); + afsync->fd = fd; /* Set before the thread starts: it reads 'fd'. */ + afsync->thread = ovs_thread_create("log_fsync", afsync_thread, afsync); + return afsync; +} + +static uint64_t +afsync_destroy(struct afsync *afsync) +{ + if (!afsync) { + return 0; + } + + uint64_t next; + atomic_read(&afsync->next, &next); + atomic_store(&afsync->next, UINT64_MAX); + seq_change(afsync->request); + xpthread_join(afsync->thread, NULL); + + seq_destroy(afsync->request); + seq_destroy(afsync->complete); + + free(afsync); + + return next; +} + +static struct afsync * +ovsdb_log_get_afsync(struct ovsdb_log *log) +{ + if (!log->afsync) { + log->afsync = afsync_create(log->stream ? fileno(log->stream) : -1, 0); + } + return log->afsync; +} + +/* Starts committing 'log' to disk. 
Returns a ticket that can be passed to + * ovsdb_log_commit_wait() or compared against the return value of + * ovsdb_log_commit_progress() later. */ +uint64_t +ovsdb_log_commit_start(struct ovsdb_log *log) +{ + struct afsync *afsync = ovsdb_log_get_afsync(log); + + uint64_t orig; + atomic_add_explicit(&afsync->next, 1, &orig, memory_order_acq_rel); + + seq_change(afsync->request); + + return orig + 1; +} + +/* Returns a ticket value that represents the current progress of commits to + * 'log'. Suppose that some call to ovsdb_log_commit_start() returns X and any + * call to ovsdb_log_commit_progress() returns Y, for the same 'log'. Then + * commit X is complete if and only if X <= Y. */ +uint64_t +ovsdb_log_commit_progress(struct ovsdb_log *log) +{ + struct afsync *afsync = ovsdb_log_get_afsync(log); + uint64_t cur; + atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire); + return cur; +} + +/* Causes poll_block() to wake up if and when ovsdb_log_commit_progress(log) + * would return at least 'goal'. */ +void +ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal) +{ + struct afsync *afsync = ovsdb_log_get_afsync(log); + uint64_t complete = seq_read(afsync->complete); + uint64_t cur = ovsdb_log_commit_progress(log); + if (cur < goal) { + seq_wait(afsync->complete, complete); + } else { + poll_immediate_wake(); + } +} diff --git a/ovsdb/log.h b/ovsdb/log.h index 18900fa50f44..bd0396f27ea8 100644 --- a/ovsdb/log.h +++ b/ovsdb/log.h @@ -35,6 +35,7 @@ * that compacting is advised. 
*/ +#include #include #include "compiler.h" @@ -70,7 +71,11 @@ void ovsdb_log_compose_record(const struct json *, const char *magic, struct ovsdb_error *ovsdb_log_write(struct ovsdb_log *, const struct json *) OVS_WARN_UNUSED_RESULT; -struct ovsdb_error *ovsdb_log_commit(struct ovsdb_log *) + +uint64_t ovsdb_log_commit_start(struct ovsdb_log *); +uint64_t ovsdb_log_commit_progress(struct ovsdb_log *); +void ovsdb_log_commit_wait(struct ovsdb_log *, uint64_t); +struct ovsdb_error *ovsdb_log_commit_block(struct ovsdb_log *) OVS_WARN_UNUSED_RESULT; void ovsdb_log_mark_base(struct ovsdb_log *); diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c index 4343e3ce5b22..cec64152f079 100644 --- a/ovsdb/ovsdb-tool.c +++ b/ovsdb/ovsdb-tool.c @@ -222,7 +222,7 @@ do_create(struct ovs_cmdl_context *ctx) check_ovsdb_error(ovsdb_log_open(db_file_name, OVSDB_MAGIC, OVSDB_LOG_CREATE_EXCL, -1, &log)); check_ovsdb_error(ovsdb_log_write(log, json)); - check_ovsdb_error(ovsdb_log_commit(log)); + check_ovsdb_error(ovsdb_log_commit_block(log)); ovsdb_log_close(log); json_destroy(json); diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c index 6b2cde863aba..c0c5a4df51af 100644 --- a/tests/test-ovsdb.c +++ b/tests/test-ovsdb.c @@ -380,7 +380,7 @@ do_log_io(struct ovs_cmdl_context *ctx) error = ovsdb_log_write(target, json); json_destroy(json); } else if (!strcmp(command, "commit")) { - error = ovsdb_log_commit(target); + error = ovsdb_log_commit_block(target); } else if (!strcmp(command, "replace_start")) { ovs_assert(!replacement); error = ovsdb_log_replace_start(log, &replacement);