Hi Xintong,

Thanks for your comments.

> However, my major concern after reading this FLIP is that the current
> design might be too complicated. It tries to take all possible kinds of
> events (timestamp watermark, end-of-data, end-of-partition, internal
> watermark, and arbitrary user-defined watermarks) into consideration,
> which complicates the design when it comes to serialization and
> propagation.
>
> IMHO, this feature, the ability to send custom events across operators
> along the data streams, has the potential to become a major differentiator
> of DataStream API V2 compared to V1. For such a feature, I don't think
> it's feasible to design everything properly at the very beginning without
> enough user feedback. I'd suggest starting with a smaller scope and
> building the feature incrementally as new demands and feedback arise.

I think there is a misunderstanding. The scope of this FLIP is not to
encapsulate all events under generalized watermarks. Instead, it
encapsulates timestamp watermarks and enables users/runtime to
generate/send custom generalized watermarks. I updated the FLIP
accordingly [1].

> Concrete use cases are usually helpful for designing such a general
> mechanism. You may examine the design by trying to use it to fulfill the
> demands from the use cases. In case you are looking for such use cases in
> addition to the event-time watermarks, here are some inputs.
> - In FLIP-309/327/328 [1-3], we proposed to detect the data freshness from
> source, and use that information for various improvements. In DataStream
> API V1, such information is carried by RecordAttributes, which is quite
> similar to the generalized watermark except that we do not allow defining
> arbitrary custom attributes.
> - In Flink CDC, there are two phases: scanning the table at a certain time
> point, and consuming the binlog from that time point. In the first phase,
> there's only +I but no +U/-U/-D in the changelog, and downstream operators
> can do many optimizations based on that information.
> We haven't brought those optimizations to the community, because that
> requires the runtime layer to understand the concept of table / SQL
> changelogs. If we can send custom events across operators, without
> requiring the runtime to understand those events, the problem would be
> solved.
> - In processing-time temporal join, the join operator does not wait for the
> build side to complete before consuming the probe side data. This is
> because the build side is continuously updated and the join operator does
> not know when the initial build is finished. The result is that initial
> data from the probe side that should be joined with initial data from the
> build side are missed. If we can send a signal from the build side source
> to the join operator, notifying it about the completion of the initial
> build, the problem would be solved. Similar to the previous case, such
> information should not need to be understood by the runtime layer.

- As mentioned, the scope of this FLIP is not to bring other events (apart
from time-based watermarks) into the generalized watermark framework, but
to establish a framework that supports customization of generalized
watermarks. I added the above use cases to the FLIP as motivation for such
a framework.

> I think we should at least include the following information:
> - non-data events / records / indicators
> - flow along the data streams
> - can be generated / handled by process functions, connectors, and the
> framework
> - may need to be aligned across parallel data streams

- Definitely agree. I think we will do these specializations once we
integrate other events into our framework (which is out of the scope of
this FLIP).

> Requiring users to always implement serializations for custom watermarks
> might be a bit too heavy. Alternatively, we may consider only supporting
> primitive types for Watermarks, e.g., BOOLEAN, LONG, etc.
> If complex types prove necessary in the future, we can introduce STRING or
> BYTES so that users can do custom serde by themselves.
>
> Another benefit of using primitive types is that it simplifies the
> alignment semantics. Currently in this FLIP, users are required to
> implement a WatermarkCombiner, which is not trivial. If we only support
> limited types, we can (maybe only) provide built-in combiners for users,
> e.g., ALL / ANY for BOOLEAN, GREATEST / LEAST for LONG, etc. Combiners for
> STRING and BYTES are a bit more complicated, which is why I don't recommend
> supporting them in the first version.

- Agreed. As an example, I added a predefined watermarks section to the
FLIP and gave an example for LONG [2].

IMHO, the above answers also address your other comments. In addition, I
agree with your comments about connectors/sources being able to send
generalized watermarks. I also added support for this in the FLIP [3].

Thanks again for your feedback. I hope my replies answer your questions.

Regards,
Jeyhun

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-467%3A+Introduce+Generalized+Watermarks#FLIP467:IntroduceGeneralizedWatermarks-1.Generalizedwatermark-definition
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-467%3A+Introduce+Generalized+Watermarks#FLIP467:IntroduceGeneralizedWatermarks-PredefinedWatermarks
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-467%3A+Introduce+Generalized+Watermarks#FLIP467:IntroduceGeneralizedWatermarks-Source/Connectorssendgeneralizedwatermarks

On Sat, Jul 6, 2024 at 4:49 PM Xintong Song <tonysong...@gmail.com> wrote:

> Hi Jeyhun,
>
> Thanks for working on this FLIP.
>
> In general, I think it's a good idea to generalize the concept of Watermark
> to not only represent the advancing of event time, but also general
> indicators / events / signals that need to be passed along the data
> streams. So +1 for working towards this direction.
>
> However, my major concern after reading this FLIP is that the current
> design might be too complicated. It tries to take all possible kinds of
> events (timestamp watermark, end-of-data, end-of-partition, internal
> watermark, and arbitrary user-defined watermarks) into consideration,
> which complicates the design when it comes to serialization and
> propagation.
>
> IMHO, this feature, the ability to send custom events across operators
> along the data streams, has the potential to become a major differentiator
> of DataStream API V2 compared to V1. For such a feature, I don't think
> it's feasible to design everything properly at the very beginning without
> enough user feedback. I'd suggest starting with a smaller scope and
> building the feature incrementally as new demands and feedback arise.
>
> To be specific, I'd suggest the following:
> 1. Only focus on user-facing events, i.e., Watermarks that are either
> generated or handled by user code (process functions and connectors).
> Refactoring existing internal events does not bring any benefit to users,
> and may even destabilize existing mechanisms. We could do that
> incrementally after the generalized watermark mechanism becomes stable.
> 2. Start with a limited set of supported data types and propagation
> strategies. We can add support for arbitrary types and strategies later,
> if they prove necessary. By that time, we should be able to better
> understand the use cases based on real feedback.
> 3. Try to minimize the set of concepts and APIs that users need to
> understand and work with, and make them simple and easy to understand.
> I'm not saying we should not discuss designs of internal implementations
> in this FLIP. It's just that the FLIP would be easier to understand if it
> first presents how users should understand and use the feature, and then
> the key internal designs needed to achieve that.
>
> # Some detailed suggestions
>
> ## Use cases
>
> Concrete use cases are usually helpful for designing such a general
> mechanism. You may examine the design by trying to use it to fulfill the
> demands from the use cases. In case you are looking for such use cases in
> addition to the event-time watermarks, here are some inputs.
> - In FLIP-309/327/328 [1-3], we proposed to detect the data freshness from
> source, and use that information for various improvements. In DataStream
> API V1, such information is carried by RecordAttributes, which is quite
> similar to the generalized watermark except that we do not allow defining
> arbitrary custom attributes.
> - In Flink CDC, there are two phases: scanning the table at a certain time
> point, and consuming the binlog from that time point. In the first phase,
> there's only +I but no +U/-U/-D in the changelog, and downstream operators
> can do many optimizations based on that information. We haven't brought
> those optimizations to the community, because that requires the runtime
> layer to understand the concept of table / SQL changelogs. If we can send
> custom events across operators, without requiring the runtime to
> understand those events, the problem would be solved.
> - In processing-time temporal join, the join operator does not wait for the
> build side to complete before consuming the probe side data. This is
> because the build side is continuously updated and the join operator does
> not know when the initial build is finished. The result is that initial
> data from the probe side that should be joined with initial data from the
> build side are missed. If we can send a signal from the build side source
> to the join operator, notifying it about the completion of the initial
> build, the problem would be solved. Similar to the previous case, such
> information should not need to be understood by the runtime layer.
>
> ## Watermark Definition
>
> The FLIP defines the new generalized Watermark as "indicators in data
> streams", which is a bit too general.
>
> I think we should at least include the following information:
> - non-data events / records / indicators
> - flow along the data streams
> - can be generated / handled by process functions, connectors, and the
> framework
> - may need to be aligned across parallel data streams
>
> ## Types of Watermarks
>
> Requiring users to always implement serializations for custom watermarks
> might be a bit too heavy. Alternatively, we may consider only supporting
> primitive types for Watermarks, e.g., BOOLEAN, LONG, etc. If complex types
> prove necessary in the future, we can introduce STRING or BYTES so that
> users can do custom serde by themselves.
>
> Another benefit of using primitive types is that it simplifies the
> alignment semantics. Currently in this FLIP, users are required to
> implement a WatermarkCombiner, which is not trivial. If we only support
> limited types, we can (maybe only) provide built-in combiners for users,
> e.g., ALL / ANY for BOOLEAN, GREATEST / LEAST for LONG, etc. Combiners for
> STRING and BYTES are a bit more complicated, which is why I don't recommend
> supporting them in the first version.
>
> The primitive types should already cover existing use cases.
> - event-time watermark, LONG, LEAST (means we should return the earliest
> timestamp among all channels)
> - is-processing-backlog (data freshness), BOOLEAN, ALL (means we should
> return true only if the values received from all channels are all true)
> - insert-only (changelog that only contains +I), BOOLEAN, ALL
> - initial-build completed, BOOLEAN, ALL
>
> ## Blocking Alignment?
>
> In addition to providing built-in combiners, another thing that might be
> declared is whether we should block the channel for aligning.
>
> I think this is not needed for any of the use cases listed above.
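
For illustration only, here is a minimal Java sketch of the built-in combiners suggested under "Types of Watermarks" (ALL / ANY for BOOLEAN, GREATEST / LEAST for LONG). It assumes a simple per-channel aligner. BuiltInCombiners and WatermarkAligner are hypothetical names, not part of Flink or FLIP-467. The aligner keeps the latest value from each input channel and emits a combined value once every channel has reported. It never blocks a channel, which is consistent with the point that blocking alignment is not needed for the listed use cases.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical sketch: these names are not Flink APIs.

/** Built-in combiners for primitive-typed watermarks. */
final class BuiltInCombiners {
    static final BinaryOperator<Boolean> ALL = Boolean::logicalAnd; // true only if every channel sent true
    static final BinaryOperator<Boolean> ANY = Boolean::logicalOr;  // true if at least one channel sent true
    static final BinaryOperator<Long> LEAST = Long::min;            // event time: earliest timestamp wins
    static final BinaryOperator<Long> GREATEST = Long::max;         // largest value wins

    private BuiltInCombiners() {}
}

/** Keeps the latest watermark per input channel; it never blocks a channel. */
final class WatermarkAligner<T> {
    private final BinaryOperator<T> combiner;
    private final List<T> latest; // null means the channel has not reported yet

    WatermarkAligner(int numChannels, BinaryOperator<T> combiner) {
        this.combiner = combiner;
        this.latest = new ArrayList<>(Collections.<T>nCopies(numChannels, null));
    }

    /** Records a watermark from `channel`; returns the combined value once all channels have reported. */
    Optional<T> onWatermark(int channel, T value) {
        latest.set(channel, value);
        if (latest.contains(null)) {
            return Optional.empty();
        }
        return latest.stream().reduce(combiner);
    }
}
```

With these combiners, the use cases listed above map to LEAST (event-time watermark) and ALL (is-processing-backlog, insert-only, initial-build completed). A production aligner would also have to handle details that this sketch ignores, such as idle input channels.
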
>
> The only two examples that I can think of are:
> - checkpoint barrier
> - schema evolution in Flink CDC, where no data of the new schema is allowed
> to be processed before data of the old schema from all channels are
> drained.
>
> Neither of them is user-facing, and both already have existing solutions.
> So maybe we can exclude this from the first version.
>
> ## Declaration and Handling
>
> It is a bit confusing that a process function (as shown in TLDR/Example)
> has three methods: onWatermark(), watermarkPolicy() and
> declareWatermarks(). What are the differences between onWatermark() and
> watermarkPolicy()?
>
> IIUC, a process function should only do three things about the watermarks:
> - Declare watermarks that it may generate
> - Emit watermarks
> - Handle received watermarks
>
> Given that watermarks are emitted via WatermarkManager.emitWatermark(),
> there should be only two methods on ProcessFunction, for declaring and
> handling respectively. The current onWatermark() and watermarkPolicy() can
> be combined by making onWatermark() return PEEK / POP to indicate whether
> the current watermark should be further handled by the framework.
>
> Moreover, supporting custom watermarks means there's no guarantee that all
> operators and the framework know how to handle the watermark. The default
> behavior should be defined when declaring the watermark. E.g., a changelog
> stream that contains only +I records going through an AGG operator may
> result in a changelog stream that contains -U/+U. Therefore, if the
> operators and framework don't know how to handle an insert-only watermark,
> the safest play is to stop propagating the watermark.
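
Here is a minimal Java sketch of the two-method design proposed under "Declaration and Handling". It rests on stated assumptions: every type below (SketchProcessFunction, HandlingResult, DefaultAction, FrameworkSketch, and so on) is hypothetical, and a plain List stands in for WatermarkManager.emitWatermark(), whose signature is not given in this thread. declareWatermarks() returns what a function may emit, together with a default action. onWatermark() returns POP when the function has consumed a watermark and PEEK when the framework should keep handling it. On PEEK, the framework applies the declared default and drops undeclared watermarks, following the "stop propagating" rule suggested for insert-only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: none of these names are Flink APIs.

/** POP: the function consumed the watermark; PEEK: the framework keeps handling it. */
enum HandlingResult { PEEK, POP }

/** Declared with a watermark: what the framework does when no function consumes it. */
enum DefaultAction { FORWARD, DROP }

final class WatermarkDeclaration {
    final String id;
    final DefaultAction defaultAction;

    WatermarkDeclaration(String id, DefaultAction defaultAction) {
        this.id = id;
        this.defaultAction = defaultAction;
    }
}

final class Watermark {
    final String id;
    final Object value;

    Watermark(String id, Object value) {
        this.id = id;
        this.value = value;
    }
}

/** Stand-in for a process function: one method for declaring, one for handling. */
interface SketchProcessFunction {
    /** Watermarks this function may emit. */
    default Set<WatermarkDeclaration> declareWatermarks() {
        return Collections.emptySet();
    }

    /** `output` stands in for WatermarkManager.emitWatermark(). */
    default HandlingResult onWatermark(Watermark wm, List<Watermark> output) {
        return HandlingResult.PEEK; // not recognized: leave it to the framework
    }
}

/** A CDC-like source that may emit an insert-only watermark during its snapshot phase. */
final class CdcSnapshotSourceSketch implements SketchProcessFunction {
    @Override
    public Set<WatermarkDeclaration> declareWatermarks() {
        // Unaware operators (e.g. an AGG that produces -U/+U) must not forward it.
        return Collections.singleton(new WatermarkDeclaration("insert-only", DefaultAction.DROP));
    }
}

/** A temporal join that consumes the "initial build completed" signal. */
final class TemporalJoinSketch implements SketchProcessFunction {
    boolean buildSideReady = false;

    @Override
    public HandlingResult onWatermark(Watermark wm, List<Watermark> output) {
        if ("initial-build-completed".equals(wm.id)) {
            buildSideReady = true; // from here on, probe-side data can be joined safely
            return HandlingResult.POP;
        }
        return HandlingResult.PEEK;
    }
}

final class FrameworkSketch {
    /** Collects the declarations of all upstream functions, keyed by watermark id. */
    static Map<String, WatermarkDeclaration> collect(List<? extends SketchProcessFunction> upstream) {
        Map<String, WatermarkDeclaration> result = new HashMap<>();
        for (SketchProcessFunction fn : upstream) {
            for (WatermarkDeclaration d : fn.declareWatermarks()) {
                result.put(d.id, d);
            }
        }
        return result;
    }

    /** Returns what is forwarded downstream after `fn` has handled `wm`. */
    static List<Watermark> handle(SketchProcessFunction fn, Watermark wm,
                                  Map<String, WatermarkDeclaration> declarations) {
        List<Watermark> output = new ArrayList<>();
        if (fn.onWatermark(wm, output) == HandlingResult.PEEK) {
            WatermarkDeclaration decl = declarations.get(wm.id);
            // Undeclared watermarks are dropped too: stopping propagation is the safe default.
            if (decl != null && decl.defaultAction == DefaultAction.FORWARD) {
                output.add(wm);
            }
        }
        return output;
    }
}
```

In this sketch, an aggregation that does not recognize the insert-only watermark returns PEEK. The framework then drops the watermark because the CDC source declared it with DROP. The temporal join, in contrast, POPs the initial-build-completed signal.
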
>
> Best,
>
> Xintong
>
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517
>
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
>
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
>
> On Wed, Jul 3, 2024 at 6:08 AM Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
>
> > Hi devs,
> >
> > I'd like to start a discussion about FLIP-467: Introduce Generalized
> > Watermarks [1].
> > This is another sub-FLIP of DataStream API V2 [2].
> >
> > With this FLIP, one can declare generalized (custom) watermarks and
> > define their custom propagation and alignment process. This FLIP opens
> > new prospects for simplifying signaling mechanisms inside the Flink
> > runtime and at the same time reveals new use cases.
> >
> > You can find more details in the FLIP [1].
> > Looking forward to hearing your comments, thanks!
> >
> > Best regards,
> > Jeyhun
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-467%3A+Introduce+Generalized+Watermarks
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2