Indeed, I was wrong about the ValueProvider distinction; I've updated that in the JIRA. The real distinction is when numShards is 0 (runner-determined sharding) vs. an explicit number. Things work fine with explicit sharding; it's the runner-determined sharding mode that encounters the Flatten of PCollections with conflicting triggers.
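For illustration, here is a minimal sketch of the two sharding modes in question, assuming Beam's Java FileIO API; the `lines` collection and the destination path are placeholders, and the comments summarize the behavior described in this thread:

    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.values.PCollection;

    class ShardingModes {
      // 'lines' stands in for an unbounded, windowed PCollection<String>.
      static void write(PCollection<String> lines) {
        // Explicit sharding: the shard count is fixed at graph construction,
        // so WriteFiles takes the single fixed-sharding branch. This mode
        // works in streaming pipelines.
        lines.apply("ExplicitShards", FileIO.<String>write()
            .via(TextIO.sink())
            .to("gs://my-bucket/output")  // placeholder destination
            .withNumShards(10));

        // Runner-determined sharding (numShards == 0): WriteFiles instead
        // builds a sharded and an unsharded branch and flattens them; the
        // branch that passes through a second GroupByKey carries a
        // continuation trigger that conflicts at the Flatten.
        lines.apply("RunnerShards", FileIO.<String>write()
            .via(TextIO.sink())
            .to("gs://my-bucket/output")
            .withNumShards(0));
      }
    }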
On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax <re...@google.com> wrote:

> FileIO requires an explicit numShards in unbounded mode for a number of
> reasons, one being that a trigger has to happen on a GroupByKey, and we
> need something to group on.
>
> It is extremely surprising that behavior would change between using a
> ValueProvider or not. The exact same code path should be triggered
> regardless of whether a ValueProvider is used.
>
> Reuven
>
> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Definitely sounds like a bug, but I also want to caution you (or anyone
>> reading this archive) that there are known problems with continuation
>> triggers. A spec on continuation triggers that we missed is that they
>> really must be "compatible" (an arbitrary concept, having only to do
>> with flattening two PCollections together) with their original trigger.
>> Without this, we also know that you can have three PCollections with
>> identical triggering and CoGroupByKey them together, but you cannot do
>> this three-way join as a sequence of binary joins.
>>
>> Kenn
>>
>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>>
>>> Thanks for the response, Chamikara. I filed
>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>>> around the problem in my case by not using a ValueProvider for numShards.
>>>
>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>
>>>> I'm not too familiar with the exact underlying issue here, but writing
>>>> unbounded input to files using GlobalWindows for unsharded output is a
>>>> valid use case, so this sounds like a bug. Feel free to create a JIRA.
>>>>
>>>> - Cham
>>>>
>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>>>>
>>>>> I've read more deeply into the WriteFiles code, and I now understand
>>>>> that the exception is due to WriteFiles' attempt to handle unsharded
>>>>> input. In that case, it creates a sharded and an unsharded collection;
>>>>> the first goes through one GroupByKey while the other goes through two.
>>>>> These two collections are then flattened together, and they have
>>>>> incompatible triggers because the doubly grouped collection uses a
>>>>> continuation trigger.
>>>>>
>>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>>>> switch to hard-coding an integer rather than passing a ValueProvider,
>>>>> WriteFiles uses a different code path that doesn't flatten collections,
>>>>> and no exception is thrown.
>>>>>
>>>>> So this might really be considered a bug in WriteFiles (and thus
>>>>> FileIO). But I'd love to hear other interpretations.
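For reference, the two call forms from the message above, sketched against FileIO's Java API (the destination path is a placeholder, and StaticValueProvider merely stands in for whatever ValueProvider the pipeline used). Note the correction at the top of this thread: the operative difference turned out to be an explicit nonzero count vs. runner-determined sharding, not the ValueProvider wrapper itself.

    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

    class NumShardsForms {
      static void configure() {
        FileIO.Write<Void, String> base = FileIO.<String>write()
            .via(TextIO.sink())
            .to("gs://my-bucket/output");  // placeholder destination

        // Hard-coded integer: no exception; WriteFiles stays on the code
        // path that does not flatten differently triggered collections.
        base.withNumShards(10);

        // ValueProvider overload: the form that hit the error in this
        // pipeline. Per the top of the thread, the mismatch actually
        // appears whenever the effective count is 0 (runner-determined).
        base.withNumShards(StaticValueProvider.of(10));
      }
    }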
>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>>>>>
>>>>>> I'm building a pipeline that streams from Pubsub and writes to files.
>>>>>> I'm using FileIO's dynamic destinations to place elements into
>>>>>> different directories according to date, and I really don't care about
>>>>>> the ordering of elements beyond the date buckets.
>>>>>>
>>>>>> So I think GlobalWindows is appropriate in this case, even though the
>>>>>> input is unbounded. Is it possible to use GlobalWindows but set a
>>>>>> trigger based on number of elements and/or processing time so that
>>>>>> Beam actually writes out files periodically?
>>>>>>
>>>>>> I tried the following:
>>>>>>
>>>>>> Window.into(new GlobalWindows())
>>>>>>     .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>         AfterPane.elementCountAtLeast(10000),
>>>>>>         AfterProcessingTime.pastFirstElementInPane()
>>>>>>             .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>     .discardingFiredPanes()
>>>>>>
>>>>>> But it raises an exception about incompatible triggers:
>>>>>>
>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>
>>>>>> I believe what's happening is that FileIO with explicit numShards
>>>>>> (required in the case of unbounded input) is forcing a GroupByKey,
>>>>>> which activates continuation triggers that are incompatible with my
>>>>>> stated triggers. It's the internals of WriteFiles that are trying to
>>>>>> flatten the incompatible PCollections together.
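Putting the thread's findings together, here is a minimal end-to-end sketch of a configuration that avoids the incompatible-trigger error, assuming the Beam Java SDK; the topic, destination, and shard count are placeholders, and the dynamic date-based destinations from the original question are omitted for brevity:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.windowing.AfterFirst;
    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    public class PubsubToFiles {
      public static void main(String[] args) {
        Pipeline pipeline =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        pipeline
            .apply("ReadPubsub", PubsubIO.readStrings()
                .fromTopic("projects/my-project/topics/my-topic"))
            .apply("TriggeredGlobalWindow",
                Window.<String>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterFirst.of(
                        AfterPane.elementCountAtLeast(10000),
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(10)))))
                    .discardingFiredPanes()
                    // A non-default trigger requires allowed lateness to be
                    // set, even in the global window.
                    .withAllowedLateness(Duration.ZERO))
            .apply("WriteFiles", FileIO.<String>write()
                .via(TextIO.sink())
                .to("gs://my-bucket/output")
                // An explicit, nonzero shard count keeps WriteFiles off the
                // runner-determined-sharding path behind the Flatten error.
                .withNumShards(10));

        pipeline.run();
      }
    }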