Hi Jeff,

See my replies below inline.

On 8/6/14, 7:16 AM, Johnston, Jeffrey wrote:
Hi,

I have been using FastqStreamer() and yield() to process a large fastq file in 
chunks, modifying both the read and name and then appending the output to a new 
fastq file as each chunk is processed. This works great, but would benefit 
greatly from being parallelized.

As far as I can tell, this problem isn’t easily solved with the existing 
parallel tools because you can’t determine how many jobs you’ll need in advance 
(you just call yield() until it stops returning reads).

That's right; we don't currently have anything in R comparable to, for example, Python's multiprocessing.Pool: https://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

We wouldn't want something exactly like that, though, because Python's implementation exhausts the input stream as fast as possible and buffers all the results in memory if they are not consumed quickly enough, as described here: http://stackoverflow.com/questions/5318936/python-multiprocessing-pool-lazy-iteration

I think it would be ideal to have a function that takes an input stream, a BPPARAM, and a maximum number of chunks to buffer, and returns another input stream of the results.
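
Something along these lines is what I have in mind. It's only a sketch (the name streamingLapply, the buffering strategy, and the closure-based interface are all made up here, not existing BiocParallel API), and this simple version processes one buffer-sized batch of chunks at a time rather than overlapping reading with computation, but it shows the shape of the interface:

library(BiocParallel)

streamingLapply <- function(nextChunk, FUN, BPPARAM = bpparam(), buffer = 4) {
   pending <- list()
   function() {
      if (length(pending) == 0) {
         # pull up to `buffer` chunks from the input stream
         chunks <- list()
         while (length(chunks) < buffer) {
            x <- nextChunk()
            if (is.null(x)) break
            chunks[[length(chunks) + 1L]] <- x
         }
         if (length(chunks) == 0) return(NULL)
         # process the buffered chunks in parallel
         pending <<- bplapply(chunks, FUN, BPPARAM = BPPARAM)
      }
      # hand back one processed chunk per call
      res <- pending[[1]]
      pending <<- pending[-1]
      res
   }
}

The returned function behaves like yield(): each call gives back one processed chunk, or NULL when the input is exhausted, so the results can themselves be treated as an input stream.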


After some digging, I found the sclapply() function in the HTSeqGenie package 
by Gregoire Pau, which he describes as a “multicore dispatcher”:

https://stat.ethz.ch/pipermail/bioc-devel/2013-October/004754.html

I wasn’t able to get the package to install from source due to some 
dependencies (there are no binaries for Mac), but I did extract the function 
and adapt it slightly for my use case. Here’s an example:

processChunk <- function(fq_chunk) {
   # manipulate the fastq reads and names here
}

yieldHelper <- function() {
   fq <- yield(fqstream)
   if (length(fq) == 0) return(NULL)  # signal end of stream
   fq
}

fqstream <- FastqStreamer("...", n=1e6)
sclapply(yieldHelper, processChunk, max.parallel.jobs=4)
close(fqstream)

Based on the discussion linked above, it seems like there was some interest in 
integrating this idea into BiocParallel. I would find that very useful as it 
improves performance quite a bit and can likely be applied to numerous 
stream-based processing tasks.

I will point out that in my case above, the processChunk() function doesn’t 
return anything. Instead it appends the modified fastq records to a new file. I 
have to use the Unix lockfile command to ensure that only one child process 
appends to the output file at a time. I am not certain if there is a more 
elegant solution to this (perhaps a queue that is emptied by a dedicated writer 
process?).
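
For anyone following along, the lockfile pattern described here would look roughly like the sketch below; the helper name and file names are mine, lockfile is the procmail utility, and writeFastq() is from ShortRead:

appendChunk <- function(fq, out = "out.fastq.gz", lock = "out.fastq.gz.lock") {
   system2("lockfile", lock)        # blocks until the lock file can be created
   on.exit(unlink(lock), add=TRUE)  # release the lock even if the write fails
   writeFastq(fq, out, mode="a")    # append this chunk's modified records
}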

That output serialization is the problem, I think. A general solution would allow you to stream the processed results back into R, where you could pass them into another stream filter or finally consume them. That was the idea behind the example code that I demonstrated, but my code worked a little differently, in that the task of *reading* the fastq file was delegated to a subprocess. So my solution doesn't generalize to multi-step parallel pipelines either.
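
Concretely, with something like the streamingLapply() sketch above, the writing could stay serial in the main R process and the lockfile would no longer be needed. This assumes processChunk() is changed to return the modified reads instead of writing them, and the file names are again just placeholders:

library(ShortRead)
library(BiocParallel)

fqstream <- FastqStreamer("...", n=1e6)
nextProcessed <- streamingLapply(
   function() { fq <- yield(fqstream); if (length(fq) == 0) NULL else fq },
   processChunk,                 # assumed here to return the modified reads
   BPPARAM = MulticoreParam(4),
   buffer = 4)

repeat {
   fq <- nextProcessed()
   if (is.null(fq)) break
   writeFastq(fq, "out.fastq.gz", mode="a")  # single writer, no lockfile needed
}
close(fqstream)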

Thanks,
Jeff


-Ryan
