[ https://issues.apache.org/jira/browse/FLINK-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhaoshijie reassigned FLINK-12494:
----------------------------------

    Assignee:     (was: zhaoshijie)

> JDBCOutputFormat support reconnect when link failure and flush by timeInterval
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-12494
>                 URL: https://issues.apache.org/jira/browse/FLINK-12494
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.8.0
>            Reporter: zhaoshijie
>            Priority: Major
>
> When I use JDBCSink (flink-1.4.2) to write records to MySQL, I get the
> following exception:
>
> {code:java}
> java.util.concurrent.ExecutionException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
> The last packet successfully received from the server was 265,251 milliseconds ago. The last packet sent successfully to the server was 265,252 milliseconds ago.
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.get(StreamRecordQueueEntry.java:68)
> at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:129)
> ... 2 more
> Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
> The last packet successfully received from the server was 265,251 milliseconds ago. The last packet sent successfully to the server was 265,252 milliseconds ago.
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1116)
> at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3364)
> at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1983)
> at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2163)
> at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2624)
> at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2127)
> at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2293)
> at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQuery(JDBCDimensionTableFunction.scala:199)
> at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction.executeQueryAndCombine(JDBCDimensionTableFunction.scala:139)
> at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:83)
> at org.apache.flink.table.plan.nodes.datastream.dimension.JDBCDimensionTableFunction$$anon$1.apply(JDBCDimensionTableFunction.scala:73)
> at org.apache.flink.streaming.api.functions.async.DimensionTableJoinFunction.lambda$asyncInvoke$0(DimensionTableJoinFunction.java:105)
> at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:97)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> Caused by: java.net.SocketException: Connection reset
> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3345)
> ... 16 more
> {code}
> I think the connection sat idle for too long without writing any records,
> so the server closed it on its own initiative. Sparse data is fairly common
> in practice, so I think we should reconnect when the connection is invalid.
> Besides, I found that JDBCOutputFormat.flush is only called from the
> snapshotState method and when "batchCount >= batchInterval". If the sink's
> records are sparse, the actual write can happen after a very long delay.
> Should we add a flush condition: currentTime - lastFlushTime > timeInterval?
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
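
The flush condition proposed in the description (currentTime - lastFlushTime > timeInterval, in addition to batchCount >= batchInterval) could be sketched roughly as below. This is only an illustration under stated assumptions, not Flink's JDBCOutputFormat: the class TimedBatchBuffer, the injected clock, the writer callback, and the method maybeFlushByTime are all hypothetical names introduced here, and the writer stands in for whatever actually executes the JDBC batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical helper (not part of Flink): flushes on record count or on elapsed time. */
class TimedBatchBuffer<T> {
    private final int batchInterval;        // flush once this many records are buffered
    private final long timeIntervalMs;      // flush once more than this much time has passed
    private final LongSupplier clock;       // injected clock, e.g. System::currentTimeMillis
    private final Consumer<List<T>> writer; // assumption: stands in for the JDBC batch write
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushTime;

    TimedBatchBuffer(int batchInterval, long timeIntervalMs,
                     LongSupplier clock, Consumer<List<T>> writer) {
        this.batchInterval = batchInterval;
        this.timeIntervalMs = timeIntervalMs;
        this.clock = clock;
        this.writer = writer;
        this.lastFlushTime = clock.getAsLong();
    }

    /** Buffers one record, then applies the count condition and the time condition. */
    synchronized void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchInterval) {
            flush();
        } else {
            maybeFlushByTime();
        }
    }

    /** The proposed extra condition: currentTime - lastFlushTime > timeInterval. */
    synchronized void maybeFlushByTime() {
        if (!buffer.isEmpty() && clock.getAsLong() - lastFlushTime > timeIntervalMs) {
            flush();
        }
    }

    /** Hands a copy of the buffered records to the writer and resets the timer. */
    synchronized void flush() {
        if (!buffer.isEmpty()) {
            writer.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlushTime = clock.getAsLong();
    }
}
```

Checking the time condition only inside add would still leave a sparse stream waiting for its next record, so in this sketch maybeFlushByTime is meant to be called from a periodic timer as well; the methods are synchronized for that reason. For the reconnect half of the request, the standard JDBC call java.sql.Connection.isValid(int timeoutSeconds) could be checked before each write and the connection reopened when it returns false; that part is left out here because it needs a live database.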