I'm not too familiar with the exact underlying issue here, but writing
unbounded input to files when using GlobalWindows for unsharded output is a
valid use case, so this sounds like a bug. Feel free to create a JIRA.

- Cham

On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jklu...@mozilla.com> wrote:

> I've read more deeply into the WriteFiles code, and I'm understanding now
> that the exception is due to WriteFiles' attempt to handle unsharded input.
> In that case, it creates a sharded and an unsharded collection; the first
> goes through one GroupByKey while the other goes through two. These two
> collections are then flattened together, and they have incompatible
> triggers because the double-grouped collection uses a continuation trigger.
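>
> To convince myself, I put together a minimal sketch (the class and names
> below are mine, not lifted from WriteFiles) that flattens a branch that
> went through one GroupByKey with a branch that went through none; it
> fails at construction time with the same "incompatible triggers" error:
>
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.transforms.Flatten;
> import org.apache.beam.sdk.transforms.GroupByKey;
> import org.apache.beam.sdk.transforms.Values;
> import org.apache.beam.sdk.transforms.WithKeys;
> import org.apache.beam.sdk.transforms.windowing.*;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.PCollectionList;
>
> public class FlattenTriggerRepro {
>   public static void main(String[] args) {
>     Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
>
>     PCollection<KV<Integer, String>> input =
>         p.apply(Create.of(KV.of(1, "a"), KV.of(1, "b")))
>             .apply(Window.<KV<Integer, String>>into(new GlobalWindows())
>                 .triggering(Repeatedly.forever(
>                     AfterPane.elementCountAtLeast(10)))
>                 .discardingFiredPanes());
>
>     // Branch 1: untouched, so it keeps the trigger specified above.
>     PCollection<KV<Integer, String>> direct = input;
>
>     // Branch 2: one GroupByKey, so its output carries the continuation
>     // trigger (elementCountAtLeast(1)) instead of the original.
>     PCollection<KV<Integer, String>> grouped =
>         input.apply(GroupByKey.create())
>             .apply(Values.create())
>             .apply(Flatten.iterables())
>             .apply(WithKeys.<Integer, String>of(1));
>
>     // Throws "Inputs to Flatten had incompatible triggers ..." during
>     // pipeline construction, before the pipeline is ever run.
>     PCollectionList.of(direct).and(grouped).apply(Flatten.pCollections());
>   }
> }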
>
> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
> switch to hard-coding an integer rather than passing a ValueProvider,
> WriteFiles uses a different code path that doesn't flatten collections,
> and no exception is thrown.
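>
> For reference, the two overloads look like this (a sketch only: "write"
> stands in for my actual FileIO.Write configuration, and getNumShards()
> for a pipeline option of mine):
>
> // This overload routes through the Flatten-based code path and throws:
> write.withNumShards(options.getNumShards());  // ValueProvider<Integer>
>
> // This overload avoids the Flatten and works:
> write.withNumShards(10);  // plain int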
>
> So, this might really be considered a bug in WriteFiles (and thus FileIO).
> But I'd love to hear other interpretations.
>
> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>
>> I'm building a pipeline that streams from Pubsub and writes to files. I'm
>> using FileIO's dynamic destinations to place elements into different
>> directories according to date, and I really don't care about the ordering
>> of elements beyond the date buckets.
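>>
>> Concretely, the write looks roughly like this (a sketch: MyEvent,
>> dateOf(), toJson(), numShards, and the output path are placeholders,
>> not my real code):
>>
>> FileIO.<String, MyEvent>writeDynamic()
>>     .by(event -> dateOf(event))  // e.g. "2019-01-09"
>>     .withDestinationCoder(StringUtf8Coder.of())
>>     .via(Contextful.fn(event -> toJson(event)), TextIO.sink())
>>     .to("gs://my-bucket/output")
>>     .withNaming(date ->
>>         FileIO.Write.defaultNaming(date + "/part", ".json"))
>>     .withNumShards(numShards)  // a ValueProvider<Integer> in my case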
>>
>> So, I think GlobalWindows is appropriate in this case, even though the
>> input is unbounded. Is it possible to use GlobalWindows but set a trigger
>> based on number of elements and/or processing time so that Beam actually
>> writes out files periodically?
>>
>> I tried the following:
>>
>> Window.into(new GlobalWindows())
>>     .triggering(Repeatedly.forever(AfterFirst.of(
>>         AfterPane.elementCountAtLeast(10000),
>>         AfterProcessingTime.pastFirstElementInPane()
>>             .plusDelayOf(Duration.standardMinutes(10)))))
>>     .discardingFiredPanes()
>>
>> But it raises an exception about incompatible triggers:
>>
>> Inputs to Flatten had incompatible triggers:
>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>
>> I believe what's happening is that FileIO with explicit numShards
>> (required in the case of unbounded input) is forcing a GroupByKey, which
>> activates continuation triggers that are incompatible with my stated
>> triggers. It's the internals of WriteFiles that are trying to flatten the
>> incompatible PCollections together.
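>>
>> You can see the continuation trigger a GroupByKey would install by
>> asking the trigger directly (a sketch using the public
>> Trigger.getContinuationTrigger() method; the printed form is
>> approximate):
>>
>> Trigger t = Repeatedly.forever(AfterFirst.of(
>>     AfterPane.elementCountAtLeast(10000),
>>     AfterProcessingTime.pastFirstElementInPane()
>>         .plusDelayOf(Duration.standardMinutes(10))));
>>
>> System.out.println(t.getContinuationTrigger());
>> // Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>> //   AfterSynchronizedProcessingTime.pastFirstElementInPane()))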
>>