Re: Iterative algorithm in BEAM

2021-01-21 Thread Tucker Barbour
There isn't a problem with the output being a flattened PCollection; I think that's actually what I intend to accomplish. A naive implementation would have a DoFn take a zip file as input (probably as FileIO.ReadableFile), recursively extract elements from the zip, i.e. extract an email from t

Re: Iterative algorithm in BEAM

2021-01-21 Thread Alexey Romanenko
I guess the main issue here is that the zip format is not splittable, so if it’s one huge zip file, you still won’t be able to parallelise the read. If you could unzip such a file in advance and read the path containing the extracted files in your pipeline, then it should be automatically read in

Re: Iterative algorithm in BEAM

2021-01-21 Thread Tucker Barbour
Getting away from Beam a bit, but there _could_ be a way to enable parallel reads of a single zip: first read just the central directory and generate elements in an output PCollection, each containing a triple of the byte offset of an entry, the compressed size of the entry, and the type of compressio
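To make the central-directory idea concrete, here is a minimal plain-Kotlin sketch (no Beam) of that first stage, assuming a single-disk, non-ZIP64 archive with UTF-8 entry names. It locates the End of Central Directory record, then walks the central directory and produces one (offset, compressed size, method) triple per entry, touching only the tail of the file. The names `ZipEntryRef`, `readCentralDirectory` and the `readAt(position, length)` callback are hypothetical; in a pipeline, `readAt` might wrap the seekable channel a `FileIO.ReadableFile` can open. The parsing is done by hand because `java.util.zip.ZipEntry` does not expose an entry's local header offset.

```kotlin
import java.nio.ByteBuffer
import java.nio.ByteOrder

// One element of the proposed output PCollection: where an entry's local
// header starts, how many compressed bytes it has, and its compression
// method (0 = STORED, 8 = DEFLATE). The name is kept for routing/debugging.
data class ZipEntryRef(
    val name: String,
    val localHeaderOffset: Long,
    val compressedSize: Long,
    val method: Int
)

private const val EOCD_SIG = 0x06054b50 // End of Central Directory record
private const val CDFH_SIG = 0x02014b50 // central directory file header

// Reads only the EOCD record and the central directory, never entry data.
// readAt(position, length) must return exactly `length` bytes at `position`.
// Assumption: single-disk, non-ZIP64 archive with UTF-8 entry names.
fun readCentralDirectory(fileSize: Long, readAt: (Long, Int) -> ByteArray): List<ZipEntryRef> {
    // The EOCD is 22 bytes plus an optional comment of up to 65535 bytes.
    val tailLen = minOf(fileSize, 22L + 0xFFFF).toInt()
    val tail = ByteBuffer.wrap(readAt(fileSize - tailLen, tailLen)).order(ByteOrder.LITTLE_ENDIAN)
    // Scan backwards for the EOCD signature.
    var eocd = tailLen - 22
    while (eocd >= 0 && tail.getInt(eocd) != EOCD_SIG) eocd--
    require(eocd >= 0) { "End of Central Directory record not found" }

    val entryCount = tail.getShort(eocd + 10).toInt() and 0xFFFF
    val cdSize = tail.getInt(eocd + 12).toLong() and 0xFFFFFFFFL
    val cdOffset = tail.getInt(eocd + 16).toLong() and 0xFFFFFFFFL
    require(entryCount != 0xFFFF && cdOffset != 0xFFFFFFFFL) { "ZIP64 archives are not handled" }

    // Second and last read: the central directory itself.
    val cdBytes = readAt(cdOffset, cdSize.toInt())
    val cd = ByteBuffer.wrap(cdBytes).order(ByteOrder.LITTLE_ENDIAN)
    val refs = ArrayList<ZipEntryRef>(entryCount)
    var pos = 0
    repeat(entryCount) {
        require(cd.getInt(pos) == CDFH_SIG) { "bad central directory header at $pos" }
        val nameLen = cd.getShort(pos + 28).toInt() and 0xFFFF
        val extraLen = cd.getShort(pos + 30).toInt() and 0xFFFF
        val commentLen = cd.getShort(pos + 32).toInt() and 0xFFFF
        refs += ZipEntryRef(
            name = String(cdBytes, pos + 46, nameLen, Charsets.UTF_8),
            localHeaderOffset = cd.getInt(pos + 42).toLong() and 0xFFFFFFFFL,
            compressedSize = cd.getInt(pos + 20).toLong() and 0xFFFFFFFFL,
            method = cd.getShort(pos + 10).toInt() and 0xFFFF
        )
        // Fixed header is 46 bytes, followed by name, extra field and comment.
        pos += 46 + nameLen + extraLen + commentLen
    }
    return refs
}
```

The sizes and offsets come from the central directory rather than the local headers because writers that stream their output (including `java.util.zip.ZipOutputStream` for DEFLATE entries) leave the local header sizes as zero and record the real values in a trailing data descriptor.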

Re: Iterative algorithm in BEAM

2021-01-21 Thread Kenneth Knowles
Your idea works: first read the directory metadata, then downstream use an SDF that jumps to file offsets. This is very much what SDF is for. Splitting within a zip entry will not be useful. If you have an indeterminate nesting depth, you do need iterative computation. Beam doesn't directly support th
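The downstream step that "jumps to file offsets" needs nothing but the triple. A minimal plain-Kotlin sketch of what it might do for one element, assuming entries under 2 GiB and only the STORED (0) and DEFLATE (8) methods: read the 30-byte local file header at the offset, skip the local name and extra fields (their lengths can differ from the central directory's), read `compressedSize` bytes and inflate them. Each entry is read whole, in line with the point that splitting within an entry is not useful. `readEntryAt` and the `readAt` callback are hypothetical names.

```kotlin
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.zip.Inflater

private const val LFH_SIG = 0x04034b50 // local file header signature

// Reads and decompresses one zip entry given only its (offset, size, method)
// triple. readAt(position, length) must return exactly `length` bytes.
fun readEntryAt(
    localHeaderOffset: Long,
    compressedSize: Long,
    method: Int,
    readAt: (Long, Int) -> ByteArray
): ByteArray {
    val header = ByteBuffer.wrap(readAt(localHeaderOffset, 30)).order(ByteOrder.LITTLE_ENDIAN)
    require(header.getInt(0) == LFH_SIG) { "no local file header at offset $localHeaderOffset" }
    // Use the local header's own name/extra lengths to find the data start.
    val nameLen = header.getShort(26).toInt() and 0xFFFF
    val extraLen = header.getShort(28).toInt() and 0xFFFF
    val data = readAt(localHeaderOffset + 30 + nameLen + extraLen, compressedSize.toInt())
    return when (method) {
        0 -> data // STORED: the bytes are the entry itself
        8 -> inflateRaw(data)
        else -> throw IllegalArgumentException("unsupported compression method $method")
    }
}

// Zip entries are raw DEFLATE streams, hence Inflater(nowrap = true).
private fun inflateRaw(input: ByteArray): ByteArray {
    val inflater = Inflater(true)
    return try {
        // One extra zero byte, as the Inflater docs ask for with nowrap.
        inflater.setInput(input.copyOf(input.size + 1))
        val out = ByteArrayOutputStream()
        val buf = ByteArray(8192)
        while (!inflater.finished()) {
            val n = inflater.inflate(buf)
            // Stop instead of spinning if the stream is truncated.
            if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) break
            out.write(buf, 0, n)
        }
        out.toByteArray()
    } finally {
        inflater.end()
    }
}
```

Since every element carries its own offset, a runner is free to process the entries of one archive on different workers. Only the central-directory read is inherently serial.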

Re: Iterative algorithm in BEAM

2021-01-21 Thread Tucker Barbour
I do have an indeterminate nesting depth, which has always been the source of issues. However, if I can use an SDF to read entries from a zip in parallel, it might be acceptable to do the rest of the item extraction in a single DoFn, i.e. a call to `processElement` will recursively extract all attachmen
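One way to handle the unknown nesting depth inside a single `processElement` call is an explicit work stack instead of language recursion. The plain-Kotlin sketch below treats only nested `.zip` entries as containers; `isContainer` is a hypothetical stand-in for real e-mail/attachment detection, which would need a MIME parser. `maxDepth` is an assumed guard against zip bombs or pathological nesting, and `extractAll` and `ExtractedItem` are illustrative names.

```kotlin
import java.io.ByteArrayInputStream
import java.util.zip.ZipInputStream

// A leaf item plus its path through the enclosing containers.
class ExtractedItem(val path: String, val bytes: ByteArray)

// Hypothetical container test: only nested .zip entries are expanded here.
fun isContainer(path: String): Boolean = path.endsWith(".zip", ignoreCase = true)

// Expands nested containers of unknown depth using an explicit work stack,
// so depth is limited by maxDepth rather than by the JVM call stack.
fun extractAll(rootName: String, rootBytes: ByteArray, maxDepth: Int = 32): List<ExtractedItem> {
    val leaves = ArrayList<ExtractedItem>()
    val work = ArrayDeque<Triple<String, ByteArray, Int>>()
    work.addLast(Triple(rootName, rootBytes, 0))
    while (work.isNotEmpty()) {
        val (path, bytes, depth) = work.removeLast()
        if (!isContainer(path) || depth >= maxDepth) {
            // A leaf, or nested too deeply to expand further: keep as-is.
            leaves += ExtractedItem(path, bytes)
            continue
        }
        ZipInputStream(ByteArrayInputStream(bytes)).use { zis ->
            while (true) {
                val entry = zis.nextEntry ?: break
                if (!entry.isDirectory) {
                    // readBytes() stops at the end of the current entry.
                    work.addLast(Triple("$path/${entry.name}", zis.readBytes(), depth + 1))
                }
            }
        }
    }
    return leaves
}
```

Inside a DoFn the leaves would more likely be emitted one at a time through the output receiver than collected into a list. This sketch holds every item in memory, so very large attachments would call for a streaming variant.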

Re: Iterative algorithm in BEAM

2021-01-21 Thread Tucker Barbour
The initial DoFn would look basically like this, where the call to zip.entries is able to read just the central directory of the zip file instead of having to read the entire contents of the file:

    class SplitZip : DoFn() {
        @ProcessElement
        fun processElement(context: ProcessContext) {
            val ele