[ https://issues.apache.org/jira/browse/FLINK-18549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dian Fu updated FLINK-18549: ---------------------------- Fix Version/s: (was: 1.11.1) 1.11.2 > flink 1.11 can not commit partition automatically > ------------------------------------------------- > > Key: FLINK-18549 > URL: https://issues.apache.org/jira/browse/FLINK-18549 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.11.0 > Reporter: Jun Zhang > Priority: Major > Fix For: 1.11.2 > > > I use the sql of flink 1.11, read from kafka and writing to hdfs, I found > that the partition cannot be submitted automatically. This is my complete > code。 > My checkpoint interval is 10s. I think it should be normal that there will be > _SUCCESS file under the partition of hdfs every 10s, but in fact there is no > > {code:java} > StreamExecutionEnvironment bsEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > bsEnv.enableCheckpointing(10000); > bsEnv.setParallelism(1); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv); > String sqlSource = "CREATE TABLE source_kafka (\n" + > " appName STRING,\n" + > " appVersion STRING,\n" + > " uploadTime STRING\n" + > ") WITH (\n" + > " 'connector.type' = 'kafka', \n" + > " 'connector.version' = '0.10',\n" + > " 'connector.topic' = 'test_topic',\n" + > " 'connector.properties.zookeeper.connect' = > 'localhost:2181',\n" + > " 'connector.properties.bootstrap.servers' = > 'localhost:9092',\n" + > " 'connector.properties.group.id' = 'testGroup',\n" > + > " 'format.type'='json',\n" + > " 'update-mode' = 'append' )"; > tEnv.executeSql(sqlSource); > String sql = "CREATE TABLE fs_table (\n" + > " appName STRING,\n" + > " appVersion STRING,\n" + > " uploadTime STRING,\n" + > " dt STRING," + > " h string" + > ") PARTITIONED BY (dt,h) WITH (\n" + > " 'connector'='filesystem',\n" + > " 'path'='hdfs://localhost/tmp/',\n" + > " 'sink.partition-commit.policy.kind' = 'success-file', > " + > " 'format'='orc'\n" + > ")"; > tEnv.executeSql(sql); > String insertSql = "insert into fs_table SELECT appName > ,appVersion,uploadTime, " + > " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), > DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka"; > tEnv.executeSql(insertSql); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)