[ https://issues.apache.org/jira/browse/FLINK-18481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17151166#comment-17151166 ]
initsun edited comment on FLINK-18481 at 7/4/20, 4:29 AM:
----------------------------------------------------------

{quote}
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
String createA = "CREATE TABLE MyUserTable (\n" +
    " t1 STRING,\n" +
    " t2 INT\n" +
    ") WITH (\n" +
    " 'connector.type' = 'kafka', \n" +
    " 'connector.version' = '0.11',\n" +
    " 'connector.topic' = 'csvtb', \n" +
    " 'connector.properties.bootstrap.servers' = 'localhost:9092', \n" +
    " 'connector.startup-mode' = 'earliest-offset', \n" +
    " 'format.type' = 'csv'\n" +
    ")\n";
bsTableEnv.executeSql(createA);
TableResult insert = bsTableEnv.executeSql("INSERT INTO MyUserTable VALUES('test111',2)");
insert.print();
Thread.sleep(10000);
TableResult tableResult = bsTableEnv.executeSql("SELECT t1,t2 FROM MyUserTable");
tableResult.print();
{quote}

Still not

> Kafka connector can't select data
> ---------------------------------
>
> Key: FLINK-18481
> URL: https://issues.apache.org/jira/browse/FLINK-18481
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.12.0, 1.11.1
> Reporter: initsun
> Priority: Major
>
> When I use Flink 1.11-SNAPSHOT or 1.12-SNAPSHOT with Flink SQL and the Kafka connector, such as
> “EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
> StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
> String createA = "CREATE TABLE MyUserTable (\n" +
> " t1 STRING,\n" +
> " t2 INT\n" +
> ") WITH (\n" +
> " 'connector.type' = 'kafka', \n" +
> " 'connector.version' = '0.11',\n" +
> " 'connector.topic' = 'csvtb', \n" +
> " 'connector.properties.bootstrap.servers' = 'localhost:9092', \n" +
> " 'connector.startup-mode' = 'earliest-offset', \n" +
> " 'format.type' = 'csv'\n" +
> ")\n";
> tableEnv.executeSql(createA);
> TableResult insert = tableEnv.executeSql("INSERT INTO MyUserTable VALUES('test',2)");
> insert.print();
> TableResult tableResult = tableEnv.executeSql("SELECT t1,t2 FROM MyUserTable");
> tableResult.print();”
> This code inserts data into Kafka, but the SELECT does not output any result. Why? Thank you.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)