I think this is what you're asking for, but your statement about 4 instances
is unclear as to whether that means 4 copies of the same DoFn or 4 completely
different DoFns. It's also unclear what you mean by instance/thread; I'm
assuming that you want at most 4 instances of a DoFn, each being processed
by a single thread.

This is a bad idea because you limit your parallelism, but it is similar
to what the default file sharding logic does. In Apache Beam the smallest
unit of output for a GroupByKey is a single key+iterable pair. We exploit
this by assigning all our values to a fixed number of keys and then
performing a GroupByKey. This is the same trick that powers the file
sharding logic in AvroIO/TextIO/...

Your pipeline would look like (fixed width font diagram):
your data      -> apply shard key       -> GroupByKey        -> partition by key -> your dofn #1
                                                                                 \> your dofn #2
                                                                                 \> ...
a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
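
As a rough sketch of the shard-key trick above, here is plain Java (no Beam
APIs) that mimics "apply shard key -> GroupByKey". The class and method names
(ShardKeySketch, shardKey, groupByShard) are made up for illustration, and
hashing the element is just one assumed way to pick a key; in a real pipeline
the keying step would be a transform in front of GroupByKey.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ShardKeySketch {
    // Hypothetical helper: deterministically map an element to one of
    // numShards keys. floorMod keeps the result non-negative even when
    // hashCode() is negative.
    static int shardKey(String element, int numShards) {
        return Math.floorMod(element.hashCode(), numShards);
    }

    // Plain-Java stand-in for "apply shard key -> GroupByKey": every
    // element lands in exactly one key+iterable pair.
    static Map<Integer, List<String>> groupByShard(List<String> elements, int numShards) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (String element : elements) {
            groups.computeIfAbsent(shardKey(element, numShards), k -> new ArrayList<>())
                  .add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> groups =
            groupByShard(Arrays.asList("a", "b", "c", "d"), 2);
        // Each entry is one unit of work that a downstream step would see.
        groups.forEach((key, values) -> System.out.println(key + " -> " + values));
    }
}
```

With 2 shards this prints "0 -> [b, d]" and "1 -> [a, c]", i.e. the same
shape as the diagram, just with keys numbered from 0.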

This is not exactly the same as having each partition processed by a single
DoFn instance/thread, because it relies on the Runner being able to schedule
each key to be processed on a different machine. For example, a Runner may
choose to process values 1,[a,c] and 2,[b,d] sequentially on the same
machine or may choose to distribute them.
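
To illustrate the scheduling caveat, here is a small plain-Java sketch (again
not Beam, and all names are hypothetical) where a thread pool stands in for
the Runner. Each key+iterable pair is handed to exactly one task, but whether
the tasks run one after another or side by side depends only on the pool
size, which is the part your code doesn't control in a real Runner.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class KeySchedulingSketch {
    // Process each key's values as a single task. workerThreads = 1 models
    // a Runner that handles the keys sequentially; a larger value models a
    // Runner that distributes them.
    static Map<Integer, String> processGroups(Map<Integer, List<String>> groups,
                                              int workerThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(workerThreads);
        try {
            Map<Integer, Future<String>> pending = new TreeMap<>();
            for (Map.Entry<Integer, List<String>> entry : groups.entrySet()) {
                List<String> values = entry.getValue();
                // Stand-in for "your dofn": concatenate the values of one key.
                Callable<String> task = () -> String.join("", values);
                pending.put(entry.getKey(), pool.submit(task));
            }
            Map<Integer, String> results = new TreeMap<>();
            for (Map.Entry<Integer, Future<String>> entry : pending.entrySet()) {
                results.put(entry.getKey(), entry.getValue().get());
            }
            return results;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        groups.put(1, Arrays.asList("a", "c"));
        groups.put(2, Arrays.asList("b", "d"));
        System.out.println(processGroups(groups, 1)); // sequential
        System.out.println(processGroups(groups, 2)); // possibly parallel
    }
}
```

Both calls print {1=ac, 2=bd}: the per-key result is the same either way,
only the placement of the work differs.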



On Tue, Jun 6, 2017 at 8:13 AM, Josh <jof...@gmail.com> wrote:

> Hey Lukasz,
>
> I have a follow up question about this -
>
> What if I want to do something very similar, but instead of with 4
> instances of AvroIO following the partition transform, I want 4 instances
> of a DoFn that I've written. I want to ensure that each partition is
> processed by a single DoFn instance/thread. Is this possible with Beam?
>
> Thanks,
> Josh
>
>
>
> On Wed, May 24, 2017 at 6:15 PM, Josh <jof...@gmail.com> wrote:
>
>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>
>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Google Cloud Dataflow won't override your setting. The dynamic sharding
>>> occurs if you don't explicitly set a numShard value.
>>>
>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> Thanks for the example. That sounds like a nice solution -
>>>> I am running on Dataflow though, which dynamically sets numShards - so
>>>> if I set numShards to 1 on each of those AvroIO writers, I can't be sure
>>>> that Dataflow isn't going to override my setting right? I guess this should
>>>> work fine as long as I partition my stream into a large enough number of
>>>> partitions so that Dataflow won't override numShards.
>>>>
>>>> Josh
>>>>
>>>>
>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Since you're using a small number of shards, add a Partition transform
>>>>> which uses a deterministic hash of the key to choose one of 4 partitions.
>>>>> Write each partition with a single shard.
>>>>>
>>>>> (Fixed width diagram below)
>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>> Becomes:
>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>                       |-> AvroIO(numShards = 1)
>>>>>                       |-> AvroIO(numShards = 1)
>>>>>                       \-> AvroIO(numShards = 1)
>>>>>
>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jof...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>
>>>>>> I would like to partition the stream by some key in the element, so
>>>>>> that all elements with the same key will get processed by the same shard
>>>>>> writer, and therefore written to the same file. Is there a way to do
>>>>>> this? Note that in my stream the number of keys is very large (most
>>>>>> elements have a unique key, while a few elements share a key).
>>>>>>
>>>>>> Thanks,
>>>>>> Josh
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
