The following code is an attempt at a port splitter: I want to forward data arriving on a TCP connection to several host/port addresses. It sort of works, but I am not happy with it. asyncore-based code is supposed to be simple, but I need while loops and a lot of try/except clauses. Also, I had to add suspend_channel/activate_channel methods to the Writer class that use variables with leading underscores; otherwise the handle_write() method is called in a tight loop. I designed the code by looking at the Python 2.3 source for asyncore and originally wanted to use the add_channel() and del_channel() methods. However, in Python 2.6 del_channel() closes the socket in addition to deleting it from the map. I do not want to have one connection per message: the traffic may be high and there are no message delimiters. The purpose of this exercise is to split incoming operational data so I can test a new version of software. Comments please - I have cognitive dissonance about the code, and my little yellow rubber duck is of no help here.
The code is run as:

python2.6 afwdport.py 50002 localhost 50003 catbert 50001

where 50002 is the localhost incoming data port, (localhost, 50003) and (catbert, 50001) are destinations.

George

import asyncore, os, socket, sys, time

TMOUT = 10

#----------------------------------------------------------------------
def log_msg(msg):
    """Write *msg* to standard error, prefixed with the current local time.

    The timestamp, message and trailing newline are emitted with a single
    write() call, and no print statement is used, so the function behaves
    the same on Python 2.6+ and Python 3.
    """
    sys.stderr.write('%s: %s\n' % (time.ctime(), msg))

#----------------------------------------------------------------------
class Reader(asyncore.dispatcher):
    """Read side of the splitter: copies incoming bytes to every writer.

    sock    -- the already-accepted connection to read from
    writers -- objects providing add_data(data); each one receives every
               chunk that is read
    """

    def __init__(self, sock, writers):
        asyncore.dispatcher.__init__(self, sock)
        self.writers = writers

    def handle_read(self):
        """Read up to 1024 bytes and queue them on every writer."""
        data = self.recv(1024)
        if not data:
            # recv() hands back an empty string when the peer has closed
            # the connection; there is nothing to forward, and queueing it
            # would only wake every writer up for no reason.
            return
        for writer in self.writers:
            writer.add_data(data)

    def handle_expt(self):
        """Treat an exceptional condition on the socket as a close."""
        self.handle_close()

    def handle_close(self):
        """Log and close the incoming connection."""
        log_msg('closing reader connection')
        self.close()

    def writable(self):
        """Never poll for writability: this channel only reads."""
        return False

#----------------------------------------------------------------------
class Writer(asyncore.dispatcher):
    """Outgoing connection to one destination, with a pending-data buffer.

    While the buffer is empty the channel is taken out of the asyncore
    channel map (suspend_channel) so the event loop does not keep reporting
    the idle socket as writable; add_data() puts it back.

    address -- (host, port) tuple of the destination
    """

    def __init__(self, address):
        asyncore.dispatcher.__init__(self)
        self.address = address
        # Bytes queued by add_data() and not yet sent.
        self.data = b''
        self.mksocket()

    def suspend_channel(self, map=None):
        """Remove this channel from the channel map without closing it.

        map -- channel map to remove from; defaults to this dispatcher's map
        """
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            del map[fd]

    def activate_channel(self):
        """Put this channel back into its channel map if it is not there."""
        if self._fileno not in self._map:
            self._map[self._fileno] = self

    def mksocket(self):
        """Create a new socket and start connecting it to self.address."""
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        # Logged before connect(): the connection attempt is only being
        # started here; handle_connect() reports when it is established.
        log_msg('connecting to %s' % str(self.address))
        self.connect(self.address)

    def handle_connect(self):
        """Log that the connection to the destination is established."""
        log_msg('connected to %s' % str(self.address))

    def add_data(self, data):
        """Append *data* to the send buffer and resume polling the channel."""
        self.data += data
        self.activate_channel()

    def handle_write(self):
        """Send as much buffered data as the socket accepts right now.

        Only one send() is attempted per write event.  send() may accept
        just part of the buffer, or nothing at all when the socket would
        block; retrying in a loop here would spin without returning control
        to the event loop.  Whatever is left stays buffered and is sent on
        the next write event.
        """
        if self.data:
            log_msg('sending data to %s' % str(self.address))
            sent = self.send(self.data)
            self.data = self.data[sent:]
        if not self.data:
            # Nothing left to send: stop polling until add_data() is called.
            self.suspend_channel()

    def handle_expt(self):
        """Log the pending socket error, then close and reconnect."""
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        log_msg(asyncore._strerror(err))
        self.handle_close()

    def handle_close(self):
        """Close the connection, wait TMOUT seconds, then reconnect."""
        log_msg('closing writer connection')
        self.close()
        # try to reconnect
        # NOTE(review): this sleep blocks the calling thread, so nothing
        # else is serviced for TMOUT seconds while it runs.
        time.sleep(TMOUT)
        self.mksocket()

    def readable(self):
        """Never poll for readability: this channel only writes."""
        return False

#----------------------------------------------------------------------
class Dispatcher(asyncore.dispatcher):
    """Listening socket that wires one incoming connection to all writers.

    port         -- local TCP port to listen on
    destinations -- iterable of (host, port) tuples; one Writer is created
                    for each

    Raises socket.error if the listening socket cannot be set up.
    """

    def __init__(self, port, destinations):
        asyncore.dispatcher.__init__(self)
        self.address = socket.gethostbyname(socket.gethostname()), port
        self.writers = []
        self.reader = None
        # Set up the listening socket before creating any writer, so that a
        # failed bind does not leave writer channels behind in the map.
        self.handle_connect()
        self.writers = [Writer(dest) for dest in destinations]

    def handle_connect(self):
        """Create the listening socket, bind it to self.address and listen."""
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Allow rebinding the port right after a restart.
            self.set_reuse_addr()
            self.bind(self.address)
            self.listen(1)
        except socket.error:
            # Do not leave a half-initialised channel in the channel map.
            self.close()
            raise
        log_msg('listening on %s' % str(self.address))

    def handle_accept(self):
        """Accept a connection and make it the single active reader."""
        accepted = self.accept()
        if accepted is None:
            # accept() may return None instead of a (socket, address) pair
            # when no connection is actually available.
            return
        conn, addr = accepted
        log_msg('connection from %s' % str(addr))
        # current read connection not closed for some reason
        if self.reader:
            self.reader.close()
        self.reader = Reader(conn, self.writers)

    def writable(self):
        """Never poll for writability: the listener only accepts."""
        return False

    def cleanup(self):
        """Close the reader, every writer and the listening socket.

        socket.error raised while closing is logged and otherwise ignored so
        that the remaining channels are still closed.
        """
        try:
            if self.reader:
                self.reader.close()
        except socket.error as e:
            log_msg('error closing reader connection %s' % e)
        # writer might have unwatched connections
        for w in self.writers:
            try:
                w.close()
            except socket.error as e:
                log_msg('error closing writer connection %s' % e)
        try:
            self.close()
        except socket.error as e:
            log_msg('error closing listening socket %s' % e)

#----------------------------------------------------------------------
def main(port, destinations):
    """Run the port splitter until it is interrupted or fails.

    port         -- local TCP port to listen on
    destinations -- list of (host, port) tuples to forward the data to

    If the local port is already in use, the attempt is repeated every 30
    seconds.  Any other exception is logged and ends the program.

    Raises SystemExit when terminating.
    """
    import errno  # function-scope import: only needed for EADDRINUSE

    disp = None
    try:
        # asyncore.loop() returns once no channels are left in the map;
        # start over with a fresh dispatcher when that happens.
        while True:
            try:
                disp = Dispatcher(port, destinations)
                asyncore.loop(timeout=TMOUT, use_poll=True)
            except socket.error as err:
                port_busy = bool(err.args) and err.args[0] == errno.EADDRINUSE
                if not port_busy:
                    # Not something waiting can fix; let the outer handler
                    # log it and terminate instead of retrying in a loop.
                    raise
                # Drop channels left over from the failed attempt so the
                # retry does not accumulate duplicates in the channel map.
                asyncore.close_all()
                log_msg('sleeping %d s: %s' % (30, err))
                time.sleep(30)
    except BaseException as e:
        log_msg('terminating - uncaught exception: %s' % e)
        raise SystemExit
    finally:
        if disp:
            disp.cleanup()

#----------------------------------------------------------------------
if __name__ == '__main__':
    # Expected command line: local-port host port [host port ...]
    nargs = len(sys.argv)
    try:
        # Explicit check instead of assert: assert statements are removed
        # when Python runs with -O, and they carry no message for the user.
        if nargs <= 3 or nargs % 2 != 0:
            raise ValueError(
                'expected a local port followed by host/port pairs')
        port = int(sys.argv[1])
        destinations = [(sys.argv[n], int(sys.argv[n + 1]))
                        for n in range(2, nargs - 1, 2)]
    except ValueError as e:
        sys.stdout.write('Error: %s\n' % e)
        sys.stdout.write(
            'Usage: python %s local-port host port ...\n' % sys.argv[0])
        raise SystemExit(1)
    # Run outside the try block so that only argument problems are reported
    # as usage errors.
    main(port, destinations)
--
http://mail.python.org/mailman/listinfo/python-list

Reply via email to