I just wrote a fun class that lets you

 - submit jobs to be dispatched to a queue
 - manage how many tasks are in progress at once
 - dynamically adjust that number
 - shut down cleanly, including
   - recovering jobs that were queued but hadn't been dispatched

This uses a combination of a DeferredQueue, a task.Cooperator, and the
DeferredPool I posted on Monday.

For now I named it ResizableDispatchQueue (not a great name, suggestions
welcome). You can pick it up from http://pastebin.com/f7dc9320e

I can think of lots of uses. Here's a simple example.

You want to write a server with a web interface that allows people to enter
their phone number so you can send them an SMS. You anticipate lots of
people will use the service. But sending SMS messages is quite slow, and the
company that you ship those jobs off to is concerned that you'll overrun
their service (or maybe they have an API limit, etc). So you need to queue
up jobs locally and send them off at a certain rate. You'd like to be able
to adjust that rate up or down. You also want to be able to shut your
service down cleanly (i.e., not in the middle of a task), and when you
restart it you want to be able to re-queue the jobs that were queued last
time but which hadn't gone out.

For example, suppose your function that sends the SMS is called sendSMS and
that it takes a (number, message) tuple arg. Then:

    dispatcher = ResizableDispatchQueue(sendSMS)

    # Tell it to send at most 5 things at once.
    dispatcher.start(5)  # Same as dispatcher.width = 5

    # Later... send off some SMS messages.
    dispatcher.put((2127399921, 'Hello...'))
    dispatcher.put((5052929919, 'Test...'))

    # Later, bump up to 10 simultaneous jobs.
    dispatcher.width = 10

    # Oops, turns out we're sending too fast, turn it down a little.
    dispatcher.narrow(3)

    # Get a copy of the list of pending jobs.
    jobs = dispatcher.pending()

    # Arrange to increase the number of jobs in an hour's time.
    reactor.callLater(3600, dispatcher.setWidth, 20)

    # Time to shutdown. Wait for any tasks underway to complete, and save
    # the list of jobs not yet dispatched.

    def saveJobs(jobs):
        pickle.dump(jobs, ...)

    d = dispatcher.stop()
    d.addCallback(saveJobs)

On restart you just unpickle the old job list and pass its items to
dispatcher.put().

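A rough sketch of that save/restore step, in case it helps. The helper
requeueJobs, the extra path argument to saveJobs, and the file name used
below are all made up for illustration; the only thing assumed about the
dispatcher is the put() method shown above.

```python
import pickle


def saveJobs(jobs, path):
    """Write the list of not-yet-dispatched jobs to path (hypothetical helper)."""
    with open(path, 'wb') as f:
        pickle.dump(list(jobs), f)


def requeueJobs(dispatcher, path):
    """Load jobs written by saveJobs and put() each one back on the dispatcher.

    Returns the number of jobs re-queued, or 0 if no saved file exists
    (e.g. on the very first start).
    """
    try:
        with open(path, 'rb') as f:
            jobs = pickle.load(f)
    except FileNotFoundError:
        return 0
    for job in jobs:
        # Assumes only that the dispatcher has the put() method described above.
        dispatcher.put(job)
    return len(jobs)
```

On shutdown this would be wired up as
d.addCallback(saveJobs, 'pending-jobs.pickle'), since addCallback passes
extra arguments through to the callback after the result. On restart you'd
call requeueJobs(dispatcher, 'pending-jobs.pickle') once the new dispatcher
exists.
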
I have a small test suite that's a bit weird (it schedules various things
and tests how long the overall job takes and what's still pending when stop
is called). It could be much better, but it does at least illustrate that
the code seems to work. Let me know if you want it.

There's also the question of what to do when the dispatch function hits an
error. An option could be added to re-queue the job, but it's perhaps better
to let the dispatch function do that along with whatever else it needs.

As usual, I'd be happy to hear comments and suggestions. I'll probably
adjust this so the DeferredQueue uses a priority queue.

Terry