Hi, all. Currently, we use two separate interfaces,
SupportsComputedColumnPushDown and SupportsWatermarkPushDown, in the design.
The interface SupportsWatermarkPushDown relies on
SupportsComputedColumnPushDown when the watermark is defined on a computed
column. During the implementation, we found that the method in
SupportsWatermarkPushDown uses an out-of-date interface, WatermarkProvider,
and that SupportsComputedColumnPushDown duplicates
SupportsProjectionPushDown. Therefore, we propose a new version of
SupportsWatermarkPushDown to solve the problems mentioned above.


*Problems of SupportsComputedColumnPushDown and SupportsWatermarkPushDown*

Problems of SupportsWatermarkPushDown

SupportsWatermarkPushDown currently uses an inner interface named
WatermarkProvider to register a WatermarkGenerator into the
DynamicTableSource. However, since FLIP-126 the community uses
org.apache.flink.api.common.eventtime.WatermarkStrategy to create watermark
generators. WatermarkStrategy is a factory of TimestampAssigner and
WatermarkGenerator, and FlinkKafkaConsumer uses the method
assignTimestampsAndWatermarks(WatermarkStrategy) to generate
Kafka-partition-aware watermarks. The original WatermarkProvider, on the
other hand, is used to create the deprecated AssignerWithPeriodicWatermarks
and PunctuatedWatermarkAssignerProvider. Therefore, we think it's not
suitable to use WatermarkProvider any more.
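
To make the FLIP-126 style concrete, here is a minimal sketch of the API we
refer to above. The topic name, bootstrap servers and the string schema are
placeholders chosen only to keep the snippet self-contained:

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class Flip126StyleExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // WatermarkStrategy is the factory of TimestampAssigner and WatermarkGenerator;
        // attaching it to the consumer gives Kafka-partition-aware watermarks.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((record, kafkaTimestamp) -> kafkaTimestamp));

        env.addSource(consumer).print();
        env.execute("flip-126-style-watermarks");
    }
}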


Problems of SupportsComputedColumnPushDown

There are two problems when using SupportsComputedColumnPushDown alone.

First, the planner transforms both the computed columns and the expressions
in the query, such as select a+b, into a single LogicalProject. When it
comes to the optimization phase, we have no means to distinguish whether a
RexNode in the projection comes from a computed column or from the query.
So SupportsComputedColumnPushDown will in reality push down not only the
computed columns but also the calculations written in the query.
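
As a small hypothetical illustration (the table name and the datagen
connector are only used to make the snippet self-contained), the computed
column c and the expression a + b written directly in the query end up as
identical expressions in the same LogicalProject of the plan:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ComputedColumnVsQueryExpression {

    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // The computed column c becomes a projection expression ...
        tableEnv.executeSql(
                "CREATE TABLE MyTable (" +
                "  a INT," +
                "  b INT," +
                "  c AS a + b" +
                ") WITH ('connector' = 'datagen')");

        // ... and so does the expression a + b written in the query,
        // so during optimization the two RexNodes look exactly the same.
        System.out.println(tableEnv.explainSql("SELECT a + b, c FROM MyTable"));
    }
}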

Second, when a plan matches the rule
PushComputedColumnIntoTableSourceScanRule, we have to build a new RowData to
hold all the fields we require. However, the two rules
PushComputedColumnIntoTableSourceScanRule and
PushProjectIntoTableSourceScanRule do the same work of pruning the fields
read from the source, so we effectively have two duplicate rules in the
planner. I think we should use PushProjectIntoTableSourceScanRule rather
than PushComputedColumnIntoTableSourceScanRule if we don't support watermark
push down: compared to PushComputedColumnIntoTableSourceScanRule,
PushProjectIntoTableSourceScanRule is much lighter, and we can read pruned
data from the source directly rather than use a map function in Flink.
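
For comparison, the existing projection push-down contract is roughly the
following (quoted from memory, so the exact method names may differ
slightly); it only hands the source the indexes of the fields to keep, which
is why it is so much lighter:

public interface SupportsProjectionPushDown {

    // Whether the source can also prune nested fields, not only top-level ones.
    boolean supportsNestedProjection();

    // The indexes of the fields that should be read from the external system.
    void applyProjection(int[][] projectedFields);
}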

Therefore, we think it's not a good idea to use two interfaces rather than
one.


*New Proposal*

First of all, let us give some background on pushing watermarks into the
table source scan. There are two plan structures that we need to consider;
we list both below for discussion.

structure 1:

LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

structure 2:

LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

As we can see, structure 2 is much more complicated than structure 1. For
structure 1, we can use the row from the table scan to generate watermarks
directly. But for structure 2, we need to calculate the rowtime expression
in the LogicalProject and use the result of that calculation to generate
watermarks. Considering that WatermarkStrategy has the ability to extract
the timestamp from a row, we propose to push only the WatermarkStrategy into
the scan.
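
For structure 2, the strategy pushed into the scan would conceptually look
like the hand-written sketch below (the real code would be generated by the
planner; the field index 2 and timestamp precision 3 are assumptions based
on the example schema above):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.data.RowData;

public class PushedStrategySketch {

    public static WatermarkStrategy<RowData> strategyForStructure2() {
        return WatermarkStrategy
                // watermark = d - INTERVAL '5' SECOND
                .<RowData>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // The timestamp assigner evaluates the rowtime expression
                // d = c + INTERVAL '5' SECOND directly on the raw scan row.
                .withTimestampAssigner((row, recordTimestamp) ->
                        row.getTimestamp(2, 3).getMillisecond() + 5000L);
    }
}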


Push WatermarkStrategy to Scan

With this interface, we only push the WatermarkStrategy into the
DynamicTableSource:

public interface SupportsWatermarkPushDown {

    void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
}
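
A rough sketch of how a connector could implement this (the class name
MyKafkaDynamicSource and the abstract createConsumer helper are made up; a
real connector has more moving parts): the source simply remembers the
pushed strategy and later hands it to the underlying Kafka consumer, so the
partition-aware watermark generation happens inside the source.

import javax.annotation.Nullable;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;

public abstract class MyKafkaDynamicSource implements ScanTableSource, SupportsWatermarkPushDown {

    @Nullable
    private WatermarkStrategy<RowData> watermarkStrategy;

    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        // The planner pushes the strategy during optimization; we only store it here.
        this.watermarkStrategy = watermarkStrategy;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        FlinkKafkaConsumer<RowData> consumer = createConsumer(context);
        if (watermarkStrategy != null) {
            // Reuse the FLIP-126 entry point to get Kafka-partition-aware watermarks.
            consumer.assignTimestampsAndWatermarks(watermarkStrategy);
        }
        return SourceFunctionProvider.of(consumer, false);
    }

    // Building the actual consumer (topic, deserializer, properties) is omitted here.
    protected abstract FlinkKafkaConsumer<RowData> createConsumer(ScanContext context);
}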

The advantage of the new API is that it's very simple for connector
developers. They only need to take the WatermarkStrategy into consideration
and don't need to deal with other information such as the
ComputedColumnConverter in SupportsComputedColumnPushDown. But it also has
one disadvantage: the rowtime expression needs to be calculated again in the
LogicalProject, because we don't build a new row in the scan to store the
calculated timestamp. However, we can replace the calculation of the rowtime
in the LogicalProject with a reference to eliminate the duplicate
calculation, using the StreamRecord's getter to read the timestamp that was
calculated before. This optimization still has one limitation: it relies on
computed columns not being defined on other computed columns. For nested
computed columns (e.g. a computed column d defined on another computed
column c), we have no place to save the intermediate result.

There is still one problem: when we push a UDF into the source, we need a
context as powerful as FunctionContext to open the UDF. But the current
WatermarkGeneratorSupplier.Context only supports the method getMetricGroup
and misses the methods getCachedFile, getJobParameter and
getExternalResourceInfos, which means we can't convert the
WatermarkGeneratorSupplier.Context to a FunctionContext safely. Considering
that the UDF is only used to generate watermarks, we suggest throwing
UnsupportedOperationException when invoking the methods that exist in
FunctionContext but not in WatermarkGeneratorSupplier.Context. But we have
to admit that there are risks in doing so, because we have no guarantee that
the UDF will not invoke these methods.
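
To illustrate the suggestion, here is a rough sketch of such an adapter (the
class name is made up, and it assumes FunctionContext's single-argument
constructor can be called with null and its methods can be overridden): only
getMetricGroup is forwarded, everything else fails fast.

import java.io.File;

import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.functions.FunctionContext;

public class WatermarkUdfFunctionContext extends FunctionContext {

    private final WatermarkGeneratorSupplier.Context context;

    public WatermarkUdfFunctionContext(WatermarkGeneratorSupplier.Context context) {
        // No RuntimeContext is available when the UDF runs inside the source.
        super(null);
        this.context = context;
    }

    @Override
    public MetricGroup getMetricGroup() {
        return context.getMetricGroup();
    }

    @Override
    public File getCachedFile(String name) {
        throw new UnsupportedOperationException(
                "getCachedFile is not supported when the watermark expression runs in the source.");
    }

    @Override
    public String getJobParameter(String key, String defaultValue) {
        throw new UnsupportedOperationException(
                "getJobParameter is not supported when the watermark expression runs in the source.");
    }

    // getExternalResourceInfos would be handled in the same way.
}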


*Summary*

We have addressed the whole problem and solution in detail. In conclusion, I
think the new interface avoids the problems we mentioned before. It has a
clear definition, as its name suggests, and has nothing in common with
SupportsProjectionPushDown. As for its disadvantage, I think it's acceptable
to calculate the rowtime column twice, and we also have a plan to improve
its efficiency as a follow-up if it causes performance problems.
