[ https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Di Wu updated FLINK-36649: -------------------------- Affects Version/s: cdc-3.1.1 cdc-3.2.0 cdc-3.3.0 > Oracle When reading via OracleIncrementalSource, the connection is > occasionally closed > -------------------------------------------------------------------------------------- > > Key: FLINK-36649 > URL: https://issues.apache.org/jira/browse/FLINK-36649 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Affects Versions: cdc-3.2.0, cdc-3.1.1, cdc-3.3.0 > Reporter: Di Wu > Priority: Major > Labels: CDC, pull-request-available > > Oracle When reading via OracleIncrementalSource, the connection is > occasionally closed. > > {code:java} > 14:57:56,432 INFO > org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader > [Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> > Sink: Print to Std. Out (1/1)#0] [] - Close snapshot reader > org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher > 14:57:56,597 INFO io.debezium.jdbc.JdbcConnection > [pool-14-thread-1] [] - Connection gracefully closed > 14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper > [debezium-snapshot-reader-0] [] - Mining session stopped due to the > java.sql.SQLException: 关闭的 Resultset: getLong > 14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler > [debezium-snapshot-reader-0] [] - Producer failure > java.sql.SQLException: 关闭的 Resultset: getLong > at > oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254) > ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0] > at > io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373) > ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final] > at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) > ~[debezium-core-1.9.8.Final.jar:1.9.8.Final] > at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) > ~[debezium-core-1.9.8.Final.jar:1.9.8.Final] > at > io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372) > ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final] > at > io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353) > ~[flink-connector-oracle-cdc/:?] > at > io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258) > ~[flink-connector-oracle-cdc/:?] > at > org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139) > ~[flink-connector-oracle-cdc/:?] > at > org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106) > ~[flink-connector-oracle-cdc/:?] > at > org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112) > ~[flink-cdc-base/:?] > at > org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99) > ~[flink-cdc-base/:?] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_322] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_322] > at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322] {code} > > {*}reason{*}: > > This is because after split is read, the reader will be closed, at which > point LogMinerStreamingChangeEventSource will perform > captureSessionMemoryStatistics to obtain statistical information. > Finally, in the code > > {code:java} > public <T> T queryAndMap(String query, StatementFactory statementFactory, > ResultSetMapper<T> mapper) throws SQLException { > Objects.requireNonNull(mapper, "Mapper must be provided"); > Connection conn = connection(); // Check if the conn is connected > try (Statement statement = statementFactory.createStatement(conn);) { > if (LOGGER.isTraceEnabled()) { > LOGGER.trace("running '{}'", query); > } > try (ResultSet resultSet = statement.executeQuery(query);) { > //When you get here, split executes the close method to close the > connection, and an error will be reported > return mapper.apply(resultSet); > } > } > } {code} > > *solve:* > -1. we can regenerate a connection before calling the > *captureSessionMemoryStatistics(connection)* method, but this will be > time-consuming. In my local test, it took 6 seconds.- > 2. Since *captureSessionMemoryStatistics* is just statistical information, I > think it can be placed before process, so that it can ensure that the > connection is no longer in use when split reader close > > -- This message was sent by Atlassian Jira (v8.20.10#820010)