Lyn Zhang created FLINK-33670:
---------------------------------

             Summary: Public operators cannot be reused in multi sinks
                 Key: FLINK-33670
                 URL: https://issues.apache.org/jira/browse/FLINK-33670
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
    Affects Versions: 1.18.0
            Reporter: Lyn Zhang
         Attachments: image-2023-11-28-14-31-30-153.png

Dear all,

I find that some common operators cannot be reused when a job with multiple sinks is submitted. Here is an example:

{code:sql}
CREATE TABLE source (
    id STRING,
    ts TIMESTAMP(3),
    v BIGINT,
    WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
) WITH (
    'connector' = 'socket',
    'hostname' = 'localhost',
    'port' = '9999',
    'byte-delimiter' = '10',
    'format' = 'json'
);

-- Keep only the first row per id (deduplicate on processing time).
CREATE VIEW source_distinct AS
SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER w AS row_nu
    FROM source
    WINDOW w AS (PARTITION BY id ORDER BY proctime() ASC)
) WHERE row_nu = 1;

-- Sink 1: plain projection of the deduplicated view.
CREATE TABLE print1 (
    id STRING,
    ts TIMESTAMP(3)
) WITH ('connector' = 'blackhole');

INSERT INTO print1 SELECT id, ts FROM source_distinct;

-- Sink 2: tumbling-window aggregation over the same deduplicated view.
CREATE TABLE print2 (
    id STRING,
    ts TIMESTAMP(3),
    v BIGINT
) WITH ('connector' = 'blackhole');

INSERT INTO print2
SELECT id, TUMBLE_START(ts, INTERVAL '20' SECOND), SUM(v)
FROM source_distinct
GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), id;
{code}

!image-2023-11-28-14-31-30-153.png|width=384,height=145!

I checked the code: Flink adds the rule CoreRules.PROJECT_MERGE by default. This produces different rel digests for the deduplicate operator in each sink branch, so matching common operators fails.

In real production environments, reusing common operators such as deduplicate is more valuable than merging projections. A possible solution is to stop project merge from crossing shuffle operators in multi-sink cases.

What do you think? Looking forward to your reply.