diff mbox series

[ovs-dev,07/13] log: New functions for replacing a log's contents.

Message ID 20171208001240.25829-8-blp@ovn.org
State Accepted
Headers show
Series OVSDB log enhancements | expand

Commit Message

Ben Pfaff Dec. 8, 2017, 12:12 a.m. UTC
These functions will acquire users in future commits.

This new functionality made the internal state machine of the log hard
enough to understand that I thought that it was best to make it explicit
with a 'state' variable, so this commit introduces one.

This commit duplicates code and logic from ovsdb_rename() and
ovsdb_compact() in ovsdb/file.c.  A future commit removes this duplication.

Signed-off-by: Ben Pfaff <blp@ovn.org>
---
 ovsdb/log.c        | 294 ++++++++++++++++++++++++++++++++++++++++++++++++-----
 ovsdb/log.h        |  14 +++
 tests/ovsdb-log.at |  74 ++++++++++++++
 tests/test-ovsdb.c |  58 +++++++++--
 4 files changed, 404 insertions(+), 36 deletions(-)

Comments

Justin Pettit Dec. 23, 2017, 5:39 a.m. UTC | #1
> On Dec 7, 2017, at 4:12 PM, Ben Pfaff <blp@ovn.org> wrote:
> 
> These functions will acquire users in future commits.
> 
> This new functionality made the internal state machine of the log hard
> enough to understand that I thought that it was best to make it explicit
> with a 'state' variable, so this commit introduces one.
> 
> This commit duplicates code and logic from ovsdb_rename() and
> ovsdb_compact() in ovsdb/file.c.  A future commit removes this duplication.
> 
> Signed-off-by: Ben Pfaff <blp@ovn.org>

Acked-by: Justin Pettit <jpettit@ovn.org>

--Justin
Ben Pfaff Dec. 24, 2017, 8:02 p.m. UTC | #2
On Fri, Dec 22, 2017 at 09:39:11PM -0800, Justin Pettit wrote:
> 
> 
> > On Dec 7, 2017, at 4:12 PM, Ben Pfaff <blp@ovn.org> wrote:
> > 
> > These functions will acquire users in future commits.
> > 
> > This new functionality made the internal state machine of the log hard
> > enough to understand that I thought that it was best to make it explicit
> > with a 'state' variable, so this commit introduces one.
> > 
> > This commit duplicates code and logic from ovsdb_rename() and
> > ovsdb_compact() in ovsdb/file.c.  A future commit removes this duplication.
> > 
> > Signed-off-by: Ben Pfaff <blp@ovn.org>
> 
> Acked-by: Justin Pettit <jpettit@ovn.org>

Applied to master, thanks!
diff mbox series

Patch

diff --git a/ovsdb/log.c b/ovsdb/log.c
index 6a1a4484ad34..22cd9fe89599 100644
--- a/ovsdb/log.c
+++ b/ovsdb/log.c
@@ -36,23 +36,57 @@ 
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_log);
 
-enum ovsdb_log_mode {
-    OVSDB_LOG_READ,
-    OVSDB_LOG_WRITE
+/* State in a log's state machine.
+ *
+ * OVSDB_LOG_READ is the initial state for a newly opened log.  Log records may
+ * be read in this state only.  Reaching end of file does not cause a state
+ * transition.  A read error transitions to OVSDB_LOG_READ_ERROR.
+ *
+ * OVSDB_LOG_READ_ERROR prevents further reads from succeeding; they will
+ * report the same error as before.  A log write transitions away to
+ * OVSDB_LOG_WRITE or OVSDB_LOG_WRITE_ERROR.
+ *
+ * OVSDB_LOG_WRITE is the state following a call to ovsdb_log_write(), when all
+ * goes well.  Any state other than OVSDB_LOG_BROKEN may transition to this
+ * state.  A write error transitions to OVSDB_LOG_WRITE_ERROR.
+ *
+ * OVSDB_LOG_WRITE_ERROR is the state following a write error.  Further writes
+ * retry and might transition back to OVSDB_LOG_WRITE.
+ *
+ * OVSDB_LOG_BROKEN is the state following a call to ovsdb_log_replace() or
+ * ovsdb_log_replace_commit(), if it fails in a spectacular enough way that no
+ * further reads or writes can succeed.  This is a terminal state.
+ */
+enum ovsdb_log_state {
+    OVSDB_LOG_READ,             /* Ready to read. */
+    OVSDB_LOG_READ_ERROR,       /* Read failed, see 'error' for details. */
+    OVSDB_LOG_WRITE,            /* Ready to write. */
+    OVSDB_LOG_WRITE_ERROR,      /* Write failed, see 'error' for details. */
+    OVSDB_LOG_BROKEN,           /* Disk on fire, see 'error' for details. */
 };
 
 struct ovsdb_log {
+    enum ovsdb_log_state state;
+    struct ovsdb_error *error;
+
     off_t prev_offset;
     off_t offset;
     char *name;
     char *magic;
     struct lockfile *lockfile;
     FILE *stream;
-    struct ovsdb_error *read_error;
-    bool write_error;
-    enum ovsdb_log_mode mode;
 };
 
+/* Whether the OS supports renaming open files.
+ *
+ * (Making this a variable makes it easier to test both strategies on Unix-like
+ * systems.) */
+#ifdef _WIN32
+static bool rename_open_files = false;
+#else
+static bool rename_open_files = true;
+#endif
+
 /* Attempts to open 'name' with the specified 'open_mode'.  On success, stores
  * the new log into '*filep' and returns NULL; otherwise returns NULL and
  * stores NULL into '*filep'.
@@ -173,15 +207,14 @@  ovsdb_log_open(const char *name, const char *magic,
     }
 
     file = xmalloc(sizeof *file);
+    file->state = OVSDB_LOG_READ;
+    file->error = NULL;
     file->name = xstrdup(name);
     file->magic = xstrdup(magic);
     file->lockfile = lockfile;
     file->stream = stream;
     file->prev_offset = 0;
     file->offset = 0;
-    file->read_error = NULL;
-    file->write_error = false;
-    file->mode = OVSDB_LOG_READ;
     *filep = file;
     return NULL;
 
@@ -197,10 +230,12 @@  void
 ovsdb_log_close(struct ovsdb_log *file)
 {
     if (file) {
+        ovsdb_error_destroy(file->error);
         free(file->name);
-        fclose(file->stream);
+        if (file->stream) {
+            fclose(file->stream);
+        }
         lockfile_unlock(file->lockfile);
-        ovsdb_error_destroy(file->read_error);
         free(file);
     }
 }
@@ -283,6 +318,20 @@  parse_body(struct ovsdb_log *file, off_t offset, unsigned long int length,
 struct ovsdb_error *
 ovsdb_log_read(struct ovsdb_log *file, struct json **jsonp)
 {
+    *jsonp = NULL;
+    switch (file->state) {
+    case OVSDB_LOG_READ:
+        break;
+
+    case OVSDB_LOG_READ_ERROR:
+    case OVSDB_LOG_WRITE_ERROR:
+    case OVSDB_LOG_BROKEN:
+        return ovsdb_error_clone(file->error);
+
+    case OVSDB_LOG_WRITE:
+        return NULL;
+    }
+
     uint8_t expected_sha1[SHA1_DIGEST_SIZE];
     uint8_t actual_sha1[SHA1_DIGEST_SIZE];
     struct ovsdb_error *error;
@@ -291,20 +340,13 @@  ovsdb_log_read(struct ovsdb_log *file, struct json **jsonp)
     struct json *json;
     char header[128];
 
-    *jsonp = json = NULL;
-
-    if (file->read_error) {
-        return ovsdb_error_clone(file->read_error);
-    } else if (file->mode == OVSDB_LOG_WRITE) {
-        return NULL;
-    }
+    json = NULL;
 
     if (!fgets(header, sizeof header, file->stream)) {
         if (feof(file->stream)) {
-            error = NULL;
-        } else {
-            error = ovsdb_io_error(errno, "%s: read failed", file->name);
+            return NULL;
         }
+        error = ovsdb_io_error(errno, "%s: read failed", file->name);
         goto error;
     }
 
@@ -355,7 +397,8 @@  ovsdb_log_read(struct ovsdb_log *file, struct json **jsonp)
     return NULL;
 
 error:
-    file->read_error = ovsdb_error_clone(error);
+    file->state = OVSDB_LOG_READ_ERROR;
+    file->error = ovsdb_error_clone(error);
     json_destroy(json);
     return error;
 }
@@ -372,7 +415,7 @@  error:
 void
 ovsdb_log_unread(struct ovsdb_log *file)
 {
-    ovs_assert(file->mode == OVSDB_LOG_READ);
+    ovs_assert(file->state == OVSDB_LOG_READ);
     file->offset = file->prev_offset;
 }
 
@@ -389,9 +432,16 @@  ovsdb_log_write(struct ovsdb_log *file, const struct json *json)
 
     json_string = NULL;
 
-    if (file->mode == OVSDB_LOG_READ || file->write_error) {
-        file->mode = OVSDB_LOG_WRITE;
-        file->write_error = false;
+    switch (file->state) {
+    case OVSDB_LOG_WRITE:
+        break;
+
+    case OVSDB_LOG_READ:
+    case OVSDB_LOG_READ_ERROR:
+    case OVSDB_LOG_WRITE_ERROR:
+        file->state = OVSDB_LOG_WRITE;
+        ovsdb_error_destroy(file->error);
+        file->error = NULL;
         if (fseeko(file->stream, file->offset, SEEK_SET)) {
             error = ovsdb_io_error(errno, "%s: cannot seek to offset %lld",
                                    file->name, (long long int) file->offset);
@@ -402,6 +452,10 @@  ovsdb_log_write(struct ovsdb_log *file, const struct json *json)
                                    file->name, (long long int) file->offset);
             goto error;
         }
+        break;
+
+    case OVSDB_LOG_BROKEN:
+        return ovsdb_error_clone(file->error);
     }
 
     if (json->type != JSON_OBJECT && json->type != JSON_ARRAY) {
@@ -443,7 +497,8 @@  ovsdb_log_write(struct ovsdb_log *file, const struct json *json)
     return NULL;
 
 error:
-    file->write_error = true;
+    file->state = OVSDB_LOG_WRITE_ERROR;
+    file->error = ovsdb_error_clone(error);
     free(json_string);
     return error;
 }
@@ -451,7 +506,7 @@  error:
 struct ovsdb_error *
 ovsdb_log_commit(struct ovsdb_log *file)
 {
-    if (fsync(fileno(file->stream))) {
+    if (file->stream && fsync(fileno(file->stream))) {
         return ovsdb_io_error(errno, "%s: fsync failed", file->name);
     }
     return NULL;
@@ -465,3 +520,186 @@  ovsdb_log_get_offset(const struct ovsdb_log *log)
 {
     return log->offset;
 }
+
+/* Attempts to atomically replace the contents of 'log', on disk, by the 'n'
+ * entries in 'entries'.  If successful, returns NULL, otherwise returns an
+ * error (which the caller must eventually free).
+ *
+ * If successful, 'log' will be in write mode at the end of the log. */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_log_replace(struct ovsdb_log *log, struct json **entries, size_t n)
+{
+    struct ovsdb_error *error;
+    struct ovsdb_log *new;
+
+    error = ovsdb_log_replace_start(log, &new);
+    if (error) {
+        return error;
+    }
+
+    for (size_t i = 0; i < n; i++) {
+        error = ovsdb_log_write(new, entries[i]);
+        if (error) {
+            ovsdb_log_replace_abort(new);
+            return error;
+        }
+    }
+
+    return ovsdb_log_replace_commit(log, new);
+}
+
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_log_replace_start(struct ovsdb_log *old,
+                        struct ovsdb_log **newp)
+{
+    /* If old->name is a symlink, then we want the new file to be in the same
+     * directory as the symlink's referent. */
+    char *deref_name = follow_symlinks(old->name);
+    char *tmp_name = xasprintf("%s.tmp", deref_name);
+    free(deref_name);
+
+    struct ovsdb_error *error;
+
+    ovs_assert(old->lockfile);
+
+    /* Remove temporary file.  (It might not exist.) */
+    if (unlink(tmp_name) < 0 && errno != ENOENT) {
+        error = ovsdb_io_error(errno, "failed to remove %s", tmp_name);
+        free(tmp_name);
+        *newp = NULL;
+        return error;
+    }
+
+    /* Create temporary file. */
+    error = ovsdb_log_open(tmp_name, old->magic, OVSDB_LOG_CREATE_EXCL,
+                           false, newp);
+    free(tmp_name);
+    return error;
+}
+
+/* Rename 'old' to 'new', replacing 'new' if it exists.  Returns NULL if
+ * successful, otherwise an ovsdb_error that the caller must destroy. */
+static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_rename(const char *old, const char *new)
+{
+#ifdef _WIN32
+    /* Avoid rename() because it fails if the destination exists. */
+    int error = (MoveFileEx(old, new, MOVEFILE_REPLACE_EXISTING
+                            | MOVEFILE_WRITE_THROUGH | MOVEFILE_COPY_ALLOWED)
+                 ? 0 : EACCES);
+#else
+    int error = rename(old, new) ? errno : 0;
+#endif
+
+    return (error
+            ? ovsdb_io_error(error, "failed to rename \"%s\" to \"%s\"",
+                             old, new)
+            : NULL);
+}
+
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new)
+{
+    struct ovsdb_error *error = ovsdb_log_commit(new);
+    if (error) {
+        ovsdb_log_replace_abort(new);
+        return error;
+    }
+
+    /* Replace original file by the temporary file.
+     *
+     * We support two strategies:
+     *
+     *     - The preferred strategy is to rename the temporary file over the
+     *       original one in-place, then close the original one.  This works on
+     *       Unix-like systems.  It does not work on Windows, which does not
+     *       allow open files to be renamed.  The approach has the advantage
+     *       that, at any point, we can drop back to something that already
+     *       works.
+     *
+     *     - Alternatively, we can close both files, rename, then open the new
+     *       file (which now has the original name).  This works on all
+     *       systems, but if reopening the file fails then 'old' is broken.
+     *
+     * We make the strategy a variable instead of an #ifdef to make it easier
+     * to test both strategies on Unix-like systems, and to make the code
+     * easier to read. */
+    if (!rename_open_files) {
+        fclose(old->stream);
+        old->stream = NULL;
+
+        fclose(new->stream);
+        new->stream = NULL;
+    }
+
+    /* Rename 'old' to 'new'.  We dereference the old name because, if it is a
+     * symlink, we want to replace the referent of the symlink instead of the
+     * symlink itself. */
+    char *deref_name = follow_symlinks(old->name);
+    error = ovsdb_rename(new->name, deref_name);
+    free(deref_name);
+
+    if (error) {
+        ovsdb_log_replace_abort(new);
+        return error;
+    }
+    if (rename_open_files) {
+        fsync_parent_dir(old->name);
+        fclose(old->stream);
+        old->stream = new->stream;
+        new->stream = NULL;
+    } else {
+        old->stream = fopen(old->name, "r+b");
+        if (!old->stream) {
+            old->error = ovsdb_io_error(errno, "%s: could not reopen log",
+                                        old->name);
+            old->state = OVSDB_LOG_BROKEN;
+            return ovsdb_error_clone(old->error);
+        }
+
+        if (fseek(old->stream, new->offset, SEEK_SET)) {
+            old->error = ovsdb_io_error(errno, "%s: seek failed", old->name);
+            old->state = OVSDB_LOG_BROKEN;
+            return ovsdb_error_clone(old->error);
+        }
+    }
+
+    /* Replace 'old' by 'new' in memory.
+     *
+     * 'old' transitions to OVSDB_LOG_WRITE (it was probably in that mode
+     * anyway). */
+    old->state = OVSDB_LOG_WRITE;
+    ovsdb_error_destroy(old->error);
+    old->error = NULL;
+    /* prev_offset only matters for OVSDB_LOG_READ. */
+    old->offset = new->offset;
+    /* Keep old->name. */
+    free(old->magic);
+    old->magic = new->magic;
+    new->magic = NULL;
+    /* Keep old->lockfile. */
+    /* stream was already replaced above. */
+    /* Free 'new'. */
+    ovsdb_log_close(new);
+
+    return NULL;
+}
+
+void
+ovsdb_log_replace_abort(struct ovsdb_log *new)
+{
+    if (new) {
+        /* Unlink the new file, but only after we close it (because Windows
+         * does not allow removing an open file). */
+        char *name = xstrdup(new->name);
+        ovsdb_log_close(new);
+        unlink(name);
+        free(name);
+    }
+}
+
+void
+ovsdb_log_disable_renaming_open_files(void)
+{
+    rename_open_files = false;
+}
diff --git a/ovsdb/log.h b/ovsdb/log.h
index 08c6a7783094..41734e73b351 100644
--- a/ovsdb/log.h
+++ b/ovsdb/log.h
@@ -49,4 +49,18 @@  struct ovsdb_error *ovsdb_log_commit(struct ovsdb_log *)
 
 off_t ovsdb_log_get_offset(const struct ovsdb_log *);
 
+struct ovsdb_error *ovsdb_log_replace(struct ovsdb_log *,
+                                      struct json **entries, size_t n)
+    OVS_WARN_UNUSED_RESULT;
+struct ovsdb_error *ovsdb_log_replace_start(struct ovsdb_log *old,
+                                            struct ovsdb_log **newp)
+    OVS_WARN_UNUSED_RESULT;
+struct ovsdb_error *ovsdb_log_replace_commit(struct ovsdb_log *old,
+                                             struct ovsdb_log *new)
+    OVS_WARN_UNUSED_RESULT;
+void ovsdb_log_replace_abort(struct ovsdb_log *new);
+
+/* For testing. */
+void ovsdb_log_disable_renaming_open_files(void);
+
 #endif /* ovsdb/log.h */
diff --git a/tests/ovsdb-log.at b/tests/ovsdb-log.at
index 826e334efddd..125ddd187551 100644
--- a/tests/ovsdb-log.at
+++ b/tests/ovsdb-log.at
@@ -78,6 +78,80 @@  file: read: end of log
 AT_CHECK([test -f .file.~lock~])
 AT_CLEANUP
 
+AT_SETUP([write one, replace, commit])
+AT_KEYWORDS([ovsdb log])
+AT_CAPTURE_FILE([file])
+for option in '' --no-rename-open-files; do
+    rm -f file
+    AT_CHECK(
+      [[test-ovsdb $option log-io file create \
+	  'write:{"x":0}' \
+	  'replace_start' \
+	  'new-write:{"x":1}' \
+	  'new-write:{"x":2}' \
+	  'old-write:{"x":4}' \
+	  'replace_commit' \
+	  'read' \
+	  'write:{"x":3}']], [0],
+      [[file: open successful
+file: write:{"x":0} successful
+file: replace_start successful
+(temp): write:{"x":1} successful
+(temp): write:{"x":2} successful
+file: write:{"x":4} successful
+file: replace_commit successful
+file: read: end of log
+file: write:{"x":3} successful
+]])
+    AT_CHECK(
+      [test-ovsdb log-io file read-only read read read read], [0],
+      [[file: open successful
+file: read: {"x":1}
+file: read: {"x":2}
+file: read: {"x":3}
+file: read: end of log
+]], [ignore])
+done
+AT_CHECK([test -f .file.~lock~])
+AT_CLEANUP
+
+AT_SETUP([write one, replace, abort])
+AT_KEYWORDS([ovsdb log])
+AT_CAPTURE_FILE([file])
+for option in '' --no-rename-open-files; do
+    rm -f file
+    AT_CHECK(
+      [[test-ovsdb $option log-io file create \
+	  'write:{"x":0}' \
+	  'replace_start' \
+	  'new-write:{"x":1}' \
+	  'new-write:{"x":2}' \
+	  'old-write:{"x":4}' \
+	  'replace_abort' \
+	  'read' \
+	  'write:{"x":3}']], [0],
+      [[file: open successful
+file: write:{"x":0} successful
+file: replace_start successful
+(temp): write:{"x":1} successful
+(temp): write:{"x":2} successful
+file: write:{"x":4} successful
+file: replace_abort successful
+file: read: end of log
+file: write:{"x":3} successful
+]])
+    AT_CHECK(
+      [test-ovsdb log-io file read-only read read read read], [0],
+      [[file: open successful
+file: read: {"x":0}
+file: read: {"x":4}
+file: read: {"x":3}
+file: read: end of log
+]], [ignore])
+done
+AT_CHECK([test -f .file.~lock~])
+AT_CLEANUP
+
 AT_SETUP([write one, reread - alternative magic])
 AT_KEYWORDS([ovsdb log])
 AT_CAPTURE_FILE([file])
diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c
index e06ecad4f368..6d2799c1a131 100644
--- a/tests/test-ovsdb.c
+++ b/tests/test-ovsdb.c
@@ -81,12 +81,14 @@  parse_options(int argc, char *argv[], struct test_ovsdb_pvt_context *pvt)
 {
     enum {
         OPT_MAGIC = CHAR_MAX + 1,
+        OPT_NO_RENAME_OPEN_FILES
     };
     static const struct option long_options[] = {
         {"timeout", required_argument, NULL, 't'},
         {"verbose", optional_argument, NULL, 'v'},
         {"change-track", optional_argument, NULL, 'c'},
         {"magic", required_argument, NULL, OPT_MAGIC},
+        {"no-rename-open-files", no_argument, NULL, OPT_NO_RENAME_OPEN_FILES},
         {"help", no_argument, NULL, 'h'},
         {NULL, 0, NULL, 0},
     };
@@ -127,6 +129,10 @@  parse_options(int argc, char *argv[], struct test_ovsdb_pvt_context *pvt)
             magic = optarg;
             break;
 
+        case OPT_NO_RENAME_OPEN_FILES:
+            ovsdb_log_disable_renaming_open_files();
+            break;
+
         case '?':
             exit(EXIT_FAILURE);
 
@@ -142,7 +148,8 @@  usage(void)
 {
     printf("%s: Open vSwitch database test utility\n"
            "usage: %s [OPTIONS] COMMAND [ARG...]\n\n"
-           "  [--magic=MAGIC] log-io FILE FLAGS COMMAND...\n"
+           "  [--magic=MAGIC] [--no-rename-open-files] "
+           " log-io FILE FLAGS COMMAND...\n"
            "    open FILE with FLAGS (and MAGIC), run COMMANDs\n"
            "  default-atoms\n"
            "    test ovsdb_atom_default()\n"
@@ -314,7 +321,6 @@  do_log_io(struct ovs_cmdl_context *ctx)
 
     struct ovsdb_error *error;
     enum ovsdb_log_open_mode mode;
-    struct ovsdb_log *log;
     int i;
 
     if (!strcmp(mode_string, "read-only")) {
@@ -329,17 +335,41 @@  do_log_io(struct ovs_cmdl_context *ctx)
         ovs_fatal(0, "unknown log-io open mode \"%s\"", mode_string);
     }
 
+    struct ovsdb_log *log;
     check_ovsdb_error(ovsdb_log_open(name, magic, mode, -1, &log));
     printf("%s: open successful\n", name);
 
+    struct ovsdb_log *replacement = NULL;
+
     for (i = 3; i < ctx->argc; i++) {
         const char *command = ctx->argv[i];
+
+        struct ovsdb_log *target;
+        const char *target_name;
+        if (!strncmp(command, "old-", 4)) {
+            command += 4;
+            target = log;
+            target_name = name;
+        } else if (!strncmp(command, "new-", 4)) {
+            if (!replacement) {
+                ovs_fatal(0, "%s: can't execute command without "
+                          "replacement log", command);
+            }
+
+            command += 4;
+            target = replacement;
+            target_name = "(temp)";
+        } else {
+            target = log;
+            target_name = name;
+        }
+
         if (!strcmp(command, "read")) {
             struct json *json;
 
-            error = ovsdb_log_read(log, &json);
+            error = ovsdb_log_read(target, &json);
             if (!error) {
-                printf("%s: read: ", name);
+                printf("%s: read: ", target_name);
                 if (json) {
                     print_and_free_json(json);
                 } else {
@@ -349,20 +379,32 @@  do_log_io(struct ovs_cmdl_context *ctx)
             }
         } else if (!strncmp(command, "write:", 6)) {
             struct json *json = parse_json(command + 6);
-            error = ovsdb_log_write(log, json);
+            error = ovsdb_log_write(target, json);
             json_destroy(json);
         } else if (!strcmp(command, "commit")) {
-            error = ovsdb_log_commit(log);
+            error = ovsdb_log_commit(target);
+        } else if (!strcmp(command, "replace_start")) {
+            ovs_assert(!replacement);
+            error = ovsdb_log_replace_start(log, &replacement);
+        } else if (!strcmp(command, "replace_commit")) {
+            ovs_assert(replacement);
+            error = ovsdb_log_replace_commit(log, replacement);
+            replacement = NULL;
+        } else if (!strcmp(command, "replace_abort")) {
+            ovs_assert(replacement);
+            ovsdb_log_replace_abort(replacement);
+            replacement = NULL;
+            error = NULL;
         } else {
             ovs_fatal(0, "unknown log-io command \"%s\"", command);
         }
         if (error) {
             char *s = ovsdb_error_to_string(error);
-            printf("%s: %s failed: %s\n", name, command, s);
+            printf("%s: %s failed: %s\n", target_name, command, s);
             free(s);
             ovsdb_error_destroy(error);
         } else {
-            printf("%s: %s successful\n", name, command);
+            printf("%s: %s successful\n", target_name, command);
         }
     }