diff mbox

[ovs-dev,V2,04/10] python tests: Ported UNIX sockets to Windows

Message ID 1472558417-2016-5-git-send-email-pboca@cloudbasesolutions.com
State Superseded
Delegated to: Guru Shetty
Headers show

Commit Message

Paul Boca Aug. 30, 2016, 12:01 p.m. UTC
AF_UNIX sockets are not supported on Windows.
Instead of an AF_UNIX socket use named pipes to communicate
between components. This makes the python sockets compatible with
the named pipe used in Windows applications.
Added stream_windows.py with named pipe and localhost
tcp connections support.

Signed-off-by: Paul-Daniel Boca <pboca@cloudbasesolutions.com>
---
V2: Moved test-jsonrpc.py changes from current patch to previous one.
    Fixed named pipe disconnect errors.
---
 python/automake.mk           |   1 +
 python/ovs/jsonrpc.py        |   9 +-
 python/ovs/poller.py         |  49 +++-
 python/ovs/socket_util.py    |  20 +-
 python/ovs/stream_windows.py | 607 +++++++++++++++++++++++++++++++++++++++++++
 python/ovs/unixctl/client.py |   6 +-
 python/ovs/unixctl/server.py |  11 +-
 tests/test-jsonrpc.py        |   9 +-
 tests/test-ovsdb.py          |   8 +-
 9 files changed, 695 insertions(+), 25 deletions(-)
 create mode 100644 python/ovs/stream_windows.py
diff mbox

Patch

diff --git a/python/automake.mk b/python/automake.mk
index 3fe9519..7bbf382 100644
--- a/python/automake.mk
+++ b/python/automake.mk
@@ -27,6 +27,7 @@  ovs_pyfiles = \
 	python/ovs/process.py \
 	python/ovs/reconnect.py \
 	python/ovs/socket_util.py \
+	python/ovs/stream_windows.py \
 	python/ovs/stream_unix.py \
 	python/ovs/timeval.py \
 	python/ovs/unixctl/__init__.py \
diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
index 8ca01a0..d70f13e 100644
--- a/python/ovs/jsonrpc.py
+++ b/python/ovs/jsonrpc.py
@@ -14,13 +14,17 @@ 
 
 import errno
 import os
+import sys
 
 import six
 
 import ovs.json
 import ovs.poller
 import ovs.reconnect
-import ovs.stream_unix as ovs_stream
+if sys.platform == "win32":
+    import ovs.stream_windows as ovs_stream
+else:
+    import ovs.stream_unix as ovs_stream
 import ovs.timeval
 import ovs.util
 import ovs.vlog
@@ -274,6 +278,9 @@  class Connection(object):
                     except UnicodeError:
                         error = errno.EILSEQ
                 if error:
+                    if (sys.platform == "win32"
+                       and error == errno.WSAEWOULDBLOCK):
+                        error = errno.EAGAIN
                     if error == errno.EAGAIN:
                         return error, None
                     else:
diff --git a/python/ovs/poller.py b/python/ovs/poller.py
index de6bf22..970decc 100644
--- a/python/ovs/poller.py
+++ b/python/ovs/poller.py
@@ -18,6 +18,7 @@  import ovs.vlog
 import select
 import socket
 import os
+import sys
 
 try:
     import eventlet.patcher
@@ -54,7 +55,8 @@  class _SelectSelect(object):
     def register(self, fd, events):
         if isinstance(fd, socket.socket):
             fd = fd.fileno()
-        assert isinstance(fd, int)
+        if not sys.platform == "win32":
+            assert isinstance(fd, int)
         if events & POLLIN:
             self.rlist.append(fd)
             events &= ~POLLIN
@@ -75,18 +77,39 @@  class _SelectSelect(object):
         if timeout == 0 and _using_eventlet_green_select():
             timeout = 0.1
 
-        rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist,
-                                            timeout)
-        events_dict = {}
-        for fd in rlist:
-            events_dict[fd] = events_dict.get(fd, 0) | POLLIN
-        for fd in wlist:
-            events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
-        for fd in xlist:
-            events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
-                                                        POLLHUP |
-                                                        POLLNVAL)
-        return list(events_dict.items())
+        if sys.platform == "win32":
+            import win32event
+            import winerror
+
+            if timeout is None:
+                timeout = 0xFFFFFFFF
+            else:
+                timeout = int(timeout * 1000)
+
+            events = self.rlist + self.wlist + self.xlist
+            if not events:
+                return list()
+            error = win32event.WaitForMultipleObjectsEx(events, False,
+                                                        timeout, False)
+            if error == winerror.WAIT_TIMEOUT:
+                return list()
+
+            return [(events[error], 0)]
+        else:
+            rlist, wlist, xlist = select.select(self.rlist,
+                                                self.wlist,
+                                                self.xlist,
+                                                timeout)
+            events_dict = {}
+            for fd in rlist:
+                events_dict[fd] = events_dict.get(fd, 0) | POLLIN
+            for fd in wlist:
+                events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
+            for fd in xlist:
+                events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
+                                                            POLLHUP |
+                                                            POLLNVAL)
+            return list(events_dict.items())
 
 
 SelectPoll = _SelectSelect
diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py
index b358b05..54f448d 100644
--- a/python/ovs/socket_util.py
+++ b/python/ovs/socket_util.py
@@ -17,6 +17,7 @@  import os
 import os.path
 import random
 import socket
+import sys
 
 import six
 from six.moves import range
@@ -25,6 +26,10 @@  import ovs.fatal_signal
 import ovs.poller
 import ovs.vlog
 
+if sys.platform == "win32":
+    import win32file
+    import win32event
+
 vlog = ovs.vlog.Vlog("socket_util")
 
 
@@ -158,7 +163,15 @@  def make_unix_socket(style, nonblock, bind_path, connect_path, short=False):
 
 def check_connection_completion(sock):
     p = ovs.poller.SelectPoll()
-    p.register(sock, ovs.poller.POLLOUT)
+    if sys.platform == "win32":
+        event = win32event.CreateEvent(None, False, True, None)
+        win32file.WSAEventSelect(sock, event,
+                                win32file.FD_WRITE |
+                                win32file.FD_CONNECT |
+                                win32file.FD_CLOSE)
+        p.register(event, ovs.poller.POLLOUT)
+    else:
+        p.register(sock, ovs.poller.POLLOUT)
     pfds = p.poll(0)
     if len(pfds) == 1:
         revents = pfds[0][1]
@@ -228,7 +241,10 @@  def inet_open_active(style, target, default_port, dscp):
         try:
             sock.connect(address)
         except socket.error as e:
-            if get_exception_errno(e) != errno.EINPROGRESS:
+            error = get_exception_errno(e)
+            if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
+                error = errno.EINPROGRESS
+            if error != errno.EINPROGRESS:
                 raise
         return 0, sock
     except socket.error as e:
diff --git a/python/ovs/stream_windows.py b/python/ovs/stream_windows.py
new file mode 100644
index 0000000..064cbcb
--- /dev/null
+++ b/python/ovs/stream_windows.py
@@ -0,0 +1,607 @@ 
+# Copyright (c) 2010, 2011, 2012 Nicira, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import errno
+import os
+import socket
+import sys
+import six
+
+import ovs.poller
+import ovs.socket_util
+import ovs.vlog
+
+import pywintypes
+import winerror
+import win32pipe
+import win32con
+import win32security
+import win32file
+import win32event
+
+vlog = ovs.vlog.Vlog("stream")
+
+
+def stream_or_pstream_needs_probes(name):
+    """ 1 if the stream or pstream specified by 'name' needs periodic probes to
+    verify connectivity.  For [p]streams which need probes, it can take a long
+    time to notice the connection was dropped.  Returns 0 if probes aren't
+    needed, and -1 if 'name' is invalid"""
+
+    if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name):
+        # Only unix and punix are supported currently.
+        return 0
+    else:
+        return -1
+
+
+class Stream(object):
+    """Bidirectional byte stream.  Currently only Unix domain sockets
+    are implemented."""
+
+    # States.
+    __S_CONNECTING = 0
+    __S_CONNECTED = 1
+    __S_DISCONNECTED = 2
+
+    # Kinds of events that one might wait for.
+    W_CONNECT = 0               # Connect complete (success or failure).
+    W_RECV = 1                  # Data received.
+    W_SEND = 2                  # Send buffer room available.
+
+    _SOCKET_METHODS = {}
+
+    write = None                # overlapped for write operation
+    read = None                 # overlapped for read operation
+    write_pending = False
+    read_pending = False
+    retry_connect = False
+
+    @staticmethod
+    def register_method(method, cls):
+        Stream._SOCKET_METHODS[method + ":"] = cls
+
+    @staticmethod
+    def _find_method(name):
+        for method, cls in six.iteritems(Stream._SOCKET_METHODS):
+            if name.startswith(method):
+                return cls
+        return None
+
+    @staticmethod
+    def is_valid_name(name):
+        """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
+        TYPE is a supported stream type (currently only "unix:" and "tcp:"),
+        otherwise False."""
+        return bool(Stream._find_method(name))
+
+    def __init__(self, sock, name, status):
+        if isinstance(sock, socket.socket):
+            self.socket = sock
+        else:
+            self.pipe = sock
+            self.read = pywintypes.OVERLAPPED()
+            self.read.hEvent = win32event.CreateEvent(None, True, True, None)
+            self.write = pywintypes.OVERLAPPED()
+            self.write.hEvent = win32event.CreateEvent(None, True, True, None)
+
+        self.name = name
+        if status == errno.EAGAIN:
+            self.state = Stream.__S_CONNECTING
+        elif status == 0:
+            self.state = Stream.__S_CONNECTED
+        else:
+            self.state = Stream.__S_DISCONNECTED
+
+        self.error = 0
+
+    # Default value of dscp bits for connection between controller and manager.
+    # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined
+    # in <netinet/ip.h> is used.
+    IPTOS_PREC_INTERNETCONTROL = 0xc0
+    DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2
+
+    @staticmethod
+    def open(name, dscp=DSCP_DEFAULT):
+        """Attempts to connect a stream to a remote peer.  'name' is a
+        connection name in the form "TYPE:ARGS", where TYPE is an active stream
+        class's name and ARGS are stream class-specific.  Currently the only
+        supported TYPEs are "unix" and "tcp".
+
+        Returns (error, stream): on success 'error' is 0 and 'stream' is the
+        new Stream, on failure 'error' is a positive errno value and 'stream'
+        is None.
+
+        Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0
+        and a new Stream.  The connect() method can be used to check for
+        successful connection completion."""
+        cls = Stream._find_method(name)
+        if not cls:
+            return errno.EAFNOSUPPORT, None
+
+        suffix = name.split(":", 1)[1]
+        if name.startswith("unix:"):
+            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
+            suffix = suffix.replace('/', '')
+            suffix = suffix.replace('\\', '')
+            suffix = "\\\\.\\pipe\\" + suffix
+
+            saAttr = win32security.SECURITY_ATTRIBUTES()
+            saAttr.bInheritHandle = 1
+            try:
+                npipe = win32file.CreateFile(
+                            suffix,
+                            win32file.GENERIC_READ | win32file.GENERIC_WRITE,
+                            0, None,
+                            win32file.OPEN_EXISTING,
+                            win32file.FILE_ATTRIBUTE_NORMAL |
+                            win32file.FILE_FLAG_OVERLAPPED |
+                            win32file.FILE_FLAG_NO_BUFFERING,
+                            None)
+            except pywintypes.error as e:
+                return e.winerror, None
+
+            return 0, Stream(npipe, suffix, 0)
+        else:
+            error, sock = cls._open(suffix, dscp)
+            if error:
+                return error, None
+            else:
+                status = ovs.socket_util.check_connection_completion(sock)
+                return 0, Stream(sock, name, status)
+
+    @staticmethod
+    def _open(suffix, dscp):
+        raise NotImplementedError("This method must be overrided by subclass")
+
+    @staticmethod
+    def open_block(error_stream):
+        """Blocks until a Stream completes its connection attempt, either
+        succeeding or failing.  (error, stream) should be the tuple returned by
+        Stream.open().  Returns a tuple of the same form.
+
+        Typical usage:
+        error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
+
+        # Py3 doesn't support tuple parameter unpacking - PEP 3113
+        error, stream = error_stream
+        if not error:
+            while True:
+                error = stream.connect()
+                if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
+                    error = errno.EAGAIN
+                if error != errno.EAGAIN:
+                    break
+                stream.run()
+                poller = ovs.poller.Poller()
+                stream.run_wait(poller)
+                stream.connect_wait(poller)
+                poller.block()
+            assert error != errno.EINPROGRESS
+
+        if error and stream:
+            stream.close()
+            stream = None
+        return error, stream
+
+    def close(self):
+        if hasattr(self, "socket"):
+            self.socket.close()
+
+    def __scs_connecting(self):
+        if hasattr(self, "socket"):
+            retval = ovs.socket_util.check_connection_completion(self.socket)
+        elif self.retry_connect:
+            saAttr = win32security.SECURITY_ATTRIBUTES()
+            saAttr.bInheritHandle = 1
+
+            try:
+                self.pipe = win32file.CreateFile(
+                            self.name,
+                            win32file.GENERIC_READ | win32file.GENERIC_WRITE,
+                            0, None,
+                            win32file.OPEN_EXISTING,
+                            win32file.FILE_ATTRIBUTE_NORMAL |
+                            win32file.FILE_FLAG_OVERLAPPED |
+                            win32file.FILE_FLAG_NO_BUFFERING,
+                            None)
+            except pywintypes.error:
+                retval = errno.EAGAIN
+                self.retry_connect = True
+
+        assert retval != errno.EINPROGRESS
+        if retval == 0:
+            self.state = Stream.__S_CONNECTED
+        elif retval != errno.EAGAIN:
+            self.state = Stream.__S_DISCONNECTED
+            self.error = retval
+
+    def connect(self):
+        """Tries to complete the connection on this stream.  If the connection
+        is complete, returns 0 if the connection was successful or a positive
+        errno value if it failed.  If the connection is still in progress,
+        returns errno.EAGAIN."""
+        # raise
+        if self.state == Stream.__S_CONNECTING:
+            self.__scs_connecting()
+
+        if self.state == Stream.__S_CONNECTING:
+            return errno.EAGAIN
+        elif self.state == Stream.__S_CONNECTED:
+            return 0
+        else:
+            assert self.state == Stream.__S_DISCONNECTED
+            return self.error
+
+    def recv(self, n):
+        """Tries to receive up to 'n' bytes from this stream.  Returns a
+        (error, string) tuple:
+
+            - If successful, 'error' is zero and 'string' contains between 1
+              and 'n' bytes of data.
+
+            - On error, 'error' is a positive errno value.
+
+            - If the connection has been closed in the normal fashion or if 'n'
+              is 0, the tuple is (0, "").
+
+        The recv function will not block waiting for data to arrive.  If no
+        data have been received, it returns (errno.EAGAIN, "") immediately."""
+
+        retval = self.connect()
+        if retval != 0:
+            return (retval, "")
+        elif n == 0:
+            return (0, "")
+        if hasattr(self, "socket"):
+            try:
+                return (0, self.socket.recv(n))
+            except socket.error as e:
+                return (ovs.socket_util.get_exception_errno(e), "")
+        else:
+            if self.read_pending:
+                try:
+                    nBytesRead = win32file.GetOverlappedResult(self.pipe,
+                                                        self.read,
+                                                        False)
+                    self.read_pending = False
+                    recvBuffer = self.read_buffer[:nBytesRead]
+                    if six.PY3:
+                        return (0, bytes(recvBuffer).decode("utf-8"))
+                    else:
+                        return (0, str(recvBuffer))
+                except pywintypes.error as e:
+                    return (errno.EAGAIN, "")
+
+            try:
+                (errCode, self.read_buffer) = win32file.ReadFile(self.pipe,
+                                                             n,
+                                                             self.read)
+
+                if errCode == winerror.ERROR_IO_PENDING:
+                    self.read_pending = True
+                    return (errno.EAGAIN, "")
+                # elif errCode:
+                    # return (errCode, "")
+
+                nBytesRead = win32file.GetOverlappedResult(self.pipe,
+                                                        self.read,
+                                                        False)
+                win32event.SetEvent(self.read.hEvent)
+                recvBuffer = self.read_buffer[:nBytesRead]
+                if six.PY3:
+                    return (0, bytes(recvBuffer).decode("utf-8"))
+                else:
+                    return (0, str(recvBuffer))
+            except pywintypes.error as e:
+                return (e.winerror, "")
+
+    def send(self, buf):
+        """Tries to send 'buf' on this stream.
+
+        If successful, returns the number of bytes sent, between 1 and
+        len(buf).  0 is only a valid return value if len(buf) is 0.
+
+        On error, returns a negative errno value.
+
+        Will not block.  If no bytes can be immediately accepted for
+        transmission, returns -errno.EAGAIN immediately."""
+
+        retval = self.connect()
+        if retval != 0:
+            return -retval
+        elif len(buf) == 0:
+            return 0
+
+        if hasattr(self, "socket"):
+            try:
+                # Python 3 has separate types for strings and bytes.  We must
+                # have bytes here.
+                if six.PY3 and not isinstance(buf, six.binary_type):
+                    buf = six.binary_type(buf, 'utf-8')
+                return self.socket.send(buf)
+            except socket.error as e:
+                return -ovs.socket_util.get_exception_errno(e)
+        else:
+            if self.write_pending:
+                try:
+                    nBytesWritten = win32file.GetOverlappedResult(self.pipe,
+                                                            self.write,
+                                                            False)
+                    self.write_pending = False
+                    return nBytesWritten
+                except pywintypes.error as e:
+                    return -errno.EAGAIN
+
+            try:
+                # Python 3 has separate types for strings and bytes.  We must
+                # have bytes here.
+                if not isinstance(buf, six.binary_type):
+                    if six.PY3:
+                        buf = six.binary_type(buf, 'utf-8')
+                    else:
+                        buf = six.binary_type(buf)
+
+                self.write_pending = False
+                (errCode, nBytesWritten) = win32file.WriteFile(self.pipe,
+                                                            buf,
+                                                            self.write)
+                if errCode == winerror.ERROR_IO_PENDING:
+                    self.write_pending = True
+                    return -errno.EAGAIN
+                # elif errCode:
+                    # return -errCode
+
+                nBytesWritten = win32file.GetOverlappedResult(self.pipe,
+                                                            self.write,
+                                                            False)
+                win32event.SetEvent(self.write.hEvent)
+
+                return nBytesWritten
+            except pywintypes.error as e:
+                return -e.winerror
+
+    def run(self):
+        pass
+
+    def run_wait(self, poller):
+        pass
+
+    def wait(self, poller, wait):
+        if hasattr(self, "socket"):
+            import win32file
+            import win32event
+
+            assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND)
+
+            if self.state == Stream.__S_DISCONNECTED:
+                poller.immediate_wake()
+                return
+
+            if self.state == Stream.__S_CONNECTING:
+                wait = Stream.W_CONNECT
+
+            event = win32event.CreateEvent(None, True, True, None)
+
+            if wait == Stream.W_RECV:
+                win32file.WSAEventSelect(self.socket, event,
+                                        win32file.FD_READ |
+                                        win32file.FD_ACCEPT |
+                                        win32file.FD_CLOSE)
+                poller.fd_wait(event, ovs.poller.POLLIN)
+            else:
+                win32file.WSAEventSelect(self.socket, event,
+                                        win32file.FD_WRITE |
+                                        win32file.FD_CONNECT |
+                                        win32file.FD_CLOSE)
+                poller.fd_wait(event, ovs.poller.POLLOUT)
+        else:
+            if wait == Stream.W_RECV:
+                if self.read:
+                    poller.fd_wait(self.read.hEvent, ovs.poller.POLLIN)
+            else:
+                if self.write:
+                    poller.fd_wait(self.write.hEvent, ovs.poller.POLLOUT)
+
+    def connect_wait(self, poller):
+        self.wait(poller, Stream.W_CONNECT)
+
+    def recv_wait(self, poller):
+        self.wait(poller, Stream.W_RECV)
+
+    def send_wait(self, poller):
+        poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
+        self.wait(poller, Stream.W_SEND)
+
+    def __del__(self):
+        # Don't delete the file: we might have forked.
+        if hasattr(self, "socket"):
+            self.socket.close()
+
+
+class PassiveStream(object):
+    connect = None                  # overlapped for read operation
+    connect_pending = False
+
+    @staticmethod
+    def is_valid_name(name):
+        """Returns True if 'name' is a passive stream name in the form
+        "TYPE:ARGS" and TYPE is a supported passive stream type (currently
+        "punix:" or "ptcp"), otherwise False."""
+        return name.startswith("punix:") | name.startswith("ptcp:")
+
+    def __init__(self, sock, name, bind_path):
+        self.name = name
+        if isinstance(sock, socket.socket):
+            self.socket = sock
+        else:
+            self.pipe = sock
+        self.bind_path = bind_path
+
+    @staticmethod
+    def open(name):
+        """Attempts to start listening for remote stream connections.  'name'
+        is a connection name in the form "TYPE:ARGS", where TYPE is an passive
+        stream class's name and ARGS are stream class-specific. Currently the
+        supported values for TYPE are "punix" and "ptcp".
+
+        Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
+        new PassiveStream, on failure 'error' is a positive errno value and
+        'pstream' is None."""
+        # raise OSError
+        suffix = name.split(":", 1)[1]
+        if name.startswith("punix:"):
+            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
+            try:
+                open(suffix, 'w').close()
+            except:
+                return errno.EAFNOSUPPORT, None
+
+            pipename = suffix.replace('/', '')
+            pipename = pipename.replace('\\', '')
+            pipename = "\\\\.\\pipe\\" + pipename
+
+            saAttr = win32security.SECURITY_ATTRIBUTES()
+            saAttr.bInheritHandle = 1
+
+            npipe = win32pipe.CreateNamedPipe(
+                        pipename,
+                        win32con.PIPE_ACCESS_DUPLEX |
+                        win32con.FILE_FLAG_OVERLAPPED,
+                        win32con.PIPE_TYPE_MESSAGE |
+                        win32con.PIPE_READMODE_BYTE |
+                        win32con.PIPE_WAIT,
+                        64, 65000, 65000, 0, saAttr
+                        )
+            return 0, PassiveStream(npipe, pipename, suffix)
+        else:
+            return errno.EAFNOSUPPORT, None
+
+    def close(self):
+        """Closes this PassiveStream."""
+        if hasattr(self, "socket"):
+            self.socket.close()
+        else:
+            win32pipe.DisconnectNamedPipe(self.pipe)
+        if self.bind_path is not None:
+            ovs.fatal_signal.unlink_file_now(self.bind_path)
+            self.bind_path = None
+
+    def accept(self):
+        """Tries to accept a new connection on this passive stream.  Returns
+        (error, stream): if successful, 'error' is 0 and 'stream' is the new
+        Stream object, and on failure 'error' is a positive errno value and
+        'stream' is None.
+
+        Will not block waiting for a connection.  If no connection is ready to
+        be accepted, returns (errno.EAGAIN, None) immediately."""
+
+        if hasattr(self, "socket"):
+            while True:
+                try:
+                    sock, addr = self.socket.accept()
+                    ovs.socket_util.set_nonblocking(sock)
+                    if (sys.platform != "win32"
+                       and sock.family == socket.AF_UNIX):
+                        return 0, Stream(sock, "unix:%s" % addr, 0)
+                    return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
+                                                           str(addr[1])), 0)
+                except socket.error as e:
+                    error = ovs.socket_util.get_exception_errno(e)
+                    if (sys.platform == "win32" and
+                        error == errno.WSAEWOULDBLOCK):
+                        error = errno.EAGAIN
+                    if error != errno.EAGAIN:
+                        # XXX rate-limit
+                        vlog.dbg("accept: %s" % os.strerror(error))
+                    return error, None
+        else:
+            if self.connect_pending:
+                try:
+                    win32file.GetOverlappedResult(self.pipe,
+                                                self.connect,
+                                                False)
+                    self.connect_pending = False
+                except pywintypes.error as e:
+                    return (errno.EAGAIN, "")
+                return 0, Stream(self.pipe, "", 0)
+
+            try:
+                self.connect_pending = False
+                self.connect = pywintypes.OVERLAPPED()
+                self.connect.hEvent = win32event.CreateEvent(None, True,
+                                                            True, None)
+                error = win32pipe.ConnectNamedPipe(self.pipe, self.connect)
+                if error == winerror.ERROR_IO_PENDING:
+                    self.connect_pending = True
+                    return errno.EAGAIN, None
+
+                stream = Stream(self.pipe, "", 0)
+
+                saAttr = win32security.SECURITY_ATTRIBUTES()
+                saAttr.bInheritHandle = 1
+                self.pipe = win32pipe.CreateNamedPipe(
+                        self.name,
+                        win32con.PIPE_ACCESS_DUPLEX |
+                        win32con.FILE_FLAG_OVERLAPPED,
+                        win32con.PIPE_TYPE_MESSAGE |
+                        win32con.PIPE_READMODE_BYTE |
+                        win32con.PIPE_WAIT,
+                        64, 65000, 65000, 0, saAttr
+                        )
+
+                return 0, stream
+            except pywintypes.error as e:
+                return errno.EAGAIN, None
+
+    def wait(self, poller):
+        if hasattr(self, "socket"):
+            poller.fd_wait(self.socket, ovs.poller.POLLIN)
+        else:
+            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
+
+    def __del__(self):
+        # Don't delete the file: we might have forked.
+        if hasattr(self, "socket"):
+            self.socket.close()
+
+
+def usage(name):
+    return """
+Active %s connection methods:
+  unix:FILE               Unix domain socket named FILE
+  tcp:IP:PORT             TCP socket to IP with port no of PORT
+
+Passive %s connection methods:
+  punix:FILE              Listen on Unix domain socket FILE""" % (name, name)
+
+
+class UnixStream(Stream):
+    @staticmethod
+    def _open(suffix, dscp):
+        connect_path = suffix
+        return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
+                                                True, None, connect_path)
+Stream.register_method("unix", UnixStream)
+
+
+class TCPStream(Stream):
+    @staticmethod
+    def _open(suffix, dscp):
+        error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM,
+                                                       suffix, 0, dscp)
+        if not error:
+            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        return error, sock
+Stream.register_method("tcp", TCPStream)
diff --git a/python/ovs/unixctl/client.py b/python/ovs/unixctl/client.py
index fde674e..ede4855 100644
--- a/python/ovs/unixctl/client.py
+++ b/python/ovs/unixctl/client.py
@@ -13,12 +13,16 @@ 
 # limitations under the License.
 
 import os
+import sys
 
 import six
 
 import ovs.jsonrpc
-import ovs.stream_unix as ovs_stream
 import ovs.util
+if sys.platform == "win32":
+    import ovs.stream_windows as ovs_stream
+else:
+    import ovs.stream_unix as ovs_stream
 
 
 vlog = ovs.vlog.Vlog("unixctl_client")
diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py
index 50a11d4..d457a2c 100644
--- a/python/ovs/unixctl/server.py
+++ b/python/ovs/unixctl/server.py
@@ -22,7 +22,10 @@  from six.moves import range
 
 import ovs.dirs
 import ovs.jsonrpc
-import ovs.stream_unix as ovs_stream
+if sys.platform == "win32":
+    import ovs.stream_windows as ovs_stream
+else:
+    import ovs.stream_unix as ovs_stream
 import ovs.unixctl
 import ovs.util
 import ovs.version
@@ -148,6 +151,8 @@  class UnixctlServer(object):
     def run(self):
         for _ in range(10):
             error, stream = self._listener.accept()
+            if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
+                error = errno.EAGAIN
             if not error:
                 rpc = ovs.jsonrpc.Connection(stream)
                 self._conns.append(UnixctlConnection(rpc))
@@ -155,8 +160,8 @@  class UnixctlServer(object):
                 break
             else:
                 # XXX: rate-limit
-                vlog.warn("%s: accept failed: %s" % (self._listener.name,
-                                                     os.strerror(error)))
+                vlog.warn("%s: accept failed: %s %d"
+                          % (self._listener.name, os.strerror(error), error))
 
         for conn in copy.copy(self._conns):
             error = conn.run()
diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py
index a6c387a..8d9010d 100644
--- a/tests/test-jsonrpc.py
+++ b/tests/test-jsonrpc.py
@@ -23,7 +23,10 @@  import ovs.daemon
 import ovs.json
 import ovs.jsonrpc
 import ovs.poller
-import ovs.stream_unix as ovs_stream
+if sys.platform == "win32":
+    import ovs.stream_windows as ovs_stream
+else:
+    import ovs.stream_unix as ovs_stream
 
 
 def handle_rpc(rpc, msg):
@@ -53,14 +56,14 @@  def handle_rpc(rpc, msg):
 
 
 def do_listen(name):
+    ovs.daemon.daemonize()
+
     error, pstream = ovs_stream.PassiveStream.open(name)
     if error:
         sys.stderr.write("could not listen on \"%s\": %s\n"
                          % (name, os.strerror(error)))
         sys.exit(1)
 
-    ovs.daemon.daemonize()
-
     rpcs = []
     done = False
     while True:
diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
index e4e3395..9f6ef49 100644
--- a/tests/test-ovsdb.py
+++ b/tests/test-ovsdb.py
@@ -30,6 +30,10 @@  import ovs.poller
 import ovs.util
 from ovs.fatal_signal import signal_alarm
 import six
+if sys.platform == "win32":
+    import ovs.stream_windows as ovs_stream
+else:
+    import ovs.stream_unix as ovs_stream
 
 
 def unbox_json(json):
@@ -534,8 +538,8 @@  def do_idl(schema_file, remote, *commands):
     idl = ovs.db.idl.Idl(remote, schema_helper)
 
     if commands:
-        error, stream = ovs.stream.Stream.open_block(
-            ovs.stream.Stream.open(remote))
+        error, stream = ovs_stream.Stream.open_block(
+            ovs_stream.Stream.open(remote))
         if error:
             sys.stderr.write("failed to connect to \"%s\"" % remote)
             sys.exit(1)