Thanks to Shengkai for summarizing the problems with the FLIP-95 interfaces and the proposed solutions.
I think the new proposal, i.e. only pushing the "WatermarkStrategy", is much cleaner and easier to develop against than before. So I'm +1 to the proposal.

Best,
Jark

On Sat, 5 Sep 2020 at 13:44, Shengkai Fang <fskm...@gmail.com> wrote:

> Hi, all. It seems the formatting is broken, so I have added a Google doc
> in link [1]. This discussion is about the interfaces in FLIP-95: New
> Table Source And Table Sink, and proposes to merge the two interfaces
> SupportsWatermarkPushDown and SupportsComputedColumnPushDown.
>
> I am looking forward to any opinions and suggestions from the community.
>
> Regards,
> Shengkai
>
> [1]
> https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#
>
> On Fri, 4 Sep 2020 at 14:58, Shengkai Fang <fskm...@gmail.com> wrote:
>
> > Hi, all. Currently, the design uses two separate interfaces,
> > SupportsComputedColumnPushDown and SupportsWatermarkPushDown. The
> > interface SupportsWatermarkPushDown relies on
> > SupportsComputedColumnPushDown when the watermark is defined on a
> > computed column. During the implementation, we found two problems: the
> > method in SupportsWatermarkPushDown uses an out-of-date interface,
> > WatermarkProvider, and SupportsComputedColumnPushDown duplicates
> > SupportsProjectionPushDown. Therefore, we propose a new version of the
> > SupportsWatermarkPushDown interface to solve these problems.
> >
> > *Problems of SupportsComputedColumnPushDown and
> > SupportsWatermarkPushDown*
> >
> > Problems of SupportsWatermarkPushDown
> >
> > SupportsWatermarkPushDown currently uses an inner interface named
> > WatermarkProvider to register a WatermarkGenerator with the
> > DynamicTableSource. However, since FLIP-126 the community uses
> > org.apache.flink.api.common.eventtime.WatermarkStrategy to create
> > watermark generators.
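To make the FLIP-126 shape concrete: WatermarkStrategy acts as a single factory for both a timestamp assigner and a watermark generator, which is why one object suffices for push-down. The following is a minimal self-contained sketch using simplified stand-in interfaces, not the real Flink types (the actual API lives in org.apache.flink.api.common.eventtime):

```java
import java.time.Duration;

public class WatermarkStrategySketch {
    // Stand-in for org.apache.flink.api.common.eventtime.TimestampAssigner
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Stand-in for WatermarkGenerator: tracks the max timestamp seen so far
    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp);
        long currentWatermark();
    }

    // Stand-in for WatermarkStrategy: one factory for both pieces, so the
    // planner only has to push this single object into the source
    interface WatermarkStrategy<T> {
        TimestampAssigner<T> createTimestampAssigner();
        WatermarkGenerator<T> createWatermarkGenerator();
    }

    // Hypothetical bounded-out-of-orderness strategy over long[] "rows"
    static WatermarkStrategy<long[]> boundedOutOfOrderness(Duration bound) {
        return new WatermarkStrategy<long[]>() {
            public TimestampAssigner<long[]> createTimestampAssigner() {
                return (element, recordTs) -> element[0]; // timestamp is field 0
            }
            public WatermarkGenerator<long[]> createWatermarkGenerator() {
                return new WatermarkGenerator<long[]>() {
                    long maxTs = Long.MIN_VALUE;
                    public void onEvent(long[] event, long eventTs) {
                        maxTs = Math.max(maxTs, eventTs);
                    }
                    public long currentWatermark() {
                        return maxTs - bound.toMillis();
                    }
                };
            }
        };
    }

    public static void main(String[] args) {
        WatermarkStrategy<long[]> strategy = boundedOutOfOrderness(Duration.ofSeconds(5));
        TimestampAssigner<long[]> assigner = strategy.createTimestampAssigner();
        WatermarkGenerator<long[]> generator = strategy.createWatermarkGenerator();
        long ts = assigner.extractTimestamp(new long[]{12_000L}, -1L);
        generator.onEvent(new long[]{12_000L}, ts);
        // watermark = max seen timestamp (12000) minus the bound (5000)
        System.out.println(generator.currentWatermark()); // prints 7000
    }
}
```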
> > WatermarkStrategy is a factory for TimestampAssigner and
> > WatermarkGenerator, and FlinkKafkaConsumer uses the method
> > assignTimestampsAndWatermarks(WatermarkStrategy) to generate
> > Kafka-partition-aware watermarks. The original WatermarkProvider, by
> > contrast, is used to generate the deprecated
> > AssignerWithPeriodicWatermarks and PunctuatedWatermarkAssignerProvider.
> > Therefore, we think it is no longer suitable to use WatermarkProvider.
> >
> > Problems of SupportsComputedColumnPushDown
> >
> > There are two problems when using SupportsComputedColumnPushDown alone.
> >
> > First, the planner transforms both computed columns and query
> > expressions such as "select a + b" into a LogicalProject. When it
> > comes to the optimization phase, we have no means to distinguish
> > whether a RexNode in the projection comes from a computed column or
> > from the query. So in practice SupportsComputedColumnPushDown pushes
> > down not only the computed columns but also calculations from the
> > query.
> >
> > Second, when a plan matches the rule
> > PushComputedColumnIntoTableSourceScanRule, we have to build a new
> > RowData to hold all the fields we require. However, the two rules
> > PushComputedColumnIntoTableSourceScanRule and
> > PushProjectIntoTableSourceScanRule do the same work of pruning the
> > records read from the source, so we effectively have two duplicate
> > rules in the planner. If watermark push-down is not supported, we
> > should use PushProjectIntoTableSourceScanRule rather than
> > PushComputedColumnIntoTableSourceScanRule: it is much lighter, and it
> > lets us read pruned data from the source rather than apply a map
> > function in Flink.
> >
> > Therefore, we think it is not a good idea to use two interfaces rather
> > than one.
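For concreteness, the situation under discussion, a watermark defined on a computed column, can be declared roughly like this in Flink SQL DDL (table and column names are hypothetical, connector options omitted):

```sql
CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c TIMESTAMP(3),
  -- computed column derived from a physical column
  d AS c + INTERVAL '5' SECOND,
  -- watermark defined on the computed column, not on a physical one
  WATERMARK FOR d AS d - INTERVAL '5' SECOND
) WITH (...);
```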
> > *New Proposal*
> >
> > First of all, let us lay out some background on pushing watermarks
> > into the table source scan. There are two plan structures that we need
> > to consider; we list an example of each below.
> >
> > Structure 1:
> >
> > LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
> > +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
> >
> > Structure 2:
> >
> > LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
> > +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
> >    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
> >
> > As we can see, structure 2 is much more complicated than structure 1.
> > For structure 1, we can use the rows from the table scan to generate
> > watermarks directly. For structure 2, we first need to evaluate the
> > rowtime expression in the LogicalProject and use the result of that
> > calculation to generate watermarks. Considering that WatermarkStrategy
> > has the ability to extract a timestamp from a row, we propose to push
> > only the WatermarkStrategy into the scan.
> >
> > Push WatermarkStrategy to Scan
> >
> > With this interface, we only push a WatermarkStrategy into the
> > DynamicTableSource:
> >
> > public interface SupportsWatermarkPushDown {
> >     void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
> > }
> >
> > The advantage of the new API is that it is very simple for connector
> > developers: they only need to take the WatermarkStrategy into
> > consideration and do not have to deal with other information such as
> > the ComputedColumnConverter in SupportsComputedColumnPushDown. It has
> > one disadvantage, however: the rowtime expression has to be calculated
> > again in the LogicalProject, because we do not build a new row in the
> > scan to store the calculated timestamp.
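To illustrate how a connector would consume the proposed single-method interface, here is a minimal sketch. The interface signature matches the proposal above; the WatermarkStrategy/RowData stand-ins and the MyScanTableSource class are hypothetical simplifications of the real Flink types:

```java
public class WatermarkPushDownSketch {
    // Stand-in for the FLIP-126 WatermarkStrategy (real type is in
    // org.apache.flink.api.common.eventtime)
    interface WatermarkStrategy<T> {}

    // Stand-in for org.apache.flink.table.data.RowData
    interface RowData {}

    // The proposed ability interface: the planner hands the source one object
    interface SupportsWatermarkPushDown {
        void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
    }

    // Hypothetical scan source that keeps the pushed strategy for use by its
    // runtime provider; no ComputedColumnConverter or extra info is needed
    static class MyScanTableSource implements SupportsWatermarkPushDown {
        private WatermarkStrategy<RowData> watermarkStrategy; // null => no push-down

        @Override
        public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
            this.watermarkStrategy = watermarkStrategy;
        }

        boolean watermarksPushedDown() {
            return watermarkStrategy != null;
        }
    }

    public static void main(String[] args) {
        MyScanTableSource source = new MyScanTableSource();
        source.applyWatermark(new WatermarkStrategy<RowData>() {});
        System.out.println(source.watermarksPushedDown()); // prints true
    }
}
```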
> > However, we can replace the calculation of the rowtime in the
> > LogicalProject with a reference to eliminate the duplicate
> > calculation; the reference uses the StreamRecord's timestamp getter to
> > read the timestamp that was calculated before. This optimization still
> > has one limitation: it relies on computed columns not being defined on
> > other computed columns. For nested computed columns, we have no place
> > to save the intermediate result.
> >
> > There is still one remaining problem: when we push a UDF into the
> > source, we need a context as powerful as FunctionContext to open the
> > UDF. But the current WatermarkGeneratorSupplier.Context only supports
> > the method getMetricGroup and lacks the methods getCachedFile,
> > getJobParameter and getExternalResourceInfos, which means we cannot
> > convert a WatermarkGeneratorSupplier.Context to a FunctionContext
> > safely. Considering that the UDF is only used to generate watermarks,
> > we suggest throwing an UnsupportedOperationException when one of the
> > methods that exists in FunctionContext but not in
> > WatermarkGeneratorSupplier.Context is invoked. We have to admit that
> > there is some risk in doing so, because we have no guarantee that the
> > UDF will not invoke these methods.
> >
> > *Summary*
> >
> > We have described the whole problem and solution in detail. As a
> > conclusion, I think the new interface avoids the problems mentioned
> > before. It has a clear definition, as its name suggests, and has
> > nothing in common with SupportsProjectionPushDown. As for its
> > disadvantage, I think it is acceptable to calculate the rowtime column
> > twice, and we also have a plan to improve its efficiency as a
> > follow-up if it causes performance problems.
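The suggested context conversion can be sketched as an adapter that forwards what WatermarkGeneratorSupplier.Context does offer and throws for the rest. This is a minimal self-contained sketch with simplified stand-in interfaces (the real FunctionContext and WatermarkGeneratorSupplier.Context have more methods):

```java
public class ContextAdapterSketch {
    interface MetricGroup {} // stand-in for Flink's MetricGroup

    // Stand-in for WatermarkGeneratorSupplier.Context: only the metric group
    interface GeneratorContext {
        MetricGroup getMetricGroup();
    }

    // Stand-in for org.apache.flink.table.functions.FunctionContext
    interface FunctionContext {
        MetricGroup getMetricGroup();
        java.io.File getCachedFile(String name);
        String getJobParameter(String key, String defaultValue);
    }

    // Forward what the generator context supports; throw for everything else,
    // accepting the risk that a UDF calling these methods fails at runtime
    static FunctionContext adapt(GeneratorContext ctx) {
        return new FunctionContext() {
            public MetricGroup getMetricGroup() {
                return ctx.getMetricGroup();
            }
            public java.io.File getCachedFile(String name) {
                throw new UnsupportedOperationException(
                        "getCachedFile is not supported in watermark push-down");
            }
            public String getJobParameter(String key, String defaultValue) {
                throw new UnsupportedOperationException(
                        "getJobParameter is not supported in watermark push-down");
            }
        };
    }

    public static void main(String[] args) {
        FunctionContext fc = adapt(() -> null);
        try {
            fc.getCachedFile("dict");
            System.out.println("no exception");
        } catch (UnsupportedOperationException e) {
            System.out.println("unsupported"); // prints unsupported
        }
    }
}
```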