The following code is an attempt at a port splitter: I want to forward data
arriving on a TCP connection to several host/port addresses. It sort of
works, but I am not happy with it. asyncore-based code is supposed to be
simple, but I need while loops and a lot of try/except clauses. Also, I
had to add suspend_channel()/activate_channel() methods to the Writer class
that use variables with leading underscores; otherwise the handle_write()
method is called in a tight loop. I designed the code by looking at the
Python 2.3 source for asyncore and originally wanted to use the
add_channel() and del_channel() methods. However, in Python 2.6
del_channel() closes the socket in addition to deleting it from the map.
I do not want to have one connection per message: the traffic may be high
and there are no message delimiters. The purpose of this exercise is to
split incoming operational data so I can test a new version of the software.
Comments please - I have cognitive dissonance about the code, my little
yellow rubber duck is of no help here.
The code is run as:
python2.6 afwdport.py 50002 localhost 50003 catbert 50001
where 50002 is the localhost incoming data port, (localhost, 50003) and
(catbert, 50001) are destinations.
George
import asyncore, os, socket, sys, time
# Seconds.  Used as the asyncore.loop() timeout in main() and as the pause
# before a Writer tries to reconnect after its connection closes.
TMOUT = 10
#----------------------------------------------------------------------
def log_msg(msg):
    """Print *msg* on standard error, prefixed with the current time."""
    timestamp = time.ctime()
    line = '%s: %s' % (timestamp, msg)
    print >> sys.stderr, line
#----------------------------------------------------------------------
class Reader(asyncore.dispatcher):
    """Incoming side of the splitter: fans received data out to the writers.

    sock    -- connected socket, handed to asyncore.dispatcher
    writers -- iterable of objects providing an add_data(data) method
    """

    def __init__(self, sock, writers):
        asyncore.dispatcher.__init__(self, sock)
        self.writers = writers

    def handle_read(self):
        """Read up to 1024 bytes and queue them on every writer."""
        data = self.recv(1024)
        # asyncore's recv() returns an empty string when the peer has closed
        # the connection; there is nothing to forward then, so do not wake
        # the writers up for it.
        if not data:
            return
        for writer in self.writers:
            writer.add_data(data)

    def handle_expt(self):
        """Treat an exceptional condition on the socket as a close."""
        self.handle_close()

    def handle_close(self):
        """Log and close the incoming connection."""
        log_msg('closing reader connection')
        self.close()

    def writable(self):
        """This channel only reads, so never ask the loop for write events."""
        return False
#----------------------------------------------------------------------
class Writer(asyncore.dispatcher):
    """Outgoing side of the splitter: buffers data and sends it to one address.

    address -- (host, port) tuple passed to connect()

    While the buffer is empty the channel takes itself out of the asyncore
    socket map (suspend_channel); add_data() puts it back
    (activate_channel).  Both rely on the dispatcher's private _fileno and
    _map attributes.
    """

    def __init__(self, address):
        asyncore.dispatcher.__init__(self)
        self.address = address
        self.data = ''  # queued but not yet sent
        self.mksocket()

    def suspend_channel(self, map=None):
        """Remove this channel from the socket map without closing it.

        map -- socket map to edit; defaults to the channel's own map
        """
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            del map[fd]

    def activate_channel(self):
        """Put this channel back into its socket map if it is not there."""
        if self._fileno not in self._map:
            self._map[self._fileno] = self

    def mksocket(self):
        """Create a fresh TCP socket and start connecting to self.address."""
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.connect(self.address)
        # NOTE(review): asyncore sockets are non-blocking, so this is
        # presumably logged when the connection attempt starts, not when it
        # completes -- confirm before relying on the message.
        log_msg('connected to %s' % str(self.address))

    def add_data(self, data):
        """Append *data* to the send buffer and make sure the loop sees us."""
        self.data += data
        self.activate_channel()

    def handle_write(self):
        """Send one chunk of the buffer; suspend once the buffer is empty.

        Exactly one send() is issued per write event.  asyncore's send()
        returns 0 both when the non-blocking socket would block and after
        it has invoked handle_close() for a dropped connection, so looping
        until the buffer drains would busy-wait (or re-enter the reconnect
        path over and over).  Unsent data stays queued and the channel stays
        in the socket map, so the loop calls back when the socket is
        writable again.
        """
        if self.data:
            log_msg('sending data to %s' % str(self.address))
            sent = self.send(self.data)
            self.data = self.data[sent:]
        if not self.data:
            self.suspend_channel()

    def handle_expt(self):
        """Log the pending socket error, then go through the close path."""
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        log_msg(asyncore._strerror(err))
        self.handle_close()

    def handle_close(self):
        """Close the connection, wait TMOUT seconds and reconnect.

        self.data is left untouched, so anything still queued is sent on
        the new connection.
        """
        log_msg('closing writer connection')
        self.close()
        # try to reconnect
        # NOTE: time.sleep() runs inside the event-loop callback, so every
        # other channel is stalled for TMOUT seconds as well.
        time.sleep(TMOUT)
        self.mksocket()

    def readable(self):
        """This channel only writes, so never ask the loop for read events."""
        return False
#----------------------------------------------------------------------
class Dispatcher(asyncore.dispatcher):
    """Listening socket that wires one incoming connection to all writers.

    port         -- local TCP port to listen on
    destinations -- iterable of (host, port) tuples, one Writer is created
                    for each
    """

    def __init__(self, port, destinations):
        asyncore.dispatcher.__init__(self)
        self.address = socket.gethostbyname(socket.gethostname()), port
        self.reader = None
        self.writers = []
        try:
            for destination in destinations:
                self.writers.append(Writer(destination))
            self.handle_connect()
        except socket.error:
            # The caller may retry with a new Dispatcher; do not leave this
            # one's sockets registered in the asyncore map.
            self.cleanup()
            raise

    def handle_connect(self):
        """Create, bind and start listening on the local socket.

        Called directly from __init__ to set the listener up.
        """
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        # allow a quick restart while old connections are still in TIME_WAIT
        self.set_reuse_addr()
        self.bind(self.address)
        self.listen(1)
        log_msg('listening on %s' % str(self.address))

    def handle_accept(self):
        """Accept a connection and make it the (single) active reader."""
        pair = self.accept()
        # asyncore's accept() can return None when the pending connection
        # is gone by the time it is accepted; just wait for the next one.
        if pair is None:
            return
        conn, addr = pair
        log_msg('connection from %s' % str(addr))
        # current read connection not closed for some reason
        if self.reader is not None:
            self.reader.close()
        self.reader = Reader(conn, self.writers)

    def cleanup(self):
        """Close the reader, every writer and the listening socket.

        socket.error raised while closing is logged, not propagated.
        """
        try:
            if self.reader is not None:
                self.reader.close()
        except socket.error as e:
            log_msg('error closing reader connection %s' % e)
        # writer might have unwatched connections
        for w in self.writers:
            try:
                w.close()
            except socket.error as e:
                log_msg('error closing writer connection %s' % e)
        # self.socket stays None if the listening socket was never created
        if self.socket is not None:
            try:
                self.close()
            except socket.error as e:
                log_msg('error closing listening socket %s' % e)
#----------------------------------------------------------------------
def main(port, destinations):
    """Run the splitter until it is interrupted or hits an unexpected error.

    port         -- local TCP port to listen on
    destinations -- list of (host, port) tuples to forward the data to

    A new Dispatcher is built every time asyncore.loop() returns or a
    socket error is caught.  Any other exception is logged and turned into
    SystemExit; all channels are cleaned up on the way out.
    """
    import errno  # local import: the module is only needed in this function

    disp = None
    try:
        # asyncore.loop() exits when input connection closes
        while True:
            try:
                disp = Dispatcher(port, destinations)
                asyncore.loop(timeout=TMOUT, use_poll=True)
            except socket.error as e:
                args = e.args
                # only (errno, message) style errors carry an error code
                code = args[0] if len(args) > 1 else None
                if code == errno.EADDRINUSE:
                    log_msg('sleeping %d s: %s' % (30, args[1]))
                    time.sleep(30)
                else:
                    # report instead of silently retrying, and pause so a
                    # persistent failure does not spin the CPU
                    log_msg('socket error, retrying in %d s: %s' % (TMOUT, e))
                    time.sleep(TMOUT)
            # release the previous round's sockets before starting over
            if disp is not None:
                disp.cleanup()
                disp = None
    except BaseException as e:
        log_msg('terminating - uncaught exception: %s' % e)
        raise SystemExit
    finally:
        if disp is not None:
            disp.cleanup()
#----------------------------------------------------------------------
def parse_args(argv):
    """Turn a command line into (local_port, destinations).

    argv -- full argument vector, program name included:
            [prog, local-port, host, port, host, port, ...]

    Returns the local port as an int and a list of (host, port) tuples.
    Raises ValueError when there is no complete host/port pair, when an
    argument is left over, or when a port is not an integer.
    """
    nargs = len(argv)
    # explicit check rather than assert: asserts are stripped under -O
    if nargs <= 3 or nargs % 2 != 0:
        raise ValueError('expected a local port followed by one or more '
                         'host/port pairs')
    port = int(argv[1])
    destinations = [(argv[n], int(argv[n + 1]))
                    for n in range(2, nargs - 1, 2)]
    return port, destinations


if __name__ == '__main__':
    try:
        port, destinations = parse_args(sys.argv)
    except ValueError as e:
        print('Error: %s' % e)
        print('Usage: python %s local-port host port ...' % sys.argv[0])
        raise SystemExit(1)
    main(port, destinations)
--
http://mail.python.org/mailman/listinfo/python-list