[ https://issues.apache.org/jira/browse/FLINK-30431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-30431: ----------------------------------- Labels: pull-request-available (was: ) > JDBC Connector fails to reestablish the lost DB connection. > ------------------------------------------------------------ > > Key: FLINK-30431 > URL: https://issues.apache.org/jira/browse/FLINK-30431 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC > Affects Versions: 1.15.0 > Environment: Flink 1.15.2 > flink-connector-jdbc: 1.15.0 > Reporter: Sai Nadendla > Priority: Major > Labels: pull-request-available > > Our use case with JDBC connector is to sink records to Amazon Redshift DB > table. > At some point in time the connection with redshift gets closed and the > Flink's JDBC connector tries to detect & reestablish the connection in the > following manner in the @ JdbcOutputFormat.flush() : > {code:java} > 1. public synchronized void flush() throws IOException { > 2. .. > 3. > 4. for (int i = 0; i <= executionOptions.getMaxRetries(); i++) { > 5. try { > 6. attemptFlush(); > 7. ... > 8. } catch (SQLException e) { > 9. ... > 10. try { > 11. if (!connectionProvider.isConnectionValid()) { <-- TRUE! > 12. updateExecutor(true); > 13. } > 14. } catch (Exception exception) { > 15. .... > 16. throw new IOException("Reestablish JDBC connection failed", > exception); > 17. } > 18. .... > 19. } > 20. } > 21. .... > 22. }{code} > updateExecutor() is called (from line#12 of the above code snippet) to close > statements and re-establish the DB connection. > > {code:java} > 1. public void updateExecutor(boolean reconnect) throws SQLException, ...{ > 2. jdbcStatementExecutor.closeStatements(); > 3. jdbcStatementExecutor.prepareStatements( > 4. reconnect > 5. ? connectionProvider.reestablishConnection() > 6. : connectionProvider.getConnection()); > 7. } {code} > h4. > ---- > h3. Results: > h4. Expected: > The connection should be re-established and the updates should be reflected > on DB. > h4. Actual: > {color:#de350b}*The {{connection re-establish}} code is never reached/invoked > !!.*{color} The closeStatements() fails/throws (as the connection is already > closed). > {noformat} > Caused by: com.amazon.redshift.util.RedshiftException: This connection has > been closed. > at > com.amazon.redshift.jdbc.RedshiftConnectionImpl.checkClosed(RedshiftConnectionImpl.java:1095) > ~[?:?] > at > com.amazon.redshift.jdbc.RedshiftConnectionImpl.cancelQuery(RedshiftConnectionImpl.java:1299) > ~[?:?] > at > com.amazon.redshift.jdbc.RedshiftStatementImpl.cancel(RedshiftStatementImpl.java:1042) > ~[?:?] > at > com.amazon.redshift.jdbc.RedshiftStatementImpl.close(RedshiftStatementImpl.java:748) > ~[?:?] > at > org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81) > ~[?:?] > at > org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.updateExecutor(JdbcOutputFormat.java:402) > ~[?:?] > at > org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:226) > ~[?:?] > at > org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155) > ~[?:?] > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > ~[?:1.8.0_292] > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > ~[?:1.8.0_292] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > ~[?:1.8.0_292] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > ~[?:1.8.0_292] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > ~[?:1.8.0_292] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ~[?:1.8.0_292] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292] > 2022-11-09 03:00:07,510 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping > checkpoint coordinator for job 0f70cbc56798e19978b509bf0da0107b. > 2022-11-09 03:00:07,517 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job > 0f70cbc56798e19978b509bf0da0107b reached terminal state FAILED. > {noformat} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)