I did put the explanation in the patch text: # first call to session.run creates the PassiveStream object and second one # accept incoming connection session.run() session.run()
But you are right, I will add it to the code too. Ofer. On Tue, Jan 26, 2016 at 8:53 PM, Russell Bryant <russ...@ovn.org> wrote: > On 01/25/2016 04:09 AM, ofer.benya...@gmail.com wrote: > > From: Ofer Ben-Yacov <ofer.benya...@gmail.com> > > > > Currently the IDL does not support passive TCP connection, > > i.e. when the OVSDB connects to its manager. > > > > This patch enables IDL to use an already-open session > > (the one which was previously used for retrieving the db schema). > > In addition, it enables IDL to go back to "listen mode" in case the > connection > > is lost. > > > > LIMITATIONS: > > ---------------------- > > > > This patch enables a **SINGLE** TCP connection from an OVSDB server to an > > agent that uses IDL with {IP,PORT} pair. Therefore, the agent will > support > > only **ONE** OVSDB server using {IP,PORT} pair. > > > > Future development may add multi-session server capability that will > allow > > an agent to use single {IP,PORT} pair to connect to multiple OVSDB > servers. > > > > > > CAVEAT: > > -------------- > > > > When a database first connects to the agent, the agent gets the schema > and > > data and builds its tables. If the session disconnects, the agent goes > back > > to "listen mode" and accepts **ANY** TCP connection, which means that if > > another database will try to connect to the agent using the same > {IP,PORT} > > pair, it will be connected to the IDL that has the schema and data from > > the first database. > > > > A future patch can resolve this problem. > > > > USAGE: > > ------------- > > > > To use IDL in passive mode, the following example code can be use: > > > > (snippet) > > > > from ovs.jsonrpc import Session > > ... > > > > from neutron.agent.ovsdb.native import idlutils > > > > ... 
> > > > session = Session.open('ptcp:192.168.10.10:6640') > > > > # first call to session.run creates the PassiveStream object and second > one > > # accept incoming connection > > session.run() > > session.run() > > > > # this static method is similar to the original neutron method but the > > # rpc.close() command that would result closing the socket. > > helper = idlutils.get_schema_helper_from_stream_no_close(session.stream, > > 'hardware_vtep') > > helper.register_all() > > self.idl = idl.Idl(self.connection, helper, session) > > idlutils.wait_for_change(self.idl, self.timeout) > > > > self.poller = poller.Poller() > > self.thread = threading.Thread(target=self.run) > > self.thread.setDaemon(True) > > self.thread.start() > > It's a little odd to show example usage that depends on neutron code. I > actually don't even see get_schema_helper_from_stream_no_close() in > idlutils right now. Is that a pending patch? > > Maybe this usage information could be incorporated into a docstring > somewhere in idl.py? That would make it more useful to future users, as > they likely won't find the details in the commit message. > > > > > > > TESTING: > > --------------- > > Added unit test for passive mode. See ovsdb-idl.at file. > > > > > > Signed-off-by: "Ofer Ben-Yacov" <ofer.benya...@gmail.com> > > > > Tested-by: "Ofer Ben-Yacov" <ofer.benya...@gmail.com> > > > > Requested-by: Ben Pfaff <b...@nicira.com>, > > "D M, Vikas" <vikas....@hpe.com>, > > "Kamat, Maruti Haridas" <maruti.ka...@hpe.com>, > > "Sukhdev Kapur" <sukh...@arista.com>, > > "Migliaccio, Armando" <armando.migliac...@hpe.com> > > Each email address should have its own "Requested-by" prefix. 
> > > > > --- > > python/ovs/db/idl.py | 18 +++++++++++++++--- > > python/ovs/jsonrpc.py | 18 ++++++++++-------- > > python/ovs/stream.py | 32 ++++++++++++++++++++++---------- > > tests/ovsdb-idl.at | 31 +++++++++++++++++++++++++++++++ > > tests/test-ovsdb.py | 49 > ++++++++++++++++++++++++++++++++----------------- > > 5 files changed, 110 insertions(+), 38 deletions(-) > > > > diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py > > index 3187db9..8f55d0d 100644 > > --- a/python/ovs/db/idl.py > > +++ b/python/ovs/db/idl.py > > @@ -85,7 +85,7 @@ class Idl(object): > > currently being constructed, if there is one, or None otherwise. > > """ > > > > - def __init__(self, remote, schema): > > + def __init__(self, remote, schema, session = None): > > You need this patch to get the full set of flake8 errors: > > https://patchwork.ozlabs.org/patch/571788/ > > with that, I get the following (which includes the need to remove spaces > around '=' above. > > python/ovs/db/idl.py:88:47: E251 unexpected spaces around keyword / > parameter equals > python/ovs/db/idl.py:88:49: E251 unexpected spaces around keyword / > parameter equals > > > """Creates and returns a connection to the database named > 'db_name' on > > 'remote', which should be in a form acceptable to > > ovs.jsonrpc.session.open(). The connection will maintain an > in-memory > > @@ -103,7 +103,16 @@ class Idl(object): > > As a convenience to users, 'schema' may also be an instance of > the > > SchemaHelper class. > > > > - The IDL uses and modifies 'schema' directly.""" > > + The IDL uses and modifies 'schema' directly. > > + > > + In passive mode ( where the OVSDB connects to its manager ), > > + we first need to wait for the OVSDB to connect and then > > saying "the OVSDB server" would read a little clearer to me than just > "the OVSDB". > > > + pass the 'session' object (while the it is still open ) and > > + the schema we retrieved from the open session to the IDL to use > it. 
> > + > > + If in active mode, do not pass 'session' and it will be created > > + by IDL by using 'remote'. > > + """ > > > > assert isinstance(schema, SchemaHelper) > > schema = schema.get_idl_schema() > > @@ -111,7 +120,10 @@ class Idl(object): > > self.tables = schema.tables > > self.readonly = schema.readonly > > self._db = schema > > - self._session = ovs.jsonrpc.Session.open(remote) > > + if session: > > + self._session = session > > + else: > > + self._session = ovs.jsonrpc.Session.open(remote) > > self._monitor_request_id = None > > self._last_seqno = None > > self.change_seqno = 0 > > diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py > > index 99aa27c..de1125b 100644 > > --- a/python/ovs/jsonrpc.py > > +++ b/python/ovs/jsonrpc.py > > @@ -418,23 +418,25 @@ class Session(object): > > self.__disconnect() > > > > name = self.reconnect.get_name() > > - if not self.reconnect.is_passive(): > > - error, self.stream = ovs.stream.Stream.open(name) > > + if self.reconnect.is_passive(): > > + if self.pstream is not None: > > + self.pstream.close() > > + error, self.pstream = ovs.stream.PassiveStream.open(name) > > if not error: > > - self.reconnect.connecting(ovs.timeval.msec()) > > + self.reconnect.listening(ovs.timeval.msec()) > > else: > > self.reconnect.connect_failed(ovs.timeval.msec(), error) > > - elif self.pstream is not None: > > - error, self.pstream = ovs.stream.PassiveStream.open(name) > > + else: > > + error, self.stream = ovs.stream.Stream.open(name) > > if not error: > > - self.reconnect.listening(ovs.timeval.msec()) > > + self.reconnect.connecting(ovs.timeval.msec()) > > else: > > self.reconnect.connect_failed(ovs.timeval.msec(), error) > > > > self.seqno += 1 > > > > def run(self): > > - if self.pstream is not None: > > + if self.pstream is not None and self.stream is None: > > error, stream = self.pstream.accept() > > if error == 0: > > if self.rpc or self.stream: > > @@ -444,11 +446,11 @@ class Session(object): > > self.__disconnect() > > 
self.reconnect.connected(ovs.timeval.msec()) > > self.rpc = Connection(stream) > > + self.stream = stream > > elif error != errno.EAGAIN: > > self.reconnect.listen_error(ovs.timeval.msec(), error) > > self.pstream.close() > > self.pstream = None > > - > > Unrelated formatting change here. > > > if self.rpc: > > backlog = self.rpc.get_backlog() > > self.rpc.run() > > diff --git a/python/ovs/stream.py b/python/ovs/stream.py > > index a555a76..46f6a8d 100644 > > --- a/python/ovs/stream.py > > +++ b/python/ovs/stream.py > > @@ -278,26 +278,35 @@ class PassiveStream(object): > > def open(name): > > """Attempts to start listening for remote stream connections. > 'name' > > is a connection name in the form "TYPE:ARGS", where TYPE is an > passive > > - stream class's name and ARGS are stream class-specific. > Currently the > > - only supported TYPE is "punix". > > + stream class's name and ARGS are stream class-specific. > > How about updating this to explicitly say that it's now "punix" and "ptcp" > ? 
> > > > > Returns (error, pstream): on success 'error' is 0 and 'pstream' > is the > > new PassiveStream, on failure 'error' is a positive errno value > and > > 'pstream' is None.""" > > if not PassiveStream.is_valid_name(name): > > return errno.EAFNOSUPPORT, None > > - > > - bind_path = name[6:] > > + bind_path = None > > if name.startswith("punix:"): > > + bind_path = name[6:] > > bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, > bind_path) > > - error, sock = > ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, > > - True, bind_path, > None) > > - if error: > > - return error, None > > + error, sock = > ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, > > + True, > bind_path, > > + None) > > + if error: > > + return error, None > > + > > + elif name.startswith("ptcp:"): > > + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) > > + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) > > + remote = name.split(':') > > + sock.bind((remote[1], int(remote[2]))) > > + > > + else: > > + raise Exception('Unknown connection string') > > > > try: > > sock.listen(10) > > - except socket.error as e: > > + except socket.error, e: > > This change is reverting a Python 3 compatibility fix. I'm sure it was > just an accident while resolving conflicts in a rebase, but it should be > restored to "except socket.error as e". > > If you install flake8 as well as the hacking flake8 plugin, you will get > an error at build-time about this. 
> > python/ovs/stream.py:309:9: H231 Python 3.x incompatible 'except x,y:' > construct > > > > vlog.err("%s: listen: %s" % (name, os.strerror(e.error))) > > sock.close() > > return e.error, None > > @@ -324,7 +333,10 @@ class PassiveStream(object): > > try: > > sock, addr = self.socket.accept() > > ovs.socket_util.set_nonblocking(sock) > > - return 0, Stream(sock, "unix:%s" % addr, 0) > > + if (sock.family == socket.AF_UNIX): > > + return 0, Stream(sock, "unix:%s" % addr, 0) > > + return 0, Stream(sock, 'ptcp:' + addr[0] + ':' + > str(addr[1]), > > This is a minor style thing, but I'd prefer: > > 'ptcp:%s:%s' % (addr[0], addr[1]) > > > + 0) > > except socket.error as e: > > error = ovs.socket_util.get_exception_errno(e) > > if error != errno.EAGAIN: > > diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at > > index ebf82a5..813812e 100644 > > --- a/tests/ovsdb-idl.at > > +++ b/tests/ovsdb-idl.at > > @@ -111,6 +111,37 @@ m4_define([OVSDB_CHECK_IDL], > > OVSDB_CHECK_IDL_TCP_PY($@) > > OVSDB_CHECK_IDL_TCP6_PY($@)]) > > > > + > > +# This test uses the Python IDL implementation with passive tcp > > +m4_define([OVSDB_CHECK_IDL_PASSIVE_TCP_PY], > > + [AT_SETUP([$1 - Python ptcp]) > > + AT_SKIP_IF([test $HAVE_PYTHON = no]) > > + AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5]) > > + AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema], > > + [0], [stdout], [ignore]) > > + # find free TCP port > > + AT_CHECK([ovsdb-server --log-file > '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir > --pidfile="`pwd`"/pid --remote=punix:socket --remote=ptcp:0:127.0.0.1 > --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore]) > > + PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT]) > > + AT_CHECK([kill `cat pid`]) > > + > > + # start OVSDB server in passive mode > > + AT_CHECK([ovsdb-server --log-file > '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir > --pidfile="`pwd`"/pid --remote=punix:socket --remote=tcp:127.0.0.1:$TCP_PORT > 
--unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore]) > > + AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl > $srcdir/idltest.ovsschema ptcp:127.0.0.1:$TCP_PORT $3], > > + [0], [stdout], [ignore], [kill `cat pid`]) > > + AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$6],,, > [[| $6]]), > > + [0], [$4], [], [kill `cat pid`]) > > + AT_CLEANUP > > + ]) > > + > > + > > +OVSDB_CHECK_IDL_PASSIVE_TCP_PY([simple passive idl, initially empty, > select empty], > > + [], > > + [['["idltest",{"op":"select","table":"link1","where":[]}]']], > > + [[000: empty > > +001: {"error":null,"result":[{"rows":[]}]} > > +002: done > > +]]) > > + > > OVSDB_CHECK_IDL([simple idl, initially empty, no ops], > > [], > > [], > > Several other tests in here test with both IPv4 and IPv6. > OVSDB_CHECK_IDL_TCP_PY vs OVSDB_CHECK_IDL_TCP6_PY. > > Can you update this to test with both IPv4 and IPv6? I actually don't > think IPv6 is working currently. > > It would also be great if you could come up with a test that ran against > both the C and Python implementations, like all the tests that call > OVSDB_CHECK_IDL. That may require some work in the C code though, and > if you're not comfortable with that, don't worry about it. You could > leave it as a TODO in here for now. 
> > > diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py > > index 4690722..ac28d76 100644 > > --- a/tests/test-ovsdb.py > > +++ b/tests/test-ovsdb.py > > @@ -406,17 +406,31 @@ def do_idl(schema_file, remote, *commands): > > commands = commands[1:] > > else: > > schema_helper.register_all() > > - idl = ovs.db.idl.Idl(remote, schema_helper) > > > > - if commands: > > - error, stream = ovs.stream.Stream.open_block( > > - ovs.stream.Stream.open(remote)) > > - if error: > > - sys.stderr.write("failed to connect to \"%s\"" % remote) > > - sys.exit(1) > > - rpc = ovs.jsonrpc.Connection(stream) > > + if remote.startswith('ptcp'): > > + passive = True > > + else: > > + passive = False > > minor simplification: > > passive = remote.startswith('ptcp:') > > > + > > + if passive: > > + session = ovs.jsonrpc.Session.open(remote) > > + session.run() > > + session.run() > > A comment would be helpful in the code here to explain why you're > calling session.run() twice here. Otherwise it looks like an accident. 
> > > + > > + rpc = session.rpc > > + idl = ovs.db.idl.Idl(remote, schema_helper, session) > > else: > > - rpc = None > > + idl = ovs.db.idl.Idl(remote, schema_helper) > > + > > + if commands: > > + error, stream = ovs.stream.Stream.open_block( > > + ovs.stream.Stream.open(remote)) > > + if error: > > + sys.stderr.write("failed to connect to \"%s\"" % remote) > > + sys.exit(1) > > + rpc = ovs.jsonrpc.Connection(stream) > > + else: > > + rpc = None > > > > symtab = {} > > seqno = 0 > > @@ -474,14 +488,15 @@ def do_idl(schema_file, remote, *commands): > > sys.stdout.write("%s\n" % > ovs.json.to_string(reply.to_json())) > > sys.stdout.flush() > > > > - if rpc: > > - rpc.close() > > - while idl.change_seqno == seqno and not idl.run(): > > - poller = ovs.poller.Poller() > > - idl.wait(poller) > > - poller.block() > > - print_idl(idl, step) > > - step += 1 > > + if not passive: > > + if rpc: > > + rpc.close() > > + while idl.change_seqno == seqno and not idl.run(): > > + poller = ovs.poller.Poller() > > + idl.wait(poller) > > + poller.block() > > + print_idl(idl, step) > > + step += 1 > > idl.close() > > print("%03d: done" % step) > > > > > > > -- > Russell Bryant > _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev