Thanks for the response, Gabriel.
On Wed, Jul 1, 2009 at 12:54 AM, Gabriel Genellina <gagsl-...@yahoo.com.ar> wrote:
> En Tue, 30 Jun 2009 22:52:18 -0300, Mag Gam <magaw...@gmail.com> escribió:
>
>> I am very new to python and I am in the process of loading a very
>> large compressed csv file into another format. I was wondering if I
>> can do this in a multi thread approach.
>
> Does the format conversion involve a significant processing time? If not,
> the total time is dominated by the I/O time (reading and writing the file)
> so it's doubtful you gain anything from multiple threads.

The format conversion does involve significant processing time for each line.

>> Here is the pseudo code I was thinking about:
>>
>> Let T = Total number of lines in a file, for example 1000000 (1 million
>> lines)
>> Let B = Total number of lines in a buffer, for example 10000 lines
>>
>> Create a thread to read until buffer
>> Create another thread to read buffer+buffer (So we have 2 threads
>> now. But since the file is zipped I have to wait until the first
>> thread is completed, unless someone knows of a clever technique.)
>> Write the content of thread 1 into a numpy array
>> Write the content of thread 2 into a numpy array
>
> Can you process each line independently? Is the record order important? If
> not (or at least, some local dis-ordering is acceptable) you may use a few
> worker threads (doing the conversion), feed them thru a Queue object, put
> the converted lines into another Queue, and let another thread write the
> results onto the destination file.

Yes, each line can be processed independently. The original file is a time
series file which I am placing into a NumPy array, so I don't think the
record order is important. The writing actually happens when I assign a
value into a "dset" object. Let me show you what I mean:

reader = csv.reader(open("100000.csv"))

for s, row in enumerate(reader):
    if s != 0 and s % bufsize == 0:
        # here is where I am writing the data to the data structure,
        # using a range or broadcasting
        dset[s-bufsize:s] = t
    # 15 columns
    if len(row) != 15:
        break
    t[s % bufsize] = tuple(row)

# do this all the way at the end, for flushing
if s % bufsize != 0:
    dset[(s//bufsize)*bufsize:s] = t[0:s % bufsize]

> import Queue, threading, csv
>
> def convert(in_queue, out_queue):
>     while True:
>         row = in_queue.get()
>         if row is None: break
>         # ... convert row
>         out_queue.put(converted_line)
>
> def write_output(out_queue):
>     while True:
>         line = out_queue.get()
>         if line is None: break
>         # ... write line to output file
>
> in_queue = Queue.Queue()
> out_queue = Queue.Queue()
> tlist = []
> for i in range(4):
>     t = threading.Thread(target=convert, args=(in_queue, out_queue))
>     t.start()
>     tlist.append(t)
> output_thread = threading.Thread(target=write_output, args=(out_queue,))
> output_thread.start()
>
> with open("...") as csvfile:
>     reader = csv.reader(csvfile, ...)
>     for row in reader:
>         in_queue.put(row)
>
> for t in tlist: in_queue.put(None)  # indicate end-of-work
> for t in tlist: t.join()            # wait until finished
> out_queue.put(None)
> output_thread.join()                # wait until finished
>
> --
> Gabriel Genellina
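Coming back to your example: here is an untested sketch of how I might adapt
your worker-thread layout so the converted rows land directly in my "dset"
object instead of an output file. dset and the 15-column check are from my
snippet above; the gzipped filename, the queue bounds and the 4-worker count
are placeholders I made up:

import csv, gzip, threading, Queue

def convert(in_queue, out_queue):
    # workers run concurrently, so rows may finish out of order;
    # carrying the line number s along keeps each row addressable
    while True:
        item = in_queue.get()
        if item is None:
            break
        s, row = item
        out_queue.put((s, tuple(row)))  # real per-line conversion would go here

def write_output(out_queue):
    # single writer: each converted row goes straight into its own
    # slot of dset (a global here), so no separate flush pass is needed
    while True:
        item = out_queue.get()
        if item is None:
            break
        s, converted = item
        dset[s] = converted

in_queue = Queue.Queue(maxsize=10000)   # bounded, so the reader can't race ahead
out_queue = Queue.Queue(maxsize=10000)

workers = []
for i in range(4):
    w = threading.Thread(target=convert, args=(in_queue, out_queue))
    w.start()
    workers.append(w)
writer = threading.Thread(target=write_output, args=(out_queue,))
writer.start()

reader = csv.reader(gzip.open("100000.csv.gz"))  # made-up name; the real file is compressed
for s, row in enumerate(reader):
    if len(row) != 15:
        break
    in_queue.put((s, row))

for w in workers:
    in_queue.put(None)   # one end-of-work sentinel per worker
for w in workers:
    w.join()
out_queue.put(None)
writer.join()

One thing I am unsure about: writing dset one row at a time may be slower
than my buffered slice assignments if dset lives on disk, and since the
conversion is pure Python the GIL may limit how much the workers actually
overlap, in which case I would try the multiprocessing module instead.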