AF_UNIX sockets are not supported on Windows. Instead of an AF_UNIX socket, use named pipes to communicate between components. This makes the Python sockets compatible with the named pipes used in Windows applications. Added stream_windows.py with support for named pipe and localhost TCP connections.
Signed-off-by: Paul-Daniel Boca <pb...@cloudbasesolutions.com> --- python/automake.mk | 1 + python/ovs/jsonrpc.py | 9 +- python/ovs/poller.py | 49 +++- python/ovs/socket_util.py | 20 +- python/ovs/stream_windows.py | 611 +++++++++++++++++++++++++++++++++++++++++++ python/ovs/unixctl/client.py | 6 +- python/ovs/unixctl/server.py | 11 +- tests/test-jsonrpc.py | 17 +- tests/test-ovsdb.py | 8 +- 9 files changed, 703 insertions(+), 29 deletions(-) create mode 100644 python/ovs/stream_windows.py diff --git a/python/automake.mk b/python/automake.mk index 3fe9519..7bbf382 100644 --- a/python/automake.mk +++ b/python/automake.mk @@ -27,6 +27,7 @@ ovs_pyfiles = \ python/ovs/process.py \ python/ovs/reconnect.py \ python/ovs/socket_util.py \ + python/ovs/stream_windows.py \ python/ovs/stream_unix.py \ python/ovs/timeval.py \ python/ovs/unixctl/__init__.py \ diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py index 8ca01a0..d70f13e 100644 --- a/python/ovs/jsonrpc.py +++ b/python/ovs/jsonrpc.py @@ -14,13 +14,17 @@ import errno import os +import sys import six import ovs.json import ovs.poller import ovs.reconnect -import ovs.stream_unix as ovs_stream +if sys.platform == "win32": + import ovs.stream_windows as ovs_stream +else: + import ovs.stream_unix as ovs_stream import ovs.timeval import ovs.util import ovs.vlog @@ -274,6 +278,9 @@ class Connection(object): except UnicodeError: error = errno.EILSEQ if error: + if (sys.platform == "win32" + and error == errno.WSAEWOULDBLOCK): + error = errno.EAGAIN if error == errno.EAGAIN: return error, None else: diff --git a/python/ovs/poller.py b/python/ovs/poller.py index de6bf22..970decc 100644 --- a/python/ovs/poller.py +++ b/python/ovs/poller.py @@ -18,6 +18,7 @@ import ovs.vlog import select import socket import os +import sys try: import eventlet.patcher @@ -54,7 +55,8 @@ class _SelectSelect(object): def register(self, fd, events): if isinstance(fd, socket.socket): fd = fd.fileno() - assert isinstance(fd, int) + if not 
sys.platform == "win32": + assert isinstance(fd, int) if events & POLLIN: self.rlist.append(fd) events &= ~POLLIN @@ -75,18 +77,39 @@ class _SelectSelect(object): if timeout == 0 and _using_eventlet_green_select(): timeout = 0.1 - rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist, - timeout) - events_dict = {} - for fd in rlist: - events_dict[fd] = events_dict.get(fd, 0) | POLLIN - for fd in wlist: - events_dict[fd] = events_dict.get(fd, 0) | POLLOUT - for fd in xlist: - events_dict[fd] = events_dict.get(fd, 0) | (POLLERR | - POLLHUP | - POLLNVAL) - return list(events_dict.items()) + if sys.platform == "win32": + import win32event + import winerror + + if timeout is None: + timeout = 0xFFFFFFFF + else: + timeout = int(timeout * 1000) + + events = self.rlist + self.wlist + self.xlist + if not events: + return list() + error = win32event.WaitForMultipleObjectsEx(events, False, + timeout, False) + if error == winerror.WAIT_TIMEOUT: + return list() + + return [(events[error], 0)] + else: + rlist, wlist, xlist = select.select(self.rlist, + self.wlist, + self.xlist, + timeout) + events_dict = {} + for fd in rlist: + events_dict[fd] = events_dict.get(fd, 0) | POLLIN + for fd in wlist: + events_dict[fd] = events_dict.get(fd, 0) | POLLOUT + for fd in xlist: + events_dict[fd] = events_dict.get(fd, 0) | (POLLERR | + POLLHUP | + POLLNVAL) + return list(events_dict.items()) SelectPoll = _SelectSelect diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py index b358b05..54f448d 100644 --- a/python/ovs/socket_util.py +++ b/python/ovs/socket_util.py @@ -17,6 +17,7 @@ import os import os.path import random import socket +import sys import six from six.moves import range @@ -25,6 +26,10 @@ import ovs.fatal_signal import ovs.poller import ovs.vlog +if sys.platform == "win32": + import win32file + import win32event + vlog = ovs.vlog.Vlog("socket_util") @@ -158,7 +163,15 @@ def make_unix_socket(style, nonblock, bind_path, connect_path, short=False): def 
check_connection_completion(sock): p = ovs.poller.SelectPoll() - p.register(sock, ovs.poller.POLLOUT) + if sys.platform == "win32": + event = win32event.CreateEvent(None, False, True, None) + win32file.WSAEventSelect(sock, event, + win32file.FD_WRITE | + win32file.FD_CONNECT | + win32file.FD_CLOSE) + p.register(event, ovs.poller.POLLOUT) + else: + p.register(sock, ovs.poller.POLLOUT) pfds = p.poll(0) if len(pfds) == 1: revents = pfds[0][1] @@ -228,7 +241,10 @@ def inet_open_active(style, target, default_port, dscp): try: sock.connect(address) except socket.error as e: - if get_exception_errno(e) != errno.EINPROGRESS: + error = get_exception_errno(e) + if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK: + error = errno.EINPROGRESS + if error != errno.EINPROGRESS: raise return 0, sock except socket.error as e: diff --git a/python/ovs/stream_windows.py b/python/ovs/stream_windows.py new file mode 100644 index 0000000..dd8d4ba --- /dev/null +++ b/python/ovs/stream_windows.py @@ -0,0 +1,611 @@ +# Copyright (c) 2010, 2011, 2012 Nicira, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import errno +import os +import socket +import sys +import six + +import ovs.poller +import ovs.socket_util +import ovs.vlog + +import pywintypes +import winerror +import win32pipe +import win32con +import win32security +import win32file +import win32event + +vlog = ovs.vlog.Vlog("stream") + + +def stream_or_pstream_needs_probes(name): + """ 1 if the stream or pstream specified by 'name' needs periodic probes to + verify connectivity. For [p]streams which need probes, it can take a long + time to notice the connection was dropped. Returns 0 if probes aren't + needed, and -1 if 'name' is invalid""" + + if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name): + # Only unix and punix are supported currently. + return 0 + else: + return -1 + + +class Stream(object): + """Bidirectional byte stream. Currently only Unix domain sockets + are implemented.""" + + # States. + __S_CONNECTING = 0 + __S_CONNECTED = 1 + __S_DISCONNECTED = 2 + + # Kinds of events that one might wait for. + W_CONNECT = 0 # Connect complete (success or failure). + W_RECV = 1 # Data received. + W_SEND = 2 # Send buffer room available. 
+ + _SOCKET_METHODS = {} + + write = None # overlapped for write operation + read = None # overlapped for read operation + write_pending = False + read_pending = False + retry_connect = False + + @staticmethod + def register_method(method, cls): + Stream._SOCKET_METHODS[method + ":"] = cls + + @staticmethod + def _find_method(name): + for method, cls in six.iteritems(Stream._SOCKET_METHODS): + if name.startswith(method): + return cls + return None + + @staticmethod + def is_valid_name(name): + """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and + TYPE is a supported stream type (currently only "unix:" and "tcp:"), + otherwise False.""" + return bool(Stream._find_method(name)) + + def __init__(self, sock, name, status): + if isinstance(sock, socket.socket): + self.socket = sock + else: + self.pipe = sock + self.read = pywintypes.OVERLAPPED() + self.read.hEvent = win32event.CreateEvent(None, True, True, None) + self.write = pywintypes.OVERLAPPED() + self.write.hEvent = win32event.CreateEvent(None, True, True, None) + + self.name = name + if status == errno.EAGAIN: + self.state = Stream.__S_CONNECTING + elif status == 0: + self.state = Stream.__S_CONNECTED + else: + self.state = Stream.__S_DISCONNECTED + + self.error = 0 + + # Default value of dscp bits for connection between controller and manager. + # Value of IPTOS_PREC_INTERNETCONTROL = 0xc0 which is defined + # in <netinet/ip.h> is used. + IPTOS_PREC_INTERNETCONTROL = 0xc0 + DSCP_DEFAULT = IPTOS_PREC_INTERNETCONTROL >> 2 + + @staticmethod + def open(name, dscp=DSCP_DEFAULT): + """Attempts to connect a stream to a remote peer. 'name' is a + connection name in the form "TYPE:ARGS", where TYPE is an active stream + class's name and ARGS are stream class-specific. Currently the only + supported TYPEs are "unix" and "tcp". + + Returns (error, stream): on success 'error' is 0 and 'stream' is the + new Stream, on failure 'error' is a positive errno value and 'stream' + is None. 
+ + Never returns errno.EAGAIN or errno.EINPROGRESS. Instead, returns 0 + and a new Stream. The connect() method can be used to check for + successful connection completion.""" + cls = Stream._find_method(name) + if not cls: + return errno.EAFNOSUPPORT, None + + suffix = name.split(":", 1)[1] + if name.startswith("unix:"): + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) + suffix = suffix.replace('/', '') + suffix = suffix.replace('\\', '') + suffix = "\\\\.\\pipe\\" + suffix + + saAttr = win32security.SECURITY_ATTRIBUTES() + saAttr.bInheritHandle = 1 + try: + npipe = win32file.CreateFile( + suffix, + win32file.GENERIC_READ | win32file.GENERIC_WRITE, + 0, None, + win32file.OPEN_EXISTING, + win32file.FILE_ATTRIBUTE_NORMAL | + win32file.FILE_FLAG_OVERLAPPED | + win32file.FILE_FLAG_NO_BUFFERING, + None) + except pywintypes.error as e: + return e.winerror, None + + return 0, Stream(npipe, suffix, 0) + else: + error, sock = cls._open(suffix, dscp) + if error: + return error, None + else: + status = ovs.socket_util.check_connection_completion(sock) + return 0, Stream(sock, name, status) + + @staticmethod + def _open(suffix, dscp): + raise NotImplementedError("This method must be overrided by subclass") + + @staticmethod + def open_block(error_stream): + """Blocks until a Stream completes its connection attempt, either + succeeding or failing. (error, stream) should be the tuple returned by + Stream.open(). Returns a tuple of the same form. 
+ + Typical usage: + error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))""" + + # Py3 doesn't support tuple parameter unpacking - PEP 3113 + error, stream = error_stream + if not error: + while True: + error = stream.connect() + if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK: + error = errno.EAGAIN + if error != errno.EAGAIN: + break + stream.run() + poller = ovs.poller.Poller() + stream.run_wait(poller) + stream.connect_wait(poller) + poller.block() + assert error != errno.EINPROGRESS + + if error and stream: + stream.close() + stream = None + return error, stream + + def close(self): + if hasattr(self, "socket"): + self.socket.close() + + def __scs_connecting(self): + if hasattr(self, "socket"): + retval = ovs.socket_util.check_connection_completion(self.socket) + elif self.retry_connect: + saAttr = win32security.SECURITY_ATTRIBUTES() + saAttr.bInheritHandle = 1 + + try: + self.pipe = win32file.CreateFile( + self.name, + win32file.GENERIC_READ | win32file.GENERIC_WRITE, + 0, None, + win32file.OPEN_EXISTING, + win32file.FILE_ATTRIBUTE_NORMAL | + win32file.FILE_FLAG_OVERLAPPED | + win32file.FILE_FLAG_NO_BUFFERING, + None) + except pywintypes.error: + retval = errno.EAGAIN + self.retry_connect = True + + assert retval != errno.EINPROGRESS + if retval == 0: + self.state = Stream.__S_CONNECTED + elif retval != errno.EAGAIN: + self.state = Stream.__S_DISCONNECTED + self.error = retval + + def connect(self): + """Tries to complete the connection on this stream. If the connection + is complete, returns 0 if the connection was successful or a positive + errno value if it failed. 
If the connection is still in progress, + returns errno.EAGAIN.""" + # raise + if self.state == Stream.__S_CONNECTING: + self.__scs_connecting() + + if self.state == Stream.__S_CONNECTING: + return errno.EAGAIN + elif self.state == Stream.__S_CONNECTED: + return 0 + else: + assert self.state == Stream.__S_DISCONNECTED + return self.error + + def recv(self, n): + """Tries to receive up to 'n' bytes from this stream. Returns a + (error, string) tuple: + + - If successful, 'error' is zero and 'string' contains between 1 + and 'n' bytes of data. + + - On error, 'error' is a positive errno value. + + - If the connection has been closed in the normal fashion or if 'n' + is 0, the tuple is (0, ""). + + The recv function will not block waiting for data to arrive. If no + data have been received, it returns (errno.EAGAIN, "") immediately.""" + + retval = self.connect() + if retval != 0: + return (retval, "") + elif n == 0: + return (0, "") + if hasattr(self, "socket"): + try: + return (0, self.socket.recv(n)) + except socket.error as e: + return (ovs.socket_util.get_exception_errno(e), "") + else: + if self.read_pending: + try: + nBytesRead = win32file.GetOverlappedResult(self.pipe, + self.read, + False) + self.read_pending = False + recvBuffer = self.read_buffer[:nBytesRead] + if six.PY3: + return (0, bytes(recvBuffer).decode("utf-8")) + else: + return (0, str(recvBuffer)) + except pywintypes.error as e: + return (errno.EAGAIN, "") + + try: + (errCode, self.read_buffer) = win32file.ReadFile(self.pipe, + n, + self.read) + + if errCode == winerror.ERROR_IO_PENDING: + self.read_pending = True + return (errno.EAGAIN, "") + # elif errCode: + # return (errCode, "") + + nBytesRead = win32file.GetOverlappedResult(self.pipe, + self.read, + False) + win32event.SetEvent(self.read.hEvent) + recvBuffer = self.read_buffer[:nBytesRead] + if six.PY3: + return (0, bytes(recvBuffer).decode("utf-8")) + else: + return (0, str(recvBuffer)) + except pywintypes.error as e: + return (e.winerror, 
"") + + def send(self, buf): + """Tries to send 'buf' on this stream. + + If successful, returns the number of bytes sent, between 1 and + len(buf). 0 is only a valid return value if len(buf) is 0. + + On error, returns a negative errno value. + + Will not block. If no bytes can be immediately accepted for + transmission, returns -errno.EAGAIN immediately.""" + + retval = self.connect() + if retval != 0: + return -retval + elif len(buf) == 0: + return 0 + + if hasattr(self, "socket"): + try: + # Python 3 has separate types for strings and bytes. We must + # have bytes here. + if six.PY3 and not isinstance(buf, six.binary_type): + buf = six.binary_type(buf, 'utf-8') + return self.socket.send(buf) + except socket.error as e: + return -ovs.socket_util.get_exception_errno(e) + else: + if self.write_pending: + try: + nBytesWritten = win32file.GetOverlappedResult(self.pipe, + self.write, + False) + self.write_pending = False + return nBytesWritten + except pywintypes.error as e: + return -errno.EAGAIN + + try: + # Python 3 has separate types for strings and bytes. We must + # have bytes here. 
+ if not isinstance(buf, six.binary_type): + if six.PY3: + buf = six.binary_type(buf, 'utf-8') + else: + buf = six.binary_type(buf) + + self.write_pending = False + (errCode, nBytesWritten) = win32file.WriteFile(self.pipe, + buf, + self.write) + if errCode == winerror.ERROR_IO_PENDING: + self.write_pending = True + return -errno.EAGAIN + # elif errCode: + # return -errCode + + nBytesWritten = win32file.GetOverlappedResult(self.pipe, + self.write, + False) + win32event.SetEvent(self.write.hEvent) + + return nBytesWritten + except pywintypes.error as e: + return -e.winerror + + def run(self): + pass + + def run_wait(self, poller): + pass + + def wait(self, poller, wait): + if hasattr(self, "socket"): + import win32file + import win32event + + assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND) + + if self.state == Stream.__S_DISCONNECTED: + poller.immediate_wake() + return + + if self.state == Stream.__S_CONNECTING: + wait = Stream.W_CONNECT + + event = win32event.CreateEvent(None, True, True, None) + + if wait == Stream.W_RECV: + win32file.WSAEventSelect(self.socket, event, + win32file.FD_READ | + win32file.FD_ACCEPT | + win32file.FD_CLOSE) + poller.fd_wait(event, ovs.poller.POLLIN) + else: + win32file.WSAEventSelect(self.socket, event, + win32file.FD_WRITE | + win32file.FD_CONNECT | + win32file.FD_CLOSE) + poller.fd_wait(event, ovs.poller.POLLOUT) + else: + if wait == Stream.W_RECV: + if self.read: + poller.fd_wait(self.read.hEvent, ovs.poller.POLLIN) + else: + if self.write: + poller.fd_wait(self.write.hEvent, ovs.poller.POLLOUT) + + def connect_wait(self, poller): + self.wait(poller, Stream.W_CONNECT) + + def recv_wait(self, poller): + self.wait(poller, Stream.W_RECV) + + def send_wait(self, poller): + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN) + self.wait(poller, Stream.W_SEND) + + def __del__(self): + # Don't delete the file: we might have forked. 
+ if hasattr(self, "socket"): + self.socket.close() + else: + win32file.CloseHandle(self.pipe) + self.pipe = None + + +class PassiveStream(object): + connect = None # overlapped for read operation + connect_pending = False + + @staticmethod + def is_valid_name(name): + """Returns True if 'name' is a passive stream name in the form + "TYPE:ARGS" and TYPE is a supported passive stream type (currently + "punix:" or "ptcp"), otherwise False.""" + return name.startswith("punix:") | name.startswith("ptcp:") + + def __init__(self, sock, name, bind_path): + self.name = name + if isinstance(sock, socket.socket): + self.socket = sock + else: + self.pipe = sock + self.bind_path = bind_path + + @staticmethod + def open(name): + """Attempts to start listening for remote stream connections. 'name' + is a connection name in the form "TYPE:ARGS", where TYPE is an passive + stream class's name and ARGS are stream class-specific. Currently the + supported values for TYPE are "punix" and "ptcp". + + Returns (error, pstream): on success 'error' is 0 and 'pstream' is the + new PassiveStream, on failure 'error' is a positive errno value and + 'pstream' is None.""" + # raise OSError + suffix = name.split(":", 1)[1] + if name.startswith("punix:"): + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) + try: + open(suffix, 'w').close() + except: + return errno.EAFNOSUPPORT, None + + pipename = suffix.replace('/', '') + pipename = pipename.replace('\\', '') + pipename = "\\\\.\\pipe\\" + pipename + + saAttr = win32security.SECURITY_ATTRIBUTES() + saAttr.bInheritHandle = 1 + + npipe = win32pipe.CreateNamedPipe( + pipename, + win32con.PIPE_ACCESS_DUPLEX | + win32con.FILE_FLAG_OVERLAPPED, + win32con.PIPE_TYPE_MESSAGE | + win32con.PIPE_READMODE_BYTE | + win32con.PIPE_WAIT, + 64, 65000, 65000, 0, saAttr + ) + return 0, PassiveStream(npipe, pipename, suffix) + else: + return errno.EAFNOSUPPORT, None + + def close(self): + """Closes this PassiveStream.""" + if hasattr(self, "socket"): + 
self.socket.close() + else: + win32pipe.DisconnectNamedPipe(self.pipe) + if self.bind_path is not None: + ovs.fatal_signal.unlink_file_now(self.bind_path) + self.bind_path = None + + def accept(self): + """Tries to accept a new connection on this passive stream. Returns + (error, stream): if successful, 'error' is 0 and 'stream' is the new + Stream object, and on failure 'error' is a positive errno value and + 'stream' is None. + + Will not block waiting for a connection. If no connection is ready to + be accepted, returns (errno.EAGAIN, None) immediately.""" + + if hasattr(self, "socket"): + while True: + try: + sock, addr = self.socket.accept() + ovs.socket_util.set_nonblocking(sock) + if (sys.platform != "win32" + and sock.family == socket.AF_UNIX): + return 0, Stream(sock, "unix:%s" % addr, 0) + return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0], + str(addr[1])), 0) + except socket.error as e: + error = ovs.socket_util.get_exception_errno(e) + if (sys.platform == "win32" and + error == errno.WSAEWOULDBLOCK): + error = errno.EAGAIN + if error != errno.EAGAIN: + # XXX rate-limit + vlog.dbg("accept: %s" % os.strerror(error)) + return error, None + else: + if self.connect_pending: + try: + win32file.GetOverlappedResult(self.pipe, self.connect) + self.connect_pending = False + except pywintypes.error as e: + return (errno.EAGAIN, "") + return 0, Stream(self.pipe, "", 0) + + try: + self.connect_pending = False + self.connect = pywintypes.OVERLAPPED() + self.connect.hEvent = win32event.CreateEvent(None, True, + True, None) + error = win32pipe.ConnectNamedPipe(self.pipe, self.connect) + if error == winerror.ERROR_IO_PENDING: + self.connect_pending = True + return errno.EAGAIN, None + + stream = Stream(self.pipe, "", 0) + + saAttr = win32security.SECURITY_ATTRIBUTES() + saAttr.bInheritHandle = 1 + self.pipe = win32pipe.CreateNamedPipe( + self.name, + win32con.PIPE_ACCESS_DUPLEX | + win32con.FILE_FLAG_OVERLAPPED, + win32con.PIPE_TYPE_MESSAGE | + win32con.PIPE_READMODE_BYTE | 
+ win32con.PIPE_WAIT, + 64, 65000, 65000, 0, saAttr + ) + + return 0, stream + except pywintypes.error as e: + return errno.EAGAIN, None + + def wait(self, poller): + if hasattr(self, "socket"): + poller.fd_wait(self.socket, ovs.poller.POLLIN) + else: + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN) + + def __del__(self): + # Don't delete the file: we might have forked. + if hasattr(self, "socket"): + self.socket.close() + else: + win32file.CloseHandle(self.pipe) + self.pipe = None + + +def usage(name): + return """ +Active %s connection methods: + unix:FILE Unix domain socket named FILE + tcp:IP:PORT TCP socket to IP with port no of PORT + +Passive %s connection methods: + punix:FILE Listen on Unix domain socket FILE""" % (name, name) + + +class UnixStream(Stream): + @staticmethod + def _open(suffix, dscp): + connect_path = suffix + return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, + True, None, connect_path) +Stream.register_method("unix", UnixStream) + + +class TCPStream(Stream): + @staticmethod + def _open(suffix, dscp): + error, sock = ovs.socket_util.inet_open_active(socket.SOCK_STREAM, + suffix, 0, dscp) + if not error: + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + return error, sock +Stream.register_method("tcp", TCPStream) diff --git a/python/ovs/unixctl/client.py b/python/ovs/unixctl/client.py index fde674e..ede4855 100644 --- a/python/ovs/unixctl/client.py +++ b/python/ovs/unixctl/client.py @@ -13,12 +13,16 @@ # limitations under the License. 
import os +import sys import six import ovs.jsonrpc -import ovs.stream_unix as ovs_stream import ovs.util +if sys.platform == "win32": + import ovs.stream_windows as ovs_stream +else: + import ovs.stream_unix as ovs_stream vlog = ovs.vlog.Vlog("unixctl_client") diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py index 50a11d4..d457a2c 100644 --- a/python/ovs/unixctl/server.py +++ b/python/ovs/unixctl/server.py @@ -22,7 +22,10 @@ from six.moves import range import ovs.dirs import ovs.jsonrpc -import ovs.stream_unix as ovs_stream +if sys.platform == "win32": + import ovs.stream_windows as ovs_stream +else: + import ovs.stream_unix as ovs_stream import ovs.unixctl import ovs.util import ovs.version @@ -148,6 +151,8 @@ class UnixctlServer(object): def run(self): for _ in range(10): error, stream = self._listener.accept() + if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK: + error = errno.EAGAIN if not error: rpc = ovs.jsonrpc.Connection(stream) self._conns.append(UnixctlConnection(rpc)) @@ -155,8 +160,8 @@ class UnixctlServer(object): break else: # XXX: rate-limit - vlog.warn("%s: accept failed: %s" % (self._listener.name, - os.strerror(error))) + vlog.warn("%s: accept failed: %s %d" + % (self._listener.name, os.strerror(error), error)) for conn in copy.copy(self._conns): error = conn.run() diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py index 18634e6..8d9010d 100644 --- a/tests/test-jsonrpc.py +++ b/tests/test-jsonrpc.py @@ -23,7 +23,10 @@ import ovs.daemon import ovs.json import ovs.jsonrpc import ovs.poller -import ovs.stream +if sys.platform == "win32": + import ovs.stream_windows as ovs_stream +else: + import ovs.stream_unix as ovs_stream def handle_rpc(rpc, msg): @@ -53,14 +56,14 @@ def handle_rpc(rpc, msg): def do_listen(name): - error, pstream = ovs.stream.PassiveStream.open(name) + ovs.daemon.daemonize() + + error, pstream = ovs_stream.PassiveStream.open(name) if error: sys.stderr.write("could not listen on \"%s\": 
%s\n" % (name, os.strerror(error))) sys.exit(1) - ovs.daemon.daemonize() - rpcs = [] done = False while True: @@ -111,7 +114,7 @@ def do_request(name, method, params_string): sys.stderr.write("not a valid JSON-RPC request: %s\n" % s) sys.exit(1) - error, stream = ovs.stream.Stream.open_block(ovs.stream.Stream.open(name)) + error, stream = ovs_stream.Stream.open_block(ovs_stream.Stream.open(name)) if error: sys.stderr.write("could not open \"%s\": %s\n" % (name, os.strerror(error))) @@ -142,7 +145,7 @@ def do_notify(name, method, params_string): sys.stderr.write("not a valid JSON-RPC notification: %s\n" % s) sys.exit(1) - error, stream = ovs.stream.Stream.open_block(ovs.stream.Stream.open(name)) + error, stream = ovs_stream.Stream.open_block(ovs_stream.Stream.open(name)) if error: sys.stderr.write("could not open \"%s\": %s\n" % (name, os.strerror(error))) @@ -174,7 +177,7 @@ def main(argv): listen LOCAL listen for connections on LOCAL request REMOTE METHOD PARAMS send request, print reply notify REMOTE METHOD PARAMS send notification and exit -""" + ovs.stream.usage("JSON-RPC") +""" + ovs_stream.usage("JSON-RPC") group = parser.add_argument_group(title="Commands", description=group_description) diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py index e4e3395..9f6ef49 100644 --- a/tests/test-ovsdb.py +++ b/tests/test-ovsdb.py @@ -30,6 +30,10 @@ import ovs.poller import ovs.util from ovs.fatal_signal import signal_alarm import six +if sys.platform == "win32": + import ovs.stream_windows as ovs_stream +else: + import ovs.stream_unix as ovs_stream def unbox_json(json): @@ -534,8 +538,8 @@ def do_idl(schema_file, remote, *commands): idl = ovs.db.idl.Idl(remote, schema_helper) if commands: - error, stream = ovs.stream.Stream.open_block( - ovs.stream.Stream.open(remote)) + error, stream = ovs_stream.Stream.open_block( + ovs_stream.Stream.open(remote)) if error: sys.stderr.write("failed to connect to \"%s\"" % remote) sys.exit(1) -- 2.7.2.windows.1 
_______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev