I've read more deeply into the WriteFiles code, and I now understand that
the exception stems from WriteFiles' handling of unsharded input. In that
case, it creates both a sharded and an unsharded collection; the first goes
through one GroupByKey while the second goes through two. The two
collections are then flattened together, and their triggers are
incompatible because the doubly-grouped collection carries a continuation
trigger.
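
For illustration, here's a minimal sketch (not the actual WriteFiles code)
of how such an incompatibility can arise, assuming an unbounded
PCollection<String> named "input"; the single-key grouping is just a
stand-in for the sharding that WriteFiles performs:

import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

// A collection carrying my custom trigger.
PCollection<String> direct = input.apply(
    Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterFirst.of(
            AfterPane.elementCountAtLeast(10000),
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10)))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

// GroupByKey rewrites the trigger of its output into the "continuation"
// form, e.g. AfterProcessingTime becomes AfterSynchronizedProcessingTime.
PCollection<String> grouped = direct
    .apply(WithKeys.<String, String>of("shard")
        .withKeyType(TypeDescriptors.strings()))
    .apply(GroupByKey.create())
    .apply(Values.create())
    .apply(Flatten.iterables());

// Mirrors the Flatten inside WriteFiles; fails at pipeline construction
// with "Inputs to Flatten had incompatible triggers".
PCollectionList.of(direct).and(grouped).apply(Flatten.pCollections());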

I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
instead hard-code an integer rather than passing a ValueProvider,
WriteFiles uses a different code path that doesn't flatten collections, and
no exception is thrown.
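
For concreteness, the two call sites look like this. This is a sketch: the
sink, the output path, and the options interface with its
getNumShards() returning ValueProvider<Integer> are illustrative
stand-ins, not my actual code:

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;

// ValueProvider variant: WriteFiles takes the sharded/unsharded path
// described above, and construction fails with the Flatten exception.
input.apply(FileIO.<String>write()
    .via(TextIO.sink())
    .to("gs://my-bucket/output/")  // illustrative path
    .withNumShards(options.getNumShards()));  // ValueProvider<Integer>

// Hard-coded int variant: WriteFiles uses a different code path with no
// Flatten of differently-triggered collections, and no exception.
input.apply(FileIO.<String>write()
    .via(TextIO.sink())
    .to("gs://my-bucket/output/")  // illustrative path
    .withNumShards(10));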

So, this might really be considered a bug in WriteFiles (and thus in
FileIO). But I'd love to hear other interpretations.

On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jklu...@mozilla.com> wrote:

> I'm building a pipeline that streams from Pubsub and writes to files. I'm
> using FileIO's dynamic destinations to place elements into different
> directories according to date, and I really don't care about the ordering
> of elements beyond the date buckets.
>
> So, I think GlobalWindows is appropriate in this case, even though the
> input is unbounded. Is it possible to use GlobalWindows but set a trigger
> based on number of elements and/or processing time so that Beam actually
> writes out files periodically?
>
> I tried the following:
>
> Window.into(new GlobalWindows())
>     .triggering(Repeatedly.forever(AfterFirst.of(
>         AfterPane.elementCountAtLeast(10000),
>         AfterProcessingTime.pastFirstElementInPane()
>             .plusDelayOf(Duration.standardMinutes(10)))))
>     .discardingFiredPanes()
>
> But it raises an exception about incompatible triggers:
>
> Inputs to Flatten had incompatible triggers:
> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>     AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>
> I believe what's happening is that FileIO with an explicit numShards
> (required in the case of unbounded input) forces a GroupByKey, which
> introduces continuation triggers that are incompatible with the triggers
> I specified. It's the internals of WriteFiles that try to flatten the
> incompatible PCollections together.