代码结构改成这样的了:




val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment

val blinkEnvSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, 
blinkEnvSettings)





streamExecutionEnv.execute("from kafka sink hbase")




还是报一样的错











在 2020-07-08 15:40:41,"夏帅" <jkill...@dingtalk.com.INVALID> 写道:
>你好,
>可以看看你的代码结构是不是以下这种
>    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>    val bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
>    val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>  ......
>    tableEnv.execute("")
>如果是的话,可以尝试使用bsEnv.execute("")
>1.11对于两者的execute代码实现有改动
>
>
>------------------------------------------------------------------
>发件人:Zhou Zach <wander...@163.com>
>发送时间:2020年7月8日(星期三) 15:30
>收件人:Flink user-zh mailing list <user-zh@flink.apache.org>
>主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology
>
>代码在flink 
>1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
>Exception in thread "main" java.lang.IllegalStateException: No operators 
>defined in streaming topology. Cannot generate StreamGraph.
>at 
>org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
>at 
>org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
>at 
>org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
>at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
>at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)
>
>
>但是,数据是正常sink到了hbase,是不是executeSql误报了。。。
>
>
>
>
>query:
>streamTableEnv.executeSql(
>      """
>        |
>        |CREATE TABLE `user` (
>        |    uid BIGINT,
>        |    sex VARCHAR,
>        |    age INT,
>        |    created_time TIMESTAMP(3),
>        |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
>        |) WITH (
>        |    'connector.type' = 'kafka',
>        |    'connector.version' = 'universal',
>        |    -- 'connector.topic' = 'user',
>        |    'connector.topic' = 'user_long',
>        |    'connector.startup-mode' = 'latest-offset',
>        |    'connector.properties.group.id' = 'user_flink',
>        |    'format.type' = 'json',
>        |    'format.derive-schema' = 'true'
>        |)
>        |""".stripMargin)
>
>
>
>
>
>
>    streamTableEnv.executeSql(
>      """
>        |
>        |CREATE TABLE user_hbase3(
>        |    rowkey BIGINT,
>        |    cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
>        |) WITH (
>        |    'connector.type' = 'hbase',
>        |    'connector.version' = '2.1.0',
>        |    'connector.table-name' = 'user_hbase2',
>        |    'connector.zookeeper.znode.parent' = '/hbase',
>        |    'connector.write.buffer-flush.max-size' = '10mb',
>        |    'connector.write.buffer-flush.max-rows' = '1000',
>        |    'connector.write.buffer-flush.interval' = '2s'
>        |)
>        |""".stripMargin)
>
>
>    streamTableEnv.executeSql(
>      """
>        |
>        |insert into user_hbase3
>        |SELECT uid,
>        |
>        |  ROW(sex, age, created_time ) as cf
>        |  FROM  (select uid,sex,age, cast(created_time as VARCHAR) as 
> created_time from `user`)
>        |
>        |""".stripMargin)
>
>
>
>
>
>
>
>

回复