Hi,

We are trying to get a clear understanding of how Beam behaves when streaming, but without explicit windowing applied.
The use case:

* A message arrives on Pub/Sub containing the URIs of a set of files in GCS that must be processed together.
* The files are read from GCS using FileIO (we need to do some initial parsing of each file).
* (DoFn) We emit text elements based on a regex.
* (DoFn) The text elements are further parsed and turned into KVs.
* We perform joins / CoGroupByKey / GroupByKey on the KVs.
* (DoFn) We create the elements to be written to BigQuery.
* We store the elements in BigQuery (different rows/tables etc.).

(A rough sketch of this pipeline shape is at the bottom of this mail.)

It can almost be seen as a micro-batch executed per incoming message. The batches are not massive, and the 1 to 2 minute delay in starting up Dataflow batch jobs for them introduces unnecessary latency (we want to process these "micro-batches" in a few seconds at most). There is no business concept of time-based windowing; the window really is per incoming Pub/Sub message (i.e. per set of files).

Where we are getting a bit unstuck: written with no triggers at all, this pipeline works as expected (well, we haven't tested edge cases etc.), but that is surprising, because the documentation states:

"If you are using unbounded PCollections, you must use either non-global windowing or an aggregation trigger in order to perform a GroupByKey or CoGroupByKey. This is because a bounded GroupByKey or CoGroupByKey must wait for all the data with a certain key to be collected, but with unbounded collections, the data is unlimited. Windowing and/or triggers allow grouping to operate on logical, finite bundles of data within the unbounded data streams."

To my mind this indicates that a GroupByKey in a streaming pipeline with no explicit triggers in the global window (we aren't assigning any windows explicitly either) should error. But it does not.

So the questions arise:

* Why can we do the GroupByKeys/joins etc. without triggers? (Why does it not error?)
* When performing joins / GroupByKeys etc., what will be included? Will it only be elements derived from one source message and the files it names, or can elements from other source messages be included (if they have the same keys)?
* Essentially, if we do nothing (or if we trigger once per input message), will only elements from that message/trigger firing end up grouped together?
* Would a trigger such as AfterPane.elementCountAtLeast(1) potentially combine elements from different source messages (impulses), and could this be managed by adding something to our keys that is unique to every source message? (The second sketch at the bottom shows what I mean.)

If someone could help clarify the boundaries here, it would be super helpful.

Thanks so much,
Stephan

PS. Locally I'm cheating with:

    PCollection<String> filepatterns = p
        .apply(GenerateSequence.from(0).to(9).withRate(1, Duration.standardSeconds(1)))
        .apply(MapElements.into(TypeDescriptors.strings())
            .via(input -> String.format("/c:/Users/Stephan/dev/word-count-beam/small-%s.txt", input)));

    PCollection<FileIO.ReadableFile> files = filepatterns
        .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW))
        .apply(FileIO.readMatches());

which triggers the reading of files from a generated sequence rather than Pub/Sub, so we aren't introducing any watermark/time dimensions.
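For reference, here is roughly the shape of the pipeline described above, exactly as we run it today, i.e. with no explicit windowing or triggers anywhere. It is only a minimal sketch: the subscription and table names are placeholders, the "one URI per line" message format is assumed, and the naive split/CSV DoFns stand in for our real regex parsing and join logic.

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class MicroBatchShape {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // One Pub/Sub message -> the GCS URIs it names (assumed: one URI per line).
        PCollection<String> uris = p
            .apply(PubsubIO.readStrings().fromSubscription("projects/my-project/subscriptions/my-sub"))
            .apply(ParDo.of(new DoFn<String, String>() {
              @ProcessElement
              public void split(@Element String msg, OutputReceiver<String> out) {
                for (String uri : msg.split("\n")) {
                  out.output(uri.trim());
                }
              }
            }));

        // Read each file and emit KVs; a naive "key,value" line split stands in
        // for our real regex-based parsing.
        PCollection<KV<String, String>> kvs = uris
            .apply(FileIO.matchAll())
            .apply(FileIO.readMatches())
            .apply(ParDo.of(new DoFn<FileIO.ReadableFile, KV<String, String>>() {
              @ProcessElement
              public void parse(@Element FileIO.ReadableFile file, OutputReceiver<KV<String, String>> out)
                  throws java.io.IOException {
                for (String line : file.readFullyAsUTF8String().split("\n")) {
                  String[] parts = line.split(",", 2);
                  if (parts.length == 2) {
                    out.output(KV.of(parts[0], parts[1]));
                  }
                }
              }
            }));

        // One GroupByKey stands in for the joins/CoGroupByKeys. Note: no
        // Window.into(...) and no triggering(...) anywhere -- this is the
        // grouping the documentation says should need a trigger.
        kvs.apply(GroupByKey.create())
            .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, TableRow>() {
              @ProcessElement
              public void toRow(@Element KV<String, Iterable<String>> group, OutputReceiver<TableRow> out) {
                out.output(new TableRow().set("key", group.getKey()).set("values", group.getValue().toString()));
              }
            }))
            .apply(BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        p.run();
      }
    }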
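And this is what I mean in the last question by making the trigger explicit and adding something message-unique to the keys. Again just a sketch: the batchId, the "#" separator, and the `keyed` variable (standing for whatever KV collection feeds the GroupByKey) are all made up for illustration.

    import java.util.UUID;
    import org.joda.time.Duration;
    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;

    // When expanding a Pub/Sub message into URIs, tag everything that comes
    // out of it with a fresh UUID, so that no two source messages can ever
    // share a grouping key downstream.
    PCollection<KV<String, String>> urisWithBatchId = p
        .apply(PubsubIO.readStrings().fromSubscription("projects/my-project/subscriptions/my-sub"))
        .apply(ParDo.of(new DoFn<String, KV<String, String>>() {
          @ProcessElement
          public void tag(@Element String msg, OutputReceiver<KV<String, String>> out) {
            String batchId = UUID.randomUUID().toString(); // unique per source message
            for (String uri : msg.split("\n")) {
              out.output(KV.of(batchId, uri.trim()));
            }
          }
        }));

    // ...carry batchId along through the parsing steps, and fold it into the
    // business key just before grouping, e.g. KV.of(batchId + "#" + key, value).

    // Then make the triggering explicit (stay in the global window, fire per
    // element) instead of relying on whatever the default does:
    PCollection<KV<String, String>> triggered = keyed.apply(
        Window.<KV<String, String>>configure()
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO));

The worry, as stated in the questions above, is whether a pane fired this way can still contain elements from more than one source message when keys collide; the batchId prefix is meant to rule that out.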