[ https://issues.apache.org/jira/browse/FLINK-30431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser updated FLINK-30431:
-----------------------------------
    Priority: Major  (was: Blocker)

> JDBC Connector fails to reestablish the lost DB connection. 
> ------------------------------------------------------------
>
>                 Key: FLINK-30431
>                 URL: https://issues.apache.org/jira/browse/FLINK-30431
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.15.0
>         Environment: Flink 1.15.2
> flink-connector-jdbc: 1.15.0
>            Reporter: Sai Nadendla
>            Priority: Major
>
> Our use case for the JDBC connector is to sink records to an Amazon Redshift DB table.
> At some point the connection to Redshift gets closed, and Flink's JDBC connector tries to detect this and re-establish the connection in {{JdbcOutputFormat.flush()}} as follows:
> {code:java}
> 1. public synchronized void flush() throws IOException {
> 2.    ..
> 3.    
> 4.    for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
> 5.        try {
> 6.            attemptFlush();
> 7.            ...
> 8.        } catch (SQLException e) {
> 9.            ...
> 10.            try {
> 11.                if (!connectionProvider.isConnectionValid()) { // <-- TRUE!
> 12.                    updateExecutor(true);  
> 13.                }
> 14.            } catch (Exception exception) {
> 15.              ....
> 16.              throw new IOException("Reestablish JDBC connection failed", exception);
> 17.            }
> 18.            ....
> 19.        }
> 20.     }
> 21.     ....
> 22. }{code}
> {{updateExecutor(true)}} is then called (line 12 of the snippet above) to close the statements and re-establish the DB connection.
>  
> {code:java}
> public void updateExecutor(boolean reconnect) throws SQLException, ... {
>     jdbcStatementExecutor.closeStatements();
>     jdbcStatementExecutor.prepareStatements(
>             reconnect
>                     ? connectionProvider.reestablishConnection()
>                     : connectionProvider.getConnection());
> } {code}
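To make the ordering problem concrete, here is a minimal, self-contained Java simulation. It is not Flink code: {{FakeConnection}}, {{FakeStatement}}, {{Sink}} and {{ReconnectDemo}} are hypothetical stand-ins. The only behaviour carried over from this report is that closing a statement on an already-closed Redshift connection throws, as the stack trace in this issue shows.

```java
import java.sql.SQLException;

// Hypothetical simulation of the failure mode; none of these classes are Flink or Redshift APIs.
public class ReconnectDemo {

    // Stand-in for a JDBC connection that may already have been closed by the server.
    static final class FakeConnection {
        private final boolean closed;

        FakeConnection(boolean closed) {
            this.closed = closed;
        }

        void checkClosed() throws SQLException {
            if (closed) {
                throw new SQLException("This connection has been closed.");
            }
        }
    }

    // Stand-in for a statement whose close() touches its parent connection,
    // mirroring close() -> cancel() -> checkClosed() in the stack trace.
    static final class FakeStatement {
        private final FakeConnection connection;

        FakeStatement(FakeConnection connection) {
            this.connection = connection;
        }

        void close() throws SQLException {
            connection.checkClosed();
        }
    }

    // Holds the same kind of state as the output format: one connection, one statement.
    static final class Sink {
        FakeConnection connection;
        FakeStatement statement;
        boolean reconnected = false;

        Sink(FakeConnection connection) {
            this.connection = connection;
            this.statement = new FakeStatement(connection);
        }

        FakeConnection reestablishConnection() {
            reconnected = true;
            return new FakeConnection(false);
        }

        // Same ordering as updateExecutor(true): close statements first, reconnect second.
        void updateExecutor(boolean reconnect) throws SQLException {
            statement.close(); // throws on a dead connection, so the lines below never run
            connection = reconnect ? reestablishConnection() : connection;
            statement = new FakeStatement(connection);
        }
    }

    public static String run() {
        Sink sink = new Sink(new FakeConnection(true)); // connection already closed
        try {
            sink.updateExecutor(true);
            return "reconnected=" + sink.reconnected;
        } catch (SQLException e) {
            return "failed: " + e.getMessage() + " (reconnected=" + sink.reconnected + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running it prints {{failed: This connection has been closed. (reconnected=false)}}: the exception from {{close()}} escapes before {{reestablishConnection()}} is ever called.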
> ----
> h3. Results:
> h4. Expected:
> The connection should be re-established and the updates should be reflected in the DB.
> h4. Actual:
> {color:#de350b}*The connection re-establishment code ({{connectionProvider.reestablishConnection()}}) is never reached!*{color} {{closeStatements()}} throws because the connection is already closed.
> {noformat}
> Caused by: com.amazon.redshift.util.RedshiftException: This connection has been closed.
>     at com.amazon.redshift.jdbc.RedshiftConnectionImpl.checkClosed(RedshiftConnectionImpl.java:1095) ~[?:?]
>     at com.amazon.redshift.jdbc.RedshiftConnectionImpl.cancelQuery(RedshiftConnectionImpl.java:1299) ~[?:?]
>     at com.amazon.redshift.jdbc.RedshiftStatementImpl.cancel(RedshiftStatementImpl.java:1042) ~[?:?]
>     at com.amazon.redshift.jdbc.RedshiftStatementImpl.close(RedshiftStatementImpl.java:748) ~[?:?]
>     at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81) ~[?:?]
>     at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.updateExecutor(JdbcOutputFormat.java:402) ~[?:?]
>     at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:226) ~[?:?]
>     at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155) ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_292]
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_292]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_292]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_292]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_292]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_292]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> 2022-11-09 03:00:07,510 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping checkpoint coordinator for job 0f70cbc56798e19978b509bf0da0107b.
> 2022-11-09 03:00:07,517 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 0f70cbc56798e19978b509bf0da0107b reached terminal state FAILED.
> {noformat}
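One possible mitigation is sketched here under the assumption that a failure to close statements on an already-dead connection can safely be ignored when reconnecting. The idea is to treat statement closing as best-effort on the reconnect path and always proceed to re-establishing the connection. This is not the upstream Flink fix, and {{FakeConnection}}, {{FakeStatement}}, {{Sink}} and {{TolerantReconnectDemo}} are hypothetical stand-ins, not Flink or Redshift driver APIs.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a reconnect path that tolerates close() failures; not Flink code.
public class TolerantReconnectDemo {

    // Stand-in for a JDBC connection that may already have been closed by the server.
    static final class FakeConnection {
        private final boolean closed;

        FakeConnection(boolean closed) {
            this.closed = closed;
        }

        void checkClosed() throws SQLException {
            if (closed) {
                throw new SQLException("This connection has been closed.");
            }
        }
    }

    // Stand-in for a statement whose close() fails if its connection is closed.
    static final class FakeStatement {
        private final FakeConnection connection;

        FakeStatement(FakeConnection connection) {
            this.connection = connection;
        }

        void close() throws SQLException {
            connection.checkClosed();
        }
    }

    static final class Sink {
        FakeConnection connection;
        FakeStatement statement;
        boolean reconnected = false;
        final List<String> ignoredCloseErrors = new ArrayList<>();

        Sink(FakeConnection connection) {
            this.connection = connection;
            this.statement = new FakeStatement(connection);
        }

        FakeConnection reestablishConnection() {
            reconnected = true;
            return new FakeConnection(false);
        }

        void updateExecutor(boolean reconnect) throws SQLException {
            try {
                statement.close();
            } catch (SQLException e) {
                if (!reconnect) {
                    throw e; // keep the original behaviour when not reconnecting
                }
                // The old connection is unusable anyway; record the error and carry on.
                ignoredCloseErrors.add(e.getMessage());
            }
            connection = reconnect ? reestablishConnection() : connection;
            statement = new FakeStatement(connection);
        }
    }

    public static String run() {
        Sink sink = new Sink(new FakeConnection(true)); // connection already closed
        try {
            sink.updateExecutor(true);
            return "reconnected=" + sink.reconnected + ", ignored=" + sink.ignoredCloseErrors;
        } catch (SQLException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running it prints {{reconnected=true, ignored=[This connection has been closed.]}}. Rethrowing when {{reconnect}} is false preserves the existing behaviour on the non-reconnect path. In real code the ignored error would more likely be logged than collected in a list.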
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
