I'm not too familiar with the exact underlying issue here, but writing unbounded input to files when using GlobalWindows for unsharded output is a valid use case, so this sounds like a bug. Feel free to create a JIRA.
- Cham

On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jklu...@mozilla.com> wrote:

> I've read more deeply into the WriteFiles code and I'm understanding now
> that the exception is due to WriteFiles' attempt to handle unsharded input.
> In that case, it creates both a sharded and an unsharded collection; the
> first goes through one GroupByKey while the other goes through two. These
> two collections are then flattened together, and they have incompatible
> triggers because the double-grouped collection uses a continuation trigger.
>
> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
> switch to hard-coding an integer rather than passing a ValueProvider,
> WriteFiles uses a different code path that doesn't flatten collections, and
> no exception is thrown.
>
> So, this might really be considered a bug in WriteFiles (and thus FileIO).
> But I'd love to hear other interpretations.
>
> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>
>> I'm building a pipeline that streams from Pubsub and writes to files. I'm
>> using FileIO's dynamic destinations to place elements into different
>> directories according to date, and I don't care about the ordering of
>> elements beyond the date buckets.
>>
>> So I think GlobalWindows is appropriate in this case, even though the
>> input is unbounded. Is it possible to use GlobalWindows but set a trigger
>> based on number of elements and/or processing time so that Beam actually
>> writes out files periodically?
>>
>> I tried the following:
>>
>> Window.into(new GlobalWindows())
>>     .triggering(Repeatedly.forever(AfterFirst.of(
>>         AfterPane.elementCountAtLeast(10000),
>>         AfterProcessingTime.pastFirstElementInPane()
>>             .plusDelayOf(Duration.standardMinutes(10)))))
>>     .discardingFiredPanes()
>>
>> But it raises an exception about incompatible triggers:
>>
>> Inputs to Flatten had incompatible triggers:
>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>
>> I believe what's happening is that FileIO with an explicit numShards
>> (required in the case of unbounded input) forces a GroupByKey, which
>> introduces continuation triggers that are incompatible with my stated
>> triggers. It's the internals of WriteFiles that try to flatten the
>> incompatible PCollections together.
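
For anyone else hitting this, here is a minimal sketch of the workaround Jeff
describes: passing a literal int to withNumShards (rather than a
ValueProvider<Integer>) keeps WriteFiles on the code path that skips the
problematic Flatten. The topic, bucket path, shard count, and the
dateBucketOf helper below are all placeholders I made up for illustration,
not details from the thread:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class PubsubToDatedFiles {

  // Placeholder destination function: a real pipeline would derive the date
  // bucket from a timestamp on the element, not from processing time.
  private static String dateBucketOf(String line) {
    return Instant.now().toString().substring(0, 10); // e.g. "2019-01-09"
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"))
        // GlobalWindows with the repeated element-count/processing-time
        // trigger from the original message.
        .apply(Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterFirst.of(
                AfterPane.elementCountAtLeast(10000),
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(10)))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(FileIO.<String, String>writeDynamic()
            .by(PubsubToDatedFiles::dateBucketOf)
            .withDestinationCoder(StringUtf8Coder.of())
            .via(TextIO.sink())
            .to("gs://my-bucket/output")
            .withNaming(date -> FileIO.Write.defaultNaming(date + "/part", ".txt"))
            // A literal int here avoids the ValueProvider<Integer> code path
            // that triggered the "incompatible triggers" Flatten error.
            .withNumShards(10));

    p.run();
  }
}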