gaoling ma created FLINK-18626: ---------------------------------- Summary: the result of aggregate SQL on streaming cannot write to upsert table sink Key: FLINK-18626 URL: https://issues.apache.org/jira/browse/FLINK-18626 Project: Flink Issue Type: Improvement Components: Connectors / JDBC, Table SQL / API Affects Versions: 1.11.0 Reporter: gaoling ma
{code:java} StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); bsEnv.setParallelism(1); ...... bsTableEnv.executeSql("CREATE TABLE aaa(\n" + " `area_code` VARCHAR,\n" + " `stat_date` DATE,\n" + " `index` BIGINT,\n" + " PRIMARY KEY (area_code, stat_date) NOT ENFORCED" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://***/laowufp_data_test',\n" + " 'table-name' = 'aaa',\n" + " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" + " 'username' = '***',\n" + " 'password' = '***'\n" + ")"); bsTableEnv.executeSql("INSERT INTO aaa SELECT area_code, CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是' GROUP BY area_code"); {code} When I write the aggregate SQL results into upsert stream JDBC table sink, the program automatically exits with no hint. The aggregate results suppose to be a restract stream, but another question is how to change the restract stream into upsert stream. Or there is a better way to continuous update the aggregate SQL results into JDBC table. Your comment is appreciated. -- This message was sent by Atlassian Jira (v8.3.4#803005)