Lyn Zhang created FLINK-33670:
---------------------------------
Summary: Public operators cannot be reused in multi sinks
Key: FLINK-33670
URL: https://issues.apache.org/jira/browse/FLINK-33670
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Lyn Zhang
Attachments: image-2023-11-28-14-31-30-153.png
Dear all:
I found that some common (shared) operators cannot be reused when submitting a job with
multiple sinks. Here is an example:
{code:sql}
CREATE TABLE source (
id STRING,
ts TIMESTAMP(3),
v BIGINT,
WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
) WITH (
'connector' = 'socket',
'hostname' = 'localhost',
'port' = '9999',
'byte-delimiter' = '10',
'format' = 'json'
);
CREATE VIEW source_distinct AS
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER w AS row_nu
FROM source
WINDOW w AS (PARTITION BY id ORDER BY proctime() ASC)
) WHERE row_nu = 1;
CREATE TABLE print1 (
id STRING,
ts TIMESTAMP(3)
) WITH('connector' = 'blackhole');
INSERT INTO print1 SELECT id, ts FROM source_distinct;
CREATE TABLE print2 (
id STRING,
ts TIMESTAMP(3),
v BIGINT
) WITH('connector' = 'blackhole');
INSERT INTO print2
SELECT id, TUMBLE_START(ts, INTERVAL '20' SECOND), SUM(v)
FROM source_distinct
GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), id;
{code}
!image-2023-11-28-14-31-30-153.png|width=384,height=145!
Looking at the code, I found that Flink adds the CoreRules.PROJECT_MERGE rule by
default. This produces different rel digests for the deduplicate operator in each
sink's plan, so matching the common operators eventually fails.
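The digest mismatch can be illustrated with a toy model. It is a sketch under simplifying assumptions and not Flink's planner code. Plan nodes are nested tuples, a node's digest is its repr(), and nodes with equal digests are reused. Setting pruned_below_dedup=True mimics the effect described above, where each sink's columns end up below the deduplicate node. The names sink_plan, dedup_subtree and reusable_dedup_count are hypothetical, and the window aggregation of print2 is omitted.
{code:python}
# Hypothetical toy model, not Flink's RelNode/digest code:
# a plan node is a tuple (operator, attributes, child); its digest is repr(node).
SCAN = ("Scan", "source", None)

def dedup_subtree(columns):
    """Project -> hash shuffle on id -> keep-first-row deduplicate on id."""
    project = ("Project", tuple(columns), SCAN)
    exchange = ("Exchange", "hash[id]", project)
    return ("Deduplicate", "id", exchange)

def sink_plan(sink_columns, pruned_below_dedup):
    # pruned_below_dedup=True mimics the sink's projection being merged
    # under the deduplicate node, so its input only carries the sink's columns.
    columns = sink_columns if pruned_below_dedup else ("id", "ts", "v")
    return ("Project", tuple(sink_columns), dedup_subtree(columns))

def dedup_digests(plan):
    """Collect the digests of all Deduplicate nodes on the plan's chain."""
    found = []
    node = plan
    while node is not None:
        if node[0] == "Deduplicate":
            found.append(repr(node))
        node = node[2]
    return found

def reusable_dedup_count(plans):
    """Digest-based reuse: nodes with equal digests collapse into one operator."""
    return len({d for p in plans for d in dedup_digests(p)})

SINK1 = ("id", "ts")
SINK2 = ("id", "ts", "v")  # the window aggregation above it is omitted

merged = [sink_plan(SINK1, True), sink_plan(SINK2, True)]
unmerged = [sink_plan(SINK1, False), sink_plan(SINK2, False)]
print(reusable_dedup_count(merged))    # 2: digests differ, nothing is reused
print(reusable_dedup_count(unmerged))  # 1: a single shared deduplicate
{code}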
In real production environments, reusing common operators such as deduplicate is
more valuable than merging projects. A good solution would be to stop project
merging across shuffle operators in multi-sink cases.
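The proposed policy could be sketched as follows. This is again a hypothetical toy model and not Flink's CoreRules.PROJECT_MERGE. The function merge_projects pushes a sink's column list into the Project below the shuffle, but only when crossing the Exchange is allowed. The function optimize allows that only when there is a single sink. Equal tuples stand in for equal digests.
{code:python}
# Hypothetical toy model: a plan node is a tuple (operator, attributes, child).
SOURCE_COLUMNS = ("id", "ts", "v")

def initial_plan(sink_columns):
    scan = ("Scan", "source", None)
    project = ("Project", SOURCE_COLUMNS, scan)
    exchange = ("Exchange", "hash[id]", project)
    dedup = ("Deduplicate", "id", exchange)
    return ("Project", tuple(sink_columns), dedup)

def merge_projects(plan, allow_cross_shuffle):
    """Push the sink's columns into the Project below the shuffle,
    unless crossing the Exchange is disallowed (stand-in for merging/pruning)."""
    _, sink_columns, dedup = plan
    _, key, exchange = dedup
    _, dist, lower = exchange
    if not allow_cross_shuffle:
        return plan  # stop at the Exchange: the subtree below stays shareable
    # keep the dedup key even if the sink does not output it
    kept = tuple(c for c in lower[1] if c in sink_columns or c == key)
    pruned = ("Project", kept, lower[2])
    return ("Project", sink_columns, ("Deduplicate", key, ("Exchange", dist, pruned)))

def optimize(sink_plans):
    # Proposed policy: only merge across a shuffle when there is a single sink.
    allow = len(sink_plans) == 1
    return [merge_projects(p, allow) for p in sink_plans]

def shared_subtrees(plans):
    """Number of distinct Deduplicate subtrees (equal tuples == equal digests)."""
    return len({p[2] for p in plans})

multi = optimize([initial_plan(("id", "ts")), initial_plan(("id", "ts", "v"))])
single = optimize([initial_plan(("id", "ts"))])
print(shared_subtrees(multi))  # 1: both sinks share one deduplicate
print(single[0][2][2][2][1])   # ('id', 'ts'): pruned below the shuffle
{code}
Blocking the merge for every multi-sink job is deliberately coarse. A finer-grained variant would only block it for subtrees that are actually shared between sinks.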
What do you think? Looking forward to your reply.