hi
I am using Flink 1.11.0.
The code is as follows:
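import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment streamEnv =
    StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);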
tableEnvironment.executeSql(" " +
" CREATE TABLE mySource ( " +
" a bigint, " +
" b bigint " +
" ) WITH ( " +
" 'connector.type' = 'kafka', " +
" 'connector.version' = 'universal', " +
" 'connector.topic' = 'mytesttopic', " +
" 'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
" 'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
" 'connector.properties.group.id' = 'flink-test-cxy', " +
" 'connector.startup-mode' = 'latest-offset', " +
" 'format.type' = 'json' " +
" ) ");
tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
" id bigint, " +
" game_id varchar, " +
" PRIMARY KEY (id) NOT ENFORCED " +
" ) " +
" with ( " +
" 'connector.type' = 'jdbc', " +
" 'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
" 'connector.username' = 'root' , " +
" 'connector.password' = 'root', " +
" 'connector.table' = 'mysqlsink' , " +
" 'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
" 'connector.write.flush.interval' = '2s', " +
" 'connector.write.flush.max-rows' = '300' " +
" )");
tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values " +
"(select a,cast(b as varchar) b from mySource)");
Question 1: The insert statement above fails with the following error:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply
'$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(BIGINT A,
VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE
FIELD)>)'
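Question 2: If I change the insert to tableEnvironment.executeSql("insert into mysqlsink select a,cast(b as varchar) b from mySource"); the job runs, but it fails with the following error whenever a row arrives whose primary key already exists in the table: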
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry
'1' for key 'PRIMARY'
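
One direction that may be worth trying for Question 2, which I have not verified against this setup: the Flink 1.11 JDBC connector documentation describes a newer set of option keys ('connector' = 'jdbc', 'url', 'table-name', and so on) and states that the sink operates in upsert mode when a primary key is defined in the DDL. The sketch below only builds such a DDL string. The class name JdbcSinkDdl is hypothetical, the connection values are copied from the DDL above, and the assumption that switching option keys removes the duplicate-key error is untested.

```java
// Sketch only: builds a sink DDL using the Flink 1.11 "new style" JDBC option keys.
// JdbcSinkDdl is a hypothetical helper name; values are copied from the DDL above.
public class JdbcSinkDdl {

    /** Returns a CREATE TABLE statement that uses 'connector' = 'jdbc' option keys. */
    public static String build() {
        return String.join("\n",
            "CREATE TABLE mysqlsink (",
            "  id BIGINT,",
            "  game_id VARCHAR,",
            // The declared primary key is what the documentation ties to upsert mode.
            "  PRIMARY KEY (id) NOT ENFORCED",
            ") WITH (",
            "  'connector' = 'jdbc',",
            "  'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false',",
            "  'table-name' = 'mysqlsink',",
            "  'username' = 'root',",
            "  'password' = 'root',",
            "  'driver' = 'com.mysql.cj.jdbc.Driver',",
            "  'sink.buffer-flush.interval' = '2s',",
            "  'sink.buffer-flush.max-rows' = '300'",
            ")");
    }

    public static void main(String[] args) {
        // In the job, this string would be passed to tableEnvironment.executeSql(...).
        System.out.println(build());
    }
}
```

The result of JdbcSinkDdl.build() would replace the second executeSql argument above; the insert statement from Question 2 stays unchanged.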