diff mbox series

[ovs-dev,v2,1/5] Enable kernel probes and map stream probes onto them

Message ID 20200602072152.25918-2-anton.ivanov@cambridgegreys.com
State Superseded
Headers show
Series [ovs-dev,v2,1/5] Enable kernel probes and map stream probes onto them | expand

Commit Message

Anton Ivanov June 2, 2020, 7:21 a.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

1. Fix probe logic. The stream_or_pstream_needs_probes
function returning a mix of integer and boolean. As a
result probes were NOT turned off in a number of cases on
unix domain sockets and other transports where there
should be no probing. It now returns -1 (do not know),
0 (no probe needed), 1 (definitely needs probes).

2. Allow delegating probing to keepalive facilities in the
stream layer if avaialable.

3. Provide TCP KEEPALIVE probing at stream layer on supported
platforms for stream-ssl and stream-tcp.

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 lib/jsonrpc.c          | 36 ++++++++++++++++++++++++++++++++--
 lib/rconn.c            |  2 +-
 lib/socket-util.c      | 44 ++++++++++++++++++++++++++++++++++++++++++
 lib/socket-util.h      |  8 ++++++++
 lib/stream-fd.c        |  9 +++++++++
 lib/stream-provider.h  |  6 ++++++
 lib/stream-ssl.c       |  8 ++++++++
 lib/stream-tcp.c       |  1 +
 lib/stream-unix.c      |  1 +
 lib/stream-windows.c   |  1 +
 lib/stream.c           | 17 ++++++++++++++--
 lib/stream.h           |  1 +
 ovsdb/jsonrpc-server.c |  2 +-
 13 files changed, 130 insertions(+), 6 deletions(-)

Comments

Mark Michelson June 4, 2020, 2:43 p.m. UTC | #1
Hi Anton, a couple of comments in-line below.

On 6/2/20 3:21 AM, anton.ivanov@cambridgegreys.com wrote:
> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> 
> 1. Fix probe logic. The stream_or_pstream_needs_probes
> function returning a mix of integer and boolean. As a
> result probes were NOT turned off in a number of cases on
> unix domain sockets and other transports where there
> should be no probing. It now returns -1 (do not know),
> 0 (no probe needed), 1 (definitely needs probes).

In the patch, a return of -1 and a return of 0 are always treated the 
same. I think a simpler fix here would be to change the -1 return in 
stream_or_pstream_needs_probes() to a 0 return instead. This way, the 
code can continue treating a non-zero return as needing probes instead 
of using a magic number comparison.

> 
> 2. Allow delegating probing to keepalive facilities in the
> stream layer if avaialable.
> 
> 3. Provide TCP KEEPALIVE probing at stream layer on supported
> platforms for stream-ssl and stream-tcp.
> 
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> ---
>   lib/jsonrpc.c          | 36 ++++++++++++++++++++++++++++++++--
>   lib/rconn.c            |  2 +-
>   lib/socket-util.c      | 44 ++++++++++++++++++++++++++++++++++++++++++
>   lib/socket-util.h      |  8 ++++++++
>   lib/stream-fd.c        |  9 +++++++++
>   lib/stream-provider.h  |  6 ++++++
>   lib/stream-ssl.c       |  8 ++++++++
>   lib/stream-tcp.c       |  1 +
>   lib/stream-unix.c      |  1 +
>   lib/stream-windows.c   |  1 +
>   lib/stream.c           | 17 ++++++++++++++--
>   lib/stream.h           |  1 +
>   ovsdb/jsonrpc-server.c |  2 +-
>   13 files changed, 130 insertions(+), 6 deletions(-)
> 
> diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
> index ed748dbde..15e8d3527 100644
> --- a/lib/jsonrpc.c
> +++ b/lib/jsonrpc.c
> @@ -787,6 +787,7 @@ struct jsonrpc_session {
>       int last_error;
>       unsigned int seqno;
>       uint8_t dscp;
> +    int probe_interval;
>   };
>   
>   static void
> @@ -839,6 +840,7 @@ jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
>       s->seqno = 0;
>       s->dscp = 0;
>       s->last_error = 0;
> +    s->probe_interval = reconnect_get_probe_interval(s->reconnect);
>   
>       const char *name = reconnect_get_name(s->reconnect);
>       if (!pstream_verify_name(name)) {
> @@ -848,8 +850,9 @@ jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
>           reconnect_set_backoff(s->reconnect, INT_MAX, INT_MAX);
>       }
>   
> -    if (!stream_or_pstream_needs_probes(name)) {
> +    if (stream_or_pstream_needs_probes(name) < 1) {
>           reconnect_set_probe_interval(s->reconnect, 0);
> +        s->probe_interval = 0;
>       }
>   
>       return s;
> @@ -879,6 +882,12 @@ jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
>       s->stream = NULL;
>       s->pstream = NULL;
>       s->seqno = 1;
> +    s->probe_interval = reconnect_get_probe_interval(s->reconnect);
> +
> +    if (stream_or_pstream_needs_probes(reconnect_get_name(s->reconnect)) < 1) {
> +        reconnect_set_probe_interval(s->reconnect, 0);
> +        s->probe_interval = 0;
> +    }
>   
>       return s;
>   }
> @@ -934,6 +943,12 @@ jsonrpc_session_connect(struct jsonrpc_session *s)
>           error = jsonrpc_stream_open(name, &s->stream, s->dscp);
>           if (!error) {
>               reconnect_connecting(s->reconnect, time_msec());
> +            if (stream_set_probe_interval(s->stream, s->probe_interval)) {
> +                /* we have delegated probing to the stream layer */
> +                reconnect_set_probe_interval(s->reconnect, 0);
> +            } else {
> +                reconnect_set_probe_interval(s->reconnect, s->probe_interval);
> +            }
>           } else {
>               s->last_error = error;
>           }
> @@ -967,6 +982,12 @@ jsonrpc_session_run(struct jsonrpc_session *s)
>                   jsonrpc_session_disconnect(s);
>               }
>               reconnect_connected(s->reconnect, time_msec());
> +            if (stream_set_probe_interval(stream, s->probe_interval)) {
> +                /* we have delegated probing to the stream layer */
> +                reconnect_set_probe_interval(s->reconnect, 0);
> +            } else {
> +                reconnect_set_probe_interval(s->reconnect, s->probe_interval);
> +            }
>               s->rpc = jsonrpc_open(stream);
>               s->seqno++;
>           } else if (error != EAGAIN) {
> @@ -1008,6 +1029,12 @@ jsonrpc_session_run(struct jsonrpc_session *s)
>           if (!error) {
>               reconnect_connected(s->reconnect, time_msec());
>               s->rpc = jsonrpc_open(s->stream);
> +            if (stream_set_probe_interval(s->stream, s->probe_interval)) {
> +                /* we have delegated probing to the stream layer */
> +                reconnect_set_probe_interval(s->reconnect, 0);
> +            } else {
> +                reconnect_set_probe_interval(s->reconnect, s->probe_interval);
> +            }
>               s->stream = NULL;
>               s->seqno++;
>           } else if (error != EAGAIN) {
> @@ -1231,7 +1258,12 @@ void
>   jsonrpc_session_set_probe_interval(struct jsonrpc_session *s,
>                                      int probe_interval)
>   {
> -    reconnect_set_probe_interval(s->reconnect, probe_interval);
> +    s->probe_interval = probe_interval;
> +    if (s->stream && s->probe_interval) {
> +        if (!stream_set_probe_interval(s->stream, probe_interval)) {
> +           reconnect_set_probe_interval(s->reconnect, probe_interval);

I might be missing something, but the logic seems a bit off here.

1) If probe_interval is 0, then this doesn't alter probe intervals at 
all. The comment above the function says that a 0 value should disable 
probe intervals.
2) If s->stream is NULL, then this won't alter s->reconnect's 
probe_interval at all. This could be an issue if the intent was to 
lengthen the probe interval or disable it temporarily during downtime.
3) In the case where stream_set_probe_interval succeeds, shouldn't we 
call reconnect_set_probe_interval(s->reconnect, 0)?

> +        }
> +    }
>   }
>   
>   /* Sets the DSCP value used for 's''s connection to 'dscp'.  If this is
> diff --git a/lib/rconn.c b/lib/rconn.c
> index a96b2eb8b..16e7a44d4 100644
> --- a/lib/rconn.c
> +++ b/lib/rconn.c
> @@ -346,7 +346,7 @@ rconn_connect(struct rconn *rc, const char *target, const char *name)
>       rconn_disconnect__(rc);
>       rconn_set_target__(rc, target, name);
>       rc->reliable = true;
> -    if (!stream_or_pstream_needs_probes(target)) {
> +    if (stream_or_pstream_needs_probes(target) < 1) {
>           rc->probe_interval = 0;
>       }
>       reconnect(rc);
> diff --git a/lib/socket-util.c b/lib/socket-util.c
> index 4f1ffecf5..d6d1967f6 100644
> --- a/lib/socket-util.c
> +++ b/lib/socket-util.c
> @@ -114,6 +114,50 @@ setsockopt_tcp_nodelay(int fd)
>       }
>   }
>   
> +#ifdef HAS_KERNEL_KEEPALIVES
> +bool tcp_set_probe_interval(int fd, int probe_interval) {

Is there any way to attempt to make this function atomic? For example, 
we could manage to turn on keepalives, successfully set TCP_KEEPCNT, but 
then fail to set TCP_KEEPIDLE. This would result in a false return, 
which callers currently interpret to mean a complete failure to set 
keepalives. However, in actuality they've been enabled but just not with 
the configured probe_interval. This could lead to some odd debugging 
scenarios.

Also, there should be a special case in this function where if 
probe_interval is 0, it disables keepalives. I have no idea what happens 
if you turn on keepalives but set TCP_KEEPIDLE and TCP_KEEPINTVL to 0.

> +    int on = 1;
> +    int retval;
> +    int value;
> +
> +    on = 1;
> +    retval = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof on);
> +    if (retval) {
> +        retval = sock_errno();
> +        VLOG_DBG("setsockopt(SO_KEEPALIVE): %s", sock_strerror(retval));
> +        return false;
> +    }
> +    value = 2;
> +    retval = setsockopt(fd,
> +            IPPROTO_TCP, TCP_KEEPCNT, &value, sizeof value);
> +    if (retval) {
> +        retval = sock_errno();
> +        VLOG_DBG("setsockopt(TCP_KEEPCNT): %s", sock_strerror(retval));
> +        return false;
> +    }
> +    value = probe_interval;
> +    retval = setsockopt(fd,
> +            IPPROTO_TCP, TCP_KEEPIDLE, &value, sizeof value);
> +    if (retval) {
> +        retval = sock_errno();
> +        VLOG_DBG("setsockopt(TCP_KEEPIDLE): %s", sock_strerror(retval));
> +        return false;
> +    }
> +    value = probe_interval;
> +    retval = setsockopt(fd,
> +            IPPROTO_TCP, TCP_KEEPINTVL, &value, sizeof value);
> +    if (retval) {
> +        retval = sock_errno();
> +        VLOG_DBG("setsockopt(SO_KEEPALIVE): %s", sock_strerror(retval));
> +        return false;
> +    }
> +    return true;
> +#else
> +bool tcp_set_probe_interval(int fd OVS_UNUSED, int probe_interval OVS_UNUSED) {
> +    return false;
> +#endif
> +}
> +
>   /* Sets the DSCP value of socket 'fd' to 'dscp', which must be 63 or less.
>    * 'family' must indicate the socket's address family (AF_INET or AF_INET6, to
>    * do anything useful). */
> diff --git a/lib/socket-util.h b/lib/socket-util.h
> index 9ccb7d4cc..2439e2f78 100644
> --- a/lib/socket-util.h
> +++ b/lib/socket-util.h
> @@ -27,6 +27,7 @@
>   #include "openvswitch/types.h"
>   #include <netinet/in_systm.h>
>   #include <netinet/ip.h>
> +#include <netinet/tcp.h>
>   
>   struct ds;
>   
> @@ -181,4 +182,11 @@ static inline int sock_errno(void)
>   #endif
>   }
>   
> +#if defined (SO_KEEPALIVE) && defined (TCP_KEEPCNT) && \
> +    defined (TCP_KEEPIDLE) && defined (TCP_KEEPINTVL)
> +#define HAS_KERNEL_KEEPALIVES 1
> +#endif
> +
> +bool tcp_set_probe_interval(int fd, int probe_interval);
> +
>   #endif /* socket-util.h */
> diff --git a/lib/stream-fd.c b/lib/stream-fd.c
> index 46ee7ae27..30622929b 100644
> --- a/lib/stream-fd.c
> +++ b/lib/stream-fd.c
> @@ -162,6 +162,14 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
>       }
>   }
>   
> +static bool fd_set_probe_interval(struct stream *stream, int probe_interval) {
> +    struct stream_fd *sf = stream_fd_cast(stream);
> +
> +    return tcp_set_probe_interval(sf->fd, probe_interval);
> +}
> +
> +
> +
>   static const struct stream_class stream_fd_class = {
>       "fd",                       /* name */
>       false,                      /* needs_probes */
> @@ -173,6 +181,7 @@ static const struct stream_class stream_fd_class = {
>       NULL,                       /* run */
>       NULL,                       /* run_wait */
>       fd_wait,                    /* wait */
> +    fd_set_probe_interval,      /* set_probe_interval */
>   };
>   
>   /* Passive file descriptor stream. */
> diff --git a/lib/stream-provider.h b/lib/stream-provider.h
> index 75f4f059b..6c28cb50b 100644
> --- a/lib/stream-provider.h
> +++ b/lib/stream-provider.h
> @@ -124,6 +124,12 @@ struct stream_class {
>       /* Arranges for the poll loop to wake up when 'stream' is ready to take an
>        * action of the given 'type'. */
>       void (*wait)(struct stream *stream, enum stream_wait_type type);
> +    /* Sets low level keepalives if supported
> +     *
> +     *     If successful returns true
> +     *
> +     */
> +    bool (*set_probe_interval)(struct stream *stream, int probe_interval);
>   };
>   
>   /* Passive listener for incoming stream connections.
> diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
> index 078fcbc3a..575c55f5b 100644
> --- a/lib/stream-ssl.c
> +++ b/lib/stream-ssl.c
> @@ -789,6 +789,13 @@ ssl_run(struct stream *stream)
>       }
>   }
>   
> +static bool ssl_set_probe_interval(struct stream *stream, int probe_interval) {
> +    struct ssl_stream *sslv = ssl_stream_cast(stream);
> +
> +    return tcp_set_probe_interval(sslv->fd, probe_interval);
> +}
> +
> +
>   static void
>   ssl_run_wait(struct stream *stream)
>   {
> @@ -861,6 +868,7 @@ const struct stream_class ssl_stream_class = {
>       ssl_run,                    /* run */
>       ssl_run_wait,               /* run_wait */
>       ssl_wait,                   /* wait */
> +    ssl_set_probe_interval,     /* set_probe_interval */
>   };
>   
>   /* Passive SSL. */
> diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c
> index e8dc2bfaa..67c912105 100644
> --- a/lib/stream-tcp.c
> +++ b/lib/stream-tcp.c
> @@ -73,6 +73,7 @@ const struct stream_class tcp_stream_class = {
>       NULL,                       /* run */
>       NULL,                       /* run_wait */
>       NULL,                       /* wait */
> +    NULL,
>   };
>   
>   /* Passive TCP. */
> diff --git a/lib/stream-unix.c b/lib/stream-unix.c
> index d265efb83..2322df955 100644
> --- a/lib/stream-unix.c
> +++ b/lib/stream-unix.c
> @@ -73,6 +73,7 @@ const struct stream_class unix_stream_class = {
>       NULL,                       /* run */
>       NULL,                       /* run_wait */
>       NULL,                       /* wait */
> +    NULL,
>   };
>   
>   /* Passive UNIX socket. */
> diff --git a/lib/stream-windows.c b/lib/stream-windows.c
> index 5c4c55e5d..836112f75 100644
> --- a/lib/stream-windows.c
> +++ b/lib/stream-windows.c
> @@ -374,6 +374,7 @@ const struct stream_class windows_stream_class = {
>       NULL,                       /* run */
>       NULL,                       /* run_wait */
>       windows_wait,               /* wait */
> +    NULL,
>   };
>   
>   struct pwindows_pstream
> diff --git a/lib/stream.c b/lib/stream.c
> index e246b3773..f0b16fb4f 100644
> --- a/lib/stream.c
> +++ b/lib/stream.c
> @@ -430,6 +430,19 @@ stream_wait(struct stream *stream, enum stream_wait_type wait)
>       (stream->class->wait)(stream, wait);
>   }
>   
> +
> +bool stream_set_probe_interval(struct stream *stream, int probe_interval) {
> +    if (! stream->class->needs_probes) {
> +        return true;
> +    }
> +    if (probe_interval && stream->class->set_probe_interval) {
> +        return (stream->class->set_probe_interval)(
> +                stream, probe_interval / 1000);
> +    }
> +    return false;
> +}
> +
> +
>   void
>   stream_connect_wait(struct stream *stream)
>   {
> @@ -509,9 +522,9 @@ stream_or_pstream_needs_probes(const char *name)
>       const struct stream_class *class;
>   
>       if (!stream_lookup_class(name, &class)) {
> -        return class->needs_probes;
> +        return class->needs_probes ? 1 : 0;
>       } else if (!pstream_lookup_class(name, &pclass)) {
> -        return pclass->needs_probes;
> +        return pclass->needs_probes ? 1 : 0;
>       } else {
>           return -1;
>       }
> diff --git a/lib/stream.h b/lib/stream.h
> index 77bffa498..e343f75a5 100644
> --- a/lib/stream.h
> +++ b/lib/stream.h
> @@ -40,6 +40,7 @@ const char *stream_get_name(const struct stream *);
>   int stream_connect(struct stream *);
>   int stream_recv(struct stream *, void *buffer, size_t n);
>   int stream_send(struct stream *, const void *buffer, size_t n);
> +bool stream_set_probe_interval(struct stream *, int probe_interval);
>   
>   void stream_run(struct stream *);
>   void stream_run_wait(struct stream *);
> diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
> index 4e2dfc3d7..fe8ffc317 100644
> --- a/ovsdb/jsonrpc-server.c
> +++ b/ovsdb/jsonrpc-server.c
> @@ -212,7 +212,7 @@ ovsdb_jsonrpc_default_options(const char *target)
>   {
>       struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
>       options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
> -    options->probe_interval = (stream_or_pstream_needs_probes(target)
> +    options->probe_interval = (stream_or_pstream_needs_probes(target) < 1
>                                  ? RECONNECT_DEFAULT_PROBE_INTERVAL
>                                  : 0);
>       return options;
>
Anton Ivanov June 4, 2020, 3:11 p.m. UTC | #2
On 04/06/2020 15:43, Mark Michelson wrote:
> Hi Anton, a couple of comments in-line below.
>
> On 6/2/20 3:21 AM, anton.ivanov@cambridgegreys.com wrote:
>> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>>
>> 1. Fix probe logic. The stream_or_pstream_needs_probes
>> function returning a mix of integer and boolean. As a
>> result probes were NOT turned off in a number of cases on
>> unix domain sockets and other transports where there
>> should be no probing. It now returns -1 (do not know),
>> 0 (no probe needed), 1 (definitely needs probes).
>
> In the patch, a return of -1 and a return of 0 are always treated the same. I think a simpler fix here would be to change the -1 return in stream_or_pstream_needs_probes() to a 0 return instead. This way, the code can continue treating a non-zero return as needing probes instead of using a magic number comparison.

Ack.

I did not want to lose the "I do not know" case in case someone needs it in the future. For the time being nobody does so we can indeed return 0 in needs_probes.

I will update the patch in the next revision.

>
>>
>> 2. Allow delegating probing to keepalive facilities in the
>> stream layer if avaialable.
>>
>> 3. Provide TCP KEEPALIVE probing at stream layer on supported
>> platforms for stream-ssl and stream-tcp.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>> ---
>>   lib/jsonrpc.c          | 36 ++++++++++++++++++++++++++++++++--
>>   lib/rconn.c            |  2 +-
>>   lib/socket-util.c      | 44 ++++++++++++++++++++++++++++++++++++++++++
>>   lib/socket-util.h      |  8 ++++++++
>>   lib/stream-fd.c        |  9 +++++++++
>>   lib/stream-provider.h  |  6 ++++++
>>   lib/stream-ssl.c       |  8 ++++++++
>>   lib/stream-tcp.c       |  1 +
>>   lib/stream-unix.c      |  1 +
>>   lib/stream-windows.c   |  1 +
>>   lib/stream.c           | 17 ++++++++++++++--
>>   lib/stream.h           |  1 +
>>   ovsdb/jsonrpc-server.c |  2 +-
>>   13 files changed, 130 insertions(+), 6 deletions(-)
>>
>> diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
>> index ed748dbde..15e8d3527 100644
>> --- a/lib/jsonrpc.c
>> +++ b/lib/jsonrpc.c
>> @@ -787,6 +787,7 @@ struct jsonrpc_session {
>>       int last_error;
>>       unsigned int seqno;
>>       uint8_t dscp;
>> +    int probe_interval;
>>   };
>>     static void
>> @@ -839,6 +840,7 @@ jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
>>       s->seqno = 0;
>>       s->dscp = 0;
>>       s->last_error = 0;
>> +    s->probe_interval = reconnect_get_probe_interval(s->reconnect);
>>         const char *name = reconnect_get_name(s->reconnect);
>>       if (!pstream_verify_name(name)) {
>> @@ -848,8 +850,9 @@ jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
>>           reconnect_set_backoff(s->reconnect, INT_MAX, INT_MAX);
>>       }
>>   -    if (!stream_or_pstream_needs_probes(name)) {
>> +    if (stream_or_pstream_needs_probes(name) < 1) {
>>           reconnect_set_probe_interval(s->reconnect, 0);
>> +        s->probe_interval = 0;
>>       }
>>         return s;
>> @@ -879,6 +882,12 @@ jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
>>       s->stream = NULL;
>>       s->pstream = NULL;
>>       s->seqno = 1;
>> +    s->probe_interval = reconnect_get_probe_interval(s->reconnect);
>> +
>> +    if (stream_or_pstream_needs_probes(reconnect_get_name(s->reconnect)) < 1) {
>> +        reconnect_set_probe_interval(s->reconnect, 0);
>> +        s->probe_interval = 0;
>> +    }
>>         return s;
>>   }
>> @@ -934,6 +943,12 @@ jsonrpc_session_connect(struct jsonrpc_session *s)
>>           error = jsonrpc_stream_open(name, &s->stream, s->dscp);
>>           if (!error) {
>>               reconnect_connecting(s->reconnect, time_msec());
>> +            if (stream_set_probe_interval(s->stream, s->probe_interval)) {
>> +                /* we have delegated probing to the stream layer */
>> +                reconnect_set_probe_interval(s->reconnect, 0);
>> +            } else {
>> +                reconnect_set_probe_interval(s->reconnect, s->probe_interval);
>> +            }
>>           } else {
>>               s->last_error = error;
>>           }
>> @@ -967,6 +982,12 @@ jsonrpc_session_run(struct jsonrpc_session *s)
>>                   jsonrpc_session_disconnect(s);
>>               }
>>               reconnect_connected(s->reconnect, time_msec());
>> +            if (stream_set_probe_interval(stream, s->probe_interval)) {
>> +                /* we have delegated probing to the stream layer */
>> +                reconnect_set_probe_interval(s->reconnect, 0);
>> +            } else {
>> +                reconnect_set_probe_interval(s->reconnect, s->probe_interval);
>> +            }
>>               s->rpc = jsonrpc_open(stream);
>>               s->seqno++;
>>           } else if (error != EAGAIN) {
>> @@ -1008,6 +1029,12 @@ jsonrpc_session_run(struct jsonrpc_session *s)
>>           if (!error) {
>>               reconnect_connected(s->reconnect, time_msec());
>>               s->rpc = jsonrpc_open(s->stream);
>> +            if (stream_set_probe_interval(s->stream, s->probe_interval)) {
>> +                /* we have delegated probing to the stream layer */
>> +                reconnect_set_probe_interval(s->reconnect, 0);
>> +            } else {
>> +                reconnect_set_probe_interval(s->reconnect, s->probe_interval);
>> +            }
>>               s->stream = NULL;
>>               s->seqno++;
>>           } else if (error != EAGAIN) {
>> @@ -1231,7 +1258,12 @@ void
>>   jsonrpc_session_set_probe_interval(struct jsonrpc_session *s,
>>                                      int probe_interval)
>>   {
>> -    reconnect_set_probe_interval(s->reconnect, probe_interval);
>> +    s->probe_interval = probe_interval;
>> +    if (s->stream && s->probe_interval) {
>> +        if (!stream_set_probe_interval(s->stream, probe_interval)) {
>> +           reconnect_set_probe_interval(s->reconnect, probe_interval);
>
> I might be missing something, but the logic seems a bit off here.
>
> 1) If probe_interval is 0, then this doesn't alter probe intervals at all. The comment above the function says that a 0 value should disable probe intervals.
> 2) If s->stream is NULL, then this won't alter s->reconnect's probe_interval at all. This could be an issue if the intent was to lengthen the probe interval or disable it temporarily during downtime.
> 3) In the case where stream_set_probe_interval succeeds, shouldn't we call reconnect_set_probe_interval(s->reconnect, 0)?
>
>> +        }
>> +    }
>>   }
>>     /* Sets the DSCP value used for 's''s connection to 'dscp'. If this is
>> diff --git a/lib/rconn.c b/lib/rconn.c
>> index a96b2eb8b..16e7a44d4 100644
>> --- a/lib/rconn.c
>> +++ b/lib/rconn.c
>> @@ -346,7 +346,7 @@ rconn_connect(struct rconn *rc, const char *target, const char *name)
>>       rconn_disconnect__(rc);
>>       rconn_set_target__(rc, target, name);
>>       rc->reliable = true;
>> -    if (!stream_or_pstream_needs_probes(target)) {
>> +    if (stream_or_pstream_needs_probes(target) < 1) {
>>           rc->probe_interval = 0;
>>       }
>>       reconnect(rc);
>> diff --git a/lib/socket-util.c b/lib/socket-util.c
>> index 4f1ffecf5..d6d1967f6 100644
>> --- a/lib/socket-util.c
>> +++ b/lib/socket-util.c
>> @@ -114,6 +114,50 @@ setsockopt_tcp_nodelay(int fd)
>>       }
>>   }
>>   +#ifdef HAS_KERNEL_KEEPALIVES
>> +bool tcp_set_probe_interval(int fd, int probe_interval) {
>
> Is there any way to attempt to make this function atomic? For example, we could manage to turn on keepalives, successfully set TCP_KEEPCNT, but then fail to set TCP_KEEPIDLE. This would result in a false return, which callers currently interpret to mean a complete failure to set keepalives. However, in actuality they've been enabled but just not with the configured probe_interval. This could lead to some odd debugging scenarios.
>
> Also, there should be a special case in this function where if probe_interval is 0, it disables keepalives. I have no idea what happens if you turn on keepalives but set TCP_KEEPIDLE and TCP_KEEPINTVL to 0.
>
>> +    int on = 1;
>> +    int retval;
>> +    int value;
>> +
>> +    on = 1;
>> +    retval = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof on);
>> +    if (retval) {
>> +        retval = sock_errno();
>> +        VLOG_DBG("setsockopt(SO_KEEPALIVE): %s", sock_strerror(retval));
>> +        return false;
>> +    }
>> +    value = 2;
>> +    retval = setsockopt(fd,
>> +            IPPROTO_TCP, TCP_KEEPCNT, &value, sizeof value);
>> +    if (retval) {
>> +        retval = sock_errno();
>> +        VLOG_DBG("setsockopt(TCP_KEEPCNT): %s", sock_strerror(retval));
>> +        return false;
>> +    }
>> +    value = probe_interval;
>> +    retval = setsockopt(fd,
>> +            IPPROTO_TCP, TCP_KEEPIDLE, &value, sizeof value);
>> +    if (retval) {
>> +        retval = sock_errno();
>> +        VLOG_DBG("setsockopt(TCP_KEEPIDLE): %s", sock_strerror(retval));
>> +        return false;
>> +    }
>> +    value = probe_interval;
>> +    retval = setsockopt(fd,
>> +            IPPROTO_TCP, TCP_KEEPINTVL, &value, sizeof value);
>> +    if (retval) {
>> +        retval = sock_errno();
>> +        VLOG_DBG("setsockopt(SO_KEEPALIVE): %s", sock_strerror(retval));
>> +        return false;
>> +    }
>> +    return true;
>> +#else
>> +bool tcp_set_probe_interval(int fd OVS_UNUSED, int probe_interval OVS_UNUSED) {
>> +    return false;
>> +#endif
>> +}
>> +
>>   /* Sets the DSCP value of socket 'fd' to 'dscp', which must be 63 or less.
>>    * 'family' must indicate the socket's address family (AF_INET or AF_INET6, to
>>    * do anything useful). */
>> diff --git a/lib/socket-util.h b/lib/socket-util.h
>> index 9ccb7d4cc..2439e2f78 100644
>> --- a/lib/socket-util.h
>> +++ b/lib/socket-util.h
>> @@ -27,6 +27,7 @@
>>   #include "openvswitch/types.h"
>>   #include <netinet/in_systm.h>
>>   #include <netinet/ip.h>
>> +#include <netinet/tcp.h>
>>     struct ds;
>>   @@ -181,4 +182,11 @@ static inline int sock_errno(void)
>>   #endif
>>   }
>>   +#if defined (SO_KEEPALIVE) && defined (TCP_KEEPCNT) && \
>> +    defined (TCP_KEEPIDLE) && defined (TCP_KEEPINTVL)
>> +#define HAS_KERNEL_KEEPALIVES 1
>> +#endif
>> +
>> +bool tcp_set_probe_interval(int fd, int probe_interval);
>> +
>>   #endif /* socket-util.h */
>> diff --git a/lib/stream-fd.c b/lib/stream-fd.c
>> index 46ee7ae27..30622929b 100644
>> --- a/lib/stream-fd.c
>> +++ b/lib/stream-fd.c
>> @@ -162,6 +162,14 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
>>       }
>>   }
>>   +static bool fd_set_probe_interval(struct stream *stream, int probe_interval) {
>> +    struct stream_fd *sf = stream_fd_cast(stream);
>> +
>> +    return tcp_set_probe_interval(sf->fd, probe_interval);
>> +}
>> +
>> +
>> +
>>   static const struct stream_class stream_fd_class = {
>>       "fd",                       /* name */
>>       false,                      /* needs_probes */
>> @@ -173,6 +181,7 @@ static const struct stream_class stream_fd_class = {
>>       NULL,                       /* run */
>>       NULL,                       /* run_wait */
>>       fd_wait,                    /* wait */
>> +    fd_set_probe_interval,      /* set_probe_interval */
>>   };
>>   
>>   /* Passive file descriptor stream. */
>> diff --git a/lib/stream-provider.h b/lib/stream-provider.h
>> index 75f4f059b..6c28cb50b 100644
>> --- a/lib/stream-provider.h
>> +++ b/lib/stream-provider.h
>> @@ -124,6 +124,12 @@ struct stream_class {
>>       /* Arranges for the poll loop to wake up when 'stream' is ready to take an
>>        * action of the given 'type'. */
>>       void (*wait)(struct stream *stream, enum stream_wait_type type);
>> +    /* Sets low level keepalives if supported
>> +     *
>> +     *     If successful returns true
>> +     *
>> +     */
>> +    bool (*set_probe_interval)(struct stream *stream, int probe_interval);
>>   };
>>   
>>   /* Passive listener for incoming stream connections.
>> diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
>> index 078fcbc3a..575c55f5b 100644
>> --- a/lib/stream-ssl.c
>> +++ b/lib/stream-ssl.c
>> @@ -789,6 +789,13 @@ ssl_run(struct stream *stream)
>>       }
>>   }
>>   +static bool ssl_set_probe_interval(struct stream *stream, int probe_interval) {
>> +    struct ssl_stream *sslv = ssl_stream_cast(stream);
>> +
>> +    return tcp_set_probe_interval(sslv->fd, probe_interval);
>> +}
>> +
>> +
>>   static void
>>   ssl_run_wait(struct stream *stream)
>>   {
>> @@ -861,6 +868,7 @@ const struct stream_class ssl_stream_class = {
>>       ssl_run,                    /* run */
>>       ssl_run_wait,               /* run_wait */
>>       ssl_wait,                   /* wait */
>> +    ssl_set_probe_interval,     /* set_probe_interval */
>>   };
>>   
>>   /* Passive SSL. */
>> diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c
>> index e8dc2bfaa..67c912105 100644
>> --- a/lib/stream-tcp.c
>> +++ b/lib/stream-tcp.c
>> @@ -73,6 +73,7 @@ const struct stream_class tcp_stream_class = {
>>       NULL,                       /* run */
>>       NULL,                       /* run_wait */
>>       NULL,                       /* wait */
>> +    NULL,
>>   };
>>   
>>   /* Passive TCP. */
>> diff --git a/lib/stream-unix.c b/lib/stream-unix.c
>> index d265efb83..2322df955 100644
>> --- a/lib/stream-unix.c
>> +++ b/lib/stream-unix.c
>> @@ -73,6 +73,7 @@ const struct stream_class unix_stream_class = {
>>       NULL,                       /* run */
>>       NULL,                       /* run_wait */
>>       NULL,                       /* wait */
>> +    NULL,
>>   };
>>   
>>   /* Passive UNIX socket. */
>> diff --git a/lib/stream-windows.c b/lib/stream-windows.c
>> index 5c4c55e5d..836112f75 100644
>> --- a/lib/stream-windows.c
>> +++ b/lib/stream-windows.c
>> @@ -374,6 +374,7 @@ const struct stream_class windows_stream_class = {
>>       NULL,                       /* run */
>>       NULL,                       /* run_wait */
>>       windows_wait,               /* wait */
>> +    NULL,
>>   };
>>     struct pwindows_pstream
>> diff --git a/lib/stream.c b/lib/stream.c
>> index e246b3773..f0b16fb4f 100644
>> --- a/lib/stream.c
>> +++ b/lib/stream.c
>> @@ -430,6 +430,19 @@ stream_wait(struct stream *stream, enum stream_wait_type wait)
>>       (stream->class->wait)(stream, wait);
>>   }
>>   +
>> +bool stream_set_probe_interval(struct stream *stream, int probe_interval) {
>> +    if (! stream->class->needs_probes) {
>> +        return true;
>> +    }
>> +    if (probe_interval && stream->class->set_probe_interval) {
>> +        return (stream->class->set_probe_interval)(
>> +                stream, probe_interval / 1000);
>> +    }
>> +    return false;
>> +}
>> +
>> +
>>   void
>>   stream_connect_wait(struct stream *stream)
>>   {
>> @@ -509,9 +522,9 @@ stream_or_pstream_needs_probes(const char *name)
>>       const struct stream_class *class;
>>         if (!stream_lookup_class(name, &class)) {
>> -        return class->needs_probes;
>> +        return class->needs_probes ? 1 : 0;
>>       } else if (!pstream_lookup_class(name, &pclass)) {
>> -        return pclass->needs_probes;
>> +        return pclass->needs_probes ? 1 : 0;
>>       } else {
>>           return -1;
>>       }
>> diff --git a/lib/stream.h b/lib/stream.h
>> index 77bffa498..e343f75a5 100644
>> --- a/lib/stream.h
>> +++ b/lib/stream.h
>> @@ -40,6 +40,7 @@ const char *stream_get_name(const struct stream *);
>>   int stream_connect(struct stream *);
>>   int stream_recv(struct stream *, void *buffer, size_t n);
>>   int stream_send(struct stream *, const void *buffer, size_t n);
>> +bool stream_set_probe_interval(struct stream *, int probe_interval);
>>     void stream_run(struct stream *);
>>   void stream_run_wait(struct stream *);
>> diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
>> index 4e2dfc3d7..fe8ffc317 100644
>> --- a/ovsdb/jsonrpc-server.c
>> +++ b/ovsdb/jsonrpc-server.c
>> @@ -212,7 +212,7 @@ ovsdb_jsonrpc_default_options(const char *target)
>>   {
>>       struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
>>       options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
>> -    options->probe_interval = (stream_or_pstream_needs_probes(target)
>> +    options->probe_interval = (stream_or_pstream_needs_probes(target) < 1
>>                                  ? RECONNECT_DEFAULT_PROBE_INTERVAL
>>                                  : 0);
>>       return options;
>>
>
>
diff mbox series

Patch

diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index ed748dbde..15e8d3527 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -787,6 +787,7 @@  struct jsonrpc_session {
     int last_error;
     unsigned int seqno;
     uint8_t dscp;
+    int probe_interval;
 };
 
 static void
@@ -839,6 +840,7 @@  jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
     s->seqno = 0;
     s->dscp = 0;
     s->last_error = 0;
+    s->probe_interval = reconnect_get_probe_interval(s->reconnect);
 
     const char *name = reconnect_get_name(s->reconnect);
     if (!pstream_verify_name(name)) {
@@ -848,8 +850,9 @@  jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
         reconnect_set_backoff(s->reconnect, INT_MAX, INT_MAX);
     }
 
-    if (!stream_or_pstream_needs_probes(name)) {
+    if (stream_or_pstream_needs_probes(name) < 1) {
         reconnect_set_probe_interval(s->reconnect, 0);
+        s->probe_interval = 0;
     }
 
     return s;
@@ -879,6 +882,12 @@  jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
     s->stream = NULL;
     s->pstream = NULL;
     s->seqno = 1;
+    s->probe_interval = reconnect_get_probe_interval(s->reconnect);
+
+    if (stream_or_pstream_needs_probes(reconnect_get_name(s->reconnect)) < 1) {
+        reconnect_set_probe_interval(s->reconnect, 0);
+        s->probe_interval = 0;
+    }
 
     return s;
 }
@@ -934,6 +943,12 @@  jsonrpc_session_connect(struct jsonrpc_session *s)
         error = jsonrpc_stream_open(name, &s->stream, s->dscp);
         if (!error) {
             reconnect_connecting(s->reconnect, time_msec());
+            if (stream_set_probe_interval(s->stream, s->probe_interval)) {
+                /* we have delegated probing to the stream layer */
+                reconnect_set_probe_interval(s->reconnect, 0);
+            } else {
+                reconnect_set_probe_interval(s->reconnect, s->probe_interval);
+            }
         } else {
             s->last_error = error;
         }
@@ -967,6 +982,12 @@  jsonrpc_session_run(struct jsonrpc_session *s)
                 jsonrpc_session_disconnect(s);
             }
             reconnect_connected(s->reconnect, time_msec());
+            if (stream_set_probe_interval(stream, s->probe_interval)) {
+                /* we have delegated probing to the stream layer */
+                reconnect_set_probe_interval(s->reconnect, 0);
+            } else {
+                reconnect_set_probe_interval(s->reconnect, s->probe_interval);
+            }
             s->rpc = jsonrpc_open(stream);
             s->seqno++;
         } else if (error != EAGAIN) {
@@ -1008,6 +1029,12 @@  jsonrpc_session_run(struct jsonrpc_session *s)
         if (!error) {
             reconnect_connected(s->reconnect, time_msec());
             s->rpc = jsonrpc_open(s->stream);
+            if (stream_set_probe_interval(s->stream, s->probe_interval)) {
+                /* we have delegated probing to the stream layer */
+                reconnect_set_probe_interval(s->reconnect, 0);
+            } else {
+                reconnect_set_probe_interval(s->reconnect, s->probe_interval);
+            }
             s->stream = NULL;
             s->seqno++;
         } else if (error != EAGAIN) {
@@ -1231,7 +1258,12 @@  void
 jsonrpc_session_set_probe_interval(struct jsonrpc_session *s,
                                    int probe_interval)
 {
-    reconnect_set_probe_interval(s->reconnect, probe_interval);
+    s->probe_interval = probe_interval;
+    if (s->stream && s->probe_interval) {
+        if (!stream_set_probe_interval(s->stream, probe_interval)) {
+           reconnect_set_probe_interval(s->reconnect, probe_interval);
+        }
+    }
 }
 
 /* Sets the DSCP value used for 's''s connection to 'dscp'.  If this is
diff --git a/lib/rconn.c b/lib/rconn.c
index a96b2eb8b..16e7a44d4 100644
--- a/lib/rconn.c
+++ b/lib/rconn.c
@@ -346,7 +346,7 @@  rconn_connect(struct rconn *rc, const char *target, const char *name)
     rconn_disconnect__(rc);
     rconn_set_target__(rc, target, name);
     rc->reliable = true;
-    if (!stream_or_pstream_needs_probes(target)) {
+    if (stream_or_pstream_needs_probes(target) < 1) {
         rc->probe_interval = 0;
     }
     reconnect(rc);
diff --git a/lib/socket-util.c b/lib/socket-util.c
index 4f1ffecf5..d6d1967f6 100644
--- a/lib/socket-util.c
+++ b/lib/socket-util.c
@@ -114,6 +114,50 @@  setsockopt_tcp_nodelay(int fd)
     }
 }
 
+#ifdef HAS_KERNEL_KEEPALIVES
+bool tcp_set_probe_interval(int fd, int probe_interval) {
+    int on = 1;
+    int retval;
+    int value;
+
+    on = 1;
+    retval = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof on);
+    if (retval) {
+        retval = sock_errno();
+        VLOG_DBG("setsockopt(SO_KEEPALIVE): %s", sock_strerror(retval));
+        return false;
+    }
+    value = 2;
+    retval = setsockopt(fd,
+            IPPROTO_TCP, TCP_KEEPCNT, &value, sizeof value);
+    if (retval) {
+        retval = sock_errno();
+        VLOG_DBG("setsockopt(TCP_KEEPCNT): %s", sock_strerror(retval));
+        return false;
+    }
+    value = probe_interval;
+    retval = setsockopt(fd,
+            IPPROTO_TCP, TCP_KEEPIDLE, &value, sizeof value);
+    if (retval) {
+        retval = sock_errno();
+        VLOG_DBG("setsockopt(TCP_KEEPIDLE): %s", sock_strerror(retval));
+        return false;
+    }
+    value = probe_interval;
+    retval = setsockopt(fd,
+            IPPROTO_TCP, TCP_KEEPINTVL, &value, sizeof value);
+    if (retval) {
+        retval = sock_errno();
+        VLOG_DBG("setsockopt(SO_KEEPALIVE): %s", sock_strerror(retval));
+        return false;
+    }
+    return true;
+#else
+bool tcp_set_probe_interval(int fd OVS_UNUSED, int probe_interval OVS_UNUSED) {
+    return false;
+#endif
+}
+
 /* Sets the DSCP value of socket 'fd' to 'dscp', which must be 63 or less.
  * 'family' must indicate the socket's address family (AF_INET or AF_INET6, to
  * do anything useful). */
diff --git a/lib/socket-util.h b/lib/socket-util.h
index 9ccb7d4cc..2439e2f78 100644
--- a/lib/socket-util.h
+++ b/lib/socket-util.h
@@ -27,6 +27,7 @@ 
 #include "openvswitch/types.h"
 #include <netinet/in_systm.h>
 #include <netinet/ip.h>
+#include <netinet/tcp.h>
 
 struct ds;
 
@@ -181,4 +182,11 @@  static inline int sock_errno(void)
 #endif
 }
 
+#if defined (SO_KEEPALIVE) && defined (TCP_KEEPCNT) && \
+    defined (TCP_KEEPIDLE) && defined (TCP_KEEPINTVL)
+#define HAS_KERNEL_KEEPALIVES 1
+#endif
+
+bool tcp_set_probe_interval(int fd, int probe_interval);
+
 #endif /* socket-util.h */
diff --git a/lib/stream-fd.c b/lib/stream-fd.c
index 46ee7ae27..30622929b 100644
--- a/lib/stream-fd.c
+++ b/lib/stream-fd.c
@@ -162,6 +162,14 @@  fd_wait(struct stream *stream, enum stream_wait_type wait)
     }
 }
 
+static bool fd_set_probe_interval(struct stream *stream, int probe_interval) {
+    struct stream_fd *sf = stream_fd_cast(stream);
+
+    return tcp_set_probe_interval(sf->fd, probe_interval);
+}
+
+
+
 static const struct stream_class stream_fd_class = {
     "fd",                       /* name */
     false,                      /* needs_probes */
@@ -173,6 +181,7 @@  static const struct stream_class stream_fd_class = {
     NULL,                       /* run */
     NULL,                       /* run_wait */
     fd_wait,                    /* wait */
+    fd_set_probe_interval,      /* set_probe_interval */
 };
 
 /* Passive file descriptor stream. */
diff --git a/lib/stream-provider.h b/lib/stream-provider.h
index 75f4f059b..6c28cb50b 100644
--- a/lib/stream-provider.h
+++ b/lib/stream-provider.h
@@ -124,6 +124,12 @@  struct stream_class {
     /* Arranges for the poll loop to wake up when 'stream' is ready to take an
      * action of the given 'type'. */
     void (*wait)(struct stream *stream, enum stream_wait_type type);
+    /* Sets low level keepalives if supported
+     *
+     *     If successful returns true
+     *
+     */
+    bool (*set_probe_interval)(struct stream *stream, int probe_interval);
 };
 
 /* Passive listener for incoming stream connections.
diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
index 078fcbc3a..575c55f5b 100644
--- a/lib/stream-ssl.c
+++ b/lib/stream-ssl.c
@@ -789,6 +789,13 @@  ssl_run(struct stream *stream)
     }
 }
 
+static bool ssl_set_probe_interval(struct stream *stream, int probe_interval) {
+    struct ssl_stream *sslv = ssl_stream_cast(stream);
+
+    return tcp_set_probe_interval(sslv->fd, probe_interval);
+}
+
+
 static void
 ssl_run_wait(struct stream *stream)
 {
@@ -861,6 +868,7 @@  const struct stream_class ssl_stream_class = {
     ssl_run,                    /* run */
     ssl_run_wait,               /* run_wait */
     ssl_wait,                   /* wait */
+    ssl_set_probe_interval,     /* set_probe_interval */
 };
 
 /* Passive SSL. */
diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c
index e8dc2bfaa..67c912105 100644
--- a/lib/stream-tcp.c
+++ b/lib/stream-tcp.c
@@ -73,6 +73,7 @@  const struct stream_class tcp_stream_class = {
     NULL,                       /* run */
     NULL,                       /* run_wait */
     NULL,                       /* wait */
+    NULL,
 };
 
 /* Passive TCP. */
diff --git a/lib/stream-unix.c b/lib/stream-unix.c
index d265efb83..2322df955 100644
--- a/lib/stream-unix.c
+++ b/lib/stream-unix.c
@@ -73,6 +73,7 @@  const struct stream_class unix_stream_class = {
     NULL,                       /* run */
     NULL,                       /* run_wait */
     NULL,                       /* wait */
+    NULL,
 };
 
 /* Passive UNIX socket. */
diff --git a/lib/stream-windows.c b/lib/stream-windows.c
index 5c4c55e5d..836112f75 100644
--- a/lib/stream-windows.c
+++ b/lib/stream-windows.c
@@ -374,6 +374,7 @@  const struct stream_class windows_stream_class = {
     NULL,                       /* run */
     NULL,                       /* run_wait */
     windows_wait,               /* wait */
+    NULL,
 };
 
 struct pwindows_pstream
diff --git a/lib/stream.c b/lib/stream.c
index e246b3773..f0b16fb4f 100644
--- a/lib/stream.c
+++ b/lib/stream.c
@@ -430,6 +430,19 @@  stream_wait(struct stream *stream, enum stream_wait_type wait)
     (stream->class->wait)(stream, wait);
 }
 
+
+bool stream_set_probe_interval(struct stream *stream, int probe_interval) {
+    if (! stream->class->needs_probes) {
+        return true;
+    }
+    if (probe_interval && stream->class->set_probe_interval) {
+        return (stream->class->set_probe_interval)(
+                stream, probe_interval / 1000);
+    }
+    return false;
+}
+
+
 void
 stream_connect_wait(struct stream *stream)
 {
@@ -509,9 +522,9 @@  stream_or_pstream_needs_probes(const char *name)
     const struct stream_class *class;
 
     if (!stream_lookup_class(name, &class)) {
-        return class->needs_probes;
+        return class->needs_probes ? 1 : 0;
     } else if (!pstream_lookup_class(name, &pclass)) {
-        return pclass->needs_probes;
+        return pclass->needs_probes ? 1 : 0;
     } else {
         return -1;
     }
diff --git a/lib/stream.h b/lib/stream.h
index 77bffa498..e343f75a5 100644
--- a/lib/stream.h
+++ b/lib/stream.h
@@ -40,6 +40,7 @@  const char *stream_get_name(const struct stream *);
 int stream_connect(struct stream *);
 int stream_recv(struct stream *, void *buffer, size_t n);
 int stream_send(struct stream *, const void *buffer, size_t n);
+bool stream_set_probe_interval(struct stream *, int probe_interval);
 
 void stream_run(struct stream *);
 void stream_run_wait(struct stream *);
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 4e2dfc3d7..fe8ffc317 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -212,7 +212,7 @@  ovsdb_jsonrpc_default_options(const char *target)
 {
     struct ovsdb_jsonrpc_options *options = xzalloc(sizeof *options);
     options->max_backoff = RECONNECT_DEFAULT_MAX_BACKOFF;
-    options->probe_interval = (stream_or_pstream_needs_probes(target)
+    options->probe_interval = (stream_or_pstream_needs_probes(target) < 1
                                ? RECONNECT_DEFAULT_PROBE_INTERVAL
                                : 0);
     return options;