diff mbox series

[ovs-dev,3/3] Add file descriptor persistence

Message ID 20200207084033.23018-3-anton.ivanov@cambridgegreys.com
State Superseded
Headers show
Series [ovs-dev,1/3] Replace direct use of POLLXXX macros with OVS_POLLXXX | expand

Commit Message

Anton Ivanov Feb. 7, 2020, 8:40 a.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

1. Provides signigificant performance improvement in large OVSDB
installs such as southdb in OVN by removing the current "feature" of
"trying to break an EAGAIN wall" in all streams. All streams will
attempt IO IF AND ONLY IF there is IO pending. If there is no IO
signalled by (e)poll they will not attempt any send/recv. This saves
600 microseconds on each iteration in json rpc server for 2000 sessions
(measured).

2. Allows support for persistent in-kernel frameworks which replace
poll on most OSes. These frameworks imply and require a persistent
fd design.

3. Adds support for EPOLL on Linux.

4. Fixes the logic in the current poll loop which claims to be "additive"
on input and output events, but in fact is not - they are wiped clean
every time and reinstated by upper layers after that.

5. Essential first step in setting up the IO loop to allow multi-threading
in SSL/JSON/IO processing (needed for northd and ovsdb in OVN) at a later
date.

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 include/openvswitch/poll-loop.h |  26 +++-
 lib/dpif-netlink.c              |   9 +-
 lib/fatal-signal.c              |   4 +
 lib/latch-unix.c                |   5 +-
 lib/netdev-afxdp.c              |   2 +-
 lib/netdev-bsd.c                |   5 +
 lib/netdev-linux.c              |   6 +
 lib/netlink-socket.c            |   4 +
 lib/poll-loop.c                 | 255 +++++++++++++++++++++++---------
 lib/process.c                   |   2 +
 lib/route-table-bsd.c           |   1 +
 lib/stream-fd.c                 |  49 ++++++
 lib/stream-ssl.c                |  36 +++++
 lib/stream-unix.c               |   1 +
 lib/timeval.c                   |  83 +++++++++++
 lib/timeval.h                   |   7 +
 16 files changed, 420 insertions(+), 75 deletions(-)
diff mbox series

Patch

diff --git a/include/openvswitch/poll-loop.h b/include/openvswitch/poll-loop.h
index 532d9caa6..2dda33c50 100644
--- a/include/openvswitch/poll-loop.h
+++ b/include/openvswitch/poll-loop.h
@@ -41,11 +41,29 @@ 
 #include <windows.h>
 #endif
 
+#ifdef __linux__
+#define OVS_USE_EPOLL
+#endif
+
+#ifdef OVS_USE_EPOLL
+#include <sys/epoll.h>
+
+#define OVS_POLLIN EPOLLIN
+#define OVS_POLLOUT EPOLLOUT
+#define OVS_POLLERR EPOLLERR
+#define OVS_POLLHUP EPOLLHUP
+#define OVS_ONESHOT EPOLLONESHOT
+
+#else
+
 #define OVS_POLLIN POLLIN
 #define OVS_POLLOUT POLLOUT
 #define OVS_POLLERR POLLERR
 #define OVS_POLLNVAL POLLNVAL
 #define OVS_POLLHUP POLLHUP
+#define OVS_ONESHOT (1U << 30)
+
+#endif 
 
 #ifdef  __cplusplus
 extern "C" {
@@ -61,7 +79,13 @@  extern "C" {
  * caller to supply a location explicitly, which is useful if the caller's own
  * caller would be more useful in log output.  See timer_wait_at() for an
  * example. */
-void poll_fd_wait_at(int fd, short int events, const char *where);
+void poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where);
+#define poll_fd_register(fd, events, hint) poll_fd_register_at(fd, events, hint, OVS_SOURCE_LOCATOR)
+
+void poll_fd_deregister_at(int fd, const char *where);
+#define poll_fd_deregister(fd) poll_fd_deregister_at(fd, OVS_SOURCE_LOCATOR)
+
+void poll_fd_wait_at(int fd, int events, const char *where);
 #define poll_fd_wait(fd, events) poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
 
 #ifdef _WIN32
diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
index 5b5c96d72..9656553ce 100644
--- a/lib/dpif-netlink.c
+++ b/lib/dpif-netlink.c
@@ -1289,7 +1289,7 @@  dpif_netlink_port_poll_wait(const struct dpif *dpif_)
     const struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
 
     if (dpif->port_notifier) {
-        nl_sock_wait(dpif->port_notifier, POLLIN);
+        nl_sock_wait(dpif->port_notifier, OVS_POLLIN);
     } else {
         poll_immediate_wake();
     }
@@ -2295,12 +2295,15 @@  static int
 dpif_netlink_handler_init(struct dpif_handler *handler)
 {
     handler->epoll_fd = epoll_create(10);
+    /* we should consider merging this into the main epoll loop on Linux */
+    poll_fd_register(handler->epoll_fd, OVS_POLLIN, NULL);
     return handler->epoll_fd < 0 ? errno : 0;
 }
 
 static void
 dpif_netlink_handler_uninit(struct dpif_handler *handler)
 {
+    poll_fd_deregister(handler->epoll_fd);
     close(handler->epoll_fd);
 }
 #endif
@@ -2756,13 +2759,13 @@  dpif_netlink_recv_wait__(struct dpif_netlink *dpif, uint32_t handler_id)
     }
 
     for (i = 0; i < VPORT_SOCK_POOL_SIZE; i++) {
-        nl_sock_wait(sock_pool[i].nl_sock, POLLIN);
+        nl_sock_wait(sock_pool[i].nl_sock, OVS_POLLIN);
     }
 #else
     if (dpif->handlers && handler_id < dpif->n_handlers) {
         struct dpif_handler *handler = &dpif->handlers[handler_id];
 
-        poll_fd_wait(handler->epoll_fd, POLLIN);
+        poll_fd_wait(handler->epoll_fd, OVS_POLLIN);
     }
 #endif
 }
diff --git a/lib/fatal-signal.c b/lib/fatal-signal.c
index 97d8d1dab..194d14d2f 100644
--- a/lib/fatal-signal.c
+++ b/lib/fatal-signal.c
@@ -96,6 +96,7 @@  fatal_signal_init(void)
         ovs_mutex_init_recursive(&mutex);
 #ifndef _WIN32
         xpipe_nonblocking(signal_fds);
+        poll_fd_register(signal_fds[0], OVS_POLLIN, NULL);
 #else
         wevent = CreateEvent(NULL, TRUE, FALSE, NULL);
         if (!wevent) {
@@ -236,9 +237,12 @@  void
 fatal_signal_run(void)
 {
     sig_atomic_t sig_nr;
+    char sigbuffer[_POSIX_PIPE_BUF];
 
     fatal_signal_init();
 
+    read(signal_fds[0], sigbuffer, sizeof(sigbuffer));
+
     sig_nr = stored_sig_nr;
     if (sig_nr != SIG_ATOMIC_MAX) {
         char namebuf[SIGNAL_NAME_BUFSIZE];
diff --git a/lib/latch-unix.c b/lib/latch-unix.c
index fea61ab28..7c21c7729 100644
--- a/lib/latch-unix.c
+++ b/lib/latch-unix.c
@@ -28,12 +28,14 @@  void
 latch_init(struct latch *latch)
 {
     xpipe_nonblocking(latch->fds);
+    poll_fd_register(latch->fds[0], OVS_POLLIN | OVS_ONESHOT, NULL); /* register, but not set any events */
 }
 
 /* Destroys 'latch'. */
 void
 latch_destroy(struct latch *latch)
 {
+    poll_fd_deregister(latch->fds[0]);
     close(latch->fds[0]);
     close(latch->fds[1]);
 }
@@ -83,5 +85,6 @@  latch_is_set(const struct latch *latch)
 void
 latch_wait_at(const struct latch *latch, const char *where)
 {
-    poll_fd_wait_at(latch->fds[0], OVS_POLLIN, where);
+    /* Ask for wait and make it one-shot if persistence is in play */
+    poll_fd_wait_at(latch->fds[0], OVS_POLLIN | OVS_ONESHOT, where);
 }
diff --git a/lib/netdev-afxdp.c b/lib/netdev-afxdp.c
index ef367e5ea..482400d8d 100644
--- a/lib/netdev-afxdp.c
+++ b/lib/netdev-afxdp.c
@@ -184,7 +184,7 @@  xsk_rx_wakeup_if_needed(struct xsk_umem_info *umem,
 
     if (xsk_ring_prod__needs_wakeup(&umem->fq)) {
         pfd.fd = fd;
-        pfd.events = OVS_POLLIN;
+        pfd.events = POLLIN;
 
         ret = poll(&pfd, 1, 0);
         if (OVS_UNLIKELY(ret < 0)) {
diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
index 45385b187..8b5438203 100644
--- a/lib/netdev-bsd.c
+++ b/lib/netdev-bsd.c
@@ -365,6 +365,8 @@  netdev_bsd_construct_tap(struct netdev *netdev_)
     }
 
     netdev->kernel_name = kernel_name;
+    /* BSD does not implement this yet so this is a noop at this point */
+    poll_fd_register(netdev->tap_fd, OVS_POLLIN, NULL);
 
     return 0;
 
@@ -384,9 +386,11 @@  netdev_bsd_destruct(struct netdev *netdev_)
     cache_notifier_unref();
 
     if (netdev->tap_fd >= 0) {
+        poll_fd_deregister(netdev->tap_fd);
         destroy_tap(netdev->tap_fd, netdev_get_kernel_name(netdev_));
     }
     if (netdev->pcap) {
+        poll_fd_deregister(pcap_get_selectable_fd(pcap));
         pcap_close(netdev->pcap);
     }
     free(netdev->kernel_name);
@@ -459,6 +463,7 @@  netdev_bsd_open_pcap(const char *name, pcap_t **pcapp, int *fdp)
 
     *pcapp = pcap;
     *fdp = fd;
+    poll_fd_register(fd, OVS_POLLIN, NULL);
     return 0;
 
 error:
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index d0dc00d97..4af5d35b6 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -1149,6 +1149,10 @@  netdev_linux_rxq_construct(struct netdev_rxq *rxq_)
             goto error;
         }
     }
+    /* Register with persistent event frameworks if available - this
+     * should be able to grab both raw and tap cases
+     */
+    poll_fd_register(rx->fd, OVS_POLLIN, NULL);
     ovs_mutex_unlock(&netdev->mutex);
 
     return 0;
@@ -1167,6 +1171,8 @@  netdev_linux_rxq_destruct(struct netdev_rxq *rxq_)
     struct netdev_rxq_linux *rx = netdev_rxq_linux_cast(rxq_);
     int i;
 
+    poll_fd_deregister(rx->fd);
+
     if (!rx->is_tap) {
         close(rx->fd);
     }
diff --git a/lib/netlink-socket.c b/lib/netlink-socket.c
index 47077e947..2ec48aeb9 100644
--- a/lib/netlink-socket.c
+++ b/lib/netlink-socket.c
@@ -230,6 +230,9 @@  nl_sock_create(int protocol, struct nl_sock **sockp)
     sock->pid = local.nl_pid;
 #endif
 
+    /* Register socket persistently where supported */
+    poll_fd_register(sock->fd, OVS_POLLIN, NULL);
+
     *sockp = sock;
     return 0;
 
@@ -276,6 +279,7 @@  nl_sock_destroy(struct nl_sock *sock)
         }
         CloseHandle(sock->handle);
 #else
+        poll_fd_deregister(sock->fd);
         close(sock->fd);
 #endif
         free(sock);
diff --git a/lib/poll-loop.c b/lib/poll-loop.c
index 3902d6c1f..42d911271 100644
--- a/lib/poll-loop.c
+++ b/lib/poll-loop.c
@@ -18,6 +18,12 @@ 
 #include "openvswitch/poll-loop.h"
 #include <errno.h>
 #include <inttypes.h>
+#ifdef OVS_USE_EPOLL
+#include <sys/epoll.h>
+#endif
+#ifndef _WIN32
+#include <unistd.h>
+#endif
 #include <poll.h>
 #include <stdlib.h>
 #include <string.h>
@@ -39,12 +45,17 @@  COVERAGE_DEFINE(poll_create_node);
 COVERAGE_DEFINE(poll_zero_timeout);
 
 struct poll_node {
+#ifndef OVS_USE_EPOLL
+    struct ovs_list list_node;
+#endif
     struct hmap_node hmap_node;
     struct pollfd pollfd;       /* Events to pass to time_poll(). */
     HANDLE wevent;              /* Events for WaitForMultipleObjects(). */
     const char *where;          /* Where poll_node was created. */
 };
 
+#define MAX_EPOLL_EVENTS 64
+
 struct poll_loop {
     /* All active poll waiters. */
     struct hmap poll_nodes;
@@ -53,6 +64,10 @@  struct poll_loop {
      * wake up immediately, or LLONG_MAX to wait forever. */
     long long int timeout_when; /* In msecs as returned by time_msec(). */
     const char *timeout_where;  /* Where 'timeout_when' was set. */
+#ifdef OVS_USE_EPOLL
+    int epoll_fd;
+    struct epoll_event epoll_events[MAX_EPOLL_EVENTS];
+#endif
 };
 
 static struct poll_loop *poll_loop(void);
@@ -76,79 +91,108 @@  find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
     }
     return NULL;
 }
-
-/* On Unix based systems:
- *
- *     Registers 'fd' as waiting for the specified 'events' (which should be
- *     OVS_POLLIN or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT).  The following call to
- *     poll_block() will wake up when 'fd' becomes ready for one or more of the
- *     requested events. The 'fd's are given to poll() function later.
- *
- * On Windows system:
+/* Registers 'fd' as waiting for the specified 'events' (which should be OVS_POLLIN
+ * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT).  The following call to poll_block() will
+ * wake up when 'fd' becomes ready for one or more of the requested events.
  *
- *     If 'fd' is specified, create a new 'wevent'. Association of 'fd' and
- *     'wevent' for 'events' happens in poll_block(). If 'wevent' is specified,
- *     it is assumed that it is unrelated to any sockets and poll_block()
- *     will wake up on any event on that 'wevent'. It is an error to pass
- *     both 'wevent' and 'fd'.
+ * The event registration is PERSISTENT. This is intended for OSes which have a persistent
+ * event framework. For now it is implemented only for epoll and Linux, other
+ * implementations such as BSD kqueue and Solaris /dev/poll may follow.
  *
- * The event registration is one-shot: only the following call to
- * poll_block() is affected.  The event will need to be re-registered after
- * poll_block() is called if it is to persist.
+ * If the OS has no persistent even framework does nothing
  *
  * ('where' is used in debug logging.  Commonly one would use poll_fd_wait() to
  * automatically provide the caller's source file and line number for
  * 'where'.) */
+
 static void
-poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
+poll_fd_subscribe_at(int fd, HANDLE wevent, int events, struct pollfd **hint, const char *where)
 {
     struct poll_loop *loop = poll_loop();
     struct poll_node *node;
+#ifdef OVS_USE_EPOLL
+    struct epoll_event event;
+#endif
 
-    COVERAGE_INC(poll_create_node);
-
-    /* Both 'fd' and 'wevent' cannot be set. */
     ovs_assert(!fd != !wevent);
 
     /* Check for duplicate.  If found, "or" the events. */
     node = find_poll_node(loop, fd, wevent);
     if (node) {
-        node->pollfd.events |= events;
+#ifdef OVS_USE_EPOLL
+        int old_event_mask = node->pollfd.events;
+#endif
+        /* If there is an existing event mask we do not need to inc - this will be waited upon */
+        node->pollfd.events |= (events & 0x0000FFFF); /* or without epoll specific bits */
+
+#ifdef OVS_USE_EPOLL
+        /* modify existing epoll entry if there is an epoll specific ask or if the
+         * mask has changed
+         */
+        if ((events & 0xFFFF0000) || (old_event_mask != node->pollfd.events)) {
+            event.events = node->pollfd.events | events;
+            event.data.ptr = node;
+            epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, fd, &event);
+        }
+#endif
     } else {
         node = xzalloc(sizeof *node);
         hmap_insert(&loop->poll_nodes, &node->hmap_node,
-                    hash_2words(fd, (uint32_t)wevent));
+                    hash_2words(fd, 0));
         node->pollfd.fd = fd;
-        node->pollfd.events = events;
-#ifdef _WIN32
-        if (!wevent) {
-            wevent = CreateEvent(NULL, FALSE, FALSE, NULL);
-        }
-#endif
+        node->pollfd.events = (events & 0x0000FFFF);
         node->wevent = wevent;
         node->where = where;
+#ifdef OVS_USE_EPOLL
+        event.events = node->pollfd.events;
+        event.data.ptr = node;
+        epoll_ctl(loop->epoll_fd, EPOLL_CTL_ADD, fd, &event);
+#endif
+    }
+    if (hint) {
+        *hint = &node->pollfd;
     }
 }
 
-/* Registers 'fd' as waiting for the specified 'events' (which should be OVS_POLLIN
- * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT).  The following call to poll_block() will
- * wake up when 'fd' becomes ready for one or more of the requested events.
- *
- * On Windows, 'fd' must be a socket.
- *
- * The event registration is one-shot: only the following call to poll_block()
- * is affected.  The event will need to be re-registered after poll_block() is
- * called if it is to persist.
- *
- * ('where' is used in debug logging.  Commonly one would use poll_fd_wait() to
- * automatically provide the caller's source file and line number for
- * 'where'.) */
 void
-poll_fd_wait_at(int fd, short int events, const char *where)
+poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where) {
+    poll_fd_subscribe_at(fd, 0, events, hint, where);
+}
+
+/* Deregisters a fd for OSes which have persistent IO event frameworks */
+
+void
+poll_fd_deregister_at(int fd, const char *where)
 {
-    poll_create_node(fd, 0, events, where);
+    struct poll_loop *loop = poll_loop();
+    struct poll_node *node, *next;
+
+    VLOG(VLL_DBG, "Deregister %d from %s", fd, where);
+    HMAP_FOR_EACH_SAFE (node, next, hmap_node, &loop->poll_nodes) {
+        if (fd == node->pollfd.fd) {
+#ifdef OVS_USE_EPOLL
+            epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL);
+#endif
+            hmap_remove(&loop->poll_nodes, &node->hmap_node);
+            free(node);
+            return;
+        }
+    }
 }
 
+
+void
+poll_fd_wait_at(int fd, int events, const char *where)
+{
+#ifdef OVS_USE_EPOLL
+    /* on linux all pollfds are registered with epoll at creation for POLLIN */
+    if (!(events & (OVS_POLLOUT | OVS_ONESHOT))) 
+        return;
+#endif
+    poll_fd_subscribe_at(fd, 0, events, NULL, where);
+}
+
+
 #ifdef _WIN32
 /* Registers for the next call to poll_block() to wake up when 'wevent' is
  * signaled.
@@ -163,7 +207,7 @@  poll_fd_wait_at(int fd, short int events, const char *where)
 void
 poll_wevent_wait_at(HANDLE wevent, const char *where)
 {
-    poll_create_node(0, wevent, 0, where);
+    poll_fd_subscribe_at(0, wevent, 0, NULL, where);
 }
 #endif /* _WIN32 */
 
@@ -277,9 +321,12 @@  log_wakeup(const char *where, const struct pollfd *pollfd, int timeout)
         if (pollfd->revents & OVS_POLLHUP) {
             ds_put_cstr(&s, "[OVS_POLLHUP]");
         }
+#ifndef OVS_USE_EPOLL
+        /* epoll does not have NVAL */
         if (pollfd->revents & OVS_POLLNVAL) {
             ds_put_cstr(&s, "[OVS_POLLNVAL]");
         }
+#endif
         ds_put_format(&s, " on fd %d (%s)", pollfd->fd, description);
         free(description);
     } else {
@@ -295,12 +342,16 @@  log_wakeup(const char *where, const struct pollfd *pollfd, int timeout)
     ds_destroy(&s);
 }
 
+
 static void
 free_poll_nodes(struct poll_loop *loop)
 {
     struct poll_node *node, *next;
 
     HMAP_FOR_EACH_SAFE (node, next, hmap_node, &loop->poll_nodes) {
+#ifdef OVS_USE_EPOLL
+        epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL);
+#endif
         hmap_remove(&loop->poll_nodes, &node->hmap_node);
 #ifdef _WIN32
         if (node->wevent && node->pollfd.fd) {
@@ -320,11 +371,16 @@  poll_block(void)
 {
     struct poll_loop *loop = poll_loop();
     struct poll_node *node;
+#ifndef OVS_USE_EPOLL
     struct pollfd *pollfds;
+#endif
+#ifndef OVS_USE_EPOLL
     HANDLE *wevents = NULL;
+#endif
     int elapsed;
     int retval;
     int i;
+    int counter;
 
     /* Register fatal signal events before actually doing any real work for
      * poll_block. */
@@ -335,54 +391,112 @@  poll_block(void)
     }
 
     timewarp_run();
-    pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
 
+#ifdef OVS_USE_EPOLL
+    retval = time_epoll_wait(loop->epoll_fd,
+        (struct epoll_event *) &loop->epoll_events, MAX_EPOLL_EVENTS, loop->timeout_when, &elapsed);
+    if (retval < 0) {
+        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+        VLOG_ERR_RL(&rl, "epoll: %s", ovs_strerror(retval));
+    } else if (!retval) {
+        log_wakeup(loop->timeout_where, NULL, elapsed);
+    } else {
+        for (i = 0; i < retval; i++) {
+            node = (struct poll_node *) loop->epoll_events[i].data.ptr;
+            if (loop->epoll_events[i].events) {
+                node->pollfd.revents |= loop->epoll_events[i].events;
+            }
+            if (loop->epoll_events[i].events & OVS_POLLOUT) {
+                struct epoll_event event;
+
+                node->pollfd.events = OVS_POLLIN; /* reset back to defaults - write needs one shot */
+                event.events = node->pollfd.events;
+                event.data.ptr = node;
+                epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, node->pollfd.fd, &event);
+            }
+        }
+        if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
+            for (i = 0; i < retval; i++) {
+                node = (struct poll_node *) loop->epoll_events[i].data.ptr;
+                if (loop->epoll_events[i].events) {
+                    node->pollfd.revents = loop->epoll_events[i].events;
+                    log_wakeup(node->where, &node->pollfd, 0);
+                }
+            }
+        }
+    }
+#else
+    pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
 #ifdef _WIN32
     wevents = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *wevents);
 #endif
 
+
     /* Populate with all the fds and events. */
-    i = 0;
+    counter = 0;
     HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
-        pollfds[i] = node->pollfd;
+        if (node->pollfd.events) {
+            pollfds[counter] = node->pollfd;
 #ifdef _WIN32
-        wevents[i] = node->wevent;
-        if (node->pollfd.fd && node->wevent) {
-            short int wsa_events = 0;
-            if (node->pollfd.events & OVS_POLLIN) {
-                wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
-            }
-            if (node->pollfd.events & OVS_POLLOUT) {
-                wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
+            wevents[counter] = node->wevent;
+            if (node->pollfd.fd && node->wevent) {
+                short int wsa_events = 0;
+                if (node->pollfd.events & OVS_POLLIN) {
+                    wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
+                }
+                if (node->pollfd.events & OVS_POLLOUT) {
+                    wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
+                }
+                WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
             }
-            WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
-        }
 #endif
-        i++;
+            counter++;
+        }
     }
 
-    retval = time_poll(pollfds, hmap_count(&loop->poll_nodes), wevents,
+    retval = time_poll(pollfds, counter, wevents,
                        loop->timeout_when, &elapsed);
     if (retval < 0) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
         VLOG_ERR_RL(&rl, "poll: %s", ovs_strerror(-retval));
-    } else if (!retval) {
+    } else if (retval == 0) {
         log_wakeup(loop->timeout_where, NULL, elapsed);
-    } else if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
-        i = 0;
-        HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
+    } else {
+        for (i = 0; i < counter; i++) {
             if (pollfds[i].revents) {
-                log_wakeup(node->where, &pollfds[i], 0);
+
+                node = find_poll_node(loop, pollfds[i].fd, 0);
+
+                if (!node) {
+                    VLOG_FATAL("poll: persistence state corrupted, no hash entry for %d", pollfds[i].fd);
+                }
+
+                if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
+                    log_wakeup(node->where, &pollfds[i], 0);
+                }
+                /* update "requested" events. Note - old code had a logical error
+                 * here - it was overwriting all events even if ONLY one type of event
+                 * has occured. F.E. - poll reported only read, but it was overwriting
+                 *  a request for both read + write.
+                 * If you see a regression - fix your caller, this the way it SHOULD be.
+                 */
+                node->pollfd.events &= ~pollfds[i].revents;
+                /* update "occured" events for use by streams and handlers. In case there
+                 * is an existing (but not consumed yet) event, we OR the events in the
+                 * stored record with the new ones - it is the job of the stream to clear
+                 * that.
+                 */
+                node->pollfd.revents |= pollfds[i].revents;
             }
-            i++;
         }
     }
 
-    free_poll_nodes(loop);
+    free(pollfds);
+    if (wevents)
+        free(wevents);
+#endif
     loop->timeout_when = LLONG_MAX;
     loop->timeout_where = NULL;
-    free(pollfds);
-    free(wevents);
 
     /* Handle any pending signals before doing anything else. */
     fatal_signal_run();
@@ -418,6 +532,9 @@  poll_loop(void)
         loop->timeout_when = LLONG_MAX;
         hmap_init(&loop->poll_nodes);
         xpthread_setspecific(key, loop);
+#ifdef OVS_USE_EPOLL
+        loop->epoll_fd = epoll_create(MAX_EPOLL_EVENTS);
+#endif
     }
     return loop;
 }
diff --git a/lib/process.c b/lib/process.c
index 7a7f182e1..b7830a6e4 100644
--- a/lib/process.c
+++ b/lib/process.c
@@ -592,6 +592,8 @@  process_wait(struct process *p)
     if (p->exited) {
         poll_immediate_wake();
     } else {
+        /* Register fd persistently where supported */
+        poll_fd_register(fds[0], OVS_POLLIN, NULL);
         poll_fd_wait(fds[0], OVS_POLLIN);
     }
 #else
diff --git a/lib/route-table-bsd.c b/lib/route-table-bsd.c
index 3dfa80c7f..16d155989 100644
--- a/lib/route-table-bsd.c
+++ b/lib/route-table-bsd.c
@@ -34,6 +34,7 @@ 
 #include "ovs-router.h"
 #include "packets.h"
 #include "openvswitch/vlog.h"
+#include "openvswitch/poll-loop.h"
 #include "util.h"
 
 VLOG_DEFINE_THIS_MODULE(route_table_bsd);
diff --git a/lib/stream-fd.c b/lib/stream-fd.c
index 62f768d45..faa07f082 100644
--- a/lib/stream-fd.c
+++ b/lib/stream-fd.c
@@ -40,6 +40,8 @@  struct stream_fd
     struct stream stream;
     int fd;
     int fd_type;
+    bool rx_ready, tx_ready;
+    struct pollfd *hint;
 };
 
 static const struct stream_class stream_fd_class;
@@ -67,7 +69,12 @@  new_fd_stream(char *name, int fd, int connect_status, int fd_type,
     stream_init(&s->stream, &stream_fd_class, connect_status, name);
     s->fd = fd;
     s->fd_type = fd_type;
+    s->rx_ready = true;
+    s->tx_ready = true;
+    s->hint = NULL;
     *streamp = &s->stream;
+    /* Persistent registration on OSes which support it */
+    poll_fd_register(s->fd, OVS_POLLIN, &s->hint);
     return 0;
 }
 
@@ -82,6 +89,8 @@  static void
 fd_close(struct stream *stream)
 {
     struct stream_fd *s = stream_fd_cast(stream);
+    /* Deregister the FD from any persistent registrations if supported */
+    poll_fd_deregister(s->fd);
     closesocket(s->fd);
     free(s);
 }
@@ -104,6 +113,19 @@  fd_recv(struct stream *stream, void *buffer, size_t n)
     ssize_t retval;
     int error;
 
+    if (s->hint) {
+        /* poll-loop is providing us with hints for IO */
+        if (!s->rx_ready) {
+            if (!(s->hint->revents & OVS_POLLIN)) {
+                return -EAGAIN;
+            } else {
+                /* POLLIN event from poll loop, mark us as ready */
+                s->rx_ready = true;
+                s->hint->revents &= ~OVS_POLLIN;
+            }
+        }
+    }
+
     retval = recv(s->fd, buffer, n, 0);
     if (retval < 0) {
         error = sock_errno();
@@ -114,6 +136,8 @@  fd_recv(struct stream *stream, void *buffer, size_t n)
 #endif
         if (error != EAGAIN) {
             VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error));
+        } else {
+            s->rx_ready = false;
         }
         return -error;
     }
@@ -127,9 +151,29 @@  fd_send(struct stream *stream, const void *buffer, size_t n)
     ssize_t retval;
     int error;
 
+    if (s->hint) {
+        /* poll-loop is providing us with hints for IO */
+        if (!s->tx_ready) {
+            if (!(s->hint->revents & OVS_POLLOUT)) {
+                return -EAGAIN;
+            } else {
+                /* POLLOUT event from poll loop, mark us as ready */
+                s->tx_ready = true;
+                s->hint->revents &= ~OVS_POLLOUT;
+            }
+        }
+    }
     retval = send(s->fd, buffer, n, 0);
     if (retval < 0) {
         error = sock_errno();
+#ifdef __linux__
+        /* Linux will sometimes return ENOBUFS on sockets instead of EAGAIN. Usually seen
+         *  on unix domain sockets 
+         */
+        if (error == ENOBUFS) {
+           error = EAGAIN;
+        }
+#endif
 #ifdef _WIN32
         if (error == WSAEWOULDBLOCK) {
            error = EAGAIN;
@@ -137,6 +181,8 @@  fd_send(struct stream *stream, const void *buffer, size_t n)
 #endif
         if (error != EAGAIN) {
             VLOG_DBG_RL(&rl, "send: %s", sock_strerror(error));
+        } else {
+            s->tx_ready = false;
         }
         return -error;
     }
@@ -223,6 +269,8 @@  new_fd_pstream(char *name, int fd,
     ps->accept_cb = accept_cb;
     ps->unlink_path = unlink_path;
     *pstreamp = &ps->pstream;
+    /* Register fd with any long term persistence frameworks if available */
+    poll_fd_register(ps->fd, OVS_POLLIN, NULL);
     return 0;
 }
 
@@ -230,6 +278,7 @@  static void
 pfd_close(struct pstream *pstream)
 {
     struct fd_pstream *ps = fd_pstream_cast(pstream);
+    poll_fd_deregister(ps->fd);
     closesocket(ps->fd);
     maybe_unlink_and_free(ps->unlink_path);
     free(ps);
diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
index 3b7f9865e..939b0368e 100644
--- a/lib/stream-ssl.c
+++ b/lib/stream-ssl.c
@@ -147,6 +147,7 @@  struct ssl_stream
     /* A few bytes of header data in case SSL negotiation fails. */
     uint8_t head[2];
     short int n_head;
+    struct pollfd *hint;
 };
 
 /* SSL context created by ssl_init(). */
@@ -310,6 +311,8 @@  new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
         SSL_set_msg_callback_arg(ssl, sslv);
     }
 
+
+    poll_fd_register(sslv->fd, OVS_POLLIN, &sslv->hint);
     *streamp = &sslv->stream;
     free(server_name);
     return 0;
@@ -604,6 +607,7 @@  ssl_close(struct stream *stream)
     ERR_clear_error();
 
     SSL_free(sslv->ssl);
+    poll_fd_deregister(sslv->fd);
     closesocket(sslv->fd);
     free(sslv);
 }
@@ -697,6 +701,21 @@  ssl_recv(struct stream *stream, void *buffer, size_t n)
     /* Behavior of zero-byte SSL_read is poorly defined. */
     ovs_assert(n > 0);
 
+     if (sslv->hint) {
+        /* poll-loop is providing us with hints for IO */
+        if (sslv->rx_want == SSL_READING) {
+            if (!(sslv->hint->revents & OVS_POLLIN)) {
+                return -EAGAIN;
+            } else {
+                /* POLLIN event from poll loop, mark us as ready 
+                 * rx_want is cleared further down by reading ssl fsm
+                 */
+                sslv->hint->revents &= ~OVS_POLLIN;
+            }
+        }
+    }
+
+
     old_state = SSL_get_state(sslv->ssl);
     ret = SSL_read(sslv->ssl, buffer, n);
     if (old_state != SSL_get_state(sslv->ssl)) {
@@ -729,6 +748,19 @@  ssl_do_tx(struct stream *stream)
 {
     struct ssl_stream *sslv = ssl_stream_cast(stream);
 
+     if (sslv->hint) {
+        /* poll-loop is providing us with hints for IO */
+        if (sslv->tx_want == SSL_WRITING) {
+            if (!(sslv->hint->revents & OVS_POLLOUT)) {
+                return EAGAIN;
+            } else {
+                /* POLLIN event from poll loop, mark us as ready 
+                 * rx_want is cleared further down by reading ssl fsm
+                 */
+                sslv->hint->revents &= ~OVS_POLLOUT;
+            }
+        }
+    }
     for (;;) {
         int old_state = SSL_get_state(sslv->ssl);
         int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
@@ -771,6 +803,8 @@  ssl_send(struct stream *stream, const void *buffer, size_t n)
             ssl_clear_txbuf(sslv);
             return n;
         case EAGAIN:
+            /* we want to know when this fd will become available again */
+            stream_send_wait(stream);
             return n;
         default:
             ssl_clear_txbuf(sslv);
@@ -911,6 +945,7 @@  pssl_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp,
                  ds_steal_cstr(&bound_name));
     pstream_set_bound_port(&pssl->pstream, htons(port));
     pssl->fd = fd;
+    poll_fd_register(fd, OVS_POLLIN, NULL);
     *pstreamp = &pssl->pstream;
 
     return 0;
@@ -920,6 +955,7 @@  static void
 pssl_close(struct pstream *pstream)
 {
     struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
+    poll_fd_deregister(pssl->fd);
     closesocket(pssl->fd);
     free(pssl);
 }
diff --git a/lib/stream-unix.c b/lib/stream-unix.c
index d265efb83..866dd340f 100644
--- a/lib/stream-unix.c
+++ b/lib/stream-unix.c
@@ -47,6 +47,7 @@  unix_open(const char *name, char *suffix, struct stream **streamp,
     char *connect_path;
     int fd;
 
+
     connect_path = abs_file_name(ovs_rundir(), suffix);
     fd = make_unix_socket(SOCK_STREAM, true, NULL, connect_path);
 
diff --git a/lib/timeval.c b/lib/timeval.c
index 193c7bab1..59a12414f 100644
--- a/lib/timeval.c
+++ b/lib/timeval.c
@@ -38,6 +38,7 @@ 
 #include "unixctl.h"
 #include "util.h"
 #include "openvswitch/vlog.h"
+#include "openvswitch/poll-loop.h"
 
 VLOG_DEFINE_THIS_MODULE(timeval);
 
@@ -369,6 +370,88 @@  time_poll(struct pollfd *pollfds, int n_pollfds, HANDLE *handles OVS_UNUSED,
     return retval;
 }
 
+#ifdef OVS_USE_EPOLL
+
+/* Like epoll_wait(), except:
+ *
+ *      - The timeout is specified as an absolute time, as defined by
+ *        time_msec(), instead of a duration.
+ *
+ *      - On error, returns a negative error code (instead of setting errno).
+ *
+ *      - If interrupted by a signal, retries automatically until the original
+ *        timeout is reached.  (Because of this property, this function will
+ *        never return -EINTR.)
+ *
+ * Stores the number of milliseconds elapsed during poll in '*elapsed'. */
+int
+time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
+          long long int timeout_when, int *elapsed)
+{
+    long long int *last_wakeup = last_wakeup_get();
+    long long int start;
+    bool quiescent;
+    int retval = 0;
+
+    time_init();
+    coverage_clear();
+    coverage_run();
+    if (*last_wakeup && !thread_is_pmd()) {
+        log_poll_interval(*last_wakeup);
+    }
+    start = time_msec();
+
+    timeout_when = MIN(timeout_when, deadline);
+    quiescent = ovsrcu_is_quiescent();
+
+    for (;;) {
+        long long int now = time_msec();
+        int time_left;
+
+        if (now >= timeout_when) {
+            time_left = 0;
+        } else if ((unsigned long long int) timeout_when - now > INT_MAX) {
+            time_left = INT_MAX;
+        } else {
+            time_left = timeout_when - now;
+        }
+
+        if (!quiescent) {
+            if (!time_left) {
+                ovsrcu_quiesce();
+            } else {
+                ovsrcu_quiesce_start();
+            }
+        }
+
+        retval = epoll_wait(epoll_fd, events, max, time_left);
+        if (retval < 0) {
+            retval = -errno;
+        }
+
+        if (!quiescent && time_left) {
+            ovsrcu_quiesce_end();
+        }
+
+        if (deadline <= time_msec()) {
+            fatal_signal_handler(SIGALRM);
+            if (retval < 0) {
+                retval = 0;
+            }
+            break;
+        }
+
+        if (retval != -EINTR) {
+            break;
+        }
+    }
+    *last_wakeup = time_msec();
+    refresh_rusage();
+    *elapsed = *last_wakeup - start;
+    return retval;
+}
+#endif
+
 long long int
 timespec_to_msec(const struct timespec *ts)
 {
diff --git a/lib/timeval.h b/lib/timeval.h
index 502f703d4..347a09d63 100644
--- a/lib/timeval.h
+++ b/lib/timeval.h
@@ -20,6 +20,9 @@ 
 #include <time.h>
 #include "openvswitch/type-props.h"
 #include "util.h"
+#ifdef __linux__
+#include <sys/epoll.h>
+#endif
 
 #ifdef  __cplusplus
 extern "C" {
@@ -61,6 +64,10 @@  void time_wall_timespec(struct timespec *);
 void time_alarm(unsigned int secs);
 int time_poll(struct pollfd *, int n_pollfds, HANDLE *handles,
               long long int timeout_when, int *elapsed);
+#ifdef __linux__
+int time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
+          long long int timeout_when, int *elapsed);
+#endif
 
 long long int timespec_to_msec(const struct timespec *);
 long long int timespec_to_usec(const struct timespec *);