Message ID | 1483456542-7522-3-git-send-email-abalutoiu@cloudbasesolutions.com |
---|---|
State | Superseded |
Headers | show |
Please ignore this patch, I will send another one soon. Thanks, Alin Balutoiu. > -----Original Message----- > From: Alin Balutoiu > Sent: Tuesday, January 3, 2017 5:17 PM > To: dev@openvswitch.org > Cc: Alin Balutoiu <abalutoiu@cloudbasesolutions.com>; Paul Boca > <pboca@cloudbasesolutions.com> > Subject: [PATCH V2 2/5] Python tests: Ported UNIX sockets to Windows > > From: Alin Balutoiu <abalutoiu@cloudbasesolutions.com> > > Unix sockets (AF_UNIX) are not supported on Windows. > The replacement of Unix sockets on Windows is implemented using named > pipes, we are trying to mimic the behaviour of unix sockets. > > Instead of using Unix sockets to communicate between components Named > Pipes are used. This makes the python sockets compatible with the Named > Pipe used in Windows applications. > > Signed-off-by: Paul-Daniel Boca <pboca@cloudbasesolutions.com> > Signed-off-by: Alin-Gheorghe Balutoiu <abalutoiu@cloudbasesolutions.com> > --- > V2: No changes. > --- > python/ovs/jsonrpc.py | 6 + > python/ovs/poller.py | 72 ++++++--- > python/ovs/socket_util.py | 31 +++- > python/ovs/stream.py | 351 > ++++++++++++++++++++++++++++++++++++++++--- > python/ovs/unixctl/server.py | 4 + > tests/test-jsonrpc.py | 16 +- > 6 files changed, 436 insertions(+), 44 deletions(-) > > diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py index > 6300c67..5a11500 100644 > --- a/python/ovs/jsonrpc.py > +++ b/python/ovs/jsonrpc.py > @@ -14,6 +14,7 @@ > > import errno > import os > +import sys > > import six > > @@ -274,6 +275,11 @@ class Connection(object): > except UnicodeError: > error = errno.EILSEQ > if error: > + if (sys.platform == "win32" and > + error == errno.WSAEWOULDBLOCK): > + # WSAEWOULDBLOCK would be the equivalent on Windows > + # for EAGAIN on Unix. > + error = errno.EAGAIN > if error == errno.EAGAIN: > return error, None > else: > diff --git a/python/ovs/poller.py b/python/ovs/poller.py index > d7cb7d3..d836483 100644 > --- a/python/ovs/poller.py > +++ b/python/ovs/poller.py > @@ -18,6 +18,10 @@ import ovs.vlog > import select > import socket > import os > +import sys > + > +if sys.platform == "win32": > + import ovs.winutils as winutils > > try: > from OpenSSL import SSL > @@ -62,7 +66,9 @@ class _SelectSelect(object): > if SSL and isinstance(fd, SSL.Connection): > fd = fd.fileno() > > - assert isinstance(fd, int) > + if sys.platform != 'win32': > + # Skip this on Windows, it also register events > + assert isinstance(fd, int) > if events & POLLIN: > self.rlist.append(fd) > events &= ~POLLIN > @@ -73,28 +79,58 @@ class _SelectSelect(object): > self.xlist.append(fd) > > def poll(self, timeout): > - if timeout == -1: > - # epoll uses -1 for infinite timeout, select uses None. > - timeout = None > - else: > - timeout = float(timeout) / 1000 > # XXX workaround a bug in eventlet > # see https://github.com/eventlet/eventlet/pull/25 > if timeout == 0 and _using_eventlet_green_select(): > timeout = 0.1 > + if sys.platform == 'win32': > + events = self.rlist + self.wlist + self.xlist > + if not events: > + return [] > + if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS: > + raise WindowsError("Cannot handle more than maximum wait" > + "objects\n") > + > + # win32event.INFINITE timeout is -1 > + # timeout must be an int number, expressed in ms > + if timeout == 0.1: > + timeout = 100 > + else: > + timeout = int(timeout) > + > + # Wait until any of the events is set to signaled > + try: > + retval = winutils.win32event.WaitForMultipleObjects( > + events, > + False, # Wait all > + timeout) > + except winutils.pywintypes.error: > + return [(0, POLLERR)] > > - rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist, > - timeout) > - events_dict = {} > - for fd in rlist: > - events_dict[fd] = events_dict.get(fd, 0) | POLLIN > - for fd in wlist: > - events_dict[fd] = events_dict.get(fd, 0) | POLLOUT > - for fd in xlist: > - events_dict[fd] = events_dict.get(fd, 0) | (POLLERR | > - POLLHUP | > - POLLNVAL) > - return list(events_dict.items()) > + if retval == winutils.winerror.WAIT_TIMEOUT: > + return [] > + > + return [(events[retval], 0)] > + else: > + if timeout == -1: > + # epoll uses -1 for infinite timeout, select uses None. > + timeout = None > + else: > + timeout = float(timeout) / 1000 > + rlist, wlist, xlist = select.select(self.rlist, > + self.wlist, > + self.xlist, > + timeout) > + events_dict = {} > + for fd in rlist: > + events_dict[fd] = events_dict.get(fd, 0) | POLLIN > + for fd in wlist: > + events_dict[fd] = events_dict.get(fd, 0) | POLLOUT > + for fd in xlist: > + events_dict[fd] = events_dict.get(fd, 0) | (POLLERR | > + POLLHUP | > + POLLNVAL) > + return list(events_dict.items()) > > > SelectPoll = _SelectSelect > diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py index > b358b05..fb6cee4 100644 > --- a/python/ovs/socket_util.py > +++ b/python/ovs/socket_util.py > @@ -17,6 +17,7 @@ import os > import os.path > import random > import socket > +import sys > > import six > from six.moves import range > @@ -25,6 +26,10 @@ import ovs.fatal_signal import ovs.poller import > ovs.vlog > > +if sys.platform == 'win32': > + import ovs.winutils as winutils > + import win32file > + > vlog = ovs.vlog.Vlog("socket_util") > > > @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path, > connect_path, short=False): > > def check_connection_completion(sock): > p = ovs.poller.SelectPoll() > - p.register(sock, ovs.poller.POLLOUT) > + if sys.platform == "win32": > + event = winutils.get_new_event(None, False, True, None) > + # Receive notification of readiness for writing, of completed > + # connection or multipoint join operation, and of socket closure. > + win32file.WSAEventSelect(sock, event, > + win32file.FD_WRITE | > + win32file.FD_CONNECT | > + win32file.FD_CLOSE) > + p.register(event, ovs.poller.POLLOUT) > + else: > + p.register(sock, ovs.poller.POLLOUT) > pfds = p.poll(0) > if len(pfds) == 1: > revents = pfds[0][1] > @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port, > dscp): > try: > sock.connect(address) > except socket.error as e: > - if get_exception_errno(e) != errno.EINPROGRESS: > + error = get_exception_errno(e) > + if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK: > + # WSAEWOULDBLOCK would be the equivalent on Windows > + # for EINPROGRESS on Unix. > + error = errno.EINPROGRESS > + if error != errno.EINPROGRESS: > raise > return 0, sock > except socket.error as e: > @@ -257,9 +277,12 @@ def get_null_fd(): > global null_fd > if null_fd < 0: > try: > - null_fd = os.open("/dev/null", os.O_RDWR) > + # os.devnull ensures compatibility with Windows, returns > + # '/dev/null' for Unix and 'nul' for Windows > + null_fd = os.open(os.devnull, os.O_RDWR) > except OSError as e: > - vlog.err("could not open /dev/null: %s" % os.strerror(e.errno)) > + vlog.err("could not open %s: %s" % (os.devnull, > + os.strerror(e.errno))) > return -e.errno > return null_fd > > diff --git a/python/ovs/stream.py b/python/ovs/stream.py index > cd57eb3..e8e5700 100644 > --- a/python/ovs/stream.py > +++ b/python/ovs/stream.py > @@ -15,6 +15,7 @@ > import errno > import os > import socket > +import sys > > import six > > @@ -27,6 +28,13 @@ try: > except ImportError: > SSL = None > > +if sys.platform == 'win32': > + import ovs.winutils as winutils > + import pywintypes > + import win32event > + import win32file > + import win32pipe > + > vlog = ovs.vlog.Vlog("stream") > > > @@ -63,6 +71,13 @@ class Stream(object): > _SSL_certificate_file = None > _SSL_ca_cert_file = None > > + # Windows only > + _write = None # overlapped for write operation > + _read = None # overlapped for read operation > + _write_pending = False > + _read_pending = False > + _retry_connect = False > + > @staticmethod > def register_method(method, cls): > Stream._SOCKET_METHODS[method + ":"] = cls @@ -81,8 +96,23 @@ > class Stream(object): > otherwise False.""" > return bool(Stream._find_method(name)) > > - def __init__(self, socket, name, status): > + def __init__(self, socket, name, status, pipe=None, is_server=False): > self.socket = socket > + self.pipe = pipe > + if sys.platform == 'win32': > + self._read = pywintypes.OVERLAPPED() > + self._read.hEvent = winutils.get_new_event() > + self._write = pywintypes.OVERLAPPED() > + self._write.hEvent = winutils.get_new_event() > + if pipe is not None: > + # Flag to check if fd is a server HANDLE. In the case of a > + # server handle we have to issue a disconnect before closing > + # the actual handle. > + self._server = is_server > + suffix = name.split(":", 1)[1] > + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) > + self._pipename = winutils.get_pipe_name(suffix) > + > self.name = name > if status == errno.EAGAIN: > self.state = Stream.__S_CONNECTING @@ -120,6 +150,38 @@ class > Stream(object): > suffix = name.split(":", 1)[1] > if name.startswith("unix:"): > suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) > + if sys.platform == 'win32': > + pipename = winutils.get_pipe_name(suffix) > + > + if len(suffix) > 255: > + # Return invalid argument if the name is too long > + return errno.ENOENT, None > + > + try: > + # In case of "unix:" argument, the assumption is that > + # there is a file created in the path (suffix). > + open(suffix, 'r').close() > + except: > + return errno.ENOENT, None > + > + try: > + npipe = winutils.create_file(pipename) > + try: > + winutils.set_pipe_mode(npipe, > + win32pipe.PIPE_READMODE_BYTE) > + except pywintypes.error as e: > + return errno.ENOENT, None > + except pywintypes.error as e: > + if e.winerror == winutils.winerror.ERROR_PIPE_BUSY: > + # Pipe is busy, set the retry flag to true and retry > + # again during the connect function. > + Stream.retry_connect = True > + return 0, cls(None, name, errno.EAGAIN, > + pipe=win32file.INVALID_HANDLE_VALUE, > + is_server=False) > + return errno.ENOENT, None > + return 0, cls(None, name, 0, pipe=npipe, > + is_server=False) > + > error, sock = cls._open(suffix, dscp) > if error: > return error, None > @@ -145,6 +207,10 @@ class Stream(object): > if not error: > while True: > error = stream.connect() > + if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK: > + # WSAEWOULDBLOCK would be the equivalent on Windows > + # for EAGAIN on Unix. > + error = errno.EAGAIN > if error != errno.EAGAIN: > break > stream.run() > @@ -152,7 +218,8 @@ class Stream(object): > stream.run_wait(poller) > stream.connect_wait(poller) > poller.block() > - assert error != errno.EINPROGRESS > + if stream.socket is not None: > + assert error != errno.EINPROGRESS > > if error and stream: > stream.close() > @@ -160,11 +227,35 @@ class Stream(object): > return error, stream > > def close(self): > - self.socket.close() > + if self.socket is not None: > + self.socket.close() > + if self.pipe is not None: > + if self._server: > + win32pipe.DisconnectNamedPipe(self.pipe) > + winutils.close_handle(self.pipe, vlog.warn) > + winutils.close_handle(self._read.hEvent, vlog.warn) > + winutils.close_handle(self._write.hEvent, vlog.warn) > > def __scs_connecting(self): > - retval = ovs.socket_util.check_connection_completion(self.socket) > - assert retval != errno.EINPROGRESS > + if self.socket is not None: > + retval = ovs.socket_util.check_connection_completion(self.socket) > + assert retval != errno.EINPROGRESS > + elif sys.platform == 'win32' and self.retry_connect: > + try: > + self.pipe = winutils.create_file(self._pipename) > + self._retry_connect = False > + retval = 0 > + except pywintypes.error as e: > + if e.winerror == winutils.winerror.ERROR_PIPE_BUSY: > + retval = errno.EAGAIN > + else: > + self._retry_connect = False > + retval = errno.ENOENT > + else: > + # Windows only, if retry_connect is false, it means it's already > + # connected so we can set the value of retval to 0 > + retval = 0 > + > if retval == 0: > self.state = Stream.__S_CONNECTED > elif retval != errno.EAGAIN: > @@ -209,11 +300,63 @@ class Stream(object): > elif n == 0: > return (0, "") > > + if sys.platform == 'win32' and self.socket is None: > + return self.__recv_windows(n) > + > try: > return (0, self.socket.recv(n)) > except socket.error as e: > return (ovs.socket_util.get_exception_errno(e), "") > > + def __recv_windows(self, n): > + if self._read_pending: > + try: > + nBytesRead = winutils.get_overlapped_result(self.pipe, > + self._read, > + False) > + self._read_pending = False > + recvBuffer = self._read_buffer[:nBytesRead] > + > + return (0, winutils.get_decoded_buffer(recvBuffer)) > + except pywintypes.error as e: > + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: > + # The operation is still pending, try again > + self._read_pending = True > + return (errno.EAGAIN, "") > + elif e.winerror in winutils.pipe_disconnected_errors: > + # If the pipe was disconnected, return 0. > + return (0, "") > + else: > + return (errno.EINVAL, "") > + > + (errCode, self._read_buffer) = winutils.read_file(self.pipe, > + n, > + self._read) > + if errCode: > + if errCode == winutils.winerror.ERROR_IO_PENDING: > + self._read_pending = True > + return (errno.EAGAIN, "") > + elif errCode in winutils.pipe_disconnected_errors: > + # If the pipe was disconnected, return 0. > + return (0, "") > + else: > + return (errCode, "") > + > + try: > + nBytesRead = winutils.get_overlapped_result(self.pipe, > + self._read, > + False) > + winutils.win32event.SetEvent(self._read.hEvent) > + except pywintypes.error as e: > + if e.winerror in winutils.pipe_disconnected_errors: > + # If the pipe was disconnected, return 0. > + return (0, "") > + else: > + return (e.winerror, "") > + > + recvBuffer = self._read_buffer[:nBytesRead] > + return (0, winutils.get_decoded_buffer(recvBuffer)) > + > def send(self, buf): > """Tries to send 'buf' on this stream. > > @@ -231,6 +374,9 @@ class Stream(object): > elif len(buf) == 0: > return 0 > > + if sys.platform == 'win32' and self.socket is None: > + return self.__send_windows(buf) > + > try: > # Python 3 has separate types for strings and bytes. We must have > # bytes here. > @@ -240,6 +386,40 @@ class Stream(object): > except socket.error as e: > return -ovs.socket_util.get_exception_errno(e) > > + def __send_windows(self, buf): > + if self._write_pending: > + try: > + nBytesWritten = winutils.get_overlapped_result(self.pipe, > + self._write, > + False) > + self._write_pending = False > + return nBytesWritten > + except pywintypes.error as e: > + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: > + # The operation is still pending, try again > + self._read_pending = True > + return -errno.EAGAIN > + elif e.winerror in winutils.pipe_disconnected_errors: > + # If the pipe was disconnected, return connection reset. > + return -errno.ECONNRESET > + else: > + return -errno.EINVAL > + > + buf = winutils.get_encoded_buffer(buf) > + self._write_pending = False > + (errCode, nBytesWritten) = winutils.write_file(self.pipe, > + buf, > + self._write) > + if errCode: > + if errCode == winutils.winerror.ERROR_IO_PENDING: > + self._write_pending = True > + return -errno.EAGAIN > + if (not nBytesWritten and > + errCode in winutils.pipe_disconnected_errors): > + # If the pipe was disconnected, return connection reset. > + return -errno.ECONNRESET > + return nBytesWritten > + > def run(self): > pass > > @@ -255,11 +435,52 @@ class Stream(object): > > if self.state == Stream.__S_CONNECTING: > wait = Stream.W_CONNECT > + > + if sys.platform == 'win32': > + self.__wait_windows(poller, wait) > + return > + > if wait == Stream.W_RECV: > poller.fd_wait(self.socket, ovs.poller.POLLIN) > else: > poller.fd_wait(self.socket, ovs.poller.POLLOUT) > > + def __wait_windows(self, poller, wait): > + if self.socket is not None: > + if wait == Stream.W_RECV: > + read_flags = (win32file.FD_READ | > + win32file.FD_ACCEPT | > + win32file.FD_CLOSE) > + try: > + win32file.WSAEventSelect(self.socket, > + self._read.hEvent, > + read_flags) > + except pywintypes.error as e: > + vlog.err("failed to associate events with socket: %s" > + % e.strerror) > + poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN) > + else: > + write_flags = (win32file.FD_WRITE | > + win32file.FD_CONNECT | > + win32file.FD_CLOSE) > + try: > + win32file.WSAEventSelect(self.socket, > + self._write.hEvent, > + write_flags) > + except pywintypes.error as e: > + vlog.err("failed to associate events with socket: %s" > + % e.strerror) > + poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT) > + else: > + if wait == Stream.W_RECV: > + if self._read: > + poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN) > + elif wait == Stream.W_SEND: > + if self._write: > + poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT) > + elif wait == Stream.W_CONNECT: > + return > + > def connect_wait(self, poller): > self.wait(poller, Stream.W_CONNECT) > > @@ -267,11 +488,22 @@ class Stream(object): > self.wait(poller, Stream.W_RECV) > > def send_wait(self, poller): > + if sys.platform == 'win32': > + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN) > self.wait(poller, Stream.W_SEND) > > def __del__(self): > # Don't delete the file: we might have forked. > - self.socket.close() > + if self.socket is not None: > + self.socket.close() > + if self.pipe is not None: > + # Check if there are any remaining valid handles and close them > + if self.pipe: > + winutils.close_handle(self.pipe) > + if self._read.hEvent: > + winutils.close_handle(self._read.hEvent) > + if self._write.hEvent: > + winutils.close_handle(self._write.hEvent) > > @staticmethod > def ssl_set_private_key_file(file_name): > @@ -287,6 +519,10 @@ class Stream(object): > > > class PassiveStream(object): > + # Windows only > + connect = None # overlapped for read operation > + connect_pending = False > + > @staticmethod > def is_valid_name(name): > """Returns True if 'name' is a passive stream name in the form @@ - > 294,9 +530,18 @@ class PassiveStream(object): > "punix:" or "ptcp"), otherwise False.""" > return name.startswith("punix:") | name.startswith("ptcp:") > > - def __init__(self, sock, name, bind_path): > + def __init__(self, sock, name, bind_path, pipe=None): > self.name = name > + self.pipe = pipe > self.socket = sock > + if pipe is not None: > + self.connect = pywintypes.OVERLAPPED() > + self.connect.hEvent = winutils.get_new_event(bManualReset=True) > + self.connect_pending = False > + suffix = name.split(":", 1)[1] > + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) > + self._pipename = winutils.get_pipe_name(suffix) > + > self.bind_path = bind_path > > @staticmethod > @@ -315,11 +560,27 @@ class PassiveStream(object): > bind_path = name[6:] > if name.startswith("punix:"): > bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path) > - error, sock = > ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, > - True, bind_path, > - None) > - if error: > - return error, None > + if sys.platform != 'win32': > + error, sock = ovs.socket_util.make_unix_socket( > + socket.SOCK_STREAM, True, bind_path, None) > + if error: > + return error, None > + else: > + # Branch used only on Windows > + try: > + open(bind_path, 'w').close() > + except: > + return errno.ENOENT, None > + > + pipename = winutils.get_pipe_name(bind_path) > + if len(pipename) > 255: > + # Return invalid argument if the name is too long > + return errno.ENOENT, None > + > + npipe = winutils.create_named_pipe(pipename) > + if not npipe: > + return errno.ENOENT, None > + return 0, PassiveStream(None, name, bind_path, > + pipe=npipe) > > elif name.startswith("ptcp:"): > sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ - > 341,7 +602,11 @@ class PassiveStream(object): > > def close(self): > """Closes this PassiveStream.""" > - self.socket.close() > + if self.socket is not None: > + self.socket.close() > + if self.pipe is not None: > + winutils.close_handle(self.pipe, vlog.warn) > + winutils.close_handle(self.connect.hEvent, vlog.warn) > if self.bind_path is not None: > ovs.fatal_signal.unlink_file_now(self.bind_path) > self.bind_path = None > @@ -354,28 +619,80 @@ class PassiveStream(object): > > Will not block waiting for a connection. If no connection is ready to > be accepted, returns (errno.EAGAIN, None) immediately.""" > - > + if sys.platform == 'win32' and self.socket is None: > + return self.__accept_windows() > while True: > try: > sock, addr = self.socket.accept() > ovs.socket_util.set_nonblocking(sock) > - if (sock.family == socket.AF_UNIX): > + if (sys.platform != 'win32' and sock.family == socket.AF_UNIX): > return 0, Stream(sock, "unix:%s" % addr, 0) > return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0], > str(addr[1])), 0) > except socket.error as e: > error = ovs.socket_util.get_exception_errno(e) > + if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK: > + # WSAEWOULDBLOCK would be the equivalent on Windows > + # for EAGAIN on Unix. > + error = errno.EAGAIN > if error != errno.EAGAIN: > # XXX rate-limit > vlog.dbg("accept: %s" % os.strerror(error)) > return error, None > > + def __accept_windows(self): > + if self.connect_pending: > + try: > + winutils.get_overlapped_result(self.pipe, self.connect, False) > + except pywintypes.error as e: > + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: > + # The operation is still pending, try again > + self.connect_pending = True > + return errno.EAGAIN, None > + else: > + if self.pipe: > + win32pipe.DisconnectNamedPipe(self.pipe) > + return errno.EINVAL, None > + self.connect_pending = False > + > + error = winutils.connect_named_pipe(self.pipe, self.connect) > + if error: > + if error == winutils.winerror.ERROR_IO_PENDING: > + self.connect_pending = True > + return errno.EAGAIN, None > + elif error != winutils.winerror.ERROR_PIPE_CONNECTED: > + if self.pipe: > + win32pipe.DisconnectNamedPipe(self.pipe) > + self.connect_pending = False > + return errno.EINVAL, None > + else: > + win32event.SetEvent(self.connect.hEvent) > + > + npipe = winutils.create_named_pipe(self._pipename) > + if not npipe: > + return errno.ENOENT, None > + > + old_pipe = self.pipe > + self.pipe = npipe > + winutils.win32event.ResetEvent(self.connect.hEvent) > + return 0, Stream(None, self.name, 0, pipe=old_pipe) > + > def wait(self, poller): > - poller.fd_wait(self.socket, ovs.poller.POLLIN) > + if sys.platform != 'win32' or self.socket is not None: > + poller.fd_wait(self.socket, ovs.poller.POLLIN) > + else: > + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN) > > def __del__(self): > # Don't delete the file: we might have forked. > - self.socket.close() > + if self.socket is not None: > + self.socket.close() > + if self.pipe is not None: > + # Check if there are any remaining valid handles and close them > + if self.pipe: > + winutils.close_handle(self.pipe) > + if self._connect.hEvent: > + winutils.close_handle(self._read.hEvent) > > > def usage(name): > diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py > index 8595ed8..3f3e051 100644 > --- a/python/ovs/unixctl/server.py > +++ b/python/ovs/unixctl/server.py > @@ -148,6 +148,10 @@ class UnixctlServer(object): > def run(self): > for _ in range(10): > error, stream = self._listener.accept() > + if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK: > + # WSAEWOULDBLOCK would be the equivalent on Windows > + # for EAGAIN on Unix. > + error = errno.EAGAIN > if not error: > rpc = ovs.jsonrpc.Connection(stream) > self._conns.append(UnixctlConnection(rpc)) > diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py index > 18634e6..3eabcd7 100644 > --- a/tests/test-jsonrpc.py > +++ b/tests/test-jsonrpc.py > @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg): > > > def do_listen(name): > - error, pstream = ovs.stream.PassiveStream.open(name) > - if error: > - sys.stderr.write("could not listen on \"%s\": %s\n" > - % (name, os.strerror(error))) > - sys.exit(1) > + if sys.platform != 'win32' or ( > + ovs.daemon._detach and ovs.daemon._detached): > + # On Windows the child is a new process created which should be the > + # one that creates the PassiveStream. Without this check, the new > + # child process will create a new PassiveStream overwriting the one > + # that the parent process created. > + error, pstream = ovs.stream.PassiveStream.open(name) > + if error: > + sys.stderr.write("could not listen on \"%s\": %s\n" > + % (name, os.strerror(error))) > + sys.exit(1) > > ovs.daemon.daemonize() > > -- > 2.10.0.windows.1
diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py index 6300c67..5a11500 100644 --- a/python/ovs/jsonrpc.py +++ b/python/ovs/jsonrpc.py @@ -14,6 +14,7 @@ import errno import os +import sys import six @@ -274,6 +275,11 @@ class Connection(object): except UnicodeError: error = errno.EILSEQ if error: + if (sys.platform == "win32" and + error == errno.WSAEWOULDBLOCK): + # WSAEWOULDBLOCK would be the equivalent on Windows + # for EAGAIN on Unix. + error = errno.EAGAIN if error == errno.EAGAIN: return error, None else: diff --git a/python/ovs/poller.py b/python/ovs/poller.py index d7cb7d3..d836483 100644 --- a/python/ovs/poller.py +++ b/python/ovs/poller.py @@ -18,6 +18,10 @@ import ovs.vlog import select import socket import os +import sys + +if sys.platform == "win32": + import ovs.winutils as winutils try: from OpenSSL import SSL @@ -62,7 +66,9 @@ class _SelectSelect(object): if SSL and isinstance(fd, SSL.Connection): fd = fd.fileno() - assert isinstance(fd, int) + if sys.platform != 'win32': + # Skip this on Windows, it also register events + assert isinstance(fd, int) if events & POLLIN: self.rlist.append(fd) events &= ~POLLIN @@ -73,28 +79,58 @@ class _SelectSelect(object): self.xlist.append(fd) def poll(self, timeout): - if timeout == -1: - # epoll uses -1 for infinite timeout, select uses None. - timeout = None - else: - timeout = float(timeout) / 1000 # XXX workaround a bug in eventlet # see https://github.com/eventlet/eventlet/pull/25 if timeout == 0 and _using_eventlet_green_select(): timeout = 0.1 + if sys.platform == 'win32': + events = self.rlist + self.wlist + self.xlist + if not events: + return [] + if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS: + raise WindowsError("Cannot handle more than maximum wait" + "objects\n") + + # win32event.INFINITE timeout is -1 + # timeout must be an int number, expressed in ms + if timeout == 0.1: + timeout = 100 + else: + timeout = int(timeout) + + # Wait until any of the events is set to signaled + try: + retval = winutils.win32event.WaitForMultipleObjects( + events, + False, # Wait all + timeout) + except winutils.pywintypes.error: + return [(0, POLLERR)] - rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist, - timeout) - events_dict = {} - for fd in rlist: - events_dict[fd] = events_dict.get(fd, 0) | POLLIN - for fd in wlist: - events_dict[fd] = events_dict.get(fd, 0) | POLLOUT - for fd in xlist: - events_dict[fd] = events_dict.get(fd, 0) | (POLLERR | - POLLHUP | - POLLNVAL) - return list(events_dict.items()) + if retval == winutils.winerror.WAIT_TIMEOUT: + return [] + + return [(events[retval], 0)] + else: + if timeout == -1: + # epoll uses -1 for infinite timeout, select uses None. + timeout = None + else: + timeout = float(timeout) / 1000 + rlist, wlist, xlist = select.select(self.rlist, + self.wlist, + self.xlist, + timeout) + events_dict = {} + for fd in rlist: + events_dict[fd] = events_dict.get(fd, 0) | POLLIN + for fd in wlist: + events_dict[fd] = events_dict.get(fd, 0) | POLLOUT + for fd in xlist: + events_dict[fd] = events_dict.get(fd, 0) | (POLLERR | + POLLHUP | + POLLNVAL) + return list(events_dict.items()) SelectPoll = _SelectSelect diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py index b358b05..fb6cee4 100644 --- a/python/ovs/socket_util.py +++ b/python/ovs/socket_util.py @@ -17,6 +17,7 @@ import os import os.path import random import socket +import sys import six from six.moves import range @@ -25,6 +26,10 @@ import ovs.fatal_signal import ovs.poller import ovs.vlog +if sys.platform == 'win32': + import ovs.winutils as winutils + import win32file + vlog = ovs.vlog.Vlog("socket_util") @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path, connect_path, short=False): def check_connection_completion(sock): p = ovs.poller.SelectPoll() - p.register(sock, ovs.poller.POLLOUT) + if sys.platform == "win32": + event = winutils.get_new_event(None, False, True, None) + # Receive notification of readiness for writing, of completed + # connection or multipoint join operation, and of socket closure. + win32file.WSAEventSelect(sock, event, + win32file.FD_WRITE | + win32file.FD_CONNECT | + win32file.FD_CLOSE) + p.register(event, ovs.poller.POLLOUT) + else: + p.register(sock, ovs.poller.POLLOUT) pfds = p.poll(0) if len(pfds) == 1: revents = pfds[0][1] @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port, dscp): try: sock.connect(address) except socket.error as e: - if get_exception_errno(e) != errno.EINPROGRESS: + error = get_exception_errno(e) + if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK: + # WSAEWOULDBLOCK would be the equivalent on Windows + # for EINPROGRESS on Unix. + error = errno.EINPROGRESS + if error != errno.EINPROGRESS: raise return 0, sock except socket.error as e: @@ -257,9 +277,12 @@ def get_null_fd(): global null_fd if null_fd < 0: try: - null_fd = os.open("/dev/null", os.O_RDWR) + # os.devnull ensures compatibility with Windows, returns + # '/dev/null' for Unix and 'nul' for Windows + null_fd = os.open(os.devnull, os.O_RDWR) except OSError as e: - vlog.err("could not open /dev/null: %s" % os.strerror(e.errno)) + vlog.err("could not open %s: %s" % (os.devnull, + os.strerror(e.errno))) return -e.errno return null_fd diff --git a/python/ovs/stream.py b/python/ovs/stream.py index cd57eb3..e8e5700 100644 --- a/python/ovs/stream.py +++ b/python/ovs/stream.py @@ -15,6 +15,7 @@ import errno import os import socket +import sys import six @@ -27,6 +28,13 @@ try: except ImportError: SSL = None +if sys.platform == 'win32': + import ovs.winutils as winutils + import pywintypes + import win32event + import win32file + import win32pipe + vlog = ovs.vlog.Vlog("stream") @@ -63,6 +71,13 @@ class Stream(object): _SSL_certificate_file = None _SSL_ca_cert_file = None + # Windows only + _write = None # overlapped for write operation + _read = None # overlapped for read operation + _write_pending = False + _read_pending = False + _retry_connect = False + @staticmethod def register_method(method, cls): Stream._SOCKET_METHODS[method + ":"] = cls @@ -81,8 +96,23 @@ class Stream(object): otherwise False.""" return bool(Stream._find_method(name)) - def __init__(self, socket, name, status): + def __init__(self, socket, name, status, pipe=None, is_server=False): self.socket = socket + self.pipe = pipe + if sys.platform == 'win32': + self._read = pywintypes.OVERLAPPED() + self._read.hEvent = winutils.get_new_event() + self._write = pywintypes.OVERLAPPED() + self._write.hEvent = winutils.get_new_event() + if pipe is not None: + # Flag to check if fd is a server HANDLE. In the case of a + # server handle we have to issue a disconnect before closing + # the actual handle. + self._server = is_server + suffix = name.split(":", 1)[1] + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) + self._pipename = winutils.get_pipe_name(suffix) + self.name = name if status == errno.EAGAIN: self.state = Stream.__S_CONNECTING @@ -120,6 +150,38 @@ class Stream(object): suffix = name.split(":", 1)[1] if name.startswith("unix:"): suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) + if sys.platform == 'win32': + pipename = winutils.get_pipe_name(suffix) + + if len(suffix) > 255: + # Return invalid argument if the name is too long + return errno.ENOENT, None + + try: + # In case of "unix:" argument, the assumption is that + # there is a file created in the path (suffix). + open(suffix, 'r').close() + except: + return errno.ENOENT, None + + try: + npipe = winutils.create_file(pipename) + try: + winutils.set_pipe_mode(npipe, + win32pipe.PIPE_READMODE_BYTE) + except pywintypes.error as e: + return errno.ENOENT, None + except pywintypes.error as e: + if e.winerror == winutils.winerror.ERROR_PIPE_BUSY: + # Pipe is busy, set the retry flag to true and retry + # again during the connect function. + Stream.retry_connect = True + return 0, cls(None, name, errno.EAGAIN, + pipe=win32file.INVALID_HANDLE_VALUE, + is_server=False) + return errno.ENOENT, None + return 0, cls(None, name, 0, pipe=npipe, is_server=False) + error, sock = cls._open(suffix, dscp) if error: return error, None @@ -145,6 +207,10 @@ class Stream(object): if not error: while True: error = stream.connect() + if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK: + # WSAEWOULDBLOCK would be the equivalent on Windows + # for EAGAIN on Unix. + error = errno.EAGAIN if error != errno.EAGAIN: break stream.run() @@ -152,7 +218,8 @@ class Stream(object): stream.run_wait(poller) stream.connect_wait(poller) poller.block() - assert error != errno.EINPROGRESS + if stream.socket is not None: + assert error != errno.EINPROGRESS if error and stream: stream.close() @@ -160,11 +227,35 @@ class Stream(object): return error, stream def close(self): - self.socket.close() + if self.socket is not None: + self.socket.close() + if self.pipe is not None: + if self._server: + win32pipe.DisconnectNamedPipe(self.pipe) + winutils.close_handle(self.pipe, vlog.warn) + winutils.close_handle(self._read.hEvent, vlog.warn) + winutils.close_handle(self._write.hEvent, vlog.warn) def __scs_connecting(self): - retval = ovs.socket_util.check_connection_completion(self.socket) - assert retval != errno.EINPROGRESS + if self.socket is not None: + retval = ovs.socket_util.check_connection_completion(self.socket) + assert retval != errno.EINPROGRESS + elif sys.platform == 'win32' and self.retry_connect: + try: + self.pipe = winutils.create_file(self._pipename) + self._retry_connect = False + retval = 0 + except pywintypes.error as e: + if e.winerror == winutils.winerror.ERROR_PIPE_BUSY: + retval = errno.EAGAIN + else: + self._retry_connect = False + retval = errno.ENOENT + else: + # Windows only, if retry_connect is false, it means it's already + # connected so we can set the value of retval to 0 + retval = 0 + if retval == 0: self.state = Stream.__S_CONNECTED elif retval != errno.EAGAIN: @@ -209,11 +300,63 @@ class Stream(object): elif n == 0: return (0, "") + if sys.platform == 'win32' and self.socket is None: + return self.__recv_windows(n) + try: return (0, self.socket.recv(n)) except socket.error as e: return (ovs.socket_util.get_exception_errno(e), "") + def __recv_windows(self, n): + if self._read_pending: + try: + nBytesRead = winutils.get_overlapped_result(self.pipe, + self._read, + False) + self._read_pending = False + recvBuffer = self._read_buffer[:nBytesRead] + + return (0, winutils.get_decoded_buffer(recvBuffer)) + except pywintypes.error as e: + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: + # The operation is still pending, try again + self._read_pending = True + return (errno.EAGAIN, "") + elif e.winerror in winutils.pipe_disconnected_errors: + # If the pipe was disconnected, return 0. + return (0, "") + else: + return (errno.EINVAL, "") + + (errCode, self._read_buffer) = winutils.read_file(self.pipe, + n, + self._read) + if errCode: + if errCode == winutils.winerror.ERROR_IO_PENDING: + self._read_pending = True + return (errno.EAGAIN, "") + elif errCode in winutils.pipe_disconnected_errors: + # If the pipe was disconnected, return 0. + return (0, "") + else: + return (errCode, "") + + try: + nBytesRead = winutils.get_overlapped_result(self.pipe, + self._read, + False) + winutils.win32event.SetEvent(self._read.hEvent) + except pywintypes.error as e: + if e.winerror in winutils.pipe_disconnected_errors: + # If the pipe was disconnected, return 0. + return (0, "") + else: + return (e.winerror, "") + + recvBuffer = self._read_buffer[:nBytesRead] + return (0, winutils.get_decoded_buffer(recvBuffer)) + def send(self, buf): """Tries to send 'buf' on this stream. @@ -231,6 +374,9 @@ class Stream(object): elif len(buf) == 0: return 0 + if sys.platform == 'win32' and self.socket is None: + return self.__send_windows(buf) + try: # Python 3 has separate types for strings and bytes. We must have # bytes here. @@ -240,6 +386,40 @@ class Stream(object): except socket.error as e: return -ovs.socket_util.get_exception_errno(e) + def __send_windows(self, buf): + if self._write_pending: + try: + nBytesWritten = winutils.get_overlapped_result(self.pipe, + self._write, + False) + self._write_pending = False + return nBytesWritten + except pywintypes.error as e: + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: + # The operation is still pending, try again + self._read_pending = True + return -errno.EAGAIN + elif e.winerror in winutils.pipe_disconnected_errors: + # If the pipe was disconnected, return connection reset. + return -errno.ECONNRESET + else: + return -errno.EINVAL + + buf = winutils.get_encoded_buffer(buf) + self._write_pending = False + (errCode, nBytesWritten) = winutils.write_file(self.pipe, + buf, + self._write) + if errCode: + if errCode == winutils.winerror.ERROR_IO_PENDING: + self._write_pending = True + return -errno.EAGAIN + if (not nBytesWritten and + errCode in winutils.pipe_disconnected_errors): + # If the pipe was disconnected, return connection reset. + return -errno.ECONNRESET + return nBytesWritten + def run(self): pass @@ -255,11 +435,52 @@ class Stream(object): if self.state == Stream.__S_CONNECTING: wait = Stream.W_CONNECT + + if sys.platform == 'win32': + self.__wait_windows(poller, wait) + return + if wait == Stream.W_RECV: poller.fd_wait(self.socket, ovs.poller.POLLIN) else: poller.fd_wait(self.socket, ovs.poller.POLLOUT) + def __wait_windows(self, poller, wait): + if self.socket is not None: + if wait == Stream.W_RECV: + read_flags = (win32file.FD_READ | + win32file.FD_ACCEPT | + win32file.FD_CLOSE) + try: + win32file.WSAEventSelect(self.socket, + self._read.hEvent, + read_flags) + except pywintypes.error as e: + vlog.err("failed to associate events with socket: %s" + % e.strerror) + poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN) + else: + write_flags = (win32file.FD_WRITE | + win32file.FD_CONNECT | + win32file.FD_CLOSE) + try: + win32file.WSAEventSelect(self.socket, + self._write.hEvent, + write_flags) + except pywintypes.error as e: + vlog.err("failed to associate events with socket: %s" + % e.strerror) + poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT) + else: + if wait == Stream.W_RECV: + if self._read: + poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN) + elif wait == Stream.W_SEND: + if self._write: + poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT) + elif wait == Stream.W_CONNECT: + return + def connect_wait(self, poller): self.wait(poller, Stream.W_CONNECT) @@ -267,11 +488,22 @@ class Stream(object): self.wait(poller, Stream.W_RECV) def send_wait(self, poller): + if sys.platform == 'win32': + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN) self.wait(poller, Stream.W_SEND) def __del__(self): # Don't delete the file: we might have forked. - self.socket.close() + if self.socket is not None: + self.socket.close() + if self.pipe is not None: + # Check if there are any remaining valid handles and close them + if self.pipe: + winutils.close_handle(self.pipe) + if self._read.hEvent: + winutils.close_handle(self._read.hEvent) + if self._write.hEvent: + winutils.close_handle(self._write.hEvent) @staticmethod def ssl_set_private_key_file(file_name): @@ -287,6 +519,10 @@ class Stream(object): class PassiveStream(object): + # Windows only + connect = None # overlapped for read operation + connect_pending = False + @staticmethod def is_valid_name(name): """Returns True if 'name' is a passive stream name in the form @@ -294,9 +530,18 @@ class PassiveStream(object): "punix:" or "ptcp"), otherwise False.""" return name.startswith("punix:") | name.startswith("ptcp:") - def __init__(self, sock, name, bind_path): + def __init__(self, sock, name, bind_path, pipe=None): self.name = name + self.pipe = pipe self.socket = sock + if pipe is not None: + self.connect = pywintypes.OVERLAPPED() + self.connect.hEvent = winutils.get_new_event(bManualReset=True) + self.connect_pending = False + suffix = name.split(":", 1)[1] + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) + self._pipename = winutils.get_pipe_name(suffix) + self.bind_path = bind_path @staticmethod @@ -315,11 +560,27 @@ class PassiveStream(object): bind_path = name[6:] if name.startswith("punix:"): bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path) - error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, - True, bind_path, - None) - if error: - return error, None + if sys.platform != 'win32': + error, sock = ovs.socket_util.make_unix_socket( + socket.SOCK_STREAM, True, bind_path, None) + if error: + return error, None + else: + # Branch used only on Windows + try: + open(bind_path, 'w').close() + except: + return errno.ENOENT, None + + pipename = winutils.get_pipe_name(bind_path) + if len(pipename) > 255: + # Return invalid argument if the name is too long + return errno.ENOENT, None + + npipe = winutils.create_named_pipe(pipename) + if not npipe: + return errno.ENOENT, None + return 0, PassiveStream(None, name, bind_path, pipe=npipe) elif name.startswith("ptcp:"): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -341,7 +602,11 @@ class PassiveStream(object): def close(self): """Closes this PassiveStream.""" - self.socket.close() + if self.socket is not None: + self.socket.close() + if self.pipe is not None: + winutils.close_handle(self.pipe, vlog.warn) + winutils.close_handle(self.connect.hEvent, vlog.warn) if self.bind_path is not None: ovs.fatal_signal.unlink_file_now(self.bind_path) self.bind_path = None @@ -354,28 +619,80 @@ class PassiveStream(object): Will not block waiting for a connection. If no connection is ready to be accepted, returns (errno.EAGAIN, None) immediately.""" - + if sys.platform == 'win32' and self.socket is None: + return self.__accept_windows() while True: try: sock, addr = self.socket.accept() ovs.socket_util.set_nonblocking(sock) - if (sock.family == socket.AF_UNIX): + if (sys.platform != 'win32' and sock.family == socket.AF_UNIX): return 0, Stream(sock, "unix:%s" % addr, 0) return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0], str(addr[1])), 0) except socket.error as e: error = ovs.socket_util.get_exception_errno(e) + if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK: + # WSAEWOULDBLOCK would be the equivalent on Windows + # for EAGAIN on Unix. + error = errno.EAGAIN if error != errno.EAGAIN: # XXX rate-limit vlog.dbg("accept: %s" % os.strerror(error)) return error, None + def __accept_windows(self): + if self.connect_pending: + try: + winutils.get_overlapped_result(self.pipe, self.connect, False) + except pywintypes.error as e: + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: + # The operation is still pending, try again + self.connect_pending = True + return errno.EAGAIN, None + else: + if self.pipe: + win32pipe.DisconnectNamedPipe(self.pipe) + return errno.EINVAL, None + self.connect_pending = False + + error = winutils.connect_named_pipe(self.pipe, self.connect) + if error: + if error == winutils.winerror.ERROR_IO_PENDING: + self.connect_pending = True + return errno.EAGAIN, None + elif error != winutils.winerror.ERROR_PIPE_CONNECTED: + if self.pipe: + win32pipe.DisconnectNamedPipe(self.pipe) + self.connect_pending = False + return errno.EINVAL, None + else: + win32event.SetEvent(self.connect.hEvent) + + npipe = winutils.create_named_pipe(self._pipename) + if not npipe: + return errno.ENOENT, None + + old_pipe = self.pipe + self.pipe = npipe + winutils.win32event.ResetEvent(self.connect.hEvent) + return 0, Stream(None, self.name, 0, pipe=old_pipe) + def wait(self, poller): - poller.fd_wait(self.socket, ovs.poller.POLLIN) + if sys.platform != 'win32' or self.socket is not None: + poller.fd_wait(self.socket, ovs.poller.POLLIN) + else: + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN) def __del__(self): # Don't delete the file: we might have forked. - self.socket.close() + if self.socket is not None: + self.socket.close() + if self.pipe is not None: + # Check if there are any remaining valid handles and close them + if self.pipe: + winutils.close_handle(self.pipe) + if self._connect.hEvent: + winutils.close_handle(self._read.hEvent) def usage(name): diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py index 8595ed8..3f3e051 100644 --- a/python/ovs/unixctl/server.py +++ b/python/ovs/unixctl/server.py @@ -148,6 +148,10 @@ class UnixctlServer(object): def run(self): for _ in range(10): error, stream = self._listener.accept() + if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK: + # WSAEWOULDBLOCK would be the equivalent on Windows + # for EAGAIN on Unix. + error = errno.EAGAIN if not error: rpc = ovs.jsonrpc.Connection(stream) self._conns.append(UnixctlConnection(rpc)) diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py index 18634e6..3eabcd7 100644 --- a/tests/test-jsonrpc.py +++ b/tests/test-jsonrpc.py @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg): def do_listen(name): - error, pstream = ovs.stream.PassiveStream.open(name) - if error: - sys.stderr.write("could not listen on \"%s\": %s\n" - % (name, os.strerror(error))) - sys.exit(1) + if sys.platform != 'win32' or ( + ovs.daemon._detach and ovs.daemon._detached): + # On Windows the child is a new process created which should be the + # one that creates the PassiveStream. Without this check, the new + # child process will create a new PassiveStream overwriting the one + # that the parent process created. + error, pstream = ovs.stream.PassiveStream.open(name) + if error: + sys.stderr.write("could not listen on \"%s\": %s\n" + % (name, os.strerror(error))) + sys.exit(1) ovs.daemon.daemonize()