It is indeed well documented that numShards is required for unbounded input, and I believe a helpful error is already thrown when unbounded input is combined with runner-determined sharding.
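For anyone finding this thread in the archives, the combination that is supported today for unbounded input is custom triggering plus an explicit (non-zero) shard count. Below is a minimal, untested sketch of that shape; the Pub/Sub topic, output path, and shard count are placeholders, not values from my actual pipeline:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class UnboundedWriteWithExplicitShards {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        .apply(PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"))
        // Global window with element-count / processing-time triggering so that
        // files are emitted periodically even though the window never closes.
        .apply(
            Window.<String>into(new GlobalWindows())
                .triggering(
                    Repeatedly.forever(
                        AfterFirst.of(
                            AfterPane.elementCountAtLeast(10000),
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardMinutes(10)))))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
        // Explicit, non-zero numShards: the documented requirement for unbounded
        // input. Leaving it unset (runner-determined sharding) is the case under
        // discussion in this thread.
        .apply(
            FileIO.<String>write()
                .via(TextIO.sink())
                .to("gs://my-bucket/output/")
                .withNumShards(10));

    pipeline.run();
  }
}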
I do believe there's still a bug here; it's just wandered quite a bit from the original title of the thread. The title should now be "Exception when using custom triggering and runner-determined file sharding". I was seeing the IllegalStateException in a unit test when I tried to compile my pipeline with the custom triggering. That unit test exercised *bounded* file input and numShards=0. In bounded mode, it would still be useful to be able to limit file sizes via GlobalWindows with triggering on AfterPane.elementCountAtLeast. But elementCountAtLeast emits a continuation trigger, which trips the Flatten problem for runner-determined sharding. (A rough sketch of that test setup appears at the bottom of this message, below the quoted thread.)

On Fri, Jan 11, 2019 at 12:32 PM Reuven Lax <re...@google.com> wrote:

> Ah,
>
> numShards = 0 is explicitly not supported in unbounded mode today, for
> the reason mentioned above. If FileIO doesn't reject the pipeline in that
> case, we should fix that.
>
> Reuven
>
> On Fri, Jan 11, 2019 at 9:23 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>
>> Indeed, I was wrong about the ValueProvider distinction. I updated that
>> in the JIRA.
>>
>> It's when numShards is 0 (so runner-provided sharding) vs. an explicit
>> number. Things work fine for explicit sharding. It's the runner-provided
>> sharding mode that encounters the Flatten of PCollections with
>> conflicting triggers.
>>
>> On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax <re...@google.com> wrote:
>>
>>> FileIO requires an explicit numShards in unbounded mode for a number
>>> of reasons, one being that a trigger has to happen on a GroupByKey, and
>>> we need something to group on.
>>>
>>> It is extremely surprising that behavior would change between using a
>>> ValueProvider or not. The exact same codepath should be triggered
>>> regardless of whether a ValueProvider is used.
>>>
>>> Reuven
>>>
>>> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Definitely sounds like a bug, but I also want to caution you (or
>>>> anyone reading this in the archives) that there are known problems with
>>>> continuation triggers. A spec on continuation triggers that we missed
>>>> is that they really must be "compatible" with their original trigger
>>>> (an arbitrary concept that only comes into play when flattening two
>>>> PCollections together). Without this, we also know that you can have
>>>> three PCollections with identical triggering that you can CoGroupByKey
>>>> together, but you cannot do that three-way join as a sequence of binary
>>>> joins.
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jklu...@mozilla.com>
>>>> wrote:
>>>>
>>>>> Thanks for the response, Chamikara. I filed
>>>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>>>>> around the problem in my case by not using a ValueProvider for
>>>>> numShards.
>>>>>
>>>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> I'm not too familiar with the exact underlying issue here, but
>>>>>> writing unbounded input to files using GlobalWindows with unsharded
>>>>>> output is a valid use case, so this sounds like a bug. Feel free to
>>>>>> create a JIRA.
>>>>>>
>>>>>> - Cham
>>>>>>
>>>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jklu...@mozilla.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I've read more deeply into the WriteFiles code, and I now understand
>>>>>>> that the exception is due to WriteFiles' attempt to handle unsharded
>>>>>>> input.
>>>>>>> In that case, it creates a sharded and an unsharded collection; the
>>>>>>> first goes through one GroupByKey while the other goes through two.
>>>>>>> These two collections are then flattened together, and they have
>>>>>>> incompatible triggers because the double-grouped collection uses a
>>>>>>> continuation trigger.
>>>>>>>
>>>>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>>>>>> switch to hard-coding an integer rather than passing a ValueProvider,
>>>>>>> WriteFiles uses a different code path that doesn't flatten
>>>>>>> collections, and no exception is thrown.
>>>>>>>
>>>>>>> So this might really be considered a bug in WriteFiles (and thus
>>>>>>> FileIO). But I'd love to hear other interpretations.
>>>>>>>
>>>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jklu...@mozilla.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'm building a pipeline that streams from Pubsub and writes to
>>>>>>>> files. I'm using FileIO's dynamic destinations to place elements
>>>>>>>> into different directories according to date, and I really don't
>>>>>>>> care about ordering of elements beyond the date buckets.
>>>>>>>>
>>>>>>>> So I think GlobalWindows is appropriate in this case, even though
>>>>>>>> the input is unbounded. Is it possible to use GlobalWindows but set
>>>>>>>> a trigger based on number of elements and/or processing time so
>>>>>>>> that Beam actually writes out files periodically?
>>>>>>>>
>>>>>>>> I tried the following:
>>>>>>>>
>>>>>>>> Window.into(new GlobalWindows())
>>>>>>>>     .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>>>         AfterPane.elementCountAtLeast(10000),
>>>>>>>>         AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>             .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>>>     .discardingFiredPanes()
>>>>>>>>
>>>>>>>> But it raises an exception about incompatible triggers:
>>>>>>>>
>>>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>>>
>>>>>>>> I believe what's happening is that FileIO with explicit numShards
>>>>>>>> (required in the case of unbounded input) is forcing a GroupByKey,
>>>>>>>> which activates continuation triggers that are incompatible with my
>>>>>>>> stated triggers. It's the internals of WriteFiles that try to
>>>>>>>> flatten the incompatible PCollections together.
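As referenced above, here is roughly the shape of the unit test where I was seeing the IllegalStateException. This is a simplified sketch rather than the actual test: it uses Create.of so it is self-contained, whereas the real test read bounded file input, and the output path is a placeholder. It exercises the configuration described above: bounded input, GlobalWindows with an AfterPane.elementCountAtLeast trigger, and runner-determined sharding (numShards left unset).

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;

public class BoundedRunnerDeterminedShardingTest {

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void writesBoundedInputWithElementCountTrigger() {
    pipeline
        // Bounded input (the real test read bounded file input instead).
        .apply(Create.of("a", "b", "c"))
        // Global window with an element-count trigger, intended to cap file sizes.
        .apply(
            Window.<String>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(2)))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
        // numShards is not set, i.e. runner-determined sharding; this is the
        // combination where constructing the pipeline hit the
        // "Inputs to Flatten had incompatible triggers" IllegalStateException.
        .apply(
            FileIO.<String>write()
                .via(TextIO.sink())
                .to("/tmp/output/"));

    pipeline.run();
  }
}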