Note that you posted to the english speaking mailing list. For the
Chinese-speaking version please use user...@flink.apache.org.

On Thu, Dec 24, 2020 at 3:39 PM Appleyuchi <appleyu...@163.com> wrote:

> 是Flink1.12的,kafka消费端能读取到数据,但是下面的代码无法读取到数据,运行后没有报错也没有输出,
> 求助,谢谢
>
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.{EnvironmentSettings, Table}
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.types.Row
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>
> import scala.math.Ordering.Int
>
>
>
> object FlinkKafkaDDLDemo
> {
>
>   def main(args: Array[String]): Unit =
>   {
>
>     val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
>
>     env.setParallelism(3)
>
>
>
>     val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     val bsSettings = EnvironmentSettings.newInstance()
>       .useBlinkPlanner()
>       .inStreamingMode()
>       .build()
>     val tEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>
>     val createTable =
>       """
>               |CREATE TABLE PERSON (
>
>               |    name VARCHAR COMMENT '姓名',
>
>               |    age VARCHAR COMMENT '年龄',
>
>               |    city VARCHAR COMMENT '所在城市',
>
>               |    address VARCHAR COMMENT '家庭住址',
>
>               |    ts TIMESTAMP(3) COMMENT '时间戳'
>
>               |)
>
>               |WITH (
>
>               |    'connector.type' = 'kafka', -- 使用 kafka connector
>
>               |    'connector.version' = 'universal',  -- kafka 版本
>
>               |    'connector.topic' = 'kafka_ddl',  -- kafka topic
>
>               |    'connector.startup-mode' = 'earliest-offset', -- 从最早的 
> offset 开始读取
>
>               |    'connector.properties.0.key' = 'zookeeper.connect',  -- 
> 连接信息
>
>               |    'connector.properties.0.value' = 'Desktop:2181',
>
>               |    'connector.properties.1.key' = 'bootstrap.servers',
>
>               |    'connector.properties.1.value' = 'Desktop:9091',
>
>               |    'update-mode' = 'append',
>
>               |    'format.type' = 'json',  -- 数据源格式为 json
>
>               |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 
> 解析规则
>
>               |)
>
>             """.stripMargin
>
>
>
>     tEnv.executeSql(createTable)
>
>
>
>     val query: String ="""SELECT name,COUNT(age) FROM PERSON GROUP BY 
> name""".stripMargin
>
>
>
>     val result: Table = tEnv.sqlQuery(query)
>
>     tEnv.toRetractStream[Row](result).print()
> //    tEnv.execute("Flink SQL DDL")
>
>   }
>
> }
>
>
>
>
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to