diff mbox series

[ovs-dev,2/5] stream: Add record/replay functionality.

Message ID 20200630012435.205590-3-i.maximets@ovn.org
State Changes Requested
Headers show
Series Stream Record/Replay. | expand

Commit Message

Ilya Maximets June 30, 2020, 1:24 a.m. UTC
For debugging purposes it is useful to be able to record all the
incoming transactions and commands and replay them locally under
debugger or with additional logging enabled.  This patch introduces
ability to record all the incoming stream data and replay it via new
stream provider named 'stream-replay'.  During the record phase all
the incoming stream data written to special replay_* files in the
application rundir.  On replay phase instead of opening real streams
application will open replay_* files and read all the incoming data
directly from them.

If enabled for ovsdb-server, for example, this allows to record all
the connections and transactions from the big setup and replay them
locally afterwards to debug the behaviour or tests performance.

To start application in recording mode there is a --stream-replay-record
cmdline option. --stream-replay is to replay previously recorded
streams.  If application depends on generation of UUIDs, it's likely
required to pass --predictable-uuids-with-seed=XXX cmdline option in
both runs in order to have same UUIDs generated.

Current version doesn't work well with time-based stream events like
inactivity probes or any other events generated internally.  This is
a point for further improvement.

Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
---
 lib/automake.mk         |   1 +
 lib/stream-provider.h   |   4 +
 lib/stream-replay.c     | 561 ++++++++++++++++++++++++++++++++++++++++
 lib/stream.c            |  38 ++-
 lib/stream.h            |  40 ++-
 ovsdb/ovsdb-client.c    |   2 +-
 ovsdb/ovsdb-server.c    |   2 +-
 tests/test-jsonrpc.c    |   2 +-
 utilities/ovs-vsctl.c   |   2 +-
 vswitchd/ovs-vswitchd.c |   2 +-
 vtep/vtep-ctl.c         |   2 +-
 11 files changed, 642 insertions(+), 14 deletions(-)
 create mode 100644 lib/stream-replay.c

Comments

Dumitru Ceara July 14, 2020, 1:35 p.m. UTC | #1
On 6/30/20 3:24 AM, Ilya Maximets wrote:
> For debugging purposes it is useful to be able to record all the
> incoming transactions and commands and replay them locally under
> debugger or with additional logging enabled.  This patch introduces
> ability to record all the incoming stream data and replay it via new
> stream provider named 'stream-replay'.  During the record phase all
> the incoming stream data written to special replay_* files in the
> application rundir.  On replay phase instead of opening real streams
> application will open replay_* files and read all the incoming data
> directly from them.
> 
> If enabled for ovsdb-server, for example, this allows to record all
> the connections and transactions from the big setup and replay them
> locally afterwards to debug the behaviour or tests performance.
> 
> To start application in recording mode there is a --stream-replay-record
> cmdline option. --stream-replay is to replay previously recorded
> streams.  If application depends on generation of UUIDs, it's likely
> required to pass --predictable-uuids-with-seed=XXX cmdline option in
> both runs in order to have same UUIDs generated.
> 
> Current version doesn't work well with time-based stream events like
> inactivity probes or any other events generated internally.  This is
> a point for further improvement.
> 
> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
> ---
>  lib/automake.mk         |   1 +
>  lib/stream-provider.h   |   4 +
>  lib/stream-replay.c     | 561 ++++++++++++++++++++++++++++++++++++++++
>  lib/stream.c            |  38 ++-
>  lib/stream.h            |  40 ++-
>  ovsdb/ovsdb-client.c    |   2 +-
>  ovsdb/ovsdb-server.c    |   2 +-
>  tests/test-jsonrpc.c    |   2 +-
>  utilities/ovs-vsctl.c   |   2 +-
>  vswitchd/ovs-vswitchd.c |   2 +-
>  vtep/vtep-ctl.c         |   2 +-
>  11 files changed, 642 insertions(+), 14 deletions(-)
>  create mode 100644 lib/stream-replay.c
> 
> diff --git a/lib/automake.mk b/lib/automake.mk
> index 86940ccd2..bbe7d04f1 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -281,6 +281,7 @@ lib_libopenvswitch_la_SOURCES = \
>  	lib/stream-fd.c \
>  	lib/stream-fd.h \
>  	lib/stream-provider.h \
> +	lib/stream-replay.c \
>  	lib/stream-ssl.h \
>  	lib/stream-tcp.c \
>  	lib/stream.c \
> diff --git a/lib/stream-provider.h b/lib/stream-provider.h
> index 75f4f059b..0ce4f6f4c 100644
> --- a/lib/stream-provider.h
> +++ b/lib/stream-provider.h
> @@ -29,6 +29,7 @@ struct stream {
>      const struct stream_class *class;
>      int state;
>      int error;
> +    FILE *replay_wfd;
>      char *name;
>      char *peer_id;
>  };
> @@ -133,6 +134,7 @@ struct pstream {
>      const struct pstream_class *class;
>      char *name;
>      ovs_be16 bound_port;
> +    FILE *replay_wfd;
>  };
>  
>  void pstream_init(struct pstream *, const struct pstream_class *, char *name);
> @@ -200,5 +202,7 @@ extern const struct pstream_class pwindows_pstream_class;
>  extern const struct stream_class ssl_stream_class;
>  extern const struct pstream_class pssl_pstream_class;
>  #endif
> +extern const struct stream_class replay_stream_class;
> +extern const struct pstream_class preplay_pstream_class;
>  
>  #endif /* stream-provider.h */
> diff --git a/lib/stream-replay.c b/lib/stream-replay.c
> new file mode 100644
> index 000000000..16486f94e
> --- /dev/null
> +++ b/lib/stream-replay.c
> @@ -0,0 +1,561 @@
> +/*
> + * Copyright (c) 2020, Red Hat, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +#include <ctype.h>
> +#include <errno.h>
> +#include <poll.h>
> +#include <stdlib.h>
> +#include <string.h>
> +#include <sys/socket.h>
> +#include <sys/types.h>
> +#include <unistd.h>
> +#include "dirs.h"
> +#include "ovs-atomic.h"
> +#include "util.h"
> +#include "stream-provider.h"
> +#include "stream.h"
> +#include "openvswitch/poll-loop.h"
> +#include "openvswitch/vlog.h"
> +
> +VLOG_DEFINE_THIS_MODULE(stream_replay);
> +
> +static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
> +
> +/* Stream replay. */
> +
> +static struct ovs_mutex replay_mutex = OVS_MUTEX_INITIALIZER;
> +static int replay_seqno OVS_GUARDED_BY(replay_mutex) = 0;
> +static atomic_int replay_state = ATOMIC_VAR_INIT(STREAM_REPLAY_NONE);
> +
> +void
> +stream_replay_set_state(enum stream_replay_state state)
> +{
> +    atomic_store_relaxed(&replay_state, state);
> +}
> +
> +enum stream_replay_state
> +stream_replay_get_state(void)
> +{
> +    int state;
> +
> +    atomic_read_relaxed(&replay_state, &state);
> +    return state;
> +}
> +
> +static char *
> +replay_file_name(const char *name, int seqno)
> +{
> +    char *local_name = xstrdup(name);
> +    char *filename, *p, *c;
> +    bool skip = false;
> +
> +    /* Replace all the numbers and special symbols with single underscore.
> +     * Numbers might be PIDs or port numbers that could change between record
> +     * and replay phases, special symbols might be not good as a filename.
> +     * We have a unique seuqence number as part of the name, so we don't care
> +     * keeping too much information. */
> +    for (c = p = local_name; *p; p++) {
> +         if (!isalpha((unsigned char) *p)) {
> +             if (!skip) {
> +                *c++ = '_';
> +                skip = true;
> +             }
> +         } else {
> +             *c++ = *p;
> +             skip = false;
> +         }
> +    }
> +    if (skip) {
> +        c--;
> +    }
> +    *c = '\0';
> +    filename = xasprintf("replay_%s_%d", local_name, seqno);
> +    VLOG_DBG("Constructing replay filename: '%s' --> '%s' --> '%s'.",
> +             name, local_name, filename);
> +    free(local_name);
> +
> +    return filename;
> +}
> +
> +/* In write mode creates a new replay file to write stream replay.
> + * In read mode opens an existing replay file. */
> +static int
> +replay_file_open(const char *name, FILE **f, int *seqno)
> +    OVS_REQUIRES(replay_mutex)
> +{
> +    char *file_path, *filename;
> +    int state = stream_replay_get_state();
> +
> +    ovs_assert(state != STREAM_REPLAY_NONE);
> +
> +    filename = replay_file_name(name, replay_seqno);
> +    file_path = abs_file_name(ovs_rundir(), filename);
> +    free(filename);
> +
> +    *f = fopen(file_path, state == STREAM_REPLAY_WRITE ? "wb" : "rb");
> +    if (!*f) {
> +        VLOG_ERR("%s: fopen failed: %s", file_path, ovs_strerror(errno));
> +        free(file_path);
> +        return errno;
> +    }
> +    free(file_path);
> +
> +    if (state == STREAM_REPLAY_READ
> +        && fread(seqno, sizeof *seqno, 1, *f) != 1) {
> +        VLOG_INFO("%s: failed to read seqno: stream might be empty.", name);
> +        *seqno = INT_MAX;
> +    }
> +    replay_seqno++;  /* New file opened. */
> +    return 0;
> +}
> +
> +static int
> +replay_write(FILE *f, const void *buffer, int n, bool is_read)
> +{
> +    int state = stream_replay_get_state();
> +    int seqno_to_write;
> +    int retval = 0;
> +
> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
> +        return 0;
> +    }
> +
> +    ovs_mutex_lock(&replay_mutex);
> +
> +    seqno_to_write = is_read ? replay_seqno : -replay_seqno;
> +    if (fwrite(&seqno_to_write, sizeof seqno_to_write, 1, f) != 1) {
> +        VLOG_ERR_RL(&rl, "Failed to write seqno.");
> +        retval = -1;
> +        goto out;
> +    }
> +    if (fwrite(&n, sizeof n, 1, f) != 1) {
> +        VLOG_ERR_RL(&rl, "Failed to write length.");
> +        retval = -1;
> +        goto out;
> +    }
> +    if (n > 0 && is_read && fwrite(buffer, 1, n, f) != n) {
> +        VLOG_ERR_RL(&rl, "Failed to write data.");
> +        retval = -1;
> +    }
> +out:
> +    replay_seqno++; /* Write completed. */
> +    ovs_mutex_unlock(&replay_mutex);
> +    return retval;
> +}
> +
> +static int
> +replay_read(FILE *f, void *buffer, int buffer_size,
> +            int *len, int *seqno, bool is_read)
> +    OVS_REQUIRES(replay_mutex)
> +{
> +    int retval = EINVAL;
> +
> +    if (fread(len, sizeof *len, 1, f) != 1
> +        || (is_read && *len > buffer_size)) {
> +        VLOG_ERR("Failed to read replay length.");
> +        goto out;
> +    }
> +
> +    if (*len > 0 && is_read && fread(buffer, 1, *len, f) != *len) {
> +        VLOG_ERR("Failed to read replay buffer.");
> +        goto out;
> +    }
> +
> +    if (fread(seqno, sizeof *seqno, 1, f) != 1) {
> +        *seqno = INT_MAX;  /* Most likely EOF. */
> +        if (ferror(f)) {
> +            VLOG_INFO("Failed to read replay seqno.");
> +            goto out;
> +        }
> +    }
> +
> +    retval = 0;
> +out:
> +    replay_seqno++;  /* Read completed. */
> +    return retval;
> +}
> +
> +
> +/* Active replay stream. */
> +
> +struct stream_replay
> +{
> +    struct stream stream;
> +    FILE *f;
> +    int seqno;
> +};
> +
> +const struct stream_class replay_stream_class;
> +
> +static inline bool
> +seqno_is_read(int seqno)
> +{
> +    return seqno >= 0;
> +}
> +
> +static inline int
> +normalized_seqno(int seqno)
> +{
> +    return seqno >= 0 ? seqno : -seqno;
> +}
> +
> +
> +/* Creates a new stream named 'name' that will emulate sending and receiving
> + * data using replay file and stores a pointer to the stream in '*streamp'.
> + *
> + * Takes ownership of 'name'.
> + *
> + * Returns 0 if successful, otherwise a positive errno value. */
> +static int
> +new_replay_stream(char *name, struct stream **streamp)
> +    OVS_REQUIRES(replay_mutex)
> +{
> +    struct stream_replay *s;
> +    int seqno = 0, error;
> +    FILE *f;
> +
> +    error = replay_file_open(name, &f, &seqno);
> +    if (error) {
> +        VLOG_ERR("%s: failed to open stream.", name);
> +        return error;
> +    }
> +
> +    s = xmalloc(sizeof *s);
> +    stream_init(&s->stream, &replay_stream_class, 0, name);
> +    s->f = f;
> +    s->seqno = seqno;
> +    *streamp = &s->stream;
> +    return 0;
> +}
> +
> +static struct stream_replay *
> +stream_replay_cast(struct stream *stream)
> +{
> +    stream_assert_class(stream, &replay_stream_class);
> +    return CONTAINER_OF(stream, struct stream_replay, stream);
> +}
> +
> +void
> +stream_replay_open_wfd(struct stream *s)
> +{
> +    FILE *f;
> +    int state = stream_replay_get_state();
> +
> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
> +        return;
> +    }
> +
> +    ovs_mutex_lock(&replay_mutex);
> +    if (!replay_file_open(s->name, &f, NULL)) {
> +        s->replay_wfd = f;
> +    }
> +    ovs_mutex_unlock(&replay_mutex);
> +}
> +
> +void
> +stream_replay_write(struct stream *s, const void *buffer, int n, bool is_read)
> +{
> +    int state = stream_replay_get_state();
> +
> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
> +        return;
> +    }
> +    if (replay_write(s->replay_wfd, buffer, n, is_read)) {
> +        VLOG_ERR("%s: failed to write buffer.", s->name);
> +    }
> +}
> +
> +void
> +stream_replay_close_wfd(struct stream *s)
> +{
> +    if (s->replay_wfd) {
> +        fclose(s->replay_wfd);
> +    }
> +}
> +
> +static int
> +replay_open(const char *name, char *suffix OVS_UNUSED, struct stream **streamp,
> +          uint8_t dscp OVS_UNUSED)
> +{
> +    int retval;
> +
> +    ovs_mutex_lock(&replay_mutex);
> +    retval = new_replay_stream(xstrdup(name), streamp);
> +    ovs_mutex_unlock(&replay_mutex);
> +
> +    return retval;
> +}
> +
> +static void
> +replay_close(struct stream *stream)
> +{
> +    struct stream_replay *s = stream_replay_cast(stream);
> +    fclose(s->f);
> +    free(s);
> +}
> +
> +static ssize_t
> +replay_recv(struct stream *stream, void *buffer, size_t n)
> +{
> +    struct stream_replay *s = stream_replay_cast(stream);
> +    int norm_seqno = normalized_seqno(s->seqno);
> +    int error, len;
> +
> +    ovs_mutex_lock(&replay_mutex);
> +    ovs_assert(norm_seqno >= replay_seqno);
> +
> +    if (norm_seqno != replay_seqno || !seqno_is_read(s->seqno)) {
> +        error = EAGAIN;
> +        goto unlock;
> +    }
> +
> +    error = replay_read(s->f, buffer, n, &len, &s->seqno, true);
> +    if (error) {
> +        VLOG_ERR("%s: failed to read from replay file.", stream->name);
> +        goto unlock;
> +    }
> +
> +unlock:
> +    ovs_mutex_unlock(&replay_mutex);
> +    return error ? -error : len;
> +}
> +
> +static ssize_t
> +replay_send(struct stream *stream OVS_UNUSED, const void *buffer OVS_UNUSED,
> +            size_t n)
> +{
> +    struct stream_replay *s = stream_replay_cast(stream);
> +    int norm_seqno = normalized_seqno(s->seqno);
> +    int error, len;
> +
> +    ovs_mutex_lock(&replay_mutex);
> +    ovs_assert(norm_seqno >= replay_seqno);
> +
> +    if (norm_seqno != replay_seqno || seqno_is_read(s->seqno)) {
> +        error = EAGAIN;
> +        goto unlock;
> +    }
> +
> +    error = replay_read(s->f, NULL, 0, &len, &s->seqno, false);
> +    if (error) {
> +        VLOG_ERR("%s: failed to read from replay file.", stream->name);
> +        goto unlock;
> +    }
> +    ovs_assert(len < 0 || len <= n);
> +
> +unlock:
> +    ovs_mutex_unlock(&replay_mutex);
> +    return error ? -error : len;
> +}
> +
> +static void
> +replay_wait(struct stream *stream, enum stream_wait_type wait)
> +{
> +    struct stream_replay *s = stream_replay_cast(stream);
> +    switch (wait) {
> +    case STREAM_CONNECT:
> +        /* Connect does nothing and always avaialable. */
> +        poll_immediate_wake();
> +        break;
> +
> +    case STREAM_SEND:
> +        if (s->seqno != INT_MAX && !seqno_is_read(s->seqno)) {
> +            /* Stream waits for write. */
> +            poll_immediate_wake();
> +        }
> +        break;
> +
> +    case STREAM_RECV:
> +        if (s->seqno != INT_MAX && seqno_is_read(s->seqno)) {
> +            /* We still have something to read. */
> +            poll_immediate_wake();
> +        }
> +        break;
> +
> +    default:
> +        OVS_NOT_REACHED();
> +    }
> +}
> +
> +const struct stream_class replay_stream_class = {
> +    "replay",                   /* name */
> +    false,                      /* needs_probes */
> +    replay_open,                /* open */
> +    replay_close,               /* close */
> +    NULL,                       /* connect */
> +    replay_recv,                /* recv */
> +    replay_send,                /* send */
> +    NULL,                       /* run */
> +    NULL,                       /* run_wait */
> +    replay_wait,                /* wait */
> +};
> +
> +/* Passive file descriptor stream. */
> +
> +struct replay_pstream
> +{
> +    struct pstream pstream;
> +    FILE *f;
> +    int seqno;
> +};
> +
> +const struct pstream_class preplay_pstream_class;
> +
> +static struct replay_pstream *
> +replay_pstream_cast(struct pstream *pstream)
> +{
> +    pstream_assert_class(pstream, &preplay_pstream_class);
> +    return CONTAINER_OF(pstream, struct replay_pstream, pstream);
> +}
> +
> +/* Creates a new pstream named 'name' that will accept new replay connections
> + * reading them from the replay file and stores a pointer to the stream in
> + * '*pstreamp'.
> + *
> + * Takes ownership of 'name'.
> + *
> + * Returns 0 if successful, otherwise a positive errno value. */
> +static int
> +preplay_listen(const char *name, char *suffix OVS_UNUSED,
> +               struct pstream **pstreamp, uint8_t dscp OVS_UNUSED)
> +{
> +    int seqno = 0, error;
> +    FILE *f;
> +
> +    ovs_mutex_lock(&replay_mutex);
> +    error = replay_file_open(name, &f, &seqno);
> +    ovs_mutex_unlock(&replay_mutex);
> +    if (error) {
> +        VLOG_ERR("%s: failed to open pstream.", name);
> +        return error;
> +    }
> +
> +    struct replay_pstream *ps = xmalloc(sizeof *ps);
> +    pstream_init(&ps->pstream, &preplay_pstream_class, xstrdup(name));
> +    ps->f = f;
> +    ps->seqno = seqno;
> +    *pstreamp = &ps->pstream;
> +    return 0;
> +}
> +
> +void
> +pstream_replay_open_wfd(struct pstream *ps)
> +{
> +    FILE *f;
> +    int state = stream_replay_get_state();
> +
> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
> +        return;
> +    }
> +
> +    ovs_mutex_lock(&replay_mutex);
> +    if (!replay_file_open(ps->name, &f, NULL)) {
> +        ps->replay_wfd = f;
> +    }
> +    ovs_mutex_unlock(&replay_mutex);
> +}
> +
> +
> +void
> +pstream_replay_write_accept(struct pstream *ps, const struct stream *s)
> +{
> +    int state = stream_replay_get_state();
> +    int len;
> +
> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
> +        return;
> +    }
> +
> +    len = strlen(s->name);
> +    if (replay_write(ps->replay_wfd, s->name, len, true)) {
> +        VLOG_ERR("%s: failed to write accept name: %s", ps->name, s->name);
> +    }
> +}
> +
> +void
> +pstream_replay_close_wfd(struct pstream *ps)
> +{
> +    if (ps->replay_wfd) {
> +        fclose(ps->replay_wfd);
> +    }
> +}
> +
> +
> +static void
> +preplay_close(struct pstream *pstream)
> +{
> +    struct replay_pstream *ps = replay_pstream_cast(pstream);
> +
> +    fclose(ps->f);
> +    free(ps);
> +}
> +
> +#define MAX_NAME_LEN 65536
> +
> +static int
> +preplay_accept(struct pstream *pstream, struct stream **new_streamp)
> +{
> +    struct replay_pstream *ps = replay_pstream_cast(pstream);
> +    int norm_seqno = normalized_seqno(ps->seqno);
> +    int retval, len;
> +    char name[MAX_NAME_LEN];
> +
> +    ovs_mutex_lock(&replay_mutex);
> +    ovs_assert(norm_seqno >= replay_seqno);
> +
> +    if (norm_seqno != replay_seqno || !seqno_is_read(ps->seqno)) {
> +        retval = EAGAIN;
> +        goto unlock;
> +    }
> +
> +    retval = replay_read(ps->f, name, MAX_NAME_LEN - 1,
> +                         &len, &ps->seqno, true);
> +    if (retval) {
> +        VLOG_ERR("%s: failed to read from replay file.", pstream->name);
> +        goto unlock;
> +    }
> +
> +    if (len > 0) {
> +        name[len] = 0;
> +        retval = new_replay_stream(xstrdup(name), new_streamp);
> +    } else {
> +        retval = len;
> +    }
> +unlock:
> +    ovs_mutex_unlock(&replay_mutex);
> +    return retval;
> +}
> +
> +static void
> +preplay_wait(struct pstream *pstream)
> +{
> +    struct replay_pstream *ps = replay_pstream_cast(pstream);
> +
> +    if (ps->seqno != INT_MAX) {
> +        /* Replay always has somthing to say. */
> +        poll_immediate_wake();
> +    }
> +}
> +
> +const struct pstream_class preplay_pstream_class = {
> +    "preplay",
> +    false,
> +    preplay_listen,
> +    preplay_close,
> +    preplay_accept,
> +    preplay_wait,
> +};
> diff --git a/lib/stream.c b/lib/stream.c
> index e246b3773..2ee392462 100644
> --- a/lib/stream.c
> +++ b/lib/stream.c
> @@ -117,7 +117,7 @@ check_stream_classes(void)
>   * connection methods supported by the stream. */
>  void
>  stream_usage(const char *name, bool active, bool passive,
> -             bool bootstrap OVS_UNUSED)
> +             bool bootstrap OVS_UNUSED, bool replay)

This is a non backwards compatible change that will break OVN
compilation. Is there a way we can avoid that? I guess if this series
gets applied, a follow up OVN patch is needed immediately to fix the
stream_usage() callers.

>  {
>      /* Really this should be implemented via callbacks into the stream
>       * providers, but that seems too heavy-weight to bother with at the
> @@ -161,6 +161,11 @@ stream_usage(const char *name, bool active, bool passive,
>             "  --ssl-protocols=PROTOS  list of SSL protocols to enable\n"
>             "  --ssl-ciphers=CIPHERS   list of SSL ciphers to enable\n");
>  #endif
> +    if (replay) {
> +        printf("Replay options:\n"
> +               "  --stream-replay-record turn on writing replay files\n"
> +               "  --stream-replay        run connections from replay files\n");
> +    }
>  }
>  
>  /* Given 'name', a stream name in the form "TYPE:ARGS", stores the class
> @@ -185,6 +190,9 @@ stream_lookup_class(const char *name, const struct stream_class **classp)
>          if (strlen(class->name) == prefix_len
>              && !memcmp(class->name, name, prefix_len)) {
>              *classp = class;
> +            if (stream_replay_get_state() == STREAM_REPLAY_READ) {
> +                *classp = &replay_stream_class;
> +            }
>              return 0;
>          }
>      }
> @@ -295,6 +303,7 @@ stream_close(struct stream *stream)
>      if (stream != NULL) {
>          char *name = stream->name;
>          char *peer_id = stream->peer_id;
> +        stream_replay_close_wfd(stream);
>          (stream->class->close)(stream);
>          free(name);
>          free(peer_id);
> @@ -367,9 +376,13 @@ int
>  stream_recv(struct stream *stream, void *buffer, size_t n)
>  {
>      int retval = stream_connect(stream);
> -    return (retval ? -retval
> -            : n == 0 ? 0
> -            : (stream->class->recv)(stream, buffer, n));
> +
> +    retval = retval ? -retval
> +             : n == 0 ? 0
> +             : (stream->class->recv)(stream, buffer, n);
> +
> +    stream_replay_write(stream, buffer, retval, true);
> +    return retval;
>  }
>  
>  /* Tries to send up to 'n' bytes of 'buffer' on 'stream', and returns:
> @@ -385,9 +398,12 @@ int
>  stream_send(struct stream *stream, const void *buffer, size_t n)
>  {
>      int retval = stream_connect(stream);
> -    return (retval ? -retval
> -            : n == 0 ? 0
> -            : (stream->class->send)(stream, buffer, n));
> +    retval = retval ? -retval
> +             : n == 0 ? 0
> +             : (stream->class->send)(stream, buffer, n);
> +
> +    stream_replay_write(stream, buffer, retval, false);
> +    return retval;
>  }
>  
>  /* Allows 'stream' to perform maintenance activities, such as flushing
> @@ -483,6 +499,9 @@ pstream_lookup_class(const char *name, const struct pstream_class **classp)
>          if (strlen(class->name) == prefix_len
>              && !memcmp(class->name, name, prefix_len)) {
>              *classp = class;
> +            if (stream_replay_get_state() == STREAM_REPLAY_READ) {
> +                *classp = &preplay_pstream_class;
> +            }
>              return 0;
>          }
>      }
> @@ -548,6 +567,8 @@ pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp)
>          goto error;
>      }
>  
> +    pstream_replay_open_wfd(pstream);
> +

If we fail listening on pstream (e.g.,
"ovsdb_jsonrpc_server|ERR|ptcp:6641:172.17.1.78: listen failed: Cannot
assign requested address") we won't reach this point. Later on, when
trying to replay the recorded execution we'll get an error like:

stream_replay|ERR|/var/run/openvswitch/replay_ptcp_83: fopen failed: No
such file or directory
stream_replay|ERR|ptcp:6641:172.17.1.78: failed to open pstream.

I was a bit confused at first when trying the replay and I was wondering
if I was using the functionality properly. I wonder if we can improve
this somehow, and if it is possible to maybe record that listen failed.

>      /* Success. */
>      *pstreamp = pstream;
>      return 0;
> @@ -571,6 +592,7 @@ pstream_close(struct pstream *pstream)
>  {
>      if (pstream != NULL) {
>          char *name = pstream->name;
> +        pstream_replay_close_wfd(pstream);
>          (pstream->class->close)(pstream);
>          free(name);
>      }
> @@ -591,6 +613,8 @@ pstream_accept(struct pstream *pstream, struct stream **new_stream)
>      } else {
>          ovs_assert((*new_stream)->state != SCS_CONNECTING
>                     || (*new_stream)->class->connect);
> +        pstream_replay_write_accept(pstream, *new_stream);
> +        stream_replay_open_wfd(*new_stream);
>      }
>      return retval;
>  }
> diff --git a/lib/stream.h b/lib/stream.h
> index 77bffa498..40357137e 100644
> --- a/lib/stream.h
> +++ b/lib/stream.h
> @@ -29,7 +29,8 @@ struct pstream;
>  struct stream;
>  struct vlog_module;
>  
> -void stream_usage(const char *name, bool active, bool passive, bool bootstrap);
> +void stream_usage(const char *name, bool active, bool passive,
> +                  bool bootstrap, bool replay);
>  
>  /* Bidirectional byte streams. */
>  int stream_verify_name(const char *name);
> @@ -94,4 +95,41 @@ enum stream_content_type {
>  void stream_report_content(const void *, ssize_t, enum stream_content_type,
>                             struct vlog_module *, const char *stream_name);
>  
> +
> +/* Replay state. */
> +enum stream_replay_state {
> +    STREAM_REPLAY_NONE,
> +    STREAM_REPLAY_WRITE,
> +    STREAM_REPLAY_READ,
> +};
> +
> +void stream_replay_set_state(enum stream_replay_state);
> +enum stream_replay_state stream_replay_get_state(void);
> +void stream_replay_open_wfd(struct stream *);
> +void pstream_replay_open_wfd(struct pstream *);
> +void stream_replay_close_wfd(struct stream *);
> +void pstream_replay_close_wfd(struct pstream *);
> +void stream_replay_write(struct stream *, const void *, int, bool is_read);
> +void pstream_replay_write_accept(struct pstream *, const struct stream *);
> +
> +#define STREAM_REPLAY_OPTION_ENUMS  \
> +        OPT_STREAM_REPLAY_REC,      \
> +        OPT_STREAM_REPLAY
> +
> +#define STREAM_REPLAY_LONG_OPTIONS                                           \
> +        {"stream-replay-record", no_argument, NULL, OPT_STREAM_REPLAY_REC},  \
> +        {"stream-replay",        no_argument, NULL, OPT_STREAM_REPLAY}
> +
> +#define STREAM_REPLAY_OPTION_HANDLERS                       \
> +        case OPT_STREAM_REPLAY_REC:                         \
> +            stream_replay_set_state(STREAM_REPLAY_WRITE);   \

I'm not sure exactly where but I think it might be useful to store the
full command line that was use to start the process when recording was
requested.

Maybe also automatically create a snapshot of the DB file in case the
user forgets to do that manually.

It's not always easy to figure out what the command line is when
ovsdb-server is started from a script, e.g., ovn-ctl starting
ovsdb-server for NB/SB.

Thanks,
Dumitru

> +            break;                                          \
> +                                                            \
> +        case OPT_STREAM_REPLAY:                             \
> +            stream_replay_set_state(STREAM_REPLAY_READ);    \
> +            break;
> +
> +#define STREAM_REPLAY_CASES \
> +        case OPT_STREAM_REPLAY_REC: case OPT_STREAM_REPLAY:
> +
>  #endif /* stream.h */
> diff --git a/ovsdb/ovsdb-client.c b/ovsdb/ovsdb-client.c
> index 72756eb1f..1925f5213 100644
> --- a/ovsdb/ovsdb-client.c
> +++ b/ovsdb/ovsdb-client.c
> @@ -462,7 +462,7 @@ usage(void)
>             "\nThe default SERVER is unix:%s/db.sock.\n"
>             "The default DATABASE is Open_vSwitch.\n",
>             program_name, program_name, ovs_rundir());
> -    stream_usage("SERVER", true, true, true);
> +    stream_usage("SERVER", true, true, true, false);
>      table_usage();
>      printf("  --timestamp                 timestamp \"monitor\" output");
>      daemon_usage();
> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
> index 42ba0053b..3af09c8c1 100644
> --- a/ovsdb/ovsdb-server.c
> +++ b/ovsdb/ovsdb-server.c
> @@ -1868,7 +1868,7 @@ usage(void)
>             program_name, program_name, ovs_dbdir());
>      printf("\nJSON-RPC options (may be specified any number of times):\n"
>             "  --remote=REMOTE         connect or listen to REMOTE\n");
> -    stream_usage("JSON-RPC", true, true, true);
> +    stream_usage("JSON-RPC", true, true, true, false);
>      daemon_usage();
>      vlog_usage();
>      replication_usage();
> diff --git a/tests/test-jsonrpc.c b/tests/test-jsonrpc.c
> index 04e941b14..fa9c90b95 100644
> --- a/tests/test-jsonrpc.c
> +++ b/tests/test-jsonrpc.c
> @@ -109,7 +109,7 @@ usage(void)
>             "  request REMOTE METHOD PARAMS   send request, print reply\n"
>             "  notify REMOTE METHOD PARAMS  send notification and exit\n",
>             program_name, program_name);
> -    stream_usage("JSON-RPC", true, true, true);
> +    stream_usage("JSON-RPC", true, true, true, false);
>      daemon_usage();
>      vlog_usage();
>      printf("\nOther options:\n"
> diff --git a/utilities/ovs-vsctl.c b/utilities/ovs-vsctl.c
> index 37cc72d40..e7dc50ed5 100644
> --- a/utilities/ovs-vsctl.c
> +++ b/utilities/ovs-vsctl.c
> @@ -458,7 +458,7 @@ Options:\n\
>      vlog_usage();
>      printf("\
>    --no-syslog             equivalent to --verbose=vsctl:syslog:warn\n");
> -    stream_usage("database", true, true, true);
> +    stream_usage("database", true, true, true, false);
>      printf("\n\
>  Other options:\n\
>    -h, --help                  display this help message\n\
> diff --git a/vswitchd/ovs-vswitchd.c b/vswitchd/ovs-vswitchd.c
> index 1e72b628b..e61a1c3b7 100644
> --- a/vswitchd/ovs-vswitchd.c
> +++ b/vswitchd/ovs-vswitchd.c
> @@ -276,7 +276,7 @@ usage(void)
>             "where DATABASE is a socket on which ovsdb-server is listening\n"
>             "      (default: \"unix:%s/db.sock\").\n",
>             program_name, program_name, ovs_rundir());
> -    stream_usage("DATABASE", true, false, true);
> +    stream_usage("DATABASE", true, false, true, false);
>      daemon_usage();
>      vlog_usage();
>      printf("\nDPDK options:\n"
> diff --git a/vtep/vtep-ctl.c b/vtep/vtep-ctl.c
> index ab552457d..09d407ac9 100644
> --- a/vtep/vtep-ctl.c
> +++ b/vtep/vtep-ctl.c
> @@ -373,7 +373,7 @@ Options:\n\
>      vlog_usage();
>      printf("\
>    --no-syslog                 equivalent to --verbose=vtep_ctl:syslog:warn\n");
> -    stream_usage("database", true, true, false);
> +    stream_usage("database", true, true, false, false);
>      printf("\n\
>  Other options:\n\
>    -h, --help                  display this help message\n\
>
Ilya Maximets July 16, 2020, 5:44 p.m. UTC | #2
On 7/14/20 3:35 PM, Dumitru Ceara wrote:
> On 6/30/20 3:24 AM, Ilya Maximets wrote:
>> For debugging purposes it is useful to be able to record all the
>> incoming transactions and commands and replay them locally under
>> debugger or with additional logging enabled.  This patch introduces
>> ability to record all the incoming stream data and replay it via new
>> stream provider named 'stream-replay'.  During the record phase all
>> the incoming stream data written to special replay_* files in the
>> application rundir.  On replay phase instead of opening real streams
>> application will open replay_* files and read all the incoming data
>> directly from them.
>>
>> If enabled for ovsdb-server, for example, this allows to record all
>> the connections and transactions from the big setup and replay them
>> locally afterwards to debug the behaviour or tests performance.
>>
>> To start application in recording mode there is a --stream-replay-record
>> cmdline option. --stream-replay is to replay previously recorded
>> streams.  If application depends on generation of UUIDs, it's likely
>> required to pass --predictable-uuids-with-seed=XXX cmdline option in
>> both runs in order to have same UUIDs generated.
>>
>> Current version doesn't work well with time-based stream events like
>> inactivity probes or any other events generated internally.  This is
>> a point for further improvement.
>>
>> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
>> ---
>>  lib/automake.mk         |   1 +
>>  lib/stream-provider.h   |   4 +
>>  lib/stream-replay.c     | 561 ++++++++++++++++++++++++++++++++++++++++
>>  lib/stream.c            |  38 ++-
>>  lib/stream.h            |  40 ++-
>>  ovsdb/ovsdb-client.c    |   2 +-
>>  ovsdb/ovsdb-server.c    |   2 +-
>>  tests/test-jsonrpc.c    |   2 +-
>>  utilities/ovs-vsctl.c   |   2 +-
>>  vswitchd/ovs-vswitchd.c |   2 +-
>>  vtep/vtep-ctl.c         |   2 +-
>>  11 files changed, 642 insertions(+), 14 deletions(-)
>>  create mode 100644 lib/stream-replay.c
>>
>> diff --git a/lib/automake.mk b/lib/automake.mk
>> index 86940ccd2..bbe7d04f1 100644
>> --- a/lib/automake.mk
>> +++ b/lib/automake.mk
>> @@ -281,6 +281,7 @@ lib_libopenvswitch_la_SOURCES = \
>>  	lib/stream-fd.c \
>>  	lib/stream-fd.h \
>>  	lib/stream-provider.h \
>> +	lib/stream-replay.c \
>>  	lib/stream-ssl.h \
>>  	lib/stream-tcp.c \
>>  	lib/stream.c \
>> diff --git a/lib/stream-provider.h b/lib/stream-provider.h
>> index 75f4f059b..0ce4f6f4c 100644
>> --- a/lib/stream-provider.h
>> +++ b/lib/stream-provider.h
>> @@ -29,6 +29,7 @@ struct stream {
>>      const struct stream_class *class;
>>      int state;
>>      int error;
>> +    FILE *replay_wfd;
>>      char *name;
>>      char *peer_id;
>>  };
>> @@ -133,6 +134,7 @@ struct pstream {
>>      const struct pstream_class *class;
>>      char *name;
>>      ovs_be16 bound_port;
>> +    FILE *replay_wfd;
>>  };
>>  
>>  void pstream_init(struct pstream *, const struct pstream_class *, char *name);
>> @@ -200,5 +202,7 @@ extern const struct pstream_class pwindows_pstream_class;
>>  extern const struct stream_class ssl_stream_class;
>>  extern const struct pstream_class pssl_pstream_class;
>>  #endif
>> +extern const struct stream_class replay_stream_class;
>> +extern const struct pstream_class preplay_pstream_class;
>>  
>>  #endif /* stream-provider.h */
>> diff --git a/lib/stream-replay.c b/lib/stream-replay.c
>> new file mode 100644
>> index 000000000..16486f94e
>> --- /dev/null
>> +++ b/lib/stream-replay.c
>> @@ -0,0 +1,561 @@
>> +/*
>> + * Copyright (c) 2020, Red Hat, Inc.
>> + *
>> + * Licensed under the Apache License, Version 2.0 (the "License");
>> + * you may not use this file except in compliance with the License.
>> + * You may obtain a copy of the License at:
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +#include <config.h>
>> +#include <ctype.h>
>> +#include <errno.h>
>> +#include <poll.h>
>> +#include <stdlib.h>
>> +#include <string.h>
>> +#include <sys/socket.h>
>> +#include <sys/types.h>
>> +#include <unistd.h>
>> +#include "dirs.h"
>> +#include "ovs-atomic.h"
>> +#include "util.h"
>> +#include "stream-provider.h"
>> +#include "stream.h"
>> +#include "openvswitch/poll-loop.h"
>> +#include "openvswitch/vlog.h"
>> +
>> +VLOG_DEFINE_THIS_MODULE(stream_replay);
>> +
>> +static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
>> +
>> +/* Stream replay. */
>> +
>> +static struct ovs_mutex replay_mutex = OVS_MUTEX_INITIALIZER;
>> +static int replay_seqno OVS_GUARDED_BY(replay_mutex) = 0;
>> +static atomic_int replay_state = ATOMIC_VAR_INIT(STREAM_REPLAY_NONE);
>> +
>> +void
>> +stream_replay_set_state(enum stream_replay_state state)
>> +{
>> +    atomic_store_relaxed(&replay_state, state);
>> +}
>> +
>> +enum stream_replay_state
>> +stream_replay_get_state(void)
>> +{
>> +    int state;
>> +
>> +    atomic_read_relaxed(&replay_state, &state);
>> +    return state;
>> +}
>> +
>> +static char *
>> +replay_file_name(const char *name, int seqno)
>> +{
>> +    char *local_name = xstrdup(name);
>> +    char *filename, *p, *c;
>> +    bool skip = false;
>> +
>> +    /* Replace all the numbers and special symbols with single underscore.
>> +     * Numbers might be PIDs or port numbers that could change between record
>> +     * and replay phases, special symbols might be not good as a filename.
>> +     * We have a unique seuqence number as part of the name, so we don't care
>> +     * keeping too much information. */
>> +    for (c = p = local_name; *p; p++) {
>> +         if (!isalpha((unsigned char) *p)) {
>> +             if (!skip) {
>> +                *c++ = '_';
>> +                skip = true;
>> +             }
>> +         } else {
>> +             *c++ = *p;
>> +             skip = false;
>> +         }
>> +    }
>> +    if (skip) {
>> +        c--;
>> +    }
>> +    *c = '\0';
>> +    filename = xasprintf("replay_%s_%d", local_name, seqno);
>> +    VLOG_DBG("Constructing replay filename: '%s' --> '%s' --> '%s'.",
>> +             name, local_name, filename);
>> +    free(local_name);
>> +
>> +    return filename;
>> +}
>> +
>> +/* In write mode creates a new replay file to write stream replay.
>> + * In read mode opens an existing replay file. */
>> +static int
>> +replay_file_open(const char *name, FILE **f, int *seqno)
>> +    OVS_REQUIRES(replay_mutex)
>> +{
>> +    char *file_path, *filename;
>> +    int state = stream_replay_get_state();
>> +
>> +    ovs_assert(state != STREAM_REPLAY_NONE);
>> +
>> +    filename = replay_file_name(name, replay_seqno);
>> +    file_path = abs_file_name(ovs_rundir(), filename);
>> +    free(filename);
>> +
>> +    *f = fopen(file_path, state == STREAM_REPLAY_WRITE ? "wb" : "rb");
>> +    if (!*f) {
>> +        VLOG_ERR("%s: fopen failed: %s", file_path, ovs_strerror(errno));
>> +        free(file_path);
>> +        return errno;
>> +    }
>> +    free(file_path);
>> +
>> +    if (state == STREAM_REPLAY_READ
>> +        && fread(seqno, sizeof *seqno, 1, *f) != 1) {
>> +        VLOG_INFO("%s: failed to read seqno: stream might be empty.", name);
>> +        *seqno = INT_MAX;
>> +    }
>> +    replay_seqno++;  /* New file opened. */
>> +    return 0;
>> +}
>> +
>> +static int
>> +replay_write(FILE *f, const void *buffer, int n, bool is_read)
>> +{
>> +    int state = stream_replay_get_state();
>> +    int seqno_to_write;
>> +    int retval = 0;
>> +
>> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
>> +        return 0;
>> +    }
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +
>> +    seqno_to_write = is_read ? replay_seqno : -replay_seqno;
>> +    if (fwrite(&seqno_to_write, sizeof seqno_to_write, 1, f) != 1) {
>> +        VLOG_ERR_RL(&rl, "Failed to write seqno.");
>> +        retval = -1;
>> +        goto out;
>> +    }
>> +    if (fwrite(&n, sizeof n, 1, f) != 1) {
>> +        VLOG_ERR_RL(&rl, "Failed to write length.");
>> +        retval = -1;
>> +        goto out;
>> +    }
>> +    if (n > 0 && is_read && fwrite(buffer, 1, n, f) != n) {
>> +        VLOG_ERR_RL(&rl, "Failed to write data.");
>> +        retval = -1;
>> +    }
>> +out:
>> +    replay_seqno++; /* Write completed. */
>> +    ovs_mutex_unlock(&replay_mutex);
>> +    return retval;
>> +}
>> +
>> +static int
>> +replay_read(FILE *f, void *buffer, int buffer_size,
>> +            int *len, int *seqno, bool is_read)
>> +    OVS_REQUIRES(replay_mutex)
>> +{
>> +    int retval = EINVAL;
>> +
>> +    if (fread(len, sizeof *len, 1, f) != 1
>> +        || (is_read && *len > buffer_size)) {
>> +        VLOG_ERR("Failed to read replay length.");
>> +        goto out;
>> +    }
>> +
>> +    if (*len > 0 && is_read && fread(buffer, 1, *len, f) != *len) {
>> +        VLOG_ERR("Failed to read replay buffer.");
>> +        goto out;
>> +    }
>> +
>> +    if (fread(seqno, sizeof *seqno, 1, f) != 1) {
>> +        *seqno = INT_MAX;  /* Most likely EOF. */
>> +        if (ferror(f)) {
>> +            VLOG_INFO("Failed to read replay seqno.");
>> +            goto out;
>> +        }
>> +    }
>> +
>> +    retval = 0;
>> +out:
>> +    replay_seqno++;  /* Read completed. */
>> +    return retval;
>> +}
>> +
>> +
>> +/* Active replay stream. */
>> +
>> +struct stream_replay
>> +{
>> +    struct stream stream;
>> +    FILE *f;
>> +    int seqno;
>> +};
>> +
>> +const struct stream_class replay_stream_class;
>> +
>> +static inline bool
>> +seqno_is_read(int seqno)
>> +{
>> +    return seqno >= 0;
>> +}
>> +
>> +static inline int
>> +normalized_seqno(int seqno)
>> +{
>> +    return seqno >= 0 ? seqno : -seqno;
>> +}
>> +
>> +
>> +/* Creates a new stream named 'name' that will emulate sending and receiving
>> + * data using replay file and stores a pointer to the stream in '*streamp'.
>> + *
>> + * Takes ownership of 'name'.
>> + *
>> + * Returns 0 if successful, otherwise a positive errno value. */
>> +static int
>> +new_replay_stream(char *name, struct stream **streamp)
>> +    OVS_REQUIRES(replay_mutex)
>> +{
>> +    struct stream_replay *s;
>> +    int seqno = 0, error;
>> +    FILE *f;
>> +
>> +    error = replay_file_open(name, &f, &seqno);
>> +    if (error) {
>> +        VLOG_ERR("%s: failed to open stream.", name);
>> +        return error;
>> +    }
>> +
>> +    s = xmalloc(sizeof *s);
>> +    stream_init(&s->stream, &replay_stream_class, 0, name);
>> +    s->f = f;
>> +    s->seqno = seqno;
>> +    *streamp = &s->stream;
>> +    return 0;
>> +}
>> +
>> +static struct stream_replay *
>> +stream_replay_cast(struct stream *stream)
>> +{
>> +    stream_assert_class(stream, &replay_stream_class);
>> +    return CONTAINER_OF(stream, struct stream_replay, stream);
>> +}
>> +
>> +void
>> +stream_replay_open_wfd(struct stream *s)
>> +{
>> +    FILE *f;
>> +    int state = stream_replay_get_state();
>> +
>> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
>> +        return;
>> +    }
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    if (!replay_file_open(s->name, &f, NULL)) {
>> +        s->replay_wfd = f;
>> +    }
>> +    ovs_mutex_unlock(&replay_mutex);
>> +}
>> +
>> +void
>> +stream_replay_write(struct stream *s, const void *buffer, int n, bool is_read)
>> +{
>> +    int state = stream_replay_get_state();
>> +
>> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
>> +        return;
>> +    }
>> +    if (replay_write(s->replay_wfd, buffer, n, is_read)) {
>> +        VLOG_ERR("%s: failed to write buffer.", s->name);
>> +    }
>> +}
>> +
>> +void
>> +stream_replay_close_wfd(struct stream *s)
>> +{
>> +    if (s->replay_wfd) {
>> +        fclose(s->replay_wfd);
>> +    }
>> +}
>> +
>> +static int
>> +replay_open(const char *name, char *suffix OVS_UNUSED, struct stream **streamp,
>> +          uint8_t dscp OVS_UNUSED)
>> +{
>> +    int retval;
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    retval = new_replay_stream(xstrdup(name), streamp);
>> +    ovs_mutex_unlock(&replay_mutex);
>> +
>> +    return retval;
>> +}
>> +
>> +static void
>> +replay_close(struct stream *stream)
>> +{
>> +    struct stream_replay *s = stream_replay_cast(stream);
>> +    fclose(s->f);
>> +    free(s);
>> +}
>> +
>> +static ssize_t
>> +replay_recv(struct stream *stream, void *buffer, size_t n)
>> +{
>> +    struct stream_replay *s = stream_replay_cast(stream);
>> +    int norm_seqno = normalized_seqno(s->seqno);
>> +    int error, len;
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    ovs_assert(norm_seqno >= replay_seqno);
>> +
>> +    if (norm_seqno != replay_seqno || !seqno_is_read(s->seqno)) {
>> +        error = EAGAIN;
>> +        goto unlock;
>> +    }
>> +
>> +    error = replay_read(s->f, buffer, n, &len, &s->seqno, true);
>> +    if (error) {
>> +        VLOG_ERR("%s: failed to read from replay file.", stream->name);
>> +        goto unlock;
>> +    }
>> +
>> +unlock:
>> +    ovs_mutex_unlock(&replay_mutex);
>> +    return error ? -error : len;
>> +}
>> +
>> +static ssize_t
>> +replay_send(struct stream *stream OVS_UNUSED, const void *buffer OVS_UNUSED,
>> +            size_t n)
>> +{
>> +    struct stream_replay *s = stream_replay_cast(stream);
>> +    int norm_seqno = normalized_seqno(s->seqno);
>> +    int error, len;
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    ovs_assert(norm_seqno >= replay_seqno);
>> +
>> +    if (norm_seqno != replay_seqno || seqno_is_read(s->seqno)) {
>> +        error = EAGAIN;
>> +        goto unlock;
>> +    }
>> +
>> +    error = replay_read(s->f, NULL, 0, &len, &s->seqno, false);
>> +    if (error) {
>> +        VLOG_ERR("%s: failed to read from replay file.", stream->name);
>> +        goto unlock;
>> +    }
>> +    ovs_assert(len < 0 || len <= n);
>> +
>> +unlock:
>> +    ovs_mutex_unlock(&replay_mutex);
>> +    return error ? -error : len;
>> +}
>> +
>> +static void
>> +replay_wait(struct stream *stream, enum stream_wait_type wait)
>> +{
>> +    struct stream_replay *s = stream_replay_cast(stream);
>> +    switch (wait) {
>> +    case STREAM_CONNECT:
>> +        /* Connect does nothing and always avaialable. */
>> +        poll_immediate_wake();
>> +        break;
>> +
>> +    case STREAM_SEND:
>> +        if (s->seqno != INT_MAX && !seqno_is_read(s->seqno)) {
>> +            /* Stream waits for write. */
>> +            poll_immediate_wake();
>> +        }
>> +        break;
>> +
>> +    case STREAM_RECV:
>> +        if (s->seqno != INT_MAX && seqno_is_read(s->seqno)) {
>> +            /* We still have something to read. */
>> +            poll_immediate_wake();
>> +        }
>> +        break;
>> +
>> +    default:
>> +        OVS_NOT_REACHED();
>> +    }
>> +}
>> +
>> +const struct stream_class replay_stream_class = {
>> +    "replay",                   /* name */
>> +    false,                      /* needs_probes */
>> +    replay_open,                /* open */
>> +    replay_close,               /* close */
>> +    NULL,                       /* connect */
>> +    replay_recv,                /* recv */
>> +    replay_send,                /* send */
>> +    NULL,                       /* run */
>> +    NULL,                       /* run_wait */
>> +    replay_wait,                /* wait */
>> +};
>> +
>> +/* Passive file descriptor stream. */
>> +
>> +struct replay_pstream
>> +{
>> +    struct pstream pstream;
>> +    FILE *f;
>> +    int seqno;
>> +};
>> +
>> +const struct pstream_class preplay_pstream_class;
>> +
>> +static struct replay_pstream *
>> +replay_pstream_cast(struct pstream *pstream)
>> +{
>> +    pstream_assert_class(pstream, &preplay_pstream_class);
>> +    return CONTAINER_OF(pstream, struct replay_pstream, pstream);
>> +}
>> +
>> +/* Creates a new pstream named 'name' that will accept new replay connections
>> + * reading them from the replay file and stores a pointer to the stream in
>> + * '*pstreamp'.
>> + *
>> + * Takes ownership of 'name'.
>> + *
>> + * Returns 0 if successful, otherwise a positive errno value. */
>> +static int
>> +preplay_listen(const char *name, char *suffix OVS_UNUSED,
>> +               struct pstream **pstreamp, uint8_t dscp OVS_UNUSED)
>> +{
>> +    int seqno = 0, error;
>> +    FILE *f;
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    error = replay_file_open(name, &f, &seqno);
>> +    ovs_mutex_unlock(&replay_mutex);
>> +    if (error) {
>> +        VLOG_ERR("%s: failed to open pstream.", name);
>> +        return error;
>> +    }
>> +
>> +    struct replay_pstream *ps = xmalloc(sizeof *ps);
>> +    pstream_init(&ps->pstream, &preplay_pstream_class, xstrdup(name));
>> +    ps->f = f;
>> +    ps->seqno = seqno;
>> +    *pstreamp = &ps->pstream;
>> +    return 0;
>> +}
>> +
>> +void
>> +pstream_replay_open_wfd(struct pstream *ps)
>> +{
>> +    FILE *f;
>> +    int state = stream_replay_get_state();
>> +
>> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
>> +        return;
>> +    }
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    if (!replay_file_open(ps->name, &f, NULL)) {
>> +        ps->replay_wfd = f;
>> +    }
>> +    ovs_mutex_unlock(&replay_mutex);
>> +}
>> +
>> +
>> +void
>> +pstream_replay_write_accept(struct pstream *ps, const struct stream *s)
>> +{
>> +    int state = stream_replay_get_state();
>> +    int len;
>> +
>> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
>> +        return;
>> +    }
>> +
>> +    len = strlen(s->name);
>> +    if (replay_write(ps->replay_wfd, s->name, len, true)) {
>> +        VLOG_ERR("%s: failed to write accept name: %s", ps->name, s->name);
>> +    }
>> +}
>> +
>> +void
>> +pstream_replay_close_wfd(struct pstream *ps)
>> +{
>> +    if (ps->replay_wfd) {
>> +        fclose(ps->replay_wfd);
>> +    }
>> +}
>> +
>> +
>> +static void
>> +preplay_close(struct pstream *pstream)
>> +{
>> +    struct replay_pstream *ps = replay_pstream_cast(pstream);
>> +
>> +    fclose(ps->f);
>> +    free(ps);
>> +}
>> +
>> +#define MAX_NAME_LEN 65536
>> +
>> +static int
>> +preplay_accept(struct pstream *pstream, struct stream **new_streamp)
>> +{
>> +    struct replay_pstream *ps = replay_pstream_cast(pstream);
>> +    int norm_seqno = normalized_seqno(ps->seqno);
>> +    int retval, len;
>> +    char name[MAX_NAME_LEN];
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    ovs_assert(norm_seqno >= replay_seqno);
>> +
>> +    if (norm_seqno != replay_seqno || !seqno_is_read(ps->seqno)) {
>> +        retval = EAGAIN;
>> +        goto unlock;
>> +    }
>> +
>> +    retval = replay_read(ps->f, name, MAX_NAME_LEN - 1,
>> +                         &len, &ps->seqno, true);
>> +    if (retval) {
>> +        VLOG_ERR("%s: failed to read from replay file.", pstream->name);
>> +        goto unlock;
>> +    }
>> +
>> +    if (len > 0) {
>> +        name[len] = 0;
>> +        retval = new_replay_stream(xstrdup(name), new_streamp);
>> +    } else {
>> +        retval = len;
>> +    }
>> +unlock:
>> +    ovs_mutex_unlock(&replay_mutex);
>> +    return retval;
>> +}
>> +
>> +static void
>> +preplay_wait(struct pstream *pstream)
>> +{
>> +    struct replay_pstream *ps = replay_pstream_cast(pstream);
>> +
>> +    if (ps->seqno != INT_MAX) {
>> +        /* Replay always has somthing to say. */
>> +        poll_immediate_wake();
>> +    }
>> +}
>> +
>> +const struct pstream_class preplay_pstream_class = {
>> +    "preplay",
>> +    false,
>> +    preplay_listen,
>> +    preplay_close,
>> +    preplay_accept,
>> +    preplay_wait,
>> +};
>> diff --git a/lib/stream.c b/lib/stream.c
>> index e246b3773..2ee392462 100644
>> --- a/lib/stream.c
>> +++ b/lib/stream.c
>> @@ -117,7 +117,7 @@ check_stream_classes(void)
>>   * connection methods supported by the stream. */
>>  void
>>  stream_usage(const char *name, bool active, bool passive,
>> -             bool bootstrap OVS_UNUSED)
>> +             bool bootstrap OVS_UNUSED, bool replay)
> 
> This is a non backwards compatible change that will break OVN
> compilation. Is there a way we can avoid that? I guess if this series
> gets applied, a follow up OVN patch is needed immediately to fix the
> stream_usage() callers.

Yeah, I know.  One option to avoid that is to write a separate function
just for replay stuff.  Or rename this one and make 2 wrappers on top.
One with a new argument and one like it is now.

> 
>>  {
>>      /* Really this should be implemented via callbacks into the stream
>>       * providers, but that seems too heavy-weight to bother with at the
>> @@ -161,6 +161,11 @@ stream_usage(const char *name, bool active, bool passive,
>>             "  --ssl-protocols=PROTOS  list of SSL protocols to enable\n"
>>             "  --ssl-ciphers=CIPHERS   list of SSL ciphers to enable\n");
>>  #endif
>> +    if (replay) {
>> +        printf("Replay options:\n"
>> +               "  --stream-replay-record turn on writing replay files\n"
>> +               "  --stream-replay        run connections from replay files\n");
>> +    }
>>  }
>>  
>>  /* Given 'name', a stream name in the form "TYPE:ARGS", stores the class
>> @@ -185,6 +190,9 @@ stream_lookup_class(const char *name, const struct stream_class **classp)
>>          if (strlen(class->name) == prefix_len
>>              && !memcmp(class->name, name, prefix_len)) {
>>              *classp = class;
>> +            if (stream_replay_get_state() == STREAM_REPLAY_READ) {
>> +                *classp = &replay_stream_class;
>> +            }
>>              return 0;
>>          }
>>      }
>> @@ -295,6 +303,7 @@ stream_close(struct stream *stream)
>>      if (stream != NULL) {
>>          char *name = stream->name;
>>          char *peer_id = stream->peer_id;
>> +        stream_replay_close_wfd(stream);
>>          (stream->class->close)(stream);
>>          free(name);
>>          free(peer_id);
>> @@ -367,9 +376,13 @@ int
>>  stream_recv(struct stream *stream, void *buffer, size_t n)
>>  {
>>      int retval = stream_connect(stream);
>> -    return (retval ? -retval
>> -            : n == 0 ? 0
>> -            : (stream->class->recv)(stream, buffer, n));
>> +
>> +    retval = retval ? -retval
>> +             : n == 0 ? 0
>> +             : (stream->class->recv)(stream, buffer, n);
>> +
>> +    stream_replay_write(stream, buffer, retval, true);
>> +    return retval;
>>  }
>>  
>>  /* Tries to send up to 'n' bytes of 'buffer' on 'stream', and returns:
>> @@ -385,9 +398,12 @@ int
>>  stream_send(struct stream *stream, const void *buffer, size_t n)
>>  {
>>      int retval = stream_connect(stream);
>> -    return (retval ? -retval
>> -            : n == 0 ? 0
>> -            : (stream->class->send)(stream, buffer, n));
>> +    retval = retval ? -retval
>> +             : n == 0 ? 0
>> +             : (stream->class->send)(stream, buffer, n);
>> +
>> +    stream_replay_write(stream, buffer, retval, false);
>> +    return retval;
>>  }
>>  
>>  /* Allows 'stream' to perform maintenance activities, such as flushing
>> @@ -483,6 +499,9 @@ pstream_lookup_class(const char *name, const struct pstream_class **classp)
>>          if (strlen(class->name) == prefix_len
>>              && !memcmp(class->name, name, prefix_len)) {
>>              *classp = class;
>> +            if (stream_replay_get_state() == STREAM_REPLAY_READ) {
>> +                *classp = &preplay_pstream_class;
>> +            }
>>              return 0;
>>          }
>>      }
>> @@ -548,6 +567,8 @@ pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp)
>>          goto error;
>>      }
>>  
>> +    pstream_replay_open_wfd(pstream);
>> +
> 
> If we fail listening on pstream (e.g.,
> "ovsdb_jsonrpc_server|ERR|ptcp:6641:172.17.1.78: listen failed: Cannot
> assign requested address") we won't reach this point. Later on, when
> trying to replay the recorded execution we'll get an error like:
> 
> stream_replay|ERR|/var/run/openvswitch/replay_ptcp_83: fopen failed: No
> such file or directory
> stream_replay|ERR|ptcp:6641:172.17.1.78: failed to open pstream.
> 
> I was a bit confused at first when trying the replay and I was wondering
> if I was using the functionality properly. I wonder if we can improve
> this somehow, and if it is possible to maybe record that listen failed.

Hmm.  I never cathed a listen failure, but yes, this event should be
recorded and replayed.

> 
>>      /* Success. */
>>      *pstreamp = pstream;
>>      return 0;
>> @@ -571,6 +592,7 @@ pstream_close(struct pstream *pstream)
>>  {
>>      if (pstream != NULL) {
>>          char *name = pstream->name;
>> +        pstream_replay_close_wfd(pstream);
>>          (pstream->class->close)(pstream);
>>          free(name);
>>      }
>> @@ -591,6 +613,8 @@ pstream_accept(struct pstream *pstream, struct stream **new_stream)
>>      } else {
>>          ovs_assert((*new_stream)->state != SCS_CONNECTING
>>                     || (*new_stream)->class->connect);
>> +        pstream_replay_write_accept(pstream, *new_stream);
>> +        stream_replay_open_wfd(*new_stream);
>>      }
>>      return retval;
>>  }
>> diff --git a/lib/stream.h b/lib/stream.h
>> index 77bffa498..40357137e 100644
>> --- a/lib/stream.h
>> +++ b/lib/stream.h
>> @@ -29,7 +29,8 @@ struct pstream;
>>  struct stream;
>>  struct vlog_module;
>>  
>> -void stream_usage(const char *name, bool active, bool passive, bool bootstrap);
>> +void stream_usage(const char *name, bool active, bool passive,
>> +                  bool bootstrap, bool replay);
>>  
>>  /* Bidirectional byte streams. */
>>  int stream_verify_name(const char *name);
>> @@ -94,4 +95,41 @@ enum stream_content_type {
>>  void stream_report_content(const void *, ssize_t, enum stream_content_type,
>>                             struct vlog_module *, const char *stream_name);
>>  
>> +
>> +/* Replay state. */
>> +enum stream_replay_state {
>> +    STREAM_REPLAY_NONE,
>> +    STREAM_REPLAY_WRITE,
>> +    STREAM_REPLAY_READ,
>> +};
>> +
>> +void stream_replay_set_state(enum stream_replay_state);
>> +enum stream_replay_state stream_replay_get_state(void);
>> +void stream_replay_open_wfd(struct stream *);
>> +void pstream_replay_open_wfd(struct pstream *);
>> +void stream_replay_close_wfd(struct stream *);
>> +void pstream_replay_close_wfd(struct pstream *);
>> +void stream_replay_write(struct stream *, const void *, int, bool is_read);
>> +void pstream_replay_write_accept(struct pstream *, const struct stream *);
>> +
>> +#define STREAM_REPLAY_OPTION_ENUMS  \
>> +        OPT_STREAM_REPLAY_REC,      \
>> +        OPT_STREAM_REPLAY
>> +
>> +#define STREAM_REPLAY_LONG_OPTIONS                                           \
>> +        {"stream-replay-record", no_argument, NULL, OPT_STREAM_REPLAY_REC},  \
>> +        {"stream-replay",        no_argument, NULL, OPT_STREAM_REPLAY}
>> +
>> +#define STREAM_REPLAY_OPTION_HANDLERS                       \
>> +        case OPT_STREAM_REPLAY_REC:                         \
>> +            stream_replay_set_state(STREAM_REPLAY_WRITE);   \
> 
> I'm not sure exactly where but I think it might be useful to store the
> full command line that was use to start the process when recording was
> requested.
> 
> Maybe also automatically create a snapshot of the DB file in case the
> user forgets to do that manually.
> 
> It's not always easy to figure out what the command line is when
> ovsdb-server is started from a script, e.g., ovn-ctl starting
> ovsdb-server for NB/SB.

That is definitely useful.  For my testing I modified ovn scale test
framework to dump cmdlines and DB, so they could be used later on replay.
But yes, I think, it might be good to actually integrate and dump
right from the ovsdb process.

> 
> Thanks,
> Dumitru
> 
>> +            break;                                          \
>> +                                                            \
>> +        case OPT_STREAM_REPLAY:                             \
>> +            stream_replay_set_state(STREAM_REPLAY_READ);    \
>> +            break;
>> +
>> +#define STREAM_REPLAY_CASES \
>> +        case OPT_STREAM_REPLAY_REC: case OPT_STREAM_REPLAY:
>> +
>>  #endif /* stream.h */
>> diff --git a/ovsdb/ovsdb-client.c b/ovsdb/ovsdb-client.c
>> index 72756eb1f..1925f5213 100644
>> --- a/ovsdb/ovsdb-client.c
>> +++ b/ovsdb/ovsdb-client.c
>> @@ -462,7 +462,7 @@ usage(void)
>>             "\nThe default SERVER is unix:%s/db.sock.\n"
>>             "The default DATABASE is Open_vSwitch.\n",
>>             program_name, program_name, ovs_rundir());
>> -    stream_usage("SERVER", true, true, true);
>> +    stream_usage("SERVER", true, true, true, false);
>>      table_usage();
>>      printf("  --timestamp                 timestamp \"monitor\" output");
>>      daemon_usage();
>> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
>> index 42ba0053b..3af09c8c1 100644
>> --- a/ovsdb/ovsdb-server.c
>> +++ b/ovsdb/ovsdb-server.c
>> @@ -1868,7 +1868,7 @@ usage(void)
>>             program_name, program_name, ovs_dbdir());
>>      printf("\nJSON-RPC options (may be specified any number of times):\n"
>>             "  --remote=REMOTE         connect or listen to REMOTE\n");
>> -    stream_usage("JSON-RPC", true, true, true);
>> +    stream_usage("JSON-RPC", true, true, true, false);
>>      daemon_usage();
>>      vlog_usage();
>>      replication_usage();
>> diff --git a/tests/test-jsonrpc.c b/tests/test-jsonrpc.c
>> index 04e941b14..fa9c90b95 100644
>> --- a/tests/test-jsonrpc.c
>> +++ b/tests/test-jsonrpc.c
>> @@ -109,7 +109,7 @@ usage(void)
>>             "  request REMOTE METHOD PARAMS   send request, print reply\n"
>>             "  notify REMOTE METHOD PARAMS  send notification and exit\n",
>>             program_name, program_name);
>> -    stream_usage("JSON-RPC", true, true, true);
>> +    stream_usage("JSON-RPC", true, true, true, false);
>>      daemon_usage();
>>      vlog_usage();
>>      printf("\nOther options:\n"
>> diff --git a/utilities/ovs-vsctl.c b/utilities/ovs-vsctl.c
>> index 37cc72d40..e7dc50ed5 100644
>> --- a/utilities/ovs-vsctl.c
>> +++ b/utilities/ovs-vsctl.c
>> @@ -458,7 +458,7 @@ Options:\n\
>>      vlog_usage();
>>      printf("\
>>    --no-syslog             equivalent to --verbose=vsctl:syslog:warn\n");
>> -    stream_usage("database", true, true, true);
>> +    stream_usage("database", true, true, true, false);
>>      printf("\n\
>>  Other options:\n\
>>    -h, --help                  display this help message\n\
>> diff --git a/vswitchd/ovs-vswitchd.c b/vswitchd/ovs-vswitchd.c
>> index 1e72b628b..e61a1c3b7 100644
>> --- a/vswitchd/ovs-vswitchd.c
>> +++ b/vswitchd/ovs-vswitchd.c
>> @@ -276,7 +276,7 @@ usage(void)
>>             "where DATABASE is a socket on which ovsdb-server is listening\n"
>>             "      (default: \"unix:%s/db.sock\").\n",
>>             program_name, program_name, ovs_rundir());
>> -    stream_usage("DATABASE", true, false, true);
>> +    stream_usage("DATABASE", true, false, true, false);
>>      daemon_usage();
>>      vlog_usage();
>>      printf("\nDPDK options:\n"
>> diff --git a/vtep/vtep-ctl.c b/vtep/vtep-ctl.c
>> index ab552457d..09d407ac9 100644
>> --- a/vtep/vtep-ctl.c
>> +++ b/vtep/vtep-ctl.c
>> @@ -373,7 +373,7 @@ Options:\n\
>>      vlog_usage();
>>      printf("\
>>    --no-syslog                 equivalent to --verbose=vtep_ctl:syslog:warn\n");
>> -    stream_usage("database", true, true, false);
>> +    stream_usage("database", true, true, false, false);
>>      printf("\n\
>>  Other options:\n\
>>    -h, --help                  display this help message\n\
>>
>
diff mbox series

Patch

diff --git a/lib/automake.mk b/lib/automake.mk
index 86940ccd2..bbe7d04f1 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -281,6 +281,7 @@  lib_libopenvswitch_la_SOURCES = \
 	lib/stream-fd.c \
 	lib/stream-fd.h \
 	lib/stream-provider.h \
+	lib/stream-replay.c \
 	lib/stream-ssl.h \
 	lib/stream-tcp.c \
 	lib/stream.c \
diff --git a/lib/stream-provider.h b/lib/stream-provider.h
index 75f4f059b..0ce4f6f4c 100644
--- a/lib/stream-provider.h
+++ b/lib/stream-provider.h
@@ -29,6 +29,7 @@  struct stream {
     const struct stream_class *class;
     int state;
     int error;
+    FILE *replay_wfd;
     char *name;
     char *peer_id;
 };
@@ -133,6 +134,7 @@  struct pstream {
     const struct pstream_class *class;
     char *name;
     ovs_be16 bound_port;
+    FILE *replay_wfd;
 };
 
 void pstream_init(struct pstream *, const struct pstream_class *, char *name);
@@ -200,5 +202,7 @@  extern const struct pstream_class pwindows_pstream_class;
 extern const struct stream_class ssl_stream_class;
 extern const struct pstream_class pssl_pstream_class;
 #endif
+extern const struct stream_class replay_stream_class;
+extern const struct pstream_class preplay_pstream_class;
 
 #endif /* stream-provider.h */
diff --git a/lib/stream-replay.c b/lib/stream-replay.c
new file mode 100644
index 000000000..16486f94e
--- /dev/null
+++ b/lib/stream-replay.c
@@ -0,0 +1,561 @@ 
+/*
+ * Copyright (c) 2020, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <ctype.h>
+#include <errno.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include "dirs.h"
+#include "ovs-atomic.h"
+#include "util.h"
+#include "stream-provider.h"
+#include "stream.h"
+#include "openvswitch/poll-loop.h"
+#include "openvswitch/vlog.h"
+
+VLOG_DEFINE_THIS_MODULE(stream_replay);
+
+static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
+
+/* Stream replay. */
+
+static struct ovs_mutex replay_mutex = OVS_MUTEX_INITIALIZER;
+static int replay_seqno OVS_GUARDED_BY(replay_mutex) = 0;
+static atomic_int replay_state = ATOMIC_VAR_INIT(STREAM_REPLAY_NONE);
+
+void
+stream_replay_set_state(enum stream_replay_state state)
+{
+    atomic_store_relaxed(&replay_state, state);
+}
+
+enum stream_replay_state
+stream_replay_get_state(void)
+{
+    int state;
+
+    atomic_read_relaxed(&replay_state, &state);
+    return state;
+}
+
+static char *
+replay_file_name(const char *name, int seqno)
+{
+    char *local_name = xstrdup(name);
+    char *filename, *p, *c;
+    bool skip = false;
+
+    /* Replace all the numbers and special symbols with single underscore.
+     * Numbers might be PIDs or port numbers that could change between record
+     * and replay phases, special symbols might be not good as a filename.
+     * We have a unique seuqence number as part of the name, so we don't care
+     * keeping too much information. */
+    for (c = p = local_name; *p; p++) {
+         if (!isalpha((unsigned char) *p)) {
+             if (!skip) {
+                *c++ = '_';
+                skip = true;
+             }
+         } else {
+             *c++ = *p;
+             skip = false;
+         }
+    }
+    if (skip) {
+        c--;
+    }
+    *c = '\0';
+    filename = xasprintf("replay_%s_%d", local_name, seqno);
+    VLOG_DBG("Constructing replay filename: '%s' --> '%s' --> '%s'.",
+             name, local_name, filename);
+    free(local_name);
+
+    return filename;
+}
+
+/* In write mode creates a new replay file to write stream replay.
+ * In read mode opens an existing replay file. */
+static int
+replay_file_open(const char *name, FILE **f, int *seqno)
+    OVS_REQUIRES(replay_mutex)
+{
+    char *file_path, *filename;
+    int state = stream_replay_get_state();
+
+    ovs_assert(state != STREAM_REPLAY_NONE);
+
+    filename = replay_file_name(name, replay_seqno);
+    file_path = abs_file_name(ovs_rundir(), filename);
+    free(filename);
+
+    *f = fopen(file_path, state == STREAM_REPLAY_WRITE ? "wb" : "rb");
+    if (!*f) {
+        VLOG_ERR("%s: fopen failed: %s", file_path, ovs_strerror(errno));
+        free(file_path);
+        return errno;
+    }
+    free(file_path);
+
+    if (state == STREAM_REPLAY_READ
+        && fread(seqno, sizeof *seqno, 1, *f) != 1) {
+        VLOG_INFO("%s: failed to read seqno: stream might be empty.", name);
+        *seqno = INT_MAX;
+    }
+    replay_seqno++;  /* New file opened. */
+    return 0;
+}
+
+static int
+replay_write(FILE *f, const void *buffer, int n, bool is_read)
+{
+    int state = stream_replay_get_state();
+    int seqno_to_write;
+    int retval = 0;
+
+    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
+        return 0;
+    }
+
+    ovs_mutex_lock(&replay_mutex);
+
+    seqno_to_write = is_read ? replay_seqno : -replay_seqno;
+    if (fwrite(&seqno_to_write, sizeof seqno_to_write, 1, f) != 1) {
+        VLOG_ERR_RL(&rl, "Failed to write seqno.");
+        retval = -1;
+        goto out;
+    }
+    if (fwrite(&n, sizeof n, 1, f) != 1) {
+        VLOG_ERR_RL(&rl, "Failed to write length.");
+        retval = -1;
+        goto out;
+    }
+    if (n > 0 && is_read && fwrite(buffer, 1, n, f) != n) {
+        VLOG_ERR_RL(&rl, "Failed to write data.");
+        retval = -1;
+    }
+out:
+    replay_seqno++; /* Write completed. */
+    ovs_mutex_unlock(&replay_mutex);
+    return retval;
+}
+
+static int
+replay_read(FILE *f, void *buffer, int buffer_size,
+            int *len, int *seqno, bool is_read)
+    OVS_REQUIRES(replay_mutex)
+{
+    int retval = EINVAL;
+
+    if (fread(len, sizeof *len, 1, f) != 1
+        || (is_read && *len > buffer_size)) {
+        VLOG_ERR("Failed to read replay length.");
+        goto out;
+    }
+
+    if (*len > 0 && is_read && fread(buffer, 1, *len, f) != *len) {
+        VLOG_ERR("Failed to read replay buffer.");
+        goto out;
+    }
+
+    if (fread(seqno, sizeof *seqno, 1, f) != 1) {
+        *seqno = INT_MAX;  /* Most likely EOF. */
+        if (ferror(f)) {
+            VLOG_INFO("Failed to read replay seqno.");
+            goto out;
+        }
+    }
+
+    retval = 0;
+out:
+    replay_seqno++;  /* Read completed. */
+    return retval;
+}
+
+
+/* Active replay stream. */
+
+struct stream_replay
+{
+    struct stream stream;
+    FILE *f;
+    int seqno;
+};
+
+const struct stream_class replay_stream_class;
+
+static inline bool
+seqno_is_read(int seqno)
+{
+    return seqno >= 0;
+}
+
+static inline int
+normalized_seqno(int seqno)
+{
+    return seqno >= 0 ? seqno : -seqno;
+}
+
+
+/* Creates a new stream named 'name' that will emulate sending and receiving
+ * data using replay file and stores a pointer to the stream in '*streamp'.
+ *
+ * Takes ownership of 'name'.
+ *
+ * Returns 0 if successful, otherwise a positive errno value. */
+static int
+new_replay_stream(char *name, struct stream **streamp)
+    OVS_REQUIRES(replay_mutex)
+{
+    struct stream_replay *s;
+    int seqno = 0, error;
+    FILE *f;
+
+    error = replay_file_open(name, &f, &seqno);
+    if (error) {
+        VLOG_ERR("%s: failed to open stream.", name);
+        return error;
+    }
+
+    s = xmalloc(sizeof *s);
+    stream_init(&s->stream, &replay_stream_class, 0, name);
+    s->f = f;
+    s->seqno = seqno;
+    *streamp = &s->stream;
+    return 0;
+}
+
+static struct stream_replay *
+stream_replay_cast(struct stream *stream)
+{
+    stream_assert_class(stream, &replay_stream_class);
+    return CONTAINER_OF(stream, struct stream_replay, stream);
+}
+
+void
+stream_replay_open_wfd(struct stream *s)
+{
+    FILE *f;
+    int state = stream_replay_get_state();
+
+    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
+        return;
+    }
+
+    ovs_mutex_lock(&replay_mutex);
+    if (!replay_file_open(s->name, &f, NULL)) {
+        s->replay_wfd = f;
+    }
+    ovs_mutex_unlock(&replay_mutex);
+}
+
+void
+stream_replay_write(struct stream *s, const void *buffer, int n, bool is_read)
+{
+    int state = stream_replay_get_state();
+
+    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
+        return;
+    }
+    if (replay_write(s->replay_wfd, buffer, n, is_read)) {
+        VLOG_ERR("%s: failed to write buffer.", s->name);
+    }
+}
+
+void
+stream_replay_close_wfd(struct stream *s)
+{
+    if (s->replay_wfd) {
+        fclose(s->replay_wfd);
+    }
+}
+
+static int
+replay_open(const char *name, char *suffix OVS_UNUSED, struct stream **streamp,
+          uint8_t dscp OVS_UNUSED)
+{
+    int retval;
+
+    ovs_mutex_lock(&replay_mutex);
+    retval = new_replay_stream(xstrdup(name), streamp);
+    ovs_mutex_unlock(&replay_mutex);
+
+    return retval;
+}
+
+static void
+replay_close(struct stream *stream)
+{
+    struct stream_replay *s = stream_replay_cast(stream);
+    fclose(s->f);
+    free(s);
+}
+
+static ssize_t
+replay_recv(struct stream *stream, void *buffer, size_t n)
+{
+    struct stream_replay *s = stream_replay_cast(stream);
+    int norm_seqno = normalized_seqno(s->seqno);
+    int error, len;
+
+    ovs_mutex_lock(&replay_mutex);
+    ovs_assert(norm_seqno >= replay_seqno);
+
+    if (norm_seqno != replay_seqno || !seqno_is_read(s->seqno)) {
+        error = EAGAIN;
+        goto unlock;
+    }
+
+    error = replay_read(s->f, buffer, n, &len, &s->seqno, true);
+    if (error) {
+        VLOG_ERR("%s: failed to read from replay file.", stream->name);
+        goto unlock;
+    }
+
+unlock:
+    ovs_mutex_unlock(&replay_mutex);
+    return error ? -error : len;
+}
+
+static ssize_t
+replay_send(struct stream *stream OVS_UNUSED, const void *buffer OVS_UNUSED,
+            size_t n)
+{
+    struct stream_replay *s = stream_replay_cast(stream);
+    int norm_seqno = normalized_seqno(s->seqno);
+    int error, len;
+
+    ovs_mutex_lock(&replay_mutex);
+    ovs_assert(norm_seqno >= replay_seqno);
+
+    if (norm_seqno != replay_seqno || seqno_is_read(s->seqno)) {
+        error = EAGAIN;
+        goto unlock;
+    }
+
+    error = replay_read(s->f, NULL, 0, &len, &s->seqno, false);
+    if (error) {
+        VLOG_ERR("%s: failed to read from replay file.", stream->name);
+        goto unlock;
+    }
+    ovs_assert(len < 0 || len <= n);
+
+unlock:
+    ovs_mutex_unlock(&replay_mutex);
+    return error ? -error : len;
+}
+
+static void
+replay_wait(struct stream *stream, enum stream_wait_type wait)
+{
+    struct stream_replay *s = stream_replay_cast(stream);
+    switch (wait) {
+    case STREAM_CONNECT:
+        /* Connect does nothing and always avaialable. */
+        poll_immediate_wake();
+        break;
+
+    case STREAM_SEND:
+        if (s->seqno != INT_MAX && !seqno_is_read(s->seqno)) {
+            /* Stream waits for write. */
+            poll_immediate_wake();
+        }
+        break;
+
+    case STREAM_RECV:
+        if (s->seqno != INT_MAX && seqno_is_read(s->seqno)) {
+            /* We still have something to read. */
+            poll_immediate_wake();
+        }
+        break;
+
+    default:
+        OVS_NOT_REACHED();
+    }
+}
+
+const struct stream_class replay_stream_class = {
+    "replay",                   /* name */
+    false,                      /* needs_probes */
+    replay_open,                /* open */
+    replay_close,               /* close */
+    NULL,                       /* connect */
+    replay_recv,                /* recv */
+    replay_send,                /* send */
+    NULL,                       /* run */
+    NULL,                       /* run_wait */
+    replay_wait,                /* wait */
+};
+
+/* Passive file descriptor stream. */
+
+struct replay_pstream
+{
+    struct pstream pstream;
+    FILE *f;
+    int seqno;
+};
+
+const struct pstream_class preplay_pstream_class;
+
+static struct replay_pstream *
+replay_pstream_cast(struct pstream *pstream)
+{
+    pstream_assert_class(pstream, &preplay_pstream_class);
+    return CONTAINER_OF(pstream, struct replay_pstream, pstream);
+}
+
+/* Creates a new pstream named 'name' that will accept new replay connections
+ * reading them from the replay file and stores a pointer to the stream in
+ * '*pstreamp'.
+ *
+ * Takes ownership of 'name'.
+ *
+ * Returns 0 if successful, otherwise a positive errno value. */
+static int
+preplay_listen(const char *name, char *suffix OVS_UNUSED,
+               struct pstream **pstreamp, uint8_t dscp OVS_UNUSED)
+{
+    int seqno = 0, error;
+    FILE *f;
+
+    ovs_mutex_lock(&replay_mutex);
+    error = replay_file_open(name, &f, &seqno);
+    ovs_mutex_unlock(&replay_mutex);
+    if (error) {
+        VLOG_ERR("%s: failed to open pstream.", name);
+        return error;
+    }
+
+    struct replay_pstream *ps = xmalloc(sizeof *ps);
+    pstream_init(&ps->pstream, &preplay_pstream_class, xstrdup(name));
+    ps->f = f;
+    ps->seqno = seqno;
+    *pstreamp = &ps->pstream;
+    return 0;
+}
+
+void
+pstream_replay_open_wfd(struct pstream *ps)
+{
+    FILE *f;
+    int state = stream_replay_get_state();
+
+    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
+        return;
+    }
+
+    ovs_mutex_lock(&replay_mutex);
+    if (!replay_file_open(ps->name, &f, NULL)) {
+        ps->replay_wfd = f;
+    }
+    ovs_mutex_unlock(&replay_mutex);
+}
+
+
+void
+pstream_replay_write_accept(struct pstream *ps, const struct stream *s)
+{
+    int state = stream_replay_get_state();
+    int len;
+
+    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
+        return;
+    }
+
+    len = strlen(s->name);
+    if (replay_write(ps->replay_wfd, s->name, len, true)) {
+        VLOG_ERR("%s: failed to write accept name: %s", ps->name, s->name);
+    }
+}
+
+void
+pstream_replay_close_wfd(struct pstream *ps)
+{
+    if (ps->replay_wfd) {
+        fclose(ps->replay_wfd);
+    }
+}
+
+
+static void
+preplay_close(struct pstream *pstream)
+{
+    struct replay_pstream *ps = replay_pstream_cast(pstream);
+
+    fclose(ps->f);
+    free(ps);
+}
+
+#define MAX_NAME_LEN 65536
+
+static int
+preplay_accept(struct pstream *pstream, struct stream **new_streamp)
+{
+    struct replay_pstream *ps = replay_pstream_cast(pstream);
+    int norm_seqno = normalized_seqno(ps->seqno);
+    int retval, len;
+    char name[MAX_NAME_LEN];
+
+    ovs_mutex_lock(&replay_mutex);
+    ovs_assert(norm_seqno >= replay_seqno);
+
+    if (norm_seqno != replay_seqno || !seqno_is_read(ps->seqno)) {
+        retval = EAGAIN;
+        goto unlock;
+    }
+
+    retval = replay_read(ps->f, name, MAX_NAME_LEN - 1,
+                         &len, &ps->seqno, true);
+    if (retval) {
+        VLOG_ERR("%s: failed to read from replay file.", pstream->name);
+        goto unlock;
+    }
+
+    if (len > 0) {
+        name[len] = 0;
+        retval = new_replay_stream(xstrdup(name), new_streamp);
+    } else {
+        retval = len;
+    }
+unlock:
+    ovs_mutex_unlock(&replay_mutex);
+    return retval;
+}
+
+static void
+preplay_wait(struct pstream *pstream)
+{
+    struct replay_pstream *ps = replay_pstream_cast(pstream);
+
+    if (ps->seqno != INT_MAX) {
+        /* Replay always has somthing to say. */
+        poll_immediate_wake();
+    }
+}
+
+const struct pstream_class preplay_pstream_class = {
+    "preplay",
+    false,
+    preplay_listen,
+    preplay_close,
+    preplay_accept,
+    preplay_wait,
+};
diff --git a/lib/stream.c b/lib/stream.c
index e246b3773..2ee392462 100644
--- a/lib/stream.c
+++ b/lib/stream.c
@@ -117,7 +117,7 @@  check_stream_classes(void)
  * connection methods supported by the stream. */
 void
 stream_usage(const char *name, bool active, bool passive,
-             bool bootstrap OVS_UNUSED)
+             bool bootstrap OVS_UNUSED, bool replay)
 {
     /* Really this should be implemented via callbacks into the stream
      * providers, but that seems too heavy-weight to bother with at the
@@ -161,6 +161,11 @@  stream_usage(const char *name, bool active, bool passive,
            "  --ssl-protocols=PROTOS  list of SSL protocols to enable\n"
            "  --ssl-ciphers=CIPHERS   list of SSL ciphers to enable\n");
 #endif
+    if (replay) {
+        printf("Replay options:\n"
+               "  --stream-replay-record turn on writing replay files\n"
+               "  --stream-replay        run connections from replay files\n");
+    }
 }
 
 /* Given 'name', a stream name in the form "TYPE:ARGS", stores the class
@@ -185,6 +190,9 @@  stream_lookup_class(const char *name, const struct stream_class **classp)
         if (strlen(class->name) == prefix_len
             && !memcmp(class->name, name, prefix_len)) {
             *classp = class;
+            if (stream_replay_get_state() == STREAM_REPLAY_READ) {
+                *classp = &replay_stream_class;
+            }
             return 0;
         }
     }
@@ -295,6 +303,7 @@  stream_close(struct stream *stream)
     if (stream != NULL) {
         char *name = stream->name;
         char *peer_id = stream->peer_id;
+        stream_replay_close_wfd(stream);
         (stream->class->close)(stream);
         free(name);
         free(peer_id);
@@ -367,9 +376,13 @@  int
 stream_recv(struct stream *stream, void *buffer, size_t n)
 {
     int retval = stream_connect(stream);
-    return (retval ? -retval
-            : n == 0 ? 0
-            : (stream->class->recv)(stream, buffer, n));
+
+    retval = retval ? -retval
+             : n == 0 ? 0
+             : (stream->class->recv)(stream, buffer, n);
+
+    stream_replay_write(stream, buffer, retval, true);
+    return retval;
 }
 
 /* Tries to send up to 'n' bytes of 'buffer' on 'stream', and returns:
@@ -385,9 +398,12 @@  int
 stream_send(struct stream *stream, const void *buffer, size_t n)
 {
     int retval = stream_connect(stream);
-    return (retval ? -retval
-            : n == 0 ? 0
-            : (stream->class->send)(stream, buffer, n));
+    retval = retval ? -retval
+             : n == 0 ? 0
+             : (stream->class->send)(stream, buffer, n);
+
+    stream_replay_write(stream, buffer, retval, false);
+    return retval;
 }
 
 /* Allows 'stream' to perform maintenance activities, such as flushing
@@ -483,6 +499,9 @@  pstream_lookup_class(const char *name, const struct pstream_class **classp)
         if (strlen(class->name) == prefix_len
             && !memcmp(class->name, name, prefix_len)) {
             *classp = class;
+            if (stream_replay_get_state() == STREAM_REPLAY_READ) {
+                *classp = &preplay_pstream_class;
+            }
             return 0;
         }
     }
@@ -548,6 +567,8 @@  pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp)
         goto error;
     }
 
+    pstream_replay_open_wfd(pstream);
+
     /* Success. */
     *pstreamp = pstream;
     return 0;
@@ -571,6 +592,7 @@  pstream_close(struct pstream *pstream)
 {
     if (pstream != NULL) {
         char *name = pstream->name;
+        pstream_replay_close_wfd(pstream);
         (pstream->class->close)(pstream);
         free(name);
     }
@@ -591,6 +613,8 @@  pstream_accept(struct pstream *pstream, struct stream **new_stream)
     } else {
         ovs_assert((*new_stream)->state != SCS_CONNECTING
                    || (*new_stream)->class->connect);
+        pstream_replay_write_accept(pstream, *new_stream);
+        stream_replay_open_wfd(*new_stream);
     }
     return retval;
 }
diff --git a/lib/stream.h b/lib/stream.h
index 77bffa498..40357137e 100644
--- a/lib/stream.h
+++ b/lib/stream.h
@@ -29,7 +29,8 @@  struct pstream;
 struct stream;
 struct vlog_module;
 
-void stream_usage(const char *name, bool active, bool passive, bool bootstrap);
+void stream_usage(const char *name, bool active, bool passive,
+                  bool bootstrap, bool replay);
 
 /* Bidirectional byte streams. */
 int stream_verify_name(const char *name);
@@ -94,4 +95,41 @@  enum stream_content_type {
 void stream_report_content(const void *, ssize_t, enum stream_content_type,
                            struct vlog_module *, const char *stream_name);
 
+
+/* Replay state. */
+enum stream_replay_state {
+    STREAM_REPLAY_NONE,
+    STREAM_REPLAY_WRITE,
+    STREAM_REPLAY_READ,
+};
+
+void stream_replay_set_state(enum stream_replay_state);
+enum stream_replay_state stream_replay_get_state(void);
+void stream_replay_open_wfd(struct stream *);
+void pstream_replay_open_wfd(struct pstream *);
+void stream_replay_close_wfd(struct stream *);
+void pstream_replay_close_wfd(struct pstream *);
+void stream_replay_write(struct stream *, const void *, int, bool is_read);
+void pstream_replay_write_accept(struct pstream *, const struct stream *);
+
+#define STREAM_REPLAY_OPTION_ENUMS  \
+        OPT_STREAM_REPLAY_REC,      \
+        OPT_STREAM_REPLAY
+
+#define STREAM_REPLAY_LONG_OPTIONS                                           \
+        {"stream-replay-record", no_argument, NULL, OPT_STREAM_REPLAY_REC},  \
+        {"stream-replay",        no_argument, NULL, OPT_STREAM_REPLAY}
+
+#define STREAM_REPLAY_OPTION_HANDLERS                       \
+        case OPT_STREAM_REPLAY_REC:                         \
+            stream_replay_set_state(STREAM_REPLAY_WRITE);   \
+            break;                                          \
+                                                            \
+        case OPT_STREAM_REPLAY:                             \
+            stream_replay_set_state(STREAM_REPLAY_READ);    \
+            break;
+
+#define STREAM_REPLAY_CASES \
+        case OPT_STREAM_REPLAY_REC: case OPT_STREAM_REPLAY:
+
 #endif /* stream.h */
diff --git a/ovsdb/ovsdb-client.c b/ovsdb/ovsdb-client.c
index 72756eb1f..1925f5213 100644
--- a/ovsdb/ovsdb-client.c
+++ b/ovsdb/ovsdb-client.c
@@ -462,7 +462,7 @@  usage(void)
            "\nThe default SERVER is unix:%s/db.sock.\n"
            "The default DATABASE is Open_vSwitch.\n",
            program_name, program_name, ovs_rundir());
-    stream_usage("SERVER", true, true, true);
+    stream_usage("SERVER", true, true, true, false);
     table_usage();
     printf("  --timestamp                 timestamp \"monitor\" output");
     daemon_usage();
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index 42ba0053b..3af09c8c1 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -1868,7 +1868,7 @@  usage(void)
            program_name, program_name, ovs_dbdir());
     printf("\nJSON-RPC options (may be specified any number of times):\n"
            "  --remote=REMOTE         connect or listen to REMOTE\n");
-    stream_usage("JSON-RPC", true, true, true);
+    stream_usage("JSON-RPC", true, true, true, false);
     daemon_usage();
     vlog_usage();
     replication_usage();
diff --git a/tests/test-jsonrpc.c b/tests/test-jsonrpc.c
index 04e941b14..fa9c90b95 100644
--- a/tests/test-jsonrpc.c
+++ b/tests/test-jsonrpc.c
@@ -109,7 +109,7 @@  usage(void)
            "  request REMOTE METHOD PARAMS   send request, print reply\n"
            "  notify REMOTE METHOD PARAMS  send notification and exit\n",
            program_name, program_name);
-    stream_usage("JSON-RPC", true, true, true);
+    stream_usage("JSON-RPC", true, true, true, false);
     daemon_usage();
     vlog_usage();
     printf("\nOther options:\n"
diff --git a/utilities/ovs-vsctl.c b/utilities/ovs-vsctl.c
index 37cc72d40..e7dc50ed5 100644
--- a/utilities/ovs-vsctl.c
+++ b/utilities/ovs-vsctl.c
@@ -458,7 +458,7 @@  Options:\n\
     vlog_usage();
     printf("\
   --no-syslog             equivalent to --verbose=vsctl:syslog:warn\n");
-    stream_usage("database", true, true, true);
+    stream_usage("database", true, true, true, false);
     printf("\n\
 Other options:\n\
   -h, --help                  display this help message\n\
diff --git a/vswitchd/ovs-vswitchd.c b/vswitchd/ovs-vswitchd.c
index 1e72b628b..e61a1c3b7 100644
--- a/vswitchd/ovs-vswitchd.c
+++ b/vswitchd/ovs-vswitchd.c
@@ -276,7 +276,7 @@  usage(void)
            "where DATABASE is a socket on which ovsdb-server is listening\n"
            "      (default: \"unix:%s/db.sock\").\n",
            program_name, program_name, ovs_rundir());
-    stream_usage("DATABASE", true, false, true);
+    stream_usage("DATABASE", true, false, true, false);
     daemon_usage();
     vlog_usage();
     printf("\nDPDK options:\n"
diff --git a/vtep/vtep-ctl.c b/vtep/vtep-ctl.c
index ab552457d..09d407ac9 100644
--- a/vtep/vtep-ctl.c
+++ b/vtep/vtep-ctl.c
@@ -373,7 +373,7 @@  Options:\n\
     vlog_usage();
     printf("\
   --no-syslog                 equivalent to --verbose=vtep_ctl:syslog:warn\n");
-    stream_usage("database", true, true, false);
+    stream_usage("database", true, true, false, false);
     printf("\n\
 Other options:\n\
   -h, --help                  display this help message\n\