Yuan Kui created FLINK-36043:
--------------------------------

             Summary: [SQL] Unexpected SinkMaterializer operator generated when 
use coalesce function on upsert key fields
                 Key: FLINK-36043
                 URL: https://issues.apache.org/jira/browse/FLINK-36043
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.20.0
            Reporter: Yuan Kui
         Attachments: image-2024-08-13-16-40-27-929.png

As designed, the SinkMaterializer operator should not be generated when the 
upsert keys are the same as the primary keys. Example:

 
{code:java}
-- source table
create table source_table (
   col1 int,
   col2 int,
   col3 int,
   col4 int
) with (
   'connector' = 'datagen',
   'rows-per-second'='10',
   'fields.col1.kind'='random',
   'fields.col2.kind'='random',
   'fields.col3.kind'='random',
   'fields.col4.kind'='random'
);

-- sink table
create table sink_table(
   col1 int,
   col2 int,
   col3_cnt bigint,
   col4_max int,
   primary key(col1, col2) not enforced
) with (
   'connector' = 'blackhole'
);

-- sql
insert into sink_table
select
  col1,
  col2,
  count(col3) as col3_cnt,
  max(col4) as col4_max
from source_table
group by col1,col2;{code}
It works well, and the execution plan has no SinkMaterializer operator:

 

 
{code:java}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, 
col3_cnt, col4_max])
+- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS 
col3_cnt, MAX(col4) AS col4_max])
   +- Exchange(distribution=[hash[col1, col2]])
      +- TableSourceScan(table=[[default_catalog, default_database, 
source_table]], fields=[col1, col2, col3, col4])  {code}
However, if we apply the coalesce function to an upsert key field, such as:

 

 
{code:java}
insert into sink_table
select
  -- use coalesce
  coalesce(col1, 0) as col1, 
  col2,
  count(col3) as col3_cnt,
  max(col4) as col4_max
from source_table
group by col1,col2; {code}
the SinkMaterializer operator will be generated:
{code:java}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, 
col3_cnt, col4_max], upsertMaterialize=[true])
+- Calc(select=[coalesce(col1, 0) AS col1, col2, col3_cnt, col4_max])
   +- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS 
col3_cnt, MAX(col4) AS col4_max])
      +- Exchange(distribution=[hash[col1, col2]])
         +- TableSourceScan(table=[[default_catalog, default_database, 
source_table]], fields=[col1, col2, col3, col4]) {code}
Changing `coalesce(col1, 0)` to `if(col1 is null, 0, col1)` leads to the same 
problem.

 

The code for determining whether a SinkMaterializer operator should be 
generated is in 
`org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor#analyzeUpsertMaterializeStrategy`.
 Setting a breakpoint at line 881, as in this screenshot:

!image-2024-08-13-16-40-27-929.png!

I found that changeLogUpsertKeys is empty, which makes the 'whether to generate 
SinkMaterializer' decision always true.
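
To make the question concrete, the check can be modeled roughly as follows. This is a simplified sketch based only on the behavior described in this report (no materialization when an upsert key equals the sink primary key), not the planner's actual code. The class name `UpsertMaterializeSketch`, the method `needsMaterialize`, and the representation of keys as sets of column indexes are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Set;

/** Simplified, hypothetical model of the decision; NOT Flink's implementation. */
class UpsertMaterializeSketch {

    /**
     * Assumed rule: materialization is requested unless one of the upstream
     * upsert keys is exactly the sink primary key. Keys are modeled as sets
     * of column indexes. A null or empty list of upsert keys means the
     * planner could not derive any key for the sink input.
     */
    static boolean needsMaterialize(Set<Integer> sinkPrimaryKey,
                                    List<Set<Integer>> upsertKeys) {
        if (upsertKeys == null || upsertKeys.isEmpty()) {
            // No derivable upsert key: the check can only answer "true".
            return true;
        }
        return upsertKeys.stream().noneMatch(sinkPrimaryKey::equals);
    }

    public static void main(String[] args) {
        Set<Integer> pk = Set.of(0, 1); // primary key(col1, col2)

        // First query: GroupAggregate feeds the sink, upsert key is {col1, col2}.
        System.out.println(needsMaterialize(pk, List.of(Set.of(0, 1)))); // false

        // Second query: the upsert keys observed at the breakpoint are empty.
        System.out.println(needsMaterialize(pk, List.of())); // true
    }
}
```

Under this model, an empty set of upsert keys always yields `true`, which matches the `upsertMaterialize=[true]` shown in the second plan. The open question is therefore why the Calc with `coalesce(col1, 0)` no longer reports an upsert key.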

Is this by design, or is it a bug?

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
