That looks like exactly what I needed. Thanks!

-Steve

On Mon, Apr 15, 2019 at 3:42 PM Addison Higham <[email protected]> wrote:

> Hi Steven,
>
> Usually, what you want to do is something like this:
>
> Instead of a `SourceFunction`, use a `RichParallelSourceFunction`, and as an
> argument to that function, pass a list of prefixes you want to
> consume in parallel.
>
> The `RichParallelSourceFunction` has a method called
> `getRuntimeContext()`, and the `RuntimeContext` has methods called
> `getIndexOfThisSubtask()` and `getNumberOfParallelSubtasks()`, which tell
> you the index of the current subtask and the total number
> of subtasks. This should allow you to deterministically split up the list
> of prefixes, so that each parallel worker gets its own subset of the prefixes. You
> can then use some state to store progress (in the case of S3, probably as
> the last record read or something) and then use that for dealing with
> failure.
>
> Hopefully that makes sense and gives you an idea, but yes, the general
> idea is that you just use the subtask index to divide up a static
> set of data that gets passed in the constructor (and will be serialized
> out to all the parallel instances).
>
> On Mon, Apr 15, 2019 at 9:16 AM Steven Nelson <[email protected]>
> wrote:
>
>> I am working on a process to do some compaction of files in S3. I read a
>> bucket full of files, key them, pull them all into a window, then remove
>> older versions of the file. The files are not organized inside the bucket;
>> they are simply named by GUID. I can iterate them using a custom Source that
>> just does a listObjects/listNextBatchOfObjects. The source emits ObjectKeys
>> from. The problem is that right now I need to have only one source running
>> at a time in order to ensure that I get each file only once. What I would
>> like to do is have parallelism on the source, where the sources are able to
>> pick a file prefix like 00 or A6 and use that for listObjects. This would
>> allow me to emit more filenames downstream. I could build some sort of
>> process to use a DB to track partition ownership, but I am hoping there is
>> a better (or already implemented) solution. Any ideas?
>>
>> -Steve
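
For anyone finding this thread later, the prefix split Addison describes could be sketched roughly as follows. This is only an illustration of the division step, not code from either poster: the class name `PrefixSplitter` and the method `prefixesForSubtask` are hypothetical, and the round-robin assignment is one possible choice. In a real `RichParallelSourceFunction`, the two integers would presumably come from `getRuntimeContext().getIndexOfThisSubtask()` and `getRuntimeContext().getNumberOfParallelSubtasks()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Flink): decides which prefixes a given
// parallel subtask should list, using a simple round-robin assignment.
final class PrefixSplitter {
    private PrefixSplitter() {}

    // subtaskIndex and numSubtasks are assumed to come from the RuntimeContext
    // (getIndexOfThisSubtask() and getNumberOfParallelSubtasks()).
    static List<String> prefixesForSubtask(List<String> allPrefixes,
                                           int subtaskIndex,
                                           int numSubtasks) {
        if (numSubtasks <= 0 || subtaskIndex < 0 || subtaskIndex >= numSubtasks) {
            throw new IllegalArgumentException("invalid subtask index or count");
        }
        List<String> mine = new ArrayList<>();
        // Subtask i takes prefixes i, i + n, i + 2n, ... so every prefix is
        // owned by exactly one subtask, as long as all subtasks see the same
        // list in the same order.
        for (int i = subtaskIndex; i < allPrefixes.size(); i += numSubtasks) {
            mine.add(allPrefixes.get(i));
        }
        return mine;
    }

    public static void main(String[] args) {
        // Example prefixes only; a real job would pass its own static list
        // into the source's constructor.
        List<String> prefixes = Arrays.asList("00", "01", "A6", "FF", "3C");
        int numSubtasks = 2;
        for (int i = 0; i < numSubtasks; i++) {
            System.out.println("subtask " + i + " -> "
                    + prefixesForSubtask(prefixes, i, numSubtasks));
        }
    }
}
```

Each subtask would then call listObjects only for the prefixes it owns. Because the assignment depends on the number of subtasks, changing the source parallelism changes which subtask owns which prefix, so any stored progress would need to be keyed by prefix rather than by subtask.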
