I guess the main issue here is that the Zip format is not splittable, so if it's one huge zip file you still won't be able to parallelise the read. If you could unzip such a file in advance and then read the paths of the extracted files in your pipeline, they should be read in parallel automatically. Otherwise, one zip file (regardless of its size) can be processed by only a single worker (one instance of a DoFn).
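For illustration, here is a rough sketch of that "unzip first" approach, assuming the archive has already been extracted to a location that all workers can read (the file pattern and the parsing logic are placeholders, not part of your pipeline):

    import org.apache.beam.sdk.Pipeline
    import org.apache.beam.sdk.io.FileIO
    import org.apache.beam.sdk.transforms.DoFn
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement
    import org.apache.beam.sdk.transforms.ParDo

    // Placeholder parser: each matched file is an independent element,
    // so the runner can fan the parsing out across workers.
    class ParseExtractedFile : DoFn<FileIO.ReadableFile, String>() {
        @ProcessElement
        fun process(ctx: ProcessContext) {
            val file = ctx.element()
            // Real parsing of the extracted email/attachment would go here.
            ctx.output(file.metadata.resourceId().toString())
        }
    }

    fun readExtractedFiles(pipeline: Pipeline) =
        pipeline
            // One element per file produced by the up-front unzip step.
            .apply(FileIO.match().filepattern("/staging/extracted/**"))
            .apply(FileIO.readMatches())
            .apply(ParDo.of(ParseExtractedFile()))

Since the match produces one element per extracted file, the downstream ParDo is parallelised with no extra effort.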
> On 21 Jan 2021, at 12:44, Tucker Barbour <tucker.barb...@gmail.com> wrote:
>
> There isn't a problem with the output being a flattened PCollection. I think
> that's actually what I'm intending to accomplish. A naive implementation
> would have a DoFn take a zip file as input (probably as a FileIO.ReadableFile)
> and recursively extract elements from the zip, i.e. extract an email from the
> zip and then immediately extract the attachments from that email, and so on.
> For each item, where an item is either an email or an attachment, we output
> the item as an individual record to the output PCollection. This would create
> a flattened PCollection of all items in the file hierarchy. However, this
> becomes problematic when the zip file is large, containing several million
> files. To avoid doing all the work in a single DoFn, I've implemented an
> iterative solution with a simple do/while:
>
> fun main(args: Array<String>) {
>     do {
>         val (pipeline, options) = createPipelineAndOptions(args)
>
>         val data = pipeline.apply(readFilesToExtract(options))
>         val exclusionSet = pipeline.apply(readExclusionSet(options))
>
>         // Could use side input or group by key
>         val inclusionSet = filter(data, exclusionSet)
>
>         val extractedItems = inclusionSet.apply(ParDo.of(ExtractEmbeddedItems()))
>
>         extractedItems.apply(WriteExtractedItemsToDatabase())
>     } while (hasExtractedItems(pipeline))
> }
>
> Each call to ExtractEmbeddedItems only extracts a single level of the file
> hierarchy. For example, the first iteration would extract all emails in a zip
> but _not_ their attachments. The second iteration would extract all the
> attachments from the emails found in iteration 1, and so on. This _works_ but
> isn't ideal, especially figuring out whether to continue with the do/while
> loop or terminate. It is better than extracting all elements in a single
> DoFn, since we gain parallelism in subsequent iterations.
>
> After reading more about SplittableDoFn, it seems like a SplittableDoFn might
> be a better option for solving this problem, but I'm still trying to figure
> that out. A DoFn can create element/restriction pairs where the element is a
> FileIO.ReadableFile and the restriction is a set of items in the zip. I'm not
> sure using OffsetRange works here because of how the zip format works, so I
> may need to explore writing a custom restriction and tracker.
>
> On Wed, Jan 20, 2021 at 6:22 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Thank you for the details! I'll try to share some of my thoughts on this.
>
> Well, maybe I don't understand something, but what is the problem with having
> a "flattened" PCollection as the output of the ParDo that reads the input zip
> file(s)? For example, if an input element is an email, then your DoFn can
> create several outputs depending on the structure of this email. Of course,
> the order of elements in the output PCollection won't be guaranteed, but all
> information needed later can be saved in the structure of the output record
> (like a POJO or an AvroRecord).
>
> Also, if I understand correctly, on every processing step you need to reject
> some records based on a SHA from an already-known rejection list. So, if it's
> possible to calculate this SHA at the "Read" step for every record, then you
> can use either a SideInput or a GroupByKey transform (where the key is the
> SHA) to filter the records.
>
> Please let me know if I missed something.
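For reference, a minimal sketch of what the filter(data, exclusionSet) helper above could look like with the side-input approach Alexey suggests, assuming each record already carries its SHA (the Record type and its sha field are illustrative, not from the original pipeline):

    import java.io.Serializable
    import org.apache.beam.sdk.transforms.DoFn
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement
    import org.apache.beam.sdk.transforms.MapElements
    import org.apache.beam.sdk.transforms.ParDo
    import org.apache.beam.sdk.transforms.SimpleFunction
    import org.apache.beam.sdk.transforms.View
    import org.apache.beam.sdk.values.KV
    import org.apache.beam.sdk.values.PCollection

    // Illustrative stand-in for the pipeline's real record type.
    data class Record(val sha: String, val payload: ByteArray) : Serializable

    fun filter(data: PCollection<Record>, exclusionSet: PCollection<String>): PCollection<Record> {
        // Turn the rejection SHAs into a map-shaped side input so each
        // worker can do constant-time membership checks.
        val rejected = exclusionSet
            .apply(MapElements.via(object : SimpleFunction<String, KV<String, Boolean>>() {
                override fun apply(sha: String): KV<String, Boolean> = KV.of(sha, true)
            }))
            .apply(View.asMap())

        return data.apply(ParDo.of(object : DoFn<Record, Record>() {
            @ProcessElement
            fun process(ctx: ProcessContext) {
                // Keep only records whose SHA is not on the rejection list.
                if (!ctx.sideInput(rejected).containsKey(ctx.element().sha)) {
                    ctx.output(ctx.element())
                }
            }
        }).withSideInputs(rejected))
    }

Whether a side input or a GroupByKey works better likely depends on whether the rejection list fits comfortably in worker memory; at a few million SHAs, either could be reasonable.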
>> On 19 Jan 2021, at 18:43, Tucker Barbour <tucker.barb...@gmail.com> wrote:
>>
>> My initial thought is the latter -- the output PCollection would be a
>> PCollection<Record> where a Record can be either an email or an attachment. A
>> Record would still need to have an attribute referencing its "parent". For
>> example, an email Record would have a unique identifier, e.g. ID, and any
>> attachment Record would have a reference to its parent email, e.g. parentID.
>> However, outputting pairs might also work and may be a better option
>> considering the need to maintain the relationship between a parent and a
>> child. We're basically building a tree. An additional wrinkle is that
>> attachments may themselves have embedded items, which would also need to be
>> represented in the output PCollection as Records. For example, an email with
>> an attachment which is itself a zip of Word documents. The structure of this
>> file hierarchy is not known ahead of time.
>>
>> The input is expected to be a PCollection of one or more (though usually on
>> the order of 10s, not anything like millions) zip files or other archive file
>> formats. The output is expected to be a PCollection whose elements are nodes
>> in the file hierarchy. If a zip file were to have the following structure:
>>
>> - Top Level Zip File
>>   `-> Email 001
>>       `-> Attachment A
>>           `-> Embedded Document A01
>>           `-> Embedded Document A02
>>       `-> Attachment B
>>           `-> Embedded Document B01
>>   `-> Email 002
>>       `-> Attachment C
>>           `-> Embedded Document C01
>>
>> we'd expect an output PCollection whose elements are:
>>
>> Email 001, Attachment A, Embedded Document A01, Embedded Document A02,
>> Attachment B, Embedded Document B01, Email 002, Attachment C, Embedded
>> Document C01.
>>
>> We'd then perform further PTransforms on this "flattened" PCollection.
>>
>> On Tue, Jan 19, 2021 at 5:14 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>> What exactly is the output PCollection in your example? Is it just a
>> PCollection of pairs (email and attachment), or is it something like
>> PCollection<Record>, where a Record can be either an email or an attachment?
>> Or is it something else?
>>
>> Could you add a simple example with the expected input/output of your pipeline?
>>
>> > On 18 Jan 2021, at 12:26, Tucker Barbour <tucker.barb...@gmail.com> wrote:
>> >
>> > I have a use-case where I'm extracting embedded items from archive file
>> > formats which themselves have embedded items, for example a zip file of
>> > emails with attachments. The goal in this example would be to create a
>> > PCollection where each email is an element, as well as each attachment
>> > being an element. (No need to create a tree structure here.) There are
>> > certain criteria which would prevent continuing embedded item extraction,
>> > such as an item's SHA being present in a "rejection" list. The pipeline
>> > will perform a series of transformations on the items and then continue to
>> > extract embedded items. This type of problem lends itself to being solved
>> > by an iterative algorithm. My understanding is that BEAM does not support
>> > iterative algorithms to the same extent Spark does. In BEAM I would have
>> > to persist the results of each iteration and instantiate a new pipeline
>> > for each iteration. This _works_ though isn't ideal. The "rejection" list
>> > is a PCollection of a few million elements. Re-reading this "rejection"
>> > list on each iteration isn't ideal.
>> >
>> > Is there a way to write such an iterative algorithm in BEAM without having
>> > to create a new pipeline on each iteration? Further, would something like
>> > SplittableDoFn be a potential solution? Looking through the user's guide
>> > and some existing implementations of SplittableDoFn, I'm thinking not, but
>> > I'm still trying to understand SplittableDoFns.
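To make the SplittableDoFn idea discussed above concrete, here is a rough sketch under the assumption that the restriction is the range of entry indices in the zip (the class name and output type are illustrative). It deliberately exposes the awkward part: because zip entries can only be reached by scanning the stream, each split has to re-read from the start to skip to its claimed entries, which is why a custom restriction and tracker, or up-front extraction, may still be the better route:

    import java.nio.channels.Channels
    import java.util.zip.ZipInputStream
    import org.apache.beam.sdk.io.FileIO
    import org.apache.beam.sdk.io.range.OffsetRange
    import org.apache.beam.sdk.transforms.DoFn
    import org.apache.beam.sdk.transforms.DoFn.*
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker

    class ExtractZipEntries : DoFn<FileIO.ReadableFile, ByteArray>() {

        // Initial restriction: the half-open range of entry indices,
        // found by scanning the archive once.
        @GetInitialRestriction
        fun initialRestriction(@Element file: FileIO.ReadableFile): OffsetRange {
            var count = 0L
            ZipInputStream(Channels.newInputStream(file.open())).use { zip ->
                while (zip.nextEntry != null) count++
            }
            return OffsetRange(0, count)
        }

        @NewTracker
        fun newTracker(@Restriction range: OffsetRange) = OffsetRangeTracker(range)

        @ProcessElement
        fun process(
            @Element file: FileIO.ReadableFile,
            tracker: RestrictionTracker<OffsetRange, Long>,
            out: OutputReceiver<ByteArray>
        ) {
            // Each split re-scans the stream from the beginning and only
            // emits the entries whose indices it successfully claims.
            ZipInputStream(Channels.newInputStream(file.open())).use { zip ->
                var index = 0L
                while (zip.nextEntry != null) {
                    if (index >= tracker.currentRestriction().from) {
                        // Stop as soon as the tracker refuses the claim:
                        // another split owns the remaining entries.
                        if (!tracker.tryClaim(index)) return
                        out.output(zip.readBytes())
                    }
                    index++
                }
            }
        }
    }

Note this only parallelises one level of extraction, so it would replace the body of the do/while loop rather than the loop itself; iterating until no new items are produced is a separate problem that Beam, as noted above, doesn't solve natively within a single pipeline.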