I see, thanks for the tips!

Last question about this! How could this be adapted to work in an
unbounded/streaming job? To work in an unbounded job, I need to put a
Window.into with a trigger before the GroupByKey.
I guess this would mean that the "shard gets processed by a single thread
in MyDofn" guarantee will only apply to messages within a single window,
and would not apply across windows?
If this is the case, is there a better solution? I would like to avoid
buffering data in windows, and want the shard guarantee to apply across
windows.
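
For reference, here is a minimal sketch of the stable shard-key assignment
suggested in the quoted reply below, written in plain Java so it runs without
Beam. The class name StableShards, its two methods, and the choice of CRC32
over the key's UTF-8 bytes are assumptions made for illustration; they are not
from the thread or from Beam's API, and any hash whose output is specified
independently of the JVM would serve the same purpose. groupByShard only
imitates, on an in-memory list, the effect of applying the shard key and then
grouping by it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

/** Hypothetical helper (not part of Beam): maps keys to a fixed number of shards. */
final class StableShards {
  private StableShards() {}

  /** Returns a shard index in [0, numShards) that depends only on the key's UTF-8 bytes. */
  static int shardFor(String key, int numShards) {
    if (numShards <= 0) {
      throw new IllegalArgumentException("numShards must be positive");
    }
    CRC32 crc = new CRC32();
    crc.update(key.getBytes(StandardCharsets.UTF_8));
    // CRC32.getValue() is a non-negative long, so the remainder is never negative.
    return (int) (crc.getValue() % numShards);
  }

  /** Imitates "apply shard key, then group by key" on an in-memory list of keys. */
  static Map<Integer, List<String>> groupByShard(List<String> keys, int numShards) {
    Map<Integer, List<String>> shards = new TreeMap<>();
    for (String key : keys) {
      shards.computeIfAbsent(shardFor(key, numShards), s -> new ArrayList<>()).add(key);
    }
    return shards;
  }
}
```

With numShards = 4, every key lands in one of four buckets, and equal keys
always land in the same bucket regardless of which JVM computes the index.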



On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <lc...@google.com> wrote:

> Your code looks like what I was describing. My only comment would be to
> use a deterministic hashing function which is stable across JVM versions
> and JVM instances as it will help in making your pipeline consistent across
> different runs/environments.
>
> Parallelizing across 8 instances instead of 4 would break the contract
> around GroupByKey (since it didn't group all the elements for a key
> correctly). Also, each element is the smallest unit of work and
> specifically in your pipeline you have chosen to reduce all your elements
> into 4 logical elements (each containing some proportion of your original
> data).
>
> On Tue, Jun 6, 2017 at 9:37 AM, Josh <jof...@gmail.com> wrote:
>
>> Thanks for the reply, Lukasz.
>>
>>
>> What I meant was that I want to shard my data by a "shard key", and be
>> sure that any two elements with the same "shard key" are processed by the
>> same thread on the same worker. (Or if that's not possible, by the same
>> worker JVM with no thread guarantee would be good enough). It doesn't
>> actually matter to me whether there's 1 or 4 or 100 DoFn instances
>> processing the data.
>>
>>
>> It sounds like what you suggested will work for this, with the downside
>> of me needing to choose a number of shards/DoFns (e.g. 4).
>>
>> It seems a bit long and messy but am I right in thinking it would look
>> like this? ...
>>
>>
>> PCollection<MyElement> elements = ...;
>>
>> elements
>>     .apply(MapElements
>>         .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>>             TypeDescriptor.of(MyElement.class)))
>>         // floorMod keeps the shard index in [0, 4) even when hashCode() is negative
>>         .via((MyElement e) -> KV.of(
>>             Math.floorMod(e.getKey().toString().hashCode(), 4), e)))
>>     .apply(GroupByKey.create())
>>     .apply(Partition.of(4,
>>         (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) ->
>>             kv.getKey()))
>>     .apply(ParDo.of(new MyDofn()));
>>
>> // Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>>
>> // as input instead of just a MyElement
>>
>>
>> I was wondering is there a guarantee that the runner won't parallelise
>> the final MyDofn across e.g. 8 instances instead of 4? If there are two
>> input elements with the same key are they actually guaranteed to be
>> processed on the same instance?
>>
>>
>> Thanks,
>>
>> Josh
>>
>>
>>
>>
>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> I think this is what you're asking for, but your statement about 4
>>> instances is unclear as to whether that is 4 copies of the same DoFn or 4
>>> completely different DoFns. Also, it's unclear what you mean by
>>> instance/thread; I'm assuming that you want at most 4 instances of a DoFn,
>>> each being processed by a single thread.
>>>
>>> This is a bad idea because you limit your parallelism but this is
>>> similar to what the default file sharding logic does. In Apache Beam the
>>> smallest unit of output for a GroupByKey is a single key+iterable pair. We
>>> exploit this by assigning all our values to a fixed number of keys and then
>>> performing a GroupByKey. This is the same trick that powers the file
>>> sharding logic in AvroIO/TextIO/...
>>>
>>> Your pipeline would look like (fixed width font diagram):
>>> your data      -> apply shard key       -> GroupByKey        -> partition by key -> your dofn #1
>>>                                                                                  \> your dofn #2
>>>                                                                                  \> ...
>>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>
>>> This is not exactly the same as processing a single DoFn instance/thread
>>> because it relies on the Runner to be able to schedule each key to be
>>> processed on a different machine. For example a Runner may choose to
>>> process value 1,[a,c] and 2,[b,d] sequentially on the same machine or may
>>> choose to distribute them.
>>>
>>>
>>>
>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hey Lukasz,
>>>>
>>>> I have a follow up question about this -
>>>>
>>>> What if I want to do something very similar, but instead of 4
>>>> instances of AvroIO following the partition transform, I want 4 instances
>>>> of a DoFn that I've written? I want to ensure that each partition is
>>>> processed by a single DoFn instance/thread. Is this possible with Beam?
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>>
>>>>
>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jof...@gmail.com> wrote:
>>>>
>>>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>>>
>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Google Cloud Dataflow won't override your setting. The dynamic
>>>>>> sharding occurs only if you don't explicitly set a numShards value.
>>>>>>
>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jof...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Lukasz,
>>>>>>>
>>>>>>> Thanks for the example. That sounds like a nice solution -
>>>>>>> I am running on Dataflow though, which dynamically sets numShards -
>>>>>>> so if I set numShards to 1 on each of those AvroIO writers, I can't be
>>>>>>> sure that Dataflow isn't going to override my setting, right? I guess
>>>>>>> this should work fine as long as I partition my stream into a large
>>>>>>> enough number of partitions so that Dataflow won't override numShards.
>>>>>>>
>>>>>>> Josh
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Since you're using a small number of shards, add a Partition
>>>>>>>> transform which uses a deterministic hash of the key to choose one of 4
>>>>>>>> partitions. Write each partition with a single shard.
>>>>>>>>
>>>>>>>> (Fixed width diagram below)
>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>> Becomes:
>>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jof...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>
>>>>>>>>> I would like to partition the stream by some key in the element,
>>>>>>>>> so that all elements with the same key will get processed by the same
>>>>>>>>> shard writer, and therefore written to the same file. Is there a way
>>>>>>>>> to do this? Note that in my stream the number of keys is very large
>>>>>>>>> (most elements have a unique key, while a few elements share a key).
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Josh
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
