That looks like exactly what I needed. Thanks!

-Steve

On Mon, Apr 15, 2019 at 3:42 PM Addison Higham <[email protected]> wrote:

> Hi Steven,
>
> Usually, what you want to do is something like this:
>
> Instead of a `SourceFunction`, use a `RichParallelSourceFunction`, and
> pass it, as a constructor argument, the list of prefixes you want to
> consume in parallel.
>
> The `RichParallelSourceFunction` has a method called
> `getRuntimeContext()`, and the `RuntimeContext` has methods called
> `getIndexOfThisSubtask()` and `getNumberOfParallelSubtasks()`, which tell
> you the index of the current subtask and the total number of subtasks.
> This should allow you to deterministically split up the list of prefixes,
> so that each parallel worker gets its own subset of prefixes. You can
> then use some state to store progress (in the case of S3, probably the
> last record read or something similar) and use that to recover from
> failure.
>
> Hopefully that makes sense and gives you an idea, but yes, the general
> idea is that you just use the subtask number to do a division of a static
> set of data that can get passed in the constructor (and will be serialized
> out to all the parallel instances).
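
The split described above can be sketched in plain Java. This is only an
illustration under stated assumptions: `PrefixSplitter` and `prefixesFor`
are hypothetical names, not part of Flink, and round-robin assignment is
just one deterministic way to divide the list. Inside a
`RichParallelSourceFunction`, the last two arguments would come from
`getRuntimeContext().getIndexOfThisSubtask()` and
`getRuntimeContext().getNumberOfParallelSubtasks()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Flink class): picks the prefixes one subtask should list. */
final class PrefixSplitter {
    private PrefixSplitter() {}

    /**
     * Round-robin split: the prefix at position i goes to subtask (i % numSubtasks).
     * Every prefix is assigned to exactly one subtask, and the result depends only
     * on the arguments, so a restarted subtask gets the same prefixes again.
     */
    static List<String> prefixesFor(List<String> allPrefixes, int subtaskIndex, int numSubtasks) {
        if (numSubtasks <= 0 || subtaskIndex < 0 || subtaskIndex >= numSubtasks) {
            throw new IllegalArgumentException(
                "invalid subtask index " + subtaskIndex + " of " + numSubtasks);
        }
        List<String> mine = new ArrayList<>();
        // Start at this subtask's index and step by the total number of subtasks.
        for (int i = subtaskIndex; i < allPrefixes.size(); i += numSubtasks) {
            mine.add(allPrefixes.get(i));
        }
        return mine;
    }
}
```

Each subtask would then run its listObjects calls only for the prefixes
returned to it. If there are more subtasks than prefixes, some subtasks
simply get an empty list and have nothing to read.
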
>
>
>
> On Mon, Apr 15, 2019 at 9:16 AM Steven Nelson <[email protected]>
> wrote:
>
>> I am working on a process to do some compaction of files in S3. I read a
>> bucket full of files, key them, pull them all into a window, then remove
>> older versions of the file. The files are not organized inside the bucket;
>> they are simply named by GUID. I can iterate them using a custom Source that
>> just does a listObjects/listNextBatchOfObjects. The source emits
>> ObjectKeys. The problem is that right now I need to only have one source running
>> at a time in order to ensure that I only get each file once. What I would
>> like to do is have parallelism on the source where the sources are able to
>> pick a file prefix like 00 or A6 and use that for listObjects. This would
>> allow me to emit more filenames downstream. I could build some sort of
>> process to use a DB to track partition ownership, but I am hoping there is
>> a better (or already implemented) solution. Any ideas?
>>
>> -Steve
>>
>
