Actually, this is a documented known issue. https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L152
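
For anyone finding this in the archive, here is a minimal sketch of the
setup that works per the discussion below: GlobalWindows with a composite
trigger, plus an explicit (nonzero, hard-coded) shard count so that
WriteFiles avoids the Flatten of differently-triggered collections. The
subscription, output path, and shard count are illustrative assumptions,
and the usual Beam Java imports and Pipeline setup are assumed.

    PCollection<String> messages = pipeline
        .apply(PubsubIO.readStrings().fromSubscription(subscription));

    messages
        .apply(Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterFirst.of(
                AfterPane.elementCountAtLeast(10000),
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(10)))))
            .discardingFiredPanes())
        .apply(FileIO.<String>write()
            .via(TextIO.sink())
            .to(outputPath)        // illustrative destination
            .withNumShards(10));   // explicit sharding, not runner-determined
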
On Fri, Jan 11, 2019 at 9:23 AM Jeff Klukas <jklu...@mozilla.com> wrote:

> Indeed, I was wrong about the ValueProvider distinction. I updated that
> in the JIRA.
>
> It's when numShards is 0 (so runner-provided sharding) vs. an explicit
> number. Things work fine for explicit sharding. It's the runner-provided
> sharding mode that encounters the Flatten of PCollections with
> conflicting triggers.
>
> On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax <re...@google.com> wrote:
>
>> FileIO requires an explicit numShards in unbounded mode for a number of
>> reasons - one being that a trigger has to happen on a GroupByKey, and
>> we need something to group on.
>>
>> It is extremely surprising that behavior would change between using a
>> ValueProvider or not. The exact same code path should be triggered
>> regardless of whether a ValueProvider is used.
>>
>> Reuven
>>
>> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Definitely sounds like a bug, but I also want to caution you (or
>>> anyone reading this archive) that there are known problems with
>>> continuation triggers. A spec on continuation triggers that we missed
>>> is that they really must be "compatible" (this is an arbitrary
>>> concept, having only to do with Flattening two PCollections together)
>>> with their original trigger. Without this, we also know that you can
>>> have three PCollections with identical triggering and you can
>>> CoGroupByKey them together, but you cannot do this three-way join as
>>> a sequence of binary joins.
>>>
>>> Kenn
>>>
>>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>>>
>>>> Thanks for the response, Chamikara. I filed
>>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>>>> around the problem in my case by not using a ValueProvider for
>>>> numShards.
>>>>
>>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath
>>>> <chamik...@google.com> wrote:
>>>>
>>>>> I'm not too familiar with the exact underlying issue here, but
>>>>> writing unbounded input to files when using GlobalWindows for
>>>>> unsharded output is a valid use case, so this sounds like a bug.
>>>>> Feel free to create a JIRA.
>>>>>
>>>>> - Cham
>>>>>
>>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jklu...@mozilla.com>
>>>>> wrote:
>>>>>
>>>>>> I've read more deeply into the WriteFiles code and I'm
>>>>>> understanding now that the exception is due to WriteFiles' attempt
>>>>>> to handle unsharded input. In that case, it creates a sharded and
>>>>>> an unsharded collection; the first goes through one GroupByKey
>>>>>> while the other goes through two. These two collections are then
>>>>>> flattened together, and they have incompatible triggers because
>>>>>> the double-grouped collection uses a continuation trigger.
>>>>>>
>>>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if
>>>>>> I switch to hard-coding an integer rather than passing a
>>>>>> ValueProvider, WriteFiles uses a different code path that doesn't
>>>>>> flatten collections, and no exception is thrown.
>>>>>>
>>>>>> So this might really be considered a bug in WriteFiles (and thus
>>>>>> FileIO). But I'd love to hear other interpretations.
>>>>>>
>>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jklu...@mozilla.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm building a pipeline that streams from Pubsub and writes to
>>>>>>> files.
>>>>>>> I'm using FileIO's dynamic destinations to place elements into
>>>>>>> different directories according to date, and I really don't care
>>>>>>> about ordering of elements beyond the date buckets.
>>>>>>>
>>>>>>> So I think GlobalWindows is appropriate in this case, even though
>>>>>>> the input is unbounded. Is it possible to use GlobalWindows but
>>>>>>> set a trigger based on number of elements and/or processing time
>>>>>>> so that Beam actually writes out files periodically?
>>>>>>>
>>>>>>> I tried the following:
>>>>>>>
>>>>>>> Window.into(new GlobalWindows())
>>>>>>>     .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>>         AfterPane.elementCountAtLeast(10000),
>>>>>>>         AfterProcessingTime.pastFirstElementInPane()
>>>>>>>             .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>>     .discardingFiredPanes()
>>>>>>>
>>>>>>> But it raises an exception about incompatible triggers:
>>>>>>>
>>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>>
>>>>>>> I believe what's happening is that FileIO with explicit numShards
>>>>>>> (required in the case of unbounded input) is forcing a GroupByKey,
>>>>>>> which activates continuation triggers that are incompatible with
>>>>>>> my stated triggers. It's the internals of WriteFiles that are
>>>>>>> trying to flatten the incompatible PCollections together.
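
For reference, here is the distinction the thread converges on, sketched
against FileIO's API. This is illustrative rather than authoritative: the
sink and outputPath are assumptions, and treating numShards 0 as
runner-determined sharding is taken from Jeff's description above.

    // Explicit sharding: per the thread, WriteFiles groups on the fixed
    // shard count and takes a code path with no Flatten, so the custom
    // trigger survives. This variant worked.
    FileIO.<String>write().via(TextIO.sink()).to(outputPath).withNumShards(10);

    // Runner-determined sharding (numShards 0): per the thread, WriteFiles
    // creates both a sharded and an unsharded collection and flattens them;
    // the twice-grouped branch carries a continuation trigger incompatible
    // with the original, yielding the exception above.
    FileIO.<String>write().via(TextIO.sink()).to(outputPath).withNumShards(0);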