Hi,
this is because in Flink 1.11, executeSql is an asynchronous API. When you run the program in IDEA, the main method returns right away and the process exits. You need to hold on to the TableResult returned by executeSql and then call

    tableResult.getJobClient().get()
        .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
        .get();

to wait for the job to finish.
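Applied to the quoted program below, the tail of the main method would look roughly like this (a sketch against the Flink 1.11 Table API; tEnv and the DDL strings come from Daniel's code, and checked exceptions are simply declared rather than handled):

```java
import org.apache.flink.table.api.TableResult;

// ... sourceDDL and sinkDDL have already been registered via tEnv.executeSql(...) ...

// executeSql only *submits* the INSERT job and returns immediately,
// so keep the returned TableResult instead of discarding it.
TableResult tableResult = tEnv.executeSql(transformationDDL);

// Block the main thread until the streaming job terminates; otherwise the
// IDE process exits before the print sink ever emits a row.
tableResult.getJobClient().get()
        .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
        .get();
```

Note that getJobClient() returns an Optional, so .get() on it assumes the job was actually submitted.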

Best,
Xingbo

DanielGu <[email protected]> wrote on Fri, Aug 14, 2020, at 11:45 AM:

> I've run into a problem and would appreciate some pointers:
> Environment: Flink 1.11, running in IDEA.
> Following wuchong's demo, I tried to port the client to Java; the first
> example counts the number of purchases per hour.
> The data is read fine, but nothing is output: writing to Elasticsearch does
> nothing, and switching to a print sink still shows nothing.
> https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN
> Any help appreciated.
>
>
>
> Below are the pom, the code, and the run output.
>
>         // create the execution environment
>         StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>         bsEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>         // set the StateBackend
>         bsEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/chkdir"));
>         EnvironmentSettings bsSettings = EnvironmentSettings
>                 .newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>
>         // Kafka
>         String sourceDDL ="CREATE TABLE user_behavior (" +
>                 "user_id BIGINT," +
>                 "item_id BIGINT," +
>                 "category_id BIGINT," +
>                 "behavior STRING," +
>                 "ts TIMESTAMP(3)," +
>                 "proctime AS PROCTIME()," +
>                 "WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) " +
>                 "WITH (" +
>                 "'connector'='kafka'," +
>                 "'topic'='user_behavior'," +
>                 "'scan.startup.mode'='earliest-offset'," +
>                 "'properties.bootstrap.servers'='localhost:9092'," +
>                 "'format'='json'" +
>                 ")";
>
>
>         // ES sink, replaced with a print sink
> /*        String sinkDDL = "CREATE TABLE buy_cnt_per_hour (" +
>                 "hour_of_day BIGINT," +
>                 "buy_cnt BIGINT" +
>                 ") WITH (" +
>                 "'connector'='elasticsearch-7'," +
>                 "'hosts'='http://localhost:9200'," +
>                 "'index'='buy_cnt_per_hour')";*/
>         String sinkDDL = "CREATE TABLE buy_cnt_per_hour (\n" +
>                  "hour_of_day BIGINT," +
>                  "buy_cnt BIGINT" +
>                 ") WITH (\n" +
>                 " 'connector' = 'print'\n" +
>                 ")";
>
>
>         String transformationDDL = "INSERT INTO buy_cnt_per_hour\n" +
>                 "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as hour_of_day, COUNT(*) as buy_cnt\n" +
>                 "FROM user_behavior\n" +
>                 "WHERE behavior = 'buy'\n" +
>                 "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";
>
>
>
>         // register the source and sink
>         tEnv.executeSql(sourceDDL);
>         tEnv.executeSql(sinkDDL);
> //        tableResult.print();
>
>        tEnv.executeSql(transformationDDL);
>
> pom:
> <dependencies>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
>             <version>${flink.version}</version>
>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
>             <version>${flink.version}</version>
>
>         </dependency>
>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-common</artifactId>
>             <version>${flink.version}</version>
>             <scope>provided</scope>
>         </dependency>
>
>
>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-clients_${scala.version}</artifactId>
>             <version>${flink.version}</version>
>
>         </dependency>
>
>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-json</artifactId>
>             <version>${flink.version}</version>
>
>         </dependency>
>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-sql-connector-kafka_${scala.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>
>
>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>
>
>         <dependency>
>             <groupId>mysql</groupId>
>             <artifactId>mysql-connector-java</artifactId>
>             <version>${mysql.version}</version>
>         </dependency>
>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-runtime-web_${scala.version}</artifactId>
>             <version>${flink.version}</version>
>             <scope>provided</scope>
>         </dependency>
> </dependencies>
>
> Run output:
> 01:15:12,358 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
> -
> Kafka version: unknown
> 01:15:12,358 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
> -
> Kafka commitId: unknown
> 01:15:12,358 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
> -
> Kafka startTimeMs: 1597338912355
> 01:15:12,361 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer
>
> - [Consumer clientId=consumer-20, groupId=null] Subscribed to partition(s):
> user_behavior-0
> 01:15:12,365 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState
>
> - [Consumer clientId=consumer-20, groupId=null] Seeking to EARLIEST offset
> of partition user_behavior-0
> 01:15:12,377 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata  -
> [Consumer
> clientId=consumer-20, groupId=null] Cluster ID: txkqox8yRL6aWBNsOcS67g
> 01:15:12,387 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState
>
> - [Consumer clientId=consumer-20, groupId=null] Resetting offset for
> partition user_behavior-0 to offset 0.
> 01:15:12,545 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1 (type=CHECKPOINT) @ 1597338912539 for job
> c10220b65246e8269defa48f441a7e09.
> 01:15:12,709 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 1 for job c10220b65246e8269defa48f441a7e09 (14080
> bytes in 169 ms).
> 01:15:17,541 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 2 (type=CHECKPOINT) @ 1597338917540 for job
> c10220b65246e8269defa48f441a7e09.
> 01:15:17,553 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 2 for job c10220b65246e8269defa48f441a7e09 (14752
> bytes in 11 ms).
> 01:15:22,546 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 3 (type=CHECKPOINT) @ 1597338922545 for job
> c10220b65246e8269defa48f441a7e09.
> 01:15:22,558 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 3 for job c10220b65246e8269defa48f441a7e09 (15004
> bytes in 12 ms).
>
>
> Raw data:
>
> 3>
> +I(999602,4024409,883960,cart,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
> 3> +I(30616,1693200,4022701,pv,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
> 3>
> +I(145183,3533745,1102540,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
> 3>
> +I(323010,3376212,1574064,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
> 3>
> +I(944547,2640409,2465336,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
> 3> +I(232939,1976318,411153,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
> 3>
> +I(355996,5161162,1582197,buy,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
> 3>
> +I(443987,3791622,1464116,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>