[ 
https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36649:
--------------------------
    Affects Version/s: cdc-3.1.1
                       cdc-3.2.0
                       cdc-3.3.0

> When reading from Oracle via OracleIncrementalSource, the connection is 
> occasionally closed
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-36649
>                 URL: https://issues.apache.org/jira/browse/FLINK-36649
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.2.0, cdc-3.1.1, cdc-3.3.0
>            Reporter: Di Wu
>            Priority: Major
>              Labels: CDC, pull-request-available
>
> When reading from Oracle via OracleIncrementalSource, the connection is 
> occasionally closed.
>  
> {code:java}
> 14:57:56,432 INFO  
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader
>  [Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> 
> Sink: Print to Std. Out (1/1)#0] [] - Close snapshot reader 
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
> 14:57:56,597 INFO  io.debezium.jdbc.JdbcConnection                            
>   [pool-14-thread-1] [] - Connection gracefully closed
> 14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper       
>   [debezium-snapshot-reader-0] [] - Mining session stopped due to the 
> java.sql.SQLException: Closed Resultset: getLong
> 14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler                          
>   [debezium-snapshot-reader-0] [] - Producer failure
> java.sql.SQLException: Closed Resultset: getLong
>     at 
> oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254)
>  ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0]
>     at 
> io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373)
>  ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) 
> ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) 
> ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
>     at 
> io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372)
>  ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
>     at 
> io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353)
>  ~[flink-connector-oracle-cdc/:?]
>     at 
> io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258)
>  ~[flink-connector-oracle-cdc/:?]
>     at 
> org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139)
>  ~[flink-connector-oracle-cdc/:?]
>     at 
> org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106)
>  ~[flink-connector-oracle-cdc/:?]
>     at 
> org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112)
>  ~[flink-cdc-base/:?]
>     at 
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99)
>  ~[flink-cdc-base/:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_322]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_322]
>     at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322] {code}
>  
> {*}Reason{*}: 
>  
> After a split has been read, the snapshot reader is closed, which also 
> closes its connection. At that point LogMinerStreamingChangeEventSource may 
> still be running captureSessionMemoryStatistics to collect session 
> statistics on the same connection. The failure then surfaces in 
> JdbcConnection.queryAndMap:
>  
> {code:java}
> public <T> T queryAndMap(String query, StatementFactory statementFactory, 
> ResultSetMapper<T> mapper) throws SQLException {
>     Objects.requireNonNull(mapper, "Mapper must be provided");
>     Connection conn = connection();      // Check if the conn is connected
>     try (Statement statement = statementFactory.createStatement(conn);) {
>         if (LOGGER.isTraceEnabled()) {
>             LOGGER.trace("running '{}'", query);
>         }
>         try (ResultSet resultSet = statement.executeQuery(query);) {
>             // If the split reader's close method closes the connection at
>             // this point, reading the result set fails with the error above.
>             return mapper.apply(resultSet);
>         }
>     }
> } {code}
>  
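The failure mode described above can be illustrated without Oracle or Debezium. The following is a minimal sketch under stated assumptions: `FakeConnection` and `FakeResultSet` are hypothetical stand-ins, not JDBC or Debezium classes, and the close happens on the same thread rather than in a concurrent reader. It only mimics the one relevant behavior, namely that closing a connection invalidates a result set that was opened on it.

```java
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ClosedResultSetSketch {

    /** Hypothetical stand-in for a JDBC connection (not a Debezium class). */
    static final class FakeConnection implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        FakeResultSet executeQuery() throws SQLException {
            if (closed.get()) {
                throw new SQLException("Closed Connection");
            }
            return new FakeResultSet(this);
        }

        boolean isClosed() {
            return closed.get();
        }

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /** Hypothetical result set that becomes unusable once its connection is closed. */
    static final class FakeResultSet {
        private final FakeConnection owner;

        FakeResultSet(FakeConnection owner) {
            this.owner = owner;
        }

        long getLong() throws SQLException {
            if (owner.isClosed()) {
                throw new SQLException("Closed Resultset: getLong");
            }
            return 42L;
        }
    }

    /** Returns the value read, or the error message if reading failed. */
    static String queryThenRead(boolean closeInBetween) {
        FakeConnection conn = new FakeConnection();
        try {
            FakeResultSet rs = conn.executeQuery();
            if (closeInBetween) {
                // Simulates the split reader closing the connection
                // after the query ran but before the mapper reads the row.
                conn.close();
            }
            return String.valueOf(rs.getLong());
        } catch (SQLException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(queryThenRead(false)); // 42
        System.out.println(queryThenRead(true));  // Closed Resultset: getLong
    }
}
```

The connection check at the top of queryAndMap does not help in this situation, because the connection is still open when the check runs and is only closed afterwards.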
> *Solution:*
> -1. We could create a new connection before calling the 
> *captureSessionMemoryStatistics(connection)* method, but this is 
> time-consuming. In my local test, it took 6 seconds.-
> 2. Since *captureSessionMemoryStatistics* only collects statistical 
> information, I think it can be placed before process, so that the 
> connection is no longer in use when the split reader closes.
>  
>  
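The reordering proposed in option 2 can be sketched as follows. This is a simplified reading of the proposal, not the actual Flink CDC or Debezium code: `FakeConnection`, `use`, and `run` are hypothetical names, and the concurrent close is modeled as a plain sequential call to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

public class StatsOrderingSketch {

    /** Hypothetical connection stand-in that records calls and fails once closed. */
    static final class FakeConnection {
        private boolean closed;
        final List<String> calls = new ArrayList<>();

        void use(String what) {
            if (closed) {
                throw new IllegalStateException("connection closed during: " + what);
            }
            calls.add(what);
        }

        void close() {
            closed = true;
        }
    }

    /**
     * Runs one simulated backfill pass.
     * statsFirst = true captures statistics before processing (the proposal);
     * statsFirst = false captures them after the reader has closed the connection.
     */
    static String run(boolean statsFirst) {
        FakeConnection conn = new FakeConnection();
        try {
            if (statsFirst) {
                conn.use("captureSessionMemoryStatistics");
            }
            conn.use("process");
            conn.close(); // the split has been read, so the reader closes the connection
            if (!statsFirst) {
                conn.use("captureSessionMemoryStatistics");
            }
            return "ok " + conn.calls;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

With statistics captured first, nothing touches the connection after the close. The trade-off is that the figures describe the session before processing rather than after it, which seems acceptable if they are used only for diagnostics.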



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
