[ https://issues.apache.org/jira/browse/FLINK-16708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064580#comment-17064580 ]
Weike Dong commented on FLINK-16708:
------------------------------------

Hi [~lzljs3620320], do you have time to look into this issue? Thanks

> When a JDBC connection has been closed, the retry policy of the JDBCUpsertOutputFormat cannot take effect and may result in data loss
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16708
>                 URL: https://issues.apache.org/jira/browse/FLINK-16708
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.10.0
>            Reporter: tangshangwen
>            Assignee: tangshangwen
>            Priority: Major
>
> In our test environment, I used the tcpkill command to simulate a scenario
> where the PostgreSQL connection was closed. I found that the retry strategy
> of the flush method did not take effect: when it retried the second time,
> it could not recognize that the connection had been closed, because the
> batchStatements of PgStatement had already been cleared before the first
> check of whether the connection was closed. As a result, the second
> execution finds batchStatements empty and returns normally.
> {code:java}
> 2020-03-20 21:16:18.246 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1
> org.postgresql.util.PSQLException: This connection has been closed.
>     at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
>     at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
>     at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
>     at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
>     at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
>     at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1
> org.postgresql.util.PSQLException: This connection has been closed.
>     at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
>     at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
>     at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
>     at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
>     at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
>     at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
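
The failure mode described in the issue can be reproduced without a database. The sketch below is only an illustration under stated assumptions: FakeStatement is a hypothetical stand-in that mimics the reported behaviour (queued rows are dropped when executeBatch fails, and an empty batch returns normally). It is not the real PgStatement, and naiveFlush and bufferedFlush are made-up names, not Flink's code. naiveFlush retries on the same statement and ends up reporting success with nothing written. bufferedFlush shows one possible mitigation: keep the rows in a buffer owned by the caller and re-add them to a fresh statement on every attempt.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class RetryBatchSketch {

    /** Hypothetical stand-in for a driver statement; not the real PgStatement. */
    static class FakeStatement {
        private final List<String> batch = new ArrayList<>();
        private final List<String> sink;          // plays the role of the target table
        private final boolean connectionClosed;

        FakeStatement(List<String> sink, boolean connectionClosed) {
            this.sink = sink;
            this.connectionClosed = connectionClosed;
        }

        void addBatch(String row) {
            batch.add(row);
        }

        int executeBatch() throws SQLException {
            if (batch.isEmpty()) {
                return 0; // nothing queued: returns normally without checking the connection
            }
            try {
                if (connectionClosed) {
                    throw new SQLException("This connection has been closed.");
                }
                sink.addAll(batch);
                return batch.size();
            } finally {
                batch.clear(); // queued rows are dropped even when the call fails
            }
        }
    }

    /** Retries on the same statement; the second attempt sees an empty batch. */
    static int naiveFlush(FakeStatement statement, int maxRetries) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                return statement.executeBatch();
            } catch (SQLException e) {
                System.out.println("executeBatch error, retry times = " + attempt);
            }
        }
        throw new IllegalStateException("flush failed after " + maxRetries + " attempts");
    }

    /** Keeps the rows in a caller-owned buffer and replays them on a fresh statement. */
    static int bufferedFlush(List<String> rows, Supplier<FakeStatement> connect, int maxRetries) {
        SQLException last = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            FakeStatement statement = connect.get(); // assumed to open a new connection
            try {
                for (String row : rows) {
                    statement.addBatch(row);
                }
                int written = statement.executeBatch();
                rows.clear(); // drop the buffer only after a successful write
                return written;
            } catch (SQLException e) {
                last = e;
            }
        }
        throw new IllegalStateException("flush failed after " + maxRetries + " attempts", last);
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();

        FakeStatement closed = new FakeStatement(sink, true);
        closed.addBatch("row-1");
        closed.addBatch("row-2");
        System.out.println("naive flush reported rows: " + naiveFlush(closed, 3));
        System.out.println("rows actually written: " + sink.size());

        List<String> rows = new ArrayList<>(List.of("row-1", "row-2"));
        int[] calls = {0};
        // The first connection is closed, later ones are healthy.
        int written = bufferedFlush(rows, () -> new FakeStatement(sink, calls[0]++ == 0), 3);
        System.out.println("buffered flush wrote rows: " + written);
    }
}
```

In this sketch the naive retry loop returns 0 on its second attempt even though both rows were lost, which matches the silent data loss described above. Whether reconnecting on every attempt is appropriate for a real sink depends on the driver and the delivery guarantees required, so treat the buffered variant as a starting point rather than a fix.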