It is indeed well documented that numShards is required for unbounded
input, and I do believe a helpful error is thrown in the case of
unbounded input with runner-determined sharding.

I do believe there's still a bug here; the discussion has just wandered
quite a bit from the original title of the thread. The title should now be
"Exception when using custom triggering and runner-determined file
sharding".

I was seeing the IllegalStateException in a unit test when I constructed
my pipeline with the custom triggering. That unit test exercised
*bounded* file input with numShards=0.
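
For context, here is a minimal sketch of the kind of unit test that hit the
exception. The names, the Create-based input, and the output path are my own
illustration, not the actual test, and it assumes the Beam Java SDK is on the
classpath:

```java
// Hypothetical repro: bounded input + custom trigger + runner-determined
// sharding (no withNumShards call, i.e. numShards = 0).
Pipeline p = Pipeline.create();
p.apply(Create.of("a", "b", "c"))  // bounded input
    .apply(Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(2)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(FileIO.<String>write()
        .via(TextIO.sink())
        .to("/tmp/output"));  // no withNumShards => runner-determined sharding
p.run();
```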

In bounded mode, it would still be useful to limit file sizes via
GlobalWindows with an AfterPane.elementCountAtLeast trigger. But
elementCountAtLeast emits a continuation trigger that trips the Flatten
problem under runner-determined sharding.
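
By contrast, pinning an explicit shard count appears to sidestep the Flatten
entirely, since WriteFiles then takes the single-GroupByKey code path. A
sketch under the same assumptions (the shard count and path are arbitrary):

```java
// Workaround sketch: a hard-coded shard count (not a ValueProvider)
// keeps WriteFiles off the unsharded/sharded Flatten that mixes the
// user trigger with its continuation trigger.
.apply(FileIO.<String>write()
    .via(TextIO.sink())
    .to("/tmp/output")
    .withNumShards(10));  // any explicit value > 0
```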


On Fri, Jan 11, 2019 at 12:32 PM Reuven Lax <re...@google.com> wrote:

> Ah,
>
> numShards = 0 is explicitly not supported in unbounded mode today, for the
> reason mentioned above. If FileIO doesn't reject the pipeline in that case,
> we should fix that.
>
> Reuven
>
> On Fri, Jan 11, 2019 at 9:23 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>
>> Indeed, I was wrong about the ValueProvider distinction. I updated that
>> in the JIRA.
>>
>> It's when numShards is 0 (so runner-provided sharding) vs. an explicit
>> number. Things work fine for explicit sharding. It's the runner-provided
>> sharding mode that encounters the Flatten of PCollections with conflicting
>> triggers.
>>
>> On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax <re...@google.com> wrote:
>>
>>> FileIO requires an explicit numShards in unbounded mode for a number of
>>> reasons - one being that a trigger has to happen on a GroupByKey, and we
>>> need something to group on.
>>>
>>> It is extremely surprising that behavior would change between using a
>>> ValueProvider or not. The exact same codepath should be triggered
>>> regardless of whether a ValueProvider is used.
>>>
>>> Reuven
>>>
>>> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Definitely sounds like a bug, but I also want to caution you (or anyone
>>>> reading this in the archive) that there are known problems with
>>>> continuation triggers. A spec on continuation triggers that we missed is
>>>> that they really must be "compatible" (an arbitrary concept, having only
>>>> to do with Flattening two PCollections together) with their original
>>>> trigger. Without this, we also know that you can have three PCollections
>>>> with identical triggering and CoGroupByKey them together, but you cannot
>>>> do the same three-way join as a sequence of binary joins.
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jklu...@mozilla.com>
>>>> wrote:
>>>>
>>>>> Thanks for the response, Chamikara. I filed
>>>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>>>>> around the problem in my case by not using a ValueProvider for numShards.
>>>>>
>>>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> I'm not too familiar with the exact underlying issue here, but
>>>>>> writing unbounded input to files using GlobalWindows for unsharded
>>>>>> output is a valid use case, so this sounds like a bug. Feel free to
>>>>>> create a JIRA.
>>>>>>
>>>>>> - Cham
>>>>>>
>>>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jklu...@mozilla.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I've read more deeply into the WriteFiles code and I'm understanding
>>>>>>> now that the exception is due to WriteFiles' attempt to handle
>>>>>>> unsharded input. In that case, it creates a sharded and an unsharded
>>>>>>> collection; the first goes through one GroupByKey while the other
>>>>>>> goes through two. These two collections are then flattened together,
>>>>>>> and they have incompatible triggers because the double-grouped
>>>>>>> collection uses a continuation trigger.
>>>>>>>
>>>>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>>>>>> switch to hard-coding an integer rather than passing a ValueProvider,
>>>>>>> WriteFiles uses a different code path that doesn't flatten
>>>>>>> collections, and no exception is thrown.
>>>>>>>
>>>>>>> So, this might really be considered a bug of WriteFiles (and thus
>>>>>>> FileIO). But I'd love to hear other interpretations.
>>>>>>>
>>>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jklu...@mozilla.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'm building a pipeline that streams from Pubsub and writes to
>>>>>>>> files. I'm using FileIO's dynamic destinations to place elements into
>>>>>>>> different directories according to date and I really don't care about
>>>>>>>> ordering of elements beyond the date buckets.
>>>>>>>>
>>>>>>>> So, I think GlobalWindows is appropriate in this case, even though
>>>>>>>> the input is unbounded. Is it possible to use GlobalWindows but set a
>>>>>>>> trigger based on number of elements and/or processing time so that beam
>>>>>>>> actually writes out files periodically?
>>>>>>>>
>>>>>>>> I tried the following:
>>>>>>>>
>>>>>>>> Window.into(new GlobalWindows())
>>>>>>>>     .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>>>         AfterPane.elementCountAtLeast(10000),
>>>>>>>>         AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>             .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>>>     .discardingFiredPanes()
>>>>>>>>
>>>>>>>> But it raises an exception about incompatible triggers:
>>>>>>>>
>>>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>>>
>>>>>>>> I believe that what's happening is that FileIO with explicit
>>>>>>>> numShards (required in the case of unbounded input) is forcing a
>>>>>>>> GroupByKey, which activates continuation triggers that are
>>>>>>>> incompatible with my stated triggers. It's the internals of
>>>>>>>> WriteFiles that try to flatten the incompatible PCollections
>>>>>>>> together.
>>>>>>>>
>>>>>>>>
>>>>>>>>
