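I've run into a problem and would like to ask for help.
Environment: Flink 1.11, running in IDEA
I'm following wuchong's demo and trying to port the client to Java, starting with the first example, which counts the number of purchases per hour.
The data can be read from Kafka, but no results are output. Writing to Elasticsearch produced nothing, and after switching to a print sink there is still nothing.
https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN
Any help would be appreciated.
Below are the pom, the code, and the run output.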
// Create the execution environment
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// Set the state backend
bsEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/chkdir"));
EnvironmentSettings bsSettings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// Kafka
String sourceDDL ="CREATE TABLE user_behavior (" +
"user_id BIGINT," +
"item_id BIGINT," +
"category_id BIGINT," +
"behavior STRING," +
"ts TIMESTAMP (3)," +
"proctime AS PROCTIME ()," +
"WATERMARK FOR ts AS ts-INTERVAL '5' SECOND) " +
"WITH (" +
"'connector'='kafka'," +
"'topic'='user_behavior'," +
"'scan.startup.mode'='earliest-offset'," +
"'properties.bootstrap.servers'='localhost:9092'," +
"'format'='json'" +
")";
// Elasticsearch sink (commented out), replaced by the print sink below
/* String sinkDDL = "CREATE TABLE buy_cnt_per_hour (" +
"hour_of_day BIGINT," +
"buy_cnt BIGINT" +
") WITH (" +
"'connector'='elasticsearch-7'," +
"'hosts'='http://localhost:9200'," +
"'index'='buy_cnt_per_hour')";*/
String sinkDDL = "CREATE TABLE buy_cnt_per_hour (\n" +
"hour_of_day BIGINT," +
"buy_cnt BIGINT" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")";
String transformationDDL= "INSERT INTO buy_cnt_per_hour\n" +
"SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as hour_of_day, COUNT(*) as buy_cnt\n" +
"FROM user_behavior\n" +
"WHERE behavior = 'buy'\n" +
"GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";
// Register the source and sink tables
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// tableResult.print();
tEnv.executeSql(transformationDDL);
pom
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
Run output
01:15:12,358 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka version: unknown
01:15:12,358 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: unknown
01:15:12,358 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1597338912355
01:15:12,361 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-20, groupId=null] Subscribed to partition(s): user_behavior-0
01:15:12,365 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-20, groupId=null] Seeking to EARLIEST offset of partition user_behavior-0
01:15:12,377 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-20, groupId=null] Cluster ID: txkqox8yRL6aWBNsOcS67g
01:15:12,387 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-20, groupId=null] Resetting offset for partition user_behavior-0 to offset 0.
01:15:12,545 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 (type=CHECKPOINT) @ 1597338912539 for job c10220b65246e8269defa48f441a7e09.
01:15:12,709 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job c10220b65246e8269defa48f441a7e09 (14080 bytes in 169 ms).
01:15:17,541 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1597338917540 for job c10220b65246e8269defa48f441a7e09.
01:15:17,553 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job c10220b65246e8269defa48f441a7e09 (14752 bytes in 11 ms).
01:15:22,546 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 3 (type=CHECKPOINT) @ 1597338922545 for job c10220b65246e8269defa48f441a7e09.
01:15:22,558 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3 for job c10220b65246e8269defa48f441a7e09 (15004 bytes in 12 ms).
Raw data
3> +I(999602,4024409,883960,cart,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
3> +I(30616,1693200,4022701,pv,2017-11-27T00:07:36,2020-08-13T17:16:20.440)
3> +I(145183,3533745,1102540,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(323010,3376212,1574064,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(944547,2640409,2465336,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(232939,1976318,411153,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(355996,5161162,1582197,buy,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
3> +I(443987,3791622,1464116,pv,2017-11-27T00:07:37,2020-08-13T17:16:20.440)
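For reference, here is a minimal sketch of the window arithmetic that the DDL above implies. It assumes standard Flink event-time semantics (a tumbling window fires once the watermark reaches the window end minus 1 ms) and a watermark equal to the largest ts seen minus 5 seconds. This is plain Java, not Flink API; the class and method names are hypothetical, and the 01:00:05 timestamp is an illustrative value that does not appear in the sample data.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of Flink: models TUMBLE(ts, INTERVAL '1' HOUR)
// together with WATERMARK FOR ts AS ts - INTERVAL '5' SECOND.
class TumbleWindowSketch {

    // Start of the 1-hour tumbling window that contains ts (aligned to the hour).
    static LocalDateTime windowStart(LocalDateTime ts) {
        return ts.truncatedTo(ChronoUnit.HOURS);
    }

    // Exclusive end of that window.
    static LocalDateTime windowEnd(LocalDateTime ts) {
        return windowStart(ts).plusHours(1);
    }

    // Watermark derived from the largest event timestamp seen so far.
    static LocalDateTime watermark(LocalDateTime maxTsSeen, Duration delay) {
        return maxTsSeen.minus(delay);
    }

    // Assumption: the window fires when watermark >= windowEnd - 1 ms.
    static boolean windowFired(LocalDateTime ts, LocalDateTime maxTsSeen, Duration delay) {
        LocalDateTime lastMillis = windowEnd(ts).minus(1, ChronoUnit.MILLIS);
        return !watermark(maxTsSeen, delay).isBefore(lastMillis);
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofSeconds(5);
        // Timestamp of the 'buy' row in the raw data above.
        LocalDateTime buy = LocalDateTime.parse("2017-11-27T00:07:37");
        // Illustrative later timestamp, not taken from the sample data.
        LocalDateTime later = LocalDateTime.parse("2017-11-27T01:00:05");

        System.out.println("window: [" + windowStart(buy) + ", " + windowEnd(buy) + ")");
        System.out.println("fired with max ts " + buy + ": " + windowFired(buy, buy, delay));
        System.out.println("fired with max ts " + later + ": " + windowFired(buy, later, delay));
    }
}
```

Under these assumptions, the rows shown above all fall into the window starting at 2017-11-27T00:00, and the largest ts among them (00:07:37) gives a watermark of 00:07:32, which is before the window end.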