Please use the new JDBC connector. The old JDBC connector behaves somewhat differently.
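
Here is a minimal sketch of the quoted DDL switched to the new connector's option style (`'connector' = 'jdbc'` instead of the legacy `'connector.type' = 'jdbc'`). The `url`, `table-name`, `username` and `password` values are hypothetical placeholders, because the original options were elided:

```sql
-- Sketch only: new-style (Flink 1.11+) JDBC connector options.
-- url, table-name, username and password are hypothetical placeholders.
CREATE TABLE flink_recent_pv_subid
(
    `supply_id` STRING,
    `subid`     STRING,
    `mark`      STRING,
    `time`      STRING,
    `pv`        BIGINT,
    -- With the new connector, a declared PRIMARY KEY puts the sink in upsert mode.
    PRIMARY KEY (`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'recent_pv_subid',
  'username'   = 'user',
  'password'   = 'secret'
);
```

The MySQL table should have a primary key or unique key on the same four columns, which is what hailongwang asked about below. One plausible explanation for the error, not confirmed in this thread, is that the legacy connector decides between insert and upsert based on the query's change semantics rather than the DDL's PRIMARY KEY. If so, an append-only tumble-window result would be written with plain INSERTs.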

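When a PRIMARY KEY is declared, as in the DDL quoted below, the new connector writes to MySQL in upsert mode. The sketch below shows the rough shape of the statement the MySQL dialect uses. It assumes a hypothetical physical table `recent_pv_subid`, and the exact generated text may differ between Flink versions:

```sql
-- Sketch: approximate shape of the upsert statement for MySQL.
-- `recent_pv_subid` is a hypothetical table name; ? are JDBC bind parameters.
INSERT INTO `recent_pv_subid` (`supply_id`, `subid`, `mark`, `time`, `pv`)
VALUES (?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
  `supply_id` = VALUES(`supply_id`),
  `subid`     = VALUES(`subid`),
  `mark`      = VALUES(`mark`),
  `time`      = VALUES(`time`),
  `pv`        = VALUES(`pv`);
```

MySQL resolves the conflict against any unique index, including one like `uniq_ssmt`. Whether a row is inserted or updated therefore depends on what is already in the database, not on the sink's own output history. In append mode, the same row is sent as a plain INSERT, and that is what produces a `Duplicate entry` error like the one quoted below.
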
On Mon, 23 Nov 2020 at 15:21, 赵一旦 <[email protected]> wrote:

> Below is an excerpt from the "Key handling" section of the official Flink JDBC connector documentation
> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling>:
>
> Flink uses the primary key that is defined in DDL when writing data to
> external databases. The connector operates in upsert mode if the primary key
> was defined; otherwise, the connector operates in append mode.
>
> In upsert mode, Flink will insert a new row or update the existing row
> according to the primary key, and Flink can ensure idempotence in this way.
> To guarantee the output result is as expected, it's recommended to define a
> primary key for the table and make sure the primary key is one of the
> unique key sets or the primary key of the underlying database table. In append
> mode, Flink will interpret all records as INSERT messages, and the INSERT
> operation may fail if a primary key or unique constraint violation happens
> in the underlying database.
>
> See CREATE TABLE DDL
> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table>
> for more details about PRIMARY KEY syntax.
>
>
> There is one more point here: "In append mode, Flink will interpret all records as INSERT messages,
> the INSERT operation may fail if a primary key or unique constraint
> violation happens in the underlying database." What does append
> mode mean? Does Flink decide on its own, based on the SQL logic, whether to use append mode, or something else?
>
> The 1.10 documentation seems to allow defining 'update-mode' = 'append' in the DDL, but the 1.11 documentation doesn't mention any of this.
>
>
>
> On Mon, 23 Nov 2020 at 15:02, 赵一旦 <[email protected]> wrote:
>
> > Here is the SQL:
> >
> > DDL:
> >
> > CREATE TABLE flink_recent_pv_subid
> > (
> >     `supply_id` STRING,
> >     `subid`     STRING,
> >     `mark`      STRING,
> >     `time`      STRING,
> >     `pv`        BIGINT,
> >     PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> > ) WITH (
> >   'connector.type' = 'jdbc',
> >
> >   ......
> >
> > );
> >
> >
> > Query:
> >
> > INSERT INTO
> >     flink_recent_pv_subid
> > SELECT
> >     `sid`,
> >     `subid`,
> >     `mark`,
> >     DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') as `time`,
> >     count(1) AS `pv`
> > FROM baidu_log_view
> > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5' MINUTE);
> >
> >
> > On Mon, 23 Nov 2020 at 15:00, 赵一旦 <[email protected]> wrote:
> >
> >> @hailongwang Yes, they are the same.
> >>
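> >> Some context: I'm aggregating with a tumble window, so the output is an append stream. I inserted the duplicate row into the database by hand before Flink emitted its output.
> >> The goal was to test whether this Flink sink decides between insert and update based on its own output history (for example, with a retract stream,
> >> Flink might be able to tell whether the current output is the first or the n-th output for that key), or whether it goes by whether the row actually exists in the database.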
> >>
> >>
> >>
> >>
> >> On Mon, 23 Nov 2020 at 14:39, hailongwang <[email protected]> wrote:
> >>
> >>> Is the primary key in the database the same as the PRIMARY KEY defined in the DDL?
> >>>
> >>>
> >>> Best,
> >>> Hailong
> >>> 在 2020-11-23 13:15:01,"赵一旦" <[email protected]> 写道:
> >>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
> >>> >duplicate方式写入。
> >>> >
> >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Duplicate entry '2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
> >>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> >>> >    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >>> >    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >>> >    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> >>> >    at com.mysql.jdbc.Util.getInstance(Util.java:386)
> >>> >    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> >>> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> >>> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> >>> >    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> >>> >    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> >>> >    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
> >>> >    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2157)
> >>> >    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2460)
> >>> >    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2377)
> >>> >    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2361)
> >>> >    at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(PreparedStatement.java:1793)
> >>> >
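> >>> >(2)
> >>> >There is also a minor oddity: all the other rows for 202011231405 were emitted at minute 06 (I set maxOutOfOrder to 1 min).
> >>> >But the error for this conflicting entry was only raised in the batch at 14:11.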
> >>>
> >>
>
