I'm building a threaded file searcher that uses some of Fredrik Lundh's (http://effbot.org/zone/wide-finder.htm) suggestions for parsing text very quickly in pure Python, as I have a roughly 10GB log file to parse every day. A naive approach would be to just parse the file in 1MB chunks, append the results to a list, and then traverse that list.
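To make the naive version concrete, here is a rough, untested sketch of what I mean (sequential 1MB reads, snapping to line boundaries so a match never straddles two chunks; the function name is just illustrative):

import re

def naive_search(filename, pattern, chunk_size=1024 * 1024):
    """Read the file in ~1MB chunks and collect every match in a list."""
    results = []
    f = open(filename, 'r')
    try:
        leftover = ''
        while True:
            data = f.read(chunk_size)
            if not data:
                break
            # keep the partial last line so a match can't be split across chunks
            data, _, leftover = (leftover + data).rpartition('\n')
            results.extend(pattern.findall(data))
        results.extend(pattern.findall(leftover))
    finally:
        f.close()
    return results

# then just traverse the list:
# for match in naive_search("LogFile.log", re.compile("-{3}Processing..-{3}")):
#     print match

Nothing clever, but that's the baseline I'm trying to improve on.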
I want to take this idea a bit further and process results as they're being found. A great way to implement this is to use the Queue class that Python provides. My idea is to exploit the iterator protocol: block until a result is found (if any), return results as they arrive, and raise StopIteration once we're finished parsing the file. It's similar to producer/consumer, but it follows something like this idiom:

    producer produces the file chunk offsets
    consumer consumes the chunk offsets -> consumer parses the chunks and produces results
    the class waits on the consumer's results and processes them as they come

I'm having a bit of trouble with the concurrency, but I'm using this as an exercise to understand how concurrency works on a broader scale. I'm not trying to get into a debate about whether this is really needed, or a Python-concurrency debate. :)

Without further ado, my class is as follows:

import Queue
import threading

class ThreadedFileSearcher(object):
    def __init__(self, filename, rx_pat, blocking_timeout=10):
        self.name = filename
        self.pattern = rx_pat
        self.blocking_timeout = blocking_timeout

        # need to find a better way to do this with more threads that can
        # return stable results (aka chunks are in order)
        self._thread_count = 1

        # the queues
        self._results = Queue.Queue()
        self._queue = Queue.Queue()

        # provides the get_chunks() implementation
        self._engine = LogParsingEngine(filename)

        # start acquiring file offsets for the file, as daemon threads
        self._initialize_worker_threads(self._prime_queue)

        # experimental...should probably be some type of condition variable
        self._stop_processing = False

    def __iter__(self):
        # start the worker threads
        self._initialize_worker_threads(self._target_worker)
        return self.next()

    def _initialize_worker_threads(self, callback):
        # should really use just one thread
        for i in xrange(self._thread_count):
            t = threading.Thread(target=callback)
            t.setDaemon(True)
            t.start()

    def _prime_queue(self):
        """Put chunk offsets on the queue."""
        # get_chunks() just returns 1MB offsets into the file
        for chunk in self._engine.get_chunks():
            self._queue.put(chunk)

    def _target_worker(self):
        """Parse chunks taken off the queue."""
        # loop until there is nothing left to do
        while True:
            try:
                # get the next chunk offset from the queue
                start_pos, bytes_to_read = self._queue.get(
                    timeout=self.blocking_timeout
                )
            except (TypeError, Queue.Empty):
                # None was returned from the .get(), which raises a TypeError
                # when it is unpacked, or the queue was empty
                self._stop_processing = True
                # exit loop
                break

            # process the chunk here
            f = open(self.name, 'r')
            f.seek(start_pos)
            # find all matching lines in the chunk
            for match in self.pattern.findall(f.read(bytes_to_read)):
                # each non-overlapping match of self.pattern is
                # placed on the results queue as a string
                self._results.put(match)
            f.close()
            # done!
            self._queue.task_done()

    def next(self):
        while True:
            try:
                # wait for results to be put on the queue
                matchedlist = self._results.get(timeout=self.blocking_timeout)
            except Queue.Empty:
                # the worker threads finished
                if self._stop_processing:
                    raise StopIteration
            else:
                self._results.task_done()
                yield matchedlist

To use this class, I wanted to have some kind of interface like this:

import re

regex = re.compile("-{3}Processing..-{3}")   # ---Processing..---

f = ThreadedFileSearcher("LogFile.log", regex)
for line in f:
    # guaranteed to be a line that matches the regex
    # process something...
    print line

I am looking for suggestions, comments, and better ways to modify this.
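For completeness, get_chunks() on LogParsingEngine does roughly the following (a simplified sketch, not my exact code; it follows the effbot recipe of seeking ahead and then advancing to the next newline so chunks align to line boundaries):

import os

def get_chunks(filename, chunk_size=1024 * 1024):
    """Yield (start_pos, bytes_to_read) pairs ~1MB apart, aligned to lines."""
    size = os.path.getsize(filename)
    f = open(filename, 'rb')
    try:
        start = 0
        while start < size:
            # jump ahead ~1MB, then read to the end of that line so a
            # worker is never handed half a line
            f.seek(min(start + chunk_size, size))
            f.readline()
            end = min(f.tell(), size)
            yield start, end - start
            start = end
    finally:
        f.close()

Each (start_pos, bytes_to_read) tuple is what _prime_queue puts on self._queue and what _target_worker unpacks.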
One thing you'll notice when using this class is that the initial parsing is incredibly quick, but if blocking_timeout is set to 10, there will be a 10-second wait at the end while the worker threads decide whether to set the stop condition. A glaring hole that follows from this: if blocking_timeout is set to something like 1 second, then by the time a user attempts to iterate over the results, the worker threads may have already prematurely stopped processing.

Speaking of stopping: should self._stop_processing be a condition variable? Right now it's a boolean, and I think that's pretty hacky. I also don't like the StopIteration stop condition; maybe someone has a better suggestion?

A future modification I'm looking at is to launch multiple threads that process different parts of the file (different chunks) and return their results, probably indexed by their chunk offset, so I can then iterate over them sequentially. I think that would be a fairly trivial parallel optimization (rough sketch in the P.S. below).

Thoughts? Comments?

Thanks very much,

Mahmoud Abdelkader
mahm...@linux.com
http://blog.mahmoudimus.com/
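P.S. Here's roughly what I have in mind for the ordered, multi-threaded version. It assumes each worker puts a (chunk_index, matches) tuple on the results queue per chunk instead of individual strings; next() then buffers out-of-order chunks in a dict and yields them sequentially. This is an untested sketch, and pending / next_index are just illustrative names:

def next(self):
    pending = {}      # chunk_index -> list of matches, buffered out of order
    next_index = 0    # the chunk whose results should be yielded next
    while True:
        try:
            chunk_index, matches = self._results.get(
                timeout=self.blocking_timeout
            )
        except Queue.Empty:
            if self._stop_processing:
                # flush anything still buffered, then stop
                for i in sorted(pending):
                    for match in pending[i]:
                        yield match
                raise StopIteration
        else:
            self._results.task_done()
            pending[chunk_index] = matches
            # emit results in chunk order as soon as they're contiguous
            while next_index in pending:
                for match in pending.pop(next_index):
                    yield match
                next_index += 1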