您好:

       非常感谢您的建议,我已经成功解决了这个问题,但是我又发现了一个新的问题,我这里设置的超时时间是一分钟或者超时行数是5000行,
我在这期间更新了维表数据,但是我发现已经超过了超时时间,输出结果仍然没有被更新,是我理解的有问题么?
我尝试了停止输入流数据直到达到超时时间后仍然没有更新维表,除非停止整个程序,否则我的维表数据都不会被更新。
请问这个问题有解决的办法么?

def register_mysql_source(st_env):
    source_ddl = \
    """
    CREATE TABLE dim_mysql (
    id int,  -- 
    type varchar -- 
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3390/test',
    'table-name' = 'flink_test',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = '****',
    'password' = '****',
    'lookup.cache.max-rows' = '5000',
    'lookup.cache.ttl' = '1s',
    'lookup.max-retries' = '3'
    )
    """    
    st_env.sql_update(source_ddl)
  


                                      感谢!




琴师

 
发件人: Leonard Xu
发送时间: 2020-07-22 10:54
收件人: user-zh
主题: Re: flinksql1.11中主键声明的问题
Hi,
 
你这还是connector的with参数里不是新 connector的写法[1],会走到老代码,老代码不支持声明PK的。
在老代码里,PK是通过query推导的,你用inner join替换left join后,应该是能够推断出PK了,所以没有报错。
我理解你把connector的with参数更新成新的就解决问题了。
 
Best
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options>
> 
> def register_rides_source(st_env):
>    source_ddl = \
>    """
>    create table source1(
>     id int,
>     time1 varchar ,
>     type string
>     ) with (
>    'connector.type' = 'kafka',
>    'connector.topic' = 'tp1',
>    'connector.startup-mode' = 'latest-offset',
>    'connector.properties.bootstrap.servers' = 'localhost:9092',
>    'connector.properties.zookeeper.connect' = 'localhost:2181',
>    'format.type' = 'json',
>    'connector.version' = 'universal',
>    'update-mode' = 'append'
>     )
>    “"" 

回复