[ovs-dev,V3,2/5] Python tests: Ported UNIX sockets to Windows
diff mbox

Message ID CAM_3v9LB_vjTHveHRLX=za7y6wuNNxnvhqbs1_M8F1G-rRrZtQ@mail.gmail.com
State Not Applicable
Headers show

Commit Message

Gurucharan Shetty Jan. 3, 2017, 6:16 p.m. UTC
On 3 January 2017 at 08:46, Alin Balutoiu <abalutoiu@cloudbasesolutions.com>
wrote:

> From: Alin Balutoiu <abalutoiu@cloudbasesolutions.com>
>
> Unix sockets (AF_UNIX) are not supported on Windows.
> The replacement of Unix sockets on Windows is implemented
> using named pipes, we are trying to mimic the behaviour
> of unix sockets.
>
> Instead of using Unix sockets to communicate
> between components Named Pipes are used. This
> makes the python sockets compatible with the
> Named Pipe used in Windows applications.
>
> Signed-off-by: Paul-Daniel Boca <pboca@cloudbasesolutions.com>
> Signed-off-by: Alin Balutoiu <abalutoiu@cloudbasesolutions.com>
> Acked-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> Tested-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> ---
> V2: No changes.
> V3: Changed Signed-off-by name and added previous Acked-by's, Tested-by's.
>

I intend to add the following diff:

             retval = 0


> ---
>  python/ovs/jsonrpc.py        |   6 +
>  python/ovs/poller.py         |  72 ++++++---
>  python/ovs/socket_util.py    |  31 +++-
>  python/ovs/stream.py         | 351 ++++++++++++++++++++++++++++++
> ++++++++++---
>  python/ovs/unixctl/server.py |   4 +
>  tests/test-jsonrpc.py        |  16 +-
>  6 files changed, 436 insertions(+), 44 deletions(-)
>
> diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> index 6300c67..5a11500 100644
> --- a/python/ovs/jsonrpc.py
> +++ b/python/ovs/jsonrpc.py
> @@ -14,6 +14,7 @@
>
>  import errno
>  import os
> +import sys
>
>  import six
>
> @@ -274,6 +275,11 @@ class Connection(object):
>                      except UnicodeError:
>                          error = errno.EILSEQ
>                  if error:
> +                    if (sys.platform == "win32" and
> +                            error == errno.WSAEWOULDBLOCK):
> +                        # WSAEWOULDBLOCK would be the equivalent on
> Windows
> +                        # for EAGAIN on Unix.
> +                        error = errno.EAGAIN
>                      if error == errno.EAGAIN:
>                          return error, None
>                      else:
> diff --git a/python/ovs/poller.py b/python/ovs/poller.py
> index d7cb7d3..d836483 100644
> --- a/python/ovs/poller.py
> +++ b/python/ovs/poller.py
> @@ -18,6 +18,10 @@ import ovs.vlog
>  import select
>  import socket
>  import os
> +import sys
> +
> +if sys.platform == "win32":
> +    import ovs.winutils as winutils
>
>  try:
>      from OpenSSL import SSL
> @@ -62,7 +66,9 @@ class _SelectSelect(object):
>          if SSL and isinstance(fd, SSL.Connection):
>              fd = fd.fileno()
>
> -        assert isinstance(fd, int)
> +        if sys.platform != 'win32':
> +            # Skip this on Windows, it also register events
> +            assert isinstance(fd, int)
>          if events & POLLIN:
>              self.rlist.append(fd)
>              events &= ~POLLIN
> @@ -73,28 +79,58 @@ class _SelectSelect(object):
>              self.xlist.append(fd)
>
>      def poll(self, timeout):
> -        if timeout == -1:
> -            # epoll uses -1 for infinite timeout, select uses None.
> -            timeout = None
> -        else:
> -            timeout = float(timeout) / 1000
>          # XXX workaround a bug in eventlet
>          # see https://github.com/eventlet/eventlet/pull/25
>          if timeout == 0 and _using_eventlet_green_select():
>              timeout = 0.1
> +        if sys.platform == 'win32':
> +            events = self.rlist + self.wlist + self.xlist
> +            if not events:
> +                return []
> +            if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS:
> +                raise WindowsError("Cannot handle more than maximum wait"
> +                                   "objects\n")
> +
> +            # win32event.INFINITE timeout is -1
> +            # timeout must be an int number, expressed in ms
> +            if timeout == 0.1:
> +                timeout = 100
> +            else:
> +                timeout = int(timeout)
> +
> +            # Wait until any of the events is set to signaled
> +            try:
> +                retval = winutils.win32event.WaitForMultipleObjects(
> +                    events,
> +                    False,  # Wait all
> +                    timeout)
> +            except winutils.pywintypes.error:
> +                    return [(0, POLLERR)]
>
> -        rlist, wlist, xlist = select.select(self.rlist, self.wlist,
> self.xlist,
> -                                            timeout)
> -        events_dict = {}
> -        for fd in rlist:
> -            events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> -        for fd in wlist:
> -            events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> -        for fd in xlist:
> -            events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> -                                                        POLLHUP |
> -                                                        POLLNVAL)
> -        return list(events_dict.items())
> +            if retval == winutils.winerror.WAIT_TIMEOUT:
> +                return []
> +
> +            return [(events[retval], 0)]
> +        else:
> +            if timeout == -1:
> +                # epoll uses -1 for infinite timeout, select uses None.
> +                timeout = None
> +            else:
> +                timeout = float(timeout) / 1000
> +            rlist, wlist, xlist = select.select(self.rlist,
> +                                                self.wlist,
> +                                                self.xlist,
> +                                                timeout)
> +            events_dict = {}
> +            for fd in rlist:
> +                events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> +            for fd in wlist:
> +                events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> +            for fd in xlist:
> +                events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> +                                                            POLLHUP |
> +                                                            POLLNVAL)
> +            return list(events_dict.items())
>
>
>  SelectPoll = _SelectSelect
> diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py
> index b358b05..fb6cee4 100644
> --- a/python/ovs/socket_util.py
> +++ b/python/ovs/socket_util.py
> @@ -17,6 +17,7 @@ import os
>  import os.path
>  import random
>  import socket
> +import sys
>
>  import six
>  from six.moves import range
> @@ -25,6 +26,10 @@ import ovs.fatal_signal
>  import ovs.poller
>  import ovs.vlog
>
> +if sys.platform == 'win32':
> +    import ovs.winutils as winutils
> +    import win32file
> +
>  vlog = ovs.vlog.Vlog("socket_util")
>
>
> @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path,
> connect_path, short=False):
>
>  def check_connection_completion(sock):
>      p = ovs.poller.SelectPoll()
> -    p.register(sock, ovs.poller.POLLOUT)
> +    if sys.platform == "win32":
> +        event = winutils.get_new_event(None, False, True, None)
> +        # Receive notification of readiness for writing, of completed
> +        # connection or multipoint join operation, and of socket closure.
> +        win32file.WSAEventSelect(sock, event,
> +                                 win32file.FD_WRITE |
> +                                 win32file.FD_CONNECT |
> +                                 win32file.FD_CLOSE)
> +        p.register(event, ovs.poller.POLLOUT)
> +    else:
> +        p.register(sock, ovs.poller.POLLOUT)
>      pfds = p.poll(0)
>      if len(pfds) == 1:
>          revents = pfds[0][1]
> @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port,
> dscp):
>          try:
>              sock.connect(address)
>          except socket.error as e:
> -            if get_exception_errno(e) != errno.EINPROGRESS:
> +            error = get_exception_errno(e)
> +            if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> +                # WSAEWOULDBLOCK would be the equivalent on Windows
> +                # for EINPROGRESS on Unix.
> +                error = errno.EINPROGRESS
> +            if error != errno.EINPROGRESS:
>                  raise
>          return 0, sock
>      except socket.error as e:
> @@ -257,9 +277,12 @@ def get_null_fd():
>      global null_fd
>      if null_fd < 0:
>          try:
> -            null_fd = os.open("/dev/null", os.O_RDWR)
> +            # os.devnull ensures compatibility with Windows, returns
> +            # '/dev/null' for Unix and 'nul' for Windows
> +            null_fd = os.open(os.devnull, os.O_RDWR)
>          except OSError as e:
> -            vlog.err("could not open /dev/null: %s" %
> os.strerror(e.errno))
> +            vlog.err("could not open %s: %s" % (os.devnull,
> +                                                os.strerror(e.errno)))
>              return -e.errno
>      return null_fd
>
> diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> index cd57eb3..e8e5700 100644
> --- a/python/ovs/stream.py
> +++ b/python/ovs/stream.py
> @@ -15,6 +15,7 @@
>  import errno
>  import os
>  import socket
> +import sys
>
>  import six
>
> @@ -27,6 +28,13 @@ try:
>  except ImportError:
>      SSL = None
>
> +if sys.platform == 'win32':
> +    import ovs.winutils as winutils
> +    import pywintypes
> +    import win32event
> +    import win32file
> +    import win32pipe
> +
>  vlog = ovs.vlog.Vlog("stream")
>
>
> @@ -63,6 +71,13 @@ class Stream(object):
>      _SSL_certificate_file = None
>      _SSL_ca_cert_file = None
>
> +    # Windows only
> +    _write = None                # overlapped for write operation
> +    _read = None                 # overlapped for read operation
> +    _write_pending = False
> +    _read_pending = False
> +    _retry_connect = False
> +
>      @staticmethod
>      def register_method(method, cls):
>          Stream._SOCKET_METHODS[method + ":"] = cls
> @@ -81,8 +96,23 @@ class Stream(object):
>          otherwise False."""
>          return bool(Stream._find_method(name))
>
> -    def __init__(self, socket, name, status):
> +    def __init__(self, socket, name, status, pipe=None, is_server=False):
>          self.socket = socket
> +        self.pipe = pipe
> +        if sys.platform == 'win32':
> +            self._read = pywintypes.OVERLAPPED()
> +            self._read.hEvent = winutils.get_new_event()
> +            self._write = pywintypes.OVERLAPPED()
> +            self._write.hEvent = winutils.get_new_event()
> +            if pipe is not None:
> +                # Flag to check if fd is a server HANDLE.  In the case of
> a
> +                # server handle we have to issue a disconnect before
> closing
> +                # the actual handle.
> +                self._server = is_server
> +                suffix = name.split(":", 1)[1]
> +                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> +                self._pipename = winutils.get_pipe_name(suffix)
> +
>          self.name = name
>          if status == errno.EAGAIN:
>              self.state = Stream.__S_CONNECTING
> @@ -120,6 +150,38 @@ class Stream(object):
>          suffix = name.split(":", 1)[1]
>          if name.startswith("unix:"):
>              suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> +            if sys.platform == 'win32':
> +                pipename = winutils.get_pipe_name(suffix)
> +
> +                if len(suffix) > 255:
> +                    # Return invalid argument if the name is too long
> +                    return errno.ENOENT, None
> +
> +                try:
> +                    # In case of "unix:" argument, the assumption is that
> +                    # there is a file created in the path (suffix).
> +                    open(suffix, 'r').close()
> +                except:
> +                    return errno.ENOENT, None
> +
> +                try:
> +                    npipe = winutils.create_file(pipename)
> +                    try:
> +                        winutils.set_pipe_mode(npipe,
> +
>  win32pipe.PIPE_READMODE_BYTE)
> +                    except pywintypes.error as e:
> +                        return errno.ENOENT, None
> +                except pywintypes.error as e:
> +                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> +                        # Pipe is busy, set the retry flag to true and
> retry
> +                        # again during the connect function.
> +                        Stream.retry_connect = True
> +                        return 0, cls(None, name, errno.EAGAIN,
> +                                      pipe=win32file.INVALID_HANDLE_
> VALUE,
> +                                      is_server=False)
> +                    return errno.ENOENT, None
> +                return 0, cls(None, name, 0, pipe=npipe, is_server=False)
> +
>          error, sock = cls._open(suffix, dscp)
>          if error:
>              return error, None
> @@ -145,6 +207,10 @@ class Stream(object):
>          if not error:
>              while True:
>                  error = stream.connect()
> +                if sys.platform == 'win32' and error ==
> errno.WSAEWOULDBLOCK:
> +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> +                    # for EAGAIN on Unix.
> +                    error = errno.EAGAIN
>                  if error != errno.EAGAIN:
>                      break
>                  stream.run()
> @@ -152,7 +218,8 @@ class Stream(object):
>                  stream.run_wait(poller)
>                  stream.connect_wait(poller)
>                  poller.block()
> -            assert error != errno.EINPROGRESS
> +            if stream.socket is not None:
> +                assert error != errno.EINPROGRESS
>
>          if error and stream:
>              stream.close()
> @@ -160,11 +227,35 @@ class Stream(object):
>          return error, stream
>
>      def close(self):
> -        self.socket.close()
> +        if self.socket is not None:
> +            self.socket.close()
> +        if self.pipe is not None:
> +            if self._server:
> +                win32pipe.DisconnectNamedPipe(self.pipe)
> +            winutils.close_handle(self.pipe, vlog.warn)
> +            winutils.close_handle(self._read.hEvent, vlog.warn)
> +            winutils.close_handle(self._write.hEvent, vlog.warn)
>
>      def __scs_connecting(self):
> -        retval = ovs.socket_util.check_connection_completion(self.socket)
> -        assert retval != errno.EINPROGRESS
> +        if self.socket is not None:
> +            retval = ovs.socket_util.check_connection_completion(self.
> socket)
> +            assert retval != errno.EINPROGRESS
> +        elif sys.platform == 'win32' and self.retry_connect:
> +            try:
> +                self.pipe = winutils.create_file(self._pipename)
> +                self._retry_connect = False
> +                retval = 0
> +            except pywintypes.error as e:
> +                if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> +                    retval = errno.EAGAIN
> +                else:
> +                    self._retry_connect = False
> +                    retval = errno.ENOENT
> +        else:
> +            # Windows only, if retry_connect is false, it means it's
> already
> +            # connected so we can set the value of retval to 0
> +            retval = 0
> +
>          if retval == 0:
>              self.state = Stream.__S_CONNECTED
>          elif retval != errno.EAGAIN:
> @@ -209,11 +300,63 @@ class Stream(object):
>          elif n == 0:
>              return (0, "")
>
> +        if sys.platform == 'win32' and self.socket is None:
> +            return self.__recv_windows(n)
> +
>          try:
>              return (0, self.socket.recv(n))
>          except socket.error as e:
>              return (ovs.socket_util.get_exception_errno(e), "")
>
> +    def __recv_windows(self, n):
> +        if self._read_pending:
> +            try:
> +                nBytesRead = winutils.get_overlapped_result(self.pipe,
> +                                                            self._read,
> +                                                            False)
> +                self._read_pending = False
> +                recvBuffer = self._read_buffer[:nBytesRead]
> +
> +                return (0, winutils.get_decoded_buffer(recvBuffer))
> +            except pywintypes.error as e:
> +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> +                    # The operation is still pending, try again
> +                    self._read_pending = True
> +                    return (errno.EAGAIN, "")
> +                elif e.winerror in winutils.pipe_disconnected_errors:
> +                    # If the pipe was disconnected, return 0.
> +                    return (0, "")
> +                else:
> +                    return (errno.EINVAL, "")
> +
> +        (errCode, self._read_buffer) = winutils.read_file(self.pipe,
> +                                                          n,
> +                                                          self._read)
> +        if errCode:
> +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> +                self._read_pending = True
> +                return (errno.EAGAIN, "")
> +            elif errCode in winutils.pipe_disconnected_errors:
> +                # If the pipe was disconnected, return 0.
> +                return (0, "")
> +            else:
> +                return (errCode, "")
> +
> +        try:
> +            nBytesRead = winutils.get_overlapped_result(self.pipe,
> +                                                        self._read,
> +                                                        False)
> +            winutils.win32event.SetEvent(self._read.hEvent)
> +        except pywintypes.error as e:
> +            if e.winerror in winutils.pipe_disconnected_errors:
> +                # If the pipe was disconnected, return 0.
> +                return (0, "")
> +            else:
> +                return (e.winerror, "")
> +
> +        recvBuffer = self._read_buffer[:nBytesRead]
> +        return (0, winutils.get_decoded_buffer(recvBuffer))
> +
>      def send(self, buf):
>          """Tries to send 'buf' on this stream.
>
> @@ -231,6 +374,9 @@ class Stream(object):
>          elif len(buf) == 0:
>              return 0
>
> +        if sys.platform == 'win32' and self.socket is None:
> +            return self.__send_windows(buf)
> +
>          try:
>              # Python 3 has separate types for strings and bytes.  We must
> have
>              # bytes here.
> @@ -240,6 +386,40 @@ class Stream(object):
>          except socket.error as e:
>              return -ovs.socket_util.get_exception_errno(e)
>
> +    def __send_windows(self, buf):
> +        if self._write_pending:
> +            try:
> +                nBytesWritten = winutils.get_overlapped_result(self.pipe,
> +
>  self._write,
> +                                                               False)
> +                self._write_pending = False
> +                return nBytesWritten
> +            except pywintypes.error as e:
> +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> +                    # The operation is still pending, try again
> +                    self._read_pending = True
> +                    return -errno.EAGAIN
> +                elif e.winerror in winutils.pipe_disconnected_errors:
> +                    # If the pipe was disconnected, return connection
> reset.
> +                    return -errno.ECONNRESET
> +                else:
> +                    return -errno.EINVAL
> +
> +        buf = winutils.get_encoded_buffer(buf)
> +        self._write_pending = False
> +        (errCode, nBytesWritten) = winutils.write_file(self.pipe,
> +                                                       buf,
> +                                                       self._write)
> +        if errCode:
> +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> +                self._write_pending = True
> +                return -errno.EAGAIN
> +            if (not nBytesWritten and
> +                    errCode in winutils.pipe_disconnected_errors):
> +                # If the pipe was disconnected, return connection reset.
> +                return -errno.ECONNRESET
> +        return nBytesWritten
> +
>      def run(self):
>          pass
>
> @@ -255,11 +435,52 @@ class Stream(object):
>
>          if self.state == Stream.__S_CONNECTING:
>              wait = Stream.W_CONNECT
> +
> +        if sys.platform == 'win32':
> +            self.__wait_windows(poller, wait)
> +            return
> +
>          if wait == Stream.W_RECV:
>              poller.fd_wait(self.socket, ovs.poller.POLLIN)
>          else:
>              poller.fd_wait(self.socket, ovs.poller.POLLOUT)
>
> +    def __wait_windows(self, poller, wait):
> +        if self.socket is not None:
> +            if wait == Stream.W_RECV:
> +                read_flags = (win32file.FD_READ |
> +                              win32file.FD_ACCEPT |
> +                              win32file.FD_CLOSE)
> +                try:
> +                    win32file.WSAEventSelect(self.socket,
> +                                             self._read.hEvent,
> +                                             read_flags)
> +                except pywintypes.error as e:
> +                    vlog.err("failed to associate events with socket: %s"
> +                             % e.strerror)
> +                poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> +            else:
> +                write_flags = (win32file.FD_WRITE |
> +                               win32file.FD_CONNECT |
> +                               win32file.FD_CLOSE)
> +                try:
> +                    win32file.WSAEventSelect(self.socket,
> +                                             self._write.hEvent,
> +                                             write_flags)
> +                except pywintypes.error as e:
> +                    vlog.err("failed to associate events with socket: %s"
> +                             % e.strerror)
> +                poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> +        else:
> +            if wait == Stream.W_RECV:
> +                if self._read:
> +                    poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> +            elif wait == Stream.W_SEND:
> +                if self._write:
> +                    poller.fd_wait(self._write.hEvent,
> ovs.poller.POLLOUT)
> +            elif wait == Stream.W_CONNECT:
> +                return
> +
>      def connect_wait(self, poller):
>          self.wait(poller, Stream.W_CONNECT)
>
> @@ -267,11 +488,22 @@ class Stream(object):
>          self.wait(poller, Stream.W_RECV)
>
>      def send_wait(self, poller):
> +        if sys.platform == 'win32':
> +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
>          self.wait(poller, Stream.W_SEND)
>
>      def __del__(self):
>          # Don't delete the file: we might have forked.
> -        self.socket.close()
> +        if self.socket is not None:
> +            self.socket.close()
> +        if self.pipe is not None:
> +            # Check if there are any remaining valid handles and close
> them
> +            if self.pipe:
> +                winutils.close_handle(self.pipe)
> +            if self._read.hEvent:
> +                winutils.close_handle(self._read.hEvent)
> +            if self._write.hEvent:
> +                winutils.close_handle(self._write.hEvent)
>
>      @staticmethod
>      def ssl_set_private_key_file(file_name):
> @@ -287,6 +519,10 @@ class Stream(object):
>
>
>  class PassiveStream(object):
> +    # Windows only
> +    connect = None                  # overlapped for read operation
> +    connect_pending = False
> +
>      @staticmethod
>      def is_valid_name(name):
>          """Returns True if 'name' is a passive stream name in the form
> @@ -294,9 +530,18 @@ class PassiveStream(object):
>          "punix:" or "ptcp"), otherwise False."""
>          return name.startswith("punix:") | name.startswith("ptcp:")
>
> -    def __init__(self, sock, name, bind_path):
> +    def __init__(self, sock, name, bind_path, pipe=None):
>          self.name = name
> +        self.pipe = pipe
>          self.socket = sock
> +        if pipe is not None:
> +            self.connect = pywintypes.OVERLAPPED()
> +            self.connect.hEvent = winutils.get_new_event(
> bManualReset=True)
> +            self.connect_pending = False
> +            suffix = name.split(":", 1)[1]
> +            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> +            self._pipename = winutils.get_pipe_name(suffix)
> +
>          self.bind_path = bind_path
>
>      @staticmethod
> @@ -315,11 +560,27 @@ class PassiveStream(object):
>          bind_path = name[6:]
>          if name.startswith("punix:"):
>              bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR,
> bind_path)
> -            error, sock = ovs.socket_util.make_unix_
> socket(socket.SOCK_STREAM,
> -                                                           True,
> bind_path,
> -                                                           None)
> -            if error:
> -                return error, None
> +            if sys.platform != 'win32':
> +                error, sock = ovs.socket_util.make_unix_socket(
> +                    socket.SOCK_STREAM, True, bind_path, None)
> +                if error:
> +                    return error, None
> +            else:
> +                # Branch used only on Windows
> +                try:
> +                    open(bind_path, 'w').close()
> +                except:
> +                    return errno.ENOENT, None
> +
> +                pipename = winutils.get_pipe_name(bind_path)
> +                if len(pipename) > 255:
> +                    # Return invalid argument if the name is too long
> +                    return errno.ENOENT, None
> +
> +                npipe = winutils.create_named_pipe(pipename)
> +                if not npipe:
> +                    return errno.ENOENT, None
> +                return 0, PassiveStream(None, name, bind_path, pipe=npipe)
>
>          elif name.startswith("ptcp:"):
>              sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> @@ -341,7 +602,11 @@ class PassiveStream(object):
>
>      def close(self):
>          """Closes this PassiveStream."""
> -        self.socket.close()
> +        if self.socket is not None:
> +            self.socket.close()
> +        if self.pipe is not None:
> +            winutils.close_handle(self.pipe, vlog.warn)
> +            winutils.close_handle(self.connect.hEvent, vlog.warn)
>          if self.bind_path is not None:
>              ovs.fatal_signal.unlink_file_now(self.bind_path)
>              self.bind_path = None
> @@ -354,28 +619,80 @@ class PassiveStream(object):
>
>          Will not block waiting for a connection.  If no connection is
> ready to
>          be accepted, returns (errno.EAGAIN, None) immediately."""
> -
> +        if sys.platform == 'win32' and self.socket is None:
> +            return self.__accept_windows()
>          while True:
>              try:
>                  sock, addr = self.socket.accept()
>                  ovs.socket_util.set_nonblocking(sock)
> -                if (sock.family == socket.AF_UNIX):
> +                if (sys.platform != 'win32' and sock.family ==
> socket.AF_UNIX):
>                      return 0, Stream(sock, "unix:%s" % addr, 0)
>                  return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
>                                                         str(addr[1])), 0)
>              except socket.error as e:
>                  error = ovs.socket_util.get_exception_errno(e)
> +                if sys.platform == 'win32' and error ==
> errno.WSAEWOULDBLOCK:
> +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> +                    # for EAGAIN on Unix.
> +                    error = errno.EAGAIN
>                  if error != errno.EAGAIN:
>                      # XXX rate-limit
>                      vlog.dbg("accept: %s" % os.strerror(error))
>                  return error, None
>
> +    def __accept_windows(self):
> +        if self.connect_pending:
> +            try:
> +                winutils.get_overlapped_result(self.pipe, self.connect,
> False)
> +            except pywintypes.error as e:
> +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> +                    # The operation is still pending, try again
> +                    self.connect_pending = True
> +                    return errno.EAGAIN, None
> +                else:
> +                    if self.pipe:
> +                        win32pipe.DisconnectNamedPipe(self.pipe)
> +                    return errno.EINVAL, None
> +            self.connect_pending = False
> +
> +        error = winutils.connect_named_pipe(self.pipe, self.connect)
> +        if error:
> +            if error == winutils.winerror.ERROR_IO_PENDING:
> +                self.connect_pending = True
> +                return errno.EAGAIN, None
> +            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
> +                if self.pipe:
> +                    win32pipe.DisconnectNamedPipe(self.pipe)
> +                self.connect_pending = False
> +                return errno.EINVAL, None
> +            else:
> +                win32event.SetEvent(self.connect.hEvent)
> +
> +        npipe = winutils.create_named_pipe(self._pipename)
> +        if not npipe:
> +            return errno.ENOENT, None
> +
> +        old_pipe = self.pipe
> +        self.pipe = npipe
> +        winutils.win32event.ResetEvent(self.connect.hEvent)
> +        return 0, Stream(None, self.name, 0, pipe=old_pipe)
> +
>      def wait(self, poller):
> -        poller.fd_wait(self.socket, ovs.poller.POLLIN)
> +        if sys.platform != 'win32' or self.socket is not None:
> +            poller.fd_wait(self.socket, ovs.poller.POLLIN)
> +        else:
> +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
>
>      def __del__(self):
>          # Don't delete the file: we might have forked.
> -        self.socket.close()
> +        if self.socket is not None:
> +            self.socket.close()
> +        if self.pipe is not None:
> +            # Check if there are any remaining valid handles and close
> them
> +            if self.pipe:
> +                winutils.close_handle(self.pipe)
> +            if self._connect.hEvent:
> +                winutils.close_handle(self._read.hEvent)
>
>
>  def usage(name):
> diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py
> index 8595ed8..3f3e051 100644
> --- a/python/ovs/unixctl/server.py
> +++ b/python/ovs/unixctl/server.py
> @@ -148,6 +148,10 @@ class UnixctlServer(object):
>      def run(self):
>          for _ in range(10):
>              error, stream = self._listener.accept()
> +            if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
> +                # WSAEWOULDBLOCK would be the equivalent on Windows
> +                # for EAGAIN on Unix.
> +                error = errno.EAGAIN
>              if not error:
>                  rpc = ovs.jsonrpc.Connection(stream)
>                  self._conns.append(UnixctlConnection(rpc))
> diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py
> index 18634e6..3eabcd7 100644
> --- a/tests/test-jsonrpc.py
> +++ b/tests/test-jsonrpc.py
> @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg):
>
>
>  def do_listen(name):
> -    error, pstream = ovs.stream.PassiveStream.open(name)
> -    if error:
> -        sys.stderr.write("could not listen on \"%s\": %s\n"
> -                         % (name, os.strerror(error)))
> -        sys.exit(1)
> +    if sys.platform != 'win32' or (
> +            ovs.daemon._detach and ovs.daemon._detached):
> +        # On Windows the child is a new process created which should be
> the
> +        # one that creates the PassiveStream. Without this check, the new
> +        # child process will create a new PassiveStream overwriting the
> one
> +        # that the parent process created.
> +        error, pstream = ovs.stream.PassiveStream.open(name)
> +        if error:
> +            sys.stderr.write("could not listen on \"%s\": %s\n"
> +                             % (name, os.strerror(error)))
> +            sys.exit(1)
>
>      ovs.daemon.daemonize()
>
> --
> 2.10.0.windows.1
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>

Comments

Alin Balutoiu Jan. 3, 2017, 7:03 p.m. UTC | #1
Thanks for the comment.
The socket attribute of the class Stream cannot be None if the code runs on Unix.
Therefore the condition "self.socket is not None" is True only if sockets are not used,
and named pipes are used instead (i.e. it runs on Windows).

However, if you prefer I can replace it with the following code:
    if self.socket is not None:
        retval = ovs.socket_util.check_connection_completion(self.socket)
        assert retval != errno.EINPROGRESS
    elif sys.platform == 'win32':
        if self.retry_connect:
            try:
                self.pipe = winutils.create_file(self._pipename)
                self._retry_connect = False
                retval = 0
            except pywintypes.error as e:
                if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
                    retval = errno.EAGAIN
                else:
                    self._retry_connect = False
                    retval = errno.ENOENT
        else:
            # If retry_connect is false, it means it's already
            # connected so we can set the value of retval to 0
            retval = 0


> From: Guru Shetty [mailto:guru@ovn.org] 
> Sent: Tuesday, January 3, 2017 8:17 PM
> To: Alin Balutoiu <abalutoiu@cloudbasesolutions.com>
> Cc: dev@openvswitch.org
> Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets to Windows
> 
> 
> 
> On 3 January 2017 at 08:46, Alin Balutoiu <mailto:abalutoiu@cloudbasesolutions.com> wrote:
> From: Alin Balutoiu <mailto:abalutoiu@cloudbasesolutions.com>
> 
> Unix sockets (AF_UNIX) are not supported on Windows.
> The replacement of Unix sockets on Windows is implemented
> using named pipes, we are trying to mimic the behaviour
> of unix sockets.
> 
> Instead of using Unix sockets to communicate
> between components Named Pipes are used. This
> makes the python sockets compatible with the
> Named Pipe used in Windows applications.
> 
> Signed-off-by: Paul-Daniel Boca <mailto:pboca@cloudbasesolutions.com>
> Signed-off-by: Alin Balutoiu <mailto:abalutoiu@cloudbasesolutions.com>
> Acked-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> Tested-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> ---
> V2: No changes.
> V3: Changed Signed-off-by name and added previous Acked-by's, Tested-by's.
> 
> I intend to add the following diff:
> 
> diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> index e8e5700..af35afd 100644
> --- a/python/ovs/stream.py
> +++ b/python/ovs/stream.py
> @@ -251,7 +251,7 @@ class Stream(object):
>                  else:
>                      self._retry_connect = False
>                      retval = errno.ENOENT
> -        else:
> +        elif sys.platform == 'win32':
>              # Windows only, if retry_connect is false, it means it's already
>              # connected so we can set the value of retval to 0
>              retval = 0
> 
>  
> ---
>  python/ovs/jsonrpc.py        |   6 +
>  python/ovs/poller.py         |  72 ++++++---
>  python/ovs/socket_util.py    |  31 +++-
>  python/ovs/stream.py         | 351 ++++++++++++++++++++++++++++++++++++++++---
>  python/ovs/unixctl/server.py |   4 +
>  tests/test-jsonrpc.py        |  16 +-
>  6 files changed, 436 insertions(+), 44 deletions(-)
> 
> diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> index 6300c67..5a11500 100644
> --- a/python/ovs/jsonrpc.py
> +++ b/python/ovs/jsonrpc.py
> @@ -14,6 +14,7 @@
> 
>  import errno
>  import os
> +import sys
> 
>  import six
> 
> @@ -274,6 +275,11 @@ class Connection(object):
>                      except UnicodeError:
>                          error = errno.EILSEQ
>                  if error:
> +                    if (sys.platform == "win32" and
> +                            error == errno.WSAEWOULDBLOCK):
> +                        # WSAEWOULDBLOCK would be the equivalent on Windows
> +                        # for EAGAIN on Unix.
> +                        error = errno.EAGAIN
>                      if error == errno.EAGAIN:
>                          return error, None
>                      else:
> diff --git a/python/ovs/poller.py b/python/ovs/poller.py
> index d7cb7d3..d836483 100644
> --- a/python/ovs/poller.py
> +++ b/python/ovs/poller.py
> @@ -18,6 +18,10 @@ import ovs.vlog
>  import select
>  import socket
>  import os
> +import sys
> +
> +if sys.platform == "win32":
> +    import ovs.winutils as winutils
> 
>  try:
>      from OpenSSL import SSL
> @@ -62,7 +66,9 @@ class _SelectSelect(object):
>          if SSL and isinstance(fd, SSL.Connection):
>              fd = fd.fileno()
> 
> -        assert isinstance(fd, int)
> +        if sys.platform != 'win32':
> +            # Skip this on Windows, it also register events
> +            assert isinstance(fd, int)
>          if events & POLLIN:
>              self.rlist.append(fd)
>              events &= ~POLLIN
> @@ -73,28 +79,58 @@ class _SelectSelect(object):
>              self.xlist.append(fd)
> 
>      def poll(self, timeout):
> -        if timeout == -1:
> -            # epoll uses -1 for infinite timeout, select uses None.
> -            timeout = None
> -        else:
> -            timeout = float(timeout) / 1000
>          # XXX workaround a bug in eventlet
>          # see https://github.com/eventlet/eventlet/pull/25
>          if timeout == 0 and _using_eventlet_green_select():
>              timeout = 0.1
> +        if sys.platform == 'win32':
> +            events = self.rlist + self.wlist + self.xlist
> +            if not events:
> +                return []
> +            if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS:
> +                raise WindowsError("Cannot handle more than maximum wait"
> +                                   "objects\n")
> +
> +            # win32event.INFINITE timeout is -1
> +            # timeout must be an int number, expressed in ms
> +            if timeout == 0.1:
> +                timeout = 100
> +            else:
> +                timeout = int(timeout)
> +
> +            # Wait until any of the events is set to signaled
> +            try:
> +                retval = winutils.win32event.WaitForMultipleObjects(
> +                    events,
> +                    False,  # Wait all
> +                    timeout)
> +            except winutils.pywintypes.error:
> +                    return [(0, POLLERR)]
> 
> -        rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist,
> -                                            timeout)
> -        events_dict = {}
> -        for fd in rlist:
> -            events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> -        for fd in wlist:
> -            events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> -        for fd in xlist:
> -            events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> -                                                        POLLHUP |
> -                                                        POLLNVAL)
> -        return list(events_dict.items())
> +            if retval == winutils.winerror.WAIT_TIMEOUT:
> +                return []
> +
> +            return [(events[retval], 0)]
> +        else:
> +            if timeout == -1:
> +                # epoll uses -1 for infinite timeout, select uses None.
> +                timeout = None
> +            else:
> +                timeout = float(timeout) / 1000
> +            rlist, wlist, xlist = select.select(self.rlist,
> +                                                self.wlist,
> +                                                self.xlist,
> +                                                timeout)
> +            events_dict = {}
> +            for fd in rlist:
> +                events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> +            for fd in wlist:
> +                events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> +            for fd in xlist:
> +                events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> +                                                            POLLHUP |
> +                                                            POLLNVAL)
> +            return list(events_dict.items())
> 
> 
>  SelectPoll = _SelectSelect
> diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py
> index b358b05..fb6cee4 100644
> --- a/python/ovs/socket_util.py
> +++ b/python/ovs/socket_util.py
> @@ -17,6 +17,7 @@ import os
>  import os.path
>  import random
>  import socket
> +import sys
> 
>  import six
>  from six.moves import range
> @@ -25,6 +26,10 @@ import ovs.fatal_signal
>  import ovs.poller
>  import ovs.vlog
> 
> +if sys.platform == 'win32':
> +    import ovs.winutils as winutils
> +    import win32file
> +
>  vlog = ovs.vlog.Vlog("socket_util")
> 
> 
> @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path, connect_path, short=False):
> 
>  def check_connection_completion(sock):
>      p = ovs.poller.SelectPoll()
> -    p.register(sock, ovs.poller.POLLOUT)
> +    if sys.platform == "win32":
> +        event = winutils.get_new_event(None, False, True, None)
> +        # Receive notification of readiness for writing, of completed
> +        # connection or multipoint join operation, and of socket closure.
> +        win32file.WSAEventSelect(sock, event,
> +                                 win32file.FD_WRITE |
> +                                 win32file.FD_CONNECT |
> +                                 win32file.FD_CLOSE)
> +        p.register(event, ovs.poller.POLLOUT)
> +    else:
> +        p.register(sock, ovs.poller.POLLOUT)
>      pfds = p.poll(0)
>      if len(pfds) == 1:
>          revents = pfds[0][1]
> @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port, dscp):
>          try:
>              sock.connect(address)
>          except socket.error as e:
> -            if get_exception_errno(e) != errno.EINPROGRESS:
> +            error = get_exception_errno(e)
> +            if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> +                # WSAEWOULDBLOCK would be the equivalent on Windows
> +                # for EINPROGRESS on Unix.
> +                error = errno.EINPROGRESS
> +            if error != errno.EINPROGRESS:
>                  raise
>          return 0, sock
>      except socket.error as e:
> @@ -257,9 +277,12 @@ def get_null_fd():
>      global null_fd
>      if null_fd < 0:
>          try:
> -            null_fd = os.open("/dev/null", os.O_RDWR)
> +            # os.devnull ensures compatibility with Windows, returns
> +            # '/dev/null' for Unix and 'nul' for Windows
> +            null_fd = os.open(os.devnull, os.O_RDWR)
>          except OSError as e:
> -            vlog.err("could not open /dev/null: %s" % os.strerror(e.errno))
> +            vlog.err("could not open %s: %s" % (os.devnull,
> +                                                os.strerror(e.errno)))
>              return -e.errno
>      return null_fd
> 
> diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> index cd57eb3..e8e5700 100644
> --- a/python/ovs/stream.py
> +++ b/python/ovs/stream.py
> @@ -15,6 +15,7 @@
>  import errno
>  import os
>  import socket
> +import sys
> 
>  import six
> 
> @@ -27,6 +28,13 @@ try:
>  except ImportError:
>      SSL = None
> 
> +if sys.platform == 'win32':
> +    import ovs.winutils as winutils
> +    import pywintypes
> +    import win32event
> +    import win32file
> +    import win32pipe
> +
>  vlog = ovs.vlog.Vlog("stream")
> 
> 
> @@ -63,6 +71,13 @@ class Stream(object):
>      _SSL_certificate_file = None
>      _SSL_ca_cert_file = None
> 
> +    # Windows only
> +    _write = None                # overlapped for write operation
> +    _read = None                 # overlapped for read operation
> +    _write_pending = False
> +    _read_pending = False
> +    _retry_connect = False
> +
>      @staticmethod
>      def register_method(method, cls):
>          Stream._SOCKET_METHODS[method + ":"] = cls
> @@ -81,8 +96,23 @@ class Stream(object):
>          otherwise False."""
>          return bool(Stream._find_method(name))
> 
> -    def __init__(self, socket, name, status):
> +    def __init__(self, socket, name, status, pipe=None, is_server=False):
>          self.socket = socket
> +        self.pipe = pipe
> +        if sys.platform == 'win32':
> +            self._read = pywintypes.OVERLAPPED()
> +            self._read.hEvent = winutils.get_new_event()
> +            self._write = pywintypes.OVERLAPPED()
> +            self._write.hEvent = winutils.get_new_event()
> +            if pipe is not None:
> +                # Flag to check if fd is a server HANDLE.  In the case of a
> +                # server handle we have to issue a disconnect before closing
> +                # the actual handle.
> +                self._server = is_server
> +                suffix = name.split(":", 1)[1]
> +                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> +                self._pipename = winutils.get_pipe_name(suffix)
> +
>          http://self.name = name
>          if status == errno.EAGAIN:
>              self.state = Stream.__S_CONNECTING
> @@ -120,6 +150,38 @@ class Stream(object):
>          suffix = name.split(":", 1)[1]
>          if name.startswith("unix:"):
>              suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> +            if sys.platform == 'win32':
> +                pipename = winutils.get_pipe_name(suffix)
> +
> +                if len(suffix) > 255:
> +                    # Return invalid argument if the name is too long
> +                    return errno.ENOENT, None
> +
> +                try:
> +                    # In case of "unix:" argument, the assumption is that
> +                    # there is a file created in the path (suffix).
> +                    open(suffix, 'r').close()
> +                except:
> +                    return errno.ENOENT, None
> +
> +                try:
> +                    npipe = winutils.create_file(pipename)
> +                    try:
> +                        winutils.set_pipe_mode(npipe,
> +                                               win32pipe.PIPE_READMODE_BYTE)
> +                    except pywintypes.error as e:
> +                        return errno.ENOENT, None
> +                except pywintypes.error as e:
> +                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> +                        # Pipe is busy, set the retry flag to true and retry
> +                        # again during the connect function.
> +                        Stream.retry_connect = True
> +                        return 0, cls(None, name, errno.EAGAIN,
> +                                      pipe=win32file.INVALID_HANDLE_VALUE,
> +                                      is_server=False)
> +                    return errno.ENOENT, None
> +                return 0, cls(None, name, 0, pipe=npipe, is_server=False)
> +
>          error, sock = cls._open(suffix, dscp)
>          if error:
>              return error, None
> @@ -145,6 +207,10 @@ class Stream(object):
>          if not error:
>              while True:
>                  error = stream.connect()
> +                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> +                    # for EAGAIN on Unix.
> +                    error = errno.EAGAIN
>                  if error != errno.EAGAIN:
>                      break
>                  stream.run()
> @@ -152,7 +218,8 @@ class Stream(object):
>                  stream.run_wait(poller)
>                  stream.connect_wait(poller)
>                  poller.block()
> -            assert error != errno.EINPROGRESS
> +            if stream.socket is not None:
> +                assert error != errno.EINPROGRESS
> 
>          if error and stream:
>              stream.close()
> @@ -160,11 +227,35 @@ class Stream(object):
>          return error, stream
> 
>      def close(self):
> -        self.socket.close()
> +        if self.socket is not None:
> +            self.socket.close()
> +        if self.pipe is not None:
> +            if self._server:
> +                win32pipe.DisconnectNamedPipe(self.pipe)
> +            winutils.close_handle(self.pipe, vlog.warn)
> +            winutils.close_handle(self._read.hEvent, vlog.warn)
> +            winutils.close_handle(self._write.hEvent, vlog.warn)
> 
>      def __scs_connecting(self):
> -        retval = ovs.socket_util.check_connection_completion(self.socket)
> -        assert retval != errno.EINPROGRESS
> +        if self.socket is not None:
> +            retval = ovs.socket_util.check_connection_completion(self.socket)
> +            assert retval != errno.EINPROGRESS
> +        elif sys.platform == 'win32' and self.retry_connect:
> +            try:
> +                self.pipe = winutils.create_file(self._pipename)
> +                self._retry_connect = False
> +                retval = 0
> +            except pywintypes.error as e:
> +                if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> +                    retval = errno.EAGAIN
> +                else:
> +                    self._retry_connect = False
> +                    retval = errno.ENOENT
> +        else:
> +            # Windows only, if retry_connect is false, it means it's already
> +            # connected so we can set the value of retval to 0
> +            retval = 0
> +
>          if retval == 0:
>              self.state = Stream.__S_CONNECTED
>          elif retval != errno.EAGAIN:
> @@ -209,11 +300,63 @@ class Stream(object):
>          elif n == 0:
>              return (0, "")
> 
> +        if sys.platform == 'win32' and self.socket is None:
> +            return self.__recv_windows(n)
> +
>          try:
>              return (0, self.socket.recv(n))
>          except socket.error as e:
>              return (ovs.socket_util.get_exception_errno(e), "")
> 
> +    def __recv_windows(self, n):
> +        if self._read_pending:
> +            try:
> +                nBytesRead = winutils.get_overlapped_result(self.pipe,
> +                                                            self._read,
> +                                                            False)
> +                self._read_pending = False
> +                recvBuffer = self._read_buffer[:nBytesRead]
> +
> +                return (0, winutils.get_decoded_buffer(recvBuffer))
> +            except pywintypes.error as e:
> +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> +                    # The operation is still pending, try again
> +                    self._read_pending = True
> +                    return (errno.EAGAIN, "")
> +                elif e.winerror in winutils.pipe_disconnected_errors:
> +                    # If the pipe was disconnected, return 0.
> +                    return (0, "")
> +                else:
> +                    return (errno.EINVAL, "")
> +
> +        (errCode, self._read_buffer) = winutils.read_file(self.pipe,
> +                                                          n,
> +                                                          self._read)
> +        if errCode:
> +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> +                self._read_pending = True
> +                return (errno.EAGAIN, "")
> +            elif errCode in winutils.pipe_disconnected_errors:
> +                # If the pipe was disconnected, return 0.
> +                return (0, "")
> +            else:
> +                return (errCode, "")
> +
> +        try:
> +            nBytesRead = winutils.get_overlapped_result(self.pipe,
> +                                                        self._read,
> +                                                        False)
> +            winutils.win32event.SetEvent(self._read.hEvent)
> +        except pywintypes.error as e:
> +            if e.winerror in winutils.pipe_disconnected_errors:
> +                # If the pipe was disconnected, return 0.
> +                return (0, "")
> +            else:
> +                return (e.winerror, "")
> +
> +        recvBuffer = self._read_buffer[:nBytesRead]
> +        return (0, winutils.get_decoded_buffer(recvBuffer))
> +
>      def send(self, buf):
>          """Tries to send 'buf' on this stream.
> 
> @@ -231,6 +374,9 @@ class Stream(object):
>          elif len(buf) == 0:
>              return 0
> 
> +        if sys.platform == 'win32' and self.socket is None:
> +            return self.__send_windows(buf)
> +
>          try:
>              # Python 3 has separate types for strings and bytes.  We must have
>              # bytes here.
> @@ -240,6 +386,40 @@ class Stream(object):
>          except socket.error as e:
>              return -ovs.socket_util.get_exception_errno(e)
> 
> +    def __send_windows(self, buf):
> +        if self._write_pending:
> +            try:
> +                nBytesWritten = winutils.get_overlapped_result(self.pipe,
> +                                                               self._write,
> +                                                               False)
> +                self._write_pending = False
> +                return nBytesWritten
> +            except pywintypes.error as e:
> +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> +                    # The operation is still pending, try again
> +                    self._read_pending = True
> +                    return -errno.EAGAIN
> +                elif e.winerror in winutils.pipe_disconnected_errors:
> +                    # If the pipe was disconnected, return connection reset.
> +                    return -errno.ECONNRESET
> +                else:
> +                    return -errno.EINVAL
> +
> +        buf = winutils.get_encoded_buffer(buf)
> +        self._write_pending = False
> +        (errCode, nBytesWritten) = winutils.write_file(self.pipe,
> +                                                       buf,
> +                                                       self._write)
> +        if errCode:
> +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> +                self._write_pending = True
> +                return -errno.EAGAIN
> +            if (not nBytesWritten and
> +                    errCode in winutils.pipe_disconnected_errors):
> +                # If the pipe was disconnected, return connection reset.
> +                return -errno.ECONNRESET
> +        return nBytesWritten
> +
>      def run(self):
>          pass
> 
> @@ -255,11 +435,52 @@ class Stream(object):
> 
>          if self.state == Stream.__S_CONNECTING:
>              wait = Stream.W_CONNECT
> +
> +        if sys.platform == 'win32':
> +            self.__wait_windows(poller, wait)
> +            return
> +
>          if wait == Stream.W_RECV:
>              poller.fd_wait(self.socket, ovs.poller.POLLIN)
>          else:
>              poller.fd_wait(self.socket, ovs.poller.POLLOUT)
> 
> +    def __wait_windows(self, poller, wait):
> +        if self.socket is not None:
> +            if wait == Stream.W_RECV:
> +                read_flags = (win32file.FD_READ |
> +                              win32file.FD_ACCEPT |
> +                              win32file.FD_CLOSE)
> +                try:
> +                    win32file.WSAEventSelect(self.socket,
> +                                             self._read.hEvent,
> +                                             read_flags)
> +                except pywintypes.error as e:
> +                    vlog.err("failed to associate events with socket: %s"
> +                             % e.strerror)
> +                poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> +            else:
> +                write_flags = (win32file.FD_WRITE |
> +                               win32file.FD_CONNECT |
> +                               win32file.FD_CLOSE)
> +                try:
> +                    win32file.WSAEventSelect(self.socket,
> +                                             self._write.hEvent,
> +                                             write_flags)
> +                except pywintypes.error as e:
> +                    vlog.err("failed to associate events with socket: %s"
> +                             % e.strerror)
> +                poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> +        else:
> +            if wait == Stream.W_RECV:
> +                if self._read:
> +                    poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> +            elif wait == Stream.W_SEND:
> +                if self._write:
> +                    poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> +            elif wait == Stream.W_CONNECT:
> +                return
> +
>      def connect_wait(self, poller):
>          self.wait(poller, Stream.W_CONNECT)
> 
> @@ -267,11 +488,22 @@ class Stream(object):
>          self.wait(poller, Stream.W_RECV)
> 
>      def send_wait(self, poller):
> +        if sys.platform == 'win32':
> +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
>          self.wait(poller, Stream.W_SEND)
> 
>      def __del__(self):
>          # Don't delete the file: we might have forked.
> -        self.socket.close()
> +        if self.socket is not None:
> +            self.socket.close()
> +        if self.pipe is not None:
> +            # Check if there are any remaining valid handles and close them
> +            if self.pipe:
> +                winutils.close_handle(self.pipe)
> +            if self._read.hEvent:
> +                winutils.close_handle(self._read.hEvent)
> +            if self._write.hEvent:
> +                winutils.close_handle(self._write.hEvent)
> 
>      @staticmethod
>      def ssl_set_private_key_file(file_name):
> @@ -287,6 +519,10 @@ class Stream(object):
> 
> 
>  class PassiveStream(object):
> +    # Windows only
> +    connect = None                  # overlapped for read operation
> +    connect_pending = False
> +
>      @staticmethod
>      def is_valid_name(name):
>          """Returns True if 'name' is a passive stream name in the form
> @@ -294,9 +530,18 @@ class PassiveStream(object):
>          "punix:" or "ptcp"), otherwise False."""
>          return name.startswith("punix:") | name.startswith("ptcp:")
> 
> -    def __init__(self, sock, name, bind_path):
> +    def __init__(self, sock, name, bind_path, pipe=None):
>          http://self.name = name
> +        self.pipe = pipe
>          self.socket = sock
> +        if pipe is not None:
> +            self.connect = pywintypes.OVERLAPPED()
> +            self.connect.hEvent = winutils.get_new_event(bManualReset=True)
> +            self.connect_pending = False
> +            suffix = name.split(":", 1)[1]
> +            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> +            self._pipename = winutils.get_pipe_name(suffix)
> +
>          self.bind_path = bind_path
> 
>      @staticmethod
> @@ -315,11 +560,27 @@ class PassiveStream(object):
>          bind_path = name[6:]
>          if name.startswith("punix:"):
>              bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
> -            error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> -                                                           True, bind_path,
> -                                                           None)
> -            if error:
> -                return error, None
> +            if sys.platform != 'win32':
> +                error, sock = ovs.socket_util.make_unix_socket(
> +                    socket.SOCK_STREAM, True, bind_path, None)
> +                if error:
> +                    return error, None
> +            else:
> +                # Branch used only on Windows
> +                try:
> +                    open(bind_path, 'w').close()
> +                except:
> +                    return errno.ENOENT, None
> +
> +                pipename = winutils.get_pipe_name(bind_path)
> +                if len(pipename) > 255:
> +                    # Return invalid argument if the name is too long
> +                    return errno.ENOENT, None
> +
> +                npipe = winutils.create_named_pipe(pipename)
> +                if not npipe:
> +                    return errno.ENOENT, None
> +                return 0, PassiveStream(None, name, bind_path, pipe=npipe)
> 
>          elif name.startswith("ptcp:"):
>              sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> @@ -341,7 +602,11 @@ class PassiveStream(object):
> 
>      def close(self):
>          """Closes this PassiveStream."""
> -        self.socket.close()
> +        if self.socket is not None:
> +            self.socket.close()
> +        if self.pipe is not None:
> +            winutils.close_handle(self.pipe, vlog.warn)
> +            winutils.close_handle(self.connect.hEvent, vlog.warn)
>          if self.bind_path is not None:
>              ovs.fatal_signal.unlink_file_now(self.bind_path)
>              self.bind_path = None
> @@ -354,28 +619,80 @@ class PassiveStream(object):
> 
>          Will not block waiting for a connection.  If no connection is ready to
>          be accepted, returns (errno.EAGAIN, None) immediately."""
> -
> +        if sys.platform == 'win32' and self.socket is None:
> +            return self.__accept_windows()
>          while True:
>              try:
>                  sock, addr = self.socket.accept()
>                  ovs.socket_util.set_nonblocking(sock)
> -                if (sock.family == socket.AF_UNIX):
> +                if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
>                      return 0, Stream(sock, "unix:%s" % addr, 0)
>                  return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
>                                                         str(addr[1])), 0)
>              except socket.error as e:
>                  error = ovs.socket_util.get_exception_errno(e)
> +                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> +                    # for EAGAIN on Unix.
> +                    error = errno.EAGAIN
>                  if error != errno.EAGAIN:
>                      # XXX rate-limit
>                      vlog.dbg("accept: %s" % os.strerror(error))
>                  return error, None
> 
> +    def __accept_windows(self):
> +        if self.connect_pending:
> +            try:
> +                winutils.get_overlapped_result(self.pipe, self.connect, False)
> +            except pywintypes.error as e:
> +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> +                    # The operation is still pending, try again
> +                    self.connect_pending = True
> +                    return errno.EAGAIN, None
> +                else:
> +                    if self.pipe:
> +                        win32pipe.DisconnectNamedPipe(self.pipe)
> +                    return errno.EINVAL, None
> +            self.connect_pending = False
> +
> +        error = winutils.connect_named_pipe(self.pipe, self.connect)
> +        if error:
> +            if error == winutils.winerror.ERROR_IO_PENDING:
> +                self.connect_pending = True
> +                return errno.EAGAIN, None
> +            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
> +                if self.pipe:
> +                    win32pipe.DisconnectNamedPipe(self.pipe)
> +                self.connect_pending = False
> +                return errno.EINVAL, None
> +            else:
> +                win32event.SetEvent(self.connect.hEvent)
> +
> +        npipe = winutils.create_named_pipe(self._pipename)
> +        if not npipe:
> +            return errno.ENOENT, None
> +
> +        old_pipe = self.pipe
> +        self.pipe = npipe
> +        winutils.win32event.ResetEvent(self.connect.hEvent)
> +        return 0, Stream(None, http://self.name, 0, pipe=old_pipe)
> +
>      def wait(self, poller):
> -        poller.fd_wait(self.socket, ovs.poller.POLLIN)
> +        if sys.platform != 'win32' or self.socket is not None:
> +            poller.fd_wait(self.socket, ovs.poller.POLLIN)
> +        else:
> +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> 
>      def __del__(self):
>          # Don't delete the file: we might have forked.
> -        self.socket.close()
> +        if self.socket is not None:
> +            self.socket.close()
> +        if self.pipe is not None:
> +            # Check if there are any remaining valid handles and close them
> +            if self.pipe:
> +                winutils.close_handle(self.pipe)
> +            if self._connect.hEvent:
> +                winutils.close_handle(self._read.hEvent)
> 
> 
>  def usage(name):
> diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py
> index 8595ed8..3f3e051 100644
> --- a/python/ovs/unixctl/server.py
> +++ b/python/ovs/unixctl/server.py
> @@ -148,6 +148,10 @@ class UnixctlServer(object):
>      def run(self):
>          for _ in range(10):
>              error, stream = self._listener.accept()
> +            if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
> +                # WSAEWOULDBLOCK would be the equivalent on Windows
> +                # for EAGAIN on Unix.
> +                error = errno.EAGAIN
>              if not error:
>                  rpc = ovs.jsonrpc.Connection(stream)
>                  self._conns.append(UnixctlConnection(rpc))
> diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py
> index 18634e6..3eabcd7 100644
> --- a/tests/test-jsonrpc.py
> +++ b/tests/test-jsonrpc.py
> @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg):
> 
> 
>  def do_listen(name):
> -    error, pstream = ovs.stream.PassiveStream.open(name)
> -    if error:
> -        sys.stderr.write("could not listen on \"%s\": %s\n"
> -                         % (name, os.strerror(error)))
> -        sys.exit(1)
> +    if sys.platform != 'win32' or (
> +            ovs.daemon._detach and ovs.daemon._detached):
> +        # On Windows the child is a new process created which should be the
> +        # one that creates the PassiveStream. Without this check, the new
> +        # child process will create a new PassiveStream overwriting the one
> +        # that the parent process created.
> +        error, pstream = ovs.stream.PassiveStream.open(name)
> +        if error:
> +            sys.stderr.write("could not listen on \"%s\": %s\n"
> +                             % (name, os.strerror(error)))
> +            sys.exit(1)
> 
>      ovs.daemon.daemonize()
> 
> --
> 2.10.0.windows.1
> _______________________________________________
> dev mailing list
> mailto:dev@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
Gurucharan Shetty Jan. 3, 2017, 7:08 p.m. UTC | #2
On 3 January 2017 at 11:03, Alin Balutoiu <abalutoiu@cloudbasesolutions.com>
wrote:

> Thanks for the comment.
> The socket attribute of the class Stream cannot be None if the code runs
> on Unix.
> Therefore the condition "self.socket is not None" is True only if sockets
> are not used,
> and named pipes are used instead (i.e. it runs on Windows).
>
> However, if you prefer I can replace it with the following code:
>
I like it better with the below piece. So please go ahead and send it as
part of the next version.


>     if self.socket is not None:
>         retval = ovs.socket_util.check_connection_completion(self.socket)
>         assert retval != errno.EINPROGRESS
>     elif sys.platform == 'win32':
>         if self.retry_connect:
>             try:
>                 self.pipe = winutils.create_file(self._pipename)
>                 self._retry_connect = False
>                 retval = 0
>             except pywintypes.error as e:
>                 if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
>                     retval = errno.EAGAIN
>                 else:
>                     self._retry_connect = False
>                     retval = errno.ENOENT
>         else:
>             # If retry_connect is false, it means it's already
>             # connected so we can set the value of retval to 0
>             retval = 0
>
>
> > From: Guru Shetty [mailto:guru@ovn.org]
> > Sent: Tuesday, January 3, 2017 8:17 PM
> > To: Alin Balutoiu <abalutoiu@cloudbasesolutions.com>
> > Cc: dev@openvswitch.org
> > Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets
> to Windows
> >
> >
> >
> > On 3 January 2017 at 08:46, Alin Balutoiu <mailto:abalutoiu@
> cloudbasesolutions.com> wrote:
> > From: Alin Balutoiu <mailto:abalutoiu@cloudbasesolutions.com>
> >
> > Unix sockets (AF_UNIX) are not supported on Windows.
> > The replacement of Unix sockets on Windows is implemented
> > using named pipes, we are trying to mimic the behaviour
> > of unix sockets.
> >
> > Instead of using Unix sockets to communicate
> > between components Named Pipes are used. This
> > makes the python sockets compatible with the
> > Named Pipe used in Windows applications.
> >
> > Signed-off-by: Paul-Daniel Boca <mailto:pboca@cloudbasesolutions.com>
> > Signed-off-by: Alin Balutoiu <mailto:abalutoiu@cloudbasesolutions.com>
> > Acked-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> > Tested-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> > ---
> > V2: No changes.
> > V3: Changed Signed-off-by name and added previous Acked-by's,
> Tested-by's.
> >
> > I intend to add the following diff:
> >
> > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > index e8e5700..af35afd 100644
> > --- a/python/ovs/stream.py
> > +++ b/python/ovs/stream.py
> > @@ -251,7 +251,7 @@ class Stream(object):
> >                  else:
> >                      self._retry_connect = False
> >                      retval = errno.ENOENT
> > -        else:
> > +        elif sys.platform == 'win32':
> >              # Windows only, if retry_connect is false, it means it's
> already
> >              # connected so we can set the value of retval to 0
> >              retval = 0
> >
> >
> > ---
> >  python/ovs/jsonrpc.py        |   6 +
> >  python/ovs/poller.py         |  72 ++++++---
> >  python/ovs/socket_util.py    |  31 +++-
> >  python/ovs/stream.py         | 351 ++++++++++++++++++++++++++++++
> ++++++++++---
> >  python/ovs/unixctl/server.py |   4 +
> >  tests/test-jsonrpc.py        |  16 +-
> >  6 files changed, 436 insertions(+), 44 deletions(-)
> >
> > diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> > index 6300c67..5a11500 100644
> > --- a/python/ovs/jsonrpc.py
> > +++ b/python/ovs/jsonrpc.py
> > @@ -14,6 +14,7 @@
> >
> >  import errno
> >  import os
> > +import sys
> >
> >  import six
> >
> > @@ -274,6 +275,11 @@ class Connection(object):
> >                      except UnicodeError:
> >                          error = errno.EILSEQ
> >                  if error:
> > +                    if (sys.platform == "win32" and
> > +                            error == errno.WSAEWOULDBLOCK):
> > +                        # WSAEWOULDBLOCK would be the equivalent on
> Windows
> > +                        # for EAGAIN on Unix.
> > +                        error = errno.EAGAIN
> >                      if error == errno.EAGAIN:
> >                          return error, None
> >                      else:
> > diff --git a/python/ovs/poller.py b/python/ovs/poller.py
> > index d7cb7d3..d836483 100644
> > --- a/python/ovs/poller.py
> > +++ b/python/ovs/poller.py
> > @@ -18,6 +18,10 @@ import ovs.vlog
> >  import select
> >  import socket
> >  import os
> > +import sys
> > +
> > +if sys.platform == "win32":
> > +    import ovs.winutils as winutils
> >
> >  try:
> >      from OpenSSL import SSL
> > @@ -62,7 +66,9 @@ class _SelectSelect(object):
> >          if SSL and isinstance(fd, SSL.Connection):
> >              fd = fd.fileno()
> >
> > -        assert isinstance(fd, int)
> > +        if sys.platform != 'win32':
> > +            # Skip this on Windows, it also register events
> > +            assert isinstance(fd, int)
> >          if events & POLLIN:
> >              self.rlist.append(fd)
> >              events &= ~POLLIN
> > @@ -73,28 +79,58 @@ class _SelectSelect(object):
> >              self.xlist.append(fd)
> >
> >      def poll(self, timeout):
> > -        if timeout == -1:
> > -            # epoll uses -1 for infinite timeout, select uses None.
> > -            timeout = None
> > -        else:
> > -            timeout = float(timeout) / 1000
> >          # XXX workaround a bug in eventlet
> >          # see https://github.com/eventlet/eventlet/pull/25
> >          if timeout == 0 and _using_eventlet_green_select():
> >              timeout = 0.1
> > +        if sys.platform == 'win32':
> > +            events = self.rlist + self.wlist + self.xlist
> > +            if not events:
> > +                return []
> > +            if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS:
> > +                raise WindowsError("Cannot handle more than maximum
> wait"
> > +                                   "objects\n")
> > +
> > +            # win32event.INFINITE timeout is -1
> > +            # timeout must be an int number, expressed in ms
> > +            if timeout == 0.1:
> > +                timeout = 100
> > +            else:
> > +                timeout = int(timeout)
> > +
> > +            # Wait until any of the events is set to signaled
> > +            try:
> > +                retval = winutils.win32event.WaitForMultipleObjects(
> > +                    events,
> > +                    False,  # Wait all
> > +                    timeout)
> > +            except winutils.pywintypes.error:
> > +                    return [(0, POLLERR)]
> >
> > -        rlist, wlist, xlist = select.select(self.rlist, self.wlist,
> self.xlist,
> > -                                            timeout)
> > -        events_dict = {}
> > -        for fd in rlist:
> > -            events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> > -        for fd in wlist:
> > -            events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> > -        for fd in xlist:
> > -            events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> > -                                                        POLLHUP |
> > -                                                        POLLNVAL)
> > -        return list(events_dict.items())
> > +            if retval == winutils.winerror.WAIT_TIMEOUT:
> > +                return []
> > +
> > +            return [(events[retval], 0)]
> > +        else:
> > +            if timeout == -1:
> > +                # epoll uses -1 for infinite timeout, select uses None.
> > +                timeout = None
> > +            else:
> > +                timeout = float(timeout) / 1000
> > +            rlist, wlist, xlist = select.select(self.rlist,
> > +                                                self.wlist,
> > +                                                self.xlist,
> > +                                                timeout)
> > +            events_dict = {}
> > +            for fd in rlist:
> > +                events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> > +            for fd in wlist:
> > +                events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> > +            for fd in xlist:
> > +                events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> > +                                                            POLLHUP |
> > +                                                            POLLNVAL)
> > +            return list(events_dict.items())
> >
> >
> >  SelectPoll = _SelectSelect
> > diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py
> > index b358b05..fb6cee4 100644
> > --- a/python/ovs/socket_util.py
> > +++ b/python/ovs/socket_util.py
> > @@ -17,6 +17,7 @@ import os
> >  import os.path
> >  import random
> >  import socket
> > +import sys
> >
> >  import six
> >  from six.moves import range
> > @@ -25,6 +26,10 @@ import ovs.fatal_signal
> >  import ovs.poller
> >  import ovs.vlog
> >
> > +if sys.platform == 'win32':
> > +    import ovs.winutils as winutils
> > +    import win32file
> > +
> >  vlog = ovs.vlog.Vlog("socket_util")
> >
> >
> > @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path,
> connect_path, short=False):
> >
> >  def check_connection_completion(sock):
> >      p = ovs.poller.SelectPoll()
> > -    p.register(sock, ovs.poller.POLLOUT)
> > +    if sys.platform == "win32":
> > +        event = winutils.get_new_event(None, False, True, None)
> > +        # Receive notification of readiness for writing, of completed
> > +        # connection or multipoint join operation, and of socket
> closure.
> > +        win32file.WSAEventSelect(sock, event,
> > +                                 win32file.FD_WRITE |
> > +                                 win32file.FD_CONNECT |
> > +                                 win32file.FD_CLOSE)
> > +        p.register(event, ovs.poller.POLLOUT)
> > +    else:
> > +        p.register(sock, ovs.poller.POLLOUT)
> >      pfds = p.poll(0)
> >      if len(pfds) == 1:
> >          revents = pfds[0][1]
> > @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port,
> dscp):
> >          try:
> >              sock.connect(address)
> >          except socket.error as e:
> > -            if get_exception_errno(e) != errno.EINPROGRESS:
> > +            error = get_exception_errno(e)
> > +            if sys.platform == 'win32' and error ==
> errno.WSAEWOULDBLOCK:
> > +                # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                # for EINPROGRESS on Unix.
> > +                error = errno.EINPROGRESS
> > +            if error != errno.EINPROGRESS:
> >                  raise
> >          return 0, sock
> >      except socket.error as e:
> > @@ -257,9 +277,12 @@ def get_null_fd():
> >      global null_fd
> >      if null_fd < 0:
> >          try:
> > -            null_fd = os.open("/dev/null", os.O_RDWR)
> > +            # os.devnull ensures compatibility with Windows, returns
> > +            # '/dev/null' for Unix and 'nul' for Windows
> > +            null_fd = os.open(os.devnull, os.O_RDWR)
> >          except OSError as e:
> > -            vlog.err("could not open /dev/null: %s" %
> os.strerror(e.errno))
> > +            vlog.err("could not open %s: %s" % (os.devnull,
> > +                                                os.strerror(e.errno)))
> >              return -e.errno
> >      return null_fd
> >
> > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > index cd57eb3..e8e5700 100644
> > --- a/python/ovs/stream.py
> > +++ b/python/ovs/stream.py
> > @@ -15,6 +15,7 @@
> >  import errno
> >  import os
> >  import socket
> > +import sys
> >
> >  import six
> >
> > @@ -27,6 +28,13 @@ try:
> >  except ImportError:
> >      SSL = None
> >
> > +if sys.platform == 'win32':
> > +    import ovs.winutils as winutils
> > +    import pywintypes
> > +    import win32event
> > +    import win32file
> > +    import win32pipe
> > +
> >  vlog = ovs.vlog.Vlog("stream")
> >
> >
> > @@ -63,6 +71,13 @@ class Stream(object):
> >      _SSL_certificate_file = None
> >      _SSL_ca_cert_file = None
> >
> > +    # Windows only
> > +    _write = None                # overlapped for write operation
> > +    _read = None                 # overlapped for read operation
> > +    _write_pending = False
> > +    _read_pending = False
> > +    _retry_connect = False
> > +
> >      @staticmethod
> >      def register_method(method, cls):
> >          Stream._SOCKET_METHODS[method + ":"] = cls
> > @@ -81,8 +96,23 @@ class Stream(object):
> >          otherwise False."""
> >          return bool(Stream._find_method(name))
> >
> > -    def __init__(self, socket, name, status):
> > +    def __init__(self, socket, name, status, pipe=None,
> is_server=False):
> >          self.socket = socket
> > +        self.pipe = pipe
> > +        if sys.platform == 'win32':
> > +            self._read = pywintypes.OVERLAPPED()
> > +            self._read.hEvent = winutils.get_new_event()
> > +            self._write = pywintypes.OVERLAPPED()
> > +            self._write.hEvent = winutils.get_new_event()
> > +            if pipe is not None:
> > +                # Flag to check if fd is a server HANDLE.  In the case
> of a
> > +                # server handle we have to issue a disconnect before
> closing
> > +                # the actual handle.
> > +                self._server = is_server
> > +                suffix = name.split(":", 1)[1]
> > +                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR,
> suffix)
> > +                self._pipename = winutils.get_pipe_name(suffix)
> > +
> >          http://self.name = name
> >          if status == errno.EAGAIN:
> >              self.state = Stream.__S_CONNECTING
> > @@ -120,6 +150,38 @@ class Stream(object):
> >          suffix = name.split(":", 1)[1]
> >          if name.startswith("unix:"):
> >              suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > +            if sys.platform == 'win32':
> > +                pipename = winutils.get_pipe_name(suffix)
> > +
> > +                if len(suffix) > 255:
> > +                    # Return invalid argument if the name is too long
> > +                    return errno.ENOENT, None
> > +
> > +                try:
> > +                    # In case of "unix:" argument, the assumption is
> that
> > +                    # there is a file created in the path (suffix).
> > +                    open(suffix, 'r').close()
> > +                except:
> > +                    return errno.ENOENT, None
> > +
> > +                try:
> > +                    npipe = winutils.create_file(pipename)
> > +                    try:
> > +                        winutils.set_pipe_mode(npipe,
> > +
>  win32pipe.PIPE_READMODE_BYTE)
> > +                    except pywintypes.error as e:
> > +                        return errno.ENOENT, None
> > +                except pywintypes.error as e:
> > +                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> > +                        # Pipe is busy, set the retry flag to true and
> retry
> > +                        # again during the connect function.
> > +                        Stream.retry_connect = True
> > +                        return 0, cls(None, name, errno.EAGAIN,
> > +                                      pipe=win32file.INVALID_HANDLE_
> VALUE,
> > +                                      is_server=False)
> > +                    return errno.ENOENT, None
> > +                return 0, cls(None, name, 0, pipe=npipe,
> is_server=False)
> > +
> >          error, sock = cls._open(suffix, dscp)
> >          if error:
> >              return error, None
> > @@ -145,6 +207,10 @@ class Stream(object):
> >          if not error:
> >              while True:
> >                  error = stream.connect()
> > +                if sys.platform == 'win32' and error ==
> errno.WSAEWOULDBLOCK:
> > +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                    # for EAGAIN on Unix.
> > +                    error = errno.EAGAIN
> >                  if error != errno.EAGAIN:
> >                      break
> >                  stream.run()
> > @@ -152,7 +218,8 @@ class Stream(object):
> >                  stream.run_wait(poller)
> >                  stream.connect_wait(poller)
> >                  poller.block()
> > -            assert error != errno.EINPROGRESS
> > +            if stream.socket is not None:
> > +                assert error != errno.EINPROGRESS
> >
> >          if error and stream:
> >              stream.close()
> > @@ -160,11 +227,35 @@ class Stream(object):
> >          return error, stream
> >
> >      def close(self):
> > -        self.socket.close()
> > +        if self.socket is not None:
> > +            self.socket.close()
> > +        if self.pipe is not None:
> > +            if self._server:
> > +                win32pipe.DisconnectNamedPipe(self.pipe)
> > +            winutils.close_handle(self.pipe, vlog.warn)
> > +            winutils.close_handle(self._read.hEvent, vlog.warn)
> > +            winutils.close_handle(self._write.hEvent, vlog.warn)
> >
> >      def __scs_connecting(self):
> > -        retval = ovs.socket_util.check_connection_completion(self.
> socket)
> > -        assert retval != errno.EINPROGRESS
> > +        if self.socket is not None:
> > +            retval = ovs.socket_util.check_connection_completion(self.
> socket)
> > +            assert retval != errno.EINPROGRESS
> > +        elif sys.platform == 'win32' and self.retry_connect:
> > +            try:
> > +                self.pipe = winutils.create_file(self._pipename)
> > +                self._retry_connect = False
> > +                retval = 0
> > +            except pywintypes.error as e:
> > +                if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> > +                    retval = errno.EAGAIN
> > +                else:
> > +                    self._retry_connect = False
> > +                    retval = errno.ENOENT
> > +        else:
> > +            # Windows only, if retry_connect is false, it means it's
> already
> > +            # connected so we can set the value of retval to 0
> > +            retval = 0
> > +
> >          if retval == 0:
> >              self.state = Stream.__S_CONNECTED
> >          elif retval != errno.EAGAIN:
> > @@ -209,11 +300,63 @@ class Stream(object):
> >          elif n == 0:
> >              return (0, "")
> >
> > +        if sys.platform == 'win32' and self.socket is None:
> > +            return self.__recv_windows(n)
> > +
> >          try:
> >              return (0, self.socket.recv(n))
> >          except socket.error as e:
> >              return (ovs.socket_util.get_exception_errno(e), "")
> >
> > +    def __recv_windows(self, n):
> > +        if self._read_pending:
> > +            try:
> > +                nBytesRead = winutils.get_overlapped_result(self.pipe,
> > +                                                            self._read,
> > +                                                            False)
> > +                self._read_pending = False
> > +                recvBuffer = self._read_buffer[:nBytesRead]
> > +
> > +                return (0, winutils.get_decoded_buffer(recvBuffer))
> > +            except pywintypes.error as e:
> > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > +                    # The operation is still pending, try again
> > +                    self._read_pending = True
> > +                    return (errno.EAGAIN, "")
> > +                elif e.winerror in winutils.pipe_disconnected_errors:
> > +                    # If the pipe was disconnected, return 0.
> > +                    return (0, "")
> > +                else:
> > +                    return (errno.EINVAL, "")
> > +
> > +        (errCode, self._read_buffer) = winutils.read_file(self.pipe,
> > +                                                          n,
> > +                                                          self._read)
> > +        if errCode:
> > +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> > +                self._read_pending = True
> > +                return (errno.EAGAIN, "")
> > +            elif errCode in winutils.pipe_disconnected_errors:
> > +                # If the pipe was disconnected, return 0.
> > +                return (0, "")
> > +            else:
> > +                return (errCode, "")
> > +
> > +        try:
> > +            nBytesRead = winutils.get_overlapped_result(self.pipe,
> > +                                                        self._read,
> > +                                                        False)
> > +            winutils.win32event.SetEvent(self._read.hEvent)
> > +        except pywintypes.error as e:
> > +            if e.winerror in winutils.pipe_disconnected_errors:
> > +                # If the pipe was disconnected, return 0.
> > +                return (0, "")
> > +            else:
> > +                return (e.winerror, "")
> > +
> > +        recvBuffer = self._read_buffer[:nBytesRead]
> > +        return (0, winutils.get_decoded_buffer(recvBuffer))
> > +
> >      def send(self, buf):
> >          """Tries to send 'buf' on this stream.
> >
> > @@ -231,6 +374,9 @@ class Stream(object):
> >          elif len(buf) == 0:
> >              return 0
> >
> > +        if sys.platform == 'win32' and self.socket is None:
> > +            return self.__send_windows(buf)
> > +
> >          try:
> >              # Python 3 has separate types for strings and bytes.  We
> must have
> >              # bytes here.
> > @@ -240,6 +386,40 @@ class Stream(object):
> >          except socket.error as e:
> >              return -ovs.socket_util.get_exception_errno(e)
> >
> > +    def __send_windows(self, buf):
> > +        if self._write_pending:
> > +            try:
> > +                nBytesWritten = winutils.get_overlapped_
> result(self.pipe,
> > +
>  self._write,
> > +                                                               False)
> > +                self._write_pending = False
> > +                return nBytesWritten
> > +            except pywintypes.error as e:
> > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > +                    # The operation is still pending, try again
> > +                    self._read_pending = True
> > +                    return -errno.EAGAIN
> > +                elif e.winerror in winutils.pipe_disconnected_errors:
> > +                    # If the pipe was disconnected, return connection
> reset.
> > +                    return -errno.ECONNRESET
> > +                else:
> > +                    return -errno.EINVAL
> > +
> > +        buf = winutils.get_encoded_buffer(buf)
> > +        self._write_pending = False
> > +        (errCode, nBytesWritten) = winutils.write_file(self.pipe,
> > +                                                       buf,
> > +                                                       self._write)
> > +        if errCode:
> > +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> > +                self._write_pending = True
> > +                return -errno.EAGAIN
> > +            if (not nBytesWritten and
> > +                    errCode in winutils.pipe_disconnected_errors):
> > +                # If the pipe was disconnected, return connection reset.
> > +                return -errno.ECONNRESET
> > +        return nBytesWritten
> > +
> >      def run(self):
> >          pass
> >
> > @@ -255,11 +435,52 @@ class Stream(object):
> >
> >          if self.state == Stream.__S_CONNECTING:
> >              wait = Stream.W_CONNECT
> > +
> > +        if sys.platform == 'win32':
> > +            self.__wait_windows(poller, wait)
> > +            return
> > +
> >          if wait == Stream.W_RECV:
> >              poller.fd_wait(self.socket, ovs.poller.POLLIN)
> >          else:
> >              poller.fd_wait(self.socket, ovs.poller.POLLOUT)
> >
> > +    def __wait_windows(self, poller, wait):
> > +        if self.socket is not None:
> > +            if wait == Stream.W_RECV:
> > +                read_flags = (win32file.FD_READ |
> > +                              win32file.FD_ACCEPT |
> > +                              win32file.FD_CLOSE)
> > +                try:
> > +                    win32file.WSAEventSelect(self.socket,
> > +                                             self._read.hEvent,
> > +                                             read_flags)
> > +                except pywintypes.error as e:
> > +                    vlog.err("failed to associate events with socket:
> %s"
> > +                             % e.strerror)
> > +                poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> > +            else:
> > +                write_flags = (win32file.FD_WRITE |
> > +                               win32file.FD_CONNECT |
> > +                               win32file.FD_CLOSE)
> > +                try:
> > +                    win32file.WSAEventSelect(self.socket,
> > +                                             self._write.hEvent,
> > +                                             write_flags)
> > +                except pywintypes.error as e:
> > +                    vlog.err("failed to associate events with socket:
> %s"
> > +                             % e.strerror)
> > +                poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> > +        else:
> > +            if wait == Stream.W_RECV:
> > +                if self._read:
> > +                    poller.fd_wait(self._read.hEvent,
> ovs.poller.POLLIN)
> > +            elif wait == Stream.W_SEND:
> > +                if self._write:
> > +                    poller.fd_wait(self._write.hEvent,
> ovs.poller.POLLOUT)
> > +            elif wait == Stream.W_CONNECT:
> > +                return
> > +
> >      def connect_wait(self, poller):
> >          self.wait(poller, Stream.W_CONNECT)
> >
> > @@ -267,11 +488,22 @@ class Stream(object):
> >          self.wait(poller, Stream.W_RECV)
> >
> >      def send_wait(self, poller):
> > +        if sys.platform == 'win32':
> > +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> >          self.wait(poller, Stream.W_SEND)
> >
> >      def __del__(self):
> >          # Don't delete the file: we might have forked.
> > -        self.socket.close()
> > +        if self.socket is not None:
> > +            self.socket.close()
> > +        if self.pipe is not None:
> > +            # Check if there are any remaining valid handles and close
> them
> > +            if self.pipe:
> > +                winutils.close_handle(self.pipe)
> > +            if self._read.hEvent:
> > +                winutils.close_handle(self._read.hEvent)
> > +            if self._write.hEvent:
> > +                winutils.close_handle(self._write.hEvent)
> >
> >      @staticmethod
> >      def ssl_set_private_key_file(file_name):
> > @@ -287,6 +519,10 @@ class Stream(object):
> >
> >
> >  class PassiveStream(object):
> > +    # Windows only
> > +    connect = None                  # overlapped for read operation
> > +    connect_pending = False
> > +
> >      @staticmethod
> >      def is_valid_name(name):
> >          """Returns True if 'name' is a passive stream name in the form
> > @@ -294,9 +530,18 @@ class PassiveStream(object):
> >          "punix:" or "ptcp"), otherwise False."""
> >          return name.startswith("punix:") | name.startswith("ptcp:")
> >
> > -    def __init__(self, sock, name, bind_path):
> > +    def __init__(self, sock, name, bind_path, pipe=None):
> >          http://self.name = name
> > +        self.pipe = pipe
> >          self.socket = sock
> > +        if pipe is not None:
> > +            self.connect = pywintypes.OVERLAPPED()
> > +            self.connect.hEvent = winutils.get_new_event(
> bManualReset=True)
> > +            self.connect_pending = False
> > +            suffix = name.split(":", 1)[1]
> > +            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > +            self._pipename = winutils.get_pipe_name(suffix)
> > +
> >          self.bind_path = bind_path
> >
> >      @staticmethod
> > @@ -315,11 +560,27 @@ class PassiveStream(object):
> >          bind_path = name[6:]
> >          if name.startswith("punix:"):
> >              bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR,
> bind_path)
> > -            error, sock = ovs.socket_util.make_unix_
> socket(socket.SOCK_STREAM,
> > -                                                           True,
> bind_path,
> > -                                                           None)
> > -            if error:
> > -                return error, None
> > +            if sys.platform != 'win32':
> > +                error, sock = ovs.socket_util.make_unix_socket(
> > +                    socket.SOCK_STREAM, True, bind_path, None)
> > +                if error:
> > +                    return error, None
> > +            else:
> > +                # Branch used only on Windows
> > +                try:
> > +                    open(bind_path, 'w').close()
> > +                except:
> > +                    return errno.ENOENT, None
> > +
> > +                pipename = winutils.get_pipe_name(bind_path)
> > +                if len(pipename) > 255:
> > +                    # Return invalid argument if the name is too long
> > +                    return errno.ENOENT, None
> > +
> > +                npipe = winutils.create_named_pipe(pipename)
> > +                if not npipe:
> > +                    return errno.ENOENT, None
> > +                return 0, PassiveStream(None, name, bind_path,
> pipe=npipe)
> >
> >          elif name.startswith("ptcp:"):
> >              sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > @@ -341,7 +602,11 @@ class PassiveStream(object):
> >
> >      def close(self):
> >          """Closes this PassiveStream."""
> > -        self.socket.close()
> > +        if self.socket is not None:
> > +            self.socket.close()
> > +        if self.pipe is not None:
> > +            winutils.close_handle(self.pipe, vlog.warn)
> > +            winutils.close_handle(self.connect.hEvent, vlog.warn)
> >          if self.bind_path is not None:
> >              ovs.fatal_signal.unlink_file_now(self.bind_path)
> >              self.bind_path = None
> > @@ -354,28 +619,80 @@ class PassiveStream(object):
> >
> >          Will not block waiting for a connection.  If no connection is
> ready to
> >          be accepted, returns (errno.EAGAIN, None) immediately."""
> > -
> > +        if sys.platform == 'win32' and self.socket is None:
> > +            return self.__accept_windows()
> >          while True:
> >              try:
> >                  sock, addr = self.socket.accept()
> >                  ovs.socket_util.set_nonblocking(sock)
> > -                if (sock.family == socket.AF_UNIX):
> > +                if (sys.platform != 'win32' and sock.family ==
> socket.AF_UNIX):
> >                      return 0, Stream(sock, "unix:%s" % addr, 0)
> >                  return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
> >                                                         str(addr[1])), 0)
> >              except socket.error as e:
> >                  error = ovs.socket_util.get_exception_errno(e)
> > +                if sys.platform == 'win32' and error ==
> errno.WSAEWOULDBLOCK:
> > +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                    # for EAGAIN on Unix.
> > +                    error = errno.EAGAIN
> >                  if error != errno.EAGAIN:
> >                      # XXX rate-limit
> >                      vlog.dbg("accept: %s" % os.strerror(error))
> >                  return error, None
> >
> > +    def __accept_windows(self):
> > +        if self.connect_pending:
> > +            try:
> > +                winutils.get_overlapped_result(self.pipe,
> self.connect, False)
> > +            except pywintypes.error as e:
> > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > +                    # The operation is still pending, try again
> > +                    self.connect_pending = True
> > +                    return errno.EAGAIN, None
> > +                else:
> > +                    if self.pipe:
> > +                        win32pipe.DisconnectNamedPipe(self.pipe)
> > +                    return errno.EINVAL, None
> > +            self.connect_pending = False
> > +
> > +        error = winutils.connect_named_pipe(self.pipe, self.connect)
> > +        if error:
> > +            if error == winutils.winerror.ERROR_IO_PENDING:
> > +                self.connect_pending = True
> > +                return errno.EAGAIN, None
> > +            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
> > +                if self.pipe:
> > +                    win32pipe.DisconnectNamedPipe(self.pipe)
> > +                self.connect_pending = False
> > +                return errno.EINVAL, None
> > +            else:
> > +                win32event.SetEvent(self.connect.hEvent)
> > +
> > +        npipe = winutils.create_named_pipe(self._pipename)
> > +        if not npipe:
> > +            return errno.ENOENT, None
> > +
> > +        old_pipe = self.pipe
> > +        self.pipe = npipe
> > +        winutils.win32event.ResetEvent(self.connect.hEvent)
> > +        return 0, Stream(None, http://self.name, 0, pipe=old_pipe)
> > +
> >      def wait(self, poller):
> > -        poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > +        if sys.platform != 'win32' or self.socket is not None:
> > +            poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > +        else:
> > +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> >
> >      def __del__(self):
> >          # Don't delete the file: we might have forked.
> > -        self.socket.close()
> > +        if self.socket is not None:
> > +            self.socket.close()
> > +        if self.pipe is not None:
> > +            # Check if there are any remaining valid handles and close
> them
> > +            if self.pipe:
> > +                winutils.close_handle(self.pipe)
> > +            if self._connect.hEvent:
> > +                winutils.close_handle(self._read.hEvent)
> >
> >
> >  def usage(name):
> > diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py
> > index 8595ed8..3f3e051 100644
> > --- a/python/ovs/unixctl/server.py
> > +++ b/python/ovs/unixctl/server.py
> > @@ -148,6 +148,10 @@ class UnixctlServer(object):
> >      def run(self):
> >          for _ in range(10):
> >              error, stream = self._listener.accept()
> > +            if sys.platform == "win32" and error ==
> errno.WSAEWOULDBLOCK:
> > +                # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                # for EAGAIN on Unix.
> > +                error = errno.EAGAIN
> >              if not error:
> >                  rpc = ovs.jsonrpc.Connection(stream)
> >                  self._conns.append(UnixctlConnection(rpc))
> > diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py
> > index 18634e6..3eabcd7 100644
> > --- a/tests/test-jsonrpc.py
> > +++ b/tests/test-jsonrpc.py
> > @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg):
> >
> >
> >  def do_listen(name):
> > -    error, pstream = ovs.stream.PassiveStream.open(name)
> > -    if error:
> > -        sys.stderr.write("could not listen on \"%s\": %s\n"
> > -                         % (name, os.strerror(error)))
> > -        sys.exit(1)
> > +    if sys.platform != 'win32' or (
> > +            ovs.daemon._detach and ovs.daemon._detached):
> > +        # On Windows the child is a new process created which should be
> the
> > +        # one that creates the PassiveStream. Without this check, the
> new
> > +        # child process will create a new PassiveStream overwriting the
> one
> > +        # that the parent process created.
> > +        error, pstream = ovs.stream.PassiveStream.open(name)
> > +        if error:
> > +            sys.stderr.write("could not listen on \"%s\": %s\n"
> > +                             % (name, os.strerror(error)))
> > +            sys.exit(1)
> >
> >      ovs.daemon.daemonize()
> >
> > --
> > 2.10.0.windows.1
> > _______________________________________________
> > dev mailing list
> > mailto:dev@openvswitch.org
> > https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>
>
Alin Balutoiu Jan. 3, 2017, 7:16 p.m. UTC | #3
Ok, I'll include that in the next version.
Mind if I respin the series after you take a look over the rest of the patches?

> From: Guru Shetty [mailto:guru@ovn.org] 
> Sent: Tuesday, January 3, 2017 9:09 PM
> To: Alin Balutoiu <abalutoiu@cloudbasesolutions.com>
> Cc: dev@openvswitch.org
> Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets to Windows
> 
> 
> 
> On 3 January 2017 at 11:03, Alin Balutoiu <mailto:abalutoiu@cloudbasesolutions.com> wrote:
> Thanks for the comment.
> The socket attribute of the class Stream cannot be None if the code runs on Unix.
> Therefore the condition "self.socket is not None" is True only if sockets are not used,
> and named pipes are used instead (i.e. it runs on Windows).
> 
> However, if you prefer I can replace it with the following code:
> I like it better with the below piece. So please go ahead and send it as part of the next version.
>  
>     if self.socket is not None:
>         retval = ovs.socket_util.check_connection_completion(self.socket)
>         assert retval != errno.EINPROGRESS
>     elif sys.platform == 'win32':
>         if self.retry_connect:
>             try:
>                 self.pipe = winutils.create_file(self._pipename)
>                 self._retry_connect = False
>                 retval = 0
>             except pywintypes.error as e:
>                 if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
>                     retval = errno.EAGAIN
>                 else:
>                     self._retry_connect = False
>                     retval = errno.ENOENT
>         else:
>             # If retry_connect is false, it means it's already
>             # connected so we can set the value of retval to 0
>             retval = 0
> 
> 
> > From: Guru Shetty [mailto:mailto:guru@ovn.org]
> > Sent: Tuesday, January 3, 2017 8:17 PM
> > To: Alin Balutoiu <mailto:abalutoiu@cloudbasesolutions.com>
> > Cc: mailto:dev@openvswitch.org
> > Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets to Windows
> >
> >
> >
> > On 3 January 2017 at 08:46, Alin Balutoiu <mailto:mailto:abalutoiu@cloudbasesolutions.com> wrote:
> > From: Alin Balutoiu <mailto:mailto:abalutoiu@cloudbasesolutions.com>
> >
> > Unix sockets (AF_UNIX) are not supported on Windows.
> > The replacement of Unix sockets on Windows is implemented
> > using named pipes, we are trying to mimic the behaviour
> > of unix sockets.
> >
> > Instead of using Unix sockets to communicate
> > between components Named Pipes are used. This
> > makes the python sockets compatible with the
> > Named Pipe used in Windows applications.
> >
> > Signed-off-by: Paul-Daniel Boca <mailto:mailto:pboca@cloudbasesolutions.com>
> > Signed-off-by: Alin Balutoiu <mailto:mailto:abalutoiu@cloudbasesolutions.com>
> > Acked-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> > Tested-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> > ---
> > V2: No changes.
> > V3: Changed Signed-off-by name and added previous Acked-by's, Tested-by's.
> >
> > I intend to add the following diff:
> >
> > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > index e8e5700..af35afd 100644
> > --- a/python/ovs/stream.py
> > +++ b/python/ovs/stream.py
> > @@ -251,7 +251,7 @@ class Stream(object):
> >                  else:
> >                      self._retry_connect = False
> >                      retval = errno.ENOENT
> > -        else:
> > +        elif sys.platform == 'win32':
> >              # Windows only, if retry_connect is false, it means it's already
> >              # connected so we can set the value of retval to 0
> >              retval = 0
> >
> >
> > ---
> >  python/ovs/jsonrpc.py        |   6 +
> >  python/ovs/poller.py         |  72 ++++++---
> >  python/ovs/socket_util.py    |  31 +++-
> >  python/ovs/stream.py         | 351 ++++++++++++++++++++++++++++++++++++++++---
> >  python/ovs/unixctl/server.py |   4 +
> >  tests/test-jsonrpc.py        |  16 +-
> >  6 files changed, 436 insertions(+), 44 deletions(-)
> >
> > diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> > index 6300c67..5a11500 100644
> > --- a/python/ovs/jsonrpc.py
> > +++ b/python/ovs/jsonrpc.py
> > @@ -14,6 +14,7 @@
> >
> >  import errno
> >  import os
> > +import sys
> >
> >  import six
> >
> > @@ -274,6 +275,11 @@ class Connection(object):
> >                      except UnicodeError:
> >                          error = errno.EILSEQ
> >                  if error:
> > +                    if (sys.platform == "win32" and
> > +                            error == errno.WSAEWOULDBLOCK):
> > +                        # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                        # for EAGAIN on Unix.
> > +                        error = errno.EAGAIN
> >                      if error == errno.EAGAIN:
> >                          return error, None
> >                      else:
> > diff --git a/python/ovs/poller.py b/python/ovs/poller.py
> > index d7cb7d3..d836483 100644
> > --- a/python/ovs/poller.py
> > +++ b/python/ovs/poller.py
> > @@ -18,6 +18,10 @@ import ovs.vlog
> >  import select
> >  import socket
> >  import os
> > +import sys
> > +
> > +if sys.platform == "win32":
> > +    import ovs.winutils as winutils
> >
> >  try:
> >      from OpenSSL import SSL
> > @@ -62,7 +66,9 @@ class _SelectSelect(object):
> >          if SSL and isinstance(fd, SSL.Connection):
> >              fd = fd.fileno()
> >
> > -        assert isinstance(fd, int)
> > +        if sys.platform != 'win32':
> > +            # Skip this on Windows, it also register events
> > +            assert isinstance(fd, int)
> >          if events & POLLIN:
> >              self.rlist.append(fd)
> >              events &= ~POLLIN
> > @@ -73,28 +79,58 @@ class _SelectSelect(object):
> >              self.xlist.append(fd)
> >
> >      def poll(self, timeout):
> > -        if timeout == -1:
> > -            # epoll uses -1 for infinite timeout, select uses None.
> > -            timeout = None
> > -        else:
> > -            timeout = float(timeout) / 1000
> >          # XXX workaround a bug in eventlet
> >          # see https://github.com/eventlet/eventlet/pull/25
> >          if timeout == 0 and _using_eventlet_green_select():
> >              timeout = 0.1
> > +        if sys.platform == 'win32':
> > +            events = self.rlist + self.wlist + self.xlist
> > +            if not events:
> > +                return []
> > +            if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS:
> > +                raise WindowsError("Cannot handle more than maximum wait"
> > +                                   "objects\n")
> > +
> > +            # win32event.INFINITE timeout is -1
> > +            # timeout must be an int number, expressed in ms
> > +            if timeout == 0.1:
> > +                timeout = 100
> > +            else:
> > +                timeout = int(timeout)
> > +
> > +            # Wait until any of the events is set to signaled
> > +            try:
> > +                retval = winutils.win32event.WaitForMultipleObjects(
> > +                    events,
> > +                    False,  # Wait all
> > +                    timeout)
> > +            except winutils.pywintypes.error:
> > +                    return [(0, POLLERR)]
> >
> > -        rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist,
> > -                                            timeout)
> > -        events_dict = {}
> > -        for fd in rlist:
> > -            events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> > -        for fd in wlist:
> > -            events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> > -        for fd in xlist:
> > -            events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> > -                                                        POLLHUP |
> > -                                                        POLLNVAL)
> > -        return list(events_dict.items())
> > +            if retval == winutils.winerror.WAIT_TIMEOUT:
> > +                return []
> > +
> > +            return [(events[retval], 0)]
> > +        else:
> > +            if timeout == -1:
> > +                # epoll uses -1 for infinite timeout, select uses None.
> > +                timeout = None
> > +            else:
> > +                timeout = float(timeout) / 1000
> > +            rlist, wlist, xlist = select.select(self.rlist,
> > +                                                self.wlist,
> > +                                                self.xlist,
> > +                                                timeout)
> > +            events_dict = {}
> > +            for fd in rlist:
> > +                events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> > +            for fd in wlist:
> > +                events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> > +            for fd in xlist:
> > +                events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> > +                                                            POLLHUP |
> > +                                                            POLLNVAL)
> > +            return list(events_dict.items())
> >
> >
> >  SelectPoll = _SelectSelect
> > diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py
> > index b358b05..fb6cee4 100644
> > --- a/python/ovs/socket_util.py
> > +++ b/python/ovs/socket_util.py
> > @@ -17,6 +17,7 @@ import os
> >  import os.path
> >  import random
> >  import socket
> > +import sys
> >
> >  import six
> >  from six.moves import range
> > @@ -25,6 +26,10 @@ import ovs.fatal_signal
> >  import ovs.poller
> >  import ovs.vlog
> >
> > +if sys.platform == 'win32':
> > +    import ovs.winutils as winutils
> > +    import win32file
> > +
> >  vlog = ovs.vlog.Vlog("socket_util")
> >
> >
> > @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path, connect_path, short=False):
> >
> >  def check_connection_completion(sock):
> >      p = ovs.poller.SelectPoll()
> > -    p.register(sock, ovs.poller.POLLOUT)
> > +    if sys.platform == "win32":
> > +        event = winutils.get_new_event(None, False, True, None)
> > +        # Receive notification of readiness for writing, of completed
> > +        # connection or multipoint join operation, and of socket closure.
> > +        win32file.WSAEventSelect(sock, event,
> > +                                 win32file.FD_WRITE |
> > +                                 win32file.FD_CONNECT |
> > +                                 win32file.FD_CLOSE)
> > +        p.register(event, ovs.poller.POLLOUT)
> > +    else:
> > +        p.register(sock, ovs.poller.POLLOUT)
> >      pfds = p.poll(0)
> >      if len(pfds) == 1:
> >          revents = pfds[0][1]
> > @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port, dscp):
> >          try:
> >              sock.connect(address)
> >          except socket.error as e:
> > -            if get_exception_errno(e) != errno.EINPROGRESS:
> > +            error = get_exception_errno(e)
> > +            if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> > +                # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                # for EINPROGRESS on Unix.
> > +                error = errno.EINPROGRESS
> > +            if error != errno.EINPROGRESS:
> >                  raise
> >          return 0, sock
> >      except socket.error as e:
> > @@ -257,9 +277,12 @@ def get_null_fd():
> >      global null_fd
> >      if null_fd < 0:
> >          try:
> > -            null_fd = os.open("/dev/null", os.O_RDWR)
> > +            # os.devnull ensures compatibility with Windows, returns
> > +            # '/dev/null' for Unix and 'nul' for Windows
> > +            null_fd = os.open(os.devnull, os.O_RDWR)
> >          except OSError as e:
> > -            vlog.err("could not open /dev/null: %s" % os.strerror(e.errno))
> > +            vlog.err("could not open %s: %s" % (os.devnull,
> > +                                                os.strerror(e.errno)))
> >              return -e.errno
> >      return null_fd
> >
> > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > index cd57eb3..e8e5700 100644
> > --- a/python/ovs/stream.py
> > +++ b/python/ovs/stream.py
> > @@ -15,6 +15,7 @@
> >  import errno
> >  import os
> >  import socket
> > +import sys
> >
> >  import six
> >
> > @@ -27,6 +28,13 @@ try:
> >  except ImportError:
> >      SSL = None
> >
> > +if sys.platform == 'win32':
> > +    import ovs.winutils as winutils
> > +    import pywintypes
> > +    import win32event
> > +    import win32file
> > +    import win32pipe
> > +
> >  vlog = ovs.vlog.Vlog("stream")
> >
> >
> > @@ -63,6 +71,13 @@ class Stream(object):
> >      _SSL_certificate_file = None
> >      _SSL_ca_cert_file = None
> >
> > +    # Windows only
> > +    _write = None                # overlapped for write operation
> > +    _read = None                 # overlapped for read operation
> > +    _write_pending = False
> > +    _read_pending = False
> > +    _retry_connect = False
> > +
> >      @staticmethod
> >      def register_method(method, cls):
> >          Stream._SOCKET_METHODS[method + ":"] = cls
> > @@ -81,8 +96,23 @@ class Stream(object):
> >          otherwise False."""
> >          return bool(Stream._find_method(name))
> >
> > -    def __init__(self, socket, name, status):
> > +    def __init__(self, socket, name, status, pipe=None, is_server=False):
> >          self.socket = socket
> > +        self.pipe = pipe
> > +        if sys.platform == 'win32':
> > +            self._read = pywintypes.OVERLAPPED()
> > +            self._read.hEvent = winutils.get_new_event()
> > +            self._write = pywintypes.OVERLAPPED()
> > +            self._write.hEvent = winutils.get_new_event()
> > +            if pipe is not None:
> > +                # Flag to check if fd is a server HANDLE.  In the case of a
> > +                # server handle we have to issue a disconnect before closing
> > +                # the actual handle.
> > +                self._server = is_server
> > +                suffix = name.split(":", 1)[1]
> > +                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > +                self._pipename = winutils.get_pipe_name(suffix)
> > +
> >          http://self.name = name
> >          if status == errno.EAGAIN:
> >              self.state = Stream.__S_CONNECTING
> > @@ -120,6 +150,38 @@ class Stream(object):
> >          suffix = name.split(":", 1)[1]
> >          if name.startswith("unix:"):
> >              suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > +            if sys.platform == 'win32':
> > +                pipename = winutils.get_pipe_name(suffix)
> > +
> > +                if len(suffix) > 255:
> > +                    # Return invalid argument if the name is too long
> > +                    return errno.ENOENT, None
> > +
> > +                try:
> > +                    # In case of "unix:" argument, the assumption is that
> > +                    # there is a file created in the path (suffix).
> > +                    open(suffix, 'r').close()
> > +                except:
> > +                    return errno.ENOENT, None
> > +
> > +                try:
> > +                    npipe = winutils.create_file(pipename)
> > +                    try:
> > +                        winutils.set_pipe_mode(npipe,
> > +                                               win32pipe.PIPE_READMODE_BYTE)
> > +                    except pywintypes.error as e:
> > +                        return errno.ENOENT, None
> > +                except pywintypes.error as e:
> > +                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> > +                        # Pipe is busy, set the retry flag to true and retry
> > +                        # again during the connect function.
> > +                        Stream.retry_connect = True
> > +                        return 0, cls(None, name, errno.EAGAIN,
> > +                                      pipe=win32file.INVALID_HANDLE_VALUE,
> > +                                      is_server=False)
> > +                    return errno.ENOENT, None
> > +                return 0, cls(None, name, 0, pipe=npipe, is_server=False)
> > +
> >          error, sock = cls._open(suffix, dscp)
> >          if error:
> >              return error, None
> > @@ -145,6 +207,10 @@ class Stream(object):
> >          if not error:
> >              while True:
> >                  error = stream.connect()
> > +                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> > +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                    # for EAGAIN on Unix.
> > +                    error = errno.EAGAIN
> >                  if error != errno.EAGAIN:
> >                      break
> >                  stream.run()
> > @@ -152,7 +218,8 @@ class Stream(object):
> >                  stream.run_wait(poller)
> >                  stream.connect_wait(poller)
> >                  poller.block()
> > -            assert error != errno.EINPROGRESS
> > +            if stream.socket is not None:
> > +                assert error != errno.EINPROGRESS
> >
> >          if error and stream:
> >              stream.close()
> > @@ -160,11 +227,35 @@ class Stream(object):
> >          return error, stream
> >
> >      def close(self):
> > -        self.socket.close()
> > +        if self.socket is not None:
> > +            self.socket.close()
> > +        if self.pipe is not None:
> > +            if self._server:
> > +                win32pipe.DisconnectNamedPipe(self.pipe)
> > +            winutils.close_handle(self.pipe, vlog.warn)
> > +            winutils.close_handle(self._read.hEvent, vlog.warn)
> > +            winutils.close_handle(self._write.hEvent, vlog.warn)
> >
> >      def __scs_connecting(self):
> > -        retval = ovs.socket_util.check_connection_completion(self.socket)
> > -        assert retval != errno.EINPROGRESS
> > +        if self.socket is not None:
> > +            retval = ovs.socket_util.check_connection_completion(self.socket)
> > +            assert retval != errno.EINPROGRESS
> > +        elif sys.platform == 'win32' and self.retry_connect:
> > +            try:
> > +                self.pipe = winutils.create_file(self._pipename)
> > +                self._retry_connect = False
> > +                retval = 0
> > +            except pywintypes.error as e:
> > +                if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> > +                    retval = errno.EAGAIN
> > +                else:
> > +                    self._retry_connect = False
> > +                    retval = errno.ENOENT
> > +        else:
> > +            # Windows only, if retry_connect is false, it means it's already
> > +            # connected so we can set the value of retval to 0
> > +            retval = 0
> > +
> >          if retval == 0:
> >              self.state = Stream.__S_CONNECTED
> >          elif retval != errno.EAGAIN:
> > @@ -209,11 +300,63 @@ class Stream(object):
> >          elif n == 0:
> >              return (0, "")
> >
> > +        if sys.platform == 'win32' and self.socket is None:
> > +            return self.__recv_windows(n)
> > +
> >          try:
> >              return (0, self.socket.recv(n))
> >          except socket.error as e:
> >              return (ovs.socket_util.get_exception_errno(e), "")
> >
> > +    def __recv_windows(self, n):
> > +        if self._read_pending:
> > +            try:
> > +                nBytesRead = winutils.get_overlapped_result(self.pipe,
> > +                                                            self._read,
> > +                                                            False)
> > +                self._read_pending = False
> > +                recvBuffer = self._read_buffer[:nBytesRead]
> > +
> > +                return (0, winutils.get_decoded_buffer(recvBuffer))
> > +            except pywintypes.error as e:
> > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > +                    # The operation is still pending, try again
> > +                    self._read_pending = True
> > +                    return (errno.EAGAIN, "")
> > +                elif e.winerror in winutils.pipe_disconnected_errors:
> > +                    # If the pipe was disconnected, return 0.
> > +                    return (0, "")
> > +                else:
> > +                    return (errno.EINVAL, "")
> > +
> > +        (errCode, self._read_buffer) = winutils.read_file(self.pipe,
> > +                                                          n,
> > +                                                          self._read)
> > +        if errCode:
> > +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> > +                self._read_pending = True
> > +                return (errno.EAGAIN, "")
> > +            elif errCode in winutils.pipe_disconnected_errors:
> > +                # If the pipe was disconnected, return 0.
> > +                return (0, "")
> > +            else:
> > +                return (errCode, "")
> > +
> > +        try:
> > +            nBytesRead = winutils.get_overlapped_result(self.pipe,
> > +                                                        self._read,
> > +                                                        False)
> > +            winutils.win32event.SetEvent(self._read.hEvent)
> > +        except pywintypes.error as e:
> > +            if e.winerror in winutils.pipe_disconnected_errors:
> > +                # If the pipe was disconnected, return 0.
> > +                return (0, "")
> > +            else:
> > +                return (e.winerror, "")
> > +
> > +        recvBuffer = self._read_buffer[:nBytesRead]
> > +        return (0, winutils.get_decoded_buffer(recvBuffer))
> > +
> >      def send(self, buf):
> >          """Tries to send 'buf' on this stream.
> >
> > @@ -231,6 +374,9 @@ class Stream(object):
> >          elif len(buf) == 0:
> >              return 0
> >
> > +        if sys.platform == 'win32' and self.socket is None:
> > +            return self.__send_windows(buf)
> > +
> >          try:
> >              # Python 3 has separate types for strings and bytes.  We must have
> >              # bytes here.
> > @@ -240,6 +386,40 @@ class Stream(object):
> >          except socket.error as e:
> >              return -ovs.socket_util.get_exception_errno(e)
> >
> > +    def __send_windows(self, buf):
> > +        if self._write_pending:
> > +            try:
> > +                nBytesWritten = winutils.get_overlapped_result(self.pipe,
> > +                                                               self._write,
> > +                                                               False)
> > +                self._write_pending = False
> > +                return nBytesWritten
> > +            except pywintypes.error as e:
> > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > +                    # The operation is still pending, try again
> > +                    self._read_pending = True
> > +                    return -errno.EAGAIN
> > +                elif e.winerror in winutils.pipe_disconnected_errors:
> > +                    # If the pipe was disconnected, return connection reset.
> > +                    return -errno.ECONNRESET
> > +                else:
> > +                    return -errno.EINVAL
> > +
> > +        buf = winutils.get_encoded_buffer(buf)
> > +        self._write_pending = False
> > +        (errCode, nBytesWritten) = winutils.write_file(self.pipe,
> > +                                                       buf,
> > +                                                       self._write)
> > +        if errCode:
> > +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> > +                self._write_pending = True
> > +                return -errno.EAGAIN
> > +            if (not nBytesWritten and
> > +                    errCode in winutils.pipe_disconnected_errors):
> > +                # If the pipe was disconnected, return connection reset.
> > +                return -errno.ECONNRESET
> > +        return nBytesWritten
> > +
> >      def run(self):
> >          pass
> >
> > @@ -255,11 +435,52 @@ class Stream(object):
> >
> >          if self.state == Stream.__S_CONNECTING:
> >              wait = Stream.W_CONNECT
> > +
> > +        if sys.platform == 'win32':
> > +            self.__wait_windows(poller, wait)
> > +            return
> > +
> >          if wait == Stream.W_RECV:
> >              poller.fd_wait(self.socket, ovs.poller.POLLIN)
> >          else:
> >              poller.fd_wait(self.socket, ovs.poller.POLLOUT)
> >
> > +    def __wait_windows(self, poller, wait):
> > +        if self.socket is not None:
> > +            if wait == Stream.W_RECV:
> > +                read_flags = (win32file.FD_READ |
> > +                              win32file.FD_ACCEPT |
> > +                              win32file.FD_CLOSE)
> > +                try:
> > +                    win32file.WSAEventSelect(self.socket,
> > +                                             self._read.hEvent,
> > +                                             read_flags)
> > +                except pywintypes.error as e:
> > +                    vlog.err("failed to associate events with socket: %s"
> > +                             % e.strerror)
> > +                poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> > +            else:
> > +                write_flags = (win32file.FD_WRITE |
> > +                               win32file.FD_CONNECT |
> > +                               win32file.FD_CLOSE)
> > +                try:
> > +                    win32file.WSAEventSelect(self.socket,
> > +                                             self._write.hEvent,
> > +                                             write_flags)
> > +                except pywintypes.error as e:
> > +                    vlog.err("failed to associate events with socket: %s"
> > +                             % e.strerror)
> > +                poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> > +        else:
> > +            if wait == Stream.W_RECV:
> > +                if self._read:
> > +                    poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> > +            elif wait == Stream.W_SEND:
> > +                if self._write:
> > +                    poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> > +            elif wait == Stream.W_CONNECT:
> > +                return
> > +
> >      def connect_wait(self, poller):
> >          self.wait(poller, Stream.W_CONNECT)
> >
> > @@ -267,11 +488,22 @@ class Stream(object):
> >          self.wait(poller, Stream.W_RECV)
> >
> >      def send_wait(self, poller):
> > +        if sys.platform == 'win32':
> > +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> >          self.wait(poller, Stream.W_SEND)
> >
> >      def __del__(self):
> >          # Don't delete the file: we might have forked.
> > -        self.socket.close()
> > +        if self.socket is not None:
> > +            self.socket.close()
> > +        if self.pipe is not None:
> > +            # Check if there are any remaining valid handles and close them
> > +            if self.pipe:
> > +                winutils.close_handle(self.pipe)
> > +            if self._read.hEvent:
> > +                winutils.close_handle(self._read.hEvent)
> > +            if self._write.hEvent:
> > +                winutils.close_handle(self._write.hEvent)
> >
> >      @staticmethod
> >      def ssl_set_private_key_file(file_name):
> > @@ -287,6 +519,10 @@ class Stream(object):
> >
> >
> >  class PassiveStream(object):
> > +    # Windows only
> > +    connect = None                  # overlapped for read operation
> > +    connect_pending = False
> > +
> >      @staticmethod
> >      def is_valid_name(name):
> >          """Returns True if 'name' is a passive stream name in the form
> > @@ -294,9 +530,18 @@ class PassiveStream(object):
> >          "punix:" or "ptcp"), otherwise False."""
> >          return name.startswith("punix:") | name.startswith("ptcp:")
> >
> > -    def __init__(self, sock, name, bind_path):
> > +    def __init__(self, sock, name, bind_path, pipe=None):
> >          http://self.name = name
> > +        self.pipe = pipe
> >          self.socket = sock
> > +        if pipe is not None:
> > +            self.connect = pywintypes.OVERLAPPED()
> > +            self.connect.hEvent = winutils.get_new_event(bManualReset=True)
> > +            self.connect_pending = False
> > +            suffix = name.split(":", 1)[1]
> > +            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > +            self._pipename = winutils.get_pipe_name(suffix)
> > +
> >          self.bind_path = bind_path
> >
> >      @staticmethod
> > @@ -315,11 +560,27 @@ class PassiveStream(object):
> >          bind_path = name[6:]
> >          if name.startswith("punix:"):
> >              bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
> > -            error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> > -                                                           True, bind_path,
> > -                                                           None)
> > -            if error:
> > -                return error, None
> > +            if sys.platform != 'win32':
> > +                error, sock = ovs.socket_util.make_unix_socket(
> > +                    socket.SOCK_STREAM, True, bind_path, None)
> > +                if error:
> > +                    return error, None
> > +            else:
> > +                # Branch used only on Windows
> > +                try:
> > +                    open(bind_path, 'w').close()
> > +                except:
> > +                    return errno.ENOENT, None
> > +
> > +                pipename = winutils.get_pipe_name(bind_path)
> > +                if len(pipename) > 255:
> > +                    # Return invalid argument if the name is too long
> > +                    return errno.ENOENT, None
> > +
> > +                npipe = winutils.create_named_pipe(pipename)
> > +                if not npipe:
> > +                    return errno.ENOENT, None
> > +                return 0, PassiveStream(None, name, bind_path, pipe=npipe)
> >
> >          elif name.startswith("ptcp:"):
> >              sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > @@ -341,7 +602,11 @@ class PassiveStream(object):
> >
> >      def close(self):
> >          """Closes this PassiveStream."""
> > -        self.socket.close()
> > +        if self.socket is not None:
> > +            self.socket.close()
> > +        if self.pipe is not None:
> > +            winutils.close_handle(self.pipe, vlog.warn)
> > +            winutils.close_handle(self.connect.hEvent, vlog.warn)
> >          if self.bind_path is not None:
> >              ovs.fatal_signal.unlink_file_now(self.bind_path)
> >              self.bind_path = None
> > @@ -354,28 +619,80 @@ class PassiveStream(object):
> >
> >          Will not block waiting for a connection.  If no connection is ready to
> >          be accepted, returns (errno.EAGAIN, None) immediately."""
> > -
> > +        if sys.platform == 'win32' and self.socket is None:
> > +            return self.__accept_windows()
> >          while True:
> >              try:
> >                  sock, addr = self.socket.accept()
> >                  ovs.socket_util.set_nonblocking(sock)
> > -                if (sock.family == socket.AF_UNIX):
> > +                if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
> >                      return 0, Stream(sock, "unix:%s" % addr, 0)
> >                  return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
> >                                                         str(addr[1])), 0)
> >              except socket.error as e:
> >                  error = ovs.socket_util.get_exception_errno(e)
> > +                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> > +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                    # for EAGAIN on Unix.
> > +                    error = errno.EAGAIN
> >                  if error != errno.EAGAIN:
> >                      # XXX rate-limit
> >                      vlog.dbg("accept: %s" % os.strerror(error))
> >                  return error, None
> >
> > +    def __accept_windows(self):
> > +        if self.connect_pending:
> > +            try:
> > +                winutils.get_overlapped_result(self.pipe, self.connect, False)
> > +            except pywintypes.error as e:
> > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > +                    # The operation is still pending, try again
> > +                    self.connect_pending = True
> > +                    return errno.EAGAIN, None
> > +                else:
> > +                    if self.pipe:
> > +                        win32pipe.DisconnectNamedPipe(self.pipe)
> > +                    return errno.EINVAL, None
> > +            self.connect_pending = False
> > +
> > +        error = winutils.connect_named_pipe(self.pipe, self.connect)
> > +        if error:
> > +            if error == winutils.winerror.ERROR_IO_PENDING:
> > +                self.connect_pending = True
> > +                return errno.EAGAIN, None
> > +            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
> > +                if self.pipe:
> > +                    win32pipe.DisconnectNamedPipe(self.pipe)
> > +                self.connect_pending = False
> > +                return errno.EINVAL, None
> > +            else:
> > +                win32event.SetEvent(self.connect.hEvent)
> > +
> > +        npipe = winutils.create_named_pipe(self._pipename)
> > +        if not npipe:
> > +            return errno.ENOENT, None
> > +
> > +        old_pipe = self.pipe
> > +        self.pipe = npipe
> > +        winutils.win32event.ResetEvent(self.connect.hEvent)
> > +        return 0, Stream(None, http://self.name, 0, pipe=old_pipe)
> > +
> >      def wait(self, poller):
> > -        poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > +        if sys.platform != 'win32' or self.socket is not None:
> > +            poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > +        else:
> > +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> >
> >      def __del__(self):
> >          # Don't delete the file: we might have forked.
> > -        self.socket.close()
> > +        if self.socket is not None:
> > +            self.socket.close()
> > +        if self.pipe is not None:
> > +            # Check if there are any remaining valid handles and close them
> > +            if self.pipe:
> > +                winutils.close_handle(self.pipe)
> > +            if self._connect.hEvent:
> > +                winutils.close_handle(self._read.hEvent)
> >
> >
> >  def usage(name):
> > diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py
> > index 8595ed8..3f3e051 100644
> > --- a/python/ovs/unixctl/server.py
> > +++ b/python/ovs/unixctl/server.py
> > @@ -148,6 +148,10 @@ class UnixctlServer(object):
> >      def run(self):
> >          for _ in range(10):
> >              error, stream = self._listener.accept()
> > +            if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
> > +                # WSAEWOULDBLOCK would be the equivalent on Windows
> > +                # for EAGAIN on Unix.
> > +                error = errno.EAGAIN
> >              if not error:
> >                  rpc = ovs.jsonrpc.Connection(stream)
> >                  self._conns.append(UnixctlConnection(rpc))
> > diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py
> > index 18634e6..3eabcd7 100644
> > --- a/tests/test-jsonrpc.py
> > +++ b/tests/test-jsonrpc.py
> > @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg):
> >
> >
> >  def do_listen(name):
> > -    error, pstream = ovs.stream.PassiveStream.open(name)
> > -    if error:
> > -        sys.stderr.write("could not listen on \"%s\": %s\n"
> > -                         % (name, os.strerror(error)))
> > -        sys.exit(1)
> > +    if sys.platform != 'win32' or (
> > +            ovs.daemon._detach and ovs.daemon._detached):
> > +        # On Windows the child is a new process created which should be the
> > +        # one that creates the PassiveStream. Without this check, the new
> > +        # child process will create a new PassiveStream overwriting the one
> > +        # that the parent process created.
> > +        error, pstream = ovs.stream.PassiveStream.open(name)
> > +        if error:
> > +            sys.stderr.write("could not listen on \"%s\": %s\n"
> > +                             % (name, os.strerror(error)))
> > +            sys.exit(1)
> >
> >      ovs.daemon.daemonize()
> >
> > --
> > 2.10.0.windows.1
> > _______________________________________________
> > dev mailing list
> > mailto:mailto:dev@openvswitch.org
> > https://mail.openvswitch.org/mailman/listinfo/ovs-dev
Gurucharan Shetty Jan. 3, 2017, 7:35 p.m. UTC | #4
On 3 January 2017 at 11:16, Alin Balutoiu <abalutoiu@cloudbasesolutions.com>
wrote:

> Ok, I'll include that in the next version.
> Mind if I respin the series after you take a look over the rest of the
> patches?
>

I am done looking at the rest of the patches. When you respin, please fix
the spelling of "intented" in the first patch of the series. Also, in the
first patch, edit (and add) AUTHORS file to include yourself.

Thanks.


>
> > From: Guru Shetty [mailto:guru@ovn.org]
> > Sent: Tuesday, January 3, 2017 9:09 PM
> > To: Alin Balutoiu <abalutoiu@cloudbasesolutions.com>
> > Cc: dev@openvswitch.org
> > Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets
> to Windows
> >
> >
> >
> > On 3 January 2017 at 11:03, Alin Balutoiu <mailto:abalutoiu@
> cloudbasesolutions.com> wrote:
> > Thanks for the comment.
> > The socket attribute of the class Stream cannot be None if the code runs
> on Unix.
> > Therefore the condition "self.socket is not None" is True only if
> sockets are not used,
> > and named pipes are used instead (i.e. it runs on Windows).
> >
> > However, if you prefer I can replace it with the following code:
> > I like it better with the below piece. So please go ahead and send it as
> part of the next version.
> >
> >     if self.socket is not None:
> >         retval = ovs.socket_util.check_connection_completion(self.
> socket)
> >         assert retval != errno.EINPROGRESS
> >     elif sys.platform == 'win32':
> >         if self.retry_connect:
> >             try:
> >                 self.pipe = winutils.create_file(self._pipename)
> >                 self._retry_connect = False
> >                 retval = 0
> >             except pywintypes.error as e:
> >                 if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> >                     retval = errno.EAGAIN
> >                 else:
> >                     self._retry_connect = False
> >                     retval = errno.ENOENT
> >         else:
> >             # If retry_connect is false, it means it's already
> >             # connected so we can set the value of retval to 0
> >             retval = 0
> >
> >
> > > From: Guru Shetty [mailto:mailto:guru@ovn.org]
> > > Sent: Tuesday, January 3, 2017 8:17 PM
> > > To: Alin Balutoiu <mailto:abalutoiu@cloudbasesolutions.com>
> > > Cc: mailto:dev@openvswitch.org
> > > Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX
> sockets to Windows
> > >
> > >
> > >
> > > On 3 January 2017 at 08:46, Alin Balutoiu <mailto:mailto:abalutoiu@
> cloudbasesolutions.com> wrote:
> > > From: Alin Balutoiu <mailto:mailto:abalutoiu@cloudbasesolutions.com>
> > >
> > > Unix sockets (AF_UNIX) are not supported on Windows.
> > > The replacement of Unix sockets on Windows is implemented
> > > using named pipes, we are trying to mimic the behaviour
> > > of unix sockets.
> > >
> > > Instead of using Unix sockets to communicate
> > > between components Named Pipes are used. This
> > > makes the python sockets compatible with the
> > > Named Pipe used in Windows applications.
> > >
> > > Signed-off-by: Paul-Daniel Boca <mailto:mailto:pboca@
> cloudbasesolutions.com>
> > > Signed-off-by: Alin Balutoiu <mailto:mailto:abalutoiu@
> cloudbasesolutions.com>
> > > Acked-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> > > Tested-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> > > ---
> > > V2: No changes.
> > > V3: Changed Signed-off-by name and added previous Acked-by's,
> Tested-by's.
> > >
> > > I intend to add the following diff:
> > >
> > > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > > index e8e5700..af35afd 100644
> > > --- a/python/ovs/stream.py
> > > +++ b/python/ovs/stream.py
> > > @@ -251,7 +251,7 @@ class Stream(object):
> > >                  else:
> > >                      self._retry_connect = False
> > >                      retval = errno.ENOENT
> > > -        else:
> > > +        elif sys.platform == 'win32':
> > >              # Windows only, if retry_connect is false, it means it's
> already
> > >              # connected so we can set the value of retval to 0
> > >              retval = 0
> > >
> > >
> > > ---
> > >  python/ovs/jsonrpc.py        |   6 +
> > >  python/ovs/poller.py         |  72 ++++++---
> > >  python/ovs/socket_util.py    |  31 +++-
> > >  python/ovs/stream.py         | 351 ++++++++++++++++++++++++++++++
> ++++++++++---
> > >  python/ovs/unixctl/server.py |   4 +
> > >  tests/test-jsonrpc.py        |  16 +-
> > >  6 files changed, 436 insertions(+), 44 deletions(-)
> > >
> > > diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> > > index 6300c67..5a11500 100644
> > > --- a/python/ovs/jsonrpc.py
> > > +++ b/python/ovs/jsonrpc.py
> > > @@ -14,6 +14,7 @@
> > >
> > >  import errno
> > >  import os
> > > +import sys
> > >
> > >  import six
> > >
> > > @@ -274,6 +275,11 @@ class Connection(object):
> > >                      except UnicodeError:
> > >                          error = errno.EILSEQ
> > >                  if error:
> > > +                    if (sys.platform == "win32" and
> > > +                            error == errno.WSAEWOULDBLOCK):
> > > +                        # WSAEWOULDBLOCK would be the equivalent on
> Windows
> > > +                        # for EAGAIN on Unix.
> > > +                        error = errno.EAGAIN
> > >                      if error == errno.EAGAIN:
> > >                          return error, None
> > >                      else:
> > > diff --git a/python/ovs/poller.py b/python/ovs/poller.py
> > > index d7cb7d3..d836483 100644
> > > --- a/python/ovs/poller.py
> > > +++ b/python/ovs/poller.py
> > > @@ -18,6 +18,10 @@ import ovs.vlog
> > >  import select
> > >  import socket
> > >  import os
> > > +import sys
> > > +
> > > +if sys.platform == "win32":
> > > +    import ovs.winutils as winutils
> > >
> > >  try:
> > >      from OpenSSL import SSL
> > > @@ -62,7 +66,9 @@ class _SelectSelect(object):
> > >          if SSL and isinstance(fd, SSL.Connection):
> > >              fd = fd.fileno()
> > >
> > > -        assert isinstance(fd, int)
> > > +        if sys.platform != 'win32':
> > > +            # Skip this on Windows, it also register events
> > > +            assert isinstance(fd, int)
> > >          if events & POLLIN:
> > >              self.rlist.append(fd)
> > >              events &= ~POLLIN
> > > @@ -73,28 +79,58 @@ class _SelectSelect(object):
> > >              self.xlist.append(fd)
> > >
> > >      def poll(self, timeout):
> > > -        if timeout == -1:
> > > -            # epoll uses -1 for infinite timeout, select uses None.
> > > -            timeout = None
> > > -        else:
> > > -            timeout = float(timeout) / 1000
> > >          # XXX workaround a bug in eventlet
> > >          # see https://github.com/eventlet/eventlet/pull/25
> > >          if timeout == 0 and _using_eventlet_green_select():
> > >              timeout = 0.1
> > > +        if sys.platform == 'win32':
> > > +            events = self.rlist + self.wlist + self.xlist
> > > +            if not events:
> > > +                return []
> > > +            if len(events) > winutils.win32event.MAXIMUM_
> WAIT_OBJECTS:
> > > +                raise WindowsError("Cannot handle more than maximum
> wait"
> > > +                                   "objects\n")
> > > +
> > > +            # win32event.INFINITE timeout is -1
> > > +            # timeout must be an int number, expressed in ms
> > > +            if timeout == 0.1:
> > > +                timeout = 100
> > > +            else:
> > > +                timeout = int(timeout)
> > > +
> > > +            # Wait until any of the events is set to signaled
> > > +            try:
> > > +                retval = winutils.win32event.WaitForMultipleObjects(
> > > +                    events,
> > > +                    False,  # Wait all
> > > +                    timeout)
> > > +            except winutils.pywintypes.error:
> > > +                    return [(0, POLLERR)]
> > >
> > > -        rlist, wlist, xlist = select.select(self.rlist, self.wlist,
> self.xlist,
> > > -                                            timeout)
> > > -        events_dict = {}
> > > -        for fd in rlist:
> > > -            events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> > > -        for fd in wlist:
> > > -            events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> > > -        for fd in xlist:
> > > -            events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> > > -                                                        POLLHUP |
> > > -                                                        POLLNVAL)
> > > -        return list(events_dict.items())
> > > +            if retval == winutils.winerror.WAIT_TIMEOUT:
> > > +                return []
> > > +
> > > +            return [(events[retval], 0)]
> > > +        else:
> > > +            if timeout == -1:
> > > +                # epoll uses -1 for infinite timeout, select uses
> None.
> > > +                timeout = None
> > > +            else:
> > > +                timeout = float(timeout) / 1000
> > > +            rlist, wlist, xlist = select.select(self.rlist,
> > > +                                                self.wlist,
> > > +                                                self.xlist,
> > > +                                                timeout)
> > > +            events_dict = {}
> > > +            for fd in rlist:
> > > +                events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> > > +            for fd in wlist:
> > > +                events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> > > +            for fd in xlist:
> > > +                events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> > > +                                                            POLLHUP |
> > > +                                                            POLLNVAL)
> > > +            return list(events_dict.items())
> > >
> > >
> > >  SelectPoll = _SelectSelect
> > > diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py
> > > index b358b05..fb6cee4 100644
> > > --- a/python/ovs/socket_util.py
> > > +++ b/python/ovs/socket_util.py
> > > @@ -17,6 +17,7 @@ import os
> > >  import os.path
> > >  import random
> > >  import socket
> > > +import sys
> > >
> > >  import six
> > >  from six.moves import range
> > > @@ -25,6 +26,10 @@ import ovs.fatal_signal
> > >  import ovs.poller
> > >  import ovs.vlog
> > >
> > > +if sys.platform == 'win32':
> > > +    import ovs.winutils as winutils
> > > +    import win32file
> > > +
> > >  vlog = ovs.vlog.Vlog("socket_util")
> > >
> > >
> > > @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path,
> connect_path, short=False):
> > >
> > >  def check_connection_completion(sock):
> > >      p = ovs.poller.SelectPoll()
> > > -    p.register(sock, ovs.poller.POLLOUT)
> > > +    if sys.platform == "win32":
> > > +        event = winutils.get_new_event(None, False, True, None)
> > > +        # Receive notification of readiness for writing, of completed
> > > +        # connection or multipoint join operation, and of socket
> closure.
> > > +        win32file.WSAEventSelect(sock, event,
> > > +                                 win32file.FD_WRITE |
> > > +                                 win32file.FD_CONNECT |
> > > +                                 win32file.FD_CLOSE)
> > > +        p.register(event, ovs.poller.POLLOUT)
> > > +    else:
> > > +        p.register(sock, ovs.poller.POLLOUT)
> > >      pfds = p.poll(0)
> > >      if len(pfds) == 1:
> > >          revents = pfds[0][1]
> > > @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port,
> dscp):
> > >          try:
> > >              sock.connect(address)
> > >          except socket.error as e:
> > > -            if get_exception_errno(e) != errno.EINPROGRESS:
> > > +            error = get_exception_errno(e)
> > > +            if sys.platform == 'win32' and error ==
> errno.WSAEWOULDBLOCK:
> > > +                # WSAEWOULDBLOCK would be the equivalent on Windows
> > > +                # for EINPROGRESS on Unix.
> > > +                error = errno.EINPROGRESS
> > > +            if error != errno.EINPROGRESS:
> > >                  raise
> > >          return 0, sock
> > >      except socket.error as e:
> > > @@ -257,9 +277,12 @@ def get_null_fd():
> > >      global null_fd
> > >      if null_fd < 0:
> > >          try:
> > > -            null_fd = os.open("/dev/null", os.O_RDWR)
> > > +            # os.devnull ensures compatibility with Windows, returns
> > > +            # '/dev/null' for Unix and 'nul' for Windows
> > > +            null_fd = os.open(os.devnull, os.O_RDWR)
> > >          except OSError as e:
> > > -            vlog.err("could not open /dev/null: %s" %
> os.strerror(e.errno))
> > > +            vlog.err("could not open %s: %s" % (os.devnull,
> > > +                                                os.strerror(e.errno)))
> > >              return -e.errno
> > >      return null_fd
> > >
> > > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > > index cd57eb3..e8e5700 100644
> > > --- a/python/ovs/stream.py
> > > +++ b/python/ovs/stream.py
> > > @@ -15,6 +15,7 @@
> > >  import errno
> > >  import os
> > >  import socket
> > > +import sys
> > >
> > >  import six
> > >
> > > @@ -27,6 +28,13 @@ try:
> > >  except ImportError:
> > >      SSL = None
> > >
> > > +if sys.platform == 'win32':
> > > +    import ovs.winutils as winutils
> > > +    import pywintypes
> > > +    import win32event
> > > +    import win32file
> > > +    import win32pipe
> > > +
> > >  vlog = ovs.vlog.Vlog("stream")
> > >
> > >
> > > @@ -63,6 +71,13 @@ class Stream(object):
> > >      _SSL_certificate_file = None
> > >      _SSL_ca_cert_file = None
> > >
> > > +    # Windows only
> > > +    _write = None                # overlapped for write operation
> > > +    _read = None                 # overlapped for read operation
> > > +    _write_pending = False
> > > +    _read_pending = False
> > > +    _retry_connect = False
> > > +
> > >      @staticmethod
> > >      def register_method(method, cls):
> > >          Stream._SOCKET_METHODS[method + ":"] = cls
> > > @@ -81,8 +96,23 @@ class Stream(object):
> > >          otherwise False."""
> > >          return bool(Stream._find_method(name))
> > >
> > > -    def __init__(self, socket, name, status):
> > > +    def __init__(self, socket, name, status, pipe=None,
> is_server=False):
> > >          self.socket = socket
> > > +        self.pipe = pipe
> > > +        if sys.platform == 'win32':
> > > +            self._read = pywintypes.OVERLAPPED()
> > > +            self._read.hEvent = winutils.get_new_event()
> > > +            self._write = pywintypes.OVERLAPPED()
> > > +            self._write.hEvent = winutils.get_new_event()
> > > +            if pipe is not None:
> > > +                # Flag to check if fd is a server HANDLE.  In the
> case of a
> > > +                # server handle we have to issue a disconnect before
> closing
> > > +                # the actual handle.
> > > +                self._server = is_server
> > > +                suffix = name.split(":", 1)[1]
> > > +                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR,
> suffix)
> > > +                self._pipename = winutils.get_pipe_name(suffix)
> > > +
> > >          http://self.name = name
> > >          if status == errno.EAGAIN:
> > >              self.state = Stream.__S_CONNECTING
> > > @@ -120,6 +150,38 @@ class Stream(object):
> > >          suffix = name.split(":", 1)[1]
> > >          if name.startswith("unix:"):
> > >              suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > > +            if sys.platform == 'win32':
> > > +                pipename = winutils.get_pipe_name(suffix)
> > > +
> > > +                if len(suffix) > 255:
> > > +                    # Return invalid argument if the name is too long
> > > +                    return errno.ENOENT, None
> > > +
> > > +                try:
> > > +                    # In case of "unix:" argument, the assumption is
> that
> > > +                    # there is a file created in the path (suffix).
> > > +                    open(suffix, 'r').close()
> > > +                except:
> > > +                    return errno.ENOENT, None
> > > +
> > > +                try:
> > > +                    npipe = winutils.create_file(pipename)
> > > +                    try:
> > > +                        winutils.set_pipe_mode(npipe,
> > > +
>  win32pipe.PIPE_READMODE_BYTE)
> > > +                    except pywintypes.error as e:
> > > +                        return errno.ENOENT, None
> > > +                except pywintypes.error as e:
> > > +                    if e.winerror == winutils.winerror.ERROR_PIPE_
> BUSY:
> > > +                        # Pipe is busy, set the retry flag to true
> and retry
> > > +                        # again during the connect function.
> > > +                        Stream.retry_connect = True
> > > +                        return 0, cls(None, name, errno.EAGAIN,
> > > +                                      pipe=win32file.INVALID_HANDLE_
> VALUE,
> > > +                                      is_server=False)
> > > +                    return errno.ENOENT, None
> > > +                return 0, cls(None, name, 0, pipe=npipe,
> is_server=False)
> > > +
> > >          error, sock = cls._open(suffix, dscp)
> > >          if error:
> > >              return error, None
> > > @@ -145,6 +207,10 @@ class Stream(object):
> > >          if not error:
> > >              while True:
> > >                  error = stream.connect()
> > > +                if sys.platform == 'win32' and error ==
> errno.WSAEWOULDBLOCK:
> > > +                    # WSAEWOULDBLOCK would be the equivalent on
> Windows
> > > +                    # for EAGAIN on Unix.
> > > +                    error = errno.EAGAIN
> > >                  if error != errno.EAGAIN:
> > >                      break
> > >                  stream.run()
> > > @@ -152,7 +218,8 @@ class Stream(object):
> > >                  stream.run_wait(poller)
> > >                  stream.connect_wait(poller)
> > >                  poller.block()
> > > -            assert error != errno.EINPROGRESS
> > > +            if stream.socket is not None:
> > > +                assert error != errno.EINPROGRESS
> > >
> > >          if error and stream:
> > >              stream.close()
> > > @@ -160,11 +227,35 @@ class Stream(object):
> > >          return error, stream
> > >
> > >      def close(self):
> > > -        self.socket.close()
> > > +        if self.socket is not None:
> > > +            self.socket.close()
> > > +        if self.pipe is not None:
> > > +            if self._server:
> > > +                win32pipe.DisconnectNamedPipe(self.pipe)
> > > +            winutils.close_handle(self.pipe, vlog.warn)
> > > +            winutils.close_handle(self._read.hEvent, vlog.warn)
> > > +            winutils.close_handle(self._write.hEvent, vlog.warn)
> > >
> > >      def __scs_connecting(self):
> > > -        retval = ovs.socket_util.check_connection_completion(self.
> socket)
> > > -        assert retval != errno.EINPROGRESS
> > > +        if self.socket is not None:
> > > +            retval = ovs.socket_util.check_
> connection_completion(self.socket)
> > > +            assert retval != errno.EINPROGRESS
> > > +        elif sys.platform == 'win32' and self.retry_connect:
> > > +            try:
> > > +                self.pipe = winutils.create_file(self._pipename)
> > > +                self._retry_connect = False
> > > +                retval = 0
> > > +            except pywintypes.error as e:
> > > +                if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> > > +                    retval = errno.EAGAIN
> > > +                else:
> > > +                    self._retry_connect = False
> > > +                    retval = errno.ENOENT
> > > +        else:
> > > +            # Windows only, if retry_connect is false, it means it's
> already
> > > +            # connected so we can set the value of retval to 0
> > > +            retval = 0
> > > +
> > >          if retval == 0:
> > >              self.state = Stream.__S_CONNECTED
> > >          elif retval != errno.EAGAIN:
> > > @@ -209,11 +300,63 @@ class Stream(object):
> > >          elif n == 0:
> > >              return (0, "")
> > >
> > > +        if sys.platform == 'win32' and self.socket is None:
> > > +            return self.__recv_windows(n)
> > > +
> > >          try:
> > >              return (0, self.socket.recv(n))
> > >          except socket.error as e:
> > >              return (ovs.socket_util.get_exception_errno(e), "")
> > >
> > > +    def __recv_windows(self, n):
> > > +        if self._read_pending:
> > > +            try:
> > > +                nBytesRead = winutils.get_overlapped_
> result(self.pipe,
> > > +
> self._read,
> > > +                                                            False)
> > > +                self._read_pending = False
> > > +                recvBuffer = self._read_buffer[:nBytesRead]
> > > +
> > > +                return (0, winutils.get_decoded_buffer(recvBuffer))
> > > +            except pywintypes.error as e:
> > > +                if e.winerror == winutils.winerror.ERROR_IO_
> INCOMPLETE:
> > > +                    # The operation is still pending, try again
> > > +                    self._read_pending = True
> > > +                    return (errno.EAGAIN, "")
> > > +                elif e.winerror in winutils.pipe_disconnected_errors:
> > > +                    # If the pipe was disconnected, return 0.
> > > +                    return (0, "")
> > > +                else:
> > > +                    return (errno.EINVAL, "")
> > > +
> > > +        (errCode, self._read_buffer) = winutils.read_file(self.pipe,
> > > +                                                          n,
> > > +                                                          self._read)
> > > +        if errCode:
> > > +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> > > +                self._read_pending = True
> > > +                return (errno.EAGAIN, "")
> > > +            elif errCode in winutils.pipe_disconnected_errors:
> > > +                # If the pipe was disconnected, return 0.
> > > +                return (0, "")
> > > +            else:
> > > +                return (errCode, "")
> > > +
> > > +        try:
> > > +            nBytesRead = winutils.get_overlapped_result(self.pipe,
> > > +                                                        self._read,
> > > +                                                        False)
> > > +            winutils.win32event.SetEvent(self._read.hEvent)
> > > +        except pywintypes.error as e:
> > > +            if e.winerror in winutils.pipe_disconnected_errors:
> > > +                # If the pipe was disconnected, return 0.
> > > +                return (0, "")
> > > +            else:
> > > +                return (e.winerror, "")
> > > +
> > > +        recvBuffer = self._read_buffer[:nBytesRead]
> > > +        return (0, winutils.get_decoded_buffer(recvBuffer))
> > > +
> > >      def send(self, buf):
> > >          """Tries to send 'buf' on this stream.
> > >
> > > @@ -231,6 +374,9 @@ class Stream(object):
> > >          elif len(buf) == 0:
> > >              return 0
> > >
> > > +        if sys.platform == 'win32' and self.socket is None:
> > > +            return self.__send_windows(buf)
> > > +
> > >          try:
> > >              # Python 3 has separate types for strings and bytes.  We
> must have
> > >              # bytes here.
> > > @@ -240,6 +386,40 @@ class Stream(object):
> > >          except socket.error as e:
> > >              return -ovs.socket_util.get_exception_errno(e)
> > >
> > > +    def __send_windows(self, buf):
> > > +        if self._write_pending:
> > > +            try:
> > > +                nBytesWritten = winutils.get_overlapped_
> result(self.pipe,
> > > +
>  self._write,
> > > +                                                               False)
> > > +                self._write_pending = False
> > > +                return nBytesWritten
> > > +            except pywintypes.error as e:
> > > +                if e.winerror == winutils.winerror.ERROR_IO_
> INCOMPLETE:
> > > +                    # The operation is still pending, try again
> > > +                    self._read_pending = True
> > > +                    return -errno.EAGAIN
> > > +                elif e.winerror in winutils.pipe_disconnected_errors:
> > > +                    # If the pipe was disconnected, return connection
> reset.
> > > +                    return -errno.ECONNRESET
> > > +                else:
> > > +                    return -errno.EINVAL
> > > +
> > > +        buf = winutils.get_encoded_buffer(buf)
> > > +        self._write_pending = False
> > > +        (errCode, nBytesWritten) = winutils.write_file(self.pipe,
> > > +                                                       buf,
> > > +                                                       self._write)
> > > +        if errCode:
> > > +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> > > +                self._write_pending = True
> > > +                return -errno.EAGAIN
> > > +            if (not nBytesWritten and
> > > +                    errCode in winutils.pipe_disconnected_errors):
> > > +                # If the pipe was disconnected, return connection
> reset.
> > > +                return -errno.ECONNRESET
> > > +        return nBytesWritten
> > > +
> > >      def run(self):
> > >          pass
> > >
> > > @@ -255,11 +435,52 @@ class Stream(object):
> > >
> > >          if self.state == Stream.__S_CONNECTING:
> > >              wait = Stream.W_CONNECT
> > > +
> > > +        if sys.platform == 'win32':
> > > +            self.__wait_windows(poller, wait)
> > > +            return
> > > +
> > >          if wait == Stream.W_RECV:
> > >              poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > >          else:
> > >              poller.fd_wait(self.socket, ovs.poller.POLLOUT)
> > >
> > > +    def __wait_windows(self, poller, wait):
> > > +        if self.socket is not None:
> > > +            if wait == Stream.W_RECV:
> > > +                read_flags = (win32file.FD_READ |
> > > +                              win32file.FD_ACCEPT |
> > > +                              win32file.FD_CLOSE)
> > > +                try:
> > > +                    win32file.WSAEventSelect(self.socket,
> > > +                                             self._read.hEvent,
> > > +                                             read_flags)
> > > +                except pywintypes.error as e:
> > > +                    vlog.err("failed to associate events with socket:
> %s"
> > > +                             % e.strerror)
> > > +                poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> > > +            else:
> > > +                write_flags = (win32file.FD_WRITE |
> > > +                               win32file.FD_CONNECT |
> > > +                               win32file.FD_CLOSE)
> > > +                try:
> > > +                    win32file.WSAEventSelect(self.socket,
> > > +                                             self._write.hEvent,
> > > +                                             write_flags)
> > > +                except pywintypes.error as e:
> > > +                    vlog.err("failed to associate events with socket:
> %s"
> > > +                             % e.strerror)
> > > +                poller.fd_wait(self._write.hEvent,
> ovs.poller.POLLOUT)
> > > +        else:
> > > +            if wait == Stream.W_RECV:
> > > +                if self._read:
> > > +                    poller.fd_wait(self._read.hEvent,
> ovs.poller.POLLIN)
> > > +            elif wait == Stream.W_SEND:
> > > +                if self._write:
> > > +                    poller.fd_wait(self._write.hEvent,
> ovs.poller.POLLOUT)
> > > +            elif wait == Stream.W_CONNECT:
> > > +                return
> > > +
> > >      def connect_wait(self, poller):
> > >          self.wait(poller, Stream.W_CONNECT)
> > >
> > > @@ -267,11 +488,22 @@ class Stream(object):
> > >          self.wait(poller, Stream.W_RECV)
> > >
> > >      def send_wait(self, poller):
> > > +        if sys.platform == 'win32':
> > > +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> > >          self.wait(poller, Stream.W_SEND)
> > >
> > >      def __del__(self):
> > >          # Don't delete the file: we might have forked.
> > > -        self.socket.close()
> > > +        if self.socket is not None:
> > > +            self.socket.close()
> > > +        if self.pipe is not None:
> > > +            # Check if there are any remaining valid handles and
> close them
> > > +            if self.pipe:
> > > +                winutils.close_handle(self.pipe)
> > > +            if self._read.hEvent:
> > > +                winutils.close_handle(self._read.hEvent)
> > > +            if self._write.hEvent:
> > > +                winutils.close_handle(self._write.hEvent)
> > >
> > >      @staticmethod
> > >      def ssl_set_private_key_file(file_name):
> > > @@ -287,6 +519,10 @@ class Stream(object):
> > >
> > >
> > >  class PassiveStream(object):
> > > +    # Windows only
> > > +    connect = None                  # overlapped for read operation
> > > +    connect_pending = False
> > > +
> > >      @staticmethod
> > >      def is_valid_name(name):
> > >          """Returns True if 'name' is a passive stream name in the form
> > > @@ -294,9 +530,18 @@ class PassiveStream(object):
> > >          "punix:" or "ptcp"), otherwise False."""
> > >          return name.startswith("punix:") | name.startswith("ptcp:")
> > >
> > > -    def __init__(self, sock, name, bind_path):
> > > +    def __init__(self, sock, name, bind_path, pipe=None):
> > >          http://self.name = name
> > > +        self.pipe = pipe
> > >          self.socket = sock
> > > +        if pipe is not None:
> > > +            self.connect = pywintypes.OVERLAPPED()
> > > +            self.connect.hEvent = winutils.get_new_event(
> bManualReset=True)
> > > +            self.connect_pending = False
> > > +            suffix = name.split(":", 1)[1]
> > > +            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > > +            self._pipename = winutils.get_pipe_name(suffix)
> > > +
> > >          self.bind_path = bind_path
> > >
> > >      @staticmethod
> > > @@ -315,11 +560,27 @@ class PassiveStream(object):
> > >          bind_path = name[6:]
> > >          if name.startswith("punix:"):
> > >              bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR,
> bind_path)
> > > -            error, sock = ovs.socket_util.make_unix_
> socket(socket.SOCK_STREAM,
> > > -                                                           True,
> bind_path,
> > > -                                                           None)
> > > -            if error:
> > > -                return error, None
> > > +            if sys.platform != 'win32':
> > > +                error, sock = ovs.socket_util.make_unix_socket(
> > > +                    socket.SOCK_STREAM, True, bind_path, None)
> > > +                if error:
> > > +                    return error, None
> > > +            else:
> > > +                # Branch used only on Windows
> > > +                try:
> > > +                    open(bind_path, 'w').close()
> > > +                except:
> > > +                    return errno.ENOENT, None
> > > +
> > > +                pipename = winutils.get_pipe_name(bind_path)
> > > +                if len(pipename) > 255:
> > > +                    # Return invalid argument if the name is too long
> > > +                    return errno.ENOENT, None
> > > +
> > > +                npipe = winutils.create_named_pipe(pipename)
> > > +                if not npipe:
> > > +                    return errno.ENOENT, None
> > > +                return 0, PassiveStream(None, name, bind_path,
> pipe=npipe)
> > >
> > >          elif name.startswith("ptcp:"):
> > >              sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > > @@ -341,7 +602,11 @@ class PassiveStream(object):
> > >
> > >      def close(self):
> > >          """Closes this PassiveStream."""
> > > -        self.socket.close()
> > > +        if self.socket is not None:
> > > +            self.socket.close()
> > > +        if self.pipe is not None:
> > > +            winutils.close_handle(self.pipe, vlog.warn)
> > > +            winutils.close_handle(self.connect.hEvent, vlog.warn)
> > >          if self.bind_path is not None:
> > >              ovs.fatal_signal.unlink_file_now(self.bind_path)
> > >              self.bind_path = None
> > > @@ -354,28 +619,80 @@ class PassiveStream(object):
> > >
> > >          Will not block waiting for a connection.  If no connection is
> ready to
> > >          be accepted, returns (errno.EAGAIN, None) immediately."""
> > > -
> > > +        if sys.platform == 'win32' and self.socket is None:
> > > +            return self.__accept_windows()
> > >          while True:
> > >              try:
> > >                  sock, addr = self.socket.accept()
> > >                  ovs.socket_util.set_nonblocking(sock)
> > > -                if (sock.family == socket.AF_UNIX):
> > > +                if (sys.platform != 'win32' and sock.family ==
> socket.AF_UNIX):
> > >                      return 0, Stream(sock, "unix:%s" % addr, 0)
> > >                  return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
> > >                                                         str(addr[1])),
> 0)
> > >              except socket.error as e:
> > >                  error = ovs.socket_util.get_exception_errno(e)
> > > +                if sys.platform == 'win32' and error ==
> errno.WSAEWOULDBLOCK:
> > > +                    # WSAEWOULDBLOCK would be the equivalent on
> Windows
> > > +                    # for EAGAIN on Unix.
> > > +                    error = errno.EAGAIN
> > >                  if error != errno.EAGAIN:
> > >                      # XXX rate-limit
> > >                      vlog.dbg("accept: %s" % os.strerror(error))
> > >                  return error, None
> > >
> > > +    def __accept_windows(self):
> > > +        if self.connect_pending:
> > > +            try:
> > > +                winutils.get_overlapped_result(self.pipe,
> self.connect, False)
> > > +            except pywintypes.error as e:
> > > +                if e.winerror == winutils.winerror.ERROR_IO_
> INCOMPLETE:
> > > +                    # The operation is still pending, try again
> > > +                    self.connect_pending = True
> > > +                    return errno.EAGAIN, None
> > > +                else:
> > > +                    if self.pipe:
> > > +                        win32pipe.DisconnectNamedPipe(self.pipe)
> > > +                    return errno.EINVAL, None
> > > +            self.connect_pending = False
> > > +
> > > +        error = winutils.connect_named_pipe(self.pipe, self.connect)
> > > +        if error:
> > > +            if error == winutils.winerror.ERROR_IO_PENDING:
> > > +                self.connect_pending = True
> > > +                return errno.EAGAIN, None
> > > +            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
> > > +                if self.pipe:
> > > +                    win32pipe.DisconnectNamedPipe(self.pipe)
> > > +                self.connect_pending = False
> > > +                return errno.EINVAL, None
> > > +            else:
> > > +                win32event.SetEvent(self.connect.hEvent)
> > > +
> > > +        npipe = winutils.create_named_pipe(self._pipename)
> > > +        if not npipe:
> > > +            return errno.ENOENT, None
> > > +
> > > +        old_pipe = self.pipe
> > > +        self.pipe = npipe
> > > +        winutils.win32event.ResetEvent(self.connect.hEvent)
> > > +        return 0, Stream(None, http://self.name, 0, pipe=old_pipe)
> > > +
> > >      def wait(self, poller):
> > > -        poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > > +        if sys.platform != 'win32' or self.socket is not None:
> > > +            poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > > +        else:
> > > +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> > >
> > >      def __del__(self):
> > >          # Don't delete the file: we might have forked.
> > > -        self.socket.close()
> > > +        if self.socket is not None:
> > > +            self.socket.close()
> > > +        if self.pipe is not None:
> > > +            # Check if there are any remaining valid handles and
> close them
> > > +            if self.pipe:
> > > +                winutils.close_handle(self.pipe)
> > > +            if self._connect.hEvent:
> > > +                winutils.close_handle(self._read.hEvent)
> > >
> > >
> > >  def usage(name):
> > > diff --git a/python/ovs/unixctl/server.py
> b/python/ovs/unixctl/server.py
> > > index 8595ed8..3f3e051 100644
> > > --- a/python/ovs/unixctl/server.py
> > > +++ b/python/ovs/unixctl/server.py
> > > @@ -148,6 +148,10 @@ class UnixctlServer(object):
> > >      def run(self):
> > >          for _ in range(10):
> > >              error, stream = self._listener.accept()
> > > +            if sys.platform == "win32" and error ==
> errno.WSAEWOULDBLOCK:
> > > +                # WSAEWOULDBLOCK would be the equivalent on Windows
> > > +                # for EAGAIN on Unix.
> > > +                error = errno.EAGAIN
> > >              if not error:
> > >                  rpc = ovs.jsonrpc.Connection(stream)
> > >                  self._conns.append(UnixctlConnection(rpc))
> > > diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py
> > > index 18634e6..3eabcd7 100644
> > > --- a/tests/test-jsonrpc.py
> > > +++ b/tests/test-jsonrpc.py
> > > @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg):
> > >
> > >
> > >  def do_listen(name):
> > > -    error, pstream = ovs.stream.PassiveStream.open(name)
> > > -    if error:
> > > -        sys.stderr.write("could not listen on \"%s\": %s\n"
> > > -                         % (name, os.strerror(error)))
> > > -        sys.exit(1)
> > > +    if sys.platform != 'win32' or (
> > > +            ovs.daemon._detach and ovs.daemon._detached):
> > > +        # On Windows the child is a new process created which should
> be the
> > > +        # one that creates the PassiveStream. Without this check, the
> new
> > > +        # child process will create a new PassiveStream overwriting
> the one
> > > +        # that the parent process created.
> > > +        error, pstream = ovs.stream.PassiveStream.open(name)
> > > +        if error:
> > > +            sys.stderr.write("could not listen on \"%s\": %s\n"
> > > +                             % (name, os.strerror(error)))
> > > +            sys.exit(1)
> > >
> > >      ovs.daemon.daemonize()
> > >
> > > --
> > > 2.10.0.windows.1
> > > _______________________________________________
> > > dev mailing list
> > > mailto:mailto:dev@openvswitch.org
> > > https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>
Alin Balutoiu Jan. 3, 2017, 7:55 p.m. UTC | #5
Thanks for the review, I will send the patches soon.

> From: Guru Shetty [mailto:guru@ovn.org] 
> Sent: Tuesday, January 3, 2017 9:35 PM
> To: Alin Balutoiu <abalutoiu@cloudbasesolutions.com>
> Cc: dev@openvswitch.org
> Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets to Windows
> 
> 
> 
> On 3 January 2017 at 11:16, Alin Balutoiu <mailto:abalutoiu@cloudbasesolutions.com> wrote:
> Ok, I'll include that in the next version.
> Mind if I respin the series after you take a look over the rest of the patches?
> 
> I am done looking at the rest of the patches. When you respin, please fix the spelling of "intented" in the first patch of the series. Also, in the first patch, edit (and add) AUTHORS file to include yourself. 
> 
> Thanks.
>  
> 
> > From: Guru Shetty [mailto:mailto:guru@ovn.org]
> > Sent: Tuesday, January 3, 2017 9:09 PM
> > To: Alin Balutoiu <mailto:abalutoiu@cloudbasesolutions.com>
> > Cc: mailto:dev@openvswitch.org
> > Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets to Windows
> >
> >
> >
> > On 3 January 2017 at 11:03, Alin Balutoiu <mailto:mailto:abalutoiu@cloudbasesolutions.com> wrote:
> > Thanks for the comment.
> > The socket attribute of the class Stream cannot be None if the code runs on Unix.
> > Therefore the condition "self.socket is not None" is True only if sockets are not used,
> > and named pipes are used instead (i.e. it runs on Windows).
> >
> > However, if you prefer I can replace it with the following code:
> > I like it better with the below piece. So please go ahead and send it as part of the next version.
> >
> >     if self.socket is not None:
> >         retval = ovs.socket_util.check_connection_completion(self.socket)
> >         assert retval != errno.EINPROGRESS
> >     elif sys.platform == 'win32':
> >         if self.retry_connect:
> >             try:
> >                 self.pipe = winutils.create_file(self._pipename)
> >                 self._retry_connect = False
> >                 retval = 0
> >             except pywintypes.error as e:
> >                 if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> >                     retval = errno.EAGAIN
> >                 else:
> >                     self._retry_connect = False
> >                     retval = errno.ENOENT
> >         else:
> >             # If retry_connect is false, it means it's already
> >             # connected so we can set the value of retval to 0
> >             retval = 0
> >
> >
> > > From: Guru Shetty [mailto:mailto:mailto:mailto:guru@ovn.org]
> > > Sent: Tuesday, January 3, 2017 8:17 PM
> > > To: Alin Balutoiu <mailto:mailto:abalutoiu@cloudbasesolutions.com>
> > > Cc: mailto:mailto:dev@openvswitch.org
> > > Subject: Re: [ovs-dev] [PATCH V3 2/5] Python tests: Ported UNIX sockets to Windows
> > >
> > >
> > >
> > > On 3 January 2017 at 08:46, Alin Balutoiu <mailto:mailto:mailto:mailto:abalutoiu@cloudbasesolutions.com> wrote:
> > > From: Alin Balutoiu <mailto:mailto:mailto:mailto:abalutoiu@cloudbasesolutions.com>
> > >
> > > Unix sockets (AF_UNIX) are not supported on Windows.
> > > The replacement of Unix sockets on Windows is implemented
> > > using named pipes, we are trying to mimic the behaviour
> > > of unix sockets.
> > >
> > > Instead of using Unix sockets to communicate
> > > between components Named Pipes are used. This
> > > makes the python sockets compatible with the
> > > Named Pipe used in Windows applications.
> > >
> > > Signed-off-by: Paul-Daniel Boca <mailto:mailto:mailto:mailto:pboca@cloudbasesolutions.com>
> > > Signed-off-by: Alin Balutoiu <mailto:mailto:mailto:mailto:abalutoiu@cloudbasesolutions.com>
> > > Acked-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> > > Tested-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions>
> > > ---
> > > V2: No changes.
> > > V3: Changed Signed-off-by name and added previous Acked-by's, Tested-by's.
> > >
> > > I intend to add the following diff:
> > >
> > > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > > index e8e5700..af35afd 100644
> > > --- a/python/ovs/stream.py
> > > +++ b/python/ovs/stream.py
> > > @@ -251,7 +251,7 @@ class Stream(object):
> > >                  else:
> > >                      self._retry_connect = False
> > >                      retval = errno.ENOENT
> > > -        else:
> > > +        elif sys.platform == 'win32':
> > >              # Windows only, if retry_connect is false, it means it's already
> > >              # connected so we can set the value of retval to 0
> > >              retval = 0
> > >
> > >
> > > ---
> > >  python/ovs/jsonrpc.py        |   6 +
> > >  python/ovs/poller.py         |  72 ++++++---
> > >  python/ovs/socket_util.py    |  31 +++-
> > >  python/ovs/stream.py         | 351 ++++++++++++++++++++++++++++++++++++++++---
> > >  python/ovs/unixctl/server.py |   4 +
> > >  tests/test-jsonrpc.py        |  16 +-
> > >  6 files changed, 436 insertions(+), 44 deletions(-)
> > >
> > > diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> > > index 6300c67..5a11500 100644
> > > --- a/python/ovs/jsonrpc.py
> > > +++ b/python/ovs/jsonrpc.py
> > > @@ -14,6 +14,7 @@
> > >
> > >  import errno
> > >  import os
> > > +import sys
> > >
> > >  import six
> > >
> > > @@ -274,6 +275,11 @@ class Connection(object):
> > >                      except UnicodeError:
> > >                          error = errno.EILSEQ
> > >                  if error:
> > > +                    if (sys.platform == "win32" and
> > > +                            error == errno.WSAEWOULDBLOCK):
> > > +                        # WSAEWOULDBLOCK would be the equivalent on Windows
> > > +                        # for EAGAIN on Unix.
> > > +                        error = errno.EAGAIN
> > >                      if error == errno.EAGAIN:
> > >                          return error, None
> > >                      else:
> > > diff --git a/python/ovs/poller.py b/python/ovs/poller.py
> > > index d7cb7d3..d836483 100644
> > > --- a/python/ovs/poller.py
> > > +++ b/python/ovs/poller.py
> > > @@ -18,6 +18,10 @@ import ovs.vlog
> > >  import select
> > >  import socket
> > >  import os
> > > +import sys
> > > +
> > > +if sys.platform == "win32":
> > > +    import ovs.winutils as winutils
> > >
> > >  try:
> > >      from OpenSSL import SSL
> > > @@ -62,7 +66,9 @@ class _SelectSelect(object):
> > >          if SSL and isinstance(fd, SSL.Connection):
> > >              fd = fd.fileno()
> > >
> > > -        assert isinstance(fd, int)
> > > +        if sys.platform != 'win32':
> > > +            # Skip this on Windows, it also register events
> > > +            assert isinstance(fd, int)
> > >          if events & POLLIN:
> > >              self.rlist.append(fd)
> > >              events &= ~POLLIN
> > > @@ -73,28 +79,58 @@ class _SelectSelect(object):
> > >              self.xlist.append(fd)
> > >
> > >      def poll(self, timeout):
> > > -        if timeout == -1:
> > > -            # epoll uses -1 for infinite timeout, select uses None.
> > > -            timeout = None
> > > -        else:
> > > -            timeout = float(timeout) / 1000
> > >          # XXX workaround a bug in eventlet
> > >          # see https://github.com/eventlet/eventlet/pull/25
> > >          if timeout == 0 and _using_eventlet_green_select():
> > >              timeout = 0.1
> > > +        if sys.platform == 'win32':
> > > +            events = self.rlist + self.wlist + self.xlist
> > > +            if not events:
> > > +                return []
> > > +            if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS:
> > > +                raise WindowsError("Cannot handle more than maximum wait"
> > > +                                   "objects\n")
> > > +
> > > +            # win32event.INFINITE timeout is -1
> > > +            # timeout must be an int number, expressed in ms
> > > +            if timeout == 0.1:
> > > +                timeout = 100
> > > +            else:
> > > +                timeout = int(timeout)
> > > +
> > > +            # Wait until any of the events is set to signaled
> > > +            try:
> > > +                retval = winutils.win32event.WaitForMultipleObjects(
> > > +                    events,
> > > +                    False,  # Wait all
> > > +                    timeout)
> > > +            except winutils.pywintypes.error:
> > > +                    return [(0, POLLERR)]
> > >
> > > -        rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist,
> > > -                                            timeout)
> > > -        events_dict = {}
> > > -        for fd in rlist:
> > > -            events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> > > -        for fd in wlist:
> > > -            events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> > > -        for fd in xlist:
> > > -            events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> > > -                                                        POLLHUP |
> > > -                                                        POLLNVAL)
> > > -        return list(events_dict.items())
> > > +            if retval == winutils.winerror.WAIT_TIMEOUT:
> > > +                return []
> > > +
> > > +            return [(events[retval], 0)]
> > > +        else:
> > > +            if timeout == -1:
> > > +                # epoll uses -1 for infinite timeout, select uses None.
> > > +                timeout = None
> > > +            else:
> > > +                timeout = float(timeout) / 1000
> > > +            rlist, wlist, xlist = select.select(self.rlist,
> > > +                                                self.wlist,
> > > +                                                self.xlist,
> > > +                                                timeout)
> > > +            events_dict = {}
> > > +            for fd in rlist:
> > > +                events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> > > +            for fd in wlist:
> > > +                events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> > > +            for fd in xlist:
> > > +                events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> > > +                                                            POLLHUP |
> > > +                                                            POLLNVAL)
> > > +            return list(events_dict.items())
> > >
> > >
> > >  SelectPoll = _SelectSelect
> > > diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py
> > > index b358b05..fb6cee4 100644
> > > --- a/python/ovs/socket_util.py
> > > +++ b/python/ovs/socket_util.py
> > > @@ -17,6 +17,7 @@ import os
> > >  import os.path
> > >  import random
> > >  import socket
> > > +import sys
> > >
> > >  import six
> > >  from six.moves import range
> > > @@ -25,6 +26,10 @@ import ovs.fatal_signal
> > >  import ovs.poller
> > >  import ovs.vlog
> > >
> > > +if sys.platform == 'win32':
> > > +    import ovs.winutils as winutils
> > > +    import win32file
> > > +
> > >  vlog = ovs.vlog.Vlog("socket_util")
> > >
> > >
> > > @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path, connect_path, short=False):
> > >
> > >  def check_connection_completion(sock):
> > >      p = ovs.poller.SelectPoll()
> > > -    p.register(sock, ovs.poller.POLLOUT)
> > > +    if sys.platform == "win32":
> > > +        event = winutils.get_new_event(None, False, True, None)
> > > +        # Receive notification of readiness for writing, of completed
> > > +        # connection or multipoint join operation, and of socket closure.
> > > +        win32file.WSAEventSelect(sock, event,
> > > +                                 win32file.FD_WRITE |
> > > +                                 win32file.FD_CONNECT |
> > > +                                 win32file.FD_CLOSE)
> > > +        p.register(event, ovs.poller.POLLOUT)
> > > +    else:
> > > +        p.register(sock, ovs.poller.POLLOUT)
> > >      pfds = p.poll(0)
> > >      if len(pfds) == 1:
> > >          revents = pfds[0][1]
> > > @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port, dscp):
> > >          try:
> > >              sock.connect(address)
> > >          except socket.error as e:
> > > -            if get_exception_errno(e) != errno.EINPROGRESS:
> > > +            error = get_exception_errno(e)
> > > +            if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> > > +                # WSAEWOULDBLOCK would be the equivalent on Windows
> > > +                # for EINPROGRESS on Unix.
> > > +                error = errno.EINPROGRESS
> > > +            if error != errno.EINPROGRESS:
> > >                  raise
> > >          return 0, sock
> > >      except socket.error as e:
> > > @@ -257,9 +277,12 @@ def get_null_fd():
> > >      global null_fd
> > >      if null_fd < 0:
> > >          try:
> > > -            null_fd = os.open("/dev/null", os.O_RDWR)
> > > +            # os.devnull ensures compatibility with Windows, returns
> > > +            # '/dev/null' for Unix and 'nul' for Windows
> > > +            null_fd = os.open(os.devnull, os.O_RDWR)
> > >          except OSError as e:
> > > -            vlog.err("could not open /dev/null: %s" % os.strerror(e.errno))
> > > +            vlog.err("could not open %s: %s" % (os.devnull,
> > > +                                                os.strerror(e.errno)))
> > >              return -e.errno
> > >      return null_fd
> > >
> > > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > > index cd57eb3..e8e5700 100644
> > > --- a/python/ovs/stream.py
> > > +++ b/python/ovs/stream.py
> > > @@ -15,6 +15,7 @@
> > >  import errno
> > >  import os
> > >  import socket
> > > +import sys
> > >
> > >  import six
> > >
> > > @@ -27,6 +28,13 @@ try:
> > >  except ImportError:
> > >      SSL = None
> > >
> > > +if sys.platform == 'win32':
> > > +    import ovs.winutils as winutils
> > > +    import pywintypes
> > > +    import win32event
> > > +    import win32file
> > > +    import win32pipe
> > > +
> > >  vlog = ovs.vlog.Vlog("stream")
> > >
> > >
> > > @@ -63,6 +71,13 @@ class Stream(object):
> > >      _SSL_certificate_file = None
> > >      _SSL_ca_cert_file = None
> > >
> > > +    # Windows only
> > > +    _write = None                # overlapped for write operation
> > > +    _read = None                 # overlapped for read operation
> > > +    _write_pending = False
> > > +    _read_pending = False
> > > +    _retry_connect = False
> > > +
> > >      @staticmethod
> > >      def register_method(method, cls):
> > >          Stream._SOCKET_METHODS[method + ":"] = cls
> > > @@ -81,8 +96,23 @@ class Stream(object):
> > >          otherwise False."""
> > >          return bool(Stream._find_method(name))
> > >
> > > -    def __init__(self, socket, name, status):
> > > +    def __init__(self, socket, name, status, pipe=None, is_server=False):
> > >          self.socket = socket
> > > +        self.pipe = pipe
> > > +        if sys.platform == 'win32':
> > > +            self._read = pywintypes.OVERLAPPED()
> > > +            self._read.hEvent = winutils.get_new_event()
> > > +            self._write = pywintypes.OVERLAPPED()
> > > +            self._write.hEvent = winutils.get_new_event()
> > > +            if pipe is not None:
> > > +                # Flag to check if fd is a server HANDLE.  In the case of a
> > > +                # server handle we have to issue a disconnect before closing
> > > +                # the actual handle.
> > > +                self._server = is_server
> > > +                suffix = name.split(":", 1)[1]
> > > +                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > > +                self._pipename = winutils.get_pipe_name(suffix)
> > > +
> > >          http://self.name = name
> > >          if status == errno.EAGAIN:
> > >              self.state = Stream.__S_CONNECTING
> > > @@ -120,6 +150,38 @@ class Stream(object):
> > >          suffix = name.split(":", 1)[1]
> > >          if name.startswith("unix:"):
> > >              suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > > +            if sys.platform == 'win32':
> > > +                pipename = winutils.get_pipe_name(suffix)
> > > +
> > > +                if len(suffix) > 255:
> > > +                    # Return invalid argument if the name is too long
> > > +                    return errno.ENOENT, None
> > > +
> > > +                try:
> > > +                    # In case of "unix:" argument, the assumption is that
> > > +                    # there is a file created in the path (suffix).
> > > +                    open(suffix, 'r').close()
> > > +                except:
> > > +                    return errno.ENOENT, None
> > > +
> > > +                try:
> > > +                    npipe = winutils.create_file(pipename)
> > > +                    try:
> > > +                        winutils.set_pipe_mode(npipe,
> > > +                                               win32pipe.PIPE_READMODE_BYTE)
> > > +                    except pywintypes.error as e:
> > > +                        return errno.ENOENT, None
> > > +                except pywintypes.error as e:
> > > +                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> > > +                        # Pipe is busy, set the retry flag to true and retry
> > > +                        # again during the connect function.
> > > +                        Stream.retry_connect = True
> > > +                        return 0, cls(None, name, errno.EAGAIN,
> > > +                                      pipe=win32file.INVALID_HANDLE_VALUE,
> > > +                                      is_server=False)
> > > +                    return errno.ENOENT, None
> > > +                return 0, cls(None, name, 0, pipe=npipe, is_server=False)
> > > +
> > >          error, sock = cls._open(suffix, dscp)
> > >          if error:
> > >              return error, None
> > > @@ -145,6 +207,10 @@ class Stream(object):
> > >          if not error:
> > >              while True:
> > >                  error = stream.connect()
> > > +                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> > > +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> > > +                    # for EAGAIN on Unix.
> > > +                    error = errno.EAGAIN
> > >                  if error != errno.EAGAIN:
> > >                      break
> > >                  stream.run()
> > > @@ -152,7 +218,8 @@ class Stream(object):
> > >                  stream.run_wait(poller)
> > >                  stream.connect_wait(poller)
> > >                  poller.block()
> > > -            assert error != errno.EINPROGRESS
> > > +            if stream.socket is not None:
> > > +                assert error != errno.EINPROGRESS
> > >
> > >          if error and stream:
> > >              stream.close()
> > > @@ -160,11 +227,35 @@ class Stream(object):
> > >          return error, stream
> > >
> > >      def close(self):
> > > -        self.socket.close()
> > > +        if self.socket is not None:
> > > +            self.socket.close()
> > > +        if self.pipe is not None:
> > > +            if self._server:
> > > +                win32pipe.DisconnectNamedPipe(self.pipe)
> > > +            winutils.close_handle(self.pipe, vlog.warn)
> > > +            winutils.close_handle(self._read.hEvent, vlog.warn)
> > > +            winutils.close_handle(self._write.hEvent, vlog.warn)
> > >
> > >      def __scs_connecting(self):
> > > -        retval = ovs.socket_util.check_connection_completion(self.socket)
> > > -        assert retval != errno.EINPROGRESS
> > > +        if self.socket is not None:
> > > +            retval = ovs.socket_util.check_connection_completion(self.socket)
> > > +            assert retval != errno.EINPROGRESS
> > > +        elif sys.platform == 'win32' and self.retry_connect:
> > > +            try:
> > > +                self.pipe = winutils.create_file(self._pipename)
> > > +                self._retry_connect = False
> > > +                retval = 0
> > > +            except pywintypes.error as e:
> > > +                if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> > > +                    retval = errno.EAGAIN
> > > +                else:
> > > +                    self._retry_connect = False
> > > +                    retval = errno.ENOENT
> > > +        else:
> > > +            # Windows only, if retry_connect is false, it means it's already
> > > +            # connected so we can set the value of retval to 0
> > > +            retval = 0
> > > +
> > >          if retval == 0:
> > >              self.state = Stream.__S_CONNECTED
> > >          elif retval != errno.EAGAIN:
> > > @@ -209,11 +300,63 @@ class Stream(object):
> > >          elif n == 0:
> > >              return (0, "")
> > >
> > > +        if sys.platform == 'win32' and self.socket is None:
> > > +            return self.__recv_windows(n)
> > > +
> > >          try:
> > >              return (0, self.socket.recv(n))
> > >          except socket.error as e:
> > >              return (ovs.socket_util.get_exception_errno(e), "")
> > >
> > > +    def __recv_windows(self, n):
> > > +        if self._read_pending:
> > > +            try:
> > > +                nBytesRead = winutils.get_overlapped_result(self.pipe,
> > > +                                                            self._read,
> > > +                                                            False)
> > > +                self._read_pending = False
> > > +                recvBuffer = self._read_buffer[:nBytesRead]
> > > +
> > > +                return (0, winutils.get_decoded_buffer(recvBuffer))
> > > +            except pywintypes.error as e:
> > > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > > +                    # The operation is still pending, try again
> > > +                    self._read_pending = True
> > > +                    return (errno.EAGAIN, "")
> > > +                elif e.winerror in winutils.pipe_disconnected_errors:
> > > +                    # If the pipe was disconnected, return 0.
> > > +                    return (0, "")
> > > +                else:
> > > +                    return (errno.EINVAL, "")
> > > +
> > > +        (errCode, self._read_buffer) = winutils.read_file(self.pipe,
> > > +                                                          n,
> > > +                                                          self._read)
> > > +        if errCode:
> > > +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> > > +                self._read_pending = True
> > > +                return (errno.EAGAIN, "")
> > > +            elif errCode in winutils.pipe_disconnected_errors:
> > > +                # If the pipe was disconnected, return 0.
> > > +                return (0, "")
> > > +            else:
> > > +                return (errCode, "")
> > > +
> > > +        try:
> > > +            nBytesRead = winutils.get_overlapped_result(self.pipe,
> > > +                                                        self._read,
> > > +                                                        False)
> > > +            winutils.win32event.SetEvent(self._read.hEvent)
> > > +        except pywintypes.error as e:
> > > +            if e.winerror in winutils.pipe_disconnected_errors:
> > > +                # If the pipe was disconnected, return 0.
> > > +                return (0, "")
> > > +            else:
> > > +                return (e.winerror, "")
> > > +
> > > +        recvBuffer = self._read_buffer[:nBytesRead]
> > > +        return (0, winutils.get_decoded_buffer(recvBuffer))
> > > +
> > >      def send(self, buf):
> > >          """Tries to send 'buf' on this stream.
> > >
> > > @@ -231,6 +374,9 @@ class Stream(object):
> > >          elif len(buf) == 0:
> > >              return 0
> > >
> > > +        if sys.platform == 'win32' and self.socket is None:
> > > +            return self.__send_windows(buf)
> > > +
> > >          try:
> > >              # Python 3 has separate types for strings and bytes.  We must have
> > >              # bytes here.
> > > @@ -240,6 +386,40 @@ class Stream(object):
> > >          except socket.error as e:
> > >              return -ovs.socket_util.get_exception_errno(e)
> > >
> > > +    def __send_windows(self, buf):
> > > +        if self._write_pending:
> > > +            try:
> > > +                nBytesWritten = winutils.get_overlapped_result(self.pipe,
> > > +                                                               self._write,
> > > +                                                               False)
> > > +                self._write_pending = False
> > > +                return nBytesWritten
> > > +            except pywintypes.error as e:
> > > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > > +                    # The operation is still pending, try again
> > > +                    self._read_pending = True
> > > +                    return -errno.EAGAIN
> > > +                elif e.winerror in winutils.pipe_disconnected_errors:
> > > +                    # If the pipe was disconnected, return connection reset.
> > > +                    return -errno.ECONNRESET
> > > +                else:
> > > +                    return -errno.EINVAL
> > > +
> > > +        buf = winutils.get_encoded_buffer(buf)
> > > +        self._write_pending = False
> > > +        (errCode, nBytesWritten) = winutils.write_file(self.pipe,
> > > +                                                       buf,
> > > +                                                       self._write)
> > > +        if errCode:
> > > +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> > > +                self._write_pending = True
> > > +                return -errno.EAGAIN
> > > +            if (not nBytesWritten and
> > > +                    errCode in winutils.pipe_disconnected_errors):
> > > +                # If the pipe was disconnected, return connection reset.
> > > +                return -errno.ECONNRESET
> > > +        return nBytesWritten
> > > +
> > >      def run(self):
> > >          pass
> > >
> > > @@ -255,11 +435,52 @@ class Stream(object):
> > >
> > >          if self.state == Stream.__S_CONNECTING:
> > >              wait = Stream.W_CONNECT
> > > +
> > > +        if sys.platform == 'win32':
> > > +            self.__wait_windows(poller, wait)
> > > +            return
> > > +
> > >          if wait == Stream.W_RECV:
> > >              poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > >          else:
> > >              poller.fd_wait(self.socket, ovs.poller.POLLOUT)
> > >
> > > +    def __wait_windows(self, poller, wait):
> > > +        if self.socket is not None:
> > > +            if wait == Stream.W_RECV:
> > > +                read_flags = (win32file.FD_READ |
> > > +                              win32file.FD_ACCEPT |
> > > +                              win32file.FD_CLOSE)
> > > +                try:
> > > +                    win32file.WSAEventSelect(self.socket,
> > > +                                             self._read.hEvent,
> > > +                                             read_flags)
> > > +                except pywintypes.error as e:
> > > +                    vlog.err("failed to associate events with socket: %s"
> > > +                             % e.strerror)
> > > +                poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> > > +            else:
> > > +                write_flags = (win32file.FD_WRITE |
> > > +                               win32file.FD_CONNECT |
> > > +                               win32file.FD_CLOSE)
> > > +                try:
> > > +                    win32file.WSAEventSelect(self.socket,
> > > +                                             self._write.hEvent,
> > > +                                             write_flags)
> > > +                except pywintypes.error as e:
> > > +                    vlog.err("failed to associate events with socket: %s"
> > > +                             % e.strerror)
> > > +                poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> > > +        else:
> > > +            if wait == Stream.W_RECV:
> > > +                if self._read:
> > > +                    poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> > > +            elif wait == Stream.W_SEND:
> > > +                if self._write:
> > > +                    poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> > > +            elif wait == Stream.W_CONNECT:
> > > +                return
> > > +
> > >      def connect_wait(self, poller):
> > >          self.wait(poller, Stream.W_CONNECT)
> > >
> > > @@ -267,11 +488,22 @@ class Stream(object):
> > >          self.wait(poller, Stream.W_RECV)
> > >
> > >      def send_wait(self, poller):
> > > +        if sys.platform == 'win32':
> > > +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> > >          self.wait(poller, Stream.W_SEND)
> > >
> > >      def __del__(self):
> > >          # Don't delete the file: we might have forked.
> > > -        self.socket.close()
> > > +        if self.socket is not None:
> > > +            self.socket.close()
> > > +        if self.pipe is not None:
> > > +            # Check if there are any remaining valid handles and close them
> > > +            if self.pipe:
> > > +                winutils.close_handle(self.pipe)
> > > +            if self._read.hEvent:
> > > +                winutils.close_handle(self._read.hEvent)
> > > +            if self._write.hEvent:
> > > +                winutils.close_handle(self._write.hEvent)
> > >
> > >      @staticmethod
> > >      def ssl_set_private_key_file(file_name):
> > > @@ -287,6 +519,10 @@ class Stream(object):
> > >
> > >
> > >  class PassiveStream(object):
> > > +    # Windows only
> > > +    connect = None                  # overlapped for read operation
> > > +    connect_pending = False
> > > +
> > >      @staticmethod
> > >      def is_valid_name(name):
> > >          """Returns True if 'name' is a passive stream name in the form
> > > @@ -294,9 +530,18 @@ class PassiveStream(object):
> > >          "punix:" or "ptcp"), otherwise False."""
> > >          return name.startswith("punix:") | name.startswith("ptcp:")
> > >
> > > -    def __init__(self, sock, name, bind_path):
> > > +    def __init__(self, sock, name, bind_path, pipe=None):
> > >          http://self.name = name
> > > +        self.pipe = pipe
> > >          self.socket = sock
> > > +        if pipe is not None:
> > > +            self.connect = pywintypes.OVERLAPPED()
> > > +            self.connect.hEvent = winutils.get_new_event(bManualReset=True)
> > > +            self.connect_pending = False
> > > +            suffix = name.split(":", 1)[1]
> > > +            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> > > +            self._pipename = winutils.get_pipe_name(suffix)
> > > +
> > >          self.bind_path = bind_path
> > >
> > >      @staticmethod
> > > @@ -315,11 +560,27 @@ class PassiveStream(object):
> > >          bind_path = name[6:]
> > >          if name.startswith("punix:"):
> > >              bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
> > > -            error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> > > -                                                           True, bind_path,
> > > -                                                           None)
> > > -            if error:
> > > -                return error, None
> > > +            if sys.platform != 'win32':
> > > +                error, sock = ovs.socket_util.make_unix_socket(
> > > +                    socket.SOCK_STREAM, True, bind_path, None)
> > > +                if error:
> > > +                    return error, None
> > > +            else:
> > > +                # Branch used only on Windows
> > > +                try:
> > > +                    open(bind_path, 'w').close()
> > > +                except:
> > > +                    return errno.ENOENT, None
> > > +
> > > +                pipename = winutils.get_pipe_name(bind_path)
> > > +                if len(pipename) > 255:
> > > +                    # Return invalid argument if the name is too long
> > > +                    return errno.ENOENT, None
> > > +
> > > +                npipe = winutils.create_named_pipe(pipename)
> > > +                if not npipe:
> > > +                    return errno.ENOENT, None
> > > +                return 0, PassiveStream(None, name, bind_path, pipe=npipe)
> > >
> > >          elif name.startswith("ptcp:"):
> > >              sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > > @@ -341,7 +602,11 @@ class PassiveStream(object):
> > >
> > >      def close(self):
> > >          """Closes this PassiveStream."""
> > > -        self.socket.close()
> > > +        if self.socket is not None:
> > > +            self.socket.close()
> > > +        if self.pipe is not None:
> > > +            winutils.close_handle(self.pipe, vlog.warn)
> > > +            winutils.close_handle(self.connect.hEvent, vlog.warn)
> > >          if self.bind_path is not None:
> > >              ovs.fatal_signal.unlink_file_now(self.bind_path)
> > >              self.bind_path = None
> > > @@ -354,28 +619,80 @@ class PassiveStream(object):
> > >
> > >          Will not block waiting for a connection.  If no connection is ready to
> > >          be accepted, returns (errno.EAGAIN, None) immediately."""
> > > -
> > > +        if sys.platform == 'win32' and self.socket is None:
> > > +            return self.__accept_windows()
> > >          while True:
> > >              try:
> > >                  sock, addr = self.socket.accept()
> > >                  ovs.socket_util.set_nonblocking(sock)
> > > -                if (sock.family == socket.AF_UNIX):
> > > +                if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
> > >                      return 0, Stream(sock, "unix:%s" % addr, 0)
> > >                  return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
> > >                                                         str(addr[1])), 0)
> > >              except socket.error as e:
> > >                  error = ovs.socket_util.get_exception_errno(e)
> > > +                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> > > +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> > > +                    # for EAGAIN on Unix.
> > > +                    error = errno.EAGAIN
> > >                  if error != errno.EAGAIN:
> > >                      # XXX rate-limit
> > >                      vlog.dbg("accept: %s" % os.strerror(error))
> > >                  return error, None
> > >
> > > +    def __accept_windows(self):
> > > +        if self.connect_pending:
> > > +            try:
> > > +                winutils.get_overlapped_result(self.pipe, self.connect, False)
> > > +            except pywintypes.error as e:
> > > +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> > > +                    # The operation is still pending, try again
> > > +                    self.connect_pending = True
> > > +                    return errno.EAGAIN, None
> > > +                else:
> > > +                    if self.pipe:
> > > +                        win32pipe.DisconnectNamedPipe(self.pipe)
> > > +                    return errno.EINVAL, None
> > > +            self.connect_pending = False
> > > +
> > > +        error = winutils.connect_named_pipe(self.pipe, self.connect)
> > > +        if error:
> > > +            if error == winutils.winerror.ERROR_IO_PENDING:
> > > +                self.connect_pending = True
> > > +                return errno.EAGAIN, None
> > > +            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
> > > +                if self.pipe:
> > > +                    win32pipe.DisconnectNamedPipe(self.pipe)
> > > +                self.connect_pending = False
> > > +                return errno.EINVAL, None
> > > +            else:
> > > +                win32event.SetEvent(self.connect.hEvent)
> > > +
> > > +        npipe = winutils.create_named_pipe(self._pipename)
> > > +        if not npipe:
> > > +            return errno.ENOENT, None
> > > +
> > > +        old_pipe = self.pipe
> > > +        self.pipe = npipe
> > > +        winutils.win32event.ResetEvent(self.connect.hEvent)
> > > +        return 0, Stream(None, http://self.name, 0, pipe=old_pipe)
> > > +
> > >      def wait(self, poller):
> > > -        poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > > +        if sys.platform != 'win32' or self.socket is not None:
> > > +            poller.fd_wait(self.socket, ovs.poller.POLLIN)
> > > +        else:
> > > +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> > >
> > >      def __del__(self):
> > >          # Don't delete the file: we might have forked.
> > > -        self.socket.close()
> > > +        if self.socket is not None:
> > > +            self.socket.close()
> > > +        if self.pipe is not None:
> > > +            # Check if there are any remaining valid handles and close them
> > > +            if self.pipe:
> > > +                winutils.close_handle(self.pipe)
> > > +            if self._connect.hEvent:
> > > +                winutils.close_handle(self._read.hEvent)
> > >
> > >
> > >  def usage(name):
> > > diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py
> > > index 8595ed8..3f3e051 100644
> > > --- a/python/ovs/unixctl/server.py
> > > +++ b/python/ovs/unixctl/server.py
> > > @@ -148,6 +148,10 @@ class UnixctlServer(object):
> > >      def run(self):
> > >          for _ in range(10):
> > >              error, stream = self._listener.accept()
> > > +            if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
> > > +                # WSAEWOULDBLOCK would be the equivalent on Windows
> > > +                # for EAGAIN on Unix.
> > > +                error = errno.EAGAIN
> > >              if not error:
> > >                  rpc = ovs.jsonrpc.Connection(stream)
> > >                  self._conns.append(UnixctlConnection(rpc))
> > > diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py
> > > index 18634e6..3eabcd7 100644
> > > --- a/tests/test-jsonrpc.py
> > > +++ b/tests/test-jsonrpc.py
> > > @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg):
> > >
> > >
> > >  def do_listen(name):
> > > -    error, pstream = ovs.stream.PassiveStream.open(name)
> > > -    if error:
> > > -        sys.stderr.write("could not listen on \"%s\": %s\n"
> > > -                         % (name, os.strerror(error)))
> > > -        sys.exit(1)
> > > +    if sys.platform != 'win32' or (
> > > +            ovs.daemon._detach and ovs.daemon._detached):
> > > +        # On Windows the child is a new process created which should be the
> > > +        # one that creates the PassiveStream. Without this check, the new
> > > +        # child process will create a new PassiveStream overwriting the one
> > > +        # that the parent process created.
> > > +        error, pstream = ovs.stream.PassiveStream.open(name)
> > > +        if error:
> > > +            sys.stderr.write("could not listen on \"%s\": %s\n"
> > > +                             % (name, os.strerror(error)))
> > > +            sys.exit(1)
> > >
> > >      ovs.daemon.daemonize()
> > >
> > > --
> > > 2.10.0.windows.1
> > > _______________________________________________
> > > dev mailing list
> > > mailto:mailto:mailto:mailto:dev@openvswitch.org
> > > https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Patch
diff mbox

diff --git a/python/ovs/stream.py b/python/ovs/stream.py
index e8e5700..af35afd 100644
--- a/python/ovs/stream.py
+++ b/python/ovs/stream.py
@@ -251,7 +251,7 @@  class Stream(object):
                 else:
                     self._retry_connect = False
                     retval = errno.ENOENT
-        else:
+        elif sys.platform == 'win32':
             # Windows only, if retry_connect is false, it means it's
already
             # connected so we can set the value of retval to 0