[ 
https://issues.apache.org/jira/browse/FLINK-17892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17115712#comment-17115712
 ] 

Jark Wu commented on FLINK-17892:
---------------------------------

I see. Thanks for the explanation. The default start position is 
"GROUP_OFFSETS", which uses the committed offset of this group id. If the 
source is used by multiple pipelines without changing the default start position, 
the reading offset will be non-deterministic. 

I'm not sure why we use "GROUP_OFFSETS" as the default. Is it the most common 
case? I know it is the default behavior of {{FlinkKafkaConsumer}}, but I think 
Table/SQL is a little different: a registered Kafka source may be used 
in different jobs, and users may forget to change the default behavior. Shall we 
change it to "LATEST_OFFSET"? It is also the default behavior of 
{{kafka-console-consumer.sh}}.

What do you think [~twalthr], [~aljoscha]?

> Dynamic option may not be a part of the table digest
> ----------------------------------------------------
>
>                 Key: FLINK-17892
>                 URL: https://issues.apache.org/jira/browse/FLINK-17892
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.0
>            Reporter: hailong wang
>            Priority: Major
>         Attachments: image-2020-05-25-10-47-18-829.png, 
> image-2020-05-25-10-50-53-033.png
>
>
> For now, table properties are not part of the table digest, but dynamic 
> options are included.
> This leads to an error when the plan is reused.
> If I define a Kafka table:
> {code:sql}
> CREATE TABLE KAFKA (
> ……
> ) with (
> topic = 'xx',
> groupid = 'xxx'
> ……
> )
> Insert into sinktable select * from KAFKA;
> Insert into sinktable1 select * from KAFKA;{code}
> The KAFKA source will be reused for the SQL above.
> But if I add different table hints to the DML statements, like:
> {code:sql}
> Insert into sinktable select * from KAFKA /*+ OPTIONS('k1' = 'v1')*/;
> Insert into sinktable1 select * from KAFKA /*+ OPTIONS('k2' = 'v2')*/;
> {code}
> There will be two Kafka table sources that use the same groupid to consume 
> the same topic.
> So I think dynamic options should perhaps not be part of the table digest.
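
To make the digest argument in the quoted description concrete, here is a minimal Java sketch. It is not Flink's planner code: {{DigestSketch}}, {{Scan}}, and both digest functions are hypothetical names invented for illustration. It only assumes that scans with equal digests share one source and scans with different digests each get their own.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;

public class DigestSketch {

    /** Hypothetical stand-in for one table scan plus the options from its OPTIONS hint. */
    public static final class Scan {
        final String table;
        final Map<String, String> hintOptions;

        public Scan(String table, Map<String, String> hintOptions) {
            this.table = table;
            this.hintOptions = hintOptions;
        }
    }

    /** Digest that includes the hint options, as the issue describes for dynamic options. */
    public static String digestWithHints(Scan scan) {
        // TreeMap sorts the keys so that equal option sets always give equal digests.
        return scan.table + new TreeMap<>(scan.hintOptions);
    }

    /** Digest that ignores the hint options, as the issue proposes. */
    public static String digestWithoutHints(Scan scan) {
        return scan.table;
    }

    /** Assumption: one source instance is created per distinct digest. */
    public static int distinctSources(List<Scan> scans, Function<Scan, String> digest) {
        Set<String> digests = new HashSet<>();
        for (Scan scan : scans) {
            digests.add(digest.apply(scan));
        }
        return digests.size();
    }

    public static void main(String[] args) {
        // The two INSERT statements from the description, each with its own hint.
        List<Scan> scans = List.of(
                new Scan("KAFKA", Map.of("k1", "v1")),
                new Scan("KAFKA", Map.of("k2", "v2")));

        // Two sources, both of which would read with the table's single groupid.
        System.out.println(distinctSources(scans, DigestSketch::digestWithHints));    // prints 2
        // One shared source.
        System.out.println(distinctSources(scans, DigestSketch::digestWithoutHints)); // prints 1
    }
}
```

The sketch also shows the trade-off: if the hints are dropped from the digest, two scans whose options really differ would end up sharing one source, so which options the shared source should use is left open here.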



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
