[ https://issues.apache.org/jira/browse/FLINK-18481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17151164#comment-17151164 ]
Jark Wu commented on FLINK-18481:
---------------------------------

1. Could you try sleeping for a while after the first job submission? Job submission through {{executeSql()}} is asynchronous.
2. Could you try the blink planner instead?

> Kafka connector can't select data
> ---------------------------------
>
>                 Key: FLINK-18481
>                 URL: https://issues.apache.org/jira/browse/FLINK-18481
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.12.0, 1.11.1
>            Reporter: initsun
>            Priority: Major
>
> When I use Flink 1.11-SNAPSHOT or 1.12-SNAPSHOT with Flink SQL and the Kafka
> connector, such as
> “EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
> StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
> String createA = "CREATE TABLE MyUserTable (\n" +
> " t1 STRING,\n" +
> " t2 INT\n" +
> ") WITH (\n" +
> " 'connector.type' = 'kafka',\n" +
> " 'connector.version' = '0.11',\n" +
> " 'connector.topic' = 'csvtb',\n" +
> " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
> " 'connector.startup-mode' = 'earliest-offset',\n" +
> " 'format.type' = 'csv'\n" +
> ")\n";
> tableEnv.executeSql(createA);
> TableResult insert = tableEnv.executeSql("INSERT INTO MyUserTable VALUES('test',2)");
> insert.print();
> TableResult tableResult = tableEnv.executeSql("SELECT t1,t2 FROM MyUserTable");
> tableResult.print();”
> This code can insert data into Kafka, but the SELECT does not output any result. Why? Thank you.
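
Point 1 of the comment (asynchronous submission) can be illustrated without Flink. The following is a minimal sketch that uses only java.util.concurrent: the class AsyncSubmitSketch, the nested SubmittedJob type and the in-memory "sink" list are all hypothetical stand-ins, not Flink or Kafka APIs. It only shows why a read issued immediately after an asynchronous submission may see nothing, while a read issued after waiting for the job sees the row. Whether this is the actual cause of the reported behaviour is not established here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class AsyncSubmitSketch {

    /** Hypothetical stand-in for an asynchronously submitted INSERT job. */
    static final class SubmittedJob {
        // Models the delay between "submitted" and "actually running".
        private final CountDownLatch start = new CountDownLatch(1);
        private final CompletableFuture<Void> done;

        SubmittedJob(List<String> sink, String row) {
            // The constructor returns immediately; the write happens on another thread.
            this.done = CompletableFuture.runAsync(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                sink.add(row);
            });
        }

        /** Lets the job run and blocks until its write has finished. */
        void awaitCompletion() {
            start.countDown();
            done.join();
        }
    }

    /** Reads the sink right after submission, before the job has run. */
    static List<String> readWithoutWaiting() {
        List<String> sink = new CopyOnWriteArrayList<>();
        SubmittedJob job = new SubmittedJob(sink, "test,2");
        List<String> seen = new ArrayList<>(sink); // snapshot taken too early
        job.awaitCompletion();                     // clean up the background task
        return seen;
    }

    /** Waits for the job to finish, then reads the sink. */
    static List<String> readAfterWaiting() {
        List<String> sink = new CopyOnWriteArrayList<>();
        SubmittedJob job = new SubmittedJob(sink, "test,2");
        job.awaitCompletion();
        return new ArrayList<>(sink);
    }

    public static void main(String[] args) {
        System.out.println(readWithoutWaiting()); // prints []
        System.out.println(readAfterWaiting());   // prints [test,2]
    }
}
```

The latch makes the early read deterministic for the purpose of the sketch; in a real job the timing depends on the cluster, which is why the comment suggests sleeping (or otherwise waiting) before issuing the SELECT.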