Hi,

目前JdbcSink会为每个Sink创建PreparedStatement,当进行batch数据处理时,会先调用PreparedStatement的addBatch()函数将数据放入缓存,到达flush条件后调用executeBatch()函数批量发送数据到jdbc
server,这样会节省网络IO。

具体到数据库侧,我理解执行 `insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
(1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,对底层存储可能差别不大,因为插入的数据量不会减少,具体你可以观察一下

Best,
Shammon FY


On Tue, Jul 25, 2023 at 4:02 PM 小昌同学 <ccc0606fight...@163.com> wrote:

> 各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师:
>
> 我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式
> 是
> insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values
> (1,2,3,4,5,6),(1,2,3,4,9,10);
> 或者是
> insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6)
> insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10)
> 如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO
>
>
>
> |
> errorStream.addSink(JdbcSink.sink(
> "insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)",
>         (statement, result) -> {
>             statement.setString(1,result.getAction());
>             statement.setString(2,result.getServerIp());
>             statement.setString(3,result.getHandleSerialno());
>             statement.setString(4,result.getMd5Num());
>             statement.setString(5,result.getInsertTime());
>             statement.setString(6, result.getDateTime());
>         },
>         JdbcExecutionOptions.builder()
>                 .withBatchSize(1000)
>                 .withBatchIntervalMs(200)
>                 .withMaxRetries(5)
>                 .build(),
> new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>
> .withUrl("jdbc:mysql://111/data_ret_log?useSSL=false&useUnicode=false&failOverReadOnly=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnectForPools=true&serverTimezone=Asia/Shanghai&autoReconnect=true")
>                 .withDriverName("com.mysql.jdbc.Driver")
>                 .withUsername("111")
>                 .withPassword("222")
>                 .build()
> )).name("sink-error-mysql");
> |
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |

回复