Hi,

你这还是connector的with参数里不是新 connector的写法[1],会走到老代码,老代码不支持声明PK的。
在老代码里,PK是通过query推导的,你用inner join替换left join后,应该是能够推断出PK了,所以没有报错。
我理解你把connector的with参数更新成新的就解决问题了。

Best
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options>
> 
> def register_rides_source(st_env):
>    source_ddl = \
>    """
>    create table source1(
>     id int,
>     time1 varchar ,
>     type string
>     ) with (
>    'connector.type' = 'kafka',
>    'connector.topic' = 'tp1',
>    'connector.startup-mode' = 'latest-offset',
>    'connector.properties.bootstrap.servers' = 'localhost:9092',
>    'connector.properties.zookeeper.connect' = 'localhost:2181',
>    'format.type' = 'json',
>    'connector.version' = 'universal',
>    'update-mode' = 'append'
>     )
>    “"" 

回复