I think this is what you're asking for, but your statement about 4 instances is unclear as to whether that means 4 copies of the same DoFn or 4 completely different DoFns. It's also unclear what you mean by instance/thread; I'm assuming that you want at most 4 instances of a DoFn, each being processed by a single thread.
This is a bad idea because it limits your parallelism, but it is similar to what the default file sharding logic does. In Apache Beam the smallest unit of output for a GroupByKey is a single key+iterable pair. We exploit this by assigning all our values to a fixed number of keys and then performing a GroupByKey. This is the same trick that powers the file sharding logic in AvroIO/TextIO/...

Your pipeline would look like (fixed width font diagram):

your data -> apply shard key -> GroupByKey -> partition by key -> your dofn #1
                                                               \> your dofn #2
                                                               \> ...

a         -> 1,a             -> 1,[a,c]    -> ???
b            2,b                2,[b,d]
c            1,c
d            2,d

This is not exactly the same as processing with a single DoFn instance/thread because it relies on the Runner to schedule each key to be processed on a different machine. For example, a Runner may choose to process the values 1,[a,c] and 2,[b,d] sequentially on the same machine, or may choose to distribute them. A rough code sketch of this pipeline follows at the bottom of this mail.

On Tue, Jun 6, 2017 at 8:13 AM, Josh <jof...@gmail.com> wrote:

> Hey Lukasz,
>
> I have a follow-up question about this -
>
> What if I want to do something very similar, but instead of with 4
> instances of AvroIO following the partition transform, I want 4 instances
> of a DoFn that I've written. I want to ensure that each partition is
> processed by a single DoFn instance/thread. Is this possible with Beam?
>
> Thanks,
> Josh
>
> On Wed, May 24, 2017 at 6:15 PM, Josh <jof...@gmail.com> wrote:
>
>> Ahh I see - OK, I'll try out this solution then. Thanks Lukasz!
>>
>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Google Cloud Dataflow won't override your setting. The dynamic sharding
>>> occurs if you don't explicitly set a numShards value.
>>>
>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> Thanks for the example. That sounds like a nice solution -
>>>> I am running on Dataflow though, which dynamically sets numShards - so
>>>> if I set numShards to 1 on each of those AvroIO writers, I can't be
>>>> sure that Dataflow isn't going to override my setting, right? I guess
>>>> this should work fine as long as I partition my stream into a large
>>>> enough number of partitions so that Dataflow won't override numShards.
>>>>
>>>> Josh
>>>>
>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Since you're using a small number of shards, add a Partition
>>>>> transform which uses a deterministic hash of the key to choose one of
>>>>> 4 partitions. Write each partition with a single shard.
>>>>>
>>>>> (Fixed width diagram below)
>>>>>
>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>
>>>>> Becomes:
>>>>>
>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>                        |-> AvroIO(numShards = 1)
>>>>>                        |-> AvroIO(numShards = 1)
>>>>>                        \-> AvroIO(numShards = 1)
>>>>>
>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jof...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>
>>>>>> I would like to partition the stream by some key in the element, so
>>>>>> that all elements with the same key will get processed by the same
>>>>>> shard writer, and therefore written to the same file. Is there a way
>>>>>> to do this? Note that in my stream the number of keys is very large
>>>>>> (most elements have a unique key, while a few elements share a key).
>>>>>>
>>>>>> Thanks,
>>>>>> Josh
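P.S. Here is the promised rough sketch of the diagram above, assuming the Beam Java SDK. MyDoFn, the String element type, and the hashCode-based shard assignment are hypothetical placeholders for your own code, not anything prescribed by Beam; any deterministic key function works. (fixed width font code below)

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ShardedDoFnSketch {

  static final int NUM_SHARDS = 4;

  // Hypothetical stand-in for "your dofn"; each element it sees is one
  // key+iterable pair, i.e. one shard's worth of values.
  static class MyDoFn extends DoFn<KV<Integer, Iterable<String>>, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      for (String value : c.element().getValue()) {
        c.output(value); // per-shard work goes here
      }
    }
  }

  // The input must already be windowed (e.g. hourly) if the stream is
  // unbounded, since GroupByKey requires that there.
  static void buildShardedBranches(PCollection<String> data) {
    // Assign every element to one of NUM_SHARDS fixed keys.
    PCollection<KV<Integer, String>> keyed = data.apply("ApplyShardKey",
        WithKeys.of((String e) -> Math.floorMod(e.hashCode(), NUM_SHARDS))
            .withKeyType(TypeDescriptors.integers()));

    // The smallest unit of output for GroupByKey is a single key+iterable
    // pair, so each of the NUM_SHARDS keys becomes one unit of work.
    PCollection<KV<Integer, Iterable<String>>> grouped =
        keyed.apply(GroupByKey.create());

    // Route each key's group to its own branch, one DoFn per branch.
    PCollectionList<KV<Integer, Iterable<String>>> partitions = grouped.apply(
        Partition.of(NUM_SHARDS,
            (KV<Integer, Iterable<String>> elem, int n) -> elem.getKey()));

    for (int i = 0; i < NUM_SHARDS; i++) {
      partitions.get(i).apply("YourDoFn-" + i, ParDo.of(new MyDoFn()));
    }
  }
}

As noted above, keep in mind that fewer keys means less parallelism, and the Runner still decides where each key's group actually runs.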