Actually, this is a documented known issue.

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L152

On Fri, Jan 11, 2019 at 9:23 AM Jeff Klukas <jklu...@mozilla.com> wrote:

> Indeed, I was wrong about the ValueProvider distinction. I updated that in
> the JIRA.
>
> It's when numShards is 0 (so runner-provided sharding) vs. an explicit
> number. Things work fine for explicit sharding. It's the runner-provided
> sharding mode that encounters the Flatten of PCollections with conflicting
> triggers.
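>
> Roughly, the distinction is (a sketch; TextIO.sink() and outputDir are just
> placeholders for whatever sink and destination you use):
>
>     FileIO.<String>write()
>         .via(TextIO.sink())
>         .to(outputDir)
>         .withNumShards(10);  // explicit sharding: works fine
>
>     FileIO.<String>write()
>         .via(TextIO.sink())
>         .to(outputDir)
>         .withNumShards(0);   // runner-provided sharding: hits the Flatten of
>                              // PCollections with conflicting triggers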
>
> On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax <re...@google.com> wrote:
>
>> FileIO requires an explicit numShards in unbounded mode for a number of
>> reasons - one being that a trigger has to happen on a GroupByKey, and we
>> need something to group on.
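>>
>> Conceptually it's something like this (a sketch of the mechanism, not the
>> actual WriteFiles code; input and numShards are placeholders):
>>
>>     // Each element is assigned one of numShards shard keys...
>>     PCollection<KV<Integer, String>> sharded = input.apply(
>>         WithKeys.of((String e) -> ThreadLocalRandom.current().nextInt(numShards))
>>             .withKeyType(TypeDescriptors.integers()));
>>
>>     // ...and the trigger fires on the GroupByKey over those shard keys,
>>     // producing the panes that get written out as files.
>>     PCollection<KV<Integer, Iterable<String>>> grouped =
>>         sharded.apply(GroupByKey.create());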
>>
>> It is extremely surprising that behavior would change based on whether or
>> not a ValueProvider is used. The exact same code path should be taken
>> regardless.
>>
>> Reuven
>>
>> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Definitely sounds like a bug, but I also want to caution you (or anyone
>>> reading this in the archive) that there are known problems with
>>> continuation triggers. A requirement on continuation triggers that we
>>> missed is that they really must be "compatible" (an arbitrary concept
>>> that only matters when Flattening two PCollections together) with their
>>> original trigger. Because that compatibility is missing, you can have
>>> three PCollections with identical triggering and CoGroupByKey them
>>> together, yet you cannot express the same three-way join as a sequence
>>> of binary joins.
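>>>
>>> For example (a sketch, assuming a, b, and c are PCollection<KV<String,
>>> Integer>> with identical non-default triggering):
>>>
>>>     TupleTag<Integer> tagA = new TupleTag<>();
>>>     TupleTag<Integer> tagB = new TupleTag<>();
>>>     TupleTag<Integer> tagC = new TupleTag<>();
>>>
>>>     // The three-way join works: a single CoGroupByKey flattens the inputs
>>>     // while they all still carry the same original trigger.
>>>     PCollection<KV<String, CoGbkResult>> threeWay =
>>>         KeyedPCollectionTuple.of(tagA, a).and(tagB, b).and(tagC, c)
>>>             .apply(CoGroupByKey.create());
>>>
>>>     // Two binary joins fail: the output of the first join carries the
>>>     // continuation trigger, which is not compatible with c's original
>>>     // trigger when the second join tries to flatten them together.
>>>     PCollection<KV<String, CoGbkResult>> firstJoin =
>>>         KeyedPCollectionTuple.of(tagA, a).and(tagB, b)
>>>             .apply(CoGroupByKey.create());
>>>     // (re-keying firstJoin and CoGroupByKey-ing it with c fails with
>>>     // "Inputs to Flatten had incompatible triggers")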
>>>
>>> Kenn
>>>
>>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>>>
>>>> Thanks for the response, Chamikara. I filed
>>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>>>> around the problem in my case by not using a ValueProvider for numShards.
>>>>
>>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <chamik...@google.com>
>>>> wrote:
>>>>
>>>>> I'm not too familiar with the exact underlying issue here, but writing
>>>>> unbounded input to files using GlobalWindows for unsharded output is a
>>>>> valid use case, so this sounds like a bug. Feel free to create a JIRA.
>>>>>
>>>>> - Cham
>>>>>
>>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jklu...@mozilla.com>
>>>>> wrote:
>>>>>
>>>>>> I've read more deeply into the WriteFiles code, and I now understand
>>>>>> that the exception is due to WriteFiles' attempt to handle unsharded
>>>>>> input. In that case, it creates both a sharded and an unsharded
>>>>>> collection; the first goes through one GroupByKey while the other goes
>>>>>> through two. These two collections are then flattened together, and
>>>>>> they have incompatible triggers because the twice-grouped collection
>>>>>> uses a continuation trigger.
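>>>>>>
>>>>>> You can reproduce the mismatch outside of WriteFiles with something like
>>>>>> this (a sketch, not WriteFiles' actual code; I've added an explicit
>>>>>> allowed lateness, which Window requires once you set a trigger):
>>>>>>
>>>>>>     PCollection<KV<String, String>> keyed = input.apply(
>>>>>>         Window.<KV<String, String>>into(new GlobalWindows())
>>>>>>             .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)))
>>>>>>             .withAllowedLateness(Duration.ZERO)
>>>>>>             .discardingFiredPanes());
>>>>>>
>>>>>>     // A GroupByKey round trip replaces the trigger with its continuation
>>>>>>     // trigger (elementCountAtLeast(10000) becomes elementCountAtLeast(1)).
>>>>>>     PCollection<KV<String, String>> regrouped = keyed
>>>>>>         .apply(GroupByKey.create())
>>>>>>         .apply(ParDo.of(
>>>>>>             new DoFn<KV<String, Iterable<String>>, KV<String, String>>() {
>>>>>>               @ProcessElement
>>>>>>               public void process(ProcessContext c) {
>>>>>>                 for (String v : c.element().getValue()) {
>>>>>>                   c.output(KV.of(c.element().getKey(), v));
>>>>>>                 }
>>>>>>               }
>>>>>>             }));
>>>>>>
>>>>>>     // Fails at construction: "Inputs to Flatten had incompatible triggers".
>>>>>>     PCollectionList.of(keyed).and(regrouped)
>>>>>>         .apply(Flatten.pCollections());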
>>>>>>
>>>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>>>>> switch to hard-coding an integer rather than passing a ValueProvider,
>>>>>> WriteFiles uses a different code path that doesn't flatten collections,
>>>>>> and no exception is thrown.
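>>>>>>
>>>>>> In other words (getNumShards() here stands for a hypothetical
>>>>>> ValueProvider<Integer> pipeline option):
>>>>>>
>>>>>>     write.withNumShards(options.getNumShards());  // ValueProvider overload:
>>>>>>                                                   // flattening path, exception
>>>>>>
>>>>>>     write.withNumShards(10);                      // int overload: different
>>>>>>                                                   // path, no exception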
>>>>>>
>>>>>> So, this might really be considered a bug of WriteFiles (and thus
>>>>>> FileIO). But I'd love to hear other interpretations.
>>>>>>
>>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jklu...@mozilla.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm building a pipeline that streams from Pubsub and writes to
>>>>>>> files. I'm using FileIO's dynamic destinations to place elements into
>>>>>>> different directories according to date and I really don't care about
>>>>>>> ordering of elements beyond the date buckets.
>>>>>>>
>>>>>>> So, I think GlobalWindows is appropriate in this case, even though
>>>>>>> the input is unbounded. Is it possible to use GlobalWindows but set a
>>>>>>> trigger based on number of elements and/or processing time so that Beam
>>>>>>> actually writes out files periodically?
>>>>>>>
>>>>>>> I tried the following:
>>>>>>>
>>>>>>> Window.into(new GlobalWindows())
>>>>>>>     .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>>         AfterPane.elementCountAtLeast(10000),
>>>>>>>         AfterProcessingTime.pastFirstElementInPane()
>>>>>>>             .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>>     .discardingFiredPanes()
>>>>>>>
>>>>>>> But it raises an exception about incompatible triggers:
>>>>>>>
>>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>>
>>>>>>> I believe what's happening is that FileIO with explicit numShards
>>>>>>> (required in the case of unbounded input) forces a GroupByKey, which
>>>>>>> activates continuation triggers that are incompatible with my stated
>>>>>>> triggers. It's the internals of WriteFiles that try to flatten the
>>>>>>> incompatible PCollections together.