I did put the explanation in the patch text:

# The first call to session.run() creates the PassiveStream object and the
# second one accepts the incoming connection.
session.run()
session.run()


But you are right, I will add it to the code too.

Ofer.


On Tue, Jan 26, 2016 at 8:53 PM, Russell Bryant <russ...@ovn.org> wrote:

> On 01/25/2016 04:09 AM, ofer.benya...@gmail.com wrote:
> > From: Ofer Ben-Yacov <ofer.benya...@gmail.com>
> >
> > Currently the IDL does not support passive TCP connection,
> > i.e. when the OVSDB connects to its manager.
> >
> > This patch enables IDL to use an already-open session
> > (the one which was previously used for retrieving the db schema).
> > In addition, it enables IDL to go back to "listen mode" in case the
> > connection is lost.
> >
> > LIMITATIONS:
> > ----------------------
> >
> > This patch enables a **SINGLE** TCP connection from an OVSDB server to an
> > agent that uses IDL with {IP,PORT} pair. Therefore, the agent will support
> > only **ONE** OVSDB server using {IP,PORT} pair.
> >
> > Future development may add multi-session server capability that will allow
> > an agent to use a single {IP,PORT} pair to connect to multiple OVSDB servers.
> >
> >
> > CAVEAT:
> > --------------
> >
> > When a database first connects to the agent, the agent gets the schema and
> > data and builds its tables. If the session disconnects, the agent goes back
> > to "listen mode" and accepts **ANY** TCP connection, which means that if
> > another database tries to connect to the agent using the same {IP,PORT}
> > pair, it will be connected to the IDL that has the schema and data from
> > the first database.
> >
> > A future patch can resolve this problem.
> >
> > USAGE:
> > -------------
> >
> > To use IDL in passive mode, the following example code can be used:
> >
> > (snippet)
> >
> > from ovs.jsonrpc import Session
> > ...
> >
> > from neutron.agent.ovsdb.native import idlutils
> >
> > ...
> >
> > session = Session.open('ptcp:192.168.10.10:6640')
> >
> > # The first call to session.run() creates the PassiveStream object and the
> > # second one accepts the incoming connection.
> > session.run()
> > session.run()
> >
> > # This static method is similar to the original neutron method, but without
> > # the rpc.close() call, which would close the socket.
> > helper = idlutils.get_schema_helper_from_stream_no_close(session.stream,
> >         'hardware_vtep')
> > helper.register_all()
> > self.idl = idl.Idl(self.connection, helper, session)
> > idlutils.wait_for_change(self.idl, self.timeout)
> >
> > self.poller = poller.Poller()
> > self.thread = threading.Thread(target=self.run)
> > self.thread.setDaemon(True)
> > self.thread.start()
>
> It's a little odd to show example usage that depends on neutron code.  I
> actually don't even see get_schema_helper_from_stream_no_close() in
> idlutils right now.  Is that a pending patch?
>
> Maybe this usage information could be incorporated into a docstring
> somewhere in idl.py?  That would make it more useful to future users, as
> they likely won't find the details in the commit message.
>
> >
> >
> > TESTING:
> > ---------------
> > Added unit test for passive mode. See ovsdb-idl.at file.
> >
> >
> > Signed-off-by: "Ofer Ben-Yacov" <ofer.benya...@gmail.com>
> >
> > Tested-by: "Ofer Ben-Yacov" <ofer.benya...@gmail.com>
> >
> > Requested-by: Ben Pfaff <b...@nicira.com>,
> >         "D M, Vikas" <vikas....@hpe.com>,
> >         "Kamat, Maruti Haridas" <maruti.ka...@hpe.com>,
> >         "Sukhdev Kapur" <sukh...@arista.com>,
> >         "Migliaccio, Armando" <armando.migliac...@hpe.com>
>
> Each email address should have its own "Requested-by" prefix.
>
> >
> > ---
> >  python/ovs/db/idl.py  | 18 +++++++++++++++---
> >  python/ovs/jsonrpc.py | 18 ++++++++++--------
> >  python/ovs/stream.py  | 32 ++++++++++++++++++++++----------
> >  tests/ovsdb-idl.at    | 31 +++++++++++++++++++++++++++++++
> >  tests/test-ovsdb.py   | 49 ++++++++++++++++++++++++++++++++-----------------
> >  5 files changed, 110 insertions(+), 38 deletions(-)
> >
> > diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> > index 3187db9..8f55d0d 100644
> > --- a/python/ovs/db/idl.py
> > +++ b/python/ovs/db/idl.py
> > @@ -85,7 +85,7 @@ class Idl(object):
> >        currently being constructed, if there is one, or None otherwise.
> >  """
> >
> > -    def __init__(self, remote, schema):
> > +    def __init__(self, remote, schema, session = None):
>
> You need this patch to get the full set of flake8 errors:
>
> https://patchwork.ozlabs.org/patch/571788/
>
> With that, I get the following (which includes the need to remove spaces
> around '=' above):
>
> python/ovs/db/idl.py:88:47: E251 unexpected spaces around keyword /
> parameter equals
> python/ovs/db/idl.py:88:49: E251 unexpected spaces around keyword /
> parameter equals
>
> >          """Creates and returns a connection to the database named 'db_name' on
> >          'remote', which should be in a form acceptable to
> >          ovs.jsonrpc.session.open().  The connection will maintain an in-memory
> > @@ -103,7 +103,16 @@ class Idl(object):
> >          As a convenience to users, 'schema' may also be an instance of the
> >          SchemaHelper class.
> >
> > -        The IDL uses and modifies 'schema' directly."""
> > +        The IDL uses and modifies 'schema' directly.
> > +
> > +        In passive mode ( where the OVSDB connects to its manager ),
> > +        we first need to wait for the OVSDB to connect and then
>
> saying "the OVSDB server" would read a little clearer to me than just
> "the OVSDB".
>
> > +        pass the 'session' object (while the it is still open ) and
> > +        the schema we retrieved from the open session to the IDL to use it.
> > +
> > +        If in active mode, do not pass 'session' and it will be created
> > +        by IDL by using 'remote'.
> > +        """
> >
> >          assert isinstance(schema, SchemaHelper)
> >          schema = schema.get_idl_schema()
> > @@ -111,7 +120,10 @@ class Idl(object):
> >          self.tables = schema.tables
> >          self.readonly = schema.readonly
> >          self._db = schema
> > -        self._session = ovs.jsonrpc.Session.open(remote)
> > +        if session:
> > +            self._session = session
> > +        else:
> > +            self._session = ovs.jsonrpc.Session.open(remote)
> >          self._monitor_request_id = None
> >          self._last_seqno = None
> >          self.change_seqno = 0
> > diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> > index 99aa27c..de1125b 100644
> > --- a/python/ovs/jsonrpc.py
> > +++ b/python/ovs/jsonrpc.py
> > @@ -418,23 +418,25 @@ class Session(object):
> >          self.__disconnect()
> >
> >          name = self.reconnect.get_name()
> > -        if not self.reconnect.is_passive():
> > -            error, self.stream = ovs.stream.Stream.open(name)
> > +        if self.reconnect.is_passive():
> > +            if self.pstream is not None:
> > +                self.pstream.close()
> > +            error, self.pstream = ovs.stream.PassiveStream.open(name)
> >              if not error:
> > -                self.reconnect.connecting(ovs.timeval.msec())
> > +                self.reconnect.listening(ovs.timeval.msec())
> >              else:
> >                  self.reconnect.connect_failed(ovs.timeval.msec(), error)
> > -        elif self.pstream is not None:
> > -            error, self.pstream = ovs.stream.PassiveStream.open(name)
> > +        else:
> > +            error, self.stream = ovs.stream.Stream.open(name)
> >              if not error:
> > -                self.reconnect.listening(ovs.timeval.msec())
> > +                self.reconnect.connecting(ovs.timeval.msec())
> >              else:
> >                  self.reconnect.connect_failed(ovs.timeval.msec(), error)
> >
> >          self.seqno += 1
> >
> >      def run(self):
> > -        if self.pstream is not None:
> > +        if self.pstream is not None and self.stream is None:
> >              error, stream = self.pstream.accept()
> >              if error == 0:
> >                  if self.rpc or self.stream:
> > @@ -444,11 +446,11 @@ class Session(object):
> >                      self.__disconnect()
> >                  self.reconnect.connected(ovs.timeval.msec())
> >                  self.rpc = Connection(stream)
> > +                self.stream = stream
> >              elif error != errno.EAGAIN:
> >                  self.reconnect.listen_error(ovs.timeval.msec(), error)
> >                  self.pstream.close()
> >                  self.pstream = None
> > -
>
> Unrelated formatting change here.
>
> >          if self.rpc:
> >              backlog = self.rpc.get_backlog()
> >              self.rpc.run()
> > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > index a555a76..46f6a8d 100644
> > --- a/python/ovs/stream.py
> > +++ b/python/ovs/stream.py
> > @@ -278,26 +278,35 @@ class PassiveStream(object):
> >      def open(name):
> >          """Attempts to start listening for remote stream connections.  'name'
> >          is a connection name in the form "TYPE:ARGS", where TYPE is an passive
> > -        stream class's name and ARGS are stream class-specific.  Currently the
> > -        only supported TYPE is "punix".
> > +        stream class's name and ARGS are stream class-specific.
>
> How about updating this to explicitly say that it's now "punix" and
> "ptcp"?
>
> >
> >          Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
> >          new PassiveStream, on failure 'error' is a positive errno value and
> >          'pstream' is None."""
> >          if not PassiveStream.is_valid_name(name):
> >              return errno.EAFNOSUPPORT, None
> > -
> > -        bind_path = name[6:]
> > +        bind_path = None
> >          if name.startswith("punix:"):
> > +            bind_path = name[6:]
> >              bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
> > -        error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> > -                                                       True, bind_path, None)
> > -        if error:
> > -            return error, None
> > +            error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> > +                                                           True, bind_path,
> > +                                                           None)
> > +            if error:
> > +                return error, None
> > +
> > +        elif name.startswith("ptcp:"):
> > +            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > +            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
> > +            remote = name.split(':')
> > +            sock.bind((remote[1], int(remote[2])))
> > +
> > +        else:
> > +            raise Exception('Unknown connection string')
> >
> >          try:
> >              sock.listen(10)
> > -        except socket.error as e:
> > +        except socket.error, e:
>
> This change is reverting a Python 3 compatibility fix.  I'm sure it was
> just an accident while resolving conflicts in a rebase, but it should be
> restored to "except socket.error as e".
>
> If you install flake8 as well as the hacking flake8 plugin, you will get
> an error at build-time about this.
>
> python/ovs/stream.py:309:9: H231  Python 3.x incompatible 'except x,y:'
> construct
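
For illustration, here is a minimal sketch of the Python 3 compatible form. It is not taken from the patch: the helper name try_listen is hypothetical, and it reads e.errno directly instead of going through ovs.socket_util. The same source should be accepted by Python 2.6+ and Python 3.

```python
import errno
import socket


def try_listen(sock, backlog=10):
    """Hypothetical helper: return 0 on success or a positive errno value."""
    try:
        sock.listen(backlog)
    except socket.error as e:  # "except socket.error, e" is a SyntaxError on Python 3
        return e.errno
    return 0


# A bound TCP socket can start listening.
ok_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ok_sock.bind(("127.0.0.1", 0))
listen_result = try_listen(ok_sock)
ok_sock.close()

# Listening on an already-closed socket fails and yields an errno value.
closed_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
closed_sock.close()
closed_result = try_listen(closed_sock)
```
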
>
>
> >              vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
> >              sock.close()
> >              return e.error, None
> > @@ -324,7 +333,10 @@ class PassiveStream(object):
> >              try:
> >                  sock, addr = self.socket.accept()
> >                  ovs.socket_util.set_nonblocking(sock)
> > -                return 0, Stream(sock, "unix:%s" % addr, 0)
> > +                if (sock.family == socket.AF_UNIX):
> > +                    return 0, Stream(sock, "unix:%s" % addr, 0)
> > +                return 0, Stream(sock, 'ptcp:' + addr[0] + ':' + str(addr[1]),
>
> This is a minor style thing, but I'd prefer:
>
>     'ptcp:%s:%s' % (addr[0], addr[1])
>
> > +                                 0)
> >              except socket.error as e:
> >                  error = ovs.socket_util.get_exception_errno(e)
> >                  if error != errno.EAGAIN:
> > diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
> > index ebf82a5..813812e 100644
> > --- a/tests/ovsdb-idl.at
> > +++ b/tests/ovsdb-idl.at
> > @@ -111,6 +111,37 @@ m4_define([OVSDB_CHECK_IDL],
> >     OVSDB_CHECK_IDL_TCP_PY($@)
> >     OVSDB_CHECK_IDL_TCP6_PY($@)])
> >
> > +
> > +# This test uses the Python IDL implementation with passive tcp
> > +m4_define([OVSDB_CHECK_IDL_PASSIVE_TCP_PY],
> > +  [AT_SETUP([$1 - Python ptcp])
> > +   AT_SKIP_IF([test $HAVE_PYTHON = no])
> > +   AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5])
> > +   AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
> > +                  [0], [stdout], [ignore])
> > +   # find free TCP port
> > +   AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=ptcp:0:127.0.0.1 --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> > +   PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
> > +   AT_CHECK([kill `cat pid`])
> > +
> > +   # start OVSDB server in passive mode
> > +   AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=tcp:127.0.0.1:$TCP_PORT --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> > +   AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema ptcp:127.0.0.1:$TCP_PORT $3],
> > +      [0], [stdout], [ignore], [kill `cat pid`])
> > +   AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$6],,, [[| $6]]),
> > +            [0], [$4], [], [kill `cat pid`])
> > +   AT_CLEANUP
> > +   ])
> > +
> > +
> > +OVSDB_CHECK_IDL_PASSIVE_TCP_PY([simple passive idl, initially empty, select empty],
> > +  [],
> > +  [['["idltest",{"op":"select","table":"link1","where":[]}]']],
> > +  [[000: empty
> > +001: {"error":null,"result":[{"rows":[]}]}
> > +002: done
> > +]])
> > +
> >  OVSDB_CHECK_IDL([simple idl, initially empty, no ops],
> >    [],
> >    [],
>
> Several other tests in here test with both IPv4 and IPv6.
> OVSDB_CHECK_IDL_TCP_PY vs OVSDB_CHECK_IDL_TCP6_PY.
>
> Can you update this to test with both IPv4 and IPv6?  I actually don't
> think IPv6 is working currently.
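
As far as I can tell, one obstacle to IPv6 is that the patch splits the name on every ':' and hard-codes socket.AF_INET, so an IPv6 literal cannot be parsed or bound. Below is a minimal sketch of one possible approach, assuming the "ptcp:IP:PORT" ordering used in this patch and assuming IPv6 literals are written in brackets. parse_ptcp_name is a hypothetical helper, not part of ovs.stream.

```python
import socket


def parse_ptcp_name(name):
    """Hypothetical helper: split "ptcp:IP:PORT" into (host, port, family).

    Assumes the IP:PORT ordering used by this patch and that IPv6
    literals are bracketed, e.g. "ptcp:[::1]:6640".
    """
    if not name.startswith("ptcp:"):
        raise ValueError("not a ptcp name: %s" % name)
    # Split on the last ':' only, so colons inside an IPv6 literal survive.
    host, sep, port = name[len("ptcp:"):].rpartition(":")
    if not sep or not host:
        raise ValueError("expected ptcp:IP:PORT, got: %s" % name)
    if host.startswith("[") and host.endswith("]"):
        return host[1:-1], int(port), socket.AF_INET6
    return host, int(port), socket.AF_INET


ipv4 = parse_ptcp_name("ptcp:127.0.0.1:6640")
ipv6 = parse_ptcp_name("ptcp:[::1]:6640")
```

The returned family could then be passed to socket.socket() in place of the hard-coded socket.AF_INET.
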
>
> It would also be great if you could come up with a test that ran against
> both the C and Python implementations, like all the tests that call
> OVSDB_CHECK_IDL.  That may require some work in the C code though, and
> if you're not comfortable with that, don't worry about it.  You could
> leave it as a TODO in here for now.
>
> > diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
> > index 4690722..ac28d76 100644
> > --- a/tests/test-ovsdb.py
> > +++ b/tests/test-ovsdb.py
> > @@ -406,17 +406,31 @@ def do_idl(schema_file, remote, *commands):
> >          commands = commands[1:]
> >      else:
> >          schema_helper.register_all()
> > -    idl = ovs.db.idl.Idl(remote, schema_helper)
> >
> > -    if commands:
> > -        error, stream = ovs.stream.Stream.open_block(
> > -            ovs.stream.Stream.open(remote))
> > -        if error:
> > -            sys.stderr.write("failed to connect to \"%s\"" % remote)
> > -            sys.exit(1)
> > -        rpc = ovs.jsonrpc.Connection(stream)
> > +    if remote.startswith('ptcp'):
> > +        passive = True
> > +    else:
> > +        passive = False
>
> minor simplification:
>
>     passive = remote.startswith('ptcp:')
>
> > +
> > +    if passive:
> > +        session = ovs.jsonrpc.Session.open(remote)
> > +        session.run()
> > +        session.run()
>
> A comment would be helpful in the code here to explain why you're
> calling session.run() twice.  Otherwise it looks like an accident.
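
To make the two steps concrete, here is a rough standard-library analogy. It uses plain sockets only, not the ovs.jsonrpc.Session API, and try_accept is a hypothetical helper. Step one opens the listening socket; step two is a non-blocking accept, which reports EAGAIN when no peer has connected yet. In this sketch that is why the code waits for readiness before accepting, rather than assuming the second step always finds a peer.

```python
import errno
import select
import socket

# Step 1: start listening (roughly what opening the passive stream does).
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(("127.0.0.1", 0))
listener.listen(10)
listener.setblocking(False)


def try_accept(lsock):
    """Hypothetical helper: return (0, conn) or (errno, None)."""
    try:
        conn, _addr = lsock.accept()
    except socket.error as e:
        return e.errno, None
    return 0, conn


# Step 2 attempted too early: nobody has connected, so accept reports EAGAIN.
first_error, _ = try_accept(listener)

# Once a peer connects and the listener becomes readable, accept succeeds.
client = socket.create_connection(listener.getsockname())
select.select([listener], [], [], 5)
second_error, conn = try_accept(listener)

for s in (conn, client, listener):
    if s is not None:
        s.close()
```
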
>
> > +
> > +        rpc = session.rpc
> > +        idl = ovs.db.idl.Idl(remote, schema_helper, session)
> >      else:
> > -        rpc = None
> > +        idl = ovs.db.idl.Idl(remote, schema_helper)
> > +
> > +        if commands:
> > +            error, stream = ovs.stream.Stream.open_block(
> > +                ovs.stream.Stream.open(remote))
> > +            if error:
> > +                sys.stderr.write("failed to connect to \"%s\"" % remote)
> > +                sys.exit(1)
> > +            rpc = ovs.jsonrpc.Connection(stream)
> > +        else:
> > +            rpc = None
> >
> >      symtab = {}
> >      seqno = 0
> > @@ -474,14 +488,15 @@ def do_idl(schema_file, remote, *commands):
> >              sys.stdout.write("%s\n" % ovs.json.to_string(reply.to_json()))
> >              sys.stdout.flush()
> >
> > -    if rpc:
> > -        rpc.close()
> > -    while idl.change_seqno == seqno and not idl.run():
> > -        poller = ovs.poller.Poller()
> > -        idl.wait(poller)
> > -        poller.block()
> > -    print_idl(idl, step)
> > -    step += 1
> > +    if not passive:
> > +        if rpc:
> > +            rpc.close()
> > +        while idl.change_seqno == seqno and not idl.run():
> > +            poller = ovs.poller.Poller()
> > +            idl.wait(poller)
> > +            poller.block()
> > +        print_idl(idl, step)
> > +        step += 1
> >      idl.close()
> >      print("%03d: done" % step)
> >
> >
>
>
> --
> Russell Bryant
>