Hi,

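In Flink, the watermark requires the time field to be in milliseconds. You could check whether your watermark is advancing normally. I suspect that is where the problem is.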
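One quick way to check this, as a rough sketch: query the source table directly in the SQL client and look at the derived column `t`. The query uses only the table and column names from your DDL. The idea that `ts` might be in epoch seconds rather than milliseconds is an assumption to verify against your data, not something I can see from the mail.

```sql
-- Sketch: inspect the raw and derived time values from the source table.
-- Uses only the table and column names from the quoted DDL.
SELECT
  itemCode,
  ts,  -- raw BIGINT value read from Kafka
  t    -- TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss'))
FROM user_behavior;

-- If t shows dates near 1970-01-01, ts is probably in seconds rather than
-- milliseconds (an assumption to check). FROM_UNIXTIME expects seconds, so
-- in that case the computed column would drop the division:
--   t AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')),
```

If `ts` really is in seconds, dividing by 1000 makes event time advance about 1000 times slower than real time. A 5-minute tumbling window would then need roughly 300,000 seconds (about 3.5 days) of incoming data before the watermark passes its end, which would look like the INSERT never writing anything to MySQL.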


steven chen <stevenche...@163.com> wrote on Fri, May 29, 2020 at 2:34 PM:

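> The data comes in every time and gets aggregated, but why is the INSERT result not saved to MySQL? Is it a problem with the SQL, or something else? Any help would be appreciated.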
> CREATE TABLE user_behavior (
> itemCode VARCHAR,
> ts BIGINT COMMENT 'timestamp',
> t as TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),
> proctime as PROCTIME(),
> WATERMARK FOR t as t - INTERVAL '5' SECOND
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = '0.11',
> 'connector.topic' = 'scan-flink-topic',
> 'connector.properties.group.id' = 'qrcode_pv_five_min',
> 'connector.startup-mode' = 'latest-offset',
> 'connector.properties.zookeeper.connect' = 'localhost:2181',
> 'connector.properties.bootstrap.servers' = 'localhost:9092',
> 'update-mode' = 'append',
> 'format.type' = 'json',
> 'format.derive-schema' = 'true'
> );
> CREATE TABLE pv_five_min (
> item_code VARCHAR,
> dt VARCHAR,
> dd VARCHAR,
> pv BIGINT
> ) WITH (
> 'connector.type' = 'jdbc',
> 'connector.url' = 'jdbc:mysql://127.0.0.1:3306/qrcode',
> 'connector.table' = 'qrcode_pv_five_min',
> 'connector.driver' = 'com.mysql.jdbc.Driver',
> 'connector.username' = 'root',
> 'connector.password' = 'root',
> 'connector.write.flush.max-rows' = '1'
> );
> INSERT INTO pv_five_min
> SELECT
> itemCode As item_code,
> DATE_FORMAT(TUMBLE_START(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dt,
> DATE_FORMAT(TUMBLE_END(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dd,
> COUNT(*) AS pv
> FROM user_behavior
> GROUP BY TUMBLE(t, INTERVAL '5' MINUTE),itemCode;


-- 

Best,
Benchao Li
