diff mbox

[ovs-dev,RFC] named-pipes-windows: Add pool for pending connections

Message ID 20161207190043.8960-1-aserdean@cloudbasesolutions.com
State Not Applicable
Delegated to: Guru Shetty
Headers show

Commit Message

Alin Serdean Dec. 7, 2016, 7:01 p.m. UTC
Current named pipe implementation does not have a queue for pending connections,
similar to the *nix ‘listen(fd, 1)’.

This patch tries to tackle this drawback by creating a pool of instances.
On each connect event we will retry to recreate the instances that were used.

Creating a pool of instances means we cannot wait on the connection event
anymore, since we do not have a single fd(handle) to be signaled.

As a temporary solution, add a predefined timer to wake up and retry accepting
connections. Further improvement can be done by adding an input/output
completion port
(https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx)
over the pool instances and wait for it to be signaled.

Setting up current IOCPs will require an additional effort.

Signed-off-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions.com>

---
 lib/stream-windows.c | 139 ++++++++++++++++++++++++++++++++-------------------
 1 file changed, 87 insertions(+), 52 deletions(-)

-- 
2.10.2.windows.1
diff mbox

Patch

diff --git a/lib/stream-windows.c b/lib/stream-windows.c
index 637920b..7e7932d 100644
--- a/lib/stream-windows.c
+++ b/lib/stream-windows.c
@@ -26,6 +26,8 @@ 
 #include "stream-provider.h"
 #include "openvswitch/vlog.h"
 
+#define MAX_NUMBER_OF_INSTANCES 64
+
 VLOG_DEFINE_THIS_MODULE(stream_windows);
 
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
@@ -138,7 +140,6 @@  windows_open(const char *name, char *suffix, struct stream **streamp,
      * connect function.  Use overlapped flag and file no buffering to ensure
      * asynchronous operations. */
     npipe = create_snpipe(connect_path);
-
     if (npipe == INVALID_HANDLE_VALUE && GetLastError() == ERROR_PIPE_BUSY) {
         retry = true;
     }
@@ -172,6 +173,9 @@  windows_open(const char *name, char *suffix, struct stream **streamp,
     s->write_pending = false;
     s->retry_connect = retry;
     *streamp = &s->stream;
+    if (retry) {
+        return EAGAIN;
+    }
     return 0;
 }
 
@@ -326,7 +330,7 @@  windows_send(struct stream *stream, const void *buffer, size_t n)
         return -WSAECONNRESET;
     } else if (!result) {
         VLOG_ERR_RL(&rl, "Could not send data on synchronous named pipe. Last "
-                    "error: %s", ovs_lasterror_to_string());
+                    "error: %s.", ovs_lasterror_to_string());
         return -EINVAL;
     }
     return (retval > 0 ? retval : -EAGAIN);
@@ -372,13 +376,14 @@  const struct stream_class windows_stream_class = {
 struct pwindows_pstream
 {
     struct pstream pstream;
-    HANDLE fd;
+    HANDLE fd[MAX_NUMBER_OF_INSTANCES];
     /* Unlink path to be deleted during close. */
     char *unlink_path;
     /* Overlapped operation used for connect. */
-    OVERLAPPED connect;
+    OVERLAPPED connect[MAX_NUMBER_OF_INSTANCES];
     /* Flag to check if an operation is pending. */
-    bool pending;
+    bool pending[MAX_NUMBER_OF_INSTANCES];
+    HANDLE hEvents[MAX_NUMBER_OF_INSTANCES];
     /* String used to create the named pipe. */
     char *pipe_path;
 };
@@ -393,7 +398,8 @@  pwindows_pstream_cast(struct pstream *pstream)
 }
 
 /* Create a named pipe with read/write access, overlapped, message mode for
- * writing, byte mode for reading and with a maximum of 64 active instances. */
+ * writing, byte mode for reading and with a maximum of MAX_NUMBER_OF_INSTANCES
+ * active instances. */
 static HANDLE
 create_pnpipe(char *name)
 {
@@ -406,8 +412,8 @@  create_pnpipe(char *name)
         return INVALID_HANDLE_VALUE;
     }
     return CreateNamedPipe(name, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
-                           PIPE_TYPE_MESSAGE | PIPE_READMODE_BYTE | PIPE_WAIT,
-                           64, BUFSIZE, BUFSIZE, 0, &sa);
+                           PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
+                           PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, 0, &sa);
 }
 
 /* Passive named pipe connect.  This function creates a new named pipe and
@@ -417,56 +423,68 @@  pwindows_accept(struct pstream *pstream, struct stream **new_streamp)
 {
     struct pwindows_pstream *p = pwindows_pstream_cast(pstream);
     DWORD last_error = 0;
-    DWORD cbRet;
+    DWORD ret_value;
     HANDLE npipe;
+    int cur_handle = 0;
+
+    DWORD number = WaitForMultipleObjects(MAX_NUMBER_OF_INSTANCES,
+                                          p->hEvents,
+                                          FALSE,
+                                          0);
+
+    cur_handle = number - WAIT_OBJECT_0;
+    if (cur_handle < 0 || cur_handle >(MAX_NUMBER_OF_INSTANCES - 1)) {
+        return EAGAIN;
+    }
+    if (p->fd[cur_handle] == INVALID_HANDLE_VALUE) {
+        npipe = create_pnpipe(p->pipe_path);
+        if (npipe != INVALID_HANDLE_VALUE) {
+            p->fd[cur_handle];
+            ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle]);
+        }
+        return EAGAIN;
+    }
 
     /* If the connect operation was pending, verify the result. */
-    if (p->pending) {
-        if (!GetOverlappedResult(p->fd, &p->connect, &cbRet, FALSE)) {
+    if (p->pending[cur_handle]) {
+        if (!GetOverlappedResult(p->fd, &p->connect[cur_handle], &ret_value, FALSE)) {
             last_error = GetLastError();
             if (last_error == ERROR_IO_INCOMPLETE) {
                 /* If the operation is still pending, retry again. */
-                p->pending = true;
+                p->pending[cur_handle] = true;
                 return EAGAIN;
             } else {
                 VLOG_ERR_RL(&rl, "Could not connect named pipe. Last "
                             "error: %s", ovs_lasterror_to_string());
-                DisconnectNamedPipe(p->fd);
-                return EINVAL;
+                DisconnectNamedPipe(p->fd[cur_handle]);
+                ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle]);
+                return EAGAIN;
             }
         }
-        p->pending = false;
+        p->pending[cur_handle] = false;
     }
 
-    if (!p->pending && !ConnectNamedPipe(p->fd, &p->connect)) {
+    if (!p->pending[cur_handle] && !ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle])) {
         last_error = GetLastError();
         if (last_error == ERROR_IO_PENDING) {
             /* Mark the accept operation as pending. */
-            p->pending = true;
+            p->pending[cur_handle] = true;
             return EAGAIN;
         } else if (last_error != ERROR_PIPE_CONNECTED) {
             VLOG_ERR_RL(&rl, "Could not connect synchronous named pipe. Last "
                         "error: %s", ovs_lasterror_to_string());
-            DisconnectNamedPipe(p->fd);
-            return EINVAL;
-        } else {
-            /* If the pipe is connected, signal an event. */
-            SetEvent(&p->connect.hEvent);
+            ResetEvent(p->connect[cur_handle].hEvent);
+            DisconnectNamedPipe(p->fd[cur_handle]);
+            ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle]);
+            return EAGAIN;
         }
     }
 
-    npipe = create_pnpipe(p->pipe_path);
-    if (npipe == INVALID_HANDLE_VALUE) {
-        VLOG_ERR_RL(&rl, "Could not create a new named pipe after connect. ",
-                    ovs_lasterror_to_string());
-        return ENOENT;
-    }
-
     /* Give the handle p->fd to the new created active stream and specify it
      * was created by an active stream. */
     struct windows_stream *p_temp = xmalloc(sizeof *p_temp);
     stream_init(&p_temp->stream, &windows_stream_class, 0, "unix");
-    p_temp->fd = p->fd;
+    p_temp->fd = p->fd[cur_handle];
     /* Specify it was created by a passive stream. */
     p_temp->server = true;
     /* Create events for read/write operations. */
@@ -481,10 +499,18 @@  pwindows_accept(struct pstream *pstream, struct stream **new_streamp)
     *new_streamp = &p_temp->stream;
 
     /* The passive handle p->fd will be the new created handle. */
-    p->fd = npipe;
-    memset(&p->connect, 0, sizeof(p->connect));
-    p->connect.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
-    p->pending = false;
+    npipe = create_pnpipe(p->pipe_path);
+    p->fd[cur_handle] = npipe;
+    CloseHandle(p->connect[cur_handle].hEvent);
+    memset(&p->connect[cur_handle], 0, sizeof(p->connect[cur_handle]));
+    p->pending[cur_handle] = false;
+    if (npipe != INVALID_HANDLE_VALUE) {
+        ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle]);
+        p->connect[cur_handle].hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
+    } else {
+        p->connect[cur_handle].hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+    }
+    p->hEvents[cur_handle] = p->connect[cur_handle].hEvent;
     return 0;
 }
 
@@ -493,9 +519,11 @@  static void
 pwindows_close(struct pstream *pstream)
 {
     struct pwindows_pstream *p = pwindows_pstream_cast(pstream);
-    DisconnectNamedPipe(p->fd);
-    CloseHandle(p->fd);
-    CloseHandle(p->connect.hEvent);
+    for (int i = 0; i <= MAX_NUMBER_OF_INSTANCES; i++){
+        DisconnectNamedPipe(p->fd[i]);
+        CloseHandle(p->fd[i]);
+        CloseHandle(p->connect[i].hEvent);
+    }
     maybe_unlink_and_free(p->unlink_path);
     free(p->pipe_path);
     free(p);
@@ -505,8 +533,8 @@  pwindows_close(struct pstream *pstream)
 static void
 pwindows_wait(struct pstream *pstream)
 {
-    struct pwindows_pstream *p = pwindows_pstream_cast(pstream);
-    poll_wevent_wait(p->connect.hEvent);
+    long long now = time_msec();
+    poll_timer_wait_until(now + 16);
 }
 
 /* Passive named pipe. */
@@ -515,7 +543,7 @@  pwindows_open(const char *name OVS_UNUSED, char *suffix,
               struct pstream **pstreamp, uint8_t dscp OVS_UNUSED)
 {
     char *bind_path;
-    int error;
+    int error, i;
     HANDLE npipe;
     char *orig_path;
 
@@ -543,22 +571,29 @@  pwindows_open(const char *name OVS_UNUSED, char *suffix,
     bind_path = xasprintf("%s%s", LOCAL_PREFIX, remove_slashes(path));
     free(path);
 
-    npipe = create_pnpipe(bind_path);
-
-    if (npipe == INVALID_HANDLE_VALUE) {
-        VLOG_ERR_RL(&rl, "Could not create named pipe. Last error: %s",
-                    ovs_lasterror_to_string());
-        return ENOENT;
-    }
-
     struct pwindows_pstream *p = xmalloc(sizeof *p);
     pstream_init(&p->pstream, &pwindows_pstream_class, name);
-    p->fd = npipe;
     p->unlink_path = orig_path;
-    memset(&p->connect, 0, sizeof(p->connect));
-    p->connect.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
-    p->pending = false;
     p->pipe_path = bind_path;
+    for (i = 0; i < MAX_NUMBER_OF_INSTANCES; i++) {
+        npipe = create_pnpipe(bind_path);
+        if (npipe == INVALID_HANDLE_VALUE) {
+            VLOG_DBG_RL(&rl, "Could not create named pipe. Last error: %s",
+                ovs_lasterror_to_string());
+            break;
+        }
+        p->fd[i] = npipe;
+        memset(&p->connect[i], 0, sizeof(p->connect[i]));
+        p->connect[i].hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
+        p->pending[i] = false;
+        p->hEvents[i] = p->connect[i].hEvent;
+        ConnectNamedPipe(p->fd[i], &p->connect[i]);
+    }
+    if (i == 0) {
+        free(p);
+        return ENOENT;
+    }
+
     *pstreamp = &p->pstream;
     return 0;
 }