I don't believe that the issue is Flink-specific; rather, Flink is one example among many potential examples. Enforcing that watermark updates can only happen at bundle boundaries would ensure that any data buffered while processing a single bundle in a DoFn could be output ON_TIME, without any need for a TimerSpec to explicitly hold the watermark for that purpose. This is in reference to data buffered within a single bundle, not cross-bundle buffering such as in the case of GroupIntoBatches.
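To make that concrete, here is a rough sketch of the kind of buffering DoFn I have in mind (callExternalApi and the positional request/response pairing are placeholders, not a real client); the example that follows walks through how its output can end up LATE:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;

// Buffers all elements of a bundle, makes one external request per bundle, and emits
// each response with the timestamp/window of the input element it corresponds to.
public class BufferingRequestFn extends DoFn<String, String> {

  // Per-bundle, in-memory buffer (plain Java state, not Beam state); reset at every @StartBundle.
  private transient List<ValueInSingleWindow<String>> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(
      @Element String requestBody, @Timestamp Instant timestamp, BoundedWindow window) {
    // Buffer only and return; no output here. The element's timestamp and window are
    // remembered so the response can later be emitted back into the element's own window.
    buffer.add(ValueInSingleWindow.of(requestBody, timestamp, window, PaneInfo.NO_FIRING));
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext context) {
    // One round trip for the whole bundle, then emit each response with the timestamp of
    // its originating element. If the output watermark already advanced past that
    // timestamp mid-bundle, this output is flagged LATE (or dropped).
    List<String> responses =
        callExternalApi(
            buffer.stream().map(ValueInSingleWindow::getValue).collect(Collectors.toList()));
    for (int i = 0; i < buffer.size(); i++) {
      ValueInSingleWindow<String> in = buffer.get(i);
      context.output(responses.get(i), in.getTimestamp(), in.getWindow());
    }
    buffer = null;
  }

  // Placeholder for the external API round trip; responses are assumed to be positional.
  private static List<String> callExternalApi(List<String> requestBodies) {
    return requestBodies.stream().map(b -> "response-for-" + b).collect(Collectors.toList());
  }
}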
Take for example a PCollection with 1-second fixed windowing. The PCollection holds payload bodies for an external API to which requests will be made. A hypothetical runner creates a bundle with Element A and Element B, where Element A belongs to the window [0:00:01, 0:00:02) and Element B belongs to the window [0:00:02, 0:00:03). Assume that the DoFn buffers all elements in a bundle in order to make fewer round-trip requests to the external API, and then outputs the corresponding responses. The following is a high-level order of events that could result in data being labelled LATE:

1. The watermark is 0:00:00.
2. The DoFn receives the bundle containing both Element A and Element B.
3. Element A is processed by the DoFn, buffered in memory, and the call returns/completes.
4. The watermark is (maybe) updated after the element has been processed; assume in this example that it is in fact updated to 0:00:02.
5. Element B (from the same bundle) is processed by the DoFn.
6. At the end of the bundle, the in-memory buffered entities are used to make a request to the external API.
7. The API responses are gathered, to be output into the same window from which the corresponding request element originated (Element A and Element B carried this data).
8. The response data associated with the request payload found in Element A is output with the timestamp of Element A, i.e. something in the range of Element A's window [0:00:01, 0:00:02).
9. The output from the prior step is considered LATE, strictly as a result of the watermark having been updated to 0:00:02 in Step 4.

If Step 4 were moved to be the last step in the process (i.e. at the bundle boundary), this issue would be avoided. I would also argue that updating the watermark only after a response has been received for an input element is a more accurate depiction of having completed processing for that element.

All that said, I could buy the argument that the above description might represent an anti-pattern of sorts, where response data should actually be output with a timestamp corresponding to its receipt rather than the timestamp of the input element that carried the request body.
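For what it's worth, the runner-side behaviour I'm arguing for (emit watermark updates only once the bundle finishes) could be sketched roughly as follows. This is purely illustrative, using hypothetical interfaces, and is not the actual FlinkRunner/DoFnOperator code:

// Illustrative only: a hypothetical helper sketching "emit watermarks only at bundle
// boundaries". Not taken from any runner's real implementation.
class BundleAwareWatermarkForwarder {

  interface WatermarkEmitter {
    void emitWatermark(long watermarkMillis);
  }

  private final WatermarkEmitter downstream;
  private boolean bundleInProgress = false;
  private Long pendingWatermark = null; // newest watermark seen while a bundle was open

  BundleAwareWatermarkForwarder(WatermarkEmitter downstream) {
    this.downstream = downstream;
  }

  void onStartBundle() {
    bundleInProgress = true;
  }

  // Called whenever the runner receives an upstream watermark update.
  void onWatermark(long watermarkMillis) {
    if (bundleInProgress) {
      // Hold the update: nothing is emitted mid-bundle, so elements buffered in the
      // DoFn can still be output ON_TIME from @FinishBundle.
      pendingWatermark = watermarkMillis;
    } else {
      downstream.emitWatermark(watermarkMillis);
    }
  }

  // Called after @FinishBundle has returned and all bundle outputs have been emitted.
  void onFinishBundle() {
    bundleInProgress = false;
    if (pendingWatermark != null) {
      downstream.emitWatermark(pendingWatermark);
      pendingWatermark = null;
    }
  }
}

Whether every runner can reasonably make that guarantee, and whether it can be covered by a @ValidatesRunner test, is of course the open question in the thread below.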
- Evan

On Mon, Mar 28, 2022 at 11:07 AM Reuven Lax <re...@google.com> wrote:

> I agree with you that changing on-time elements to late elements is
> incorrect; however, I don't quite understand why doing things on bundle
> boundaries helps. Is this specific to Flink?
>
> On Mon, Mar 28, 2022 at 1:07 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Robert,
>>
>> I had the same impression that holding the watermark between bundles is
>> actually not part of the computational model. Now the question is -
>> should it be?
>>
>> As you said, buffering and emitting in-memory buffered data means
>> possibly outputting data that arrived as ON_TIME, but is outputted as
>> LATE (or droppable, which is even worse). My understanding is that this
>> is why there is the (deprecated) getAllowedTimestampSkew() method of
>> DoFn, but that only bypasses the check and does not solve the issue
>> (which is why it is deprecated, I suppose). I strongly believe that
>> outputting elements that switch from ON_TIME to LATE is a correctness
>> bug, because it has the potential to violate causality (which is
>> strongly counter-intuitive in our universe :)). For some pipelines it
>> can definitely cause incorrect outputs.
>>
>> If we could ensure the output watermark gets updated only between the
>> @FinishBundle and @StartBundle calls, then this problem would go away. I
>> looked into the code of FlinkRunner and it seems to me that we could
>> quite easily ensure this by not outputting the watermark while a bundle
>> is open and outputting it once the bundle finishes. I didn't dig into
>> that too deep, so I don't know if there would be any caveats. The
>> question apparently is whether we could make these guarantees for other
>> runners as well and whether we could sensibly create a @ValidatesRunner
>> test.
>>
>> WDYT?
>>
>> Jan
>>
>> On 3/25/22 23:06, Robert Bradshaw wrote:
>> > I do not think there is a hard and fast rule about updating watermarks
>> > only at bundle boundaries. This seems perfectly legal for a pure 1:1
>> > mapping DoFn. The issue is that DoFns are allowed to buffer data and
>> > emit them in a later process (or finishBundle). If the watermark has
>> > moved on, that may result in late data. We don't really have a way for
>> > a DoFn to declare *its* output watermark (i.e. "I promise not to emit
>> > any data before this timestamp.")
>> >
>> > On Thu, Mar 24, 2022 at 8:10 AM Evan Galpin <egal...@apache.org> wrote:
>> >> Thanks for starting this thread Jan, I'm keen to hear thoughts and
>> >> outcomes! I thought I would mention that answers to the questions
>> >> posed here will help to unblock a 2.38.0 release blocker [1].
>> >>
>> >> [1] https://issues.apache.org/jira/browse/BEAM-14064
>> >>
>> >> On Thu, Mar 24, 2022 at 5:28 AM Jan Lukavský <je...@seznam.cz> wrote:
>> >>> Hi,
>> >>>
>> >>> this is a follow-up thread started from [1]. In that thread it is
>> >>> mentioned multiple times that (in a stateless ParDo) the output
>> >>> watermark is allowed to advance only on bundle boundaries [2].
>> >>> Essentially that would mean that anything in between calls to
>> >>> @StartBundle and @FinishBundle would be processed in a single instant
>> >>> in (output) event time. This makes perfect sense.
>> >>>
>> >>> The issue is that it seems that not all runners actually implement
>> >>> this behavior. FlinkRunner for instance does not have a "natural"
>> >>> concept of bundles, and those are created in a more ad-hoc way to
>> >>> adhere to the DoFn life-cycle (see [3]). Watermark updates and
>> >>> elements are completely interleaved without any synchronization with
>> >>> bundle "open" or "close". If watermark updates are allowed to happen
>> >>> only on boundaries of bundles, then this seems to break this contract.
>> >>>
>> >>> The question therefore is - should we consider FlinkRunner as
>> >>> non-compliant with this aspect of the Apache Beam model, or is this an
>> >>> "optional" part that runners are free to implement at will? In the
>> >>> case of the former, do we miss some @ValidatesRunner tests for this?
>> >>>
>> >>> Jan
>> >>>
>> >>> [1] https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
>> >>>
>> >>> [2] https://lists.apache.org/thread/7foy455spg43xo77zhrs62gc1m383t50
>> >>>
>> >>> [3] https://github.com/apache/beam/blob/14862ccbdf2879574b6ce49149bdd7c9bf197322/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L786