So, after all, I'm posting my angry code.
The first file is a module with a few classes that can be used
to accomplish my task.
The second file is a UNIX program: it calls socketpair() and then fork()
to start both a client and a server, which talk to each other over the PB protocol.
Running it prints some noise to the screen, and then both processes exit.
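
For readers who only want the process plumbing, here is a minimal sketch
of the socketpair()-then-fork() pattern without Twisted or PB. The name
run_pair and the upper-casing "protocol" are made up for illustration;
they are not part of the attached files.

```python
import os
import socket


def run_pair(message=b"hello"):
    """Fork a child that upper-cases one message; return the reply seen by the parent."""
    parent_sock, child_sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    pid = os.fork()
    if pid == 0:
        # Child: keep only its own end, answer once, then leave with
        # os._exit() so that none of the parent's cleanup code runs here.
        status = 1
        try:
            parent_sock.close()
            data = child_sock.recv(1024)
            child_sock.sendall(data.upper())
            child_sock.close()
            status = 0
        finally:
            os._exit(status)
    # Parent: close the child's end, send the message, wait for the answer.
    child_sock.close()
    parent_sock.sendall(message)
    reply = parent_sock.recv(1024)
    parent_sock.close()
    os.waitpid(pid, 0)
    return reply


if __name__ == "__main__":
    print(run_pair())
```

Each process closes the end it does not use, which is the same job that
"del sock0" / "del sock1" do in the second file.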

That was my minimal effort...
It is really only a partial solution to the problem, but:
 a. it already works for me,
 b. I'm not sure it will work for other protocols,
 c. I found that doing it properly is too complex and involves a lot of
other code,
so, sorry, I haven't worked toward a real contribution to the Twisted code.

Perhaps someone can suggest another, more elegant, "proper" solution?
I'm interested in having this kind of functionality supported by Twisted.

-- 
Alexey.
# -*- coding: utf-8 -*-


from twisted.internet import address, error, base, tcp
import sys
import socket


# Problem 1: we have an open file (descriptor, handle, whatever your system provides)
# and we want to run some protocol on top of it, because that file is
# some sort of connection to the outside world. Maybe it is a socket,
# but it may just as well be something like a PTY; we do not depend on that.
# It is a full-duplex, select()able and poll()able system I/O object
# that supports read(), write() and close() and can be made O_NONBLOCK with fcntl().
#
# Problem 2: we have two open files that are, respectively, the read and write endpoints of
# two (perhaps independent) unidirectional data channels. We want to build a protocol session
# on top of a bidirectional transport that read()s from the first file and
# write()s to the second. The files may be FIFOs, but we do not depend on
# that: they may also be a single dup()ed PTY or socket, or even two separate sockets.
#
# Problem 3: we have two unidirectional transports. The first is an IReadDescriptor, the other an IWriteDescriptor.
# We want to combine them into an IReadWriteDescriptor, and optionally into an IHalfCloseableDescriptor,
# and then use the result to bootstrap a protocol on top of it.
#
# We know what kind of protocol we want. We know precisely whether it is the
# client or the server side of the protocol (when the protocol has such an asymmetry, of course).
# And yes, we do not want to limit ourselves to only client or only server protocols.
#
#
#
#
# Proposed solution: create a Transport instance and marry it to a newly
#  created Protocol instance.
#  Sounds simple.
#  Like this:
#    transport = fullDuplexTransportOneFD(fileDescriptor)
#   or
#    transport = fullDuplexTransportTwoFDs(readFileDescriptor, writeFileDescriptor)
#   or
#    transport = fullDuplexTransport(readTransport, writeTransport)
#   and then
#    protocol = someProtocol()
#    meet(transport, protocol)
#   or
#    protocol = buildProtocolOnTopOfTransport(protocolFactory, transport)
#
# But, as the Russian proverb says, living a life is not as easy as crossing a field
# (in other words, life's a bitch), so
# the solution cannot practically be finished in a week, at least not by a non-Twisted developer.
#
# The situation is as follows.
# There are Protocol factories. Worse, there are client and server protocol factories.
# They are used separately and are not interchangeable, because their interfaces differ.
# Rumor has it that Protocol instances may be built without factories.
# But the code in several ProtocolFactory.buildProtocol() implementations does this:
#   proto = create_the_protocol(...)
# and then
#   proto.factory = self
# So I doubt that every protocol implementation will work without a
# proper self.factory installed. But we want a solution for every
# possible transport/protocol pair (every one that makes sense, of course).
# Again, the interface between a protocol instance and its factory is not
# the same for the server and client variants.
# So we have to keep the ProtocolFactory complication, and keep it happy.
#
# Transports.
# There are Client and Server transport classes in t.i.tcp. Yes.
# And you will have to live with that...
# Every kind of transport is created by its respective transport factory,
# which is the only thing that knows how to do that.
#
# Transport factories.
# There is no abstract TransportFactory implementation that holds the common code
# (code unrelated to the transport type and to the connection side) and that is functional enough
# to construct a transport instance and introduce it to a protocol instance.
# There are builders for server transport connections (listen*, Port) and for client transport connections (connect*, Connector).
# Both of them talk to a ProtocolFactory, but through different interfaces.
# They may also talk directly to the protocol instance:
#  t.i.tcp.Port.doRead: protocol.makeConnection(transport)
# A transport factory relies on deep knowledge of the transport's underlying type of file
# and of the side (client or server) it is on.
# I experimented with instantiating a server-side protocol
# from a PBServerFactory using a modified Connector and a modified Client class,
# but was stopped by the fact that PBServerFactory does not provide the interface that
# a Connector counts on.
# After patching the PBServerFactory instance on the fly with no-op methods
# it worked, but I'm not sure whether that is enough for every other protocol and factory.
# And you know what, I'm sure this should not become part of the Twisted code: it is too ugly.
# Alexey.
#
#

# This is only a partial solution: it handles a single fd, which must be a connected socket.

class FDSocketFakeConnector(base.BaseConnector):
    def __init__(self, fd, factory, reactor, transportClass):
        self.fd_arg = fd
        self.transportClass = transportClass
        base.BaseConnector.__init__(self, factory, None, reactor)
        

    def _makeTransport(self):
        import os
        sys.stderr.write("[%d]: FDSocketFakeConnector._makeTransport(0x%x)\n" %(os.getpid(), id(self)))
        self.transport = self.transportClass(self.fd_arg, self, self.reactor)
        return self.transport



class FDSocketTransport(tcp.BaseClient):
    realAddress = ""
    def __init__(self, fd, connector, reactor):
        import os
        sys.stderr.write("[%d]: FDSocketTransport(0x%x)\n" %(os.getpid(), id(self)))
        self.connector = connector
        if isinstance(fd, int) or isinstance(fd, long):
            self.sock = socket.fromfd(fd, self.addressFamily, self.socketType)
        elif isinstance(fd, socket._socket.socket):
            self.sock = fd
        else:
            self._finishInit(None, None, error.ConnectError("unknown socket type: %s" %(type(fd))), reactor)
            return
        self._finishInit(self.doConnect, self.sock, None, reactor)

    def createInternetSocket(self):
        import os
        sys.stderr.write("[%d]: createInternetSocket(0x%x)\n" %(os.getpid(), id(self)))
        self.sock.setblocking(0)
        return self.sock

    def failIfNotConnected(self, err):
        import os
        sys.stderr.write("[%d]: FDSocket.Transport(0x%x).failIfNotConnected(%s)" %(os.getpid(), id(self), err))
        try:
            raise Exception("")
        except:
            e = sys.exc_info()
        frame = e[2].tb_frame
        while frame:
            sys.stderr.write("\n  from %s, line %d" %(frame.f_code, frame.f_lineno))
            frame = frame.f_back
        sys.stderr.write("\n")
        tcp.BaseClient.failIfNotConnected(self, err)

    def doConnect(self):
        # I do not understand all of this; I just copied it. Alexey.
        import errno
        import os

        if not hasattr(self, "connector"):
            # this happens when the connection failed but doConnect
            # was scheduled via a callLater in self._finishInit
            return

        try:
            err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err:
                # strerror lives in os; the bare name was undefined in this module
                self.failIfNotConnected(error.getConnectError((err, os.strerror(err))))
                return
        except socket.error as exc:
            # This happens if you run an inetd-managed server from your shell instead:
            # the fd is then not a socket, and getsockopt() fails with ENOTSOCK (88 on Linux). Alexey.
            if exc.args[0] == errno.ENOTSOCK:
                self.failIfNotConnected(SocketOperationOnNonSocket(0, exc))
            else:
                self.failIfNotConnected(error.getConnectError(exc))
            return
        del self.doWrite
        del self.doRead
        # we first stop and then start, to reset any references to the old doRead
        self.stopReading()
        self.stopWriting()
        self._connectDone()

class FDUNIXStreamSocket(FDSocketTransport):
    # don't know whether that additional class helps at all. Alexey.
    addressFamily = socket.AF_UNIX
    socketType = socket.SOCK_STREAM
    
    def getHost(self):
        return ""

    def getPeer(self):
        return ""

class SocketOperationOnNonSocket(error.ConnectError):
    """Application expected some kind of a socket, but got a non-socket instead"""

# Use it like this:
#  c = FDSocketFakeConnector(fd, protocolFactory, reactor, FDUNIXStreamSocket)
#  c.connect()  - that will start the protocol
#
# For a server protocol factory,
# I did this before using it:
#
#  protocolFactory.startedConnecting = lambda x=None: None
#  protocolFactory.clientConnectionFailed = lambda x=None, y=None: x
#  protocolFactory.clientConnectionLost = lambda x=None, y=None: None
#
# You may use subclassing for that as well.
#
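
The subclassing variant mentioned above might look roughly like the
sketch below. To keep it runnable without Twisted, StandInServerFactory
is a made-up placeholder for a real server factory such as
pb.PBServerFactory; the mixin and class names are hypothetical too. Only
the three method names come from the patching shown above.

```python
class ConnectorCallbacksMixin(object):
    """No-op versions of the three methods that were patched onto the factory."""

    def startedConnecting(self, connector):
        pass

    def clientConnectionFailed(self, connector, reason):
        pass

    def clientConnectionLost(self, connector, reason):
        pass


class StandInServerFactory(object):
    """Placeholder for a real server factory (assumption: pb.PBServerFactory)."""

    def buildProtocol(self, addr):
        return "protocol for %r" % (addr,)


class ConnectableServerFactory(ConnectorCallbacksMixin, StandInServerFactory):
    """A server factory that also tolerates the calls a Connector makes."""


if __name__ == "__main__":
    factory = ConnectableServerFactory()
    factory.startedConnecting(None)
    print(factory.buildProtocol("peer"))
```

With Twisted available, the second base class would be the real server
factory instead of the placeholder; whether the three no-ops are enough
for every factory is as uncertain here as it is for the patched version.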
#!/usr/bin/python
# -*- coding: utf-8 -*-

from twisted.internet import reactor
from twisted.spread import pb
from tx_fake_connector import FDSocketFakeConnector, FDUNIXStreamSocket
import socket
import os
import sys

sock0, sock1 = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
pid = os.fork()
if pid: # parent
    # Run the server side of the PB protocol.
    # When the done() method is invoked, close the connection after making sure that all previous
    # messages have been delivered.
    del sock1
    # This is just a meaningless Root object, only there to test that PB is working.
    class test(pb.Root):
        def remote_test(self, arg):
            print "p:test: %s" % (arg)
            return 1
        
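        def remote_done(self, arg):
            print "p:done()"
            def invoked(_=None):
                reactor.callLater(0, c.transport.loseConnection)
                reactor.callLater(0, reactor.stop)
            # arg is the client's noop referenceable; the empty method name maps to its
            # remote_() method, and we shut down only after that call has been answered.
            arg.callRemote("").addBoth(invoked)
            return "done() returns nothing"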

    # you create/receive your server factory as usual
    factory = pb.PBServerFactory(test())
    # Note the patching that the server PB factory needs in order to work with a Connector; it is ugly.
    factory.startedConnecting = lambda x=None: None
    factory.clientConnectionFailed = lambda x=None, y=None: y
    factory.clientConnectionLost = lambda x=None, y=None: None
    # Actually, you may prefer subclassing, but that is still ugly.
    
    c = FDSocketFakeConnector(sock0, factory, reactor, FDUNIXStreamSocket)
    c.connect()
    # that's it
    
    sys.stdout.write("p: connected...\n")
    
else:  # child
    # Run the client PB instance, tell the server when we are done, and wait for its acknowledgement.
    del sock0
    root = None

    # use it like this
    f = pb.PBClientFactory()
    c = FDSocketFakeConnector(sock1, f, reactor, FDUNIXStreamSocket)
    d = c.connect()
    # the rest of the code is as usual, just another example of a PB client...

    sys.stdout.write("c: connected..., d= %r, c = %r\n" % (d,c))
    d = f.getRootObject()
    def _gotRoot(obj):
        global root
        root = obj
        sys.stderr.write("[%d]: root = %r\n" %(os.getpid(), root))
        return obj
    def _noRoot(f):
        sys.stderr.write("[%d]: failed to get root: %s\n" %(os.getpid(), f))
        # Do not stop the reactor here: the last callback in the chain already
        # stops it, and stopping it twice raises ReactorNotRunning.
        return f
    class noop(pb.Referenceable):
        # Does nothing: it only exists so that it can receive a message and return None.
        def remote_(self, _=None):
            return None
    def _callTest(obj):
        return obj.callRemote("test", "argument")
    def _reportAndFinish(msg):
        sys.stdout.write("c:Answer1: %s\n" % (msg,))
        # tell the server we are done; noop() lets it call us back before it shuts down
        if root:
            return root.callRemote("done", noop())
    def _stop(val):
        sys.stdout.write("c:Answer2: %s\n" % (val,))
        reactor.callLater(0, reactor.stop)
    d.addCallbacks(_gotRoot, _noRoot)
    d.addCallback(_callTest)
    d.addCallbacks(lambda ret: "returned: " + str(ret), lambda f: "failure:" + str(f))
    d.addCallback(_reportAndFinish)
    d.addBoth(_stop)

reactor.run()