[ https://issues.apache.org/jira/browse/FLINK-16708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064202#comment-17064202 ]
tangshangwen commented on FLINK-16708:
--------------------------------------

On the first execution, connection.getAutoCommit() throws PSQLException: This connection has been closed. By that point the batchStatements have already been cleared:
{code:java}
// PgStatement.java
private BatchResultHandler internalExecuteBatch() throws SQLException {
  // Construct query/parameter arrays.
  transformQueriesAndParameters();
  // Empty arrays should be passed to toArray
  // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/
  Query[] queries = batchStatements.toArray(new Query[0]);
  ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]);
  batchStatements.clear();
  batchParameters.clear();
  ...
  if (connection.getAutoCommit()) {
    flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
  }
  ...
}
{code}
When executeBatch() is called the second time, batchStatements is empty, so the method returns without throwing any exception:
{code:java}
// PgStatement.java
public int[] executeBatch() throws SQLException {
  checkClosed();
  closeForNextExecution();
  if (batchStatements == null || batchStatements.isEmpty()) {
    return new int[0];
  }
  return internalExecuteBatch().getUpdateCount();
}
{code}
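For illustration, here is a minimal sketch (not the actual JDBCUpsertOutputFormat code) of a flush/retry loop that avoids this trap: since the driver drops its batch on the first failed executeBatch(), each retry has to re-validate the connection and rebuild the batch from a buffer the writer itself owns, otherwise the second call returns an empty result and the data loss goes unnoticed. The class and field names (RetryingBatchWriter, bufferedRows, jdbcUrl, insertSql) are made up for the example; Connection.isValid() is the standard JDBC 4.0 check used here to detect the closed connection.
{code:java}
// Hedged sketch only; illustrative names, not Flink's implementation.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

class RetryingBatchWriter {
  private static final int MAX_RETRIES = 3;

  private final String jdbcUrl;
  private final String insertSql;
  private Connection connection;

  RetryingBatchWriter(String jdbcUrl, String insertSql) throws SQLException {
    this.jdbcUrl = jdbcUrl;
    this.insertSql = insertSql;
    this.connection = DriverManager.getConnection(jdbcUrl);
  }

  /**
   * Flushes the buffered rows. Because PgStatement clears its internal batch on the
   * first failed executeBatch(), calling executeBatch() again on the same statement
   * would return an empty result and look like success. Each attempt therefore
   * re-validates the connection and rebuilds the batch from the caller's buffer.
   */
  void flush(List<Object[]> bufferedRows) throws SQLException, InterruptedException {
    SQLException lastError = null;
    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
      try {
        // Re-open the connection if it was closed, e.g. by a network failure.
        if (connection == null || !connection.isValid(5)) {
          connection = DriverManager.getConnection(jdbcUrl);
        }
        try (PreparedStatement stmt = connection.prepareStatement(insertSql)) {
          // Rebuild the batch from our own buffer; the driver-side batch was lost.
          for (Object[] row : bufferedRows) {
            for (int i = 0; i < row.length; i++) {
              stmt.setObject(i + 1, row[i]);
            }
            stmt.addBatch();
          }
          stmt.executeBatch();
        }
        bufferedRows.clear();
        return;
      } catch (SQLException e) {
        lastError = e;
        Thread.sleep(1000L * attempt);
      }
    }
    throw lastError;
  }
}
{code}
The essential point matches the analysis above: the rows that a retry needs must be kept outside the PgStatement, because internalExecuteBatch() clears the driver-side batch before the getAutoCommit() call that throws.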

> When a JDBC connection has been closed, the retry policy of the JDBCUpsertOutputFormat cannot take effect and may result in data loss
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16708
>                 URL: https://issues.apache.org/jira/browse/FLINK-16708
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.10.0
>            Reporter: tangshangwen
>            Priority: Major
>
> In our test environment, I used the tcpkill command to simulate a scenario where the PostgreSQL connection was closed. I found that the retry strategy of the flush method did not take effect: on the second retry it could not recognize that the connection had been closed, because the batchStatements of PgStatement had already been cleared before the first closed-connection check. As a result, the second execution sees an empty batchStatements and returns normally.
> {code:java}
> 2020-03-20 21:16:18.246 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1
> org.postgresql.util.PSQLException: This connection has been closed.
>         at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
>         at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
>         at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
>         at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
>         at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
>         at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
>         at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>         at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> 2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1
> org.postgresql.util.PSQLException: This connection has been closed.
>         at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
>         at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
>         at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
>         at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
>         at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
>         at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
>         at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>         at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)