目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端
 多条记录的操作
sink的invoke代码
@Override
public void invoke(Tuple5<String, String, String, String, List<BroadBandReq>> 
value, Context context) throws Exception {
connection.setAutoCommit(false);
List<BroadBandReq> f4 = value.f4;
for (BroadBandReq rs: f4){
statement.setString(1,rs.getUserId());
statement.setString(2,rs.getPhoneNum());
statement.setString(3,rs.getProvId());
statement.addBatch();
}
try {
statement.executeBatch();
connection.commit();
}catch (Exception e){
LOG.info(" add data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, 
phoneList:{}", value.f0, value.f1, value.f2, value.f3,f4);
connection.rollback();
e.printStackTrace();
            throw new Exception(e);
}
    }




java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when trying 
to get lock; try restarting transaction
        at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
        at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
        at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
        at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
        at 
org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
        at 
com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
        at 
com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
        at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
        at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
        at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.BatchUpdateException: Deadlock found when trying to get 
lock; try restarting transaction
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
        at com.mysql.jdbc.Util.getInstance(Util.java:408)
        at 
com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163)
        at 
com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772)
        at 
com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262)
        at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970)
        at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67)
        ... 33 more
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: 
Deadlock found when trying to get lock; try restarting transaction
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
        at com.mysql.jdbc.Util.getInstance(Util.java:408)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
        at 
com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
        at 
com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079)
        at 
com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756)
        ... 36 more
[flink-akka.actor.default-dispatcher-783] 











回复