Hi all. Currently, the design uses two separate interfaces, SupportsComputedColumnPushDown and SupportsWatermarkPushDown. SupportsWatermarkPushDown relies on SupportsComputedColumnPushDown when the watermark is defined on a computed column. During the implementation, we found that the method in SupportsWatermarkPushDown uses an out-of-date interface, WatermarkProvider, and that SupportsComputedColumnPushDown duplicates SupportsProjectionPushDown. Therefore, we propose a new version of SupportsWatermarkPushDown to solve these problems.
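For reference, the FLIP-126 style of watermark assignment that the next section refers to looks roughly like the sketch below (the event type MyEvent, the class name Flip126Example and the topic name are made up for illustration; the Flink APIs themselves are the ones introduced in FLIP-126):

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class Flip126Example {

    // Hypothetical event type carrying an event-time field in epoch milliseconds.
    public static class MyEvent {
        public long timestamp;
    }

    public static FlinkKafkaConsumer<MyEvent> createConsumer(
            DeserializationSchema<MyEvent> schema, Properties props) {
        // WatermarkStrategy acts as a factory for TimestampAssigner and WatermarkGenerator.
        WatermarkStrategy<MyEvent> strategy =
                WatermarkStrategy
                        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, recordTimestamp) -> event.timestamp);

        FlinkKafkaConsumer<MyEvent> consumer =
                new FlinkKafkaConsumer<>("my-topic", schema, props);
        // The consumer generates Kafka-partition-aware watermarks internally.
        consumer.assignTimestampsAndWatermarks(strategy);
        return consumer;
    }
}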
*Problems of SupportsComputedColumnPushDown and SupportsWatermarkPushDown*

Problems of SupportsWatermarkPushDown

SupportsWatermarkPushDown currently uses an inner interface named WatermarkProvider to register a WatermarkGenerator in the DynamicTableSource. However, since FLIP-126 the community uses org.apache.flink.api.common.eventtime.WatermarkStrategy to create watermark generators. WatermarkStrategy is a factory for TimestampAssigner and WatermarkGenerator, and FlinkKafkaConsumer uses the method assignTimestampsAndWatermarks(WatermarkStrategy) to generate Kafka-partition-aware watermarks. The original WatermarkProvider, in contrast, is used to create the deprecated AssignerWithPeriodicWatermarks and PunctuatedWatermarkAssignerProvider. Therefore, we think it is no longer suitable to use WatermarkProvider.

Problems of SupportsComputedColumnPushDown

There are two problems when using SupportsComputedColumnPushDown alone. First, the planner transforms both computed columns and expressions in the query, such as select a + b, into a LogicalProject. In the optimization phase we have no means to distinguish whether a RexNode in the projection comes from a computed column or from the query, so SupportsComputedColumnPushDown in reality pushes down not only the computed columns but also calculations from the query. Second, when a plan matches the rule PushComputedColumnIntoTableSourceScanRule, we have to build a new RowData to hold all the fields we require. However, the two rules PushComputedColumnIntoTableSourceScanRule and PushProjectIntoTableSourceScanRule do the same work of pruning the fields read from the source, so we effectively have two duplicate rules in the planner. If we don't need watermark push-down, we should use PushProjectIntoTableSourceScanRule rather than PushComputedColumnIntoTableSourceScanRule: it is much lighter, and we can read pruned data from the source directly instead of using a map function in Flink. Therefore, we think it is not a good idea to use two interfaces rather than one.

*New Proposal*

First of all, let us describe the situation when pushing watermarks into the table source scan. There are two plan structures we need to consider, listed below for discussion.

structure 1:

LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

structure 2:

LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

As we can see, structure 2 is much more complicated than structure 1. For structure 1, we can use the rows from the table scan to generate watermarks directly. For structure 2, we need to evaluate the rowtime expression in the LogicalProject and use its result to generate watermarks. Considering that WatermarkStrategy has the ability to extract the timestamp from a row, we propose to push only the WatermarkStrategy into the scan.

Push WatermarkStrategy to Scan

With this interface, we only push the WatermarkStrategy into the DynamicTableSource:

public interface SupportsWatermarkPushDown {

    void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
}
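To illustrate how a connector developer might use the proposed interface, here is a minimal sketch of a Kafka-backed ScanTableSource (the class name MyKafkaTableSource and its constructor arguments are made up, and the import of the proposed SupportsWatermarkPushDown is omitted because its package is part of this proposal): the source simply remembers the pushed WatermarkStrategy and hands it to the FlinkKafkaConsumer via assignTimestampsAndWatermarks, so watermarks are generated per Kafka partition inside the source.

import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;

public class MyKafkaTableSource implements ScanTableSource, SupportsWatermarkPushDown {

    private final String topic;
    private final Properties properties;
    private final DeserializationSchema<RowData> deserializer;

    // WatermarkStrategy received from the planner; stays null if nothing was pushed down.
    private WatermarkStrategy<RowData> watermarkStrategy;

    public MyKafkaTableSource(
            String topic, Properties properties, DeserializationSchema<RowData> deserializer) {
        this.topic = topic;
        this.properties = properties;
        this.deserializer = deserializer;
    }

    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        FlinkKafkaConsumer<RowData> consumer =
                new FlinkKafkaConsumer<>(topic, deserializer, properties);
        if (watermarkStrategy != null) {
            // Watermarks are generated inside the source instead of in a separate
            // downstream watermark assigner operator.
            consumer.assignTimestampsAndWatermarks(watermarkStrategy);
        }
        return SourceFunctionProvider.of(consumer, false);
    }

    @Override
    public DynamicTableSource copy() {
        MyKafkaTableSource copy = new MyKafkaTableSource(topic, properties, deserializer);
        copy.watermarkStrategy = watermarkStrategy;
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "MyKafkaTableSource";
    }
}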
The advantage of the new API is that it is very simple for connector developers: they only need to take the WatermarkStrategy into consideration and do not need to deal with other information such as the ComputedColumnConverter in SupportsComputedColumnPushDown. One disadvantage is that the rowtime expression is calculated again in the LogicalProject, because we do not build a new row in the scan to store the calculated timestamp. However, we can replace the calculation of the rowtime in the LogicalProject with a reference that uses the StreamRecord's getter to read the timestamp calculated before, which eliminates the duplicate calculation. This optimization has one limitation: it relies on computed columns not being defined on top of other computed columns. For nested computed columns, we have no place to save the intermediate result.

We still have one remaining problem: when we push a UDF into the source, we need a context as powerful as FunctionContext to open the UDF. The current WatermarkGeneratorSupplier.Context only supports the method getMetricGroup and misses the methods getCachedFile, getJobParameter and getExternalResourceInfos, which means we cannot convert a WatermarkGeneratorSupplier.Context to a FunctionContext safely. Considering that the UDF is only used to generate watermarks, we suggest throwing UnsupportedOperationException when invoking the methods that exist in FunctionContext but not in WatermarkGeneratorSupplier.Context. We have to admit that there is a risk in doing so, because we have no guarantee that the UDF will not invoke these methods.

*Summary*

We have described the problems and the proposed solution in detail. As a conclusion, I think the new interface avoids the problems mentioned before: it has a clear definition, as its name tells, and has nothing in common with SupportsProjectionPushDown. As for its disadvantage, I think it is acceptable to calculate the rowtime column twice, and we also have a plan to improve its efficiency as a follow-up if it causes performance problems.