Hi,Flink 里面 watermark 要求时间字段是毫秒级别的,你可以看下你的 watermark 是否正常。感觉可能是这里的问题。
steven chen <stevenche...@163.com> 于2020年5月29日周五 下午2:34写道: > 数据每次都能进来,并且统计,但是为什么结果insert 不会保存到mysql 中?是sql的问题?还是?求大神解答 > CREATE TABLE user_behavior ( > > itemCode VARCHAR, > > ts BIGINT COMMENT '时间戳', > > t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'yyyy-MM-dd HH:mm:ss')), > > proctime as PROCTIME(), > > WATERMARK FOR t as t - INTERVAL '5' SECOND > > ) WITH ( > > 'connector.type' = 'kafka', > > 'connector.version' = '0.11', > > 'connector.topic' = 'scan-flink-topic', > > 'connector.properties.group.id' ='qrcode_pv_five_min', > > 'connector.startup-mode' = 'latest-offset', > > 'connector.properties.zookeeper.connect' = 'localhost:2181', > > 'connector.properties.bootstrap.servers' = 'localhost:9092', > > 'update-mode' = 'append', > > 'format.type' = 'json', > > 'format.derive-schema' = 'true' > ); > CREATE TABLE pv_five_min ( > item_code VARCHAR, > dt VARCHAR, > dd VARCHAR, > pv BIGINT > ) WITH ( > 'connector.type' = 'jdbc', > 'connector.url' = 'jdbc:mysql://127.0.0.1:3306/qrcode', > 'connector.table' = 'qrcode_pv_five_min', > 'connector.driver' = 'com.mysql.jdbc.Driver', > 'connector.username' = 'root', > 'connector.password' = 'root', > 'connector.write.flush.max-rows' = '1' > ); > INSERT INTO pv_five_min > SELECT > itemCode As item_code, > DATE_FORMAT(TUMBLE_START(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dt, > DATE_FORMAT(TUMBLE_END(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dd, > COUNT(*) AS pv > FROM user_behavior > GROUP BY TUMBLE(t, INTERVAL '5' MINUTE),itemCode; > > > > > > > -- Best, Benchao Li