Indeed, I was wrong about the ValueProvider distinction. I updated that in
the JIRA.

It's when numShards is 0 (i.e. runner-determined sharding) vs. an explicit
number. Things work fine with explicit sharding; it's the runner-determined
sharding mode that hits the Flatten of PCollections with conflicting
triggers.
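
For illustration, a minimal sketch of the two modes (the sink and
destination below are placeholders, not from the original pipeline):

  // Explicit sharding: works, even with a custom trigger on GlobalWindows.
  FileIO.<String>write()
      .via(TextIO.sink())
      .to("gs://my-bucket/output/")  // placeholder destination
      .withNumShards(10);

  // Runner-determined sharding (numShards == 0): WriteFiles flattens a
  // sharded branch and an unsharded branch, which is where the
  // incompatible-trigger exception surfaces.
  FileIO.<String>write()
      .via(TextIO.sink())
      .to("gs://my-bucket/output/")  // placeholder destination
      .withNumShards(0);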

On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax <re...@google.com> wrote:

> FileIO requires an explicit numShards in unbounded mode for a number of
> reasons - one being that a trigger has to happen on a GroupByKey, and we
> need something to group on.
>
> It is extremely surprising that behavior would change depending on whether
> a ValueProvider is used or not. The exact same code path should be taken
> either way.
>
> Reuven
>
> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Definitely sounds like a bug, but I also want to caution you (or anyone
>> reading this in the archive) that there are known problems with
>> continuation triggers. A spec on continuation triggers that we missed is
>> that they really must be "compatible" with their original trigger
>> ("compatible" being a somewhat arbitrary concept that only matters when
>> Flattening two PCollections together). Without this, you can have three
>> PCollections with identical triggering that you can CoGroupByKey together,
>> yet you cannot express that three-way join as a sequence of binary joins.
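>>
>> To make that last point concrete, a minimal sketch (the tags, key type,
>> and input PCollections are illustrative):
>>
>>   // Works: one three-way CoGroupByKey over identically triggered inputs.
>>   KeyedPCollectionTuple.of(tagA, a).and(tagB, b).and(tagC, c)
>>       .apply(CoGroupByKey.create());
>>
>>   // Fails: the result of joining a and b carries a continuation trigger,
>>   // so the second CoGroupByKey ends up Flattening PCollections with
>>   // incompatible triggers when it combines that result with c.
>>   PCollection<KV<String, CoGbkResult>> ab =
>>       KeyedPCollectionTuple.of(tagA, a).and(tagB, b)
>>           .apply(CoGroupByKey.create());
>>   // ...re-key ab, then CoGroupByKey the result with c ->
>>   // "Inputs to Flatten had incompatible triggers".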
>>
>> Kenn
>>
>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>>
>>> Thanks for the response, Chamikara. I filed
>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>>> around the problem in my case by not using a ValueProvider for numShards.
>>>
>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>
>>>> I'm not too familiar with the exact underlying issue here, but writing
>>>> unbounded input to files using GlobalWindows for unsharded output is a
>>>> valid use case, so this sounds like a bug. Feel free to create a JIRA.
>>>>
>>>> - Cham
>>>>
>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jklu...@mozilla.com>
>>>> wrote:
>>>>
>>>>> I've read more deeply into the WriteFiles code, and I now understand
>>>>> that the exception is due to WriteFiles' attempt to handle unsharded
>>>>> input. In that case, it creates a sharded and an unsharded collection;
>>>>> the first goes through one GroupByKey while the other goes through two.
>>>>> These two collections are then flattened together, and they have
>>>>> incompatible triggers because the double-grouped collection uses a
>>>>> continuation trigger.
>>>>>
>>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>>>> switch to hard-coding an integer rather than passing a ValueProvider,
>>>>> WriteFiles uses a different code path that doesn't flatten collections,
>>>>> and no exception is thrown.
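>>>>>
>>>>> For reference, the two overloads I was switching between (the options
>>>>> accessor here is illustrative):
>>>>>
>>>>>   // ValueProvider form, which hit the flattening path in my test:
>>>>>   .withNumShards(options.getNumShards())  // ValueProvider<Integer>
>>>>>
>>>>>   // Hard-coded form, which did not:
>>>>>   .withNumShards(10)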
>>>>>
>>>>> So, this might really be considered a bug in WriteFiles (and thus in
>>>>> FileIO). But I'd love to hear other interpretations.
>>>>>
>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jklu...@mozilla.com>
>>>>> wrote:
>>>>>
>>>>>> I'm building a pipeline that streams from Pubsub and writes to files.
>>>>>> I'm using FileIO's dynamic destinations to place elements into different
>>>>>> directories according to date, and I really don't care about the
>>>>>> ordering of elements beyond the date buckets.
>>>>>>
>>>>>> So, I think GlobalWindows is appropriate in this case, even though the
>>>>>> input is unbounded. Is it possible to use GlobalWindows but set a
>>>>>> trigger based on number of elements and/or processing time so that Beam
>>>>>> actually writes out files periodically?
>>>>>>
>>>>>> I tried the following:
>>>>>>
>>>>>> Window.into(new GlobalWindows())
>>>>>>   .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>     AfterPane.elementCountAtLeast(10000),
>>>>>>     AfterProcessingTime.pastFirstElementInPane()
>>>>>>         .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>   .discardingFiredPanes()
>>>>>>
>>>>>> But it raises an exception about incompatible triggers:
>>>>>>
>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>
>>>>>> I believe what's happening is that FileIO with explicit numShards
>>>>>> (required in the case of unbounded input) forces a GroupByKey, which
>>>>>> activates continuation triggers that are incompatible with my stated
>>>>>> triggers. It's the internals of WriteFiles that try to flatten the
>>>>>> incompatible PCollections together.
>>>>>>