I've read more deeply into the WriteFiles code, and I'm understanding now that the exception is due to WriteFiles' attempt to handle unsharded input. In that case, it creates both a sharded and an unsharded collection; the first goes through one GroupByKey while the other goes through two. These two collections are then flattened together, and they have incompatible triggers because the twice-grouped collection uses a continuation trigger.
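To make the mismatch concrete, here's a small sketch (my own illustration, not from WriteFiles itself) using Trigger#getContinuationTrigger, which is how Beam derives the trigger that applies downstream of a GroupByKey:

    import org.apache.beam.sdk.transforms.windowing.*;
    import org.joda.time.Duration;

    Trigger userTrigger =
        Repeatedly.forever(AfterFirst.of(
            AfterPane.elementCountAtLeast(10000),
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10))));

    // Downstream of a GroupByKey, Beam swaps a trigger for its
    // continuation: elementCountAtLeast(10000) becomes
    // elementCountAtLeast(1), and AfterProcessingTime becomes
    // AfterSynchronizedProcessingTime. The branch grouped twice
    // therefore carries a different trigger than the branch grouped
    // once, and the Flatten inside WriteFiles rejects the mismatch.
    Trigger continuation = userTrigger.getContinuationTrigger();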
I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I switch to hard-coding an integer rather than passing a ValueProvider, WriteFiles uses a different code path that doesn't flatten collections, and no exception is thrown. So this might really be considered a bug in WriteFiles (and thus FileIO), but I'd love to hear other interpretations.

On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jklu...@mozilla.com> wrote:

> I'm building a pipeline that streams from Pubsub and writes to files. I'm
> using FileIO's dynamic destinations to place elements into different
> directories according to date, and I really don't care about the ordering
> of elements beyond the date buckets.
>
> So I think GlobalWindows is appropriate in this case, even though the
> input is unbounded. Is it possible to use GlobalWindows but set a trigger
> based on element count and/or processing time so that Beam actually
> writes out files periodically?
>
> I tried the following:
>
>     Window.into(new GlobalWindows())
>         .triggering(Repeatedly.forever(AfterFirst.of(
>             AfterPane.elementCountAtLeast(10000),
>             AfterProcessingTime.pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardMinutes(10)))))
>         .discardingFiredPanes()
>
> But it raises an exception about incompatible triggers:
>
>     Inputs to Flatten had incompatible triggers:
>     Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>     Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>     AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>
> I believe that what's happening is that FileIO with explicit numShards
> (required in the case of unbounded input) is forcing a GroupByKey, which
> activates continuation triggers that are incompatible with my stated
> triggers. It's the internals of WriteFiles that are trying to flatten the
> incompatible PCollections together.
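For anyone who hits the same exception, here's a minimal sketch of the two call patterns (a contrived example; the option names and output path are hypothetical, not my actual pipeline):

    // Fails at pipeline construction with "Inputs to Flatten had
    // incompatible triggers" when the shard count is a ValueProvider:
    input.apply(FileIO.<String>write()
        .via(TextIO.sink())
        .to(options.getOutputDirectory())        // hypothetical option
        .withNumShards(options.getNumShards())); // ValueProvider<Integer>

    // No exception: a literal int sends WriteFiles down the code path
    // that avoids flattening the differently-triggered collections.
    input.apply(FileIO.<String>write()
        .via(TextIO.sink())
        .to("gs://my-bucket/output")             // hypothetical path
        .withNumShards(10));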