Definitely sounds like a bug, but I also want to caution you (or anyone reading this in the archives) that there are known problems with continuation triggers. A piece of the continuation-trigger spec that we missed is that a continuation trigger really must be "compatible" with its original trigger ("compatible" is a somewhat arbitrary concept that only matters when Flattening two PCollections together). Because of this gap, we also know that you can take three PCollections with identical triggering and CoGroupByKey them together, but you cannot express that three-way join as a sequence of binary joins.

Kenn
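To make the incompatibility Kenn describes concrete, here is a minimal construction-time reproduction. This is a sketch with invented element types and trigger values, not code from this thread: one branch keeps the user-specified trigger, a copy goes through a GroupByKey (which replaces that trigger with its continuation trigger), and Flattening the two branches back together fails with the same "Inputs to Flatten had incompatible triggers" error.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.joda.time.Duration;

public class FlattenTriggerRepro {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Branch 1: carries the user-specified trigger.
    PCollection<KV<String, Long>> triggered = p
        .apply(Create.of(KV.of("k", 1L)))
        .apply(Window.<KV<String, Long>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10)))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO));

    // Branch 2: one GroupByKey replaces the trigger with its continuation
    // trigger (elementCountAtLeast(10) becomes elementCountAtLeast(1)).
    PCollection<KV<String, Long>> regrouped = triggered
        .apply(GroupByKey.<String, Long>create())
        .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // Ungroup back to the original element type.
            for (Long v : c.element().getValue()) {
              c.output(KV.of(c.element().getKey(), v));
            }
          }
        }));

    // Fails at pipeline-construction time: the two branches now carry
    // different (incompatible) triggers and cannot be flattened together.
    PCollectionList.of(triggered).and(regrouped)
        .apply(Flatten.pCollections());

    p.run();  // never reached
  }
}

This is essentially what happens inside WriteFiles in the thread below, except that its two branches go through one and two GroupByKeys respectively, so both carry (different) continuation triggers.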
On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jklu...@mozilla.com> wrote:

> Thanks for the response, Chamikara. I filed
> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
> around the problem in my case by not using a ValueProvider for numShards.
>
> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> I'm not too familiar with the exact underlying issue here, but writing
>> unbounded input to files when using GlobalWindows for unsharded output
>> is a valid use case, so this sounds like a bug. Feel free to create a
>> JIRA.
>>
>> - Cham
>>
>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>>
>>> I've read more deeply into the WriteFiles code and I now understand
>>> that the exception is due to WriteFiles' attempt to handle unsharded
>>> input. In that case, it creates both a sharded and an unsharded
>>> collection; the first goes through one GroupByKey while the other goes
>>> through two. These two collections are then flattened together, and
>>> they have incompatible triggers because the double-grouped collection
>>> uses a continuation trigger.
>>>
>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>> switch to hard-coding an integer rather than passing a ValueProvider,
>>> WriteFiles uses a different code path that doesn't flatten collections,
>>> and no exception is thrown.
>>>
>>> So this might really be considered a bug in WriteFiles (and thus
>>> FileIO). But I'd love to hear other interpretations.
>>>
>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>>>
>>>> I'm building a pipeline that streams from Pubsub and writes to files.
>>>> I'm using FileIO's dynamic destinations to place elements into
>>>> different directories according to date, and I really don't care about
>>>> ordering of elements beyond the date buckets.
>>>>
>>>> So I think GlobalWindows is appropriate in this case, even though the
>>>> input is unbounded. Is it possible to use GlobalWindows but set a
>>>> trigger based on number of elements and/or processing time, so that
>>>> Beam actually writes out files periodically?
>>>>
>>>> I tried the following:
>>>>
>>>> Window.into(new GlobalWindows())
>>>>     .triggering(Repeatedly.forever(AfterFirst.of(
>>>>         AfterPane.elementCountAtLeast(10000),
>>>>         AfterProcessingTime.pastFirstElementInPane()
>>>>             .plusDelayOf(Duration.standardMinutes(10)))))
>>>>     .discardingFiredPanes()
>>>>
>>>> But it raises an exception about incompatible triggers:
>>>>
>>>> Inputs to Flatten had incompatible triggers:
>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>
>>>> I believe that what's happening is that FileIO with explicit numShards
>>>> (required in the case of unbounded input) is forcing a GroupByKey,
>>>> which activates continuation triggers that are incompatible with my
>>>> stated triggers. It's the internals of WriteFiles that try to flatten
>>>> the incompatible PCollections together.
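For completeness, here is a sketch of the workaround Jeff describes: hard-coding the shard count via FileIO.Write.withNumShards(int) instead of passing a ValueProvider<Integer>. The subscription, bucket path, and shard count are invented placeholders; the windowing is the same as in the original post.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class GlobalWindowFileWrites {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(PubsubIO.readStrings()
            .fromSubscription("projects/my-project/subscriptions/my-sub"))
        // Global windows with element-count / processing-time triggering,
        // as in the original post.
        .apply(Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterFirst.of(
                AfterPane.elementCountAtLeast(10000),
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(10)))))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        .apply(FileIO.<String>write()
            .via(TextIO.sink())
            .to("gs://my-bucket/output/")
            // A literal int rather than a ValueProvider<Integer>: this
            // sends WriteFiles down the fixed-sharding code path that
            // never flattens branches with mismatched triggers.
            .withNumShards(10));

    p.run();
  }
}

The trade-off is that the shard count can no longer be supplied at template execution time, which is exactly the flexibility a ValueProvider would have provided; BEAM-6399 tracks the underlying WriteFiles issue.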