@@ -33,6 +33,8 @@
#ifndef POLL_LOOP_H
#define POLL_LOOP_H 1
+#include <stdbool.h>
+
#ifndef _WIN32
#include <poll.h>
#endif
@@ -45,6 +47,24 @@
extern "C" {
#endif
+/* Register a fd with a persistence framework if available so it can be served
+ * "faster" and the caller can be provided with "hints" on what caused the IO
+ * event.
+ * If the "hint" argument is supplied, it is set to point to the pollfd
+ * structure containing the events passed by the OS in .revents.
+ * Returns false if persistence is not enabled.
+ */
+
+bool poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where);
+#define poll_fd_register(fd, events, hint) poll_fd_register_at(fd, events, hint, OVS_SOURCE_LOCATOR)
+
+/* De-register a fd which was registered as "private" with the persistence
+ * framework.
+ */
+
+void poll_fd_deregister_at(int fd, const char *where);
+#define poll_fd_deregister(fd) poll_fd_deregister_at(fd, OVS_SOURCE_LOCATOR)
+
/* Schedule events to wake up the following poll_block().
*
@@ -58,6 +78,15 @@ extern "C" {
void poll_fd_wait_at(int fd, short int events, const char *where);
#define poll_fd_wait(fd, events) poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
+/* Schedule events to wake up the following poll_block() - "private fds".
+ * Same as poll_fd_wait, but for fds which have been registered and are
+ * expected to persist. If a "fast" OS fd notification framework is used,
+ * this version of wait may be a NOOP (e.g. for (E)POLLIN events).
+ */
+void private_poll_fd_wait_at(int fd, int events, const char *where);
+#define private_poll_fd_wait(fd, events) private_poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
+
+
#ifdef _WIN32
void poll_wevent_wait_at(HANDLE wevent, const char *where);
#define poll_wevent_wait(wevent) poll_wevent_wait_at(wevent, OVS_SOURCE_LOCATOR)
@@ -76,6 +105,10 @@ void poll_immediate_wake_at(const char *where);
/* Wait until an event occurs. */
void poll_block(void);
+/* Enable persistence for this thread's poll loop */
+void poll_enable_persist(void);
+
+
#ifdef __cplusplus
}
#endif
@@ -43,6 +43,7 @@ struct poll_node {
struct pollfd pollfd; /* Events to pass to time_poll(). */
HANDLE wevent; /* Events for WaitForMultipleObjects(). */
const char *where; /* Where poll_node was created. */
+ bool valid; /* Marked invalid if we got a HUP/NVAL from poll */
};
struct poll_loop {
@@ -53,6 +54,7 @@ struct poll_loop {
* wake up immediately, or LLONG_MAX to wait forever. */
long long int timeout_when; /* In msecs as returned by time_msec(). */
const char *timeout_where; /* Where 'timeout_when' was set. */
+ bool persist;
};
static struct poll_loop *poll_loop(void);
@@ -99,8 +101,8 @@ find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
* ('where' is used in debug logging. Commonly one would use poll_fd_wait() to
* automatically provide the caller's source file and line number for
* 'where'.) */
-static void
-poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
+static struct poll_node
+*poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
{
struct poll_loop *loop = poll_loop();
struct poll_node *node;
@@ -127,7 +129,9 @@ poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
#endif
node->wevent = wevent;
node->where = where;
+ node->valid = true;
}
+ return node;
}
/* Registers 'fd' as waiting for the specified 'events' (which should be POLLIN
@@ -149,6 +153,44 @@ poll_fd_wait_at(int fd, short int events, const char *where)
poll_create_node(fd, 0, events, where);
}
+/* Variant of poll_fd_wait_at() for fds that were registered with
+ * poll_fd_register_at().
+ *
+ * A request that contains nothing besides POLLIN is a no-op.  Any request
+ * with at least one other event bit set is handed, unmodified, to
+ * poll_create_node().
+ *
+ * 'where' is passed through to poll_create_node(). */
+void
+private_poll_fd_wait_at(int fd, int events, const char *where)
+{
+    const int beyond_pollin = events & ~POLLIN;
+
+    if (!beyond_pollin) {
+        return;
+    }
+    poll_create_node(fd, 0, events, where);
+}
+
+
+/* Registers 'fd' for 'events' with the calling thread's poll loop, provided
+ * that persistence has been enabled for that loop.
+ *
+ * Returns true if the fd was registered.  In that case, if 'hint' is nonnull,
+ * '*hint' is set to point at the 'pollfd' member of the poll node for 'fd'.
+ *
+ * Returns false, without registering anything, if persistence is not enabled.
+ * In that case, if 'hint' is nonnull, '*hint' is set to NULL so that the
+ * caller never ends up holding an uninitialized pointer.
+ *
+ * 'where' is passed through to poll_create_node(). */
+bool
+poll_fd_register_at(int fd, int events, struct pollfd **hint,
+                    const char *where)
+{
+    struct poll_loop *loop = poll_loop();
+    struct poll_node *node;
+
+    if (!loop->persist) {
+        if (hint) {
+            *hint = NULL;
+        }
+        return false;
+    }
+
+    node = poll_create_node(fd, 0, events, where);
+    if (hint) {
+        *hint = &node->pollfd;
+    }
+    return true;
+}
+
+
+/* Removes the poll node for 'fd', if there is one, from the calling thread's
+ * poll loop and frees it.  Does nothing beyond logging if 'fd' has no node.
+ *
+ * A "hint" pointer previously obtained from poll_fd_register_at() for 'fd'
+ * points into the node, so it must not be used after this call.
+ *
+ * 'where' is only used in the debug log message. */
+void
+poll_fd_deregister_at(int fd, const char *where)
+{
+    struct poll_loop *loop = poll_loop();
+    struct poll_node *node;
+
+    VLOG(VLL_DBG, "Deregister %d from %s", fd, where);
+
+    node = find_poll_node(loop, fd, 0);
+    if (node) {
+        hmap_remove(&loop->poll_nodes, &node->hmap_node);
+        /* Once the node is out of the map nothing else can reach it, so it
+         * has to be released here or it is leaked.
+         *
+         * NOTE(review): a Windows event handle stored in 'node->wevent', if
+         * one can exist for a registered fd, is not closed here - confirm. */
+        free(node);
+    }
+}
+
+
#ifdef _WIN32
/* Registers for the next call to poll_block() to wake up when 'wevent' is
* signaled.
@@ -312,13 +354,9 @@ free_poll_nodes(struct poll_loop *loop)
}
}
-/* Blocks until one or more of the events registered with poll_fd_wait()
- * occurs, or until the minimum duration registered with poll_timer_wait()
- * elapses, or not at all if poll_immediate_wake() has been called. */
-void
-poll_block(void)
+static void
+non_persist_poll_block(struct poll_loop *loop)
{
- struct poll_loop *loop = poll_loop();
struct poll_node *node;
struct pollfd *pollfds;
HANDLE *wevents = NULL;
@@ -389,7 +427,117 @@ poll_block(void)
seq_woke();
}
-
+
+/* poll_block() implementation used when persistence is enabled for 'loop'.
+ *
+ * Polls every node in 'loop' that is still valid and has requested events,
+ * then folds the results back into the nodes:
+ *
+ *   - A node that reports POLLHUP or POLLNVAL is marked invalid, which makes
+ *     later calls skip it.
+ *
+ *   - Every reported event other than POLLIN is dropped from the node's
+ *     requested events.  POLLIN stays requested.
+ *
+ *   - Reported events are ORed into the node's 'pollfd.revents', the record
+ *     that poll_fd_register_at() hands out as a "hint".  Clearing consumed
+ *     bits is the job of whoever reads the hint.
+ *
+ * The loop's timeout is reset before returning. */
+static void
+persist_poll_block(struct poll_loop *loop)
+{
+    size_t n_nodes = hmap_count(&loop->poll_nodes);
+    struct poll_node *node;
+    struct pollfd *pollfds;
+    HANDLE *wevents = NULL;
+    size_t n_pollfds = 0;       /* Entries of 'pollfds' actually filled in. */
+    size_t n_wait;              /* Entry count handed to time_poll(). */
+    int elapsed;
+    int retval;
+    size_t i;
+#ifdef _WIN32
+    size_t n_wevents = 0;       /* Entries of 'wevents' actually filled in. */
+#endif
+
+    /* Register fatal signal events before actually doing any real work for
+     * poll_block. */
+    fatal_signal_wait();
+
+    if (loop->timeout_when == LLONG_MIN) {
+        COVERAGE_INC(poll_zero_timeout);
+    }
+
+    timewarp_run();
+    pollfds = xmalloc(n_nodes * sizeof *pollfds);
+
+#ifdef _WIN32
+    wevents = xmalloc(n_nodes * sizeof *wevents);
+#endif
+
+    /* Populate with all the fds and events. */
+    HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
+        if (node->pollfd.events && node->valid) {
+            pollfds[n_pollfds++] = node->pollfd;
+        }
+#ifdef _WIN32
+        /* Every node contributes its handle, at consecutive indexes. */
+        wevents[n_wevents++] = node->wevent;
+        if (node->pollfd.fd && node->wevent) {
+            short int wsa_events = 0;
+            if (node->pollfd.events & POLLIN) {
+                wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
+            }
+            if (node->pollfd.events & POLLOUT) {
+                wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
+            }
+            WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
+        }
+#endif
+    }
+
+    /* Only hand time_poll() as many entries as were filled in above, so that
+     * it never looks at uninitialized array slots. */
+#ifdef _WIN32
+    /* NOTE(review): this keeps the per-node count, which assumes that
+     * time_poll() waits on 'wevents' rather than 'pollfds' on Windows -
+     * TODO confirm. */
+    n_wait = n_wevents;
+#else
+    n_wait = n_pollfds;
+#endif
+
+    retval = time_poll(pollfds, n_wait, wevents,
+                       loop->timeout_when, &elapsed);
+    if (retval < 0) {
+        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+        VLOG_ERR_RL(&rl, "poll: %s", ovs_strerror(-retval));
+    } else if (retval == 0) {
+        log_wakeup(loop->timeout_where, NULL, elapsed);
+    } else {
+        for (i = 0; i < n_pollfds; i++) {
+            node = find_poll_node(loop, pollfds[i].fd, 0);
+            if (!node) {
+                VLOG_FATAL("poll: persistence state corrupted, "
+                           "no hash entry for %d", pollfds[i].fd);
+            }
+            if (pollfds[i].revents & (POLLHUP | POLLNVAL)) {
+                node->valid = false;
+            }
+            if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
+                if (pollfds[i].revents) {
+                    log_wakeup(node->where, &pollfds[i], 0);
+                }
+            }
+            /* Update "requested" events.
+             *
+             * Note - "private" fds always want POLLIN - that emulates EPOLL,
+             * /dev/poll, etc behaviour which they should be using in real
+             * life instead of using poll(). */
+            node->pollfd.events &= ~(pollfds[i].revents & (~POLLIN));
+            /* Update "occurred" events for use by streams and handlers.  In
+             * case there is an existing (but not consumed yet) event, we OR
+             * the events in the stored record with the new ones - it is the
+             * job of the stream to clear that. */
+            node->pollfd.revents |= pollfds[i].revents;
+        }
+    }
+
+    loop->timeout_when = LLONG_MAX;
+    loop->timeout_where = NULL;
+    free(pollfds);
+    free(wevents);
+
+    /* Handle any pending signals before doing anything else. */
+    fatal_signal_run();
+
+    seq_woke();
+}
+/* Blocks until one or more of the events registered with poll_fd_wait()
+ * occurs, or until the minimum duration registered with poll_timer_wait()
+ * elapses, or not at all if poll_immediate_wake() has been called.
+ *
+ * The work is delegated to persist_poll_block() when persistence is enabled
+ * for the calling thread's poll loop, and to non_persist_poll_block()
+ * otherwise. */
+void
+poll_block(void)
+{
+    struct poll_loop *loop = poll_loop();
+
+    if (!loop->persist) {
+        non_persist_poll_block(loop);
+        return;
+    }
+    persist_poll_block(loop);
+}
+
+
+
static void
free_poll_loop(void *loop_)
{
@@ -400,6 +548,12 @@ free_poll_loop(void *loop_)
free(loop);
}
+/* Turns on persistence for the calling thread's poll loop.  From then on
+ * poll_block() uses the persistent code path and poll_fd_register_at()
+ * registers fds instead of returning false. */
+void
+poll_enable_persist(void)
+{
+    poll_loop()->persist = true;
+}
+
static struct poll_loop *
poll_loop(void)
{
@@ -418,6 +572,7 @@ poll_loop(void)
loop->timeout_when = LLONG_MAX;
hmap_init(&loop->poll_nodes);
xpthread_setspecific(key, loop);
+ loop->persist = false;
}
return loop;
}
@@ -65,6 +65,9 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type,
s = xmalloc(sizeof *s);
stream_init(&s->stream, &stream_fd_class, connect_status, name);
+ s->stream.persist = poll_fd_register(fd, POLLIN, &s->stream.hint);
+ s->stream.rx_ready = true;
+ s->stream.tx_ready = true;
s->fd = fd;
s->fd_type = fd_type;
*streamp = &s->stream;
@@ -82,6 +85,9 @@ static void
fd_close(struct stream *stream)
{
struct stream_fd *s = stream_fd_cast(stream);
+ if (s->stream.persist) {
+ poll_fd_deregister(s->fd);
+ }
closesocket(s->fd);
free(s);
}
@@ -104,6 +110,26 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
ssize_t retval;
int error;
+ if (stream->persist && stream->hint) {
+        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we
+         * skip straight to the read, which should return 0 if the HUP is a
+         * real one; if it is not, we clear it. In all other cases we believe
+         * what (e)poll has fed us.
+         */
+ if ((!(stream->hint->revents & (POLLHUP|POLLNVAL))) && (!stream->rx_ready)) {
+ if (!(stream->hint->revents & POLLIN)) {
+ return -EAGAIN;
+ } else {
+ /* POLLIN event from poll loop, mark us as ready */
+ stream->rx_ready = true;
+ stream->hint->revents &= ~POLLIN;
+ }
+ } else {
+ stream->hint->revents &= ~(POLLHUP|POLLNVAL);
+ }
+ }
+
+
+
retval = recv(s->fd, buffer, n, 0);
if (retval < 0) {
error = sock_errno();
@@ -127,6 +153,19 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
ssize_t retval;
int error;
+ if (stream->persist && stream->hint) {
+ /* poll-loop is providing us with hints for IO */
+ if (!stream->tx_ready) {
+ if (!(stream->hint->revents & POLLOUT)) {
+ return -EAGAIN;
+ } else {
+ /* POLLOUT event from poll loop, mark us as ready */
+ stream->tx_ready = true;
+ stream->hint->revents &= ~POLLOUT;
+ }
+ }
+ }
+
retval = send(s->fd, buffer, n, 0);
if (retval < 0) {
error = sock_errno();
@@ -137,6 +176,8 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
#endif
if (error != EAGAIN) {
VLOG_DBG_RL(&rl, "send: %s", sock_strerror(error));
+ } else {
+ stream->tx_ready = false;
}
return -error;
}
@@ -150,11 +191,19 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
switch (wait) {
case STREAM_CONNECT:
case STREAM_SEND:
- poll_fd_wait(s->fd, POLLOUT);
+ if (stream->persist) {
+ private_poll_fd_wait(s->fd, POLLOUT);
+ } else {
+ poll_fd_wait(s->fd, POLLOUT);
+ }
break;
case STREAM_RECV:
- poll_fd_wait(s->fd, POLLIN);
+ if (stream->persist) {
+ private_poll_fd_wait(s->fd, POLLIN);
+ } else {
+ poll_fd_wait(s->fd, POLLIN);
+ }
break;
default:
@@ -219,6 +268,7 @@ new_fd_pstream(char *name, int fd,
{
struct fd_pstream *ps = xmalloc(sizeof *ps);
pstream_init(&ps->pstream, &fd_pstream_class, name);
+ ps->pstream.persist = poll_fd_register(fd, POLLIN, NULL);
ps->fd = fd;
ps->accept_cb = accept_cb;
ps->unlink_path = unlink_path;
@@ -230,6 +280,9 @@ static void
pfd_close(struct pstream *pstream)
{
struct fd_pstream *ps = fd_pstream_cast(pstream);
+ if (pstream->persist) {
+ poll_fd_deregister(ps->fd);
+ }
closesocket(ps->fd);
maybe_unlink_and_free(ps->unlink_path);
free(ps);
@@ -271,7 +324,11 @@ static void
pfd_wait(struct pstream *pstream)
{
struct fd_pstream *ps = fd_pstream_cast(pstream);
- poll_fd_wait(ps->fd, POLLIN);
+ if (pstream->persist) {
+ private_poll_fd_wait(ps->fd, POLLIN);
+ } else {
+ poll_fd_wait(ps->fd, POLLIN);
+ }
}
static const struct pstream_class fd_pstream_class = {
@@ -18,6 +18,7 @@
#define STREAM_PROVIDER_H 1
#include <sys/types.h>
+#include <poll.h>
#include "stream.h"
/* Active stream connection. */
@@ -31,6 +32,8 @@ struct stream {
int error;
char *name;
char *peer_id;
+ bool persist, rx_ready, tx_ready;
+ struct pollfd *hint;
};
void stream_init(struct stream *, const struct stream_class *,
@@ -133,6 +136,7 @@ struct pstream {
const struct pstream_class *class;
char *name;
ovs_be16 bound_port;
+ bool persist;
};
void pstream_init(struct pstream *, const struct pstream_class *, char *name);
@@ -310,6 +310,10 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
SSL_set_msg_callback_arg(ssl, sslv);
}
+ sslv->stream.persist = poll_fd_register(fd, POLLIN, &sslv->stream.hint);
+ sslv->stream.rx_ready = true;
+ sslv->stream.tx_ready = true;
+
*streamp = &sslv->stream;
free(server_name);
return 0;
@@ -604,6 +608,7 @@ ssl_close(struct stream *stream)
ERR_clear_error();
SSL_free(sslv->ssl);
+    /* Only fds that were actually registered with the persistence framework
+     * need to be deregistered, as in the other stream close functions. */
+    if (sslv->stream.persist) {
+        poll_fd_deregister(sslv->fd);
+    }
closesocket(sslv->fd);
free(sslv);
}
@@ -697,6 +702,26 @@ ssl_recv(struct stream *stream, void *buffer, size_t n)
/* Behavior of zero-byte SSL_read is poorly defined. */
ovs_assert(n > 0);
+ if (stream->persist && stream->hint) {
+        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we
+         * skip straight to the read, which should return 0 if the HUP is a
+         * real one; if it is not, we clear it. In all other cases we believe
+         * what (e)poll has fed us.
+         */
+ if ((!(stream->hint->revents & (POLLHUP|POLLNVAL))) && (sslv->rx_want == SSL_READING)) {
+ if (!(stream->hint->revents & POLLIN)) {
+ return -EAGAIN;
+ } else {
+ /* POLLIN event from poll loop, mark us as ready
+ * rx_want is cleared further down by reading ssl fsm
+ */
+ stream->hint->revents &= ~POLLIN;
+ }
+ } else {
+ stream->hint->revents &= ~(POLLHUP|POLLNVAL);
+ }
+ }
+
+
old_state = SSL_get_state(sslv->ssl);
ret = SSL_read(sslv->ssl, buffer, n);
if (old_state != SSL_get_state(sslv->ssl)) {
@@ -729,6 +754,21 @@ ssl_do_tx(struct stream *stream)
{
struct ssl_stream *sslv = ssl_stream_cast(stream);
+ if (stream->persist && stream->hint) {
+ /* poll-loop is providing us with hints for IO */
+ if (sslv->tx_want == SSL_WRITING) {
+ if (!(stream->hint->revents & POLLOUT)) {
+ return EAGAIN;
+ } else {
+                /* POLLOUT event from poll loop: consume it and fall through
+                 * to the SSL_write() loop below.
+                 */
+ stream->hint->revents &= ~POLLOUT;
+ }
+ }
+ }
+
+
for (;;) {
int old_state = SSL_get_state(sslv->ssl);
int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
@@ -771,6 +811,9 @@ ssl_send(struct stream *stream, const void *buffer, size_t n)
ssl_clear_txbuf(sslv);
return n;
case EAGAIN:
+ if (stream->persist) {
+ stream_send_wait(stream);
+ }
return n;
default:
ssl_clear_txbuf(sslv);
@@ -795,7 +838,11 @@ ssl_run_wait(struct stream *stream)
struct ssl_stream *sslv = ssl_stream_cast(stream);
if (sslv->tx_want != SSL_NOTHING) {
- poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
+ if (stream->persist) {
+ private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
+ } else {
+ poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
+ }
}
}
@@ -811,14 +858,23 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
} else {
switch (sslv->state) {
case STATE_TCP_CONNECTING:
- poll_fd_wait(sslv->fd, POLLOUT);
+ if (stream->persist) {
+ private_poll_fd_wait(sslv->fd, POLLOUT);
+ } else {
+ poll_fd_wait(sslv->fd, POLLOUT);
+ }
break;
case STATE_SSL_CONNECTING:
/* ssl_connect() called SSL_accept() or SSL_connect(), which
* set up the status that we test here. */
- poll_fd_wait(sslv->fd,
+ if (stream->persist) {
+ private_poll_fd_wait(sslv->fd,
+ want_to_poll_events(SSL_want(sslv->ssl)));
+ } else {
+ poll_fd_wait(sslv->fd,
want_to_poll_events(SSL_want(sslv->ssl)));
+ }
break;
default:
@@ -829,7 +885,11 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
case STREAM_RECV:
if (sslv->rx_want != SSL_NOTHING) {
- poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
+ if (stream->persist) {
+ private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
+ } else {
+ poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
+ }
} else {
poll_immediate_wake();
}
@@ -911,6 +971,7 @@ pssl_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp,
ds_steal_cstr(&bound_name));
pstream_set_bound_port(&pssl->pstream, htons(port));
pssl->fd = fd;
+ pssl->pstream.persist = poll_fd_register(fd, POLLIN, NULL);
*pstreamp = &pssl->pstream;
return 0;
@@ -921,6 +982,9 @@ pssl_close(struct pstream *pstream)
{
struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
closesocket(pssl->fd);
+ if (pstream->persist) {
+ poll_fd_deregister(pssl->fd);
+ }
free(pssl);
}
@@ -965,7 +1029,11 @@ static void
pssl_wait(struct pstream *pstream)
{
struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
- poll_fd_wait(pssl->fd, POLLIN);
+ if (pstream->persist) {
+ private_poll_fd_wait(pssl->fd, POLLIN);
+ } else {
+ poll_fd_wait(pssl->fd, POLLIN);
+ }
}
const struct pstream_class pssl_pstream_class = {
@@ -175,6 +175,7 @@ windows_open(const char *name, char *suffix, struct stream **streamp,
s->read_pending = false;
s->write_pending = false;
s->retry_connect = retry;
+ s->stream.persist = false;
*streamp = &s->stream;
return 0;
}
@@ -649,6 +650,7 @@ pwindows_open(const char *name OVS_UNUSED, char *suffix,
p->connect.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
p->pending = false;
p->pipe_path = bind_path;
+    p->pstream.persist = false;
*pstreamp = &p->pstream;
return 0;
}
@@ -182,6 +182,7 @@ main_loop(struct server_config *config,
*exiting = false;
ssl_error = NULL;
remotes_error = NULL;
+ poll_enable_persist();
while (!*exiting) {
memory_run();
if (memory_should_report()) {
@@ -15,7 +15,7 @@ ovsdb_check_cluster () {
on_exit 'kill `cat *.pid`'
for i in `seq $n`; do
- AT_CHECK([ovsdb-server -vraft -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
+ AT_CHECK([ovsdb-server -vpoll_loop -vraft -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
done
for i in `seq $n`; do
AT_CHECK([ovsdb_client_wait unix:s$i.ovsdb $schema connected])
@@ -307,7 +307,7 @@ ovsdb|WARN|schema: changed 30 columns in 'Open_vSwitch' database from ephemeral
start_server() {
local i=$1
printf "\ns$i: starting\n"
- AT_CHECK([ovsdb-server -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
+ AT_CHECK([ovsdb-server -vjsonrpc -vpoll_loop -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
}
connect_server() {
local i=$1
@@ -465,7 +465,7 @@ ovsdb|WARN|schema: changed 30 columns in 'Open_vSwitch' database from ephemeral
start_server() {
local i=$1
printf "\ns$i: starting\n"
- AT_CHECK([ovsdb-server -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
+ AT_CHECK([ovsdb-server -vpoll_loop -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
}
stop_server() {
local i=$1