The current implementation IDL is:
1. connects to the OVSDB
2. get the schema and
3. disconnect
4. connect again to work with the OVSDB server.

You can still work like that, if you want, by not sending session object to
the IDL init function:

def __init__(self, remote, schema, session=None):

...


I wanted a different approach.

when working with socket (unix and punix, active or passive mode), there is
no problem as we are on the same server.
when working with tcp it is a bit different. we assume working with
different machine. In case we use active TCP, there is no problem with the
above implementation (connect to read the schema, disconnect, connect to
work), as IDL initiate the session. In passive TCP, the IDL is actually a
server waiting for the client to connect to it.

The changes I made (which are not in the idl.py file) are to **support** a
different approach which is more reasonable for server:
1. wait for connection from OVSDB server
2. upon connection, open a stream and a session object
3. read the schema. leave the stream open.
4. use the open session object for working with the OVSDB server.

This will make IDL work like a server and a server does not disconnect its
client after getting some info and then go back to listen mode to have the
client connect to it again.

Note that the change in the actions are actually not in OVS's python files
but in the project I work on (Neutron/L2GW) and just to emphasize again,
you have a full backward compatibility if you prefer to work in passive
mode 'the old way'.


For the CAVEAT section, it is related to the fact that if the IDL go to
listen mode, any OVSDB server can connect to it. There is no multi session
management in the IDL (as it should be because passive TCP is actually
making it a server). Currently the IDL, when working in passive TCP, is a
single session server. If multiple OVSDB servers will try to connect to it
using the same IP and TCP_PORT, we will have a problem (no mater what
approach as mentioned above we will use).

The problem is that the IDL upon first connection reads the schema and then
we assume we are working with the same OVSDB server. In case a different
server will get connected after the schema will be read we will have a
situation where we have a schema from server A while actually working with
server B.

A way around this problem is if we have multiple OVSDB server - make sure
each one of them uses a different TCP_PORT to connect to the IDL.

There are 3 solutions for this problem:
1. Have the IDL act as a full server, which means to add multi-session
support. I didn't want to add this now in order to make the patch as small
as possible but I can add that soon, if you would like. This is the best
solution but it will change the current code much more than this patch ....
2. Make sure that if we are in passive TCP, erase and re-read the schema
every time the OVSDB server connects to the IDL.
3. Refuse to connect to a server with IP address other then the first
server that was connected and log an error (simplest yet lame).


Ofer.








‫בתאריך יום ד׳, 24 בפבר׳ 2016 ב-2:48 מאת ‪Ben Pfaff‬‏ <‪b...@ovn.org‬‏>:‬

> I spent some time looking this over, just now, and I'm a bit confused.
>
> It's clear that the python implementation of PassiveStream did not
> support TCP.  Great, the patch adds that.  If the patch just added that,
> I'd be happy, I'd just commit it.
>
> What I don't understand is why other changes are needed to support
> passive streams in the IDL.  The python implementation of
> jsonrpc.Session seems to support passive connection just as well as
> active ones; the code is basically the same as the C implementation.  So
> why are other changes needed?
>
> The CAVEAT in the commit message worries me too.  Why doesn't the IDL
> work just the same in this regard for active and passive connections?
>
> Thanks,
>
> Ben.
>
> On Wed, Feb 17, 2016 at 05:22:19PM +0200, Ofer Ben Yacov wrote:
> > From: Ofer Ben-Yacov <ofer.benya...@gmail.com>
> >
> > Currently the IDL does not support passive TCP connection,
> > i.e. when the OVSDB connects to its manager.
> >
> > This patch enables IDL to use an already-open session
> > (the one which was previously used for retrieving the db schema).
> > In addition, it enables IDL to go back to "listen mode" in case the
> connection
> > is lost.
> >
> > LIMITATIONS:
> > ----------------------
> >
> > This patch enables a **SINGLE** TCP connection from an OVSDB server to an
> > agent that uses IDL with {IP,PORT} pair. Therefore, the agent will
> support
> > only **ONE** OVSDB server using {IP,PORT} pair.
> >
> > Future development may add multi-session server capability that will
> allow
> > an agent to use single {IP,PORT} pair to connect to multiple OVSDB
> servers.
> >
> >
> > CAVEAT:
> > --------------
> >
> > When a database first connects to the agent, the agent gets the schema
> and
> > data and builds its tables. If the session disconnects, the agent goes
> back
> > to "listen mode" and accepts **ANY** TCP connection, which means that if
> > another database will try to connect to the agent using the same
> {IP,PORT}
> > pair, it will be connected to the IDL that has the schema and data from
> > the first database.
> >
> > A future patch can resolve this problem.
> >
> > USAGE:
> > -------------
> >
> > To use IDL in passive mode, the following example code can be use:
> >
> > (snippet)
> >
> > from ovs.jsonrpc import Session
> > ...
> >
> > from neutron.agent.ovsdb.native import idlutils
> >
> > ...
> >
> > session = Session.open('ptcp:192.168.10.10:6640')
> >
> > # first call to session.run creates the PassiveStream object and second
> one
> > # accept incoming connection
> > session.run()
> > session.run()
> >
> > # this static method is similar to the original neutron method but the
> > # rpc.close() command that would result closing the socket.
> > helper = idlutils.get_schema_helper_from_stream_no_close(session.stream,
> >         'hardware_vtep')
> > helper.register_all()
> > self.idl = idl.Idl(self.connection, helper, session)
> > idlutils.wait_for_change(self.idl, self.timeout)
> >
> > self.poller = poller.Poller()
> > self.thread = threading.Thread(target=self.run)
> > self.thread.setDaemon(True)
> > self.thread.start()
> >
> >
> > TESTING:
> > ---------------
> > Added unit test for passive mode. See ovsdb-idl.at file.
> >
> > TODO
> > ----
> > Test this patch against C implementation
> >
> >
> > Signed-off-by: "Ofer Ben-Yacov" <ofer.benya...@gmail.com>
> >
> > Tested-by: "Ofer Ben-Yacov" <ofer.benya...@gmail.com>
> >
> > Requested-by: Ben Pfaff <b...@nicira.com>,
> > Requested-by: "D M, Vikas" <vikas....@hpe.com>,
> > Requested-by: "Kamat, Maruti Haridas" <maruti.ka...@hpe.com>,
> > Requested-by: "Sukhdev Kapur" <sukh...@arista.com>,
> > Requested-by: "Migliaccio, Armando" <armando.migliac...@hpe.com>
> >
> > ---
> >  python/ovs/db/idl.py  | 18 +++++++++++++++---
> >  python/ovs/jsonrpc.py | 18 ++++++++++--------
> >  python/ovs/stream.py  | 37 +++++++++++++++++++++++++------------
> >  tests/ovsdb-idl.at    | 31 +++++++++++++++++++++++++++++++
> >  tests/test-ovsdb.py   | 47
> ++++++++++++++++++++++++++++++-----------------
> >  5 files changed, 111 insertions(+), 40 deletions(-)
> >
> > diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> > index e69d35e..24e9f11 100644
> > --- a/python/ovs/db/idl.py
> > +++ b/python/ovs/db/idl.py
> > @@ -86,7 +86,7 @@ class Idl(object):
> >        currently being constructed, if there is one, or None otherwise.
> >  """
> >
> > -    def __init__(self, remote, schema):
> > +    def __init__(self, remote, schema, session=None):
> >          """Creates and returns a connection to the database named
> 'db_name' on
> >          'remote', which should be in a form acceptable to
> >          ovs.jsonrpc.session.open().  The connection will maintain an
> in-memory
> > @@ -104,7 +104,16 @@ class Idl(object):
> >          As a convenience to users, 'schema' may also be an instance of
> the
> >          SchemaHelper class.
> >
> > -        The IDL uses and modifies 'schema' directly."""
> > +        The IDL uses and modifies 'schema' directly.
> > +
> > +        In passive mode ( where the OVSDB server connects to its
> manager ),
> > +        we first need to wait for the OVSDB server to connect and then
> > +        pass the 'session' object (while the it is still open ) and
> > +        the schema we retrieved from the open session to the IDL to use
> it.
> > +
> > +        If in active mode, do not pass 'session' and it will be created
> > +        by IDL by using 'remote'.
> > +        """
> >
> >          assert isinstance(schema, SchemaHelper)
> >          schema = schema.get_idl_schema()
> > @@ -112,7 +121,10 @@ class Idl(object):
> >          self.tables = schema.tables
> >          self.readonly = schema.readonly
> >          self._db = schema
> > -        self._session = ovs.jsonrpc.Session.open(remote)
> > +        if session:
> > +            self._session = session
> > +        else:
> > +            self._session = ovs.jsonrpc.Session.open(remote)
> >          self._monitor_request_id = None
> >          self._last_seqno = None
> >          self.change_seqno = 0
> > diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> > index e3ef6db..8f63a75 100644
> > --- a/python/ovs/jsonrpc.py
> > +++ b/python/ovs/jsonrpc.py
> > @@ -429,23 +429,25 @@ class Session(object):
> >          self.__disconnect()
> >
> >          name = self.reconnect.get_name()
> > -        if not self.reconnect.is_passive():
> > -            error, self.stream = ovs.stream.Stream.open(name)
> > +        if self.reconnect.is_passive():
> > +            if self.pstream is not None:
> > +                self.pstream.close()
> > +            error, self.pstream = ovs.stream.PassiveStream.open(name)
> >              if not error:
> > -                self.reconnect.connecting(ovs.timeval.msec())
> > +                self.reconnect.listening(ovs.timeval.msec())
> >              else:
> >                  self.reconnect.connect_failed(ovs.timeval.msec(), error)
> > -        elif self.pstream is not None:
> > -            error, self.pstream = ovs.stream.PassiveStream.open(name)
> > +        else:
> > +            error, self.stream = ovs.stream.Stream.open(name)
> >              if not error:
> > -                self.reconnect.listening(ovs.timeval.msec())
> > +                self.reconnect.connecting(ovs.timeval.msec())
> >              else:
> >                  self.reconnect.connect_failed(ovs.timeval.msec(), error)
> >
> >          self.seqno += 1
> >
> >      def run(self):
> > -        if self.pstream is not None:
> > +        if self.pstream is not None and self.stream is None:
> >              error, stream = self.pstream.accept()
> >              if error == 0:
> >                  if self.rpc or self.stream:
> > @@ -455,11 +457,11 @@ class Session(object):
> >                      self.__disconnect()
> >                  self.reconnect.connected(ovs.timeval.msec())
> >                  self.rpc = Connection(stream)
> > +                self.stream = stream
> >              elif error != errno.EAGAIN:
> >                  self.reconnect.listen_error(ovs.timeval.msec(), error)
> >                  self.pstream.close()
> >                  self.pstream = None
> > -
> >          if self.rpc:
> >              backlog = self.rpc.get_backlog()
> >              self.rpc.run()
> > diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> > index bc14836..656ba74 100644
> > --- a/python/ovs/stream.py
> > +++ b/python/ovs/stream.py
> > @@ -271,9 +271,9 @@ class PassiveStream(object):
> >      @staticmethod
> >      def is_valid_name(name):
> >          """Returns True if 'name' is a passive stream name in the form
> > -        "TYPE:ARGS" and TYPE is a supported passive stream type
> (currently only
> > -        "punix:"), otherwise False."""
> > -        return name.startswith("punix:")
> > +        "TYPE:ARGS" and TYPE is a supported passive stream type
> (currently
> > +        "punix:" or "ptcp"), otherwise False."""
> > +        return name.startswith("punix:") | name.startswith("ptcp:")
> >
> >      def __init__(self, sock, name, bind_path):
> >          self.name = name
> > @@ -284,22 +284,32 @@ class PassiveStream(object):
> >      def open(name):
> >          """Attempts to start listening for remote stream connections.
> 'name'
> >          is a connection name in the form "TYPE:ARGS", where TYPE is an
> passive
> > -        stream class's name and ARGS are stream class-specific.
> Currently the
> > -        only supported TYPE is "punix".
> > +        stream class's name and ARGS are stream class-specific.
> Currently the
> > +        supported values for TYPE are "punix" and "ptcp".
> >
> >          Returns (error, pstream): on success 'error' is 0 and 'pstream'
> is the
> >          new PassiveStream, on failure 'error' is a positive errno value
> and
> >          'pstream' is None."""
> >          if not PassiveStream.is_valid_name(name):
> >              return errno.EAFNOSUPPORT, None
> > -
> > -        bind_path = name[6:]
> > +        bind_path = None
> >          if name.startswith("punix:"):
> > +            bind_path = name[6:]
> >              bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR,
> bind_path)
> > -        error, sock =
> ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> > -                                                       True, bind_path,
> None)
> > -        if error:
> > -            return error, None
> > +            error, sock =
> ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> > +                                                           True,
> bind_path,
> > +                                                           None)
> > +            if error:
> > +                return error, None
> > +
> > +        elif name.startswith("ptcp:"):
> > +            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > +            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
> > +            remote = name.split(':')
> > +            sock.bind((remote[1], int(remote[2])))
> > +
> > +        else:
> > +            raise Exception('Unknown connection string')
> >
> >          try:
> >              sock.listen(10)
> > @@ -330,7 +340,10 @@ class PassiveStream(object):
> >              try:
> >                  sock, addr = self.socket.accept()
> >                  ovs.socket_util.set_nonblocking(sock)
> > -                return 0, Stream(sock, "unix:%s" % addr, 0)
> > +                if (sock.family == socket.AF_UNIX):
> > +                    return 0, Stream(sock, "unix:%s" % addr, 0)
> > +                return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
> > +                                                       str(addr[1])), 0)
> >              except socket.error as e:
> >                  error = ovs.socket_util.get_exception_errno(e)
> >                  if error != errno.EAGAIN:
> > diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
> > index ebf82a5..813812e 100644
> > --- a/tests/ovsdb-idl.at
> > +++ b/tests/ovsdb-idl.at
> > @@ -111,6 +111,37 @@ m4_define([OVSDB_CHECK_IDL],
> >     OVSDB_CHECK_IDL_TCP_PY($@)
> >     OVSDB_CHECK_IDL_TCP6_PY($@)])
> >
> > +
> > +# This test uses the Python IDL implementation with passive tcp
> > +m4_define([OVSDB_CHECK_IDL_PASSIVE_TCP_PY],
> > +  [AT_SETUP([$1 - Python ptcp])
> > +   AT_SKIP_IF([test $HAVE_PYTHON = no])
> > +   AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5])
> > +   AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
> > +                  [0], [stdout], [ignore])
> > +   # find free TCP port
> > +   AT_CHECK([ovsdb-server --log-file
> '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir
> --pidfile="`pwd`"/pid --remote=punix:socket --remote=ptcp:0:127.0.0.1
> --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> > +   PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
> > +   AT_CHECK([kill `cat pid`])
> > +
> > +   # start OVSDB server in passive mode
> > +   AT_CHECK([ovsdb-server --log-file
> '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir
> --pidfile="`pwd`"/pid --remote=punix:socket --remote=tcp:127.0.0.1:$TCP_PORT
> --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> > +   AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl
> $srcdir/idltest.ovsschema ptcp:127.0.0.1:$TCP_PORT $3],
> > +      [0], [stdout], [ignore], [kill `cat pid`])
> > +   AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$6],,,
> [[| $6]]),
> > +            [0], [$4], [], [kill `cat pid`])
> > +   AT_CLEANUP
> > +   ])
> > +
> > +
> > +OVSDB_CHECK_IDL_PASSIVE_TCP_PY([simple passive idl, initially empty,
> select empty],
> > +  [],
> > +  [['["idltest",{"op":"select","table":"link1","where":[]}]']],
> > +  [[000: empty
> > +001: {"error":null,"result":[{"rows":[]}]}
> > +002: done
> > +]])
> > +
> >  OVSDB_CHECK_IDL([simple idl, initially empty, no ops],
> >    [],
> >    [],
> > diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
> > index 73c3048..c28ed6b 100644
> > --- a/tests/test-ovsdb.py
> > +++ b/tests/test-ovsdb.py
> > @@ -407,17 +407,29 @@ def do_idl(schema_file, remote, *commands):
> >          commands = commands[1:]
> >      else:
> >          schema_helper.register_all()
> > -    idl = ovs.db.idl.Idl(remote, schema_helper)
> >
> > -    if commands:
> > -        error, stream = ovs.stream.Stream.open_block(
> > -            ovs.stream.Stream.open(remote))
> > -        if error:
> > -            sys.stderr.write("failed to connect to \"%s\"" % remote)
> > -            sys.exit(1)
> > -        rpc = ovs.jsonrpc.Connection(stream)
> > +    passive = remote.startswith('ptcp')
> > +    if passive:
> > +        session = ovs.jsonrpc.Session.open(remote)
> > +        # first call to session.run creates the PassiveStream object and
> > +        # second one accept incoming connection
> > +        session.run()
> > +        session.run()
> > +
> > +        rpc = session.rpc
> > +        idl = ovs.db.idl.Idl(remote, schema_helper, session)
> >      else:
> > -        rpc = None
> > +        idl = ovs.db.idl.Idl(remote, schema_helper)
> > +
> > +        if commands:
> > +            error, stream = ovs.stream.Stream.open_block(
> > +                ovs.stream.Stream.open(remote))
> > +            if error:
> > +                sys.stderr.write("failed to connect to \"%s\"" % remote)
> > +                sys.exit(1)
> > +            rpc = ovs.jsonrpc.Connection(stream)
> > +        else:
> > +            rpc = None
> >
> >      symtab = {}
> >      seqno = 0
> > @@ -475,14 +487,15 @@ def do_idl(schema_file, remote, *commands):
> >              sys.stdout.write("%s\n" %
> ovs.json.to_string(reply.to_json()))
> >              sys.stdout.flush()
> >
> > -    if rpc:
> > -        rpc.close()
> > -    while idl.change_seqno == seqno and not idl.run():
> > -        poller = ovs.poller.Poller()
> > -        idl.wait(poller)
> > -        poller.block()
> > -    print_idl(idl, step)
> > -    step += 1
> > +    if not passive:
> > +        if rpc:
> > +            rpc.close()
> > +        while idl.change_seqno == seqno and not idl.run():
> > +            poller = ovs.poller.Poller()
> > +            idl.wait(poller)
> > +            poller.block()
> > +        print_idl(idl, step)
> > +        step += 1
> >      idl.close()
> >      print("%03d: done" % step)
> >
> > --
> > 2.1.4
> >
> > _______________________________________________
> > dev mailing list
> > dev@openvswitch.org
> > http://openvswitch.org/mailman/listinfo/dev
>
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to