@@ -41,11 +41,29 @@
#include <windows.h>
#endif
+#ifdef __linux__
+#define OVS_USE_EPOLL
+#endif
+
+#ifdef OVS_USE_EPOLL
+#include <sys/epoll.h>
+
+#define OVS_POLLIN EPOLLIN
+#define OVS_POLLOUT EPOLLOUT
+#define OVS_POLLERR EPOLLERR
+#define OVS_POLLHUP EPOLLHUP
+#define OVS_ONESHOT EPOLLONESHOT
+
+#else
+
#define OVS_POLLIN POLLIN
#define OVS_POLLOUT POLLOUT
#define OVS_POLLERR POLLERR
#define OVS_POLLNVAL POLLNVAL
#define OVS_POLLHUP POLLHUP
+#define OVS_ONESHOT (1U << 30)
+
+#endif
#ifdef __cplusplus
extern "C" {
@@ -61,7 +79,13 @@ extern "C" {
* caller to supply a location explicitly, which is useful if the caller's own
* caller would be more useful in log output. See timer_wait_at() for an
* example. */
-void poll_fd_wait_at(int fd, short int events, const char *where);
+void poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where);
+#define poll_fd_register(fd, events, hint) poll_fd_register_at(fd, events, hint, OVS_SOURCE_LOCATOR)
+
+void poll_fd_deregister_at(int fd, const char *where);
+#define poll_fd_deregister(fd) poll_fd_deregister_at(fd, OVS_SOURCE_LOCATOR)
+
+void poll_fd_wait_at(int fd, int events, const char *where);
#define poll_fd_wait(fd, events) poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
#ifdef _WIN32
@@ -1289,7 +1289,7 @@ dpif_netlink_port_poll_wait(const struct dpif *dpif_)
const struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
if (dpif->port_notifier) {
- nl_sock_wait(dpif->port_notifier, POLLIN);
+ nl_sock_wait(dpif->port_notifier, OVS_POLLIN);
} else {
poll_immediate_wake();
}
@@ -2295,12 +2295,15 @@ static int
dpif_netlink_handler_init(struct dpif_handler *handler)
{
handler->epoll_fd = epoll_create(10);
+    /* We should consider merging this into the main epoll loop on Linux.
+     * Only register on success: epoll_create() may have returned -1. */
+    if (handler->epoll_fd >= 0) {
+        poll_fd_register(handler->epoll_fd, OVS_POLLIN, NULL);
+    }
return handler->epoll_fd < 0 ? errno : 0;
}
static void
dpif_netlink_handler_uninit(struct dpif_handler *handler)
{
+ poll_fd_deregister(handler->epoll_fd);
close(handler->epoll_fd);
}
#endif
@@ -2756,13 +2759,13 @@ dpif_netlink_recv_wait__(struct dpif_netlink *dpif, uint32_t handler_id)
}
for (i = 0; i < VPORT_SOCK_POOL_SIZE; i++) {
- nl_sock_wait(sock_pool[i].nl_sock, POLLIN);
+ nl_sock_wait(sock_pool[i].nl_sock, OVS_POLLIN);
}
#else
if (dpif->handlers && handler_id < dpif->n_handlers) {
struct dpif_handler *handler = &dpif->handlers[handler_id];
- poll_fd_wait(handler->epoll_fd, POLLIN);
+ poll_fd_wait(handler->epoll_fd, OVS_POLLIN);
}
#endif
}
@@ -96,6 +96,7 @@ fatal_signal_init(void)
ovs_mutex_init_recursive(&mutex);
#ifndef _WIN32
xpipe_nonblocking(signal_fds);
+ poll_fd_register(signal_fds[0], OVS_POLLIN, NULL);
#else
wevent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (!wevent) {
@@ -236,9 +237,12 @@ void
fatal_signal_run(void)
{
sig_atomic_t sig_nr;
+ char sigbuffer[_POSIX_PIPE_BUF];
fatal_signal_init();
+    /* Drain the self-pipe; return value intentionally ignored (best effort). */
+    ignore(read(signal_fds[0], sigbuffer, sizeof(sigbuffer)));
+
sig_nr = stored_sig_nr;
if (sig_nr != SIG_ATOMIC_MAX) {
char namebuf[SIGNAL_NAME_BUFSIZE];
@@ -28,12 +28,14 @@ void
latch_init(struct latch *latch)
{
xpipe_nonblocking(latch->fds);
+    poll_fd_register(latch->fds[0], OVS_POLLIN | OVS_ONESHOT, NULL); /* one-shot: re-armed by each latch_wait_at() call */
}
/* Destroys 'latch'. */
void
latch_destroy(struct latch *latch)
{
+ poll_fd_deregister(latch->fds[0]);
close(latch->fds[0]);
close(latch->fds[1]);
}
@@ -83,5 +85,6 @@ latch_is_set(const struct latch *latch)
void
latch_wait_at(const struct latch *latch, const char *where)
{
- poll_fd_wait_at(latch->fds[0], OVS_POLLIN, where);
+ /* Ask for wait and make it one-shot if persistence is in play */
+ poll_fd_wait_at(latch->fds[0], OVS_POLLIN | OVS_ONESHOT, where);
}
@@ -184,7 +184,7 @@ xsk_rx_wakeup_if_needed(struct xsk_umem_info *umem,
if (xsk_ring_prod__needs_wakeup(&umem->fq)) {
pfd.fd = fd;
- pfd.events = OVS_POLLIN;
+ pfd.events = POLLIN;
ret = poll(&pfd, 1, 0);
if (OVS_UNLIKELY(ret < 0)) {
@@ -365,6 +365,8 @@ netdev_bsd_construct_tap(struct netdev *netdev_)
}
netdev->kernel_name = kernel_name;
+ /* BSD does not implement this yet so this is a noop at this point */
+ poll_fd_register(netdev->tap_fd, OVS_POLLIN, NULL);
return 0;
@@ -384,9 +386,11 @@ netdev_bsd_destruct(struct netdev *netdev_)
cache_notifier_unref();
if (netdev->tap_fd >= 0) {
+ poll_fd_deregister(netdev->tap_fd);
destroy_tap(netdev->tap_fd, netdev_get_kernel_name(netdev_));
}
if (netdev->pcap) {
+        poll_fd_deregister(pcap_get_selectable_fd(netdev->pcap));
pcap_close(netdev->pcap);
}
free(netdev->kernel_name);
@@ -459,6 +463,7 @@ netdev_bsd_open_pcap(const char *name, pcap_t **pcapp, int *fdp)
*pcapp = pcap;
*fdp = fd;
+ poll_fd_register(fd, OVS_POLLIN, NULL);
return 0;
error:
@@ -1149,6 +1149,10 @@ netdev_linux_rxq_construct(struct netdev_rxq *rxq_)
goto error;
}
}
+ /* Register with persistent event frameworks if available - this
+ * should be able to grab both raw and tap cases
+ */
+ poll_fd_register(rx->fd, OVS_POLLIN, NULL);
ovs_mutex_unlock(&netdev->mutex);
return 0;
@@ -1167,6 +1171,8 @@ netdev_linux_rxq_destruct(struct netdev_rxq *rxq_)
struct netdev_rxq_linux *rx = netdev_rxq_linux_cast(rxq_);
int i;
+ poll_fd_deregister(rx->fd);
+
if (!rx->is_tap) {
close(rx->fd);
}
@@ -230,6 +230,9 @@ nl_sock_create(int protocol, struct nl_sock **sockp)
sock->pid = local.nl_pid;
#endif
+ /* Register socket persistently where supported */
+ poll_fd_register(sock->fd, OVS_POLLIN, NULL);
+
*sockp = sock;
return 0;
@@ -276,6 +279,7 @@ nl_sock_destroy(struct nl_sock *sock)
}
CloseHandle(sock->handle);
#else
+ poll_fd_deregister(sock->fd);
close(sock->fd);
#endif
free(sock);
@@ -18,6 +18,12 @@
#include "openvswitch/poll-loop.h"
#include <errno.h>
#include <inttypes.h>
+#ifdef OVS_USE_EPOLL
+#include <sys/epoll.h>
+#endif
+#ifndef _WIN32
+#include <unistd.h>
+#endif
#include <poll.h>
#include <stdlib.h>
#include <string.h>
@@ -39,12 +45,17 @@ COVERAGE_DEFINE(poll_create_node);
COVERAGE_DEFINE(poll_zero_timeout);
struct poll_node {
+#ifndef OVS_USE_EPOLL
+ struct ovs_list list_node;
+#endif
struct hmap_node hmap_node;
struct pollfd pollfd; /* Events to pass to time_poll(). */
HANDLE wevent; /* Events for WaitForMultipleObjects(). */
const char *where; /* Where poll_node was created. */
};
+#define MAX_EPOLL_EVENTS 64
+
struct poll_loop {
/* All active poll waiters. */
struct hmap poll_nodes;
@@ -53,6 +64,10 @@ struct poll_loop {
* wake up immediately, or LLONG_MAX to wait forever. */
long long int timeout_when; /* In msecs as returned by time_msec(). */
const char *timeout_where; /* Where 'timeout_when' was set. */
+#ifdef OVS_USE_EPOLL
+ int epoll_fd;
+ struct epoll_event epoll_events[MAX_EPOLL_EVENTS];
+#endif
};
static struct poll_loop *poll_loop(void);
@@ -76,79 +91,108 @@ find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
}
return NULL;
}
-
-/* On Unix based systems:
- *
- * Registers 'fd' as waiting for the specified 'events' (which should be
- * OVS_POLLIN or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT). The following call to
- * poll_block() will wake up when 'fd' becomes ready for one or more of the
- * requested events. The 'fd's are given to poll() function later.
- *
- * On Windows system:
+/* Registers 'fd' as waiting for the specified 'events' (which should be OVS_POLLIN
+ * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT). The following call to poll_block() will
+ * wake up when 'fd' becomes ready for one or more of the requested events.
*
- * If 'fd' is specified, create a new 'wevent'. Association of 'fd' and
- * 'wevent' for 'events' happens in poll_block(). If 'wevent' is specified,
- * it is assumed that it is unrelated to any sockets and poll_block()
- * will wake up on any event on that 'wevent'. It is an error to pass
- * both 'wevent' and 'fd'.
+ * The event registration is PERSISTENT. This is intended for OSes which have a persistent
+ * event framework. For now it is implemented only for epoll and Linux, other
+ * implementations such as BSD kqueue and Solaris /dev/poll may follow.
*
- * The event registration is one-shot: only the following call to
- * poll_block() is affected. The event will need to be re-registered after
- * poll_block() is called if it is to persist.
+ * If the OS has no persistent event framework, this function does nothing.
*
* ('where' is used in debug logging. Commonly one would use poll_fd_wait() to
* automatically provide the caller's source file and line number for
* 'where'.) */
+
static void
-poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
+poll_fd_subscribe_at(int fd, HANDLE wevent, int events, struct pollfd **hint, const char *where)
{
struct poll_loop *loop = poll_loop();
struct poll_node *node;
+#ifdef OVS_USE_EPOLL
+ struct epoll_event event;
+#endif
- COVERAGE_INC(poll_create_node);
-
- /* Both 'fd' and 'wevent' cannot be set. */
ovs_assert(!fd != !wevent);
/* Check for duplicate. If found, "or" the events. */
node = find_poll_node(loop, fd, wevent);
if (node) {
- node->pollfd.events |= events;
+#ifdef OVS_USE_EPOLL
+ int old_event_mask = node->pollfd.events;
+#endif
+ /* If there is an existing event mask we do not need to inc - this will be waited upon */
+ node->pollfd.events |= (events & 0x0000FFFF); /* or without epoll specific bits */
+
+#ifdef OVS_USE_EPOLL
+ /* modify existing epoll entry if there is an epoll specific ask or if the
+ * mask has changed
+ */
+ if ((events & 0xFFFF0000) || (old_event_mask != node->pollfd.events)) {
+ event.events = node->pollfd.events | events;
+ event.data.ptr = node;
+ epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, fd, &event);
+ }
+#endif
} else {
node = xzalloc(sizeof *node);
hmap_insert(&loop->poll_nodes, &node->hmap_node,
- hash_2words(fd, (uint32_t)wevent));
+ hash_2words(fd, 0));
node->pollfd.fd = fd;
- node->pollfd.events = events;
-#ifdef _WIN32
- if (!wevent) {
- wevent = CreateEvent(NULL, FALSE, FALSE, NULL);
- }
-#endif
+ node->pollfd.events = (events & 0x0000FFFF);
node->wevent = wevent;
node->where = where;
+#ifdef OVS_USE_EPOLL
+ event.events = node->pollfd.events;
+ event.data.ptr = node;
+        if (epoll_ctl(loop->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
+            /* A failed ADD leaves a hmap node with no kernel registration;
+             * log it so the mismatch is diagnosable. */
+            VLOG(VLL_WARN, "failed to add fd %d to epoll: %s",
+                 fd, ovs_strerror(errno));
+        }
+#endif
+ }
+ if (hint) {
+ *hint = &node->pollfd;
}
}
-/* Registers 'fd' as waiting for the specified 'events' (which should be OVS_POLLIN
- * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT). The following call to poll_block() will
- * wake up when 'fd' becomes ready for one or more of the requested events.
- *
- * On Windows, 'fd' must be a socket.
- *
- * The event registration is one-shot: only the following call to poll_block()
- * is affected. The event will need to be re-registered after poll_block() is
- * called if it is to persist.
- *
- * ('where' is used in debug logging. Commonly one would use poll_fd_wait() to
- * automatically provide the caller's source file and line number for
- * 'where'.) */
void
-poll_fd_wait_at(int fd, short int events, const char *where)
+poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where) {
+ poll_fd_subscribe_at(fd, 0, events, hint, where);
+}
+
+/* Deregisters a fd for OSes which have persistent IO event frameworks */
+
+void
+poll_fd_deregister_at(int fd, const char *where)
{
- poll_create_node(fd, 0, events, where);
+ struct poll_loop *loop = poll_loop();
+ struct poll_node *node, *next;
+
+ VLOG(VLL_DBG, "Deregister %d from %s", fd, where);
+ HMAP_FOR_EACH_SAFE (node, next, hmap_node, &loop->poll_nodes) {
+ if (fd == node->pollfd.fd) {
+#ifdef OVS_USE_EPOLL
+ epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL);
+#endif
+ hmap_remove(&loop->poll_nodes, &node->hmap_node);
+ free(node);
+ return;
+ }
+ }
}
+
+void
+poll_fd_wait_at(int fd, int events, const char *where)
+{
+#ifdef OVS_USE_EPOLL
+ /* on linux all pollfds are registered with epoll at creation for POLLIN */
+    if (!(events & (OVS_POLLOUT | OVS_ONESHOT))) {
+        return;
+    }
+#endif
+ poll_fd_subscribe_at(fd, 0, events, NULL, where);
+}
+
+
#ifdef _WIN32
/* Registers for the next call to poll_block() to wake up when 'wevent' is
* signaled.
@@ -163,7 +207,7 @@ poll_fd_wait_at(int fd, short int events, const char *where)
void
poll_wevent_wait_at(HANDLE wevent, const char *where)
{
- poll_create_node(0, wevent, 0, where);
+ poll_fd_subscribe_at(0, wevent, 0, NULL, where);
}
#endif /* _WIN32 */
@@ -277,9 +321,12 @@ log_wakeup(const char *where, const struct pollfd *pollfd, int timeout)
if (pollfd->revents & OVS_POLLHUP) {
ds_put_cstr(&s, "[OVS_POLLHUP]");
}
+#ifndef OVS_USE_EPOLL
+ /* epoll does not have NVAL */
if (pollfd->revents & OVS_POLLNVAL) {
ds_put_cstr(&s, "[OVS_POLLNVAL]");
}
+#endif
ds_put_format(&s, " on fd %d (%s)", pollfd->fd, description);
free(description);
} else {
@@ -295,12 +342,16 @@ log_wakeup(const char *where, const struct pollfd *pollfd, int timeout)
ds_destroy(&s);
}
+
static void
free_poll_nodes(struct poll_loop *loop)
{
struct poll_node *node, *next;
HMAP_FOR_EACH_SAFE (node, next, hmap_node, &loop->poll_nodes) {
+#ifdef OVS_USE_EPOLL
+ epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL);
+#endif
hmap_remove(&loop->poll_nodes, &node->hmap_node);
#ifdef _WIN32
if (node->wevent && node->pollfd.fd) {
@@ -320,11 +371,16 @@ poll_block(void)
{
struct poll_loop *loop = poll_loop();
struct poll_node *node;
+#ifndef OVS_USE_EPOLL
struct pollfd *pollfds;
+#endif
+#ifndef OVS_USE_EPOLL
HANDLE *wevents = NULL;
+#endif
int elapsed;
int retval;
int i;
+ int counter;
/* Register fatal signal events before actually doing any real work for
* poll_block. */
@@ -335,54 +391,112 @@ poll_block(void)
}
timewarp_run();
- pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
+#ifdef OVS_USE_EPOLL
+ retval = time_epoll_wait(loop->epoll_fd,
+ (struct epoll_event *) &loop->epoll_events, MAX_EPOLL_EVENTS, loop->timeout_when, &elapsed);
+ if (retval < 0) {
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+        VLOG_ERR_RL(&rl, "epoll: %s", ovs_strerror(-retval));
+ } else if (!retval) {
+ log_wakeup(loop->timeout_where, NULL, elapsed);
+ } else {
+ for (i = 0; i < retval; i++) {
+ node = (struct poll_node *) loop->epoll_events[i].data.ptr;
+ if (loop->epoll_events[i].events) {
+ node->pollfd.revents |= loop->epoll_events[i].events;
+ }
+ if (loop->epoll_events[i].events & OVS_POLLOUT) {
+ struct epoll_event event;
+
+ node->pollfd.events = OVS_POLLIN; /* reset back to defaults - write needs one shot */
+ event.events = node->pollfd.events;
+ event.data.ptr = node;
+ epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, node->pollfd.fd, &event);
+ }
+ }
+ if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
+ for (i = 0; i < retval; i++) {
+ node = (struct poll_node *) loop->epoll_events[i].data.ptr;
+ if (loop->epoll_events[i].events) {
+                    node->pollfd.revents |= loop->epoll_events[i].events;
+ log_wakeup(node->where, &node->pollfd, 0);
+ }
+ }
+ }
+ }
+#else
+ pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
#ifdef _WIN32
wevents = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *wevents);
#endif
+
/* Populate with all the fds and events. */
- i = 0;
+ counter = 0;
HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
- pollfds[i] = node->pollfd;
+ if (node->pollfd.events) {
+ pollfds[counter] = node->pollfd;
#ifdef _WIN32
- wevents[i] = node->wevent;
- if (node->pollfd.fd && node->wevent) {
- short int wsa_events = 0;
- if (node->pollfd.events & OVS_POLLIN) {
- wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
- }
- if (node->pollfd.events & OVS_POLLOUT) {
- wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
+ wevents[counter] = node->wevent;
+ if (node->pollfd.fd && node->wevent) {
+ short int wsa_events = 0;
+ if (node->pollfd.events & OVS_POLLIN) {
+ wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
+ }
+ if (node->pollfd.events & OVS_POLLOUT) {
+ wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
+ }
+ WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
}
- WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
- }
#endif
- i++;
+ counter++;
+ }
}
- retval = time_poll(pollfds, hmap_count(&loop->poll_nodes), wevents,
+ retval = time_poll(pollfds, counter, wevents,
loop->timeout_when, &elapsed);
if (retval < 0) {
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
VLOG_ERR_RL(&rl, "poll: %s", ovs_strerror(-retval));
- } else if (!retval) {
+ } else if (retval == 0) {
log_wakeup(loop->timeout_where, NULL, elapsed);
- } else if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
- i = 0;
- HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
+ } else {
+ for (i = 0; i < counter; i++) {
if (pollfds[i].revents) {
- log_wakeup(node->where, &pollfds[i], 0);
+
+ node = find_poll_node(loop, pollfds[i].fd, 0);
+
+ if (!node) {
+ VLOG_FATAL("poll: persistence state corrupted, no hash entry for %d", pollfds[i].fd);
+ }
+
+ if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
+ log_wakeup(node->where, &pollfds[i], 0);
+ }
+                /* Update "requested" events. Note - old code had a logical error
+                 * here - it was overwriting all events even if ONLY one type of event
+                 * has occurred. E.g. - poll reported only read, but it was overwriting
+                 * a request for both read + write.
+                 * If you see a regression - fix your caller, this is the way it SHOULD be.
+ */
+ node->pollfd.events &= ~pollfds[i].revents;
+                /* update "occurred" events for use by streams and handlers. In case there
+ * is an existing (but not consumed yet) event, we OR the events in the
+ * stored record with the new ones - it is the job of the stream to clear
+ * that.
+ */
+ node->pollfd.revents |= pollfds[i].revents;
}
- i++;
}
}
- free_poll_nodes(loop);
+ free(pollfds);
+    free(wevents);
+#endif
loop->timeout_when = LLONG_MAX;
loop->timeout_where = NULL;
- free(pollfds);
- free(wevents);
/* Handle any pending signals before doing anything else. */
fatal_signal_run();
@@ -418,6 +532,9 @@ poll_loop(void)
loop->timeout_when = LLONG_MAX;
hmap_init(&loop->poll_nodes);
xpthread_setspecific(key, loop);
+#ifdef OVS_USE_EPOLL
+ loop->epoll_fd = epoll_create(MAX_EPOLL_EVENTS);
+#endif
}
return loop;
}
@@ -592,6 +592,8 @@ process_wait(struct process *p)
if (p->exited) {
poll_immediate_wake();
} else {
+ /* Register fd persistently where supported */
+ poll_fd_register(fds[0], OVS_POLLIN, NULL);
poll_fd_wait(fds[0], OVS_POLLIN);
}
#else
@@ -34,6 +34,7 @@
#include "ovs-router.h"
#include "packets.h"
#include "openvswitch/vlog.h"
+#include "openvswitch/poll-loop.h"
#include "util.h"
VLOG_DEFINE_THIS_MODULE(route_table_bsd);
@@ -40,6 +40,8 @@ struct stream_fd
struct stream stream;
int fd;
int fd_type;
+ bool rx_ready, tx_ready;
+ struct pollfd *hint;
};
static const struct stream_class stream_fd_class;
@@ -67,7 +69,12 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type,
stream_init(&s->stream, &stream_fd_class, connect_status, name);
s->fd = fd;
s->fd_type = fd_type;
+ s->rx_ready = true;
+ s->tx_ready = true;
+ s->hint = NULL;
*streamp = &s->stream;
+ /* Persistent registration on OSes which support it */
+ poll_fd_register(s->fd, OVS_POLLIN, &s->hint);
return 0;
}
@@ -82,6 +89,8 @@ static void
fd_close(struct stream *stream)
{
struct stream_fd *s = stream_fd_cast(stream);
+ /* Deregister the FD from any persistent registrations if supported */
+ poll_fd_deregister(s->fd);
closesocket(s->fd);
free(s);
}
@@ -104,6 +113,19 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
ssize_t retval;
int error;
+ if (s->hint) {
+ /* poll-loop is providing us with hints for IO */
+ if (!s->rx_ready) {
+ if (!(s->hint->revents & OVS_POLLIN)) {
+ return -EAGAIN;
+ } else {
+ /* POLLIN event from poll loop, mark us as ready */
+ s->rx_ready = true;
+ s->hint->revents &= ~OVS_POLLIN;
+ }
+ }
+ }
+
retval = recv(s->fd, buffer, n, 0);
if (retval < 0) {
error = sock_errno();
@@ -114,6 +136,8 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
#endif
if (error != EAGAIN) {
VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error));
+ } else {
+ s->rx_ready = false;
}
return -error;
}
@@ -127,9 +151,29 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
ssize_t retval;
int error;
+ if (s->hint) {
+ /* poll-loop is providing us with hints for IO */
+ if (!s->tx_ready) {
+ if (!(s->hint->revents & OVS_POLLOUT)) {
+ return -EAGAIN;
+ } else {
+ /* POLLOUT event from poll loop, mark us as ready */
+ s->tx_ready = true;
+ s->hint->revents &= ~OVS_POLLOUT;
+ }
+ }
+ }
retval = send(s->fd, buffer, n, 0);
if (retval < 0) {
error = sock_errno();
+#ifdef __linux__
+ /* Linux will sometimes return ENOBUFS on sockets instead of EAGAIN. Usually seen
+ * on unix domain sockets
+ */
+ if (error == ENOBUFS) {
+ error = EAGAIN;
+ }
+#endif
#ifdef _WIN32
if (error == WSAEWOULDBLOCK) {
error = EAGAIN;
@@ -137,6 +181,8 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
#endif
if (error != EAGAIN) {
VLOG_DBG_RL(&rl, "send: %s", sock_strerror(error));
+ } else {
+ s->tx_ready = false;
}
return -error;
}
@@ -223,6 +269,8 @@ new_fd_pstream(char *name, int fd,
ps->accept_cb = accept_cb;
ps->unlink_path = unlink_path;
*pstreamp = &ps->pstream;
+ /* Register fd with any long term persistence frameworks if available */
+ poll_fd_register(ps->fd, OVS_POLLIN, NULL);
return 0;
}
@@ -230,6 +278,7 @@ static void
pfd_close(struct pstream *pstream)
{
struct fd_pstream *ps = fd_pstream_cast(pstream);
+ poll_fd_deregister(ps->fd);
closesocket(ps->fd);
maybe_unlink_and_free(ps->unlink_path);
free(ps);
@@ -147,6 +147,7 @@ struct ssl_stream
/* A few bytes of header data in case SSL negotiation fails. */
uint8_t head[2];
short int n_head;
+ struct pollfd *hint;
};
/* SSL context created by ssl_init(). */
@@ -310,6 +311,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
SSL_set_msg_callback_arg(ssl, sslv);
}
+
+ poll_fd_register(sslv->fd, OVS_POLLIN, &sslv->hint);
*streamp = &sslv->stream;
free(server_name);
return 0;
@@ -604,6 +607,7 @@ ssl_close(struct stream *stream)
ERR_clear_error();
SSL_free(sslv->ssl);
+ poll_fd_deregister(sslv->fd);
closesocket(sslv->fd);
free(sslv);
}
@@ -697,6 +701,21 @@ ssl_recv(struct stream *stream, void *buffer, size_t n)
/* Behavior of zero-byte SSL_read is poorly defined. */
ovs_assert(n > 0);
+ if (sslv->hint) {
+ /* poll-loop is providing us with hints for IO */
+ if (sslv->rx_want == SSL_READING) {
+ if (!(sslv->hint->revents & OVS_POLLIN)) {
+ return -EAGAIN;
+ } else {
+ /* POLLIN event from poll loop, mark us as ready
+ * rx_want is cleared further down by reading ssl fsm
+ */
+ sslv->hint->revents &= ~OVS_POLLIN;
+ }
+ }
+ }
+
+
old_state = SSL_get_state(sslv->ssl);
ret = SSL_read(sslv->ssl, buffer, n);
if (old_state != SSL_get_state(sslv->ssl)) {
@@ -729,6 +748,19 @@ ssl_do_tx(struct stream *stream)
{
struct ssl_stream *sslv = ssl_stream_cast(stream);
+ if (sslv->hint) {
+ /* poll-loop is providing us with hints for IO */
+ if (sslv->tx_want == SSL_WRITING) {
+ if (!(sslv->hint->revents & OVS_POLLOUT)) {
+ return EAGAIN;
+ } else {
+                /* POLLOUT event from poll loop, mark us as ready
+                 * tx_want is cleared further down by the writing ssl fsm
+ */
+ sslv->hint->revents &= ~OVS_POLLOUT;
+ }
+ }
+ }
for (;;) {
int old_state = SSL_get_state(sslv->ssl);
int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
@@ -771,6 +803,8 @@ ssl_send(struct stream *stream, const void *buffer, size_t n)
ssl_clear_txbuf(sslv);
return n;
case EAGAIN:
+ /* we want to know when this fd will become available again */
+ stream_send_wait(stream);
return n;
default:
ssl_clear_txbuf(sslv);
@@ -911,6 +945,7 @@ pssl_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp,
ds_steal_cstr(&bound_name));
pstream_set_bound_port(&pssl->pstream, htons(port));
pssl->fd = fd;
+ poll_fd_register(fd, OVS_POLLIN, NULL);
*pstreamp = &pssl->pstream;
return 0;
@@ -920,6 +955,7 @@ static void
pssl_close(struct pstream *pstream)
{
struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
+ poll_fd_deregister(pssl->fd);
closesocket(pssl->fd);
free(pssl);
}
@@ -47,6 +47,7 @@ unix_open(const char *name, char *suffix, struct stream **streamp,
char *connect_path;
int fd;
+
connect_path = abs_file_name(ovs_rundir(), suffix);
fd = make_unix_socket(SOCK_STREAM, true, NULL, connect_path);
@@ -38,6 +38,7 @@
#include "unixctl.h"
#include "util.h"
#include "openvswitch/vlog.h"
+#include "openvswitch/poll-loop.h"
VLOG_DEFINE_THIS_MODULE(timeval);
@@ -369,6 +370,88 @@ time_poll(struct pollfd *pollfds, int n_pollfds, HANDLE *handles OVS_UNUSED,
return retval;
}
+#ifdef OVS_USE_EPOLL
+
+/* Like epoll_wait(), except:
+ *
+ * - The timeout is specified as an absolute time, as defined by
+ * time_msec(), instead of a duration.
+ *
+ * - On error, returns a negative error code (instead of setting errno).
+ *
+ * - If interrupted by a signal, retries automatically until the original
+ * timeout is reached. (Because of this property, this function will
+ * never return -EINTR.)
+ *
+ * Stores the number of milliseconds elapsed during poll in '*elapsed'. */
+int
+time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
+ long long int timeout_when, int *elapsed)
+{
+ long long int *last_wakeup = last_wakeup_get();
+ long long int start;
+ bool quiescent;
+ int retval = 0;
+
+ time_init();
+ coverage_clear();
+ coverage_run();
+ if (*last_wakeup && !thread_is_pmd()) {
+ log_poll_interval(*last_wakeup);
+ }
+ start = time_msec();
+
+ timeout_when = MIN(timeout_when, deadline);
+ quiescent = ovsrcu_is_quiescent();
+
+ for (;;) {
+ long long int now = time_msec();
+ int time_left;
+
+ if (now >= timeout_when) {
+ time_left = 0;
+ } else if ((unsigned long long int) timeout_when - now > INT_MAX) {
+ time_left = INT_MAX;
+ } else {
+ time_left = timeout_when - now;
+ }
+
+ if (!quiescent) {
+ if (!time_left) {
+ ovsrcu_quiesce();
+ } else {
+ ovsrcu_quiesce_start();
+ }
+ }
+
+ retval = epoll_wait(epoll_fd, events, max, time_left);
+ if (retval < 0) {
+ retval = -errno;
+ }
+
+ if (!quiescent && time_left) {
+ ovsrcu_quiesce_end();
+ }
+
+ if (deadline <= time_msec()) {
+ fatal_signal_handler(SIGALRM);
+ if (retval < 0) {
+ retval = 0;
+ }
+ break;
+ }
+
+ if (retval != -EINTR) {
+ break;
+ }
+ }
+ *last_wakeup = time_msec();
+ refresh_rusage();
+ *elapsed = *last_wakeup - start;
+ return retval;
+}
+#endif
+
long long int
timespec_to_msec(const struct timespec *ts)
{
@@ -20,6 +20,9 @@
#include <time.h>
#include "openvswitch/type-props.h"
#include "util.h"
+#ifdef __linux__
+#include <sys/epoll.h>
+#endif
#ifdef __cplusplus
extern "C" {
@@ -61,6 +64,10 @@ void time_wall_timespec(struct timespec *);
void time_alarm(unsigned int secs);
int time_poll(struct pollfd *, int n_pollfds, HANDLE *handles,
long long int timeout_when, int *elapsed);
+#ifdef __linux__
+int time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
+ long long int timeout_when, int *elapsed);
+#endif
long long int timespec_to_msec(const struct timespec *);
long long int timespec_to_usec(const struct timespec *);