[jira] [Created] (FLINK-35600) Data read duplication during the full-to-incremental conversion phase
Di Wu created FLINK-35600:
-
Summary: Data read duplication during the full-to-incremental conversion phase
Key: FLINK-35600
URL: https://issues.apache.org/jira/browse/FLINK-35600
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Di Wu

Assume the table has been split into three chunks.

Timeline
t1: chunk1 is read
t2: a row A belonging to chunk2 is inserted into MySQL
t3: chunk2 is read, and row A is sent downstream
t4: chunk3 is read; at this point, startOffset is set to the low watermark
t5: BinlogSplitReader.pollSplitRecords receives row A and calls shouldEmit to decide whether it should be sent downstream

shouldEmit relies on this method:
{code:java}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
    if (pureBinlogPhaseTables.contains(tableId)) {
        return true;
    }
    // tables that have already finished snapshot reading
    if (maxSplitHighWatermarkMap.containsKey(tableId)
            && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
        pureBinlogPhaseTables.add(tableId);
        return true;
    }
    return false;
}
{code}
maxSplitHighWatermarkMap.get(tableId) returns a high watermark that was recorded without the ts_sec field, so ts_sec defaults to 0 and position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId)) evaluates to true.
Row A is therefore emitted downstream a second time, duplicating the data.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
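One way to close this hole is to stop relying on a timestamp comparison when the stored watermark never carried a real ts_sec. Below is a minimal sketch of such a guard, assuming BinlogOffset exposes getFilename() and getPosition(); it illustrates the idea and is not the merged fix:
{code:java}
// Sketch: compare explicit binlog coordinates instead of isAtOrAfter(),
// whose timestamp comparison degenerates when the stored watermark's
// ts_sec defaulted to 0 (every event timestamp is >= 0).
private boolean isAtOrAfterHighWatermark(BinlogOffset position, BinlogOffset highWatermark) {
    // Binlog file names such as mysql-bin.000001 order lexicographically.
    int byFile = position.getFilename().compareTo(highWatermark.getFilename());
    if (byFile != 0) {
        return byFile > 0;
    }
    return position.getPosition() >= highWatermark.getPosition();
}
{code}
Under such a comparison, row A (whose binlog coordinates lie below the table's maximum high watermark) would not promote the table into the pure-binlog phase, and the per-split watermark checks could then filter out the duplicate.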
[jira] [Updated] (FLINK-35600) Data read duplication during the full-to-incremental conversion phase
[ https://issues.apache.org/jira/browse/FLINK-35600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-35600:
--
Description:
Assume the table has been split into three chunks.

Timeline
t1: chunk1 is read
t2: a row A belonging to chunk2 is inserted into MySQL
t3: chunk2 is read, and row A is sent downstream
t4: chunk3 is read; at this point, startOffset is set to the low watermark
t5: *BinlogSplitReader.pollSplitRecords* receives row A and calls *shouldEmit* to decide whether it should be sent downstream

shouldEmit relies on this method:
{code:java}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
    if (pureBinlogPhaseTables.contains(tableId)) {
        return true;
    }
    // tables that have already finished snapshot reading
    if (maxSplitHighWatermarkMap.containsKey(tableId)
            && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
        pureBinlogPhaseTables.add(tableId);
        return true;
    }
    return false;
}
{code}
*maxSplitHighWatermarkMap.get(tableId)* returns a high watermark that was recorded without the ts_sec field, so ts_sec defaults to 0 and *position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))* evaluates to true.
*Row A is therefore emitted downstream a second time, duplicating the data.*

was:
Assume the table has been split into three chunks.

Timeline
t1: chunk1 is read
t2: a row A belonging to chunk2 is inserted into MySQL
t3: chunk2 is read, and row A is sent downstream
t4: chunk3 is read; at this point, startOffset is set to the low watermark
t5: BinlogSplitReader.pollSplitRecords receives row A and calls shouldEmit to decide whether it should be sent downstream

shouldEmit relies on this method:
{code:java}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
    if (pureBinlogPhaseTables.contains(tableId)) {
        return true;
    }
    // tables that have already finished snapshot reading
    if (maxSplitHighWatermarkMap.containsKey(tableId)
            && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
        pureBinlogPhaseTables.add(tableId);
        return true;
    }
    return false;
}
{code}
maxSplitHighWatermarkMap.get(tableId) returns a high watermark that was recorded without the ts_sec field, so ts_sec defaults to 0 and position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId)) evaluates to true.
Row A is therefore emitted downstream a second time, duplicating the data.

> Data read duplication during the full-to-incremental conversion phase
> -
>
> Key: FLINK-35600
> URL: https://issues.apache.org/jira/browse/FLINK-35600
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.0
> Reporter: Di Wu
> Priority: Major
> Labels: pull-request-available
>
> Assume the table has been split into three chunks.
>
> Timeline
> t1: chunk1 is read
> t2: a row A belonging to chunk2 is inserted into MySQL
> t3: chunk2 is read, and row A is sent downstream
> t4: chunk3 is read; at this point, startOffset is set to the low watermark
> t5: *BinlogSplitReader.pollSplitRecords* receives row A and calls *shouldEmit* to decide whether it should be sent downstream
>
> shouldEmit relies on this method:
> {code:java}
> private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
>     if (pureBinlogPhaseTables.contains(tableId)) {
>         return true;
>     }
>     // tables that have already finished snapshot reading
>     if (maxSplitHighWatermarkMap.containsKey(tableId)
>             && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
>         pureBinlogPhaseTables.add(tableId);
>         return true;
>     }
>     return false;
> }
> {code}
> *maxSplitHighWatermarkMap.get(tableId)* returns a high watermark that was recorded without the ts_sec field, so ts_sec defaults to 0 and *position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))* evaluates to true.
> *Row A is therefore emitted downstream a second time, duplicating the data.*

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-35838) FLIP-399: Flink Connector Doris
Di Wu created FLINK-35838:
-
Summary: FLIP-399: Flink Connector Doris
Key: FLINK-35838
URL: https://issues.apache.org/jira/browse/FLINK-35838
Project: Flink
Issue Type: New Feature
Reporter: Di Wu

As discussed on the Flink dev mailing list [1][2], we should finish the repo and doc migration as soon as possible.

[1] https://lists.apache.org/thread/w3hoglk0pqbzqhzlfcgzkkz3xrwo90rt
[2] https://lists.apache.org/thread/b32qvhzpmq06z2x5s9c8qr3pzsnld34m

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-35839) Migrate repo from apache/doris to apache/flink
Di Wu created FLINK-35839:
-
Summary: Migrate repo from apache/doris to apache/flink
Key: FLINK-35839
URL: https://issues.apache.org/jira/browse/FLINK-35839
Project: Flink
Issue Type: Sub-task
Reporter: Di Wu

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-35840) Add documentation for Doris
Di Wu created FLINK-35840:
-
Summary: Add documentation for Doris
Key: FLINK-35840
URL: https://issues.apache.org/jira/browse/FLINK-35840
Project: Flink
Issue Type: Sub-task
Reporter: Di Wu

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-35841) Change package name to org.apache.flink
Di Wu created FLINK-35841:
-
Summary: Change package name to org.apache.flink
Key: FLINK-35841
URL: https://issues.apache.org/jira/browse/FLINK-35841
Project: Flink
Issue Type: Sub-task
Reporter: Di Wu

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-35842) Change the FlinkCDC dependency to 3.1.x
Di Wu created FLINK-35842:
-
Summary: Change the FlinkCDC dependency to 3.1.x
Key: FLINK-35842
URL: https://issues.apache.org/jira/browse/FLINK-35842
Project: Flink
Issue Type: Sub-task
Reporter: Di Wu

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-36631) Support reading incremental data from Oracle from a specified SCN
Di Wu created FLINK-36631:
-
Summary: Support reading incremental data from Oracle from a specified SCN
Key: FLINK-36631
URL: https://issues.apache.org/jira/browse/FLINK-36631
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: 3.0.0
Reporter: Di Wu

Oracle CDC currently only supports the initial and latest-offset startup modes. Add a specific-offset mode to support reading incremental data from a specified SCN.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
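For illustration, a hypothetical sketch of what the proposed mode could look like to users, modeled on the existing OracleIncrementalSource builder; the specific-offset startup option and its SCN parameter are assumptions about the proposal, not an implemented API:
{code:java}
// Hypothetical sketch of the proposed specific-offset startup mode.
// StartupOptions.specificOffset("2408456") is an assumed API; today the
// Oracle connector only documents initial() and latest() startup modes.
OracleIncrementalSource<String> source =
        OracleSourceBuilder.OracleIncrementalSource.<String>builder()
                .hostname("localhost")
                .port(1521)
                .databaseList("ORCLCDB")
                .schemaList("DEBEZIUM")
                .tableList("DEBEZIUM.PRODUCTS")
                .username("flinkuser")
                .password("flinkpw")
                .deserializer(new JsonDebeziumDeserializationSchema())
                // assumed: begin streaming redo-log data from this SCN
                .startupOptions(StartupOptions.specificOffset("2408456"))
                .build();
{code}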
[jira] [Updated] (FLINK-36649) Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
[ https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-36649:
--
Description:
Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed.
{code:java}
14:57:56,432 INFO  org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader [Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> Sink: Print to Std. Out (1/1)#0] [] - Close snapshot reader org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
14:57:56,597 INFO  io.debezium.jdbc.JdbcConnection [pool-14-thread-1] [] - Connection gracefully closed
14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper [debezium-snapshot-reader-0] [] - Mining session stopped due to the java.sql.SQLException: 关闭的 Resultset: getLong
14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler [debezium-snapshot-reader-0] [] - Producer failure
java.sql.SQLException: 关闭的 Resultset: getLong
    at oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254) ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0]
    at io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353) ~[flink-connector-oracle-cdc/:?]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258) ~[flink-connector-oracle-cdc/:?]
    at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139) ~[flink-connector-oracle-cdc/:?]
    at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106) ~[flink-connector-oracle-cdc/:?]
    at org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112) ~[flink-cdc-base/:?]
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99) ~[flink-cdc-base/:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
{code}
*Reason:*
After the split has been read, the split reader is closed. At that point LogMinerStreamingChangeEventSource is still running captureSessionMemoryStatistics to collect session statistics (the SQLException message "关闭的 Resultset: getLong" translates to "closed ResultSet: getLong"), which ends up in this code:
{code:java}
public <T> T queryAndMap(String query, StatementFactory statementFactory, ResultSetMapper<T> mapper) throws SQLException {
    Objects.requireNonNull(mapper, "Mapper must be provided");
    Connection conn = connection(); // checks that the connection is open
    try (Statement statement = statementFactory.createStatement(conn)) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("running '{}'", query);
        }
        try (ResultSet resultSet = statement.executeQuery(query)) {
            // By the time we get here, the split reader may already have closed
            // the connection, so reading the result set fails.
            return mapper.apply(resultSet);
        }
    }
}
{code}
*Solution:*
-1. We could create a fresh connection before calling *captureSessionMemoryStatistics(connection)*, but this is time-consuming; in a local test it took 6 seconds.-
2. Since *captureSessionMemoryStatistics* only collects statistics, it can be moved before {*}process{*}, which guarantees the connection is no longer in use when the split reader closes.

was:
Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed.
{code:java}
14:57:56,432 INFO  org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader [Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> Sink: Print to Std. Out (1/1)#0] [] - Close snapshot reader org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
14:57:56,597 INFO  io.debezium.jdbc
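A defensive variant of the same idea, sketched under the assumption that Debezium's JdbcConnection.isConnected() is available (as in the 1.9.x line) and that statistics are dispensable; this is an illustration, not the merged fix:
{code:java}
// Sketch only: capture session statistics while the connection is still
// known to be open, and degrade gracefully otherwise.
private void captureSessionMemoryStatisticsSafely(OracleConnection connection) {
    try {
        if (connection.isConnected()) {
            captureSessionMemoryStatistics(connection);
        }
    }
    catch (SQLException e) {
        // The split reader may close the connection concurrently; the
        // statistics are informational, so log instead of failing the producer.
        LOGGER.warn("Skipped session memory statistics, connection already closed", e);
    }
}
{code}
The author's preferred option 2 above avoids the race entirely by reordering the calls, which is simpler than guarding each statistics query.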
[jira] [Updated] (FLINK-36649) Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
[ https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-36649:
--
Fix Version/s: 3.0.0

> Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Di Wu
> Priority: Major
> Labels: CDC
> Fix For: 3.0.0
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-36649) Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
[ https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-36649:
--
Description: Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed.

> Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Di Wu
> Priority: Major
> Labels: CDC
> Fix For: cdc-3.3.0
>
> Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed.
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-36687) Upgrade Doris Pipeline Connector to 24.0.1
[ https://issues.apache.org/jira/browse/FLINK-36687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-36687:
--
Description:
Update Doris Pipeline Connector version to 24.0.1
Upgrade Doris Pipeline Connector to 24.0.1; related release notes:
[apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
[apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]

was: Update Doris Pipeline Connector version to 24.0.1

> Upgrade Doris Pipeline Connector to 24.0.1
> --
>
> Key: FLINK-36687
> URL: https://issues.apache.org/jira/browse/FLINK-36687
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.2.0
> Reporter: Di Wu
> Priority: Major
> Labels: pull-request-available
>
> Update Doris Pipeline Connector version to 24.0.1
> Upgrade Doris Pipeline Connector to 24.0.1; related release notes:
> [apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
> [apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-36687) Upgrade Doris Pipeline Connector to 24.0.1
[ https://issues.apache.org/jira/browse/FLINK-36687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-36687:
--
Description:
Upgrade Doris Pipeline Connector to 24.0.1; related release notes:
[apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
[apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]

was:
Update Doris Pipeline Connector version to 24.0.1
Upgrade Doris Pipeline Connector to 24.0.1; related release notes:
[apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
[apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]

> Upgrade Doris Pipeline Connector to 24.0.1
> --
>
> Key: FLINK-36687
> URL: https://issues.apache.org/jira/browse/FLINK-36687
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.2.0
> Reporter: Di Wu
> Priority: Major
> Labels: pull-request-available
>
> Upgrade Doris Pipeline Connector to 24.0.1; related release notes:
> [apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
> [apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Closed] (FLINK-36687) Upgrade Doris Pipeline Connector to 24.0.1
[ https://issues.apache.org/jira/browse/FLINK-36687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu closed FLINK-36687.
-
Resolution: Resolved

> Upgrade Doris Pipeline Connector to 24.0.1
> --
>
> Key: FLINK-36687
> URL: https://issues.apache.org/jira/browse/FLINK-36687
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.2.0
> Reporter: Di Wu
> Priority: Major
> Labels: pull-request-available
>
> Upgrade Doris Pipeline Connector to 24.0.1; related release notes:
> [apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
> [apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-36687) Upgrade Doris Pipeline Connector to 24.0.1
[ https://issues.apache.org/jira/browse/FLINK-36687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897140#comment-17897140 ]

Di Wu commented on FLINK-36687:
---
This has been resolved in https://github.com/apache/flink-cdc/pull/3691

> Upgrade Doris Pipeline Connector to 24.0.1
> --
>
> Key: FLINK-36687
> URL: https://issues.apache.org/jira/browse/FLINK-36687
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.2.0
> Reporter: Di Wu
> Priority: Major
> Labels: pull-request-available
>
> Upgrade Doris Pipeline Connector to 24.0.1; related release notes:
> [apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
> [apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-36687) Upgrade Doris Pipeline Connector to 24.0.1
Di Wu created FLINK-36687:
-
Summary: Upgrade Doris Pipeline Connector to 24.0.1
Key: FLINK-36687
URL: https://issues.apache.org/jira/browse/FLINK-36687
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: Di Wu

Update Doris Pipeline Connector version to 24.0.1

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-36397) Using the offset obtained after a query transaction as a high watermark cannot ensure exactly-once semantics.
[ https://issues.apache.org/jira/browse/FLINK-36397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897115#comment-17897115 ]

Di Wu commented on FLINK-36397:
---
In the snapshot reader, the data between the low watermark and the high watermark is replayed, so this should not cause data loss.

> Using the offset obtained after a query transaction as a high watermark cannot ensure exactly-once semantics.
> -
>
> Key: FLINK-36397
> URL: https://issues.apache.org/jira/browse/FLINK-36397
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.2.0
> Reporter: Zhongmin Qiao
> Assignee: Zhongmin Qiao
> Priority: Major
> Labels: pull-request-available
> Fix For: cdc-3.2.1
>
> Attachments: picture1.png
>
> !picture1.png|width=564,height=357!
> Using the offset obtained after a query transaction as a high watermark cannot ensure exactly-once semantics, because "show master status" and the query action are not in the same transaction. Data may be inserted between the query action and the retrieval of the high watermark; that data would be lost, since during the binlog phase we only deliver data positioned after the high watermark.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
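For context, a schematic of the per-split watermark protocol the comment refers to; this is a simplified pseudo-flow whose helper names (currentBinlogOffset, scanSplitRows, readBinlogRange, upsertMerge) are illustrative assumptions, not the actual Flink CDC API:
{code:java}
// Any row changed between the low and high watermark is corrected by
// replaying that binlog range, so a row inserted between the snapshot
// query and the high-watermark read is not lost.
List<SourceRecord> readSnapshotSplit(SnapshotSplit split) {
    BinlogOffset low = currentBinlogOffset();            // SHOW MASTER STATUS
    List<SourceRecord> snapshot = scanSplitRows(split);  // SELECT within the key range
    BinlogOffset high = currentBinlogOffset();           // SHOW MASTER STATUS again
    // Backfill: replay binlog changes in [low, high) that touch this key range.
    List<SourceRecord> backfill = readBinlogRange(low, high, split);
    // Later changes win: backfill records override snapshot records per key.
    return upsertMerge(snapshot, backfill);
}
{code}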
[jira] [Updated] (FLINK-36649) Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
[ https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-36649:
--
Fix Version/s: cdc-3.3.0
               (was: 3.0.0)

> Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Di Wu
> Priority: Major
> Labels: CDC
> Fix For: cdc-3.3.0
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-36649) Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
[ https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-36649:
--
Labels: CDC (was: )

> Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Di Wu
> Priority: Major
> Labels: CDC
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-36649) Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
[ https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-36649:
--
Affects Version/s: cdc-3.1.1
                   cdc-3.2.0
                   cdc-3.3.0

> Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.2.0, cdc-3.1.1, cdc-3.3.0
> Reporter: Di Wu
> Priority: Major
> Labels: CDC, pull-request-available
>
> Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed.
> {code:java}
> 14:57:56,432 INFO  org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader [Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> Sink: Print to Std. Out (1/1)#0] [] - Close snapshot reader org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
> 14:57:56,597 INFO  io.debezium.jdbc.JdbcConnection [pool-14-thread-1] [] - Connection gracefully closed
> 14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper [debezium-snapshot-reader-0] [] - Mining session stopped due to the java.sql.SQLException: 关闭的 Resultset: getLong
> 14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler [debezium-snapshot-reader-0] [] - Producer failure
> java.sql.SQLException: 关闭的 Resultset: getLong
>     at oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254) ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0]
>     at io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353) ~[flink-connector-oracle-cdc/:?]
>     at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258) ~[flink-connector-oracle-cdc/:?]
>     at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139) ~[flink-connector-oracle-cdc/:?]
>     at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106) ~[flink-connector-oracle-cdc/:?]
>     at org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112) ~[flink-cdc-base/:?]
>     at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99) ~[flink-cdc-base/:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
>     at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
> {code}
> *Reason:*
> After the split has been read, the split reader is closed. At that point LogMinerStreamingChangeEventSource is still running captureSessionMemoryStatistics to collect session statistics (the SQLException message "关闭的 Resultset: getLong" translates to "closed ResultSet: getLong"), which ends up in this code:
> {code:java}
> public <T> T queryAndMap(String query, StatementFactory statementFactory, ResultSetMapper<T> mapper) throws SQLException {
>     Objects.requireNonNull(mapper, "Mapper must be provided");
>     Connection conn = connection(); // checks that the connection is open
>     try (Statement statement = statementFactory.createStatement(conn)) {
>         if (LOGGER.isTraceEnabled()) {
>             LOGGER.trace("running '{}'", query);
>         }
>         try (ResultSet resultSet = statement.executeQuery(query)) {
>             // By the time we get here, the split reader may already have closed
>             // the connection, so reading the result set fails.
>             return mapper.apply(resultSet);
>         }
>     }
> }
> {code}
> *Solution:*
> -1. We could create a fresh connection before calling *captureSessionMemoryStatistics(connection)*, but this is time-consuming; in a local test it took 6 seconds.-
> 2. Since *captureSessionMemoryStatistics* only collects statistics, it can be moved before process, which guarantees the connection is no longer in use when the split reader closes.
[jira] [Updated] (FLINK-36649) Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
[ https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-36649:
--
Description:
Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed.
{code:java}
14:57:56,432 INFO  org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader [Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> Sink: Print to Std. Out (1/1)#0] [] - Close snapshot reader org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
14:57:56,597 INFO  io.debezium.jdbc.JdbcConnection [pool-14-thread-1] [] - Connection gracefully closed
14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper [debezium-snapshot-reader-0] [] - Mining session stopped due to the java.sql.SQLException: 关闭的 Resultset: getLong
14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler [debezium-snapshot-reader-0] [] - Producer failure
java.sql.SQLException: 关闭的 Resultset: getLong
    at oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254) ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0]
    at io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353) ~[flink-connector-oracle-cdc/:?]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258) ~[flink-connector-oracle-cdc/:?]
    at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139) ~[flink-connector-oracle-cdc/:?]
    at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106) ~[flink-connector-oracle-cdc/:?]
    at org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112) ~[flink-cdc-base/:?]
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99) ~[flink-cdc-base/:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
{code}
*Reason:*
After the split has been read, the split reader is closed. At that point LogMinerStreamingChangeEventSource is still running captureSessionMemoryStatistics to collect session statistics (the SQLException message "关闭的 Resultset: getLong" translates to "closed ResultSet: getLong"), which ends up in this code:
{code:java}
public <T> T queryAndMap(String query, StatementFactory statementFactory, ResultSetMapper<T> mapper) throws SQLException {
    Objects.requireNonNull(mapper, "Mapper must be provided");
    Connection conn = connection(); // checks that the connection is open
    try (Statement statement = statementFactory.createStatement(conn)) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("running '{}'", query);
        }
        try (ResultSet resultSet = statement.executeQuery(query)) {
            // By the time we get here, the split reader may already have closed
            // the connection, so reading the result set fails.
            return mapper.apply(resultSet);
        }
    }
}
{code}
*Solution:*
-1. We could create a fresh connection before calling *captureSessionMemoryStatistics(connection)*, but this is time-consuming; in a local test it took 6 seconds.-
2. Since *captureSessionMemoryStatistics* only collects statistics, it can be moved before process, which guarantees the connection is no longer in use when the split reader closes.

was: Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed.

> Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
[jira] [Updated] (FLINK-36649) Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
[ https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-36649:
--
Fix Version/s: (was: cdc-3.3.0)

> Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Di Wu
> Priority: Major
> Labels: CDC, pull-request-available
>
> Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed.
> {code:java}
> 14:57:56,432 INFO  org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader [Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> Sink: Print to Std. Out (1/1)#0] [] - Close snapshot reader org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
> 14:57:56,597 INFO  io.debezium.jdbc.JdbcConnection [pool-14-thread-1] [] - Connection gracefully closed
> 14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper [debezium-snapshot-reader-0] [] - Mining session stopped due to the java.sql.SQLException: 关闭的 Resultset: getLong
> 14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler [debezium-snapshot-reader-0] [] - Producer failure
> java.sql.SQLException: 关闭的 Resultset: getLong
>     at oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254) ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0]
>     at io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372) ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353) ~[flink-connector-oracle-cdc/:?]
>     at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258) ~[flink-connector-oracle-cdc/:?]
>     at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139) ~[flink-connector-oracle-cdc/:?]
>     at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106) ~[flink-connector-oracle-cdc/:?]
>     at org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112) ~[flink-cdc-base/:?]
>     at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99) ~[flink-cdc-base/:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
>     at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
> {code}
> *Reason:*
> After the split has been read, the split reader is closed. At that point LogMinerStreamingChangeEventSource is still running captureSessionMemoryStatistics to collect session statistics (the SQLException message "关闭的 Resultset: getLong" translates to "closed ResultSet: getLong"), which ends up in this code:
> {code:java}
> public <T> T queryAndMap(String query, StatementFactory statementFactory, ResultSetMapper<T> mapper) throws SQLException {
>     Objects.requireNonNull(mapper, "Mapper must be provided");
>     Connection conn = connection(); // checks that the connection is open
>     try (Statement statement = statementFactory.createStatement(conn)) {
>         if (LOGGER.isTraceEnabled()) {
>             LOGGER.trace("running '{}'", query);
>         }
>         try (ResultSet resultSet = statement.executeQuery(query)) {
>             // By the time we get here, the split reader may already have closed
>             // the connection, so reading the result set fails.
>             return mapper.apply(resultSet);
>         }
>     }
> }
> {code}
> *Solution:*
> -1. We could create a fresh connection before calling *captureSessionMemoryStatistics(connection)*, but this is time-consuming; in a local test it took 6 seconds.-
> 2. Since *captureSessionMemoryStatistics* only collects statistics, it can be moved before process, which guarantees the connection is no longer in use when the split reader closes.
[jira] [Created] (FLINK-36649) Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
Di Wu created FLINK-36649:
-
Summary: Oracle: when reading via OracleIncrementalSource, the connection is occasionally closed
Key: FLINK-36649
URL: https://issues.apache.org/jira/browse/FLINK-36649
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Di Wu

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-36631) Support reading incremental data from Oracle from a specified SCN
[ https://issues.apache.org/jira/browse/FLINK-36631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-36631:
--
Affects Version/s: cdc-3.1.1
                   cdc-3.2.0
                   cdc-3.1.0
                   (was: 3.0.0)

> Support reading incremental data from Oracle from a specified SCN
> --
>
> Key: FLINK-36631
> URL: https://issues.apache.org/jira/browse/FLINK-36631
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1
> Reporter: Di Wu
> Assignee: Di Wu
> Priority: Major
> Labels: CDC, pull-request-available
>
> Oracle CDC currently only supports the initial and latest-offset startup modes. Add a specific-offset mode to support reading incremental data from a specified SCN.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-36813) MySQL CDC supports synchronization of specified fields
[ https://issues.apache.org/jira/browse/FLINK-36813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-36813:
--
Description:
*Background*
In some scenarios, a MySQL synchronization job should synchronize only specified fields instead of all fields in the table:
1. The user only has permission on some of the table's columns in MySQL
2. A single table has many columns and the user only wants to synchronize some of them; see for example [https://github.com/apache/flink-cdc/discussions/3058]

*Current situation*
For the incremental stage, configuring Debezium's column.include.list property is enough to synchronize only some fields; refer to [https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list]
For the full snapshot stage, "*" is currently used in {_}MySqlSnapshotSplitReadTask{_}; refer to
{code:java}
if (isScanningData) {
    return buildSelectWithRowLimits(
            tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
}
{code}

*Solution*
We can follow Debezium's [RelationalSnapshotChangeEventSource|https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java#L752-L776]: the user configures column.include.list, MySqlSnapshotSplitReadTask resolves the captured columns, and they are spliced into the scan SQL it constructs. A minimal sketch of the splicing step follows this message.

was:
*Background*
In some scenarios, a MySQL synchronization job should synchronize only specified fields instead of all fields in the table:
1. The user only has permission on some of the table's columns in MySQL
2. A single table has many columns and the user only wants to synchronize some of them; see for example https://github.com/apache/flink-cdc/discussions/3058

*Current situation*
For the incremental stage, configuring Debezium's column.include.list property is enough to synchronize only some fields; refer to https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list
For the full snapshot stage, "*" is currently used in {_}MySqlSnapshotSplitReadTask{_}; refer to
{code:java}
if (isScanningData) {
    return buildSelectWithRowLimits(
            tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
}
{code}

*Solution*
The user configures {_}column.include.list{_}; MySqlSnapshotSplitReadTask then resolves the captured columns and splices them into the scan SQL it constructs.

> MySQL CDC supports synchronization of specified fields
> -
>
> Key: FLINK-36813
> URL: https://issues.apache.org/jira/browse/FLINK-36813
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.2.1
> Reporter: Di Wu
> Priority: Major
> Labels: CDC, pull-request-available
> Fix For: cdc-3.3.0
>
> *Background*
> In some scenarios, a MySQL synchronization job should synchronize only specified fields instead of all fields in the table:
> 1. The user only has permission on some of the table's columns in MySQL
> 2. A single table has many columns and the user only wants to synchronize some of them; see for example [https://github.com/apache/flink-cdc/discussions/3058]
>
> *Current situation*
> For the incremental stage, configuring Debezium's column.include.list property is enough to synchronize only some fields; refer to [https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list]
> For the full snapshot stage, "*" is currently used in {_}MySqlSnapshotSplitReadTask{_}; refer to
> {code:java}
> if (isScanningData) {
>     return buildSelectWithRowLimits(
>             tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
> }
> {code}
>
> *Solution*
> We can follow Debezium's [RelationalSnapshotChangeEventSource|https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java#L752-L776]: the user configures column.include.list, MySqlSnapshotSplitReadTask resolves the captured columns, and they are spliced into the scan SQL it constructs.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
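A minimal sketch of the splicing step, assuming the captured column names have already been resolved from column.include.list; the class name and the usage comment are illustrative, not the actual Flink CDC code:
{code:java}
import java.util.List;
import java.util.stream.Collectors;

// Illustrative helper: build the scan projection from the captured columns,
// falling back to "*" when no column filter is configured.
public final class ScanProjection {
    private ScanProjection() {}

    public static String of(List<String> capturedColumns) {
        if (capturedColumns == null || capturedColumns.isEmpty()) {
            return "*";
        }
        return capturedColumns.stream()
                .map(col -> "`" + col + "`") // MySQL identifier quoting
                .collect(Collectors.joining(", "));
    }
}

// Hypothetical usage inside the scan-SQL builder:
//   buildSelectWithRowLimits(tableId, limitSize,
//           ScanProjection.of(capturedColumns),
//           Optional.ofNullable(condition), Optional.empty());
{code}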
[jira] [Created] (FLINK-36813) MySQL CDC supports synchronization of specified fields
Di Wu created FLINK-36813:
-
Summary: MySQL CDC supports synchronization of specified fields
Key: FLINK-36813
URL: https://issues.apache.org/jira/browse/FLINK-36813
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.2.1
Reporter: Di Wu
Fix For: cdc-3.3.0

*Background*
In some scenarios, a MySQL synchronization job should synchronize only specified fields instead of all fields in the table:
1. The user only has permission on some of the table's columns in MySQL
2. A single table has many columns and the user only wants to synchronize some of them; see for example https://github.com/apache/flink-cdc/discussions/3058

*Current situation*
For the incremental stage, configuring Debezium's column.include.list property is enough to synchronize only some fields; refer to https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list
For the full snapshot stage, "*" is currently used in {_}MySqlSnapshotSplitReadTask{_}; refer to
{code:java}
if (isScanningData) {
    return buildSelectWithRowLimits(
            tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
}
{code}

*Solution*
The user configures {_}column.include.list{_}; MySqlSnapshotSplitReadTask then resolves the captured columns and splices them into the scan SQL it constructs.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-36631) Support reading incremental data from Oracle from a specified SCN
[ https://issues.apache.org/jira/browse/FLINK-36631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Di Wu updated FLINK-36631:
--
Fix Version/s: cdc-3.4.0

> Support reading incremental data from Oracle from a specified SCN
> --
>
> Key: FLINK-36631
> URL: https://issues.apache.org/jira/browse/FLINK-36631
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1
> Reporter: Di Wu
> Assignee: Di Wu
> Priority: Major
> Labels: CDC, pull-request-available
> Fix For: cdc-3.4.0
>
> Oracle CDC currently only supports the initial and latest-offset startup modes. Add a specific-offset mode to support reading incremental data from a specified SCN.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)