Hi Shengkai,

First of all, I would not consider the section "Problems of SupportsWatermarkPushDown" as a "problem". It was planned to update the WatermarkProvider once the interfaces are ready. See the comment in WatermarkProvider:

// marker interface that will be filled after FLIP-126:
// WatermarkGenerator<RowData> getWatermarkGenerator();

So far we had no sources that actually implement WatermarkStrategy.

Second, for generating watermarks I don't see a problem in merging the two mentioned interfaces SupportsWatermarkPushDown and SupportsComputedColumnPushDown into one. The described design sounds reasonable to me and the impact on performance should not be too large.

However, by merging these two interfaces we are also merging two completely separate concepts. Computed columns are not always used for generating a rowtime or watermark. Users can and certainly will implement more complex logic in there. One example could be decrypting encrypted records/columns, performing checksum checks, reading metadata etc.

So in any case we should still provide two interfaces:

SupportsWatermarkPushDown (functionality of computed columns + watermarks)


SupportsComputedColumnPushDown (functionality of computed columns only)

I'm fine with such a design, but it is also confusing for implementers that SupportsWatermarkPushDown includes the functionality of the other interface.

What do you think?

Regards,
Timo


On 08.09.20 04:32, Jark Wu wrote:
Thanks to Shengkai for summarizing the problems on the FLIP-95 interfaces
and solutions.

I think the new proposal, i.e. only pushing the "WatermarkStrategy" is much
cleaner and easier to develop than before.
So I'm +1 to the proposal.

Best,
Jark

On Sat, 5 Sep 2020 at 13:44, Shengkai Fang <fskm...@gmail.com> wrote:

Hi, all. It seems the formatting of my previous mail is broken, so I have
added a Google doc in link [1]. This discussion is about the interfaces in
FLIP-95: New Table Source And Table Sink, and proposes to merge the two
interfaces SupportsWatermarkPushDown and SupportsComputedColumnPushDown.

I am looking forward to any opinions and suggestions from the community.

Regards,
Shengkai

[1]

https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#

On Fri, 4 Sep 2020 at 14:58, Shengkai Fang <fskm...@gmail.com> wrote:

Hi, all. Currently, the design uses two separate interfaces,
SupportsComputedColumnPushDown and SupportsWatermarkPushDown. The
interface SupportsWatermarkPushDown relies on
SupportsComputedColumnPushDown when the watermark is defined on a computed
column. During the implementation, we found that the method in
SupportsWatermarkPushDown uses an out-of-date interface, WatermarkProvider,
and that SupportsComputedColumnPushDown duplicates
SupportsProjectionPushDown. Therefore, we propose a new
SupportsWatermarkPushDown interface to solve these problems.


*Problems of SupportsComputedColumnPushDown and SupportsWatermarkPushDown*

Problems of SupportsWatermarkPushDown

SupportsWatermarkPushDown currently uses an inner interface named
WatermarkProvider to register a WatermarkGenerator in the
DynamicTableSource. However, since FLIP-126 the community uses
org.apache.flink.api.common.eventtime.WatermarkStrategy to create
watermark generators. WatermarkStrategy is a factory of TimestampAssigner
and WatermarkGenerator, and FlinkKafkaConsumer uses the method
assignTimestampsAndWatermarks(WatermarkStrategy) to generate
Kafka-partition-aware watermarks. The original WatermarkProvider, in
contrast, is used to generate the deprecated
AssignerWithPeriodicWatermarks and PunctuatedWatermarkAssignerProvider.
Therefore, we think the WatermarkProvider is no longer suitable.


Problems of SupportsComputedColumnPushDown

There are two problems when using SupportsComputedColumnPushDown alone.

First, the planner transforms both a computed column and a query
expression such as select a+b into a LogicalProject. In the optimization
phase, we have no means to distinguish whether a RexNode in the projection
comes from a computed column or from the query. So in practice
SupportsComputedColumnPushDown will push down not only the computed column
but also the calculation in the query.

Second, when a plan matches the rule
PushComputedColumnIntoTableSourceScanRule, we have to build a new RowData
to hold all the fields we require. However, both rules,
PushComputedColumnIntoTableSourceScanRule and
PushProjectIntoTableSourceScanRule, do the same work of pruning the
records read from the source, so the planner effectively has two duplicate
rules. I think we should use PushProjectIntoTableSourceScanRule rather
than PushComputedColumnIntoTableSourceScanRule if we don't support
watermark push-down. Compared to PushComputedColumnIntoTableSourceScanRule,
PushProjectIntoTableSourceScanRule is much lighter, and we can read pruned
data from the source rather than use a map function in Flink.

Therefore, we think it's not a good idea to use two interfaces rather than
one.


*New Proposal*

First of all, let us give some background on pushing watermarks into the
table source scan. There are two plan structures that we need to consider.
We list two examples below for discussion.

structure 1:
LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

structure 2:
LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

As we can see, structure 2 is much more complicated than structure 1. For
structure 1, we can use the row from the table scan to generate watermarks
directly. But for structure 2, we need to calculate the rowtime expression
in LogicalProject and use the result of the calculation to generate
watermarks. Considering that WatermarkStrategy has the ability to extract
the timestamp from a row, we propose to push only WatermarkStrategy to the
scan.
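To make structure 2 concrete, here is a minimal sketch of the two steps it implies: evaluate the rowtime expression d = c + 5s, then derive the watermark d - 5s. It is only an illustration under stated assumptions and not Flink code: the class Structure2Sketch, the long[] row layout (fields a, b, c at indexes 0, 1, 2, in epoch milliseconds), and the rule of keeping the highest watermark seen so far are all hypothetical.

```java
// Hypothetical stand-in, NOT Flink code: a row is a long[] whose fields
// a, b, c sit at indexes 0, 1, 2 and hold epoch milliseconds.
final class Structure2Sketch {
    static final long FIVE_SECONDS_MS = 5_000L;

    // Highest watermark produced so far; assumed never to move backwards.
    private long currentWatermark = Long.MIN_VALUE;

    // Mirrors d=[+($2, 5000:INTERVAL SECOND)] in LogicalProject: d = c + 5s.
    static long rowtimeD(long[] row) {
        return row[2] + FIVE_SECONDS_MS;
    }

    // Mirrors watermark=[-($3, 5000:INTERVAL SECOND)]: watermark = d - 5s.
    static long watermarkFor(long[] row) {
        return rowtimeD(row) - FIVE_SECONDS_MS;
    }

    // Called once per row read by the scan; returns the current watermark.
    long onEvent(long[] row) {
        currentWatermark = Math.max(currentWatermark, watermarkFor(row));
        return currentWatermark;
    }
}
```

The point of the sketch is that the watermark depends on a value (d) that does not exist in the scanned row, so whoever generates the watermark inside the scan has to evaluate the computed-column expression first.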


Push WatermarkStrategy to Scan

In this interface, we will only push WatermarkStrategy to
DynamicTableSource.

public interface SupportsWatermarkPushDown {
    void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
}
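As a rough illustration of what this asks of a connector, the sketch below shows a source that simply stores the pushed strategy and uses it later when reading rows. Everything in it is an assumption made for illustration: WatermarkStrategySketch and SupportsWatermarkPushDownSketch are simplified stand-ins so the example compiles without Flink (the real WatermarkStrategy is a richer factory), and MySourceSketch is a hypothetical connector.

```java
// Hypothetical stand-in for org.apache.flink.api.common.eventtime.WatermarkStrategy,
// reduced to timestamp extraction so the sketch compiles without Flink.
interface WatermarkStrategySketch<T> {
    long extractTimestamp(T record);
}

// The proposed ability interface from the text, rewritten against the stand-in type.
interface SupportsWatermarkPushDownSketch<T> {
    void applyWatermark(WatermarkStrategySketch<T> watermarkStrategy);
}

// Hypothetical connector: it only has to remember the strategy it was given.
final class MySourceSketch implements SupportsWatermarkPushDownSketch<long[]> {
    private WatermarkStrategySketch<long[]> watermarkStrategy;

    @Override
    public void applyWatermark(WatermarkStrategySketch<long[]> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    boolean hasPushedWatermark() {
        return watermarkStrategy != null;
    }

    // Used by the (imaginary) reader to timestamp each row it emits.
    long timestampOf(long[] row) {
        if (watermarkStrategy == null) {
            throw new IllegalStateException("no watermark strategy was pushed down");
        }
        return watermarkStrategy.extractTimestamp(row);
    }
}
```

A planner-side caller would then do something like `source.applyWatermark(row -> row[2] + 5_000L)`, where the lambda stands for the generated code that evaluates the rowtime expression.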

The advantage of the new API is that it is very simple for connector
developers. They only need to take WatermarkStrategy into consideration
and don't need to deal with other information such as the
ComputedColumnConverter in SupportsComputedColumnPushDown. It has one
disadvantage: the rowtime expression has to be calculated again in
LogicalProjection because we don't build a new row in the scan to store
the calculated timestamp.
However, we can replace the calculation of rowtime in LogicalProjection
with a reference to eliminate the duplicate calculation. The reference
uses the StreamRecord's getter to read the timestamp that was calculated
before. This optimization still has one limitation: it requires that
computed columns are not defined on other computed columns. For nested
computed columns, we have no place to save the intermediate result.

We still have one problem: when we push a UDF into the source, we need a
context as powerful as FunctionContext to open the UDF. The current
WatermarkGeneratorSupplier.Context only supports the method getMetricGroup
and lacks the methods getCachedFile, getJobParameter and
getExternalResourceInfos, which means we can't safely convert the
WatermarkGeneratorSupplier.Context to a FunctionContext. Considering that
the UDF is only used to generate watermarks, we suggest throwing an
UnsupportedException when a method is invoked that exists in
FunctionContext but not in WatermarkGeneratorSupplier.Context. We have to
admit that there are risks in doing so, because we have no guarantee that
the UDF will not invoke these methods.
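The suggested fallback can be sketched as a small adapter that delegates the one supported method and throws for the rest. This is an illustration only: SupplierContextSketch and FunctionContextSketch are hypothetical stand-ins and not the Flink classes, the String return types are simplifications, getExternalResourceInfos is left out for brevity, and java.lang.UnsupportedOperationException is assumed as the exception type.

```java
// Hypothetical stand-in modelling WatermarkGeneratorSupplier.Context.
interface SupplierContextSketch {
    String getMetricGroup();
}

// Hypothetical stand-in modelling part of FunctionContext; return types simplified.
interface FunctionContextSketch {
    String getMetricGroup();
    String getCachedFile(String name);
    String getJobParameter(String key, String defaultValue);
}

// Adapter: forwards what the supplier context can answer, rejects everything else.
final class WatermarkFunctionContextSketch implements FunctionContextSketch {
    private final SupplierContextSketch delegate;

    WatermarkFunctionContextSketch(SupplierContextSketch delegate) {
        this.delegate = delegate;
    }

    @Override
    public String getMetricGroup() {
        return delegate.getMetricGroup();
    }

    @Override
    public String getCachedFile(String name) {
        throw new UnsupportedOperationException(
                "getCachedFile is not available when generating watermarks in the source");
    }

    @Override
    public String getJobParameter(String key, String defaultValue) {
        throw new UnsupportedOperationException(
                "getJobParameter is not available when generating watermarks in the source");
    }
}
```

The risk named above shows up directly here: a UDF whose open() calls getJobParameter would compile against the interface and fail only at runtime.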


*Summary*

We have described the problems and the solution in detail. In conclusion,
I think the new interface avoids the problems we mentioned before. It has
a clear definition, as its name tells, and has nothing in common with
SupportsProjectionPushDown. As for its disadvantage, I think it's
acceptable to calculate the rowtime column twice, and we also plan to
improve its efficiency as a follow-up if it causes performance problems.





