On Apr 20, 12:25 pm, Dun Peal <dunpea...@gmail.com> wrote:
> Hi,
>
> I'm writing and testing an asyncore-based server. Unfortunately, it
> doesn't seem to work. The code below is based on the official docs and
> examples, and starts a listening and sending dispatcher, where the
> sending dispatcher connects and sends a message to the listener - yet
> Handler.handle_read() never gets called, and I'm not sure why. Any
> ideas?
>
> Thanks, D.
>
> import asyncore, socket, sys
>
> COMM_PORT = 9345
>
> class Handler(asyncore.dispatcher):
>     def handle_read(self):
>         print 'This never prints'
>
> class Listener(asyncore.dispatcher):
>     def __init__(self, port=COMM_PORT):
>         asyncore.dispatcher.__init__(self)
>         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
>         self.set_reuse_addr()
>         self.bind(('', port))
>         self.listen(5)
>
>     def handle_accept(self):
>         client, addr = self.accept()
>         print 'This prints.'
>         return Handler(client)
>
> class Sender(asyncore.dispatcher):
>     def __init__(self, host):
>         asyncore.dispatcher.__init__(self)
>         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
>         self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
>         self.connect( (host, COMM_PORT) )
>         self.buffer = 'Msg\r\n'
>
>     def handle_connect(self):
>         pass
>
>     def writable(self):
>         return len(self.buffer) > 0
>
>     def handle_write(self):
>         sent = self.send(self.buffer)
>         self.buffer = self.buffer[sent:]
>
> def test_communication():
>     from multiprocessing import Process
>     def listener():
>         l = Listener()
>         asyncore.loop(timeout=10, count=1)
>     lis = Process(target=listener)
>     lis.start()
>     def sender():
>         s = Sender('localhost')
>         asyncore.loop(timeout=10, count=1)
>     sen = Process(target=sender)
>     sen.start()
>     lis.join()
>
> test_communication()

You didn't let the program run long enough for the later events to
happen. loop(count=1) runs the polling loop only once, which here means
only one I/O event gets processed - in the case of your example, that's
an accept(). Then asyncore is done and it never gets to your custom
handle_read.

So you can try passing a higher count to loop, or you can add your own
loop around the loop call.

Or you can switch to Twisted, which actually makes testing a lot easier
than this - no need to spawn multiple processes or call accept or recv
yourself. Here's a somewhat equivalent Twisted-based version of your
program:

    from twisted.internet.protocol import ServerFactory, Protocol
    from twisted.internet import reactor

    factory = ServerFactory()
    factory.protocol = Protocol
    reactor.listenTCP(0, factory)
    reactor.run()

It's hard to write the equivalent unit test, because the test you wrote
for the asyncore-based version is testing lots of low-level details
which, as you can see, don't actually appear in the Twisted-based
version because Twisted does them for you already.

However, once you get past all that low-level stuff and get to the part
where you actually implement some of your application logic, you might
have tests for your protocol implementation that look something like
this:

    from twisted.trial.unittest import TestCase
    from twisted.test.proto_helpers import StringTransport

    from yourapp import Handler # Or a better name

    class HandlerTests(TestCase):
        def test_someMessage(self):
            """
            When the "X" message is received, the "Y" response is sent back.
            """
            transport = StringTransport()
            protocol = Handler()
            protocol.makeConnection(transport)
            protocol.dataReceived("X")
            self.assertEqual(transport.value(), "Y")

Hope this helps,
Jean-Paul
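
For reference, here is a minimal sketch of why a single pass only reaches the accept, and why looping around the poll fixes it. It is not the asyncore code from the thread: it uses a plain select() call in one process and Python 3 syntax (asyncore was removed from the standard library in Python 3.12). The names poll_once and run_demo are made up for the illustration.

```python
import select
import socket


def poll_once(listener, clients, received, timeout=1.0):
    """One select() pass: accept pending connections, read from known clients."""
    readable, _, _ = select.select([listener] + clients, [], [], timeout)
    for sock in readable:
        if sock is listener:
            # Plays the role of handle_accept() in the quoted code.
            conn, _addr = listener.accept()
            clients.append(conn)
        else:
            # Plays the role of handle_read(); only reachable on a later pass,
            # because the new connection was not in this pass's select() list.
            data = sock.recv(1024)
            if data:
                received.append(data)
            else:
                clients.remove(sock)
                sock.close()


def run_demo(max_passes=10):
    """Return (chunks read after one pass, all bytes read after looping)."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(5)
    port = listener.getsockname()[1]

    # Stand-in for the Sender dispatcher: connect and send one message.
    sender = socket.create_connection(("127.0.0.1", port))
    sender.sendall(b"Msg\r\n")

    clients, received = [], []
    try:
        # Equivalent in spirit to loop(count=1): only the accept happens.
        poll_once(listener, clients, received)
        after_first_pass = list(received)

        # "Your own loop around the loop call": keep polling until the
        # whole message has arrived or we give up.
        for _ in range(max_passes):
            if b"".join(received).endswith(b"\r\n"):
                break
            poll_once(listener, clients, received)
    finally:
        for sock in [sender, listener] + clients:
            sock.close()
    return after_first_pass, b"".join(received)


if __name__ == "__main__":
    first, final = run_demo()
    print("after one pass:", first)
    print("after looping:", final)
```

The first pass can only see the listening socket, so nothing is read until a later pass includes the accepted connection. With asyncore itself the analogous change would be a larger count, or calling loop repeatedly until the handler has seen the data.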