Add unit test for passive mode. --- python/ovs/db/idl.py | 18 +++++++++++++++--- python/ovs/jsonrpc.py | 19 +++++++++++-------- python/ovs/stream.py | 47 +++++++++++++++++++++++++++++++---------------- tests/ovsdb-idl.at | 31 +++++++++++++++++++++++++++++++ tests/test-ovsdb.py | 49 ++++++++++++++++++++++++++++++++----------------- 5 files changed, 120 insertions(+), 44 deletions(-)
diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py index c8990c7..4b492fe 100644 --- a/python/ovs/db/idl.py +++ b/python/ovs/db/idl.py @@ -83,7 +83,7 @@ class Idl(object): currently being constructed, if there is one, or None otherwise. """ - def __init__(self, remote, schema): + def __init__(self, remote, schema, session = None): """Creates and returns a connection to the database named 'db_name' on 'remote', which should be in a form acceptable to ovs.jsonrpc.session.open(). The connection will maintain an in-memory @@ -101,7 +101,16 @@ class Idl(object): As a convenience to users, 'schema' may also be an instance of the SchemaHelper class. - The IDL uses and modifies 'schema' directly.""" + The IDL uses and modifies 'schema' directly. + + In passive mode ( where the OVSDB connects to its manager ), + we first need to wait for the OVSDB to connect and then + pass the 'session' object (while the it is still open ) and + the schema we retrieved from the open session to the IDL to use it. + + If in active mode, do not pass 'session' and it will be created + by IDL by using 'remote'. + """ assert isinstance(schema, SchemaHelper) schema = schema.get_idl_schema() @@ -109,7 +118,10 @@ class Idl(object): self.tables = schema.tables self.readonly = schema.readonly self._db = schema - self._session = ovs.jsonrpc.Session.open(remote) + if session: + self._session = session + else: + self._session = ovs.jsonrpc.Session.open(remote) self._monitor_request_id = None self._last_seqno = None self.change_seqno = 0 diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py index d54d74b..1b68d3f 100644 --- a/python/ovs/jsonrpc.py +++ b/python/ovs/jsonrpc.py @@ -418,23 +418,25 @@ class Session(object): self.__disconnect() name = self.reconnect.get_name() - if not self.reconnect.is_passive(): - error, self.stream = ovs.stream.Stream.open(name) + if self.reconnect.is_passive(): + if self.pstream is not None: + self.pstream.close() + error, self.pstream = ovs.stream.PassiveStream.open(name) if not error: - self.reconnect.connecting(ovs.timeval.msec()) + self.reconnect.listening(ovs.timeval.msec()) else: self.reconnect.connect_failed(ovs.timeval.msec(), error) - elif self.pstream is not None: - error, self.pstream = ovs.stream.PassiveStream.open(name) + else: + error, self.stream = ovs.stream.Stream.open(name) if not error: - self.reconnect.listening(ovs.timeval.msec()) + self.reconnect.connecting(ovs.timeval.msec()) else: self.reconnect.connect_failed(ovs.timeval.msec(), error) self.seqno += 1 def run(self): - if self.pstream is not None: + if self.pstream is not None and self.stream is None: error, stream = self.pstream.accept() if error == 0: if self.rpc or self.stream: @@ -444,11 +446,11 @@ class Session(object): self.__disconnect() self.reconnect.connected(ovs.timeval.msec()) self.rpc = Connection(stream) + self.stream = stream elif error != errno.EAGAIN: self.reconnect.listen_error(ovs.timeval.msec(), error) self.pstream.close() self.pstream = None - if self.rpc: backlog = self.rpc.get_backlog() self.rpc.run() @@ -559,3 +561,4 @@ class Session(object): def force_reconnect(self): self.reconnect.force_reconnect(ovs.timeval.msec()) + diff --git a/python/ovs/stream.py b/python/ovs/stream.py index fb083ee..a8be6e0 100644 --- a/python/ovs/stream.py +++ b/python/ovs/stream.py @@ -15,7 +15,6 @@ import errno import os import socket - import ovs.poller import ovs.socket_util import ovs.vlog @@ -261,11 +260,11 @@ class PassiveStream(object): @staticmethod def is_valid_name(name): """Returns True if 'name' is a passive stream name in the form - "TYPE:ARGS" and TYPE is a supported passive stream type (currently only - "punix:"), otherwise False.""" - return name.startswith("punix:") + "TYPE:ARGS" and TYPE is a supported passive stream type, + otherwise False.""" + return name.startswith("punix:") or name.startswith("ptcp:") - def __init__(self, sock, name, bind_path): + def __init__(self, sock, name, bind_path=None): self.name = name self.socket = sock self.bind_path = bind_path @@ -274,22 +273,31 @@ class PassiveStream(object): def open(name): """Attempts to start listening for remote stream connections. 'name' is a connection name in the form "TYPE:ARGS", where TYPE is an passive - stream class's name and ARGS are stream class-specific. Currently the - only supported TYPE is "punix". + stream class's name and ARGS are stream class-specific. Returns (error, pstream): on success 'error' is 0 and 'pstream' is the new PassiveStream, on failure 'error' is a positive errno value and 'pstream' is None.""" if not PassiveStream.is_valid_name(name): return errno.EAFNOSUPPORT, None - - bind_path = name[6:] + bind_path = None if name.startswith("punix:"): + bind_path = name[6:] bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path) - error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, - True, bind_path, None) - if error: - return error, None + error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, + True, bind_path, + None) + if error: + return error, None + + elif name.startswith("ptcp:"): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + remote = name.split(':') + sock.bind((remote[1], int(remote[2]))) + + else: + raise Exception('Unknown connection string') try: sock.listen(10) @@ -320,7 +328,10 @@ class PassiveStream(object): try: sock, addr = self.socket.accept() ovs.socket_util.set_nonblocking(sock) - return 0, Stream(sock, "unix:%s" % addr, 0) + if (sock.family == socket.AF_UNIX): + return 0, Stream(sock, "unix:%s" % addr, 0) + return 0, Stream(sock, 'ptcp:' + addr[0] + ':' + str(addr[1]), + 0) except socket.error, e: error = ovs.socket_util.get_exception_errno(e) if error != errno.EAGAIN: @@ -350,8 +361,10 @@ class UnixStream(Stream): @staticmethod def _open(suffix, dscp): connect_path = suffix - return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, - True, None, connect_path) + return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, + True, None, connect_path) + + Stream.register_method("unix", UnixStream) @@ -363,4 +376,6 @@ class TCPStream(Stream): if not error: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) return error, sock + + Stream.register_method("tcp", TCPStream) diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at index ad780af..effdef0 100644 --- a/tests/ovsdb-idl.at +++ b/tests/ovsdb-idl.at @@ -111,6 +111,37 @@ m4_define([OVSDB_CHECK_IDL], OVSDB_CHECK_IDL_TCP_PY($@) OVSDB_CHECK_IDL_TCP6_PY($@)]) + +# This test uses the Python IDL implementation with passive tcp +m4_define([OVSDB_CHECK_IDL_PASSIVE_TCP_PY], + [AT_SETUP([$1 - Python ptcp]) + AT_SKIP_IF([test $HAVE_PYTHON = no]) + AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5]) + AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema], + [0], [stdout], [ignore]) + # find free TCP port + AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=ptcp:0:127.0.0.1 --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore]) + PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT]) + AT_CHECK([kill `cat pid`]) + + # start OVSDB server in passive mode + AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=tcp:127.0.0.1:$TCP_PORT --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore]) + AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema ptcp:127.0.0.1:$TCP_PORT $3], + [0], [stdout], [ignore], [kill `cat pid`]) + AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$6],,, [[| $6]]), + [0], [$4], [], [kill `cat pid`]) + AT_CLEANUP + ]) + + +OVSDB_CHECK_IDL_PASSIVE_TCP_PY([simple passive idl, initially empty, select empty], + [], + [['["idltest",{"op":"select","table":"link1","where":[]}]']], + [[000: empty +001: {"error":null,"result":[{"rows":[]}]} +002: done +]]) + OVSDB_CHECK_IDL([simple idl, initially empty, no ops], [], [], diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py index a6897f3..dccaf77 100644 --- a/tests/test-ovsdb.py +++ b/tests/test-ovsdb.py @@ -403,17 +403,31 @@ def do_idl(schema_file, remote, *commands): commands = commands[1:] else: schema_helper.register_all() - idl = ovs.db.idl.Idl(remote, schema_helper) - if commands: - error, stream = ovs.stream.Stream.open_block( - ovs.stream.Stream.open(remote)) - if error: - sys.stderr.write("failed to connect to \"%s\"" % remote) - sys.exit(1) - rpc = ovs.jsonrpc.Connection(stream) + if remote.startswith('ptcp'): + passive = True + else: + passive = False + + if passive: + session = ovs.jsonrpc.Session.open(remote) + session.run() + session.run() + + rpc = session.rpc + idl = ovs.db.idl.Idl(remote, schema_helper, session) else: - rpc = None + idl = ovs.db.idl.Idl(remote, schema_helper) + + if commands: + error, stream = ovs.stream.Stream.open_block( + ovs.stream.Stream.open(remote)) + if error: + sys.stderr.write("failed to connect to \"%s\"" % remote) + sys.exit(1) + rpc = ovs.jsonrpc.Connection(stream) + else: + rpc = None symtab = {} seqno = 0 @@ -471,14 +485,15 @@ def do_idl(schema_file, remote, *commands): sys.stdout.write("%s\n" % ovs.json.to_string(reply.to_json())) sys.stdout.flush() - if rpc: - rpc.close() - while idl.change_seqno == seqno and not idl.run(): - poller = ovs.poller.Poller() - idl.wait(poller) - poller.block() - print_idl(idl, step) - step += 1 + if not passive: + if rpc: + rpc.close() + while idl.change_seqno == seqno and not idl.run(): + poller = ovs.poller.Poller() + idl.wait(poller) + poller.block() + print_idl(idl, step) + step += 1 idl.close() print("%03d: done" % step) -- 2.1.4 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev