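As the title says: according to the official documentation, when the MySQL table defines a primary key, an UpsertTableSink is used and rows are written with INSERT ... ON DUPLICATE KEY UPDATE.
In practice, however, I am getting a "Duplicate entry" error. For example: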
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Duplicate entry '2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
at com.mysql.jdbc.Util.getInstance(Util.java:386)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2157)
at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2460)
at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2377)
at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2361)
at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(PreparedStatement.java:1793)
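
To make the expectation concrete, here is a minimal sketch (in Python, purely for illustration) of the two statement shapes as I understand them. The helper names, the table name, and the column names are hypothetical; they are not taken from Flink's source or from my actual schema. In MySQL, a plain INSERT fails with "Duplicate entry" on any PRIMARY KEY or UNIQUE index collision, whereas the ON DUPLICATE KEY UPDATE form updates the existing row instead.

```python
def build_plain_insert(table, columns):
    """Plain INSERT with JDBC-style '?' placeholders (hypothetical helper).

    MySQL rejects this with 'Duplicate entry' when a row collides
    with a PRIMARY KEY or UNIQUE index.
    """
    quoted = ", ".join(f"`{c}`" for c in columns)
    placeholders = ", ".join("?" for _ in columns)
    return f"INSERT INTO `{table}`({quoted}) VALUES ({placeholders})"


def build_mysql_upsert(table, columns):
    """Upsert form (hypothetical helper): same INSERT plus an
    ON DUPLICATE KEY UPDATE clause that overwrites every column."""
    quoted = [f"`{c}`" for c in columns]
    updates = ", ".join(f"{c}=VALUES({c})" for c in quoted)
    return f"{build_plain_insert(table, columns)} ON DUPLICATE KEY UPDATE {updates}"


if __name__ == "__main__":
    # Table and column names below are made up for the example.
    cols = ["id", "metric", "cnt"]
    print(build_plain_insert("stats_demo", cols))
    print(build_mysql_upsert("stats_demo", cols))
```

If the sink were really issuing the second form, I would not expect a duplicate-entry failure on a unique key, which is why the error above surprises me.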
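(2)
There is also one small oddity: all the other rows for 202011231405 were emitted at minute 06 (I set maxOutOfOrder to 1 min), but this conflicting entry only raised the error in the batch at around 14:11.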