[ovs-dev,v4,1/2] Add file descriptor persistence where possible
diff mbox series

Message ID 20200226152418.20338-1-anton.ivanov@cambridgegreys.com
State New
Headers show
Series
  • [ovs-dev,v4,1/2] Add file descriptor persistence where possible
Related show

Commit Message

Anton Ivanov Feb. 26, 2020, 3:24 p.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

1. Adds "persistent" behaviour where feasible streams.
These are waited upon in the same thread where they are created. This
allows them to be registered persistently with the OS (if possible)
as well as the OS to provide hints - is the FD ready, is it closed,
etc.

2. Removes unnecessary attempts to perform a read vs EAGAIN on a fd
which is not ready if the thread has a persistent poll loop.

3. Enables fd persistence for ovsdb main server thread only

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 include/openvswitch/poll-loop.h |  33 ++++++
 lib/poll-loop.c                 | 173 ++++++++++++++++++++++++++++++--
 lib/stream-fd.c                 |  63 +++++++++++-
 lib/stream-provider.h           |   4 +
 lib/stream-ssl.c                |  78 +++++++++++++-
 lib/stream-windows.c            |   2 +
 ovsdb/ovsdb-server.c            |   1 +
 tests/ovsdb-cluster.at          |   6 +-
 8 files changed, 340 insertions(+), 20 deletions(-)

Comments

0-day Robot Feb. 26, 2020, 4:09 p.m. UTC | #1
Bleep bloop.  Greetings Anton Ivanov, I am a robot and I have tried out your patch.
Thanks for your contribution.

I encountered some error that I wasn't expecting.  See the details below.


checkpatch:
WARNING: Line has trailing whitespace
#50 FILE: include/openvswitch/poll-loop.h:54:
 * containing the events passed by the OS in .revents. 

WARNING: Line is 86 characters long (recommended limit is 79)
#54 FILE: include/openvswitch/poll-loop.h:58:
bool poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where);

WARNING: Line is 100 characters long (recommended limit is 79)
#55 FILE: include/openvswitch/poll-loop.h:59:
#define poll_fd_register(fd, events, hint) poll_fd_register_at(fd, events, hint, OVS_SOURCE_LOCATOR)

WARNING: Line is 96 characters long (recommended limit is 79)
#77 FILE: include/openvswitch/poll-loop.h:87:
#define private_poll_fd_wait(fd, events) private_poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)

WARNING: Line is 83 characters long (recommended limit is 79)
#102 FILE: lib/poll-loop.c:46:
    bool valid;                 /* Marked invalid if we got a HUP/NVAL from poll */

WARNING: Line is 82 characters long (recommended limit is 79)
#149 FILE: lib/poll-loop.c:166:
poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where) {

WARNING: Line has trailing whitespace
#158 FILE: lib/poll-loop.c:175:
    } 

WARNING: Line is 101 characters long (recommended limit is 79)
#262 FILE: lib/poll-loop.c:492:
                VLOG_FATAL("poll: persistence state corrupted, no hash entry for %d", pollfds[i].fd);

WARNING: Line is 91 characters long (recommended limit is 79)
#273 FILE: lib/poll-loop.c:503:
            * Note - "private" fds always want POLLIN - that emulates EPOLL, /dev/poll, etc

WARNING: Line is 87 characters long (recommended limit is 79)
#274 FILE: lib/poll-loop.c:504:
            * behaviour which they should be using in real life instead of using poll()

WARNING: Line is 85 characters long (recommended limit is 79)
#277 FILE: lib/poll-loop.c:507:
            /* update "occured" events for use by streams and handlers. In case there

WARNING: Line is 83 characters long (recommended limit is 79)
#278 FILE: lib/poll-loop.c:508:
             * is an existing (but not consumed yet) event, we OR the events in the

WARNING: Line is 85 characters long (recommended limit is 79)
#279 FILE: lib/poll-loop.c:509:
             * stored record with the new ones - it is the job of the stream to clear

WARNING: Line is 93 characters long (recommended limit is 79)
#366 FILE: lib/stream-fd.c:114:
        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip straight

WARNING: Line is 89 characters long (recommended limit is 79)
#367 FILE: lib/stream-fd.c:115:
         * to the read which should return 0 if the HUP is a real one, if not we clear it

WARNING: Line is 87 characters long (recommended limit is 79)
#370 FILE: lib/stream-fd.c:118:
        if ((!(stream->hint->revents & (POLLHUP | POLLNVAL))) && (!stream->rx_ready)) {

WARNING: Line is 93 characters long (recommended limit is 79)
#527 FILE: lib/stream-ssl.c:706:
        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip straight

WARNING: Line is 89 characters long (recommended limit is 79)
#528 FILE: lib/stream-ssl.c:707:
         * to the read which should return 0 if the HUP is a real one, if not we clear it

WARNING: Line is 98 characters long (recommended limit is 79)
#531 FILE: lib/stream-ssl.c:710:
        if ((!(stream->hint->revents & (POLLHUP | POLLNVAL))) && (sslv->rx_want == SSL_READING)) {

WARNING: Line has trailing whitespace
#535 FILE: lib/stream-ssl.c:714:
                /* POLLIN event from poll loop, mark us as ready 

WARNING: Line is 83 characters long (recommended limit is 79)
#626 FILE: lib/stream-ssl.c:889:
                private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));

Lines checked: 729, Warnings: 21, Errors: 0


Please check this out.  If you feel there has been an error, please email aconole@redhat.com

Thanks,
0-day Robot
Ben Pfaff March 6, 2020, 9:38 p.m. UTC | #2
On Wed, Feb 26, 2020 at 03:24:17PM +0000, anton.ivanov@cambridgegreys.com wrote:
> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> 
> 1. Adds "persistent" behaviour where feasible streams.
> These are waited upon in the same thread where they are created. This
> allows them to be registered persistently with the OS (if possible)
> as well as the OS to provide hints - is the FD ready, is it closed,
> etc.
> 
> 2. Removes unnecessary attempts to perform a read vs EAGAIN on a fd
> which is not ready if the thread has a persistent poll loop.
> 
> 3. Enables fd persistence for ovsdb main server thread only
> 
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>

Thanks for the patches.

These have lots and lots of coding style violations, as reported by the
checkpatch robot.  Would you mind fixing those up and resubmitting a v5?

Thanks,

Ben.
Anton Ivanov March 6, 2020, 10:36 p.m. UTC | #3
I have a cleaner version in the works which should be ready some times 
next week.

I will try to release the next version ~ Tue/Wed along with some level 
of async IO as well as reduction of copying in the json rpc TX path.

A.

On 06/03/2020 21:38, Ben Pfaff wrote:
> On Wed, Feb 26, 2020 at 03:24:17PM +0000, anton.ivanov@cambridgegreys.com wrote:
>> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>>
>> 1. Adds "persistent" behaviour where feasible streams.
>> These are waited upon in the same thread where they are created. This
>> allows them to be registered persistently with the OS (if possible)
>> as well as the OS to provide hints - is the FD ready, is it closed,
>> etc.
>>
>> 2. Removes unnecessary attempts to perform a read vs EAGAIN on a fd
>> which is not ready if the thread has a persistent poll loop.
>>
>> 3. Enables fd persistence for ovsdb main server thread only
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> Thanks for the patches.
>
> These have lots and lots of coding style violations, as reported by the
> checkpatch robot.  Would you mind fixing those up and resubmitting a v5?
>
> Thanks,
>
> Ben.
>
Ben Pfaff March 6, 2020, 10:42 p.m. UTC | #4
Sounds good.

On Fri, Mar 06, 2020 at 10:36:28PM +0000, Anton Ivanov wrote:
> I have a cleaner version in the works which should be ready some times next
> week.
> 
> I will try to release the next version ~ Tue/Wed along with some level of
> async IO as well as reduction of copying in the json rpc TX path.
> 
> A.
> 
> On 06/03/2020 21:38, Ben Pfaff wrote:
> > On Wed, Feb 26, 2020 at 03:24:17PM +0000, anton.ivanov@cambridgegreys.com wrote:
> > > From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> > > 
> > > 1. Adds "persistent" behaviour where feasible streams.
> > > These are waited upon in the same thread where they are created. This
> > > allows them to be registered persistently with the OS (if possible)
> > > as well as the OS to provide hints - is the FD ready, is it closed,
> > > etc.
> > > 
> > > 2. Removes unnecessary attempts to perform a read vs EAGAIN on a fd
> > > which is not ready if the thread has a persistent poll loop.
> > > 
> > > 3. Enables fd persistence for ovsdb main server thread only
> > > 
> > > Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> > Thanks for the patches.
> > 
> > These have lots and lots of coding style violations, as reported by the
> > checkpatch robot.  Would you mind fixing those up and resubmitting a v5?
> > 
> > Thanks,
> > 
> > Ben.
> > 
> 
> -- 
> Anton R. Ivanov
> Cambridgegreys Limited. Registered in England. Company Number 10273661
> https://www.cambridgegreys.com/
>
Aaron Conole March 12, 2020, 6:49 p.m. UTC | #5
anton.ivanov@cambridgegreys.com writes:

> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>
> 1. Adds "persistent" behaviour where feasible streams.
> These are waited upon in the same thread where they are created. This
> allows them to be registered persistently with the OS (if possible)
> as well as the OS to provide hints - is the FD ready, is it closed,
> etc.
>
> 2. Removes unnecessary attempts to perform a read vs EAGAIN on a fd
> which is not ready if the thread has a persistent poll loop.
>
> 3. Enables fd persistence for ovsdb main server thread only
>
> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> ---

Hi Anton,

Just a really cursory review.  The robot flagged a bunch of things to
address.

>  include/openvswitch/poll-loop.h |  33 ++++++
>  lib/poll-loop.c                 | 173 ++++++++++++++++++++++++++++++--
>  lib/stream-fd.c                 |  63 +++++++++++-
>  lib/stream-provider.h           |   4 +
>  lib/stream-ssl.c                |  78 +++++++++++++-
>  lib/stream-windows.c            |   2 +
>  ovsdb/ovsdb-server.c            |   1 +
>  tests/ovsdb-cluster.at          |   6 +-
>  8 files changed, 340 insertions(+), 20 deletions(-)
>
> diff --git a/include/openvswitch/poll-loop.h b/include/openvswitch/poll-loop.h
> index 532640497..50d49a8ed 100644
> --- a/include/openvswitch/poll-loop.h
> +++ b/include/openvswitch/poll-loop.h
> @@ -33,6 +33,8 @@
>  #ifndef POLL_LOOP_H
>  #define POLL_LOOP_H 1
>  
> +#include <stdbool.h>
> +
>  #ifndef _WIN32
>  #include <poll.h>
>  #endif
> @@ -45,6 +47,24 @@
>  extern "C" {
>  #endif
>  
> +/* Register a fd with a persistence framework if available so it can be served
> + * "faster" and the caller can be provided with "hints" on what caused the IO
> + * event.
> + * If the "hint" argument is supplied it set to point to the pollfd structure
> + * containing the events passed by the OS in .revents. 
> + * returns false if persistence is not enabled.
> + */
> +
> +bool poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where);
> +#define poll_fd_register(fd, events, hint) poll_fd_register_at(fd, events, hint, OVS_SOURCE_LOCATOR)
> +
> +/* De-register a fd which was registered as "private" with the persistence
> + * framework
> + */
> +
> +void poll_fd_deregister_at(int fd, const char *where);
> +#define poll_fd_deregister(fd) poll_fd_deregister_at(fd, OVS_SOURCE_LOCATOR)
> +
>  
>  /* Schedule events to wake up the following poll_block().
>   *
> @@ -58,6 +78,15 @@ extern "C" {
>  void poll_fd_wait_at(int fd, short int events, const char *where);
>  #define poll_fd_wait(fd, events) poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
>  
> +/* Schedule events to wake up the following poll_block() - "private fds"
> + * Same as poll_fd_wait, but for fds which have been registered and are
> + * expected to persist. If a "fast" OS fd notification framework is used
> + * this version of wait may be a NOOP (f.e. for (E)POLLIN events.
> + */
> +void private_poll_fd_wait_at(int fd, int events, const char *where);
> +#define private_poll_fd_wait(fd, events) private_poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
> +
> +
>  #ifdef _WIN32
>  void poll_wevent_wait_at(HANDLE wevent, const char *where);
>  #define poll_wevent_wait(wevent) poll_wevent_wait_at(wevent, OVS_SOURCE_LOCATOR)
> @@ -76,6 +105,10 @@ void poll_immediate_wake_at(const char *where);
>  /* Wait until an event occurs. */
>  void poll_block(void);
>  
> +/* Enable persistence for this thread's poll loop */
> +void poll_enable_persist(void);
> +
> +
>  #ifdef  __cplusplus
>  }
>  #endif
> diff --git a/lib/poll-loop.c b/lib/poll-loop.c
> index 4e751ff2c..ed8ad16b1 100644
> --- a/lib/poll-loop.c
> +++ b/lib/poll-loop.c
> @@ -43,6 +43,7 @@ struct poll_node {
>      struct pollfd pollfd;       /* Events to pass to time_poll(). */
>      HANDLE wevent;              /* Events for WaitForMultipleObjects(). */
>      const char *where;          /* Where poll_node was created. */
> +    bool valid;                 /* Marked invalid if we got a HUP/NVAL from poll */
>  };
>  
>  struct poll_loop {
> @@ -53,6 +54,7 @@ struct poll_loop {
>       * wake up immediately, or LLONG_MAX to wait forever. */
>      long long int timeout_when; /* In msecs as returned by time_msec(). */
>      const char *timeout_where;  /* Where 'timeout_when' was set. */
> +    bool persist;
>  };
>  
>  static struct poll_loop *poll_loop(void);
> @@ -99,8 +101,8 @@ find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
>   * ('where' is used in debug logging.  Commonly one would use poll_fd_wait() to
>   * automatically provide the caller's source file and line number for
>   * 'where'.) */
> -static void
> -poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
> +static struct poll_node
> +*poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
>  {
>      struct poll_loop *loop = poll_loop();
>      struct poll_node *node;
> @@ -127,7 +129,9 @@ poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
>  #endif
>          node->wevent = wevent;
>          node->where = where;
> +        node->valid = true;
>      }
> +    return node;
>  }
>  
>  /* Registers 'fd' as waiting for the specified 'events' (which should be POLLIN
> @@ -149,6 +153,44 @@ poll_fd_wait_at(int fd, short int events, const char *where)
>      poll_create_node(fd, 0, events, where);
>  }
>  
> +void
> +private_poll_fd_wait_at(int fd, int events, const char *where)
> +{
> +    if (events & (~POLLIN)) {
> +        poll_create_node(fd, 0, events, where);
> +    }
> +}
> +
> +
> +bool
> +poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where) {
> +    struct poll_loop *loop = poll_loop();
> +    struct poll_node *node;
> +    if (loop->persist) {
> +        node = poll_create_node(fd, 0, events, where);
> +        if (hint) {
> +            *hint = &node->pollfd;
> +        }
> +        return true;
> +    } 
> +    return false;
> +}
> +
> +
> +void
> +poll_fd_deregister_at(int fd, const char *where) {
> +    struct poll_loop *loop = poll_loop();
> +    struct poll_node *node;
> +
> +    VLOG(VLL_DBG, "Deregister %d from %s", fd, where);
> +
> +    node = find_poll_node(loop, fd, 0);
> +    if (node) {
> +        hmap_remove(&loop->poll_nodes, &node->hmap_node);
> +    }
> +}
> +
> +
>  #ifdef _WIN32
>  /* Registers for the next call to poll_block() to wake up when 'wevent' is
>   * signaled.
> @@ -312,13 +354,9 @@ free_poll_nodes(struct poll_loop *loop)
>      }
>  }
>  
> -/* Blocks until one or more of the events registered with poll_fd_wait()
> - * occurs, or until the minimum duration registered with poll_timer_wait()
> - * elapses, or not at all if poll_immediate_wake() has been called. */
> -void
> -poll_block(void)
> +static void
> +non_persist_poll_block(struct poll_loop *loop)
>  {
> -    struct poll_loop *loop = poll_loop();
>      struct poll_node *node;
>      struct pollfd *pollfds;
>      HANDLE *wevents = NULL;
> @@ -389,7 +427,117 @@ poll_block(void)
>  
>      seq_woke();
>  }
> -
> +
> +static void
> +persist_poll_block(struct poll_loop *loop)
> +{
> +    struct poll_node *node;
> +    struct pollfd *pollfds;
> +    HANDLE *wevents = NULL;
> +    int elapsed;
> +    int retval;
> +    int i, counter;

Please initialize 'i' here.

> +
> +    /* Register fatal signal events before actually doing any real work for
> +     * poll_block. */
> +    fatal_signal_wait();
> +
> +    if (loop->timeout_when == LLONG_MIN) {
> +        COVERAGE_INC(poll_zero_timeout);
> +    }
> +
> +    timewarp_run();
> +    pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
> +
> +#ifdef _WIN32
> +    wevents = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *wevents);
> +#endif
> +
> +    /* Populate with all the fds and events. */
> +    counter = 0;
> +    HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
> +        if (node->pollfd.events && node->valid) {
> +            pollfds[counter] = node->pollfd;
> +            counter++;
> +        }
> +#ifdef _WIN32
> +        wevents[i] = node->wevent;
> +        if (node->pollfd.fd && node->wevent) {
> +            short int wsa_events = 0;
> +            if (node->pollfd.events & POLLIN) {
> +                wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
> +            }
> +            if (node->pollfd.events & POLLOUT) {
> +                wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
> +            }
> +            WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
> +        }
> +#endif
> +        i++;

I think 'i' goes in the #define above.

> +    }
> +
> +    retval = time_poll(pollfds, hmap_count(&loop->poll_nodes), wevents,
> +                       loop->timeout_when, &elapsed);
> +    if (retval < 0) {
> +        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
> +        VLOG_ERR_RL(&rl, "poll: %s", ovs_strerror(-retval));
> +    } else if (retval == 0) {
> +        log_wakeup(loop->timeout_where, NULL, elapsed);
> +    } else {
> +        for (i = 0; i < counter; i++) {
> +            node = find_poll_node(loop, pollfds[i].fd, 0);
> +            if (!node) {
> +                VLOG_FATAL("poll: persistence state corrupted, no hash entry for %d", pollfds[i].fd);
> +            }
> +            if (pollfds[i].revents & (POLLHUP | POLLNVAL)) {
> +                node->valid = false;
> +            }
> +            if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
> +                if (pollfds[i].revents) {
> +                    log_wakeup(node->where, &pollfds[i], 0);
> +                }
> +            }
> +            /* update "requested" events.
> +            * Note - "private" fds always want POLLIN - that emulates EPOLL, /dev/poll, etc
> +            * behaviour which they should be using in real life instead of using poll()
> +            */
> +            node->pollfd.events &= ~(pollfds[i].revents & (~POLLIN));
> +            /* update "occured" events for use by streams and handlers. In case there
> +             * is an existing (but not consumed yet) event, we OR the events in the
> +             * stored record with the new ones - it is the job of the stream to clear
> +             * that.
> +             */
> +            node->pollfd.revents |= pollfds[i].revents;
> +        }
> +    }
> +
> +    loop->timeout_when = LLONG_MAX;
> +    loop->timeout_where = NULL;
> +    free(pollfds);
> +    free(wevents);
> +
> +    /* Handle any pending signals before doing anything else. */
> +    fatal_signal_run();
> +
> +    seq_woke();
> +
> +}
> +/* Blocks until one or more of the events registered with poll_fd_wait()
> + * occurs, or until the minimum duration registered with poll_timer_wait()
> + * elapses, or not at all if poll_immediate_wake() has been called. */
> +
> +void
> +poll_block(void)
> +{
> +    struct poll_loop *loop = poll_loop();
> +    if (loop->persist) {
> +        persist_poll_block(loop);
> +    } else {
> +        non_persist_poll_block(loop);
> +    }
> +}
> +
> +
>  static void
>  free_poll_loop(void *loop_)
>  {
> @@ -400,6 +548,12 @@ free_poll_loop(void *loop_)
>      free(loop);
>  }
>  
> +void poll_enable_persist(void) {
> +    struct poll_loop *loop = poll_loop();
> +
> +    loop->persist = true;
> +}
> +
>  static struct poll_loop *
>  poll_loop(void)
>  {
> @@ -418,6 +572,7 @@ poll_loop(void)
>          loop->timeout_when = LLONG_MAX;
>          hmap_init(&loop->poll_nodes);
>          xpthread_setspecific(key, loop);
> +        loop->persist = false;
>      }
>      return loop;
>  }
> diff --git a/lib/stream-fd.c b/lib/stream-fd.c
> index 46ee7ae27..22261fedb 100644
> --- a/lib/stream-fd.c
> +++ b/lib/stream-fd.c
> @@ -65,6 +65,9 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type,
>  
>      s = xmalloc(sizeof *s);
>      stream_init(&s->stream, &stream_fd_class, connect_status, name);
> +    s->stream.persist = poll_fd_register(fd, POLLIN, &s->stream.hint);
> +    s->stream.rx_ready = true;
> +    s->stream.tx_ready = true;
>      s->fd = fd;
>      s->fd_type = fd_type;
>      *streamp = &s->stream;
> @@ -82,6 +85,9 @@ static void
>  fd_close(struct stream *stream)
>  {
>      struct stream_fd *s = stream_fd_cast(stream);
> +    if (s->stream.persist) {
> +        poll_fd_deregister(s->fd);
> +    }
>      closesocket(s->fd);
>      free(s);
>  }
> @@ -104,6 +110,26 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
>      ssize_t retval;
>      int error;
>  
> +    if (stream->persist && stream->hint) {
> +        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip straight
> +         * to the read which should return 0 if the HUP is a real one, if not we clear it
> +         * for all other cases we belive what (e)poll has fed us.
> +         */
> +        if ((!(stream->hint->revents & (POLLHUP | POLLNVAL))) && (!stream->rx_ready)) {
> +            if (!(stream->hint->revents & POLLIN)) {
> +                return -EAGAIN;
> +            } else {
> +                /* POLLIN event from poll loop, mark us as ready */
> +                stream->rx_ready = true;
> +                stream->hint->revents &= ~POLLIN;
> +            }
> +        } else {
> +            stream->hint->revents &= ~(POLLHUP | POLLNVAL);
> +        }
> +    }
> +
> +
> +
>      retval = recv(s->fd, buffer, n, 0);
>      if (retval < 0) {
>          error = sock_errno();
> @@ -127,6 +153,19 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
>      ssize_t retval;
>      int error;
>  
> +    if (stream->persist && stream->hint) {
> +        /* poll-loop is providing us with hints for IO */
> +        if (!stream->tx_ready) {
> +            if (!(stream->hint->revents & POLLOUT)) {
> +                return -EAGAIN;
> +            } else {
> +                /* POLLOUT event from poll loop, mark us as ready */
> +                stream->tx_ready = true;
> +                stream->hint->revents &= ~POLLOUT;
> +            }
> +        }
> +    }
> +
>      retval = send(s->fd, buffer, n, 0);
>      if (retval < 0) {
>          error = sock_errno();
> @@ -137,6 +176,8 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
>  #endif
>          if (error != EAGAIN) {
>              VLOG_DBG_RL(&rl, "send: %s", sock_strerror(error));
> +        } else {
> +            stream->tx_ready = false;
>          }
>          return -error;
>      }
> @@ -150,11 +191,19 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
>      switch (wait) {
>      case STREAM_CONNECT:
>      case STREAM_SEND:
> -        poll_fd_wait(s->fd, POLLOUT);
> +        if (stream->persist) {
> +            private_poll_fd_wait(s->fd, POLLOUT);
> +        } else {
> +            poll_fd_wait(s->fd, POLLOUT);
> +        }
>          break;
>  
>      case STREAM_RECV:
> -        poll_fd_wait(s->fd, POLLIN);
> +        if (stream->persist) {
> +            private_poll_fd_wait(s->fd, POLLIN);
> +        } else {
> +            poll_fd_wait(s->fd, POLLIN);
> +        }
>          break;
>  
>      default:
> @@ -219,6 +268,7 @@ new_fd_pstream(char *name, int fd,
>  {
>      struct fd_pstream *ps = xmalloc(sizeof *ps);
>      pstream_init(&ps->pstream, &fd_pstream_class, name);
> +    ps->pstream.persist = poll_fd_register(fd, POLLIN, NULL);
>      ps->fd = fd;
>      ps->accept_cb = accept_cb;
>      ps->unlink_path = unlink_path;
> @@ -230,6 +280,9 @@ static void
>  pfd_close(struct pstream *pstream)
>  {
>      struct fd_pstream *ps = fd_pstream_cast(pstream);
> +    if (pstream->persist) {
> +        poll_fd_deregister(ps->fd);
> +    }
>      closesocket(ps->fd);
>      maybe_unlink_and_free(ps->unlink_path);
>      free(ps);
> @@ -271,7 +324,11 @@ static void
>  pfd_wait(struct pstream *pstream)
>  {
>      struct fd_pstream *ps = fd_pstream_cast(pstream);
> -    poll_fd_wait(ps->fd, POLLIN);
> +    if (pstream->persist) {
> +        private_poll_fd_wait(ps->fd, POLLIN);
> +    } else {
> +        poll_fd_wait(ps->fd, POLLIN);
> +    }
>  }
>  
>  static const struct pstream_class fd_pstream_class = {
> diff --git a/lib/stream-provider.h b/lib/stream-provider.h
> index 75f4f059b..53de231dc 100644
> --- a/lib/stream-provider.h
> +++ b/lib/stream-provider.h
> @@ -18,6 +18,7 @@
>  #define STREAM_PROVIDER_H 1
>  
>  #include <sys/types.h>
> +#include <poll.h>
>  #include "stream.h"
>  
>  /* Active stream connection. */
> @@ -31,6 +32,8 @@ struct stream {
>      int error;
>      char *name;
>      char *peer_id;
> +    bool persist, rx_ready, tx_ready;
> +    struct pollfd *hint;
>  };
>  
>  void stream_init(struct stream *, const struct stream_class *,
> @@ -133,6 +136,7 @@ struct pstream {
>      const struct pstream_class *class;
>      char *name;
>      ovs_be16 bound_port;
> +    bool persist;
>  };
>  
>  void pstream_init(struct pstream *, const struct pstream_class *, char *name);
> diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
> index 078fcbc3a..056bb2023 100644
> --- a/lib/stream-ssl.c
> +++ b/lib/stream-ssl.c
> @@ -310,6 +310,10 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
>          SSL_set_msg_callback_arg(ssl, sslv);
>      }
>  
> +    sslv->stream.persist = poll_fd_register(fd, POLLIN, &sslv->stream.hint);
> +    sslv->stream.rx_ready = true;
> +    sslv->stream.tx_ready = true;
> +
>      *streamp = &sslv->stream;
>      free(server_name);
>      return 0;
> @@ -604,6 +608,7 @@ ssl_close(struct stream *stream)
>      ERR_clear_error();
>  
>      SSL_free(sslv->ssl);
> +    poll_fd_deregister(sslv->fd);
>      closesocket(sslv->fd);
>      free(sslv);
>  }
> @@ -697,6 +702,26 @@ ssl_recv(struct stream *stream, void *buffer, size_t n)
>      /* Behavior of zero-byte SSL_read is poorly defined. */
>      ovs_assert(n > 0);
>  
> +    if (stream->persist && stream->hint) {
> +        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip straight
> +         * to the read which should return 0 if the HUP is a real one, if not we clear it
> +         * for all other cases we belive what (e)poll has fed us.
> +         */
> +        if ((!(stream->hint->revents & (POLLHUP | POLLNVAL))) && (sslv->rx_want == SSL_READING)) {
> +            if (!(stream->hint->revents & POLLIN)) {
> +                return -EAGAIN;
> +            } else {
> +                /* POLLIN event from poll loop, mark us as ready 
> +                 * rx_want is cleared further down by reading ssl fsm
> +                 */
> +                stream->hint->revents &= ~POLLIN;
> +            }
> +        } else {
> +            stream->hint->revents &= ~(POLLHUP | POLLNVAL);
> +        }
> +    }
> +
> +
>      old_state = SSL_get_state(sslv->ssl);
>      ret = SSL_read(sslv->ssl, buffer, n);
>      if (old_state != SSL_get_state(sslv->ssl)) {
> @@ -729,6 +754,21 @@ ssl_do_tx(struct stream *stream)
>  {
>      struct ssl_stream *sslv = ssl_stream_cast(stream);
>  
> +     if (stream->persist && stream->hint) {
> +        /* poll-loop is providing us with hints for IO */
> +        if (sslv->tx_want == SSL_WRITING) {
> +            if (!(stream->hint->revents & POLLOUT)) {
> +                return EAGAIN;
> +            } else {
> +                /* POLLIN event from poll loop, mark us as ready
> +                 * rx_want is cleared further down by reading ssl fsm
> +                 */
> +                stream->hint->revents &= ~POLLOUT;
> +            }
> +        }
> +    }
> +
> +
>      for (;;) {
>          int old_state = SSL_get_state(sslv->ssl);
>          int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
> @@ -771,6 +811,9 @@ ssl_send(struct stream *stream, const void *buffer, size_t n)
>              ssl_clear_txbuf(sslv);
>              return n;
>          case EAGAIN:
> +            if (stream->persist) {
> +                stream_send_wait(stream);
> +            }
>              return n;
>          default:
>              ssl_clear_txbuf(sslv);
> @@ -795,7 +838,11 @@ ssl_run_wait(struct stream *stream)
>      struct ssl_stream *sslv = ssl_stream_cast(stream);
>  
>      if (sslv->tx_want != SSL_NOTHING) {
> -        poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
> +        if (stream->persist) {
> +            private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
> +        } else {
> +            poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
> +        }
>      }
>  }
>  
> @@ -811,14 +858,23 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
>          } else {
>              switch (sslv->state) {
>              case STATE_TCP_CONNECTING:
> -                poll_fd_wait(sslv->fd, POLLOUT);
> +                if (stream->persist) {
> +                    private_poll_fd_wait(sslv->fd, POLLOUT);
> +                } else {
> +                    poll_fd_wait(sslv->fd, POLLOUT);
> +                }
>                  break;
>  
>              case STATE_SSL_CONNECTING:
>                  /* ssl_connect() called SSL_accept() or SSL_connect(), which
>                   * set up the status that we test here. */
> -                poll_fd_wait(sslv->fd,
> +                if (stream->persist) {
> +                    private_poll_fd_wait(sslv->fd,
> +                               want_to_poll_events(SSL_want(sslv->ssl)));
> +                } else {
> +                    poll_fd_wait(sslv->fd,
>                                 want_to_poll_events(SSL_want(sslv->ssl)));
> +                }
>                  break;
>  
>              default:
> @@ -829,7 +885,11 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
>  
>      case STREAM_RECV:
>          if (sslv->rx_want != SSL_NOTHING) {
> -            poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
> +            if (stream->persist) {
> +                private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
> +            } else {
> +                poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
> +            }
>          } else {
>              poll_immediate_wake();
>          }
> @@ -911,6 +971,7 @@ pssl_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp,
>                   ds_steal_cstr(&bound_name));
>      pstream_set_bound_port(&pssl->pstream, htons(port));
>      pssl->fd = fd;
> +    pssl->pstream.persist = poll_fd_register(fd, POLLIN, NULL);
>      *pstreamp = &pssl->pstream;
>  
>      return 0;
> @@ -921,6 +982,9 @@ pssl_close(struct pstream *pstream)
>  {
>      struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
>      closesocket(pssl->fd);
> +    if (pstream->persist) {
> +        poll_fd_deregister(pssl->fd);
> +    }
>      free(pssl);
>  }
>  
> @@ -965,7 +1029,11 @@ static void
>  pssl_wait(struct pstream *pstream)
>  {
>      struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
> -    poll_fd_wait(pssl->fd, POLLIN);
> +    if (pstream->persist) {
> +        private_poll_fd_wait(pssl->fd, POLLIN);
> +    } else {
> +        poll_fd_wait(pssl->fd, POLLIN);
> +    }
>  }
>  
>  const struct pstream_class pssl_pstream_class = {
> diff --git a/lib/stream-windows.c b/lib/stream-windows.c
> index 34bc610b6..b845911b6 100644
> --- a/lib/stream-windows.c
> +++ b/lib/stream-windows.c
> @@ -175,6 +175,7 @@ windows_open(const char *name, char *suffix, struct stream **streamp,
>      s->read_pending = false;
>      s->write_pending = false;
>      s->retry_connect = retry;
> +    s->stream.persist = false;
>      *streamp = &s->stream;
>      return 0;
>  }
> @@ -649,6 +650,7 @@ pwindows_open(const char *name OVS_UNUSED, char *suffix,
>      p->connect.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
>      p->pending = false;
>      p->pipe_path = bind_path;
> +    p->stream.persist = false;
>      *pstreamp = &p->pstream;
>      return 0;
>  }
> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
> index b6957d730..295a62afd 100644
> --- a/ovsdb/ovsdb-server.c
> +++ b/ovsdb/ovsdb-server.c
> @@ -182,6 +182,7 @@ main_loop(struct server_config *config,
>      *exiting = false;
>      ssl_error = NULL;
>      remotes_error = NULL;
> +    poll_enable_persist();
>      while (!*exiting) {
>          memory_run();
>          if (memory_should_report()) {
> diff --git a/tests/ovsdb-cluster.at b/tests/ovsdb-cluster.at
> index 3a0bd4579..1f71d4fe8 100644
> --- a/tests/ovsdb-cluster.at
> +++ b/tests/ovsdb-cluster.at
> @@ -15,7 +15,7 @@ ovsdb_check_cluster () {
>  
>      on_exit 'kill `cat *.pid`'
>      for i in `seq $n`; do
> -        AT_CHECK([ovsdb-server -vraft -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
> +        AT_CHECK([ovsdb-server -vpoll_loop -vraft -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
>      done
>      for i in `seq $n`; do
>          AT_CHECK([ovsdb_client_wait unix:s$i.ovsdb $schema connected])
> @@ -307,7 +307,7 @@ ovsdb|WARN|schema: changed 30 columns in 'Open_vSwitch' database from ephemeral
>      start_server() {
>          local i=$1
>          printf "\ns$i: starting\n"
> -        AT_CHECK([ovsdb-server -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
> +        AT_CHECK([ovsdb-server -vjsonrpc -vpoll_loop -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
>      }
>      connect_server() {
>          local i=$1
> @@ -465,7 +465,7 @@ ovsdb|WARN|schema: changed 30 columns in 'Open_vSwitch' database from ephemeral
>      start_server() {
>          local i=$1
>          printf "\ns$i: starting\n"
> -        AT_CHECK([ovsdb-server -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
> +        AT_CHECK([ovsdb-server -vpoll_loop -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
>      }
>      stop_server() {
>          local i=$1
Aaron Conole March 12, 2020, 6:50 p.m. UTC | #6
Anton Ivanov <anton.ivanov@cambridgegreys.com> writes:

> I have a cleaner version in the works which should be ready some times
> next week.
>
> I will try to release the next version ~ Tue/Wed along with some level
> of async IO as well as reduction of copying in the json rpc TX path.

Can you send a cover letter as well?  It helps as a single mail to point
for the series.

> A.
>
> On 06/03/2020 21:38, Ben Pfaff wrote:
>> On Wed, Feb 26, 2020 at 03:24:17PM +0000, anton.ivanov@cambridgegreys.com wrote:
>>> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>>>
>>> 1. Adds "persistent" behaviour where feasible streams.
>>> These are waited upon in the same thread where they are created. This
>>> allows them to be registered persistently with the OS (if possible)
>>> as well as the OS to provide hints - is the FD ready, is it closed,
>>> etc.
>>>
>>> 2. Removes unnecessary attempts to perform a read vs EAGAIN on a fd
>>> which is not ready if the thread has a persistent poll loop.
>>>
>>> 3. Enables fd persistence for ovsdb main server thread only
>>>
>>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>> Thanks for the patches.
>>
>> These have lots and lots of coding style violations, as reported by the
>> checkpatch robot.  Would you mind fixing those up and resubmitting a v5?
>>
>> Thanks,
>>
>> Ben.
>>
Anton Ivanov March 12, 2020, 7:08 p.m. UTC | #7
On 12/03/2020 18:49, Aaron Conole wrote:
> anton.ivanov@cambridgegreys.com writes:
>
>> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>>
>> 1. Adds "persistent" behaviour where feasible streams.
>> These are waited upon in the same thread where they are created. This
>> allows them to be registered persistently with the OS (if possible)
>> as well as the OS to provide hints - is the FD ready, is it closed,
>> etc.
>>
>> 2. Removes unnecessary attempts to perform a read vs EAGAIN on a fd
>> which is not ready if the thread has a persistent poll loop.
>>
>> 3. Enables fd persistence for ovsdb main server thread only
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>> ---
> Hi Anton,
>
> Just a really cursory review.  The robot flagged a bunch of things to
> address.

Thanks.

I will address that in the next rewrite which will hopefully be ready by 
tomorrow.

It works does well on the ovs and ovsdb test suite, but flakes out quite 
often on the ovn one.

I have narrowed it down and once I have fixed the OVN flakes I will 
resubmit it.

A.

>
>>   include/openvswitch/poll-loop.h |  33 ++++++
>>   lib/poll-loop.c                 | 173 ++++++++++++++++++++++++++++++--
>>   lib/stream-fd.c                 |  63 +++++++++++-
>>   lib/stream-provider.h           |   4 +
>>   lib/stream-ssl.c                |  78 +++++++++++++-
>>   lib/stream-windows.c            |   2 +
>>   ovsdb/ovsdb-server.c            |   1 +
>>   tests/ovsdb-cluster.at          |   6 +-
>>   8 files changed, 340 insertions(+), 20 deletions(-)
>>
>> diff --git a/include/openvswitch/poll-loop.h b/include/openvswitch/poll-loop.h
>> index 532640497..50d49a8ed 100644
>> --- a/include/openvswitch/poll-loop.h
>> +++ b/include/openvswitch/poll-loop.h
>> @@ -33,6 +33,8 @@
>>   #ifndef POLL_LOOP_H
>>   #define POLL_LOOP_H 1
>>   
>> +#include <stdbool.h>
>> +
>>   #ifndef _WIN32
>>   #include <poll.h>
>>   #endif
>> @@ -45,6 +47,24 @@
>>   extern "C" {
>>   #endif
>>   
>> +/* Register a fd with a persistence framework if available so it can be served
>> + * "faster" and the caller can be provided with "hints" on what caused the IO
>> + * event.
>> + * If the "hint" argument is supplied it set to point to the pollfd structure
>> + * containing the events passed by the OS in .revents.
>> + * returns false if persistence is not enabled.
>> + */
>> +
>> +bool poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where);
>> +#define poll_fd_register(fd, events, hint) poll_fd_register_at(fd, events, hint, OVS_SOURCE_LOCATOR)
>> +
>> +/* De-register a fd which was registered as "private" with the persistence
>> + * framework
>> + */
>> +
>> +void poll_fd_deregister_at(int fd, const char *where);
>> +#define poll_fd_deregister(fd) poll_fd_deregister_at(fd, OVS_SOURCE_LOCATOR)
>> +
>>   
>>   /* Schedule events to wake up the following poll_block().
>>    *
>> @@ -58,6 +78,15 @@ extern "C" {
>>   void poll_fd_wait_at(int fd, short int events, const char *where);
>>   #define poll_fd_wait(fd, events) poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
>>   
>> +/* Schedule events to wake up the following poll_block() - "private fds"
>> + * Same as poll_fd_wait, but for fds which have been registered and are
>> + * expected to persist. If a "fast" OS fd notification framework is used
>> + * this version of wait may be a NOOP (f.e. for (E)POLLIN events.
>> + */
>> +void private_poll_fd_wait_at(int fd, int events, const char *where);
>> +#define private_poll_fd_wait(fd, events) private_poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
>> +
>> +
>>   #ifdef _WIN32
>>   void poll_wevent_wait_at(HANDLE wevent, const char *where);
>>   #define poll_wevent_wait(wevent) poll_wevent_wait_at(wevent, OVS_SOURCE_LOCATOR)
>> @@ -76,6 +105,10 @@ void poll_immediate_wake_at(const char *where);
>>   /* Wait until an event occurs. */
>>   void poll_block(void);
>>   
>> +/* Enable persistence for this thread's poll loop */
>> +void poll_enable_persist(void);
>> +
>> +
>>   #ifdef  __cplusplus
>>   }
>>   #endif
>> diff --git a/lib/poll-loop.c b/lib/poll-loop.c
>> index 4e751ff2c..ed8ad16b1 100644
>> --- a/lib/poll-loop.c
>> +++ b/lib/poll-loop.c
>> @@ -43,6 +43,7 @@ struct poll_node {
>>       struct pollfd pollfd;       /* Events to pass to time_poll(). */
>>       HANDLE wevent;              /* Events for WaitForMultipleObjects(). */
>>       const char *where;          /* Where poll_node was created. */
>> +    bool valid;                 /* Marked invalid if we got a HUP/NVAL from poll */
>>   };
>>   
>>   struct poll_loop {
>> @@ -53,6 +54,7 @@ struct poll_loop {
>>        * wake up immediately, or LLONG_MAX to wait forever. */
>>       long long int timeout_when; /* In msecs as returned by time_msec(). */
>>       const char *timeout_where;  /* Where 'timeout_when' was set. */
>> +    bool persist;
>>   };
>>   
>>   static struct poll_loop *poll_loop(void);
>> @@ -99,8 +101,8 @@ find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
>>    * ('where' is used in debug logging.  Commonly one would use poll_fd_wait() to
>>    * automatically provide the caller's source file and line number for
>>    * 'where'.) */
>> -static void
>> -poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
>> +static struct poll_node
>> +*poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
>>   {
>>       struct poll_loop *loop = poll_loop();
>>       struct poll_node *node;
>> @@ -127,7 +129,9 @@ poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
>>   #endif
>>           node->wevent = wevent;
>>           node->where = where;
>> +        node->valid = true;
>>       }
>> +    return node;
>>   }
>>   
>>   /* Registers 'fd' as waiting for the specified 'events' (which should be POLLIN
>> @@ -149,6 +153,44 @@ poll_fd_wait_at(int fd, short int events, const char *where)
>>       poll_create_node(fd, 0, events, where);
>>   }
>>   
>> +void
>> +private_poll_fd_wait_at(int fd, int events, const char *where)
>> +{
>> +    if (events & (~POLLIN)) {
>> +        poll_create_node(fd, 0, events, where);
>> +    }
>> +}
>> +
>> +
>> +bool
>> +poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where) {
>> +    struct poll_loop *loop = poll_loop();
>> +    struct poll_node *node;
>> +    if (loop->persist) {
>> +        node = poll_create_node(fd, 0, events, where);
>> +        if (hint) {
>> +            *hint = &node->pollfd;
>> +        }
>> +        return true;
>> +    }
>> +    return false;
>> +}
>> +
>> +
>> +void
>> +poll_fd_deregister_at(int fd, const char *where) {
>> +    struct poll_loop *loop = poll_loop();
>> +    struct poll_node *node;
>> +
>> +    VLOG(VLL_DBG, "Deregister %d from %s", fd, where);
>> +
>> +    node = find_poll_node(loop, fd, 0);
>> +    if (node) {
>> +        hmap_remove(&loop->poll_nodes, &node->hmap_node);
>> +    }
>> +}
>> +
>> +
>>   #ifdef _WIN32
>>   /* Registers for the next call to poll_block() to wake up when 'wevent' is
>>    * signaled.
>> @@ -312,13 +354,9 @@ free_poll_nodes(struct poll_loop *loop)
>>       }
>>   }
>>   
>> -/* Blocks until one or more of the events registered with poll_fd_wait()
>> - * occurs, or until the minimum duration registered with poll_timer_wait()
>> - * elapses, or not at all if poll_immediate_wake() has been called. */
>> -void
>> -poll_block(void)
>> +static void
>> +non_persist_poll_block(struct poll_loop *loop)
>>   {
>> -    struct poll_loop *loop = poll_loop();
>>       struct poll_node *node;
>>       struct pollfd *pollfds;
>>       HANDLE *wevents = NULL;
>> @@ -389,7 +427,117 @@ poll_block(void)
>>   
>>       seq_woke();
>>   }
>> -
>> +
>> +static void
>> +persist_poll_block(struct poll_loop *loop)
>> +{
>> +    struct poll_node *node;
>> +    struct pollfd *pollfds;
>> +    HANDLE *wevents = NULL;
>> +    int elapsed;
>> +    int retval;
>> +    int i, counter;
> Please initialize 'i' here.
>
>> +
>> +    /* Register fatal signal events before actually doing any real work for
>> +     * poll_block. */
>> +    fatal_signal_wait();
>> +
>> +    if (loop->timeout_when == LLONG_MIN) {
>> +        COVERAGE_INC(poll_zero_timeout);
>> +    }
>> +
>> +    timewarp_run();
>> +    pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
>> +
>> +#ifdef _WIN32
>> +    wevents = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *wevents);
>> +#endif
>> +
>> +    /* Populate with all the fds and events. */
>> +    counter = 0;
>> +    HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
>> +        if (node->pollfd.events && node->valid) {
>> +            pollfds[counter] = node->pollfd;
>> +            counter++;
>> +        }
>> +#ifdef _WIN32
>> +        wevents[i] = node->wevent;
>> +        if (node->pollfd.fd && node->wevent) {
>> +            short int wsa_events = 0;
>> +            if (node->pollfd.events & POLLIN) {
>> +                wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
>> +            }
>> +            if (node->pollfd.events & POLLOUT) {
>> +                wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
>> +            }
>> +            WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
>> +        }
>> +#endif
>> +        i++;
> I think 'i' goes in the #define above.
>
>> +    }
>> +
>> +    retval = time_poll(pollfds, hmap_count(&loop->poll_nodes), wevents,
>> +                       loop->timeout_when, &elapsed);
>> +    if (retval < 0) {
>> +        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>> +        VLOG_ERR_RL(&rl, "poll: %s", ovs_strerror(-retval));
>> +    } else if (retval == 0) {
>> +        log_wakeup(loop->timeout_where, NULL, elapsed);
>> +    } else {
>> +        for (i = 0; i < counter; i++) {
>> +            node = find_poll_node(loop, pollfds[i].fd, 0);
>> +            if (!node) {
>> +                VLOG_FATAL("poll: persistence state corrupted, no hash entry for %d", pollfds[i].fd);
>> +            }
>> +            if (pollfds[i].revents & (POLLHUP | POLLNVAL)) {
>> +                node->valid = false;
>> +            }
>> +            if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
>> +                if (pollfds[i].revents) {
>> +                    log_wakeup(node->where, &pollfds[i], 0);
>> +                }
>> +            }
>> +            /* update "requested" events.
>> +            * Note - "private" fds always want POLLIN - that emulates EPOLL, /dev/poll, etc
>> +            * behaviour which they should be using in real life instead of using poll()
>> +            */
>> +            node->pollfd.events &= ~(pollfds[i].revents & (~POLLIN));
>> +            /* update "occured" events for use by streams and handlers. In case there
>> +             * is an existing (but not consumed yet) event, we OR the events in the
>> +             * stored record with the new ones - it is the job of the stream to clear
>> +             * that.
>> +             */
>> +            node->pollfd.revents |= pollfds[i].revents;
>> +        }
>> +    }
>> +
>> +    loop->timeout_when = LLONG_MAX;
>> +    loop->timeout_where = NULL;
>> +    free(pollfds);
>> +    free(wevents);
>> +
>> +    /* Handle any pending signals before doing anything else. */
>> +    fatal_signal_run();
>> +
>> +    seq_woke();
>> +
>> +}
>> +/* Blocks until one or more of the events registered with poll_fd_wait()
>> + * occurs, or until the minimum duration registered with poll_timer_wait()
>> + * elapses, or not at all if poll_immediate_wake() has been called. */
>> +
>> +void
>> +poll_block(void)
>> +{
>> +    struct poll_loop *loop = poll_loop();
>> +    if (loop->persist) {
>> +        persist_poll_block(loop);
>> +    } else {
>> +        non_persist_poll_block(loop);
>> +    }
>> +}
>> +
>> +
>>   static void
>>   free_poll_loop(void *loop_)
>>   {
>> @@ -400,6 +548,12 @@ free_poll_loop(void *loop_)
>>       free(loop);
>>   }
>>   
>> +void poll_enable_persist(void) {
>> +    struct poll_loop *loop = poll_loop();
>> +
>> +    loop->persist = true;
>> +}
>> +
>>   static struct poll_loop *
>>   poll_loop(void)
>>   {
>> @@ -418,6 +572,7 @@ poll_loop(void)
>>           loop->timeout_when = LLONG_MAX;
>>           hmap_init(&loop->poll_nodes);
>>           xpthread_setspecific(key, loop);
>> +        loop->persist = false;
>>       }
>>       return loop;
>>   }
>> diff --git a/lib/stream-fd.c b/lib/stream-fd.c
>> index 46ee7ae27..22261fedb 100644
>> --- a/lib/stream-fd.c
>> +++ b/lib/stream-fd.c
>> @@ -65,6 +65,9 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type,
>>   
>>       s = xmalloc(sizeof *s);
>>       stream_init(&s->stream, &stream_fd_class, connect_status, name);
>> +    s->stream.persist = poll_fd_register(fd, POLLIN, &s->stream.hint);
>> +    s->stream.rx_ready = true;
>> +    s->stream.tx_ready = true;
>>       s->fd = fd;
>>       s->fd_type = fd_type;
>>       *streamp = &s->stream;
>> @@ -82,6 +85,9 @@ static void
>>   fd_close(struct stream *stream)
>>   {
>>       struct stream_fd *s = stream_fd_cast(stream);
>> +    if (s->stream.persist) {
>> +        poll_fd_deregister(s->fd);
>> +    }
>>       closesocket(s->fd);
>>       free(s);
>>   }
>> @@ -104,6 +110,26 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
>>       ssize_t retval;
>>       int error;
>>   
>> +    if (stream->persist && stream->hint) {
>> +        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip straight
>> +         * to the read which should return 0 if the HUP is a real one, if not we clear it
>> +         * for all other cases we belive what (e)poll has fed us.
>> +         */
>> +        if ((!(stream->hint->revents & (POLLHUP | POLLNVAL))) && (!stream->rx_ready)) {
>> +            if (!(stream->hint->revents & POLLIN)) {
>> +                return -EAGAIN;
>> +            } else {
>> +                /* POLLIN event from poll loop, mark us as ready */
>> +                stream->rx_ready = true;
>> +                stream->hint->revents &= ~POLLIN;
>> +            }
>> +        } else {
>> +            stream->hint->revents &= ~(POLLHUP | POLLNVAL);
>> +        }
>> +    }
>> +
>> +
>> +
>>       retval = recv(s->fd, buffer, n, 0);
>>       if (retval < 0) {
>>           error = sock_errno();
>> @@ -127,6 +153,19 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
>>       ssize_t retval;
>>       int error;
>>   
>> +    if (stream->persist && stream->hint) {
>> +        /* poll-loop is providing us with hints for IO */
>> +        if (!stream->tx_ready) {
>> +            if (!(stream->hint->revents & POLLOUT)) {
>> +                return -EAGAIN;
>> +            } else {
>> +                /* POLLOUT event from poll loop, mark us as ready */
>> +                stream->tx_ready = true;
>> +                stream->hint->revents &= ~POLLOUT;
>> +            }
>> +        }
>> +    }
>> +
>>       retval = send(s->fd, buffer, n, 0);
>>       if (retval < 0) {
>>           error = sock_errno();
>> @@ -137,6 +176,8 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
>>   #endif
>>           if (error != EAGAIN) {
>>               VLOG_DBG_RL(&rl, "send: %s", sock_strerror(error));
>> +        } else {
>> +            stream->tx_ready = false;
>>           }
>>           return -error;
>>       }
>> @@ -150,11 +191,19 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
>>       switch (wait) {
>>       case STREAM_CONNECT:
>>       case STREAM_SEND:
>> -        poll_fd_wait(s->fd, POLLOUT);
>> +        if (stream->persist) {
>> +            private_poll_fd_wait(s->fd, POLLOUT);
>> +        } else {
>> +            poll_fd_wait(s->fd, POLLOUT);
>> +        }
>>           break;
>>   
>>       case STREAM_RECV:
>> -        poll_fd_wait(s->fd, POLLIN);
>> +        if (stream->persist) {
>> +            private_poll_fd_wait(s->fd, POLLIN);
>> +        } else {
>> +            poll_fd_wait(s->fd, POLLIN);
>> +        }
>>           break;
>>   
>>       default:
>> @@ -219,6 +268,7 @@ new_fd_pstream(char *name, int fd,
>>   {
>>       struct fd_pstream *ps = xmalloc(sizeof *ps);
>>       pstream_init(&ps->pstream, &fd_pstream_class, name);
>> +    ps->pstream.persist = poll_fd_register(fd, POLLIN, NULL);
>>       ps->fd = fd;
>>       ps->accept_cb = accept_cb;
>>       ps->unlink_path = unlink_path;
>> @@ -230,6 +280,9 @@ static void
>>   pfd_close(struct pstream *pstream)
>>   {
>>       struct fd_pstream *ps = fd_pstream_cast(pstream);
>> +    if (pstream->persist) {
>> +        poll_fd_deregister(ps->fd);
>> +    }
>>       closesocket(ps->fd);
>>       maybe_unlink_and_free(ps->unlink_path);
>>       free(ps);
>> @@ -271,7 +324,11 @@ static void
>>   pfd_wait(struct pstream *pstream)
>>   {
>>       struct fd_pstream *ps = fd_pstream_cast(pstream);
>> -    poll_fd_wait(ps->fd, POLLIN);
>> +    if (pstream->persist) {
>> +        private_poll_fd_wait(ps->fd, POLLIN);
>> +    } else {
>> +        poll_fd_wait(ps->fd, POLLIN);
>> +    }
>>   }
>>   
>>   static const struct pstream_class fd_pstream_class = {
>> diff --git a/lib/stream-provider.h b/lib/stream-provider.h
>> index 75f4f059b..53de231dc 100644
>> --- a/lib/stream-provider.h
>> +++ b/lib/stream-provider.h
>> @@ -18,6 +18,7 @@
>>   #define STREAM_PROVIDER_H 1
>>   
>>   #include <sys/types.h>
>> +#include <poll.h>
>>   #include "stream.h"
>>   
>>   /* Active stream connection. */
>> @@ -31,6 +32,8 @@ struct stream {
>>       int error;
>>       char *name;
>>       char *peer_id;
>> +    bool persist, rx_ready, tx_ready;
>> +    struct pollfd *hint;
>>   };
>>   
>>   void stream_init(struct stream *, const struct stream_class *,
>> @@ -133,6 +136,7 @@ struct pstream {
>>       const struct pstream_class *class;
>>       char *name;
>>       ovs_be16 bound_port;
>> +    bool persist;
>>   };
>>   
>>   void pstream_init(struct pstream *, const struct pstream_class *, char *name);
>> diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
>> index 078fcbc3a..056bb2023 100644
>> --- a/lib/stream-ssl.c
>> +++ b/lib/stream-ssl.c
>> @@ -310,6 +310,10 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
>>           SSL_set_msg_callback_arg(ssl, sslv);
>>       }
>>   
>> +    sslv->stream.persist = poll_fd_register(fd, POLLIN, &sslv->stream.hint);
>> +    sslv->stream.rx_ready = true;
>> +    sslv->stream.tx_ready = true;
>> +
>>       *streamp = &sslv->stream;
>>       free(server_name);
>>       return 0;
>> @@ -604,6 +608,7 @@ ssl_close(struct stream *stream)
>>       ERR_clear_error();
>>   
>>       SSL_free(sslv->ssl);
>> +    poll_fd_deregister(sslv->fd);
>>       closesocket(sslv->fd);
>>       free(sslv);
>>   }
>> @@ -697,6 +702,26 @@ ssl_recv(struct stream *stream, void *buffer, size_t n)
>>       /* Behavior of zero-byte SSL_read is poorly defined. */
>>       ovs_assert(n > 0);
>>   
>> +    if (stream->persist && stream->hint) {
>> +        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip straight
>> +         * to the read which should return 0 if the HUP is a real one, if not we clear it
>> +         * for all other cases we belive what (e)poll has fed us.
>> +         */
>> +        if ((!(stream->hint->revents & (POLLHUP | POLLNVAL))) && (sslv->rx_want == SSL_READING)) {
>> +            if (!(stream->hint->revents & POLLIN)) {
>> +                return -EAGAIN;
>> +            } else {
>> +                /* POLLIN event from poll loop, mark us as ready
>> +                 * rx_want is cleared further down by reading ssl fsm
>> +                 */
>> +                stream->hint->revents &= ~POLLIN;
>> +            }
>> +        } else {
>> +            stream->hint->revents &= ~(POLLHUP | POLLNVAL);
>> +        }
>> +    }
>> +
>> +
>>       old_state = SSL_get_state(sslv->ssl);
>>       ret = SSL_read(sslv->ssl, buffer, n);
>>       if (old_state != SSL_get_state(sslv->ssl)) {
>> @@ -729,6 +754,21 @@ ssl_do_tx(struct stream *stream)
>>   {
>>       struct ssl_stream *sslv = ssl_stream_cast(stream);
>>   
>> +     if (stream->persist && stream->hint) {
>> +        /* poll-loop is providing us with hints for IO */
>> +        if (sslv->tx_want == SSL_WRITING) {
>> +            if (!(stream->hint->revents & POLLOUT)) {
>> +                return EAGAIN;
>> +            } else {
>> +                /* POLLIN event from poll loop, mark us as ready
>> +                 * rx_want is cleared further down by reading ssl fsm
>> +                 */
>> +                stream->hint->revents &= ~POLLOUT;
>> +            }
>> +        }
>> +    }
>> +
>> +
>>       for (;;) {
>>           int old_state = SSL_get_state(sslv->ssl);
>>           int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
>> @@ -771,6 +811,9 @@ ssl_send(struct stream *stream, const void *buffer, size_t n)
>>               ssl_clear_txbuf(sslv);
>>               return n;
>>           case EAGAIN:
>> +            if (stream->persist) {
>> +                stream_send_wait(stream);
>> +            }
>>               return n;
>>           default:
>>               ssl_clear_txbuf(sslv);
>> @@ -795,7 +838,11 @@ ssl_run_wait(struct stream *stream)
>>       struct ssl_stream *sslv = ssl_stream_cast(stream);
>>   
>>       if (sslv->tx_want != SSL_NOTHING) {
>> -        poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
>> +        if (stream->persist) {
>> +            private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
>> +        } else {
>> +            poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
>> +        }
>>       }
>>   }
>>   
>> @@ -811,14 +858,23 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
>>           } else {
>>               switch (sslv->state) {
>>               case STATE_TCP_CONNECTING:
>> -                poll_fd_wait(sslv->fd, POLLOUT);
>> +                if (stream->persist) {
>> +                    private_poll_fd_wait(sslv->fd, POLLOUT);
>> +                } else {
>> +                    poll_fd_wait(sslv->fd, POLLOUT);
>> +                }
>>                   break;
>>   
>>               case STATE_SSL_CONNECTING:
>>                   /* ssl_connect() called SSL_accept() or SSL_connect(), which
>>                    * set up the status that we test here. */
>> -                poll_fd_wait(sslv->fd,
>> +                if (stream->persist) {
>> +                    private_poll_fd_wait(sslv->fd,
>> +                               want_to_poll_events(SSL_want(sslv->ssl)));
>> +                } else {
>> +                    poll_fd_wait(sslv->fd,
>>                                  want_to_poll_events(SSL_want(sslv->ssl)));
>> +                }
>>                   break;
>>   
>>               default:
>> @@ -829,7 +885,11 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
>>   
>>       case STREAM_RECV:
>>           if (sslv->rx_want != SSL_NOTHING) {
>> -            poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
>> +            if (stream->persist) {
>> +                private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
>> +            } else {
>> +                poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
>> +            }
>>           } else {
>>               poll_immediate_wake();
>>           }
>> @@ -911,6 +971,7 @@ pssl_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp,
>>                    ds_steal_cstr(&bound_name));
>>       pstream_set_bound_port(&pssl->pstream, htons(port));
>>       pssl->fd = fd;
>> +    pssl->pstream.persist = poll_fd_register(fd, POLLIN, NULL);
>>       *pstreamp = &pssl->pstream;
>>   
>>       return 0;
>> @@ -921,6 +982,9 @@ pssl_close(struct pstream *pstream)
>>   {
>>       struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
>>       closesocket(pssl->fd);
>> +    if (pstream->persist) {
>> +        poll_fd_deregister(pssl->fd);
>> +    }
>>       free(pssl);
>>   }
>>   
>> @@ -965,7 +1029,11 @@ static void
>>   pssl_wait(struct pstream *pstream)
>>   {
>>       struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
>> -    poll_fd_wait(pssl->fd, POLLIN);
>> +    if (pstream->persist) {
>> +        private_poll_fd_wait(pssl->fd, POLLIN);
>> +    } else {
>> +        poll_fd_wait(pssl->fd, POLLIN);
>> +    }
>>   }
>>   
>>   const struct pstream_class pssl_pstream_class = {
>> diff --git a/lib/stream-windows.c b/lib/stream-windows.c
>> index 34bc610b6..b845911b6 100644
>> --- a/lib/stream-windows.c
>> +++ b/lib/stream-windows.c
>> @@ -175,6 +175,7 @@ windows_open(const char *name, char *suffix, struct stream **streamp,
>>       s->read_pending = false;
>>       s->write_pending = false;
>>       s->retry_connect = retry;
>> +    s->stream.persist = false;
>>       *streamp = &s->stream;
>>       return 0;
>>   }
>> @@ -649,6 +650,7 @@ pwindows_open(const char *name OVS_UNUSED, char *suffix,
>>       p->connect.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
>>       p->pending = false;
>>       p->pipe_path = bind_path;
>> +    p->stream.persist = false;
>>       *pstreamp = &p->pstream;
>>       return 0;
>>   }
>> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
>> index b6957d730..295a62afd 100644
>> --- a/ovsdb/ovsdb-server.c
>> +++ b/ovsdb/ovsdb-server.c
>> @@ -182,6 +182,7 @@ main_loop(struct server_config *config,
>>       *exiting = false;
>>       ssl_error = NULL;
>>       remotes_error = NULL;
>> +    poll_enable_persist();
>>       while (!*exiting) {
>>           memory_run();
>>           if (memory_should_report()) {
>> diff --git a/tests/ovsdb-cluster.at b/tests/ovsdb-cluster.at
>> index 3a0bd4579..1f71d4fe8 100644
>> --- a/tests/ovsdb-cluster.at
>> +++ b/tests/ovsdb-cluster.at
>> @@ -15,7 +15,7 @@ ovsdb_check_cluster () {
>>   
>>       on_exit 'kill `cat *.pid`'
>>       for i in `seq $n`; do
>> -        AT_CHECK([ovsdb-server -vraft -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
>> +        AT_CHECK([ovsdb-server -vpoll_loop -vraft -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
>>       done
>>       for i in `seq $n`; do
>>           AT_CHECK([ovsdb_client_wait unix:s$i.ovsdb $schema connected])
>> @@ -307,7 +307,7 @@ ovsdb|WARN|schema: changed 30 columns in 'Open_vSwitch' database from ephemeral
>>       start_server() {
>>           local i=$1
>>           printf "\ns$i: starting\n"
>> -        AT_CHECK([ovsdb-server -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
>> +        AT_CHECK([ovsdb-server -vjsonrpc -vpoll_loop -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
>>       }
>>       connect_server() {
>>           local i=$1
>> @@ -465,7 +465,7 @@ ovsdb|WARN|schema: changed 30 columns in 'Open_vSwitch' database from ephemeral
>>       start_server() {
>>           local i=$1
>>           printf "\ns$i: starting\n"
>> -        AT_CHECK([ovsdb-server -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
>> +        AT_CHECK([ovsdb-server -vpoll_loop -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
>>       }
>>       stop_server() {
>>           local i=$1
>

Patch
diff mbox series

diff --git a/include/openvswitch/poll-loop.h b/include/openvswitch/poll-loop.h
index 532640497..50d49a8ed 100644
--- a/include/openvswitch/poll-loop.h
+++ b/include/openvswitch/poll-loop.h
@@ -33,6 +33,8 @@ 
 #ifndef POLL_LOOP_H
 #define POLL_LOOP_H 1
 
+#include <stdbool.h>
+
 #ifndef _WIN32
 #include <poll.h>
 #endif
@@ -45,6 +47,24 @@ 
 extern "C" {
 #endif
 
+/* Register a fd with a persistence framework if available so it can be served
+ * "faster" and the caller can be provided with "hints" on what caused the IO
+ * event.
+ * If the "hint" argument is supplied it set to point to the pollfd structure
+ * containing the events passed by the OS in .revents. 
+ * returns false if persistence is not enabled.
+ */
+
+bool poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where);
+#define poll_fd_register(fd, events, hint) poll_fd_register_at(fd, events, hint, OVS_SOURCE_LOCATOR)
+
+/* De-register a fd which was registered as "private" with the persistence
+ * framework
+ */
+
+void poll_fd_deregister_at(int fd, const char *where);
+#define poll_fd_deregister(fd) poll_fd_deregister_at(fd, OVS_SOURCE_LOCATOR)
+
 
 /* Schedule events to wake up the following poll_block().
  *
@@ -58,6 +78,15 @@  extern "C" {
 void poll_fd_wait_at(int fd, short int events, const char *where);
 #define poll_fd_wait(fd, events) poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
 
+/* Schedule events to wake up the following poll_block() - "private fds"
+ * Same as poll_fd_wait, but for fds which have been registered and are
+ * expected to persist. If a "fast" OS fd notification framework is used
+ * this version of wait may be a NOOP (f.e. for (E)POLLIN events.
+ */
+void private_poll_fd_wait_at(int fd, int events, const char *where);
+#define private_poll_fd_wait(fd, events) private_poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
+
+
 #ifdef _WIN32
 void poll_wevent_wait_at(HANDLE wevent, const char *where);
 #define poll_wevent_wait(wevent) poll_wevent_wait_at(wevent, OVS_SOURCE_LOCATOR)
@@ -76,6 +105,10 @@  void poll_immediate_wake_at(const char *where);
 /* Wait until an event occurs. */
 void poll_block(void);
 
+/* Enable persistence for this thread's poll loop */
+void poll_enable_persist(void);
+
+
 #ifdef  __cplusplus
 }
 #endif
diff --git a/lib/poll-loop.c b/lib/poll-loop.c
index 4e751ff2c..ed8ad16b1 100644
--- a/lib/poll-loop.c
+++ b/lib/poll-loop.c
@@ -43,6 +43,7 @@  struct poll_node {
     struct pollfd pollfd;       /* Events to pass to time_poll(). */
     HANDLE wevent;              /* Events for WaitForMultipleObjects(). */
     const char *where;          /* Where poll_node was created. */
+    bool valid;                 /* Marked invalid if we got a HUP/NVAL from poll */
 };
 
 struct poll_loop {
@@ -53,6 +54,7 @@  struct poll_loop {
      * wake up immediately, or LLONG_MAX to wait forever. */
     long long int timeout_when; /* In msecs as returned by time_msec(). */
     const char *timeout_where;  /* Where 'timeout_when' was set. */
+    bool persist;
 };
 
 static struct poll_loop *poll_loop(void);
@@ -99,8 +101,8 @@  find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
  * ('where' is used in debug logging.  Commonly one would use poll_fd_wait() to
  * automatically provide the caller's source file and line number for
  * 'where'.) */
-static void
-poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
+static struct poll_node
+*poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
 {
     struct poll_loop *loop = poll_loop();
     struct poll_node *node;
@@ -127,7 +129,9 @@  poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
 #endif
         node->wevent = wevent;
         node->where = where;
+        node->valid = true;
     }
+    return node;
 }
 
 /* Registers 'fd' as waiting for the specified 'events' (which should be POLLIN
@@ -149,6 +153,44 @@  poll_fd_wait_at(int fd, short int events, const char *where)
     poll_create_node(fd, 0, events, where);
 }
 
+void
+private_poll_fd_wait_at(int fd, int events, const char *where)
+{
+    if (events & (~POLLIN)) {
+        poll_create_node(fd, 0, events, where);
+    }
+}
+
+
+bool
+poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where) {
+    struct poll_loop *loop = poll_loop();
+    struct poll_node *node;
+    if (loop->persist) {
+        node = poll_create_node(fd, 0, events, where);
+        if (hint) {
+            *hint = &node->pollfd;
+        }
+        return true;
+    } 
+    return false;
+}
+
+
+void
+poll_fd_deregister_at(int fd, const char *where) {
+    struct poll_loop *loop = poll_loop();
+    struct poll_node *node;
+
+    VLOG(VLL_DBG, "Deregister %d from %s", fd, where);
+
+    node = find_poll_node(loop, fd, 0);
+    if (node) {
+        hmap_remove(&loop->poll_nodes, &node->hmap_node);
+    }
+}
+
+
 #ifdef _WIN32
 /* Registers for the next call to poll_block() to wake up when 'wevent' is
  * signaled.
@@ -312,13 +354,9 @@  free_poll_nodes(struct poll_loop *loop)
     }
 }
 
-/* Blocks until one or more of the events registered with poll_fd_wait()
- * occurs, or until the minimum duration registered with poll_timer_wait()
- * elapses, or not at all if poll_immediate_wake() has been called. */
-void
-poll_block(void)
+static void
+non_persist_poll_block(struct poll_loop *loop)
 {
-    struct poll_loop *loop = poll_loop();
     struct poll_node *node;
     struct pollfd *pollfds;
     HANDLE *wevents = NULL;
@@ -389,7 +427,117 @@  poll_block(void)
 
     seq_woke();
 }
-
+
+static void
+persist_poll_block(struct poll_loop *loop)
+{
+    struct poll_node *node;
+    struct pollfd *pollfds;
+    HANDLE *wevents = NULL;
+    int elapsed;
+    int retval;
+    int i, counter;
+
+    /* Register fatal signal events before actually doing any real work for
+     * poll_block. */
+    fatal_signal_wait();
+
+    if (loop->timeout_when == LLONG_MIN) {
+        COVERAGE_INC(poll_zero_timeout);
+    }
+
+    timewarp_run();
+    pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
+
+#ifdef _WIN32
+    wevents = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *wevents);
+#endif
+
+    /* Populate with all the fds and events. */
+    counter = 0;
+    HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
+        if (node->pollfd.events && node->valid) {
+            pollfds[counter] = node->pollfd;
+            counter++;
+        }
+#ifdef _WIN32
+        wevents[i] = node->wevent;
+        if (node->pollfd.fd && node->wevent) {
+            short int wsa_events = 0;
+            if (node->pollfd.events & POLLIN) {
+                wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
+            }
+            if (node->pollfd.events & POLLOUT) {
+                wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
+            }
+            WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
+        }
+#endif
+        i++;
+    }
+
+    retval = time_poll(pollfds, hmap_count(&loop->poll_nodes), wevents,
+                       loop->timeout_when, &elapsed);
+    if (retval < 0) {
+        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+        VLOG_ERR_RL(&rl, "poll: %s", ovs_strerror(-retval));
+    } else if (retval == 0) {
+        log_wakeup(loop->timeout_where, NULL, elapsed);
+    } else {
+        for (i = 0; i < counter; i++) {
+            node = find_poll_node(loop, pollfds[i].fd, 0);
+            if (!node) {
+                VLOG_FATAL("poll: persistence state corrupted, no hash entry for %d", pollfds[i].fd);
+            }
+            if (pollfds[i].revents & (POLLHUP | POLLNVAL)) {
+                node->valid = false;
+            }
+            if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
+                if (pollfds[i].revents) {
+                    log_wakeup(node->where, &pollfds[i], 0);
+                }
+            }
+            /* update "requested" events.
+            * Note - "private" fds always want POLLIN - that emulates EPOLL, /dev/poll, etc
+            * behaviour which they should be using in real life instead of using poll()
+            */
+            node->pollfd.events &= ~(pollfds[i].revents & (~POLLIN));
+            /* update "occured" events for use by streams and handlers. In case there
+             * is an existing (but not consumed yet) event, we OR the events in the
+             * stored record with the new ones - it is the job of the stream to clear
+             * that.
+             */
+            node->pollfd.revents |= pollfds[i].revents;
+        }
+    }
+
+    loop->timeout_when = LLONG_MAX;
+    loop->timeout_where = NULL;
+    free(pollfds);
+    free(wevents);
+
+    /* Handle any pending signals before doing anything else. */
+    fatal_signal_run();
+
+    seq_woke();
+
+}
+/* Blocks until one or more of the events registered with poll_fd_wait()
+ * occurs, or until the minimum duration registered with poll_timer_wait()
+ * elapses, or not at all if poll_immediate_wake() has been called. */
+
+void
+poll_block(void)
+{
+    struct poll_loop *loop = poll_loop();
+    if (loop->persist) {
+        persist_poll_block(loop);
+    } else {
+        non_persist_poll_block(loop);
+    }
+}
+
+
 static void
 free_poll_loop(void *loop_)
 {
@@ -400,6 +548,12 @@  free_poll_loop(void *loop_)
     free(loop);
 }
 
+void poll_enable_persist(void) {
+    struct poll_loop *loop = poll_loop();
+
+    loop->persist = true;
+}
+
 static struct poll_loop *
 poll_loop(void)
 {
@@ -418,6 +572,7 @@  poll_loop(void)
         loop->timeout_when = LLONG_MAX;
         hmap_init(&loop->poll_nodes);
         xpthread_setspecific(key, loop);
+        loop->persist = false;
     }
     return loop;
 }
diff --git a/lib/stream-fd.c b/lib/stream-fd.c
index 46ee7ae27..22261fedb 100644
--- a/lib/stream-fd.c
+++ b/lib/stream-fd.c
@@ -65,6 +65,9 @@  new_fd_stream(char *name, int fd, int connect_status, int fd_type,
 
     s = xmalloc(sizeof *s);
     stream_init(&s->stream, &stream_fd_class, connect_status, name);
+    s->stream.persist = poll_fd_register(fd, POLLIN, &s->stream.hint);
+    s->stream.rx_ready = true;
+    s->stream.tx_ready = true;
     s->fd = fd;
     s->fd_type = fd_type;
     *streamp = &s->stream;
@@ -82,6 +85,9 @@  static void
 fd_close(struct stream *stream)
 {
     struct stream_fd *s = stream_fd_cast(stream);
+    if (s->stream.persist) {
+        poll_fd_deregister(s->fd);
+    }
     closesocket(s->fd);
     free(s);
 }
@@ -104,6 +110,26 @@  fd_recv(struct stream *stream, void *buffer, size_t n)
     ssize_t retval;
     int error;
 
+    if (stream->persist && stream->hint) {
+        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip straight
+         * to the read which should return 0 if the HUP is a real one, if not we clear it
+         * for all other cases we belive what (e)poll has fed us.
+         */
+        if ((!(stream->hint->revents & (POLLHUP | POLLNVAL))) && (!stream->rx_ready)) {
+            if (!(stream->hint->revents & POLLIN)) {
+                return -EAGAIN;
+            } else {
+                /* POLLIN event from poll loop, mark us as ready */
+                stream->rx_ready = true;
+                stream->hint->revents &= ~POLLIN;
+            }
+        } else {
+            stream->hint->revents &= ~(POLLHUP | POLLNVAL);
+        }
+    }
+
+
+
     retval = recv(s->fd, buffer, n, 0);
     if (retval < 0) {
         error = sock_errno();
@@ -127,6 +153,19 @@  fd_send(struct stream *stream, const void *buffer, size_t n)
     ssize_t retval;
     int error;
 
+    if (stream->persist && stream->hint) {
+        /* poll-loop is providing us with hints for IO */
+        if (!stream->tx_ready) {
+            if (!(stream->hint->revents & POLLOUT)) {
+                return -EAGAIN;
+            } else {
+                /* POLLOUT event from poll loop, mark us as ready */
+                stream->tx_ready = true;
+                stream->hint->revents &= ~POLLOUT;
+            }
+        }
+    }
+
     retval = send(s->fd, buffer, n, 0);
     if (retval < 0) {
         error = sock_errno();
@@ -137,6 +176,8 @@  fd_send(struct stream *stream, const void *buffer, size_t n)
 #endif
         if (error != EAGAIN) {
             VLOG_DBG_RL(&rl, "send: %s", sock_strerror(error));
+        } else {
+            stream->tx_ready = false;
         }
         return -error;
     }
@@ -150,11 +191,19 @@  fd_wait(struct stream *stream, enum stream_wait_type wait)
     switch (wait) {
     case STREAM_CONNECT:
     case STREAM_SEND:
-        poll_fd_wait(s->fd, POLLOUT);
+        if (stream->persist) {
+            private_poll_fd_wait(s->fd, POLLOUT);
+        } else {
+            poll_fd_wait(s->fd, POLLOUT);
+        }
         break;
 
     case STREAM_RECV:
-        poll_fd_wait(s->fd, POLLIN);
+        if (stream->persist) {
+            private_poll_fd_wait(s->fd, POLLIN);
+        } else {
+            poll_fd_wait(s->fd, POLLIN);
+        }
         break;
 
     default:
@@ -219,6 +268,7 @@  new_fd_pstream(char *name, int fd,
 {
     struct fd_pstream *ps = xmalloc(sizeof *ps);
     pstream_init(&ps->pstream, &fd_pstream_class, name);
+    ps->pstream.persist = poll_fd_register(fd, POLLIN, NULL);
     ps->fd = fd;
     ps->accept_cb = accept_cb;
     ps->unlink_path = unlink_path;
@@ -230,6 +280,9 @@  static void
 pfd_close(struct pstream *pstream)
 {
     struct fd_pstream *ps = fd_pstream_cast(pstream);
+    if (pstream->persist) {
+        poll_fd_deregister(ps->fd);
+    }
     closesocket(ps->fd);
     maybe_unlink_and_free(ps->unlink_path);
     free(ps);
@@ -271,7 +324,11 @@  static void
 pfd_wait(struct pstream *pstream)
 {
     struct fd_pstream *ps = fd_pstream_cast(pstream);
-    poll_fd_wait(ps->fd, POLLIN);
+    if (pstream->persist) {
+        private_poll_fd_wait(ps->fd, POLLIN);
+    } else {
+        poll_fd_wait(ps->fd, POLLIN);
+    }
 }
 
 static const struct pstream_class fd_pstream_class = {
diff --git a/lib/stream-provider.h b/lib/stream-provider.h
index 75f4f059b..53de231dc 100644
--- a/lib/stream-provider.h
+++ b/lib/stream-provider.h
@@ -18,6 +18,7 @@ 
 #define STREAM_PROVIDER_H 1
 
 #include <sys/types.h>
+#include <poll.h>
 #include "stream.h"
 
 /* Active stream connection. */
@@ -31,6 +32,8 @@  struct stream {
     int error;
     char *name;
     char *peer_id;
+    bool persist, rx_ready, tx_ready;
+    struct pollfd *hint;
 };
 
 void stream_init(struct stream *, const struct stream_class *,
@@ -133,6 +136,7 @@  struct pstream {
     const struct pstream_class *class;
     char *name;
     ovs_be16 bound_port;
+    bool persist;
 };
 
 void pstream_init(struct pstream *, const struct pstream_class *, char *name);
diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
index 078fcbc3a..056bb2023 100644
--- a/lib/stream-ssl.c
+++ b/lib/stream-ssl.c
@@ -310,6 +310,10 @@  new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
         SSL_set_msg_callback_arg(ssl, sslv);
     }
 
+    sslv->stream.persist = poll_fd_register(fd, POLLIN, &sslv->stream.hint);
+    sslv->stream.rx_ready = true;
+    sslv->stream.tx_ready = true;
+
     *streamp = &sslv->stream;
     free(server_name);
     return 0;
@@ -604,6 +608,7 @@  ssl_close(struct stream *stream)
     ERR_clear_error();
 
     SSL_free(sslv->ssl);
+    poll_fd_deregister(sslv->fd);
     closesocket(sslv->fd);
     free(sslv);
 }
@@ -697,6 +702,26 @@  ssl_recv(struct stream *stream, void *buffer, size_t n)
     /* Behavior of zero-byte SSL_read is poorly defined. */
     ovs_assert(n > 0);
 
+    if (stream->persist && stream->hint) {
+        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip straight
+         * to the read which should return 0 if the HUP is a real one, if not we clear it
+         * for all other cases we belive what (e)poll has fed us.
+         */
+        if ((!(stream->hint->revents & (POLLHUP | POLLNVAL))) && (sslv->rx_want == SSL_READING)) {
+            if (!(stream->hint->revents & POLLIN)) {
+                return -EAGAIN;
+            } else {
+                /* POLLIN event from poll loop, mark us as ready 
+                 * rx_want is cleared further down by reading ssl fsm
+                 */
+                stream->hint->revents &= ~POLLIN;
+            }
+        } else {
+            stream->hint->revents &= ~(POLLHUP | POLLNVAL);
+        }
+    }
+
+
     old_state = SSL_get_state(sslv->ssl);
     ret = SSL_read(sslv->ssl, buffer, n);
     if (old_state != SSL_get_state(sslv->ssl)) {
@@ -729,6 +754,21 @@  ssl_do_tx(struct stream *stream)
 {
     struct ssl_stream *sslv = ssl_stream_cast(stream);
 
+     if (stream->persist && stream->hint) {
+        /* poll-loop is providing us with hints for IO */
+        if (sslv->tx_want == SSL_WRITING) {
+            if (!(stream->hint->revents & POLLOUT)) {
+                return EAGAIN;
+            } else {
+                /* POLLIN event from poll loop, mark us as ready
+                 * rx_want is cleared further down by reading ssl fsm
+                 */
+                stream->hint->revents &= ~POLLOUT;
+            }
+        }
+    }
+
+
     for (;;) {
         int old_state = SSL_get_state(sslv->ssl);
         int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
@@ -771,6 +811,9 @@  ssl_send(struct stream *stream, const void *buffer, size_t n)
             ssl_clear_txbuf(sslv);
             return n;
         case EAGAIN:
+            if (stream->persist) {
+                stream_send_wait(stream);
+            }
             return n;
         default:
             ssl_clear_txbuf(sslv);
@@ -795,7 +838,11 @@  ssl_run_wait(struct stream *stream)
     struct ssl_stream *sslv = ssl_stream_cast(stream);
 
     if (sslv->tx_want != SSL_NOTHING) {
-        poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
+        if (stream->persist) {
+            private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
+        } else {
+            poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
+        }
     }
 }
 
@@ -811,14 +858,23 @@  ssl_wait(struct stream *stream, enum stream_wait_type wait)
         } else {
             switch (sslv->state) {
             case STATE_TCP_CONNECTING:
-                poll_fd_wait(sslv->fd, POLLOUT);
+                if (stream->persist) {
+                    private_poll_fd_wait(sslv->fd, POLLOUT);
+                } else {
+                    poll_fd_wait(sslv->fd, POLLOUT);
+                }
                 break;
 
             case STATE_SSL_CONNECTING:
                 /* ssl_connect() called SSL_accept() or SSL_connect(), which
                  * set up the status that we test here. */
-                poll_fd_wait(sslv->fd,
+                if (stream->persist) {
+                    private_poll_fd_wait(sslv->fd,
+                               want_to_poll_events(SSL_want(sslv->ssl)));
+                } else {
+                    poll_fd_wait(sslv->fd,
                                want_to_poll_events(SSL_want(sslv->ssl)));
+                }
                 break;
 
             default:
@@ -829,7 +885,11 @@  ssl_wait(struct stream *stream, enum stream_wait_type wait)
 
     case STREAM_RECV:
         if (sslv->rx_want != SSL_NOTHING) {
-            poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
+            if (stream->persist) {
+                private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
+            } else {
+                poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
+            }
         } else {
             poll_immediate_wake();
         }
@@ -911,6 +971,7 @@  pssl_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp,
                  ds_steal_cstr(&bound_name));
     pstream_set_bound_port(&pssl->pstream, htons(port));
     pssl->fd = fd;
+    pssl->pstream.persist = poll_fd_register(fd, POLLIN, NULL);
     *pstreamp = &pssl->pstream;
 
     return 0;
@@ -921,6 +982,9 @@  pssl_close(struct pstream *pstream)
 {
     struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
     closesocket(pssl->fd);
+    if (pstream->persist) {
+        poll_fd_deregister(pssl->fd);
+    }
     free(pssl);
 }
 
@@ -965,7 +1029,11 @@  static void
 pssl_wait(struct pstream *pstream)
 {
     struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
-    poll_fd_wait(pssl->fd, POLLIN);
+    if (pstream->persist) {
+        private_poll_fd_wait(pssl->fd, POLLIN);
+    } else {
+        poll_fd_wait(pssl->fd, POLLIN);
+    }
 }
 
 const struct pstream_class pssl_pstream_class = {
diff --git a/lib/stream-windows.c b/lib/stream-windows.c
index 34bc610b6..b845911b6 100644
--- a/lib/stream-windows.c
+++ b/lib/stream-windows.c
@@ -175,6 +175,7 @@  windows_open(const char *name, char *suffix, struct stream **streamp,
     s->read_pending = false;
     s->write_pending = false;
     s->retry_connect = retry;
+    s->stream.persist = false;
     *streamp = &s->stream;
     return 0;
 }
@@ -649,6 +650,7 @@  pwindows_open(const char *name OVS_UNUSED, char *suffix,
     p->connect.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
     p->pending = false;
     p->pipe_path = bind_path;
+    p->stream.persist = false;
     *pstreamp = &p->pstream;
     return 0;
 }
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index b6957d730..295a62afd 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -182,6 +182,7 @@  main_loop(struct server_config *config,
     *exiting = false;
     ssl_error = NULL;
     remotes_error = NULL;
+    poll_enable_persist();
     while (!*exiting) {
         memory_run();
         if (memory_should_report()) {
diff --git a/tests/ovsdb-cluster.at b/tests/ovsdb-cluster.at
index 3a0bd4579..1f71d4fe8 100644
--- a/tests/ovsdb-cluster.at
+++ b/tests/ovsdb-cluster.at
@@ -15,7 +15,7 @@  ovsdb_check_cluster () {
 
     on_exit 'kill `cat *.pid`'
     for i in `seq $n`; do
-        AT_CHECK([ovsdb-server -vraft -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
+        AT_CHECK([ovsdb-server -vpoll_loop -vraft -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
     done
     for i in `seq $n`; do
         AT_CHECK([ovsdb_client_wait unix:s$i.ovsdb $schema connected])
@@ -307,7 +307,7 @@  ovsdb|WARN|schema: changed 30 columns in 'Open_vSwitch' database from ephemeral
     start_server() {
         local i=$1
         printf "\ns$i: starting\n"
-        AT_CHECK([ovsdb-server -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
+        AT_CHECK([ovsdb-server -vjsonrpc -vpoll_loop -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
     }
     connect_server() {
         local i=$1
@@ -465,7 +465,7 @@  ovsdb|WARN|schema: changed 30 columns in 'Open_vSwitch' database from ephemeral
     start_server() {
         local i=$1
         printf "\ns$i: starting\n"
-        AT_CHECK([ovsdb-server -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
+        AT_CHECK([ovsdb-server -vpoll_loop -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
     }
     stop_server() {
         local i=$1