补充sql:
DDL:
CREATE TABLE flink_recent_pv_subid
(
`supply_id` STRING,
`subid` STRING,
`mark` STRING,
`time` STRING,
`pv` BIGINT,
PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
) WITH (
'connector.type' = 'jdbc',
......
);
查询SQL:
INSERT INTO
flink_recent_pv_subid
SELECT
`sid`,
`subid`,
`mark`,
DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
'yyyyMMddHHmm') as `time`,
count(1) AS `pv`
FROM baidu_log_view
GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5' MINUTE);
赵一旦 <[email protected]> 于2020年11月23日周一 下午3:00写道:
> @hailongwang 一样的。
>
> 有个情况说明下,我是tumble window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
>
>
>
>
> hailongwang <[email protected]> 于2020年11月23日周一 下午2:39写道:
>
>> 数据库中主键的设置跟 primary key 定义的一样不?
>>
>>
>> Best,
>> Hailong
>> 在 2020-11-23 13:15:01,"赵一旦" <[email protected]> 写道:
>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
>> >duplicate方式写入。
>> >
>> >但我在使用中,发现报了 duplicate entry的错误。例如:
>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
>> >MySQLIntegrityConstraintViolationException: Duplicate entry
>> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
>> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>> > at sun.reflect.NativeConstructorAccessorImpl.newInstance(
>> >NativeConstructorAccessorImpl.java:62)
>> > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
>> >DelegatingConstructorAccessorImpl.java:45)
>> > at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> > at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>> > at com.mysql.jdbc.Util.getInstance(Util.java:386)
>> > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
>> > at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
>> > at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
>> > at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
>> > at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
>> > at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
>> > at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
>> >.java:2157)
>> > at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>> >.java:2460)
>> > at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>> >.java:2377)
>> > at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>> >.java:2361)
>> > at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
>> >PreparedStatement.java:1793)
>> >
>> >(2)
>> >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
>> >但这个冲突的entry是在14.11分那一波才报错的。
>>
>