Hmm ok, I don't quite get why what I want to do isn't supported in Beam ... I 
don't actually have a limited parallelism requirement, I just want to be able 
to partition my unbounded stream by a key determined from the elements, so that 
any two elements with the same key will be routed to the same worker. I want to 
do this because my DoFn keeps some in-memory cached state for each key (which I 
was planning to store at either DoFn or JVM level). Does this sound like a bad 
idea? 


> On 6 Jun 2017, at 19:14, Lukasz Cwik <lc...@google.com> wrote:
> 
> Your right, the window acts as a secondary key within GroupByKey 
> (KeyA,Window1 != KeyA,Window2), which means that each of those two composite 
> keys can be scheduled to execute at the same time.
> 
> At this point I think you should challenge your limited parallelism 
> requirement as you'll need to build something outside of Apache Beam to 
> provide these parallelization limits across windows (e.g. lock within the 
> same process when limiting yourself to a single machine, distributed lock 
> service when dealing with multiple machines).
> 
> The backlog of data is either going to grow infinitely at the GroupByKey or 
> grow infinitely at the source if your pipeline can't keep up. It is up to the 
> Runner to be smart and not produce a giant backlog at the GroupByKey since it 
> knows how fast work is being completed (unfortunately I don't know if any 
> Runner is this smart yet to push the backlog up to the source).
> 
>> On Tue, Jun 6, 2017 at 11:03 AM, Josh <jof...@gmail.com> wrote:
>> I see, thanks for the tips!
>> 
>> Last question about this! How could this be adapted to work in a 
>> unbounded/streaming job? To work in an unbounded job, I need to put a 
>> Window.into with a trigger before GroupByKey. 
>> I guess this would mean that the "shard gets processed by a single thread in 
>> MyDofn" guarantee will only apply to messages within a single window, and 
>> would not apply across windows?
>> If this is the case, is there a better solution? I would like to avoid 
>> buffering data in windows, and want the shard guarantee to apply across 
>> windows.
>> 
>> 
>> 
>>> On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <lc...@google.com> wrote:
>>> Your code looks like what I was describing. My only comment would be to use 
>>> a deterministic hashing function which is stable across JVM versions and 
>>> JVM instances as it will help in making your pipeline consistent across 
>>> different runs/environments.
>>> 
>>> Parallelizing across 8 instances instead of 4 would break the contract 
>>> around GroupByKey (since it didn't group all the elements for a key 
>>> correctly). Also, each element is the smallest unit of work and 
>>> specifically in your pipeline you have chosen to reduce all your elements 
>>> into 4 logical elements (each containing some proportion of your original 
>>> data).
>>> 
>>>> On Tue, Jun 6, 2017 at 9:37 AM, Josh <jof...@gmail.com> wrote:
>>>> Thanks for the reply, Lukasz. 
>>>> 
>>>> What I meant was that I want to shard my data by a "shard key", and be 
>>>> sure that any two elements with the same "shard key" are processed by the 
>>>> same thread on the same worker. (Or if that's not possible, by the same 
>>>> worker JVM with no thread guarantee would be good enough). It doesn't 
>>>> actually matter to me whether there's 1 or 4 or 100 DoFn instances 
>>>> processing the data.
>>>> 
>>>> It sounds like what you suggested will work for this, with the downside of 
>>>> me needing to choose a number of shards/DoFns (e.g. 4).
>>>> It seems a bit long and messy but am I right in thinking it would look 
>>>> like this? ...
>>>> 
>>>> PCollection<MyElement> elements = ...;
>>>> elements
>>>>             .apply(MapElements
>>>>                 .into(TypeDescriptors.kvs(TypeDescriptors.integers(), 
>>>> TypeDescriptor.of(MyElement.class)))
>>>>                 .via((MyElement e) -> KV.of(
>>>>                     e.getKey().toString().hashCode() % 4, e)))
>>>>             .apply(GroupByKey.create())
>>>>             .apply(Partition.of(4,
>>>>                 (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) 
>>>> (kv, i) -> kv.getKey()))
>>>>             .apply(ParDo.of(new MyDofn()));
>>>> // Where MyDofn must be changed to handle a KV<Integer, 
>>>> Iterable<MyElement>> as input instead of just a MyElement
>>>> 
>>>> I was wondering is there a guarantee that the runner won't parallelise the 
>>>> final MyDofn across e.g. 8 instances instead of 4? If there are two input 
>>>> elements with the same key are they actually guaranteed to be processed on 
>>>> the same instance?
>>>> 
>>>> Thanks,
>>>> Josh
>>>> 
>>>> 
>>>> 
>>>>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>> I think this is what your asking for but your statement about 4 instances 
>>>>> is unclear as to whether that is 4 copies of the same DoFn or 4 
>>>>> completely different DoFns. Also its unclear what you mean by 
>>>>> instance/thread, I'm assuming that you want at most 4 instances of a DoFn 
>>>>> each being processed by a single thread.
>>>>> 
>>>>> This is a bad idea because you limit your parallelism but this is similar 
>>>>> to what the default file sharding logic does. In Apache Beam the smallest 
>>>>> unit of output for a GroupByKey is a single key+iterable pair. We exploit 
>>>>> this by assigning all our values to a fixed number of keys and then 
>>>>> performing a GroupByKey. This is the same trick that powers the file 
>>>>> sharding logic in AvroIO/TextIO/... 
>>>>> 
>>>>> Your pipeline would look like (fixed width font diagram):
>>>>> your data      -> apply shard key       -> GroupByKey        -> partition 
>>>>> by key -> your dofn #1
>>>>>                                                                           
>>>>>        \> your dofn #2
>>>>>                                                                           
>>>>>        \> ...
>>>>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>>> 
>>>>> This is not exactly the same as processing a single DoFn instance/thread 
>>>>> because it relies on the Runner to be able to schedule each key to be 
>>>>> processed on a different machine. For example a Runner may choose to 
>>>>> process value 1,[a,c] and 2,[b,d] sequentially on the same machine or may 
>>>>> choose to distribute them.
>>>>> 
>>>>> 
>>>>> 
>>>>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jof...@gmail.com> wrote:
>>>>>> Hey Lukasz,
>>>>>> 
>>>>>> I have a follow up question about this - 
>>>>>> 
>>>>>> What if I want to do something very similar, but instead of with 4 
>>>>>> instances of AvroIO following the partition transform, I want 4 
>>>>>> instances of a DoFn that I've written. I want to ensure that each 
>>>>>> partition is processed by a single DoFn instance/thread. Is this 
>>>>>> possible with Beam?
>>>>>> 
>>>>>> Thanks,
>>>>>> Josh
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jof...@gmail.com> wrote:
>>>>>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>>>>> 
>>>>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>> Google Cloud Dataflow won't override your setting. The dynamic 
>>>>>>>> sharding occurs if you don't explicitly set a numShard value.
>>>>>>>> 
>>>>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jof...@gmail.com> wrote:
>>>>>>>>> Hi Lukasz,
>>>>>>>>> 
>>>>>>>>> Thanks for the example. That sounds like a nice solution - 
>>>>>>>>> I am running on Dataflow though, which dynamically sets numShards - 
>>>>>>>>> so if I set numShards to 1 on each of those AvroIO writers, I can't 
>>>>>>>>> be sure that Dataflow isn't going to override my setting right? I 
>>>>>>>>> guess this should work fine as long as I partition my stream into a 
>>>>>>>>> large enough number of partitions so that Dataflow won't override 
>>>>>>>>> numShards.
>>>>>>>>> 
>>>>>>>>> Josh
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> 
>>>>>>>>>> wrote:
>>>>>>>>>> Since your using a small number of shards, add a Partition transform 
>>>>>>>>>> which uses a deterministic hash of the key to choose one of 4 
>>>>>>>>>> partitions. Write each partition with a single shard.
>>>>>>>>>> 
>>>>>>>>>> (Fixed width diagram below)
>>>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>>>> Becomes:
>>>>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>>>> 
>>>>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jof...@gmail.com> wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>> 
>>>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream 
>>>>>>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>>> 
>>>>>>>>>>> I would like to partition the stream by some key in the element, so 
>>>>>>>>>>> that all elements with the same key will get processed by the same 
>>>>>>>>>>> shard writer, and therefore written to the same file. Is there a 
>>>>>>>>>>> way to do this? Note that in my stream the number of keys is very 
>>>>>>>>>>> large (most elements have a unique key, while a few elements share 
>>>>>>>>>>> a key).
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Josh
> 

Reply via email to