Note that you posted to the English-speaking mailing list. For the Chinese-speaking list, please use user...@flink.apache.org.

On Thu, Dec 24, 2020 at 3:39 PM Appleyuchi <appleyu...@163.com> wrote:

> This is with Flink 1.12. A Kafka consumer can read the data from the topic,
> but the code below does not read any data: it runs without errors and
> produces no output. Any help is appreciated, thanks.
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.{EnvironmentSettings, Table}
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.types.Row
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>
> import scala.math.Ordering.Int
>
> object FlinkKafkaDDLDemo
> {
>
>   def main(args: Array[String]): Unit =
>   {
>
>     val env: StreamExecutionEnvironment =
>       StreamExecutionEnvironment.getExecutionEnvironment
>
>     env.setParallelism(3)
>
>     val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     val bsSettings = EnvironmentSettings.newInstance()
>       .useBlinkPlanner()
>       .inStreamingMode()
>       .build()
>     val tEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>
>     val createTable =
>       """
>         |CREATE TABLE PERSON (
>         |    name VARCHAR COMMENT 'name',
>         |    age VARCHAR COMMENT 'age',
>         |    city VARCHAR COMMENT 'city',
>         |    address VARCHAR COMMENT 'home address',
>         |    ts TIMESTAMP(3) COMMENT 'timestamp'
>         |)
>         |WITH (
>         |    'connector.type' = 'kafka', -- use the kafka connector
>         |    'connector.version' = 'universal', -- kafka version
>         |    'connector.topic' = 'kafka_ddl', -- kafka topic
>         |    'connector.startup-mode' = 'earliest-offset', -- start reading from the earliest offset
>         |    'connector.properties.0.key' = 'zookeeper.connect', -- connection info
>         |    'connector.properties.0.value' = 'Desktop:2181',
>         |    'connector.properties.1.key' = 'bootstrap.servers',
>         |    'connector.properties.1.value' = 'Desktop:9091',
>         |    'update-mode' = 'append',
>         |    'format.type' = 'json', -- the source data format is json
>         |    'format.derive-schema' = 'true' -- derive the json parsing rules from the DDL schema
>         |)
>       """.stripMargin
>
>     tEnv.executeSql(createTable)
>
>     val query: String = """SELECT name,COUNT(age) FROM PERSON GROUP BY name""".stripMargin
>
>     val result: Table = tEnv.sqlQuery(query)
>     tEnv.toRetractStream[Row](result).print()
>     // tEnv.execute("Flink SQL DDL")
>
>   }
>
> }

--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData
--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng