[ https://issues.apache.org/jira/browse/FLINK-17892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17115646#comment-17115646 ]
hailong wang commented on FLINK-17892: -------------------------------------- Yes, If the planner to reuse a source one starts from the earilest offset, the other starts from the latest offset, It is right not to reuse source. My example is as follows: {code:java} CREATE TABLE sourceTable ( id BIGINT, b1 ROW(q1 STRING, q2 STRING) ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.10', 'connector.topic' = '**', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.zookeeper.connect' = '**', 'connector.properties.bootstrap.servers' = '**', 'connector.properties.group.id' = 'mygroupid', 'format.type' = 'json' );CREATE TABLE sinkTable ( id BIGINT, b1q1 STRING ) WITH ( 'connector.type' = '**', 'connector.version' = '0.10', 'connector.topic' = '**', 'connector.properties.zookeeper.connect' = '**', 'connector.properties.bootstrap.servers' = '**', 'format.type' = 'json' );CREATE TABLE sinkTable1 ( id BIGINT, b1q1 STRING ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.10', 'connector.topic' = '**', 'connector.properties.zookeeper.connect' = '**', 'connector.properties.bootstrap.servers' = '**', 'format.type' = 'json' ); INSERT INTO sinkTable SELECT id, b1.q1 from sourceTable /*+ OPTIONS('connector.startup-mode' = 'earliest-offset') */; INSERT INTO sinkTable1 SELECT id, b1.q1 from sourceTable /*+ OPTIONS('connector.startup-mode' = 'latest-offset') */; {code} The streamGraph is: !image-2020-05-25-10-47-18-829.png|width=936,height=285! >From the side of different connector.startup-mode, source not be reused is >right. But, DataSource(ID=1) and DataSource(ID=6) use the same groupid consume the same topic, this will lead to sinkTable and sinkTable1 get a random piece of data of this topic. Maybe the correct dag is: !image-2020-05-25-10-50-53-033.png|width=1242,height=324! > Dynamic option may not be a part of the table digest > ---------------------------------------------------- > > Key: FLINK-17892 > URL: https://issues.apache.org/jira/browse/FLINK-17892 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner > Affects Versions: 1.11.0 > Reporter: hailong wang > Priority: Major > Attachments: image-2020-05-25-10-47-18-829.png, > image-2020-05-25-10-50-53-033.png > > > For now, Table properties not be a part of table digest, but dynamic option > will be included. > This will lead to an error when plan reused. > if I defines a kafka table: > {code:java} > CREATE TABLE KAFKA ( > …… > ) with ( > topic = 'xx', > groupid = 'xxx' > …… > ) > Insert into sinktable select * from KAFKA; > Insert into sinktable1 select * from KAFKA;{code} > KAFKA source will be reused according to the SQL above. > But if i add different table hint to dml, like: > {code:java} > Insert into sinktable select * from KAFKA /*+ OPTIONS('k1' = 'v1')*/; > Insert into sinktable1 select * from KAFKA /*+ OPTIONS('k2' = 'v2')*/; > {code} > There will be two kafka tableSources use the same groupid to consumer the > same topic. > So I think dynamic option may not be a part of the table digest. -- This message was sent by Atlassian Jira (v8.3.4#803005)