[ 
https://issues.apache.org/jira/browse/FLINK-17892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17115646#comment-17115646
 ] 

hailong wang commented on FLINK-17892:
--------------------------------------

Yes, If the planner to reuse a source one starts from the earilest offset, the 
other starts from the latest offset, It is right not to reuse source.

My example is as follows:
{code:java}
CREATE TABLE sourceTable (
    id BIGINT,
    b1 ROW(q1 STRING, q2 STRING)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.10',
    'connector.topic' = '**',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = '**',
    'connector.properties.bootstrap.servers' = '**',
    'connector.properties.group.id' = 'mygroupid',
    'format.type' = 'json'
);CREATE TABLE sinkTable (
    id BIGINT,
    b1q1 STRING
) WITH (
    'connector.type' = '**',
    'connector.version' = '0.10',
    'connector.topic' = '**',
    'connector.properties.zookeeper.connect' = '**',
    'connector.properties.bootstrap.servers' = '**',
    'format.type' = 'json'
);CREATE TABLE sinkTable1 (
    id BIGINT,
    b1q1 STRING
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.10',
    'connector.topic' = '**',
    'connector.properties.zookeeper.connect' = '**',
    'connector.properties.bootstrap.servers' = '**',
    'format.type' = 'json'
);
INSERT INTO sinkTable SELECT id, b1.q1 from sourceTable /*+ 
OPTIONS('connector.startup-mode' = 'earliest-offset') */;
INSERT INTO sinkTable1 SELECT id, b1.q1 from sourceTable /*+ 
OPTIONS('connector.startup-mode' = 'latest-offset') */;
{code}
The streamGraph is:

!image-2020-05-25-10-47-18-829.png|width=936,height=285!

>From the side of different connector.startup-mode, source not be reused is 
>right.

But, DataSource(ID=1) and DataSource(ID=6) use the same groupid consume the 
same topic, this will lead to sinkTable and sinkTable1 get a random piece of 
data of this topic.

Maybe the correct dag is:

!image-2020-05-25-10-50-53-033.png|width=1242,height=324!

 

 

> Dynamic option may not be a part of the table digest
> ----------------------------------------------------
>
>                 Key: FLINK-17892
>                 URL: https://issues.apache.org/jira/browse/FLINK-17892
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.0
>            Reporter: hailong wang
>            Priority: Major
>         Attachments: image-2020-05-25-10-47-18-829.png, 
> image-2020-05-25-10-50-53-033.png
>
>
> For now, Table properties not be a part of table digest, but dynamic option 
> will be included.
> This will lead to an error when plan reused.
> if I defines a kafka table:
> {code:java}
> CREATE TABLE KAFKA (
> ……
> ) with (
> topic = 'xx',
> groupid = 'xxx'
> ……
> )
> Insert into sinktable select * from KAFKA;
> Insert into sinktable1 select * from KAFKA;{code}
> KAFKA source will be reused according to the SQL above.
> But if i add different table hint to dml, like:
> {code:java}
> Insert into sinktable select * from KAFKA /*+ OPTIONS('k1' = 'v1')*/;
> Insert into sinktable1 select * from KAFKA /*+ OPTIONS('k2' = 'v2')*/;
> {code}
> There will be two kafka tableSources  use the same groupid to  consumer the 
> same topic.
> So I think dynamic option may not be a part of the table digest.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to