Hi Yongsong,
in newer Flink versions we introduced the concept of statament sets,
which are available via `TableEnvironment.createStatementSet()`. They
allow you to opimized a branching pipeline as a whole with reusing subplans.
In older Flink versions, you can convert the Table to a DataStream and
reregister it as a Table. In this case, the subplan will be materialized
into a DataStream pipeline and the planner sees it as a blackbox that
will be shared by multiple branches.
I hope this helps.
Regards,
Timo
On 08.02.21 03:59, Yongsong He wrote:
Hi experts,
I want to cache a temporary table for reuse it
Flink version 1.10.1
the table is consumer from kafka, struct like:
create table a (
field1 string,
field2 string,
field3 string,
field4 string
)
the sample code looks like:
val settings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val tableEnv = StreamTableEnvironment.create(env, settings)
val temptable = tableEnv.sqlQuery(s"select * from a where ${condition}")
temptable.where(condition1) ... then do something
temptable.where(condition2) ... then do otherthing
I want to reuse temptable for higher performance, what operators need or
it already cached in flink sql plan ?
Any help would be appreciated :)