I see, thanks for the tips! One last question about this: how could this be adapted to work in an unbounded/streaming job? To work in an unbounded job, I would need to put a Window.into with a trigger before the GroupByKey. I guess this would mean that the "shard gets processed by a single thread in MyDofn" guarantee only applies to messages within a single window, and would not hold across windows? If that's the case, is there a better solution? I would like to avoid buffering data in windows, and I want the shard guarantee to apply across windows.
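To make the question concrete, this is roughly what I imagine the streaming version looking like - an untested sketch, the same pipeline as in my message below plus the Window.into, with an arbitrary processing-time trigger picked purely as an example:

    // Untested sketch. Assumes the usual Beam imports plus
    // org.apache.beam.sdk.transforms.windowing.* and org.joda.time.Duration.
    PCollection<MyElement> elements = ...;

    elements
        .apply(MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
                TypeDescriptor.of(MyElement.class)))
            .via((MyElement e) -> KV.of(
                e.getKey().toString().hashCode() % 4, e)))
        // Fire repeatedly within the global window so the GroupByKey can
        // emit output in a streaming job; the 10s delay is arbitrary.
        .apply(Window.<KV<Integer, MyElement>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(GroupByKey.create())
        .apply(Partition.of(4,
            (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>)
                (kv, i) -> kv.getKey()))
        .apply(ParDo.of(new MyDofn()));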
On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <lc...@google.com> wrote:

> Your code looks like what I was describing. My only comment would be to
> use a deterministic hashing function which is stable across JVM versions
> and JVM instances, as it will help in making your pipeline consistent
> across different runs/environments.
>
> Parallelizing across 8 instances instead of 4 would break the contract
> around GroupByKey (since it didn't group all the elements for a key
> correctly). Also, each element is the smallest unit of work, and
> specifically in your pipeline you have chosen to reduce all your
> elements into 4 logical elements (each containing some proportion of
> your original data).
>
> On Tue, Jun 6, 2017 at 9:37 AM, Josh <jof...@gmail.com> wrote:
>
>> Thanks for the reply, Lukasz.
>>
>> What I meant was that I want to shard my data by a "shard key", and be
>> sure that any two elements with the same "shard key" are processed by
>> the same thread on the same worker. (Or, if that's not possible, the
>> same worker JVM with no thread guarantee would be good enough.) It
>> doesn't actually matter to me whether there's 1 or 4 or 100 DoFn
>> instances processing the data.
>>
>> It sounds like what you suggested will work for this, with the
>> downside of me needing to choose a number of shards/DoFns (e.g. 4).
>>
>> It seems a bit long and messy, but am I right in thinking it would
>> look like this?
>>
>> PCollection<MyElement> elements = ...;
>>
>> elements
>>     .apply(MapElements
>>         .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>>             TypeDescriptor.of(MyElement.class)))
>>         .via((MyElement e) -> KV.of(
>>             e.getKey().toString().hashCode() % 4, e)))
>>     .apply(GroupByKey.create())
>>     .apply(Partition.of(4,
>>         (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>)
>>             (kv, i) -> kv.getKey()))
>>     .apply(ParDo.of(new MyDofn()));
>>
>> // Where MyDofn must be changed to handle a
>> // KV<Integer, Iterable<MyElement>> as input instead of just a MyElement
>>
>> I was wondering, is there a guarantee that the runner won't parallelise
>> the final MyDofn across e.g. 8 instances instead of 4? If there are two
>> input elements with the same key, are they actually guaranteed to be
>> processed on the same instance?
>>
>> Thanks,
>> Josh
>>
>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> I think this is what you're asking for, but your statement about 4
>>> instances is unclear as to whether that is 4 copies of the same DoFn
>>> or 4 completely different DoFns. Also it's unclear what you mean by
>>> instance/thread; I'm assuming that you want at most 4 instances of a
>>> DoFn, each being processed by a single thread.
>>>
>>> This is a bad idea because you limit your parallelism, but it is
>>> similar to what the default file sharding logic does. In Apache Beam
>>> the smallest unit of output for a GroupByKey is a single key+iterable
>>> pair. We exploit this by assigning all our values to a fixed number of
>>> keys and then performing a GroupByKey. This is the same trick that
>>> powers the file sharding logic in AvroIO/TextIO/...
>>>
>>> Your pipeline would look like (fixed width font diagram):
>>>
>>> your data -> apply shard key -> GroupByKey -> partition by key -> your dofn #1
>>>                                                                \> your dofn #2
>>>                                                                \> ...
>>> a / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>
>>> This is not exactly the same as processing with a single DoFn
>>> instance/thread, because it relies on the Runner being able to
>>> schedule each key to be processed on a different machine. For example,
>>> a Runner may choose to process the values 1,[a,c] and 2,[b,d]
>>> sequentially on the same machine, or may choose to distribute them.
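A quick aside on Lukasz's point above about using a deterministic hashing function: a shard key that is stable across JVM versions and instances could be computed along these lines - an untested sketch, assuming Guava's Hashing is available on the classpath:

    import java.nio.charset.StandardCharsets;
    import com.google.common.hash.Hashing;

    // Murmur3 is just one example of a hash that is stable across JVM
    // versions/instances, unlike Object.hashCode() in the general case.
    static int shardFor(String key, int numShards) {
      int hash = Hashing.murmur3_32()
          .hashString(key, StandardCharsets.UTF_8)
          .asInt();
      // floorMod keeps the shard index in [0, numShards) even when the
      // hash is negative (a plain % could return a negative partition).
      return Math.floorMod(hash, numShards);
    }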
>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hey Lukasz,
>>>>
>>>> I have a follow-up question about this -
>>>>
>>>> What if I want to do something very similar, but instead of 4
>>>> instances of AvroIO following the partition transform, I want 4
>>>> instances of a DoFn that I've written? I want to ensure that each
>>>> partition is processed by a single DoFn instance/thread. Is this
>>>> possible with Beam?
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jof...@gmail.com> wrote:
>>>>
>>>>> Ahh, I see - OK, I'll try out this solution then. Thanks Lukasz!
>>>>>
>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Google Cloud Dataflow won't override your setting. The dynamic
>>>>>> sharding occurs only if you don't explicitly set a numShards value.
>>>>>>
>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jof...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Lukasz,
>>>>>>>
>>>>>>> Thanks for the example. That sounds like a nice solution.
>>>>>>> I am running on Dataflow though, which dynamically sets numShards -
>>>>>>> so if I set numShards to 1 on each of those AvroIO writers, I can't
>>>>>>> be sure that Dataflow isn't going to override my setting, right? I
>>>>>>> guess this should work fine as long as I partition my stream into a
>>>>>>> large enough number of partitions that Dataflow won't override
>>>>>>> numShards.
>>>>>>>
>>>>>>> Josh
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Since you're using a small number of shards, add a Partition
>>>>>>>> transform which uses a deterministic hash of the key to choose one
>>>>>>>> of 4 partitions. Write each partition with a single shard.
>>>>>>>>
>>>>>>>> (Fixed width diagram below)
>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>> Becomes:
>>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jof...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>
>>>>>>>>> I would like to partition the stream by some key in the element,
>>>>>>>>> so that all elements with the same key will get processed by the
>>>>>>>>> same shard writer, and therefore written to the same file. Is
>>>>>>>>> there a way to do this? Note that in my stream the number of keys
>>>>>>>>> is very large (most elements have a unique key, while a few
>>>>>>>>> elements share a key).
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Josh
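For completeness, the Partition -> single-shard AvroIO fan-out in Lukasz's diagram above might be wired up roughly like this - an untested sketch, where the output path and MyElement (with its getKey()) are placeholders:

    // Untested sketch of the fan-out: one single-shard windowed writer per
    // partition. The output location "gs://my-bucket/output/" is made up.
    int numPartitions = 4;
    PCollectionList<MyElement> partitions = elements
        .apply(Partition.of(numPartitions,
            (Partition.PartitionFn<MyElement>) (e, n) ->
                // floorMod avoids negative partition numbers; a JVM-stable
                // hash (see the murmur3 sketch above) could be used instead.
                Math.floorMod(e.getKey().toString().hashCode(), n)));

    for (int i = 0; i < numPartitions; i++) {
      partitions.get(i)
          .apply("WriteShard" + i, AvroIO.write(MyElement.class)
              .to("gs://my-bucket/output/shard-" + i)
              .withWindowedWrites()
              .withNumShards(1));
    }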