各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师:
我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式
是
insert  into error_md5_info (action, serverIp, 
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10);
或者是
insert  into error_md5_info (action, serverIp, 
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6)
insert  into error_md5_info (action, serverIp, 
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10)
如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO



|
errorStream.addSink(JdbcSink.sink(
"insert  into error_md5_info (action, serverIp, 
handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)",
        (statement, result) -> {
            statement.setString(1,result.getAction());
            statement.setString(2,result.getServerIp());
            statement.setString(3,result.getHandleSerialno());
            statement.setString(4,result.getMd5Num());
            statement.setString(5,result.getInsertTime());
            statement.setString(6, result.getDateTime());
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(200)
                .withMaxRetries(5)
                .build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                
.withUrl("jdbc:mysql://111/data_ret_log?useSSL=false&useUnicode=false&failOverReadOnly=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnectForPools=true&serverTimezone=Asia/Shanghai&autoReconnect=true")
                .withDriverName("com.mysql.jdbc.Driver")
                .withUsername("111")
                .withPassword("222")
                .build()
)).name("sink-error-mysql");
|
| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复