From patchwork Wed Feb 26 11:52:37 2020
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Anton Ivanov
X-Patchwork-Id: 1244960
From: anton.ivanov@cambridgegreys.com
To: dev@openvswitch.org
Date: Wed, 26 Feb 2020 11:52:37 +0000
Message-Id: <20200226115239.3192-1-anton.ivanov@cambridgegreys.com>
X-Mailer: git-send-email 2.20.1
Cc: Anton Ivanov
Subject: [ovs-dev] [PATCH v3 1/3] Add file descriptor persistence where possible

From: Anton Ivanov

1. Adds "persistent" behaviour to streams where feasible. These are waited upon in the same thread where they are created. This allows them to be registered persistently with the OS (if possible) and allows the OS to provide hints: is the fd ready, is it closed, etc.
2. Removes unnecessary read attempts that would only return EAGAIN on an fd which is not ready, if the thread has a persistent poll loop.
3.
Enables fd persistence for the ovsdb main server thread only.

Signed-off-by: Anton Ivanov
---
 include/openvswitch/poll-loop.h |  33 ++++++
 lib/poll-loop.c                 | 173 ++++++++++++++++++++++++++++++--
 lib/stream-fd.c                 |  63 +++++++++++-
 lib/stream-provider.h           |   4 +
 lib/stream-ssl.c                |  78 +++++++++++++-
 lib/stream-windows.c            |   2 +
 ovsdb/ovsdb-server.c            |   1 +
 tests/ovsdb-cluster.at          |   6 +-
 8 files changed, 340 insertions(+), 20 deletions(-)

diff --git a/include/openvswitch/poll-loop.h b/include/openvswitch/poll-loop.h index 532640497..50d49a8ed 100644 --- a/include/openvswitch/poll-loop.h +++ b/include/openvswitch/poll-loop.h @@ -33,6 +33,8 @@ #ifndef POLL_LOOP_H #define POLL_LOOP_H 1 +#include + #ifndef _WIN32 #include #endif @@ -45,6 +47,24 @@ extern "C" { #endif +/* Register a fd with a persistence framework if available so it can be served + * "faster" and the caller can be provided with "hints" on what caused the IO + * event. + * If the "hint" argument is supplied, it is set to point to the pollfd structure + * containing the events passed by the OS in .revents. + * Returns false if persistence is not enabled. + */ + +bool poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where); +#define poll_fd_register(fd, events, hint) poll_fd_register_at(fd, events, hint, OVS_SOURCE_LOCATOR) + +/* De-register a fd which was registered as "private" with the persistence + * framework. + */ + +void poll_fd_deregister_at(int fd, const char *where); +#define poll_fd_deregister(fd) poll_fd_deregister_at(fd, OVS_SOURCE_LOCATOR) + /* Schedule events to wake up the following poll_block(). * @@ -58,6 +78,15 @@ extern "C" { void poll_fd_wait_at(int fd, short int events, const char *where); #define poll_fd_wait(fd, events) poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR) +/* Schedule events to wake up the following poll_block() - "private fds" + * Same as poll_fd_wait, but for fds which have been registered and are + * expected to persist.
If a "fast" OS fd notification framework is used + * this version of wait may be a NOOP (f.e. for (E)POLLIN events. + */ +void private_poll_fd_wait_at(int fd, int events, const char *where); +#define private_poll_fd_wait(fd, events) private_poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR) + + #ifdef _WIN32 void poll_wevent_wait_at(HANDLE wevent, const char *where); #define poll_wevent_wait(wevent) poll_wevent_wait_at(wevent, OVS_SOURCE_LOCATOR) @@ -76,6 +105,10 @@ void poll_immediate_wake_at(const char *where); /* Wait until an event occurs. */ void poll_block(void); +/* Enable persistence for this thread's poll loop */ +void poll_enable_persist(void); + + #ifdef __cplusplus } #endif diff --git a/lib/poll-loop.c b/lib/poll-loop.c index 4e751ff2c..68e44eba2 100644 --- a/lib/poll-loop.c +++ b/lib/poll-loop.c @@ -43,6 +43,7 @@ struct poll_node { struct pollfd pollfd; /* Events to pass to time_poll(). */ HANDLE wevent; /* Events for WaitForMultipleObjects(). */ const char *where; /* Where poll_node was created. */ + bool valid; /* Marked invalid if we got a HUP/NVAL from poll */ }; struct poll_loop { @@ -53,6 +54,7 @@ struct poll_loop { * wake up immediately, or LLONG_MAX to wait forever. */ long long int timeout_when; /* In msecs as returned by time_msec(). */ const char *timeout_where; /* Where 'timeout_when' was set. */ + bool persist; }; static struct poll_loop *poll_loop(void); @@ -99,8 +101,8 @@ find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent) * ('where' is used in debug logging. Commonly one would use poll_fd_wait() to * automatically provide the caller's source file and line number for * 'where'.) 
*/ -static void -poll_create_node(int fd, HANDLE wevent, short int events, const char *where) +static struct poll_node +*poll_create_node(int fd, HANDLE wevent, short int events, const char *where) { struct poll_loop *loop = poll_loop(); struct poll_node *node; @@ -127,7 +129,9 @@ poll_create_node(int fd, HANDLE wevent, short int events, const char *where) #endif node->wevent = wevent; node->where = where; + node->valid = true; } + return node; } /* Registers 'fd' as waiting for the specified 'events' (which should be POLLIN @@ -149,6 +153,44 @@ poll_fd_wait_at(int fd, short int events, const char *where) poll_create_node(fd, 0, events, where); } +void +private_poll_fd_wait_at(int fd, int events, const char *where) +{ + if (events & (~POLLIN)) { + poll_create_node(fd, 0, events, where); + } +} + + +bool +poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where) { + struct poll_loop *loop = poll_loop(); + struct poll_node *node; + if (loop->persist) { + node = poll_create_node(fd, 0, events, where); + if (hint) { + *hint = &node->pollfd; + } + return true; + } + return false; +} + + +void +poll_fd_deregister_at(int fd, const char *where) { + struct poll_loop *loop = poll_loop(); + struct poll_node *node; + + VLOG(VLL_DBG, "Deregister %d from %s", fd, where); + + node = find_poll_node(loop, fd, 0); + if (node) { + hmap_remove(&loop->poll_nodes, &node->hmap_node); + } +} + + #ifdef _WIN32 /* Registers for the next call to poll_block() to wake up when 'wevent' is * signaled. @@ -312,13 +354,9 @@ free_poll_nodes(struct poll_loop *loop) } } -/* Blocks until one or more of the events registered with poll_fd_wait() - * occurs, or until the minimum duration registered with poll_timer_wait() - * elapses, or not at all if poll_immediate_wake() has been called. 
*/ -void -poll_block(void) +static void +non_persist_poll_block(struct poll_loop *loop) { - struct poll_loop *loop = poll_loop(); struct poll_node *node; struct pollfd *pollfds; HANDLE *wevents = NULL; @@ -389,7 +427,117 @@ poll_block(void) seq_woke(); } - + +static void +persist_poll_block(struct poll_loop *loop) +{ + struct poll_node *node; + struct pollfd *pollfds; + HANDLE *wevents = NULL; + int elapsed; + int retval; + int i = 0, counter; + + /* Register fatal signal events before actually doing any real work for + * poll_block. */ + fatal_signal_wait(); + + if (loop->timeout_when == LLONG_MIN) { + COVERAGE_INC(poll_zero_timeout); + } + + timewarp_run(); + pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds); + +#ifdef _WIN32 + wevents = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *wevents); +#endif + + /* Populate with all the fds and events. */ + counter = 0; + HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) { + if (node->pollfd.events && node->valid) { + pollfds[counter] = node->pollfd; + counter++; + } +#ifdef _WIN32 + wevents[i] = node->wevent; + if (node->pollfd.fd && node->wevent) { + short int wsa_events = 0; + if (node->pollfd.events & POLLIN) { + wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE; + } + if (node->pollfd.events & POLLOUT) { + wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE; + } + WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events); + } +#endif + i++; + } + + retval = time_poll(pollfds, hmap_count(&loop->poll_nodes), wevents, + loop->timeout_when, &elapsed); + if (retval < 0) { + static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); + VLOG_ERR_RL(&rl, "poll: %s", ovs_strerror(-retval)); + } else if (retval == 0) { + log_wakeup(loop->timeout_where, NULL, elapsed); + } else { + for (i = 0; i < counter; i++) { + node = find_poll_node(loop, pollfds[i].fd, 0); + if (!node) { + VLOG_FATAL("poll: persistence state corrupted, no hash entry for %d", pollfds[i].fd); + } + if (pollfds[i].revents & (POLLHUP | POLLNVAL)) { +
node->valid = false; + } + if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) { + if (pollfds[i].revents) { + log_wakeup(node->where, &pollfds[i], 0); + } + } + /* Update "requested" events. + * Note: "private" fds always want POLLIN. This emulates the behaviour of epoll, + * /dev/poll, etc., which they should be using in real life instead of poll(). + */ + node->pollfd.events &= ~(pollfds[i].revents & (~POLLIN)); + /* Update "occurred" events for use by streams and handlers. In case there + * is an existing (but not yet consumed) event, we OR the events in the + * stored record with the new ones. It is the job of the stream to clear + * that. + */ + node->pollfd.revents |= pollfds[i].revents; + } + } + + loop->timeout_when = LLONG_MAX; + loop->timeout_where = NULL; + free(pollfds); + free(wevents); + + /* Handle any pending signals before doing anything else. */ + fatal_signal_run(); + + seq_woke(); + +} +/* Blocks until one or more of the events registered with poll_fd_wait() + * occurs, or until the minimum duration registered with poll_timer_wait() + * elapses, or not at all if poll_immediate_wake() has been called.
*/ + +void +poll_block(void) +{ + struct poll_loop *loop = poll_loop(); + if (loop->persist) { + persist_poll_block(loop); + } else { + non_persist_poll_block(loop); + } +} + + static void free_poll_loop(void *loop_) { @@ -400,6 +548,12 @@ free_poll_loop(void *loop_) free(loop); } +void poll_enable_persist(void) { + struct poll_loop *loop = poll_loop(); + + loop->persist = true; +} + static struct poll_loop * poll_loop(void) { @@ -418,6 +572,7 @@ poll_loop(void) loop->timeout_when = LLONG_MAX; hmap_init(&loop->poll_nodes); xpthread_setspecific(key, loop); + loop->persist = false; } return loop; } diff --git a/lib/stream-fd.c b/lib/stream-fd.c index 46ee7ae27..791bb96f6 100644 --- a/lib/stream-fd.c +++ b/lib/stream-fd.c @@ -65,6 +65,9 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type, s = xmalloc(sizeof *s); stream_init(&s->stream, &stream_fd_class, connect_status, name); + s->stream.persist = poll_fd_register(fd, POLLIN, &s->stream.hint); + s->stream.rx_ready = true; + s->stream.tx_ready = true; s->fd = fd; s->fd_type = fd_type; *streamp = &s->stream; @@ -82,6 +85,9 @@ static void fd_close(struct stream *stream) { struct stream_fd *s = stream_fd_cast(stream); + if (s->stream.persist) { + poll_fd_deregister(s->fd); + } closesocket(s->fd); free(s); } @@ -104,6 +110,26 @@ fd_recv(struct stream *stream, void *buffer, size_t n) ssize_t retval; int error; + if (stream->persist && stream->hint) { + /* The poll loop is providing us with hints for IO. If we got a HUP/NVAL, we skip straight + * to the read, which should return 0 if the HUP is a real one; if not, we clear it. + * In all other cases we believe what (e)poll has fed us.
+ */ + if ((!(stream->hint->revents & (POLLHUP|POLLNVAL))) && (!stream->rx_ready)) { + if (!(stream->hint->revents & POLLIN)) { + return -EAGAIN; + } else { + /* POLLIN event from poll loop, mark us as ready */ + stream->rx_ready = true; + stream->hint->revents &= ~POLLIN; + } + } else { + stream->hint->revents &= ~(POLLHUP|POLLNVAL); + } + } + + + retval = recv(s->fd, buffer, n, 0); if (retval < 0) { error = sock_errno(); @@ -127,6 +153,19 @@ fd_send(struct stream *stream, const void *buffer, size_t n) ssize_t retval; int error; + if (stream->persist && stream->hint) { + /* poll-loop is providing us with hints for IO */ + if (!stream->tx_ready) { + if (!(stream->hint->revents & POLLOUT)) { + return -EAGAIN; + } else { + /* POLLOUT event from poll loop, mark us as ready */ + stream->tx_ready = true; + stream->hint->revents &= ~POLLOUT; + } + } + } + retval = send(s->fd, buffer, n, 0); if (retval < 0) { error = sock_errno(); @@ -137,6 +176,8 @@ fd_send(struct stream *stream, const void *buffer, size_t n) #endif if (error != EAGAIN) { VLOG_DBG_RL(&rl, "send: %s", sock_strerror(error)); + } else { + stream->tx_ready = false; } return -error; } @@ -150,11 +191,19 @@ fd_wait(struct stream *stream, enum stream_wait_type wait) switch (wait) { case STREAM_CONNECT: case STREAM_SEND: - poll_fd_wait(s->fd, POLLOUT); + if (stream->persist) { + private_poll_fd_wait(s->fd, POLLOUT); + } else { + poll_fd_wait(s->fd, POLLOUT); + } break; case STREAM_RECV: - poll_fd_wait(s->fd, POLLIN); + if (stream->persist) { + private_poll_fd_wait(s->fd, POLLIN); + } else { + poll_fd_wait(s->fd, POLLIN); + } break; default: @@ -219,6 +268,7 @@ new_fd_pstream(char *name, int fd, { struct fd_pstream *ps = xmalloc(sizeof *ps); pstream_init(&ps->pstream, &fd_pstream_class, name); + ps->pstream.persist = poll_fd_register(fd, POLLIN, NULL); ps->fd = fd; ps->accept_cb = accept_cb; ps->unlink_path = unlink_path; @@ -230,6 +280,9 @@ static void pfd_close(struct pstream *pstream) { struct fd_pstream *ps = 
fd_pstream_cast(pstream); + if (pstream->persist) { + poll_fd_deregister(ps->fd); + } closesocket(ps->fd); maybe_unlink_and_free(ps->unlink_path); free(ps); @@ -271,7 +324,11 @@ static void pfd_wait(struct pstream *pstream) { struct fd_pstream *ps = fd_pstream_cast(pstream); - poll_fd_wait(ps->fd, POLLIN); + if (pstream->persist) { + private_poll_fd_wait(ps->fd, POLLIN); + } else { + poll_fd_wait(ps->fd, POLLIN); + } } static const struct pstream_class fd_pstream_class = { diff --git a/lib/stream-provider.h b/lib/stream-provider.h index 75f4f059b..53de231dc 100644 --- a/lib/stream-provider.h +++ b/lib/stream-provider.h @@ -18,6 +18,7 @@ #define STREAM_PROVIDER_H 1 #include +#include #include "stream.h" /* Active stream connection. */ @@ -31,6 +32,8 @@ struct stream { int error; char *name; char *peer_id; + bool persist, rx_ready, tx_ready; + struct pollfd *hint; }; void stream_init(struct stream *, const struct stream_class *, @@ -133,6 +136,7 @@ struct pstream { const struct pstream_class *class; char *name; ovs_be16 bound_port; + bool persist; }; void pstream_init(struct pstream *, const struct pstream_class *, char *name); diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c index 078fcbc3a..14abacf4a 100644 --- a/lib/stream-ssl.c +++ b/lib/stream-ssl.c @@ -310,6 +310,10 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type, SSL_set_msg_callback_arg(ssl, sslv); } + sslv->stream.persist = poll_fd_register(fd, POLLIN, &sslv->stream.hint); + sslv->stream.rx_ready = true; + sslv->stream.tx_ready = true; + *streamp = &sslv->stream; free(server_name); return 0; @@ -604,6 +608,7 @@ ssl_close(struct stream *stream) ERR_clear_error(); SSL_free(sslv->ssl); + poll_fd_deregister(sslv->fd); closesocket(sslv->fd); free(sslv); } @@ -697,6 +702,26 @@ ssl_recv(struct stream *stream, void *buffer, size_t n) /* Behavior of zero-byte SSL_read is poorly defined. 
*/ ovs_assert(n > 0); + if (stream->persist && stream->hint) { + /* The poll loop is providing us with hints for IO. If we got a HUP/NVAL, we skip straight + * to the read, which should return 0 if the HUP is a real one; if not, we clear it. + * In all other cases we believe what (e)poll has fed us. + */ + if ((!(stream->hint->revents & (POLLHUP|POLLNVAL))) && (sslv->rx_want == SSL_READING)) { + if (!(stream->hint->revents & POLLIN)) { + return -EAGAIN; + } else { + /* POLLIN event from poll loop, mark us as ready. + * rx_want is cleared further down by reading the SSL FSM. + */ + stream->hint->revents &= ~POLLIN; + } + } else { + stream->hint->revents &= ~(POLLHUP|POLLNVAL); + } + } + + old_state = SSL_get_state(sslv->ssl); ret = SSL_read(sslv->ssl, buffer, n); if (old_state != SSL_get_state(sslv->ssl)) { @@ -729,6 +754,21 @@ ssl_do_tx(struct stream *stream) { struct ssl_stream *sslv = ssl_stream_cast(stream); + if (stream->persist && stream->hint) { + /* poll-loop is providing us with hints for IO */ + if (sslv->tx_want == SSL_WRITING) { + if (!(stream->hint->revents & POLLOUT)) { + return EAGAIN; + } else { + /* POLLOUT event from poll loop, mark us as ready. + * tx_want is updated further down by reading the SSL FSM. + */ + stream->hint->revents &= ~POLLOUT; + } + } + } + + for (;;) { int old_state = SSL_get_state(sslv->ssl); int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size); @@ -771,6 +811,9 @@ ssl_send(struct stream *stream, const void *buffer, size_t n) ssl_clear_txbuf(sslv); return n; case EAGAIN: + if (stream->persist) { + stream_send_wait(stream); + } return n; default: ssl_clear_txbuf(sslv); @@ -795,7 +838,11 @@ ssl_run_wait(struct stream *stream) struct ssl_stream *sslv = ssl_stream_cast(stream); if (sslv->tx_want != SSL_NOTHING) { - poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want)); + if (stream->persist) { + private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want)); + } else { + poll_fd_wait(sslv->fd,
want_to_poll_events(sslv->tx_want)); + } } } @@ -811,14 +858,23 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait) } else { switch (sslv->state) { case STATE_TCP_CONNECTING: - poll_fd_wait(sslv->fd, POLLOUT); + if (stream->persist) { + private_poll_fd_wait(sslv->fd, POLLOUT); + } else { + poll_fd_wait(sslv->fd, POLLOUT); + } break; case STATE_SSL_CONNECTING: /* ssl_connect() called SSL_accept() or SSL_connect(), which * set up the status that we test here. */ - poll_fd_wait(sslv->fd, + if (stream->persist) { + private_poll_fd_wait(sslv->fd, + want_to_poll_events(SSL_want(sslv->ssl))); + } else { + poll_fd_wait(sslv->fd, want_to_poll_events(SSL_want(sslv->ssl))); + } break; default: @@ -829,7 +885,11 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait) case STREAM_RECV: if (sslv->rx_want != SSL_NOTHING) { - poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want)); + if (stream->persist) { + private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want)); + } else { + poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want)); + } } else { poll_immediate_wake(); } @@ -911,6 +971,7 @@ pssl_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp, ds_steal_cstr(&bound_name)); pstream_set_bound_port(&pssl->pstream, htons(port)); pssl->fd = fd; + pssl->pstream.persist = poll_fd_register(fd, POLLIN, NULL); *pstreamp = &pssl->pstream; return 0; @@ -921,6 +982,9 @@ pssl_close(struct pstream *pstream) { struct pssl_pstream *pssl = pssl_pstream_cast(pstream); closesocket(pssl->fd); + if (pstream->persist) { + poll_fd_deregister(pssl->fd); + } free(pssl); } @@ -965,7 +1029,11 @@ static void pssl_wait(struct pstream *pstream) { struct pssl_pstream *pssl = pssl_pstream_cast(pstream); - poll_fd_wait(pssl->fd, POLLIN); + if (pstream->persist) { + private_poll_fd_wait(pssl->fd, POLLIN); + } else { + poll_fd_wait(pssl->fd, POLLIN); + } } const struct pstream_class pssl_pstream_class = { diff --git a/lib/stream-windows.c 
b/lib/stream-windows.c index 34bc610b6..b845911b6 100644 --- a/lib/stream-windows.c +++ b/lib/stream-windows.c @@ -175,6 +175,7 @@ windows_open(const char *name, char *suffix, struct stream **streamp, s->read_pending = false; s->write_pending = false; s->retry_connect = retry; + s->stream.persist = false; *streamp = &s->stream; return 0; } @@ -649,6 +650,7 @@ pwindows_open(const char *name OVS_UNUSED, char *suffix, p->connect.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL); p->pending = false; p->pipe_path = bind_path; + p->pstream.persist = false; *pstreamp = &p->pstream; return 0; } diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c index b6957d730..295a62afd 100644 --- a/ovsdb/ovsdb-server.c +++ b/ovsdb/ovsdb-server.c @@ -182,6 +182,7 @@ main_loop(struct server_config *config, *exiting = false; ssl_error = NULL; remotes_error = NULL; + poll_enable_persist(); while (!*exiting) { memory_run(); if (memory_should_report()) { diff --git a/tests/ovsdb-cluster.at b/tests/ovsdb-cluster.at index 3a0bd4579..1f71d4fe8 100644 --- a/tests/ovsdb-cluster.at +++ b/tests/ovsdb-cluster.at @@ -15,7 +15,7 @@ ovsdb_check_cluster () { on_exit 'kill `cat *.pid`' for i in `seq $n`; do - AT_CHECK([ovsdb-server -vraft -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db]) + AT_CHECK([ovsdb-server -vpoll_loop -vraft -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db]) done for i in `seq $n`; do AT_CHECK([ovsdb_client_wait unix:s$i.ovsdb $schema connected]) @@ -307,7 +307,7 @@ ovsdb|WARN|schema: changed 30 columns in 'Open_vSwitch' database from ephemeral start_server() { local i=$1 printf "\ns$i: starting\n" - AT_CHECK([ovsdb-server -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db]) + AT_CHECK([ovsdb-server -vjsonrpc -vpoll_loop -vconsole:off
-vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db]) } connect_server() { local i=$1 @@ -465,7 +465,7 @@ ovsdb|WARN|schema: changed 30 columns in 'Open_vSwitch' database from ephemeral start_server() { local i=$1 printf "\ns$i: starting\n" - AT_CHECK([ovsdb-server -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db]) + AT_CHECK([ovsdb-server -vpoll_loop -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db]) } stop_server() { local i=$1
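
The receive-side hint check that this patch adds to fd_recv() can be sketched in isolation as follows. This is only an illustration under stated assumptions, not code from the patch: struct hinted_stream, poll_once() and hinted_recv() are hypothetical names, the hint is stored inline rather than pointing into a poll_node, and recv() is called with MSG_DONTWAIT so the sketch does not depend on the socket being non-blocking. It also clears rx_ready when recv() returns EAGAIN, which is an assumption about the intended behaviour mirroring what the fd_send() hunk does for tx_ready.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical stand-in for the fields the patch adds to struct stream. */
struct hinted_stream {
    int fd;
    bool rx_ready;          /* Cleared once recv() has returned EAGAIN. */
    struct pollfd hint;     /* Events cached from the last poll pass. */
};

/* Hypothetical helper: one zero-timeout poll() pass that ORs any new
 * events into the cached hint, much as persist_poll_block() does for
 * each registered node. */
static void
poll_once(struct hinted_stream *s)
{
    struct pollfd pfd = { .fd = s->fd, .events = POLLIN };

    if (poll(&pfd, 1, 0) > 0) {
        s->hint.revents |= pfd.revents;
    }
}

/* Returns the number of bytes read, 0 on EOF, or a negative errno value.
 * Skips the recv() system call when the hint says the fd is not readable. */
static ssize_t
hinted_recv(struct hinted_stream *s, void *buf, size_t n)
{
    ssize_t retval;

    assert(s != NULL && buf != NULL);

    if (!(s->hint.revents & (POLLHUP | POLLNVAL)) && !s->rx_ready) {
        if (!(s->hint.revents & POLLIN)) {
            return -EAGAIN;             /* Not ready: no syscall issued. */
        }
        s->rx_ready = true;             /* Consume the POLLIN hint. */
        s->hint.revents &= ~POLLIN;
    } else {
        /* Let the read itself decide whether a HUP was a real one. */
        s->hint.revents &= ~(POLLHUP | POLLNVAL);
    }

    retval = recv(s->fd, buf, n, MSG_DONTWAIT);
    if (retval < 0) {
        int error = errno;

        if (error == EAGAIN || error == EWOULDBLOCK) {
            s->rx_ready = false;        /* Wait for the next POLLIN hint. */
            return -EAGAIN;
        }
        return -error;
    }
    return retval;
}
```

With a freshly created stream (rx_ready set, no events cached), the first call performs a real recv(); after that returns EAGAIN, later calls return -EAGAIN without entering the kernel until a poll pass records POLLIN in the hint.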