@@ -26,6 +26,8 @@
#include "stream-provider.h"
#include "openvswitch/vlog.h"
+#define MAX_NUMBER_OF_INSTANCES 64
+
VLOG_DEFINE_THIS_MODULE(stream_windows);
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
@@ -138,7 +140,6 @@ windows_open(const char *name, char *suffix, struct stream **streamp,
* connect function. Use overlapped flag and file no buffering to ensure
* asynchronous operations. */
npipe = create_snpipe(connect_path);
-
if (npipe == INVALID_HANDLE_VALUE && GetLastError() == ERROR_PIPE_BUSY) {
retry = true;
}
@@ -172,6 +173,9 @@ windows_open(const char *name, char *suffix, struct stream **streamp,
s->write_pending = false;
s->retry_connect = retry;
*streamp = &s->stream;
+ if (retry) {
+ return EAGAIN;
+ }
return 0;
}
@@ -326,7 +330,7 @@ windows_send(struct stream *stream, const void *buffer, size_t n)
return -WSAECONNRESET;
} else if (!result) {
VLOG_ERR_RL(&rl, "Could not send data on synchronous named pipe. Last "
- "error: %s", ovs_lasterror_to_string());
+ "error: %s.", ovs_lasterror_to_string());
return -EINVAL;
}
return (retval > 0 ? retval : -EAGAIN);
@@ -372,13 +376,14 @@ const struct stream_class windows_stream_class = {
struct pwindows_pstream
{
struct pstream pstream;
- HANDLE fd;
+ HANDLE fd[MAX_NUMBER_OF_INSTANCES];
/* Unlink path to be deleted during close. */
char *unlink_path;
/* Overlapped operation used for connect. */
- OVERLAPPED connect;
+ OVERLAPPED connect[MAX_NUMBER_OF_INSTANCES];
/* Flag to check if an operation is pending. */
- bool pending;
+ bool pending[MAX_NUMBER_OF_INSTANCES];
+ HANDLE hEvents[MAX_NUMBER_OF_INSTANCES];
/* String used to create the named pipe. */
char *pipe_path;
};
@@ -393,7 +398,8 @@ pwindows_pstream_cast(struct pstream *pstream)
}
/* Create a named pipe with read/write access, overlapped, message mode for
- * writing, byte mode for reading and with a maximum of 64 active instances. */
+ * writing, byte mode for reading and with a maximum of MAX_NUMBER_OF_INSTANCES
+ * active instances. */
static HANDLE
create_pnpipe(char *name)
{
@@ -406,8 +412,8 @@ create_pnpipe(char *name)
return INVALID_HANDLE_VALUE;
}
return CreateNamedPipe(name, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
- PIPE_TYPE_MESSAGE | PIPE_READMODE_BYTE | PIPE_WAIT,
- 64, BUFSIZE, BUFSIZE, 0, &sa);
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
+ PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, 0, &sa);
}
/* Passive named pipe connect. This function creates a new named pipe and
@@ -417,56 +423,68 @@ pwindows_accept(struct pstream *pstream, struct stream **new_streamp)
{
struct pwindows_pstream *p = pwindows_pstream_cast(pstream);
DWORD last_error = 0;
- DWORD cbRet;
+ DWORD ret_value;
HANDLE npipe;
+ int cur_handle = 0;
+
+ DWORD number = WaitForMultipleObjects(MAX_NUMBER_OF_INSTANCES,
+ p->hEvents,
+ FALSE,
+ 0);
+
+ cur_handle = number - WAIT_OBJECT_0;
+ if (cur_handle < 0 || cur_handle >(MAX_NUMBER_OF_INSTANCES - 1)) {
+ return EAGAIN;
+ }
+ if (p->fd[cur_handle] == INVALID_HANDLE_VALUE) {
+ npipe = create_pnpipe(p->pipe_path);
+ if (npipe != INVALID_HANDLE_VALUE) {
+ p->fd[cur_handle];
+ ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle]);
+ }
+ return EAGAIN;
+ }
/* If the connect operation was pending, verify the result. */
- if (p->pending) {
- if (!GetOverlappedResult(p->fd, &p->connect, &cbRet, FALSE)) {
+ if (p->pending[cur_handle]) {
+ if (!GetOverlappedResult(p->fd, &p->connect[cur_handle], &ret_value, FALSE)) {
last_error = GetLastError();
if (last_error == ERROR_IO_INCOMPLETE) {
/* If the operation is still pending, retry again. */
- p->pending = true;
+ p->pending[cur_handle] = true;
return EAGAIN;
} else {
VLOG_ERR_RL(&rl, "Could not connect named pipe. Last "
"error: %s", ovs_lasterror_to_string());
- DisconnectNamedPipe(p->fd);
- return EINVAL;
+ DisconnectNamedPipe(p->fd[cur_handle]);
+ ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle]);
+ return EAGAIN;
}
}
- p->pending = false;
+ p->pending[cur_handle] = false;
}
- if (!p->pending && !ConnectNamedPipe(p->fd, &p->connect)) {
+ if (!p->pending[cur_handle] && !ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle])) {
last_error = GetLastError();
if (last_error == ERROR_IO_PENDING) {
/* Mark the accept operation as pending. */
- p->pending = true;
+ p->pending[cur_handle] = true;
return EAGAIN;
} else if (last_error != ERROR_PIPE_CONNECTED) {
VLOG_ERR_RL(&rl, "Could not connect synchronous named pipe. Last "
"error: %s", ovs_lasterror_to_string());
- DisconnectNamedPipe(p->fd);
- return EINVAL;
- } else {
- /* If the pipe is connected, signal an event. */
- SetEvent(&p->connect.hEvent);
+ ResetEvent(p->connect[cur_handle].hEvent);
+ DisconnectNamedPipe(p->fd[cur_handle]);
+ ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle]);
+ return EAGAIN;
}
}
- npipe = create_pnpipe(p->pipe_path);
- if (npipe == INVALID_HANDLE_VALUE) {
- VLOG_ERR_RL(&rl, "Could not create a new named pipe after connect. ",
- ovs_lasterror_to_string());
- return ENOENT;
- }
-
/* Give the handle p->fd to the new created active stream and specify it
* was created by an active stream. */
struct windows_stream *p_temp = xmalloc(sizeof *p_temp);
stream_init(&p_temp->stream, &windows_stream_class, 0, "unix");
- p_temp->fd = p->fd;
+ p_temp->fd = p->fd[cur_handle];
/* Specify it was created by a passive stream. */
p_temp->server = true;
/* Create events for read/write operations. */
@@ -481,10 +499,18 @@ pwindows_accept(struct pstream *pstream, struct stream **new_streamp)
*new_streamp = &p_temp->stream;
/* The passive handle p->fd will be the new created handle. */
- p->fd = npipe;
- memset(&p->connect, 0, sizeof(p->connect));
- p->connect.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
- p->pending = false;
+ npipe = create_pnpipe(p->pipe_path);
+ p->fd[cur_handle] = npipe;
+ CloseHandle(p->connect[cur_handle].hEvent);
+ memset(&p->connect[cur_handle], 0, sizeof(p->connect[cur_handle]));
+ p->pending[cur_handle] = false;
+ if (npipe != INVALID_HANDLE_VALUE) {
+ ConnectNamedPipe(p->fd[cur_handle], &p->connect[cur_handle]);
+ p->connect[cur_handle].hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
+ } else {
+ p->connect[cur_handle].hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+ }
+ p->hEvents[cur_handle] = p->connect[cur_handle].hEvent;
return 0;
}
@@ -493,9 +519,11 @@ static void
pwindows_close(struct pstream *pstream)
{
struct pwindows_pstream *p = pwindows_pstream_cast(pstream);
- DisconnectNamedPipe(p->fd);
- CloseHandle(p->fd);
- CloseHandle(p->connect.hEvent);
+ for (int i = 0; i <= MAX_NUMBER_OF_INSTANCES; i++){
+ DisconnectNamedPipe(p->fd[i]);
+ CloseHandle(p->fd[i]);
+ CloseHandle(p->connect[i].hEvent);
+ }
maybe_unlink_and_free(p->unlink_path);
free(p->pipe_path);
free(p);
@@ -505,8 +533,8 @@ pwindows_close(struct pstream *pstream)
static void
pwindows_wait(struct pstream *pstream)
{
- struct pwindows_pstream *p = pwindows_pstream_cast(pstream);
- poll_wevent_wait(p->connect.hEvent);
+ long long now = time_msec();
+ poll_timer_wait_until(now + 16);
}
/* Passive named pipe. */
@@ -515,7 +543,7 @@ pwindows_open(const char *name OVS_UNUSED, char *suffix,
struct pstream **pstreamp, uint8_t dscp OVS_UNUSED)
{
char *bind_path;
- int error;
+ int error, i;
HANDLE npipe;
char *orig_path;
@@ -543,22 +571,29 @@ pwindows_open(const char *name OVS_UNUSED, char *suffix,
bind_path = xasprintf("%s%s", LOCAL_PREFIX, remove_slashes(path));
free(path);
- npipe = create_pnpipe(bind_path);
-
- if (npipe == INVALID_HANDLE_VALUE) {
- VLOG_ERR_RL(&rl, "Could not create named pipe. Last error: %s",
- ovs_lasterror_to_string());
- return ENOENT;
- }
-
struct pwindows_pstream *p = xmalloc(sizeof *p);
pstream_init(&p->pstream, &pwindows_pstream_class, name);
- p->fd = npipe;
p->unlink_path = orig_path;
- memset(&p->connect, 0, sizeof(p->connect));
- p->connect.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
- p->pending = false;
p->pipe_path = bind_path;
+ for (i = 0; i < MAX_NUMBER_OF_INSTANCES; i++) {
+ npipe = create_pnpipe(bind_path);
+ if (npipe == INVALID_HANDLE_VALUE) {
+ VLOG_DBG_RL(&rl, "Could not create named pipe. Last error: %s",
+ ovs_lasterror_to_string());
+ break;
+ }
+ p->fd[i] = npipe;
+ memset(&p->connect[i], 0, sizeof(p->connect[i]));
+ p->connect[i].hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
+ p->pending[i] = false;
+ p->hEvents[i] = p->connect[i].hEvent;
+ ConnectNamedPipe(p->fd[i], &p->connect[i]);
+ }
+ if (i == 0) {
+ free(p);
+ return ENOENT;
+ }
+
*pstreamp = &p->pstream;
return 0;
}
Current named pipe implementation does not have a queue for pending connections, similar to the *nix ‘listen(fd, 1)’. This patch tries to tackle this drawback by creating a pool of instances. On each connect event we will retry to recreate the instances that were used. Creating a pool of instances means we cannot wait on the connection event anymore, since we do not have a single fd(handle) to be signaled. As a temporary solution, add a predefined timer to wake up and retry accepting connections. Further improvement can be done by adding an input/output completion port (https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx) over the pool instances and wait for it to be signaled. Setting up current IOCPs will require an additional effort. Signed-off-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions.com> --- lib/stream-windows.c | 139 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 87 insertions(+), 52 deletions(-) -- 2.10.2.windows.1