Below is an excerpt from the official Flink documentation for the JDBC connector, section "Key handling" <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling>:

Flink uses the primary key defined in the DDL when writing data to external databases. The connector operates in upsert mode if a primary key is defined; otherwise it operates in append mode. In upsert mode, Flink inserts a new row or updates the existing row according to the primary key; this is how Flink ensures idempotence. To guarantee that the output is as expected, it is recommended to define a primary key for the table and to make sure the primary key is one of the unique keys, or the primary key, of the underlying database table. In append mode, Flink interprets all records as INSERT messages; an INSERT operation may fail if a primary key or unique constraint violation happens in the underlying database. See CREATE TABLE DDL <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table> for more details about the PRIMARY KEY syntax.

One more point, about this sentence: "In append mode, Flink interprets all records as INSERT messages; an INSERT operation may fail if a primary key or unique constraint violation happens in the underlying database." What exactly is "append mode"? Is it inferred automatically from the semantics of the SQL query, or something else? The 1.10 documentation apparently supported declaring 'update-mode' = 'append' in the DDL, but the 1.11 documentation no longer mentions it.

On Mon, Nov 23, 2020 at 3:02 PM, 赵一旦 <[email protected]> wrote:

> Adding the SQL for context.
>
> DDL:
>
> CREATE TABLE flink_recent_pv_subid
> (
>     `supply_id` STRING,
>     `subid`     STRING,
>     `mark`      STRING,
>     `time`      STRING,
>     `pv`        BIGINT,
>     PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> ) WITH (
>     'connector.type' = 'jdbc',
>
>     ......
> );
>
> Query SQL:
>
> INSERT INTO flink_recent_pv_subid
> SELECT
>     `sid`,
>     `subid`,
>     `mark`,
>     DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') AS `time`,
>     count(1) AS `pv`
> FROM baidu_log_view
> GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5' MINUTE);
>
> On Mon, Nov 23, 2020 at 3:00 PM, 赵一旦 <[email protected]> wrote:
>
>> @hailongwang They are the same.
>>
>> One thing to clarify: I am doing a tumble-window aggregation, so the output is an append stream. The duplicate row is something I inserted manually before Flink's output. The goal was to test whether this Flink sink decides between insert and update based on its own output history (for example, in the retract-stream case Flink may be able to tell whether the current output is the first or the n-th emission for a given key), or whether it checks if the row actually exists in the database.
>>
>> On Mon, Nov 23, 2020 at 2:39 PM, hailongwang <[email protected]> wrote:
>>
>>> Is the primary key set up in the database the same as the PRIMARY KEY defined in the DDL?
>>>
>>> Best,
>>> Hailong
>>>
>>> On 2020-11-23 13:15:01, 赵一旦 <[email protected]> wrote:
>>> > As the subject says: according to the official documentation, when a primary key is defined for the MySQL table, an UpsertTableSink is used and rows are written using insert on duplicate. In practice, however, I got a duplicate entry error. For example:
>>> >
>>> > Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Duplicate entry '2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
>>> >     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>> >     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>> >     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> >     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>> >     at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>>> >     at com.mysql.jdbc.Util.getInstance(Util.java:386)
>>> >     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
>>> >     at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
>>> >     at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
>>> >     at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
>>> >     at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
>>> >     at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
>>> >     at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2157)
>>> >     at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2460)
>>> >     at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2377)
>>> >     at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2361)
>>> >     at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(PreparedStatement.java:1793)
>>> >
>>> > (2)
>>> > One more small oddity: for the 202011231405 data, all the other rows for that time were emitted at minute 06 (I set a 1-minute max out-of-orderness), but this conflicting entry only failed in the batch around 14:11.
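
On the append-mode question above: in 1.11 there is no 'update-mode' table option any more; the connector chooses upsert vs. append purely from whether the DDL declares a PRIMARY KEY, not from the shape of the query. With the PRIMARY KEY declared in the flink_recent_pv_subid DDL, the connector's MySQL dialect generates an upsert statement of roughly the following shape (a sketch only; the actual SQL is generated internally by the connector and may differ in detail):

    -- Sketch of the statement issued per row in upsert mode against MySQL.
    -- The column list follows the flink_recent_pv_subid DDL quoted above.
    INSERT INTO flink_recent_pv_subid (`supply_id`, `subid`, `mark`, `time`, `pv`)
    VALUES (?, ?, ?, ?, ?)
    ON DUPLICATE KEY UPDATE
        `supply_id` = VALUES(`supply_id`),
        `subid`     = VALUES(`subid`),
        `mark`      = VALUES(`mark`),
        `time`      = VALUES(`time`),
        `pv`        = VALUES(`pv`);

Without a PRIMARY KEY the connector falls back to a plain INSERT, which is the case where a primary key or unique constraint violation in the database makes the write fail.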

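hailongwang's question above is the crucial check: the error names the database key 'uniq_ssmt', and the upsert path only behaves idempotently when that unique key covers the same columns as the PRIMARY KEY declared in the Flink DDL. A hypothetical MySQL-side table aligned with that DDL could look like the following (the column types and the surrogate id column are assumptions for illustration, not taken from this thread):

    -- Hypothetical MySQL table whose unique key matches the Flink PRIMARY KEY
    -- (supply_id, subid, mark, time); types and the id column are assumed.
    CREATE TABLE flink_recent_pv_subid (
        id         BIGINT       NOT NULL AUTO_INCREMENT,
        supply_id  VARCHAR(64)  NOT NULL,
        subid      VARCHAR(64)  NOT NULL,
        mark       VARCHAR(64)  NOT NULL,
        `time`     VARCHAR(12)  NOT NULL,
        pv         BIGINT       NOT NULL,
        PRIMARY KEY (id),
        UNIQUE KEY uniq_ssmt (supply_id, subid, mark, `time`)
    );

If the database's unique key is narrower or wider than the Flink-side primary key, two rows that Flink considers distinct can still collide on uniq_ssmt, and a plain INSERT (append mode) then fails exactly as in the stack trace above.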