Hi Dong,

Thanks for your proposal. It is a very interesting feature and also a complex one, especially the "shotgun surgery"[1] involved in handling the useProcessingTime logic. While reading the FLIP, I was wondering whether it is possible to leverage the Visitor design pattern (not necessarily using the pattern exactly as is) to encapsulate the logic centrally in the Watermark or in the Visitors, i.e. to achieve high cohesion?
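To make the suggestion above a bit more concrete, here is a minimal sketch in plain Java of one possible reading of it. The only place that inspects useProcessingTime is the watermark's `accept` method, and each operator supplies a visitor instead of branching on the flag itself. `SketchWatermark`, `WatermarkVisitor` and `ProbeReleaseVisitor` are hypothetical names used only for illustration. They are not Flink classes, and the sketch is not taken from the FLIP.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical visitor: one callback per kind of watermark discussed in FLIP-326.
interface WatermarkVisitor<R> {
    R visitEventTime(long timestamp);

    R visitProcessingTime();
}

// Hypothetical watermark (not Flink's Watermark class). The useProcessingTime
// check lives here and nowhere else; operators only supply visitors.
final class SketchWatermark {
    private final long timestamp;
    private final boolean useProcessingTime;

    private SketchWatermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }

    static SketchWatermark eventTime(long timestamp) {
        return new SketchWatermark(timestamp, false);
    }

    static SketchWatermark processingTime() {
        return new SketchWatermark(Long.MIN_VALUE, true); // timestamp unused in this mode
    }

    <R> R accept(WatermarkVisitor<R> visitor) {
        return useProcessingTime ? visitor.visitProcessingTime() : visitor.visitEventTime(timestamp);
    }
}

// Hypothetical operator-side visitor: decides which buffered probe records
// (represented here only by their timestamps) may be released for a watermark.
final class ProbeReleaseVisitor implements WatermarkVisitor<List<Long>> {
    private final List<Long> buffered;

    ProbeReleaseVisitor(List<Long> buffered) {
        this.buffered = buffered;
    }

    @Override
    public List<Long> visitEventTime(long watermark) {
        List<Long> released = new ArrayList<>();
        Iterator<Long> it = buffered.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            if (ts <= watermark) { // event time: release only records the watermark has passed
                released.add(ts);
                it.remove();
            }
        }
        return released;
    }

    @Override
    public List<Long> visitProcessingTime() {
        // processing time: nothing left to wait for, so release everything
        List<Long> released = new ArrayList<>(buffered);
        buffered.clear();
        return released;
    }
}
```

One possible benefit of this shape is that adding a further watermark kind would mean adding one visitor method. The compiler would then force every visitor implementation to handle it, instead of leaving flag checks scattered across operators.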
Best regards,
Jing

[1] https://en.wikipedia.org/wiki/Shotgun_surgery

On Wed, Jul 26, 2023 at 1:46 AM David Anderson <dander...@apache.org> wrote:
> Dong,
>
> Thank you for the careful analysis of my proposal. Your conclusions make sense to me.
>
> David
>
> On Mon, Jul 24, 2023 at 8:37 PM Dong Lin <lindon...@gmail.com> wrote:
> >
> > Hi David,
> >
> > Thank you for the detailed comments and the suggestion of this alternative approach.
> >
> > I agree with you that this alternative can also address the target use-case with the same correctness. In comparison to the current FLIP, this alternative indeed introduces much less complexity to the Flink runtime internal implementation.
> >
> > At a high level, this alternative is simulating a one-time emission of Watermark(useProcessingTime=true) with periodic emission of Watermark(timestamp=wall-clock-time).
> >
> > One downside of this alternative is that it can introduce a bit of extra per-record runtime overhead. This is because the ingestion-time watermark will be emitted periodically according to pipeline.auto-watermark-interval (200 ms by default). Thus there is still a short period during which the watermark from the HybridSource can lag behind wall-clock time. Operators whose logic depends on the watermark, such as TemporalRowTimeJoinOperator, will need to check the build-side watermark and delay/buffer probe-side records until they receive the next ingestion-time watermark.
> >
> > The impact of this overhead probably depends on the throughput/watermark of the probe-side records. On the other hand, given that the join operator is typically already heavy (due to state backend access and the build-side buffer), and that the watermark from the probe side (e.g. Kafka) is probably also lagging behind wall-clock time, it is probably not an issue in most cases. Therefore I agree that it is worth trying this approach.
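The alternative Dong summarizes above can be modelled in a few lines of plain Java: a HybridSource that withholds watermarks until its bounded part is consumed and then emits ingestion-time watermarks periodically. The same model shows the probe-side waiting that Dong describes. This is a sketch under stated assumptions. `BuildSideWatermarkGenerator` and `ProbeBuffer` are hypothetical classes, not Flink APIs. `onPeriodicEmit` is only loosely named after Flink's `WatermarkGenerator#onPeriodicEmit`, and its signature here is different. Time is passed in explicitly rather than read from a clock, so the behaviour is deterministic. Probe records are assumed to carry their ingestion time in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical build-side watermark generator for the HybridSource alternative.
// A timer (in Flink, pipeline.auto-watermark-interval, 200 ms by default) is assumed
// to call onPeriodicEmit; the signature is simplified and is not Flink's API.
final class BuildSideWatermarkGenerator {
    private boolean boundedPhaseDone = false;

    // Called once the HybridSource has handled all events from its bounded sources.
    void finishBoundedPhase() {
        boundedPhaseDone = true;
    }

    // No watermark while the bounded part is being read; afterwards the
    // watermark is simply the current wall-clock (ingestion) time.
    OptionalLong onPeriodicEmit(long nowMs) {
        return boundedPhaseDone ? OptionalLong.of(nowMs) : OptionalLong.empty();
    }
}

// Hypothetical probe-side buffer of the temporal join: a probe record stamped with
// its ingestion time waits until the build-side watermark has caught up with it.
final class ProbeBuffer {
    // Ingestion times in arrival order; assumed non-decreasing, so a FIFO suffices.
    private final Deque<Long> pending = new ArrayDeque<>();
    private long buildWatermark = Long.MIN_VALUE;

    // Returns the records that can be joined now (possibly none).
    List<Long> onProbeRecord(long ingestionMs) {
        pending.addLast(ingestionMs);
        return drain();
    }

    List<Long> onBuildWatermark(long watermarkMs) {
        buildWatermark = Math.max(buildWatermark, watermarkMs);
        return drain();
    }

    private List<Long> drain() {
        List<Long> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peekFirst() <= buildWatermark) {
            ready.add(pending.pollFirst());
        }
        return ready;
    }
}
```

Consider timer ticks every 200 ms. A probe record ingested at 250 ms is only released by the watermark emitted at the 400 ms tick. That wait of up to one interval is the per-record overhead discussed above.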
> > We can revisit this if we find any issues around the performance or usability of this approach.
> >
> > Another potential concern is that it requires the user to use ingestion time. I am not sure yet whether we can do this in a backward-compatible way. We probably need to go through the existing APIs around ingestion-time watermarks to validate this.
> >
> > BTW, with the introduction of RecordAttributes(isBacklog=true/false) from FLIP-327, another short-term approach is to let TemporalProcessTimeJoinOperator keep buffering records from MySQL/HybridSource as long as isBacklog=true, and process them in a processing-time manner once it receives isBacklog=false. This should also address the use-case targeted by FLIP-326. The only caveat with this approach is that it is a bit hacky, because it requires JoinOperator to always buffer records when isBacklog=true, whereas isBacklog's semantics only say that buffering records is "optional", which can be an issue in the long term.
> >
> > Thanks,
> > Dong
> >
> > On Tue, Jul 25, 2023 at 2:37 AM David Anderson <dander...@apache.org> wrote:
> >>
> >> I'm delighted to see interest in developing support for processing-time temporal joins.
> >>
> >> The proposed implementation seems rather complex, and I'm not convinced this complexity is justified/necessary. I'd like to outline a simpler alternative that I think would satisfy the key objectives.
> >>
> >> Key ideas:
> >>
> >> 1. Limit support to the HybridSource (or a derivative thereof). (E.g., I'm guessing the MySQL CDC Source could be reworked to be a hybrid source.)
> >> 2. Have this HybridSource wait to begin emitting watermarks until it has handled all events from the bounded sources. (I'm not sure how the HybridSource handles this now; if this is an incompatible change, we can find a way to deal with that.)
> >> 3. Instruct users to use an ingestion-time watermarking strategy for their unbounded source (the source the HybridSource handles last) if they want to do something like a processing-time temporal join.
> >>
> >> One objection to this is the limitation of only supporting the HybridSource -- what about cases where the user has a single source, e.g., a Kafka topic? I'm suggesting the user would divide their build-side stream into two parts -- a bounded component that is fully ingested by the hybrid source before watermarking begins, followed by an unbounded component.
> >>
> >> I think this alternative handles use cases like processing-time temporal join rather nicely, without requiring any changes to watermarks or the core runtime.
> >>
> >> David
> >>
> >> On Thu, Jun 29, 2023 at 1:39 AM Martijn Visser <martijnvis...@apache.org> wrote:
> >> >
> >> > Hi Dong and Xuannan,
> >> >
> >> > I'm excited to see this FLIP. I think support for processing-time temporal joins is something that Flink users will greatly benefit from. I specifically want to call out that it's great to see the use cases that this enables. From a technical implementation perspective, I defer to the opinion of others with expertise on this topic.
> >> >
> >> > Best regards,
> >> >
> >> > Martijn
> >> >
> >> > On Sun, Jun 25, 2023 at 9:03 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> >> > >
> >> > > Hi all,
> >> > >
> >> > > Dong (cc'ed) and I are opening this thread to discuss our proposal to enhance the watermark to properly support processing-time temporal join, which has been documented in FLIP-326 [1].
> >> > >
> >> > > By enhancing the expressiveness of the Watermark, we want to support the use case where the records from the probe side of the processing-time temporal join need to wait until the build side finishes the snapshot phase.
> >> > > Additionally, these changes lay the groundwork for simplifying the DataStream APIs, eliminating the need for users to explicitly differentiate between event-time and processing-time, resulting in a more intuitive user experience.
> >> > >
> >> > > Please refer to the FLIP document for more details about the proposed design and implementation. We welcome any feedback and opinions on this proposal.
> >> > >
> >> > > Best regards,
> >> > >
> >> > > Dong and Xuannan
> >> > >
> >> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-326%3A+Enhance+Watermark+to+Support+Processing-Time+Temporal+Join
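For comparison with the HybridSource alternative, the behaviour FLIP-326 targets can be sketched the same way. This sketch is based only on how the thread characterizes the FLIP: the build side emits a one-time Watermark(useProcessingTime=true) after its snapshot phase, and probe-side records wait until then. `GatedTemporalJoin` and its methods are hypothetical and are not the FLIP's design or Flink's operator. The sketch keeps only the latest build-side value per key to mimic processing-time temporal-join semantics. It ignores state backends, checkpointing and parallelism.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical processing-time temporal join gated by a one-time
// Watermark(useProcessingTime=true) from the build side (not Flink's operator).
final class GatedTemporalJoin {
    private final Map<String, String> latestBuild = new HashMap<>(); // key -> latest value
    private final List<String> bufferedProbeKeys = new ArrayList<>();
    private boolean buildSnapshotDone = false;

    // Processing-time semantics: only the latest build-side version per key is kept.
    void onBuildRecord(String key, String value) {
        latestBuild.put(key, value);
    }

    // Models receiving Watermark(useProcessingTime=true): the snapshot phase is over,
    // so every buffered probe record is joined against the now-complete table.
    List<String> onBuildWatermarkUseProcessingTime() {
        buildSnapshotDone = true;
        List<String> joined = new ArrayList<>();
        for (String key : bufferedProbeKeys) {
            joined.add(join(key));
        }
        bufferedProbeKeys.clear();
        return joined;
    }

    // Before the watermark: buffer and return empty. After it: join immediately.
    Optional<String> onProbeRecord(String key) {
        if (!buildSnapshotDone) {
            bufferedProbeKeys.add(key);
            return Optional.empty();
        }
        return Optional.of(join(key));
    }

    private String join(String key) {
        return key + "=" + latestBuild.getOrDefault(key, "<no match>");
    }
}
```

Suppose the build side has loaded only USD when a probe record for EUR arrives. In that case `onProbeRecord("EUR")` returns `Optional.empty()`. After `onBuildRecord("EUR", "1.08")` and the one-time watermark, the flushed result is `"EUR=1.08"` instead of a premature miss. Unlike the periodic ingestion-time variant, probe records arriving after the switch are joined immediately.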